-
14장 Operator 1 - Sequence 생성Spring/Webflux 2023. 7. 8. 14:04
github code : https://github.com/bjpublic/Spring-Reactive/tree/main/part2/src/main/java/chapter14/operator_1_create
14.1 Operator 란
리액티브 프로그래밍은 operator 로 시작해서 operator 로 끝난다라고 해도 과언이 아닐정도로 가장 중요한 구성요소 입니다
Chapter 14 의 내용 구성방식
operator 의
1. 마블 다이어그램
2. 기본 예제
3. 실무 활용 예제14.2 Sequence 생성을 위한 Operator
1) justOrEmpty
justOrEmpty() 는 just() 의 확장 operator 로서,
emit 할 데이터가 null 일 경우 NullPointExpection 이 발생하지 않고 onComplete Signal 을 전송합니다
그리고 null 이 아닌 데이터일 경우 해당 데이터를 emit 하는 Mono 를 생성합니다
코드 14-1 justOrEmpty 예제
public static void main(String[] args) { Mono .justOrEmpty(null) .subscribe(data -> {}, error -> {}, () -> log.info("# onComplete")); } // result > Task :Example14_1.main() 14:50:51.591 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 14:50:51.597 [main] c.operator_1_create.Example14_1 INFO - # onComplete
justOrEmtpy(null) 이 전송되어도, 오류 발생하지 않고 # onComplete Signal 이 전송되었습니다
2) fromIterable
fromIterable() operator 는 Iterable 에 포함된 데이터를 emit 하는 Flux 를 생성합니다.
즉, Java 에서 제공하는 Iterable 을 구현한 구현체를 fromIterable() 의 파라미터로 전달할 수 있습니다
코드 14-2 fromIterable 예제
public static void main(String[] args) { Flux .fromIterable(SampleData.coins) .subscribe(coin -> log.info("coin 명: {}, 현재가: {}", coin.getT1(), coin.getT2()) ); } // SampleData.coins public class SampleData { public static final List<Tuple2<String, Integer>> coins = Arrays.asList( Tuples.of("BTC", 52_000_000), Tuples.of("ETH", 1_720_000), Tuples.of("XRP", 533), Tuples.of("ICX", 2_080), Tuples.of("EOS", 4_020), Tuples.of("BCH", 558_000)); } // result > Task :Example14_2.main() 16:12:32.628 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 16:12:32.734 [main] c.operator_1_create.Example14_2 INFO - coin 명: BTC, 현재가: 52000000 16:12:32.739 [main] c.operator_1_create.Example14_2 INFO - coin 명: ETH, 현재가: 1720000 16:12:32.739 [main] c.operator_1_create.Example14_2 INFO - coin 명: XRP, 현재가: 533 16:12:32.740 [main] c.operator_1_create.Example14_2 INFO - coin 명: ICX, 현재가: 2080 16:12:32.740 [main] c.operator_1_create.Example14_2 INFO - coin 명: EOS, 현재가: 4020 16:12:32.740 [main] c.operator_1_create.Example14_2 INFO - coin 명: BCH, 현재가: 558000
3) fromStream
fromStream() operator 는 Stream 에 포함된 데이터를 emit 하는 Flux를 생성합니다.
Java 의 Stream 특성상 Stream 은 재사용할 수 없으며, cancel, error, complete 시에 자동으로 닫히게 됩니다
코드14-3 fromStream 예제
public static void main(String[] args) { Flux .fromStream(() -> SampleData.coinNames.stream()) .filter(coin -> coin.equals("BTC") || coin.equals("ETH")) .subscribe(data -> log.info("{}", data)); } // coinNames public class SampleData { public static final List<String> coinNames = Arrays.asList("BTC", "ETH", "XRP", "ICX", "EOS", "BCH"); } // result > Task :Example14_3.main() 16:27:35.895 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 16:27:35.948 [main] c.operator_1_create.Example14_3 INFO - BTC 16:27:35.949 [main] c.operator_1_create.Example14_3 INFO - ETH
4) range
range() operator 는 n 부터 1씩 증가한 연속된 수를 m 개 emit 하는 Flux 를 생성합니다.
range() operator 는 명령형 언어의 for 문처럼 특정 횟수만큼 어떤 작업을 처리하고자 할 경우에 주로 사용됩니다
코드 14-4 range 예제
public static void main(String[] args) { Flux .range(5, 10) .subscribe(data -> log.info("{}", data)); } // result > Task :Example14_4.main() 16:37:15.051 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 16:37:15.060 [main] c.operator_1_create.Example14_4 INFO - 5 16:37:15.061 [main] c.operator_1_create.Example14_4 INFO - 6 16:37:15.061 [main] c.operator_1_create.Example14_4 INFO - 7 16:37:15.062 [main] c.operator_1_create.Example14_4 INFO - 8 16:37:15.062 [main] c.operator_1_create.Example14_4 INFO - 9 16:37:15.062 [main] c.operator_1_create.Example14_4 INFO - 10 16:37:15.062 [main] c.operator_1_create.Example14_4 INFO - 11 16:37:15.062 [main] c.operator_1_create.Example14_4 INFO - 12 16:37:15.062 [main] c.operator_1_create.Example14_4 INFO - 13 16:37:15.062 [main] c.operator_1_create.Example14_4 INFO - 14
코드 14-5 range 예제 2, range & map
public static void main(String[] args) { Flux .range(7, 5) .map(idx -> SampleData.btcTopPricesPerYear.get(idx)) .subscribe(tuple -> log.info("{}'s {}", tuple.getT1(), tuple.getT2())); } public class SampleData { public static final List<Tuple2<Integer, Long>> btcTopPricesPerYear = Arrays.asList( Tuples.of(2010, 565L), Tuples.of(2011, 36_094L), Tuples.of(2012, 17_425L), Tuples.of(2013, 1_405_209L), Tuples.of(2014, 1_237_182L), Tuples.of(2015, 557_603L), Tuples.of(2016, 1_111_811L), Tuples.of(2017, 22_483_583L), Tuples.of(2018, 19_521_543L), Tuples.of(2019, 15_761_568L), Tuples.of(2020, 22_439_002L), Tuples.of(2021, 63_364_000L) ); } // result > Task :Example14_5.main() 16:39:39.925 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 16:39:39.979 [main] c.operator_1_create.Example14_5 INFO - 2017's 22483583 16:39:39.980 [main] c.operator_1_create.Example14_5 INFO - 2018's 19521543 16:39:39.981 [main] c.operator_1_create.Example14_5 INFO - 2019's 15761568 16:39:39.981 [main] c.operator_1_create.Example14_5 INFO - 2020's 22439002 16:39:39.981 [main] c.operator_1_create.Example14_5 INFO - 2021's 63364000
range() operator 에서 7 부터 5개의 숫자를 emit
map() 에서 emit 된 숫자를 btcTopPricesPerYear 리스트의 index 로 사용하여, 해당 연도별 btc 최고가 정보를 가지는 튜플을 subscriber 에게 전달합니다
5) defer
defer() operator 는 operator 를 선언한 시점에 데이터를 emit 하는 것이 아니라
구독하는 시점에 데이터를 emit 하는 Flux 또는 Mono 를 생성합니다
defer() 는 데이터 emit 을 지연시키기 때문에 꼭 필요한 시점에 데이터를 emit 하여 불필요한 프로세스를 줄일 수 있습니다
코드 14-6 defer 예제,
LocalDateTime.now() 를 이용한 justMono 와 deferMono 비교
public static void main(String[] args) throws InterruptedException { log.info("# start: {}", LocalDateTime.now()); Mono<LocalDateTime> justMono = Mono.just(LocalDateTime.now()); Mono<LocalDateTime> deferMono = Mono.defer(() -> Mono.just(LocalDateTime.now())); Thread.sleep(2000); justMono.subscribe(data -> log.info("# onNext just1: {}", data)); deferMono.subscribe(data -> log.info("# onNext defer1: {}", data)); Thread.sleep(2000); justMono.subscribe(data -> log.info("# onNext just2: {}", data)); deferMono.subscribe(data -> log.info("# onNext defer2: {}", data)); } // result > Task :Example14_6.main() 16:49:25.646 [main] c.operator_1_create.Example14_6 INFO - # start: 2023-07-08T16:49:25.644913 16:49:25.703 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 16:49:27.717 [main] c.operator_1_create.Example14_6 INFO - # onNext just1: 2023-07-08T16:49:25.649058 16:49:27.718 [main] c.operator_1_create.Example14_6 INFO - # onNext defer1: 2023-07-08T16:49:27.718505 16:49:29.719 [main] c.operator_1_create.Example14_6 INFO - # onNext just2: 2023-07-08T16:49:25.649058 16:49:29.720 [main] c.operator_1_create.Example14_6 INFO - # onNext defer2: 2023-07-08T16:49:29.719996
justMono 는 Hot Publisher 이기 때문에 Subscriber 의 구독 여부와 상관없이 데이터를 emit 하게 됩니다
그리고 구독이 발생하면 emit 된 데이터를 다시 replay 해서 Subscriber 에게 전달합니다.
그래서 just1 과 just2 모두 just1: 2023-07-08T16:49:25 가 출력 되었습니다.
하지만 defer() operator 는 구독이 발생하기 전까지 데이터의 emit 을 지연시키기 때문에, 실제 구독이 발생해야 데이터를 emit 합니다.
그래서 sleep 2초 씩이 적용되어
defer1: 2023-07-08T16:49:27
defer2: 2023-07-08T16:49:29가 출력되었습니다
public static void main(String[] args) throws InterruptedException { log.info("# start: {}", LocalDateTime.now()); Mono .just("Hello") .delayElement(Duration.ofSeconds(3)) .switchIfEmpty(sayDefault()) // .switchIfEmpty(Mono.defer(() -> sayDefault())) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(3500); } private static Mono<String> sayDefault() { log.info("# Say Hi"); return Mono.just("Hi"); } //result > Task :Example14_7.main() 17:05:30.725 [main] c.operator_1_create.Example14_7 INFO - # start: 2023-07-08T17:05:30.724267 17:05:30.783 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 17:05:30.791 [main] c.operator_1_create.Example14_7 INFO - # Say Hi 17:05:33.806 [parallel-1] c.operator_1_create.Example14_7 INFO - # onNext: Hello
#1 defer() 가 주석된 상태
얼핏 보면 Upstream 쪽에서 Hello 문자열이 emit 되기 때문에 sayDefault() 메서드가 호출되지 않을 것 같지만,
코드를 실행하면 sayDefault() 메서드가 호출되어 "Say Hi" 가 출력됩니다
결과적으로 메서드가 불필요하게 호출되는 문제가 발생하게 됩니다
; 데이터는 emit 되었지만, 구독이 발생하기 전상태여서, switchIfEmtpy = true
#2 defer() 사용
public static void main(String[] args) throws InterruptedException { log.info("# start: {}", LocalDateTime.now()); Mono .just("Hello") .delayElement(Duration.ofSeconds(3)) // .switchIfEmpty(sayDefault()) .switchIfEmpty(Mono.defer(() -> sayDefault())) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(3500); } private static Mono<String> sayDefault() { log.info("# Say Hi"); return Mono.just("Hi"); } // result > Task :Example14_7.main() 17:11:49.839 [main] c.operator_1_create.Example14_7 INFO - # start: 2023-07-08T17:11:49.838073 17:11:49.902 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 17:11:52.925 [parallel-1] c.operator_1_create.Example14_7 INFO - # onNext: Hello
sayDefault() 메서드를 defer() 로 감싸기 때문에 Upstream 에서 emit 되는 데이터가 있어서, sayDefault() 메서드는 호출되지 않습니다
6) using
using() operator 는 파라미터로 전달받은 resource 를 emit 하는 Flux 를 생성합니다
첫 번째 파라미터는 읽어 올 resource 이고,
두 번째 파라미터는 읽어 온 resource 를 emit 하는 Flux 입니다
마지막 세 번째 파라미터는 종료 Signal(onComplete 또는 onError) 이 발생할 경우, resource 를 해제하는 등의 후처리를 할 수 있게 해 줍니다.
코드 14-8 using 예제
public static void main(String[] args) { //Path path = Paths.get("D:\\resources\\using_example.txt"); Path path = Paths.get("part2/src/main/resources/using_example.txt"); Flux .using(() -> Files.lines(path), Flux::fromStream, Stream::close) .subscribe(log::info); } //result > Task :Example14_8.main() 18:07:38.851 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 18:07:38.868 [main] c.operator_1_create.Example14_8 INFO - Hello, world! 18:07:38.868 [main] c.operator_1_create.Example14_8 INFO - Nice to meet you! 18:07:38.868 [main] c.operator_1_create.Example14_8 INFO - Good bye~
using(() -> Files.lines(path), Flux::fromStream, Stream::close)
- 1st Files.lines(path) 는 읽어올 resource 이고
- 2nd Flux::fromStream 는 emit 하는 flux 입니다
- 3rd Stream::close 는 후처리 입니다
File 을 한 라인씩 읽어와서, Stream<String> 으로 변경하고
subscribe(log::info) 를 통해 출력합니다
참고
저자께서 windows os 를 사용하셔서, mac 에서 돌리기위해,
아래 스샷처럼 resource 폴더에 using_example.txt 파일을 만들고, 아래의 데이터 만들었습니다
Hello, world!
Nice to meet you!
Good bye~7) generate
generator() Operator 는 프로그래밍 방식으로 Signal 이벤트를 발생시키며, 특히 동기적으로 데이터를 하나씩 순차적으로 emit 하고자 할 경우 사용됩니다
- 마블 다이어그램에서 generate() 의 첫번째 파라미터는 emit 할 숫자의 초깃값을 지정합니다.
숫자 0 옆에 붙은 +/- 기호는 초깃값으로 음수도 가능하고 양수도 가능하다는 의미입니다 - 두 번째 파라미터의 람다 표현식에 보이는 V는 State입니다
generate() Operator 는 초깃값으로 지정한 숫자부터 emit 하고 emit 한 숫자를 1씩 증가시켜서 다시 emit 하는 작업을 반복하는데, 이렇게 1씩 증가하는 숫자를 상태값으로 정의합니다
따라서 V옆에 붙은 +/- 기호 역시 초깃값 이후부터 1씩 증가한 숫자가 음수나 양수가 될 수 있다는 것을 의미합니다.
그리고 return 옆의 V에 붙는 +는 1씩 증가시킨다는 의미 입니다
Tip
generate() 의 마블 다이어그램상에는 상태값이 숫자로 표현되지만 숫자 이외에 객쳐여도 상관없습니다
다만 상태 값으로 표현되는 객체 내부에 1씩 증가하는 숫자를 포함하고 있어야 해당 숫자값을 조건으로 지정해서 onComplete Signal 을 발생시킬 수 있습니다.코드 14-9 generate 예제
public static void main(String[] args) { Flux .generate(() -> 0, (state, sink) -> { sink.next(state); if (state == 10) { sink.complete(); } return ++state; }) .subscribe(data -> log.info("# onNext: {}", data)); } //result > Task :Example14_9.main() 18:33:35.162 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 18:33:35.169 [main] c.operator_1_create.Example14_9 INFO - # onNext: 0 18:33:35.171 [main] c.operator_1_create.Example14_9 INFO - # onNext: 1 18:33:35.171 [main] c.operator_1_create.Example14_9 INFO - # onNext: 2 18:33:35.171 [main] c.operator_1_create.Example14_9 INFO - # onNext: 3 18:33:35.171 [main] c.operator_1_create.Example14_9 INFO - # onNext: 4 18:33:35.171 [main] c.operator_1_create.Example14_9 INFO - # onNext: 5 18:33:35.171 [main] c.operator_1_create.Example14_9 INFO - # onNext: 6 18:33:35.171 [main] c.operator_1_create.Example14_9 INFO - # onNext: 7 18:33:35.172 [main] c.operator_1_create.Example14_9 INFO - # onNext: 8 18:33:35.172 [main] c.operator_1_create.Example14_9 INFO - # onNext: 9 18:33:35.172 [main] c.operator_1_create.Example14_9 INFO - # onNext: 10
generate() 의 첫 번째 파라미터에서 초깃값을 0으로 지정했으며,
두 번째 파라미터에서 전달받은 SynchronousSink 객체로 상태 값을 emit 합니다
SynchronousSink 는 하나의 Signal 만 동기적으로 발생시킬 수 있으며 최대 하나의 상태 값만 emit 하는 인터페이스입니다
3번째 후처리 인자가 있는 generate
코드 14-10 generate 예제 2
구구단 3단 출력
public static void main(String[] args) { final int dan = 3; Flux .generate(() -> Tuples.of(dan, 1), (state, sink) -> { sink.next(state.getT1() + " * " + state.getT2() + " = " + state.getT1() * state.getT2()); if (state.getT2() == 9) { sink.complete(); } return Tuples.of(state.getT1(), state.getT2() + 1); }, state -> log.info("# 구구단 {}단 종료!", state.getT1())) .subscribe(data -> log.info("# onNext: {}", data)); } // result > Task :Example14_10.main() 13:43:12.201 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 13:43:12.220 [main] c.operator_1_create.Example14_10 INFO - # onNext: 3 * 1 = 3 13:43:12.223 [main] c.operator_1_create.Example14_10 INFO - # onNext: 3 * 2 = 6 13:43:12.223 [main] c.operator_1_create.Example14_10 INFO - # onNext: 3 * 3 = 9 13:43:12.223 [main] c.operator_1_create.Example14_10 INFO - # onNext: 3 * 4 = 12 13:43:12.223 [main] c.operator_1_create.Example14_10 INFO - # onNext: 3 * 5 = 15 13:43:12.223 [main] c.operator_1_create.Example14_10 INFO - # onNext: 3 * 6 = 18 13:43:12.224 [main] c.operator_1_create.Example14_10 INFO - # onNext: 3 * 7 = 21 13:43:12.224 [main] c.operator_1_create.Example14_10 INFO - # onNext: 3 * 8 = 24 13:43:12.224 [main] c.operator_1_create.Example14_10 INFO - # onNext: 3 * 9 = 27 13:43:12.225 [main] c.operator_1_create.Example14_10 INFO - # 구구단 3단 종료!
숫자가 아닌 Tuplues 객체를 상태 값으로 사용했기 때문에 generate()의 첫 번째 파라미터인 Callable 의 리턴 값고 두 번째 파라미터인 BiFuction 의 리턴 값이 모두 Tuples 객체입니다
Tuples.of(dan,1) 에서 두 번째 값을 1씩 증가시켜 주고
if (state.getT2() == 9) 조건으로 sink.complete() 을 발생시켰습니다.
그리고 Sequence 가 모두 종료되면 generate() 의 세 번째 파라미터인 Consumer 를 통해 후처리 로그
"# 구구단 {}단 종료!"
를 출력합니다
코드 14-11 generate 예제 3
2019~2021 까지, 연도별 BTC 최고가를 출력하는 예제
public static void main(String[] args) { Map<Integer, Tuple2<Integer, Long>> map = SampleData.getBtcTopPricesPerYearMap(); Flux .generate(() -> 2019, (state, sink) -> { if (state > 2021) { sink.complete(); } else { sink.next(map.get(state)); } return ++state; }) .subscribe(data -> log.info("# onNext: {}", data)); } public static Map<Integer, Tuple2<Integer, Long>> getBtcTopPricesPerYearMap() { return btcTopPricesPerYear .stream() .collect(Collectors.toMap(t1 -> t1.getT1(), t2 -> t2)); } public static final List<Tuple2<Integer, Long>> btcTopPricesPerYear = Arrays.asList( Tuples.of(2010, 565L), Tuples.of(2011, 36_094L), Tuples.of(2012, 17_425L), Tuples.of(2013, 1_405_209L), Tuples.of(2014, 1_237_182L), Tuples.of(2015, 557_603L), Tuples.of(2016, 1_111_811L), Tuples.of(2017, 22_483_583L), Tuples.of(2018, 19_521_543L), Tuples.of(2019, 15_761_568L), Tuples.of(2020, 22_439_002L), Tuples.of(2021, 63_364_000L) ); // result > Task :Example14_11.main() 14:21:27.776 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 14:21:27.905 [main] c.operator_1_create.Example14_11 INFO - # onNext: [2019,15761568] 14:21:27.907 [main] c.operator_1_create.Example14_11 INFO - # onNext: [2020,22439002] 14:21:27.907 [main] c.operator_1_create.Example14_11 INFO - # onNext: [2021,63364000]
generate() 의 초기 상태 값을 2019로 지정해서 1씩 증가한 연도별 BTC 최고가 금액을 map 에서 읽어 온 후, emit 합니다
8) create
create() operator 는 generate() 처럼 프로그래밍 방식으로 Signal 이벤트를 발생시키지만 generate() 와 몇 가지 차이점이 있습니다
generate() 는 데이터를 동기적으로 한 번에 한 건씩 emit 할 수 있는 반면에, create() 는 한 번에 여러 건의 데이터를 비동기적으로 emit 할 수 있습니다.
코드 14-12 create 예제 1 (pull 방식)
1,2,3 ~10 을 2개씩 요청하여, emit 하는 예제
static int SIZE = 0; static int COUNT = -1; // 시작 final static List<Integer> DATA_SOURCE = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); public static void main(String[] args) { log.info("# start"); Flux.create((FluxSink<Integer> sink) -> { sink.onRequest(n -> { try { Thread.sleep(1000L); for (int i = 0; i < n; i++) { if (COUNT >= 9) { sink.complete(); } else { COUNT++; sink.next(DATA_SOURCE.get(COUNT)); } } } catch (InterruptedException e) { } }); sink.onDispose(() -> log.info("# clean up")); }).subscribe(new BaseSubscriber<>() { @Override protected void hookOnSubscribe(Subscription subscription) { request(2); } @Override protected void hookOnNext(Integer value) { SIZE++; log.info("# onNext: {}", value); if (SIZE == 2) { request(2); SIZE = 0; } } @Override protected void hookOnComplete() { log.info("# onComplete"); } }); } // result > Task :Example14_12.main() 14:31:47.369 [main] c.operator_1_create.Example14_12 INFO - # start 14:31:47.475 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 14:31:48.488 [main] c.operator_1_create.Example14_12 INFO - # onNext: 1 14:31:48.491 [main] c.operator_1_create.Example14_12 INFO - # onNext: 2 14:31:49.491 [main] c.operator_1_create.Example14_12 INFO - # onNext: 3 14:31:49.492 [main] c.operator_1_create.Example14_12 INFO - # onNext: 4 14:31:50.495 [main] c.operator_1_create.Example14_12 INFO - # onNext: 5 14:31:50.495 [main] c.operator_1_create.Example14_12 INFO - # onNext: 6 14:31:51.498 [main] c.operator_1_create.Example14_12 INFO - # onNext: 7 14:31:51.499 [main] c.operator_1_create.Example14_12 INFO - # onNext: 8 14:31:52.501 [main] c.operator_1_create.Example14_12 INFO - # onNext: 9 14:31:52.502 [main] c.operator_1_create.Example14_12 INFO - # onNext: 10 14:31:53.503 [main] c.operator_1_create.Example14_12 INFO - # onComplete 14:31:53.506 [main] c.operator_1_create.Example14_12 INFO - # clean up
코드 14-12는 Subscriber 에서 요청한 경우에만(?) create() 내에서 요청 개수만큼의 데이터를 emit 하는 예제 코드 입니다
코드 이해를 위한 Tips
i. 한 번에 2개씩의 데이터를 요청하기 위해, 아래 구문을 이용하였습니다.
SIZE++
if ( SIZE == 2) {
request(2);
SIZE=0
}
ii. static int COUNT = -1;
으로 -1 부터 시작하지만
} else {
COUNT++;
sink.next(DATA_SOURCE.get(COUNT));
}
COUNT++ 을 먼저 하기 때문에 index 0 의 데이터 부터 읽어 들입니다.List<Integer> DATA_SOURCE = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);를 데이터소스로 이용했습니다 그리고, Subscriber 가 직접 요청 개수를 지정하기 위해서 BaseSubscriber를 사용했습니다
01. 구독이 발생하면 BaseSubscriber 의 hookOnSubcriber() 메서드 내부에서 request(2) 를 호출하여 한 번에 두 개의 데이터를 요청합니다
02. Subscriber 쪽에서 request() 메서드를 호출하면 create() 내부에서 sink.onRequest(n -> { ... }) 메서드의 람다 표현식이 실행됩니다
03. Subscriber 가 요청한 개수만큼 데이터를 emit 합니다sink.next(DATA_SOURCE.get(COUNT));
04. BaseSubscriber 의 hookOnNext() 메서드 내부에서 emit 된 데이터를 로그로 출력한 후 다시 request(2) 를 호출하여 두 개의 데이터를 요청합니다
05. 2~4의 과정이 반복되다가 dataSource List 의 숫자를 모두 emit 하면 onComplete Signal 을 발생시킵니다if (COUNT >= 9) {
sink.complete();06. BaseSubscriber 의 hookOnComplete() 메서드 내부에서 종료 로그를 출력합니다
protected void hookOnComplete() {
log.info("# onComplete");
}07. sink.onDispose() 의 람다 표현식이 실행되어 후처리 로그를 출력합니다
sink.onDispose(() -> log.info("# clean up"));
FluxSink 의 onDispose() 메서드에서 dispose 의 의미는 FluxSink 관점에서 FluxSink가 더이상 사용되지 않는다는 의미 입니다
구독을 취소하기 위해 사용되는 Disposable.dispose() 메서드의 dispose 와 의미는 같지만, dispose 하려는 대상이 다르다는 것을 참고하기 바랍니다.코드 14-12의 create() 의 경우 Subscriber 가 request() 메서드를 통해 요청을 보내면 Publisher 가 해당 요청 개수만큼의 데이터를 emit 하는 일종의 pull 방식으로 데이터를 처리합니다
그런데 create() operator 는 Subscriber 의 요청과 상관없이 비동기적으로 데이터를 emit 하는 push 방식 역시 사용할 수 있습니다.
코드 14-13 create 예제 2 (push 방식)
암호 화폐의 가격 변동이 있을 때마다 변동되는 가격 데이터를 Subscriber 에게 비동기적으로 emit 하는 상황을 시뮬레이션한 예제 코드
public static void main(String[] args) throws InterruptedException { CryptoCurrencyPriceEmitter priceEmitter = new CryptoCurrencyPriceEmitter(); Flux.create((FluxSink<Integer> sink) -> priceEmitter.setListener(new CryptoCurrencyPriceListener() { @Override public void onPrice(List<Integer> priceList) { priceList.stream().forEach(price -> { sink.next(price); }); } @Override public void onComplete() { sink.complete(); } })) .publishOn(Schedulers.parallel()) .subscribe( data -> log.info("# onNext: {}", data), error -> { }, () -> log.info("# onComplete")); Thread.sleep(3000L); priceEmitter.flowInto(); Thread.sleep(2000L); priceEmitter.complete(); } // result > Task :Example14_13.main() 15:20:59.423 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 15:21:02.495 [parallel-1] c.operator_1_create.Example14_13 INFO - # onNext: 50000000 15:21:02.507 [parallel-1] c.operator_1_create.Example14_13 INFO - # onNext: 50100000 15:21:02.508 [parallel-1] c.operator_1_create.Example14_13 INFO - # onNext: 50700000 15:21:02.508 [parallel-1] c.operator_1_create.Example14_13 INFO - # onNext: 51500000 15:21:02.508 [parallel-1] c.operator_1_create.Example14_13 INFO - # onNext: 52000000
코드 14-13에서 사용하는 CryptoCurrencyPriceEmitter 와 SampleData.btcPrices
public class CryptoCurrencyPriceEmitter { private CryptoCurrencyPriceListener listener; public void setListener(CryptoCurrencyPriceListener listener) { this.listener = listener; } public void flowInto() { listener.onPrice(SampleData.btcPrices); } public void complete() { listener.onComplete(); } } public class SampleData { public static final List<Integer> btcPrices = Arrays.asList(50_000_000, 50_100_000, 50_700_000, 51_500_000, 52_000_000); } public interface CryptoCurrencyPriceListener { void onPrice(List<Integer> priceList); void onComplete(); }
동작과정
01. 구독이 발생하면 create() operator 의 파라미터인 람다 표현식이 실행됩니다
02. 암호 화폐의 가격 변동이 발생하면 변동된 가격 데이터를 emit 할 수 있도록 CryptoCurrencyPriceEmitter 가 CryptoCurrencyPriceListener 를 Listener 로 등록합니다
priceEmitter.setListener(new CryptoCurrencyPriceListener() {
@Override
public void onPrice(List<Integer> priceList) {
priceList.stream().forEach(price -> {
sink.next(price);
});
}
@Override
public void onComplete() {
sink.complete();
}
}))이제 암호 화폐의 가격 변동이 있을 때마다 CryptoCurrencyPriceListener 클래스의 onPrice() 메서드가 호출됩니다
03. 암호 화폐의 가격 변동이 발생하기 전에 3초의 지연 시간을 가집니다
Thread.sleep(3000L);
04.암호 화폐의 가격 변동이 발생하는 것을 시뮬레이션하기 위해 CryptoCurrencyPriceEmitter 의 flowInto() 메서드를 호출합니다
priceEmitter.flowInto();
05. 2초의 지연 시간을 가진 후 해당 암호 화폐의 데이터 처리를 종료합니다
코드 14-13의 create() operator 의 경우, Subscriber 가 요청을 보내는 것과 상관없이 Listener 를 통해 들어오는 데이터를 리스닝하고 있다가 실제로 들어오는 데이터가 있을 경우에만 데이터를 emit 하는 일종의 push 방식으로 데이터를 처리합니다
즉, Subscriber 에서 데이터를 지속적으로 요청하지 않지만,
create(FluxSink -> {}) 가 listner 를 등록하고, 그로 부터 데이터를 받자마자 emit 하는 방식입니다.
create() + backpressure
코드 14-14 create 예제 3 (Backpressure DROP)
static int start = 1; static int end = 4; public static void main(String[] args) throws InterruptedException { Flux.create((FluxSink<Integer> emitter) -> { emitter.onRequest(n -> { log.info("# requested: " + n); try { Thread.sleep(500L); for (int i = start; i <= end; i++) { emitter.next(i); } start += 4; end += 4; } catch (InterruptedException e) { } }); emitter.onDispose(() -> { log.info("# clean up"); }); }, FluxSink.OverflowStrategy.DROP) .subscribeOn(Schedulers.boundedElastic()) .publishOn(Schedulers.parallel(), 2) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(3000L); } // result > Task :Example14_14.main() 15:44:11.937 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 15:44:11.978 [boundedElastic-1] c.operator_1_create.Example14_14 INFO - # requested: 2 15:44:12.482 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 1 15:44:12.486 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 2 15:44:12.488 [boundedElastic-1] c.operator_1_create.Example14_14 INFO - # requested: 2 15:44:12.988 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 5 15:44:12.989 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 6 15:44:12.989 [boundedElastic-1] c.operator_1_create.Example14_14 INFO - # requested: 2 15:44:13.493 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 9 15:44:13.494 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 10 15:44:13.494 [boundedElastic-1] c.operator_1_create.Example14_14 INFO - # requested: 2 15:44:13.999 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 13 15:44:13.999 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 14 15:44:13.999 [boundedElastic-1] c.operator_1_create.Example14_14 INFO - # requested: 2 15:44:14.503 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 17 15:44:14.503 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 18 15:44:14.503 [boundedElastic-1] c.operator_1_create.Example14_14 INFO - # requested: 2
onRequest
subscribeOn : 구독이 발생한 직후 실행될 스레드를 지정하는 operator
publishOn 을 기준으로 아래쪽인 downstream 의 실행 스레드를 변경합니다코드 14-14는 create() Operator 에 Backpressure 전략을 지정하는 예제 코드 입니다
예제 코드에서는 Backpressure 전략 중에서 DROP 전략을 지정했습니다
FluxSink.OverflowStrategy.DROP
그리고, publishOn() 으로 Scheduler 를 설정하면서 prefetch 수를 2로 지정한뒤에
publishOn(Schedulers.parallel(), 2)
start 와 end 변수를 사용해서 매번 4개의 데이터를 emit 하게 함으로써 코드 실행 결과에서 Backpressure DROP 전략이 적용되는 것을 조금 더 쉽게 확인할 수 있도록 했습니다
동작과정
01 구독이 발생하면 publishOn() 에서 설정한 숫자만큼의 데이터를 요청합니다
publishOn(Schedulers.parallel(), 2)
02 create() 내부에서 sink.onRequest() 메서드의 람다 표현식이 실행됩니다
03 요청한 개수보다 2개 더 많은 4개의 데이터를 emit 합니다
static int start = 1;
static int end = 4;for (int i = start; i <= end; i++) {
emitter.next(i);
}04 Subscriber 가 emit 된 데이터를 전달받아 로그로 출력합니다
.subscribe(data -> log.info("# onNext: {}", data));
05 이때 Backpressure DROP 전략이 적용되었으므로, 요청 개수를 초과하여 emit 된 3,4 데이터는 DROP 됩니다
06 다시 publishOn() 에서 지정한 숫자만큼의 데이터를 요청합니다
07 create() Operator 내부에서 onComplete Signal 이 발생하지 않았기 때문에 2~6의 과정을 반복하다가 설정한 지연 시간이 지나면 main 스레드가 종료되어 코드 실행이 종료됩니다
Thread.sleep(3000L);
실행 결과를 보면 Downstream 에서 매번 두 개의 데이터를 요청하고 있는데,
create() operator 내부에서는 네 4개의 데이터를 emit 하기 때문에 emit 된 처음 두 개의 데이터는 정상적으로 emit 되고, 나머지 두 개의 데이터는 DROP 되는 것을 알 수 있습니다
Tip, log 와 onBackpressureDrop 을 추가하면, drop 되는 데이터 확인할 수 있습니다
근데, 특이하게 requested: 2 -> 9223372036854775807 로 늘어나면서 제대로 실행이 안되는데요. 사유를 잘 모르겠습니다}, FluxSink.OverflowStrategy.DROP) .log() .onBackpressureDrop(dropped -> log.info("# dropped: {}", dropped)) // changed result > Task :Example14_14.main() 16:26:48.500 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 16:26:48.537 [boundedElastic-1] reactor.Flux.Create.1 INFO - onSubscribe(FluxCreate.DropAsyncSink) 16:26:48.539 [boundedElastic-1] reactor.Flux.Create.1 INFO - request(unbounded) 16:26:48.546 [boundedElastic-1] c.operator_1_create.Example14_14 INFO - # requested: 9223372036854775807 16:26:49.050 [boundedElastic-1] reactor.Flux.Create.1 INFO - onNext(1) 16:26:49.051 [boundedElastic-1] reactor.Flux.Create.1 INFO - onNext(2) 16:26:49.051 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 1 16:26:49.051 [boundedElastic-1] reactor.Flux.Create.1 INFO - onNext(3) 16:26:49.051 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 2 16:26:49.051 [boundedElastic-1] c.operator_1_create.Example14_14 INFO - # dropped: 3 16:26:49.052 [boundedElastic-1] reactor.Flux.Create.1 INFO - onNext(4) 16:26:49.052 [boundedElastic-1] c.operator_1_create.Example14_14 INFO - # dropped: 4
기억하세요
- just() Operator 는 Hot Publisher 이기 때문에 Subscriber 의 구독 여부와는 상관없이 데이터를 emit 하며, 구독이 발생하면 emit 된 데이터를 다시 재생(replay) 해서 Subscriber 에게 전달합니다.
- defer() Operator 는 구독이 발생하기 전까지는 데이터의 emit 을 지연시키기 때문에 just() Operator 를 defer() 로 감싸게 되면 실제 구독이 발생해야 데이터를 emit 합니다.
- using() Operator 는 파라미터로 전달받은 resource 를 emit 하는 Flux 를 생성합니다.
- generator() Operator 는 프로그래밍 방식으로 Signal 이벤트를 발생시키며, 동기적으로 데이터를 하나씩 순차적으로 emit 합니다.
- create() Operator 는 generator() Operator 와 마찬가지로 프로그래밍 방식으로 Signal 이벤트를 발생시키지만 generate() Operator와 달리 한 번에 여러 건의 데이터를 비동기적으로 emit 할 수 있다.
'Spring > Webflux' 카테고리의 다른 글
14장 Operator 3 - transformation (0) 2023.07.16 14장 Operator 2 - filter (0) 2023.07.09 13장 Testing (0) 2023.05.27 12장 Debugging (0) 2023.05.21 11장 Context (0) 2023.05.12