ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 9장 Sinks
    Spring/Webflux 2023. 5. 1. 10:53

     

    9.1  Sinks란?

    Processor 는 Publisher 와 Subscriber 의 기능을 모두 지니기 때문에 Subscriber 로서 기능할 땐 다른 Publisher 로 구독할 수 있고, Publisher 로서 기능할 땐 다른 Subscriber 가 구독할 수 있습니다.

    Reactor 3.4 부터 Sinks 가 등장해서 Processor 는 3.5 부터 제거 될 예정입니다.

     

    Sinks 는 리액티브 스트림즈의 Signal을 프로그래밍 방식으로 푸시할 수 있는 구조이며, Flux 또는 Mono 의 의미 체계를 가집니다

    Flux 또는 Mono 가  onNext 같은 Signal 을 내부적으로 전송해 주는 방식이었는데, Sinks 를 사용하면 프로그래밍 코드를 통해 명시적으로 Signal 을 전송할 수 있습니다

     

    Reactor 에서 프로그래밍 방식으로 Sinal 을 전송하는 가장 일반적인 방법은  generate(), create()  Operator  였습니다.

    차이는, 

    generate(), create() : 싱글스레드 기반 이며, 

    Sinks : 멀티스레드 방식으로 Singal 을 전송해도 스레드 안전성을 보장합니다

     

     

    https://projectreactor.io/docs/core/snapshot/api/reactor/core/publisher/Sinks.html

     

    Sinks (reactor-core 3.5.6-SNAPSHOT)

    Sinks are constructs through which Reactive Streams signals can be programmatically pushed, with Flux or Mono semantics. These standalone sinks expose tryEmit methods that return an Sinks.EmitResult enum, allowing to atomically fail in case the attempted s

    projectreactor.io

    public final class Sinks extends Object

     

    Sinks are constructs through which Reactive Streams signals can be programmatically pushed, with Flux or Mono semantics. These standalone sinks expose tryEmit methods that return an Sinks.EmitResult enum, allowing to atomically fail in case the attempted signal is inconsistent with the spec and/or the state of the sink.
    This class exposes a collection of (Sinks.Many builders and Sinks.One factories. Unless constructed through the unsafe() spec, these sinks are thread safe in the sense that they will detect concurrent access and fail fast on one of the attempts. unsafe() sinks on the other hand are expected to be externally synchronized (typically by being called from within a Reactive Streams-compliant context, like a Subscriber or an operator, which means it is ok to remove the overhead of detecting concurrent access from the sink itself).

     

    create() operator 를 사용해 프로그래밍 방식으로 Signal 을 전송하는 예제 코드

    public static void main(String[] args) throws InterruptedException {
        int tasks = 6;
        Flux
            .create((FluxSink<String> sink) -> {
                IntStream
                        .range(1, tasks)
                        .forEach(n -> sink.next(doTask(n)));
            })
            .subscribeOn(Schedulers.boundedElastic())
            .doOnNext(n -> log.info("# create(): {}", n))
            .publishOn(Schedulers.parallel())
            .map(result -> result + " success!")
            .doOnNext(n -> log.info("# map(): {}", n))
            .publishOn(Schedulers.parallel())
            .subscribe(data -> log.info("# onNext: {}", data));
    
        Thread.sleep(500L);
    }
    
    private static String doTask(int taskNumber) {
        // now tasking.
        // complete to task.
        return "task " + taskNumber + " result";
    }
    
    // result
    > Task :Example9_1.main()
    12:04:38.437 [main] DEBUG- Using Slf4j logging framework
    12:04:38.477 [boundedElastic-1] INFO - # create(): task 1 result
    12:04:38.479 [boundedElastic-1] INFO - # create(): task 2 result
    12:04:38.479 [boundedElastic-1] INFO - # create(): task 3 result
    12:04:38.480 [boundedElastic-1] INFO - # create(): task 4 result
    12:04:38.480 [boundedElastic-1] INFO - # create(): task 5 result
    12:04:38.491 [parallel-2] INFO - # map(): task 1 result success!
    12:04:38.492 [parallel-2] INFO - # map(): task 2 result success!
    12:04:38.492 [parallel-1] INFO - # onNext: task 1 result success!
    12:04:38.492 [parallel-2] INFO - # map(): task 3 result success!
    12:04:38.492 [parallel-1] INFO - # onNext: task 2 result success!
    12:04:38.492 [parallel-2] INFO - # map(): task 4 result success!
    12:04:38.492 [parallel-1] INFO - # onNext: task 3 result success!
    12:04:38.492 [parallel-2] INFO - # map(): task 5 result success!
    12:04:38.492 [parallel-1] INFO - # onNext: task 4 result success!
    12:04:38.492 [parallel-1] INFO - # onNext: task 5 result success!

     

    01. create() operator 가 처리해야 할 작업의 개수만큼 doTask() 메서드를 호출해서 작업을 처리한 후, 결과를 리턴받습니다

    02. 이 처리 결과를 map() Operator 를 사용해서 추가적으로 가공 처리한 후에 최종적으로 Subscriber에게 전달합니다. 

    각각 별도의 3개의 스레드

    스레드 1 : 작업을 처리 doTask : subscribeOn(Schedulers.boundedElastic()), [boundedElastic-1]

        task 1 result

        task 2 result

    스레드 2 : 처리 결과를 가공하는 단계 map() : publishOn(Schedulers.parallel()) 1st, [parallel-2]

        # map(): task 1 result success!

        # map(): task 2 result success!

    스레드 3 :  가공된 결과를 Subscriber 에게 전달하는 단계 : publishOn(Schedulers.parallel()) 2nd, [parallel-1]

        # onNext: task 1 result success!

        # onNext: task 2 result success!

     

    스레드 2, 스레드 3을 색을 다르게 표현하였습니다

     

    실행 결과

    12:04:38.477 [boundedElastic-1] INFO - # create(): task 1 result
    12:04:38.479 [boundedElastic-1] INFO - # create(): task 2 result
    12:04:38.479 [boundedElastic-1] INFO - # create(): task 3 result
    12:04:38.480 [boundedElastic-1] INFO - # create(): task 4 result
    12:04:38.480 [boundedElastic-1] INFO - # create(): task 5 result

    subscribeOn(Schedulers.boundedElastic()), [boundedElastic-1] 에 의한 create 에서 호출한 doTask 처리 결과

     

    12:04:38.491 [boundedElastic-1] INFO - # map(): task 1 result success!

    [parallel-2] 로 실행될것으로 예상했으나,  [boundedElastic-1] 로 처리됨. 이런 경우도 있나 봄

    12:04:38.492 [parallel-2] INFO - # map(): task 2 result success!

    [parallel-2] 의 map(result -> result + " success!") +  downstream doOnNext(n -> log.info("# map(): {}", n)) 처리 결과


    12:04:38.492 [parallel-1] INFO - onNext: task 1 result success!

    [parallel-1] 의 최종 subscribe(data -> log.info("# onNext: {}", data)); 처리 결과


    12:04:38.492 [parallel-2] INFO - # map(): task 3 result success!

    [parallel-2] 의 map(result -> result + " success!") +  downstream doOnNext(n -> log.info("# map(): {}", n)) 처리 결과


    12:04:38.492 [parallel-1] INFO - onNext: task 2 result success!

    [parallel-1] 의 최종 subscribe(data -> log.info("# onNext: {}", data)); 처리 결과

     

    12:04:38.492 [parallel-2] INFO - # map(): task 4 result success!

    [parallel-2] 의 map(result -> result + " success!") +  downstream doOnNext(n -> log.info("# map(): {}", n)) 처리 결과

     

    12:04:38.492 [parallel-1] INFO - onNext: task 3 result success!

    [parallel-1] 의 최종 subscribe(data -> log.info("# onNext: {}", data)); 처리 결과


    12:04:38.492 [parallel-2] INFO - # map(): task 5 result success!

    [parallel-2] 의 map(result -> result + " success!") +  downstream doOnNext(n -> log.info("# map(): {}", n)) 처리 결과


    12:04:38.492 [parallel-1] INFO - onNext: task 4 result success!
    12:04:38.492 [parallel-1] INFO - onNext: task 5 result success!

    [parallel-1] 의 최종 subscribe(data -> log.info("onNext: {}", data)); 처리 결과

     

    의 상황에서 doTask() 메서드가 싱글스레드가 아닌 여러 개의 스레드에서 각각의 전혀 다른 작업들을 처리한 후, 처리 결과를 반환하는 상황이 발생할 수도 있습니다. 이 같은 상황에서 적절하게 사용할 수 있는 방식이 바로 Sinks 입니다.

     

     

    unicast 예제

    public static void main(String[] args) throws InterruptedException {
        int tasks = 6;
    
        Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
        Flux<String> fluxView = unicastSink.asFlux();
        IntStream
                .range(1, tasks)
                .forEach(n -> {
                    try {
                        new Thread(() -> {
                            unicastSink.emitNext(doTask(n), Sinks.EmitFailureHandler.FAIL_FAST);
                            log.info("# emitted: {}", n);
                        }).start();
                        Thread.sleep(100L);
                    } catch (InterruptedException e) {
                        log.error(e.getMessage());
                    }
                });
    
        fluxView
                .publishOn(Schedulers.parallel())
                .map(result -> result + " success!")
                .doOnNext(n -> log.info("# map(): {}", n))
                .publishOn(Schedulers.parallel())
                .subscribe(data -> log.info("# onNext: {}", data));
    
        Thread.sleep(200L);
    }
    
    private static String doTask(int taskNumber) {
        // now tasking.
        // complete to task.
        return "task " + taskNumber + " result";
    }
    
    // result
    > Task :Example9_2.main()
    12:48:49.283 [main] DEBUG- Using Slf4j logging framework
    12:48:49.296 [Thread-0] INFO - # emitted: 1
    12:48:49.393 [Thread-1] INFO - # emitted: 2
    12:48:49.494 [Thread-2] INFO - # emitted: 3
    12:48:49.598 [Thread-3] INFO - # emitted: 4
    12:48:49.702 [Thread-4] INFO - # emitted: 5
    12:48:49.835 [parallel-2] INFO - # map(): task 1 result success!
    12:48:49.836 [parallel-2] INFO - # map(): task 2 result success!
    12:48:49.836 [parallel-1] INFO - # onNext: task 1 result success!
    12:48:49.836 [parallel-2] INFO - # map(): task 3 result success!
    12:48:49.836 [parallel-1] INFO - # onNext: task 2 result success!
    12:48:49.836 [parallel-1] INFO - # onNext: task 3 result success!
    12:48:49.836 [parallel-2] INFO - # map(): task 4 result success!
    12:48:49.836 [parallel-2] INFO - # map(): task 5 result success!
    12:48:49.836 [parallel-1] INFO - # onNext: task 4 result success!
    12:48:49.837 [parallel-1] INFO - # onNext: task 5 result success!

     

    01. doTask() 메서드가 루프를 돌 때마다 새로운 스레드에서 실행됩니다.

    02. 이 처리 결과를 Sinks를 통해서 downstream emit 합니다map() Operator 를 사용해서 추가적으로 가공 처리한 후에 최종적으로 Subscriber에게 전달합니다. 

    각각 별도의 3개의 스레드

    스레드 0~4 : 작업을 처리 doTask ; 구독이 시작되기 전에 doTask 가 가장 먼저 시작됨

        [Thread-0] INFO - # emitted: 1
        [Thread-1] INFO - # emitted: 2
        [Thread-2] INFO - # emitted: 3
        [Thread-3] INFO - # emitted: 4
        [Thread-4] INFO - # emitted: 5

     

    스레드 5 : 처리 결과를 가공하는 단계 map() : publishOn(Schedulers.parallel()) 1st, [parallel-2]

        [parallel-2] # map(): task 1 result success!

        [parallel-2] # map(): task 2 result success!

     

    스레드 6 :  가공된 결과를 Subscriber 에게 전달하는 단계 : publishOn(Schedulers.parallel()) 2nd, [parallel-1]

        [parallel-1] # onNext: task 1 result success!

        [parallel-1] # onNext: task 2 result success!

     

     

    실행결과

     

    12:48:49.296 [Thread-0] INFO - # emitted: 1
    12:48:49.393 [Thread-1] INFO - # emitted: 2
    12:48:49.494 [Thread-2] INFO - # emitted: 3
    12:48:49.598 [Thread-3] INFO - # emitted: 4
    12:48:49.702 [Thread-4] INFO - # emitted: 5

    [Thread-0~4] 에서 수행된 task 수행결과


    12:48:49.835 [parallel-2] INFO - # map(): task 1 result success!
    12:48:49.836 [parallel-2] INFO - # map(): task 2 result success!

    [parallel-2] 의 map(result -> result + " success!") +  downstream doOnNext(n -> log.info("# map(): {}", n)) 처리 결과


    12:48:49.836 [parallel-1] INFO - # onNext: task 1 result success!

    [parallel-1] 의  subscribe(data -> log.info("# onNext: {}", data)); 처리 결과


    12:48:49.836 [parallel-2] INFO - # map(): task 3 result success!

    [parallel-2] 의 map(result -> result + " success!") +  downstream doOnNext(n -> log.info("# map(): {}", n)) 처리 결과

     

    12:48:49.836 [parallel-1] INFO - # onNext: task 2 result success!
    12:48:49.836 [parallel-1] INFO - # onNext: task 3 result success!

    [parallel-1] 의  subscribe(data -> log.info("# onNext: {}", data)); 처리 결과


    12:48:49.836 [parallel-2] INFO - # map(): task 4 result success!
    12:48:49.836 [parallel-2] INFO - # map(): task 5 result success!

    [parallel-2] 의 map(result -> result + " success!") +  downstream doOnNext(n -> log.info("# map(): {}", n)) 처리 결과

     

    12:48:49.836 [parallel-1] INFO - # onNext: task 4 result success!
    12:48:49.837 [parallel-1] INFO - # onNext: task 5 result success!

    [parallel-1] 의  subscribe(data -> log.info("# onNext: {}", data)); 처리 결과

     

    스레드 안전성(Thead safety)이란?

    스레드 안전성이란 함수나 변수 같은 공유 자원에 동시 접근할 경우에도 프로그램의 실행에 문제가 없음을 의미합니다. 

    공유 변수에 동시에 접근해서 올바르지 않은 값이 할당 된다거나 공유 함수에 동시에 접근함으로써 교착 상태, 즉 Dead lock에 빠지게 되면 스레드 안전성이 깨지게 됩니다.

     

     

    Processor 에서는 onNext, onComplete, onError 메서드를 직접적으로 호출함으로써 스레드 안전성이 보장되지 않을 수 있는데, Sinks의 경우에는 동시 접근을 감지하고, 동시 접근하는 스레드 중 하나가 빠르게 실패함으로써 스레드 안전성을 보장합니다.

    참고 : https://www.baeldung.com/java-thread-safety

     

     

     

    9.2. Sinks 종류 및 특징

    2가지

    1st Sinks.One 을 사용하는 방법

    2nd Sinks.Many 를 사용하는 방법

     

    Sinks.One

    한 건의 데이터를 전송하는 방법을 정의해 둔 기능 명세

    Mono 방식으로 Subscriber 가 데이터 소비 가능

    public final class Sinks {
    ...
    	public static <T> Sinks.One<T> one() {
    		return SinksSpecs.DEFAULT_SINKS.one();
    	}
    ...
    }

    Tips : 책 작성 당시에서 조금 변경되었다 default_root_spec -> default_sinks ( reactor-core 3.4.21 )

    default_sinks 는 데이터가 여러개여도 첫 1건만 emit

     

    Sink.one 예제

    public static void main(String[] args) throws InterruptedException {
        Sinks.One<String> sinkOne = Sinks.one();
        Mono<String> mono = sinkOne.asMono();
    
        sinkOne.emitValue("Hello Reactor", FAIL_FAST);
        sinkOne.emitValue("Hi Reactor", FAIL_FAST);
        sinkOne.emitValue(null, FAIL_FAST);
    
        mono.subscribe(data -> log.info("# Subscriber1 {}", data));
        mono.subscribe(data -> log.info("# Subscriber2 {}", data));
    }
    
    // result 
    > Task :Example9_4.main()
    16:10:02.535 [main] DEBUG- Using Slf4j logging framework
    16:10:02.541 [main] DEBUG- onNextDropped: Hi Reactor
    16:10:02.544 [main] INFO - # Subscriber1 Hello Reactor
    16:10:02.545 [main] INFO - # Subscriber2 Hello Reactor

     

    Sinks.one 을 이용하여

    1) Hello Reactor

    2) Hi reactor 

    3) null 

    을 emit

     

    16:11:55.556 [main] DEBUG- onNextDropped: Hi Reactor

    2) 의 Hi Reactor 는 dropped 되었음을 알 수 있음

     

    Sinks.One 으로 많은 수의 데이터를 emit 해도, 처음 emit 한 데이터는 정상적으로 emit 되지만, 나머지 데이터들은 Drop 된다는 사실을 알 수 있음

     

    EmitFailHandler

    FAIL_FAST 의 의미는 에러가 발생했을 때 재시도를 하지 않고 즉시 실패 처리한다는 의미

     

    public final class Sinks {
    ...
    	public interface EmitFailureHandler {
    
    		EmitFailureHandler FAIL_FAST = (signalType, emission) -> false;
    
    		static EmitFailureHandler busyLooping(Duration duration){
    			return new OptimisticEmitFailureHandler(duration);
    		}
    		boolean onEmitFailure(SignalType signalType, EmitResult emitResult);
    	}
    ...
    }

     

     

    Sinks.Many

    Sinks.Many 는 여러 건의 데이터를 여러 가지 방식으로 전송하는 기능 명세

     

    public final class Sinks {
    ...
    	public static ManySpec many() {
    		return SinksSpecs.DEFAULT_SINKS.many();
    	}
    ...
    }

    Sinks.One 을 리턴하는 것과 다르게, ManySpec 을 리턴함

     

    ManySpec 인터페이스 내부, 총 3가지 기능 정의

    public final class Sinks {
    ...
    	public interface ManySpec {
    		// Help building {@link Sinks.Many} that will broadcast signals to a single {@link Subscriber}
    		UnicastSpec unicast();
    
    		// Help building {@link Sinks.Many} that will broadcast signals to multiple {@link Subscriber}
    		MulticastSpec multicast();
    
    		// Help building {@link Sinks.Many} that will broadcast signals to multiple {@link Subscriber} with the ability to retain
    		//  and replay all or an arbitrary number of elements.
    		MulticastReplaySpec replay();
    	}
    ...
    }

     

    ManySpec 의 unicast 예제

    public static void main(String[] args) throws InterruptedException {
        Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
        Flux<Integer> fluxView = unicastSink.asFlux();
    
        unicastSink.emitNext(1, FAIL_FAST);
        unicastSink.emitNext(2, FAIL_FAST);
    
    
        fluxView.log().subscribe(data -> log.info("# Subscriber1: {}", data));
    
        unicastSink.emitNext(3, FAIL_FAST);
    
    //        fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
    }
    
    // result
    > Task :Example9_8.main()
    16:31:15.816 [main] DEBUG- Using Slf4j logging framework
    16:31:15.824 [main] INFO - # Subscriber1: 1
    16:31:15.825 [main] INFO - # Subscriber1: 2
    16:31:15.825 [main] INFO - # Subscriber1: 3
    
    
    // result 2 [Tips] flux 의 log 사용 시 아래처럼 이벤트 발생도 볼 수 있습니다.
    > Task :Example9_8.main()
       //16:45:59.092 [main] DEBUG- Using Slf4j logging framework
    16:45:59.107 [main] INFO - | onSubscribe([Fuseable] UnicastProcessor)
    16:45:59.109 [main] INFO - | request(unbounded)
    16:45:59.109 [main] INFO - | onNext(1)
       //16:45:59.110 [main] INFO - # Subscriber1: 1
    16:45:59.110 [main] INFO - | onNext(2)
       //16:45:59.110 [main] INFO - # Subscriber1: 2
    16:45:59.110 [main] INFO - | onNext(3)
       //16:45:59.110 [main] INFO - # Subscriber1: 3

    01. Sinks.many().unicast 를 사용 하여, 1,2,3 을 emit 합니다.

    02. unicastSink.asFlux() 를 통해 flux 로 전환 합니다.

    03. fluxview 를 구독하여 결과 표시합니다. fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));

     

    unicast 를 사용하였기 때문에 Subscriber2 의 주석을 해제하면 오류 발생합니다.

    Caused by: java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber

     

    Tip : flux.log() 사용시 이벤트 발생도 볼 수  있습니다

     

    Tip : 네트워크 통신에서 사용하는

    • Broadcast 라는 용어는 네트워크에 연결된 모든 시스템이 정보를 전달받는 One to All 방식 입니다.
    • Unicast 는 하나의 특정 시스템만 정보를 전달받는 One to One 방식이고,
    • Multicast 는 일부 시스템들만 정보를 전달받는 One to Many 방식 입니다.

     

    Sinks.many().multicast() 예제

    public static void main(String[] args) {
        Sinks.Many<Integer> multicastSink =
                Sinks.many().multicast().onBackpressureBuffer();
        Flux<Integer> fluxView = multicastSink.asFlux();
    
        multicastSink.emitNext(1, FAIL_FAST);
        multicastSink.emitNext(2, FAIL_FAST);
    
        fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
        fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
    
        multicastSink.emitNext(3, FAIL_FAST);
    }
    
    // result
    > Task :Example9_9.main()
    18:20:25.061 [main] DEBUG- Using Slf4j logging framework
    18:20:25.073 [main] INFO - # Subscriber1: 1
    18:20:25.075 [main] INFO - # Subscriber1: 2
    18:20:25.075 [main] INFO - # Subscriber1: 3
    18:20:25.076 [main] INFO - # Subscriber2: 3

     

    01  Sinks.many().multicast()  에서 1,2 emit  합니다.

    02. Subcriber1, subscriber2 추가합니다.

    03. Sinks.many().multicast() 에서 3 emit 합니다.

     

    첫 구독인 subscriber1 은 1,2,3 모두 전달 받습니다.

    2 가 emit 된 후 두번째로 구독 추가된 subscriber2 는 이후 emit 된 3만  전달 받습니다.

     

    Sinks 가 Publisher 의 역할을 할 경우 기본적으로 Hot Publisher로 동작하며, 

    특히 onBackpressureBuffer() 메서드는 Warm up 의 특징을 가지는 Hot Sequence 로 동작하기 때문에 첫 번째 구독이 발생한 시점에 downstream 쪽으로 데이터가 전달되는 것입니다.

     

     

    Sinks.many().replay() 예제

    public static void main(String[] args) {
        Sinks.Many<Integer> replaySink = Sinks.many().replay().limit(2);
        Flux<Integer> fluxView = replaySink.asFlux();
    
        replaySink.emitNext(1, FAIL_FAST);
        replaySink.emitNext(2, FAIL_FAST);
        replaySink.emitNext(3, FAIL_FAST);
    
        fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
    
        replaySink.emitNext(4, FAIL_FAST);
    
        fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
    }
    
    // result
    > Task :Example9_10.main()
    18:26:43.664 [main] DEBUG- Using Slf4j logging framework
    18:26:43.701 [main] INFO - # Subscriber1: 2
    18:26:43.711 [main] INFO - # Subscriber1: 3
    18:26:43.730 [main] INFO - # Subscriber1: 4
    18:26:43.751 [main] INFO - # Subscriber2: 3
    18:26:43.785 [main] INFO - # Subscriber2: 4

     

    replay() 메서드를 호출하면 리턴 값으로 MulticastReplaySpec 을 리턴하고, 이 MulticastReplaySpec의 구현 메서드 중 하나인 limi() 메서드를 호출합니다.

    MulticastReplaySpec 에는 emit 된 데이터를 다시 replay 해서 구독 전에 이미 emit 된 데이터라도 Subscriber가 전달받을 수 있게 하는 다양한 메서드들이 정의되어 있습니다.

    예로 all() 메서드는 모든 데이터를 replay 합니다.

    예제에서는 limit(2) 를 사용하여, 2개만 뒤로 돌려서 전달합니다.

     

     

    기억하세요

    01. Sinks 는 Publisher 와 Subscriber 의 기능을 모두 지닌 Processor 의 향상된 기능을 제공한다

    02. 데이터를 emit 하는 Sinks는 크게 Sinks.One 과 Sinks.Many 가 있다.

    03. Sinks.One 은 한 건의 데이터를 프로그래밍 방식으로 emit 한다

    04. Sinks.Many 는 여러 건의 데이터를 프로그래밍 방식으로 emit 한다

    05. Sinks.Many 의 UnicastSpec 은 단 하나의 Subscriber 에게만 데이터를 emit 한다

    06. Sinks.Many 의 MulticastSpec 은 하나 이상의 Subscriber 에게 데이터를 emit 한다

    07. Sinks.Many 의 MulticastReplaySpec 은 emit 된 데이터 중에서 특정 시점으로 되돌린 (replay) 데이터부터 emit 한다

     

     

     

     

     

    'Spring > Webflux' 카테고리의 다른 글

    11장 Context  (0) 2023.05.12
    10장 Scheduler  (0) 2023.05.07
    8장 backpressure  (0) 2023.04.23
    7장 Cold Sequence 와 Hot Sequence  (0) 2023.04.23
    6장 마블 다이어그램  (1) 2023.04.22

    댓글

Designed by Tistory.