[Ch7] 병렬 데이터 처리와 성능

병렬 스트림

컬렉션에서 parallelStream 을 호출하면 병렬 스트림이 생성된다. 병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 chunk로 분할한 스트림이다.

숫자 n을 인수로 받아서 1부터 n까지의 모든 숫자의 합계를 반환하는 메서드를 구현해보자.

// 스트림 사용
public long sequentialSum() {
    return Stream.iterate(1L, i -> i + 1)
                 .limit(N)
                 .reduce(0L, Long::sum);
}
// 전통적인 for 문
public long iterativeSum() {
    long result = 0;
    for(long i = 1L; i<N; i++) {
        result += i;
    }
    return result;
}

이러한 연산은 N이 커진다면 부하가 커질 것이므로 병렬로 처리하는 것이 좋다. 이제 병렬 스트림을 사용해보자

public long parallelSum() {
    return Stream.iterate(1L, i -> i + 1).limit(N)
            .parallel() // 스트림을 병렬 스트림으로 변환
            .reduce(0L, Long::sum);
}

parallel() 메서드를 쓰면 스트림이 여러 chunk 로 분할된다. 이렇게 분할된 chunk를 병렬로 수행한 뒤에, 리듀싱 연산으로 합쳐서 전체 스트림의 리듀싱 결과를 도출한다.

그렇다면 이렇게 만들어진 스레드는 어디서 생성되며 몇 개나 생성될까? 병렬 스트림은 내부적으로 ForkJoinPool 을 사용한다. ForkJoinPool은 프로세서 수, 즉 Runtime.getRuntime().availableProcessors() 가 반환하는 값에 상응하는 스레드를 갖는다.

// 전역설정코드!! 시스템의 병렬 스레드 수를 지정할 수 있다.
System.setProperty("java.util.concurrent.ForJoinPool.common.parallelism", "12");

JMH 를 활용한 성능 측정

병렬화를 이용하면 순차나 반복 형식에 비해 성능이 더 좋아질 것이라 추측했다. 추측만 하지말고, 정말로 성능이 좋아지는지 무조건 확인을 해봐야한다.

자바 마이크로벤체마크 하니스 (JMH)

JMH를 이용하면 간단하고, 어노테이션 기반 방식으로 벤치마크를 구현할 수 있다. 책에서는 메이븐을 통해 디펜던시를 추가했는데, 나는 gradle 을 사용했다.

Gradle 설정

plugins {
    id "me.champeau.jmh" version "0.6.5"
}


dependencies {
    jmh 'org.openjdk.jmh:jmh-core:0.9'
    jmh 'org.openjdk.jmh:jmh-generator-annprocess:0.9'
}

앞서 설명한 세 가지 방식의 sum 메서드를 벤치마크로 수행해본 결과 iterativeSum > sequentialSum > parallelSum 순으로 속도가 빠르다.

스크린샷 2021-08-12 오후 8.33.38

iterativeSum 이 전통적인 방식인 sequentialSum 보다 느린 이유 스트림을 iterate 하면서 long 변수를 박싱한 결과인 Long으로 연산을 하기 때문이다. parallelSum 이 실망스럽게도 제일 느린 이유는 두 가지 문제점이 있다.

  • 반복 결과로 박싱된 객체가 만들어지므로 숫자를 더하려면 언박싱을 해야한다.
  • 반복 작업은 병렬로 수행할 수 있는 독립 단위로 나누기가 어렵다.
    • 즉, 이전 연산의 결과에 따라 다음 함수의 입력이 달라지기 때문에 iterate 연산을 청크로 분할하기가 어렵다.

스트림이 병렬로 처리되도록 지시했고 각각의 합계가 다른 스레드에서 수행되었지만, 리듀싱 과정을 시작하는 시점에 전체 숫자 리스트가 준비되지 않았으므로 스트림을 병렬로 처리할 수 있도록 청크로 분할이 안된 것이다. 결국, 스레드를 할당하는 오버헤드만 증가했다.

