주요글: 도커 시작하기
반응형

아파치 스톰(Storm) 프로젝트는 실시간 데이터 분석을 위한 기반 플랫폼이다. 스톰을 이용하면 데이터를 병렬로 가공할 수 있는 클러스터를 구축할 수 있으며, 이 클러스터를 이용해서 실시간으로 발생하는 데이터를 가공하고 분석하는 작업을 수행할 수 있다. 스톰에 대한 소개는 이전에 올렸던 'Storm 훑어보기'(http://javacan.tistory.com/324) 자료로 대신하고, 본 글에선 스톰 트라이던트의 연산 처리와 관련된 API의 사용 방법에 대해 살펴볼 것이다.


만들 예제


본 글에서는 다음과 같은 간단한 예제를 만들어보면서 스톰 트라이던트 API의 연산 처리와 관련된 API를 알아볼 것이다.

  • 실시간 로그를 읽어와,
  • 로그가 판매 로그면,
  • 로그를 파싱하고,
  • 1분당 판매 개수가 50개를 넘기면 상품이 있으면,
  • 통지한다.

스톰 트라이던트 프로그래밍의 구성 요소


스톰 클러스터를 이용해서 데이터를 처리하려면 다음의 세 가지 종류의 코드를 작성해야 한다.

  • 트라이던트 스파우트(Spout): 튜플을 발생시킨다. Kafka와 같은 서버에서 메시지를 읽어와 튜플로 변환해서 스톰 스트림에 넣어준다.
  • 연산 처리
    • Function: 튜플을 입력받아 튜플을 생성한다.
    • Filter: 튜플을 입력받아 연산을 계속할지 여부를 결정한다.
    • 그룹핑: 튜플을 특정 필드를 이용해서그룹핑한다.
  • 토폴로지(Topology): 스파우트와 연산 구성 요소를 연결하고 병렬 처리 등을 설정한다.
여기서 만들 토폴로지 구성은 아래 그림과 같다.


스파우트는 로그를 튜플로 생성한다. 튜플(tuple)은 데이터를 보관하는 단위로서, Function과 Filter는 한 개의 튜플에 대해 연산을 수행한다. 여기서 만들 스파우트는 단순히 로그 문자열을 튜플에 담아 스트림에 넣는다.


모든 로그가 아닌 주문 로그만 처리하면 되므로, 주문 로그인지 여부를 확인하는 필터를 맨 앞에 위치시킨다. 필터는 로그가 주문 필터인 경우에만 튜플을 스트림의 다음 단계로 전달한다. 이후 과정에서는 로그 문자열을 파싱해서 ShopLog라는 객체로 변환하고, 그룹핑 기준으로 사용할 '제품ID:시간'을 튜플에 추가한다.


'제품ID:시간'으로 튜플을 그룹핑 한뒤 뒤, 각 '제품ID:시간' 별로 개수를 구한다. 그리고, '제품ID:시간' 별 개수를 누적하고(기 별 개수 합 함수), 각 누적 개수가 임계값에 도달하면 통지 필터에 해당 튜플을 전달한다.


트라이던트 스파우트 구현


트라이던트 스파우트(이하 스파우트)는 기본적으로 배치 단위로 튜플을 생성한다. 다음 글에서 더 자세히 다루겠지만, 스파우트가 생성하는 각 튜플은 한 트랜잭션 ID(배치)에 포함된다. 예를 들어, 스파우트는 한 트랜잭션ID에 해대 튜플을 5개씩 생성할 수 있고, 이 때 스트림은 한 번에 5개의 튜플을 처리하게 된다. 만약 토폴로지에서 튜플을 처리하는 도중 에러가 발생하면, 스톰은 특졍 트랜잭션 ID에 해당하는 튜플을 다시 생성하도록 스파우트에 요청한다.


스파우트가 데이터를 어떻게 생성해주냐에 따라 스파우트를 크게 세 가지 종류로 구분할 수 있다.

  • Transactional: 한 트랜잭션 ID에 대해 매 번 동일한 튜플 묶음을 생성함을 보장한다. 한 트랜잭션 ID에 속한 튜플이 다른 트랜잭션ID에 포함되지 않는다.
  • Opaque-transactional: 한 튜플은 한 트랜잭션ID에만 포함된다. 단, 한 트랜잭션ID에 대해 매 번 동일한 튜플 묶음을 생성함을 보장하진 않는다. 
  • Non-transactional: 한 튜플이 한 트랜잭션ID에만 포함된다는 것을 보장하지 않는다.

트라인던트 스파우트를 구현하려면 스톰이 제공하는 인터페이스를 상속받아야 한다. 트라이던트 스파우트 구현을 위한 몇 가지 종류의 인터페이스가 존재하는데, 본 글에서는 그 중에서 ITridentSpout 인터페이스를 이용한 구현 방법에 대해 살펴볼 것이다.


ITridentSpout 인터페이스를 이용한 스파우트 구현


ITridentSpout 인터페이스를 이용해서 스파우트를 구현하려면, 다음의 세 인터페이스에 대한 이해가 필요하다.



Emitter, BatchCoordinator, ITridentSpout는 각각 다음과 같은 역할을 한다.

  • Emitter: 특정 트랜잭션에 속할 튜플을 생성한다.
  • BatchCoordinator: 특정 트랜잭션 ID에 해당하는 메타 데이터를 생성하고, 튜플 생성을 준비한다.
  • ITridentSpout: 스톰에 Emitter 객체와 BatchCoordinator 객체를 제공한다. 또한, Emitter가 생성할 튜플의 필드 정보를 정의한다.
스톰이 사용할 트라이던트 스파우트를 제공하려면 위 세 개의 인터페이스를 알맞게 구현해 주어야 한다. 본 글에서 만들어볼 예제는 어딘가에서 일정 개수의 로그를 읽어와 튜플로 전달한다. 튜플에 포함될 필드는 1개 뿐이고 그 필드의 이름을 "log"라고 한다면, ITridentSpout 인터페이스는 다음과 같이 구현할 수 있다.

@SuppressWarnings("rawtypes")
public class LogSpout implements ITridentSpout<Long> {

    private static final long serialVersionUID = 1L;

    @Override
    public BatchCoordinator<Long> getCoordinator(
             String txStateId, Map conf, TopologyContext context) {
        return new LogBatchCoordinator();
    }

    @Override
    public Emitter<Long> getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new LogEmitter();
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("logString");
    }
}

