AP4R、Rubyで非同期メッセージング

第4回システムは稼働してからがはじまり

システムはできて終わりではなく、そこから始まる

前回は、安心のためのSAF機能とテストサポートをみてきました。SAFによりat-least-onceという実行時の堅牢さが保証され、メッセージが消失しない安心を得ることができました。また、非同期まで含めたテストの仕組みにより、TDD/BDDと同じように開発ができ、リリースに対する安心感も得られました。

前回までで、アプリケーションは一応の完成をみています。しかし、システムは作って終わりというわけにはいきません。利用状況の変化に合わせて円滑に処理が進むよう調整しなければなりませんし、また障害発生時にもすみやかに復旧する必要があります。

最終回となる今回は、負荷分散のための設定と、SAF Forwardエラーのリカバリ、および業務処理がエラーとなったメッセージのリカバリを説明します。

負荷分散のための設定

AP4Rにおける負荷分散はいくつかのポイントがあります。

  • 非同期処理を複数プロセスへ分散
    • ウェブサーバのプロキシ機能を使う
    • AP4RのURL変換フィルタを使う
  • AP4R プロセス自体の多重化
    • メッセージ送信を分散する
    • AP4R プロセス間でメッセージを転送する

既に説明した内容もありますが、負荷分散という観点でまとめて見ていきましょう。

ウェブサーバのプロキシ機能にて、非同期処理を複数プロセスへ分散

非同期の処理を複数プロセスで分散させることを考えた場合、通常のHTTPリクエストをリバースプロキシで分散させるのと同様に、ウェブサーバの機能を利用する方法があります。

最も簡単な構成は、同期処理と同じウェブサーバを利用して、処理を担当するRailsプロセス群も同期、非同期の区別無く動かすことです。この場合、Railsがリバースプロキシの後ろで動くようになっていれば、追加の設定は必要ありません。Railsアプリケーションを動かすときのよくある構成と同じです。

図1 リバースプロキシで複数プロセスに分散させる
図1 リバースプロキシで複数プロセスに分散させる

では、非同期処理を担当するRailsプロセス群を、同期のプロセス群から分離する場合はどうなるでしょうか。リバースプロキシまで含めて分離する場合は、AP4RのURL変換フィルタにて対応することが出来ます。前回までの例では、ポート番号を変更していましたが、ホストの部分を変更すればOKです。

modify_rules:
  url: proc {|url| url.host = "your.async.host"}
図2 リバースプロキシごと非同期処理を分離する
図2 リバースプロキシごと非同期処理を分離する

リバースプロキシを同期処理と共用にする場合は、AP4Rでの設定は不要です。プロキシの設定で非同期と同期を分離することになります。プロキシの設定を簡略化するためには、非同期に流したいURLやHTTPヘッダなどに、接頭辞やキーワードなど共通の構造を持たせる必要があるかもしれません。

図3 リバースプロキシにより、同期と非同期の処理を分離させる
図3 リバースプロキシにより、同期と非同期の処理を分離させる

dispatchersとURL変換フィルタ

より細かく、メッセージの種類により処理するサーバーを分散させたい場合にはどのような方法があるでしょうか。現状のAP4Rでは、キューの名前で分散させることが可能です。dispatchers の初期設定を見るとこのようになっています。

dispatchers:
  -
    targets: queue.*
    threads: 1

これは、targets で指定されるキュー(アスタリスクは前方一致)から1本のスレッドでメッセージを取得する、という設定になっています。非同期処理のなかに、非常に数の多い処理と、非常に時間のかかる処理があるとしましょう。それぞれを別サーバで処理したいとします。この場合、次のような設定で分割する方法があります。

dispatchers:
  -
    targets: queue.very_busy.*
    threads: 10
    modify_rules:
      url: proc {|url| url.host = "busy.async.host"}
  -
    targets: queue.very_heavy.*
    threads: 1
    modify_rules:
      url: proc {|url| url.host = "heavy.async.host"}

数の多い処理は、busy.async.host にて処理をさせ、時間のかかる処理は、heavy.async.host に流れるようにしています。また、数の多い処理を捌くスレッドは数を調整する必要もあるでしょう。あくまで参考の設定ですが、このような柔軟性があります。

メッセージ送信の分散

ここからは、AP4Rのプロセスが複数ある場合を考えます。

これまでの例では、ひとつのRailsプロセスからは、ひとつのAP4Rプロセスへメッセージを送信していました。Railsプロセスから複数のAP4Rプロセスへ送信できると、大量の非同期メッセージが発生する場合には、負荷分散ができます。また、AP4Rプロセスが止まってしまった場合にも、非同期メッセージの送信が出来るため、耐障害性の向上にもつながります。この機能は現在実装中で、近々リリースの予定です。

