주요글: 도커 시작하기
반응형

아파치 스톰(Storm) 프로젝트는 실시간 데이터 분석을 위한 기반 플랫폼이다. 스톰을 이용하면 데이터를 병렬로 가공할 수 있는 클러스터를 구축할 수 있으며, 이 클러스터를 이용해서 실시간으로 발생하는 데이터를 가공하고 분석하는 작업을 수행할 수 있다. 스톰에 대한 소개는 이전에 올렸던 'Storm 훑어보기'(http://javacan.tistory.com/324) 자료로 대신하고, 본 글에선 스톰 트라이던트의 연산 처리와 관련된 API의 사용 방법에 대해 살펴볼 것이다.


만들 예제


본 글에서는 다음과 같은 간단한 예제를 만들어보면서 스톰 트라이던트 API의 연산 처리와 관련된 API를 알아볼 것이다.

  • 실시간 로그를 읽어와,
  • 로그가 판매 로그면,
  • 로그를 파싱하고,
  • 1분당 판매 개수가 50개를 넘기면 상품이 있으면,
  • 통지한다.

스톰 트라이던트 프로그래밍의 구성 요소


스톰 클러스터를 이용해서 데이터를 처리하려면 다음의 세 가지 종류의 코드를 작성해야 한다.

  • 트라이던트 스파우트(Spout): 튜플을 발생시킨다. Kafka와 같은 서버에서 메시지를 읽어와 튜플로 변환해서 스톰 스트림에 넣어준다.
  • 연산 처리
    • Function: 튜플을 입력받아 튜플을 생성한다.
    • Filter: 튜플을 입력받아 연산을 계속할지 여부를 결정한다.
    • 그룹핑: 튜플을 특정 필드를 이용해서그룹핑한다.
  • 토폴로지(Topology): 스파우트와 연산 구성 요소를 연결하고 병렬 처리 등을 설정한다.
여기서 만들 토폴로지 구성은 아래 그림과 같다.


스파우트는 로그를 튜플로 생성한다. 튜플(tuple)은 데이터를 보관하는 단위로서, Function과 Filter는 한 개의 튜플에 대해 연산을 수행한다. 여기서 만들 스파우트는 단순히 로그 문자열을 튜플에 담아 스트림에 넣는다.


모든 로그가 아닌 주문 로그만 처리하면 되므로, 주문 로그인지 여부를 확인하는 필터를 맨 앞에 위치시킨다. 필터는 로그가 주문 필터인 경우에만 튜플을 스트림의 다음 단계로 전달한다. 이후 과정에서는 로그 문자열을 파싱해서 ShopLog라는 객체로 변환하고, 그룹핑 기준으로 사용할 '제품ID:시간'을 튜플에 추가한다.


'제품ID:시간'으로 튜플을 그룹핑 한뒤 뒤, 각 '제품ID:시간' 별로 개수를 구한다. 그리고, '제품ID:시간' 별 개수를 누적하고(기 별 개수 합 함수), 각 누적 개수가 임계값에 도달하면 통지 필터에 해당 튜플을 전달한다.


트라이던트 스파우트 구현


트라이던트 스파우트(이하 스파우트)는 기본적으로 배치 단위로 튜플을 생성한다. 다음 글에서 더 자세히 다루겠지만, 스파우트가 생성하는 각 튜플은 한 트랜잭션 ID(배치)에 포함된다. 예를 들어, 스파우트는 한 트랜잭션ID에 해대 튜플을 5개씩 생성할 수 있고, 이 때 스트림은 한 번에 5개의 튜플을 처리하게 된다. 만약 토폴로지에서 튜플을 처리하는 도중 에러가 발생하면, 스톰은 특졍 트랜잭션 ID에 해당하는 튜플을 다시 생성하도록 스파우트에 요청한다.


스파우트가 데이터를 어떻게 생성해주냐에 따라 스파우트를 크게 세 가지 종류로 구분할 수 있다.

  • Transactional: 한 트랜잭션 ID에 대해 매 번 동일한 튜플 묶음을 생성함을 보장한다. 한 트랜잭션 ID에 속한 튜플이 다른 트랜잭션ID에 포함되지 않는다.
  • Opaque-transactional: 한 튜플은 한 트랜잭션ID에만 포함된다. 단, 한 트랜잭션ID에 대해 매 번 동일한 튜플 묶음을 생성함을 보장하진 않는다. 
  • Non-transactional: 한 튜플이 한 트랜잭션ID에만 포함된다는 것을 보장하지 않는다.

트라인던트 스파우트를 구현하려면 스톰이 제공하는 인터페이스를 상속받아야 한다. 트라이던트 스파우트 구현을 위한 몇 가지 종류의 인터페이스가 존재하는데, 본 글에서는 그 중에서 ITridentSpout 인터페이스를 이용한 구현 방법에 대해 살펴볼 것이다.


ITridentSpout 인터페이스를 이용한 스파우트 구현


ITridentSpout 인터페이스를 이용해서 스파우트를 구현하려면, 다음의 세 인터페이스에 대한 이해가 필요하다.



Emitter, BatchCoordinator, ITridentSpout는 각각 다음과 같은 역할을 한다.

  • Emitter: 특정 트랜잭션에 속할 튜플을 생성한다.
  • BatchCoordinator: 특정 트랜잭션 ID에 해당하는 메타 데이터를 생성하고, 튜플 생성을 준비한다.
  • ITridentSpout: 스톰에 Emitter 객체와 BatchCoordinator 객체를 제공한다. 또한, Emitter가 생성할 튜플의 필드 정보를 정의한다.
스톰이 사용할 트라이던트 스파우트를 제공하려면 위 세 개의 인터페이스를 알맞게 구현해 주어야 한다. 본 글에서 만들어볼 예제는 어딘가에서 일정 개수의 로그를 읽어와 튜플로 전달한다. 튜플에 포함될 필드는 1개 뿐이고 그 필드의 이름을 "log"라고 한다면, ITridentSpout 인터페이스는 다음과 같이 구현할 수 있다.

@SuppressWarnings("rawtypes")
public class LogSpout implements ITridentSpout<Long> {

    private static final long serialVersionUID = 1L;

    @Override
    public BatchCoordinator<Long> getCoordinator(
             String txStateId, Map conf, TopologyContext context) {
        return new LogBatchCoordinator();
    }

    @Override
    public Emitter<Long> getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new LogEmitter();
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("logString");
    }
}

ITrdientSpout의 타입 파라미터를 Long으로 지정했는데, 여기서 Long은 뒤에서 설명할 트랜잭션 메타 데이터의 타입이 된다. getCoordinator()와 getEmitter()는 각각 Coordinator와 Emitter를 생성하면 된다. 이 두 메서드의 txStateId는 스트림을 생성할 때 입력한 값이 사용된다.

TridentTopology topology = new TridentTopology();
topology.newStream("log", new LogSpout()) // "log"가 txStateId 값으로 사용
    .each(new Fields("logString"), new OrderLogFilter())
    ....

getCoordinator()와 getEmitter()의 conf 파라미터는 getComponentConfiguration() 메서드에서 리턴한 Map 객체가 전달된다.

다음은 BatchCoordinator 인터페이스를 구현한 클래스의 예다.

public class LogBatchCoordinator implements BatchCoordinator<Long> {
    private static final Logger LOG = LoggerFactory.getLogger(LogBatchCoordinator.class);

    @Override
    public Long initializeTransaction(long txid, Long prevMetadata, Long currMetadata) {
        if (prevMetadata == null) {
            return 1L;
        }
        return prevMetadata + 1L;
    }

    @Override
    public void success(long txid) {
        LOG.info("BatchCoordinator.success(txid={})", txid);
    }

    @Override
    public boolean isReady(long txid) {
        // 튜플을 생성할 준비가 되면 true를 리턴한다.
        LOG.info("BatchCoordinator.isReady({})", new Object[] { txid });
        return true;
    }

    @Override
    public void close() {
        LOG.info("BatchCoordinator.close()");
    }
}

BatchCoordinator 인터페이스의 각 메서드는 다음과 같은 기능을 제공해야 한다.

 메서드

설명 

 isReady

트랜잭션ID에 해당하는 튜플을 생성할 준비가 되었으면 true를 리턴한다.

 initializeTransaction

튜플을 발생하기 전에 현재 트랜잭션ID를 위한 메타데이터를 생성한다. 각 파라미터는 다음과 같다.

  • txid: 현재 트랜잭션 ID
  • prevMetadata: 이전 트랜잭션의 메타 데이터
  • currMetadata: 현재 트랜잭션을 처음 시도하면 null. 현재 트랜잭션을 재시도하는 것이면, 앞서 최초 시도할 때 생성한 메타 데이터.
 success

 트랜잭션ID에 해당하는 튜플들을 성공적으로 처리하면 호출된다.

 close 토폴로지를 종료할 때 호출된다.


Emitter 인터페이스의 구현 예는 다음과 같다.


public class LogEmitter implements Emitter<Long> {

    private static final Logger LOG = LoggerFactory.getLogger(LogEmitter.class);


    @Override

    public void emitBatch(TransactionAttempt tx, Long coordinatorMeta,

            TridentCollector collector) {

        LOG.info("Emitter.emitBatch({}, {}, collector)", tx, coordinatorMeta);

        List<String> logs = getLogs(coordinatorMeta);

        for (String log : logs) {

            List<Object> oneTuple = Arrays.<Object> asList(log);

            collector.emit(oneTuple);

        }

    }


    private List<String> getLogs(Long coordinatorMeta) {

        List<String> logs = new ArrayList<String>();

        ... // 로그를 어딘가에서 읽어와 List에 담음

        return logs;

    }


    @Override

    public void success(TransactionAttempt tx) {

        LOG.info("Emitter.success({})", tx);

    }


    @Override

    public void close() {

        LOG.info("Emitter.close()");

    }


}


Emitter 인터페이스의 각 메서드는 다음 기능을 제공한다.


메서드 

설명 

 emitBatch

트랜잭션ID에 속할 튜플을 생성한다. 각 파라미터는 다음과 같은 용도로 사용된다.

  • tx 파라미터: 트랜잭션ID와 재시도회수 제공
  • coordinatorMeta: BatchCoordinator가 생성한 메타 데이터
  • collector: 트랜잭션ID에 속하는 튜플을 토폴로지에 추가
 success

 트랜잭션에 속한 모든 튜플을 성공적으로 처리했을 때 실행된다.

 close

 토폴로지를 종료할 때 실행된다.


스톰 토폴로지를 구현할 때 어려운 부분 중 하나가 스파우트다. 스파우트의 종류에는 Transactional, Opaque transactional, non-transactional의 세 가지가 있다고 했는데, 이 중 transactional이나 opaque transactional을 구현하려면 트랜잭션ID와 트랜잭션 메타데이터를 활용해야 한다. 예를 들어, Emitter는 emitBatch()를 실행할 때 캐시에 트랜잭션ID를 키로 하고 생성한 튜플 목록을 값으로 하는 (키, 값) 쌍을 외부의 메모리 캐시에 보관할 수 있을 것이다. 이후, 튜플 처리에 실패해서 재전송을 해야 한다면 외부 메모리 캐시에서 읽어와 동일한 튜플을 재전송할 수 있을 것이다. 이 경우, success() 메서드에서 외부 메모리 캐시에서 트랜잭션ID에 해당하는 값을 삭제하도록 구현할 것이다.


연산 처리 구성 요소: Function, Filter


튜플을 생성하면 그 다음으로 할 일은 튜플을 가공해서 원하는 결과를 얻어내는 것이다. 보통은 튜플을 가공하고, 걸러내는 과정을 거친 뒤에, 집합 연산을 수행하게 된다. 이를 위해 스톰 트라이던트는 Function, Filter, Aggregator를 제공하는데, 먼저 Function과 Filter에 대해 살펴보도록 하자. 이 두 인터페이스와 관련된 상위 타입 및 하위 타입은 아래 그림과 같이 구성되어 있다.



Function과 Filter를 사용할 때 초기화가 필요하면 prepare() 메서드를 이용한다. 반대로 종료시 정리가 필요하면 cleanup() 메서드를 이용한다.


Filter 구현


Filter는 튜플을 걸러내는 기능을 제공한다. Filter 인터페이스에 정의된 isKeep() 메서드는 튜플을 계속해서 스트림에 흘려 보낼지 여부를 결정한다. 다음은 Filter 인터페이스의 구현 예다. isKeep() 메서드를 보면 튜플 데이터 중 "logString"을 읽어와 이 문자열이 "ORDER"로 시작하면 true를 리턴하고 그렇지 않으면 false를 리턴한다. 따라서, 로그 문자열 값이 "ORDER"로 시작하는 것만 토폴로지의 다음 단계에 전달된다.


public class OrderLogFilter extends BaseFilter {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(OrderLogFilter.class);


    @Override

    public boolean isKeep(TridentTuple tuple) {

        String logString = tuple.getStringByField("logString");

        boolean pass = logString.startsWith("ORDER");

        if (!pass)

            LOG.info("OrderLogFilter filtered out {}", logString);

        return pass;

    }


}


위 필터를 사용한 토폴로지 구성 코드 예는 다음과 같다. 이 코드를 보면 LogSpout에 의해 생성된 튜플을 OrderLogFilter로 먼저 필터링하고 있다. 따라서, 튜플에 포함된 "logString"의 값이 "ORDER"로 시작하는 것만 그 다음 단계인 LogParser에 전달된다.


TridentTopology topology = new TridentTopology();

topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

       ...코드생략


each() 메서드의 첫 번째 파라미터는 Filter.isKeep() 메서드에서 사용할 튜플 필드 목록을 뜻한다. 즉, 위 코드의 경우 OrderLogFilter의 isKeep() 메서드는 "logString" 필드만 사용할 수 있다. 이 예에서는 LogSpout가 [logString:값]인 튜플을 생성하기 때문에, 사용할 필드가 "logString" 밖에 없지만, 만약 여러 개의 필드를 생성한다면 new Fields("field1", "field2")처럼 여러 필드명을 지정할 수 있다.


Function 구현


Function은 튜플에 새로운 데이터를 추가할 때 사용한다. Function 인터페이스는 튜플을 가공하기 위한 execute() 메서드를 정의하고 있으며, 이 메서드에서 튜플을 알맞게 가공하면 된다. 다음은 Function 인터페이스의 구현 예를 보여주고 있다.


public class LogParser extends BaseFunction {


    private static final long serialVersionUID = 1L;


    @Override

    public void execute(TridentTuple tuple, TridentCollector collector) {

        String log = (String) tuple.getStringByField("logString");

        ShopLog event = parseLog(log);

        collector.emit(Arrays.<Object>asList(event));

    }


    private ShopLog parseLog(String log) {

        String[] tokens = log.split(",");

        return new ShopLog(tokens[0], Long.parseLong(tokens[1]), Long.parseLong(tokens[2]));

    }


}


