# Time 100 I/O-bound tasks that each run in their own thread.
start = time.perf_counter()
threads = [threading.Thread(target=io_task) for _ in range(100)]
for t in threads:
    t.start()
# Wait for every thread to finish before reading the clock again.
for t in threads:
    t.join()
end = time.perf_counter()
print(f"100个IO任务: {end - start:.2f}s")  # much faster than running them serially
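时机2:字节码计数(Python 3.2 之前的旧版 GIL;3.2+ 已改为按时间间隔切换)
1 2 3 4 5 6 7 8 9
// 旧版 GIL:每执行 N 条字节码指令(tick)后,检查并释放 GIL
// 默认 N = 100,可通过 sys.setcheckinterval() 调整(该函数自 3.2 起弃用,3.9 中移除)
// Python 3.2+:等待中的线程在切换间隔(默认 5 ms,可通过 sys.setswitchinterval() 配置)到期后请求持有者释放 GIL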
# The GIL is released automatically while a thread waits on I/O, so these
# threads overlap their waits effectively.
threads = [
    threading.Thread(target=fetch_url, args=(target_url,))
    for target_url in urls
]
1 2 3 4 5 6 7 8 9 10 11
# Option 2: asyncio (recommended!)
import asyncio
async def fetch(session, url):
    """Request ``url`` through ``session`` and return the response body as text.

    Args:
        session: Object whose ``get(url)`` returns an async context manager
            yielding a response (``main`` passes an ``aiohttp.ClientSession``).
        url: Address to request.

    Returns:
        The awaited result of ``resp.text()``.
    """
    # The response is released when the ``async with`` block exits.
    async with session.get(url) as resp:
        return await resp.text()
async def main(urls):
    """Fetch every URL in ``urls`` concurrently and return the results.

    One ``aiohttp.ClientSession`` is shared by all requests for the duration
    of the ``async with`` block.

    Args:
        urls: Iterable of URLs; each one is passed to ``fetch``.

    Returns:
        The list produced by ``asyncio.gather``: one ``fetch`` result per URL,
        in the same order as ``urls``.
    """
    # NOTE(review): only ``asyncio`` is imported in this snippet; the
    # third-party ``aiohttp`` package must be imported by the surrounding code.
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)
混合场景 → ProcessPoolExecutor + ThreadPoolExecutor
1 2 3 4 5 6 7
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
# Use processes for CPU-bound work and threads for I/O-bound work.
# Leaving the ``with`` block shuts both pools down after the submitted work
# has finished.
with ProcessPoolExecutor() as cpu_pool, ThreadPoolExecutor() as io_pool:
    # ``submit`` returns a Future immediately; call ``.result()`` on it to
    # obtain the computed value.
    cpu_result = cpu_pool.submit(heavy_compute, data)
    io_result = io_pool.submit(fetch_data, url)