텐서플로우 윈도우 버전이 나와서 윈도우 8.1 환경에 설치하고 테스트를 해 봤다.


설치 과정에서 참고한 자료는 다음과 같다.

CUDA, cuDNN 설치

GPU 관련 설치 과정을 요약하면 다음과 같다.
  • https://developer.nvidia.com/cuda-downloads 사이트에서 CUDA Toolkit 8 버전 다운받아 설치한다. 용량이 1G를 넘으니 인내심이 필요하다. 기본 설치 경로는 C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v8.0이고, 설치가 끝나면 PATH에 관련 경로가 추가된다.
  • https://developer.nvidia.com/cudnn 사이트에서 cuDNN 다운받는다. 회원 가입을 해야 다운로드 받을 수 있다.  다운 받은 파일을 C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v8.0에 압축풀면 된다.
아나콘다(Anaconda)와 텐서플로우(Tensorflow) 설치

아나콘다를 이용해서 파이선 3.5를 설치했다. https://www.continuum.io/downloads 사이트에서 아나콘다를 다운 받아 설치하면 된다.

GPU 버전 텐서플로우와 CPU 버전의 텐서플로우를 테스트하기 위해 두 개의 가상 환경을 만들었다. 먼저 GPU 버전 텐서플로우를 위한 환경을 만들고 설치에 사용한 명령어는 다음과 같다.

C:\>conda create -n tensorflow-gpu python=3.5
C:\>activate tensorflow-gpu
(tensorflow-gpu) C:\>pip install tensorflow-gpu

다른 창을 열어 CPU 버전 텐서플로우 환경을 생성했다.

C:\>conda create -n tensorflow-cpu python=3.5
C:\>activate tensorflow-cpu
(tensorflow-cpu) C:\>pip install tensorflow

간단한 성능 확인

간단하게 성능을 비교하기 위해 '텐서플로 첫걸음' 책의 CNN 예제를 돌려봤다. 참고로 PC 환경은 다음과 같다. 장비1은 GPU가 있고, 장비2에는 GPU가 없다.
  • 장비1
    • CPU: Intel Core i7-4712MQ CPU @ 2.30 GHz
    • RAM: 16G
    • GPU: GeForce 840M 5.0, memoryClockRate (GHz) 1.124, memory 2.0GiB
  • 장비2
    • CPU: Intel Core i7-6700 CPU @ 3.40 GHz x 8
학습을 1000회 실행하는데, 50회마다 시간을 출력하고 학습이 끝나면 시간을 출력했다. GPU와 CPU에 대해 각 3회씩 실행하고 그 평균을 기록했다.

step GPU (초)
장비1
CPU (초)
장비1

장비1 CPU/GPU (시간 비율)

 CPU2

장비2

0 1.119 0.056

 0.028

50

3.004

7.751 2.58 3.646 
100 4.817 15.417 3.20 7.282 
150 6.632 23.008 3.47 10.922 
200 8.445 30.666 3.63 14.557 
250 10.261 38.283 3.73 18.212 
300 12.077 45.845 3.80 21.833 
350 13.891 53.471 3.85 25.491 
400 15.704 61.094 3.89 29.110 
450 17.518 68.672 3.92 32.755 
500 19.333 76.37 3.95

36.380 

550 21.15 83.973 3.97 40.023 
600 22.964 91.584 3.99 43.640 
650 24.777 99.26 4.01 47.275 
700 26.593 106.839 4.02 50.911 
750 28.409 114.4 4.03 54.559 
800 30.222 122.071 4.04 58.191 
850 32.036 129.627 4.05 61.825 
900 33.852 137.188 4.05 65.461 
950 35.666 144.953 4.06 69.089 
전체 37.471 152.721 4.08

72.702 


실행 결과를 보면 장비1에서 GPU를 사용한 결과가 CPU를 사용한 결과보다 4배 정도 빠른 것을 알 수 있다. 또한, 장비2의 CPU 결과보다도 2배 정도 빠른 것을 알 수 있다.


GPU와 OOM 주의 사항


GPU로 예제를 돌릴 때 다음 코드에서 OOM when allocating tensor with shape[10000,32,28,28]  에러가 발생했다.


print("test accuracy %g"% sess.run(accuracy, feed_dict={

     x: mnist.test.images, y_: mnist.test.labels, keep_prob: 1.0}))


에러가 발생하는 이유는 GPU 메모리가 부족하기 때문이다. 이를 처리하는 가장 쉬운 방법은 mnist.test.images 전체가 아닌 일부만 구해서 정확도를 구하는 것이다.


for i in range(200):

    testSet = mnist.test.next_batch(50)

    print("test accuracy %g" % sess.run(accuracy, 

            feed_dict={ x: testSet[0], y_: testSet[1], keep_prob: 1.0}))



* CNN 테스트에 사용한 코드: https://github.com/madvirus/tfstudy/blob/master/nn/cnn.py



기존 access 로그를 이용해서 데이터를 추출해야 할 일이 생겨서 ELK를 사용했다. 앞으로도 비슷한 요구가 생기면 유사한 설정, 코드를 사용할 것 같아 기록으로 남긴다.


로그스태시(logstash) 설정 파일


여러 웹 어플리케이션의 access 로그 파일을 로그스태시를 이용해서 일레스틱서치에 밀어 넣었다. 각 웹 어플리케이션마다 미세하게 파싱해야 하는 내용이 달라, 로그스태시 설정 파일을 웹 어플리케이션 별로 하나씩 만들었다. 다음은 그 파일 중 하나이다.


input {

  stdin {

    type => "web"

    add_field => {

      appname => "myappname"

    }

  }

}


filter {

  grok {

    match=> { message => "%{IPORHOST:clientip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{URIPATH:uripath}(?<query>(\?\S+)?)(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response} (?:%{NUMBER:bytes}|-)"}

  }

  date {

    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]

  }

  grok {

    match => {

      uripath => "(/im/(?<context>(ui))/\S*)|(/gcs/(?<context>(\S+))/.*)"

    }

  }

  mutate {

    remove_field => ["query", "uripath"]

    add_field => {

      "methoduri" => "%{verb} %{uripath}"

    }

  }

  metrics {

    meter => "documents"

    add_tag => "metric"

    flush_interval => 60

  }

}


output {

  if "_grokparsefailure" not in [tags] {

    elasticsearch {

      hosts => "일레스틱서버주소"

      flush_size => 20000

    }

  }


  if "metric" in [tags] {

    stdout {

      codec => line {

        format => "1m rate: %{[documents][rate_1m]} ( %{[documents][count]} )"

      }

    }

  }

}


설정에서 특이한 점은 다음과 같다.
  • input의 stdin: 기존 로그 파일을 cat 명령어를 이용해서 logstash에 전달할거라서 stdin을 입력으로 설정했다.
  • filter의 첫 번째 grok: match의 message 패턴으로 로그스태시가 기본 제공하는 HTTPD_COMMONLOG 패턴을 사용하지 않은 이유는 요청 경로에서 쿼리 문자열과 요청 URI를 분리할 필요가 있었기 때문이다. 참고로 HTTPD_COMMONLOG 패턴은 쿼리문자열을 포함한 요청 경로를 request 필드로 추출한다.
  • filter의 date: 로그의 timestamp 필드를 이벤트의 @timestamp 값으로 사용한다. date를 사용하지 않으면 로그스태시가 로그 파일을 넣는 시점을 @timestamp 값으로 사용하기 때문에 기간별 로그 분석을 하기 어려워진다.
  • filter의 mutate: 요청 방식과 요청 URI를 합쳐서 하나로 분석해야 해서 이 둘을 연결한 값을 새로운 필드로 추가한다.
  • filter의 metrics: 로그스태시가 얼마나 처리하는지 보기 위해 메트릭 값을 60초마다 생성한다.
  • output의 elasticsearch: flush_size 값은 일레스틱서치 서버의 상황을 고려해 설정한다. 1000~5000 사이에서 시작해서 테스트하면서 점진적으로 늘려서 처리속도를 높여나간다.
  • output의 stdout: metrics가 생성한 메트릭 값을 콘솔에 출력한다. 초당 몇 개씩 처리하는지 1분 주기로 콘솔에 찍혀서 그나마 덜 심심하다.

로그 밀어넣기 위한 실행 스크립트


기존 로그 파일을 특정 폴더에 모아두고, 그 파일을 로그스태시로 일레스틱서치에 쭈욱 밀어 넣었다. 파일 하나하나 수동으로 하면 힘드니까 다음의 쉘 파일을 만들어서 실행했다.


#!/bin/bash

FILES="./로그폴더/*.log"

for f in $FILES

do

  echo "$f start"

  SECONDS=0

  cat $f | ../logstash/bin/logstash -b 2500 --config app1.logstash.conf

  duration=$SECONDS

  echo "$f done [$(($duration / 60))m $(($duration % 60))s elapsed]"

  echo "---"

  sleep 3

done


logstash의 -b 옵션은 프로세스 파이프라인 배치 크기이다. 파이프라인 워커 개수는 기본이 8이다. 워커개수*배치크기를 elasticsearch의 flush_size와 맞췄다.

테스트하는 동안 심심함도 달래고 성능도 확인할 겸 로그 파일 한 개를 처리하는데 걸리는 시간을 측정해서 값을 출력하도록 했다.


실행 결과


작성한 쉘 스크립트를 돌리면 콘솔에 다음과 같이 시간 값이 찍혀서 덜 심심하다.


$ ./logdump.app1.sh

...

./로그폴더/access_2016_04_06.log start

Settings: Default pipeline workers: 8

Pipeline main started

1m rate: 6336.785481508631 ( 407172 )

1m rate: 6937.548787233175 ( 856873 )

Pipeline main has been shutdown

stopping pipeline {:id=>"main"}

./misweb_in1_rest/access_2016_04_06.log done [2m 17s elapsed]

...

...


일단 1-2개 로그 파일로 작업해보고 퇴근 전에 nohup을 걸어서 나머지를 밀어 넣었다.


$ nohup ./logdump.app1.sh > dump.log 2>&1 &



요즘 스파크를 공부중인데, 독립 모드 클러스터를 설치하고 테스트하고 싶어졌다. 마음대로 쓸 수 있는 장비가 없어서 Vagrant를 이용하기로 했다. Vagrant로 리눅스 VM 4개를 생성하고, 4대에 스파크를 설치해서 독립 모드로 클러스터를 만들어봤다.


이 글은 Virtualbox와 Vagrant를 이미 설치했다는 가정하에 진행한다.


1. 준비


Vagrant 설정 파일이 위치할 폴더를 생성한다. 이 글에선 c:\work\spark-vagrant 폴더를 사용한다. 이 폴더를 만들었다면 다음의 두 파일을 c:\work\spark-vagrant\programs 폴더에 다운로드한다.

  • 리눅스용 JDK 1.8 - 이 글에서 다운로드 받은 파일은 jdk-8u73-linux-x64.gz이다.
  • 스파크 1.6.0 - 이 글에서 다운로드 받은 파일은 spark-1.6.0-bin-hadoop2.6.tgz이다.


2. 4개 VM을 위한 Vagrant 설정


Vagrant를 이용해서 VM을 생성할 때 필요한 파일을 만든다.


먼저 vagrant의 insecure_private_key 파일을 복사한다. 이 파일은 ssh 연결 용으로 사용한다.


copy c:\Users\로그인계정\.vagrant.d\insecure_private_key c:\work\spark-vagrant


다음으로 c:\work\spark-vagrant\hosts 파일을 생성한다.


127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4

::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.67.101 c6701.spark.apache.org c6701

192.168.67.102 c6702.spark.apache.org c6702

192.168.67.103 c6703.spark.apache.org c6703

192.168.67.104 c6704.spark.apache.org c6704

192.168.67.105 c6705.spark.apache.org c6705

192.168.67.106 c6706.spark.apache.org c6706

192.168.67.107 c6707.spark.apache.org c6707

192.168.67.108 c6708.spark.apache.org c6708

192.168.67.109 c6709.spark.apache.org c6709

192.168.67.110 c6710.spark.apache.org c6710


192.168.67.101부터 192.168.67.110까지 설정은 본인 C:\Windows\System32\drivers\etc\hosts 파일에도 동일하게 추가한다. 


c:\work\spark-vagrant\resolv.conf 파일을 다음과 같이 생성한다.


; generated by /sbin/dhclient-script

search spark.apache.org

nameserver 8.8.8.8


c:\work\spark-vagrant\bootstrap.sh 파일을 다음과 같이 생성한다.


#!/usr/bin/env bash


cp /vagrant/hosts /etc/hosts

cp /vagrant/resolv.conf /etc/resolv.conf

yum install ntp -y

service ntpd start

service iptables stop

chkconfig ntpd on

chkconfig iptables off

mkdir -p /root/.ssh; chmod 600 /root/.ssh; cp /home/vagrant/.ssh/authorized_keys /root/.ssh/

cp /vagrant/insecure_private_key /root/.ssh/id_rsa; chmod 600 /root/.ssh/id_rsa


#Again, stopping iptables

/etc/init.d/iptables stop


# Increasing swap space

sudo dd if=/dev/zero of=/swapfile bs=1024 count=3072k

sudo mkswap /swapfile

sudo swapon /swapfile

echo "/swapfile       none    swap    sw      0       0" >> /etc/fstab


마지막으로 c:\work\spark-vagrant\Vagrantfile 파일을 작성한다.


# -*- mode: ruby -*-

# vi: set ft=ruby :


# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!

VAGRANTFILE_API_VERSION = "2"


Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|

  config.vm.box = "centos6.7"

  config.ssh.insert_key = false


  # CentOS 6.7 x86_64, 아래는 한 줄임

  config.vm.box_url = "https://github.com/CommanderK5/packer-centos-template/releases/download/0.6.7/vagrant-centos-6.7.box"


  config.vm.provider :virtualbox do |vb|

    vb.customize ["modifyvm", :id, "--memory", 2048] # RAM allocated to each VM

  end


  config.vm.provision :shell, :path => "bootstrap.sh"


  config.vm.define :c6701 do |c6701|

    c6701.vm.hostname = "c6701.spark.apache.org"

    c6701.vm.network :private_network, ip: "192.168.67.101"

  end


  config.vm.define :c6702 do |c6702|

    c6702.vm.hostname = "c6702.spark.apache.org"

    c6702.vm.network :private_network, ip: "192.168.67.102"

  end


  config.vm.define :c6703 do |c6703|

    c6703.vm.hostname = "c6703.spark.apache.org"

    c6703.vm.network :private_network, ip: "192.168.67.103"

  end


  config.vm.define :c6704 do |c6704|

    c6704.vm.hostname = "c6704.spark.apache.org"

    c6704.vm.network :private_network, ip: "192.168.67.104"

  end


end



3. VM 실행


명령프롬프트를 실행하고 c:\work\spark-vagrant 폴더로 이동해서 vagrant 명령어로 VM을 실행한다.


C:\work\spark-vagrant>vagrant up


4개 VM을 모두 만들었다면 vagrant ssh 명령어로 각 VM에 ssh로 연결되는지 확인한다.


C:\work\spark-vagrant>vagrant ssh c6701

[vagrant@c6701 ~]$ exit

logout

Connection to 127.0.0.1 closed.


C:\work\spark-vagrant>vagrant ssh c6702

