본문 바로가기

프로그래밍(TA, AA)/JVM 언어

[Java] 개선된 자바 동시성(1) - 병렬 데이터 처리와 성능

1. 병렬 데이터 처리와 성능

- 병렬 스트림으로 데이터를 병렬 처리하기

- 병렬 스트림의 성능 분석

- 포크/조인 프레임워크

- Spliterator로 스트림 데이터 쪼개기

 

자바 개발자는 컬렉션 데이터 처리 속도를 높이려고 따로 고민할 필요가 없다.

 - 자바 스트림 인터페이스를 이용해서 데이터 컬렉션을 선언형으로 제어할 수 있다.

 - 외부 반복을 내부 반복으로 바꾸면 네이티브 자바 라이브러리가 스트림 요소의 처리를 제어할 수 있다.

 - 컴퓨터의 멀티코어를 활용해서 파이프라인 연산을 실행할 수 있다.

 

자바7이 등장하기 전에는 데이터 컬렉션을 병렬로 처리하기가 아래처럼 어려웠다. 자바7은 더 쉽게 병렬화를 수행하면서 에러를 최소화할 수 있도록 fork/join framework 기능을 제공한다.

 1. 데이터를 서브파트로 분할

 2. 분할된 서브파트를 각각의 스레드로 할당

 3. 스레드로 할당한 다음에는 의도치 않은 race condition이 발생하지 않도록 적절한 동기화 추가

 4. 마지막으로 부분 결과를 합쳐야 한다.

 

스트림으로 데이터 컬렉션 관련 동작을 병렬로 쉽게 실행할 수 있다. 자바 7에 추가된 포크/조인 프레임워크와 내부적인 병렬 스트림 처리는 어떤 관계가 있는지 살펴보자. 병렬 스트림이 내부적으로 어떻게 처리되는지 알아야만 스트림을 잘못 사용하는 상황을 피할 수 있다.

 

우선 여러 청크를 병렬로 처리하기 전에 병렬 스트림이 요소를 여러 청크로 분할하는 방법을 설명할 것이다. 이 원리를 이해하지 못하면 의도치 않은, 설명하기 어려운 결과가 발생할 수 있다. 따라서 커스텀 Spliterator를 직접 구현하면서 분할 과정을 우리가 원하는 방식으로 제어하는 방법도 설명한다.


1.1. 병렬 스트림

스트림 인터페이스를 이용하면 아주 간단하게 요소를 병렬로 처리할 수 있다고 설명했다. 콜렉션에 parallelStream을 호출하면 병렬 스트림(parallel stream)이 생성된다. 병렬 스트림이란 각가의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다. 따라서 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다.

 

숫자 n을 인수로 받아서 1부터 n까지의 모든 숫자의 합계를 반환하는 메서드를 구현한다고 가정하자. 전통적인 자바에서는 반복문으로 이를 구현할 수 있다. n이 커진다면 이 연산을 병렬로 처리하는 것이 좋을 것이다.

 - 무엇부터 건드려야 할까?

 - 결과 변수는 어떻게 동기화해야 할까?

 - 몇 개의 스레드를 사용해야 할까?

 - 숫자는 어떻게 생성할까?

 - 생성된 숫자는 누가 더할까?

1.1.1. 순차 스트림을 병렬 스트림으로 변환하기

순차 스트림에 parallel 메서드를 호출하면 기존의 함수형 리듀싱 연산(숫자 합계 계산)이 스트림이 여러 청크로 분할되어 병렬로 처리된다.

public long parallelSum(long n) {
    return Stream.iterate(1L, i -> i + 1)
                 .limit(n)
                 .parallel()    // 스트림을 병렬 스트림으로 변환
                 .reduce(0L, Long::sum);
}

병렬 리듀싱 연산

