Python threading 模块

Python 的 threading 模块是用于实现多线程编程的标准库之一。多线程允许程序在同一时间内执行多个任务,从而提高程序的效率和响应速度。

threading 模块提供了创建和管理线程的工具,使得开发者可以轻松地编写并发程序。

为什么使用多线程?

在单线程程序中,任务是一个接一个地顺序执行的。如果某个任务需要等待(例如等待网络响应或文件读取),整个程序会被阻塞,直到该任务完成。而多线程可以让程序在等待某个任务的同时,继续执行其他任务,从而提高程序的整体性能。


如何使用 threading 模块?

1. 创建线程

在 Python 中,可以通过继承 threading.Thread 类或直接使用 threading.Thread 构造函数来创建线程。

方法 1:继承 threading.Thread

实例

import threading

class MyThread(threading.Thread):
    def run(self):
        print("线程开始执行")
        # 在这里编写线程要执行的代码
        print("线程执行结束")

# 创建线程实例
thread = MyThread()
# 启动线程
thread.start()
# 等待线程执行完毕
thread.join()
print("主线程结束")

方法 2:使用 threading.Thread 构造函数

实例

import threading

def my_function():
    print("线程开始执行")
    # 在这里编写线程要执行的代码
    print("线程执行结束")

# 创建线程实例
thread = threading.Thread(target=my_function)
# 启动线程
thread.start()
# 等待线程执行完毕
thread.join()
print("主线程结束")

2. 线程同步

在多线程编程中,多个线程可能会同时访问共享资源,这可能导致数据不一致的问题。为了避免这种情况,可以使用线程同步机制,如锁(Lock)。

实例

import threading

# 创建一个锁对象
lock = threading.Lock()

def my_function():
    with lock:
        print("线程开始执行")
        # 在这里编写线程要执行的代码
        print("线程执行结束")

# 创建线程实例
thread1 = threading.Thread(target=my_function)
thread2 = threading.Thread(target=my_function)
# 启动线程
thread1.start()
thread2.start()
# 等待线程执行完毕
thread1.join()
thread2.join()
print("主线程结束")

3. 线程间通信

线程间通信可以通过队列(Queue)来实现。Queue 是线程安全的,可以在多个线程之间安全地传递数据。

实例

import threading
import queue

def worker(q):
    while not q.empty():
        item = q.get()
        print(f"处理项目: {item}")
        q.task_done()

# 创建一个队列并填充数据
q = queue.Queue()
for i in range(10):
    q.put(i)

# 创建线程实例
thread1 = threading.Thread(target=worker, args=(q,))
thread2 = threading.Thread(target=worker, args=(q,))
# 启动线程
thread1.start()
thread2.start()
# 等待队列中的所有项目被处理完毕
q.join()
print("所有项目处理完毕")

常用类、方法及属性

1. 核心类

类/方法/属性说明示例
threading.Thread线程类,用于创建和管理线程t = Thread(target=func, args=(1,))
threading.Lock互斥锁(原始锁)lock = Lock()
threading.RLock可重入锁(同一线程可多次获取)rlock = RLock()
threading.Event事件对象,用于线程同步event = Event()
threading.Condition条件变量,用于复杂线程协调cond = Condition()
threading.Semaphore信号量,控制并发线程数sem = Semaphore(3)
threading.BoundedSemaphore有界信号量(防止计数超过初始值)b_sem = BoundedSemaphore(2)
threading.Timer定时器线程,延迟执行timer = Timer(5.0, func)
threading.local线程局部数据(各线程独立存储)local_data = threading.local()

2. Thread 对象常用方法/属性

方法/属性说明示例
start()启动线程t.start()
run()线程执行的方法(可重写)自定义类时覆盖此方法
join(timeout=None)阻塞当前线程,直到目标线程结束t.join()
is_alive()检查线程是否在运行if t.is_alive():
name线程名称(可修改)t.name = "Worker-1"
daemon守护线程标志(主线程退出时自动结束)t.daemon = True
ident线程标识符(未启动时为 Noneprint(t.ident)

3. Lock/RLock 常用方法

方法说明示例
acquire(blocking=True, timeout=-1)获取锁(阻塞或非阻塞)lock.acquire()
release()释放锁lock.release()
locked()检查锁是否被占用if not lock.locked():

4. Event 常用方法

方法说明示例
set()设置事件为真,唤醒所有等待线程event.set()
clear()重置事件为假event.clear()
wait(timeout=None)阻塞直到事件为真或超时event.wait(2.0)
is_set()检查事件状态if event.is_set():

5. Condition 常用方法

方法说明示例
wait(timeout=None)释放锁并阻塞,直到被通知或超时cond.wait()
notify(n=1)唤醒最多 n 个等待线程cond.notify(2)
notify_all()唤醒所有等待线程cond.notify_all()

6. 模块级函数/属性

函数/属性说明示例
threading.active_count()返回当前活跃线程数print(threading.active_count())
threading.current_thread()返回当前线程对象print(threading.current_thread().name)
threading.enumerate()返回所有活跃线程的列表for t in threading.enumerate():
threading.main_thread()返回主线程对象if threading.current_thread() is threading.main_thread():
threading.get_ident()返回当前线程的标识符(Python 3.3+)print(threading.get_ident())

实例

1. 基础线程创建

实例

import threading

def worker(num):
    print(f"Worker {num} started")

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

2. 使用锁保护共享资源

实例

lock = threading.Lock()
count = 0

def increment():
    global count
    with lock:  # 自动获取和释放锁
        count += 1

threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(count)  # 输出: 10

3. 事件同步

实例

event = threading.Event()

def waiter():
    print("Waiting for event...")
    event.wait()
    print("Event triggered!")

t = threading.Thread(target=waiter)
t.start()

# 主线程触发事件
threading.Event().wait(2.0)  # 模拟延迟
event.set()
t.join()

4. 生产者-消费者模型(Condition)

实例

import random
from threading import Condition

queue = []
cond = Condition()
MAX_ITEMS = 5

def producer():
    for _ in range(10):
        with cond:
            while len(queue) >= MAX_ITEMS:
                cond.wait()
            item = random.randint(1, 100)
            queue.append(item)
            print(f"Produced {item}")
            cond.notify()

def consumer():
    for _ in range(10):
        with cond:
            while not queue:
                cond.wait()
            item = queue.pop(0)
            print(f"Consumed {item}")
            cond.notify()

threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()

注意事项

  1. 全局解释器锁(GIL):Python 的 GIL 会限制同一时间只有一个线程执行 Python 字节码。因此,在 CPU 密集型任务中,多线程可能不会带来性能提升。对于 I/O 密集型任务,多线程仍然是有益的。

  2. 线程安全:在多线程环境中,确保对共享资源的访问是线程安全的,避免数据竞争和死锁。

  3. 线程数量:创建过多的线程可能会导致系统资源耗尽,影响程序性能。合理控制线程数量,或使用线程池(ThreadPoolExecutor)来管理线程。