実例で学ぶAWS入門:オーディエンスデータプラットフォーム「cosmi」を例に

第3回Amazon Elastic MapReduceを学ぶ(前編)

はじめに

こんにちは、adingoの岩川です。

ビッグデータという言葉が浸透して、実際に向き合う機会も増えてきていると感じます。

ビッグなデータを処理するには相応の計算パワーが必要です。分散処理システムを使って対処するのは一般的な方法ですが、分散処理システムを1から組むのは大変なので、Hadoop MapReduceベースのシステムが広く利用されています。

Hadoop MapReduceにおいては、ユーザはMapReduceと呼ばれる計算モデルに従って、Map処理、Reduce処理と呼ばれる計算内容のコア部分だけを書けばよく、タスク配分や通信などをケアする必要がありません。これによって、プログラミングのコストを大幅に減らすことが可能なのです。

しかしながら、実はHadoopクラスタの管理は決してラクなものではありません。

何台ものサーバをセットアップし、Hadoopをインストールしてやり、動作確認をする……非常に骨の折れる作業です。

そもそも、何台のクラスタが必要なのか。やってみないとわからないというのが正直なところではないでしょうか。

我々cosmi開発チーム)もまた、データ分析システムを開発するにあたって、そういった問題を抱えていたのです。そこで、Amazon Elastic MapReduce(EMR)とよばれるPaaSスタイルのHadoop環境を利用することにしました。

「実例で学ぶAWS」第3回と第4回は、少し予定を変更して2回ともEMRについて解説していきます。特にcosmiのケースで起こった、下記のパターンでのMapReduceアプリケーション開発手順を追っていきます。

  • Javaで実装する
  • CloudFrontのログを解析する
  • EMRクラスタとEC2インスタンスとが通信する

第3回は、EMRで実行する、Hadoop MapReduceプログラムのコンパイルから、ローカル環境で実行して動作確認するところまでを追っていきます。

CloudFront & EMR

今回はAWSが提供するContents Delivery Network(CDN)である、Amazon CloudFrontのログを解析するプログラムを作成します。

CloudFrontとEMRは相性がよく、CloudFrontのログをそのままEMRで処理することができるようになっています。

CloudFrontは静的コンテンツの配信だけではなく、情報を記録するために利用することができます。

すなわち、記録したい情報をGETパラメータに含めたリクエストを行うことで、情報をCloudFrontのログとして貯蔵しておくわけです。

cosmiでも、パートナー様から送信されるデータを記録するために、CloudFrontを利用しています。

EMRを使う、その前に

