-
14장 Operator 3 - transformationSpring/Webflux 2023. 7. 16. 10:57
14.4 Sequence 변환 Operator
1) map
map() Operator 는 Upstream 에서 emit 된 데이터를 mapper Function 을 사용하여 변환한 후, Downstream 으로 emit 합니다.
map() Operator 의 마블 다이어그램을 보면 어떤 기능을 수행하는지 직관적으로 알 수 있습니다
map() 내부에서 에러 발생 시 Sequence 가 종료되지 않고 계속 진행되도록 하는 기능을 지원하는데, 이 부분은 14.6절 에러 처리를 위한 Operator 에서 자세히 알아보겠습니다.
코드 14-27 map 예제 1
public static void main(String[] args) { Flux .just("1-Circle", "3-Circle", "5-Circle") .map(circle -> circle.replace("Circle", "Rectangle")) .subscribe(data -> log.info("# onNext: {}", data)); } // result > Task :Example14_27.main() 10:54:13.794 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 10:54:13.805 [main] c.o.Example14_27 INFO - # onNext: 1-Rectangle 10:54:13.806 [main] c.o.Example14_27 INFO - # onNext: 3-Rectangle 10:54:13.806 [main] c.o.Example14_27 INFO - # onNext: 5-Rectangle
Upstream 에서 emit 된 문자열의 일부인 Circle 을 map() 내부에서 replace() 메서드를 이용해서 Rectangle 로 변환한 후 Downstream 으로 emit 합니다
코드 14-28 map 예제 2
public static void main(String[] args) { final double buyPrice = 50_000_000; Flux .fromIterable(SampleData.btcTopPricesPerYear) .filter(tuple -> tuple.getT1() == 2021) .doOnNext(data -> log.info("# doOnNext: {}", data)) .map(tuple -> calculateProfitRate(buyPrice, tuple.getT2())) .subscribe(data -> log.info("# onNext: {}%", data)); } private static double calculateProfitRate(final double buyPrice, Long topPrice) { return (topPrice - buyPrice) / buyPrice * 100; } // result > Task :Example14_28.main() 11:20:58.770 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 11:20:58.869 [main] c.o.Example14_28 INFO - # doOnNext: [2021,63364000] 11:20:58.870 [main] c.o.Example14_28 INFO - # onNext: 26.728%
map() Operator 를 이용해 특정 연도의 BTC 최고가 시점의 수익률을 계산하는 예제 코드 입니다
BTC를 50,000,000 원 어치 구매했을 경우, 2021년도 최고가 시점의 수익률이 얼마인지 계산합니다
또, 이해를 돕기 위해서 doOnNext() Operator 를 이용해 2021년도 BTC 최고가 금액을 출력합니다
2) flatmap
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave.
이 Flux에서 비동기적으로 내보낸 요소를 게시자로 변환한 다음 병합을 통해 이러한 내부 게시자를 단일 Flux로 병합하여 interleave(끼워넣을) 수 있습니다.
There are three dimensions to this operator that can be compared with flatMapSequential and concatMap:
flatMapSequential, concatMap 과 비교하여 3가지 차원이 있습니다
- Generation of inners and subscription: this operator is eagerly subscribing to its inners.
이 연산자는 innser sequence 의 결과를 열렬히? 구독하고 있습니다.
- Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner element are flattened as they arrive.
이 연산자는 내부 요소가 도착하면 병합되므로 원래 순서를 반드시 유지하지는 않습니다.
- Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).
서로 다른 내부 interleave 로 부터 값을 놓아둡니다
서로 다른 내부 게시자(publisher)에서 내보낸 값을 인터리브 방식으로 함께 병합할 수 있음을 나타냅니다.* interleave [|ɪntər│liːv] 동사 (특히 얇은 막 같은 것을) 끼우다
- First, what is new is often interleaved with the commonplace : 첫째, 새로운 것은 종종 진부한 것과 섞여(끼여) 있습니다.
- Thread scheduling is much more efficient when done in the client process, without requiring hardware- and operating-system context switches when synchronizing or interleaving thread executions. : 스레드 스케줄링은 스레드 실행을 동기화하거나 인터리빙할 때 하드웨어 및 운영 체제 컨텍스트 전환 없이 클라이언트 프로세스에서 수행될 때 훨씬 더 효율적입니다.마블 다이어그램에서는 flatMap() Operator 영역 위쪽의 Upstream 에서 동그라미 모양의 데이터가 emit 되어 flatMap() Operator 로 전달되면,
flapMap() 내부에서 Inner Sequence 를 생성한 후, 한 개 이상의 네모 모양으로 변환된 데이터를 emit 하고 있는 것을 볼 수 있습니다.
즉, Upstream 에서 emit 된 데이터 한 건이 Inner Sequence 에서 여러 건의 데이터로 변환된다는 것을 알 수 있습니다.
그런데, Upstream 에서 emit 된 데이터는 이렇게 Inner Sequence 에서 평탄화(flatten) 작업을 거치면서 하나의 Sequence 로 병합(merge) 되어 Downstream 으로 emit 됩니다.
코드 14-29 flatMap 예제 1
public static void main(String[] args) { Flux .just("Good", "Bad") .flatMap(feeling -> Flux .just("Morning", "Afternoon", "Evening") .map(time -> feeling + " " + time)) .subscribe(log::info); } // result > Task :Example14_29.main() 13:40:18.630 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 13:40:18.662 [main] c.o.Example14_29 INFO - Good Morning 13:40:18.664 [main] c.o.Example14_29 INFO - Good Afternoon 13:40:18.665 [main] c.o.Example14_29 INFO - Good Evening 13:40:18.665 [main] c.o.Example14_29 INFO - Bad Morning 13:40:18.665 [main] c.o.Example14_29 INFO - Bad Afternoon 13:40:18.666 [main] c.o.Example14_29 INFO - Bad Evening
코드 14-29는 flatMap() Operator 의 기본 개념을 이해하기 위한 예제 코드입니다.
Upstream 인 just() Operator 에서 두 개의 데이터를 emit 하면.
flatMap() 내부의 Inner Sequence 에서 세 개의 데이터를 emit 합니다
즉, 2(Upstream 에서 emit 되는 데이터 수) x 3(Inner Sequence에서 emit 되는 데이터 수) = 6개의 데이터가 최종적으로 Subscriber 에게 전달됩니다
코드 14-30 flatMap 예제2
구구단
public static void main(String[] args) throws InterruptedException { Flux .range(2, 8) .flatMap(dan -> Flux .range(1, 9) .publishOn(Schedulers.parallel()) .map(n -> dan + " * " + n + " = " + dan * n)) .subscribe(log::info); Thread.sleep(100L); } // result > Task :Example14_30.main() 13:44:00.995 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 13:44:01.051 [parallel-2] c.o.Example14_30 INFO - 3 * 1 = 3 13:44:01.055 [parallel-2] c.o.Example14_30 INFO - 3 * 2 = 6 13:44:01.055 [parallel-2] c.o.Example14_30 INFO - 3 * 3 = 9 13:44:01.055 [parallel-2] c.o.Example14_30 INFO - 3 * 4 = 12 13:44:01.055 [parallel-2] c.o.Example14_30 INFO - 3 * 5 = 15 13:44:01.055 [parallel-2] c.o.Example14_30 INFO - 3 * 6 = 18 13:44:01.055 [parallel-2] c.o.Example14_30 INFO - 3 * 7 = 21 13:44:01.055 [parallel-2] c.o.Example14_30 INFO - 3 * 8 = 24 13:44:01.055 [parallel-2] c.o.Example14_30 INFO - 3 * 9 = 27 13:44:01.056 [parallel-3] c.o.Example14_30 INFO - 4 * 1 = 4 13:44:01.056 [parallel-3] c.o.Example14_30 INFO - 5 * 1 = 5 13:44:01.057 [parallel-3] c.o.Example14_30 INFO - 5 * 2 = 10 13:44:01.057 [parallel-3] c.o.Example14_30 INFO - 5 * 3 = 15 13:44:01.057 [parallel-3] c.o.Example14_30 INFO - 5 * 4 = 20 13:44:01.057 [parallel-3] c.o.Example14_30 INFO - 5 * 5 = 25 13:44:01.057 [parallel-3] c.o.Example14_30 INFO - 5 * 6 = 30 13:44:01.057 [parallel-3] c.o.Example14_30 INFO - 5 * 7 = 35 13:44:01.057 [parallel-3] c.o.Example14_30 INFO - 5 * 8 = 40 13:44:01.058 [parallel-3] c.o.Example14_30 INFO - 5 * 9 = 45 13:44:01.058 [parallel-3] c.o.Example14_30 INFO - 6 * 1 = 6 13:44:01.058 [parallel-3] c.o.Example14_30 INFO - 6 * 2 = 12 13:44:01.058 [parallel-3] c.o.Example14_30 INFO - 6 * 3 = 18 13:44:01.058 [parallel-3] c.o.Example14_30 INFO - 6 * 4 = 24 13:44:01.058 [parallel-3] c.o.Example14_30 INFO - 6 * 5 = 30 13:44:01.058 [parallel-3] c.o.Example14_30 INFO - 6 * 6 = 36 13:44:01.058 [parallel-3] c.o.Example14_30 INFO - 6 * 7 = 42 13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 6 * 8 = 48 13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 6 * 9 = 54 13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 7 * 1 = 7 13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 7 * 2 = 14 13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 7 * 3 = 21 13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 7 * 4 = 28 13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 7 * 5 = 35 13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 7 * 6 = 42 13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 7 * 7 = 49 13:44:01.059 [parallel-3] c.o.Example14_30 INFO - 7 * 8 = 56 13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 7 * 9 = 63 13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 8 * 1 = 8 13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 8 * 2 = 16 13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 8 * 3 = 24 13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 8 * 4 = 32 13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 8 * 5 = 40 13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 8 * 6 = 48 13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 8 * 7 = 56 13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 8 * 8 = 64 13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 8 * 9 = 72 13:44:01.060 [parallel-3] c.o.Example14_30 INFO - 9 * 1 = 9 13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 9 * 2 = 18 13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 9 * 3 = 27 13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 9 * 4 = 36 13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 9 * 5 = 45 13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 9 * 6 = 54 13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 9 * 7 = 63 13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 9 * 8 = 72 13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 9 * 9 = 81 13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 2 * 1 = 2 13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 2 * 2 = 4 13:44:01.061 [parallel-3] c.o.Example14_30 INFO - 2 * 3 = 6 13:44:01.062 [parallel-3] c.o.Example14_30 INFO - 2 * 4 = 8 13:44:01.062 [parallel-3] c.o.Example14_30 INFO - 2 * 5 = 10 13:44:01.062 [parallel-3] c.o.Example14_30 INFO - 2 * 6 = 12 13:44:01.062 [parallel-3] c.o.Example14_30 INFO - 2 * 7 = 14 13:44:01.062 [parallel-3] c.o.Example14_30 INFO - 2 * 8 = 16 13:44:01.062 [parallel-3] c.o.Example14_30 INFO - 2 * 9 = 18 13:44:01.062 [parallel-3] c.o.Example14_30 INFO - 4 * 2 = 8 13:44:01.062 [parallel-3] c.o.Example14_30 INFO - 4 * 3 = 12 13:44:01.063 [parallel-3] c.o.Example14_30 INFO - 4 * 4 = 16 13:44:01.063 [parallel-3] c.o.Example14_30 INFO - 4 * 5 = 20 13:44:01.063 [parallel-3] c.o.Example14_30 INFO - 4 * 6 = 24 13:44:01.063 [parallel-3] c.o.Example14_30 INFO - 4 * 7 = 28 13:44:01.064 [parallel-3] c.o.Example14_30 INFO - 4 * 8 = 32 13:44:01.064 [parallel-3] c.o.Example14_30 INFO - 4 * 9 = 36
코드 14-30은 flatMap() Operator 를 사용하여 구구단을 출력하는 예제 코드 입니다
이번에는 flatMap() 내부의 Inner Sequence 에 Scheduler 를 설정해서 비동기적으로 데이터를 emit 하도록 했습니다.
이처럼 flatMap() 내부의 Inner Sequence를 비동기적으로 실행하면 데이터 emit 의 순서를 보장하지 않는다는 사실을 알 수 있습니다.
추가로 아래 2개의 파라미터도 지원합니다
- concrrency : 동시실행 스레드 갯 수
- prefetch : request 요청할 데이터 갯 수
3) concat
concat() Operator 는 파라미터로 입력되는 Publisher 의 Sequence 를 연결해서 데이터를 순차적으로 emit 합니다
특히 먼저 입력된 Publisher 의 Sequence 가 종료될 때까지 나머지 Publisher 의 Sequence 는 subscribe 되지 않고 대기하는 특성을 가집니다.
코드 14-31 concat 예제 1
public static void main(String[] args) { Flux .concat(Flux.just(1, 2, 3), Flux.just(4, 5)) .subscribe(data -> log.info("# onNext: {}", data)); } // result > Task :Example14_31.main() 17:28:33.302 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 17:28:33.317 [main] c.o.Example14_31 INFO - # onNext: 1 17:28:33.319 [main] c.o.Example14_31 INFO - # onNext: 2 17:28:33.319 [main] c.o.Example14_31 INFO - # onNext: 3 17:28:33.319 [main] c.o.Example14_31 INFO - # onNext: 4 17:28:33.319 [main] c.o.Example14_31 INFO - # onNext: 5
concat() 의 파라미터로 2개의 Flux 를 입력했으며, 먼저 입력된 순서대로 Flux 의 Sequence 를 차례대로 emit 합니다
코드 14-32 concat 예제 2
public static void main(String[] args) { Flux .concat( Flux.fromIterable(getViralVector()), Flux.fromIterable(getMRNA()), Flux.fromIterable(getSubunit())) .subscribe(data -> log.info("# onNext: {}", data)); } private static List<Tuple2<SampleData.CovidVaccine, Integer>> getViralVector() { return SampleData.viralVectorVaccines; } private static List<Tuple2<SampleData.CovidVaccine, Integer>> getMRNA() { return SampleData.mRNAVaccines; } private static List<Tuple2<SampleData.CovidVaccine, Integer>> getSubunit() { return SampleData.subunitVaccines; } public class SampleData { public static final List<Tuple2<CovidVaccine, Integer>> viralVectorVaccines = Arrays.asList( Tuples.of(CovidVaccine.AstraZeneca, 3_000_000), Tuples.of(CovidVaccine.Janssen, 2_000_000) ); public static final List<Tuple2<CovidVaccine, Integer>> mRNAVaccines = Arrays.asList( Tuples.of(CovidVaccine.Pfizer, 1_000_000), Tuples.of(CovidVaccine.Moderna, 4_000_000) ); public static final List<Tuple2<CovidVaccine, Integer>> subunitVaccines = Arrays.asList( Tuples.of(CovidVaccine.Novavax, 2_500_000) ); } // result > Task :Example14_32.main() 17:30:56.086 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 17:30:56.208 [main] c.o.Example14_32 INFO - # onNext: [AstraZeneca,3000000] 17:30:56.210 [main] c.o.Example14_32 INFO - # onNext: [Janssen,2000000] 17:30:56.210 [main] c.o.Example14_32 INFO - # onNext: [Pfizer,1000000] 17:30:56.210 [main] c.o.Example14_32 INFO - # onNext: [Moderna,4000000] 17:30:56.210 [main] c.o.Example14_32 INFO - # onNext: [Novavax,2500000]
유형별 코로나 백신 List 를 concat() Operator 를 사용해 연결하는 예제 코드 입니다
세 개의 Flux 를 concat() Operator 의 파라미터로 전달해서 유형별 코로나 백신의 재고 수량을 순차적으로 subscriber 에게 전달합니다
4) merge
merge() Operator 는 파라미터로 입력되는 Publisher 의 Sequence 에서 emit 된 데이터를 인터리빙(interleave) 방식으로 병합합니다.
merge() 는 concat() 처럼 먼저 입력된 Publisher 의 Sequence 가 종료될 때까지 나머지 Publisher 의 Sequence 가 Subscribe 되지 않고 대기하는 것이 아니라 모든 Publisher 의 Sequence 가 즉시 subscribe 됩니다
인터리브(interleave)
인터리브는 '교차로 배치하다'라는 의미가 있습니다. 컴퓨터 하드디스크상의 데이터를 서로 인접하지 않게 배열해 성능을 향상시키거나 인접한 메모리 위치를 서로 다른 메모리 뱅크에 두어 동시에 여러 곳을 접근할 수 있게 해 주는 것을 인터리빙이라고 합니다
merge() Operator 의 마블 다이어그램을 보면 두 개의 Sequence 에서 emit 되는 데이터가 서로 교차되는(interleaved) 방식으로 merge 되고 있는 것을 볼 수 있습니다.
그런데 주의해야 할 것은 인터리빙 방식이라고 해서 각각의 Publisher 가 emit 하는 데이터를 하나씩 번갈아 가며 merge 한다는 것이 아니라 emit 된 시간 순서대로 merge 한다는 것입니다코드 14-33 merge 예제 1
public static void main(String[] args) throws InterruptedException { Flux .merge( Flux.just(1, 2, 3, 4).delayElements(Duration.ofMillis(300L)), Flux.just(5, 6, 7).delayElements(Duration.ofMillis(500L)) ) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(2000L); } //result > Task :Example14_33.main() 17:42:32.493 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 17:42:32.869 [parallel-1] c.o.Example14_33 INFO - # onNext: 1 17:42:33.069 [parallel-2] c.o.Example14_33 INFO - # onNext: 5 17:42:33.183 [parallel-3] c.o.Example14_33 INFO - # onNext: 2 17:42:33.486 [parallel-5] c.o.Example14_33 INFO - # onNext: 3 17:42:33.572 [parallel-4] c.o.Example14_33 INFO - # onNext: 6 17:42:33.789 [parallel-6] c.o.Example14_33 INFO - # onNext: 4 17:42:34.074 [parallel-7] c.o.Example14_33 INFO - # onNext: 7
delayElements() 를 사용해서 첫 번째 파라미터의 Flux 는 0.3초에 한 번씩 데이터를 emit 하게 설정했고,
두 번째 파라미터의 Flux 는 0.5초에 한 번식 데이터를 emit 하도록 설정했습니다.
각각의 [parallel-#] 스레드에서 Publisher 가 emit 하는 시간 이 빠른 데이터부터 차례대로 emit 하고 있는 것을 볼 수 있습니다.
ms 1st Flux 2nd Flux 300 1 500 5 600 2 900 3 1000 6 1200 4 1500 7 코드 13-34 merge 예제 2
public static void main(String[] args) throws InterruptedException { String[] usaStates = { "Ohio", "Michigan", "New Jersey", "Illinois", "New Hampshire", "Virginia", "Vermont", "North Carolina", "Ontario", "Georgia" }; Flux .merge(getMeltDownRecoveryMessage(usaStates)) .subscribe(log::info); Thread.sleep(2000L); } private static List<Mono<String>> getMeltDownRecoveryMessage(String[] usaStates) { List<Mono<String>> messages = new ArrayList<>(); for (String state : usaStates) { messages.add(SampleData.nppMap.get(state)); } return messages; } public class SampleData { public static Map<String, Mono<String>> nppMap = new HashMap<>(); static { nppMap.put("Ontario", Mono.just("1500 Ontario Done").delayElement(Duration.ofMillis(1500L))); nppMap.put("Vermont", Mono.just("400 Vermont Done").delayElement(Duration.ofMillis(400L))); nppMap.put("New Hampshire", Mono.just("700 New Hampshire Done").delayElement(Duration.ofMillis(700L))); nppMap.put("New Jersey", Mono.just("500 New Jersey Done").delayElement(Duration.ofMillis(500L))); nppMap.put("Ohio", Mono.just("1000 Ohio Done").delayElement(Duration.ofMillis(1000L))); nppMap.put("Michigan", Mono.just("200 Michigan Done").delayElement(Duration.ofMillis(200L))); nppMap.put("Illinois", Mono.just("300 Illinois Done").delayElement(Duration.ofMillis(300L))); nppMap.put("Virginia", Mono.just("600 Virginia Done").delayElement(Duration.ofMillis(600L))); nppMap.put("North Carolina", Mono.just("800 North Carolina Done").delayElement(Duration.ofMillis(800L))); nppMap.put("Georgia", Mono.just("900 Georgia Done").delayElement(Duration.ofMillis(900L))); for (char c = 'a'; c <= 'a' + 25; c++) { morseCodeMap.put(morseCodes[c - ('z' - 25)], Character.toString(c)); } } } // result > Task :Example14_34.main() 17:59:11.234 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 18:04:03.824 [parallel-2] c.o.Example14_34 INFO - 200 Michigan Done 18:04:03.922 [parallel-4] c.o.Example14_34 INFO - 300 Illinois Done 18:04:04.025 [parallel-7] c.o.Example14_34 INFO - 400 Vermont Done 18:04:04.125 [parallel-3] c.o.Example14_34 INFO - 500 New Jersey Done 18:04:04.225 [parallel-6] c.o.Example14_34 INFO - 600 Virginia Done 18:04:04.323 [parallel-5] c.o.Example14_34 INFO - 700 New Hampshire Done 18:04:04.429 [parallel-8] c.o.Example14_34 INFO - 800 North Carolina Done 18:04:04.527 [parallel-10] c.o.Example14_34 INFO - 900 Georgia Done 18:04:04.622 [parallel-1] c.o.Example14_34 INFO - 1000 Ohio Done 18:04:05.125 [parallel-9] c.o.Example14_34 INFO - 1500 Ontario Done
주별 원자로 복구되는 시간 순서대로 작성한 Sample Data 예제 입니다
입력된 순서가 아닌, delayElement(Duration.ofMillis()) 시간 순서대로 데이터가 emit 되었습니다
5) zip
zip() Operator s s 파라미터로 입력되는 Publisher Sequence 에서 emit 된 데이터를 결합하는데,
각 publisher 가 데이터를 하나씩 emit 할 때까지 기다렸다가 결합합니다
public static void main(String[] args) throws InterruptedException { Flux .zip( Flux.just(1, 2, 3).delayElements(Duration.ofMillis(300L)), Flux.just(4, 5, 6).delayElements(Duration.ofMillis(500L)) ) .subscribe(tuple2 -> log.info("# onNext: {}", tuple2)); Thread.sleep(2500L); } // result > Task :Example14_35.main() 18:16:08.046 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 18:16:08.614 [parallel-2] c.o.Example14_35 INFO - # onNext: [1,4] 18:16:09.118 [parallel-4] c.o.Example14_35 INFO - # onNext: [2,5] 18:16:09.620 [parallel-6] c.o.Example14_35 INFO - # onNext: [3,6]
zip() Operator 의 첫 번째 파라미터인 Flux 에서 0.3초에 한번식 데이터를 emit 하고,
두번째 Flux 에서 0.5초에 한 번씩 데이터를 emit 합니다
두 개의 Flux 가 emit 하는 시간이 다르지만 각 Flux 에서 하나씩 emit 할 때까지 기다렸다가,
emit 된 데이터를 Tuple2 객체로 묶어서 Subscriber 에게 전달합니다
3rd 파라미터 combinator
코드 14-36 zip 예제 2
public static void main(String[] args) throws InterruptedException { Flux .zip( Flux.just(1, 2, 3).delayElements(Duration.ofMillis(300L)), Flux.just(4, 5, 6).delayElements(Duration.ofMillis(500L)), (n1, n2) -> n1 * n2 ) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(2500L); } //result > Task :Example14_36.main() 18:18:53.351 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 18:18:53.919 [parallel-2] c.o.Example14_36 INFO - # onNext: 4 // 1 * 4 18:18:54.425 [parallel-4] c.o.Example14_36 INFO - # onNext: 10 // 2 * 5 18:18:54.928 [parallel-6] c.o.Example14_36 INFO - # onNext: 18 // 3 * 6
zip() Operator 의 세 번째 파라미터로 combinator (BiFunction 함수형 인터페이스)를 추가해서,
두 개의 Flux 가 emit 하는 한 쌍의 데이터를 combinator 에서 전달받아 변환 작업을 거친 후,
최종 변환된 데이터를 Subscriber 에게 전달합니다.
코드 14-36 에서는 한 쌍의 데이터를 서로 곱한 값을 Subscriber 에게 전달합니다
코드 14-37 zip 예제 3
public static void main(String[] args) throws InterruptedException { getInfectedPersonsPerHour(10, 21) .subscribe(tuples -> { Tuple3<Tuple2, Tuple2, Tuple2> t3 = (Tuple3) tuples; int sum = (int) t3.getT1().getT2() + (int) t3.getT2().getT2() + (int) t3.getT3().getT2(); log.info("# onNext: time : {}, sum : {}", t3.getT1().getT1(), sum); }); } private static Flux getInfectedPersonsPerHour(int start, int end) { return Flux.zip( Flux.fromIterable(SampleData.seoulInfected) .filter(t2 -> t2.getT1() >= start && t2.getT1() <= end), Flux.fromIterable(SampleData.incheonInfected) .filter(t2 -> t2.getT1() >= start && t2.getT1() <= end), Flux.fromIterable(SampleData.suwonInfected) .filter(t2 -> t2.getT1() >= start && t2.getT1() <= end) ); } public class SampleData { public static final List<Tuple2<Integer, Integer>> seoulInfected = Arrays.asList( Tuples.of(1, 0), Tuples.of(2, 0), Tuples.of(3, 0), Tuples.of(4, 0), Tuples.of(5, 0), Tuples.of(6, 0), Tuples.of(7, 0), Tuples.of(8, 0), Tuples.of(9, 0), Tuples.of(10, 20), Tuples.of(11, 23), Tuples.of(12, 33), Tuples.of(13, 10), Tuples.of(14, 15), Tuples.of(15, 20), Tuples.of(16, 30), Tuples.of(17, 10), Tuples.of(18, 11), Tuples.of(19, 13), Tuples.of(20, 8), Tuples.of(21, 14), Tuples.of(22, 4), Tuples.of(23, 7), Tuples.of(24, 2) ); public static final List<Tuple2<Integer, Integer>> incheonInfected = Arrays.asList( Tuples.of(1, 0), Tuples.of(2, 0), Tuples.of(3, 0), Tuples.of(4, 0), Tuples.of(5, 0), Tuples.of(6, 0), Tuples.of(7, 0), Tuples.of(8, 0), Tuples.of(9, 0), Tuples.of(10, 3), Tuples.of(11, 5), Tuples.of(12, 2), Tuples.of(13, 10), Tuples.of(14, 5), Tuples.of(15, 6), Tuples.of(16, 7), Tuples.of(17, 2), Tuples.of(18, 5), Tuples.of(19, 2), Tuples.of(20, 0), Tuples.of(21, 2), Tuples.of(22, 0), Tuples.of(23, 2), Tuples.of(24, 1) ); public static final List<Tuple2<Integer, Integer>> suwonInfected = Arrays.asList( Tuples.of(1, 0), Tuples.of(2, 0), Tuples.of(3, 0), Tuples.of(4, 0), Tuples.of(5, 0), Tuples.of(6, 0), Tuples.of(7, 0), Tuples.of(8, 0), Tuples.of(9, 0), Tuples.of(10, 2), Tuples.of(11, 1), Tuples.of(12, 0), Tuples.of(13, 3), Tuples.of(14, 2), Tuples.of(15, 3), Tuples.of(16, 6), Tuples.of(17, 3), Tuples.of(18, 1), Tuples.of(19, 1), Tuples.of(20, 0), Tuples.of(21, 0), Tuples.of(22, 1), Tuples.of(23, 0), Tuples.of(24, 0) ); } // result > Task :Example14_37.main() 18:29:43.627 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 18:43:22.394 [main] c.o.Example14_37 INFO - # onNext: time : 10, sum : 25 18:43:22.397 [main] c.o.Example14_37 INFO - # onNext: time : 11, sum : 29 18:43:22.397 [main] c.o.Example14_37 INFO - # onNext: time : 12, sum : 35 18:43:22.397 [main] c.o.Example14_37 INFO - # onNext: time : 13, sum : 23 18:43:22.397 [main] c.o.Example14_37 INFO - # onNext: time : 14, sum : 22 18:43:22.397 [main] c.o.Example14_37 INFO - # onNext: time : 15, sum : 29 18:43:22.398 [main] c.o.Example14_37 INFO - # onNext: time : 16, sum : 43 18:43:22.398 [main] c.o.Example14_37 INFO - # onNext: time : 17, sum : 15 18:43:22.398 [main] c.o.Example14_37 INFO - # onNext: time : 18, sum : 17 18:43:22.398 [main] c.o.Example14_37 INFO - # onNext: time : 19, sum : 16 18:43:22.398 [main] c.o.Example14_37 INFO - # onNext: time : 20, sum : 8 18:43:22.398 [main] c.o.Example14_37 INFO - # onNext: time : 21, sum : 16
zip() Operator 를 사용해서 수도권에 있는 세 개 도시의 특정 시간 범위 내의 코로나 확진자 수를 출력합니다
zip() 파라미터로 총 세 개의 Flux 를 전달했습니다.
그리고 getInfectedPersonPerHour() 메서드의 파라미터로 특정 시간 범위를 전달해서 해당 범위 내에서만 시간별로 통계를 내고 있습니다
6) and
and() Operator 는 Mono 의 Complete Signal 과 파라미터로 입력된 Publisher 의 Complete Signal 을 결합하여 새로운 Mono<Void>를 반환합니다
즉, Mono 와 파라미터로 입력된 Publisher 의 Sequence 가 모두 종료되었음을 Subscriber 에게 알릴 수 있습니다
and() Operator 의 마블 다이어그램을 보면, and() Operator 위쪽의 Sequence 와 and() Operator 내부의 Sequence 가 종료되어도 Downstream 쪽으로 emit 되는 데이터가 하나도 없는 것을 볼 수 있습니다
코드 14-38 and 예제 1
public static void main(String[] args) throws InterruptedException { Mono .just("Task 1") .delayElement(Duration.ofSeconds(1)) .doOnNext(data -> log.info("# Mono doOnNext: {}", data)) .and( Flux .just("Task 2", "Task 3") .delayElements(Duration.ofMillis(600)) .doOnNext(data -> log.info("# Flux doOnNext: {}", data)) ) .subscribe( data -> log.info("# onNext: {}", data), error -> log.error("# onError:", error), () -> log.info("# onComplete") ); Thread.sleep(5000); } // result > Task :Example14_38.main() 21:52:33.887 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 21:52:34.601 [parallel-2] c.o.Example14_38 INFO - # Flux doOnNext: Task 2 21:52:34.993 [parallel-1] c.o.Example14_38 INFO - # Mono doOnNext: Task 1 21:52:35.216 [parallel-3] c.o.Example14_38 INFO - # Flux doOnNext: Task 3 21:52:35.217 [parallel-3] c.o.Example14_38 INFO - # onComplete
and() Operator 를 기준으로 Upstream 에서 1초의 지연 시간을 가진 뒤 "Task 1" 을 emit 하고,
and() 내부의 Inner Sequence 에서는 0.6초에 한 번씩 "Task2", "Task 3" 을 emit 합니다
결과적으로 Subscriber 에게 onComplete Signal 만 전달되고, Upstream 에서 emit 된 데이터는 전달되지 않습니다.
즉, and() Operator 는 모든 Sequence 가 종료되길 기다렸다가 최종적으로 onComplte Signal 만 전송합니다.
코드 14-39 and 예제 2
public static void main(String[] args) throws InterruptedException { restartApplicationServer() .and(restartDBServer()) .subscribe( data -> log.info("# onNext: {}", data), error -> log.error("# onError:", error), () -> log.info("# sent an email to Administrator: " + "All Servers are restarted successfully") ); Thread.sleep(6000L); } private static Mono<String> restartApplicationServer() { return Mono .just("Application Server was restarted successfully.") .delayElement(Duration.ofSeconds(2)) .doOnNext(log::info); } private static Publisher<String> restartDBServer() { return Mono .just("DB Server was restarted successfully.") .delayElement(Duration.ofSeconds(4)) .doOnNext(log::info); } // result > Task :Example14_39.main() 22:05:40.235 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 22:05:42.261 [parallel-1] c.o.Example14_39 INFO - Application Server was restarted successfully. 22:05:44.259 [parallel-2] c.o.Example14_39 INFO - DB Server was restarted successfully. 22:05:44.259 [parallel-2] c.o.Example14_39 INFO - # sent an email to Administrator: All Servers are restarted successfully
and() Operator 를 이용해서 두 개의 서버를 재가동하는 상황을 시뮬레이션 했습니다
먼저 애플리케이션 서버가 2초 뒤에 재가동되었고, 4초 뒤에 DB서버가 재가동 되었습니다
두 개의 서버가 성공적으로 재가동된 후, 두 개 서버의 재가동이 성공적으로 수행되었음을 이메일로 관리자에게 알리고 있습니다.
이처럼 and() Operator 는 모든 작업이 끝난 시점에 최종적으로 후처리 작업을 수행하기 적합한 Operator 입니다
7) collectList
collectList() Operator 는 Flux 에서 emit 된 데이터를 모아서 List 로 변환한 후,
변환된 List 를 emit 하는 Mono 를 반환합니다.
만약, Upstream Sequence 가 비어있다면 비어 있는 List 를 Downstream 으로 emit 합니다
코드 14-40 collectList 예제
public static void main(String[] args) { Flux .just("...", "---", "...") .map(code -> transformMorseCode(code)) .collectList() .subscribe(list -> log.info(list.stream().collect(Collectors.joining()))); } public static String transformMorseCode(String morseCode) { return SampleData.morseCodeMap.get(morseCode); } public class SampleData { public static Map<String, String> morseCodeMap = new HashMap<>(); public static String[] morseCodes = { ".-", "-...", "-.-.", "-..", ".", "..-.", "--.", "....", "..", ".---", "-.-", ".-..", "--", "-.", "---", ".--.", "--.-", ".-.", "...", "-", "..-", "...-", ".--", "-..-", "-.--", "--.."}; static { for (char c = 'a'; c <= 'a' + 25; c++) { morseCodeMap.put(morseCodes[c - ('z' - 25)], Character.toString(c)); } } } // result > Task :Example14_40.main() 22:15:27.722 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 22:15:27.776 [main] c.o.Example14_40 INFO - sos
collectList() Operator 를 이용해 Upstream 에서 emit 되는 모스 부호를 해석한 문자를 List 로 변환한 후, Subscriber 에게 전달합니다.
Subscriber 는 전달받은 List 에 포함된 문자를 Stream 을 사용해서 연결한 후, 모스 부호를 최종적으로 해석합니다
8) collectMap
collectMap() Operator 는 Flux 에서 emit 된 데이터를 기반으로 key 와 value 를 생성하여 Map 의 Element 로 추가한 후,
최종적으로 Map 을 emit 하는 Mono 를 반환합니다
코드 14-41 collectMap 예제
public static void main(String[] args) { Flux .range(0, 26) .collectMap(key -> SampleData.morseCodes[key], value -> transformToLetter(value)) .subscribe(map -> log.info("# onNext: {}", map)); } private static String transformToLetter(int value) { return Character.toString((char) ('a' + value)); } // result > Task :Example14_41.main() 22:30:09.735 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 22:30:09.783 [main] c.o.Example14_41 INFO - # onNext: {..=i, .---=j, --.-=q, ---=o, --.=g, .--=w, .-.=r, -.--=y, ....=h, -.-.=c, --=m, -.=n, .-..=l, ...-=v, -.-=k, -..=d, -=t, ..-=u, .=e, ...=s, -...=b, ..-.=f, .--.=p, -..-=x, --..=z, .-=a}
range() Operator 를 이용해서 0부터 25까지의 숫자를 차례대로 emit 합니다
그런 다음 collectMap() 의 첫 번째 파라미터인 람다 표현식에서 range() 로부터 emit 된 숫자를 기반으로 SampleData의 morseCodes 배열의 모스 부호를 차례대로 읽어 옵니다
이렇게 읽어 온 모스 부호는 Map 원소의 Key 가 됩니다
collectMap() 의 두번째 파라미터인 람다 표현식에서는 range() 로 부터 emit 된 숫자를 a부터 z까지의 알파벳으로 변환합니다
변환된 a부터 z까지의 알파벳은 Map원소의 value 가 됩니다
'Spring > Webflux' 카테고리의 다른 글
14장 Operator 5 - Error (0) 2023.07.22 14장 Operator 4 - peek (0) 2023.07.22 14장 Operator 2 - filter (0) 2023.07.09 14장 Operator 1 - Sequence 생성 (0) 2023.07.08 13장 Testing (0) 2023.05.27