第 5 章:网络请求与爬虫

作为前端开发者,你已经熟悉 HTTP 请求、DOM 操作和浏览器行为。本章将把这些知识迁移到 Python 世界:用 Requests 发送请求、用 BeautifulSoup 解析 HTML、用 Playwright 控制浏览器,最终完成一个完整的爬虫实战项目。

5.1 HTTP 基础回顾

前端开发者对 HTTP 已经非常熟悉,本节快速回顾核心概念,建立与 Python 实现的对应关系。

请求方法

方法 前端场景 Python 场景
GET 页面加载、获取数据 爬取网页、调用 API
POST 表单提交、创建资源 提交数据、登录
PUT 更新资源(完整替换) 更新 API 资源
PATCH 局部更新 部分字段更新
DELETE 删除资源 删除 API 资源

常见状态码

状态码 含义 爬虫处理建议
200 成功 正常处理响应内容
301/302 重定向 Requests 默认自动跟随
400 请求参数错误 检查 URL 参数和请求体
403 禁止访问 可能需要 Cookie、Referer 或更换 IP
404 资源不存在 URL 可能已变更
429 请求过于频繁 降低请求频率,添加延时
500 服务器内部错误 稍后重试

Headers 与 Cookie

前端通过 fetchxhr 发送请求时设置的 Headers,在 Python 中同样需要关注:

5.2 Requests 库

requests 是 Python 最知名的 HTTP 库,以"HTTP for Humans"为设计理念,API 极其简洁优雅。它对应前端生态中的 axios

ℹ️与 JS 的对比

Requests = axios(但无需处理 Promise,Python 的同步代码更直观)。如果需要异步请求,可以使用 aiohttphttpx

安装与基础用法

bash
pip install requests
python
import requests

# ========== GET 请求 ==========
# 类似 JS: axios.get('https://api.github.com')
response = requests.get("https://api.github.com")

# 状态码
print(response.status_code)  # 200

# 响应头(类似 response.headers)
print(response.headers["Content-Type"])  # application/json

# JSON 响应(自动解析,类似 response.data)
data = response.json()
print(data["current_user_url"])

# 文本响应(HTML 页面等)
html = response.text

# 二进制响应(图片、文件等)
content = response.content


# ========== 带参数的 GET ==========
# 类似 JS: axios.get('/search', { params: { q: 'python' } })
params = {"q": "python", "page": 1}
response = requests.get("https://httpbin.org/get", params=params)
print(response.url)  # https://httpbin.org/get?q=python&page=1


# ========== POST 请求 ==========
# 类似 JS: axios.post('/login', { username: 'xxx', password: 'xxx' })
payload = {"username": "admin", "password": "secret"}
response = requests.post("https://httpbin.org/post", json=payload)

# 表单提交(Content-Type: application/x-www-form-urlencoded)
response = requests.post("https://httpbin.org/post", data=payload)

# 上传文件
with open("document.pdf", "rb") as f:
    files = {"file": ("document.pdf", f, "application/pdf")}
    response = requests.post("https://httpbin.org/post", files=files)


# ========== 设置 Headers ==========
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.0",
    "Accept": "application/json",
    "Authorization": "Bearer your_token_here",
}
response = requests.get("https://api.example.com/data", headers=headers)


# ========== 错误处理 ==========
try:
    response = requests.get("https://api.example.com/data", timeout=10)
    response.raise_for_status()  # 如果状态码 >= 400,抛出 HTTPError
except requests.exceptions.HTTPError as e:
    print(f"HTTP 错误: {e}")
except requests.exceptions.ConnectionError:
    print("连接失败")
except requests.exceptions.Timeout:
    print("请求超时")
except requests.exceptions.RequestException as e:
    print(f"请求异常: {e}")

Session 保持会话

Session 对象可以在多次请求之间保持 Cookie,类似浏览器中的会话保持。这在需要登录后才能访问的页面中非常重要。

python
import requests

# 创建 Session(类似浏览器的一个标签页)
session = requests.Session()

# 设置全局 Headers(该 Session 的所有请求都会带上)
session.headers.update({
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
})