사실 순차 스트림에 parallel을 호출해도 스트림 자체에는 아무 변화도 일어나지 않는다. 내부적으로는 parallel을 호출하면 이후 연산이 병렬로 수행해야 함을 의미하는 boolean flag가 설정된다. 반대로 sequential로 병렬 스트림을 순차 스트림으로 바꿀 수 있다. 이 두 메서드를 이용해서 어떤 연산을 병렬로 실행하고 어떤 연산을 순차로 실행할지 제어할 수 있다.

병렬 스트림에서 사용하는 스레드 풀 설정

스트림의 parallel 메서드에서 병렬로 작업을 수행하는 스레드는 어디서 생성되는 것이며, 몇개나 생성되는지, 그리고 그 과정을 어떻게 커스터마이즈할 수 있는지 궁금할 것이다.

 

병렬 스트림은 내부적으로 ForkJoinPool을 사용한다. 기본적으로 ForkJoinPool은 프로세서 수, 즉 Runtime.getRuntime().availableProcessors()가 반환하는 값에 상응하는 스레드를 갖는다.

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");

위 예제는 전역 설정 코드이므로 이후의 모든 병렬 스트림 연산에 영향을 준다. 현재는 하나의 병렬 스트림에 사용할 수 있는 특정한 값을 지정할 수 없다. 일반적으로 기기의 프로세서 수와 같으므로 특별한 이유가 없다면 ForkJoinPool의 기본값을 그대로 사용할 것을 권장한다.

1.1.2. 스트림 성능 측정

Java Microbenchmark Harness(JMH)를 이용하면 간단하고, 어노테이션 기반 방식을 지원하며, 안정적으로 자바 프로그램이나 JVM을 대상으로 하는 다른 언어용 벤치마크를 구현할 수 있다. 사실 JVM으로 실행되는 프로그램을 벤치마크하는 작업은 쉽지 않다. HotSpot이 바이트코드를 최적화하는데 필요한 warm-up 시간, 가비지 컬렉터로 인한 오버헤드 등과 같은 여러 요소를 고려해야 하기 때문이다. 

@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)                 // 벤치마크 대상 메서드를 실행하는데 걸린 평균 시간 측정
@OutputTimeUnit(TimeUnit.MILLISECONDS)           // 벤치마크 결과를 밀리초 단위로 출력
@Fork(value = 2, jvmArgs = {"-Xms4G", "-Xmx4G"}) // 4Gb의 힙 공간을 제공한 환경에서 두번 벤치마크를 수행해 결과의 신뢰성 확보
@Measurement(iterations = 8)
@Warmup(iterations = 3)
public class ParallelStreamBenchmarkTest {
	private static final long N = 10_000_000L;

	@Benchmark
	public long sequentialSum() {
		return Stream.iterate(1L, i -> i + 1).limit(N)
				.reduce(0L, Long::sum);
	}

	@Benchmark
	public long iterativeSum() {
		long result = 0;
		for (long i = 1L; i <= N; i++) {
			result += i;
		}
		return result;
	}

	@Benchmark
	public long parallelSum() {
		return Stream.iterate(1L, i -> i + 1).limit(N)
				.parallel()
				.reduce(0L, Long::sum);
	}

	@Benchmark
	public long rangedSum() {
		return LongStream.rangeClosed(1, N)
				.reduce(0L, Long::sum);
	}

	@Benchmark
	public long parallelRangedSum() {
		return LongStream.rangeClosed(1, N)
				.parallel()
				.reduce(0L, Long::sum);
	}

	@TearDown(Level.Invocation)
	public void tearDown() {
		System.gc();
	}
}

벤치마크가 가능한 GC의 영향을 받지 않도록 힙의 크기를 충분하게 설정했을 뿐 아니라 벤치마크가 끝날때마다 GC가 실행되도록 강제했다. 이렇게 주의를 기울였지만 여전히 결과는 정확하지 않을 수 있음을 기억하자. 기계가 지원하는 코어의 갯수 등이 실행 시간에 영향을 미칠 수 있기 때문이다.

