주요글: 도커 시작하기
반응형

https://youtu.be/bBFi48Azvks

마리아DB 10.3, 10.4, 10.5에 있는 파티션 테이블과 관련된 치명적인 버그를 살펴봅니다.

반응형

kafka 아는 척하기1 영상: (카프카 기본 구조, 토픽/파티션, 성능, 리플리카 등, youtu.be/0Ssx7jJJADI)

kafka 아는 척하기 1

 

kafka 아는 척하기 2 영상: 프로듀서 편 (youtu.be/geMtm17ofPY)

 

 

kafka 아는 척하기 3 영상: 컨슈머 편 (youtu.be/xqrIDHbGjOY)

 

 

 

반응형

MariaDB에서 grant usage ... with max_statement_time 명령어를 사용하면 사용자별로 최대 쿼리 실행 시간을 지정할 수 있다. 다음은 명령어 실행 예를 보여준다. 이때 시간은 초 단위다(MySQL도 동일한 유사한 기능이 있는데 시간 단위는 밀리초이다).

GRANT USAGE ON *.* TO batchuser@'%' WITH MAX_STATEMENT_TIME 60

사용자별로 지정한 최대 쿼리 실행 시간은 글로벌 시간에 우선한다.

반응형

logs 테이블의 id 칼럼이 자동 증가 칼럼이라고 하자. 최근 id 칼럼 값이 4라고 할때 이 칼럼에 P1, P2, P3과 아래 그림과 같은 순서로 insert 쿼리를 실행한다고 하자.

P1이 세 번의 insert 쿼리를 실행하면 id 값은 5, 6, 7이 된다. P1의 트랜잭션이 끝나지 않은 상태에서 P2, P3가 각각 insert 쿼리를 실행하면 ID는 8, 9가 된다. P2와 P3는 트랜잭션이 끝나고 P1이 아직 트랜잭션 진행 중인 상태에서 P4가 id를 조회하면 [8, 9]를 리턴한다.

일정 간격으로 데이터를 복사할 경우 이 상황은 문제가 될 수 있다. 예를 들어 다음 로직을 사용해서 최신 데이터를 처리하는 로직이 있다고 하자.

  1. 이전에 처리한 최대 ID를 구해 lastProcessedId에 할당한다.
  2. 현재의 최대 ID를 구해 maxId에 할당한다.
  3. lastProcessedId보다 크고 maxId보다 작거나 같은 ID의 값 목록을 구해 ids에 할당한다.
  4. ids에 속한 데이터를 처리한다.

만약 lastProcessedId가 4이고 P4의 조회 시점에 maxId는 9라고 하자. 이 경우 과정 3에서 구하는 ids에는 5, 6, 7은 없고 8, 9만 담긴다. 과정 4를 처리한 뒤 다시 과정 1을 반복하면 이때 lastProcessedId는 9가 되어 5, 6, 7은 처리 대상에서 누락되는 문제가 발생한다.

이런 누락 문제가 발생하지 않도록 하려면 트랜잭션 격리 레벨을 높이거나 데이터 조회 시점과 최대 ID가 증가하는 시점에 차이를 둬야 한다. CDC(Change Data Capture)를 사용하는 방법도 있다.

  1. 애니멀봐 다시보기 2020.07.21 10:57

    잘 보고 갑니다~~

반응형

텐서플로우 윈도우 버전이 나와서 윈도우 8.1 환경에 설치하고 테스트를 해 봤다.


설치 과정에서 참고한 자료는 다음과 같다.

CUDA, cuDNN 설치

GPU 관련 설치 과정을 요약하면 다음과 같다.
  • https://developer.nvidia.com/cuda-downloads 사이트에서 CUDA Toolkit 8 버전 다운받아 설치한다. 용량이 1G를 넘으니 인내심이 필요하다. 기본 설치 경로는 C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v8.0이고, 설치가 끝나면 PATH에 관련 경로가 추가된다.
  • https://developer.nvidia.com/cudnn 사이트에서 cuDNN 다운받는다. 회원 가입을 해야 다운로드 받을 수 있다.  다운 받은 파일을 C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v8.0에 압축풀면 된다.
아나콘다(Anaconda)와 텐서플로우(Tensorflow) 설치

아나콘다를 이용해서 파이선 3.5를 설치했다. https://www.continuum.io/downloads 사이트에서 아나콘다를 다운 받아 설치하면 된다.

GPU 버전 텐서플로우와 CPU 버전의 텐서플로우를 테스트하기 위해 두 개의 가상 환경을 만들었다. 먼저 GPU 버전 텐서플로우를 위한 환경을 만들고 설치에 사용한 명령어는 다음과 같다.

C:\>conda create -n tensorflow-gpu python=3.5
C:\>activate tensorflow-gpu
(tensorflow-gpu) C:\>pip install tensorflow-gpu

다른 창을 열어 CPU 버전 텐서플로우 환경을 생성했다.

C:\>conda create -n tensorflow-cpu python=3.5
C:\>activate tensorflow-cpu
(tensorflow-cpu) C:\>pip install tensorflow

간단한 성능 확인

간단하게 성능을 비교하기 위해 '텐서플로 첫걸음' 책의 CNN 예제를 돌려봤다. 참고로 PC 환경은 다음과 같다. 장비1은 GPU가 있고, 장비2에는 GPU가 없다.
  • 장비1
    • CPU: Intel Core i7-4712MQ CPU @ 2.30 GHz
    • RAM: 16G
    • GPU: GeForce 840M 5.0, memoryClockRate (GHz) 1.124, memory 2.0GiB
  • 장비2
    • CPU: Intel Core i7-6700 CPU @ 3.40 GHz x 8
학습을 1000회 실행하는데, 50회마다 시간을 출력하고 학습이 끝나면 시간을 출력했다. GPU와 CPU에 대해 각 3회씩 실행하고 그 평균을 기록했다.

step GPU (초)
장비1
CPU (초)
장비1

장비1 CPU/GPU (시간 비율)

 CPU2

장비2

0 1.119 0.056

 0.028

50

3.004

7.751 2.58 3.646 
100 4.817 15.417 3.20 7.282 
150 6.632 23.008 3.47 10.922 
200 8.445 30.666 3.63 14.557 
250 10.261 38.283 3.73 18.212 
300 12.077 45.845 3.80 21.833 
350 13.891 53.471 3.85 25.491 
400 15.704 61.094 3.89 29.110 
450 17.518 68.672 3.92 32.755 
500 19.333 76.37 3.95

36.380 

550 21.15 83.973 3.97 40.023 
600 22.964 91.584 3.99 43.640 
650 24.777 99.26 4.01 47.275 
700 26.593 106.839 4.02 50.911 
750 28.409 114.4 4.03 54.559 
800 30.222 122.071 4.04 58.191 
850 32.036 129.627 4.05 61.825 
900 33.852 137.188 4.05 65.461 
950 35.666 144.953 4.06 69.089 
전체 37.471 152.721 4.08

72.702 


실행 결과를 보면 장비1에서 GPU를 사용한 결과가 CPU를 사용한 결과보다 4배 정도 빠른 것을 알 수 있다. 또한, 장비2의 CPU 결과보다도 2배 정도 빠른 것을 알 수 있다.


GPU와 OOM 주의 사항


GPU로 예제를 돌릴 때 다음 코드에서 OOM when allocating tensor with shape[10000,32,28,28]  에러가 발생했다.


print("test accuracy %g"% sess.run(accuracy, feed_dict={

     x: mnist.test.images, y_: mnist.test.labels, keep_prob: 1.0}))


에러가 발생하는 이유는 GPU 메모리가 부족하기 때문이다. 이를 처리하는 가장 쉬운 방법은 mnist.test.images 전체가 아닌 일부만 구해서 정확도를 구하는 것이다.


for i in range(200):

    testSet = mnist.test.next_batch(50)

    print("test accuracy %g" % sess.run(accuracy, 

            feed_dict={ x: testSet[0], y_: testSet[1], keep_prob: 1.0}))



* CNN 테스트에 사용한 코드: https://github.com/madvirus/tfstudy/blob/master/nn/cnn.py



반응형

기존 access 로그를 이용해서 데이터를 추출해야 할 일이 생겨서 ELK를 사용했다. 앞으로도 비슷한 요구가 생기면 유사한 설정, 코드를 사용할 것 같아 기록으로 남긴다.


로그스태시(logstash) 설정 파일


여러 웹 어플리케이션의 access 로그 파일을 로그스태시를 이용해서 일레스틱서치에 밀어 넣었다. 각 웹 어플리케이션마다 미세하게 파싱해야 하는 내용이 달라, 로그스태시 설정 파일을 웹 어플리케이션 별로 하나씩 만들었다. 다음은 그 파일 중 하나이다.


input {

  stdin {

    type => "web"

    add_field => {

      appname => "myappname"

    }

  }

}


filter {

  grok {

    match=> { message => "%{IPORHOST:clientip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{URIPATH:uripath}(?<query>(\?\S+)?)(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response} (?:%{NUMBER:bytes}|-)"}

  }

  date {

    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]

  }

  grok {

    match => {

      uripath => "(/im/(?<context>(ui))/\S*)|(/gcs/(?<context>(\S+))/.*)"

    }

  }

  mutate {

    remove_field => ["query", "uripath"]

    add_field => {

      "methoduri" => "%{verb} %{uripath}"

    }

  }

  metrics {

    meter => "documents"

    add_tag => "metric"

    flush_interval => 60

  }

}


output {

  if "_grokparsefailure" not in [tags] {

    elasticsearch {

      hosts => "일레스틱서버주소"

      flush_size => 20000

    }

  }


  if "metric" in [tags] {

    stdout {

      codec => line {

        format => "1m rate: %{[documents][rate_1m]} ( %{[documents][count]} )"

      }

    }

  }

}


설정에서 특이한 점은 다음과 같다.
  • input의 stdin: 기존 로그 파일을 cat 명령어를 이용해서 logstash에 전달할거라서 stdin을 입력으로 설정했다.
  • filter의 첫 번째 grok: match의 message 패턴으로 로그스태시가 기본 제공하는 HTTPD_COMMONLOG 패턴을 사용하지 않은 이유는 요청 경로에서 쿼리 문자열과 요청 URI를 분리할 필요가 있었기 때문이다. 참고로 HTTPD_COMMONLOG 패턴은 쿼리문자열을 포함한 요청 경로를 request 필드로 추출한다.
  • filter의 date: 로그의 timestamp 필드를 이벤트의 @timestamp 값으로 사용한다. date를 사용하지 않으면 로그스태시가 로그 파일을 넣는 시점을 @timestamp 값으로 사용하기 때문에 기간별 로그 분석을 하기 어려워진다.
  • filter의 mutate: 요청 방식과 요청 URI를 합쳐서 하나로 분석해야 해서 이 둘을 연결한 값을 새로운 필드로 추가한다.
  • filter의 metrics: 로그스태시가 얼마나 처리하는지 보기 위해 메트릭 값을 60초마다 생성한다.
  • output의 elasticsearch: flush_size 값은 일레스틱서치 서버의 상황을 고려해 설정한다. 1000~5000 사이에서 시작해서 테스트하면서 점진적으로 늘려서 처리속도를 높여나간다.
  • output의 stdout: metrics가 생성한 메트릭 값을 콘솔에 출력한다. 초당 몇 개씩 처리하는지 1분 주기로 콘솔에 찍혀서 그나마 덜 심심하다.

로그 밀어넣기 위한 실행 스크립트


기존 로그 파일을 특정 폴더에 모아두고, 그 파일을 로그스태시로 일레스틱서치에 쭈욱 밀어 넣었다. 파일 하나하나 수동으로 하면 힘드니까 다음의 쉘 파일을 만들어서 실행했다.


