주요글: 도커 시작하기
반응형

스톰을 사용하는 이유 중 하나를 꼽아보자면, 병렬 처리를 들 수 있을 것 같다. 다중 노드에서 병렬로 연산을 함으로써 대량의 실시간 스트림 데이터를 처리하기 위해 만든 것이 스톰임을 생각해보면, 병렬 처리는 스톰의 주된 사용 이유일 것이다.


스톰의 기반 API의 경우, 아래 코드처럼 태스크의 개수와 쓰레드 개수 등을 설정해서 몇 개의 볼트를 동시에 실행할지 결정했다.


builder.setBolt("word-normalizer", new WordNormalizer(), 4) // 4개의 쓰레드

        .setNumTasks(8) // 8개의 작업 생성

        .shuffleGrouping("word-reader");


스톰의 병렬 힌트와 파티션


스톰 트라이던트 API는 직접 태스크 개수를 지정하는 방식을 사용하지 않고, 병렬 힌트를 주는 방식을 사용하고 있다. 다음은 병렬 힌트 코드의 예를 보여주고 있다.


topology.newStream("log", new LogSpout())

        .each(new Fields("logString"),  new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .parallelismHint(2);


위 코드에서 parallelismHint() 메서드를 볼 수 있는데, 이 메서드는 동시 실행 단위가 되는 파티션을 몇 개 생성할지 결정한다. 예를 들어, 위 코드는 다음과 같이 두 개의 타피션을 생성한다.



스톰은 각 파티션을 별도 쓰레드에서 실행한다. 워커 프로세스가 2개 이상일 경우, 별도 프로세스에 파티션이 실행된다. 파티션의 단위가 변경되는 것을 리파티션이라고 하는데, parallelismHint() 메서드에 의해 생성되는 파티션 적용 범위는 parallelismHint() 메서드 호출 이전에 리파티션이 일어나기 전까지다. 예를 들어, 다음 코드를 보자.


topology.newStream("log", new LogSpout())

        .parallelismHint(1)

        .shuffle()

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .parallelismHint(2);


위 코드를 보면 두 개의 parallelismHint() 메서드를 사용하고 있다. 한 번은 LogSpout 뒤에 설정했고, 한 번은 나머지 세 개 연산 뒤에 설정했다. 이 경우 두 번째 설정한 parallelismHint(2) 메서드의 적용 범위는 첫 번째 parallelismHint(1) 메서드의 이후가 된다. 이 경우 생성되는 파티션은 다음과 같다. 



파티션의 개수가 달라지는 리파티셔닝은 위 코드처럼 parallelismHint()의 개수가 다를 때 발생하며, groupBy()에 의해서도 발생한다. groupBy()는 기본적으로 모든 파티션에서 발생한 튜플을 한 곳으로 모아 그룹핑처리를 한다.


topology.newStream("log", new LogSpout()) // 1개 파티션

        .parallelismHint(1)

        .shuffle()

         // 2개 파티션

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new GroupingValueFunction(), new Fields("productId:time"))

        .parallelismHint(2)

        .groupBy(new Fields("productId:time")) // 1개 파티션

        .aggregate(new CountAggregator(), new Fields("count"))


리파티셔닝과 튜플 분배


파티션 크기가 변경되면, 파티션 간에 데이터를 어떻게 분배할지에 대해 결정해 주어야 한다. 예를 들어, 아래 코드는 LogSpout가 속한 파티션과 이후 세 개의 연산이 속한 파티션의 개수가 다르기 때문에 LogSpout가 생성한 튜플을 세 개의 파티션에 알맞게 분배해줘야 한다. 아래 코드에서는 라인 로빈 방식으로 분배하는 shuffle() 방식을 선택한다.


topology.newStream("log", new LogSpout())

        .parallelismHint(1)

        .shuffle()

        .each(new Fields("logString"), new OrderLogFilter())

        .each(...)

        .each(...)

        .parallelismHint(3)


트라이던트 API가 제공하는 분배 방식은 다음과 같다.

  • shuffle(): 라운드 로빈 방식으로 분배
  • partitionBy(Fields): 필드의 해시값을 이용해서 분배
  • broadcast(): 모든 파티션에 복사
  • global(): 모든 튜플을 한 파티션으로 보낸다.
  • batchGloabl(): 한 배치에 속한 튜플은 한 파티션으로 보낸다.
  • 커스텀 규칙: 직접 분배 규칙을 구현한다.

파티션 로컬 오퍼레이션


스트림에서 다음의 세 연산은 한 파티션에서 실행된다.

  • each(Function)
  • each(Filter)
  • partitionAggregate(Aggregator)
예를 들어, 아래 코드에서 병렬 힌트를 CountSumFunction 다음에 3으로 주긴 했지만, Function과 Filter는 기본적으로 같은 파티션에 포함되기 때문에, [CountSumFunction, ThresholdFilter, AlertFilter]는 한 파티션에 속하며, 따라서 이 쌍으로 세 개의 파티션이 생기게 된다.

.groupBy(new Fields("productId:time"))
.aggregate(new CountAggregator(), new Fields("count"))
.parallelismHint(1).partitionBy(new Fields("productId:time"))
.each(new Fields("productId:time", "count"), new CountSumFunction(), new Fields("sum"))
.parallelismHint(3)
.each(new Fields("productId:time", "sum"), new ThresholdFilter())
.each(new Fields("productId:time", "sum"), new AlertFilter());

파티션 로컬 오퍼레이션에 대해서 파티션을 분리하고 싶다면, 다음과 같이 명시적으로 병렬 힌트를 지정해 주어야 한다. 이 코드는 AlertFilter 다음에 병렬 힌트를 1로 주었다. 따라서, 아래 코드는 CountSumFunction만 속한 파티션 3개와 [ThresholdFilter, AlertFilter]가 속한 파티션 1개를 생성한다.

.groupBy(new Fields("productId:time"))
.aggregate(new CountAggregator(), new Fields("count"))
.parallelismHint(1).partitionBy(new Fields("productId:time"))
.each(new Fields("productId:time", "count"), new CountSumFunction(), new Fields("sum"))
.parallelismHint(3).shuffle()
.each(new Fields("productId:time", "sum"), new ThresholdFilter())
.each(new Fields("productId:time", "sum"), new AlertFilter())
.parallelismHint(1);

한 타피션은 물리적으로 한 볼트에 속한다. 예를 들어, 아래 코드를 보자.

topology.newStream("log", new LogSpout())
        .each(new Fields("logString"), new OrderLogFilter())
        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))
        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))
        .groupBy(new Fields("productId:time"))
        .aggregate(new Fields("productId:time"), new Count(), new Fields("count"))
        .each(new Fields("productId:time", "count"), new CountSumFunction(), new Fields("sum"))
        .each(new Fields("productId:time", "sum"), new ThresholdFilter())
        .each(new Fields("productId:time", "sum"), new AlertFilter());

위 코드에서 리파티셔닝을 하는 연산으로 groupBy()가 존재한다. 따라서, LogSpout부터 AddGroupingValueFunction 까지 파티션이 만들어지고, Count()부터 AlertFilter()까지 파티션이 만들어진다. 이 경우 스톰은 다음과 같이 Spout와 Bold를 구성한다.


참고자료

관련 글:


+ Recent posts