[Ch17] 리액티브 프로그래밍

오늘날에는 다음과 같은 이유로 상황이 변하고 있다.

  • 빅데이터 : 보통 빅데이터는 페타바이트 단위로 구성되며 매일 증가한다.
  • 다양한 환경 : 모바일 디바이스에서 수천 개의 멀티코어 프로세서로 실행되는 클라우드 기반 클러스터에 이르기까지 다양한 환경에 애플리케이션이 배포된다.
  • 사용 패턴 : 사용자는 1년 내내 항상 서비스를 이용할 수 있으며 밀리초 단위의 응답 시간을 기대한다.

예전 소프트웨어 아키텍처로는 오늘날의 이런 요구사항을 만족시킬 수 없다. 리액티브 프로그래밍에서는 다양한 시스템과 소스에서 들어오는 데이터 항목 스트림을 비동기적으로 처리하고 합쳐서 이런 문제를 해결한다.

이번 장에서는 리액티브 애플리케이션과 시스템의 발전된 특징을 알아본다.

리액티브 매니페스토

reactive manifesto 는 2013년에 개발되었으며, 리액티브 애플리케이션과 시스템 개발의 핵심 원칙을 공식적으로 정의한다.

  • 반응성(responsive) : 리액티브 시스템은 빠를 뿐 아니라, 더 중요한 특징으로 일정하고 예상할 수 있는 반응시간을 제공한다. 결과적으로 사용자가 기대치를 가질 수 있다.
  • 회복성 (resilient) : 장애가 발생해도 시스템은 반응해야 한다. 컴포넌트 실행 복제, 여러 컴포넌트의 시간 분리(독립적인 생명주기)와 공간 분리(다른 프로세스에서 실행됨), 각 컴포넌트가 비동기적으로 작업을 다른 컴포넌트에 위임하는 등 리액티브 매니페스토는 회복성을 달성할 수 있는 다양한 기법을 제시한다.
  • 탄력성 (elastic) : 애플리케이션의 생명주기 동안 다양한 작업 부하를 받게 되는데, 이 다양한 작업 부하로 애플리케이션의 반응성이 위협받을 수 있다. -> 자동으로 자원 수를 늘린다.
  • 메시지 주도 (message-driven) : 비동기 메시지를 전달해 컴포넌트 끼리의 통신이 이루어지며, 이를 통해 회복성(장애를 메시지로 처리)과 탄력성(주고 받은 메시지의 수를 감시하고 메시지의 양에 따라 적절하게 리소스를 할당)을 얻을 수 있다.

애플리케이션 수준의 리액티브

애플리케이션 수즌 컴포넌트의 리액티브 프로그래밍의 주요 기능은 비동기로 작업을 수행할 수 있다는 점이다.

이벤트 스트림을 블록하지 않고 비동기로 처리하는 것이 최신 멀티코어 CPU의 사용률을 극대화할 수 있는 방법이다. 이 목표를 달성할 수 있도록 리액티브 프레임워크와 라이브러리는 스레드를 퓨처, 액터, 일련의 콜백을 발생시키는 이벤트 루프 등과 공유하고 처리할 이벤트를 변환하고 관리한다.

스레드를 다시 쪼개는 종류의 기술을 이용할 때는 메인 이벤트 루프 안에서는 절대 동작을 block 하지 않아야 한다는 중요한 전제 조건이 따른다.

  • 데이터베이스나 파일 시스템 접근
  • 원격서비스 호출 위와 같은 모든 I/O 관련 동작이 블록 동작에 속한다.

RxJava, Akka 와 같은 리액티브 프레임워크는 별도로 지정된 스레드 풀에서 블록 동작을 실행시켜 메인 풀의 스레드는 방해받지 않고 실행되게 한다.

CPU 관련 작업은 실제로 할당된 CPU 코어 또는 스레드를 100퍼센트 활용해 뭔가를 연산하느라 다른 일을 처리할 수 없어 block 되는 반면, I/O관련 작업에서는 사용자 입력 같은 외부 응답을 기다리면서 CPU코어나 스레드가 처리할 일이 없이 block되는 상황이다. -> 개발자는 CPU가 최대로 활용할 수 있도록 특정 작업이 CPU 관련 작업인지, I/O 관련 작업인지 적절하게 선택해야 한다.

시스템 수준의 리액티브

시스템 측면에서 봤을 때, 리액티브 시스템이란 여러 애플리케이션이 한 개의 일관적인, 회복할 수 있는 플랫폼을 구성할 수 있게 해줄 뿐 아니라, 이들 애플리케이션 중 하나가 실패해도 전체 시스템은 계속 운영될 수 있도록 도와주는 소프트웨어 아키텍처다.

