これなら使える!ビッグデータ分析基盤のエコシステム

第3回 ストリーミング処理とバッチ処理によるデータ収集 ~ Embulk編 ~

この記事を読むのに必要な時間:およそ 7 分

データ分析の最初の手順であるデータ収集には大きく分けてストリーミング処理とバッチ処理という2つの方法があります。前回は,ストリーミング処理の1つであるFluentdによるデータ収集の方法を紹介しました。今回はもう一方のバッチ処理による収集をEmbulkというツールで行う方法を解説します。

図1 今回紹介する処理(図中の色のついた部分)

図1 今回紹介する処理(図中の色のついた部分)

Embulkでマスターデータをバッチ処理として収集する

Embulkとは

Fluentdはリアルタイムに生成されるデータに対しては有効なツールでした。しかし,RDBのマスターデータや外部サービスから定期的に数GB生成されるようなデータに対してはFluentdではカバーできないという課題もわかってきました。

こうしたケースに対して,これまでユーザは自前のスクリプトやETL(Extract,Transform,Load)ツールを作って,データ収集を行っていましたが,エラーハンドリングやリトライの仕組み,インポート速度といったパフォーマンスの問題などの課題があり,スクリプトを作ること自体が大変労力のかかる作業となってしまっていました。

こうした課題を解決するために,Embulkという並列データ転送フレームワークが開発されました。

Embulkの特徴として,Fluentdと同じプラグイン機構を持っていることが挙げられます。入力,出力,データ加工などのプラグインを書くことで足りない機能を補完し,現場で使えるツールに拡張していくことができるのです。また,これらがオープンソースとして公開されており,各データソース毎にさまざまな制約があるという課題を多くの人と共有することで,ノウハウが詰まったスクリプトを継続的にメンテナンスできるようになります。EmbulkのPluginのリストはこちらで確認することができます。

そして,Embulkでデータのインポートは,3つのステップで実現することができます。

  1. guess
    データを一部読み込み,自動でスキーマを推定し,設定ファイルを生成します。
  2. preview
    設定ファイルのスキーマ情報を元に読み込んだ際のプレビューを行います。
    ここで想定と異なるスキーマに成ってしまっていた場合には,設定ファイルを手動で修正します。
  3. run
    完成した設定ファイルを元にEmbulkを動かし,データの転送を実行します。

Embulkをインストールする

では,Embulkをインストールしてみましょう。前回のFluentdと同様に,Ubuntu14.04にセットアップをします。なお,EmbulkはWindowsでも動作させることが可能です

$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc

マスターデータのフォーマット

Embulkをインストールして,データを収集する準備はできました。それでは対象となるデータについて見てみましょう。

今回は,PostgreSQLにマスターデータが格納されていることにします。一般的にマスターデータにはユーザ情報が格納されていると思います。今回は前回使ったアクセスログとは関係ないデータで試します,下記のようにテーブルが定義されているとします。

表1 PostgreSQL上のマスターデータ

user_id sex last_update closed_account_time age city device country
1190452 1 2011-07-08 22:13:19 0001-01-01 00:00:00 21 miyagi smart phone japan
1581708 1 2010-04-04 23:33:52 0001-01-01 00:00:00 40 tokyo feature phone japan
1629941 0 2009-11-04 20:41:10 0001-01-01 00:00:00 35 tokyo smart phone japan

このデータをローカルのPostgreSQLのtestdb以下で定義しておきます。また合わせてサンプルデータもインポートしておきます。

create table users (
  user_id bigint PRIMARY KEY,
  sex smallint,
  last_update timestamp NOT NULL,
  closed_account_time timestamp NOT NULL,
  age smallint,
  city varchar(30),
  device varchar(30),
  country varchar(30)
);

INSERT INTO users (user_id, sex, last_update, closed_account_time, age, city, device, country)
VALUES
(1190452, 1, '2011-07-08 22:13:19', '0001-01-01 00:00:00', 21, 'miyagi' ,'smart_phone', 'japan'),
(1581708, 1, '2010-04-04 23:33:52', '0001-01-01 00:00:00', 40, 'tokyo', 'feature phone', 'japan'),
(1629941, 0, '2009-11-04 20:41:10', '0001-01-01 00:00:00', 35, 'tokyo', 'smart phone', 'japan')

また,分析エンジン側のMySQLにも同様のテーブルを作っておきます。

create table users (
  user_id bigint PRIMARY KEY,
  sex smallint,
  last_update DATETIME,
  closed_account_time DATETIME,
  age smallint,
  city varchar(30),
  device varchar(30),
  country varchar(30)
);

著者プロフィール

高橋達(たかはしとおる)

Treasure Data Inc.でテクニカルサポートエンジニアとして,毎日,日米問わず顧客のサポートを担当。サポートエンジニアのエンジニアとしての地位向上を目指して色々模索中。そのために,秋葉原幻橙館で今日も元気にOSS活動を行っている。

URL:http://toru-takahashi.gitbooks.io/secret-ninja/content/

コメント

コメントの記入