第 18 讲:Python 异步编程完全指南 | asyncio、async/await、事件循环与高并发实战

大家好,我是正在实战各种 AI 项目的程序员晚枫。

asyncio 让 Python 单线程处理数万并发连接成为可能。但很多人学 asyncio 总是感觉"懂了又没懂"——这是因为没有从事件循环的角度去理解它的本质。

本讲从底层原理出发,带你真正搞懂异步编程。


🔄 为什么需要异步?

传统同步代码中,IO 等待会阻塞整个线程:

1
2
3
4
5
6
7
8
同步模型(单线程):
─── 请求1 ─── [等待IO 1s] ─── 处理 ─── 请求2 ─── [等待IO 1s] ─── 处理
→ 总耗时 2s+

异步模型(单线程):
─── 请求1 ─── [发出IO请求] ───── [IO等待期间处理请求2] ───── [IO1返回] ─── 处理1
└──── 请求2 ─── [发出IO请求] ─── [IO2返回] ─── 处理2
→ 总耗时 1s+

关键洞察:IO 等待期间,CPU 是空闲的。异步编程就是充分利用这段空闲时间。


🏗️ 核心概念:事件循环与协程

协程(Coroutine)

协程是可以暂停和恢复执行的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio

async def say_hello(name: str, delay: float) -> str:
"""协程函数:用 async def 定义"""
print(f"准备向 {name} 打招呼...")
await asyncio.sleep(delay) # 让出控制权,暂停这里
print(f"Hello, {name}!")
return f"Greeted {name}"

# 协程对象不会自动执行
coro = say_hello("Alice", 1.0)
print(type(coro)) # <class 'coroutine'>

# 必须通过事件循环运行
result = asyncio.run(coro)
print(result) # Greeted Alice

事件循环(Event Loop)

事件循环是 asyncio 的核心调度器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio

async def task1():
print("task1 开始")
await asyncio.sleep(2) # 暂停,交出控制权
print("task1 完成")
return "result1"

async def task2():
print("task2 开始")
await asyncio.sleep(1) # 暂停,交出控制权
print("task2 完成")
return "result2"

async def main():
# 并发运行:两个任务交替执行
start = asyncio.get_event_loop().time()

# 顺序执行(慢):约 3 秒
# r1 = await task1()
# r2 = await task2()

# 并发执行(快):约 2 秒
r1, r2 = await asyncio.gather(task1(), task2())

elapsed = asyncio.get_event_loop().time() - start
print(f"结果:{r1}, {r2},耗时:{elapsed:.2f}s")

asyncio.run(main())
# task1 开始
# task2 开始
# task2 完成 ← 1 秒后
# task1 完成 ← 2 秒后
# 结果:result1, result2,耗时:2.00s

🚀 asyncio 核心 API

asyncio.gather:并发执行多个协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
import aiohttp
import time

async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
"""异步 HTTP 请求"""
async with session.get(url) as response:
data = await response.json()
return {"url": url, "status": response.status, "data": data}

async def fetch_all(urls: list[str]) -> list[dict]:
async with aiohttp.ClientSession() as session:
# gather 并发执行,保持顺序
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results

# 并发请求 100 个 URL,约等于请求单个 URL 的时间
urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 11)]

start = time.time()
results = asyncio.run(fetch_all(urls))
print(f"并发请求 {len(urls)} 个 URL 耗时:{time.time()-start:.2f}s")

asyncio.create_task:更细粒度的任务控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio

async def background_task(name: str, delay: float) -> str:
await asyncio.sleep(delay)
return f"{name} done"

async def main():
# create_task 立即开始执行(不等 await)
task1 = asyncio.create_task(background_task("A", 2.0), name="task-A")
task2 = asyncio.create_task(background_task("B", 1.0), name="task-B")

print("任务已启动,做些其他事情...")
await asyncio.sleep(0.5)

# 等待特定任务
result2 = await task2
print(f"task2 先完成:{result2}")