execute() 메서드는 tuple로부터 가공에 필요한 필드 값을 구한다. 구한 필드 값으로 알맞은 처리를 한 뒤, collector.emit()을 이용해서 결과로 생성할 필드 데이터를 추가한다. collector.emit() 메서드는 List<Object> 타입을 인자로 받는데, 이 때 List에 담긴 각 값이 토폴로지 정의시 지정한 추가 필드가 된다. 예를 들어, 다음과 같이 LogParser Function을 사용한 경우, 위 코드의 execute() 메서드에서 collector.emit() 메서드에서 추가한 값 목록은 아래 코드의 each() 메서드에서 세 번째 파라미터로 지정한 필드 목록에 매칭된다.


topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        ...코드생략


Function 사용시 유희할 점은 Function은 기존 튜플의 필드를 제거하지 않고, 새로운 튜플을 추가한다는 점이다. 예를 들어, 위 코드를 보면 LogSpout는 [logString:값]인 튜플을 생성한다. LogParser를 거치면 튜플은 [logString:값, shopLog:값] 이 된다. 이 상태에서 AddGroupingValueFunction을 거치면 튜플은 [logString:값, shopLog:값, productId:time:값]이 된다. each() 메서드의 첫 번째 파라미터는 이 튜플 필드 중 Function이나 Filter에 전달할 필드 목록을 지정하는 것이며, 세 번째 파라미터는 Function의 실행 결과로 추가되는 필드 목록을 지정하는 것이다.


참고로, AddGroupingValueFunction 클래스는 로그를 그룹핑할 때 사용할 필드를 추가하는 기능을 제공한다. 이 클래스는 새로운 필드로 "제품ID:기준시간" 값을 추가한다. 각 제품의 분 당 판매개수를 구할 것이기 때문에, 타임스탬프 값을 60000으로 나눈 값을 기준시간 으로 사용해 봤다.


public class AddGroupingValueFunction extends BaseFunction {


    private static final long serialVersionUID = 1L;


    @Override

    public void execute(TridentTuple tuple, TridentCollector collector) {

        ShopLog shopLog = (ShopLog) tuple.getValueByField("shopLog");

        long time = shopLog.getTimestamp() / (1000L * 60L);

        collector.emit(Arrays.<Object>asList(shopLog.getProductId() + ":" + time));

    }


}


연산처리 구성 요소: 그룹핑과 집합 연산


트라이던트로 스트림을 처리하면 많은 경우 집합 연산을 하게 된다. 예를 들어, 예제의 경우는 분당 제품 판매 개수를 구해야 하는데, 이는 "제품ID:시간"을 기준으로 판매 개수를 구하는 집합 연산을 필요로 한다. 이런 집합 연산을 처리할 수 있도록 트라이던트 API는 다음의 두 가지를 제공하고 있다.

  • 그룹 스트림: 배치에 속한 튜플들을 특정 필드를 기준으로 그룹핑한다. 이 스트림은 그룹 단위로 연산을 적용한다.
  • 집합 연산: 배치에 속한 튜플을 모아 단일 결과(튜플)를 생성한다.

이 두 기능을 사용하면 특정 필드를 기준으로 개수를 구한다거나 합을 구하는 등의 연산을 수행할 수 있다. 그룹 스트림을 생성하는 방법은 매우 간단하다. 토폴로지 정의에서 groupBy() 메서드를 사용하면 된다.


topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .groupBy(new Fields("productId:time"))

        .aggregate(new Fields("productId:time"), new Count(), new Fields("count"))

        ...생략


groupBy() 메서드는 지정한 필드를 이용해서 튜플을 그룹핑하는 GroupedStream을 리턴한다.



GroupedStream의 aggregate() 메서드를 사용하면 각 그룹별로 집합 연산을 수행하게 된다. 예를 들어, 아래 코드는 groupBy로 생성된 그룹에 대해 "productId:time" 필드를 기준으로 Count 집합 연산을 수행한다.


topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .groupBy(new Fields("productId:time"))

        .aggregate(new Fields("productId:time"), new Count(), new Fields("count"))

        ...생략


aggregate() 메서드의 세 번째 파라미터는 집합 연산 결과로 생성되는 필드를 정의한다. 위 코드의 경우 Count 집합 연산의 결과로 생성되는 필드가 한 개임을 알 수 있다. aggregate는 새로운 튜플을 생성하는데, GroupedStream의 aggregate는 첫 번째 파라미터로 지정한 필드와 세 번째 파라미터로 지정한 필드를 갖는 튜플을 생성한다. 즉, 위 코드의 경우 aggregate 메서드의 결과로 생성되는 튜플은 ["productId:time":값, "count":값] 으로 구성된다.


집합 연산 인터페이스


스톰 트라이던트 API는 집합 연산을 위한 인터페이스 세 개를 제공하며, 이들 인터페이스는 다음과 같다.



집합 연산: ReducerAggregator


ReducerAggregator 인터페이스는 다음과 같이 정의되어 있다.


public interface ReducerAggregator<T> extends Serializable {

    T init();

    T reduce(T curr, TridentTuple tuple);

}


ReducerAggregator의 동작 방식은 다음과 같다.

  1. 배치(또는 그룹)의 첫 튜플을 처리하기 전에 init()으로 초기값을 생성한다.
  2. 이 초기값을 현재값(curr)으로 설정한다.
  3. 각 튜플마다
    1. reduce(curr, 튜플)을 실행한다.
    2. 3-1의 결과를 curr에 설정한다.
  4. 마지막 튜플에 대한 reduct()의 결과값을 aggregate의 결과값으로 사용한다.
간단하게 튜플의 특정 필드의 합을 구하는 집합 연산을 ReducerAggregator로 구현해보면 다음과 같이 구할 수 있다.

public class Sum implements ReducerAggregator<Long> {
    private static final long serialVersionUID = 1L;

    @Override
    public Long init() {
        return 0L;
    }

    @Override
    public Long reduce(Long curr, TridentTuple tuple) {
        return curr + tuple.getLong(0);
    }
}


집합 연산: CombinerAggregator


CombinerAggregator 인터페이스는 다음과 같이 정의되어 있다.


public interface CombinerAggregator<T> extends Serializable {

    T init(TridentTuple tuple);

    T combine(T val1, T val2);

    T zero();

}


CombinerAggregator의 동작 방식은 다음과 같다.

  1. 배치(또는 그룹)에 튜플이 있다면, 각 튜플마다
    1. init()으로 튜플에 해당하는 값을 구한다.
    2. combine()을 이용해서 이전 combine()의 결과와 init()의 결과를 조합한 결과를 리턴한다.
      1. 첫 번째 튜플의 경우 이전 combine() 결과 값으로 zero() 값을 사용한다.

스톰은 CombinerAggregator의 구현인 Count를 제공하고 있는 Count의 코드는 다음과 같다. 배치(또는 그룹)의 튜플 개수를 셀 때 사용하는 CombinerAggregator이다.


public class Count implements CombinerAggregator<Long> {


    @Override

    public Long init(TridentTuple tuple) {

        return 1L;

    }


    @Override

    public Long combine(Long val1, Long val2) {

        return val1 + val2;

    }


    @Override

    public Long zero() {

        return 0L;

    }

    

}


집합 연산: Aggregator


Aggregator는 앞서 두 개 인터페이스보다 좀 더 범용적인 인터페이스를 정의하고 있다.


public interface Aggregator<T> extends Operation {

    T init(Object batchId, TridentCollector collector);

    void aggregate(T val, TridentTuple tuple, TridentCollector collector);

    void complete(T val, TridentCollector collector);

}


Aggregator는 다음과 같이 동작한다.

  1. 배치의 튜플을 처리하기 전에 init()을 실행하고, 집합 연산에 필요한 객체 val을 리턴한다.
  2. 각 튜플마다 aggregator 메서드를 호출한다. 첫 번째 파라미터는 1에서 리턴한 객체이다.
  3. 배치의 모든 튜플에 대한 처리가 끝나면 complete 메서드를 호출한다.
ReducerAggregator와 CombinerAggregator가 최종적으로 1개의 값을 생성하도록 제한된 인터페이스인 반면에, Aggregator는 한 개 이상의 튜플을 생성할 수 있다. 예를 들어, 다음은 배치(또는 그룹)에 있는 튜플 중에서 정렬 기준으로 N개의 튜플을 골라내는 Aggregator의 구현 코드이다. (스톰이 제공하는 클래스다.)

public static class FirstNSortedAgg extends BaseAggregator<PriorityQueue> {

    int _n;
    String _sortField;
    boolean _reverse;
    
    public FirstNSortedAgg(int n, String sortField, boolean reverse) {
        _n = n;
        _sortField = sortField;
        _reverse = reverse;
    }

    @Override
    public PriorityQueue init(Object batchId, TridentCollector collector) {
        return new PriorityQueue(_n, new Comparator<TridentTuple>() {
            @Override
            public int compare(TridentTuple t1, TridentTuple t2) {
                Comparable c1 = (Comparable) t1.getValueByField(_sortField);
                Comparable c2 = (Comparable) t2.getValueByField(_sortField);
                int ret = c1.compareTo(c2);
                if(_reverse) ret *= -1;
                return ret;
            }                
        });
    }

    @Override
    public void aggregate(PriorityQueue state, TridentTuple tuple, TridentCollector collector) {
        state.add(tuple);
    }

    @Override
    public void complete(PriorityQueue val, TridentCollector collector) {
        int total = val.size();
        for(int i=0; i<_n && i < total; i++) {
            TridentTuple t = (TridentTuple) val.remove();
            collector.emit(t);
        }
    }
}  


Aggregator를 이용해서 튜플 개수를 세는 코드는 다음과 같이 구현하고 있다.


public class CountAsAggregator extends BaseAggregator<CountAsAggregator.State> {


    static class State {

        long count = 0;

    }

    

    @Override

    public State init(Object batchId, TridentCollector collector) {

        return new State();

    }


    @Override

    public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {

        state.count++;

    }


    @Override

    public void complete(State state, TridentCollector collector) {

        collector.emit(new Values(state.count));

    }

    

}


토플로지 구성 및 로컬 실행


스파우트와 Function, Filter, 집합 연산을 구현했다면, 남은 일은 TridentTopology를 이용해서 토폴로지를 구성하고 실행하는 것이다. 아래 코드는 토폴로지를 구성하고 로컬에서 실행하는 예제 코드를 보여주고 있다.


public class LogTopology {


    public static void main(String[] args) {

        TridentTopology topology = new TridentTopology();

        topology.newStream("log", new LogSpout())

                .each(new Fields("logString"), new OrderLogFilter())

                .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

                .each(new Fields("shopLog"), 

                         new AddGroupingValueFunction(), new Fields("productId:time"))

                .groupBy(new Fields("productId:time"))

                .aggregate(new Fields("productId:time"), new Count(), new Fields("count"))

                .each(new Fields("productId:time", "count"), 

                         new CountSumFunction(), new Fields("sum"))

                .each(new Fields("productId:time", "sum"), new ThresholdFilter())

                .each(new Fields("productId:time", "sum"), new AlertFilter());

        StormTopology stormTopology = topology.build();


        Config conf = new Config();

        conf.put("ThresholdFilter.value", 5L);

        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology("cdc", conf, stormTopology);


        try {

            Thread.sleep(60000);

        } catch (InterruptedException e) {

        }

        cluster.shutdown();

    }


}


글에서 보여주지 않은 코드인 CountSumFunction, ThresholdFilter, AlertFilter 등은 참고자료에 있는 예제 코드 링크를 참고하기 바란다.


참고자료

  • 예제 코드: https://github.com/madvirus/storm-sample





  1. 96480XX 2014.10.01 15:10

    내가 아는 사람중에
    뭐 이렇게 좋은 내용의 블로그를 운영하는 사람이 있었다니 ...

반응형

Zookeeper 소개 발표 자료입니다.


반응형

하둡2의 YARN 짧게 보기 자료입니다.



  1. 해피데이 2014.08.04 18:31

    안녕하세요? 올려주신 hadoop2 자료로 대략적인 이해 및 개념이 잡혔습니다.
    감사합니다.

    해당 ppt를 보면서는 hadoop1의 JobTraker, TaskTraker, namenode, datanode가 별 의미가 없어졌다고 생각했는데,
    hadoop2 설치 및 start/stop 커맨드를 보니, namenode와 datanode를 별도로 start/stop 하더라구요.

    hadoop2에서 namenode, datanode, JobTraker, TaskTraker는 어떤 의미를 가지고 yarn과 관련해서는 언제, 어떻게 동작하는지 설명해 주실 수 있을까요?

    그리고...
    Hadoop Federation 이라고 하는 개념도 어떤 것인지 궁금합니다.

    • 최범균 madvirus 2014.08.04 19:01 신고

      우선, Hadoop은 크게 두 개의 기능을 제공하는데, 하나는 File 보관을 위한 HDFS이고, 다른 하나는 연산을 위한 MapReduce 입니다.
      이들을 위한 데몬이 다음과 같죠.
      * HDFS: namenode, datanode
      * MR: JobTracker, TaskTracker

      Hadoop1의 MR은 한계가 있었고, 이를 극복하기 위해 Hadoop2에서 연산 부분을 MR1에서 좀더 범용적인 YARN을 개비를 했습니다.
      그러면서, 다음과 같이 연산을 위한 데몬이 바뀌게 되죠.

      * HDFS: namenode, datanode (동일!)
      * YARN: ResourceManager, NodeManager

      즉, Hadoop2에서 Namenode와 datanode는 그대로 남아 있고,
      Hadoop1에서 MR을 위해서 존재하던 JobTracker와 TaskTracker가 사라지고(?)
      YARN을 위한 ResourceManager와 NodeManager로 대체되었습니다.

      Hadoop1에서 MR만 실행할 수 있었던 JobTracker/TaskTracker와 달리
      Hadoop2의 YARN은 MR 뿐만 아니라 다양한 연산을 실행할 수 있는 범용적인 프레임워크입니다.
      예를 들어, YARN을 기반으로 Spark나 스톰, JBoss 등을 실행할 수 있습니다.

      그리고, Namenode Federation은 Namenode의 단점을 해소하기 위해 나왔습니다.
      Namenode는 기본적으로 1대에서 모든 HDFS의 파일 정보를 제공하게 되는데, 이는 확장성에 한계를 갖게 만듭니다.
      그래서, 여러 대의 서버에 파일 정보를 나눠서 보관함으로써 처리 용량을 확장할 수 있게 만든게 Namenode Federation 입니다.

      Namenode가 죽으면 HDFS의 모든 서비스가 중지됩니다. 그래서 Namenode가 SPOF(Single Point Of Failure)가 됩니다.
      따라서, 한 대의 Namenode가 죽어도 서비스를 유지하기 위해 동일한 Namenode를 두 대 만드는데,
      이를 Namenode HA(high availability)라고 합니다.

      제가 이쪽 전문가는 아니어서 이 정도 밖에 설명을 못 드리겠네요.
      자료를 찾는데 단초가 되었으면 하고 바래봅니다.

반응형

