# 线程

# 原理

  • 进程、线程的原理,参考以下笔记:

  • 当一个进程中所有线程都终止时,操作系统才认为该进程已终止,释放该进程占用的系统资源,比如 PID、内存。

  • 创建的子线程、子进程,默认会在前台运行,将 stdout、stderr 输出到当前终端。

    • 如果将子线程、子进程设置为 daemon 类型,则它们会在后台运行,不影响当前终端。
    • 如果一个进程中,所有非 daemon 线程都终止了,则操作系统会自动终止所有 daemon 线程,然后终止该进程。
    • 如果一个进程组中,所有非 daemon 进程都终止了,则操作系统会自动终止所有 daemon 线程,然后终止该进程组。
  • CPython 解释器采用 GIL(Global Interpreter Lock,全局解释锁)。

    • 原理:
      • 同一进程中,同时只能有一个线程拿到 GIL 锁。
      • 只有拿到 GIL 锁的那个线程,会被 CPU 执行。而其它线程,不会被 CPU 执行。
    • 优点:
      • 同一个进程中的多个线程,可能同时访问一个资源,发生冲突。而 GIL 能避免这种情况。
    • 缺点:
      • 由于 GIL 机制,同一进程中的多个线程,不能同时执行、并发工作。
    • CPython 的多线程,
      • 适合处理 IO 密集型任务。因为这些线程经常处于 iowait 状态,大部分时间不占用 CPU 。
      • 不适合处理 CPU 密集型任务。因为这些线程都需要占用 CPU ,但同时只会执行拿到 GIL 锁的那个线程。导致多个线程的任务处理速度,与单个线程差不多,甚至可能更慢(因为上下文切换的耗时)。
    • CPython 的多进程,
      • 不适合处理 IO 密集型任务。因为这种任务,用多线程就可以处理。创建进程,比起创建线程,需要消耗更多 CPU、内存。
      • 适合处理 CPU 密集型任务。因为 N 个进程,可以同时被 CPU 执行,任务处理速度是单个进程的 N 倍,除非主机的 CPU 核数少于 N 。

# import threading

:Python 的标准库,用于创建线程。

# Thread()

class Thread(group=None, target=None, name=None, args=(), kwargs=None, daemon:bool)
  • 功能:

    • 输入一些参数,定义一个线程,返回一个 Thread 对象。
  • 参数:

    • group 表示线程组。
    • target 表示在线程中运行的入口函数。
    • name 表示线程的名称。默认按 Thread-<int> 的格式命名。
    • args 表示传递元组参数给 target 函数。
    • kwargs 表示传递字典参数给 target 函数。
    • daemon 表示该线程,是否在后台运行。默认继承当前线程的 daemon 参数。
  • 例:

    >>> import threading
    >>> t1 = threading.Thread(target=print, args=('hello',))
    >>> t1
    <Thread(Thread-1, initial)>   # 此时线程处于 initial 状态,也就是已经初始化,尚未启动
    >>> t1.start()    # 启动线程
    hello             # 子线程默认在前台运行,将 stdout、stderr 输出到当前终端
    >>> t1.start()    # 每个线程只允许调用一次 start() 方法
    RuntimeError: threads can only be started once
    
  • 查看线程的信息:

    >>> t1
    <Thread(t1, stopped 7712)>
    >>> t1.name             # 线程的名称
    'Thread-1'
    >>> t1.native_id        # 线程的 TID 。调用 start() 之后,操作系统才会创建线程,分配 TID
    7712
    >>> t1.daemon           # 线程是否为 daemon 类型
    False
    >>> t1.is_alive()       # 线程是否正在运行
    False
    
  • 查看所有线程:

    >>> threading.current_thread()  # 返回当前线程,用 Thread 对象表示
    <_MainThread(MainThread, started 6764)>
    >>> threading.main_thread()     # 返回当前进程中的主线程
    <_MainThread(MainThread, started 6764)>
    >>> threading.active_count()    # 返回当前进程中,正在运行的线程总数
    1
    >>> threading.enumerate()       # 返回当前进程中,正在运行的所有 Thread 对象
    [<_MainThread(MainThread, started 6764)>]
    

# Thread.run()

  • 另一种创建线程的方法:定义一个线程类,继承 threading.Thread 类,至少重载其 run() 方法。

  • 例:

    >>> class myThread(threading.Thread):
    ...     def __init__(self, a):
    ...         super().__init__()  # 重载 __init__() 方法时,需要调用 Thread.__init__() 方法,才能初始化线程
    ...         self.a = a
    ...     def run(self):
    ...         print(self.a)
    ...
    >>>
    >>> t1 = myThread(a=1)
    >>> t1.start()
    1
    
    • Thread.start() 方法,会调用 Thread.run() 方法,来启动线程。
    • Thread.run() 方法,会调用线程的 target 函数,来启动线程。

