주요글: 도커 시작하기
반응형

스트림으로 들어오는 튜플을 단순히 처리만하고 끝나는 경우는 드물다. 보통은 처리 결과를 최종 또는 중간 단계 어딘가에 저장하게 된다. 예를 들어, 10분당 판매량이 100개가 넘는 상품을 통지해주는 스톰 토폴로지를 만들었다면, 토폴로지를 이용해서 계산한 분당 판매량 데이터를 HBase나 MySQL과 같은 데이터베이스에서 보관하길 원하는데, 이렇게 해야 스톰에 장애가 발생했을 때, 그 이전에 계산한 값을 날리지 않을 수 있으며, 또한, 이렇게 계산한 데이터를 실시간 통계 데이터로 활용할 수 있기 때문이다.


이전글:


상태 추상화


Filter나 Function을 이용해서 계산한 결과 상태를 DB에 보관하는 기능을 구현할 수 있지만, 트라이던트 API는 일관된 방법으로 상태를 처리할 수 있도록 상태를 위한 API를 제공하고 있다. 트라이던트 API는 상태 보관을 위해 다음의 세 가지 인터페이스를 정의하고 있다.

  • State: 상태를 표현
  • StateFactory: State를 생성해주는 팩토리
  • StateUpdater: 배치 단위로 튜플을 State 객체에 반영

트라이던트 API는 위 세 가지 인터페이를 이용해서 상태를 관리한다. 즉, 개발자는 위 세 개의 인터페이스에 대한 구현 클래스만 알맞게 만들어주면 된다. 나머지는 트라이언트가 알아서 처리한다.


세 인터페이스는 다음과 같이 정의되어 있다.


public interface State {

    void beginCommit(Long txid);

    void commit(Long txid);

}


public interface StateFactory extends Serializable {

    State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions);

}


public interface StateUpdater<S extends State> extends Operation {

    void updateState(S state, List<TridentTuple> tuples, TridentCollector collector);

}


스톰은 내부적으로 다음과 같은 코드를 사용해서 튜플을 상태에 반영한다.


// 배치 시작을 상태에 알림

state.beginCommit(txid);


// 배치에 속한 튜플 단위로 상태에 반영하고, collector를 이용해서 새로운 튜플 발생

stateUpdater.updateState(state, tuples, collector)


// 배치 종료를 상태에 알림

state.commit();


스톰은 배치 단위로 튜플을 상태에 반영한다. 배치가 시작되면 이를 State에 알리고(beginCommit 실행), StateUpdater를 이용해서 배치에 속한 튜플을 State에 반영한다(updateState 실행). 배치에 속한 튜플을 모두 State에 반영하면 배치가 끝났음을 State에 알린다.(commit 실행)


위 코드에서 알 수 있는 건, State와 StateUpdater는 쌍을 이룬다는 점이다. 즉, 특정 타입의 State에 상태를 반영하려면 그 State에 알맞은 StateUpdater가 필요하다. 실제로 Stream의 partitionPersist() 메서드를 보면 다음과 같이 특정 타입의 State 객체를 생성하는 StateFactory와 그 State의 상태를 갱신할 때 사용될 StateUpdater를 함께 파라미터로 전달받는 것을 알 수 있다.


partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater, Fields functionFields)

 


[State와 StateUpdater는 쌍을 이룬다.]


스톰은 일부 State 구현에 대해 이미 쌍을 우리는 StateUpdater 구현을 제공하고 있다. (위 그림에서 MapState와 MapCombinerAggStateUpdater가 스톰이 제공하는 인터페이스/클래스이다.)


State 만들어보기


Stream이 제공하는 메서드는 partitionPersist()인데, 이 이름에서 알 수 있듯이 스트림은 기본적으로 파티션 단위로 상태값을 보관하는 간단한 State 구현을 만들어보자. 이를 통해 State의 동작 방식에 대한 약간의 감을 잡을 수 있을 것이다. 여기서 만들 예제는 중복되지 않은 "제품ID:시간"을 키로 하는 이벤트 발생 여부를 보관하는 State 구현이다.