오늘 Ambari 1.5.1을 이용해서 호튼웍스의 HDP 2.1을 설치하던 도중 다음과 같은 에러가 발생했다.


'ambari.clusterconfig' doesn't exist


이 문제는 MySQL의 데이터베이스를 UTF-8로 만들면서 발생했다. ambari가 사용할 DB로 기존에 설치되어 있던 MySQL 5.1 버전을 사용했고, 다음 스크립트를 이용해서 데이터베이스를 생성했다.


create database ambari character set utf8


ambari-server setup을 이용해서 ambari가 사용할 DB를 설정할 때, 위 코드로 생성한 DB를 설정해주었다. setup 과정에서 아무 에러 메시지 출력 없이 정상적으로 실행되었다.


ambari-server start로 ambari 서버를 실행하고, http://설치호스트:8080 으로 접속해서 설치를 시작했다. 설치 대상 서버부터, 각 구성 요소를 어느 서버에 설치할지, hive 등의 메타 정보를 어느 DB에 보관할지 등을 설정했고, 최종적으로 설치를 시작했다. 그런데, 앞서 언급했던 ['ambari.clusterconfig' doesn't exist'] 에러 메시지가 출력되면서 더 이상 설치를 진행할 수 없게 됐다.


configcluster 테이블이 왜 존재하지 않는지 확인해 보기 위해, ambari가 DB 테이블을 생성할 때 사용하는 스크립트를 찾아보았다. 스 스크립트에서 configcluster 테이블을 생성할 때 사용되는 쿼리를 찾아서 mysql 콘솔에서 직접 실행해보니 다음과 같은 오류가 발생했다.


ERROR 1071 (42000): Specified key was too long; max key length is 1000 bytes


켁! PK 생성 제약으로 configcluster 테이블을 생성하지 못한 것이다. ambari-server setup 과정에서 아무 오류 없이 넘어갔기에 테이블 생성이 안 된 것을 설치 과정 후반부에 안 것이다.


위 문제가 발생한 이유는 데이터베이스를 UTF-8로 생성했기 때문이다. configcluster 테이블이 PK로 사용하는 컬럼은 세 개인데, 그 중 2개 컬럼의 타입이 varchar(255)이다. 그런데, UTF-8을 사용하면서 한 글자가 최대 4개 바이트까지 차지할 수 있기 때문에, 두 개 컬럼이 실제로 차지할 수 있는 바이트 수가 가뿐히 1000을 넘기게 된다.


이 문제를 해결하는 가장 쉬운 방법은 DB를 latin 계열 캐릭터셋을 사용하도록 만드는 것이겠지만, 그러고 싶지 않았다. 좀 검색을 해 보니 MySQL 5.5의 경우 innodb_large_prefix 옵션을 true로 주면 1000 바이트가 넘는 PK를 생성할 수 있다는 것을 알게 되었다. (http://dev.mysql.com/doc/refman/5.5/en/innodb-parameters.html#sysvar_innodb_large_prefix 참고)


[mysqld]

...다른설정들

innodb_large_prefix = true


기존에 설치된 MySQL 5.1에 아무것도 없었기에, 그냥 5.5로 바꾸고 위 옵션을 추가해 주었다. MySQL 5.1을 사용하고 있다면, utf-8이 아닌 euc-kr이나 다른 캐릭터 셋을 사용하도록 설정해서 PK의 바이트 길이가 1000을 넘지 않도록 해 줘야 할 것 같다.


반응형
앞서 시간 윈도우와 개수 윈도우에 대해 알아봤었는데, Esper는 이 외에도 다양한 종류의 뷰를 제공하고 있다. 이번 글에서는 몇 가지 뷰를 정리해보았다.

이전 글
Esper에서는 뷰는 이벤트를 유지하는 공간이다. EPL에서 사용중인 뷰에 담긴 이벤트 목록을 구하고 싶다면 다음의 코드를 사용하면 된다.

SafeIterator<EventBean> iter = eps.safeIterator(); // eps는 EPStatement
while (iter.hasNext()) {
    EventBean bean = iter.next();
    SomeEvent event = (SomeEvent)bean.getUnderlying());
}
iter.close();

1. 윈도우 뷰


앞서 EPL 기초 글에서 4개의(시간, 시간 배치, 길이, 길이 배치) 윈도우 뷰를 설명했었다. 여기선 몇 개의 윈도우 뷰를 추가로 설명하겠다.


외부 시간 윈도우(externally-timed window)


시간 윈도우가 이벤트의 발생 시점과 엔진 시간을 기준으로 윈도우에 포함될 이벤트를 결정한다면, 외부 시간 윈도우는 계산된 시간을 기준으로 윈도우에 포함될 이벤트를 결정한다. 다음은 설정 예이다.


select avg(responseTime) as avg from AccessLog.win:ext_timed(accessTime, 3 seconds)


win:ext_timed의 첫 번째 파라미터는 시간을 계산할 때 사용할 표현식으로 밀리초 단위의 값을 사용한다. 위 EPL에서는 AccessLog의 accessTime 값을 사용하였다. 위 EPL을 실행하면 win:ext_timed는 다음과 같은 방식으로 동작한다.

  • 새로운 AccessLog 이벤트 n이 들어오면, n.accessTime 값을 기준으로 이전 이벤트 x에 대해 x.accessTime 값이 3초 지난 x를 찾는다.
  • 윈도우에 x를 제거한다.

예를 들어, 신규 이벤트가 발생하는 순서에 따라서 윈도우에 보관되는 이벤트는 다음과 같이 달라진다.


신규 이벤트 

윈도우에서 제거되는 이벤트 

윈도우에 포함된 이벤트

N1 (accessTime=100)

-

N1 

N2 (accessTime=2500)

N1,N2 

N3 (accessTime=3400) 

N1 (3400 - 100 > 3000) 

N2,N3 

N4 (accessTime=3700)

-

N2,N3,N4  

N5 (accessTime=5300)

-

N2,N3,N4,N5

N6 (accessTime=5700) 

N2 (5700 - 2500 > 3000) 

N3,N4,N5 

* 새로운 이벤트가 들어와야 윈도우에서 제거되므로, 이 점에 유의해서 사용해야 한다.


외부 시간 배치 윈도우(externally-timed batch window)

외부 시간 배치 윈도우는 이벤트 발생 시점에 표현식을 기준으로 배치를 처리한다. 

select avg(responseTime) as avg from AccessLog.win:ext_timed_batch(accessTime, 2 seconds)


win:ext_timed_batch의 첫 번째 파라미터는 타임 배치를 처리할 기준 시간이다. 다음은 위 EPL을 실행할 때 이벤트 순서에 따라 배치 윈도우에 들어가는 이벤트와 결과 생성을 보여주고 있다.


신규 이벤트 

배치 윈도우

결과 생성

N1 (accessTime=100)

N1


N2 (accessTime=2500)

N2 

N1의 결과

N3 (accessTime=3400) 

N2,N3


N4 (accessTime=3700)

N2,N3,N4


N5 (accessTime=5300)

N5

N2,N3,N4의 결과

N6 (accessTime=5700) 

N5,N6


N7 (accessTime=7400)

N7 

N5,N6의 결과 

* 새로운 이벤트가 들어와야 윈도우에서 제거되므로, 이 점에 유의해서 사용해야 한다.


시간 누적 윈도우(time-accumulating window)


시간 누적 윈도우는 지정한 시간 안에 이벤트가 들어오지 않으면 윈도우에서 이벤트를 제거한다. 예를 들어, 아래 코드는 마지막 이벤트가 들어온지 2초 동안 이벤트가 들어오지 않으면 윈도우에서 이벤트를 제거한다.


select rstream * from AccessLog.win:time_accum(2 sec)


최초 길이/시간 윈도우(first length, first time)


최초 n개의 이벤트만 윈도우에 유지하거나 시작 이후 지정한 시간 동안의 이벤트만 윈도우에 유지하고 싶다면, 다음의 두 윈도우를 사용한다.


select * from AccessLog.win:firstlength(10)


select * from AccessLog.win:firsttime(2 sec)



2. 표준 뷰


Unique 뷰 / Firstunique 뷰

Unique 뷰는 지정한 식을 기준으로 단일 값을 보관한다. 예를 들어, 아래 코드를 보자.

select * from StockTick.std:unique(code)

위 EPL은 같은 code 값을 가지는 이벤트 중 가장 마지막 이벤트를 포함하는 뷰를 생성한다. 예를 들어, 이벤트의 code 값에 따라 뷰에 포함되는 이벤트는 다음과 같이 달라진다.

발생 이벤트 

뷰에 포함된 이벤트 

S1(code='1')

S1 

S2(code='1')

S2

S3(code='2')

S1,S3

S4(code='1')

S3,S4

S5(code='2')

S4,S5

S6(code='2')

S4,S6


Firstunique 뷰는 지정한 식을 기준으로 단일 값 중 최초 이벤트를 보관한다. (Unique 뷰는 단일 값 중 마지막 이벤트를 보관하는 것과 다르다.)


select * from StockTick.std:firstunique(code)



그룹 데이터 윈도우

그룹 데이터 윈도우는 지정한 표현식을 기준으로 그룹핑 된 데이터 윈도우를 생성한다. 앞서 살펴본 뷰와 차이점이 있다면 반드시 다른 뷰를 서브 뷰로 조합해서 사용해야 한다는 점이다. 예를 들어, 다음의 뷰를 보자.

select code, avg(cost) as avg from StockTick.std:groupwin(code).win:length(3)

위 EPL은 code 프로퍼티를 이용해서 이벤트를 그룹핑한다. 그리고, 각 그룹별로 길이가 3인 서브 뷰를 만든다. 즉, 각 code 별로 길이가 3인 뷰가 만들어지는 것이다. 그런데, 위 EPL에서 주의할 점은 뷰만 그룹으로 생성된다는 거지, avg 연산자 그룹별로 되는 것은 아니라는 점이다.

실제 그룹별로 값을 구하려면 다음과 같이 group by 를 함께 사용해줘야 한다. 

select code, avg(cost) as avg from StockTick.std:groupwin(code).win:length(3)
group by code


3. 통계 뷰

단변량(Univariate) 통계

단변량 통계 뷰는 숫자 프로퍼티에 대한 통계 데이터를 제공하는 뷰다. 다음은 사용 예이다.

select average from StockTick.stat:uni(cost)

stat:uni()는 숫자 식을 첫 번째 파라미터로 갖는다. 위 코드의 경우 cost 프로퍼티를 값으로 갖는다. 위 EPL에서 select 절에 있는 average는 stat:uni 뷰가 제공하는 프로퍼티로서, 평균 값을 제공하는데 사용된다. average 프로퍼티 외에 stat:uni 뷰가 제공하는 프로퍼티는 다음과 같다.

  • datapoints: 값의 개수
  • total: 값의 합
  • average: 값의 평균
  • variance: 분산
  • stddev: 샘플 표준편차
  • stddevpa: 표준편차

단변량 통계 뷰는 다른 뷰와 함께 사용될 수 있다. 예를 들어, 다음 코드는 code로 그룹핑 한 뷰의 서브 뷰로 stat:uni 뷰를 사용했는데, 이 경우 average는 code 그룹 별로 cost의 평균이 된다.


select average from StockTick.std:groupwin(code).stat:uni(cost)


stat:uni가 제공하는 프로퍼티 외에 다른 프로퍼티(예, 이벤트의 프로퍼티)를 사용하려면 다음과 같다. stat:uni 뷰에 파라미터를 추가로 지정해주면 된다.


select avergae, code from StockTick.std:groupwin(code).stat:uni(cost, code)

select * from StockTick.std:groupwin(code).stat:uni(cost, code, price)


select 절에서 '*'을 사용하면 stat:uni가 제공하는 프로퍼티 및 두 번째 이후에 지정한 파라미터를 선택한다.


회귀Regression 뷰


stat:linest 뷰를 이용하면 두 식의 선형회귀를 구할 수 있다. 다음 식은 price와 total의 회귀 연산을 해서 회귀선의 기울기를 구한다.


select slope from StockTick.win.time(30 min).std:linest(cost, total)


std:linest()는 slope를 포함해서 다음의 프로퍼티를 제공한다.

  • slope: 기울기
  • YIntercept: Y 절편
  • XSum, YSum, sumX, sumY: X, Y의 합
  • sumXY: X*Y의 합
  • sumXSq, sumYSq: X 제곱의 합, Y 제곱의 합
  • XAverage, YAverage: X, Y 평균
  • XVariance, YVariance: X, Y 분산
  • XStandardDeviationPop, YStandardDeviationPop: X, Y 표준 편차
  • XStandardDeviationSample, YStandardDeviationSample: X, Y 샘플 표준 편차

상관계수 뷰


stat:correl 뷰는 두 식 간의 상관계수를 구한다.


select correlation from StockTick.stat:correl(cost, total)


4.확장 뷰


정렬 윈도우 뷰


특정 식을 기준으로 정렬 기준으로 상위 n개의 이벤트만 유지하고 싶을 때 ext:sort 뷰를 사용한다. 다음은 ext:sort 뷰의 예이다.


select sum(total) from StockTick.ext:sort(10, total desc)

select sum(total) from StockTick.ext:sort(10, total desc, timestamp asc)


ext:sort() 첫 번째 파라미터로 정렬된 상태로 보관할 이벤트 개수를 지정하고, 두 번째 이후로는 정렬 기준으로 지정한다. 위 코드에서 첫 번째 EPL은 total 프로퍼티 값이 큰 상위 10개 StockTick 이벤트를 기준으로 sum(total)을 생성하게 된다. 정렬 기준으로 두 개 이상 지정하고 싶다면, 각 정렬 기준을 콤마로 구분해서 지정하면 된다.


랭킹 뷰


랭킹 뷰는 정렬 윈도우 뷰와 유사하다. 차이점이 있다면, 랭키 뷰는 유일식을 기준으로 한 개의 이벤트만 유지한다는 점이다. 예를 들어, 다음 EPL을 보자.


select sum(total) from StockTick.ext:rank(code, 10, price desc)


이 EPL은 price 기준으로 상위 10개의 StockTicke 이벤트를 뷰에 보관한다. 그런데, code 값이 같은 이벤트는 한 개만 보관한다. 예를 들어, 다음의 순서로 이벤트가 발생했다고 하면,

  • S1(code='C1', price=1000) -> S2(code='C2', price=800) -> S3(code='C1', price=1200)
뷰에 남는 이벤트는 S2와 S3가 된다. S1의 price 값이 S2보다 크지만, S3와 S1이 같은 code 값을 갖기 때문에 랭킹 뷰에는 S3만 남게 된다.


반응형

관련 글


컨텍스트


주식 종목 별로 최근 10분 동안의 평균 가격을 구하려면 다음과 같이 group by 사용햇다.


select code, avg(cost) as avg from StockTick.win:time(10 min) group by code


