Perl Hackers Hub

第10回ジョブキューで後回し大作戦―TheSchwartz、Qudo、Q4M(3)

Q4M―MySQLを利用したジョブキュー

今まではPerlで作られたミドルウェアとしてのジョブキューを紹介してきましたが、最後にMySQLのプラグインとして提供されるジョブキューQ4Mを紹介します。Q4Mは本連載第6回UNIXプログラミングの勘所を執筆した奥一穂氏によって作られています。MySQLには依存してしまいますが、利用するプログラミング言語には依存しません。MySQLに接続できるどのプログラミング言語からも利用できます。また、TheSchwartzやQudoのようにミドルウェア側のデータスキーマの制限を受けることがないのも魅力の一つでしょう。

Q4Mの使い方

Q4Mのインストール方法はドキュメントで確認してください。

Q4Mをインストールしたら次はジョブキューで使用するテーブルを定義します。TheSchwartzやQudoでは決められたテーブル定義を使用する必要がありましたが、Q4Mは好きなテーブルを定義し利用できます。今回は次のようなテーブルを定義したものとします。

CREATE TABLE welcome (
    nickname       varchar(255) NOT NULL,
    email          varchar(255) NOT NULL,
    created_on int(10) unsigned NOT NULL
) ENGINE=QUEUE DEFAULT CHARSET=utf8;

通常のテーブル定義と異なるのはENGINEにQUEUEを使っている点です。Q4MをMySQLにインストールするとQUEUEエンジンが利用できるようになり、このエンジンで定義されたテーブルを使うことでジョブキューを管理できるようになります。

ジョブの登録

TheSchwartzやQudoではジョブの情報をシリアライズしてデータベースのカラムに格納していましたが、Q4Mでは自由なテーブル定義を行うことができるのでシリアライズを考える必要がありません。ジョブを登録するには、INSERTでレコードを登録するだけです。

mysql> INSERT INTO welcome(nickname,email,created_on)
VALUES('nekokak','nekokak@gmail.com',UNIX_TIMESTAMP());

登録したジョブは、通常のMySQLのレコードと同様にSELECT文を使うだけで参照できます。

mysql> SELECT * FROM welcome;
+----------+-------------------+------------+
| nickname | email             | created_on |
+----------+-------------------+------------+
| nekokak  | nekokak@gmail.com | 1307539020 |
+----------+-------------------+------------+

実際のアプリケーションではDBIを使ってwelcomeテーブルにINSERTを実行します。

#! /usr/bin/perl
use strict;
use warnings;
use DBI;

my $dbh = DBI->connect('dbi:mysql:q4m_test','root','');
$dbh->do(q{
    INSERT INTO welcome(nickname,email,created_on)
    VALUES(?,?,UNIX_TIMESTAMP())
}, undef, 'nekokak', 'nekokak@gmail.com');

ジョブの取り出し

次にジョブを取得します。ジョブを取得するにはqueue_waitというfunctionに、対象となるキューテーブル名を指定しSELECTを実行します。

mysql> SELECT queue_wait('welcome');
+-----------------------+
| queue_wait('welcome') |
+-----------------------+
|                     1 |
+-----------------------+

成功すると結果として1が返ってきます。この状態をオーナーモードと呼びます。オーナーモード中に対象テーブルをSELECTすると1レコードだけ取得できるようになります。

mysql> SELECT * FROM welcome;
+----------+-------------------+------------+
| nickname | email             | created_on |
+----------+-------------------+------------+
| nekokak  | nekokak@gmail.com | 1307539020 |
+----------+-------------------+------------+

ジョブの終了

ジョブワーカはこれで取得できた情報をもとに処理を実行します。ジョブワーカの処理が正常に終了したらキューを終了させるコマンドを実行します。

mysql> SELECT queue_end();
+-------------+
| queue_end() |
+-------------+
|           1 |
+-------------+

これにより同時にジョブレコードが削除され、オーナーモードが解除されます。もしジョブワーカの処理が異常終了した場合は、

mysql> SELECT queue_abort();
+---------------+
| queue_abort() |
+---------------+
|             1 |
+---------------+

