Perl Hackers Hub

第22回 Coroを使ったやさしいクローラの作り方(3)

この記事を読むのに必要な時間:およそ 6.5 分

スレッド間の連携を行う

クローラに必要な処理を複数のスレッドに分担させ,それらをCoro::Channelを使って連携します。Coro::current->descを使って役割に応じてスレッドに名前を付けておくと,デバッグの際に便利です。URLを受け取ってHTTPレスポンスを取得するためのfetcher(1))⁠取得したレスポンスを解析するためのparser(2))⁠解析した結果をファイルなどに書き出すためのupdater(3)という3種類のスレッドを動かしています。

もしこれが,それぞれcallback方式で記述されていた場合にはどうなるでしょうか。

リスト1 CoroのSemaphoreとChannelを使った典型的なクローラのためのコード(coro_crawler.pl)

use strict;
use Coro;
use Coro::Channel;
use Coro::Semaphore;
use Coro::Timer;
use URI;
use FurlX::Coro;
use Web::Query;
use Try::Tiny;

my @done;
my @fail;
sub done { push @done, $_[0] }
sub fail { push @fail, $_[0] }

my %queue;

sub queue {
    my $name = shift;
    $queue{$name} ||= Coro::Channel->new;
}

sub logger {
    my $msg = shift;
    # 時刻と現在のスレッド名を出力
    warn localtime . sprintf ": %s %s\n", Coro::current->desc, $msg;
}

# 全体の同時接続数制限
my $global_lock = Coro::Semaphore->new(5);
sub global_lock {
    $global_lock->guard;
}

# ホストごとの接続数制限
my %lock;
my $use_sleep = 1;
sub host_lock {
    my $url = shift;
    my $host = URI->new($url)->host;
    my $sem = $lock{$host} ||= Coro::Semaphore->new(1);
    # 最後の接続から一定時間ウェイトを入れる
    if ($use_sleep && $sem->count == 0) {
        my $guard = $sem->guard;
        Coro::Timer::sleep 3;
        return $guard;
    }
    $sem->guard;
}
(1)
sub fetcher {
    my $url = queue("fetch")->get;
    my $lock = host_lock($url);
    my $glock = global_lock();
    my $ua = FurlX::Coro->new;
    logger($url);
    my $res = $ua->get($url);
    queue("parse")->put([$url, $res]);
}
(2)
sub parser {
    my $data = queue("parse")->get;
    my ($url, $res) = @$data;
    logger($url);
    # タイトルを抜き出す場合
    my $title = Web::Query->new_from_html($res->content)
                ->find("title")->text;
    queue("update")->put([$url, $title])
}
(3)
sub updater {
    my $data = queue("update")->get;
    my ($url, $res) = @$data;
    logger($url);
    warn $res;
    done($res);
}
sub create_worker {
    my ( $name, $code, $num ) = @_;
    for ( 0 .. $num ) {
        my $desc = $name . "_" . $_;
        async_pool {
            Coro::current->desc($desc);
            while (1) {
                try {
                    $code->()
                } catch {
                    warn $_;
                    fail($_);
                }
            }
        }
    }
}

create_worker( fetcher => \&fetcher, 1000 );
create_worker( parser => \&parser, 1 );
create_worker( updater => \&updater, 1 );

# 取得するURL
my @list = qw(
    http://localhost/1
    http://localhost/2
    http://local.example.com/2
    http://local.example.com/3
);

my $stop_flag = 0;
my $force_exit = 0;
my $total = scalar @list;
my $main = Coro::current;

# シグナルを受け取って「終了」フラグを立てる
$SIG{INT} = sub {
    if ($stop_flag == 1) { $force_exit = 1 }
    $stop_flag = 1;
    if ($force_exit) {
       die "exit";
    }
};

# manager
async {
    while (1) {
        Coro::Timer::sleep 1;
# 作業途中のデータが失われないようにするとよい
# 新規のジョブは受け取らないようにして
# 実行中のジョブが終了するのを待つ,など
        if ($stop_flag) {
            warn "signal recieved!!! ";
            async {
                Coro::Timer::sleep 10;
                $stop_flag = 0
            }
        }
        warn sprintf "Task: %s/%s Fail: %s",
        scalar @done, $total, scalar @fail;
        my $done = scalar @done + scalar @fail;
        if ($done == $total) {
            warn "All task done!";
            $main->ready;
        }
    }
};

queue("fetch")->put($_) for @list;
schedule;



http_fetch($url, sub {
    my ($url, $res) = @_;
    parse_response($res, sub {
        my $res = shift;
        update_databse($res, sub {
            warn "done!"
        })
    })
});

こんな具合にネストが深いものになってしまうでしょう。もちろんコーディングテクニックで解消できる部分も多くありますし,シンプルなワーカをキューを使って連携するというふうにすれば,AnyEventを使う場合でも見通しの良いコードになるでしょう。

複数のセマフォを組み合わせて使う

ある程度大きなクローラを書くことになると,複数の制限を加えることになるでしょう。⁠生成するCoroの最大数」⁠ホストごとの最大接続数」⁠プロセス中の全体での最大同時接続数」をそれぞれセマフォを使って制限します。セマフォによって何も処理しない休止中のスレッドが出てくるので,CoroのスレッドをHTTPクライアントの同時接続数より多く作り待機させます。

最後の接続から一定秒数のウェイトを入れるような処理も,単にCoro::Timer::sleepを使って実装できます。条件が整うまではそのスレッドを一時停止して別のスレッドに切り替わるといった,AnyEventでは書くことが難しい処理でもすんなりと直感的に書けるのがCoroの良いところです。Coroのスレッド作成はforkで子プロセスを作るよりも非常に高速,省メモリであるため,100や200程度であれば気軽に作成できます。

著者プロフィール

mala(マラ)

NHN Japan所属。livedoor Readerの開発で知られる。JavaScriptを使ったUI,非同期処理,Webアプリケーションセキュリティなどに携わる。

Twitter:@bulkneets