[Ch6] 스트림으로 데이터 수집

이전 챕터에서는 Stream에서 최종연산을 Collectors.toList로 스트림 요소를 리스트로 반환한 것만 확인했다. 이번 챕터에서는 Collectors 인터페이스가 제공하는 다양한 메서드에 대해 알아본다.

컬렉터란 무엇인가?

Collector 인터페이스 구현은 스트림의 요소를 어떤 식으로 도출할지 지정한다.

결과를 수집하는 과정을 간단하면서도 유연한 방식으로 정의할 수 있다는 점이 컬랙터의 최대 강점이다. (reduce와 비교했을 때) 컬렉터는 내부적으로 리듀싱 연산이 일어난다. 즉, Collector 인터페이스의 메서드를 어떻게 구현하느냐에 따라 스트림에 어떤 리듀싱 연산을 수행할지 결정된다.

우리는 이러한 리듀싱 연산을 Collectors 클래스에서 제공하는 여러 정적 팩터리 메서드를 사용해서 손쉽게 구현할 수 있다.

Collectors 에서 제공하는 메서드의 기능은 크게 세 가지로 구분할 수 있다.

  • 스트림 요소를 하나의 값으로 리듀스하고 요약
  • 요소 그룹화
  • 요소 분할

리듀싱과 요약

Collectors.counting

스트림 내부 요소의 개수를 카운팅해준다.

long howManyDishes = menu.stream().collect(Collectors.counting());

long howManyDishes = menu.stream().count();

maxBy, minBy

스트립값에서 최대값과 최소값을 계산할 수 있다. 해당 컬렉터는 스트림의 요소를 비교할 때 사용할 Comparator를 인수로 받는다.

Comparator<Dish> dishCaloriesComparator = Comparator.comparingInt(Dish::getCaloreis);

Optional<Dish> mostCalorieDish = menu.stream().collect(maxBy(dishCaloriesComparator));

summingInt

Collectors 클래스는 Collectors.summingInt라는 특별한 요약 팩토리 메서드를 제공한다. (summingLong, summingDouble 도 있다.)

int totalCaloreis = menu.stream().collect(summingInt(Dish::getCaloreis));

averagingInt

동일한 맥락으로 평균값 계산도 요약 기능으로 제공한다.

double avgCalories = menu.stream().collect(aaveragingInt(Dish::getCaloreis));

summarizingInt

지금까지 살펴본 연산들의 두 개 이상의 연산을 한 번에 수행해야 할 때도 있다. 카운트, 합계, 최소, 최대, 평균을 한번에 도출해낸다.

IntSummaryStatistics menuStatistics = menu.stream().collect(summarizingInt(Dish::getCaloreis));

// 출력결과 IntSummaryStatistics{count=9, sum=4300, min-120, average=477.777778, max=800}

joining

문자열을 연결해주는 컬렉터다.

String shortMenu = menu.stream().map(Dish::getName).collect(joining());

joining 메서드는 내부적으로 StringBuilder를 이용해서 문자열을 하나로 만든다.


지금까지 살펴본 모든 컬렉터는 reducing 팩토리 매서드로도 정의할 수 있다. 즉, Collectors.reducing 으로 모두 구현할 수 있다는 뜻이다. 그럼에도 불구하고 Collectors 에서 다양한 정적 메서드를 제공하는 이유는 프로그래머의 편의성과 가독성 때문이다.

또 다른점은, collect 를 대체할 reduce 메서드를 사용하면 로직에 가변형 연산을 초래할 수도 있다. 가변형 연산은 병렬성 확보에 좋지 않다.

다양한 방법으로 연산을 수행할 수 있다.

칼로리 합계 구하기

아래 두가지 방법으로 결과를 도출할 수 있지만, IntStream 을 사용하면 자동으로 언박싱 연산을 수행하거나 Integer를 int로 변환하는 과정을 피할 수 있으므로 성능에 좋다.

// 1. map 과 reduce 활용
int totalCaloreis = menu.stream().map(Dish::getCaloreis).reduce(Integer::sum).get();

// 2. IntStream 사용
int totalCaloreis = menu.stream().mapToInt(Dish::getCaloreis).sum();

그룹화

데이터 집합을 하나 이상의 특성으로 분류해서 그룹화하는 연산을 스트림으로 가독성 있게 수행할 수 있다.

Map<Dish.Type, List<Dish>> dishesByType = menu.stream().collect(Collectors.groupingBy(Dish::getType));

groupingBy 메서드로 전달되는 람다함수를 분류 함수라고 부른다. 위의 예제에서는 메서드 참조를 썼는데, 원하는 방식으로 필요한 로직을 구현해서 람다 표현식으로 전달할 수도 있다.