を実行することでオーナーモードを解除します。queue_abort()した場合はジョブレコードは削除されずそのままデータベースに残ります。基本的にはこの繰り返しでジョブを処理するプログラムを自分で書きます。自分で書くと言うと難しく感じますが、基本的にはSQLのINSERTやSELECTを実行するだけなので、難しいものではありません。

Q4Mを使った簡単なサンプルプログラムは次のようになります。

#! /usr/bin/perl
use strict;
use warnings;
use DBI;

my $dbh = DBI->connect('dbi:mysql:q4m_test','root','');
while (1) {
    my $lock = $dbh->do("SELECT queue_wait('welcome')")
    if ($lock) {
        my $queue = $dbh->do('SELECT * FROM welcome');
        # ここでジョブの処理をさせる
        $dbh->do('SELECT queue_end()');
    }
}

ジョブのロック時間の調整

queue_waitを実行するときの第2引数で、ジョブをロックするまでの待ち時間を秒単位で決めることができます。

mysql> select queue_wait('welcome',10);
+--------------------------+
| queue_wait('welcome',10) |
+--------------------------+
|                        0 |
+--------------------------+

指定秒数内にキューからデータを取得できない場合は、諦めて処理を中断することもできます。

ジョブキューを使う際の注意点

ここからはジョブキューを使ったプログラムを書くうえでの注意点を取り上げます。

シグナルセーフなワーカの作成

ワーカは基本的に無限ループで動き続け、新しいジョブが登録されるたびに実際の処理が行われます。動き続けるワーカにはいくつかの問題があります。

まず第一に、ワーカプログラムを修正したときは、基本的には現在起動中のワーカを一度停止する必要があります。その際、何も考えずにプロセスを停止してしまうと、もしかしたらワーカがジョブを処理中で中途半端にデータを更新した状態でジョブが強制終了されてしまい、データに不整合が発生してしまうかもしれません。そんなことにならないように「シグナルトラップ」という方法を使って安全にワーカを停止させるようにしましょう。

第二に、ワーカプログラム中にメモリリークが存在する場合は、プロセスが専有するメモリが処理ごとに増大し大幅な性能劣化を引き起こすことがあります。メモリリークは当然解決すべき問題ですが、使用しているライブラリすべてをチェックするのはかなり難しいのが現実です。そこでプロセスが一定回数処理したら処理プロセスを自動で再起動するようにすることで、定期的にメモリを解放しメモリリークによる大きな問題を回避します。

シグナルを考慮したワーカを作成するにはParallel::Preforkがたいへん便利です。Parallel::Preforkではmax_workersで指定した数だけプロセスをforkし、ワーカを複製して管理してくれる機能もあるのでワーカプロセス全体の管理も楽になります。

Parallel::Preforkを使ったワーカプログラムは次のようになります。次の例ではさらに、一定回数処理を行うと子プロセスを終了させ、メモリリークによる問題をなるべく回避できるようにしています。

#! /usr/bin/perl
use strict;
use warnings;
use Parallel::Prefork;

my $pm = Parallel::Prefork->new(
    +{
        max_workers => 5,
        trap_signals => +{
            TERM => 'TERM',
            HUP => 'TERM',
            USR1 => undef,
        },
    }
);
my $max_requests_per_child = 50;
while ( $pm->signal_received ne 'TERM' ) {
    $pm->start and next;
    my $i=0;
    while ( $i++ < $max_requests_per_child ) {
        # ここでワーカの処理を起動させる
    }
    $pm->finish;
}
$pm->wait_all_children;

処理を停止させたいときはプロセスに対しTERMシグナルを送ります。シグナルトラップにより、Parallel::Preforkがワーカを安全に停止します。

ただしQ4Mを使っている場合は、上記コードではうまくシグナルハンドリングできない場合があります。Q4Mを使う場合、DBI経由でqueue_waitなどを実行してキュー待ちするのですが、DBIなどのXSレベルでの処理中では通常のシグナルハンドリングでは即座に処理を中断できません。ですのでQ4Mを使う場合に正しくシグナルハンドリングした状態でワーカを動かすには、Sys::SigActionを利用します。

#! /usr/bin/perl
use strict;
use warnings;
use Sys::SigAction qw(set_sig_handler);
use Parallel::Prefork;