図4 メッセージ送信を複数のAP4Rプロセスへ分散させる
図4 メッセージ送信を複数のAP4Rプロセスへ分散させる

AP4R プロセス間のメッセージ転送

複数のAP4Rプロセスを実行している場合、とても忙しくてメッセージが溜ってしまうものがいる一方で、暇なものが出てくるかもしれません。そのような場合に、溜っているメッセージを暇なプロセスへ転送することが出来ます。設定ファイル中、carriers という項目がこの機能にあたります。

(注)
この機能は、設定方法によっては、AP4Rプロセス間でメッセージのキャッチボールが起きたり、メッセージDBの負荷が増えたりする難点もあります。必要な設定項目、アルゴリズムなどを考慮する余地があるため、現状は「実験的」な機能と位置付けています。

設定例は以下のようになります。

carriers:
  - 
    source_uri: druby://another.ap4r.host:6438
    threads: 1

どのAP4Rプロセスsource_uri )から、何本のスレッドthreads )でメッセージを転送するかを設定しています。各スレッドは、自プロセス内の dispatchers が処理できるキューからメッセージを取得し、自プロセス内のキューに保存しなおします。

図5 メッセージの転送
図5 メッセージの転送
(注)
複数のAP4Rプロセスを利用する場合には、サーバソケットを開くアドレスとポートの指定で注意する点があります。バージョン1.1のreliable-msgには、デフォルトポート(6438)以外でのLISTENが出来ないバグがあります。またlocalhost以外でのLISTENが出来ません。AP4RのWiki: FAQに修正方法を書いています。

以上のように、メッセージの送信と処理を複数プロセスに分散させたり、簡単な流量の制御により、システム運用における非同期処理の円滑な実行をサポートします。

メッセージのリカバリ

メッセージのリカバリには2つの種類があります。冒頭でも触れましたが、SAF Forwardエラーのリカバリ、および業務処理がエラーとなったメッセージのリカバリです。 簡単のため、前者をSAFリカバリ、後者をDLQリカバリと呼びます。DLQとはDead Letter Queueの略称で、メッセージングで配信不能のメッセージを格納する場所としてよく使われます。それぞれの発生状況、リカバリ方法についてはこのあと詳しく述べていきます。

SAF リカバリ

発生状況

SAF機能を利用しているときに発生します。メッセージは、SAF用テーブルにStoreされた後、AP4RサービスにForwardされますが、このタイミングでネットワークの障害や、Railsプロセス、あるいはAP4Rプロセスのダウンがあったとします。このとき、SAF Forwardエラーとなります。この場合、メッセージはAP4Rに渡っていませんので、非同期処理が実行されません。システムの復旧後、再度メッセージをForwardする必要があります。メッセージの情報はSAF用テーブルに残っていますので、そこからリカバリします。

図6 SAF Forward エラーの発生状況
図6 SAF Forward エラーの発生状況

状況の確認

SAF用テーブルに保管されているメッセージを確認します。最初に、前回からのメッセージが残っているかもしれないので、きれいにお掃除しましょう。

% cd as_rails
% ruby script/console
Loading development environment.
>> Ap4r::StoredMessage.destroy_all

次に、メッセージを確認したいのでSAF用テーブルが論理削除の設定になっていることを確認します。environment.rbの最後に以下のコードがあれば大丈夫です。

as_rails/config/environment.rb

(省略)
Ap4r::AsyncHelper::base.saf_delete_mode = :logical

RailsプロセスとAP4Rプロセスを起動します。

% cd as_rails
% ruby script/server
% cd as_ap4r
% ruby script/mongrel_ap4r start -A config/queues_mysql.cfg

これまで作成してきたアプリケーションの画面をひらき、注文処理を実行します。

「Order was successfully created.」と画面に表示されたら、SAF用テーブルを確認します。

% cd as_rails
% ruby script/console
Loading development environment.
>> y Ap4r::StoredMessage.find(:all)
---
- !ruby/object:Ap4r::StoredMessage
  attributes:
    status: "1"
    updated_at: 2007-09-14 10:21:30
    duplication_check_id: bf6627e0-448e-012a-cfbe-0016cb9ad524
    id: "29"
    queue: queue.async_shop.payment
    object: "\x04\b\"\x10order_id=29"
    created_at: 2007-09-14 10:21:30
    headers: "\x04\b{\n\      :\rdelivery:\tonce:\x0Fqueue_name\
      "\x1Dqueue.async_shop.payment:\x12dispatch_mode:\tHTTP:\x0Ftarget_url\
      "-http://localhost:3000/async_shop/payment:\x12target_method\"\tPOST"
