第 9 章:Python 异步编程

异步编程是现代 Python 开发的核心技能之一。对于前端开发者来说,JavaScript 中的 async/await 早已是日常工具。本章将从 JS 异步模型出发,深入讲解 Python asyncio 的工作机制、核心 API 和实战技巧。

9.1 异步编程为什么重要

同步编程最大的瓶颈是 I/O 等待——网络请求、文件读写、数据库查询等操作90%的时间都在等待外部响应。异步编程让程序在等待期间切换到其他任务,极大提升并发能力。

text
同步(顺序执行):
  ┌─────┐   ┌─────┐   ┌─────┐
  │ A   │ → │ B   │ → │ C   │    总耗时 = A + B + C
  └─────┘   └─────┘   └─────┘

异步(并发执行):
  ┌─────┐
  │ A   │──┐
  └─────┘  │  ┌─────┐
           ├──│ B   │             总耗时 ≈ max(A, B, C)
  ┌─────┐  │  └─────┘
  │ C   │──┘
  └─────┘
ℹ️JS vs Python 异步

JavaScript 天生就是"异步优先"——浏览器只有一个主线程,所有 I/O 操作都通过事件循环 + 回调/Promise 实现。Python 的异步是可选的:同步代码是默认,异步通过 asyncio 库显式启用。二者都用 async/await 语法,但底层实现截然不同。

9.2 asyncio 基础概念

事件循环(Event Loop)

事件循环是异步编程的心脏——它像一个调度器,持续检查哪些任务可以执行,哪些在等待 I/O,并在任务间切换。

ℹ️前端类比

JavaScript 的 Event Loop 是浏览器内置的,自动运行;Python 的 Event Loop 需要手动创建或通过 asyncio.run() 启动。如果你的 JS 基础扎实,可以把 asyncio.run() 理解为「启动 Node.js 的事件循环——只是你得自己叫它开始」。

协程(Coroutine)

协程是使用 async def 定义的函数。调用协程不会立即执行,而是返回一个 coroutine 对象——需要在事件循环中「等待」它。

python
import asyncio

# 定义一个协程(就像 JS 的 async function)
async def greet(name):
    await asyncio.sleep(1)  # 模拟异步 I/O
    return f"Hello, {name}"

# 错误:直接调用只会返回 coroutine 对象,不会执行
# coro = greet("World")  # RuntimeWarning: coroutine was never awaited

# 正确:通过 asyncio.run() 启动
result = asyncio.run(greet("World"))
print(result)  # Hello, World

await 关键字

await 是 Python 异步的核心操作符。它的行为与 JS 中的 await 几乎一致:暂停当前协程的执行,等待一个可等待对象(coroutine、Task、Future)完成后再继续。

概念JavaScriptPython
异步函数定义async function fn() {}async def fn():
等待异步结果await promiseawait coroutine
并发执行Promise.all([p1, p2])asyncio.gather(t1, t2)
创建后台任务fn().then() (microtask)asyncio.create_task(coro)
延迟执行setTimeout(fn, ms)asyncio.sleep(s)
启动入口自动(浏览器/Node)asyncio.run(coro)

9.3 asyncio 核心 API

asyncio.run() — 启动事件循环

asyncio.run() 是 Python 3.7+ 的标准入口。它创建事件循环、运行协程、清理资源一气呵成。

python
import asyncio

async def main():
    print("开始...")
    await asyncio.sleep(1)
    print("结束!")
    return 42

# asyncio.run() 是同步的——它会阻塞直到协程完成
result = asyncio.run(main())
print(f"返回: {result}")

# 注意:同一线程不能同时运行两个事件循环
# asyncio.run(main())  # 在同一线程再次调用也没问题(只要前一个已完成)

asyncio.create_task() — 创建并发任务

这是异步编程最常用的函数之一:将 coroutine 包装成一个 Task,放入事件循环的后台队列中并发执行。类似于 JS 中不 await 直接调用 async function 的效果。

python
import asyncio
import time

