본문 바로가기

프로그래밍(TA, AA)/JVM 언어

[자바] 자바8 스트림으로 데이터 수집 (Collector)

자바의 스트림 긴으을 사용하면 데이터베이스 같은 연산을 할 수 있다. 데이터 집합을 유연하게 처리할 수 있는 lazy iterator의 개념으로 이해해도 좋다. 스트림 연산은 중간연산/최종연산으로 구분되는데 중간 연산은 한 스트림을 다른 스트림으로 변환하는 연산으로서, 여러 연산을 연결할 수 있다. (파이프의 역할)중간 연산은 스트림 파이프라인을 구성하며, 스트림의 요소를 consume하지는 않는다.

 

반면 최종 연산은 스트림의 요소를 소비해서 최종 결과를 도출한다. 최종 연산은 스트림 파이프라인을 최적화하면서 계산 과정을 짧게 생략하기도 한다.

 

스트림의 최종 연산중 collect가 존재하는데, 다양한 요소 누적 방식을 인수로 받아서 스트림을 최종 결과로 도출하는 리듀싱 연산을 수행할 수 있다. 다양한 요소 누적 방식은 Collector 인터페이스에 정의되어 있다.

Map<Currency, List<Transaction>> transactionsByCurrencies = transactions.stream().collect(groupingBy(Transaction::getCurrency));

 


컬렉터란 무엇인가?

함수형 프로그래밍에서는 '무엇'을 원하는지 직접 명시할 수 있어서 어떤 방법으로 이를 얻을지는 신경 쓸 필요가 없다. Collector 인터페이스 구현은 스트림의 요소를 어떤식으로 도출할지 지정한다. 자주 사용하는 toList는 '각 요소를 리스트로 만들어라'를 의미하는 Collector 인터페이스의 구현이다. 위 예제의 groupingBy는 '각 key 버킷 그리고 각 key 버킷에 대응하는 요소 리스트를 value로 포함하는 Map을 만들라'는 동작을 수행한다.

 

multiLevel으로 그룹화를 수행할 때 명령형 프로그래밍과 함수형 프로그래밍의 차이점은 더욱 두드러지게 된다. 명령형 코드에서는 문제를 해결하는 과정에서 다중 루프와 조건문을 추가하게되어 가독성과 유지보수성이 매우 떨어지게 된다. 함수형 프로그래밍에서는 필요한 컬렉터를 쉽게 추가할 수 있다.

 

고급 리듀싱 기능을 수행하는 컬렉터

collect로 결과를 수집하는 과정을 간단하면서도 유연한 방식으로 정의할 수 있다는 점은 컬렉터의 최대 강점이다. 스트림에 collect를 호출하면 스트림의 요소에 (컬렉터로 파라미터화된) 리듀싱 연산이 수행된다. 명령형 프로그래밍에서는 개발자가 직접 구현해야 했던 작업이 자동으로 수행된다. collect에서는 리듀싱 연산을 이용해서 스트림의 각 요소를 방문하면서 컬렉터가 작업을 처리한다.

 

보통 함수를 요소로 변환(toList처럼 데이터 자체를 변환하는 것보다는 데이터 저장 구조를 변환할 때가 많다)할때는 컬렉터를 적용하며 최종 결과를 저장하는 자료구조에 값을 누적한다. Collector 인터페이스의 메서드를 어떻게 구현하느냐에 따라 스트림에 어떤 리듀싱 연산을 수행할지 결정된다. Collectors 유틸리티 클래스는 자주 사용하는 컬렉터 인스턴스를 손쉽게 생성할 수 있는 정적 팩토리 메서드를 제공한다. 가장 대표적인 예로 toList는 스트림의 모든 요소를 리스트로 누적한다.

List<Transaction> transactions = transactionStream.collect(Collectors.toList());

 

미리 정의된 컬렉터

Collectors에서 제공하는 메서드의 기능은 크게 세가지(스트림 요소를 하나의 값으로 리듀스하고 요약 / 요소 그룹화 / 요소 분할)로 구분할 수 있다.

 

