Pandas DataFrame でマルチプロセッシングを使用する
このチュートリアルでは、Python のマルチプロセッシングを紹介し、コード例とグラフィック表現を使用してそれについて説明します。 また、マルチプロセッシングの重要性を強調し、Pandas データフレームで multiprocessing
モジュールを使用する方法を示します。
マルチプロセッシングの紹介
マルチプロセッシングとは、同時に複数のプロセッサをサポートするシステムの能力を持つことを意味します。 マルチプロセッシングでは、アプリケーションは独立して実行できる、または独立性が低いルーチンに分割されます。
オペレーティング システムはこれらのスレッドを異なるプロセッサに割り当て、システムのパフォーマンスを向上させます。 どうやって? 次の簡単なプログラムで理解してみましょう。
コード例:
import time
start = time.perf_counter()
def print_something():
print("Sleeping for 1 second.")
time.sleep(1)
print("Done with sleeping.")
print_something()
finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")
出力:
Sleeping for 1 second.
Done with sleeping.
Completed in 1.01 second(s)
上記の出力は、コード フェンスで最初に time
モジュールをインポートしたため、正しく聞こえます。これを使用して、スクリプトの実行にかかる時間を測定します。 それを測定するために、start = time.perf_counter()
と finish = time.perf_counter()
を使用して start
と finish
の時間を計算しました。
print_something()
という名前の関数もあり、何かを出力し、1 秒間スリープし、別のステートメントを出力します。 この関数を呼び出し、最後に、スクリプトが完了したことを示す最後のステートメントを出力します。
ここで、print_something()
を 2 回実行すると、ほぼ 2 秒かかります。 以下で確認できます。
コード例:
import time
start = time.perf_counter()
def print_something():
print("Sleeping for 1 second.")
time.sleep(1)
print("Done with sleeping.")
print_something()
print_something()
finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")
出力:
Sleeping for 1 second.
Done with sleeping.
Sleeping for 1 second.
Done with sleeping.
Completed in 2.02 second(s)
現在、プログラムは 1 秒間 2 回スリープしているため、スクリプトが完了するまでに約 2 秒かかります。
したがって、この print_something()
関数を実行するたびに、スクリプトに約 1 秒追加されることがわかります。 スクリプトは 1 秒間スリープ状態で待機しているだけで、次の関数に移動して、さらに 1 秒間待機します。
そして、その時点で作業は完了し、スクリプトは終了します。 次の図で理解できます。
このグラフィカルな表現は、関数 (この場合は print_something()
) を実行し、1 秒待ってから関数を再度実行し、さらに 1 秒待ってから、最終的な print
を出力することを示しています。 私たちが終わったことを示す声明。
上の図に示されているように、この順序でスクリプトを実行することを同期実行と呼びます。
ここで、同期的に実行する必要のないタスクがある場合、multiprocessing
モジュールを使用してこれらのタスクを他の CPU に分割し、同時に実行できます。
multithreading
は multiprocessing
と同じではないことに注意してください。 ここで違いを見つけることができます。
multiprocessing
モジュールを使用する場合、グラフィック表示は次のようになります。
ここでは、まだ 2つのタスクがありますが、これを 2つのプロセスに分割し、異なるプロセスで同時に実行します。 それでは、このグラフィカルな表現を次のようにプログラムに実装しましょう。
import multiprocessing
import time
start = time.perf_counter()
def print_something():
print("Sleeping for 1 second.")
time.sleep(1)
print("Done with sleeping.")
process1 = multiprocessing.Process(target=print_something)
process2 = multiprocessing.Process(target=print_something)
process1.start()
process2.start()
process1.join()
process2.join()
finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")
出力:
Sleeping for 1 second.
Done with sleeping.
Sleeping for 1 second.
Done with sleeping.
Completed in 1.02 second(s)
ご覧のとおり、スクリプトの所要時間は 2 秒ではなく 1 秒です。 上記のスクリプトは、いくつかの変更を除いて前のスクリプトと似ています。
プロセスの作成に使用する multiprocessing
モジュールをインポートしました。 次に、multiprocessing.Process(target=print_something)
を使用して 2つのプロセスを作成し、process1
と process2
に保存します。
その後、process1.start()
と process2.start()
を使用してこれらのプロセスの両方を開始します。 .join()
メソッドを使用して、プロセスを終了する前に残りのスクリプトを実行しないようにしました。
.join()
メソッドを省略すると、プロセスを終了する前に次の 2つのステートメントが実行されることを意味します。
finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")
次のサンプルコードを使用して学習できます。
import multiprocessing
import time
start = time.perf_counter()
def print_something():
print("Sleeping for 1 second.")
time.sleep(1)
print("Done with sleeping.")
process1 = multiprocessing.Process(target=print_something)
process2 = multiprocessing.Process(target=print_something)
process1.start()
process2.start()
# process1.join()
# process2.join()
finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")
出力:
Completed in 0.01 second(s)
Sleeping for 1 second.
Done with sleeping.
Sleeping for 1 second.
Done with sleeping.
ご覧のとおり、Sleeping
ステートメントと Done
ステートメントの前に Completed
ステートメントを出力します。 それを避けるために、.join()
メソッドを使用できます。
マルチプロセッシングの重要性
プロセッサが 1つあるマシン (PC/ラップトップ) があるとします。 複数のプロセスを同時に割り当てると、すべてのタスクを妨害または中断し、すべてのプロセスを実行し続けるために、あるタスクから別のタスクに切り替える必要があります。
ここで、マルチプロセッシングの概念が登場します。 マルチプロセッシング コンピューターには、複数の中央処理装置 (マルチプロセッサー) を搭載することも、コアと呼ばれる 2つ以上の独立した実際の処理ユニット (マルチコア プロセッサー) を備えた 1つのコンピューティング コンポーネントを搭載することもできます。
ここでは、CPU は複数のタスクを同時に簡単に実行でき、各タスクはそのプロセスを使用します。 このようにして、プログラムを高速化し、多くの時間とコストを節約します。
Pandas DataFrame でマルチプロセッシングを使用する
multiprocessing
モジュール、その基本的な使用法、およびその重要性について十分な知識があります。 このモジュールをデータ フレームで使用する方法を学びましょう。
-
ライブラリとモジュールをインポートします。
まず、必要なモジュールとライブラリをすべてインポートします。
import pandas as pd from geopy.distance import geodesic from itertools import combinations import multiprocessing as mp
-
データ フレームを作成します。
serial_number
、column_name
、lat
、およびlon
列とその値を含むデータ フレームを作成します。df = pd.DataFrame( { "serial_number": [1, 2, 3, 4, 5, 6, 7, 8, 9, 0], "column_name": ["aa", "aa", "aa", "bb", "bb", "bb", "bb", "cc", "cc", "cc"], "lat": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], "lon": [21, 22, 23, 24, 25, 26, 27, 28, 29, 30], } )
-
タスクを分割します。
タスクをプロセス間で分割し、おそらくすべてのタプル
(grp, lst)
を別のプロセスに送信します。 次のコード行は同じことを行います。grp_lst_args = list(df.groupby("column_name").groups.items()) print(grp_lst_args) # [('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]
その後、各タプルを別のプロセスで関数 (この場合は
calc_dist()
) に引数として送信します。 以下でそれを見てみましょう。 -
タプルのリストを関数に送信します。
calc_dist()
はlist
型のパラメーターを受け取り、距離を計算し、それをデータ フレームとして返します。pd.DataFrame()
を使用してデータ フレームに変換することに注意してください。ここで、リストには 3つのタプル
[('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]
前に作成したもの。def calc_dist(arg): grp, lst = arg return pd.DataFrame( [ [ grp, df.loc[c[0]].serial_number, df.loc[c[1]].serial_number, geodesic(df.loc[c[0], ["lat", "lon"]], df.loc[c[1], ["lat", "lon"]]), ] for c in combinations(lst, 2) ], columns=["column_name", "machine_A", "machine_B", "Distance"], )
-
マルチプロセッシングを実装します。
この時点で、マルチプロセッシングを使用して、各タプルに対して
calc_dist()
関数を同時に呼び出します。 次のコードでは、タプルごとにcalc_dist()
関数を並列実行するためにPool()
を使用します。pool = mp.Pool(processes=(mp.cpu_count() - 1)) results = pool.map(calc_dist, grp_lst_args) pool.close() pool.join() results_df = pd.concat(results)
完全なソース コードを以下に示します。
import pandas as pd
from geopy.distance import geodesic
from itertools import combinations
import multiprocessing as mp
df = pd.DataFrame(
{
"serial_number": [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
"column_name": ["aa", "aa", "aa", "bb", "bb", "bb", "bb", "cc", "cc", "cc"],
"lat": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
"lon": [21, 22, 23, 24, 25, 26, 27, 28, 29, 30],
}
)
grp_lst_args = list(df.groupby("column_name").groups.items())
print(grp_lst_args)
# [('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]
def calc_dist(arg):
grp, lst = arg
return pd.DataFrame(
[
[
grp,
df.loc[c[0]].serial_number,
df.loc[c[1]].serial_number,
geodesic(df.loc[c[0], ["lat", "lon"]], df.loc[c[1], ["lat", "lon"]]),
]
for c in combinations(lst, 2)
],
columns=["column_name", "machine_A", "machine_B", "Distance"],
)
pool = mp.Pool(processes=(mp.cpu_count() - 1))
results = pool.map(calc_dist, grp_lst_args)
pool.close()
pool.join()
results_df = pd.concat(results)
results_df
出力:
| | column_name | machine_A | machine_B | Distance |
| ---- | ----------- | --------- | --------- | --------------------- |
| 0 | aa | 1 | 2 | 156.87614940188664 km |
| 1 | aa | 1 | 3 | 313.70544546930296 km |
| 2 | aa | 2 | 3 | 156.82932911607335 km |
| 0 | bb | 4 | 5 | 156.66564184752647 km |
| 1 | bb | 4 | 6 | 313.21433304645853 km |
| 2 | bb | 4 | 7 | 469.6225353706956 km |
| 3 | bb | 5 | 6 | 156.54889742502786 km |
| 4 | bb | 5 | 7 | 312.95759748703733 km |
| 5 | bb | 6 | 7 | 156.40899678081678 km |
| 0 | cc | 8 | 9 | 156.0601654009819 km |
| 1 | cc | 8 | 0 | 311.91099818906036 km |
| 2 | cc | 9 | 0 | 155.85149814424545 km |