Usar multiprocesamiento en un marco de datos de Pandas
- Introducción al multiprocesamiento
- Importancia de usar multiprocesamiento
- Usar multiprocesamiento en un marco de datos de Pandas
Este tutorial presenta el multiprocesamiento en Python y lo educa mediante ejemplos de código y representaciones gráficas. También destaca la importancia del multiprocesamiento y demuestra cómo usar el módulo de multiprocesamiento
con un dataframe de Pandas.
Introducción al multiprocesamiento
Multiprocesamiento significa tener la capacidad del sistema para soportar múltiples procesadores al mismo tiempo. En el multiprocesamiento, las aplicaciones se dividen en rutinas más pequeñas que pueden ejecutarse de forma independiente o menos independiente.
El sistema operativo asignará estos hilos a diferentes procesadores, mejorando el rendimiento del sistema. ¿Cómo? Entendámoslo a través del siguiente programa simple.
Código de ejemplo:
import time
start = time.perf_counter()
def print_something():
print("Sleeping for 1 second.")
time.sleep(1)
print("Done with sleeping.")
print_something()
finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")
Producción :
Sleeping for 1 second.
Done with sleeping.
Completed in 1.01 second(s)
El resultado anterior suena bien porque, en la cerca de código, primero importamos el módulo tiempo
, que usaremos para medir cuánto tiempo tarda en ejecutarse el script. Para medir eso, calculamos el tiempo de inicio
y fin
usando start = time.perf_counter()
y finish = time.perf_counter()
.
También tenemos una función llamada print_something()
que imprime algo, duerme por un segundo e imprime otra declaración. Llamamos a esta función y finalmente imprimimos la última declaración que muestra que hemos completado el script.
Ahora, si ejecutamos print_something()
dos veces, tardará casi dos segundos. Puede verificar eso a continuación.
Código de ejemplo:
import time
start = time.perf_counter()
def print_something():
print("Sleeping for 1 second.")
time.sleep(1)
print("Done with sleeping.")
print_something()
print_something()
finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")
Producción :
Sleeping for 1 second.
Done with sleeping.
Sleeping for 1 second.
Done with sleeping.
Completed in 2.02 second(s)
Ahora, el programa está dormido por un segundo dos veces, por lo que lleva casi dos segundos terminar el script.
Entonces, podemos ver que cada vez que ejecutamos esta función print_something()
, agrega aproximadamente un segundo al script. Nuestro script solo espera durmiendo por un segundo, y luego, pasa a la siguiente función y se sienta a esperar otro segundo.
Luego, en ese punto, terminamos y nuestro script finaliza. Podemos entenderlo a través de la siguiente representación gráfica.
Esta representación gráfica muestra que ejecutamos una función (es print_something()
en nuestro caso), esperamos un segundo, ejecutamos la función nuevamente, luego esperamos otro segundo, y una vez hecho esto, imprime el print
final. declaración que demuestra que hemos terminado.
Como se presenta en la representación gráfica anterior, la ejecución del script en este orden se denomina ejecución sincrónica.
Ahora, si tenemos alguna tarea que no necesita ejecutarse sincrónicamente, entonces podemos usar el módulo de multiprocesamiento
para dividir estas tareas en las otras CPU y ejecutarlas simultáneamente.
Recuerde, multihilo
no es lo mismo que multiprocesamiento
. Puede encontrar la diferencia aquí.
La representación gráfica se verá de la siguiente manera si se supone que vamos a utilizar el módulo de multiprocesamiento
.
Aquí, todavía tenemos dos tareas, pero las dividimos en dos procesos que se ejecutan simultáneamente en diferentes procesos. Ahora, implementemos esta representación gráfica en nuestro programa de la siguiente manera:
import multiprocessing
import time
start = time.perf_counter()
def print_something():
print("Sleeping for 1 second.")
time.sleep(1)
print("Done with sleeping.")
process1 = multiprocessing.Process(target=print_something)
process2 = multiprocessing.Process(target=print_something)
process1.start()
process2.start()
process1.join()
process2.join()
finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")
Producción :
Sleeping for 1 second.
Done with sleeping.
Sleeping for 1 second.
Done with sleeping.
Completed in 1.02 second(s)
Como podemos ver ahora, el script tarda un segundo en lugar de dos. El guión anterior es similar al anterior excepto por algunas modificaciones.
Importamos el módulo multiprocesamiento
, que usaremos para crear procesos. Luego, hacemos dos procesos usando multiprocessing.Process(target=print_something)
y los guardamos en process1
y process2
.
Después de eso, comenzamos ambos procesos usando process1.start()
y process2.start()
. Usamos el método .join()
para evitar ejecutar el resto de nuestro script antes de terminar los procesos.
Significa que si omitimos el método .join()
, ejecutará las siguientes dos declaraciones antes de finalizar los procesos:
finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")
Podemos aprenderlo usando el siguiente código de ejemplo:
import multiprocessing
import time
start = time.perf_counter()
def print_something():
print("Sleeping for 1 second.")
time.sleep(1)
print("Done with sleeping.")
process1 = multiprocessing.Process(target=print_something)
process2 = multiprocessing.Process(target=print_something)
process1.start()
process2.start()
# process1.join()
# process2.join()
finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")
Producción :
Completed in 0.01 second(s)
Sleeping for 1 second.
Done with sleeping.
Sleeping for 1 second.
Done with sleeping.
Como podemos ver, imprime la sentencia Completed
antes de las sentencias Sleeping
y Done
. Para evitar eso, podemos usar el método .join()
.
Importancia de usar multiprocesamiento
Supongamos que tenemos una máquina (PC/Laptop) con un procesador. Si le asignamos varios procesos simultáneamente, tendrá que molestar o interrumpir cada tarea y cambiar de una a otra para seguir ejecutando todos los procesos.
Aquí es donde entra en escena el concepto de multiprocesamiento. Una computadora multiprocesador puede tener varios procesadores centrales (multiprocesador) o un componente informático con dos o más unidades de procesamiento reales independientes conocidas como núcleos (procesadores multinúcleo).
Aquí, la CPU ejecutará fácilmente múltiples tareas simultáneamente, donde cada tarea usará su proceso. De esta manera, aceleraremos nuestro programa y ahorraremos mucho tiempo y dinero.
Usar multiprocesamiento en un marco de datos de Pandas
Tenemos suficiente conocimiento sobre el módulo de multiprocesamiento
, su uso básico y su importancia. Aprendamos cómo podemos usar este módulo con marcos de datos.
-
Importe las bibliotecas y los módulos.
Primero, importe todos los módulos y bibliotecas necesarios.
import pandas as pd from geopy.distance import geodesic from itertools import combinations import multiprocessing as mp
-
Cree un marco de datos.
Creamos un marco de datos que contiene las columnas
serial_number
,column_name
,lat
ylon
y sus valores.df = pd.DataFrame( { "serial_number": [1, 2, 3, 4, 5, 6, 7, 8, 9, 0], "column_name": ["aa", "aa", "aa", "bb", "bb", "bb", "bb", "cc", "cc", "cc"], "lat": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], "lon": [21, 22, 23, 24, 25, 26, 27, 28, 29, 30], } )
-
Divida las tareas.
Dividimos la tarea entre procesos, probablemente enviando cada tupla
(grp, lst)
a un proceso separado. Las siguientes líneas de código hacen lo mismo:grp_lst_args = list(df.groupby("column_name").groups.items()) print(grp_lst_args) # [('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]
Después de eso, enviamos cada tupla como argumento a una función (
calc_dist()
en este caso) en un proceso separado. Veamos eso a continuación. -
Envía la lista de tuplas a una función.
El
calc_dist()
toma un parámetro de tipolista
para calcular la distancia y lo devuelve como un marco de datos. Tenga en cuenta quepd.DataFrame()
se utiliza para convertir a un marco de datos.Aquí, la lista contiene tres tuplas
[('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]
que creamos en el anterior.def calc_dist(arg): grp, lst = arg return pd.DataFrame( [ [ grp, df.loc[c[0]].serial_number, df.loc[c[1]].serial_number, geodesic(df.loc[c[0], ["lat", "lon"]], df.loc[c[1], ["lat", "lon"]]), ] for c in combinations(lst, 2) ], columns=["column_name", "machine_A", "machine_B", "Distance"], )
-
Implementar multiprocesamiento.
En este punto, usaremos el multiprocesamiento para llamar simultáneamente a la función
calc_dist()
para cada tupla. En el siguiente código, usamosPool()
para la ejecución paralela de la funcióncalc_dist()
para cada tupla.pool = mp.Pool(processes=(mp.cpu_count() - 1)) results = pool.map(calc_dist, grp_lst_args) pool.close() pool.join() results_df = pd.concat(results)
El código fuente completo se proporciona a continuación:
import pandas as pd
from geopy.distance import geodesic
from itertools import combinations
import multiprocessing as mp
df = pd.DataFrame(
{
"serial_number": [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
"column_name": ["aa", "aa", "aa", "bb", "bb", "bb", "bb", "cc", "cc", "cc"],
"lat": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
"lon": [21, 22, 23, 24, 25, 26, 27, 28, 29, 30],
}
)
grp_lst_args = list(df.groupby("column_name").groups.items())
print(grp_lst_args)
# [('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]
def calc_dist(arg):
grp, lst = arg
return pd.DataFrame(
[
[
grp,
df.loc[c[0]].serial_number,
df.loc[c[1]].serial_number,
geodesic(df.loc[c[0], ["lat", "lon"]], df.loc[c[1], ["lat", "lon"]]),
]
for c in combinations(lst, 2)
],
columns=["column_name", "machine_A", "machine_B", "Distance"],
)
pool = mp.Pool(processes=(mp.cpu_count() - 1))
results = pool.map(calc_dist, grp_lst_args)
pool.close()
pool.join()
results_df = pd.concat(results)
results_df
Producción :
| | column_name | machine_A | machine_B | Distance |
| ---- | ----------- | --------- | --------- | --------------------- |
| 0 | aa | 1 | 2 | 156.87614940188664 km |
| 1 | aa | 1 | 3 | 313.70544546930296 km |
| 2 | aa | 2 | 3 | 156.82932911607335 km |
| 0 | bb | 4 | 5 | 156.66564184752647 km |
| 1 | bb | 4 | 6 | 313.21433304645853 km |
| 2 | bb | 4 | 7 | 469.6225353706956 km |
| 3 | bb | 5 | 6 | 156.54889742502786 km |
| 4 | bb | 5 | 7 | 312.95759748703733 km |
| 5 | bb | 6 | 7 | 156.40899678081678 km |
| 0 | cc | 8 | 9 | 156.0601654009819 km |
| 1 | cc | 8 | 0 | 311.91099818906036 km |
| 2 | cc | 9 | 0 | 155.85149814424545 km |
Artículo relacionado - Pandas DataFrame
- Cómo obtener las cabeceras de columna de Pandas DataFrame como una lista
- Cómo borrar la columna de Pandas DataFrame
- Cómo convertir la columna del DataFrame a Datetime en Pandas
- Cómo convertir un float en un entero en Pandas DataFrame
- Cómo clasificar Pandas DataFrame por los valores de una columna
- Cómo obtener el agregado de Pandas grupo por y suma