R&Dトレンドレポート

第10回 MapReduce処理をやってみよう![実践編]

この記事を読むのに必要な時間:およそ 10.5 分

MapReduceプログラム

ようやくプログラム本体に入れます。今回はJAVAのネイティブなプログラムで機能の実装を行います。

基本的な流れをおさらいすると以下の通りです。

①データ取得・登録
2ちゃんねるからダウンロードしたログデータをHDFSに保存します。これはMapReduce処理のインプットとなるデータです。ここでは1行1レスポンスというフォーマットで保存します。
②Map処理
①のデータを入力としてMap処理がスタートします。Map処理では,以下のことを行います。
  • データの正規化(アスキーアートの削除)
  • キーとバリューへの分割(unixtimeを10分で丸めた値をキー,品詞解析した単語をバリューにする)
この処理は各サーバに分割されたファイルについて実行されます。
③Reduce処理
②で処理された結果を受けて実行されます。キーごとに単語数を集計し,上位2単語程度を当該時間帯でよく発言されたものとします。

プログラムの全体像

MapReduce処理はデータの入出力がキーとバリューと決まっているため,Map, Reduce処理に何をキーにして,何をバリューにするか,にさえ注意すればわりと簡単に書くことができます。ここら辺が複雑な分散コンピューティングとの差であると言えます。

プログラムの全体像はたったこれだけです。

public class J2ch

{
           // Map処理
	static class J2chMapper extends Mapper <LongWritable, Text, IntWritable, Text>
	{
		public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException
		{
		}
	}

           // Reduce処理
	static class J2chReducer extends Reducer <IntWritable, Text, IntWritable, Text>
	{
		public void reduce(IntWritable key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException
		{
		}
	}

