저작권 안내: 저작권자표시 Yes 상업적이용 No 컨텐츠변경 No

스프링5 입문

JSP 2.3

JPA 입문

DDD Start

인프런 객체 지향 입문 강의

스트림으로 들어오는 튜플을 단순히 처리만하고 끝나는 경우는 드물다. 보통은 처리 결과를 최종 또는 중간 단계 어딘가에 저장하게 된다. 예를 들어, 10분당 판매량이 100개가 넘는 상품을 통지해주는 스톰 토폴로지를 만들었다면, 토폴로지를 이용해서 계산한 분당 판매량 데이터를 HBase나 MySQL과 같은 데이터베이스에서 보관하길 원하는데, 이렇게 해야 스톰에 장애가 발생했을 때, 그 이전에 계산한 값을 날리지 않을 수 있으며, 또한, 이렇게 계산한 데이터를 실시간 통계 데이터로 활용할 수 있기 때문이다.


이전글:


상태 추상화


Filter나 Function을 이용해서 계산한 결과 상태를 DB에 보관하는 기능을 구현할 수 있지만, 트라이던트 API는 일관된 방법으로 상태를 처리할 수 있도록 상태를 위한 API를 제공하고 있다. 트라이던트 API는 상태 보관을 위해 다음의 세 가지 인터페이스를 정의하고 있다.

  • State: 상태를 표현
  • StateFactory: State를 생성해주는 팩토리
  • StateUpdater: 배치 단위로 튜플을 State 객체에 반영

트라이던트 API는 위 세 가지 인터페이를 이용해서 상태를 관리한다. 즉, 개발자는 위 세 개의 인터페이스에 대한 구현 클래스만 알맞게 만들어주면 된다. 나머지는 트라이언트가 알아서 처리한다.


세 인터페이스는 다음과 같이 정의되어 있다.


public interface State {

    void beginCommit(Long txid);

    void commit(Long txid);

}


public interface StateFactory extends Serializable {

    State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions);

}


public interface StateUpdater<S extends State> extends Operation {

    void updateState(S state, List<TridentTuple> tuples, TridentCollector collector);

}


스톰은 내부적으로 다음과 같은 코드를 사용해서 튜플을 상태에 반영한다.


// 배치 시작을 상태에 알림

state.beginCommit(txid);


// 배치에 속한 튜플 단위로 상태에 반영하고, collector를 이용해서 새로운 튜플 발생

stateUpdater.updateState(state, tuples, collector)


// 배치 종료를 상태에 알림

state.commit();


스톰은 배치 단위로 튜플을 상태에 반영한다. 배치가 시작되면 이를 State에 알리고(beginCommit 실행), StateUpdater를 이용해서 배치에 속한 튜플을 State에 반영한다(updateState 실행). 배치에 속한 튜플을 모두 State에 반영하면 배치가 끝났음을 State에 알린다.(commit 실행)


위 코드에서 알 수 있는 건, State와 StateUpdater는 쌍을 이룬다는 점이다. 즉, 특정 타입의 State에 상태를 반영하려면 그 State에 알맞은 StateUpdater가 필요하다. 실제로 Stream의 partitionPersist() 메서드를 보면 다음과 같이 특정 타입의 State 객체를 생성하는 StateFactory와 그 State의 상태를 갱신할 때 사용될 StateUpdater를 함께 파라미터로 전달받는 것을 알 수 있다.


partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater, Fields functionFields)

 


[State와 StateUpdater는 쌍을 이룬다.]


스톰은 일부 State 구현에 대해 이미 쌍을 우리는 StateUpdater 구현을 제공하고 있다. (위 그림에서 MapState와 MapCombinerAggStateUpdater가 스톰이 제공하는 인터페이스/클래스이다.)


State 만들어보기


Stream이 제공하는 메서드는 partitionPersist()인데, 이 이름에서 알 수 있듯이 스트림은 기본적으로 파티션 단위로 상태값을 보관하는 간단한 State 구현을 만들어보자. 이를 통해 State의 동작 방식에 대한 약간의 감을 잡을 수 있을 것이다. 여기서 만들 예제는 중복되지 않은 "제품ID:시간"을 키로 하는 이벤트 발생 여부를 보관하는 State 구현이다.


State 구현은 다음과 같이 간단하게 구현했다.


public class DistinctState implements State {


    private Map<String, ShopLog> map = new ConcurrentHashMap<>();

    private Map<String, ShopLog> temp = new ConcurrentHashMap<>();


    @Override

    public void beginCommit(Long txid) {

        temp.clear();

    }


    @Override

    public void commit(Long txid) {

        for (Map.Entry<String, ShopLog> entry : temp.entrySet())

            map.put(entry.getKey(), entry.getValue());

    }


    public boolean hasKey(String key) {

        return map.containsKey(key) || temp.containsKey(key);

    }


    public void put(String key, ShopLog value) {

        temp.put(key, value);

    }

}


위 코드에서 beginCommmit() 메서드는 temp 맵을 초기화한다. 그리고, commit() 메서드는 temp에 담겨 있는 모든 내용을 map에 복사한다. 여기서는 메모리 맵은 map 필드에 내용을 복사했지만, 실제 구현에서는 외부의 캐시 서버나 DBMS 등에 내용을 복사하도록 구현하게 된다. 즉, 한 배치가 시작될 때 beginCommit() 메서드를 통해 배치에 속한 튜플의 처리 준비를 하고, 한 배치가 끝날 때 commit() 메서드를 통해 배치에 포함된 튜플의 처리 결과를 상태를 보관할 저장소에 반영하게 된다.


배치의 시작과 끝 사이에 사용되는 메서드가 hasKey() 메서드와 put() 메서드이다. 이 두 메서드는 StateUpdater에 의해 사용되는데, DistictState를 위한 StateUpdater 구현 클래스는 다음과 같다.


public class DistinctStateUpdater extends BaseStateUpdater<DistinctState> {


    @Override

    public void updateState(

                DistinctState state, List<TridentTuple> tuples, TridentCollector collector) {

        List<TridentTuple> newEntries = new ArrayList<>();

        for (TridentTuple t : tuples) {

            String key = t.getStringByField("productId:time");

            if (!state.hasKey(key)) {

                state.put(key, (ShopLog) t.getValueByField("shopLog"));

                newEntries.add(t);

            }

        }

        for (TridentTuple t : newEntries) {

            collector.emit( // 상태로부터 새로운 스트림을 생성

                Arrays.asList(t.getValueByField("shopLog"), t.getStringByField("productId:time"))

            );

        }

    }

} 


