주요글: 도커 시작하기
반응형


이래 저래 데이터 처리 관련된 기술을 조사하던 중 Esper란 놈을 알게 되었다. 몇 가지 글을 읽어보니 기회가 되면 사용해보고 싶은 욕구가 생겼다. 나처럼 Esper에 관심있는 분들을 위한 퀵스타트 문서를 정리해 본다.


관련 시리즈:


Esper란?


Esper 사이트에 따르면 Esper란 다음과 같은 것이다.

Esper is a component for complex event processing (CEP) and event series analysis.

Esper는 실시간으로 발생하는 이벤트를 분석하고 처리하기 위한 컴포넌트로서, 다음과 같은 방식으로 동작한다.


외부에서 발생한 이벤트를 Esper 엔진에 전달하면, 이벤트를 분석한다. 이벤트를 분석할 때 EPL 이라는 언어를 사용하는데, 이 언어를 이용해서 조건에 맞는 이벤트를 찾고 처리한 결과를 데이터를 생성한다.


EPL 언어는 SQL과 유사한 구조를 갖고 있기 때문에 조금만 노력하면 금방 익힐 수 있을 것 같다. EPL과 SQL을 비교해 보면, SQL에 존재하는 데이터에 대해 쿼리를 실행하는 방식이라면 EPL은 실시간으로 발생되는 이벤트에 대해 쿼리를 실행하는 방식이라고 말 할 수 있다.


Esper 다운로드


Esper를 사용하려면 Esper를 다운로드 받고 필요한 jar 파일을 복사하면 된다. 그런데, Esper는 EPL 파싱을 위해 Antlr을 사용하고 로깅을 위해 Commons Logging을 사용하고 있기 때문에 실제로는 Esper 배포판뿐만 아니라 의존 모듈도 함께 다운로드 받아야 한다.


메이븐을 사용하고 있다면 다음과 같이 의존을 추가해주면 된다.


<dependencies>

<dependency>

<groupId>com.espertech</groupId>

<artifactId>esper</artifactId>

<version>4.11.0</version>

</dependency>

</dependencies>


간단한 예제 만들어보기


여기서 만들어 볼 예제는다음과 같은 간단한 예제이다.

  • 현재가가 전날 종가대비 10% 이상 오른 종목을 통지한다.

이벤트로 사용될 클래스


먼저 할 작업은 이벤트로 사용될 클래스를 작성하는 것이다.


public class StockTick {


    private String name;

    private String code;

    private int cost;

    private int fluctuation;

    private double rate;


    public StockTick(String name, String code, int cost, int fluctuation, double rate) {

        this.name = name;

        this.code = code;

        this.cost = cost;

        this.fluctuation = fluctuation;

        this.rate = rate;

    }


    public String getName() {

        return name;

    }


    public String getCode() {

        return code;

    }


    public int getCost() {

        return cost;

    }


    public int getFluctuation() {

        return fluctuation;

    }


    public double getRate() {

        return rate;

    }

}


위 코드는 한 종목의 이름, 코드, 현재주가, 전일비, 등낙율을 담고 있다.


조건을 충족하는 종목을 찾아주는 코드


아래 코드는 Esper를 이용해서 전날 종가보다 10% 이상 상승한 종목을 출력해주는 기능을 제공하는 클래스이다.


import com.espertech.esper.client.*;


public class StockFinder {

    private EPServiceProvider epService;

    private EPStatement eps;

    private StockFoundListener listener;


    public void setup() {

        Configuration config = new Configuration();

        // 1. StockTick 클래스를 Esper가 사용할 이벤트 타입으로 등록

        config.addEventType("StockTick", StockTick.class);


        // 2. config를 이용해서 EPService 생성

        epService = EPServiceProviderManager.getProvider("StockTick", config);


        // 3. epService를 이용해서 EPL 생성

        eps = epService.getEPAdministrator().createEPL(

                "select * from StockTick t where t.rate >= 10");


        // 4. EPL의 결과를 받는 리스너 등록

        eps.addListener(new UpdateListener() {

            @Override

            public void update(EventBean[] newEvents, EventBean[] oldEvents) {

                StockTick stockTick = (StockTick) newEvents[0].getUnderlying();

                if (listener != null) listener.found(stockTick);

            }

        });

    }


    public void setStockFoundListener(StockFoundListener listener) {

        this.listener = listener;

    }


    public void sendStockTick(StockTick tick) {

        // 5. EP런타임에 이벤트 전달

        epService.getEPRuntime().sendEvent(tick);

    }

}


// StockFoundListener

public interface StockFoundListener {

    public void found(StockTick stockTick);

}


위 코드에서 3번 항목의 EPL은 아래와 같은데, 이를 보면 SQL과 유사한 것을 알 수 있다.


select * from StockTick t where t.rate >= 10


참고로, 이 쿼리는 앞서 1번 과정에서 등록한 StockTick 이벤트에 대해 그 이벤트의 rate 값이 10 보다 크거나 같으면 선택하라는 의미이다.


EPL에 의해 선택된 이벤트에 접근할 때 사용되는 것이 리스너이다. Esper는 EPL에 의해 선택된 데이터를 등록된 UpdateListener에 전달한다. 위 코드의 경우 3번 과정의 EPL에서 생성한 결과를 4번에서 등록한 listener에 StockTick 객체를 전달하도록 했다.


초기화를 했다면 EPRuntime의 sendEvent() 메서드를 이용해서 이벤트를 Esper에 전달하면 된다. 위 코드의 경우 다른 부분에서 지속적으로 주가 데이터를 읽어와 StockTick 객체를 생성한 뒤에 StockFinder의 sendStockTick() 메서드를 이용해서 StockTick 객체를 Esper 런타임에 전달하게 될 것이다.