=> nil

statusは"1"となっています。第3回のSAF機能の説明でも触れましたが、このステータスは次の意味があります。

status = 0未処理(Forward前、およびForward失敗時)
status = 1処理済(Forward成功時)

したがって、このステータスはForwardが無事に成功したことをあらわしています。

次に、未処理のステータスをつくるために、AP4Rプロセスを Ctrl+C で終了した状態で注文処理を実行します。ステータスは未処理をあらわす"0"となっているでしょう。

>> y Ap4r::StoredMessage.find(:all)
--- 
- !ruby/object:Ap4r::StoredMessage 
  attributes: 
    status: "1"
    updated_at: 2007-09-14 10:21:30
  (省略)
- !ruby/object:Ap4r::StoredMessage
  attributes:
    status: "0"
    updated_at: 2007-09-14 10:26:58
    duplication_check_id: 82912650-448f-012a-cfbe-0016cb9ad524
    id: "30"
    queue: queue.async_shop.payment
    object: "\x04\b\"\x10order_id=30"
    created_at: 2007-09-14 10:26:58
    headers: "\x04\b{\n\      :\rdelivery:\tonce:\x0Fqueue_name\
      "\x1Dqueue.async_shop.payment:\x12dispatch_mode:\tHTTP:\x0Ftarget_url\
      "-http://localhost:3000/async_shop/payment:\x12target_method\"\tPOST"
=> nil

SAF用テーブルに保管されているメッセージが多い場合には、次のようにすると簡単に一覧が見られます。左から順に、ID、キューの名前、生成日時をあらわしています。

>> y Ap4r::StoredMessage.find_status_of(:all).map{|sm| sm.to_summary_string}
---
- 29, queue.async_shop.payment, Fri Sep 14 10:21:30 +0900 2007
- 30, queue.async_shop.payment, Fri Sep 14 10:26:58 +0900 2007
=> nil
>> y Ap4r::StoredMessage.find_status_of(:unforwarded).map{|sm| sm.to_summary_string}
---
- 30, queue.async_shop.payment, Fri Sep 14 10:26:58 +0900 2007
=> nil

リカバリ方法

メッセージを再度AP4Rに Forward します。AP4R プロセスを起動しておきましょう。

% cd as_ap4r
% ruby script/mongrel_ap4r start -A config/queues_mysql.cfg

SAF用テーブルに保管されている、すべての未処理のメッセージをリカバリするには次のようにします。戻り値は、リカバリに成功したメッセージの数と失敗した数の配列になります。ここでは、未処理のメッセージは1つだったので、無事にリカバリができたことになります。

% cd as_rails
% ruby script/console
Loading development environment.
>> Ap4r::StoredMessage.reforward_all
=> [1, 0]

リカバリ対象のメッセージが大量にある場合は、引数を指定することで、その数ごとにひとつのデータベーストランザクションとしてまとめて処理できます。引数なしの場合は、10メッセージごとに処理しています。下記の例では、未処理のメッセージを10用意しました。3つずつまとめて再送処理を実行していますが、そのなかでエラーが発生しています。エラーのあった回の3つは処理が失敗に終わり、それ以外の7つは無事に成功しています。失敗したメッセージはさきほどと同様にして確認できます。

>> Ap4r::StoredMessage.find_status_of(:unforwarded).size
=> 10         
>> Ap4r::StoredMessage.reforward_all 3
ActiveRecord::RecordNotFound: ...(省略)
=> [7, 3]
>> y Ap4r::StoredMessage.find_status_of(:unforwarded).map{|sm| sm.to_summary_string}
---
- 35, queue.async_shop.payment, Fri Sep 14 10:38:07 +0900 2007
- 36, queue.async_shop.payment, Fri Sep 14 10:38:19 +0900 2007
- 37, queue.async_shop.payment, Fri Sep 14 10:38:28 +0900 2007
=> nil

また、特定のメッセージのみリカバリするには、次のようにします。

>> Ap4r::StoredMessage.reforward 36
=> true

SAF用テーブルを管理するRailsアプリケーションを作成すれば、GUI環境でリカバリすることも可能です。RubyForgeからHelloWorldアプリケーションがダウンロードできますが、そちらにそのサンプル実装があります。HelloWorldアプリケーションの動かし方は、ホームページのGetting Startedを参考にしてください。

図7 HelloWorldサンプルアプリケーション
図7 HelloWorldサンプルアプリケーション

HelloWorldアプリケーションを起動し、以下のURLにアクセスすると、SAFリカバリの画面が表示されます。

