Flume에 대한 커스텀 메트릭을 구현할 일이 생겨 그 내용을 정리한다.
구조
Flume의 메트릭을 가져와 처리하려면 Flume이 제공하는 MonitorService 인터페이스를 상속받아 구현하면 된다.
MonitorService 구현하기
MonitorService를 상속받은 클래스는 다음과 같이 세 개의 메서드만 알맞게 구현하면 된다.
- configure() : 초기화한다.
- start() : 메트릭을 처리하는 작업을 별도 쓰레드로 시작한다.
- stop() : 메트릭 처리 작업을 중지한다.
뼈대 코드는 다음과 같다.
public class CustomMonitorService implements MonitorService {
@Override
public void configure(Context context) {
// 초기화
}
@Override
public void start() {
// 별도 쓰레드를 이용해서 MetricReporter 실행
}
@Override
public void stop() {
// MetricReporter 실행 중지
}
}
configure() 구현
configure() 메서드는 필요한 설정 정보를 읽어와 초기화 작업을 수행한다. 아래 코드는 구현 예시이다.
public class CustomMonitorService implements MonitorService {
private int intervalSec;
private String hosts;
@Override
public void configure(Context context) {
// "flume.monitoring.pollInterval" 시스템 프로퍼티 값을 읽음. 없을 경우 기본 값으로 60 사용
this.intervalSec = context.getInteger("pollInterval", 60);
this.hosts = System.getProperty("custom.collector.hosts");
}
Context는 "flume.monitoring."로 시작하는 시스템 프로퍼티의 값을 읽어오는 기능을 제공한다. 위 코드에서 context.getInteger("pollInterval")은 flume.monitoring.pollInterval 시스템 프로퍼티 값을 읽어와 int 타입으로 변환한다. 해당 프로퍼티가 존재하지 않으면 60을 리턴한다.
사용할 시스템 프로퍼티가 "flume.monitoring."로 시작하지 않으면 System.getProperty() 메서드나 기타 다른 방법을 사용해서 필요한 설정 값을 직접 읽어온다.
start()와 stop() 구현
Flume은 configure() 메서드로 초기화를 한 뒤 start() 메서드를 실행한다. start() 메서드는 별도 쓰레드로 리포팅 기능을 실행하도록 구현하면 된다. 주기적으로 리포팅 기능을 실행할 경우 ExecutorService를 사용하면 편리하다.
public class CustomMonitorService implements MonitorService {
private ScheduledExecutorService scheduledExecutorService;
private int pollInterval;
private String hosts;
private MetricReporter metricReporter; // Runnable 구현
@Override
public void start() {
if (scheduledExecutorService == null || scheduledExecutorService.isShutdown() ||
scheduledExecutorService.isTerminated()) {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
}
metricReporter = new MetricReporter(hosts);
scheduledExecutorService.scheduleWithFixedDelay(
metricReporter,
pollingIntervalSec,
pollingIntervalSec,
TimeUnit.SECONDS);
}
@Override
public void stop() {
scheduledExecutorService.shutdown();
}
...
주기적으로 메트릭 리포터 기능을 실행하기 위해 ScheduledExecutorService를 사용했다. start() 메서드에서 ExecutorService를 생성하고 scheduleWithFixedDelay() 메서드로 지정한 주기로 리포팅 기능을 실행하도록 했다.
MetricReporter 구현
MetricReporter는 ExecutorService를 이용해서 실행할 것이므로 Runnable을 구현한다. run() 메서드에서 메트릭을 읽어와 알맞은 처리를 하면 된다. 다음은 간단한 구현 예제이다.
import org.apache.flume.instrumentation.util.JMXPollUtil;
public class MetricReporter implements Runnable {
private final String hosts;
public FlumemonReporter(String hosts) {
this.hosts = hosts;
}
@Override
public void run() {
try {
List<String> metrics = new ArrayList<>();
Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
for (String component : metricsMap.keySet()) {
Map<String, String> attributeMap = metricsMap.get(component);
if (component.startsWith("CHANNEL.")) {
String name = "EventPutAttemptCount";
String value = attributeMap.get(name);
metrics.add(component + ", " + name + "=" + value);
}
}
metrics.forEach(System.out::println);
} catch (Exception e) {
}
}
}
JMXPollUtil.getAllMBeans()를 사용하면 flume이 제공하는 메트릭을 Map 형태로 구할 수 있다. 이 Map의 키는 컴포넌트 이름에 해당하는데, flume의 SOURCE, CHANNEL, SINK가 컴포넌트가 된다. 컴포넌트 이름은 "타입.이름"의 형태를 취한다.
metricsMap.get()은 개별 컴포넌트의 메트릭 목록을 맵 형태로 다시 구한다. 각 컴포넌트별로 제공하는 메트릭 목록이 다르다. 예를 들어, 채널의 경우 "EventPutAttemptCount", "EventPutSuccessCount"와 같은 메트릭을 제공한다. 각 메트릭 값은 String 타입으로 읽어오기 때문에, 필요시 Long이나 Integer와 같은 타입으로 알맞게 변환해주어야 한다.
Flume 실행시 MonitorService 지정하기
MonitorService를 구현했다면, 이제 남은 작업은 Flume을 실행할 때 구현한 클래스를 MonitorService로 사용하도록 지정하는 것이다. 다음과 같이 flume.monitoring.type 시스템 프로퍼터에 구현 클래스를 지정하면 된다.
bin/flume-ng agent --conf-file example.conf --name a1 \
-Dflume.monitoring.type=com.mycomp.MetricReporter -Dflume.monitoring.hosts=somehost
이 외에 구현 클래스에서 필요로 하는 시스템 프로퍼티를 -D 옵션으로 추가 지정한다. 물론, 구현한 클래스가 flume이 사용하는 클래스패스에 위치해야 한다.