-
[코틀린리액티브] 03. 옵저버블과 옵저버와 구독자읽고있는책 2020. 5. 16. 19:09
3장에서는 세 가지를 알아본다.
Observables, Observers, Subjects
Observables
Observable 을 설명하면서 Observer 가 필요하다. (이하 옵저버블과 옵저버)
옵저버블은 옵저버가 Pull 하는 방식을 갖고 있지 않다. 옵저버블은 옵저버(컨슈머) 에게 Push 하는 방식을 가지고 있다.
간단한 3단계를 보자.
- 옵저버는 옵저버블을 구독한다.
- 옵저버블이 그 내부의 아이템을 보내기 시작한다.
- 옵저버는 옵저버블이 내보내는 모든 아이템에 반응한다.
'구독' 이라는 단어가 생소하면서 어쩌면 당연하다는 생각이 든다. 옵저버블이 옵저버에게 Push 하는 방식이기 때문에 적어도 누구에게 Push 해야하는지는 알고 있어야 하지 않을까? 누구에게 할 지 미리 알고 있다는 것 정도로 '구독'을 이해하려 한다.
옵저버블은 onNext, onComplete, onError 와 같은 이벤트 메소드를 통해 작동한다. (모나드...)
예제에서 object 키워드는 익명클래스를 위해 쓰임.import io.reactivex.Observable import io.reactivex.Observer import io.reactivex.disposables.Disposable import io.reactivex.rxkotlin.toObservable fun main(args: Array<String>) { val observer : Observer<Any> = object : Observer<Any> { override fun onComplete() { println("onComplete") } override fun onNext(t: Any) { println("onNext") } override fun onError(e: Throwable) { println("onError") } override fun onSubscribe(d: Disposable) { println("onSubscribe") } } val observable : Observable<Any> = listOf("One", 2, "Three", "Four", 4.5).toObservable() observable.subscribe(observer) val observableOnList : Observable<Any> = Observable.just( listOf("One", 2, "Three", "Four", 4.5), listOf("List with Single"), listOf(1,2,3,4)) observableOnList.subscribe(observer) }출력은 다음과 같다. 책에서는 아이템 하나하나 print 하게 해서 보는데, 여기에선 메소드 호출만 출력했다.
onSubscribe onNext onNext onNext onNext onNext onComplete onSubscribe onNext onNext onNext onComplete Process finished with exit code 0위에서 이야기한 것처럼, 구독을 해놓은 것만으로도 옵저버블에서 Push 를 해주면서 onNext 가 실행되고,
최종적으로 OnComplete 까지 호출되는 것을 확인할 수 있다.
위에서는 옵저버블을 생성할 때, listOf 의 toObservable 을 이용하고, Observalbe.just 를 이용해서 생성했다.
옵저버블을 생성하는 방법에 대해 좀 더 알아보자.
Observable.create
val observable : Observable<String> = Observable.create<String> { it.onNext("1") it.onNext("2") it.onComplete() }인텔리제이에서 위 코드를 입력하면서 옵저버블의 create 입력하면 ObservableEmitter 를 입력받는 다는 것을 알 수 있다.
특별한 특징은 모르겠고, 옵저버블의 여러가지 방법을 보고 비교해보자.
Observable.from
RxKotlin1 에서는 from 만 존재하지만, RxKotlin2 에서는 fromArray, fromIterable, fromFuture 등이 추가되었다고 한다.
//1 val list = listOf("String 1", "String 2", "String 3") val observableFromIterable : Observable<String> = Observable.fromIterable(list) observableFromIterable.subscribe(observer) //2 val callable = object : Callable<String> { override fun call(): String { return "Callable" } } val observableFromCallable : Observable<String> = Observable.fromCallable(callable) observableFromIterable.subscribe(observer) //3 val future : Future<String> = object : Future<String> { override fun get(): String = "Hello" override fun get(timeout: Long, unit: TimeUnit): String = "Helloo" override fun isDone(): Boolean = true override fun isCancelled(): Boolean = false override fun cancel(mayInterruptIfRunning: Boolean): Boolean = false } val observableFromFuture: Observable<String> = Observable.fromFuture(future) observableFromIterable.subscribe(observer)Iterable, Callable, Future 를 통해서 옵저버블을 생성했다. 이를 통해서 옵저버블을 생성하고 데이터를 Push 할 수 있다.
또한 옵저버블 from 이 과거 사용한 예제의 .toObservable() 내부에서도 사용되고 있다. 이는 코틀린의 확장함수를 이용한 기능이다.
list.toObservable()Observable.just
드디어 차이점을 비교해볼만 한 것이 나왔다.
just 는 넘겨진 인자만을 배출하는 옵저버블을 생성한다.
iterable 을 넘겨도 전체목록을 하나의 아이템으로 배출한다. 코드 결과를 통해 보자.
val list: List<String> = listOf("A", "B", "C", "D", "E") list.toObservable() Observable.just(listOf("A","B","C","D","E")).subscribe(observer)하나는 toObserverable 곧, 옵저버블 from 을 이용하였고, 다른 하다는 just 를 이용하였다.
Subs Next A Next B Next C Next D Next E Complete SUbs Next [A,B,C,D,E] Completejust 는 list 를 하나의 Next 에서 전부 반환한다. 리스트와 맵을 단일 아이템으로 취급하고 있다는 것이다.
하지만 list 가 아닌 Observable.just("A", "B") 와 같이 인자를 나누어 보낼 경우 별개의 아이템으로 취급한다.
Observable.range
range(1, 10) 과 같이 사용하고, start 부터 시작해 count 만큼의 정수를 내보낸다.
Observable.empty
empty 는 onNext 가 없다. 바로 onComplete 다.
Observable.interval
지정된 간격만큼 숫자를 0부터 순차적으로 내보낸다.
Observable.timer
지정된 시간이 경과한 후에 한번만 실행된다.
옵저버블의 메소드들을 살펴보았다.
이제 Observer 에 대해서 알아보자.
RxKotlin1 의 Subscriber 가 RxKotlin2 에서는 Observer 로 변경되었다. 실제로 이 글을 쓰면서도, 책에 있는 예제도 버전에 따라 큰 차이를 보여주고 있어, 예제를 타이핑해볼 때에도 컴파일이 되는지는 포기하고 타이핑해보고 있다. 여튼 Observer 로 변경되었긴 하지만, subscribe() 메서드에 전달하는 것은 Subscriber 이고, 이는 Observer 를 구현한다.
Observer
옵저버를 설명하면서 앞서 옵저버블을 이용하여, 옵저버를 구독하는 방법에 대해 설명했다.
구독하는 방법도 있으니, 구독을 해지하는 방법도 있을 것이다.
앞서 옵저버를 구현하면서 onSubscribe 메소드를 보았다.
val observer : Observer<Any> = object : Observer<Any> { override fun onComplete() { println("onComplete") } override fun onNext(t: Any) { println("onNext") } override fun onError(e: Throwable) { println("onError") } override fun onSubscribe(d: Disposable) { println("onSubscribe") } }Disposable 인터페이스의 인스턴스를 이용하여 구독을 해지하는 방법이 있다.
Disposable 을 받아서 disposable.dispose() 메소드를 호출하면 구독을 멈출 수 있다.
Observable 의 두 가지 행동 스타일 (행동 범주)
옵저버블은 구독자들에게 아이템을 Push 해준다. 이 옵저버블에는 두 가지 행동 스타일이 있다.
Cold 와 Hot 이다. 책을 읽으면서 이미 한두번 이름을 들어봐서 망정이지 일단 이름 자체가 전혀 행동이 예상되지 않는다.
Cold Observable
콜드 옵저버블은 위에서 다룬 옵저버블의 스타일이 전부 콜드옵저버블이다.
정의는 각 구독마다 처음부터 아이템을 배출하는 것을 말한다. 원래 당연한 거 아닌가... 다른 방법도 있나 싶은 생각이 들 것 같다.
핫 옵저버블이 뭔가 특별한 것이라고 기대하고 봐보자.
Hot Observable
콜드 옵저버블은 위에서 설명한 것처럼 구독이 이루어지면 아이템을 소진할 때까지 보내기 시작한다.
핫 옵저버블은 구독을 할 필요가 없다. 구독과 관계없이 계속 아이템을 보낸다.
둘을 비교한다면, 콜드 옵저버블은 구몬 학습지처럼 구독을 신청하면 1단계 수업부터 쭉 학습지를 보내준다.
핫 옵저버블은 TV 채널과 비슷하다. 구독하지 않아도 이미 방송이 나오고 있다. 이미 방송하고 있었기에 1단계가 아닐 수도 있다.
굳이 왜 Cold 와 Hot 이라는 단어를 썼을까 생각해보면, Cold 는 구독을 신청할 때까지 동작을 하고 있지 않다. Hot 은 구독을 신청하지 않아도 이미 동작하고 있다. 이래서 Cold 와 Hot 으로 생각하면 기억하기 편할 듯 하다.
ConnectableObservable
ConnectableObservable 은 가장 유용한 핫 옵저버블 중 하나이다.
fun main(args: Array<String>) { val connectableObservable = listOf("String 1", "String 2").toObservable().publish() connectableObservable.subscribe( { println("Subs 1 : $it") } ) connectableObservable.map(String::reversed).subscribe( { println("Subs 2 : $it") } ) connectableObservable.connect() connectableObservable.subscribe( { println("Subs 3 : $it") }) }처음 보는 메소드가 보인다. publish, connect
publish 는 list 의 toObservable 을 통해 생성한 옵저버블을 connectableObservable 로 바꾸는 역할을 한다.
다시 말하면, 콜드 옵저버블을 핫옵저버블로 바꾸는 역할을 한다.
connectableObservable 은 connect 이전까지 subscribe 된 것들을 연결하고 하나의 Push 로, 마치 하나의 observable 로 동작할 수 있게 한다. 따라서 예제의 Subs 3 은 connect 이후기 때문에 Push 에 포함되지 못한다.
connectableObservable 은 앞에서 말한 핫, 콜드의 차이점과 약간은 색이 다른 차이점처럼 보이지만, connect 뒤에 구독이 아예 적용이 안되는 것이 아니다라는 것을 알면 앞의 설명과 같은 맥락이라는 것을 이해할 수 있다. 또한 위에서 핫은 구독을 할 필요가 없다고 설명했지만 아예 구독을 안하는 건 아니라는 것도 알 수 있다. 왜 책의 설명이 앞뒤가 다르지 라는 생각보다는 핫, 콜드의 흐름을 이해하는데 중점을 두고 넘어갔다.
Subject
Subject 는 옵저버블 + 옵저버 라고 할 수 있다. Push 하는 것과 그것을 받는 주체를 합쳤다고 이해했다.
Subject 의 대표적인 종류들을 보면서 하나씩 이해해보자. (AsyncSubject, PublishSubject, BehaviorSubject, ReplaySubject)
코드 예제가 있는데, 의미를 파악하는 것이 더 중요해보여 그림과 정의만 명시하였다.
AsyncSubject

http://reactivex.io/documentation/ko/subject.html AsyncSubject 는 옵저버블의 마지막값과 배출만 전달한다.
PublishSubject

http://reactivex.io/documentation/ko/subject.html PublishSubject 는 구독시점부터 모든 값을 전달한다.
BehaviorSubject

http://reactivex.io/documentation/ko/subject.html Async와 Publish 를 합친 느낌이다. 구독 직전 마지막 아이템과 구독 후 모든 값을 전달한다.
ReplaySubject

http://reactivex.io/documentation/ko/subject.html Replay 는 마치 콜드 옵저버블 같다. 구독 시점에 상관없이 처음부터 다 전달한다.
이렇게 Subject 의 종류까지 보았다. 각각의 특징은 알았지만, 왜 이런 종류들로 이루어져 있으며 어떤 장단점이 있는지는 명시되어 있지 않다. 좀 더 진도를 나가면서 알 수 있을 것 같아서 굳이 더 찾아보진 않았다.
반응형'읽고있는책' 카테고리의 다른 글
[코틀린리액티브] 02.코틀린과 RxKotlin 을 사용한 함수형 프로그래밍 (0) 2020.05.15 [코틀린리액티브] 01. 리액티브 프로그래밍 소개 (0) 2020.05.15