各メッセージの中身も参照できますので、必要に応じて「Recovery」のリンクを押してください。

図8 SAFリカバリ画面
図8 SAFリカバリ画面

DLQリカバリ

発生状況

メッセージの処理中のネットワーク障害や、メッセージを処理するアプリケーションのエラーなどで、メッセージは元のキューからDLQに移動されます。ネットワークやDBサーバの障害の場合、DLQに入ったメッセージを元のキューに戻すことで、再度処理が行われます。メッセージ内のデータがおかしいなど、アプリケーションのエラーでDLQに入った場合は、アプリケーションのロジックを変更するか、データ自体を修正してから元のキューに戻す必要があります。今回は後者については触れません。

状況の確認

AP4Rが起動中であれば、irbから確認できます。

まず、DLQに入った状況を作るために、わざとエラーを発生させてみましょう。ここでは簡単のため、 AsyncShopController#payment アクションの頭で

raise "dummy exception to test DLQ"

として、エラーを発生させます。この状況でブラウザから注文処理を実行すると、ブラウザには"Order was successfully created."と表示されますが、RailsおよびAP4Rのログにはエラーが記録されます。Railsのログでは payment アクションの処理中にダミーの例外が発生したこと、それによりAP4R側のログでメッセージ処理がエラーになったことが残ります。

では、DLQの中身を見てみましょう。まず、irbを起動して、DLQへの(druby越しの)参照 dlq とキューの管理をしている ReliableMsg::QueueManager ⁠のインスタンス)への参照 qm を取得します。

% irb -rubygems -rap4r
>> dlq = ReliableMsg::Queue.new "$dlq"
=> #<ReliableMsg::Queue:0x2838360 @queue="$dlq">
>> qm = dlq.send :qm
=> #<DRb::DRbObject:0x2832b04 @ref=nil, @uri="druby://localhost:6438">

次に、DLQ(内部では、$dlq という名前になっています)の一覧を表示します。

>> y qm.list(:queue => "$dlq")
--- # 一部整形しています
- :max_deliveries: 5
  :priority: 0
  :created: 1189737438
  :target_method: POST
  :redelivery: 1
  :queue: queue.async_shop.payment
  :expires:
  :id: 56205b90-4499-012a-1774-0016cb9ad556
  :delivery: :once
  :target_url: http://localhost:3000/async_shop/payment
  :dispatch_mode: :HTTP
=> nil

ここではメッセージのヘッダー部分のみが表示されていますが、1件のメッセージがあることを確認できます。ヘッダーの中で、:queue の箇所に元のキューの名前が記録されています。

リカバリ方法

ではリカバリを行います。DLQからメッセージを抜いて、元のキューに入れるという手順になります。さきほど payment アクションに入れたダミーの例外は削除しておきましょう。

>> dlq.get{|m| dlq.put(m.object, m.headers) }
=> "5f7df210-4499-012a-1774-0016cb9ad556"

これで元のキューにメッセージが入り、dispatcherスレッドがRailsにリクエストしてpayment アクションが呼ばれます。少し待ってからブラウザをリロードすると「Payed at」に時刻が表示されるはずです。

(注)
上のスクリプトでは、一見DLQに put しているように見えますが、実際には、m.headers:queue で指定されたキューに入ります。

ここで、ちょっと横道に逸れて、リカバリしている最中にエラーが発生した場合を考えてみましょう。DLQにメッセージを1つ入れてから、それを取得したタイミングで例外を投げてみます。

>> y qm.list(:queue => "$dlq")
--- # 抜粋。一部整形
- :id: e232a830-4499-012a-1774-0016cb9ad556
=> nil
>> dlq.get{|m| raise "dummy recovery error"}
RuntimeError: dummy recovery error # 以下略

さて、DLQに入っていたメッセージはどうなっているでしょうか。

>> y qm.list(:queue => "$dlq")
--- # 抜粋。一部整形
- :id: e232a830-4499-012a-1774-0016cb9ad556
=> nil

同じ :id でメッセージが残っていることが分かります。これは、reliable-msgの機能により、ブロック付きで ReliableMsg::Queue#get を呼び出すと、⁠メッセージングの意味での)トランザクション内でブロックが実行されるためです。詳しくは、RDocを参照してください。

DLQに入ったメッセージの確認と操作について説明しました。最低限のことは可能ですが、実用のためには、考慮すべき事項がいくつかあります。

  • リカバリが必要なメッセージ数が多い場合
  • メッセージの中身が大きい場合
  • リカバリ処理の自動化(例: DLQに30分入っているメッセージを自動的にリカバリする)