# 第一步:登录(Session 会自动保存返回的 Cookie)
login_data = {"username": "myuser", "password": "mypass"}
login_response = session.post("https://example.com/login", data=login_data)

# 第二步:访问需要登录的页面(自动带上登录 Cookie)
dashboard = session.get("https://example.com/dashboard")
print(dashboard.text)

# 第三步:继续操作(Cookie 持续有效)
profile = session.get("https://example.com/profile")

# 查看当前保存的 Cookie
print(session.cookies.get_dict())

# 手动设置 Cookie
session.cookies.set("session_id", "abc123")

# 关闭 Session
session.close()

超时与重试

网络请求不稳定时,合理的超时设置和重试机制能大幅提升脚本的健壮性。

python
import time
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def create_session_with_retry(
    retries: int = 3,
    backoff_factor: float = 0.3,
    status_forcelist: tuple[int, ...] = (500, 502, 503, 504),
) -> requests.Session:
    """创建带自动重试的 Session.

    参数:
        retries: 最大重试次数
        backoff_factor: 重试间隔系数(第一次等 0.3s,第二次 0.6s,第三次 1.2s...)
        status_forcelist: 遇到这些状态码时重试
    """
    session = requests.Session()

    retry_strategy = Retry(
        total=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
        allowed_methods=["HEAD", "GET", "OPTIONS", "POST"],
    )

    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)

    return session


# 使用
session = create_session_with_retry()
response = session.get("https://api.example.com/data", timeout=(3, 27))
# timeout=(连接超时, 读取超时)


# 简单的手动重试示例
def fetch_with_retry(url: str, max_retries: int = 3) -> requests.Response:
    """手动实现重试逻辑."""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            if attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt  # 指数退避
            print(f"请求失败,{wait_time}秒后重试...")
            time.sleep(wait_time)

    raise RuntimeError("Unreachable")

5.3 HTML 解析

获取 HTML 后,需要从中提取数据。前端开发者熟悉 DOM 操作和 CSS 选择器,Python 的 BeautifulSoup 提供了几乎相同的体验。

BeautifulSoup 基础

python
"""BeautifulSoup 示例 - 类似前端的 DOM 查询.

安装: pip install beautifulsoup4
还需要解析器: pip install lxml(更快)或直接使用内置的 html.parser
"""

from bs4 import BeautifulSoup
import requests

# 获取示例 HTML
html_doc = """

示例页面

  

欢迎来到示例网站

总价: ¥600

热销
""" # 创建 Soup 对象(类似 JS 的 DOM 解析) soup = BeautifulSoup(html_doc, "lxml") # 或 "html.parser" # ========== 基本查找 ========== # 通过标签名查找(类似 document.querySelector('title')) title = soup.find("title") print(title.text) # 示例页面 print(title.get_text()) # 同上 # 通过 id 查找(类似 document.getElementById('title')) heading = soup.find(id="title") print(heading.text) # 通过 class 查找(类似 document.querySelector('.items')) # 注意:class 是 Python 关键字,参数名用 class_ item_list = soup.find("ul", class_="items") # 查找所有 li(类似 document.querySelectorAll('li')) items = soup.find_all("li") for item in items: print(item.text.strip()) # ========== CSS 选择器 ========== # select() 方法支持完整的 CSS 选择器语法! # 类选择器 hot_tags = soup.select(".hot") # 类似 .querySelectorAll('.hot') first_item = soup.select_one(".items li") # 类似 .querySelector('.items li') # ID 选择器 title_elem = soup.select_one("#title") # 类似 #title # 属性选择器 links = soup.select("a[href^='/item/']") # href 以 /item/ 开头 item_2 = soup.select_one('li[data-id="2"]') # 组合选择器 price = soup.select_one(".info .price") # 后代选择器 tags = soup.select(".container > .info") # 子选择器 # 伪类选择器(部分支持) first_li = soup.select_one("li:first-child") last_li = soup.select_one("li:last-child") # ========== 提取数据 ========== # 获取属性(类似 element.getAttribute('href')) link = soup.select_one("a") print(link["href"]) # /item/1 print(link.get("href")) # 同上 print(link.attrs) # {'href': '/item/1'} # 获取自定义 data 属性 li = soup.select_one('li[data-id="2"]') print(li["data-id"]) # 2 # 获取文本内容 print(link.text) # 商品 1 print(link.string) # 商品 1(如果只有一个子节点) print(link.get_text(strip=True)) # 去除空白 # ========== 实际爬虫示例 ========== def fetch_article_titles(url: str) -> list[dict[str, str]]: """从博客首页提取文章标题和链接. 类似于前端用 querySelectorAll 提取列表数据。 """ response = requests.get(url, timeout=10) response.raise_for_status() soup = BeautifulSoup(response.text, "lxml") articles = [] # 假设文章结构: for article in soup.select("article.post"): title_elem = article.select_one("h2 a") if title_elem: articles.append({ "title": title_elem.get_text(strip=True), "url": title_elem["href"], }) return articles # 使用 # articles = fetch_article_titles("https://example.com/blog") # for article in articles: # print(f"{article['title']}: {article['url']}")

