Python中实现流式接口响应处理详解

一. 前言

在Python中实现流式处理通常涉及到处理大量的数据,这些数据可能来自于网络、磁盘或其他数据源。流式处理的一个关键点是数据不需要一次性加载到内存中,而是逐块处理。

二、设计原则

  • 可读性:流式接口的设计应着重提高代码的可读性。这要求开发者在命名和构造代码时,要考虑到如何使代码更加直观和易于理解。
  • 流畅性:通过链式调用,流式接口可以实现一系列操作的流畅组合,从而提高代码的编写效率和可读性。
  • 灵活性:流式接口允许开发者根据需要动态地添加或修改操作序列,从而提高了代码的灵活性。
  • 三、实现方式

  • 返回对象本身:在流式接口的方法中,通常通过返回对象本身(例如使用this关键字)来维持链式调用的连续性。
  • 泛型与类型安全:在需要处理不同类型数据时,可以使用泛型来保持类型安全,同时提供灵活的操作序列。
  • 上下文管理:流式接口可能需要管理复杂的上下文信息,例如数据库连接、事务状态等。在设计时,应考虑到如何有效地管理这些上下文信息,以确保代码的正确性和稳定性。
  • 四. 代码设计

    1.python后端代码:

    1.1 Flask服务

    import json
    
    import requests
    from flask import Flask, Response, stream_with_context, request
    import time
    
    from flask_cors import CORS
    from gevent.pywsgi import WSGIServer
    
    app = Flask(__name__)
    
    CORS(app, resources={r"/*": {"origins": "*"}})  # 允许所有源访问,但请注意生产环境的安全性
    
    
    @app.route('/ret_json_data', methods=['GET'])
    def ret_json_data():
        print('start request!')
        return {'hello': 'hello flask'}
    
    
    @app.route('/stream_json_data',  methods=["GET", "POST"])
    def stream_json_data():
        json_data = {
            'state': 0,
            'data': [],
            'text': '',
            'message': ''
        }
        ret_str = "大模型通常指的是那些规模庞大、参数数量极多的人工智能模型,它们在训练时使用了海量的数据,能够处理复杂的任务并展现出强大的语言理解和生成能力。"
    
        def generate():
            for i in range(len(ret_str)):
                json_data["text"] = ret_str[i]
                time.sleep(0.1)  # 暂停一秒,模拟数据产生的时间间隔
                print(f"Data: {json_data}\n")
                yield json.dumps(json_data, ensure_ascii=False)
    
        print('stream-data')
        return Response(stream_with_context(generate()), content_type='application/json; charset=utf-8')
        # return stream_data(generate())
    
    
    @app.route('/stream_data_to_frontend')
    def stream_data_to_frontend():
        # 假设你有一个外部 URL,该 URL 提供流式数据
        external_url = 'http://127.0.0.1:7000/stream_json_data'
    
        # 使用 requests 库的流式响应来接收数据
        headers = {"Content-Type": "application/json"}
        response = requests.get(external_url, stream=True, headers=headers)
        # with requests.get(external_url, stream=True, headers=headers) as r:
        #     # print('r========', r.json())
        #     # 检查外部请求是否成功
        if response.status_code == 200:
            # 创建一个生成器来流式传输响应内容
            def generate():
                for chunk in response.iter_content(chunk_size=1024):  # 你可以设置合适的 chunk_size
                    if chunk:
                        print('chunk: ', chunk)
                        # yield chunk
                        # yield chunk.decode("utf-8", "ignore")
                        chunk = json.loads(chunk)
                        chunk['tt'] = 'ttttt'
                        yield json.dumps(chunk, ensure_ascii=False)
    
                        # 创建一个 Flask 响应对象,并使用 stream_with_context 来发送流式响应
    
            return Response(stream_with_context(generate()), content_type='application/json; charset=utf-8')
        else:
            # 如果外部请求失败,返回错误信息
            return 'Failed to retrieve data from external source', 500
    
    
    @app.route('/stream_data_to_frontend_test')
    def stream_data_to_frontend_test():
        url = "http://ip:port/llm"
        params = {
            "messages": [
                {
                    "role": "user",
                    "content": "储能公司在该项目攻关中取得了哪些关键性突破?"
                }
            ]
        }
        headers = {"Content-Type": "application/json"}
        response = requests.post(url=url, json=params, stream=True, headers=headers)
    
        def generate():
            for chunk in response.iter_content(1024):
                print(chunk.decode("utf-8", "ignore"))
                if chunk:
                    yield chunk.decode("utf-8", "ignore")
    
        return Response(stream_with_context(generate()), content_type='application/json; charset=utf-8')
    
    
    @app.route('/stream_faq_data_to_frontend_test')
    def stream_faq_data_to_frontend_test():
        url = 'http://ip:port/url'
        # 请求的数据
        data = {
            "query": "测试"
        }
        headers = {"Content-Type": "application/json"}
        response = requests.post(url=url, json=data, stream=True, headers=headers)
    
        def generate():
            for chunk in response.iter_content(1024):
                print('chunk', chunk.decode("utf-8", "ignore"))
                if chunk:
                    # chunk = json.loads(chunk)
                    # chunk = json.dumps(chunk)
                    yield chunk
                    # yield chunk.decode("utf-8", "ignore")
    
        return stream_data(generate())
    
    
    def stream_data(generate):
        """
        :param generate: generator object
        :return:
        """
        return Response(stream_with_context(generate), content_type='application/json; charset=utf-8')
    
    
    if __name__ == '__main__':
        app.run(host='0.0.0.0', port=7000)
        # server = WSGIServer(('0.0.0.0', 7000), app)
        # server.serve_forever()
    
    

    1.2 FastAPI服务

    1. 基础流式文本输出
    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse
    import asyncio
    
    app = FastAPI()
    
    async def data_generator():
        for i in range(5):
            # 模拟耗时操作(如 AI 推理、实时数据采集)
            await asyncio.sleep(1)
            yield f"数据块 {i}\n"
    
    @app.get("/stream")
    async def stream_data():
        return StreamingResponse(
            data_generator(),
            media_type="text/plain",  # 或 "text/event-stream"(SSE)
        )
    
    1. 流式传输大文件
    @app.get("/stream-file")
    def stream_large_file():
        def file_iterator():
            with open("large_file.zip", "rb") as f:
                while chunk := f.read(4096):  # 每次读取 4KB
                    yield chunk
    
        return StreamingResponse(
            file_iterator(),
            media_type="application/octet-stream",
            headers={"Content-Disposition": "attachment; filename=large_file.zip"}
        )
    
    1. Server-Sent Events (SSE)
    @app.get("/sse")
    async def sse():
        async def event_generator():
            for i in range(5):
                await asyncio.sleep(1)
                # SSE 格式要求
                yield f"data: 事件 {i}\n\n"
    
        return StreamingResponse(
            event_generator(),
            media_type="text/event-stream"
        )
    

    2.前端代码

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <title>Single Request Demo</title>
    </head>
    <body>
    
    <h1>Single Request Demo</h1>
    <pre id="single-output"></pre>
    
    <script>
        fetch('http://127.0.0.1:7000/stream_json_data')
            .then(response => {
                if (!response.ok) {
                    throw new Error('Network response was not ok');
                }
                return response.text(); // 或者使用 response.json() 如果后端返回的是JSON
            })
            .then(data => {
                // 将接收到的数据添加到页面上
                var output = document.getElementById('single-output');
                output.textContent += data; // 注意这里使用了 = 而不是 +=,因为我们只想要一次性的内容
            })
            .catch(error => {
                console.error('There has been a problem with your fetch operation:', error);
            });
    </script>
    
    </body>
    </html>
    

    五. 总结

    流式接口设计是一种强大的编程范式,它能够提高代码的可读性、灵活性和可维护性。特别是在现在大模型的应用中很常见。

    以上就是关于python -【流式接口返回响应处理】的基本介绍使用,希望对你有所帮助。

    作者:天下·第二

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python中实现流式接口响应处理详解

    发表回复