# 终止

  • 如何终止一个线程?
    • 建议不要从外部杀死线程,否则线程可能来不及做完某些任务。比如尚未将内存中的数据持久化到磁盘。
      • 因此,threading 库没有提供杀死线程的方法。
      • 如果线程为 daemon 类型,则可能被操作系统终止。
    • 建议编写线程的源代码,让它自己决定何时终止。
      • 比如一个线程只会执行少量任务,运行一段时间就会自行终止。
      • 比如一个线程会无限循环,但每隔 1 分钟,就检查一次是否应该终止。

# 阻塞

  • 阻塞当前线程:

    >>> t1.join() # 阻塞当前线程,等到 t1 线程终止,才继续运行当前线程
    >>> t1.join(timeout=3)  # 最多阻塞当前线程 3 秒。等到 t1 终止,或者 3 秒之后,才继续运行当前线程
    
  • threading.Event 是事件。用于让某个线程阻塞,直到事件被激活,才继续运行。

    • 创建一个 event ,然后将 event 变量传递给多个线程:
      >>> event = threading.Event()
      >>> threading.Thread(target=fun1, args=(event,)).start()
      
    • 在一个线程中,激活 event :
      >>> event.is_set()  # 检查 event 是否激活,默认未激活
      False
      >>> event.set()     # 激活 event
      >>> event.is_set()
      True
      
    • 在其它线程中,阻塞等待 event 被激活:
      >>> event.wait(timeout=3) # 阻塞当前线程,直到 event 被激活。timeout 默认值为 None ,表示无限阻塞
      True                      # 如果 wait 成功,则返回 True 。如果 wait 超时,则返回 False
      
  • threading.Condition 是条件,用法与 Event 类似。

    • 创建一个 condition ,然后将 condition 变量传递给多个线程:
      >>> condition = threading.Condition()
      >>> threading.Thread(target=fun1, args=(condition,)).start()
      
    • 在一个线程中,激活 condition :
      >>> condition.notify(n=1)   # 唤醒 n 个正在 wait 该条件的线程。 n 默认值为 1
      >>> condition.notify_all()  # 唤醒所有正在 wait 该条件的线程
      
    • 在其它线程中,等待 condition 被激活:
      >>> with condition:   # 获取这个条件的 RLock 锁,相当于调用 condition.acquire()
      ...     condition.wait(timeout=3)
      ...
      False
      
  • threading.Lock 是互斥锁。

    • 创建一个 lock ,然后将 lock 变量传递给多个线程:
      >>> lock = threading.Lock()
      >>> threading.Thread(target=fun1, args=(lock,)).start()
      
    • 在某个线程中,获取锁:
      >>> lock.acquire()  # 请求获得锁,这会阻塞当前线程,直到成功获得锁
      True
      >>> lock.release()  # 释放锁
      >>> lock.locked()   # 判断 lock 是否被锁定
      False
      
    • 对于同一个 lock 变量,同时只能有一个线程锁定它。
      • 如果 lock 已被一个线程锁定,则调用 lock.acquire() 的其它线程,会阻塞等待,直到 lock 解锁、可以被 acquire 。
      • 如果一个线程调用 lock.acquire() 成功之后,没有释放锁,就再次调用 lock.acquire() ,则会一直等待获得锁,陷入死锁。
      • 可以修改 acquire 的方式,避免长时间阻塞:
        >>> lock.acquire(blocking=False)  # blocking=False 不会阻塞当前线程。需要根据返回值 True/False ,判断是否成功获得锁
        True
        >>> lock.acquire(timeout=3)       # 设置超时时间,最多阻塞当前线程多久
        False
        
    • 为了避免用户忘记调用 release() ,建议通过 with 关键字访问 lock :
      >>> lock = threading.Lock()
      >>> with lock:
      ...     lock.locked()   # 此时 lock 被锁
      ...
      True
      >>> lock.locked()   # 此时 lock 没被锁
      False
      
      这相当于:
      lock.acquire()
      try:
          ...
      finally:
          lock.release()
      
  • threading.RLock 是可重入的互斥锁。

    • 同一个线程,可以重复获得同一个 RLock() ,不会死锁。但是调用了多少次 acquire() ,就需要调用多少次 release() ,才能完全释放该锁,供其它线程获取。
    • 例:
      >>> rlock = threading.RLock()
      >>> rlock.acquire()
      True
      >>> rlock.acquire()
      True
      >>> rlock           # owner 表示该锁被哪个线程获取了,count 表示 acquire 的次数
      <locked _thread.RLock object owner=6764 count=2 at 0x0000015AABCB9420>
      >>> rlock.release()
      >>> rlock.release()
      >>> rlock.release() # 如果对一个已释放的锁,调用 release() ,则会抛出异常
      RuntimeError: cannot release un-acquired lock
      

# 通信

