# 线程
# 原理
进程、线程的原理,参考以下笔记:
当一个进程中所有线程都终止时,操作系统才认为该进程已终止,释放该进程占用的系统资源,比如 PID、内存。
创建的子线程、子进程,默认会在前台运行,将 stdout、stderr 输出到当前终端。
- 如果将子线程、子进程设置为 daemon 类型,则它们会在后台运行,不影响当前终端。
- 如果一个进程中,所有非 daemon 线程都终止了,则操作系统会自动终止所有 daemon 线程,然后终止该进程。
- 如果一个进程组中,所有非 daemon 进程都终止了,则操作系统会自动终止所有 daemon 线程,然后终止该进程组。
CPython 解释器采用 GIL(Global Interpreter Lock,全局解释锁)。
- 原理:
- 同一进程中,同时只能有一个线程拿到 GIL 锁。
- 只有拿到 GIL 锁的那个线程,会被 CPU 执行。而其它线程,不会被 CPU 执行。
- 优点:
- 同一个进程中的多个线程,可能同时访问一个资源,发生冲突。而 GIL 能避免这种情况。
- 缺点:
- 由于 GIL 机制,同一进程中的多个线程,不能同时执行、并发工作。
- CPython 的多线程,
- 适合处理 IO 密集型任务。因为这些线程经常处于 iowait 状态,大部分时间不占用 CPU 。
- 不适合处理 CPU 密集型任务。因为这些线程都需要占用 CPU ,但同时只会执行拿到 GIL 锁的那个线程。导致多个线程的任务处理速度,与单个线程差不多,甚至可能更慢(因为上下文切换的耗时)。
- CPython 的多进程,
- 不适合处理 IO 密集型任务。因为这种任务,用多线程就可以处理。创建进程,比起创建线程,需要消耗更多 CPU、内存。
- 适合处理 CPU 密集型任务。因为 N 个进程,可以同时被 CPU 执行,任务处理速度是单个进程的 N 倍,除非主机的 CPU 核数少于 N 。
- 原理:
# import threading
:Python 的标准库,用于创建线程。
# Thread()
class Thread(group=None, target=None, name=None, args=(), kwargs=None, daemon:bool)
功能:
- 输入一些参数,定义一个线程,返回一个 Thread 对象。
参数:
- group 表示线程组。
- target 表示在线程中运行的入口函数。
- name 表示线程的名称。默认按
Thread-<int>
的格式命名。 - args 表示传递元组参数给 target 函数。
- kwargs 表示传递字典参数给 target 函数。
- daemon 表示该线程,是否在后台运行。默认继承当前线程的 daemon 参数。
例:
>>> import threading >>> t1 = threading.Thread(target=print, args=('hello',)) >>> t1 <Thread(Thread-1, initial)> # 此时线程处于 initial 状态,也就是已经初始化,尚未启动 >>> t1.start() # 启动线程 hello # 子线程默认在前台运行,将 stdout、stderr 输出到当前终端 >>> t1.start() # 每个线程只允许调用一次 start() 方法 RuntimeError: threads can only be started once
查看线程的信息:
>>> t1 <Thread(t1, stopped 7712)> >>> t1.name # 线程的名称 'Thread-1' >>> t1.native_id # 线程的 TID 。调用 start() 之后,操作系统才会创建线程,分配 TID 7712 >>> t1.daemon # 线程是否为 daemon 类型 False >>> t1.is_alive() # 线程是否正在运行 False
查看所有线程:
>>> threading.current_thread() # 返回当前线程,用 Thread 对象表示 <_MainThread(MainThread, started 6764)> >>> threading.main_thread() # 返回当前进程中的主线程 <_MainThread(MainThread, started 6764)> >>> threading.active_count() # 返回当前进程中,正在运行的线程总数 1 >>> threading.enumerate() # 返回当前进程中,正在运行的所有 Thread 对象 [<_MainThread(MainThread, started 6764)>]
# Thread.run()
另一种创建线程的方法:定义一个线程类,继承
threading.Thread
类,至少重载其run()
方法。例:
>>> class myThread(threading.Thread): ... def __init__(self, a): ... super().__init__() # 重载 __init__() 方法时,需要调用 Thread.__init__() 方法,才能初始化线程 ... self.a = a ... def run(self): ... print(self.a) ... >>> >>> t1 = myThread(a=1) >>> t1.start() 1
Thread.start()
方法,会调用Thread.run()
方法,来启动线程。Thread.run()
方法,会调用线程的 target 函数,来启动线程。
# 阻塞
阻塞当前线程:
>>> t1.join() # 阻塞当前线程,等到 t1 线程终止,才继续运行当前线程 >>> t1.join(timeout=3) # 最多阻塞当前线程 3 秒。等到 t1 终止,或者 3 秒之后,才继续运行当前线程
threading.Lock()
是互斥锁。- 如果一个线程获得了互斥锁,则会阻塞同一进程的其它所有线程。
- 例:
>>> lock = threading.Lock() # 创建一个互斥锁 >>> lock.acquire() # 请求获得锁,这会阻塞当前线程,直到成功获得锁 True >>> do_something() # 此时只有当前线程在运行,其它线程暂停运行 >>> lock.release() # 释放锁 >>> lock.locked() # 判断 lock 是否被锁定 False
>>> lock.acquire(blocking=False) # blocking=False 不会阻塞当前线程。需要根据返回值 True/False ,判断是否成功获得锁 True >>> lock.acquire(timeout=3) # 设置超时时间,最多阻塞当前线程多久 False
- 如果一个线程获得互斥锁之后,没有释放,就再次调用
acquire()
方法,则会一直等待获得锁,陷入死锁。 - 为了避免用户忘记调用
release()
,建议通过 with 关键字访问 lock :这相当于:>>> lock = threading.Lock() >>> with lock: ... lock.locked() # 此时 lock 被锁 ... True >>> lock.locked() # 此时 lock 没被锁 False
lock.acquire() try: ... finally: lock.release()
threading.RLock()
是可重入的互斥锁。- 同一个线程,可以重复获得同一个
RLock()
,不会死锁。但是调用了多少次acquire()
,就需要调用多少次release()
,才能完全释放该锁,供其它线程获取。 - 例:
>>> rlock = threading.RLock() >>> rlock.acquire() True >>> rlock.acquire() True >>> rlock # owner 表示该锁被哪个线程获取了,count 表示 acquire 的次数 <locked _thread.RLock object owner=6764 count=2 at 0x0000015AABCB9420> >>> rlock.release() >>> rlock.release() >>> rlock.release() # 如果对一个已释放的锁,调用 release() ,则会抛出异常 RuntimeError: cannot release un-acquired lock
- 同一个线程,可以重复获得同一个
threading.Condition()
表示条件。用于让某个线程阻塞,直到某个条件被激活,才继续运行。- 例:在一个线程中,等待条件在另一个线程中,激活条件:
>>> condition = threading.Condition() # 创建一个条件 >>> with condition: # 获取这个条件的 RLock 锁,相当于调用 condition.acquire() ... condition.wait(timeout=3) # 阻塞当前线程,直到该条件被激活。timeout 默认值为 None ,表示无限阻塞 ... False # 如果 wait 成功,则返回 True 。如果 wait 超时,则返回 False
>>> condition.notify(n=1) # 唤醒 n 个正在 wait 该条件的线程。 n 默认值为 1 >>> condition.notify_all() # 唤醒所有正在 wait 该条件的线程
- 例:在一个线程中,等待条件
# 终止
- 如何终止一个线程?
- 建议不要从外部杀死线程,否则线程可能来不及做完某些任务。比如尚未将内存中的数据持久化到磁盘。
- 因此,threading 库没有提供杀死线程的方法。
- 如果线程为 daemon 类型,则可能被操作系统终止。
- 建议编写线程的源代码,让它自己决定何时终止。
- 比如一个线程只会执行少量任务,运行一段时间就会自行终止。
- 比如一个线程会无限循环,但每隔 1 分钟,就检查一次是否应该终止。
- 建议不要从外部杀死线程,否则线程可能来不及做完某些任务。比如尚未将内存中的数据持久化到磁盘。
# 通信
如何在多个线程之间通信?
- 可以使用全局变量。因为所有子线程,会共享父线程的全局变量。如下:为了方便阅读代码时,看出每个线程访问了哪些全局变量,建议以函数实参的形式传入全局变量:
>>> def fun1(): ... configs['num'] += 1 ... print(configs) ... >>> configs = {'num': 0} # configs 是一个全局变量,能在所有子线程、所有函数内访问到 >>> for _ in range(3): ... threading.Thread(target=fun1).start() ... {'num': 1} {'num': 2} {'num': 3} # 可见,各个线程的 num 累加了
>>> def fun1(configs): ... configs['num'] += 1 ... print(configs) ... >>> configs = {'num': 0} >>> for _ in range(3): ... threading.Thread(target=fun1, args=(configs,)).start() ... {'num': 1} {'num': 2} {'num': 3}
- 可以使用 queue 模块,创建消息队列。
# Timer()
class Timer(interval, function, args=None, kwargs=None)
功能:定义一个延迟启动的线程。
这样可以实现定时任务,但功能简单,不适合处理大量定时任务。
例:
>>> t1 = threading.Timer(3.0, print, ('hello',)) >>> t1.start() # 启动线程。但线程会等 interval 秒才启动,执行 function 函数 >>> hello >>> t1.cancel() # 可以提前终止线程
# import queue
:Python 的标准库,用于线程之间的通信。
工作流程:
- 创建一个队列,可以容纳多条消息。
- 一个线程往队列中写入消息。
- 其它线程从队列中读取消息。
queue 提供了多种消息队列:Queue、LifoQueue、PriorityQueue 。
- 它们都采用生产者-消费者模式。
- 它们都采用了线程锁,是线程安全的。即使多个线程同时读写同一个队列,也不会冲突。
# Queue
class Queue(maxsize=0)
- 功能:创建一个队列,可以写入最多 maxsize 条消息。最先写入的消息,会被最先读出,即先进先出(FIFO)。
- maxsize 表示队列的容量。如果
maxsize<=0
,则不限制容量。
- maxsize 表示队列的容量。如果
- 例:
>>> import queue >>> q = queue.Queue(10) >>> q.qsize() # 返回队列此时的大小,即消息总数 0 >>> q.empty() # 判断队列是否为空,即不包含任何消息 True >>> q.full() # 判断队列是否满了,即达到 maxsize False
Queue.put(item, block=True, timeout=None)
- 功能:往队列中写入一个元素(或者说消息),该元素可以是任意类型。
- 如果
block=True
,则当队列满了时,调用 put() 会一直阻塞,直到队列中有元素被取出。 - 如果
block=False
,则当队列满了时,调用 put() 不会阻塞,而是抛出queue.Full
异常。 - 如果输入了 timeout 参数,则最多阻塞 timeout 秒,然后抛出
queue.Full
异常。
- 如果
- 例:
>>> q.put(0) >>> q.put('Hello', block=False) >>> q.put({}, timeout=1)
Queue.get(block=True, timeout=None)
- 功能:从队列中取出一个元素。
- 如果
block=True
,则当队列为空时,调用 get() 会一直阻塞,直到队列中存在元素。 - 如果
block=False
,则当队列为空时,调用 get() 不会阻塞,而是抛出queue.Empty
异常。 - 如果输入了 timeout 参数,则最多阻塞 timeout 秒,然后抛出
queue.Empty
异常。
- 如果
- 例:
>>> q.get() 0 >>> q.get(block=False) 'Hello' >>> q.get(timeout=1) {}
# LifoQueue
class LifoQueue(maxsize=0)
- 功能:创建一个后进先出的队列(LIFO)。
- LifoQueue 继承自 Queue 类,用法差不多,只是元素为后进先出。
# PriorityQueue
class PriorityQueue(maxsize=0)
- 功能:创建一个优先级队列(Priority)。
- 写入的每个元素,必须实现
__lt__()
和__gt__()
魔法方法,从而支持排序。 - 调用 get() 时,会取出队列中排序最小的那个元素。
- 实际上是基于 heapq 模块的 heappush() 和 heappop() 来写入、取出元素。
- 写入的每个元素,必须实现
- 例:
>>> class Item: ... def __init__(self, priority, data): ... self.priority = priority ... self.data = data ... def __repr__(self): ... return self.data ... def __lt__(self, other): ... return self.priority < other ... def __gt__(self, other): ... return self.priority > other ... >>> q = queue.PriorityQueue() >>> q.put(1, block=True, timeout=None) >>> q.put(Item(3, 'three')) >>> q.put(Item(2, 'two')) >>> q.get(timeout=1) 1 >>> q.get(timeout=1) two >>> q.get(timeout=1) three