#!/bin/bash

FILES="./로그폴더/*.log"

for f in $FILES

do

  echo "$f start"

  SECONDS=0

  cat $f | ../logstash/bin/logstash -b 2500 --config app1.logstash.conf

  duration=$SECONDS

  echo "$f done [$(($duration / 60))m $(($duration % 60))s elapsed]"

  echo "---"

  sleep 3

done


logstash의 -b 옵션은 프로세스 파이프라인 배치 크기이다. 파이프라인 워커 개수는 기본이 8이다. 워커개수*배치크기를 elasticsearch의 flush_size와 맞췄다.

테스트하는 동안 심심함도 달래고 성능도 확인할 겸 로그 파일 한 개를 처리하는데 걸리는 시간을 측정해서 값을 출력하도록 했다.


실행 결과


작성한 쉘 스크립트를 돌리면 콘솔에 다음과 같이 시간 값이 찍혀서 덜 심심하다.


$ ./logdump.app1.sh

...

./로그폴더/access_2016_04_06.log start

Settings: Default pipeline workers: 8

Pipeline main started

1m rate: 6336.785481508631 ( 407172 )

1m rate: 6937.548787233175 ( 856873 )

Pipeline main has been shutdown

stopping pipeline {:id=>"main"}

./misweb_in1_rest/access_2016_04_06.log done [2m 17s elapsed]

...

...


일단 1-2개 로그 파일로 작업해보고 퇴근 전에 nohup을 걸어서 나머지를 밀어 넣었다.


$ nohup ./logdump.app1.sh > dump.log 2>&1 &



반응형

요즘 스파크를 공부중인데, 독립 모드 클러스터를 설치하고 테스트하고 싶어졌다. 마음대로 쓸 수 있는 장비가 없어서 Vagrant를 이용하기로 했다. Vagrant로 리눅스 VM 4개를 생성하고, 4대에 스파크를 설치해서 독립 모드로 클러스터를 만들어봤다.


이 글은 Virtualbox와 Vagrant를 이미 설치했다는 가정하에 진행한다.


1. 준비


Vagrant 설정 파일이 위치할 폴더를 생성한다. 이 글에선 c:\work\spark-vagrant 폴더를 사용한다. 이 폴더를 만들었다면 다음의 두 파일을 c:\work\spark-vagrant\programs 폴더에 다운로드한다.

  • 리눅스용 JDK 1.8 - 이 글에서 다운로드 받은 파일은 jdk-8u73-linux-x64.gz이다.
  • 스파크 1.6.0 - 이 글에서 다운로드 받은 파일은 spark-1.6.0-bin-hadoop2.6.tgz이다.


2. 4개 VM을 위한 Vagrant 설정


Vagrant를 이용해서 VM을 생성할 때 필요한 파일을 만든다.


먼저 vagrant의 insecure_private_key 파일을 복사한다. 이 파일은 ssh 연결 용으로 사용한다.


copy c:\Users\로그인계정\.vagrant.d\insecure_private_key c:\work\spark-vagrant


다음으로 c:\work\spark-vagrant\hosts 파일을 생성한다.


127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4

::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.67.101 c6701.spark.apache.org c6701

192.168.67.102 c6702.spark.apache.org c6702

192.168.67.103 c6703.spark.apache.org c6703

192.168.67.104 c6704.spark.apache.org c6704

192.168.67.105 c6705.spark.apache.org c6705

192.168.67.106 c6706.spark.apache.org c6706

192.168.67.107 c6707.spark.apache.org c6707

192.168.67.108 c6708.spark.apache.org c6708

192.168.67.109 c6709.spark.apache.org c6709

192.168.67.110 c6710.spark.apache.org c6710


192.168.67.101부터 192.168.67.110까지 설정은 본인 C:\Windows\System32\drivers\etc\hosts 파일에도 동일하게 추가한다. 


c:\work\spark-vagrant\resolv.conf 파일을 다음과 같이 생성한다.


; generated by /sbin/dhclient-script

search spark.apache.org

nameserver 8.8.8.8


c:\work\spark-vagrant\bootstrap.sh 파일을 다음과 같이 생성한다.


#!/usr/bin/env bash


cp /vagrant/hosts /etc/hosts

cp /vagrant/resolv.conf /etc/resolv.conf

yum install ntp -y

service ntpd start

service iptables stop

chkconfig ntpd on

chkconfig iptables off

mkdir -p /root/.ssh; chmod 600 /root/.ssh; cp /home/vagrant/.ssh/authorized_keys /root/.ssh/

cp /vagrant/insecure_private_key /root/.ssh/id_rsa; chmod 600 /root/.ssh/id_rsa


#Again, stopping iptables

/etc/init.d/iptables stop


# Increasing swap space

sudo dd if=/dev/zero of=/swapfile bs=1024 count=3072k

sudo mkswap /swapfile

sudo swapon /swapfile

echo "/swapfile       none    swap    sw      0       0" >> /etc/fstab


마지막으로 c:\work\spark-vagrant\Vagrantfile 파일을 작성한다.


# -*- mode: ruby -*-

# vi: set ft=ruby :


# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!

VAGRANTFILE_API_VERSION = "2"


Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|

  config.vm.box = "centos6.7"

  config.ssh.insert_key = false


  # CentOS 6.7 x86_64, 아래는 한 줄임

  config.vm.box_url = "https://github.com/CommanderK5/packer-centos-template/releases/download/0.6.7/vagrant-centos-6.7.box"


  config.vm.provider :virtualbox do |vb|

    vb.customize ["modifyvm", :id, "--memory", 2048] # RAM allocated to each VM

  end


  config.vm.provision :shell, :path => "bootstrap.sh"


  config.vm.define :c6701 do |c6701|

    c6701.vm.hostname = "c6701.spark.apache.org"

    c6701.vm.network :private_network, ip: "192.168.67.101"

  end


  config.vm.define :c6702 do |c6702|

    c6702.vm.hostname = "c6702.spark.apache.org"

    c6702.vm.network :private_network, ip: "192.168.67.102"

  end


  config.vm.define :c6703 do |c6703|

    c6703.vm.hostname = "c6703.spark.apache.org"

    c6703.vm.network :private_network, ip: "192.168.67.103"

  end


  config.vm.define :c6704 do |c6704|

    c6704.vm.hostname = "c6704.spark.apache.org"

    c6704.vm.network :private_network, ip: "192.168.67.104"

  end


end



3. VM 실행


명령프롬프트를 실행하고 c:\work\spark-vagrant 폴더로 이동해서 vagrant 명령어로 VM을 실행한다.


C:\work\spark-vagrant>vagrant up


4개 VM을 모두 만들었다면 vagrant ssh 명령어로 각 VM에 ssh로 연결되는지 확인한다.


C:\work\spark-vagrant>vagrant ssh c6701

[vagrant@c6701 ~]$ exit

logout

Connection to 127.0.0.1 closed.


C:\work\spark-vagrant>vagrant ssh c6702

[vagrant@c6702 ~]$ exit

logout

Connection to 127.0.0.1 closed.


c6701, c6702, c6703, c6704에 대해 각각 확인한다.


4. 각 VM에 암호 없이 ssh 연결할 수 있는지 확인


앞서 작성한 bootstrap.sh 파일을 보면 insecure_private_key 파일을 root 계정의 인증키로 복사한 것을 알 수 있다. 이를 한 이유는 root 계정으로 c6701, c6702, c6703, c6704 간에 암호 없이 ssh로 연결하기 위함이다. 다음과 같이 su 명령어로 root 계정으로 바꾼 뒤, ssh를 이용해서 다른 호스트에 암호 없이 연결되는지 확인한다.


C:\work\spark-vagrant>vagrant ssh c6701

Last login: Tue Mar  8 05:48:27 2016 from 10.0.2.2

[vagrant@c6701 ~]$ su -

Password: (암호는 vagrant)

[root@c6701 ~]# ssh c6702

The authenticity of host 'c6702 (192.168.67.102)' can't be established.

RSA key fingerprint is 5c:97:4b:96:a2:41:a8:44:cc:70:b1:5e:8d:a7:a5:3b.

Are you sure you want to continue connecting (yes/no)? yes

Warning: Permanently added 'c6702,192.168.67.102' (RSA) to the list of known hosts.

[root@c6702 ~]#



5. 각 VM에 JDK와 스파크 설치


c:\work\spark-vagrant 폴더는 /vagrant로 마운트되므로, 다음 명령어를 다운로드 받은 JDK 파일의 압축을 /usr/local에 푼다.


[root@c6701 ~]# ssh c6701 'tar xzf /vagrant/programs/jdk-8u73-linux-x64.gz -C /usr/local'

[root@c6701 ~]# ssh c6702 'tar xzf /vagrant/programs/jdk-8u73-linux-x64.gz -C /usr/local'

[root@c6701 ~]# ssh c6703 'tar xzf /vagrant/programs/jdk-8u73-linux-x64.gz -C /usr/local'

[root@c6701 ~]# ssh c6704 'tar xzf /vagrant/programs/jdk-8u73-linux-x64.gz -C /usr/local'


압축을 풀었다면, java가 제대로 설치됐는지 확인한다.


[root@c6701 ~]# /usr/local/jdk1.8.0_73/bin/java -version

java version "1.8.0_73"

Java(TM) SE Runtime Environment (build 1.8.0_73-b02)

Java HotSpot(TM) 64-Bit Server VM (build 25.73-b02, mixed mode)


비슷한 방식으로 스파크 파일의 압축을 /usr/local에 푼다.


[root@c6701 ~]# ssh c6701 'tar xzf /vagrant/programs/spark-1.6.0-bin-hadoop2.6.tgz -C /usr/local'

[root@c6701 ~]# ssh c6702 'tar xzf /vagrant/programs/spark-1.6.0-bin-hadoop2.6.tgz -C /usr/local'

[root@c6701 ~]# ssh c6703 'tar xzf /vagrant/programs/spark-1.6.0-bin-hadoop2.6.tgz -C /usr/local'

[root@c6701 ~]# ssh c6704 'tar xzf /vagrant/programs/spark-1.6.0-bin-hadoop2.6.tgz -C /usr/local'


6. spark-env.sh 파일 작성


각 VM(c6701, c6702, c6703, c6704)에서 /usr/local/spark-1.6.0-bin-hadoop2.6/conf 폴더로 이동한 뒤 spark-env.sh 파일을 작성한다. 먼저 spark-env.sh.template 파일을 복사해서 spark-env.sh 파일을 생성한다.


[root@c6701 ~]# cd /usr/local/spark-1.6.0-bin-hadoop2.6/conf

[root@c6701 conf]# cp spark-env.sh.template spark-env.sh


spark-env.sh 파일의 맨 마지막에 JAVA_HOME 환경변수를 추가한다.


# spark-env.sh 파일 마지막에 추가한다.

JAVA_HOME=/usr/local/jdk1.8.0_73


c6701에 spark-env.sh을 만든 뒤, 다음과 같이 scp를 이용해서 복사해도 된다.


[root@c6701 conf]# scp spark-env.sh root@c6702:/usr/local/spark-1.6.0-bin-hadoop2.6/conf

spark-env.sh     100% 4243     4.1KB/s   00:00



7. slaves 파일 작성


slaves 파일은 마스터 역할을 할 c6701 장비에만 작성한다. /usr/local/spark-1.6.0-bin-hadoop2.6/conf 위치에 다음과 같이 slaves 파일을 생성한다.


c6702

c6703

c6704



8. 스파크 단독 모드 클러스터 실행