async def download(file_id, duration):
    print(f"  [{file_id}] 开始下载...")
    await asyncio.sleep(duration)  # 模拟下载耗时
    print(f"  [{file_id}] 下载完成!")
    return f"file_{file_id}.data"

async def main():
    # ❌ 顺序执行(慢)
    t0 = time.monotonic()
    r1 = await download("A", 2)
    r2 = await download("B", 2)
    r3 = await download("C", 2)
    print(f"顺序执行耗时: {time.monotonic() - t0:.1f}s")  # ~6s

    # ✅ 并发执行(快)
    t0 = time.monotonic()
    tasks = [
        asyncio.create_task(download("D", 2)),
        asyncio.create_task(download("E", 2)),
        asyncio.create_task(download("F", 2)),
    ]
    results = await asyncio.gather(*tasks)
    print(f"并发执行耗时: {time.monotonic() - t0:.1f}s")  # ~2s
    print(f"结果: {results}")

asyncio.run(main())

asyncio.gather() — 批量等待

gather() 是 Python 版的 Promise.all():接收多个可等待对象,并发执行,返回所有结果的列表。如果任一任务抛出异常,默认会传播给调用方。

python
import asyncio

async def fetch(url):
    await asyncio.sleep(1)
    if "error" in url:
        raise ValueError(f"请求失败: {url}")
    return f"{url}"

async def main():
    urls = ["/api/users", "/api/posts", "/api/error", "/api/tags"]

    # return_exceptions=True:异常作为结果返回,不抛出
    results = await asyncio.gather(
        *[fetch(url) for url in urls],
        return_exceptions=True  # 相当于 Promise.allSettled()
    )

    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            print(f"  {url}: 失败 - {result}")
        else:
            print(f"  {url}: 成功 - {result}")

asyncio.run(main())

asyncio.wait_for() — 超时控制

python
import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "done"

async def main():
    try:
        # 最多等待 3 秒,超时抛出 TimeoutError
        result = await asyncio.wait_for(slow_operation(), timeout=3)
        print(result)
    except asyncio.TimeoutError:
        print("操作超时! 3 秒后取消")

asyncio.run(main())

9.4 aiohttp — 异步 HTTP 客户端

aiohttp 是 Python 异步 HTTP 的标杆库——相当于 Node.js 的 node-fetch + 事件循环。它同时支持客户端和服务端,在爬虫、API 网关等场景中性能远超同步的 requests

bash
pip install aiohttp
python
import asyncio
import aiohttp

async def fetch_url(session, url):
    """获取单个 URL 的内容"""
    try:
        async with session.get(url, timeout=10) as response:
            # 非 JSON 响应用 text()
            if response.content_type == "application/json":
                data = await response.json()
            else:
                data = await response.text()
            return {"url": url, "status": response.status, "data": data}
    except Exception as e:
        return {"url": url, "error": str(e)}

async def main():
    urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/uuid",
    ]

    # 通过连接器配置连接池(关键性能优化)
    connector = aiohttp.TCPConnector(
        limit=100,          # 总连接池大小
        limit_per_host=30,  # 每个主机的最大连接数
        ttl_dns_cache=300   # DNS 缓存时间(秒)
    )

    # 创建 session(推荐复用,不要每次请求新建)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    for r in results:
        status = r.get("status", "N/A")
        url = r["url"]
        print(f"[{status}] {url}")

asyncio.run(main())
💡最佳实践

总是复用 ClientSession 而不是每次请求都新建——session 内部维护连接池,复用可以避免 TCP 握手和 SSL 协商的开销。这与浏览器中的 HTTP Keep-Alive 是同一原理。

9.5 异步上下文管理器

Python 的 async with 语句是专为异步资源管理的语法糖——进入和退出 with 块时都可以执行异步操作(如建立/关闭数据库连接)。

python
import asyncio

class AsyncDatabase:
    """模拟异步数据库连接"""
    async def __aenter__(self):
        print("正在连接数据库...")
        await asyncio.sleep(0.5)
        print("数据库已连接")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print("正在关闭连接...")
        await asyncio.sleep(0.3)
        print("连接已关闭")

    async def query(self, sql):
        await asyncio.sleep(0.2)
        return [{"id": 1, "name": "Alice"}]

