-
10장 SchedulerSpring/Webflux 2023. 5. 7. 22:18
10.1 스레드 Thread 의 개념 이해
스레드는 크게
- 물리적인 스레드 Physical Thread 와
- 논리적인 스레드 Logical Thread 로 구분합니다.
CPU 사양이 듀얼코어 4 스레드라고 표기되었다면
4 스레드는 물리적인 스레드를 의미합니다.
논리적인 스레드
논리적인 스레드는 이론적으로 메모리가 허용하는 범위 내에서 얼마든지 만들 수 있지만, 물리적인 스레드의 가용 범위 내에서 실행될 수 있습니다.
물리적인 스레드는 병렬성 Parallelism 과 관련 있으며,
논리적인 스레드는 동시성 Concurrency 와 관련 있습니다.
병렬성 -> 물리적인 스레드가 실제로 동시에 실행되기 때문에 여러 작업을 동시에 처리함을 의미합니다.
동시성 -> 동시에 실행되는 것처럼 보이는 것을 의미 합니다. 많은 논리적인 스레드가 4개의 물리적인 스레드를 아주 빠른 속도로 번갈아 가며 사용하면서 마치 동시에 실행되는 것처럼 보이는 동시성을 가지게 됩니다.
10.2 Scheduler 란?
Reactor 에서 사용되는 Scheduler 는 Reactor Sequence 에서 사용되는 스레드를 관리해 주는 관리자 역할을 합니다.
Scheduler 를 사용하여 어떤 스레드에서 무엇을 처리할지 제어합니다.
스레드 간의 경쟁 조건 Race Condition 등을 신중하게 고려해서 코드를 작성해야 하는데, 이로 인해 코드의 복잡도가 높아지고 결과적으로 예상치 못한 오류가 발생할 가능성이 높습니다.
Reactor 에서는 Scheduler 를 통해 이러한 문제를 최소화 할 수 있스니다.
코드가 간결해지고, 직접 스레드를 제어해야 하는 부담에서 벗어 날 수 있습니다.
10.3 Scheduler 를 위한 전용 Operator
subscriberOn() 과 publishOn() Operator 가 바로 Scheduler 전용 Operator 입니다.
파라미터로 적절한 Scheduler 를 전달하면, 해당 Scheduler 의 특성에 맞는 스레드가 Reactor Sequence 에 할당 됩니다.
또, parallel() 이라는 특별한 Operator 도 있습니다.
subscribeOn()
구독이 발생한 직후 실행될 스레드를 지정하는 operator
public static void main(String[] args) throws InterruptedException { Flux.fromArray(new Integer[] {1, 3, 5, 7}) .subscribeOn(Schedulers.boundedElastic()) .doOnNext(data -> log.info("# doOnNext: {}", data)) .doOnSubscribe(subscription -> log.info("# doOnSubscribe")) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(500L); } // result > Task :Example10_1.main() 22:38:06.795 [main] DEBUG- Using Slf4j logging framework 22:38:06.824 [main] INFO - # doOnSubscribe 22:38:06.830 [boundedElastic-1] INFO - # doOnNext: 1 22:38:06.832 [boundedElastic-1] INFO - # onNext: 1 22:38:06.832 [boundedElastic-1] INFO - # doOnNext: 3 22:38:06.832 [boundedElastic-1] INFO - # onNext: 3 22:38:06.832 [boundedElastic-1] INFO - # doOnNext: 5 22:38:06.832 [boundedElastic-1] INFO - # onNext: 5 22:38:06.832 [boundedElastic-1] INFO - # doOnNext: 7 22:38:06.833 [boundedElastic-1] INFO - # onNext: 7
subscribeOn : 구독이 발생한 직후에 원본 Publisher 의 동작을 처리하기 위한 스레드를 할당.
boundedElastic 유형의 scheduler 를 지정. subscribeOn(Schedulers.boundedElastic())
subscribeOn 에서 Sceduler 를 지정하여, main 과 별도의 [boundedElastic-1] 스레드에서 처리합니다.
doOnNext : 원본 Flux 에서 emit 되는 데이터 로그로 출력
doOnSubscribe : 구독이 발생한 시점에 추가적인 어떤 처리가 필요할 경우 동작 추가. 구독이 발생한 시점에 실행되는 스레드를 확인
publishOn
publishOn 을 기준으로 아래쪽인 downstream 의 실행 스레드를 변경합니다.
public static void main(String[] args) throws InterruptedException { Flux.fromArray(new Integer[] {1, 3, 5, 7}) .doOnNext(data -> log.info("# doOnNext: {}", data)) .doOnSubscribe(subscription -> log.info("# doOnSubscribe")) .publishOn(Schedulers.parallel()) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(500L); } // result > Task :Example10_2.main() 23:00:05.882 [main] DEBUG- Using Slf4j logging framework 23:00:05.905 [main] INFO - # doOnSubscribe 23:00:05.909 [main] INFO - # doOnNext: 1 23:00:05.911 [main] INFO - # doOnNext: 3 23:00:05.911 [parallel-1] INFO - # onNext: 1 23:00:05.911 [main] INFO - # doOnNext: 5 23:00:05.911 [main] INFO - # doOnNext: 7 23:00:05.911 [parallel-1] INFO - # onNext: 3 23:00:05.912 [parallel-1] INFO - # onNext: 5 23:00:05.912 [parallel-1] INFO - # onNext: 7
doOnNext : 원본 flux 에서 emit 되는 데이터 로그를 출력 합니다. main 스레드에서 실행되었습니다.
23:00:05.909 [main] INFO - # doOnNext: 1
23:00:05.911 [main] INFO - # doOnNext: 3
23:00:05.911 [main] INFO - # doOnNext: 5
23:00:05.911 [main] INFO - # doOnNext: 7doOnSubscribe : 구독이 발생한 직후 로그를 출력합니다.
23:00:05.905 [main] INFO - # doOnSubscribe
publishOn(Schedulers.parallel()) : 이후 downstream (.subscribe(data -> log.info("# onNext: {}", data)) ) 을 병렬 스레드로 실행합니다.
23:00:05.911 [parallel-1] INFO - # onNext: 1
23:00:05.911 [parallel-1] INFO - # onNext: 3
23:00:05.912 [parallel-1] INFO - # onNext: 5
23:00:05.912 [parallel-1] INFO - # onNext: 7parallel()
parallel() Operator 는 병렬성을 가지는 물리적인 스레드에 해당됩니다.
paralell() 병렬 스레드 지정 예제
public static void main(String[] args) throws InterruptedException { Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15, 17, 19}) .parallel() .runOn(Schedulers.parallel()) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(100L); } > Task :Example10_3.main() 23:18:56.610 [main] DEBUG- Using Slf4j logging framework 23:18:56.649 [parallel-5] INFO - # onNext: 9 23:18:56.649 [parallel-6] INFO - # onNext: 11 23:18:56.650 [parallel-8] INFO - # onNext: 15 23:18:56.649 [parallel-4] INFO - # onNext: 7 23:18:56.649 [parallel-3] INFO - # onNext: 5 23:18:56.649 [parallel-1] INFO - # onNext: 1 23:18:56.650 [parallel-9] INFO - # onNext: 17 23:18:56.650 [parallel-7] INFO - # onNext: 13 23:18:56.649 [parallel-2] INFO - # onNext: 3 23:18:56.650 [parallel-10] INFO - # onNext: 19
.paralle() 를 지정하여 emit 된 10개의 데이터를 병렬 스레드로 실행합니다.
parallel() operator 는 emit 되는 데이터를 CPU 의 논리적인 코어(물리적인 스레드) 수에 맞게 사전에 골고루 분배하는 역할만 하며, 실제로 병렬 작업을 수행할 스레드의 할당은 runOn() Operator 가 담당합니다.
Tips : Reactor 에서는 라운드 로빈 방식으로 CPU 의 논리적인 코어 수에 맞게 데이터를 그룹화한 것을 'rail' 이라고 합니다.
만약 .runOn(Schedulers.parallel()) 부분을 주석처리 하면, 병렬처리 되지 않고 main 스레드에서 처리됩니다.
parallel(4) 지정 예제
public static void main(String[] args) throws InterruptedException { Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15, 17, 19}) .parallel(4) .runOn(Schedulers.parallel()) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(100L); } // result > Task :Example10_4.main() 23:19:47.801 [main] DEBUG- Using Slf4j logging framework 23:19:47.829 [parallel-4] INFO - # onNext: 7 23:19:47.829 [parallel-2] INFO - # onNext: 3 23:19:47.829 [parallel-3] INFO - # onNext: 5 23:19:47.829 [parallel-1] INFO - # onNext: 1 23:19:47.845 [parallel-4] INFO - # onNext: 15 23:19:47.845 [parallel-2] INFO - # onNext: 11 23:19:47.845 [parallel-3] INFO - # onNext: 13 23:19:47.845 [parallel-1] INFO - # onNext: 9 23:19:47.846 [parallel-2] INFO - # onNext: 19 23:19:47.847 [parallel-1] INFO - # onNext: 17
스레드 갯수를 4개로 지정하면, 4개의 스레드에서 처리합니다.
기억하세요
Scheduler 를 위한 전용 Operator
01. Reactor 에서의 Scheduler 는 비동기 프로그래밍을 위해 사용되는 스레드를 관리해주는 역할을 한다
02. subscribeOn() Operator 는 구독이 발생한 직후에, 실행될 스레드를 지정하는 Operator 이다.
03. publishOn() Operator 는 downstream 으로 signal 을 전송할 때 실행되는 스레드를 제어하는 역할을 하는 Operator 이다.
04. parallel() Operator 는 라운드 로빈(Round Robin) 방식으로 CPU 코어 개수만큼의 스레드를 병렬로 실행합니다.
10.4 publisherOn() 과 subscribeOn() 의 동작 이해
main thread 만 사용한 경우
public static void main(String[] args) { Flux .fromArray(new Integer[] {1, 3, 5, 7}) .doOnNext(data -> log.info("# doOnNext fromArray: {}", data)) .filter(data -> data > 3) .doOnNext(data -> log.info("# doOnNext filter: {}", data)) .map(data -> data * 10) .doOnNext(data -> log.info("# doOnNext map: {}", data)) .subscribe(data -> log.info("# onNext: {}", data)); } // result > Task :Example10_5.main() 23:28:16.850 [main] DEBUG- Using Slf4j logging framework 23:28:16.866 [main] INFO - # doOnNext fromArray: 1 23:28:16.868 [main] INFO - # doOnNext fromArray: 3 23:28:16.868 [main] INFO - # doOnNext fromArray: 5 23:28:16.868 [main] INFO - # doOnNext filter: 5 23:28:16.868 [main] INFO - # doOnNext map: 50 23:28:16.869 [main] INFO - # onNext: 50 23:28:16.869 [main] INFO - # doOnNext fromArray: 7 23:28:16.869 [main] INFO - # doOnNext filter: 7 23:28:16.869 [main] INFO - # doOnNext map: 70 23:28:16.870 [main] INFO - # onNext: 70
위처럼 모든 동작이 main thread 에서 동작
하나의 publishOn 사용한 경우
public static void main(String[] args) throws InterruptedException { Flux .fromArray(new Integer[] {1, 3, 5, 7}) .doOnNext(data -> log.info("# doOnNext fromArray: {}", data)) .publishOn(Schedulers.parallel()) .filter(data -> data > 3) .doOnNext(data -> log.info("# doOnNext filter: {}", data)) .map(data -> data * 10) .doOnNext(data -> log.info("# doOnNext map: {}", data)) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(500L); } // result > Task :Example10_6.main() 23:31:11.649 [main] DEBUG- Using Slf4j logging framework 23:31:11.682 [main] INFO - # doOnNext fromArray: 1 23:31:11.684 [main] INFO - # doOnNext fromArray: 3 23:31:11.684 [main] INFO - # doOnNext fromArray: 5 23:31:11.684 [main] INFO - # doOnNext fromArray: 7 23:31:11.684 [parallel-1] INFO - # doOnNext filter: 5 23:31:11.685 [parallel-1] INFO - # doOnNext map: 50 23:31:11.685 [parallel-1] INFO - # onNext: 50 23:31:11.685 [parallel-1] INFO - # doOnNext filter: 7 23:31:11.686 [parallel-1] INFO - # doOnNext map: 70 23:31:11.686 [parallel-1] INFO - # onNext: 70
publishOn 이후의 filter, map subscribe 는 [parallel-1] 스레드로 실행되었습니다.
publishOn 을 2번 사용한 경우
Flux .fromArray(new Integer[] {1, 3, 5, 7}) .doOnNext(data -> log.info("# doOnNext fromArray: {}", data)) .publishOn(Schedulers.parallel()) .filter(data -> data > 3) .doOnNext(data -> log.info("# doOnNext filter: {}", data)) .publishOn(Schedulers.parallel()) .map(data -> data * 10) .doOnNext(data -> log.info("# doOnNext map: {}", data)) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(500L); } // result > Task :Example10_7.main() 23:33:27.203 [main] DEBUG- Using Slf4j logging framework 23:33:27.230 [main] INFO - # doOnNext fromArray: 1 23:33:27.232 [main] INFO - # doOnNext fromArray: 3 23:33:27.232 [main] INFO - # doOnNext fromArray: 5 23:33:27.233 [main] INFO - # doOnNext fromArray: 7 23:33:27.233 [parallel-2] INFO - # doOnNext filter: 5 23:33:27.234 [parallel-1] INFO - # doOnNext map: 50 23:33:27.234 [parallel-2] INFO - # doOnNext filter: 7 23:33:27.234 [parallel-1] INFO - # onNext: 50 23:33:27.234 [parallel-1] INFO - # doOnNext map: 70 23:33:27.234 [parallel-1] INFO - # onNext: 70
doOnNext filter 는 [parallel-2] 에서 실행됨
# doOnNext map 과 onNext 는 [parallel-1] 에서 실행됨
subscribeOn() 과 publishOn() 을 함께 사용한 경우
public static void main(String[] args) throws InterruptedException { Flux .fromArray(new Integer[] {1, 3, 5, 7}) .subscribeOn(Schedulers.boundedElastic()) .doOnNext(data -> log.info("# doOnNext fromArray: {}", data)) .filter(data -> data > 3) .doOnNext(data -> log.info("# doOnNext filter: {}", data)) .publishOn(Schedulers.parallel()) .map(data -> data * 10) .doOnNext(data -> log.info("# doOnNext map: {}", data)) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(500L); } // result > Task :Example10_8.main() 23:37:39.463 [main] DEBUG- Using Slf4j logging framework 23:37:39.497 [boundedElastic-1] INFO - # doOnNext fromArray: 1 23:37:39.499 [boundedElastic-1] INFO - # doOnNext fromArray: 3 23:37:39.499 [boundedElastic-1] INFO - # doOnNext fromArray: 5 23:37:39.500 [boundedElastic-1] INFO - # doOnNext filter: 5 23:37:39.500 [boundedElastic-1] INFO - # doOnNext fromArray: 7 23:37:39.500 [parallel-1] INFO - # doOnNext map: 50 23:37:39.500 [boundedElastic-1] INFO - # doOnNext filter: 7 23:37:39.500 [parallel-1] INFO - # onNext: 50 23:37:39.501 [parallel-1] INFO - # doOnNext map: 70 23:37:39.501 [parallel-1] INFO - # onNext: 70
subScribeOn() 에 의해 구독이 발생한 직후에 실행되는
fromArray 와 filter 는 [boundedElastic-1] 에서 실행됨
publishOn 의 downstream 인
doOnNext map 과 subscribe onNext 는 [parallel-1] 에서 실행됩니다.
기억하세요
publishOn() 과 subscribe() 의 특징
01. publishOn() Operator 는 한 개 이상 사용할 수 있으며, 실행 스레드를 목적에 맞게 적절하게 분리할 수 있다.
02. subscribe() Operator 와 publishOn() Operator 를 함께 사용해서 원본 Publisher 에서 데이터를 emit 하는 스레드와 emit 된 데이터를 가공 처리하는 스레드를 적절하게 분리할 수 있다.
03. subscribeOn() 은 Operator 체인상에서 어떤 위치에 있든 간에 구독 시점 직후, 즉 Publisher 가 데이터를 emit 하기 전에 실행 스레드를 변경한다.
10.5 Scheduler 의 종류
Schedulers.immediate()
별도의 스레드를 추가적으로 생성하지 않고, 현재 스레드에서 작업을 처리하고자 할 때 사용
operator chain 간에 parameter 를 전달할 필요가 있을 경우 사용
public static void main(String[] args) throws InterruptedException { Flux .fromArray(new Integer[] {1, 3, 5, 7}) .publishOn(Schedulers.parallel()) .filter(data -> data > 3) .doOnNext(data -> log.info("# doOnNext filter: {}", data)) .publishOn(Schedulers.immediate()) .map(data -> data * 10) .doOnNext(data -> log.info("# doOnNext map: {}", data)) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(200L); } // result > Task :Example10_9.main() 23:52:55.912 [main] DEBUG- Using Slf4j logging framework 23:52:55.935 [parallel-1] INFO - # doOnNext filter: 5 23:52:55.942 [parallel-1] INFO - # doOnNext map: 50 23:52:55.942 [parallel-1] INFO - # onNext: 50 23:52:55.942 [parallel-1] INFO - # doOnNext filter: 7 23:52:55.942 [parallel-1] INFO - # doOnNext map: 70 23:52:55.942 [parallel-1] INFO - # onNext: 70
Schedulers.single()
스레드 하나만 생성해서 Scheduler 가 제거되기 전까지 재사용하는 방식입니다.
public static void main(String[] args) throws InterruptedException { doTask("task1") .subscribe(data -> log.info("# onNext: {}", data)); doTask("task2") .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(200L); } private static Flux<Integer> doTask(String taskName) { return Flux.fromArray(new Integer[] {1, 3, 5, 7}) .publishOn(Schedulers.single()) .filter(data -> data > 3) .doOnNext(data -> log.info("# {} doOnNext filter: {}", taskName, data)) .map(data -> data * 10) .doOnNext(data -> log.info("# {} doOnNext map: {}", taskName, data)); } // result > Task :Example10_10.main() 23:57:57.631 [main] DEBUG- Using Slf4j logging framework 23:57:57.655 [single-1] INFO - # task1 doOnNext filter: 5 23:57:57.659 [single-1] INFO - # task1 doOnNext map: 50 23:57:57.660 [single-1] INFO - # onNext: 50 23:57:57.660 [single-1] INFO - # task1 doOnNext filter: 7 23:57:57.660 [single-1] INFO - # task1 doOnNext map: 70 23:57:57.660 [single-1] INFO - # onNext: 70 23:57:57.662 [single-1] INFO - # task2 doOnNext filter: 5 23:57:57.662 [single-1] INFO - # task2 doOnNext map: 50 23:57:57.663 [single-1] INFO - # onNext: 50 23:57:57.663 [single-1] INFO - # task2 doOnNext filter: 7 23:57:57.663 [single-1] INFO - # task2 doOnNext map: 70 23:57:57.663 [single-1] INFO - # onNext: 70
위처럼 [single-1] 스레드에서 처리됩니다
Schedulers.newSingle()
호출할 때마다 매번 새로운 스레드 하나를 생성합니다.
public static void main(String[] args) throws InterruptedException { doTask("task1") .subscribe(data -> log.info("# onNext: {}", data)); doTask("task2") .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(200L); } private static Flux<Integer> doTask(String taskName) { return Flux.fromArray(new Integer[] {1, 3, 5, 7}) .publishOn(Schedulers.newSingle("new-single", true)) .filter(data -> data > 3) .doOnNext(data -> log.info("# {} doOnNext filter: {}", taskName, data)) .map(data -> data * 10) .doOnNext(data -> log.info("# {} doOnNext map: {}", taskName, data)); } // result > Task :Example10_11.main() 00:20:11.337 [main] DEBUG- Using Slf4j logging framework 00:20:11.360 [new-single-1] INFO - # task1 doOnNext filter: 5 00:20:11.361 [new-single-2] INFO - # task2 doOnNext filter: 5 00:20:11.363 [new-single-1] INFO - # task1 doOnNext map: 50 00:20:11.363 [new-single-2] INFO - # task2 doOnNext map: 50 00:20:11.363 [new-single-1] INFO - # onNext: 50 00:20:11.363 [new-single-2] INFO - # onNext: 50 00:20:11.364 [new-single-1] INFO - # task1 doOnNext filter: 7 00:20:11.364 [new-single-2] INFO - # task2 doOnNext filter: 7 00:20:11.364 [new-single-2] INFO - # task2 doOnNext map: 70 00:20:11.364 [new-single-1] INFO - # task1 doOnNext map: 70 00:20:11.364 [new-single-2] INFO - # onNext: 70 00:20:11.364 [new-single-1] INFO - # onNext: 70
task 1 은 [new-single-1] 에서 실행되었으며
task 2 는 [new-single-2] 에서 실행되었습니다
.publishOn(Schedulers.newSingle("new-single", true)) 에서
new-single 은 thread name 이고, true 는 데몬 스레드로 동작하게 할지 여부입니다.
public static Scheduler newSingle(String name, boolean daemon) { return newSingle(new ReactorThreadFactory(name, SingleScheduler.COUNTER, daemon, true, Schedulers::defaultUncaughtException)); }
* 데몬 스레드 : 보조 스레드라고 불리는데, 주 스레드가 종료되면, 자동으로 종료되는 특성이 있습니다.
Schedulers.boundedElastic()
ExecutorService 기반의 스레드 풀 Thread pool 을 생성한 후, 그 안에서 정해진 수만큼의 스레드를 사용하여 작업을 처리하고, 작업이 종료된 스레드는 반납하여 재사용하는 방식입니다
기본적으로 CPU 코어 수 x 10 만큼의 스레드를 생성하며, 풀에 있는 모든 스레드가 작업을 처리하고 있다면, 이용 가능한 스레드가 생길 때까지 최대 100,000개의 작업이 큐에서 대기할 수 있습니다.
Schedulers.boundedElastic() 은 db 또는 api 요청 같은 blocking i/o 작업을 효과적으로 처리하기 위한 방식입니다
실행 시간이 긴 blcoking i/o 작업이 포함된 경우, 다른 non-blocking 처리에 영향을 주지 않도록 전용 스레드를 할당해서 blocking i/o 작업을 처리하기 때문에 처리 시간을 효율적으로 사용할 수 있습니다.
Schedulers.parallel()
non-blocking i/o 에 최적화되어 있는 scheduler 로서 cpu 코어 수만큼의 스레드를 생성합니다.
Schedulers.fromExecutorService()
기존에 이미 사용하고 있는 ExecutorService 가 있다면 이 ExecutorService 로부터 Scheduler 를 생성하는 방식입니다
Schedulers.newXXXX()
Schedulers.newSingle()
Schedulers.newBoundedElastic()
Schedulers.newParallel()
는 스레드 이름, 생성 가능한 디폴트 스레드의 개수, 스레드의 유휴 시간, 데몬 스레드로의 동작 여부 등을 직접 지정해서 커스텀 스레드 풀을 새로 생성할 수 있습니다.
기억하세요
Scheduler 의 종류
01. Schedulers.immediate() 은 별도의 스레드를 추가적으로 생성하지 않고, 현재 스레드에서 작업을 처리한다.
02. Schedulers.single() 은 스레드 하나만 생성해서 Scheduler 가 제거되기 전까지 재사용한다.
03. Schedulers.boundedElastic() 은 ExcutoerService 기반의 스레드 풀을 생성한 후, 그 안에서 정해진 수만큼의 스레드를 사용하여 작업을 처리하고 작업이 종료된 스레드는 반납하여 재사용한다. blocking i/o 에 최적화되어 있다.
04. Schedulers.parallel() 은 non-blocking i/o 에 최적화되어 있는 Scheduler 로서 CPU 코어 수만큼의 스레드를 생성한다.
05. Schedulers.newSingle(), Schedulers.newBoundedElastic(), Schedulers.newParallel() 메서드를 사용해서 새로운 Scheduler 인스턴스를 생성할 수 있다.
'Spring > Webflux' 카테고리의 다른 글
12장 Debugging (0) 2023.05.21 11장 Context (0) 2023.05.12 9장 Sinks (0) 2023.05.01 8장 backpressure (0) 2023.04.23 7장 Cold Sequence 와 Hot Sequence (0) 2023.04.23