ITrdientSpout의 타입 파라미터를 Long으로 지정했는데, 여기서 Long은 뒤에서 설명할 트랜잭션 메타 데이터의 타입이 된다. getCoordinator()와 getEmitter()는 각각 Coordinator와 Emitter를 생성하면 된다. 이 두 메서드의 txStateId는 스트림을 생성할 때 입력한 값이 사용된다.

TridentTopology topology = new TridentTopology();
topology.newStream("log", new LogSpout()) // "log"가 txStateId 값으로 사용
    .each(new Fields("logString"), new OrderLogFilter())
    ....

getCoordinator()와 getEmitter()의 conf 파라미터는 getComponentConfiguration() 메서드에서 리턴한 Map 객체가 전달된다.

다음은 BatchCoordinator 인터페이스를 구현한 클래스의 예다.

public class LogBatchCoordinator implements BatchCoordinator<Long> {
    private static final Logger LOG = LoggerFactory.getLogger(LogBatchCoordinator.class);

    @Override
    public Long initializeTransaction(long txid, Long prevMetadata, Long currMetadata) {
        if (prevMetadata == null) {
            return 1L;
        }
        return prevMetadata + 1L;
    }

    @Override
    public void success(long txid) {
        LOG.info("BatchCoordinator.success(txid={})", txid);
    }