컨텍스트를 만들면 group by를 사용하지 않고 이벤트를 분류해서 분류된 파티션 별로 결과를 만들어 낼 수 있다. 컨텍스트는 이벤트를 컨텍스트 파티션으로 분류해 주며, EPL에 컨텍스트를 적용하면, 한 개의 EPL을 컨텍스트 파티션 별로 적용할 수 있다. 따라서, 주식 코드 별로 컨텍스트 파티션을 생성하고, 이 컨텍스트를 이용해서 EPL을 실행하면 group by 등의 쿼리를 사용하지 않아도 종목 별로 평균이나 추이 등을 분석할 수 있다.


컨텍스트 만들고 사용하기


컨텍스트는 다음과 같은 문장을 이용해서 생성한다.


epService.getEPAdministrator().createEPL(

       "create context CodeSegment partition by code from StockTick");


위 코드는 StockTick 이벤트를 code 값으로 분류하는 CodeSegment라는 컨텍스트를 생성한다. 값으로 파티션을 생성하는 방식과 해시값으로 파티션을 생성하는 방식 등 몇 가지 종류의 컨텍스트를 제공하는데, 이에 대한 내용은 뒤에서 다시 정리해 본다.


파티션을 생성했다면, EPL에서 다음과 같이 context 절을 이용해서 컨텍스트를 적용할 수 있다.


EPStatement eps = epService.getEPAdministrator().createEPL(

        "context CodeSegment " +

        "select code, avg(cost) as avg from StockTick.win:time(3 sec) ");


위 코드는 CodeSegment 컨텍스트를 사용하는데, 이 경우 각 EPL은 code 값으로 분류된 파티션 별로 적용된다. 따라서, 위 EPL의 select는 각 code 별로 최근 3초 시간 윈도우의 평균 값을 생성한다.


컨텍스트 종류

  • 키 기반 컨텍스트
  • 해시 키반 컨텍스트
  • 카테고리 컨텍스트
  • 논오버래핑 컨텍스트
  • 오버래핑 컨텍스트


컨텍스트 종류: 키 기반 컨텍스트


키 기반 컨텍스트는 이벤트의 특정 프로퍼티를 이용해서 이벤트를 파티션으로 분류한다. partition by 를 이용해서 파티션 키를 지정한다.


create context CodeSegment partition by code from StockTick


두 개 이상의 키를 사용할 수도 있다.


create context AccessLogSegment partition by domain and sesessionId from AccessLog


여러 이벤트를 이용해서 컨텍스트를 생성할 수도 있다.


create context CodeSegment2 partition by 

compCode from Announcement, code from StockTick


위 컨텍스트는 StockTick 이벤트와 Announcement 이벤트를 이용해서 컨텍스트를 생성한다. 각 이벤트에서 지정한 프로퍼티 개수와 타입은 동일해야 한다. 위 컨텍스트 내에서 실행되는 EPL은 코드는 같은 회사 코드를 갖는 주가와 공시 이벤트를 묶어서 처리할 수 있게 된다. 예를 들어, 아래 EPL에서 StockTick 이벤트와 Announcement 이벤트는 이미 동일한 회사 코드를 갖고 있으므로, code와 compCode가 같은지 여부를 비교할 필요가 없다.


context CodeSegment

select s from Announcement.win:time(10 min) a, StockTick t

where t.rate > 10


컨텍스트 종류: 해시 기반 컨텍스트


해시 기반 컨텍스트는 지정한 프로퍼티의 해시 값을 이용해서 이벤트를 분류한다. 해시 기반 컨텍스트를 사용하려면 다음과 같이 coalesce by와 해시 함수를 함께 사용하면 된다.


create context CodeSegment coalesce by

consistent_hash_crc32(code) from StockTick granularity 32

preallocate


consistent_hash_crc32는 CRC 32 알고리즘을 이용해서 해시 코드를 생성한다. hash_code를 사용하면 자바의 해시 코드를 이용한다. granularity는 파티션의 최대 개수를 지정하며, preallocate를 사용하면 미리 파티션을 생성해 놓는다. 키 기반 컨텍스트와 마찬가지로 여러 이벤트를 이용해서 정의할 수 있다.


위 코드를 이용해서 생성한 해시 기반 컨텍스트는 하나의 파티션에 한 개 이상의 code가 존재하게 됨에 유의하자.


컨텍스트 종류: 카테고리 컨텍스트


카테고리 컨텍스트는 프로퍼티를 이용해서 카테고리를 생성하고 이를 기준으로 분류한다. 다음은 카테고리 컨텍스트의 생성 예를 보여주고 있다. "group 표현식 as 카테고리이름"을 이용해서 카테고리를 정의한다.


create context AccessLogCategory

group responseTime <= 1000 as normal,

group responseTime > 1000 and responseTime <= 2000 as slow,

group responseTime > 2000 as tooslow

from AccessLog


카테고리 기반 컨텍스트는 이벤트를 몇 개의 카테고리로 나눈다. 위 코드는 3개의 파티션을 생성하는데, 이벤트는 응답 시간에 따라 normal 파티션, slow 파티션, tooslow 파티션에 속하게 된다.


이 컨텍스트를 사용하는 EPL은 다음과 같이 context.label을 이용해서 카테고리 이름을 구할 수 있다.


context AccessLogCategory

select context.label from AccessLog


컨텍스트 종류: 논오버래핑 컨텍스트


논오버래핑 컨텍스트는 시작과 끝 조건에 따라 컨텍스트가 시작되거나 끝나는 컨텍스트이다. 컨텍스트가 시작되면 끝 조건 전까지 컨텍스트 파티션은 1개만 존재한다. 끝 조건을 충족해서 컨텍스트가 끝나면 파티션은 0개가 된다.


논오버래핑 컨텍스트는 start와 end를 이용해서 시작 조건과 끝 조건을 지정한다. 시작 조건과 끝 조건에는 이벤트, 패턴, 그론탭, 시간 간격 등이 올 수 있다. 다음은 레퍼런스 문서에 있는 몇 가지 예이다.

  • create context NineToFive start (0, 9, *, *, *) end (0, 17, *, *, *)
    크론탭 표현식을 이용한 시작/종료 시점을 지정한다. 9시에 컨텍스트를 시작하고 17시에 끝낸다.
  • create context PowerOutage start PowerOutageEvent end pattern [PowerOnEvent -> timer:interval(5 sec)]
    PowerOutageEvent가 발생하면 컨텍스트를 시작하고, PowerOnEvent 발생 후 5 초가 지나면 끝낸다.
  • create context Every15minutes start @now end after 15 min
    @now는 지금을 의미하는 어노테이션으로, 컨텍스트는 지금 시작해서 15분 후에 끝낸다. 컨텍스트가 종료되면 @now에 의해 다시 컨텍스트가 바로 시작.
시작 조건은 @now 또는 크론탭이나 이벤트 필터 등의 표현식이 온다.

EPL에서는 context.startTime과 context.endTime을 이용해서 컨텍스트의 시작 시작과 끝 시간을 구할 수 있다.

컨텍스트 종류: 오버래핑 컨텍스트


오버래핑 컨텍스트는 시작과 종료 조건을 지정하는 건 논오버래핑 컨텍스트와 같다. 차이점이 있다면, 오버래핑 컨텍스트는 시작 조건을 충족할 때 마다 새로운 컨텍스트 파티션을 생성한다는 점이다. initiated와 terminated를 이용해서 컨텍스트 시작과 종료 조건을 지정한다. 다음은 레퍼런스 문서에 있는 몇 가지 예이다.

  • create context CtxTrainEnter initiated TrainEventEnter as te terminated after 5 min
    TrainEventEnter 이벤트가 들어올 때 마다 새로운 컨텍스트를 시작하고, 5분이 지나면 해당 컨텍스트를 종료한다.
  • create context CtxEachMinute initiated @now and pattern [every timer:interval(1 min)] terminated after 1 min
    "@now and"는 컨텍스트를 즉각 시작하고, 시작 조건이 충족되면 새로운 컨텍스트를 시작한다. 위 EPL은 컨텍스트를 즉각 시작하고, (1분이 지나면 조건을 충족하므로) 매 1분이 지날 때 마다 컨텍스트를 시작한다. 각 컨텍스트는 1분 이후에 종료된다.
  • create context OrderContext initiated distinct(orderId) NewOrderEvent as newOrder terminated CloseOrderEvent(closeOrderid = newOrder.orderId)
    NewOrderEvent가 들어오면 컨텍스트를 시작한다. 단, orderId를 기준으로 컨텍스트가 이미 존재하면 새로운 컨텍스트는 시작되지 않는다.

컨텍스트 종료시 결과 생성하기


output을 사용하면 컨텍스트가 종료될 때 결과를 생성할 수 있다. 다음처럼 output - when terminated 구문을 사용하면 된다.


context CtxEachMinute select avg(temp) from sensorEvent output snapshot when terminiated



컨텍스트를 사용할 때의 장점


레퍼런스 문서에 따르면 컨텍스트 사용시 다음과 같은 장점이 있다고 한다.

  • 하나의 컨텍스트를 여러 EPL에 적용할 수 있으므로, 그룹핑하기 위한 중복 부분을 제거할 수 있다.
  • EPL을 더 읽기 쉽게 만들어준다.
  • 두 개 이상의 컨텍스트를 조합(중첩)할 수 있다.
  • 파티션이 시간 상 겹칠 수 있다.
  • 엔진이 컨텍스트 단위로 락을 관리하기 때문에 동시성을 높일 수 있다.


반응형

회원 정보 목록 화면과 회원 정보 상세 화면을 반복해서 왔다 갔다 하는 직원이 있다고 하자. 또, 특정 조건으로 회원 목록을 검색해서 1페이지, 2페이지, 3페이지 등 페이지 이동을 하는 직원이 있다고 하자. 이 두 경우 회원 정보를 지속적으로 조회하는 것으로서, 회원 정보를 몰래 수집하는 이상 행동으로 의심해 볼 수 있다. 이런 이상 행동에는 일정 패턴이 반복되는 경우가 많은데, 이런 상황을 발견할 때 유용하게 사용할 수 있는 것이 EPL 패턴이다.


관련 글:

EPL 패턴


EPL은 발생하는 이벤트들이 특정 패턴에 매칭되는지 찾아준다. 다음은 매우 간단한 EPL 패턴의 사용 예다.


EPStatement eps = epService.getEPAdministrator().createEPL(

        "select l from pattern[every l=List ]"

);

eps.addListener(new UpdateListener() {

    @Override

    public void update(EventBean[] newEvents, EventBean[] oldEvents) {

        MemberList l = (MemberList)newEvents[0].get("l"));

        ...

    }

});


위 코드에서 from 절 뒤에 "pattern [ 패턴 ]" 형식으로 패턴을 지정하고 있다. 위 패턴은 매우 단순한 패턴으로, 모든 List 이벤트에 대해 매칭되는 패턴이다. 위 EPL을 실행하면 List 이벤트가 발생할 때 마다 UpdateListener가 실행된다.


위 패턴은 "select l from List l" EPL과 동일한 결과를 발생시키는데, 실제 위와 같이 단순한 패턴을 사용하지는 않는다. EPL은 다양한 패턴 연산자와 구성 요소를 제공하고 있으며 이들을 조합해서 흥미로운 패턴을 만들어낼 수 있다.


패턴의 구성 Atom과 연산자


패턴을 구성하는 주요 Atom(EPL 레퍼런스 문서를 보면 Atom이라고 부름)에는 다음의 두 가지가 있다.


패턴 Atom 

이벤트 필터 Atom

이벤트: List

조건에 맞는 이벤트: List(userId = 'madvirus')

시간 기반 Atom

타이머: timer:interval(10 sec)

스케줄: timer:at(*, 6, *, *, *)


Atom에 대한 연산자는 다음의 4가지 종류가 존재한다.


종류

연산자

반복을 제어하는 연산자

every, [num], until 등

논리 연산자

and, or, not

이벤트의 발생 순서

 ->

표현식 유효 조건

where timer:within, where timer:withinmax, while 등 


연산자의 우선순위는 다음과 같다.

  1. 조건 접미사: where timer:within, where timer:withinmax, while
  2. 단항 연산자: every, not, every distinct
  3. 반복: [num], until
  4. and
  5. or
  6. 순서: ->

연산자 우선순위를 이해하지 않으면 패턴을 다른 의미로 읽거나 작성하게 되므로, 우선순위에 유의해서 패턴을 작성해야 한다.


이벤트 필터


다음 EPL을 보자.


select L from pattern [L=List]


위 EPL에서는 List는 이벤트를 위한 Atom 이다. 패턴에 명시한 이벤트를 참조하려면 '태그이름=이벤트Atom'의 형식으로 태그를 사용해야 한다. 위 코드에서는 'L'이 태그가 된다.


위 EPL을 실행하면 최초에 발생한 List 이벤트만 패칭되며, 이후 발생하는 List 이벤트에 대해서는 매칭되지 않는다. 따라서, List 타입 이벤트가 L1, L2, L3의 순서로 발생하면 L1 이벤트에 대해서만 UpdateListener를 통해 통지 받을 수 있다.


모든 List 이벤트에 대해 매칭되려면 every 연산자를 사용해야 한다.


select L from pattern [every L=List]


위 EPL은 모든 List 이벤트에 매칭된다.


특정 조건을 만족하는 이벤트를 걸러내고 싶다면 다음과 같이 필터 구문을 사용하면 된다.


select L from pattern [every L=List(uri='/member/list', userId='bkchoi')]


"->"를 이용한 발생 순서 매칭


-> 연산자는 매우 유용한 연산자이다. 이벤트의 발생 순서를 지정할 때 -> 연산자를 사용할 수 있다. 아래 패턴은 List 이벤트 다음에 Detail 이벤트가 발생한 경우 매칭된다.


select L, D from pattern [L=List -> D=Detail]


다음은 이벤트 순서와 위 패턴에 매칭되어 선택된 결과를 표시한 것이다.

  • L1, L2, D1, D2, L3, D3 : (L1, D1)

위 결과를 보면 첫 번째로 패턴에 매칭된 이벤트 집합(L1, D1)만 선택된 것을 알 수 있다. 


모든 이벤트 List 이벤트에 대해 매칭되도록 하려면 every 연산자를 사용하면 된다.


select L, D from pattern [every L=List -> D=Detail]


위 패턴은 모든 List 이벤트에 대해 뒤에 Detail이 오면 매칭된다. every 연산자가 -> 보다 우선순위가 높으므로 위 패턴은 [(every L=List) -> D=Detail]과 같다. 다음은 이벤트 흐름에 대해 매칭되어 선택되는 이벤트 집합을 표시한 것이다.

  • L1, L2, D1, D2, L3, D3 : (L1, D1), (L2, D1), (L3, D3)

위 매칭 결과를 보면 (L1, D1)과 (L2, D1)이 선택된 것을 알 수 있다. L1과 L2에 대해 D1이 모두 매칭됐는데, 겹치는 경우 매칭되지 않도록 하고 싶다면 다음과 같이 @SuppressOverlappingMatches 패턴 어노테이션을 사용한다.


select L, D from pattern @SuppressOverlappingMatches [every L=List -> D=Detail]