리듀싱과 요약(summarize) 관련 기능을 수행하는 컬렉터는 트랜잭션 리스트에서 트랜잭션 총합을 찾는 등의 다양한 계산을 수행할 때 이들 컬렉터를 유용하게 활용할 수 있다. 스트림 요소를 그룹화하는 방법의 경우, multiLevel 그룹화나 각각의 결과 서브그룹에 추가로 리듀싱 연산을 적용할 수 있다록 다양한 컬렉터를 조합할 수도 있다. 또한 그룹화의 특수연산인 partitioning도 존재한다. 파티셔닝은 한개의 인수를 받아 Boolean을 반환하는 함수인 Predicate를 그룹화 함수로 사용한다.

 

 


리듀싱과 요약

컬렉터(Stream.collect 메서드의 인수)로 스트림의 항목을 컬렉션으로 재구성할 수 있다. 컬렉터로 스트림의 모든 항목을 하나의 결과로 합칠 수 있다. 트리를 구성하는 다수준 맵, 메뉴의 칼로리 합계를 가리키는 단순한 정수 등 다양한 형식으로 도출될 수가 있다.

 

최댓값과 최솟값 검색

Collectors.maxBy, Collectors.minBy 두 개의 메서드를 이용해서 스트림의 최댓값과 최솟값을 계산할 수 있다. 두 컬렉터는 스트림의 요소를 비교하는데 사용할 Comparator를 인수로 받는다. 

 

Comparator<Dish> disCaloriesComparator = Comparator.comparingInt(Dish::getCalories);
Optional<Dish> mostCalorieDish = menu.stream().collect(maxBy(dishCaloriesComparator));

자바8은 값을 포함하거나 포함하지 않을 수 있는 컨테이너 Optional을 제공한다. 스트림에 있는 개체의 숫자 필드의 합계나 평균 등을 반환하는 연산에도 리듀싱 기능이 자주 사용된다. 이러한 연산을 요약(summarization) 연산이라고 부른다.

 

요약 연산

Collectors 클래스는 Collectors.summingInt라는 특별한 요약 팩토리 메서드를 제공한다. summingInt는 객체를 int로 매핑하는 함수를 인수로 받고, 인수로 전달된 함수는 객체를 int를 int로 매핑한 컬렉터를 반환한다. 그리고 summingInt가 collect 메서드로 전달되면 요약 작업을 수행한다. 각 요소의 값을 탐색하면서 초깃값으로 설정되어있는 누적자에 요소값을 더해간다.

 

Collectors.averagingInt,  Collectors.averagingLong, Collectors.averagingDouble 등으로 다양한 형식으로 이루어진 숫자 집합의 평균을 계산할 수도 있다.

 

Stream 인터페이스의 collect와 reduce 메서드의 차이는 무엇일까?

collect 메서드는 도출하려는 결과를 누적하는 컨테이너를 바꾸도록 설계된 메서드인 반면 reduce는 두 값을 하나로 도출하는 불변형 연산이라는 점에서 의미론적인 문제가 일어난다. reduce 메서드는 누적자로 사용된 리스트를 변환시키므로 reduce를 잘못 활용하게 된다.

의미론적으로 reduce 메서드를 잘못하면서 실용성 문제도 발생하게 된다. 여러 스레드가 동시에 같은 데이터 구조체를 고치면 리스트 자체가 망가져버리므로 리듀싱 연산을 병렬로 수행할 수 없다는 점도 문제다. 이 문제를 해결하려면 매번 새로운 리스트를 할당해야 하고 따라서 객체를 할당하느라 성능이 저하될 것이다.

가변 컨테이너 관련 작업이면서 병렬성을 확보하려면 collect 메서드로 리듀싱 연산을 구현하는 것이 바람직하다.

 

컬렉션 프레임워크 유연성: 같은 연산도 다양한 방식으로 수행 가능 

reducing 컬렉터를 사용한 이전 예제에서 람다 표현식 대신 Integer 클래스의 sum 메서드 레퍼런스를 이용하면 코드를 더 단순화할 수 있다.

 