메시지는 정의된 목적지 하나를 향하는 반면, 이벤트는 관련 이벤트를 관찰하도록 등록한 컴포넌트가 수신한다는 점이 다르다.

리액티브 시스템에서는 producer 와 consumer 가 각각 수신 메시지, 발신 메시지와 결합하지 않도록 이들 메시지를 비동기로 처리해야 한다. 즉, 각 컴포넌트를 완전히 고립시켜야 하는 것이다. ➡️ fault tolerance 를 지킬 수 있다.

시스템에 장애가 발생했을 때 서서히 성능이 저하되는 것이 아니라, 문제를 격리함으로 장애에서 완전 복구되어 건강한 상태로 시스템이 돌아온다.

위와 같이 구현할 수 있는 이유는, 컴포넌트를 완전히 고립시켜 에러 전파를 방지하고, 이들을 메시지로 바꾸어 다른 컴포넌트로 보내는 등 감독자 역할을 수행함으로 이루어진다.

리액티브 스트림과 Flow API

리액티브 프로그래밍은 리액티브 스트림을 사용하는 프로그래밍이다. 리액티브 스트림은 잠재적으로 무한의 비동기 데이터를 순서대로, 그리고 block 하지 않는 역압력을 전제해 처리하는 표준 기술이다.

역압력은 발행-구독 프로토콜에서 이벤트 스트림의 consumer 가 producer 가 메시지를 발행하는 속도보다 느린 속도로 이벤트를 소비하는 문제가 발생하지 않도록 (lag 가 생기지 않도록) 보장하는 장치다.

즉, 위와 같은 상황은 부하가 발생했다는 의미로 볼 수 있는데, 부하가 발생한 컴포넌트는 이벤트 발생 속도를 늦추라고 알리거나, 얼마나 많은 이벤트를 수신할 수 있는지 알리거나, 다른 데이터를 받기 전에 기존의 데이터를 처리하는 데 얼마나 시간이 걸리는지를 upstream 발행자에게 알릴 수 있어야 한다.

넷플릭스, 레드햇, 트위터, 라이트밴드 및 기타 회사들이 참여한 리액티브 스트림 프로젝트에서는 모든 리액티브 스트림 구현이 제공해야 하는 최소 기능 집합을 네 개의 인터페이스로 정의했다. 아래와 같은 많은 서드파티 라이브러리에서는 이 네 개의 인터페이스를 구현한다.

  • java.util.concurrent.Flow
  • Akka 스트림
  • Reactor, RxJava, Vert.x

Flow 클래스

자바9에서는 리액티브 프로그래밍을 지원하는 java.util.concurrent.Flow 를 추가했다. 불변클래스로, 인스턴스화 할 수 없다.

public final class Flow {
    static final int DEFAULT_BUFFER_SIZE = 256;

    private Flow() {
    }
}

리액티브 스트림의 표준에 따라 아래 4개 인터페이스를 포함한다.

  • Publisher
  • Subscriber
  • Subscription
  • Processor
@FunctionalInterface
public interface Publisher<T> {
    void subscribe(Flow.Subscriber<? super T> var1);
}

Publisher 가 항목을 발행하고, Subscriber 의 요구사항에 따라 역압력 기법에 의해 이벤트 제공 속도가 제한된다. (Subscription 에서 관리한다.)

public interface Subscriber<T> {
    void onSubscribe(Flow.Subscription var1);

    void onNext(T var1);

    void onError(Throwable var1);

    void onComplete();
}

위의 Subscriber 를 보면 subscriber 가 publisher에 자신을 등록할 때, publisher는 처음으로 onSubscribe 메서드를 호출해 Subscription 객체를 전달한다.

public interface Subscription {
    void request(long var1);

    void cancel();
}

Subscription 인터페이스는 메서드 두 개를 정의한다.

  • request : publisher 에게 주어진 개수의 이벤트를 처리할 준비가 되었음을 알릴 수 있다.
  • subscription : publisher 에게 더 이상 이벤트를 받지 않음을 통지한다.
public interface Processor<T, R> extends Flow.Subscriber<T>, Flow.Publisher<R> {
}

마지막으로, Processor 인터페이스는 단지 Publisher 와 Subscriber 를 상속받을 뿐 아무 메서드도 추가하지 않는다.

