-
8장 backpressureSpring/Webflux 2023. 4. 23. 18:30
8.1 Backpressure란?
Subscriber 의 속도가 느린 경우 필요
8.2 Reactor에서의 Backpressure 처리 방식
8.2.1 데이터 개수 제어
Subscriber 가 적절히 처리할 수 있는 수준의 데이터 개수를 Publisher 에게 요청하는 것
public static void main(String[] args) { Flux.range(1, 5) .doOnRequest(data -> log.info("# doOnRequest: {}", data)) .subscribe(new BaseSubscriber<Integer>() { @Override protected void hookOnSubscribe(Subscription subscription) { request(1); } @SneakyThrows @Override protected void hookOnNext(Integer value) { Thread.sleep(2000L); log.info("# hookOnNext: {}", value); request(1); } }); } //결과 > Task :Example8_1.main() 19:24:28.200 [main] DEBUG- Using Slf4j logging framework 19:24:28.212 [main] INFO - # doOnRequest: 1 19:24:30.217 [main] INFO - # hookOnNext: 1 19:24:30.217 [main] INFO - # doOnRequest: 1 19:24:32.221 [main] INFO - # hookOnNext: 2 19:24:32.222 [main] INFO - # doOnRequest: 1 19:24:34.223 [main] INFO - # hookOnNext: 3 19:24:34.223 [main] INFO - # doOnRequest: 1 19:24:36.224 [main] INFO - # hookOnNext: 4 19:24:36.224 [main] INFO - # doOnRequest: 1 19:24:38.225 [main] INFO - # hookOnNext: 5 19:24:38.225 [main] INFO - # doOnRequest: 1
- subscribe() 메서드의 파라미터로 람다 표현식 대신 BaseSubscriber 객체를 생성해서 전달
- hookOnSubscribe() 메서드는 Subscriber 인터페이스에 정의된 onSubscribe() 메서드를 대신해 구독 시점에 request(1) 메서드를 호출해서, 1개씩 요청합니다.
- hookOnNext() 메서드는 Subscriber 인터페이스에 정의된 onNext() 메서드를 대신해 Publisher 가 emit 한 데이터를 전달받아 처리(log.info~ )한 후에 Publisher 에게 또다시 데이터를 1개 요청하는 역할을 합니다
# doOnRequest: 1 <- OnNext() 의 request(n=1) 의 호출에 의해 출력된 결과
public interface Subscription{ public void request(long n); ... }
# hookOnNext: 1 <- .range(1,5) 에 의해 전달된 data 를 OnNext 에서 출력한 결과
# hookOnNext: 2
# hookOnNext: 3
# hookOnNext: 4
# hookOnNext: 5
protected void hookOnNext(Integer value) { ... log.info("# hookOnNext: {}", value); ... }
참고 @SneakyThrows 란
Lombok 에서 Throw 처리 해줌; 위에서 Thread.sleep 이 예외 처리를 요구하여, 간편하게 사용
https://dev-jwblog.tistory.com/130
8.2.2 Backpressure 전략 사용
ignore 전략
: Backpressure를 사용하지 않기에, illigalStateException 이 발생할 수 있음
error 전략
: 버퍼가 가득찰 경우,
- illigalStateException exception 을 발생
- Publisher 는 Error Signal 을 Subscriber 에게 전송하고 삭제한 데이터를 폐기합니다.
public static void main(String[] args) throws InterruptedException { Flux .interval(Duration.ofMillis(1L)) .onBackpressureError() .doOnNext(data -> log.info("# doOnNext: {}", data)) .publishOn(Schedulers.parallel()) .subscribe(data -> { try { Thread.sleep(5L); } catch (InterruptedException e) {} log.info("# onNext: {}", data); }, error -> log.error("# onError", error)); Thread.sleep(2000L); } //결과 20:07:27.467 [main] DEBUG- Using Slf4j logging framework 20:07:27.493 [parallel-2] INFO - # doOnNext: 0 20:07:27.496 [parallel-2] INFO - # doOnNext: 1 20:07:27.496 [parallel-2] INFO - # doOnNext: 2 20:07:27.496 [parallel-2] INFO - # doOnNext: 3 20:07:27.496 [parallel-2] INFO - # doOnNext: 4 20:07:27.497 [parallel-2] INFO - # doOnNext: 5 20:07:27.498 [parallel-2] INFO - # doOnNext: 6 20:07:27.499 [parallel-2] INFO - # doOnNext: 7 20:07:27.500 [parallel-2] INFO - # doOnNext: 8 20:07:27.501 [parallel-2] INFO - # doOnNext: 9 20:07:27.502 [parallel-1] INFO - # onNext: 0 20:07:27.502 [parallel-2] INFO - # doOnNext: 10 20:07:27.503 [parallel-2] INFO - # doOnNext: 11 20:07:27.504 [parallel-2] INFO - # doOnNext: 12 20:07:27.505 [parallel-2] INFO - # doOnNext: 13 20:07:27.506 [parallel-2] INFO - # doOnNext: 14 20:07:27.507 [parallel-2] INFO - # doOnNext: 15 20:07:27.508 [parallel-1] INFO - # onNext: 1 20:07:27.508 [parallel-2] INFO - # doOnNext: 16 20:07:27.509 [parallel-2] INFO - # doOnNext: 17 20:07:27.510 [parallel-2] INFO - # doOnNext: 18 20:07:27.511 [parallel-2] INFO - # doOnNext: 19 20:07:27.512 [parallel-2] INFO - # doOnNext: 20 20:07:27.513 [parallel-2] INFO - # doOnNext: 21 20:07:27.514 [parallel-2] INFO - # doOnNext: 22 20:07:27.514 [parallel-1] INFO - # onNext: 2 ... (중략) ... 20:07:29.095 [parallel-1] INFO - # onNext: 254 20:07:29.102 [parallel-1] INFO - # onNext: 255 20:07:29.108 [parallel-1] ERROR- # onError reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...) at reactor.core.Exceptions.failWithOverflow(Exceptions.java:220) at reactor.core.publisher.Flux.lambda$onBackpressureError$27(Flux.java:6739) at reactor.core.publisher.FluxOnBackpressureDrop$DropSubscriber.onNext(FluxOnBackpressureDrop.java:135) at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:125) at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59) at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829)
- .interval() Operator 를 사용하여 0부터 1씩 증가한 숫자를 0.001초에 한 번씩 아주 빠른 속도로 emit
- .subscribe() 에 Thread.sleep(5L) 를 주어 처리하는데 0.005초 걸리게 하여 pub 간격대비 * 5배 sub 이 느리게 함
- error 전략 사용위해 onBackpressureError() operator 사용
- downstream 에서 처리량이 부족하면 발생
- 기본 전략으로, onBackpressureError() 를 붙인거나 안 붙인거나 결과는 같다
interval
onBackpressureError
doOnNext
doOnNext() , doOnComplete(), doOnError() 라는 세 함수는 알림 이벤트에 해당
데이터가 방출될 때 트리거될 리스너를 연결할 수 있습니다
debug 용도등으로 사용
publishOn
publishOn() Operator 는 Reactor Sequence 중 일부를 별도의 스레드에서 실행할 수 있도록 해 주는 Operator입니다.
parallet 로 시작하는 이름의 스레드가 두개가 실행됨
- 20:07:27.501 [parallel-2] INFO - # doOnNext: 9
- 20:07:27.502 [parallel-1] INFO - # onNext: 0
- 20:07:27.502 [parallel-2] INFO - # doOnNext: 10
(10장 Scheduler에서 자세히 다룰 예정)
sleep 이 없는 doOnNext() 는 0.001 초마다 emit
sleep 이 있는 onNext() 는 0.005초마다 처리하다가 255 처리한 후 에러 발생
reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:220)
...(중략)...
at java.base/java.lang.Thread.run(Thread.java:829)* overrun [│oʊvərʌn] 1.침략하다, (침략하여) 황폐하게 만들다; 압도하다, 괴멸시키다2.…에 퍼지다, 우거지다; …에 들끓다; 발호하다; …에 널리 퍼지다, ...3.퍼지다4.범람하다, 넘치다, 홍수가 나다5.베이스를 지나쳐 감, 오버런; (시간·비용 등의) 초과; 초과 생산(량)[비용]; 나머지, 잉여; 견적 금액의 초과(액);...6.[보통 sing.] 초과 시간
Drop 전략
Publisher 가 downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우,
버퍼 밖에서 대기 중인 데이터 중에서 먼저 emit 된 데이터부터 Drop 시키는 전략입니다.
Drop 된 데이터는 폐기됩니다.
step 2 : 1~10 으로 버퍼가 가득 찹니다.
step 3 : emit 되었으나, 버퍼 밖에서 대기 중인 11~17
step 4 : 버퍼 밖에서 대기 중인 11~17 중 먼저 emit 된 순서대로 11,12, 13 순으로 dropped 시킵니다.
step 5 : 버퍼에 있는 10건이 모두 처리 됩니다.
step 6 : 버퍼밖에 있던 14~18의 데이터가 버퍼로 채워집니다.
onBackpressureDrop 예제
public static void main(String[] args) throws InterruptedException { Flux .interval(Duration.ofMillis(1L)) .onBackpressureDrop(dropped -> log.info("# dropped: {}", dropped)) .publishOn(Schedulers.parallel()) .subscribe(data -> { try { Thread.sleep(5L); } catch (InterruptedException e) {} log.info("# onNext: {}", data); }, error -> log.error("# onError", error)); Thread.sleep(2000L); } // 결과 > Task :Example8_3.main() 12:51:11.522 [main] DEBUG- Using Slf4j logging framework 12:51:11.552 [parallel-1] INFO - # onNext: 0 12:51:11.560 [parallel-1] INFO - # onNext: 1 12:51:11.566 [parallel-1] INFO - # onNext: 2 12:51:11.572 [parallel-1] INFO - # onNext: 3 12:51:11.578 [parallel-1] INFO - # onNext: 4 12:51:11.584 [parallel-1] INFO - # onNext: 5 12:51:11.590 [parallel-1] INFO - # onNext: 6 12:51:11.595 [parallel-1] INFO - # onNext: 7 12:51:11.601 [parallel-1] INFO - # onNext: 8 12:51:11.607 [parallel-1] INFO - # onNext: 9 12:51:11.613 [parallel-1] INFO - # onNext: 10 12:51:11.619 [parallel-1] INFO - # onNext: 11 12:51:11.625 [parallel-1] INFO - # onNext: 12 12:51:11.630 [parallel-1] INFO - # onNext: 13 12:51:11.637 [parallel-1] INFO - # onNext: 14 12:51:11.644 [parallel-1] INFO - # onNext: 15 ...중략 12:51:11.777 [parallel-1] INFO - # onNext: 36 12:51:11.784 [parallel-1] INFO - # onNext: 37 12:51:11.789 [parallel-1] INFO - # onNext: 38 12:51:11.796 [parallel-1] INFO - # onNext: 39 12:51:11.801 [parallel-2] INFO - # dropped: 256 12:51:11.802 [parallel-2] INFO - # dropped: 257 12:51:11.802 [parallel-1] INFO - # onNext: 40 12:51:11.803 [parallel-2] INFO - # dropped: 258 12:51:11.804 [parallel-2] INFO - # dropped: 259 12:51:11.805 [parallel-2] INFO - # dropped: 260 12:51:11.806 [parallel-2] INFO - # dropped: 261 12:51:11.807 [parallel-2] INFO - # dropped: 262 12:51:11.808 [parallel-1] INFO - # onNext: 41 12:51:11.808 [parallel-2] INFO - # dropped: 263 ... 중략 12:51:12.676 [parallel-2] INFO - # dropped: 1131 12:51:12.677 [parallel-2] INFO - # dropped: 1132 12:51:12.678 [parallel-1] INFO - # onNext: 191 12:51:12.684 [parallel-1] INFO - # onNext: 192 12:51:12.690 [parallel-1] INFO - # onNext: 193 12:51:12.696 [parallel-1] INFO - # onNext: 194 12:51:12.702 [parallel-1] INFO - # onNext: 195 12:51:12.708 [parallel-1] INFO - # onNext: 196 12:51:12.713 [parallel-1] INFO - # onNext: 197 12:51:12.719 [parallel-1] INFO - # onNext: 198 12:51:12.726 [parallel-1] INFO - # onNext: 199 12:51:12.732 [parallel-1] INFO - # onNext: 200 12:51:12.738 [parallel-1] INFO - # onNext: 201 12:51:12.743 [parallel-1] INFO - # onNext: 202 12:51:12.750 [parallel-1] INFO - # onNext: 203 ... 중략 12:51:13.064 [parallel-1] INFO - # onNext: 255 12:51:13.065 [parallel-2] INFO - # dropped: 1520 12:51:13.066 [parallel-2] INFO - # dropped: 1521 12:51:13.067 [parallel-2] INFO - # dropped: 1522 12:51:13.068 [parallel-2] INFO - # dropped: 1523 12:51:13.069 [parallel-2] INFO - # dropped: 1524 12:51:13.070 [parallel-1] INFO - # onNext: 1133 - 256 ~ 1132 는 drop 되어 1133 처리 12:51:13.070 [parallel-2] INFO - # dropped: 1525 12:51:13.071 [parallel-2] INFO - # dropped: 1526 12:51:13.072 [parallel-2] INFO - # dropped: 1527 12:51:13.073 [parallel-2] INFO - # dropped: 1528 12:51:13.074 [parallel-2] INFO - # dropped: 1529 12:51:13.075 [parallel-1] INFO - # onNext: 1134 ... 중략
01 .interval()로 0.001초 간격으로 데이터를 emit
02 .onBackpressureDrop(dropped -> log.info("# dropped: {}", dropped)) 에서 dropped 된 데이터를 출력
03 .publishOn(Schedulers.parallel()) 로 병렬 스레드 생성
04 .subscribe 에서 Thread.sleep(0.005초)
05 error -> log.error("# onError", error)); 에러 발생하는지 보려고 error 출력
실행
버퍼에 255 까지 채워지고, 256 ~ 1132 는 dropped 됩니다.
버퍼가 비워지고 1133, 1134 가 처리됩니다.
onBackpressureDrop 효과로 onError 가 발생하지 않습니다
Latest 전략
Publisher 가 downstream 으로 전달할 데이터가 버퍼에 가득찰 경우, 버퍼 밖에서 대기 중인 데이터 중에서 가장 최근에(나중에) emit 된 데이터 부터 버퍼에 채우는 전략입니다.
새로운 데이터가 들어오는 시점에 가장 최근의 데이터만 남겨 두고 나머지 데이터를 폐기 합니다.
step 2 : 버퍼가 1~10으로 가득 찼습니다.
step 3 : 버퍼밖에 11~17 까지 대기합니다.
step 4 : 버퍼안의 1~10이 모두 처리되고 empty 상태로 변경되면, latest 는 17을 버퍼안으로 집어넣고 11~16까지는 버립니다.
step 5 : 버퍼 안으로 17, 18, 19.. 이렇게 순서대로 채워집니다.
public static void main(String[] args) throws InterruptedException { Flux .interval(Duration.ofMillis(1L)) .onBackpressureLatest() .publishOn(Schedulers.parallel()) .subscribe(data -> { try { Thread.sleep(5L); } catch (InterruptedException e) {} log.info("# onNext: {}", data); }, error -> log.error("# onError", error)); Thread.sleep(2000L); } // 결과 > Task :Example8_4.main() 14:04:52.709 [main] DEBUG- Using Slf4j logging framework 14:04:52.732 [parallel-1] INFO - # onNext: 0 14:04:52.742 [parallel-1] INFO - # onNext: 1 14:04:52.747 [parallel-1] INFO - # onNext: 2 14:04:52.753 [parallel-1] INFO - # onNext: 3 14:04:52.759 [parallel-1] INFO - # onNext: 4 14:04:52.765 [parallel-1] INFO - # onNext: 5 14:04:52.772 [parallel-1] INFO - # onNext: 6 14:04:52.777 [parallel-1] INFO - # onNext: 7 14:04:52.783 [parallel-1] INFO - # onNext: 8 14:04:52.788 [parallel-1] INFO - # onNext: 9 14:04:52.794 [parallel-1] INFO - # onNext: 10 14:04:52.800 [parallel-1] INFO - # onNext: 11 14:04:54.369 [parallel-1] INFO - # onNext: 254 14:04:54.375 [parallel-1] INFO - # onNext: 255 ... 중략 14:04:54.382 [parallel-1] INFO - # onNext: 1274 14:04:54.388 [parallel-1] INFO - # onNext: 1275 14:04:54.393 [parallel-1] INFO - # onNext: 1276 14:04:54.400 [parallel-1] INFO - # onNext: 1277 14:04:54.406 [parallel-1] INFO - # onNext: 1278 ... 중략 14:04:54.713 [parallel-1] INFO - # onNext: 1330 14:04:54.718 [parallel-1] INFO - # onNext: 1331 14:04:54.723 [parallel-1] INFO - # onNext: 1332
코드는 drop 전략과 유사합니다.
01 .interval()로 1초 간격으로 데이터를 emit
02 .onBackpressurelatest() 전략 설정
03 .publishOn(Schedulers.parallel()) 로 병렬 스레드 생성
04 .subscribe 에서 Thread.sleep(5초)
05 error -> log.error("# onError", error)); 에러 발생하는지 보려고 error 출력
실행
버퍼에 255 까지 채워집니다
버퍼가 비워지고 latest 인 1274 가 처리되고, 256 ~ 1273 은 dropped 됩니다.
1274 + 254 = 1528 전에 끝나므로 2차 onBackpressurelatest 가 발생하기 전에 종료됩니다.
vsDrop 전략과 비교하면,
drop 전략 latest 전략 버리는 우선순위 전략
버퍼 밖에서 대기 중인 건 중 가장 먼저 emit 된 데이터 부터 버림버퍼를 채우는 우선순위 전략
버퍼 밖 대기 건 중 가장 최근에 emit 된 데이터 부터 버퍼에 채움
latest 채우기 위해, 앞에 대기 중인 데이터 다 버림Buffer 전략
버퍼 : 입출력을 수행하는 장치들간의 속도 차이를 조절하기 위해 입출력 장치 중간에 위치햇 데이터를 어느정도 쌓아 두었다가 전송하는 것
backpressure Buffer 전략도 이와 비슷
버퍼가 가득차면
- 버퍼 내의 데이터를 폐기하는 전략
- 버퍼가 가득차면 에러를 발생시키는 전략
둘 다 지원합니다
그 중 버퍼 내의 데이터를 폐기하는 전략을 살펴봅니다
BUFFER DROP_LATEST 전략
step 2 : 버퍼가 1~10으로 가득 찼습니다.
step 3 : 11 이 버퍼안으로 들어와서 overflow 가 발생합니다.
step 4 : overflow 를 일으킨 11이 drop 됩니다
onBackpressureBuffer DROP_LATEST 예제
public static void main(String[] args) throws InterruptedException { Flux .interval(Duration.ofMillis(300L)) .doOnNext(data -> log.info("# emitted by original Flux: {}", data)) .onBackpressureBuffer(2, dropped -> log.info("** Overflow & Dropped: {} **", dropped), BufferOverflowStrategy.DROP_LATEST) .doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data)) .publishOn(Schedulers.parallel(), false, 1) .subscribe(data -> { try { Thread.sleep(1000L); } catch (InterruptedException e) {} log.info("# onNext: {}", data); }, error -> log.error("# onError", error)); Thread.sleep(2500L); } // 결과 > Task :Example8_5.main() 14:42:03.221 [main] DEBUG- Using Slf4j logging framework 14:42:03.543 [parallel-2] INFO - # emitted by original Flux: 0 14:42:03.547 [parallel-2] INFO - [ # emitted by Buffer: 0 ] 14:42:03.843 [parallel-2] INFO - # emitted by original Flux: 1 14:42:04.142 [parallel-2] INFO - # emitted by original Flux: 2 14:42:04.443 [parallel-2] INFO - # emitted by original Flux: 3 14:42:04.444 [parallel-2] INFO - ** Overflow & Dropped: 3 ** 14:42:04.548 [parallel-1] INFO - # onNext: 0 14:42:04.549 [parallel-1] INFO - [ # emitted by Buffer: 1 ] 14:42:04.746 [parallel-2] INFO - # emitted by original Flux: 4 14:42:05.044 [parallel-2] INFO - # emitted by original Flux: 5 14:42:05.044 [parallel-2] INFO - ** Overflow & Dropped: 5 ** 14:42:05.344 [parallel-2] INFO - # emitted by original Flux: 6 14:42:05.344 [parallel-2] INFO - ** Overflow & Dropped: 6 ** 14:42:05.553 [parallel-1] INFO - # onNext: 1 14:42:05.553 [parallel-1] INFO - [ # emitted by Buffer: 2 ] 14:42:05.644 [parallel-2] INFO - # emitted by original Flux: 7
01 .interval()로 0.3초 간격으로 데이터를 emit
02 .doOnNext(data -> log.info("# emitted by original Flux: {}", data)) 로 원본Flux 즉 interval() operator 에서 생성된 원본 데이터가 emit 되는 과정 확인 가능
03 .onBackpressureBuffer(maxsize = 2, DROP_LATEST ) 전략 설정하고,
overflow 발생 시 dropped -> log.info("** Overflow & Dropped: {} **", dropped), 출력
04 .doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data)) 통해 Buffer에서 downstream 으로 emit 되는 데이터 확인
05 .publishOn(Schedulers.parallel()) 로 병렬 스레드 생성, prefectch 1로 지정
06 .subscribe 에서 Thread.sleep(1초)
07 error -> log.error("# onError", error)); 에러 발생하는지 보려고 error 출력
결과설명
내용이 복잡합니다. 아래처럼 진행할께요
01. study 진행자가 먼저 1차 설명 드립니다.
02. 여러분들이, 코드와 로그를 보며, 재현 해 봅니다.
03. 진행자가 2차로, 다시한 번 설명 드리며, 질문 받습니다.
publisher - buffer - subscriber 이렇게 3개의 상태를 각각 분리해서 생각하세요15:30:12.001 [parallel-2] INFO - # emitted by original Flux: 0
15:30:12.004 [parallel-2] INFO - [ # emitted by Buffer: 0 ]original Flux 가 0을 emit 하고, 잠시 후 buffer 에도 0 이 emit 됨, buffer 는 empty
14:42:03.843 [parallel-2] INFO - # emitted by original Flux: 1
14:42:04.142 [parallel-2] INFO - # emitted by original Flux: 2
14:42:04.443 [parallel-2] INFO - # emitted by original Flux: 3
14:42:04.444 [parallel-2] INFO - ** Overflow & Dropped: 3 **maxsize = 2 인데, 3이 들어올때 overflow 발생하고 3 버려짐, buffer 에는 [1,2 ]
14:42:04.548 [parallel-1] INFO - # onNext: 0Subscriber 가 0 처리
14:42:04.549 [parallel-1] INFO - [ # emitted by Buffer: 1 ]
buffer 에서는 1 emit 됨, buffer 에는 [2] 만 남음
14:42:04.746 [parallel-2] INFO - # emitted by original Flux: 4
14:42:05.044 [parallel-2] INFO - # emitted by original Flux: 5original Flux 에서 4,5 가 추가로 emit 됨
14:42:05.044 [parallel-2] INFO - ** Overflow & Dropped: 5 **buffer 에 [2,4 ] 가 들어있는 상태에 5가 들어와서 overflow 발생하고 버려짐
14:42:05.344 [parallel-2] INFO - # emitted by original Flux: 6
14:42:05.344 [parallel-2] INFO - ** Overflow & Dropped: 6 **buffer 에 [2,4] 가 들어있는 상태에 6이 들어와서 overflow 발생하고 버려짐
14:42:05.553 [parallel-1] INFO - # onNext: 1
Subscriber 가 1 처리
14:42:05.553 [parallel-1] INFO - [ # emitted by Buffer: 2 ]buffer 에서 2가 emit 됨, buffer 에는 [4] 남음
14:42:05.644 [parallel-2] INFO - # emitted by original Flux: 7original Flux 에서 7 emit 되고 종료, buffer 에는 [4, 7]
참고 : prefetch 는 Scheduler 가 생성하는 스레드의 비동기 경계 시점에 미리 보관할 데이터의 개수를 의미하며,
데이터의 요청 개수에 영향을 미칩니다.
BUFFER DROP_OLDEST 전략
Publisher 가 downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 안에 채워진 데이터 중에서 가장 오래된 데이터를 Drop하여 폐기한 후, 확보된 공간에 emit된 데이터를 채우는 전략입니다.
step 2 : 버퍼가 1~10으로 가득 찼습니다.
step 3 : 11 이 버퍼안으로 들어와서 overflow 가 발생합니다.
step 4 : 버퍼안에서 가장 오래된 1을 drop 하고, 11을 버퍼에 채웁니다.
onBackpressureBuffer DROP_OLDEST 예제
public static void main(String[] args) throws InterruptedException { Flux .interval(Duration.ofMillis(300L)) .doOnNext(data -> log.info("# emitted by original Flux: {}", data)) .onBackpressureBuffer(2, dropped -> log.info("** Overflow & Dropped: {} **", dropped), BufferOverflowStrategy.DROP_OLDEST) .doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data)) .publishOn(Schedulers.parallel(), false, 1) .subscribe(data -> { try { Thread.sleep(1000L); } catch (InterruptedException e) {} log.info("# onNext: {}", data); }, error -> log.error("# onError", error)); Thread.sleep(2500L); } // 결과 15:51:25.424 [parallel-2] INFO - # emitted by original Flux: 0 15:51:25.428 [parallel-2] INFO - [ # emitted by Buffer: 0 ] 15:51:25.723 [parallel-2] INFO - # emitted by original Flux: 1 15:51:26.028 [parallel-2] INFO - # emitted by original Flux: 2 15:51:26.323 [parallel-2] INFO - # emitted by original Flux: 3 15:51:26.324 [parallel-2] INFO - ** Overflow & Dropped: 1 ** 15:51:26.432 [parallel-1] INFO - # onNext: 0 15:51:26.444 [parallel-1] INFO - [ # emitted by Buffer: 2 ] 15:51:26.625 [parallel-2] INFO - # emitted by original Flux: 4 15:51:26.928 [parallel-2] INFO - # emitted by original Flux: 5 15:51:26.929 [parallel-2] INFO - ** Overflow & Dropped: 3 ** 15:51:27.226 [parallel-2] INFO - # emitted by original Flux: 6 15:51:27.227 [parallel-2] INFO - ** Overflow & Dropped: 4 ** 15:51:27.467 [parallel-1] INFO - # onNext: 2 15:51:27.468 [parallel-1] INFO - [ # emitted by Buffer: 5 ] 15:51:27.528 [parallel-2] INFO - # emitted by original Flux: 7
01 .interval()로 0.3초 간격으로 데이터를 emit
02 .doOnNext(data -> log.info("# emitted by original Flux: {}", data)) 로 원본Flux 즉 interval() operator 에서 생성된 원본 데이터가 emit 되는 과정 확인 가능
03 .onBackpressureBuffer(maxsize = 2, DROP_OLDEST ) 전략 설정하고,
overflow 발생 시 dropped -> log.info("** Overflow & Dropped: {} **", dropped), 출력
04 .doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data)) 통해 Buffer에서 downstream 으로 emit 되는 데이터 확인
05 .publishOn(Schedulers.parallel()) 로 병렬 스레드 생성, prefectch 1로 지정
06 .subscribe 에서 Thread.sleep(1초)
07 error -> log.error("# onError", error)); 에러 발생하는지 보려고 error 출력
결과
15:51:25.424 [parallel-2] INFO - # emitted by original Flux: 0
15:51:25.428 [parallel-2] INFO - [ # emitted by Buffer: 0 ]original Flux 가 0을 emit 하고, 잠시 후 buffer 에도 0 이 emit 됨, buffer 는 empty
15:51:25.723 [parallel-2] INFO - # emitted by original Flux: 1
15:51:26.028 [parallel-2] INFO - # emitted by original Flux: 2
15:51:26.323 [parallel-2] INFO - # emitted by original Flux: 3
15:51:26.324 [parallel-2] INFO - ** Overflow & Dropped: 1 **maxsize = 2 인데, 3이 들어올때 overflow 발생하고 오래된 1 버려짐, buffer 에는 [2,3 ]
15:51:26.432 [parallel-1] INFO - # onNext: 0Subscriber 가 0 처리
15:51:26.444 [parallel-1] INFO - [ # emitted by Buffer: 2 ]buffer 에서 2가 emit 됨, buffer 에는 [3] 남음
15:51:26.625 [parallel-2] INFO - # emitted by original Flux: 4
15:51:26.928 [parallel-2] INFO - # emitted by original Flux: 5original Flux 에서 4,5 가 추가로 emit 됨
15:51:26.929 [parallel-2] INFO - ** Overflow & Dropped: 3 **buffer 에 [3] 인 상태에서, 4, 5가 들어오고 overflow 발생, 오래된 3이 버려짐, buffer 에 [4,5] 남음
15:51:27.226 [parallel-2] INFO - # emitted by original Flux: 615:51:27.227 [parallel-2] INFO - ** Overflow & Dropped: 4 **
buffer 에 [4,5] 남은 상태에서 6이 들어와 overflow 발생, 오래된 4가 버려짐, buffer 에 [5,6 ] 남음
15:51:27.467 [parallel-1] INFO - # onNext: 2Subscriber 가 2 처리
15:51:27.468 [parallel-1] INFO - [ # emitted by Buffer: 5 ]buffer 에서 5 emit 됨, buffer 에 [6] 남음
15:51:27.528 [parallel-2] INFO - # emitted by original Flux: 7original flux 에서 7 emit 됨, buffer 에 [6, 7] 남음
기억하세요
- Backpressure 는 Publisher 가 끊임없이 emit 하는 무수히 많은 데이터를 적절하게 제어하여 데이터 처리에 있어 과부하가 걸리지 않도록 제어하는 데이터 처리 방식이다.
- Reactor 에서 지원하는 Backpressure 처리 방식에는 데이터 요청 개수를 제어하는 방식, Backpressure 전략(버리는 전략)을 사용하는 방식등이 있다.
- Backpressure ignore 전략은 backpressure 를 적용하지 않는 전략이다.
- backpressure error 전략은 downstream 의 데이터 처리 속도가 느려서 upstream 의 emit 속도를 따라가지 못할 경우 에러를 발생시키는 전략이다.
- Backpressure drop 전략은 Publisher 가 downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기 중인 데이터 중에서, 먼저 emit 된 데이터부터 drop하는 전략이다.
- Backpressure latest 전략은 publisher 가 downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기 중인 데이터 중에서, 가장 최근에(나중에) emit된 데이터부터 버퍼에 채우는 전략이다.
- Backpressure buffer 전략은 버퍼의 데이터를 폐기하지 않고, 버퍼링을 하는 전략. 버퍼가 가득 차면 버퍼 내의 데이터를 폐기하는 전략, 버퍼가 가득 차면 에러를 발생시키는 전략 등으로 구분할 수 있다.
- Backpressure Buffer drop_latest 전략은 Publisher 가 downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우, 가장 최근에(나중에) 버퍼 안에 채워진 데이터를 drop하는 전략이다.
- Backpressure Buffer drop_oldest 전략은 Publisher가 downstream 으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 안에 채워진 데이터 중에서 가장 오래된 데이터를 drop 하는 전략이다.
참고
https://devfunny.tistory.com/914
스프링으로 시작하는 리액티브 스트림즈 책 스터디 8장 backpressure
'Spring > Webflux' 카테고리의 다른 글
10장 Scheduler (0) 2023.05.07 9장 Sinks (0) 2023.05.01 7장 Cold Sequence 와 Hot Sequence (0) 2023.04.23 6장 마블 다이어그램 (1) 2023.04.22 5장 Reactor 개요 (0) 2023.04.22