Python 中的多进程队列

Aditya Raj 2023年10月10日
  1. Python 多进程队列
  2. Python 多进程队列方法
  3. 使用具有多个进程的多进程队列
  4. 结论
Python 中的多进程队列

编程时,你可以并行运行两个或多个程序。但是,如果你需要在程序之间进行通信,这将成为一项繁琐的任务。

本文讨论了我们如何在 Python 中使用多进程队列在两个 Python 程序之间进行通信。

Python 多进程队列

Python 为我们提供了多处理模块来并行创建、运行和管理两个或多个 Python 程序。你可以使用以下导入语句将多处理模块导入你的程序。

import multiprocessing

导入模块后,使用 Queue() 方法创建一个多进程队列。multiprocessing.Queue() 方法返回一个多进程队列。

代码:

import multiprocessing as mp

myQueue = mp.Queue()
print("The multiprocessing Queue is:")
print(myQueue)

输出:

The multiprocessing Queue is:
<multiprocessing.queues.Queue object at 0x7fa48f038070>

你可以看到已在给定位置的内存中创建了一个 Python 多进程队列。创建 Python 多进程队列后,你可以使用它在两个或多个进程之间传递数据。

Python 多进程队列方法

有各种多进程队列方法,借助它们我们可以执行各种操作。

将元素插入 Python 多进程队列

我们可以使用 put() 方法将元素插入到多进程队列中。当在多进程队列上调用时,该方法将一个元素作为其输入参数并将该元素添加到队列中,并在执行后返回 None

代码:

import multiprocessing as mp

myQueue = mp.Queue()
return_value = myQueue.put(1)
print(return_value)

输出:

None

如果没有为 put() 方法提供输入参数,程序将运行到 TypeError 异常,如下所示。

代码:

import multiprocessing as mp

myQueue = mp.Queue()
return_value = myQueue.put()
print(return_value)

输出:

Traceback (most recent call last):
  File "/home/aditya1117/PycharmProjects/pythonProject/string12.py", line 4, in <module>
    return_value= myQueue.put()
TypeError: put() missing 1 required positional argument: 'obj'

在这里,我们没有为 put() 方法提供任何输入参数。因此,程序引发了 TypeError 异常,指出缺少所需的位置参数。

从 Python 多进程队列中提取元素

你可以使用 get() 方法从多进程队列中提取元素。get() 方法,当在多进程队列上调用时,它会在从队列中删除队列的前端元素后返回队列的前端元素。

代码:

import multiprocessing as mp

myQueue = mp.Queue()
myQueue.put(1)
myQueue.put(2)
myQueue.put(3)
myQueue.put(4)
myQueue.put(5)
return_value = myQueue.get()
print(return_value)

输出:

1

我们首先将五个元素排入多进程队列。之后,我们使用 get() 方法获得了一个元素。

观察 get() 方法返回值 1 插入到多进程队列中。这是因为队列遵循先进先出 (FIFO) 顺序来访问元素。

获取 Python 多进程队列的大小

我们可以使用 qsize() 方法获取多进程队列的大小。qsize() 方法返回 python 多进程队列的近似大小。

代码:

import multiprocessing as mp

myQueue = mp.Queue()
myQueue.put(1)
myQueue.put(2)
myQueue.put(3)
myQueue.put(4)
myQueue.put(5)
return_value = myQueue.qsize()
print("The size of multiprocessing queue is:")
print(return_value)

输出:

The size of multiprocessing queue is:
5

在上面的示例中,我们使用了术语近似大小而不是队列的大小。这是因为队列在多个进程之间共享。

因此,另一个进程可能会在我们获得它的大小之后将一个元素添加到队列中或从队列中删除一个元素。因此,qsize() 方法返回的大小是不可靠的。

检查多进程队列是否为空

empty() 方法检查多进程队列是否为空,如果队列为空,则该方法返回 True。否则,它返回 False

代码:

import multiprocessing as mp

myQueue = mp.Queue()
myQueue.put(1)
myQueue.put(2)
myQueue.put(3)
myQueue.put(4)
myQueue.put(5)
return_value = myQueue.empty()
print("The multiprocessing queue is empty:")
print(return_value)

输出:

The multiprocessing queue is empty:
False

多进程队列中有五个元素。因此,empty() 方法返回 False

关闭 Python 多进程队列

如果你不希望任何进程写入多进程队列,你可以使用 close() 方法关闭队列。close() 方法,当在任何进程中的多进程队列上调用时,会关闭队列。

在此之后,任何进程都不能将元素插入队列。现在让我们解决如何在 Python 程序中使用多进程队列。