async def main():
    async with AsyncDatabase() as db:
        result = await db.query("SELECT * FROM users")
        print(result)

asyncio.run(main())
# 输出:
# 正在连接数据库...
# 数据库已连接
# [{'id': 1, 'name': 'Alice'}]
# 正在关闭连接...
# 连接已关闭

9.6 异步迭代器与异步生成器

当数据源本身是异步的(如流式 API 响应、数据库游标),可以用 async for 逐个消费,而非等待所有数据返回后再处理。

python
import asyncio

# 异步生成器:yield 前可以 await
async def paginated_api(base_url, pages=3):
    """模拟分页 API——每页异步获取"""
    for page in range(1, pages + 1):
        await asyncio.sleep(0.3)  # 模拟网络延迟
        yield {"page": page, "items": [f"{base_url}/item-{page}-{i}" for i in range(5)]}

async def main():
    # async for:每次迭代都可能需要 await
    async for page_data in paginated_api("/api/users"):
        page = page_data["page"]
        count = len(page_data["items"])
        print(f"第 {page} 页: {count} 条数据")

    # 等价于 JS 中的:
    # for await (const page of paginatedGenerator("/api/users")) { ... }

asyncio.run(main())

9.7 asyncio 高级模式

生产者-消费者模式

使用 asyncio.Queue 实现解耦的生产者和消费者——非常适合处理数据流管道。

python
import asyncio
import random

async def producer(queue, n):
    """生产者:产生数据放入队列"""
    for i in range(n):
        await asyncio.sleep(random.uniform(0.1, 0.3))  # 模拟产生数据的时间
        item = f"数据-{i}"
        await queue.put(item)
        print(f"  [生产] {item}")
    # 发送结束信号
    await queue.put(None)

