본문 바로가기

프로그래밍(TA, AA)/JVM 언어

[자바] 스트림과 병렬 데이터 처리

스트림


거의 모든 자바 애플리케이션은 컬렉션을 만들고 활용합니다. 하지만 컬렉션으로 모든 문제가 해결되는 것은 아닙니다. 예를 들어 리스트에서 고가의 트랜잭션(Transaction)만 필터링한 다음에 통화로 결과를 그룹화해야 한다고 가정해보겠습니다. 다음 코드처럼 많은 기본 코드를 구현해야 합니다.

 

// 그룹화된 트랜잭션을 더할 Map 생성
Map<Currency, List<Transaction>> transactionsByCurrencies = new HashMap<>();
// 트랜잭션의 리스트를 반복
for (Transaction transcation : transactions) {
	// 고가의 트랜잭션을 필터링
	if (transaction.getPrice() > 1000) {
		// 트랜잭션의 통화 추출
		Currency currency = transaction.getCurrency();
		List<Transaction> transactionsForCurrency = transactionsByCurrencies.get(currency);
		// 현재 통화의 그룹화된 맵에 항목이 없으면 새로 만든다.
		if (transactionsForCurrency == null) {
			transactionsForCurrency = new ArrayList<>();
			transactionsByCurrencies.put(currency, transactionsForCurrency);
		}
		// 현재 탐색된 트랜잭션을 같은 통화의 트랜잭션 리스트에 추가한다.
		transactionsForCurrency.add(transaction);
	}
}

 

게다가 위 예제 코드에는 중첩된 제어 흐름 문장이 많아서 코드를 한번에 이해하기도 어렵습니다. 스트림 API를 이용하면 다음처럼 문제를 해결할 수 있습니다. 

import static java.util.stream.Collectors.toList; Map<Currency, List<Transaction>> transactionsByCurrencies = transactions.stream() .filter((Transaction t) -> t.getPrice() > 1000) .collection(groupingBy(Transaction::getCurrency));

 

스트림 API를 이용하면 컬렉션 API와는 상당히 다른 방식으로 데이터를 처리할 수 있습니다. 컬렉션에서는 반복 과정을 직접 처리해야 했습니다. 즉, for-each 루프를 이용해서 각 요소를 반복하면서 작업을 수행했습니다. 이런 방식의 반복을 외부 반복(external iteration) 이라고 합니다. 반면 스트림 API를 이용하면 루프를 신경 쓸 필요가 없습니다. 스트림 API에서는 라이브러리 내부에서 모든 데이터가 처리됩니다. 이와 같은 반복을 내부 반복(internal iteration)이라고 합니다.

 

컬렉션을 이용했을때 다른 문제도 생길 수 있습니다. 단일 CPU로는 거대한 데이터를 처리하기 힘들 것입니다. 따라서 서로 다른 CPU 코어에 작업을 각각 할당해서 처리 시간을 줄일 수 있다면 좋을 것입니다. 이론적으로 8개 코어를 가진 컴퓨터라면 8개 코어를 활용해서 병렬로 작업을 수행하여 단일 CPU 컴퓨터에 비해 8배 빨리 작업을 처리할 수 있습니다.

 

 

* 멀티 코어

최신 컴퓨터들을 모두 멀티코어를 장착했습니다. 즉, 단일 CPU가 아닌 넷 또는 여덟개 이상의 CPU(일반적으로 코어라 불리는)를 갖습니다. 즉, 단일 CPU가 아닌 넷 또는 여덟개 이상의 CPU를 갖습니다. 전통적인 자바 프로그램은 이렇게 많은 CPU 중 단 하나만 사용하면서 나머지 CPU를 낭비시키는 것이 문제였습니다. 많은 회사에서 컴퓨팅 클러스터(computing cluster: 고속 네트워크로 서로 연결된 컴퓨터)를이용해서 대량의 데이터를 효과적으로 처리합니다. 자바 8에서는 이런 컴퓨터를 더 잘 활용할 수 있는 새로운 프로그래밍 스타일을 제공합니다.

 

구글의 검색 엔진은 하나의 컴퓨터로는 수행할 수 없는 종류의 코드를 실행하는 좋은 예제입니다. 구글 검색 엔진은 인터넷의 모든 페이지를 읽어 인터넷 페이지에 등장한 모든 단어를 관련 URL과 매핑하는 인덱스를 만듭니다. 우리가 구글 검색에 몇가지 단어를 입력하면 소프트웨어는 만들어진 인덱스를 이용해서 입력한 단어를 포함하는 몇가지 웹페이지를 보여줍니다. 구글보다 작은 인덱스를 구성하더라도 컴퓨터의 모든 코어를 활용해야 할만큼 만만치 않은 작업입니다. 

 

 