lxml 简介

lxml 是高性能的 XML/HTML 解析库,BeautifulSoup 可以借助它提升解析速度。对于大规模爬取,直接使用 lxml 的 XPath 查询效率更高。

python
from lxml import html
import requests

# 使用 lxml 直接解析
response = requests.get("https://example.com")
tree = html.fromstring(response.content)

# XPath 查询(比 CSS 选择器更强大)
# 提取所有链接
titles = tree.xpath("//h1/text()")
links = tree.xpath("//a/@href")

# 带条件的 XPath
items = tree.xpath('//li[@data-id]/a/text()')

# 提取表格数据
rows = tree.xpath("//table/tr")
for row in rows:
    cells = row.xpath("./td/text()")
    print(cells)

# XPath 函数
first_title = tree.xpath("//h1/text()")[0]  # 取第一个
item_count = tree.xpath("count(//li)")       # 计数
ℹ️选择器对比

前端开发者熟悉 CSS 选择器,建议先用 BeautifulSoup 的 select() 方法。遇到复杂查询(如文本内容匹配、父节点选择)时,再考虑 XPath。

5.4 浏览器自动化

有些网站使用 JavaScript 动态渲染内容,或者需要复杂的用户交互。这时需要控制真实的浏览器,就像前端自动化测试一样。

Playwright for Python

Playwright 是微软开源的浏览器自动化框架,支持 Python、Node.js、Java 等语言。前端开发者可能对它的 Node.js 版本已经很熟悉了。

ℹ️与前端测试对比

Playwright 的 Python API 与 Node.js 版本几乎一致。如果你用过 @playwright/test,迁移到 Python 版本几乎没有学习成本。

bash
# 安装 Playwright
pip install playwright

# 安装浏览器(Chromium、Firefox、WebKit)
playwright install chromium
python
"""Playwright 示例 - 浏览器自动化.

对比 Node.js 版本,API 几乎完全一致。
"""

from playwright.sync_api import sync_playwright


def scrape_dynamic_page() -> None:
    """爬取 JavaScript 渲染的页面."""
    with sync_playwright() as p:
        # 启动浏览器(无头模式,类似 headless: true)
        browser = p.chromium.launch(headless=True)

        # 创建新页面(类似 browser.newPage())
        page = browser.new_page()

        # 设置视口
        page.set_viewport_size({"width": 1280, "height": 720})

        # 访问页面
        page.goto("https://example.com")

        # 等待元素加载(类似 page.waitForSelector)
        page.wait_for_selector(".content-loaded")

        # 提取数据(类似 page.evaluate)
        title = page.title()
        print(f"页面标题: {title}")

        # 获取元素文本
        items = page.query_selector_all(".item")
        for item in items:
            text = item.inner_text()
            print(text)

        # 截图
        page.screenshot(path="screenshot.png", full_page=True)

        # 关闭浏览器
        browser.close()


