並列・分散処理のためのクローラ
ある程度大規模なクローラを作る場合は、
Perlで高性能なクローラを書く場合の選択肢は2つあります。一つはforkによるマルチプロセスによるもので、
以降では、
AnyEventとCoroの違い
Coroを使うと複雑な非同期処理を、
use AE;
my $i = 0;
my $cv = AE::cv;
# 1秒ごとに、指定した関数を実行せよ
my $watcher = AE::timer 0, 1, sub {
warn $i++;
$cv->send("done") if $i >= 10;
};
# CV = condition variable が利用可能になるまで待機
warn $cv->recv;
use Coro;
my $main = Coro::current;
async {
my $i = 0;
while (1) {
# async で囲まれた部分だけがsleep によって1 秒止まる
Coro::Timer::sleep 1;
warn $i++;
last if ( $i > 10 );
}
# 次の切り替え時にメインスレッドに切り替わるようにスケジューラに指示
$main->ready;
};
schedule;
Coroで
AnyEventとCoroの使い分け
Coroは黒魔術ですので、
Coro::Timerの内部は次のようになっています。
sub sleep($) {
my $w = AE::timer $_[0], 0, Coro::rouse_cb;
Coro::rouse_wait;
}
内部ではAE::timer を使って指定秒数後にCoro::rouse_
そのためAnyEventとCoroの使い分けは、
Coroを使った典型的なクローラのひな型
Coroを使ってホストごとの接続数制限やウェイトを実装したクローラのサンプルはリスト1のようになります。実際には各処理は複数のクラスに分割したほうがよいですが、
Coroを使うメリットとして、
ホストごとの並列数を制限する
クローラを書くうえで、
たとえば同時接続数を4件に制限したい場合、
my $hosts = {};
sub task {
my $url = shift;
my $host = URI->new($url)->host;
# ホストごとのセマフォを呼び出す、なければ新しく作る
my $semaphore = $hosts->{$host} ||= Coro::Semaphore->new(4);
# 4 件以上呼ばれたら、その時点で別スレッドに切り替わる
my $guard = $semaphore->guard;
# 何らかの処理
...
}
上記のコードでは、
$semaphore->guardを使わずに、
詳しい使い方はperldoc Coro::Semaphore
を参照してください。
スレッド間の連携を行う
クローラに必要な処理を複数のスレッドに分担させ、
もしこれが、
use strict;
use Coro;
use Coro::Channel;
use Coro::Semaphore;
use Coro::Timer;
use URI;
use FurlX::Coro;
use Web::Query;
use Try::Tiny;
my @done;
my @fail;
sub done { push @done, $_[0] }
sub fail { push @fail, $_[0] }
my %queue;
sub queue {
my $name = shift;
$queue{$name} ||= Coro::Channel->new;
}
sub logger {
my $msg = shift;
# 時刻と現在のスレッド名を出力
warn localtime . sprintf ": %s %s\n", Coro::current->desc, $msg;
}
# 全体の同時接続数制限
my $global_lock = Coro::Semaphore->new(5);
sub global_lock {
$global_lock->guard;
}
# ホストごとの接続数制限
my %lock;
my $use_sleep = 1;
sub host_lock {
my $url = shift;
my $host = URI->new($url)->host;
my $sem = $lock{$host} ||= Coro::Semaphore->new(1);
# 最後の接続から一定時間ウェイトを入れる
if ($use_sleep && $sem->count == 0) {
my $guard = $sem->guard;
Coro::Timer::sleep 3;
return $guard;
}
$sem->guard;
}
(1)
sub fetcher {
my $url = queue("fetch")->get;
my $lock = host_lock($url);
my $glock = global_lock();
my $ua = FurlX::Coro->new;
logger($url);
my $res = $ua->get($url);
queue("parse")->put([$url, $res]);
}
(2)
sub parser {
my $data = queue("parse")->get;
my ($url, $res) = @$data;
logger($url);
# タイトルを抜き出す場合
my $title = Web::Query->new_from_html($res->content)
->find("title")->text;
queue("update")->put([$url, $title])
}
(3)
sub updater {
my $data = queue("update")->get;
my ($url, $res) = @$data;
logger($url);
warn $res;
done($res);
}
sub create_worker {
my ( $name, $code, $num ) = @_;
for ( 0 .. $num ) {
my $desc = $name . "_" . $_;
async_pool {
Coro::current->desc($desc);
while (1) {
try {
$code->()
} catch {
warn $_;
fail($_);
}
}
}
}
}
create_worker( fetcher => \&fetcher, 1000 );
create_worker( parser => \&parser, 1 );
create_worker( updater => \&updater, 1 );
# 取得するURL
my @list = qw(
http://localhost/1
http://localhost/2
http://local.example.com/2
http://local.example.com/3
);
my $stop_flag = 0;
my $force_exit = 0;
my $total = scalar @list;
my $main = Coro::current;
# シグナルを受け取って「終了」フラグを立てる
$SIG{INT} = sub {
if ($stop_flag == 1) { $force_exit = 1 }
$stop_flag = 1;
if ($force_exit) {
die "exit";
}
};
# manager
async {
while (1) {
Coro::Timer::sleep 1;
# 作業途中のデータが失われないようにするとよい
# 新規のジョブは受け取らないようにして
# 実行中のジョブが終了するのを待つ、など
if ($stop_flag) {
warn "signal recieved!!! ";
async {
Coro::Timer::sleep 10;
$stop_flag = 0
}
}
warn sprintf "Task: %s/%s Fail: %s",
scalar @done, $total, scalar @fail;
my $done = scalar @done + scalar @fail;
if ($done == $total) {
warn "All task done!";
$main->ready;
}
}
};
queue("fetch")->put($_) for @list;
schedule;
http_fetch($url, sub {
my ($url, $res) = @_;
parse_response($res, sub {
my $res = shift;
update_databse($res, sub {
warn "done!"
})
})
});
こんな具合にネストが深いものになってしまうでしょう。もちろんコーディングテクニックで解消できる部分も多くありますし、
複数のセマフォを組み合わせて使う
ある程度大きなクローラを書くことになると、
最後の接続から一定秒数のウェイトを入れるような処理も、
複数プロセスで共有するキューを作る
Coro::Channelを使うことで別のスレッドにデータを受け渡すことができますが、
RedisのLIST型を使う
Redisにはちょうどキューとして使えるデータ型とコマンドが存在しているので、
PerlからRedisにアクセスするためのCPANモジュールはいくつかありますが、
package Rq;
# Redis as Queue
use strict;
use Coro;
use AnyEvent::Redis;
our %REDIS_SERVER = (server => '127.0.0.1', port => 6379);
sub new {
my $class = shift;
my $name = shift;
my $self = {
name => $name,
redis => AnyEvent::Redis->new(%REDIS_SERVER),
};
bless $self, $class;
}
# LIST から取得
sub get {
my $self = shift;
while(1) {
$self->{redis}->blpop($self->{name}, 10, rouse_cb);
my $res = rouse_wait;
return $res->[1] if ($res);
}
}
# LIST に追加
sub put {
my ($self, $msg) = @_;
$self->{redis}->rpush($self->{name}, $msg);
}
1;
use strict;
use Rq;
use Coro;
use Coro::Timer;
my $queue = Rq->new("test_channel");
sub publish {
my $message = shift;
$queue->put($message);
}
my $broker = async {
Coro::current->desc("broker");
my $i = 0;
while (1) {
$i++;
Coro::Timer::sleep 1;
publish( "task: " . $i );
}
};
schedule;
use strict;
use Coro;
use Coro::Timer;
use Rq;
my $channel = Rq->new("test_channel");
async {
while(1) {
my $msg = $channel->get;
warn $msg;
}
};
# $channel->get がブロッキング中でもタイマーは動き続ける
my $timer = async {
while(1) {
Coro::Timer::sleep 1;
warn time;
}
};
schedule;
これで複数のプロセス間でRedisを介してメッセージの送受信ができるようになりました。Coro::Channelと違って文字列しか受け渡すことができないので、
一般的なメッセージキューのためのミドルウェアでは、
Coroを使うメリット
昨今は、
Coroのメリットは、
インメモリのキャッシュを活用する
また、
まとめ
本稿では、
さて、