자, 이제 클러스터를 실행할 차례이다. c6701의 /usr/local/spark-1.6.0-bin-hadoop2.6 에서 sbin/start-all.sh을 이용해서 마스터와 워커를 실행한다.


[root@c6701 spark-1.6.0-bin-hadoop2.6]# sbin/start-all.sh

starting org.apache.spark.deploy.master.Master, logging to /usr/local/spark-1.6.0-bin-hadoop2.6/logs/spark-root-org.apache.spark.deploy.master.Master-1-c6701.spark.apache.org.out

c6702: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark-1.6.0-bin-hadoop2.6/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-c6702.spark.apache.org.out

c6704: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark-1.6.0-bin-hadoop2.6/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-c6704.spark.apache.org.out

c6703: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark-1.6.0-bin-hadoop2.6/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-c6703.spark.apache.org.out

[root@c6701 spark-1.6.0-bin-hadoop2.6]#


마스터와 워커를 실행했으니 이제 확인할 차례다. http://c6701.spark.apache.org:8080 주소를 웹 브라우저에 입력해보자. 잠시 후 다음과 같은 결과 화면을 볼 수 있을 것이다.



9. 스파크 클러스터 종료와 vagrant 종료


sbin/stop-all.sh을 사용하면 스파크 클러스터를 종료한다.


VM에서 로그아웃한 뒤, 명령 프롬프트에서 vagrant halt 명령어를 실행하면 전체 VM을 종료한다.






반응형

러닝 스파크(Learning Spark) 책의 2장을 따라하는데, 윈도우에서 spark-shell을 실행하면 다음과 같은 이상한 에러가 발생한다.


C:\devtool\spark-1.6.0-bin-hadoop2.6>bin\spark-shell

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties

To adjust logging level use sc.setLogLevel("INFO")

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0

      /_/


Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_51)

...

16/02/17 13:09:58 WARN : Your hostname, user resolves to a loopback/non-reachable address: fe80:0:0:0:35a2:f2cc:2a09:416

e%eth8, but we couldn't find any external IP address!

java.lang.RuntimeException: java.lang.RuntimeException: java.io.IOException: 액세스가 거부되었습니다

        at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)

        at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:194)

        ...(너무 길어 생략)

Caused by: java.io.IOException: 액세스가 거부되었습니다

        at java.io.WinNTFileSystem.createFileExclusively(Native Method)

        at java.io.File.createTempFile(File.java:2024)

        at org.apache.hadoop.hive.ql.session.SessionState.createTempFile(SessionState.java:818)

        at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:513)

        ... 62 more


<console>:16: error: not found: value sqlContext

         import sqlContext.implicits._

                ^

<console>:16: error: not found: value sqlContext

         import sqlContext.sql

                ^


scala>


이 에러가 발생하지 않도록 하려면 다음의 두 가지를 해 주면 된다.

  1. winutils.exe 파일을 c:\hadoop\bin에 복사한다. c:\haoop을 HADOOP_HOME 환경 변수로 설정한다. (winutils.exe 파일은 https://github.com/steveloughran/winutils/tree/master/hadoop-2.6.0/bin 에서 다운로드 받을 수 있다)
  2. 관리자 권한으로 명령 프롬프트를 열고 "C:\Hadoop\bin\winutils.exe chmod 777 /tmp/hive" 명령어를 실행한다.

이제 관리자 권한으로 실행한 명령 프롬프트에서 bin\spark-shell을 실행한다. 다음과 같이 정상으로 실행되는 것을 확인할 수 있다.


C:\devtool\spark-1.6.0-bin-hadoop2.6>bin\spark-shell

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties

To adjust logging level use sc.setLogLevel("INFO")

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0

      /_/


Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_51)

Type in expressions to have them evaluated.

Type :help for more information.

Spark context available as sc.

16/02/17 13:20:05 WARN General: Plugin (Bundle) "org.datanucleus" is already registered. Ensure you dont have multiple J

...생략

16/02/17 13:20:12 WARN : Your hostname, user resolves to a loopback/non-reachable address: fe80:0:0:0:35a2:f2cc:2a09:416

e%eth8, but we couldn't find any external IP address!

...생략

16/02/17 13:20:18 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException

SQL context available as sqlContext.


scala> val lines = sc.textFile("README.md")

lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:27


scala>




반응형

암바리(https://ambari.apache.org/)는 오픈소스 하둡 관리 도구이다. 설치부터, 설정 관리, 오퍼레이션, 모니터링, 통지 등 하둡 클러스터를 관리하는데 필요한 기본 기능을 제공한다. 암바리를 사용하다보면 레거시 환경의 통지 시스템을 이용해서 암바리의 문제 상황을 문자나 메신저 등으로 통지 받고 싶을 때가 있다. 하지만 아쉽게도 암바리는 이메일과 SNMP 트랩만 이용해서 문제를 통지하기 때문에, 암바리 Alert을 레거시 환경에 바로 붙일 수 없다.


왓츠업과 같이 SNMP를 지원하는 도구를 이용해서 이미 모니터링을 하고 있다면 이를 사용하면 된다. SNMP 기반의 모니터링 도구가 없거나 적합하지 않은 경우에는 암바리가 발생하는 SNMP 트랩을 직접 수신해서 처리하는 방식으로 레거시와 연동을 처리해야 한다.


[암바리 SNMP Alert 설정 화면]


암바리의 Alert SNMP 트랩 구성


암바리는 Alert 발생시 SNMP Alert 설정 화면에서 입력한 정보를 이용해서 트랩을 발생시킨다.

  • OID : 1.3.6.1.6.3.1.1.5.4와 같은 OID 문자열
  • Hosts : 트랩을 수신할 SNMP 트랩 리시버 서버의 호스트
  • Port : 리시버 서버가 사용할 포트 번호 (UDP 포트)
OID 값으로 "1.3.6.1.6.3.1.1.5.4"를 입력한 경우 트랩은 다음과 같이 구성된다.
  • OID(1.3.6.1.6.3.1.1.4.1.0), OBJECT IDENTIFIER[6] = 1.3.6.1.6.3.1.1.5.4
  • OID(1.3.6.1.6.3.1.1.5.4), OCTET STRING[4] = 상세메시지(body)
  • OID(1.3.6.1.6.3.1.1.5.4), OCTET STRING[4] = 요약메시지(subject)

상세 메시지는 몸체 내용에 해당하며, 다음과 같은 구조의 메시지를 갖는다. (마지막 줄에 공백 문자가 존재한다.)


\n

[Alert] Supervisor Process\n

[Service] STORM\n

[Component] SUPERVISOR\n

[Host] skt-phddtn3\n

\n

TCP OK - 0.000s response on port 56431\n

    


요약 메시지는 다음과 같은 구조를 갖는다.(마지막 줄에 공백 문자가 존재한다.)


\n

      [OK] Supervisor Process\n

    


요약 메시지는 Alert의 심각도로 시작한다. 심각도에는 OK, WARNING, CRITICAL, UNKNOWN이 존재한다.


트랩 리시버는 OID로부터 값을 읽어와 메시지를 알맞게 변환한 뒤 심각도에 따라 알맞은 처리를 하면 된다. 예를 들어, 심각도가 CRITICAL이면 SMS로 문자를 발송하고, WARNING이면 메신저로 메시지를 보내는 등의 작업을 하면 된다.


SNMP4J를 이용한 암바리 트랩 수신


트랩을 받기 위한 SNMP 도구가 따로 없다면 직접 트랩 리시버를 구현해야 한다. 다행히 SNMP 자체를 잘 몰라도 SNMP4J(http://www.snmp4j.org/)를 사용하면 쉽게 암바리 Alert 처리를 위한 트랩 리서버를 구현할 수 있다. 다음은 SNMP4J를 이용한 트랩 리시버를 구현한 코드 예이다.


public class TrapReceiver {

    private int port;

    private MultiThreadedMessageDispatcher dispatcher;

    private Snmp snmp = null;

    private Address listenAddress;

    private ThreadPool threadPool;


    private NotificationHandler notificationHandler;

    private int threadPoolSize;


    public TrapReceiver(int port) {

        this.port = port;

        this.threadPoolSize = 10;

    }


    public void init() throws IOException {

        threadPool = ThreadPool.create("Trap", threadPoolSize);

        dispatcher = new MultiThreadedMessageDispatcher(

              threadPool, new MessageDispatcherImpl());

        listenAddress = GenericAddress.parse("udp:0.0.0.0/" + port);

        TransportMapping<?> transport = 

                new DefaultUdpTransportMapping((UdpAddress) listenAddress);


        snmp = new Snmp(dispatcher, transport);

        snmp.getMessageDispatcher().addMessageProcessingModel(new MPv1());

        snmp.getMessageDispatcher().addMessageProcessingModel(new MPv2c());


        snmp.listen();


        snmp.addCommandResponder(new CommandResponder() {

            @Override

            public void processPdu(CommandResponderEvent event) {

                handleNotification(event);

            }

        });

    }


    private void handleNotification(CommandResponderEvent event) {

        if (notificationHandler == null) {

            return;

        }

        notificationHandler.handle(createNotification(event));

    }


    private Notification createNotification(CommandResponderEvent event) {

        OID trapOID = (OID) event.getPDU().getVariable(SnmpConstants.snmpTrapOID);

        List<String> values = new ArrayList<>(2);

        Vector<? extends VariableBinding> varBinds = event.getPDU().getVariableBindings();

        if (varBinds != null && !varBinds.isEmpty()) {

            Iterator<? extends VariableBinding> varIter = varBinds.iterator();

            while (varIter.hasNext()) {

                VariableBinding var = varIter.next();

                if (var.getOid().equals(trapOID)) {

                    values.add(var.getVariable().toString());

                }

            }

        }

        return new Notification(values.get(0), values.get(1));

    }


    public void close() {

        try {

            snmp.close();

        } catch (IOException e) {

        }

        threadPool.stop();

    }


    public void setNotificationHandler(NotificationHandler notificationHandler) {

        this.notificationHandler = notificationHandler;

    }

}


이 코드에서 createNotification() 메서드가 트랩 메시지를 처리하는 코드이다. 암바리가 생성하는 트랩은 지정한 OID에 해당하는 첫 번째 변수에 상세 내용(몸체), 두 번째 변수에 요약 내용(제목)을 메시지로 전달한다. 따라서, 지정 OID를 갖는 메시지 값을 차례대로 저장한 뒤 각각 순서대로 몸체와 제목으로 사용하면 된다. 제목은 [OK], [CRITICAL]는 심각도를 포함하고 있으므로 이 값을 이용해서 트랩을 어떻게 처리할지 결정할 수 있다.


몸체 메시지에는 서비스 종류, 호스트 정보 등이 포함되어 있으므로 이 정보를 사용해서 서비스 담당자별로 통지하거나 장비 담당자에게 메시지를 보내는 것과 같은 처리를 할 수 있다.


반응형

스톰은 토폴로지가 메트릭을 만들 때 필요한 API를 제공하고 있다. 이 API를 사용하면 내가 만든 토폴로지에서 원하는 메트릭을 발생하고 이를 쉽게 처리할 수 있다.


IMetric으로 토폴로지에서 메트릭 발생하기


토폴로지에서 메트릭을 발생시키는 방법은 다음과 같이 간단하다.

  1. 메트릭을 발생시키고 싶은 Bolt의 prepare() 메서드 또는 Spout의 open() 메서드에서 IMetric을 구현한 객체를 생성하고 메트릭으로 등록한다.
  2. 1에서 생성한 IMetric 객체를 이용해서 필요할 때 메트릭 값을 조정한다.

다음은 스톰 스타터 샘플에 있는 Bold에 메트릭 생성 기능을 추가한 예이다.


public class ExclamationBolt extends BaseRichBolt {

    OutputCollector _collector;


    transient CountMetric countMetric;

    transient MultiCountMetric wordCountMetric;


    @Override

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

        _collector = collector;


        // IMetric을 구현한 객체를 생성하고

        countMetric = new CountMetric();

        wordCountMetric = new MultiCountMetric();


        // context에 메트릭을 등록

        context.registerMetric("word_count", wordCountMetric, 10);

        context.registerMetric("execute_count", countMetric, 20);

    }


    @Override

    public void execute(Tuple tuple) {

        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));

        _collector.ack(tuple);


        String word = tuple.getString(0);


        // 튜플 처리할 때 마다 원하는 메트릭 값을 조정

        countMetric.incr();

        wordCountMetric.scope(word).incr();

    }


    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("word"));

    }


}


