【Python】超详细实例讲解python多线程(threading模块)

Python 多线程详解:threading 模块从入门到高级实例(2026 年视角)

Python 的 threading 模块 是标准库中实现多线程编程的核心工具。它基于线程模型,让你能并发执行多个任务,尤其适合 I/O 密集型场景(如网络请求、文件读写)。但 Python 有 GIL(Global Interpreter Lock,全局解释器锁),所以在 CPU 密集型任务中,多线程并不能真正并行(更适合多进程 multiprocessing)。2026 年,随着 Python 3.13+ 的 GIL 优化(可选无 GIL 模式),多线程性能有显著提升,但默认仍需注意。

本教程从基础到高级,逐步讲解 + 超详细代码实例(每个实例附运行说明 + 潜在问题 + 优化建议)。假设你用 Python 3.8+,代码可在 Jupyter 或 VS Code 中运行。每个实例都独立可复制粘贴测试。

1. 多线程基础概念

  • 线程 vs 进程:线程是进程内的执行单元,共享内存(更快启动,但需小心数据竞争)。进程独立内存,更安全但开销大。
  • threading 模块关键类/函数: 类/函数 描述 常见用法 Thread(target=func, args=()) 创建线程对象 启动子线程执行 func thread.start() 启动线程 — thread.join() 等待线程结束 主线程阻塞等子线程 threading.Lock() 互斥锁(防止数据竞争) with lock: … threading.RLock() 可重入锁(同一线程可多次获取) 递归函数中用 threading.Semaphore(n) 信号量(限流) 控制同时访问资源的线程数 threading.Condition() 条件变量(线程间通信) 生产者-消费者模式 threading.Event() 事件(信号灯) 一个线程通知多个线程 queue.Queue() 线程安全队列(数据共享) 多线程间传递数据 threading.current_thread() 获取当前线程 打印线程名/ID
  • 注意:多线程不适合 CPU 密集(如计算密集循环),用 multiprocessing 或 concurrent.futures.ThreadPoolExecutor(更高层抽象)。

2. 入门实例:简单多线程

场景:主线程 + 子线程并发打印消息,演示线程创建/启动/等待。

import threading
import time

def worker(name):
    print(f"线程 {name} 开始执行...")
    time.sleep(2)  # 模拟耗时任务
    print(f"线程 {name} 执行结束!")

# 创建线程
t1 = threading.Thread(target=worker, args=("子线程1",))
t2 = threading.Thread(target=worker, args=("子线程2",))

# 启动线程
t1.start()
t2.start()

print("主线程继续执行...")

# 等待子线程结束(可选,如果不 join,主线程可能先结束)
t1.join()
t2.join()

print("所有线程结束,主线程退出。")

运行输出(大致顺序,非严格):

线程 子线程1 开始执行...
线程 子线程2 开始执行...
主线程继续执行...
线程 子线程1 执行结束!
线程 子线程2 执行结束!
所有线程结束,主线程退出。

详解

  • target=worker:指定线程执行的函数。
  • args=("子线程1",):传参,必须是元组。
  • start():启动线程,立即返回(非阻塞)。
  • join():阻塞主线程,直到该线程结束。
  • 潜在问题:无共享数据,无需锁。但如果函数有异常,会静默失败(用 try-except 捕获)。
  • 优化:用 daemon=True 设置守护线程(主线程结束时自动杀子线程)。

3. 中级实例:线程间数据共享 + 锁(避免竞争)

场景:多个线程修改共享变量(计数器),无锁 vs 有锁对比。演示数据竞争问题。

import threading

counter = 0  # 共享变量

def increment(n):
    global counter
    for _ in range(n):
        counter += 1  # 非原子操作,可能竞争

