스톰을 사용하는 이유 중 하나를 꼽아보자면, 병렬 처리를 들 수 있을 것 같다. 다중 노드에서 병렬로 연산을 함으로써 대량의 실시간 스트림 데이터를 처리하기 위해 만든 것이 스톰임을 생각해보면, 병렬 처리는 스톰의 주된 사용 이유일 것이다.
스톰의 기반 API의 경우, 아래 코드처럼 태스크의 개수와 쓰레드 개수 등을 설정해서 몇 개의 볼트를 동시에 실행할지 결정했다.
builder.setBolt("word-normalizer", new WordNormalizer(), 4) // 4개의 쓰레드
.setNumTasks(8) // 8개의 작업 생성
.shuffleGrouping("word-reader");
스톰의 병렬 힌트와 파티션
스톰 트라이던트 API는 직접 태스크 개수를 지정하는 방식을 사용하지 않고, 병렬 힌트를 주는 방식을 사용하고 있다. 다음은 병렬 힌트 코드의 예를 보여주고 있다.
topology.newStream("log", new LogSpout())
.each(new Fields("logString"), new OrderLogFilter())
.each(new Fields("logString"), new LogParser(), new Fields("shopLog"))
.each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))
.parallelismHint(2);
위 코드에서 parallelismHint() 메서드를 볼 수 있는데, 이 메서드는 동시 실행 단위가 되는 파티션을 몇 개 생성할지 결정한다. 예를 들어, 위 코드는 다음과 같이 두 개의 타피션을 생성한다.
스톰은 각 파티션을 별도 쓰레드에서 실행한다. 워커 프로세스가 2개 이상일 경우, 별도 프로세스에 파티션이 실행된다. 파티션의 단위가 변경되는 것을 리파티션이라고 하는데, parallelismHint() 메서드에 의해 생성되는 파티션 적용 범위는 parallelismHint() 메서드 호출 이전에 리파티션이 일어나기 전까지다. 예를 들어, 다음 코드를 보자.
topology.newStream("log", new LogSpout())
.parallelismHint(1)
.shuffle()
.each(new Fields("logString"), new OrderLogFilter())
.each(new Fields("logString"), new LogParser(), new Fields("shopLog"))
.each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))
.parallelismHint(2);
위 코드를 보면 두 개의 parallelismHint() 메서드를 사용하고 있다. 한 번은 LogSpout 뒤에 설정했고, 한 번은 나머지 세 개 연산 뒤에 설정했다. 이 경우 두 번째 설정한 parallelismHint(2) 메서드의 적용 범위는 첫 번째 parallelismHint(1) 메서드의 이후가 된다. 이 경우 생성되는 파티션은 다음과 같다.
파티션의 개수가 달라지는 리파티셔닝은 위 코드처럼 parallelismHint()의 개수가 다를 때 발생하며, groupBy()에 의해서도 발생한다. groupBy()는 기본적으로 모든 파티션에서 발생한 튜플을 한 곳으로 모아 그룹핑처리를 한다.
topology.newStream("log", new LogSpout()) // 1개 파티션
.parallelismHint(1)
.shuffle()
// 2개 파티션
.each(new Fields("logString"), new OrderLogFilter())
.each(new Fields("logString"), new LogParser(), new Fields("shopLog"))
.each(new Fields("shopLog"), new GroupingValueFunction(), new Fields("productId:time"))
.parallelismHint(2)
.groupBy(new Fields("productId:time")) // 1개 파티션
.aggregate(new CountAggregator(), new Fields("count"))
리파티셔닝과 튜플 분배
파티션 크기가 변경되면, 파티션 간에 데이터를 어떻게 분배할지에 대해 결정해 주어야 한다. 예를 들어, 아래 코드는 LogSpout가 속한 파티션과 이후 세 개의 연산이 속한 파티션의 개수가 다르기 때문에 LogSpout가 생성한 튜플을 세 개의 파티션에 알맞게 분배해줘야 한다. 아래 코드에서는 라인 로빈 방식으로 분배하는 shuffle() 방식을 선택한다.
topology.newStream("log", new LogSpout())
.parallelismHint(1)
.shuffle()
.each(new Fields("logString"), new OrderLogFilter())
.each(...)
.each(...)
.parallelismHint(3)
트라이던트 API가 제공하는 분배 방식은 다음과 같다.
- shuffle(): 라운드 로빈 방식으로 분배
- partitionBy(Fields): 필드의 해시값을 이용해서 분배
- broadcast(): 모든 파티션에 복사
- global(): 모든 튜플을 한 파티션으로 보낸다.
- batchGloabl(): 한 배치에 속한 튜플은 한 파티션으로 보낸다.
- 커스텀 규칙: 직접 분배 규칙을 구현한다.
파티션 로컬 오퍼레이션
스트림에서 다음의 세 연산은 한 파티션에서 실행된다.
- each(Function)
- each(Filter)
- partitionAggregate(Aggregator)