본문 바로가기

프로그래밍(TA, AA)/JVM 언어

[자바] Executor 프레임워크


작업(Task)은 논리적인 업무의 단위이며, 스레드는 특정 작업을 비동기적으로 동작시킬 수 있는 방법을 제공합니다. 순차적인 방법은 응답 속도와 전체적인 성능이 크게 떨어지는 문제점이 있고, 작업별로 스레드를 만들어내는 방법은 자원관리 측면에서 허점이 있습니다.


자바 클래스 라이브러리에서 작업을 실행하고자 할 때는 Thread보다 Executor가 훨씬 추상화가 잘되어 있으며 사용하기 좋습니다.

public interface Executor {
	void execute(Runnable command);
}


Executor는 굉장히 단순한 인터페이스로 보이지만, 아주 다양한 여러 가지 종류의 작업 실행 정책을 지원하는 유연하면서도 강력한 비동기적 작업 실행 프레임워크의 근간을 이루는 인터페이스입니다. Executor는 작업 등록과 작업 실행을 분리하는 표준적인 방법이며, 각 작업은 Runnable의 형태로 정의합니다. Executor 인터페이스를 구현한 클래스는 작업의 라이프 사이클을 관리하는 기능도 갖고 있고, 몇 가지 통계 값을 뽑아내거나 또는 애플리케이션에서 작업 실행 과정을 관리하고 모니터링하기 위한 기능도 갖고 있습니다.


Executor의 구조는 프로듀서-컨슈머 패턴에 기반하고 있으며, 작업을 생성해 등록하는 클래스가 프로듀서(처리해야 할 작업을 생성하는 주체)가 되고 작업을 실제로 실행하는 스레드가 컨슈머(생성된 작업을 처리하는 주체)가 되는 모양을 갖추고 있습니다. 일반적으로 프로듀서-컨슈머 패턴을 애플리케이션에 적용해 구현할 수 있는 가장 쉬운 방법이 바로 Executor 프레임워크를 사용하는 방법입니다.



Executor를 사용한 웹서버


웹서버를 구현할 때 Executor를 적용하면 작업이 굉장히 간단해집니다. 아래 TaskExecutorWebServer 클래스를 보면 스레드를 직접 생성하던 부분에 Executor를 사용하도록 변경했습니다. 여기에서는 몇가지 표준 Executor 가운데 100개의 고정된 스레드를 확보하는 풀을 사용했습니다.


TaskExecutionWebServer 클래스를 보면 요청 처리 작업을 등록하는 부분과 실제로 처리 기능을 실행하는 부분이 Executor를 사이에 두고 분리되어 있고, Executor를 다른 방법으로 구현한 클래스를 사용하면 비슷한 기능에 다른 특성으로 동작하도록 손쉽게 변경할 수 있습니다. 스레드를 직접 생성하도록 구현되어 있는 상태에서는 서버의 동작 특성을 쉽게 변경할 수 없었지만, Executor를 사용하면 Executor의 설정을 변경하는 것만으로 쉽게 변경됩니다. Executor에 필요한 설정은 대부분 초기에 한 번 지정하는 것이 보통이며 처음 실행하는 시점에 설정 값을 지정하는 편이 좋습니다. 하지만 Executor를 사용해 작업을 등록하는 코드는 전체 프로그램의 여기저기에 퍼져있는 경우가 많기 때문에 한눈에 보기가 어렵습니다.

class TaskExecutionWebServer {
	private static final int NTHREADS = 100;
	private static final Executor exec
		= Executors.newFixedThreadPool(NTHREADS);

	public static void main(String[] args) throws IOException {
		ServerSocker socker = new ServerSocker(80);
		while(true) {
			final Socket connection = socket.accept();
			Runnable task = new Runnable() {
				public void run() {
					handleRequest(connection);
				}
			};
			exec.execute(task);
		}
	}
}


TaskExecutionWebServer의 구조를 그대로 유지하면서 들어오는 요청마다 새로운 스레드를 생성해 실행하도록 변경할 수 있을까요? 이처럼 Executor를 상속받아 또 다른 모양으로 동작하는 클래스를 쉽게 구현할 수 있습니다. 아래 예제와 같이 ThreadPerTaskExecutor를 구현해 적용하면 TaskExecutionWebServer를 간단하게 ThreadPerTaskWebServer로 변경할 수 있습니다.

public class ThreadPerTaskExecutor implements Executor {
	public void execute(Runnable r) {
		new Thread(r).start();
	};
}


이와 유사하게 TaskExecutionWebServer가 작업을 순차적으로 처리하도록 만드는 일도 아주 간단합니다. WithinThreadExecutor와 같이 execute 메소드 안에서 요청에 대한 처리 작업을 모두 실행하고, 처리가 끝나면 executor에서 리턴되도록 구현하면 됩니다.

