저작권 안내: 저작권자표시 Yes 상업적이용 No 컨텐츠변경 No

스프링5 입문

JSP 2.3

JPA 입문

DDD Start

인프런 객체 지향 입문 강의

관련 글


Insert into를 이용한 이벤트 발생

EPL은 insert into 를 사용해서 새로운 이벤트를 발생시킬 수 있다. insert into의 사용 방법은 다음과 같이 단순하다.

insert into SlowResponse
select url, responseTime as respTime from AccessLog where responseTime > 500

위 코드는 AccessLog 이벤트의 responseTime 프로퍼티가 500보다 크면 url과 responseTime 프로퍼티를 선택하고, 선택된 프로퍼티를 이용해서 새로운 SlowResponse 이벤트를 생성한다.

insert into를 이용해서 생성하는 이벤트는 select에서 선택한 프로퍼티와 동일한 프로퍼티를 갖고 있어야 한다. 예를 들어, 위 코드의 경우 SlowResponse 이벤트는 url과 respTime 프로퍼티를 갖고 있어야 한다. (SlowResponse 이벤트를 자바 클래스로 정의했다면 setUrl()과 setRespTime() 메서드를 필요로 한다.)

insert into를 이용해서 생성한 이벤트는 일반적인 다른 이벤트와 마찬가지로 EPL을 이용해서 사용할 수 있다. 예를 들어, 위 코드로 생성한 SlowResponse 이벤트는 다음의 EPL에서 사용할 수 있게 된다.

select count(*) as count from SlowResponse.win:time(2 sec) having count(*) > 3


SlowResponse 클래스가 다음의 생성자를 갖고 있다고 하자.


public class SlowResponse {

    private String url;

    private long respTime;


    public SlowResponse(String url, long respTime) {

        this.url = url;

        this.respTime = respTime;

    }


이 경우, inserto into에서는 생성자를 이용할 수 있다.


insert into SlowResponse(url, responseTime)

select url, responseTime from AccessLog where responseTime > 500


조인(Join)


Esper는 스트림 간의 조인을 제공한다. RDMBS의 조인과 유사하며, 내부 조인과 외부 조인을 함께 지원하고 있다. RDBMS와의 차이가 있다면, EPL에서의 조인은 제한된 뷰에서의 조인을 지원한다는 점이다. (win:time 이나 win:length 등이 뷰에 해당하며, Esper는 앞서 Esper 초보 시리즈 2 - EPL 기초 에서 봤던 시간/길이 윈도우 외에 다양한 뷰를 제공하고 있다.)


아래 코드는 두 스트림의 조인을 이용한 코드의 예이다.


EPStatement eps = epService.getEPAdministrator().createEPL(

  "select v, o from ProductView.win:time(2 sec) as v, ProductOrder.win:time(2 sec) as o "

);

eps.addListener(new UpdateListener() {

    @Override

    public void update(EventBean[] newEvents, EventBean[] oldEvents) {

        for (EventBean eb : newEvents) {

            ProductView pv = (ProductView) eb.get("v");

            ProductOrder po = (ProductOrder) eb.get("o");

        }

    }

});


위 코드는 1분 동안 발생한 ProductView와 ProductOrder 이벤트를 조인한 결과를 select 하고 있다.



위 그림에서 ProductView 이벤트가 1초, 1.5초, 2.5초 시점에 발생하고, ProductOrder 이벤트는 2초와 3.3초 시점에 발생했다. 앞서 EPL에서 두 이벤트에 대해 동일하게 2초의 시간 윈도우를 지정했는데, 이 경우 각 시점에 select로 조회되는 조인 결과는 다음과 같다.