def scrape_with_interaction() -> None:
    """需要交互的页面爬取示例."""
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)  # 有头模式,方便调试
        context = browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
        )
        page = context.new_page()

        # 访问登录页
        page.goto("https://example.com/login")

        # 填写表单(类似 page.fill)
        page.fill("input[name='username']", "myuser")
        page.fill("input[name='password']", "mypass")

        # 点击登录按钮
        page.click("button[type='submit']")

        # 等待导航完成
        page.wait_for_url("https://example.com/dashboard")

        # 等待动态内容加载
        page.wait_for_selector(".data-table")

        # 提取表格数据
        rows = page.query_selector_all(".data-table tr")
        data = []
        for row in rows[1:]:  # 跳过表头
            cells = row.query_selector_all("td")
            data.append({
                "name": cells[0].inner_text(),
                "value": cells[1].inner_text(),
            })

        print(data)

        # 保存 Cookie 供后续使用
        cookies = context.cookies()
        print(cookies)

        browser.close()


def scrape_with_scroll() -> None:
    """滚动加载的页面(无限滚动)."""
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.goto("https://example.com/infinite-scroll")

        # 滚动到页面底部多次,触发加载
        for _ in range(5):
            page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            page.wait_for_timeout(1000)  # 等待 1 秒

        # 现在所有内容都已加载
        items = page.query_selector_all(".item")
        print(f"共加载 {len(items)} 条数据")

        browser.close()


# 运行
# scrape_dynamic_page()

等待策略

等待是浏览器自动化的核心。Playwright 提供了多种等待方式,与前端测试中的等待策略完全一致。

python
# 等待元素出现在 DOM 中(默认 30 秒超时)
page.wait_for_selector(".item")

# 等待元素可见
page.wait_for_selector(".item", state="visible")

# 等待元素从 DOM 中消失
page.wait_for_selector(".loading", state="hidden")

# 等待网络空闲(类似 Cypress 的 cy.wait())
page.wait_for_load_state("networkidle")

# 等待特定请求完成
with page.expect_response("**/api/data") as response_info:
    page.click("#load-more")
response = response_info.value
print(response.json())

# 等待超时设置
page.wait_for_selector(".item", timeout=10000)  # 10 秒超时

5.5 数据存储

爬取到的数据需要持久化存储。Python 标准库提供了 CSV、JSON、SQLite 的支持,无需额外安装。

CSV 存储

python
import csv

# 写入 CSV
movies = [
    {"title": "肖申克的救赎", "rating": 9.7, "year": 1994},
    {"title": "霸王别姬", "rating": 9.6, "year": 1993},
    {"title": "阿甘正传", "rating": 9.5, "year": 1994},
]

with open("movies.csv", "w", newline="", encoding="utf-8-sig") as f:
    # fieldnames 定义表头
    writer = csv.DictWriter(f, fieldnames=["title", "rating", "year"])
    writer.writeheader()
    writer.writerows(movies)

# 读取 CSV
with open("movies.csv", "r", encoding="utf-8-sig") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(f"{row['title']}: {row['rating']}")

# 使用 pandas 处理大数据量(推荐)
# import pandas as pd
# df = pd.DataFrame(movies)
# df.to_csv("movies.csv", index=False, encoding="utf-8-sig")
# df = pd.read_csv("movies.csv")

JSON 存储

python
import json

# Python 字典 -> JSON 字符串(类似 JSON.stringify)
data = {"name": "Python", "year": 1991, "creator": "Guido van Rossum"}
json_str = json.dumps(data, ensure_ascii=False, indent=2)
print(json_str)

# 写入 JSON 文件
with open("data.json", "w", encoding="utf-8") as f:
    json.dump(data, f, ensure_ascii=False, indent=2)

# JSON 字符串 -> Python 对象(类似 JSON.parse)
parsed = json.loads(json_str)
print(parsed["name"])

# 读取 JSON 文件
with open("data.json", "r", encoding="utf-8") as f:
    loaded = json.load(f)

# 处理非标准类型(如 datetime)
from datetime import datetime

def datetime_encoder(obj):
    """自定义 JSON 编码器."""
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")

now = {"time": datetime.now()}
json_str = json.dumps(now, default=datetime_encoder)

SQLite 轻量数据库

SQLite 是嵌入式的关系型数据库,无需单独安装服务器,非常适合爬虫数据的本地存储。它对应前端生态中的 IndexedDB,但使用 SQL 查询。

