Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
Tags
- cache
- AWS
- socket
- r
- html
- network
- Elk
- mongodb
- HTTP
- VCS
- mybatis
- redis
- github
- javascript
- Linux
- nodejs
- 네트워크
- spring
- reactor
- git
- 데이터통신
- Java
- libuv
- Lombok
- NoSQL
- effective
- reactive
- ajax
- Static
- Heap
Archives
- Today
- Total
빨간색코딩
Reactor 의 탄생, 생성과 구독 (역사, Publisher 구현체 Flux와 Mono, defer, Subscriber 직접구현) 본문
Java
Reactor 의 탄생, 생성과 구독 (역사, Publisher 구현체 Flux와 Mono, defer, Subscriber 직접구현)
빨간색소년 2021. 1. 18. 02:23- 참조문서
리액티브 스트림을 기본적으로 이해해야, Reactor 도 수월하게 이해할 수 있다.
cf) 리액티브 스트림 포스팅 : https://sjh836.tistory.com/182
예제코드는 io.projectreactor:reactor-core:3.4.2 기준으로 작성되었다.
1. Reactor의 탄생
- reactor 1.0은 13년 7월에 출시되었다. spring 프레임워크의 개발팀인 pivotal 에서 만든 오픈소스이다.
- 리액터 패턴, 함수형 프로그래밍, 메세지 기반 등의 설계와 모범사례들을 통합
- 리액터 패턴 포스팅 : https://sjh836.tistory.com/184
- 당시, 스프링 개발팀은 대용량 데이터 개발을 단순화하는 Spring XD 프로젝트를 개발 중이었다.
- 17년 7월부로 deprecated 되었고, Spring Cloud Data Flow 로 넘어갔다.
- Spring XD 에 비동기 논블로킹을 지원하기 위해 만들어진 프로젝트가 Reactor 이다.
- 장점 : spring 과 완벽한 통합, netty 도 지원, 비동기 논블로킹 메세지 처리, 고성능
- 단점 : Back Pressure 기능 없음, 복잡한 오류처리
2. Reactive Stream과 Reactor 3.x
- reactor 2.0 는 위 단점들을 해결하고, 15년 3월에 나왔다.
- onOverflowBuffer(), onOverflowDrop() 등을 통해서 Back Pressure 를 지원
- 15년 4월에는 출시된 리액티브 스트림 표준 스펙이 발표되었다.
- API 및 규칙만 정의하고, 사용할 수 있는 라이브러리를 제공하진 않았다.
- reactor 는 리액티브 스트림 표준 스펙을 2.5 마일스톤 버전부터 지원했다. (16년 2월)
- 하지만, 릴리즈는 3.0으로 했다. (16년 8월)
- 비슷한 시기에 리액티브 스트림 표준을 지원하는 RxJava 2.0 도 출시되었는데, 가장 큰 차이점은 RxJava는 안드로이드 포함한 jdk6 부터 지원했고, reactor는 jdk8부터 지원했다.
3. Publisher<T>
의 구현체
- Flux와 Mono가 있다. 이 둘은 대략적인 카디널리티 정보를 담는 식으로 타입을 구분한다.
- 아이템이 0개 아니면 1개라면 Mono를 써서 표현력을 좀 더 좋게 해주는 식이다.
- 연산자도 그에 맞는것만 지원하고..
- Mono의 연산자들은 버퍼 중복, 값비싼 동기화 작업 등이 생략되어 있다.
- Flux와 Mono는 서로 쉽게 변환할 수 있다.
Flux<T>.collectList() = Mono<List<T>>
Mono<T>.flux() = Flux<T>
3-1. Flux<T>
- Flux는 여러 요소(0 ~ N)를 생성할 수 있는 리액티브 스트림이다.
- 메모리 부족을 야기하지 않고도 무한대의 리액티브 스트림을 만들 수 있다.
Flux.range(0, 5).repeat()
- 다만, 이것을 수집하려고 시도하면 OOM이 날 것이다.
.collectList().block()
3-2. Mono<T>
- Mono는 최대 하나의 요소(0 ~ 1)를 생성할 수 있는 리액티브 스트림이다.
CompletableFuture<T>
와 의미론적으로 동일해서, 비슷한 용도로 사용할 수 있다. 그러나 CompletableFuture는 값을 반환해야하고, 즉시 시작한다는 차이가 있다. Mono 는 구독자가 없으면 가만히 있고.Mono<Void>
를 통해, 뭔가 완료되었다고 알리는데 활용할 수 있다.
4. 리액티브 스트림 시퀀스 생성하기
- Flux와 Mono는 많은 팩토리 메소드를 제공한다.
4-1. Flux
- Flux.just("A", "B")
- Flux.fromArray(new Integer[] { 2, 4, 8 })
- Flux.fromIterable(Arrays.asList(3, 6, 9))
- Flux.range(1, 5);
- Flux.from(publisher) : 다른 Publisher를 Flux로 변환
- Flux.empty()
- Flux.never() : onComplete, onError 신호까지 보내지 않음
- Flux.error(throwable) : 바로 오류를 전파하는 시퀀스를 생성
- Flux.defer(supplier) : 구독되는 순간에 supplier를 실행하여 시퀀스를 생성
4-2. Mono
- Mono.just("devljh")
- Mono.justOrEmpty(null) : nullable, Optional.empty 모두 가능
- Mono.fromCallable(this::httpRequest)
- Mono.fromRunnable(runnable)
- Mono.fromSupplier(supplier)
- Mono.fromFuture(future)
- Mono.fromCompletionStage(completionStage)
- Mono.from(publisher) : 다른 Publisher를 Mono로 변환
- Mono.empty()
- Mono.never() : onComplete, onError 신호까지 보내지 않음
- Mono.error(throwable) : 바로 오류를 전파하는 시퀀스를 생성
- Mono.defer(supplier) : 구독되는 순간에 supplier를 실행하여 시퀀스를 생성
4-2-1. just VS fromSupplier VS defer
- 위 3개는 Mono 시퀀스를 만드는 데, 어떤 차이가 있을까?
- just는 즉시 시퀀스를 만든다.
- fromSupplier과 defer는 구독시점에 시퀀스를 생성한다. (lazy 처리)
- fromSupplier 는
Supplier<? extends T> supplier
를 인자로 받는다. Mono가 아닌 값에 사용하면 된다. - defer 는
Supplier<? extends Mono<? extends T>> supplier
를 인자로 받는다. 이미 Mono로 반환되는 메소드에 사용하는 게 좋다.- 즉, 아래 경우에는 fromSupplier 가 더 적절하고 깔끔해보이는 상황이다.
@Test
public void defer_테스트() throws Exception {
long start = System.currentTimeMillis();
// just
Mono<Long> clock1 = Mono.just(System.currentTimeMillis());
Thread.sleep(5000);
long result1 = clock1.block() - start;
log.info("흐른 시간 = {}", result1); // 5초가 지났으나, 위에서 즉발실행되어서 0 출력
// fromSupplier
Mono<Long> clock2 = Mono.fromSupplier(System::currentTimeMillis);
Thread.sleep(5000);
long result2 = clock2.block() - start;
log.info("흐른 시간 = {}", result2); // 10초 지남
// defer
Mono<Long> clock3 = Mono.defer(() -> Mono.just(System.currentTimeMillis()));
Thread.sleep(5000);
long result3 = clock3.block() - start;
log.info("흐른 시간 = {}", result3); // 15초 지남
}
// 실행결과
23:41:03.091 [main] INFO com.devljh.reactive.practice.ReactorTest - 흐른 시간 = 0
23:41:08.106 [main] INFO com.devljh.reactive.practice.ReactorTest - 흐른 시간 = 10108
23:41:13.128 [main] INFO com.devljh.reactive.practice.ReactorTest - 흐른 시간 = 15123
5. 리액티브 스트림 구독하기
- Flux와 Mono를 구독하려면 subscribe() 메소드를 사용하면 된다. 메소드 시그니처는 아래와 같다.
- 내부적으로는
new LambdaSubscriber<>(...)
를 통해 Subscriber를 생성해서 구독한다. - LambdaSubscriber 내부 코드를 보면, Subscription.request를 조정하지 않는다면, 기본적으로
s.request(Long.MAX_VALUE)
이다.
- 내부적으로는
Disposable subscribe()
: 구독은 하지만, onNext, onError, onComplete 처리는 하지 않는다.Disposable subscribe(Consumer<? super T> consumer)
: onNext 만 처리한다.Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer)
: onNext, onError 만 처리한다.- errorConsumer 는 NotNull 이다.
Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer)
: onNext, onError, onComplete 를 처리한다.Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer)
: onNext, onError, onComplete 를 처리하고, Subscription(요청량 조절, 구독취소) 도 직접 제어한다.- 해당 메소드는 Reactor 3.5 부터 제거될 예정이다. request 제어를 잊는 경우가 많아서 삭제된다고 한다.. 밑의 subscribeWith 를 쓰라고 한다.
<E extends Subscriber<? super T>> E subscribeWith(E subscriber)
: 밑의 메소드와 동일한데, 반환을 해주는 차이가 있다.void subscribe(Subscriber<? super T> actual)
: 리액티브 스트림 표준 Publisher 인터페이스의 subscribe() 를 재정의했기 때문에, 이것도 사용할 수 있다. 다양한 조절을 할 순 있지만, 이걸 직접쓸 일은 거의 없긴 하다.
5-1. Disposable 인터페이스
- Reactor 코어에서 제공하는 인터페이스이며,
@FunctionalInterface
이다. - Subscription을 취소할 수 있다.
@Test
public void 구독취소_테스트() throws Exception {
Disposable disposable = Flux.interval(Duration.ofMillis(500))
.subscribe(i -> log.info("{}", i));
Thread.sleep(2000);
disposable.dispose();
}
// 실행결과
01:24:46.144 [parallel-1] INFO com.devljh.reactive.practice.ReactorTest - 0
01:24:46.652 [parallel-1] INFO com.devljh.reactive.practice.ReactorTest - 1
01:24:47.158 [parallel-1] INFO com.devljh.reactive.practice.ReactorTest - 2
01:24:47.657 [parallel-1] INFO com.devljh.reactive.practice.ReactorTest - 3
5-2. Custom Subscriber 로 구독하기
5-2-1. 권장하지 않는 방법
-
리액티브 스트림 표준 Subscriber 인터페이스를 직접 구현하는 것은 쉬운 일이 아니다.
-
제시하는 표준 스펙을 모두 준수해야하고, TCK(Technology Compatibility Kit) 테스트코드를 통과해야한다.
-
아래는 동작은 하지만, 잘못된 구현이다.
@Test public void Subscriber_잘못된_구현() { Subscriber<String> subscriber = new Subscriber<String>() { /* * 저번 포스팅의 리액티브 스트림 파이프라인 병렬화에서 보았듯이, * 발행과 구독은 각각 다른 쓰레드에서 처리될 수 있다. * 따라서, volatile 키워드를 사용한다. */ private volatile Subscription subscription; @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; this.subscription.request(1); // 구독 후, 최초 요청 } @Override public void onNext(String s) { log.info("onNext = {}", s); this.subscription.request(1); // 데이터 수신 후, 추가 요청 } @Override public void onError(Throwable t) { } @Override public void onComplete() { log.info("onComplete"); } }; Flux.just("A", "B", "C").subscribe(subscriber); } // 실행 결과 01:48:54.609 [main] INFO com.devljh.reactive.practice.ReactorTest - onNext = A 01:48:54.610 [main] INFO com.devljh.reactive.practice.ReactorTest - onNext = B 01:48:54.610 [main] INFO com.devljh.reactive.practice.ReactorTest - onNext = C 01:48:54.613 [main] INFO com.devljh.reactive.practice.ReactorTest - onComplete
-
this.subscription.request(n) 에서 숫자가 0 이하인지, 구독(subscription) 상태는 정상인지 등을 확인하지 않고, 1개씩 계속 요청했기 때문이다.
5-2-2. 안전한 구현 방법
-
리액터 프레임워크에서 미리 만들어둔, BaseSubscriber 추상클래스를 상속해서 구현한다.
-
Subscriber의 onSubscribe, onNext, onError, onComplete 는 모두 재정의할 수 없게 final 로 선언되어 있다.
-
제공하는 hook 을 통해서, 상황에 따라 얼마든지 조정이 가능하다.
public abstract class BaseSubscriber<T> implements CoreSubscriber<T>, Subscription, Disposable { protected void hookOnNext(T value){ // NO-OP } @Override public final void onNext(T value) { Objects.requireNonNull(value, "onNext"); try { hookOnNext(value); } catch (Throwable throwable) { onError(Operators.onOperatorError(subscription, throwable, value, currentContext())); } } // 생략... }
-
아래는 BaseSubscriber 로 동일하게 다시 구현한 코드이다.
@Test public void Subscriber_안전한_구현() { Subscriber<String> subscriber = new BaseSubscriber<String>() { @Override public void hookOnSubscribe(Subscription subscription) { request(1); // 구독 후, 최초 요청 } @Override public void hookOnNext(String s) { log.info("onNext = {}", s); request(1); // 데이터 수신 후, 추가 요청 } @Override public void hookOnComplete() { log.info("onComplete"); } }; Flux.just("A", "B", "C").subscribe(subscriber); } // 실행 결과 02:03:22.931 [main] INFO com.devljh.reactive.practice.ReactorTest - onNext = A 02:03:22.932 [main] INFO com.devljh.reactive.practice.ReactorTest - onNext = B 02:03:22.932 [main] INFO com.devljh.reactive.practice.ReactorTest - onNext = C 02:03:22.933 [main] INFO com.devljh.reactive.practice.ReactorTest - onComplete
'Java' 카테고리의 다른 글
JPA란 무엇인가 (Before JPA, 영속성 컨텍스트, Entity, JPQL, 트랜잭션, N+1 문제) (2) | 2022.01.04 |
---|---|
RxJava 코드로 Reactive Streams 맛보기 (코드레벨의 동작방식, map, filter, reduce 연산자 직접 구현) (0) | 2020.12.31 |
Reactive Streams (관찰자 결합, 반복자 결합, Back Pressure, 흐름, Processor, 비동기 및 병렬화 구현방식) (0) | 2020.12.31 |
Enum 찾기의 달인 (효율적으로 찾기, spring bean과 맵핑) (0) | 2019.12.13 |
[이펙티브 자바3판] 7장 람다와 스트림 (0) | 2019.01.18 |
Comments