[vagrant@c6702 ~]$ exit

logout

Connection to 127.0.0.1 closed.


c6701, c6702, c6703, c6704에 대해 각각 확인한다.


4. 각 VM에 암호 없이 ssh 연결할 수 있는지 확인


앞서 작성한 bootstrap.sh 파일을 보면 insecure_private_key 파일을 root 계정의 인증키로 복사한 것을 알 수 있다. 이를 한 이유는 root 계정으로 c6701, c6702, c6703, c6704 간에 암호 없이 ssh로 연결하기 위함이다. 다음과 같이 su 명령어로 root 계정으로 바꾼 뒤, ssh를 이용해서 다른 호스트에 암호 없이 연결되는지 확인한다.


C:\work\spark-vagrant>vagrant ssh c6701

Last login: Tue Mar  8 05:48:27 2016 from 10.0.2.2

[vagrant@c6701 ~]$ su -

Password: (암호는 vagrant)

[root@c6701 ~]# ssh c6702

The authenticity of host 'c6702 (192.168.67.102)' can't be established.

RSA key fingerprint is 5c:97:4b:96:a2:41:a8:44:cc:70:b1:5e:8d:a7:a5:3b.

Are you sure you want to continue connecting (yes/no)? yes

Warning: Permanently added 'c6702,192.168.67.102' (RSA) to the list of known hosts.

[root@c6702 ~]#



5. 각 VM에 JDK와 스파크 설치


c:\work\spark-vagrant 폴더는 /vagrant로 마운트되므로, 다음 명령어를 다운로드 받은 JDK 파일의 압축을 /usr/local에 푼다.


[root@c6701 ~]# ssh c6701 'tar xzf /vagrant/programs/jdk-8u73-linux-x64.gz -C /usr/local'

[root@c6701 ~]# ssh c6702 'tar xzf /vagrant/programs/jdk-8u73-linux-x64.gz -C /usr/local'

[root@c6701 ~]# ssh c6703 'tar xzf /vagrant/programs/jdk-8u73-linux-x64.gz -C /usr/local'

[root@c6701 ~]# ssh c6704 'tar xzf /vagrant/programs/jdk-8u73-linux-x64.gz -C /usr/local'


압축을 풀었다면, java가 제대로 설치됐는지 확인한다.


[root@c6701 ~]# /usr/local/jdk1.8.0_73/bin/java -version

java version "1.8.0_73"

Java(TM) SE Runtime Environment (build 1.8.0_73-b02)

Java HotSpot(TM) 64-Bit Server VM (build 25.73-b02, mixed mode)


비슷한 방식으로 스파크 파일의 압축을 /usr/local에 푼다.


[root@c6701 ~]# ssh c6701 'tar xzf /vagrant/programs/spark-1.6.0-bin-hadoop2.6.tgz -C /usr/local'

[root@c6701 ~]# ssh c6702 'tar xzf /vagrant/programs/spark-1.6.0-bin-hadoop2.6.tgz -C /usr/local'

[root@c6701 ~]# ssh c6703 'tar xzf /vagrant/programs/spark-1.6.0-bin-hadoop2.6.tgz -C /usr/local'

[root@c6701 ~]# ssh c6704 'tar xzf /vagrant/programs/spark-1.6.0-bin-hadoop2.6.tgz -C /usr/local'


6. spark-env.sh 파일 작성


각 VM(c6701, c6702, c6703, c6704)에서 /usr/local/spark-1.6.0-bin-hadoop2.6/conf 폴더로 이동한 뒤 spark-env.sh 파일을 작성한다. 먼저 spark-env.sh.template 파일을 복사해서 spark-env.sh 파일을 생성한다.


[root@c6701 ~]# cd /usr/local/spark-1.6.0-bin-hadoop2.6/conf

[root@c6701 conf]# cp spark-env.sh.template spark-env.sh


spark-env.sh 파일의 맨 마지막에 JAVA_HOME 환경변수를 추가한다.


# spark-env.sh 파일 마지막에 추가한다.

JAVA_HOME=/usr/local/jdk1.8.0_73


c6701에 spark-env.sh을 만든 뒤, 다음과 같이 scp를 이용해서 복사해도 된다.


[root@c6701 conf]# scp spark-env.sh root@c6702:/usr/local/spark-1.6.0-bin-hadoop2.6/conf

spark-env.sh     100% 4243     4.1KB/s   00:00



7. slaves 파일 작성


slaves 파일은 마스터 역할을 할 c6701 장비에만 작성한다. /usr/local/spark-1.6.0-bin-hadoop2.6/conf 위치에 다음과 같이 slaves 파일을 생성한다.


c6702

c6703

c6704



8. 스파크 단독 모드 클러스터 실행


자, 이제 클러스터를 실행할 차례이다. c6701의 /usr/local/spark-1.6.0-bin-hadoop2.6 에서 sbin/start-all.sh을 이용해서 마스터와 워커를 실행한다.


[root@c6701 spark-1.6.0-bin-hadoop2.6]# sbin/start-all.sh

starting org.apache.spark.deploy.master.Master, logging to /usr/local/spark-1.6.0-bin-hadoop2.6/logs/spark-root-org.apache.spark.deploy.master.Master-1-c6701.spark.apache.org.out

c6702: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark-1.6.0-bin-hadoop2.6/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-c6702.spark.apache.org.out

c6704: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark-1.6.0-bin-hadoop2.6/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-c6704.spark.apache.org.out

c6703: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark-1.6.0-bin-hadoop2.6/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-c6703.spark.apache.org.out

[root@c6701 spark-1.6.0-bin-hadoop2.6]#


마스터와 워커를 실행했으니 이제 확인할 차례다. http://c6701.spark.apache.org:8080 주소를 웹 브라우저에 입력해보자. 잠시 후 다음과 같은 결과 화면을 볼 수 있을 것이다.



9. 스파크 클러스터 종료와 vagrant 종료


sbin/stop-all.sh을 사용하면 스파크 클러스터를 종료한다.


VM에서 로그아웃한 뒤, 명령 프롬프트에서 vagrant halt 명령어를 실행하면 전체 VM을 종료한다.






러닝 스파크(Learning Spark) 책의 2장을 따라하는데, 윈도우에서 spark-shell을 실행하면 다음과 같은 이상한 에러가 발생한다.


C:\devtool\spark-1.6.0-bin-hadoop2.6>bin\spark-shell

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties

To adjust logging level use sc.setLogLevel("INFO")

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0

      /_/


Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_51)

...

16/02/17 13:09:58 WARN : Your hostname, user resolves to a loopback/non-reachable address: fe80:0:0:0:35a2:f2cc:2a09:416

e%eth8, but we couldn't find any external IP address!

java.lang.RuntimeException: java.lang.RuntimeException: java.io.IOException: 액세스가 거부되었습니다

        at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)

        at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:194)

        ...(너무 길어 생략)

Caused by: java.io.IOException: 액세스가 거부되었습니다

        at java.io.WinNTFileSystem.createFileExclusively(Native Method)

        at java.io.File.createTempFile(File.java:2024)

        at org.apache.hadoop.hive.ql.session.SessionState.createTempFile(SessionState.java:818)

        at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:513)

        ... 62 more


<console>:16: error: not found: value sqlContext

         import sqlContext.implicits._

                ^

<console>:16: error: not found: value sqlContext

         import sqlContext.sql

                ^


scala>


이 에러가 발생하지 않도록 하려면 다음의 두 가지를 해 주면 된다.

  1. winutils.exe 파일을 c:\hadoop\bin에 복사한다. c:\haoop을 HADOOP_HOME 환경 변수로 설정한다. (winutils.exe 파일은 https://github.com/steveloughran/winutils/tree/master/hadoop-2.6.0/bin 에서 다운로드 받을 수 있다)
  2. 관리자 권한으로 명령 프롬프트를 열고 "C:\Hadoop\bin\winutils.exe chmod 777 /tmp/hive" 명령어를 실행한다.

이제 관리자 권한으로 실행한 명령 프롬프트에서 bin\spark-shell을 실행한다. 다음과 같이 정상으로 실행되는 것을 확인할 수 있다.


C:\devtool\spark-1.6.0-bin-hadoop2.6>bin\spark-shell

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties

To adjust logging level use sc.setLogLevel("INFO")

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0

      /_/


Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_51)

Type in expressions to have them evaluated.

Type :help for more information.

Spark context available as sc.

16/02/17 13:20:05 WARN General: Plugin (Bundle) "org.datanucleus" is already registered. Ensure you dont have multiple J

...생략

16/02/17 13:20:12 WARN : Your hostname, user resolves to a loopback/non-reachable address: fe80:0:0:0:35a2:f2cc:2a09:416

e%eth8, but we couldn't find any external IP address!

...생략

16/02/17 13:20:18 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException

SQL context available as sqlContext.


scala> val lines = sc.textFile("README.md")

lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:27


scala>




암바리(https://ambari.apache.org/)는 오픈소스 하둡 관리 도구이다. 설치부터, 설정 관리, 오퍼레이션, 모니터링, 통지 등 하둡 클러스터를 관리하는데 필요한 기본 기능을 제공한다. 암바리를 사용하다보면 레거시 환경의 통지 시스템을 이용해서 암바리의 문제 상황을 문자나 메신저 등으로 통지 받고 싶을 때가 있다. 하지만 아쉽게도 암바리는 이메일과 SNMP 트랩만 이용해서 문제를 통지하기 때문에, 암바리 Alert을 레거시 환경에 바로 붙일 수 없다.


왓츠업과 같이 SNMP를 지원하는 도구를 이용해서 이미 모니터링을 하고 있다면 이를 사용하면 된다. SNMP 기반의 모니터링 도구가 없거나 적합하지 않은 경우에는 암바리가 발생하는 SNMP 트랩을 직접 수신해서 처리하는 방식으로 레거시와 연동을 처리해야 한다.


[암바리 SNMP Alert 설정 화면]


암바리의 Alert SNMP 트랩 구성


암바리는 Alert 발생시 SNMP Alert 설정 화면에서 입력한 정보를 이용해서 트랩을 발생시킨다.

  • OID : 1.3.6.1.6.3.1.1.5.4와 같은 OID 문자열
  • Hosts : 트랩을 수신할 SNMP 트랩 리시버 서버의 호스트
  • Port : 리시버 서버가 사용할 포트 번호 (UDP 포트)
OID 값으로 "1.3.6.1.6.3.1.1.5.4"를 입력한 경우 트랩은 다음과 같이 구성된다.
  • OID(1.3.6.1.6.3.1.1.4.1.0), OBJECT IDENTIFIER[6] = 1.3.6.1.6.3.1.1.5.4
  • OID(1.3.6.1.6.3.1.1.5.4), OCTET STRING[4] = 상세메시지(body)
  • OID(1.3.6.1.6.3.1.1.5.4), OCTET STRING[4] = 요약메시지(subject)

상세 메시지는 몸체 내용에 해당하며, 다음과 같은 구조의 메시지를 갖는다. (마지막 줄에 공백 문자가 존재한다.)


\n

[Alert] Supervisor Process\n

[Service] STORM\n

[Component] SUPERVISOR\n

[Host] skt-phddtn3\n

\n

TCP OK - 0.000s response on port 56431\n

    


요약 메시지는 다음과 같은 구조를 갖는다.(마지막 줄에 공백 문자가 존재한다.)


\n

      [OK] Supervisor Process\n

    


요약 메시지는 Alert의 심각도로 시작한다. 심각도에는 OK, WARNING, CRITICAL, UNKNOWN이 존재한다.


트랩 리시버는 OID로부터 값을 읽어와 메시지를 알맞게 변환한 뒤 심각도에 따라 알맞은 처리를 하면 된다. 예를 들어, 심각도가 CRITICAL이면 SMS로 문자를 발송하고, WARNING이면 메신저로 메시지를 보내는 등의 작업을 하면 된다.


SNMP4J를 이용한 암바리 트랩 수신


트랩을 받기 위한 SNMP 도구가 따로 없다면 직접 트랩 리시버를 구현해야 한다. 다행히 SNMP 자체를 잘 몰라도 SNMP4J(http://www.snmp4j.org/)를 사용하면 쉽게 암바리 Alert 처리를 위한 트랩 리서버를 구현할 수 있다. 다음은 SNMP4J를 이용한 트랩 리시버를 구현한 코드 예이다.


public class TrapReceiver {

    private int port;

    private MultiThreadedMessageDispatcher dispatcher;

    private Snmp snmp = null;

    private Address listenAddress;

    private ThreadPool threadPool;


    private NotificationHandler notificationHandler;

    private int threadPoolSize;


    public TrapReceiver(int port) {

        this.port = port;

        this.threadPoolSize = 10;

    }


    public void init() throws IOException {

        threadPool = ThreadPool.create("Trap", threadPoolSize);

        dispatcher = new MultiThreadedMessageDispatcher(

              threadPool, new MessageDispatcherImpl());

        listenAddress = GenericAddress.parse("udp:0.0.0.0/" + port);

        TransportMapping<?> transport = 

                new DefaultUdpTransportMapping((UdpAddress) listenAddress);


        snmp = new Snmp(dispatcher, transport);

        snmp.getMessageDispatcher().addMessageProcessingModel(new MPv1());

        snmp.getMessageDispatcher().addMessageProcessingModel(new MPv2c());


        snmp.listen();


        snmp.addCommandResponder(new CommandResponder() {

            @Override

            public void processPdu(CommandResponderEvent event) {

                handleNotification(event);

            }

        });

    }


    private void handleNotification(CommandResponderEvent event) {

        if (notificationHandler == null) {

            return;

        }

        notificationHandler.handle(createNotification(event));

    }


    private Notification createNotification(CommandResponderEvent event) {

        OID trapOID = (OID) event.getPDU().getVariable(SnmpConstants.snmpTrapOID);

        List<String> values = new ArrayList<>(2);

        Vector<? extends VariableBinding> varBinds = event.getPDU().getVariableBindings();

        if (varBinds != null && !varBinds.isEmpty()) {

            Iterator<? extends VariableBinding> varIter = varBinds.iterator();

            while (varIter.hasNext()) {

                VariableBinding var = varIter.next();

                if (var.getOid().equals(trapOID)) {

                    values.add(var.getVariable().toString());

                }

            }

        }

        return new Notification(values.get(0), values.get(1));

    }


    public void close() {

        try {

            snmp.close();

        } catch (IOException e) {

        }

        threadPool.stop();

    }


    public void setNotificationHandler(NotificationHandler notificationHandler) {

        this.notificationHandler = notificationHandler;

    }

}


이 코드에서 createNotification() 메서드가 트랩 메시지를 처리하는 코드이다. 암바리가 생성하는 트랩은 지정한 OID에 해당하는 첫 번째 변수에 상세 내용(몸체), 두 번째 변수에 요약 내용(제목)을 메시지로 전달한다. 따라서, 지정 OID를 갖는 메시지 값을 차례대로 저장한 뒤 각각 순서대로 몸체와 제목으로 사용하면 된다. 제목은 [OK], [CRITICAL]는 심각도를 포함하고 있으므로 이 값을 이용해서 트랩을 어떻게 처리할지 결정할 수 있다.


몸체 메시지에는 서비스 종류, 호스트 정보 등이 포함되어 있으므로 이 정보를 사용해서 서비스 담당자별로 통지하거나 장비 담당자에게 메시지를 보내는 것과 같은 처리를 할 수 있다.


스톰은 토폴로지가 메트릭을 만들 때 필요한 API를 제공하고 있다. 이 API를 사용하면 내가 만든 토폴로지에서 원하는 메트릭을 발생하고 이를 쉽게 처리할 수 있다.


IMetric으로 토폴로지에서 메트릭 발생하기


토폴로지에서 메트릭을 발생시키는 방법은 다음과 같이 간단하다.

  1. 메트릭을 발생시키고 싶은 Bolt의 prepare() 메서드 또는 Spout의 open() 메서드에서 IMetric을 구현한 객체를 생성하고 메트릭으로 등록한다.
  2. 1에서 생성한 IMetric 객체를 이용해서 필요할 때 메트릭 값을 조정한다.

다음은 스톰 스타터 샘플에 있는 Bold에 메트릭 생성 기능을 추가한 예이다.


public class ExclamationBolt extends BaseRichBolt {

    OutputCollector _collector;


    transient CountMetric countMetric;

    transient MultiCountMetric wordCountMetric;


    @Override

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

        _collector = collector;


        // IMetric을 구현한 객체를 생성하고

        countMetric = new CountMetric();

        wordCountMetric = new MultiCountMetric();


        // context에 메트릭을 등록

        context.registerMetric("word_count", wordCountMetric, 10);

        context.registerMetric("execute_count", countMetric, 20);

    }


    @Override

    public void execute(Tuple tuple) {

        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));

        _collector.ack(tuple);


        String word = tuple.getString(0);


        // 튜플 처리할 때 마다 원하는 메트릭 값을 조정

        countMetric.incr();

        wordCountMetric.scope(word).incr();

    }


    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("word"));

    }


}


