第 5 章:网络请求与爬虫
作为前端开发者,你已经熟悉 HTTP 请求、DOM 操作和浏览器行为。本章将把这些知识迁移到 Python 世界:用 Requests 发送请求、用 BeautifulSoup 解析 HTML、用 Playwright 控制浏览器,最终完成一个完整的爬虫实战项目。
5.1 HTTP 基础回顾
前端开发者对 HTTP 已经非常熟悉,本节快速回顾核心概念,建立与 Python 实现的对应关系。
请求方法
| 方法 | 前端场景 | Python 场景 |
|---|---|---|
GET |
页面加载、获取数据 | 爬取网页、调用 API |
POST |
表单提交、创建资源 | 提交数据、登录 |
PUT |
更新资源(完整替换) | 更新 API 资源 |
PATCH |
局部更新 | 部分字段更新 |
DELETE |
删除资源 | 删除 API 资源 |
常见状态码
| 状态码 | 含义 | 爬虫处理建议 |
|---|---|---|
200 |
成功 | 正常处理响应内容 |
301/302 |
重定向 | Requests 默认自动跟随 |
400 |
请求参数错误 | 检查 URL 参数和请求体 |
403 |
禁止访问 | 可能需要 Cookie、Referer 或更换 IP |
404 |
资源不存在 | URL 可能已变更 |
429 |
请求过于频繁 | 降低请求频率,添加延时 |
500 |
服务器内部错误 | 稍后重试 |
Headers 与 Cookie
前端通过 fetch 或 xhr 发送请求时设置的 Headers,在 Python 中同样需要关注:
User-Agent:标识客户端类型,爬虫必须模拟浏览器 UAReferer:来源页面,部分网站会校验Accept:接受的内容类型Cookie:维持登录状态、存储会话信息Authorization:API 认证(Bearer Token、Basic Auth 等)
5.2 Requests 库
requests 是 Python 最知名的 HTTP 库,以"HTTP for Humans"为设计理念,API 极其简洁优雅。它对应前端生态中的 axios。
Requests = axios(但无需处理 Promise,Python 的同步代码更直观)。如果需要异步请求,可以使用 aiohttp 或 httpx。
安装与基础用法
pip install requests
import requests
# ========== GET 请求 ==========
# 类似 JS: axios.get('https://api.github.com')
response = requests.get("https://api.github.com")
# 状态码
print(response.status_code) # 200
# 响应头(类似 response.headers)
print(response.headers["Content-Type"]) # application/json
# JSON 响应(自动解析,类似 response.data)
data = response.json()
print(data["current_user_url"])
# 文本响应(HTML 页面等)
html = response.text
# 二进制响应(图片、文件等)
content = response.content
# ========== 带参数的 GET ==========
# 类似 JS: axios.get('/search', { params: { q: 'python' } })
params = {"q": "python", "page": 1}
response = requests.get("https://httpbin.org/get", params=params)
print(response.url) # https://httpbin.org/get?q=python&page=1
# ========== POST 请求 ==========
# 类似 JS: axios.post('/login', { username: 'xxx', password: 'xxx' })
payload = {"username": "admin", "password": "secret"}
response = requests.post("https://httpbin.org/post", json=payload)
# 表单提交(Content-Type: application/x-www-form-urlencoded)
response = requests.post("https://httpbin.org/post", data=payload)
# 上传文件
with open("document.pdf", "rb") as f:
files = {"file": ("document.pdf", f, "application/pdf")}
response = requests.post("https://httpbin.org/post", files=files)
# ========== 设置 Headers ==========
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.0",
"Accept": "application/json",
"Authorization": "Bearer your_token_here",
}
response = requests.get("https://api.example.com/data", headers=headers)
# ========== 错误处理 ==========
try:
response = requests.get("https://api.example.com/data", timeout=10)
response.raise_for_status() # 如果状态码 >= 400,抛出 HTTPError
except requests.exceptions.HTTPError as e:
print(f"HTTP 错误: {e}")
except requests.exceptions.ConnectionError:
print("连接失败")
except requests.exceptions.Timeout:
print("请求超时")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
Session 保持会话
Session 对象可以在多次请求之间保持 Cookie,类似浏览器中的会话保持。这在需要登录后才能访问的页面中非常重要。
import requests
# 创建 Session(类似浏览器的一个标签页)
session = requests.Session()
# 设置全局 Headers(该 Session 的所有请求都会带上)
session.headers.update({
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
})
# 第一步:登录(Session 会自动保存返回的 Cookie)
login_data = {"username": "myuser", "password": "mypass"}
login_response = session.post("https://example.com/login", data=login_data)
# 第二步:访问需要登录的页面(自动带上登录 Cookie)
dashboard = session.get("https://example.com/dashboard")
print(dashboard.text)
# 第三步:继续操作(Cookie 持续有效)
profile = session.get("https://example.com/profile")
# 查看当前保存的 Cookie
print(session.cookies.get_dict())
# 手动设置 Cookie
session.cookies.set("session_id", "abc123")
# 关闭 Session
session.close()
超时与重试
网络请求不稳定时,合理的超时设置和重试机制能大幅提升脚本的健壮性。
import time
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_session_with_retry(
retries: int = 3,
backoff_factor: float = 0.3,
status_forcelist: tuple[int, ...] = (500, 502, 503, 504),
) -> requests.Session:
"""创建带自动重试的 Session.
参数:
retries: 最大重试次数
backoff_factor: 重试间隔系数(第一次等 0.3s,第二次 0.6s,第三次 1.2s...)
status_forcelist: 遇到这些状态码时重试
"""
session = requests.Session()
retry_strategy = Retry(
total=retries,
backoff_factor=backoff_factor,
status_forcelist=status_forcelist,
allowed_methods=["HEAD", "GET", "OPTIONS", "POST"],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
# 使用
session = create_session_with_retry()
response = session.get("https://api.example.com/data", timeout=(3, 27))
# timeout=(连接超时, 读取超时)
# 简单的手动重试示例
def fetch_with_retry(url: str, max_retries: int = 3) -> requests.Response:
"""手动实现重试逻辑."""
for attempt in range(max_retries):
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
return response
except requests.exceptions.RequestException as e:
if attempt == max_retries - 1:
raise
wait_time = 2 ** attempt # 指数退避
print(f"请求失败,{wait_time}秒后重试...")
time.sleep(wait_time)
raise RuntimeError("Unreachable")
5.3 HTML 解析
获取 HTML 后,需要从中提取数据。前端开发者熟悉 DOM 操作和 CSS 选择器,Python 的 BeautifulSoup 提供了几乎相同的体验。
BeautifulSoup 基础
"""BeautifulSoup 示例 - 类似前端的 DOM 查询.
安装: pip install beautifulsoup4
还需要解析器: pip install lxml(更快)或直接使用内置的 html.parser
"""
from bs4 import BeautifulSoup
import requests
# 获取示例 HTML
html_doc = """
示例页面
"""
# 创建 Soup 对象(类似 JS 的 DOM 解析)
soup = BeautifulSoup(html_doc, "lxml") # 或 "html.parser"
# ========== 基本查找 ==========
# 通过标签名查找(类似 document.querySelector('title'))
title = soup.find("title")
print(title.text) # 示例页面
print(title.get_text()) # 同上
# 通过 id 查找(类似 document.getElementById('title'))
heading = soup.find(id="title")
print(heading.text)
# 通过 class 查找(类似 document.querySelector('.items'))
# 注意:class 是 Python 关键字,参数名用 class_
item_list = soup.find("ul", class_="items")
# 查找所有 li(类似 document.querySelectorAll('li'))
items = soup.find_all("li")
for item in items:
print(item.text.strip())
# ========== CSS 选择器 ==========
# select() 方法支持完整的 CSS 选择器语法!
# 类选择器
hot_tags = soup.select(".hot") # 类似 .querySelectorAll('.hot')
first_item = soup.select_one(".items li") # 类似 .querySelector('.items li')
# ID 选择器
title_elem = soup.select_one("#title") # 类似 #title
# 属性选择器
links = soup.select("a[href^='/item/']") # href 以 /item/ 开头
item_2 = soup.select_one('li[data-id="2"]')
# 组合选择器
price = soup.select_one(".info .price") # 后代选择器
tags = soup.select(".container > .info") # 子选择器
# 伪类选择器(部分支持)
first_li = soup.select_one("li:first-child")
last_li = soup.select_one("li:last-child")
# ========== 提取数据 ==========
# 获取属性(类似 element.getAttribute('href'))
link = soup.select_one("a")
print(link["href"]) # /item/1
print(link.get("href")) # 同上
print(link.attrs) # {'href': '/item/1'}
# 获取自定义 data 属性
li = soup.select_one('li[data-id="2"]')
print(li["data-id"]) # 2
# 获取文本内容
print(link.text) # 商品 1
print(link.string) # 商品 1(如果只有一个子节点)
print(link.get_text(strip=True)) # 去除空白
# ========== 实际爬虫示例 ==========
def fetch_article_titles(url: str) -> list[dict[str, str]]:
"""从博客首页提取文章标题和链接.
类似于前端用 querySelectorAll 提取列表数据。
"""
response = requests.get(url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, "lxml")
articles = []
# 假设文章结构:
for article in soup.select("article.post"):
title_elem = article.select_one("h2 a")
if title_elem:
articles.append({
"title": title_elem.get_text(strip=True),
"url": title_elem["href"],
})
return articles
# 使用
# articles = fetch_article_titles("https://example.com/blog")
# for article in articles:
# print(f"{article['title']}: {article['url']}")
lxml 简介
lxml 是高性能的 XML/HTML 解析库,BeautifulSoup 可以借助它提升解析速度。对于大规模爬取,直接使用 lxml 的 XPath 查询效率更高。
from lxml import html
import requests
# 使用 lxml 直接解析
response = requests.get("https://example.com")
tree = html.fromstring(response.content)
# XPath 查询(比 CSS 选择器更强大)
# 提取所有链接
titles = tree.xpath("//h1/text()")
links = tree.xpath("//a/@href")
# 带条件的 XPath
items = tree.xpath('//li[@data-id]/a/text()')
# 提取表格数据
rows = tree.xpath("//table/tr")
for row in rows:
cells = row.xpath("./td/text()")
print(cells)
# XPath 函数
first_title = tree.xpath("//h1/text()")[0] # 取第一个
item_count = tree.xpath("count(//li)") # 计数
前端开发者熟悉 CSS 选择器,建议先用 BeautifulSoup 的 select() 方法。遇到复杂查询(如文本内容匹配、父节点选择)时,再考虑 XPath。
5.4 浏览器自动化
有些网站使用 JavaScript 动态渲染内容,或者需要复杂的用户交互。这时需要控制真实的浏览器,就像前端自动化测试一样。
Playwright for Python
Playwright 是微软开源的浏览器自动化框架,支持 Python、Node.js、Java 等语言。前端开发者可能对它的 Node.js 版本已经很熟悉了。
Playwright 的 Python API 与 Node.js 版本几乎一致。如果你用过 @playwright/test,迁移到 Python 版本几乎没有学习成本。
# 安装 Playwright
pip install playwright
# 安装浏览器(Chromium、Firefox、WebKit)
playwright install chromium
"""Playwright 示例 - 浏览器自动化.
对比 Node.js 版本,API 几乎完全一致。
"""
from playwright.sync_api import sync_playwright
def scrape_dynamic_page() -> None:
"""爬取 JavaScript 渲染的页面."""
with sync_playwright() as p:
# 启动浏览器(无头模式,类似 headless: true)
browser = p.chromium.launch(headless=True)
# 创建新页面(类似 browser.newPage())
page = browser.new_page()
# 设置视口
page.set_viewport_size({"width": 1280, "height": 720})
# 访问页面
page.goto("https://example.com")
# 等待元素加载(类似 page.waitForSelector)
page.wait_for_selector(".content-loaded")
# 提取数据(类似 page.evaluate)
title = page.title()
print(f"页面标题: {title}")
# 获取元素文本
items = page.query_selector_all(".item")
for item in items:
text = item.inner_text()
print(text)
# 截图
page.screenshot(path="screenshot.png", full_page=True)
# 关闭浏览器
browser.close()
def scrape_with_interaction() -> None:
"""需要交互的页面爬取示例."""
with sync_playwright() as p:
browser = p.chromium.launch(headless=False) # 有头模式,方便调试
context = browser.new_context(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
)
page = context.new_page()
# 访问登录页
page.goto("https://example.com/login")
# 填写表单(类似 page.fill)
page.fill("input[name='username']", "myuser")
page.fill("input[name='password']", "mypass")
# 点击登录按钮
page.click("button[type='submit']")
# 等待导航完成
page.wait_for_url("https://example.com/dashboard")
# 等待动态内容加载
page.wait_for_selector(".data-table")
# 提取表格数据
rows = page.query_selector_all(".data-table tr")
data = []
for row in rows[1:]: # 跳过表头
cells = row.query_selector_all("td")
data.append({
"name": cells[0].inner_text(),
"value": cells[1].inner_text(),
})
print(data)
# 保存 Cookie 供后续使用
cookies = context.cookies()
print(cookies)
browser.close()
def scrape_with_scroll() -> None:
"""滚动加载的页面(无限滚动)."""
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
page.goto("https://example.com/infinite-scroll")
# 滚动到页面底部多次,触发加载
for _ in range(5):
page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
page.wait_for_timeout(1000) # 等待 1 秒
# 现在所有内容都已加载
items = page.query_selector_all(".item")
print(f"共加载 {len(items)} 条数据")
browser.close()
# 运行
# scrape_dynamic_page()
等待策略
等待是浏览器自动化的核心。Playwright 提供了多种等待方式,与前端测试中的等待策略完全一致。
# 等待元素出现在 DOM 中(默认 30 秒超时)
page.wait_for_selector(".item")
# 等待元素可见
page.wait_for_selector(".item", state="visible")
# 等待元素从 DOM 中消失
page.wait_for_selector(".loading", state="hidden")
# 等待网络空闲(类似 Cypress 的 cy.wait())
page.wait_for_load_state("networkidle")
# 等待特定请求完成
with page.expect_response("**/api/data") as response_info:
page.click("#load-more")
response = response_info.value
print(response.json())
# 等待超时设置
page.wait_for_selector(".item", timeout=10000) # 10 秒超时
5.5 数据存储
爬取到的数据需要持久化存储。Python 标准库提供了 CSV、JSON、SQLite 的支持,无需额外安装。
CSV 存储
import csv
# 写入 CSV
movies = [
{"title": "肖申克的救赎", "rating": 9.7, "year": 1994},
{"title": "霸王别姬", "rating": 9.6, "year": 1993},
{"title": "阿甘正传", "rating": 9.5, "year": 1994},
]
with open("movies.csv", "w", newline="", encoding="utf-8-sig") as f:
# fieldnames 定义表头
writer = csv.DictWriter(f, fieldnames=["title", "rating", "year"])
writer.writeheader()
writer.writerows(movies)
# 读取 CSV
with open("movies.csv", "r", encoding="utf-8-sig") as f:
reader = csv.DictReader(f)
for row in reader:
print(f"{row['title']}: {row['rating']}")
# 使用 pandas 处理大数据量(推荐)
# import pandas as pd
# df = pd.DataFrame(movies)
# df.to_csv("movies.csv", index=False, encoding="utf-8-sig")
# df = pd.read_csv("movies.csv")
JSON 存储
import json
# Python 字典 -> JSON 字符串(类似 JSON.stringify)
data = {"name": "Python", "year": 1991, "creator": "Guido van Rossum"}
json_str = json.dumps(data, ensure_ascii=False, indent=2)
print(json_str)
# 写入 JSON 文件
with open("data.json", "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
# JSON 字符串 -> Python 对象(类似 JSON.parse)
parsed = json.loads(json_str)
print(parsed["name"])
# 读取 JSON 文件
with open("data.json", "r", encoding="utf-8") as f:
loaded = json.load(f)
# 处理非标准类型(如 datetime)
from datetime import datetime
def datetime_encoder(obj):
"""自定义 JSON 编码器."""
if isinstance(obj, datetime):
return obj.isoformat()
raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
now = {"time": datetime.now()}
json_str = json.dumps(now, default=datetime_encoder)
SQLite 轻量数据库
SQLite 是嵌入式的关系型数据库,无需单独安装服务器,非常适合爬虫数据的本地存储。它对应前端生态中的 IndexedDB,但使用 SQL 查询。
import sqlite3
from datetime import datetime
def init_db(db_path: str = "movies.db") -> sqlite3.Connection:
"""初始化数据库并创建表."""
conn = sqlite3.connect(db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS movies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
rating REAL,
year INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
return conn
def save_movie(conn: sqlite3.Connection, title: str, rating: float, year: int) -> None:
"""保存电影数据."""
conn.execute(
"INSERT INTO movies (title, rating, year) VALUES (?, ?, ?)",
(title, rating, year),
)
conn.commit()
def get_all_movies(conn: sqlite3.Connection) -> list[dict]:
"""获取所有电影."""
conn.row_factory = sqlite3.Row # 让查询结果可以像字典一样访问
cursor = conn.execute("SELECT * FROM movies ORDER BY rating DESC")
return [dict(row) for row in cursor.fetchall()]
def search_movies(conn: sqlite3.Connection, keyword: str) -> list[dict]:
"""搜索电影."""
conn.row_factory = sqlite3.Row
cursor = conn.execute(
"SELECT * FROM movies WHERE title LIKE ? ORDER BY rating DESC",
(f"%{keyword}%",),
)
return [dict(row) for row in cursor.fetchall()]
# 使用示例
conn = init_db()
# 保存数据
save_movie(conn, "肖申克的救赎", 9.7, 1994)
save_movie(conn, "霸王别姬", 9.6, 1993)
# 查询数据
movies = get_all_movies(conn)
for movie in movies:
print(f"{movie['title']}: {movie['rating']}")
# 搜索
results = search_movies(conn, "肖")
print(f"找到 {len(results)} 条结果")
conn.close()
对于大规模爬虫项目,推荐使用 SQLAlchemy ORM 框架操作数据库,它类似前端的 Prisma 或 TypeORM,可以用 Python 对象的方式操作数据表。
5.6 反爬与合规
爬虫是一把双刃剑。本节介绍常见的反爬机制和应对策略,以及必须遵守的法律和道德规范。
常见反爬手段
| 反爬手段 | 说明 | 应对策略 |
|---|---|---|
| User-Agent 检测 | 检查请求头中的客户端标识 | 设置真实的浏览器 UA |
| IP 频率限制 | 同一 IP 短时间内请求过多 | 降低请求频率、使用代理池 |
| Cookie/Session 验证 | 需要登录或特定 Cookie | 使用 Session 维持登录状态 |
| 验证码 | 图形验证码、滑块验证等 | 打码平台、OCR、或放弃爬取 |
| 动态渲染 | 数据由 JavaScript 动态加载 | 使用 Playwright/Selenium |
| 字体反爬 | 使用自定义字体隐藏真实内容 | 分析字体映射关系 |
请求频率控制
import random
import time
def polite_request(url: str, min_delay: float = 1.0, max_delay: float = 3.0) -> requests.Response:
"""礼貌地发送请求,带随机延时.
模拟人类浏览行为,避免触发频率限制。
"""
# 随机延时(避免固定间隔被识别为机器人)
delay = random.uniform(min_delay, max_delay)
time.sleep(delay)
headers = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.0"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
}
return requests.get(url, headers=headers, timeout=10)
# 批量爬取时控制频率
urls = [f"https://example.com/page/{i}" for i in range(1, 11)]
for url in urls:
try:
response = polite_request(url)
print(f"成功: {url} ({response.status_code})")
except requests.exceptions.RequestException as e:
print(f"失败: {url} - {e}")
# 遇到错误增加额外冷却时间
time.sleep(10)
robots.txt 与法律合规
爬虫必须遵守以下原则:
- 遵守 robots.txt:网站根目录的 robots.txt 文件声明了允许/禁止爬取的路径。使用
robotparser模块检查。 - 不要爬取个人隐私数据:用户信息、聊天记录等敏感数据严禁爬取。
- 控制请求频率:不要对目标网站造成过大负载,建议每秒不超过 1 个请求。
- 遵守网站服务条款:很多网站的使用条款明确禁止爬虫行为。
- 数据使用合规:爬取的数据不得用于商业竞争、侵犯版权等非法用途。
import urllib.robotparser
# 检查 robots.txt
rp = urllib.robotparser.RobotFileParser()
rp.set_url("https://movie.douban.com/robots.txt")
rp.read()
# 检查是否允许爬取特定 URL
can_fetch = rp.can_fetch("*", "https://movie.douban.com/top250")
print(f"是否允许爬取: {can_fetch}")
# 获取建议的爬取间隔
crawl_delay = rp.crawl_delay("*")
if crawl_delay:
print(f"建议爬取间隔: {crawl_delay} 秒")
5.7 实战案例:爬取豆瓣电影 Top250
综合运用本章所学,完成一个完整的爬虫项目:爬取豆瓣电影 Top250,并将结果保存到 Excel。
豆瓣电影 Top250 每页展示 25 部电影,共 10 页。URL 格式为 https://movie.douban.com/top250?start={0|25|50...}。本案例仅用于学习目的,请遵守豆瓣 robots.txt 和使用条款。
#!/usr/bin/env python3
"""豆瓣电影 Top250 爬虫.
功能:
- 爬取豆瓣电影 Top250 的标题、评分、评价人数、链接
- 保存到 Excel 文件
- 包含完整的错误处理和频率控制
安装依赖:
pip install requests beautifulsoup4 openpyxl
"""
import random
import time
from dataclasses import dataclass
import requests
from bs4 import BeautifulSoup
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment
@dataclass
class Movie:
"""电影数据类."""
rank: int # 排名
title: str # 标题
rating: float # 评分
review_count: str # 评价人数
link: str # 详情链接
quote: str # 经典台词
class DoubanSpider:
"""豆瓣电影爬虫."""
BASE_URL = "https://movie.douban.com/top250"
PER_PAGE = 25
TOTAL_PAGES = 10
def __init__(self) -> None:
"""初始化爬虫."""
self.session = requests.Session()
self.session.headers.update({
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9",
})
self.movies: list[Movie] = []
def fetch_page(self, start: int) -> str | None:
"""获取单页 HTML.
参数:
start: 起始偏移量(0, 25, 50, ...)
返回:
HTML 文本,失败返回 None
"""
params = {"start": start}
try:
# 礼貌请求:随机延时
time.sleep(random.uniform(1.5, 3.0))
response = self.session.get(
self.BASE_URL,
params=params,
timeout=10,
)
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
print(f"请求失败 (start={start}): {e}")
return None
def parse_page(self, html: str) -> list[Movie]:
"""解析单页电影数据.
参数:
html: 页面 HTML
返回:
该页面的电影列表
"""
soup = BeautifulSoup(html, "lxml")
movies = []
# 豆瓣 Top250 的列表项选择器
items = soup.select("div.item")
for item in items:
try:
# 排名
rank = int(item.select_one("em").text)
# 标题(可能有中文名和英文名)
title_elem = item.select_one("span.title")
title = title_elem.text if title_elem else "未知"
# 评分
rating_elem = item.select_one("span.rating_num")
rating = float(rating_elem.text) if rating_elem else 0.0
# 评价人数
review_elem = item.select_one("div.star span:last-child")
review_count = review_elem.text if review_elem else "0人评价"
# 链接
link_elem = item.select_one("div.hd a")
link = link_elem["href"] if link_elem else ""
# 经典台词
quote_elem = item.select_one("span.inq")
quote = quote_elem.text if quote_elem else ""
movies.append(Movie(
rank=rank,
title=title,
rating=rating,
review_count=review_count,
link=link,
quote=quote,
))
except (AttributeError, ValueError) as e:
print(f"解析单项失败: {e}")
continue
return movies
def run(self) -> None:
"""运行爬虫,获取所有数据."""
print("开始爬取豆瓣电影 Top250...")
for page in range(self.TOTAL_PAGES):
start = page * self.PER_PAGE
print(f"正在爬取第 {page + 1}/{self.TOTAL_PAGES} 页...")
html = self.fetch_page(start)
if html is None:
print(f"第 {page + 1} 页获取失败,跳过")
continue
page_movies = self.parse_page(html)
self.movies.extend(page_movies)
print(f"第 {page + 1} 页获取 {len(page_movies)} 条数据")
print(f"\n爬取完成!共获取 {len(self.movies)} 部电影")
def save_to_excel(self, filename: str = "douban_top250.xlsx") -> None:
"""保存数据到 Excel.
参数:
filename: 输出文件名
"""
if not self.movies:
print("没有数据可保存")
return
wb = Workbook()
ws = wb.active
ws.title = "豆瓣电影 Top250"
# 设置表头
headers = ["排名", "标题", "评分", "评价人数", "经典台词", "链接"]
header_fill = PatternFill(
start_color="366092",
end_color="366092",
fill_type="solid",
)
header_font = Font(bold=True, color="FFFFFF")
for col, header in enumerate(headers, 1):
cell = ws.cell(row=1, column=col, value=header)
cell.fill = header_fill
cell.font = header_font
cell.alignment = Alignment(horizontal="center")
# 填充数据
for row_idx, movie in enumerate(self.movies, 2):
ws.cell(row=row_idx, column=1, value=movie.rank)
ws.cell(row=row_idx, column=2, value=movie.title)
ws.cell(row=row_idx, column=3, value=movie.rating)
ws.cell(row=row_idx, column=4, value=movie.review_count)
ws.cell(row=row_idx, column=5, value=movie.quote)
ws.cell(row=row_idx, column=6, value=movie.link)
# 调整列宽
ws.column_dimensions["A"].width = 8
ws.column_dimensions["B"].width = 25
ws.column_dimensions["C"].width = 10
ws.column_dimensions["D"].width = 15
ws.column_dimensions["E"].width = 40
ws.column_dimensions["F"].width = 50
wb.save(filename)
print(f"数据已保存到: {filename}")
def main() -> None:
"""主函数."""
spider = DoubanSpider()
spider.run()
spider.save_to_excel()
# 打印前 5 条数据预览
print("\n数据预览(前 5 条):")
print("-" * 80)
for movie in spider.movies[:5]:
print(f"{movie.rank}. {movie.title} - {movie.rating}分")
print(f" {movie.quote}")
print()
if __name__ == "__main__":
main()
- 为爬虫添加命令行参数解析(
argparse),支持指定输出格式(Excel/CSV/JSON)和输出路径。 - 添加代理支持,通过
proxies参数使用 HTTP 代理。 - 使用多线程(
concurrent.futures.ThreadPoolExecutor)并发爬取多页,注意控制并发数避免被封。 - 为每个电影详情页添加额外信息爬取(导演、演员、简介等)。
5.8 httpx —— 现代异步 HTTP 客户端
httpx 是 requests 的现代化替代品,支持 HTTP/2、异步请求和更现代化的 API 设计。
如果说 requests 是 axios(经典同步),httpx 就是 axios + async/await 的结合体。
"""httpx 示例 (pip install httpx) —— 兼容 requests API,但更现代"""
import httpx
# === 同步用法(与 requests 几乎相同)===
response = httpx.get("https://api.github.com")
print(response.status_code)
data = response.json()
# 带超时和重试的客户端
with httpx.Client(
timeout=10.0,
headers={"User-Agent": "MyApp/1.0"}
) as client:
r = client.get("https://httpbin.org/get", params={"page": 1})
print(r.json())
# === 异步用法(支持 async/await!)===
import asyncio
async def fetch_all_async():
"""并发获取多个 URL"""
urls = [
"https://api.github.com",
"https://httpbin.org/json",
"https://httpbin.org/ip"
]
async with httpx.AsyncClient() as client:
tasks = [client.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
for url, resp in zip(urls, responses):
print(f"{url}: {resp.status_code}")
# 运行
# asyncio.run(fetch_all_async())
5.9 asyncio 并发爬取
传统同步爬虫一次只能处理一个请求,大量 URL 时需要很长时间。使用 asyncio + aiohttp
可以并发处理多个请求,效率提升 10-50 倍。这非常像 Node.js 的事件循环模型:
"""异步并发爬虫示例 (pip install aiohttp)"""
import asyncio
import aiohttp
import time
async def fetch(session, url, semaphore):
"""带信号量控制的异步请求"""
async with semaphore: # 限制并发数
try:
async with session.get(url, timeout=10) as response:
data = await response.text()
print(f"完成: {url} ({response.status})")
return {"url": url, "status": response.status, "length": len(data)}
except Exception as e:
print(f"失败: {url} - {e}")
return {"url": url, "error": str(e)}
async def crawl_all(urls, concurrency=5):
"""并发爬取所有 URL(控制并发数避免被封)"""
semaphore = asyncio.Semaphore(concurrency)
async with aiohttp.ClientSession(
headers={"User-Agent": "MyCrawler/1.0"}
) as session:
tasks = [fetch(session, url, semaphore) for url in urls]
results = await asyncio.gather(*tasks)
return results
# 性能对比
if __name__ == "__main__":
urls = [f"https://httpbin.org/delay/1" for _ in range(10)]
# 异步并发(约 3 秒)
start = time.time()
results = asyncio.run(crawl_all(urls, concurrency=5))
print(f"异步并发耗时: {time.time() - start:.1f}s")
# 同步顺序(约 10+ 秒)
# start = time.time()
# for url in urls:
# requests.get(url)
# print(f"同步顺序耗时: {time.time() - start:.1f}s")
- 大量 URL(>50):异步能显著减少总耗时
- IO 密集型:网络请求是典型的 IO 等待,异步收益最大
- 需要控制并发:用
Semaphore控制并发数,避免被封 - 简单爬取仍用同步:代码更易懂,调试更方便
5.10 pydantic 数据验证
爬虫获取到的数据需要验证和清理。pydantic 是 Python 最流行的数据验证库,
通过类型注解自动完成验证、转换和序列化。它类似于 TypeScript 的 interface + 运行时校验:
"""pydantic 数据验证示例 (pip install pydantic)"""
from pydantic import BaseModel, Field, field_validator
from typing import Optional
# 定义数据模型(类似 TypeScript 的 interface + 运行时验证)
class Movie(BaseModel):
title: str = Field(..., min_length=1, description="电影标题")
rating: float = Field(..., ge=0, le=10, description="评分 0-10")
year: int = Field(..., ge=1888, le=2100, description="上映年份")
link: str
quote: Optional[str] = None
@field_validator("rating")
@classmethod
def rating_must_have_one_decimal(cls, v):
"""评分保留 1 位小数"""
return round(v, 1)
@field_validator("link")
@classmethod
def link_must_start_with_http(cls, v):
"""链接必须以 http 开头"""
if not v.startswith("http"):
raise ValueError("链接必须以 http 开头")
return v
# 使用示例
# 正确数据
movie = Movie(
title="肖申克的救赎",
rating=9.7,
year=1994,
link="https://movie.douban.com/subject/1292052/"
)
print(movie.model_dump()) # 输出字典
print(movie.model_dump_json(indent=2)) # 输出 JSON
# 错误数据:pydantic 会抛出 ValidationError
# movie = Movie(title="A", rating=11, year=1994, link="bad-url")
# pydantic_core._pydantic_core.ValidationError: ...
- 定义目标数据的 schema,确保爬取结果结构一致
- 自动过滤异常数据(如评分超出范围、缺少必填字段)
- 方便序列化为 JSON/CSV,或直接存入数据库
- 配合 FastAPI 使用时,自动生成 API 文档(见第 10 章)