StateUpdater 구현 클래스는 BaseStateUpdater 클래스를 상속받은 뒤 updateState() 메서드를 알맞게 구현해주면 된다. updateState() 메서드는 첫 번째 파라미터로 상태를 표현하는 객체를, 두 번째 파라미터로 배치에 속한 튜플을, 세 번째 파라미터는 튜플을 상태에 반영한 후 새로운 튜플을 생성할 때 사용할 TridentCollector이다.


updteState() 메서드를 보면 DistinctState의 hasKey() 메서드를 이용해서 처리하려는 튜플의 key 값이 이미 DistinctState에 보관한 key인지 확인한다. 아직 보관전인 key를 가진 튜플이면, put() 메서드를 이용해서 DistinctState에 반영하고, newEntries에 보관한다.


updateState() 메서드는 배치에 포함된 모든 튜플을 상태에 반영한 뒤, collector를 이용해서 newEntries에 보관된 튜플을 재발생시킨다. 이렇게 함으로써 상태 처리 상태로부터 새로운 튜플을 만들어내는 스트림이 만들어지게 된다.


마지막으로 구현해야 할 클래스는 StateFactory 인터페이스의 구현 클래스이다. 이 클래스의 구현은 다음과 같이 간단하다.


public class DistinctStateFactory implements StateFactory {

    private static final long serialVersionUID = 1L;


    @SuppressWarnings("rawtypes")

    @Override

    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

        return new DistinctState();

    }

}


이제 이렇게 만든 상태 관련 클래스를 실제 트라이던트 토폴로지에 적용해보자.


TridentTopology topology = new TridentTopology();

topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .partitionPersist(

                new DistinctStateFactory()

                new Fields("shopLog", "productId:time"), // StateUpdater에 전달될 튜플의 필드 목록

                new DistinctStateUpdater()

                new Fields("shopLog", "productId:time")) // StateUpdater가 생성하는 튜플 필드 목록

        .newValuesStream() // StateUpdater에서 생성한 튜플로 구성된 스트림

        .each(new Fields("shopLog", "productId:time"), Util.printer());

...


partionPersist() 메서드는 파티션 별로 State를 생성하고 튜플을 상태에 반영하는 기능을 제공한다. 위 코드의 경우 StateFactory로 DisctinctStateFactory를 사용했으므로 파티션 별로 상태 보관을 위해 DistinctState 객체가 생성된다.


partitionPersist() 메서드는 TridentState 객체를 리턴하는데, 이 객체의 newValueStream() 메서드는 DistinctStateUpdater에서 collector를 이용해서 생성한 튜플을 스트림으로 제공하는 새로운 Stream을 리턴한다. 따라서, 위 코드에서 마지막 each() 메서드는 DistinctStateUpdater가 생성한 튜플을 입력으로 받아서 그 내용을 콘솔에 출력하게 된다.


상태와 튜플 재처리 방식


튜플로부터 상태를 보관할 때 고민해야 할 점이 있다면, 그것은 바로 재처리와 관련된 것이다. 아래 그림을 보자. 아 그림에서 persistentAggregate 연산은 앞의 groupBy로 그룹핑 된 튜플의 개수를 적재하는 State를 처리해준다고 해 보자.




State가 키가 "0:123"인 튜플의 개수로 3을 갖고 있었다고 하자. 즉, {"0:123": 3 }을 State가 상태로 보관하고 있었다고 하자. 이 상태에서 위 그림의 윗 부분의 실행되었다고 하자. 이 때 스파우트가 발생한 "0:123"인 튜플의 개수가 4라면, persistenAggregate를 거친 뒤에 상태 값은 {"0:123": 7}이 될 것이다. 그런데, AlertFunction 연산에서 실패가 나는 바람에 스톰이 재처리를 수행했다고 해 보자. 그러면, 위 그림의 아래 부분처럼 튜플을 다시 발생시킬 거고, 그렇게 되면 persistenceAggregate는 이미 {"0:123": 7}을 상태 값으로 갖고 있었기 때문에, 재발송된 튜플 개수 4개가 더해지면서 상태 값이 {"0:123", 11}로 바뀌게 될 것이다.


사실, 여기서 동일한 튜플이 다시 발생한 것이므로 최종 상태 값은 {"0:123", 7}이어야 한다. 그런데, 상태가 튜플의 재발생 여부를 제대로 처리하지 못하게 되면, 같은 튜플이 상태에 중복해서 적용되는 문제가 발생하는 것이다.


이렇게 튜플을 다시 발생했을 때 동일 튜플이 State에 중복해서 적용되는 문제를 처리하기 위해 스톰은 세 가지 타입의 State 구현 객체를 제공하고 있다.



State

설명

TransactionalMap

스파우트가 Transactional인 경우(즉 한 배치에 대해 동일한 튜플을 생성할 경우), 동일 튜플에 대해 동일 상태를 보장한다.

OpaqueMap

스파우트가 Opaque Transactional인 경우(즉 한 튜플은 한 배치에만 속하지만 한 배치에 대해 매번 동일한 튜플이 발생하지 않을수도 있는 경우), 이전 상태 값을 기준으로 상태를 갱신한다.

NonTransactionalMap스파우트가 Non Transactional인 경우(즉 튜플이 특정 배치에만 속한다고 가정할 수 없는 경우), 튜플 재생성에 따른 롤백을 지원하지 않는다.


TransactionalMap의 동작 방식


TransactionalMap은 상태를 보관할 때 (트랜잭션ID, {키: 값})의 구조를 사용한다. 예를 들어, 배치 단위로 그룹바이-카운트를 수행한 결과 아래 표의 왼쪽/중간 컬럼과 같이 (키, 개수)가 생성되었다고 해 보자. 이 경우 표의 우측 컬럼과 같이 상태 값이 갱신된다.


 배치(트랜잭션) ID

 튜플 연산 결과 (키, 개수) 값

상태

 1

{ "0:123": 4 }

(1, {"0:123": 4} ) 

 2

{ "0:123": 5 } 

(2, {"0:123": 9} )

 3

{ "0:123": 3 } 

(3, {"0:123": 12} ) 

 3 (재발생)

{ "0:123": 3 }  
Transactional 스파우트는 동일한 튜플을 발생

(3, {"0:123": 12} )  
(기존 값 그대로 유지)