result1 = await task1
print(f"task1 完成:{result1}")

asyncio.run(main())

asyncio.wait:等待部分完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio

async def task(n: int) -> int:
await asyncio.sleep(n * 0.5)
if n == 3:
raise ValueError(f"Task {n} failed!")
return n * 2

async def main():
tasks = {asyncio.create_task(task(i), name=f"task-{i}") for i in range(1, 6)}

# 等待任意一个完成
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(f"最先完成的:{[t.result() for t in done]}")

# 取消剩余任务
for t in pending:
t.cancel()

# 等待所有任务(包括异常)
tasks2 = {asyncio.create_task(task(i)) for i in range(1, 5)}
done, _ = await asyncio.wait(tasks2, return_when=asyncio.ALL_COMPLETED)
for t in done:
if t.exception():
print(f"任务失败:{t.exception()}")
else:
print(f"任务成功:{t.result()}")

asyncio.run(main())

🌐 实战:高并发爬虫

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import asyncio
import aiohttp
from typing import Optional
import re

class AsyncCrawler:
"""高并发异步爬虫"""

def __init__(self, max_concurrent: int = 10):
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent) # 限制并发数
self.results: list[dict] = []

async def fetch_one(
self,
session: aiohttp.ClientSession,
url: str
) -> Optional[dict]:
async with self.semaphore: # 控制并发量,避免服务器过载
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status == 200:
text = await response.text()
return {
"url": url,
"status": response.status,
"length": len(text),
}
return {"url": url, "status": response.status, "length": 0}
except asyncio.TimeoutError:
print(f"超时:{url}")
return None
except Exception as e:
print(f"错误 {url}{e}")
return None

