저작권 안내: 저작권자표시 Yes 상업적이용 No 컨텐츠변경 No

스프링5 입문

JSP 2.3

JPA 입문

DDD Start

인프런 객체 지향 입문 강의

아파치 스톰(Storm) 프로젝트는 실시간 데이터 분석을 위한 기반 플랫폼이다. 스톰을 이용하면 데이터를 병렬로 가공할 수 있는 클러스터를 구축할 수 있으며, 이 클러스터를 이용해서 실시간으로 발생하는 데이터를 가공하고 분석하는 작업을 수행할 수 있다. 스톰에 대한 소개는 이전에 올렸던 'Storm 훑어보기'(http://javacan.tistory.com/324) 자료로 대신하고, 본 글에선 스톰 트라이던트의 연산 처리와 관련된 API의 사용 방법에 대해 살펴볼 것이다.


만들 예제


본 글에서는 다음과 같은 간단한 예제를 만들어보면서 스톰 트라이던트 API의 연산 처리와 관련된 API를 알아볼 것이다.

  • 실시간 로그를 읽어와,
  • 로그가 판매 로그면,
  • 로그를 파싱하고,
  • 1분당 판매 개수가 50개를 넘기면 상품이 있으면,
  • 통지한다.

스톰 트라이던트 프로그래밍의 구성 요소


스톰 클러스터를 이용해서 데이터를 처리하려면 다음의 세 가지 종류의 코드를 작성해야 한다.

  • 트라이던트 스파우트(Spout): 튜플을 발생시킨다. Kafka와 같은 서버에서 메시지를 읽어와 튜플로 변환해서 스톰 스트림에 넣어준다.
  • 연산 처리
    • Function: 튜플을 입력받아 튜플을 생성한다.
    • Filter: 튜플을 입력받아 연산을 계속할지 여부를 결정한다.
    • 그룹핑: 튜플을 특정 필드를 이용해서그룹핑한다.
  • 토폴로지(Topology): 스파우트와 연산 구성 요소를 연결하고 병렬 처리 등을 설정한다.
여기서 만들 토폴로지 구성은 아래 그림과 같다.


스파우트는 로그를 튜플로 생성한다. 튜플(tuple)은 데이터를 보관하는 단위로서, Function과 Filter는 한 개의 튜플에 대해 연산을 수행한다. 여기서 만들 스파우트는 단순히 로그 문자열을 튜플에 담아 스트림에 넣는다.


모든 로그가 아닌 주문 로그만 처리하면 되므로, 주문 로그인지 여부를 확인하는 필터를 맨 앞에 위치시킨다. 필터는 로그가 주문 필터인 경우에만 튜플을 스트림의 다음 단계로 전달한다. 이후 과정에서는 로그 문자열을 파싱해서 ShopLog라는 객체로 변환하고, 그룹핑 기준으로 사용할 '제품ID:시간'을 튜플에 추가한다.


'제품ID:시간'으로 튜플을 그룹핑 한뒤 뒤, 각 '제품ID:시간' 별로 개수를 구한다. 그리고, '제품ID:시간' 별 개수를 누적하고(기 별 개수 합 함수), 각 누적 개수가 임계값에 도달하면 통지 필터에 해당 튜플을 전달한다.


트라이던트 스파우트 구현


트라이던트 스파우트(이하 스파우트)는 기본적으로 배치 단위로 튜플을 생성한다. 다음 글에서 더 자세히 다루겠지만, 스파우트가 생성하는 각 튜플은 한 트랜잭션 ID(배치)에 포함된다. 예를 들어, 스파우트는 한 트랜잭션ID에 해대 튜플을 5개씩 생성할 수 있고, 이 때 스트림은 한 번에 5개의 튜플을 처리하게 된다. 만약 토폴로지에서 튜플을 처리하는 도중 에러가 발생하면, 스톰은 특졍 트랜잭션 ID에 해당하는 튜플을 다시 생성하도록 스파우트에 요청한다.


스파우트가 데이터를 어떻게 생성해주냐에 따라 스파우트를 크게 세 가지 종류로 구분할 수 있다.

  • Transactional: 한 트랜잭션 ID에 대해 매 번 동일한 튜플 묶음을 생성함을 보장한다. 한 트랜잭션 ID에 속한 튜플이 다른 트랜잭션ID에 포함되지 않는다.
  • Opaque-transactional: 한 튜플은 한 트랜잭션ID에만 포함된다. 단, 한 트랜잭션ID에 대해 매 번 동일한 튜플 묶음을 생성함을 보장하진 않는다. 
  • Non-transactional: 한 튜플이 한 트랜잭션ID에만 포함된다는 것을 보장하지 않는다.

트라인던트 스파우트를 구현하려면 스톰이 제공하는 인터페이스를 상속받아야 한다. 트라이던트 스파우트 구현을 위한 몇 가지 종류의 인터페이스가 존재하는데, 본 글에서는 그 중에서 ITridentSpout 인터페이스를 이용한 구현 방법에 대해 살펴볼 것이다.


ITridentSpout 인터페이스를 이용한 스파우트 구현


ITridentSpout 인터페이스를 이용해서 스파우트를 구현하려면, 다음의 세 인터페이스에 대한 이해가 필요하다.



Emitter, BatchCoordinator, ITridentSpout는 각각 다음과 같은 역할을 한다.

  • Emitter: 특정 트랜잭션에 속할 튜플을 생성한다.
  • BatchCoordinator: 특정 트랜잭션 ID에 해당하는 메타 데이터를 생성하고, 튜플 생성을 준비한다.
  • ITridentSpout: 스톰에 Emitter 객체와 BatchCoordinator 객체를 제공한다. 또한, Emitter가 생성할 튜플의 필드 정보를 정의한다.
스톰이 사용할 트라이던트 스파우트를 제공하려면 위 세 개의 인터페이스를 알맞게 구현해 주어야 한다. 본 글에서 만들어볼 예제는 어딘가에서 일정 개수의 로그를 읽어와 튜플로 전달한다. 튜플에 포함될 필드는 1개 뿐이고 그 필드의 이름을 "log"라고 한다면, ITridentSpout 인터페이스는 다음과 같이 구현할 수 있다.

@SuppressWarnings("rawtypes")
public class LogSpout implements ITridentSpout<Long> {

    private static final long serialVersionUID = 1L;

    @Override
    public BatchCoordinator<Long> getCoordinator(
             String txStateId, Map conf, TopologyContext context) {
        return new LogBatchCoordinator();
    }

    @Override
    public Emitter<Long> getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new LogEmitter();
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("logString");
    }
}