이 코드에서 context.registerMetric() 메서드의 파라미터는 순서대로 메트릭 이름, 메트릭 객체, 메트릭 버킷 크기를 의미한다. 메트릭 버킷 크기는 초 단위이다. 위 코드에서 "word_count" 메트릭 이름은 10초 메트릭 버킷 크기를 설정했는데, 이 경우 스톰은 10초 마다 해당 메트릭 객체로부터 값을 읽어오고 메트릭을 리셋한다.(정확하게는 10초 마다 메트릭 객체의 getValueAndReset() 메서드를 호출한다.)


예를 들어, 0-10초 사이에 countMetric.incr()을 4번 호출했고, 10-20초 사이에 countMetric.incr()을 5번 호출했다고 하자. 이 경우 스톰이 읽어가는 메트릭 값은 10초에 4이고 20초에 5가 된다.


IMetricsConsumer로 메트릭 수집하기


스톰은 메트릭 객체에서 값을 읽어와 그 값을 IMetricsConsumer에 전달한다. 즉, 스톰 토폴로지가 발생한 메트릭을 알맞게 처리하고 싶다면 IMetricsConsumer를 구현한 클래스를 작성하면 된다.


IMetricsConsumer 인터페이스는 다음과 같이 정의되어 있다.


public interface IMetricsConsumer {

    void prepare(Map stormConf, Object registrationArgument, 

            TopologyContext context, IErrorReporter errorReporter);

    void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints);

    void cleanup();

}


각 메서드는 다음과 같다.

  • prepare() : DB나 소켓 연결처럼 메트릭 처리에 필요한 사전 준비를 한다.
  • handleDataPoints() : 토폴로지에서 읽어온 메트릭을 처리한다.
  • cleanup() : DB나 소켓 등 종료 처리를 한다.

토폴로지 이름이 필요한 경우 prepare() 메서드에서 다음과 같은 방법으로 토폴로지 이름을 구할 수 있다.


import backtype.storm.Config;

import backtype.storm.metric.api.IMetricsConsumer;

import backtype.storm.task.IErrorReporter;

import backtype.storm.task.TopologyContext;


public class MetricsCollector implements IMetricsConsumer {

    public static final Logger LOG = LoggerFactory.getLogger(MetricsCollector.class);;


    private String topologyName;


    private DwmonMetricSender metricSender;


    @Override

    public void prepare(Map stormConf, Object registrationArgument, 

            TopologyContext context, IErrorReporter errorReporter) {

        LOG.info("Preparing Metric Collector");


        topologyName = (String) stormConf.get(Config.TOPOLOGY_NAME);

    }


handleDataPoints()에서 메트릭 값 처리하기


스톰은 토폴로지가 등록한 메트릭에서 주기적으로 값을 읽어와 IMetricsConsumer#handleDataPoints() 메서드에서 전달하므로, 이 메서드에서 메트릭을 알맞게 처리하면 된다.


public class MetricsCollector implements IMetricsConsumer {


    @Override

    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {

        // taskInfo.srcWorkerHost: String - 워커 호스트

        // taskInfo.srcWorkerPort: int - 워커 포트 

        // taskInfo.srcComponentId: String - 컴포넌트 ID

        // taskInfo.srcTaskId: int - 태스크 ID

        // taskInfo.timestamp: long - 메트릭 수집 시간(long, 단위 초)

        // taskInfo.updateIntervalSecs: long - 메트릭 수집 주기(단위 초)


        for (DataPoint dp : dataPoints) {

            // dp.name: String - 메트릭 이름

            // dp.value: Object - 메트릭 값, IMetric 구현마다 값 타입이 다름

            // 

        }

    }



IMetric 타입별로 값 타입이 다른데, 주요 구현체의 값 타입은 다음과 같다.

  • CountMetric - Long
  • MultiCountMetric - Map<String, Long>

스톰은 토폴로지에서 직접 발생하는 메트릭뿐만 아니라 스톰이 토폴로지와 관련해서 발생시키는 메트릭 값도 IMectirsConcumer에 전달한다. 따라서 IMectirsConcumer는 처리하려는 메트릭을 알맞게 걸러내야 한다. 스톰 자체 메트릭인 경우 메트릭 이름이 "__", "GC/", "memory/"와 같은 값으로 시작하므로, 토폴로지에서 직접 생성하는 메트릭만 처리하고 싶다면 이 이름을 가진 메트릭을 제외한 나머지 메트릭만 처리하면 된다.


스톰 설정에서 IMetricsConsumer 구현 클래스 등록하기


이제 남은 것은 스톰이 토폴로지가 발생한 메트릭을 IMetricsConsumer에 전달하도록 설정하는 것이다. 다음의 두 가지 방법 중 하나를 이용해서 설정할 수 있다.

  • storm.yaml 파일에 설정하기
  • 토폴로지 코드에서 설정하기
storm.yaml 파일에 설정하려면, 다음과 같이 topology.metrics.consumer.register에 구현 클래스를 지정하면 된다.

topology.metrics.consumer.register:
  - class: "com.mycomp.storm.metrics.MetricsCollector"
    parallelism.hint: 1

토폴로지 코드에서 직접 지정하고 싶다면 다음 코드처럼 Config#registerMetricsConsumer()를 이용해서 지정하면 된다.


public static void main(String[] args) throws Exception {

    TopologyBuilder builder = new TopologyBuilder();


    builder.setSpout("word", new TestWordSpout(), 4);

    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");

    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");


    Config conf = new Config();

    conf.registerMetricsConsumer(MetricsCollector.class, 1);

    conf.setDebug(true);

    if (args != null && args.length > 0) {

        conf.setNumWorkers(6);

        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());

    } else {

        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology("test", conf, builder.createTopology());

        Utils.sleep(60000);

        cluster.killTopology("test");

        cluster.shutdown();

    }

}



Flume에 대한 커스텀 메트릭을 구현할 일이 생겨 그 내용을 정리한다.


구조


Flume의 메트릭을 가져와 처리하려면 Flume이 제공하는 MonitorService 인터페이스를 상속받아 구현하면 된다.




MonitorService 구현하기


MonitorService를 상속받은 클래스는 다음과 같이 세 개의 메서드만 알맞게 구현하면 된다.

  • configure() : 초기화한다.
  • start() : 메트릭을 처리하는 작업을 별도 쓰레드로 시작한다.
  • stop() : 메트릭 처리 작업을 중지한다.
뼈대 코드는 다음과 같다.

public class CustomMonitorService implements MonitorService {

    @Override
    public void configure(Context context) {
        // 초기화
    }

    @Override
    public void start() {
        // 별도 쓰레드를 이용해서 MetricReporter 실행
    }

    @Override
    public void stop() {
        // MetricReporter 실행 중지
    }
}

configure() 구현

configure() 메서드는 필요한 설정 정보를 읽어와 초기화 작업을 수행한다. 아래 코드는 구현 예시이다.

public class CustomMonitorService implements MonitorService {
    private int intervalSec;
    private String hosts;

    @Override
    public void configure(Context context) {
        // "flume.monitoring.pollInterval" 시스템 프로퍼티 값을 읽음. 없을 경우 기본 값으로 60 사용
        this.intervalSec = context.getInteger("pollInterval", 60);
        this.hosts = System.getProperty("custom.collector.hosts");
    }


Context는 "flume.monitoring."로 시작하는 시스템 프로퍼티의 값을 읽어오는 기능을 제공한다. 위 코드에서 context.getInteger("pollInterval")은 flume.monitoring.pollInterval 시스템 프로퍼티 값을 읽어와 int 타입으로 변환한다. 해당 프로퍼티가 존재하지 않으면 60을 리턴한다.

사용할 시스템 프로퍼티가 "flume.monitoring."로 시작하지 않으면 System.getProperty() 메서드나 기타 다른 방법을 사용해서 필요한 설정 값을 직접 읽어온다.

start()와 stop() 구현

Flume은 configure() 메서드로 초기화를 한 뒤 start() 메서드를 실행한다. start() 메서드는 별도 쓰레드로 리포팅 기능을 실행하도록 구현하면 된다. 주기적으로 리포팅 기능을 실행할 경우 ExecutorService를 사용하면 편리하다.


public class CustomMonitorService implements MonitorService {
    private ScheduledExecutorService scheduledExecutorService;

    private int pollInterval;
    private String hosts;
    private MetricReporter metricReporter; // Runnable 구현

    @Override
    public void start() {
        if (scheduledExecutorService == null || scheduledExecutorService.isShutdown() ||
                    scheduledExecutorService.isTerminated()) {
            scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        }
        metricReporter = new MetricReporter(hosts);
        scheduledExecutorService.scheduleWithFixedDelay(
                metricReporter,
                pollingIntervalSec,
                pollingIntervalSec,
                TimeUnit.SECONDS);
    }

    @Override
    public void stop() {
        scheduledExecutorService.shutdown();
    }
    ...

주기적으로 메트릭 리포터 기능을 실행하기 위해 ScheduledExecutorService를 사용했다. start() 메서드에서 ExecutorService를 생성하고 scheduleWithFixedDelay() 메서드로 지정한 주기로 리포팅 기능을 실행하도록 했다.


MetricReporter 구현


MetricReporter는 ExecutorService를 이용해서 실행할 것이므로 Runnable을 구현한다. run() 메서드에서 메트릭을 읽어와 알맞은 처리를 하면 된다. 다음은 간단한 구현 예제이다.


import org.apache.flume.instrumentation.util.JMXPollUtil;


public class MetricReporter implements Runnable {


    private final String hosts;


    public FlumemonReporter(String hosts) {

        this.hosts = hosts;

    }


    @Override

    public void run() {

        try {

            List<String> metrics = new ArrayList<>();

            Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();

            for (String component : metricsMap.keySet()) {

                Map<String, String> attributeMap = metricsMap.get(component);

                if (component.startsWith("CHANNEL.")) {

                    String name = "EventPutAttemptCount";

                    String value = attributeMap.get(name);

                    metrics.add(component + ", " + name + "=" + value);

                }

            }

            metrics.forEach(System.out::println);

        } catch (Exception e) {

        }

    }


}


JMXPollUtil.getAllMBeans()를 사용하면 flume이 제공하는 메트릭을 Map 형태로 구할 수 있다. 이 Map의 키는 컴포넌트 이름에 해당하는데, flume의 SOURCE, CHANNEL, SINK가 컴포넌트가 된다. 컴포넌트 이름은 "타입.이름"의 형태를 취한다.


metricsMap.get()은 개별 컴포넌트의 메트릭 목록을 맵 형태로 다시 구한다. 각 컴포넌트별로 제공하는 메트릭 목록이 다르다. 예를 들어, 채널의 경우 "EventPutAttemptCount", "EventPutSuccessCount"와 같은 메트릭을 제공한다. 각 메트릭 값은 String 타입으로 읽어오기 때문에, 필요시 Long이나 Integer와 같은 타입으로 알맞게 변환해주어야 한다.


Flume 실행시 MonitorService 지정하기


MonitorService를 구현했다면, 이제 남은 작업은 Flume을 실행할 때 구현한 클래스를 MonitorService로 사용하도록 지정하는 것이다. 다음과 같이 flume.monitoring.type 시스템 프로퍼터에 구현 클래스를 지정하면 된다.


bin/flume-ng agent --conf-file example.conf --name a1 \

   -Dflume.monitoring.type=com.mycomp.MetricReporter -Dflume.monitoring.hosts=somehost


이 외에 구현 클래스에서 필요로 하는 시스템 프로퍼티를 -D 옵션으로 추가 지정한다. 물론, 구현한 클래스가 flume이 사용하는 클래스패스에 위치해야 한다.


Kafka의 메트릭 리포트 부분을 확장할 일이 있어 메트릭 수집 관련 부분을 조사했는데, 그 내용을 정리한다.


구조


Kafka의 메트릭 리포팅과 관련된 클래스 구조는 아래 그림과 같다.



메트릭 리포팅 관련 타입은 크게 네 종류로 다음과 같다.

  • 카프카의 기반 타입
    • KafkaMetricsReporterMBean : 카프카 메트릭 리포터 MBean을 위한 기반 타입이다. 리포터의 시작/종료를 JMX로 조작하고 싶은 경우 이 타입을 상속받은 MBean 인터페이스를 만든다.
    • KafkaMetricsReporter : Kafka가 메트릭 리포터로 사용할 클래스가 구현할 타입이다.
  • Metrics의 기반 타입
    • AbstractReporter / AbstractPollingReporter : Metrics의 메트릭을 리포팅 할 리포터가 상속받을 클래스이다. 주기적으로 메트릭 값을 처리하려면 AbstractPollingReporter를 상속받아 구현한다.
    • MetricProcessor : Metrics의 메트리 종류별로 값을 받아 처리할 처리기가 구현해야 할 인터페이스이다.

Metrics는 메트릭 생성/관리를 위한 도구로 http://metrics.dropwizard.io/3.1.0/ 사이트를 참조한다.


Kafka로부터 메트릭을 받아 원하는 동작을 하고 싶다면 위 클래스 다이어그램에서 노란 색으로 표시한 타입을 알맞게 구현하면 된다. 그림에서 보는 것처럼 Kafka의 리포팅 관련 상위 타입을 구현한 타입 두 개와 Metrics와 관련된 타입 두 개를 구현하면 된다.


Kafka 리포터 구현


구현할 타입은 다음과 같다.