따라서, 병렬 프로그래밍을 오용하면 오히려 전체 프로그램의 성능이 더 나빠질 수 있으므로 parallel 메서드를 호출했을 때 내부적으로 어떤 일이 일어나는지 꼭 이해해야 한다.

효율적으로 병렬로 처리하기 위해서 LongStream.rangeClosed 메서드를 활용한다.

  • 기본형 long 을 직접 사용하므로 박싱과 언박싱 오버헤드가 사라진다.
  • LongStream.rangeClosed 는 쉽게 청크로 분할할 수있는 숫자 범위를 생산한다.
    • 1-20 범위의 숫자를 각각 1-5, 6-10, 11-15, 16-20 범위의 숫자로 분할할 수 있다.

스크린샷 2021-08-12 오후 10.07.33

JMH로 수행한 결과, 앞서 세 케이스보다 처리 속도가 빠른 것을 확인할 수 있다. 만약 jmh 수행 후, 이전 메서드 주석한 뒤 새로운 메서드만 수행했을 시 에러 발생 시, ./gradlew clean 을 한 뒤 다시 수행해보면 된다.

이렇듯, 올바른 자료구조를 선택하고, 함수형 프로그래밍을 올바르게 사용하면 반복적으로 코드를 실행하는 방법에 비해 최신 멀티 코어 CPU가 제공하는 병렬 실행의 힘을 얻을 수 있다.

병렬 스트림의 올바른 사용법

병렬 스트림을 잘못 사용하면서 발생하는 많은 문제는 공유된 상태를 바꾸는 알고리즘을 사용하기 떄문에 일어난다. 병렬 스트렘과 병렬 계산에서는 공유된 가변 상태를 피해야 한다

병렬 스트림 효과적으로 사용하기

  • 확신이 서지 않으면 직접 측정하라 무조건 병렬 스트림으로 바꾸는 것이 능사는 아니다. 이미 살펴본 것처럼 언제나 병렬 스트림이 순타 스트림보다 빠른 것은 아니기 때문이다. JMH와 같은 적절한 벤치마크로 성능을 측정하자

  • 박싱을 주의하라. 앞서 살펴봤듯이, 자동 박싱과 언박싱은 (생각외로) 성능을 크게 저하시킬 수 있다. 기본형 특화 스트림 (IntStream, LongStream, DoubleStream)을 사용하자

  • 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다. limit, findFirst 처럼 요소의 순서에 의존하는 연산을 병렬 스트림에서 수행하려면 비싼 비용을 치러야 한다.

  • 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라 N개의 요소에, 하나의 요소를 처리하는 데 드는 비용을 Q라 하면 전체 스트림 파이프라인 처리 비용을 N*Q로 예상할 수 있다. Q가 높아진다는 것은 병렬 스트림으로 성능을 개선할 수 있는 가능성이 있음을 의미한다.

  • 소량의 데이터에서는 병렬 스트림이 도움되지 않는다. 소량의 데이터를 처리하는 상황에서는 병렬화 과정에서 생기는 부가 비용을 상쇄할 수 있을 만큼의 이득을 얻지 못하기 때문이다.

  • 스트림을 구성하는 자료구조가 적절한지 확인하라. ArrayList가 LinkedList보다 낫다. LinkedList를 분할하려면 모든 요소를 탐색해야하기 때문이다.

  • 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다. SIZED 스트림은 정확히 같은 크기의 두 스트림으로 분할할 수 있으므로 효과적으로 스트림을 병렬 처리할 수 있다. 반면 필터 연산이 있으면 스트림의 길이를 예측할 수 없으므로 효과적으로 스트림을 병렬 처리할 수 있을지 알 수 없게 된다.

  • 최종 연산의 병합과정 (Collector.combiner()) 비용을 살펴보라. 병합 과정의 비용이 비싸다면 병렬 스트림으로 얻은 성능의 이익이 서브스트림의 부분 결과를 합치는 과정에서 상쇄될 수 있다.