ITrdientSpout의 타입 파라미터를 Long으로 지정했는데, 여기서 Long은 뒤에서 설명할 트랜잭션 메타 데이터의 타입이 된다. getCoordinator()와 getEmitter()는 각각 Coordinator와 Emitter를 생성하면 된다. 이 두 메서드의 txStateId는 스트림을 생성할 때 입력한 값이 사용된다.

TridentTopology topology = new TridentTopology();
topology.newStream("log", new LogSpout()) // "log"가 txStateId 값으로 사용
    .each(new Fields("logString"), new OrderLogFilter())
    ....

getCoordinator()와 getEmitter()의 conf 파라미터는 getComponentConfiguration() 메서드에서 리턴한 Map 객체가 전달된다.

다음은 BatchCoordinator 인터페이스를 구현한 클래스의 예다.

public class LogBatchCoordinator implements BatchCoordinator<Long> {
    private static final Logger LOG = LoggerFactory.getLogger(LogBatchCoordinator.class);

    @Override
    public Long initializeTransaction(long txid, Long prevMetadata, Long currMetadata) {
        if (prevMetadata == null) {
            return 1L;
        }
        return prevMetadata + 1L;
    }

    @Override
    public void success(long txid) {
        LOG.info("BatchCoordinator.success(txid={})", txid);
    }

    @Override
    public boolean isReady(long txid) {
        // 튜플을 생성할 준비가 되면 true를 리턴한다.
        LOG.info("BatchCoordinator.isReady({})", new Object[] { txid });
        return true;
    }

    @Override
    public void close() {
        LOG.info("BatchCoordinator.close()");
    }
}

BatchCoordinator 인터페이스의 각 메서드는 다음과 같은 기능을 제공해야 한다.

 메서드

설명 

 isReady

트랜잭션ID에 해당하는 튜플을 생성할 준비가 되었으면 true를 리턴한다.

 initializeTransaction

튜플을 발생하기 전에 현재 트랜잭션ID를 위한 메타데이터를 생성한다. 각 파라미터는 다음과 같다.

  • txid: 현재 트랜잭션 ID
  • prevMetadata: 이전 트랜잭션의 메타 데이터
  • currMetadata: 현재 트랜잭션을 처음 시도하면 null. 현재 트랜잭션을 재시도하는 것이면, 앞서 최초 시도할 때 생성한 메타 데이터.
 success

 트랜잭션ID에 해당하는 튜플들을 성공적으로 처리하면 호출된다.

 close 토폴로지를 종료할 때 호출된다.


Emitter 인터페이스의 구현 예는 다음과 같다.


public class LogEmitter implements Emitter<Long> {

    private static final Logger LOG = LoggerFactory.getLogger(LogEmitter.class);


    @Override

    public void emitBatch(TransactionAttempt tx, Long coordinatorMeta,

            TridentCollector collector) {

        LOG.info("Emitter.emitBatch({}, {}, collector)", tx, coordinatorMeta);

        List<String> logs = getLogs(coordinatorMeta);

        for (String log : logs) {

            List<Object> oneTuple = Arrays.<Object> asList(log);

            collector.emit(oneTuple);

        }

    }


    private List<String> getLogs(Long coordinatorMeta) {

        List<String> logs = new ArrayList<String>();

        ... // 로그를 어딘가에서 읽어와 List에 담음

        return logs;

    }


    @Override

    public void success(TransactionAttempt tx) {

        LOG.info("Emitter.success({})", tx);

    }


    @Override

    public void close() {

        LOG.info("Emitter.close()");

    }


}


Emitter 인터페이스의 각 메서드는 다음 기능을 제공한다.


메서드 

설명 

 emitBatch

트랜잭션ID에 속할 튜플을 생성한다. 각 파라미터는 다음과 같은 용도로 사용된다.

  • tx 파라미터: 트랜잭션ID와 재시도회수 제공
  • coordinatorMeta: BatchCoordinator가 생성한 메타 데이터
  • collector: 트랜잭션ID에 속하는 튜플을 토폴로지에 추가
 success

 트랜잭션에 속한 모든 튜플을 성공적으로 처리했을 때 실행된다.

 close

 토폴로지를 종료할 때 실행된다.


스톰 토폴로지를 구현할 때 어려운 부분 중 하나가 스파우트다. 스파우트의 종류에는 Transactional, Opaque transactional, non-transactional의 세 가지가 있다고 했는데, 이 중 transactional이나 opaque transactional을 구현하려면 트랜잭션ID와 트랜잭션 메타데이터를 활용해야 한다. 예를 들어, Emitter는 emitBatch()를 실행할 때 캐시에 트랜잭션ID를 키로 하고 생성한 튜플 목록을 값으로 하는 (키, 값) 쌍을 외부의 메모리 캐시에 보관할 수 있을 것이다. 이후, 튜플 처리에 실패해서 재전송을 해야 한다면 외부 메모리 캐시에서 읽어와 동일한 튜플을 재전송할 수 있을 것이다. 이 경우, success() 메서드에서 외부 메모리 캐시에서 트랜잭션ID에 해당하는 값을 삭제하도록 구현할 것이다.


연산 처리 구성 요소: Function, Filter


튜플을 생성하면 그 다음으로 할 일은 튜플을 가공해서 원하는 결과를 얻어내는 것이다. 보통은 튜플을 가공하고, 걸러내는 과정을 거친 뒤에, 집합 연산을 수행하게 된다. 이를 위해 스톰 트라이던트는 Function, Filter, Aggregator를 제공하는데, 먼저 Function과 Filter에 대해 살펴보도록 하자. 이 두 인터페이스와 관련된 상위 타입 및 하위 타입은 아래 그림과 같이 구성되어 있다.