Benchmark                                      Mode  Cnt   Score    Error  Units
ParallelStreamBenchmarkTest.sequentialSum      avgt    4  53.939 ±  0.528  ms/op
ParallelStreamBenchmarkTest.iterativeSum       avgt    4   4.040 ±  0.029  ms/op
ParallelStreamBenchmarkTest.parallelSum        avgt    4  51.883 ± 90.744  ms/op
ParallelStreamBenchmarkTest.rangedSum          avgt    4   4.302 ±  0.235  ms/op
ParallelStreamBenchmarkTest.parallelRangedSum  avgt    4   2.950 ±  6.470  ms/op

Benchmark                                      Mode  Cnt   Score    Error  Units
ParallelStreamBenchmarkTest.sequentialSum      avgt   16  53.963 ±  0.140  ms/op
ParallelStreamBenchmarkTest.iterativeSum       avgt   16   4.049 ±  0.017  ms/op
ParallelStreamBenchmarkTest.parallelSum        avgt   16  44.990 ± 11.248  ms/op
ParallelStreamBenchmarkTest.rangedSum          avgt   16   9.691 ±  4.157  ms/op
ParallelStreamBenchmarkTest.parallelRangedSum  avgt   16   3.687 ±  0.191  ms/op

Benchmark                                      Mode  Cnt   Score   Error  Units
ParallelStreamBenchmarkTest.sequentialSum      avgt   40  54.029 ± 0.034  ms/op
ParallelStreamBenchmarkTest.iterativeSum       avgt   40   4.200 ± 0.039  ms/op
ParallelStreamBenchmarkTest.parallelSum        avgt   40  46.679 ± 7.806  ms/op
ParallelStreamBenchmarkTest.rangedSum          avgt   40   6.246 ± 0.430  ms/op
ParallelStreamBenchmarkTest.parallelRangedSum  avgt   40   4.059 ± 0.176  ms/op

전통적인 for 루프를 사용해 반복하는 방법이 더 저수준으로 동작할 뿐 아니라 특히 기본값을 박싱하거나 언박싱할 필요가 없으므로 더 빠르다. 순차적 스트립을 사용하는 버전에 비해 거의 10배가 빠르다는 것을 확인할 수 있다.

 

병렬 버전이 멀티코어 CPU를 활용하지 못하고 순차 버전과 크게 차이가 없는 결과가 나왔다. 이 의외의 결과를 어떻게 설명해야 할까?

 - 반복 결과로 박싱된 객체가 만들어지므로 숫자를 더하려면 언박싱을 해야 한다.

 - 반복 작업은 병렬로 수행할 수 있는 독립 단위로 나누기가 어렵다.

 

이전 연산의 결과에 따라 다음 함수의 입력이 달라지기 때문에 iterate 연산을 청크로 분할하기가 어렵다. 디슈싱 과정을 시작하는 시점에 전체 숫자 리스트가 준비되지 않았으므로 스트림을 병렬로 처리할 수 있도록 청크로 분할할 수 없다. 스트림이 병렬로 처리되도록 지시했고 각각의 합계가 다른 스레드에서 수행되었지만 결국 순차처리 방식과 크게 다른 점이 없으므로 스레드를 할당하는 오버헤드만 증가하게 된다.

 

이처럼 병렬 프로그래밍은 까다롭고 때로는 이해하기 어려운 함정이 숨어있다. 심지어 병렬프로그래밍을 오용(예를 들어 병렬ㄹ과 거리가 먼 반복 작접)하면 오히려 전체 프로그램의 성능이 더 나빠질 수도 있다. 따라서 마법같은 parallel 메서드를 호출했을때 내부적으로 어떤 일이 일어나는지 꼭 이해해야 한다.

더 특화된 메서드 사용

멀티코어 프로세서를 활용해서 효과적으로 합계 연산을 병렬로 실행하려면 어떻게 해야 할까? LongStream.rangeClosed 메서드는 iterate에 비해 다음과 같은 장점을 제공한다.