python
import sqlite3
from datetime import datetime


def init_db(db_path: str = "movies.db") -> sqlite3.Connection:
    """初始化数据库并创建表."""
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS movies (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            rating REAL,
            year INTEGER,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    conn.commit()
    return conn


def save_movie(conn: sqlite3.Connection, title: str, rating: float, year: int) -> None:
    """保存电影数据."""
    conn.execute(
        "INSERT INTO movies (title, rating, year) VALUES (?, ?, ?)",
        (title, rating, year),
    )
    conn.commit()


def get_all_movies(conn: sqlite3.Connection) -> list[dict]:
    """获取所有电影."""
    conn.row_factory = sqlite3.Row  # 让查询结果可以像字典一样访问
    cursor = conn.execute("SELECT * FROM movies ORDER BY rating DESC")
    return [dict(row) for row in cursor.fetchall()]


def search_movies(conn: sqlite3.Connection, keyword: str) -> list[dict]:
    """搜索电影."""
    conn.row_factory = sqlite3.Row
    cursor = conn.execute(
        "SELECT * FROM movies WHERE title LIKE ? ORDER BY rating DESC",
        (f"%{keyword}%",),
    )
    return [dict(row) for row in cursor.fetchall()]


# 使用示例
conn = init_db()

# 保存数据
save_movie(conn, "肖申克的救赎", 9.7, 1994)
save_movie(conn, "霸王别姬", 9.6, 1993)

# 查询数据
movies = get_all_movies(conn)
for movie in movies:
    print(f"{movie['title']}: {movie['rating']}")

# 搜索
results = search_movies(conn, "肖")
print(f"找到 {len(results)} 条结果")

conn.close()
💡技巧

对于大规模爬虫项目,推荐使用 SQLAlchemy ORM 框架操作数据库,它类似前端的 Prisma 或 TypeORM,可以用 Python 对象的方式操作数据表。

5.6 反爬与合规

爬虫是一把双刃剑。本节介绍常见的反爬机制和应对策略,以及必须遵守的法律和道德规范。

常见反爬手段

反爬手段 说明 应对策略
User-Agent 检测 检查请求头中的客户端标识 设置真实的浏览器 UA
IP 频率限制 同一 IP 短时间内请求过多 降低请求频率、使用代理池
Cookie/Session 验证 需要登录或特定 Cookie 使用 Session 维持登录状态
验证码 图形验证码、滑块验证等 打码平台、OCR、或放弃爬取
动态渲染 数据由 JavaScript 动态加载 使用 Playwright/Selenium
字体反爬 使用自定义字体隐藏真实内容 分析字体映射关系

请求频率控制

python
import random
import time


def polite_request(url: str, min_delay: float = 1.0, max_delay: float = 3.0) -> requests.Response:
    """礼貌地发送请求,带随机延时.

    模拟人类浏览行为,避免触发频率限制。
    """
    # 随机延时(避免固定间隔被识别为机器人)
    delay = random.uniform(min_delay, max_delay)
    time.sleep(delay)

    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/120.0.0.0 Safari/537.0"
        ),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
    }

    return requests.get(url, headers=headers, timeout=10)


# 批量爬取时控制频率
urls = [f"https://example.com/page/{i}" for i in range(1, 11)]
for url in urls:
    try:
        response = polite_request(url)
        print(f"成功: {url} ({response.status_code})")
    except requests.exceptions.RequestException as e:
        print(f"失败: {url} - {e}")
        # 遇到错误增加额外冷却时间
        time.sleep(10)

robots.txt 与法律合规

⚠️法律合规意识

爬虫必须遵守以下原则:

  1. 遵守 robots.txt:网站根目录的 robots.txt 文件声明了允许/禁止爬取的路径。使用 robotparser 模块检查。
  2. 不要爬取个人隐私数据:用户信息、聊天记录等敏感数据严禁爬取。
  3. 控制请求频率:不要对目标网站造成过大负载,建议每秒不超过 1 个请求。
  4. 遵守网站服务条款:很多网站的使用条款明确禁止爬虫行为。
  5. 数据使用合规:爬取的数据不得用于商业竞争、侵犯版权等非法用途。
