実例で学ぶAWS入門:オーディエンスデータプラットフォーム「cosmi」を例に

第3回 Amazon Elastic MapReduceを学ぶ(前編)

この記事を読むのに必要な時間:およそ 3 分

CloudFrontのログを処理するプログラムを書く

まずは,CloudFrontのログの例を以下に示します。タブ区切りのCSV(TSV)形式で,第1カラムが日付です。テスト用にsample.input.logとして保存しておきます。

リスト sample.input.log

2011-10-18	07:00:32	HKG1	748	123.456.789.012	GET	xxx.cloudfront.net	/index.html	200	-	Mozilla/5.0...	foo=bar&hoge=fuga
2011-10-18	22:04:50	HKG1	14534	234.567.890.123	GET	xxx.cloudfront.net	/picture.jpg	200	-	Mozilla/5.0...	foo=bar&hoge=fuga
2011-10-19	01:08:15	HKG1	748	219.118.174.241	GET	xxx.cloudfront.net	/index.html	404	-	Mozilla/5.0...	foo=bar&hoge=fuga

このログを読み込み,各レスポンスコードが何回ずつ発生しているか集計するHadoopプログラムを書き,動作させてみましょう。

目的の処理を,下記のようなMap処理とReduce処理に落とし込みます。

Map処理
ファイル中の行番号→その行のログ内容(タブ区切り)というキーバリューペアを受け取り,1カラム目(日付)を取得して,日付→アクセス数(1)のキーバリューペアを書き込みます。
Reduce処理
日付→アクセス数の配列のキーバリューペアを受け取り,配列の中身をすべて足しこんで日付に対応するアクセス数を算出します。

なお,Map処理の入力となるキーバリューペアが行番号とその行の内容なのは,Hadoopが備えるテキストファイル処理のための入力フォーマット(TextInputFormat)の仕様です。

必要があればユーザが自作の入力フォーマットを定義することも可能です。

上記の内容に沿って作成したプログラムを以下に示します。

リスト DailyAccessCounter.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.GenericOptionsParser;

public class DailyAccessCounter {
        public static void main(String[] args) throws Exception{
                Configuration conf = new Configuration();
                String[] otherArgs = new GenericOptionsParser(conf, args)
                                .getRemainingArgs();

                Job job = new Job(conf);
                job.setJarByClass(DailyAccessCounter.class);
                job.setMapperClass(CloudFrontLogToDateMapper.class);
                job.setReducerClass(SumReducer.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(LongWritable.class);
                job.setInputFormatClass(TextInputFormat.class);
                job.setOutputFormatClass(TextOutputFormat.class);
                FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
                FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

                job.waitForCompletion(true);
        }
}

class CloudFrontLogToDateMapper extends Mapper{
    protected void map(LongWritable key, Text value, Context context)
                  throws IOException, InterruptedException {
        String[] log =  value.toString().split("\t");
        if(log.length != 12)
            return;
        String dateString = log[0];
        context.write(new Text(dateString), new LongWritable(1));
    }
}

class SumReducer extends Reducer{
    protected void reduce(Text statusCode, Iterable counts, Context context)
                     throws IOException, InterruptedException {
        long sum = 0;
        for(LongWritable count:counts){
            sum += count.get();
         }
        context.write(statusCode, new LongWritable(sum));
    }
}

このプログラムをDailyAccessCounter.javaとして保存し,コンパイルしてみましょう。 /path/to/hadoopにはHadoop配布物を解凍したディレクトリを指定してください。

Linuxの例

$ export HADOOP_HOME=/path/to/hadoop
$ javac -cp "$HADOOP_HOME/hadoop-core-0.20.203.0.jar:$HADOOP_HOME/lib/*" DailyAccessCounter.java

サンプルデータを入力して,ローカルモードで実行してみましょう。/path/to/jdkにはJDKの設置ディレクトリを指定してください。

$ export JAVA_HOME=/path/to/jdk
$ export HADOOP_CLASSPATH=.
$ $HADOOP_HOME/bin/hadoop DailyAccessCounter sample.input.log output

計算が終わるとoutputディレクトリが作成されます。結果はpart-00000に格納されていますので,確認してみてください。

リスト part-00000

2011-10-18	2
2011-10-19	1

次回は

今回はここまでです。

CloudFrontのログを解析するHadoop MapReduceのプログラムを作り,Hadoopのローカルモードで動かすところまで追って来ました。Hadoopの分散環境の構築は面倒ですが,コンパイルと簡単な動作確認までならそれほど手間をかけずとも実行できます。

次回はこのプログラムをEMRを使って実行します。

著者プロフィール

小澤昇歩(こざわしょうほ)

株式会社adingo取締役。

2008年adingo設立にエンジニアとして参加。以降同社の技術・プロダクト部門を担当。技術者と事業者という2つの顔を持ちながら,サービスや事業の成長に日々奮闘中。好きな本は『小さなチーム,大きな仕事』。

ブログ:SHOHOKOZAWA

Twitter:@s_kozawa


岩川建彦(いわかわたけひこ)

株式会社adingoエンジニア。鹿児島大学大学院卒。工学博士。

「分散アプリケーションのためのプログラミング言語開発」をテーマに2007年IPA未踏ユース・スーパークリエータに認定された。現在はクラウド環境を活かしたcosmiのシステム開発に従事している。

著書に『Python入門』(共著,秀和システム)がある。