玩转 FastAPI SSE:构建实时 Web 应用
在现代 Web 开发中,实时通信已成为用户体验不可或缺的一部分。无论是聊天应用、实时通知、数据看板还是日志监控,能够即时获取最新信息都至关重要。虽然 WebSocket 是实现双向实时通信的强大工具,但在许多场景下,我们只需要服务器向客户端推送数据,而客户端无需频繁向服务器发送请求。这时,Server-Sent Events (SSE) 便是一个轻量且高效的解决方案。
本文将深入探讨如何在 Python 的高性能 Web 框架 FastAPI 中,利用 SSE 构建实时 Web 应用,并提供实用的代码示例。
1. 什么是 Server-Sent Events (SSE)?
Server-Sent Events (SSE) 是一种允许服务器向客户端推送更新的 Web 技术。它基于 HTTP 协议,通过一个持久的、单向的连接实现。与传统的短轮询(客户端不断请求数据)或长轮询(服务器保持请求开放直到有新数据)不同,SSE 允许服务器在数据可用时主动将其推送到客户端。
SSE 的主要特点:
- 单向通信:数据流从服务器流向客户端。客户端无法直接通过 SSE 连接向服务器发送数据(需要额外的 HTTP 请求或 WebSocket)。
- 基于 HTTP:SSE 利用标准的 HTTP 协议,通常在
/text/event-stream内容类型上运行。这意味着它可以通过现有的 HTTP/S 基础设施,如代理和防火墙,而无需特殊的配置。 - 自动重连:当连接中断时,浏览器会自动尝试重新连接。
- 简单易用:相对于 WebSocket,SSE 的 API 更简单,尤其适用于只需要服务器推送的场景。
- 事件流:服务器可以发送不同类型的命名事件,客户端可以监听特定的事件。
为什么选择 SSE 而不是 WebSocket?
- 简单性:如果你的应用只涉及服务器向客户端推送数据,SSE 提供了更简单的实现和更小的开销。
- 兼容性:SSE 基于 HTTP,因此与现有的 Web 技术栈和基础设施兼容性更好。
- 自动重连:内置的自动重连机制减少了客户端代码的复杂性。
- 性能:对于纯粹的服务器推送场景,SSE 的性能通常与 WebSocket 不相上下,甚至在某些情况下可能更优,因为它避免了 WebSocket 握手和更复杂的帧协议。
2. FastAPI 与异步流
FastAPI 基于 Starlette 和 Pydantic 构建,天生支持异步编程。这使得它与 SSE 完美结合,因为 SSE 本质上就是一种异步数据流。FastAPI 提供了 StreamingResponse 类,可以非常方便地创建流式响应。
2.1 核心:StreamingResponse
StreamingResponse 接收一个异步生成器(或普通生成器)作为内容,并以流的形式将其发送给客户端。对于 SSE,我们需要将 media_type 设置为 text/event-stream。
一个基本的 SSE 事件流的格式如下:
event: message_type
id: 1
data: some data string
每条消息以 data: 开头,并以两个换行符 \n\n 结束。如果需要指定事件类型和 ID,则分别用 event: 和 id: 开头。
3. 构建一个简单的 FastAPI SSE 服务
我们将从一个最简单的例子开始,构建一个每秒向客户端发送当前时间的 SSE 服务。
3.1 FastAPI 后端代码
创建一个 main.py 文件:
“`python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import datetime
app = FastAPI()
async def generate_time_events():
“””
一个异步生成器,每秒生成一个包含当前时间的 SSE 事件。
“””
while True:
# 获取当前时间
current_time = datetime.datetime.now().strftime(“%H:%M:%S”)
# 格式化为 SSE 事件数据
# data: 字段是必须的
# event: 字段是可选的,用于指定事件类型
# id: 字段是可选的,用于标识事件
sse_data = f"data: Current time: {current_time}\n\n"
# 打印到控制台,方便调试
print(f"Sending: {sse_data.strip()}")
# 发送数据
yield sse_data
# 等待一秒
await asyncio.sleep(1)
@app.get(“/stream”)
async def stream_time(request: Request):
“””
SSE 接口,返回一个每秒更新时间的事件流。
“””
print(“Client connected for time stream.”)
# 使用 StreamingResponse 返回一个 SSE 流
return StreamingResponse(generate_time_events(), media_type="text/event-stream")
@app.get(“/”)
async def read_root():
return {“message”: “Welcome to FastAPI SSE example!”}
“`
要运行此应用,请安装 FastAPI 和 Uvicorn:
bash
pip install fastapi "uvicorn[standard]"
然后运行:
bash
uvicorn main:app --reload
3.2 客户端 HTML/JavaScript
现在,我们需要一个前端页面来接收并显示这些事件。创建一个 index.html 文件:
“`html
FastAPI SSE Time Stream
This page receives real-time updates from a FastAPI SSE server.
“`
将 index.html 文件放在 main.py 同级目录下,然后在浏览器中打开 http://127.0.0.1:8000/index.html(如果直接通过文件系统打开可能因跨域问题无法连接,最好是通过一个简单的静态文件服务器,或者直接通过 FastAPI 提供)。
通过 FastAPI 提供静态文件 (可选,仅为方便测试):
在 main.py 中添加:
“`python
from fastapi.staticfiles import StaticFiles
… (之前的代码)
app.mount(“/static”, StaticFiles(directory=”.”), name=”static”)
@app.get(“/”) 可以指向 index.html
@app.get(“/”)
async def read_root():
with open(“index.html”, “r”) as f:
return StreamingResponse(f.read(), media_type=”text/html”)
“`
现在访问 http://127.0.0.1:8000/ 就可以看到效果了。
4. 构建一个简易的实时日志监控
接下来,我们构建一个更实际的例子:一个实时监控服务器端日志的 SSE 应用。
4.1 FastAPI 后端代码
假设我们有一个 app.log 文件,并且有另一个进程或脚本在不断地向其中写入日志。我们的 FastAPI 应用将监控这个文件,并在有新行时将其推送到客户端。
修改 main.py:
“`python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
import asyncio
import datetime
import os
app = FastAPI()
模拟日志文件
LOG_FILE = “app.log”
创建或清空日志文件以便测试
with open(LOG_FILE, “w”) as f:
f.write(f”[{datetime.datetime.now()}] Server started.\n”)
模拟一个后台任务,每隔一段时间向日志文件写入新内容
async def simulate_log_writes():
count = 0
while True:
await asyncio.sleep(3) # 每3秒写入一条日志
count += 1
log_message = f”[{datetime.datetime.now()}] Log message {count}: This is an important event.\n”
with open(LOG_FILE, “a”) as f:
f.write(log_message)
print(f”Simulated writing to log: {log_message.strip()}”)
在应用启动时启动模拟日志写入任务
@app.on_event(“startup”)
async def startup_event():
asyncio.create_task(simulate_log_writes())
async def follow_log_file(request: Request):
“””
异步生成器,实时跟踪日志文件的新行并作为 SSE 事件发送。
“””
last_position = 0
while True:
# 检查客户端是否还在连接
if await request.is_disconnected():
print("Client disconnected from log stream.")
break
try:
with open(LOG_FILE, "r") as f:
f.seek(last_position) # 从上次读取的位置开始
new_lines = f.readlines()
last_position = f.tell() # 更新上次读取的位置
for line in new_lines:
if line.strip(): # 忽略空行
sse_data = f"data: {line.strip()}\n\n"
print(f"Sending log: {sse_data.strip()}")
yield sse_data
except Exception as e:
print(f"Error reading log file: {e}")
yield f"data: Error reading log file: {e}\n\n"
await asyncio.sleep(1) # 每秒检查一次新日志
@app.get(“/log-stream”)
async def log_stream(request: Request):
“””
SSE 接口,返回实时日志事件流。
“””
print(“Client connected for log stream.”)
return StreamingResponse(follow_log_file(request), media_type=”text/event-stream”)
提供静态文件(HTML 页面)
app.mount(“/static”, StaticFiles(directory=”.”), name=”static”)
@app.get(“/”, response_class=HTMLResponse)
async def read_root():
return “””
<!DOCTYPE html>
FastAPI SSE Real-time Log Monitor
This page displays real-time server logs via Server-Sent Events.
<script>
const eventSource = new EventSource("/log-stream");
const outputDiv = document.getElementById("output");
eventSource.onmessage = function(event) {
console.log("Received log entry:", event.data);
const p = document.createElement("p");
p.className = "log-entry";
p.textContent = event.data;
outputDiv.appendChild(p); // 将新消息添加到底部
outputDiv.scrollTop = outputDiv.scrollHeight; // 自动滚动到最新消息
};
eventSource.onopen = function() {
console.log("SSE log connection opened.");
const p = document.createElement("p");
p.className = "log-entry system";
p.textContent = "--- Log stream opened ---";
outputDiv.appendChild(p);
outputDiv.scrollTop = outputDiv.scrollHeight;
};
eventSource.onerror = function(error) {
console.error("SSE log error:", error);
const p = document.createElement("p");
p.className = "log-entry system error";
p.textContent = "--- Log stream error or closed. Attempting to reconnect... ---";
outputDiv.appendChild(p);
outputDiv.scrollTop = outputDiv.scrollHeight;
};
window.onbeforeunload = function() {
eventSource.close();
console.log("SSE log connection closed.");
};
</script>
</body>
</html>
"""
“`
运行 uvicorn main:app --reload 后,访问 http://127.0.0.1:8000/,你将看到一个实时更新的日志流。
这个例子中:
* simulate_log_writes 模拟了一个在后台不断写入日志文件的任务。
* follow_log_file 生成器函数使用 f.seek(last_position) 来记住上次读取的位置,并只读取新的行。
* request.is_disconnected() 用于检查客户端是否已断开连接,避免服务器无限期地尝试向已断开的客户端发送数据,这对于优化资源使用非常重要。
5. 进阶考量
5.1 客户端重连机制
EventSource 客户端内置了自动重连机制。当连接中断(例如,网络问题、服务器重启)时,它会尝试在几秒后重新连接。服务器可以通过发送 retry: <milliseconds> 字段来建议客户端的重连间隔。
“`python
示例:在 SSE 事件中包含 retry 字段
sse_data = f”retry: 5000\ndata: Keep alive\n\n” # 建议客户端5秒后重连
yield sse_data
“`
5.2 发送带 ID 和事件类型的消息
为了更精细地控制客户端事件处理,你可以为 SSE 消息添加 id 和 event 字段:
“`python
假设你在某个地方有一个计数器或唯一 ID
event_id = 0
while True:
event_id += 1
message_type = “log_update” # 自定义事件类型
log_entry = f”[{datetime.datetime.now()}] A new log entry.”
sse_data = (
f"id: {event_id}\n"
f"event: {message_type}\n"
f"data: {log_entry}\n\n"
)
yield sse_data
await asyncio.sleep(1)
“`
客户端可以通过 eventSource.addEventListener('log_update', function(event) { ... }) 来监听特定的事件类型。event.lastEventId 会包含最新的 id。
5.3 错误处理与心跳机制
- 服务器端错误:在生成器中捕获异常,并通过 SSE 发送错误信息给客户端,然后可能终止连接。
- 心跳消息:为了保持连接活跃,即使没有实际数据推送,服务器也可以定期发送“心跳”消息(例如,
data: \n\n或者带event: heartbeat的空数据)。这有助于防止代理或防火墙由于长时间不活动而关闭连接。
5.4 与消息队列集成
对于更复杂的实时应用,例如需要支持大量客户端或从多个源收集数据的场景,直接在生成器中读取文件或数据库可能效率不高。这时,可以结合消息队列(如 Redis Pub/Sub, RabbitMQ, Kafka)来构建可扩展的 SSE 服务。
工作流程示例:
- 数据生产者:将实时数据(如日志、通知)发布到消息队列的某个主题。
- FastAPI SSE 服务:订阅该主题。
- SSE 生成器:从消息队列中消费消息,并将其格式化为 SSE 事件推送到连接的客户端。
“`python
伪代码示例:结合 Redis Pub/Sub
import aioredis # 假设使用 aioredis
async def consume_from_redis():
redis = await aioredis.create_redis_pool(‘redis://localhost’)
channel = (await redis.subscribe(‘my-realtime-channel’))[0]
while True:
message = await channel.get() # 等待新消息
if message:
yield f"data: {message.decode('utf-8')}\n\n"
await asyncio.sleep(0.1) # 短暂等待,避免CPU空转
“`
5.5 部署考量
- Gunicorn + Uvicorn Worker:在生产环境中,通常会使用 Gunicorn 配合 Uvicorn Worker 来运行 FastAPI 应用。确保 Gunicorn 的
timeout设置足够长,以支持 SSE 的长连接。 - 反向代理 (Nginx/Traefik):如果使用 Nginx 等反向代理,可能需要配置
proxy_read_timeout和proxy_send_timeout,并确保proxy_buffering off;以便立即将数据推送到客户端,而不是等待缓冲区填满。 - 负载均衡:如果部署在多个服务器上,且需要所有客户端都能收到所有事件(例如广播),则需要后端有消息队列来同步不同实例的事件。
6. 总结
Server-Sent Events 提供了一种简洁高效的方式来实现服务器到客户端的单向实时通信。结合 FastAPI 强大的异步能力和 StreamingResponse,我们可以轻松构建出响应迅速、用户体验良好的实时 Web 应用。从简单的实时时钟到复杂的日志监控和消息通知系统,SSE 都是一个值得考虑的强大工具。在选择实时技术时,请根据你的具体需求,权衡 SSE 和 WebSocket 的优劣,选择最适合的方案。