리액터 모으기(aggregation) 연산
List 콜렉션으로 모으기: collectList()
Flux는 데이터를 콜렉션으로 모을 수 있는 기능을 제공한다. 이 중에서 List로 모아주는 collectList()는 다음과 같이 사용한다.
Mono<List<Integer>> mono = someFlux.collectList();
mono.subscribe(lst -> System.out.println(lst));
collectList()의 리턴 타입은 Mono<List<T>>이므로 Mono를 구독해서 값을 사용하면 된다.
Map 콜렉션으로 모으기: collectMap()
다음의 Flux#collectMap()을 이용해서 Map으로 모을 수도 있다.
- Mono<Map<K, T>> collectMap(Function<? super T, ? extends K> keyExtractor)
- Mono<Map<K, V>> collectMap(Function<? super T, ? extends K> keyExtractor,
Function<? super T, ? extends V> valueExtractor) - Mono<Map<K, V>> collectMap(Function<? super T, ? extends K> keyExtractor,
Function<? super T, ? extends V> valueExtractor,
Supplier<Map<K, V>> mapSupplier)
각 인자는 다음과 같다.
- keyExtractor : 데이터에서 맵의 키를 제공하는 함수
- valueExtractor : 데이터에서 맵의 값을 제공하는 함수
- mapSupplier : 사용할 Map 객체를 제공(mapSupplier가 없는 메서드는 기본으로 HashMap 사용)
다음 코드는 각 메서드의 사용 예이다.
// keyExtractor만 지정. 값은 그대로 사용.
Mono<Map<Integer, Tuple2<Integer, String>>> numTupMapMono =
Flux.just(Tuples.of(1, "일"), Tuples.of(2, "이"), Tuples.of(3, "삼"), Tuples.of(4, "사"))
.collectMap(x -> x.getT1()); // keyExtractor
// String을 리턴하는 valueExtractor 사용.
Mono<Map<Integer, String>> numLabelMapMono =
Flux.just(Tuples.of(1, "일"), Tuples.of(2, "이"), Tuples.of(3, "삼"), Tuples.of(4, "사"))
.collectMap(x -> x.getT1(), // keyExtractor
x -> x.getT2()); // valueExtractor
// Map으로 TreeMap 사용
Mono<Map<Integer, String>> numLabelTreeMapMono =
Flux.just(Tuples.of(1, "일"), Tuples.of(2, "이"), Tuples.of(3, "삼"), Tuples.of(4, "사"))
.collectMap(x -> x.getT1(), // keyExtractor
x -> x.getT2(), // valueExtractor
() -> new TreeMap<>()); // mapSupplier
collectMap은 중복된 키가 존재하면 마지막 데이터와 관련된 값이 사용된다. 예를 들어 아래 코드는 Flux가 생성하는 데이터는 4개지만 키로 사용하는 값이 중복되므로 실제 Map에는 2와 4 두 개의 데이터만 저장된다.
Map의 값을 콜렉션으로 모으기: collectMultiMap()
collectMultiMap()을 사용하면 같은 키를 가진 데이터를 List로 갖는 Map을 생성할 수 있다. 다음은 예제 코드이다.
Mono<Map<Integer, Collection<Integer>>> oddEvenList =
Flux.just(1, 2, 3, 4).collectMultimap(x -> x % 2);
oddEvenList.subscribe(map -> System.out.println(map)); // {0=[2, 4], 1=[1, 3]}
collectMultiMap() 메서드는 collectMap() 메서드와 동일한 파라미터를 갖는다.
개수 새기: count()
Flux#count() 메서드를 사용하면 개수를 제공하는 Mono를 리턴한다.
Mono<Long> countMono = Flux.just(1, 2, 3, 4).count();
누적 하기: reduce()
reduce()는 각 값에 연산을 누적해서 결과를 생성한다. Flux의 데이터를 이용해서 단일 값을 생성하는 범용 기능이라고 보면 된다. 첫 번째 살펴볼 reduce() 메서드 다음과 같다. 이 메서드는 Flux가 발생하는 데이터와 동일 타입으로 누적할 때 사용한다.
- Mono<T> reduce(BiFunction<T, T, T> aggregator)
aggregator는 인자가 두 개인 함수이다. 이 함수의 첫 번째 인자는 지금까지 누적된 값을 받으며, 두 번째 인자는 누적할 데이터를 받는다. aggregator는 두 인자를 이용해서 새로운 누적 값을 리턴한다. 새 누적 값은 다음 데이터를 aggregator 함수로 누적할 때 첫 번째 인자로 사용된다.
예를 들어 간단한 곱셈 기능을 reduce()를 이용해서 다음과 같이 구현할 수 있다.
Mono<Integer> mulMono = Flux.just(1, 2, 3, 4).reduce((acc, ele) -> acc * ele);
mulMono.subscribe(sum -> System.out.println("sum : " + sum);
acc는 이전까지 누적된 값인데, 두 번째 데이터를 누적할 때 첫 번째 데이터를 누적된 값(acc)으로 사용한다. 위 코드는 다음과 같은 계산을 거쳐 최종 값으로 24를 출력한다.
acc1 = 1 // 첫 번째 값을 누적 값의 초기 값으로 사용
acc2 = aggregator(acc1, 2) // 1 * 2
acc3 = aggregator(acc2, 3) // 2 * 3
acc4 = aggregator(acc3, 4) // 6 * 4
누적 값의 초기 값을 지정하고 싶거나 데이터와 다른 타입으로 누적하고 싶다면 다음 reduce() 메서드를 사용한다.
- Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator)
- Mono<A> reduceWith(Supplier<A> initial, BiFunction<A, ? super T, A> accumulator)
reduce()의 initial은 초기 값이고, reduceWith()의 initial은 초기값을 제공하는 Supplier이다. 다음은 초기 값을 사용하는 reduce() 메서드의 사용예이다.
Mono<String> strMono = Flux.just(1, 2, 3, 4)
.reduce("", (str, ele) -> str + "-" + ele.toString());
strMono.subscribe(System.out::println); // -1-2-3-4 출력
누적하면서 값 생성하기: scan()
데이터를 누적하면 중간 누적 결과를 데이터로 생성하고 싶다면 scan() 메서드를 사용한다. 최종 누적된 값 한 개만 발생하는 reduce()와 달리 scan()은 중간 결과를 포함한 여러 값을 생성하므로, scan()의 리턴 타입은 Flux이다. 다음은 같은 타입으로 누적한 결과를 발생하는 scan() 메서드이다.
- Flux<T> scan(BiFunction<T, T, T> accumulator)
리턴 타입이 Flux인 것을 제외하면 reduce()와 동일하다.
다음은 예제 코드이다.
Flux<Integer> seq = Flux.just(1, 2, 3, 4).scan((acc, x) -> acc * x);
seq.subscribe(System.out::println);
다음은 위 코드의 출력 결과이다. 중간 결과가 출력되는 것을 알 수 있다.
1
2
6
24
reduce()와 동일하게 누적 초기값을 갖는 메서드를 제공한다.
- Flux<A> scan(A initial, BiFunction<A, ? super T, A> accumulator)
- Flux<A> scanWith(Supplier<A> initial, BiFunction<A, ? super T, A> accumulator)
다음은 초기 값을 지정하는 사용 예이다.
Flux<Integer> seq = Flux.just(2, 3, 4).scan(1, (acc, x) -> acc * x);
seq.subscribe(System.out::println);
실행 결과는 다음과 같다.
1
2
6
24
결과를 보면 초기 값(1)도 시퀀스의 값으로 발생한 것을 알 수 있다.
데이터 조건 검사
모든/일부 데이터가 특정 조건을 충족하는지 검사할 때는 all()이나 any()를 사용한다.
Mono<Boolean> all = Flux.just(1, 2, 3, 4).all(x -> x > 2);
all.subscribe(b -> System.out.println("all: " + b)); // false
Mono<Boolean> any = Flux.just(1, 2, 3, 4).any(x -> x > 2);
any.subscribe(b -> System.out.println("any: " + b)); // true
데이터가 존재하는지 또는 특정 데이터를 포함하는지 검사할 때는 hasElements()나 hasElement()를 사용한다.
Mono<Boolean> hasElements = Flux.just(1, 2, 3, 4).hasElements();
hasElements.subscribe(b -> System.out.println("hasElements: " + b)); // true
Mono<Boolean> hasElement = Flux.just(1, 2, 3, 4).hasElement(3);
hasElement.subscribe(b -> System.out.println("hasElement: " + b)); // true