Map<CaloricLevel, List<Dish>> dishesByCaloricLevel = menu.stream().collect(Collectors.groupingBy(dish -> {
    if(dish.getCalories() <= 400) return CaloricLevel.DIET;
    else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
    else return CaloricLevel.FAT;
}));

책에서는 위와 같이 그루핑 함수를 사용자 정의 함수로 넘길 수 있다는 의미로 해당 예제를 보여준다. 하지만 나는 이 코드를 보고,, 감추고 싶다는 생각이 들었다. 그래서 CaloricLevel을 리턴하는 함수를 객체 안으로 넣어버렸다.

public class Dish {

	public enum Type {MEAT, FISH, OTHER};

	private String name;
	private boolean vegetarian;
	private int calories;
	private Type type;
	private StremGrouping.CaloricLevel caloricLevel;

  // 안으로 감춰버리기!
	public StremGrouping.CaloricLevel getCaloricLevel() {
		if(this.getCalories() <= 400) return StremGrouping.CaloricLevel.DIET;
		else if (this.getCalories() <= 700) return StremGrouping.CaloricLevel.NORMAL;
		else return StremGrouping.CaloricLevel.FAT;
	}

}

// 그리고 클라이언트에서 빼오기
Map<CaloricLevel, List<Dish>> dishesByCaloricLevel = menu.stream().collect(Collectors.groupingBy(Dish::getCaloricLevel));

그룹화된 요소 조작

요소를 그룹화한 다음에는 각 결과 그룹의 요소를 조작하는 연산이 필요하다. 예를 들어, 그루핑을 하는데, 어떤 조건에 해당하는 요소들만 그루핑을 한다고 하자. filter -> grouping 을 하면, 조건에 맞는 데이터가 없을 경우, 해당 요소는 그루핑 결과에서 제외되서 결과값에 포함되지 않는다. 만약 조건에 맞지 않는 요소도 포함은 하고 싶다면 groupingBy 팩토리 메서드 중, Predicate 를 두 번째 파라미터로 받는 메서드를 사용하면 된다.

groupingBy(Function<? super T, ? extends K> classifier, Supplier<M> mapFactory, Collector<? super T, A, D> downstream)

Map<Dish.Type, List<Dish>> caloricDishesByType = menu.stream().collect(groupingBy(Dish::getType, filtering(dish -> dish.getCaloreis() > 500, toList())));

mapping 메서드 사용

그룹화된 항목을 조작하는 다른 유용한 기능 중 또 다른 하나로 매핑 함수르 이용해 요소를 변환하는 작업이 있다.

Map<Dish.Type, List<String>> dishNameByType = menu.stream()
                                                  .collect(Collectors.groupingBy(Dish::getType, Collectors.mapping(Dish::getName, Collectors.toList())));

이렇게 하면 Value 값이 String이 된다.(Dish::getName) mapping 대신 flatMapping 을 넘길수도 있다.

다수준 그룹화

Collectors.groupingBy는 일반적인 분류함수와 컬렉터를 인수로 받는다. 여기서 더 나아가, 분류할 두번째 기준을 정의할 내부 groupingBy를 전달해서 두 수준으로 스트림의 항목을 그룹화할 수 있다.

Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishesByTypeCaloricLevel = menu.stream().collect(
        Collectors.groupingBy(Dish::getType, // 첫 번째 수준의 분류 함수
                Collectors.groupingBy(dish -> { // 두 번째 수준의 분류함수
                    if(dish.getCalories() <= 400)
                        return CaloricLevel.DIET;
                    else if(dish.getCalories() <= 700)
                        return CaloricLevel.NORMAL;
                    else return CaloricLevel.FAT;
                })
                )
);

이 예제는

  1. Type 으로 한번 분류하고, 그 내부에서 다시
  2. CaloricLevel로 분류했다.

이렇듯 다수준 그룹화 연산은 다양한 수준으로 확장할 수 있다. 즉, n수준 그룹화의 결과는 n수준 트리 구조로 표현되는 n수준 맵이된다.

컬렉터 결과를 다른 형식에 적용하기

Collectors.collectingAndThen

컬렉터가 반환한 결과를 다른형식으로 활용할 수 있다. collectingAndThen은 적용할 컬렉터와 변환 함수를 인수로 받아 다른 컬렉터를 반환한다.

Map<Dish.Type, Dish> mostCaloricByType = menu.stream()
                                             .collect(Collectors.groupingBy(Dish::getType // 1. Type 별로 그룹핑
                                                                            ,Collectors.collectingAndThen(
                                                                                    Collectors.maxBy(Comparator.comparingInt(Dish::getCalories)) // 2-1. 그룹된 결과를 macBy로 감싼다.
                                                                                    ,Optional::get))); // 2-2. finishing