Function과 Filter를 사용할 때 초기화가 필요하면 prepare() 메서드를 이용한다. 반대로 종료시 정리가 필요하면 cleanup() 메서드를 이용한다.


Filter 구현


Filter는 튜플을 걸러내는 기능을 제공한다. Filter 인터페이스에 정의된 isKeep() 메서드는 튜플을 계속해서 스트림에 흘려 보낼지 여부를 결정한다. 다음은 Filter 인터페이스의 구현 예다. isKeep() 메서드를 보면 튜플 데이터 중 "logString"을 읽어와 이 문자열이 "ORDER"로 시작하면 true를 리턴하고 그렇지 않으면 false를 리턴한다. 따라서, 로그 문자열 값이 "ORDER"로 시작하는 것만 토폴로지의 다음 단계에 전달된다.


public class OrderLogFilter extends BaseFilter {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(OrderLogFilter.class);


    @Override

    public boolean isKeep(TridentTuple tuple) {

        String logString = tuple.getStringByField("logString");

        boolean pass = logString.startsWith("ORDER");

        if (!pass)

            LOG.info("OrderLogFilter filtered out {}", logString);

        return pass;

    }


}


위 필터를 사용한 토폴로지 구성 코드 예는 다음과 같다. 이 코드를 보면 LogSpout에 의해 생성된 튜플을 OrderLogFilter로 먼저 필터링하고 있다. 따라서, 튜플에 포함된 "logString"의 값이 "ORDER"로 시작하는 것만 그 다음 단계인 LogParser에 전달된다.


TridentTopology topology = new TridentTopology();

topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

       ...코드생략


each() 메서드의 첫 번째 파라미터는 Filter.isKeep() 메서드에서 사용할 튜플 필드 목록을 뜻한다. 즉, 위 코드의 경우 OrderLogFilter의 isKeep() 메서드는 "logString" 필드만 사용할 수 있다. 이 예에서는 LogSpout가 [logString:값]인 튜플을 생성하기 때문에, 사용할 필드가 "logString" 밖에 없지만, 만약 여러 개의 필드를 생성한다면 new Fields("field1", "field2")처럼 여러 필드명을 지정할 수 있다.


Function 구현


Function은 튜플에 새로운 데이터를 추가할 때 사용한다. Function 인터페이스는 튜플을 가공하기 위한 execute() 메서드를 정의하고 있으며, 이 메서드에서 튜플을 알맞게 가공하면 된다. 다음은 Function 인터페이스의 구현 예를 보여주고 있다.


public class LogParser extends BaseFunction {


    private static final long serialVersionUID = 1L;


    @Override

    public void execute(TridentTuple tuple, TridentCollector collector) {

        String log = (String) tuple.getStringByField("logString");

        ShopLog event = parseLog(log);

        collector.emit(Arrays.<Object>asList(event));

    }


    private ShopLog parseLog(String log) {

        String[] tokens = log.split(",");

        return new ShopLog(tokens[0], Long.parseLong(tokens[1]), Long.parseLong(tokens[2]));

    }


}


execute() 메서드는 tuple로부터 가공에 필요한 필드 값을 구한다. 구한 필드 값으로 알맞은 처리를 한 뒤, collector.emit()을 이용해서 결과로 생성할 필드 데이터를 추가한다. collector.emit() 메서드는 List<Object> 타입을 인자로 받는데, 이 때 List에 담긴 각 값이 토폴로지 정의시 지정한 추가 필드가 된다. 예를 들어, 다음과 같이 LogParser Function을 사용한 경우, 위 코드의 execute() 메서드에서 collector.emit() 메서드에서 추가한 값 목록은 아래 코드의 each() 메서드에서 세 번째 파라미터로 지정한 필드 목록에 매칭된다.


topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        ...코드생략


Function 사용시 유희할 점은 Function은 기존 튜플의 필드를 제거하지 않고, 새로운 튜플을 추가한다는 점이다. 예를 들어, 위 코드를 보면 LogSpout는 [logString:값]인 튜플을 생성한다. LogParser를 거치면 튜플은 [logString:값, shopLog:값] 이 된다. 이 상태에서 AddGroupingValueFunction을 거치면 튜플은 [logString:값, shopLog:값, productId:time:값]이 된다. each() 메서드의 첫 번째 파라미터는 이 튜플 필드 중 Function이나 Filter에 전달할 필드 목록을 지정하는 것이며, 세 번째 파라미터는 Function의 실행 결과로 추가되는 필드 목록을 지정하는 것이다.


참고로, AddGroupingValueFunction 클래스는 로그를 그룹핑할 때 사용할 필드를 추가하는 기능을 제공한다. 이 클래스는 새로운 필드로 "제품ID:기준시간" 값을 추가한다. 각 제품의 분 당 판매개수를 구할 것이기 때문에, 타임스탬프 값을 60000으로 나눈 값을 기준시간 으로 사용해 봤다.


public class AddGroupingValueFunction extends BaseFunction {


    private static final long serialVersionUID = 1L;


    @Override

    public void execute(TridentTuple tuple, TridentCollector collector) {

        ShopLog shopLog = (ShopLog) tuple.getValueByField("shopLog");

        long time = shopLog.getTimestamp() / (1000L * 60L);

        collector.emit(Arrays.<Object>asList(shopLog.getProductId() + ":" + time));

    }


}


연산처리 구성 요소: 그룹핑과 집합 연산


트라이던트로 스트림을 처리하면 많은 경우 집합 연산을 하게 된다. 예를 들어, 예제의 경우는 분당 제품 판매 개수를 구해야 하는데, 이는 "제품ID:시간"을 기준으로 판매 개수를 구하는 집합 연산을 필요로 한다. 이런 집합 연산을 처리할 수 있도록 트라이던트 API는 다음의 두 가지를 제공하고 있다.

  • 그룹 스트림: 배치에 속한 튜플들을 특정 필드를 기준으로 그룹핑한다. 이 스트림은 그룹 단위로 연산을 적용한다.
  • 집합 연산: 배치에 속한 튜플을 모아 단일 결과(튜플)를 생성한다.

