저작권 안내: 저작권자표시 Yes 상업적이용 No 컨텐츠변경 No

스프링5 입문

JSP 2.3

JPA 입문

DDD Start

인프런 객체 지향 입문 강의

병렬(Parallel) 처리

시퀀스는 순차적으로 next 신호를 발생하고 Subscriber는 순차적으로 신호를 처리한다. 리액터는 시퀀스가 발생하는 next 신호를 병렬로 처리할 수 있는 방법을 제공한다. 이 글에서는 Flux의 parallel()을 사용하는 방법과 zip()을 이용한 방법에 대해 살펴본다.


parallel()과 runOn()으로 Flux 병렬 처리하기

Flux#parallel()과 runOn()을 사용하면 Flux가 생성하는 next 신호를 병렬로 처리할 수 있다. 다음 예를 보자.


Flux.range(1, 20)

        .parallel(2) // 작업을 레일로 나누기만 함

        .runOn(Schedulers.newParallel("PAR", 2))  // 각 레일을 병렬로 실행

        .map(x -> {

            int sleepTime = nextSleepTime(x % 2 == 0 ? 50 : 100, x % 2 == 0 ? 150 : 300);

            logger.info("map1 {}, sleepTime {}", x, sleepTime);

            sleep(sleepTime);

            return String.format("%02d", x);

        })

        .subscribe(i -> logger.info("next {}", i) );


// nextSleepTime은 인자로 받은 두 정수 값 범위에 해당하는 임의의 값을 생성한다고 가정


Flux#parallel(int parallelism) 메서드는 Flux가 생성하는 next 신호를 parallelism 개수만큼 라운드 로빈 방식으로 신호를 나눈다. 분리한 신호는 일종의 신호를 전달할 레일(rail)을 구성한다. 위 코드는 2를 값으로 주었으므로 2개의 레일을 생성한다. 라운드 로빈 방식을 사용해서 각 레일에 값을 전달하므로 위 코드는 [1, 3, 5, .., 19]를 제공하는 레일과 [2, 4, 6, ..., 20]를 제공하는 레일을 생성한다.


parallel()로 여러 레일을 만든다고 해서 병렬로 신호를 처리하는 것은 아니다. parallel()은 병렬로 신호를 처리할 수 있는 ParallelFlux를 리턴하는데, ParallelFlux의 runOn() 메서드에 다중 쓰레드를 사용하는 스케줄러를 전달해야 병렬로 신호를 처리할 수 있다. 위 코드는 2개 쓰레드를 사용하는 parallel 스케줄러를 전달했으므로 동시에 2개 레일로부터 오는 신호를 처리하게 된다.


병렬로 처리되는 것을 확인하기 위해 map() 메서드는 값이 짝수인 경우 50~150 밀리초, 홀수인 경우 100~300 밀리초 동안 슬립하고 문자열로 변환한 값을 리턴하도록 구현했다. parallel()은 라운드 로빈 방식으로 레일을 나누므로 짝수 레일과 홀수 레일이 생성되므로 슬립 타임 구간이 작은 짝수 레일이 더 빨리 끝나게 된다.


실제 결과를 확인해보자.


13:45:14.272 [PAR-1] INFO parallel.ParallelTest - map1 1, sleepTime 117

13:45:14.272 [PAR-2] INFO parallel.ParallelTest - map1 2, sleepTime 96

13:45:14.378 [PAR-2] INFO parallel.ParallelTest - next 02

13:45:14.378 [PAR-2] INFO parallel.ParallelTest - map1 4, sleepTime 98

13:45:14.399 [PAR-1] INFO parallel.ParallelTest - next 01

13:45:14.399 [PAR-1] INFO parallel.ParallelTest - map1 3, sleepTime 268

13:45:14.477 [PAR-2] INFO parallel.ParallelTest - next 04

13:45:14.477 [PAR-2] INFO parallel.ParallelTest - map1 6, sleepTime 93

13:45:14.570 [PAR-2] INFO parallel.ParallelTest - next 06

...생략

13:45:14.868 [PAR-2] INFO parallel.ParallelTest - map1 16, sleepTime 50

13:45:14.905 [PAR-1] INFO parallel.ParallelTest - next 05

13:45:14.905 [PAR-1] INFO parallel.ParallelTest - map1 7, sleepTime 201

13:45:14.918 [PAR-2] INFO parallel.ParallelTest - next 16

13:45:14.918 [PAR-2] INFO parallel.ParallelTest - map1 18, sleepTime 122

13:45:15.040 [PAR-2] INFO parallel.ParallelTest - next 18

13:45:15.040 [PAR-2] INFO parallel.ParallelTest - map1 20, sleepTime 62

13:45:15.102 [PAR-2] INFO parallel.ParallelTest - next 20

