# 进程

# import multiprocessing

:Python 的标准库,用于创建进程。

# Process()

class Process(group=None, target=None, name=None, args:Iterable, kwargs:Mapping, daemon:bool)
  • 功能:

    • 输入一些参数,定义一个进程,返回一个 Process 对象。
  • 参数:

    • group 表示进程组。
    • target 表示在进程中运行的入口函数。
    • name 表示进程的名称。默认按 Process-<int> 的格式命名。
    • args 表示传递元组参数给 target 函数。
    • kwargs 表示传递字典参数给 target 函数。
    • daemon 表示该进程,是否在后台运行。默认继承当前进程的 daemon 参数。
  • 例:

    >>> import multiprocessing
    >>> p = multiprocessing.Process(target=print, args=('hello',))
    >>> p
    <Process name='Process-1' parent=14036 initial> # 此时进程处于 initial 状态,也就是已经初始化,尚未启动
    >>> p.start()   # 启动进程
    hello           # 子进程默认在前台运行,将 stdout、stderr 输出到当前终端
    >>> p.start()   # 每个进程只允许调用一次 start() 方法
    AssertionError: cannot start a process twice
    
  • 进程的属性:

    >>> p.name      # 进程的名称
    'Process-1'
    >>> p.pid       # 进程的 PID 。调用 start() 之后,操作系统才会创建进程,分配 PID
    15176
    >>> p.daemon    # 进程是否为 daemon 类型
    False
    >>> p.exitcode  # 如果进程已终止,则可以查看其退出码
    0
    
  • 进程的方法:

    >>> p.is_alive()    # 判断进程是否正在运行
    False
    >>> p.terminate()   # 强制终止进程。如果该进程已终止,则也返回 None
    
    • 终止一个进程时,它的子进程不会被自动杀死,而是成为孤儿进程。
  • 查看所有进程:

    >>> multiprocessing.current_process()   # 返回当前进程,用 Process 对象表示
    <_MainProcess name='MainProcess' parent=None started>
    >>> multiprocessing.active_children()   # 返回当前进程的,所有正在运行的子进程
    []
    

# Pool()

class Pool(processes=None)
  • 功能:创建进程池,用于限制同时运行的进程数量。

    • 进程池中,最多容纳 processes 个进程。如果不指定 processes ,则 processes 默认等于本机的 CPU 核数。
    • 当进程池满了时,新建的进程会被挂起。等待进程池有空位时,才开始运行。
  • 例:

    >>> pool = multiprocessing.Pool(2)    # 创建一个 pool ,容量为 2
    >>> for i in range(5):
    ...     _ = pool.apply_async(print, (i,))  # 往 pool 中添加进程
    ...
    0
    1
    2
    3
    4
    >>> pool.close()  # 关闭 pool ,此后不能再添加进程
    >>> pool.join()   # 阻塞当前进程,等 pool 里的进程运行结束。这必须先调用 close() 方法
    
  • 为了避免用户忘记调用 close() ,建议通过 with 关键字访问 pool :

    with multiprocessing.Pool(2) as pool:
        ...
    
def apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
  • 功能:往 pool 中异步地添加进程。
    • 调用该函数,会立即返回。不会等子进程运行结束,不会阻塞当前进程。

# 阻塞

  • 阻塞当前进程:

    >>> p.join()  # 阻塞当前进程,等到指定进程终止,才继续运行当前进程
    >>> p.join(timeout=3)  # 最多阻塞当前进程 3 秒。等到指定进程终止,或者 3 秒之后,才继续运行当前进程
    
  • threading 模块提供了 Event、Condition、Lock、RLock 类,用于在线程之间进行阻塞。multiprocessing 模块也提供了 Event、Condition、Lock、RLock 类,用于在进程之间进行阻塞,用法相似。

# 通信