이 두 기능을 사용하면 특정 필드를 기준으로 개수를 구한다거나 합을 구하는 등의 연산을 수행할 수 있다. 그룹 스트림을 생성하는 방법은 매우 간단하다. 토폴로지 정의에서 groupBy() 메서드를 사용하면 된다.


topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .groupBy(new Fields("productId:time"))

        .aggregate(new Fields("productId:time"), new Count(), new Fields("count"))

        ...생략


groupBy() 메서드는 지정한 필드를 이용해서 튜플을 그룹핑하는 GroupedStream을 리턴한다.



GroupedStream의 aggregate() 메서드를 사용하면 각 그룹별로 집합 연산을 수행하게 된다. 예를 들어, 아래 코드는 groupBy로 생성된 그룹에 대해 "productId:time" 필드를 기준으로 Count 집합 연산을 수행한다.


topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .groupBy(new Fields("productId:time"))

        .aggregate(new Fields("productId:time"), new Count(), new Fields("count"))

        ...생략


aggregate() 메서드의 세 번째 파라미터는 집합 연산 결과로 생성되는 필드를 정의한다. 위 코드의 경우 Count 집합 연산의 결과로 생성되는 필드가 한 개임을 알 수 있다. aggregate는 새로운 튜플을 생성하는데, GroupedStream의 aggregate는 첫 번째 파라미터로 지정한 필드와 세 번째 파라미터로 지정한 필드를 갖는 튜플을 생성한다. 즉, 위 코드의 경우 aggregate 메서드의 결과로 생성되는 튜플은 ["productId:time":값, "count":값] 으로 구성된다.


집합 연산 인터페이스


스톰 트라이던트 API는 집합 연산을 위한 인터페이스 세 개를 제공하며, 이들 인터페이스는 다음과 같다.



집합 연산: ReducerAggregator


ReducerAggregator 인터페이스는 다음과 같이 정의되어 있다.


public interface ReducerAggregator<T> extends Serializable {

    T init();

    T reduce(T curr, TridentTuple tuple);

}


ReducerAggregator의 동작 방식은 다음과 같다.

  1. 배치(또는 그룹)의 첫 튜플을 처리하기 전에 init()으로 초기값을 생성한다.
  2. 이 초기값을 현재값(curr)으로 설정한다.
  3. 각 튜플마다
    1. reduce(curr, 튜플)을 실행한다.
    2. 3-1의 결과를 curr에 설정한다.
  4. 마지막 튜플에 대한 reduct()의 결과값을 aggregate의 결과값으로 사용한다.
간단하게 튜플의 특정 필드의 합을 구하는 집합 연산을 ReducerAggregator로 구현해보면 다음과 같이 구할 수 있다.

public class Sum implements ReducerAggregator<Long> {
    private static final long serialVersionUID = 1L;

    @Override
    public Long init() {
        return 0L;
    }

    @Override
    public Long reduce(Long curr, TridentTuple tuple) {
        return curr + tuple.getLong(0);
    }
}


집합 연산: CombinerAggregator


CombinerAggregator 인터페이스는 다음과 같이 정의되어 있다.


public interface CombinerAggregator<T> extends Serializable {

    T init(TridentTuple tuple);

    T combine(T val1, T val2);

    T zero();

}


CombinerAggregator의 동작 방식은 다음과 같다.

  1. 배치(또는 그룹)에 튜플이 있다면, 각 튜플마다
    1. init()으로 튜플에 해당하는 값을 구한다.
    2. combine()을 이용해서 이전 combine()의 결과와 init()의 결과를 조합한 결과를 리턴한다.
      1. 첫 번째 튜플의 경우 이전 combine() 결과 값으로 zero() 값을 사용한다.

스톰은 CombinerAggregator의 구현인 Count를 제공하고 있는 Count의 코드는 다음과 같다. 배치(또는 그룹)의 튜플 개수를 셀 때 사용하는 CombinerAggregator이다.


public class Count implements CombinerAggregator<Long> {


    @Override

    public Long init(TridentTuple tuple) {

        return 1L;

    }


    @Override

    public Long combine(Long val1, Long val2) {

        return val1 + val2;

    }


    @Override

    public Long zero() {

        return 0L;

    }

    

}


집합 연산: Aggregator


Aggregator는 앞서 두 개 인터페이스보다 좀 더 범용적인 인터페이스를 정의하고 있다.


public interface Aggregator<T> extends Operation {

    T init(Object batchId, TridentCollector collector);

    void aggregate(T val, TridentTuple tuple, TridentCollector collector);

    void complete(T val, TridentCollector collector);

}


Aggregator는 다음과 같이 동작한다.

  1. 배치의 튜플을 처리하기 전에 init()을 실행하고, 집합 연산에 필요한 객체 val을 리턴한다.
  2. 각 튜플마다 aggregator 메서드를 호출한다. 첫 번째 파라미터는 1에서 리턴한 객체이다.
  3. 배치의 모든 튜플에 대한 처리가 끝나면 complete 메서드를 호출한다.
ReducerAggregator와 CombinerAggregator가 최종적으로 1개의 값을 생성하도록 제한된 인터페이스인 반면에, Aggregator는 한 개 이상의 튜플을 생성할 수 있다. 예를 들어, 다음은 배치(또는 그룹)에 있는 튜플 중에서 정렬 기준으로 N개의 튜플을 골라내는 Aggregator의 구현 코드이다. (스톰이 제공하는 클래스다.)

public static class FirstNSortedAgg extends BaseAggregator<PriorityQueue> {

    int _n;
    String _sortField;
    boolean _reverse;
    
    public FirstNSortedAgg(int n, String sortField, boolean reverse) {
        _n = n;
        _sortField = sortField;
        _reverse = reverse;
    }

    @Override
    public PriorityQueue init(Object batchId, TridentCollector collector) {
        return new PriorityQueue(_n, new Comparator<TridentTuple>() {
            @Override
            public int compare(TridentTuple t1, TridentTuple t2) {
                Comparable c1 = (Comparable) t1.getValueByField(_sortField);
                Comparable c2 = (Comparable) t2.getValueByField(_sortField);
                int ret = c1.compareTo(c2);
                if(_reverse) ret *= -1;
                return ret;
            }                
        });
    }

