玩转 FastAPI SSE:构建实时 Web 应用 – wiki大全


玩转 FastAPI SSE:构建实时 Web 应用

在现代 Web 开发中,实时通信已成为用户体验不可或缺的一部分。无论是聊天应用、实时通知、数据看板还是日志监控,能够即时获取最新信息都至关重要。虽然 WebSocket 是实现双向实时通信的强大工具,但在许多场景下,我们只需要服务器向客户端推送数据,而客户端无需频繁向服务器发送请求。这时,Server-Sent Events (SSE) 便是一个轻量且高效的解决方案。

本文将深入探讨如何在 Python 的高性能 Web 框架 FastAPI 中,利用 SSE 构建实时 Web 应用,并提供实用的代码示例。

1. 什么是 Server-Sent Events (SSE)?

Server-Sent Events (SSE) 是一种允许服务器向客户端推送更新的 Web 技术。它基于 HTTP 协议,通过一个持久的、单向的连接实现。与传统的短轮询(客户端不断请求数据)或长轮询(服务器保持请求开放直到有新数据)不同,SSE 允许服务器在数据可用时主动将其推送到客户端。

SSE 的主要特点:

  • 单向通信:数据流从服务器流向客户端。客户端无法直接通过 SSE 连接向服务器发送数据(需要额外的 HTTP 请求或 WebSocket)。
  • 基于 HTTP:SSE 利用标准的 HTTP 协议,通常在 /text/event-stream 内容类型上运行。这意味着它可以通过现有的 HTTP/S 基础设施,如代理和防火墙,而无需特殊的配置。
  • 自动重连:当连接中断时,浏览器会自动尝试重新连接。
  • 简单易用:相对于 WebSocket,SSE 的 API 更简单,尤其适用于只需要服务器推送的场景。
  • 事件流:服务器可以发送不同类型的命名事件,客户端可以监听特定的事件。

为什么选择 SSE 而不是 WebSocket?

  • 简单性:如果你的应用只涉及服务器向客户端推送数据,SSE 提供了更简单的实现和更小的开销。
  • 兼容性:SSE 基于 HTTP,因此与现有的 Web 技术栈和基础设施兼容性更好。
  • 自动重连:内置的自动重连机制减少了客户端代码的复杂性。
  • 性能:对于纯粹的服务器推送场景,SSE 的性能通常与 WebSocket 不相上下,甚至在某些情况下可能更优,因为它避免了 WebSocket 握手和更复杂的帧协议。

2. FastAPI 与异步流

FastAPI 基于 Starlette 和 Pydantic 构建,天生支持异步编程。这使得它与 SSE 完美结合,因为 SSE 本质上就是一种异步数据流。FastAPI 提供了 StreamingResponse 类,可以非常方便地创建流式响应。

2.1 核心:StreamingResponse

StreamingResponse 接收一个异步生成器(或普通生成器)作为内容,并以流的形式将其发送给客户端。对于 SSE,我们需要将 media_type 设置为 text/event-stream

一个基本的 SSE 事件流的格式如下:

event: message_type
id: 1
data: some data string

每条消息以 data: 开头,并以两个换行符 \n\n 结束。如果需要指定事件类型和 ID,则分别用 event:id: 开头。

3. 构建一个简单的 FastAPI SSE 服务

我们将从一个最简单的例子开始,构建一个每秒向客户端发送当前时间的 SSE 服务。

3.1 FastAPI 后端代码

创建一个 main.py 文件:

“`python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import datetime

app = FastAPI()

async def generate_time_events():
“””
一个异步生成器,每秒生成一个包含当前时间的 SSE 事件。
“””
while True:
# 获取当前时间
current_time = datetime.datetime.now().strftime(“%H:%M:%S”)

    # 格式化为 SSE 事件数据
    # data: 字段是必须的
    # event: 字段是可选的,用于指定事件类型
    # id: 字段是可选的,用于标识事件
    sse_data = f"data: Current time: {current_time}\n\n"

    # 打印到控制台,方便调试
    print(f"Sending: {sse_data.strip()}")

    # 发送数据
    yield sse_data

    # 等待一秒
    await asyncio.sleep(1)

@app.get(“/stream”)
async def stream_time(request: Request):
“””
SSE 接口,返回一个每秒更新时间的事件流。
“””
print(“Client connected for time stream.”)

# 使用 StreamingResponse 返回一个 SSE 流
return StreamingResponse(generate_time_events(), media_type="text/event-stream")

@app.get(“/”)
async def read_root():
return {“message”: “Welcome to FastAPI SSE example!”}

“`

要运行此应用,请安装 FastAPI 和 Uvicorn:

bash
pip install fastapi "uvicorn[standard]"

然后运行:

bash
uvicorn main:app --reload