State 구현은 다음과 같이 간단하게 구현했다.


public class DistinctState implements State {


    private Map<String, ShopLog> map = new ConcurrentHashMap<>();

    private Map<String, ShopLog> temp = new ConcurrentHashMap<>();


    @Override

    public void beginCommit(Long txid) {

        temp.clear();

    }


    @Override

    public void commit(Long txid) {

        for (Map.Entry<String, ShopLog> entry : temp.entrySet())

            map.put(entry.getKey(), entry.getValue());

    }


    public boolean hasKey(String key) {

        return map.containsKey(key) || temp.containsKey(key);

    }


    public void put(String key, ShopLog value) {

        temp.put(key, value);

    }

}


위 코드에서 beginCommmit() 메서드는 temp 맵을 초기화한다. 그리고, commit() 메서드는 temp에 담겨 있는 모든 내용을 map에 복사한다. 여기서는 메모리 맵은 map 필드에 내용을 복사했지만, 실제 구현에서는 외부의 캐시 서버나 DBMS 등에 내용을 복사하도록 구현하게 된다. 즉, 한 배치가 시작될 때 beginCommit() 메서드를 통해 배치에 속한 튜플의 처리 준비를 하고, 한 배치가 끝날 때 commit() 메서드를 통해 배치에 포함된 튜플의 처리 결과를 상태를 보관할 저장소에 반영하게 된다.


배치의 시작과 끝 사이에 사용되는 메서드가 hasKey() 메서드와 put() 메서드이다. 이 두 메서드는 StateUpdater에 의해 사용되는데, DistictState를 위한 StateUpdater 구현 클래스는 다음과 같다.


public class DistinctStateUpdater extends BaseStateUpdater<DistinctState> {


    @Override

    public void updateState(

                DistinctState state, List<TridentTuple> tuples, TridentCollector collector) {

        List<TridentTuple> newEntries = new ArrayList<>();

        for (TridentTuple t : tuples) {

            String key = t.getStringByField("productId:time");

            if (!state.hasKey(key)) {

                state.put(key, (ShopLog) t.getValueByField("shopLog"));

                newEntries.add(t);

            }

        }

        for (TridentTuple t : newEntries) {

            collector.emit( // 상태로부터 새로운 스트림을 생성

                Arrays.asList(t.getValueByField("shopLog"), t.getStringByField("productId:time"))

            );

        }

    }

} 


StateUpdater 구현 클래스는 BaseStateUpdater 클래스를 상속받은 뒤 updateState() 메서드를 알맞게 구현해주면 된다. updateState() 메서드는 첫 번째 파라미터로 상태를 표현하는 객체를, 두 번째 파라미터로 배치에 속한 튜플을, 세 번째 파라미터는 튜플을 상태에 반영한 후 새로운 튜플을 생성할 때 사용할 TridentCollector이다.


updteState() 메서드를 보면 DistinctState의 hasKey() 메서드를 이용해서 처리하려는 튜플의 key 값이 이미 DistinctState에 보관한 key인지 확인한다. 아직 보관전인 key를 가진 튜플이면, put() 메서드를 이용해서 DistinctState에 반영하고, newEntries에 보관한다.


updateState() 메서드는 배치에 포함된 모든 튜플을 상태에 반영한 뒤, collector를 이용해서 newEntries에 보관된 튜플을 재발생시킨다. 이렇게 함으로써 상태 처리 상태로부터 새로운 튜플을 만들어내는 스트림이 만들어지게 된다.


마지막으로 구현해야 할 클래스는 StateFactory 인터페이스의 구현 클래스이다. 이 클래스의 구현은 다음과 같이 간단하다.


public class DistinctStateFactory implements StateFactory {

    private static final long serialVersionUID = 1L;


    @SuppressWarnings("rawtypes")

    @Override

    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

        return new DistinctState();

    }

}


이제 이렇게 만든 상태 관련 클래스를 실제 트라이던트 토폴로지에 적용해보자.


TridentTopology topology = new TridentTopology();

topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .partitionPersist(

                new DistinctStateFactory()

                new Fields("shopLog", "productId:time"), // StateUpdater에 전달될 튜플의 필드 목록

                new DistinctStateUpdater()

                new Fields("shopLog", "productId:time")) // StateUpdater가 생성하는 튜플 필드 목록

        .newValuesStream() // StateUpdater에서 생성한 튜플로 구성된 스트림

        .each(new Fields("shopLog", "productId:time"), Util.printer());

...


partionPersist() 메서드는 파티션 별로 State를 생성하고 튜플을 상태에 반영하는 기능을 제공한다. 위 코드의 경우 StateFactory로 DisctinctStateFactory를 사용했으므로 파티션 별로 상태 보관을 위해 DistinctState 객체가 생성된다.


partitionPersist() 메서드는 TridentState 객체를 리턴하는데, 이 객체의 newValueStream() 메서드는 DistinctStateUpdater에서 collector를 이용해서 생성한 튜플을 스트림으로 제공하는 새로운 Stream을 리턴한다. 따라서, 위 코드에서 마지막 each() 메서드는 DistinctStateUpdater가 생성한 튜플을 입력으로 받아서 그 내용을 콘솔에 출력하게 된다.


상태와 튜플 재처리 방식


튜플로부터 상태를 보관할 때 고민해야 할 점이 있다면, 그것은 바로 재처리와 관련된 것이다. 아래 그림을 보자. 아 그림에서 persistentAggregate 연산은 앞의 groupBy로 그룹핑 된 튜플의 개수를 적재하는 State를 처리해준다고 해 보자.




State가 키가 "0:123"인 튜플의 개수로 3을 갖고 있었다고 하자. 즉, {"0:123": 3 }을 State가 상태로 보관하고 있었다고 하자. 이 상태에서 위 그림의 윗 부분의 실행되었다고 하자. 이 때 스파우트가 발생한 "0:123"인 튜플의 개수가 4라면, persistenAggregate를 거친 뒤에 상태 값은 {"0:123": 7}이 될 것이다. 그런데, AlertFunction 연산에서 실패가 나는 바람에 스톰이 재처리를 수행했다고 해 보자. 그러면, 위 그림의 아래 부분처럼 튜플을 다시 발생시킬 거고, 그렇게 되면 persistenceAggregate는 이미 {"0:123": 7}을 상태 값으로 갖고 있었기 때문에, 재발송된 튜플 개수 4개가 더해지면서 상태 값이 {"0:123", 11}로 바뀌게 될 것이다.


사실, 여기서 동일한 튜플이 다시 발생한 것이므로 최종 상태 값은 {"0:123", 7}이어야 한다. 그런데, 상태가 튜플의 재발생 여부를 제대로 처리하지 못하게 되면, 같은 튜플이 상태에 중복해서 적용되는 문제가 발생하는 것이다.


이렇게 튜플을 다시 발생했을 때 동일 튜플이 State에 중복해서 적용되는 문제를 처리하기 위해 스톰은 세 가지 타입의 State 구현 객체를 제공하고 있다.



State

설명

TransactionalMap

스파우트가 Transactional인 경우(즉 한 배치에 대해 동일한 튜플을 생성할 경우), 동일 튜플에 대해 동일 상태를 보장한다.

OpaqueMap

스파우트가 Opaque Transactional인 경우(즉 한 튜플은 한 배치에만 속하지만 한 배치에 대해 매번 동일한 튜플이 발생하지 않을수도 있는 경우), 이전 상태 값을 기준으로 상태를 갱신한다.

NonTransactionalMap스파우트가 Non Transactional인 경우(즉 튜플이 특정 배치에만 속한다고 가정할 수 없는 경우), 튜플 재생성에 따른 롤백을 지원하지 않는다.


TransactionalMap의 동작 방식


TransactionalMap은 상태를 보관할 때 (트랜잭션ID, {키: 값})의 구조를 사용한다. 예를 들어, 배치 단위로 그룹바이-카운트를 수행한 결과 아래 표의 왼쪽/중간 컬럼과 같이 (키, 개수)가 생성되었다고 해 보자. 이 경우 표의 우측 컬럼과 같이 상태 값이 갱신된다.


 배치(트랜잭션) ID

 튜플 연산 결과 (키, 개수) 값

