ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 2장 리액티브 스트림즈
    Spring/Webflux 2023. 4. 19. 09:01

    스프링으로 시작하는 리액티브 프로그래밍

    p39 2.1 리액티브 스트림즈란

    데이터 스트림을 non-blocking 이면서 비동기적인 방식으로 처리하기 위한 리액티브 라이브러리의 표준 사양

     

    p40 2.2 리액티브 스트림즈 구성요소

    publisher : 데이터 생성 통지(발생,게시,방출) 역할
    subscriber : 데이터를 받아서 처리하는 역할 
    subscription : publisher 에 요청할 데이터의 개수를 지정하고 구독을 취소하는 역할을 한다
    processor : publisher 와 subscriber 의 기능을 모두 가지고 있다. 즉, subscriber 로서 다른 publisher 를 구독할 수 있고, publishe 로서 다른 subscriber 가 구독할 수 있다.

     

     

    p40

    publisher / subscriber / subscription 

    출처 : https://jade314.tistory.com/entry/%EB%A6%AC%EC%97%91%ED%8B%B0%EB%B8%8CReactive-%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%98%EB%B0%8D-%EA%B8%B0%EB%B3%B8-%EA%B5%AC%EC%84%B1-%EC%9A%94%EC%86%8C

     

    [리액티브 스트림즈(Reactive Streams)] Publisher, Subscriber, Subscription, Processor 4개 인터페이스 소개

    리액티브 스트림즈(Reactive Streams)란? 리액티브 프로그래밍 라이브러리의 표준 사양이다. (github.com/reactive-streams/reactive-streams-jvm/ ) RxJava는 이 Reactive Streams의 인터페이스들을 구현한 구현체임. Reacti

    jade314.tistory.com

     

    p42 왜 subscription.request ?

    01. 각각 다른 스레드에서 비동기적으로 상호 작용
    02. pub 의 통지 속도가 sub 처리 속도보다 더 빠르면, pub에 처리를 기다리는 데이터는 쌓이게 됨 -> 시스템 부하
    - pub이 기다리는 시간 최소화는 위해 subscription.request 개수를 제어

     

    p42 코드로 보는 리액티브 스트림즈 컴포넌트

    publisher

    interface Publisher<T>{
        // 파라미터로 전달받은 subscriber 를 등록하는 역할
        public void subscribe(Subscriber<? super T> s);
    }

    publisher가 subscribe 메서드의 파라미터인 Subscriber를 등록하는 형태로 구독이 이루어 집니다.


    <? super T>
    - 상속관계에 존재하는 클래스만 자료형으로 받고 싶은 경우
    - T : 자식클래스로 고정으로 지정해주고 자식클래스와 연관이 있는 부모클래스는 전부 적용이 된다(매개변수로 허용)

    <? extends T>
    - 상속관계로 이루어진 클래스만 자료형으로 받는다는 의미
    - ? : (자식 클래스) / T : (부모 클래스)로 부모 클래스와 자식 클래스의 임의의 자료형만 적용된다는 의미

     

    p44 Subscriber

    public interface Subscriber<T>{
        public void onSubscribe(Subscription s);
        public void onNext(T t);
        public void onError(Throwable t);
        public void onComplete();
    }

     

    onSubscribed (d: Disposable) : 구독 시작 시점에 어떤 처리를 하는 역할. pub에게 요청할 데이터의 개수를 지정하거나 구독을 해지 

    onSubscribe 메서드의 파라미터로 전달되는 Subscription 객체를 통해서 이루어 집니다
    onNext (item T) : pub 이 통지한 데이터를 처리하는 역할 / 값을 전달할 때 호출하여 값을 넘겨줌
    onError (e: Throwable) : 에러가 발생하면 호출 함
    onComplete() : 가지고 있는 값을 모두 전달하면 호출 함. 정상완료 시 후처리 코드 

     

    Subscription

    public interface Subscription{
        public void request(long n);
        public void cancel();
    }

    req 갯수 또는 구독 취소

     

    p46 2.4 Reactive 용어

    Signal

    신호, Publisher 와 Subscriber 간에 주고받는 상호작용
    - onSubscribe, onNext, onComplete, onError, request, cancel 
    Subscriber 에 정의되지만, 메서드를 실제 호출하는 주체는 Publisher

    - Publisher 이 Subscriber 에 보낸는 Singal 

    Demand 
    수요 요구, Sub 이 Pub 에게 요청하는 데이터, pub 이 sub 에 전달하지 않은 요청한 데이터

    Emit
    pub -> sub 에 전달하는 것을 통지 (발생, 게시, 방출)
    pub 이 emit 하는 singal 중에서 데이터를 전달하기 위한 onNext Signal 을 줄여서 '데이터를 emit 한다'

    Upstream  위쪽 흐름
    Downstream 아래쪽 흐름

    public static void main(String[] args) {
        Flux
                .just(1, 2, 3, 4, 5, 6)
                .filter(n -> n % 2 == 0)
                .map(n -> n * 2)
                .subscribe(System.out::println);
    }

    메서드 체인

    .just()

    .filter()

    .map() 

    반환값은 모두 Flux 타입이라 메서드 체인 가능

    just(1,2,3,4,5,6) 은 pub 역할

     

    just updatestream

    just / filter 만 보면 filter 가 downstream

     

    Sequence

    pub 이 emit 하는 데이터의 연속적인 흐름을 정의해 놓은 것

    Sequence 는 Operator chain 형태

     

    Operator

    연산자  jsut() / filter() / map()

     

    Source

    Data Source, Source Publisher, Source Flux  -> 대부분 최초 또는 original 의 의미로 사용

     

     

    p52 리액티브 스트림즈의 구현 규칙

    pub 주요 규칙

    규칙
    01. pub 이 sub 에게 보내는 onNext signal 의 총 개수는 항상 sub 의 구독을 통해 요청된 데이터의 총 개수보다 더 작거나 같아야 한다
    02. pub 은 요청된 것보다 적은 수의 onNext signal 을 보내고 onComplte 또는 onError 를 호출하여 구독을 종료할 수 있다.
    (무한 스트림 예외)
    03. pub 의 데이터 처리가 실패하면 onError signal 을 보내야 한다
    04. pub 의 데이터 처리가 성공적으로 종료되면 onComplete signal 을 보내야 한다
    05. pub 이 sub 에게 onError 또는 onCompete signal 을 보내는 경우 해당 sub 의 구독은 취소된 것으로 간주되어야 한다
    06. 일단 종료 상태 signal 을 받으면 (onError, onComplete) 더 이상 signal 이 발생되지 않아야 한다
    07. 구독이 취소되면 sub 은 결국 signal 을 받는 것을 중지해야 한다

    sub 주요 규칙

    규칙
    01. Subscriber 은 pub 으로부터 onNext signal 을 수신하기 위해 Subscription.request(n)을 통해 Demand signal 을 pub 에게 보내야 한다
    02. Subscriber.onComplete() 및 Subscriber.onError(Throwable t)는 Subscription 또는 Publisher 의 메서드를 호출해서는 안된다

    왜? 순환 및 경쟁 조건(Race Condition) 을 방지
    03. Subscriber.onComplete() 및 Subscriber.onError(throwable t)는 Signal을 수신한 후 구독이 취소된 것으로 간주해야 한다
    04. 구독이 더 이상 필요하지 않은 경우 Subscriber는 Subscription.cancel() 을 호출해야 한다
    05. Subscriber.onSubscribe() 는 지정된 Subscriber 에 대해 최대 한 번만 호출되어야 한다

     

    Subscription 구현을 위한 주요 기본 규칙

    규칙
    01. 구독은 Subscriber 가 onNext 또는 onSubscribe 내에서 동기적으로 Subscription.request 를 호출하도록 허용해야 한다
    02. 구독이 취소된 후 추가적으로 호출되는 Subscription.request(n) 은 효력이 없어야 한다
    03. 구독이 취소된 후 추가적으로 호출되는 Subscription.cancel() 은 효력이 없어야 한다
    04. 구독이 취소되지 않은 동안 Subscription.request(n)의 매개변수가 0보다 작거나 같으면 illegalArgumentException과 함께 onError signal 을 보내야 한다
    05. 구독이 취소되지 않은 동안 Subscription.cancel() 은 publisher가 subscriber 에게 보내는 signal 을 결국 중지하도록 요청해야 한다
    06. 구독이 취소되지 않은 동안 Subscription.cancel() 은 Publisher 에게 해당 구독자에 대한 참조를 결국 삭제하도록 요청해야 한다
    07. Subscription.cancel(), Subscription.request() 호출에 대한 응답으로 예외를 던지는 것을 허용하지 않는다
    ? 리액티브 스트림즈에서는 예외를 onError signal로 보내야 한다
    08. 구독은 무제한 수의 request 호출을 지원해야 하고 최대 2^63-1개의 Demand 를 지원해야 한다

     

    p57 2.6 리액티브 스트림즈 구현체

    RxJava

    Reactive Extentions : .net 환경의 리액티브 확장 라이브러리를 넷플릭스에서 Java 언어로 포팅

     

    Project Reactor

    Spring Framework 팀에 의해 주도적으로 개발된 리액티브 스트림즈 구현체

    Spring Framework 5 부터 Reactor 3.x

    WebFlux 용어처럼 Flux 라는 Reactor 의 핵심 구성요소

     

    Akka Streams

    Actor 로 시작해서 Actor 로 끝난다

     

    Java Flow API

    SPI (service Provider Interface) 로서 Java API 에 정의되어 있음

     

     

     

     

     

     

     

     

    'Spring > Webflux' 카테고리의 다른 글

    6장 마블 다이어그램  (1) 2023.04.22
    5장 Reactor 개요  (0) 2023.04.22
    4장 리액티브 프로그래밍을 위한 사전 지식  (0) 2023.04.22
    3장 Blocking I/O 와 Non-Blocking I/O  (0) 2023.04.21
    Spring Webflux 공부하기 1  (0) 2023.04.18

    댓글

Designed by Tistory.