精通FastAPI WebSocket:完整的分步指南 – wiki大全

精通FastAPI WebSocket:完整的分步指南

FastAPI 不仅是一个用于构建高性能 API 的现代化 Python Web 框架,它还为处理 WebSocket 提供了卓越的一流支持。本指南将带您从零开始,逐步深入,学习在 FastAPI 应用程序中实现实时、双向通信所需的一切知识。

无论您是想构建一个聊天应用、实时仪表盘,还是任何需要即时数据更新的功能,本文都将为您提供坚实的基础。

目录

  1. 什么是 WebSocket?
  2. 环境准备
  3. 创建您的第一个 WebSocket 端点
  4. 构建一个简单的前端客户端
  5. 处理不同类型的数据
  6. 实现连接管理器与消息广播
  7. 构建一个完整的聊天室应用
  8. 处理断开连接与异常
  9. 总结

1. 什么是 WebSocket?

在传统的 HTTP 模型中,通信是单向的:客户端发起请求,服务器返回响应。如果您需要实时更新,客户端必须不断地轮询服务器以获取新数据,这既低效又浪费资源。

WebSocket 协议 (RFC 6455) 解决了这个问题。它在客户端和服务器之间建立一个持久的、全双工的(bidirectional)通信通道。一旦连接建立,任何一方都可以随时向对方发送数据,从而实现了真正的实时通信。

主要优势:
* 低延迟: 数据可以立即发送,无需等待客户端发起请求。
* 双向通信: 客户端和服务器都可以主动发送消息。
* 高效率: 与 HTTP 轮询相比,开销更小,因为它保持一个单一的 TCP 连接。
* 状态保持: 连接是持久的,使得状态管理更加容易。

2. 环境准备

在开始之前,请确保您已经安装了 Python。我们将使用 pip 来安装 FastAPI 和 uvicorn(一个 ASGI 服务器)。

步骤 1: 创建项目目录和虚拟环境

“`bash
mkdir fastapi-websocket-project
cd fastapi-websocket-project
python -m venv venv

Windows

venv\Scripts\activate

macOS/Linux

source venv/bin/activate
“`

步骤 2: 安装必要的库

bash
pip install "fastapi[all]"

"fastapi[all]" 会安装 FastAPI、Uvicorn,以及其他推荐的依赖,如 websockets 库。

3. 创建您的第一个 WebSocket 端点

让我们从一个简单的 “回声” (Echo) WebSocket 开始。它会接收客户端发送的任何消息,然后原封不动地发回给该客户端。

创建一个名为 main.py 的文件:

“`python

main.py

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.get(“/”)
async def read_root():
return {“message”: “这是一个 FastAPI WebSocket 教程”}

@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
# 接受客户端的连接
await websocket.accept()
try:
while True:
# 等待接收客户端发来的消息
data = await websocket.receive_text()
# 向客户端发送消息
await websocket.send_text(f”消息已收到: {data}”)
except Exception as e:
print(f”发生错误: {e}”)
finally:
# 客户端断开连接
print(“客户端已断开连接”)

“`

代码解析:
* from fastapi import WebSocket: 我们从 fastapi 导入 WebSocket 类。
* @app.websocket("/ws"): 这不是常见的 @app.get@app.post,而是专用于 WebSocket 的装饰器。它告诉 FastAPI /ws 这个路径将处理 WebSocket 连接。
* async def websocket_endpoint(websocket: WebSocket): 异步函数接收一个 WebSocket 类型的参数。这个 websocket 对象就是我们与单个客户端进行交互的接口。
* await websocket.accept(): 这是建立连接的关键一步。在调用此方法之前,不能进行任何收发操作。
* while True: 我们进入一个无限循环,以保持连接并持续监听消息。
* await websocket.receive_text(): 等待并接收来自客户端的文本消息。如果客户端断开,此方法会抛出异常。
* await websocket.send_text(...): 向客户端发送文本消息。

运行服务器:
在终端中,运行以下命令:
bash
uvicorn main:app --reload

服务器现在运行在 http://127.0.0.1:8000

4. 构建一个简单的前端客户端

为了与我们的 WebSocket 端点交互,我们需要一个客户端。我们可以用一个简单的 HTML 文件和几行 JavaScript 来实现。

创建一个名为 index.html 的文件:

“`html




FastAPI WebSocket 聊天室

WebSocket 聊天





“`

如何测试:
1. 确保您的 FastAPI 服务器仍在运行。
2. 在浏览器中打开 index.html 文件。
3. 您应该会看到 “已连接到服务器” 的消息。
4. 在输入框中输入任何内容,点击 “发送” 或按回车。
5. 您会立刻看到服务器返回的 消息已收到: ...

5. 处理不同类型的数据

FastAPI 的 WebSocket 对象提供了处理不同数据类型的专用方法:

  • 文本 (Text):
    • 接收: await websocket.receive_text()
    • 发送: await websocket.send_text()
  • JSON:
    • 接收: await websocket.receive_json() (自动将 JSON 字符串解析为 Python 字典/列表)
    • 发送: await websocket.send_json() (自动将 Python 字典/列表序列化为 JSON 字符串)
  • 字节 (Bytes):
    • 接收: await websocket.receive_bytes()
    • 发送: await websocket.send_bytes()

示例: 使用 JSON

修改 main.py 的端点:
“`python
@app.websocket(“/ws_json”)
async def websocket_json_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
# 假设客户端发送 {“client_id”: “user123”, “message”: “你好”}
client_id = data.get(“client_id”, “未知用户”)
message = data.get(“message”, “”)

        await websocket.send_json({
            "status": "success",
            "response_to": client_id,
            "echo_message": message
        })
except Exception as e:
    print(f"发生错误: {e}")

“`
您需要相应地更新 JavaScript 客户端以发送和解析 JSON。

