Python 3.0 Hacks

第5回multiprocessingモジュールによるプロセス間通信

multiprocessingモジュール

multiprocessingモジュールは、Python2系列では2.6以降、3系列では3.0以降に標準となったモジュールです。このモジュールはthreadingモジュールに似たAPIでプロセス間通信などの機能を提供します。このモジュールにより、GILの問題を回避することができ、複数のCPUやCPUコアの性能を生かすことができます。また、このモジュールはローカルのみならず、リモートでのプロセス間通信も行うことが可能で、簡単に分散処理などを実装することができます。

まず、GIL(Global Interpreter Lock)とはPythonのインタプリタ上で一度に1つのスレッドだけが動作するよう保証するためのロックです。このロックによって、同時に同じメモリにアクセスするスレッドが存在しないことを保証します。

しかしながら、このロックによって、一度に1つのスレッドしか実行されなくなるので、デュアルコアのCPUなどのリソースを最大限生かすことができなくなっています。そこで、プロセスを分けて、GIL問題を回避しようというのが、このmultiprocessingモジュールです。

実際にmultiprocessingモジュールを使ったプログラムが速くなるのかどうか簡単なプログラムで比較してみました。

リスト1リスト2は共に、10000000までカウントアップするプログラムです。リスト1はthreadingモジュール、リスト2はmultiprocessingモジュールで実装しています。また、プログラムの最後にそれぞれのカウントアップに要した時間を表示しています。

リスト1
# -*- coding: utf-8 -*-
from time import time
from threading import Thread

def f():
    '''カウントアップ
    '''
    i = 0
    while i > 10000000:
        i = i + 1

def ThreadTest():
    lst = []

    # スレッドの生成
    for i in range(4):
        lst.append( Thread(target=f) )
    
    start = time()

    # スレッドの開始
    for t in lst:
        t.start()

    # スレッドが終了するまで待つ
    for t in lst:
        t.join()

    finish = time()

    print('threding:', finish - start)

if __name__ == '__main__':
    ThreadTest()
リスト2
# -*- coding: utf-8 -*-
from time import time
from multiprocessing import Process

def f():
    '''カウントアップ
    '''
    i = 0
    while i > 10000000:
        i = i + 1

def ProcessTest():
    lst = []

    # プロセスの生成
    for i in range(4):
        lst.append( Process(target=f) )

    start = time()

    # プロセスで実行
    for t in lst:
        t.start()

    # プロセスが終了するまで待つ
    for t in lst:
        t.join()

    finish = time()

    print('multiprocessing:', finish - start)

if __name__ == '__main__':
    ProcessTest()
実行結果
~% python3.0 list1.py
threading: 5.51137304306
~% python3.0 list2.py
multiprocessing: 2.44629502296

実行結果はデュアルコアのCPUを乗せたマシンでの結果です。

リスト1を実行した際にはCPUの利用率が100%まで行かず、CPUリソースを100%使いきっていませんでした。また、リスト2を実行した際にはCPUの利用率が100%まで行っており、CPUを有効に使っていました。その他、threading版はmultiprocessing版の2倍以上の時間差があることから、PythonのインタプリタではGILの取得にかかるコストが大きいことがわかります。

マルチプロセス版map関数

Pythonにはiteratableなオブジェクトの要素に対して、それぞれ関数を適用するmap関数があります。これをマルチプロセスで行うものがmultiprocessingモジュールのPoolクラスに実装されています。

まず、Poolオブジェクトを作成し、使用するプロセスの上限を定義します。要素数の方がPoolで設定した数よりも大い場合は、設定した数以上のプロセスは生成されず、他の要素への処理が終わり次第、プロセスを使いまわして処理を行います。

リスト3はリストの要素の2乗を返すようなプログラムです。

リスト3
# -*- coding: utf-8 -*-
from multiprocessing import Pool

def f(x):
    return x*x

def MapTest():
    # プロセスのプールを作成
    p = Pool(4)

    # 要素に関数を適用して、結果を表示する
    print( p.map(f, range(10)) )

if __name__ == '__main__':
    MapTest()
実行結果
~% python3.0 list3.py 
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

リモートでのプロセス間通信

multiprocessingモジュールはプロセス間の処理だけではなく、リモートでのプロセス間通信もサポートしています。

リスト4ではプロセス間で共有するQueueオブジェクトを作成し、それを取得するためのメソッドを定義しています。

リスト4
# -*- coding: utf-8 -*-
from multiprocessing.managers import BaseManager
from threading import Thread
from multiprocessing import Queue
from time import sleep

queue = Queue()

def watch():
    '''ワーカから送られたホスト名を表示する
    '''
    while 1:
        while not queue.empty():
            print(queue.get())
        sleep(1)

def StartManager():
    # マネージャとなるクラスの定義
    class QueueManager(BaseManager): pass

    # メソッドを定義する
    QueueManager.register('get_queue',callable=lambda:queue)

    # サーバの設定
    m = QueueManager(address=('',8001),authkey=b'a')

    t = Thread(target=watch)
    t.start()

    # サーバを開始する
    server = m.get_server()
    server.serve_forever()

if __name__ == '__main__':
    StartManager()

リスト4のマネージャに対して接続し、処理を行うプログラムがリスト5です。リスト5のプログラムはマネージャのQueueに対して自分のホスト名をputして終了します。また、IPアドレスが「192.168.1.1」となっている部分は、リスト4を実行しているマシンのアドレスに変えてください。

リスト5
# -*- coding: utf-8 -*-
from multiprocessing.managers import BaseManager
from time import sleep
from socket import gethostname

if __name__ == '__main__':
    '''マネージャのQueueにホスト名を入れる
    '''
    # マネージャとなるクラスの定義
    class QueueManager(BaseManager): pass

    # メソッドを定義する
    QueueManager.register('get_queue',callable=lambda:queue)

    # サーバの設定
    m = QueueManager(address=('192.168.1.1',8001),authkey=b'a')

    # サーバに接続
    m.connect()

    # サーバのQueueを取得する
    queue = m.get_queue()

    # Queueにホスト名を入れる
    queue.put( gethostname() )

リスト4を実行した後に、リスト5を実行します。リスト5が終了した後に、リスト4の実行にリスト5を実行したホストの名前が表示されれば成功です。

このように、簡単にリモートのプロセス間でのオブジェクトの共有を行うことができます。

おわりに

multiprocessingモジュールを使うと、簡単にプロセス間通信や分散処理などを実装することができます。今までスレッドを使っても思うように速度が出ないとか思っていた人は、挑戦してみる価値があるのではないでしょうか?

multiprocessingモジュールは非常に多くの機能を提供しています。multiprocessingが提供するこの他の機能についてはリファレンスをご覧いただければと思います。

おすすめ記事

記事・ニュース一覧