python
import urllib.robotparser

# 检查 robots.txt
rp = urllib.robotparser.RobotFileParser()
rp.set_url("https://movie.douban.com/robots.txt")
rp.read()

# 检查是否允许爬取特定 URL
can_fetch = rp.can_fetch("*", "https://movie.douban.com/top250")
print(f"是否允许爬取: {can_fetch}")

# 获取建议的爬取间隔
crawl_delay = rp.crawl_delay("*")
if crawl_delay:
    print(f"建议爬取间隔: {crawl_delay} 秒")

5.7 实战案例:爬取豆瓣电影 Top250

综合运用本章所学,完成一个完整的爬虫项目:爬取豆瓣电影 Top250,并将结果保存到 Excel。

ℹ️说明

豆瓣电影 Top250 每页展示 25 部电影,共 10 页。URL 格式为 https://movie.douban.com/top250?start={0|25|50...}。本案例仅用于学习目的,请遵守豆瓣 robots.txt 和使用条款。

python
#!/usr/bin/env python3
"""豆瓣电影 Top250 爬虫.

功能:
    - 爬取豆瓣电影 Top250 的标题、评分、评价人数、链接
    - 保存到 Excel 文件
    - 包含完整的错误处理和频率控制

安装依赖:
    pip install requests beautifulsoup4 openpyxl
"""

import random
import time
from dataclasses import dataclass

import requests
from bs4 import BeautifulSoup
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment


@dataclass
class Movie:
    """电影数据类."""

    rank: int          # 排名
    title: str         # 标题
    rating: float      # 评分
    review_count: str  # 评价人数
    link: str          # 详情链接
    quote: str         # 经典台词


class DoubanSpider:
    """豆瓣电影爬虫."""

    BASE_URL = "https://movie.douban.com/top250"
    PER_PAGE = 25
    TOTAL_PAGES = 10

    def __init__(self) -> None:
        """初始化爬虫."""
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": (
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/120.0.0.0 Safari/537.36"
            ),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9",
        })
        self.movies: list[Movie] = []

    def fetch_page(self, start: int) -> str | None:
        """获取单页 HTML.

        参数:
            start: 起始偏移量(0, 25, 50, ...)

        返回:
            HTML 文本,失败返回 None
        """
        params = {"start": start}

        try:
            # 礼貌请求:随机延时
            time.sleep(random.uniform(1.5, 3.0))

            response = self.session.get(
                self.BASE_URL,
                params=params,
                timeout=10,
            )
            response.raise_for_status()
            return response.text

        except requests.exceptions.RequestException as e:
            print(f"请求失败 (start={start}): {e}")
            return None

    def parse_page(self, html: str) -> list[Movie]:
        """解析单页电影数据.

        参数:
            html: 页面 HTML

        返回:
            该页面的电影列表
        """
        soup = BeautifulSoup(html, "lxml")
        movies = []

        # 豆瓣 Top250 的列表项选择器
        items = soup.select("div.item")

        for item in items:
            try:
                # 排名
                rank = int(item.select_one("em").text)

                # 标题(可能有中文名和英文名)
                title_elem = item.select_one("span.title")
                title = title_elem.text if title_elem else "未知"

                # 评分
                rating_elem = item.select_one("span.rating_num")
                rating = float(rating_elem.text) if rating_elem else 0.0

                # 评价人数
                review_elem = item.select_one("div.star span:last-child")
                review_count = review_elem.text if review_elem else "0人评价"

                # 链接
                link_elem = item.select_one("div.hd a")
                link = link_elem["href"] if link_elem else ""

                # 经典台词
                quote_elem = item.select_one("span.inq")
                quote = quote_elem.text if quote_elem else ""

                movies.append(Movie(
                    rank=rank,
                    title=title,
                    rating=rating,
                    review_count=review_count,
                    link=link,
                    quote=quote,
                ))

            except (AttributeError, ValueError) as e:
                print(f"解析单项失败: {e}")
                continue

        return movies

    def run(self) -> None:
        """运行爬虫,获取所有数据."""
        print("开始爬取豆瓣电影 Top250...")

        for page in range(self.TOTAL_PAGES):
            start = page * self.PER_PAGE
            print(f"正在爬取第 {page + 1}/{self.TOTAL_PAGES} 页...")

            html = self.fetch_page(start)
            if html is None:
                print(f"第 {page + 1} 页获取失败,跳过")
                continue

            page_movies = self.parse_page(html)
            self.movies.extend(page_movies)
            print(f"第 {page + 1} 页获取 {len(page_movies)} 条数据")

        print(f"\n爬取完成!共获取 {len(self.movies)} 部电影")

    def save_to_excel(self, filename: str = "douban_top250.xlsx") -> None:
        """保存数据到 Excel.

        参数:
            filename: 输出文件名
        """
        if not self.movies:
            print("没有数据可保存")
            return

        wb = Workbook()
        ws = wb.active
        ws.title = "豆瓣电影 Top250"

        # 设置表头
        headers = ["排名", "标题", "评分", "评价人数", "经典台词", "链接"]
        header_fill = PatternFill(
            start_color="366092",
            end_color="366092",
            fill_type="solid",
        )
        header_font = Font(bold=True, color="FFFFFF")

        for col, header in enumerate(headers, 1):
            cell = ws.cell(row=1, column=col, value=header)
            cell.fill = header_fill
            cell.font = header_font
            cell.alignment = Alignment(horizontal="center")

        # 填充数据
        for row_idx, movie in enumerate(self.movies, 2):
            ws.cell(row=row_idx, column=1, value=movie.rank)
            ws.cell(row=row_idx, column=2, value=movie.title)
            ws.cell(row=row_idx, column=3, value=movie.rating)
            ws.cell(row=row_idx, column=4, value=movie.review_count)
            ws.cell(row=row_idx, column=5, value=movie.quote)
            ws.cell(row=row_idx, column=6, value=movie.link)

        # 调整列宽
        ws.column_dimensions["A"].width = 8
        ws.column_dimensions["B"].width = 25
        ws.column_dimensions["C"].width = 10
        ws.column_dimensions["D"].width = 15
        ws.column_dimensions["E"].width = 40
        ws.column_dimensions["F"].width = 50

        wb.save(filename)
        print(f"数据已保存到: {filename}")


