[Java] Reactive Stream 이란?
reactive stream이란 non-blocking(넌블럭킹) backPressure(역압)을 이용하여 비동기 서비스를 할 때 기본이 되는 스펙입니다. java의 RxJava, Spring5 Webflux의 Core에 있는 ProjectReactor 프로젝트 모두 해당 스펙을 따르고 있습니다. 또한 Java9에 추가된 Flow 역시 reactvie stream 스펙을 채택하여 사용하고 있습니다. 따라서 비동기 프로젝트를 잘 이해하기 위해서는 기본 스펙이 되는 Reactive Stream에 대해서 이해가 필요합니다. 이 스펙은 다행스럽게도 공개된 사이트가 있습니다. 원 페이지 주소는 참조에 두도록 하겠습니다.
오늘은 Reactive Stream 스펙에 대해서 한번 알아보는 시간을 가져보겠습니다.
개요
The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.
Reactive Stream 스펙 제일 위에 나오는 문구입니다. Reactive Stream의 목적이 명확하게 나와있습니다. 해석해보면 "Reactie Stream의 목적은 non-blocking backpressure를 이용하여 비동기 스트림 처리의 표준을 제공하는 것이다." 라고 되어있습니다.
Goals, Design and Scope
계속적으로 들어오는 스트림 데이터를 효율적으로 처리하기 위해서는 비동기 시스템이 효과적입니다. 비동기 처리를 하면서 가장 중요한 문제는 데이터 처리가 목적지의 리소스 소비를 예측가능한 범위에서 신중하게 제어할 수 있어야 하는 것입니다. 비동기는 네트워크를 통한 서버간의 협업 또는 단일 서버에서 컴퓨팅 리소스를 동시에 사용할 때 주로 사용됩니다.
Reactive Stream의 주된 목적은 비동기의 경계를 명확히하여 스트림 데이터의 교환을 효과적으로 관리하는것에 있습니다. 즉, 비동기로 데이터를 처리하는 시스템에 어느정도의 data가 들어올 지 예측가능하도록 하는것입니다. Reactive Stream에서는 BackPressure이 이를 달성할 수 있게 해주는 중요한 부분입니다.
요약하면, Reactive Steam은 다음과 같은 스트림 지향 라이브러리에 대한 표준 및 사양입니다.
- 잠재적으로 무한한 숫자의 데이터 처리
- 순서대로 처리
- 컴포넌트간에 데이터를 비동기적으로 전달
- backpressure를 이용한 데이터 흐름제어
BackPressure (배압)
Reactive Stream의 목적을 보면 bacpressure를 이용하여 비동기 스트림의 표준을 제공한다고 되어있습니다. 여기서 backpressure의 정의를 한번 더 알아보도록 하겠습니다. 리액티드 선언문의 용어집을 보면 아래와 같이 BackPressure에 대해서 설명합니다.
한 컴포넌트가 부하를 이겨내기 힘들 때, 시스템 전체가 합리적인 방법으로 대응해야 한다. 과부하 상태의 컴포넌트에서 치명적인 장애가 발생하거나 제어 없이 메시지를 유실해서는 안 된다. 컴포넌트가 대처할 수 없고 장애가 발생해선 안 되기 때문에 컴포넌트는 상류 컴포넌트들에 자신이 과부하 상태라는 것을 알려 부하를 줄이도록 해야 한다. 이러한 배압은 시스템이 부하로 인해 무너지지 않고 정상적으로 응답할 수 있게 하는 중요한 피드백 방법이다. 배압은 사용자에게까지 전달되어 응답성이 떨어질 수 있지만, 이 메커니즘은 부하에 대한 시스템의 복원력을 보장하고 시스템 자체가 부하를 분산할 다른 자원을 제공할 수 있는지 정보를 제공할 것이다.
API Componenets
Reactive Stream API의 구성요소는 아래와 같습니다.
- Publisher
- Subscriber
- Subscription
- Processor
Publisher는 무한한 data를 제공합니다. 제공되어진 data는 Subscriber가 구독하는 형식으로 처리됩니다. Publisher.subscribe(Subscriber)
의 형식으로 data 제공자와 구독자가 연결을 맺게 됩니다. 그리고 호출되는 순서는 아래와 같습니다.
onSubscribe onNext* (onError | onComplete)?
onSubscribe는 Publisher가 생산하는 data를 Subscriber가 항상 신호를 받을 준비가 되어있다는 의미이며, onNext로 데이터를 수신합니다. 그리고 실패가 있으경우에는 onError 신호, 더 이상 사용할 수 있는 신호가 없을 경우 onComplete 신호를 호출합니다. 이는 Subscription(구독)이 취소될때까지 지속됩니다.
명세서
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
Publisher(생산자)는 Subscriber(구독자)를 받아들이는 메서드를 가집니다.
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
Subscriber(구독자)는 Subscription을 등록하고 Subscription에서 오는 신호에 따라서 동작합니다. 구독자에게 오는 신호로는 onNext
, onError
, onComplete
가 있습니다.
public interface Subscription {
public void request(long n);
public void cancel();
}
Subscription은 Publisher와 Subscriber 사이에서 중계하는 역할을 합니다. request 메서드는 Subscriber가 Publisher에게 데이터를 요청하는 갯수이며 cancel은 구독을 취소하겠다는 의미입니다.
위 interface를 토대로 아래 와같은 flow를 만들 수 있습니다.
- Publisher에 본인이 소유할 Subscription을 구현하고 publishing할 data를 만듭니다.
- Publisher는
subscribe()
메서드를 통해 subscriber를 등록합니다. - Subscriber는
onSubscribe()
메서드를 통해 Subscription을 등록하고 Publisher를 구독하기 시작합니다. 이는 Publisher에 구현된 Subscription을 통해 이루어집니다. 이렇게 하면 Publisher와 Subscriber는 Subscription을 통해 연결된 상태가 됩니다.onSubscribe()
내부에 Subscription의request()
를 요청하면 그때부터 data 구독이 시작됩니다. - Suscriber는 Subscription 메서드의
request()
또는cancel()
을 호출을 통해 data의 흐름을 제어할 수 있습니다. - Subscription의
request()
에는 조건에 따라 Subscriber의onNext()
,onComplete()
또는onError()
를 호출합니다. 그러면 Subscriber의 해당 메서드의 로직에 따라request()
또는cancle()
로 제어하게 됩니다.
이 내용은 아래 Link의 예제를 통해 한번은 예제를 보고 이해하는 것을 추천드립니다.
https://grokonez.com/java/java-9-flow-api-example-publisher-and-subscriber
마무리
오늘은 이렇게 Reactive Stream 스펙에 대해서 알아보는 시간을 가졌습니다.
참조
http://www.reactive-streams.org/
https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md