精通FastAPI WebSocket:完整的分步指南
FastAPI 不仅是一个用于构建高性能 API 的现代化 Python Web 框架,它还为处理 WebSocket 提供了卓越的一流支持。本指南将带您从零开始,逐步深入,学习在 FastAPI 应用程序中实现实时、双向通信所需的一切知识。
无论您是想构建一个聊天应用、实时仪表盘,还是任何需要即时数据更新的功能,本文都将为您提供坚实的基础。
目录
- 什么是 WebSocket?
- 环境准备
- 创建您的第一个 WebSocket 端点
- 构建一个简单的前端客户端
- 处理不同类型的数据
- 实现连接管理器与消息广播
- 构建一个完整的聊天室应用
- 处理断开连接与异常
- 总结
1. 什么是 WebSocket?
在传统的 HTTP 模型中,通信是单向的:客户端发起请求,服务器返回响应。如果您需要实时更新,客户端必须不断地轮询服务器以获取新数据,这既低效又浪费资源。
WebSocket 协议 (RFC 6455) 解决了这个问题。它在客户端和服务器之间建立一个持久的、全双工的(bidirectional)通信通道。一旦连接建立,任何一方都可以随时向对方发送数据,从而实现了真正的实时通信。
主要优势:
* 低延迟: 数据可以立即发送,无需等待客户端发起请求。
* 双向通信: 客户端和服务器都可以主动发送消息。
* 高效率: 与 HTTP 轮询相比,开销更小,因为它保持一个单一的 TCP 连接。
* 状态保持: 连接是持久的,使得状态管理更加容易。
2. 环境准备
在开始之前,请确保您已经安装了 Python。我们将使用 pip 来安装 FastAPI 和 uvicorn(一个 ASGI 服务器)。
步骤 1: 创建项目目录和虚拟环境
“`bash
mkdir fastapi-websocket-project
cd fastapi-websocket-project
python -m venv venv
Windows
venv\Scripts\activate
macOS/Linux
source venv/bin/activate
“`
步骤 2: 安装必要的库
bash
pip install "fastapi[all]"
"fastapi[all]" 会安装 FastAPI、Uvicorn,以及其他推荐的依赖,如 websockets 库。
3. 创建您的第一个 WebSocket 端点
让我们从一个简单的 “回声” (Echo) WebSocket 开始。它会接收客户端发送的任何消息,然后原封不动地发回给该客户端。
创建一个名为 main.py 的文件:
“`python
main.py
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.get(“/”)
async def read_root():
return {“message”: “这是一个 FastAPI WebSocket 教程”}
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
# 接受客户端的连接
await websocket.accept()
try:
while True:
# 等待接收客户端发来的消息
data = await websocket.receive_text()
# 向客户端发送消息
await websocket.send_text(f”消息已收到: {data}”)
except Exception as e:
print(f”发生错误: {e}”)
finally:
# 客户端断开连接
print(“客户端已断开连接”)
“`
代码解析:
* from fastapi import WebSocket: 我们从 fastapi 导入 WebSocket 类。
* @app.websocket("/ws"): 这不是常见的 @app.get 或 @app.post,而是专用于 WebSocket 的装饰器。它告诉 FastAPI /ws 这个路径将处理 WebSocket 连接。
* async def websocket_endpoint(websocket: WebSocket): 异步函数接收一个 WebSocket 类型的参数。这个 websocket 对象就是我们与单个客户端进行交互的接口。
* await websocket.accept(): 这是建立连接的关键一步。在调用此方法之前,不能进行任何收发操作。
* while True: 我们进入一个无限循环,以保持连接并持续监听消息。
* await websocket.receive_text(): 等待并接收来自客户端的文本消息。如果客户端断开,此方法会抛出异常。
* await websocket.send_text(...): 向客户端发送文本消息。
运行服务器:
在终端中,运行以下命令:
bash
uvicorn main:app --reload
服务器现在运行在 http://127.0.0.1:8000。
4. 构建一个简单的前端客户端
为了与我们的 WebSocket 端点交互,我们需要一个客户端。我们可以用一个简单的 HTML 文件和几行 JavaScript 来实现。
创建一个名为 index.html 的文件:
“`html
WebSocket 聊天
“`
如何测试:
1. 确保您的 FastAPI 服务器仍在运行。
2. 在浏览器中打开 index.html 文件。
3. 您应该会看到 “已连接到服务器” 的消息。
4. 在输入框中输入任何内容,点击 “发送” 或按回车。
5. 您会立刻看到服务器返回的 消息已收到: ...。
5. 处理不同类型的数据
FastAPI 的 WebSocket 对象提供了处理不同数据类型的专用方法:
- 文本 (Text):
- 接收:
await websocket.receive_text() - 发送:
await websocket.send_text()
- 接收:
- JSON:
- 接收:
await websocket.receive_json()(自动将 JSON 字符串解析为 Python 字典/列表) - 发送:
await websocket.send_json()(自动将 Python 字典/列表序列化为 JSON 字符串)
- 接收:
- 字节 (Bytes):
- 接收:
await websocket.receive_bytes() - 发送:
await websocket.send_bytes()
- 接收:
示例: 使用 JSON
修改 main.py 的端点:
“`python
@app.websocket(“/ws_json”)
async def websocket_json_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
# 假设客户端发送 {“client_id”: “user123”, “message”: “你好”}
client_id = data.get(“client_id”, “未知用户”)
message = data.get(“message”, “”)
await websocket.send_json({
"status": "success",
"response_to": client_id,
"echo_message": message
})
except Exception as e:
print(f"发生错误: {e}")
“`
您需要相应地更新 JavaScript 客户端以发送和解析 JSON。
6. 实现连接管理器与消息广播
到目前为止,我们的 WebSocket 只能与单个客户端通信。但在真实应用中(如聊天室),我们需要将一条消息 广播 给所有连接的客户端。
FastAPI 本身不维护 WebSocket 连接列表,因为每个 WebSocket 都是在独立的上下文中处理的。因此,我们需要自己实现一个连接管理器。
步骤 1: 创建 ConnectionManager.py
创建一个新文件 ConnectionManager.py:
“`python
ConnectionManager.py
from fastapi import WebSocket
from typing import List
class ConnectionManager:
def init(self):
# 存放激活的ws连接对象
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
# 等待连接
await websocket.accept()
# 将连接对象存入列表
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
# 连接关闭时,从列表中移除
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
# 发送个人消息
await websocket.send_text(message)
async def broadcast(self, message: str):
# 广播消息
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
“`
步骤 2: 在 main.py 中使用管理器
现在,我们重构 main.py 来使用这个 ConnectionManager。
“`python
main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
假设 ConnectionManager.py 与 main.py 在同一目录
from ConnectionManager import manager
app = FastAPI()
提供 HTML 页面的端点
为简单起见,我们将HTML内容放在字符串中
在实际项目中,您会使用 Jinja2 或类似模板引擎
html = “””
WebSocket Chat
Your ID:
“””
@app.get(“/”)
async def get():
return HTMLResponse(html)
@app.websocket(“/ws/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket)
# 广播有新用户加入
await manager.broadcast(f”客户端 #{client_id} 加入聊天室”)
try:
while True:
data = await websocket.receive_text()
# 向所有客户端广播收到的消息
await manager.broadcast(f”客户端 #{client_id} 说: {data}”)
except WebSocketDisconnect:
manager.disconnect(websocket)
# 广播有用户离开
await manager.broadcast(f”客户端 #{client_id} 离开聊天室”)
“`
代码解析:
* 我们导入了 manager 的单例。
* @app.websocket("/ws/{client_id}"): 我们在路径中添加了一个 client_id 参数,以便识别不同的用户。
* await manager.connect(websocket): 当一个新客户端连接时,我们将其添加到管理器中。
* await manager.broadcast(...): 当收到消息时,我们不再只回复给发送者,而是通过管理器广播给所有连接的客户端。
* except WebSocketDisconnect:: FastAPI 提供了一个特殊的异常 WebSocketDisconnect,当客户端断开时会触发。
* manager.disconnect(websocket): 在 except 块中,我们从管理器中移除断开的连接,确保不会向一个已关闭的连接发送消息。
7. 构建一个完整的聊天室应用
上面的代码已经包含了后端的聊天室逻辑。前端部分,我们提供了一个完整的HTML,可以直接运行。
如何测试:
1. 将 ConnectionManager.py 和 main.py 放在同一目录下。
2. 运行 uvicorn main:app --reload。
3. 在 两个或多个不同的浏览器标签页或窗口 中打开 http://127.0.0.1:8000。
4. 每个页面都会被分配一个唯一的客户端 ID。
5. 当一个新页面打开时,所有其他页面都会收到 “xxx 加入聊天室” 的广播。
6. 在任何一个页面发送消息,所有页面都会实时收到这条消息。
7. 关闭一个页面,其他页面会收到 “xxx 离开聊天室” 的广播。
8. 处理断开连接与异常
我们已经在上面的例子中使用了 try...except WebSocketDisconnect 结构,这是处理客户端正常关闭的最佳实践。
python
try:
# 循环接收消息
while True:
data = await websocket.receive_text()
await manager.broadcast(f"客户端 #{client_id} 说: {data}")
except WebSocketDisconnect:
# 客户端断开连接
manager.disconnect(websocket)
await manager.broadcast(f"客户端 #{client_id} 离开聊天室")
except Exception as e:
# 处理其他可能的异常
print(f"客户端 #{client_id} 发生错误: {e}")
manager.disconnect(websocket)
将 disconnect 调用放在 finally 块中是另一种确保连接总是被清理的健壮方法,但 WebSocketDisconnect 异常通常更具可读性。
9. 总结
恭喜!您已经成功学习了如何在 FastAPI 中使用 WebSocket,从一个简单的回声服务到一个功能性的实时聊天应用。
我们涵盖的核心要点:
* 使用 @app.websocket() 装饰器定义 WebSocket 路由。
* 通过 websocket.accept() 建立连接。
* 使用 receive_text/json 和 send_text/json 进行双向通信。
* 通过实现 ConnectionManager 来管理多个连接并实现消息广播。
* 使用 try...except WebSocketDisconnect 优雅地处理客户端断开。
FastAPI 使 WebSocket 的开发变得异常简单和直观。以此为基础,您可以构建更复杂的应用,例如集成数据库、实现用户认证、或者将 WebSocket 与 Redis Pub/Sub 等消息队列结合以支持分布式部署。祝您编码愉快!