def main() -> None:
    """主函数."""
    spider = DoubanSpider()
    spider.run()
    spider.save_to_excel()

    # 打印前 5 条数据预览
    print("\n数据预览(前 5 条):")
    print("-" * 80)
    for movie in spider.movies[:5]:
        print(f"{movie.rank}. {movie.title} - {movie.rating}分")
        print(f"   {movie.quote}")
        print()


if __name__ == "__main__":
    main()
💡扩展练习
  1. 为爬虫添加命令行参数解析(argparse),支持指定输出格式(Excel/CSV/JSON)和输出路径。
  2. 添加代理支持,通过 proxies 参数使用 HTTP 代理。
  3. 使用多线程(concurrent.futures.ThreadPoolExecutor)并发爬取多页,注意控制并发数避免被封。
  4. 为每个电影详情页添加额外信息爬取(导演、演员、简介等)。

5.8 httpx —— 现代异步 HTTP 客户端

httpx 是 requests 的现代化替代品,支持 HTTP/2、异步请求和更现代化的 API 设计。 如果说 requests 是 axios(经典同步),httpx 就是 axios + async/await 的结合体。

python
"""httpx 示例 (pip install httpx) —— 兼容 requests API,但更现代"""
import httpx

# === 同步用法(与 requests 几乎相同)===
response = httpx.get("https://api.github.com")
print(response.status_code)
data = response.json()

# 带超时和重试的客户端
with httpx.Client(
    timeout=10.0,
    headers={"User-Agent": "MyApp/1.0"}
) as client:
    r = client.get("https://httpbin.org/get", params={"page": 1})
    print(r.json())

# === 异步用法(支持 async/await!)===
import asyncio

async def fetch_all_async():
    """并发获取多个 URL"""
    urls = [
        "https://api.github.com",
        "https://httpbin.org/json",
        "https://httpbin.org/ip"
    ]

    async with httpx.AsyncClient() as client:
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)

        for url, resp in zip(urls, responses):
            print(f"{url}: {resp.status_code}")