# 无锁版本(会出错)
threads = []
for _ in range(10):
    t = threading.Thread(target=increment, args=(100_000,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print("无锁结果(预期 1,000,000):", counter)  # 实际 ~900,000+,丢失计数

# 有锁版本
counter = 0
lock = threading.Lock()

def increment_locked(n):
    global counter
    for _ in range(n):
        with lock:  # 获取锁,独占执行
            counter += 1

threads = []
for _ in range(10):
    t = threading.Thread(target=increment_locked, args=(100_000,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print("有锁结果(预期 1,000,000):", counter)  # 正确 1,000,000

详解

  • 数据竞争:GIL 虽保护解释器,但 counter += 1 是多步操作(读-改-写),线程切换时丢失。
  • Lock():互斥锁,with lock: 自动获取/释放。
  • 潜在问题:死锁(两个线程互等锁);性能瓶颈(锁粒度太大)。
  • 优化:用 RLock() 如果函数递归调用锁;最小化锁内代码。

4. 高级实例:生产者-消费者模式(Condition + Queue)

场景:生产者线程生成数据,消费者线程处理。演示线程通信 + 队列安全共享。

import threading
import queue
import time
import random

q = queue.Queue(maxsize=5)  # 线程安全队列,限 5 项

def producer():
    for i in range(10):
        item = random.randint(1, 100)
        q.put(item)  # 阻塞如果队列满
        print(f"生产者生产: {item} (队列大小: {q.qsize()})")
        time.sleep(random.uniform(0.5, 1.5))
    q.put(None)  # 结束信号

def consumer(name):
    while True:
        item = q.get()  # 阻塞如果队列空
        if item is None:
            print(f"{name} 收到结束信号")
            break
        print(f"{name} 消费: {item} (队列大小: {q.qsize()})")
        time.sleep(random.uniform(1, 2))

# 启动
p = threading.Thread(target=producer)
c1 = threading.Thread(target=consumer, args=("消费者1",))
c2 = threading.Thread(target=consumer, args=("消费者2",))

p.start()
c1.start()
c2.start()

p.join()
c1.join()
c2.join()

print("生产消费结束")

运行输出(示例):

生产者生产: 42 (队列大小: 1)
消费者1 消费: 42 (队列大小: 0)
生产者生产: 73 (队列大小: 1)
消费者2 消费: 73 (队列大小: 0)
... (继续直到 10 项)
消费者1 收到结束信号
消费者2 收到结束信号
生产消费结束

详解

  • Queue():线程安全 FIFO,先入先出。put()/get() 自动处理锁。
  • None 作为哨兵:通知消费者结束(多消费者时需多个哨兵或用 Event)。
  • 潜在问题:队列满/空时阻塞;如果生产慢消费快,可能饥饿。
  • 优化:用 queue.PriorityQueue() 优先级队列;或 LifoQueue 栈式。

5. 高级实例:限流 + 信号量(Semaphore)

场景:模拟数据库连接池,限制同时访问的线程数(e.g., 最多 3 个)。

import threading
import time

sem = threading.Semaphore(3)  # 最多 3 个线程同时访问

def access_db(name):
    print(f"{name} 尝试访问 DB...")
    with sem:  # 获取信号量(计数 -1,如果 0 则阻塞)
        print(f"{name} 访问 DB 中...")
        time.sleep(2)  # 模拟查询
    print(f"{name} 访问结束")

threads = []
for i in range(10):
    t = threading.Thread(target=access_db, args=(f"线程{i}",))
    threads.append(t)
    t.start()
    time.sleep(0.2)  # 错开启动

for t in threads:
    t.join()

运行输出(示例):

线程0 尝试访问 DB...
线程0 访问 DB 中...
线程1 尝试访问 DB...
线程1 访问 DB 中...
线程2 尝试访问 DB...
线程2 访问 DB 中...
线程3 尝试访问 DB...  # 阻塞,直到前一个释放
...

详解

  • Semaphore(n):计数器,初始 n。acquire() 减 1(0 时阻塞),release() 加 1。
  • with sem: 自动 acquire/release。
  • 潜在问题:忘记 release() 导致死锁;n 太小瓶颈大。
  • 优化:结合 Lock 保护共享资源;2026 年用 asyncio.Semaphore 异步版。

6. 高级实例:线程间通信(Condition + Event)

场景:一个线程等待另一个线程的信号(e.g., 下载完成通知处理)。

import threading
import time

cond = threading.Condition()  # 条件变量
event = threading.Event()     # 事件信号

def downloader():
    print("下载开始...")
    time.sleep(3)
    with cond:
        print("下载完成,通知等待者...")
        cond.notify_all()  # 通知所有等待线程
    event.set()  # 设置事件为 True

def processor(name):
    print(f"{name} 等待下载完成...")
    with cond:
        cond.wait()  # 阻塞直到 notify
    print(f"{name} 收到通知,开始处理...")

# 启动
d = threading.Thread(target=downloader)
p1 = threading.Thread(target=processor, args=("处理器1",))
p2 = threading.Thread(target=processor, args=("处理器2",))

d.start()
p1.start()
p2.start()

# 用 Event 额外等待
event.wait()  # 主线程等待事件
print("所有处理完成")

p1.join()
p2.join()
d.join()

运行输出

处理器1 等待下载完成...
处理器2 等待下载完成...
下载开始...
下载完成,通知等待者...
处理器1 收到通知,开始处理...
处理器2 收到通知,开始处理...
所有处理完成

详解

  • Condition():结合锁,用于 wait() / notify() / notify_all()。
  • Event():简单信号,set() 后 is_set() 为 True,所有 wait() 唤醒。
  • 潜在问题:Condition 需在 with cond 内使用;Event 一旦 set 不可 reset(用 clear())。
  • 优化:Condition 适合复杂同步,Event 适合简单通知。

7. 常见问题 & 最佳实践(2026 年视角)

  • GIL 影响:I/O 任务(如 API 调用)用多线程好;CPU 任务用 multiprocessing 或 concurrent.futures.ProcessPoolExecutor。
  • 线程安全:全局变量用锁;列表/字典用 queue 或 threading.local()(线程本地存储)。
  • 调试:用 threading.enumerate() 列所有线程;threading.settrace() 追踪。
  • 性能监控:用 timeit / cProfile 测速;psutil 查 CPU/内存。
  • 高级替代:asyncio(异步 I/O,更高效);concurrent.futures(线程/进程池,简化管理)。
  • 陷阱:不要在子线程改主线程 GUI(e.g., Tkinter);异常传播需手动处理。

做完这些实例,你基本掌握 threading 了!推荐实践:写一个多线程下载器(requests + ThreadPoolExecutor)。

你现在最想深入哪个部分?(e.g., 更多队列例子?asyncio 对比?)或者贴你的代码,我帮 debug~

文章已创建 3958

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

相关文章

开始在上面输入您的搜索词,然后按回车进行搜索。按ESC取消。

返回顶部