  • KafkaMetricsReporterMBean 인터페이스를 상속받은 CustomMetricsReporterMBean 인터페이스
  • KafkaMetricsReporter 인터페이스와 CustomMetricsReporterMBean 인터페이스를 상속받은 클래스
먼저 CustomReporterMBean 인터페이스는 다음과 같이 간단하다.

import kafka.metrics.KafkaMetricsReporterMBean;

public interface CustomMetricsReporterMBean extends KafkaMetricsReporterMBean {
}

MBean으로 노출하고 싶은 오퍼레이션이 추가로 있다면 CustomMetricsReporterMBean에 추가한다.


다음으로 KafkaMetricsReporter 인터페이스와 CustomMetricsReporterMBean 인터페이스를 상속받은 CustomMetricsReporterMBean 클래스를 구현한다. 이 클래스의 뼈대 코드는 다음과 같다.


import kafka.metrics.KafkaMetricsReporter;

import kafka.utils.VerifiableProperties;


public class CustomMetricsReporter implements KafkaMetricsReporter, CustomMetricsReporterMBean {


    private VerifiableProperties props;

    private CustomReporter underlying;

    ...기타 필드


    @Override

    public void init(VerifiableProperties props) {

        // 리포터를 초기화하고 리포팅을 시작한다.

    }


    @Override

    public void startReporter(long pollingPeriodSecs) {

        // underlying 리포팅 시작

    }


    @Override

    public void stopReporter() {

        // underlying 리포팅 종료

    }


    @Override

    public String getMBeanName() {

        return "kafka:type=com.mycomp.kafka.CustomMetricsReporter";

    }

}


init() 메서드 구현


init() 메서드는 Metrics의 리포터를 초기화하고 리포팅을 시작한다. 구현 코드는 다음과 같은 형태를 취한다.


import com.yammer.metrics.Metrics;

import kafka.metrics.KafkaMetricsConfig;

import kafka.metrics.KafkaMetricsReporter;

import kafka.utils.VerifiableProperties;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;


import java.net.InetAddress;

import java.net.UnknownHostException;

import java.util.concurrent.TimeUnit;


public class CustomMetricsReporter 

