Python 多线程详解:threading 模块从入门到高级实例(2026 年视角)
Python 的 threading 模块 是标准库中实现多线程编程的核心工具。它基于线程模型,让你能并发执行多个任务,尤其适合 I/O 密集型场景(如网络请求、文件读写)。但 Python 有 GIL(Global Interpreter Lock,全局解释器锁),所以在 CPU 密集型任务中,多线程并不能真正并行(更适合多进程 multiprocessing)。2026 年,随着 Python 3.13+ 的 GIL 优化(可选无 GIL 模式),多线程性能有显著提升,但默认仍需注意。
本教程从基础到高级,逐步讲解 + 超详细代码实例(每个实例附运行说明 + 潜在问题 + 优化建议)。假设你用 Python 3.8+,代码可在 Jupyter 或 VS Code 中运行。每个实例都独立可复制粘贴测试。
1. 多线程基础概念
- 线程 vs 进程:线程是进程内的执行单元,共享内存(更快启动,但需小心数据竞争)。进程独立内存,更安全但开销大。
- threading 模块关键类/函数: 类/函数 描述 常见用法 Thread(target=func, args=()) 创建线程对象 启动子线程执行 func thread.start() 启动线程 — thread.join() 等待线程结束 主线程阻塞等子线程 threading.Lock() 互斥锁(防止数据竞争) with lock: … threading.RLock() 可重入锁(同一线程可多次获取) 递归函数中用 threading.Semaphore(n) 信号量(限流) 控制同时访问资源的线程数 threading.Condition() 条件变量(线程间通信) 生产者-消费者模式 threading.Event() 事件(信号灯) 一个线程通知多个线程 queue.Queue() 线程安全队列(数据共享) 多线程间传递数据 threading.current_thread() 获取当前线程 打印线程名/ID
- 注意:多线程不适合 CPU 密集(如计算密集循环),用 multiprocessing 或 concurrent.futures.ThreadPoolExecutor(更高层抽象)。
2. 入门实例:简单多线程
场景:主线程 + 子线程并发打印消息,演示线程创建/启动/等待。
import threading
import time
def worker(name):
print(f"线程 {name} 开始执行...")
time.sleep(2) # 模拟耗时任务
print(f"线程 {name} 执行结束!")
# 创建线程
t1 = threading.Thread(target=worker, args=("子线程1",))
t2 = threading.Thread(target=worker, args=("子线程2",))
# 启动线程
t1.start()
t2.start()
print("主线程继续执行...")
# 等待子线程结束(可选,如果不 join,主线程可能先结束)
t1.join()
t2.join()
print("所有线程结束,主线程退出。")
运行输出(大致顺序,非严格):
线程 子线程1 开始执行...
线程 子线程2 开始执行...
主线程继续执行...
线程 子线程1 执行结束!
线程 子线程2 执行结束!
所有线程结束,主线程退出。
详解:
target=worker:指定线程执行的函数。args=("子线程1",):传参,必须是元组。start():启动线程,立即返回(非阻塞)。join():阻塞主线程,直到该线程结束。- 潜在问题:无共享数据,无需锁。但如果函数有异常,会静默失败(用 try-except 捕获)。
- 优化:用
daemon=True设置守护线程(主线程结束时自动杀子线程)。
3. 中级实例:线程间数据共享 + 锁(避免竞争)
场景:多个线程修改共享变量(计数器),无锁 vs 有锁对比。演示数据竞争问题。
import threading
counter = 0 # 共享变量
def increment(n):
global counter
for _ in range(n):
counter += 1 # 非原子操作,可能竞争
# 无锁版本(会出错)
threads = []
for _ in range(10):
t = threading.Thread(target=increment, args=(100_000,))
threads.append(t)
t.start()
for t in threads:
t.join()
print("无锁结果(预期 1,000,000):", counter) # 实际 ~900,000+,丢失计数
# 有锁版本
counter = 0
lock = threading.Lock()
def increment_locked(n):
global counter
for _ in range(n):
with lock: # 获取锁,独占执行
counter += 1
threads = []
for _ in range(10):
t = threading.Thread(target=increment_locked, args=(100_000,))
threads.append(t)
t.start()
for t in threads:
t.join()
print("有锁结果(预期 1,000,000):", counter) # 正确 1,000,000
详解:
- 数据竞争:GIL 虽保护解释器,但
counter += 1是多步操作(读-改-写),线程切换时丢失。 Lock():互斥锁,with lock:自动获取/释放。- 潜在问题:死锁(两个线程互等锁);性能瓶颈(锁粒度太大)。
- 优化:用 RLock() 如果函数递归调用锁;最小化锁内代码。
4. 高级实例:生产者-消费者模式(Condition + Queue)
场景:生产者线程生成数据,消费者线程处理。演示线程通信 + 队列安全共享。
import threading
import queue
import time
import random
q = queue.Queue(maxsize=5) # 线程安全队列,限 5 项
def producer():
for i in range(10):
item = random.randint(1, 100)
q.put(item) # 阻塞如果队列满
print(f"生产者生产: {item} (队列大小: {q.qsize()})")
time.sleep(random.uniform(0.5, 1.5))
q.put(None) # 结束信号
def consumer(name):
while True:
item = q.get() # 阻塞如果队列空
if item is None:
print(f"{name} 收到结束信号")
break
print(f"{name} 消费: {item} (队列大小: {q.qsize()})")
time.sleep(random.uniform(1, 2))
# 启动
p = threading.Thread(target=producer)
c1 = threading.Thread(target=consumer, args=("消费者1",))
c2 = threading.Thread(target=consumer, args=("消费者2",))
p.start()
c1.start()
c2.start()
p.join()
c1.join()
c2.join()
print("生产消费结束")
运行输出(示例):
生产者生产: 42 (队列大小: 1)
消费者1 消费: 42 (队列大小: 0)
生产者生产: 73 (队列大小: 1)
消费者2 消费: 73 (队列大小: 0)
... (继续直到 10 项)
消费者1 收到结束信号
消费者2 收到结束信号
生产消费结束
详解:
Queue():线程安全 FIFO,先入先出。put()/get()自动处理锁。None作为哨兵:通知消费者结束(多消费者时需多个哨兵或用 Event)。- 潜在问题:队列满/空时阻塞;如果生产慢消费快,可能饥饿。
- 优化:用
queue.PriorityQueue()优先级队列;或 LifoQueue 栈式。
5. 高级实例:限流 + 信号量(Semaphore)
场景:模拟数据库连接池,限制同时访问的线程数(e.g., 最多 3 个)。
import threading
import time
sem = threading.Semaphore(3) # 最多 3 个线程同时访问
def access_db(name):
print(f"{name} 尝试访问 DB...")
with sem: # 获取信号量(计数 -1,如果 0 则阻塞)
print(f"{name} 访问 DB 中...")
time.sleep(2) # 模拟查询
print(f"{name} 访问结束")
threads = []
for i in range(10):
t = threading.Thread(target=access_db, args=(f"线程{i}",))
threads.append(t)
t.start()
time.sleep(0.2) # 错开启动
for t in threads:
t.join()
运行输出(示例):
线程0 尝试访问 DB...
线程0 访问 DB 中...
线程1 尝试访问 DB...
线程1 访问 DB 中...
线程2 尝试访问 DB...
线程2 访问 DB 中...
线程3 尝试访问 DB... # 阻塞,直到前一个释放
...
详解:
Semaphore(n):计数器,初始 n。acquire()减 1(0 时阻塞),release()加 1。with sem:自动 acquire/release。- 潜在问题:忘记 release() 导致死锁;n 太小瓶颈大。
- 优化:结合 Lock 保护共享资源;2026 年用 asyncio.Semaphore 异步版。
6. 高级实例:线程间通信(Condition + Event)
场景:一个线程等待另一个线程的信号(e.g., 下载完成通知处理)。
import threading
import time
cond = threading.Condition() # 条件变量
event = threading.Event() # 事件信号
def downloader():
print("下载开始...")
time.sleep(3)
with cond:
print("下载完成,通知等待者...")
cond.notify_all() # 通知所有等待线程
event.set() # 设置事件为 True
def processor(name):
print(f"{name} 等待下载完成...")
with cond:
cond.wait() # 阻塞直到 notify
print(f"{name} 收到通知,开始处理...")
# 启动
d = threading.Thread(target=downloader)
p1 = threading.Thread(target=processor, args=("处理器1",))
p2 = threading.Thread(target=processor, args=("处理器2",))
d.start()
p1.start()
p2.start()
# 用 Event 额外等待
event.wait() # 主线程等待事件
print("所有处理完成")
p1.join()
p2.join()
d.join()
运行输出:
处理器1 等待下载完成...
处理器2 等待下载完成...
下载开始...
下载完成,通知等待者...
处理器1 收到通知,开始处理...
处理器2 收到通知,开始处理...
所有处理完成
详解:
Condition():结合锁,用于 wait() / notify() / notify_all()。Event():简单信号,set() 后 is_set() 为 True,所有 wait() 唤醒。- 潜在问题:Condition 需在 with cond 内使用;Event 一旦 set 不可 reset(用 clear())。
- 优化:Condition 适合复杂同步,Event 适合简单通知。
7. 常见问题 & 最佳实践(2026 年视角)
- GIL 影响:I/O 任务(如 API 调用)用多线程好;CPU 任务用 multiprocessing 或 concurrent.futures.ProcessPoolExecutor。
- 线程安全:全局变量用锁;列表/字典用 queue 或 threading.local()(线程本地存储)。
- 调试:用
threading.enumerate()列所有线程;threading.settrace()追踪。 - 性能监控:用 timeit / cProfile 测速;psutil 查 CPU/内存。
- 高级替代:asyncio(异步 I/O,更高效);concurrent.futures(线程/进程池,简化管理)。
- 陷阱:不要在子线程改主线程 GUI(e.g., Tkinter);异常传播需手动处理。
做完这些实例,你基本掌握 threading 了!推荐实践:写一个多线程下载器(requests + ThreadPoolExecutor)。
你现在最想深入哪个部分?(e.g., 更多队列例子?asyncio 对比?)或者贴你的代码,我帮 debug~