3.2 客户端 HTML/JavaScript

现在,我们需要一个前端页面来接收并显示这些事件。创建一个 index.html 文件:

“`html






FastAPI SSE Time Stream


FastAPI SSE Time Stream

This page receives real-time updates from a FastAPI SSE server.



“`

index.html 文件放在 main.py 同级目录下,然后在浏览器中打开 http://127.0.0.1:8000/index.html(如果直接通过文件系统打开可能因跨域问题无法连接,最好是通过一个简单的静态文件服务器,或者直接通过 FastAPI 提供)。

通过 FastAPI 提供静态文件 (可选,仅为方便测试):

main.py 中添加:

“`python
from fastapi.staticfiles import StaticFiles

… (之前的代码)

app.mount(“/static”, StaticFiles(directory=”.”), name=”static”)

@app.get(“/”) 可以指向 index.html

@app.get(“/”)
async def read_root():
with open(“index.html”, “r”) as f:
return StreamingResponse(f.read(), media_type=”text/html”)
“`

现在访问 http://127.0.0.1:8000/ 就可以看到效果了。

4. 构建一个简易的实时日志监控

接下来,我们构建一个更实际的例子:一个实时监控服务器端日志的 SSE 应用。

4.1 FastAPI 后端代码

假设我们有一个 app.log 文件,并且有另一个进程或脚本在不断地向其中写入日志。我们的 FastAPI 应用将监控这个文件,并在有新行时将其推送到客户端。

修改 main.py

“`python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
import asyncio
import datetime
import os

app = FastAPI()

模拟日志文件

LOG_FILE = “app.log”

创建或清空日志文件以便测试

with open(LOG_FILE, “w”) as f:
f.write(f”[{datetime.datetime.now()}] Server started.\n”)

模拟一个后台任务,每隔一段时间向日志文件写入新内容

async def simulate_log_writes():
count = 0
while True:
await asyncio.sleep(3) # 每3秒写入一条日志
count += 1
log_message = f”[{datetime.datetime.now()}] Log message {count}: This is an important event.\n”
with open(LOG_FILE, “a”) as f:
f.write(log_message)
print(f”Simulated writing to log: {log_message.strip()}”)

在应用启动时启动模拟日志写入任务

@app.on_event(“startup”)
async def startup_event():
asyncio.create_task(simulate_log_writes())

async def follow_log_file(request: Request):
“””
异步生成器,实时跟踪日志文件的新行并作为 SSE 事件发送。
“””
last_position = 0

while True:
    # 检查客户端是否还在连接
    if await request.is_disconnected():
        print("Client disconnected from log stream.")
        break

    try:
        with open(LOG_FILE, "r") as f:
            f.seek(last_position) # 从上次读取的位置开始
            new_lines = f.readlines()
            last_position = f.tell() # 更新上次读取的位置

            for line in new_lines:
                if line.strip(): # 忽略空行
                    sse_data = f"data: {line.strip()}\n\n"
                    print(f"Sending log: {sse_data.strip()}")
                    yield sse_data
    except Exception as e:
        print(f"Error reading log file: {e}")
        yield f"data: Error reading log file: {e}\n\n"

    await asyncio.sleep(1) # 每秒检查一次新日志

@app.get(“/log-stream”)
async def log_stream(request: Request):
“””
SSE 接口,返回实时日志事件流。
“””
print(“Client connected for log stream.”)
return StreamingResponse(follow_log_file(request), media_type=”text/event-stream”)

提供静态文件(HTML 页面)

app.mount(“/static”, StaticFiles(directory=”.”), name=”static”)

@app.get(“/”, response_class=HTMLResponse)
async def read_root():
return “””
<!DOCTYPE html>




FastAPI SSE Real-time Log Monitor


FastAPI SSE Real-time Log Monitor

This page displays real-time server logs via Server-Sent Events.

    <script>
        const eventSource = new EventSource("/log-stream");
        const outputDiv = document.getElementById("output");

        eventSource.onmessage = function(event) {
            console.log("Received log entry:", event.data);
            const p = document.createElement("p");
            p.className = "log-entry";
            p.textContent = event.data;
            outputDiv.appendChild(p); // 将新消息添加到底部
            outputDiv.scrollTop = outputDiv.scrollHeight; // 自动滚动到最新消息
        };

        eventSource.onopen = function() {
            console.log("SSE log connection opened.");
            const p = document.createElement("p");
            p.className = "log-entry system";
            p.textContent = "--- Log stream opened ---";
            outputDiv.appendChild(p);
            outputDiv.scrollTop = outputDiv.scrollHeight;
        };

        eventSource.onerror = function(error) {
            console.error("SSE log error:", error);
            const p = document.createElement("p");
            p.className = "log-entry system error";
            p.textContent = "--- Log stream error or closed. Attempting to reconnect... ---";
            outputDiv.appendChild(p);
            outputDiv.scrollTop = outputDiv.scrollHeight;
        };

        window.onbeforeunload = function() {
            eventSource.close();
            console.log("SSE log connection closed.");
        };
    </script>
</body>
</html>
"""

“`