使用具有多个进程的多进程队列

定义函数以创建流程

要在 Python 中的不同进程之间使用多进程队列,我们​​首先需要创建多个进程。我们将首先定义两个函数。

第一个函数将多进程队列作为输入参数。在执行时,它将从 11000 的正数添加到 Python 多进程队列。

def addPositive(queue):
    print("I am in addPositive.")
    for i in range(1, 1001):
        queue.put(i)

第二个函数还将多进程队列作为输入参数。但是,它将从 -1000-1 的负数添加到多进程队列中。

def addNegative(queue):
    print("I am in addNegative.")
    for i in range(-1000, 0):
        queue.put(i)

创建进程以将数据写入多进程队列

创建函数后,我们将使用这两个函数创建两个单独的进程。我们可以使用 Process() 方法来创建一个进程。

Process() 方法将一个函数作为分配给 target 参数的第一个输入参数。它还接受一个元组,该元组包含目标中提供的函数输入参数。

元组被分配给 Process() 方法的 args 参数。执行后,Process() 方法返回一个 Process 对象。

我们将创建一个将正数和负数添加到多进程队列的过程。

myQueue = mp.Queue()
process1 = mp.Process(target=addPositive, args=(myQueue,))
process2 = mp.Process(target=addNegative, args=(myQueue,))

启动将数据写入多进程队列的进程

创建流程后,我们可以使用 start() 方法开始执行流程。一旦进程被执行,这些数字将被写入多进程队列。

process1.start()
process2.start()

如果使用 terminate() 命令或由于异常突然终止任何进程,则多进程队列可能会损坏。之后,你将无法在任何进程中从队列中读取或写入队列。

因此,所有过程都必须顺利执行。

在主进程中等待子进程完成

我们在其中创建了其他进程的父进程可能会在子进程之前完成其执行。在这种情况下,僵尸进程被创建并始终存在于计算机的内存中。

为了避免这种情况,我们可以暂停父进程的执行,直到子进程完成它们的执行。我们可以使用 join() 方法让父进程等待子进程完成执行。

process1.join()
process2.join()

打印多进程队列的内容

我们可以使用 get() 方法、empty() 方法和 print() 函数打印多处理的内容。我们将使用 empty() 方法检查多进程队列是否为空。

如果队列不为空,我们将使用 get() 方法从队列中提取一个元素并打印结果。否则,我们将使用 close() 方法关闭多进程队列以完成程序的执行。

代码:

import multiprocessing as mp


def addPositive(queue):
    print("I am in addPositive.")
    for i in range(1, 100):
        queue.put(i)


def addNegative(queue):
    print("I am in addNegative.")
    for i in range(-100, 0):
        queue.put(i)


myQueue = mp.Queue()
process1 = mp.Process(target=addPositive, args=(myQueue,))
process2 = mp.Process(target=addNegative, args=(myQueue,))
process1.start()
process2.start()
process1.join()
process2.join()
while myQueue:
    print(myQueue.get(), end=",")
myQueue.close()

输出:

1,2,3,4,5,6,7,8,9,10,-1001,11,12,13,-1000,-999,-998,-997,-996,-995,-994,-993,-992,-991,-990,-989,-988,-987,-986,-985,-984,-983,-982,-981,14,-980,15,-979,16,17,18,19,20,21,22,23,24,25,26,-978,-977,-976,-975,-974,-973,-972,-971,-970...

代码将一直运行,直到队列为空。

观察队列随机包含正数和负数。这证明数据正在使用两个不同的进程以并行方式写入多进程队列。

结论

在本文中,我们讨论了 python 多进程队列。multiprocessing 模块提供高级函数来创建子进程。

我们建议使用多处理模块而不是 fork() 方法来创建子进程。你可以使用 Pipe 和 SimpleQueue 对象在进程之间共享数据。

你可以在本文档中阅读有关它们的更多信息。

作者: Aditya Raj
Aditya Raj avatar Aditya Raj avatar

Aditya Raj is a highly skilled technical professional with a background in IT and business, holding an Integrated B.Tech (IT) and MBA (IT) from the Indian Institute of Information Technology Allahabad. With a solid foundation in data analytics, programming languages (C, Java, Python), and software environments, Aditya has excelled in various roles. He has significant experience as a Technical Content Writer for Python on multiple platforms and has interned in data analytics at Apollo Clinics. His projects demonstrate a keen interest in cutting-edge technology and problem-solving, showcasing his proficiency in areas like data mining and software development. Aditya's achievements include securing a top position in a project demonstration competition and gaining certifications in Python, SQL, and digital marketing fundamentals.

GitHub