상태

 1

{ "0:123": 4 }

(1, {"0:123": 4} ) 

 2

{ "0:123": 5 } 

(2, {"0:123": 9} )

 3

{ "0:123": 3 } 

(3, {"0:123": 12} ) 

 3 (재발생)

{ "0:123": 3 }  
Transactional 스파우트는 동일한 튜플을 발생

(3, {"0:123": 12} )  
(기존 값 그대로 유지)

위 표에서 트랜잭션ID가 3에 해당하는 배치의 튜플이 다시 발생했다. 이 스파우트는 Transactional 타입이므로 같은 배치 ID에 대해 동일한 튜플을 발생시키므로, 튜플의 그룹바이-카운트 연산 결과는 위 표의 가운데 컬럼에서 보는 것처럼 동일하다. 따라서, TransactionalMap은 현재 상태와 동일한 트랜잭션 ID에 해당하는 결과 값이 들어올 경우, 상태에 반영하지 않고 유지하면 된다.


OpaqueMap의 동작 방식


OpaqueMap은 상태를 보관할 때 (트랜잭션ID, 키, 이전값, 현재값)의 구조를 사용한다. 앞의 TransactionalMap과 유사하게 3번 트랜잭션ID에 해당하는 배치를 재처리했다고 해 보자.


 배치(트랜잭션) ID

 튜플 연산 결과 (키, 개수) 값

상태

 1

{ "0:123": 4 }

(1, "0:123", null, 4 ) 

 2

{ "0:123": 5 } 

(2, "0:123", 4 9 )

 3

{ "0:123": 3 } 

(3, "0:123", 9, 12 ) 

 3 (재발생)

{ "0:123": 5 }  

Opaque 스파우트는 발생한 튜플이 달라질 수 있음

(3, "0:123", 9, 14 )  

위 표에서 OpaqueMap은 상태에 값을 보관할 때, 현재 처리한 트랜잭션ID와 이전 값, 현재 값을 함께 보관한다. 만약 특정 문제가 발생해서 스파우트가 특정 트랜잭션 ID에 해당하는 튜플을 재발생한 경우, Opaque 스파우트는 위 표의 중간 컬럼처럼 다른 결과를 만들어낼 수 있다. 따라서 상태는 새롭게 계산된 결과와 이전 상태 값을 이용해서 현재 상태 값을 갱신해주어야 한다. 위 표에서는 3번 트랜잭션ID의 재발생 시점에 이전 상태 값이 {"0:123": 9}이므로, 현재 상태 값은 {"0:123": 9 + 5}인 {"0:123": 14}로 바뀐다.


OpaqueMap 구현 방법: IBackingMap, OpaqueValue 사용


OpaqueMap은 내부적으로 IBackingMap 인터페이스와 OpaqueValue 클래스를 사용한다. OpaqueMap은 상태 데이터를 처리하기 위해 OpaqueValue 클래스를 사용하고, 실제 저장소로부터 상태 값을 조회하거나 저장소에 상태 값을 반영하기 위해 IBackingMap 구현체를 사용한다. 실제 OpaqueMap 인스턴스를 생성하는 메서드는 정적 팩토리 메서드로 다음과 같이 정의되어 있다.


public class OpaqueMap<T> implements MapState<T> {

    public static <T> MapState<T> build(IBackingMap<OpaqueValue> backing) {

        return new OpaqueMap<T>(backing);

    }


실제로 OpaqueMap을 사용하기 위해서 필요한 건 다음과 같다.

  • 상태 보관을 위해 IBackingMap 인터페이스를 알맞게 구현한 클래스를 만든다.
  • 앞서 만든 IBackingMap 구현 객체를 이용해서 생성한 OpaqueMap을 리턴해주는 StateFactory를 구현한다.
  • 집합 연산을 구현한다.
  • GroupedStream의 persistenceAggregate() 메서드를 이용해서 그룹핑 연산 결과를 상태에 반영한다.
    • Stream의 partitionPersist()를 사용할 경우, 알맞은 StateUpdater를 사용한다.

OpaqueMap용 "키:개수" 상태 보관하기 위한 IBackingMap 인터페이스를 구현한 클래스는 다음 코드처럼 만들어 볼 수 있다.


public class CountSumBackingMap implements IBackingMap<OpaqueValue> {