  • 주황색(2초 시점): O1 이벤트가 발생하고, 그 결과로 (V1, O1), (V2, O2) 조인 결과가 select 된다.
  • 녹색(2.5초 시점): V3 이벤트가 발생하고, 그 결과로 (V3, O1) 조인 결과가 select 된다.
  • 빨강색(3.3초 시점): O2 이벤트가 발생하고, 그 결과로 (V2, O2), (V3, O2) 조인 결과가 select 된다.
where 절을 이용해서 조인 조건을 정의할 수도 있다.

select v, o from ProductView.win:time(2 sec) as v, ProductOrder.win:time(2 sec) as o
where v.productId = o.productId and v.userId = o.userId

외부 조인을 사용할 수도 있다. 다음은 외부 조인의 사용예이다. SQL과 동일하게 on 절을 이용해서 조건을 지정한다.


select v, o 

from ProductView.win:time(2 sec) as v 

left outer join ProductOrder.win:time(2 sec) as o 

on v.productId = o.productId and v.userId = o.userId 


leff 외에 right, full 외부 조인을 지원한다.


아래 그림은 다음의 쿼리를 실행했을 때 결과를 정리한 것이다.

  • select v, o from ProductView.win:time(2 sec) as v 
    left outer join ProductOrder.win:time(2 sec) as o 


이 그림에서 회색 상자는 특정 시점에 select 결과로 전달받은 데이터를 표시한 것이다. V1 이벤트가 발생한 시점을 기준으로 아직 ProductOrder 이벤트가 존재하지 않으므로 select 결과는 (V1, null)이 된다. 비슷하게 V2 이벤트 발생 시점에도 (V2, null)을 결과로 받는다. O1 이벤트가 발생하는 시점이 되면, EPL에서 지정한 시간 윈도우 범위 안에 V1, V2 이벤트가 조인할 수 있는 O1 이벤트가 생기게 되고, 따라서 select 결과로 (V1, O1), (V2, O1)을 받게 된다.


외부 조인을 사용하면 좀 더 재미난 쿼리를 만들 수 있다. 아래 코드를 보자.


select rstream v

from ProductView.win:time(1 min) as v

left outer join ProductOrder.win:time(1 min) as o

on v.productId = o.productId and v.userId = o.userId

where o is null


위 EPL은 select 절에 rstream을 사용했는데, 이렇게 하면 데이터가 윈도우를 벗어날 때 쿼리를 실행하게 된다. 따라서, 위 코드는 ProductView 이벤트가 발생한 뒤로 1분 안에 동일 제품/동일 사용자의 ProductOrder 이벤트가 발생하지 않으면 해당 ProductView 이벤트를 select 결과로 받게 된다. 이 때, UpdateListener는 아래 코드와 같이 oldEvents 파라미터를 이용해서 결과를 얻을 수 있다.


EPStatement eps = epService.getEPAdministrator().createEPL(

        "select rstream v "+

                "from ProductView.win:time(2 sec) as v "+

                "left outer join ProductOrder.win:time(2 sec) as o " +

                "on v.productId = o.productId and v.userId = o.userId "+

                "where o is null"

);


eps.addListener(new UpdateListener() {

    @Override

    public void update(EventBean[] newEvents, EventBean[] oldEvents) {

        for (EventBean eb : newEvents) {

            ProductView pv = (ProductView) eb.get("v");

            ...

        }

    }

});


조인이 유용하지만, 좀 더 상황이 복잡할 경우에는 패턴을 사용해서 풀어낼 수 있다. 패턴 대한 내용은 이후에 따로 정리할 예정이다.


서브 쿼리


Esper는 서브 쿼리도 지원한다. 서브 쿼리는 select, where, having, 필터 등에서 사용할 수 있다. 다음은 서브 쿼리의 예다.


select count(*) as count, (select max(value) from Tps.win:time(5 sec)) as maxTps

from SlowResponse.win:time(3 sec) s having count(*) > 3


다음은 where 절에서 서브 쿼리를 사용하는 예이다.


select * from SlowResponse.win:time(4 sec) s

where s.responseTime > (select avg(responseTime) from SlowResponse.win:time(2 sec))


where 절에서 exists, in, not in, any, all 등을 사용할 수도 있다. 다음은 exists를 사용한 예이다.


select rstream v from ProductView.win:time(10 min) as v 

where not exists 