    @Override
    public void aggregate(PriorityQueue state, TridentTuple tuple, TridentCollector collector) {
        state.add(tuple);
    }

    @Override
    public void complete(PriorityQueue val, TridentCollector collector) {
        int total = val.size();
        for(int i=0; i<_n && i < total; i++) {
            TridentTuple t = (TridentTuple) val.remove();
            collector.emit(t);
        }
    }
}  


Aggregator를 이용해서 튜플 개수를 세는 코드는 다음과 같이 구현하고 있다.


public class CountAsAggregator extends BaseAggregator<CountAsAggregator.State> {


    static class State {

        long count = 0;

    }

    

    @Override

    public State init(Object batchId, TridentCollector collector) {

        return new State();

    }


    @Override

    public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {

        state.count++;

    }


    @Override

    public void complete(State state, TridentCollector collector) {

        collector.emit(new Values(state.count));

    }

    

}


토플로지 구성 및 로컬 실행


스파우트와 Function, Filter, 집합 연산을 구현했다면, 남은 일은 TridentTopology를 이용해서 토폴로지를 구성하고 실행하는 것이다. 아래 코드는 토폴로지를 구성하고 로컬에서 실행하는 예제 코드를 보여주고 있다.


public class LogTopology {


    public static void main(String[] args) {

        TridentTopology topology = new TridentTopology();

        topology.newStream("log", new LogSpout())

                .each(new Fields("logString"), new OrderLogFilter())

                .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

                .each(new Fields("shopLog"), 

                         new AddGroupingValueFunction(), new Fields("productId:time"))

                .groupBy(new Fields("productId:time"))

                .aggregate(new Fields("productId:time"), new Count(), new Fields("count"))

                .each(new Fields("productId:time", "count"), 

                         new CountSumFunction(), new Fields("sum"))

                .each(new Fields("productId:time", "sum"), new ThresholdFilter())

                .each(new Fields("productId:time", "sum"), new AlertFilter());

        StormTopology stormTopology = topology.build();


        Config conf = new Config();

        conf.put("ThresholdFilter.value", 5L);

        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology("cdc", conf, stormTopology);


        try {

            Thread.sleep(60000);

        } catch (InterruptedException e) {

        }

        cluster.shutdown();

    }


}


글에서 보여주지 않은 코드인 CountSumFunction, ThresholdFilter, AlertFilter 등은 참고자료에 있는 예제 코드 링크를 참고하기 바란다.


참고자료

  • 예제 코드: https://github.com/madvirus/storm-sample





Posted by 최범균 madvirus

댓글을 달아 주세요

  1. 96480XX 2014.10.01 15:10 신고  댓글주소  수정/삭제  댓글쓰기

    내가 아는 사람중에
    뭐 이렇게 좋은 내용의 블로그를 운영하는 사람이 있었다니 ...

서블릿 2.3에 새롭게 추가된 필터가 무엇이며, 어떻게 구현하는지에 대해서 살펴본다.

필터!!

현재 서블릿 2.3 규약은 Proposed Final Draft 2 상태에 있다. 조만간 서블릿 2.3과 JSP 1.2 최종 규약이 발표될 것으로 예상되며 우리는 당연히 새롭게 추가된 것들이 무엇인지에 관심이 쏠리게 된다. 서블릿 2.3 규약에 새롭게 추가된 것 중에 필자가 가장 눈여겨 본 것은 바로 필터(Filter) 기능의 추가이다.

그 동안 필자는 서블릿 2.2와 JSP 1.1에 기반하여 웹 어플리케이션을 구현하는 동안 몇몇 부분에서 서블릿 2.2 규약의 부족한 면을 느낄 수 있었으며, 특히 사용자 인증 처리, 요청 URL에 따른 처리, XSL/T를 이용한 XML 변환(Transformation) 등 개발자들이 직접 설계해야 하는 부분이 많았었다. 하지만, 이제 서블릿 2.3 규약에 새롭게 추가된 필터(Filter)를 사용함으로써 개발자들이 고민해야 했던 많은 부분을 덜어낼 수 있게 되었다. 이 글에서는 필터가 무엇이며 어떻게 필터를 구현하는지에 대해 살펴볼 것이다.

간단하게 말해서, 필터는 'HTTP 요청과 응답을 변경할 수 있는 재사용가능한 코드'이다. 필터는 객체의 형태로 존재하며 클라이언트로부터 오는 요청(request)과 최종 자원(서블릿/JSP/기타 문서) 사이에 위치하여 클라이언트의 요청 정보를 알맞게 변경할 수 있으며, 또한 필터는 최종 자원과 클라이언트로 가는 응답(response) 사이에 위치하여 최종 자원의 요청 결과를 알맞게 변경할 수 있다. 이를 그림으로 표현하면 다음과 같다.

그림1 - 필터의 기본 구조
그림1에서 자원이 받게 되는 요청 정보는 클라이언트와 자원 사이에 존재하는 필터에 의해 변경된 요청 정보가 되며, 또한 클라이언트가 보게 되는 응답 정보는 클라이언트와 자원 사이에 존재하는 필터에 의해 변경된 응답 정보가 된다. 위 그림에서는 요청 정보를 변경하는 필터와 응답 정보를 변경하는 필터를 구분해서 표시했는데 실제로 이 둘은 같은 필터이다. 단지 개념적인 설명을 위해 그림1과 같이 분리해 놓은 것 뿐이다.

필터는 그림1에서처럼 클라이언트와 자원 사이에 1개가 존재하는 경우가 보통이지만, 여러 개의 필터가 모여 하나의 체인(chain; 또는 사슬)을 형성할 수도 있다. 그림2는 필터 체인의 구조를 보여주고 있다.