위 패턴을 사용하면 이벤트 발생 순서에 따라 다음과 같이 (L1, D1)만 선택되고, (L1, D1)과 겹쳐지는 (L2, D1)은 매칭되지 않는다.

  • L1, L2, D1, D2, L3, D3 : (L1, D1), (L3, D3)

-> 연산자를 사용할 때 주의할 점은 -> 연산자 앞 뒤에 지정한 이벤트가 반드시 바로 직후에 발생되야 하는 것은 아니라는 점이다. 예를 들어, 다음과 같은 패턴과 이벤트 순서를 생각해보자.

  • 패턴: select L, D from pattern [every L=List -> D=Detail]
  • 이벤트: L1, V1, D1, D2, L3, D3
이 경우 L1과 D1 사이에 다른 타입의 이벤트 V1이 발생했지만, L1 이벤트 이후에 D1 이벤트가 발생했으므로 (L1, D1) 이벤트 쌍이 매칭되어 선택된다. 즉, '->' 연산자는 바로 뒤에의 의미가 아니라 '이후에'라는 의미이다.


and와 or 연산자


and는 순서에 상관없이 두 표현식이 모두 true면 매칭된다. 예를 들어, 다음 패턴은 List 이벤트와 Detail 이벤트가 도착하면 매칭된다.


select L, D from pattern [L=List and D=Detail]


순서가 상관이 없기 때문에 다음의 두 순서로 이벤트가 들어와도 매칭되서 선택되는 이벤트는 L2, D1 이다. 한 번만 선택되는 이유는 패턴에 every가 없기 때문이다.

  • L2, D1, D2, L3, D3
  • D1, L2, L3, D3
동일 패턴을 사용하면서 다음의 순서로 이벤트가 발생하면 어떻게 될까?
  • L1, L2, D1, D2, L3, D3
이 경우 선택되는 이벤트 조합은 (L1, D1)이다.

아래와 같이 every를 붙이면 어덯게 될까? (every의 우선순위가 and 보다 높기 때문에 and 쪽에 괄호를 넣었다.)

select L, D from pattern [every (L=List and D=Detail)]


다음 순서로 이벤트가 발생할 경우,
  • L1, L2, D1, D2, L3, D3

매칭되는 결과 이벤트는 (L1, D1)과 (L3, D2) 이다.


or는 두 표현식 중 하나만 true면 된다. 즉, 아래 이벤트는 List 이벤트나 Detail 이벤트 중 하나만 발생해도 매칭된다. every가 붙어 있으므로 결과적으로 모든 List 이벤트와 Detail 이벤트가 매칭된다.

select L, D from pattern [every (L=List or D=Detail)]


not 연산자


not은 표현식이 false인 경우 true가 된다. 처음 Statement가 시작될 때에 "not 표현식"은 true 상태로 시작하며, 표현식이 true가 될 때 false가 된다. not은 단독으로 사용되기 보다는 and와 함께 사용된다. 예를 들어, 발권을 하한 뒤 상영관에 입장하기 전에, 표에 있는 할인 쿠폰으로 간신을 구매하지 않은 사용자를 찾고 싶다고 해 보자. 이는 다음과 같은 패턴으로 찾아낼 수 있다.


select i from pattern [

  every i=IssuedTicket -> (Entering(ticketId = i.id) and not UseTicketCoupon(couponId = i.couponId))

]


위 패턴은 모든 영화표발권(IssuedTicket) 이벤트에 대해 이후 같은 표에 대한 입장(Entering) 이벤트가 발생하고, 그 사이에 표의 쿠폰을 사용함(UseTicketCoupon) 이벤트가 발생하지 않으면 매칭된다. 따라서, 이 패턴으로 매칭된 IssuedTicket 이벤트를 이용해서 실시간으로 쿠폰이 사용되지 않은 표를 구매한 고객을 찾아낸다면, 고객에게 SMS를 이용해서 쿠폰 사용을 유도하는 프로모션을 진행할 수도 있을 것이다. (예를 들어, 같은 건물에 있는 커피숍에 가서 500원을 할인 받으라는 등의 프로모션)


Every 연산자


every 연산자는 동작 방식을 잘 이해해서 사용해야 한다. every 연산자를 어떻게 사용하느냐에 따라 패턴 매칭 결과가 달라지는데, 이에 대한 설명은 레퍼런스 문서에 잘 나와 있다. 그래서 레퍼런스 문서에 있는 내용을 발췌해서 아래 표로 정리해 본다.


* 이벤트가 A1, B1, C1, B2, A2, D1, A3, B3, E1, A4, F1, B4 순서로 발생했다고 가정

every 사용 예

설명

매칭 결과 

every (A -> B)

A 이벤트 발생 후, B 이벤트가 발생하면 매칭된다. 매칭된 후 새로운 매처(매칭 검사기)를 시작한다. A가 매칭된 후, 다음 B가 나올 때 까지 새로운 매처가 시작되지 않는다. 즉, A 이벤트 이후에 B 이벤트가 발생하기 전까지의 다른 A 이벤트는 매칭 대상에서 제외된다.

(A1, B1), (A2, B3),

(A4, B4)

every A -> B

(every A) -> B

모든 A에 대해, A 이벤트 발생 후 B 이벤트가 발생하면 매칭된다.

(A1, B1), (A2, B3),

(A3, B3), (A4, B4) 

A -> every B 

A -> (every B)

패턴 Statement가 시작된 뒤, 첫 번째 A 이벤트에 대해 모든 B가 매칭된다.

(A1, B1), (A1, B2),

(A1, B3), (A1, B4) 

(every A) -> (every B)

모든 A 이벤트에 대해, A 이벤트 발생 후 모든 B 이벤트에 매칭된다.

(A1, B1), (A1, B2),
(A1, B3), (A1, B4)
(A2, B3), (A2, B4)
(A3, B3), (A3, B4)
(A4, B4) 


반복 연산자


특정 표현식이 지정한 횟수만큼 반복되면 매칭되도록 하고 싶다면 반복 연산자를 사용하면 된다. 반복 연산자는 '[횟수]' 형식으로 사용한다. 아래 코드는 예이다.


EPStatement eps = epService.getEPAdministrator().createEPL(

        "select s from pattern [every [2] s=SlowResponse ]"

);


eps.addListener(new UpdateListener() {

    @Override

    public void update(EventBean[] newEvents, EventBean[] oldEvents) {

        for (EventBean eb : newEvents) {

            // 길이가 2인 배열

            SlowResponse[] responses = (SlowResponse[]) eb.get("s");

            ...

        }

    }

});


위 패턴을 사용하면 SlowResponse 이벤트가 2번 발생할 때 마다 매칭된다. 매칭 결과를 받는 리스너의 코드를 보면 "s"에 보관된 값이 SlowResponse 배열임을 알 수 있다.


반복 연산자와 not을 사용하면 다음과 같은 패턴을 만들 수도 있다.


select s, o from pattern [

     every [3] ( (s=SlowResponse or o=OvertimeResponse) and not NormalResponse)"

]


위 패턴은 SlowResponse나 OvertimeResponse가 3회 발생하고, 그 사이에 NormalResponse가 발생하지 않은 경우 매칭된다. 즉, 느린 응답이 연속해서 3회 발생하는 상황을 찾아내주는 패턴이다.


반복 연산자와 until


until 연산자는 until 뒤의 표현식이 충족될 때 매칭된다. 아래 패턴은 NormalResponse가 발생하기 전까지 SlowResponse 이벤트를 반복한다.


select s from pattern [ every (s=SlowResponse until NormalResponse) ]


NormalResponse 이벤트 발생 전에 SlowResponse 이벤트가 5번 발생하면 s는 SlowReponse 객체 5개를 갖는 배열이 할당된다.


범위를 갖는 반복 연산자는 다음과 같은 형식을 갖는다.

  • [3:8] - 최소 3번에서 최대 8번까지 반복
  • [3:] - 최소 3번 반복
  • [:8] - 최대 8번 반복
until과 범위를 갖는 반복 연산자를 함께 사용해도 된다. 다음은 예이다.


* S1 S2 N1 S3 S4 S5 N2 순서로 이벤트가 발생했다고 가정 (S:Slow, N:Normal)

 패턴 예

 결과

 every ([2:] s=SlowResponse until NormalResponse)

(S1, S2) *N1 발생 시점에 생성

(S3, S4, S5) *N2 발생 시점에 생성 

 every ([:3] s=SlowResponse until NormalResponse)

(S1, S2) *N1 발생 시점에 생성 

(S3, S4, S5) *N2 발생 시점에 생성

 every ([:2] s=SlowResponse until NormalResponse)

(S1, S2) *N1 발생 시점에 생성

(S3, S4) *N2 발생 시점에 생성

 every ([3:] s=SlowResponse until NormalResponse)

(S3, S4, S5) *N2 발생 시점에 생성


타이머 패턴 가드


where timer:within 가드는 표현식 뒤에 위치하며, 일정 시간 안에 표현식이 true가 되면 매칭되고 지정한 시간이 지나면 매칭되지 않는다. 다음은 타이머 가드의 사용 예이다.


select s from pattern [s=SlowResponse where timer:within(2 sec)]


위 패턴은 최초 2초 안에 SlowResponse 이벤트가 발생하는지 여부를 확인한다. 최초 2초 안에 이벤트가 발생하면 해당 SlowResponse 이벤트가 선택되고 타이머가 종료된다. 2초가 지나면 타이머가 종료되고 더 이상 패턴은 사용되지 않는다.


최근 2초간 발생한 SlowResponse 이벤트를 구하고 싶다면 다음과 같이 every 연산자를 함께 사용한다. 아래 패턴을 실행하면 2초 안에 SlowResponse가 이벤트가 발생할 때 마다 select 결과를 받게 된다.


select s from pattern [(every s=SlowResponse) where timer:within(2 sec)]


아래 코드는 어떤 의미일까?


select s from pattern [every ((s=SlowResponse -> SlowResponse) where timer:within(2 sec))]


위 코드는 최초 시작시 타이머가 시작되어 (SlowResponse -> SlowResponse)가 2초 이내에 발생하는지 여부를 확인한다. 타이머가 종료되면 every 연산자에 의해 새로운 타이머가 시작된다. 위 패턴을 사용했을 때 이벤트 발생 시점과 select 결과는 아래 그림과 같이 이뤄진다. 아래 그림에서 화살표는 타이머의 시작과 끝을 나타낸다.



위 그림과 동일한 시점에 이벤트가 발생했을 때, 아래 코드는 어떤 결과를 만들까?


select s from pattern [every s=SlowResponse -> SlowResponse where timer:within(2 sec)]

// 우선순위에 따라 evern s=SlowResponse -> (SlowResponse where timer:within(2 sec)) 와 동일


이 패턴은 모든 SlowResponse 이벤트가 발생한 후 2초 이내에 SlowResponse 이벤트가 발생하면 매칭된다.


아래 패턴은 어떨까?


select s from pattern [every s=SlowResponse -> (not NormalResponse where timer:within(2 sec)) ]


얼핏 생각하면 SlowResponse 이벤트 발생 후 2초 이내에 NormalResponse가 발생하지 않으면 매칭될 것 같지만, 실제로는 SlowResponse 이벤트가 발생하는 순간에 바로 매칭된다. 실제로 특정 이벤트가 발생하고 나서 일정 시간 안에 다른 이벤트가 발생하지 않는 패턴을 만들고 싶다면 다음에 설명할 time:interval Atom을 사용해야 한다.


시간 Atom


패턴의 Atom은 이벤트 필터 외에 시간 간격 Atom인 timer:interval 을 제공하고 있다. 다음은 time:interval을 사용한 패턴 예이다.

select o from pattern [
    every o=OrderForm -> timer:interval(10 min) and not OrderComplete(id=o.id)
]

위 코드는 OrderForm 이벤트가 발생한 후, 같은 id를 갖는 OrderComplete 이벤트가 10분 안에 발생하지 않으면 매칭된다. 즉, 주문 양식까지 들어왔는데 10분 동안 결제를 하지 않은 경우를 찾아내는 패턴이다.


반응형

관련 글


Insert into를 이용한 이벤트 발생

EPL은 insert into 를 사용해서 새로운 이벤트를 발생시킬 수 있다. insert into의 사용 방법은 다음과 같이 단순하다.

insert into SlowResponse
select url, responseTime as respTime from AccessLog where responseTime > 500

위 코드는 AccessLog 이벤트의 responseTime 프로퍼티가 500보다 크면 url과 responseTime 프로퍼티를 선택하고, 선택된 프로퍼티를 이용해서 새로운 SlowResponse 이벤트를 생성한다.

insert into를 이용해서 생성하는 이벤트는 select에서 선택한 프로퍼티와 동일한 프로퍼티를 갖고 있어야 한다. 예를 들어, 위 코드의 경우 SlowResponse 이벤트는 url과 respTime 프로퍼티를 갖고 있어야 한다. (SlowResponse 이벤트를 자바 클래스로 정의했다면 setUrl()과 setRespTime() 메서드를 필요로 한다.)

insert into를 이용해서 생성한 이벤트는 일반적인 다른 이벤트와 마찬가지로 EPL을 이용해서 사용할 수 있다. 예를 들어, 위 코드로 생성한 SlowResponse 이벤트는 다음의 EPL에서 사용할 수 있게 된다.

select count(*) as count from SlowResponse.win:time(2 sec) having count(*) > 3


SlowResponse 클래스가 다음의 생성자를 갖고 있다고 하자.


public class SlowResponse {

    private String url;

    private long respTime;


    public SlowResponse(String url, long respTime) {

        this.url = url;

        this.respTime = respTime;

    }


이 경우, inserto into에서는 생성자를 이용할 수 있다.


insert into SlowResponse(url, responseTime)

select url, responseTime from AccessLog where responseTime > 500


조인(Join)


Esper는 스트림 간의 조인을 제공한다. RDMBS의 조인과 유사하며, 내부 조인과 외부 조인을 함께 지원하고 있다. RDBMS와의 차이가 있다면, EPL에서의 조인은 제한된 뷰에서의 조인을 지원한다는 점이다. (win:time 이나 win:length 등이 뷰에 해당하며, Esper는 앞서 Esper 초보 시리즈 2 - EPL 기초 에서 봤던 시간/길이 윈도우 외에 다양한 뷰를 제공하고 있다.)


아래 코드는 두 스트림의 조인을 이용한 코드의 예이다.


EPStatement eps = epService.getEPAdministrator().createEPL(

  "select v, o from ProductView.win:time(2 sec) as v, ProductOrder.win:time(2 sec) as o "

);

eps.addListener(new UpdateListener() {

    @Override

    public void update(EventBean[] newEvents, EventBean[] oldEvents) {

        for (EventBean eb : newEvents) {

            ProductView pv = (ProductView) eb.get("v");

            ProductOrder po = (ProductOrder) eb.get("o");

        }

    }

});


위 코드는 1분 동안 발생한 ProductView와 ProductOrder 이벤트를 조인한 결과를 select 하고 있다.



위 그림에서 ProductView 이벤트가 1초, 1.5초, 2.5초 시점에 발생하고, ProductOrder 이벤트는 2초와 3.3초 시점에 발생했다. 앞서 EPL에서 두 이벤트에 대해 동일하게 2초의 시간 윈도우를 지정했는데, 이 경우 각 시점에 select로 조회되는 조인 결과는 다음과 같다.

