병렬(Parallel) 처리
시퀀스는 순차적으로 next 신호를 발생하고 Subscriber는 순차적으로 신호를 처리한다. 리액터는 시퀀스가 발생하는 next 신호를 병렬로 처리할 수 있는 방법을 제공한다. 이 글에서는 Flux의 parallel()을 사용하는 방법과 zip()을 이용한 방법에 대해 살펴본다.
parallel()과 runOn()으로 Flux 병렬 처리하기
Flux#parallel()과 runOn()을 사용하면 Flux가 생성하는 next 신호를 병렬로 처리할 수 있다. 다음 예를 보자.
Flux.range(1, 20)
.parallel(2) // 작업을 레일로 나누기만 함
.runOn(Schedulers.newParallel("PAR", 2)) // 각 레일을 병렬로 실행
.map(x -> {
int sleepTime = nextSleepTime(x % 2 == 0 ? 50 : 100, x % 2 == 0 ? 150 : 300);
logger.info("map1 {}, sleepTime {}", x, sleepTime);
sleep(sleepTime);
return String.format("%02d", x);
})
.subscribe(i -> logger.info("next {}", i) );
// nextSleepTime은 인자로 받은 두 정수 값 범위에 해당하는 임의의 값을 생성한다고 가정
Flux#parallel(int parallelism) 메서드는 Flux가 생성하는 next 신호를 parallelism 개수만큼 라운드 로빈 방식으로 신호를 나눈다. 분리한 신호는 일종의 신호를 전달할 레일(rail)을 구성한다. 위 코드는 2를 값으로 주었으므로 2개의 레일을 생성한다. 라운드 로빈 방식을 사용해서 각 레일에 값을 전달하므로 위 코드는 [1, 3, 5, .., 19]를 제공하는 레일과 [2, 4, 6, ..., 20]를 제공하는 레일을 생성한다.
parallel()로 여러 레일을 만든다고 해서 병렬로 신호를 처리하는 것은 아니다. parallel()은 병렬로 신호를 처리할 수 있는 ParallelFlux를 리턴하는데, ParallelFlux의 runOn() 메서드에 다중 쓰레드를 사용하는 스케줄러를 전달해야 병렬로 신호를 처리할 수 있다. 위 코드는 2개 쓰레드를 사용하는 parallel 스케줄러를 전달했으므로 동시에 2개 레일로부터 오는 신호를 처리하게 된다.
병렬로 처리되는 것을 확인하기 위해 map() 메서드는 값이 짝수인 경우 50~150 밀리초, 홀수인 경우 100~300 밀리초 동안 슬립하고 문자열로 변환한 값을 리턴하도록 구현했다. parallel()은 라운드 로빈 방식으로 레일을 나누므로 짝수 레일과 홀수 레일이 생성되므로 슬립 타임 구간이 작은 짝수 레일이 더 빨리 끝나게 된다.
실제 결과를 확인해보자.
13:45:14.272 [PAR-1] INFO parallel.ParallelTest - map1 1, sleepTime 117
13:45:14.272 [PAR-2] INFO parallel.ParallelTest - map1 2, sleepTime 96
13:45:14.378 [PAR-2] INFO parallel.ParallelTest - next 02
13:45:14.378 [PAR-2] INFO parallel.ParallelTest - map1 4, sleepTime 98
13:45:14.399 [PAR-1] INFO parallel.ParallelTest - next 01
13:45:14.399 [PAR-1] INFO parallel.ParallelTest - map1 3, sleepTime 268
13:45:14.477 [PAR-2] INFO parallel.ParallelTest - next 04
13:45:14.477 [PAR-2] INFO parallel.ParallelTest - map1 6, sleepTime 93
13:45:14.570 [PAR-2] INFO parallel.ParallelTest - next 06
...생략
13:45:14.868 [PAR-2] INFO parallel.ParallelTest - map1 16, sleepTime 50
13:45:14.905 [PAR-1] INFO parallel.ParallelTest - next 05
13:45:14.905 [PAR-1] INFO parallel.ParallelTest - map1 7, sleepTime 201
13:45:14.918 [PAR-2] INFO parallel.ParallelTest - next 16
13:45:14.918 [PAR-2] INFO parallel.ParallelTest - map1 18, sleepTime 122
13:45:15.040 [PAR-2] INFO parallel.ParallelTest - next 18
13:45:15.040 [PAR-2] INFO parallel.ParallelTest - map1 20, sleepTime 62
13:45:15.102 [PAR-2] INFO parallel.ParallelTest - next 20
13:45:15.106 [PAR-1] INFO parallel.ParallelTest - next 07
13:45:15.106 [PAR-1] INFO parallel.ParallelTest - map1 9, sleepTime 202
13:45:15.308 [PAR-1] INFO parallel.ParallelTest - next 09
13:45:15.308 [PAR-1] INFO parallel.ParallelTest - map1 11, sleepTime 131
13:45:15.439 [PAR-1] INFO parallel.ParallelTest - next 11
13:45:15.439 [PAR-1] INFO parallel.ParallelTest - map1 13, sleepTime 289
13:45:15.728 [PAR-1] INFO parallel.ParallelTest - next 13
13:45:15.728 [PAR-1] INFO parallel.ParallelTest - map1 15, sleepTime 288
13:45:16.017 [PAR-1] INFO parallel.ParallelTest - next 15
13:45:16.017 [PAR-1] INFO parallel.ParallelTest - map1 17, sleepTime 156
13:45:16.173 [PAR-1] INFO parallel.ParallelTest - next 17
13:45:16.173 [PAR-1] INFO parallel.ParallelTest - map1 19, sleepTime 247
13:45:16.420 [PAR-1] INFO parallel.ParallelTest - next 19
이 경우 스케줄러는 2개의 레일을 먼저 처리한다. 한 레일에 남아 있는 데이터가 없으면 데이터가 남아 있는 다른 레일을 처리한다.
레일당 크기
ParallelFlux#runOn() 메서드는 기본적으로 한 레일 당 Queues.SMALL_BUFFER_SIZE 만큼의 데이터를 저장한다. (이 값은 reactor.bufferSize.small 시스템 프로퍼티 값을 사용하는데 이 값을 지정하지 않으면 256을 사용하고 이 값이 16보다 작으면 16을 사용한다.)
레일에 미리 채울 데이터 개수를 변경하려면 다음과 같이 runOn() 메서드의 두 번째 인자로 값을 주면 된다. 다음 코드는 레일에 미리 채울 값(prefetch)으로 2를 사용한 예이다.
Flux.range(1, 20)
.parallel(4)
.runOn(Schedulers.newParallel("PAR", 2), 2) // 레일에 미리 채울 값으로 2 사용
.subscribe(x -> logger.info("next {}", x));
스케줄러는 2개의 쓰레드를 사용하는데 두 쓰레드를 PAR-1, PAR-2라고 하자. 이 두 쓰레드가 처음에 각각 레일0과 레일1을 선택했다고 하자.
두 쓰레드가 레일의 데이터를 처리하면 상태는 다음과 같이 바뀐다.
Mono.zip()으로 병렬 처리하기
각 Mono의 구독 처리 쓰레드를 병렬 스케줄러로 실행하고 Mono.zip() 메서드를 이용해서 Mono를 묶으면 각 Mono를 병렬로 처리할 수 있다. 다음은 예제 코드이다.
Mono m1 = Mono.just(1).map(x -> {
logger.info("1 sleep");
sleep(1500);
return x;
}).subscribeOn(Schedulers.parallel());
Mono m2 = Mono.just(2).map(x -> {
logger.info("2 sleep");
sleep(3000);
return x;
}).subscribeOn(Schedulers.parallel());
Mono m3 = Mono.just(3).map(x -> {
logger.info("3 sleep");
sleep(2000);
return x;
}).subscribeOn(Schedulers.parallel());
logger.info("Mono.zip(m1, m2, m3)");
Mono.zip(m1, m2, m3)
.subscribe(tup -> logger.info("next: {}", tup);
위 코드에서 m1, m2, m3는 각각 1.5초, 3초, 2초간 슬립한다. 각각은 subscribeOn()을 이용해서 Parallel 스케줄러를 이용해서 구독 요청을 처리하도록 했다. 그리고 Mono.zip()으로 m1, m2, m3를 묶었다.
실제 실행 결과를 보면 m1, m2, m3가 슬립을 동시에 시작하고 약 3초 뒤에 세 Mono의 값을 묶은 Tuple3의 값을 출력하는 것을 알 수 있다. 이를 통해 m1, m2, m3를 동시에 실행했음을 확인할 수 있다.
16:12:34.424 [main] INFO parallel.ParallelTest - Mono.zip(m1, m2, m3)
16:12:34.447 [parallel-1] INFO parallel.ParallelTest - 1 sleep
16:12:34.447 [parallel-3] INFO parallel.ParallelTest - 3 sleep
16:12:34.447 [parallel-2] INFO parallel.ParallelTest - 2 sleep
16:12:37.469 [parallel-2] INFO parallel.ParallelTest - next: [1,2,3]
관련 글