[Java8] Java 비동기 - CompletableFuture
안녕하세요. 우리는 이전시간에 [비동기] Reactive Stream 이란 포스팅에서 reactive stream에 대해서 알아보았습니다.
Reactive Stream은 published된 이벤트에 반응하여 일련의 값을 처리하는 방식입니다. 하지만 우리가 비동기를 이용할 때는 이벤트를 처리할 때도 있지만 일회성으로 무거운 작업을 여러 CPU로 나눠서 처리하고자 함도 있습니다. 물론 Reactive 방식을 이용하여 처리할 수 있지만 Reactive 방식으로는 이럴경우 Thread를 완벽하게 제어하기가 까다롭습니다. 이럴 때 사용하는 비동기 방식이 일반적으로 생각할 수 있는 thread를 생성하여 작업을 위임하는 방법입니다.이러한 방법은 Java 5에서부터는 Future
라는 이름을 추상화 되어 제공됩니다. 그리고 오늘은 Future를 좀 더 추상화 한 Java 8에서 나온 비동기 방식인 CompletableFuture
에 대해서 알아보는 시간을 가지도록 하겠습니다.
간단한 예제
무거운 작업이 2개 있습니다. 첫번째 작업은 1초만큼 시간이 걸리는 일이며, 두번째 작업은 2초만큼 시간이 걸리는 일입니다. 요청이 존재한다면 우리는 이 작업들의 합을 요청자에게 전달해야합니다. 작업의 예제는 간단히 아래와 같이 만들었습니다. exception은 생략했습니다.
private Integer work_1() {
TimeUnit.SECONDS.sleep(1);
return 1;
}
private Integer work_2() {
TimeUnit.SECONDS.sleep(2);
return 2;
}
첫번째 방법
이 문제를 해결하기 위한 가장 간단한방법은 첫번째 작업을 실행한 후 작업이 마무리되면 두번째 작업을 실행합니다. 그리고 나왔던 결과를 합치는 것입니다. 코드와 diagram으로 나타내면 아래와 같습니다. 결과적으로 1번작업과 2번작업이 마무리된 후 3초의 시간이 걸릴 것입니다. 이 경우 main Thread 하나로 일을 처리했습니다.
public void sync() {
int value_1 = work_1();
int value_2 = work_2();
System.out.println("sum = " + (value_1 + value_2));
}
두번째 방법
두번째 방법은 작업 1과 작업 2를 동시에 진행하고 그 결과를 시스템이 기다리는 방법입니다. 3개의 thread를 사용합니다. work_1 메서드용 thread, work_2 메서드용 thread, main thread입니다. 이렇게 작업을 진행하면 1번 작업과 2번 작업 중 더 오래걸리는 시간에 수렴합니다. 이렇게 되면 작업시간은 2초가 걸립니다.
@Test
public void async_2() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletableFuture<Integer> value_1 = new CompletableFuture<>();
CompletableFuture<Integer> value_2 = new CompletableFuture<>();
executorService.submit(() -> value_1.complete(work_1()));
executorService.submit(() -> value_2.complete(work_2()));
System.out.println("sum = " + (value_1.get() + value_2.get()));
}
두번째 방법은 3개의 thread를 사용합니다. 그리고 결과적으로 work_1 메서드와 work_2 메서드 중 오래 걸리는 시간인 2초에 수렴합니다. 하지만 이런 방법의 단점은 무엇일까요? thread가 아무 일 없이 점유만 하고있는 시간이 길다는 것입니다. main thread의 경우는 2초간 block되며 work_1의 경우도 남은 1초간 아무일 없이 thread를 점유합니다.
thread는 아무일을 하지 않더라도 메모리의 Stack 영역, PC 레지스터 등 리소스를 잡아먹습니다. 특히 thread pool을 이용하고 있다면 아무일도 하지 않는 thread는 반환되지는 않았기 때문에 사용할 수가 없습니다. 위 경우 또 다른 요청이 온다면 thread pool의 thread 자원은 N-2의 갯수로 한정적으로 밖에 사용할 수 없게 됩니다.
3번째 방법
3번째 방법은 main thread를 바로 리턴시키는 방법입니다. 바로 main thread가 양쪽 응답을 받아 처리하는 것이 아니라 응답을 받을 thread를 별도로 생성 및 지정하여 처리하는 방법입니다. 아래와 같이 처리하면 main thread는 결과를 기다리지 않고 바로 반환됩니다. 코드는 아래와 같습니다. 테스트로 만들어 둔 코드로 로그로 찍는 결과는 확인할 수 없습니다. 확인하기 위해서는 2초간 main thread를 sleep해주면 됩니다.
@Test
public void async_3() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture<Integer> value_1 = new CompletableFuture<>();
CompletableFuture<Integer> value_2 = new CompletableFuture<>();
CompletableFuture<Integer> value_3 = value_1.thenCombine(value_2, (a, b) -> a + b);
executorService.submit(() -> value_1.complete(work_1()));
executorService.submit(() -> value_2.complete(work_2()));
executorService.submit(() -> {
try {
System.out.println("sum = " + (value_3.get()));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
}
마무리
오늘은 이렇게 Java 8에 추가된 CompletableFuture에 대해서 알아보는 시간을 가져보았습니다. Thread를 이용하는 것은 예상치 못한 에러를 발생할 가능성이 있습니다. 하지만 멀티코어 시스템에서 멀티 코어로 시스템을 이용하기 위해서는 thread를 자유자재로 쓰는것은 필수일것입니다.
또한 Reactive Stream과 CompletableFuture를 필요에 따라 적절히 섞어서 사용하는 것 또한 중요한 비동기 프로그래밍의 요소라고 생각합니다.
오늘은 여기서 글을 줄이겠습니다.
감사합니다.
참조
모던 자바 인 액션 (15장, 16장)