int totalCalories = menu.stream().collect(reducing(0, 	// 초기값
                            Dish::getCalories,          // 변환 함수
                            Integer::sum));             // 합계 함수

리듀싱 연산과정은 누적자를 초깃값으로 초기화하고, 합계 함수를 이용해서 각 요소에 변환 함수를 적용한 결과 숫자를 반복적으로 조합한다. counting 컬렉터도 3개의 인수를 갖는 reducing 팩토리 메서드를 이용해서 구현할 수 있다. 

 

한 개의 인수를 갖는 reduce를 스트림에 적용한 다른 예제와 마찬가지로 reduce(Integer::sum)도 빈 스트림과 관련한 널 문제를 피할 수 있도록 int가 아닌 Optional<Integer>를 반환한다. 그리고 get으로 Optional 객체 내부의 값을 추출한다. 스트림이 비어있지않다는 가정하에 get을 자유롭게 사용할수 있다. 하지만 일반적으로는 기본값을 제공할 수 있는 orElse, orElseGet 등을 이용해서 Optional의 값을 얻어오는 것이 좋다. 마지막으로 스트림을 IntStream으로 매핑한 다음에 sum 메서드를 호출하는 방법으로도 결과를 얻을 수 있다.

 

문제를 해결할 수 있는 다양한 해결 방법을 확인한 다음에 가장 일반적으로 문제에 특화된 해결책을 고르는 것이 바람직하다. 이렇게 함으로써 가독성과 성능이라는 두마리 토끼를 잡을수 있게 된다. 

 

int totalCalories = menu.stream().mapToInt(Dish::getCalories).sum();

예를 들어, 위 해결방법은 가독성이 가장 좋고 간결하다. 또한 IntStream 덕분에 자동 언박싱 연산을 수행하거나 Integer를 int로 변환하는 과정을 피할 수 있으므로 성능까지 좋다.

 


그룹화

데이터 집합을 하나 이상의 특성으로 분류해서 그룹화하는 연산도 데이터베이스에서 많이 수행되는 작업이다. 명령형 코드에서 직접 그룹화를 구현하려면 까다롭고, 할일이 많으며, 에러도 많이 발생한다. 자바8 함수형으로는 가독성있는 한줄의 코드로 그룹화를 구현할 수 있다. 다음처럼 팩토리 메서드 Collectors.groupingBy를 이용해서 쉽게 메뉴를 그룹화할 수 있다.

 

Map<Dish.Type, List<Dish>> dishesByType = menu.stream().collect(groupingBy(Dish::getType));

 

다음은 Map에 표함된 결과다.

{FISH=[prawns, salmon], OTHER=[french fries, rice, season fruit, pizza], MEAT=[pork, beef, chicken]}

 

groupingBy 함수를 기준으로 스트림이 그룹화되므로 이를 분류함수(classification function)라고 부른다. 그룹화 연산의 결과로 그룹화 함수가 반환하는 키 그리고 각 키에 대응하는 스트림의 모든 항목 리스트를 값으로 갖는 맵이 반환된다. 메뉴 그룹화 예제에서 키는 요리 종류고, 값은 해당 종류에 포함되는 모든 요리가 되는 것이다. 단순한 속성 접근자 대신 더 복잡한 분류 기준이 필요한 상황에서는 메서드 레퍼런스를 분류함수로 사용할 수 없다. 

 

다수준 그룹화

두 인수를 받는 팩토리 메서드 Collectors.groupingBy를 이요해서 항목을 다수준으로 그룹화할 수 있다. Collectors.groupingBy는 일반적은 분류 함수와 컬렉터를 인수로 받는다. 즉, 바깥쪽 groupingBy 메서드에 스트림의 항목을 분류할 두번째 기준을 정의하는 내부 groupingBy를 전달해서 두 수준으로 스트림의 항목을 그룹화할 수 있다.

 

Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishesByTypeCaloricLevel = 
  menu.stream().collect(
      groupingBy(Dish::getType,  // 첫번째수준 분류함수
        groupingBy(dish -> {     // 두번째수준 분류함수
        	if (dish.getCalories() <= 400) {
              return CaloricLevel.DIET;
            } else if (dish.getCalories() <= 700) {
              return CaloricLevel.NORMAL;
            } else {
              return CaloricLevel.FAT;
            }
          })
        )
      );
        