    @Override
    public boolean isReady(long txid) {
        // 튜플을 생성할 준비가 되면 true를 리턴한다.
        LOG.info("BatchCoordinator.isReady({})", new Object[] { txid });
        return true;
    }

    @Override
    public void close() {
        LOG.info("BatchCoordinator.close()");
    }
}

BatchCoordinator 인터페이스의 각 메서드는 다음과 같은 기능을 제공해야 한다.

 메서드

설명 

 isReady

트랜잭션ID에 해당하는 튜플을 생성할 준비가 되었으면 true를 리턴한다.

 initializeTransaction

튜플을 발생하기 전에 현재 트랜잭션ID를 위한 메타데이터를 생성한다. 각 파라미터는 다음과 같다.

  • txid: 현재 트랜잭션 ID
  • prevMetadata: 이전 트랜잭션의 메타 데이터
  • currMetadata: 현재 트랜잭션을 처음 시도하면 null. 현재 트랜잭션을 재시도하는 것이면, 앞서 최초 시도할 때 생성한 메타 데이터.
 success

 트랜잭션ID에 해당하는 튜플들을 성공적으로 처리하면 호출된다.

 close 토폴로지를 종료할 때 호출된다.


Emitter 인터페이스의 구현 예는 다음과 같다.


public class LogEmitter implements Emitter<Long> {

    private static final Logger LOG = LoggerFactory.getLogger(LogEmitter.class);


    @Override

    public void emitBatch(TransactionAttempt tx, Long coordinatorMeta,

            TridentCollector collector) {

        LOG.info("Emitter.emitBatch({}, {}, collector)", tx, coordinatorMeta);

        List<String> logs = getLogs(coordinatorMeta);

        for (String log : logs) {

            List<Object> oneTuple = Arrays.<Object> asList(log);

            collector.emit(oneTuple);

        }

    }


    private List<String> getLogs(Long coordinatorMeta) {

        List<String> logs = new ArrayList<String>();

        ... // 로그를 어딘가에서 읽어와 List에 담음

        return logs;

    }


    @Override

    public void success(TransactionAttempt tx) {

        LOG.info("Emitter.success({})", tx);

    }


    @Override

    public void close() {

        LOG.info("Emitter.close()");

    }


}


Emitter 인터페이스의 각 메서드는 다음 기능을 제공한다.


메서드 

설명 

 emitBatch

트랜잭션ID에 속할 튜플을 생성한다. 각 파라미터는 다음과 같은 용도로 사용된다.

  • tx 파라미터: 트랜잭션ID와 재시도회수 제공
  • coordinatorMeta: BatchCoordinator가 생성한 메타 데이터
  • collector: 트랜잭션ID에 속하는 튜플을 토폴로지에 추가
 success

 트랜잭션에 속한 모든 튜플을 성공적으로 처리했을 때 실행된다.

 close

 토폴로지를 종료할 때 실행된다.


스톰 토폴로지를 구현할 때 어려운 부분 중 하나가 스파우트다. 스파우트의 종류에는 Transactional, Opaque transactional, non-transactional의 세 가지가 있다고 했는데, 이 중 transactional이나 opaque transactional을 구현하려면 트랜잭션ID와 트랜잭션 메타데이터를 활용해야 한다. 예를 들어, Emitter는 emitBatch()를 실행할 때 캐시에 트랜잭션ID를 키로 하고 생성한 튜플 목록을 값으로 하는 (키, 값) 쌍을 외부의 메모리 캐시에 보관할 수 있을 것이다. 이후, 튜플 처리에 실패해서 재전송을 해야 한다면 외부 메모리 캐시에서 읽어와 동일한 튜플을 재전송할 수 있을 것이다. 이 경우, success() 메서드에서 외부 메모리 캐시에서 트랜잭션ID에 해당하는 값을 삭제하도록 구현할 것이다.


