Java Executor
Executor
public interface Executor {
void execute(Runnable command);
}
- 제출된 Runnable 작업을 실행하는 객체이다.
- 태스크 제출과 각 태스크가 어떻게 실행될 것인지의 메커니즘을 분리하는 방법을 제공하며, 이는 스레드 사용, 스케줄링 등의 세부 사항을 포함한다.
- Executor는 일반적으로 명시적으로 스레드를 생성하는 대신 사용된다. 보통 스레드 풀을 사용하는 구현체가 많다.
- Executor 인터페이스는 실행이 반드시 비동기라는 것을 요구하지 않는다.
- Executor는 제출된 태스크를 즉시 호출자 스레드에서 실행할 수도 있다.
- Execuotr에서는 보통 태스크는 호출자의 스레드가 아닌 다른 스레드에서 실행된다.
- 많은 Executor 구현체는 태스크가 어떻게 예약되고 언제 예약되어야 하는지에 대한 어떠한 제약을 부과한다.
- java.util.concurrent 패키지에서 제공되는 Executor 구현체들은 보다 확장된 인터페이스인 ExecutorService를 구현한다.
ExecutorService
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
- 작업 종료를 관리하고 하나 이상의 비동기 작업 진행을 추적하기 위한 Future를 생성할 수 있는 Executor다.
- ExecutorService는 종료될 수 있으며, 종료하면 새 작업을 거부한다.
- ExecutorService를 종료하는 두 가지 다른 방법이 제공된다.
- shutdown 메서드는 이전에 제출된 작업이 종료되기 전에 종료를 허용한다.
- shutdownNow 메서드는 대기 중인 작업이 시작되지 않도록 하고 현재 실행 중인 작업을 중단하려 시도한다.
- 종료 후에는 Executor에 실행 중인 작업이 없으며, 실행 대기 중인 작업도 없으며 새 작업을 제출할 수 없다.
- 사용되지 않는 ExecutorService는 리소스를 회수하기 위해 종료되어야 한다.
- submit 메서드는 Executor.execute(Runnable)의 기본 메서드를 확장하여 실행을 취소하고 또는 완료를 기다리기 위해 사용할 수 있는 Future를 생성하고 반환한다.
- invokeAny 및 invokeAll 메서드는 대부분 유용한 형태의 대량 실행을 수행하며, 작업 컬렉션을 실행한 다음 적어도 하나 또는 모두가 완료될 때까지 대기한다.
ThreadPoolExecutor
스프링 부트를 사용하면 자동 구성을 통해 ThreadPoolTaskExecutor를 스프링 빈으로 등록해준다.
ThreadPoolTaskExecutor가 내부적으로 사용하는 것이 바로 ThreadPoolExecutor이다.
이 클래스는 ExecutorService의 구현 클래스다.
아래와 같은 이미지 구조가 ThreadPoolExecutor라고 생각한다.
태스크가 BlockingQueue에서 대기하고 쓰레드 풀의 쓰레드가 태스크를 가져가서 이를 수행한다.
중요한 설정 변수 몇 가지만 알아보겠다.
- 스레드 풀은 항상 내부에 유지되는 고정된 개수의 코어 스레드로 구성된다. 또한 생성된 후 더 이상 필요하지 않을 때 종료될 수 있는 일부 excessive 스레드로 구성된다.
- corePoolSize 매개변수는 인스턴스화되어 풀 에 보관될 코어 스레드 수이다.
- 새 작업이 들어올 때 모든 코어 스레드가 사용 중이고 내부 큐가 가득 차면 풀은 maximumPoolSize 까지 커질 수 있다 .
- keepAliveTime 매개변수는 excessive 스레드(corePoolSize를 초과하여 인스턴스화됨)가 유휴 상태에 존재할 수 있도록 허용되는 시간 간격이다.
- 기본적으로 ThreadPoolExecutor는 제거할 코어가 아닌 스레드만 고려한다.
- 코어 스레드에 동일한 제거 정책을 적용하기 위해 AllowCoreThreadTimeOut(true) 메소드를 사용할 수 있습니다.
Queuing
모든 BlockingQueue는 제출된 작업을 전달하고 유지하는 데 사용될 수 있다. 이 큐의 사용은 풀 크기와 상호 작용한다.
- 만약 corePoolSize 스레드보다 적은 스레드가 실행 중이라면, 실행자는 큐에 넣는 대신 새 스레드를 추가하는 것을 항상 선호한다.
- 만약 corePoolSize 이상의 스레드가 실행 중이라면, 실행자는 새 요청을 큐에 넣는 것을 항상 선호한다.
- 요청을 큐에 넣을 수 없는 경우, 대부분의 경우 새 스레드가 생성된다. 다만 이로 인해 maximumPoolSize를 초과하는 경우에는 작업이 거부된다.
직접 전달
- 작업 큐로서 좋은 디폴트 선택은 작업을 보유하지 않고 스레드에 직접 전달하는 SynchronousQueue다.
- 여기서 작업을 큐에 넣으려고 시도하면 즉시 실행할 수 있는 스레드가 없는 경우 작업을 실행할 새로운 스레드가 생성된다.
- 이 정책은 내부 종속성을 가질 수 있는 요청 세트를 처리할 때 데드락을 피할 수 있다.
- 직접 전달은 일반적으로 새로 제출된 작업을 거부하지 않도록 무제한 maximumPoolSize를 필요로 한다.
- 그 결과로 명령이 평균적으로 처리되는 속도보다 빨리 도착하면 무제한 스레드 성장 가능성을 허용한다.
무제한 큐
- 미리 정의된 용량이 없는 LinkedBlockingQueue와 같은 무제한 큐를 사용하면 모든 corePoolSize 스레드가 바쁠 때 새로운 작업이 대기열에서 대기하게 된다.
- 따라서 corePoolSize 이상의 스레드가 생성되지 않는다. (그리고 maximumPoolSize의 값은 아무런 영향을 미치지 않는다.)
- 이 방식은 각 작업이 다른 작업과 완전히 독립적이어서 작업이 서로 실행에 영향을 주지 않는 경우에 적합할 수 있다.
- 예를 들어 웹 페이지 서버에서 사용될 수 있다.
- 이런 종류의 대기열은 일시적인 요청 폭증을 완화하는 데 유용할 수 있지만, 명령이 평균적으로 처리되는 속도보다 빨리 도착하면 무제한 작업 대기열 성장 가능성을 허용한다.
제한된 큐
- ArrayBlockingQueue와 같은 제한된 큐를 사용하면 한정된 maximumPoolSize와 함께 사용될 때 자원 고갈을 방지하는 데 도움이 된다.
- 큐 크기와 최대 풀 크기를 서로 교환할 수 있다.
- 큰 큐와 작은 풀을 사용하면 CPU 사용량, OS 자원 및 컨텍스트 전환 오버헤드를 최소화하지만 인위적으로 낮은 처리량으로 이어질 수 있다.
- 작업이 자주 블록되는 경우 (예: I/O 바운드 작업), 시스템은 일반적으로 허용하는 스레드보다 더 많은 스레드에 대한 스케줄을 할 수 있다.
- 작은 큐를 사용하는 경우 일반적으로 더 큰 풀 크기가 필요하며, 이는 CPU를 더 바쁘게 유지하지만 수용 불가능한 스케줄링 오버헤드를 겪을 수 있어 처리량을 줄일 수 있다.
참고
스레드 풀 패턴은 다중 스레드 애플리케이션에서 리소스를 절약하고 미리 정의된 특정 제한 내에서 병렬 처리를 포함하는 데 도움이 된다.
작업 대기 시간을 최소화하려면 스레드 풀을 생성할 때 코어 스레드를 미리 시작하는 것이 좋다.
Spring TaskExecutor
독립적인 스레드 안에서 동작하도록 만들어진 오브젝트를 독립적으로 실행 가능한 작업이라는 의미로 태스크라고 부른다.
스프링은 이런 태스크를 다양한 방법으로 실행하도록 만들어진 오브젝트 특징을 추상화한 TaskExecutor라는 인터페이스를 제공한다.
@FunctionalInterface
public interface TaskExecutor extends Executor {
@Override
void execute(Runnable task);
}
스프링의 TaskExecutor 인터페이스는 java.lang.concurrent 패키지의 Executor과 똑같은 메서드를 가지고 있다.
자바 5의 Executor는 바로 TaskExecutor와 동일한 목적으로 만들어진 인터페이스다.
그럼에도 스프링이 똑같은 메서드를 가진 TaskExecutor를 가진 이유는 JDK의 Executor를 구현하지 않은 CommonJ WorkManager나 Quartz 같은 기술의 태스크 실행기에 대한 어댑터를 제공하는, 좀 더 폭넓은 서비스 추상화를 위해서다.
또한 스프링에 최적화된 방식으로 태스크 실행기를 확장하고 활용하는 독자적인 태스크 실행 기능을 제공하기 위해서이기도 하다.
사실 TaskExecutor는 Executor 타입이기도 하다.
JDK의 Executor를 사용하는 기능에서 스프링의 TaskExecutor를 이용해 만들어진 오브젝트를 사용하기 위해서다.
Executor도 마찬가지이지만, TaskExecutor도 그 자체로 독립적인 스레드에서 태스크를 실행해야 할 의무는 없다.
그럼에도 대부분의 TaskExecutor가 추상화한 기술은 비동기적으로 독립적인 스레드에서 실행되며, 주로 스레드 풀을 사용하는 방식을 사용한다.
ThreadPoolTaskExecutor
이 클래스는 ThreadPoolExecutor를 빈 스타일로 설정할 수 있게 해주며 Spring의 org.springframework.core.task.TaskExecutor로 노출시킬 수 있는 기능을 제공한다.
또한 스프링 부트를 사용하면 기본적으로 스프링 컨테이너에 등록되는 TaskExecutor 타입 스프링빈이다.
스프링 애플리케이션이 동작하는 자바 엔터프라이즈 환경은 제한된 크기의 스레드 풀을 사용한다고 하더라도 비동기 작업을 함부로 적용하는 건 위험하다. 비동기 작업의 특징과 서버가 받게 될 영향 등을 면밀히 파악하고 조심스럽게 적용해야 한다.
@Async와 Executor
서블릿 기반 스프링 MVC에서 @Async 애노테이션을 스프링 빈 객체 메소드에 작성하면 해당 메소드가 별도의 스레드에서 실행된다.
@Async를 사용하려면 @EnableAsync 주석을 @Configuration 클래스에 작성하면 된다.
@EnableAsync를 사용하면 기본적으로 Spring은 관련된 스레드 풀 정의를 스프링 컨테이너에서 검색한다.
- 컨텍스트 내에서 고유한 TaskExecutor 빈이거나 그렇지 않으면 "taskExecutor"라는 이름의 Executor 빈을 찾는다.
둘 중 어느 것도 찾을 수 없다면, 비동기 메서드 호출을 처리하기 위해 SimpleAsyncTaskExecutor가 사용된다.
- SimpleAsyncTaskExecutor: 이 구현 클래스는 스레드를 재사용하지 않는다. 호출할 때마다 새 스레드를 생성한다.
@Async이 붙은 클래스에서 Thread.currentThread().getName()를 출력하니 task-1이 출력됐다.
이는 SimpleAsyncTaskExecutor를 사용한다는 증거이다.
따라서 사용자가 직접 ThreadPoolTaskExecutor를 정의해주는 것이 좋다.
void 반환 유형을 갖는 주석이 달린 메서드는 호출자에게 예외를 전달할 수 없다.
기본적으로 이러한 처리되지 않은 예외는 로깅되기만 한다.
이 모든 것을 커스텀하려면 AsyncConfigurer를 작성해야 한다.
- AsyncConfigurer 구성 클래스는 응용 프로그램 컨텍스트 부트스트랩 초기에 초기화된다.
- 다른 빈에 대한 종속성이 필요한 경우 가능한 한 "lazy"로 선언하여 다른 후처리기를 통해 통과할 수 있도록 해야 한다.
@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {
//getAsyncExecutor() 메서드를 통해 사용자 정의 Executor.
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(7);
executor.setMaxPoolSize(42);
executor.setQueueCapacity(11);
executor.setThreadNamePrefix("MyExecutor-");
executor.initialize();
return executor;
}
//getAsyncUncaughtExceptionHandler() 메서드를 통해 사용자 정의 AsyncUncaughtExceptionHandler.
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new MyAsyncUncaughtExceptionHandler();
}
}
의문점
의문점이 있다.
분명히 스프링 부트가 자동 구성으로 인해 ThreadPoolTaskExecutor를 빈으로 등록해준다.
이 클래스의 맨 조상은 Executor이다. 그런데 왜 @Async를 사용하면 ThreadPoolTaskExecutor를 사용하지 않을까..?
일단 본인은 AsyncConfigurer이 부트스트랩 당시에 구동되므로 null이 리턴되면서 그냥 SimpleAsyncTaskExecutor가 디폴트로 쓰인다고 생각하고 있다.
추후에 더 자세히 살펴보자.
참고 자료
토비의 스프링 3.1 VOL 2
oracle java 공식 문서
https://www.baeldung.com/thread-pool-java-and-guava
댓글