위 표에서 트랜잭션ID가 3에 해당하는 배치의 튜플이 다시 발생했다. 이 스파우트는 Transactional 타입이므로 같은 배치 ID에 대해 동일한 튜플을 발생시키므로, 튜플의 그룹바이-카운트 연산 결과는 위 표의 가운데 컬럼에서 보는 것처럼 동일하다. 따라서, TransactionalMap은 현재 상태와 동일한 트랜잭션 ID에 해당하는 결과 값이 들어올 경우, 상태에 반영하지 않고 유지하면 된다.


OpaqueMap의 동작 방식


OpaqueMap은 상태를 보관할 때 (트랜잭션ID, 키, 이전값, 현재값)의 구조를 사용한다. 앞의 TransactionalMap과 유사하게 3번 트랜잭션ID에 해당하는 배치를 재처리했다고 해 보자.


 배치(트랜잭션) ID

 튜플 연산 결과 (키, 개수) 값

상태

 1

{ "0:123": 4 }

(1, "0:123", null, 4 ) 

 2

{ "0:123": 5 } 

(2, "0:123", 4 9 )

 3

{ "0:123": 3 } 

(3, "0:123", 9, 12 ) 

 3 (재발생)

{ "0:123": 5 }  

Opaque 스파우트는 발생한 튜플이 달라질 수 있음

(3, "0:123", 9, 14 )  

위 표에서 OpaqueMap은 상태에 값을 보관할 때, 현재 처리한 트랜잭션ID와 이전 값, 현재 값을 함께 보관한다. 만약 특정 문제가 발생해서 스파우트가 특정 트랜잭션 ID에 해당하는 튜플을 재발생한 경우, Opaque 스파우트는 위 표의 중간 컬럼처럼 다른 결과를 만들어낼 수 있다. 따라서 상태는 새롭게 계산된 결과와 이전 상태 값을 이용해서 현재 상태 값을 갱신해주어야 한다. 위 표에서는 3번 트랜잭션ID의 재발생 시점에 이전 상태 값이 {"0:123": 9}이므로, 현재 상태 값은 {"0:123": 9 + 5}인 {"0:123": 14}로 바뀐다.


OpaqueMap 구현 방법: IBackingMap, OpaqueValue 사용


OpaqueMap은 내부적으로 IBackingMap 인터페이스와 OpaqueValue 클래스를 사용한다. OpaqueMap은 상태 데이터를 처리하기 위해 OpaqueValue 클래스를 사용하고, 실제 저장소로부터 상태 값을 조회하거나 저장소에 상태 값을 반영하기 위해 IBackingMap 구현체를 사용한다. 실제 OpaqueMap 인스턴스를 생성하는 메서드는 정적 팩토리 메서드로 다음과 같이 정의되어 있다.


public class OpaqueMap<T> implements MapState<T> {

    public static <T> MapState<T> build(IBackingMap<OpaqueValue> backing) {

        return new OpaqueMap<T>(backing);

    }


실제로 OpaqueMap을 사용하기 위해서 필요한 건 다음과 같다.

  • 상태 보관을 위해 IBackingMap 인터페이스를 알맞게 구현한 클래스를 만든다.
  • 앞서 만든 IBackingMap 구현 객체를 이용해서 생성한 OpaqueMap을 리턴해주는 StateFactory를 구현한다.
  • 집합 연산을 구현한다.
  • GroupedStream의 persistenceAggregate() 메서드를 이용해서 그룹핑 연산 결과를 상태에 반영한다.
    • Stream의 partitionPersist()를 사용할 경우, 알맞은 StateUpdater를 사용한다.

OpaqueMap용 "키:개수" 상태 보관하기 위한 IBackingMap 인터페이스를 구현한 클래스는 다음 코드처럼 만들어 볼 수 있다.


public class CountSumBackingMap implements IBackingMap<OpaqueValue> {

    private Map<String, CountValue> sumMap = new ConcurrentHashMap<>();


    @Override

    public List<OpaqueValue> multiGet(List<List<Object>> keys) {

        List<OpaqueValue> values = new ArrayList<>();

        for (List<Object> keyVals : keys) {

            String key = (String) keyVals.get(0);

            CountValue countValue = sumMap.get(key);

            if (countValue == null) {

                values.add(null);

            } else {

                values.add(new OpaqueValue<Long>(countValue.getCurrentTxId(),

                                 countValue.getCurrCount(), countValue.getPrevCount()));

            }

        }

        return values;

    }


    @Override

    public void multiPut(List<List<Object>> keys, List<OpaqueValue> vals) {

        for (int i = 0; i < keys.size(); i++) {

            String key = (String) keys.get(i).get(0);

            OpaqueValue<Long> val = vals.get(i);


            CountValue countValue = sumMap.get(key);

            if (countValue == null) {

                sumMap.put(key, new CountValue(val.getCurrTxid(), val.getPrev(), val.getCurr()));

            } else {

                countValue.update(val.getCurrTxid(), val.getPrev(), val.getCurr());

            }

        }

    }


}


IBackingMap을 구현하는 클래스는 multiGet() 메서드와 multiPut() 메서드 두 개만 알맞게 구현해주면 된다. 먼저, multiGet() 메서드는 파라미터로 키 목록을 전달받으며, 이 키에 해당하는 OpaqueValue 목록을 리턴해주면 된다. 위 코드의 multiGet() 메서드 메모리 맵인 sumMap에서 키에 해당하는 CountValue 객체를 가져온 뒤, 그 CountValue 객체로부터 OpaqueValue를 생성하고 있다.


CountValue는 (currTxId, prevVal, currVal)을 값으로 갖고 있으므로, sumMap은 결과적으로 (currTxId, key, prevVal, currVal)을 보관하는 장소가 된다. 이 예에서는 메모리 맵을 사용했지만, 실제로는 외부의 DBMS나 NoSQL 같은 곳에 보관된 데이터를 보관하고 읽어올 것이다.


OpaqueValue는 OpaqueMap의 동작 방식에서 설명했던 데이터 구조 중 키를 제외한 (트랜잭션ID, 이전값, 현재값)을 값으로 갖는다. 위 코드에서도 CountValue로부터 OpaqueValue를 생성할 때 트랜잭션ID, 이전값, 현재값을 가져와 생성했다.


스톰은 그룹 연산을 수행할 때 OpaqueMap은 내부적으로 다음과 같은 과정을 거친다.

