Python threading 模块
Python 的 threading
模块是用于实现多线程编程的标准库之一。多线程允许程序在同一时间内执行多个任务,从而提高程序的效率和响应速度。
threading
模块提供了创建和管理线程的工具,使得开发者可以轻松地编写并发程序。
为什么使用多线程?
在单线程程序中,任务是一个接一个地顺序执行的。如果某个任务需要等待(例如等待网络响应或文件读取),整个程序会被阻塞,直到该任务完成。而多线程可以让程序在等待某个任务的同时,继续执行其他任务,从而提高程序的整体性能。
如何使用 threading 模块?
1. 创建线程
在 Python 中,可以通过继承 threading.Thread
类或直接使用 threading.Thread
构造函数来创建线程。
方法 1:继承 threading.Thread
类
实例
import threading
class MyThread(threading.Thread):
def run(self):
print("线程开始执行")
# 在这里编写线程要执行的代码
print("线程执行结束")
# 创建线程实例
thread = MyThread()
# 启动线程
thread.start()
# 等待线程执行完毕
thread.join()
print("主线程结束")
class MyThread(threading.Thread):
def run(self):
print("线程开始执行")
# 在这里编写线程要执行的代码
print("线程执行结束")
# 创建线程实例
thread = MyThread()
# 启动线程
thread.start()
# 等待线程执行完毕
thread.join()
print("主线程结束")
方法 2:使用 threading.Thread
构造函数
实例
import threading
def my_function():
print("线程开始执行")
# 在这里编写线程要执行的代码
print("线程执行结束")
# 创建线程实例
thread = threading.Thread(target=my_function)
# 启动线程
thread.start()
# 等待线程执行完毕
thread.join()
print("主线程结束")
def my_function():
print("线程开始执行")
# 在这里编写线程要执行的代码
print("线程执行结束")
# 创建线程实例
thread = threading.Thread(target=my_function)
# 启动线程
thread.start()
# 等待线程执行完毕
thread.join()
print("主线程结束")
2. 线程同步
在多线程编程中,多个线程可能会同时访问共享资源,这可能导致数据不一致的问题。为了避免这种情况,可以使用线程同步机制,如锁(Lock
)。
实例
import threading
# 创建一个锁对象
lock = threading.Lock()
def my_function():
with lock:
print("线程开始执行")
# 在这里编写线程要执行的代码
print("线程执行结束")
# 创建线程实例
thread1 = threading.Thread(target=my_function)
thread2 = threading.Thread(target=my_function)
# 启动线程
thread1.start()
thread2.start()
# 等待线程执行完毕
thread1.join()
thread2.join()
print("主线程结束")
# 创建一个锁对象
lock = threading.Lock()
def my_function():
with lock:
print("线程开始执行")
# 在这里编写线程要执行的代码
print("线程执行结束")
# 创建线程实例
thread1 = threading.Thread(target=my_function)
thread2 = threading.Thread(target=my_function)
# 启动线程
thread1.start()
thread2.start()
# 等待线程执行完毕
thread1.join()
thread2.join()
print("主线程结束")
3. 线程间通信
线程间通信可以通过队列(Queue
)来实现。Queue
是线程安全的,可以在多个线程之间安全地传递数据。
实例
import threading
import queue
def worker(q):
while not q.empty():
item = q.get()
print(f"处理项目: {item}")
q.task_done()
# 创建一个队列并填充数据
q = queue.Queue()
for i in range(10):
q.put(i)
# 创建线程实例
thread1 = threading.Thread(target=worker, args=(q,))
thread2 = threading.Thread(target=worker, args=(q,))
# 启动线程
thread1.start()
thread2.start()
# 等待队列中的所有项目被处理完毕
q.join()
print("所有项目处理完毕")
import queue
def worker(q):
while not q.empty():
item = q.get()
print(f"处理项目: {item}")
q.task_done()
# 创建一个队列并填充数据
q = queue.Queue()
for i in range(10):
q.put(i)
# 创建线程实例
thread1 = threading.Thread(target=worker, args=(q,))
thread2 = threading.Thread(target=worker, args=(q,))
# 启动线程
thread1.start()
thread2.start()
# 等待队列中的所有项目被处理完毕
q.join()
print("所有项目处理完毕")
常用类、方法及属性
1. 核心类
类/方法/属性 | 说明 | 示例 |
---|---|---|
threading.Thread | 线程类,用于创建和管理线程 | t = Thread(target=func, args=(1,)) |
threading.Lock | 互斥锁(原始锁) | lock = Lock() |
threading.RLock | 可重入锁(同一线程可多次获取) | rlock = RLock() |
threading.Event | 事件对象,用于线程同步 | event = Event() |
threading.Condition | 条件变量,用于复杂线程协调 | cond = Condition() |
threading.Semaphore | 信号量,控制并发线程数 | sem = Semaphore(3) |
threading.BoundedSemaphore | 有界信号量(防止计数超过初始值) | b_sem = BoundedSemaphore(2) |
threading.Timer | 定时器线程,延迟执行 | timer = Timer(5.0, func) |
threading.local | 线程局部数据(各线程独立存储) | local_data = threading.local() |
2. Thread 对象常用方法/属性
方法/属性 | 说明 | 示例 |
---|---|---|
start() | 启动线程 | t.start() |
run() | 线程执行的方法(可重写) | 自定义类时覆盖此方法 |
join(timeout=None) | 阻塞当前线程,直到目标线程结束 | t.join() |
is_alive() | 检查线程是否在运行 | if t.is_alive(): |
name | 线程名称(可修改) | t.name = "Worker-1" |
daemon | 守护线程标志(主线程退出时自动结束) | t.daemon = True |
ident | 线程标识符(未启动时为 None ) | print(t.ident) |
3. Lock/RLock 常用方法
方法 | 说明 | 示例 |
---|---|---|
acquire(blocking=True, timeout=-1) | 获取锁(阻塞或非阻塞) | lock.acquire() |
release() | 释放锁 | lock.release() |
locked() | 检查锁是否被占用 | if not lock.locked(): |
4. Event 常用方法
方法 | 说明 | 示例 |
---|---|---|
set() | 设置事件为真,唤醒所有等待线程 | event.set() |
clear() | 重置事件为假 | event.clear() |
wait(timeout=None) | 阻塞直到事件为真或超时 | event.wait(2.0) |
is_set() | 检查事件状态 | if event.is_set(): |
5. Condition 常用方法
方法 | 说明 | 示例 |
---|---|---|
wait(timeout=None) | 释放锁并阻塞,直到被通知或超时 | cond.wait() |
notify(n=1) | 唤醒最多 n 个等待线程 | cond.notify(2) |
notify_all() | 唤醒所有等待线程 | cond.notify_all() |
6. 模块级函数/属性
函数/属性 | 说明 | 示例 |
---|---|---|
threading.active_count() | 返回当前活跃线程数 | print(threading.active_count()) |
threading.current_thread() | 返回当前线程对象 | print(threading.current_thread().name) |
threading.enumerate() | 返回所有活跃线程的列表 | for t in threading.enumerate(): |
threading.main_thread() | 返回主线程对象 | if threading.current_thread() is threading.main_thread(): |
threading.get_ident() | 返回当前线程的标识符(Python 3.3+) | print(threading.get_ident()) |
实例
1. 基础线程创建
实例
import threading
def worker(num):
print(f"Worker {num} started")
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
def worker(num):
print(f"Worker {num} started")
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
2. 使用锁保护共享资源
实例
lock = threading.Lock()
count = 0
def increment():
global count
with lock: # 自动获取和释放锁
count += 1
threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(count) # 输出: 10
count = 0
def increment():
global count
with lock: # 自动获取和释放锁
count += 1
threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(count) # 输出: 10
3. 事件同步
实例
event = threading.Event()
def waiter():
print("Waiting for event...")
event.wait()
print("Event triggered!")
t = threading.Thread(target=waiter)
t.start()
# 主线程触发事件
threading.Event().wait(2.0) # 模拟延迟
event.set()
t.join()
def waiter():
print("Waiting for event...")
event.wait()
print("Event triggered!")
t = threading.Thread(target=waiter)
t.start()
# 主线程触发事件
threading.Event().wait(2.0) # 模拟延迟
event.set()
t.join()
4. 生产者-消费者模型(Condition)
实例
import random
from threading import Condition
queue = []
cond = Condition()
MAX_ITEMS = 5
def producer():
for _ in range(10):
with cond:
while len(queue) >= MAX_ITEMS:
cond.wait()
item = random.randint(1, 100)
queue.append(item)
print(f"Produced {item}")
cond.notify()
def consumer():
for _ in range(10):
with cond:
while not queue:
cond.wait()
item = queue.pop(0)
print(f"Consumed {item}")
cond.notify()
threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
from threading import Condition
queue = []
cond = Condition()
MAX_ITEMS = 5
def producer():
for _ in range(10):
with cond:
while len(queue) >= MAX_ITEMS:
cond.wait()
item = random.randint(1, 100)
queue.append(item)
print(f"Produced {item}")
cond.notify()
def consumer():
for _ in range(10):
with cond:
while not queue:
cond.wait()
item = queue.pop(0)
print(f"Consumed {item}")
cond.notify()
threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
注意事项
全局解释器锁(GIL):Python 的 GIL 会限制同一时间只有一个线程执行 Python 字节码。因此,在 CPU 密集型任务中,多线程可能不会带来性能提升。对于 I/O 密集型任务,多线程仍然是有益的。
线程安全:在多线程环境中,确保对共享资源的访问是线程安全的,避免数据竞争和死锁。
线程数量:创建过多的线程可能会导致系统资源耗尽,影响程序性能。合理控制线程数量,或使用线程池(
ThreadPoolExecutor
)来管理线程。
点我分享笔记
笔记需要是本篇文章的内容扩展!文章投稿,可点击这里
注册邀请码获取方式
分享笔记前必须登录!
注册邀请码获取方式
-->