연산 처리 구성 요소: Function, Filter


튜플을 생성하면 그 다음으로 할 일은 튜플을 가공해서 원하는 결과를 얻어내는 것이다. 보통은 튜플을 가공하고, 걸러내는 과정을 거친 뒤에, 집합 연산을 수행하게 된다. 이를 위해 스톰 트라이던트는 Function, Filter, Aggregator를 제공하는데, 먼저 Function과 Filter에 대해 살펴보도록 하자. 이 두 인터페이스와 관련된 상위 타입 및 하위 타입은 아래 그림과 같이 구성되어 있다.



Function과 Filter를 사용할 때 초기화가 필요하면 prepare() 메서드를 이용한다. 반대로 종료시 정리가 필요하면 cleanup() 메서드를 이용한다.


Filter 구현


Filter는 튜플을 걸러내는 기능을 제공한다. Filter 인터페이스에 정의된 isKeep() 메서드는 튜플을 계속해서 스트림에 흘려 보낼지 여부를 결정한다. 다음은 Filter 인터페이스의 구현 예다. isKeep() 메서드를 보면 튜플 데이터 중 "logString"을 읽어와 이 문자열이 "ORDER"로 시작하면 true를 리턴하고 그렇지 않으면 false를 리턴한다. 따라서, 로그 문자열 값이 "ORDER"로 시작하는 것만 토폴로지의 다음 단계에 전달된다.


public class OrderLogFilter extends BaseFilter {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(OrderLogFilter.class);


    @Override

    public boolean isKeep(TridentTuple tuple) {

        String logString = tuple.getStringByField("logString");

        boolean pass = logString.startsWith("ORDER");

        if (!pass)

            LOG.info("OrderLogFilter filtered out {}", logString);

        return pass;

    }


}


위 필터를 사용한 토폴로지 구성 코드 예는 다음과 같다. 이 코드를 보면 LogSpout에 의해 생성된 튜플을 OrderLogFilter로 먼저 필터링하고 있다. 따라서, 튜플에 포함된 "logString"의 값이 "ORDER"로 시작하는 것만 그 다음 단계인 LogParser에 전달된다.


TridentTopology topology = new TridentTopology();

topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

       ...코드생략


each() 메서드의 첫 번째 파라미터는 Filter.isKeep() 메서드에서 사용할 튜플 필드 목록을 뜻한다. 즉, 위 코드의 경우 OrderLogFilter의 isKeep() 메서드는 "logString" 필드만 사용할 수 있다. 이 예에서는 LogSpout가 [logString:값]인 튜플을 생성하기 때문에, 사용할 필드가 "logString" 밖에 없지만, 만약 여러 개의 필드를 생성한다면 new Fields("field1", "field2")처럼 여러 필드명을 지정할 수 있다.


Function 구현


Function은 튜플에 새로운 데이터를 추가할 때 사용한다. Function 인터페이스는 튜플을 가공하기 위한 execute() 메서드를 정의하고 있으며, 이 메서드에서 튜플을 알맞게 가공하면 된다. 다음은 Function 인터페이스의 구현 예를 보여주고 있다.


public class LogParser extends BaseFunction {


    private static final long serialVersionUID = 1L;


    @Override

    public void execute(TridentTuple tuple, TridentCollector collector) {

        String log = (String) tuple.getStringByField("logString");

        ShopLog event = parseLog(log);

        collector.emit(Arrays.<Object>asList(event));

    }


    private ShopLog parseLog(String log) {

        String[] tokens = log.split(",");

        return new ShopLog(tokens[0], Long.parseLong(tokens[1]), Long.parseLong(tokens[2]));

    }


}