  • IBackingMap의 multiGet()을 이용해서 (키: OpaqueValue) 목록을 구한다.
  • 앞서 구한 그룹 연산의 결과를 반영한 새로운 (키: OpaqueValue) 목록을 생성하고
  • 그 결과를 IBackingMap의 multiPut()을 이용해서 반영한다.

즉, multiPut() 메서드는 파라미터로 전달받은 (키: OpaqueValue) 목록을 이용해서 그 결과를 저장소에 반영하면 된다. 위 코드는 메모리 맵은 sumMap에 반영했지만, 실제 구현에서는 HBase나 MySQL과 같은 외부 저장소에 반영하게 될 것이다. 연산 결과를 반영할 때 주의할 점은 OpaqueMap이 제대로 동작하도록 (트랜잭션ID, 키, 이전값, 현재값)이 올바르게 적용되어야 한다는 점이다.


IBackingMap을 구현했다면 상태 생성을 위한 StateFactory를 구현하는 것이다. OpaqueMap을 위한 StateFactory는 다음과 같이 간단하게 구현할 수 있다.


public class CountSumStateFactory implements StateFactory {


    @SuppressWarnings("rawtypes")

    @Override

    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

        return OpaqueMap.build(new CountSumBackingMap());

    }


}


키 별로 개수를 구하는 집합 연산을 다음과 같이 구현해 볼 수 있다.


public class CountReducerAggregator implements ReducerAggregator<Long> {


    private static final long serialVersionUID = 1L;


    @Override

    public Long init() {

        return 0L;

    }


    @Override

    public Long reduce(Long curr, TridentTuple tuple) {

        return curr + 1L;

    }


}


이제 필요한 것은 다 만들었으므로, 다음과 같이 persistentAggregate() 메서드를 이용해서 그룹핑 연산 결과를 상태에 보관할 수 있게 된다.


TridentTopology topology = new TridentTopology();

topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .groupBy(new Fields("productId:time"))

        .persistentAggregate(

                new CountSumStateFactory(), // 상태 팩토리

                new CountReducerAggregator(), // 집합 연산

                new Fields("sum")) // 집합 연산 결과 필드

        .newValuesStream() // (productId:time, sum) 튜플 생성

        .each(new Fields("productId:time", "sum"), new ThresholdFilter())

        .each(new Fields("productId:time", "sum"), new AlertFilter())


그룹핑 연산이므로 그룹핑 기준 튜플의 값이 키가 되고, 집한 연산 결과가 값이 된다. 즉, "productId:time" 필드 값이 키가 되고 집합 연산 결과인 "sum" 필드가 값이 된다. 위 코드에서 한 가지 빠진 것이 있는데, 그것은 바로 StateUpdater이다. 앞서 partionPersist() 예제에서는 StateUpdater를 사용했는데, 위 예에서는 StateUpdater를 따로 구현하지 않고 있다. StateUpdater를 지정하지 않은 이유는 persistentAggregate() 메서드가 내부적으로 OpaqueMap에 알맞은 StateUpdater 구현체를 사용하고 있기 때문이다.


TransactionalMap 구현 방법: IBackingMap, TransactionalValue 사용


TransactionalMap을 사용하는 방법은 OpaqueMap을 사용하는 방법과 거의 동일하다. 둘 다 알맞은 IBackingMap 구현 클래스만 제공해주면 된다. 차이점이 있다면 TransactionalMap은 OpaqueValue 대신에 TransactionalMap을 사용한다는 것이다. 다음은 TransactionalMap을 위한 IBackingMap의 구현 예를 보여주고 있다.


public class CountSumBackingMap2 implements IBackingMap<TransactionalValue<Long>> {

    private Map<String, CountValue2> sumMap = new ConcurrentHashMap<>();


    @Override

    public List<TransactionalValue<Long>> multiGet(List<List<Object>> keys) {

        List<TransactionalValue<Long>> values = new ArrayList<>();

        for (List<Object> keyVals : keys) {

            String key = (String) keyVals.get(0);

            CountValue2 countValue = sumMap.get(key);

            if (countValue == null) {

                values.add(null);

            } else {

                values.add(

                    new TransactionalValue<Long>(countValue.getTxId(), countValue.getCount()));

            }

        }

        return values;

    }


    @Override

    public void multiPut(List<List<Object>> keys, List<TransactionalValue<Long>> vals) {

        for (int i = 0; i < keys.size(); i++) {

            String key = (String) keys.get(i).get(0);

            TransactionalValue<Long> val = vals.get(i);


            CountValue2 countValue = sumMap.get(key);

            if (countValue == null) {

                sumMap.put(key, new CountValue2(val.getTxid(), val.getVal()));

            } else {

                countValue.update(val.getTxid(), val.getVal());

            }

        }


    }


}


StateFactory는 동일한 방식으로 구현한다.


public class CountSumStateFactory2 implements StateFactory {


    private static final long serialVersionUID = 1L;


    @SuppressWarnings("rawtypes")

    @Override

    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

        return TransactionalMap.build(new CountSumBackingMap2());

    }


}


스톰 트라이던트 상태 구현 예

  • Memcached를 위한 State 구현: https://github.com/nathanmarz/trident-memcached
  • 소스 코드: https://github.com/madvirus/storm-sample


Posted by 최범균 madvirus

댓글을 달아 주세요

