Pythonにおける並行・並列処理について調べてみた

f:id:okiyasi:20200922031123p:plain 開発をしていると要求されている処理時間より時間がかかってしまうことがあり、処理を高速化しないといけない場面に遭遇すると思います。

その場合は、まずボトルネックとなっている処理を探し、ボトルネックとなっている場所のアルゴリズムやデータ構造の改善やライブラリの使用による処理の代替を行います。

しかし、それだけでは高速化できななかったり、もっと高速化しないと要件を満たせないことがあります。その時に一つの高速化の手段として並列処理や並行処理があります。 今回はその並列処理・並行処理について解説していきたいと思います。

並行処理と並列処理とは

まずは、並行処理と並列処理の定義です。

並行処理と並列処理は似たような言葉ですが違いがあります。細かな定義は人によってまちまちですが、以下のような違いがあります。

並行処理

システムが複数の動作を同時に実行状態に保てる状態にあること
わかりにくいので端的に言うと、2 つ以上のタスクが存在する時に、複数のタスク処理を高速に切り替えてあたかも同時に処理されているかのように見せることです。

並列処理

2 つ以上のタスクが存在する時、それらを実際に同時に処理すること。


これらの並列処理や並行処理を利用して、プログラムを高速化しますが、ここでボトルネックなっている処理の種類が重要となってきます。

ボトルネックとなる処理の種類

ボトルネックとなる処理は主に以下の2つの種類に分けることができます。

CPUバウンドな処理

数値計算のようにCPUに負荷をかけるようなような処理のことで、処理速度がCPUの計算速度に依存する処理です。

I/Oバウンドな処理

ファイルの読み書き、ネットワーク通信、DBへの接続などの処理のことで、ディスクの読み出しなど、主にCPUとは関係ないI/O部分に負荷がかかる処理のことです。


CPUバウンドな処理は並列処理により、複数のタスクを同時に処理することによって高速化することができます。(※並行処理では高速化できません。)

一方で、I/Oバウンドな処理はCPUはディスクの処理(I/O処理)が終わるまで待ち状態になります。そこでI/Oバウンドな処理では、並行処理により、その待ち時間の間にCPUが他のタスクをこなすことで高速化を実現します。(もちろん並列処理でも高速化できます。)

並列処理・並行処理を行う際には複数のタスク処理を実行する・実行できる状態にすることが必要になりますが、このタスク処理を行ってくれるものについて理解する必要があります。それがプロセススレッドです。

プロセスとスレッド

プロセス

プロセスとは、OSが実行しているプログラム(のインスタンス)のことです。

スレッド

スレッドとは、プロセス内の「実行単位」のことで、このスレッドを使用して、CPUのコアに命令を与えて計算処理を行っています。


おそらく、ざっくり過ぎてなんのことを言っているのかわからないかもしれないので、詳しくは こちらの記事を読んでいただけると幸いです。

このスレッドやプロセスを複数用意し、並列的に同時に動かすことによって、並列処理を実現し高速化します。

GoやJavaなどの言語ではメモリの節約の観点などから、プロセスよりも、スレッドを複数用意して並列処理を行うことで高速化を実現するのですが、Pythonの場合は少し勝手が違います。

Pythonにおけるマルチスレッド

マルチスレッドで処理を行う場合は基本的にスレッドセーフである必要があります。スレッドセーフとは複数のスレッドを並列的に使用しても問題が発生しないことを意味します。

具体的にはスレッドが複数存在していても共有しているデータに対してが一度に1つのスレッドのみがアクセスするようにしておくことで、処理中に他のスレッドにデータを上書きされたりするのを防ぎます。

しかし、Python(正確にはPythonの中でC言語で実装されている部分CPython)はスレッドセーフではありません

そこでPythonGIL(Global Interpreter lock)という排他ロックを使用することによってこの問題を回避しています。GILにより、Pythonインタプリタのプロセスは1スレッドしか実行できません。

つまり1つのプロセスに複数スレッドが存在してもロックを持つ単一スレッドでしかコードが実行できずに、その他のスレッドは待機状態になります。