그룹화의 결과로 다음과 같은 2 Level 수준의 맵이 만들어진다.

{MEAT={DIET=[chicken], NORMAL=[beef], FAT=[pork]},
 FISH={DIET=[prawns], NORMAL=[salmon]},
 OTHER={DIET=[rice, seasonal fruit], NORMAL=[french fries, pizza]}}

외부맵은 첫번째 수준의 분류함수에서 분류한 키값 'fish, meat, other'를 갖는다. 그리고 외부 맵의 값은 두번째 수준의 분류 함수의 기준 'normal, diet, fat'을 키값으로 갖는다. 다수준 그룹화 연산은 다양한 수준으로 확장할 수 있다. 즉, n수준 그룹화의 결과는 n수준 트리 구조로 표현되는 n수준 맵이 된다.

 

보통 groupingBy 연산은 버킷 개념으로 생각하면 쉽다. 첫번째 groupingBy는 각 키의 버킷을 만든다. 그리고 준비된 각각의 버킷을 서브스트림 컬렉터로 채워가기를 반복하면서 n수준 그룹화가 이루어진다.

 

서브그룹으로 데이터 수집

groupingBy로 넘겨주는 컬렉터의 형식은 제한이 없다. 다음 코드처럼 groupingBy 컬렉터에 두번째 인수로 counting 컬렉터를 전달해서 메뉴에서 요리의 수를 종류별로 계산할 수 있다.

Map<Dish.Type, Long> typesCount = menu.stream().collect(groupingBy(Dish::getType, counting()));

 

분류함수 한개의 인수를 갖는 groupingBy(f)는 사실 groupingBy(f, toList())의 축약형이다. 요리의 종류를 분류하는 컬렉터로 메뉴에서 가장 높은 칼로리를 가진 요리를 찾는 프로그램도 다시 구현할 수 있다.

Map<Dish.Type, Optional<Dish>> mostCaloricByType =
  menu.stream()
      .collect(groupingBy(Dish::getType,
                          maxBy(comparingInt(Dish::getCalories))));

 

그룹화의 결과로 요리의 종류를 key로, Optional<Dish>를 value로 갖는 맵이 반환된다. Optional<Dish>는 해당 종류의 음식 중 가장 높은 칼로리를 래핑한다. 팩토리 메서드 maxBy가 생성하는 컬렉터의 결과 형식에 따라 맵의 값이 Optional 형식이 되었다. 실제로 메뉴의 요리 중 Optional.empty()를 값으로 갖는 요리는 존재하지 않는다. 처음부터 존재하지 않는 요리의 키는 맵에 추가되지 않기 때문이다. groupingBy 컬렉터는 스트림의 첫번째 요소를 찾은 이후에야 그룹화 맵에 새로운 키를 추가(lazy)한다. 리듀싱 컬렉터가 반환하는 형식을 사용하는 상황이므로 굳이 Optional 래퍼를 사용할 필요는 없다.

 

컬렉터 결과를 다른 형식에 적용하기

그룹화 연산에서 맵의 모든 값을 Optional로 감살 필요가 없으므로 Optional을 삭제할 수 있다. 즉, 다음처럼 팩토리 메서드 Collectors.collectingAndThen으로 컬렉터가 반환한 결과를 다른 형식으로 활용할 수 있다.

Map<Dish.Type, Dish> mostCaloricByType =
 menu.stream()
     .collect(groupingBy(Dish::getType,  // 분류 함수
              collectingAndThen(maxBy(comparingInt(Dish::getCalories)), Optional::get)));