아파치 스톰(Storm) 프로젝트는 실시간 데이터 분석을 위한 기반 플랫폼이다. 스톰을 이용하면 데이터를 병렬로 가공할 수 있는 클러스터를 구축할 수 있으며, 이 클러스터를 이용해서 실시간으로 발생하는 데이터를 가공하고 분석하는 작업을 수행할 수 있다. 스톰에 대한 소개는 이전에 올렸던 'Storm 훑어보기'(http://javacan.tistory.com/324) 자료로 대신하고, 본 글에선 스톰 트라이던트의 연산 처리와 관련된 API의 사용 방법에 대해 살펴볼 것이다.


만들 예제


본 글에서는 다음과 같은 간단한 예제를 만들어보면서 스톰 트라이던트 API의 연산 처리와 관련된 API를 알아볼 것이다.

  • 실시간 로그를 읽어와,
  • 로그가 판매 로그면,
  • 로그를 파싱하고,
  • 1분당 판매 개수가 50개를 넘기면 상품이 있으면,
  • 통지한다.

스톰 트라이던트 프로그래밍의 구성 요소


스톰 클러스터를 이용해서 데이터를 처리하려면 다음의 세 가지 종류의 코드를 작성해야 한다.

  • 트라이던트 스파우트(Spout): 튜플을 발생시킨다. Kafka와 같은 서버에서 메시지를 읽어와 튜플로 변환해서 스톰 스트림에 넣어준다.
  • 연산 처리
    • Function: 튜플을 입력받아 튜플을 생성한다.
    • Filter: 튜플을 입력받아 연산을 계속할지 여부를 결정한다.
    • 그룹핑: 튜플을 특정 필드를 이용해서그룹핑한다.
  • 토폴로지(Topology): 스파우트와 연산 구성 요소를 연결하고 병렬 처리 등을 설정한다.
여기서 만들 토폴로지 구성은 아래 그림과 같다.


스파우트는 로그를 튜플로 생성한다. 튜플(tuple)은 데이터를 보관하는 단위로서, Function과 Filter는 한 개의 튜플에 대해 연산을 수행한다. 여기서 만들 스파우트는 단순히 로그 문자열을 튜플에 담아 스트림에 넣는다.


모든 로그가 아닌 주문 로그만 처리하면 되므로, 주문 로그인지 여부를 확인하는 필터를 맨 앞에 위치시킨다. 필터는 로그가 주문 필터인 경우에만 튜플을 스트림의 다음 단계로 전달한다. 이후 과정에서는 로그 문자열을 파싱해서 ShopLog라는 객체로 변환하고, 그룹핑 기준으로 사용할 '제품ID:시간'을 튜플에 추가한다.


'제품ID:시간'으로 튜플을 그룹핑 한뒤 뒤, 각 '제품ID:시간' 별로 개수를 구한다. 그리고, '제품ID:시간' 별 개수를 누적하고(기 별 개수 합 함수), 각 누적 개수가 임계값에 도달하면 통지 필터에 해당 튜플을 전달한다.


트라이던트 스파우트 구현


트라이던트 스파우트(이하 스파우트)는 기본적으로 배치 단위로 튜플을 생성한다. 다음 글에서 더 자세히 다루겠지만, 스파우트가 생성하는 각 튜플은 한 트랜잭션 ID(배치)에 포함된다. 예를 들어, 스파우트는 한 트랜잭션ID에 해대 튜플을 5개씩 생성할 수 있고, 이 때 스트림은 한 번에 5개의 튜플을 처리하게 된다. 만약 토폴로지에서 튜플을 처리하는 도중 에러가 발생하면, 스톰은 특졍 트랜잭션 ID에 해당하는 튜플을 다시 생성하도록 스파우트에 요청한다.


스파우트가 데이터를 어떻게 생성해주냐에 따라 스파우트를 크게 세 가지 종류로 구분할 수 있다.

  • Transactional: 한 트랜잭션 ID에 대해 매 번 동일한 튜플 묶음을 생성함을 보장한다. 한 트랜잭션 ID에 속한 튜플이 다른 트랜잭션ID에 포함되지 않는다.
  • Opaque-transactional: 한 튜플은 한 트랜잭션ID에만 포함된다. 단, 한 트랜잭션ID에 대해 매 번 동일한 튜플 묶음을 생성함을 보장하진 않는다. 
  • Non-transactional: 한 튜플이 한 트랜잭션ID에만 포함된다는 것을 보장하지 않는다.

트라인던트 스파우트를 구현하려면 스톰이 제공하는 인터페이스를 상속받아야 한다. 트라이던트 스파우트 구현을 위한 몇 가지 종류의 인터페이스가 존재하는데, 본 글에서는 그 중에서 ITridentSpout 인터페이스를 이용한 구현 방법에 대해 살펴볼 것이다.


ITridentSpout 인터페이스를 이용한 스파우트 구현


ITridentSpout 인터페이스를 이용해서 스파우트를 구현하려면, 다음의 세 인터페이스에 대한 이해가 필요하다.



Emitter, BatchCoordinator, ITridentSpout는 각각 다음과 같은 역할을 한다.

  • Emitter: 특정 트랜잭션에 속할 튜플을 생성한다.
  • BatchCoordinator: 특정 트랜잭션 ID에 해당하는 메타 데이터를 생성하고, 튜플 생성을 준비한다.
  • ITridentSpout: 스톰에 Emitter 객체와 BatchCoordinator 객체를 제공한다. 또한, Emitter가 생성할 튜플의 필드 정보를 정의한다.
스톰이 사용할 트라이던트 스파우트를 제공하려면 위 세 개의 인터페이스를 알맞게 구현해 주어야 한다. 본 글에서 만들어볼 예제는 어딘가에서 일정 개수의 로그를 읽어와 튜플로 전달한다. 튜플에 포함될 필드는 1개 뿐이고 그 필드의 이름을 "log"라고 한다면, ITridentSpout 인터페이스는 다음과 같이 구현할 수 있다.

@SuppressWarnings("rawtypes")
public class LogSpout implements ITridentSpout<Long> {

    private static final long serialVersionUID = 1L;

    @Override
    public BatchCoordinator<Long> getCoordinator(
             String txStateId, Map conf, TopologyContext context) {
        return new LogBatchCoordinator();
    }

    @Override
    public Emitter<Long> getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new LogEmitter();
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("logString");
    }
}

ITrdientSpout의 타입 파라미터를 Long으로 지정했는데, 여기서 Long은 뒤에서 설명할 트랜잭션 메타 데이터의 타입이 된다. getCoordinator()와 getEmitter()는 각각 Coordinator와 Emitter를 생성하면 된다. 이 두 메서드의 txStateId는 스트림을 생성할 때 입력한 값이 사용된다.

TridentTopology topology = new TridentTopology();
topology.newStream("log", new LogSpout()) // "log"가 txStateId 값으로 사용
    .each(new Fields("logString"), new OrderLogFilter())
    ....

getCoordinator()와 getEmitter()의 conf 파라미터는 getComponentConfiguration() 메서드에서 리턴한 Map 객체가 전달된다.

다음은 BatchCoordinator 인터페이스를 구현한 클래스의 예다.

public class LogBatchCoordinator implements BatchCoordinator<Long> {
    private static final Logger LOG = LoggerFactory.getLogger(LogBatchCoordinator.class);

    @Override
    public Long initializeTransaction(long txid, Long prevMetadata, Long currMetadata) {
        if (prevMetadata == null) {
            return 1L;
        }
        return prevMetadata + 1L;
    }

    @Override
    public void success(long txid) {
        LOG.info("BatchCoordinator.success(txid={})", txid);
    }

