-
14장 Operator 8 - multicastSpring/Webflux 2023. 7. 30. 14:22
14.9 다수의 Subscriber 에게 Flux 를 멀티캐스팅(Muticasting)하기 위한 Operator
1) publish
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#publish--
마블 다이어 그램에서 보다시피 publish() 는 구독을 하더라도 구독 시점에 즉시 데이터를 emit 하지 않고,
connect() 를 호출하는 시점에 비로소 데이터를 emit 합니다
그리고 Hot Sequence 로 변환되기 때문에 구독 시점 이후에 emit 된 데이터만 전달 받을 수 있습니다.
코드 14-60 publish 예제 1
public static void main(String[] args) throws InterruptedException { ConnectableFlux<Integer> flux = Flux .range(1, 5) .delayElements(Duration.ofMillis(300L)) .publish(); Thread.sleep(500L); flux.subscribe(data -> log.info("# subscriber1: {}", data)); Thread.sleep(200L); flux.subscribe(data -> log.info("# subscriber2: {}", data)); flux.connect(); Thread.sleep(1000L); flux.subscribe(data -> log.info("# subscriber3: {}", data)); Thread.sleep(2000L); } // result > Task :Example14_60.main() 14:42:02.082 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 14:42:03.167 [parallel-1] c.operator_8_multicast.Example14_60 INFO - # subscriber1: 1 14:42:03.171 [parallel-1] c.operator_8_multicast.Example14_60 INFO - # subscriber2: 1 14:42:03.476 [parallel-2] c.operator_8_multicast.Example14_60 INFO - # subscriber1: 2 14:42:03.476 [parallel-2] c.operator_8_multicast.Example14_60 INFO - # subscriber2: 2 14:42:03.779 [parallel-3] c.operator_8_multicast.Example14_60 INFO - # subscriber1: 3 14:42:03.779 [parallel-3] c.operator_8_multicast.Example14_60 INFO - # subscriber2: 3 14:42:04.081 [parallel-4] c.operator_8_multicast.Example14_60 INFO - # subscriber1: 4 14:42:04.082 [parallel-4] c.operator_8_multicast.Example14_60 INFO - # subscriber2: 4 14:42:04.082 [parallel-4] c.operator_8_multicast.Example14_60 INFO - # subscriber3: 4 14:42:04.386 [parallel-5] c.operator_8_multicast.Example14_60 INFO - # subscriber1: 5 14:42:04.386 [parallel-5] c.operator_8_multicast.Example14_60 INFO - # subscriber2: 5 14:42:04.386 [parallel-5] c.operator_8_multicast.Example14_60 INFO - # subscriber3: 5
ConnectableFlux 이기에 Hot 으로 동작합니다
01. 먼저 pubish() 를 이용해 0.3초에 한 번씩 1~5까지 emit 하는 ConnectableFlux 를 리턴받습니다
publish() 를 호출했지만 아직 connect() 를 호출하지 않았기 때문에 이 시점에 emit 되는 데이터는 없습니다
Thread.sleep(500L);
flux.subscribe(data -> log.info("# subscriber1: {}", data));02. 05초 뒤에 첫 번째 구독(subscriber1) 이 발생합니다
Thread.sleep(200L);
flux.subscribe(data -> log.info("# subscriber2: {}", data));03. 02초 뒤에 두 번째 구독(subscriber2)이 발생합니다.
flux.connect();
04. connect() 가 호출됩니다 이 시점부터 데이터가 0.3초에 한 번씩 emit 됩니다
Thread.sleep(1000L);
flux.subscribe(data -> log.info("# subscriber3: {}", data));05. 1초 뒤에 세 번째 구독이 발생합니다. 그런데 connect() 가 호출된 시점에서 부터 0.3초에 한 번씩 데이터가 emit 되기 때문에,
숫자 1~3까지는 이미 emit 된 상태여서 세 번째 subscriber3 는 전달받지 못합니다 (4,5만 전달 받습니다 )
코드 14-61 publish 예제 2
private static ConnectableFlux<String> publisher; private static int checkedAudience; static { publisher = Flux .just("Concert part1", "Concert part2", "Concert part3") .delayElements(Duration.ofMillis(300L)) .publish(); } public static void main(String[] args) throws InterruptedException { checkAudience(); Thread.sleep(500L); publisher.subscribe(data -> log.info("# audience 1 is watching {}", data)); checkedAudience++; Thread.sleep(500L); publisher.subscribe(data -> log.info("# audience 2 is watching {}", data)); checkedAudience++; checkAudience(); Thread.sleep(500L); publisher.subscribe(data -> log.info("# audience 3 is watching {}", data)); Thread.sleep(1000L); } public static void checkAudience() { if (checkedAudience >= 2) { publisher.connect(); } } // result > Task :Example14_61.main() 15:11:21.281 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 15:11:22.669 [parallel-1] c.operator_8_multicast.Example14_61 INFO - # audience 1 is watching Concert part1 15:11:22.674 [parallel-1] c.operator_8_multicast.Example14_61 INFO - # audience 2 is watching Concert part1 15:11:22.976 [parallel-2] c.operator_8_multicast.Example14_61 INFO - # audience 1 is watching Concert part2 15:11:22.977 [parallel-2] c.operator_8_multicast.Example14_61 INFO - # audience 2 is watching Concert part2 15:11:22.977 [parallel-2] c.operator_8_multicast.Example14_61 INFO - # audience 3 is watching Concert part2 15:11:23.281 [parallel-3] c.operator_8_multicast.Example14_61 INFO - # audience 1 is watching Concert part3 15:11:23.282 [parallel-3] c.operator_8_multicast.Example14_61 INFO - # audience 2 is watching Concert part3 15:11:23.282 [parallel-3] c.operator_8_multicast.Example14_61 INFO - # audience 3 is watching Concert part3
콘서트 관객이 입장할 때마다 관객 수를 체크해서 관객이 두 명 이상 입장하면 콘서트를 시작하는 상황을 시뮬레이션한 코드 입니다
뒤늦게 입장한 관객은 입장한 시점 이후의 콘서트만 볼 수 있습니다
예제 코드에서는 관객이 입장(구독 발생) 할 때마다 checkAudience() 메서드를 호출해 관객 입장 수(구독 수)를 체크하고,
관객 입장 수(구독 수)가 2 이상이면 콘서트를 시작(데이터 emit)합니다
세 번째 관객이 입장(구독 발생)하는 시점에 콘서트 1부가 이미 시작된 상황이라서, 세 번째 관객은 콘서트 2부 부터 볼 수 있습니다.
2) autoConnect
publish() 의 경우, 구독이 발생하더라도 connect() 를 직접 호출하기 전까지는 데이터를 emit 하지 않기 때문에, 코드상에서 connect 를 직접 호출해야 합니다.
반면에, autoConnect() 는 파라미터로 지정하는 숫자만큼의 구독이 발생하는 시점에 Upstream 소스로 자동으로 연결되기 때문에 별도의 connect() 호출이 필요하지 않습니다
코드 14-62 autoConnect 예제
public static void main(String[] args) throws InterruptedException { Flux<String> publisher = Flux .just("Concert part1", "Concert part2", "Concert part3") .delayElements(Duration.ofMillis(300L)) .publish() .autoConnect(2); Thread.sleep(500L); publisher.subscribe(data -> log.info("# audience 1 is watching {}", data)); Thread.sleep(500L); publisher.subscribe(data -> log.info("# audience 2 is watching {}", data)); Thread.sleep(500L); publisher.subscribe(data -> log.info("# audience 3 is watching {}", data)); Thread.sleep(1000L); } // result > Task :Example14_62.main() 15:23:31.161 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 15:23:31.161 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 15:23:32.542 [parallel-1] c..Example14_62 INFO - # audience 1 is watching Concert part1 15:23:32.546 [parallel-1] c..Example14_62 INFO - # audience 2 is watching Concert part1 15:23:32.849 [parallel-2] c..Example14_62 INFO - # audience 1 is watching Concert part2 15:23:32.849 [parallel-2] c..Example14_62 INFO - # audience 2 is watching Concert part2 15:23:32.849 [parallel-2] c..Example14_62 INFO - # audience 3 is watching Concert part2 15:23:33.150 [parallel-3] c..Example14_62 INFO - # audience 1 is watching Concert part3 15:23:33.150 [parallel-3] c..Example14_62 INFO - # audience 2 is watching Concert part3 15:23:33.151 [parallel-3] c..Example14_62 INFO - # audience 3 is watching Concert part3
.autoConnect(2)
를 사용했기에 두 번째 구독이 발생하는 시점에 데이터가 emit 됩니다
3) refCount
refCount() 는 파라미터로 입력된 숫자만큼의 구독이 발생하는 시점에 Upstream 소스에 연결되며,
모든 구독이 취소되거나 Upstream 의 데이터 emit 이 종료되면 연결이 해제됩니다
refCount() 는 주로 무한 스트림 상황에서 모든 구독이 취소될 경우 연결을 해제하는 데 사용할 수 있습니다
코드 14-63-1 refCount예제 중 refCount(1) 사용
public static void main(String[] args) throws InterruptedException { Flux<Long> publisher = Flux .interval(Duration.ofMillis(500)) .publish() //.autoConnect(1); .refCount(1); Disposable disposable = publisher.subscribe(data -> log.info("# subscriber 1: {}", data)); Thread.sleep(2100L); disposable.dispose(); publisher.subscribe(data -> log.info("# subscriber 2: {}", data)); Thread.sleep(2500L); } // result > Task :Example14_63.main() 15:39:07.618 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 15:39:08.145 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 1: 0 15:39:08.643 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 1: 1 15:39:09.141 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 1: 2 15:39:09.643 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 1: 3 15:39:10.250 [parallel-2] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 0 15:39:10.745 [parallel-2] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 1 15:39:11.248 [parallel-2] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 2 15:39:11.748 [parallel-2] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 3 15:39:12.249 [parallel-2] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 4
refCount() 를 이용해 1개의 구독이 발생하는 시점에 Upstream 소스에 연결되도록 했습니다.
그런데 첫 번째 구독이 발생한 이후 2.1초 후에 구독을 해제했습니다.
이 시점에는 모든 구독이 취소된 상태이기 때문에 연결이 해제되고,
두 번째 구독이 발생할 경우에는 Upstream 소스에 다시 연결됩니다
Subscriber1 은 interval()이 emit 한 숫자 3까지 전달받아 출력하지만, 이후에는 구독이 취소되기 때문에 연결이 해제됩니다
그런데 refCount(1) 로 지정했기 때문에 두번째 구독(Subscriber2)이 발생하면 Upstream 소스에 새롭게 연결되어,
interval() 이 0 부터 다시 숫자를 emit 하는 것을 볼 수 있습니다.
코드 14-63-2 refCount예제 중 autoConnect(1) 사용
public static void main(String[] args) throws InterruptedException { Flux<Long> publisher = Flux .interval(Duration.ofMillis(500)) .publish() .autoConnect(1); //.refCount(1); Disposable disposable = publisher.subscribe(data -> log.info("# subscriber 1: {}", data)); Thread.sleep(2100L); disposable.dispose(); publisher.subscribe(data -> log.info("# subscriber 2: {}", data)); Thread.sleep(2500L); } // result > Task :Example14_63.main() 15:34:48.185 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 15:34:48.713 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 1: 0 15:34:49.212 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 1: 1 15:34:49.712 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 1: 2 15:34:50.211 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 1: 3 15:34:50.714 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 4 15:34:51.212 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 5 15:34:51.710 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 6 15:34:52.210 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 7 15:34:52.710 [parallel-1] c.operator_8_multicast.Example14_63 INFO - # subscriber 2: 8
//refCount(1) 를 주석처리 하고
.autoConnect(1)
를 활성화 하면,
첫 번째 구독이 발생한 시점에 Upstream 소스에 연결되어,
Subscriber 1은 0~3 까지 전달 받은 후에 첫 번째 구독이 취소되지만,
Upstream 소스로의 연결이 해제딘 것은 아니기 때문에 Subscriber 2 는 숫자 4부터 이어서 전달 받는 것을 볼 수 있습니다.
connectableFlux
https://hyunsoori.com/entry/Spring-Webflux-Cold-Hot-%EC%9D%B4%ED%95%B4%ED%95%98%EA%B8%B0
Flux 를 publish 메소드를 호출하면 ConnectableFlux 로 변경이 되고, connect 메소드를 호출 하면 구독을 시작하게 된다.
또한, ConnectableFlux 에는 autoConnect 라는 메소드도 있다.
아래 코드에서 autoConnect(2) 라는 것은 2개의 구독자가 생기게 된다면 실행한다라는 것이다. 명시적으로 connect 를 호출하지 않아도 된다.https://velog.io/@neity16/WebFlux-2-Operator
Cold -> Hot 변환
Hot Publisher로 변환하려면, ConnectableFlux로 변환하면 된다https://tech.kakao.com/2018/05/29/reactor-programming/
'Spring > Webflux' 카테고리의 다른 글
14장 Operator 7 - split (0) 2023.07.29 14장 Operator 6 - time (0) 2023.07.26 14장 Operator 5 - Error (0) 2023.07.22 14장 Operator 4 - peek (0) 2023.07.22 14장 Operator 3 - transformation (0) 2023.07.16