  • 주황색(2초 시점): O1 이벤트가 발생하고, 그 결과로 (V1, O1), (V2, O2) 조인 결과가 select 된다.
  • 녹색(2.5초 시점): V3 이벤트가 발생하고, 그 결과로 (V3, O1) 조인 결과가 select 된다.
  • 빨강색(3.3초 시점): O2 이벤트가 발생하고, 그 결과로 (V2, O2), (V3, O2) 조인 결과가 select 된다.
where 절을 이용해서 조인 조건을 정의할 수도 있다.

select v, o from ProductView.win:time(2 sec) as v, ProductOrder.win:time(2 sec) as o
where v.productId = o.productId and v.userId = o.userId

외부 조인을 사용할 수도 있다. 다음은 외부 조인의 사용예이다. SQL과 동일하게 on 절을 이용해서 조건을 지정한다.


select v, o 

from ProductView.win:time(2 sec) as v 

left outer join ProductOrder.win:time(2 sec) as o 

on v.productId = o.productId and v.userId = o.userId 


leff 외에 right, full 외부 조인을 지원한다.


아래 그림은 다음의 쿼리를 실행했을 때 결과를 정리한 것이다.

  • select v, o from ProductView.win:time(2 sec) as v 
    left outer join ProductOrder.win:time(2 sec) as o 


이 그림에서 회색 상자는 특정 시점에 select 결과로 전달받은 데이터를 표시한 것이다. V1 이벤트가 발생한 시점을 기준으로 아직 ProductOrder 이벤트가 존재하지 않으므로 select 결과는 (V1, null)이 된다. 비슷하게 V2 이벤트 발생 시점에도 (V2, null)을 결과로 받는다. O1 이벤트가 발생하는 시점이 되면, EPL에서 지정한 시간 윈도우 범위 안에 V1, V2 이벤트가 조인할 수 있는 O1 이벤트가 생기게 되고, 따라서 select 결과로 (V1, O1), (V2, O1)을 받게 된다.


외부 조인을 사용하면 좀 더 재미난 쿼리를 만들 수 있다. 아래 코드를 보자.


select rstream v

from ProductView.win:time(1 min) as v

left outer join ProductOrder.win:time(1 min) as o

on v.productId = o.productId and v.userId = o.userId

where o is null


위 EPL은 select 절에 rstream을 사용했는데, 이렇게 하면 데이터가 윈도우를 벗어날 때 쿼리를 실행하게 된다. 따라서, 위 코드는 ProductView 이벤트가 발생한 뒤로 1분 안에 동일 제품/동일 사용자의 ProductOrder 이벤트가 발생하지 않으면 해당 ProductView 이벤트를 select 결과로 받게 된다. 이 때, UpdateListener는 아래 코드와 같이 oldEvents 파라미터를 이용해서 결과를 얻을 수 있다.


EPStatement eps = epService.getEPAdministrator().createEPL(

        "select rstream v "+

                "from ProductView.win:time(2 sec) as v "+

                "left outer join ProductOrder.win:time(2 sec) as o " +

                "on v.productId = o.productId and v.userId = o.userId "+

                "where o is null"

);


eps.addListener(new UpdateListener() {

    @Override

    public void update(EventBean[] newEvents, EventBean[] oldEvents) {

        for (EventBean eb : newEvents) {

            ProductView pv = (ProductView) eb.get("v");

            ...

        }

    }

});


조인이 유용하지만, 좀 더 상황이 복잡할 경우에는 패턴을 사용해서 풀어낼 수 있다. 패턴 대한 내용은 이후에 따로 정리할 예정이다.


서브 쿼리


Esper는 서브 쿼리도 지원한다. 서브 쿼리는 select, where, having, 필터 등에서 사용할 수 있다. 다음은 서브 쿼리의 예다.


select count(*) as count, (select max(value) from Tps.win:time(5 sec)) as maxTps

from SlowResponse.win:time(3 sec) s having count(*) > 3


다음은 where 절에서 서브 쿼리를 사용하는 예이다.


select * from SlowResponse.win:time(4 sec) s

where s.responseTime > (select avg(responseTime) from SlowResponse.win:time(2 sec))


where 절에서 exists, in, not in, any, all 등을 사용할 수도 있다. 다음은 exists를 사용한 예이다.


select rstream v from ProductView.win:time(10 min) as v 

where not exists 

        (select * from ProductOrder.win:time(10 min) as o 

         where v.productId = o.productId and v.userId = o.userId)


위 코드는 ProductView 이벤트의 시간 윈도우에서 벗어나는 이벤트에 대해 그 이벤트와 매칭되는 ProductOrder 이벤트가 존재하지 않는지 확인한다. 즉, 상품을 본 후 10초 이내에 주문을 하지 않은 ProductView 이벤트를 구한다. (이 결과는 앞에서 left outer join 사용 예에서의 결과와 동일하지만, 조인의 경우보다 이해하기가 더 쉬운 것을 알 수 있다.)



반응형


Esper는 이벤트 시점을 기준으로 출력 결과를 발생시킨다. 이벤트가 들어오거나 타임 윈도우에서 이벤트가 벗어나거나 할 때 관련된 이벤트 처리 결과를 리스너에 전달한다. 예를 들어, 아래 EPL을 생각해보자.


select avg(cost) from StockEvent


위 EPL을 실행하면 StockEvent가 발생할 때 마다 cost의 평균값을 리스너를 통해 받게 된다. 그런데, 단순히 평균의 추이를 알고 싶은 거라면 모든 이벤트마다 평균값을 받는 것 보다 10초, 1분과 같이 주기적으로 그 시점의 평균값을 확인해도 문제가 없을 것이다. 이렇게 출력 자체를 제어하고 싶을 때 output 절을 사용할 수 있다.


관련 시리즈:


output 을 이용한 출력 시점 제어


output은 출력 시점을 제어하기 위해 사용된다. 예를 들어, 10초 마다 출력을 받고 싶다거나, 이벤트 5개 당 첫 번째에 대해 결과를 받고 싶다거나 할 때 output을 사용한다.


output의 기본 사용 방법은 다음과 같다.


select ... from ...

output [all | first | last | snapshot ] every N [seconds | events]


every N에서 N은 숫자이며, seconds는 초를 events는 이벤트 개수를 뜻한다. 즉, "every 10 seconds"는 매 10초마 결과를 리스너에 전달하게 된다.


output 뒤에 all, first, last, snapshot는 다음의 의미를 갖는다.

  • all: 출력 주기에 발생한 모든 이벤트 출력
  • first: 출력 주기에 발생한 첫 번째 이벤트 기준 출력
  • last: 출력 주기에 발생한 마지막 이벤트 기준 출력
  • snapshot: 현재 시점의 출력 결과. 윈도우와 함께 사용되지 않을 경우 last와 같은 결과 출력.

예를 들어, 아래 목록에 첫 번째 EPL은 매 2초 간격으로 출력을 발생시키는데, 출력 주기(2초) 범위에 발생한 모든 이벤트에 대한 출력을 발생시킨다.

  • select avg(cost) as avg from StockTick output all every 2 seconds
  • select avg(cost) as avg from StockTick output first every 2 seconds
  • select avg(cost) as avg from StockTick output last every 2 seconds
  • select avg(cost) as avg from StockTick output snapshot every 2 seconds

위 EPL에 대해 실제 리스너에 전달된 출력 결과를 시간대 별로 출력해보면 아래 그림과 같다.



위 그림에서 상위 두 개는 code1 및 code2 이벤트의 발생 시점이다. 그리고, 회색 화살표는 2초 간격을 표시한 것이다. 녹색 삼각형으로 표시된 output all의 경우, 매 2초 간격마다 발생한 모든 이벤트 처리 결과가 리스너에 전달되는 것을 알 수 있다.


위 그림에서 회색 X 표시가 output first에 의해 발생한 이벤트 발생 시점을 표시한 부분이다. output first의 경우는 약간 특이하게 동작한다. output first의 경우 현재 주기에서 발생한 이벤트가 없으면, 다음 주기가 시작할 때 현재 시점 기준으로 마지막 결과를 리스너에 전달하고, 그 다음 주기에서 새로운 이벤트가 발생하면 그 시점의 결과를 리스너에 다시 전달한다.


group by와 output


group by와 output이 만나면 약간 다른 방식으로 동작한다. group by와 output이 만나면 다음과 같은 방식으로 동작한다.

  • first: 그룹 별 출력 주기에 발생한 첫 번째 이벤트를 기준으로 출력 결과 생성. 그룹 별 첫 번째 이벤트 발생 시점에 출력 생성. 출력 주기에 이벤트가 없으면 출력 결과를 생성하지 않음.
  • last: 그룹 별 출력 주기에 발생한 마지막 이벤트를 기준으로 출력 결과 생성. 출력 주기 종료 시점에 마지막 출력 결과 생성. 출력 주기에 이벤트가 없으면 출력 결과를 생성하지 않음.
  • all: 출력 주기 종료 시점에 마지막 출력 결과 생성. 출력 주기에 이벤트가 없어도 출력 결과 생성.
  • 키워드 없음: 출력 주기에 발생한 모든 출력 결과 생성. 출력 주기 종료 시점에 출력 결과 생성. 출력 주기에 이벤트가 없으면 출력 결과를 생성하지 않음.

아래 쿼리는 작성 예이다.

  • select code, avg(cost) as avg from StockTick group by code output first every 2 sec
  • select code, avg(cost) as avg from StockTick group by code output last every 2 sec
  • select code, avg(cost) as avg from StockTick group by code output all every 2 sec
  • select code, avg(cost) as avg from StockTick group by code output every 2 sec

위 쿼리를 실행해 보면 이벤트 발생 시점에 따라 아래와 같이 출력 결과가 발생한다.




윈도우와 snapshot의 동작 방식


윈도우와 output snapshot을 함께 사용하면 윈도우를 기준으로 마지막 값이 출력된다. 아래 그림은 다음의 세 쿼리를 실행한 결과를 정리한 것이다.

  • select avg(cost) as avg from StockTick.win:time_batch(3 sec) output snapshot every 1.5 sec
  • select avg(cost) as avg from StockTick.win:time_batch(3 sec)
  • select avg(cost) as avg from StockTick.win:time(3 sec) output snapshot every 1.5 sec



출력 주기를 1.5초로 지정했는데, 그 출력 결과를 보면 시간 배치 윈도우와 시간 윈도우에서 output snapshot이 다르게 동작하는 것을 확인할 수 있다. 시간 배치 윈도우와 output snapshot을 함께 사용한 결과를 보면 세 번째 출력 결과가 5000인데, 그 이유는 다음과 같다.

  • 시간 배치 윈도우에 적용된 output snapshot의 세 번째 출력 결과 시점은 약 8.5초
  • 시간 배치 윈도우의 처리 구간은 7초~9초
  • 따라서, 아직 현재 시간 배치 윈도우의 결과가 없으므로, 이전 배치의 결과인 5000을 값으로 출력
시간 윈도우의 경우는 항상 결과가 있기 때문에 현재 출력 시점을 기준으로 시간 윈도우 크기 범위의 결과를 출력한다.


추가 내용


출력 주기를 크론탭을 이용해서 표현 가능하다. 다음은 매 15분 주기로 결과를 출력하는 예이다.

  • select avg(cost) as avg from StockTick.win:time_batch(30 minutes) output snapshot at(*/15, *, *, *, *)

after를 사용해서 최초 일정 시간 동안 결과를 출력하지 않도록 지정할 수 있다.

  • select avg(cost) as avg from StockTick output after 4 sec last every 1.5 sec


  1. 2017.08.06 21:52

    비밀댓글입니다

    • 최범균 madvirus 2017.08.11 20:33 신고

      안녕하세요, 제가 이 글을 쓴지 몇 년 지나기도 했고 Esper를 손 놓은지 오래 되기도 해서 최신 버전의 Esper에 대한 내용을 새로 쓸 수준이 안 됩니다.

반응형

Esper를 잘 사용하려면 EPL을 잘 사용해야 할 것 같다. EPL 레퍼런스가 워낙 양이 많아 다 읽어보지 못했지만, Esper를 학습하는 진입 지점으로 사용할 수 있도록 몇 가지 기초적인 내용을 좀 추려보았다.


관련 시리즈:


EPL 기본 구조


EPL의 기본 구조 및 EPL이 선택한 데이터를 UpdateLister에서 접근할 때 사용하는 코드는 아래와 같다.


// 모든 이벤트를 선택

select * from StockTick


// UpdateListener의 구현 코드

EPStatement eps = epService.getEPAdministrator().createEPL("select * from StockTick");

eps.addListener(new UpdateListener() {

    @Override

    public void update(EventBean[] newEvents, EventBean[] oldEvents) {

        StockTick pe = (StockTick) newEvents[0].getUnderlying();

        ...

    }

});


select에서 선택한 이벤트는 UpdateListener.update() 메서드의 newEvents 파라미터로 전달된다. 위 코드의 EPL의 경우 한 번에 한 개의 이벤트만 선택되기 때문에, newEvents에는 길이가 0인 배열에 전달된다.


EventBean은 선택한 이벤트에 접근할 수 있는 몇 가지 메서드를 제공하고 있는데, 위 EPL과 같이 select 절에서 "*"을 사용한 경우 getUnderlying() 메서드를 이용해서 데이터에 접근할 수 있다.


다음은 이벤트에서 특정 프로퍼티만 사용하는 경우의 코드이다.


// 모든 이벤트의 특정 프로퍼티를 선택

select name, code, cost from StockTick


// UpdateListener.update 메서드

public void update(EventBean[] newEvents, EventBean[] oldEvents) {

    String name = (String) newEvents[0].get("name");

    String code = (String) newEvents[0].get("code");

    int cost = (Integer) newEvents[0].get("cost");

    ...

}


where 절을 이용해서 특정 조건을 충족하는 이벤트를 선택할 수 있다.


// 특정 조건을 충족하는 이벤트를 선택

select * from StockTick where rate > 10 and cost > 5000


다음처럼 from 절의 이벤트나 select 절에서 별칭을 사용할 수 있다.


// 별칭 사용

select s.name, s.code, s.cost as cost from StockTick s where s.rate > 10 


// UpdateListener.update 메서드

public void update(EventBean[] newEvents, EventBean[] oldEvents) {

    String name = (String) newEvents[0].get("s.name");

    String code = (String) newEvents[0].get("s.code");

    int cost = (Integer) newEvents[0].get("cost");

}


위 코드에서 눈여겨 볼 점은 select 절에서 "별칭.프로퍼티" 이름으로 프로퍼티의 경우 EventBean.get() 메서드에서도 그대로 "별칭.프로퍼티이름"을 사용한다는 것이다. 이 형식을 사용하기 싫으면 s.cost의 경우처럼 as 를 이용해서 이름을 변경하면 된다.


윈도우


최근 10분간 발생한 모든 주문의 합을 실시간으로 추적하고 힢다면 어떻게 해야 할까? 또는 가장 최근에 발생한 20개의 이벤트 중 가장 큰 값을 구하려면 어떻게 해야 할까? 이럴 때 사용할 수 있는 게 윈도우이다. 윈도우는 일정 시간 동안 또는 일정 개수의 이벤트를 메모리에 보관하고, EPL에서는 이 윈도우에 보관된 이벤트 목록를 대상으로 결과를 뽑아낼 수 있다.


주요 윈도우는 다음의 네 가지가 있다.

