리액터 윈도우
일정 개수로 묶어서 Flux 만들기: window(int), window(int, int)
Flux#window(int) 메서드를 사용하면 시퀀스가 발생시키는 데이터를 일정 개수로 묶을 수 있다. 다음은 예제 코드이다.
Flux<Flux<Integer>> windowSeq =
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.window(4); // 4개 간격으로 4개씩 새로운 Flux로 묶음
windowSeq.subscribe(seq -> { // seq는 Flux<Integer>
Mono<List<Integer>> monoList = seq.collectList();
monoList.subscribe(list -> logger.info("window: {}", list));
});
위 코드에서 Flux#window(4)가 리턴하는 타입은 Flux<Flux<Integer>>이다. 즉 값이 Flux<Integer>인 Flux를 리턴한다. 이 시퀀스(Flux<Integer>)가 발생하는 값의 개수는 최대 4개이다. 위 코드의 실행 결과는 다음과 같다. 결과를 보면 4개씩 데이터를 묶어서 하나의 Flux로 만든 것을 알 수 있다.
01:19:52.388 [parallel-2] INFO batch.WindowTest - window: [5, 6, 7, 8]
01:19:52.388 [parallel-1] INFO batch.WindowTest - window: [1, 2, 3, 4]
01:19:52.391 [parallel-1] INFO batch.WindowTest - window: [9, 10]
Flux.window(int maxSize, int skip) 메서드를 사용하면 어느 간격으로 데이터를 묶을지 정할 수 있다. 두 번째 파라미터는 몇 개씩 건너서 데이터를 묶을 지 결정한다. 예를 들어 다음 코드를 보자.
Flux<Flux<Integer>> windowSeq =
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.window(4, 3); // 3개 간격마다 4개씩 새로운 Flux로 묶음
windowSeq.subscribe(seq -> { // seq는 Flux<Integer>
Mono<List<Integer>> monoList = seq.collectList();
monoList.subscribe(list -> logger.info("window: {}", list));
});
위 코드는 두 번째 인자로 3을 주었다. 이 경우 3개 데이터 간격으로 4개씩 데이터를 묶는다. 데이터를 묶는 간격이 데이터를 묶는 개수보다 작으므로 일부 데이터에 중복이 발생한다.
15:18:37.898 [main] INFO batch.WindowTest - window: [1, 2, 3, 4]
15:18:37.898 [main] INFO batch.WindowTest - window: [4, 5, 6, 7]
15:18:37.898 [main] INFO batch.WindowTest - window: [7, 8, 9, 10]
15:18:37.898 [main] INFO batch.WindowTest - window: [10]
다음과 같이 skip 파라미터 값으로 5를 주면 어떻게 될까? 데이터를 묶는 개수보다 간격이 더 크므로 일부 데이터에 누락이 발생할 것이다.
Flux<Flux<Integer>> windowSeq2 =
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.window(4, 5); // 5개 간격마다 4개씩 새로운 Flux 묶음
일정 시간 간격으로 묶어서 Flux 만들기: window(Duration), window(Duration, Duration)
Flux#window(Duration) 메서드를 사용하면 시퀀스가 발생시키는 데이터를 일정 시간마다 묶을 수 있다. 다음은 예제 코드이다.
Flux<Flux<Long>> windowSeq = Flux.interval(Duration.ofMillis(100))
.window(Duration.ofMillis(500)); // 500밀리초 간격마다 500밀리초 동안 데이터 묶음
이 코드는 500밀리초(0.5초) 동안 발생한 데이터를 묶는다.
데이터를 묶기 시작하는 간격을 지정하고 싶다면 Flux#window(Duration, Duration) 메서드를 사용한다.
Flux<Flux<Long>> windowSeq = Flux.interval(Duration.ofMillis(100))
// 400밀리초 간격마다 500밀리초 동안 데이터 묶음
.window(Duration.ofMillis(500), Duration.ofMillis(400));
특정 조건에 다다를 때가지 묶어서 Flux 만들기: windowUntil(Predicate)
특정 조건을 충족하는 데이터를 만날 때까지 묶어서 Flux로 만들고 싶다면 windowUntil()을 사용한다. 다음은 사용 예이다.
Flux.just(1,1,2,3,3,4,5)
.windowUntil(x -> x % 2 == 0)
.subscribe((Flux<Integer> seq) -> {
seq.collectList().subscribe(lst -> logger.info("window: {}", lst));
});
위 코드는 2로 나눠서 나머지가 0인(즉 짝수인) 값을 만날 때까지 묶는다. 실제 실행 결과를 보면 다음과 같다.
01:19:27.166 [main] INFO batch.WindowTest - window: [1, 1, 2]
01:19:27.169 [main] INFO batch.WindowTest - window: [3, 3, 4]
01:19:27.169 [main] INFO batch.WindowTest - window: [5]
다음과 같이 마지막 데이터가 조건에 해당하면 어떻게 될까?
Flux.just(1,1,2,3,3,4)
.windowUntil(x -> x % 2 == 0)
.subscribe(seq -> {
seq.collectList().subscribe(lst -> logger.info("window: {}", lst));
});
결과를 보면 다음과 같이 마지막에 빈 Flux가 하나 더 발생되는 것을 알 수 있다.
17:23:22.724 [main] INFO batch.WindowTest - window: [1, 1, 2]
17:23:22.727 [main] INFO batch.WindowTest - window: [3, 3, 4]
17:23:22.727 [main] INFO batch.WindowTest - window: []
특정 조건을 충족하는 동안 묶어서 Flux 만들기: windowWhile(Predicate)
Flux#windowWhile(Predicate)은 해당 조건을 충족하지 않는 데이터가 나올 때까지 묶어서 Flux를 만든다. 조건을 충족하지 않는 데이터로 시작하거나 연속해서 데이터가 조건을 충족하지 않으면 빈 윈도우를 생성한다.
Flux.just(1,1,2,4,3,3,4,6,8,9,10)
.windowWhile(x -> x % 2 == 0) // 짝수인 동안
.subscribe(seq -> {
seq.collectList().subscribe(lst -> logger.info("window: {}", lst));
});
이 코드의 결과는 다음과 같다.
01:07:00.239 [main] INFO batch.WindowTest - window: []
01:07:00.242 [main] INFO batch.WindowTest - window: []
01:07:00.242 [main] INFO batch.WindowTest - window: [2, 4]
01:07:00.242 [main] INFO batch.WindowTest - window: []
01:07:00.242 [main] INFO batch.WindowTest - window: [4, 6, 8]
01:07:00.242 [main] INFO batch.WindowTest - window: [10]
Flux 대신 List로 묶기: buffer류 메서드
window류 메서드가 Flux로 묶는다면 buffer류 메서드는 Collection으로 묶는다. 메서드 이름이 window에서 buffer로 바뀔뿐 시그너쳐는 동일하다. 다음은 buffer류 메서드의 사용 예이다.
Flux<List<Integer>> bufferSeq = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).buffer(4);
bufferSeq.subscribe(list -> logger.info("window: {}", list));
관련글