이 코드에서 context.registerMetric() 메서드의 파라미터는 순서대로 메트릭 이름, 메트릭 객체, 메트릭 버킷 크기를 의미한다. 메트릭 버킷 크기는 초 단위이다. 위 코드에서 "word_count" 메트릭 이름은 10초 메트릭 버킷 크기를 설정했는데, 이 경우 스톰은 10초 마다 해당 메트릭 객체로부터 값을 읽어오고 메트릭을 리셋한다.(정확하게는 10초 마다 메트릭 객체의 getValueAndReset() 메서드를 호출한다.)


예를 들어, 0-10초 사이에 countMetric.incr()을 4번 호출했고, 10-20초 사이에 countMetric.incr()을 5번 호출했다고 하자. 이 경우 스톰이 읽어가는 메트릭 값은 10초에 4이고 20초에 5가 된다.


IMetricsConsumer로 메트릭 수집하기


스톰은 메트릭 객체에서 값을 읽어와 그 값을 IMetricsConsumer에 전달한다. 즉, 스톰 토폴로지가 발생한 메트릭을 알맞게 처리하고 싶다면 IMetricsConsumer를 구현한 클래스를 작성하면 된다.


IMetricsConsumer 인터페이스는 다음과 같이 정의되어 있다.


public interface IMetricsConsumer {

    void prepare(Map stormConf, Object registrationArgument, 

            TopologyContext context, IErrorReporter errorReporter);

    void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints);

    void cleanup();

}


각 메서드는 다음과 같다.

  • prepare() : DB나 소켓 연결처럼 메트릭 처리에 필요한 사전 준비를 한다.
  • handleDataPoints() : 토폴로지에서 읽어온 메트릭을 처리한다.
  • cleanup() : DB나 소켓 등 종료 처리를 한다.

토폴로지 이름이 필요한 경우 prepare() 메서드에서 다음과 같은 방법으로 토폴로지 이름을 구할 수 있다.


import backtype.storm.Config;

import backtype.storm.metric.api.IMetricsConsumer;

import backtype.storm.task.IErrorReporter;

import backtype.storm.task.TopologyContext;


public class MetricsCollector implements IMetricsConsumer {

    public static final Logger LOG = LoggerFactory.getLogger(MetricsCollector.class);;


    private String topologyName;


    private DwmonMetricSender metricSender;


    @Override

    public void prepare(Map stormConf, Object registrationArgument, 

            TopologyContext context, IErrorReporter errorReporter) {

        LOG.info("Preparing Metric Collector");


        topologyName = (String) stormConf.get(Config.TOPOLOGY_NAME);

    }


handleDataPoints()에서 메트릭 값 처리하기


스톰은 토폴로지가 등록한 메트릭에서 주기적으로 값을 읽어와 IMetricsConsumer#handleDataPoints() 메서드에서 전달하므로, 이 메서드에서 메트릭을 알맞게 처리하면 된다.


public class MetricsCollector implements IMetricsConsumer {


    @Override

    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {

        // taskInfo.srcWorkerHost: String - 워커 호스트

        // taskInfo.srcWorkerPort: int - 워커 포트 

        // taskInfo.srcComponentId: String - 컴포넌트 ID

        // taskInfo.srcTaskId: int - 태스크 ID

        // taskInfo.timestamp: long - 메트릭 수집 시간(long, 단위 초)

        // taskInfo.updateIntervalSecs: long - 메트릭 수집 주기(단위 초)


        for (DataPoint dp : dataPoints) {

            // dp.name: String - 메트릭 이름

            // dp.value: Object - 메트릭 값, IMetric 구현마다 값 타입이 다름

            // 

        }

    }



IMetric 타입별로 값 타입이 다른데, 주요 구현체의 값 타입은 다음과 같다.

  • CountMetric - Long
  • MultiCountMetric - Map<String, Long>

스톰은 토폴로지에서 직접 발생하는 메트릭뿐만 아니라 스톰이 토폴로지와 관련해서 발생시키는 메트릭 값도 IMectirsConcumer에 전달한다. 따라서 IMectirsConcumer는 처리하려는 메트릭을 알맞게 걸러내야 한다. 스톰 자체 메트릭인 경우 메트릭 이름이 "__", "GC/", "memory/"와 같은 값으로 시작하므로, 토폴로지에서 직접 생성하는 메트릭만 처리하고 싶다면 이 이름을 가진 메트릭을 제외한 나머지 메트릭만 처리하면 된다.


스톰 설정에서 IMetricsConsumer 구현 클래스 등록하기


이제 남은 것은 스톰이 토폴로지가 발생한 메트릭을 IMetricsConsumer에 전달하도록 설정하는 것이다. 다음의 두 가지 방법 중 하나를 이용해서 설정할 수 있다.

  • storm.yaml 파일에 설정하기
  • 토폴로지 코드에서 설정하기
storm.yaml 파일에 설정하려면, 다음과 같이 topology.metrics.consumer.register에 구현 클래스를 지정하면 된다.

topology.metrics.consumer.register:
  - class: "com.mycomp.storm.metrics.MetricsCollector"
    parallelism.hint: 1

토폴로지 코드에서 직접 지정하고 싶다면 다음 코드처럼 Config#registerMetricsConsumer()를 이용해서 지정하면 된다.


public static void main(String[] args) throws Exception {

    TopologyBuilder builder = new TopologyBuilder();


    builder.setSpout("word", new TestWordSpout(), 4);

    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");

    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");


    Config conf = new Config();

    conf.registerMetricsConsumer(MetricsCollector.class, 1);

    conf.setDebug(true);

    if (args != null && args.length > 0) {

        conf.setNumWorkers(6);

        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());

    } else {

        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology("test", conf, builder.createTopology());

        Utils.sleep(60000);

        cluster.killTopology("test");

        cluster.shutdown();

    }

}



반응형

Flume에 대한 커스텀 메트릭을 구현할 일이 생겨 그 내용을 정리한다.


구조


Flume의 메트릭을 가져와 처리하려면 Flume이 제공하는 MonitorService 인터페이스를 상속받아 구현하면 된다.




MonitorService 구현하기


MonitorService를 상속받은 클래스는 다음과 같이 세 개의 메서드만 알맞게 구현하면 된다.

  • configure() : 초기화한다.
  • start() : 메트릭을 처리하는 작업을 별도 쓰레드로 시작한다.
  • stop() : 메트릭 처리 작업을 중지한다.
뼈대 코드는 다음과 같다.

public class CustomMonitorService implements MonitorService {

    @Override
    public void configure(Context context) {
        // 초기화
    }

    @Override
    public void start() {
        // 별도 쓰레드를 이용해서 MetricReporter 실행
    }

    @Override
    public void stop() {
        // MetricReporter 실행 중지
    }
}

configure() 구현

configure() 메서드는 필요한 설정 정보를 읽어와 초기화 작업을 수행한다. 아래 코드는 구현 예시이다.

public class CustomMonitorService implements MonitorService {
    private int intervalSec;
    private String hosts;

    @Override
    public void configure(Context context) {
        // "flume.monitoring.pollInterval" 시스템 프로퍼티 값을 읽음. 없을 경우 기본 값으로 60 사용
        this.intervalSec = context.getInteger("pollInterval", 60);
        this.hosts = System.getProperty("custom.collector.hosts");
    }


Context는 "flume.monitoring."로 시작하는 시스템 프로퍼티의 값을 읽어오는 기능을 제공한다. 위 코드에서 context.getInteger("pollInterval")은 flume.monitoring.pollInterval 시스템 프로퍼티 값을 읽어와 int 타입으로 변환한다. 해당 프로퍼티가 존재하지 않으면 60을 리턴한다.

사용할 시스템 프로퍼티가 "flume.monitoring."로 시작하지 않으면 System.getProperty() 메서드나 기타 다른 방법을 사용해서 필요한 설정 값을 직접 읽어온다.

start()와 stop() 구현

Flume은 configure() 메서드로 초기화를 한 뒤 start() 메서드를 실행한다. start() 메서드는 별도 쓰레드로 리포팅 기능을 실행하도록 구현하면 된다. 주기적으로 리포팅 기능을 실행할 경우 ExecutorService를 사용하면 편리하다.


public class CustomMonitorService implements MonitorService {
    private ScheduledExecutorService scheduledExecutorService;

    private int pollInterval;
    private String hosts;
    private MetricReporter metricReporter; // Runnable 구현

    @Override
    public void start() {
        if (scheduledExecutorService == null || scheduledExecutorService.isShutdown() ||
                    scheduledExecutorService.isTerminated()) {
            scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        }
        metricReporter = new MetricReporter(hosts);
        scheduledExecutorService.scheduleWithFixedDelay(
                metricReporter,
                pollingIntervalSec,
                pollingIntervalSec,
                TimeUnit.SECONDS);
    }

    @Override
    public void stop() {
        scheduledExecutorService.shutdown();
    }
    ...

주기적으로 메트릭 리포터 기능을 실행하기 위해 ScheduledExecutorService를 사용했다. start() 메서드에서 ExecutorService를 생성하고 scheduleWithFixedDelay() 메서드로 지정한 주기로 리포팅 기능을 실행하도록 했다.


MetricReporter 구현


MetricReporter는 ExecutorService를 이용해서 실행할 것이므로 Runnable을 구현한다. run() 메서드에서 메트릭을 읽어와 알맞은 처리를 하면 된다. 다음은 간단한 구현 예제이다.


import org.apache.flume.instrumentation.util.JMXPollUtil;


public class MetricReporter implements Runnable {


    private final String hosts;


    public FlumemonReporter(String hosts) {

        this.hosts = hosts;

    }


    @Override

    public void run() {

        try {

            List<String> metrics = new ArrayList<>();

            Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();

            for (String component : metricsMap.keySet()) {

                Map<String, String> attributeMap = metricsMap.get(component);

                if (component.startsWith("CHANNEL.")) {

                    String name = "EventPutAttemptCount";

                    String value = attributeMap.get(name);

                    metrics.add(component + ", " + name + "=" + value);

                }

            }

            metrics.forEach(System.out::println);

        } catch (Exception e) {

        }

    }


}


JMXPollUtil.getAllMBeans()를 사용하면 flume이 제공하는 메트릭을 Map 형태로 구할 수 있다. 이 Map의 키는 컴포넌트 이름에 해당하는데, flume의 SOURCE, CHANNEL, SINK가 컴포넌트가 된다. 컴포넌트 이름은 "타입.이름"의 형태를 취한다.


metricsMap.get()은 개별 컴포넌트의 메트릭 목록을 맵 형태로 다시 구한다. 각 컴포넌트별로 제공하는 메트릭 목록이 다르다. 예를 들어, 채널의 경우 "EventPutAttemptCount", "EventPutSuccessCount"와 같은 메트릭을 제공한다. 각 메트릭 값은 String 타입으로 읽어오기 때문에, 필요시 Long이나 Integer와 같은 타입으로 알맞게 변환해주어야 한다.


Flume 실행시 MonitorService 지정하기


MonitorService를 구현했다면, 이제 남은 작업은 Flume을 실행할 때 구현한 클래스를 MonitorService로 사용하도록 지정하는 것이다. 다음과 같이 flume.monitoring.type 시스템 프로퍼터에 구현 클래스를 지정하면 된다.


bin/flume-ng agent --conf-file example.conf --name a1 \