포크 / 조인 프레임워크

포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었다. 포크/조인 프레임워크에서는 서브태스크를 ForkJoinPool 의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.

RecursiveTask 활용

스레드 풀을 이용하려면 RecursiveTask 의 서브클래스를 만들어야 한다. RecursiveTask 를 정의하려면 추상 매서드 `compute` 를 구현해야 한다. 이 `compute` 메서드는 태스크를 서브태스크로 분할하는 로직과, 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘을 정의한다.

분할 후 정복 (divide-and-conquer) 알고리즘의 병렬화 버전이다! 앞에서 봤던, N까지의 수를 더하는 예제를 Fork/Join 프레임워크를 사용해서 다시 구현해보자

public class ForkJoinSumCalculator extends RecursiveTask<Long> {

    private final long[] numbers; // 더할 숫자 배열
    private final int start; // 이 서브태스크에서 처리할 배열의 초기위치와 최종위치
    private final int end;
    private static final long THRESHOLD = 10_000; // 이 값 이하의 서브태스크는 더 이상 분할할 수 없다.

    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }
    private ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start; // 이 태스크에서 더할 배열의 길이
        if(length <= THRESHOLD) {
            return computeSequentially();
        }
        ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
        leftTask.fork(); // ForkJoinPool의 다른 스레드로 새로 생성한 태스크를 비동기로 실행한다.

        ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);

        Long rightResult = rightTask.compute(); // 두 번째 서브태스크를 동기 실행한다. 이때 추가로 분할이 일어날 수 있다.
        Long leftResult = leftTask.join(); // 첫번째 서브태스크의 결과를 읽거나 아직 결과가 없으면 기다린다.

        return leftResult + rightResult;

    }

    private long computeSequentially() {
        long sum = 0;
        for(int i=start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}

이 예제에서는 10,000개의 요소 까지 쪼개고 그 이상부터는 쪼개지 않는다. 즉, 10,000개 씩 합을 하는 스레드를 만드는 것이다. RecursiveTask 을 구현한 ForkJoinSumCalculator를 ForkJoinPool로 전달하면 풀의 스레드가 ForkJoinSumCalculator의 compute 메서드를 실행하면서 작업을 수행한다. Task 를 분할할 수 있을 때까지 (THRESHOLD) 분할 한뒤, 재귀적으로 역순으로 방문하면서 result를 구하고, 최종 결과를 계산한다.

메서드 성능을 확인하면 병렬 스트림을 이용할때보다 성능이 나빠졌다. 그 이유는 ForkJoinSumCalculator 태스크에서 사용할 수 있도록 전체 스트림을 long[] 으로 변환했기 때문이다.

포크/조인 프레임워크를 올바르게 사용하는 방법

  • join 메서드를 태스크에 호출하면 태스크가 생상하는 결과가 준비될 때까지 호출자를 블록시킨다. 따라서 두 서브태스크가 모두 시작된 다음에 join 을 호출해야 한다. 그렇지 않으면 각각의 서브태스크가 다른 태스크가 끝나길 기다리는 일이 발생하며, 원래 순차 알고리즘보다 느리고 복잡한 프로그램이 될 수 있다.

  • RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드를 사용하면 안된다. 대신, compute 나 fork 메서드를 이용해 메서드를 호출하자

  • 서브태스크에 fork 메서드를 호출해서 ForkJoinPool의 일정을 조절할 수 있다. 왼쪽 작업과 오른쪽 작업 모두에 fork 메서드를 호출하는 것이 자연스러울 것 같지만, 예제에서도 봤듯이, 한쪽에는 fork 한쪽에는 compute 를 호출한다. 그 이유는 두 서브 태스크의 한 태스크에는 같은 스레드를 재사용할 수 있으므로 풀에서 불필요한 태스크를 할당하는 오버헤드를 피할 수 있다.

  • 포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅하기 어렵다.

  • 병렬 스트렘에서 살펴본 것 처럼, 멀티코어에 포크/조인 프레임워크를 사용하는 것이 순차 처리보다 무조건 빠를 거라는 생각은 버려야한다. I/O 시간도 있으며, 서브태스크를 포킹하는데 드는 시간도 있다.

작업 훔치기

앞선 예제에서는 10,000 을 THRESHOLD로 두고 서브태스크를 분할했다. 기준값을 바꿔가면서 최적의 기준을 찾는 방법 밖에는 답이 없다. 쿠어 개수와 관계없이 적절한 크기로 분할된 많은 태스크를 포킹하는것이 바람직한다. 포크/조인 프레임워크에서는 작업 훔치기라는 기법으로 스레드를 공정하게 불할하며, 각각의 스레드가 본인 일이 끝나면 다른 스레드에서 일을 훔쳐와서 수행을 한다.

doubly linked list 를 참조하면서 작업이 끝날 때마다 큐의 헤드에서 다른 태스크를 가져와서 처리하기 때문이다.

Spliterator 인터페이스

앞에서 숫자 배열을 여러 태스크로 분할하는 로직을 개발해봤다. 병렬 스트림을 쓸때는 분할 로직을 따로 구현하지 않았었는데, 이는 스트림을 자동으로 분할해주는 기능이 있기 때문이다. 그 기능을 하는 것이 Spliterator 이다.

Iterator 처럼 Spliterator 는 소스의 요소 탐색 기능을 제공한다. 다만, 병렬작업에 특화되어 있다. ‘분할할 수 있는 반복자’ 역할을 한다. Spliterator 가 스트림을 여러 스트림으로 분할하는 과정은 재귀적 으로 일어난다.

public interface Spliterator<T> {
  boolean tryAdvance(Consumer<? super T> action);
  Spliterator<T> trySplit();
  long estimatedSize;
  int characteristics;
}

Spliterator 인터페이스는 여러 메서드를 정의하는데 아래 메서드들을 살펴보자.

  • tryAdvance : 요소를 하나씩 순차적으로 소비하면서 탐색해야 할 요소가 남아있으면 참을 반환한다. (Iterator 과 동일)
  • trySplit : Spliterator의 일부 요소를 분할해서 두 번째 Spliterator를 생성한다.
  • estimatedSize : 탐색해야할 요소 수, 공평하게 Spliterator를 분할하라 수 있다.
  • characteristics : 이 정보를 참고해서 Spliterator를 더 잘 제어하고 최적화할 수 있다.

    구분 설명
    ORDERED 리스트처럼 요소에 정해진 순서가 있으므로 Spliterator 는 요소를 탐색하고 분할할 때 이 순서에 유의해야 한다.
    DISTINCT x,y 두 요소를 방문했을 때 x.equals(y) 는 항상 false 를 반환한다.
    SORTED 탐색된 요소는 미리 정의된 정렬 순서를 따른다.
    SIZED 크기가 알려진 소스(예를 들면 Set) 로 Spliterator를 생성했으므로 estimatedSize() 는 정확한 값을 반환한다.
    NON-NULL 탐색하는 모든 요소는 null이 아니다.
    IMMUTABLE 이 Spliterator의 소스는 불변이다. 즉, 요소를 탐색하는 동안 요소를 추가하거나, 삭제하거나, 고칠 수 없다.
    CONCURRENT 동기화 없이 Spliterator의 소스를 여러 스레드에서 동시에 고칠 수 있다.
    SUBSIZED 이 Spliterator 그리고 분할되는 모든 Spliterator는 SIZED 특성을 갖는다.

커스텀 Spliterator 구현하기

GIT 링크 첨부

댓글남기기