Cassandra実践入門―Twitter,Facebookが採用するNoSQLシステム

この記事を読むのに必要な時間:およそ 10.5 分

Ruby on Railsでのプログラミング

筆者が所属するケイビーエムジェイの製品パーソナライズド・レコメンダーでは,一部の機能にCassandraを使用しています。その事例をベースに,Cassandraを使うコードを書いてみましょう。

筆者が動作検証したバージョンはそれぞれ,Ruby 1.8.7,Rails 2.3.8,Cassandra 0.6.3です。

例題「最近チェックしたアイテム」

Railsで実装されたショッピングモールアプリケーションに,Cassandraを使って「ユーザが最近チェックしたアイテム」を表示する機能を追加します。ユーザがアイテムの詳細ページにアクセスする際,Cassandraに履歴情報を記録します。またショップの各画面を表示する際,Cassandraから履歴情報を参照し,最新10件のアイテム情報を表示します図5⁠。

図5 ショッピングモールと履歴情報の保存

図5 ショッピングモールと履歴情報の保存

データ構造

まず履歴情報を格納するカラムファミリを定義します。名前はUserHistoryとします。Cassandraでは,カラムファミリの定義時にソート順を表す型を指定すると,読み出し時に必ずその順序で返されることがシステムによって保証されています表5⁠。

表5 カラムファミリの定義で指定可能なソートタイプ

タイプ説明
BytesType値によって単純にソートする。値の型によるチェックは行われない。指定しない場合これがデフォルトとなる
AsciiTypeBytesTypeと同じソートが行われるが,US-ASCIIとしてパースできないとインサートできない
UTF8TypeUTF-8文字列としてソートする
LongType64ビットlong値としてソートする
LexicalUUIDType128ビットのUUID(Universal Unique ID)を,バイト値でソートする。UUIDはRFC-4122で定義されている
TimeUUIDTypeversion1と呼ばれる時間ベースのUUIDを,埋め込まれたタイムスタンプによって時間順にソートする

今回扱う情報は履歴データですので,TimeUUID Typeを使うと時系列でソートされた状態で取り出すことができます。バージョン0.6系列のCassandraでは,conf/storage-conf.xmlでスキーマを定義していますので,このファイルを編集して次の記述を追加します。

<ColumnFamily Name="UserHistory"
  CompareWith="TimeUUIDType" />

属性Nameがカラムファミリの名前,属性CompareWithがソート順を意味します。ファイルを編集してCassandraを再起動すれば設定が反映され,UserHistoryが使用可能になります。

また,個々のユーザの履歴情報は,ショップのIDとユーザのIDの複合キーになることがわかっていますので,両者をデリミタ@で連結して1つのキーとします。データ構造をJSON形式で表現すると次のようになります。

UserHisotry = {
  "user_id@shop_id": {
    TimeUUIDのバイト表現: "item_id", ...
  }, ...
};

cassandra gemのインストール

RubyからCassandraを使うためのライブラリは,少し紛らわしいですが表3で紹介したように「Cassandra」です。gemの名称が「cassandra」で,クラス名も「Cassandra」です。このライブラリはGitHubで開発が進められており,Twitterのエンジニアであるライアン・キングもプロジェクトに参加しています。

cassandraはgemコマンドでインストールできます注5⁠。

$ sudo gem install cassandra
注5)
依存するgemの中にCで書かれた拡張ライブラリが含まれているため,Cコンパイラが必要です。

UserHistoryクラス

RailsからCassandraを利用するにあたっては,UserHistoryというシングルトンクラスを作成し,そのインスタンスを通じてアクセスすることにします。ここでは初期化部分の実装を説明しますリスト1⁠。

リスト1 RAILS_ROOT/lib/user_history.rb(初期化部)

class UserHistory
  include Singleton  ---①

  def self.setup(options)  ---②
    @@host = options.delete(:host)
    @@keyspace = options.delete(:keyspace)
    @@options = options
  end

  def initialize  ---③
    @cassandra = Cassandra.new(@@keyspace, @@host, @@options)
  end
end

UserHistoryでは,でSingletonモジュールをインクルードし,のクラスメソッドsetup()で初期化に必要なパラメータを受け取ります。インスタンス生成時にのinitialize()メソッドでCassandraクラスをnew()し,メンバ変数に保持します。

