これなら使える!ビッグデータ分析基盤のエコシステム

第2回ストリーミング処理とバッチ処理によるデータ収集 ~ Fluentd編 ~

データの種類と頻度に合わせたデータ収集方法

データ分析を行っていく上で、データ収集を行うことは非常に時間がかかる作業です。たとえば、ログデータはサーバの数だけ散在しているだけでなく、アプリケーションごとにログのフォーマットが異なるということも往々にしてあります。また、外部サービスと連携している場合やデータベースに保存されているデータのように、定期的にダンプされたCSVファイルを収集するケースもあります。

こうした多種多様なデータを横断的に分析するためには、これらのデータを1つの場所に集計しやすい形で収集する必要があります。

このようなケースにおいて、データ収集を発生頻度によってストリーミング処理とバッチ処理の2つの方法に分けることで対応することができます。

ストリーミング処理では、Webサーバのアクセスログやセンサーデータなどのリアルタイムに生成されるデータを対象とします。そのために適したツールとしてFluentdを利用します。バッチ処理では、RDBに格納されるマスターデータや外部サービスからFTPなどで定期的に送られてくるデータを定期的に転送する必要があるデータを対象とします。このために適したツールとしてEmbulkを利用します。

図1 データ収集システムの概要
図1 データ収集システムの概要

今回と次回で、FluentdとEmbulkのインストールから分析エンジンに収集するまでの流れを紹介していきます。

Fluentdでアクセスログをストリーミングで収集する

Fluentdとは

Fluentdとは、米トレジャーデータ社が中心となり、オープンソースとして開発を行っているログコレクタです。特徴として、スキーマレスであること、データ入出力加工がプラグイン機構であること、バッファリングの機構によって信頼性の高いログ転送の仕組みになっていること、という点が挙げられます

Fluentdのインストール

Fluentdを使うためには、下記のソフトウェアのいずれかをインストールすることによって実現できます。ここでは、Rubyやそのライブラリの依存関係を考慮したパッケージのtd-agent2を使ってインストールを行います。

Fluentd
Rubyのパッケージ管理システムのRubyGemsで配布をされており、最新版や独自Rubyバージョンで利用したい場合にはこちらを利用します。
td-agent
米トレジャーデータ社によって、依存関係を考慮してパッケージングされており、基本的にはtd-agentをインストールするのが一番簡単です。また、現在のところ下記の2種類が提供されています。
td-agent1
Ruby1.9系でパッケージングされたバージョンです。Ruby1.9系の修正・サポートの提供が停止したため、td-agent1もセキュリティフィクスのみの提供となっています。
td-agent2
Ruby2.1系でパッケージングされたバージョンです。現在の安定版となっているため、通常はこちらを利用してください。

さて、それでは下記のインストールスクリプトを実行してみましょう。2015/07/23時点ではtd-agent2.2.1が最新版となっており、今回の利用環境としてはUbuntu14.04を利用しています。

$ curl -L https://td-toolbelt.herokuapp.com/sh/install-ubuntu-trusty-td-agent2.sh | sh

アクセスログのフォーマット

Fluentdをインストールして、ログを収集する準備はできました。さて、それでは対象となるアクセスログについて考えてみましょう。

一般的に、Apacheのアクセスログは下記のフォーマットでログ出力されています。

Apacheアクセスログの標準フォーマット
LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\"" combined
表1 Apacheアクセスログの標準フォーマット
Format String説明
%hリクエストしたリモートホスト名
%lリモートユーザ名
%u認証に使用されたリモートユーザ
%tリクエストを受けた時刻
%rHTTPリクエストヘッダ
%>sサーバがリクエストに対して返したステータスコード
%bHTTPヘッダを除いた転送バイト数
%{Referer}iRefererヘッダの内容
%{User-Agent}iUser-Agentヘッダの内容

しかし、表1を見たときに、Webサイトにアクセスするユーザを一意に特定できるかについて考えると、情報が不足していることがわかります。上記データの場合には、基本KPIの分析はある程度可能ですが、ユーザごとのWebサイト上のパスを見る応用KPIの分析を行うには難しいことが考えられます。そこで、アクセスログの中にクッキーIDなどのユーザを一意に取得するための情報を入れておくことで応用KPIの分析にたどり着くことが可能です。

クッキーIDを設定するためのApacheのログを生成するためには、Apacheのusertrack_moduleを使うことで今回は実現しています。そして、このクッキー情報をログフォーマットに追加します。

また合わせて、デフォルトのApacheのログフォーマットは可読性がよくなく、任意のパラメータを入れた際に、ログをパースすることが煩雑になりがちです。そこで、LTSV(Labeled Tab-separated Values)形式にしておきます。LTSV形式ではカラム名とその値がセットになって扱われ、delimiterとしてはタブがデフォルトで利用されます。

