Perl Hackers Hub

第10回ジョブキューで後回し大作戦―TheSchwartz、Qudo、Q4M(2)

Qudo―もう一つのTheSchwartz

TheSchwartzは実績豊富なプログラムですが、拡張性に欠けるところがあります。そこで筆者は拡張性に重点を置いたジョブキューのしくみを作成しました。それがQudo(クドー)です。

QudoはTheSchwartzにはないフックポイントとプラグイン機構を提供しています。フック機能を使えば、ジョブデータをデータベースに登録する際に使用するシリアライザなどを自由に選択できます。

TheSchwartzではシリアライザはStorable以外は利用できませんでしたが、QudoではJSONJavaScript Object NotationやData::MessagePackを使ってジョブデータをシリアライズできます。Storableでシリアライズされたデータはデータベースの中身を直接覗いても読めませんが、JSONでシリアライズすればデータベースを直接見たときに読める状態でジョブが登録されます。

また、Storableは利用するPerlのバージョンによって互換性がないことがあります。筆者はStorableのバージョン2.15でシリアライズしたデータが2.13でデシリアライズできない問題に遭遇したこともあります。StorableはPerlのコアモジュールとして提供されているためバージョンの調整が難しくなります。StorableはData::MessagePackなどのシリアライズモジュールと比較すると低速でデータの圧縮率も悪いので、可能であればStorable以外のシリアライズモジュールを利用するとよいでしょう。

ほかにもフックポイントを利用すると、ジョブの処理が一定回数以上失敗したらアラートメールを送信したりすることもできます。このようにQudoはTheSchwartzではちょっと手の届かない部分を自由に拡張できます。

Qudoの使い方

QudoもTheSchwartzと同様にcpanmなどでインストールしてください。Qudoを使うにはTheSchwartzと同様にバックエンドにRDBMSを利用し、MySQL、SQLite、PostgreSQLに対応しています。

データベースのセットアップには同梱されているqudoコマンドを利用します。次のように実行すると、Qudoで利用するスキーマを手元のデータベースにセットアップできます。

$ qudo --db qudo_test --user root\
   --pass password -f-rdbms mysql

なお--dry_runオプションを付けて実行すると、実際にデータベースにスキーマをセットアップせず画面にスキーマ情報を表示できるので、リモートのデータベースに対してセットアップしたい場合は--dry_runオプション付きで表示された情報を使ってセットアップしてください。

Qudoを使ってアプリケーションからジョブを登録するには次のようにします。

#! /usr/bin/perl
use strict;
use warnings;
use Qudo;

my $qudo = Qudo->new(
    databases => [+{
        dsn => 'dbi:mysql:qudo_test',
        username => 'root',
        password => '',
    }],
);
$qudo->enqueue("MyWorker", {
    arg => 'arg',
    uniqkey => 'uniqkey'
});

登録したジョブを処理するワーカは次のようになります。

package MyWorker;
use strict;
use warnings;
use parent 'Qudo::Worker';

sub work {
    my ($self , $job ) = @_;
    my $job_arg = $job->arg();
    # ここでワーカにジョブを処理させる
    $job->completed(); # or $job->abort
}
1;

作成したワーカを使って実際にジョブを処理させるには次のようにします。

#! /usr/bin/perl
use strict;
use warnings;
use Qudo;

my $qudo = Qudo->new(
    databases => [+{
        dsn => 'dbi:mysql:qudo_test',
        username => '',
        password => '',
    }],
    manager_abilities => [qw/MyWorker/],
);
$qudo->work();

基本的な使い方はTheSchwartz と同じなので、TheSchwartzを使ったことがある人はそれほど迷わず利用できます。

QudoではデフォルトでDBIx::Skinnyという筆者が作成したO/Rマッパをバックエンドで利用しているため、DBIで直接SQLを実行するよりもパフォーマンスが悪くなることがあります。そのようなときは星野将氏が作成したQudo::Driver::DBIをインストールし利用すれば、それだけでパフォーマンスを改善できます。

Qudo::Driver::DBI を利用するには次のようにdriver_classを指定するだけです。

#! /usr/bin/perl
use strict;
use warnings;
use Qudo;