async def consumer(queue, name):
    """消费者:从队列取数据并处理"""
    while True:
        item = await queue.get()
        if item is None:
            # 把结束信号放回去,让其他消费者也能收到
            await queue.put(None)
            break
        await asyncio.sleep(random.uniform(0.2, 0.5))  # 模拟处理时间
        print(f"  [{name} 消费] {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)  # 限制队列容量(背压)

    # 启动 1 个生产者 + 3 个消费者
    prod = asyncio.create_task(producer(queue, 10))
    consumers = [asyncio.create_task(consumer(queue, f"C-{i}")) for i in range(3)]

    await prod
    await asyncio.gather(*consumers)
    print("所有任务完成!")

asyncio.run(main())

信号量与限流

当需要控制并发数量时(如 API 有速率限制),asyncio.Semaphore 是最佳工具。

python
import asyncio
import aiohttp

async def fetch_with_limit(sem, session, url):
    """带限流的请求——最多同时 N 个并发"""
    async with sem:  # 获取信号量许可(满了就等待)
        async with session.get(url) as resp:
            data = await resp.json()
            print(f"  {url} -> {resp.status}")
            return data

async def main():
    urls = [f"https://httpbin.org/delay/1?id={i}" for i in range(10)]

    # 最多同时 3 个并发请求(API 速率限制)
    sem = asyncio.Semaphore(3)

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(sem, session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    print(f"完成 {len(results)} 个请求")

asyncio.run(main())

9.8 实战:异步批量文件下载器

综合运用本章知识,构建一个支持并发控制、超时、重试的文件下载器。

python
#!/usr/bin/env python3
"""
async_downloader.py
异步批量文件下载器——支持并发控制、超时、重试、进度显示
"""

import asyncio
import aiohttp
import time
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse

class AsyncDownloader:
    def __init__(self,
                 max_concurrent: int = 5,
                 timeout: int = 30,
                 max_retries: int = 3,
                 output_dir: str = "./downloads"):
        self.max_concurrent = max_concurrent
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_retries = max_retries
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self._sem = asyncio.Semaphore(max_concurrent)

    def _get_filename(self, url: str) -> str:
        """从 URL 提取文件名"""
        parsed = urlparse(url)
        # 取路径最后一段作为文件名
        name = Path(parsed.path).name or "index.html"
        return name

    async def _download_one(self,
                            session: aiohttp.ClientSession,
                            url: str) -> dict:
        """下载单个文件(带重试)"""
        filename = self._get_filename(url)
        filepath = self.output_dir / filename

        for attempt in range(self.max_retries):
            try:
                async with self._sem:  # 并发控制
                    async with session.get(url) as response:
                        response.raise_for_status()

                        # 流式写入大文件
                        with open(filepath, "wb") as f:
                            async for chunk in response.content.iter_chunked(8192):
                                f.write(chunk)

                size = filepath.stat().st_size
                return {
                    "url": url,
                    "file": str(filepath),
                    "size": size,
                    "status": "success"
                }
            except asyncio.TimeoutError:
                if attempt == self.max_retries - 1:
                    return {"url": url, "status": "timeout", "error": "超时"}
                await asyncio.sleep(2 ** attempt)  # 指数退避
            except Exception as e:
                if attempt == self.max_retries - 1:
                    return {"url": url, "status": "error", "error": str(e)}
                await asyncio.sleep(2 ** attempt)

    async def download_batch(self, urls: list[str]) -> list[dict]:
        """批量下载"""
        connector = aiohttp.TCPConnector(limit=self.max_concurrent + 10)
        async with aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout
        ) as session:
            tasks = [self._download_one(session, url) for url in urls]
            results = await asyncio.gather(*tasks)

        return results

    def run(self, urls: list[str]):
        """同步入口:启动下载并打印报告"""
        print(f"开始下载 {len(urls)} 个文件 (并发数: {self.max_concurrent})")
        print(f"保存目录: {self.output_dir.absolute()}\n")

        t0 = time.monotonic()
        results = asyncio.run(self.download_batch(urls))
        elapsed = time.monotonic() - t0

        # 统计报告
        success = [r for r in results if r["status"] == "success"]
        failed = [r for r in results if r["status"] != "success"]

        print(f"\n{'='*50}")
        print(f"下载完成! 耗时: {elapsed:.1f}s")
        print(f"  成功: {len(success)} 个")
        print(f"  失败: {len(failed)} 个")

        for r in success:
            size_kb = r["size"] / 1024
            print(f"  ✓ {r['file']} ({size_kb:.1f} KB)")

        for r in failed:
            print(f"  ✗ {r['url']}: {r.get('error', '未知错误')}")

        return results


if __name__ == "__main__":
    # 示例用法
    test_urls = [
        "https://httpbin.org/image/jpeg",
        "https://httpbin.org/image/png",
        "https://httpbin.org/image/svg",
    ]

    downloader = AsyncDownloader(
        max_concurrent=3,
        timeout=15,
        max_retries=2,
        output_dir="./my_downloads"
    )
    downloader.run(test_urls)
💡对比同步版本

如果用 requests 库同步下载 10 个文件(每个耗时 2s),总耗时为 20s。使用 aiohttp + Semaphore(5) 异步下载,总耗时仅 ~4s(5 个并发,每批 2s)。对于 I/O 密集型任务,异步的优势是指数级的。

9.9 asyncio 常见陷阱

⚠️重要

asyncio.run() 在一个线程中只能同时运行一个实例。如果你在 FastAPI 或其他异步框架中使用 asyncio,不要手动调用 asyncio.run()——框架已经帮你管理了事件循环。

9.10 本章小结

📝要点回顾
  • async/await 语法与 JS 高度相似,但底层事件循环需要手动启动
  • asyncio.gather() 是并发执行的核心工具,等价于 Promise.all()
  • asyncio.create_task() 可以后台运行协程
  • aiohttp 是异步 HTTP 的首选库,始终复用 ClientSession
  • asyncio.Semaphore 控制并发数,asyncio.Queue 实现生产者-消费者
  • 异步迭代器(async for)适合流式数据处理
  • 避免在协程中使用阻塞调用,CPU 密集型任务请用进程池