public class WithinThreadExecutor implements Executor {
	public void execute(Runnable r) {
		r.run();
	};
}


실행 정책


작업을 등록하는 부분과 실행하는 부분을 서로 분리시켜두면 특정 작업을 실행하고자 할 때 코드를 많이 변경하거나 기타 여러 가지 어려운 상황에 맞닥뜨리지 않으면서도 execution policy을 언제든지 쉽게 변경할 수 있다는 장점이 있습니다.


 - 작업을 어느 스레드에서 실행할 것인가?

 - 작업을 어떤 순서로 실행할 것인가? (FIFO, LIFO, 기타 다양한 우선순위 정책)

 - 동시에 몇 개의 작업을 병렬로 실행할 것인가?

 - 최대 몇 개까지의 작업이 큐에서 실행을 대기할 수 있게 할 것인가?

 - 시스템에 부하가 많이 걸려서 작업을 거절해야 하는 경우, 어떤 작업을 희생양으로 삼아야할 것이며, 작업을 요청한 프로그램에 어떻게 알려야 할 것인가?

 - 작업을 실행하기 직전이나 실행한 직후에 어떤 동작이 있어야 하는가?


실행 정책은 일종의 자원 관리 도구라고도 할 수 있습니다. 가장 최적화된 실행 정책을 찾으려면 하드웨어나 소프트웨어적인 자원을 얼마나 확보할 수 있는지 확인해야 하고, 더불어 애플리케이션의 성능과 반응 속도가 요구사항에 얼마만큼 명시되어 있는지도 알아야 합니다. 병렬로 실행되는 스레드의 수를 제한한다면 아마도 애프릴케이션에서 자원이 모자라는 상황에 다다르거나 제한된 자원을 서로 사용하기 위해 각 작업이 경쟁하느라 애플리케이션의 성능이 떨어지는 일은 별로 보기 어려울 것입니다. 실행 정책과 작업 등록 부분을 명확하게 분리시켜 두면 애플리케이션을 실제 상황에 적용하려 할 때 설치할 하드웨어나 기타 자원의 양에 따라 적절한 실행 정책을 임의로 지정할 수 있습니다.


프로그램 어디에서든 간에 new Thread(runnable).start() 와 같은 코드가 남아 있다면 조만간 이런 부분에 유연한 실행 정책을 적용할 준비를 해야할 것이며, 나중을 위해서 Executor를 사용해 구현하는 방안을 심각하게 고려해봐야 합니다.



스레드 풀


스레드 풀은 이름 그대로 작업을 처리할 수 있는 동일한 형태의 스레드를 풀의 형태로 관리합니다. 그리고 일반적으로 스레드 풀은 풀 내부의 스레드로 처리할 작업을 쌓아둬야 하기 때문에 작업 큐(work queue)와 굉장히 밀접한 관련이 있습니다. 작업 스레드는 아주 간단한 주기로 동작하는데, 먼저 작업 큐에서 실행할 다음 작업을 가져오고, 작업을 실행하고, 가져와 실행할 다음 작업이 나타날 때까지 대기하는 일을 반복합니다. 


풀 내부의 스레드를 사용해 작업을 실행하는 방법을 사용하면, 작업별로 매번 스레드를 생성해 처리하는 방법보다 굉장히 많은 장점이 있습니다. 매번 스레드를 생성하는 대신 이전에 사용했던 스레드를 재상요하기 때문에 스레드를 계속해서 생성할 필요가 없고, 따라서 여러 개의 요청을 처리하는 데 필요한 시스템 자원이 줄어드는 효과가 있습니다. 더군다나 클라이언트가 요청을 보냈을 때 해당 요청을 처리할 스레드가 이미 만들어진 상태로 대기하고 있기 때문에 작업을 실행하는 데 딜레이가 발생하지 않아 전체적인 반응 속도도 향상됩니다. 스레드 풀의 크기를 적절히 조절해두면 하드웨어 프로세서가 쉬지 않고 동작하도록할 수 있으며, 하드웨어  프로세서가 바쁘게 동작하는 와중에도 메모리를 전부 소모하거나 여러 스레드가 한정된 자원을 두고 서로 경쟁하느라 성능을 까먹는 현상도 없앨 수 있습니다.


자바 클래스 라이브러리에서는 흔히 사용하는 여러 가지 설정 상태에 맞춰 몇 가지 종류의 스레드 풀을 제공하고 있습니다. 미리 정의되어 있는 스레드 풀을 사용하려면 Executors 클래스에 만들어져 있는 다음과 같은 메소드를 호출하면 됩니다.


newFixedThreadPool

