Implementing Streaming API Response Handling in Python: A Detailed Guide
I. Introduction
Streaming in Python usually means handling large volumes of data coming from the network, from disk, or from other sources. The key idea is that the data never has to be loaded into memory all at once; instead, it is produced and consumed chunk by chunk.
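As a minimal sketch of that idea (the in-memory buffer below simply stands in for a large file or network stream and is not part of the original examples), a generator can hand data to its caller one chunk at a time:

import io

def iter_chunks(stream, chunk_size=4096):
    """Yield successive chunks from a file-like object instead of reading it all at once."""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        yield chunk

# Demo: an in-memory buffer stands in for a large file or network stream.
source = io.BytesIO(b"x" * 10_000)
total = sum(len(chunk) for chunk in iter_chunks(source))
print(total)  # 10000 bytes processed, but never more than 4096 held at a time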
II. Design Principles
The core principle, as noted above, is to avoid buffering the entire payload: the server produces data incrementally (typically from a generator) and flushes each chunk to the client as soon as it is ready, while the client consumes chunks as they arrive.
III. Implementation Approaches
The sections below walk through three concrete implementations: a Flask service that wraps a generator in Response with stream_with_context, a FastAPI service that uses StreamingResponse (for plain text, large files, and Server-Sent Events), and a minimal HTML/JavaScript frontend that consumes the stream with fetch.
IV. Code Design
1. Python backend code
1.1 Flask service
import json
import requests
from flask import Flask, Response, stream_with_context, request
import time
from flask_cors import CORS
from gevent.pywsgi import WSGIServer
app = Flask(__name__)
CORS(app, resources={r"/*": {"origins": "*"}})  # Allow all origins; restrict this in production
@app.route('/ret_json_data', methods=['GET'])
def ret_json_data():
    print('start request!')
    return {'hello': 'hello flask'}
@app.route('/stream_json_data', methods=["GET", "POST"])
def stream_json_data():
    json_data = {
        'state': 0,
        'data': [],
        'text': '',
        'message': ''
    }
    ret_str = "大模型通常指的是那些规模庞大、参数数量极多的人工智能模型,它们在训练时使用了海量的数据,能够处理复杂的任务并展现出强大的语言理解和生成能力。"
    def generate():
        for i in range(len(ret_str)):
            json_data["text"] = ret_str[i]  # each payload carries one character of the reply
            time.sleep(0.1)  # pause 0.1 s to simulate the interval at which data is produced
            print(f"Data: {json_data}\n")
            yield json.dumps(json_data, ensure_ascii=False)
    print('stream-data')
    return Response(stream_with_context(generate()), content_type='application/json; charset=utf-8')
    # return stream_data(generate())
@app.route('/stream_data_to_frontend')
def stream_data_to_frontend():
    # Assume an external URL that provides streaming data
    external_url = 'http://127.0.0.1:7000/stream_json_data'
    # Receive the data with a streaming request from the requests library
    headers = {"Content-Type": "application/json"}
    response = requests.get(external_url, stream=True, headers=headers)
    # Alternatively, manage the connection with a context manager:
    # with requests.get(external_url, stream=True, headers=headers) as r:
    #     ...
    # Check whether the external request succeeded
    if response.status_code == 200:
        # Generator that relays the streamed response content
        def generate():
            for chunk in response.iter_content(chunk_size=1024):  # pick a chunk_size that suits your data
                if chunk:
                    print('chunk: ', chunk)
                    # yield chunk
                    # yield chunk.decode("utf-8", "ignore")
                    # Note: json.loads assumes each chunk is exactly one complete JSON object;
                    # iter_content does not guarantee chunk boundaries line up with JSON boundaries.
                    chunk = json.loads(chunk)
                    chunk['tt'] = 'ttttt'
                    yield json.dumps(chunk, ensure_ascii=False)
        # Build a Flask response and use stream_with_context to send it as a stream
        return Response(stream_with_context(generate()), content_type='application/json; charset=utf-8')
    else:
        # If the external request failed, return an error
        return 'Failed to retrieve data from external source', 500
@app.route('/stream_data_to_frontend_test')
def stream_data_to_frontend_test():
    url = "http://ip:port/llm"
    params = {
        "messages": [
            {
                "role": "user",
                "content": "储能公司在该项目攻关中取得了哪些关键性突破?"
            }
        ]
    }
    headers = {"Content-Type": "application/json"}
    response = requests.post(url=url, json=params, stream=True, headers=headers)
    def generate():
        for chunk in response.iter_content(1024):
            print(chunk.decode("utf-8", "ignore"))
            if chunk:
                yield chunk.decode("utf-8", "ignore")
    return Response(stream_with_context(generate()), content_type='application/json; charset=utf-8')
@app.route('/stream_faq_data_to_frontend_test')
def stream_faq_data_to_frontend_test():
    url = 'http://ip:port/url'
    # Request payload
    data = {
        "query": "测试"
    }
    headers = {"Content-Type": "application/json"}
    response = requests.post(url=url, json=data, stream=True, headers=headers)
    def generate():
        for chunk in response.iter_content(1024):
            print('chunk', chunk.decode("utf-8", "ignore"))
            if chunk:
                # chunk = json.loads(chunk)
                # chunk = json.dumps(chunk)
                yield chunk
                # yield chunk.decode("utf-8", "ignore")
    return stream_data(generate())
def stream_data(generate):
    """
    :param generate: generator object
    :return:
    """
    return Response(stream_with_context(generate), content_type='application/json; charset=utf-8')
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7000)
    # Or serve with gevent instead of the Flask development server:
    # server = WSGIServer(('0.0.0.0', 7000), app)
    # server.serve_forever()
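To verify that the Flask endpoint really delivers data incrementally, a small requests-based client (a sketch assuming the server above is running locally on port 7000) can iterate over the response body as it arrives:

import requests

# Consume the /stream_json_data endpoint chunk by chunk.
with requests.get('http://127.0.0.1:7000/stream_json_data', stream=True) as resp:
    resp.raise_for_status()
    for chunk in resp.iter_content(chunk_size=1024):
        if chunk:
            # Each chunk is printed as soon as the server yields it.
            print(chunk.decode('utf-8', 'ignore'), flush=True)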
1.2 FastAPI service
- Basic streaming text output
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
async def data_generator():
    for i in range(5):
        # Simulate a time-consuming step (e.g. AI inference or real-time data collection)
        await asyncio.sleep(1)
        yield f"数据块 {i}\n"
@app.get("/stream")
async def stream_data():
    return StreamingResponse(
        data_generator(),
        media_type="text/plain",  # or "text/event-stream" (SSE)
    )
- Streaming a large file
@app.get("/stream-file")
def stream_large_file():
def file_iterator():
with open("large_file.zip", "rb") as f:
while chunk := f.read(4096): # 每次读取 4KB
yield chunk
return StreamingResponse(
file_iterator(),
media_type="application/octet-stream",
headers={"Content-Disposition": "attachment; filename=large_file.zip"}
)
- Server-Sent Events (SSE)
@app.get("/sse")
async def sse():
async def event_generator():
for i in range(5):
await asyncio.sleep(1)
# SSE 格式要求
yield f"data: 事件 {i}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream"
)
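The FastAPI endpoints can be exercised the same way from Python. The sketch below reads the /sse endpoint line by line; it assumes the app is served locally on port 8000 (e.g. via uvicorn), which the original does not specify:

import requests

# Read the SSE stream line by line; each event is a "data: ..." line followed by a blank line.
with requests.get('http://127.0.0.1:8000/sse', stream=True) as resp:
    resp.raise_for_status()
    for line in resp.iter_lines(decode_unicode=True):
        if line and line.startswith('data: '):
            print(line[len('data: '):], flush=True)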
2. Frontend code
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Single Request Demo</title>
</head>
<body>
    <h1>Single Request Demo</h1>
    <pre id="single-output"></pre>
    <script>
        fetch('http://127.0.0.1:7000/stream_json_data')
            .then(response => {
                if (!response.ok) {
                    throw new Error('Network response was not ok');
                }
                // Note: response.text() resolves only after the full body has arrived;
                // for incremental rendering, read response.body.getReader() instead.
                return response.text(); // or response.json() if the backend returns plain JSON
            })
            .then(data => {
                // Append the received data to the page
                var output = document.getElementById('single-output');
                output.textContent += data;
            })
            .catch(error => {
                console.error('There has been a problem with your fetch operation:', error);
            });
    </script>
</body>
</html>
V. Summary
Streaming interface design is a powerful pattern that improves the readability, flexibility, and maintainability of code, and it is especially common in today's large-model (LLM) applications.
That wraps up this basic introduction to handling streaming API responses in Python; I hope it is helpful.
Author: 天下·第二