【Python】基于Pyside6开发一个通用的在线升级工具

介绍

在线升级

基于PySide6开发一个通用的在线升级程序涉及到多个方面的设计和实现,包括用户界面、网络通信、版本检查、下载更新文件、解压和替换旧文件等。以下是一个大致的步骤指南,帮助你构建这样一个应用程序。

1. 环境准备

  • 安装Python:确保你的环境中已经安装了Python,并且是3.x版本。

  • 安装PySide6:使用pip来安装PySide6库。你可以通过命令行运行pip install PySide6来进行安装。

  • 其他依赖项:根据需要安装其他的第三方库,比如用于HTTP请求的requests,以及用于处理压缩文件的zipfiletarfile

  • 2. 创建基本的用户界面

    使用Qt Designer(可以通过PySide6安装包中的pyside6-designer启动)或者直接在代码中创建一个简单的GUI,包含按钮如“检查更新”、“下载并安装更新”,以及进度条显示下载进度等元素。

    3. 实现版本检查功能

    编写函数来连接到服务器上的API接口,获取最新的版本信息并与本地版本进行比较。

    4. 下载更新文件

    如果发现有新版本可用,则开始下载更新包。这里可以使用requests库的流式下载特性来逐步读取数据并更新进度条。

    5. 解压和替换旧文件

    下载完成后,你需要将新的文件解压到合适的位置,并覆盖现有的应用程序文件。这一步需要注意权限问题,可能需要管理员权限才能修改某些系统文件。

    6. 自动重启应用

    更新完成后,通常希望应用程序能够自动重启以加载新的版本。这可以通过执行可执行文件路径的方式实现。

    7. 集成所有组件

    最后一步是将上述所有部分集成到一起,在界面上添加事件处理器来触发相应的逻辑。

    流程概括

    这段代码描述了一个完整的在线升级应用程序,它包含了用户界面(UI)、主程序逻辑以及与后台服务的交互。以下是详细解析:

    配置文件 service.ini

    配置文件 service.ini 是应用程序和服务器通信的关键,其中保存了诸如服务器URL、当前版本信息、要下载的文件名等关键参数。该文件允许应用程序在启动时检查是否有可用的新版本,并据此决定是否开始升级流程。

    主程序逻辑 upgrade_service.py

    这个Python脚本是整个应用程序的核心,负责处理所有与升级相关的任务。它使用PySide6来创建图形用户界面,并通过HTTP请求与后端API进行通信以获取更新文件。

  • 初始化:在 MainWindow 类中,构造函数设置了无边框窗口、鼠标拖动功能、图标样式,并启动了下载线程。

  • 文件下载download_file 方法实现了从服务器下载文件的主要逻辑,包括获取升级信息、下载主程序文件、权重文件和其他额外资源(如ZIP格式的第三方库)。每个文件下载完成后都会更新进度条和状态标签。

  • 进程管理end_processis_process_running_by_name 函数用于终止正在运行的目标进程,确保旧版本的应用程序不会干扰新版本的安装。

  • 解压缩unzip_file 函数用于解压下载下来的ZIP文件到指定目录。

  • 事件监听listen_download_state 方法被定时器周期性调用,用来监控下载进度并执行必要的操作,比如关闭窗口或重启应用程序。

  • 样式应用apply_stylesheet 函数来自 qt_material 库,用于美化应用程序界面,提供更现代的视觉体验。

  • 后台服务(Java)

    虽然提供的Java代码片段有限,但可以推测这是一个Spring Boot风格的RESTful API接口,用于提供文件下载服务。@ApiOperation("下载升级文件")@RequestMapping("download") 注解表明这是一个用于下载升级文件的API端点。客户端(即Python应用程序)会向此端点发送请求,以获取最新的应用程序文件或其他需要更新的资源。

    关键点补充

    1. 错误处理:代码中对可能出现的问题进行了适当的异常捕获,比如网络连接失败时给出提示,并尝试清理临时文件。

    2. 用户体验:通过进度条和动态更新的状态文本,让用户清楚了解升级过程中的每一步进展,增强了用户体验。

    3. 安全性考量:尽管没有在代码片段中直接体现,但在实际开发中应该考虑对下载链接的安全性和完整性验证,例如使用HTTPS协议、数字签名或校验和来确保文件未被篡改。

    4. 跨平台兼容性:由于使用了PySide6作为UI框架,这使得应用程序可以在多个操作系统上运行,而不需要为每个平台单独编写代码。

    5. 自动化打包:提供了如何使用 pyinstaller 将Python脚本打包成独立可执行文件的指令,方便分发给最终用户。

    UI PyDesigner

    f3c1918d36f648aeb9dd8bfac3410fba.png

    主程序

    upgrade_service.py

    import time
    
    import configparser
    import requests
    import sys
    import threading
    import psutil
    import os
    import zipfile
    import warnings
    
    from PySide6.QtWidgets import QApplication, QMainWindow
    from PySide6.QtCore import Qt, QTimer
    from PySide6.QtGui import QIcon, QMouseEvent
    from ui.main import Ui_MainWindow
    from qt_material import apply_stylesheet
    
    '''
    pip install pyinstaller pyside6 qt_material psutil requests
    
    pyside6-uic -o ../../ui/main.py ../../ui/main.ui
    
    pyinstaller --clean --icon=upgrade.ico D:\\1_local_project\IMVision\IMVDXUpgrade\\upgrade_service.py
    pyinstaller upgrade_service.spec --noconfirm
    '''
    
    warnings.filterwarnings('ignore', category=DeprecationWarning)
    
    
    class UpgradeInfo(object):
        def __init__(self):
            self.username = ''
            self.version = ''
            self.url = ''
            self.extra_file = 0
            self.extra_path = ''
            self.remark = ''
    
    
    class MainWindow(QMainWindow, Ui_MainWindow):
        def __init__(self):
            super(MainWindow, self).__init__()
            self.setupUi(self)
            self.setWindowFlags(Qt.FramelessWindowHint)
    
            # 开启鼠标追踪
            self.setMouseTracking(True)
            # 记录鼠标按下时的位置
            self.drag_start_position = None
    
            self.setWindowIcon(QIcon('icons/upgrade.ico'))
            self.label.setStyleSheet(self.label.styleSheet() + 'font-size: 20px;')
    
            self.upgrade_finished = False
            self.finished_chunk_size = 0
            self.total_file_count = 0
            threading.Thread(target=self.download_file).start()
    
            self.timer = QTimer(self)
            self.timer.timeout.connect(self.listen_download_state)
            self.timer.start(100)
    
        def mousePressEvent(self, event: QMouseEvent) -> None:
            if event.button() == Qt.LeftButton:
                self.drag_start_position = event.pos()
    
        def mouseMoveEvent(self, event: QMouseEvent) -> None:
            if self.drag_start_position is not None:
                delta = event.pos() - self.drag_start_position
                self.move(self.pos() + delta)
    
        def mouseReleaseEvent(self, event: QMouseEvent) -> None:
            if event.button() == Qt.LeftButton:
                self.drag_start_position = None
    
        def listen_download_state(self):
            if self.upgrade_finished:
                self.timer.stop()
                cf.set('upgrade', 'state', '0')
                cf.set('upgrade', 'currentVersion', new_version)
                cf.write(open('service.ini', 'w'))
                if self.total_file_count > 0:
                    os.startfile(main_file)
                time.sleep(0.2)
                self.close()
    
        def updateLabel(self, c, t, overwrite=False):
            self.label.setText('正在升级...(' + c + '/' + t + ')') if not overwrite else self.label.setText(t)
    
        def download_file(self):
            upgrade_files = []
            self.finished_count = 0
            self.total_file_count = 0
            self.weight_file = 0
            self.extra_file = 0
            response = requests.get(upgrade_url + 'upgrade/v1/info?username=' + username, timeout=2)
            if response.status_code == 200:
                res = response.json()
                if res['code'] == 0 and res['data'] is not None:
                    data = res['data']
                    self.weight_file_tag = data['weight_file']
                    self.extra_file_tag = data['extra_file']
                    self.total_file_count = data['total_file_count']
                    upgrade_files = (data['extra_path'].replace('[', '').replace(']', '').replace(' ', '').split(','))
    
            if self.total_file_count == 0:
                self.updateLabel('0', '当前已是最新版本!', True)
                time.sleep(1)
                self.upgrade_finished = True
                return
    
            if self.weight_file_tag == 1:
                self.total_file_count += len(weight_files)
            self.updateLabel('0', str(self.total_file_count))
            time.sleep(0.2)
            # 下载主程序
            try:
                response = requests.get(upgrade_url + 'upgrade/v1/download?filename=' + main_file + '&username=' + username,
                                        stream=True)
                end_process(main_file)
                time.sleep(0.5)
                end_process(main_file)
                time.sleep(0.5)
    
                self.pbDownload.setMaximum(int(response.headers.get('Content-Length', 0)))
    
                with open(main_file_tmp, 'wb') as file:
                    self.finished_chunk_size = 0
                    for chunk in response.iter_content(chunk_size=8192 * 10):
                        if chunk:
                            file.write(chunk)
                            self.finished_chunk_size += len(chunk)
                            self.pbDownload.setValue(self.finished_chunk_size)
                            QApplication.processEvents()
    
                time.sleep(0.5)
                # 下载完成,删除main_file,将临时文件改成main_file
                os.remove(main_file)
                os.rename(main_file_tmp, main_file)
            except:
                # 下载失败,删除临时文件
                os.remove(main_file_tmp)
                self.updateLabel('0', '升级失败!请检查网络连接!', True)
                time.sleep(1)
                self.upgrade_finished = True
                return
    
            self.updateLabel('1', str(self.total_file_count))
    
            time.sleep(0.2)
            self.finished_count = 1
            if self.weight_file_tag == 1:
                for weight_file in weight_files:
                    weight_file_tmp = weight_file + '.tmp'
                    try:
                        # 下载权重文件
                        response = requests.get(
                            upgrade_url + 'upgrade/v1/download?filename=' + weight_file + '&username=' + username,
                            stream=True)
                        self.pbDownload.setMaximum(int(response.headers.get('Content-Length', 0)))
    
                        with open('weights/' + weight_file_tmp, 'wb') as file:
                            self.finished_chunk_size = 0
                            for chunk in response.iter_content(chunk_size=8192 * 10):
                                if chunk:
                                    file.write(chunk)
                                    self.finished_chunk_size += len(chunk)
                                    self.pbDownload.setValue(self.finished_chunk_size)
                                    QApplication.processEvents()
    
                        time.sleep(0.5)
                        # 下载完成,删除weight_file,将临时文件改成weight_file
                        os.remove('weights/' + weight_file)
                        os.rename('weights/' + weight_file_tmp, 'weights/' + weight_file)
                        self.finished_count += 1
                    except:
                        # 下载失败,删除临时文件
                        os.remove('weights/' + weight_file_tmp)
                        self.updateLabel('0', '升级失败!请检查网络连接!', True)
                        time.sleep(1)
                        self.upgrade_finished = True
                        return
    
                    self.updateLabel(str(self.finished_count), str(self.total_file_count))
    
            # 最后下载第三方文件(zip格式)
            if self.extra_file_tag == 1:
                try:
                    for idx, filename in enumerate(upgrade_files):
                        response = requests.get(
                            upgrade_url + 'upgrade/v1/download?filename=' + filename + '&username=' + username, stream=True)
    
                        extra_file_zip = '_internal\\' + filename
                        self.pbDownload.setMaximum(int(response.headers.get('Content-Length', 0)))
                        with open(extra_file_zip, 'wb') as file:
                            self.finished_chunk_size = 0
                            for chunk in response.iter_content(chunk_size=8192):
                                if chunk:
                                    file.write(chunk)
                                    self.finished_chunk_size += len(chunk)
                                    self.pbDownload.setValue(self.finished_chunk_size)
                                    QApplication.processEvents()
    
                        # 解压缩zip
                        unzip_file(extra_file_zip, './_internal')
                        self.updateLabel(str(idx + self.finished_count + 1), str(self.total_file_count))
                except:
                    self.updateLabel('0', '升级失败!请检查网络连接!', True)
                    time.sleep(1)
                    self.upgrade_finished = True
                    return
    
            self.updateLabel('0', '升级完成!启动新版本...', True)
            time.sleep(0.5)
            self.upgrade_finished = True
    
        def close(self):
            super(MainWindow, self).close()
    
    
    def end_process(process_name):
        try:
            if is_process_running_by_name(process_name):
                os.system('taskkill /f /im ' + process_name)
        except:
            pass
    
    
    def is_process_running_by_name(process_name):
        for proc in psutil.process_iter(['name']):
            if proc.info['name'].find(process_name) == 0:
                return True
        return False
    
    
    def unzip_file(zip_path, extract_path='.'):
        if not os.path.exists(extract_path):
            os.makedirs(extract_path)
    
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_path)
    
        os.remove(zip_path)
    
    
    extra = {
        'font_family': '微软雅黑',
        'density_scale': '0',
        'button_shape': 'default',
        'pyside6': True
    }
    
    if __name__ == '__main__':
        cf = configparser.ConfigParser()
        cf_file = 'service.ini'
        if os.path.exists(cf_file):
            cf.read(cf_file)
            upgrade_url = cf.get('server', 'url')
            main_file = cf.get('upgrade', 'mainfile')
            weight_files = cf.get('upgrade', 'weightfiles').split(',')
            main_file_tmp = main_file + '.tmp'
            current_version = cf.get('upgrade', 'currentversion')
            new_version = cf.get('upgrade', 'newversion')
            state = cf.getint('upgrade', 'state')
            username = cf.get('sys', 'username')
        else:
            sys.exit(0)
    
        if state == 0 or (current_version == new_version):
            sys.exit(0)
    
        app = QApplication(sys.argv)
        main_window = MainWindow()
        apply_stylesheet(app, theme='dark_teal2.xml', extra=extra)
        main_window.show()
        sys.exit(app.exec())
    

    配置文件 service.ini

    2aafd9c3a9c941aab6a405510fd250d6.png

    后台服务(Java)

        @ApiOperation("下载升级文件")
        @RequestMapping("download")
        public void download(@RequestParam String filename, @RequestParam String username) {
            UpgradeSetDO upgradeSetDO = upgradeSetMapper.selectOne(
                    new QueryWrapper<UpgradeSetDO>().eq("username", username).eq("state", 1)
                            .last(" order by id desc limit 1"));
    
            ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
            assert requestAttributes != null;
            HttpServletResponse response = requestAttributes.getResponse();
            assert response != null;
    
            String filePath = "C:\\IMVD\\UPGRADE\\" + username + "\\" + upgradeSetDO.getVersion() + "\\" + filename;
            File file = new File(filePath);
            response.setHeader("Content-Disposition", "attachment;filename=" + filename);
            response.setHeader("Content-Length", String.valueOf(file.length()));
            try (OutputStream outputStream = response.getOutputStream();
                 BufferedInputStream bis = new BufferedInputStream(new FileInputStream(filePath))) {
                byte[] buff = new byte[1024];
                int i = bis.read(buff);
                while (i != -1) {
                    outputStream.write(buff, 0, buff.length);
                    outputStream.flush();
                    i = bis.read(buff);
                }
            } catch (IOException e) {
                log.error(e.getMessage());
            }
        }

    作者:道友老李

    物联沃分享整理
    物联沃-IOTWORD物联网 » 【Python】基于Pyside6开发一个通用的在线升级工具

    发表回复