이 인터페이스는 리액티브 스트림에서 처리하는 이벤트의 변환 단계를 나타낸다. Processor 가 에러를 수신하면, 이로부터 회복하거나 즉시 onError 신호로 모든 Subscriber에 에러를 전파할 수 있다. 마지막 Subscriber 가 Subscription 을 취소하면, Processor 는 자신의 업스트림 Subscription도 취소함으로 취소 신호를 전파해야 한다.

실제로 리액티브 애플리케이션 만들기

Flow 클래스에 정의된 인터페이스 대부분은 직접 구현하도록 의도된 것이 아니라고 한다. “준수해야 할 규칙과 공용어를 제시하는 수준” 으로 봐야 할 것 같다.

책에서는 이러한 실습 예제를 담았다.

  • TempInfo, 원격 온도계를 흉내낸다. (0에서 99 사이의 화씨 온도를 임의로 만들어 연속적으로 보고)
  • TempSubscriber, 레포트를 관찰하면서 각 도시에 설치된 센서에서 보고한 온도 스트림을 출력한다.

코드는 git 에서 보고, 예제에서 알 수 있는 몇가지 내용에 대해 정리한다.

  • 1번 request 를 한 뒤 onNext 로 다시 request를 하는 식으로 코드를 구현했다. 이렇게 할 경우, 재귀적으로 호출되어 StackOverFlow가 발생할 수 있으므로, 별도의 스레드에서 request를 처리하도록 한다.

  • Processor 의 목적은 Publisher를 구독한 다음, 수신한 데이터를 가공해 다시 제공하는 것이다.

public static Flow.Publisher<TempInfo> getCelsiusTemperatures(String town) {
    return subscriber -> {
        TempProcessor processor = new TempProcessor();
        processor.subscribe(subscriber); // subscriber 를 등록
        processor.onSubscribe(new TempSubscription(processor, town)); // publisher 를 등록
    };
}

자바는 왜 플로 API 구현을 제공하지 않는가?

앞선 예제에서 우리는 Flow 인터페이스를 직접 구현했다. 이렇듯, 자바 라이브러리에 왜 플로 API를 구현하지 않았을까?

API를 만들 당시, Akka, RxJava 등 다양한 리액티브 스트림의 자바 코드 라이브러리가 이미 존재했기 떄문이다. 따라서 Flow 인터페이스를 기반으로 리액티브 개념을 구현하도록 진화했다.

리액티브 라이브러리 RxJava 사용하기

RxJava 는 자바로 리액티브 애플리케이션을 구현하는 데 사용하는 라이브러리다. 이벤트 스트림을 두 가지 구현 클래스로 제공한다.

  1. 역압력 기능이 있는 Flow 를 포함하는 io.reactivex.Flowable
  2. 역압력을 지원하지 않는 Publisher io.reativex.Observable

모든 Subscriber 는 구독 객체의 request(Long.MAX_VALUE) 메서드를 이용해 역압력 기능을 끌 수 있다.

Observable 만들고 사용하기

Observable, Flowable 클래스는 다양한 종류의 리액티브 스트림을 편리하게 만들 수 있도록 여러 팩토리 메서드를 제공한다.

Observable<String> strings = Observable.just("first", "second");

여기서 just() 팩토리 메서드는 한 개 이상의 요소를 이용해 이를 방출하는 Observable 로 변환한다.

Observable 의 구독자는 onNext(“first”), onNext(“second”), onComplete() 의 순서로 메시지를 받는다.

지정된 속도로 이벤트를 방출하는 상황에서 interval 메서드를 쓸 수도 있다.

Observable<Long> onePerSec = Observable.interval(1, TimeUnit.SECONDS);

0에서 시작해 1초 간격으로 long 형식의 값을 무한으로 증가시키며 방출한다. (즉, 예제에서 1초마다 온도 보고를 방출하는 로직을 대체할 수 있다.)

RxJava 에서 Observable 이 플로 API의 Publisher 역할을 하며, Observer 는 Flow의 Subscriber 인터페이스 역할을 한다. onSubscribe 메서드가 Subscription 대신 Disposable 인수를 갖는다는 점만 다르다.

Observable 은 역압력을 지원하지 않으므로 Subscription의 request 메서드를 포함하지 않는다.

RxJava의 API는 플로 API보다 유연하다.

  • 다른 세 메서드는 생략하고 onNext 메서드의 시그니처에 해당하는 람다 표현식을 전달해 Observable 을 구독할 수 있다.

  • 한 스트림에서 방출한 요소를 변환하거나 (map), 합치거나 모을 수 있다. (merge)

댓글남기기