10 min to read
3-03.Subject는 무엇인가
Subject를 사용하는 방법에 대해 알아보고, 다양한 Subjects를 알아봅시다.
Subjects
주제
Hot Observable
핫 옵저버블을 구현하는 좋은 방법 중 하나인 Subjects
주제가 있습니다. Subjects
는 기본적으로 옵저버블과 옵저버의 조합입니다. 그 이유는 그 두가지의 특징을 갖고 있기 때문입니다.
핫 옵저버블과 마찬가지로, 내부 Observer
를 유지하고, 배출시에 구독한 모든 옵저버에게 단일 푸시를 주게됩니다.
다음은 Subject
의 특징들입니다.
- 옵저버블이 갖고있어야 하는 모든 연산자를 갖고있다.
- 옵저버와 마찬가지로 모든 값에 접근이 가능하다.
onComplete
완료 시,onError
에러 시,onDispose
구독 해지 이후에는 재 사용이 불가능합니다.- Subject는 동시에 옵저버이기 때문에, 값을
onNext(T)
로 전달하면, 그 값을 온전히 Observable로 전달 받을 수 있게 된다.
따라서 Subjects
는 옵저버블과 동시에 옵저버의 특성을 가지고 있습니다.
앞장에서 저희는 조금이나마 Subjects의 사용법을 익혀보았지만, 좀 더 정확히 알기 위해 예제를 보겠습니다.
fun main(args: Array<String>) {
val observable = Observable.interval(100, TimeUnit.MILLISECONDS)//1
val subject = PublishSubject.create<Long>()//2
observable.subscribe(subject)//3
subject.subscribe {//4
println("Received $it")
}
runBlocking { delay(1100) }//5
}
주석1에서는 옵저버블 객체를 생성이후 100ms간격으로 옵저버블 인스턴스를 다시 생성합니다.
주석2에서는 PublishSubject
를 만들었습니다.(후에 설명하도록 하겠습니다.)
주석3에서는 옵저버처럼 생성한 Subject 인스턴스를 사용해 구독을 하고 있습니다.
주석4에서는 옵저버블 인스턴스처럼 Subject를 구독하여 데이터를 람다를 사용하여 구독하고 있습니다.
주석5는 많이 본 코드입니다. 코루틴 컨텍스트를 지정하여 1100ms만큼 delay를 주었습니다.
결과는 다음과 같습니다.
Received 0 Received 1 Received 2 Received 3 Received 4 Received 5 Received 6 Received 7 Received 8 Received 9 Received 10
Subject
인스턴스는 옵저버블 인스턴스에 의한 배출을 대기하고 있다가 그 배출된 값을 자신의 옵저버 인스턴스에 Broadcasting
방송합니다. 마치 영상 데이터를 우리가 여러 TV에서 한채널을 통해 볼 수 있듯이 말이죠.
이런방식이 가지는 장점이 어떤것이 있길래 사용을 하는것일까요? 그리고, 옵저버블에 직접 구독하지 않고 PublishSubject
를 사용한 이유는 무엇일까요? 다른 예제를 통해 좀 더 깊숙히 이해 해보겠습니다.
fun main(args: Array<String>) {
val observable = Observable.interval(100, TimeUnit.MILLISECONDS)//1
val subject = PublishSubject.create<Long>()//2
observable.subscribe(subject)//3
subject.subscribe {//4
println("Subscription 1 Received $it")
}
runBlocking { delay(1100) }//5
subject.subscribe {//6
println("Subscription 2 Received $it")
}
runBlocking { delay(1100) }//7
}
아까 예제와 비교해 보았을 때 문자열 출력에 Subscription을 1/2로 나눈것 외에는 큰 차이가 없는 코드입니다.
보면 주석6에서 PublishSubject
를 재 구독하였는데, 두번째 구독의 경우 1100ms 이후 하게 되어 처음 11회 발생된 데이터 이후의 값들을 받아보게됩니다.
결과는 다음과 같습니다.
Subscription 1 Received 0 Subscription 1 Received 1 Subscription 1 Received 2 Subscription 1 Received 3 Subscription 1 Received 4 Subscription 1 Received 5 Subscription 1 Received 6 Subscription 1 Received 7 Subscription 1 Received 8 Subscription 1 Received 9 Subscription 1 Received 10 Subscription 1 Received 11 Subscription 2 Received 11 Subscription 1 Received 12 Subscription 2 Received 12 Subscription 1 Received 13 Subscription 2 Received 13 Subscription 1 Received 14 Subscription 2 Received 14 Subscription 1 Received 15 Subscription 2 Received 15 Subscription 1 Received 16 Subscription 2 Received 16 Subscription 1 Received 17 Subscription 2 Received 17 Subscription 1 Received 18 Subscription 2 Received 18 Subscription 1 Received 19 Subscription 2 Received 19 Subscription 1 Received 20 Subscription 2 Received 20 Subscription 1 Received 21 Subscription 2 Received 21
Subjects : 다양한 종류
앞에서 잠깐 언급했듯이, Subjects는 다양한 형태로 제공합니다. 다음은 저희가 앞으로 사용해 볼 가장 유용하고 중요한 Subjects입니다.
- AsyncSubject
- PublishSubject
- BehaviorSubject
- ReplaySubject
필자도 실제로 사용해 본 Subject는 PublishSubject
, BehaviorSubject
뿐이지만, 실제로 실무에서 다양한 상황에서 접할 수 있는 Subject라고 생각합니다.
AsyncSubject
AsyncSubject
는 수신 대기중인 옵저버블의 마지막 값과, 그에대한 배출만 전달하게 됩니다. 쉽게 말하면, 마지막 값만을 전달한다는 것입니다. 다음은 ReactiveX에서 제공하는 AsyncSubject의 다이어그램입니다.
AsyncSubject
는 소스 Observable로부터 배출된 마지막 값만을 배출하고 소스 Observalbe의 동작이 완료된 후에야 동작합니다(만약, 소스 Observable이 아무 값도 배출하지 않으면 AsyncSubject
역시 아무 값도 배출하지 않습니다.)
하지만, 예외가 있습니다. 옵저버블이 어떤 에러로 인하여 종료가 될 경우, AsyncSubject
도 마찬가지로 아무값도 배출하지 않은 상태로 오류를 전달하게 됩니다.
다음은 코드 예제입니다.
fun main(args: Array<String>) {
val observable = Observable.just(1,2,3,4)//1
val subject = AsyncSubject.create<Int>()//2
observable.subscribe(subject)//3
subject.subscribe({//4
//onNext
println("Received $it")
},{
//onError
it.printStackTrace()
},{
//onComplete
println("Complete")
})
subject.onComplete()//5
}
이 예제에서는 Observable.just(vararg T)
를 사용하여 4개의 정수를 생성하였습니다.
그런다음 주석2에서 AsyncSubject를 생성하였습니다.
주석3에서는 전 예제들과 마찬가지로 해당 Subject인스턴스는 옵저버블 인스턴스에서 넘겨받은 값을 구독중인 옵저버로 전달하게 될 것입니다.
주석4에서는 onNext
, onError
, onComplete
세가지 메서드를 구현하였습니다.
주석5에서는 onComplete
를 호출하여 완료를 했습니다.
여러분들이 예상하듯이 옵저버블에서 마지막으로 발행된 값인 4를 출력할 것이고, onComplete
가 호출됨에 따라 Complete가 출력될 것입니다. 결과는 다음과 같습니다.
Received 4 Complete
Subject 인스턴스에서 옵저버블 인스턴스를 구독하지 않고도 onNext
로 값을 넘겨 직접 값을 전달할 수도있습니다. 내부적으로 Subject는 옵저버블 인스턴스가 값을 배출할 때마다 onNext를 호출합니다.
이번엔 위 예제를 조금 수정하여 옵저버블을 구독하지 않고 onNext
를 호출하여 값을 전달하고 다른 구독도 가지게 해보겠습니다.
val subject = AsyncSubject.create<Int>() //1
subject.onNext(1) //2
subject.onNext(2)
subject.onNext(3)
subject.onNext(4)
subject.subscribe({ //3
//onNext
println("S1 Received $it")
},{
//onError
it.printStackTrace()
},{
//onComplete
println("S1 Complete")
})
subject.onNext(5) //4
subject.subscribe({ //5
//onNext
println("S2 Received $it")
},{
//onError
it.printStackTrace()
},{
//onComplete
println("S2 Complete")
})
subject.onComplete()//6
}
위 예제에서는 onNext
를 통해 모든 값을 전달하고 있습니다. ConnecttiveObservable
은 connect 호출 시 배출되기 시작하기 때문에 AsyncSubject는 onComplete
에서만 유일한 값을 배출합니다. 따라서 결과는 다음과 같습니다.
S1 Received 5 S1 Complete S2 Received 5 S2 Complete
PublishSubject
PublishSubject
는 onNext
메서드 또는, 다른 구독을 통해 값을 받았는지는 상관없이 해당 구독 이후에 모든 값들을 옵저버에게 배출하기 시작합니다. 가장 흔하게 사용되는 Subjects중 하나입니다.
다음은 ReactiveX에서 제공하는 PublishSubject의 다이어그램입니다.
주의할 점은, PublishSubject
는 (이를 막지 않는 이상) 생성 시점에서 즉시 항목들을 배출하기 시작할 것입니다.
이런 특성 때문에 주제가 생성되는 시점과 옵저버가 이 주제를 구독하기 시작하는 그 사이에 배출되는 항목들을 잃어 버릴 수 있다는 단점이 있습니다.
따라서, 소스 Observable이 배출하는 모든 항목들의 배출을 보장해야 한다면 create()
를 사용해서 명시적으로 차가운 Observable(항목들을 배출하기 전에 모든 옵저버가 구독을 시작했는지 체크한다)을 생성하거나, PublishSubject 대신 ReplaySubject
를 사용해야 합니다.
소스 Observable이 오류 때문에 종료되면 BehaviorSubject
는 아무런 항목도 배출하지 않고 소스 Observable에서 발생한 오류를 그대로 옵저버에 전달합니다.
BehaviorSubject
BehaviorSubject
는 AsyncSubject와 PublishSubject의 장점을 결합한 Subject입니다.
BehaviorSubject
는 멀티캐스팅으로 동작하는데, 구독 전 마지막 아이템과, 구독 이후 발행된 모든 데이터를 배출하게 됩니다.
다음은 ReactiveX에서 제공하는 BehaviorSubject의 다이어그램입니다.
다른 Subject와 마찬가지로 옵저버블이 오류 때문에 종료되면 BehaviorSubject
는 아무런 항목도 배출하지 않고 소스 Observable에서 발생한 오류를 그대로 전달합니다.
마지막으로 보았던 예제를 AsyncSubject에서 BehaviorSubject로
변형해 보면서 어떻게 결과가 달라지는지 보도록 하겠습니다.
val subject = BehaviorSubject.create<Int>() //1 AsyncSubject -> BehaviorSubject
subject.onNext(1) //2
subject.onNext(2)
subject.onNext(3)
subject.onNext(4)
subject.subscribe({ //3
//onNext
println("S1 Received $it")
},{
//onError
it.printStackTrace()
},{
//onComplete
println("S1 Complete")
})
subject.onNext(5) //4
subject.subscribe({ //5
//onNext
println("S2 Received $it")
},{
//onError
it.printStackTrace()
},{
//onComplete
println("S2 Complete")
})
subject.onComplete()//6
}
첫번째 구독은 구독 전 발행된 최신 값인 4와, 구독 후 발행된 값인 5를 받습니다. 두번째 구독시에는 구독전에 배출된 최신값인 5를 받고 onComplete
로 구독 동작을 종료하게됩니다.
결과는 다음과 같습니다.
S1 Received 4 S1 Received 5 S2 Received 5 S1 Complete S2 Complete
ReplaySubject
ReplaySubejct
는 갖고 있는 모든 데이터를 옵저버의 구독시점과 상관없이 다시 전달합니다. 이는 Cold Observable
의 특징과 유사합니다.
다음은 ReactiveX에서 제공하는 ReplaySubject의 다이어그램입니다.
좀 특이한 점이 있다면, ReplaySubject
는 몇 개의 생성자 오버로드를 제공합니다.
이를 통해, 재생 버퍼의 크기가 특정 이상으로 증가할 경우, 또는 처음 배출 이후 지정한 시간이 경과할 경우 오래된 항목들을 제거합니다.
ReplaySubject
을 옵저버로 사용할 경우, 멀티 스레드 환경에서는 Observable 계약 위반과 주제에서 어느 항목 또는 알림을 먼저 재생해야 하는지 알 수 없는 모호함이 동시에 발생할 수 있기 때문에 (비순차적) 호출을 유발시키는 onNext
(또는 그 외 on
) 메서드를 사용하지 않도록 주의해야 합니다.
다음은 예시입니다.
val subject = ReplaySubject.create<Int>() //1
subject.onNext(1) //2
subject.onNext(2)
subject.onNext(3)
subject.onNext(4)
subject.subscribe({ //3
//onNext
println("S1 Received $it")
},{
//onError
it.printStackTrace()
},{
//onComplete
println("S1 Complete")
})
subject.onNext(5) //4
subject.subscribe({ //5
//onNext
println("S2 Received $it")
},{
//onError
it.printStackTrace()
},{
//onComplete
println("S2 Complete")
})
subject.onComplete()//6
}
그리고 결과입니다. 결과에서는 두가지 구독 모두 구독시점 상관없이 모든 배출 값을 받고 있는 것을 볼 수 있습니다.
S1 Received 1 S1 Received 2 S1 Received 3 S1 Received 4 S1 Received 5 S2 Received 1 S2 Received 2 S2 Received 3 S2 Received 4 S2 Received 5 S1 Complete S2 Complete
3장에서는 옵저버블인 무엇인지, 옵저버는 무엇인지, 그리고 어떻게 사용하는지 알 수 있었습니다. 이해를 돕기위해 여러 예제를 통해 같이 공부해볼 수 있었습니다. 그리고 옵저버블이 Cold Observable
과 Hot Observable
로 두 종류로 나뉜다는 것을 알 수 있었습니다. 또한 그중 Hot Observable로 변형이 가능한 Subjects
를 접했고, 다양한 종류로 제공이 되는 것을 보았습니다.
»포스트 3-01.옵저버블을 구독했을 때 해제하는 방법
옵저버블은 큰 유연성과 많은 기능들을 제공하지만, RxJava1의 경우 많은 데이터에 대한 배출이 일어났을 경우 BackPressure
배압현상과 같은 여러가지 단점이 존재합니다. 그 단점에 대한 극복은 4장을 통해 보도록 하겠습니다. 읽어주셔서 감사합니다.
Comments