    private Map<String, CountValue> sumMap = new ConcurrentHashMap<>();


    @Override

    public List<OpaqueValue> multiGet(List<List<Object>> keys) {

        List<OpaqueValue> values = new ArrayList<>();

        for (List<Object> keyVals : keys) {

            String key = (String) keyVals.get(0);

            CountValue countValue = sumMap.get(key);

            if (countValue == null) {

                values.add(null);

            } else {

                values.add(new OpaqueValue<Long>(countValue.getCurrentTxId(),

                                 countValue.getCurrCount(), countValue.getPrevCount()));

            }

        }

        return values;

    }


    @Override

    public void multiPut(List<List<Object>> keys, List<OpaqueValue> vals) {

        for (int i = 0; i < keys.size(); i++) {

            String key = (String) keys.get(i).get(0);

            OpaqueValue<Long> val = vals.get(i);


            CountValue countValue = sumMap.get(key);

            if (countValue == null) {

                sumMap.put(key, new CountValue(val.getCurrTxid(), val.getPrev(), val.getCurr()));

            } else {

                countValue.update(val.getCurrTxid(), val.getPrev(), val.getCurr());

            }

        }

    }


}


IBackingMap을 구현하는 클래스는 multiGet() 메서드와 multiPut() 메서드 두 개만 알맞게 구현해주면 된다. 먼저, multiGet() 메서드는 파라미터로 키 목록을 전달받으며, 이 키에 해당하는 OpaqueValue 목록을 리턴해주면 된다. 위 코드의 multiGet() 메서드 메모리 맵인 sumMap에서 키에 해당하는 CountValue 객체를 가져온 뒤, 그 CountValue 객체로부터 OpaqueValue를 생성하고 있다.


CountValue는 (currTxId, prevVal, currVal)을 값으로 갖고 있으므로, sumMap은 결과적으로 (currTxId, key, prevVal, currVal)을 보관하는 장소가 된다. 이 예에서는 메모리 맵을 사용했지만, 실제로는 외부의 DBMS나 NoSQL 같은 곳에 보관된 데이터를 보관하고 읽어올 것이다.


OpaqueValue는 OpaqueMap의 동작 방식에서 설명했던 데이터 구조 중 키를 제외한 (트랜잭션ID, 이전값, 현재값)을 값으로 갖는다. 위 코드에서도 CountValue로부터 OpaqueValue를 생성할 때 트랜잭션ID, 이전값, 현재값을 가져와 생성했다.


스톰은 그룹 연산을 수행할 때 OpaqueMap은 내부적으로 다음과 같은 과정을 거친다.

  • IBackingMap의 multiGet()을 이용해서 (키: OpaqueValue) 목록을 구한다.
  • 앞서 구한 그룹 연산의 결과를 반영한 새로운 (키: OpaqueValue) 목록을 생성하고
  • 그 결과를 IBackingMap의 multiPut()을 이용해서 반영한다.

즉, multiPut() 메서드는 파라미터로 전달받은 (키: OpaqueValue) 목록을 이용해서 그 결과를 저장소에 반영하면 된다. 위 코드는 메모리 맵은 sumMap에 반영했지만, 실제 구현에서는 HBase나 MySQL과 같은 외부 저장소에 반영하게 될 것이다. 연산 결과를 반영할 때 주의할 점은 OpaqueMap이 제대로 동작하도록 (트랜잭션ID, 키, 이전값, 현재값)이 올바르게 적용되어야 한다는 점이다.


IBackingMap을 구현했다면 상태 생성을 위한 StateFactory를 구현하는 것이다. OpaqueMap을 위한 StateFactory는 다음과 같이 간단하게 구현할 수 있다.


public class CountSumStateFactory implements StateFactory {


    @SuppressWarnings("rawtypes")

    @Override

    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