   -Dflume.monitoring.type=com.mycomp.MetricReporter -Dflume.monitoring.hosts=somehost


이 외에 구현 클래스에서 필요로 하는 시스템 프로퍼티를 -D 옵션으로 추가 지정한다. 물론, 구현한 클래스가 flume이 사용하는 클래스패스에 위치해야 한다.


반응형

Kafka의 메트릭 리포트 부분을 확장할 일이 있어 메트릭 수집 관련 부분을 조사했는데, 그 내용을 정리한다.


구조


Kafka의 메트릭 리포팅과 관련된 클래스 구조는 아래 그림과 같다.



메트릭 리포팅 관련 타입은 크게 네 종류로 다음과 같다.

  • 카프카의 기반 타입
    • KafkaMetricsReporterMBean : 카프카 메트릭 리포터 MBean을 위한 기반 타입이다. 리포터의 시작/종료를 JMX로 조작하고 싶은 경우 이 타입을 상속받은 MBean 인터페이스를 만든다.
    • KafkaMetricsReporter : Kafka가 메트릭 리포터로 사용할 클래스가 구현할 타입이다.
  • Metrics의 기반 타입
    • AbstractReporter / AbstractPollingReporter : Metrics의 메트릭을 리포팅 할 리포터가 상속받을 클래스이다. 주기적으로 메트릭 값을 처리하려면 AbstractPollingReporter를 상속받아 구현한다.
    • MetricProcessor : Metrics의 메트리 종류별로 값을 받아 처리할 처리기가 구현해야 할 인터페이스이다.

Metrics는 메트릭 생성/관리를 위한 도구로 http://metrics.dropwizard.io/3.1.0/ 사이트를 참조한다.


Kafka로부터 메트릭을 받아 원하는 동작을 하고 싶다면 위 클래스 다이어그램에서 노란 색으로 표시한 타입을 알맞게 구현하면 된다. 그림에서 보는 것처럼 Kafka의 리포팅 관련 상위 타입을 구현한 타입 두 개와 Metrics와 관련된 타입 두 개를 구현하면 된다.


Kafka 리포터 구현


구현할 타입은 다음과 같다.

  • KafkaMetricsReporterMBean 인터페이스를 상속받은 CustomMetricsReporterMBean 인터페이스
  • KafkaMetricsReporter 인터페이스와 CustomMetricsReporterMBean 인터페이스를 상속받은 클래스
먼저 CustomReporterMBean 인터페이스는 다음과 같이 간단하다.

import kafka.metrics.KafkaMetricsReporterMBean;

public interface CustomMetricsReporterMBean extends KafkaMetricsReporterMBean {
}

MBean으로 노출하고 싶은 오퍼레이션이 추가로 있다면 CustomMetricsReporterMBean에 추가한다.


다음으로 KafkaMetricsReporter 인터페이스와 CustomMetricsReporterMBean 인터페이스를 상속받은 CustomMetricsReporterMBean 클래스를 구현한다. 이 클래스의 뼈대 코드는 다음과 같다.


import kafka.metrics.KafkaMetricsReporter;

import kafka.utils.VerifiableProperties;


public class CustomMetricsReporter implements KafkaMetricsReporter, CustomMetricsReporterMBean {


    private VerifiableProperties props;

    private CustomReporter underlying;

    ...기타 필드


    @Override

    public void init(VerifiableProperties props) {

        // 리포터를 초기화하고 리포팅을 시작한다.

    }


    @Override

    public void startReporter(long pollingPeriodSecs) {

        // underlying 리포팅 시작

    }


    @Override

    public void stopReporter() {

        // underlying 리포팅 종료

    }


    @Override

    public String getMBeanName() {

        return "kafka:type=com.mycomp.kafka.CustomMetricsReporter";

    }

}


init() 메서드 구현


init() 메서드는 Metrics의 리포터를 초기화하고 리포팅을 시작한다. 구현 코드는 다음과 같은 형태를 취한다.


import com.yammer.metrics.Metrics;

import kafka.metrics.KafkaMetricsConfig;

import kafka.metrics.KafkaMetricsReporter;

import kafka.utils.VerifiableProperties;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;


import java.net.InetAddress;

import java.net.UnknownHostException;

import java.util.concurrent.TimeUnit;


public class CustomMetricsReporter 