스트림과 멀티스레딩

 

이전 자바 버전에서 제공하는 스레드 API로 멀티스레딩 코드를 구현해서 병렬성을 이용하는 것은 쉽지 않습니다. 멀티스레딩 환경에서 각각의 스레드는 동시에 공유된 데이터에 접근하고, 데이터를 갱신할 수 있습니다. 결과적으로 스레드를 잘 제어하지 못하면 원치 않는 방식으로 데이터가 바뀔 수 있습니다. 멀티스레딩 모델은 순차적인 모델보다 다루기 어렵습니다. 

 

전통적으로 멀티스레딩 환경에서는 synchronized를 자주 활용했습니다. 하지만 synchronized를 활용하더라도 많은 미묘한 버그가 발생할 수 있습니다. 자바 8에서는 synchronized가 필요치 않은 함수형 프로그래밍 형식의 스트림 기반 병렬성을 이용하도록 권고합니다. 자바 8에서는 데이터 접근 방법을 제어하는 것이 아니라 어떻게 데이터를 분할할지 고민하게 됩니다.

 

자바 8은 스트림 API(java.util.stream)로 '컬렉션을 처리하면서 발생하는 모호함과 반복적인 코드 문제' 그리고 '멀티코어 활용 어려움'이라는 두 가지 문제를 모두 해결했습니다. 기존의 컬렉션에서는 데이터를 처리할 때 반복되는 패턴이 너무 많았습니다. 따라서 라이브러리에서 이러한 반복되는 패턴을 제공한다면 좋을 것이라는 아이디어가 변화의 동기가 되었습니다.

 

즉, 자주 반복되는 패턴으로 주어진 조건에 따라 데이터를 필터링(filtering)하거나, 데이터를 추출(extracting)하거나, 데이터를 그룹화(grouping)하는 등의 기능이 있습니다. 또한 이러한 동작들을 쉽게 병렬화할 수 있다는 점도 변화의 동기가 되었습니다. 또한 이러한 동작들을 쉽게 병렬화할 수 있다는 점도 변화의 동기가 되었습니다.

 

예를 들어 두 CPU를 가진 환경에서 리스트를 필터링할 때 한 CPU는 리스트의 앞부분을 처리하고, 다른 CPU는 리스트의 뒷부분을 처리하도록 요청할 수 있습니다. 이 과정을 포킹 단계(forking step)라고 합니다. 그리고 각각의 CPU는 자신이 맡은 절반의 리스트를 처리합니다. 마지막으로 하나의 CPU가 두 결과를 정리합니다. (구글 검색도 이와 같은 방식으로 작동하면서 빠르게 검색 결과를 제공합니다. 물론 구글 검색은 두 개 이상의 프로세서를 사용합니다.)

 

지금은 새로운 스트림 API도 기존의 컬렉션 API와 아주 비슷한 방식으로 동작한다고(즉, 두 방식 모두 순차적인 데이터 항목 접근 방법을 제공) 간주할 것입니다. 다만 컬렉션은 어떻게 데이터를 저장하고 접근할지에 중점을 두는 반면, 스트림은 데이터에 어떤 계산을 할 것인지 묘사하는 것에 중점을 둔다는 점을 기억하면 좋습니다. 스트림은 스트림 내의 요소를 쉽게 병렬로 처리할 수 있는 환경을 제공한다는 것이 핵심입니다. 컬렉션을 필터링할 수 있는 가장 빠른 방법은 컬렉션을 스트림으로 바꾸고, 병렬로 처리한 다음에, 리스트로 다시 복원하는 것입니다. 

 

 

1) 순차 처리 방식의 코드

 

import static java.util.stream.Collectors.toList;
List<Apple> heavyApples =
	inventory.stream().filter((Apple a) -> a.getWeight() > 150)
	                  .collect(toList());

 

2) 병렬 처리 방식의 코드

 

import static java.util.stream.Collectors.toList;
List<Apple> heavyApples =
	inventory.parallelStream().filter((Apple a) -> a.getWeight() > 150)
	                          .collect(toList());

 