my $pm = Parallel::Prefork->new(
    +{
        max_workers => 5,
        trap_signals => +{
            TERM => 'TERM',
            HUP => 'TERM',
            USR1 => undef,
        },
    }
);
my $max_requests_per_child = 50;
while ( $pm->signal_received ne 'TERM' ) {
    $pm->start and next;
    my $i = 0;
    my $is_stop=0;
    my $h = set_sig_handler(
        'TERM',
        sub {$is_last = 1},
    +{flags => SA_RESTART}
    );
        while ( $i++ < $max_requests_per_child ) {
        # ここでワーカの処理を起動させる
        last if $is_stop;
    }
    $pm->finish;
}
$pm->wait_all_children;

これでどんなタイミングでも安心してワーカプロセスを再起動することが可能となり、ついでに簡単にメモリリーク対策を行うことができます。

キュー数の監視

ジョブキューは非同期に処理されるためリアルタイム性がそこまで必要ないという話をしましたが、まったく処理されないのではさすがに困ってしまいます。そこでジョブキューがどれくらい処理されているのか、キューが溜まり過ぎていないかを監視する必要があります。ジョブキューでは重い処理を行うことが多いので、キュー数の監視をしていないと知らない間にもの凄い件数のキューが溜まってしまっていることも珍しくありません。基本的に今回紹介したTheSchwartz、Qudo、Q4Mはすべてデータベースにジョブを溜めているので、

SELECT COUNT(*) FROM $queue_table_name;

などでジョブ数のカウントを定期的にとり、キューが溜まり過ぎていないかを確認しましょう。

Webアプリケーションであれば体感的にパフォーマンスが劣化していることがわかりやすいのですが、ジョブキューの場合は意識的にチェックしないと見落としがちになってしまいます。ジョブキューを導入するにあたってキュー数の監視は必ず行うようにしましょう。

スケール方法

多くの処理をジョブキューで行うようになると、どうしてもキューが溜まりがちになり、処理が追いつかなくなってしまいます。ジョブキューもWebアプリケーションと同様にスケールアウトを考えた設計を行うとよいでしょう。

まず、登録されるジョブキューの数に対して処理が追いつかなくなるケースでは、複数のサーバでワーカを起動しても問題ないような作りにすることです。複数のサーバでワーカを動かせるようになると、それだけで多くのキューを処理できます。

次にボトルネックになるのはデータベースでしょう。TheSchwartz、Qudoはそれ自体で複数のデータベースを扱える設計となっています。Q4Mの場合も複数のデータベースを利用すること自体は問題ありません。ただQ4Mの場合はTheSchwartz、Qudoとは異なりただのMySQLの1ストレージなので、複数のデータベースを扱う処理を自分で実装する必要があります。

そこでお勧めなのが、ローカル環境にmydns[1]を立てデータベースを名前ベースでアクセスできるように設定し、DNSラウンドロビンを用いて参照先を分散させる方法です。

ローカル環境にDNSを設置しDNSラウンドロビンを利用すれば、アプリケーションプログラムはデータベースが複数台あることを一切気にすることなくデータベースの構成をスケールさせることができます。またmydnsを利用したDNSラウンドロビンではラウンドロビンに重み付けが利用できるので、データベースのスペックの違いをラウンドロビンの重み付けで解消することもできます。筆者が所属するDeNAではmydnsによるDNSラウンドロビンをよく利用しています。

おわりに

個人的にはカジュアルにジョブキューを実現させるにはQ4MよりもTheSchwartzやQudoを利用することをお勧めします。理由としては特定のデータベースに依存せずジョブキューフレームワークとして利用できるため、フレームワークの作法さえ覚えてしまえば簡単に導入可能だからです。今回は紙幅の都合で各ジョブキューミドルウェアの突っ込んだ使い方は説明できませんでしたが、今までジョブキューを使ったことのない人が利用するための足がかりになれれば幸いです。

次回の執筆者はオペレーションエンジニアエバンジェリストこと長野雅広(kazeburo)さんで、テーマは「ログ」です。お楽しみに!

おすすめ記事

記事・ニュース一覧