これらの設定を実現するためには、Apacheのログ出力のフォーマットとして下記を設定します。

TSV形式に対応したアクセスログのフォーマット
"time:%{%Y-%m-%d %H:%M:%S %z}t\tdomain:%V\thost:%h\tserver:%A\tident:%l\tuser:%u\tmethod:%m\tpath:%U%q\tprotocol:%H\tstatus:%>s\tsize:%b\treferer:%{Referer}i\tagent:%{User-Agent}i\tresponse_time:%D\tcookie:%{cookie}i\tset_cookie:%{Set-Cookie}o" ltsv
表2 識別情報を加えLTSV形式に対応したアクセスログのフォーマット
項目名 Format String 説明
domain %V Hostヘッダの内容
host %h リクエストしたリモートホスト名
server %A 応答を返したサーバのローカルIPアドレス
ident %l リモートユーザ名
user %u 認証に使用されたリモートユーザー名
time %{%Y-%m-%d %H:%M:%S %z}t リクエストを受けた時刻
method %m リクエストメソッド
path %U%q REQUEST_URI
protocol %H リクエストプロトコル
status %>s サーバがリクエストに対して返したステータスコード
size %b HTTPヘッダを除いた転送バイト数
referer %{Referer}i Refererヘッダの内容
agent %{User-Agent}i User-Agentヘッダの内容
response_time %D リクエストを処理するのにかかった時間、マイクロ秒単位
cookie %{cookie}i サーバが受信したクッキー
set_cookie %{Set-Cookie}o サーバが送出したクッキー

上記のパラメータを元に生成されたログは下記のようになります。

変更後のアクセスログ
time:2015-07-26 08:58:20 +0000  domain:52.69.91.201 host:153.232.253.97 server:172.31.6.70  
ident:- user:-  method:GET  path:/index.html    protocol:HTTP/1.1   status:200  size:3256   
referer:-   agent:Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_4) AppleWebKit/600.7.12 (KHTML, like Gecko) Version/8.0.7 Safari/600.7.12    
response_time:795   cookie:-    set_cookie:Apache=2a4375d6.51bc3704b915f; path=/; expires=Tue, 25-Jul-17 08:58:20 GMT

アクセスログ(LTSV形式)を集める

ここまでの説明でApacheのアクセスログをLTSV形式で/var/log/apache2/access.logに出力するようにしました。そこで、FluentdでLTSV形式のログを収集してみます。読み込んだファイルは、テストとして標準出力に出力しておきます。

LTSV形式のログを収集するためのFluentdの設定
<source>
  @type tail
  path /var/log/apache2/access.log
  pos_file /var/log/td-agent/access.log.pos
  tag apache.access
  format ltsv
  time_key time
  read_from_head
</source>

<match apache.access>
  @type stdout
</match>

設定ファイルを/etc/log/td-agent.confに配置し、td-agentを起動します。

$ service td-agent start

すると、td-agentのログ(/var/log/td-agent/td-agent.log)には下記が出力されます。

td-agentで収集されたログ
2015-07-26 17:58:20 +0900 apache.access: {"domain":"52.69.91.201",
"host":"153.232.253.97","server":"172.31.6.70",
"ident":"-","user":"-","method":"GET",
"path":"/index.html","protocol":"HTTP/1.1",
"status":"200","size":"3256","referer":"-",
"agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_4) AppleWebKit/600.7.12 (KHTML, like Gecko) Version/8.0.7 Safari/600.7.12",
"response_time":"795","cookie":"-","set_cookie":"Apache=2a4375d6.51bc3704b915f; path=/; expires=Tue, 25-Jul-17 08:58:20 GMT"}
2015-07-26 17:58:21 +0900 apache.access: {"domain":"52.69.91.201",
"host":"153.232.253.97","server":"172.31.6.70",
"ident":"-","user":"-","method":"GET",
"path":"/index.html","protocol":"HTTP/1.1",
"status":"200","size":"3256","referer":"-",
"agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_4) AppleWebKit/600.7.12 (KHTML, like Gecko) Version/8.0.7 Safari/600.7.12",
"response_time":"670","cookie":"Apache=2a4375d6.51bc3704b915f","set_cookie":"-"}

集めたログを分析エンジンへ送る

さて、Fluentdで収集したデータを第1回で紹介した分析エンジンに送りたいところですが、今回はMySQLを分析エンジンに見立てて進めていきます。

また、バックアップ用途としてAmazon S3にも並行してデータを送ります。これは、分析エンジンによってはストリーミングインポートの量が多い場合に、分析エンジン側が負荷により詰まってしまい、インポートができなくなることがあります。そのため、何か問題があったときの復旧用に別なストレージサービスにデータを送っておくために必要です。

