기존 access 로그를 이용해서 데이터를 추출해야 할 일이 생겨서 ELK를 사용했다. 앞으로도 비슷한 요구가 생기면 유사한 설정, 코드를 사용할 것 같아 기록으로 남긴다.
로그스태시(logstash) 설정 파일
여러 웹 어플리케이션의 access 로그 파일을 로그스태시를 이용해서 일레스틱서치에 밀어 넣었다. 각 웹 어플리케이션마다 미세하게 파싱해야 하는 내용이 달라, 로그스태시 설정 파일을 웹 어플리케이션 별로 하나씩 만들었다. 다음은 그 파일 중 하나이다.
input {
stdin {
type => "web"
add_field => {
appname => "myappname"
}
}
}
filter {
grok {
match=> { message => "%{IPORHOST:clientip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{URIPATH:uripath}(?<query>(\?\S+)?)(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response} (?:%{NUMBER:bytes}|-)"}
}
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
}
grok {
match => {
uripath => "(/im/(?<context>(ui))/\S*)|(/gcs/(?<context>(\S+))/.*)"
}
}
mutate {
remove_field => ["query", "uripath"]
add_field => {
"methoduri" => "%{verb} %{uripath}"
}
}
metrics {
meter => "documents"
add_tag => "metric"
flush_interval => 60
}
}
output {
if "_grokparsefailure" not in [tags] {
elasticsearch {
hosts => "일레스틱서버주소"
flush_size => 20000
}
}
if "metric" in [tags] {
stdout {
codec => line {
format => "1m rate: %{[documents][rate_1m]} ( %{[documents][count]} )"
}
}
}
}
- input의 stdin: 기존 로그 파일을 cat 명령어를 이용해서 logstash에 전달할거라서 stdin을 입력으로 설정했다.
- filter의 첫 번째 grok: match의 message 패턴으로 로그스태시가 기본 제공하는 HTTPD_COMMONLOG 패턴을 사용하지 않은 이유는 요청 경로에서 쿼리 문자열과 요청 URI를 분리할 필요가 있었기 때문이다. 참고로 HTTPD_COMMONLOG 패턴은 쿼리문자열을 포함한 요청 경로를 request 필드로 추출한다.
- filter의 date: 로그의 timestamp 필드를 이벤트의 @timestamp 값으로 사용한다. date를 사용하지 않으면 로그스태시가 로그 파일을 넣는 시점을 @timestamp 값으로 사용하기 때문에 기간별 로그 분석을 하기 어려워진다.
- filter의 mutate: 요청 방식과 요청 URI를 합쳐서 하나로 분석해야 해서 이 둘을 연결한 값을 새로운 필드로 추가한다.
- filter의 metrics: 로그스태시가 얼마나 처리하는지 보기 위해 메트릭 값을 60초마다 생성한다.
- output의 elasticsearch: flush_size 값은 일레스틱서치 서버의 상황을 고려해 설정한다. 1000~5000 사이에서 시작해서 테스트하면서 점진적으로 늘려서 처리속도를 높여나간다.
- output의 stdout: metrics가 생성한 메트릭 값을 콘솔에 출력한다. 초당 몇 개씩 처리하는지 1분 주기로 콘솔에 찍혀서 그나마 덜 심심하다.
로그 밀어넣기 위한 실행 스크립트
기존 로그 파일을 특정 폴더에 모아두고, 그 파일을 로그스태시로 일레스틱서치에 쭈욱 밀어 넣었다. 파일 하나하나 수동으로 하면 힘드니까 다음의 쉘 파일을 만들어서 실행했다.
#!/bin/bash
FILES="./로그폴더/*.log"
for f in $FILES
do
echo "$f start"
SECONDS=0
cat $f | ../logstash/bin/logstash -b 2500 --config app1.logstash.conf
duration=$SECONDS
echo "$f done [$(($duration / 60))m $(($duration % 60))s elapsed]"
echo "---"
sleep 3
done
테스트하는 동안 심심함도 달래고 성능도 확인할 겸 로그 파일 한 개를 처리하는데 걸리는 시간을 측정해서 값을 출력하도록 했다.
실행 결과
작성한 쉘 스크립트를 돌리면 콘솔에 다음과 같이 시간 값이 찍혀서 덜 심심하다.
$ ./logdump.app1.sh
...
./로그폴더/access_2016_04_06.log start
Settings: Default pipeline workers: 8
Pipeline main started
1m rate: 6336.785481508631 ( 407172 )
1m rate: 6937.548787233175 ( 856873 )
Pipeline main has been shutdown
stopping pipeline {:id=>"main"}
./misweb_in1_rest/access_2016_04_06.log done [2m 17s elapsed]
...
...
일단 1-2개 로그 파일로 작업해보고 퇴근 전에 nohup을 걸어서 나머지를 밀어 넣었다.
$ nohup ./logdump.app1.sh > dump.log 2>&1 &