ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 14장 Operator 8 - multicast
    Spring/Webflux 2023. 7. 30. 14:22

     

    14.9 다수의 Subscriber 에게 Flux 를 멀티캐스팅(Muticasting)하기 위한 Operator

     

    1) publish

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#publish--

    마블 다이어 그램에서 보다시피 publish() 는 구독을 하더라도 구독 시점에 즉시 데이터를 emit 하지 않고,

    connect() 를 호출하는 시점에 비로소 데이터를 emit 합니다

    그리고 Hot Sequence 로 변환되기 때문에 구독 시점 이후에 emit 된 데이터만 전달 받을 수 있습니다.

     

    코드 14-60 publish 예제 1

    public static void main(String[] args) throws InterruptedException {
      ConnectableFlux<Integer> flux =
          Flux
              .range(1, 5)
              .delayElements(Duration.ofMillis(300L))
              .publish();
    
      Thread.sleep(500L);
      flux.subscribe(data -> log.info("# subscriber1: {}", data));
    
      Thread.sleep(200L);
      flux.subscribe(data -> log.info("# subscriber2: {}", data));
    
      flux.connect();
    
      Thread.sleep(1000L);
      flux.subscribe(data -> log.info("# subscriber3: {}", data));
    
      Thread.sleep(2000L);
    }
    
    // result 
    > Task :Example14_60.main()
    14:42:02.082 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    14:42:03.167 [parallel-1] c.operator_8_multicast.Example14_60 INFO - # subscriber1: 1
    14:42:03.171 [parallel-1] c.operator_8_multicast.Example14_60 INFO - # subscriber2: 1
    14:42:03.476 [parallel-2] c.operator_8_multicast.Example14_60 INFO - # subscriber1: 2
    14:42:03.476 [parallel-2] c.operator_8_multicast.Example14_60 INFO - # subscriber2: 2
    14:42:03.779 [parallel-3] c.operator_8_multicast.Example14_60 INFO - # subscriber1: 3
    14:42:03.779 [parallel-3] c.operator_8_multicast.Example14_60 INFO - # subscriber2: 3
    14:42:04.081 [parallel-4] c.operator_8_multicast.Example14_60 INFO - # subscriber1: 4
    14:42:04.082 [parallel-4] c.operator_8_multicast.Example14_60 INFO - # subscriber2: 4
    14:42:04.082 [parallel-4] c.operator_8_multicast.Example14_60 INFO - # subscriber3: 4
    14:42:04.386 [parallel-5] c.operator_8_multicast.Example14_60 INFO - # subscriber1: 5
    14:42:04.386 [parallel-5] c.operator_8_multicast.Example14_60 INFO - # subscriber2: 5
    14:42:04.386 [parallel-5] c.operator_8_multicast.Example14_60 INFO - # subscriber3: 5

    ConnectableFlux 이기에 Hot 으로 동작합니다

     

    01. 먼저 pubish() 를 이용해 0.3초에 한 번씩 1~5까지 emit 하는 ConnectableFlux 를 리턴받습니다

    publish() 를 호출했지만 아직 connect() 를 호출하지 않았기 때문에 이 시점에 emit 되는 데이터는 없습니다

     

    Thread.sleep(500L);
    flux.subscribe(data -> log.info("# subscriber1: {}", data));

    02. 05초 뒤에 첫 번째 구독(subscriber1) 이 발생합니다 

     

    Thread.sleep(200L);
    flux.subscribe(data -> log.info("# subscriber2: {}", data));

    03. 02초 뒤에 두 번째 구독(subscriber2)이 발생합니다.

     

    flux.connect();

    04. connect() 가 호출됩니다 이 시점부터 데이터가 0.3초에 한 번씩 emit 됩니다

     

    Thread.sleep(1000L);
    flux.subscribe(data -> log.info("# subscriber3: {}", data));

    05. 1초 뒤에 세 번째 구독이 발생합니다. 그런데 connect() 가 호출된 시점에서 부터 0.3초에 한 번씩 데이터가 emit 되기 때문에,

    숫자 1~3까지는 이미 emit 된 상태여서 세 번째 subscriber3 는 전달받지 못합니다 (4,5만 전달 받습니다 )

     

    코드 14-61 publish 예제 2

    private static ConnectableFlux<String> publisher;
    private static int checkedAudience;
    
    static {
      publisher =
          Flux
              .just("Concert part1", "Concert part2", "Concert part3")
              .delayElements(Duration.ofMillis(300L))
              .publish();
    }
    
    public static void main(String[] args) throws InterruptedException {
      checkAudience();
      Thread.sleep(500L);
      publisher.subscribe(data -> log.info("# audience 1 is watching {}", data));
      checkedAudience++;
    
      Thread.sleep(500L);
      publisher.subscribe(data -> log.info("# audience 2 is watching {}", data));
      checkedAudience++;
    
      checkAudience();
    
      Thread.sleep(500L);
      publisher.subscribe(data -> log.info("# audience 3 is watching {}", data));
    
      Thread.sleep(1000L);
    }
    
    public static void checkAudience() {
      if (checkedAudience >= 2) {
        publisher.connect();
      }
    }
    
    // result 
    > Task :Example14_61.main()
    15:11:21.281 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    15:11:22.669 [parallel-1] c.operator_8_multicast.Example14_61 INFO - # audience 1 is watching Concert part1
    15:11:22.674 [parallel-1] c.operator_8_multicast.Example14_61 INFO - # audience 2 is watching Concert part1
    15:11:22.976 [parallel-2] c.operator_8_multicast.Example14_61 INFO - # audience 1 is watching Concert part2
    15:11:22.977 [parallel-2] c.operator_8_multicast.Example14_61 INFO - # audience 2 is watching Concert part2
    15:11:22.977 [parallel-2] c.operator_8_multicast.Example14_61 INFO - # audience 3 is watching Concert part2
    15:11:23.281 [parallel-3] c.operator_8_multicast.Example14_61 INFO - # audience 1 is watching Concert part3
    15:11:23.282 [parallel-3] c.operator_8_multicast.Example14_61 INFO - # audience 2 is watching Concert part3
    15:11:23.282 [parallel-3] c.operator_8_multicast.Example14_61 INFO - # audience 3 is watching Concert part3

    콘서트 관객이 입장할 때마다 관객 수를 체크해서 관객이 두 명 이상 입장하면 콘서트를 시작하는 상황을 시뮬레이션한 코드 입니다

    뒤늦게 입장한 관객은 입장한 시점 이후의 콘서트만 볼 수 있습니다

     

    예제 코드에서는 관객이 입장(구독 발생) 할 때마다 checkAudience() 메서드를 호출해 관객 입장 수(구독 수)를 체크하고,

    관객 입장 수(구독 수)가 2 이상이면 콘서트를 시작(데이터 emit)합니다

     

    세 번째 관객이 입장(구독 발생)하는 시점에 콘서트 1부가 이미 시작된 상황이라서, 세 번째 관객은 콘서트 2부 부터 볼 수 있습니다.

     

    2) autoConnect

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/ConnectableFlux.html#autoConnect-int-

    publish() 의 경우, 구독이 발생하더라도 connect() 를 직접 호출하기 전까지는 데이터를 emit 하지 않기 때문에, 코드상에서 connect 를 직접 호출해야 합니다. 

    반면에, autoConnect() 는 파라미터로 지정하는 숫자만큼의 구독이 발생하는 시점에 Upstream 소스로 자동으로 연결되기 때문에 별도의 connect() 호출이 필요하지 않습니다

     

    코드 14-62 autoConnect 예제

    public static void main(String[] args) throws InterruptedException {
      Flux<String> publisher =
        Flux
          .just("Concert part1", "Concert part2", "Concert part3")
          .delayElements(Duration.ofMillis(300L))
          .publish()
          .autoConnect(2);
    
      Thread.sleep(500L);
      publisher.subscribe(data -> log.info("# audience 1 is watching {}", data));
    
      Thread.sleep(500L);
      publisher.subscribe(data -> log.info("# audience 2 is watching {}", data));
    
      Thread.sleep(500L);
      publisher.subscribe(data -> log.info("# audience 3 is watching {}", data));
    
      Thread.sleep(1000L);
    }
    
    // result 
    > Task :Example14_62.main()
    15:23:31.161 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    15:23:31.161 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    15:23:32.542 [parallel-1] c..Example14_62 INFO - # audience 1 is watching Concert part1
    15:23:32.546 [parallel-1] c..Example14_62 INFO - # audience 2 is watching Concert part1
    15:23:32.849 [parallel-2] c..Example14_62 INFO - # audience 1 is watching Concert part2
    15:23:32.849 [parallel-2] c..Example14_62 INFO - # audience 2 is watching Concert part2
    15:23:32.849 [parallel-2] c..Example14_62 INFO - # audience 3 is watching Concert part2
    15:23:33.150 [parallel-3] c..Example14_62 INFO - # audience 1 is watching Concert part3
    15:23:33.150 [parallel-3] c..Example14_62 INFO - # audience 2 is watching Concert part3
    15:23:33.151 [parallel-3] c..Example14_62 INFO - # audience 3 is watching Concert part3

    .autoConnect(2) 

    를 사용했기에 두 번째 구독이 발생하는 시점에 데이터가 emit 됩니다

     

     

    3) refCount

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/ConnectableFlux.html#refCount-int-

    refCount() 는 파라미터로 입력된 숫자만큼의 구독이 발생하는 시점에 Upstream 소스에 연결되며, 

    모든 구독이 취소되거나 Upstream 의 데이터 emit 이 종료되면 연결이 해제됩니다

    refCount() 는 주로 무한 스트림 상황에서 모든 구독이 취소될 경우 연결을 해제하는 데 사용할 수 있습니다

     

    코드 14-63-1 refCount예제 중 refCount(1) 사용

    public static void main(String[] args) throws InterruptedException {
      Flux<Long> publisher =
        Flux
          .interval(Duration.ofMillis(500))
          .publish()
          //.autoConnect(1);
          .refCount(1);
      Disposable disposable =
        publisher.subscribe(data -> log.info("# subscriber 1: {}", data));
    
      Thread.sleep(2100L);
      disposable.dispose();
    
      publisher.subscribe(data -> log.info("# subscriber 2: {}", data));
    
      Thread.sleep(2500L);
    }
    
    // result
    > Task :Example14_63.main()
    15:39:07.618 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    15:39:08.145 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 1: 0
    15:39:08.643 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 1: 1
    15:39:09.141 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 1: 2
    15:39:09.643 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 1: 3
    15:39:10.250 [parallel-2] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 0
    15:39:10.745 [parallel-2] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 1
    15:39:11.248 [parallel-2] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 2
    15:39:11.748 [parallel-2] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 3
    15:39:12.249 [parallel-2] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 4

    refCount() 를 이용해 1개의 구독이 발생하는 시점에 Upstream 소스에 연결되도록 했습니다.

    그런데 첫 번째 구독이 발생한 이후 2.1초 후에 구독을 해제했습니다.

    이 시점에는 모든 구독이 취소된 상태이기 때문에 연결이 해제되고, 

    두 번째 구독이 발생할 경우에는 Upstream 소스에 다시 연결됩니다

     

    Subscriber1 은 interval()이 emit 한 숫자 3까지 전달받아 출력하지만, 이후에는 구독이 취소되기 때문에 연결이 해제됩니다

    그런데 refCount(1) 로 지정했기 때문에 두번째 구독(Subscriber2)이 발생하면 Upstream 소스에 새롭게 연결되어,

    interval() 이 0 부터 다시 숫자를 emit 하는 것을 볼 수 있습니다.

     

    코드 14-63-2 refCount예제 중 autoConnect(1) 사용

    public static void main(String[] args) throws InterruptedException {
      Flux<Long> publisher =
        Flux
          .interval(Duration.ofMillis(500))
          .publish()
          .autoConnect(1);
          //.refCount(1);
      Disposable disposable =
        publisher.subscribe(data -> log.info("# subscriber 1: {}", data));
    
      Thread.sleep(2100L);
      disposable.dispose();
    
      publisher.subscribe(data -> log.info("# subscriber 2: {}", data));
    
      Thread.sleep(2500L);
    }
    
    // result
    > Task :Example14_63.main()
    15:34:48.185 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    15:34:48.713 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 1: 0
    15:34:49.212 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 1: 1
    15:34:49.712 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 1: 2
    15:34:50.211 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 1: 3
    15:34:50.714 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 4
    15:34:51.212 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 5
    15:34:51.710 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 6
    15:34:52.210 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 7
    15:34:52.710 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 8

    //refCount(1) 를 주석처리 하고

    .autoConnect(1) 

    를 활성화 하면, 

    첫 번째 구독이 발생한 시점에 Upstream 소스에 연결되어,

    Subscriber 1은  0~3 까지 전달 받은 후에 첫 번째 구독이 취소되지만,

    Upstream 소스로의 연결이 해제딘 것은 아니기 때문에 Subscriber 2 는 숫자 4부터 이어서 전달 받는 것을 볼 수 있습니다.

     

     

    connectableFlux

    https://godekdls.github.io/Reactor%20Core/advancedfeaturesandconcepts/#93-broadcasting-to-multiple-subscribers-with-connectableflux

     

    Advanced Features and Concepts

    리액터 고급 기능 한글 번역

    godekdls.github.io

     

     

    https://hyunsoori.com/entry/Spring-Webflux-Cold-Hot-%EC%9D%B4%ED%95%B4%ED%95%98%EA%B8%B0

     

    Spring Webflux Cold / Hot 이해하기

    스프링의 웹플럭스에는 Flux와 Mono 라는 Reactive Streams Publisher 의 구현체가 있다. Flux 와 Mono 는 두 종류의 발행 방식이 있는데, Cold 과 Hot 방식이 존재한다. Cold sequences subscribe 할때 마다, 매번 새로

    hyunsoori.com

    Flux 를 publish 메소드를 호출하면 ConnectableFlux 로 변경이 되고, connect 메소드를 호출 하면 구독을 시작하게 된다.

    또한, ConnectableFlux 에는 autoConnect 라는 메소드도 있다.
    아래 코드에서 autoConnect(2) 라는 것은 2개의 구독자가 생기게 된다면 실행한다라는 것이다.  명시적으로 connect 를 호출하지 않아도 된다.

     

    https://velog.io/@neity16/WebFlux-2-Operator

     

    WebFlux (2) - Operator

    [Operator] [Cold Publisher] [Hot Publisher]

    velog.io

    Cold -> Hot 변환
    Hot Publisher로 변환하려면, ConnectableFlux로 변환하면 된다

     

     

    https://tech.kakao.com/2018/05/29/reactor-programming/

     

    사용하면서 알게 된 Reactor, 예제 코드로 살펴보기

    Reactor는 Pivotal의 오픈소스 프로젝트로, JVM 위에서 동작하는 논블럭킹 애플리케이션을 만들기 위한 리액티브 라이브러리입니다. Reactor는 RxJava 2와 함께 Reactive Stream의 구현체이기도 하고, Spring Fra

    tech.kakao.com

     

     

     

     

     

     

     

     

    'Spring > Webflux' 카테고리의 다른 글

    14장 Operator 7 - split  (0) 2023.07.29
    14장 Operator 6 - time  (0) 2023.07.26
    14장 Operator 5 - Error  (0) 2023.07.22
    14장 Operator 4 - peek  (0) 2023.07.22
    14장 Operator 3 - transformation  (0) 2023.07.16

    댓글

Designed by Tistory.