  • 시간 윈도우
  • 시간 배치 윈도우
  • 길이 윈도우
  • 길이 배치 윈도우


시간(time) 윈도우


시간 윈도우는 이벤트가 발생한 기준으로 지정한 시간 동안만 이벤트를 보관한다. 예를 들어, 지정 시간이 1분이라면, 마지막에 발생한 이벤트를 기준으로 최근 1분 사이에 발생한 이벤트만 윈도우에 보관된다. 지정한 시간이 지나면 이전 이벤트는 윈도우에서 제거된다. 아래 그림은 시간 윈도우의 동작 방식을 그림으로 부여주고 있다.


* 출처: http://esper.codehaus.org/


위 그림에서 윈도우 크기를 4초로 잡았다고 가정하자. W1 이벤트는 t+4초 시점에 발생했다. 이 시점에서 윈도우에는 W1 이벤트만 보관되어 있다. 위 그림에서 둥근 사각형이 윈도우를 의미한다. t+5초 시점에 W2 이벤트가 발생했고, 이 시점에 윈도우에는 W2, W1 이벤트가 보관된다. t+8초 시점에 되면 W1은 윈도우에서 제거된다.


[참고]

윈도우에서 제거된 이벤트는 UpdateListener를 통해서 받을 수 있는데, 그러기 위해서는 select 구문에서 rstream 키워드를 사용해야 한다. 이에 대한 내용이 궁금하다면, EPL 레퍼런스 문서를 확인해보도록 하자.


시간 윈도우를 정하는 방식은 아래와 같다.


select * from StockTick.win:time(5 seconds) s where s.cost > 100


보통은 위와 같이 단순히 select를 하기 보다는 그룹 함수와 함께 사용되는 경우가 많다. 예를 들어, 아래 코드는 최근 30초간 발생한 StockTick 이벤트를 code로 그룹핑 한 뒤, 각 code별로 최대 cost 및 StockTick을 구한다.


select s as tick, max(cost) as maxCost from StockTick.win:time(30 seconds) s group by s.code


시간 배치 윈도우


시간 배치 윈도우는 일정 간격으로 이벤트를 모아서 처리할 때 사용한다. 시간 배치 윈도우의 동작 방식은 아래 그림과 같다.


* 출처: http://esper.codehaus.org/


시간 배치 윈도우는 지정한 시간동안 들어온 이벤트를 윈도우에 보관한다. 시간 배치가 종료되는 시점에 select 결과를 전달하고, 다시 새로운 시간 배치를 시작한다.


시간 배치를 지정할 때에는 아래 코드처럼 win:time() 뷰를 사용한다.


select s as tick, max(cost) as maxCost from StockTick.win:time_batch(30 seconds)

group by s.code


시간 배치 윈도우를 사용할 때 유의할 점은 타임 배치 내에 이벤트가 들어오지 않으면 null을 발생하고, 그 다음 타임 배치에서도 이벤트가 들어오지 않으면 null을 발생하지 않는다는 점이다. 예를 들어, 3초 타임 배치를 사용하는 "select max(cost) from StockTick.win:time_batch(3 sec)" EPL을 사용하고, 다음의 시간 순으로 이벤트가 들어왔다고 하자.

  • 1초(100) -> 4초(200) -> 13초(500) -> 이후 안 들어옴
이 경우 3초 타임 배치에 의해 생성되는 결과는 다음과 같다.
  • 3초(100) -> 6초(200) -> 9초(null) -> 15초(500) -> 18초(null)


길이 윈도우


시간 윈도우가 지정한 시간만큼 윈도우에 보관된다면, 길이 윈도우는 지정한 개수 만큼의 이벤트만 윈도우에 보관한다. 동작 방식은 아래 그림과 같다.


* 출처: http://esper.codehaus.org/


위 그림은 길이가 5인 길이 윈도우의 경우를 보여주고 있는데, 이 경우 윈도우에는 최대 5개의 이벤트만 보관된다. 이미 5개의 이벤트가 윈도우에 보관된 상태에서 새로운 이벤트가 추가되면, 가장 오래된 이벤트가 윈도우에서 제거된다.


길이 윈도우를 사용하는 방법은 다음과 같다.


select s as tick, max(cost) as maxCost from StockTick.win:length(5) s group by code


길이 배치 윈도우


시간 배치 윈도우와 비슷하게 길이 배치 윈도우는 지정한 길이 만큼 이벤트가 차면, select 결과를 발생시키고 새로운 배치를 시작한다. 길이 배치는 win:length_batch()를 사용해서 지정한다.


select s as tick, max(cost) as maxCost from StockTick.win:length_batch(5) s group by code


필터


where 절 외에 이벤트에 조건을 거는 또 다른 방법으로 필터가 있다. 필터를 사용하면 다음과 같이 이벤트의 조건을 지정할 수 있다. 필터를 사용할 때에는 아래 코드처럼 이벤트 이름 뒤에 괄호를 사용해서 지정한다.


select * from StockTick(cost > 2000).win:length(3) 


where 절과 필터가 이벤트의 조건을 확인하는 건 동일하지만, 다음의 중요한 차이점이 있다.

  • 필터 조건을 충족하지 않는 이벤트는 윈도우에 포함되지 않는다.
  • where 조건을 충족하지 않는 이벤트는 일단 윈도우에 포함되고 그 다음에 select 과정에서 where 조건을 검사한다.
예를 들어, 길이가 3인 윈도우를 만들고 위 코드와 같은 필터를 사용한 상태에서 다음의 순서로 이벤트를 발생시켰다고 해 보자.
  • cost=3000, cost=1000, cost=2000, cost=4000, cost=5000, cost=6000 (왼쪽부터 이벤트가 들어온다고 가정)
이 경우, cost가 1000이거나 2000인 이벤트는 애초에 윈도우에 들어가지 않는다. 따라서, cost가 5000인 이벤트가 들어온 시점에 길이 윈도우에는 다음과 같은 순서로 이벤트가 들어가게 된다.
  • [cost=5000, cost=4000, cost=3000] (왼쪽이 최신)
반면에 필터를 사용하지 않고 "select * from StockTick.win:length(3) where cost > 2000" 코드를 실행했다면, cost가 5000인 이벤트가 들어온 시점에 윈도우에는 다음의 이벤트가 남아 있게 된다.
  • [cost=5000, cost=4000, cost=2000] (왼쪽이 최신)


그룹핑과 집합 연산


EPL도 SQL과 유사하게 count() 등의 집합 관련 함수와 group by/having를 이용한 그룹핑을 지원하고 있다. Esper가 제공하는 함수 목록은 레퍼런스 문서를 찾아보도록 하고, 여기서는 기본적인 동작 방식 위주로 설명할 것이다.


아래 코드는 집합 함수의 사용 예이다. 지금까지 들어온 StockTick 이벤트에 대해서 최대 cost와 평균 cost를 구한다.


select max(cost) as max, avg(cost) as avg from StockTick


다음처럼 시간 윈도우를 이용해서 최근 3초간 최대 cost와 평균 cost를 구할 수도 있다.


select max(cost) as max, avg(cost) as avg from StockTick.win:time(3 sec)


group by는 SQL과 마찬가지로 이벤트의 프로퍼티를 이용해서 그룹핑을 할 때 사용된다. 예를 들어, 종목 코드 별로 현재까지 최대/평균을 알고 싶다면 다음과 같이 group by를 사용하면 된다.


select code, max(cost) as max, avg(cost) as avg from StockTick group by code


having을 사용하면 그룹 함수에 대한 선택 조건을 지정할 수 있다. (avg(), sum() 등의 그룹 함수는 where 절에서 제약할 수 없다.)


select code, avg(cost) as avg from StockTick group by code having avg(cost) > 2000


그룹핑과 윈도우 이동에 따른 결과 이벤트 발생 시점


아래 EPL을 보자.


select code, avg(cost) as avg from StockTick.win:time(3 sec) group by code


이 EPL은 code를 기준으로 그룹을 구성하고, 3초 시간 윈도우를 지정하였다.이 경우 윈도우 새롭게 이벤트가 들어오면 위 select 결과가 UpdateListener에 전달된다. 아래 그림은 이벤트 발생 시점과 UpdateListener과 호출되는 시점을 표시한 것이다. 이 그림을 보면 이벤트가 발생하자 마자 거의 바로 리스너가 호출되는 것을 확인할 수 있다. 이벤트가 들어오면 윈도우에 포함된 이벤트들의 cost 평균이 바뀌므로 리스너에 새로운 평균을 전달한다.



위 그림에서 주황색 상자는 5.5초 정도에서 3초 구간을 표시한 것인데, 5.5초 정도에는 새롭게 윈도우에 들어온 이벤트가 없음에도 리스너가 호출되는 것을 알 수 있다. 여기서 리스너가 호출된 이유는 윈도우에서 이벤트가 벗어났기 때문이다. 윈도우에서 이벤트가 벗어나면 윈도우의 평균값이 바뀌게 되고, 따라서 바뀐 평균을 리스너에 전달하는 것이다.


그룹핑과 그룹 함수에 따른 select 출력 개수


기본적으로 아래 코드는 1개 이벤트 당 1개의 출력을 생성한다.


select * from StockTick


그런데, group by 대상과 select 대상을 어떻게 잡느냐에 따라서 select로 발생하는 결과 개수가 달라진다. 다음표는 레퍼런스를 문서를 참고해서 몇 가지 상황 별로 이를 정리한 것이다. 배치가 아닌 시간 윈도우나 길이 윈도우를 사용할 경우 매 이벤트마다 결과가 발생되므로, 이해를 돕기 위해 시간 배치 윈도우를 기준으로 설명하였다.


조건 

예제 쿼리 

출력 개수 

그룹핑 없음, 그룹 함수 없음

select * from StockTick 

이벤트 당 1개 

그룹핑 없음

그룹 함수만 사용

select avg(cost) 

from StockTick.win:time_batch(3 sec)

배치 시간 당 1개 

그룹핑 없음,

그룹 함수와 프로퍼티 함께 사용

select code, avg(cost)

from StockTick.win:time_batch(3 sec)

이벤트 당 1개 

그룹핑 존재,

그룹 함수와 그룹핑 대상 프로퍼티만 사용 

select code, avg(cost)

from StockTick.win:time_batch(3 sec)

group by code

그룹 당 1개 

그룹핑 존재

그룹 함수/그룹핑 대상 프로퍼티 및 다른 프로퍼티 함께 사용

select name, code, avg(cost)

from StockTick.win:time_batch(3 sec)

group by code

이벤트 당 1개




반응형


이래 저래 데이터 처리 관련된 기술을 조사하던 중 Esper란 놈을 알게 되었다. 몇 가지 글을 읽어보니 기회가 되면 사용해보고 싶은 욕구가 생겼다. 나처럼 Esper에 관심있는 분들을 위한 퀵스타트 문서를 정리해 본다.


관련 시리즈:


Esper란?


Esper 사이트에 따르면 Esper란 다음과 같은 것이다.

Esper is a component for complex event processing (CEP) and event series analysis.

Esper는 실시간으로 발생하는 이벤트를 분석하고 처리하기 위한 컴포넌트로서, 다음과 같은 방식으로 동작한다.


외부에서 발생한 이벤트를 Esper 엔진에 전달하면, 이벤트를 분석한다. 이벤트를 분석할 때 EPL 이라는 언어를 사용하는데, 이 언어를 이용해서 조건에 맞는 이벤트를 찾고 처리한 결과를 데이터를 생성한다.


EPL 언어는 SQL과 유사한 구조를 갖고 있기 때문에 조금만 노력하면 금방 익힐 수 있을 것 같다. EPL과 SQL을 비교해 보면, SQL에 존재하는 데이터에 대해 쿼리를 실행하는 방식이라면 EPL은 실시간으로 발생되는 이벤트에 대해 쿼리를 실행하는 방식이라고 말 할 수 있다.


Esper 다운로드


Esper를 사용하려면 Esper를 다운로드 받고 필요한 jar 파일을 복사하면 된다. 그런데, Esper는 EPL 파싱을 위해 Antlr을 사용하고 로깅을 위해 Commons Logging을 사용하고 있기 때문에 실제로는 Esper 배포판뿐만 아니라 의존 모듈도 함께 다운로드 받아야 한다.


메이븐을 사용하고 있다면 다음과 같이 의존을 추가해주면 된다.


<dependencies>

<dependency>

<groupId>com.espertech</groupId>

<artifactId>esper</artifactId>

<version>4.11.0</version>

</dependency>

</dependencies>


간단한 예제 만들어보기


여기서 만들어 볼 예제는다음과 같은 간단한 예제이다.

  • 현재가가 전날 종가대비 10% 이상 오른 종목을 통지한다.

이벤트로 사용될 클래스


먼저 할 작업은 이벤트로 사용될 클래스를 작성하는 것이다.


public class StockTick {


    private String name;

    private String code;

    private int cost;

    private int fluctuation;

    private double rate;


    public StockTick(String name, String code, int cost, int fluctuation, double rate) {

        this.name = name;

        this.code = code;

        this.cost = cost;

        this.fluctuation = fluctuation;

        this.rate = rate;

    }


    public String getName() {

        return name;

    }


    public String getCode() {

        return code;

    }


    public int getCost() {

        return cost;

    }


    public int getFluctuation() {

        return fluctuation;

    }


    public double getRate() {

        return rate;

    }

}


위 코드는 한 종목의 이름, 코드, 현재주가, 전일비, 등낙율을 담고 있다.


조건을 충족하는 종목을 찾아주는 코드


아래 코드는 Esper를 이용해서 전날 종가보다 10% 이상 상승한 종목을 출력해주는 기능을 제공하는 클래스이다.


import com.espertech.esper.client.*;


public class StockFinder {

    private EPServiceProvider epService;

    private EPStatement eps;

    private StockFoundListener listener;


    public void setup() {

        Configuration config = new Configuration();

        // 1. StockTick 클래스를 Esper가 사용할 이벤트 타입으로 등록

        config.addEventType("StockTick", StockTick.class);


        // 2. config를 이용해서 EPService 생성

        epService = EPServiceProviderManager.getProvider("StockTick", config);


        // 3. epService를 이용해서 EPL 생성

        eps = epService.getEPAdministrator().createEPL(

                "select * from StockTick t where t.rate >= 10");


        // 4. EPL의 결과를 받는 리스너 등록

        eps.addListener(new UpdateListener() {

            @Override

            public void update(EventBean[] newEvents, EventBean[] oldEvents) {

                StockTick stockTick = (StockTick) newEvents[0].getUnderlying();

                if (listener != null) listener.found(stockTick);

            }

        });

    }


