  • Chapter 12 Debugging
    Spring/Webflux 2023. 5. 21. 16:51

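    12.1.0 Debugging in Reactor

    In synchronous, imperative programming, debugging is relatively easy: when an Exception occurs you can read the Stacktrace, or set a Breakpoint on the code you suspect and step through it to find the cause.

    Reactor, by contrast, runs work asynchronously, and a Reactor Sequence is composed in a declarative style, so debugging is not as straightforward.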
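    To make the difference concrete, here is a minimal sketch that uses only the JDK (no Reactor), so it is an analogy rather than Reactor's actual behavior. The class AsyncTraceSketch and its method names are made up for illustration. It runs the same failing task once on the calling thread and once on a worker thread, then checks whether the caller's frame appears in the exception's stack trace.

```java
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class AsyncTraceSketch {

    // Hypothetical failing task: integer division by zero.
    static int failingTask() {
        return 8 / 0;
    }

    // True if any frame of the stack trace belongs to the given method of this class.
    static boolean hasFrame(Throwable error, String methodName) {
        return Arrays.stream(error.getStackTrace())
                .anyMatch(frame -> frame.getClassName().equals(AsyncTraceSketch.class.getName())
                        && frame.getMethodName().equals(methodName));
    }

    // Synchronous call: this method is still on the stack when the error is thrown.
    static boolean callerVisibleWhenSync() {
        try {
            failingTask();
            return false;
        } catch (ArithmeticException error) {
            return hasFrame(error, "callerVisibleWhenSync");
        }
    }

    // Asynchronous call: the task runs on a worker thread with its own stack.
    static boolean callerVisibleWhenAsync() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = AsyncTraceSketch::failingTask;
            Future<Integer> result = executor.submit(task);
            result.get();
            return false;
        } catch (ExecutionException error) {
            // getCause() is the ArithmeticException thrown on the worker thread.
            return hasFrame(error.getCause(), "callerVisibleWhenAsync");
        } catch (InterruptedException error) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(error);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sync:  " + callerVisibleWhenSync());  // prints "sync:  true"
        System.out.println("async: " + callerVisibleWhenAsync()); // prints "async: false"
    }
}
```

    In the asynchronous case the stack trace only shows executor internals, which is the same kind of gap seen in the Reactor stack traces later in this chapter.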

     


    12.1.1 Debugging with Debug Mode

    Call Hooks.onOperatorDebug() to enable Debug Mode.

     

    example 12_1 : the fourth key in the map is grape, but the fourth item emitted is MELONS, so the lookup fails

    public static Map<String, String> fruits = new HashMap<>();
    
    static {
        fruits.put("banana", "바나나");
        fruits.put("apple", "사과");
        fruits.put("pear", "배");
        fruits.put("grape", "포도");
    }
    
    public static void main(String[] args) throws InterruptedException {
        Hooks.onOperatorDebug();
    
        Flux
                .fromArray(new String[]{"BANANAS", "APPLES", "PEARS", "MELONS"})
                .subscribeOn(Schedulers.boundedElastic())
                .publishOn(Schedulers.parallel())
                .map(String::toLowerCase)
                .map(fruit -> fruit.substring(0, fruit.length() - 1))
                .map(fruits::get)
                .map(translated -> "맛있는 " + translated)
                .subscribe(
                        log::info,
                        error -> log.error("# onError:", error));
    
        Thread.sleep(100L);
    }
    
    // result1 : before calling Hooks.onOperatorDebug()
    > Task :Example12_1.main()
    17:20:32.900 [main] DEBUG- Using Slf4j logging framework
    17:20:32.959 [parallel-1] INFO - 맛있는 바나나
    17:20:32.961 [parallel-1] INFO - 맛있는 사과
    17:20:32.961 [parallel-1] INFO - 맛있는 배
    17:20:32.989 [parallel-1] ERROR- # onError:
    java.lang.NullPointerException: The mapper [chapter12.Example12_1$$Lambda$58/0x0000000800153840] returned a null value.
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
    	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
    	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
    	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    
    // result2 : after calling Hooks.onOperatorDebug()
    > Task :Example12_1.main()
    16:14:49.713 [main] DEBUG- Using Slf4j logging framework
    16:14:49.715 [main] DEBUG- Enabling stacktrace debugging via onOperatorDebug
    16:14:49.836 [parallel-1] INFO - 맛있는 바나나
    16:14:49.838 [parallel-1] INFO - 맛있는 사과
    16:14:49.838 [parallel-1] INFO - 맛있는 배
    16:14:49.856 [parallel-1] ERROR- # onError:
    java.lang.NullPointerException: The mapper [chapter12.Example12_1$$Lambda$66/0x0000000800168040] returned a null value.
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
    	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
    Assembly trace from producer [reactor.core.publisher.FluxMapFuseable] :
    	reactor.core.publisher.Flux.map(Flux.java:6271)
    	chapter12.Example12_1.main(Example12_1.java:35)
    Error has been observed at the following site(s):
    	*__Flux.map ⇢ at chapter12.Example12_1.main(Example12_1.java:35)
    	|_ Flux.map ⇢ at chapter12.Example12_1.main(Example12_1.java:36)
    Original Stack Trace:
    		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
    		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
    		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
    		at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
    		at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
    		at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    		at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    		at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    		at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    		at java.base/java.lang.Thread.run(Thread.java:829)

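    On line 19 of the listing, .map(fruits::get) fails when the fourth item, MELONS, is processed: by then it has become the key melon, which is not in the map, so fruits.get returns null and map() rejects the null result.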

    java.lang.NullPointerException: The mapper [chapter12.Example12_1$$Lambda$66/0x0000000800168040] returned a null value.
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
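    The following sketch replays the same three steps in plain Java, without Reactor, to show why only the fourth item produces null. The class name FruitLookupSketch and its helper methods are hypothetical, and the map values are shortened to English glosses for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

final class FruitLookupSketch {

    // Same keys as the example; values are English glosses (an assumption of this sketch).
    static final Map<String, String> FRUITS = new HashMap<>();

    static {
        FRUITS.put("banana", "banana (KR)");
        FRUITS.put("apple", "apple (KR)");
        FRUITS.put("pear", "pear (KR)");
        FRUITS.put("grape", "grape (KR)");
    }

    // Mirrors the first two map() calls: lower-case, then drop the trailing "s".
    static String toKey(String plural) {
        String lower = plural.toLowerCase();
        return lower.substring(0, lower.length() - 1);
    }

    // Mirrors map(fruits::get): returns null when the key is missing.
    static String translate(String plural) {
        return FRUITS.get(toKey(plural));
    }

    public static void main(String[] args) {
        for (String fruit : new String[]{"BANANAS", "APPLES", "PEARS", "MELONS"}) {
            System.out.println(fruit + " -> key=" + toKey(fruit) + ", value=" + translate(fruit));
        }
        // The last line printed is: MELONS -> key=melon, value=null
    }
}
```

    A plain Map lookup returns null silently; in the Reactor pipeline the same null is turned into the NullPointerException shown above because the map() Operator does not accept a null result.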

     

    With Hooks.onOperatorDebug() enabled, two additional sections appear in the output:

    • Assembly trace from producer
    • Error has been observed at the following site(s)

    Assembly trace from producer [reactor.core.publisher.FluxMapFuseable] :
    reactor.core.publisher.Flux.map(Flux.java:6271)
    chapter12.Example12_1.main(Example12_1.java:35)

     

    Error has been observed at the following site(s):
    *__Flux.map ⇢ at chapter12.Example12_1.main(Example12_1.java:35)
    |_ Flux.map ⇢ at chapter12.Example12_1.main(Example12_1.java:36)

     

    The line number where the error occurred can now be identified.

    chapter12.Example12_1.main(Example12_1.java:35)

     

    It also shows how the error propagated, starting from the point where it originated.

    *__Flux.map ⇢ at chapter12.Example12_1.main(Example12_1.java:35)
    |_ Flux.map ⇢ at chapter12.Example12_1.main(Example12_1.java:36)

     

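    Comparison before and after use

    Limitations of Hooks.onOperatorDebug()

    Enabling debug mode makes the application go through the following expensive steps:

    • The Stacktrace of every Operator in the application is captured.
    • When an error occurs, the Stacktrace of the Assembly where the error occurred is inserted into the middle of the original Stacktrace, based on the captured information.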
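    The two steps above can be sketched with plain JDK classes. This is not Reactor's implementation, only an illustration of the idea: AssemblyCaptureSketch, AssemblySnapshot, and traced are hypothetical names. The snapshot is created when the operator is declared, whether or not an error ever happens, which is where the cost comes from.

```java
import java.util.function.Function;

final class AssemblyCaptureSketch {

    // Marker exception that only carries the stack captured at declaration time.
    static final class AssemblySnapshot extends RuntimeException {
        AssemblySnapshot(String operatorName) {
            super("Assembly trace from producer [" + operatorName + "]");
        }
    }

    // Step 1: capture a stack trace when the operator is declared (always paid).
    // Step 2: attach it to the error only if the mapper fails.
    static <T, R> Function<T, R> traced(String operatorName, Function<T, R> mapper) {
        AssemblySnapshot snapshot = new AssemblySnapshot(operatorName);
        return input -> {
            try {
                return mapper.apply(input);
            } catch (RuntimeException error) {
                error.addSuppressed(snapshot);
                throw error;
            }
        };
    }

    // Declares a traced operator here, then triggers a division by zero.
    static Throwable failWithSnapshot() {
        Function<Integer, Integer> divide = traced("divide", n -> 8 / n);
        try {
            divide.apply(0);
            return null;
        } catch (ArithmeticException error) {
            return error;
        }
    }

    // True if a suppressed snapshot contains a frame from the given method.
    static boolean declaredIn(Throwable error, String methodName) {
        for (Throwable suppressed : error.getSuppressed()) {
            for (StackTraceElement frame : suppressed.getStackTrace()) {
                if (frame.getMethodName().equals(methodName)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable error = failWithSnapshot();
        System.out.println(error);
        for (Throwable suppressed : error.getSuppressed()) {
            System.out.println("Suppressed: " + suppressed.getMessage());
        }
        System.out.println("points to declaration site: " + declaredIn(error, "failWithSnapshot"));
    }
}
```

    The suppressed snapshot points back to the method where the operator was declared, which is the information the original stack trace lacks.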

     

    Assembly

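    While reading the official Reactor documentation you will often come across the term Assembly. Most Reactor Operators return a Mono or Flux, which is what allows Operator chains to be formed. The point where such a new Mono or Flux returned by an Operator is declared is called an Assembly.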

     

    Traceback

    When debug mode is enabled, Assembly information is captured for each Operator. Within that, the Assembly information holding the captured stacktrace of the Operator where the error occurred is called the Traceback.

     

    Debugging setup for production environments

    Reactor provides a separate Java agent that adds debugging information without paying the cost of capturing the stacktrace of every Operator chain in the application.

    For example, if you are building a Spring WebFlux application for production use, add the following to the dependencies block of build.gradle

    implementation 'io.projectreactor:reactor-tools'

    to make ReactorDebugAgent available.

    When ReactorDebugAgent is on the classpath and spring.reactor.debug-agent.enabled is true (the default), ReactorDebugAgent.init() is called automatically at application startup and the agent is activated.

     

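    12.1.2 Debugging with the checkpoint() Operator

    Whereas enabling debug mode captures the stacktrace at every Operator in the application, the checkpoint() Operator captures the stacktrace only within the specific Operator chain where it is placed.

     

    Printing the Traceback

    With checkpoint(), a traceback is added for the assembly point where the error actually occurred, or for an assembly point the error propagated through.

     

    example 12_2 : code that raises an error by computing 8/0 in the fourth operation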

    public static void main(String[] args) {
        Flux
            .just(2, 4, 6, 8)
            .zipWith(Flux.just(1, 2, 3, 0), (x, y) -> x/y)
            .map(num -> num + 2)
            .checkpoint()
            .subscribe(
                    data -> log.info("# onNext: {}", data),
                    error -> log.error("# onError:", error)
            );
    }
    
    //result
    > Task :Example12_2.main()
    18:07:22.703 [main] DEBUG- Using Slf4j logging framework
    18:07:22.725 [main] INFO - # onNext: 4
    18:07:22.727 [main] INFO - # onNext: 4
    18:07:22.727 [main] INFO - # onNext: 4
    18:07:22.738 [main] ERROR- # onError:
    java.lang.ArithmeticException: / by zero
    	at chapter12.Example12_2.lambda$main$0(Example12_2.java:15)
    	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
    Assembly trace from producer [reactor.core.publisher.FluxMap] :
    	reactor.core.publisher.Flux.checkpoint(Flux.java:3352)
    	chapter12.Example12_2.main(Example12_2.java:17)
    Error has been observed at the following site(s):
    	*__checkpoint() ⇢ at chapter12.Example12_2.main(Example12_2.java:17)
    Original Stack Trace:
    		at chapter12.Example12_2.lambda$main$0(Example12_2.java:15)
    		at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:982)
    		at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:971)
    		at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:738)
    		at reactor.core.publisher.FluxZip$ZipInner.onSubscribe(FluxZip.java:888)
    		at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
    		at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
    		at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:595)
    		at reactor.core.publisher.FluxZip.handleBoth(FluxZip.java:332)
    		at reactor.core.publisher.FluxZip.handleArrayMode(FluxZip.java:273)
    		at reactor.core.publisher.FluxZip.subscribe(FluxZip.java:137)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
    		at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
    		at chapter12.Example12_2.main(Example12_2.java:18)

     

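    The division by zero happens in zipWith(), but .checkpoint() is attached below map(), so the traceback only points at the checkpoint on line 17 and cannot tell whether zipWith() or map() raised the error.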

    .zipWith(Flux.just(1, 2, 3, 0), (x, y) -> x/y)
    .map(num -> num + 2)
    .checkpoint()

     

    public static void main(String[] args) {
        Flux
            .just(2, 4, 6, 8)
            .zipWith(Flux.just(1, 2, 3, 0), (x, y) -> x/y)
            .checkpoint()
            .map(num -> num + 2)
            //.checkpoint()
            .subscribe(
                    data -> log.info("# onNext: {}", data),
                    error -> log.error("# onError:", error)
            );
    }
    // result
    > Task :Example12_3.main()
    18:15:04.878 [main] DEBUG- Using Slf4j logging framework
    18:15:04.897 [main] INFO - # onNext: 4
    18:15:04.898 [main] INFO - # onNext: 4
    18:15:04.898 [main] INFO - # onNext: 4
    18:15:04.910 [main] ERROR- # onError:
    java.lang.ArithmeticException: / by zero
    	at chapter12.Example12_3.lambda$main$0(Example12_3.java:15)
    	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
    Assembly trace from producer [reactor.core.publisher.FluxZip] :
    	reactor.core.publisher.Flux.checkpoint(Flux.java:3352)
    	chapter12.Example12_3.main(Example12_3.java:16)
    Error has been observed at the following site(s):
    	*__checkpoint() ⇢ at chapter12.Example12_3.main(Example12_3.java:16)
    Original Stack Trace:
    		at chapter12.Example12_3.lambda$main$0(Example12_3.java:15)
    		at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:982)
    		at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:971)
    		at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:738)
    		at reactor.core.publisher.FluxZip$ZipInner.onSubscribe(FluxZip.java:888)
    		at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
    		at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
    		at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:595)
    		at reactor.core.publisher.FluxZip.handleBoth(FluxZip.java:332)
    		at reactor.core.publisher.FluxZip.handleArrayMode(FluxZip.java:273)
    		at reactor.core.publisher.FluxZip.subscribe(FluxZip.java:137)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
    		at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
    		at chapter12.Example12_3.main(Example12_3.java:19)

     

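    With checkpoint() placed directly after zipWith(), the traceback names FluxZip as the producer and points at line 16, right below the Operator that failed.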

    java.lang.ArithmeticException: / by zero
    at chapter12.Example12_3.lambda$main$0(Example12_3.java:15)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
    Assembly trace from producer [reactor.core.publisher.FluxZip] :
    reactor.core.publisher.Flux.checkpoint(Flux.java:3352)
    chapter12.Example12_3.main(Example12_3.java:16)
    Error has been observed at the following site(s):
    *__checkpoint() ⇢ at chapter12.Example12_3.main(Example12_3.java:16)

     

    zipWith()

    zipWith() merges the original Flux and the Flux passed as its parameter into a single Flux.

    It takes x from the original Flux and y from the Flux passed to zipWith(), then applies (x, y) -> x/y to each pair.
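    As a rough illustration of this pairing, here is a list-based sketch in plain Java. It is not Reactor's zipWith(), which works on asynchronous streams; the class ZipSketch and its zipWith helper are hypothetical and only show how items are combined by position.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

final class ZipSketch {

    // Pairs items by position and combines each pair; stops at the shorter list.
    static <A, B, R> List<R> zipWith(List<A> source, List<B> other, BiFunction<A, B, R> combinator) {
        int size = Math.min(source.size(), other.size());
        List<R> result = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            result.add(combinator.apply(source.get(i), other.get(i)));
        }
        return result;
    }

    public static void main(String[] args) {
        // First three pairs of the example: 2/1, 4/2, 6/3.
        List<Integer> quotients = zipWith(List.of(2, 4, 6), List.of(1, 2, 3), (x, y) -> x / y);
        System.out.println(quotients); // prints [2, 2, 2]

        // The fourth pair of the example, 8/0, is the one that fails.
        try {
            zipWith(List.of(8), List.of(0), (x, y) -> x / y);
        } catch (ArithmeticException error) {
            System.out.println(error.getMessage()); // prints "/ by zero"
        }
    }
}
```

    Each quotient is 2, and the following map(num -> num + 2) turns it into 4, which matches the three "# onNext: 4" lines in the results above.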

     

     

    checkpoint with a Description only

    example 12_4 : printing a Description that includes an identifier, without a Traceback, to estimate where the error occurred

    public static void main(String[] args) {
        Flux
            .just(2, 4, 6, 8)
            .zipWith(Flux.just(1, 2, 3, 0), (x, y) -> x/y)
            .checkpoint("Example12_4.zipWith.checkpoint")
            .map(num -> num + 2)
            .checkpoint("Example12_4.map.checkpoint")
            .subscribe(
                    data -> log.info("# onNext: {}", data),
                    error -> log.error("# onError:", error)
            );
    }
    
    // result
    > Task :Example12_4.main()
    22:31:25.856 [main] DEBUG- Using Slf4j logging framework
    22:31:25.870 [main] INFO - # onNext: 4
    22:31:25.871 [main] INFO - # onNext: 4
    22:31:25.871 [main] INFO - # onNext: 4
    22:31:25.881 [main] ERROR- # onError:
    java.lang.ArithmeticException: / by zero
    	at chapter12.Example12_4.lambda$main$0(Example12_4.java:16)
    	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
    Error has been observed at the following site(s):
    	*__checkpoint ⇢ Example12_4.zipWith.checkpoint
    	|_ checkpoint ⇢ Example12_4.map.checkpoint
    Original Stack Trace:
    		at chapter12.Example12_4.lambda$main$0(Example12_4.java:16)
    		at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:982)
    		at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:971)
    		at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:738)
    		at reactor.core.publisher.FluxZip$ZipInner.onSubscribe(FluxZip.java:888)
    		at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
    		at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
    		at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:595)
    		at reactor.core.publisher.FluxZip.handleBoth(FluxZip.java:332)
    		at reactor.core.publisher.FluxZip.handleArrayMode(FluxZip.java:273)
    		at reactor.core.publisher.FluxZip.subscribe(FluxZip.java:137)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
    		at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
    		at chapter12.Example12_4.main(Example12_4.java:20)

    The Descriptions that were passed in are printed as shown below.

    Error has been observed at the following site(s):
    *__checkpoint ⇢ Example12_4.zipWith.checkpoint
    |_ checkpoint ⇢ Example12_4.map.checkpoint
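    The way an error collects Descriptions as it travels downstream can be sketched with plain functions. This is only an analogy under my own assumptions, not how Reactor implements checkpoint(): CheckpointSketch, TracedException, and the checkpoint helper are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class CheckpointSketch {

    // Carries the original error plus the descriptions it passed through.
    static final class TracedException extends RuntimeException {
        final List<String> observedAt = new ArrayList<>();

        TracedException(Throwable cause) {
            super(cause);
        }
    }

    // Wraps the upstream stage; an error passing through is tagged with the description.
    static <T, R> Function<T, R> checkpoint(Function<T, R> upstream, String description) {
        return input -> {
            try {
                return upstream.apply(input);
            } catch (TracedException error) {
                error.observedAt.add(description);
                throw error;
            } catch (RuntimeException error) {
                TracedException traced = new TracedException(error);
                traced.observedAt.add(description);
                throw traced;
            }
        };
    }

    // Same shape as example 12_4: divide, checkpoint, add 2, checkpoint.
    static List<String> observedSites(int x, int y) {
        Function<int[], Integer> divide = pair -> pair[0] / pair[1];
        Function<int[], Integer> divideChecked = checkpoint(divide, "zipWith.checkpoint");
        Function<int[], Integer> plus = divideChecked.andThen(num -> num + 2);
        Function<int[], Integer> plusChecked = checkpoint(plus, "map.checkpoint");
        try {
            plusChecked.apply(new int[]{x, y});
            return List.of();
        } catch (TracedException error) {
            return error.observedAt;
        }
    }

    public static void main(String[] args) {
        System.out.println(observedSites(8, 2)); // prints []
        System.out.println(observedSites(8, 0)); // prints [zipWith.checkpoint, map.checkpoint]
    }
}
```

    The first Description in the list belongs to the checkpoint closest to the failing stage, which is why placing a checkpoint right after the suspected Operator narrows the search.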

     

     

    checkpoint with a Description and a Traceback

    This is how to make checkpoint print both the description and the traceback.

     

    public final Flux<T> checkpoint(@Nullable String description, boolean forceStackTrace) {
        final AssemblySnapshot stacktrace;
        if (!forceStackTrace) {
            stacktrace = new CheckpointLightSnapshot(description);
        }
        else {
            stacktrace = new CheckpointHeavySnapshot(description, Traces.callSiteSupplierFactory.get());
        }
    
        return new FluxOnAssembly<>(this, stacktrace);
    }

     

    example 12_5 : checkpoint(description, forceStackTrace=true) example

    public static void main(String[] args) {
        Flux
            .just(2, 4, 6, 8)
            .zipWith(Flux.just(1, 2, 3, 0), (x, y) -> x/y)
            .checkpoint("Example12_4.zipWith.checkpoint", true)
            .map(num -> num + 2)
            .checkpoint("Example12_4.map.checkpoint", true)
            .subscribe(
                    data -> log.info("# onNext: {}", data),
                    error -> log.error("# onError:", error)
            );
    }
    
    // result 
    
    > Task :Example12_5.main()
    22:42:34.698 [main] DEBUG- Using Slf4j logging framework
    22:42:34.717 [main] INFO - # onNext: 4
    22:42:34.719 [main] INFO - # onNext: 4
    22:42:34.719 [main] INFO - # onNext: 4
    22:42:34.731 [main] ERROR- # onError:
    java.lang.ArithmeticException: / by zero
    	at chapter12.Example12_5.lambda$main$0(Example12_5.java:16)
    	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
    Assembly trace from producer [reactor.core.publisher.FluxZip], described as [Example12_4.zipWith.checkpoint] :
    	reactor.core.publisher.Flux.checkpoint(Flux.java:3417)
    	chapter12.Example12_5.main(Example12_5.java:17)
    Error has been observed at the following site(s):
    	*__checkpoint(Example12_4.zipWith.checkpoint) ⇢ at chapter12.Example12_5.main(Example12_5.java:17)
    	|_     checkpoint(Example12_4.map.checkpoint) ⇢ at chapter12.Example12_5.main(Example12_5.java:19)
    Original Stack Trace:
    		at chapter12.Example12_5.lambda$main$0(Example12_5.java:16)
    		at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:982)
    		at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:971)
    		at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:738)
    		at reactor.core.publisher.FluxZip$ZipInner.onSubscribe(FluxZip.java:888)
    		at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
    		at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
    		at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:595)
    		at reactor.core.publisher.FluxZip.handleBoth(FluxZip.java:332)
    		at reactor.core.publisher.FluxZip.handleArrayMode(FluxZip.java:273)
    		at reactor.core.publisher.FluxZip.subscribe(FluxZip.java:137)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
    		at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
    		at chapter12.Example12_5.main(Example12_5.java:20)

    As shown below, the description and the traceback are printed together.

    Error has been observed at the following site(s):
    *__checkpoint(Example12_4.zipWith.checkpoint) ⇢ at chapter12.Example12_5.main(Example12_5.java:17)
    |_     checkpoint(Example12_4.map.checkpoint) ⇢ at chapter12.Example12_5.main(Example12_5.java:19)

     

    example 12_6 : the same computation and result, but a slightly more complex example with the Operators split into separate methods

    public static void main(String[] args) {
        Flux<Integer> source = Flux.just(2, 4, 6, 8);
        Flux<Integer> other = Flux.just(1, 2, 3, 0);
    
        Flux<Integer> multiplySource = divide(source, other).checkpoint();
        Flux<Integer> plusSource = plus(multiplySource).checkpoint();
    
    
        plusSource.subscribe(
                data -> log.info("# onNext: {}", data),
                error -> log.error("# onError:", error)
        );
    }
    
    private static Flux<Integer> divide(Flux<Integer> source, Flux<Integer> other) {
        return source.zipWith(other, (x, y) -> x/y);
    }
    
    private static Flux<Integer> plus(Flux<Integer> source) {
        return source.map(num -> num + 2);
    }
    
    // result
    
    > Task :Example12_6.main()
    22:48:52.501 [main] DEBUG- Using Slf4j logging framework
    22:48:52.521 [main] INFO - # onNext: 4
    22:48:52.522 [main] INFO - # onNext: 4
    22:48:52.522 [main] INFO - # onNext: 4
    22:48:52.534 [main] ERROR- # onError:
    java.lang.ArithmeticException: / by zero
    	at chapter12.Example12_6.lambda$divide$2(Example12_6.java:26)
    	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
    Assembly trace from producer [reactor.core.publisher.FluxZip] :
    	reactor.core.publisher.Flux.checkpoint(Flux.java:3352)
    	chapter12.Example12_6.main(Example12_6.java:15)
    Error has been observed at the following site(s):
    	*__checkpoint() ⇢ at chapter12.Example12_6.main(Example12_6.java:15)
    	|_ checkpoint() ⇢ at chapter12.Example12_6.main(Example12_6.java:16)
    Original Stack Trace:
    		at chapter12.Example12_6.lambda$divide$2(Example12_6.java:26)
    		at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:982)
    		at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:971)
    		at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:738)
    		at reactor.core.publisher.FluxZip$ZipInner.onSubscribe(FluxZip.java:888)
    		at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
    		at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
    		at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:595)
    		at reactor.core.publisher.FluxZip.handleBoth(FluxZip.java:332)
    		at reactor.core.publisher.FluxZip.handleArrayMode(FluxZip.java:273)
    		at reactor.core.publisher.FluxZip.subscribe(FluxZip.java:137)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
    		at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
    		at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
    		at chapter12.Example12_6.main(Example12_6.java:19)

     


    12.1.3 Debugging with the log() Operator

     

    example 12_7 : using log()

    public static Map<String, String> fruits = new HashMap<>();
    
    static {
        fruits.put("banana", "바나나");
        fruits.put("apple", "사과");
        fruits.put("pear", "배");
        fruits.put("grape", "포도");
    }
    
    public static void main(String[] args) {
        Flux.fromArray(new String[]{"BANANAS", "APPLES", "PEARS", "MELONS"})
                .map(String::toLowerCase)
                .map(fruit -> fruit.substring(0, fruit.length() - 1))
                .log()
    //                .log("Fruit.Substring", Level.FINE)
                .map(fruits::get)
                .subscribe(
                        log::info,
                        error -> log.error("# onError:", error));
    }
    // result
    > Task :Example12_7.main()
    22:55:26.742 [main] DEBUG- Using Slf4j logging framework
    22:55:26.758 [main] INFO - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
    22:55:26.760 [main] INFO - | request(unbounded)
    22:55:26.760 [main] INFO - | onNext(banana)
    22:55:26.760 [main] INFO - 바나나
    22:55:26.761 [main] INFO - | onNext(apple)
    22:55:26.761 [main] INFO - 사과
    22:55:26.761 [main] INFO - | onNext(pear)
    22:55:26.761 [main] INFO - 배
    22:55:26.761 [main] INFO - | onNext(melon)
    22:55:26.764 [main] INFO - | cancel()
    22:55:26.767 [main] ERROR- # onError:
    java.lang.NullPointerException: The mapper [chapter12.Example12_7$$Lambda$38/0x0000000800150840] returned a null value.
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
    	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
    	at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:172)
    	at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:97)
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
    	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
    	at reactor.core.publisher.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:119)
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
    	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
    	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
    	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
    	at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
    	at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
    	at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
    	at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
    	at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
    	at chapter12.Example12_7.main(Example12_7.java:30)

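    With the log() Operator, Signals such as onSubscribe(), request(), and onNext() are printed.

    In particular, the cancel() Signal right after onNext(melon) means that the next Operator, the third map() (fruits::get), looked up the key "melon", which does not exist in the map, so a NullPointerException was raised.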
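    The same "probe between two stages" idea can be tried with java.util.stream, using peek() in the position where log() sits. This is an analogy, not Reactor: a Stream has no Signals such as request() or cancel(), and the class ProbeSketch, the English map values, and the use of Objects.requireNonNull to force the failure are assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;

final class ProbeSketch {

    // Same keys as the example; values are English glosses (an assumption of this sketch).
    static final Map<String, String> FRUITS = Map.of(
            "banana", "banana (KR)",
            "apple", "apple (KR)",
            "pear", "pear (KR)",
            "grape", "grape (KR)");

    // Returns the items that reached the probe before the pipeline stopped.
    static List<String> itemsSeenByProbe() {
        List<String> seen = new ArrayList<>();
        try {
            Stream.of("BANANAS", "APPLES", "PEARS", "MELONS")
                    .map(String::toLowerCase)
                    .map(fruit -> fruit.substring(0, fruit.length() - 1))
                    .peek(seen::add) // probe, in the same position as log()
                    .map(key -> Objects.requireNonNull(FRUITS.get(key), "no entry for " + key))
                    .forEach(value -> { });
        } catch (NullPointerException expected) {
            // The stage after the probe failed; the last probed item is the culprit.
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(itemsSeenByProbe()); // prints [banana, apple, pear, melon]
    }
}
```

    The last item recorded by the probe is melon, so the failure must come from the stage placed after the probe, which is the same reasoning applied to the log() output above.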

     

     

    With .log("Fruit.Substring", Level.FINE), i.e. logging at Level.FINE

    > Task :Example12_7.main()
    23:05:01.096 [main] DEBUG- Using Slf4j logging framework
    23:05:01.108 [main] DEBUG- | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
    23:05:01.109 [main] DEBUG- | request(unbounded)
    23:05:01.110 [main] DEBUG- | onNext(banana)
    23:05:01.110 [main] INFO - 바나나
    23:05:01.111 [main] DEBUG- | onNext(apple)
    23:05:01.111 [main] INFO - 사과
    23:05:01.111 [main] DEBUG- | onNext(pear)
    23:05:01.111 [main] INFO - 배
    23:05:01.111 [main] DEBUG- | onNext(melon)
    23:05:01.114 [main] DEBUG- | cancel()
    23:05:01.115 [main] ERROR- # onError:
    java.lang.NullPointerException: The mapper [chapter12.Example12_7$$Lambda$38/0x0000000800150840] returned a null value.
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
    	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
    	at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:172)
    	at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:97)
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
    	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
    	at reactor.core.publisher.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:119)
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
    	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
    	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
    	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
    	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
    	at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
    	at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
    	at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
    	at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
    	at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
    	at chapter12.Example12_7.main(Example12_7.java:31)

     

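    Unlike in the book, the Description ("Fruit.Substring") is not displayed. log() uses this category as the logger name, so the likely reason is that the log pattern used here does not print the logger name.

    Book:
    23:05:01.111 [main] DEBUG Fruit.Substring- | onNext(melon)

    Actual:
    23:05:01.111 [main] DEBUG- | onNext(melon)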

     

    Level.FINE (from java.util.logging) corresponds to the DEBUG level among the log levels used by the SLF4J logging framework.
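    For reference, a small lookup sketch of how java.util.logging levels line up with SLF4J level names. Only the FINE to DEBUG pair is confirmed by the output above; the other pairs follow the usual correspondence between the two APIs and should be treated as an assumption. LevelMappingSketch and slf4jLevelFor are hypothetical names.

```java
import java.util.logging.Level;

final class LevelMappingSketch {

    // Assumed correspondence; only FINE -> DEBUG is confirmed by this chapter's output.
    static String slf4jLevelFor(Level level) {
        if (level == Level.FINEST) {
            return "TRACE";
        }
        if (level == Level.FINE) {
            return "DEBUG";
        }
        if (level == Level.INFO) {
            return "INFO";
        }
        if (level == Level.WARNING) {
            return "WARN";
        }
        if (level == Level.SEVERE) {
            return "ERROR";
        }
        throw new IllegalArgumentException("level not covered by this sketch: " + level);
    }

    public static void main(String[] args) {
        System.out.println(slf4jLevelFor(Level.FINE)); // prints DEBUG
    }
}
```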

     

     

    IntelliJ debugging support

     

    Remember

    • In Reactor, debug mode is enabled by calling Hooks.onOperatorDebug().
    • Debug mode captures the Stacktrace of every Operator in the application, so it should not be used in production.
    • ReactorDebugAgent, provided by Reactor Tools, can replace debug mode in production.
    • The checkpoint() Operator captures the stacktrace only within a specific Operator chain.
    • There is no limit on how many log() Operators can be used, so one or more log() Operators can be added to analyze the internal behavior of a Reactor Sequence in more detail while debugging.

     

     
