# 进程
# import multiprocessing
:Python 的标准库,用于创建进程。
# Process()
class Process(group=None, target=None, name=None, args:Iterable, kwargs:Mapping, daemon:bool)
功能:
- 输入一些参数,定义一个进程,返回一个 Process 对象。
参数:
- group 表示进程组。
- target 表示在进程中运行的入口函数。
- name 表示进程的名称。默认按
Process-<int>
的格式命名。 - args 表示传递元组参数给 target 函数。
- kwargs 表示传递字典参数给 target 函数。
- daemon 表示该进程,是否在后台运行。默认继承当前进程的 daemon 参数。
例:
>>> import multiprocessing >>> p = multiprocessing.Process(target=print, args=('hello',)) >>> p <Process name='Process-1' parent=14036 initial> # 此时进程处于 initial 状态,也就是已经初始化,尚未启动 >>> p.start() # 启动进程 hello # 子进程默认在前台运行,将 stdout、stderr 输出到当前终端 >>> p.start() # 每个进程只允许调用一次 start() 方法 AssertionError: cannot start a process twice
进程的属性:
>>> p.name # 进程的名称 'Process-1' >>> p.pid # 进程的 PID 。调用 start() 之后,操作系统才会创建进程,分配 PID 15176 >>> p.daemon # 进程是否为 daemon 类型 False >>> p.exitcode # 如果进程已终止,则可以查看其退出码 0
进程的方法:
>>> p.is_alive() # 判断进程是否正在运行 False >>> p.terminate() # 强制终止进程。如果该进程已终止,则也返回 None
- 终止一个进程时,它的子进程不会被自动杀死,而是成为孤儿进程。
查看所有进程:
>>> multiprocessing.current_process() # 返回当前进程,用 Process 对象表示 <_MainProcess name='MainProcess' parent=None started> >>> multiprocessing.active_children() # 返回当前进程的,所有正在运行的子进程 []
# Pool()
class Pool(processes=None)
功能:创建进程池,用于限制同时运行的进程数量。
- 进程池中,最多容纳 processes 个进程。如果不指定 processes ,则 processes 默认等于本机的 CPU 核数。
- 当进程池满了时,新建的进程会被挂起。等待进程池有空位时,才开始运行。
例:
>>> pool = multiprocessing.Pool(2) # 创建一个 pool ,容量为 2 >>> for i in range(5): ... _ = pool.apply_async(print, (i,)) # 往 pool 中添加进程 ... 0 1 2 3 4 >>> pool.close() # 关闭 pool ,此后不能再添加进程 >>> pool.join() # 阻塞当前进程,等 pool 里的进程运行结束。这必须先调用 close() 方法
为了避免用户忘记调用 close() ,建议通过 with 关键字访问 pool :
with multiprocessing.Pool(2) as pool: ...
def apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
- 功能:往 pool 中异步地添加进程。
- 调用该函数,会立即返回。不会等子进程运行结束,不会阻塞当前进程。
# 阻塞
阻塞当前进程:
>>> p.join() # 阻塞当前进程,等到指定进程终止,才继续运行当前进程 >>> p.join(timeout=3) # 最多阻塞当前进程 3 秒。等到指定进程终止,或者 3 秒之后,才继续运行当前进程
threading 模块提供了 Lock、RLock 类,用于在多线程之间创建锁。multiprocessing 模块也提供了 Lock、RLock 类,用于在多进程之间创建锁。
threading 模块提供了 Condition 类。multiprocessing 模块也提供了 Condition 类,用法相似。
# 通信
如何在多个进程之间通信?
不能使用全局变量。因为每个进程,独享一份全局变量。
>>> def fun1(configs): ... configs['num'] += 1 ... print(configs) ... >>> configs = {'num': 0} >>> for _ in range(3): ... multiprocessing.Process(target=fun1, args=(configs,)).start() ... {'num': 1} # 可见,各个进程的 num 没有累加 {'num': 1} {'num': 1}
使用
multiprocessing.Value
创建被多个进程共享的变量。>>> def fun1(num): ... num.value += 1 ... print(num.value) ... >>> num = multiprocessing.Value('i', 0) # 创建一个共享变量,类型为 'i' ,取值为 0 >>> num <Synchronized wrapper for c_int(0)> >>> for _ in range(3): ... multiprocessing.Process(target=fun1, args=(num,)).start() ... 1 2 3
- Value 是基于 Ctypes 模块,创建一个变量,存储在进程的共享内存中。
- Value 变量只能是 int、char 等基础数据类型,参考 array (opens new window) 模块。
- 通过
num.value += 1
的形式修改共享变量时,在读取操作、赋值操作的间隙,该变量可能被其它进程修改。因此,建议加锁:with num.get_lock(): num.value += 1
使用
multiprocessing.Queue
进行进程之间的通信。用法类似于 queue 模块的 Queue 类。>>> queue = multiprocessing.Queue() >>> queue.put('hello') >>> queue.get(timeout=1) 'hello'
使用
multiprocessing.Pipe
创建一个通信管道。一个进程往管道中写入数据,其它进程从管道中读取数据。>>> pipe_left, pipe_right = multiprocessing.Pipe() # 创建一个管道,返回管道的两端 >>> pipe_left.send(1) # 在管道的一端,写入一个元素 >>> pipe_left.send('hello') >>> pipe_right.recv() # 在管道的另一端,读取一个元素。读取这些元素时,按照先入先出的顺序 1 >>> pipe_right.recv() 'hello'
- 一个 pipe 有两个端口,全双工工作,任一端口既可以读数据,也可以写数据。就像网线。
- 调用 recv() 时,只会读取从另一个端口发送的数据,不会读取当前端口发送的数据。
- 调用 recv() 时,如果另一个端口尚未发送数据,则会阻塞当前线程,直到收到数据。
使用
multiprocessing.managers
可以让不同主机的 Python 进程之间进行通信。- 需要运行一个 Manager 服务器进程,监听一个 TCP 端口。然后让各个主机的 Python 进程连接到 Manager 服务器。
# import subprocess
:Python 的标准库,用于创建一个子进程去执行终端命令。
- 官方文档 (opens new window)
- 与
os.system()
、os.popen()
的用途相似,但功能更强。
# Popen()
class Popen(args, # 待执行的命令
bufsize=-1, # 使用 Pipe 时的缓冲区大小
stdin=None, # 提供子进程标准输入的文件描述符
stdout=None,
stderr=None,
shell=False, # 是否在系统 shell 终端中执行命令
cwd=None, # 执行命令时的工作目录,默认是当前目录
env=None, # 可以传入一个字典,作为子进程的所有环境变量。默认继承当前进程的所有环境变量
...)
args
是在系统终端(比如 shell 终端)执行的命令,不是 Python 语句。- 可以是一个字符串,表示可执行文件名或路径(可以是相对路径)。比如
args='ls'
。 - 也可以是一个字符串列表,其中第一个字符串表示可执行文件,之后的字符串表示命令参数。比如
args=['echo', 'Hello']
。
- 可以是一个字符串,表示可执行文件名或路径(可以是相对路径)。比如
bufsize
可选几种取值:0 # 无缓冲 1 # 行缓冲 +int # 使用 int 大小的缓冲 -int # 使用系统默认大小的缓冲(默认是 8KB)
stdin
、stdout
、stderr
可选几种取值:None # 重定向到当前进程 subprocess.PIPE # 缓存到 Pipe 中 subprocess.STDOUT # 重定向到 stdout subprocess.DEVNULL # 丢弃
常用属性和方法:
>>> import subprocess >>> p = subprocess.Popen('pwd') # 输入一个字符串作为 args /root >>> p = subprocess.Popen(['echo', 'Hello']) # 输入一个字符串列表作为 args Hello >>> p.args ['echo', 'Hello'] >>> p.pid 30760 >>> p.communicate() # 调用 communicate(input=None, timeout=None) 会返回子进程的 (stdout, stderr) (None, None) >>> p.wait(timeout=10) # 等待子进程退出,并返回其退出码。默认是一直等待,可以指定等待的超时时间。重复调用该方法会立即返回退出码 0 >>> p.returncode # 获取子进程的返回码,如果它尚未退出则返回 None 0 >>> p.kill() # 终止子进程(在 POSIX 系统上是发送 SIGKILL 信号)
- 也可通过其它方式,终止进程:
>>> import signal >>> p.send_signal(sig=signal.SIGTERM) # 发送信号
- 也可通过其它方式,终止进程:
可以不指定可执行文件的扩展名,Python 会自动根据当前的操作系统进行判断:
>>> p = subprocess.Popen(r'C:\Windows\SysWOW64\ping.exe') # 这里可以省略后缀 .exe 用法: ping [-t] [-a] [-n count] [-l size] [-f] [-i TTL] [-v TOS] [-r count] [-s count] [[-j host-list] | [-k host-list]]
不过如果不是操作系统已定义的可执行文件,比如是脚本文件
.sh
,则不能省略扩展名。可以用
shlex.split()
将一条命令分割成字符串列表:>>> import shlex >>> shlex.split('echo "Hello World"') ['echo', 'Hello World']
默认不会解析注释:
>>> shlex.split('D:/test/1.py # test') ['D:/test/1.py', '#', 'test'] >>> shlex.split('D:/test/1.py # test', comments=True) ['D:/test/1.py']
默认按 POSIX 风格处理命令,在 Windows 系统上要禁用 POSIX 风格:
>>> shlex.split(r'D:\test\1.py') ['D:test1.py'] >>> shlex.split(r'D:\test\1.py', posix=False) ['D:\\test\\1.py'] >>> shlex.split(r'D:\test\1.py', posix=os.name=='posix') ['D:\\test\\1.py']
控制输出:
>>> p = subprocess.Popen(['echo', 'Hello'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) >>> p.communicate() # 建议赋值为 stdout, stderr = p.communicate() (b'Hello\n', None) # 输入、输出的内容默认为 bytes 类型 >>> p.communicate() ValueError: read of closed file # 不能重复读取输出
控制输入:
stdin = 'Hello World'.encode() with subprocess.Popen('cat', stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as p: try: output = p.communicate(stdin, timeout=10)[0] except subprocess.TimeoutExpired: p.kill() output = p.communicate()[0]
在当前环境变量的基础上,更新环境变量:
import os env_dict = os.environ.copy() env_dict.update({'test_id': '1'}) p = subprocess.Popen('env', env=env_dict)
默认不会将 args 命令放在系统 shell 终端中执行,因此不能使用系统终端内置的命令和语法。比如在 Windows 上执行以下命令:
>>> subprocess.Popen('echo Hello') # 不是在系统终端中执行,因此找不到 echo 命令 FileNotFoundError: [WinError 2] 系统找不到指定的文件。 >>> subprocess.Popen(['cmd', '/c', 'echo Hello']).communicate() # 换成用系统终端执行 Hello (None, None) >>> subprocess.Popen('echo Hello', shell=True).communicate() # 换成用系统终端执行 Hello (None, None)