# 运行
# asyncio.run(fetch_all_async())

5.9 asyncio 并发爬取

传统同步爬虫一次只能处理一个请求,大量 URL 时需要很长时间。使用 asyncio + aiohttp 可以并发处理多个请求,效率提升 10-50 倍。这非常像 Node.js 的事件循环模型:

python
"""异步并发爬虫示例 (pip install aiohttp)"""
import asyncio
import aiohttp
import time

async def fetch(session, url, semaphore):
    """带信号量控制的异步请求"""
    async with semaphore:  # 限制并发数
        try:
            async with session.get(url, timeout=10) as response:
                data = await response.text()
                print(f"完成: {url} ({response.status})")
                return {"url": url, "status": response.status, "length": len(data)}
        except Exception as e:
            print(f"失败: {url} - {e}")
            return {"url": url, "error": str(e)}

async def crawl_all(urls, concurrency=5):
    """并发爬取所有 URL(控制并发数避免被封)"""
    semaphore = asyncio.Semaphore(concurrency)

    async with aiohttp.ClientSession(
        headers={"User-Agent": "MyCrawler/1.0"}
    ) as session:
        tasks = [fetch(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# 性能对比
if __name__ == "__main__":
    urls = [f"https://httpbin.org/delay/1" for _ in range(10)]

    # 异步并发(约 3 秒)
    start = time.time()
    results = asyncio.run(crawl_all(urls, concurrency=5))
    print(f"异步并发耗时: {time.time() - start:.1f}s")

    # 同步顺序(约 10+ 秒)
    # start = time.time()
    # for url in urls:
    #     requests.get(url)
    # print(f"同步顺序耗时: {time.time() - start:.1f}s")
ℹ️何时用异步爬虫
  • 大量 URL(>50):异步能显著减少总耗时
  • IO 密集型:网络请求是典型的 IO 等待,异步收益最大
  • 需要控制并发:用 Semaphore 控制并发数,避免被封
  • 简单爬取仍用同步:代码更易懂,调试更方便

5.10 pydantic 数据验证

爬虫获取到的数据需要验证和清理。pydantic 是 Python 最流行的数据验证库, 通过类型注解自动完成验证、转换和序列化。它类似于 TypeScript 的 interface + 运行时校验:

python
"""pydantic 数据验证示例 (pip install pydantic)"""
from pydantic import BaseModel, Field, field_validator
from typing import Optional

# 定义数据模型(类似 TypeScript 的 interface + 运行时验证)
class Movie(BaseModel):
    title: str = Field(..., min_length=1, description="电影标题")
    rating: float = Field(..., ge=0, le=10, description="评分 0-10")
    year: int = Field(..., ge=1888, le=2100, description="上映年份")
    link: str
    quote: Optional[str] = None

    @field_validator("rating")
    @classmethod
    def rating_must_have_one_decimal(cls, v):
        """评分保留 1 位小数"""
        return round(v, 1)

    @field_validator("link")
    @classmethod
    def link_must_start_with_http(cls, v):
        """链接必须以 http 开头"""
        if not v.startswith("http"):
            raise ValueError("链接必须以 http 开头")
        return v

# 使用示例
# 正确数据
movie = Movie(
    title="肖申克的救赎",
    rating=9.7,
    year=1994,
    link="https://movie.douban.com/subject/1292052/"
)
print(movie.model_dump())  # 输出字典
print(movie.model_dump_json(indent=2))  # 输出 JSON

# 错误数据:pydantic 会抛出 ValidationError
# movie = Movie(title="A", rating=11, year=1994, link="bad-url")
# pydantic_core._pydantic_core.ValidationError: ...
💡pydantic 在爬虫中的应用
  1. 定义目标数据的 schema,确保爬取结果结构一致
  2. 自动过滤异常数据(如评分超出范围、缺少必填字段)
  3. 方便序列化为 JSON/CSV,或直接存入数据库
  4. 配合 FastAPI 使用时,自动生成 API 文档(见第 10 章)