리액터 쓰레드 스케줄링
리액터는 비동기 실행을 강제하지 않는다. 예를 들어 아래 코드를 보자.
Flux.range(1, 3)
.map(i -> {
logger.info("map {} to {}", i, i + 2);
return i + 2;
})
.flatMap(i -> {
logger.info("flatMap {} to Flux.range({}, {})", i, 1, i);
return Flux.range(1, i);
})
.subscribe(i -> logger.info("next " + i));
위 코드에서 logger는 쓰레드 이름을 남기도록 설정한 로거라고 하자. 위 코드를 main 메서드에서 실행하면 다음과 같은 결과를 출력한다.
17:44:57.180 [main] INFO schedule.ScheduleTest - map 1 to 3
17:44:57.183 [main] INFO schedule.ScheduleTest - flatMap 3 to Flux.range(1, 3)
17:44:57.202 [main] INFO schedule.ScheduleTest - next 1
17:44:57.202 [main] INFO schedule.ScheduleTest - next 2
17:44:57.202 [main] INFO schedule.ScheduleTest - next 3
17:44:57.202 [main] INFO schedule.ScheduleTest - map 2 to 4
17:44:57.202 [main] INFO schedule.ScheduleTest - flatMap 4 to Flux.range(1, 4)
17:44:57.202 [main] INFO schedule.ScheduleTest - next 1
17:44:57.202 [main] INFO schedule.ScheduleTest - next 2
17:44:57.202 [main] INFO schedule.ScheduleTest - next 3
17:44:57.202 [main] INFO schedule.ScheduleTest - next 4
17:44:57.202 [main] INFO schedule.ScheduleTest - map 3 to 5
17:44:57.202 [main] INFO schedule.ScheduleTest - flatMap 5 to Flux.range(1, 5)
17:44:57.203 [main] INFO schedule.ScheduleTest - next 1
17:44:57.203 [main] INFO schedule.ScheduleTest - next 2
17:44:57.203 [main] INFO schedule.ScheduleTest - next 3
17:44:57.203 [main] INFO schedule.ScheduleTest - next 4
17:44:57.203 [main] INFO schedule.ScheduleTest - next 5
실행 결과를 보면 map(), flatMap(), subscribe()에 전달한 코드가 모두 main 쓰레드에서 실행된 것을 알 수 있다. 즉 map 연산, flatMap 연산뿐만 아니라 subscribe를 이용한 구독까지 모두 main 쓰레드가 실행한다.
스케줄러를 사용하면 구독이나 신호 처리를 별도 쓰레드로 실행할 수 있다.
publishOn을 이용한 신호 처리 쓰레드 스케줄링
publishOn() 메서드를 이용하면 next, complete, error신호를 별도 쓰레드로 처리할 수 있다. map(), flatMap() 등의 변환도 publishOn()이 지정한 쓰레드를 이용해서 처리한다. 다음 코드를 보자.
CountDownLatch latch = new CountDownLatch(1);
Flux.range(1, 6)
.map(i -> {
logger.info("map 1: {} + 10", i);
return i + 10;
})
.publishOn(Schedulers.newElastic("PUB"), 2)
.map(i -> { // publishOn에서 지정한 PUB 스케줄러가 실행
logger.info("map 2: {} + 10", i);
return i + 10;
})
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
logger.info("hookOnSubscribe");
requestUnbounded();
}
@Override
protected void hookOnNext(Integer value) {
logger.info("hookOnNext: " + value); // publishOn에서 지정한 스케줄러가 실행
}
@Override
protected void hookOnComplete() {
logger.info("hookOnComplete"); // publishOn에서 지정한 스케줄러가 실행
latch.countDown();
}
});
latch.await();
publishOn()은 두 개의 인자를 받는다. 이 코드에서 첫 번째 인자인 Schedulers.newElastic("PUB")은 비동기로 신호를 처리할 스케줄러이다. 다양한 스케줄러가 존재하는데 이에 대해서는 뒤에서 다시 살펴본다. 일단 지금은 스케줄러가 별도 쓰레드를 이용해서 신호를 처리한다고 생각하면 된다.
두 번째 인자인 2는 스케줄러가 신호를 처리하기 전에 미리 가져올 (prefetch) 데이터 개수이다. 이는 스케줄러가 생성하는 비동기 경계 시점에 보관할 수 있는 데이터의 개수로 일종의 버퍼 크기가 된다.
위 코드를 실제로 실행하면 어떤 일이 벌어지는지 보자. 다음은 결과이다.
13:01:03.026 [main] INFO schedule.ScheduleTest - hookOnSubscribe
13:01:03.029 [main] INFO schedule.ScheduleTest - map 1: 1 + 10
13:01:03.030 [main] INFO schedule.ScheduleTest - map 1: 2 + 10
13:01:03.031 [PUB-2] INFO schedule.ScheduleTest - map 2: 11 + 10
13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 21
13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - map 2: 12 + 10
13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 22
13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - map 1: 3 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 1: 4 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 2: 13 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 23
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 2: 14 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 24
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 1: 5 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 1: 6 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 2: 15 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 25
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 2: 16 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 26
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnComplete
최초에 2개를 미리 가져올 때를 제외하면 나머지는 모두 publishOn()으로 전달한 스케줄러의 쓰레드(쓰레드 이름이 "PUB"로 시작)가 처리하는 것을 알 수 있다.
publishOn()에 지정한 스케줄러는 다음 publishOn()을 설정할 때까지 적용된다. 예를 들어 다음과 같이 이름이 PUB1과 PUB2인 두 개의 스케줄러를 설정했다고 하자.
Flux.range(1, 6)
.publishOn(Schedulers.newElastic("PUB1"), 2)
.map(i -> {
logger.info("map 1: {} + 10", i);
return i + 10;
})
.publishOn(Schedulers.newElastic("PUB2"))
.map(i -> {
logger.info("map 2: {} + 10", i);
return i + 10;
})
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
logger.info("hookOnSubscribe");
requestUnbounded();
}
@Override
protected void hookOnNext(Integer value) {
logger.info("hookOnNext: " + value);
}
@Override
protected void hookOnComplete() {
logger.info("hookOnComplete");
latch.countDown();
}
});
이 코드를 실행한 결과는 다음과 같다.
13:38:14.957 [main] INFO schedule.ScheduleTest - hookOnSubscribe
13:38:14.960 [PUB1-4] INFO schedule.ScheduleTest - map 1: 1 + 10
13:38:14.963 [PUB1-4] INFO schedule.ScheduleTest - map 1: 2 + 10
13:38:14.963 [PUB2-3] INFO schedule.ScheduleTest - map 2: 11 + 10
13:38:14.963 [PUB1-4] INFO schedule.ScheduleTest - map 1: 3 + 10
13:38:14.964 [PUB1-4] INFO schedule.ScheduleTest - map 1: 4 + 10
13:38:14.964 [PUB1-4] INFO schedule.ScheduleTest - map 1: 5 + 10
13:38:14.964 [PUB1-4] INFO schedule.ScheduleTest - map 1: 6 + 10
13:38:14.969 [PUB2-3] INFO schedule.ScheduleTest - hookOnNext: 21
13:38:14.979 [PUB2-3] INFO schedule.ScheduleTest - map 2: 12 + 10
13:38:14.979 [PUB2-3] INFO schedule.ScheduleTest - hookOnNext: 22
...생략
13:38:15.021 [PUB2-3] INFO schedule.ScheduleTest - map 2: 16 + 10
13:38:15.021 [PUB2-3] INFO schedule.ScheduleTest - hookOnNext: 26
13:38:15.031 [PUB2-3] INFO schedule.ScheduleTest - hookOnComplete
결과를 보면 첫 번째 publishOn()과 두 번째 publishOn() 사이의 map() 처리는 PUB1 스케줄러가 실행하고 두 번째 publishOn() 이후의 map(), 신호 처리는 PUB2 스케줄러가 실행한 것을 알 수 있다.
subscribeOn을 이용한 구독 처리 쓰레드 스케줄링
subscribeOn()을 사용하면 Subscriber가 시퀀스에 대한 request 신호를 별도 스케줄러로 처리한다. 즉 시퀀스(Flux나 Mono)를 실행할 스케줄러를 지정한다. 다음은 subscribeOn()의 사용예이다.
CountDownLatch latch = new CountDownLatch(1);
Flux.range(1, 6)
.log() // 보다 상세한 로그 출력 위함
.subscribeOn(Schedulers.newElastic("SUB"))
.map(i -> {
logger.info("map: {} + 10", i);
return i + 10;
})
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
logger.info("hookOnSubscribe"); // main thread
request(1);
}
@Override
protected void hookOnNext(Integer value) {
logger.info("hookOnNext: " + value); // SUB 쓰레드
request(1);
}
@Override
protected void hookOnComplete() {
logger.info("hookOnComplete"); // SUB 쓰레드
latch.countDown();
}
});
latch.await();
subscribeOn()으로 지정한 스케줄러는 시퀀스의 request 요청 처리뿐만 아니라 첫 번째 publishOn() 이전까지의 신호 처리를 실행한다. 따라서 위 코드를 실행하면 Flux.range()가 생성한 시퀀스의 신호 발생뿐만 아니라 map() 실행, Subscriber의 next, complete 신호 처리를 "SUB" 스케줄러가 실행한다. 참고로 시퀀스의 request 요청과 관련된 로그를 보기 위해 log() 메서드를 사용했다.
다음은 실행 결과이다.
14:56:24.996 [main] INFO schedule.ScheduleTest - hookOnSubscribe
14:56:25.005 [SUB-2] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
14:56:25.010 [SUB-2] INFO reactor.Flux.Range.1 - | request(1)
14:56:25.010 [SUB-2] INFO reactor.Flux.Range.1 - | onNext(1)
14:56:25.011 [SUB-2] INFO schedule.ScheduleTest - map: 1 + 10
14:56:25.016 [SUB-2] INFO schedule.ScheduleTest - hookOnNext: 11
14:56:25.016 [SUB-2] INFO reactor.Flux.Range.1 - | request(1)
14:56:25.016 [SUB-2] INFO reactor.Flux.Range.1 - | onNext(2)
14:56:25.016 [SUB-2] INFO schedule.ScheduleTest - map: 2 + 10
14:56:25.017 [SUB-2] INFO schedule.ScheduleTest - hookOnNext: 12
...(생략)
14:56:25.017 [SUB-2] INFO reactor.Flux.Range.1 - | request(1)
14:56:25.017 [SUB-2] INFO reactor.Flux.Range.1 - | onNext(6)
14:56:25.017 [SUB-2] INFO schedule.ScheduleTest - map: 6 + 10
14:56:25.017 [SUB-2] INFO schedule.ScheduleTest - hookOnNext: 16
14:56:25.017 [SUB-2] INFO reactor.Flux.Range.1 - | request(1)
14:56:25.018 [SUB-2] INFO reactor.Flux.Range.1 - | onComplete()
14:56:25.018 [SUB-2] INFO schedule.ScheduleTest - hookOnComplete
실행 결과에서 Flux.Range 타입은 Flux.range() 메서드가 생성한 시퀀스 객체의 타입이다. 위 결과에서 Flux.Range.1의 reques(1), onNext(), onComplete() 로그는 Subscriber의 request 신호를 처리하는 로그이다. 이 로그를 보면 SUB 스케줄러가 해당 기능을 실행하고 있음을 알 수 있다. 또한 map()과 Subscriber의 신호 처리 메서드(hookOnNext, hookOnComplete)도 SUB 스케줄러가 실행하고 있다.
subscribeOn() + publishOn() 조합
앞서 말했듯이 subscribeOn으로 지정한 스케줄러는 첫 번째 publishOn이 올때까지 적용된다. 다음 코드를 통해 이를 확인할 수 있다.
CountDownLatch latch = new CountDownLatch(1);
Flux.range(1, 6)
.log()
.subscribeOn(Schedulers.newElastic("SUB"))
.map(i -> {
logger.info("map1: " + i + " --> " + (i + 20));
return i + 20;
})
.map(i -> {
logger.info("mapBySub: " + i + " --> " + (i + 100));
return i + 100;
})
.publishOn(Schedulers.newElastic("PUB1"), 2)
.map(i -> {
logger.info("mapByPub1: " + i + " --> " + (i + 1000));
return i + 1000;
})
.publishOn(Schedulers.newElastic("PUB2"), 2)
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
logger.info("hookOnSubscribe");
request(1);
}
@Override
protected void hookOnNext(Integer value) {
logger.info("hookOnNext: " + value);
request(1);
}
@Override
protected void hookOnComplete() {
logger.info("hookOnComplete");
latch.countDown();
}
});
latch.await();
이 코드는 구독을 위한 "SUB" 스케줄러와 신호 처리를 위한 "PUB1", "PUB2" 스케줄러를 설정하고 있다.
다음은 실행 결과이다.
15:10:05.660 [main] INFO schedule.ScheduleTest - hookOnSubscribe
15:10:05.681 [SUB-6] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
15:10:05.687 [SUB-6] INFO reactor.Flux.Range.1 - | request(2)
15:10:05.688 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(1)
15:10:05.718 [SUB-6] INFO schedule.ScheduleTest - map1: 1 --> 21
15:10:05.719 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 21 --> 121
15:10:05.720 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(2)
15:10:05.720 [SUB-6] INFO schedule.ScheduleTest - map1: 2 --> 22
15:10:05.720 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 22 --> 122
15:10:05.721 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 121 --> 1121
15:10:05.722 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 122 --> 1122
15:10:05.734 [SUB-6] INFO reactor.Flux.Range.1 - | request(2)
15:10:05.735 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(3)
15:10:05.735 [SUB-6] INFO schedule.ScheduleTest - map1: 3 --> 23
15:10:05.735 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 23 --> 123
15:10:05.735 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(4)
15:10:05.735 [SUB-6] INFO schedule.ScheduleTest - map1: 4 --> 24
15:10:05.735 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 24 --> 124
15:10:05.736 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1121
15:10:05.736 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1122
15:10:05.736 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 123 --> 1123
15:10:05.736 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 124 --> 1124
15:10:05.736 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1123
15:10:05.736 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1124
15:10:05.736 [SUB-6] INFO reactor.Flux.Range.1 - | request(2)
15:10:05.736 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(5)
15:10:05.736 [SUB-6] INFO schedule.ScheduleTest - map1: 5 --> 25
15:10:05.736 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 25 --> 125
15:10:05.737 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(6)
15:10:05.737 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 125 --> 1125
15:10:05.737 [SUB-6] INFO schedule.ScheduleTest - map1: 6 --> 26
15:10:05.737 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 26 --> 126
15:10:05.737 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1125
15:10:05.737 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 126 --> 1126
15:10:05.737 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1126
15:10:05.737 [SUB-6] INFO reactor.Flux.Range.1 - | onComplete()
15:10:05.738 [PUB2-4] INFO schedule.ScheduleTest - hookOnComplete
실행 결과를 보면 첫 번째 publishOn()으로 PUB1 스케줄러를 지정하기 전까지는 SUB 스케줄러가 request 요청과 map1, mapBySub 변환을 처리하는 것을 확인할 수 있다.
[노트]
subscribeOn()이 publishOn() 뒤에 위치하면 실질적으로 prefetch할 때를 제외하면 적용되지 않는다. subscribeOn()은 원본 시퀀스의 신호 발생을 처리할 스케줄러를 지정하므로 시퀀스 생성 바로 뒤에 subscribeOn()을 지정하도록 하자. 또한 두 개 이상 subscribeOn()을 지정해도 첫 번째 subscribeOn()만 적용된다.
스케줄러 종류
스프링 리액터는 다음 스케줄러를 기본 제공한다.
- Schedulers.immediate() : 현재 쓰레드에서 실행한다.
- Schedulers.single() : 쓰레드가 한 개인 쓰레드 풀을 이용해서 실행한다. 즉 한 쓰레드를 공유한다.
- Schedulers.elastic() : 쓰레드 풀을 이용해서 실행한다. 블로킹 IO를 리액터로 처리할 때 적합하다. 쓰레드가 필요하면 새로 생성하고 일정 시간(기본 60초) 이상 유휴 상태인 쓰레드는 제거한다. 데몬 쓰레드를 생성한다.
- Schedulers.parallel() : 고정 크기 쓰레드 풀을 이용해서 실행한다. 병렬 작업에 적합하다.
single(), elastic(), parallel()은 매번 새로운 쓰레드 풀을 만들지 않고 동일한 쓰레드 풀을 리턴한다. 예를 들어 아래 코드에서 두 publishOn()은 같은 쓰레드 풀을 공유한다.
someFlux.publishOn(Schedulers.parallel())
.map(...)
.publishOn(Schedulers.parallel())
.subscribe(...);
single(), elastic(), parallel()이 생성하는 쓰레드는 데몬 쓰레드로서 main 쓰레드가 종료되면 함께 종료된다.
같은 종류의 쓰레드 풀인데 새로 생성하고 싶다면 다음 메서드를 사용하면 된다.
- newSingle(String name)
- newSingle(String name, boolean daemon)
- newElastic(String name)
- newElastic(String name, int ttlSeconds)
- newElastic(String name, int ttlSeconds, boolean daemon)
- newParallel(String name)
- newParallel(String name, int parallelism)
- newParallel(String name, int parallelism, boolean daemon)
각 파라미터는 다음과 같다.
- name : 쓰레드 이름으로 사용할 접두사이다.
- daemon : 데몬 쓰레드 여부를 지정한다. 지정하지 않으면 false이다. 데몬 쓰레드가 아닌 경우 JVM 종료시에 생성한 스케줄러의 dispose()를 호출해서 풀에 있는 쓰레드를 종료해야 한다.
- ttlSeconds : elastic 쓰레드 풀의 쓰레드 유휴 시간을 지정한다. 지정하지 않으면 60(초)이다.
- parallelism : 작업 쓰레드 개수를 지정한다. 지정하지 않으면 Runtime.getRuntime().availableProcessors()이 리턴한 값을 사용한다.
newXXX() 로 생성하는 쓰레드 풀은 기본으로 데몬 쓰레드가 아니기 때문에 어플리케이션 종료시에는 다음과 같이 dispose() 메서드를 호출해서 쓰레드를 종료시켜 주어야 한다. 그렇지 않으면 어플리케이션이 종료되지 않는 문제가 발생할 수 있다.
// 비데몬 스케줄러 초기화
Scheduler scheduler = Schedulers.newElastic("SUB", 60, false);
// 비데몬 스케줄러 사용
someFlux.publishOn(scheduler)
.map(...)
.subscribe(...)
// 어플리케이션 종료시에 스케줄러 종료 처리
scheduler.dispose();
병렬 처리와 관련된 내용은 다음에 더 자세히 살펴본다.
일정 주기로 tick 발생: Flux.interval
Flux.interval()을 사용하면 일정 주기로 신호를 발생할 수 있다. 발생 순서에 따라 발생한 정수 값을 1씩 증가시킨다. 다음은 간단한 사용 예이다.
Flux.interval(Duration.ofSeconds(1)) // Flux<Long>
.subscribe(tick -> System.out.println("Tick " + tick));
Thread.sleep(5000);
1초 간격으로 신호가 발생하는 것을 알 수 있다.
interval()은 Schedulers.parallel()를 사용해서 신호를 주기적으로 발생한다. 다른 스케줄러를 사용하고 싶다면 internval(Duration, Scheduler) 메서드를 사용하면 된다.
관련글