        implements KafkaMetricsReporter, CustomMetricsReporterMBean {


    private VerifiableProperties props; // 설정 파일의 프로퍼티 보관

    private CustomReporter underlying; // Metrics 리포터


    private final Object lock = new Object();

    private boolean initialized = false;

    ...기타 필드


    @Override

    public void init(VerifiableProperties props) {

        // props는 Kafka 설정 파일의 프로퍼티 값을 갖는다.

        synchronized (lock) {

            this.props = props;

            if (!initialized) {

                initializeReporter();

                initialized = true;

                KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(props);

                // metricsConfig.pollingIntervalSecs() --> kafka.metrics.polling.interval.secs 설정 값

                startReporter(metricsConfig.pollingIntervalSecs());

            }

        }

    }


    // Metrics 리포터 초기화

    private void initializeReporter() {

        // 리포터 초기화에 필요한 프로퍼티 값 사용

        String someProperty = props.getString("some.property", "defaultValue");

        underlying = new CustomReporter(Metrics.defaultRegistry(), someProperty);

    }


    @Override

    public void startReporter(long pollingPeriodSecs) {

        // underlying 리포팅 시작

    }


startReporter() 구현


startReporter() 메서드는 Metrics의 리포터인 underying의 start() 메서드를 실행한다. 이 메서드는 메트릭 리포팅을 주기적으로 실행한다.


import java.util.concurrent.TimeUnit;


public class CustomMetricsReporter implements KafkaMetricsReporter, CustomMetricsReporterMBean {


    private VerifiableProperties props;

    private CustomReporter underlying;


    private final Object lock = new Object();

    private boolean initialized = false;

    private boolean running = false;


    @Override

    public void startReporter(long pollingPeriodSecs) {

        // underlying 리포팅 시작

        synchronized (lock) {

            if (initialized && !running) {

                underlying.start(pollingPeriodSecs, TimeUnit.SECONDS);

                running = true;

            }

        }

    }



stopReporter() 구현


public class CustomMetricsReporter implements KafkaMetricsReporter, CustomMetricsReporterMBean {


    private VerifiableProperties props;

    private CustomReporter underlying;


    private final Object lock = new Object();

    private boolean initialized = false;

    private boolean running = false;

    ...기타 필드


    @Override

    public void stopReporter() {

        // underlying 리포팅 종료

        synchronized (lock) {

            if (initialized && running) {

                underlying.shutdown();

                running = false;

                initializeReporter();

            }

        }

    }


stopReporter()는 Metrics 리포터의 shutdown()을 실행해서 메트릭 리포팅을 중지한다. 위 코드에서 리포팅 중지후에 initializeReporter()를 다시 실행하는데, 그 이유는 startReporter()를 다시 실행할 때 사용할 리포터를 미리 초기화하기 위함이다.


Metrics 리포터 구현


앞서 Kafka의 리포터 구현에서 봤듯이 Kafka 리포터는 Metrics 리포터를 시작하고 중지하는 역할을 맡는다. 실제로 메트릭을 리포팅하는 역할은 Metrics 리포터가 처리한다.


Metrics 리포터를 위한 코드 뼈대는 다음과 같다.


import com.yammer.metrics.core.*;

import com.yammer.metrics.reporting.AbstractPollingReporter;


import java.util.concurrent.TimeUnit;


public class CustomReporter

        extends AbstractPollingReporter

        implements MetricProcessor<CustomReporter.Context> {


    ...// 필드


    public CustomReporter(MetricsRegistry metricsRegistry, String someValue) {

        super(metricsRegistry, "custom-reporter");

        this.someValue = someValue;

    }


    @Override

    public void start(long pollingPeriodSecs, TimeUnit unit) {

        super.start(pollingPeriodSecs, unit);

    }


    @Override

    public void shutdown() {

        try {

            super.shutdown();

        } finally {

        }

    }


    @Override

    public void run() {

        // 리포팅 코드 위치

    }


    @Override

    public void processMeter(MetricName name, Metered meter, Context context) throws Exception {

    }


    @Override

    public void processCounter(MetricName name, Counter counter, Context context) throws Exception {

    }


    @Override

    public void processHistogram(MetricName name, Histogram histogram, Context context) throws Exception {

    }


    @Override

    public void processTimer(MetricName name, Timer timer, Context context) throws Exception {

    }


    @Override

    public void processGauge(MetricName name, Gauge<?> gauge, Context context) throws Exception {

    }


    public class Context {

    }

}


start() 메서드와 shutdown() 메서드는 상위 클래스의 start()와 shutdown() 메서드를 호출한다. AbstractPollingReporter의 start() 메서드는 자체 Executor를 이용해서 리포팅을 실행한다.


run() 메서드 구현


run() 메서드에서 메트릭을 읽어와 원하는 리포팅 작업을 수행한다.


import com.yammer.metrics.core.*;


public class CustomReporter

        extends AbstractPollingReporter

        implements MetricProcessor<CustomReporter.Context> {


    ...


    @Override

    public void run() {

        // 리포트 시작


        //  모든 메트릭을 구한다.

        final Set<Map.Entry<MetricName, Metric>> metricEntries =

                getMetricsRegistry().allMetrics().entrySet();


        Context context = new Context(); // 메트릭 값을 담을 컨텍스트 생성

        for (Map.Entry<MetricName, Metric> entry : metricEntries) {

            final MetricName metricName = entry.getKey();

            final Metric metric = entry.getValue();

            try {

                // 각 메트릭을 처리한다. processWith의 첫 번째 인자의 타입은 MetricProcessor이다.

                // 이 클래스가 MetricProcessor를 구현했으므로 this를 첫 번째 인자로 전달한다.

                metric.processWith(this, metricName, context);

            } catch (Exception e) {

            }

        }

        ... context의 결과 이용해서 리포팅

        

        // 리포트 종료

    }


    // process 메서드에서 메트릭 타입별로 처리

    ...


MetricName과 Metric은 각각 메트릭의 이름과 값을 갖는다. metric.processWith() 메서드를 메트릭 타입에 따라서 알맞은 process 메서드를 실행하는데, 이 process 메서드를 이용해서 메트릭을 context에 수집한다.


Context와 process 메서드 구현


Context는 주로 필요한 메트릭의 결과를 수집하는 역할을 한다. 간단한 Context 클래스를 다음과 같이 구현할 수 있다. 앞서 뼈대 코드에서는 Context 클래스를 CustomReporter의 내부 클래스로 선언했다.


public class Context {

    public List<String> metrics = new LinkedList<>();

}


각 메트릭을 처리하는 process 메서드는 MetricProcessor 인터페이스에 정의되어 있는데, CustomReporter 클래스가 이 인터페이스를 구현했는데 각 process 메서드에서 알맞게 메트릭을 꺼내 Context에 보관한다.


public class CustomReporter

        extends AbstractPollingReporter

        implements MetricProcessor<CustomReporter.Context> {

    ...


    @Override

    public void processMeter(MetricName name, Metered meter, Context context) throws Exception {

        String metric = name.getName() + " = " + meter.count();

        context.metrics.add(metric);

    }


    @Override

    public void processCounter(MetricName name, Counter counter, Context context) throws Exception {

        ...

    }

    ... processHistogram, processTimer, processGauge 메서드 구현


process 메서드는 Context에 필요한 정보를 저장한다. 다시 run() 메서드로 돌아가서 각 메트릭을 처리한 뒤 다음 코드처럼 context에 보관된 메트릭 처리 결과를 알맞은 형식으로 리포팅하면 메트릭 리포팅이 완료된다.


    @Override

    public void run() {

        // 리포트 시작

        final Set<Map.Entry<MetricName, Metric>> metricEntries = ...;


        Context context = new Context();

        for (Map.Entry<MetricName, Metric> entry : metricEntries) {

            ...생략

        }

        // context에서 처리된 메트릭 값 읽어와 리포팅

        for (String metric : context.metrics) {

            System.out.println(metric);

        }

        // 리포트 종료

    }


이 코드에서는 간단히 System.out을 사용했지만, 메트릭 수집 결과를 DB에 보관한다거나 메트릭 저장소에 전송하는 등의 코드를 알맞게 구현하면 된다.


카프카 브로커 설정


구현한 메트릭 리포터를 사용하려면 카프카 설정에 kafka.metric.reporters 프로퍼티에 리포터 Kafka 리포터 클래스를 지정하면 된다.


kafka.metrics.reporters=com.mycom.kafka.CustomMetricsReporter

log.dirs=/data1/kafka

zookeeper.connect=zk1:2181,zk2:2181,zk3:2181


설정을 적용했다면 Kafka를 시작해 보자. 커스텀 메트릭 리포터가 동작하는 것을 확인할 수 있을 것이다.


  1. RootedIn 2019.11.04 15:36 신고

    위 처럼 구현할려면 해당 인스턴스를 카프카 서버에 올려야하는 것인가요? 외부 서버에서 접근하기는 어렵겠죠?

반응형

스트림으로 들어오는 튜플을 단순히 처리만하고 끝나는 경우는 드물다. 보통은 처리 결과를 최종 또는 중간 단계 어딘가에 저장하게 된다. 예를 들어, 10분당 판매량이 100개가 넘는 상품을 통지해주는 스톰 토폴로지를 만들었다면, 토폴로지를 이용해서 계산한 분당 판매량 데이터를 HBase나 MySQL과 같은 데이터베이스에서 보관하길 원하는데, 이렇게 해야 스톰에 장애가 발생했을 때, 그 이전에 계산한 값을 날리지 않을 수 있으며, 또한, 이렇게 계산한 데이터를 실시간 통계 데이터로 활용할 수 있기 때문이다.


이전글:


상태 추상화


Filter나 Function을 이용해서 계산한 결과 상태를 DB에 보관하는 기능을 구현할 수 있지만, 트라이던트 API는 일관된 방법으로 상태를 처리할 수 있도록 상태를 위한 API를 제공하고 있다. 트라이던트 API는 상태 보관을 위해 다음의 세 가지 인터페이스를 정의하고 있다.

  • State: 상태를 표현
  • StateFactory: State를 생성해주는 팩토리
  • StateUpdater: 배치 단위로 튜플을 State 객체에 반영

트라이던트 API는 위 세 가지 인터페이를 이용해서 상태를 관리한다. 즉, 개발자는 위 세 개의 인터페이스에 대한 구현 클래스만 알맞게 만들어주면 된다. 나머지는 트라이언트가 알아서 처리한다.


세 인터페이스는 다음과 같이 정의되어 있다.


public interface State {

    void beginCommit(Long txid);

    void commit(Long txid);

}


public interface StateFactory extends Serializable {

    State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions);

}


public interface StateUpdater<S extends State> extends Operation {

    void updateState(S state, List<TridentTuple> tuples, TridentCollector collector);

}


스톰은 내부적으로 다음과 같은 코드를 사용해서 튜플을 상태에 반영한다.


// 배치 시작을 상태에 알림

state.beginCommit(txid);


// 배치에 속한 튜플 단위로 상태에 반영하고, collector를 이용해서 새로운 튜플 발생

stateUpdater.updateState(state, tuples, collector)


// 배치 종료를 상태에 알림

state.commit();


스톰은 배치 단위로 튜플을 상태에 반영한다. 배치가 시작되면 이를 State에 알리고(beginCommit 실행), StateUpdater를 이용해서 배치에 속한 튜플을 State에 반영한다(updateState 실행). 배치에 속한 튜플을 모두 State에 반영하면 배치가 끝났음을 State에 알린다.(commit 실행)


위 코드에서 알 수 있는 건, State와 StateUpdater는 쌍을 이룬다는 점이다. 즉, 특정 타입의 State에 상태를 반영하려면 그 State에 알맞은 StateUpdater가 필요하다. 실제로 Stream의 partitionPersist() 메서드를 보면 다음과 같이 특정 타입의 State 객체를 생성하는 StateFactory와 그 State의 상태를 갱신할 때 사용될 StateUpdater를 함께 파라미터로 전달받는 것을 알 수 있다.


partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater, Fields functionFields)

 


[State와 StateUpdater는 쌍을 이룬다.]


스톰은 일부 State 구현에 대해 이미 쌍을 우리는 StateUpdater 구현을 제공하고 있다. (위 그림에서 MapState와 MapCombinerAggStateUpdater가 스톰이 제공하는 인터페이스/클래스이다.)


State 만들어보기


Stream이 제공하는 메서드는 partitionPersist()인데, 이 이름에서 알 수 있듯이 스트림은 기본적으로 파티션 단위로 상태값을 보관하는 간단한 State 구현을 만들어보자. 이를 통해 State의 동작 방식에 대한 약간의 감을 잡을 수 있을 것이다. 여기서 만들 예제는 중복되지 않은 "제품ID:시간"을 키로 하는 이벤트 발생 여부를 보관하는 State 구현이다.


State 구현은 다음과 같이 간단하게 구현했다.


public class DistinctState implements State {


    private Map<String, ShopLog> map = new ConcurrentHashMap<>();

    private Map<String, ShopLog> temp = new ConcurrentHashMap<>();


    @Override

    public void beginCommit(Long txid) {

        temp.clear();

    }


    @Override

    public void commit(Long txid) {

        for (Map.Entry<String, ShopLog> entry : temp.entrySet())

            map.put(entry.getKey(), entry.getValue());

    }


    public boolean hasKey(String key) {

        return map.containsKey(key) || temp.containsKey(key);

    }


    public void put(String key, ShopLog value) {

        temp.put(key, value);

    }

}


위 코드에서 beginCommmit() 메서드는 temp 맵을 초기화한다. 그리고, commit() 메서드는 temp에 담겨 있는 모든 내용을 map에 복사한다. 여기서는 메모리 맵은 map 필드에 내용을 복사했지만, 실제 구현에서는 외부의 캐시 서버나 DBMS 등에 내용을 복사하도록 구현하게 된다. 즉, 한 배치가 시작될 때 beginCommit() 메서드를 통해 배치에 속한 튜플의 처리 준비를 하고, 한 배치가 끝날 때 commit() 메서드를 통해 배치에 포함된 튜플의 처리 결과를 상태를 보관할 저장소에 반영하게 된다.


배치의 시작과 끝 사이에 사용되는 메서드가 hasKey() 메서드와 put() 메서드이다. 이 두 메서드는 StateUpdater에 의해 사용되는데, DistictState를 위한 StateUpdater 구현 클래스는 다음과 같다.


public class DistinctStateUpdater extends BaseStateUpdater<DistinctState> {


    @Override

    public void updateState(

                DistinctState state, List<TridentTuple> tuples, TridentCollector collector) {

        List<TridentTuple> newEntries = new ArrayList<>();

        for (TridentTuple t : tuples) {

            String key = t.getStringByField("productId:time");

            if (!state.hasKey(key)) {

                state.put(key, (ShopLog) t.getValueByField("shopLog"));

                newEntries.add(t);

            }

        }

        for (TridentTuple t : newEntries) {

            collector.emit( // 상태로부터 새로운 스트림을 생성

                Arrays.asList(t.getValueByField("shopLog"), t.getStringByField("productId:time"))

            );

        }

    }

} 


StateUpdater 구현 클래스는 BaseStateUpdater 클래스를 상속받은 뒤 updateState() 메서드를 알맞게 구현해주면 된다. updateState() 메서드는 첫 번째 파라미터로 상태를 표현하는 객체를, 두 번째 파라미터로 배치에 속한 튜플을, 세 번째 파라미터는 튜플을 상태에 반영한 후 새로운 튜플을 생성할 때 사용할 TridentCollector이다.


updteState() 메서드를 보면 DistinctState의 hasKey() 메서드를 이용해서 처리하려는 튜플의 key 값이 이미 DistinctState에 보관한 key인지 확인한다. 아직 보관전인 key를 가진 튜플이면, put() 메서드를 이용해서 DistinctState에 반영하고, newEntries에 보관한다.


updateState() 메서드는 배치에 포함된 모든 튜플을 상태에 반영한 뒤, collector를 이용해서 newEntries에 보관된 튜플을 재발생시킨다. 이렇게 함으로써 상태 처리 상태로부터 새로운 튜플을 만들어내는 스트림이 만들어지게 된다.


마지막으로 구현해야 할 클래스는 StateFactory 인터페이스의 구현 클래스이다. 이 클래스의 구현은 다음과 같이 간단하다.


public class DistinctStateFactory implements StateFactory {

    private static final long serialVersionUID = 1L;


    @SuppressWarnings("rawtypes")

    @Override

    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

        return new DistinctState();

    }

}


이제 이렇게 만든 상태 관련 클래스를 실제 트라이던트 토폴로지에 적용해보자.


TridentTopology topology = new TridentTopology();

topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .partitionPersist(

                new DistinctStateFactory()

                new Fields("shopLog", "productId:time"), // StateUpdater에 전달될 튜플의 필드 목록

                new DistinctStateUpdater()

                new Fields("shopLog", "productId:time")) // StateUpdater가 생성하는 튜플 필드 목록

        .newValuesStream() // StateUpdater에서 생성한 튜플로 구성된 스트림

        .each(new Fields("shopLog", "productId:time"), Util.printer());

...


partionPersist() 메서드는 파티션 별로 State를 생성하고 튜플을 상태에 반영하는 기능을 제공한다. 위 코드의 경우 StateFactory로 DisctinctStateFactory를 사용했으므로 파티션 별로 상태 보관을 위해 DistinctState 객체가 생성된다.


partitionPersist() 메서드는 TridentState 객체를 리턴하는데, 이 객체의 newValueStream() 메서드는 DistinctStateUpdater에서 collector를 이용해서 생성한 튜플을 스트림으로 제공하는 새로운 Stream을 리턴한다. 따라서, 위 코드에서 마지막 each() 메서드는 DistinctStateUpdater가 생성한 튜플을 입력으로 받아서 그 내용을 콘솔에 출력하게 된다.


상태와 튜플 재처리 방식


튜플로부터 상태를 보관할 때 고민해야 할 점이 있다면, 그것은 바로 재처리와 관련된 것이다. 아래 그림을 보자. 아 그림에서 persistentAggregate 연산은 앞의 groupBy로 그룹핑 된 튜플의 개수를 적재하는 State를 처리해준다고 해 보자.




State가 키가 "0:123"인 튜플의 개수로 3을 갖고 있었다고 하자. 즉, {"0:123": 3 }을 State가 상태로 보관하고 있었다고 하자. 이 상태에서 위 그림의 윗 부분의 실행되었다고 하자. 이 때 스파우트가 발생한 "0:123"인 튜플의 개수가 4라면, persistenAggregate를 거친 뒤에 상태 값은 {"0:123": 7}이 될 것이다. 그런데, AlertFunction 연산에서 실패가 나는 바람에 스톰이 재처리를 수행했다고 해 보자. 그러면, 위 그림의 아래 부분처럼 튜플을 다시 발생시킬 거고, 그렇게 되면 persistenceAggregate는 이미 {"0:123": 7}을 상태 값으로 갖고 있었기 때문에, 재발송된 튜플 개수 4개가 더해지면서 상태 값이 {"0:123", 11}로 바뀌게 될 것이다.


사실, 여기서 동일한 튜플이 다시 발생한 것이므로 최종 상태 값은 {"0:123", 7}이어야 한다. 그런데, 상태가 튜플의 재발생 여부를 제대로 처리하지 못하게 되면, 같은 튜플이 상태에 중복해서 적용되는 문제가 발생하는 것이다.


이렇게 튜플을 다시 발생했을 때 동일 튜플이 State에 중복해서 적용되는 문제를 처리하기 위해 스톰은 세 가지 타입의 State 구현 객체를 제공하고 있다.



State

설명

TransactionalMap

스파우트가 Transactional인 경우(즉 한 배치에 대해 동일한 튜플을 생성할 경우), 동일 튜플에 대해 동일 상태를 보장한다.

OpaqueMap

스파우트가 Opaque Transactional인 경우(즉 한 튜플은 한 배치에만 속하지만 한 배치에 대해 매번 동일한 튜플이 발생하지 않을수도 있는 경우), 이전 상태 값을 기준으로 상태를 갱신한다.

NonTransactionalMap스파우트가 Non Transactional인 경우(즉 튜플이 특정 배치에만 속한다고 가정할 수 없는 경우), 튜플 재생성에 따른 롤백을 지원하지 않는다.


TransactionalMap의 동작 방식


TransactionalMap은 상태를 보관할 때 (트랜잭션ID, {키: 값})의 구조를 사용한다. 예를 들어, 배치 단위로 그룹바이-카운트를 수행한 결과 아래 표의 왼쪽/중간 컬럼과 같이 (키, 개수)가 생성되었다고 해 보자. 이 경우 표의 우측 컬럼과 같이 상태 값이 갱신된다.


 배치(트랜잭션) ID

 튜플 연산 결과 (키, 개수) 값

상태

 1

{ "0:123": 4 }

(1, {"0:123": 4} ) 

 2

{ "0:123": 5 } 

(2, {"0:123": 9} )

 3

{ "0:123": 3 } 

(3, {"0:123": 12} ) 

 3 (재발생)

{ "0:123": 3 }  
Transactional 스파우트는 동일한 튜플을 발생

(3, {"0:123": 12} )  
(기존 값 그대로 유지)

위 표에서 트랜잭션ID가 3에 해당하는 배치의 튜플이 다시 발생했다. 이 스파우트는 Transactional 타입이므로 같은 배치 ID에 대해 동일한 튜플을 발생시키므로, 튜플의 그룹바이-카운트 연산 결과는 위 표의 가운데 컬럼에서 보는 것처럼 동일하다. 따라서, TransactionalMap은 현재 상태와 동일한 트랜잭션 ID에 해당하는 결과 값이 들어올 경우, 상태에 반영하지 않고 유지하면 된다.


OpaqueMap의 동작 방식


OpaqueMap은 상태를 보관할 때 (트랜잭션ID, 키, 이전값, 현재값)의 구조를 사용한다. 앞의 TransactionalMap과 유사하게 3번 트랜잭션ID에 해당하는 배치를 재처리했다고 해 보자.


 배치(트랜잭션) ID

 튜플 연산 결과 (키, 개수) 값

상태

 1

{ "0:123": 4 }

(1, "0:123", null, 4 ) 

 2

{ "0:123": 5 } 

(2, "0:123", 4 9 )

 3

{ "0:123": 3 } 

(3, "0:123", 9, 12 ) 

 3 (재발생)

{ "0:123": 5 }  

Opaque 스파우트는 발생한 튜플이 달라질 수 있음

(3, "0:123", 9, 14 )  

위 표에서 OpaqueMap은 상태에 값을 보관할 때, 현재 처리한 트랜잭션ID와 이전 값, 현재 값을 함께 보관한다. 만약 특정 문제가 발생해서 스파우트가 특정 트랜잭션 ID에 해당하는 튜플을 재발생한 경우, Opaque 스파우트는 위 표의 중간 컬럼처럼 다른 결과를 만들어낼 수 있다. 따라서 상태는 새롭게 계산된 결과와 이전 상태 값을 이용해서 현재 상태 값을 갱신해주어야 한다. 위 표에서는 3번 트랜잭션ID의 재발생 시점에 이전 상태 값이 {"0:123": 9}이므로, 현재 상태 값은 {"0:123": 9 + 5}인 {"0:123": 14}로 바뀐다.


OpaqueMap 구현 방법: IBackingMap, OpaqueValue 사용


OpaqueMap은 내부적으로 IBackingMap 인터페이스와 OpaqueValue 클래스를 사용한다. OpaqueMap은 상태 데이터를 처리하기 위해 OpaqueValue 클래스를 사용하고, 실제 저장소로부터 상태 값을 조회하거나 저장소에 상태 값을 반영하기 위해 IBackingMap 구현체를 사용한다. 실제 OpaqueMap 인스턴스를 생성하는 메서드는 정적 팩토리 메서드로 다음과 같이 정의되어 있다.


public class OpaqueMap<T> implements MapState<T> {

    public static <T> MapState<T> build(IBackingMap<OpaqueValue> backing) {

        return new OpaqueMap<T>(backing);

    }


실제로 OpaqueMap을 사용하기 위해서 필요한 건 다음과 같다.

  • 상태 보관을 위해 IBackingMap 인터페이스를 알맞게 구현한 클래스를 만든다.
  • 앞서 만든 IBackingMap 구현 객체를 이용해서 생성한 OpaqueMap을 리턴해주는 StateFactory를 구현한다.
  • 집합 연산을 구현한다.
  • GroupedStream의 persistenceAggregate() 메서드를 이용해서 그룹핑 연산 결과를 상태에 반영한다.
    • Stream의 partitionPersist()를 사용할 경우, 알맞은 StateUpdater를 사용한다.

OpaqueMap용 "키:개수" 상태 보관하기 위한 IBackingMap 인터페이스를 구현한 클래스는 다음 코드처럼 만들어 볼 수 있다.


public class CountSumBackingMap implements IBackingMap<OpaqueValue> {

    private Map<String, CountValue> sumMap = new ConcurrentHashMap<>();


    @Override

    public List<OpaqueValue> multiGet(List<List<Object>> keys) {

        List<OpaqueValue> values = new ArrayList<>();

        for (List<Object> keyVals : keys) {

            String key = (String) keyVals.get(0);

            CountValue countValue = sumMap.get(key);

            if (countValue == null) {

                values.add(null);

            } else {

                values.add(new OpaqueValue<Long>(countValue.getCurrentTxId(),

                                 countValue.getCurrCount(), countValue.getPrevCount()));

            }

        }

        return values;

    }


    @Override

    public void multiPut(List<List<Object>> keys, List<OpaqueValue> vals) {

        for (int i = 0; i < keys.size(); i++) {

            String key = (String) keys.get(i).get(0);

            OpaqueValue<Long> val = vals.get(i);


            CountValue countValue = sumMap.get(key);

            if (countValue == null) {

                sumMap.put(key, new CountValue(val.getCurrTxid(), val.getPrev(), val.getCurr()));

            } else {

                countValue.update(val.getCurrTxid(), val.getPrev(), val.getCurr());

            }

        }

    }


}


IBackingMap을 구현하는 클래스는 multiGet() 메서드와 multiPut() 메서드 두 개만 알맞게 구현해주면 된다. 먼저, multiGet() 메서드는 파라미터로 키 목록을 전달받으며, 이 키에 해당하는 OpaqueValue 목록을 리턴해주면 된다. 위 코드의 multiGet() 메서드 메모리 맵인 sumMap에서 키에 해당하는 CountValue 객체를 가져온 뒤, 그 CountValue 객체로부터 OpaqueValue를 생성하고 있다.


CountValue는 (currTxId, prevVal, currVal)을 값으로 갖고 있으므로, sumMap은 결과적으로 (currTxId, key, prevVal, currVal)을 보관하는 장소가 된다. 이 예에서는 메모리 맵을 사용했지만, 실제로는 외부의 DBMS나 NoSQL 같은 곳에 보관된 데이터를 보관하고 읽어올 것이다.


OpaqueValue는 OpaqueMap의 동작 방식에서 설명했던 데이터 구조 중 키를 제외한 (트랜잭션ID, 이전값, 현재값)을 값으로 갖는다. 위 코드에서도 CountValue로부터 OpaqueValue를 생성할 때 트랜잭션ID, 이전값, 현재값을 가져와 생성했다.


스톰은 그룹 연산을 수행할 때 OpaqueMap은 내부적으로 다음과 같은 과정을 거친다.

  • IBackingMap의 multiGet()을 이용해서 (키: OpaqueValue) 목록을 구한다.
  • 앞서 구한 그룹 연산의 결과를 반영한 새로운 (키: OpaqueValue) 목록을 생성하고
  • 그 결과를 IBackingMap의 multiPut()을 이용해서 반영한다.

즉, multiPut() 메서드는 파라미터로 전달받은 (키: OpaqueValue) 목록을 이용해서 그 결과를 저장소에 반영하면 된다. 위 코드는 메모리 맵은 sumMap에 반영했지만, 실제 구현에서는 HBase나 MySQL과 같은 외부 저장소에 반영하게 될 것이다. 연산 결과를 반영할 때 주의할 점은 OpaqueMap이 제대로 동작하도록 (트랜잭션ID, 키, 이전값, 현재값)이 올바르게 적용되어야 한다는 점이다.


IBackingMap을 구현했다면 상태 생성을 위한 StateFactory를 구현하는 것이다. OpaqueMap을 위한 StateFactory는 다음과 같이 간단하게 구현할 수 있다.


public class CountSumStateFactory implements StateFactory {


    @SuppressWarnings("rawtypes")

    @Override

    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

        return OpaqueMap.build(new CountSumBackingMap());

    }


}


키 별로 개수를 구하는 집합 연산을 다음과 같이 구현해 볼 수 있다.


public class CountReducerAggregator implements ReducerAggregator<Long> {


    private static final long serialVersionUID = 1L;


    @Override

    public Long init() {

        return 0L;

    }


    @Override

    public Long reduce(Long curr, TridentTuple tuple) {

        return curr + 1L;

    }


}


이제 필요한 것은 다 만들었으므로, 다음과 같이 persistentAggregate() 메서드를 이용해서 그룹핑 연산 결과를 상태에 보관할 수 있게 된다.


TridentTopology topology = new TridentTopology();

topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .groupBy(new Fields("productId:time"))

        .persistentAggregate(

                new CountSumStateFactory(), // 상태 팩토리

                new CountReducerAggregator(), // 집합 연산

                new Fields("sum")) // 집합 연산 결과 필드

        .newValuesStream() // (productId:time, sum) 튜플 생성

        .each(new Fields("productId:time", "sum"), new ThresholdFilter())

        .each(new Fields("productId:time", "sum"), new AlertFilter())


그룹핑 연산이므로 그룹핑 기준 튜플의 값이 키가 되고, 집한 연산 결과가 값이 된다. 즉, "productId:time" 필드 값이 키가 되고 집합 연산 결과인 "sum" 필드가 값이 된다. 위 코드에서 한 가지 빠진 것이 있는데, 그것은 바로 StateUpdater이다. 앞서 partionPersist() 예제에서는 StateUpdater를 사용했는데, 위 예에서는 StateUpdater를 따로 구현하지 않고 있다. StateUpdater를 지정하지 않은 이유는 persistentAggregate() 메서드가 내부적으로 OpaqueMap에 알맞은 StateUpdater 구현체를 사용하고 있기 때문이다.


TransactionalMap 구현 방법: IBackingMap, TransactionalValue 사용


TransactionalMap을 사용하는 방법은 OpaqueMap을 사용하는 방법과 거의 동일하다. 둘 다 알맞은 IBackingMap 구현 클래스만 제공해주면 된다. 차이점이 있다면 TransactionalMap은 OpaqueValue 대신에 TransactionalMap을 사용한다는 것이다. 다음은 TransactionalMap을 위한 IBackingMap의 구현 예를 보여주고 있다.


public class CountSumBackingMap2 implements IBackingMap<TransactionalValue<Long>> {

    private Map<String, CountValue2> sumMap = new ConcurrentHashMap<>();


    @Override

    public List<TransactionalValue<Long>> multiGet(List<List<Object>> keys) {

        List<TransactionalValue<Long>> values = new ArrayList<>();

        for (List<Object> keyVals : keys) {

            String key = (String) keyVals.get(0);

            CountValue2 countValue = sumMap.get(key);

            if (countValue == null) {

                values.add(null);

            } else {

                values.add(

                    new TransactionalValue<Long>(countValue.getTxId(), countValue.getCount()));

            }

        }

        return values;

    }


    @Override

    public void multiPut(List<List<Object>> keys, List<TransactionalValue<Long>> vals) {

        for (int i = 0; i < keys.size(); i++) {

            String key = (String) keys.get(i).get(0);

            TransactionalValue<Long> val = vals.get(i);


            CountValue2 countValue = sumMap.get(key);

            if (countValue == null) {

                sumMap.put(key, new CountValue2(val.getTxid(), val.getVal()));

            } else {

                countValue.update(val.getTxid(), val.getVal());

            }

        }


    }


}


StateFactory는 동일한 방식으로 구현한다.


public class CountSumStateFactory2 implements StateFactory {


    private static final long serialVersionUID = 1L;


    @SuppressWarnings("rawtypes")

    @Override

    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

        return TransactionalMap.build(new CountSumBackingMap2());

    }


}


스톰 트라이던트 상태 구현 예

  • Memcached를 위한 State 구현: https://github.com/nathanmarz/trident-memcached
  • 소스 코드: https://github.com/madvirus/storm-sample


반응형

오늘 Ambari 1.6을 이용해서 HDP 2.1을 설치하는데, Application Timeline Serve가 올라오지 않아 서버에 들어가 로그 파일에 기록된 에러 메시지를 봤더니, 다음 클래스를 찾지 못한다는 에러 메시지가 출력되고 있었다.


2014-07-16 19:01:55,808 INFO  applicationhistoryservice.ApplicationHistoryServer (SignalLogger.java:register(91)) - registered UNIX signal handlers for [TERM, HUP, INT]

2014-07-16 19:01:55,960 INFO  service.AbstractService (AbstractService.java:noteFailure(272)) - Service org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer failed in state INITED; cause: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore not found

java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore not found

        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1927)