まずはデフォルトではインストールされないmysqlのプラグインをインストールします。

$ td-agent-gem install fluent-plugin-mysql
# エラーが発生した場合は、ubuntuの場合は下記をインストールしておきましょう
$ sudo apt-get install libmysqlclient-dev gcc make

前述の設定ファイルと組み合わせた設定が下記となります。

MySQLで分析するためのログを生成する設定
# ファイルからtailで読み込みます
<source>
  @type tail # @は、plugin固有の設定ではなく、Fluentd自体の設定のときの区別用に付与します
  path /var/log/apache2/access.log
  pos_file /var/log/td-agent/access.log.pos
  tag apache.access
  format ltsv # LTSV形式としてパースします
  time_key time
  read_from_head # ファイルの先頭から読み込みます
</source>

<match apache.access>
  @type copy

  <store>
    @type s3

    aws_key_id AWS_KEY_ID
    aws_sec_key AWS_SECRET_KEY

    s3_bucket BUCKET_NAME
    s3_region ap-northeast-1

    format json

    time_slice_format %Y%m%d/%Y%m%d-%H%M
    time_slice_wait 10m

    path apache_logs/

    s3_object_key_format %{path}%{time_slice}_%{index}_%{hostname}.%{file_extension}

    store_as gzip

    buffer_type file
    buffer_path /var/log/td-agent/buffer/s3

    buffer_chunk_limit 32m
    buffer_queue_limit 1024
  </store>

  <store>
    @type mysql
    host HOSTNAME
    port 3306
    database DATEBASE
    username USERNAME
    password PASSWORD
    include_time_key yes
        time_format %Y-%m-%d %H:%M:%S %z
    key_names domain,host,server,ident,user,time,method,path,protocol,status,size,referer,agent,response_time,cookie,set_cookie
    sql INSERT INTO accesslog (domain,host,server,ident,user,time,method,path,protocol,status,size,referer,agent,response_time,cookie,set_cookie) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
    flush_interval 10s

    buffer_type file
    buffer_path /var/log/td-agent/buffer/mysql

    buffer_chunk_limit 32m
    buffer_queue_limit 1024
  </store>
</match>

先ほど、match句では、stdoutを利用して標準出力に送っていましたが、copyを今度は使っています。copyプラグインを利用することで、同一のタグのデータに対して複数箇所へのアウトプットができるようになります。

またインポート前の準備として、MySQLにログ用テーブル(testdb.accesslog)を作っておきます。

testdb.accesslogの内容
CREATE  TABLE IF NOT EXISTS `testdb`.`accesslog` (
  `log_id` INT NOT NULL AUTO_INCREMENT ,
  `domain` VARCHAR(255) NOT NULL ,
  `host` VARCHAR(255) NOT NULL ,
  `server` VARCHAR(255) NOT NULL ,
  `ident` VARCHAR(255) NOT NULL ,
  `user` VARCHAR(255) NOT NULL ,
  `time` TIMESTAMP NOT NULL ,
  `method` VARCHAR(255) NOT NULL ,
  `path` VARCHAR(512) NOT NULL ,
  `protocol` VARCHAR(255) NOT NULL ,
  `status` INT NOT NULL ,
  `size` INT NOT NULL ,
  `referer` VARCHAR(512) NOT NULL ,
  `agent` VARCHAR(255) NOT NULL ,
  `response_time` DOUBLE NOT NULL ,
  `cookie` VARCHAR(512) NOT NULL ,
  `set_cookie` VARCHAR(512) NOT NULL ,
  PRIMARY KEY (`log_id`, `time`));

それでは、前述のログを再度送ってみましょう。

その後、td-agentを起動させ、問題がなければMySQLのテーブルには下記のようにデータが入ってきます。

表3 MySQLに送り込まれたログデータ
log_id12
domain52.69.91.20152.69.91.201
host153.232.253.97153.232.253.97
server172.31.6.70172.31.6.70
ident--
user--
time2015-07-26 08:58:202015-07-26 08:58:21
methodGETGET
path/index.html/index.html
protocolHTTP/1.1HTTP/1.1
status200200
size32563256
referer--
agentMozilla/5.0 (Macintosh; Intel Mac OS X 10_10_4) AppleWebKit/600.7.12 (KHTML, like Gecko) Version/8.0.7 Safari/600.7.12 Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_4) AppleWebKit/600.7.12 (KHTML, like Gecko) Version/8.0.7 Safari/600.7.12
response_time795670
cookie-Apache=2a4375d6. 51bc3704b915f
set_cookieApache=2a4375d6. 51bc3704b915f; path=/; expires=Tue, 25-Jul-17 08:58:20 GMT-

さて、ここまでFluentdによるWebサーバのアクセスログの収集が完了です。