        implements KafkaMetricsReporter, CustomMetricsReporterMBean {


    private VerifiableProperties props; // 설정 파일의 프로퍼티 보관

    private CustomReporter underlying; // Metrics 리포터


    private final Object lock = new Object();

    private boolean initialized = false;

    ...기타 필드


    @Override

    public void init(VerifiableProperties props) {

        // props는 Kafka 설정 파일의 프로퍼티 값을 갖는다.

        synchronized (lock) {

            this.props = props;

            if (!initialized) {

                initializeReporter();

                initialized = true;

                KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(props);

                // metricsConfig.pollingIntervalSecs() --> kafka.metrics.polling.interval.secs 설정 값

                startReporter(metricsConfig.pollingIntervalSecs());

            }

        }

    }


    // Metrics 리포터 초기화

    private void initializeReporter() {

        // 리포터 초기화에 필요한 프로퍼티 값 사용

        String someProperty = props.getString("some.property", "defaultValue");

        underlying = new CustomReporter(Metrics.defaultRegistry(), someProperty);

    }


    @Override

    public void startReporter(long pollingPeriodSecs) {

        // underlying 리포팅 시작

    }


startReporter() 구현


startReporter() 메서드는 Metrics의 리포터인 underying의 start() 메서드를 실행한다. 이 메서드는 메트릭 리포팅을 주기적으로 실행한다.


import java.util.concurrent.TimeUnit;


public class CustomMetricsReporter implements KafkaMetricsReporter, CustomMetricsReporterMBean {


    private VerifiableProperties props;

    private CustomReporter underlying;


    private final Object lock = new Object();

    private boolean initialized = false;

    private boolean running = false;


    @Override

    public void startReporter(long pollingPeriodSecs) {

        // underlying 리포팅 시작

        synchronized (lock) {

            if (initialized && !running) {

                underlying.start(pollingPeriodSecs, TimeUnit.SECONDS);

                running = true;

            }

        }

    }



stopReporter() 구현


public class CustomMetricsReporter implements KafkaMetricsReporter, CustomMetricsReporterMBean {


    private VerifiableProperties props;

    private CustomReporter underlying;


    private final Object lock = new Object();

    private boolean initialized = false;

    private boolean running = false;

    ...기타 필드


    @Override

    public void stopReporter() {

        // underlying 리포팅 종료

        synchronized (lock) {

            if (initialized && running) {

                underlying.shutdown();

                running = false;

                initializeReporter();

            }

        }

    }


stopReporter()는 Metrics 리포터의 shutdown()을 실행해서 메트릭 리포팅을 중지한다. 위 코드에서 리포팅 중지후에 initializeReporter()를 다시 실행하는데, 그 이유는 startReporter()를 다시 실행할 때 사용할 리포터를 미리 초기화하기 위함이다.


Metrics 리포터 구현


앞서 Kafka의 리포터 구현에서 봤듯이 Kafka 리포터는 Metrics 리포터를 시작하고 중지하는 역할을 맡는다. 실제로 메트릭을 리포팅하는 역할은 Metrics 리포터가 처리한다.


Metrics 리포터를 위한 코드 뼈대는 다음과 같다.


import com.yammer.metrics.core.*;

import com.yammer.metrics.reporting.AbstractPollingReporter;


import java.util.concurrent.TimeUnit;


public class CustomReporter

        extends AbstractPollingReporter

        implements MetricProcessor<CustomReporter.Context> {


    ...// 필드


    public CustomReporter(MetricsRegistry metricsRegistry, String someValue) {

        super(metricsRegistry, "custom-reporter");

        this.someValue = someValue;

    }


    @Override

    public void start(long pollingPeriodSecs, TimeUnit unit) {

        super.start(pollingPeriodSecs, unit);

    }


    @Override

    public void shutdown() {

        try {

            super.shutdown();

        } finally {

        }

    }


    @Override

    public void run() {

        // 리포팅 코드 위치

    }


    @Override

    public void processMeter(MetricName name, Metered meter, Context context) throws Exception {

    }


    @Override

    public void processCounter(MetricName name, Counter counter, Context context) throws Exception {

    }


    @Override

    public void processHistogram(MetricName name, Histogram histogram, Context context) throws Exception {

    }


    @Override

    public void processTimer(MetricName name, Timer timer, Context context) throws Exception {

    }


    @Override

    public void processGauge(MetricName name, Gauge<?> gauge, Context context) throws Exception {

    }


    public class Context {

    }

}


start() 메서드와 shutdown() 메서드는 상위 클래스의 start()와 shutdown() 메서드를 호출한다. AbstractPollingReporter의 start() 메서드는 자체 Executor를 이용해서 리포팅을 실행한다.


run() 메서드 구현


run() 메서드에서 메트릭을 읽어와 원하는 리포팅 작업을 수행한다.


import com.yammer.metrics.core.*;


public class CustomReporter

        extends AbstractPollingReporter

        implements MetricProcessor<CustomReporter.Context> {


    ...


    @Override

    public void run() {

        // 리포트 시작


        //  모든 메트릭을 구한다.

        final Set<Map.Entry<MetricName, Metric>> metricEntries =

                getMetricsRegistry().allMetrics().entrySet();


        Context context = new Context(); // 메트릭 값을 담을 컨텍스트 생성

        for (Map.Entry<MetricName, Metric> entry : metricEntries) {

            final MetricName metricName = entry.getKey();

            final Metric metric = entry.getValue();

            try {

                // 각 메트릭을 처리한다. processWith의 첫 번째 인자의 타입은 MetricProcessor이다.

                // 이 클래스가 MetricProcessor를 구현했으므로 this를 첫 번째 인자로 전달한다.

                metric.processWith(this, metricName, context);

            } catch (Exception e) {

            }

        }

        ... context의 결과 이용해서 리포팅

        

        // 리포트 종료

    }


    // process 메서드에서 메트릭 타입별로 처리

    ...


MetricName과 Metric은 각각 메트릭의 이름과 값을 갖는다. metric.processWith() 메서드를 메트릭 타입에 따라서 알맞은 process 메서드를 실행하는데, 이 process 메서드를 이용해서 메트릭을 context에 수집한다.


Context와 process 메서드 구현


Context는 주로 필요한 메트릭의 결과를 수집하는 역할을 한다. 간단한 Context 클래스를 다음과 같이 구현할 수 있다. 앞서 뼈대 코드에서는 Context 클래스를 CustomReporter의 내부 클래스로 선언했다.


public class Context {

    public List<String> metrics = new LinkedList<>();

}


각 메트릭을 처리하는 process 메서드는 MetricProcessor 인터페이스에 정의되어 있는데, CustomReporter 클래스가 이 인터페이스를 구현했는데 각 process 메서드에서 알맞게 메트릭을 꺼내 Context에 보관한다.


public class CustomReporter

        extends AbstractPollingReporter

        implements MetricProcessor<CustomReporter.Context> {

    ...


    @Override

    public void processMeter(MetricName name, Metered meter, Context context) throws Exception {

        String metric = name.getName() + " = " + meter.count();

        context.metrics.add(metric);

    }


    @Override

    public void processCounter(MetricName name, Counter counter, Context context) throws Exception {

        ...

    }

    ... processHistogram, processTimer, processGauge 메서드 구현


process 메서드는 Context에 필요한 정보를 저장한다. 다시 run() 메서드로 돌아가서 각 메트릭을 처리한 뒤 다음 코드처럼 context에 보관된 메트릭 처리 결과를 알맞은 형식으로 리포팅하면 메트릭 리포팅이 완료된다.


    @Override

    public void run() {

        // 리포트 시작

        final Set<Map.Entry<MetricName, Metric>> metricEntries = ...;


        Context context = new Context();

        for (Map.Entry<MetricName, Metric> entry : metricEntries) {

            ...생략

        }

        // context에서 처리된 메트릭 값 읽어와 리포팅

        for (String metric : context.metrics) {

            System.out.println(metric);

        }

        // 리포트 종료

    }


이 코드에서는 간단히 System.out을 사용했지만, 메트릭 수집 결과를 DB에 보관한다거나 메트릭 저장소에 전송하는 등의 코드를 알맞게 구현하면 된다.


카프카 브로커 설정


구현한 메트릭 리포터를 사용하려면 카프카 설정에 kafka.metric.reporters 프로퍼티에 리포터 Kafka 리포터 클래스를 지정하면 된다.


kafka.metrics.reporters=com.mycom.kafka.CustomMetricsReporter

log.dirs=/data1/kafka

zookeeper.connect=zk1:2181,zk2:2181,zk3:2181


설정을 적용했다면 Kafka를 시작해 보자. 커스텀 메트릭 리포터가 동작하는 것을 확인할 수 있을 것이다.


스트림으로 들어오는 튜플을 단순히 처리만하고 끝나는 경우는 드물다. 보통은 처리 결과를 최종 또는 중간 단계 어딘가에 저장하게 된다. 예를 들어, 10분당 판매량이 100개가 넘는 상품을 통지해주는 스톰 토폴로지를 만들었다면, 토폴로지를 이용해서 계산한 분당 판매량 데이터를 HBase나 MySQL과 같은 데이터베이스에서 보관하길 원하는데, 이렇게 해야 스톰에 장애가 발생했을 때, 그 이전에 계산한 값을 날리지 않을 수 있으며, 또한, 이렇게 계산한 데이터를 실시간 통계 데이터로 활용할 수 있기 때문이다.


이전글:


상태 추상화


Filter나 Function을 이용해서 계산한 결과 상태를 DB에 보관하는 기능을 구현할 수 있지만, 트라이던트 API는 일관된 방법으로 상태를 처리할 수 있도록 상태를 위한 API를 제공하고 있다. 트라이던트 API는 상태 보관을 위해 다음의 세 가지 인터페이스를 정의하고 있다.

  • State: 상태를 표현
  • StateFactory: State를 생성해주는 팩토리
  • StateUpdater: 배치 단위로 튜플을 State 객체에 반영

트라이던트 API는 위 세 가지 인터페이를 이용해서 상태를 관리한다. 즉, 개발자는 위 세 개의 인터페이스에 대한 구현 클래스만 알맞게 만들어주면 된다. 나머지는 트라이언트가 알아서 처리한다.


세 인터페이스는 다음과 같이 정의되어 있다.


public interface State {

    void beginCommit(Long txid);

    void commit(Long txid);

}


public interface StateFactory extends Serializable {

    State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions);

}


public interface StateUpdater<S extends State> extends Operation {

    void updateState(S state, List<TridentTuple> tuples, TridentCollector collector);

}


스톰은 내부적으로 다음과 같은 코드를 사용해서 튜플을 상태에 반영한다.


// 배치 시작을 상태에 알림

state.beginCommit(txid);


// 배치에 속한 튜플 단위로 상태에 반영하고, collector를 이용해서 새로운 튜플 발생

stateUpdater.updateState(state, tuples, collector)


// 배치 종료를 상태에 알림

state.commit();


스톰은 배치 단위로 튜플을 상태에 반영한다. 배치가 시작되면 이를 State에 알리고(beginCommit 실행), StateUpdater를 이용해서 배치에 속한 튜플을 State에 반영한다(updateState 실행). 배치에 속한 튜플을 모두 State에 반영하면 배치가 끝났음을 State에 알린다.(commit 실행)


위 코드에서 알 수 있는 건, State와 StateUpdater는 쌍을 이룬다는 점이다. 즉, 특정 타입의 State에 상태를 반영하려면 그 State에 알맞은 StateUpdater가 필요하다. 실제로 Stream의 partitionPersist() 메서드를 보면 다음과 같이 특정 타입의 State 객체를 생성하는 StateFactory와 그 State의 상태를 갱신할 때 사용될 StateUpdater를 함께 파라미터로 전달받는 것을 알 수 있다.


partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater, Fields functionFields)

 


[State와 StateUpdater는 쌍을 이룬다.]


스톰은 일부 State 구현에 대해 이미 쌍을 우리는 StateUpdater 구현을 제공하고 있다. (위 그림에서 MapState와 MapCombinerAggStateUpdater가 스톰이 제공하는 인터페이스/클래스이다.)


State 만들어보기


Stream이 제공하는 메서드는 partitionPersist()인데, 이 이름에서 알 수 있듯이 스트림은 기본적으로 파티션 단위로 상태값을 보관하는 간단한 State 구현을 만들어보자. 이를 통해 State의 동작 방식에 대한 약간의 감을 잡을 수 있을 것이다. 여기서 만들 예제는 중복되지 않은 "제품ID:시간"을 키로 하는 이벤트 발생 여부를 보관하는 State 구현이다.


State 구현은 다음과 같이 간단하게 구현했다.


public class DistinctState implements State {


    private Map<String, ShopLog> map = new ConcurrentHashMap<>();

    private Map<String, ShopLog> temp = new ConcurrentHashMap<>();


    @Override

    public void beginCommit(Long txid) {

        temp.clear();

    }


    @Override

    public void commit(Long txid) {

        for (Map.Entry<String, ShopLog> entry : temp.entrySet())

            map.put(entry.getKey(), entry.getValue());

    }


    public boolean hasKey(String key) {

        return map.containsKey(key) || temp.containsKey(key);

    }


    public void put(String key, ShopLog value) {

        temp.put(key, value);

    }

}


위 코드에서 beginCommmit() 메서드는 temp 맵을 초기화한다. 그리고, commit() 메서드는 temp에 담겨 있는 모든 내용을 map에 복사한다. 여기서는 메모리 맵은 map 필드에 내용을 복사했지만, 실제 구현에서는 외부의 캐시 서버나 DBMS 등에 내용을 복사하도록 구현하게 된다. 즉, 한 배치가 시작될 때 beginCommit() 메서드를 통해 배치에 속한 튜플의 처리 준비를 하고, 한 배치가 끝날 때 commit() 메서드를 통해 배치에 포함된 튜플의 처리 결과를 상태를 보관할 저장소에 반영하게 된다.


배치의 시작과 끝 사이에 사용되는 메서드가 hasKey() 메서드와 put() 메서드이다. 이 두 메서드는 StateUpdater에 의해 사용되는데, DistictState를 위한 StateUpdater 구현 클래스는 다음과 같다.


public class DistinctStateUpdater extends BaseStateUpdater<DistinctState> {


    @Override

    public void updateState(

                DistinctState state, List<TridentTuple> tuples, TridentCollector collector) {

        List<TridentTuple> newEntries = new ArrayList<>();

        for (TridentTuple t : tuples) {

            String key = t.getStringByField("productId:time");

            if (!state.hasKey(key)) {

                state.put(key, (ShopLog) t.getValueByField("shopLog"));

                newEntries.add(t);

            }

        }

        for (TridentTuple t : newEntries) {

            collector.emit( // 상태로부터 새로운 스트림을 생성

                Arrays.asList(t.getValueByField("shopLog"), t.getStringByField("productId:time"))

            );

        }

    }

} 


StateUpdater 구현 클래스는 BaseStateUpdater 클래스를 상속받은 뒤 updateState() 메서드를 알맞게 구현해주면 된다. updateState() 메서드는 첫 번째 파라미터로 상태를 표현하는 객체를, 두 번째 파라미터로 배치에 속한 튜플을, 세 번째 파라미터는 튜플을 상태에 반영한 후 새로운 튜플을 생성할 때 사용할 TridentCollector이다.


updteState() 메서드를 보면 DistinctState의 hasKey() 메서드를 이용해서 처리하려는 튜플의 key 값이 이미 DistinctState에 보관한 key인지 확인한다. 아직 보관전인 key를 가진 튜플이면, put() 메서드를 이용해서 DistinctState에 반영하고, newEntries에 보관한다.


updateState() 메서드는 배치에 포함된 모든 튜플을 상태에 반영한 뒤, collector를 이용해서 newEntries에 보관된 튜플을 재발생시킨다. 이렇게 함으로써 상태 처리 상태로부터 새로운 튜플을 만들어내는 스트림이 만들어지게 된다.


마지막으로 구현해야 할 클래스는 StateFactory 인터페이스의 구현 클래스이다. 이 클래스의 구현은 다음과 같이 간단하다.


public class DistinctStateFactory implements StateFactory {

    private static final long serialVersionUID = 1L;


    @SuppressWarnings("rawtypes")

    @Override

    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

        return new DistinctState();

    }

}


이제 이렇게 만든 상태 관련 클래스를 실제 트라이던트 토폴로지에 적용해보자.


TridentTopology topology = new TridentTopology();

topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .partitionPersist(

                new DistinctStateFactory()

                new Fields("shopLog", "productId:time"), // StateUpdater에 전달될 튜플의 필드 목록

                new DistinctStateUpdater()

                new Fields("shopLog", "productId:time")) // StateUpdater가 생성하는 튜플 필드 목록

        .newValuesStream() // StateUpdater에서 생성한 튜플로 구성된 스트림

        .each(new Fields("shopLog", "productId:time"), Util.printer());

...


partionPersist() 메서드는 파티션 별로 State를 생성하고 튜플을 상태에 반영하는 기능을 제공한다. 위 코드의 경우 StateFactory로 DisctinctStateFactory를 사용했으므로 파티션 별로 상태 보관을 위해 DistinctState 객체가 생성된다.


partitionPersist() 메서드는 TridentState 객체를 리턴하는데, 이 객체의 newValueStream() 메서드는 DistinctStateUpdater에서 collector를 이용해서 생성한 튜플을 스트림으로 제공하는 새로운 Stream을 리턴한다. 따라서, 위 코드에서 마지막 each() 메서드는 DistinctStateUpdater가 생성한 튜플을 입력으로 받아서 그 내용을 콘솔에 출력하게 된다.


상태와 튜플 재처리 방식


튜플로부터 상태를 보관할 때 고민해야 할 점이 있다면, 그것은 바로 재처리와 관련된 것이다. 아래 그림을 보자. 아 그림에서 persistentAggregate 연산은 앞의 groupBy로 그룹핑 된 튜플의 개수를 적재하는 State를 처리해준다고 해 보자.




State가 키가 "0:123"인 튜플의 개수로 3을 갖고 있었다고 하자. 즉, {"0:123": 3 }을 State가 상태로 보관하고 있었다고 하자. 이 상태에서 위 그림의 윗 부분의 실행되었다고 하자. 이 때 스파우트가 발생한 "0:123"인 튜플의 개수가 4라면, persistenAggregate를 거친 뒤에 상태 값은 {"0:123": 7}이 될 것이다. 그런데, AlertFunction 연산에서 실패가 나는 바람에 스톰이 재처리를 수행했다고 해 보자. 그러면, 위 그림의 아래 부분처럼 튜플을 다시 발생시킬 거고, 그렇게 되면 persistenceAggregate는 이미 {"0:123": 7}을 상태 값으로 갖고 있었기 때문에, 재발송된 튜플 개수 4개가 더해지면서 상태 값이 {"0:123", 11}로 바뀌게 될 것이다.


사실, 여기서 동일한 튜플이 다시 발생한 것이므로 최종 상태 값은 {"0:123", 7}이어야 한다. 그런데, 상태가 튜플의 재발생 여부를 제대로 처리하지 못하게 되면, 같은 튜플이 상태에 중복해서 적용되는 문제가 발생하는 것이다.


이렇게 튜플을 다시 발생했을 때 동일 튜플이 State에 중복해서 적용되는 문제를 처리하기 위해 스톰은 세 가지 타입의 State 구현 객체를 제공하고 있다.



State

설명

TransactionalMap

스파우트가 Transactional인 경우(즉 한 배치에 대해 동일한 튜플을 생성할 경우), 동일 튜플에 대해 동일 상태를 보장한다.

OpaqueMap

스파우트가 Opaque Transactional인 경우(즉 한 튜플은 한 배치에만 속하지만 한 배치에 대해 매번 동일한 튜플이 발생하지 않을수도 있는 경우), 이전 상태 값을 기준으로 상태를 갱신한다.

NonTransactionalMap스파우트가 Non Transactional인 경우(즉 튜플이 특정 배치에만 속한다고 가정할 수 없는 경우), 튜플 재생성에 따른 롤백을 지원하지 않는다.


TransactionalMap의 동작 방식


TransactionalMap은 상태를 보관할 때 (트랜잭션ID, {키: 값})의 구조를 사용한다. 예를 들어, 배치 단위로 그룹바이-카운트를 수행한 결과 아래 표의 왼쪽/중간 컬럼과 같이 (키, 개수)가 생성되었다고 해 보자. 이 경우 표의 우측 컬럼과 같이 상태 값이 갱신된다.


 배치(트랜잭션) ID

 튜플 연산 결과 (키, 개수) 값

상태

 1

{ "0:123": 4 }

(1, {"0:123": 4} ) 

 2

{ "0:123": 5 } 

(2, {"0:123": 9} )

 3

{ "0:123": 3 } 

(3, {"0:123": 12} ) 

 3 (재발생)

{ "0:123": 3 }  
Transactional 스파우트는 동일한 튜플을 발생

(3, {"0:123": 12} )  
(기존 값 그대로 유지)

위 표에서 트랜잭션ID가 3에 해당하는 배치의 튜플이 다시 발생했다. 이 스파우트는 Transactional 타입이므로 같은 배치 ID에 대해 동일한 튜플을 발생시키므로, 튜플의 그룹바이-카운트 연산 결과는 위 표의 가운데 컬럼에서 보는 것처럼 동일하다. 따라서, TransactionalMap은 현재 상태와 동일한 트랜잭션 ID에 해당하는 결과 값이 들어올 경우, 상태에 반영하지 않고 유지하면 된다.


OpaqueMap의 동작 방식


OpaqueMap은 상태를 보관할 때 (트랜잭션ID, 키, 이전값, 현재값)의 구조를 사용한다. 앞의 TransactionalMap과 유사하게 3번 트랜잭션ID에 해당하는 배치를 재처리했다고 해 보자.


 배치(트랜잭션) ID

 튜플 연산 결과 (키, 개수) 값

상태

 1

{ "0:123": 4 }

(1, "0:123", null, 4 ) 

 2

{ "0:123": 5 } 

(2, "0:123", 4 9 )

 3

{ "0:123": 3 } 

(3, "0:123", 9, 12 ) 

 3 (재발생)

{ "0:123": 5 }  

Opaque 스파우트는 발생한 튜플이 달라질 수 있음

(3, "0:123", 9, 14 )  

위 표에서 OpaqueMap은 상태에 값을 보관할 때, 현재 처리한 트랜잭션ID와 이전 값, 현재 값을 함께 보관한다. 만약 특정 문제가 발생해서 스파우트가 특정 트랜잭션 ID에 해당하는 튜플을 재발생한 경우, Opaque 스파우트는 위 표의 중간 컬럼처럼 다른 결과를 만들어낼 수 있다. 따라서 상태는 새롭게 계산된 결과와 이전 상태 값을 이용해서 현재 상태 값을 갱신해주어야 한다. 위 표에서는 3번 트랜잭션ID의 재발생 시점에 이전 상태 값이 {"0:123": 9}이므로, 현재 상태 값은 {"0:123": 9 + 5}인 {"0:123": 14}로 바뀐다.


OpaqueMap 구현 방법: IBackingMap, OpaqueValue 사용


OpaqueMap은 내부적으로 IBackingMap 인터페이스와 OpaqueValue 클래스를 사용한다. OpaqueMap은 상태 데이터를 처리하기 위해 OpaqueValue 클래스를 사용하고, 실제 저장소로부터 상태 값을 조회하거나 저장소에 상태 값을 반영하기 위해 IBackingMap 구현체를 사용한다. 실제 OpaqueMap 인스턴스를 생성하는 메서드는 정적 팩토리 메서드로 다음과 같이 정의되어 있다.


public class OpaqueMap<T> implements MapState<T> {

    public static <T> MapState<T> build(IBackingMap<OpaqueValue> backing) {

        return new OpaqueMap<T>(backing);

    }


실제로 OpaqueMap을 사용하기 위해서 필요한 건 다음과 같다.

  • 상태 보관을 위해 IBackingMap 인터페이스를 알맞게 구현한 클래스를 만든다.
  • 앞서 만든 IBackingMap 구현 객체를 이용해서 생성한 OpaqueMap을 리턴해주는 StateFactory를 구현한다.
  • 집합 연산을 구현한다.
  • GroupedStream의 persistenceAggregate() 메서드를 이용해서 그룹핑 연산 결과를 상태에 반영한다.
    • Stream의 partitionPersist()를 사용할 경우, 알맞은 StateUpdater를 사용한다.

OpaqueMap용 "키:개수" 상태 보관하기 위한 IBackingMap 인터페이스를 구현한 클래스는 다음 코드처럼 만들어 볼 수 있다.


public class CountSumBackingMap implements IBackingMap<OpaqueValue> {

    private Map<String, CountValue> sumMap = new ConcurrentHashMap<>();


    @Override

    public List<OpaqueValue> multiGet(List<List<Object>> keys) {

        List<OpaqueValue> values = new ArrayList<>();

        for (List<Object> keyVals : keys) {

            String key = (String) keyVals.get(0);

            CountValue countValue = sumMap.get(key);

            if (countValue == null) {

                values.add(null);

            } else {

                values.add(new OpaqueValue<Long>(countValue.getCurrentTxId(),

                                 countValue.getCurrCount(), countValue.getPrevCount()));

            }

        }

        return values;

    }


    @Override

    public void multiPut(List<List<Object>> keys, List<OpaqueValue> vals) {

        for (int i = 0; i < keys.size(); i++) {

            String key = (String) keys.get(i).get(0);

            OpaqueValue<Long> val = vals.get(i);


            CountValue countValue = sumMap.get(key);

            if (countValue == null) {

                sumMap.put(key, new CountValue(val.getCurrTxid(), val.getPrev(), val.getCurr()));

            } else {

                countValue.update(val.getCurrTxid(), val.getPrev(), val.getCurr());

            }

        }

    }


}


IBackingMap을 구현하는 클래스는 multiGet() 메서드와 multiPut() 메서드 두 개만 알맞게 구현해주면 된다. 먼저, multiGet() 메서드는 파라미터로 키 목록을 전달받으며, 이 키에 해당하는 OpaqueValue 목록을 리턴해주면 된다. 위 코드의 multiGet() 메서드 메모리 맵인 sumMap에서 키에 해당하는 CountValue 객체를 가져온 뒤, 그 CountValue 객체로부터 OpaqueValue를 생성하고 있다.


CountValue는 (currTxId, prevVal, currVal)을 값으로 갖고 있으므로, sumMap은 결과적으로 (currTxId, key, prevVal, currVal)을 보관하는 장소가 된다. 이 예에서는 메모리 맵을 사용했지만, 실제로는 외부의 DBMS나 NoSQL 같은 곳에 보관된 데이터를 보관하고 읽어올 것이다.


OpaqueValue는 OpaqueMap의 동작 방식에서 설명했던 데이터 구조 중 키를 제외한 (트랜잭션ID, 이전값, 현재값)을 값으로 갖는다. 위 코드에서도 CountValue로부터 OpaqueValue를 생성할 때 트랜잭션ID, 이전값, 현재값을 가져와 생성했다.


스톰은 그룹 연산을 수행할 때 OpaqueMap은 내부적으로 다음과 같은 과정을 거친다.

  • IBackingMap의 multiGet()을 이용해서 (키: OpaqueValue) 목록을 구한다.
  • 앞서 구한 그룹 연산의 결과를 반영한 새로운 (키: OpaqueValue) 목록을 생성하고
  • 그 결과를 IBackingMap의 multiPut()을 이용해서 반영한다.

즉, multiPut() 메서드는 파라미터로 전달받은 (키: OpaqueValue) 목록을 이용해서 그 결과를 저장소에 반영하면 된다. 위 코드는 메모리 맵은 sumMap에 반영했지만, 실제 구현에서는 HBase나 MySQL과 같은 외부 저장소에 반영하게 될 것이다. 연산 결과를 반영할 때 주의할 점은 OpaqueMap이 제대로 동작하도록 (트랜잭션ID, 키, 이전값, 현재값)이 올바르게 적용되어야 한다는 점이다.


IBackingMap을 구현했다면 상태 생성을 위한 StateFactory를 구현하는 것이다. OpaqueMap을 위한 StateFactory는 다음과 같이 간단하게 구현할 수 있다.


public class CountSumStateFactory implements StateFactory {


    @SuppressWarnings("rawtypes")

    @Override

    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

        return OpaqueMap.build(new CountSumBackingMap());

    }


}


키 별로 개수를 구하는 집합 연산을 다음과 같이 구현해 볼 수 있다.


public class CountReducerAggregator implements ReducerAggregator<Long> {


    private static final long serialVersionUID = 1L;


    @Override

    public Long init() {

        return 0L;

    }


    @Override

    public Long reduce(Long curr, TridentTuple tuple) {

        return curr + 1L;

    }


}


이제 필요한 것은 다 만들었으므로, 다음과 같이 persistentAggregate() 메서드를 이용해서 그룹핑 연산 결과를 상태에 보관할 수 있게 된다.


TridentTopology topology = new TridentTopology();

topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .groupBy(new Fields("productId:time"))

        .persistentAggregate(

                new CountSumStateFactory(), // 상태 팩토리

                new CountReducerAggregator(), // 집합 연산

                new Fields("sum")) // 집합 연산 결과 필드

        .newValuesStream() // (productId:time, sum) 튜플 생성

        .each(new Fields("productId:time", "sum"), new ThresholdFilter())

        .each(new Fields("productId:time", "sum"), new AlertFilter())


그룹핑 연산이므로 그룹핑 기준 튜플의 값이 키가 되고, 집한 연산 결과가 값이 된다. 즉, "productId:time" 필드 값이 키가 되고 집합 연산 결과인 "sum" 필드가 값이 된다. 위 코드에서 한 가지 빠진 것이 있는데, 그것은 바로 StateUpdater이다. 앞서 partionPersist() 예제에서는 StateUpdater를 사용했는데, 위 예에서는 StateUpdater를 따로 구현하지 않고 있다. StateUpdater를 지정하지 않은 이유는 persistentAggregate() 메서드가 내부적으로 OpaqueMap에 알맞은 StateUpdater 구현체를 사용하고 있기 때문이다.


TransactionalMap 구현 방법: IBackingMap, TransactionalValue 사용


TransactionalMap을 사용하는 방법은 OpaqueMap을 사용하는 방법과 거의 동일하다. 둘 다 알맞은 IBackingMap 구현 클래스만 제공해주면 된다. 차이점이 있다면 TransactionalMap은 OpaqueValue 대신에 TransactionalMap을 사용한다는 것이다. 다음은 TransactionalMap을 위한 IBackingMap의 구현 예를 보여주고 있다.


public class CountSumBackingMap2 implements IBackingMap<TransactionalValue<Long>> {

    private Map<String, CountValue2> sumMap = new ConcurrentHashMap<>();


    @Override

    public List<TransactionalValue<Long>> multiGet(List<List<Object>> keys) {

        List<TransactionalValue<Long>> values = new ArrayList<>();

        for (List<Object> keyVals : keys) {

            String key = (String) keyVals.get(0);

            CountValue2 countValue = sumMap.get(key);

            if (countValue == null) {

                values.add(null);

            } else {

                values.add(

                    new TransactionalValue<Long>(countValue.getTxId(), countValue.getCount()));

            }

        }

        return values;

    }


    @Override

    public void multiPut(List<List<Object>> keys, List<TransactionalValue<Long>> vals) {

        for (int i = 0; i < keys.size(); i++) {

            String key = (String) keys.get(i).get(0);

            TransactionalValue<Long> val = vals.get(i);


            CountValue2 countValue = sumMap.get(key);

            if (countValue == null) {

                sumMap.put(key, new CountValue2(val.getTxid(), val.getVal()));

            } else {

                countValue.update(val.getTxid(), val.getVal());

            }

        }


    }


}


StateFactory는 동일한 방식으로 구현한다.


public class CountSumStateFactory2 implements StateFactory {


    private static final long serialVersionUID = 1L;


    @SuppressWarnings("rawtypes")

    @Override

    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

        return TransactionalMap.build(new CountSumBackingMap2());

    }


}


스톰 트라이던트 상태 구현 예

  • Memcached를 위한 State 구현: https://github.com/nathanmarz/trident-memcached
  • 소스 코드: https://github.com/madvirus/storm-sample


오늘 Ambari 1.6을 이용해서 HDP 2.1을 설치하는데, Application Timeline Serve가 올라오지 않아 서버에 들어가 로그 파일에 기록된 에러 메시지를 봤더니, 다음 클래스를 찾지 못한다는 에러 메시지가 출력되고 있었다.


2014-07-16 19:01:55,808 INFO  applicationhistoryservice.ApplicationHistoryServer (SignalLogger.java:register(91)) - registered UNIX signal handlers for [TERM, HUP, INT]

2014-07-16 19:01:55,960 INFO  service.AbstractService (AbstractService.java:noteFailure(272)) - Service org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer failed in state INITED; cause: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore not found

java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore not found

        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1927)

