ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 14장 Operator 5 - Error
    Spring/Webflux 2023. 7. 22. 19:51

    github : https://github.com/bjpublic/Spring-Reactive/tree/main/part2/src/main/java/chapter14/operator_5_error

     

    질문

    flatMap vs map ?

     

     

    14.6 에러 처리를 위한 Operator

     

    1) error

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#error-java.util.function.Supplier-

    error() Operator 는 파라미터로 지정된 에러로 종료하는 Flux 를 생성합니다

    error() 는 마치 throw 를 사용해서 예외를 의도적으로 던지는 것 같이 동작하는데

    checked exception 을 캐치해서 다시 던져야 하는 경우에 사용합니다

     

    코드 14-43 error 예제 1

    public static void main(String[] args) {
      Flux
          .range(1, 5)
          .flatMap(num -> {
            if ((num * 2) % 3 == 0) {
              return Flux.error(
                  new IllegalArgumentException("Not allowed multiple of 3"));
            } else {
              return Mono.just(num * 2);
            }
          })
          .subscribe(data -> log.info("# onNext: {}", data),
              error -> log.error("# onError: ", error));
    }
    // result 
    > Task :Example14_43.main()
    18:11:36.067 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    18:11:36.118 [main] c.operator_5_error.Example14_43 INFO - # onNext: 2
    18:11:36.119 [main] c.operator_5_error.Example14_43 INFO - # onNext: 4
    18:11:36.124 [main] c.operator_5_error.Example14_43 ERROR- # onError: 
    java.lang.IllegalArgumentException: Not allowed multiple of 3
    	at chapter14.operator_5_error.Example14_43.lambda$main$0(Example14_43.java:18)
    	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386)
    	at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:156)
    	at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:111)
    	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)
    	at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:69)
    	at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
    	at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
    	at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
    	at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
    	at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
    	at chapter14.operator_5_error.Example14_43.main(Example14_43.java:24)

    1~5 까지 emit 하는데

    num * 2 % 3 ==0 이면 illegalArgumentException 을 발생시킵니다

    3이 emit 되고, Error 가 발생하는 것을 알 수 있습니다

     

    코드 14-44 error 예제 2

    public static void main(String[] args) {
      Flux
          .just('a', 'b', 'c', '3', 'd')
          .flatMap(letter -> {
            try {
              return convert(letter);
            } catch (DataFormatException e) {
              return Flux.error(e);
            }
          })
          .subscribe(data -> log.info("# onNext: {}", data),
              error -> log.error("# onError: ", error));
    }
    
    private static Mono<String> convert(char ch) throws DataFormatException {
      if (!Character.isAlphabetic(ch)) {
        throw new DataFormatException("Not Alphabetic");
      }
      return Mono.just("Converted to " + Character.toUpperCase(ch));
    }
    
    // result 
    > Task :Example14_44.main()
    18:16:23.246 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    18:16:23.293 [main] c.operator_5_error.Example14_44 INFO - # onNext: Converted to A
    18:16:23.294 [main] c.operator_5_error.Example14_44 INFO - # onNext: Converted to B
    18:16:23.294 [main] c.operator_5_error.Example14_44 INFO - # onNext: Converted to C
    18:16:23.299 [main] c.operator_5_error.Example14_44 ERROR- # onError: 
    java.util.zip.DataFormatException: Not Alphabetic
    	at chapter14.operator_5_error.Example14_44.convert(Example14_44.java:33)
    	at chapter14.operator_5_error.Example14_44.lambda$main$0(Example14_44.java:22)
    	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386)
    	at reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:127)
    	at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:100)
    	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)
    	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
    	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
    	at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
    	at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
    	at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
    	at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
    	at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
    	at chapter14.operator_5_error.Example14_44.main(Example14_44.java:27)

    a b c 3 d 을 emit 하는데

    Character.isAlphabetic() 이 아니면 DataFormatException 을 발생시킵니다

     

    3이 emit 된 후 DataFormatException: Not Alphabetic Error 가 발생한 것을 확인 할 수 있습니다

    DataFormatException 은 checked Exception 이기 때문에 try ~ catch 로 반드시 처리해야 합니다

     

    2) onErrorReturn

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#error-java.util.function.Supplier-

    onErrorReturn() Operator 는 에러 이벤트가 발생 했을 때, 에러 이벤트를 Downstream 으로 전파하지 않고 대체 값을 emit 합니다 

     

    코드 14-45 onErrorReturn 예제 1

    public static void main(String[] args) {
      getBooks()
          .map(book -> book.getPenName().toUpperCase())
          .onErrorReturn("No pen name")
          .subscribe(log::info);
    }
    
    public static Flux<Book> getBooks() {
      return Flux.fromIterable(SampleData.books);
    }
    
    
    public class SampleData {
    	public static final List<Book> books = Arrays.asList(
    			new Book("Advance Java", "Tom", "Tom-boy", 25000, 100),
    			new Book("Advance Python", "Grace", "Grace-girl", 22000, 150),
    			new Book("Advance Reactor", "Smith", "David-boy", 35000, 200),
    			new Book("Getting started Java", "Tom", "Tom-boy", 32000, 230),
    			new Book("Advance Kotlin", "Kevin", "Kevin-boy", 32000, 250),
    			new Book("Advance Javascript", "Mike", "Tom-boy", 32000, 320),
    			new Book("Getting started Kotlin", "Kevin", "Kevin-boy", 32000, 150),
    			new Book("Getting started Python", "Grace", "Grace-girl", 32000, 200),
    			new Book("Getting started Reactor", "Smith", null, 32000, 250), #penName null
    			new Book("Getting started Javascript", "Mike", "David-boy", 32000, 330)
    	);
    }
    
    public class Book {
        private String bookName;
        private String authorName;
        private String penName;
        private int price;
        private int stockQuantity;
    }
    
    
    // result 
    > Task :Example14_45.main()
    18:36:26.761 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    18:36:26.889 [main] c.operator_5_error.Example14_45 INFO - TOM-BOY
    18:36:26.890 [main] c.operator_5_error.Example14_45 INFO - GRACE-GIRL
    18:36:26.890 [main] c.operator_5_error.Example14_45 INFO - DAVID-BOY
    18:36:26.891 [main] c.operator_5_error.Example14_45 INFO - TOM-BOY
    18:36:26.891 [main] c.operator_5_error.Example14_45 INFO - KEVIN-BOY
    18:36:26.891 [main] c.operator_5_error.Example14_45 INFO - TOM-BOY
    18:36:26.891 [main] c.operator_5_error.Example14_45 INFO - KEVIN-BOY
    18:36:26.891 [main] c.operator_5_error.Example14_45 INFO - GRACE-GIRL
    18:36:26.895 [main] c.operator_5_error.Example14_45 INFO - No pen name

    penName(필명)을 대문자로 변환합니다

    book.getPenName().toUpperCase()

    끝에서 두 번째 데이터인, Getting started Reactor 의 3번째 속성 penName 이 null 이므로 Error 가 발생합니다

    결과처럼 onErrorReturn 이 발생하고 no pen name 이 return 됩니다.

    Error 가 발생하는 데이터에서 Sequence 는 종료되며, 이후 데이터는 emit 되지 않습니다

     

    코드 14-46 처럼 Exception 을 지정하여 처리할 수도 있습니다.

    .onErrorReturn(NullPointerException.class, "no pen name")
    .onErrorReturn(IllegalFormatException.class, "Illegal pen name")

     

     

    3) onErrorResume

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#onErrorResume-java.lang.Class-java.util.function.Function-

    onErrorResume() 은 에러가 발생했을 때, 에러 이벤트를 Downstream 으로 전파하지 않고 대체 Publisher 를 리턴합니다

     

    코드 14-47 onErrorResume 예제

    public static void main(String[] args) {
      final String keyword = "DDD";
      getBooksFromCache(keyword)
          .onErrorResume(error -> getBooksFromDatabase(keyword))
          .subscribe(data -> log.info("# onNext: {}", data.getBookName()),
              error -> log.error("# onError: ", error));
    }
    
    public static Flux<Book> getBooksFromCache(final String keyword) {
      return Flux
          .fromIterable(SampleData.books)
          .filter(book -> book.getBookName().contains(keyword))
          .switchIfEmpty(Flux.error(new NoSuchBookException("No such Book")));
    }
    
    public static Flux<Book> getBooksFromDatabase(final String keyword) {
      List<Book> books = new ArrayList<>(SampleData.books);
      books.add(new Book("DDD: Domain Driven Design",
          "Joy", "ddd-man", 35000, 200));
      return Flux
          .fromIterable(books)
          .filter(book -> book.getBookName().contains(keyword))
          .switchIfEmpty(Flux.error(new NoSuchBookException("No such Book")));
    }
    
    private static class NoSuchBookException extends RuntimeException {
    
      NoSuchBookException(String message) {
        super(message);
      }
    }
    
    // result 
    > Task :Example14_47.main()
    22:07:05.851 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    22:07:05.958 [main] c.operator_5_error.Example14_47 INFO - # onNext: DDD: Domain Driven Design

    01. getBooksFromCache 메소드에서 DDD keyword 로 책이름을 검색하여 없으면 NowSuchBookException 을 발생시킵니다

    .filter(book -> book.getBookName().contains(keyword))
    .switchIfEmpty(Flux.error(new NoSuchBookException("No such Book")));

     

    02. .onErrorResume(error -> getBooksFromDatabase(keyword)) 에서 다시 검색을 합니다

    - DB 를 조회한 효과를 내기 위해  "DDD: Domain Driven Design" 책을 추가하고, filter 를 다시 수행합니다

     

    03. Subscriber 에서 "DDD: Domain Driven Design" 가 검색되어 출력됩니다

     

    4) onErrorContinue

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#onErrorContinue-java.util.function.BiConsumer-

     

    onErrorContinue() 는 에러가 발생했을 때, Sequence 를 종료시키지 않고, 아직 emit 되지 않은 데이터를 emit 합니다

    에러 영역 내에 있는 데이터는 제거하고, Upstream 에서 후속 데이터를 emit 하는 방식으로 에러를 복구할 수 있도록 해 줍니다.

     

    마블 다이어그램에서 노란색 원 데이터에서  에러가 발생했으나, 노란색을 제거하고

    다시 Upstream 으로 이동해서 파란색 원 데이터를 emit 하여 처리합니다

     

    onErrorContinue() 의 파라미터인 BiConsumer 함수형 인터페이스를 통해 에러 메시지와 에러가 발생했을 때 emit 된 데이터를 전달받아서 로그를 기록하는 등의 후처리를 할 수 있습니다.

     

    코드 14-48 onErrorContinue 예제

    public static void main(String[] args) {
      Flux
          .just(1, 2, 4, 0, 6, 12)
          .map(num -> 12 / num)
          .onErrorContinue((error, num) ->
              log.error("error: {}, num: {}", error, num))
          .subscribe(data -> log.info("# onNext: {}", data),
              error -> log.error("# onError: ", error));
    }
    // result
    > Task :Example14_48.main()
    22:17:41.865 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    22:17:41.877 [main] c.operator_5_error.Example14_48 INFO - # onNext: 12
    22:17:41.879 [main] c.operator_5_error.Example14_48 INFO - # onNext: 6
    22:17:41.879 [main] c.operator_5_error.Example14_48 INFO - # onNext: 3
    22:17:41.881 [main] c.operator_5_error.Example14_48 ERROR- error: java.lang.ArithmeticException: / by zero, num: 0
    22:17:41.882 [main] c.operator_5_error.Example14_48 INFO - # onNext: 2
    22:17:41.882 [main] c.operator_5_error.Example14_48 INFO - # onNext: 1

     

    4번째 데이터에서 12/0 을 하므로 ArithmeticException 이 발생합니다.

    .onErrorContinue((error, num) ->log.error("error: {}, num: {}", error, num))

    로 에러(error)와 데이터(num)를 함께 출력합니다.

    이후 계속 진행되어, 후속데이터들이 처리됩니다

     

    Reactor 공식 문서에서는 onErrorContinue() Operator 가 명확하지 않은 Sequence 의 동작으로 개발자가 의도하지 않은 상황을 발생시킬 수 있기 때문에 onErrorContinue() Operator 를 신중하게 사용하기를 권고합니다.
    대부분의 에러는 Operator 내부에서 doOnError() Operator 를 통해 로그를 기록하고 onErrorResume() Operator 등으로 처리할 수 있다고 명시합니다.

     

    5) retry

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#retry--

    retry() Operator 는 Publisher 가 데이터를 emit 하는 과정에서 에러가 발생하면 파라미터로 입력한 횟수 만큼 원본 Flux 의 Sequence를 다시 구독합니다

    만약 파라미터로 Long.MAX_VALUE 를 입력하면 재구독을 무한 반복합니다

     

    코드 14-49 retry 예제 

    public static void main(String[] args) throws InterruptedException {
      final int[] count = {1};
      Flux
          .range(1, 3)
          .delayElements(Duration.ofSeconds(1))
          .map(num -> {
            try {
              if (num == 3 && count[0] == 1) {
                count[0]++;
                Thread.sleep(1000);
              }
            } catch (InterruptedException e) {
            }
    
            return num;
          })
          .timeout(Duration.ofMillis(1500))
          .retry(1)
          .subscribe(data -> log.info("# onNext: {}", data),
              (error -> log.error("# onError: ", error)),
              () -> log.info("# onComplete"));
    
      Thread.sleep(7000);
    }
    // result 
    > Task :Example14_49.main()
    22:25:03.982 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    22:25:05.083 [parallel-2] c.operator_5_error.Example14_49 INFO - # onNext: 1
    22:25:06.092 [parallel-4] c.operator_5_error.Example14_49 INFO - # onNext: 2
    22:25:08.094 [parallel-6] reactor.core.publisher.Operators DEBUG- onNextDropped: 3
    22:25:08.593 [parallel-8] c.operator_5_error.Example14_49 INFO - # onNext: 1
    22:25:09.595 [parallel-10] c.operator_5_error.Example14_49 INFO - # onNext: 2
    22:25:10.599 [parallel-12] c.operator_5_error.Example14_49 INFO - # onNext: 3
    22:25:10.600 [parallel-12] c.operator_5_error.Example14_49 INFO - # onComplete

    retry() 를 사용해 데이터 emit 중에 에러가 발생하면 원복 Flux 의 Sequence 를 1회 재구독하도록 했습니다

    retry() Operator 는 특히 timeout() Operator 와 함께 사용하여 네트워크 지연으로 인해 정해진 시간 안에 응답을 받지 못하면 일정 횟수 만큼 재요청해야 하는 상황에서 유용하게 사용할 수 있습니다

     

    .timeout(Duration.ofMillis(1500))

    를 통해 1.5초 동안 Upstream 으로부터 emit 되는 데이터가 없으면 TimeoutException 이 발생하도록 했습니다

     

    .range(1, 3)
    .delayElements(Duration.ofSeconds(1))

    range() 후에 delayElements 를 사용하여 1초씩  delay 되게 하였습니다.

     

    if (num == 3 && count[0] == 1) {
      count[0]++;
      Thread.sleep(1000);
    }

    위 구문으로 3일이고 첫번째 시도일 때만 1초 더 delay 되게 하였습니다

     

    따라서 map() 이후 숫자 3이 emit 될 경우에는 TimeoutException이 발생하여 재구독 하게 합니다.

     

    실행 결과에서는 첫 번째 구독에서 숫자 3이 TimeoutException 으로 인해 Drop되는 것을 확인할 수 있으며, 

    다시 1회 재구독을 해서 Sequence 가 정상적으로 종료되는 것을 볼 수 있습니다.

     

    cold 이기에, 처음부터 다시 데이터 emit 되어 1,2 에러  1,2,3  의 데이터가 emit 됩니다

     

     

    코드 14-50

    public static void main(String[] args) throws InterruptedException {
      getBooks()
          .collect(Collectors.toSet())
          .subscribe(bookSet -> bookSet.stream()
              .forEach(book -> log.info("book name: {}, price: {}",
                  book.getBookName(), book.getPrice())));
    
      Thread.sleep(12000);
    }
    
    private static Flux<Book> getBooks() {
      final int[] count = {0};
      return Flux
          .fromIterable(SampleData.books)
          .delayElements(Duration.ofMillis(500))
          .map(book -> {
            try {
              count[0]++;
              if (count[0] == 3) {
                Thread.sleep(2000);
              }
            } catch (InterruptedException e) {
            }
    
            return book;
          })
          .timeout(Duration.ofSeconds(2))
          .retry(1)
          .doOnNext(book -> log.info("# getBooks > doOnNext: {}, price: {}",
              book.getBookName(), book.getPrice()));
    }
    // result
    > Task :Example14_50.main()
    22:38:26.636 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework
    22:38:27.271 [parallel-2] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Advance Java, price: 25000
    22:38:27.779 [parallel-4] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Advance Python, price: 22000
    22:38:30.286 [parallel-6] reactor.core.publisher.Operators DEBUG- onNextDropped: chapter14.Book@73f2018a
    22:38:30.286 [parallel-8] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Advance Java, price: 25000
    22:38:30.791 [parallel-10] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Advance Python, price: 22000
    22:38:31.296 [parallel-12] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Advance Reactor, price: 35000
    22:38:31.798 [parallel-14] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Getting started Java, price: 32000
    22:38:32.302 [parallel-16] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Advance Kotlin, price: 32000
    22:38:32.805 [parallel-2] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Advance Javascript, price: 32000
    22:38:33.307 [parallel-4] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Getting started Kotlin, price: 32000
    22:38:33.808 [parallel-6] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Getting started Python, price: 32000
    22:38:34.312 [parallel-8] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Getting started Reactor, price: 32000
    22:38:34.814 [parallel-10] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Getting started Javascript, price: 32000
    22:38:34.816 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Getting started Python, price: 32000
    22:38:34.816 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Getting started Kotlin, price: 32000
    22:38:34.816 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Advance Java, price: 25000
    22:38:34.817 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Advance Kotlin, price: 32000
    22:38:34.817 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Advance Python, price: 22000
    22:38:34.817 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Advance Reactor, price: 35000
    22:38:34.817 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Advance Javascript, price: 32000
    22:38:34.817 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Getting started Javascript, price: 32000
    22:38:34.817 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Getting started Java, price: 32000
    22:38:34.817 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Getting started Reactor, price: 32000

    코드 14-50은 getBooks() 메서드에서 HTTP 요청을 통해 도서 목록을 조회하는 중에 네트워크 지연으로 일정 시간이 지나면 1회 재요청하는 것을 시뮬레이션한 코드 입니다

     

    .delayElements(Duration.ofMillis(500))

    기본 delay 는 0.5초 입니다

     

    count[0]++;
    if (count[0] == 3) {
      Thread.sleep(2000);
    }

    3번째 데이터에서만 2초 delay 를 더 주었습니다

     

    .timeout(Duration.ofSeconds(2))
    .retry(1)

    를 통해 2초 timeout 을 주고, retry 1회 시도하도록 하였습니다.

     

    3번째 데이터 조회시 아래 처럼 Error 발생하여 droppped 되지만

    22:38:30.286 [parallel-6] reactor.core.publisher.Operators DEBUG- onNextDropped: chapter14.Book@73f2018a

     

    재시도하여 데이터가 다시 처리됩니다
    22:38:30.286 [parallel-8] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Advance Java, price: 25000

     

    중복을 무시하기 위해 set 으로 데이터를 수집합니다

    collect(Collectors.toSet())

    'Spring > Webflux' 카테고리의 다른 글

    14장 Operator 7 - split  (0) 2023.07.29
    14장 Operator 6 - time  (0) 2023.07.26
    14장 Operator 4 - peek  (0) 2023.07.22
    14장 Operator 3 - transformation  (0) 2023.07.16
    14장 Operator 2 - filter  (0) 2023.07.09

    댓글

Designed by Tistory.