システムはできて終わりではなく、そこから始まる
前回は、
前回までで、
最終回となる今回は、
負荷分散のための設定
AP4Rにおける負荷分散はいくつかのポイントがあります。
- 非同期処理を複数プロセスへ分散
- ウェブサーバのプロキシ機能を使う
- AP4RのURL変換フィルタを使う
- AP4R プロセス自体の多重化
- メッセージ送信を分散する
- AP4R プロセス間でメッセージを転送する
既に説明した内容もありますが、
ウェブサーバのプロキシ機能にて、非同期処理を複数プロセスへ分散
非同期の処理を複数プロセスで分散させることを考えた場合、
最も簡単な構成は、

では、
modify_rules:
url: proc {|url| url.host = "your.async.host"}

リバースプロキシを同期処理と共用にする場合は、

dispatchersとURL変換フィルタ
より細かく、dispatchers
の初期設定を見るとこのようになっています。
dispatchers:
-
targets: queue.*
threads: 1
これは、targets
で指定されるキュー
dispatchers:
-
targets: queue.very_busy.*
threads: 10
modify_rules:
url: proc {|url| url.host = "busy.async.host"}
-
targets: queue.very_heavy.*
threads: 1
modify_rules:
url: proc {|url| url.host = "heavy.async.host"}
数の多い処理は、busy.
にて処理をさせ、heavy.
に流れるようにしています。また、
メッセージ送信の分散
ここからは、
これまでの例では、

AP4R プロセス間のメッセージ転送
複数のAP4Rプロセスを実行している場合、carriers
という項目がこの機能にあたります。
(注) - この機能は、
設定方法によっては、 AP4Rプロセス間でメッセージのキャッチボールが起きたり、 メッセージDBの負荷が増えたりする難点もあります。必要な設定項目、 アルゴリズムなどを考慮する余地があるため、 現状は 「実験的」 な機能と位置付けています。
設定例は以下のようになります。
carriers:
-
source_uri: druby://another.ap4r.host:6438
threads: 1
どのAP4Rプロセスsource_
)から、threads
)でメッセージを転送するかを設定しています。各スレッドは、dispatchers
が処理できるキューからメッセージを取得し、

(注) - 複数のAP4Rプロセスを利用する場合には、
サーバソケットを開くアドレスとポートの指定で注意する点があります。バージョン1. 1のreliable-msgには、 デフォルトポート (6438) 以外でのLISTENが出来ないバグがあります。またlocalhost以外でのLISTENが出来ません。AP4RのWiki: FAQに修正方法を書いています。
以上のように、
メッセージのリカバリ
メッセージのリカバリには2つの種類があります。冒頭でも触れましたが、
SAF リカバリ
発生状況
SAF機能を利用しているときに発生します。メッセージは、

状況の確認
SAF用テーブルに保管されているメッセージを確認します。最初に、
% cd as_rails % ruby script/console Loading development environment. >> Ap4r::StoredMessage.destroy_all
次に、
as_
(省略)
Ap4r::AsyncHelper::base.saf_delete_mode = :logical
RailsプロセスとAP4Rプロセスを起動します。
% cd as_rails % ruby script/server
% cd as_ap4r % ruby script/mongrel_ap4r start -A config/queues_mysql.cfg
これまで作成してきたアプリケーションの画面をひらき、
「Order was successfully created.」
% cd as_rails % ruby script/console Loading development environment. >> y Ap4r::StoredMessage.find(:all) --- - !ruby/object:Ap4r::StoredMessage attributes: status: "1" updated_at: 2007-09-14 10:21:30 duplication_check_id: bf6627e0-448e-012a-cfbe-0016cb9ad524 id: "29" queue: queue.async_shop.payment object: "\x04\b\"\x10order_id=29" created_at: 2007-09-14 10:21:30 headers: "\x04\b{\n\ :\rdelivery:\tonce:\x0Fqueue_name\ "\x1Dqueue.async_shop.payment:\x12dispatch_mode:\tHTTP:\x0Ftarget_url\ "-http://localhost:3000/async_shop/payment:\x12target_method\"\tPOST" => nil
statusは"1"となっています。第3回のSAF機能の説明でも触れましたが、
status = 0 | 未処理 |
---|---|
status = 1 | 処理済 |
したがって、
次に、Ctrl+C
で終了した状態で注文処理を実行します。ステータスは未処理をあらわす"0"となっているでしょう。
>> y Ap4r::StoredMessage.find(:all) --- - !ruby/object:Ap4r::StoredMessage attributes: status: "1" updated_at: 2007-09-14 10:21:30 (省略)
- !ruby/object:Ap4r::StoredMessage attributes: status: "0" updated_at: 2007-09-14 10:26:58 duplication_check_id: 82912650-448f-012a-cfbe-0016cb9ad524 id: "30" queue: queue.async_shop.payment object: "\x04\b\"\x10order_id=30" created_at: 2007-09-14 10:26:58 headers: "\x04\b{\n\ :\rdelivery:\tonce:\x0Fqueue_name\ "\x1Dqueue.async_shop.payment:\x12dispatch_mode:\tHTTP:\x0Ftarget_url\ "-http://localhost:3000/async_shop/payment:\x12target_method\"\tPOST" => nil
SAF用テーブルに保管されているメッセージが多い場合には、
>> y Ap4r::StoredMessage.find_status_of(:all).map{|sm| sm.to_summary_string} --- - 29, queue.async_shop.payment, Fri Sep 14 10:21:30 +0900 2007 - 30, queue.async_shop.payment, Fri Sep 14 10:26:58 +0900 2007 => nil
>> y Ap4r::StoredMessage.find_status_of(:unforwarded).map{|sm| sm.to_summary_string} --- - 30, queue.async_shop.payment, Fri Sep 14 10:26:58 +0900 2007 => nil
リカバリ方法
メッセージを再度AP4Rに Forward します。AP4R プロセスを起動しておきましょう。
% cd as_ap4r % ruby script/mongrel_ap4r start -A config/queues_mysql.cfg
SAF用テーブルに保管されている、
% cd as_rails % ruby script/console Loading development environment. >> Ap4r::StoredMessage.reforward_all => [1, 0]
リカバリ対象のメッセージが大量にある場合は、
>> Ap4r::StoredMessage.find_status_of(:unforwarded).size => 10 >> Ap4r::StoredMessage.reforward_all 3 ActiveRecord::RecordNotFound: ...(省略) => [7, 3] >> y Ap4r::StoredMessage.find_status_of(:unforwarded).map{|sm| sm.to_summary_string} --- - 35, queue.async_shop.payment, Fri Sep 14 10:38:07 +0900 2007 - 36, queue.async_shop.payment, Fri Sep 14 10:38:19 +0900 2007 - 37, queue.async_shop.payment, Fri Sep 14 10:38:28 +0900 2007 => nil
また、
>> Ap4r::StoredMessage.reforward 36 => true
SAF用テーブルを管理するRailsアプリケーションを作成すれば、

