주요글: 도커 시작하기
반응형

리액터 모으기(aggregation) 연산


List 콜렉션으로 모으기: collectList()

Flux는 데이터를 콜렉션으로 모을 수 있는 기능을 제공한다. 이 중에서 List로 모아주는 collectList()는 다음과 같이 사용한다.


Mono<List<Integer>> mono = someFlux.collectList();

mono.subscribe(lst -> System.out.println(lst));


collectList()의 리턴 타입은 Mono<List<T>>이므로 Mono를 구독해서 값을 사용하면 된다.


Map 콜렉션으로 모으기: collectMap()

다음의 Flux#collectMap()을 이용해서 Map으로 모을 수도 있다.


  • Mono<Map<K, T>> collectMap(Function<? super T, ? extends K> keyExtractor)
  • Mono<Map<K, V>> collectMap(Function<? super T, ? extends K> keyExtractor,
                                              Function<? super T, ? extends V> valueExtractor)
  • Mono<Map<K, V>> collectMap(Function<? super T, ? extends K> keyExtractor,
                                              Function<? super T, ? extends V> valueExtractor,
                                              Supplier<Map<K, V>> mapSupplier)

각 인자는 다음과 같다.

  • keyExtractor : 데이터에서 맵의 키를 제공하는 함수
  • valueExtractor : 데이터에서 맵의 값을 제공하는 함수
  • mapSupplier : 사용할 Map 객체를 제공(mapSupplier가 없는 메서드는 기본으로 HashMap 사용)

다음 코드는 각 메서드의 사용 예이다.


// keyExtractor만 지정. 값은 그대로 사용.


Mono<Map<Integer, Tuple2<Integer, String>>> numTupMapMono =

        Flux.just(Tuples.of(1, "일"), Tuples.of(2, "이"), Tuples.of(3, "삼"), Tuples.of(4, "사"))

                .collectMap(x -> x.getT1()); // keyExtractor



// String을 리턴하는 valueExtractor 사용.


Mono<Map<Integer, String>> numLabelMapMono =

        Flux.just(Tuples.of(1, "일"), Tuples.of(2, "이"), Tuples.of(3, "삼"), Tuples.of(4, "사"))

                .collectMap(x -> x.getT1(), // keyExtractor

                        x -> x.getT2()); // valueExtractor



// Map으로 TreeMap 사용


Mono<Map<Integer, String>> numLabelTreeMapMono =

        Flux.just(Tuples.of(1, "일"), Tuples.of(2, "이"), Tuples.of(3, "삼"), Tuples.of(4, "사"))

                .collectMap(x -> x.getT1(), // keyExtractor

                        x -> x.getT2(), // valueExtractor

                        () -> new TreeMap<>()); // mapSupplier


collectMap은 중복된 키가 존재하면 마지막 데이터와 관련된 값이 사용된다. 예를 들어 아래 코드는 Flux가 생성하는 데이터는 4개지만 키로 사용하는 값이 중복되므로 실제 Map에는 2와 4 두 개의 데이터만 저장된다.


Flux.just(1, 2, 3, 4)
     .collectMap(x -> x % 2)
     .subscribe(map -> System.out.println(map)); // {0=4, 1=3}

Map의 값을 콜렉션으로 모으기: collectMultiMap()

collectMultiMap()을 사용하면 같은 키를 가진 데이터를 List로 갖는 Map을 생성할 수 있다. 다음은 예제 코드이다.


Mono<Map<Integer, Collection<Integer>>> oddEvenList =

        Flux.just(1, 2, 3, 4).collectMultimap(x -> x % 2);

oddEvenList.subscribe(map -> System.out.println(map)); // {0=[2, 4], 1=[1, 3]}


collectMultiMap() 메서드는 collectMap() 메서드와 동일한 파라미터를 갖는다.


개수 새기: count()

Flux#count() 메서드를 사용하면 개수를 제공하는 Mono를 리턴한다.


Mono<Long> countMono = Flux.just(1, 2, 3, 4).count();


누적 하기: reduce()

reduce()는 각 값에 연산을 누적해서 결과를 생성한다. Flux의 데이터를 이용해서 단일 값을 생성하는 범용 기능이라고 보면 된다. 첫 번째 살펴볼 reduce() 메서드 다음과 같다. 이 메서드는 Flux가 발생하는 데이터와 동일 타입으로 누적할 때 사용한다.

  • Mono<T> reduce(BiFunction<T, T, T> aggregator)