- LongStream.rangeClosed는 기본형 long을 직접 사용하므로 박싱과 언박싱 오버헤드가 사라진다.

- LongStream.rangeClosed는 쉽게 청크로 분할할 수 있는 숫자 범위를 생산한다. 예를 들어 1-20 범위의 숫자를 각각 1-5, 6-10, 11-15, 16-20 범위의 숫자로 분할할 수 있다.

 

기존의 iterate 팩토리 메서드로 생성한 순차 버전에 비해 숫자 스트림 처리 속도가 더 빠르다. 특화되지 않은 스트림을 처리할 때는 오토박싱, 언박싱 등의 오버헤드를 수반하기 때문이다. 상황에 따라서는 어떤 알고리즘을 병렬화하는 것보다 적절한 자료구조를 선택하는 것이 더 중요하다는 사실을 단적으로 보여준다.

 

parallelRangedSum의 결과는 어떠할까? 순차실행보다 빠른 성능을 갖는 병렬 리듀싱이 만들어졌다. 올바른 자료구조를 선택해야 병렬실행도 최적의 성능을 발휘할 수 있다는 사실을 확인할 수 있다. 함수형 프로그래밍을 올바로 사용하면 반복적으로 코드를 실행하는 방법에 비해 최신 멀티 코어 CPU가 제공하는 병렬 실행의 힘을 단순하게 직접적으로 얻을 수 있다.

parallelSum
parallelRangedSum

병렬화를 이용하려면 스트림을 재귀적으로 분할해야 하고, 각 서브스트림을 서로 다른 스레디의 리듀싱 연산으로 할당하고, 이들 결과를 하나의 값으로 합쳐야 한다. 멀티코어 간의 데이터 이동은 우리 생각보다 비싸다. 따라서 코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는 것이 바람직하다. 스트림을 병렬화해서 코드 실행 속도를 빠르게 하고 싶으면 항상 병렬화를 올바르게 사용하고 있는지 확인해야 한다.

1.1.4. 병렬 스트림 효과적으로 사용하기

'천 개 이상의 요소가 있을때만 병렬 스트림을 사용하라'와 같이 양을 기준으로 병렬 스트림 사용을 결정하는 것은 적절하지 않다. 정해진 기기에서 정해진 연산을 수행할 때는 이와 같은 기준을 사용할 수 있지만 상황이 달라지면 이와 같은 기준이 제 역할을 하지 못하다. 그래도 어떤 상황에서 병렬 스트림을 사용할 것인지 약간의 수량적 힌트를 정하는 것이 도움이 될 때도 있다.

  • 병렬 스트림의 수행 과정을 투명하지 않을 때가 많다. 따라서 순차 스트림과 병렬 스트림 중 어떤 것이 좋을지 모르겠다면 적절한 벤치마크로 직접 성능을 측정하는 것이 바람직 하다.
  • 자동 박싱과 언박싱은 성능을 크게 저하시킬 수 있는 요소다. 따라서 되도록이면 기본형 특화 스트림을 사용하는 것이 좋다.
  • 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다. 특히 limit, findFirst처럼 요소의 순서에 의존하는 연산을 병렬 스트림에서 수행하려면 비싼 비용을 치러야 하낟. 예를 들어 findAny는 요소의 순서와 상관없이 연산하므로 findFirst보다 성능이 좋다. 정렬된 스트림에 unordered를 호출하면 비정렬된 스트림을 얻을 수 있다. 스트림에 N개 요소가 있을때 요소의 순서가 상관없다면 비정렬된 스트림에 limit을 호출하는 것이 더 효율적이다.
  • 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라. 처리해야할 요소 수가 N이고 하나의 요소를 처리하는데 드는 비용을 Q라 하면 전체 스트림 파이프라인 처리 비용을 N*Q로 예상할 수 있다. Q가 높아진다는 것은 병렬 스트림으로 성능을 개선할 수 있는 가능성이 있음을 의미한다.
  • 소량의 데이터에서는 병렬 스트림이 도움되지 않는다. 소량의 데이터를 처리하는 상호아에서는 병렬화 과정에서 생기는 부가 비용을 상쇄할 수 있을 만큼의 이득을 얻지 못하기 때문이다.
  • 스트림을 구성하는 자료구조가 적절한지 확인하라. 또한 커스텀 Spliterator를 구현해서 분해 과정을 완벽하게 제어할 수 있다.
  • 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다.
  • 최종 연산의 병합 과정(예를 들면 Collector와 combiner 메서드) 비용을 살펴보라. 병합 과정의 비용이 비싸다면 병렬 스트림으로 얻은 성능의 이익의 서브스트림의 부분 결과를 합치는 과정에서 상쇄 될 수 있다.