           // メイン関数
	public static void main(String[] args ) throws Exception
	{
		if ( args.length != 2 )
		{
			System.err.println("Usage: hogehoge");
			System.exit(-1);
		}
                       // ジョブの定義
		Job job = new Job();
		job.setJarByClass(J2ch.class);

                       // 入力出力ファイルの定義(引数の1,2)
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

                      // mapperとreducerクラスを定義
		job.setMapperClass(J2chMapper.class);
		job.setReducerClass(J2chReducer.class);

                       // 最終出力のキーとバリューの型を定義
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(Text.class);

		System.exit(job.waitForCompletion(true) ? 0 : 1 );

	}
}

メイン処理ではジョブの定義とmapperとreducerは誰?というのを定義しています。色をつけた部分がメインとmapper, reducerの関係する部分です。

ここにMap処理,Reduce処理の部分に処理を埋めていけばそれでOKです。簡単ですよね!?

Map処理

まずMap処理から見ていきましょう。

Map処理で入力とされる2ちゃんねるデータは以下のようなフォーマットとなります(ある3行だけ抜粋&加工しています⁠⁠。

※内容は先日放送された映画「カイジ 人生逆転ゲーム」の実況スレです。香川照之さん扮する利根川が⁠fuck you!⁠というシーン付近ですね。^^
1287144100  299  名無しさんにズームイン!    LnjhWICN        sage   迫力ねえよ,香川w
1287144803  300  名無しさんにズームイン!    3M/Iym08        sage   利根川なんかちがう。
1287144804  301  名無しさんにズームイン!    xueZONEO        sage   利根川キター!

それではMap処理の出力はどうでしょうか?

1287144000    迫力
1287144000    ねえ
1287144000    よ
1287144000    ,
1287144000    香川
1287144000    w

1287144600    利根川
1287144600    なんか
1287144600    ちがう
1287144600    。

1287144600    利根川
1287144600    キター
1287144600    !

キーがunixtime(シリアルな秒)を600秒(10分)で丸めた値で,バリューとして品詞解析したパーツを並べています。1行が品詞の数だけ複数行に展開されたようなイメージです。キーが600秒で丸められていますので,2行目と3行目は同一のキーとして扱われています。ただ⁠,⁠⁠。⁠などが出力されてしまうのはイマイチかもしれません。ここは名詞だけを出力するようにします。

それでは実際のコードを見てみましょう。innerクラスJ2chMapperです。

static class J2chMapper extends Mapper <LongWritable, Text, IntWritable, Text>
{
	public void map(LongWritable key, Text value, Context context)
		throws IOException, InterruptedException
	{
		// keyは使いません。
		String line = value.toString(); // Text型で渡される1行のデータをString型に変換する。
		String[] arr = line.split("\t");    // タブで分割する。
	
		int _time = Integer.parseInt(arr[0]); // unixtimeを取得。
		int time = (int)Math.floor(_time/600)*600; // 600秒で丸める。後のキーとなる。
	
		String b = arr[6]; // ボディーテキストを取得
	
		if ( isAA(b) ) return; // AAを含むモノは削除
		String body = noGomi(b); // ゴミを削除(URLや>>123などを削除)
	
		Tagger tagger = new StandardTagger("UTF-8", ""); // CMeCabのクラス
		Node node = tagger.parse(body); // 品詞解析
	
	 	// 品詞解析の結果の数(単語)だけループする。
	 	while (node.hasNext())
		{
			String surface = node.next(); // 単語の取得
			String feature = node.feature(); // 単語の解析結果	
	
			// 各項目の値を取得。以下を期待
			// 名詞,副詞可能,*,*,*,*,本日,ホンジツ,ホンジツ
			String featureArr[] = feature.split(",");
	
			// 名詞でかつ内容がゴミではない場合,
			// Map処理の結果としてキーバリューを出力する。
			if ( featureArr[0].equals("名詞" ) && ! isGomi(surface))
			{
	                                    //丸めたunixtime,単語のペアを出力Reducerに渡される。
				context.write(new IntWritable(time), new Text(surface));
			}
	
		}
	}
}
static class J2chMapper extends Mapper <LongWritable, Text, IntWritable, Text> 

この部分で,mapperクラスの

  • 入力キー,バリュー(LongWritable, Text)
  • 出力キー,バリュー(IntWritable, Text)

を定義しています。map()には入力キー,バリューが渡され実行されます(第三引数のContextは意識しなくてもいいです⁠⁠。今回のmapにおけるキーはファイルのバイト位置になります(デフォルトの入力フォーマットTextInputFormatが適用されているため⁠⁠。また,バリューでは行データが渡されます。

つまり,今回のデータの場合,

key=0
value=“1287144100  299  名無しさんにズームイン!    LnjhWICN        sage   迫力ねえよ,香川w”

というふうに1行目が処理されます。2行目は,

key=96
value=“1287144803  300  名無しさんにズームイン!    3M/Iym08        sage   利根川なんかちがう。”

という具合です。

今回のmap処理ではバイト位置は不要ですので,key変数は使っていません。⁠600⁠というマジックナンバーを使ってたりしますが,この辺はstaticな値として変数にした方がいいですね。

品詞解析している部分を別のクラスにして隠蔽化,抽象化した方が別の品詞解析エンジンを使ったときに柔軟に対応できますね,など突っ込みどころがあるかと思いますが,まずはこれだけのコードでmap処理がかけることに驚いてください。

内容は単純なテキスト処理ですので,コメントを追っていただけるとご理解いただけると思います。

著者プロフィール

脇本武士(わきもとたけし)

都内中小IT企業(メイサンソフト(株))に所属。某大手自動車会社でのシステム開発,運用を経て,現在は研究開発部署に席をお借りしています。DB周りの保守サポート,ウェブ技術開発を主に手がけてきました。現在は大規模計算フレームワークの活用とKVSに注目しています。普段はOS Xを使用していますが一番よく使うアプリはTerminalです(笑)。

R&Dトレンドレポート(てくらぼ)