        at org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer.createTimelineStore(ApplicationHistoryServer.java:165)

        at org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer.serviceInit(ApplicationHistoryServer.java:80)

        at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)


약간의 구글링을 통해서 위 클래스 이름을 다음과 같이 변경해주어야 한다는 것을 알았다. Ambari 1.6-46, Hadoop 2.4.0.2.1 기준으로 YARN -> Config 에서 "yarn.timeline-service.store-class" 속성의 값을 변경해주면 된다.


org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore


예전에 받아 놓은 HDP 2.1 Sandbox 버전의 경우, 위 속성 값이 org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore로 설정되어 있는데, 하둡 버전은 2.4.0.2.1로 동일하다. (같은 하둡 버전인데, 클래스 이름이 다르다는 건 좀...)

반응형

스톰을 사용하는 이유 중 하나를 꼽아보자면, 병렬 처리를 들 수 있을 것 같다. 다중 노드에서 병렬로 연산을 함으로써 대량의 실시간 스트림 데이터를 처리하기 위해 만든 것이 스톰임을 생각해보면, 병렬 처리는 스톰의 주된 사용 이유일 것이다.


스톰의 기반 API의 경우, 아래 코드처럼 태스크의 개수와 쓰레드 개수 등을 설정해서 몇 개의 볼트를 동시에 실행할지 결정했다.


builder.setBolt("word-normalizer", new WordNormalizer(), 4) // 4개의 쓰레드