execute() 메서드는 tuple로부터 가공에 필요한 필드 값을 구한다. 구한 필드 값으로 알맞은 처리를 한 뒤, collector.emit()을 이용해서 결과로 생성할 필드 데이터를 추가한다. collector.emit() 메서드는 List<Object> 타입을 인자로 받는데, 이 때 List에 담긴 각 값이 토폴로지 정의시 지정한 추가 필드가 된다. 예를 들어, 다음과 같이 LogParser Function을 사용한 경우, 위 코드의 execute() 메서드에서 collector.emit() 메서드에서 추가한 값 목록은 아래 코드의 each() 메서드에서 세 번째 파라미터로 지정한 필드 목록에 매칭된다.


topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        ...코드생략


Function 사용시 유희할 점은 Function은 기존 튜플의 필드를 제거하지 않고, 새로운 튜플을 추가한다는 점이다. 예를 들어, 위 코드를 보면 LogSpout는 [logString:값]인 튜플을 생성한다. LogParser를 거치면 튜플은 [logString:값, shopLog:값] 이 된다. 이 상태에서 AddGroupingValueFunction을 거치면 튜플은 [logString:값, shopLog:값, productId:time:값]이 된다. each() 메서드의 첫 번째 파라미터는 이 튜플 필드 중 Function이나 Filter에 전달할 필드 목록을 지정하는 것이며, 세 번째 파라미터는 Function의 실행 결과로 추가되는 필드 목록을 지정하는 것이다.


참고로, AddGroupingValueFunction 클래스는 로그를 그룹핑할 때 사용할 필드를 추가하는 기능을 제공한다. 이 클래스는 새로운 필드로 "제품ID:기준시간" 값을 추가한다. 각 제품의 분 당 판매개수를 구할 것이기 때문에, 타임스탬프 값을 60000으로 나눈 값을 기준시간 으로 사용해 봤다.


public class AddGroupingValueFunction extends BaseFunction {


    private static final long serialVersionUID = 1L;


    @Override

    public void execute(TridentTuple tuple, TridentCollector collector) {

        ShopLog shopLog = (ShopLog) tuple.getValueByField("shopLog");

        long time = shopLog.getTimestamp() / (1000L * 60L);

        collector.emit(Arrays.<Object>asList(shopLog.getProductId() + ":" + time));

    }


}


연산처리 구성 요소: 그룹핑과 집합 연산


트라이던트로 스트림을 처리하면 많은 경우 집합 연산을 하게 된다. 예를 들어, 예제의 경우는 분당 제품 판매 개수를 구해야 하는데, 이는 "제품ID:시간"을 기준으로 판매 개수를 구하는 집합 연산을 필요로 한다. 이런 집합 연산을 처리할 수 있도록 트라이던트 API는 다음의 두 가지를 제공하고 있다.

  • 그룹 스트림: 배치에 속한 튜플들을 특정 필드를 기준으로 그룹핑한다. 이 스트림은 그룹 단위로 연산을 적용한다.
  • 집합 연산: 배치에 속한 튜플을 모아 단일 결과(튜플)를 생성한다.

이 두 기능을 사용하면 특정 필드를 기준으로 개수를 구한다거나 합을 구하는 등의 연산을 수행할 수 있다. 그룹 스트림을 생성하는 방법은 매우 간단하다. 토폴로지 정의에서 groupBy() 메서드를 사용하면 된다.


topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .groupBy(new Fields("productId:time"))

        .aggregate(new Fields("productId:time"), new Count(), new Fields("count"))

        ...생략


groupBy() 메서드는 지정한 필드를 이용해서 튜플을 그룹핑하는 GroupedStream을 리턴한다.



GroupedStream의 aggregate() 메서드를 사용하면 각 그룹별로 집합 연산을 수행하게 된다. 예를 들어, 아래 코드는 groupBy로 생성된 그룹에 대해 "productId:time" 필드를 기준으로 Count 집합 연산을 수행한다.


topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .groupBy(new Fields("productId:time"))

        .aggregate(new Fields("productId:time"), new Count(), new Fields("count"))

        ...생략