그림2 - 필터 체인
그림2와 같이 여러 개의 필터가 모여서 하나의 체인을 형성할 때 첫번째 필터가 변경하는 요청 정보는 클라이언트의 요청 정보가 되지만, 체인의 두번째 필터가 변경하는 요청 정보는 첫번째 필터를 통해서 변경된 요청 정보가 된다. 즉, 요청 정보는 변경에 변경에 변경을 거듭하게 되는 것이다. 응답 정보의 경우도 요청 정보와 비슷한 과정을 거치며 차이점이 있다면 필터의 적용 순서가 요청 때와는 반대라는 것이다. (그림2를 보면 이를 알 수 있다.)

필터는 변경된 정보를 변경하는 역할 뿐만 아니라 흐름을 변경하는 역할도 할 수 있다. 즉, 필터는 클라이언트의 요청을 필터 체인의 다음 단계(결과적으로는 클라이언트가 요청한 자원)에 보내는 것이 아니라 다른 자원의 결과를 클라이언트에 전송할 수 있다. 필터의 이러한 기능은 사용자 인증이나 권한 체크와 같은 곳에서 사용할 수 있다.

필터 관련 인터페이스 및 클래스

필터를 구현하는데 있어 핵심적인 역할을 인터페이스 및 클래스가 3개가 있는 데, 그것들은 바로 javax.servlet.Filter 인터페이스, javax.servlet.ServletRequestWrapper 클래스, javax.servlet.ServletResponseWrapper 클래스이다. 이 중 Filter 인터페이스는 클라이언트와 최종 자원 사이에 위치하는 필터를 나타내는 객체가 구현해야 하는 인터페이스이다. 그리고 ServletRequestWrapper 클래스와 SerlvetResponseWrapper 클래스는 필터가 요청을 변경한 결과 또는 응답을 변경할 결과를 저장할 래퍼 클래스를 나타내며, 개발자는 이 두 클래스를 알맞게 상속하여 요청/응답 정보를 변경하면 된다.

Filter 인터페이스

먼저, Filter 인터페이스부터 살펴보자. Filter 인터페이스에는 다음과 같은 메소드가 선언되어 있다.

  • public void init(FilterConfig filterConfig) throws ServletException
    필터를 웹 콘테이너내에 생성한 후 초기화할 때 호출한다.
  • public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws java.io.IOException, ServletException
    체인을 따라 다음에 존재하는 필터로 이동한다. 체인의 가장 마지막에는 클라이언트가 요청한 최종 자원이 위치한다.
  • public void destroy()
    필터가 웹 콘테이너에서 삭제될 때 호출된다.
위 메소드에서 필터의 역할을 하는 메소드가 바로 doFilter() 메소드이다. 서블릿 콘테이너는 사용자가 특정한 자원을 요청했을 때 그 자원 사이에 필터가 존재할 경우 그 필터 객체의 doFilter() 메소드를 호출하며, 바로 이 시점부터 필터가 작용하기 시작한다. 다음은 전형적인 필터의 구현 방법을 보여주고 있다.

  public class FirstFilter implements javax.servlet.Filter {
  
     public void init(FilterConfig filterConfig) throws ServletException {
        // 필터 초기화 작업
     }
     
     public void doFilter(ServletRequest request,
                          ServletResponse response,
                          FilterChain chain)
                          throws IOException, ServletException {
        // 1. request 파리미터를 이용하여 요청의 필터 작업 수행
        // 2. 체인의 다음 필터 처리
        chain.doFilter(request, response);        // 3. response를 이용하여 응답의 필터링 작업 수행
     }
     
     public void destroy() {
        // 주로 필터가 사용한 자원을 반납
     }
  }

위 코드에서 Filter 인터페이스의 doFilter() 메소드는 javax.servlet.Servlet 인터페이스의 service() 메소드와 비슷한 구조를 갖는다. 즉 만약 클라이언트의 자원 요청이 필터를 거치는 경우, 클라이언트의 요청이 있을 때 마다 doFilter() 메소드가 호출되며, doFilter() 메소드는 서블릿과 마찬가지로 각각의 요청에 대해서 알맞은 작업을 처리하게 되는 것이다.

위 코드를 보면 doFilter() 메소드는 세번째 파라미터로 FilterChain 객체를 전달받는 것을 알 수 있다. 이는 클라이언트가 요청한 자원에 이르기까지 클라이언트의 요청이 거쳐가게 되는 필터 체인을 나타낸다. FilterChain을 사용함으로써 필터는 체인에 있는 다음 필터에 변경한 요청과 응답을 건내줄 수 있게 된다.

위 코드를 보면서 우리가 또 하나 알아야 하는 것은 요청을 필터링한 필터 객체가 또 다시 응답을 필터링한다는 점이다. 위 코드의 doFilter() 메소드를 보면 1, 2, 3 이라는 숫자를 사용하여 doFilter() 메소드 내에서 이루어지는 작업의 순서를 표시하였는데, 그 순서를 다시 정리해보면 다음과 같다.

  1. request 파리미터를 이용하여 클라이언트의 요청 필터링
    1 단계에서는 RequestWrapper 클래스를 사용하여 클라이언트의 요청을 변경한다.
  2. chain.doFilter() 메소드 호출
    2 단계에서는 요청의 필터링 결과를 다음 필터에 전달한다.
  3. response 파리미터를 사용하여 클라이트로 가는 응답 필터링
    3 단계에서는 체인을 통해서 전달된 응답 데이터를 변경하여 그 결과를 클라이언트에 전송한다.
1단계와 3단계 사이에서 다음 필터로 이동하기 때문에 요청의 필터 순서와 응답의 필터 순서는 그림2에서 봤듯이 반대가 된다.

필터의 설정

필터를 사용하기 위해서는 어떤 필터가 어떤 자원에 대해서 적용된다는 것을 서블릿/JSP 콘테이너에 알려주어야 한다. 서블릿 규약은 웹 어플리케이션과 관련된 설정은 웹 어플리케이션 디렉토리의 /WEB-INF 디렉토리에 존재하는 web.xml 파일을 통해서 하도록 하고 있으며, 필터 역시 web.xml 파일을 통해서 설정하도록 하고 있다.

