ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 6장 마블 다이어그램
    Spring/Webflux 2023. 4. 22. 17:10

    Marbel Diagram

    사전학습

    https://brunch.co.kr/@lonnie/20

     

    RxJava, 마블 다이어그램

    마블 다이어그램, Rx를 공부하면 필연적으로 마주하게 되는 도표다. 리액티브 세상에서 일어나는 모든 일들(비동기 데이터 흐름)은 마블 다이어그램으로 시각화된다. 따라서 이 도표를 익히는

    brunch.co.kr

    6.1 마블 다이어그램(Marble Diagram)이란?

     

    6-1 마블 다이어그램의 구성

    2개의 Timeline + 1 operator

    • 1~4  : Publisher 가 emit 하는 타임라인
    • 5 : operator
    • 6 ~ 9 : Downstream 에 가공된 데이터

     

     

     


    6.2 마블 다이어그램으로 Reactor의 Publisher 이해하기

    Mono

    Mono 는 위 그림처럼 하나의 데이터를 emit 합니다.

    public static void main(String[] args) {
        Mono.just("Hello Reactor")
                .subscribe(System.out::println);
    }

     

    public static void main(String[] args) {
        Mono
            .empty()
            .subscribe(
                    none -> System.out.println("# emitted onNext signal"),
                    error -> {},
                    () -> System.out.println("# emitted onComplete signal")
            );
    }
    
    
    실행결과
    13:05:46.132 [main] DEBUG- Using Slf4j logging framework
    # emitted onComplete signal

    empty() : 데이터를 emit 하지 않고 onComplete Signal 을 전송

     

    onNext Signal 

    none -> System.out.println("# emitted onNext signal")

     

    onError Signal
    error -> {},

     

    onComplete Signal
    () -> System.out.println("# emitted onComplete signal")

     

    시간 API 호출 예제

    https://worldtimeapi.org//api/timezone/Asia/Seoul

    public static void main(String[] args) {
        URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
                .host("worldtimeapi.org")
                .port(80)
                .path("/api/timezone/Asia/Seoul")
                .build()
                .encode()
                .toUri();
    
        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
    
    
        Mono.just(
                    restTemplate
                            .exchange(worldTimeUri,
                                    HttpMethod.GET,
                                    new HttpEntity<String>(headers),
                                    String.class)
                )
                .map(response -> {
                    DocumentContext jsonContext = JsonPath.parse(response.getBody());
                    String dateTime = jsonContext.read("$.datetime");
                    return dateTime;
                })
                .subscribe(
                        data -> System.out.println("# emitted data: " + data),
                        error -> {
                            System.out.println(error);
                        },
                        () -> System.out.println("# emitted onComplete signal")
                );
    }
    
    // 실행결과
    14:03:26.654 [main] DEBUG- HTTP GET http://worldtimeapi.org:80/api/timezone/Asia/Seoul
    14:03:26.666 [main] DEBUG- Accept=[text/plain, application/json, application/*+json, */*]
    14:03:27.141 [main] DEBUG- Response 200 OK
    14:03:27.143 [main] DEBUG- Reading to [java.lang.String] as "application/json;charset=utf-8"
    14:03:27.228 [main] DEBUG- Evaluating path: $['datetime']
    # emitted data: 2023-04-23T14:03:27.048628+09:00
    # emitted onComplete signal

     

    Flux

    Flux 는 복수 N 개의 데이터 emit

     

    public static void main(String[] args) {
        Flux.just(6, 9, 13)
                .map(num -> num % 2)
                .subscribe(System.out::println);
    }
    
    // 결과
    0
    1
    1

    6, 9, 13   3개의 데이터를 emit 하고

    각각 %2 가공 하여

    결과 출력

     

    public static void main(String[] args) {
        Flux.fromArray(new Integer[]{3, 6, 7, 9})
                .filter(num -> num > 6)
                .map(num -> num * 2)
                .subscribe(System.out::println);
    }
    
    //결과
    14
    18

     

     

    Mono + Mono -> Flux

    concatwith 는 데이터를 연결하는 것이 아니라 두개의 데이터 소스를 연결해서 하나의 데이터 소스를 만든다

    2개의 데이터 소스만 연결 가능

    public static void main(String[] args) {
        Flux<String> flux =
                Mono.justOrEmpty("Steve")
                        .concatWith(Mono.justOrEmpty("Jobs"));
        flux.subscribe(System.out::println);
    }
    
    // 결과
    Steve
    Jobs

     

     

    concat()

    여러 개의 데이터 소스를 원하는 만큼 연결 할 수 있다

    태양계 행성을 하나의 List 에 포함시켜 Subscriber 에 전달하는 예제

    public static void main(String[] args) {
        // quiz 1
        Flux.concat(
                        Flux.just("Mercury", "Venus", "Earth"),
                        Flux.just("Mars", "Jupiter", "Saturn"),
                        Flux.just("Uranus", "Neptune", "Pluto"))
                .collectList()
                .subscribe(planets -> System.out.println(planets));
    }
    
    // 결과
    > Task :Example6_7.main()
    16:10:18.441 [main] DEBUG- Using Slf4j logging framework
    [Mercury, Venus, Earth, Mars, Jupiter, Saturn, Uranus, Neptune, Pluto]

     

    Quiz 1

    Flux.concat() operator 에서 리턴하는 Publisher 는 Mono 일까요? Flux 일까요?

     

    Quiz 2

    .collectList() operator 에서 리턴하는 Publisher 는 Mono 일까요? Flux 일까요?

     

    Quiz 3

    .subscriber 에서 최종 출력되는 데이터의 형태는 무엇일까요?

     

    Answer 1 : Flux

    Answer 2 : 리턴 타입이 List 이므로 Mono

    Answer 3 : List

     

     

     

     

    'Spring > Webflux' 카테고리의 다른 글

    8장 backpressure  (0) 2023.04.23
    7장 Cold Sequence 와 Hot Sequence  (0) 2023.04.23
    5장 Reactor 개요  (0) 2023.04.22
    4장 리액티브 프로그래밍을 위한 사전 지식  (0) 2023.04.22
    3장 Blocking I/O 와 Non-Blocking I/O  (0) 2023.04.21

    댓글

Designed by Tistory.