ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 14장 Operator 3 - transformation
    Spring/Webflux 2023. 7. 16. 10:57

     

    github : https://github.com/bjpublic/Spring-Reactive/tree/main/part2/src/main/java/chapter14/operator_3_transformation

     

    14.4 Sequence 변환 Operator

    1) map

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#map-java.util.function.Function-

    map() Operator 는 Upstream 에서 emit 된 데이터를 mapper Function 을 사용하여 변환한 후, Downstream 으로 emit 합니다.

    map() Operator 의 마블 다이어그램을 보면 어떤 기능을 수행하는지 직관적으로 알 수 있습니다

     

    map() 내부에서 에러 발생 시 Sequence 가 종료되지 않고 계속 진행되도록 하는 기능을 지원하는데, 이 부분은 14.6절 에러 처리를 위한 Operator 에서 자세히 알아보겠습니다.

     

    코드 14-27 map 예제 1

    public static void main(String[] args) {
        Flux
            .just("1-Circle", "3-Circle", "5-Circle")
            .map(circle -> circle.replace("Circle", "Rectangle"))
            .subscribe(data -> log.info("# onNext: {}", data));
    }
    
    // result
    > Task :Example14_27.main()
    10:54:13.794 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    10:54:13.805 [main] c.o.Example14_27 INFO - # onNext: 1-Rectangle
    10:54:13.806 [main] c.o.Example14_27 INFO - # onNext: 3-Rectangle
    10:54:13.806 [main] c.o.Example14_27 INFO - # onNext: 5-Rectangle

    Upstream 에서 emit 된 문자열의 일부인 Circle 을 map() 내부에서 replace() 메서드를 이용해서 Rectangle 로 변환한 후 Downstream 으로 emit 합니다

     

    코드 14-28 map 예제 2

    public static void main(String[] args) {
      final double buyPrice = 50_000_000;
      Flux
          .fromIterable(SampleData.btcTopPricesPerYear)
          .filter(tuple -> tuple.getT1() == 2021)
          .doOnNext(data -> log.info("# doOnNext: {}", data))
          .map(tuple -> calculateProfitRate(buyPrice, tuple.getT2()))
          .subscribe(data -> log.info("# onNext: {}%", data));
    }
    
    private static double calculateProfitRate(final double buyPrice, Long topPrice) {
      return (topPrice - buyPrice) / buyPrice * 100;
    }
    
    // result
    > Task :Example14_28.main()
    11:20:58.770 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    11:20:58.869 [main] c.o.Example14_28 INFO - # doOnNext: [2021,63364000]
    11:20:58.870 [main] c.o.Example14_28 INFO - # onNext: 26.728%

    map() Operator 를 이용해 특정 연도의 BTC 최고가 시점의 수익률을 계산하는 예제 코드 입니다

    BTC를 50,000,000 원 어치 구매했을 경우, 2021년도 최고가 시점의 수익률이 얼마인지 계산합니다

    또, 이해를 돕기 위해서 doOnNext() Operator 를 이용해 2021년도 BTC 최고가 금액을 출력합니다

     

    2) flatmap

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#flatMap-java.util.function.Function-

    Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave.
    이 Flux에서 비동기적으로 내보낸 요소를 게시자로 변환한 다음 병합을 통해 이러한 내부 게시자를 단일 Flux로 병합하여 interleave(끼워넣을) 수 있습니다.

    There are three dimensions to this operator that can be compared with flatMapSequential and concatMap:
    flatMapSequential, concatMap 과 비교하여 3가지 차원이 있습니다

    - Generation of inners and subscription: this operator is eagerly subscribing to its inners.
    이 연산자는 innser sequence 의 결과를 열렬히? 구독하고 있습니다.

    - Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner element are flattened as they arrive.
    이 연산자는 내부 요소가 도착하면 병합되므로 원래 순서를 반드시 유지하지는 않습니다.

    - Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).
    서로 다른 내부 interleave 로 부터 값을 놓아둡니다
    서로 다른 내부 게시자(publisher)에서 내보낸 값을 인터리브 방식으로 함께 병합할 수 있음을 나타냅니다.
    * interleave [|ɪntər│liːv] 동사 (특히 얇은 막 같은 것을) 끼우다
     - First, what is new is often interleaved with the commonplace : 첫째, 새로운 것은 종종 진부한 것과 섞여(끼여) 있습니다.
     - Thread scheduling is much more efficient when done in the client process, without requiring hardware- and operating-system context switches when synchronizing or interleaving thread executions. : 스레드 스케줄링은 스레드 실행을 동기화하거나 인터리빙할 때 하드웨어 및 운영 체제 컨텍스트 전환 없이 클라이언트 프로세스에서 수행될 때 훨씬 더 효율적입니다.

     

    마블 다이어그램에서는 flatMap() Operator 영역 위쪽의 Upstream 에서 동그라미 모양의 데이터가 emit 되어 flatMap() Operator 로 전달되면,

    flapMap() 내부에서 Inner Sequence 를 생성한 후, 한 개 이상의 네모 모양으로 변환된 데이터를 emit 하고 있는 것을 볼 수 있습니다.

    즉, Upstream 에서 emit 된 데이터 한 건이 Inner Sequence 에서 여러 건의 데이터로 변환된다는 것을 알 수 있습니다.

    그런데, Upstream 에서 emit 된 데이터는 이렇게 Inner Sequence 에서 평탄화(flatten) 작업을 거치면서 하나의 Sequence 로 병합(merge) 되어 Downstream 으로 emit 됩니다.

     

    코드 14-29 flatMap 예제 1

    public static void main(String[] args) {
      Flux
          .just("Good", "Bad")
          .flatMap(feeling -> Flux
              .just("Morning", "Afternoon", "Evening")
              .map(time -> feeling + " " + time))
          .subscribe(log::info);
    }
    
    // result
    > Task :Example14_29.main()
    13:40:18.630 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    13:40:18.662 [main] c.o.Example14_29 INFO - Good Morning
    13:40:18.664 [main] c.o.Example14_29 INFO - Good Afternoon
    13:40:18.665 [main] c.o.Example14_29 INFO - Good Evening
    13:40:18.665 [main] c.o.Example14_29 INFO - Bad Morning
    13:40:18.665 [main] c.o.Example14_29 INFO - Bad Afternoon
    13:40:18.666 [main] c.o.Example14_29 INFO - Bad Evening

    코드 14-29는 flatMap() Operator 의 기본 개념을 이해하기 위한 예제 코드입니다.

    Upstream 인 just() Operator 에서 두 개의 데이터를 emit 하면.

    flatMap() 내부의 Inner Sequence 에서 세 개의 데이터를 emit 합니다

    즉, 2(Upstream 에서 emit 되는 데이터 수) x 3(Inner Sequence에서 emit 되는 데이터 수) = 6개의 데이터가 최종적으로 Subscriber 에게 전달됩니다

     

    코드 14-30 flatMap 예제2

    구구단

    public static void main(String[] args) throws InterruptedException {
      Flux
          .range(2, 8)
          .flatMap(dan -> Flux
              .range(1, 9)
              .publishOn(Schedulers.parallel())
              .map(n -> dan + " * " + n + " = " + dan * n))
          .subscribe(log::info);
    
      Thread.sleep(100L);
    }
    
    // result
    > Task :Example14_30.main()
    13:44:00.995 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    13:44:01.051 [parallel-2] c.o.Example14_30 INFO - 3 * 1 = 3
    13:44:01.055 [parallel-2] c.o.Example14_30 INFO - 3 * 2 = 6
    13:44:01.055 [parallel-2] c.o.Example14_30 INFO - 3 * 3 = 9
    13:44:01.055 [parallel-2] c.o.Example14_30 INFO - 3 * 4 = 12
    13:44:01.055 [parallel-2] c.o.Example14_30 INFO - 3 * 5 = 15
    13:44:01.055 [parallel-2] c.o.Example14_30 INFO - 3 * 6 = 18
    13:44:01.055 [parallel-2] c.o.Example14_30 INFO - 3 * 7 = 21
    13:44:01.055 [parallel-2] c.o.Example14_30 INFO - 3 * 8 = 24
    13:44:01.055 [parallel-2] c.o.Example14_30 INFO - 3 * 9 = 27
    13:44:01.056 [parallel-3] c.o.Example14_30 INFO - 4 * 1 = 4
    13:44:01.056 [parallel-3] c.o.Example14_30 INFO - 5 * 1 = 5
    13:44:01.057 [parallel-3] c.o.Example14_30 INFO - 5 * 2 = 10
    13:44:01.057 [parallel-3] c.o.Example14_30 INFO - 5 * 3 = 15
    13:44:01.057 [parallel-3] c.o.Example14_30 INFO - 5 * 4 = 20
    13:44:01.057 [parallel-3] c.o.Example14_30 INFO - 5 * 5 = 25
    13:44:01.057 [parallel-3] c.o.Example14_30 INFO - 5 * 6 = 30
    13:44:01.057 [parallel-3] c.o.Example14_30 INFO - 5 * 7 = 35
    13:44:01.057 [parallel-3] c.o.Example14_30 INFO - 5 * 8 = 40
    13:44:01.058 [parallel-3] c.o.Example14_30 INFO - 5 * 9 = 45
    13:44:01.058 [parallel-3] c.o.Example14_30 INFO - 6 * 1 = 6
    13:44:01.058 [parallel-3] c.o.Example14_30 INFO - 6 * 2 = 12
    13:44:01.058 [parallel-3] c.o.Example14_30 INFO - 6 * 3 = 18
    13:44:01.058 [parallel-3] c.o.Example14_30 INFO - 6 * 4 = 24
    13:44:01.058 [parallel-3] c.o.Example14_30 INFO - 6 * 5 = 30
    13:44:01.058 [parallel-3] c.o.Example14_30 INFO - 6 * 6 = 36
    13:44:01.058 [parallel-3] c.o.Example14_30 INFO - 6 * 7 = 42
    13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 6 * 8 = 48
    13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 6 * 9 = 54
    13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 7 * 1 = 7
    13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 7 * 2 = 14
    13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 7 * 3 = 21
    13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 7 * 4 = 28
    13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 7 * 5 = 35
    13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 7 * 6 = 42
    13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 7 * 7 = 49
    13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 7 * 8 = 56
    13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 7 * 9 = 63
    13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 8 * 1 = 8
    13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 8 * 2 = 16
    13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 8 * 3 = 24
    13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 8 * 4 = 32
    13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 8 * 5 = 40
    13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 8 * 6 = 48
    13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 8 * 7 = 56
    13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 8 * 8 = 64
    13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 8 * 9 = 72
    13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 9 * 1 = 9
    13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 9 * 2 = 18
    13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 9 * 3 = 27
    13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 9 * 4 = 36
    13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 9 * 5 = 45
    13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 9 * 6 = 54
    13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 9 * 7 = 63
    13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 9 * 8 = 72
    13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 9 * 9 = 81
    13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 2 * 1 = 2
    13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 2 * 2 = 4
    13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 2 * 3 = 6
    13:44:01.062 [parallel-3] c.o.Example14_30 INFO - 2 * 4 = 8
    13:44:01.062 [parallel-3] c.o.Example14_30 INFO - 2 * 5 = 10
    13:44:01.062 [parallel-3] c.o.Example14_30 INFO - 2 * 6 = 12
    13:44:01.062 [parallel-3] c.o.Example14_30 INFO - 2 * 7 = 14
    13:44:01.062 [parallel-3] c.o.Example14_30 INFO - 2 * 8 = 16
    13:44:01.062 [parallel-3] c.o.Example14_30 INFO - 2 * 9 = 18
    13:44:01.062 [parallel-3] c.o.Example14_30 INFO - 4 * 2 = 8
    13:44:01.062 [parallel-3] c.o.Example14_30 INFO - 4 * 3 = 12
    13:44:01.063 [parallel-3] c.o.Example14_30 INFO - 4 * 4 = 16
    13:44:01.063 [parallel-3] c.o.Example14_30 INFO - 4 * 5 = 20
    13:44:01.063 [parallel-3] c.o.Example14_30 INFO - 4 * 6 = 24
    13:44:01.063 [parallel-3] c.o.Example14_30 INFO - 4 * 7 = 28
    13:44:01.064 [parallel-3] c.o.Example14_30 INFO - 4 * 8 = 32
    13:44:01.064 [parallel-3] c.o.Example14_30 INFO - 4 * 9 = 36

    코드 14-30은 flatMap() Operator 를 사용하여 구구단을 출력하는 예제 코드 입니다

    이번에는 flatMap() 내부의 Inner Sequence 에 Scheduler 를 설정해서 비동기적으로 데이터를 emit 하도록 했습니다.

    이처럼 flatMap() 내부의 Inner Sequence를 비동기적으로 실행하면 데이터 emit 의 순서를 보장하지 않는다는 사실을 알 수 있습니다.

     

    추가로 아래 2개의 파라미터도 지원합니다

    • concrrency : 동시실행 스레드 갯 수
    • prefetch : request 요청할 데이터 갯 수 

     

    3) concat

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#concat-java.lang.Iterable-

    concat() Operator 는 파라미터로 입력되는 Publisher 의 Sequence 를 연결해서 데이터를 순차적으로 emit 합니다

    특히 먼저 입력된 Publisher 의 Sequence 가 종료될 때까지 나머지 Publisher 의 Sequence 는 subscribe 되지 않고 대기하는 특성을 가집니다.

     

    코드 14-31 concat 예제 1

    public static void main(String[] args) {
      Flux
          .concat(Flux.just(1, 2, 3), Flux.just(4, 5))
          .subscribe(data -> log.info("# onNext: {}", data));
    }
    // result 
    > Task :Example14_31.main()
    17:28:33.302 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    17:28:33.317 [main] c.o.Example14_31 INFO - # onNext: 1
    17:28:33.319 [main] c.o.Example14_31 INFO - # onNext: 2
    17:28:33.319 [main] c.o.Example14_31 INFO - # onNext: 3
    17:28:33.319 [main] c.o.Example14_31 INFO - # onNext: 4
    17:28:33.319 [main] c.o.Example14_31 INFO - # onNext: 5

    concat() 의 파라미터로 2개의 Flux 를 입력했으며, 먼저 입력된 순서대로 Flux 의 Sequence 를 차례대로 emit 합니다

     

     

    코드 14-32 concat 예제 2

    public static void main(String[] args) {
      Flux
          .concat(
              Flux.fromIterable(getViralVector()),
              Flux.fromIterable(getMRNA()),
              Flux.fromIterable(getSubunit()))
          .subscribe(data -> log.info("# onNext: {}", data));
    }
    
    private static List<Tuple2<SampleData.CovidVaccine, Integer>> getViralVector() {
      return SampleData.viralVectorVaccines;
    }
    
    private static List<Tuple2<SampleData.CovidVaccine, Integer>> getMRNA() {
      return SampleData.mRNAVaccines;
    }
    
    private static List<Tuple2<SampleData.CovidVaccine, Integer>> getSubunit() {
      return SampleData.subunitVaccines;
    }
    
    
    public class SampleData {
      public static final List<Tuple2<CovidVaccine, Integer>> viralVectorVaccines =
        Arrays.asList(
          Tuples.of(CovidVaccine.AstraZeneca, 3_000_000),
          Tuples.of(CovidVaccine.Janssen, 2_000_000)
      );
    
      public static final List<Tuple2<CovidVaccine, Integer>> mRNAVaccines =
        Arrays.asList(
          Tuples.of(CovidVaccine.Pfizer, 1_000_000),
          Tuples.of(CovidVaccine.Moderna, 4_000_000)
      );
    
      public static final List<Tuple2<CovidVaccine, Integer>> subunitVaccines =
        Arrays.asList(
          Tuples.of(CovidVaccine.Novavax, 2_500_000)
      );
    }
    
    
    // result 
    > Task :Example14_32.main()
    17:30:56.086 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    17:30:56.208 [main] c.o.Example14_32 INFO - # onNext: [AstraZeneca,3000000]
    17:30:56.210 [main] c.o.Example14_32 INFO - # onNext: [Janssen,2000000]
    17:30:56.210 [main] c.o.Example14_32 INFO - # onNext: [Pfizer,1000000]
    17:30:56.210 [main] c.o.Example14_32 INFO - # onNext: [Moderna,4000000]
    17:30:56.210 [main] c.o.Example14_32 INFO - # onNext: [Novavax,2500000]

    유형별 코로나 백신 List 를 concat() Operator 를 사용해 연결하는 예제 코드 입니다

    세 개의 Flux 를 concat() Operator 의 파라미터로 전달해서 유형별 코로나 백신의 재고 수량을 순차적으로 subscriber 에게 전달합니다

     

    4) merge

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#merge-int-org.reactivestreams.Publisher...-

    merge() Operator 는 파라미터로 입력되는 Publisher 의 Sequence 에서 emit 된 데이터를 인터리빙(interleave) 방식으로 병합합니다.

    merge() 는 concat() 처럼 먼저 입력된 Publisher 의 Sequence 가 종료될 때까지 나머지 Publisher 의 Sequence 가 Subscribe 되지 않고 대기하는 것이 아니라 모든 Publisher 의 Sequence 가 즉시 subscribe 됩니다

    인터리브(interleave)
    인터리브는 '교차로 배치하다'라는 의미가 있습니다. 컴퓨터 하드디스크상의 데이터를 서로 인접하지 않게 배열해 성능을 향상시키거나 인접한 메모리 위치를 서로 다른 메모리 뱅크에 두어 동시에 여러 곳을 접근할 수 있게 해 주는 것을 인터리빙이라고 합니다
    merge() Operator 의 마블 다이어그램을 보면 두 개의 Sequence 에서 emit 되는 데이터가 서로 교차되는(interleaved) 방식으로 merge 되고 있는 것을 볼 수 있습니다.
    그런데 주의해야 할 것은 인터리빙 방식이라고 해서 각각의 Publisher 가 emit 하는 데이터를 하나씩 번갈아 가며 merge 한다는 것이 아니라 emit 된 시간 순서대로 merge 한다는 것입니다

    코드 14-33 merge 예제 1

    public static void main(String[] args) throws InterruptedException {
      Flux
          .merge(
              Flux.just(1, 2, 3, 4).delayElements(Duration.ofMillis(300L)),
              Flux.just(5, 6, 7).delayElements(Duration.ofMillis(500L))
          )
          .subscribe(data -> log.info("# onNext: {}", data));
    
      Thread.sleep(2000L);
    }
    //result 
    > Task :Example14_33.main()
    17:42:32.493 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    17:42:32.869 [parallel-1] c.o.Example14_33 INFO - # onNext: 1
    17:42:33.069 [parallel-2] c.o.Example14_33 INFO - # onNext: 5
    17:42:33.183 [parallel-3] c.o.Example14_33 INFO - # onNext: 2
    17:42:33.486 [parallel-5] c.o.Example14_33 INFO - # onNext: 3
    17:42:33.572 [parallel-4] c.o.Example14_33 INFO - # onNext: 6
    17:42:33.789 [parallel-6] c.o.Example14_33 INFO - # onNext: 4
    17:42:34.074 [parallel-7] c.o.Example14_33 INFO - # onNext: 7

    delayElements() 를 사용해서 첫 번째 파라미터의 Flux 는 0.3초에 한 번씩 데이터를 emit 하게 설정했고,

    두 번째  파라미터의 Flux 는 0.5초에 한 번식 데이터를 emit 하도록 설정했습니다.

    각각의 [parallel-#] 스레드에서 Publisher 가 emit 하는 시간 이 빠른 데이터부터 차례대로 emit 하고 있는 것을 볼 수 있습니다.

    ms 1st Flux 2nd Flux
    300 1  
    500   5
    600 2  
    900 3  
    1000   6
    1200 4  
    1500   7

     

    코드 13-34 merge 예제 2

    public static void main(String[] args) throws InterruptedException {
      String[] usaStates = {
          "Ohio", "Michigan", "New Jersey", "Illinois", "New Hampshire",
          "Virginia", "Vermont", "North Carolina", "Ontario", "Georgia"
      };
    
      Flux
          .merge(getMeltDownRecoveryMessage(usaStates))
          .subscribe(log::info);
    
      Thread.sleep(2000L);
    }
    
    private static List<Mono<String>> getMeltDownRecoveryMessage(String[] usaStates) {
      List<Mono<String>> messages = new ArrayList<>();
      for (String state : usaStates) {
        messages.add(SampleData.nppMap.get(state));
      }
    
      return messages;
    }
    
    public class SampleData {
    	public static Map<String, Mono<String>> nppMap = new HashMap<>();
    
      static {
    	nppMap.put("Ontario",
    		  Mono.just("1500 Ontario Done").delayElement(Duration.ofMillis(1500L)));
    		nppMap.put("Vermont",
    			Mono.just("400 Vermont Done").delayElement(Duration.ofMillis(400L)));
    		nppMap.put("New Hampshire",
    			Mono.just("700 New Hampshire Done").delayElement(Duration.ofMillis(700L)));
    		nppMap.put("New Jersey",
    			Mono.just("500 New Jersey Done").delayElement(Duration.ofMillis(500L)));
    		nppMap.put("Ohio",
    			Mono.just("1000 Ohio Done").delayElement(Duration.ofMillis(1000L)));
    		nppMap.put("Michigan",
    			Mono.just("200 Michigan Done").delayElement(Duration.ofMillis(200L)));
    		nppMap.put("Illinois",
    			Mono.just("300 Illinois Done").delayElement(Duration.ofMillis(300L)));
    		nppMap.put("Virginia",
    			Mono.just("600 Virginia Done").delayElement(Duration.ofMillis(600L)));
    		nppMap.put("North Carolina",
    			Mono.just("800 North Carolina Done").delayElement(Duration.ofMillis(800L)));
    		nppMap.put("Georgia",
    			Mono.just("900 Georgia Done").delayElement(Duration.ofMillis(900L)));
                
        for (char c = 'a'; c <= 'a' + 25; c++) {
          morseCodeMap.put(morseCodes[c - ('z' - 25)], Character.toString(c));
        }
      }
    }
    
    
    // result 
    > Task :Example14_34.main()
    17:59:11.234 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    18:04:03.824 [parallel-2] c.o.Example14_34 INFO - 200 Michigan Done
    18:04:03.922 [parallel-4] c.o.Example14_34 INFO - 300 Illinois Done
    18:04:04.025 [parallel-7] c.o.Example14_34 INFO - 400 Vermont Done
    18:04:04.125 [parallel-3] c.o.Example14_34 INFO - 500 New Jersey Done
    18:04:04.225 [parallel-6] c.o.Example14_34 INFO - 600 Virginia Done
    18:04:04.323 [parallel-5] c.o.Example14_34 INFO - 700 New Hampshire Done
    18:04:04.429 [parallel-8] c.o.Example14_34 INFO - 800 North Carolina Done
    18:04:04.527 [parallel-10] c.o.Example14_34 INFO - 900 Georgia Done
    18:04:04.622 [parallel-1] c.o.Example14_34 INFO - 1000 Ohio Done
    18:04:05.125 [parallel-9] c.o.Example14_34 INFO - 1500 Ontario Done

    주별 원자로 복구되는 시간 순서대로 작성한 Sample Data 예제 입니다

    입력된 순서가 아닌, delayElement(Duration.ofMillis()) 시간 순서대로 데이터가 emit 되었습니다

     

     

    5) zip

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#zip-org.reactivestreams.Publisher-org.reactivestreams.Publisher-

    zip() Operator s s 파라미터로 입력되는 Publisher Sequence 에서 emit 된 데이터를 결합하는데,

    각 publisher 가 데이터를 하나씩 emit 할 때까지 기다렸다가 결합합니다

    public static void main(String[] args) throws InterruptedException {
      Flux
          .zip(
              Flux.just(1, 2, 3).delayElements(Duration.ofMillis(300L)),
              Flux.just(4, 5, 6).delayElements(Duration.ofMillis(500L))
          )
          .subscribe(tuple2 -> log.info("# onNext: {}", tuple2));
    
      Thread.sleep(2500L);
    }
    
    // result 
    > Task :Example14_35.main()
    18:16:08.046 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    18:16:08.614 [parallel-2] c.o.Example14_35 INFO - # onNext: [1,4]
    18:16:09.118 [parallel-4] c.o.Example14_35 INFO - # onNext: [2,5]
    18:16:09.620 [parallel-6] c.o.Example14_35 INFO - # onNext: [3,6]

    zip() Operator 의 첫 번째 파라미터인 Flux 에서 0.3초에 한번식 데이터를 emit 하고,

    두번째 Flux 에서 0.5초에 한 번씩 데이터를 emit 합니다

    두 개의 Flux 가 emit 하는 시간이 다르지만 각 Flux 에서 하나씩 emit 할 때까지 기다렸다가,

    emit 된 데이터를 Tuple2 객체로 묶어서 Subscriber 에게 전달합니다

     

    3rd 파라미터 combinator 

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#zip-org.reactivestreams.Publisher-org.reactivestreams.Publisher-java.util.function.BiFunction-

     

    코드 14-36 zip 예제 2

    public static void main(String[] args) throws InterruptedException {
      Flux
          .zip(
              Flux.just(1, 2, 3).delayElements(Duration.ofMillis(300L)),
              Flux.just(4, 5, 6).delayElements(Duration.ofMillis(500L)),
              (n1, n2) -> n1 * n2
          )
          .subscribe(data -> log.info("# onNext: {}", data));
    
      Thread.sleep(2500L);
    }
    //result 
    > Task :Example14_36.main()
    18:18:53.351 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    18:18:53.919 [parallel-2] c.o.Example14_36 INFO - # onNext: 4  // 1 * 4
    18:18:54.425 [parallel-4] c.o.Example14_36 INFO - # onNext: 10 // 2 * 5
    18:18:54.928 [parallel-6] c.o.Example14_36 INFO - # onNext: 18 // 3 * 6

    zip() Operator 의 세 번째 파라미터로 combinator (BiFunction 함수형 인터페이스)를 추가해서, 

    두 개의 Flux 가 emit 하는 한 쌍의 데이터를 combinator 에서 전달받아 변환 작업을 거친 후,

    최종 변환된 데이터를 Subscriber 에게 전달합니다.

    코드 14-36 에서는 한 쌍의 데이터를 서로 곱한 값을 Subscriber 에게 전달합니다

     

    코드 14-37 zip 예제 3

    public static void main(String[] args) throws InterruptedException {
      getInfectedPersonsPerHour(10, 21)
        .subscribe(tuples -> {
          Tuple3<Tuple2, Tuple2, Tuple2> t3 = (Tuple3) tuples;
          int sum = (int) t3.getT1().getT2() +
            (int) t3.getT2().getT2() + (int) t3.getT3().getT2();
          log.info("# onNext: time : {}, sum : {}", t3.getT1().getT1(), sum);        
        });
    }
    
    private static Flux getInfectedPersonsPerHour(int start, int end) {
      return Flux.zip(
        Flux.fromIterable(SampleData.seoulInfected)
          .filter(t2 -> t2.getT1() >= start && t2.getT1() <= end),
        Flux.fromIterable(SampleData.incheonInfected)
          .filter(t2 -> t2.getT1() >= start && t2.getT1() <= end),
        Flux.fromIterable(SampleData.suwonInfected)
         .filter(t2 -> t2.getT1() >= start && t2.getT1() <= end)
      );
    }
    
    public class SampleData {
      public static final List<Tuple2<Integer, Integer>> seoulInfected =
        Arrays.asList(
          Tuples.of(1, 0), Tuples.of(2, 0), Tuples.of(3, 0), Tuples.of(4, 0),
          Tuples.of(5, 0), Tuples.of(6, 0), Tuples.of(7, 0), Tuples.of(8, 0),
          Tuples.of(9, 0), Tuples.of(10, 20), Tuples.of(11, 23),
          Tuples.of(12, 33), Tuples.of(13, 10), Tuples.of(14, 15),
          Tuples.of(15, 20), Tuples.of(16, 30), Tuples.of(17, 10),
          Tuples.of(18, 11), Tuples.of(19, 13), Tuples.of(20, 8),
          Tuples.of(21, 14), Tuples.of(22, 4), Tuples.of(23, 7), Tuples.of(24, 2)
        );
    
      public static final List<Tuple2<Integer, Integer>> incheonInfected =
        Arrays.asList(
          Tuples.of(1, 0), Tuples.of(2, 0), Tuples.of(3, 0), Tuples.of(4, 0),
          Tuples.of(5, 0), Tuples.of(6, 0), Tuples.of(7, 0), Tuples.of(8, 0),
          Tuples.of(9, 0), Tuples.of(10, 3), Tuples.of(11, 5), Tuples.of(12, 2),
          Tuples.of(13, 10), Tuples.of(14, 5), Tuples.of(15, 6),
          Tuples.of(16, 7), Tuples.of(17, 2), Tuples.of(18, 5),
          Tuples.of(19, 2), Tuples.of(20, 0), Tuples.of(21, 2),
          Tuples.of(22, 0), Tuples.of(23, 2), Tuples.of(24, 1)
        );
    
      public static final List<Tuple2<Integer, Integer>> suwonInfected =
        Arrays.asList(
          Tuples.of(1, 0), Tuples.of(2, 0), Tuples.of(3, 0), Tuples.of(4, 0),
          Tuples.of(5, 0), Tuples.of(6, 0), Tuples.of(7, 0), Tuples.of(8, 0),
          Tuples.of(9, 0), Tuples.of(10, 2), Tuples.of(11, 1),
          Tuples.of(12, 0), Tuples.of(13, 3), Tuples.of(14, 2),
          Tuples.of(15, 3), Tuples.of(16, 6), Tuples.of(17, 3),
          Tuples.of(18, 1), Tuples.of(19, 1), Tuples.of(20, 0),
          Tuples.of(21, 0), Tuples.of(22, 1), Tuples.of(23, 0), Tuples.of(24, 0)
        );
    }
    
    // result
    > Task :Example14_37.main()
    18:29:43.627 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    18:43:22.394 [main] c.o.Example14_37 INFO - # onNext: time : 10, sum : 25
    18:43:22.397 [main] c.o.Example14_37 INFO - # onNext: time : 11, sum : 29
    18:43:22.397 [main] c.o.Example14_37 INFO - # onNext: time : 12, sum : 35
    18:43:22.397 [main] c.o.Example14_37 INFO - # onNext: time : 13, sum : 23
    18:43:22.397 [main] c.o.Example14_37 INFO - # onNext: time : 14, sum : 22
    18:43:22.397 [main] c.o.Example14_37 INFO - # onNext: time : 15, sum : 29
    18:43:22.398 [main] c.o.Example14_37 INFO - # onNext: time : 16, sum : 43
    18:43:22.398 [main] c.o.Example14_37 INFO - # onNext: time : 17, sum : 15
    18:43:22.398 [main] c.o.Example14_37 INFO - # onNext: time : 18, sum : 17
    18:43:22.398 [main] c.o.Example14_37 INFO - # onNext: time : 19, sum : 16
    18:43:22.398 [main] c.o.Example14_37 INFO - # onNext: time : 20, sum : 8
    18:43:22.398 [main] c.o.Example14_37 INFO - # onNext: time : 21, sum : 16

    zip() Operator 를 사용해서 수도권에 있는 세 개 도시의 특정 시간 범위 내의 코로나 확진자 수를 출력합니다

    zip() 파라미터로 총 세 개의 Flux 를 전달했습니다. 

    그리고 getInfectedPersonPerHour() 메서드의 파라미터로 특정 시간 범위를 전달해서 해당 범위 내에서만 시간별로 통계를 내고 있습니다

     

     

    6) and

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#and-org.reactivestreams.Publisher-

    and() Operator 는 Mono 의 Complete Signal 과 파라미터로 입력된 Publisher 의 Complete Signal 을 결합하여 새로운 Mono<Void>를 반환합니다

    즉, Mono 와 파라미터로 입력된 Publisher 의 Sequence 가 모두 종료되었음을 Subscriber 에게 알릴 수 있습니다

    and() Operator 의 마블 다이어그램을 보면, and() Operator 위쪽의 Sequence 와 and() Operator 내부의 Sequence 가 종료되어도 Downstream 쪽으로 emit 되는 데이터가 하나도 없는 것을 볼 수 있습니다

     

    코드 14-38 and 예제 1

    public static void main(String[] args) throws InterruptedException {
      Mono
        .just("Task 1")
        .delayElement(Duration.ofSeconds(1))
        .doOnNext(data -> log.info("# Mono doOnNext: {}", data))
        .and(
          Flux
            .just("Task 2", "Task 3")
            .delayElements(Duration.ofMillis(600))
            .doOnNext(data -> log.info("# Flux doOnNext: {}", data))
        )
        .subscribe(
          data -> log.info("# onNext: {}", data),
          error -> log.error("# onError:", error),
          () -> log.info("# onComplete")
        );
    
      Thread.sleep(5000);
    }
    // result 
    > Task :Example14_38.main()
    21:52:33.887 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    21:52:34.601 [parallel-2] c.o.Example14_38 INFO - # Flux doOnNext: Task 2
    21:52:34.993 [parallel-1] c.o.Example14_38 INFO - # Mono doOnNext: Task 1
    21:52:35.216 [parallel-3] c.o.Example14_38 INFO - # Flux doOnNext: Task 3
    21:52:35.217 [parallel-3] c.o.Example14_38 INFO - # onComplete

    and() Operator 를 기준으로 Upstream 에서 1초의 지연 시간을 가진 뒤 "Task 1" 을 emit 하고, 

    and() 내부의 Inner Sequence 에서는 0.6초에 한 번씩 "Task2", "Task 3" 을 emit 합니다

    결과적으로 Subscriber 에게 onComplete Signal 만 전달되고, Upstream 에서 emit 된 데이터는 전달되지 않습니다.

    즉, and() Operator 는 모든 Sequence 가 종료되길 기다렸다가 최종적으로 onComplte Signal 만 전송합니다.

     

    코드 14-39 and 예제 2

    public static void main(String[] args) throws InterruptedException {
      restartApplicationServer()
        .and(restartDBServer())
        .subscribe(
          data -> log.info("# onNext: {}", data),
          error -> log.error("# onError:", error),
          () -> log.info("# sent an email to Administrator: " +
              "All Servers are restarted successfully")
        );
    
      Thread.sleep(6000L);
    }
    
    private static Mono<String> restartApplicationServer() {
      return Mono
        .just("Application Server was restarted successfully.")
        .delayElement(Duration.ofSeconds(2))
        .doOnNext(log::info);
    }
    
    private static Publisher<String> restartDBServer() {
      return Mono
        .just("DB Server was restarted successfully.")
        .delayElement(Duration.ofSeconds(4))
        .doOnNext(log::info);
    }
    
    // result
    > Task :Example14_39.main()
    22:05:40.235 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    22:05:42.261 [parallel-1] c.o.Example14_39 INFO - Application Server was restarted successfully.
    22:05:44.259 [parallel-2] c.o.Example14_39 INFO - DB Server was restarted successfully.
    22:05:44.259 [parallel-2] c.o.Example14_39 INFO - # sent an email to Administrator: All Servers are restarted successfully

    and() Operator 를 이용해서 두 개의 서버를 재가동하는 상황을 시뮬레이션 했습니다

    먼저 애플리케이션 서버가 2초 뒤에 재가동되었고, 4초 뒤에 DB서버가 재가동 되었습니다

    두 개의 서버가 성공적으로 재가동된 후, 두 개 서버의 재가동이 성공적으로 수행되었음을 이메일로 관리자에게 알리고 있습니다.

    이처럼 and() Operator 는 모든 작업이 끝난 시점에 최종적으로 후처리 작업을 수행하기 적합한 Operator 입니다

     

    7) collectList

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#zip-java.util.function.Function-int-org.reactivestreams.Publisher...-

    collectList() Operator 는 Flux 에서 emit 된 데이터를 모아서 List 로 변환한 후, 

    변환된 List 를 emit 하는 Mono 를 반환합니다.

    만약, Upstream Sequence 가 비어있다면 비어 있는 List 를 Downstream 으로 emit 합니다

    코드 14-40 collectList 예제 

    public static void main(String[] args) {
      Flux
        .just("...", "---", "...")
        .map(code -> transformMorseCode(code))
        .collectList()
        .subscribe(list -> log.info(list.stream().collect(Collectors.joining())));
    }
    
    public static String transformMorseCode(String morseCode) {
      return SampleData.morseCodeMap.get(morseCode);
    }
    
    public class SampleData {
    	public static Map<String, String> morseCodeMap = new HashMap<>();
    	public static String[] morseCodes = {
        ".-", "-...", "-.-.", "-..", ".", "..-.", "--.", "....", "..", ".---", "-.-",
        ".-..", "--", "-.", "---", ".--.", "--.-", ".-.", "...", "-", "..-", "...-",
        ".--", "-..-", "-.--", "--.."};
    
    	static {
    	  for (char c = 'a'; c <= 'a' + 25; c++) {
    	    morseCodeMap.put(morseCodes[c - ('z' - 25)], Character.toString(c));
    	  }
    	}
    }
    
    
    
    // result
    > Task :Example14_40.main()
    22:15:27.722 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    22:15:27.776 [main] c.o.Example14_40 INFO - sos

    collectList() Operator 를 이용해 Upstream 에서 emit 되는 모스 부호를 해석한 문자를 List 로 변환한 후, Subscriber 에게 전달합니다.

    Subscriber 는 전달받은 List 에 포함된 문자를 Stream 을 사용해서 연결한 후, 모스 부호를 최종적으로 해석합니다

     

     

     

    8) collectMap

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#collectMap-java.util.function.Function-

    collectMap() Operator 는 Flux 에서 emit 된 데이터를 기반으로 key 와 value 를 생성하여 Map 의 Element 로 추가한 후,

    최종적으로 Map 을 emit 하는 Mono 를 반환합니다

     

    코드 14-41 collectMap 예제

    public static void main(String[] args) {
      Flux
          .range(0, 26)
          .collectMap(key -> SampleData.morseCodes[key],
              value -> transformToLetter(value))
          .subscribe(map -> log.info("# onNext: {}", map));
    }
    
    private static String transformToLetter(int value) {
      return Character.toString((char) ('a' + value));
    }
    
    // result
    > Task :Example14_41.main()
    22:30:09.735 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    22:30:09.783 [main] c.o.Example14_41 INFO - # onNext: {..=i, .---=j, --.-=q, ---=o, --.=g, .--=w, .-.=r, -.--=y, ....=h, -.-.=c, --=m, -.=n, .-..=l, ...-=v, -.-=k, -..=d, -=t, ..-=u, .=e, ...=s, -...=b, ..-.=f, .--.=p, -..-=x, --..=z, .-=a}

    range() Operator 를 이용해서 0부터 25까지의 숫자를 차례대로 emit 합니다

    그런 다음 collectMap() 의 첫 번째 파라미터인 람다 표현식에서 range() 로부터 emit 된 숫자를 기반으로 SampleData의 morseCodes 배열의 모스 부호를 차례대로 읽어 옵니다

    이렇게 읽어 온 모스 부호는 Map 원소의 Key 가 됩니다

    collectMap() 의 두번째 파라미터인 람다 표현식에서는 range() 로 부터 emit 된 숫자를 a부터 z까지의 알파벳으로 변환합니다

    변환된 a부터 z까지의 알파벳은 Map원소의 value 가 됩니다

     

     

     

    'Spring > Webflux' 카테고리의 다른 글

    14장 Operator 5 - Error  (0) 2023.07.22
    14장 Operator 4 - peek  (0) 2023.07.22
    14장 Operator 2 - filter  (0) 2023.07.09
    14장 Operator 1 - Sequence 생성  (0) 2023.07.08
    13장 Testing  (0) 2023.05.27

    댓글

Designed by Tistory.