    @Override
    public boolean isReady(long txid) {
        // 튜플을 생성할 준비가 되면 true를 리턴한다.
        LOG.info("BatchCoordinator.isReady({})", new Object[] { txid });
        return true;
    }

    @Override
    public void close() {
        LOG.info("BatchCoordinator.close()");
    }
}

BatchCoordinator 인터페이스의 각 메서드는 다음과 같은 기능을 제공해야 한다.

 메서드

설명 

 isReady

트랜잭션ID에 해당하는 튜플을 생성할 준비가 되었으면 true를 리턴한다.

 initializeTransaction

튜플을 발생하기 전에 현재 트랜잭션ID를 위한 메타데이터를 생성한다. 각 파라미터는 다음과 같다.

  • txid: 현재 트랜잭션 ID
  • prevMetadata: 이전 트랜잭션의 메타 데이터
  • currMetadata: 현재 트랜잭션을 처음 시도하면 null. 현재 트랜잭션을 재시도하는 것이면, 앞서 최초 시도할 때 생성한 메타 데이터.
 success

 트랜잭션ID에 해당하는 튜플들을 성공적으로 처리하면 호출된다.

 close 토폴로지를 종료할 때 호출된다.


Emitter 인터페이스의 구현 예는 다음과 같다.


public class LogEmitter implements Emitter<Long> {

    private static final Logger LOG = LoggerFactory.getLogger(LogEmitter.class);


    @Override

    public void emitBatch(TransactionAttempt tx, Long coordinatorMeta,

            TridentCollector collector) {

        LOG.info("Emitter.emitBatch({}, {}, collector)", tx, coordinatorMeta);

        List<String> logs = getLogs(coordinatorMeta);

        for (String log : logs) {

            List<Object> oneTuple = Arrays.<Object> asList(log);

            collector.emit(oneTuple);

        }

    }


    private List<String> getLogs(Long coordinatorMeta) {

        List<String> logs = new ArrayList<String>();

        ... // 로그를 어딘가에서 읽어와 List에 담음

        return logs;

    }


    @Override

    public void success(TransactionAttempt tx) {

        LOG.info("Emitter.success({})", tx);

    }


    @Override

    public void close() {

        LOG.info("Emitter.close()");

    }


}


Emitter 인터페이스의 각 메서드는 다음 기능을 제공한다.


메서드 

설명 

 emitBatch

트랜잭션ID에 속할 튜플을 생성한다. 각 파라미터는 다음과 같은 용도로 사용된다.

  • tx 파라미터: 트랜잭션ID와 재시도회수 제공
  • coordinatorMeta: BatchCoordinator가 생성한 메타 데이터
  • collector: 트랜잭션ID에 속하는 튜플을 토폴로지에 추가
 success

 트랜잭션에 속한 모든 튜플을 성공적으로 처리했을 때 실행된다.

 close

 토폴로지를 종료할 때 실행된다.


스톰 토폴로지를 구현할 때 어려운 부분 중 하나가 스파우트다. 스파우트의 종류에는 Transactional, Opaque transactional, non-transactional의 세 가지가 있다고 했는데, 이 중 transactional이나 opaque transactional을 구현하려면 트랜잭션ID와 트랜잭션 메타데이터를 활용해야 한다. 예를 들어, Emitter는 emitBatch()를 실행할 때 캐시에 트랜잭션ID를 키로 하고 생성한 튜플 목록을 값으로 하는 (키, 값) 쌍을 외부의 메모리 캐시에 보관할 수 있을 것이다. 이후, 튜플 처리에 실패해서 재전송을 해야 한다면 외부 메모리 캐시에서 읽어와 동일한 튜플을 재전송할 수 있을 것이다. 이 경우, success() 메서드에서 외부 메모리 캐시에서 트랜잭션ID에 해당하는 값을 삭제하도록 구현할 것이다.


연산 처리 구성 요소: Function, Filter


튜플을 생성하면 그 다음으로 할 일은 튜플을 가공해서 원하는 결과를 얻어내는 것이다. 보통은 튜플을 가공하고, 걸러내는 과정을 거친 뒤에, 집합 연산을 수행하게 된다. 이를 위해 스톰 트라이던트는 Function, Filter, Aggregator를 제공하는데, 먼저 Function과 Filter에 대해 살펴보도록 하자. 이 두 인터페이스와 관련된 상위 타입 및 하위 타입은 아래 그림과 같이 구성되어 있다.



Function과 Filter를 사용할 때 초기화가 필요하면 prepare() 메서드를 이용한다. 반대로 종료시 정리가 필요하면 cleanup() 메서드를 이용한다.


Filter 구현


Filter는 튜플을 걸러내는 기능을 제공한다. Filter 인터페이스에 정의된 isKeep() 메서드는 튜플을 계속해서 스트림에 흘려 보낼지 여부를 결정한다. 다음은 Filter 인터페이스의 구현 예다. isKeep() 메서드를 보면 튜플 데이터 중 "logString"을 읽어와 이 문자열이 "ORDER"로 시작하면 true를 리턴하고 그렇지 않으면 false를 리턴한다. 따라서, 로그 문자열 값이 "ORDER"로 시작하는 것만 토폴로지의 다음 단계에 전달된다.


public class OrderLogFilter extends BaseFilter {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(OrderLogFilter.class);


    @Override

    public boolean isKeep(TridentTuple tuple) {

        String logString = tuple.getStringByField("logString");

        boolean pass = logString.startsWith("ORDER");

        if (!pass)

            LOG.info("OrderLogFilter filtered out {}", logString);

        return pass;

    }


}


위 필터를 사용한 토폴로지 구성 코드 예는 다음과 같다. 이 코드를 보면 LogSpout에 의해 생성된 튜플을 OrderLogFilter로 먼저 필터링하고 있다. 따라서, 튜플에 포함된 "logString"의 값이 "ORDER"로 시작하는 것만 그 다음 단계인 LogParser에 전달된다.


TridentTopology topology = new TridentTopology();

topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

       ...코드생략


each() 메서드의 첫 번째 파라미터는 Filter.isKeep() 메서드에서 사용할 튜플 필드 목록을 뜻한다. 즉, 위 코드의 경우 OrderLogFilter의 isKeep() 메서드는 "logString" 필드만 사용할 수 있다. 이 예에서는 LogSpout가 [logString:값]인 튜플을 생성하기 때문에, 사용할 필드가 "logString" 밖에 없지만, 만약 여러 개의 필드를 생성한다면 new Fields("field1", "field2")처럼 여러 필드명을 지정할 수 있다.


Function 구현