        .setNumTasks(8) // 8개의 작업 생성

        .shuffleGrouping("word-reader");


스톰의 병렬 힌트와 파티션


스톰 트라이던트 API는 직접 태스크 개수를 지정하는 방식을 사용하지 않고, 병렬 힌트를 주는 방식을 사용하고 있다. 다음은 병렬 힌트 코드의 예를 보여주고 있다.


topology.newStream("log", new LogSpout())

        .each(new Fields("logString"),  new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .parallelismHint(2);


위 코드에서 parallelismHint() 메서드를 볼 수 있는데, 이 메서드는 동시 실행 단위가 되는 파티션을 몇 개 생성할지 결정한다. 예를 들어, 위 코드는 다음과 같이 두 개의 타피션을 생성한다.



스톰은 각 파티션을 별도 쓰레드에서 실행한다. 워커 프로세스가 2개 이상일 경우, 별도 프로세스에 파티션이 실행된다. 파티션의 단위가 변경되는 것을 리파티션이라고 하는데, parallelismHint() 메서드에 의해 생성되는 파티션 적용 범위는 parallelismHint() 메서드 호출 이전에 리파티션이 일어나기 전까지다. 예를 들어, 다음 코드를 보자.


topology.newStream("log", new LogSpout())

        .parallelismHint(1)

        .shuffle()

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .parallelismHint(2);


위 코드를 보면 두 개의 parallelismHint() 메서드를 사용하고 있다. 한 번은 LogSpout 뒤에 설정했고, 한 번은 나머지 세 개 연산 뒤에 설정했다. 이 경우 두 번째 설정한 parallelismHint(2) 메서드의 적용 범위는 첫 번째 parallelismHint(1) 메서드의 이후가 된다. 이 경우 생성되는 파티션은 다음과 같다. 



파티션의 개수가 달라지는 리파티셔닝은 위 코드처럼 parallelismHint()의 개수가 다를 때 발생하며, groupBy()에 의해서도 발생한다. groupBy()는 기본적으로 모든 파티션에서 발생한 튜플을 한 곳으로 모아 그룹핑처리를 한다.


topology.newStream("log", new LogSpout()) // 1개 파티션

        .parallelismHint(1)

        .shuffle()

         // 2개 파티션

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new GroupingValueFunction(), new Fields("productId:time"))

        .parallelismHint(2)

        .groupBy(new Fields("productId:time")) // 1개 파티션

        .aggregate(new CountAggregator(), new Fields("count"))


리파티셔닝과 튜플 분배


파티션 크기가 변경되면, 파티션 간에 데이터를 어떻게 분배할지에 대해 결정해 주어야 한다. 예를 들어, 아래 코드는 LogSpout가 속한 파티션과 이후 세 개의 연산이 속한 파티션의 개수가 다르기 때문에 LogSpout가 생성한 튜플을 세 개의 파티션에 알맞게 분배해줘야 한다. 아래 코드에서는 라인 로빈 방식으로 분배하는 shuffle() 방식을 선택한다.


topology.newStream("log", new LogSpout())

        .parallelismHint(1)

        .shuffle()

        .each(new Fields("logString"), new OrderLogFilter())

        .each(...)

        .each(...)

        .parallelismHint(3)


트라이던트 API가 제공하는 분배 방식은 다음과 같다.

  • shuffle(): 라운드 로빈 방식으로 분배
  • partitionBy(Fields): 필드의 해시값을 이용해서 분배
  • broadcast(): 모든 파티션에 복사
  • global(): 모든 튜플을 한 파티션으로 보낸다.
  • batchGloabl(): 한 배치에 속한 튜플은 한 파티션으로 보낸다.
  • 커스텀 규칙: 직접 분배 규칙을 구현한다.

파티션 로컬 오퍼레이션


스트림에서 다음의 세 연산은 한 파티션에서 실행된다.

  • each(Function)
  • each(Filter)
  • partitionAggregate(Aggregator)
예를 들어, 아래 코드에서 병렬 힌트를 CountSumFunction 다음에 3으로 주긴 했지만, Function과 Filter는 기본적으로 같은 파티션에 포함되기 때문에, [CountSumFunction, ThresholdFilter, AlertFilter]는 한 파티션에 속하며, 따라서 이 쌍으로 세 개의 파티션이 생기게 된다.

.groupBy(new Fields("productId:time"))
.aggregate(new CountAggregator(), new Fields("count"))
.parallelismHint(1).partitionBy(new Fields("productId:time"))
.each(new Fields("productId:time", "count"), new CountSumFunction(), new Fields("sum"))
.parallelismHint(3)
.each(new Fields("productId:time", "sum"), new ThresholdFilter())
.each(new Fields("productId:time", "sum"), new AlertFilter());

파티션 로컬 오퍼레이션에 대해서 파티션을 분리하고 싶다면, 다음과 같이 명시적으로 병렬 힌트를 지정해 주어야 한다. 이 코드는 AlertFilter 다음에 병렬 힌트를 1로 주었다. 따라서, 아래 코드는 CountSumFunction만 속한 파티션 3개와 [ThresholdFilter, AlertFilter]가 속한 파티션 1개를 생성한다.

.groupBy(new Fields("productId:time"))
.aggregate(new CountAggregator(), new Fields("count"))
.parallelismHint(1).partitionBy(new Fields("productId:time"))
.each(new Fields("productId:time", "count"), new CountSumFunction(), new Fields("sum"))
.parallelismHint(3).shuffle()
.each(new Fields("productId:time", "sum"), new ThresholdFilter())
.each(new Fields("productId:time", "sum"), new AlertFilter())
.parallelismHint(1);

한 타피션은 물리적으로 한 볼트에 속한다. 예를 들어, 아래 코드를 보자.

topology.newStream("log", new LogSpout())
        .each(new Fields("logString"), new OrderLogFilter())
        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))
        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))
        .groupBy(new Fields("productId:time"))
        .aggregate(new Fields("productId:time"), new Count(), new Fields("count"))
        .each(new Fields("productId:time", "count"), new CountSumFunction(), new Fields("sum"))
        .each(new Fields("productId:time", "sum"), new ThresholdFilter())
        .each(new Fields("productId:time", "sum"), new AlertFilter());

위 코드에서 리파티셔닝을 하는 연산으로 groupBy()가 존재한다. 따라서, LogSpout부터 AddGroupingValueFunction 까지 파티션이 만들어지고, Count()부터 AlertFilter()까지 파티션이 만들어진다. 이 경우 스톰은 다음과 같이 Spout와 Bold를 구성한다.


참고자료

관련 글:


  1. 무야호 2021.08.06 14:18

    트라이덴트 기본 함수들을 어떻게 사용하는지 몰랐는데 이렇게 자세하게 설명해 주셔서 감사합니다!

+ Recent posts