팩토리 메서드 collectingAndThen은 적용할 컬렉터와 변환 함수를 인수로 받아 다른 컬렉터를 반환한다. 반환되는 컬렉터는 기존 컬렉터의 래퍼 역할을 하며 collect의 마지막 과정에서 변환함수로 자신이 반환하는 값을 매핑한다.

 

  1. 컬렉터는 점선으로 표시되어 있으며 groupingBy는 가장 바깥쪽에 위치하면서 요리의 종류에 따라 메뉴 스트림을 세개의 서브스트림으로 그룹화한다.
  2. groupingBy 컬렉터는 collectingAndThen 컬렉터를 감싼다. 따라서 두번째 컬렉터는 그룹화된 세개의 서브스트림에 적용된다.
  3. collectingAndThen 컬렉터는 세번째 컬렉터 maxBy를 감싼다.
  4. 리듀싱 컬렉터가 서브스트림에 연산을 수행한 결과에 collectingAndThen의 Optional::get 변환 함수가 적용된다.
  5. groupingBy 컬렉터가 반환하는 맵의 분류 키에 대응하는 세 값이 각각의 요리 형식에서 가장 높은 칼로리다.

 


분할(partitioning)

partitioning은 partitioning function(이하 분할 함수)이라 불리는 Predicate를 분류함수로 사용하는 특수한 기능이다. 분할 함수는 Boolean을 반환하므로 맵의 키 형식은 Boolean이다. 결과적으로 그룹화 맵은 최대 두개(참 or 거짓)의 그룹으로 분류된다. 

Map<Boolean, List<Dish>> partitionedMenu =
  menu.stream().collect(partitioningBy(Dish::isVegetarian));  // 분할 함수

 

위 코드를 실행하면 다음과 같은 맵이 반환된다.

{false=[pork, beef, chicken, prawns, salmon],
 true=[french fries, rice, season fruit, pizza]}

 

이제 true값의 key로 맵에서 모든 채식 요리를 얻을 수 있다.

List<Dish> vegetarianDishes = partitionedMenu.get(true);

 

물론 메뉴 리스트로 생성한 스트림을 이전 예제에서 사용한 Predicate로 필터링한 다음에 별도의 리스트에 결과를 수집해도 같은 결과를 얻을 수 있다.

List<Dish> vegetarianDishes = 
  menu.stream().filter(Dish::isVegetarian).collect(toList());

 

분할의 장점

분할 함수가 반환하는 참, 거짓 두 가지 요소의 스트림 리스트를 모두 유지한다는 것이 분할의 장점이다. 이전 예제에서 partitionedMenu 맵에 false 키를 이용해서(즉, Predicate와 그 결과를 반전시키는 두가지 필터링 연산을 적용해서) 채식이 아닌 모든 요리 리스트를 얻을 수도 있다. 또한 다음 코드에서 보여주는 것처럼 컬렉터를 두번째 인수로 전달할 수 있는 오버로드된 버전의 partitioningBy 메서드도 있다.

Map<Boolean, Map<Dish.Type, List<Dish>>> vegetarianDishesByType = 
  menu.stream().collect(
    partitioningBy(Dish::isVegetarian,  // 분할함수
                   groupingBy(Dish::getType)));  // 두번째 컬렉터

 

다음은 위 코드를 실행한 두 수준의 맵 결과이다.

