Python Flask 数据库开发

  • 引言
  • 环境配置
  • 创建 Flask 应用,连接数据库
  • 定义路由
  • 定义模型
  • 创建表
  • 创建 API
  • 数据库直接操作
  • 启动 Flask 应用
  • app.py 示例
  • 运行 Flask
  • 访问应用
  • 展望
  • 引言

    在现代 web 开发中,Python 的 Flask 框架因其轻量和灵活性受到广泛欢迎。结合数据库技术,Flask 可以高效地管理和处理数据,使开发者能够快速构建功能强大的应用程序。无论是选择 MySQL 还是 PostgreSQL,掌握数据库与 Flask 的集成至关重要。本文将探讨如何在 Flask 中设置和使用数据库,涵盖从环境配置到基本操作的各个方面,便于我们去实现数据驱动的应用。

    环境配置

    在开始之前,需要确保 Python 环境已安装 Flask 模块。可以通过 pip 安装 Flask 和 SQLAlchemy(Flask 的 ORM 工具):

    pip install Flask Flask-SQLAlchemy
    

    接下来,根据所选数据库的类型(如 MySQL 或 PostgreSQL),部署相应的数据库服务。
    详情可参考:MySQL && PostgreSQL 数据库部署

    创建 Flask 应用,连接数据库

    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy
    
    app = Flask(__name__)
    # 创建了一个 Flask 应用实例。__name__ 是 Python 的一个特殊变量,它指向当前模块的名字。
    
    app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:password@localhost/dbname'
    # mysql://user:password@localhost/dbname 的格式是:
    # mysql:数据库类型。
    # user:数据库用户名。
    # password:数据库密码。
    # localhost:数据库服务器地址(这里是本地)。
    # dbname:要连接的数据库名称。
    
    db = SQLAlchemy(app)
    # 创建了一个 SQLAlchemy 对象,并将 Flask 应用实例传递给它。这样,SQLAlchemy 就可以使用 Flask 应用的配置来管理数据库操作
    
    # @app.route 是 Flask 中的装饰器,用于定义路由。它将一个 URL 路径与一个视图函数关联起来。简单来说,当用户访问指定的 URL 时,Flask 会调用对应的视图函数并返回结果
    @app.route('/')
    def home():
        return 'Hello, Flask!'
    
    if __name__ == '__main__':
        app.run(debug=True)
    

    定义路由

    @app.route 是 Flask 中的装饰器,用于定义路由。它将一个 URL 路径与一个视图函数关联起来。简单来说,当用户访问指定的 URL 时,Flask 会调用对应的视图函数并返回结果。例如:

    @app.route('/')
    def home():
        return 'Hello, Flask!'
    

    在这个例子中,访问根 URL (/) 时,用户会看到 “Hello, Flask!” 的消息。通过使用不同的路径和方法(如 GET 或 POST),我们可以创建丰富的 Web 应用。

    定义模型

    使用 SQLAlchemy 的 ORM(对象关系映射)定义数据库模型是与数据库交互的第一步。数据库模型是用于定义和组织数据库中数据结构的抽象框架。它描述了数据的类型、关系、约束以及如何在数据库中存储和检索数据。

    一个清晰的数据库模型会大大提高开发效率和数据管理的质量,比如说:

  • 创建表:通过模型定义表的结构(字段类型、约束等)。
  • 执行 CRUD 操作:模型使得创建、读取、更新和删除数据变得简单。
  • 维护数据完整性:模型可以设置约束(如唯一性、外键),确保数据的正确性。
  • 简化查询:通过模型,可以使用 ORM(对象关系映射)库,轻松编写查询。
  • # 创建一个名为 DiagnosisCategory 的类,继承自 db.Model,这是 SQLAlchemy 中所有模型类的基类
    class DiagnosisCategory(db.Model):
    	__tablename__ = 'diagnosis_category'  # 设置表名
    	
    	# 定义一个名为 id 的列,类型为 Integer。primary_key=True 表示这个列是主键,用于唯一标识每个用户。
        id = db.Column(db.Integer, primary_key=True)
        
    	# 定义一个名为 name 的列,类型为 String,最大长度为 80。unique=True 表示这个列的值在数据库中必须是唯一的,nullable=False 表示该字段不能为空。
        name = db.Column(db.String(80), unique=True, nullable=False)
    
        def __repr__(self):
            return f'<User {self.username}>'
    

    创建表

    要创建 diagnosis_category这张表,只需在应用上下文中调用 db.create_all(),就会自动生成 User 表。这样,数据库中就会有一个新的 DiagnosisCategory 表 (diagnosis_category) 用于存储用户信息。

    class DiagnosisCategory(db.Model):
        __tablename__ = 'diagnosis_category'
        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(80))
    
        def __repr__(self):
            return f'<User {self.username}>'
    
    # 创建表
    with app.app_context():
        db.create_all()
    

    创建 API

    使用 Flask-Restless 创建的 API 可以通过 HTTP 请求进行 CRUD 操作。其中,collection_name 定义了访问特定资源的 URL 路径。

    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy
    from flask_restless import APIManager
    
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///yourdatabase.db'
    db = SQLAlchemy(app)
    
    # 定义模型
    class DiagnosisCategory(db.Model):
        __tablename__ = 'diagnosis_category'
        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(80))
    
    # 创建数据库表
    with app.app_context():
        db.create_all()
        
    # 创建 API 管理器
    manager = APIManager(app, session=db.session)
    
    # 创建 API 端点
    manager.create_api(DiagnosisCategory, collection_name='category',
                       methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
    

    访问这个 API 的路径将是 /api/category,前端代码(ts、js等)可以通过这个路径对后端服务器进行 CRUD 操作。

    // 获取所有分类(GET 请求):
    GET /api/category
    
    // 创建新分类(POST 请求):
    POST /api/category
    
    // 获取特定分类(GET 请求,假设 ID 为 1):
    GET /api/category/1
    
    // 更新特定分类(PUT 请求,假设 ID 为 1):
    PUT /api/category/1
    
    // 删除特定分类(DELETE 请求,假设 ID 为 1):
    DELETE /api/category/1
    
    

    数据库直接操作

    在 Flask 中,也可以使用 SQLAlchemy 提供的 API 直接操作数据库,使用 Python 对象进行 CRUD。

    # 创建
    new_user = User(username='example')	# 创建新用户实例
    db.session.add(new_user) # 将新用户添加到会话
    db.session.commit()	# 提交事务以保存更改
    
    # 查询
    users = User.query.all() # # 查询所有用户
    
    # 更新
    user = User.query.first() # # 查询第一个用户
    user.username = 'new_username'
    db.session.commit()
    
    # 删除
    db.session.delete(user) # 删除用户
    db.session.commit()
    

    启动 Flask 应用

    app.py 示例

    我们以一个实际应用为例:连接远程主机(10.2.0.92)上的 MySQL数据库,创建 t_diag_category、t_diag_model、t_diag_test_suite、t_diag_test_target、t_diag_test_result 五张表,定义相应的 CRUD 接口,实现基于表的SN字段查重、关系表获取等额外操作。

    # -*- coding: UTF-8 -*-
    
    import os
    import shutil
    import sys
    import argparse
    import json
    from flask import Flask, make_response, request, jsonify, send_from_directory
    from flask_sqlalchemy import SQLAlchemy
    from sqlalchemy import text
    from flask_restless import APIManager
    from datetime import datetime
    
    
    CURRENT_PATH = os.path.dirname(os.path.realpath(sys.argv[0]))
    DB_SERVER = '10.2.0.92'
    DB_PORT = '3306'
    DB_USERNAME = 'root'
    DB_PASSWORD = '123456'
    DB_NAME = 'diagnosisdev'
    
    app = Flask(__name__)
    db = None
    manager = None
    
    def initialize():
        global app
        global db
        global manager
    
        # Database
        app.config[
            'SQLALCHEMY_DATABASE_URI'] = f'mysql://{DB_USERNAME}:{DB_PASSWORD}@{DB_SERVER}:{DB_PORT}/{DB_NAME}?charset=utf8mb4'
        app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
        db = SQLAlchemy(app)
        manager = APIManager(app, session=db.session)
        
        class DiagnosisCategory(db.Model):
            __tablename__ = 't_diag_category'
            id = db.Column(db.Integer, primary_key=True, autoincrement=True)
            parent_id = db.Column(db.Integer, nullable=False, default=-1)
            name = db.Column(db.Text)
            create_user = db.Column(db.Integer)
            create_time = db.Column(db.DateTime, default=datetime.utcnow)
            modify_user = db.Column(db.Integer)
            modify_time = db.Column(db.DateTime, default=datetime.utcnow)
            remark = db.Column(db.Text)
            enabled = db.Column(db.Integer, default=1)
    
        class DiagnosisModel(db.Model):
            __tablename__ = 't_diag_model'
            id = db.Column(db.Integer, primary_key=True, autoincrement=True)
            parent_id = db.Column(db.Integer, nullable=False, default=-1)
            category_id = db.Column(
                db.Integer, db.ForeignKey('t_diag_category.id'))
            category = db.relationship('DiagnosisCategory', backref='t_diag_model')
            name = db.Column(db.Text)
            create_user = db.Column(db.Integer)
            create_time = db.Column(db.DateTime, default=datetime.utcnow)
            modify_user = db.Column(db.Integer)
            modify_time = db.Column(db.DateTime, default=datetime.utcnow)
            remark = db.Column(db.Text)
            enabled = db.Column(db.Integer, default=1)
            image_url = db.Column(db.Text)
    
        class TestSuite(db.Model):
            '''
            for test:
            curl http://127.0.0.1:50000/api/test_suite -H "Accept: application/vnd.api+json"
            curl http://127.0.0.1:50000/api/test_suite/1 -H "Accept: application/vnd.api+json"
            curl -X POST http://127.0.0.1:50000/api/test_suite -H "Content-Type: application/vnd.api+json" -H "Accept: application/vnd.api+json" -d '{"data":{"type": "test_suite", "attributes": {"name": "xxx", "data": "123"}}}'
            curl -X PATCH http://127.0.0.1:50000/api/test_suite/1 -H "Content-Type: application/vnd.api+json" -H "Accept: application/vnd.api+json" -d  '{"data":{"type": "test_suite", "id": "1", "attributes": {"name": "xxx", "data": "123"}}}'
            '''
            __tablename__ = 't_diag_test_suite'
            id = db.Column(db.Integer, primary_key=True, autoincrement=True)
            name = db.Column(db.Text)
            type = db.Column(db.Text)
            model_id = db.Column(db.Integer, db.ForeignKey('t_diag_model.id'))
            model = db.relationship('DiagnosisModel', backref='t_diag_test_suite')
            report_name = db.Column(db.Text)
            data = db.Column(db.Text)
            create_user = db.Column(db.Integer)
            create_time = db.Column(db.DateTime, default=datetime.utcnow)
            modify_user = db.Column(db.Integer)
            modify_time = db.Column(db.DateTime, default=datetime.utcnow)
            remark = db.Column(db.Text)
            enabled = db.Column(db.Integer, default=1)
            sop_url = db.Column(db.Text)
            case_type = db.Column(db.INT)
            # ALTER TABLE t_diag_test_suite ADD COLUMN sop_url TEXT;
            # ALTER TABLE t_diag_test_suite DROP COLUMN sop_url;
    
        class TestTarget(db.Model):
            __tablename__ = 't_diag_test_target'
            id = db.Column(db.Integer, primary_key=True, autoincrement=True)
            parent_id = db.Column(db.Integer, nullable=False, default=-1)
            model_id = db.Column(db.Integer, db.ForeignKey('t_diag_model.id'))
            model = db.relationship('DiagnosisModel', backref='t_diag_test_target')
            name = db.Column(db.Text)
            sn = db.Column(db.Text)
            is_hardware_changed = db.Column(db.Integer, default=0)
            create_user = db.Column(db.Integer)
            create_time = db.Column(db.DateTime, default=datetime.utcnow)
            modify_user = db.Column(db.Integer)
            modify_time = db.Column(db.DateTime, default=datetime.utcnow)
            remark = db.Column(db.Text)
            enabled = db.Column(db.Integer, default=1)
    
        class TestResult(db.Model):
            __tablename__ = 't_diag_test_result'
            id = db.Column(db.Integer, primary_key=True, autoincrement=True)
            target_id = db.Column(
                db.Integer, db.ForeignKey('t_diag_test_target.id'))
            target = db.relationship('TestTarget', backref='t_diag_test_result')
            test_suite_id = db.Column(
                db.Integer, db.ForeignKey('t_diag_test_suite.id'))
            test_suite = db.relationship('TestSuite', backref='t_diag_test_result')
            name = db.Column(db.Text)
            data = db.Column(db.Text)
            create_user = db.Column(db.Integer)
            create_time = db.Column(db.DateTime, default=datetime.now)
            modify_user = db.Column(db.Integer)
            modify_time = db.Column(db.DateTime, default=datetime.now)
            remark = db.Column(db.Text)
            enabled = db.Column(db.Integer, default=1)
    
        with app.app_context():
            db.create_all()
            manager.create_api(DiagnosisCategory, collection_name='category',
                               methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
            manager.create_api(DiagnosisModel, collection_name='model',
                               methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
            manager.create_api(TestSuite, collection_name='test_suite',
                               methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
            manager.create_api(TestTarget, collection_name='test_target',
                               methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
            manager.create_api(TestResult, collection_name='test_result',
                               methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
    
    
    @app.route('/')
    def hello():
        return 'Hello, world!'
    
    
    @app.route('/api/is_scrapped_sn/<sn>', methods=['GET'])
    def is_scrapped_sn(sn):
        try:
            result = db.session.execute(
                text('SELECT COUNT(1) FROM t_diag_test_target WHERE sn = :sn AND scrap = 1'),  {'sn': sn})
            return '1' if list(result)[0][0] > 0 else '0'
        except Exception as e:
            print(e)
            return '0'
    
    
    @app.route('/api/is_duplicate_primary_sn/<sn>', methods=['GET'])
    def is_duplicate_primary_sn(sn):
        try:
            result = db.session.execute(
                text('SELECT COUNT(1) FROM t_diag_test_target WHERE sn = :sn AND enabled = 1 AND parent_id = -1'),  {'sn': sn})
            return '1' if list(result)[0][0] > 0 else '0'
        except Exception as e:
            print(e)
            return '0'
    
    
    @app.route('/api/is_duplicate_relationship_sn/<sn>', methods=['GET'])
    def is_duplicate_relationship_sn(sn):
        try:
            result = db.session.execute(
                text('SELECT COUNT(1) FROM t_diag_test_target WHERE sn = :sn AND enabled = 1 AND parent_id != -1'),  {'sn': sn})
            return '1' if list(result)[0][0] > 0 else '0'
        except Exception as e:
            print(e)
            return '0'
    
    
    @app.route('/api/disable_sn/<sn>', methods=['GET'])
    def disable_sn(sn):
        with db.session.begin():
            try:
                db.session.execute(
                    text('UPDATE t_diag_test_target SET enabled = 0 WHERE parent_id IN (SELECT id FROM t_diag_test_target WHERE sn = :sn)'),  {'sn': sn})
                db.session.execute(
                    text('UPDATE t_diag_test_target SET enabled = 0 WHERE sn = :sn'), {'sn': sn})
                db.session.commit()
                return '0'
            except Exception as e:
                print(e)
                db.session.rollback()
                return '-1'
    
    @app.route('/api/get_relationship_sn/<sn>', methods=['GET'])
    def get_relationship_sn(sn):
        try:
            result = db.session.execute(
                text('WITH RECURSIVE cte(id, parent_id, sn, psn) AS (SELECT id, parent_id, sn, psn FROM v_diag_valid_test_target WHERE sn = :sn UNION ALL SELECT t1.id, t1.parent_id, t1.sn, t1.psn FROM v_diag_valid_test_target t1, cte WHERE t1.psn = cte.sn) SELECT * FROM cte'),  {'sn': sn})
            data = [row[2] for row in result.fetchall()]
            print(data)
            return json.dumps(data)
        except Exception as e:
            print(e)
            return jsonify('[]')
    
    
    @app.route('/api/get_relationship_table/<sn>', methods=['GET'])
    def get_relationship_table(sn):
        try:
            result = db.session.execute(
                text("WITH RECURSIVE cte(sn, path) AS (SELECT sn, CAST(sn AS CHAR) FROM v_diag_valid_test_target WHERE sn = :sn UNION ALL SELECT t1.sn, CONCAT(cte.path, '.', t1.sn) FROM v_diag_valid_test_target t1 INNER JOIN cte ON t1.psn = cte.sn) SELECT path FROM cte ORDER BY path"), {'sn': sn})
            output = {}
            for data in result.fetchall():
                keys = data[0].split('.')
                temp = output
                for key in keys:
                    temp.setdefault(key, {"info": get_sn_info(key), "subsn": {}})
                    temp = temp[key]["subsn"]
            json_output = json.dumps(output)
            return json_output
        except Exception as e:
            print(e)
            return jsonify('[]')
    
    
    @app.route('/api/sn_scrap', methods=['POST'])
    def sn_scrap():
        try:
            sn_lists = request.json
            sql_query = """
                UPDATE t_diag_test_target
                SET enabled = 0 WHERE enabled = 1
                AND parent_id IN (SELECT id FROM (SELECT id FROM t_diag_test_target WHERE sn IN :sn_list) AS T1)
                OR (sn IN :sn_list)
            """
            db.session.execute(text(sql_query), {"sn_list": tuple(sn_lists)})
            sql_query = "UPDATE t_diag_test_target SET scrap = 1 WHERE sn IN :sn_list"
            db.session.execute(text(sql_query),  {"sn_list": tuple(sn_lists)})
            db.session.commit()
            return '0'
        except Exception as e:
            db.session.rollback()
            print(e)
            return '-1'
    
    
    if __name__ == '__main__':
        parser = argparse.ArgumentParser()
        parser.add_argument('path', nargs='*')
        parser.add_argument('--db_name', type=str, default='diagnosisdev',
                            help="mode: dev or null")
        parser.add_argument('--db_server', type=str, default='10.2.0.92',
                            help="database server ip address")
        parser.add_argument('--db_port', type=str, default='3306',
                            help="database server port")
        parser.add_argument('--db_username', type=str, default='root',
                            help="database server username")
        parser.add_argument('--db_password', type=str, default='123456',
                            help="database server password")
       
        args = parser.parse_args()
        DB_SERVER = args.db_server
        DB_PORT = args.db_port
        DB_USERNAME = args.db_username
        DB_PASSWORD = args.db_password
        DB_NAME = args.db_name
       
        initialize()
        app.run(host='0.0.0.0', port=50000, debug=True)
    

    运行 Flask

    在终端中导航到你的 Flask 应用文件所在的目录,然后运行:

    python app.py
    

    访问应用

    打开浏览器,访问 http://127.0.0.1:5000/,你应该能看到 “Hello, Flask!” 的消息。

    使用 curl 工具与运行在本机(127.0.0.1)的50000端口上的API进行交互。

  • 获取测试套件列表

    curl http://127.0.0.1:50000/api/test_suite -H "Accept: application/vnd.api+json"
    

    这个命令用于获取所有测试套件的列表。它向 /api/test_suite 发送一个GET请求,并指定接受的内容类型为 application/vnd.api+json,即JSON API格式。

  • 获取特定ID的测试套件

    curl http://127.0.0.1:50000/api/test_suite/1 -H "Accept: application/vnd.api+json"
    

    这个命令用于获取ID为1的测试套件的详细信息。它向 /api/test_suite/1 发送一个GET请求,同样指定接受的内容类型为 application/vnd.api+json。

  • 创建新的测试套件

    curl -X POST http://127.0.0.1:50000/api/test_suite -H "Content-Type: application/vnd.api+json" -H "Accept: application/vnd.api+json" -d '{"data":{"type": "test_suite", "attributes": {"name": "xxx", "data": "123"}}}'
    

    这个命令用于创建一个新的测试套件。它向 /api/test_suite 发送一个POST请求,并在请求体中提供要创建的测试套件的数据。这里指定了 Content-Type 为 application/vnd.api+json,表示发送的数据是JSON API格式,同时指定接受的内容类型也是 application/vnd.api+json。请求体中包含了一个 data 对象,其 type 为 test_suite,attributes 中包含了测试套件的名称和数据。

  • 更新特定ID的测试套件

    curl -X PATCH http://127.0.0.1:50000/api/test_suite/1 -H "Content-Type: application/vnd.api+json" -H "Accept: application/vnd.api+json" -d  '{"data":{"type": "test_suite", "id": "1", "attributes": {"name": "xxx", "data": "123"}}}'
    

    这个命令用于更新ID为1的测试套件的信息。它向 /api/test_suite/1 发送一个PATCH请求,并在请求体中提供更新后的测试套件数据。与创建请求类似,这里也指定了 Content-Type 和 Accept 为 application/vnd.api+json。请求体中的 data 对象包含了 type(test_suite)、id(1)以及更新后的 attributes。

  • 这些命令展示了如何使用 curl 工具与遵循JSON API规范的API进行基本的CRUD(创建、读取、更新、删除)操作。注意,实际的API实现可能会对这些请求有不同的要求,比如认证、权限等,这些在示例中没有体现。

    展望

    通过以上步骤,我们知道了如何在 Flask 应用中集成 MySQL 或 PostgreSQL 数据库。后续,可以深入学习更复杂的查询和数据库迁移等高级主题,以充分利用 Flask 和数据库的强大功能。

    作者:划碎、时光

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python Flask 数据库开发

    发表回复