第 9 章:Python 异步编程
异步编程是现代 Python 开发的核心技能之一。对于前端开发者来说,JavaScript 中的 async/await 早已是日常工具。本章将从 JS 异步模型出发,深入讲解 Python asyncio 的工作机制、核心 API 和实战技巧。
9.1 异步编程为什么重要
同步编程最大的瓶颈是 I/O 等待——网络请求、文件读写、数据库查询等操作90%的时间都在等待外部响应。异步编程让程序在等待期间切换到其他任务,极大提升并发能力。
同步(顺序执行):
┌─────┐ ┌─────┐ ┌─────┐
│ A │ → │ B │ → │ C │ 总耗时 = A + B + C
└─────┘ └─────┘ └─────┘
异步(并发执行):
┌─────┐
│ A │──┐
└─────┘ │ ┌─────┐
├──│ B │ 总耗时 ≈ max(A, B, C)
┌─────┐ │ └─────┘
│ C │──┘
└─────┘
JavaScript 天生就是"异步优先"——浏览器只有一个主线程,所有 I/O 操作都通过事件循环 + 回调/Promise 实现。Python 的异步是可选的:同步代码是默认,异步通过 asyncio 库显式启用。二者都用 async/await 语法,但底层实现截然不同。
9.2 asyncio 基础概念
事件循环(Event Loop)
事件循环是异步编程的心脏——它像一个调度器,持续检查哪些任务可以执行,哪些在等待 I/O,并在任务间切换。
JavaScript 的 Event Loop 是浏览器内置的,自动运行;Python 的 Event Loop 需要手动创建或通过 asyncio.run() 启动。如果你的 JS 基础扎实,可以把 asyncio.run() 理解为「启动 Node.js 的事件循环——只是你得自己叫它开始」。
协程(Coroutine)
协程是使用 async def 定义的函数。调用协程不会立即执行,而是返回一个 coroutine 对象——需要在事件循环中「等待」它。
import asyncio
# 定义一个协程(就像 JS 的 async function)
async def greet(name):
await asyncio.sleep(1) # 模拟异步 I/O
return f"Hello, {name}"
# 错误:直接调用只会返回 coroutine 对象,不会执行
# coro = greet("World") # RuntimeWarning: coroutine was never awaited
# 正确:通过 asyncio.run() 启动
result = asyncio.run(greet("World"))
print(result) # Hello, World
await 关键字
await 是 Python 异步的核心操作符。它的行为与 JS 中的 await 几乎一致:暂停当前协程的执行,等待一个可等待对象(coroutine、Task、Future)完成后再继续。
| 概念 | JavaScript | Python |
|---|---|---|
| 异步函数定义 | async function fn() {} | async def fn(): |
| 等待异步结果 | await promise | await coroutine |
| 并发执行 | Promise.all([p1, p2]) | asyncio.gather(t1, t2) |
| 创建后台任务 | fn().then() (microtask) | asyncio.create_task(coro) |
| 延迟执行 | setTimeout(fn, ms) | asyncio.sleep(s) |
| 启动入口 | 自动(浏览器/Node) | asyncio.run(coro) |
9.3 asyncio 核心 API
asyncio.run() — 启动事件循环
asyncio.run() 是 Python 3.7+ 的标准入口。它创建事件循环、运行协程、清理资源一气呵成。
import asyncio
async def main():
print("开始...")
await asyncio.sleep(1)
print("结束!")
return 42
# asyncio.run() 是同步的——它会阻塞直到协程完成
result = asyncio.run(main())
print(f"返回: {result}")
# 注意:同一线程不能同时运行两个事件循环
# asyncio.run(main()) # 在同一线程再次调用也没问题(只要前一个已完成)
asyncio.create_task() — 创建并发任务
这是异步编程最常用的函数之一:将 coroutine 包装成一个 Task,放入事件循环的后台队列中并发执行。类似于 JS 中不 await 直接调用 async function 的效果。
import asyncio
import time
async def download(file_id, duration):
print(f" [{file_id}] 开始下载...")
await asyncio.sleep(duration) # 模拟下载耗时
print(f" [{file_id}] 下载完成!")
return f"file_{file_id}.data"
async def main():
# ❌ 顺序执行(慢)
t0 = time.monotonic()
r1 = await download("A", 2)
r2 = await download("B", 2)
r3 = await download("C", 2)
print(f"顺序执行耗时: {time.monotonic() - t0:.1f}s") # ~6s
# ✅ 并发执行(快)
t0 = time.monotonic()
tasks = [
asyncio.create_task(download("D", 2)),
asyncio.create_task(download("E", 2)),
asyncio.create_task(download("F", 2)),
]
results = await asyncio.gather(*tasks)
print(f"并发执行耗时: {time.monotonic() - t0:.1f}s") # ~2s
print(f"结果: {results}")
asyncio.run(main())
asyncio.gather() — 批量等待
gather() 是 Python 版的 Promise.all():接收多个可等待对象,并发执行,返回所有结果的列表。如果任一任务抛出异常,默认会传播给调用方。
import asyncio
async def fetch(url):
await asyncio.sleep(1)
if "error" in url:
raise ValueError(f"请求失败: {url}")
return f"{url}"
async def main():
urls = ["/api/users", "/api/posts", "/api/error", "/api/tags"]
# return_exceptions=True:异常作为结果返回,不抛出
results = await asyncio.gather(
*[fetch(url) for url in urls],
return_exceptions=True # 相当于 Promise.allSettled()
)
for url, result in zip(urls, results):
if isinstance(result, Exception):
print(f" {url}: 失败 - {result}")
else:
print(f" {url}: 成功 - {result}")
asyncio.run(main())
asyncio.wait_for() — 超时控制
import asyncio
async def slow_operation():
await asyncio.sleep(10)
return "done"
async def main():
try:
# 最多等待 3 秒,超时抛出 TimeoutError
result = await asyncio.wait_for(slow_operation(), timeout=3)
print(result)
except asyncio.TimeoutError:
print("操作超时! 3 秒后取消")
asyncio.run(main())
9.4 aiohttp — 异步 HTTP 客户端
aiohttp 是 Python 异步 HTTP 的标杆库——相当于 Node.js 的 node-fetch + 事件循环。它同时支持客户端和服务端,在爬虫、API 网关等场景中性能远超同步的 requests。
pip install aiohttp
import asyncio
import aiohttp
async def fetch_url(session, url):
"""获取单个 URL 的内容"""
try:
async with session.get(url, timeout=10) as response:
# 非 JSON 响应用 text()
if response.content_type == "application/json":
data = await response.json()
else:
data = await response.text()
return {"url": url, "status": response.status, "data": data}
except Exception as e:
return {"url": url, "error": str(e)}
async def main():
urls = [
"https://httpbin.org/json",
"https://httpbin.org/delay/1",
"https://httpbin.org/uuid",
]
# 通过连接器配置连接池(关键性能优化)
connector = aiohttp.TCPConnector(
limit=100, # 总连接池大小
limit_per_host=30, # 每个主机的最大连接数
ttl_dns_cache=300 # DNS 缓存时间(秒)
)
# 创建 session(推荐复用,不要每次请求新建)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for r in results:
status = r.get("status", "N/A")
url = r["url"]
print(f"[{status}] {url}")
asyncio.run(main())
总是复用 ClientSession 而不是每次请求都新建——session 内部维护连接池,复用可以避免 TCP 握手和 SSL 协商的开销。这与浏览器中的 HTTP Keep-Alive 是同一原理。
9.5 异步上下文管理器
Python 的 async with 语句是专为异步资源管理的语法糖——进入和退出 with 块时都可以执行异步操作(如建立/关闭数据库连接)。
import asyncio
class AsyncDatabase:
"""模拟异步数据库连接"""
async def __aenter__(self):
print("正在连接数据库...")
await asyncio.sleep(0.5)
print("数据库已连接")
return self
async def __aexit__(self, exc_type, exc, tb):
print("正在关闭连接...")
await asyncio.sleep(0.3)
print("连接已关闭")
async def query(self, sql):
await asyncio.sleep(0.2)
return [{"id": 1, "name": "Alice"}]
async def main():
async with AsyncDatabase() as db:
result = await db.query("SELECT * FROM users")
print(result)
asyncio.run(main())
# 输出:
# 正在连接数据库...
# 数据库已连接
# [{'id': 1, 'name': 'Alice'}]
# 正在关闭连接...
# 连接已关闭
9.6 异步迭代器与异步生成器
当数据源本身是异步的(如流式 API 响应、数据库游标),可以用 async for 逐个消费,而非等待所有数据返回后再处理。
import asyncio
# 异步生成器:yield 前可以 await
async def paginated_api(base_url, pages=3):
"""模拟分页 API——每页异步获取"""
for page in range(1, pages + 1):
await asyncio.sleep(0.3) # 模拟网络延迟
yield {"page": page, "items": [f"{base_url}/item-{page}-{i}" for i in range(5)]}
async def main():
# async for:每次迭代都可能需要 await
async for page_data in paginated_api("/api/users"):
page = page_data["page"]
count = len(page_data["items"])
print(f"第 {page} 页: {count} 条数据")
# 等价于 JS 中的:
# for await (const page of paginatedGenerator("/api/users")) { ... }
asyncio.run(main())
9.7 asyncio 高级模式
生产者-消费者模式
使用 asyncio.Queue 实现解耦的生产者和消费者——非常适合处理数据流管道。
import asyncio
import random
async def producer(queue, n):
"""生产者:产生数据放入队列"""
for i in range(n):
await asyncio.sleep(random.uniform(0.1, 0.3)) # 模拟产生数据的时间
item = f"数据-{i}"
await queue.put(item)
print(f" [生产] {item}")
# 发送结束信号
await queue.put(None)
async def consumer(queue, name):
"""消费者:从队列取数据并处理"""
while True:
item = await queue.get()
if item is None:
# 把结束信号放回去,让其他消费者也能收到
await queue.put(None)
break
await asyncio.sleep(random.uniform(0.2, 0.5)) # 模拟处理时间
print(f" [{name} 消费] {item}")
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=10) # 限制队列容量(背压)
# 启动 1 个生产者 + 3 个消费者
prod = asyncio.create_task(producer(queue, 10))
consumers = [asyncio.create_task(consumer(queue, f"C-{i}")) for i in range(3)]
await prod
await asyncio.gather(*consumers)
print("所有任务完成!")
asyncio.run(main())
信号量与限流
当需要控制并发数量时(如 API 有速率限制),asyncio.Semaphore 是最佳工具。
import asyncio
import aiohttp
async def fetch_with_limit(sem, session, url):
"""带限流的请求——最多同时 N 个并发"""
async with sem: # 获取信号量许可(满了就等待)
async with session.get(url) as resp:
data = await resp.json()
print(f" {url} -> {resp.status}")
return data
async def main():
urls = [f"https://httpbin.org/delay/1?id={i}" for i in range(10)]
# 最多同时 3 个并发请求(API 速率限制)
sem = asyncio.Semaphore(3)
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_limit(sem, session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"完成 {len(results)} 个请求")
asyncio.run(main())
9.8 实战:异步批量文件下载器
综合运用本章知识,构建一个支持并发控制、超时、重试的文件下载器。
#!/usr/bin/env python3
"""
async_downloader.py
异步批量文件下载器——支持并发控制、超时、重试、进度显示
"""
import asyncio
import aiohttp
import time
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
class AsyncDownloader:
def __init__(self,
max_concurrent: int = 5,
timeout: int = 30,
max_retries: int = 3,
output_dir: str = "./downloads"):
self.max_concurrent = max_concurrent
self.timeout = aiohttp.ClientTimeout(total=timeout)
self.max_retries = max_retries
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self._sem = asyncio.Semaphore(max_concurrent)
def _get_filename(self, url: str) -> str:
"""从 URL 提取文件名"""
parsed = urlparse(url)
# 取路径最后一段作为文件名
name = Path(parsed.path).name or "index.html"
return name
async def _download_one(self,
session: aiohttp.ClientSession,
url: str) -> dict:
"""下载单个文件(带重试)"""
filename = self._get_filename(url)
filepath = self.output_dir / filename
for attempt in range(self.max_retries):
try:
async with self._sem: # 并发控制
async with session.get(url) as response:
response.raise_for_status()
# 流式写入大文件
with open(filepath, "wb") as f:
async for chunk in response.content.iter_chunked(8192):
f.write(chunk)
size = filepath.stat().st_size
return {
"url": url,
"file": str(filepath),
"size": size,
"status": "success"
}
except asyncio.TimeoutError:
if attempt == self.max_retries - 1:
return {"url": url, "status": "timeout", "error": "超时"}
await asyncio.sleep(2 ** attempt) # 指数退避
except Exception as e:
if attempt == self.max_retries - 1:
return {"url": url, "status": "error", "error": str(e)}
await asyncio.sleep(2 ** attempt)
async def download_batch(self, urls: list[str]) -> list[dict]:
"""批量下载"""
connector = aiohttp.TCPConnector(limit=self.max_concurrent + 10)
async with aiohttp.ClientSession(
connector=connector,
timeout=self.timeout
) as session:
tasks = [self._download_one(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
def run(self, urls: list[str]):
"""同步入口:启动下载并打印报告"""
print(f"开始下载 {len(urls)} 个文件 (并发数: {self.max_concurrent})")
print(f"保存目录: {self.output_dir.absolute()}\n")
t0 = time.monotonic()
results = asyncio.run(self.download_batch(urls))
elapsed = time.monotonic() - t0
# 统计报告
success = [r for r in results if r["status"] == "success"]
failed = [r for r in results if r["status"] != "success"]
print(f"\n{'='*50}")
print(f"下载完成! 耗时: {elapsed:.1f}s")
print(f" 成功: {len(success)} 个")
print(f" 失败: {len(failed)} 个")
for r in success:
size_kb = r["size"] / 1024
print(f" ✓ {r['file']} ({size_kb:.1f} KB)")
for r in failed:
print(f" ✗ {r['url']}: {r.get('error', '未知错误')}")
return results
if __name__ == "__main__":
# 示例用法
test_urls = [
"https://httpbin.org/image/jpeg",
"https://httpbin.org/image/png",
"https://httpbin.org/image/svg",
]
downloader = AsyncDownloader(
max_concurrent=3,
timeout=15,
max_retries=2,
output_dir="./my_downloads"
)
downloader.run(test_urls)
如果用 requests 库同步下载 10 个文件(每个耗时 2s),总耗时为 20s。使用 aiohttp + Semaphore(5) 异步下载,总耗时仅 ~4s(5 个并发,每批 2s)。对于 I/O 密集型任务,异步的优势是指数级的。
9.9 asyncio 常见陷阱
- 不要在协程中调用阻塞函数:
time.sleep()会阻塞整个事件循环。总是使用await asyncio.sleep()替代。 - CPU 密集型任务不适合 asyncio:图像处理、复杂计算等任务应使用
concurrent.futures.ProcessPoolExecutor,或配合loop.run_in_executor()放入线程池。 - Task 未被 await 可能导致静默丢失:如果创建了 Task 但没有 await 它,异常可能被静默吞掉。
- 不要在 Jupyter 中运行 asyncio.run():Jupyter 已有自己的事件循环,应直接使用
await。
asyncio.run() 在一个线程中只能同时运行一个实例。如果你在 FastAPI 或其他异步框架中使用 asyncio,不要手动调用 asyncio.run()——框架已经帮你管理了事件循环。
9.10 本章小结
async/await语法与 JS 高度相似,但底层事件循环需要手动启动asyncio.gather()是并发执行的核心工具,等价于Promise.all()asyncio.create_task()可以后台运行协程aiohttp是异步 HTTP 的首选库,始终复用ClientSessionasyncio.Semaphore控制并发数,asyncio.Queue实现生产者-消费者- 异步迭代器(
async for)适合流式数据处理 - 避免在协程中使用阻塞调用,CPU 密集型任务请用进程池