{false={FISH=[prawns, salmon], MEAT=[pork, beef, chicken]},
 true={OTHER=[french fries, rice, season fruit, pizza]}

 

결과에서 확인할 수 있는 것처럼 채식 요리의 스트림과 채식이 아닌 요리의 스트림을 각각 요리 종류로 그룹화해서 두 수준의 맵이 반환되었다. 분할이란 특수한 종류의 그룹화이다. 이 외에도 groupingBy와 partitioningBy 컬렉터의 비슷한 점이 또 있다.

 

숫자를 소수/비소수로 분할하기

주어진 수가 소수인지 아닌지 판단하는 Predicate를 구현한다.

public boolean isPrime(int candidate) {
  return IntStream.range(2, candidate) // 2부터 candidate 미만 사이의 자연수 생성
                  .noneMatch(i -> candidate%1 == 0); // 스트림의 모든 정수로 candidate를 나눌수 없으면 참을 반환
}

다음처럼 소수의 대상을 주어진 수의 제곱근 이하의 수로 제한할 수 있다.

public boolean isPrime(int candidate) {
  int candidateRoot = (int) Math.sqrt((double) candidate);
  return IntStream.rangeClosed(2, candidateRoot)
                    .noneMatch(i -> candidate%1 == 0);
}

 

이제 n개의 숫자를 포함하는 스트림을 만든 다음에 우리가 구현한 isPrime 메서드를 Predicate로 이용하고 partitioningBy 컬렉터로 리듀스해서 숫자를 소수와 비소수로 분류할 수 있다.

public Map<Boolean, List<Integer>> partitionPrimes(int n) {
  return IntStream.rangeClosed(2, n).boxed()
                  .collect(partitioningBy(candidate -> isPrime(candidate)));
}

 


Collector 인터페이스

Collector 인터페이스는 리듀싱 연산(즉, 컬렉터)을 어떻게 어떻게 구현할지 제공하는 메서드 집합으로 구성된다. Collector 인터페이스를 살펴보기전에 팩토리 메서드인 toList(스트림의 요소를 리스트로 수집)을 확인한다. toList는 앞으로 일상에서 자주 사용하는 컬렉터 중 하나다. 동시에 toList는 가장 구현하기 쉬운 컬렉터이기도 하다. toList가 어떻게 구현되었는지 살펴보면서 Collecotr는 어떻게 정의되어 있고, 내부적으로 collect 메서드는 toList가 반환하는 함수를 어떻게 활용했는지 이해할 수 있다.

 

public interface Collector<T, A, R> {
    Supplier<A> supplier();
    BiConsumer<A, T> accumulator();
    Function<A, R> finisher();
    BinaryOperator<A> Combiner();
    Set<Characteristics> characteristics();
}
  • T는 수집될 스트림 항목의 제네릭 형식이다.
  • A는 누적자, 즉 수집 과정에서 중간 결과를 누적하는 객체의 형식이다.
  • R은 수집 연산 결과 객체의 형식(항상 그런것은 아니지만 대개 컬렉션 형식)이다.

예를 들어 Stream<T>의 모든 요소를 List<T>로 수집하는 ToListCollector<T>라는 클래스를 구현할 수 있다. 누적 과정에서 사용되는 객체가 수집 과정의 최종 결과로 사용된다.

public class ToListController<T> implements Collecotr<T, List<T>, List<T>>

 


Collector 인터페이스 메서드

Collector 인터페이스에 정의된 다섯 개의 메서드(Supplier, BiConsumer, Function, BinaryOperator, Set)를 하나씩 살펴보겠다. 4개의 메서드는 collect 메서드에서 실행하는 함수를 반환하는 반면, 마지막 메서드 characteristics는 collect 메서드가 어떤 최적화(예를 들면 병렬화 같은)를 이용해서 리듀싱 연산을 수행할 것인지 결정하도록 돕는 힌트 특성 집합을 제공한다.

 

supplier 메서드: 새로운 결과 컨테이너 만들기

supplier 메서드는 빈 결과로 이루어진 Supplier를 반환해야 한다. 즉, supplier는 수집 과정에서 빈 누적자 인스턴스를 만드는 파라미터가 없는 함수다. ToListCollector처럼 누적자를 반환하는 컬렉터에서는 빈 누적자가 비어있는 스트림의 수집 과정의 결과가 될 수 있다. ToListCollector에서 supplier는 다음처럼 빈 리스트를 반환한다.

public Supplier<List<T>> supplier() {
    return () -> new ArrayList<T>();
}
public Supplier<List<T>> supplier() {
	return ArrayList::new;
}

 

accumulator 메서드: 결과 컨테이너에 요소 추가하기

accumulator 메서드는 리듀싱 연산을 수행하는 함수를 반환한다. 스트림에서 n번째 요소를 탐색할때 두 인수, 즉 누적자(스트림의 첫 n-1개 항목을 수집한 상태)와 n번째 요소를 함수에 적용한다. 함수의 반환값은 void, 즉 요소를 탐색하면서 적용하는 함수에 의해 누적자 내부 상태가 바뀌므로 누적자가 어떤 값일지 단정할 수 없다. ToListCollector에서 accumulator가 반환하는 함수는 이미 탐색한 항목을 포함하는 리스트에 현재 항목을 추가하는 연산을 수행한다. 

 

public BiConsumer<List<T>, T> accumulator() {
    return (list, item) -> list.add(item);
}
public BiConsumer<List<T>, T> accumulator() {\
    return List::add;
}

 

finisher 메서드: 최종 변환값을 결과 컨테이너로 적용하기

finisher 메서드는 스트림 탐색을 끝내고 누적자 객체를 최종 결과로 변환하면서 누적 과정을 끝낼때 호출할 함수를 반환해야 한다. 때로는 ToListCollector에서 볼 수 있는 것처럼 누적자 객체가 이미 최종 결과인 상황도 있다. 이런때는 변환 과정이 필요하지 않으므로 finisher 메서드는 항등 함수를 반환한다.

public Function<List<T>, List<T>> finisher() {
	return Function.identity();
}

supplier -> accumulator -> finisher 세가지 메서드로도 순차적 스트림 리듀싱 기능을 수행할 수 있다. 실제로는 collect가 동작하기 전에 다른 중간 연산과 파이프라인을 구성할 수 있게 해주는 lazy 특성 그리고 병렬 실행 등도 고려해야 하므로 스트림 리듀싱 기능 구현은 생각보다 복잡하다.

 

combine 메서드: 두 결과 컨테이너 병합

마지막으로 리듀싱 연산에서 사용할 함수를 반환하는 네번째 메서드는 combiner를 살펴본다. combiner는 스트림의 서로 다른 서브파트를 병렬로 처리할 때 누적자가 이 결과를 어떻게 처리할지 정의한다. toList의 combiner는 비교적 쉽게 구현할 수 있다. 즉, 스트림의 두번째 서브파트에서 수집한 항목리스트를 첫번째 서브파트 결과 리스트의 뒤에 추가하면 된다. combine 메서드를 이용하면 스트림의 리듀싱을 병렬로 수행할 수 있다.

 

  • 스트림을 분할해야 하는지 정의하는 조건이 거짓으로 바뀌기 전까지 원래 스트림을 재귀적으로 분할한다(보통 분산된 작업의 크기가 너무 작아지면 병렬 수행의 속도는 순차 수행의 속도보다 느려진다. 즉, 병렬 수행의 효과가 상쇄된다. 일반적으로 프로세싱 코어의 개수를 초과하는 병렬 작업은 효율적이지 않다)
  • 서브 스트림의 각 요소에 리듀싱 연산을 순차적으로 적용해서 서브스트림을 병렬로 처리할 수 있다.
  • 마지막에는 컬렉터의 combiner 메서드가 반환하는 함수로 모든 부분결과를 쌍으로 합친다. 즉, 분할된 모든 서브스트림의 결과를 합치면서 연산이 완료된다.

 

Characteristics 메서드

마지막으로 characteristics 메서드는 컬렉터의 연산은 정의하는 Characteristics 형식의 불변 집합을 반환한다. Characteristics는 스트림을 병렬로 리듀스할 것인지 그리고 병렬로 리듀스한다면 어떤 최적화를 선택해야 할지 힌트를 제공한다. Characteristics는 다음 세 항목을 포함하는 열거형이다.

 

UNORDERED 리듀싱 결과는 스트림 요소의 방문 순서나 누적 순서에 영향을 받지 않는다.
CONCURRENT 다중 스레드에서 accumulator 함수를 동시에 호출할 수 있으며 이 컬렉터는 스트림의 병렬 리듀싱을 수행할 수 있다. 컬렉터의 플래그에 UNORDERED를 함께 설정하지 않았다면 데이터 소스가 정렬되어 있지 않은(즉, 집합처럼 요소의 순서가 무의미한) 상황에서만 병렬 리듀싱을 수행할 수가 있다.
IDENTITY_FINISH finisher 메서드가 반환하는 함수는 단순히 identity를 적용할 뿐이므로 이를 생략할 수 있다. 따라서 리듀싱 과정의 최종 결과로 누적자 객체를 바로 사용할 수 있다. 또한 누적자 A를 결과 R로 안전하게 형변활할 수 있다.

 

지금까지 개발한 ToListCollector에서 스트림의 요소를 누적하는데 사용한 리스트가 최정 결과 형식이므로 추가 변환이 필요없다. 따라서 ToListCollector는 IDENTITY_FINISH다. 하지만 리스트의 순서는 상관이 없으므로 UNORDERED다. 마지막으로 ToListCollector는 CONCURRENT다. 하지만 요소의 순서가 무의미한 데이터 소스여야 병렬로 실행할 수 있다.

 

import java.util.*;
import java.util.function.*;
import java.util.stream.Collector;
import static java.util.stream.Collector.Characteristics.*;

public class ToListCollector<T> implements Collector<T, List<T>, List<T>> {
  
  @Override
  public Supplier<List<T>> supplier() {
    return Array::new;    // 수집 연산의 시발점
  }
  
  @Override
  public BiConsumer<List<T>, T> accumulator() {
    return List::add;    // 탐색한 항목을 누적하고 바로 누적자를 고친다.
  }
  
  @Override
  public Function<List<T>, List<T>> finisher() {
    return Function.identity();    // 항등 함수
  }
  
  @Override
  public BinaryOperator<List<T>> combiner() {
    return (list1, list2) -> {
      list1.addAll(list2);    // 두번째 콘텐츠와 합쳐서 첫번째 누적자를 고친다.
      return list1;    // 변경된 첫번째 누적자를 반환한다.
    };
  }
  
  @Override
  public Set<Characteristics> characteristics() {
    // 콜렉터의 플래그를 IDENTITY_FINISH, CONCURRENT로 설정한다.
    return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH, CONCURRENT));
  }
}

