주요글: 도커 시작하기
반응형

스톰은 토폴로지가 메트릭을 만들 때 필요한 API를 제공하고 있다. 이 API를 사용하면 내가 만든 토폴로지에서 원하는 메트릭을 발생하고 이를 쉽게 처리할 수 있다.


IMetric으로 토폴로지에서 메트릭 발생하기


토폴로지에서 메트릭을 발생시키는 방법은 다음과 같이 간단하다.

  1. 메트릭을 발생시키고 싶은 Bolt의 prepare() 메서드 또는 Spout의 open() 메서드에서 IMetric을 구현한 객체를 생성하고 메트릭으로 등록한다.
  2. 1에서 생성한 IMetric 객체를 이용해서 필요할 때 메트릭 값을 조정한다.

다음은 스톰 스타터 샘플에 있는 Bold에 메트릭 생성 기능을 추가한 예이다.


public class ExclamationBolt extends BaseRichBolt {

    OutputCollector _collector;


    transient CountMetric countMetric;

    transient MultiCountMetric wordCountMetric;


    @Override

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

        _collector = collector;


        // IMetric을 구현한 객체를 생성하고

        countMetric = new CountMetric();

        wordCountMetric = new MultiCountMetric();


        // context에 메트릭을 등록

        context.registerMetric("word_count", wordCountMetric, 10);

        context.registerMetric("execute_count", countMetric, 20);

    }


    @Override

    public void execute(Tuple tuple) {

        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));

        _collector.ack(tuple);


        String word = tuple.getString(0);


        // 튜플 처리할 때 마다 원하는 메트릭 값을 조정

        countMetric.incr();

        wordCountMetric.scope(word).incr();

    }


    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("word"));

    }


}


이 코드에서 context.registerMetric() 메서드의 파라미터는 순서대로 메트릭 이름, 메트릭 객체, 메트릭 버킷 크기를 의미한다. 메트릭 버킷 크기는 초 단위이다. 위 코드에서 "word_count" 메트릭 이름은 10초 메트릭 버킷 크기를 설정했는데, 이 경우 스톰은 10초 마다 해당 메트릭 객체로부터 값을 읽어오고 메트릭을 리셋한다.(정확하게는 10초 마다 메트릭 객체의 getValueAndReset() 메서드를 호출한다.)


예를 들어, 0-10초 사이에 countMetric.incr()을 4번 호출했고, 10-20초 사이에 countMetric.incr()을 5번 호출했다고 하자. 이 경우 스톰이 읽어가는 메트릭 값은 10초에 4이고 20초에 5가 된다.


IMetricsConsumer로 메트릭 수집하기


스톰은 메트릭 객체에서 값을 읽어와 그 값을 IMetricsConsumer에 전달한다. 즉, 스톰 토폴로지가 발생한 메트릭을 알맞게 처리하고 싶다면 IMetricsConsumer를 구현한 클래스를 작성하면 된다.


IMetricsConsumer 인터페이스는 다음과 같이 정의되어 있다.


public interface IMetricsConsumer {

    void prepare(Map stormConf, Object registrationArgument, 

            TopologyContext context, IErrorReporter errorReporter);

    void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints);

    void cleanup();

}


각 메서드는 다음과 같다.

  • prepare() : DB나 소켓 연결처럼 메트릭 처리에 필요한 사전 준비를 한다.
  • handleDataPoints() : 토폴로지에서 읽어온 메트릭을 처리한다.
  • cleanup() : DB나 소켓 등 종료 처리를 한다.

토폴로지 이름이 필요한 경우 prepare() 메서드에서 다음과 같은 방법으로 토폴로지 이름을 구할 수 있다.


import backtype.storm.Config;

import backtype.storm.metric.api.IMetricsConsumer;

import backtype.storm.task.IErrorReporter;

import backtype.storm.task.TopologyContext;


public class MetricsCollector implements IMetricsConsumer {

    public static final Logger LOG = LoggerFactory.getLogger(MetricsCollector.class);;


    private String topologyName;


    private DwmonMetricSender metricSender;


    @Override

    public void prepare(Map stormConf, Object registrationArgument, 

            TopologyContext context, IErrorReporter errorReporter) {

        LOG.info("Preparing Metric Collector");


        topologyName = (String) stormConf.get(Config.TOPOLOGY_NAME);

    }


handleDataPoints()에서 메트릭 값 처리하기


스톰은 토폴로지가 등록한 메트릭에서 주기적으로 값을 읽어와 IMetricsConsumer#handleDataPoints() 메서드에서 전달하므로, 이 메서드에서 메트릭을 알맞게 처리하면 된다.


public class MetricsCollector implements IMetricsConsumer {


    @Override

    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {

        // taskInfo.srcWorkerHost: String - 워커 호스트

        // taskInfo.srcWorkerPort: int - 워커 포트 

        // taskInfo.srcComponentId: String - 컴포넌트 ID

        // taskInfo.srcTaskId: int - 태스크 ID

        // taskInfo.timestamp: long - 메트릭 수집 시간(long, 단위 초)

        // taskInfo.updateIntervalSecs: long - 메트릭 수집 주기(단위 초)


        for (DataPoint dp : dataPoints) {

            // dp.name: String - 메트릭 이름

            // dp.value: Object - 메트릭 값, IMetric 구현마다 값 타입이 다름

            // 

        }

    }



IMetric 타입별로 값 타입이 다른데, 주요 구현체의 값 타입은 다음과 같다.

  • CountMetric - Long
  • MultiCountMetric - Map<String, Long>

스톰은 토폴로지에서 직접 발생하는 메트릭뿐만 아니라 스톰이 토폴로지와 관련해서 발생시키는 메트릭 값도 IMectirsConcumer에 전달한다. 따라서 IMectirsConcumer는 처리하려는 메트릭을 알맞게 걸러내야 한다. 스톰 자체 메트릭인 경우 메트릭 이름이 "__", "GC/", "memory/"와 같은 값으로 시작하므로, 토폴로지에서 직접 생성하는 메트릭만 처리하고 싶다면 이 이름을 가진 메트릭을 제외한 나머지 메트릭만 처리하면 된다.


스톰 설정에서 IMetricsConsumer 구현 클래스 등록하기


이제 남은 것은 스톰이 토폴로지가 발생한 메트릭을 IMetricsConsumer에 전달하도록 설정하는 것이다. 다음의 두 가지 방법 중 하나를 이용해서 설정할 수 있다.

  • storm.yaml 파일에 설정하기
  • 토폴로지 코드에서 설정하기
storm.yaml 파일에 설정하려면, 다음과 같이 topology.metrics.consumer.register에 구현 클래스를 지정하면 된다.

topology.metrics.consumer.register:
  - class: "com.mycomp.storm.metrics.MetricsCollector"
    parallelism.hint: 1

토폴로지 코드에서 직접 지정하고 싶다면 다음 코드처럼 Config#registerMetricsConsumer()를 이용해서 지정하면 된다.


public static void main(String[] args) throws Exception {

    TopologyBuilder builder = new TopologyBuilder();


    builder.setSpout("word", new TestWordSpout(), 4);

    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");

    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");


    Config conf = new Config();

    conf.registerMetricsConsumer(MetricsCollector.class, 1);

    conf.setDebug(true);

    if (args != null && args.length > 0) {

        conf.setNumWorkers(6);

        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());

    } else {

        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology("test", conf, builder.createTopology());

        Utils.sleep(60000);

        cluster.killTopology("test");

        cluster.shutdown();

    }

}



+ Recent posts