第2回では、halookのセットアップ方法をご紹介しました。第3回では、実際にhalookを活用し、Hadoopの処理の中身を見ていくことにします。今回のテーマは、HadoopのMapReduce Jobを実装する際に重要になる、PartitionerとCombinerです。MapReduceのアルゴリズムを紹介する例としてよく使われる、WordCountの処理時間にもこれらの使い方は影響を与えます。より有効に使えるようになるために、これらを設定がある場合、ない場合の処理の様子、実行時間の比較を、halookで見ていくことにしましょう。
今回の検証内容
単純な集計処理を例にとります。以下のような、非常に単純化した商品の販売記録を元に、店舗ごとの売上を計算します。タブ区切りで、以下のデータが入っているものとします。
以下のようなレコードが並んでいます。
使うのは、4番目の店舗名と、3番目の値段だけです。店舗名ごとに、値段の合計を計算するだけという、非常にシンプルなものです。以下の条件で、環境、データおよびサンプルプログラムを用意しました。
- 1.CDH3u5スレーブ3台の環境を用意
各スレーブでMapとReduceのスロットが2つずつ、計6Mapper/Reducerが同時に動ける環境で実験しました。
- 2.10万レコードのファイル(約4MB)を6つ用意
全部同じディレクトリにおいて、MapReduce処理の入力とします。Mapperが同時に6つ動くことになります。
- 3.store00 が50%、store01からstore50までが各1%で出現
このように、データが非常に偏った状態にしました。
- 4.Mapperは、店舗名をキー、値段をバリューとして出力
- 5.Reducerは店舗名ごとに値段の合計を計算
- 6.検証のために、Mapper, Reducerの処理ごとに1瞬停止
Mapperでは1行ごとに、Reducerでは、バリュー1つごとに、1ミリ秒間スリープする処理を加えました。これにより、より複雑な処理を行う場合を擬似的に再現しました。
PartitionerとCombinerの説明
店舗ごとの売上集計をMapReduceで行うと、以下のようになります。
Mapperでキーとバリューのペアを作り、Reducerで同じキーのバリューを足し合わせます。ここで、図のPartitionerは、複数のReducerがあるときにキーとバリューのペアをどのReducerに割り振るかを決める役割を持っています。Partitionerについて、とくに記述せずにMapReduceを実装した場合、HashPartitionerと呼ばれるPartitionerが使われます。
これは、キーのハッシュ値を計算し、その値をReducerの数で割った余りで何番目のReducerに割り振るかを決定するものです。キーの数が十分多ければ、大体均等にキーが各Reducerに割り振られることになりますが、そもそも特定のキーに値が集中していた場合、そのキーが割り当てられたReducerにデータが集中することになるので、処理を均等に分散させたい場合は注意する必要があります。
次に、Combinerを使った場合の例です。Combinerは、Mapperの処理の後、同じMapperで処理した結果だけで一旦集計処理を行うものです。Combinerには、Reduceの処理内容と全く同じものを指定することが手法としてよく行われますが、こうすることにより、ネットワークの転送量を抑えるなどの恩恵に与れます。
MapReduceの実行結果
上に述べた条件で、実際にMapReduce Jobを実行してみました。以下の、5つのパターンで試しました。
表1 MapReduce Job実行条件(普通の)
No | Reduce数 | カスタムPartitioner | Combiner |
① | 1 | 無 | 無 |
② | 2 | 無 | 無 |
③ | 3 | 無 | 無 |
④ | 2 | 有 | 無 |
⑤ | 1 | 無 | 有 |
Partitionerを設定した4番では、全体の50%を占めるstore00だけを1つのReducerに、残り全てをもう1つのReducerに割り当てています。
順に実行した結果をhalookで確認すると、以下のようになりました。
①から⑤に行くに従って処理時間が短くなっていることがわかります。1つ1つ、詳しく見ていきましょう。
まず、1番目のMapReduce Jobは以下のようになりました。
緑の矢印6本がMap処理、青の矢印1本がReduce処理を表しています。まずこれを見ただけでも、Reduce処理を分散させるだけで処理時間が短くなるのではないかと推測できます。
次に、2番目の、同じ処理をReduce数2で実行した場合を見てみましょう。
Reduceの矢印が2つに分かれている分、処理が早く終わったようです。しかし、見るからに無駄があります。Reduce処理の後半部分は分散処理されていません。このようになってしまうのは、テストデータの特性で、store00に全体の50%が偏ってしまっているからです。デフォルトで使われるPartitionerは、キーを均等に割り振るので、バリューの割り振りで見ると、50%プラス残りの50%の半分を加えた75%と、50%の半分の25%に割り振られてしまう計算になります。この図から、確かにそのような偏った割り振りになっていることがわかります。
Reduce数をもっと増やすとどうなるでしょうか?次は、Reduce数が3の場合です。
全体の処理時間は、Reduce数が2のときより短くなっているのですが、一台に処理が集中する非効率さは変わっていません。
次に、Partitionerを設定した場合です。
store00とそれ以外、で割り振ったため、2つのReduce処理の時間がほぼ同じになっています。Reduce数が1のときと比べて、Reduce処理時間が半分になり、処理時間が大きく短縮されました。
Partitionerを使って性能を改善するには、データの特性を事前に知っておかなければなりません。チューニングのためにデータの特性を分析するのにも手間がかかるため、性能改善の見込みがあるかどうかを事前に確認してから取り組むのがよいでしょう。Reduce数2、Partitioner設定なしの図5の場合のように、一部のノードしか動かない時間が長いようであれば、チューニングで処理が速くなる可能性が十分にあると言えます。
さて、それよりも処理時間が短かったのは、Combinerを使った5番目のパターンでした。
Reduceの時間がほとんどかかっていません。これは、Reduceで行っていた集計処理のほとんどを、6並列で実行しているMapが肩代わりしているからです。
注意点としては、今回行った「合計の計算」ではなく、「平均の計算」などのような、一部のデータだけで先に計算すると最終結果が変わってしまう場合は、Combinerはそのまま使うことはできません。Combinerは、いつでも使えるわけではないですが、使える場合は積極的に利用するのがよいでしょう。
まとめ
Hadoopは、基本的には台数を増やせば増やした分だけ処理が速くなりますが、処理の書き方次第で同じ台数を有効に使えるかどうかが大きく変わってきます。ジョブの開始から終了まで、ずっと処理が分散して走っているか、確認してみてはいかがでしょうか?
なお、PartitionerとCombinerの使い方については、O’Reillyの『Hadoop MapReduceデザインパターン』(Jimmy Lin, Chris Dyer)に詳しく載っています。Map処理中にCombinerにあたる処理を自分で記述する、「in-mapper combining」の手法なども掲載されているので、詳しく知りたい方はこちらを参照してください。
今回はここまでです。次回も、MapReduceの特性について説明します。