my $qudo = Qudo->new(
    driver_class => 'DBI',
    databases => [+{
        dsn => 'dbi:mysql:qudo_test',
        username => '',
        password => '',
    }],
);

QudoではDriverクラスを任意のクラスに差し替えることが可能なので、よりパフォーマンスに特化させたDriverクラスを作成し利用することもできます。

Qudo::Workerの設定

Qudoでは複数のプロセスが同一のジョブを処理しないようにジョブのロックを行います。Qudoのジョブロックの方法は単純で、まずワーカは50件ほどまとめてデータベースからジョブキューのリストを取得します。そして取得したジョブキューの上から順番にUPDATEをかけていきます。

UPDATEのかけ方としては、ジョブレコードのgrabbed_untilというカラムに現在のUNIXタイムをSETし、WHERE句に取得したジョブレコードがもともと持っているgrabbed_untilの値を付け加えます。こうすることで複数のワーカが同一のジョブをロックしようとしてUPDATEをかけても、UPDATEが成功するのは1つのワーカだけとなることをMySQLなどのデータベースが保証してくれます。

こうしてロックが取得できたワーカはジョブを処理し、ロックが取得できなかったワーカは再度リストを上から順番にUPDATEをかけ、ロックが取得できるまでこれを続けます。

しかし、ワーカがジョブを取得し処理を開始したものの、エラーが発生してジョブを正常に処理できなかったらどうなるのでしょうか? Qudo::Workerの設定でgrab_forというのがあります。grab_forには、ジョブを処理するのにかかるであろう時間を秒単位で設定します。このgrab_forで設定した秒数以上経過してもジョブがデータベースに残ってしまっている場合、Qudoはもう一度そのジョブを処理する対象として扱います。grab_forのデフォルトは1時間です。grab_forの設定は次のように行います。

package MyWorker;
use strict;
use warnings;
use parent 'Qudo::Worker';

sub grab_for { 30 } # grab_for 30 sec

sub work {
    my ($self , $job ) = @_;
    ...
}
1;

エラーが発生した場合はデータベースにジョブが残ってしまうので、このしくみでgrab_forで設定されている時間が経過したあとに再度ジョブが処理されることとなります。

ジョブのリトライ

ワーカクラスで処理中にエラーが発生しても、一時的なエラーなのでリトライすれば処理可能なことが多々あります。そういった場合は、ワーカクラスにリトライ設定を追加するだけで自動で処理をリトライさせることができます。リトライ設定はmax_retriesというワーカクラスのメソッドをオーバーライドし、リトライ回数を設定します。Qudoのデフォルトではリトライせず処理を終了させます。

package MyWorker;
use strict;
use warnings;
use parent 'Qudo::Worker';

sub max_retries { 1 }

sub work {
    my ($self , $job ) = @_;
    ...
}
1;

リトライは、即時行ってほしい場合と、一定時間経過後に行ってほしい場合とがあります。リトライの間隔を制御するにはretry_delayというメソッドをオーバーライドし、次回処理をさせるまでの待ち時間を設定します。デフォルトは即時リトライです。

package MyWorker;
use strict;
use warnings;
use parent 'Qudo::Worker';

sub max_retries { 1 }
sub retry_delay { 60 } # wait 60 sec

sub work {
    my ($self , $job ) = @_;
    ...
}
1;

エラーの確認方法

ワーカで設定したリトライ回数以上のエラーが発生した場合、そのジョブは正常に処理できなかったとしてデータベースのジョブリストから削除されます。発生したエラーの情報はデータベースのexception_logテーブルにすべて記録されています。

エラーが発生したときのオペレーションとしては、exception_logテーブルのmessageカラムにエラー情報が格納されているのでそれをチェックします。そしてもう一度リトライさせればよさそうな場合は、enqueue_from_failed_jobというメソッドを使ってもう一度データベースにジョブを登録できます。

#! /usr/bin/perl
use strict;
use warnings;
use Qudo;

my $qudo = Qudo->new(
    databases => [+{
        dsn => 'dbi:mysql:qudo_test',
        username => '',
        password => '',
    }],
);
my $exceptions = $qudo->exception_list;
my ($db, $exception) = each %$exceptions;
$manager->enqueue_from_failed_job(
    $exceptions->[0], $db
);

おすすめ記事

記事・ニュース一覧