-
2장 리액티브 스트림즈Spring/Webflux 2023. 4. 19. 09:01
스프링으로 시작하는 리액티브 프로그래밍
p39 2.1 리액티브 스트림즈란
데이터 스트림을 non-blocking 이면서 비동기적인 방식으로 처리하기 위한 리액티브 라이브러리의 표준 사양
p40 2.2 리액티브 스트림즈 구성요소
publisher : 데이터 생성 통지(발생,게시,방출) 역할
subscriber : 데이터를 받아서 처리하는 역할
subscription : publisher 에 요청할 데이터의 개수를 지정하고 구독을 취소하는 역할을 한다
processor : publisher 와 subscriber 의 기능을 모두 가지고 있다. 즉, subscriber 로서 다른 publisher 를 구독할 수 있고, publishe 로서 다른 subscriber 가 구독할 수 있다.p40
publisher / subscriber / subscription
[리액티브 스트림즈(Reactive Streams)] Publisher, Subscriber, Subscription, Processor 4개 인터페이스 소개
리액티브 스트림즈(Reactive Streams)란? 리액티브 프로그래밍 라이브러리의 표준 사양이다. (github.com/reactive-streams/reactive-streams-jvm/ ) RxJava는 이 Reactive Streams의 인터페이스들을 구현한 구현체임. Reacti
jade314.tistory.com
p42 왜 subscription.request ?
01. 각각 다른 스레드에서 비동기적으로 상호 작용
02. pub 의 통지 속도가 sub 처리 속도보다 더 빠르면, pub에 처리를 기다리는 데이터는 쌓이게 됨 -> 시스템 부하
- pub이 기다리는 시간 최소화는 위해 subscription.request 개수를 제어p42 코드로 보는 리액티브 스트림즈 컴포넌트
publisher
interface Publisher<T>{ // 파라미터로 전달받은 subscriber 를 등록하는 역할 public void subscribe(Subscriber<? super T> s); }
publisher가 subscribe 메서드의 파라미터인 Subscriber를 등록하는 형태로 구독이 이루어 집니다.
<? super T>
- 상속관계에 존재하는 클래스만 자료형으로 받고 싶은 경우
- T : 자식클래스로 고정으로 지정해주고 자식클래스와 연관이 있는 부모클래스는 전부 적용이 된다(매개변수로 허용)
<? extends T>
- 상속관계로 이루어진 클래스만 자료형으로 받는다는 의미
- ? : (자식 클래스) / T : (부모 클래스)로 부모 클래스와 자식 클래스의 임의의 자료형만 적용된다는 의미p44 Subscriber
public interface Subscriber<T>{ public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
onSubscribed (d: Disposable) : 구독 시작 시점에 어떤 처리를 하는 역할. pub에게 요청할 데이터의 개수를 지정하거나 구독을 해지
onSubscribe 메서드의 파라미터로 전달되는 Subscription 객체를 통해서 이루어 집니다
onNext (item T) : pub 이 통지한 데이터를 처리하는 역할 / 값을 전달할 때 호출하여 값을 넘겨줌
onError (e: Throwable) : 에러가 발생하면 호출 함
onComplete() : 가지고 있는 값을 모두 전달하면 호출 함. 정상완료 시 후처리 코드Subscription
public interface Subscription{ public void request(long n); public void cancel(); }
req 갯수 또는 구독 취소
p46 2.4 Reactive 용어
Signal
신호, Publisher 와 Subscriber 간에 주고받는 상호작용
- onSubscribe, onNext, onComplete, onError, request, cancel
Subscriber 에 정의되지만, 메서드를 실제 호출하는 주체는 Publisher- Publisher 이 Subscriber 에 보낸는 Singal
Demand
수요 요구, Sub 이 Pub 에게 요청하는 데이터, pub 이 sub 에 전달하지 않은 요청한 데이터
Emit
pub -> sub 에 전달하는 것을 통지 (발생, 게시, 방출)
pub 이 emit 하는 singal 중에서 데이터를 전달하기 위한 onNext Signal 을 줄여서 '데이터를 emit 한다'
Upstream 위쪽 흐름
Downstream 아래쪽 흐름public static void main(String[] args) { Flux .just(1, 2, 3, 4, 5, 6) .filter(n -> n % 2 == 0) .map(n -> n * 2) .subscribe(System.out::println); }
메서드 체인
.just()
.filter()
.map()
반환값은 모두 Flux 타입이라 메서드 체인 가능
just(1,2,3,4,5,6) 은 pub 역할
just updatestream
just / filter 만 보면 filter 가 downstream
Sequence
pub 이 emit 하는 데이터의 연속적인 흐름을 정의해 놓은 것
Sequence 는 Operator chain 형태
Operator
연산자 jsut() / filter() / map()
Source
Data Source, Source Publisher, Source Flux -> 대부분 최초 또는 original 의 의미로 사용
p52 리액티브 스트림즈의 구현 규칙
pub 주요 규칙
규칙 01. pub 이 sub 에게 보내는 onNext signal 의 총 개수는 항상 sub 의 구독을 통해 요청된 데이터의 총 개수보다 더 작거나 같아야 한다 02. pub 은 요청된 것보다 적은 수의 onNext signal 을 보내고 onComplte 또는 onError 를 호출하여 구독을 종료할 수 있다.
(무한 스트림 예외)03. pub 의 데이터 처리가 실패하면 onError signal 을 보내야 한다 04. pub 의 데이터 처리가 성공적으로 종료되면 onComplete signal 을 보내야 한다 05. pub 이 sub 에게 onError 또는 onCompete signal 을 보내는 경우 해당 sub 의 구독은 취소된 것으로 간주되어야 한다 06. 일단 종료 상태 signal 을 받으면 (onError, onComplete) 더 이상 signal 이 발생되지 않아야 한다 07. 구독이 취소되면 sub 은 결국 signal 을 받는 것을 중지해야 한다 sub 주요 규칙
규칙 01. Subscriber 은 pub 으로부터 onNext signal 을 수신하기 위해 Subscription.request(n)을 통해 Demand signal 을 pub 에게 보내야 한다 02. Subscriber.onComplete() 및 Subscriber.onError(Throwable t)는 Subscription 또는 Publisher 의 메서드를 호출해서는 안된다
왜? 순환 및 경쟁 조건(Race Condition) 을 방지03. Subscriber.onComplete() 및 Subscriber.onError(throwable t)는 Signal을 수신한 후 구독이 취소된 것으로 간주해야 한다 04. 구독이 더 이상 필요하지 않은 경우 Subscriber는 Subscription.cancel() 을 호출해야 한다 05. Subscriber.onSubscribe() 는 지정된 Subscriber 에 대해 최대 한 번만 호출되어야 한다 Subscription 구현을 위한 주요 기본 규칙
규칙 01. 구독은 Subscriber 가 onNext 또는 onSubscribe 내에서 동기적으로 Subscription.request 를 호출하도록 허용해야 한다 02. 구독이 취소된 후 추가적으로 호출되는 Subscription.request(n) 은 효력이 없어야 한다 03. 구독이 취소된 후 추가적으로 호출되는 Subscription.cancel() 은 효력이 없어야 한다 04. 구독이 취소되지 않은 동안 Subscription.request(n)의 매개변수가 0보다 작거나 같으면 illegalArgumentException과 함께 onError signal 을 보내야 한다 05. 구독이 취소되지 않은 동안 Subscription.cancel() 은 publisher가 subscriber 에게 보내는 signal 을 결국 중지하도록 요청해야 한다 06. 구독이 취소되지 않은 동안 Subscription.cancel() 은 Publisher 에게 해당 구독자에 대한 참조를 결국 삭제하도록 요청해야 한다 07. Subscription.cancel(), Subscription.request() 호출에 대한 응답으로 예외를 던지는 것을 허용하지 않는다
? 리액티브 스트림즈에서는 예외를 onError signal로 보내야 한다08. 구독은 무제한 수의 request 호출을 지원해야 하고 최대 2^63-1개의 Demand 를 지원해야 한다 p57 2.6 리액티브 스트림즈 구현체
RxJava
Reactive Extentions : .net 환경의 리액티브 확장 라이브러리를 넷플릭스에서 Java 언어로 포팅
Project Reactor
Spring Framework 팀에 의해 주도적으로 개발된 리액티브 스트림즈 구현체
Spring Framework 5 부터 Reactor 3.x
WebFlux 용어처럼 Flux 라는 Reactor 의 핵심 구성요소
Akka Streams
Actor 로 시작해서 Actor 로 끝난다
Java Flow API
SPI (service Provider Interface) 로서 Java API 에 정의되어 있음
'Spring > Webflux' 카테고리의 다른 글
6장 마블 다이어그램 (1) 2023.04.22 5장 Reactor 개요 (0) 2023.04.22 4장 리액티브 프로그래밍을 위한 사전 지식 (0) 2023.04.22 3장 Blocking I/O 와 Non-Blocking I/O (0) 2023.04.21 Spring Webflux 공부하기 1 (0) 2023.04.18