처리할 작업이 등록되면 그에 따라 실제 작업할 스레드를 하나씩 생성한다. 생성할 수 있는 스레드의 최대 개수는 제한되어 있으며 제한된 개수까지 스레드를 생성하고 나면 더이상 생성하지 않고 스레드수를 유지한다. (만약 스레드가 작업하는 도중에 예상치 못한 예외가 발생해서 스레드가 종료되거나 하면 하나씩 더 생성하기도 한다)


newCachedThreadPool

캐시 스레드 풀은 현재 풀에 갖고 있는 스레드의 수가 처리할 작업의 수보다 많아서 쉬는 스레드가 많이 발생할 때 쉬는 스레드를 종료시켜 훨씬 유연하게 대응할 수 있으며, 처리할 작업의 수가 많아지면 필요한 만큼 스레드를 새로 생성한다. 반면에 스레드의 수에는 제한을 두지 않는다.


newSingleThreadExecutor

단일 스레드로 동작하는 Executor로서 작업을 처리하는 스레드가 단 하나 뿐이다. 만약 작업중에 Exception이 발생해 비정상적으로 종료되면 새로운 스레드를 하나 생성해 나머지 작업을 실행한다. 등록된 작업은 설정된 큐에서 지정하는 순서(FIFO, LIFO, 우선순위)에 따라 반드시 순차적으로 처리된다.


단일 스레드로 실행되는 Executor 역시 내부적으로 충분한 동기화 기법을 적용하고 있으며, 따라서 특정 작업이 진행되는 동안 메모리에 남겨진 기록을 다음에 실행되는 작업에서 가져다 사용할 수 있다. 이것은 여러 작업이 모두 하나의 스레드에 제한된 상태로 실행되기 때문에, 간혹 작업 실행 스레드가 종료되어 새로운 스레드를 만들어 실행하는 경우에도 똑같이 적용된다.


newScheduledThreadPool

일정 시간 이후에 실행하거나 주기적으로 작업을 실행할 수 있으며, 스레드의 수가 고정되어 있는 형태의 Executor.Timer 클래스의 기능과 유사하다.


newFixedThreadPool과 newCachedThreadPool 팩토리 메소드는 일반화된 형태로 구현되어 있는 ThreadPoolExecutor 클래스의 인스턴스를 생성합니다. 생성된 ThreadPoolExecutor 인스턴스에 설정 값을 조절해 필요한 형태를 갖추고 사용할 수도 있습니다.


TaskExecutionWebServer 예제에서 구현했던 웹서버는 제한된 개수의 스레드로 동작하는 Executor를 사용했었습니다. 처리할 작업을 execute 메소드로 등록해 두면 Executor 내부의 큐에 쌓이고, Executor 내부의 풀에 있는 스레드가 큐에 쌓여 있는 작업을 하나씩 뽑아내 처리하게 되어 있습니다.


이처럼 작업별로 스레드를 생성하는 전략(thread-per-task)에서 풀을 기반으로 하는 전략으로 변경하면 안정성 측면에서 엄청난 장점을 얻을 수 있습니다. 바로 웹서버에 부하가 걸리더라도 더이상 메모리가 부족해 죽는 일이 발생하지 않는다는 점입니다.


여기에서 죽지 않는다는 것은 스레드를 계속 생성하느라 메모리가 모자라 죽는 경우를 말합니다. 물론 요청을 처리하는 속도보다 요청이 새로 추가되는 속도가 더 빨라 처리하지 못하고 쌓여있느 ㄴ작업의 수가 계속해서 늘어난다면 여전히 메모리가 모자라 웹서버가 다운될 가능성이 있습니다. 이렇게 작업이 적체될 수 있는 문제점은 작업 큐의 크기를 제한하는 것으로 해결이 가능합니다.


더군다나 부하에 따라 수천 개의 스레드를 생성해 제한된 양의 CPU와 메모리 자원을 서로 사용하려고 경쟁시키는 상황에 이르지 않기 때문에 성능이 떨어질 때도 점진적으로 서서히 떨어지는 특징을 갖습니다. 또한 Executor를 사용하면 사용하지 않을 때보다 성능을 튜닝하거나, 실행 과정을 관리하거나, 실행 상태를 모니터링하거나, 실행 기록을 로그로 남기거나, 오류가 발생했을 때 처리하고자 할 때 여러 가지 방법을 동원해 쉽고 효과적으로 처리하기가 좋습니다.



Executor 동작 주기


Executor를 구현하는 클래스는 대부분을 작업을 처리하기 위한 스레드를 생성하도록 되어있습니다. 하지만 JVM은 모든 스레드가 종료되기 전에는 종료하지 않고 대기하기 때문에 Executor를 제대로 종료시키지 않으면 JVM 자체가 종료되지 않고 대기하기도 합니다. 


