ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 14장 Operator 7 - split
    Spring/Webflux 2023. 7. 29. 17:26

    github : https://github.com/bjpublic/Spring-Reactive/tree/main/part2/src/main/java/chapter14/operator_7_split

     

    14.8 Flux Sequence 분할을 위한 Operator 

     

    1) windows

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#window-java.time.Duration-

    window(int maxSize) 는 Upstream 에서 emit 되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 포함하는 새로운 Flux 로 분할합니다. Reactor 에서는 이렇게 분할된 Flux 를 윈도우(window) 라고 부릅니다

     

    코드 14-53 window 예제 1

    public static void main(String[] args) {
      Flux.range(1, 11)
          .window(3)
          .flatMap(flux -> {
            log.info("======================");
            return flux;
          })
          .subscribe(new BaseSubscriber<>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
              subscription.request(2);
            }
    
            @Override
            protected void hookOnNext(Integer value) {
              log.info("# onNext: {}", value);
              request(2);
            }
          });
    }
    // result 
    > Task :Example14_53.main()
    20:51:55.330 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    20:51:55.356 [main] c.operator_7_split.Example14_53 INFO - ======================
    20:51:55.358 [main] c.operator_7_split.Example14_53 INFO - # onNext: 1
    20:51:55.359 [main] c.operator_7_split.Example14_53 INFO - # onNext: 2
    20:51:55.359 [main] c.operator_7_split.Example14_53 INFO - # onNext: 3
    20:51:55.359 [main] c.operator_7_split.Example14_53 INFO - ======================
    20:51:55.359 [main] c.operator_7_split.Example14_53 INFO - # onNext: 4
    20:51:55.360 [main] c.operator_7_split.Example14_53 INFO - # onNext: 5
    20:51:55.360 [main] c.operator_7_split.Example14_53 INFO - # onNext: 6
    20:51:55.360 [main] c.operator_7_split.Example14_53 INFO - ======================
    20:51:55.360 [main] c.operator_7_split.Example14_53 INFO - # onNext: 7
    20:51:55.360 [main] c.operator_7_split.Example14_53 INFO - # onNext: 8
    20:51:55.360 [main] c.operator_7_split.Example14_53 INFO - # onNext: 9
    20:51:55.361 [main] c.operator_7_split.Example14_53 INFO - ======================
    20:51:55.361 [main] c.operator_7_split.Example14_53 INFO - # onNext: 10
    20:51:55.361 [main] c.operator_7_split.Example14_53 INFO - # onNext: 11

    .window(3) 을 이용해 데이터를 3개씩 분할한 후, 

    flatMap 에서 구분선 "=============" 출력합니다

     

     

    코드 14-54 window 예제 2

    public static void main(String[] args) {
      Flux.fromIterable(SampleData.monthlyBookSales2021)
          .window(3)
          .flatMap(flux -> MathFlux.sumInt(flux))
          .subscribe(new BaseSubscriber<>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
              subscription.request(2);
            }
    
            @Override
            protected void hookOnNext(Integer value) {
              log.info("# onNext: {}", value);
              request(2);
            }
          });
    }
    
    public class SampleData {
    	public static final List<Integer> monthlyBookSales2021 =
    			Arrays.asList(2_500_000, 3_200_000, 2_300_000, 4_500_000,
    					6_500_000, 5_500_000, 3_100_000, 2_000_000,
    					2_800_000, 4_100_000, 6_200_000, 4_200_000);
    }          
    
    // result
    > Task :Example14_54.main()
    20:56:37.656 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    20:56:37.799 [main] c.operator_7_split.Example14_54 INFO - # onNext: 8000000
    20:56:37.807 [main] c.operator_7_split.Example14_54 INFO - # onNext: 16500000
    20:56:37.807 [main] c.operator_7_split.Example14_54 INFO - # onNext: 7900000
    20:56:37.807 [main] c.operator_7_split.Example14_54 INFO - # onNext: 14500000

    2021년도 분기별 도서 매출액 구하는 예제 코드 입니다

    .windows(3) 로 3개씩 분할한 후에 MathFlux.sumInt() 를 이용해 3개씩 분할된 합계를 구합니다

     

    MathFlux
    MathFlux 는 수학 계산을 위한 전용 Flux 로, MathFlux 를 사용하려면 reactor-extra 를 build.gradle 에 아래와 같이 의존 라이브러리로 추가해 주어야 합니다
    implementation 'io.projectreactor.addons:reactor-extra:3.4.8'

     

    2) buffer

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#buffer-int-

     

    buffer(int maxSize) 는 Upstream 에서 emit 되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 List 버퍼로 한 번에 emit 합니다

    마지막 버퍼에 포함된 데이터의 개수는 maxSize보다 더 적거나 같습니다

     

    코드 14-55 buffer 예제

    public static void main(String[] args) {
      Flux.range(1, 95)
          .buffer(10)
          .subscribe(buffer -> log.info("# onNext: {}", buffer));
    }
    
    // result 
    > Task :Example14_55.main()
    22:04:07.565 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    22:04:07.573 [main] c.operator_7_split.Example14_55 INFO - # onNext: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    22:04:07.574 [main] c.operator_7_split.Example14_55 INFO - # onNext: [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
    22:04:07.574 [main] c.operator_7_split.Example14_55 INFO - # onNext: [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
    22:04:07.574 [main] c.operator_7_split.Example14_55 INFO - # onNext: [31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
    22:04:07.575 [main] c.operator_7_split.Example14_55 INFO - # onNext: [41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
    22:04:07.575 [main] c.operator_7_split.Example14_55 INFO - # onNext: [51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
    22:04:07.575 [main] c.operator_7_split.Example14_55 INFO - # onNext: [61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
    22:04:07.575 [main] c.operator_7_split.Example14_55 INFO - # onNext: [71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
    22:04:07.575 [main] c.operator_7_split.Example14_55 INFO - # onNext: [81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
    22:04:07.575 [main] c.operator_7_split.Example14_55 INFO - # onNext: [91, 92, 93, 94, 95]

    buffer(10) 으로 지정했기 때문에 Upstream 에서 emit 된 데이터가 최대 10개까지 버퍼에 담기면, List 버퍼 형태로 Downstream 에 emit 합니다

    List 버퍼로 10개씩 한꺼번에 Subscriber 에 전달되어 출력됩니다

     

    높은 처리량을 요구하는 애플리케이션이 있다면, 들어오는 데이터를 순차적으로 처리하기보다는 batch insert 같은 일괄 작업에  buffer() 를 이용해서 성능 향상을 기대할 수 있습니다.

     

    3) bufferTimeout

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#bufferTimeout-int-java.time.Duration-

    bufferTimeout(maxSize, maxTime) 은 Upstream 에서 emit 되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터 또는 maxTime 내에 emit 된 데이터를 List 버퍼로 한 번에 emit 합니다

    bufferTimeout() 의 마블 다이어그램을 보면 파란색은 maxSize = 2 를 채우지 못했지만, timeout 조건에 부합되어 버퍼에 담겼습니다.

     

    코드 14-56 bufferTimeout 예제

    public static void main(String[] args) {
      Flux
          .range(1, 20)
          .map(num -> {
            try {
              if (num < 10) {
                Thread.sleep(100L);
              } else {
                Thread.sleep(300L);
              }
            } catch (InterruptedException e) {
            }
            return num;
          })
          .bufferTimeout(3, Duration.ofMillis(400L))
          .subscribe(buffer -> log.info("# onNext: {}", buffer));
    }
    
    // result 
    > Task :Example14_56.main()
    22:10:31.802 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    22:10:32.129 [main] c.operator_7_split.Example14_56 INFO - # onNext: [1, 2, 3]
    22:10:32.438 [main] c.operator_7_split.Example14_56 INFO - # onNext: [4, 5, 6]
    22:10:32.750 [main] c.operator_7_split.Example14_56 INFO - # onNext: [7, 8, 9]
    22:10:33.455 [parallel-1] c.operator_7_split.Example14_56 INFO - # onNext: [10, 11]
    22:10:34.066 [parallel-1] c.operator_7_split.Example14_56 INFO - # onNext: [12, 13]
    22:10:34.670 [parallel-1] c.operator_7_split.Example14_56 INFO - # onNext: [14, 15]
    22:10:35.272 [parallel-1] c.operator_7_split.Example14_56 INFO - # onNext: [16, 17]
    22:10:35.880 [parallel-1] c.operator_7_split.Example14_56 INFO - # onNext: [18, 19]
    22:10:36.078 [main] c.operator_7_split.Example14_56 INFO - # onNext: [20]

    num < 10 이면 0.1초를 sleep 하고

    num >= 이면 0.3초를 sleep 합니다

    bufferTimeout(3, 0.4초) 조건으로 데이터를 전달합니다.

     

    10 미만에서는 3개씩 List 버퍼로 데이터가 전달되면, 

    10 이상에서는 2개씩 List 버퍼로 데이터가 전달됩니다

     

    buffer(maxSize) Operator 의 경우, 입력으로 들어오는 데이터가 maxSize 가 되기 전에 어떤 오류로 인해 들어오지 못하는 상황이 발생할 경우, 애플리케이션은 maxSize 가 될때까지 무한정 기다리게 될 것입니다
    따라서 bufferTimeout(maxSize, maxTime) Operator 를 사용함으로써, maxSize 만큼 데이터가 입력으로 들어오지 않더라도 maxTime 에 도달했을 때 버퍼를 비우게 해서 애플리케이션이 무한정 기다려야 되는 상황을 방지 할 수 있습니다.

     

    4) groupBy

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#groupBy-java.util.function.Function-

    groupBy(keyMapper) 는 emit 되는 데이터를 keyMapper 로 생성한 key 를 기준으로 그룹화한 GroupedFlux 를 리턴하며,

    이 GroupedFlux 를 통해서 그룹별로 작업을 수행할 수 있습니다.

     

    코드 14-57 groupBy 예제 1

    public static void main(String[] args) {
      Flux.fromIterable(SampleData.books)
          .groupBy(book -> book.getAuthorName())
          .flatMap(groupedFlux ->
              groupedFlux
                  .map(book -> book.getBookName() +
                      "(" + book.getAuthorName() + ")")
                  .collectList()
          )
          .subscribe(bookByAuthor ->
              log.info("# book by author: {}", bookByAuthor));
    }
    
    public class SampleData {
    	public static final List<Book> books = Arrays.asList(
    			new Book("Advance Java", "Tom", "Tom-boy", 25000, 100),
    			new Book("Advance Python", "Grace", "Grace-girl", 22000, 150),
    			new Book("Advance Reactor", "Smith", "David-boy", 35000, 200),
    			new Book("Getting started Java", "Tom", "Tom-boy", 32000, 230),
    			new Book("Advance Kotlin", "Kevin", "Kevin-boy", 32000, 250),
    			new Book("Advance Javascript", "Mike", "Tom-boy", 32000, 320),
    			new Book("Getting started Kotlin", "Kevin", "Kevin-boy", 32000, 150),
    			new Book("Getting started Python", "Grace", "Grace-girl", 32000, 200),
    			new Book("Getting started Reactor", "Smith", null, 32000, 250),
    			new Book("Getting started Javascript", "Mike", "David-boy", 32000, 330)
    	);
     }
    
    // result
    > Task :Example14_57.main()
    22:18:17.186 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    22:18:17.279 [main] c.operator_7_split.Example14_57 INFO - # book by author: [Advance Kotlin(Kevin), Getting started Kotlin(Kevin)]
    22:18:17.280 [main] c.operator_7_split.Example14_57 INFO - # book by author: [Advance Javascript(Mike), Getting started Javascript(Mike)]
    22:18:17.280 [main] c.operator_7_split.Example14_57 INFO - # book by author: [Advance Java(Tom), Getting started Java(Tom)]
    22:18:17.280 [main] c.operator_7_split.Example14_57 INFO - # book by author: [Advance Python(Grace), Getting started Python(Grace)]
    22:18:17.281 [main] c.operator_7_split.Example14_57 INFO - # book by author: [Advance Reactor(Smith), Getting started Reactor(Smith)]

    author 로 groupBy 한 후  List 로 변환하여 subscriber 에 전달하는 예제 입니다

     

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#groupBy-java.util.function.Function-java.util.function.Function-

    groupby(keyMapper, valueMapper) 의 마블다이어 그램입니다

    groupBy 는 keyMapper 를 통해 생성되는 key 를 기준으로 emit 되는 데이터를 그룹화하지만, 

    groupby(keyMapper, valueMapper) 는 그룹화하면서 valueMapper를 통해 그룹화되는 데이터를 다른 형태로 가공 처리할 수 있습니다.

    마블 다이어그램에서는 그룹화되는 도형들의 선이 제거된 후(valueMapper : noBorder) emit 되는 것을 볼 수 있습니다. 

     

    코드 14-58 groupBy 예제 2 ( keyMapper, valueMapper )

    public static void main(String[] args) {
      Flux.fromIterable(SampleData.books)
          .groupBy(book ->
                  book.getAuthorName(),
              book -> book.getBookName() + "(" + book.getAuthorName() + ")")
          .flatMap(groupedFlux -> groupedFlux.collectList())
          .subscribe(bookByAuthor ->
              log.info("# book by author: {}", bookByAuthor));
    }
    
    // result
    > Task :Example14_58.main()
    22:28:47.763 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    22:28:47.877 [main] c.operator_7_split.Example14_58 INFO - # book by author: [Advance Kotlin(Kevin), Getting started Kotlin(Kevin)]
    22:28:47.879 [main] c.operator_7_split.Example14_58 INFO - # book by author: [Advance Javascript(Mike), Getting started Javascript(Mike)]
    22:28:47.879 [main] c.operator_7_split.Example14_58 INFO - # book by author: [Advance Java(Tom), Getting started Java(Tom)]
    22:28:47.879 [main] c.operator_7_split.Example14_58 INFO - # book by author: [Advance Python(Grace), Getting started Python(Grace)]
    22:28:47.879 [main] c.operator_7_split.Example14_58 INFO - # book by author: [Advance Reactor(Smith), Getting started Reactor(Smith)]

     

    코드 14-57 에서는 2 줄에 나눠서 groupBy, flatMap 으로 각각 처리했으나,

    .groupBy(book -> book.getAuthorName())
    .flatMap(groupedFlux ->groupedFlux.map(book -> book.getBookName() +"(" + book.getAuthorName() + ")").collectList())

     

    코드 14-58 에서는 groupBy 와 mapping 을 한 번에 처리했습니다.

    .groupBy(book ->book.getAuthorName(),book -> book.getBookName() + "(" + book.getAuthorName() + ")")
    .flatMap(groupedFlux -> groupedFlux.collectList())    

     

    코드 14-59 groupBy 예제 3

    Flux.fromIterable(SampleData.books)
      .groupBy(book -> book.getAuthorName())
      .flatMap(groupedFlux ->
        Mono
          .just(groupedFlux.key())
          .zipWith(
            groupedFlux
              .map(book -> (int) (book.getPrice() * book.getStockQuantity() * 0.1))
              .reduce((y1, y2) -> y1 + y2), (authorName, sumRoyalty) -> authorName + "'s royalty: " + sumRoyalty)
      )
      .subscribe(log::info);
    }
    
    // result 
    > Task :Example14_59.main()
    22:34:49.786 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    22:34:49.894 [main] c.operator_7_split.Example14_59 INFO - Kevin's royalty: 1280000
    22:34:49.894 [main] c.operator_7_split.Example14_59 INFO - Mike's royalty: 2080000
    22:34:49.895 [main] c.operator_7_split.Example14_59 INFO - Tom's royalty: 986000
    22:34:49.895 [main] c.operator_7_split.Example14_59 INFO - Grace's royalty: 970000
    22:34:49.895 [main] c.operator_7_split.Example14_59 INFO - Smith's royalty: 1500000

    groupBy(keyMapper) 를 이용해 저자별로 도서를 그룹화한 후에 그룹화한 도서가 수량만큼 모두 판매되었을 때 저자가 얻을 수 있는 수익을 계산하는 예제 입니다

     

    .groupBy(book -> book.getAuthorName())

    1) 저자명을 기준으로 도서를 그룹화 합니다

     

    .flatMap(groupedFlux ->
      Mono
        .just(groupedFlux.key())

    2) 계산을 위해 flatMap 으로 평탄화 작업을 합니다

     

    .zipWith(
      groupedFlux
        .map(book -> (int) (book.getPrice() * book.getStockQuantity() * 0.1))
        .reduce((y1, y2) -> y1 + y2), (authorName, sumRoyalty) -> authorName + "'s royalty: " + sumRoyalty)

    4) '저자명: 총 인세 수익' 형태로 Subscriber 에게 전달하기 위해 zipwith(저자명, 총 인세 수익) 를 사용합니다

     

    .reduce((y1, y2) -> y1 + y2), (authorName, sumRoyalty) -> authorName + "'s royalty: " + sumRoyalty)

    6) reduce() 를 이용해 그룹화된 도서의 총 인세합을 계산하고,

    autherName 과 sumRoyalty 하나의 string 으로 합쳐 줍니다

    7) 계산된 총 인세 금액을 Subscriber 에게 전달합니다.

     

     

     

     

     

     

     

     

     

    'Spring > Webflux' 카테고리의 다른 글

    14장 Operator 8 - multicast  (0) 2023.07.30
    14장 Operator 6 - time  (0) 2023.07.26
    14장 Operator 5 - Error  (0) 2023.07.22
    14장 Operator 4 - peek  (0) 2023.07.22
    14장 Operator 3 - transformation  (0) 2023.07.16

    댓글

Designed by Tistory.