aggregate() 메서드의 세 번째 파라미터는 집합 연산 결과로 생성되는 필드를 정의한다. 위 코드의 경우 Count 집합 연산의 결과로 생성되는 필드가 한 개임을 알 수 있다. aggregate는 새로운 튜플을 생성하는데, GroupedStream의 aggregate는 첫 번째 파라미터로 지정한 필드와 세 번째 파라미터로 지정한 필드를 갖는 튜플을 생성한다. 즉, 위 코드의 경우 aggregate 메서드의 결과로 생성되는 튜플은 ["productId:time":값, "count":값] 으로 구성된다.


집합 연산 인터페이스


스톰 트라이던트 API는 집합 연산을 위한 인터페이스 세 개를 제공하며, 이들 인터페이스는 다음과 같다.



집합 연산: ReducerAggregator


ReducerAggregator 인터페이스는 다음과 같이 정의되어 있다.


public interface ReducerAggregator<T> extends Serializable {

    T init();

    T reduce(T curr, TridentTuple tuple);

}


ReducerAggregator의 동작 방식은 다음과 같다.

  1. 배치(또는 그룹)의 첫 튜플을 처리하기 전에 init()으로 초기값을 생성한다.
  2. 이 초기값을 현재값(curr)으로 설정한다.
  3. 각 튜플마다
    1. reduce(curr, 튜플)을 실행한다.
    2. 3-1의 결과를 curr에 설정한다.
  4. 마지막 튜플에 대한 reduct()의 결과값을 aggregate의 결과값으로 사용한다.
간단하게 튜플의 특정 필드의 합을 구하는 집합 연산을 ReducerAggregator로 구현해보면 다음과 같이 구할 수 있다.

public class Sum implements ReducerAggregator<Long> {
    private static final long serialVersionUID = 1L;

    @Override
    public Long init() {
        return 0L;
    }

    @Override
    public Long reduce(Long curr, TridentTuple tuple) {
        return curr + tuple.getLong(0);
    }
}


집합 연산: CombinerAggregator


CombinerAggregator 인터페이스는 다음과 같이 정의되어 있다.


public interface CombinerAggregator<T> extends Serializable {

    T init(TridentTuple tuple);

    T combine(T val1, T val2);

    T zero();

}


CombinerAggregator의 동작 방식은 다음과 같다.

  1. 배치(또는 그룹)에 튜플이 있다면, 각 튜플마다
    1. init()으로 튜플에 해당하는 값을 구한다.
    2. combine()을 이용해서 이전 combine()의 결과와 init()의 결과를 조합한 결과를 리턴한다.
      1. 첫 번째 튜플의 경우 이전 combine() 결과 값으로 zero() 값을 사용한다.

스톰은 CombinerAggregator의 구현인 Count를 제공하고 있는 Count의 코드는 다음과 같다. 배치(또는 그룹)의 튜플 개수를 셀 때 사용하는 CombinerAggregator이다.


public class Count implements CombinerAggregator<Long> {


    @Override

    public Long init(TridentTuple tuple) {

        return 1L;

    }


    @Override

    public Long combine(Long val1, Long val2) {

        return val1 + val2;

    }


    @Override

    public Long zero() {

        return 0L;

    }

    

}


집합 연산: Aggregator


Aggregator는 앞서 두 개 인터페이스보다 좀 더 범용적인 인터페이스를 정의하고 있다.


public interface Aggregator<T> extends Operation {

    T init(Object batchId, TridentCollector collector);

    void aggregate(T val, TridentTuple tuple, TridentCollector collector);

    void complete(T val, TridentCollector collector);

}


Aggregator는 다음과 같이 동작한다.

  1. 배치의 튜플을 처리하기 전에 init()을 실행하고, 집합 연산에 필요한 객체 val을 리턴한다.
  2. 각 튜플마다 aggregator 메서드를 호출한다. 첫 번째 파라미터는 1에서 리턴한 객체이다.
  3. 배치의 모든 튜플에 대한 처리가 끝나면 complete 메서드를 호출한다.