reducing 컬렉터는 절대 Optional.empty()를 반환하지 않으므로 안전한 코드다

일반적으로 스트림에서 같은 그룹으로 분류된 모든 요소에 리듀싱 작업을 수행할 때는 groupingBy에 두 번째 인수로 컬렉터를 전달하면 된다. 위에서 살펴본 예제들은 두 번째 인수로 컬렉터를 전달해서 그룹핑된 값을 입맛에 맞게 가공해서 리턴했다.

분할

분할 함수 (partitioning function)는 boolean 을 반환하고, true / false 에 해당하는 원소들로 분류한다.

이러한 분할 함수는 filter로 요소들을 필터링 한 뒤, 리스트로 얻는것과 다른 점은 참, 거짓 두가지 요소의 스트림 리스트를 모두 유지한다는 것이 장점이다.

또한, 두 번째 인수로 컬렉터를 전달할 수 있는 overload 된 버전의 partitioningBy 메서드도 있다. 그래서 다수준 그룹화를 수행할 수 있다.

// 1. Predicate 를 파라미터로 받는 partitioningBy
public static <T>
Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> predicate) {
    return partitioningBy(predicate, toList());
}

// 2. Predicate 와 Collector (downstream) 를 받는 partitioningBy
public static <T, D, A>
Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate,
                                                Collector<? super T, A, D> downstream)

분할은 특수한 종류의 그룹화라고 볼 수 있다. 그룹화와 다른 점은

  • boolean 을 반환하는 predicate 를 가져야하는 점
  • 참/거짓 두 가지 키만 포함해서 보다 간결하고 효과적이라는 점이다.

다수준 분할

groupingBy 컬렉터와 마찬가지로 partitioningBy 컬렉터도 다른 컬렉터와 조합해서 사용할 수 있다. 특히 두개의 partitioningBy 컬렉터를 이용해 다수준 분할을 수행할 수 있다.

Map<Boolean, Map<Boolean, List<Dish>>> quiz1 = menu.stream().collect(Collectors.partitioningBy(Dish::isVegetarian // 1차 분할
                                                                                             , Collectors.partitioningBy(d -> d.getCalories() > 500))); // 2차분할

숫자를 소수와 비소수로 분할하기

정수 n을 인수로 받아서 2에서 n까지의 자연수를 primie 과 nonprime 으로 나누는 프로그램을 구현하자.

소스코드는 git을 참고하자. 소수 분할하기 실습코드

Collector 인터페이스

지금까지 Collector 인터페이스의 여러 정적 팩터리 메서드를 배워봤다.

팩토리 메서드 반환 형식
toList List
toSet Set
toCollection Collection
counting Long
summingInt Integer
averagingInt Double
summarizingInt IntSummaryStatistics
joining String
maxBy Optional
minBy Optional
reducing The type produced by the reduction operation
collectingAndThen The type returned by the transforming function
groupingBy Map<K, List>
partitioningBy Map<Boolean, List>

Collector 인터페이스를 개발자가 직접 구현할수도 있다. Collector 인터페이스의 시그니처와 다섯 개의 메서드 정의는 다음과 같다.

public interface Collector<T, A, R> {
    Supplier<A> supplier();

    BiConsumer<A, T> accumulator();

    BinaryOperator<A> combiner();

    Function<A, R> finisher();

    Set<Collector.Characteristics> characteristics();
}
  • T 는 스트림 항목의 제네릭 형식
  • A 는 누적자, 즉 수집 과정에서 중간 결과를 누적하는 객체의 형식
  • R 은 수집연산 결과 객체의 형식 (result)

Stream 의 모든 요소를 List로 수집하는 ToListCollector 라는 구현체를 만들어보자

supplier 메서드 : 새로운 결과 컨테이너 만들기

supplier 메서드는 empty 결과로 이루어진 Supplier를 반환해야 한다. Supplier 는 파라미터 없이, T 객체를 리턴하는 함수다.

@FunctionalInterface
public interface Supplier<T> {
    T get();
}
@Override
public Supplier<List<T>> supplier() {
    return ArrayList::new;
}

accumulator 메서드 : 결과 컨테이너에 요소 추가하기

accumulator 메서드는 리듀싱 연산을 수행하는 함수를 반환하다. 함수의 반환값은 void 로, Consumer 함수형 인터페이스를 쓴다.

@Override
public BiConsumer<List<T>, T> accumulator() {
    return List::add;
}

finisher 메서드 : 최종 변환값을 결과 컨테이너로 적용하기