web.xml 파일에서 필터를 설정하기 위해서는 다음과 같이 <filter> 태그와 <filter-mapping> 태그를 사용하면 된다.

  <web-app>
     
     <filter>
        <filter-name>HighlightFilter</filter-name>
        <filter-class>javacan.filter.HighlightFilter</filter-class>
        <init-param>
           <param-name>paramName</param-name>
           <param-value>value</param-value>
        </init-param>
     </filter>
     
     <filter-mapping>
        <filter-name>HighlightFilter</filter-name>
        <url-pattern>*.txt</url-pattern>
     </filter-mapping>
     
  </web-app>

여기서 <filter> 태그는 웹 어플리케이션에서 사용될 필터를 지정하는 역할을 하며, <filter-mapping> 태그는 특정 자원에 대해 어떤 필터를 사용할지를 지정한다. 위 예제의 경우는 클라이언트가 txt 확장자를 갖는 자원을 요청할 경우 HithlightFilter가 사용되도록 지정하고 있다.

<init-param> 태그는 필터가 초기화될 때, 즉 필터의 init() 메소드가 호출될 때 전달되는 파라미터 값이다. 이는 서블릿의 초기화 파라미터와 비슷한 역할을 하며 주로 필터를 사용하기 전에 초기화해야 하는 객체나 자원을 할당할 때 필요한 정보를 제공하기 위해 사용된다.

<url-pattern> 태그는 클라이언트가 요청한 특정 URI에 대해서 필터링을 할 때 사용된다. 서블릿 2.3 규약의 11장을 보면 다음과 같이 url-pattern의 적용 기준을 명시하고 있다.

  • '/'로 시작하고 '/*'로 끝나는 url-pattern은 경로 매핑을 위해서 사용된다.
  • '*.'로 시작하는 url-pattern은 확장자에 대한 매핑을 할 때 사용된다.
  • 나머지 다른 문자열을 정확한 매핑을 위해서 사용된다.