13:45:15.106 [PAR-1] INFO parallel.ParallelTest - next 07

13:45:15.106 [PAR-1] INFO parallel.ParallelTest - map1 9, sleepTime 202

13:45:15.308 [PAR-1] INFO parallel.ParallelTest - next 09

13:45:15.308 [PAR-1] INFO parallel.ParallelTest - map1 11, sleepTime 131

13:45:15.439 [PAR-1] INFO parallel.ParallelTest - next 11

13:45:15.439 [PAR-1] INFO parallel.ParallelTest - map1 13, sleepTime 289

13:45:15.728 [PAR-1] INFO parallel.ParallelTest - next 13

13:45:15.728 [PAR-1] INFO parallel.ParallelTest - map1 15, sleepTime 288

13:45:16.017 [PAR-1] INFO parallel.ParallelTest - next 15

13:45:16.017 [PAR-1] INFO parallel.ParallelTest - map1 17, sleepTime 156

13:45:16.173 [PAR-1] INFO parallel.ParallelTest - next 17

13:45:16.173 [PAR-1] INFO parallel.ParallelTest - map1 19, sleepTime 247

13:45:16.420 [PAR-1] INFO parallel.ParallelTest - next 19


실행 결과를 보면 PAR-1 쓰레드는 홀수를 PAR-2는 짝수를 처리하는 것을 알 수 있다. 즉 쓰레드마다 한 레일을 처리하고 있다. 짝수인 경우 슬립 타임을 더 작은 범위로 주었으므로 짝수 레일을 처리한 PAR-2가 먼저 레일을 처리하고 있다.

아래와 같이 레일은 4개로 나누었는데 쓰레드가 2개인 병렬 스케줄러를 사용하면 어떻게 될까?

Flux.range(1, 20)
        .parallel(4)
        .runOn(Schedulers.newParallel("PAR", 2))
        .map(x -> {
            ...
        })
        .subscribe(i -> logger.info("next {}", i) );


이 경우 스케줄러는 2개의 레일을 먼저 처리한다. 한 레일에 남아 있는 데이터가 없으면 데이터가 남아 있는 다른 레일을 처리한다.


레일당 크기

ParallelFlux#runOn() 메서드는 기본적으로 한 레일 당 Queues.SMALL_BUFFER_SIZE 만큼의 데이터를 저장한다. (이 값은 reactor.bufferSize.small 시스템 프로퍼티 값을 사용하는데 이 값을 지정하지 않으면 256을 사용하고 이 값이 16보다 작으면 16을 사용한다.)


레일에 미리 채울 데이터 개수를 변경하려면 다음과 같이 runOn() 메서드의 두 번째 인자로 값을 주면 된다. 다음 코드는 레일에 미리 채울 값(prefetch)으로 2를 사용한 예이다.


Flux.range(1, 20)

        .parallel(4)

        .runOn(Schedulers.newParallel("PAR", 2), 2) // 레일에 미리 채울 값으로 2 사용

        .subscribe(x -> logger.info("next {}", x));


위 코드의 경우 최초에 각 레일에 다음과 같이 데이터가 채워진다.

레일0: 1, 5
레일1: 2, 6
레일2: 3, 7
레일3: 4, 8


스케줄러는 2개의 쓰레드를 사용하는데 두 쓰레드를 PAR-1, PAR-2라고 하자. 이 두 쓰레드가 처음에 각각 레일0과 레일1을 선택했다고 하자.


레일0: 1, 5 (PAR-1)
레일1: 2, 6 (PAR-2)
레일2: 3, 7
레일3: 4, 8

두 쓰레드가 레일의 데이터를 처리하면 상태는 다음과 같이 바뀐다.


레일0: (PAR-1)
레일1: (PAR-2)
레일2: 3, 7
레일3: 4, 8

이 상태에서 PAR-2가 레일1이 비어있는지 여부를 검사한다면 레일이 비워져 있으므로 다음 레일을 선택한다. 이때 레일3을 선택했다고 하자. 그리고 PAR-1이 레일0이 비어있는지 여부를 검사하기 전에 레일0과 레일1이 채워졌다고 하자. 그럼 상태는 다음과 같이 바뀐다.


레일0: 9 (PAR-1)
레일1: 10
레일2: 3, 7 
레일3: 4, 8 (PAR-2)

그러면 PAR-2는 4를 처리하고 PAR-1은 9를 처리한다. PAR-1이 9를 처리하는 동안에 레일0에 데이터가 채워지지 않았다면 다음 레일을 선택하는데 이때 레일1을 선택할 수 있다.

레일0: 11,
레일1: 10 (PAR-1)
레일2: 3, 7 
레일3: 8 (PAR-2)

이렇게 병렬 스케줄러의 쓰레드 개수가 레일 개수보다 작으면 그때 그때 레일의 데이터 개수에 따라 스케줄러가 선택하는 레일이 달라지게 된다.