アプリケーションからは次のような記述でUserHistoryクラスのインスタンスを参照できます。

user_history = UserHistory.instance

Ruby on Railsへの組み込み

Railsの初期化ファイルenvironment.rbの中で,CassandraクラスとUserHistoryクラスを使えるように設定しますリスト2⁠。

リスト2 RAILS_ROOT/cong/environment.rb(初期化部)

Rails::Initializer.run do |config|
  config.gem "cassandra"  ---①
end
cassandra_options = {
  :host => "127.0.0.1:9160",  ---②
  :keyspace => "Keyspace1",  ---③
  :timeout => 0.5
}
UserHistory.setup(cassandra_options)  ---④

まずでcassandra gemを読み込みます。

次にCassandraのインスタンス生成に必要なパラメータを用意します。で接続先としてローカルで動作するCassandraを指定します。9160はCassandraのデフォルトのポート番号です。で使用するキースペースを指定します。ここではリリースファイル添付のstorage-conf.xmlで定義されている「Keyspace1」を指定しています。

最後にでUserHistoryクラスの初期化メソッドを呼び出します。

履歴データの書き込み

チェックされたアイテムを1件Cassandraに記録するUserHistory#add()メソッドの実装について見てみますリスト3⁠。

リスト3 RAILS_ROOT/lib/user_history.rb(書き込み部)

class UserHistory
  include Cassandra::Constants  ---①

  def add(shop_id, user_id, item_id)
    column = {SimpleUUID::UUID.new => item_id.to_s}  ---②
    option = {:consistency => ONE}  ---③
    @cassandra.insert(:UserHistory,  ---④
                      row_key(shop_id, user_id),
                      column, option)
  end

  private
  def row_key(sid, uid)  ---⑤
    uid.to_s + "@" + sid.to_s
  end
end

の部分でCassandra#insert()メソッドを使って実際の書き込み処理を行っています。1番目の引数はカラムファミリで,文字列ではなくシンボルを与えることになっています。2番めのパラメータは行キーです。のrow_key()メソッドを使ってキーとなる文字列を生成しています。

カラムデータはRubyのハッシュで表現します。ソートタイプがTimeUUIDTypeなので,のようにSimpleUUID::UUIDクラスのインスタンスを使います注6⁠。

注6)
cassandraと同時にインストールされるsimple_uuidライブラリで提供されます。
書き込み時の整合性レベル

Cassandraは結果整合性を持つシステムですが,書き込みや読み込みの際に,整合性のレベルを変化させることができます。

ここでは書き込み時の整合性レベルとして提供されているオプションを見てみます表6⁠。表6の下のほうにいくほど整合性のレベルは高くなりますが,その分処理速度は遅くなります。今回はRuby Cassandraのデフォルト値であるONEを指定してみましょう。Cassandra#insert()メソッドの4番めの引数にリスト3のようなハッシュを渡します。ONEという定数はリスト3でインクルードしたモジュールによって使えるようになります注7⁠。

なお,Cassandraでの書き込み処理は,ディスクへのシーケンシャルな書き出し(ジャーナリング)とメモリ上での記録が終わった時点で成功を返すため,非常に高速です。

表6 書き込み要求における整合性レベル

レベル動作
ZERO書き込み要求を受け取った時点で処理を返し,その後バックグラウンドで書き込み処理を行う。失敗は報告されない
ANYどこかで1回書き込みされたことが通知されるまで待ってから処理を返す。本来担当すべきノードがダウンしている場合は最寄りの別のノードがデータを一時保管し,本来のノードへの伝達はキューイングされる
ONE1つのノードでの書き込み成功が通知されてから処理を返す(Ruby Cassandraでの省略時デフォルト)
QUORUMレプリケーションファクタ値の過半数(N/2+1)のノードが書き込みに成功するのを待ってから処理を返す
ALLレプリケーションファクタで指定されたすべてのノードの書き込みに成功するのを待ってから処理を返す
注7)
この引数はONEの場合省略可能ですが,説明のために明示的に記述しています。

履歴データの読み込み