おまけ:最近のFluentdの動向

Fluentdでは、2014年12月12日にv0.12がリリースされ、いくつかの大きな機能が含まれました。このv0.12はtd-agent2でも導入されているため、このv0.12から取り込まれた新しい3つの機能と最近のトピックについて紹介したいと思います。

1. Filter pluginの導入

Fluentdが普及するにつれて、単純なログ転送だけでなく、データの加工や付加情報を付与したいといったさまざまなユースケースが生まれてきました。従来のバージョンでは、こうしたユースケースに対して、Output pluginを利用して加工処理の対応していました。しかし、Output pluginを使うためには、タグの書き換えが発生し、Pluginの数が増えた際に非常に可読性が良くない設定ファイルになってしまうという問題が起こっていました。

そこで、Fluentd v0.12ではFilter pluginの仕組みが導入されました。Filterを用いることであるタグに対して、InputからOutputまでの流れの中でのデータ加工が簡単になります。

# Fluentd v0.10の設定
<source>
  @type tail
  tag raw.foo
</source>

<match raw.foo>
  @type filter
  remove_tag_prefix raw.
</match>

<match foo>
  @type s3
</match>

上記の設定では、Fluentd v0.10ではFilterプラグインを使うために、一度raw.fooでマッチをさせ、その後rawタグを取り除き、次のmatchに処理されるようになっています。

Source -> Input - TAG(raw.foo) -> Output -> - TAG(foo) -> Output -> Database
         |------------------ Fluentd -----------------------------|

しかし、Fluentd v0.12の下記の設定では、Filter pluginを利用したフィルタを行っているため、tagの書き換えが不要になります。

# Fluentd v0.12の設定
<source>
  @type tail
  tag foo
</source>

<filter foo>
  @type filter
</match>

<match foo>
  @type s3
</match>
Source -> Input - TAG(foo) -> Filter -> - TAG(foo) -> Output -> Database
         |---------------- Fluentd ---------------------------|

2. Label機能の導入

Fluentdでは、レコード毎に付与されるタグを元に、入出力加工のルーティングを行っていました。 しかし、入出力が複数ある場合にタグを都度書き換える必要があり、管理が複雑化してしまうことがあったため、このラベル機能を使って入力ごとに処理を分けることができるようになりました。

下記の設定ファイルの場合には、@PRODのforwardへの入力はTreasureDataにアウトプットされ、@TESTのテスト用のforwardへの入力は標準出力に出力されます。

<source>
  type forward
    @label @PROD
    port 24224
</source>

<source>
  @type forward
  @label @TEST
  port 24225
</source>

<label @PROD>
  <match forward.**>
    @type tdlog
  </match>
</label>

<label @TEST>
  <match forward.**>
    @type stdout
  </match>
</label>

3. At-least-onceのサポート

Fluentdのin_forward/out_forwardプラグインは今までat-most-onceしかサポートしていませんでしたが、v0.12からat-least-onceをサポートするようになりました。

at-least-onceとは、送り先のFluentdがin_forwardがackを返すまでout_forward側がチェックし、タイムアウトなどの理由でackが返ってこなければ、再度データを再送します。そのため、out_forwardは少なくとも1度はin_forwardにデータを送ったことを保証することができるようになりました。

これにより、out_forwardがデータを送ったはずだけれども、なんらかの理由でin_forwardがデータを受け取れていなかったというデータ欠損の可能性を防ぐことができるようになりました。

<match forward.**>
  @type forward
  require_ack_response
</match>

4. DockerのLogging Driverへの導入

最近のFluentdの話題として、Docker v1.6のLogging DriverというDockerコンテナのログを送信するための柔軟なログ構成をするための仕組みの1つにFluentdが組み込まれました。これによりDocker上で動くアプリのログだけでなく、Docker自身のログもFluentdで集めることができるようになりました(参考:Fluentd logging driver⁠。

コンテナ管理のKubernetesでのロギングレイヤーとしてGoogleがFluentdを採用しただけでなく、Docker自身のログ管理としてもFluentdが導入され、今後もますますロギングのデファクトスタンダードになることを目指して開発が進められています。

図2 Docker-Fluentd連携
図2 Docker-Fluentd連携

ストリーミング処理のおわりに

今回は、データ収集の方法をストリーミング処理とバッチ処理の2種類に分け、ストリーミング処理の1つであるFluentdの紹介を行いました。

分析エンジンによっては、Fluentdのようなストリーミング処理によるインポートが向いていないこともあるため、一旦S3にデータを送っておき、その後S3から分析エンジンにバッチ処理で送るということもあります。このようなケースにも利用できるバッチ処理のためのEmbulkを次の回では紹介します。

おすすめ記事

記事・ニュース一覧