そのためPythonにおけるマルチスレッドでは並列処理ではなく並行処理になってしまうため、CPUバウンドな処理ではマルチスレッドでは高速化が期待できません。むしろロックの切り替えのために遅くなる場合があります。

CPUバウンドな処理での速度の比較

本当にCPUバウンドな処理でマルチスレッドで処理を行っても速度が変わらないのか検証するために、大きな数値に対してfizz_buzzを行うCPUバウンドな処理のサンプルプログラムを用意します。 リストの中の5つの大きな値について1つずつ逐次的に、1からその数値までfizz_buzzしていき、最後にどのくらいの時間がかかったのかを出力しています。

逐次実行のサンプルプログラム

用意した5つ大きな数字に対してfizz_buzzを逐次的に行っています。

import time

def fizz_buzz(num: int):
    result_list = []
    for i in range(1, num + 1):
        result = ''
        if i % 3 == 0:
            result += 'fizz'
        if i % 5 == 0:
            result += 'buzz'
        if not result:
            result = str(i)
        result_list.append(result)
    return result_list


start = time.time()
num_list = [22000000, 19000000, 25000000, 24500000, 21300000]
for n in num_list:
    fizz_buzz(n)
stop = time.time()
print(f'Sequential processing: {stop - start:.3f} seconds')

実行結果
f:id:okiyasi:20200922005229p:plain

マルチスレッドによる実装

マルチスレッドを実現するための主にthread・threading・concurrent.futuresという3つのPythonのモジュールがあります。 基本的にはPython3ではthreadingかconcurrent.futuresを使います。今回は単純にマルチスレッドを実現したいだけなので一旦threadingを使用します。

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, num):
        super().__init__()
        self.__num = num

    def fizz_buzz(self, num: int):
        result_list = []
        for i in range(1, num + 1):
            result = ''
            if i % 3 == 0:
                result += 'fizz'
            if i % 5 == 0:
                result += 'buzz'
            if not result:
                result = str(i)
            result_list.append(result)
        return result_list

    def run(self):
        self.fizz_buzz(self.__num)


start = time.time()
threads = []
num_list = [22000000, 19000000, 25000000, 24500000, 21300000]
for n in num_list:
    thread = MyThread(n)
    thread.start()
    threads.append(thread)
for th in threads:
    th.join()
stop = time.time()
print(f'multi threads: {stop - start:.3f} seconds')

実行結果
f:id:okiyasi:20200922005357p:plain

全然早くなっていません。 むしろ、逐次的に実行した時よりも少しマルチスレッドで実装した場合の方がロックの切り替え分、2秒程度時間がかかっているのがわかります。

マルチプロセスによる実装

マルチプロセスにはGIL制約などは存在しないので、並行処理ではなく並列処理になります。 マルチプロセスにはmultiprocessingと先ほどスレッドでも登場したconcurrent.futuresのどちらかのモジュールを使用しますが、今回はmultiprocessingを使用します。

from multiprocessing import Process
import time

class MyProcessor(Process):

    def __init__(self, num):
        super().__init__()
        self.__num = num

    def fizz_buzz(self, num: int):
        result_list = []
        for i in range(1, num + 1):
            result = ''
            if i % 3 == 0:
                result += 'fizz'
            if i % 5 == 0:
                result += 'buzz'
            if not result:
                result = str(i)
            result_list.append(result)
        return result_list

    def run(self):
        self.fizz_buzz(self.__num)


start = time.time()
processes = []
num_list = [22000000, 19000000, 25000000, 24500000, 21300000]
for n in num_list:
    process = MyProcessor(n)
    process.start()
    processes.append(process)
for p in processes:
    p.join()
stop = time.time()
print(f'multi process: {stop - start:.3f} seconds')

実行結果
f:id:okiyasi:20200922005430p:plain

スレッドや逐次的に処理した場合よりもかなり早くなっていることがわかります。 CPUバウンドな処理に関してはマルチプロセスを使用することで高速化できることがわかりました。

I/Oバウンドな処理の速度比較

