주요글: 도커 시작하기
반응형

병렬(Parallel) 처리

시퀀스는 순차적으로 next 신호를 발생하고 Subscriber는 순차적으로 신호를 처리한다. 리액터는 시퀀스가 발생하는 next 신호를 병렬로 처리할 수 있는 방법을 제공한다. 이 글에서는 Flux의 parallel()을 사용하는 방법과 zip()을 이용한 방법에 대해 살펴본다.


parallel()과 runOn()으로 Flux 병렬 처리하기

Flux#parallel()과 runOn()을 사용하면 Flux가 생성하는 next 신호를 병렬로 처리할 수 있다. 다음 예를 보자.


Flux.range(1, 20)

        .parallel(2) // 작업을 레일로 나누기만 함

        .runOn(Schedulers.newParallel("PAR", 2))  // 각 레일을 병렬로 실행

        .map(x -> {

            int sleepTime = nextSleepTime(x % 2 == 0 ? 50 : 100, x % 2 == 0 ? 150 : 300);

            logger.info("map1 {}, sleepTime {}", x, sleepTime);

            sleep(sleepTime);

            return String.format("%02d", x);

        })

        .subscribe(i -> logger.info("next {}", i) );


// nextSleepTime은 인자로 받은 두 정수 값 범위에 해당하는 임의의 값을 생성한다고 가정


Flux#parallel(int parallelism) 메서드는 Flux가 생성하는 next 신호를 parallelism 개수만큼 라운드 로빈 방식으로 신호를 나눈다. 분리한 신호는 일종의 신호를 전달할 레일(rail)을 구성한다. 위 코드는 2를 값으로 주었으므로 2개의 레일을 생성한다. 라운드 로빈 방식을 사용해서 각 레일에 값을 전달하므로 위 코드는 [1, 3, 5, .., 19]를 제공하는 레일과 [2, 4, 6, ..., 20]를 제공하는 레일을 생성한다.


parallel()로 여러 레일을 만든다고 해서 병렬로 신호를 처리하는 것은 아니다. parallel()은 병렬로 신호를 처리할 수 있는 ParallelFlux를 리턴하는데, ParallelFlux의 runOn() 메서드에 다중 쓰레드를 사용하는 스케줄러를 전달해야 병렬로 신호를 처리할 수 있다. 위 코드는 2개 쓰레드를 사용하는 parallel 스케줄러를 전달했으므로 동시에 2개 레일로부터 오는 신호를 처리하게 된다.


병렬로 처리되는 것을 확인하기 위해 map() 메서드는 값이 짝수인 경우 50~150 밀리초, 홀수인 경우 100~300 밀리초 동안 슬립하고 문자열로 변환한 값을 리턴하도록 구현했다. parallel()은 라운드 로빈 방식으로 레일을 나누므로 짝수 레일과 홀수 레일이 생성되므로 슬립 타임 구간이 작은 짝수 레일이 더 빨리 끝나게 된다.


실제 결과를 확인해보자.


13:45:14.272 [PAR-1] INFO parallel.ParallelTest - map1 1, sleepTime 117

13:45:14.272 [PAR-2] INFO parallel.ParallelTest - map1 2, sleepTime 96

13:45:14.378 [PAR-2] INFO parallel.ParallelTest - next 02

13:45:14.378 [PAR-2] INFO parallel.ParallelTest - map1 4, sleepTime 98

13:45:14.399 [PAR-1] INFO parallel.ParallelTest - next 01

13:45:14.399 [PAR-1] INFO parallel.ParallelTest - map1 3, sleepTime 268

13:45:14.477 [PAR-2] INFO parallel.ParallelTest - next 04

13:45:14.477 [PAR-2] INFO parallel.ParallelTest - map1 6, sleepTime 93

13:45:14.570 [PAR-2] INFO parallel.ParallelTest - next 06

...생략

