R&Dトレンドレポート

第10回 MapReduce処理をやってみよう![実践編]

この記事を読むのに必要な時間:およそ 10.5 分

Recude処理

続いてReduce処理です。

Reduce処理の入力はMap処理の出力になります。名詞のみに絞った場合下記のようになりました。

1287144000    迫力
1287144000    香川
1287144000    w
1287144600    利根川
1287144600    利根川
1287144600    キター
1287144600    !

次に出力です。

1287144000    1,迫力
1287144000    1,香川
1287144000    1,w
1287144600    2,利根川
1287144600    1,キター
1287144600    1,!

丸めたunixtimeごとに出現回数の多い単語順にベスト5が出力されます。これが最終的なMapReduceの出力となります(ファイル出力となる⁠⁠。

それでは実際のコードを見てみましょう。J2chReduerクラスです。

static class J2chReducer extends Reducer <IntWritable, Text, IntWritable, Text>
{
	public void reduce(IntWritable key, Iterable<Text> values, Context context)
		throws IOException, InterruptedException
	{
		// hashtableオブジェクトを作成
		// Text=カウントというハッシュを作りたい。
		Hashtable kvs = new Hashtable();
		
		// valueにTextのリストが渡されるのでイテレーションする
		for( Text value : values )
		{
			// Textの取り出し。
			String k = value.toString();

			// kvsオブジェクトに存在すれば。
			if ( kvs.containsKey( k ))
			{
				// すでに存在するキーのバリューを+1する。出現回数を+1する。
				Integer n = kvs.get(k) + 1;
				kvs.put(k, n);
			}
			else // 無ければ
			{
				// kvsオブジェクトにキーと出現回数(1)を追加する。
				kvs.put(k, 1);
			}
		}

		// 配列のリストを作成してkvsの中身をセットする。
		ArrayList entries = new ArrayList(kvs.entrySet());

		// バリューの中身でソートする。(出現回数でソートする)
		Collections.sort(entries, new Comparator(){
				public int compare(Object obj1, Object obj2){
				Map.Entry ent1 =(Map.Entry)obj1;
				Map.Entry ent2 =(Map.Entry)obj2;
				int val1 = Integer.parseInt(ent1.getValue().toString());
				int val2 = Integer.parseInt(ent2.getValue().toString());
				return (val2 - val1); // 降順
			}
		});

		// ソート後の配列のリストを走査(降順)
		// 最大4まで配列を回す
		for( int i = 0; i < entries.size() && i < 5; i++ )
		{
			String word = (String)((Map.Entry)entries.get(i)).getKey();
			int cnt = Integer.parseInt(((Map.Entry)entries.get(i)).getValue().toString());

			// キーはIntWritable,バリューはText
			context.write(key, new Text( key + "," + cnt + "," + word));
		}
	}
}

こちらもJ2chMapperと同様に,

static class J2chReducer extends Reducer <IntWritable, Text, IntWritable, Text>

の部分が,reducerクラスの

  • 入力キー,バリュー(IntWritable, Text)
  • 出力キー,バリュー(IntWritable, Text)

を定義しています。Mapperの出力とReducerの入力が同じであることに注意してください。

さて,reduce()関数ですが,第二引数がiterable<Text>となっています。mapの出力であればTextじゃないか,と思われるかもしれませんが,実際にはText型のリストが渡されます。つまり,mapの出力であると想定された下記は,

1287144000    迫力
1287144000    香川
1287144000    w
1287144600    利根川
1287144600    利根川
1287144600    キター
1287144600    !

実際には

1287144000    [迫力,香川,w]
1287144600    [利根川,利根川,キター,!]

というふうに,キーごとにTextのリストが渡されます。また,Reduce処理の前にキーでソートされますので,同一キーについては必ず1つにまとめられます。

リストで渡された名詞の単語を一度Hashtableに登録し,ArrayListに入れ直してバリューで降順にソートしています(明らかに無駄な動作をしているような気がしますが…⁠⁠。ソート後の単語と出現回数を上位5つまで出力しています。

これで当初の目的である,2ちゃんねるの実況スレッドから時間単位に出現数の多い単語を出力する,という機能が実装されました。

いかがでしたか? 思ってたよりずっと簡単に感じたんではないでしょうか? Map,Reduceのそれぞれの処理での入出力のフォーマットがある程度決まっていることで,肝心のやりたいことに集中できたと思います。

Map,Reduceというふうに役割がきっちり分かれていることで,どこに何を実装するべきかというのが自然に分担されたと思います。さらに,MapとReduceの間にキーでのソート,マージがあることもReduce処理の書きやすさを増していたと思います。

著者プロフィール

脇本武士(わきもとたけし)

都内中小IT企業(メイサンソフト(株))に所属。某大手自動車会社でのシステム開発,運用を経て,現在は研究開発部署に席をお借りしています。DB周りの保守サポート,ウェブ技術開発を主に手がけてきました。現在は大規模計算フレームワークの活用とKVSに注目しています。普段はOS Xを使用していますが一番よく使うアプリはTerminalです(笑)。

R&Dトレンドレポート(てくらぼ)