본문 바로가기
language, framework, library/Spring

[Spring] @Async에서 사용하는 ThreadPoolTaskExecutor 최적화하기

by 사바라다 2022. 2. 19.
반응형

오늘은 Spring MVC에서 쉽게 비동기 프로그래밍을 할 수 있는 @Async 을 튜닝하는 방법에 대해서 알아보도록 하겠습니다. 오늘 코드는 Java가 아닌 코틀린으로 작성되었습니다. Java 로도 충분히 작성하실 수 있으니 천천히 따라오시면 됩니다. :)

@Async

Spring MVC 에서 간단하게 병렬프로그래밍을 구현할 때 @Async 어노테이션을 많이 이용합니다. 이 어노테이션을 메서드에 붙이기만하면 간단하게 사용할 수 있기 때문에 많이 사용하곤 합니다. 아래 코드가 사용의 예시입니다.

@Async
fun sendCloudMessage(event: Event) {
  fireBaseSuspendApi.sendCloudMessage(id = event.id);
}

하지만 이런 어노테이션으로 간단하게 사용하더라도 그 내부에는 Spring에서 설정한 제공해주는 thread pool을 사용합니다. 그렇기 때문에 튜닝 없이 사용하면 작업을 적절하게 마무리하지 못할 가능성도 있고 리소스를 무분별하게 소모할 가능성도 있습니다. 따라서 적절한 값으로 튜닝하고 사용하는게 필요합니다. @Async가 사용하는 thread poll 설정은 다양한 옵션을 제공하고 있으므로 본인의 환경에 맞게 적절하게 변경하여 사용할 수 있습니다.

ThreadPoolTaskExecutor

@Async를 사용하게 되면 Spring에서 제공하는 thread pool 설정 bean인 ThreadPoolTaskExecutor bean을 사용하게 됩니다. 해당 bean은 본인 환경에 맞게 커스터마이징하는 것이 가능합니다. ThreadPoolTaskExecutor를 어떻게 설정할 수 있는지 알아보도록 합시다.

기본 설정

ThreadPoolTaskExecutor의 기본 설정은 ThreadPool이 가지고 있어야할 최소 항목으로 아래와 같습니다. ThreadNamePrefix는 해당 thread-pool이 함께 가지고 있는 이름이므로 간단하게 알고 넘어가도 좋지만 나머지는 어떤 관계를 가지고 있는지 아래 테스트를 통해서 알아보도록 하겠습니다.

corePoolSize

thread-pool에 항상 살아있는 thread의 최소 갯수입니다.

maxPoolSize

thread-pool에서 사용할 수 있는 최대 thread의 갯수입니다.

queueCapacity

thread-pool에서 사용할 최대 queue의 크기입니다.

ThreadNamePrefix

thread의 이름의 prefix입니다.

corePoolSize와 maxPoolSize, 그리고 queueCapacity의 관계

corePoolSize는 thread의 최소값이며 thread-pool이 바빠지면 maxPoolSize까지 늘어나게됩니다. 그렇다면 여기서 바쁘다는 기준은 어떤것일까요? 아래의 테스트 코드를 보면서 확인해보도록 하겠습니다.

@Test
fun test() {
  val threadPoolTaskExecutor = ThreadPoolTaskExecutor()
  threadPoolTaskExecutor.corePoolSize = 1
  threadPoolTaskExecutor.maxPoolSize = 5
  threadPoolTaskExecutor.setQueueCapacity(5)
  threadPoolTaskExecutor.initialize()

  for (i in 1..10) {
      threadPoolTaskExecutor.execute {
          Thread.sleep(10 * 1000)
      }

      print("pool Size : ${threadPoolTaskExecutor.poolSize}")
      print("active Count : ${threadPoolTaskExecutor.activeCount}")
      print("queue Size : ${threadPoolTaskExecutor.threadPoolExecutor.queue.size}")
      println("=============")
  }
}

아래의 결과를 보도록하겠습니다. 아래의 결과를 보시면 먼저 queue의 사이즈만큼 내부의 queue에 1개씩 차오르는 것을 볼 수 있습니다. 그리고 queue안에서 max까지 차 올랐을 때 thread가 하나씩 더 active 되는것을 확인할 수 있었습니다. 따라서 core에서 max로의 thread 수의 증가는 queue의 사이즈가 가득차게되면 추가로 thread를 생성해서 사용한다는 것을 확인할 수 있었습니다.

pool Size : 1
active Count : 1
queue Size : 0
=============
pool Size : 1
active Count : 1
queue Size : 1
=============
pool Size : 1
active Count : 1
queue Size : 2
=============
[..중략..]
=============
pool Size : 1
active Count : 1
queue Size : 5
=============
pool Size : 2
active Count : 2
queue Size : 5
=============
[..중략..]
=============
pool Size : 5
active Count : 5
queue Size : 5
=============

추가 설정

keepAliveSeconds

maxPoolSize가 모두 사용되다가 idle로 돌아갔을 때 종료하기까지 대기하는 걸리는 시간입니다.

예를들어 core size가 5이며, max size가 15라고 하겠습니다. 요청이 많아져서 thread pool이 바빠져서 추가적으로 10개의 thread를 생성해서 15개의 thread를 사용하게됩니다. 그리고 얼마 후 바빠진게 해소가 되어서 이제 10개의 추가된 thread는 한가한(idle) 상태가 되게됩니다. 그리고 자원의 절약을 위해 한가한 상태의 thread가 죽게(die)됩니다.

thread가 idle 상태에서 die 상태가 되기까지 대기하는 시간을 keepAlivesSeconds 옵션으로 설정할 수 있습니다. thread를 다시 생성하는 비용과 idle 상태로 유지하는 비용의 trade-off를 잘 생각해서 설정하면 좋습니다.

