본문 바로가기
프로그래밍/Java

[Java9] Reactive Stream Flow - 실습편

by 사바라다 2020. 7. 15.

안녕하세요. 이전 포스팅에서 Reactive Stream에 대해서 알아봤었습니다.

오늘은 Reactive Stream 스펙을 이용해 실제로 Reactive Stream으로 구현해보도록 하겠습니다.

예제를 만들어 보도록 하겠습니다.

스펙

이전 포스팅에서 확인했던 3가지의 Reactive Stream의 스펙은 아래와 같습니다. 자세한 내용은 이전 포스팅를 참고 부탁드립니다.

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
public interface Subscription {
    public void request(long n);
    public void cancel();
}

예제

먼저 예제는 지속적으로 온도를 이벤트로 받아 화면에 표출하는 예제입니다. 이 예제는 모던 자바 인 액선 17장에서 참고하였습니다.

온도를 전달하는 간단한 클래스를 먼저 정의합니다. TempInfo Class는 Producer로 생산하는 역할 또한 병행하고 있습니다. 로직을 보면 fetch() 메서드를 실행하면 random한 숫자가 0 ~ 9까지 중 하나가 생성되며 0이 생성되었을 때는 Exception이 발생합니다. 0이 아니라면 0 ~ 99 까지의 temp를 가지고 TempInfo instance가 생성됩니다.

public class TempInfo {

    public static final Random random = new Random();

    private final String town;
    private final int temp;

    public TempInfo(String town, int temp) {
        this.town = town;
        this.temp = temp;
    }

    public static TempInfo fetch(String town) {
        if (random.nextInt(10) == 0) {
            throw new RuntimeException("error");
        }
        return new TempInfo(town, random.nextInt(100));
    }

    @Override
    public String toString() {
        return "TempInfo {" +
                "town='" + town + '\'' +
                ", temp=" + temp +
                '}' + " thread : " + Thread.currentThread().getName();
    }

    public String getTown() {
        return town;
    }

    public int getTemp() {
        return temp;
    }
}

TempInfo라는 Publisher가 Subscriber에게 전해 줄 정보를 정의했으니 이번에는 Publisher와 Subscriber간의 구독이 어떻게 이루어 질 건지 정의하는 Subscription을 정의하도록 하겠습니다.

먼저 Subscription은 맴버 변수로 Subscriber를 가집니다. 왜냐하면 request() 또는 cancel()에 대해서 subscriber로 다음 signal과 정보를 보내줘야하기 때문입니다. 일반적으로 등록은 생성자를 통해 하며, setter도 이용할 수 있습니다.

request()를 보시면 들어온 요청 수 n에 대해서 loop를 돌며 onNext신호를 subscriber에게 보내주고 있습니다. 만약 Exception이 발생하면 onError signal를 전달합니다. 또한 만약 subscriber로 부터 cancel이라는 명령어가 들어오면 더이상 신호를 주지 않고 onComplete 신호를 줍니다.

public class MySubscription implements Flow.Subscription {

    private final Flow.Subscriber<? super TempInfo> subscriber;
    private final String town;

    public MySubscription(Flow.Subscriber<? super TempInfo> subscriber,
                          String town) {
        this.subscriber = subscriber;
        this.town = town;
    }

    @Override
    public void request(long n) {
        for (long i = 0L; i < n; i++) {
            try {
                subscriber.onNext(TempInfo.fetch(town));
            } catch (Exception e) {
                subscriber.onError(e);
                break;
            }
        }
    }

    @Override
    public void cancel() {
        subscriber.onComplete();
    }
}

MyPublisher는 Publisher를 구현한 것입니다. subscribe 오버라이드 메서드를 통해 Subscriber를 받습니다. 그리고 subscribe 메서드를 보시면 subscriber에게 Subscription을 등록해 줍니다. 이렇게 해서 Publisher와 Subscriber, 그리고 Subscription의 관계가 완성됩니다. 그리고 Subscription이 신호를 대신 전달 해 주기 때문에 구현할 때 Publisher안에 Subscription을 두는 방법도 있습니다.

public class MyPublisher implements Flow.Publisher<TempInfo> {

    private String town;

    public MyPublisher(String town) {
        this.town = town;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super TempInfo> subscriber) {
        subscriber.onSubscribe(new MySubscription(subscriber, town));
    }
}

Subscriber입니다. 생성된 신호에 따라 정보를 처리하는 Class입니다. subscription을 맴버 변수로 가집니다. Subscriber는 Publisher가 아닌 Subscription을 통해서 신호를 주고받기 때문입니다. onSubscribe 메서드에서 subscription에게 request를 보냅니다. 그러면 이제 Subscription에서 signal이 onNext, onError, onComplete 중 하나로 넘어 올 것이며 신호에 따라서 처리가 될 것입니다.

public class MySubscriber implements Flow.Subscriber<TempInfo> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(TempInfo item) {
        System.out.println(item);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("throwable.getMessage() = " + throwable.getMessage() + " [" + Thread.currentThread().getName() + "]");
    }

    @Override
    public void onComplete() {
        System.out.println(" Done ! " + " [" + Thread.currentThread().getName() + "]");
    }
}