        (select * from ProductOrder.win:time(10 min) as o 

         where v.productId = o.productId and v.userId = o.userId)


위 코드는 ProductView 이벤트의 시간 윈도우에서 벗어나는 이벤트에 대해 그 이벤트와 매칭되는 ProductOrder 이벤트가 존재하지 않는지 확인한다. 즉, 상품을 본 후 10초 이내에 주문을 하지 않은 ProductView 이벤트를 구한다. (이 결과는 앞에서 left outer join 사용 예에서의 결과와 동일하지만, 조인의 경우보다 이해하기가 더 쉬운 것을 알 수 있다.)



Posted by 최범균 madvirus

댓글을 달아 주세요


Esper는 이벤트 시점을 기준으로 출력 결과를 발생시킨다. 이벤트가 들어오거나 타임 윈도우에서 이벤트가 벗어나거나 할 때 관련된 이벤트 처리 결과를 리스너에 전달한다. 예를 들어, 아래 EPL을 생각해보자.


select avg(cost) from StockEvent


위 EPL을 실행하면 StockEvent가 발생할 때 마다 cost의 평균값을 리스너를 통해 받게 된다. 그런데, 단순히 평균의 추이를 알고 싶은 거라면 모든 이벤트마다 평균값을 받는 것 보다 10초, 1분과 같이 주기적으로 그 시점의 평균값을 확인해도 문제가 없을 것이다. 이렇게 출력 자체를 제어하고 싶을 때 output 절을 사용할 수 있다.


관련 시리즈:


output 을 이용한 출력 시점 제어


output은 출력 시점을 제어하기 위해 사용된다. 예를 들어, 10초 마다 출력을 받고 싶다거나, 이벤트 5개 당 첫 번째에 대해 결과를 받고 싶다거나 할 때 output을 사용한다.


output의 기본 사용 방법은 다음과 같다.


select ... from ...

output [all | first | last | snapshot ] every N [seconds | events]


every N에서 N은 숫자이며, seconds는 초를 events는 이벤트 개수를 뜻한다. 즉, "every 10 seconds"는 매 10초마 결과를 리스너에 전달하게 된다.


output 뒤에 all, first, last, snapshot는 다음의 의미를 갖는다.

  • all: 출력 주기에 발생한 모든 이벤트 출력
  • first: 출력 주기에 발생한 첫 번째 이벤트 기준 출력
  • last: 출력 주기에 발생한 마지막 이벤트 기준 출력
  • snapshot: 현재 시점의 출력 결과. 윈도우와 함께 사용되지 않을 경우 last와 같은 결과 출력.

예를 들어, 아래 목록에 첫 번째 EPL은 매 2초 간격으로 출력을 발생시키는데, 출력 주기(2초) 범위에 발생한 모든 이벤트에 대한 출력을 발생시킨다.

  • select avg(cost) as avg from StockTick output all every 2 seconds
  • select avg(cost) as avg from StockTick output first every 2 seconds
  • select avg(cost) as avg from StockTick output last every 2 seconds
  • select avg(cost) as avg from StockTick output snapshot every 2 seconds

위 EPL에 대해 실제 리스너에 전달된 출력 결과를 시간대 별로 출력해보면 아래 그림과 같다.



위 그림에서 상위 두 개는 code1 및 code2 이벤트의 발생 시점이다. 그리고, 회색 화살표는 2초 간격을 표시한 것이다. 녹색 삼각형으로 표시된 output all의 경우, 매 2초 간격마다 발생한 모든 이벤트 처리 결과가 리스너에 전달되는 것을 알 수 있다.


위 그림에서 회색 X 표시가 output first에 의해 발생한 이벤트 발생 시점을 표시한 부분이다. output first의 경우는 약간 특이하게 동작한다. output first의 경우 현재 주기에서 발생한 이벤트가 없으면, 다음 주기가 시작할 때 현재 시점 기준으로 마지막 결과를 리스너에 전달하고, 그 다음 주기에서 새로운 이벤트가 발생하면 그 시점의 결과를 리스너에 다시 전달한다.


group by와 output


group by와 output이 만나면 약간 다른 방식으로 동작한다. group by와 output이 만나면 다음과 같은 방식으로 동작한다.