WaitForTasksToCompleteOnShutdown

시스템을 종료(shutdown)할 때 queue에 남아있는 작업을 모두 완료한 후 종료 하도록 처리합니다.

AwaitTerminationSeconds

시스템을 종료(shutdown)할 때 queue에 남아있는 작업을 모두 완료한 후 종료 하도록 처리하거나 타임아웃을 지정해서 해당 시간이 넘으면 강제 종료합니다.

TaskDecorator

thread-pool로 작업(task)를 시작하기 직전에 진행하는 작업을 추가할 수 있도록 하는 interface입니다. 해당 interface를 구현하여 thread간 traceid를 복사하는 등의 작업을 할 수 있습니다.

RejectedExecutionHandler

RejectedExecutionHandler는 ThreadPoolExecutor에서 task를 더 이상 받을 수 없을 때 호출됩니다. 이런 경우는 queue 허용치를 초과하거나 Executor가 종료되어 Thread 또는 큐를 사용할 수 없는 경우에 발생하게 됩니다.

  • 지정할 수 있는 옵션(ThreadPoolExecutor)
    • AbortPolicy(Default) : Reject된 작업이 RejectedExecutionException을 던집니다
    • CallerRunsPolicy : 호출한 Thread에서 reject된 작업을 대신 실행합니다
    • DiscardPolicy : Exception 없이 Reject된 작업을 버립니다.
    • DiscardOldestPolicy : queue에서 가장 오래되고 처리되지 않은 요청을 삭제하고 다시 시도합니다.

기본 값

그렇다면 @Async를 튜닝 없이 기본값으로 사용하면 어떤 설정으로 돌아가게 되는건지 확인해보도록 하겠습니다. 각 설정의 기본값은 아래와 같습니다.

  • corePoolSize : 1
  • maxPoolSize : Integer.MAX_VALUE
  • keepAliveSeconds : 60(second)
  • QueueCapacity : Integer.MAX_VALUE
  • AllowCoreThreadTimeOut : false
  • WaitForTasksToCompleteOnShutdown : false
  • RejectedExecutionHandler : AbortPolicy

기본값을 보았을 때, corePoolSize가 1이기 때문에 1개의 스레드로 들어오는 모든 일을 처리합니다. 이때문에 요청이 많이 들어오게 되면 처리의 지연이 발생할 수 있습니다. 그리고 더더욱 많은 요청이 들어와서 처리할 수 없는 수준이 된다면 RejectedExecutionHandler 값이 AbortPolicy이기 때문에 Exception을 발생시키며 종료되게 됩니다. 위 설정을 잘 보시고 각자의 환경에 맞게 튜닝하도록 합시다.

traceId 추가하기

만약 로깅에 traceId를 추가하여 트레이싱을 하고 있으셨다면 MDC에 넣어서 사용하고 있으셨을 것 같습니다. 이럴경우 ThreadLocal을 저장소로 하여 전달하기 때문에 비동기를 사용하는 경우 traceId가 함께 전달되지 않는 문제가 있습니다. 이러한 부분에서 TaskDecorator를 커스터마이징하여 사용할 수 있습니다. 따라서 그런 경우 아래 코드 처럼 사용하시면 traceId를 이어 붙여줄 수 있습니다.

class MDCTaskDecorator : TaskDecorator {
  override fun decorate(runnable: Runnable): Runnable {
      val contextMap: Map<String, String> = MDC.getCopyOfContextMap() // main thread block

      return Runnable { // async thread block
          try {
              MDC.setContextMap(contextMap)
              runnable.run()
          } finally {
              MDC.clear()
          }
      }
  }
}

커스텀 ThreadPoolTaskExecutor 생성하기

위 처럼 알아본 설정들은 아래의 코드처럼 사용하여 bean을 생성할 수 있습니다.

@Bean
fun cloudEventExecutor(): ThreadPoolTaskExecutor {
  return ThreadPoolTaskExecutor().apply {
      this.corePoolSize = 10
      this.maxPoolSize = 20
      setThreadNamePrefix("cloud-event-exec-")
      setQueueCapacity(500)
      setTaskDecorator(MDCTaskDecorator()) 
      setRejectedExecutionHandler(ThreadPoolExecutor.CallerRunsPolicy())
      initialize()
  }
}

메트릭 추가하기

그리고 추가적으로 프로메테우스를 통해서 메트릭을 측정할 수도 있습니다. 아래의 코드로 현재 실행되는 thread 및 한가한(idle) thread에 대해서 모니터링 하는것이 가능합니다. 아래 코드를 이용하기 위해서는 프로메테우스에 대한 의존성이 필요합니다.

@Bean
fun cloudExecutorWithMetric(registry: MeterRegistry): Executor {
  return ExecutorServiceMetrics.monitor(
      registry,
      cloudEventExecutor(),
      "cloud-event-thread-pool",
      Tags.empty()
  )
}

적용하기

이렇게 커스터마이징한 thread-pool은 @Async 어노테이션의 value를 이용해서 쉽게 적용할 수 있습니다.

@Async(value = "cloudEventExecutor") // cloudExecutorWithMetric로 하면 메트릭까지 추가
fun sendCloudMessage(event: Event) {
  fireBaseSuspendApi.sendCloudMessage(id = event.id);
}

마무리

오늘은 이렇게 Spring에서 @Async를 환경에 맞게 튜닝하여 적절한 리소스를 이용하여 비동기 프로그래밍이 가능하게하는 방법에 대해서 알아보았습니다.

감사합니다.

참조

Async JavaDoc

docs_spring_ThreadPoolTaskExecutor

반응형

댓글