今後のAP4Rの拡張には、このようなことも含めていく予定です。

まとめ

全4回に渡ってAP4Rの解説をしてきました。AP4Rの概説にはじまり、キーワードとして「軽量」かつ「堅牢」をあげました。そして、それらを通じてアプリケーションを気軽に、かつ安心して非同期拡張することができることを、サンプルアプリケーションの作成を例にお話してきました。最後に、簡単なまとめをして連載の締めくくりとしたいと思います。

AP4Rとは

AP4Rとは、Asynchronous Pocessing for Rubyの略で、Rubyで非同期処理を実現するためのライブラリです。AP4Rをアプリケーションに適用することで、⁠重い」処理を非同期に切り離し、ユーザーへの素早い応答が可能になります。また、疎結合化されることで処理の分散もしやすくなり、負荷増大時のスケーラビリティにもつながります。

「軽量」かつ「堅牢」

AP4RはRubyで書かれています。Rubyのもつ簡潔さ、柔軟さをうまくいかし、設定やAPIができるだけ使いやすいものになることを目指しています。そして、実装/テスト/運用をオールインワンでサポートした、導入しやすいライブラリとなるよう開発を進めています。

一方、簡単に導入できるライブラリでありながら、SAF機能により確実なメッセージの配送を実現しています。予期せずシステムを襲う異常時にもメッセージを死守する。これが信頼性のあるメッセージングに求められる必須要件となります。

シンプルなAPI

メッセージ送信の async_to と、SAF機能のための transaction の2つのAPIのみで非同期処理を組み込むことが可能です。また、引数の指定方法では、RubyやRailsとの親和性を考え、直感的に使えることを狙っています。

堅牢性の要、SAF機能

AP4RのメッセージングとしてのQoS(Quality of Service)は、at-least-onceです。すなわち、メッセージを生成するRailsプロセス(Producer)と、そのメッセージを処理するRailsプロセス(Consumer)の間で、最低でも 1回は処理が実行されることを保証します。データベースやネットワークの障害、プロセスの突然のダウンが発生してもメッセージが失なわれることがありません。AP4RではSAF(Store and Forward)という仕組みを通じて、このQoSを実現しています。

非同期でもテストはしっかり

非同期処理を含むアプリケーションでも、網羅的なテストが(比較的)簡単に実行できることは重要です。メッセージング処理では、複数のプロセスが関係することと、ネットワーク通信により時間がかかることの2点を考慮する必要があります。そこで、AP4Rでは、2つのテスト方法を用いて、テストしやすさと、網羅性を補完的に実現しています。メッセージのキューイングをスタブ化し素早く実行するfunctionalテストと、実際の通信も含め実動作となるべく近い環境で行うasyncテストです。

柔軟なシステム構成

AP4Rを利用した典型的なシステムの構成は、ウェブサーバ、Railsプロセス群、そしてAP4Rプロセスとなります。AP4Rを複数用意することでメッセージング層の負荷分散を図ったり、非同期処理用のプロセス群を分割することも出来ます。その一方、非同期メッセージ処理のリクエストは、HTTPなど、標準的なプロトコルを用いて送信されていますので、Railsに限らず、HTTPで受け付けているサービスと連携することも可能です。連携先は、URL変換フィルタの機能を利用して柔軟に対応できます。

また、様々な負荷分散のための設定を調整することで、処理量の増大に備えたスケールアウトや、メッセージの種類に応じた流量制御も可能です。

いざというときのリカバリ

稼動しはじめたシステムは、予期せぬエラーに見舞われることがあります。メッセージの通信中における障害や、データの不整合による処理の異常終了などです。こうした場合に備え、メッセージを復旧、再送する手だてが必要となります。AP4Rでは、SAFリカバリとDLQリカバリの方法を提供しています。

図9 軽量さと堅牢さのバランス
図9 軽量さと堅牢さのバランス

最後に

AP4Rでは今後の拡張として、Capistrano用管理レシピ、Stompプロトコルのサポート、統合監視ツールへの対応などを予定しています。興味を持たれた方は、ユーザー向けのメーリングリストがありますので、そちらで意見や質問、使い勝手のフィードバック、こんな機能が欲しいという要望などを頂ければと思います。筆者らのブログにトラックバックを飛ばしてもらうのでもOKです。

本連載に最後までお付き合いいただき、ありがとうござました。AP4Rについての理解が深まり、皆さんが立ちあげているサービスや手がけているプロジェクトで採用していただけたら幸いです。

AP4R関連リンク

RubyForge
開発者ブログ
その他

おすすめ記事

記事・ニュース一覧