Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
Tags
- HTTP
- javascript
- socket
- Static
- reactive
- Elk
- r
- 네트워크
- mybatis
- git
- html
- 데이터통신
- Java
- network
- AWS
- nodejs
- github
- cache
- mongodb
- redis
- Heap
- Lombok
- Linux
- VCS
- reactor
- effective
- NoSQL
- spring
- libuv
- ajax
Archives
- Today
- Total
빨간색코딩
Spring Event와 SSE 로 리액티브하게 접근하기 (EventListener, Server-Sent Events, 비동기 컨트롤러, RxJava로 동일하게 재구현) 본문
Spring
Spring Event와 SSE 로 리액티브하게 접근하기 (EventListener, Server-Sent Events, 비동기 컨트롤러, RxJava로 동일하게 재구현)
빨간색소년 2020. 12. 31. 13:11본격적인 리액티브 프레임워크를 사용하기 전에,
옵저버 패턴 - 발행구독 패턴으로 만들어진 Spring Event와 SSE를 통해 리액티브 어플리케이션을 만들어보자
cf) 옵저버 패턴 포스팅 : https://sjh836.tistory.com/180
1. Spring 에서 Event 를 다룬다?
- 스프링 프레임워크에서는 EventListener를 통해 발행-구독(Pub/Sub) 패턴, 이벤트 버스(EventBus) 등을 지원한다.
- ApplicationContext 레벨에서 지원
- 도메인 서비스간 강한 결합, 강한 의존성을 Event 기반으로 풀어내면, 느슨한 결합 등을 얻을 수 있다.
- 개인적으론, 이것은 상황에 따라 장점이 될수도 있고, 단점이 될 수도 있다고 본다.
- 핵심 비즈니스 이외, 부가적인 비즈니스 로직들(핵심로직 후 Web Push 처리, 회원가입 후 환영메일전송 등)과 같은 상황에선 괜찮을 것 같다.
1-1. @EventListener
- Spring 4.2 이전에는 ApplicationEvent 를 상속받은 CustomEvent를 ApplicationEventPublisher 을 통해
publishEvent(e)
하고ApplicationListener<CustomEvent>.onApplicationEvent
를 구현해야 했다. - 4.2 버전부터는 이것이 편해져서, CustomEvent 도 POJO로 작성하고, 이벤트 핸들러도
@EventListener
만 선언하면 된다.- ApplicationEventPublisher 는 동일
- @Order : 해당 어노테이션을 통해 이벤트 핸들러 간에 우선순서를 제어할 수 있다.
- @Async : 기본적으로 Spring Event 는 동기적이다. 하지만 @Async 를 통해 비동기로 동작할 수 있다.
- Executor 쓰레드풀을 사용
- (@TransactionalEventListener 에 대해선, 다음 기회에 다루어보겠다..)
2. SSE(Server-Sent Events)
- 서버에서 클라이언트로 일방적으로 데이터를 전달할 수 있는 간단한 방법으로 websocket과 sse가 있다.
- 이걸 활용하지 않는다면 주기적으로 polling 해야하는데, 리액티브 디자인에 맞지 않음
- 둘 중에, 구현이 더 간단한 sse를 활용
- cf) spring websocket : https://sjh836.tistory.com/166
- 1-4. http streaming 중 하나가 SSE 다.
- 장점 : 단방향이라 구현이 간단, EventSource 는 접속에 문제가 있으면 자동으로 재연결 시도
- 단점 : IE에선 sse를 지원하지 않음, client 에서 연결을 끊어도(=페이지를 떠나도) 서버단에서 감지가 어려움
3. Spring MVC의 비동기 HTTP 통신 기법
- 서블릿3.0 부터는 비동기로 HTTP 요청을 처리할 수 있어졌다.
- 예를들면, 서블릿 쓰레드(nio-8080-exec-1)가 요청을 받고, 다른 스레드(executor-1)에게 작업을 주고, nio-8080-exec-1는 해제되어 버림. executor-1가 작업을 끝내면 서블릿 쓰레드(nio-8080-exec-2)가 다시 받아서 응답을 보내주는 개념
- 즉, 서블릿 리소스 입장에서 비동기 통신인 것이지, 결국 특정 task 에 대해 단위시간 당 어떤 쓰레드는 동기/블로킹 되고 있다.
- 서블릿 쓰레드의 고갈이 주요한 화두였던 듯?
3-1. 컨트롤러의 반환 타입
Callable<T>
: 컨트롤러의 메소드는 곧바로 Callable로 wrapping된 값을 반환하여, 서블릿 Request 쓰레드를 해제시킨다. (응답은 계속 열려있음) 이후, Spring 컨테이너가 관리하는 TaskExecutor 가 Callable 내 작업을 끝마치고, 다시 서블릿 컨테이너에 진짜 값을 넣어주는 방식이다.- 내부적으로는 DispatcherServlet가 WebAsyncManager에게 컨트롤러한테 받은 Callable 을 넘겨주고 끝남. WebAsyncManager가 작업끝내고, DispatcherServlet에게 다시 dispatch 하여 응답하는 형식
- timeout, executor 를 설정하고 싶으면,
WebAsyncTask<T>
를 사용
DeferredResult<T>
: Callable과 비슷하지만, 다른 쓰레드에게 할당하고 그런게 아니라, 단순히DeferredResult.setResult(결과값)
을 해야 응답이 내려간다.- setResult 으로 시점을 제어할 수 있다는 점에서, long polling 에서 많이 사용하는 모양이다.
- 요청을
Queue<DeferredResult>
에 쌓아두고, event 가 발생하면queue.forEach(dr -> dr.setResult(event))
으로 한번에 응답하는 형식
ListenableFuture<T>
: 내부적으론, ListenableFuture의 callback 에서 DeferredResult.setResult 해주는 식으로 동작한다.- Future 인터페이스 확장, AsyncRestTemplate 의 기본 반환형이다.
ResponseBodyEmitter
: 요청에 대해 하나 이상의 비동기 응답을 리턴할 때 주로 쓰인다.- DeferredResult 와 동작은 비슷하나, emitter.send 를 여러번 할 수 있다는게 다른 듯
SseEmitter
: SSE에 쓰이며, ResponseBodyEmitter 를 상속받음- ContentType이 text/event-stream 이다.
StreamingResponseBody
: OutputStream 전송에 쓰이며, ResponseBodyEmitter 를 상속받음CompletableFuture
4. Spring EventListener와 SSE 로 리액티브 어플리케이션 만들기
- 상황정의 : 유저가 회원가입을 했을 때, Signup Event를 전파하여, 특정 엔드포인트를 구독하고 있는 유저들에게 비동기적으로 알려주는 어플리케이션이다.
4-1. Controller
@Slf4j
@RestController
@RequestMapping("/user")
public class UserController {
private static final long SSE_SESSION_TIMEOUT = 30 * 60 * 1000L;
private Set<SseEmitter> emitterSet = new CopyOnWriteArraySet<>();
@GetMapping(value = "/notice", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter signup(HttpServletRequest request) {
log.info("SSE stream 접근 : {}", request.getRemoteAddr());
SseEmitter emitter = new SseEmitter(SSE_SESSION_TIMEOUT);
emitterSet.add(emitter);
emitter.onTimeout(() -> emitterSet.remove(emitter));
emitter.onCompletion(() -> emitterSet.remove(emitter));
return emitter;
}
@Async
@EventListener
public void onSignupEvent(final SignupEvent signupEvent) {
log.info("신규 회원 = {}, 이벤트 구독자 수 = {}", signupEvent.getUser(), emitterSet.size());
List<SseEmitter> deadEmitters = new ArrayList<>();
emitterSet.forEach(emitter -> {
try {
emitter.send(signupEvent, MediaType.APPLICATION_JSON);
} catch (Exception ignore) {
deadEmitters.add(emitter);
}
});
emitterSet.removeAll(deadEmitters);
}
}
4-2. Service
@Service
@RequiredArgsConstructor
public class UserService {
private final ApplicationEventPublisher applicationEventPublisher;
public void signup(final User user) {
// 회원가입 핵심 비즈니스 로직...
// 부가적 비즈니스 수행 등을 위하여 이벤트 전파
applicationEventPublisher.publishEvent(new SignupEvent(user));
}
}
4-3. SignupEvent
@Getter
@AllArgsConstructor
public class SignupEvent {
private User user;
}
4-4. 회원가입 발생을 위한 스케줄러
@Component
@RequiredArgsConstructor
public class SignupScheduler {
private final UserService userService;
private Random random = new Random();
@Scheduled(fixedDelay = 2000)
public void signupTask() {
userService.signup(createTestUser());
}
private User createTestUser() {
return new User("빨간색소년", random.nextInt(30));
}
}
4-5. Config
@Configuration
public class Config implements AsyncConfigurer, SchedulingConfigurer {
@Override
public Executor getAsyncExecutor() { // AsyncConfigurer 구현을 통해 공통Pool 생성
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("ljh-executor-");
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(5);
executor.initialize();
return executor;
}
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(4);
scheduler.setThreadNamePrefix("ljh-scheduler-");
scheduler.initialize();
scheduledTaskRegistrar.setTaskScheduler(scheduler);
}
}
4-6. 결과 및 정리
- 회원가입이 발생할 때, 구독자들에게 이벤트가 잘 전달된다는 점에서 반응성(리액티브)를 가졌다고 볼 순 있다.
- 그러나, 구독자가 없을 때도 계속 이벤트가 발생되고 있다. (자원 낭비)
- Spring ApplicationContext 에 의존성이 있어서, 테스트 등이 쉽지않겠다
- Guava EventBus 등이 대안이 될 순 있을 듯
- 비동기 처리라곤 하지만, 결국 AsyncExecutor 라는 다른 쓰레드풀에서 처리하는 것임
- 어플리케이션 내 이벤트버스들은 다 갖고있는 문제일텐데, 스케일아웃 이슈가 있겠다.
5. RxJava 1.x 와 SSE 로 똑같이 만들어보자
- 위에서는 Spring Event 를 통해 Event를 전파하고 수신받아서 구현했다면, 이번에는 RxJava의 발행 - 구독 패턴으로 만들어보자
5-1. Controller
@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("/user")
public class UserController2 {
private static final long SSE_SESSION_TIMEOUT = 30 * 60 * 1000L;
private final SignupScheduler2 signupScheduler2;
@GetMapping(value = "/notice2", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public RxSseEmitter signup(HttpServletRequest request) {
log.info("SSE stream 접근 : {}", request.getRemoteAddr());
RxSseEmitter<User> emitter = new RxSseEmitter<>(SSE_SESSION_TIMEOUT);
// 회원가입 스트림 소스를 emitter가 구독한다.
signupScheduler2.signupUserStream().subscribe(emitter.getSubscriber());
return emitter;
}
}
5-2. RxSseEmitter
public class RxSseEmitter<T> extends SseEmitter {
private Subscriber<T> subscriber;
public RxSseEmitter(long timeout) {
super(timeout);
this.subscriber = new Subscriber<T>() {
@Override
public void onNext(T user) {
try {
// 4-1번에서는 emitter.send(signupEvent) 하는 코드 부분이겠다.
RxSseEmitter.this.send(user);
} catch (Exception e) {
unsubscribe();
}
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
};
onCompletion(subscriber::unsubscribe);
onTimeout(subscriber::unsubscribe);
}
public Subscriber<T> getSubscriber() {
return subscriber;
}
}
5-3. 회원가입 발생을 위한 스케줄러
@Component
@RequiredArgsConstructor
public class SignupScheduler2 {
private final UserService2 userService2;
private Observable<User> hotStream;
private Random random = new Random();
@PostConstruct
private void signupTask() {
// 콜드스트림에서 핫스트림으로 만든다. 구독자가 1명이상 있을때, 2초마다 회원가입을 발생시켜준다.
// 다만, 핫스트림이므로 구독자가 N명이어도 동일한 데이터를 전파한다.
this.hotStream = Observable.interval(2000, TimeUnit.MILLISECONDS)
.map(tick -> createTestUser(tick))
.share(); // share() = publish() + refCount()
//signupUserStream().subscribe(userService2::signup); // 구독자 2개 상황 재현용
}
public Observable<User> signupUserStream() {
return this.hotStream;
}
private User createTestUser(long tick) {
System.out.println(tick);
return new User("빨간색소년", random.nextInt(30));
}
}
5-4. Service
@Slf4j
@Service
public class UserService2 {
public void signup(final User user) {
log.info("회원가입을 시도합니다. user = {}", user);
// 회원가입 핵심 비즈니스 로직...
}
}
5-5. 결과 및 정리
- 구독자가 없으니, 데이터가 전달되고 있지 않았다. (자원 낭비X)
- 구독자가 N개 되는 상황에서도, 핫 스트림을 사용하니, 중간데이터부터 구독을 한다. (형광펜 부분)
- 여러 브라우저에서 동시에 요청을 와도 마찬가지다. (4번에서는 컨트롤러 내 컬렉션에 모두 저장했다가, emitterSet.forEach - send 하는 방식으로 구현했음)
- 만약, 콜드 스트림이었다면, 아예 스트림 소스부터 새로만들어졌으므로, 아래와 같이 나온다. (share연산자 제거하면 재현된다)
- 우리가 정의한 상황과는 맞지 않으므로 핫스트림을 쓰는게 맞음
'Spring' 카테고리의 다른 글
[스프링배치 완벽가이드] 2장 스프링 배치 (0) | 2021.08.18 |
---|---|
[스프링배치 완벽가이드] 1장 배치와 스프링 (0) | 2021.08.18 |
Reactive와 Spring 4 (C10K, 리액티브 선언문, 리액티브 스프링 등장 전) (0) | 2020.12.30 |
spring webflux와 armeria 살펴보기 (Mono, Flux, gRPC, Thrift) (0) | 2020.06.19 |
spring websocket (polling, Handshake 과정, sockjs, webSocketHandler, 예제) (0) | 2018.08.16 |
Comments