빨간색코딩

RxJava 코드로 Reactive Streams 맛보기 (코드레벨의 동작방식, map, filter, reduce 연산자 직접 구현) 본문

Java

RxJava 코드로 Reactive Streams 맛보기 (코드레벨의 동작방식, map, filter, reduce 연산자 직접 구현)

빨간색소년 2020. 12. 31. 19:42

1. RxJava는?

  • https://github.com/ReactiveX/RxJava
  • 리액티브 프레임워크 중 하나이다.
    • 다른 구현체로는 Akka, 리액터 등
  • RxJava 1.3.8을 끝으로 EOL 되었지만, 개념을 익히는 용도로만 사용해보자
    • RxJava 2.x 나 3.x 사용 권장
    • 아래에서 사용하는 API들은 매우 기본적인 거라, 다른 버전에서도 호환됨

2. 스트림의 생산과 소비

@Test
public void 스트림의_생산과_소비() {
    // 스트림 생산 : 이벤트 생성기
    Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> sub) {
            sub.onNext("Hello, reactive world!");
            sub.onNext("저는 빨간색소년입니다.");
            sub.onCompleted();
        }
    }); // 구독자가 없으므로, 이벤트가 전파되지는 않는다.

    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onNext(String s) {
            log.info(s);
        }

        @Override
        public void onCompleted() {
            log.info("수행 완료!");
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }
    };

    // 실제 구독이 발생할 때, 위 코드들이 실행된다
    observable.subscribe(subscriber);

    // .forEach 는 subscriber 를 내가 만들진 않았지만, 내부적으로 만들어서 사용한다. (아래 메소드 참조)
    observable.forEach(log::info);
}

// 내부 코드
public final void forEach(final Action1<? super T> onNext) {
    생략...
    subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}

// 실행결과
17:07:05.376 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - Hello, reactive world!
17:07:05.376 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 저는 빨간색소년입니다.
17:07:05.376 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 수행 완료!
17:07:05.422 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - Hello, reactive world!
17:07:05.422 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 저는 빨간색소년입니다.

3. 비동기 시퀀스

@Test
public void 비동기_시퀀스() throws Exception {
    Subscription subscription = Observable.interval(1, TimeUnit.SECONDS)
            .subscribe(e -> log.info("수신 데이터 = {}", e));
    Thread.sleep(5000);
    subscription.unsubscribe();
    Thread.sleep(5000);
}

// 실행결과
17:10:13.562 [RxComputationScheduler-1] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 수신 데이터 = 0
17:10:14.546 [RxComputationScheduler-1] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 수신 데이터 = 1
17:10:15.554 [RxComputationScheduler-1] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 수신 데이터 = 2
17:10:16.552 [RxComputationScheduler-1] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 수신 데이터 = 3
17:10:17.552 [RxComputationScheduler-1] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 수신 데이터 = 4
  • 스트림의_생산과_소비() 와 무엇이 달라서, 비동기로 실행될까?(=main 스레드에서 실행되지 않을까?)
  • interval() 메소드는 rxjava 내부의 computationScheduler를 사용한다. 해당 스케줄러는 코어 갯수와 같으며, 계산 전용으로 쓰길 권장한다.
public static Observable<Long> interval(long interval, TimeUnit unit) {
    return interval(interval, interval, unit, Schedulers.computation());
}

4. 기본적인 연산자

  • Observable과 Subscriber 만으로도 다양하게 워크플로우를 구현할 수 있다.
    • 이 문구 때문에 아래에서 직접 구현을 해보았음..
  • 연산자들을 스트림의 원소를 조정하거나 스트림 구조 자체를 변경한다.
  • 아래에선 기본적인 연산자 3가지에 대해서만 살펴본다.
  • https://rxmarbles.com/

4-1. map()

  • <R> Observable<R> map(Function<? super T, ? extends R> func)
  • T를 R로 변환하는 함수를 통해 Observable<T>Observable<R>로 변환한다.

4-1-1. 사용 예제

Observable.range(0, 5)
    .map(i -> i * 2)
    .forEach(i -> log.info("결과 = {}", i));

// 실행결과
18:24:55.143 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 0
18:24:55.144 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 2
18:24:55.145 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 4
18:24:55.145 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 6
18:24:55.145 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 8

4-1-2. 코드 레벨에선 어떻게 동작하는 것일까?

// map 연산자
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    // 현재 Observable 과 lambda Function을 조합해서 새로운 Observable 을 만든다.
    return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
}

