在现代Web应用开发中,性能监控是一个关键环节。了解每个请求的处理时间可以帮助我们识别性能瓶颈,优化系统表现。本文将详细介绍如何在FastAPI中创建一个高性能的ASGI中间件来记录HTTP响应时间。
ASGI(Asynchronous Server Gateway Interface)是Python异步Web服务器和应用程序之间的标准接口。ASGI中间件是一种特殊的组件,它位于Web服务器和应用程序之间,可以拦截和处理请求与响应。
虽然FastAPI提供了许多内置功能,但有时我们需要添加特定的业务逻辑,如:
让我们从头开始构建一个高效的响应时间记录中间件。
首先,我们需要理解ASGI的基本概念和消息格式:
pythonimport time
from typing import Callable, Awaitable
from starlette.types import ASGIApp, Scope, Receive, Send
class TimingMiddleware:
"""
纯ASGI中间件,用于计算响应时间并兼容流式响应
"""
def __init__(self, app: ASGIApp):
self.app = app
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
# 中间件的主要逻辑
pass
我们需要确保中间件只处理HTTP请求,忽略其他类型的请求:
pythonasync def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] != "http":
await self.app(scope, receive, send)
return
# HTTP请求处理逻辑
关键部分是在请求开始时记录时间,并在响应完成时计算总耗时:
pythonasync def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] != "http":
await self.app(scope, receive, send)
return
# 记录请求开始时间
start_time = time.perf_counter()
# 标志是否已经发送了响应头
self._response_started = False
async def send_wrapper(message):
# 消息处理逻辑
pass
# 调用下一个ASGI应用
await self.app(scope, receive, send_wrapper)
这是最复杂的部分,需要正确处理ASGI的两种主要消息类型:
pythonasync def send_wrapper(message):
if message["type"] == "http.response.start":
# 处理响应头
duration = time.perf_counter() - start_time
headers = message.get("headers", [])
# 检查是否已存在相同头部,避免重复
existing_header_names = [h[0].lower() for h in headers]
if b'x-response-time' not in existing_header_names:
headers = list(headers)
headers.append((b"x-response-time", f"{duration:.3f}s".encode()))
# 更新消息中的头部
message["headers"] = headers
# 标记响应已经开始
self._response_started = True
# 发送修改后的响应头
await send(message)
elif message["type"] == "http.response.body":
# 直接发送body消息
await send(message)
else:
# 其他类型的消息直接转发
await send(message)
pythonimport time
from typing import Callable, Awaitable
from starlette.types import ASGIApp, Scope, Receive, Send
class TimingMiddleware:
"""
纯ASGI中间件,用于计算响应时间并兼容流式响应
"""
def __init__(self, app: ASGIApp):
self.app = app
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] != "http":
await self.app(scope, receive, send)
return
# 记录请求开始时间
start_time = time.perf_counter()
# 标志是否已经发送了响应头
self._response_started = False
async def send_wrapper(message):
if message["type"] == "http.response.start":
# 在响应头中添加响应时间
duration = time.perf_counter() - start_time
headers = message.get("headers", [])
# 检查是否已存在相同头部,避免重复
existing_header_names = [h[0].lower() for h in headers]
if b'x-response-time' not in existing_header_names:
headers = list(headers)
headers.append((b"x-response-time", f"{duration:.3f}s".encode()))
# 更新消息中的头部
message["headers"] = headers
# 标记响应已经开始
self._response_started = True
# 发送修改后的响应头
await send(message)
elif message["type"] == "http.response.body":
# 直接发送body消息
await send(message)
else:
# 其他类型的消息直接转发
await send(message)
# 调用下一个ASGI应用
await self.app(scope, receive, send_wrapper)
现在让我们看看如何在实际的FastAPI应用中使用这个中间件:
pythonfrom fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
app.add_middleware(TimingMiddleware)
@app.get("/")
async def root():
return {"message": "Hello World"}
@app.get("/stream")
async def stream_data():
"""示例流式响应"""
async def generate_data():
for i in range(5):
yield f"data: Item {i}\n".encode('utf-8')
await asyncio.sleep(0.5) # 模拟延迟
return StreamingResponse(generate_data(), media_type="text/plain")
@app.get("/slow")
async def slow_endpoint():
"""模拟慢速响应"""
await asyncio.sleep(1)
return {"message": "Slow response completed"}
启动应用后,访问任意端点,你可以在响应头中看到 X-Response-Time 字段,显示了请求的处理时间。
time.perf_counter() 提供高精度计时通过创建这个ASGI中间件,我们学会了如何:
这个中间件为我们的FastAPI应用提供了重要的性能监控能力,帮助我们更好地理解和优化应用性能。记住,好的中间件应该是轻量级、高效且不干扰主要业务逻辑的。
本文作者:Dewar
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!