6. 实现连接管理器与消息广播

到目前为止,我们的 WebSocket 只能与单个客户端通信。但在真实应用中(如聊天室),我们需要将一条消息 广播 给所有连接的客户端。

FastAPI 本身不维护 WebSocket 连接列表,因为每个 WebSocket 都是在独立的上下文中处理的。因此,我们需要自己实现一个连接管理器。

步骤 1: 创建 ConnectionManager.py

创建一个新文件 ConnectionManager.py:

“`python

ConnectionManager.py

from fastapi import WebSocket
from typing import List

class ConnectionManager:
def init(self):
# 存放激活的ws连接对象
self.active_connections: List[WebSocket] = []

async def connect(self, websocket: WebSocket):
    # 等待连接
    await websocket.accept()
    # 将连接对象存入列表
    self.active_connections.append(websocket)

def disconnect(self, websocket: WebSocket):
    # 连接关闭时,从列表中移除
    self.active_connections.remove(websocket)

async def send_personal_message(self, message: str, websocket: WebSocket):
    # 发送个人消息
    await websocket.send_text(message)

async def broadcast(self, message: str):
    # 广播消息
    for connection in self.active_connections:
        await connection.send_text(message)

manager = ConnectionManager()

“`

步骤 2: 在 main.py 中使用管理器

现在,我们重构 main.py 来使用这个 ConnectionManager

“`python

main.py

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse

假设 ConnectionManager.py 与 main.py 在同一目录

from ConnectionManager import manager

app = FastAPI()

提供 HTML 页面的端点

为简单起见,我们将HTML内容放在字符串中

在实际项目中,您会使用 Jinja2 或类似模板引擎

html = “””




Chat

WebSocket Chat

Your ID:





“””

@app.get(“/”)
async def get():
return HTMLResponse(html)

@app.websocket(“/ws/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket)
# 广播有新用户加入
await manager.broadcast(f”客户端 #{client_id} 加入聊天室”)
try:
while True:
data = await websocket.receive_text()
# 向所有客户端广播收到的消息
await manager.broadcast(f”客户端 #{client_id} 说: {data}”)
except WebSocketDisconnect:
manager.disconnect(websocket)
# 广播有用户离开
await manager.broadcast(f”客户端 #{client_id} 离开聊天室”)
“`

代码解析:
* 我们导入了 manager 的单例。
* @app.websocket("/ws/{client_id}"): 我们在路径中添加了一个 client_id 参数,以便识别不同的用户。
* await manager.connect(websocket): 当一个新客户端连接时,我们将其添加到管理器中。
* await manager.broadcast(...): 当收到消息时,我们不再只回复给发送者,而是通过管理器广播给所有连接的客户端。
* except WebSocketDisconnect:: FastAPI 提供了一个特殊的异常 WebSocketDisconnect,当客户端断开时会触发。
* manager.disconnect(websocket): 在 except 块中,我们从管理器中移除断开的连接,确保不会向一个已关闭的连接发送消息。

7. 构建一个完整的聊天室应用

上面的代码已经包含了后端的聊天室逻辑。前端部分,我们提供了一个完整的HTML,可以直接运行。

如何测试:
1. 将 ConnectionManager.pymain.py 放在同一目录下。
2. 运行 uvicorn main:app --reload
3. 在 两个或多个不同的浏览器标签页或窗口 中打开 http://127.0.0.1:8000
4. 每个页面都会被分配一个唯一的客户端 ID。
5. 当一个新页面打开时,所有其他页面都会收到 “xxx 加入聊天室” 的广播。
6. 在任何一个页面发送消息,所有页面都会实时收到这条消息。
7. 关闭一个页面,其他页面会收到 “xxx 离开聊天室” 的广播。

8. 处理断开连接与异常

我们已经在上面的例子中使用了 try...except WebSocketDisconnect 结构,这是处理客户端正常关闭的最佳实践。

python
try:
# 循环接收消息
while True:
data = await websocket.receive_text()
await manager.broadcast(f"客户端 #{client_id} 说: {data}")
except WebSocketDisconnect:
# 客户端断开连接
manager.disconnect(websocket)
await manager.broadcast(f"客户端 #{client_id} 离开聊天室")
except Exception as e:
# 处理其他可能的异常
print(f"客户端 #{client_id} 发生错误: {e}")
manager.disconnect(websocket)

disconnect 调用放在 finally 块中是另一种确保连接总是被清理的健壮方法,但 WebSocketDisconnect 异常通常更具可读性。

9. 总结

恭喜!您已经成功学习了如何在 FastAPI 中使用 WebSocket,从一个简单的回声服务到一个功能性的实时聊天应用。

我们涵盖的核心要点:
* 使用 @app.websocket() 装饰器定义 WebSocket 路由。
* 通过 websocket.accept() 建立连接。
* 使用 receive_text/jsonsend_text/json 进行双向通信。
* 通过实现 ConnectionManager 来管理多个连接并实现消息广播。
* 使用 try...except WebSocketDisconnect 优雅地处理客户端断开。

FastAPI 使 WebSocket 的开发变得异常简单和直观。以此为基础,您可以构建更复杂的应用,例如集成数据库、实现用户认证、或者将 WebSocket 与 Redis Pub/Sub 等消息队列结合以支持分布式部署。祝您编码愉快!

滚动至顶部