Ruby Freaks Lounge

第29回 Reactorで非同期処理をやってみよう(1)

この記事を読むのに必要な時間:およそ 2 分

はじめに

WebサービスのAPIをコールするような,ネットワークを介した通信処理は,今日では頻繁に行われています。

ローカルマシンのみで完結する処理と比べると,通信が必要な処理は多大な時間が必要になります。相手サーバへの接続,相手サーバ側での処理,相手サーバからの受信など,何もすることなくただ待つだけの時間が存在します。

この無駄な時間の間に他の処理ができるならば,トータルの処理時間を大幅に短縮することが可能になります。これを実現するためにスレッドがよく使われています。しかしマルチスレッドプログラミングはいろいろと注意を払う点も多く,使いにくさを感じている方も多いのではないでしょうか。

今回はReactorパターンという,マルチスレッドとは違ったアプローチで非同期処理を実現してみたいと思います。

複数のwebサーバからHTML文章を取得してみる

同期処理

ひとまず非同期処理を忘れて,シーケンシャルに処理をしてみましょう。

非同期処理を採用する理由は,何も処理をしないでただ待っている無駄な時間を有効利用するためでした。逆に考えると,無駄な時間が存在しない(相手サーバが瞬時に処理を返す)ならば,同期的な処理がもっとも処理時間を短くできます。

リスト1 ひとつずつ同期的に取得

hosts.each do |host|
  sock = TCPSocket.new(host, 80)
  sock.write(request)
  sock.read
  sock.close
end

しかし,現実には無駄な時間は必ず発生します。上記のコードでは,ブロック内のすべてのメソッドコールで発生するでしょう。ひとつのサーバでの遅延は後続の処理の開始を遅らせる結果となり,それが積もり積もって全体の処理時間に無視できない影響を与えます。

マルチスレッド処理

次に処理時間を短縮するため,スレッドを使って並列的に処理を行ってみます。

本当にすべての処理が並列に動くとしたら,一番重いサーバの処理時間が全体の処理時間と等しくなります。実際はすべての処理が同時に並列に動くわけではないし,コンテキストスイッチのコストもあるため,処理時間はもう少しかかることになります。それでも,同期処理とは比較にならないほど高速に処理されるはずです。

リスト2 スレッドを使用

threads = hosts.map do |host|
  Thread.new(host) do |h|
    sock = TCPSocket.new(h, 80)
    sock.write(request)
    sock.read
    sock.close
  end
end
threads.each{|t| t.join}

Reactorパターンを使う

writeやreadのメソッドでの無駄な時間を減らすためにはどうしたらよいでしょう。

一番無駄なのが,相手サーバからデータが届いていないのにreadメソッドでずっと待つことです。データが読める状態になってはじめてreadメソッドをコールすれば,無駄な時間を短縮できることになります。

一番最初に書き込み可能になったソケット,また読み込み可能になったソケットに対して,それぞれwrite,readメソッドを呼んであげるために,ここではselectメソッドを使用します。selectメソッドは,対象のソケットが書き込み可能,または読み込み可能になるまで待機します。用意のできたソケットが出てきたとき,それを通知してくれます。イベントが発生してから処理を行うため,イベント駆動,イベントループなどとも呼ばれています。

リスト3 Reactorパターンで非同期処理を行う

# ①
write_socks = hosts.map do |host|
  TCPSocket.new(host, 80)
end
read_socks = []

# ②
write_proc = lambda{|sock|
  sock.write(request)
}

# ③
read_proc = lambda{|sock|
  sock.read
  sock.close
}

# ④
until (write_socks + read_socks).empty?

  # ⑤
  r_socks, w_socks, e_socks = IO.select(read_socks, write_socks)

  # ⑥
  if ws = w_socks.first
    write_proc.call(ws)
    read_socks << ws
    write_socks.delete(ws)
  end

  # ⑦
  if rs = r_socks.first
    read_proc.call(rs)
    read_socks.delete(rs)
  end
end

他のアプローチと比べると,コードが長くて複雑に見えるかもしれません。ひとつひとつ追ってみましょう。

①の部分では,書き込み対象のソケット群と読み込み対象のソケット群を用意しています。webサーバからHTMLファイルを取得するときは,まずリクエストを送信する必要があります。そのため,ここではリクエストを送信するソケットを,サーバーの数分だけ用意しています。一方,読み込みが可能なソケットはまだないはずなので,空配列になります。

②,③の部分では,書き込み可能,または読み込み可能となったソケットに対する処理を記述しています。書き込み可能となったソケットに対してはリクエストを送信します。読み込み可能となったソケットからはレスポンスを受信し,そのソケットをクローズしています。このように,イベントが発生した際に呼ばれる処理をコールバックと呼びます。

④のブロックがイベントループにあたります。書き込みソケット群,および読み込みソケット群が空になるまでこのループの中の処理を行います。

⑤では,selectを使って,対象ソケットが読み込み可能,または書き込み可能になるのを待っています。いずれかのソケットでイベントが発生すると,処理は⑥以降へと進みます。

⑥では書き込み可能状態となったソケットに対して処理を行います。②で定義したコールバック処理を実行し,その後で対象ソケットを書き込みソケット群から読み込みソケット群に移動させています。こうすることで,送信が完了したソケットは相手のサーバからのレスポンスを待つことができるようになります。

⑦では,読み込み可能状態となったソケットに対して処理を行います。③で定義したブロックをコールし,その後で対象ソケットを読み込みソケット群から除外しています。相手サーバからのレスポンスを受け取ったこのソケットはもう仕事を終えているため,これ以上イベント待ちする必要はありません。

著者プロフィール

おおいしつかさ

株式会社カカクコム所属。食べログのプログラマ。
iPhoneアプリの開発も担当。

URLhttp://www.kaeruspoon.net/

コメント

コメントの記入