에를 들어, 다음과 같이 <filter-mapping> 태그를 지정하였다고 해 보자.

     <filter-mapping>
        <filter-name>AuthCheckFilter</filter-name>
        <url-pattern>/pds/*</url-pattern>
     </filter-mapping>

이 경우 클라이언트가 /pds/a.zip 을 요청하든 /pds/b.zip 을 요청하는지에 상관없이 AuthCheckFilter가 필터로 사용될 것이다.

<url-pattern> 태그를 사용하지 않고 대신 <servlet-name> 태그를 사용함으로써 특정 서블릿에 대한 요청에 대해서 필터를 적용할 수도 있다. 예를 들면 다음과 같이 이름이 FileDownload인 서블릿에 대해서 AuthCheckFilter를 필터로 사용하도록 할 수 있다.

     <filter-mapping>
        <filter-name>AuthCheckFilter</filter-name>
        <servlet-name>FileDownload</servlet-name>
     </filter-mapping>
     
     <servlet>
        <servlet-name>FileDownload</servlet-name>
        ...
     </servlet>

래퍼 클래스

필터가 필터로서의 제기능을 하기 위해서는 클라이언트의 요청을 변경하고, 또한 클라이언트로 가는 응답을 변경할 수 있어야 할 것이다. 이러한 변경을 할 수 있도록 해 주는 것이 바로 ServletRequestWrapper와 ServletResponseWrapper이다. 서블릿 요청/응답 래퍼 클래스를 이용함으로써 클라이언트의 요청 정보를 변경하여 최종 자원인 서블릿/JSP/HTML/기타 자원에 전달할 수 있고, 또한 최종 자원으로부터의 응답 결과를 변경하여 새로운 응답 정보를 클라이언트에 보낼 수 있게 된다.

서블릿 요청/응답 래퍼 클래스로서의 역할을 수행하기 위해서는 javax.servlet 패키지에 정의되어 있는 ServletRequestWrapper 클래스와 ServletResponseWrapper 클래스를 상속받으면 된다. 하지만, 대부분의 경우 HTTP 프로토콜에 대한 요청/응답을 필터링 하기 때문에 이 두 클래스를 상속받아 알맞게 구현한 HttpServletRequestWrapper 클래스와 HttpServletResponseWrapper 클래스를 상속받는 경우가 대부분일 것이다.

HttpServletRequestWrapper 클래스와 HttpServletResponseWrapper 클래스는 모두 javax.servlet.http 패키지에 정의되어 있으며, 이 두 클래스는 각각 HttpServletRequest 인터페이스와 HttpServletResponse 인터페이스에 정의되어 있는 모든 메소드를 이미 구현해 놓고 있다. 필터를 통해서 변경하고 싶은 정보가 있을 경우 그 정보를 추출하는 메소드를 알맞게 오버라이딩하여 필터의 doFilter() 메소드에 넘겨주기만 하면 된다. 예를 들어, 클라이언트가 전송한 "company" 파리머터의 값을 무조건 "JavaCan.com"으로 변경하는 요청 래퍼 클래스는 다음과 같이 HttpServletRequestWrapper 클래스를 상속받은 후에 getParameter() 메소드를 알맞게 구현하면 된다.

  package javacan.filter;
  
  import javax.servlet.http.*;
  
  public class ParameterWrapper extends HttpServletRequestWrapper {
     
     public ParameterWrapper(HttpServletRequest wrapper) {
        super(wrapper);
     }
     
     public String getParameter(String name) {
        if ( name.equals("company") ) {
           return "JavaCan.com";
        } else {
           return super.getParameter(name);
        }
     }
  }

오버라이딩한 getParameter() 메소드를 살펴보면 값을 구하고자 하는 파라미터의 이름이 "company"일 경우 "JavaCan.com"을 리턴하고 그렇지 않을 경우에는 상위 클래스(즉, HttpServletRequestWrapper 클래스)의 getParameter() 메소드를 호출하는 것을 알 수 있다.

이렇게 작성한 래퍼 클래스는 필터 체인을 통해서 최종 자원까지 전달되어야 그 효과가 있을 것이다. 즉, 최종 자원인 서블릿/JSP에서 request.getParameter("company")를 호출했을 때 ParameterWrapper 클래스의 getParameter() 메소드가 사용되기 위해서는 ParameterWrapper 객체가 HttpServletRequest 객체를 대체해야 하는데, 이는 Filter 인터페이의 doFilter() 내에서 ParameterWrapper 객체를 생성한 후 파라미터로 전달받은 FilterChain의 doFilter() 메소드를 호출함으로써 가능하다. 좀 복잡하게 느껴질지도 모르겠으나 이를 코드로 구현해보면 다음과 같이 간단한다.

  package javacan.filter;
  
  import javax.servlet.*;
  import javax.servlet.http.*;
  
  public class ParameterFilter implements Filter {
     
     private FilterConfig filterConfig;
     
     public ParameterFilter() {
     }
     
     public void init(FilterConfig filterConfig) {
        this.filterConfig = filterConfig;
     }
     
     public void destroy() {
        filterConfig = null;
     }
     
     public void doFilter(ServletRequest request,
                          ServletResponse response,
                          FilterChain chain)
                          throws java.io.IOException, ServletException {
        // 요청 래퍼 객체 생성
        HttpServletRequestWrapper requestWrapper = 
                     new ParameterWrapper((HttpServletRequest)request);
        // 체인의 다음 필터에 요청 래퍼 객체 전달
        chain.doFilter(requestWrapper, response);     }
  }

응답 래퍼 클래스 역시 요청 래퍼 클래스와 비슷한 방법으로 구현된다.

앞에서도 언급했듯이 요청 정보의 변경 및 응답 정보 변경의 출발점은 래퍼 클래스이다. XML+XSL/T 기법이나 사용자 인증과 같은 것들을 최종 자원과 분리시켜 객체 지향적으로 구현하기 위해서 요청/응답 래퍼 클래스를 사용하는 것은 필수적이다. 2부에서 실제 예를 통해서 어떻게 필터와 요청/응답 래퍼 클래스를 효과적으로 사용할 수 있는 지 살펴보게 될 것이다.

필터 체인의 순서

앞에서 필터는 체인을 형성할 수 있다고 하였다. 체인을 형성한다는 것은 어떤 특정한 순서에 따라 필터가 적용된다는 것을 의미한다. 예를 들면, 여러분은 '인증필터->파라미터 변환 필터->XSL/T 필터->자원->XSL/T 필터->파라미터 변환 필터->인증필터'와 같이 특정 순서대로 필터를 적용하길 원할 것이다. 서블릿2.3 규약은 다음과 같은 규칙에 기반하여 필터 체인 내에서 필터의 적용 순서를 결정한다.

  1. url-pattern 매칭은 web.xml 파일에 표기된 순서대로 필터 체인을 형성한다.
  2. 그런 후, servlet-name 매칭이 web.xml 파일에 표기된 순서대로 필터 체인을 형성한다.
결론

이번 1 부에서는 서블릿 2.3 규약에 새롭게 추가된 필터가 무엇인지 그리고 필터를 어떻게 구현하며 또한 필터를 어떻게 서블릿이나 JSP와 같은 자원에 적용할 수 있는지에 대해서 알아보았다. 아직 구체적으로 필터의 응용방법에 대해서 설명하지 않았기 때문에 필터의 장점이 머리에 떠오르지 않을것이다. 다음 2 부에서는 구체적으로 필터의 예를 살펴봄으로써 필터의 활용함으로써 얻게 되는 장점에 대해서 살펴보도록 하자.

관련링크:
Posted by 최범균 madvirus

댓글을 달아 주세요

  1. 최자84 2010.07.23 10:19 신고  댓글주소  수정/삭제  댓글쓰기

    정말 잘 보고 갑니다. 큰 도움이 되었습니다.

  2. 감사합니다. 2012.05.01 16:57 신고  댓글주소  수정/삭제  댓글쓰기

    감사합니다.
    변경한 사항과 Filter부분 연동하는 부분이 필요했는데, 정말 도움 많이 받앗습니다.

  3. 임성수 2013.05.13 17:50 신고  댓글주소  수정/삭제  댓글쓰기

    정말 잘 보고 갑니다..
    개념이 딱 들어서는거 같습니다.

    그냥 지나가기 아까운 자료인지라..
    출처 명시하고 제 블로그에 담아놔도 될런지요?

  4. 초보개발자 2013.08.20 09:49 신고  댓글주소  수정/삭제  댓글쓰기

    감사합니다! 개념도 쏙쏙 잘들어오고, 정리가 잘되서 많은 도움이되었습니다^^
    doFilter() 메소드에서 chain.doFilter(requestWrapper, response);를 호출하는게 두번째 filter class 맞지요? doFilter를 다른 controller단에서 호출하여 제어해서 쓰는게 맞는지요??@_@
    개념찾아보다가 좋은글 보게되어 제가 생각한부분이 맞는지 문의드리고갑니다^^;

  5. 많이 감사합니다 2015.10.02 18:12 신고  댓글주소  수정/삭제  댓글쓰기

    지금 처음 파견나와서 엄청 힘든데 많은 도움되었습니다. 감사합니다.

  6. 김도원 2016.01.22 11:03 신고  댓글주소  수정/삭제  댓글쓰기

    잘 보고 갑니다.

    좋은 자료라 제 개인블로그에 출처 명시후 스크랩해도 괜찮을까요?

  7. 훈훈 2016.12.28 21:10 신고  댓글주소  수정/삭제  댓글쓰기

    http://blog.naver.com/runajoker/220897175025
    저의 블로그에 출처 명시하고 part1, part2 퍼갔습니다. 문제가 될 경우 삭제 처리 하겠습니다. 저번에도 검색하다가 이곳에 와서 좋은 정보 얻어갔는데 이번에도 또 좋은 정보 얻어가네요 감사합니다.

  8. 유니윤 2017.07.27 17:58 신고  댓글주소  수정/삭제  댓글쓰기

    16년이나된 자룐데 정말 자세히 그리고 이해하기 쉽게 체계적으로 잘 설명을 해주셨네요ㅎㅎ 덕분에 filter에 대해서 어느 정도 감을 잡고 갑니다ㅎ 아무쪼록 무더운 여름 잘 나시길 바라겠습니다