[Java9] Reactive Stream Flow - Processor 실습
안녕하세요. 우리는 이전 포스팅에서 Reactive Stream에 대해서 알아봤었습니다. Reactive Stream의 스펙으로 Publisher, Subscriber, Subscription을 소개해드렸었습니다. 하지만 저희가 알아보지 않은 스펙이 하나 더 존재합니다. 바로 Processor Interface 입니다. 오늘은 이 Interface에 대해서 간단하게 알아가보는 시간을 가지도록 하겠습니다.
개요
Processor는 리액티브 스트림에서 처리하는 이벤트의 변환 단계입니다. 예를 들어 Subscriber가 여러개 있을 때 하나의 Processor가 에러를 수신하면 이로부터 회복하거나 즉시 onError 신호를 모든 Subscriber에게 에러를 전파할 수 있습니다. 또한 Producer가 생성한 데이터를 Subscriber가 구독할 때는 다른 형식, 변형된 데이터로 일관되게 수신도 가능합니다. Processor는 Producer와 Subscriber 사이에 위치하며 아래의 경우 동작하게 됩니다.
- Subscriber가 Publisher에게 request 전달
- Publisher가 Subscriber에게 data 전달
Processor Interface는 아래와 같습니다.
/**
* A component that acts as both a Subscriber and Publisher.
*
* @param <T> the subscribed item type
* @param <R> the published item type
*/
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}
예제
그렇다면 이제 Processor를 직접 구현해 보도록 하겠습니다. 아래 예제는 이전 포스팅 [Java9] Reactive Stream Flow - 실습편의 예제에 추가로 진행됩니다. 따라서 해당 포스팅을 먼저 읽어보시지 않으셨다면 이해가 안되는 부분이 분명히 있으실 것으로 먼저 읽어주시기 바랍니다.
Processor를 구현하여 MyProcessor를 제작하였습니다. 해당 Processor 역시 모던 자바 인 액션에서 인용하였습니다. MyProcessor는 Subscriber를 맴버변수로 가지고 있습니다. 그리고 Processor가 Subscriber와 Publisher Interface를 상속받고 있기 때문에 Publisher의 subscribe
, Subscriber의 onSubscribe
, onNext
, onError
, onComplete
를 모두 구현해야합니다.
아래 구현을 보면 MyProcessor의 onNext
에서 onNext가 호출되면 temp를 변경하는 것을 보실 수 있습니다.
public class MyProcessor implements Flow.Processor<TempInfo, TempInfo> {
private Flow.Subscriber<? super TempInfo> subscriber;
@Override
public void subscribe(Flow.Subscriber<? super TempInfo> subscriber) {
this.subscriber = subscriber;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscriber.onSubscribe(subscription);
}
@Override
public void onNext(TempInfo item) {
subscriber.onNext(new TempInfo(item.getTown() + ", 화씨 온도", (item.getTemp() - 32) * 5 / 9));
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
}
이렇게 제작한 Processor는 Publisher에서 Subscription을 등록할 때, 사용할 수 있습니다. Processor는 Subscriber를 상속받고 있기 때문에 MySubscription 인스턴스를 만들 때 Subscriber가 들어갈 자리에 Processor를 생성자 파라미터로 넣어도 문제는 없습니다.
public class MyPublisher implements Flow.Publisher<TempInfo> {
private String town;
public MyPublisher(String town) {
this.town = town;
}
@Override
public void subscribe(Flow.Subscriber<? super TempInfo> subscriber) {
MyProcessor myProcessor = new MyProcessor();
myProcessor.subscribe(subscriber);
subscriber.onSubscribe(new MySubscription(myProcessor, town));
}
}
Proceesor를 등록 한 후 테스트를 돌려보도록 하겠습니다. 테스트 코드는 아래와 같으며 결과는 그 아래에 나와있습니다. 우리가 onNext
에 넣은 변환 코드가 정상적으로 적용되어 있는것을 확인할 수 있었습니다.
@org.junit.jupiter.api.Test
public void test2() throws InterruptedException {
MyPublisher myPublisher = new MyPublisher("Seoul");
MySubscriber subscriber_1 = new MySubscriber();
MySubscriber subscriber_2 = new MySubscriber();
myPublisher.subscribe(subscriber_1);
myPublisher.subscribe(subscriber_2);
}
TempInfo {town='Seoul, 화씨 온도', temp=-10} thread : Test worker
TempInfo {town='Seoul, 화씨 온도', temp=30} thread : Test worker
TempInfo {town='Seoul, 화씨 온도', temp=-11} thread : Test worker
TempInfo {town='Seoul, 화씨 온도', temp=-11} thread : Test worker
TempInfo {town='Seoul, 화씨 온도', temp=2} thread : Test worker
TempInfo {town='Seoul, 화씨 온도', temp=17} thread : Test worker
TempInfo {town='Seoul, 화씨 온도', temp=-2} thread : Test worker
throwable.getMessage() = error [Test worker]
TempInfo {town='Seoul, 화씨 온도', temp=-15} thread : Test worker
TempInfo {town='Seoul, 화씨 온도', temp=7} thread : Test worker
TempInfo {town='Seoul, 화씨 온도', temp=-10} thread : Test worker
throwable.getMessage() = error [Test worker]
마무리
이전 포스팅과 오늘 포스팅은 Reactive Stream의 Spec을 구현해보는 시간을 가져봤습니다.
다음 시간에는 또 다른 비동기의 이야기로 찾아뵙겠습니다.
감사합니다.
참조
https://grokonez.com/java/java-9-flow-api-example-processor
모던 자바 인 액션 (17장, 리액티브 프로그래밍)