주요글: 도커 시작하기
반응형

Kafka의 메트릭 리포트 부분을 확장할 일이 있어 메트릭 수집 관련 부분을 조사했는데, 그 내용을 정리한다.


구조


Kafka의 메트릭 리포팅과 관련된 클래스 구조는 아래 그림과 같다.



메트릭 리포팅 관련 타입은 크게 네 종류로 다음과 같다.

  • 카프카의 기반 타입
    • KafkaMetricsReporterMBean : 카프카 메트릭 리포터 MBean을 위한 기반 타입이다. 리포터의 시작/종료를 JMX로 조작하고 싶은 경우 이 타입을 상속받은 MBean 인터페이스를 만든다.
    • KafkaMetricsReporter : Kafka가 메트릭 리포터로 사용할 클래스가 구현할 타입이다.
  • Metrics의 기반 타입
    • AbstractReporter / AbstractPollingReporter : Metrics의 메트릭을 리포팅 할 리포터가 상속받을 클래스이다. 주기적으로 메트릭 값을 처리하려면 AbstractPollingReporter를 상속받아 구현한다.
    • MetricProcessor : Metrics의 메트리 종류별로 값을 받아 처리할 처리기가 구현해야 할 인터페이스이다.

Metrics는 메트릭 생성/관리를 위한 도구로 http://metrics.dropwizard.io/3.1.0/ 사이트를 참조한다.


Kafka로부터 메트릭을 받아 원하는 동작을 하고 싶다면 위 클래스 다이어그램에서 노란 색으로 표시한 타입을 알맞게 구현하면 된다. 그림에서 보는 것처럼 Kafka의 리포팅 관련 상위 타입을 구현한 타입 두 개와 Metrics와 관련된 타입 두 개를 구현하면 된다.


Kafka 리포터 구현


구현할 타입은 다음과 같다.

  • KafkaMetricsReporterMBean 인터페이스를 상속받은 CustomMetricsReporterMBean 인터페이스
  • KafkaMetricsReporter 인터페이스와 CustomMetricsReporterMBean 인터페이스를 상속받은 클래스
먼저 CustomReporterMBean 인터페이스는 다음과 같이 간단하다.

import kafka.metrics.KafkaMetricsReporterMBean;

public interface CustomMetricsReporterMBean extends KafkaMetricsReporterMBean {
}

MBean으로 노출하고 싶은 오퍼레이션이 추가로 있다면 CustomMetricsReporterMBean에 추가한다.


다음으로 KafkaMetricsReporter 인터페이스와 CustomMetricsReporterMBean 인터페이스를 상속받은 CustomMetricsReporterMBean 클래스를 구현한다. 이 클래스의 뼈대 코드는 다음과 같다.


import kafka.metrics.KafkaMetricsReporter;

import kafka.utils.VerifiableProperties;


public class CustomMetricsReporter implements KafkaMetricsReporter, CustomMetricsReporterMBean {


    private VerifiableProperties props;

    private CustomReporter underlying;

    ...기타 필드


    @Override

    public void init(VerifiableProperties props) {

        // 리포터를 초기화하고 리포팅을 시작한다.

    }


    @Override

    public void startReporter(long pollingPeriodSecs) {

        // underlying 리포팅 시작

    }


    @Override

    public void stopReporter() {

        // underlying 리포팅 종료

    }


    @Override

    public String getMBeanName() {

        return "kafka:type=com.mycomp.kafka.CustomMetricsReporter";

    }

}


init() 메서드 구현


init() 메서드는 Metrics의 리포터를 초기화하고 리포팅을 시작한다. 구현 코드는 다음과 같은 형태를 취한다.


import com.yammer.metrics.Metrics;

import kafka.metrics.KafkaMetricsConfig;

import kafka.metrics.KafkaMetricsReporter;

import kafka.utils.VerifiableProperties;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;


import java.net.InetAddress;

import java.net.UnknownHostException;

import java.util.concurrent.TimeUnit;


