ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 7장 Cold Sequence 와 Hot Sequence
    Spring/Webflux 2023. 4. 23. 16:13

     

    7.1 Cold와 Hot의 의미

    Hot swap : 컴퓨터 전원이 켜져 있는 상태에서 디스크등의 장치를 교체할 경우
    Hot deploy : 서버를 재시작하지 않고서 응용 프로그램의 변경 사항을 적용할 수 있는 기능
    Hot wallet : 인터넷에 연결되어 즉시 사용 가능하지만, 보안에 취약
    -> Hot 무언가 처음부터 다시 시작하지 않고, 같은 작업이 반복되지 않는 느낌

    Cold wallet : 인터넷에 단절되어 사용성은 떨어지지만 보안이 강화
    -> Cold 처음부터 새로 시작해야 하고, 새로 시작하기 때문에 같은 작업이 반복

     

    Cold 는 무언가를 새로 시작하고, Hot 은 무언가를 새로 시작하지 않는다

     

    7.2 Cold Sequence

     

    Cold Sequece : Sequece 가 새로 시작한다 정도로 볼 수 있다

    Subscriber 가 구독할 때마다 데이터 흐름이 처음부터 다시 시작되는 Sequece

    Subscriber A 와 Subscriber B의 구독시점이 다름니다.

    구독 할 때마다 Publisher 가 데이터를 emit 하는 과정을 처음부터 다시 시작하는 데이터 흐름을 Cold Sequence, Cold Publisher 라고 합니다.

    타임라인이 구독을 할 때마다 하나씩 더 생깁니다.

    예) 구독자가 5월부터 잡지를 구독했는데, 1월달 잡지부터 모두 보내주는 경우에 비유할 수 있습니다.

     

    public static void main(String[] args) throws InterruptedException {
    
        Flux<String> coldFlux =
                Flux
                    .fromIterable(Arrays.asList("KOREA", "JAPAN", "CHINESE"))
                    .map(String::toLowerCase);
    
        coldFlux.subscribe(country -> log.info("# Subscriber1: {}", country));
        System.out.println("----------------------------------------------------------------------");
        Thread.sleep(2000L);
        coldFlux.subscribe(country -> log.info("# Subscriber2: {}", country));
    }
    
    // 결과
    > Task :Example7_1.main()
    16:31:26.898 [main] DEBUG- Using Slf4j logging framework
    16:31:26.905 [main] INFO - # Subscriber1: korea
    16:31:26.907 [main] INFO - # Subscriber1: japan
    16:31:26.907 [main] INFO - # Subscriber1: chinese
    ----------------------------------------------------------------------
    16:31:28.908 [main] INFO - # Subscriber2: korea
    16:31:28.908 [main] INFO - # Subscriber2: japan
    16:31:28.909 [main] INFO - # Subscriber2: chinese

    fromIterable() operator 를 사용하여 List 로 전달받은 데이터 소스를 emit 하는 예제

     

    7.3 Hot Sequence

     

    Hot Sequece의 경우 Subscriber가 구독이 발생한 시점 이후에 emit 된 데이터만 전달받을 수 있다.

    Subscriber A 는 1,3,5,7 모두 받음

    Subscriber B 는 5,7 받음

    Subscriber C는 7 받음

     

    뮤직 콘서트에 입장하는 관객의 콘서트 관람 상황을 시뮬레이션

    public static void main(String[] args) throws InterruptedException {
        String[] singers = {"Singer A", "Singer B", "Singer C", "Singer D", "Singer E"};
    
        log.info("# Begin concert:");
        Flux<String> concertFlux =
                Flux
                    .fromArray(singers)
                    .delayElements(Duration.ofSeconds(1))
                    .share();
    
        concertFlux.subscribe(
                singer -> log.info("# Subscriber1 is watching {}'s song", singer)
        );
    
        Thread.sleep(2500);
    
        concertFlux.subscribe(
                singer -> log.info("# Subscriber2 is watching {}'s song", singer)
        );
    
        Thread.sleep(3000);
    }
    
    // result
    > Task :Example7_2.main()
    16:37:57.396 [main] INFO - # Begin concert:
    16:37:57.480 [main] DEBUG- Using Slf4j logging framework
    16:37:58.542 [parallel-1] INFO - # Subscriber1 is watching Singer A's song
    16:37:59.551 [parallel-2] INFO - # Subscriber1 is watching Singer B's song
    16:38:00.556 [parallel-3] INFO - # Subscriber1 is watching Singer C's song
    16:38:00.556 [parallel-3] INFO - # Subscriber2 is watching Singer C's song
    16:38:01.556 [parallel-4] INFO - # Subscriber1 is watching Singer D's song
    16:38:01.557 [parallel-4] INFO - # Subscriber2 is watching Singer D's song
    16:38:02.557 [parallel-5] INFO - # Subscriber1 is watching Singer E's song
    16:38:02.558 [parallel-5] INFO - # Subscriber2 is watching Singer E's song
    
    Subscriber1 Singer A, B, C, D, E
    Subscriber2 Singer C, D, E

     

    delayElements()

    https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#subscribe-java.util.function.Consumer-

     

    share()

    public final Flux<T> share()
    Returns a new Flux that multicasts (shares) the original Flux. As long as there is at least one Subscriber this Flux will be subscribed and emitting data. When all subscribers have cancelled it will cancel the source Flux.

    This is an alias for publish().ConnectableFlux.refCount().

    Returns:a Flux that upon first subscribe causes the source Flux to subscribe once, late subscribers might therefore miss items.

     

     

    https://kau.sh/blog/a-note-about-the-warmth-share-operator/

     

     

    원본 Flux를 멀티캐스트(동시에 여러 사용자에게 보냄)하는 새로운 Flux를 리턴한다

    01. fromArray() 도 Flux 리턴

    02. delayElements() 도 Flux 리턴

    03. share() 역시도 Flux를 리턴

     

    멀티캐스트가 시작되면, Subscriber가 구독하는 시점에 원본 Flux 에서 이미 emit 된 데이터를 전달받을 수 없게된다.

    구독 이후에 emit 된 데이터만 받게 됩니다.

     

    Subscriber1 Singer A, B, C, D, E
    Subscriber2 Singer C, D, E

     

    delayElements() Operator 의 디폴트 스레드 스케줄러가 parallel 이기 때문?

     

     

     

    7.4 HTTP 요청과 응답에서 Cold Sequence와 Hot Sequence의 동작 흐름

     

    2초 sleep 후 시간조회 예제 Mono  사용

    http://worldtimeapi.org:80/api/timezone/Asia/Seoul

    public static void main(String[] args) throws InterruptedException {
        URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
                .host("worldtimeapi.org")
                .port(80)
                .path("/api/timezone/Asia/Seoul")
                .build()
                .encode()
                .toUri();
    
        Mono<String> mono = getWorldTime(worldTimeUri);
        mono.subscribe(dateTime -> log.info("# dateTime 1: {}", dateTime));
        Thread.sleep(2000);
        mono.subscribe(dateTime -> log.info("# dateTime 2: {}", dateTime));
    
        Thread.sleep(2000);
    }
    
    private static Mono<String> getWorldTime(URI worldTimeUri) {
        return WebClient.create()
                .get()
                .uri(worldTimeUri)
                .retrieve()
                .bodyToMono(String.class)
                .map(response -> {
                    DocumentContext jsonContext = JsonPath.parse(response);
                    String dateTime = jsonContext.read("$.datetime");
                    return dateTime;
                });
    }
    
    // 결과 2초 차이
    18:18:02.851 [reactor-http-nio-2] INFO - # dateTime 1: 2023-04-23T18:18:02.633322+09:00
    18:18:04.165 [main] DEBUG- [7d3430a7] HTTP GET http://worldtimeapi.org:80/api/timezone/Asia/Seoul
    18:18:04.368 [reactor-http-nio-2] INFO - # dateTime 2: 2023-04-23T18:18:04.221333+09:00

     

    cache 사용

    cache() operator 사용하여 Hot Sequence 로 변경

    Mono<String> mono = getWorldTime(worldTimeUri).cache();

    public static void main(String[] args) throws InterruptedException {
        URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
                .host("worldtimeapi.org")
                .port(80)
                .path("/api/timezone/Asia/Seoul")
                .build()
                .encode()
                .toUri();
    
        Mono<String> mono = getWorldTime(worldTimeUri).cache();
        mono.subscribe(dateTime -> log.info("# dateTime 1: {}", dateTime));
        Thread.sleep(2000);
        mono.subscribe(dateTime -> log.info("# dateTime 2: {}", dateTime));
    
        Thread.sleep(2000);
    }
    
    private static Mono<String> getWorldTime(URI worldTimeUri) {
        return WebClient.create()
                .get()
                .uri(worldTimeUri)
                .retrieve()
                .bodyToMono(String.class)
                .map(response -> {
                    DocumentContext jsonContext = JsonPath.parse(response);
                    String dateTime = jsonContext.read("$.datetime");
                    return dateTime;
                });
    }
    
    // 결과
    
    18:21:36.928 [reactor-http-nio-2] INFO - # dateTime 1: 2023-04-23T18:21:36.699458+09:00
    18:21:38.345 [main] INFO - # dateTime 2: 2023-04-23T18:21:36.699458+09:00

     

    cache

    public final Mono<T> cache()
    Turn this Flux into a  hot source  and cache last emitted signals for further Subscriber. Will retain an unbounded volume of onNext signals. Completion and Error will also be replayed.

    Once the first subscription is made to this Mono, the source is subscribed to and the signal will be cached, indefinitely. This process cannot be cancelled.
    In the face of multiple concurrent subscriptions, this operator ensures that only one subscription is made to the source.

     

     

    이 모노를 핫 소스로 전환하고 마지막으로 방출된 신호를 추가 가입자를 위해 캐시합니다.
    완료 및 오류도 재생됩니다.
    이 모노에 대한 첫 번째 가입이 이루어지면 소스가 가입되고 신호가 무기한 캐시됩니다. 이 프로세스를 취소할 수 없습니다.
    여러 개의 동시 구독이 있는 경우 이 운영자는 소스에 대해 하나의 구독만 수행하도록 합니다

     

    cache 를 사용하는 경우 :  토큰 받아오기

     

     

    참고 : replay

    public final ConnectableFlux<T> replay()
    Turn this Flux into a hot source and cache last emitted signals for further Subscriber.
    Will retain an unbounded amount of onNext signals. Completion and Error will also be replayed.

    'Spring > Webflux' 카테고리의 다른 글

    9장 Sinks  (0) 2023.05.01
    8장 backpressure  (0) 2023.04.23
    6장 마블 다이어그램  (1) 2023.04.22
    5장 Reactor 개요  (0) 2023.04.22
    4장 리액티브 프로그래밍을 위한 사전 지식  (0) 2023.04.22

    댓글

Designed by Tistory.