Function은 튜플에 새로운 데이터를 추가할 때 사용한다. Function 인터페이스는 튜플을 가공하기 위한 execute() 메서드를 정의하고 있으며, 이 메서드에서 튜플을 알맞게 가공하면 된다. 다음은 Function 인터페이스의 구현 예를 보여주고 있다.


public class LogParser extends BaseFunction {


    private static final long serialVersionUID = 1L;


    @Override

    public void execute(TridentTuple tuple, TridentCollector collector) {

        String log = (String) tuple.getStringByField("logString");

        ShopLog event = parseLog(log);

        collector.emit(Arrays.<Object>asList(event));

    }


    private ShopLog parseLog(String log) {

        String[] tokens = log.split(",");

        return new ShopLog(tokens[0], Long.parseLong(tokens[1]), Long.parseLong(tokens[2]));

    }


}


execute() 메서드는 tuple로부터 가공에 필요한 필드 값을 구한다. 구한 필드 값으로 알맞은 처리를 한 뒤, collector.emit()을 이용해서 결과로 생성할 필드 데이터를 추가한다. collector.emit() 메서드는 List<Object> 타입을 인자로 받는데, 이 때 List에 담긴 각 값이 토폴로지 정의시 지정한 추가 필드가 된다. 예를 들어, 다음과 같이 LogParser Function을 사용한 경우, 위 코드의 execute() 메서드에서 collector.emit() 메서드에서 추가한 값 목록은 아래 코드의 each() 메서드에서 세 번째 파라미터로 지정한 필드 목록에 매칭된다.


topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        ...코드생략


Function 사용시 유희할 점은 Function은 기존 튜플의 필드를 제거하지 않고, 새로운 튜플을 추가한다는 점이다. 예를 들어, 위 코드를 보면 LogSpout는 [logString:값]인 튜플을 생성한다. LogParser를 거치면 튜플은 [logString:값, shopLog:값] 이 된다. 이 상태에서 AddGroupingValueFunction을 거치면 튜플은 [logString:값, shopLog:값, productId:time:값]이 된다. each() 메서드의 첫 번째 파라미터는 이 튜플 필드 중 Function이나 Filter에 전달할 필드 목록을 지정하는 것이며, 세 번째 파라미터는 Function의 실행 결과로 추가되는 필드 목록을 지정하는 것이다.


참고로, AddGroupingValueFunction 클래스는 로그를 그룹핑할 때 사용할 필드를 추가하는 기능을 제공한다. 이 클래스는 새로운 필드로 "제품ID:기준시간" 값을 추가한다. 각 제품의 분 당 판매개수를 구할 것이기 때문에, 타임스탬프 값을 60000으로 나눈 값을 기준시간 으로 사용해 봤다.


public class AddGroupingValueFunction extends BaseFunction {


    private static final long serialVersionUID = 1L;


    @Override

    public void execute(TridentTuple tuple, TridentCollector collector) {

        ShopLog shopLog = (ShopLog) tuple.getValueByField("shopLog");

        long time = shopLog.getTimestamp() / (1000L * 60L);

        collector.emit(Arrays.<Object>asList(shopLog.getProductId() + ":" + time));

    }


}


연산처리 구성 요소: 그룹핑과 집합 연산


트라이던트로 스트림을 처리하면 많은 경우 집합 연산을 하게 된다. 예를 들어, 예제의 경우는 분당 제품 판매 개수를 구해야 하는데, 이는 "제품ID:시간"을 기준으로 판매 개수를 구하는 집합 연산을 필요로 한다. 이런 집합 연산을 처리할 수 있도록 트라이던트 API는 다음의 두 가지를 제공하고 있다.

  • 그룹 스트림: 배치에 속한 튜플들을 특정 필드를 기준으로 그룹핑한다. 이 스트림은 그룹 단위로 연산을 적용한다.
  • 집합 연산: 배치에 속한 튜플을 모아 단일 결과(튜플)를 생성한다.

이 두 기능을 사용하면 특정 필드를 기준으로 개수를 구한다거나 합을 구하는 등의 연산을 수행할 수 있다. 그룹 스트림을 생성하는 방법은 매우 간단하다. 토폴로지 정의에서 groupBy() 메서드를 사용하면 된다.


topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .groupBy(new Fields("productId:time"))

        .aggregate(new Fields("productId:time"), new Count(), new Fields("count"))

        ...생략


groupBy() 메서드는 지정한 필드를 이용해서 튜플을 그룹핑하는 GroupedStream을 리턴한다.



GroupedStream의 aggregate() 메서드를 사용하면 각 그룹별로 집합 연산을 수행하게 된다. 예를 들어, 아래 코드는 groupBy로 생성된 그룹에 대해 "productId:time" 필드를 기준으로 Count 집합 연산을 수행한다.


topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .groupBy(new Fields("productId:time"))

        .aggregate(new Fields("productId:time"), new Count(), new Fields("count"))

        ...생략


aggregate() 메서드의 세 번째 파라미터는 집합 연산 결과로 생성되는 필드를 정의한다. 위 코드의 경우 Count 집합 연산의 결과로 생성되는 필드가 한 개임을 알 수 있다. aggregate는 새로운 튜플을 생성하는데, GroupedStream의 aggregate는 첫 번째 파라미터로 지정한 필드와 세 번째 파라미터로 지정한 필드를 갖는 튜플을 생성한다. 즉, 위 코드의 경우 aggregate 메서드의 결과로 생성되는 튜플은 ["productId:time":값, "count":값] 으로 구성된다.


집합 연산 인터페이스


스톰 트라이던트 API는 집합 연산을 위한 인터페이스 세 개를 제공하며, 이들 인터페이스는 다음과 같다.



집합 연산: ReducerAggregator


ReducerAggregator 인터페이스는 다음과 같이 정의되어 있다.


public interface ReducerAggregator<T> extends Serializable {

    T init();

    T reduce(T curr, TridentTuple tuple);

}


ReducerAggregator의 동작 방식은 다음과 같다.

  1. 배치(또는 그룹)의 첫 튜플을 처리하기 전에 init()으로 초기값을 생성한다.
  2. 이 초기값을 현재값(curr)으로 설정한다.
  3. 각 튜플마다
    1. reduce(curr, 튜플)을 실행한다.
    2. 3-1의 결과를 curr에 설정한다.
  4. 마지막 튜플에 대한 reduct()의 결과값을 aggregate의 결과값으로 사용한다.