HadoopはMapReduceのJava実装ですので、ユーザプログラムもJavaで書くのが一般的です(もっとも、Hadoop Streamingなどのツールを使えば、Java以外の言語でもMapReduceの処理を書くことが可能です⁠⁠。

ソースコードをコンパイルするにはJavaのコンパイラHadoop Commonのライブラリが必要です。ダウンロードして、解凍しておきましょう。

執筆時点(2011年12月1日)での安定版は0.20.203.0です。

ローカル環境だけでの簡単な動作確認なら、それほど面倒な設定をせずとも実行可能です。

もし自前の分散実行環境を用意するのであれば、別途チュートリアル等をご参照ください(もっとも、分散環境を作るのは面倒だからEMRを使おうという、本記事の趣旨からは外れますが!⁠⁠。

MapReduceとは

Hadoop MapReduceのプログラムを書く前に、MapReduceについて抑えておく必要があるでしょう。

MapReduceは分散コンピューティングのための計算モデルです。入力となるキーと値のペアを、1つ、あるいは複数の別のキーと値のペアにマップするMap処理、Map処理の結果をキーごとにまとめるShuffle処理、キーごとにまとまった複数の値を計算して結果を得るReduce処理からなります。

Map処理とReduce処理はそれぞれキーの種類ぶん複数回実施されます。それぞれの処理は入力に応じた出力さえ算出すればいいので、並列化しやすいのです。また、複数回のMapReduceを組み合わせることで、複雑な計算も可能です。

なお、Shuffle処理はHadoopに組み込まれており、ユーザが記述する必要はありません。

CloudFrontのログを処理するプログラムを書く

まずは、CloudFrontのログの例を以下に示します。タブ区切りのCSV(TSV)形式で、第1カラムが日付です。テスト用にsample.input.logとして保存しておきます。

リスト sample.input.log
2011-10-18	07:00:32	HKG1	748	123.456.789.012	GET	xxx.cloudfront.net	/index.html	200	-	Mozilla/5.0...	foo=bar&hoge=fuga
2011-10-18	22:04:50	HKG1	14534	234.567.890.123	GET	xxx.cloudfront.net	/picture.jpg	200	-	Mozilla/5.0...	foo=bar&hoge=fuga
2011-10-19	01:08:15	HKG1	748	219.118.174.241	GET	xxx.cloudfront.net	/index.html	404	-	Mozilla/5.0...	foo=bar&hoge=fuga

このログを読み込み、各レスポンスコードが何回ずつ発生しているか集計するHadoopプログラムを書き、動作させてみましょう。

目的の処理を、下記のようなMap処理とReduce処理に落とし込みます。

Map処理
ファイル中の行番号→その行のログ内容(タブ区切り)というキーバリューペアを受け取り、1カラム目(日付)を取得して、日付→アクセス数(1)のキーバリューペアを書き込みます。
Reduce処理
日付→アクセス数の配列のキーバリューペアを受け取り、配列の中身をすべて足しこんで日付に対応するアクセス数を算出します。

なお、Map処理の入力となるキーバリューペアが行番号とその行の内容なのは、Hadoopが備えるテキストファイル処理のための入力フォーマット(TextInputFormat)の仕様です。

必要があればユーザが自作の入力フォーマットを定義することも可能です。

上記の内容に沿って作成したプログラムを以下に示します。

リスト DailyAccessCounter.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.GenericOptionsParser;

public class DailyAccessCounter {
        public static void main(String[] args) throws Exception{
                Configuration conf = new Configuration();
                String[] otherArgs = new GenericOptionsParser(conf, args)
                                .getRemainingArgs();

                Job job = new Job(conf);
                job.setJarByClass(DailyAccessCounter.class);
                job.setMapperClass(CloudFrontLogToDateMapper.class);
                job.setReducerClass(SumReducer.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(LongWritable.class);
                job.setInputFormatClass(TextInputFormat.class);
                job.setOutputFormatClass(TextOutputFormat.class);
                FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
                FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

                job.waitForCompletion(true);
        }
}

class CloudFrontLogToDateMapper extends Mapper{
    protected void map(LongWritable key, Text value, Context context)
                  throws IOException, InterruptedException {
        String[] log =  value.toString().split("\t");
        if(log.length != 12)
            return;
        String dateString = log[0];
        context.write(new Text(dateString), new LongWritable(1));
    }
}

class SumReducer extends Reducer{
    protected void reduce(Text statusCode, Iterable counts, Context context)
                     throws IOException, InterruptedException {
        long sum = 0;
        for(LongWritable count:counts){
            sum += count.get();
         }
        context.write(statusCode, new LongWritable(sum));
    }
}

このプログラムをDailyAccessCounter.javaとして保存し、コンパイルしてみましょう。 /path/to/hadoopにはHadoop配布物を解凍したディレクトリを指定してください。

Linuxの例
$ export HADOOP_HOME=/path/to/hadoop
$ javac -cp "$HADOOP_HOME/hadoop-core-0.20.203.0.jar:$HADOOP_HOME/lib/*" DailyAccessCounter.java

サンプルデータを入力して、ローカルモードで実行してみましょう。/path/to/jdkにはJDKの設置ディレクトリを指定してください。

$ export JAVA_HOME=/path/to/jdk
$ export HADOOP_CLASSPATH=.
$ $HADOOP_HOME/bin/hadoop DailyAccessCounter sample.input.log output

計算が終わるとoutputディレクトリが作成されます。結果はpart-00000に格納されていますので、確認してみてください。

リスト part-00000
2011-10-18	2
2011-10-19	1

次回は

今回はここまでです。

CloudFrontのログを解析するHadoop MapReduceのプログラムを作り、Hadoopのローカルモードで動かすところまで追って来ました。Hadoopの分散環境の構築は面倒ですが、コンパイルと簡単な動作確認までならそれほど手間をかけずとも実行できます。

次回はこのプログラムをEMRを使って実行します。

おすすめ記事

記事・ニュース一覧