  • first: 그룹 별 출력 주기에 발생한 첫 번째 이벤트를 기준으로 출력 결과 생성. 그룹 별 첫 번째 이벤트 발생 시점에 출력 생성. 출력 주기에 이벤트가 없으면 출력 결과를 생성하지 않음.
  • last: 그룹 별 출력 주기에 발생한 마지막 이벤트를 기준으로 출력 결과 생성. 출력 주기 종료 시점에 마지막 출력 결과 생성. 출력 주기에 이벤트가 없으면 출력 결과를 생성하지 않음.
  • all: 출력 주기 종료 시점에 마지막 출력 결과 생성. 출력 주기에 이벤트가 없어도 출력 결과 생성.
  • 키워드 없음: 출력 주기에 발생한 모든 출력 결과 생성. 출력 주기 종료 시점에 출력 결과 생성. 출력 주기에 이벤트가 없으면 출력 결과를 생성하지 않음.

아래 쿼리는 작성 예이다.

  • select code, avg(cost) as avg from StockTick group by code output first every 2 sec
  • select code, avg(cost) as avg from StockTick group by code output last every 2 sec
  • select code, avg(cost) as avg from StockTick group by code output all every 2 sec
  • select code, avg(cost) as avg from StockTick group by code output every 2 sec

위 쿼리를 실행해 보면 이벤트 발생 시점에 따라 아래와 같이 출력 결과가 발생한다.




윈도우와 snapshot의 동작 방식


윈도우와 output snapshot을 함께 사용하면 윈도우를 기준으로 마지막 값이 출력된다. 아래 그림은 다음의 세 쿼리를 실행한 결과를 정리한 것이다.

  • select avg(cost) as avg from StockTick.win:time_batch(3 sec) output snapshot every 1.5 sec
  • select avg(cost) as avg from StockTick.win:time_batch(3 sec)
  • select avg(cost) as avg from StockTick.win:time(3 sec) output snapshot every 1.5 sec



출력 주기를 1.5초로 지정했는데, 그 출력 결과를 보면 시간 배치 윈도우와 시간 윈도우에서 output snapshot이 다르게 동작하는 것을 확인할 수 있다. 시간 배치 윈도우와 output snapshot을 함께 사용한 결과를 보면 세 번째 출력 결과가 5000인데, 그 이유는 다음과 같다.

  • 시간 배치 윈도우에 적용된 output snapshot의 세 번째 출력 결과 시점은 약 8.5초
  • 시간 배치 윈도우의 처리 구간은 7초~9초
  • 따라서, 아직 현재 시간 배치 윈도우의 결과가 없으므로, 이전 배치의 결과인 5000을 값으로 출력
시간 윈도우의 경우는 항상 결과가 있기 때문에 현재 출력 시점을 기준으로 시간 윈도우 크기 범위의 결과를 출력한다.


추가 내용


출력 주기를 크론탭을 이용해서 표현 가능하다. 다음은 매 15분 주기로 결과를 출력하는 예이다.

  • select avg(cost) as avg from StockTick.win:time_batch(30 minutes) output snapshot at(*/15, *, *, *, *)

after를 사용해서 최초 일정 시간 동안 결과를 출력하지 않도록 지정할 수 있다.

  • select avg(cost) as avg from StockTick output after 4 sec last every 1.5 sec


Posted by 최범균 madvirus

댓글을 달아 주세요

  1. 2017.08.06 21:52  댓글주소  수정/삭제  댓글쓰기

    비밀댓글입니다

    • 최범균 madvirus 2017.08.11 20:33 신고  댓글주소  수정/삭제

      안녕하세요, 제가 이 글을 쓴지 몇 년 지나기도 했고 Esper를 손 놓은지 오래 되기도 해서 최신 버전의 Esper에 대한 내용을 새로 쓸 수준이 안 됩니다.