간단하게 튜플의 특정 필드의 합을 구하는 집합 연산을 ReducerAggregator로 구현해보면 다음과 같이 구할 수 있다.

public class Sum implements ReducerAggregator<Long> {
    private static final long serialVersionUID = 1L;

    @Override
    public Long init() {
        return 0L;
    }

    @Override
    public Long reduce(Long curr, TridentTuple tuple) {
        return curr + tuple.getLong(0);
    }
}


집합 연산: CombinerAggregator


CombinerAggregator 인터페이스는 다음과 같이 정의되어 있다.


public interface CombinerAggregator<T> extends Serializable {

    T init(TridentTuple tuple);

    T combine(T val1, T val2);

    T zero();

}


CombinerAggregator의 동작 방식은 다음과 같다.

  1. 배치(또는 그룹)에 튜플이 있다면, 각 튜플마다
    1. init()으로 튜플에 해당하는 값을 구한다.
    2. combine()을 이용해서 이전 combine()의 결과와 init()의 결과를 조합한 결과를 리턴한다.
      1. 첫 번째 튜플의 경우 이전 combine() 결과 값으로 zero() 값을 사용한다.

스톰은 CombinerAggregator의 구현인 Count를 제공하고 있는 Count의 코드는 다음과 같다. 배치(또는 그룹)의 튜플 개수를 셀 때 사용하는 CombinerAggregator이다.


public class Count implements CombinerAggregator<Long> {


    @Override

    public Long init(TridentTuple tuple) {

        return 1L;

    }


    @Override

    public Long combine(Long val1, Long val2) {

        return val1 + val2;

    }


    @Override

    public Long zero() {

        return 0L;

    }

    

}


집합 연산: Aggregator


Aggregator는 앞서 두 개 인터페이스보다 좀 더 범용적인 인터페이스를 정의하고 있다.


public interface Aggregator<T> extends Operation {

    T init(Object batchId, TridentCollector collector);

    void aggregate(T val, TridentTuple tuple, TridentCollector collector);

    void complete(T val, TridentCollector collector);

}


Aggregator는 다음과 같이 동작한다.

  1. 배치의 튜플을 처리하기 전에 init()을 실행하고, 집합 연산에 필요한 객체 val을 리턴한다.
  2. 각 튜플마다 aggregator 메서드를 호출한다. 첫 번째 파라미터는 1에서 리턴한 객체이다.
  3. 배치의 모든 튜플에 대한 처리가 끝나면 complete 메서드를 호출한다.
ReducerAggregator와 CombinerAggregator가 최종적으로 1개의 값을 생성하도록 제한된 인터페이스인 반면에, Aggregator는 한 개 이상의 튜플을 생성할 수 있다. 예를 들어, 다음은 배치(또는 그룹)에 있는 튜플 중에서 정렬 기준으로 N개의 튜플을 골라내는 Aggregator의 구현 코드이다. (스톰이 제공하는 클래스다.)

public static class FirstNSortedAgg extends BaseAggregator<PriorityQueue> {

    int _n;
    String _sortField;
    boolean _reverse;
    
    public FirstNSortedAgg(int n, String sortField, boolean reverse) {
        _n = n;
        _sortField = sortField;
        _reverse = reverse;
    }

    @Override
    public PriorityQueue init(Object batchId, TridentCollector collector) {
        return new PriorityQueue(_n, new Comparator<TridentTuple>() {
            @Override
            public int compare(TridentTuple t1, TridentTuple t2) {
                Comparable c1 = (Comparable) t1.getValueByField(_sortField);
                Comparable c2 = (Comparable) t2.getValueByField(_sortField);
                int ret = c1.compareTo(c2);
                if(_reverse) ret *= -1;
                return ret;
            }                
        });
    }

    @Override
    public void aggregate(PriorityQueue state, TridentTuple tuple, TridentCollector collector) {
        state.add(tuple);
    }

    @Override
    public void complete(PriorityQueue val, TridentCollector collector) {
        int total = val.size();
        for(int i=0; i<_n && i < total; i++) {
            TridentTuple t = (TridentTuple) val.remove();
            collector.emit(t);
        }
    }
}  


Aggregator를 이용해서 튜플 개수를 세는 코드는 다음과 같이 구현하고 있다.


public class CountAsAggregator extends BaseAggregator<CountAsAggregator.State> {


    static class State {

        long count = 0;

    }

    

    @Override

    public State init(Object batchId, TridentCollector collector) {

        return new State();

    }


    @Override

    public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {

        state.count++;

    }


    @Override

    public void complete(State state, TridentCollector collector) {

        collector.emit(new Values(state.count));

    }

    

}


토플로지 구성 및 로컬 실행


스파우트와 Function, Filter, 집합 연산을 구현했다면, 남은 일은 TridentTopology를 이용해서 토폴로지를 구성하고 실행하는 것이다. 아래 코드는 토폴로지를 구성하고 로컬에서 실행하는 예제 코드를 보여주고 있다.


public class LogTopology {


    public static void main(String[] args) {

        TridentTopology topology = new TridentTopology();

        topology.newStream("log", new LogSpout())

                .each(new Fields("logString"), new OrderLogFilter())

                .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

                .each(new Fields("shopLog"), 

                         new AddGroupingValueFunction(), new Fields("productId:time"))

                .groupBy(new Fields("productId:time"))

                .aggregate(new Fields("productId:time"), new Count(), new Fields("count"))

                .each(new Fields("productId:time", "count"), 

                         new CountSumFunction(), new Fields("sum"))

                .each(new Fields("productId:time", "sum"), new ThresholdFilter())

                .each(new Fields("productId:time", "sum"), new AlertFilter());

        StormTopology stormTopology = topology.build();


        Config conf = new Config();

        conf.put("ThresholdFilter.value", 5L);

        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology("cdc", conf, stormTopology);


        try {

            Thread.sleep(60000);

        } catch (InterruptedException e) {

        }

        cluster.shutdown();

    }


}


글에서 보여주지 않은 코드인 CountSumFunction, ThresholdFilter, AlertFilter 등은 참고자료에 있는 예제 코드 링크를 참고하기 바란다.


참고자료

  • 예제 코드: https://github.com/madvirus/storm-sample





Posted by 최범균 madvirus

댓글을 달아 주세요

  1. 96480XX 2014.10.01 15:10 신고  댓글주소  수정/삭제  댓글쓰기

    내가 아는 사람중에
    뭐 이렇게 좋은 내용의 블로그를 운영하는 사람이 있었다니 ...