스톰은 토폴로지가 메트릭을 만들 때 필요한 API를 제공하고 있다. 이 API를 사용하면 내가 만든 토폴로지에서 원하는 메트릭을 발생하고 이를 쉽게 처리할 수 있다.
IMetric으로 토폴로지에서 메트릭 발생하기
토폴로지에서 메트릭을 발생시키는 방법은 다음과 같이 간단하다.
- 메트릭을 발생시키고 싶은 Bolt의 prepare() 메서드 또는 Spout의 open() 메서드에서 IMetric을 구현한 객체를 생성하고 메트릭으로 등록한다.
- 1에서 생성한 IMetric 객체를 이용해서 필요할 때 메트릭 값을 조정한다.
다음은 스톰 스타터 샘플에 있는 Bold에 메트릭 생성 기능을 추가한 예이다.
public class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
transient CountMetric countMetric;
transient MultiCountMetric wordCountMetric;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
// IMetric을 구현한 객체를 생성하고
countMetric = new CountMetric();
wordCountMetric = new MultiCountMetric();
// context에 메트릭을 등록
context.registerMetric("word_count", wordCountMetric, 10);
context.registerMetric("execute_count", countMetric, 20);
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
String word = tuple.getString(0);
// 튜플 처리할 때 마다 원하는 메트릭 값을 조정
countMetric.incr();
wordCountMetric.scope(word).incr();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
이 코드에서 context.registerMetric() 메서드의 파라미터는 순서대로 메트릭 이름, 메트릭 객체, 메트릭 버킷 크기를 의미한다. 메트릭 버킷 크기는 초 단위이다. 위 코드에서 "word_count" 메트릭 이름은 10초 메트릭 버킷 크기를 설정했는데, 이 경우 스톰은 10초 마다 해당 메트릭 객체로부터 값을 읽어오고 메트릭을 리셋한다.(정확하게는 10초 마다 메트릭 객체의 getValueAndReset() 메서드를 호출한다.)
예를 들어, 0-10초 사이에 countMetric.incr()을 4번 호출했고, 10-20초 사이에 countMetric.incr()을 5번 호출했다고 하자. 이 경우 스톰이 읽어가는 메트릭 값은 10초에 4이고 20초에 5가 된다.
IMetricsConsumer로 메트릭 수집하기
스톰은 메트릭 객체에서 값을 읽어와 그 값을 IMetricsConsumer에 전달한다. 즉, 스톰 토폴로지가 발생한 메트릭을 알맞게 처리하고 싶다면 IMetricsConsumer를 구현한 클래스를 작성하면 된다.
IMetricsConsumer 인터페이스는 다음과 같이 정의되어 있다.
public interface IMetricsConsumer {
void prepare(Map stormConf, Object registrationArgument,
TopologyContext context, IErrorReporter errorReporter);
void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
void cleanup();
}
각 메서드는 다음과 같다.
- prepare() : DB나 소켓 연결처럼 메트릭 처리에 필요한 사전 준비를 한다.
- handleDataPoints() : 토폴로지에서 읽어온 메트릭을 처리한다.
- cleanup() : DB나 소켓 등 종료 처리를 한다.
토폴로지 이름이 필요한 경우 prepare() 메서드에서 다음과 같은 방법으로 토폴로지 이름을 구할 수 있다.
import backtype.storm.Config;
import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;
public class MetricsCollector implements IMetricsConsumer {
public static final Logger LOG = LoggerFactory.getLogger(MetricsCollector.class);;
private String topologyName;
private DwmonMetricSender metricSender;
@Override
public void prepare(Map stormConf, Object registrationArgument,
TopologyContext context, IErrorReporter errorReporter) {
LOG.info("Preparing Metric Collector");
topologyName = (String) stormConf.get(Config.TOPOLOGY_NAME);
}
handleDataPoints()에서 메트릭 값 처리하기
스톰은 토폴로지가 등록한 메트릭에서 주기적으로 값을 읽어와 IMetricsConsumer#handleDataPoints() 메서드에서 전달하므로, 이 메서드에서 메트릭을 알맞게 처리하면 된다.
public class MetricsCollector implements IMetricsConsumer {
@Override
public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
// taskInfo.srcWorkerHost: String - 워커 호스트
// taskInfo.srcWorkerPort: int - 워커 포트
// taskInfo.srcComponentId: String - 컴포넌트 ID
// taskInfo.srcTaskId: int - 태스크 ID
// taskInfo.timestamp: long - 메트릭 수집 시간(long, 단위 초)
// taskInfo.updateIntervalSecs: long - 메트릭 수집 주기(단위 초)
for (DataPoint dp : dataPoints) {
// dp.name: String - 메트릭 이름
// dp.value: Object - 메트릭 값, IMetric 구현마다 값 타입이 다름
//
}
}
IMetric 타입별로 값 타입이 다른데, 주요 구현체의 값 타입은 다음과 같다.
- CountMetric - Long
- MultiCountMetric - Map<String, Long>
스톰은 토폴로지에서 직접 발생하는 메트릭뿐만 아니라 스톰이 토폴로지와 관련해서 발생시키는 메트릭 값도 IMectirsConcumer에 전달한다. 따라서 IMectirsConcumer는 처리하려는 메트릭을 알맞게 걸러내야 한다. 스톰 자체 메트릭인 경우 메트릭 이름이 "__", "GC/", "memory/"와 같은 값으로 시작하므로, 토폴로지에서 직접 생성하는 메트릭만 처리하고 싶다면 이 이름을 가진 메트릭을 제외한 나머지 메트릭만 처리하면 된다.
스톰 설정에서 IMetricsConsumer 구현 클래스 등록하기
이제 남은 것은 스톰이 토폴로지가 발생한 메트릭을 IMetricsConsumer에 전달하도록 설정하는 것이다. 다음의 두 가지 방법 중 하나를 이용해서 설정할 수 있다.
- storm.yaml 파일에 설정하기
- 토폴로지 코드에서 설정하기
토폴로지 코드에서 직접 지정하고 싶다면 다음 코드처럼 Config#registerMetricsConsumer()를 이용해서 지정하면 된다.
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new TestWordSpout(), 4);
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
Config conf = new Config();
conf.registerMetricsConsumer(MetricsCollector.class, 1);
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(6);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(60000);
cluster.killTopology("test");
cluster.shutdown();
}
}