13:45:14.868 [PAR-2] INFO parallel.ParallelTest - map1 16, sleepTime 50

13:45:14.905 [PAR-1] INFO parallel.ParallelTest - next 05

13:45:14.905 [PAR-1] INFO parallel.ParallelTest - map1 7, sleepTime 201

13:45:14.918 [PAR-2] INFO parallel.ParallelTest - next 16

13:45:14.918 [PAR-2] INFO parallel.ParallelTest - map1 18, sleepTime 122

13:45:15.040 [PAR-2] INFO parallel.ParallelTest - next 18

13:45:15.040 [PAR-2] INFO parallel.ParallelTest - map1 20, sleepTime 62

13:45:15.102 [PAR-2] INFO parallel.ParallelTest - next 20

13:45:15.106 [PAR-1] INFO parallel.ParallelTest - next 07

13:45:15.106 [PAR-1] INFO parallel.ParallelTest - map1 9, sleepTime 202

13:45:15.308 [PAR-1] INFO parallel.ParallelTest - next 09

13:45:15.308 [PAR-1] INFO parallel.ParallelTest - map1 11, sleepTime 131

13:45:15.439 [PAR-1] INFO parallel.ParallelTest - next 11

13:45:15.439 [PAR-1] INFO parallel.ParallelTest - map1 13, sleepTime 289

13:45:15.728 [PAR-1] INFO parallel.ParallelTest - next 13

13:45:15.728 [PAR-1] INFO parallel.ParallelTest - map1 15, sleepTime 288

13:45:16.017 [PAR-1] INFO parallel.ParallelTest - next 15

13:45:16.017 [PAR-1] INFO parallel.ParallelTest - map1 17, sleepTime 156

13:45:16.173 [PAR-1] INFO parallel.ParallelTest - next 17

13:45:16.173 [PAR-1] INFO parallel.ParallelTest - map1 19, sleepTime 247

13:45:16.420 [PAR-1] INFO parallel.ParallelTest - next 19


실행 결과를 보면 PAR-1 쓰레드는 홀수를 PAR-2는 짝수를 처리하는 것을 알 수 있다. 즉 쓰레드마다 한 레일을 처리하고 있다. 짝수인 경우 슬립 타임을 더 작은 범위로 주었으므로 짝수 레일을 처리한 PAR-2가 먼저 레일을 처리하고 있다.

아래와 같이 레일은 4개로 나누었는데 쓰레드가 2개인 병렬 스케줄러를 사용하면 어떻게 될까?

Flux.range(1, 20)
        .parallel(4)
        .runOn(Schedulers.newParallel("PAR", 2))
        .map(x -> {
            ...
        })
        .subscribe(i -> logger.info("next {}", i) );


이 경우 스케줄러는 2개의 레일을 먼저 처리한다. 한 레일에 남아 있는 데이터가 없으면 데이터가 남아 있는 다른 레일을 처리한다.


레일당 크기

ParallelFlux#runOn() 메서드는 기본적으로 한 레일 당 Queues.SMALL_BUFFER_SIZE 만큼의 데이터를 저장한다. (이 값은 reactor.bufferSize.small 시스템 프로퍼티 값을 사용하는데 이 값을 지정하지 않으면 256을 사용하고 이 값이 16보다 작으면 16을 사용한다.)


레일에 미리 채울 데이터 개수를 변경하려면 다음과 같이 runOn() 메서드의 두 번째 인자로 값을 주면 된다. 다음 코드는 레일에 미리 채울 값(prefetch)으로 2를 사용한 예이다.


Flux.range(1, 20)

        .parallel(4)

        .runOn(Schedulers.newParallel("PAR", 2), 2) // 레일에 미리 채울 값으로 2 사용

        .subscribe(x -> logger.info("next {}", x));


위 코드의 경우 최초에 각 레일에 다음과 같이 데이터가 채워진다.