스트림 탐색을 끝내고 누적자 객체를 최종 결과로 변환하면서 누적 과정을 끝낼 때 호출할 함수를 반환한다. 누적자 객체 자신이 이미 최종 결과라면 Function.identity() 항등 함수를 반환한다.

@Override
public Function<List<T>, List<T>> finisher() {
    return Function.identity();
}

combiner 메서드 : 두 결과 컨테이너 병합

combiner는 스트림의 서로 다른 서브파트를 병렬로 처리할 때, 누적자가 이 결과를 어떻게 처리할지 정의한다. (두 스트림의 결과를 받았을 때 어떻게 할 지 결정하는 것 같다.)

@Override
public BinaryOperator<List<T>> combiner() {
    return(list1, list2) -> {
        list1.addAll(list2);
        return list1;
    };
}

Characteristics 메서드

컬렉터의 연산을 정의하는 Characteristics 형식의 불변 집합을 반환한다. 스트림을 병렬로 reduce 할 것인지, 그리고 병렬로 리듀스한다면 어떤 최적화를 선택해야 할지 힌트를 제공한다.

  • UNORDERED : 리듀싱 결과가 스트림 요소의 방문 순서나 누적 순서에 영향을 받지 않는다.
  • CONCURRENT : 다중 스레드에서 accumulator 함수를 동시에 호출할 수 있으며 병렬 리듀싱을 수행할 수 있다.
  • IDENTITY_FINISH : finisher 메서드가 반환하는 함수는 단순히 identity를 적용할 뿐이므로 이를 생략할 수 있다. (리듀싱 최종 결과로 누적자 객체를 바로 사용하게 할 수 있다.)

이렇게 만들어진 ToListCollector 로 Collectors.toList() 메서드를 대체할 수 있다.

컬렉터 구현을 만들지 않고도 커스텀 수집 수행하기

인터페이스를 구현하지 않아도 커스텀 Collector를 만들 수 있다.

List<Dish> dishes = menuStream.collect(
  ArrayList::new, // 발행
  List::add, // 누적
  List::addAll // 합침
)

구현체와 다른 점은 Characteristics를 전달할 수 없다는 점, 간결하지만 가독성은 떨어진다는 점이다.

커스텀 컬렉터

다시 소수와 비소수로 분류하기 예제로 돌아오자.

개선 포인트 : 소수로만 나누기

원래 소스로는 제곱근까지 루프를 돌면서 2로 나누어 떨어지는지를 확인했다. 이를 조금 더 개선하려면 소수로 나누어 떨어지는지 확인해서 대상의 범위를 좁힐 수 있다. 그러려면 지금까지 발견한 소수 리스트에 접근해야 하는데, 일반 컬렉터로는 불가하다. 커스텀 컬렉터를 만들어서 부분결과에 접근할 수 있도록 구현하자. 소수인지 확인하는 isPrime() 메서드를 개선하면 된다.

public static boolean isPrime(List<Integer> primes, int candidate) {
    int candidateRoot = (int) Math.sqrt((double) candidate);
    return primes.stream()
                 .takeWhile(x -> x <= candidateRoot)
                 .noneMatch(x -> candidate % x == 0);
}

자바8로 takeWhile 흉내내기

takeWhile 메소드는 자마9에서 지원하므로 자바8에서는 이 기능을 사용할 수 없다. 하지만, 정렬된 리스트와 predicate 를 인수로 받아 가장 긴 첫 요소 리스트를 반환하도록 takeWhile 메서드를 구현할 수 있다.

  /**
   * 자바 9부터 지원되는 takeWhile 메서드 직접 구현
   * @param list
   * @param predicate
   * @param <A>
   * @return
   */
  public static <A> List<A> takeWhile(List<A> list, Predicate<A> predicate) {
      int i = 0;
      for(A item : list) {
          if(!predicate.test(item)){
              return list.subList(0, i);
          }
          i ++;
      }
      return list;
  }

컬렉터 구현

구현은 소스코드를 보는게 좋다. 커스텀 컬렉터를 활용한 개선된 소수 리스트 분할하기

알고리즘 자체가 순차적이다. 이게 무슨 말이냐면 소수로 정렬되어 있는 리스트를 순차적으로 돌면서 제곱근까지 갔을 경우 끝내버린다. 개인적인 생각으로는, 이러한 컬렉터를 병렬로 돌렸을 경우, 시간이 엄청나게 악화될 것 같다.

병렬로 구현할 수 없는 경우, combiner() 메서드는 빈 구현으로 남겨둬도 상관없다.

성능 체크

// partitioningBy 사용시
Fastest execution done in 435 msecs

// 커스텀 컬렉터 사용시
Fastest execution done in 215 msecs

댓글남기기