ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 14장 Operator 2 - filter
    Spring/Webflux 2023. 7. 9. 16:51

    github: https://github.com/bjpublic/Spring-Reactive/tree/main/part2/src/main/java/chapter14/operator_2_filter

     

     

    14.3 Sequence 필터링 Operator

    1) filter

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#filter-java.util.function.Predicate-

    filter() Operator 는 Upstream 에서 emit 된 데이터 중에서 조건에 일치하는 데이터만 Downstream 으로 emit 합니다.

    즉, filter Operator 의 파라미터로 입력받은 Predicate 의 리턴값이  true 인 데이터만 Downstream 으로 emit 합니다

     

    코드 14-15 filter 예제 1

    홀수 filter

    public static void main(String[] args) {
      Flux
          .range(1, 20)
          .filter(num -> num % 2 != 0)
          .subscribe(data -> log.info("# onNext: {}", data));
    }
    // result
    > Task :Example14_15.main()
    17:43:46.355 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    17:43:46.364 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 1
    17:43:46.366 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 3
    17:43:46.366 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 5
    17:43:46.366 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 7
    17:43:46.366 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 9
    17:43:46.366 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 11
    17:43:46.366 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 13
    17:43:46.367 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 15
    17:43:46.367 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 17
    17:43:46.367 [main] c.operator_2_filter.Example14_15 INFO - # onNext: 19

     

    14-16 filter 예제 2

    btc Top Prices 가 2천만 초과한 연도 출력

    public static void main(String[] args) {
      Flux
        .fromIterable(SampleData.btcTopPricesPerYear)
        .filter(tuple -> tuple.getT2() > 20_000_000)
        .subscribe(data -> log.info(data.getT1() + ":" + data.getT2()));
    }
    
    public class SampleData {
    	public static final List<Tuple2<Integer, Long>> btcTopPricesPerYear =
    			Arrays.asList(
    					Tuples.of(2010, 565L),
    					Tuples.of(2011, 36_094L),
    					Tuples.of(2012, 17_425L),
    					Tuples.of(2013, 1_405_209L),
    					Tuples.of(2014, 1_237_182L),
    					Tuples.of(2015, 557_603L),
    					Tuples.of(2016, 1_111_811L),
    					Tuples.of(2017, 22_483_583L),
    					Tuples.of(2018, 19_521_543L),
    					Tuples.of(2019, 15_761_568L),
    					Tuples.of(2020, 22_439_002L),
    					Tuples.of(2021, 63_364_000L)
    			);  
    }
    // result 
    
    > Task :Example14_16.main()
    17:45:39.689 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    17:45:39.790 [main] c.operator_2_filter.Example14_16 INFO - 2017:22483583
    17:45:39.790 [main] c.operator_2_filter.Example14_16 INFO - 2020:22439002
    17:45:39.790 [main] c.operator_2_filter.Example14_16 INFO - 2021:63364000

     

    filterWhen

    내부의 asynchronously Flux 에 의해 emitted 된 데이터에 filter 를 적용합니다.

    내부의 flux 에 emitted 된 데이터가 복수개 일때는 첫번째 값만 true 인지 판정합니다

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#filterWhen-java.util.function.Function-

    코드 14-17 filterWhen 예제

    수량 3_000_000 이상만 emit 하는 예제

    public static void main(String[] args) throws InterruptedException {
      Map<CovidVaccine, Tuple2<CovidVaccine, Integer>> vaccineMap =
          getCovidVaccines();
      Flux
          .fromIterable(SampleData.coronaVaccineNames)
          .filterWhen(vaccine -> Mono
              .just(vaccineMap.get(vaccine).getT2() >= 3_000_000)
              .publishOn(Schedulers.parallel()))
          .subscribe(data -> log.info("# onNext: {}", data));
    
      Thread.sleep(1000);
    }
    
    
    
    public class SampleData {
      // upstream data
      public static final List<CovidVaccine> coronaVaccineNames = CovidVaccine.toList();
    
      public enum CovidVaccine {
        Pfizer,
        AstraZeneca,
        Moderna,
        Janssen,
        Novavax;
    
        public static List<CovidVaccine> toList() {
          return Arrays.asList(
              CovidVaccine.Pfizer,
              CovidVaccine.AstraZeneca,
              CovidVaccine.Moderna,
              CovidVaccine.Janssen,
              CovidVaccine.Novavax
          );
        }
      }
      
      // downstream data
      public static Map<CovidVaccine, Tuple2<CovidVaccine, Integer>> getCovidVaccines() {
    		return coronaVaccines
    				.stream()
    				.collect(Collectors.toMap(t1 -> t1.getT1(), t2 -> t2));
      }
    
      public static final List<Tuple2<CovidVaccine, Integer>> coronaVaccines =
    			Arrays.asList(
    					Tuples.of(CovidVaccine.Pfizer, 1_000_000),
    					Tuples.of(CovidVaccine.AstraZeneca, 3_000_000),
    					Tuples.of(CovidVaccine.Moderna, 4_000_000),
    					Tuples.of(CovidVaccine.Janssen, 2_000_000),
    					Tuples.of(CovidVaccine.Novavax, 2_500_000)
    			);
    
    
    }
    
    // result
    > Task :Example14_17.main()
    20:36:59.527 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    20:36:59.628 [parallel-2] c.operator_2_filter.Example14_17 INFO - # onNext: AstraZeneca
    20:36:59.639 [parallel-3] c.operator_2_filter.Example14_17 INFO - # onNext: Moderna

    비동기적으로 필터링을 수행하는 filterWhen() Operator 를 사용한 예제 코드입니다

    동작과정

    01 fromIterable() Operator 로 코로나 백신명을 emit 합니다

    02 filterWhen() Operator 에서 코로나 백신명을 전달받습니다

    03 filterWhen() Operator 의 Inner Sequece 를 통해 백신명에 해당하는 백신의 수량이 3,000,000 개 이상이라면 해당 백신명을 Subscriber 에게 emit 합니다

     

    filterWhen() Operator 는 내부에서 Inner Sequence 를 통해 조건에 맞는 데이터인지를 비동기적으로 테스트한 후, 

    테스트 결과가 true 라면 filterWhen() 의 Upstream 으로부터 전달받은 데이터를 Downsteam 으로 emit 합니다

     

    Upstream

    List<CovidVaccine> coronaVaccineNames = Arrays.asList(
      CovidVaccine.Pfizer,
      CovidVaccine.AstraZeneca,
      CovidVaccine.Moderna,
      CovidVaccine.Janssen,
      CovidVaccine.Novavax
    );

     

    filterWhen() Inner Sequence

    List<Tuple2<CovidVaccine, Integer>> coronaVaccines = Arrays.asList(
      Tuples.of(CovidVaccine.Pfizer, 1_000_000),
      Tuples.of(CovidVaccine.AstraZeneca3_000_000),
      Tuples.of(CovidVaccine.Moderna4_000_000),
      Tuples.of(CovidVaccine.Janssen, 2_000_000),
      Tuples.of(CovidVaccine.Novavax, 2_500_000)
    );

     

    2) skip

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#skip-java.time.Duration-

    skip(long skipped)

    skip() Operator 는 Upstream 에서 emit 된 데이터 중에서 파라미터로 입력받은 숫자만큼 건너띈 후,

    나머지 데이터를 Downstream 으로 emit 합니다

     

    코드 14-18 skip 예제 1

    public static void main(String[] args) throws InterruptedException {
      Flux
          .interval(Duration.ofSeconds(1))
          .skip(2)
          .subscribe(data -> log.info("# onNext: {}", data));
    
      Thread.sleep(5500L);
    }
    // result
    > Task :Example14_18.main()
    21:11:22.033 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    21:11:25.052 [parallel-1] c.operator_2_filter.Example14_18 INFO - # onNext: 2
    21:11:26.054 [parallel-1] c.operator_2_filter.Example14_18 INFO - # onNext: 3
    21:11:27.053 [parallel-1] c.operator_2_filter.Example14_18 INFO - # onNext: 4

    skip(2) 가 적용되어 0, 1 을 건너띄고, 2,3,4...  가 emit 됩니다

     

     

    skip(Duration timespan)

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#skip-java.time.Duration-

    코드 14-19 skip 예제 2

    skip(Duration timespan)

    public static void main(String[] args) throws InterruptedException {
      Flux
          .interval(Duration.ofMillis(300))
          .skip(Duration.ofSeconds(1))
          .subscribe(data -> log.info("# onNext: {}", data));
    
      Thread.sleep(2000L);
    }
    // result
    > Task :Example14_19.main()
    21:15:23.845 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    21:15:25.100 [parallel-2] c.operator_2_filter.Example14_19 INFO - # onNext: 3
    21:15:25.400 [parallel-2] c.operator_2_filter.Example14_19 INFO - # onNext: 4
    21:15:25.700 [parallel-2] c.operator_2_filter.Example14_19 INFO - # onNext: 5

    skip() Operator 의 파라미터로 시간을 지정하면 파라미터로 지정한 시간 내에 emit 된 데이터를 건너뛴 후, 나머지 데이터를 emit 합니다.

    주기가 300ms 이므로 0,1,2 를 건너띄고, 3,4,5 가 emit 됩니다

    ms data
    300 0
    600 1
    900 2
    1200 3
    1500 4
    1800 5

     

    14-20 skip 예제 3

    btcTopPricesPerYear 에 filter 와 skip 동시 적용 예제

    public static void main(String[] args) {
      Flux
          .fromIterable(SampleData.btcTopPricesPerYear)
          .filter(tuple -> tuple.getT2() >= 20_000_000)
          .skip(2)
          .subscribe(tuple -> log.info("{}, {}", tuple.getT1(), tuple.getT2()));
    }
    
    public class SampleData {
      public static final List<Tuple2<Integer, Long>> btcTopPricesPerYear =
        Arrays.asList(
          Tuples.of(2010, 565L),
          Tuples.of(2011, 36_094L),
          Tuples.of(2012, 17_425L),
          Tuples.of(2013, 1_405_209L),
          Tuples.of(2014, 1_237_182L),
          Tuples.of(2015, 557_603L),
          Tuples.of(2016, 1_111_811L),
          Tuples.of(2017, 22_483_583L),
          Tuples.of(2018, 19_521_543L),
          Tuples.of(2019, 15_761_568L),
          Tuples.of(2020, 22_439_002L),
          Tuples.of(2021, 63_364_000L)
    );
    
    
    // result
    > Task :Example14_20.main()
    21:24:58.883 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    21:24:58.971 [main] c.operator_2_filter.Example14_20 INFO - 2021, 63364000

    filter( >= 20,000,000 ) 이 넘는 3개 중 skip(2) 한 후 2021, 63_364_000L 만 emit 되었습니다

    # 연도 TopPrice
    1 2017 22_483_583L
    2 2020 22_439_002L
    3 2021 63_364_000L

     

    3) take

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#take-long-

    take() Operator 는 Upstream 에서 emit 되는 데이터 중에서 파라미터로 입력받은 숫자만큼만 Downstream 으로 emit 합니다

    코드 14-21 take 예제 1

    public static void main(String[] args) throws InterruptedException {
      Flux
          .interval(Duration.ofSeconds(1))
          .take(3)
          .subscribe(data -> log.info("# onNext: {}", data));
    
      Thread.sleep(4000L);
    }
    // result
    > Task :Example14_21.main()
    21:39:16.182 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    21:39:17.201 [parallel-1] c.operator_2_filter.Example14_21 INFO - # onNext: 0
    21:39:18.200 [parallel-1] c.operator_2_filter.Example14_21 INFO - # onNext: 1
    21:39:19.200 [parallel-1] c.operator_2_filter.Example14_21 INFO - # onNext: 2

    interval(Duration.ofSeconds(1)) 에서 0부터 1증가한 숫자를 무한히 emit 합니다

    하지만 take() Operator 의 파라미터로 3을 지정했기 때문에 세 개의 데이터만 Subscriber 에게 전달하고 Sequence 가 종료됩니다

     

     

    take(Duration timespan)

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#take-java.time.Duration-

    코드 14-22 take 예제 2

    public static void main(String[] args) throws InterruptedException {
      Flux
          .interval(Duration.ofSeconds(1))
          .take(Duration.ofMillis(2500))
          .subscribe(data -> log.info("# onNext: {}", data));
    
      Thread.sleep(3000L);
    }
    // result
    > Task :Example14_22.main()
    21:44:20.873 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    21:44:21.928 [parallel-2] c.operator_2_filter.Example14_22 INFO - # onNext: 0
    21:44:22.926 [parallel-2] c.operator_2_filter.Example14_22 INFO - # onNext: 1

    interval(Duration.ofSeconds(1)) 로 주기 1초로 데이터 emit 

    take(Duration.ofMillis(2500)) 2.5 초 시간 내에 emit 되는 데이터만 Downstream 으로 emit 합니다

    ms data
    1000 1
    2000 2
    3000 3

     

    takeLast()

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#takeLast-int-

    코드 14-23 takeLast 예제

    public static void main(String[] args) {
      Flux
          .fromIterable(SampleData.btcTopPricesPerYear)
          .takeLast(2)
          .subscribe(tuple -> log.info("# onNext: {}, {}",
              tuple.getT1(), tuple.getT2()));
    }
    
    public class SampleData {
      public static final List<Tuple2<Integer, Long>> btcTopPricesPerYear =
        Arrays.asList(
          Tuples.of(2010, 565L),
          Tuples.of(2011, 36_094L),
          Tuples.of(2012, 17_425L),
          Tuples.of(2013, 1_405_209L),
          Tuples.of(2014, 1_237_182L),
          Tuples.of(2015, 557_603L),
          Tuples.of(2016, 1_111_811L),
          Tuples.of(2017, 22_483_583L),
          Tuples.of(2018, 19_521_543L),
          Tuples.of(2019, 15_761_568L),
          Tuples.of(2020, 22_439_002L),
          Tuples.of(2021, 63_364_000L)
      );
    }
    
    // result 
    > Task :Example14_23.main()
    21:49:11.332 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    21:49:11.415 [main] c.operator_2_filter.Example14_23 INFO - # onNext: 2020, 22439002
    21:49:11.417 [main] c.operator_2_filter.Example14_23 INFO - # onNext: 2021, 63364000

    takeLast() Operator 는 Upstream 에서 emit 된 데이터 중에서 파라미터로 입력한 개수만큼 가장 마지막에 emit 된 데이터를 Downstream 으로 emit 합니다

     

    takeUntil()

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#takeUntil-java.util.function.Predicate-

    코드 14-24 takeUntil 예제

    public static void main(String[] args) {
      Flux
          .fromIterable(SampleData.btcTopPricesPerYear)
          .takeUntil(tuple -> tuple.getT2() > 20_000_000)
          .subscribe(tuple -> log.info("# onNext: {}, {}",
              tuple.getT1(), tuple.getT2()));
    }
    
    public class SampleData {
      public static final List<Tuple2<Integer, Long>> btcTopPricesPerYear =
        Arrays.asList(
          Tuples.of(2010, 565L),
          Tuples.of(2011, 36_094L),
          Tuples.of(2012, 17_425L),
          Tuples.of(2013, 1_405_209L),
          Tuples.of(2014, 1_237_182L),
          Tuples.of(2015, 557_603L),
          Tuples.of(2016, 1_111_811L),
          Tuples.of(2017, 22_483_583L),
          Tuples.of(2018, 19_521_543L),
          Tuples.of(2019, 15_761_568L),
          Tuples.of(2020, 22_439_002L),
          Tuples.of(2021, 63_364_000L)
      );
    }
    
    
    // result
    > Task :Example14_24.main()
    21:56:05.608 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    21:56:05.693 [main] c.operator_2_filter.Example14_24 INFO - # onNext: 2010, 565
    21:56:05.694 [main] c.operator_2_filter.Example14_24 INFO - # onNext: 2011, 36094
    21:56:05.694 [main] c.operator_2_filter.Example14_24 INFO - # onNext: 2012, 17425
    21:56:05.694 [main] c.operator_2_filter.Example14_24 INFO - # onNext: 2013, 1405209
    21:56:05.695 [main] c.operator_2_filter.Example14_24 INFO - # onNext: 2014, 1237182
    21:56:05.695 [main] c.operator_2_filter.Example14_24 INFO - # onNext: 2015, 557603
    21:56:05.695 [main] c.operator_2_filter.Example14_24 INFO - # onNext: 2016, 1111811
    21:56:05.695 [main] c.operator_2_filter.Example14_24 INFO - # onNext: 2017, 22483583

    takeUntil() Operator 는 파라미터로 입력한 람다 표현식(Predicate, > 20_000_000)이 true 가 될 때가지 Upstream 에서 emit 된 데이터를 Downstream 으로 emit 합니다

    Upstream 에서 emit 된 데이터에는 Predicate 을 평가할 때 사용한 데이터 (2017, 22,483,583) 이 포함됩니다

    다시말해, Predicate, > 20_000_000 = false 인 경우 + 1st true 의 데이터가 emit 됩니다

     

    코드 14-25 takeWhile 예제

    public static void main(String[] args) {
      Flux
          .fromIterable(SampleData.btcTopPricesPerYear)
          .takeWhile(tuple -> tuple.getT2() < 20_000_000)
          .subscribe(tuple -> log.info("# onNext: {}, {}",
              tuple.getT1(), tuple.getT2()));
    }
    
    // result 
    > Task :Example14_25.main()
    22:01:04.623 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    22:01:04.709 [main] c.operator_2_filter.Example14_25 INFO - # onNext: 2010, 565
    22:01:04.710 [main] c.operator_2_filter.Example14_25 INFO - # onNext: 2011, 36094
    22:01:04.711 [main] c.operator_2_filter.Example14_25 INFO - # onNext: 2012, 17425
    22:01:04.711 [main] c.operator_2_filter.Example14_25 INFO - # onNext: 2013, 1405209
    22:01:04.711 [main] c.operator_2_filter.Example14_25 INFO - # onNext: 2014, 1237182
    22:01:04.711 [main] c.operator_2_filter.Example14_25 INFO - # onNext: 2015, 557603
    22:01:04.711 [main] c.operator_2_filter.Example14_25 INFO - # onNext: 2016, 1111811

    takeWhile() Operator 는 takeUntil() Operator 와 달리 파라미터로 입력한 람다 표현식(Predicate, < 20_000_000 ) 이 true 가 되는 동안에만 Upstream에서 emit 된 데이터를 Downstream 으로 emit 합니다. 

    즉, Upstream 에서 emit 된 데이터가 false 라면 sequence 가 종료됩니다

    takeWhile() Operator 는 Predicate 을 평가할 때 사용한 데이터 ( 2017, 22_483_583L ) 가 Downstream 으로 emit 되지 않습니다

     

    4) next

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#next--

    next() Operator 는 Upstream 에서 emit 되는 데이터 중에서 첫 번째 데이터만 Downstream 으로 emit 합니다

    만일 Upstream 에서 emit 되는 데이터가 empty 라면 Downstream 으로 empty Mono 를 emit 합니다

     

    코드 14-26 next 예제 

    public static void main(String[] args) {
      Flux
        .fromIterable(SampleData.btcTopPricesPerYear)
        .next()
        .subscribe(tuple -> log.info("# onNext: {}, {}",
          tuple.getT1(), tuple.getT2()));
    }
    
    // result 
    > Task :Example14_26.main()
    22:09:35.926 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    22:09:36.011 [main] c.operator_2_filter.Example14_26 INFO - # onNext: 2010, 565

    next() Operator 를 사용하여 가장 첫 해의 데이터(2010, 565L) 만 출력합니다

     

     

     

     

     

    'Spring > Webflux' 카테고리의 다른 글

    14장 Operator 4 - peek  (0) 2023.07.22
    14장 Operator 3 - transformation  (0) 2023.07.16
    14장 Operator 1 - Sequence 생성  (0) 2023.07.08
    13장 Testing  (0) 2023.05.27
    12장 Debugging  (0) 2023.05.21

    댓글

Designed by Tistory.