레일0: 1, 5
레일1: 2, 6
레일2: 3, 7
레일3: 4, 8


스케줄러는 2개의 쓰레드를 사용하는데 두 쓰레드를 PAR-1, PAR-2라고 하자. 이 두 쓰레드가 처음에 각각 레일0과 레일1을 선택했다고 하자.


레일0: 1, 5 (PAR-1)
레일1: 2, 6 (PAR-2)
레일2: 3, 7
레일3: 4, 8

두 쓰레드가 레일의 데이터를 처리하면 상태는 다음과 같이 바뀐다.


레일0: (PAR-1)
레일1: (PAR-2)
레일2: 3, 7
레일3: 4, 8

이 상태에서 PAR-2가 레일1이 비어있는지 여부를 검사한다면 레일이 비워져 있으므로 다음 레일을 선택한다. 이때 레일3을 선택했다고 하자. 그리고 PAR-1이 레일0이 비어있는지 여부를 검사하기 전에 레일0과 레일1이 채워졌다고 하자. 그럼 상태는 다음과 같이 바뀐다.


레일0: 9 (PAR-1)
레일1: 10
레일2: 3, 7 
레일3: 4, 8 (PAR-2)

그러면 PAR-2는 4를 처리하고 PAR-1은 9를 처리한다. PAR-1이 9를 처리하는 동안에 레일0에 데이터가 채워지지 않았다면 다음 레일을 선택하는데 이때 레일1을 선택할 수 있다.

레일0: 11,
레일1: 10 (PAR-1)
레일2: 3, 7 
레일3: 8 (PAR-2)

이렇게 병렬 스케줄러의 쓰레드 개수가 레일 개수보다 작으면 그때 그때 레일의 데이터 개수에 따라 스케줄러가 선택하는 레일이 달라지게 된다.

Mono.zip()으로 병렬 처리하기

각 Mono의 구독 처리 쓰레드를 병렬 스케줄러로 실행하고 Mono.zip() 메서드를 이용해서 Mono를 묶으면 각 Mono를 병렬로 처리할 수 있다. 다음은 예제 코드이다.


Mono m1 = Mono.just(1).map(x -> {

    logger.info("1 sleep");

    sleep(1500);

    return x;

}).subscribeOn(Schedulers.parallel());


Mono m2 = Mono.just(2).map(x -> {

    logger.info("2 sleep");

    sleep(3000);

    return x;

}).subscribeOn(Schedulers.parallel());


Mono m3 = Mono.just(3).map(x -> {

    logger.info("3 sleep");

    sleep(2000);

    return x;

}).subscribeOn(Schedulers.parallel());


logger.info("Mono.zip(m1, m2, m3)");


Mono.zip(m1, m2, m3)

        .subscribe(tup -> logger.info("next: {}", tup);


위 코드에서 m1, m2, m3는 각각 1.5초, 3초, 2초간 슬립한다. 각각은 subscribeOn()을 이용해서 Parallel 스케줄러를 이용해서 구독 요청을 처리하도록 했다. 그리고 Mono.zip()으로 m1, m2, m3를 묶었다.


실제 실행 결과를 보면 m1, m2, m3가 슬립을 동시에 시작하고 약 3초 뒤에 세 Mono의 값을 묶은 Tuple3의 값을 출력하는 것을 알 수 있다. 이를 통해 m1, m2, m3를 동시에 실행했음을 확인할 수 있다.


16:12:34.424 [main] INFO parallel.ParallelTest - Mono.zip(m1, m2, m3)

16:12:34.447 [parallel-1] INFO parallel.ParallelTest - 1 sleep

16:12:34.447 [parallel-3] INFO parallel.ParallelTest - 3 sleep

16:12:34.447 [parallel-2] INFO parallel.ParallelTest - 2 sleep

16:12:37.469 [parallel-2] INFO parallel.ParallelTest - next: [1,2,3]



관련 글


+ Recent posts