- 스프링 리액터 시작하기 1 - 리액티브 스트림 Flux Mono Subscriber
- 스프링 리액터 시작하기 2 - 시퀀스 생성 just, generate
- 스프링 리액터 시작하기 3 - 시퀀스 생성 create, stream
- 스프링 리액터 시작하기 4 - 시퀀스 변환 기초
- 스프링 리액터 시작하기 5 - 에러 처리
- 스프링 리액터 시작하기 6 - 쓰레드 스케줄링
- 스프링 리액터 시작하기 7 - 병렬 실행
- 스프링 리액터 시작하기 8 - 모으기(aggregation)
- 스프링 리액터 시작하기 9 - 묶어서 처리하기(window buffer)
- 스프링 리액터 시작하기 10 - 로깅, 체크포인트
리액터
- 스프링 리액터 Reactor 기초 글 목록 2019.12.03
- 스프링 리액터 시작하기 10 - 로깅, 체크포인트 2018.08.08
- 스프링 리액터 시작하기 8 - 모으기(aggregation) 2018.08.03
- 스프링 리액터 시작하기 7 - 병렬 실행 2018.08.03
- 스프링 리액터 시작하기 6 - 쓰레드 스케줄링 2018.08.01
- 스프링 리액터 시작하기 5 - 에러 처리 2018.07.23 (2)
- 스프링 리액터 시작하기 4 - 시퀀스 변환 기초 2018.07.16 (2)
스프링 리액터 Reactor 기초 글 목록
스프링 리액터 시작하기 10 - 로깅, 체크포인트
스프링 리액터 로깅과 체크포인트
로깅
리액터의 동작을 보다 자세히 보고 싶다면 다음과 같이 log() 메서드를 사용한다. 아래 코드를 보자.
Flux.just(1, 2, 4, 5, 6)
.log()
.map(x -> x * 2)
.subscribe(x -> logger.info("next: {}", x));
"reactor.Flux.Array.1"이라는 로거가 출력한 로그 메시지는 Flux.just()가 생성한 시퀀스의 동작을 로그로 남긴 것이다. 로그를 보면 시퀀스가 request() 신호를 받은 시점, next 신호(onNext(2) 등)나 complete 신호(onComplete())를 발생한 시점을 확인할 수 있다.
로그 레벨은 INFO인데 로그 레벨을 변경하고 싶다면 다음과 같이 log() 메서드를 사용하면 된다.
Flux.just(1, 2, 4, 5, 6)
.log(null, Level.FINE) // java.util.logging.Level 타입
.subscribe(x -> logger.info("next: {}", x));
두 번째 인자로 자바 로깅의 Level.FINE을 주었다. SLF4j를 사용할 경우 리액터는 자바의 FINE 레벨을 SLF4j의 DEBUG 레벨로 기록한다. 따라서 위 코드를 실행하면 다음과 같이 DEBUG 레벨로 로그를 남기는 것을 확인할 수 있다.
08:50:30.098 [main] DEBUG reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
08:50:30.101 [main] DEBUG reactor.Flux.Array.1 - | request(unbounded)
08:50:30.102 [main] DEBUG reactor.Flux.Array.1 - | onNext(1)
08:50:30.102 [main] INFO logging.LoggingTest - next: 1
08:50:30.102 [main] DEBUG reactor.Flux.Array.1 - | onNext(2)
08:50:30.102 [main] INFO logging.LoggingTest - next: 2
다음과 같이 특정 로거를 이용하도록 지정할 수도 있다.
Flux.just(1, 2, 4, 5, 6)
.log("MYLOG") // 또는 log("MYLOG", Level.INFO)
.subscribe(x -> logger.info("next: {}", x));
위 코드를 실행하면 다음과 같이 지정한 로거를 이용해서 로그를 남긴다.
08:51:55.180 [main] INFO MYLOG - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
08:51:55.184 [main] INFO MYLOG - | request(unbounded)
08:51:55.184 [main] INFO MYLOG - | onNext(1)
08:51:55.184 [main] INFO logging.LoggingTest - next: 1
08:51:55.184 [main] INFO MYLOG - | onNext(2)
08:51:55.184 [main] INFO logging.LoggingTest - next: 2
08:51:55.184 [main] INFO MYLOG - | onNext(4)
체크포인트
시퀀스가 신호를 발생하는 과정에서 익셉션이 발생하면 어떻게 될까? 시퀀스가 여러 단게를 거쳐 변환한다면 어떤 시점에 익셉션이 발생했는지 단번에 찾기 힘들 수도 있다. 이럴 때 도움이 되는 것이 체크포인트이다. 다음은 체크포인트 사용 예이다.
Flux.just(1, 2, 4, -1, 5, 6)
.map(x -> x + 1)
.checkpoint("MAP1")
.map(x -> 10 / x) // 원본 데이터가 -1인 경우 x는 0이 되어 익셉션이 발생
.checkpoint("MAP2")
.subscribe(
x -> System.out.println("next: " + x),
err -> err.printStackTrace());
관련글
스프링 리액터 시작하기 8 - 모으기(aggregation)
리액터 모으기(aggregation) 연산
List 콜렉션으로 모으기: collectList()
Flux는 데이터를 콜렉션으로 모을 수 있는 기능을 제공한다. 이 중에서 List로 모아주는 collectList()는 다음과 같이 사용한다.
Mono<List<Integer>> mono = someFlux.collectList();
mono.subscribe(lst -> System.out.println(lst));
collectList()의 리턴 타입은 Mono<List<T>>이므로 Mono를 구독해서 값을 사용하면 된다.
Map 콜렉션으로 모으기: collectMap()
다음의 Flux#collectMap()을 이용해서 Map으로 모을 수도 있다.
- Mono<Map<K, T>> collectMap(Function<? super T, ? extends K> keyExtractor)
- Mono<Map<K, V>> collectMap(Function<? super T, ? extends K> keyExtractor,
Function<? super T, ? extends V> valueExtractor) - Mono<Map<K, V>> collectMap(Function<? super T, ? extends K> keyExtractor,
Function<? super T, ? extends V> valueExtractor,
Supplier<Map<K, V>> mapSupplier)
각 인자는 다음과 같다.
- keyExtractor : 데이터에서 맵의 키를 제공하는 함수
- valueExtractor : 데이터에서 맵의 값을 제공하는 함수
- mapSupplier : 사용할 Map 객체를 제공(mapSupplier가 없는 메서드는 기본으로 HashMap 사용)
다음 코드는 각 메서드의 사용 예이다.
// keyExtractor만 지정. 값은 그대로 사용.
Mono<Map<Integer, Tuple2<Integer, String>>> numTupMapMono =
Flux.just(Tuples.of(1, "일"), Tuples.of(2, "이"), Tuples.of(3, "삼"), Tuples.of(4, "사"))
.collectMap(x -> x.getT1()); // keyExtractor
// String을 리턴하는 valueExtractor 사용.
Mono<Map<Integer, String>> numLabelMapMono =
Flux.just(Tuples.of(1, "일"), Tuples.of(2, "이"), Tuples.of(3, "삼"), Tuples.of(4, "사"))
.collectMap(x -> x.getT1(), // keyExtractor
x -> x.getT2()); // valueExtractor
// Map으로 TreeMap 사용
Mono<Map<Integer, String>> numLabelTreeMapMono =
Flux.just(Tuples.of(1, "일"), Tuples.of(2, "이"), Tuples.of(3, "삼"), Tuples.of(4, "사"))
.collectMap(x -> x.getT1(), // keyExtractor
x -> x.getT2(), // valueExtractor
() -> new TreeMap<>()); // mapSupplier
collectMap은 중복된 키가 존재하면 마지막 데이터와 관련된 값이 사용된다. 예를 들어 아래 코드는 Flux가 생성하는 데이터는 4개지만 키로 사용하는 값이 중복되므로 실제 Map에는 2와 4 두 개의 데이터만 저장된다.
Map의 값을 콜렉션으로 모으기: collectMultiMap()
collectMultiMap()을 사용하면 같은 키를 가진 데이터를 List로 갖는 Map을 생성할 수 있다. 다음은 예제 코드이다.
Mono<Map<Integer, Collection<Integer>>> oddEvenList =
Flux.just(1, 2, 3, 4).collectMultimap(x -> x % 2);
oddEvenList.subscribe(map -> System.out.println(map)); // {0=[2, 4], 1=[1, 3]}
collectMultiMap() 메서드는 collectMap() 메서드와 동일한 파라미터를 갖는다.
개수 새기: count()
Flux#count() 메서드를 사용하면 개수를 제공하는 Mono를 리턴한다.
Mono<Long> countMono = Flux.just(1, 2, 3, 4).count();
누적 하기: reduce()
reduce()는 각 값에 연산을 누적해서 결과를 생성한다. Flux의 데이터를 이용해서 단일 값을 생성하는 범용 기능이라고 보면 된다. 첫 번째 살펴볼 reduce() 메서드 다음과 같다. 이 메서드는 Flux가 발생하는 데이터와 동일 타입으로 누적할 때 사용한다.
- Mono<T> reduce(BiFunction<T, T, T> aggregator)
aggregator는 인자가 두 개인 함수이다. 이 함수의 첫 번째 인자는 지금까지 누적된 값을 받으며, 두 번째 인자는 누적할 데이터를 받는다. aggregator는 두 인자를 이용해서 새로운 누적 값을 리턴한다. 새 누적 값은 다음 데이터를 aggregator 함수로 누적할 때 첫 번째 인자로 사용된다.
예를 들어 간단한 곱셈 기능을 reduce()를 이용해서 다음과 같이 구현할 수 있다.
Mono<Integer> mulMono = Flux.just(1, 2, 3, 4).reduce((acc, ele) -> acc * ele);
mulMono.subscribe(sum -> System.out.println("sum : " + sum);
acc는 이전까지 누적된 값인데, 두 번째 데이터를 누적할 때 첫 번째 데이터를 누적된 값(acc)으로 사용한다. 위 코드는 다음과 같은 계산을 거쳐 최종 값으로 24를 출력한다.
acc1 = 1 // 첫 번째 값을 누적 값의 초기 값으로 사용
acc2 = aggregator(acc1, 2) // 1 * 2
acc3 = aggregator(acc2, 3) // 2 * 3
acc4 = aggregator(acc3, 4) // 6 * 4
누적 값의 초기 값을 지정하고 싶거나 데이터와 다른 타입으로 누적하고 싶다면 다음 reduce() 메서드를 사용한다.
- Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator)
- Mono<A> reduceWith(Supplier<A> initial, BiFunction<A, ? super T, A> accumulator)
reduce()의 initial은 초기 값이고, reduceWith()의 initial은 초기값을 제공하는 Supplier이다. 다음은 초기 값을 사용하는 reduce() 메서드의 사용예이다.
Mono<String> strMono = Flux.just(1, 2, 3, 4)
.reduce("", (str, ele) -> str + "-" + ele.toString());
strMono.subscribe(System.out::println); // -1-2-3-4 출력
누적하면서 값 생성하기: scan()
데이터를 누적하면 중간 누적 결과를 데이터로 생성하고 싶다면 scan() 메서드를 사용한다. 최종 누적된 값 한 개만 발생하는 reduce()와 달리 scan()은 중간 결과를 포함한 여러 값을 생성하므로, scan()의 리턴 타입은 Flux이다. 다음은 같은 타입으로 누적한 결과를 발생하는 scan() 메서드이다.
- Flux<T> scan(BiFunction<T, T, T> accumulator)
리턴 타입이 Flux인 것을 제외하면 reduce()와 동일하다.
다음은 예제 코드이다.
Flux<Integer> seq = Flux.just(1, 2, 3, 4).scan((acc, x) -> acc * x);
seq.subscribe(System.out::println);
다음은 위 코드의 출력 결과이다. 중간 결과가 출력되는 것을 알 수 있다.
1
2
6
24
reduce()와 동일하게 누적 초기값을 갖는 메서드를 제공한다.
- Flux<A> scan(A initial, BiFunction<A, ? super T, A> accumulator)
- Flux<A> scanWith(Supplier<A> initial, BiFunction<A, ? super T, A> accumulator)
다음은 초기 값을 지정하는 사용 예이다.
Flux<Integer> seq = Flux.just(2, 3, 4).scan(1, (acc, x) -> acc * x);
seq.subscribe(System.out::println);
실행 결과는 다음과 같다.
1
2
6
24
결과를 보면 초기 값(1)도 시퀀스의 값으로 발생한 것을 알 수 있다.
데이터 조건 검사
모든/일부 데이터가 특정 조건을 충족하는지 검사할 때는 all()이나 any()를 사용한다.
Mono<Boolean> all = Flux.just(1, 2, 3, 4).all(x -> x > 2);
all.subscribe(b -> System.out.println("all: " + b)); // false
Mono<Boolean> any = Flux.just(1, 2, 3, 4).any(x -> x > 2);
any.subscribe(b -> System.out.println("any: " + b)); // true
데이터가 존재하는지 또는 특정 데이터를 포함하는지 검사할 때는 hasElements()나 hasElement()를 사용한다.
Mono<Boolean> hasElements = Flux.just(1, 2, 3, 4).hasElements();
hasElements.subscribe(b -> System.out.println("hasElements: " + b)); // true
Mono<Boolean> hasElement = Flux.just(1, 2, 3, 4).hasElement(3);
hasElement.subscribe(b -> System.out.println("hasElement: " + b)); // true
관련 글
스프링 리액터 시작하기 7 - 병렬 실행
병렬(Parallel) 처리
시퀀스는 순차적으로 next 신호를 발생하고 Subscriber는 순차적으로 신호를 처리한다. 리액터는 시퀀스가 발생하는 next 신호를 병렬로 처리할 수 있는 방법을 제공한다. 이 글에서는 Flux의 parallel()을 사용하는 방법과 zip()을 이용한 방법에 대해 살펴본다.
parallel()과 runOn()으로 Flux 병렬 처리하기
Flux#parallel()과 runOn()을 사용하면 Flux가 생성하는 next 신호를 병렬로 처리할 수 있다. 다음 예를 보자.
Flux.range(1, 20)
.parallel(2) // 작업을 레일로 나누기만 함
.runOn(Schedulers.newParallel("PAR", 2)) // 각 레일을 병렬로 실행
.map(x -> {
int sleepTime = nextSleepTime(x % 2 == 0 ? 50 : 100, x % 2 == 0 ? 150 : 300);
logger.info("map1 {}, sleepTime {}", x, sleepTime);
sleep(sleepTime);
return String.format("%02d", x);
})
.subscribe(i -> logger.info("next {}", i) );
// nextSleepTime은 인자로 받은 두 정수 값 범위에 해당하는 임의의 값을 생성한다고 가정
Flux#parallel(int parallelism) 메서드는 Flux가 생성하는 next 신호를 parallelism 개수만큼 라운드 로빈 방식으로 신호를 나눈다. 분리한 신호는 일종의 신호를 전달할 레일(rail)을 구성한다. 위 코드는 2를 값으로 주었으므로 2개의 레일을 생성한다. 라운드 로빈 방식을 사용해서 각 레일에 값을 전달하므로 위 코드는 [1, 3, 5, .., 19]를 제공하는 레일과 [2, 4, 6, ..., 20]를 제공하는 레일을 생성한다.
parallel()로 여러 레일을 만든다고 해서 병렬로 신호를 처리하는 것은 아니다. parallel()은 병렬로 신호를 처리할 수 있는 ParallelFlux를 리턴하는데, ParallelFlux의 runOn() 메서드에 다중 쓰레드를 사용하는 스케줄러를 전달해야 병렬로 신호를 처리할 수 있다. 위 코드는 2개 쓰레드를 사용하는 parallel 스케줄러를 전달했으므로 동시에 2개 레일로부터 오는 신호를 처리하게 된다.
병렬로 처리되는 것을 확인하기 위해 map() 메서드는 값이 짝수인 경우 50~150 밀리초, 홀수인 경우 100~300 밀리초 동안 슬립하고 문자열로 변환한 값을 리턴하도록 구현했다. parallel()은 라운드 로빈 방식으로 레일을 나누므로 짝수 레일과 홀수 레일이 생성되므로 슬립 타임 구간이 작은 짝수 레일이 더 빨리 끝나게 된다.
실제 결과를 확인해보자.
13:45:14.272 [PAR-1] INFO parallel.ParallelTest - map1 1, sleepTime 117
13:45:14.272 [PAR-2] INFO parallel.ParallelTest - map1 2, sleepTime 96
13:45:14.378 [PAR-2] INFO parallel.ParallelTest - next 02
13:45:14.378 [PAR-2] INFO parallel.ParallelTest - map1 4, sleepTime 98
13:45:14.399 [PAR-1] INFO parallel.ParallelTest - next 01
13:45:14.399 [PAR-1] INFO parallel.ParallelTest - map1 3, sleepTime 268
13:45:14.477 [PAR-2] INFO parallel.ParallelTest - next 04
13:45:14.477 [PAR-2] INFO parallel.ParallelTest - map1 6, sleepTime 93
13:45:14.570 [PAR-2] INFO parallel.ParallelTest - next 06
...생략
13:45:14.868 [PAR-2] INFO parallel.ParallelTest - map1 16, sleepTime 50
13:45:14.905 [PAR-1] INFO parallel.ParallelTest - next 05
13:45:14.905 [PAR-1] INFO parallel.ParallelTest - map1 7, sleepTime 201
13:45:14.918 [PAR-2] INFO parallel.ParallelTest - next 16
13:45:14.918 [PAR-2] INFO parallel.ParallelTest - map1 18, sleepTime 122
13:45:15.040 [PAR-2] INFO parallel.ParallelTest - next 18
13:45:15.040 [PAR-2] INFO parallel.ParallelTest - map1 20, sleepTime 62
13:45:15.102 [PAR-2] INFO parallel.ParallelTest - next 20
13:45:15.106 [PAR-1] INFO parallel.ParallelTest - next 07
13:45:15.106 [PAR-1] INFO parallel.ParallelTest - map1 9, sleepTime 202
13:45:15.308 [PAR-1] INFO parallel.ParallelTest - next 09
13:45:15.308 [PAR-1] INFO parallel.ParallelTest - map1 11, sleepTime 131
13:45:15.439 [PAR-1] INFO parallel.ParallelTest - next 11
13:45:15.439 [PAR-1] INFO parallel.ParallelTest - map1 13, sleepTime 289
13:45:15.728 [PAR-1] INFO parallel.ParallelTest - next 13
13:45:15.728 [PAR-1] INFO parallel.ParallelTest - map1 15, sleepTime 288
13:45:16.017 [PAR-1] INFO parallel.ParallelTest - next 15
13:45:16.017 [PAR-1] INFO parallel.ParallelTest - map1 17, sleepTime 156
13:45:16.173 [PAR-1] INFO parallel.ParallelTest - next 17
13:45:16.173 [PAR-1] INFO parallel.ParallelTest - map1 19, sleepTime 247
13:45:16.420 [PAR-1] INFO parallel.ParallelTest - next 19
이 경우 스케줄러는 2개의 레일을 먼저 처리한다. 한 레일에 남아 있는 데이터가 없으면 데이터가 남아 있는 다른 레일을 처리한다.
레일당 크기
ParallelFlux#runOn() 메서드는 기본적으로 한 레일 당 Queues.SMALL_BUFFER_SIZE 만큼의 데이터를 저장한다. (이 값은 reactor.bufferSize.small 시스템 프로퍼티 값을 사용하는데 이 값을 지정하지 않으면 256을 사용하고 이 값이 16보다 작으면 16을 사용한다.)
레일에 미리 채울 데이터 개수를 변경하려면 다음과 같이 runOn() 메서드의 두 번째 인자로 값을 주면 된다. 다음 코드는 레일에 미리 채울 값(prefetch)으로 2를 사용한 예이다.
Flux.range(1, 20)
.parallel(4)
.runOn(Schedulers.newParallel("PAR", 2), 2) // 레일에 미리 채울 값으로 2 사용
.subscribe(x -> logger.info("next {}", x));
스케줄러는 2개의 쓰레드를 사용하는데 두 쓰레드를 PAR-1, PAR-2라고 하자. 이 두 쓰레드가 처음에 각각 레일0과 레일1을 선택했다고 하자.
두 쓰레드가 레일의 데이터를 처리하면 상태는 다음과 같이 바뀐다.
Mono.zip()으로 병렬 처리하기
각 Mono의 구독 처리 쓰레드를 병렬 스케줄러로 실행하고 Mono.zip() 메서드를 이용해서 Mono를 묶으면 각 Mono를 병렬로 처리할 수 있다. 다음은 예제 코드이다.
Mono m1 = Mono.just(1).map(x -> {
logger.info("1 sleep");
sleep(1500);
return x;
}).subscribeOn(Schedulers.parallel());
Mono m2 = Mono.just(2).map(x -> {
logger.info("2 sleep");
sleep(3000);
return x;
}).subscribeOn(Schedulers.parallel());
Mono m3 = Mono.just(3).map(x -> {
logger.info("3 sleep");
sleep(2000);
return x;
}).subscribeOn(Schedulers.parallel());
logger.info("Mono.zip(m1, m2, m3)");
Mono.zip(m1, m2, m3)
.subscribe(tup -> logger.info("next: {}", tup);
위 코드에서 m1, m2, m3는 각각 1.5초, 3초, 2초간 슬립한다. 각각은 subscribeOn()을 이용해서 Parallel 스케줄러를 이용해서 구독 요청을 처리하도록 했다. 그리고 Mono.zip()으로 m1, m2, m3를 묶었다.
실제 실행 결과를 보면 m1, m2, m3가 슬립을 동시에 시작하고 약 3초 뒤에 세 Mono의 값을 묶은 Tuple3의 값을 출력하는 것을 알 수 있다. 이를 통해 m1, m2, m3를 동시에 실행했음을 확인할 수 있다.
16:12:34.424 [main] INFO parallel.ParallelTest - Mono.zip(m1, m2, m3)
16:12:34.447 [parallel-1] INFO parallel.ParallelTest - 1 sleep
16:12:34.447 [parallel-3] INFO parallel.ParallelTest - 3 sleep
16:12:34.447 [parallel-2] INFO parallel.ParallelTest - 2 sleep
16:12:37.469 [parallel-2] INFO parallel.ParallelTest - next: [1,2,3]
관련 글
스프링 리액터 시작하기 6 - 쓰레드 스케줄링
리액터 쓰레드 스케줄링
리액터는 비동기 실행을 강제하지 않는다. 예를 들어 아래 코드를 보자.
Flux.range(1, 3)
.map(i -> {
logger.info("map {} to {}", i, i + 2);
return i + 2;
})
.flatMap(i -> {
logger.info("flatMap {} to Flux.range({}, {})", i, 1, i);
return Flux.range(1, i);
})
.subscribe(i -> logger.info("next " + i));
위 코드에서 logger는 쓰레드 이름을 남기도록 설정한 로거라고 하자. 위 코드를 main 메서드에서 실행하면 다음과 같은 결과를 출력한다.
17:44:57.180 [main] INFO schedule.ScheduleTest - map 1 to 3
17:44:57.183 [main] INFO schedule.ScheduleTest - flatMap 3 to Flux.range(1, 3)
17:44:57.202 [main] INFO schedule.ScheduleTest - next 1
17:44:57.202 [main] INFO schedule.ScheduleTest - next 2
17:44:57.202 [main] INFO schedule.ScheduleTest - next 3
17:44:57.202 [main] INFO schedule.ScheduleTest - map 2 to 4
17:44:57.202 [main] INFO schedule.ScheduleTest - flatMap 4 to Flux.range(1, 4)
17:44:57.202 [main] INFO schedule.ScheduleTest - next 1
17:44:57.202 [main] INFO schedule.ScheduleTest - next 2
17:44:57.202 [main] INFO schedule.ScheduleTest - next 3
17:44:57.202 [main] INFO schedule.ScheduleTest - next 4
17:44:57.202 [main] INFO schedule.ScheduleTest - map 3 to 5
17:44:57.202 [main] INFO schedule.ScheduleTest - flatMap 5 to Flux.range(1, 5)
17:44:57.203 [main] INFO schedule.ScheduleTest - next 1
17:44:57.203 [main] INFO schedule.ScheduleTest - next 2
17:44:57.203 [main] INFO schedule.ScheduleTest - next 3
17:44:57.203 [main] INFO schedule.ScheduleTest - next 4
17:44:57.203 [main] INFO schedule.ScheduleTest - next 5
실행 결과를 보면 map(), flatMap(), subscribe()에 전달한 코드가 모두 main 쓰레드에서 실행된 것을 알 수 있다. 즉 map 연산, flatMap 연산뿐만 아니라 subscribe를 이용한 구독까지 모두 main 쓰레드가 실행한다.
스케줄러를 사용하면 구독이나 신호 처리를 별도 쓰레드로 실행할 수 있다.
publishOn을 이용한 신호 처리 쓰레드 스케줄링
publishOn() 메서드를 이용하면 next, complete, error신호를 별도 쓰레드로 처리할 수 있다. map(), flatMap() 등의 변환도 publishOn()이 지정한 쓰레드를 이용해서 처리한다. 다음 코드를 보자.
CountDownLatch latch = new CountDownLatch(1);
Flux.range(1, 6)
.map(i -> {
logger.info("map 1: {} + 10", i);
return i + 10;
})
.publishOn(Schedulers.newElastic("PUB"), 2)
.map(i -> { // publishOn에서 지정한 PUB 스케줄러가 실행
logger.info("map 2: {} + 10", i);
return i + 10;
})
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
logger.info("hookOnSubscribe");
requestUnbounded();
}
@Override
protected void hookOnNext(Integer value) {
logger.info("hookOnNext: " + value); // publishOn에서 지정한 스케줄러가 실행
}
@Override
protected void hookOnComplete() {
logger.info("hookOnComplete"); // publishOn에서 지정한 스케줄러가 실행
latch.countDown();
}
});
latch.await();
publishOn()은 두 개의 인자를 받는다. 이 코드에서 첫 번째 인자인 Schedulers.newElastic("PUB")은 비동기로 신호를 처리할 스케줄러이다. 다양한 스케줄러가 존재하는데 이에 대해서는 뒤에서 다시 살펴본다. 일단 지금은 스케줄러가 별도 쓰레드를 이용해서 신호를 처리한다고 생각하면 된다.
두 번째 인자인 2는 스케줄러가 신호를 처리하기 전에 미리 가져올 (prefetch) 데이터 개수이다. 이는 스케줄러가 생성하는 비동기 경계 시점에 보관할 수 있는 데이터의 개수로 일종의 버퍼 크기가 된다.
위 코드를 실제로 실행하면 어떤 일이 벌어지는지 보자. 다음은 결과이다.
13:01:03.026 [main] INFO schedule.ScheduleTest - hookOnSubscribe
13:01:03.029 [main] INFO schedule.ScheduleTest - map 1: 1 + 10
13:01:03.030 [main] INFO schedule.ScheduleTest - map 1: 2 + 10
13:01:03.031 [PUB-2] INFO schedule.ScheduleTest - map 2: 11 + 10
13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 21
13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - map 2: 12 + 10
13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 22
13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - map 1: 3 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 1: 4 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 2: 13 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 23
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 2: 14 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 24
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 1: 5 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 1: 6 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 2: 15 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 25
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 2: 16 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 26
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnComplete
최초에 2개를 미리 가져올 때를 제외하면 나머지는 모두 publishOn()으로 전달한 스케줄러의 쓰레드(쓰레드 이름이 "PUB"로 시작)가 처리하는 것을 알 수 있다.
publishOn()에 지정한 스케줄러는 다음 publishOn()을 설정할 때까지 적용된다. 예를 들어 다음과 같이 이름이 PUB1과 PUB2인 두 개의 스케줄러를 설정했다고 하자.
Flux.range(1, 6)
.publishOn(Schedulers.newElastic("PUB1"), 2)
.map(i -> {
logger.info("map 1: {} + 10", i);
return i + 10;
})
.publishOn(Schedulers.newElastic("PUB2"))
.map(i -> {
logger.info("map 2: {} + 10", i);
return i + 10;
})
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
logger.info("hookOnSubscribe");
requestUnbounded();
}
@Override
protected void hookOnNext(Integer value) {
logger.info("hookOnNext: " + value);
}
@Override
protected void hookOnComplete() {
logger.info("hookOnComplete");
latch.countDown();
}
});
이 코드를 실행한 결과는 다음과 같다.
13:38:14.957 [main] INFO schedule.ScheduleTest - hookOnSubscribe
13:38:14.960 [PUB1-4] INFO schedule.ScheduleTest - map 1: 1 + 10
13:38:14.963 [PUB1-4] INFO schedule.ScheduleTest - map 1: 2 + 10
13:38:14.963 [PUB2-3] INFO schedule.ScheduleTest - map 2: 11 + 10
13:38:14.963 [PUB1-4] INFO schedule.ScheduleTest - map 1: 3 + 10
13:38:14.964 [PUB1-4] INFO schedule.ScheduleTest - map 1: 4 + 10
13:38:14.964 [PUB1-4] INFO schedule.ScheduleTest - map 1: 5 + 10
13:38:14.964 [PUB1-4] INFO schedule.ScheduleTest - map 1: 6 + 10
13:38:14.969 [PUB2-3] INFO schedule.ScheduleTest - hookOnNext: 21
13:38:14.979 [PUB2-3] INFO schedule.ScheduleTest - map 2: 12 + 10
13:38:14.979 [PUB2-3] INFO schedule.ScheduleTest - hookOnNext: 22
...생략
13:38:15.021 [PUB2-3] INFO schedule.ScheduleTest - map 2: 16 + 10
13:38:15.021 [PUB2-3] INFO schedule.ScheduleTest - hookOnNext: 26
13:38:15.031 [PUB2-3] INFO schedule.ScheduleTest - hookOnComplete
결과를 보면 첫 번째 publishOn()과 두 번째 publishOn() 사이의 map() 처리는 PUB1 스케줄러가 실행하고 두 번째 publishOn() 이후의 map(), 신호 처리는 PUB2 스케줄러가 실행한 것을 알 수 있다.
subscribeOn을 이용한 구독 처리 쓰레드 스케줄링
subscribeOn()을 사용하면 Subscriber가 시퀀스에 대한 request 신호를 별도 스케줄러로 처리한다. 즉 시퀀스(Flux나 Mono)를 실행할 스케줄러를 지정한다. 다음은 subscribeOn()의 사용예이다.
CountDownLatch latch = new CountDownLatch(1);
Flux.range(1, 6)
.log() // 보다 상세한 로그 출력 위함
.subscribeOn(Schedulers.newElastic("SUB"))
.map(i -> {
logger.info("map: {} + 10", i);
return i + 10;
})
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
logger.info("hookOnSubscribe"); // main thread
request(1);
}
@Override
protected void hookOnNext(Integer value) {
logger.info("hookOnNext: " + value); // SUB 쓰레드
request(1);
}
@Override
protected void hookOnComplete() {
logger.info("hookOnComplete"); // SUB 쓰레드
latch.countDown();
}
});
latch.await();
subscribeOn()으로 지정한 스케줄러는 시퀀스의 request 요청 처리뿐만 아니라 첫 번째 publishOn() 이전까지의 신호 처리를 실행한다. 따라서 위 코드를 실행하면 Flux.range()가 생성한 시퀀스의 신호 발생뿐만 아니라 map() 실행, Subscriber의 next, complete 신호 처리를 "SUB" 스케줄러가 실행한다. 참고로 시퀀스의 request 요청과 관련된 로그를 보기 위해 log() 메서드를 사용했다.
다음은 실행 결과이다.
14:56:24.996 [main] INFO schedule.ScheduleTest - hookOnSubscribe
14:56:25.005 [SUB-2] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
14:56:25.010 [SUB-2] INFO reactor.Flux.Range.1 - | request(1)
14:56:25.010 [SUB-2] INFO reactor.Flux.Range.1 - | onNext(1)
14:56:25.011 [SUB-2] INFO schedule.ScheduleTest - map: 1 + 10
14:56:25.016 [SUB-2] INFO schedule.ScheduleTest - hookOnNext: 11
14:56:25.016 [SUB-2] INFO reactor.Flux.Range.1 - | request(1)
14:56:25.016 [SUB-2] INFO reactor.Flux.Range.1 - | onNext(2)
14:56:25.016 [SUB-2] INFO schedule.ScheduleTest - map: 2 + 10
14:56:25.017 [SUB-2] INFO schedule.ScheduleTest - hookOnNext: 12
...(생략)
14:56:25.017 [SUB-2] INFO reactor.Flux.Range.1 - | request(1)
14:56:25.017 [SUB-2] INFO reactor.Flux.Range.1 - | onNext(6)
14:56:25.017 [SUB-2] INFO schedule.ScheduleTest - map: 6 + 10
14:56:25.017 [SUB-2] INFO schedule.ScheduleTest - hookOnNext: 16
14:56:25.017 [SUB-2] INFO reactor.Flux.Range.1 - | request(1)
14:56:25.018 [SUB-2] INFO reactor.Flux.Range.1 - | onComplete()
14:56:25.018 [SUB-2] INFO schedule.ScheduleTest - hookOnComplete
실행 결과에서 Flux.Range 타입은 Flux.range() 메서드가 생성한 시퀀스 객체의 타입이다. 위 결과에서 Flux.Range.1의 reques(1), onNext(), onComplete() 로그는 Subscriber의 request 신호를 처리하는 로그이다. 이 로그를 보면 SUB 스케줄러가 해당 기능을 실행하고 있음을 알 수 있다. 또한 map()과 Subscriber의 신호 처리 메서드(hookOnNext, hookOnComplete)도 SUB 스케줄러가 실행하고 있다.
subscribeOn() + publishOn() 조합
앞서 말했듯이 subscribeOn으로 지정한 스케줄러는 첫 번째 publishOn이 올때까지 적용된다. 다음 코드를 통해 이를 확인할 수 있다.
CountDownLatch latch = new CountDownLatch(1);
Flux.range(1, 6)
.log()
.subscribeOn(Schedulers.newElastic("SUB"))
.map(i -> {
logger.info("map1: " + i + " --> " + (i + 20));
return i + 20;
})
.map(i -> {
logger.info("mapBySub: " + i + " --> " + (i + 100));
return i + 100;
})
.publishOn(Schedulers.newElastic("PUB1"), 2)
.map(i -> {
logger.info("mapByPub1: " + i + " --> " + (i + 1000));
return i + 1000;
})
.publishOn(Schedulers.newElastic("PUB2"), 2)
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
logger.info("hookOnSubscribe");
request(1);
}
@Override
protected void hookOnNext(Integer value) {
logger.info("hookOnNext: " + value);
request(1);
}
@Override
protected void hookOnComplete() {
logger.info("hookOnComplete");
latch.countDown();
}
});
latch.await();
이 코드는 구독을 위한 "SUB" 스케줄러와 신호 처리를 위한 "PUB1", "PUB2" 스케줄러를 설정하고 있다.
다음은 실행 결과이다.
15:10:05.660 [main] INFO schedule.ScheduleTest - hookOnSubscribe
15:10:05.681 [SUB-6] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
15:10:05.687 [SUB-6] INFO reactor.Flux.Range.1 - | request(2)
15:10:05.688 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(1)
15:10:05.718 [SUB-6] INFO schedule.ScheduleTest - map1: 1 --> 21
15:10:05.719 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 21 --> 121
15:10:05.720 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(2)
15:10:05.720 [SUB-6] INFO schedule.ScheduleTest - map1: 2 --> 22
15:10:05.720 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 22 --> 122
15:10:05.721 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 121 --> 1121
15:10:05.722 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 122 --> 1122
15:10:05.734 [SUB-6] INFO reactor.Flux.Range.1 - | request(2)
15:10:05.735 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(3)
15:10:05.735 [SUB-6] INFO schedule.ScheduleTest - map1: 3 --> 23
15:10:05.735 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 23 --> 123
15:10:05.735 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(4)
15:10:05.735 [SUB-6] INFO schedule.ScheduleTest - map1: 4 --> 24
15:10:05.735 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 24 --> 124
15:10:05.736 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1121
15:10:05.736 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1122
15:10:05.736 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 123 --> 1123
15:10:05.736 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 124 --> 1124
15:10:05.736 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1123
15:10:05.736 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1124
15:10:05.736 [SUB-6] INFO reactor.Flux.Range.1 - | request(2)
15:10:05.736 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(5)
15:10:05.736 [SUB-6] INFO schedule.ScheduleTest - map1: 5 --> 25
15:10:05.736 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 25 --> 125
15:10:05.737 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(6)
15:10:05.737 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 125 --> 1125
15:10:05.737 [SUB-6] INFO schedule.ScheduleTest - map1: 6 --> 26
15:10:05.737 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 26 --> 126
15:10:05.737 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1125
15:10:05.737 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 126 --> 1126
15:10:05.737 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1126
15:10:05.737 [SUB-6] INFO reactor.Flux.Range.1 - | onComplete()
15:10:05.738 [PUB2-4] INFO schedule.ScheduleTest - hookOnComplete
실행 결과를 보면 첫 번째 publishOn()으로 PUB1 스케줄러를 지정하기 전까지는 SUB 스케줄러가 request 요청과 map1, mapBySub 변환을 처리하는 것을 확인할 수 있다.
[노트]
subscribeOn()이 publishOn() 뒤에 위치하면 실질적으로 prefetch할 때를 제외하면 적용되지 않는다. subscribeOn()은 원본 시퀀스의 신호 발생을 처리할 스케줄러를 지정하므로 시퀀스 생성 바로 뒤에 subscribeOn()을 지정하도록 하자. 또한 두 개 이상 subscribeOn()을 지정해도 첫 번째 subscribeOn()만 적용된다.
스케줄러 종류
스프링 리액터는 다음 스케줄러를 기본 제공한다.
- Schedulers.immediate() : 현재 쓰레드에서 실행한다.
- Schedulers.single() : 쓰레드가 한 개인 쓰레드 풀을 이용해서 실행한다. 즉 한 쓰레드를 공유한다.
- Schedulers.elastic() : 쓰레드 풀을 이용해서 실행한다. 블로킹 IO를 리액터로 처리할 때 적합하다. 쓰레드가 필요하면 새로 생성하고 일정 시간(기본 60초) 이상 유휴 상태인 쓰레드는 제거한다. 데몬 쓰레드를 생성한다.
- Schedulers.parallel() : 고정 크기 쓰레드 풀을 이용해서 실행한다. 병렬 작업에 적합하다.
single(), elastic(), parallel()은 매번 새로운 쓰레드 풀을 만들지 않고 동일한 쓰레드 풀을 리턴한다. 예를 들어 아래 코드에서 두 publishOn()은 같은 쓰레드 풀을 공유한다.
someFlux.publishOn(Schedulers.parallel())
.map(...)
.publishOn(Schedulers.parallel())
.subscribe(...);
single(), elastic(), parallel()이 생성하는 쓰레드는 데몬 쓰레드로서 main 쓰레드가 종료되면 함께 종료된다.
같은 종류의 쓰레드 풀인데 새로 생성하고 싶다면 다음 메서드를 사용하면 된다.
- newSingle(String name)
- newSingle(String name, boolean daemon)
- newElastic(String name)
- newElastic(String name, int ttlSeconds)
- newElastic(String name, int ttlSeconds, boolean daemon)
- newParallel(String name)
- newParallel(String name, int parallelism)
- newParallel(String name, int parallelism, boolean daemon)
각 파라미터는 다음과 같다.
- name : 쓰레드 이름으로 사용할 접두사이다.
- daemon : 데몬 쓰레드 여부를 지정한다. 지정하지 않으면 false이다. 데몬 쓰레드가 아닌 경우 JVM 종료시에 생성한 스케줄러의 dispose()를 호출해서 풀에 있는 쓰레드를 종료해야 한다.
- ttlSeconds : elastic 쓰레드 풀의 쓰레드 유휴 시간을 지정한다. 지정하지 않으면 60(초)이다.
- parallelism : 작업 쓰레드 개수를 지정한다. 지정하지 않으면 Runtime.getRuntime().availableProcessors()이 리턴한 값을 사용한다.
newXXX() 로 생성하는 쓰레드 풀은 기본으로 데몬 쓰레드가 아니기 때문에 어플리케이션 종료시에는 다음과 같이 dispose() 메서드를 호출해서 쓰레드를 종료시켜 주어야 한다. 그렇지 않으면 어플리케이션이 종료되지 않는 문제가 발생할 수 있다.
// 비데몬 스케줄러 초기화
Scheduler scheduler = Schedulers.newElastic("SUB", 60, false);
// 비데몬 스케줄러 사용
someFlux.publishOn(scheduler)
.map(...)
.subscribe(...)
// 어플리케이션 종료시에 스케줄러 종료 처리
scheduler.dispose();
병렬 처리와 관련된 내용은 다음에 더 자세히 살펴본다.
일정 주기로 tick 발생: Flux.interval
Flux.interval()을 사용하면 일정 주기로 신호를 발생할 수 있다. 발생 순서에 따라 발생한 정수 값을 1씩 증가시킨다. 다음은 간단한 사용 예이다.
Flux.interval(Duration.ofSeconds(1)) // Flux<Long>
.subscribe(tick -> System.out.println("Tick " + tick));
Thread.sleep(5000);
1초 간격으로 신호가 발생하는 것을 알 수 있다.
interval()은 Schedulers.parallel()를 사용해서 신호를 주기적으로 발생한다. 다른 스케줄러를 사용하고 싶다면 internval(Duration, Scheduler) 메서드를 사용하면 된다.
관련글
스프링 리액터 시작하기 5 - 에러 처리
에러 처리
시퀀스는 데이터를 발생하는 과정에서 에러를 발생할 수 있다. 리액터는 에러를 처리하는 여러 방법을 제공하는데 이 글에서는 레퍼런스 문서에서 언급하는 에러 처리 방법을 차례대로 살펴볼 것이다.
참고로 에러 신호는 종료 신호이다. 따라서 에러 신호가 발생하면 시퀀스는 종료되고 더 이상 데이터를 발생하지 않는다.
에러 신호 처리
에러 신호가 발생하면 Subscriber의 onError 메서드가 호출된다. 이 메서드를 구현한 Subscriber를 이용해서 구독을 하면 에러 신호를 알맞게 처리할 수 있다. 또한 에러 처리를 위한 Consumer를 파라미터로 갖는 subscribe() 메서드를 사용해서 익셉션을 처리할 수 있다. 다음 코드는 에러 처리를 위한 Consumer를 파라미터로 갖는 subscribe() 메서드의 사용 예를 보여준다.
Flux.range(1, 10)
.map(x -> {
if (x == 5) throw new RuntimeException("exception"); // 에러 발생
else return x;
})
.subscribe(
i -> System.out.println(i), // next 신호 처리
ex -> System.err.println(ex.getMessage()), // error 신호 처리
() -> System.out.println("complete") // complete 신호 처리
);
위 코드는 1부터 10개의 값을 발생하는데 값이 5이면 익셉션을 발생하는 시퀀스를 생성한다. subscribe() 메서드는 3개의 인자를 갖는데 차례대로 next, error, complete 신호를 처리한다. 실행 결과는 다음과 같다.
1
2
3
4
exception
에러 신호는 종료 신호이므로 익셉션 발생 이후에 더 이상 next 신호가 발생하지 않는 것을 확인할 수 있다.
에러 신호를 처리하기 위한 subscribe() 메서드는 다음과 같다.
- subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer) - subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer) - subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer) - subscribe(Subscriber<? super T> subscriber)
에러 발생하면 기본 값 사용하기: onErrorReturn
에러가 발생할 때 에러 대신에 특정 값을 발생하고 싶다면 onErrorReturn() 메서드를 사용한다. 이 메서드의 사용 예는 다음과 같다.
Flux<Integer> seq = Flux.range(1, 10)
.map(x -> {
if (x == 5) throw new RuntimeException("exception");
else return x;
})
.onErrorReturn(-1);
seq.subscribe(System.out::println);
위 코드를 실행한 결과는 다음과 같다.
1
2
3
4
-1
실행 결과에서 알 수 있듯이 에러 대신에 값을 발생한 뒤에 시퀀스는 종료된다.
발생한 익셉션이 특정 조건을 충족하는 경우에만 에러 대신에 특정 값을 발생하고 싶다면 Predicate을 인자로 받는 onErrorReturn() 메서드를 사용한다. 익셉션이 특정 타입인 경우에만 에러 대신 특정 값을 발생하고 싶다면 Class 타입을 인자로 받는 onErrorReturn() 메서드를 사용한다.
- Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue)
- <E extends Throwable> Flux<T> onErrorReturn(Class<E> type, T fallbackValue)
에러 발생하면 다른 신호(시퀀스)나 다른 에러로 대체하기: onErrorResume
onErrorResume 메서드를 사용하면 에러가 발생하면 다른 시퀀스나 에러로 대체할 수 있다. onErrorResume 메서드는 다음 타입의 함수를 파라미터로 갖는다.
- Function<? super Throwable, ? extends Publisher<? extends T>> :
Throwable을 입력으로 받고 Publisher를 리턴하는 함수
이 코드에서 onErrorResume() 메서드에 전달한 함수를 보자. 이 함수는 발생한 에러가 IllegalArgumentException이면 Flux.just(21, 22)로 21, 22 값을 생성하는 시퀀스를 리턴하고, 발생한 에러가 IllegalStateException이면 31, 32 값을 생성하는 시퀀스를 리턴한다. 두 조건에 해당하지 않는 경우 Flux.error() 메서드를 이용해서 익셉션을 다시 재발생시킨다.
map() 함수에서 임의로 익셉션을 발생시키도록 했으므로 실행할 때마다 결과가 달라진다. 다음은 여러 번 실행한 결과 중 하나이다. 이 실행 결과는 3번째 데이터를 map에서 처리하는 과정에서 IllegalArgumentException이 발생했고 onErrorResume()을 통해 에러 대신에 21, 22를 생성하는 Flux로 대체된 것을 보여준다.
1
2
21
22
onErrorReturn과 마찬가지로 onErrorResume도 Predicate이나 Class 타입을 파라미터로 갖는 메서드를 이용해서 처리할 익셉션을 제한할 수 있다.
에러를 다른 에러로 변환하기: onErrorMap
에러를 다른 에러로 변환할 때에는 onErrorMap을 사용한다. 다음은 간단한 사용 예이다.
Flux<Integer> seq = intSeq.onErrorMap(error -> new MyException(...));
onErrorMap도 Predicate이나 Class 타입을 파라미터로 갖는 메서드를 이용해서 변환할 익셉션을 제한할 수 있다.
재시도하기: retry
retry()를 사용하면 에러가 발생했을 구독을 재시도할 수 있다. 다음은 예제 코드이다.
Flux.range(1, 5)
.map(input -> {
if (input < 4) return "num " + input;
throw new RuntimeException("boom");
})
.retry(1) // 에러 신호 발생시 1회 재시도
.subscribe(System.out::println, System.err::println);
위 코드를 실행한 결과는 다음과 같다.
num 1
num 2
num 3
num 1
num 2
num 3
java.lang.RuntimeException: boom
재시도를 하면 원본 시퀀스를 다시 구독한다. 이런 이유로 위 결과는 에러가 처음 발생했을 때 다시 1부터 신호가 발생하고 있다. 두 번째 에러가 발생했을 때에는 재시도를 하지 않으므로 에러 메시지가 출력되는 것을 알 수 있다.
재시도하기: retryWhen
단순 재시도가 아닌 조금 더 복잡한 상황에 따라 재시도를 하고 싶다면 retryWhen을 사용한다. retryWhen 메서드는 사용법이 다소 복잡하다. 먼저 retryWhen 메서드의 파라미터를 보자. 파라미터 타입은 다음과 같다.
- retryWhen(Function< Flux<Throwable>, ? extends Publisher<?> > whenFactory)
- 에러가 발생할 때마다 에러가 컴페니언 Flux로 전달된다.
- 컴페니언 Flux가 뭐든 발생하면 재시도가 일어난다.
- 컴페니언 Flux가 종료되면 재시도를 하지 않고 원본 시퀀스 역시 종료된다.
- 컴페니언 Flux가 에러를 발생하면 재시도를 하지 않고 컴페니언 Flux가 발생한 에러를 전파한다.
Flux<Integer> seq = Flux.just(1, 2, 3)
.map(i -> {
if (i < 3) return i;
else throw new IllegalStateException("force");
})
.retryWhen(errorsFlux -> errorsFlux.take(2)); // 2개의 데이터 발생
seq.subscribe(
System.out::println,
err -> System.err.println("에러 발생: " + err),
() -> System.out.println("compelte")
);
위 코드에서 retryWhen은 take(2)를 사용해서 2개의 데이터를 발생하는 컴페니언 Flux를 리턴한다. 이 컴페니언 Flux는 2개의 데이터를 발생하고 종료된다. 즉 2-3번 과정에 의해 2번 재시도를 하고 원본 시퀀스를 종료시킨다. 실제 실행 결과는 다음과 같다. 괄호 안의 파란 글씨는 재시도 여부를 표시한 것으로 실제 출력에 포함된 내용은 아니다.
1
2
1 (1번 재시도)
2
1 (2번 재시도)
2
complete
출력 결과에서 눈여겨 볼 점은 에러가 출력되지 않았다는 점이다. 즉 컴페니언 Flux가 complete 신호를 보내면 Subscriber에도 complete 신호가 전달되고 있다. 이 점은 retry()와 다르다. retry()는 최대 재시도 이후에도 에러가 발생하면 해당 에러를 Subscriber에 전달하는데 retryWhen은 컴페니언 Flux가 어떤 데이터를 발생하느냐에 따라 에러를 Subscriber에 전달할지 여부가 달라진다.
다음 코드는 컴페니언 Flux가 에러를 발생하는 예이다.
Flux<Integer> seq = Flux.just(1, 2, 3)
.map(i -> {
if (i < 3) return i;
else throw new IllegalStateException("force");
})
.retryWhen(errorsFlux -> errorsFlux.zipWith(Flux.range(1, 3),
(error, index) -> {
if (index < 3) return index;
else throw new RuntimeException("companion error"); //
})
);
seq.subscribe(
System.out::println,
err -> System.err.println("에러 발생: " + err),
() -> System.out.println("compelte")
);
위 코드에서 retryWhen이 생성하는 컴페니언 Flux는 에러가 세 번째 발생하면 RuntimeException을 발생한다. 즉 두 번째 에러까지는 데이터 신호를 발생하고 세 번째 에러에 에러 신호를 발생한다. 따라서 두 번 재시도를 한다. 실행 결과는 다음과 같다.
1
2
1
2
1
2
에러 발생: java.lang.RuntimeException: companion error
실행 결과를 보면 두 번 재시도 후에 에러 신호를 받은 것을 알 수 있다.
관련 글
스프링 리액터 시작하기 4 - 시퀀스 변환 기초
시퀀스 변환
이 글에서는 시퀀스가 발생하는 데이터를 변환하는 몇 가지 방법을 살펴본다.
1-1 변환: map
첫 번째는 map()이다. map() 한 개의 데이터를 1-1 방식으로 변환해준다. 자바 스트림의 map()과 유사하다. 다음 코드는 map()의 예를 보여준다.
Flux.just("a", "bc", "def", "wxyz")
.map(str -> str.length()) // 문자열을 Integer 값으로 1-1 변환
.subscribe(len -> System.out.println(len));
1-n 변환: flatMap
flatMap은 1개의 데이터로부터 시퀀스를 생성할 때 사용한다. 즉 1-n 방식의 변환을 처리한다. 다음은 간단한 flatMap() 사용 예이다.
Flux<Integer> seq = Flux.just(1, 2, 3)
.flatMap(i -> Flux.range(1, i)) // Integer를 Flux<Integer>로 1-N 변환
seq.subscribe(System.out::println);
flatMap()에 전달한 함수를 보면 Integer 값을 받아서 다시 1부터 해당 값 개수만큼의 숫자를 생성하는 Flux를 생성한다. 위 코드를 보면 다음과 같은 변환이 발생한다.
- 1 -> Flux.range(1, 1) : [1] 생성
- 2 -> Flux.range(1, 2) : [1, 2] 생성
- 3 -> Flux.range(1, 3) : [1, 2, 3] 생성
걸러내기: filter
filter()를 이용해서 시퀀스가 생성한 데이터를 걸러낼 수 있다. filter()에 전달한 함수의 결과가 true인 데이터만 전달하고 false인 데이터는 발생하지 않는다. 다음은 1부터 10 사이의 숫자 중에서 2로 나눠 나머지가 0인 (즉 짝수인) 숫자만 걸러내는 예를 보여준다.
Flux.range(1, 10)
.filter(num -> num % 2 == 0)
.subscribe(x -> System.out.print(x + " -> "));
실행 결과는 다음과 같다.
2 -> 4 -> 6 -> 8 -> 10 ->
빈 시퀀스인 경우 기본 값 사용하기: defaultIfEmpty
시퀀스에 데이터가 없을 때 특정 값을 기본으로 사용하고 싶다면 defaultIfEmpty() 메서드를 사용하면 된다. Mono와 Flux 모두 defaultIfEmpty()를 제공한다. 아래 코드는 사용 예이다.
Flux<Item> popularItems = getPopularItems();
Flux<Item> recItems = popularItems.defaultIfEmpty(featureItem);
getPopularItems() 메서드가 인기 상품 목록을 제공한다고 하자. 이 코드는 인기 상품 목록을 제공하는 시퀀스인 popularItems가 데이터가 없는 빈 시퀀스면 featureItem을 값으로 제공하는 Flux로 변환한다. 즉 인기 상품이 있으면 그 상품 목록을 제공하고 그렇지 않으면 미리 지정한 featureItem을 값으로 제공하는 Flux를 생성한다.
위 코드를 조금 더 간결하게 표현하면 다음과 같다.
Flux<Item> recItems = getPopularItems().defaultIfEmpty(featureItem);
빈 시퀀스인 경우 다른 시퀀스 사용하기: switchIfEmpty
// public Flux<Item> getPopularItems() { ... }
// public Flux<Item> getFeatureItems() { ... }
Flux<Item> recItems =
getPopularItems().switchIfEmpty(getFeatureItems());
특정 값으로 시작하는 시퀀스로 변환: startWith
특정 값으로 시작하도록 시퀀스를 설정하고 싶다면 startWith(T ...) 메서드나 startWith(시퀀스) 메서드를 사용한다.
Flux<Integer> seq1 = Flux.just(1, 2, 3);
Flux<Integer> seq2 = seq1.startWith(-1, 0);
seq2.subscribe(System.out::println);
이 코드에서 seq1은 1, 2, 3을 생성하는 시퀀스인데 startWith(-1, 0)을 사용해서 -1, 0으로 시작하는 시퀀스로 변환했다. 따라서 seq2가 생성하는 데이터는 -1, 0, 1, 2, 3이 된다.
특정 값으로 끝나는 시퀀스로 변환: concatWithValues
시퀀스가 특정 값으로 끝나도록 변환하고 싶다면 concatWithValues(T ...) 메서드를 사용한다.
Flux<Integer> seq = someSeq.concatWithValues(100);
seq.subscribe(System.out::println);
위 코드는 someSeq가 어떤 값을 생성하는지에 상관없이 seq는 가장 마지막에 100을 생성한다.
시퀀스 순서대로 연결: cancatWith
concatWith() 메서드를 사용하면 여러 시퀀스를 순서대로 연결할 수 있다.
Flux<Integer> seq1 = Flux.just(1, 2, 3);
Flux<Integer> seq2 = Flux.just(4, 5, 6);
Flux<Integer> seq3 = Flux.just(7, 8, 9);
seq1.concatWith(seq2).concatWith(seq3).subscribe(System.out::println);
위 코드에서 seq1, seq2, seq3을 차례대로 연결하고 구독을 시작했다. 실행 결과는 1부터 9까지 정수를 출력한다.
concatWith로 연결한 시퀀스는 이전 시퀀스가 종료된 뒤에 구독을 시작한다. 위 예에서는 seq1이 종료된 뒤에 seq2 구독을 시작하고 seq2가 종료된 뒤에 seq3 구독을 시작한다.
시퀀스 발생 순서대로 섞기: mergeWith
시퀀스의 연결 순서가 아니라 시퀀스가 발생하는 데이터 순서대로 섞고 싶다면 mergeWith()를 사용한다. 다음은 예이다.
Flux<String> tick1 = Flux.interval(Duration.ofSeconds(1)).map(tick -> tick + "초틱");
Flux<String> tick2 = Flux.interval(Duration.ofMillis(700)).map(tick -> tick + "밀리초틱");
tick1.mergeWith(tick2).subscribe(System.out::println);
위 코드에서 tick1은 1초 간격으로 데이터를 발생하고 tick2는 700 밀리초 단위로 데이터를 발생한다. 이 두 시퀀스를 mergeWith()로 섞은 뒤 구독하면 두 시퀀스를 동시에 구독한다. 실행 결과는 다음과 같다.
0밀리초틱
0초틱
1밀리초틱
1초틱
2밀리초틱
3밀리초틱
2초틱
4밀리초틱
3초틱
5밀리초틱
...
다음은 실행 결과를 발생 시점 기준으로 그림으로 표시한 것이다. 1초 간격으로 데이터를 발생하는 seq1과 0.7초 간격으로 데이터를 발생하는 seq2를 mergeWith()로 섞을 때 데이터 발생 순서를 알 수 있다.
시퀀스 묶기: zipWith
zipWith()를 사용하면 두 시퀀스의 값을 묶은 값 쌍을 생성하는 시퀀스를 생성할 수 있다. 다음은 사용 예이다.
Flux<String> tick1 = Flux.interval(Duration.ofSeconds(1)).map(tick -> tick + "초틱");
Flux<String> tick2 = Flux.interval(Duration.ofMillis(700)).map(tick -> tick + "밀리초틱");
tick1.zipWith(tick2).subscribe(tup -> System.out.println(tup));
시퀀스 묶기: combineLatest
Flux.combineLatest() 메서드로 시퀀스를 묶을 수도 있다. 이 메서드 정적 메서드이다. 발생한 개수를 맞춰서 쌍을 만드는 zipWith()와 달리 combineLatest()는 가장 최근의 데이터를 쌍으로 만든다. 다음은 그 차이를 보여준다.
다음은 예제 코드이다.
Flux<String> tick1 = Flux.interval(Duration.ofSeconds(1)).map(tick -> tick + "초틱");
Flux<String> tick2 = Flux.interval(Duration.ofMillis(700)).map(tick -> tick + "밀리초틱");
Flux.combineLatest(tick1, tick2, (a, b) -> a + "\n" + b).subscribe(System.out::println);
지정한 개수/시간에 해당하는 데이터만 유지: take, takeLast
지정한 개수/시간만큼 데이터 거르기: skip, skipLast
관련 글