        return OpaqueMap.build(new CountSumBackingMap());

    }


}


키 별로 개수를 구하는 집합 연산을 다음과 같이 구현해 볼 수 있다.


public class CountReducerAggregator implements ReducerAggregator<Long> {


    private static final long serialVersionUID = 1L;


    @Override

    public Long init() {

        return 0L;

    }


    @Override

    public Long reduce(Long curr, TridentTuple tuple) {

        return curr + 1L;

    }


}


이제 필요한 것은 다 만들었으므로, 다음과 같이 persistentAggregate() 메서드를 이용해서 그룹핑 연산 결과를 상태에 보관할 수 있게 된다.


TridentTopology topology = new TridentTopology();

topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .groupBy(new Fields("productId:time"))

        .persistentAggregate(

                new CountSumStateFactory(), // 상태 팩토리

                new CountReducerAggregator(), // 집합 연산

                new Fields("sum")) // 집합 연산 결과 필드

        .newValuesStream() // (productId:time, sum) 튜플 생성

        .each(new Fields("productId:time", "sum"), new ThresholdFilter())

        .each(new Fields("productId:time", "sum"), new AlertFilter())


그룹핑 연산이므로 그룹핑 기준 튜플의 값이 키가 되고, 집한 연산 결과가 값이 된다. 즉, "productId:time" 필드 값이 키가 되고 집합 연산 결과인 "sum" 필드가 값이 된다. 위 코드에서 한 가지 빠진 것이 있는데, 그것은 바로 StateUpdater이다. 앞서 partionPersist() 예제에서는 StateUpdater를 사용했는데, 위 예에서는 StateUpdater를 따로 구현하지 않고 있다. StateUpdater를 지정하지 않은 이유는 persistentAggregate() 메서드가 내부적으로 OpaqueMap에 알맞은 StateUpdater 구현체를 사용하고 있기 때문이다.


TransactionalMap 구현 방법: IBackingMap, TransactionalValue 사용


TransactionalMap을 사용하는 방법은 OpaqueMap을 사용하는 방법과 거의 동일하다. 둘 다 알맞은 IBackingMap 구현 클래스만 제공해주면 된다. 차이점이 있다면 TransactionalMap은 OpaqueValue 대신에 TransactionalMap을 사용한다는 것이다. 다음은 TransactionalMap을 위한 IBackingMap의 구현 예를 보여주고 있다.


public class CountSumBackingMap2 implements IBackingMap<TransactionalValue<Long>> {

    private Map<String, CountValue2> sumMap = new ConcurrentHashMap<>();


    @Override

    public List<TransactionalValue<Long>> multiGet(List<List<Object>> keys) {

        List<TransactionalValue<Long>> values = new ArrayList<>();

        for (List<Object> keyVals : keys) {

            String key = (String) keyVals.get(0);

            CountValue2 countValue = sumMap.get(key);

            if (countValue == null) {

                values.add(null);

            } else {

                values.add(

                    new TransactionalValue<Long>(countValue.getTxId(), countValue.getCount()));

            }

        }

        return values;

    }


    @Override

    public void multiPut(List<List<Object>> keys, List<TransactionalValue<Long>> vals) {

        for (int i = 0; i < keys.size(); i++) {

            String key = (String) keys.get(i).get(0);

            TransactionalValue<Long> val = vals.get(i);


            CountValue2 countValue = sumMap.get(key);

            if (countValue == null) {

                sumMap.put(key, new CountValue2(val.getTxid(), val.getVal()));

            } else {

                countValue.update(val.getTxid(), val.getVal());

            }

        }


    }


}


StateFactory는 동일한 방식으로 구현한다.


public class CountSumStateFactory2 implements StateFactory {


    private static final long serialVersionUID = 1L;


    @SuppressWarnings("rawtypes")

    @Override

    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

        return TransactionalMap.build(new CountSumBackingMap2());

    }


}


스톰 트라이던트 상태 구현 예

  • Memcached를 위한 State 구현: https://github.com/nathanmarz/trident-memcached
  • 소스 코드: https://github.com/madvirus/storm-sample


+ Recent posts