次に,履歴情報の読み込み処理を行うUserHistory #get()メソッドの実装について見てみますリスト4⁠。Cassandraから返ってくるデータは,デフォルトでは昇順に並んでいますが,パラメータによって降順にすることができます。今回必要なのは最新分からのデータですから,ここではのようにして降順を指定します。

のCassandra#get()メソッドの返り値は,Cassandra::OrderedHashクラス注8のインスタンスです。Hashクラスと同等のAPIを持ち,順序を保持する性質を持っているため,のようにvalues()メソッドを使うとCassandraの返したとおりの順序でカラムの値を取り出すことができます。

リスト4 RAILS_ROOT/lib/user_history.rb(読み込み部)


class UserHistory
  MAX_COUNT = 100
  COUNT = 10

  def get(shop_id, user_id)
    options = {
      :reversed => true,  ---①
      :consistency => QUORUM,  ---②
      :count => MAX_COUNT  ---③
    }
    columns = @cassandra.get(:UserHistory,  ---④
                             row_key(shop_id, user_id),
                             options)
    item_ids = columns.values.map{|s| s.to_i}  ---⑤
    return item_ids.uniq[0, COUNT]
  end
end
読み込み時の整合性レベル

Cassandraの読み込み時の整合性レベルを表7に示します。先ほどと同様に,表7の下のほうにいくほど整合性のレベルは高くなりますが,その分処理速度は遅くなります。

表7 読み込み要求における整合性レベル

レベル動作
ONE読み込み要求に対して最初にレスポンスされてきたデータを返す(Ruby Cassandraでの省略時デフォルト)
QUORUMレプリケーションファクタ値の過半数(N/2+1)からのレスポンスが返ってきた時点で,タイムスタンプが最も新しいデータを返す
ALLデータを持つすべてのノードからのレスポンスを待ち,タイムスタンプが最も新しいデータを返す

Cassandraでは,次の式を満たすことで強い整合性を得ることができます。

  • W + R > N
  • W:書き込み(insert())時の整合性レベル
  • R:読み込み(get())時の整合性レベル
  • N:レプリケーションファクタ

リスト3ではinsert()メソッドでONEを指定しました。この場合W=1となり,get()メソッドの整合性レベルに,レプリケーションファクタが1ならばONE以上(N=1,R=1⁠⁠,2ならばQUORUM以上(N=2,R=N/2+1=2⁠⁠,3以上ならばALL(N=R≧3)を指定すると式を満たします。リスト4ではレプリケーションファクタが2であるという想定で,QUORUMを指定しています。

なお,Cassandraでの読み込み処理はディスクのランダムアクセスを伴うため,書き込み処理よりも遅いという特性を持っています。

読み込み件数の制限

履歴情報が蓄積されるにつれて,行にあたるカラムデータは大きくなっていきます。巨大なデータの読み込みはサーバとクライアント双方のメモリを圧迫してしまうため,履歴の表示に不要なデータを読み込まないように制限します。表示すべき件数は10件ですが,履歴情報には重複があるため,それを見越した件数を読み込む必要があります。既存の統計データからユーザのアイテム詳細ページへの再訪率を調べて取得件数を決めます。今回の例題では,リスト4のように最大100件という値を指定しています。

注8)
cassandra-0.8.2のgemに含まれているOrderedHashは,activesupport-2.3.5のActiveSupport::OrderedHashと同内容のものです。

これで,RailsアプリケーションからCassandraを使って「最近チェックしたアイテム」を読み書きするクラスが完成しました。なお,ここで解説したコードとRailsアプリケーションのひな形一式を,本誌サポートサイトからダウンロードできますので,参考にしてください。

著者プロフィール

島田慶樹(しまだけいき)

株式会社ケイビーエムジェイ コマースソリューション事業部 シニアエンジニア。

主にRuby on RailsによるWebシステム開発に従事。大規模ASPサービスの開発・保守を通じてシステムのスケーラビリティやパフォーマンスの問題に取り組み,NoSQLを始めとしたさまざまな技術と格闘する日々。好きなプログラミング言語はSmalltalkとLisp。いつかはPrologをマスターしたいと思いつつ,今はScalaをかじっている。

Twitter:@_shimada