주요글: 도커 시작하기
반응형

기존 access 로그를 이용해서 데이터를 추출해야 할 일이 생겨서 ELK를 사용했다. 앞으로도 비슷한 요구가 생기면 유사한 설정, 코드를 사용할 것 같아 기록으로 남긴다.


로그스태시(logstash) 설정 파일


여러 웹 어플리케이션의 access 로그 파일을 로그스태시를 이용해서 일레스틱서치에 밀어 넣었다. 각 웹 어플리케이션마다 미세하게 파싱해야 하는 내용이 달라, 로그스태시 설정 파일을 웹 어플리케이션 별로 하나씩 만들었다. 다음은 그 파일 중 하나이다.


input {

  stdin {

    type => "web"

    add_field => {

      appname => "myappname"

    }

  }

}


filter {

  grok {

    match=> { message => "%{IPORHOST:clientip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{URIPATH:uripath}(?<query>(\?\S+)?)(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response} (?:%{NUMBER:bytes}|-)"}

  }

  date {

    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]

  }

  grok {

    match => {

      uripath => "(/im/(?<context>(ui))/\S*)|(/gcs/(?<context>(\S+))/.*)"

    }

  }

  mutate {

    remove_field => ["query", "uripath"]

    add_field => {

      "methoduri" => "%{verb} %{uripath}"

    }

  }

  metrics {

    meter => "documents"

    add_tag => "metric"

    flush_interval => 60

  }

}


output {

  if "_grokparsefailure" not in [tags] {

    elasticsearch {

      hosts => "일레스틱서버주소"

      flush_size => 20000

    }

  }


  if "metric" in [tags] {

    stdout {

      codec => line {

        format => "1m rate: %{[documents][rate_1m]} ( %{[documents][count]} )"

      }

    }

  }

}


설정에서 특이한 점은 다음과 같다.
  • input의 stdin: 기존 로그 파일을 cat 명령어를 이용해서 logstash에 전달할거라서 stdin을 입력으로 설정했다.
  • filter의 첫 번째 grok: match의 message 패턴으로 로그스태시가 기본 제공하는 HTTPD_COMMONLOG 패턴을 사용하지 않은 이유는 요청 경로에서 쿼리 문자열과 요청 URI를 분리할 필요가 있었기 때문이다. 참고로 HTTPD_COMMONLOG 패턴은 쿼리문자열을 포함한 요청 경로를 request 필드로 추출한다.
  • filter의 date: 로그의 timestamp 필드를 이벤트의 @timestamp 값으로 사용한다. date를 사용하지 않으면 로그스태시가 로그 파일을 넣는 시점을 @timestamp 값으로 사용하기 때문에 기간별 로그 분석을 하기 어려워진다.
  • filter의 mutate: 요청 방식과 요청 URI를 합쳐서 하나로 분석해야 해서 이 둘을 연결한 값을 새로운 필드로 추가한다.
  • filter의 metrics: 로그스태시가 얼마나 처리하는지 보기 위해 메트릭 값을 60초마다 생성한다.
  • output의 elasticsearch: flush_size 값은 일레스틱서치 서버의 상황을 고려해 설정한다. 1000~5000 사이에서 시작해서 테스트하면서 점진적으로 늘려서 처리속도를 높여나간다.
  • output의 stdout: metrics가 생성한 메트릭 값을 콘솔에 출력한다. 초당 몇 개씩 처리하는지 1분 주기로 콘솔에 찍혀서 그나마 덜 심심하다.

로그 밀어넣기 위한 실행 스크립트


기존 로그 파일을 특정 폴더에 모아두고, 그 파일을 로그스태시로 일레스틱서치에 쭈욱 밀어 넣었다. 파일 하나하나 수동으로 하면 힘드니까 다음의 쉘 파일을 만들어서 실행했다.


#!/bin/bash

FILES="./로그폴더/*.log"

for f in $FILES

do

  echo "$f start"

  SECONDS=0

  cat $f | ../logstash/bin/logstash -b 2500 --config app1.logstash.conf

  duration=$SECONDS

  echo "$f done [$(($duration / 60))m $(($duration % 60))s elapsed]"

  echo "---"

  sleep 3

done


logstash의 -b 옵션은 프로세스 파이프라인 배치 크기이다. 파이프라인 워커 개수는 기본이 8이다. 워커개수*배치크기를 elasticsearch의 flush_size와 맞췄다.

테스트하는 동안 심심함도 달래고 성능도 확인할 겸 로그 파일 한 개를 처리하는데 걸리는 시간을 측정해서 값을 출력하도록 했다.


실행 결과


작성한 쉘 스크립트를 돌리면 콘솔에 다음과 같이 시간 값이 찍혀서 덜 심심하다.


$ ./logdump.app1.sh

...

./로그폴더/access_2016_04_06.log start

Settings: Default pipeline workers: 8

Pipeline main started

1m rate: 6336.785481508631 ( 407172 )

1m rate: 6937.548787233175 ( 856873 )

Pipeline main has been shutdown

stopping pipeline {:id=>"main"}

./misweb_in1_rest/access_2016_04_06.log done [2m 17s elapsed]

...

...


일단 1-2개 로그 파일로 작업해보고 퇴근 전에 nohup을 걸어서 나머지를 밀어 넣었다.


$ nohup ./logdump.app1.sh > dump.log 2>&1 &



+ Recent posts