자바 8은 라이브러리에서 분할 처리를 합니다. 즉, 큰 스트림을 병렬로 처리할 수 있도록 작은 스트림으로 분할합니다. 또한 filter 같은 라이브러리 메서드로 전달된 메서드가 상호작용하지 않는다면 가변 공유 객체를 통해 병렬성을 누릴 수 있습니다. 함수형 프로그래밍에서 함수형이란 부가적으로 '프로그램이 실행되는 동안 컴포넌트 간에 상호작용이 일어나지 않는다'라는 의미도 포함합니다.

 

 

 

병렬 데이터 처리


외부 반복을 내부 반복으로 바꾸면 네이티브 자바 라이브러리가 스트림 요소의 처리를 제어할 수 있습니다. 컴퓨터의 멀티코어를 활용해서 파이프라인 연산을 실행할 수 있는 점이 가장 큰 장점입니다.

 

자바7 이전에는 데이터 컬렉션을 병렬로 처리하기가 어려웠습니다. 우선 데이터를 서브파트로 분할해야 합니다. 그리고 분할된 서브파트를 각각의 스레드로 할당합니다. 스레드로 할당한 다음에는 의도치 않은 Race Condition이 발생하지 않도록 적절한 동기화를 추가해야 하며, 마지막으로 부분결과를 합쳐야 합니다. 자바7은 더 쉽게 병렬화를 수행하면서 에러를 최소화할 수 있도록 fork/join framework 라는 기능을 제공합니다.

 

 

병렬 스트림

 

스트림 인터페이스를 이용하면 아주 간단하게 요소를 병렬로 처리할 수 있습니다. 컬렉션에 parallelStream을 호출하면 병렬 스트림(parallel stream)이 생성됩니다. 병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림입니다. 따라서 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리할 수 있도록 할당할 수 있습니다.

 

숫자 n을 인수로 받아서 1부터 n까지의 모든 숫자의 합계를 반환하는 메서드를 구현한다고 가정해보겠습니다. 숫자로 이루어진 무한 스트림을 만든다음에 인수로 주어진 크기로 스트림을 제한하고, 두 숫자를 더하는 BinaryOperator로 리듀싱 작업을 수행할 수 있습니다.

public static long sequentialSum(long n) {
	return Stream.iterate(1L, i -> i + 1) // 무한 자연수 스트림
			.limit(n) // n개 이하로 제한
			.reduce(0L, Long::sum); // 모든 숫자를 더하는 스트림 리듀싱 연산
}

 

전통적인 자바에서는 다음과 같이 반복문으로 구현이 가능합니다.

public static long iterativeSum(long n) {
	long result = 0;
	for (long i = 1L; i <= n; i++) {
		result += i;
	}
	return result;
}

 

특히 n이 커진다면 이 연산을 병렬로 처리하는 것이 좋을 것입니다. 병렬 스트림을 이용하면 이와 같은 문제를 쉽게 해결할 수 있습니다.

 

 

순차 스트림을 병렬 스트림으로 변환하기

 

순차 스트림에 parallel 메서드를 호출하면 기존의 함수형 리듀싱 연산이 병렬로 처리됩니다.