async def crawl(self, urls: list[str]) -> list[dict]:
async with aiohttp.ClientSession(
headers={"User-Agent": "AsyncCrawler/1.0"}
) as session:
tasks = [self.fetch_one(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=False)
return [r for r in results if r is not None]


async def main():
urls = [f"https://httpbin.org/delay/{i % 3}" for i in range(20)]

crawler = AsyncCrawler(max_concurrent=5)

import time
start = time.time()
results = await crawler.crawl(urls)
elapsed = time.time() - start

print(f"抓取 {len(results)}/{len(urls)} 个 URL")
print(f"耗时:{elapsed:.2f}s")

asyncio.run(main())

🔒 异步同步原语

asyncio.Lock:异步锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio

class AsyncCounter:
def __init__(self):
self.value = 0
self._lock = asyncio.Lock()

async def increment(self):
async with self._lock:
# 临界区:同时只有一个协程能执行
value = self.value
await asyncio.sleep(0) # 模拟一点延迟
self.value = value + 1

async def main():
counter = AsyncCounter()
tasks = [asyncio.create_task(counter.increment()) for _ in range(1000)]
await asyncio.gather(*tasks)
print(counter.value) # 1000(有锁保证正确)

asyncio.run(main())

asyncio.Queue:生产者消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio
import random

async def producer(queue: asyncio.Queue, n: int) -> None:
"""生产者:放入数据"""
for i in range(n):
item = random.randint(1, 100)
await queue.put(item)
print(f"生产:{item}")
await asyncio.sleep(0.1)

# 发送结束信号
await queue.put(None)

async def consumer(queue: asyncio.Queue, worker_id: int) -> None:
"""消费者:处理数据"""
while True:
item = await queue.get()
if item is None:
await queue.put(None) # 传递结束信号给其他消费者
break

print(f"Worker-{worker_id} 处理:{item}")
await asyncio.sleep(0.2) # 模拟处理
queue.task_done()

async def main():
queue = asyncio.Queue(maxsize=5) # 限制队列大小

# 1 个生产者,3 个消费者
await asyncio.gather(
producer(queue, 10),
consumer(queue, 1),
consumer(queue, 2),
consumer(queue, 3),
)

asyncio.run(main())

🔗 同步代码与异步代码的桥接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

def sync_heavy_task(n: int) -> int:
"""无法改造的同步阻塞函数(比如第三方库)"""
time.sleep(1) # 阻塞 IO
return n * 2

async def main():
loop = asyncio.get_event_loop()

# 在线程池中运行同步阻塞函数,不阻塞事件循环
with ThreadPoolExecutor() as executor:
tasks = [
loop.run_in_executor(executor, sync_heavy_task, i)
for i in range(5)
]
results = await asyncio.gather(*tasks)
print(results) # [0, 2, 4, 6, 8],约 1 秒完成

asyncio.run(main())

📊 多线程 vs asyncio 对比

对比维度多线程(threading)asyncio
并发模型抢占式(OS 调度)协作式(主动让出)
内存开销每个线程约 8MB 栈协程极小(几 KB)
适用规模数百个线程数万个协程
共享状态需要 Lock(复杂)单线程,较少竞态
调试难度竞态条件难排查更可预测
生态支持所有库都支持需要 async 版本库

⚠️ 常见陷阱

1. 在协程中调用阻塞函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio
import time
import requests # 同步 HTTP 库

async def bad_fetch(url):
# ❌ 这会阻塞整个事件循环!
response = requests.get(url) # 阻塞 1 秒,其他协程都无法运行
return response.text

async def good_fetch(url):
# ✅ 用 aiohttp(异步版本)
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()

2. 忘记 await

1
2
3
4
5
6
async def main():
# ❌ 创建了协程对象,但没有 await,什么都不会发生
result = some_coroutine()

# ✅ 必须 await
result = await some_coroutine()

3. 在非 async 函数中调用 async 函数

1
2
3
4
5
6
7
8
9
10
11
12
async def async_func():
return 42

# ❌ 不能在普通函数中直接 await
def normal_func():
result = await async_func() # SyntaxError!

# ✅ 方法1:用 asyncio.run
def normal_func():
result = asyncio.run(async_func())

# ✅ 方法2:用 run_coroutine_threadsafe(在已有事件循环的情况下)

🎯 本讲总结

事件循环:asyncio 的核心调度器,单线程中交替执行协程,靠协程主动 await 让出控制权。

协程 vs 线程:协程更轻量(万级并发)、更可预测(无竞态条件);但必须全栈 async,不能调用阻塞函数。

核心 API

  • asyncio.run() —— 入口,运行顶层协程
  • asyncio.gather() —— 并发执行,等待全部完成
  • asyncio.create_task() —— 立即开始但不阻塞当前协程
  • asyncio.Semaphore —— 限制并发数量

桥接同步代码:用 loop.run_in_executor() 在线程池中运行同步阻塞函数,避免阻塞事件循环。

选择原则:IO 密集且并发数高(>数百)→ asyncio;IO 密集并发适中 → 多线程也可以;有大量已有同步代码 → 多线程更容易迁移。


📚 推荐教材

《Python 编程从入门到实践(第 3 版)》 | 《流畅的 Python(第 2 版)》 | 《CPython 设计与实现》

学习路线: 零基础 → 《从入门到实践》 → 《流畅的 Python》 → 本门课程 → 《CPython 设计与实现》


🎓 加入《流畅的 Python》直播共读营

学到这里,如果你想系统吃透这本书——欢迎加入我的直播共读课。

  • 每周直播精讲,逐章拆解核心知识点
  • 专属学习群,随时答疑交流
  • 试运营特惠:499 元299 元

👉 【立即报名《流畅的 Python》共读课】https://mp.weixin.qq.com/s/ivHJwn1nNx5ug4TFrapvGg

🔗 课程导航

上一讲:并发编程模型 | 下一讲:性能优化


💬 联系我

平台账号/链接
微信扫码加好友
B 站Python 自动化办公社区

主营业务:AI 编程培训、企业内训、技术咨询

🎓 AI 编程实战课程

想系统学习 AI 编程?程序员晚枫的 AI 编程实战课 帮你从零上手!

fluent-python.png