ReducerAggregator와 CombinerAggregator가 최종적으로 1개의 값을 생성하도록 제한된 인터페이스인 반면에, Aggregator는 한 개 이상의 튜플을 생성할 수 있다. 예를 들어, 다음은 배치(또는 그룹)에 있는 튜플 중에서 정렬 기준으로 N개의 튜플을 골라내는 Aggregator의 구현 코드이다. (스톰이 제공하는 클래스다.)

public static class FirstNSortedAgg extends BaseAggregator<PriorityQueue> {

    int _n;
    String _sortField;
    boolean _reverse;
    
    public FirstNSortedAgg(int n, String sortField, boolean reverse) {
        _n = n;
        _sortField = sortField;
        _reverse = reverse;
    }

    @Override
    public PriorityQueue init(Object batchId, TridentCollector collector) {
        return new PriorityQueue(_n, new Comparator<TridentTuple>() {
            @Override
            public int compare(TridentTuple t1, TridentTuple t2) {
                Comparable c1 = (Comparable) t1.getValueByField(_sortField);
                Comparable c2 = (Comparable) t2.getValueByField(_sortField);
                int ret = c1.compareTo(c2);
                if(_reverse) ret *= -1;
                return ret;
            }                
        });
    }

    @Override
    public void aggregate(PriorityQueue state, TridentTuple tuple, TridentCollector collector) {
        state.add(tuple);
    }

    @Override
    public void complete(PriorityQueue val, TridentCollector collector) {
        int total = val.size();
        for(int i=0; i<_n && i < total; i++) {
            TridentTuple t = (TridentTuple) val.remove();
            collector.emit(t);
        }
    }
}  


Aggregator를 이용해서 튜플 개수를 세는 코드는 다음과 같이 구현하고 있다.


public class CountAsAggregator extends BaseAggregator<CountAsAggregator.State> {


    static class State {

        long count = 0;

    }

    

    @Override

    public State init(Object batchId, TridentCollector collector) {

        return new State();

    }


    @Override

    public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {

        state.count++;

    }


    @Override

    public void complete(State state, TridentCollector collector) {

        collector.emit(new Values(state.count));

    }

    

}


토플로지 구성 및 로컬 실행


스파우트와 Function, Filter, 집합 연산을 구현했다면, 남은 일은 TridentTopology를 이용해서 토폴로지를 구성하고 실행하는 것이다. 아래 코드는 토폴로지를 구성하고 로컬에서 실행하는 예제 코드를 보여주고 있다.


public class LogTopology {


    public static void main(String[] args) {

        TridentTopology topology = new TridentTopology();

        topology.newStream("log", new LogSpout())

                .each(new Fields("logString"), new OrderLogFilter())

                .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

                .each(new Fields("shopLog"), 

                         new AddGroupingValueFunction(), new Fields("productId:time"))

                .groupBy(new Fields("productId:time"))

                .aggregate(new Fields("productId:time"), new Count(), new Fields("count"))

                .each(new Fields("productId:time", "count"), 

                         new CountSumFunction(), new Fields("sum"))

                .each(new Fields("productId:time", "sum"), new ThresholdFilter())

                .each(new Fields("productId:time", "sum"), new AlertFilter());

        StormTopology stormTopology = topology.build();


        Config conf = new Config();

        conf.put("ThresholdFilter.value", 5L);

        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology("cdc", conf, stormTopology);


        try {

            Thread.sleep(60000);

        } catch (InterruptedException e) {

        }

        cluster.shutdown();

    }


}


글에서 보여주지 않은 코드인 CountSumFunction, ThresholdFilter, AlertFilter 등은 참고자료에 있는 예제 코드 링크를 참고하기 바란다.


참고자료

  • 예제 코드: https://github.com/madvirus/storm-sample





+ Recent posts