-
14장 Operator 2 - filterSpring/Webflux 2023. 7. 9. 16:51
14.3 Sequence 필터링 Operator
1) filter
filter() Operator 는 Upstream 에서 emit 된 데이터 중에서 조건에 일치하는 데이터만 Downstream 으로 emit 합니다.
즉, filter Operator 의 파라미터로 입력받은 Predicate 의 리턴값이 true 인 데이터만 Downstream 으로 emit 합니다
코드 14-15 filter 예제 1
홀수 filter
public static void main(String[] args) { Flux .range(1, 20) .filter(num -> num % 2 != 0) .subscribe(data -> log.info("# onNext: {}", data)); } // result > Task :Example14_15.main() 17:43:46.355 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 17:43:46.364 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 1 17:43:46.366 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 3 17:43:46.366 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 5 17:43:46.366 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 7 17:43:46.366 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 9 17:43:46.366 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 11 17:43:46.366 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 13 17:43:46.367 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 15 17:43:46.367 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 17 17:43:46.367 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 19
14-16 filter 예제 2
btc Top Prices 가 2천만 초과한 연도 출력
public static void main(String[] args) { Flux .fromIterable(SampleData.btcTopPricesPerYear) .filter(tuple -> tuple.getT2() > 20_000_000) .subscribe(data -> log.info(data.getT1() + ":" + data.getT2())); } public class SampleData { public static final List<Tuple2<Integer, Long>> btcTopPricesPerYear = Arrays.asList( Tuples.of(2010, 565L), Tuples.of(2011, 36_094L), Tuples.of(2012, 17_425L), Tuples.of(2013, 1_405_209L), Tuples.of(2014, 1_237_182L), Tuples.of(2015, 557_603L), Tuples.of(2016, 1_111_811L), Tuples.of(2017, 22_483_583L), Tuples.of(2018, 19_521_543L), Tuples.of(2019, 15_761_568L), Tuples.of(2020, 22_439_002L), Tuples.of(2021, 63_364_000L) ); } // result > Task :Example14_16.main() 17:45:39.689 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 17:45:39.790 [main] c.operator_2_filter.Example14_16 INFO - 2017:22483583 17:45:39.790 [main] c.operator_2_filter.Example14_16 INFO - 2020:22439002 17:45:39.790 [main] c.operator_2_filter.Example14_16 INFO - 2021:63364000
filterWhen
내부의 asynchronously Flux 에 의해 emitted 된 데이터에 filter 를 적용합니다.
내부의 flux 에 emitted 된 데이터가 복수개 일때는 첫번째 값만 true 인지 판정합니다
코드 14-17 filterWhen 예제
수량 3_000_000 이상만 emit 하는 예제
public static void main(String[] args) throws InterruptedException { Map<CovidVaccine, Tuple2<CovidVaccine, Integer>> vaccineMap = getCovidVaccines(); Flux .fromIterable(SampleData.coronaVaccineNames) .filterWhen(vaccine -> Mono .just(vaccineMap.get(vaccine).getT2() >= 3_000_000) .publishOn(Schedulers.parallel())) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(1000); } public class SampleData { // upstream data public static final List<CovidVaccine> coronaVaccineNames = CovidVaccine.toList(); public enum CovidVaccine { Pfizer, AstraZeneca, Moderna, Janssen, Novavax; public static List<CovidVaccine> toList() { return Arrays.asList( CovidVaccine.Pfizer, CovidVaccine.AstraZeneca, CovidVaccine.Moderna, CovidVaccine.Janssen, CovidVaccine.Novavax ); } } // downstream data public static Map<CovidVaccine, Tuple2<CovidVaccine, Integer>> getCovidVaccines() { return coronaVaccines .stream() .collect(Collectors.toMap(t1 -> t1.getT1(), t2 -> t2)); } public static final List<Tuple2<CovidVaccine, Integer>> coronaVaccines = Arrays.asList( Tuples.of(CovidVaccine.Pfizer, 1_000_000), Tuples.of(CovidVaccine.AstraZeneca, 3_000_000), Tuples.of(CovidVaccine.Moderna, 4_000_000), Tuples.of(CovidVaccine.Janssen, 2_000_000), Tuples.of(CovidVaccine.Novavax, 2_500_000) ); } // result > Task :Example14_17.main() 20:36:59.527 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 20:36:59.628 [parallel-2] c.operator_2_filter.Example14_17 INFO - # onNext: AstraZeneca 20:36:59.639 [parallel-3] c.operator_2_filter.Example14_17 INFO - # onNext: Moderna
비동기적으로 필터링을 수행하는 filterWhen() Operator 를 사용한 예제 코드입니다
동작과정
01 fromIterable() Operator 로 코로나 백신명을 emit 합니다
02 filterWhen() Operator 에서 코로나 백신명을 전달받습니다
03 filterWhen() Operator 의 Inner Sequece 를 통해 백신명에 해당하는 백신의 수량이 3,000,000 개 이상이라면 해당 백신명을 Subscriber 에게 emit 합니다
filterWhen() Operator 는 내부에서 Inner Sequence 를 통해 조건에 맞는 데이터인지를 비동기적으로 테스트한 후,
테스트 결과가 true 라면 filterWhen() 의 Upstream 으로부터 전달받은 데이터를 Downsteam 으로 emit 합니다
Upstream
List<CovidVaccine> coronaVaccineNames = Arrays.asList(
CovidVaccine.Pfizer,
CovidVaccine.AstraZeneca,
CovidVaccine.Moderna,
CovidVaccine.Janssen,
CovidVaccine.Novavax
);filterWhen() Inner Sequence
List<Tuple2<CovidVaccine, Integer>> coronaVaccines = Arrays.asList(
Tuples.of(CovidVaccine.Pfizer, 1_000_000),
Tuples.of(CovidVaccine.AstraZeneca, 3_000_000),
Tuples.of(CovidVaccine.Moderna, 4_000_000),
Tuples.of(CovidVaccine.Janssen, 2_000_000),
Tuples.of(CovidVaccine.Novavax, 2_500_000)
);2) skip
skip(long skipped)
skip() Operator 는 Upstream 에서 emit 된 데이터 중에서 파라미터로 입력받은 숫자만큼 건너띈 후,
나머지 데이터를 Downstream 으로 emit 합니다
코드 14-18 skip 예제 1
public static void main(String[] args) throws InterruptedException { Flux .interval(Duration.ofSeconds(1)) .skip(2) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(5500L); } // result > Task :Example14_18.main() 21:11:22.033 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 21:11:25.052 [parallel-1] c.operator_2_filter.Example14_18 INFO - # onNext: 2 21:11:26.054 [parallel-1] c.operator_2_filter.Example14_18 INFO - # onNext: 3 21:11:27.053 [parallel-1] c.operator_2_filter.Example14_18 INFO - # onNext: 4
skip(2) 가 적용되어 0, 1 을 건너띄고, 2,3,4... 가 emit 됩니다
skip(Duration timespan)
코드 14-19 skip 예제 2
skip(Duration timespan)
public static void main(String[] args) throws InterruptedException { Flux .interval(Duration.ofMillis(300)) .skip(Duration.ofSeconds(1)) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(2000L); } // result > Task :Example14_19.main() 21:15:23.845 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 21:15:25.100 [parallel-2] c.operator_2_filter.Example14_19 INFO - # onNext: 3 21:15:25.400 [parallel-2] c.operator_2_filter.Example14_19 INFO - # onNext: 4 21:15:25.700 [parallel-2] c.operator_2_filter.Example14_19 INFO - # onNext: 5
skip() Operator 의 파라미터로 시간을 지정하면 파라미터로 지정한 시간 내에 emit 된 데이터를 건너뛴 후, 나머지 데이터를 emit 합니다.
주기가 300ms 이므로 0,1,2 를 건너띄고, 3,4,5 가 emit 됩니다
ms data 300 0 600 1 900 2 1200 3 1500 4 1800 5 14-20 skip 예제 3
btcTopPricesPerYear 에 filter 와 skip 동시 적용 예제
public static void main(String[] args) { Flux .fromIterable(SampleData.btcTopPricesPerYear) .filter(tuple -> tuple.getT2() >= 20_000_000) .skip(2) .subscribe(tuple -> log.info("{}, {}", tuple.getT1(), tuple.getT2())); } public class SampleData { public static final List<Tuple2<Integer, Long>> btcTopPricesPerYear = Arrays.asList( Tuples.of(2010, 565L), Tuples.of(2011, 36_094L), Tuples.of(2012, 17_425L), Tuples.of(2013, 1_405_209L), Tuples.of(2014, 1_237_182L), Tuples.of(2015, 557_603L), Tuples.of(2016, 1_111_811L), Tuples.of(2017, 22_483_583L), Tuples.of(2018, 19_521_543L), Tuples.of(2019, 15_761_568L), Tuples.of(2020, 22_439_002L), Tuples.of(2021, 63_364_000L) ); // result > Task :Example14_20.main() 21:24:58.883 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 21:24:58.971 [main] c.operator_2_filter.Example14_20 INFO - 2021, 63364000
filter( >= 20,000,000 ) 이 넘는 3개 중 skip(2) 한 후 2021, 63_364_000L 만 emit 되었습니다
# 연도 TopPrice 1 2017 22_483_583L 2 2020 22_439_002L 3 2021 63_364_000L 3) take
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#take-long-
take() Operator 는 Upstream 에서 emit 되는 데이터 중에서 파라미터로 입력받은 숫자만큼만 Downstream 으로 emit 합니다
코드 14-21 take 예제 1
public static void main(String[] args) throws InterruptedException { Flux .interval(Duration.ofSeconds(1)) .take(3) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(4000L); } // result > Task :Example14_21.main() 21:39:16.182 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 21:39:17.201 [parallel-1] c.operator_2_filter.Example14_21 INFO - # onNext: 0 21:39:18.200 [parallel-1] c.operator_2_filter.Example14_21 INFO - # onNext: 1 21:39:19.200 [parallel-1] c.operator_2_filter.Example14_21 INFO - # onNext: 2
interval(Duration.ofSeconds(1)) 에서 0부터 1증가한 숫자를 무한히 emit 합니다
하지만 take() Operator 의 파라미터로 3을 지정했기 때문에 세 개의 데이터만 Subscriber 에게 전달하고 Sequence 가 종료됩니다
take(Duration timespan)
코드 14-22 take 예제 2
public static void main(String[] args) throws InterruptedException { Flux .interval(Duration.ofSeconds(1)) .take(Duration.ofMillis(2500)) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(3000L); } // result > Task :Example14_22.main() 21:44:20.873 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 21:44:21.928 [parallel-2] c.operator_2_filter.Example14_22 INFO - # onNext: 0 21:44:22.926 [parallel-2] c.operator_2_filter.Example14_22 INFO - # onNext: 1
interval(Duration.ofSeconds(1)) 로 주기 1초로 데이터 emit
take(Duration.ofMillis(2500)) 2.5 초 시간 내에 emit 되는 데이터만 Downstream 으로 emit 합니다
ms data 1000 1 2000 2 3000 3 takeLast()
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#takeLast-int-
코드 14-23 takeLast 예제
public static void main(String[] args) { Flux .fromIterable(SampleData.btcTopPricesPerYear) .takeLast(2) .subscribe(tuple -> log.info("# onNext: {}, {}", tuple.getT1(), tuple.getT2())); } public class SampleData { public static final List<Tuple2<Integer, Long>> btcTopPricesPerYear = Arrays.asList( Tuples.of(2010, 565L), Tuples.of(2011, 36_094L), Tuples.of(2012, 17_425L), Tuples.of(2013, 1_405_209L), Tuples.of(2014, 1_237_182L), Tuples.of(2015, 557_603L), Tuples.of(2016, 1_111_811L), Tuples.of(2017, 22_483_583L), Tuples.of(2018, 19_521_543L), Tuples.of(2019, 15_761_568L), Tuples.of(2020, 22_439_002L), Tuples.of(2021, 63_364_000L) ); } // result > Task :Example14_23.main() 21:49:11.332 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 21:49:11.415 [main] c.operator_2_filter.Example14_23 INFO - # onNext: 2020, 22439002 21:49:11.417 [main] c.operator_2_filter.Example14_23 INFO - # onNext: 2021, 63364000
takeLast() Operator 는 Upstream 에서 emit 된 데이터 중에서 파라미터로 입력한 개수만큼 가장 마지막에 emit 된 데이터를 Downstream 으로 emit 합니다
takeUntil()
코드 14-24 takeUntil 예제
public static void main(String[] args) { Flux .fromIterable(SampleData.btcTopPricesPerYear) .takeUntil(tuple -> tuple.getT2() > 20_000_000) .subscribe(tuple -> log.info("# onNext: {}, {}", tuple.getT1(), tuple.getT2())); } public class SampleData { public static final List<Tuple2<Integer, Long>> btcTopPricesPerYear = Arrays.asList( Tuples.of(2010, 565L), Tuples.of(2011, 36_094L), Tuples.of(2012, 17_425L), Tuples.of(2013, 1_405_209L), Tuples.of(2014, 1_237_182L), Tuples.of(2015, 557_603L), Tuples.of(2016, 1_111_811L), Tuples.of(2017, 22_483_583L), Tuples.of(2018, 19_521_543L), Tuples.of(2019, 15_761_568L), Tuples.of(2020, 22_439_002L), Tuples.of(2021, 63_364_000L) ); } // result > Task :Example14_24.main() 21:56:05.608 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 21:56:05.693 [main] c.operator_2_filter.Example14_24 INFO - # onNext: 2010, 565 21:56:05.694 [main] c.operator_2_filter.Example14_24 INFO - # onNext: 2011, 36094 21:56:05.694 [main] c.operator_2_filter.Example14_24 INFO - # onNext: 2012, 17425 21:56:05.694 [main] c.operator_2_filter.Example14_24 INFO - # onNext: 2013, 1405209 21:56:05.695 [main] c.operator_2_filter.Example14_24 INFO - # onNext: 2014, 1237182 21:56:05.695 [main] c.operator_2_filter.Example14_24 INFO - # onNext: 2015, 557603 21:56:05.695 [main] c.operator_2_filter.Example14_24 INFO - # onNext: 2016, 1111811 21:56:05.695 [main] c.operator_2_filter.Example14_24 INFO - # onNext: 2017, 22483583
takeUntil() Operator 는 파라미터로 입력한 람다 표현식(Predicate, > 20_000_000)이 true 가 될 때가지 Upstream 에서 emit 된 데이터를 Downstream 으로 emit 합니다
Upstream 에서 emit 된 데이터에는 Predicate 을 평가할 때 사용한 데이터 (2017, 22,483,583) 이 포함됩니다
다시말해, Predicate, > 20_000_000 = false 인 경우 + 1st true 의 데이터가 emit 됩니다
코드 14-25 takeWhile 예제
public static void main(String[] args) { Flux .fromIterable(SampleData.btcTopPricesPerYear) .takeWhile(tuple -> tuple.getT2() < 20_000_000) .subscribe(tuple -> log.info("# onNext: {}, {}", tuple.getT1(), tuple.getT2())); } // result > Task :Example14_25.main() 22:01:04.623 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 22:01:04.709 [main] c.operator_2_filter.Example14_25 INFO - # onNext: 2010, 565 22:01:04.710 [main] c.operator_2_filter.Example14_25 INFO - # onNext: 2011, 36094 22:01:04.711 [main] c.operator_2_filter.Example14_25 INFO - # onNext: 2012, 17425 22:01:04.711 [main] c.operator_2_filter.Example14_25 INFO - # onNext: 2013, 1405209 22:01:04.711 [main] c.operator_2_filter.Example14_25 INFO - # onNext: 2014, 1237182 22:01:04.711 [main] c.operator_2_filter.Example14_25 INFO - # onNext: 2015, 557603 22:01:04.711 [main] c.operator_2_filter.Example14_25 INFO - # onNext: 2016, 1111811
takeWhile() Operator 는 takeUntil() Operator 와 달리 파라미터로 입력한 람다 표현식(Predicate, < 20_000_000 ) 이 true 가 되는 동안에만 Upstream에서 emit 된 데이터를 Downstream 으로 emit 합니다.
즉, Upstream 에서 emit 된 데이터가 false 라면 sequence 가 종료됩니다
takeWhile() Operator 는 Predicate 을 평가할 때 사용한 데이터 ( 2017, 22_483_583L ) 가 Downstream 으로 emit 되지 않습니다
4) next
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#next--
next() Operator 는 Upstream 에서 emit 되는 데이터 중에서 첫 번째 데이터만 Downstream 으로 emit 합니다
만일 Upstream 에서 emit 되는 데이터가 empty 라면 Downstream 으로 empty Mono 를 emit 합니다
코드 14-26 next 예제
public static void main(String[] args) { Flux .fromIterable(SampleData.btcTopPricesPerYear) .next() .subscribe(tuple -> log.info("# onNext: {}, {}", tuple.getT1(), tuple.getT2())); } // result > Task :Example14_26.main() 22:09:35.926 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 22:09:36.011 [main] c.operator_2_filter.Example14_26 INFO - # onNext: 2010, 565
next() Operator 를 사용하여 가장 첫 해의 데이터(2010, 565L) 만 출력합니다
'Spring > Webflux' 카테고리의 다른 글
14장 Operator 4 - peek (0) 2023.07.22 14장 Operator 3 - transformation (0) 2023.07.16 14장 Operator 1 - Sequence 생성 (0) 2023.07.08 13장 Testing (0) 2023.05.27 12장 Debugging (0) 2023.05.21