Ruby Freaks Lounge

第30回RubyとHadoopで分散処理 Hadoop Streamingで外部データを読み込む

前回のおさらい

第28回ではHadoop Streamingの仕組みについて説明しました。今回は、実際にHadoop Streamingを利用してデータ解析したときの具体的な話や、利用してみて困った(ハマった)Hadoop Streaming特有の問題とその解決法について紹介していきます。

実際に利用してみて困った(ハマった)こと

さて、第28回でも紹介しましたが今回Hadoopを利用したのはopen('http://tabemiru.com/2009'); return false;">たべみるのデータ解析のためです。たべみるのデータ解析では、食材名や地域名といった特定の値ごとにデータをまとめる処理が多く、またその処理速度が遅いのが問題だったのですが、このような処理はHadoop上(Ruby)でも比較的簡単に実装することができました。

それで最初は「これはスムーズにHadoop上に処理を移行できるかも?」と考えていたのですが、思わぬところで問題が発生してきて困りました。というのも、MapやReduceの中で外部データ(マスタデータなど)を読み込んできてマッピングするような処理をしたかったのですが、外部データをどのように読み込こめばよいのかが分からなかったのです。

図1 最初に想定した処理イメージ
図1 最初に想定した処理イメージ

そもそもHadoop Streamingでは、外部データをMapの標準入力として渡してあげる仕組みはあるのですが、MapやReduceの中で外部データを読み込んでくる仕組みは提供されていないようです[1]⁠。

この問題が解決できれば、より様々な処理にHadoop Streamingを利用することができそうです。

どうやって解決したか

調べてみてもこの問題を解決する情報を見つけられなかったため、手を動かして色々と試してみることにしました。その結果、現在は「Hadoopのコマンドでcatして、その出力を受け取って変数に入れる」というやり方でこの問題に対処しています。具体的には以下のようなコードです。

リスト1 外部データの読み込み
mapping_data = `hadoop dfs -cat s3://xxx/path/to/data`

Hadoopのコマンドでcatすることで、通常のcatコマンドのような感覚でHDFS上のファイルの中身を読むことができます。さらに、全体をバッククォート(`)で囲んであげて出力を受け取っています。通常、コマンドライン上などでHadoopのコマンド(putやgetなど)を利用するときには相対パスを指定すれば問題ありませんが、この場合は相対パスでは駄目で絶対パスでないと上手くいかないので注意してください。p>

なお、クックパッドではHDFSとしてS3を利用しているため上記のようなプロトコル(s3://)になっています。この方法で自由に外部ファイルを読み込むことが可能になり、通常のプログラミングと同じ感覚でHadoop Streamingでの処理(Map, Reduce)を記述できるようになりました。

具体的なコードの紹介

さて、実際にHadoopを使ってみたい方にとって「具体的にどのようなコードを記述しているのか」という点が気になるところでしょう。

あくまで一つの例として挙げると、Hadoopを使ったコードは以下のように記述できます。

リスト2 Hadoopで利用したコードの一例
#!/usr/bin/ruby

require 'csv';

# 外部データを読み込む
mapping_data = `hadoop dfs -cat s3://xxx/path/to/data`

ARGF.each do |line|
  line.chomp!
  csv = CSV.parse(line)

  # 好きな処理を行う(結果はresultに入るとする)
end

# 必要であれば後処理を行う
result.each do |key, value|
  # 最終的な処理、出力など
end

標準入力として、csvデータが渡ってくることを想定しています。標準入力を待ち受けているARGFブロックで標準入力を受け取る前に、例のように外部データを読み込んでいます。それを変数に入れておき(この例の場合はmapping_data⁠⁠、ARGFブロック内でそれを利用しています。その結果得られたデータ(この例の場合はresult)を使って、ARGFブロックの後にさらに処理をしたりすることも可能です。ARGFブロック内ですべての処理を行わないといけないわけではありません。

データ処理の流れとしては、以下の図のようになります。

図2 Hadoop Streamingでの具体的な処理の流れ
図2 Hadoop Streamingでの具体的な処理の流れ

どのくらい高速化したか

このようにして、問題にぶつかり試行錯誤しつつも、無事たべみるのデータ解析を終えることができました。結果的に、7000時間くらい(推定)掛かりそうだった処理を、Hadoopを利用することで30時間程度で処理することに成功し、非常に高速化しました。

さらに、Hadoopを使うとインスタンスを増やすことで処理速度が比較的線形にスケールしてくれるため、より処理時間を短縮することも可能ではないかと思います[2]⁠。

まとめ

実際にデータ解析にHadoopを使ってみて、DBでは処理しきれない、処理するのに非常に時間の掛かるような大規模なデータ処理でも、Hadoopを使えば短時間で処理できることが実感できました。また、Hadoop Streamingで外部データを読み込むことも可能だということが分かりました。

Hadoopを利用するとスケールさせるのが非常に簡単であるため、今後ますますデータが増えていくことを考えても使える技術だと思います。Hadoopの情報はまだまだ少ないのが実際のところなので、この記事がHadoopを使っている(または、使いたいと思っている)方の参考になれば幸いです。

おすすめ記事

記事・ニュース一覧