ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 14장 Operator 1 - Sequence 생성
    Spring/Webflux 2023. 7. 8. 14:04

    github code : https://github.com/bjpublic/Spring-Reactive/tree/main/part2/src/main/java/chapter14/operator_1_create

     

    14.1 Operator 란 

    리액티브 프로그래밍은 operator 로 시작해서 operator 로 끝난다라고 해도 과언이 아닐정도로 가장 중요한 구성요소 입니다

    Chapter 14 의 내용 구성방식
    operator 의 
    1. 마블 다이어그램
    2. 기본 예제
    3. 실무 활용 예제

    14.2 Sequence 생성을 위한 Operator

    1) justOrEmpty

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#justOrEmpty-java.util.Optional-

    justOrEmpty() 는 just() 의 확장 operator 로서, 

    emit 할 데이터가 null 일 경우 NullPointExpection 이 발생하지 않고 onComplete Signal 을 전송합니다

    그리고 null 이 아닌 데이터일 경우 해당 데이터를 emit 하는 Mono 를 생성합니다

     

    코드 14-1 justOrEmpty 예제 

    public static void main(String[] args) {
        Mono
            .justOrEmpty(null)
            .subscribe(data -> {},
                    error -> {},
                    () -> log.info("# onComplete"));
    }
    
    // result
    > Task :Example14_1.main()
    14:50:51.591 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    14:50:51.597 [main] c.operator_1_create.Example14_1 INFO - # onComplete

    justOrEmtpy(null) 이 전송되어도, 오류 발생하지 않고 # onComplete Signal 이 전송되었습니다

     

    2) fromIterable

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#fromIterable-java.lang.Iterable-

    fromIterable() operator 는 Iterable 에 포함된 데이터를 emit 하는 Flux 를 생성합니다.

    즉, Java 에서 제공하는 Iterable 을 구현한 구현체를 fromIterable() 의 파라미터로 전달할 수 있습니다

     

    코드 14-2 fromIterable 예제

    public static void main(String[] args) {
        Flux
                .fromIterable(SampleData.coins)
                .subscribe(coin ->
                        log.info("coin 명: {}, 현재가: {}", coin.getT1(), coin.getT2())
                );
    }
    
    // SampleData.coins
    public class SampleData {
      public static final List<Tuple2<String, Integer>> coins =
        Arrays.asList(
          Tuples.of("BTC", 52_000_000),
          Tuples.of("ETH", 1_720_000),
          Tuples.of("XRP", 533),
          Tuples.of("ICX", 2_080),
          Tuples.of("EOS", 4_020),
          Tuples.of("BCH", 558_000));
    }
    
    // result
    > Task :Example14_2.main()
    16:12:32.628 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    16:12:32.734 [main] c.operator_1_create.Example14_2 INFO - coin 명: BTC, 현재가: 52000000
    16:12:32.739 [main] c.operator_1_create.Example14_2 INFO - coin 명: ETH, 현재가: 1720000
    16:12:32.739 [main] c.operator_1_create.Example14_2 INFO - coin 명: XRP, 현재가: 533
    16:12:32.740 [main] c.operator_1_create.Example14_2 INFO - coin 명: ICX, 현재가: 2080
    16:12:32.740 [main] c.operator_1_create.Example14_2 INFO - coin 명: EOS, 현재가: 4020
    16:12:32.740 [main] c.operator_1_create.Example14_2 INFO - coin 명: BCH, 현재가: 558000



    3) fromStream

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#fromStream-java.util.stream.Stream-

    fromStream() operator 는 Stream 에 포함된 데이터를 emit 하는 Flux를 생성합니다.

    Java 의 Stream 특성상 Stream 은 재사용할 수 없으며, cancel, error, complete 시에 자동으로 닫히게 됩니다

     

    코드14-3 fromStream 예제

    public static void main(String[] args) {
        Flux
            .fromStream(() -> SampleData.coinNames.stream())
            .filter(coin -> coin.equals("BTC") || coin.equals("ETH"))
            .subscribe(data -> log.info("{}", data));
    }
    
    // coinNames
    public class SampleData {
    	public static final List<String> coinNames =
    			Arrays.asList("BTC", "ETH", "XRP", "ICX", "EOS", "BCH");
    }
    
    // result
    > Task :Example14_3.main()
    16:27:35.895 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    16:27:35.948 [main] c.operator_1_create.Example14_3 INFO - BTC
    16:27:35.949 [main] c.operator_1_create.Example14_3 INFO - ETH

     

    4) range

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#fromIterable-java.lang.Iterable-

    range() operator 는 n 부터 1씩 증가한 연속된 수를 m 개 emit 하는 Flux 를 생성합니다.

    range() operator 는 명령형 언어의 for 문처럼 특정 횟수만큼 어떤 작업을 처리하고자 할 경우에 주로 사용됩니다

    코드 14-4 range 예제

    public static void main(String[] args) {
    	Flux
    			.range(5, 10)
    			.subscribe(data -> log.info("{}", data));
    }
    
    // result
    > Task :Example14_4.main()
    16:37:15.051 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    16:37:15.060 [main] c.operator_1_create.Example14_4 INFO - 5
    16:37:15.061 [main] c.operator_1_create.Example14_4 INFO - 6
    16:37:15.061 [main] c.operator_1_create.Example14_4 INFO - 7
    16:37:15.062 [main] c.operator_1_create.Example14_4 INFO - 8
    16:37:15.062 [main] c.operator_1_create.Example14_4 INFO - 9
    16:37:15.062 [main] c.operator_1_create.Example14_4 INFO - 10
    16:37:15.062 [main] c.operator_1_create.Example14_4 INFO - 11
    16:37:15.062 [main] c.operator_1_create.Example14_4 INFO - 12
    16:37:15.062 [main] c.operator_1_create.Example14_4 INFO - 13
    16:37:15.062 [main] c.operator_1_create.Example14_4 INFO - 14

     

    코드 14-5 range 예제 2, range & map

    public static void main(String[] args) {
      Flux
        .range(7, 5)
        .map(idx -> SampleData.btcTopPricesPerYear.get(idx))
        .subscribe(tuple -> log.info("{}'s {}", tuple.getT1(), tuple.getT2()));
    }
    
    public class SampleData {
    	public static final List<Tuple2<Integer, Long>> btcTopPricesPerYear =
    			Arrays.asList(
    					Tuples.of(2010, 565L),
    					Tuples.of(2011, 36_094L),
    					Tuples.of(2012, 17_425L),
    					Tuples.of(2013, 1_405_209L),
    					Tuples.of(2014, 1_237_182L),
    					Tuples.of(2015, 557_603L),
    					Tuples.of(2016, 1_111_811L),
    					Tuples.of(2017, 22_483_583L),
    					Tuples.of(2018, 19_521_543L),
    					Tuples.of(2019, 15_761_568L),
    					Tuples.of(2020, 22_439_002L),
    					Tuples.of(2021, 63_364_000L)
    			);
    }
    // result
    > Task :Example14_5.main()
    16:39:39.925 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    16:39:39.979 [main] c.operator_1_create.Example14_5 INFO - 2017's 22483583
    16:39:39.980 [main] c.operator_1_create.Example14_5 INFO - 2018's 19521543
    16:39:39.981 [main] c.operator_1_create.Example14_5 INFO - 2019's 15761568
    16:39:39.981 [main] c.operator_1_create.Example14_5 INFO - 2020's 22439002
    16:39:39.981 [main] c.operator_1_create.Example14_5 INFO - 2021's 63364000

    range() operator 에서 7 부터 5개의 숫자를 emit

    map() 에서 emit 된 숫자를 btcTopPricesPerYear 리스트의 index 로 사용하여, 해당 연도별 btc 최고가 정보를 가지는 튜플을  subscriber 에게 전달합니다

     

    5) defer

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#defer-java.util.function.Supplier-

    defer() operator 는 operator 를 선언한 시점에 데이터를 emit 하는 것이 아니라

    구독하는 시점에 데이터를 emit 하는 Flux 또는 Mono 를 생성합니다

    defer() 는 데이터 emit 을 지연시키기 때문에 꼭 필요한 시점에 데이터를 emit 하여 불필요한 프로세스를 줄일 수 있습니다

     

    코드 14-6 defer 예제,

    LocalDateTime.now() 를 이용한 justMono 와 deferMono 비교

    public static void main(String[] args) throws InterruptedException {
        log.info("# start: {}", LocalDateTime.now());
        Mono<LocalDateTime> justMono = Mono.just(LocalDateTime.now());
        Mono<LocalDateTime> deferMono = Mono.defer(() ->
                Mono.just(LocalDateTime.now()));
    
        Thread.sleep(2000);
    
        justMono.subscribe(data -> log.info("# onNext just1: {}", data));
        deferMono.subscribe(data -> log.info("# onNext defer1: {}", data));
    
        Thread.sleep(2000);
    
        justMono.subscribe(data -> log.info("# onNext just2: {}", data));
        deferMono.subscribe(data -> log.info("# onNext defer2: {}", data));
    }
    
    // result
    > Task :Example14_6.main()
    16:49:25.646 [main] c.operator_1_create.Example14_6 INFO - # start: 2023-07-08T16:49:25.644913
    16:49:25.703 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    16:49:27.717 [main] c.operator_1_create.Example14_6 INFO - # onNext just1: 2023-07-08T16:49:25.649058
    16:49:27.718 [main] c.operator_1_create.Example14_6 INFO - # onNext defer1: 2023-07-08T16:49:27.718505
    16:49:29.719 [main] c.operator_1_create.Example14_6 INFO - # onNext just2: 2023-07-08T16:49:25.649058
    16:49:29.720 [main] c.operator_1_create.Example14_6 INFO - # onNext defer2: 2023-07-08T16:49:29.719996

    justMono 는 Hot Publisher 이기 때문에 Subscriber 의 구독 여부와 상관없이 데이터를 emit 하게 됩니다

    그리고 구독이 발생하면 emit 된 데이터를 다시 replay 해서 Subscriber 에게 전달합니다.

    그래서 just1 과 just2 모두 just1: 2023-07-08T16:49:25 가 출력 되었습니다.

     

    하지만 defer() operator 는 구독이 발생하기 전까지 데이터의 emit 을 지연시키기 때문에, 실제 구독이 발생해야 데이터를 emit 합니다.

    그래서 sleep 2초 씩이 적용되어

    defer1: 2023-07-08T16:49:27
    defer2: 2023-07-08T16:49:29

    가 출력되었습니다

     

    public static void main(String[] args) throws InterruptedException {
      log.info("# start: {}", LocalDateTime.now());
      Mono
          .just("Hello")
          .delayElement(Duration.ofSeconds(3))
          .switchIfEmpty(sayDefault())
    //            .switchIfEmpty(Mono.defer(() -> sayDefault()))
          .subscribe(data -> log.info("# onNext: {}", data));
    
      Thread.sleep(3500);
    }
    
    private static Mono<String> sayDefault() {
      log.info("# Say Hi");
      return Mono.just("Hi");
    }
    
    //result
    > Task :Example14_7.main()
    17:05:30.725 [main] c.operator_1_create.Example14_7 INFO - # start: 2023-07-08T17:05:30.724267
    17:05:30.783 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    17:05:30.791 [main] c.operator_1_create.Example14_7 INFO - # Say Hi
    17:05:33.806 [parallel-1] c.operator_1_create.Example14_7 INFO - # onNext: Hello

    #1 defer() 가 주석된 상태

    얼핏 보면 Upstream 쪽에서 Hello 문자열이 emit 되기 때문에 sayDefault() 메서드가 호출되지 않을 것 같지만,

    코드를 실행하면 sayDefault()  메서드가 호출되어 "Say Hi" 가 출력됩니다

    결과적으로 메서드가 불필요하게 호출되는 문제가 발생하게 됩니다

    ; 데이터는 emit 되었지만, 구독이 발생하기 전상태여서, switchIfEmtpy = true

     

    #2 defer() 사용

    public static void main(String[] args) throws InterruptedException {
      log.info("# start: {}", LocalDateTime.now());
      Mono
          .just("Hello")
          .delayElement(Duration.ofSeconds(3))
    //				.switchIfEmpty(sayDefault())
          .switchIfEmpty(Mono.defer(() -> sayDefault()))
          .subscribe(data -> log.info("# onNext: {}", data));
    
      Thread.sleep(3500);
    }
    
    private static Mono<String> sayDefault() {
      log.info("# Say Hi");
      return Mono.just("Hi");
    }
    // result
    > Task :Example14_7.main()
    17:11:49.839 [main] c.operator_1_create.Example14_7 INFO - # start: 2023-07-08T17:11:49.838073
    17:11:49.902 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    17:11:52.925 [parallel-1] c.operator_1_create.Example14_7 INFO - # onNext: Hello

    sayDefault() 메서드를 defer() 로 감싸기 때문에 Upstream 에서 emit 되는 데이터가 있어서, sayDefault() 메서드는 호출되지 않습니다

     

     

    6) using

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#using-java.util.concurrent.Callable-java.util.function.Function-java.util.function.Consumer-

    using() operator 는 파라미터로 전달받은 resource 를 emit 하는 Flux 를 생성합니다

    첫 번째 파라미터는 읽어 올 resource 이고,

    두 번째 파라미터는 읽어 온 resource 를 emit 하는 Flux 입니다

    마지막 세 번째 파라미터는 종료 Signal(onComplete 또는 onError) 이 발생할 경우, resource 를 해제하는 등의 후처리를 할 수 있게 해 줍니다.

     

    코드 14-8 using 예제

    public static void main(String[] args) {
        //Path path = Paths.get("D:\\resources\\using_example.txt");
        Path path = Paths.get("part2/src/main/resources/using_example.txt");
    
        Flux
          .using(() -> Files.lines(path), Flux::fromStream, Stream::close)
          .subscribe(log::info);
    }
    
    //result
    > Task :Example14_8.main()
    18:07:38.851 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    18:07:38.868 [main] c.operator_1_create.Example14_8 INFO - Hello, world!
    18:07:38.868 [main] c.operator_1_create.Example14_8 INFO - Nice to meet you!
    18:07:38.868 [main] c.operator_1_create.Example14_8 INFO - Good bye~

     

    using(() -> Files.lines(path), Flux::fromStream, Stream::close)

    • 1st  Files.lines(path) 는 읽어올 resource 이고
    • 2nd Flux::fromStream 는 emit 하는 flux 입니다
    • 3rd Stream::close 는 후처리 입니다

     

    File 을 한 라인씩 읽어와서, Stream<String> 으로 변경하고 

    subscribe(log::info) 를 통해 출력합니다

     

     

    참고

    저자께서 windows os 를 사용하셔서, mac 에서 돌리기위해, 

    아래 스샷처럼 resource 폴더에 using_example.txt 파일을 만들고, 아래의 데이터 만들었습니다

    Hello, world!
    Nice to meet you!
    Good bye~

     

     

    7) generate

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#generate-java.util.concurrent.Callable-java.util.function.BiFunction-

    generator() Operator 는 프로그래밍 방식으로 Signal 이벤트를 발생시키며, 특히 동기적으로 데이터를 하나씩 순차적으로 emit 하고자 할 경우 사용됩니다

    1. 마블 다이어그램에서 generate() 의 첫번째 파라미터는 emit 할 숫자의 초깃값을 지정합니다.
      숫자 0 옆에 붙은 +/- 기호는 초깃값으로 음수도 가능하고 양수도 가능하다는 의미입니다
    2. 두 번째 파라미터의 람다 표현식에 보이는 V는 State입니다
      generate() Operator 는 초깃값으로 지정한 숫자부터 emit 하고 emit 한 숫자를 1씩 증가시켜서 다시 emit 하는 작업을 반복하는데, 이렇게 1씩 증가하는 숫자를 상태값으로 정의합니다

    따라서 V옆에 붙은 +/- 기호 역시 초깃값 이후부터 1씩 증가한 숫자가 음수나 양수가 될 수 있다는 것을 의미합니다. 

    그리고 return 옆의 V에 붙는 +는 1씩 증가시킨다는 의미 입니다

     

    Tip
    generate() 의 마블 다이어그램상에는 상태값이 숫자로 표현되지만 숫자 이외에 객쳐여도 상관없습니다
    다만 상태 값으로 표현되는 객체 내부에 1씩 증가하는 숫자를 포함하고 있어야 해당 숫자값을 조건으로 지정해서 onComplete Signal 을 발생시킬 수 있습니다.

     

    코드 14-9 generate 예제

    public static void main(String[] args) {
      Flux
          .generate(() -> 0, (state, sink) -> {
            sink.next(state);
            if (state == 10) {
              sink.complete();
            }
            return ++state;
          })
          .subscribe(data -> log.info("# onNext: {}", data));
    }
    //result
    > Task :Example14_9.main()
    18:33:35.162 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    18:33:35.169 [main] c.operator_1_create.Example14_9 INFO - # onNext: 0
    18:33:35.171 [main] c.operator_1_create.Example14_9 INFO - # onNext: 1
    18:33:35.171 [main] c.operator_1_create.Example14_9 INFO - # onNext: 2
    18:33:35.171 [main] c.operator_1_create.Example14_9 INFO - # onNext: 3
    18:33:35.171 [main] c.operator_1_create.Example14_9 INFO - # onNext: 4
    18:33:35.171 [main] c.operator_1_create.Example14_9 INFO - # onNext: 5
    18:33:35.171 [main] c.operator_1_create.Example14_9 INFO - # onNext: 6
    18:33:35.171 [main] c.operator_1_create.Example14_9 INFO - # onNext: 7
    18:33:35.172 [main] c.operator_1_create.Example14_9 INFO - # onNext: 8
    18:33:35.172 [main] c.operator_1_create.Example14_9 INFO - # onNext: 9
    18:33:35.172 [main] c.operator_1_create.Example14_9 INFO - # onNext: 10

    generate() 의 첫 번째 파라미터에서 초깃값을 0으로 지정했으며,

    두 번째 파라미터에서 전달받은 SynchronousSink 객체로 상태 값을 emit 합니다 

    SynchronousSink 는 하나의 Signal 만 동기적으로 발생시킬 수 있으며 최대 하나의 상태 값만 emit 하는 인터페이스입니다

     

    3번째 후처리 인자가 있는 generate

    코드 14-10 generate 예제 2

    구구단 3단 출력

    public static void main(String[] args) {
      final int dan = 3;
      Flux
          .generate(() -> Tuples.of(dan, 1), (state, sink) -> {
            sink.next(state.getT1() + " * " +
                state.getT2() + " = " + state.getT1() * state.getT2());
            if (state.getT2() == 9) {
              sink.complete();
            }
            return Tuples.of(state.getT1(), state.getT2() + 1);
          }, state -> log.info("# 구구단 {}단 종료!", state.getT1()))
          .subscribe(data -> log.info("# onNext: {}", data));
    }
    // result
    > Task :Example14_10.main()
    13:43:12.201 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    13:43:12.220 [main] c.operator_1_create.Example14_10 INFO - # onNext: 3 * 1 = 3
    13:43:12.223 [main] c.operator_1_create.Example14_10 INFO - # onNext: 3 * 2 = 6
    13:43:12.223 [main] c.operator_1_create.Example14_10 INFO - # onNext: 3 * 3 = 9
    13:43:12.223 [main] c.operator_1_create.Example14_10 INFO - # onNext: 3 * 4 = 12
    13:43:12.223 [main] c.operator_1_create.Example14_10 INFO - # onNext: 3 * 5 = 15
    13:43:12.223 [main] c.operator_1_create.Example14_10 INFO - # onNext: 3 * 6 = 18
    13:43:12.224 [main] c.operator_1_create.Example14_10 INFO - # onNext: 3 * 7 = 21
    13:43:12.224 [main] c.operator_1_create.Example14_10 INFO - # onNext: 3 * 8 = 24
    13:43:12.224 [main] c.operator_1_create.Example14_10 INFO - # onNext: 3 * 9 = 27
    13:43:12.225 [main] c.operator_1_create.Example14_10 INFO - # 구구단 3단 종료!

    숫자가 아닌 Tuplues 객체를 상태 값으로 사용했기 때문에 generate()의 첫 번째 파라미터인 Callable 의 리턴 값고 두 번째 파라미터인 BiFuction 의 리턴 값이 모두 Tuples 객체입니다

    Tuples.of(dan,1) 에서 두 번째 값을 1씩 증가시켜 주고

    if (state.getT2() == 9) 조건으로 sink.complete() 을 발생시켰습니다.

    그리고 Sequence 가 모두 종료되면 generate() 의 세 번째 파라미터인 Consumer 를 통해 후처리 로그

    "# 구구단 {}단 종료!"

    를 출력합니다

     

    코드 14-11 generate 예제 3

    2019~2021 까지, 연도별 BTC 최고가를 출력하는 예제

    public static void main(String[] args) {
      Map<Integer, Tuple2<Integer, Long>> map =
          SampleData.getBtcTopPricesPerYearMap();
      Flux
          .generate(() -> 2019, (state, sink) -> {
            if (state > 2021) {
              sink.complete();
            } else {
              sink.next(map.get(state));
            }
    
            return ++state;
          })
          .subscribe(data -> log.info("# onNext: {}", data));
    }
    
    public static Map<Integer, Tuple2<Integer, Long>> getBtcTopPricesPerYearMap() {
      return btcTopPricesPerYear
          .stream()
          .collect(Collectors.toMap(t1 -> t1.getT1(), t2 -> t2));
    }
    
    public static final List<Tuple2<Integer, Long>> btcTopPricesPerYear =
        Arrays.asList(
            Tuples.of(2010, 565L),
            Tuples.of(2011, 36_094L),
            Tuples.of(2012, 17_425L),
            Tuples.of(2013, 1_405_209L),
            Tuples.of(2014, 1_237_182L),
            Tuples.of(2015, 557_603L),
            Tuples.of(2016, 1_111_811L),
            Tuples.of(2017, 22_483_583L),
            Tuples.of(2018, 19_521_543L),
            Tuples.of(2019, 15_761_568L),
            Tuples.of(2020, 22_439_002L),
            Tuples.of(2021, 63_364_000L)
        );
    
    // result
    > Task :Example14_11.main()
    14:21:27.776 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    14:21:27.905 [main] c.operator_1_create.Example14_11 INFO - # onNext: [2019,15761568]
    14:21:27.907 [main] c.operator_1_create.Example14_11 INFO - # onNext: [2020,22439002]
    14:21:27.907 [main] c.operator_1_create.Example14_11 INFO - # onNext: [2021,63364000]

    generate() 의 초기 상태 값을 2019로 지정해서 1씩 증가한 연도별 BTC 최고가 금액을 map 에서 읽어 온 후, emit 합니다

     

    8) create

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#create-java.util.function.Consumer-

    create() operator 는 generate() 처럼 프로그래밍 방식으로 Signal 이벤트를 발생시키지만 generate() 와 몇 가지 차이점이 있습니다 

    generate() 는 데이터를 동기적으로 한 번에 한 건씩 emit 할 수 있는 반면에, create() 는 한 번에 여러 건의 데이터를 비동기적으로 emit 할 수 있습니다.

     

    코드 14-12 create 예제 1 (pull 방식)

    1,2,3 ~10 을 2개씩 요청하여, emit 하는 예제

    static int SIZE = 0;
    static int COUNT = -1; // 시작
    final static List<Integer> DATA_SOURCE = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    
    public static void main(String[] args) {
      log.info("# start");
      Flux.create((FluxSink<Integer> sink) -> {
        sink.onRequest(n -> {
          try {
            Thread.sleep(1000L);
            for (int i = 0; i < n; i++) {
              if (COUNT >= 9) {
                sink.complete();
              } else {
                COUNT++;
                sink.next(DATA_SOURCE.get(COUNT));
              }
            }
          } catch (InterruptedException e) {
          }
        });
    
        sink.onDispose(() -> log.info("# clean up"));
      }).subscribe(new BaseSubscriber<>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
          request(2);
        }
    
        @Override
        protected void hookOnNext(Integer value) {
          SIZE++;
          log.info("# onNext: {}", value);
          if (SIZE == 2) {
            request(2);
            SIZE = 0;
          }
        }
    
        @Override
        protected void hookOnComplete() {
          log.info("# onComplete");
        }
      });
    }
    
    // result
    > Task :Example14_12.main()
    14:31:47.369 [main] c.operator_1_create.Example14_12 INFO - # start
    14:31:47.475 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    14:31:48.488 [main] c.operator_1_create.Example14_12 INFO - # onNext: 1
    14:31:48.491 [main] c.operator_1_create.Example14_12 INFO - # onNext: 2
    14:31:49.491 [main] c.operator_1_create.Example14_12 INFO - # onNext: 3
    14:31:49.492 [main] c.operator_1_create.Example14_12 INFO - # onNext: 4
    14:31:50.495 [main] c.operator_1_create.Example14_12 INFO - # onNext: 5
    14:31:50.495 [main] c.operator_1_create.Example14_12 INFO - # onNext: 6
    14:31:51.498 [main] c.operator_1_create.Example14_12 INFO - # onNext: 7
    14:31:51.499 [main] c.operator_1_create.Example14_12 INFO - # onNext: 8
    14:31:52.501 [main] c.operator_1_create.Example14_12 INFO - # onNext: 9
    14:31:52.502 [main] c.operator_1_create.Example14_12 INFO - # onNext: 10
    14:31:53.503 [main] c.operator_1_create.Example14_12 INFO - # onComplete
    14:31:53.506 [main] c.operator_1_create.Example14_12 INFO - # clean up

    코드 14-12는 Subscriber 에서 요청한 경우에만(?) create() 내에서 요청 개수만큼의 데이터를 emit 하는 예제 코드 입니다

    코드 이해를 위한 Tips

    i. 한 번에 2개씩의 데이터를 요청하기 위해, 아래 구문을 이용하였습니다.
    SIZE++
    if ( SIZE == 2) {
      request(2);
      SIZE=0
    }

    ii. static int COUNT = -1; 
    으로 -1 부터 시작하지만
    } else {
      COUNT++;
      sink.next(DATA_SOURCE.get(COUNT));
    }
    COUNT++ 을 먼저 하기 때문에 index 0 의 데이터 부터  읽어 들입니다. 

    List<Integer> DATA_SOURCE = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);를 데이터소스로 이용했습니다 그리고, Subscriber 가 직접 요청 개수를 지정하기 위해서 BaseSubscriber를 사용했습니다

     

    01. 구독이 발생하면 BaseSubscriber 의 hookOnSubcriber() 메서드 내부에서 request(2) 를 호출하여 한 번에 두 개의 데이터를 요청합니다

     

    02. Subscriber 쪽에서 request() 메서드를 호출하면 create() 내부에서 sink.onRequest(n -> { ... }) 메서드의 람다 표현식이 실행됩니다

     

    03. Subscriber 가 요청한 개수만큼 데이터를 emit 합니다sink.next(DATA_SOURCE.get(COUNT));

     

    04. BaseSubscriber 의 hookOnNext() 메서드 내부에서 emit 된 데이터를 로그로 출력한 후 다시 request(2) 를 호출하여 두 개의 데이터를 요청합니다 

     

    05. 2~4의 과정이 반복되다가 dataSource List 의 숫자를 모두 emit 하면 onComplete Signal 을 발생시킵니다if (COUNT >= 9) {
      sink.complete();

     

    06. BaseSubscriber 의 hookOnComplete() 메서드 내부에서 종료 로그를 출력합니다

    protected void hookOnComplete() {
      log.info("# onComplete");
    }

     

    07. sink.onDispose() 의 람다 표현식이 실행되어 후처리 로그를 출력합니다

    sink.onDispose(() -> log.info("# clean up"));

     

    FluxSink 의 onDispose() 메서드에서 dispose 의 의미는 FluxSink 관점에서 FluxSink가 더이상 사용되지 않는다는 의미 입니다
    구독을 취소하기 위해 사용되는 Disposable.dispose() 메서드의 dispose 와 의미는 같지만, dispose 하려는 대상이 다르다는 것을 참고하기 바랍니다.

     

    코드 14-12의 create() 의 경우 Subscriber 가 request() 메서드를 통해 요청을 보내면 Publisher 가 해당 요청 개수만큼의 데이터를 emit  하는 일종의 pull 방식으로 데이터를 처리합니다

     

    그런데 create() operator 는 Subscriber 의 요청과 상관없이 비동기적으로 데이터를 emit 하는 push 방식 역시 사용할 수 있습니다.

     

    코드 14-13 create 예제 2 (push 방식)

    암호 화폐의 가격 변동이 있을 때마다 변동되는 가격 데이터를 Subscriber 에게 비동기적으로 emit 하는 상황을 시뮬레이션한 예제 코드

    public static void main(String[] args) throws InterruptedException {
      CryptoCurrencyPriceEmitter priceEmitter = new CryptoCurrencyPriceEmitter();
    
      Flux.create((FluxSink<Integer> sink) ->
              priceEmitter.setListener(new CryptoCurrencyPriceListener() {
                @Override
                public void onPrice(List<Integer> priceList) {
                  priceList.stream().forEach(price -> {
                    sink.next(price);
                  });
                }
    
                @Override
                public void onComplete() {
                  sink.complete();
                }
              }))
          .publishOn(Schedulers.parallel())
          .subscribe(
              data -> log.info("# onNext: {}", data),
              error -> {
              },
              () -> log.info("# onComplete"));
    
      Thread.sleep(3000L);
    
      priceEmitter.flowInto();
    
      Thread.sleep(2000L);
      priceEmitter.complete();
    }
    // result
    > Task :Example14_13.main()
    15:20:59.423 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    15:21:02.495 [parallel-1] c.operator_1_create.Example14_13 INFO - # onNext: 50000000
    15:21:02.507 [parallel-1] c.operator_1_create.Example14_13 INFO - # onNext: 50100000
    15:21:02.508 [parallel-1] c.operator_1_create.Example14_13 INFO - # onNext: 50700000
    15:21:02.508 [parallel-1] c.operator_1_create.Example14_13 INFO - # onNext: 51500000
    15:21:02.508 [parallel-1] c.operator_1_create.Example14_13 INFO - # onNext: 52000000

    코드 14-13에서 사용하는 CryptoCurrencyPriceEmitter 와 SampleData.btcPrices

    public class CryptoCurrencyPriceEmitter {
    
    	private CryptoCurrencyPriceListener listener;
    
    	public void setListener(CryptoCurrencyPriceListener listener) {
    		this.listener = listener;
    	}
    
    	public void flowInto() {
    		listener.onPrice(SampleData.btcPrices);
    	}
    
    	public void complete() {
    		listener.onComplete();
    	}
    }
    
    public class SampleData {
    	public static final List<Integer> btcPrices =
    			Arrays.asList(50_000_000, 50_100_000, 50_700_000, 51_500_000, 52_000_000);
    }
    
    public interface CryptoCurrencyPriceListener {
    
    	void onPrice(List<Integer> priceList);
    
    	void onComplete();
    }

    동작과정

    01. 구독이 발생하면 create() operator 의 파라미터인 람다 표현식이 실행됩니다

    02. 암호 화폐의 가격 변동이 발생하면 변동된 가격 데이터를 emit 할 수 있도록 CryptoCurrencyPriceEmitter 가 CryptoCurrencyPriceListener 를 Listener 로 등록합니다

    priceEmitter.setListener(new CryptoCurrencyPriceListener() {
      @Override
      public void onPrice(List<Integer> priceList) {
        priceList.stream().forEach(price -> {
          sink.next(price);
        });
      }

      @Override
      public void onComplete() {
        sink.complete();
      }
    }))

     

    이제 암호 화폐의 가격 변동이 있을 때마다 CryptoCurrencyPriceListener 클래스의 onPrice() 메서드가 호출됩니다

     

    03. 암호 화폐의 가격 변동이 발생하기 전에 3초의 지연 시간을 가집니다

    Thread.sleep(3000L);

     

    04.암호 화폐의 가격 변동이 발생하는 것을 시뮬레이션하기 위해 CryptoCurrencyPriceEmitter 의 flowInto() 메서드를 호출합니다

    priceEmitter.flowInto();

     

    05. 2초의 지연 시간을 가진 후 해당 암호 화폐의 데이터 처리를 종료합니다

     

    코드 14-13의 create() operator 의 경우, Subscriber 가 요청을 보내는 것과 상관없이 Listener 를 통해 들어오는 데이터를 리스닝하고  있다가 실제로 들어오는 데이터가 있을 경우에만 데이터를 emit 하는 일종의 push 방식으로 데이터를 처리합니다

    즉, Subscriber 에서 데이터를 지속적으로 요청하지 않지만,

    create(FluxSink -> {}) 가 listner 를 등록하고, 그로 부터 데이터를 받자마자 emit 하는 방식입니다.

     

    create() + backpressure

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#create-java.util.function.Consumer-

     

    코드 14-14 create 예제 3 (Backpressure DROP)

    static int start = 1;
    static int end = 4;
    
    public static void main(String[] args) throws InterruptedException {
      Flux.create((FluxSink<Integer> emitter) -> {
            emitter.onRequest(n -> {
              log.info("# requested: " + n);
              try {
                Thread.sleep(500L);
                for (int i = start; i <= end; i++) {
                  emitter.next(i);
                }
                start += 4;
                end += 4;
              } catch (InterruptedException e) {
              }
            });
    
            emitter.onDispose(() -> {
              log.info("# clean up");
            });
          }, FluxSink.OverflowStrategy.DROP)
          .subscribeOn(Schedulers.boundedElastic())
          .publishOn(Schedulers.parallel(), 2)
          .subscribe(data -> log.info("# onNext: {}", data));
    
      Thread.sleep(3000L);
    }
    // result
    > Task :Example14_14.main()
    15:44:11.937 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    15:44:11.978 [boundedElastic-1] c.operator_1_create.Example14_14 INFO - # requested: 2
    15:44:12.482 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 1
    15:44:12.486 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 2
    15:44:12.488 [boundedElastic-1] c.operator_1_create.Example14_14 INFO - # requested: 2
    15:44:12.988 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 5
    15:44:12.989 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 6
    15:44:12.989 [boundedElastic-1] c.operator_1_create.Example14_14 INFO - # requested: 2
    15:44:13.493 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 9
    15:44:13.494 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 10
    15:44:13.494 [boundedElastic-1] c.operator_1_create.Example14_14 INFO - # requested: 2
    15:44:13.999 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 13
    15:44:13.999 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 14
    15:44:13.999 [boundedElastic-1] c.operator_1_create.Example14_14 INFO - # requested: 2
    15:44:14.503 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 17
    15:44:14.503 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 18
    15:44:14.503 [boundedElastic-1] c.operator_1_create.Example14_14 INFO - # requested: 2
    onRequest
    subscribeOn : 구독이 발생한 직후 실행될 스레드를 지정하는 operator
    publishOn 을 기준으로 아래쪽인 downstream 의 실행 스레드를 변경합니다

    코드 14-14는 create() Operator 에 Backpressure 전략을 지정하는 예제 코드 입니다

    예제 코드에서는 Backpressure 전략 중에서 DROP 전략을 지정했습니다

    FluxSink.OverflowStrategy.DROP

     

    그리고, publishOn() 으로 Scheduler 를 설정하면서 prefetch 수를 2로 지정한뒤에

    publishOn(Schedulers.parallel(), 2)

     

    start 와 end 변수를 사용해서 매번 4개의 데이터를 emit 하게 함으로써 코드 실행 결과에서 Backpressure DROP 전략이 적용되는 것을 조금 더 쉽게 확인할 수 있도록 했습니다

     

    동작과정

    01 구독이 발생하면 publishOn() 에서 설정한 숫자만큼의 데이터를 요청합니다

    publishOn(Schedulers.parallel(), 2)

     

    02 create() 내부에서 sink.onRequest() 메서드의 람다 표현식이 실행됩니다

     

    03 요청한 개수보다 2개 더 많은 4개의 데이터를  emit 합니다

    static int start = 1;
    static int end = 4;

    for (int i = start; i <= end; i++) {
      emitter.next(i);
    }

     

    04 Subscriber 가 emit 된 데이터를 전달받아 로그로 출력합니다

    .subscribe(data -> log.info("# onNext: {}", data));

     

    05 이때 Backpressure DROP 전략이 적용되었으므로, 요청 개수를 초과하여 emit 된 3,4 데이터는 DROP 됩니다

    06 다시 publishOn() 에서 지정한 숫자만큼의 데이터를 요청합니다

    07 create() Operator 내부에서 onComplete Signal 이 발생하지 않았기 때문에 2~6의 과정을 반복하다가 설정한 지연 시간이 지나면 main 스레드가 종료되어 코드 실행이 종료됩니다

    Thread.sleep(3000L);

     

    실행 결과를 보면 Downstream 에서 매번 두 개의 데이터를 요청하고 있는데, 

    create() operator 내부에서는 네 4개의 데이터를 emit 하기 때문에 emit 된 처음 두 개의 데이터는 정상적으로 emit 되고, 나머지 두 개의 데이터는 DROP 되는 것을 알 수 있습니다

     

    Tip,  log 와 onBackpressureDrop 을 추가하면, drop 되는 데이터 확인할 수 있습니다
    근데, 특이하게 requested: 2 -> 9223372036854775807 로 늘어나면서 제대로 실행이 안되는데요. 사유를 잘 모르겠습니다
    }, FluxSink.OverflowStrategy.DROP)
    .log()
    .onBackpressureDrop(dropped -> log.info("# dropped: {}", dropped))
    
    // changed result
    > Task :Example14_14.main()
    16:26:48.500 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    16:26:48.537 [boundedElastic-1] reactor.Flux.Create.1 INFO - onSubscribe(FluxCreate.DropAsyncSink)
    16:26:48.539 [boundedElastic-1] reactor.Flux.Create.1 INFO - request(unbounded)
    16:26:48.546 [boundedElastic-1] c.operator_1_create.Example14_14 INFO - # requested: 9223372036854775807
    16:26:49.050 [boundedElastic-1] reactor.Flux.Create.1 INFO - onNext(1)
    16:26:49.051 [boundedElastic-1] reactor.Flux.Create.1 INFO - onNext(2)
    16:26:49.051 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 1
    16:26:49.051 [boundedElastic-1] reactor.Flux.Create.1 INFO - onNext(3)
    16:26:49.051 [parallel-1] c.operator_1_create.Example14_14 INFO - # onNext: 2
    16:26:49.051 [boundedElastic-1] c.operator_1_create.Example14_14 INFO - # dropped: 3
    16:26:49.052 [boundedElastic-1] reactor.Flux.Create.1 INFO - onNext(4)
    16:26:49.052 [boundedElastic-1] c.operator_1_create.Example14_14 INFO - # dropped: 4

     

    기억하세요

    1. just() Operator 는 Hot Publisher 이기 때문에 Subscriber 의 구독 여부와는 상관없이 데이터를 emit 하며, 구독이 발생하면 emit 된 데이터를 다시 재생(replay) 해서 Subscriber 에게 전달합니다.
    2. defer() Operator 는 구독이 발생하기 전까지는 데이터의 emit 을 지연시키기 때문에 just() Operator 를 defer() 로 감싸게 되면 실제 구독이 발생해야 데이터를 emit 합니다.
    3. using() Operator 는 파라미터로 전달받은 resource 를 emit 하는 Flux 를 생성합니다.
    4. generator() Operator 는 프로그래밍 방식으로 Signal 이벤트를 발생시키며, 동기적으로 데이터를 하나씩 순차적으로 emit 합니다.
    5. create() Operator 는 generator() Operator 와 마찬가지로 프로그래밍 방식으로 Signal 이벤트를 발생시키지만 generate() Operator와 달리 한 번에 여러 건의 데이터를 비동기적으로 emit 할 수 있다.

     

     

     

     

     

     

     

    'Spring > Webflux' 카테고리의 다른 글

    14장 Operator 3 - transformation  (0) 2023.07.16
    14장 Operator 2 - filter  (0) 2023.07.09
    13장 Testing  (0) 2023.05.27
    12장 Debugging  (0) 2023.05.21
    11장 Context  (0) 2023.05.12

    댓글

Designed by Tistory.