-
14장 Operator 5 - ErrorSpring/Webflux 2023. 7. 22. 19:51
github : https://github.com/bjpublic/Spring-Reactive/tree/main/part2/src/main/java/chapter14/operator_5_error
질문
flatMap vs map ?
14.6 에러 처리를 위한 Operator
1) error
error() Operator 는 파라미터로 지정된 에러로 종료하는 Flux 를 생성합니다
error() 는 마치 throw 를 사용해서 예외를 의도적으로 던지는 것 같이 동작하는데
checked exception 을 캐치해서 다시 던져야 하는 경우에 사용합니다
코드 14-43 error 예제 1
public static void main(String[] args) { Flux .range(1, 5) .flatMap(num -> { if ((num * 2) % 3 == 0) { return Flux.error( new IllegalArgumentException("Not allowed multiple of 3")); } else { return Mono.just(num * 2); } }) .subscribe(data -> log.info("# onNext: {}", data), error -> log.error("# onError: ", error)); } // result > Task :Example14_43.main() 18:11:36.067 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 18:11:36.118 [main] c.operator_5_error.Example14_43 INFO - # onNext: 2 18:11:36.119 [main] c.operator_5_error.Example14_43 INFO - # onNext: 4 18:11:36.124 [main] c.operator_5_error.Example14_43 ERROR- # onError: java.lang.IllegalArgumentException: Not allowed multiple of 3 at chapter14.operator_5_error.Example14_43.lambda$main$0(Example14_43.java:18) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386) at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:156) at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:111) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371) at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:69) at reactor.core.publisher.Flux.subscribe(Flux.java:8466) at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639) at reactor.core.publisher.Flux.subscribe(Flux.java:8436) at reactor.core.publisher.Flux.subscribe(Flux.java:8360) at reactor.core.publisher.Flux.subscribe(Flux.java:8330) at chapter14.operator_5_error.Example14_43.main(Example14_43.java:24)
1~5 까지 emit 하는데
num * 2 % 3 ==0 이면 illegalArgumentException 을 발생시킵니다
3이 emit 되고, Error 가 발생하는 것을 알 수 있습니다
코드 14-44 error 예제 2
public static void main(String[] args) { Flux .just('a', 'b', 'c', '3', 'd') .flatMap(letter -> { try { return convert(letter); } catch (DataFormatException e) { return Flux.error(e); } }) .subscribe(data -> log.info("# onNext: {}", data), error -> log.error("# onError: ", error)); } private static Mono<String> convert(char ch) throws DataFormatException { if (!Character.isAlphabetic(ch)) { throw new DataFormatException("Not Alphabetic"); } return Mono.just("Converted to " + Character.toUpperCase(ch)); } // result > Task :Example14_44.main() 18:16:23.246 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 18:16:23.293 [main] c.operator_5_error.Example14_44 INFO - # onNext: Converted to A 18:16:23.294 [main] c.operator_5_error.Example14_44 INFO - # onNext: Converted to B 18:16:23.294 [main] c.operator_5_error.Example14_44 INFO - # onNext: Converted to C 18:16:23.299 [main] c.operator_5_error.Example14_44 ERROR- # onError: java.util.zip.DataFormatException: Not Alphabetic at chapter14.operator_5_error.Example14_44.convert(Example14_44.java:33) at chapter14.operator_5_error.Example14_44.lambda$main$0(Example14_44.java:22) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386) at reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:127) at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:100) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371) at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53) at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59) at reactor.core.publisher.Flux.subscribe(Flux.java:8466) at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639) at reactor.core.publisher.Flux.subscribe(Flux.java:8436) at reactor.core.publisher.Flux.subscribe(Flux.java:8360) at reactor.core.publisher.Flux.subscribe(Flux.java:8330) at chapter14.operator_5_error.Example14_44.main(Example14_44.java:27)
a b c 3 d 을 emit 하는데
Character.isAlphabetic() 이 아니면 DataFormatException 을 발생시킵니다
3이 emit 된 후 DataFormatException: Not Alphabetic Error 가 발생한 것을 확인 할 수 있습니다
DataFormatException 은 checked Exception 이기 때문에 try ~ catch 로 반드시 처리해야 합니다
2) onErrorReturn
onErrorReturn() Operator 는 에러 이벤트가 발생 했을 때, 에러 이벤트를 Downstream 으로 전파하지 않고 대체 값을 emit 합니다
코드 14-45 onErrorReturn 예제 1
public static void main(String[] args) { getBooks() .map(book -> book.getPenName().toUpperCase()) .onErrorReturn("No pen name") .subscribe(log::info); } public static Flux<Book> getBooks() { return Flux.fromIterable(SampleData.books); } public class SampleData { public static final List<Book> books = Arrays.asList( new Book("Advance Java", "Tom", "Tom-boy", 25000, 100), new Book("Advance Python", "Grace", "Grace-girl", 22000, 150), new Book("Advance Reactor", "Smith", "David-boy", 35000, 200), new Book("Getting started Java", "Tom", "Tom-boy", 32000, 230), new Book("Advance Kotlin", "Kevin", "Kevin-boy", 32000, 250), new Book("Advance Javascript", "Mike", "Tom-boy", 32000, 320), new Book("Getting started Kotlin", "Kevin", "Kevin-boy", 32000, 150), new Book("Getting started Python", "Grace", "Grace-girl", 32000, 200), new Book("Getting started Reactor", "Smith", null, 32000, 250), #penName null new Book("Getting started Javascript", "Mike", "David-boy", 32000, 330) ); } public class Book { private String bookName; private String authorName; private String penName; private int price; private int stockQuantity; } // result > Task :Example14_45.main() 18:36:26.761 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 18:36:26.889 [main] c.operator_5_error.Example14_45 INFO - TOM-BOY 18:36:26.890 [main] c.operator_5_error.Example14_45 INFO - GRACE-GIRL 18:36:26.890 [main] c.operator_5_error.Example14_45 INFO - DAVID-BOY 18:36:26.891 [main] c.operator_5_error.Example14_45 INFO - TOM-BOY 18:36:26.891 [main] c.operator_5_error.Example14_45 INFO - KEVIN-BOY 18:36:26.891 [main] c.operator_5_error.Example14_45 INFO - TOM-BOY 18:36:26.891 [main] c.operator_5_error.Example14_45 INFO - KEVIN-BOY 18:36:26.891 [main] c.operator_5_error.Example14_45 INFO - GRACE-GIRL 18:36:26.895 [main] c.operator_5_error.Example14_45 INFO - No pen name
penName(필명)을 대문자로 변환합니다
book.getPenName().toUpperCase()
끝에서 두 번째 데이터인, Getting started Reactor 의 3번째 속성 penName 이 null 이므로 Error 가 발생합니다
결과처럼 onErrorReturn 이 발생하고 no pen name 이 return 됩니다.
Error 가 발생하는 데이터에서 Sequence 는 종료되며, 이후 데이터는 emit 되지 않습니다
코드 14-46 처럼 Exception 을 지정하여 처리할 수도 있습니다.
.onErrorReturn(NullPointerException.class, "no pen name") .onErrorReturn(IllegalFormatException.class, "Illegal pen name")
3) onErrorResume
onErrorResume() 은 에러가 발생했을 때, 에러 이벤트를 Downstream 으로 전파하지 않고 대체 Publisher 를 리턴합니다
코드 14-47 onErrorResume 예제
public static void main(String[] args) { final String keyword = "DDD"; getBooksFromCache(keyword) .onErrorResume(error -> getBooksFromDatabase(keyword)) .subscribe(data -> log.info("# onNext: {}", data.getBookName()), error -> log.error("# onError: ", error)); } public static Flux<Book> getBooksFromCache(final String keyword) { return Flux .fromIterable(SampleData.books) .filter(book -> book.getBookName().contains(keyword)) .switchIfEmpty(Flux.error(new NoSuchBookException("No such Book"))); } public static Flux<Book> getBooksFromDatabase(final String keyword) { List<Book> books = new ArrayList<>(SampleData.books); books.add(new Book("DDD: Domain Driven Design", "Joy", "ddd-man", 35000, 200)); return Flux .fromIterable(books) .filter(book -> book.getBookName().contains(keyword)) .switchIfEmpty(Flux.error(new NoSuchBookException("No such Book"))); } private static class NoSuchBookException extends RuntimeException { NoSuchBookException(String message) { super(message); } } // result > Task :Example14_47.main() 22:07:05.851 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 22:07:05.958 [main] c.operator_5_error.Example14_47 INFO - # onNext: DDD: Domain Driven Design
01. getBooksFromCache 메소드에서 DDD keyword 로 책이름을 검색하여 없으면 NowSuchBookException 을 발생시킵니다
.filter(book -> book.getBookName().contains(keyword)) .switchIfEmpty(Flux.error(new NoSuchBookException("No such Book")));
02. .onErrorResume(error -> getBooksFromDatabase(keyword)) 에서 다시 검색을 합니다
- DB 를 조회한 효과를 내기 위해 "DDD: Domain Driven Design" 책을 추가하고, filter 를 다시 수행합니다
03. Subscriber 에서 "DDD: Domain Driven Design" 가 검색되어 출력됩니다
4) onErrorContinue
onErrorContinue() 는 에러가 발생했을 때, Sequence 를 종료시키지 않고, 아직 emit 되지 않은 데이터를 emit 합니다
에러 영역 내에 있는 데이터는 제거하고, Upstream 에서 후속 데이터를 emit 하는 방식으로 에러를 복구할 수 있도록 해 줍니다.
마블 다이어그램에서 노란색 원 데이터에서 에러가 발생했으나, 노란색을 제거하고
다시 Upstream 으로 이동해서 파란색 원 데이터를 emit 하여 처리합니다
onErrorContinue() 의 파라미터인 BiConsumer 함수형 인터페이스를 통해 에러 메시지와 에러가 발생했을 때 emit 된 데이터를 전달받아서 로그를 기록하는 등의 후처리를 할 수 있습니다.
코드 14-48 onErrorContinue 예제
public static void main(String[] args) { Flux .just(1, 2, 4, 0, 6, 12) .map(num -> 12 / num) .onErrorContinue((error, num) -> log.error("error: {}, num: {}", error, num)) .subscribe(data -> log.info("# onNext: {}", data), error -> log.error("# onError: ", error)); } // result > Task :Example14_48.main() 22:17:41.865 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 22:17:41.877 [main] c.operator_5_error.Example14_48 INFO - # onNext: 12 22:17:41.879 [main] c.operator_5_error.Example14_48 INFO - # onNext: 6 22:17:41.879 [main] c.operator_5_error.Example14_48 INFO - # onNext: 3 22:17:41.881 [main] c.operator_5_error.Example14_48 ERROR- error: java.lang.ArithmeticException: / by zero, num: 0 22:17:41.882 [main] c.operator_5_error.Example14_48 INFO - # onNext: 2 22:17:41.882 [main] c.operator_5_error.Example14_48 INFO - # onNext: 1
4번째 데이터에서 12/0 을 하므로 ArithmeticException 이 발생합니다.
.onErrorContinue((error, num) ->log.error("error: {}, num: {}", error, num))
로 에러(error)와 데이터(num)를 함께 출력합니다.
이후 계속 진행되어, 후속데이터들이 처리됩니다
Reactor 공식 문서에서는 onErrorContinue() Operator 가 명확하지 않은 Sequence 의 동작으로 개발자가 의도하지 않은 상황을 발생시킬 수 있기 때문에 onErrorContinue() Operator 를 신중하게 사용하기를 권고합니다.
대부분의 에러는 Operator 내부에서 doOnError() Operator 를 통해 로그를 기록하고 onErrorResume() Operator 등으로 처리할 수 있다고 명시합니다.5) retry
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#retry--
retry() Operator 는 Publisher 가 데이터를 emit 하는 과정에서 에러가 발생하면 파라미터로 입력한 횟수 만큼 원본 Flux 의 Sequence를 다시 구독합니다
만약 파라미터로 Long.MAX_VALUE 를 입력하면 재구독을 무한 반복합니다
코드 14-49 retry 예제
public static void main(String[] args) throws InterruptedException { final int[] count = {1}; Flux .range(1, 3) .delayElements(Duration.ofSeconds(1)) .map(num -> { try { if (num == 3 && count[0] == 1) { count[0]++; Thread.sleep(1000); } } catch (InterruptedException e) { } return num; }) .timeout(Duration.ofMillis(1500)) .retry(1) .subscribe(data -> log.info("# onNext: {}", data), (error -> log.error("# onError: ", error)), () -> log.info("# onComplete")); Thread.sleep(7000); } // result > Task :Example14_49.main() 22:25:03.982 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 22:25:05.083 [parallel-2] c.operator_5_error.Example14_49 INFO - # onNext: 1 22:25:06.092 [parallel-4] c.operator_5_error.Example14_49 INFO - # onNext: 2 22:25:08.094 [parallel-6] reactor.core.publisher.Operators DEBUG- onNextDropped: 3 22:25:08.593 [parallel-8] c.operator_5_error.Example14_49 INFO - # onNext: 1 22:25:09.595 [parallel-10] c.operator_5_error.Example14_49 INFO - # onNext: 2 22:25:10.599 [parallel-12] c.operator_5_error.Example14_49 INFO - # onNext: 3 22:25:10.600 [parallel-12] c.operator_5_error.Example14_49 INFO - # onComplete
retry() 를 사용해 데이터 emit 중에 에러가 발생하면 원복 Flux 의 Sequence 를 1회 재구독하도록 했습니다
retry() Operator 는 특히 timeout() Operator 와 함께 사용하여 네트워크 지연으로 인해 정해진 시간 안에 응답을 받지 못하면 일정 횟수 만큼 재요청해야 하는 상황에서 유용하게 사용할 수 있습니다
.timeout(Duration.ofMillis(1500))
를 통해 1.5초 동안 Upstream 으로부터 emit 되는 데이터가 없으면 TimeoutException 이 발생하도록 했습니다
.range(1, 3)
.delayElements(Duration.ofSeconds(1))range() 후에 delayElements 를 사용하여 1초씩 delay 되게 하였습니다.
if (num == 3 && count[0] == 1) {
count[0]++;
Thread.sleep(1000);
}위 구문으로 3일이고 첫번째 시도일 때만 1초 더 delay 되게 하였습니다
따라서 map() 이후 숫자 3이 emit 될 경우에는 TimeoutException이 발생하여 재구독 하게 합니다.
실행 결과에서는 첫 번째 구독에서 숫자 3이 TimeoutException 으로 인해 Drop되는 것을 확인할 수 있으며,
다시 1회 재구독을 해서 Sequence 가 정상적으로 종료되는 것을 볼 수 있습니다.
cold 이기에, 처음부터 다시 데이터 emit 되어 1,2 에러 1,2,3 의 데이터가 emit 됩니다
코드 14-50
public static void main(String[] args) throws InterruptedException { getBooks() .collect(Collectors.toSet()) .subscribe(bookSet -> bookSet.stream() .forEach(book -> log.info("book name: {}, price: {}", book.getBookName(), book.getPrice()))); Thread.sleep(12000); } private static Flux<Book> getBooks() { final int[] count = {0}; return Flux .fromIterable(SampleData.books) .delayElements(Duration.ofMillis(500)) .map(book -> { try { count[0]++; if (count[0] == 3) { Thread.sleep(2000); } } catch (InterruptedException e) { } return book; }) .timeout(Duration.ofSeconds(2)) .retry(1) .doOnNext(book -> log.info("# getBooks > doOnNext: {}, price: {}", book.getBookName(), book.getPrice())); } // result > Task :Example14_50.main() 22:38:26.636 [main] reactor.util.Loggers DEBUG- Using Slf4j logging framework 22:38:27.271 [parallel-2] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Advance Java, price: 25000 22:38:27.779 [parallel-4] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Advance Python, price: 22000 22:38:30.286 [parallel-6] reactor.core.publisher.Operators DEBUG- onNextDropped: chapter14.Book@73f2018a 22:38:30.286 [parallel-8] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Advance Java, price: 25000 22:38:30.791 [parallel-10] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Advance Python, price: 22000 22:38:31.296 [parallel-12] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Advance Reactor, price: 35000 22:38:31.798 [parallel-14] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Getting started Java, price: 32000 22:38:32.302 [parallel-16] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Advance Kotlin, price: 32000 22:38:32.805 [parallel-2] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Advance Javascript, price: 32000 22:38:33.307 [parallel-4] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Getting started Kotlin, price: 32000 22:38:33.808 [parallel-6] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Getting started Python, price: 32000 22:38:34.312 [parallel-8] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Getting started Reactor, price: 32000 22:38:34.814 [parallel-10] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Getting started Javascript, price: 32000 22:38:34.816 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Getting started Python, price: 32000 22:38:34.816 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Getting started Kotlin, price: 32000 22:38:34.816 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Advance Java, price: 25000 22:38:34.817 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Advance Kotlin, price: 32000 22:38:34.817 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Advance Python, price: 22000 22:38:34.817 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Advance Reactor, price: 35000 22:38:34.817 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Advance Javascript, price: 32000 22:38:34.817 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Getting started Javascript, price: 32000 22:38:34.817 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Getting started Java, price: 32000 22:38:34.817 [parallel-10] c.operator_5_error.Example14_50 INFO - book name: Getting started Reactor, price: 32000
코드 14-50은 getBooks() 메서드에서 HTTP 요청을 통해 도서 목록을 조회하는 중에 네트워크 지연으로 일정 시간이 지나면 1회 재요청하는 것을 시뮬레이션한 코드 입니다
.delayElements(Duration.ofMillis(500))
기본 delay 는 0.5초 입니다
count[0]++;
if (count[0] == 3) {
Thread.sleep(2000);
}3번째 데이터에서만 2초 delay 를 더 주었습니다
.timeout(Duration.ofSeconds(2))
.retry(1)를 통해 2초 timeout 을 주고, retry 1회 시도하도록 하였습니다.
3번째 데이터 조회시 아래 처럼 Error 발생하여 droppped 되지만
22:38:30.286 [parallel-6] reactor.core.publisher.Operators DEBUG- onNextDropped: chapter14.Book@73f2018a
재시도하여 데이터가 다시 처리됩니다
22:38:30.286 [parallel-8] c.operator_5_error.Example14_50 INFO - # getBooks > doOnNext: Advance Java, price: 25000중복을 무시하기 위해 set 으로 데이터를 수집합니다
collect(Collectors.toSet())
'Spring > Webflux' 카테고리의 다른 글
14장 Operator 7 - split (0) 2023.07.29 14장 Operator 6 - time (0) 2023.07.26 14장 Operator 4 - peek (0) 2023.07.22 14장 Operator 3 - transformation (0) 2023.07.16 14장 Operator 2 - filter (0) 2023.07.09