간단 테스트 코드


실제 원하는 대로 동작하는지 아주 간단한 테스트 코드를 만들어보자.


public class StockFinderTest {


    private final StockFinder stockFinder = new StockFinder();


    private StockTick lastFound = null;

    private StockFoundListener listener = new StockFoundListener() {

        @Override

        public void found(StockTick stockTick) {

            lastFound = stockTick;

        }

    };


    @Before

    public void setup() {

        stockFinder.setup();

        stockFinder.setStockFoundListener(listener);

    }


    @Test

    public void shouldFound() {

        StockTick tick1 = new StockTick("name", "code", 109, 9, 9.0);

        stockFinder.sendStockTick(tick1);

        assertThat(lastFound, nullValue());


        StockTick tick2 = new StockTick("name", "code", 110, 10, 10.0);

        stockFinder.sendStockTick(tick2);

        assertThat(lastFound, equalTo(tick2));

    }



위 코드에서 listener 필드는 StockFinderListener 구현 객체를 갖는데, 이 객체는 found() 메서드에 전달된 stockTick 객체를 lastFound 필드에 할당한다. 따라서, StockFinder가 10% 이상 상승한 종목을 찾아서 통지를 하면 lastFound 필드에 그 StockTick 객체가 할당된다.


테스트 메서드에서 tick1은 전날 종가 대비 등락율이 9%이므로 StockFinder는 통지하지 않는다. 따라서, lastFound는 null 이어야 한다. 반면에 tick2는 등락율이 10%이므로 StockFinder가 통지하게 되고, 이에 따라 lastFound 필드에 tick2 객체가 전달된다. 따라서, lastFound와 tick2는 같아야 한다.


아주 약간 흥미를 더한 코드 만들기


단순히 어제 종가 대비 10% 상승한 종목을 찾는 일은 Esper를 사용하지 않아도 쉽게 할 수 있는 일이다. 단순히 이런 걸 하려고 Esper를 사용하진 않을 것이다. 실제로 Esper는 복잡한 상황과 조건을 다룰 수 있는 EPL을 제공하고 있다. 예를 들어, 최근 5초 동안 주가가 5% 이상 상승한 종목을 알고 싶다면 다음과 같은 EPL을 사용할 수 있다.


select first(*) as tick1, last(*) as tick2 

from StockTick.win:time(5 seconds) 

group by code 

having first(*) != last(*) and (last(cost) - first(cost)) / first(cost) >= 0.05


위 EPL을 간단히 설명하면

  • 2줄: StockTick 이벤트를 최근 5초 기준으로
  • 3줄: code를 이용해서 그룹핑 하고,
  • 4줄: 그룹에서 최근 5초 이내 첫 번째와 마지막 이벤트가 다르고, 첫 번째 이벤트의 cost보다 마지막 이벤트의 cost가 5% 이상 값이 크면,
  • 1줄: 그룹의 첫 번째 이벤트와 마지막 이벤트를 선택한다
위 EPL로부터 결과를 받는 UpdateListener는 다음과 같은 코드를 이용해서 최근 5초 동안 5% 이상 상승한 종목을 구할 수 있다.

eps = epService.getEPAdministrator().createEPL(
        "select first(*) as tick1, last(*) as tick2 from StockTick.win:time(5 seconds) " +
        "group by code having first(*) != last(*) and (last(cost) - first(cost)) / first(cost) > 0.05"
);
eps.addListener(new UpdateListener() {
    @Override
    public void update(EventBean[] newEvents, EventBean[] oldEvents) {
        StockTick tick1 = (StockTick) newEvents[0].get("tick1");
        StockTick tick2 = (StockTick) newEvents[0].get("tick2");
        ... 필요한 작업
    }
});

StockTick의 code 값을 이용해서 그룹핑을 하는데, code값이 각각 "code1"과 "code2"인 StockEvent가 아래 그림고 같은 순서로 발생했다고 하자. (시간은 초, code1 및 code2는 해당 초에 발생한 이벤트의 cost 값, 결과는 UpdateListener에 전달된 tick1과 tick2의 값이다.



위 그림에서, 5초가 흐른 시점에 code 값이 "code2"인 StockEvent가 발생을 했고, 이 이벤트의 cose 값은 302 이다. 즉, 최초 2초가 흐름 시점에는 code1과 관련된 2개의 이벤트가 발생했고, 첫 번째 이벤트의 cost는 100, 마지막 이벤트의 cost는 109이므로 5% 이상 상승한 값이다. 그래서 UpdateListener는 0초에 발생한 이벤트와 2초에 발생한 이벤트를 각각 tick1과 tick2로 받는다.


이와 비슷한 방식으로 11초 기준으로 "code2" 관련 StockEvent가 발생하는데, 이 시점에 최근 5초 동안 "code2"의 시작 이벤트는 7초 시점의 305 이벤트이고, 마지막 이벤트는 11초 시점의 323 이벤트이다. 323은 305보다 5% 이상 큰 값이므로, 결과로 305 이벤트와 323 이벤트가 선택된다.

참고자료

Esper를 이용하면 두 종류의 이벤트를 병합한다거나 조인을 하거나 특정 패턴을 찾아내는 등 다양한 방식으로 이벤트를 처리할 수 있다. 또한, 필요에 따라 외부 데이터를 함께 사용할 수도 있다. 이런 다양한 정보를 얻는 가장 좋은 방법은 레퍼런스 문서를 읽어보는 것이다. Esper에 관심이 있다면, http://esper.codehaus.org/ 사이트에서 레퍼런스 문서를 구해 차분히 읽어 보면 도움을 얻을 것이다.



+ Recent posts