아래는 Publisher와 Subscriber, Subscription으 테스트 코드입니다. 아래의 코드는 Subscription을 MyPublisher에 녹여냈지만 명시적으로 Subscription을 선언해도 동일한 결과를얻을 것입니다.

@Test
public void test2() throws InterruptedException {
    MyPublisher myPublisher = new MyPublisher("Seoul");

    MySubscriber subscriber_1 = new MySubscriber();
    MySubscriber subscriber_2 = new MySubscriber();

    myPublisher.subscribe(subscriber_1);
    myPublisher.subscribe(subscriber_2);
}
TempInfo {town='Seoul', temp=37} thread : Test worker
TempInfo {town='Seoul', temp=-11} thread : Test worker
TempInfo {town='Seoul', temp=-11} thread : Test worker
TempInfo {town='Seoul', temp=34} thread : Test worker
TempInfo {town='Seoul', temp=4} thread : Test worker
throwable.getMessage() = error [Test worker]
TempInfo {town='Seoul', temp=-11} thread : Test worker
TempInfo {town='Seoul', temp=4} thread : Test worker
TempInfo {town='Seoul', temp=-6} thread : Test worker
TempInfo {town='Seoul', temp=1} thread : Test worker
TempInfo {town='Seoul', temp=3} thread : Test worker
TempInfo {town='Seoul', temp=-6} thread : Test worker
TempInfo {town='Seoul', temp=33} thread : Test worker
TempInfo {town='Seoul', temp=32} thread : Test worker
throwable.getMessage() = error [Test worker]

Reactive Stream의 강력한 기능 중 하나는 병렬성입니다. 멀티코어 시스템에서 thread를 이용해 동시에 실행시킬 수 있다는 것인데요. 동시에 실행시키는 방법은 아래와 같습니다. 즉, Subscription에 Thread Pool을 설정 후 원하는 곳에서 비동기로 실행시켜 주면 됩니다. 코드를 아래과 같이 변경하고 다시 실행해보겠습니다.

public class MySubscription implements Flow.Subscription {

    private static final ExecutorService executor = Executors.newFixedThreadPool(2);
    private final Flow.Subscriber<? super TempInfo> subscriber;
    private final String town;

    public MySubscription(Flow.Subscriber<? super TempInfo> subscriber,
                          String town) {
        this.subscriber = subscriber;
        this.town = town;
    }

    @Override
    public void request(long n) {
        executor.submit(() -> {
            for (long i = 0L; i < n; i++) {
                try {
                    subscriber.onNext(TempInfo.fetch(town));
                } catch (Exception e) {
                    subscriber.onError(e);
                    break;
                }
            }
        });
    }

    @Override
    public void cancel() {
        subscriber.onComplete();
    }
}
@Test
public void test2() throws InterruptedException {
    MyPublisher myPublisher = new MyPublisher("Seoul");

    MySubscriber subscriber_1 = new MySubscriber();
    MySubscriber subscriber_2 = new MySubscriber();

    myPublisher.subscribe(subscriber_1);
    myPublisher.subscribe(subscriber_2);
}
 hello 
TempInfo {town='Seoul', temp=41} thread : pool-1-thread-2
TempInfo {town='Seoul', temp=73} thread : pool-1-thread-1
TempInfo {town='Seoul', temp=31} thread : pool-1-thread-2
TempInfo {town='Seoul', temp=5} thread : pool-1-thread-1
TempInfo {town='Seoul', temp=35} thread : pool-1-thread-1
TempInfo {town='Seoul', temp=44} thread : pool-1-thread-2
TempInfo {town='Seoul', temp=25} thread : pool-1-thread-1
TempInfo {town='Seoul', temp=8} thread : pool-1-thread-2
TempInfo {town='Seoul', temp=43} thread : pool-1-thread-1
TempInfo {town='Seoul', temp=96} thread : pool-1-thread-2
TempInfo {town='Seoul', temp=37} thread : pool-1-thread-1
TempInfo {town='Seoul', temp=63} thread : pool-1-thread-2
TempInfo {town='Seoul', temp=1} thread : pool-1-thread-1
TempInfo {town='Seoul', temp=60} thread : pool-1-thread-2
TempInfo {town='Seoul', temp=21} thread : pool-1-thread-2
TempInfo {town='Seoul', temp=98} thread : pool-1-thread-2
TempInfo {town='Seoul', temp=46} thread : pool-1-thread-2
throwable.getMessage() = error [pool-1-thread-1]
throwable.getMessage() = error [pool-1-thread-2]

2개의 thread로 실행된 것을 확인하실 수 있습니다.

마무리

오늘은 이렇게 Reactive Stream을 Flow 인터페이스를 이용하여 직접구현해보는 시간을 가졌습니다. Flow는 구현된 것은 없으며 interface 뿐입니다. 구현된 라이브러리는 RxJava, Reactor 등이 있는데요. 이 라이브러리들은 다음에 또 다뤄 보도록 하겠습니다.

오늘은 여기까지 입니다. 다음시간에는 오늘 다루지 않은 Processor에 대해서 알아보는 시간을 가져보도록 하겠습니다.

감사합니다.

참조

모던 자바 인 액션 (17장, 리액티브 프로그래밍)

댓글