Python FastAPI实时监控系统构建教程:从日志监听到流式输出

随着越来越多团队开始在 AI 平台做深度学习训练或在线推理服务,对后端的日志监控需求也随之提升。特别是在调试模型和编译过程时,如果前端能够及时获取后端日志,将大大缩短开发者排查 Bug 的时间。本文将分享一个利用 PythonFastAPIWatchdog 组合,实时监控并推送日志文件的实战方案。

1. 小故事:及时发现编译错误,拯救熬夜写代码的你

曾几何时,你在凌晨三点上线一个新版本,为了在这个“万籁俱寂”的黄金时段快速完成部署,偏偏遇到了编译卡死,一看日志发现都是乱码,或者压根就没有来得及打印出来。如果你能及时了解后端日志,或许就能迅速发现是因为缺少了某些依赖导致编译失败。这个时候,一个可以“实时推送日志流”的系统,就像及时雨一样,不仅能帮你减少 Debug 噩梦,还能更好地和前端结合做可视化展示。

于是,本文的故事就此展开:通过 FastAPI + Watchdog 搭建起一套极简又可复用的日志实时推送系统,从此不再为后端日志的调试而抓狂。

2. 主要依赖和原理

2.1 所需技能与依赖

  • Python:3.7+(建议)
  • FastAPI:一个现代、快速、高效的 Web 框架
  • Watchdog:一个专门监控文件变化的 Python 库
  • (如果你是前端开发者,需要了解一点儿基本的 JavaScript)
  • 2.2 方案原理

    核心思路非常简单:

    1. Watchdog 监听目标目录下的日志文件;
    2. 一旦放哨狗发现文件发生变化,就会触发事件;
    3. 此时由 FastAPI 的后台逻辑(基于异步 asyncio)进行增量读取文件新内容;
    4. 最终通过 StreamingResponse 接口,将最新的日志内容按行推送给客户端;
    5. 前端在收到每次更新的日志后及时渲染到页面。

    如果你同时需要监控多个文件,可以为每个文件设置唯一别名(alias),从而在推送数据时携带该别名进行区分。

    3. FastAPI 接口定义与解释

    先来看关键的核心接口——基于 FastAPI 提供的 StreamingResponse。这能让后端以流的形式源源不断地推送数据给前端。下面是一个精简示例:

    from fastapi import FastAPI, Request
    from starlette.responses import StreamingResponse
    import asyncio
    
    app = FastAPI()
    
    @app.get("/streamFile")
    async def streamFile(request: Request):
        return StreamingResponse(log_push(request), media_type="text/event-stream")
    

    3.1 为什么要返回 StreamingResponse

  • 流式输出:通常 HTTP 响应要么一次性返回完整数据,要么在长连接情况下用 WebSocket 传输。而 StreamingResponse 可以在保持连接的同时,逐段地返回数据,让前端像读直播弹幕那样边读边显示。
  • text/event-stream:这是 SSE(Server-Sent Events)常用的 MIME 类型,为了方便前端处理,可以使用此类型。当然你也可以根据需要改为其他 MIME 类型。
  • 在实际项目中,你可以针对不同类型的日志文件,定义不同的路由接口,比如 /streamCompileLog/streamRunLog 等。也可以像示例那样写成一个通用的 /streamFile,根据查询参数或请求体来决定具体监控哪个文件或哪一类文件。

    4. 文件监听及增量返回核心代码

    以下是示例的核心逻辑,本篇文章中我们会重点关注如何用 Watchdog 感知文件变化,并通过队列 + 异步实现增量数据读取,然后和前端配合完成实时显示。

    温馨提示:保证所有日志文件位于同一目录下;如果涉及分布式场景,你可能需要用其它方案(例如队列中转、RPC 通信等)来分发事件。下文示例代码已经做了相对完善的注释,跟着注释阅读可以帮助你理解原理。

    import asyncio
    import threading
    import json
    import traceback
    from typing import AsyncGenerator
    import aiofiles
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler
    from fastapi import Request
    from pathlib import Path
    from starlette.responses import StreamingResponse
    
    # ======= 1. 定义需要监控的文件(须在同一目录)======
    file_paths = {
        'java_run': 'path/file1.log',
        'java_compile': 'path/file2.log',
    }
    
    # 转化为统一的 POSIX 路径,提升容错率
    for k in file_paths.keys():
        file_paths[k] = Path(file_paths[k]).as_posix()
    
    # ======= 2. 文件变化处理器 =======
    class FileChangeHandler(FileSystemEventHandler):
        def __init__(self, queue: asyncio.Queue, loop: asyncio.AbstractEventLoop):
            """
            初始化文件变化处理器。
    
            参数:
            - queue: asyncio.Queue,用于放入文件变化事件。
            - loop: 异步事件循环实例
            """
            self.queue = queue
            self.loop = loop
    
        def on_modified(self, event):
            """
            当监视的文件被修改时调用。
    
            参数:
            - event: FileSystemEvent,包含文件变化事件信息的对象。
    
            如果修改的文件是 .log 后缀,则通过线程安全方式安排一个 asyncio 任务:
            将被修改的文件路径放入队列 queue,等待后续处理。
            """
            if event.src_path.endswith(".log"):
                self.loop.call_soon_threadsafe(
                    asyncio.create_task, 
                    self.queue.put(event.src_path)
                )
    
    
    async def file_monitor(monitor_dir: str, queue: asyncio.Queue):
        """
        监视指定目录下的文件变化,并将变化事件放入队列中。
    
        参数:
        - monitor_dir: str,需要监视的目录路径。
        - queue: asyncio.Queue,用于放入文件变化事件。
    
        异常:
        - asyncio.CancelledError: 当监视任务被取消时,停止监视。
        """
        loop = asyncio.get_running_loop()
        event_handler = FileChangeHandler(queue, loop)
        observer = Observer()
        observer.schedule(event_handler, path=monitor_dir, recursive=False)
        print(f"Starting file monitor for directory {monitor_dir}")
        observer.start()
    
        try:
            while True:
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            observer.stop()
    
        observer.join()
    
    # 监控目录取自 file_paths 中任意一个文件所在目录(前提:所有文件在同一目录)
    base_dir = Path(list(file_paths.values())[0]).parent
    
    # 构造上次读取位置字典,初始位置均为 file_paths 里的文件当前位置
    last_positions = {
        path: Path(path).stat().st_size for path in file_paths.values()
    }
    
    # 队列,用于存放被修改的文件路径
    queue = asyncio.Queue()
    
    # 启动目录监控线程
    monitor_thread = threading.Thread(
        target=asyncio.run, 
        args=(file_monitor(str(base_dir), queue),)
    )
    monitor_thread.start()
    
    # ======= 3. 核心推送函数 =======
    async def log_push(request: Request) -> AsyncGenerator[str, None]:
        """
        实时监控日志文件并推送更新内容。
        当文件变化且属于file_paths时,从上次读取位置开始读取新内容,并返回。
    
        数据格式: { "file": file_alias, "content": new_line }
    
        参数:
        - request: 当前的HTTP请求对象,用于检查客户端是否断连等。
    
        返回:
        - AsyncGenerator[str]: 异步生成器,产生文件新内容的 JSON 字符串或自定义字符串。
        """
    
        try:
            while True:
                # 如果客户端断开连接,就退出循环停止推送
                if await request.is_disconnected():
                    break
    
                # 如果文件变化队列里有事件,则取出进行处理
                if not queue.empty():
                    modified_path = await queue.get()
    
                    # 将修改的文件路径与 file_paths 中已知路径进行比对
                    modified_path = Path(modified_path).as_posix()
                    if modified_path not in file_paths.values():
                        continue
    
                    # 根据 path 反查别名 alias
                    alias = next(
                        (k for k, v in file_paths.items() if v == modified_path), 
                        modified_path
                    )
    
                    # 如果特定文件是GBK编码,可兼容不同编码
                    encoding = "utf-8"
                    if alias == 'java_compile':
                        encoding = "gbk"
    
                    async with aiofiles.open(modified_path, "r", encoding=encoding) as file:
                        # 如果目前文件大小比上次“末尾位置”还小,表示文件可能被清空或轮转
                        if Path(modified_path).stat().st_size < last_positions[modified_path]:
                            last_positions[modified_path] = 0
    
                        # 从上次记录的末尾位置开始读取
                        await file.seek(last_positions[modified_path])
                        content = await file.read()
                        current_position = await file.tell()
    
                        # 更新 last_positions
                        last_positions[modified_path] = current_position
    
                        # 如果确有新的内容,则分chunk推送
                        if content:
                            # 为防止单次输出过大,做一个简单的分块
                            max_chunk_size = 1024
                            for i in range(0, len(content), max_chunk_size):
                                chunk = content[i:i+max_chunk_size]
    
                                # 这里的拼接格式可以自由设计
                                # 示例:先加上文件标记 alias,再接内容,末尾用 {$} 作分隔符
                                result_data = alias + chunk + "{$}"
                                yield result_data
                
                # 睡眠一点时间,避免 CPU 空转
                await asyncio.sleep(0.1)
    
        except Exception as e:
            print(f"Error during log push: {e}")
            traceback.print_exc()
    

    核心解释

    1. FileChangeHandler:继承自 Watchdog 的 FileSystemEventHandler,用来捕获文件修改事件,将 被修改的文件路径 放入异步队列 queue 中。
    2. file_monitor:这是我们真正的“哨兵”协程,基于 Observer 监控某个目录,监听文件创建、修改等事件。只要文件发生变化,就会触发事件处理器。
    3. last_positions:用一个字典记录每个文件当前已经读取到的位置,防止重复读取旧内容;一旦文件被清空或被轮转,stat().st_size 会变小,就重置到0开始读。
    4. log_push:实际对外暴露的异步生成器函数。它会持续不断地从 queue 中获取被修改的文件路径并增量读取内容,然后用 yield 返回给前端。如果前端断开连接(request.is_disconnected() 为 True),就停止推送。

    5. 前端解析示例

    在前端,我们同样需要以流的形式读取日志并实时显示。下面的 JavaScript 示例函数就是一个可行的做法:通过 fetchReadableStream 组合,为了减少对第三方库的依赖,这里直接使用原生 API。

    async function fetchStreamResult(
      url,
      options,
      onComplete
    ) {
      try {
        // 发起请求并获取响应
        const response = await fetch(url, options);
    
        // 检查响应体是否存在,如果不存在则说明当前浏览器不支持ReadableStream
        if (!response.body) {
          onComplete(false, 'ReadableStream is not supported in this browser');
          throw new Error('ReadableStream is not supported in this browser');
        }
    
        // 获取响应体的读取器和解码器
        const reader = response.body.getReader();
        const decoder = new TextDecoder('utf-8');
    
        // 用于缓存上一次没有拆分完的数据
        let buffer = '';
    
        while (true) {
          const { value, done } = await reader.read();
    
          // 如果流已经结束
          if (done) {
            // 如果 buffer 里还有剩余内容,可以根据需要补充处理
            if (buffer.length > 0) {
              onComplete(true, buffer);
            }
            break;
          }
    
          // 解码得到当前 chunk
          const chunkStr = decoder.decode(value, { stream: true });
    
          // 累积到 buffer 中进行拆分
          buffer += chunkStr;
    
          // 查找分隔符 "{$}" 并处理每一段
          let startIndex = 0;
          let endIndex = buffer.indexOf('{$}', startIndex);
    
          while (endIndex !== -1) {
            // 截取从 startIndex 到 endIndex 的部分
            const line = buffer.substring(startIndex, endIndex);
            onComplete(true, line);
    
            // 更新 startIndex 到 endIndex + 3 ({$} 的长度)
            startIndex = endIndex + 3;
            endIndex = buffer.indexOf('{$}', startIndex);
          }
    
          // 更新 buffer 为最后一个未处理的部分
          buffer = buffer.substring(startIndex);
        }
      } catch (error) {
        onComplete(false, error.message);
        throw error;
      }
    }
    

    前端关键点

  • fetch(url, options):发起 HTTP 请求去访问 /streamFile
  • ReadableStream:浏览器端用 reader.read() 的方式一块块抓取数据,相当于后端 yield 的对等方;
  • onComplete:用于将每条日志回调给调用方,或者记录到页面上。这里需要处理分隔符,因为后端可能会把日志堆在一起一次性发来。
  • 6. 前端展示示例

    最后,会有一个调用函数,比如:

    function queryJavaRunningLogs() {
      fetchStreamResult('/leiwo/chat/streamFile?file_name=java_run', {
        method: 'GET',
      }, (status, text) => {
        if (status) {
          try {
            // 根据别名(例如 'java_run')来区分日志来源
            if (text.startsWith('java_run')) {
              // 去掉前缀
              const chunk = text.substring('java_run'.length);
              // 将 chunk 加到页面上对应的展示区域
              console.log("java_run", chunk);
              // 也可以做 UI badge 提示
            } else {
              // 同理处理 java_compile
              const chunk = text.substring('java_compile'.length);
              console.log("java_compile", chunk);
            }
          } catch (error) {
            console.error(error);
          }
        }
      });
    }
    
    1. /streamFile 请求流数据;
    2. 收到数据后,如果 textjava_run 开头则认为是运行日志,否则就判断是不是 java_compile 编译日志;
    3. 将对应的日志内容显示在页面上,并根据当前是否在对应 Tab 判断是否要增加消息提醒。

    7. 方案价值与细节补充

    1. 实时性强:只要日志文件发生更新,就能在极短时间内(毫秒级到秒级)推送到前端。对在线调试、实时监控非常有帮助。
    2. 多文件区分:采用别名机制,便于在前端做标签或分类显示。如果后续需要监控更多文件,只需往 file_paths 里增加一条配置即可。
    3. 编码兼容:实际项目中,不同日志文件可能采用不同的编码(例如中文日志常见的 GBK)。在读取文件时根据别名动态设置编码,避免乱码。
    4. 大文件处理:如果日志文件超大,一次性读取可能内存不够。这里的分块写法(max_chunk_size)能防止一次返回数据体量过大,也能在浏览器端滚动渲染出最新日志。
    5. 扩展性:如果想把这些日志再进行分析,比如关键字高亮、过滤敏感信息、或加上 WebSocket 双向通信机制,整个项目的架构依旧保持足够清晰。
    6. 应用场景
    7. AI 平台 环境中,可以监控训练日志、推理日志;
    8. 微服务 环境中,也可以替换成自动发现多个日志文件并进行合并推送;
    9. 针对 服务器运维,这套逻辑也可以与 tail -f 等命令结合使用。

    8. 总结

    在当下 AI 项目高速迭代的背景下,能第一时间获取后台日志意义重大:

  • 缩短调试时间,更快处理问题;
  • 增强可视化与安全性,日志整合在前端界面上统一查看,而不是 SSH 上去到处找日志;
  • 兼容多种语言开发:Java、Python、前端都能快速用上,无缝对接。
  • 以上就是在 Python + FastAPI + Watchdog 环境下,构建实时日志流推送的一种可行方案示例。如果你正在被一堆堆无法及时获取的日志困扰,相信这套方案能给你带来极大的帮助,为后续的 AI 平台 或其他场景开发打下坚实的基础。祝各位开发者都能在与 Bug 的战斗中战无不胜!


    文章到此结束,愿这份实时日志推送方案能如同一只忠实的看门狗,为你守护调试世界的和平。握紧它,让你的调试工作如虎添翼,告别无尽的后台日志搜索之痛吧!

    作者:AI陪跑

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python FastAPI实时监控系统构建教程:从日志监听到流式输出

    发表回复