    public void setStockFoundListener(StockFoundListener listener) {

        this.listener = listener;

    }


    public void sendStockTick(StockTick tick) {

        // 5. EP런타임에 이벤트 전달

        epService.getEPRuntime().sendEvent(tick);

    }

}


// StockFoundListener

public interface StockFoundListener {

    public void found(StockTick stockTick);

}


위 코드에서 3번 항목의 EPL은 아래와 같은데, 이를 보면 SQL과 유사한 것을 알 수 있다.


select * from StockTick t where t.rate >= 10


참고로, 이 쿼리는 앞서 1번 과정에서 등록한 StockTick 이벤트에 대해 그 이벤트의 rate 값이 10 보다 크거나 같으면 선택하라는 의미이다.


EPL에 의해 선택된 이벤트에 접근할 때 사용되는 것이 리스너이다. Esper는 EPL에 의해 선택된 데이터를 등록된 UpdateListener에 전달한다. 위 코드의 경우 3번 과정의 EPL에서 생성한 결과를 4번에서 등록한 listener에 StockTick 객체를 전달하도록 했다.


초기화를 했다면 EPRuntime의 sendEvent() 메서드를 이용해서 이벤트를 Esper에 전달하면 된다. 위 코드의 경우 다른 부분에서 지속적으로 주가 데이터를 읽어와 StockTick 객체를 생성한 뒤에 StockFinder의 sendStockTick() 메서드를 이용해서 StockTick 객체를 Esper 런타임에 전달하게 될 것이다.


간단 테스트 코드


실제 원하는 대로 동작하는지 아주 간단한 테스트 코드를 만들어보자.


public class StockFinderTest {


    private final StockFinder stockFinder = new StockFinder();


    private StockTick lastFound = null;

    private StockFoundListener listener = new StockFoundListener() {

        @Override

        public void found(StockTick stockTick) {

            lastFound = stockTick;

        }

    };


    @Before

    public void setup() {

        stockFinder.setup();

        stockFinder.setStockFoundListener(listener);

    }


    @Test

    public void shouldFound() {

        StockTick tick1 = new StockTick("name", "code", 109, 9, 9.0);

        stockFinder.sendStockTick(tick1);

        assertThat(lastFound, nullValue());


        StockTick tick2 = new StockTick("name", "code", 110, 10, 10.0);

        stockFinder.sendStockTick(tick2);

        assertThat(lastFound, equalTo(tick2));

    }



위 코드에서 listener 필드는 StockFinderListener 구현 객체를 갖는데, 이 객체는 found() 메서드에 전달된 stockTick 객체를 lastFound 필드에 할당한다. 따라서, StockFinder가 10% 이상 상승한 종목을 찾아서 통지를 하면 lastFound 필드에 그 StockTick 객체가 할당된다.


테스트 메서드에서 tick1은 전날 종가 대비 등락율이 9%이므로 StockFinder는 통지하지 않는다. 따라서, lastFound는 null 이어야 한다. 반면에 tick2는 등락율이 10%이므로 StockFinder가 통지하게 되고, 이에 따라 lastFound 필드에 tick2 객체가 전달된다. 따라서, lastFound와 tick2는 같아야 한다.


아주 약간 흥미를 더한 코드 만들기


단순히 어제 종가 대비 10% 상승한 종목을 찾는 일은 Esper를 사용하지 않아도 쉽게 할 수 있는 일이다. 단순히 이런 걸 하려고 Esper를 사용하진 않을 것이다. 실제로 Esper는 복잡한 상황과 조건을 다룰 수 있는 EPL을 제공하고 있다. 예를 들어, 최근 5초 동안 주가가 5% 이상 상승한 종목을 알고 싶다면 다음과 같은 EPL을 사용할 수 있다.


select first(*) as tick1, last(*) as tick2 

from StockTick.win:time(5 seconds) 

group by code 

having first(*) != last(*) and (last(cost) - first(cost)) / first(cost) >= 0.05


위 EPL을 간단히 설명하면

  • 2줄: StockTick 이벤트를 최근 5초 기준으로
  • 3줄: code를 이용해서 그룹핑 하고,
  • 4줄: 그룹에서 최근 5초 이내 첫 번째와 마지막 이벤트가 다르고, 첫 번째 이벤트의 cost보다 마지막 이벤트의 cost가 5% 이상 값이 크면,
  • 1줄: 그룹의 첫 번째 이벤트와 마지막 이벤트를 선택한다
위 EPL로부터 결과를 받는 UpdateListener는 다음과 같은 코드를 이용해서 최근 5초 동안 5% 이상 상승한 종목을 구할 수 있다.

eps = epService.getEPAdministrator().createEPL(
        "select first(*) as tick1, last(*) as tick2 from StockTick.win:time(5 seconds) " +
        "group by code having first(*) != last(*) and (last(cost) - first(cost)) / first(cost) > 0.05"
);
eps.addListener(new UpdateListener() {
    @Override
    public void update(EventBean[] newEvents, EventBean[] oldEvents) {
        StockTick tick1 = (StockTick) newEvents[0].get("tick1");
        StockTick tick2 = (StockTick) newEvents[0].get("tick2");
        ... 필요한 작업
    }
});

StockTick의 code 값을 이용해서 그룹핑을 하는데, code값이 각각 "code1"과 "code2"인 StockEvent가 아래 그림고 같은 순서로 발생했다고 하자. (시간은 초, code1 및 code2는 해당 초에 발생한 이벤트의 cost 값, 결과는 UpdateListener에 전달된 tick1과 tick2의 값이다.



위 그림에서, 5초가 흐른 시점에 code 값이 "code2"인 StockEvent가 발생을 했고, 이 이벤트의 cose 값은 302 이다. 즉, 최초 2초가 흐름 시점에는 code1과 관련된 2개의 이벤트가 발생했고, 첫 번째 이벤트의 cost는 100, 마지막 이벤트의 cost는 109이므로 5% 이상 상승한 값이다. 그래서 UpdateListener는 0초에 발생한 이벤트와 2초에 발생한 이벤트를 각각 tick1과 tick2로 받는다.


이와 비슷한 방식으로 11초 기준으로 "code2" 관련 StockEvent가 발생하는데, 이 시점에 최근 5초 동안 "code2"의 시작 이벤트는 7초 시점의 305 이벤트이고, 마지막 이벤트는 11초 시점의 323 이벤트이다. 323은 305보다 5% 이상 큰 값이므로, 결과로 305 이벤트와 323 이벤트가 선택된다.

참고자료

Esper를 이용하면 두 종류의 이벤트를 병합한다거나 조인을 하거나 특정 패턴을 찾아내는 등 다양한 방식으로 이벤트를 처리할 수 있다. 또한, 필요에 따라 외부 데이터를 함께 사용할 수도 있다. 이런 다양한 정보를 얻는 가장 좋은 방법은 레퍼런스 문서를 읽어보는 것이다. Esper에 관심이 있다면, http://esper.codehaus.org/ 사이트에서 레퍼런스 문서를 구해 차분히 읽어 보면 도움을 얻을 것이다.



  1. 춥고배고파 2014.03.09 23:39

    StockFinderListener class 는 어디에 있는건가요?

  2. 춥고배고파 2014.03.10 22:11

    눈이 이상해서 그런지 찾지 못하겠네요 ㅠ..ㅠ

    • 최범균 madvirus 2014.03.11 08:59 신고

      아,,, StockFinderListener는 인터페이스인데, 제가 그 인터페이스 코드 자체를 넣진 않았네요. StockFinderTest 클래스의 listener 필드에 다음과 같이 임의 객체를 생성하는 부분에서, StockFinderListener 인터페이슨느 found() 메서드 하나만 정의하고 있는 인터페이스입니다.

      private StockFinderListener listener = new StockFinderListener() {
      @Override
      public void found(StockTick stockTick) {
      lastFound = stockTick;
      }
      };

      즉, StockFinderListener는 다음과 같습니다.

      public interface StockFinderListener {
      public void found(StockTick stockTick);
      }

  3. 흠... 2014.03.12 16:52

    StockFoundListener클래스도 보이지 않네요! 어디에서 찾아야하나요~?

반응형

실시간 데이터 처리용으로 쓰임새가 늘어나고 있는 스톰Storm을 설치해 봤는데, 그 과정을 정리해 본다.


설치 순서

  1. 자바 설치 (1.6 이상 설치되어 있다고 가정)
  2. 주키퍼ZooKeeper 설치
  3. ZeroMQ 설치
  4. JZMQ 설치
  5. 스톰 님버스Nimbus 설정
  6. 스톰 수퍼바이저Supervisor 설정
  7. 클러스터 시작하기
본 설치 과정을 위해 CentOS 6버전이 설치된 5대의 장비를 사용했다. 각 호스트 이름 별로 다음의 데몬들을 설치할 것이다.
  • zk1: 주키퍼 설치
  • storm-nimbus: 님버스 설치, ZeroMQ, JZMQ
  • storm-sup1 ~ storm-sup3: 수퍼바이저 설치, ZeroMQ, JZMQ
참고로, 스톰의 님버스나 수퍼바이저가 뭔지 모른다면, 스톰 훑어보기 자료(http://javacan.tistory.com/324)를 참고하기 바란다.

주키퍼ZooKeeper 설치(zk1 장비)

1. 주키퍼 데이터 디렉토리 생성

이 글에서는 /data1/zk 를 생성한다고 가정

2. 원하는 디렉토리에 주키퍼 다운로드 받아 설치

여기서는 /data1/program 디렉토리에 설치한다고 가정. 아래 다운로드 경로는 http://www.apache.org/dyn/closer.cgi/zookeeper/ 에서 확인

$ cd /data1/program
$ wget http://mirror.apache-kr.org/zookeeper/stable/zookeeper-3.4.5.tar.gz
$ tar xvzf zookeeper-3.4.5.tar.gz

3. 주키퍼 설정 파일 작성

/data1/program/zookeeper-3.4.5/conf/zoo.cfg 파일 작성. 아래는 작성 예이다.

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data1/zk
clientPort=2181
autopurge.snapRetainCount=5
autopurge.purgeInterval=25


4. 주키퍼 실행


$ cd /data1/program/zookeeper-3.4.5/bin

./zkServer.sh start

JMX enabled by default

Using config: /data1/program/zookeeper-3.4.5/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED



ZeroMQ 설치 (storm-nim, storm-sup1~storm-sup3 장비)


ZeroMQ는 스톰이 통신할 때 사용하는 라이브러리이다.


1. 필요 라이브러리 설치


$ yum install gcc gcc-c++ libuuid-devel


2. ZeroMQ 설치


$ wget http://download.zeromq.org/zeromq-4.0.3.tar.gz

$ tar xzf zeromq-4.0.3.tar.gz

$ cd zeromq-4.0.3

$ ./configure

$ make

$ make install


설치가 끝나면 /usr/local/lib/ 디렉토리에 libzmq.so 파일이 있는 확인해보자.


JZMQ 설치 (storm-nim, storm-sup1~storm-sup3 장비)


1. git 설치 / libtool 설치


git으로부터 소스를 다운로드 받으므로, git을 설치한다. git에서 clone 하기 싫다면, Github 사이트에서 직접 다운로드 받아도 된다.


$ yum install git libtool


2. JZMQ 설치


JAVA_HOME이 알맞게 설정되어 있어야 한다.


$ git clone https://github.com/nathanmarz/jzmq.git

$ cd jzmq

$ ./autogen.sh

$ ./configure

$ make

$ make install


/usr/local/share/java 디렉토리에 zmq.jar 파일이 생성된 것을 확인할 수 있다.



스톰 설치 (storm-nim, storm-sup1~storm-sup3 장비)


1. 스톰 다운로드


님버스와 수퍼바이저를 실행할 장비에 모두 Storm을 다운로드해서 설치한다. 아래 다운로드 주소는 http://storm-project.net/downloads.html 에서 구하면 된다.


$ wget https://dl.dropboxusercontent.com/s/tqdpoif32gufapo/storm-0.9.0.1.tar.gz

$ tar xzf storm-0.9.0.1.tar.gz

$ ln -s storm-0.9.0.1 storm


2. 스톰 데이터 디렉토리


님버스와 수퍼바이저로 사용될 장비에 스톰이 데이터를 보관할 디렉토리를 생성한다. 이 글에서는 /data1/storm 디렉토리를 사용한다고 가정한다.


3. 스톰 설정 파일 작성


각 장비의 [스톰설치경로]/conf/conf/storm.yaml 파일을 수정한다.


storm.zookeeper.servers:

     - "zk1"

 

nimbus.host: "storm-nim"


storm.local.dir: "/data1/storm"


ui.port: 8087


주키퍼 포트를 변경해야 한다거나, 디폴트 설정 값 등의 정보가 궁금하다면 https://github.com/nathanmarz/storm/blob/master/conf/defaults.yaml 를 참고하면 된다.


4. 스톰 클러스터 데몬 실행


storm-nim 장비에서 스톰 님버스 데몬을 실행한다. Ctrl+C 키를 누르면 데몬이 중지된다.


$ cd [스톰설치디렉토리]/bin

$ ./storm nimbus

Running: java -server -Dstorm.options= .......생략


storm-sup1 ~ storm-sup3 장비에서 동일한 방식으로 스톰 수퍼바이저 데몬을 실행한다. 마찬가지로 Ctrl+C 키를 누르면 중지된다.


$ cd [스톰설치디렉토리]/bin

$ ./storm supervisor

Running: java -server -Dstorm.options= .......생략


님버스 데몬을 실행 중인 장비에서 스톰 UI 웹 서버를 실행할 수 있다. 다음은 스톰 UI 데몬 실행 명령어이다.


$ ./storm ui


앞서 설정 파일에서 "ui.port"의 값을 8087로 주었는데, http://nimbus호스트:8087 로 연결해보면 다음과 비슷한 화면이 출력되는 것을 확인할 수 있다.



최초 환경 테스트를 수행할 때에는 위와 같이 스톰 데몬을 수동으로 띄우지만, 실제 환경에서는 supervisord나 daemontools, monit과 같은 툴을 이용해서 프로세스가 죽을 경우 자동으로 뜨도록 해 놓는다.


5. 간단한 명령어로 확인


아래 명령어로 클러스터에 등록된 토폴로지를 확인할 수 있다.


$ storm list

...생략

No topologies running.


설치가 완료되었다.




반응형

실시간 데이터 처리를 위한 Storm 기술 훑어보기 자료입니다.



  1. 티비다시보기 2021.03.24 14:33

    잘 보고 갑니다...

반응형

팀내에서 발표할 하이브 입문하기 발표자료입니다.



반응형

HBase 훑어보기 자료



+ Recent posts