본문 바로가기
프로그래밍/Java

[Java9] Reactive Stream Flow - Processor 실습

by 차곡차곡 사바라다 2020. 7. 17.

안녕하세요. 우리는 이전 포스팅에서 Reactive Stream에 대해서 알아봤었습니다. Reactive Stream의 스펙으로 Publisher, Subscriber, Subscription을 소개해드렸었습니다. 하지만 저희가 알아보지 않은 스펙이 하나 더 존재합니다. 바로 Processor Interface 입니다. 오늘은 이 Interface에 대해서 간단하게 알아가보는 시간을 가지도록 하겠습니다.

개요

Processor는 리액티브 스트림에서 처리하는 이벤트의 변환 단계입니다. 예를 들어 Subscriber가 여러개 있을 때 하나의 Processor가 에러를 수신하면 이로부터 회복하거나 즉시 onError 신호를 모든 Subscriber에게 에러를 전파할 수 있습니다. 또한 Producer가 생성한 데이터를 Subscriber가 구독할 때는 다른 형식, 변형된 데이터로 일관되게 수신도 가능합니다. Processor는 Producer와 Subscriber 사이에 위치하며 아래의 경우 동작하게 됩니다.

  • Subscriber가 Publisher에게 request 전달
  • Publisher가 Subscriber에게 data 전달

Processor Interface는 아래와 같습니다.

/**
* A component that acts as both a Subscriber and Publisher.
*
* @param <T> the subscribed item type
* @param <R> the published item type
*/
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}

reactive stream의 processor flow, 출처 : https://grokonez.com/java/java-9-flow-api-example-processor

예제

그렇다면 이제 Processor를 직접 구현해 보도록 하겠습니다. 아래 예제는 이전 포스팅 [Java9] Reactive Stream Flow - 실습편의 예제에 추가로 진행됩니다. 따라서 해당 포스팅을 먼저 읽어보시지 않으셨다면 이해가 안되는 부분이 분명히 있으실 것으로 먼저 읽어주시기 바랍니다.

Processor를 구현하여 MyProcessor를 제작하였습니다. 해당 Processor 역시 모던 자바 인 액션에서 인용하였습니다. MyProcessor는 Subscriber를 맴버변수로 가지고 있습니다. 그리고 Processor가 Subscriber와 Publisher Interface를 상속받고 있기 때문에 Publisher의 subscribe, Subscriber의 onSubscribe, onNext, onError, onComplete를 모두 구현해야합니다.

아래 구현을 보면 MyProcessor의 onNext 에서 onNext가 호출되면 temp를 변경하는 것을 보실 수 있습니다.

public class MyProcessor implements Flow.Processor<TempInfo, TempInfo> {

    private Flow.Subscriber<? super TempInfo> subscriber;

    @Override
    public void subscribe(Flow.Subscriber<? super TempInfo> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscriber.onSubscribe(subscription);
    }

    @Override
    public void onNext(TempInfo item) {
        subscriber.onNext(new TempInfo(item.getTown() + ", 화씨 온도", (item.getTemp() - 32) * 5 / 9));
    }

    @Override
    public void onError(Throwable throwable) {
        subscriber.onError(throwable);
    }

    @Override
    public void onComplete() {
        subscriber.onComplete();
    }
}

이렇게 제작한 Processor는 Publisher에서 Subscription을 등록할 때, 사용할 수 있습니다. Processor는 Subscriber를 상속받고 있기 때문에 MySubscription 인스턴스를 만들 때 Subscriber가 들어갈 자리에 Processor를 생성자 파라미터로 넣어도 문제는 없습니다.

public class MyPublisher implements Flow.Publisher<TempInfo> {

    private String town;

    public MyPublisher(String town) {
        this.town = town;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super TempInfo> subscriber) {
        MyProcessor myProcessor = new MyProcessor();
        myProcessor.subscribe(subscriber);
        subscriber.onSubscribe(new MySubscription(myProcessor, town));
    }
}

Proceesor를 등록 한 후 테스트를 돌려보도록 하겠습니다. 테스트 코드는 아래와 같으며 결과는 그 아래에 나와있습니다. 우리가 onNext에 넣은 변환 코드가 정상적으로 적용되어 있는것을 확인할 수 있었습니다.

@org.junit.jupiter.api.Test
public void test2() throws InterruptedException {
    MyPublisher myPublisher = new MyPublisher("Seoul");

    MySubscriber subscriber_1 = new MySubscriber();
    MySubscriber subscriber_2 = new MySubscriber();

    myPublisher.subscribe(subscriber_1);
    myPublisher.subscribe(subscriber_2);
}
TempInfo {town='Seoul, 화씨 온도', temp=-10} thread : Test worker
TempInfo {town='Seoul, 화씨 온도', temp=30} thread : Test worker
TempInfo {town='Seoul, 화씨 온도', temp=-11} thread : Test worker
TempInfo {town='Seoul, 화씨 온도', temp=-11} thread : Test worker
TempInfo {town='Seoul, 화씨 온도', temp=2} thread : Test worker
TempInfo {town='Seoul, 화씨 온도', temp=17} thread : Test worker
TempInfo {town='Seoul, 화씨 온도', temp=-2} thread : Test worker
throwable.getMessage() = error [Test worker]
TempInfo {town='Seoul, 화씨 온도', temp=-15} thread : Test worker
TempInfo {town='Seoul, 화씨 온도', temp=7} thread : Test worker
TempInfo {town='Seoul, 화씨 온도', temp=-10} thread : Test worker
throwable.getMessage() = error [Test worker]

마무리

이전 포스팅과 오늘 포스팅은 Reactive Stream의 Spec을 구현해보는 시간을 가져봤습니다.

다음 시간에는 또 다른 비동기의 이야기로 찾아뵙겠습니다.

감사합니다.

참조

https://grokonez.com/java/java-9-flow-api-example-processor

모던 자바 인 액션 (17장, 리액티브 프로그래밍)

댓글0