다음 표에 분해와 관련해서 다양한 스트림 소스의 병렬화 친밀도를 요약 설명했다.

소스 분해성
ArrayList 훌륭함
LinkedList 나쁨
IntStream.range 훌륭함
Stream.iterate 나쁨
HashSet 좋음
TreeSet 좋음

마지막으로 병렬 스트림이 수행되는 내부 인프라구조도 살펴봐야 한다. 자바 7에서 추가된 포크/조인 프레임워크로 병렬 스트림이 처리된다. 병렬 합꼐 예제에서는 병렬 스트림을 제대로 사용하려면 병렬 스트림의 내부 구조를 잘 알아야 함을 보여줬다. 다음 절에서 포크/조인 프레임워크를 자세히 살펴보자.


1.2. Fork/Join Framework

포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었다. 포크/조인 프레임워크에서는 서브태스크를 스레드풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.

1.2.2. 포크/조인 프레임워크를 제대로 사용하는 방법

포크/조인 프레임워크는 쉽게 사용할 수 있는 편이지만 항상 주의를 기울여야 한다. 다음은 포크/조인 프레임워크를 효과적으로 사용하는 방법이다.

  • join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될때까지 호출자를 블록시킨다. 따라서 두 서브태스크가 모두 시작된 다음에 join을 호출해야 한다. 그렇지 않으면 각각의 서브태스크가 다른 태스크가 끝나길 기다리는 일이 발생하며 원래 순차 알고리즘보다 느리고 복잡한 프로그램이 되어버릴 수 있다.
  • RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드를 사용하지 말아야 한다. 대신 compute나 fork 메서드를 직접 호출할 수 있다. 순차 코드에서 병렬 계산을 시작할 때만 invoke를 사용한다.
  • 병렬 스트림에서 살펴본 것처럼 멀티코어에서 포크/조인 프레임워크를 사용하는 것이 순차 처리보다 무조건 빠를거라는 생각은 버려야 한다. 병렬 처리로 성능을 개선하려면 태스크를 여러 독립적인 서브태스크로 분할할 수 있어야 한다. 각 서브태스크의 실행시간은 새로운 태스크를 포킹하는데 드는 시간보다 길어야 한다.
    예를 들어, I/O를 한 서브태스크에 할당하고 다른 서브태스크에서는 계산을 실행, 즉 I/O와 계산을 병렬로 실행할 수 있다. 또한 순차 버전과 병렬 버전의 성능을 비교할 때는 다른 요소도 고려해야 한다. 다른 자바 코드와 마찬가지로 JIT 컴파일러에 의해 최적화되려면 몇차례의 '준비과정(warm up)' 또는 실행과정을 거쳐야 한다. 따라서 성능을 측정할 때는 여러번 프로그램을 실행한 결과를 측정해야 한다. 또한 컴파일러 최적화는 병렬 버전보다는 순차 버전에 집중될 수 있다는 사실도 기억하자.

1.2.3. 작업 훔치기 (Work stealing)

