오늘은 Spring MVC에서 쉽게 비동기 프로그래밍을 할 수 있는 @Async 을 튜닝하는 방법에 대해서 알아보도록 하겠습니다. 오늘 코드는 Java가 아닌 코틀린으로 작성되었습니다. Java 로도 충분히 작성하실 수 있으니 천천히 따라오시면 됩니다. :)
@Async
Spring MVC 에서 간단하게 병렬프로그래밍을 구현할 때 @Async
어노테이션을 많이 이용합니다. 이 어노테이션을 메서드에 붙이기만하면 간단하게 사용할 수 있기 때문에 많이 사용하곤 합니다. 아래 코드가 사용의 예시입니다.
@Async
fun sendCloudMessage(event: Event) {
fireBaseSuspendApi.sendCloudMessage(id = event.id);
}
하지만 이런 어노테이션으로 간단하게 사용하더라도 그 내부에는 Spring에서 설정한 제공해주는 thread pool을 사용합니다. 그렇기 때문에 튜닝 없이 사용하면 작업을 적절하게 마무리하지 못할 가능성도 있고 리소스를 무분별하게 소모할 가능성도 있습니다. 따라서 적절한 값으로 튜닝하고 사용하는게 필요합니다. @Async가 사용하는 thread poll 설정은 다양한 옵션을 제공하고 있으므로 본인의 환경에 맞게 적절하게 변경하여 사용할 수 있습니다.
ThreadPoolTaskExecutor
@Async
를 사용하게 되면 Spring에서 제공하는 thread pool 설정 bean인 ThreadPoolTaskExecutor bean을 사용하게 됩니다. 해당 bean은 본인 환경에 맞게 커스터마이징하는 것이 가능합니다. ThreadPoolTaskExecutor를 어떻게 설정할 수 있는지 알아보도록 합시다.
기본 설정
ThreadPoolTaskExecutor
의 기본 설정은 ThreadPool이 가지고 있어야할 최소 항목으로 아래와 같습니다. ThreadNamePrefix는 해당 thread-pool이 함께 가지고 있는 이름이므로 간단하게 알고 넘어가도 좋지만 나머지는 어떤 관계를 가지고 있는지 아래 테스트를 통해서 알아보도록 하겠습니다.
corePoolSize
thread-pool에 항상 살아있는 thread의 최소 갯수입니다.
maxPoolSize
thread-pool에서 사용할 수 있는 최대 thread의 갯수입니다.
queueCapacity
thread-pool에서 사용할 최대 queue의 크기입니다.
ThreadNamePrefix
thread의 이름의 prefix입니다.
corePoolSize와 maxPoolSize, 그리고 queueCapacity의 관계
corePoolSize는 thread의 최소값이며 thread-pool이 바빠지면 maxPoolSize까지 늘어나게됩니다. 그렇다면 여기서 바쁘다는 기준은 어떤것일까요? 아래의 테스트 코드를 보면서 확인해보도록 하겠습니다.
@Test
fun test() {
val threadPoolTaskExecutor = ThreadPoolTaskExecutor()
threadPoolTaskExecutor.corePoolSize = 1
threadPoolTaskExecutor.maxPoolSize = 5
threadPoolTaskExecutor.setQueueCapacity(5)
threadPoolTaskExecutor.initialize()
for (i in 1..10) {
threadPoolTaskExecutor.execute {
Thread.sleep(10 * 1000)
}
print("pool Size : ${threadPoolTaskExecutor.poolSize}")
print("active Count : ${threadPoolTaskExecutor.activeCount}")
print("queue Size : ${threadPoolTaskExecutor.threadPoolExecutor.queue.size}")
println("=============")
}
}
아래의 결과를 보도록하겠습니다. 아래의 결과를 보시면 먼저 queue의 사이즈만큼 내부의 queue에 1개씩 차오르는 것을 볼 수 있습니다. 그리고 queue안에서 max까지 차 올랐을 때 thread가 하나씩 더 active 되는것을 확인할 수 있었습니다. 따라서 core에서 max로의 thread 수의 증가는 queue의 사이즈가 가득차게되면 추가로 thread를 생성해서 사용한다는 것을 확인할 수 있었습니다.
pool Size : 1
active Count : 1
queue Size : 0
=============
pool Size : 1
active Count : 1
queue Size : 1
=============
pool Size : 1
active Count : 1
queue Size : 2
=============
[..중략..]
=============
pool Size : 1
active Count : 1
queue Size : 5
=============
pool Size : 2
active Count : 2
queue Size : 5
=============
[..중략..]
=============
pool Size : 5
active Count : 5
queue Size : 5
=============
추가 설정
keepAliveSeconds
maxPoolSize가 모두 사용되다가 idle로 돌아갔을 때 종료하기까지 대기하는 걸리는 시간입니다.
예를들어 core size가 5이며, max size가 15라고 하겠습니다. 요청이 많아져서 thread pool이 바빠져서 추가적으로 10개의 thread를 생성해서 15개의 thread를 사용하게됩니다. 그리고 얼마 후 바빠진게 해소가 되어서 이제 10개의 추가된 thread는 한가한(idle) 상태가 되게됩니다. 그리고 자원의 절약을 위해 한가한 상태의 thread가 죽게(die)됩니다.
thread가 idle 상태에서 die 상태가 되기까지 대기하는 시간을 keepAlivesSeconds 옵션으로 설정할 수 있습니다. thread를 다시 생성하는 비용과 idle 상태로 유지하는 비용의 trade-off를 잘 생각해서 설정하면 좋습니다.
WaitForTasksToCompleteOnShutdown
시스템을 종료(shutdown)할 때 queue에 남아있는 작업을 모두 완료한 후 종료 하도록 처리합니다.
AwaitTerminationSeconds
시스템을 종료(shutdown)할 때 queue에 남아있는 작업을 모두 완료한 후 종료 하도록 처리하거나 타임아웃을 지정해서 해당 시간이 넘으면 강제 종료합니다.
TaskDecorator
thread-pool로 작업(task)를 시작하기 직전에 진행하는 작업을 추가할 수 있도록 하는 interface입니다. 해당 interface를 구현하여 thread간 traceid를 복사하는 등의 작업을 할 수 있습니다.
RejectedExecutionHandler
RejectedExecutionHandler는 ThreadPoolExecutor에서 task를 더 이상 받을 수 없을 때 호출됩니다. 이런 경우는 queue 허용치를 초과하거나 Executor가 종료되어 Thread 또는 큐를 사용할 수 없는 경우에 발생하게 됩니다.
- 지정할 수 있는 옵션(ThreadPoolExecutor)
- AbortPolicy(Default) : Reject된 작업이 RejectedExecutionException을 던집니다
- CallerRunsPolicy : 호출한 Thread에서 reject된 작업을 대신 실행합니다
- DiscardPolicy : Exception 없이 Reject된 작업을 버립니다.
- DiscardOldestPolicy : queue에서 가장 오래되고 처리되지 않은 요청을 삭제하고 다시 시도합니다.
기본 값
그렇다면 @Async를 튜닝 없이 기본값으로 사용하면 어떤 설정으로 돌아가게 되는건지 확인해보도록 하겠습니다. 각 설정의 기본값은 아래와 같습니다.
- corePoolSize : 1
- maxPoolSize : Integer.MAX_VALUE
- keepAliveSeconds : 60(second)
- QueueCapacity : Integer.MAX_VALUE
- AllowCoreThreadTimeOut : false
- WaitForTasksToCompleteOnShutdown : false
- RejectedExecutionHandler : AbortPolicy
기본값을 보았을 때, corePoolSize가 1이기 때문에 1개의 스레드로 들어오는 모든 일을 처리합니다. 이때문에 요청이 많이 들어오게 되면 처리의 지연이 발생할 수 있습니다. 그리고 더더욱 많은 요청이 들어와서 처리할 수 없는 수준이 된다면 RejectedExecutionHandler
값이 AbortPolicy
이기 때문에 Exception을 발생시키며 종료되게 됩니다. 위 설정을 잘 보시고 각자의 환경에 맞게 튜닝하도록 합시다.
traceId 추가하기
만약 로깅에 traceId를 추가하여 트레이싱을 하고 있으셨다면 MDC에 넣어서 사용하고 있으셨을 것 같습니다. 이럴경우 ThreadLocal을 저장소로 하여 전달하기 때문에 비동기를 사용하는 경우 traceId가 함께 전달되지 않는 문제가 있습니다. 이러한 부분에서 TaskDecorator
를 커스터마이징하여 사용할 수 있습니다. 따라서 그런 경우 아래 코드 처럼 사용하시면 traceId를 이어 붙여줄 수 있습니다.
class MDCTaskDecorator : TaskDecorator {
override fun decorate(runnable: Runnable): Runnable {
val contextMap: Map<String, String> = MDC.getCopyOfContextMap() // main thread block
return Runnable { // async thread block
try {
MDC.setContextMap(contextMap)
runnable.run()
} finally {
MDC.clear()
}
}
}
}
커스텀 ThreadPoolTaskExecutor 생성하기
위 처럼 알아본 설정들은 아래의 코드처럼 사용하여 bean을 생성할 수 있습니다.
@Bean
fun cloudEventExecutor(): ThreadPoolTaskExecutor {
return ThreadPoolTaskExecutor().apply {
this.corePoolSize = 10
this.maxPoolSize = 20
setThreadNamePrefix("cloud-event-exec-")
setQueueCapacity(500)
setTaskDecorator(MDCTaskDecorator())
setRejectedExecutionHandler(ThreadPoolExecutor.CallerRunsPolicy())
initialize()
}
}
메트릭 추가하기
그리고 추가적으로 프로메테우스를 통해서 메트릭을 측정할 수도 있습니다. 아래의 코드로 현재 실행되는 thread 및 한가한(idle) thread에 대해서 모니터링 하는것이 가능합니다. 아래 코드를 이용하기 위해서는 프로메테우스에 대한 의존성이 필요합니다.
@Bean
fun cloudExecutorWithMetric(registry: MeterRegistry): Executor {
return ExecutorServiceMetrics.monitor(
registry,
cloudEventExecutor(),
"cloud-event-thread-pool",
Tags.empty()
)
}
적용하기
이렇게 커스터마이징한 thread-pool은 @Async
어노테이션의 value를 이용해서 쉽게 적용할 수 있습니다.
@Async(value = "cloudEventExecutor") // cloudExecutorWithMetric로 하면 메트릭까지 추가
fun sendCloudMessage(event: Event) {
fireBaseSuspendApi.sendCloudMessage(id = event.id);
}
마무리
오늘은 이렇게 Spring에서 @Async
를 환경에 맞게 튜닝하여 적절한 리소스를 이용하여 비동기 프로그래밍이 가능하게하는 방법에 대해서 알아보았습니다.
감사합니다.
참조
Async JavaDoc
댓글