运行 uvicorn main:app --reload 后,访问 http://127.0.0.1:8000/,你将看到一个实时更新的日志流。

这个例子中:
* simulate_log_writes 模拟了一个在后台不断写入日志文件的任务。
* follow_log_file 生成器函数使用 f.seek(last_position) 来记住上次读取的位置,并只读取新的行。
* request.is_disconnected() 用于检查客户端是否已断开连接,避免服务器无限期地尝试向已断开的客户端发送数据,这对于优化资源使用非常重要。

5. 进阶考量

5.1 客户端重连机制

EventSource 客户端内置了自动重连机制。当连接中断(例如,网络问题、服务器重启)时,它会尝试在几秒后重新连接。服务器可以通过发送 retry: <milliseconds> 字段来建议客户端的重连间隔。

“`python

示例:在 SSE 事件中包含 retry 字段

sse_data = f”retry: 5000\ndata: Keep alive\n\n” # 建议客户端5秒后重连
yield sse_data
“`

5.2 发送带 ID 和事件类型的消息

为了更精细地控制客户端事件处理,你可以为 SSE 消息添加 idevent 字段:

“`python

假设你在某个地方有一个计数器或唯一 ID

event_id = 0
while True:
event_id += 1
message_type = “log_update” # 自定义事件类型
log_entry = f”[{datetime.datetime.now()}] A new log entry.”

sse_data = (
    f"id: {event_id}\n"
    f"event: {message_type}\n"
    f"data: {log_entry}\n\n"
)
yield sse_data
await asyncio.sleep(1)

“`

客户端可以通过 eventSource.addEventListener('log_update', function(event) { ... }) 来监听特定的事件类型。event.lastEventId 会包含最新的 id

5.3 错误处理与心跳机制

  • 服务器端错误:在生成器中捕获异常,并通过 SSE 发送错误信息给客户端,然后可能终止连接。
  • 心跳消息:为了保持连接活跃,即使没有实际数据推送,服务器也可以定期发送“心跳”消息(例如,data: \n\n 或者带 event: heartbeat 的空数据)。这有助于防止代理或防火墙由于长时间不活动而关闭连接。

5.4 与消息队列集成

对于更复杂的实时应用,例如需要支持大量客户端或从多个源收集数据的场景,直接在生成器中读取文件或数据库可能效率不高。这时,可以结合消息队列(如 Redis Pub/Sub, RabbitMQ, Kafka)来构建可扩展的 SSE 服务。

工作流程示例:

  1. 数据生产者:将实时数据(如日志、通知)发布到消息队列的某个主题。
  2. FastAPI SSE 服务:订阅该主题。
  3. SSE 生成器:从消息队列中消费消息,并将其格式化为 SSE 事件推送到连接的客户端。

“`python

伪代码示例:结合 Redis Pub/Sub

import aioredis # 假设使用 aioredis

async def consume_from_redis():
redis = await aioredis.create_redis_pool(‘redis://localhost’)
channel = (await redis.subscribe(‘my-realtime-channel’))[0]

while True:
    message = await channel.get() # 等待新消息
    if message:
        yield f"data: {message.decode('utf-8')}\n\n"
    await asyncio.sleep(0.1) # 短暂等待,避免CPU空转

“`

5.5 部署考量

  • Gunicorn + Uvicorn Worker:在生产环境中,通常会使用 Gunicorn 配合 Uvicorn Worker 来运行 FastAPI 应用。确保 Gunicorn 的 timeout 设置足够长,以支持 SSE 的长连接。
  • 反向代理 (Nginx/Traefik):如果使用 Nginx 等反向代理,可能需要配置 proxy_read_timeoutproxy_send_timeout,并确保 proxy_buffering off; 以便立即将数据推送到客户端,而不是等待缓冲区填满。
  • 负载均衡:如果部署在多个服务器上,且需要所有客户端都能收到所有事件(例如广播),则需要后端有消息队列来同步不同实例的事件。

6. 总结

Server-Sent Events 提供了一种简洁高效的方式来实现服务器到客户端的单向实时通信。结合 FastAPI 强大的异步能力和 StreamingResponse,我们可以轻松构建出响应迅速、用户体验良好的实时 Web 应用。从简单的实时时钟到复杂的日志监控和消息通知系统,SSE 都是一个值得考虑的强大工具。在选择实时技术时,请根据你的具体需求,权衡 SSE 和 WebSocket 的优劣,选择最适合的方案。


滚动至顶部