-
7장 Cold Sequence 와 Hot SequenceSpring/Webflux 2023. 4. 23. 16:13
7.1 Cold와 Hot의 의미
Hot swap : 컴퓨터 전원이 켜져 있는 상태에서 디스크등의 장치를 교체할 경우
Hot deploy : 서버를 재시작하지 않고서 응용 프로그램의 변경 사항을 적용할 수 있는 기능
Hot wallet : 인터넷에 연결되어 즉시 사용 가능하지만, 보안에 취약
-> Hot 무언가 처음부터 다시 시작하지 않고, 같은 작업이 반복되지 않는 느낌
Cold wallet : 인터넷에 단절되어 사용성은 떨어지지만 보안이 강화
-> Cold 처음부터 새로 시작해야 하고, 새로 시작하기 때문에 같은 작업이 반복Cold 는 무언가를 새로 시작하고, Hot 은 무언가를 새로 시작하지 않는다
7.2 Cold Sequence
Cold Sequece : Sequece 가 새로 시작한다 정도로 볼 수 있다
Subscriber 가 구독할 때마다 데이터 흐름이 처음부터 다시 시작되는 Sequece
Subscriber A 와 Subscriber B의 구독시점이 다름니다.
구독 할 때마다 Publisher 가 데이터를 emit 하는 과정을 처음부터 다시 시작하는 데이터 흐름을 Cold Sequence, Cold Publisher 라고 합니다.
타임라인이 구독을 할 때마다 하나씩 더 생깁니다.
예) 구독자가 5월부터 잡지를 구독했는데, 1월달 잡지부터 모두 보내주는 경우에 비유할 수 있습니다.
public static void main(String[] args) throws InterruptedException { Flux<String> coldFlux = Flux .fromIterable(Arrays.asList("KOREA", "JAPAN", "CHINESE")) .map(String::toLowerCase); coldFlux.subscribe(country -> log.info("# Subscriber1: {}", country)); System.out.println("----------------------------------------------------------------------"); Thread.sleep(2000L); coldFlux.subscribe(country -> log.info("# Subscriber2: {}", country)); } // 결과 > Task :Example7_1.main() 16:31:26.898 [main] DEBUG- Using Slf4j logging framework 16:31:26.905 [main] INFO - # Subscriber1: korea 16:31:26.907 [main] INFO - # Subscriber1: japan 16:31:26.907 [main] INFO - # Subscriber1: chinese ---------------------------------------------------------------------- 16:31:28.908 [main] INFO - # Subscriber2: korea 16:31:28.908 [main] INFO - # Subscriber2: japan 16:31:28.909 [main] INFO - # Subscriber2: chinese
fromIterable() operator 를 사용하여 List 로 전달받은 데이터 소스를 emit 하는 예제
7.3 Hot Sequence
Hot Sequece의 경우 Subscriber가 구독이 발생한 시점 이후에 emit 된 데이터만 전달받을 수 있다.
Subscriber A 는 1,3,5,7 모두 받음
Subscriber B 는 5,7 받음
Subscriber C는 7 받음
뮤직 콘서트에 입장하는 관객의 콘서트 관람 상황을 시뮬레이션
public static void main(String[] args) throws InterruptedException { String[] singers = {"Singer A", "Singer B", "Singer C", "Singer D", "Singer E"}; log.info("# Begin concert:"); Flux<String> concertFlux = Flux .fromArray(singers) .delayElements(Duration.ofSeconds(1)) .share(); concertFlux.subscribe( singer -> log.info("# Subscriber1 is watching {}'s song", singer) ); Thread.sleep(2500); concertFlux.subscribe( singer -> log.info("# Subscriber2 is watching {}'s song", singer) ); Thread.sleep(3000); } // result > Task :Example7_2.main() 16:37:57.396 [main] INFO - # Begin concert: 16:37:57.480 [main] DEBUG- Using Slf4j logging framework 16:37:58.542 [parallel-1] INFO - # Subscriber1 is watching Singer A's song 16:37:59.551 [parallel-2] INFO - # Subscriber1 is watching Singer B's song 16:38:00.556 [parallel-3] INFO - # Subscriber1 is watching Singer C's song 16:38:00.556 [parallel-3] INFO - # Subscriber2 is watching Singer C's song 16:38:01.556 [parallel-4] INFO - # Subscriber1 is watching Singer D's song 16:38:01.557 [parallel-4] INFO - # Subscriber2 is watching Singer D's song 16:38:02.557 [parallel-5] INFO - # Subscriber1 is watching Singer E's song 16:38:02.558 [parallel-5] INFO - # Subscriber2 is watching Singer E's song Subscriber1 Singer A, B, C, D, E Subscriber2 Singer C, D, E
delayElements()
share()
public final Flux<T> share()
Returns a new Flux that multicasts (shares) the original Flux. As long as there is at least one Subscriber this Flux will be subscribed and emitting data. When all subscribers have cancelled it will cancel the source Flux.This is an alias for publish().ConnectableFlux.refCount().
Returns:a Flux that upon first subscribe causes the source Flux to subscribe once, late subscribers might therefore miss items.
https://kau.sh/blog/a-note-about-the-warmth-share-operator/
원본 Flux를 멀티캐스트(동시에 여러 사용자에게 보냄)하는 새로운 Flux를 리턴한다
01. fromArray() 도 Flux 리턴
02. delayElements() 도 Flux 리턴
03. share() 역시도 Flux를 리턴
멀티캐스트가 시작되면, Subscriber가 구독하는 시점에 원본 Flux 에서 이미 emit 된 데이터를 전달받을 수 없게된다.
구독 이후에 emit 된 데이터만 받게 됩니다.
Subscriber1 Singer A, B, C, D, E
Subscriber2 Singer C, D, EdelayElements() Operator 의 디폴트 스레드 스케줄러가 parallel 이기 때문?
7.4 HTTP 요청과 응답에서 Cold Sequence와 Hot Sequence의 동작 흐름
2초 sleep 후 시간조회 예제 Mono 사용
http://worldtimeapi.org:80/api/timezone/Asia/Seoul
public static void main(String[] args) throws InterruptedException { URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http") .host("worldtimeapi.org") .port(80) .path("/api/timezone/Asia/Seoul") .build() .encode() .toUri(); Mono<String> mono = getWorldTime(worldTimeUri); mono.subscribe(dateTime -> log.info("# dateTime 1: {}", dateTime)); Thread.sleep(2000); mono.subscribe(dateTime -> log.info("# dateTime 2: {}", dateTime)); Thread.sleep(2000); } private static Mono<String> getWorldTime(URI worldTimeUri) { return WebClient.create() .get() .uri(worldTimeUri) .retrieve() .bodyToMono(String.class) .map(response -> { DocumentContext jsonContext = JsonPath.parse(response); String dateTime = jsonContext.read("$.datetime"); return dateTime; }); } // 결과 2초 차이 18:18:02.851 [reactor-http-nio-2] INFO - # dateTime 1: 2023-04-23T18:18:02.633322+09:00 18:18:04.165 [main] DEBUG- [7d3430a7] HTTP GET http://worldtimeapi.org:80/api/timezone/Asia/Seoul 18:18:04.368 [reactor-http-nio-2] INFO - # dateTime 2: 2023-04-23T18:18:04.221333+09:00
cache 사용
cache() operator 사용하여 Hot Sequence 로 변경
Mono<String> mono = getWorldTime(worldTimeUri).cache();
public static void main(String[] args) throws InterruptedException { URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http") .host("worldtimeapi.org") .port(80) .path("/api/timezone/Asia/Seoul") .build() .encode() .toUri(); Mono<String> mono = getWorldTime(worldTimeUri).cache(); mono.subscribe(dateTime -> log.info("# dateTime 1: {}", dateTime)); Thread.sleep(2000); mono.subscribe(dateTime -> log.info("# dateTime 2: {}", dateTime)); Thread.sleep(2000); } private static Mono<String> getWorldTime(URI worldTimeUri) { return WebClient.create() .get() .uri(worldTimeUri) .retrieve() .bodyToMono(String.class) .map(response -> { DocumentContext jsonContext = JsonPath.parse(response); String dateTime = jsonContext.read("$.datetime"); return dateTime; }); } // 결과 18:21:36.928 [reactor-http-nio-2] INFO - # dateTime 1: 2023-04-23T18:21:36.699458+09:00 18:21:38.345 [main] INFO - # dateTime 2: 2023-04-23T18:21:36.699458+09:00
cache
public final Mono<T> cache()
Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded volume of onNext signals. Completion and Error will also be replayed.Once the first subscription is made to this Mono, the source is subscribed to and the signal will be cached, indefinitely. This process cannot be cancelled.
In the face of multiple concurrent subscriptions, this operator ensures that only one subscription is made to the source.이 모노를 핫 소스로 전환하고 마지막으로 방출된 신호를 추가 가입자를 위해 캐시합니다.
완료 및 오류도 재생됩니다.
이 모노에 대한 첫 번째 가입이 이루어지면 소스가 가입되고 신호가 무기한 캐시됩니다. 이 프로세스를 취소할 수 없습니다.
여러 개의 동시 구독이 있는 경우 이 운영자는 소스에 대해 하나의 구독만 수행하도록 합니다cache 를 사용하는 경우 : 토큰 받아오기
참고 : replay
public final ConnectableFlux<T> replay()
Will retain an unbounded amount of onNext signals. Completion and Error will also be replayed.'Spring > Webflux' 카테고리의 다른 글
9장 Sinks (0) 2023.05.01 8장 backpressure (0) 2023.04.23 6장 마블 다이어그램 (1) 2023.04.22 5장 Reactor 개요 (0) 2023.04.22 4장 리액티브 프로그래밍을 위한 사전 지식 (0) 2023.04.22