public static long parallelSum(long n) { return Stream.iterate(1L, i -> i + 1) .limit(n) .parallel() // 병렬스트림으로 변환 .reduce(0L, Long::sum); }

 

위 코드가 이전코드와 다른점은 여러 청크로 분할되어 있다는 것입니다. 리듀싱 연산을 여러 청크에 병렬로 수행할 수 있습니다. 리듀싱 연산으로 생성된 부분결과를 다시 리듀싱 연산으로 합쳐서 전체 스트림의 리듀싱 결과를 도출합니다.

 

사실 순차 스트림에 parallel을 호출해도 스트림 자체에는 아무 변화도 일어나지 않습니다. 내부적으로는 parallel을 호출하면 이후 연산이 병렬로 수행해야 함을 의미하는 Boolean 플래그가 설정됩니다. 반대로 sequential로 병렬 스트림을 순차 스트림으로 바꿀 수 있습니다. 이 두 메서드를 이용해서 어떤 연산을 병렬로 실행하고 어떤 연산을 순차로 실행할지 제어할 수 있습니다. 

stream.parallel()      .filter(...)      .sequential()      .map(...)      .parallel()      .reduce();

parallel과 sequential 두 메서드 중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미칩니다. 이 예제에서 파이프라인의 마지막 호출은 parallel이므로 파이프라인은 전체적으로 병렬로 실행됩니다.

 

 

* 병렬 스트림에서 사용하는 스레드 풀 설정

스트림의 parallel 메서드에서 병렬로 작업을 수행하는 스레드는 어디서 생성되는 것이며, 몇 개나 생성되는지, 그리고 그 과정을 어떻게 커스터마이징할 수 있는지 확인해보겠습니다.

 

병렬 스트림은 내부적으로 ForkJoinPool을 사용합니다. 기본적으로 ForkJoinPool은 프로세서 수, 즉 Runtime.getRuntime().availableProcessors()가 반환하는 값에 상응하는 스레드를 갖습니다.

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");

 

위 예제는 전역 설정 코드이므로 이후의 모든 병렬 스트림 연산에 영향을 줍니다. 하나의 병렬 스트림에 사용할 수 있는 특정한 값을 지정할 수 없습니다. 일반적으로 기기의 프로세서 수와 같으므로 특별한 이유가 없다면 ForkJoinPool의 기본값을 그대로 사용할 것을 권장합니다. 병렬을 사용하면, 속도가 빨라질것이라고 예상하겠지만 소프트웨어 공학에서 추측은 위험한 방법입니다. 성능을 최적화할 때는 측정이 가장 중요합니다. 

 

 - iterate가 박싱된 객체를 생성하므로 이를 다시 언박싱하는 과정이 필요

 - iterate는 병렬로 실행될 수 있도록 독립적인 청크로 분할하기가 어렵다. iterate는 본질적으로 순차적이다.

 

이처럼 병렬 프로그래밍은 까다로우며 때로는 이해하기 어려운 함정도 숨어있습니다. 심지어 병렬 프로그래밍을 병렬과 거리가 먼 iterate를 사용하는 등 오용하게 될 경우 전체 프로그램의 성능이 더 나빠질 수도 있습니다. parallel 메서드를 호출했을 때 내부적으로 어떤 일이 일어나는지 꼭 이해해야 합니다.

 

 

더 특화된 메서드 사용하기

 

멀티코어 프로세서를 활용해서 효과적으로 합계 연산을 병렬로 실행하려면 LongStream.rangeClosed라는 메서드를 사용하는 것도 권장됩니다. 이 메서드는 iterate에 비해 다음과 같은 장점을 제공합니다.

 

 - LongStream.rangeClosed는 기본형 long을 직접 사용하므로 박싱과 언박싱 오버헤드가 사라진다.

 - LongStream.rangeClosed는 쉽게 청크로 분할할 수 있는 숫자 범위를 생산한다. 예를 들어 1-20 범위의 숫자를 각각 1-5, 6-10, 11-15, 16-20 범위의 숫자로 분할할 수 있다.

 

언박싱과 관련한 오버헤드를 측정하면, 다음과 같습니다. 우선 순차 스트림을 처리하는 시간을 측정하겠습니다.

public static long rangedSum(long n) {
	return LongStream.rangeClosed(1, n)
					.reduce(0L, Long::sum);
}

Ranged sum done in: 17 msesc

 

 

기존의 iterate 팩토리 메서드로 생성한 순차 버전에 비해 이 예제의 숫자 스트림 처리 속도가 더 빠릅니다. 특화되지 않은 스트림을 처리할 때는 오토박싱, 언박싱 등의 오버헤드를 수반하기 때문입니다. 상황에 따라서는 어떤 알고리즘을 병렬화하는 것보다 적절한 자료구조를 선택하는 것이 더 중요하다는 사실을 단적으로 보여줍니다.

public static long parallelRangedSum(long n) {
	return LongStream.rangeClosed(1, n)
					.parallel()
					.reduce(0L, Long::sum);
}

System.out.println("Parallel range sum done in: " +
	measureSumPerf(ParallelStreams::parallelRangedSum, 10000000) +
	" msesc");

Parallel range sum done in: 1 msecs

 

 

드디어 순차 실행보다 빠른 성능을 갖는 병렬 리듀싱을 만들었고, 실질적으로 리듀싱 연산이 병렬로 수행됩니다. 올바른 자료구조를 선택해야 병렬 실행도 최적의 성능을 발휘할 수 있다는 사실을 확인할 수 있습니다.

 

하지만 병렬화를 이용하려면 스트림을 재귀적으로 분할해야 하고, 각 서브스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고, 이들 결과를 하나의 값으로 합쳐야 합니다. 멀티코어 간의 데이터 이동은 생각보다 비쌉니다. 따라서 코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는 것이 바람직합니다. 

 

 

출처: 자바 8 인액션 (52p - 스트림)