如何在多个进程之间通信?

  • 不能使用全局变量来通信。因为每个进程,独享一份全局变量。

    >>> def fun1(configs):
    ...     configs['num'] += 1
    ...     print(configs)
    ...
    >>> configs = {'num': 0}
    >>> for _ in range(3):
    ...     multiprocessing.Process(target=fun1, args=(configs,)).start()
    ...
    {'num': 1}  # 可见,各个进程的 num 没有累加
    {'num': 1}
    {'num': 1}
    
  • 使用 multiprocessing.Value 创建被多个进程共享的变量。

    >>> def fun1(num):
    ...     num.value += 1
    ...     print(num.value)
    ...
    >>> num = multiprocessing.Value('i', 0) # 创建一个共享变量,类型为 'i' ,取值为 0
    >>> num
    <Synchronized wrapper for c_int(0)>
    >>> for _ in range(3):
    ...     multiprocessing.Process(target=fun1, args=(num,)).start()
    ...
    1
    2
    3
    
    • Value 是基于 Ctypes 模块,创建一个变量,存储在进程的共享内存中。
    • Value 变量只能是 int、char 等基础数据类型,参考 array (opens new window) 模块。
    • 通过 num.value += 1 的形式修改共享变量时,在读取操作、赋值操作的间隙,该变量可能被其它进程修改。因此,建议加锁:
      with num.get_lock():
          num.value += 1
      
  • 使用 multiprocessing.Queue 创建跨进程的消息队列,用法类似于 queue 模块的 Queue 类。

    >>> queue = multiprocessing.Queue()
    >>> queue.put('hello')
    >>> queue.get(timeout=1)
    'hello'
    
  • 使用 multiprocessing.Pipe 创建一个通信管道。一个进程往管道中写入数据,其它进程从管道中读取数据。

    >>> pipe_left, pipe_right = multiprocessing.Pipe()  # 创建一个管道,返回管道的两端
    >>> pipe_left.send(1)       # 在管道的一端,写入一个元素
    >>> pipe_left.send('hello')
    >>> pipe_right.recv()       # 在管道的另一端,读取一个元素。读取这些元素时,按照先入先出的顺序
    1
    >>> pipe_right.recv()
    'hello'
    
    • 一个 pipe 有两个端口,全双工工作,任一端口既可以读数据,也可以写数据。就像网线。
    • 调用 recv() 时,只会读取从另一个端口发送的数据,不会读取当前端口发送的数据。
    • 调用 recv() 时,如果另一个端口尚未发送数据,则会阻塞当前线程,直到收到数据。
  • 使用 multiprocessing.managers 可以让不同主机的 Python 进程之间进行通信。

    • 需要运行一个 Manager 服务器进程,监听一个 TCP 端口。然后让各个主机的 Python 进程连接到 Manager 服务器。

# import subprocess

:Python 的标准库,用于创建一个子进程去执行终端命令。

# Popen()