如何在多个线程之间通信?

  • 可以使用全局变量,在线程之间传递数据。因为所有子线程,会共享父线程的全局变量。如下:
    >>> def fun1():
    ...     configs['num'] += 1
    ...     print(configs)
    ...
    >>> configs = {'num': 0}  # configs 是一个全局变量,能在所有子线程、所有函数内访问到
    >>> for _ in range(3):
    ...     threading.Thread(target=fun1).start()
    ...
    {'num': 1}
    {'num': 2}
    {'num': 3}  # 可见,各个线程的 num 累加了
    
    为了方便阅读代码时,看出每个线程访问了哪些全局变量,建议以函数实参的形式传入全局变量:
    >>> def fun1(configs):
    ...     configs['num'] += 1
    ...     print(configs)
    ...
    >>> configs = {'num': 0}
    >>> for _ in range(3):
    ...     threading.Thread(target=fun1, args=(configs,)).start()
    ...
    {'num': 1}
    {'num': 2}
    {'num': 3}
    
  • 可以使用 queue 模块,创建跨线程的消息队列。

# Timer()

class Timer(interval, function, args=None, kwargs=None)
  • 功能:定义一个延迟启动的线程。

  • 这样可以实现定时任务,但功能简单,不适合处理大量定时任务。

  • 例:

    >>> t1 = threading.Timer(3.0, print, ('hello',))
    >>> t1.start()   # 启动线程。但线程会等 interval 秒才启动,执行 function 函数
    >>> hello
    >>> t1.cancel()  # 可以提前终止线程
    

# import queue

:Python 的标准库,用于线程之间的通信。

  • 官方文档 (opens new window)

  • 工作流程:

    1. 创建一个队列,可以容纳多条消息。
    2. 一个线程往队列中写入消息。
    3. 其它线程从队列中读取消息。
  • queue 提供了多种消息队列:Queue、LifoQueue、PriorityQueue 。

    • 它们都采用生产者-消费者模式。
    • 它们都采用了线程锁,是线程安全的。即使多个线程同时读写同一个队列,也不会冲突。

# Queue

class Queue(maxsize=0)
  • 功能:创建一个队列,可以写入最多 maxsize 条消息。最先写入的消息,会被最先读出,即先进先出(FIFO)。
    • maxsize 表示队列的容量。如果 maxsize<=0 ,则不限制容量。
  • 例:
    >>> import queue
    >>> q = queue.Queue(10)
    >>> q.qsize()   # 返回队列此时的大小,即消息总数
    0
    >>> q.empty()   # 判断队列是否为空,即不包含任何消息
    True
    >>> q.full()    # 判断队列是否满了,即达到 maxsize
    False
    
Queue.put(item, block=True, timeout=None)
  • 功能:往队列中写入一个元素(或者说消息),该元素可以是任意类型。
    • 如果 block=True ,则当队列满了时,调用 put() 会一直阻塞,直到队列中有元素被取出。
    • 如果 block=False ,则当队列满了时,调用 put() 不会阻塞,而是抛出 queue.Full 异常。
    • 如果输入了 timeout 参数,则最多阻塞 timeout 秒,然后抛出 queue.Full 异常。
  • 例:
    >>> q.put(0)
    >>> q.put('Hello', block=False)
    >>> q.put({}, timeout=1)
    
Queue.get(block=True, timeout=None)
  • 功能:从队列中取出一个元素。
    • 如果 block=True ,则当队列为空时,调用 get() 会一直阻塞,直到队列中存在元素。
    • 如果 block=False ,则当队列为空时,调用 get() 不会阻塞,而是抛出 queue.Empty 异常。
    • 如果输入了 timeout 参数,则最多阻塞 timeout 秒,然后抛出 queue.Empty 异常。
  • 例:
    >>> q.get()
    0
    >>> q.get(block=False)
    'Hello'
    >>> q.get(timeout=1)
    {}
    

# LifoQueue

class LifoQueue(maxsize=0)
  • 功能:创建一个后进先出的队列(LIFO)。
  • LifoQueue 继承自 Queue 类,用法差不多,只是元素为后进先出。

# PriorityQueue

class PriorityQueue(maxsize=0)
  • 功能:创建一个优先级队列(Priority)。
    • 写入的每个元素,必须实现 __lt__()__gt__() 魔法方法,从而支持排序。
    • 调用 get() 时,会取出队列中排序最小的那个元素。
    • 实际上是基于 heapq 模块的 heappush() 和 heappop() 来写入、取出元素。
  • 例:
    >>> class Item:
    ...     def __init__(self, priority, data):
    ...         self.priority = priority
    ...         self.data = data
    ...     def __repr__(self):
    ...         return self.data
    ...     def __lt__(self, other):
    ...         return self.priority < other
    ...     def __gt__(self, other):
    ...         return self.priority > other
    ...
    >>> q = queue.PriorityQueue()
    >>> q.put(1, block=True, timeout=None)
    >>> q.put(Item(3, 'three'))
    >>> q.put(Item(2, 'two'))
    >>> q.get(timeout=1)
    1
    >>> q.get(timeout=1)
    two
    >>> q.get(timeout=1)
    three