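Hi everyone, I'm Wanfeng (程序员晚枫), a programmer working hands-on with all kinds of AI projects.
asyncio makes it possible for a single Python thread to handle tens of thousands of concurrent connections. Yet many people learning asyncio feel they "get it, but not really", usually because they have not looked at it from the event loop's point of view.
This lecture starts from the underlying mechanics so that asynchronous programming finally clicks.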
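🔄 Why async?
In traditional synchronous code, waiting on IO blocks the entire thread:

```
Synchronous model (single thread):
─── request 1 ─── [wait for IO, 1s] ─── process ─── request 2 ─── [wait for IO, 1s] ─── process
→ total 2s+

Asynchronous model (single thread):
─── request 1 ─── [issue IO request] ───── [handle request 2 while waiting] ───── [IO 1 returns] ─── process 1
      └──── request 2 ─── [issue IO request] ─── [IO 2 returns] ─── process 2
→ total 1s+
```

Key insight: while a thread waits on IO, the CPU is idle. Asynchronous programming puts that idle time to work.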
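The timing difference between the two models can be measured with a small sketch. Here `fake_io` is a hypothetical stand-in for a real network call (it only sleeps), and the 0.2 s delay is an arbitrary choice for illustration.

```python
import asyncio
import time

async def fake_io(label: str, delay: float) -> str:
    """Hypothetical stand-in for a network call; asyncio.sleep simulates the IO wait."""
    await asyncio.sleep(delay)
    return label

async def sequential(delay: float) -> float:
    """Await the two requests one after the other."""
    start = time.perf_counter()
    await fake_io("request 1", delay)
    await fake_io("request 2", delay)
    return time.perf_counter() - start

async def overlapped(delay: float) -> float:
    """Issue both requests, then wait for both: the IO waits overlap."""
    start = time.perf_counter()
    await asyncio.gather(fake_io("request 1", delay), fake_io("request 2", delay))
    return time.perf_counter() - start

DELAY = 0.2
sequential_time = asyncio.run(sequential(DELAY))
overlapped_time = asyncio.run(overlapped(DELAY))
print(f"sequential: {sequential_time:.2f}s, overlapped: {overlapped_time:.2f}s")
```

The sequential version should take roughly twice the delay, while the overlapped version should take roughly one delay, which matches the two timelines in the diagram.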
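🏗️ Core concepts: the event loop and coroutines
Coroutine
A coroutine is a function whose execution can be paused and resumed:

```python
import asyncio

async def say_hello(name: str, delay: float) -> str:
    """Coroutine function: defined with async def."""
    print(f"About to greet {name}...")
    await asyncio.sleep(delay)  # suspend here and hand control back to the loop
    print(f"Hello, {name}!")
    return f"Greeted {name}"

# Calling a coroutine function does not run it; it returns a coroutine object
coro = say_hello("Alice", 1.0)
print(type(coro))  # <class 'coroutine'>

# asyncio.run() starts an event loop and runs the coroutine to completion
result = asyncio.run(coro)
print(result)
```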
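To make "paused and resumed" concrete, the sketch below drives a coroutine by hand with `send()`, with no event loop involved. The helper `pause` and the coroutine `three_steps` are hypothetical names invented for this illustration; ordinary asyncio code awaits library awaitables and never calls `send()` directly.

```python
import types

@types.coroutine
def pause():
    """Hypothetical awaitable: suspends the caller once and hands back a marker."""
    yield "paused"

async def three_steps(log: list) -> str:
    log.append("step 1")
    await pause()          # execution stops here until the next send()
    log.append("step 2")
    await pause()
    log.append("step 3")
    return "done"

log = []
coro = three_steps(log)

first = coro.send(None)    # runs up to the first await
snapshot = list(log)       # the function is now suspended mid-body
second = coro.send(None)   # resumes and runs up to the second await

try:
    coro.send(None)        # resumes again; this time the coroutine finishes
except StopIteration as stop:
    final = stop.value     # the return value travels on StopIteration

print(snapshot, log, final)
```

Each `send(None)` plays the role the event loop normally plays: it resumes the coroutine exactly where it last stopped.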
Event Loop
The event loop is asyncio's core scheduler:

```python
import asyncio

async def task1():
    print("task1 started")
    await asyncio.sleep(2)  # suspends task1; the loop is free to run task2
    print("task1 finished")
    return "result1"

async def task2():
    print("task2 started")
    await asyncio.sleep(1)
    print("task2 finished")
    return "result2"

async def main():
    loop = asyncio.get_running_loop()
    start = loop.time()
    # Run both coroutines concurrently; results come back in argument order
    r1, r2 = await asyncio.gather(task1(), task2())
    elapsed = loop.time() - start
    print(f"Results: {r1}, {r2}, elapsed: {elapsed:.2f}s")  # about 2s, not 3s

asyncio.run(main())
```
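The scheduling idea can be sketched as a toy round-robin loop. This is not how asyncio is implemented (the real loop also tracks timers, IO readiness, and callbacks); `yield_control`, `worker`, and `run_all` are hypothetical names used only to show coroutines taking turns at their suspension points.

```python
import types
from collections import deque

@types.coroutine
def yield_control():
    """Hypothetical awaitable that suspends the current coroutine once."""
    yield

async def worker(name: str, steps: int, log: list) -> str:
    for i in range(steps):
        log.append(f"{name}:{i}")
        await yield_control()   # cooperative hand-off back to the loop
    return name

def run_all(coros) -> list:
    """Toy loop: resume each coroutine in turn until it suspends or finishes."""
    ready = deque(coros)
    finished = []
    while ready:
        coro = ready.popleft()
        try:
            coro.send(None)                 # run until the next suspension point
        except StopIteration as stop:
            finished.append(stop.value)     # coroutine returned
        else:
            ready.append(coro)              # still running: back of the queue
    return finished

log = []
finished = run_all([worker("A", 2, log), worker("B", 3, log)])
print(log)
print(finished)
```

Nothing preempts a worker in this loop: a worker that never reached `await yield_control()` would keep every other worker waiting, which is the same property that makes blocking calls harmful in asyncio.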
🚀 Core asyncio APIs
asyncio.gather: run multiple coroutines concurrently

```python
import asyncio
import time

import aiohttp  # third-party: pip install aiohttp

async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
    """Asynchronous HTTP request."""
    async with session.get(url) as response:
        data = await response.json()
        return {"url": url, "status": response.status, "data": data}

async def fetch_all(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        # return_exceptions=True: a failed request appears in the result list
        # as an exception object instead of aborting the whole batch
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 11)]
start = time.time()
results = asyncio.run(fetch_all(urls))
print(f"Fetched {len(urls)} URLs concurrently in {time.time() - start:.2f}s")
```
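With `return_exceptions=True` the result list mixes return values and exception objects, so the caller has to separate them. A minimal offline sketch follows; `maybe_fail` is a hypothetical coroutine that fails on even inputs purely for demonstration.

```python
import asyncio

async def maybe_fail(n: int) -> int:
    """Hypothetical job: even inputs fail, odd inputs succeed."""
    await asyncio.sleep(0.01)
    if n % 2 == 0:
        raise ValueError(f"bad input {n}")
    return n * 10

async def run_batch():
    results = await asyncio.gather(
        *(maybe_fail(n) for n in range(1, 5)),
        return_exceptions=True,
    )
    # gather preserves argument order, so results[i] belongs to input i + 1
    ok = [r for r in results if not isinstance(r, BaseException)]
    failed = [r for r in results if isinstance(r, BaseException)]
    return ok, failed

ok, failed = asyncio.run(run_batch())
print(ok)
print([str(e) for e in failed])
```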
asyncio.create_task: finer-grained task control

```python
import asyncio

async def background_task(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    # create_task schedules the coroutine right away without blocking here
    task1 = asyncio.create_task(background_task("A", 2.0), name="task-A")
    task2 = asyncio.create_task(background_task("B", 1.0), name="task-B")

    print("Tasks started, doing other work...")
    await asyncio.sleep(0.5)

    # Await each result when it is needed
    result2 = await task2
    print(f"task2 finished first: {result2}")
    result1 = await task1
    print(f"task1 finished: {result1}")

asyncio.run(main())
```
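Part of the finer control a Task offers is cancellation. The sketch below, with a hypothetical `slow_job`, shows one common shape: the job catches `asyncio.CancelledError` to clean up and then re-raises it so the task is still marked as cancelled.

```python
import asyncio

async def slow_job(cleanup_log: list) -> str:
    try:
        await asyncio.sleep(10)
        return "finished"
    except asyncio.CancelledError:
        cleanup_log.append("cleaned up")  # release resources here
        raise                             # re-raise so the task counts as cancelled

async def main():
    cleanup_log = []
    task = asyncio.create_task(slow_job(cleanup_log), name="slow-job")
    await asyncio.sleep(0.01)   # give the task a chance to start
    task.cancel()               # request cancellation at its next await
    try:
        await task
    except asyncio.CancelledError:
        pass                    # expected: the task was cancelled
    return task.get_name(), task.cancelled(), cleanup_log

name, cancelled, cleanup_log = asyncio.run(main())
print(name, cancelled, cleanup_log)
```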
asyncio.wait: wait until some tasks complete

```python
import asyncio

async def task(n: int) -> int:
    await asyncio.sleep(n * 0.5)
    if n == 3:
        raise ValueError(f"Task {n} failed!")
    return n * 2

async def main():
    tasks = {asyncio.create_task(task(i), name=f"task-{i}") for i in range(1, 6)}

    # Return as soon as the first task completes
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"Finished first: {[t.result() for t in done]}")

    # Cancel everything that is still running
    for t in pending:
        t.cancel()

    # Wait for every task, then check each one for failure
    tasks2 = {asyncio.create_task(task(i)) for i in range(1, 5)}
    done, _ = await asyncio.wait(tasks2, return_when=asyncio.ALL_COMPLETED)
    for t in done:
        if t.exception():
            print(f"Task failed: {t.exception()}")
        else:
            print(f"Task succeeded: {t.result()}")

asyncio.run(main())
```
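`asyncio.wait` also accepts a `timeout`. A small sketch of that variant follows; `job` and the delays are hypothetical and chosen only so that one task is clearly too slow.

```python
import asyncio

async def job(delay: float) -> float:
    await asyncio.sleep(delay)
    return delay

async def main():
    tasks = {asyncio.create_task(job(d)) for d in (0.01, 0.02, 5.0)}
    # Unlike asyncio.wait_for, asyncio.wait does not raise on timeout;
    # unfinished tasks simply come back in the pending set
    done, pending = await asyncio.wait(tasks, timeout=0.5)
    for t in pending:
        t.cancel()
    # Let the cancellations settle before leaving the loop
    await asyncio.gather(*pending, return_exceptions=True)
    return sorted(t.result() for t in done), len(pending)

finished, unfinished = asyncio.run(main())
print(finished, unfinished)
```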
🌐 In practice: a high-concurrency crawler

```python
import asyncio
import time
from typing import Optional

import aiohttp  # third-party: pip install aiohttp

class AsyncCrawler:
    """High-concurrency asynchronous crawler."""

    def __init__(self, max_concurrent: int = 10):
        self.max_concurrent = max_concurrent
        # The semaphore caps how many requests are in flight at once
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results: list[dict] = []

    async def fetch_one(
        self, session: aiohttp.ClientSession, url: str
    ) -> Optional[dict]:
        async with self.semaphore:
            try:
                async with session.get(
                    url, timeout=aiohttp.ClientTimeout(total=10)
                ) as response:
                    if response.status == 200:
                        text = await response.text()
                        return {
                            "url": url,
                            "status": response.status,
                            "length": len(text),
                        }
                    return {"url": url, "status": response.status, "length": 0}
            except asyncio.TimeoutError:
                print(f"Timeout: {url}")
                return None
            except Exception as e:
                print(f"Error {url}: {e}")
                return None

    async def crawl(self, urls: list[str]) -> list[dict]:
        async with aiohttp.ClientSession(
            headers={"User-Agent": "AsyncCrawler/1.0"}
        ) as session:
            tasks = [self.fetch_one(session, url) for url in urls]
            # fetch_one handles its own errors, so gather need not collect exceptions
            results = await asyncio.gather(*tasks, return_exceptions=False)
            return [r for r in results if r is not None]

async def main():
    urls = [f"https://httpbin.org/delay/{i % 3}" for i in range(20)]
    # Create the crawler inside the running loop so the semaphore belongs to it
    crawler = AsyncCrawler(max_concurrent=5)

    start = time.time()
    results = await crawler.crawl(urls)
    elapsed = time.time() - start

    print(f"Fetched {len(results)}/{len(urls)} URLs")
    print(f"Elapsed: {elapsed:.2f}s")

asyncio.run(main())
```
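The semaphore's effect can be checked without touching the network. In this sketch `limited_job` is a hypothetical job that records how many jobs hold the semaphore at the same time; the limit of 3 and the 10 jobs are arbitrary values.

```python
import asyncio

async def limited_job(sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:                      # at most `limit` jobs get past this line
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)        # simulated IO while holding a slot
        state["active"] -= 1

async def main(limit: int = 3, jobs: int = 10) -> int:
    sem = asyncio.Semaphore(limit)       # created inside the running loop
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(limited_job(sem, state) for _ in range(jobs)))
    return state["peak"]

peak = asyncio.run(main())
print(f"peak concurrency: {peak}")
```

All ten jobs are started at once, but the recorded peak never exceeds the semaphore's limit.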
🔒 Async synchronization primitives
asyncio.Lock: an async lock

```python
import asyncio

class AsyncCounter:
    def __init__(self):
        self.value = 0
        self._lock = asyncio.Lock()

    async def increment(self):
        async with self._lock:
            # Read, yield, write: without the lock, another task could update
            # self.value during the await and this write would overwrite it
            value = self.value
            await asyncio.sleep(0)
            self.value = value + 1

async def main():
    counter = AsyncCounter()
    tasks = [asyncio.create_task(counter.increment()) for _ in range(1000)]
    await asyncio.gather(*tasks)
    print(counter.value)  # 1000

asyncio.run(main())
```
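To see why the lock matters even on a single thread, the sketch below runs the same read, yield, write sequence with and without a lock. The `Counter` class and its `use_lock` flag are hypothetical, written only for this comparison.

```python
import asyncio

class Counter:
    def __init__(self, use_lock: bool):
        self.value = 0
        self._lock = asyncio.Lock() if use_lock else None

    async def _bump(self) -> None:
        value = self.value
        await asyncio.sleep(0)      # other tasks may run here
        self.value = value + 1      # may overwrite someone else's update

    async def increment(self) -> None:
        if self._lock is None:
            await self._bump()
        else:
            async with self._lock:
                await self._bump()

async def run(use_lock: bool, n: int = 100) -> int:
    counter = Counter(use_lock)     # created inside the running loop
    await asyncio.gather(*(counter.increment() for _ in range(n)))
    return counter.value

unlocked = asyncio.run(run(use_lock=False))
locked = asyncio.run(run(use_lock=True))
print(f"without lock: {unlocked}, with lock: {locked}")
```

Without the lock, updates are lost because many tasks read the same stale value before any of them writes; with the lock, every increment is kept.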
asyncio.Queue: producer and consumer

```python
import asyncio
import random

async def producer(queue: asyncio.Queue, n: int) -> None:
    """Producer: puts items on the queue."""
    for i in range(n):
        item = random.randint(1, 100)
        await queue.put(item)  # waits while the queue is full
        print(f"Produced: {item}")
        await asyncio.sleep(0.1)
    await queue.put(None)  # sentinel: no more items

async def consumer(queue: asyncio.Queue, worker_id: int) -> None:
    """Consumer: processes items."""
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # pass the sentinel on to the other consumers
            break
        print(f"Worker-{worker_id} processing: {item}")
        await asyncio.sleep(0.2)
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue, 1),
        consumer(queue, 2),
        consumer(queue, 3),
    )

asyncio.run(main())
```
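An alternative to sentinel values is to wait on `queue.join()` and then cancel the workers. The following is a sketch of that pattern under simple assumptions: the `worker` function is hypothetical, and "processing" is just doubling a number.

```python
import asyncio

async def worker(queue: asyncio.Queue, results: list) -> None:
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)    # simulated processing time
            results.append(item * 2)
        finally:
            queue.task_done()            # one task_done() per get()

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(3)]

    for i in range(10):
        queue.put_nowait(i)

    await queue.join()                   # returns once every item is marked done

    for w in workers:                    # workers loop forever, so stop them
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)

processed = asyncio.run(main())
print(processed)
```

This shape avoids re-queuing a sentinel for each consumer, at the cost of having to cancel the workers explicitly.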
🔗 Bridging synchronous and asynchronous code

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def sync_heavy_task(n: int) -> int:
    """A synchronous blocking function that cannot be rewritten (e.g. from a third-party library)."""
    time.sleep(1)
    return n * 2

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        # Run the blocking calls in worker threads so the event loop stays free
        tasks = [
            loop.run_in_executor(executor, sync_heavy_task, i)
            for i in range(5)
        ]
        results = await asyncio.gather(*tasks)
    print(results)  # [0, 2, 4, 6, 8]

asyncio.run(main())
```
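On Python 3.9 and later, `asyncio.to_thread` offers a shorter way to do the same bridging with the loop's default thread pool. A minimal sketch, where `blocking_double` is a hypothetical blocking function:

```python
import asyncio
import time

def blocking_double(n: int) -> int:
    """Hypothetical blocking function standing in for a synchronous library call."""
    time.sleep(0.05)
    return n * 2

async def main():
    start = time.perf_counter()
    # Each call runs in a worker thread; the event loop stays responsive
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_double, i) for i in range(5))
    )
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

An explicit executor, as with `run_in_executor`, remains useful when the pool size needs to be controlled.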
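📊 Multithreading vs asyncio

| Dimension | Multithreading (threading) | asyncio |
| --- | --- | --- |
| Concurrency model | Preemptive (scheduled by the OS) | Cooperative (tasks yield voluntarily) |
| Memory overhead | About 8 MB of stack reserved per thread (a common Linux default) | Tiny per coroutine (a few KB) |
| Practical scale | Hundreds of threads | Tens of thousands of coroutines |
| Shared state | Needs locks (complex) | Single thread, fewer race conditions |
| Debugging | Race conditions are hard to track down | More predictable |
| Ecosystem | Works with every library | Needs async-aware libraries |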
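The scale row can be tried directly. This sketch starts 10,000 coroutines that each wait 0.1 s; `tiny_job` is a hypothetical placeholder for real IO, and the exact timing will vary by machine.

```python
import asyncio
import time

async def tiny_job(i: int) -> int:
    await asyncio.sleep(0.1)    # simulated IO wait
    return i

async def main(n: int = 10_000):
    start = time.perf_counter()
    results = await asyncio.gather(*(tiny_job(i) for i in range(n)))
    return len(results), time.perf_counter() - start

count, elapsed = asyncio.run(main())
print(f"{count} coroutines finished in {elapsed:.2f}s")
```

Run one after another, the same waits would add up to about 1,000 s; overlapped on one thread, the total stays far below that.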
⚠️ Common pitfalls
1. Calling a blocking function inside a coroutine

```python
import aiohttp   # third-party, asynchronous
import requests  # third-party, synchronous

# Wrong: requests.get() blocks the whole event loop until it returns
async def bad_fetch(url):
    response = requests.get(url)
    return response.text

# Right: use an asynchronous HTTP client
async def good_fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()
```
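The cost of a blocking call can be observed with a background "heartbeat" task. In this sketch, `ticker`, `count_ticks`, and the two pause functions are hypothetical helpers; `time.sleep` stands in for any blocking call such as `requests.get`.

```python
import asyncio
import time

async def ticker(ticks: list, interval: float = 0.01) -> None:
    """Record a timestamp on every wake-up; it can only run when the loop is free."""
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(interval)

async def blocking_pause() -> None:
    time.sleep(0.2)              # blocks the loop: no other task can run

async def cooperative_pause() -> None:
    await asyncio.sleep(0.2)     # yields: other tasks keep running

async def count_ticks(pause) -> int:
    ticks: list = []
    heartbeat = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0)       # let the ticker record its first tick
    await pause()
    heartbeat.cancel()
    await asyncio.gather(heartbeat, return_exceptions=True)
    return len(ticks)

blocked = asyncio.run(count_ticks(blocking_pause))
cooperative = asyncio.run(count_ticks(cooperative_pause))
print(f"ticks while blocked: {blocked}, ticks while cooperative: {cooperative}")
```

During the blocking pause the heartbeat gets no chance to run after its first tick, while during the cooperative pause it keeps ticking.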
2. Forgetting await

```python
async def main():
    # Wrong: this only creates a coroutine object; nothing runs
    result = some_coroutine()

    # Right
    result = await some_coroutine()
```
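A runnable sketch of the difference, using a hypothetical `compute` coroutine. The explicit `close()` is there only to suppress the "coroutine was never awaited" warning Python would otherwise emit.

```python
import asyncio

async def compute() -> int:
    return 42

async def main():
    forgotten = compute()                 # no await: just a coroutine object
    is_coroutine = asyncio.iscoroutine(forgotten)
    forgotten.close()                     # discard it cleanly, avoiding the warning

    value = await compute()               # with await: the actual result
    return is_coroutine, value

is_coroutine, value = asyncio.run(main())
print(is_coroutine, value)
```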
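3. Calling an async function from a non-async function

```python
import asyncio

async def async_func():
    return 42

# Wrong: SyntaxError, 'await' is only allowed inside an async function
# def normal_func():
#     result = await async_func()

# Right: use asyncio.run() from synchronous code
# (only when no event loop is already running in this thread)
def normal_func():
    result = asyncio.run(async_func())
    return result
```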
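One caveat: `asyncio.run()` refuses to start when an event loop is already running in the current thread. The sketch below triggers that error on purpose; `async_func` and `already_async` are hypothetical names.

```python
import asyncio

async def async_func() -> int:
    return 42

async def already_async() -> str:
    coro = async_func()
    try:
        asyncio.run(coro)        # a loop is already running in this thread
    except RuntimeError as exc:
        coro.close()             # discard the coroutine that never ran
        return type(exc).__name__
    return "no error"

outcome = asyncio.run(already_async())
print(outcome)
```

Inside a coroutine, the fix is simply `await async_func()`; `asyncio.run()` belongs at the synchronous entry point of a program.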
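🎯 Summary
Event loop: asyncio's core scheduler. It interleaves coroutines on a single thread and relies on each coroutine yielding control at an await.
Coroutines vs threads: coroutines are lighter (tens of thousands can run concurrently) and more predictable, because control changes hands only at an await. Race conditions are rarer but still possible across an await, and the whole stack must be async: a single blocking call stalls every task.
Core APIs:
- asyncio.run(): the entry point; runs the top-level coroutine
- asyncio.gather(): runs awaitables concurrently and waits for all of them
- asyncio.create_task(): schedules a coroutine to run right away without blocking the current one
- asyncio.Semaphore: limits the number of concurrent operations
Bridging synchronous code: use loop.run_in_executor() to run blocking functions in a thread pool so they do not block the event loop.
Rule of thumb: IO-bound with high concurrency (more than a few hundred) → asyncio; IO-bound with moderate concurrency → threads work too; a large existing synchronous codebase → threads are easier to migrate to.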
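📚 Recommended books
Python Crash Course, 3rd Edition (《Python 编程从入门到实践(第 3 版)》) | Fluent Python, 2nd Edition (《流畅的 Python(第 2 版)》) | 《CPython 设计与实现》
Learning path: complete beginner → Python Crash Course → Fluent Python → this course → 《CPython 设计与实现》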