빨간색코딩

Reactive Streams (관찰자 결합, 반복자 결합, Back Pressure, 흐름, Processor, 비동기 및 병렬화 구현방식) 본문

Java

Reactive Streams (관찰자 결합, 반복자 결합, Back Pressure, 흐름, Processor, 비동기 및 병렬화 구현방식)

빨간색소년 2020. 12. 31. 15:59

1. Reactive Streams 란?

  • 리액티브 스트림 스펙을 정의하고, 인터페이스를 제공하는 reactive-streams.org 를 살펴보면, 다음과 같다.
    • Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.
    • non-blocking back pressure 로 비동기적 스트림처리를 제공하는 표준
  • 리액티브 스트림은 관찰자(Observer) 패턴, 반복자(Iterator) 패턴, 함수형 패러다임의 조합으로 정의되곤 한다.

1-1. 왜 표준 Reactive Streams가 필요했나?

  • 비동기 요청을 위해 CompletableStage, ListenableFuture 등 다양한 라이브러리들을 사용할 수 있는데, 문제는 각각 호환이 안된다는 것이다. 상호 변환을 위해서는 유틸성 로직을 따로 작성해야 한다.
    • ex) spring AsyncRestTemplate은 ListenableFuture 을 반환하는데, 이걸 상위에서는 CompletableFuture 로 쓰고 싶은 경우..
  • 초기에 가장 인기있던 RxJava 1.x 에 의존하던 다른 라이브러리들이 기능들을 막 추가하면서, 충돌, 버전문제 등이 이슈가 됨
    • RxJava 1.x 가 따라가지 못함

2. 관찰자 패턴의 문제점과 결합

  • 먼저, 용어부터 정리해보자
    • Publisher = Subject = Observable = Producer
    • Subscriber = Observer = Consumer
  • Publisher는 Subscriber에게 Event를 Push한다.
  • cf) 옵저버 패턴 : https://sjh836.tistory.com/180

2-1. 관찰자 패턴의 한계와 리액티브 스트림에서의 해법

  • Observer 가 준비가 되지 않았는데, 이벤트(=데이터)가 전달된다면?(=흐른다면?)

    • 제대로 처리하지 못할 것이다
  • 연속되는 데이터의 끝을 알려주지 못한다.

    • 리액티브 스트림에서는 Iterator 패턴과 조합으로 해결
  • Observer 가 이벤트를 처리하는 속도보다, Subject 가 이벤트를 발행하는 속도가 빠르다면?

    • (빠른 프로듀서와 느린 컨슈머 이슈)
    • Subject 는 이벤트를 발행하고 전달하는 데만 집중할 뿐이다.
    • Observer 는 유실없이 처리하기 위해 Queue를 따로 두거나(이래도 결국 쌓여서 터짐), 별도 조치가 필요할 것이다.
      • 이전 포스팅에서 언급한, Spring EventBus를 사용하는 방식이라면, AsyncTaskExecutor 쓰레드풀이 쉬지않을 것
      • 무제한 Queue = OOM
      • 크기제한 Queue = drop으로 인한 유실
      • BlockingQueue = 비동기동작을 사실상 모두 무효화..
    • 리액티브 스트림에서는 back pressure 를 통해, 이 문제를 해결한다.

2-2. 결합하기

// 관찰자 패턴의 주체
public interface Observable<T> {
    void registerObserver(Observer<T> observer); // 리액티브 스트림에서는 Publisher - Subscriber 가 한 쌍이다. (복잡성, 동시성, 메모리누수 등을 해결)
    void unregisterObserver(Observer<T> observer); // 리액티브 스트림에서는 Subscription.cancle 을 통해 구독을 취소할 수 있다.
    void notifyObservers(T event); // 리액티브 스트림에선 Subscription를 통해 Subscriber.onNext, onComplete, onError 로 전달된다.
}

// 리액티브스트림의 발행자
public interface Publisher<T> {
   public void subscribe(Subscriber<? super T> s); // 관찰자의 registerObserver
}

3. 반복자 패턴과 결합

  • next()를 통해 데이터를 리턴받기 때문에, Pull이라고 볼 수 있다.
  • 데이터의 끝을 hasNext()를 통해 알 수 있다.
  • 이것을 결합해보면, 다음과 같다.
// 관찰자 패턴의 관찰자
public interface Observer<T> {
    void observe(T event);
}

// 반복자 패턴의 반복자
public interface Iterator<E> {
    boolean hasNext();
    E next();
}

// 리액티브 스트림의 구독자
public interface Subscriber<T> {
    public void onNext(T t); // 관찰자의 observe + 반복자의 next
    public void onComplete(); // 반복자의 hasNext
    public void onError(Throwable t); // next에서 Exception이 발생할 때, 전파를 위하여
}