public class CustomMetricsReporter 

        implements KafkaMetricsReporter, CustomMetricsReporterMBean {


    private VerifiableProperties props; // 설정 파일의 프로퍼티 보관

    private CustomReporter underlying; // Metrics 리포터


    private final Object lock = new Object();

    private boolean initialized = false;

    ...기타 필드


    @Override

    public void init(VerifiableProperties props) {

        // props는 Kafka 설정 파일의 프로퍼티 값을 갖는다.

        synchronized (lock) {

            this.props = props;

            if (!initialized) {

                initializeReporter();

                initialized = true;

                KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(props);

                // metricsConfig.pollingIntervalSecs() --> kafka.metrics.polling.interval.secs 설정 값

                startReporter(metricsConfig.pollingIntervalSecs());

            }

        }

    }


    // Metrics 리포터 초기화

    private void initializeReporter() {

        // 리포터 초기화에 필요한 프로퍼티 값 사용

        String someProperty = props.getString("some.property", "defaultValue");

        underlying = new CustomReporter(Metrics.defaultRegistry(), someProperty);

    }


    @Override

    public void startReporter(long pollingPeriodSecs) {

        // underlying 리포팅 시작

    }


startReporter() 구현


startReporter() 메서드는 Metrics의 리포터인 underying의 start() 메서드를 실행한다. 이 메서드는 메트릭 리포팅을 주기적으로 실행한다.


import java.util.concurrent.TimeUnit;


public class CustomMetricsReporter implements KafkaMetricsReporter, CustomMetricsReporterMBean {


    private VerifiableProperties props;

    private CustomReporter underlying;


    private final Object lock = new Object();

    private boolean initialized = false;

    private boolean running = false;


    @Override

    public void startReporter(long pollingPeriodSecs) {

        // underlying 리포팅 시작

        synchronized (lock) {

            if (initialized && !running) {

                underlying.start(pollingPeriodSecs, TimeUnit.SECONDS);

                running = true;

            }

        }

    }



stopReporter() 구현


public class CustomMetricsReporter implements KafkaMetricsReporter, CustomMetricsReporterMBean {


    private VerifiableProperties props;

    private CustomReporter underlying;


    private final Object lock = new Object();

    private boolean initialized = false;

    private boolean running = false;

    ...기타 필드


    @Override

    public void stopReporter() {

        // underlying 리포팅 종료

        synchronized (lock) {

            if (initialized && running) {

                underlying.shutdown();

                running = false;

                initializeReporter();

            }

        }

    }


stopReporter()는 Metrics 리포터의 shutdown()을 실행해서 메트릭 리포팅을 중지한다. 위 코드에서 리포팅 중지후에 initializeReporter()를 다시 실행하는데, 그 이유는 startReporter()를 다시 실행할 때 사용할 리포터를 미리 초기화하기 위함이다.


Metrics 리포터 구현


앞서 Kafka의 리포터 구현에서 봤듯이 Kafka 리포터는 Metrics 리포터를 시작하고 중지하는 역할을 맡는다. 실제로 메트릭을 리포팅하는 역할은 Metrics 리포터가 처리한다.


Metrics 리포터를 위한 코드 뼈대는 다음과 같다.


import com.yammer.metrics.core.*;

import com.yammer.metrics.reporting.AbstractPollingReporter;


import java.util.concurrent.TimeUnit;


public class CustomReporter

        extends AbstractPollingReporter

        implements MetricProcessor<CustomReporter.Context> {


    ...// 필드


    public CustomReporter(MetricsRegistry metricsRegistry, String someValue) {

        super(metricsRegistry, "custom-reporter");

        this.someValue = someValue;

    }


    @Override

    public void start(long pollingPeriodSecs, TimeUnit unit) {

        super.start(pollingPeriodSecs, unit);

    }


    @Override

    public void shutdown() {

        try {

            super.shutdown();

        } finally {

        }

    }


    @Override

    public void run() {

        // 리포팅 코드 위치

    }


    @Override

    public void processMeter(MetricName name, Metered meter, Context context) throws Exception {

    }


    @Override

    public void processCounter(MetricName name, Counter counter, Context context) throws Exception {

    }


    @Override

    public void processHistogram(MetricName name, Histogram histogram, Context context) throws Exception {

    }


    @Override

    public void processTimer(MetricName name, Timer timer, Context context) throws Exception {

    }


    @Override

    public void processGauge(MetricName name, Gauge<?> gauge, Context context) throws Exception {

    }


    public class Context {

    }

}


start() 메서드와 shutdown() 메서드는 상위 클래스의 start()와 shutdown() 메서드를 호출한다. AbstractPollingReporter의 start() 메서드는 자체 Executor를 이용해서 리포팅을 실행한다.


run() 메서드 구현


run() 메서드에서 메트릭을 읽어와 원하는 리포팅 작업을 수행한다.


import com.yammer.metrics.core.*;


public class CustomReporter

        extends AbstractPollingReporter

        implements MetricProcessor<CustomReporter.Context> {


    ...


    @Override

    public void run() {

        // 리포트 시작


        //  모든 메트릭을 구한다.

        final Set<Map.Entry<MetricName, Metric>> metricEntries =

                getMetricsRegistry().allMetrics().entrySet();


        Context context = new Context(); // 메트릭 값을 담을 컨텍스트 생성

        for (Map.Entry<MetricName, Metric> entry : metricEntries) {

            final MetricName metricName = entry.getKey();

            final Metric metric = entry.getValue();

            try {

                // 각 메트릭을 처리한다. processWith의 첫 번째 인자의 타입은 MetricProcessor이다.

                // 이 클래스가 MetricProcessor를 구현했으므로 this를 첫 번째 인자로 전달한다.

                metric.processWith(this, metricName, context);

            } catch (Exception e) {

            }

        }

        ... context의 결과 이용해서 리포팅

        

        // 리포트 종료

    }


    // process 메서드에서 메트릭 타입별로 처리

    ...


MetricName과 Metric은 각각 메트릭의 이름과 값을 갖는다. metric.processWith() 메서드를 메트릭 타입에 따라서 알맞은 process 메서드를 실행하는데, 이 process 메서드를 이용해서 메트릭을 context에 수집한다.


Context와 process 메서드 구현


Context는 주로 필요한 메트릭의 결과를 수집하는 역할을 한다. 간단한 Context 클래스를 다음과 같이 구현할 수 있다. 앞서 뼈대 코드에서는 Context 클래스를 CustomReporter의 내부 클래스로 선언했다.


public class Context {

    public List<String> metrics = new LinkedList<>();

}


각 메트릭을 처리하는 process 메서드는 MetricProcessor 인터페이스에 정의되어 있는데, CustomReporter 클래스가 이 인터페이스를 구현했는데 각 process 메서드에서 알맞게 메트릭을 꺼내 Context에 보관한다.


public class CustomReporter

        extends AbstractPollingReporter

        implements MetricProcessor<CustomReporter.Context> {

    ...


    @Override

    public void processMeter(MetricName name, Metered meter, Context context) throws Exception {

        String metric = name.getName() + " = " + meter.count();

        context.metrics.add(metric);

    }


    @Override

    public void processCounter(MetricName name, Counter counter, Context context) throws Exception {

        ...

    }

    ... processHistogram, processTimer, processGauge 메서드 구현


process 메서드는 Context에 필요한 정보를 저장한다. 다시 run() 메서드로 돌아가서 각 메트릭을 처리한 뒤 다음 코드처럼 context에 보관된 메트릭 처리 결과를 알맞은 형식으로 리포팅하면 메트릭 리포팅이 완료된다.


    @Override

    public void run() {

        // 리포트 시작

        final Set<Map.Entry<MetricName, Metric>> metricEntries = ...;


        Context context = new Context();

        for (Map.Entry<MetricName, Metric> entry : metricEntries) {

            ...생략

        }

        // context에서 처리된 메트릭 값 읽어와 리포팅

        for (String metric : context.metrics) {

            System.out.println(metric);

        }

        // 리포트 종료

    }


이 코드에서는 간단히 System.out을 사용했지만, 메트릭 수집 결과를 DB에 보관한다거나 메트릭 저장소에 전송하는 등의 코드를 알맞게 구현하면 된다.


카프카 브로커 설정


구현한 메트릭 리포터를 사용하려면 카프카 설정에 kafka.metric.reporters 프로퍼티에 리포터 Kafka 리포터 클래스를 지정하면 된다.


kafka.metrics.reporters=com.mycom.kafka.CustomMetricsReporter

log.dirs=/data1/kafka

zookeeper.connect=zk1:2181,zk2:2181,zk3:2181


설정을 적용했다면 Kafka를 시작해 보자. 커스텀 메트릭 리포터가 동작하는 것을 확인할 수 있을 것이다.


+ Recent posts