今度はI/Oバウンドな処理に対して速度を比較していきます。

逐次実行のサンプルプログラム

サンプルプログラムとしてwebページからコンテンツをダウンロードするプログラムを用意します。

import requests
import time

def download_site(url, session):
    with session.get(url) as response:
        count_list = []
        for i in range(len(response.content)):
            count_list.append(i)


def download_all_sites(sites):
    with requests.Session() as session:
        for url in sites:
            download_site(url, session)


sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
] * 80
start = time.time()
download_all_sites(sites)
stop = time.time()
print(f"Sequential processing: {stop - start:.3f} seconds")

実行結果
f:id:okiyasi:20200922023123p:plain

マルチスレッドによる実装

前回はCPUバウンドの時はthreadingを使用しましたが、今回はconcurrent.futuresを使用します。(そこまで変わらないと思いますがconcurrent.futuresの方がメジャーらしいです) ThreadPoolというスレッド数を一定に保ちながらスレッドを使い回してくれる機能を使います。スレッド数は5を指定しています。

import threading
import concurrent.futures
import requests
import time

thread_local = threading.local()


def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session


def download_site(url):
    session = get_session()
    with session.get(url) as response:
        len(response.content)


def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_site, sites)


sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
] * 80
start = time.time()
download_all_sites(sites)
stop = time.time()
print(f"multi threads: {stop - start:.3f} seconds")

実行結果
f:id:okiyasi:20200922023106p:plain

今回はCPUバウンドの処理とは違い、格段に処理が高速化されています。スレッド数分だけ約5倍程度早くなっています。

これはI/O処理自体はGILの制約を受けないので処理が並列化されていて、尚且つCPU処理も並行化によってI/O待ち時間に次のCPU処理が行われているため早くなっています。(主にI/O処理の並列化が寄与しています)

マルチプロセスによる実装

マルチスレッド時のスレッド数と揃えるためにプロセスの数を5にしています。デフォルトではos.cup_count()の数だけProcessが起動されます。

import requests
import time
import multiprocessing

session = None

def set_global_session():
    global session
    if not session:
        session = requests.Session()


def download_site(url):
    with session.get(url) as response:
        multiprocessing.current_process().name
        len(response.content)


def download_all_sites(sites):
    with multiprocessing.Pool(processes=5, initializer=set_global_session) as pool:
        pool.map(download_site, sites)


sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
] * 80
start = time.time()
download_all_sites(sites)
stop = time.time()
print(f"multi process: {stop - start:.3f} seconds")

実行結果
f:id:okiyasi:20200922030125p:plain

マルチスレッドの時よりも速度がさらに早いという結果になりました。 これは単純にプロセスが増えたことによってCPU処理もI/O処理も両方とも並列化されるために、マルチスレッドの時よりCPU処理時間分だけ早くなったと予想されます。(スレッド数とプロセス数を同じにして比較することがナンセンス感ありますが)

マルチプロセス・マルチスレッドの問題点

今回はPythonにおける並列処理・並行処理を実現するマルチプロセス・マルチスレッドについてみていきました。

しかしマルチスレッドやマルチプロセスについても問題点があります。

それは、プロセス数やスレッド数が増えすぎると、メモリを食いつぶしたり、コンテキストスイッチするコストが増大してサーバがパンクしてしまうという問題です。(いわゆるC10K問題)

そのため、マルチスレッドやマルチプロセスを使わずに、シングルスレッドでも多くの処理を捌く必要が出てきました。

そこで登場してきたのが非同期処理です。非同期処理とはタスクを止めず(ブロックせず)に、別のタスクを実行する手法のことで、非同期処理によってより多くのI/O処理を捌くことが可能になりました。Pythonにおいてはasyncioが非同期処理モジュールとして有名です。

今回は非同期処理については書きませんが、近々、非同期処理についての記事を書こうと思います。

参考

実践Python
【図解】CPUのコアとスレッドとプロセスの違い・関係性、同時マルチスレッディング、コンテキストスイッチについて
Speed Up Your Python Program With Concurrency
Pythonをとりまく並行/非同期の話