4. Back Pressure (배압)

  • 빠른 Publisher - 느린 Subscriber 문제를 해결하는 원리이다.
  • Publisher의 일방적 데이터 Push 가 아니라, Subscriber가 처리할 수 있을 만큼의 데이터만 Subscriber의 요청에 의해서 전달해주는 것이다.
    • 중간에 Queue 같은게 필요없어짐
    • 이것을 dynamic pull 이라고 부른다.
  • 리액티브 스트림에서는 이것을 Subscription 로 제어한다. request 메소드를 통해 요청량을 조절
    • request 를 LONG.MAX 개씩 요청하면 순수 push 모델이 되는 것이고,
    • request 를 onNext 당 1개씩 요청하면 pull 모델이 된다.
public interface Subscription {
    public void request(long n);
    public void cancel();
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription s); // 전달받은 Subscription 은 Publisher와 Subscriber 사이의 통신 매개체가 된다.

    // 나머지는 위와 동일해서 생략...
}

5. 최종적인 흐름

  1. Subscriber 가 subscribe 메소드를 통해 Publisher 에게 구독을 요청
  2. Publisher 는 onSubscribe 메소드로 Subscriber 에게 Subscription 를 전달
  3. Subscriber 는 Subscription.request 을 통해, 자신에게 데이터를 흘려줄 것을 요구
  4. Publisher 는 Subscription 를 통해 Subscriber.onNext로 데이터를 전달한다.
    • Subscriber 는 내부에 Subscription를 set하였기 때문 (2번)
  5. 전달이 잘 끝났으면, onComplete, 오류났다면 onError 로 끝낸다.

6. Processor

  • Publisher 와 Subscriber 를 혼합한 Processor 라는 것도 있다.

      public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
      }
  • 이 둘 사이에서 몇 가지 처리 단계를 유연하게 추가할 수 있다.

  • 하나의 subscriber 의 결과물을 다른 subscriber 에 그대로 전달하거나, 변형할 때도 사용할 수있다. 마치 새로운 Publisher 처럼 행동하는 것이다.

  • ex1) 구독자 중 기준에 일치하는 구독자들에게만 전송한다던지..

  • ex2) 멀티캐스팅

7. jdk9 Flow

  • 리액티브 스트림은 jdk9 에서도 Flow API 로 반영이 되었다.
  • 리액티브 스트림 타입들을 Flow 로도 쉽게 변환이 가능하다. (ex. FlowAdapters)

8. 리액티브 스트림은 어떻게 비동기 및 병렬을 구현할까?

8-1. 리액티브 스트림 규약

  • reactive-streams.org 에서, Publisher 가 생성하고 Subscriber 가 소비하는 모든 신호는 non-blocking 이어야한다고 정의했다.
    • 단순 인터페이스와 규칙만 정해준거니.. 논블로킹은 개발자의 책임인 듯? (사실 Publisher - Subscriber 를 직접 구현할 일은 거의 없긴함)
    • 따라서, Publisher - Subscriber 를 구현할 때, block이 되지않도록 해야함
  • 전제를 만족했다면, 멀티코어를 모두 활용하기 위해서 Subscriber.onNext 메소드를 병렬로 호출해야하는데, 이것도 규약에 막혀있다.
    • on*** 호출은 스레드 안전성을 보장하여 신호를 보내야하며, 멀티 스레드에서 수행될 경우, 외부에서 동기화 요망
    • 따라서, 순차적으로만 onNext 를 호출할 수 밖에 없다..
    • Publisher 가 멀티스레딩하여 Subscriber.onNext 를 동시에 호출할수 없는 것..
  • 그렇다면, 도대체 어떻게 병렬로 구현할 수 있는가?

8-2. 리액티브 스트림 파이프라인의 병렬화

  • 파이프는 데이터 소스에서 몇가지 처리 또는 변환 단계를 거쳐 목적지까지 흘러간다.
    • 이 때, 데이터 소스에서 메세지 생성이 오래걸리거나 리소스를 많이 소모시킬 수 있고,
    • 메세지 생성은 금방되는데, 변환 처리가 오래걸릴 수도 있고,
    • 목적지가 데이터를 수신하는데, 리소스를 많이소모하며 오래걸릴 수도 있다.
  • 위에서 보듯이, 각 단계를 비동기적으로 메세지를 전달하는 것이다.
  • 정리하면, 처리 단계별로. 별도의 스레드에 바인딩되는 컨셉이다.
  • 하지만, 이것을 잘 균형있게 분할하고 배치하고 스레드에 바인딩하는 것은 까다로운 작업이다. 다행히, 리액티브 라이브러리들은 이런걸 스케줄러 API로 제공해준다.
Comments