Executor는 작업을 비동기적으로 실행하기 때문에 앞서 실행시켰던 작업의 상태를 특정 시점에 정확하게 파악하기 어렵습니다. 어떤 작업은 이미 완료됐을 수도 있고, 또 몇개의 작업은 아직 실행 중일 수 있고, 또 다른 작업은 아직 큐에서 대기 상태에 머물러 있을 수도 있습니다.


애플리케이션을 종료하는 과정을 보면 안전한 종료 방법(graceful, 작업을 새로 등록하지는 못하고 시작된 모든 작업을 끝낼때까지 기다림)이 있겠고, 또 한편으로는 강제적인 종료(abrupt, 예를 들어 플러그가 빠져 전원인 꺼지는 경우) 방법이 있겠습니다. 


물론 안전한 종료 방법과 강제 종료 사이에 위치시킬수 있는 여러 가지 종료 방법이 있습니다. Executor가 애플리케이션에 스레드 풀 등의 서비스를 제공한다는 관점으로 생각해본다면, Executor 역시 안전한 방법이건 강제적인 방법이건 종료 절차를 밟아야할 필요가 있습니다. 그리고 종료 절차를 밟는 동안 실행중이거나 대기 중이던 작업을 어떻게 철했는지 작업을 맡겼던 애플리케이션에게 알려줄 의무가 있습니다.


이처럼 서비스를 실행하는 동작 주기와 관련해 Executor를 상속받은 ExecutorService 인터페이스에는 동작 주기를 관리할 수 있는 여러 가지 메소드가 추가되어 있습니다. ExecutorService 인터페이스의 동작 주기 관리 관련 메소드는 아래와 같습니다. 

public interface ExecutorService extends Executor {
	void shutdown();
	List<Runnable> shutdownNow();
	boolean isShutdown();
	boolean isTerminated();
	boolean awaitTermination(long timeout, TimeUnit unit)
		throws InterruptedException;
	// ... 작업을 등록할 수 있는 몇가지 추가 메소드
}

내부적으로 ExecutorService가 갖고 있는 동작 주기에는 실행 중(runnint), 종료중(shutting down), 종료(terminated)의 세 가지 상태가 있습니다. ExecutorService를 처음 생성했을 때에는 실행 중 상태로 동작합니다. 어느 시점엔가 shutdown 메소드를 실행하면 안전한 종료절차를 진행하며 종료중 상태로 들어갑니다. 이 상태에서는 새로우 작업을 등록받지 않으며, 이전에 등록되어 있던 작업까지는 모두 끝마칠 수 있습니다. shutdownNow 메소드를 실행하면 강제 종료 절차를 진행합니다. 현재 진행 중인 작업도 가능한 한 취소시키고, 실행되지 않고 대기 중이던 작업은 더이상 실행시키지 않습니다.


ExecutorService의 하위 클래스인 ThreadPoolExecutor는 이미 종료 절차가 시작되거나 종료된 이후에 새로운 작업을 등록하려 하면 실행 거절 핸들러를 통해 오류로 처리합니다. 실행 거절 핸들러에 따라 다르지만 등록하려했던 작업을 조용히 무시할 수도 있고, RejectedExecutionException을 발생시켜 오류로 처리하도록 할 수도 있습니다. 종료 절차가 시작된 이후 실행 중기거나 대기 중이던 작업을 모두 끝내고 나면 ExecutorService는 종료 상태로 들어갑니다. ExecutorService가 종료 상태로 들어갈 때까지 기다리고자 한다면 awaitTermination 메소드로 대기할 수 있고, isTerminated 메소드를 주기적으로 호출해 종료 상태로 들어갔는지 확인할 수도 있습니다. 


일반적으로 shutdown 메소드를 실행한 이후 바로 awaitTermination을 실행하면 마치 ExecutorService를 직접 종료시키는 것과 비슷한 효과를 얻을 수 있습니다.


LifecycleWebServer 클래스에는 앞서 작업했던 웹서버에 동작 주기에 대한 지원 부분을 추가했습니다. LifecycleWebServer는 두가지 방법으로 종료시킬 수 있는데, 첫번째는 stop 메소드를 호출하는 방법이고, 두번째는 클라이언트 측에서 특정한 형태의 HTTP 요청을 전송하는 방법입니다.

class LifecycleWebServer {
	private final ExecutorService exec = ...;

	public void start() throws IOException {
		ServerSocket socket = new ServerSocket(80);
		while (!exec.isShutdown()) {
			try {
				final Socket conn = socket.accept();
				exec.execute(new Runnable() { public void run()
					{ handleRequest(conn); }
				});
			} catch (RejectedExecutionException e) {
				if (!exec.isShutdown()) { log("task submission rejected", e); }

			}
		}
	}

	public void stop() { exec.shutdown(); }

	void handleRequest(Socket connection) {
		Request req = readRequest(connection);
		if (isShutdownRequest(req))
			stop();
		else
			dispatchRequest(req);
	}
}