Mono.zip()으로 병렬 처리하기

각 Mono의 구독 처리 쓰레드를 병렬 스케줄러로 실행하고 Mono.zip() 메서드를 이용해서 Mono를 묶으면 각 Mono를 병렬로 처리할 수 있다. 다음은 예제 코드이다.


Mono m1 = Mono.just(1).map(x -> {

    logger.info("1 sleep");

    sleep(1500);

    return x;

}).subscribeOn(Schedulers.parallel());


Mono m2 = Mono.just(2).map(x -> {

    logger.info("2 sleep");

    sleep(3000);

    return x;

}).subscribeOn(Schedulers.parallel());


Mono m3 = Mono.just(3).map(x -> {

    logger.info("3 sleep");

    sleep(2000);

    return x;

}).subscribeOn(Schedulers.parallel());


logger.info("Mono.zip(m1, m2, m3)");


Mono.zip(m1, m2, m3)

        .subscribe(tup -> logger.info("next: {}", tup);


위 코드에서 m1, m2, m3는 각각 1.5초, 3초, 2초간 슬립한다. 각각은 subscribeOn()을 이용해서 Parallel 스케줄러를 이용해서 구독 요청을 처리하도록 했다. 그리고 Mono.zip()으로 m1, m2, m3를 묶었다.


실제 실행 결과를 보면 m1, m2, m3가 슬립을 동시에 시작하고 약 3초 뒤에 세 Mono의 값을 묶은 Tuple3의 값을 출력하는 것을 알 수 있다. 이를 통해 m1, m2, m3를 동시에 실행했음을 확인할 수 있다.


16:12:34.424 [main] INFO parallel.ParallelTest - Mono.zip(m1, m2, m3)

16:12:34.447 [parallel-1] INFO parallel.ParallelTest - 1 sleep

16:12:34.447 [parallel-3] INFO parallel.ParallelTest - 3 sleep

16:12:34.447 [parallel-2] INFO parallel.ParallelTest - 2 sleep

16:12:37.469 [parallel-2] INFO parallel.ParallelTest - next: [1,2,3]



관련 글


Posted by 최범균 madvirus

댓글을 달아 주세요

스톰을 사용하는 이유 중 하나를 꼽아보자면, 병렬 처리를 들 수 있을 것 같다. 다중 노드에서 병렬로 연산을 함으로써 대량의 실시간 스트림 데이터를 처리하기 위해 만든 것이 스톰임을 생각해보면, 병렬 처리는 스톰의 주된 사용 이유일 것이다.


스톰의 기반 API의 경우, 아래 코드처럼 태스크의 개수와 쓰레드 개수 등을 설정해서 몇 개의 볼트를 동시에 실행할지 결정했다.


builder.setBolt("word-normalizer", new WordNormalizer(), 4) // 4개의 쓰레드

        .setNumTasks(8) // 8개의 작업 생성

        .shuffleGrouping("word-reader");


스톰의 병렬 힌트와 파티션


스톰 트라이던트 API는 직접 태스크 개수를 지정하는 방식을 사용하지 않고, 병렬 힌트를 주는 방식을 사용하고 있다. 다음은 병렬 힌트 코드의 예를 보여주고 있다.


topology.newStream("log", new LogSpout())

        .each(new Fields("logString"),  new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .parallelismHint(2);


위 코드에서 parallelismHint() 메서드를 볼 수 있는데, 이 메서드는 동시 실행 단위가 되는 파티션을 몇 개 생성할지 결정한다. 예를 들어, 위 코드는 다음과 같이 두 개의 타피션을 생성한다.



스톰은 각 파티션을 별도 쓰레드에서 실행한다. 워커 프로세스가 2개 이상일 경우, 별도 프로세스에 파티션이 실행된다. 파티션의 단위가 변경되는 것을 리파티션이라고 하는데, parallelismHint() 메서드에 의해 생성되는 파티션 적용 범위는 parallelismHint() 메서드 호출 이전에 리파티션이 일어나기 전까지다. 예를 들어, 다음 코드를 보자.


topology.newStream("log", new LogSpout())

        .parallelismHint(1)

        .shuffle()

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .parallelismHint(2);


위 코드를 보면 두 개의 parallelismHint() 메서드를 사용하고 있다. 한 번은 LogSpout 뒤에 설정했고, 한 번은 나머지 세 개 연산 뒤에 설정했다. 이 경우 두 번째 설정한 parallelismHint(2) 메서드의 적용 범위는 첫 번째 parallelismHint(1) 메서드의 이후가 된다. 이 경우 생성되는 파티션은 다음과 같다. 



파티션의 개수가 달라지는 리파티셔닝은 위 코드처럼 parallelismHint()의 개수가 다를 때 발생하며, groupBy()에 의해서도 발생한다. groupBy()는 기본적으로 모든 파티션에서 발생한 튜플을 한 곳으로 모아 그룹핑처리를 한다.


topology.newStream("log", new LogSpout()) // 1개 파티션

        .parallelismHint(1)

        .shuffle()

         // 2개 파티션

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new GroupingValueFunction(), new Fields("productId:time"))

        .parallelismHint(2)

        .groupBy(new Fields("productId:time")) // 1개 파티션

        .aggregate(new CountAggregator(), new Fields("count"))


리파티셔닝과 튜플 분배


파티션 크기가 변경되면, 파티션 간에 데이터를 어떻게 분배할지에 대해 결정해 주어야 한다. 예를 들어, 아래 코드는 LogSpout가 속한 파티션과 이후 세 개의 연산이 속한 파티션의 개수가 다르기 때문에 LogSpout가 생성한 튜플을 세 개의 파티션에 알맞게 분배해줘야 한다. 아래 코드에서는 라인 로빈 방식으로 분배하는 shuffle() 방식을 선택한다.


topology.newStream("log", new LogSpout())

        .parallelismHint(1)

        .shuffle()

        .each(new Fields("logString"), new OrderLogFilter())

        .each(...)

        .each(...)

        .parallelismHint(3)


트라이던트 API가 제공하는 분배 방식은 다음과 같다.

  • shuffle(): 라운드 로빈 방식으로 분배
  • partitionBy(Fields): 필드의 해시값을 이용해서 분배
  • broadcast(): 모든 파티션에 복사
  • global(): 모든 튜플을 한 파티션으로 보낸다.
  • batchGloabl(): 한 배치에 속한 튜플은 한 파티션으로 보낸다.
  • 커스텀 규칙: 직접 분배 규칙을 구현한다.

파티션 로컬 오퍼레이션


스트림에서 다음의 세 연산은 한 파티션에서 실행된다.

  • each(Function)
  • each(Filter)
  • partitionAggregate(Aggregator)
예를 들어, 아래 코드에서 병렬 힌트를 CountSumFunction 다음에 3으로 주긴 했지만, Function과 Filter는 기본적으로 같은 파티션에 포함되기 때문에, [CountSumFunction, ThresholdFilter, AlertFilter]는 한 파티션에 속하며, 따라서 이 쌍으로 세 개의 파티션이 생기게 된다.

.groupBy(new Fields("productId:time"))
.aggregate(new CountAggregator(), new Fields("count"))
.parallelismHint(1).partitionBy(new Fields("productId:time"))
.each(new Fields("productId:time", "count"), new CountSumFunction(), new Fields("sum"))
.parallelismHint(3)
.each(new Fields("productId:time", "sum"), new ThresholdFilter())
.each(new Fields("productId:time", "sum"), new AlertFilter());

파티션 로컬 오퍼레이션에 대해서 파티션을 분리하고 싶다면, 다음과 같이 명시적으로 병렬 힌트를 지정해 주어야 한다. 이 코드는 AlertFilter 다음에 병렬 힌트를 1로 주었다. 따라서, 아래 코드는 CountSumFunction만 속한 파티션 3개와 [ThresholdFilter, AlertFilter]가 속한 파티션 1개를 생성한다.

.groupBy(new Fields("productId:time"))
.aggregate(new CountAggregator(), new Fields("count"))
.parallelismHint(1).partitionBy(new Fields("productId:time"))
.each(new Fields("productId:time", "count"), new CountSumFunction(), new Fields("sum"))
.parallelismHint(3).shuffle()
.each(new Fields("productId:time", "sum"), new ThresholdFilter())
.each(new Fields("productId:time", "sum"), new AlertFilter())
.parallelismHint(1);

한 타피션은 물리적으로 한 볼트에 속한다. 예를 들어, 아래 코드를 보자.

topology.newStream("log", new LogSpout())
        .each(new Fields("logString"), new OrderLogFilter())
        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))
        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))
        .groupBy(new Fields("productId:time"))
        .aggregate(new Fields("productId:time"), new Count(), new Fields("count"))
        .each(new Fields("productId:time", "count"), new CountSumFunction(), new Fields("sum"))
        .each(new Fields("productId:time", "sum"), new ThresholdFilter())
        .each(new Fields("productId:time", "sum"), new AlertFilter());

위 코드에서 리파티셔닝을 하는 연산으로 groupBy()가 존재한다. 따라서, LogSpout부터 AddGroupingValueFunction 까지 파티션이 만들어지고, Count()부터 AlertFilter()까지 파티션이 만들어진다. 이 경우 스톰은 다음과 같이 Spout와 Bold를 구성한다.


참고자료

관련 글:


Posted by 최범균 madvirus

댓글을 달아 주세요