        at org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer.createTimelineStore(ApplicationHistoryServer.java:165)

        at org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer.serviceInit(ApplicationHistoryServer.java:80)

        at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)


약간의 구글링을 통해서 위 클래스 이름을 다음과 같이 변경해주어야 한다는 것을 알았다. Ambari 1.6-46, Hadoop 2.4.0.2.1 기준으로 YARN -> Config 에서 "yarn.timeline-service.store-class" 속성의 값을 변경해주면 된다.


org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore


예전에 받아 놓은 HDP 2.1 Sandbox 버전의 경우, 위 속성 값이 org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore로 설정되어 있는데, 하둡 버전은 2.4.0.2.1로 동일하다. (같은 하둡 버전인데, 클래스 이름이 다르다는 건 좀...)

스톰을 사용하는 이유 중 하나를 꼽아보자면, 병렬 처리를 들 수 있을 것 같다. 다중 노드에서 병렬로 연산을 함으로써 대량의 실시간 스트림 데이터를 처리하기 위해 만든 것이 스톰임을 생각해보면, 병렬 처리는 스톰의 주된 사용 이유일 것이다.


스톰의 기반 API의 경우, 아래 코드처럼 태스크의 개수와 쓰레드 개수 등을 설정해서 몇 개의 볼트를 동시에 실행할지 결정했다.


builder.setBolt("word-normalizer", new WordNormalizer(), 4) // 4개의 쓰레드

        .setNumTasks(8) // 8개의 작업 생성

        .shuffleGrouping("word-reader");


스톰의 병렬 힌트와 파티션


스톰 트라이던트 API는 직접 태스크 개수를 지정하는 방식을 사용하지 않고, 병렬 힌트를 주는 방식을 사용하고 있다. 다음은 병렬 힌트 코드의 예를 보여주고 있다.


topology.newStream("log", new LogSpout())

        .each(new Fields("logString"),  new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .parallelismHint(2);


위 코드에서 parallelismHint() 메서드를 볼 수 있는데, 이 메서드는 동시 실행 단위가 되는 파티션을 몇 개 생성할지 결정한다. 예를 들어, 위 코드는 다음과 같이 두 개의 타피션을 생성한다.



스톰은 각 파티션을 별도 쓰레드에서 실행한다. 워커 프로세스가 2개 이상일 경우, 별도 프로세스에 파티션이 실행된다. 파티션의 단위가 변경되는 것을 리파티션이라고 하는데, parallelismHint() 메서드에 의해 생성되는 파티션 적용 범위는 parallelismHint() 메서드 호출 이전에 리파티션이 일어나기 전까지다. 예를 들어, 다음 코드를 보자.


topology.newStream("log", new LogSpout())

        .parallelismHint(1)

        .shuffle()

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .parallelismHint(2);


위 코드를 보면 두 개의 parallelismHint() 메서드를 사용하고 있다. 한 번은 LogSpout 뒤에 설정했고, 한 번은 나머지 세 개 연산 뒤에 설정했다. 이 경우 두 번째 설정한 parallelismHint(2) 메서드의 적용 범위는 첫 번째 parallelismHint(1) 메서드의 이후가 된다. 이 경우 생성되는 파티션은 다음과 같다. 



파티션의 개수가 달라지는 리파티셔닝은 위 코드처럼 parallelismHint()의 개수가 다를 때 발생하며, groupBy()에 의해서도 발생한다. groupBy()는 기본적으로 모든 파티션에서 발생한 튜플을 한 곳으로 모아 그룹핑처리를 한다.


topology.newStream("log", new LogSpout()) // 1개 파티션

        .parallelismHint(1)

        .shuffle()

         // 2개 파티션

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new GroupingValueFunction(), new Fields("productId:time"))

        .parallelismHint(2)

        .groupBy(new Fields("productId:time")) // 1개 파티션

        .aggregate(new CountAggregator(), new Fields("count"))


리파티셔닝과 튜플 분배


파티션 크기가 변경되면, 파티션 간에 데이터를 어떻게 분배할지에 대해 결정해 주어야 한다. 예를 들어, 아래 코드는 LogSpout가 속한 파티션과 이후 세 개의 연산이 속한 파티션의 개수가 다르기 때문에 LogSpout가 생성한 튜플을 세 개의 파티션에 알맞게 분배해줘야 한다. 아래 코드에서는 라인 로빈 방식으로 분배하는 shuffle() 방식을 선택한다.


topology.newStream("log", new LogSpout())

        .parallelismHint(1)

        .shuffle()

        .each(new Fields("logString"), new OrderLogFilter())

        .each(...)

        .each(...)

        .parallelismHint(3)


트라이던트 API가 제공하는 분배 방식은 다음과 같다.

  • shuffle(): 라운드 로빈 방식으로 분배
  • partitionBy(Fields): 필드의 해시값을 이용해서 분배
  • broadcast(): 모든 파티션에 복사
  • global(): 모든 튜플을 한 파티션으로 보낸다.
  • batchGloabl(): 한 배치에 속한 튜플은 한 파티션으로 보낸다.
  • 커스텀 규칙: 직접 분배 규칙을 구현한다.

파티션 로컬 오퍼레이션


스트림에서 다음의 세 연산은 한 파티션에서 실행된다.

  • each(Function)
  • each(Filter)
  • partitionAggregate(Aggregator)
예를 들어, 아래 코드에서 병렬 힌트를 CountSumFunction 다음에 3으로 주긴 했지만, Function과 Filter는 기본적으로 같은 파티션에 포함되기 때문에, [CountSumFunction, ThresholdFilter, AlertFilter]는 한 파티션에 속하며, 따라서 이 쌍으로 세 개의 파티션이 생기게 된다.

.groupBy(new Fields("productId:time"))
.aggregate(new CountAggregator(), new Fields("count"))
.parallelismHint(1).partitionBy(new Fields("productId:time"))
.each(new Fields("productId:time", "count"), new CountSumFunction(), new Fields("sum"))
.parallelismHint(3)
.each(new Fields("productId:time", "sum"), new ThresholdFilter())
.each(new Fields("productId:time", "sum"), new AlertFilter());

파티션 로컬 오퍼레이션에 대해서 파티션을 분리하고 싶다면, 다음과 같이 명시적으로 병렬 힌트를 지정해 주어야 한다. 이 코드는 AlertFilter 다음에 병렬 힌트를 1로 주었다. 따라서, 아래 코드는 CountSumFunction만 속한 파티션 3개와 [ThresholdFilter, AlertFilter]가 속한 파티션 1개를 생성한다.

.groupBy(new Fields("productId:time"))
.aggregate(new CountAggregator(), new Fields("count"))
.parallelismHint(1).partitionBy(new Fields("productId:time"))
.each(new Fields("productId:time", "count"), new CountSumFunction(), new Fields("sum"))
.parallelismHint(3).shuffle()
.each(new Fields("productId:time", "sum"), new ThresholdFilter())
.each(new Fields("productId:time", "sum"), new AlertFilter())
.parallelismHint(1);

한 타피션은 물리적으로 한 볼트에 속한다. 예를 들어, 아래 코드를 보자.

topology.newStream("log", new LogSpout())
        .each(new Fields("logString"), new OrderLogFilter())
        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))
        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))
        .groupBy(new Fields("productId:time"))
        .aggregate(new Fields("productId:time"), new Count(), new Fields("count"))
        .each(new Fields("productId:time", "count"), new CountSumFunction(), new Fields("sum"))
        .each(new Fields("productId:time", "sum"), new ThresholdFilter())
        .each(new Fields("productId:time", "sum"), new AlertFilter());

위 코드에서 리파티셔닝을 하는 연산으로 groupBy()가 존재한다. 따라서, LogSpout부터 AddGroupingValueFunction 까지 파티션이 만들어지고, Count()부터 AlertFilter()까지 파티션이 만들어진다. 이 경우 스톰은 다음과 같이 Spout와 Bold를 구성한다.


참고자료

관련 글:


