ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 8장 backpressure
    Spring/Webflux 2023. 4. 23. 18:30

     

    8.1 Backpressure란?

    Subscriber 의 속도가 느린 경우 필요

     

    8.2 Reactor에서의 Backpressure 처리 방식

    8.2.1 데이터 개수 제어

    Subscriber 가 적절히 처리할 수 있는 수준의 데이터 개수를 Publisher 에게 요청하는 것

    public static void main(String[] args) {
        Flux.range(1, 5)
            .doOnRequest(data -> log.info("# doOnRequest: {}", data))
            .subscribe(new BaseSubscriber<Integer>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(1);
                }
    
                @SneakyThrows
                @Override
                protected void hookOnNext(Integer value) {
                    Thread.sleep(2000L);
                    log.info("# hookOnNext: {}", value);
                    request(1);
                }
            });
    }
    
    //결과
    > Task :Example8_1.main()
    19:24:28.200 [main] DEBUG- Using Slf4j logging framework
    19:24:28.212 [main] INFO - # doOnRequest: 1
    19:24:30.217 [main] INFO - # hookOnNext: 1
    19:24:30.217 [main] INFO - # doOnRequest: 1
    19:24:32.221 [main] INFO - # hookOnNext: 2
    19:24:32.222 [main] INFO - # doOnRequest: 1
    19:24:34.223 [main] INFO - # hookOnNext: 3
    19:24:34.223 [main] INFO - # doOnRequest: 1
    19:24:36.224 [main] INFO - # hookOnNext: 4
    19:24:36.224 [main] INFO - # doOnRequest: 1
    19:24:38.225 [main] INFO - # hookOnNext: 5
    19:24:38.225 [main] INFO - # doOnRequest: 1
    • subscribe() 메서드의 파라미터로 람다 표현식 대신 BaseSubscriber 객체를 생성해서 전달
    • hookOnSubscribe() 메서드는 Subscriber 인터페이스에 정의된 onSubscribe() 메서드를 대신해 구독 시점에 request(1) 메서드를 호출해서, 1개씩 요청합니다.
    • hookOnNext() 메서드는 Subscriber 인터페이스에 정의된 onNext() 메서드를 대신해 Publisher 가 emit 한 데이터를 전달받아 처리(log.info~ )한 후에 Publisher 에게 또다시 데이터를 1개 요청하는 역할을 합니다

     

    # doOnRequest: 1 <- OnNext() 의 request(n=1) 의 호출에 의해 출력된 결과

    public interface Subscription{
        public void request(long n);
        ...
    }

    # hookOnNext: 1 <- .range(1,5) 에 의해 전달된 data 를 OnNext 에서 출력한 결과

    # hookOnNext: 2

    # hookOnNext: 3

    # hookOnNext: 4

    # hookOnNext: 5

    protected void hookOnNext(Integer value) {
        ...
        log.info("# hookOnNext: {}", value);
        ...
    }

     

     

    참고 @SneakyThrows 란

    Lombok 에서 Throw 처리 해줌; 위에서 Thread.sleep 이 예외 처리를 요구하여, 간편하게 사용

    https://dev-jwblog.tistory.com/130

     

     

     

    8.2.2 Backpressure 전략 사용

     

    ignore 전략

    : Backpressure를 사용하지 않기에, illigalStateException 이 발생할 수 있음

     

    error 전략

    : 버퍼가 가득찰 경우,

    1.  illigalStateException exception 을 발생
    2.  Publisher 는 Error Signal 을 Subscriber 에게 전송하고 삭제한 데이터를 폐기합니다. 
    public static void main(String[] args) throws InterruptedException {
        Flux
            .interval(Duration.ofMillis(1L))
            .onBackpressureError()
            .doOnNext(data -> log.info("# doOnNext: {}", data))
            .publishOn(Schedulers.parallel())
            .subscribe(data -> {
                        try {
                            Thread.sleep(5L);
                        } catch (InterruptedException e) {}
                        log.info("# onNext: {}", data);
                    },
                    error -> log.error("# onError", error));
    
        Thread.sleep(2000L);
    }
    
    //결과
    20:07:27.467 [main] DEBUG- Using Slf4j logging framework
    20:07:27.493 [parallel-2] INFO - # doOnNext: 0
    20:07:27.496 [parallel-2] INFO - # doOnNext: 1
    20:07:27.496 [parallel-2] INFO - # doOnNext: 2
    20:07:27.496 [parallel-2] INFO - # doOnNext: 3
    20:07:27.496 [parallel-2] INFO - # doOnNext: 4
    20:07:27.497 [parallel-2] INFO - # doOnNext: 5
    20:07:27.498 [parallel-2] INFO - # doOnNext: 6
    20:07:27.499 [parallel-2] INFO - # doOnNext: 7
    20:07:27.500 [parallel-2] INFO - # doOnNext: 8
    20:07:27.501 [parallel-2] INFO - # doOnNext: 9
    20:07:27.502 [parallel-1] INFO - # onNext: 0
    20:07:27.502 [parallel-2] INFO - # doOnNext: 10
    20:07:27.503 [parallel-2] INFO - # doOnNext: 11
    20:07:27.504 [parallel-2] INFO - # doOnNext: 12
    20:07:27.505 [parallel-2] INFO - # doOnNext: 13
    20:07:27.506 [parallel-2] INFO - # doOnNext: 14
    20:07:27.507 [parallel-2] INFO - # doOnNext: 15
    20:07:27.508 [parallel-1] INFO - # onNext: 1
    20:07:27.508 [parallel-2] INFO - # doOnNext: 16
    20:07:27.509 [parallel-2] INFO - # doOnNext: 17
    20:07:27.510 [parallel-2] INFO - # doOnNext: 18
    20:07:27.511 [parallel-2] INFO - # doOnNext: 19
    20:07:27.512 [parallel-2] INFO - # doOnNext: 20
    20:07:27.513 [parallel-2] INFO - # doOnNext: 21
    20:07:27.514 [parallel-2] INFO - # doOnNext: 22
    20:07:27.514 [parallel-1] INFO - # onNext: 2
    ... (중략) ...
    20:07:29.095 [parallel-1] INFO - # onNext: 254
    20:07:29.102 [parallel-1] INFO - # onNext: 255
    20:07:29.108 [parallel-1] ERROR- # onError
    reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:220)
    at reactor.core.publisher.Flux.lambda$onBackpressureError$27(Flux.java:6739)
    at reactor.core.publisher.FluxOnBackpressureDrop$DropSubscriber.onNext(FluxOnBackpressureDrop.java:135)
    at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:125)
    at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
    at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
    • .interval() Operator 를 사용하여 0부터 1씩 증가한 숫자를 0.001초에 한 번씩 아주 빠른 속도로 emit
    • .subscribe() 에 Thread.sleep(5L) 를 주어 처리하는데 0.005초 걸리게 하여 pub 간격대비 * 5배 sub  이 느리게 함
    • error 전략 사용위해 onBackpressureError() operator 사용
      • downstream 에서 처리량이 부족하면 발생
    • 기본 전략으로, onBackpressureError() 를 붙인거나 안 붙인거나 결과는 같다

     

    interval

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#subscribe-java.util.function.Consumer-

     

    onBackpressureError

     

     

    doOnNext

    doOnNext() , doOnComplete(), doOnError() 라는 세 함수는  알림 이벤트에 해당

    데이터가 방출될 때 트리거될 리스너를 연결할 수 있습니다

    debug 용도등으로 사용

     

     

    publishOn

    publishOn() Operator 는 Reactor Sequence 중 일부를 별도의 스레드에서 실행할 수 있도록 해 주는 Operator입니다.

    parallet 로 시작하는 이름의 스레드가 두개가 실행됨

    • 20:07:27.501 [parallel-2] INFO - # doOnNext: 9
    • 20:07:27.502 [parallel-1] INFO - # onNext: 0
    • 20:07:27.502 [parallel-2] INFO - # doOnNext: 10

    (10장 Scheduler에서 자세히 다룰 예정)

    sleep 이 없는 doOnNext() 는 0.001 초마다 emit 

    sleep 이 있는 onNext() 는 0.005초마다 처리하다가 255 처리한 후 에러 발생

    reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:220)
    ...(중략)...
    at java.base/java.lang.Thread.run(Thread.java:829)

     

    * overrun [│oʊvərʌn] 1.침략하다, (침략하여) 황폐하게 만들다; 압도하다, 괴멸시키다2.…에 퍼지다, 우거지다; …에 들끓다; 발호하다; …에 널리 퍼지다, ...3.퍼지다4.범람하다, 넘치다, 홍수가 나다5.베이스를 지나쳐 감, 오버런; (시간·비용 등의) 초과; 초과 생산(량)[비용]; 나머지, 잉여; 견적 금액의 초과(액);...6.[보통 sing.] 초과 시간

     

    Drop 전략

    Publisher 가 downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우,

    버퍼 밖에서 대기 중인 데이터 중에서 먼저 emit 된 데이터부터 Drop 시키는 전략입니다.

    Drop 된 데이터는 폐기됩니다.

     

    그림으로 이해하는 Backpressure Drop 전략

    step 2 :  1~10 으로 버퍼가 가득 찹니다.

    step 3 : emit 되었으나, 버퍼 밖에서 대기 중인 11~17

    step 4 : 버퍼 밖에서 대기 중인 11~17 중 먼저 emit 된 순서대로 11,12, 13 순으로 dropped 시킵니다.

    step 5 : 버퍼에 있는 10건이 모두 처리 됩니다.

    step 6 : 버퍼밖에 있던 14~18의 데이터가 버퍼로 채워집니다.

     

     

    onBackpressureDrop 예제

    public static void main(String[] args) throws InterruptedException {
        Flux
            .interval(Duration.ofMillis(1L))
            .onBackpressureDrop(dropped -> log.info("# dropped: {}", dropped))
            .publishOn(Schedulers.parallel())
            .subscribe(data -> {
                        try {
                            Thread.sleep(5L);
                        } catch (InterruptedException e) {}
                        log.info("# onNext: {}", data);
                    },
                    error -> log.error("# onError", error));
    
        Thread.sleep(2000L);
    }
    
    // 결과
    > Task :Example8_3.main()
    12:51:11.522 [main] DEBUG- Using Slf4j logging framework
    12:51:11.552 [parallel-1] INFO - # onNext: 0
    12:51:11.560 [parallel-1] INFO - # onNext: 1
    12:51:11.566 [parallel-1] INFO - # onNext: 2
    12:51:11.572 [parallel-1] INFO - # onNext: 3
    12:51:11.578 [parallel-1] INFO - # onNext: 4
    12:51:11.584 [parallel-1] INFO - # onNext: 5
    12:51:11.590 [parallel-1] INFO - # onNext: 6
    12:51:11.595 [parallel-1] INFO - # onNext: 7
    12:51:11.601 [parallel-1] INFO - # onNext: 8
    12:51:11.607 [parallel-1] INFO - # onNext: 9
    12:51:11.613 [parallel-1] INFO - # onNext: 10
    12:51:11.619 [parallel-1] INFO - # onNext: 11
    12:51:11.625 [parallel-1] INFO - # onNext: 12
    12:51:11.630 [parallel-1] INFO - # onNext: 13
    12:51:11.637 [parallel-1] INFO - # onNext: 14
    12:51:11.644 [parallel-1] INFO - # onNext: 15
    ...중략
    12:51:11.777 [parallel-1] INFO - # onNext: 36
    12:51:11.784 [parallel-1] INFO - # onNext: 37
    12:51:11.789 [parallel-1] INFO - # onNext: 38
    12:51:11.796 [parallel-1] INFO - # onNext: 39
    12:51:11.801 [parallel-2] INFO - # dropped: 256
    12:51:11.802 [parallel-2] INFO - # dropped: 257
    12:51:11.802 [parallel-1] INFO - # onNext: 40
    12:51:11.803 [parallel-2] INFO - # dropped: 258
    12:51:11.804 [parallel-2] INFO - # dropped: 259
    12:51:11.805 [parallel-2] INFO - # dropped: 260
    12:51:11.806 [parallel-2] INFO - # dropped: 261
    12:51:11.807 [parallel-2] INFO - # dropped: 262
    12:51:11.808 [parallel-1] INFO - # onNext: 41
    12:51:11.808 [parallel-2] INFO - # dropped: 263
    ... 중략
    12:51:12.676 [parallel-2] INFO - # dropped: 1131
    12:51:12.677 [parallel-2] INFO - # dropped: 1132
    12:51:12.678 [parallel-1] INFO - # onNext: 191
    12:51:12.684 [parallel-1] INFO - # onNext: 192
    12:51:12.690 [parallel-1] INFO - # onNext: 193
    12:51:12.696 [parallel-1] INFO - # onNext: 194
    12:51:12.702 [parallel-1] INFO - # onNext: 195
    12:51:12.708 [parallel-1] INFO - # onNext: 196
    12:51:12.713 [parallel-1] INFO - # onNext: 197
    12:51:12.719 [parallel-1] INFO - # onNext: 198
    12:51:12.726 [parallel-1] INFO - # onNext: 199
    12:51:12.732 [parallel-1] INFO - # onNext: 200
    12:51:12.738 [parallel-1] INFO - # onNext: 201
    12:51:12.743 [parallel-1] INFO - # onNext: 202
    12:51:12.750 [parallel-1] INFO - # onNext: 203
    ... 중략
    12:51:13.064 [parallel-1] INFO - # onNext: 255  
    12:51:13.065 [parallel-2] INFO - # dropped: 1520
    12:51:13.066 [parallel-2] INFO - # dropped: 1521
    12:51:13.067 [parallel-2] INFO - # dropped: 1522
    12:51:13.068 [parallel-2] INFO - # dropped: 1523
    12:51:13.069 [parallel-2] INFO - # dropped: 1524
    12:51:13.070 [parallel-1] INFO - # onNext: 1133 - 256 ~ 1132 는 drop 되어 1133 처리
    12:51:13.070 [parallel-2] INFO - # dropped: 1525
    12:51:13.071 [parallel-2] INFO - # dropped: 1526
    12:51:13.072 [parallel-2] INFO - # dropped: 1527
    12:51:13.073 [parallel-2] INFO - # dropped: 1528
    12:51:13.074 [parallel-2] INFO - # dropped: 1529
    12:51:13.075 [parallel-1] INFO - # onNext: 1134
    ... 중략

     

    01  .interval()로 0.001초 간격으로 데이터를 emit

    02  .onBackpressureDrop(dropped -> log.info("# dropped: {}", dropped)) 에서 dropped 된 데이터를 출력

    03  .publishOn(Schedulers.parallel()) 로 병렬 스레드 생성

    04  .subscribe 에서 Thread.sleep(0.005초) 

    05  error -> log.error("# onError", error));   에러 발생하는지 보려고 error 출력 

     

    실행

    버퍼에 255 까지 채워지고, 256 ~ 1132 는 dropped 됩니다.

    버퍼가 비워지고 1133, 1134 가  처리됩니다.

    onBackpressureDrop 효과로 onError 가 발생하지 않습니다

     

     

    Latest 전략

    Publisher 가 downstream 으로 전달할 데이터가 버퍼에 가득찰 경우, 버퍼 밖에서 대기 중인 데이터 중에서 가장 최근에(나중에) emit 된 데이터 부터 버퍼에 채우는 전략입니다.

    새로운 데이터가 들어오는 시점에 가장 최근의 데이터만 남겨 두고 나머지 데이터를 폐기 합니다.

     

    그림으로 이해하는 backpressure latest 전략

     

    step 2 : 버퍼가 1~10으로 가득 찼습니다.

    step 3 : 버퍼밖에 11~17 까지 대기합니다.

    step 4 : 버퍼안의 1~10이 모두 처리되고 empty 상태로 변경되면, latest 는 17을 버퍼안으로 집어넣고 11~16까지는 버립니다.

    step 5 : 버퍼 안으로 17, 18, 19.. 이렇게 순서대로 채워집니다.

     

    onBackpressureLatest marble diagram

     

    public static void main(String[] args) throws InterruptedException {
        Flux
            .interval(Duration.ofMillis(1L))
            .onBackpressureLatest()
            .publishOn(Schedulers.parallel())
            .subscribe(data -> {
                        try {
                            Thread.sleep(5L);
                        } catch (InterruptedException e) {}
                        log.info("# onNext: {}", data);
                    },
                    error -> log.error("# onError", error));
    
        Thread.sleep(2000L);
    }
    
    // 결과
    > Task :Example8_4.main()
    14:04:52.709 [main] DEBUG- Using Slf4j logging framework
    14:04:52.732 [parallel-1] INFO - # onNext: 0
    14:04:52.742 [parallel-1] INFO - # onNext: 1
    14:04:52.747 [parallel-1] INFO - # onNext: 2
    14:04:52.753 [parallel-1] INFO - # onNext: 3
    14:04:52.759 [parallel-1] INFO - # onNext: 4
    14:04:52.765 [parallel-1] INFO - # onNext: 5
    14:04:52.772 [parallel-1] INFO - # onNext: 6
    14:04:52.777 [parallel-1] INFO - # onNext: 7
    14:04:52.783 [parallel-1] INFO - # onNext: 8
    14:04:52.788 [parallel-1] INFO - # onNext: 9
    14:04:52.794 [parallel-1] INFO - # onNext: 10
    14:04:52.800 [parallel-1] INFO - # onNext: 11
    14:04:54.369 [parallel-1] INFO - # onNext: 254
    14:04:54.375 [parallel-1] INFO - # onNext: 255
    ... 중략
    14:04:54.382 [parallel-1] INFO - # onNext: 1274
    14:04:54.388 [parallel-1] INFO - # onNext: 1275
    14:04:54.393 [parallel-1] INFO - # onNext: 1276
    14:04:54.400 [parallel-1] INFO - # onNext: 1277
    14:04:54.406 [parallel-1] INFO - # onNext: 1278
    ... 중략
    14:04:54.713 [parallel-1] INFO - # onNext: 1330
    14:04:54.718 [parallel-1] INFO - # onNext: 1331
    14:04:54.723 [parallel-1] INFO - # onNext: 1332

     

    코드는 drop 전략과 유사합니다.

    01  .interval()로 1초 간격으로 데이터를 emit

    02  .onBackpressurelatest() 전략 설정

    03  .publishOn(Schedulers.parallel()) 로 병렬 스레드 생성

    04  .subscribe 에서 Thread.sleep(5초) 

    05  error -> log.error("# onError", error));   에러 발생하는지 보려고 error 출력 

     

    실행

    버퍼에 255 까지 채워집니다

    버퍼가 비워지고 latest 인 1274 가 처리되고, 256 ~ 1273 은 dropped 됩니다.

    1274 + 254 = 1528 전에 끝나므로 2차 onBackpressurelatest 가 발생하기 전에 종료됩니다.

     

    vsDrop 전략과 비교하면,

    drop 전략 latest 전략
    버리는 우선순위 전략
    버퍼 밖에서 대기 중인 건 중 가장 먼저 emit 된 데이터 부터 버림
    버퍼를 채우는 우선순위 전략
    버퍼 밖 대기 건 중 가장 최근에 emit 된 데이터 부터 버퍼에 채움
    latest 채우기 위해, 앞에 대기 중인 데이터 다 버림

     

    Buffer 전략

    버퍼 : 입출력을 수행하는 장치들간의 속도 차이를 조절하기 위해 입출력 장치 중간에 위치햇 데이터를 어느정도 쌓아 두었다가 전송하는 것

    backpressure Buffer 전략도 이와 비슷

     

    버퍼가 가득차면

    • 버퍼 내의 데이터를 폐기하는 전략
    • 버퍼가 가득차면 에러를 발생시키는 전략

    둘 다 지원합니다

    그 중 버퍼 내의 데이터를 폐기하는 전략을 살펴봅니다

     

     

    BUFFER DROP_LATEST 전략

    그림으로 이해하는 backpressure BUFFER DROP_LATEST 전략

    step 2 : 버퍼가 1~10으로 가득 찼습니다.

    step 3 : 11 이 버퍼안으로 들어와서 overflow 가 발생합니다.

    step 4 : overflow 를 일으킨 11이 drop 됩니다

     

     

    onBackpressBuffer drop_latest marble diagram

    onBackpressureBuffer  DROP_LATEST 예제

    public static void main(String[] args) throws InterruptedException {
        Flux
            .interval(Duration.ofMillis(300L))
            .doOnNext(data -> log.info("# emitted by original Flux: {}", data))
            .onBackpressureBuffer(2,
                    dropped -> log.info("** Overflow & Dropped: {} **", dropped),
                    BufferOverflowStrategy.DROP_LATEST)
            .doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data))
            .publishOn(Schedulers.parallel(), false, 1)
            .subscribe(data -> {
                        try {
                            Thread.sleep(1000L);
                        } catch (InterruptedException e) {}
                        log.info("# onNext: {}", data);
                    },
                    error -> log.error("# onError", error));
    
        Thread.sleep(2500L);
    }
    
    // 결과
    
    > Task :Example8_5.main()
    14:42:03.221 [main] DEBUG- Using Slf4j logging framework
    14:42:03.543 [parallel-2] INFO - # emitted by original Flux: 0
    14:42:03.547 [parallel-2] INFO - [ # emitted by Buffer: 0 ]
    14:42:03.843 [parallel-2] INFO - # emitted by original Flux: 1
    14:42:04.142 [parallel-2] INFO - # emitted by original Flux: 2
    14:42:04.443 [parallel-2] INFO - # emitted by original Flux: 3
    14:42:04.444 [parallel-2] INFO - ** Overflow & Dropped: 3 **
    14:42:04.548 [parallel-1] INFO - # onNext: 0
    14:42:04.549 [parallel-1] INFO - [ # emitted by Buffer: 1 ]
    14:42:04.746 [parallel-2] INFO - # emitted by original Flux: 4
    14:42:05.044 [parallel-2] INFO - # emitted by original Flux: 5
    14:42:05.044 [parallel-2] INFO - ** Overflow & Dropped: 5 **
    14:42:05.344 [parallel-2] INFO - # emitted by original Flux: 6
    14:42:05.344 [parallel-2] INFO - ** Overflow & Dropped: 6 **
    14:42:05.553 [parallel-1] INFO - # onNext: 1
    14:42:05.553 [parallel-1] INFO - [ # emitted by Buffer: 2 ]
    14:42:05.644 [parallel-2] INFO - # emitted by original Flux: 7

     

    01  .interval()로 0.3초 간격으로 데이터를 emit

    02  .doOnNext(data -> log.info("# emitted by original Flux: {}", data)) 로 원본Flux 즉 interval() operator 에서 생성된 원본 데이터가 emit 되는 과정 확인 가능

    03  .onBackpressureBuffer(maxsize = 2, DROP_LATEST ) 전략 설정하고, 

    overflow 발생 시  dropped -> log.info("** Overflow & Dropped: {} **", dropped), 출력

     

    04  .doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data))  통해 Buffer에서 downstream 으로 emit 되는 데이터 확인

    05  .publishOn(Schedulers.parallel()) 로 병렬 스레드 생성, prefectch 1로 지정

    06  .subscribe 에서 Thread.sleep(1초) 

    07  error -> log.error("# onError", error));   에러 발생하는지 보려고 error 출력 

     

    결과설명

    내용이 복잡합니다. 아래처럼 진행할께요
    01. study 진행자가 먼저 1차 설명 드립니다.
    02. 여러분들이, 코드와 로그를 보며, 재현 해 봅니다.
    03. 진행자가 2차로, 다시한 번 설명 드리며, 질문 받습니다.

    publisher - buffer - subscriber  이렇게 3개의 상태를 각각 분리해서 생각하세요

     

    15:30:12.001 [parallel-2] INFO - # emitted by original Flux: 0
    15:30:12.004 [parallel-2] INFO - [ # emitted by Buffer: 0 ]

    original Flux 가 0을 emit 하고, 잠시 후 buffer 에도 0 이 emit 됨, buffer 는 empty

     

    14:42:03.843 [parallel-2] INFO - # emitted by original Flux: 1
    14:42:04.142 [parallel-2] INFO - # emitted by original Flux: 2
    14:42:04.443 [parallel-2] INFO - # emitted by original Flux: 3
    14:42:04.444 [parallel-2] INFO - ** Overflow & Dropped: 3 **

    maxsize = 2 인데, 3이 들어올때 overflow 발생하고 3 버려짐, buffer 에는 [1,2 ]


    14:42:04.548 [parallel-1] INFO - # onNext: 0

    Subscriber 가 0 처리

     

    14:42:04.549 [parallel-1] INFO - [ # emitted by Buffer: 1 ]

    buffer 에서는 1 emit 됨, buffer 에는 [2] 만 남음

     

    14:42:04.746 [parallel-2] INFO - # emitted by original Flux: 4
    14:42:05.044 [parallel-2] INFO - # emitted by original Flux: 5

    original Flux 에서  4,5 가 추가로 emit 됨


    14:42:05.044 [parallel-2] INFO - ** Overflow & Dropped: 5 **

    buffer 에 [2,4 ] 가 들어있는 상태에 5가 들어와서 overflow 발생하고 버려짐


    14:42:05.344 [parallel-2] INFO - # emitted by original Flux: 6
    14:42:05.344 [parallel-2] INFO - ** Overflow & Dropped: 6 **

    buffer 에 [2,4] 가 들어있는 상태에 6이  들어와서 overflow 발생하고 버려짐

     

    14:42:05.553 [parallel-1] INFO - # onNext: 1

    Subscriber 가 1 처리


    14:42:05.553 [parallel-1] INFO - [ # emitted by Buffer: 2 ]

    buffer 에서 2가 emit 됨, buffer 에는 [4] 남음


    14:42:05.644 [parallel-2] INFO - # emitted by original Flux: 7

    original Flux 에서 7 emit 되고 종료, buffer 에는 [4, 7]

     

     

    참고 : prefetch 는 Scheduler 가 생성하는 스레드의 비동기 경계 시점에 미리 보관할 데이터의 개수를 의미하며,

    데이터의 요청 개수에 영향을 미칩니다.

     

    BUFFER DROP_OLDEST 전략

    Publisher 가 downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 안에 채워진 데이터 중에서 가장 오래된 데이터를 Drop하여 폐기한 후, 확보된 공간에 emit된 데이터를 채우는 전략입니다.

     

    그림으로 이해하는 Backpressure BUFFER DROP_OLDEST 전략

    step 2 : 버퍼가 1~10으로 가득 찼습니다.

    step 3 : 11 이 버퍼안으로 들어와서 overflow 가 발생합니다.

    step 4 : 버퍼안에서 가장 오래된 1을 drop 하고, 11을 버퍼에 채웁니다.

     

    onBackpressureBuffer 의 drop_oldest 전략 marble diagram

     

     

    onBackpressureBuffer DROP_OLDEST 예제

    public static void main(String[] args) throws InterruptedException {
        Flux
            .interval(Duration.ofMillis(300L))
            .doOnNext(data -> log.info("# emitted by original Flux: {}", data))
            .onBackpressureBuffer(2,
                    dropped -> log.info("** Overflow & Dropped: {} **", dropped),
                    BufferOverflowStrategy.DROP_OLDEST)
            .doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data))
            .publishOn(Schedulers.parallel(), false, 1)
            .subscribe(data -> {
                        try {
                            Thread.sleep(1000L);
                        } catch (InterruptedException e) {}
                        log.info("# onNext: {}", data);
                    },
                    error -> log.error("# onError", error));
    
        Thread.sleep(2500L);
    }
    
    // 결과
    15:51:25.424 [parallel-2] INFO - # emitted by original Flux: 0
    15:51:25.428 [parallel-2] INFO - [ # emitted by Buffer: 0 ]
    15:51:25.723 [parallel-2] INFO - # emitted by original Flux: 1
    15:51:26.028 [parallel-2] INFO - # emitted by original Flux: 2
    15:51:26.323 [parallel-2] INFO - # emitted by original Flux: 3
    15:51:26.324 [parallel-2] INFO - ** Overflow & Dropped: 1 **
    15:51:26.432 [parallel-1] INFO - # onNext: 0
    15:51:26.444 [parallel-1] INFO - [ # emitted by Buffer: 2 ]
    15:51:26.625 [parallel-2] INFO - # emitted by original Flux: 4
    15:51:26.928 [parallel-2] INFO - # emitted by original Flux: 5
    15:51:26.929 [parallel-2] INFO - ** Overflow & Dropped: 3 **
    15:51:27.226 [parallel-2] INFO - # emitted by original Flux: 6
    15:51:27.227 [parallel-2] INFO - ** Overflow & Dropped: 4 **
    15:51:27.467 [parallel-1] INFO - # onNext: 2
    15:51:27.468 [parallel-1] INFO - [ # emitted by Buffer: 5 ]
    15:51:27.528 [parallel-2] INFO - # emitted by original Flux: 7

    01  .interval()로 0.3초 간격으로 데이터를 emit

    02  .doOnNext(data -> log.info("# emitted by original Flux: {}", data)) 로 원본Flux 즉 interval() operator 에서 생성된 원본 데이터가 emit 되는 과정 확인 가능

    03  .onBackpressureBuffer(maxsize = 2, DROP_OLDEST ) 전략 설정하고, 

    overflow 발생 시  dropped -> log.info("** Overflow & Dropped: {} **", dropped), 출력

     

    04  .doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data))  통해 Buffer에서 downstream 으로 emit 되는 데이터 확인

    05  .publishOn(Schedulers.parallel()) 로 병렬 스레드 생성, prefectch 1로 지정

    06  .subscribe 에서 Thread.sleep(1초) 

    07  error -> log.error("# onError", error));   에러 발생하는지 보려고 error 출력 

     

    결과

    15:51:25.424 [parallel-2] INFO - # emitted by original Flux: 0
    15:51:25.428 [parallel-2] INFO - [ # emitted by Buffer: 0 ]

    original Flux 가 0을 emit 하고, 잠시 후 buffer 에도 0 이 emit 됨, buffer 는 empty


    15:51:25.723 [parallel-2] INFO - # emitted by original Flux: 1
    15:51:26.028 [parallel-2] INFO - # emitted by original Flux: 2
    15:51:26.323 [parallel-2] INFO - # emitted by original Flux: 3
    15:51:26.324 [parallel-2] INFO - ** Overflow & Dropped: 1 **

    maxsize = 2 인데, 3이 들어올때 overflow 발생하고 오래된 1 버려짐, buffer 에는 [2,3 ]


    15:51:26.432 [parallel-1] INFO - # onNext: 0

    Subscriber 가 0 처리


    15:51:26.444 [parallel-1] INFO - [ # emitted by Buffer: 2 ]

    buffer 에서 2가 emit 됨, buffer 에는 [3] 남음


    15:51:26.625 [parallel-2] INFO - # emitted by original Flux: 4
    15:51:26.928 [parallel-2] INFO - # emitted by original Flux: 5

    original Flux 에서  4,5 가 추가로 emit 됨


    15:51:26.929 [parallel-2] INFO - ** Overflow & Dropped: 3 **

    buffer 에 [3] 인 상태에서, 4, 5가 들어오고 overflow 발생, 오래된 3이 버려짐, buffer 에 [4,5] 남음


    15:51:27.226 [parallel-2] INFO - # emitted by original Flux: 6

    15:51:27.227 [parallel-2] INFO - ** Overflow & Dropped: 4 **

    buffer 에 [4,5] 남은 상태에서 6이 들어와 overflow 발생, 오래된 4가 버려짐, buffer 에 [5,6 ] 남음


    15:51:27.467 [parallel-1] INFO - # onNext: 2

    Subscriber 가 2 처리


    15:51:27.468 [parallel-1] INFO - [ # emitted by Buffer: 5 ]

    buffer 에서 5 emit  됨,  buffer 에 [6] 남음


    15:51:27.528 [parallel-2] INFO - # emitted by original Flux: 7

    original flux 에서 7 emit 됨,   buffer 에 [6, 7] 남음

     

    기억하세요

    1. Backpressure 는 Publisher 가 끊임없이  emit 하는 무수히 많은 데이터를 적절하게 제어하여 데이터 처리에 있어 과부하가 걸리지 않도록 제어하는 데이터 처리 방식이다.
    2. Reactor 에서 지원하는 Backpressure 처리 방식에는 데이터 요청 개수를 제어하는 방식, Backpressure 전략(버리는 전략)을 사용하는 방식등이 있다.
    3. Backpressure ignore 전략은 backpressure 를 적용하지 않는 전략이다.
    4. backpressure error 전략은 downstream 의 데이터 처리 속도가 느려서 upstream 의 emit 속도를 따라가지 못할 경우 에러를 발생시키는 전략이다.
    5. Backpressure drop 전략은 Publisher 가  downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기 중인 데이터 중에서, 먼저 emit 된 데이터부터 drop하는 전략이다.
    6. Backpressure latest 전략은 publisher 가 downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기 중인 데이터 중에서, 가장 최근에(나중에) emit된 데이터부터 버퍼에 채우는 전략이다.
    7. Backpressure buffer 전략은 버퍼의 데이터를 폐기하지 않고, 버퍼링을 하는 전략.  버퍼가 가득 차면 버퍼 내의 데이터를 폐기하는 전략, 버퍼가 가득 차면 에러를 발생시키는 전략 등으로 구분할 수 있다.
      1. Backpressure Buffer drop_latest 전략은 Publisher 가 downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우, 가장 최근에(나중에) 버퍼 안에 채워진 데이터를 drop하는 전략이다.
      2. Backpressure Buffer drop_oldest 전략은 Publisher가 downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 안에 채워진 데이터 중에서 가장 오래된 데이터를 drop 하는 전략이다.

     

     

     

    참고

    https://devfunny.tistory.com/914

     

    [리액티브 프로그래밍] Backpressure의 개념과 Backpressure 전략

    Backpressure 리액티브 프로그래밍에서의 배압, 즉 Backpressure은 Publisher가 끊임없이 emit하는 무수히 많은 데이터를 적절하게 제어하여 데이터 처리에 과부하가 걸리지 않도록 제어하는 것이다. Publish

    devfunny.tistory.com

     

    스프링으로 시작하는 리액티브 스트림즈 책 스터디 8장 backpressure

    'Spring > Webflux' 카테고리의 다른 글

    10장 Scheduler  (0) 2023.05.07
    9장 Sinks  (0) 2023.05.01
    7장 Cold Sequence 와 Hot Sequence  (0) 2023.04.23
    6장 마블 다이어그램  (1) 2023.04.22
    5장 Reactor 개요  (0) 2023.04.22

    댓글

Designed by Tistory.