class Popen(args,           # 待执行的命令
            bufsize=-1,     # 使用 Pipe 时的缓冲区大小
            stdin=None,     # 提供子进程标准输入的文件描述符
            stdout=None,
            stderr=None,
            shell=False,    # 是否在系统 shell 终端中执行命令
            cwd=None,       # 执行命令时的工作目录,默认是当前目录
            env=None,       # 可以传入一个字典,作为子进程的所有环境变量。默认继承当前进程的所有环境变量
            ...)
  • args 是在系统终端(比如 shell 终端)执行的命令,不是 Python 语句。

    • 可以是一个字符串,表示可执行文件名或路径(可以是相对路径)。比如 args='ls'
    • 也可以是一个字符串列表,其中第一个字符串表示可执行文件,之后的字符串表示命令参数。比如 args=['echo', 'Hello']
  • bufsize 可选几种取值:

    0       # 无缓冲
    1       # 行缓冲
    +int    # 使用 int 大小的缓冲
    -int    # 使用系统默认大小的缓冲(默认是 8KB)
    
  • stdinstdoutstderr 可选几种取值:

    None                # 重定向到当前进程
    subprocess.PIPE     # 缓存到 Pipe 中
    subprocess.STDOUT   # 重定向到 stdout
    subprocess.DEVNULL  # 丢弃
    
  • 常用属性和方法:

    >>> import subprocess
    >>> p = subprocess.Popen('pwd')              # 输入一个字符串作为 args
    /root
    >>> p = subprocess.Popen(['echo', 'Hello'])  # 输入一个字符串列表作为 args
    Hello
    >>> p.args
    ['echo', 'Hello']
    >>> p.pid
    30760
    >>> p.communicate()                     # 调用 communicate(input=None, timeout=None) 会返回子进程的 (stdout, stderr)
    (None, None)
    >>> p.wait(timeout=10)                  # 等待子进程退出,并返回其退出码。默认是一直等待,可以指定等待的超时时间。重复调用该方法会立即返回退出码
    0
    >>> p.returncode                        # 获取子进程的返回码,如果它尚未退出则返回 None
    0
    >>> p.kill()                            # 终止子进程(在 POSIX 系统上是发送 SIGKILL 信号)
    
    • 也可通过其它方式,终止进程:
      >>> import signal
      >>> p.send_signal(sig=signal.SIGTERM) # 发送信号
      
  • 可以不指定可执行文件的扩展名,Python 会自动根据当前的操作系统进行判断:

    >>> p = subprocess.Popen(r'C:\Windows\SysWOW64\ping.exe')   # 这里可以省略后缀 .exe
    用法: ping [-t] [-a] [-n count] [-l size] [-f] [-i TTL] [-v TOS]
                [-r count] [-s count] [[-j host-list] | [-k host-list]]
    

    不过如果不是操作系统已定义的可执行文件,比如是脚本文件 .sh ,则不能省略扩展名。

  • 可以用 shlex.split() 将一条命令分割成字符串列表:

    >>> import shlex
    >>> shlex.split('echo "Hello World"')
    ['echo', 'Hello World']
    

    默认不会解析注释:

    >>> shlex.split('D:/test/1.py # test')
    ['D:/test/1.py', '#', 'test']
    >>> shlex.split('D:/test/1.py # test', comments=True)
    ['D:/test/1.py']
    

    默认按 POSIX 风格处理命令,在 Windows 系统上要禁用 POSIX 风格:

    >>> shlex.split(r'D:\test\1.py')
    ['D:test1.py']
    >>> shlex.split(r'D:\test\1.py', posix=False)
    ['D:\\test\\1.py']
    >>> shlex.split(r'D:\test\1.py', posix=os.name=='posix')
    ['D:\\test\\1.py']
    
  • 控制输出:

    >>> p = subprocess.Popen(['echo', 'Hello'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    >>> p.communicate()                     # 建议赋值为 stdout, stderr = p.communicate()
    (b'Hello\n', None)                      # 输入、输出的内容默认为 bytes 类型
    >>> p.communicate()
    ValueError: read of closed file         # 不能重复读取输出
    
  • 控制输入:

    stdin = 'Hello World'.encode()
    with subprocess.Popen('cat', stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as p:
        try:
            output  = p.communicate(stdin, timeout=10)[0]
        except subprocess.TimeoutExpired:
            p.kill()
            output  = p.communicate()[0]
    
  • 在当前环境变量的基础上,更新环境变量:

    import os
    env_dict = os.environ.copy()
    env_dict.update({'test_id': '1'})
    p = subprocess.Popen('env', env=env_dict)
    
  • 默认不会将 args 命令放在系统 shell 终端中执行,因此不能使用系统终端内置的命令和语法。比如在 Windows 上执行以下命令:

    >>> subprocess.Popen('echo Hello')                              # 不是在系统终端中执行,因此找不到 echo 命令
    FileNotFoundError: [WinError 2] 系统找不到指定的文件。
    >>> subprocess.Popen(['cmd', '/c', 'echo Hello']).communicate() # 换成用系统终端执行
    Hello
    (None, None)
    >>> subprocess.Popen('echo Hello', shell=True).communicate()    # 换成用系统终端执行
    Hello
    (None, None)