포크/조인 프레임워크에서는 work stealing이라는 기법으로 ForkJoinPool의 모든 스레드를 거의 공정하게 분할한다. 각각의 스레드는 자신에게 할당된 태스크를 포함하는 doubly linked list를 참조하면서 작업이 끝날때마다 큐의 헤드에서 다른 태스크를 가져와서 작업을 처리한다. 이때 한 스레드는 다른 스레드보다 자신에게 할당된 태스크를 더 빨리 처리할 수 있다. 즉, 다른 스레드는 바쁘게 일하고 있는데 한 스레드는 할일이 다 떨어진 상황이다. 이때 할일이 없어진 스레드는 유휴 상태로 바뀌는 것이 아니라 다른 스레드 큐의 꼬리에서 작업을 훔쳐온다. 모든 태스크가 작업을 끝낼때까지, 즉 모든 큐가 빌때까지 이 과정을 반복한다. 따라서 태스크의 크기를 작게 나누어야 작업자 스레드 간의 작업부하를 비슷한 수준으로 유지할 수 있다.

work stealing


1.3. Spliterator 인터페이스

자바8은 Spliterator라는 새로운 인터페이스를 제공한다. Spliterator는 '분할할 수 있는 반복자(splitable iterator)'라는 의미다. Iterator처럼 Spliterator는 소스의 요소 탐색 기능을 제공한다는 점은 같지만 Spliterator는 병렬작업에 특화되어 있다. Spliterator가 어떻게 동작하는지 이해한다면 병렬 스트림 동작과 관련한 통찰력을 얻을 수 있을 것이다.

 

자바8은 컬렉션 프레임워크에 포함된 모든 자료구조에 사용할 수 있는 디폴트 Spliterator 구현을 제공한다. 컬렉션은 spliterator라는 메서드를 제공하는 Spliterator 인터페이스를 구현한다.

public interface Spliterator<T> {
    boolean tryAdvance(Consumer<? super T> action);
    Spliterator<T> trySplit();
    long estimateSize();
    int characteristics();
}
  • T는 Spliterator에서 탐색하는 요소의 형식을 가리킨다.
  • tryAdvance는 Spliterator의 요소를 하나씩 순차적으로 소비하면서 탐색해야할 요소가 남아있으면 참을 반환한다.
  • trySplit 메서드는 Spliterator의 일부 요소를 분할해서 두번째 Spliterator를 생성하는 메서드다.
  • estimateSize 메서드로 탐색해야 할 요소 수 정보를 제공할 수 있다. 제공된 값을 이용해서 더 쉽고 공평하게 Spliterator를 분할할 수 있다.

이 기능을 더 효과적으로 이용하려면 분할 과정이 어떻게 진행되는지 이해하는 것이 좋다.

1.3.1. 분할 과정

recursive splitting process


1.4. 마치며

  • 내부 반복을 이용하면 명시적으로 다른 스레드를 사용하지 않고도 스트림을 병렬로 처리할 수 있다.
  • 간단하게 스트림을 병렬로 처리할 수 있지만 항상 병렬 처리가 빠른 것은 아니다. 병렬 소프트웨어 동작 방법과 성능은 직관적이지 않을때가 많으므로 병렬 처리를 사용했을때 성능을 직접 측정해봐야 한다.
  • 병렬 스트림으로 데이터 집합을 병렬 실행할때 특히 처리해야 할 데이터가 아주 많거나 각 요소를 처리하는데 오랜 시간이 걸릴때 성능을 높일 수 있다.
  • 가능하면 기본형 특화 스트림을 사용하는 등 올바른 자료구조 선택이 어떤 연산을 병렬로 처리하는 것보다 성능적으로 더 큰 영향을 미칠 수 있다.
  • 포크/조인 프레임워크에서는 병렬화할 수 있는 태스크를 작은 태스크로 반할한 다음에 분할된 태스크를 각각의 스레드로 실행하며 서브태스크 각각의 결과를 합쳐서 최종 결과를 생산한다.
  • Spliterator는 탐색하려는 데이터를 포함하는 스트림을 어떻게 병렬화할 것인지 정의한다.