위 구현이 Collectors.toList 메서드가 반환하는 결과와 완전히 같은 것은 아니지만 사소한 최적화를 제외하면 대체로 비슷한다. 특히 자바 API에서 제공하는 컬렉터는 싱글턴 Collections.emptyList()로 빈 리스트를 반환한다. 

 

IDENTITY_FINISH 수집 연산에서는 Collector 인터페이스를 완전히 새로 구현하지 않고도 같은 결과를 얻을 수 있다. Stream은 세 함수(supplier, accumulator, combiner)를 인수로 받는 collect 메소드를 오버로드하며 각각의 메서드는 Collector 인터페이스의 메서드가 반환하는 함수와 같은 기능을 수행한다. 예를 들어 다음처럼 스트림의 모든 항목을 리스트에 수집하는 방법도 있다.

List<Dish> dishes = menuStream.collect(
                        ArrayList::new,  // supplier
                        List::add,       // accumulator
                        List:addAll);    // combiner

위 코드가 이전 코드에 비해 좀더 간결하고 축약되어 있지만 가독성은 떨어진다. 또한 두번째 collect 메서드로는 Characteristics를 전달할 수 없다. 즉, 두번째 collect 메서드는 IDENTITY_FINISH와 CONCURRENT지만 UNOIRDERED는 아닌 컬렉터로만 동작한다.


  • collect는 스트림의 요소를 요약 결과로 누적하는 다양한 방법(컬렉터라 불리는)을 인수로 갖는 최종 연산이다.
  • 스트림의 요소를 하나의 값으로 리듀스하고 요약하는 컬렉터뿐 아니라 최솟값, 최댓값, 평균값을 계산하는 컬렉터 등이 미리 정의되어 있다.
  • 미리 정의된 컬렉터인 groupingBy로 스트림의 요소를 그룹화하거나, partitioningBy로 스트림의 요소를 분할할 수 있다.
  • 컬렉터는 다수준의 그룹화, 분할, 리듀싱 연산에 적합하게 설계되어 있다.
  • Collector 인터페이스에 정의된 메서드를 구현해서 커스텀 컬렉터를 개발할 수 있다.