아파치 스톰(Storm) 프로젝트는 실시간 데이터 분석을 위한 기반 플랫폼이다. 스톰을 이용하면 데이터를 병렬로 가공할 수 있는 클러스터를 구축할 수 있으며, 이 클러스터를 이용해서 실시간으로 발생하는 데이터를 가공하고 분석하는 작업을 수행할 수 있다. 스톰에 대한 소개는 이전에 올렸던 'Storm 훑어보기'(http://javacan.tistory.com/324) 자료로 대신하고, 본 글에선 스톰 트라이던트의 연산 처리와 관련된 API의 사용 방법에 대해 살펴볼 것이다.


만들 예제


본 글에서는 다음과 같은 간단한 예제를 만들어보면서 스톰 트라이던트 API의 연산 처리와 관련된 API를 알아볼 것이다.

  • 실시간 로그를 읽어와,
  • 로그가 판매 로그면,
  • 로그를 파싱하고,
  • 1분당 판매 개수가 50개를 넘기면 상품이 있으면,
  • 통지한다.

스톰 트라이던트 프로그래밍의 구성 요소


스톰 클러스터를 이용해서 데이터를 처리하려면 다음의 세 가지 종류의 코드를 작성해야 한다.

  • 트라이던트 스파우트(Spout): 튜플을 발생시킨다. Kafka와 같은 서버에서 메시지를 읽어와 튜플로 변환해서 스톰 스트림에 넣어준다.
  • 연산 처리
    • Function: 튜플을 입력받아 튜플을 생성한다.
    • Filter: 튜플을 입력받아 연산을 계속할지 여부를 결정한다.
    • 그룹핑: 튜플을 특정 필드를 이용해서그룹핑한다.
  • 토폴로지(Topology): 스파우트와 연산 구성 요소를 연결하고 병렬 처리 등을 설정한다.
여기서 만들 토폴로지 구성은 아래 그림과 같다.


스파우트는 로그를 튜플로 생성한다. 튜플(tuple)은 데이터를 보관하는 단위로서, Function과 Filter는 한 개의 튜플에 대해 연산을 수행한다. 여기서 만들 스파우트는 단순히 로그 문자열을 튜플에 담아 스트림에 넣는다.


모든 로그가 아닌 주문 로그만 처리하면 되므로, 주문 로그인지 여부를 확인하는 필터를 맨 앞에 위치시킨다. 필터는 로그가 주문 필터인 경우에만 튜플을 스트림의 다음 단계로 전달한다. 이후 과정에서는 로그 문자열을 파싱해서 ShopLog라는 객체로 변환하고, 그룹핑 기준으로 사용할 '제품ID:시간'을 튜플에 추가한다.


'제품ID:시간'으로 튜플을 그룹핑 한뒤 뒤, 각 '제품ID:시간' 별로 개수를 구한다. 그리고, '제품ID:시간' 별 개수를 누적하고(기 별 개수 합 함수), 각 누적 개수가 임계값에 도달하면 통지 필터에 해당 튜플을 전달한다.


트라이던트 스파우트 구현


트라이던트 스파우트(이하 스파우트)는 기본적으로 배치 단위로 튜플을 생성한다. 다음 글에서 더 자세히 다루겠지만, 스파우트가 생성하는 각 튜플은 한 트랜잭션 ID(배치)에 포함된다. 예를 들어, 스파우트는 한 트랜잭션ID에 해대 튜플을 5개씩 생성할 수 있고, 이 때 스트림은 한 번에 5개의 튜플을 처리하게 된다. 만약 토폴로지에서 튜플을 처리하는 도중 에러가 발생하면, 스톰은 특졍 트랜잭션 ID에 해당하는 튜플을 다시 생성하도록 스파우트에 요청한다.


스파우트가 데이터를 어떻게 생성해주냐에 따라 스파우트를 크게 세 가지 종류로 구분할 수 있다.

  • Transactional: 한 트랜잭션 ID에 대해 매 번 동일한 튜플 묶음을 생성함을 보장한다. 한 트랜잭션 ID에 속한 튜플이 다른 트랜잭션ID에 포함되지 않는다.
  • Opaque-transactional: 한 튜플은 한 트랜잭션ID에만 포함된다. 단, 한 트랜잭션ID에 대해 매 번 동일한 튜플 묶음을 생성함을 보장하진 않는다. 
  • Non-transactional: 한 튜플이 한 트랜잭션ID에만 포함된다는 것을 보장하지 않는다.

트라인던트 스파우트를 구현하려면 스톰이 제공하는 인터페이스를 상속받아야 한다. 트라이던트 스파우트 구현을 위한 몇 가지 종류의 인터페이스가 존재하는데, 본 글에서는 그 중에서 ITridentSpout 인터페이스를 이용한 구현 방법에 대해 살펴볼 것이다.


ITridentSpout 인터페이스를 이용한 스파우트 구현


ITridentSpout 인터페이스를 이용해서 스파우트를 구현하려면, 다음의 세 인터페이스에 대한 이해가 필요하다.



Emitter, BatchCoordinator, ITridentSpout는 각각 다음과 같은 역할을 한다.

  • Emitter: 특정 트랜잭션에 속할 튜플을 생성한다.
  • BatchCoordinator: 특정 트랜잭션 ID에 해당하는 메타 데이터를 생성하고, 튜플 생성을 준비한다.
  • ITridentSpout: 스톰에 Emitter 객체와 BatchCoordinator 객체를 제공한다. 또한, Emitter가 생성할 튜플의 필드 정보를 정의한다.
스톰이 사용할 트라이던트 스파우트를 제공하려면 위 세 개의 인터페이스를 알맞게 구현해 주어야 한다. 본 글에서 만들어볼 예제는 어딘가에서 일정 개수의 로그를 읽어와 튜플로 전달한다. 튜플에 포함될 필드는 1개 뿐이고 그 필드의 이름을 "log"라고 한다면, ITridentSpout 인터페이스는 다음과 같이 구현할 수 있다.

@SuppressWarnings("rawtypes")
public class LogSpout implements ITridentSpout<Long> {

    private static final long serialVersionUID = 1L;

    @Override
    public BatchCoordinator<Long> getCoordinator(
             String txStateId, Map conf, TopologyContext context) {
        return new LogBatchCoordinator();
    }

    @Override
    public Emitter<Long> getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new LogEmitter();
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("logString");
    }
}

ITrdientSpout의 타입 파라미터를 Long으로 지정했는데, 여기서 Long은 뒤에서 설명할 트랜잭션 메타 데이터의 타입이 된다. getCoordinator()와 getEmitter()는 각각 Coordinator와 Emitter를 생성하면 된다. 이 두 메서드의 txStateId는 스트림을 생성할 때 입력한 값이 사용된다.

TridentTopology topology = new TridentTopology();
topology.newStream("log", new LogSpout()) // "log"가 txStateId 값으로 사용
    .each(new Fields("logString"), new OrderLogFilter())
    ....

getCoordinator()와 getEmitter()의 conf 파라미터는 getComponentConfiguration() 메서드에서 리턴한 Map 객체가 전달된다.

다음은 BatchCoordinator 인터페이스를 구현한 클래스의 예다.

public class LogBatchCoordinator implements BatchCoordinator<Long> {
    private static final Logger LOG = LoggerFactory.getLogger(LogBatchCoordinator.class);

    @Override
    public Long initializeTransaction(long txid, Long prevMetadata, Long currMetadata) {
        if (prevMetadata == null) {
            return 1L;
        }
        return prevMetadata + 1L;
    }

    @Override
    public void success(long txid) {
        LOG.info("BatchCoordinator.success(txid={})", txid);
    }

    @Override
    public boolean isReady(long txid) {
        // 튜플을 생성할 준비가 되면 true를 리턴한다.
        LOG.info("BatchCoordinator.isReady({})", new Object[] { txid });
        return true;
    }

    @Override
    public void close() {
        LOG.info("BatchCoordinator.close()");
    }
}

BatchCoordinator 인터페이스의 각 메서드는 다음과 같은 기능을 제공해야 한다.

 메서드

설명 

 isReady

트랜잭션ID에 해당하는 튜플을 생성할 준비가 되었으면 true를 리턴한다.

 initializeTransaction

튜플을 발생하기 전에 현재 트랜잭션ID를 위한 메타데이터를 생성한다. 각 파라미터는 다음과 같다.

  • txid: 현재 트랜잭션 ID
  • prevMetadata: 이전 트랜잭션의 메타 데이터
  • currMetadata: 현재 트랜잭션을 처음 시도하면 null. 현재 트랜잭션을 재시도하는 것이면, 앞서 최초 시도할 때 생성한 메타 데이터.
 success

 트랜잭션ID에 해당하는 튜플들을 성공적으로 처리하면 호출된다.

 close 토폴로지를 종료할 때 호출된다.


Emitter 인터페이스의 구현 예는 다음과 같다.


public class LogEmitter implements Emitter<Long> {

    private static final Logger LOG = LoggerFactory.getLogger(LogEmitter.class);


    @Override

    public void emitBatch(TransactionAttempt tx, Long coordinatorMeta,

            TridentCollector collector) {

        LOG.info("Emitter.emitBatch({}, {}, collector)", tx, coordinatorMeta);

        List<String> logs = getLogs(coordinatorMeta);

        for (String log : logs) {

            List<Object> oneTuple = Arrays.<Object> asList(log);

            collector.emit(oneTuple);

        }

    }


    private List<String> getLogs(Long coordinatorMeta) {

        List<String> logs = new ArrayList<String>();

        ... // 로그를 어딘가에서 읽어와 List에 담음

        return logs;

    }


    @Override

    public void success(TransactionAttempt tx) {

        LOG.info("Emitter.success({})", tx);

    }


    @Override

    public void close() {

        LOG.info("Emitter.close()");

    }


}


Emitter 인터페이스의 각 메서드는 다음 기능을 제공한다.


메서드 

설명 

 emitBatch

트랜잭션ID에 속할 튜플을 생성한다. 각 파라미터는 다음과 같은 용도로 사용된다.

  • tx 파라미터: 트랜잭션ID와 재시도회수 제공
  • coordinatorMeta: BatchCoordinator가 생성한 메타 데이터
  • collector: 트랜잭션ID에 속하는 튜플을 토폴로지에 추가
 success

 트랜잭션에 속한 모든 튜플을 성공적으로 처리했을 때 실행된다.

 close

 토폴로지를 종료할 때 실행된다.


스톰 토폴로지를 구현할 때 어려운 부분 중 하나가 스파우트다. 스파우트의 종류에는 Transactional, Opaque transactional, non-transactional의 세 가지가 있다고 했는데, 이 중 transactional이나 opaque transactional을 구현하려면 트랜잭션ID와 트랜잭션 메타데이터를 활용해야 한다. 예를 들어, Emitter는 emitBatch()를 실행할 때 캐시에 트랜잭션ID를 키로 하고 생성한 튜플 목록을 값으로 하는 (키, 값) 쌍을 외부의 메모리 캐시에 보관할 수 있을 것이다. 이후, 튜플 처리에 실패해서 재전송을 해야 한다면 외부 메모리 캐시에서 읽어와 동일한 튜플을 재전송할 수 있을 것이다. 이 경우, success() 메서드에서 외부 메모리 캐시에서 트랜잭션ID에 해당하는 값을 삭제하도록 구현할 것이다.


연산 처리 구성 요소: Function, Filter


튜플을 생성하면 그 다음으로 할 일은 튜플을 가공해서 원하는 결과를 얻어내는 것이다. 보통은 튜플을 가공하고, 걸러내는 과정을 거친 뒤에, 집합 연산을 수행하게 된다. 이를 위해 스톰 트라이던트는 Function, Filter, Aggregator를 제공하는데, 먼저 Function과 Filter에 대해 살펴보도록 하자. 이 두 인터페이스와 관련된 상위 타입 및 하위 타입은 아래 그림과 같이 구성되어 있다.



Function과 Filter를 사용할 때 초기화가 필요하면 prepare() 메서드를 이용한다. 반대로 종료시 정리가 필요하면 cleanup() 메서드를 이용한다.


Filter 구현


Filter는 튜플을 걸러내는 기능을 제공한다. Filter 인터페이스에 정의된 isKeep() 메서드는 튜플을 계속해서 스트림에 흘려 보낼지 여부를 결정한다. 다음은 Filter 인터페이스의 구현 예다. isKeep() 메서드를 보면 튜플 데이터 중 "logString"을 읽어와 이 문자열이 "ORDER"로 시작하면 true를 리턴하고 그렇지 않으면 false를 리턴한다. 따라서, 로그 문자열 값이 "ORDER"로 시작하는 것만 토폴로지의 다음 단계에 전달된다.


public class OrderLogFilter extends BaseFilter {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(OrderLogFilter.class);


    @Override

    public boolean isKeep(TridentTuple tuple) {

        String logString = tuple.getStringByField("logString");

        boolean pass = logString.startsWith("ORDER");

        if (!pass)

            LOG.info("OrderLogFilter filtered out {}", logString);

        return pass;

    }


}


위 필터를 사용한 토폴로지 구성 코드 예는 다음과 같다. 이 코드를 보면 LogSpout에 의해 생성된 튜플을 OrderLogFilter로 먼저 필터링하고 있다. 따라서, 튜플에 포함된 "logString"의 값이 "ORDER"로 시작하는 것만 그 다음 단계인 LogParser에 전달된다.


TridentTopology topology = new TridentTopology();

topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

       ...코드생략


each() 메서드의 첫 번째 파라미터는 Filter.isKeep() 메서드에서 사용할 튜플 필드 목록을 뜻한다. 즉, 위 코드의 경우 OrderLogFilter의 isKeep() 메서드는 "logString" 필드만 사용할 수 있다. 이 예에서는 LogSpout가 [logString:값]인 튜플을 생성하기 때문에, 사용할 필드가 "logString" 밖에 없지만, 만약 여러 개의 필드를 생성한다면 new Fields("field1", "field2")처럼 여러 필드명을 지정할 수 있다.


Function 구현


Function은 튜플에 새로운 데이터를 추가할 때 사용한다. Function 인터페이스는 튜플을 가공하기 위한 execute() 메서드를 정의하고 있으며, 이 메서드에서 튜플을 알맞게 가공하면 된다. 다음은 Function 인터페이스의 구현 예를 보여주고 있다.


public class LogParser extends BaseFunction {


    private static final long serialVersionUID = 1L;


    @Override

    public void execute(TridentTuple tuple, TridentCollector collector) {

        String log = (String) tuple.getStringByField("logString");

        ShopLog event = parseLog(log);

        collector.emit(Arrays.<Object>asList(event));

    }


    private ShopLog parseLog(String log) {

        String[] tokens = log.split(",");

        return new ShopLog(tokens[0], Long.parseLong(tokens[1]), Long.parseLong(tokens[2]));

    }


}


execute() 메서드는 tuple로부터 가공에 필요한 필드 값을 구한다. 구한 필드 값으로 알맞은 처리를 한 뒤, collector.emit()을 이용해서 결과로 생성할 필드 데이터를 추가한다. collector.emit() 메서드는 List<Object> 타입을 인자로 받는데, 이 때 List에 담긴 각 값이 토폴로지 정의시 지정한 추가 필드가 된다. 예를 들어, 다음과 같이 LogParser Function을 사용한 경우, 위 코드의 execute() 메서드에서 collector.emit() 메서드에서 추가한 값 목록은 아래 코드의 each() 메서드에서 세 번째 파라미터로 지정한 필드 목록에 매칭된다.


topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        ...코드생략


Function 사용시 유희할 점은 Function은 기존 튜플의 필드를 제거하지 않고, 새로운 튜플을 추가한다는 점이다. 예를 들어, 위 코드를 보면 LogSpout는 [logString:값]인 튜플을 생성한다. LogParser를 거치면 튜플은 [logString:값, shopLog:값] 이 된다. 이 상태에서 AddGroupingValueFunction을 거치면 튜플은 [logString:값, shopLog:값, productId:time:값]이 된다. each() 메서드의 첫 번째 파라미터는 이 튜플 필드 중 Function이나 Filter에 전달할 필드 목록을 지정하는 것이며, 세 번째 파라미터는 Function의 실행 결과로 추가되는 필드 목록을 지정하는 것이다.


참고로, AddGroupingValueFunction 클래스는 로그를 그룹핑할 때 사용할 필드를 추가하는 기능을 제공한다. 이 클래스는 새로운 필드로 "제품ID:기준시간" 값을 추가한다. 각 제품의 분 당 판매개수를 구할 것이기 때문에, 타임스탬프 값을 60000으로 나눈 값을 기준시간 으로 사용해 봤다.


public class AddGroupingValueFunction extends BaseFunction {


    private static final long serialVersionUID = 1L;


    @Override

    public void execute(TridentTuple tuple, TridentCollector collector) {

        ShopLog shopLog = (ShopLog) tuple.getValueByField("shopLog");

        long time = shopLog.getTimestamp() / (1000L * 60L);

        collector.emit(Arrays.<Object>asList(shopLog.getProductId() + ":" + time));

    }


}


연산처리 구성 요소: 그룹핑과 집합 연산


트라이던트로 스트림을 처리하면 많은 경우 집합 연산을 하게 된다. 예를 들어, 예제의 경우는 분당 제품 판매 개수를 구해야 하는데, 이는 "제품ID:시간"을 기준으로 판매 개수를 구하는 집합 연산을 필요로 한다. 이런 집합 연산을 처리할 수 있도록 트라이던트 API는 다음의 두 가지를 제공하고 있다.

  • 그룹 스트림: 배치에 속한 튜플들을 특정 필드를 기준으로 그룹핑한다. 이 스트림은 그룹 단위로 연산을 적용한다.
  • 집합 연산: 배치에 속한 튜플을 모아 단일 결과(튜플)를 생성한다.

이 두 기능을 사용하면 특정 필드를 기준으로 개수를 구한다거나 합을 구하는 등의 연산을 수행할 수 있다. 그룹 스트림을 생성하는 방법은 매우 간단하다. 토폴로지 정의에서 groupBy() 메서드를 사용하면 된다.


topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .groupBy(new Fields("productId:time"))

        .aggregate(new Fields("productId:time"), new Count(), new Fields("count"))

        ...생략


groupBy() 메서드는 지정한 필드를 이용해서 튜플을 그룹핑하는 GroupedStream을 리턴한다.



GroupedStream의 aggregate() 메서드를 사용하면 각 그룹별로 집합 연산을 수행하게 된다. 예를 들어, 아래 코드는 groupBy로 생성된 그룹에 대해 "productId:time" 필드를 기준으로 Count 집합 연산을 수행한다.


topology.newStream("log", new LogSpout())

        .each(new Fields("logString"), new OrderLogFilter())

        .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

        .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))

        .groupBy(new Fields("productId:time"))

        .aggregate(new Fields("productId:time"), new Count(), new Fields("count"))

        ...생략


aggregate() 메서드의 세 번째 파라미터는 집합 연산 결과로 생성되는 필드를 정의한다. 위 코드의 경우 Count 집합 연산의 결과로 생성되는 필드가 한 개임을 알 수 있다. aggregate는 새로운 튜플을 생성하는데, GroupedStream의 aggregate는 첫 번째 파라미터로 지정한 필드와 세 번째 파라미터로 지정한 필드를 갖는 튜플을 생성한다. 즉, 위 코드의 경우 aggregate 메서드의 결과로 생성되는 튜플은 ["productId:time":값, "count":값] 으로 구성된다.


집합 연산 인터페이스


스톰 트라이던트 API는 집합 연산을 위한 인터페이스 세 개를 제공하며, 이들 인터페이스는 다음과 같다.



집합 연산: ReducerAggregator


ReducerAggregator 인터페이스는 다음과 같이 정의되어 있다.


public interface ReducerAggregator<T> extends Serializable {

    T init();

    T reduce(T curr, TridentTuple tuple);

}


ReducerAggregator의 동작 방식은 다음과 같다.

  1. 배치(또는 그룹)의 첫 튜플을 처리하기 전에 init()으로 초기값을 생성한다.
  2. 이 초기값을 현재값(curr)으로 설정한다.
  3. 각 튜플마다
    1. reduce(curr, 튜플)을 실행한다.
    2. 3-1의 결과를 curr에 설정한다.
  4. 마지막 튜플에 대한 reduct()의 결과값을 aggregate의 결과값으로 사용한다.
간단하게 튜플의 특정 필드의 합을 구하는 집합 연산을 ReducerAggregator로 구현해보면 다음과 같이 구할 수 있다.

public class Sum implements ReducerAggregator<Long> {
    private static final long serialVersionUID = 1L;

    @Override
    public Long init() {
        return 0L;
    }

    @Override
    public Long reduce(Long curr, TridentTuple tuple) {
        return curr + tuple.getLong(0);
    }
}


집합 연산: CombinerAggregator


CombinerAggregator 인터페이스는 다음과 같이 정의되어 있다.


public interface CombinerAggregator<T> extends Serializable {

    T init(TridentTuple tuple);

    T combine(T val1, T val2);

    T zero();

}


CombinerAggregator의 동작 방식은 다음과 같다.

  1. 배치(또는 그룹)에 튜플이 있다면, 각 튜플마다
    1. init()으로 튜플에 해당하는 값을 구한다.
    2. combine()을 이용해서 이전 combine()의 결과와 init()의 결과를 조합한 결과를 리턴한다.
      1. 첫 번째 튜플의 경우 이전 combine() 결과 값으로 zero() 값을 사용한다.

스톰은 CombinerAggregator의 구현인 Count를 제공하고 있는 Count의 코드는 다음과 같다. 배치(또는 그룹)의 튜플 개수를 셀 때 사용하는 CombinerAggregator이다.


public class Count implements CombinerAggregator<Long> {


    @Override

    public Long init(TridentTuple tuple) {

        return 1L;

    }


    @Override

    public Long combine(Long val1, Long val2) {

        return val1 + val2;

    }


    @Override

    public Long zero() {

        return 0L;

    }

    

}


집합 연산: Aggregator


Aggregator는 앞서 두 개 인터페이스보다 좀 더 범용적인 인터페이스를 정의하고 있다.


public interface Aggregator<T> extends Operation {

    T init(Object batchId, TridentCollector collector);

    void aggregate(T val, TridentTuple tuple, TridentCollector collector);

    void complete(T val, TridentCollector collector);

}


Aggregator는 다음과 같이 동작한다.

  1. 배치의 튜플을 처리하기 전에 init()을 실행하고, 집합 연산에 필요한 객체 val을 리턴한다.
  2. 각 튜플마다 aggregator 메서드를 호출한다. 첫 번째 파라미터는 1에서 리턴한 객체이다.
  3. 배치의 모든 튜플에 대한 처리가 끝나면 complete 메서드를 호출한다.
ReducerAggregator와 CombinerAggregator가 최종적으로 1개의 값을 생성하도록 제한된 인터페이스인 반면에, Aggregator는 한 개 이상의 튜플을 생성할 수 있다. 예를 들어, 다음은 배치(또는 그룹)에 있는 튜플 중에서 정렬 기준으로 N개의 튜플을 골라내는 Aggregator의 구현 코드이다. (스톰이 제공하는 클래스다.)

public static class FirstNSortedAgg extends BaseAggregator<PriorityQueue> {

    int _n;
    String _sortField;
    boolean _reverse;
    
    public FirstNSortedAgg(int n, String sortField, boolean reverse) {
        _n = n;
        _sortField = sortField;
        _reverse = reverse;
    }

    @Override
    public PriorityQueue init(Object batchId, TridentCollector collector) {
        return new PriorityQueue(_n, new Comparator<TridentTuple>() {
            @Override
            public int compare(TridentTuple t1, TridentTuple t2) {
                Comparable c1 = (Comparable) t1.getValueByField(_sortField);
                Comparable c2 = (Comparable) t2.getValueByField(_sortField);
                int ret = c1.compareTo(c2);
                if(_reverse) ret *= -1;
                return ret;
            }                
        });
    }

    @Override
    public void aggregate(PriorityQueue state, TridentTuple tuple, TridentCollector collector) {
        state.add(tuple);
    }

    @Override
    public void complete(PriorityQueue val, TridentCollector collector) {
        int total = val.size();
        for(int i=0; i<_n && i < total; i++) {
            TridentTuple t = (TridentTuple) val.remove();
            collector.emit(t);
        }
    }
}  


Aggregator를 이용해서 튜플 개수를 세는 코드는 다음과 같이 구현하고 있다.


public class CountAsAggregator extends BaseAggregator<CountAsAggregator.State> {


    static class State {

        long count = 0;

    }

    

    @Override

    public State init(Object batchId, TridentCollector collector) {

        return new State();

    }


    @Override

    public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {

        state.count++;

    }


    @Override

    public void complete(State state, TridentCollector collector) {

        collector.emit(new Values(state.count));

    }

    

}


토플로지 구성 및 로컬 실행


스파우트와 Function, Filter, 집합 연산을 구현했다면, 남은 일은 TridentTopology를 이용해서 토폴로지를 구성하고 실행하는 것이다. 아래 코드는 토폴로지를 구성하고 로컬에서 실행하는 예제 코드를 보여주고 있다.


public class LogTopology {


    public static void main(String[] args) {

        TridentTopology topology = new TridentTopology();

        topology.newStream("log", new LogSpout())

                .each(new Fields("logString"), new OrderLogFilter())

                .each(new Fields("logString"), new LogParser(), new Fields("shopLog"))

                .each(new Fields("shopLog"), 

                         new AddGroupingValueFunction(), new Fields("productId:time"))

                .groupBy(new Fields("productId:time"))

                .aggregate(new Fields("productId:time"), new Count(), new Fields("count"))

                .each(new Fields("productId:time", "count"), 

                         new CountSumFunction(), new Fields("sum"))

                .each(new Fields("productId:time", "sum"), new ThresholdFilter())

                .each(new Fields("productId:time", "sum"), new AlertFilter());

        StormTopology stormTopology = topology.build();


        Config conf = new Config();

        conf.put("ThresholdFilter.value", 5L);

        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology("cdc", conf, stormTopology);


        try {

            Thread.sleep(60000);

        } catch (InterruptedException e) {

        }

        cluster.shutdown();

    }


}


글에서 보여주지 않은 코드인 CountSumFunction, ThresholdFilter, AlertFilter 등은 참고자료에 있는 예제 코드 링크를 참고하기 바란다.


참고자료

  • 예제 코드: https://github.com/madvirus/storm-sample





  1. 96480XX 2014.10.01 15:10

    내가 아는 사람중에
    뭐 이렇게 좋은 내용의 블로그를 운영하는 사람이 있었다니 ...

Zookeeper 소개 발표 자료입니다.


하둡2의 YARN 짧게 보기 자료입니다.



  1. 해피데이 2014.08.04 18:31

    안녕하세요? 올려주신 hadoop2 자료로 대략적인 이해 및 개념이 잡혔습니다.
    감사합니다.

    해당 ppt를 보면서는 hadoop1의 JobTraker, TaskTraker, namenode, datanode가 별 의미가 없어졌다고 생각했는데,
    hadoop2 설치 및 start/stop 커맨드를 보니, namenode와 datanode를 별도로 start/stop 하더라구요.

    hadoop2에서 namenode, datanode, JobTraker, TaskTraker는 어떤 의미를 가지고 yarn과 관련해서는 언제, 어떻게 동작하는지 설명해 주실 수 있을까요?

    그리고...
    Hadoop Federation 이라고 하는 개념도 어떤 것인지 궁금합니다.

    • 최범균 madvirus 2014.08.04 19:01 신고

      우선, Hadoop은 크게 두 개의 기능을 제공하는데, 하나는 File 보관을 위한 HDFS이고, 다른 하나는 연산을 위한 MapReduce 입니다.
      이들을 위한 데몬이 다음과 같죠.
      * HDFS: namenode, datanode
      * MR: JobTracker, TaskTracker

      Hadoop1의 MR은 한계가 있었고, 이를 극복하기 위해 Hadoop2에서 연산 부분을 MR1에서 좀더 범용적인 YARN을 개비를 했습니다.
      그러면서, 다음과 같이 연산을 위한 데몬이 바뀌게 되죠.

      * HDFS: namenode, datanode (동일!)
      * YARN: ResourceManager, NodeManager

      즉, Hadoop2에서 Namenode와 datanode는 그대로 남아 있고,
      Hadoop1에서 MR을 위해서 존재하던 JobTracker와 TaskTracker가 사라지고(?)
      YARN을 위한 ResourceManager와 NodeManager로 대체되었습니다.

      Hadoop1에서 MR만 실행할 수 있었던 JobTracker/TaskTracker와 달리
      Hadoop2의 YARN은 MR 뿐만 아니라 다양한 연산을 실행할 수 있는 범용적인 프레임워크입니다.
      예를 들어, YARN을 기반으로 Spark나 스톰, JBoss 등을 실행할 수 있습니다.

      그리고, Namenode Federation은 Namenode의 단점을 해소하기 위해 나왔습니다.
      Namenode는 기본적으로 1대에서 모든 HDFS의 파일 정보를 제공하게 되는데, 이는 확장성에 한계를 갖게 만듭니다.
      그래서, 여러 대의 서버에 파일 정보를 나눠서 보관함으로써 처리 용량을 확장할 수 있게 만든게 Namenode Federation 입니다.

      Namenode가 죽으면 HDFS의 모든 서비스가 중지됩니다. 그래서 Namenode가 SPOF(Single Point Of Failure)가 됩니다.
      따라서, 한 대의 Namenode가 죽어도 서비스를 유지하기 위해 동일한 Namenode를 두 대 만드는데,
      이를 Namenode HA(high availability)라고 합니다.

      제가 이쪽 전문가는 아니어서 이 정도 밖에 설명을 못 드리겠네요.
      자료를 찾는데 단초가 되었으면 하고 바래봅니다.

오늘 Ambari 1.5.1을 이용해서 호튼웍스의 HDP 2.1을 설치하던 도중 다음과 같은 에러가 발생했다.


'ambari.clusterconfig' doesn't exist


이 문제는 MySQL의 데이터베이스를 UTF-8로 만들면서 발생했다. ambari가 사용할 DB로 기존에 설치되어 있던 MySQL 5.1 버전을 사용했고, 다음 스크립트를 이용해서 데이터베이스를 생성했다.


create database ambari character set utf8


ambari-server setup을 이용해서 ambari가 사용할 DB를 설정할 때, 위 코드로 생성한 DB를 설정해주었다. setup 과정에서 아무 에러 메시지 출력 없이 정상적으로 실행되었다.


ambari-server start로 ambari 서버를 실행하고, http://설치호스트:8080 으로 접속해서 설치를 시작했다. 설치 대상 서버부터, 각 구성 요소를 어느 서버에 설치할지, hive 등의 메타 정보를 어느 DB에 보관할지 등을 설정했고, 최종적으로 설치를 시작했다. 그런데, 앞서 언급했던 ['ambari.clusterconfig' doesn't exist'] 에러 메시지가 출력되면서 더 이상 설치를 진행할 수 없게 됐다.


configcluster 테이블이 왜 존재하지 않는지 확인해 보기 위해, ambari가 DB 테이블을 생성할 때 사용하는 스크립트를 찾아보았다. 스 스크립트에서 configcluster 테이블을 생성할 때 사용되는 쿼리를 찾아서 mysql 콘솔에서 직접 실행해보니 다음과 같은 오류가 발생했다.


ERROR 1071 (42000): Specified key was too long; max key length is 1000 bytes


켁! PK 생성 제약으로 configcluster 테이블을 생성하지 못한 것이다. ambari-server setup 과정에서 아무 오류 없이 넘어갔기에 테이블 생성이 안 된 것을 설치 과정 후반부에 안 것이다.


위 문제가 발생한 이유는 데이터베이스를 UTF-8로 생성했기 때문이다. configcluster 테이블이 PK로 사용하는 컬럼은 세 개인데, 그 중 2개 컬럼의 타입이 varchar(255)이다. 그런데, UTF-8을 사용하면서 한 글자가 최대 4개 바이트까지 차지할 수 있기 때문에, 두 개 컬럼이 실제로 차지할 수 있는 바이트 수가 가뿐히 1000을 넘기게 된다.


이 문제를 해결하는 가장 쉬운 방법은 DB를 latin 계열 캐릭터셋을 사용하도록 만드는 것이겠지만, 그러고 싶지 않았다. 좀 검색을 해 보니 MySQL 5.5의 경우 innodb_large_prefix 옵션을 true로 주면 1000 바이트가 넘는 PK를 생성할 수 있다는 것을 알게 되었다. (http://dev.mysql.com/doc/refman/5.5/en/innodb-parameters.html#sysvar_innodb_large_prefix 참고)


[mysqld]

...다른설정들

innodb_large_prefix = true


기존에 설치된 MySQL 5.1에 아무것도 없었기에, 그냥 5.5로 바꾸고 위 옵션을 추가해 주었다. MySQL 5.1을 사용하고 있다면, utf-8이 아닌 euc-kr이나 다른 캐릭터 셋을 사용하도록 설정해서 PK의 바이트 길이가 1000을 넘지 않도록 해 줘야 할 것 같다.


+ Recent posts