Ruby Freaks Lounge

第28回RubyとHadoopで分散処理 Hadoop Streamingの仕組み

はじめに

Hadoopとは、Googleの基盤技術であるMapReduceをJavaでオープンソース実装したもので、分散処理のフレームワークです。Hadoopを使うと、1台のサーバでは時間の掛かるような処理を、複数のサーバで分散処理させることができます。⁠処理を割り振ったサーバが壊れた場合どうするか」などの耐障害性の問題もHadoopが管理してくれるため、利用者は処理のアルゴリズムのみに集中することができるのです。素晴らしいですね。最近ではYahoo!やはてななど、様々な企業でも利用されるようになってきています。

Hadoop導入の背景

筆者はクックパッド株式会社に勤めています。open('http://cookpad.com'); return false;">クックパッドというサイトが有名だと思いますが、他にも携帯版クックパッドであるopen('http://m.cookpad.com'); return false;">モバれぴや、クックパッドでの検索データを提供するopen('http://tabemiru.com/2009'); return false;">たべみるといったサービスを運営しています。

2009年12月現在、クックパッドはユーザ数が848万人、30代女性の3人に1人が利用するサイトとなっています。たくさんの方が毎日の献立を考える際にクックパッドを利用してくださっていて、日々大量の検索データが溜まっています。また、クックパッドではもっともっと料理が楽しくなるように、ユーザが本当に望んでいる(利用したい)食材が店頭に並ぶようになることを目指しています。そのために、膨大な検索データを週や月、地域といった側面から分かりやすくデータ解析したものがたべみるで、⁠いつどこで、どんなキーワードが検索されているのか」を見ることができます。このたべみるのデータ解析にHadoopを使ってみました。

図1 たべみるで「カレー」を検索した画面(月別表示、2006年のデータ)
図1 たべみるで「カレー」を検索した画面(月別表示、2006年のデータ)

とはいっても、いきなりHadoopを導入したわけではなく、最初は1台のサーバで(DBを使って)処理していましたが、以下のような問題が出てきて困りました。

  • そもそも扱うデータ量が多くて、DB処理が遅い
  • DB処理の中でも特にGROUP BYする処理が多く、これが猛烈に遅い
  • 1年分のデータ解析で7000時間くらい(推定)掛かりそう

こ、これは何とかしないといけません。そこで、処理時間の短縮を期待してHadoopを試してみることにしたのです。

Hadoop導入時の注意点

時間の掛かる処理でも複数のサーバで分散処理させるので短時間で処理が終わる……良いことだらけのように見えるHadoopですが、DBが扱えないという注意点があります。これは、Hadoopから扱えるデータはHDFS(Hadoop Distributed File System)上にマウントされたデータのみという制約があるためです。つまり、Hadoopを使う場合にはDBを使わないような処理のアルゴリズムで設計しなくてはいけません。

Hadoopを使う

さて、実際にHadoopを使うには2つの方法があります。1つは、もともとHadoopはJavaで作られたものなのでJavaで記述する方法、もう1つはHadoop Streamingという仕組みを使って、Java以外の(標準入出力に対応している)言語で記述する方法です[1]⁠。

今回はRubyを使いたかったため、Hadoop Streamingを利用しました。

Hadoop Streamingの仕組み

Hadoop Streamingの仕組みについて説明します。Hadoopでは、Map、Shuffle&Sort、Reduceという3つの処理のフェーズがあります。Hadoop Streamingでは、Shuffle&Sortのフェーズは変更することができませんが、MapとReduceのフェーズは標準入出力を介して、利用者が自由にプログラムを記述できます。イメージとしては以下の図のようなものです。

図2 Hadoop Streamingの動作イメージ
図2 Hadoop Streamingの動作イメージ

HadoopではMapでの出力データがタブを区切り文字としたkeyとvalueとして扱われ、同じkeyのデータは必ず同じReduceで処理されます。

実際の例 - ワードカウント

それでは、実際の例としてワードカウントを考えてみましょう。以下に簡単なMapとReduceの例を紹介するので参考にしてみてくださいリスト1~3⁠。最終的には各ワードがkeyに、各ワードの出現回数がvalueとしてタブ(\t)区切りで出力されます。Reduceで各ワードの出現回数をカウントしていますが、同じワード(key)は同じReduceに渡されることが保証されているため、きちんとカウントを取ることができるわけです。

リスト1 入力データ(Mapの入力)
hoge fuga foo hoge foo
foo bar hoge hoge fuga
リスト2 Mapで使うプログラムの例
#!/usr/bin/ruby

ARGF.each do |line|
  line.chomp!
  line.split.each do |word|
    puts "#{word}\t1"
  end
end
リスト3 Reduceで使うプログラムの例
#!/usr/bin/ruby

count = Hash.new {|h,k| h[k] = 0}
ARGF.each do |line|
  line.chomp!
  key, value = line.split(/\t/)
  count[key] += 1
end

count.each do |k,v|
  puts "#{k}\t#{v}"
end

クックパッドでのHadoop利用環境

実際、クックパッドでは以下の環境でHadoopを利用しています。

  • Hadoop Streamingを利用
  • 使用した言語はRuby
  • EC2でHadoopクラスタを構築(インスタンスは30台~50台程度)
  • HDFSとしてS3を利用(s3fs)

先述しましたが、Hadoop Streamingを使っていて、処理はRubyで記述しています。また、このデータ解析自体がそれほど頻繁に行われるものではないため、使うときだけサーバを立ち上げられるようにAmazonのEC2とS3を利用しています。データ量に応じて気軽にサーバ台数を変更することができるため、非常に重宝しています。HadoopとEC2/S3の相性は非常に良いですね。また、EC2を利用していて、そのままではHDFS上のデータがインスタンスを起動するたびに消えてしまうため、HDFS上のデータが消えないようにS3をHDFSとして利用しています。

まとめ

今回は導入編としてHadoop Streamingの仕組みや、クックパッドでのHadoop利用環境を紹介しました。次回はデータ処理時に具体的にどのようなコードを書いたのか、実際に利用してみて困った(ハマった)ことなど、実践的な内容を紹介したいと思います。

おすすめ記事

記事・ニュース一覧