HelloWorldアプリケーションを起動し、
各メッセージの中身も参照できますので、

DLQリカバリ
発生状況
メッセージの処理中のネットワーク障害や、
状況の確認
AP4Rが起動中であれば、
まず、AsyncShopController#payment
アクションの頭で
raise "dummy exception to test DLQ"
として、payment
アクションの処理中にダミーの例外が発生したこと、
では、dlq
とキューの管理をしている ReliableMsg::QueueManager
(のインスタンス)qm
を取得します。
% irb -rubygems -rap4r >> dlq = ReliableMsg::Queue.new "$dlq" => #<ReliableMsg::Queue:0x2838360 @queue="$dlq"> >> qm = dlq.send :qm => #<DRb::DRbObject:0x2832b04 @ref=nil, @uri="druby://localhost:6438">
次に、$dlq
という名前になっています)
>> y qm.list(:queue => "$dlq") --- # 一部整形しています - :max_deliveries: 5 :priority: 0 :created: 1189737438 :target_method: POST :redelivery: 1 :queue: queue.async_shop.payment :expires: :id: 56205b90-4499-012a-1774-0016cb9ad556 :delivery: :once :target_url: http://localhost:3000/async_shop/payment :dispatch_mode: :HTTP => nil
ここではメッセージのヘッダー部分のみが表示されていますが、:queue
の箇所に元のキューの名前が記録されています。
リカバリ方法
ではリカバリを行います。DLQからメッセージを抜いて、payment
アクションに入れたダミーの例外は削除しておきましょう。
>> dlq.get{|m| dlq.put(m.object, m.headers) } => "5f7df210-4499-012a-1774-0016cb9ad556"
これで元のキューにメッセージが入り、payment
アクションが呼ばれます。少し待ってからブラウザをリロードすると
(注) - 上のスクリプトでは、
一見DLQに put
しているように見えますが、実際には、 m.
のheaders :queue
で指定されたキューに入ります。
ここで、
>> y qm.list(:queue => "$dlq") --- # 抜粋。一部整形 - :id: e232a830-4499-012a-1774-0016cb9ad556 => nil >> dlq.get{|m| raise "dummy recovery error"} RuntimeError: dummy recovery error # 以下略
さて、
>> y qm.list(:queue => "$dlq") --- # 抜粋。一部整形 - :id: e232a830-4499-012a-1774-0016cb9ad556 => nil
同じ :id
でメッセージが残っていることが分かります。これは、ReliableMsg::Queue#get
を呼び出すと、
DLQに入ったメッセージの確認と操作について説明しました。最低限のことは可能ですが、
- リカバリが必要なメッセージ数が多い場合
- メッセージの中身が大きい場合
- リカバリ処理の自動化
(例: DLQに30分入っているメッセージを自動的にリカバリする)
今後のAP4Rの拡張には、
まとめ
全4回に渡ってAP4Rの解説をしてきました。AP4Rの概説にはじまり、
AP4Rとは
AP4Rとは、
「軽量」かつ「堅牢」
AP4RはRubyで書かれています。Rubyのもつ簡潔さ、
一方、
シンプルなAPI
メッセージ送信の async_
と、transaction
の2つのAPIのみで非同期処理を組み込むことが可能です。また、
堅牢性の要、SAF機能
AP4RのメッセージングとしてのQoS
非同期でもテストはしっかり
非同期処理を含むアプリケーションでも、
柔軟なシステム構成
AP4Rを利用した典型的なシステムの構成は、
また、
いざというときのリカバリ
稼動しはじめたシステムは、

最後に
AP4Rでは今後の拡張として、
本連載に最後までお付き合いいただき、
AP4R関連リンク
- RubyForge
- 開発者ブログ
- その他