// OnSubscribeMap 은 Observable.OnSubscribe 이다.
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }

    // 인자 o 는 forEach의 ActionSubscriber 가 넘어올 것이다. (이유는 위에 언급되어있음)
    @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);

        // 현재의 Observable (= Observable.range(0, 5)) 을 MapSubscriber 가 구독한다.
        source.unsafeSubscribe(parent);
    }

    static final class MapSubscriber<T, R> extends Subscriber<T> {
        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual; // foreach의 ActionSubscriber 이다.
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            R result;

            try {
                result = mapper.call(t); // i -> i * 2 수행
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            actual.onNext(result); // ActionSubscriber의 onNext를 호출한다. log.info를 찍게해두었으니, 실행결과처럼 콘솔이 찍히겠지..
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;

            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

    }
}
  • 정리하면, Observable.range(0, 5)를 MapSubscriber 가 구독하는데, MapSubscriber 내부적으로 연산 후에 ActionSubscriber를 호출하고 있다.
  • 나머지 연산자들도 같은 방식으로 동작함

4-1-3. 직접 구현

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        // OnSubscribeRange 에서도 반복문에서 onNext(index); 하다가 if (index == endIndex) 면 onCompleted 하고 끝낸다.
        for (int i = 0; i < 5; i++) {
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
});

Function<Integer, Integer> transformer = i -> i * 2;

Subscriber<Integer> subscriber = new Subscriber<Integer>() {
    @Override
    public void onNext(Integer i) {
        i = transformer.apply(i); // OnSubscribeMap 에서도 subscriber.onNext(transformer.apply(t))
        log.info("결과 = {}", i);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
    }
};

observable.subscribe(subscriber);

// 람다 사용
Observable.create(sub -> {
    for (int i = 0; i < 5; i++) {
        sub.onNext(i);
    }
    sub.onCompleted();
}).subscribe(i -> {
    i = transformer.apply((Integer) i); // 타입추론을 할 수 없네...
    log.info("결과 = {}", i);
});

// 실행 결과
18:24:55.146 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 0
18:24:55.146 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 2
18:24:55.146 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 4
18:24:55.146 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 6
18:24:55.146 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 8

4-2. filter()

  • Observable<T> filter(Predicate<? super T> predicate)

4-2-1. 사용 예제

Observable.range(0, 5)
    .filter(i -> i % 2 == 0)
    .forEach(i -> log.info("결과 = {}", i));

// 실행 결과
18:35:10.314 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 0
18:35:10.317 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 2
18:35:10.317 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 4

4-2-2. 직접 구현

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 5; i++) {
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
});

Predicate<Integer> predicate = i -> i % 2 == 0;

Subscriber<Integer> subscriber = new Subscriber<Integer>() {
    @Override
    public void onNext(Integer i) {
        if (predicate.test(i)) {
            log.info("결과 = {}", i);
        }
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
    }
};

observable.subscribe(subscriber);

// 람다 사용
Observable.create(sub -> {
    for (int i = 0; i < 5; i++) {
        sub.onNext(i);
    }
    sub.onCompleted();
}).subscribe(i -> {
    if (predicate.test((Integer) i)) {
        log.info("결과 = {}", i);
    }
});

// 실행 결과
18:35:10.320 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 0
18:35:10.320 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 2
18:35:10.320 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 4

4-3. reduce()

  • Observable<T> reduce(BiFunction<T, U, R> accumulator)

4-3-1. 사용 예제

Observable.range(0, 5)
        .reduce((x, y) -> x + y)
        .forEach(i -> log.info("결과 = {}", i));

// 실행결과
19:07:59.245 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 10

4-3-2. 직접 구현

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 5; i++) {
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
});

BiFunction<Integer, Integer, Integer> accumulator = (x, y) -> x + y;

Subscriber<Integer> subscriber = new Subscriber<Integer>() {
    private Integer result = null;

    @Override
    public void onNext(Integer i) {
        Integer temp = result;
        if (temp == null) {
            result = i;
        } else {
            result = accumulator.apply(temp, i);
        }
    }

    @Override
    public void onCompleted() {
        log.info("결과 = {}", this.result);
    }

    @Override
    public void onError(Throwable e) {
    }
};

observable.subscribe(subscriber);

// 실행 결과
19:07:59.250 [main] INFO com.devljh.reactive.practice.ReactiveStreamsTest - 결과 = 10
Comments