aggregator는 인자가 두 개인 함수이다. 이 함수의 첫 번째 인자는 지금까지 누적된 값을 받으며, 두 번째 인자는 누적할 데이터를 받는다. aggregator는 두 인자를 이용해서 새로운 누적 값을 리턴한다. 새 누적 값은 다음 데이터를 aggregator 함수로 누적할 때 첫 번째 인자로 사용된다.


예를 들어 간단한 곱셈 기능을 reduce()를 이용해서 다음과 같이 구현할 수 있다.


Mono<Integer> mulMono = Flux.just(1, 2, 3, 4).reduce((acc, ele) -> acc * ele);

mulMono.subscribe(sum -> System.out.println("sum : " + sum);


acc는 이전까지 누적된 값인데, 두 번째 데이터를 누적할 때 첫 번째 데이터를 누적된 값(acc)으로 사용한다. 위 코드는 다음과 같은 계산을 거쳐 최종 값으로 24를 출력한다.


acc1 = 1 // 첫 번째 값을 누적 값의 초기 값으로 사용

acc2 = aggregator(acc1, 2) // 1 * 2

acc3 = aggregator(acc2, 3) // 2 * 3

acc4 = aggregator(acc3, 4) // 6 * 4


누적 값의 초기 값을 지정하고 싶거나 데이터와 다른 타입으로 누적하고 싶다면 다음 reduce() 메서드를 사용한다.

  • Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator)
  • Mono<A> reduceWith(Supplier<A> initial, BiFunction<A, ? super T, A> accumulator)

reduce()의 initial은 초기 값이고, reduceWith()의 initial은 초기값을 제공하는 Supplier이다. 다음은 초기 값을 사용하는 reduce() 메서드의 사용예이다.


Mono<String> strMono = Flux.just(1, 2, 3, 4)

                                        .reduce("", (str, ele) -> str + "-" + ele.toString());

strMono.subscribe(System.out::println); // -1-2-3-4 출력


누적하면서 값 생성하기: scan()

데이터를 누적하면 중간 누적 결과를 데이터로 생성하고 싶다면 scan() 메서드를 사용한다. 최종 누적된 값 한 개만 발생하는 reduce()와 달리 scan()은 중간 결과를 포함한 여러 값을 생성하므로, scan()의 리턴 타입은 Flux이다. 다음은 같은 타입으로 누적한 결과를 발생하는 scan() 메서드이다.

  • Flux<T> scan(BiFunction<T, T, T> accumulator)

리턴 타입이 Flux인 것을 제외하면 reduce()와 동일하다. 


다음은 예제 코드이다.


Flux<Integer> seq = Flux.just(1, 2, 3, 4).scan((acc, x) -> acc * x);

seq.subscribe(System.out::println);


다음은 위 코드의 출력 결과이다. 중간 결과가 출력되는 것을 알 수 있다.


1

2

6

24


reduce()와 동일하게 누적 초기값을 갖는 메서드를 제공한다.

  • Flux<A> scan(A initial, BiFunction<A, ? super T, A> accumulator)
  • Flux<A> scanWith(Supplier<A> initial, BiFunction<A, ? super T, A> accumulator)

다음은 초기 값을 지정하는 사용 예이다.


Flux<Integer> seq = Flux.just(2, 3, 4).scan(1, (acc, x) -> acc * x);

seq.subscribe(System.out::println);


실행 결과는 다음과 같다.


1

2

6

24


결과를 보면 초기 값(1)도 시퀀스의 값으로 발생한 것을 알 수 있다.


데이터 조건 검사

모든/일부 데이터가 특정 조건을 충족하는지 검사할 때는 all()이나 any()를 사용한다.


Mono<Boolean> all = Flux.just(1, 2, 3, 4).all(x -> x > 2);

all.subscribe(b -> System.out.println("all: " + b)); // false


Mono<Boolean> any = Flux.just(1, 2, 3, 4).any(x -> x > 2);

any.subscribe(b -> System.out.println("any: " + b)); // true


데이터가 존재하는지 또는 특정 데이터를 포함하는지 검사할 때는 hasElements()나 hasElement()를 사용한다.


Mono<Boolean> hasElements = Flux.just(1, 2, 3, 4).hasElements();

hasElements.subscribe(b -> System.out.println("hasElements: " + b)); // true


Mono<Boolean> hasElement = Flux.just(1, 2, 3, 4).hasElement(3);

hasElement.subscribe(b -> System.out.println("hasElement: " + b)); // true


관련 글





+ Recent posts