Python多数据源整合实践指南

python 多数据源整合

步骤说明

  1. 模拟数据生成部分

  2. 使用Flask创建了一个返回订单数据的API接口
  3. 生成本地CSV测试文件
  4. 创建MySQL数据库表并插入测试数据
  5. 数据读取部分

  6. 通过Requests客户端获取API数据
  7. 使用pandas直接读取MySQL数据
  8. 读取本地CSV文件
  9. 数据清洗部分

  10. 统一所有数据源的列名称
  11. 标准化日期格式(处理不同分隔符)
  12. 统一产品名称的命名规范
  13. 合并来自不同数据源的数据
  14. 处理缺失值并进行数据类型转换
  15. 结果输出

  16. 打印整合后的数据集
  17. 将结果保存到CSV文件

运行这个代码需要以下准备:

  1. 升级 pippython -m pip install --upgrade pip
  2. 安装依赖库:pip install pandas flask sqlalchemy pymysql requests
  3. 确保MySQL服务已启动并配置正确的连接参数
  4. API部分需要保持服务运行(正式使用时需要单独运行Flask应用)

最终输出的整合数据包含以下特征:

  • 统一的字段名称
  • 标准化的日期格式
  • 规范化的产品名称
  • 一致的数值类型
  • 处理后的完整数据记录
  • 模拟数据生成部分

    使用Flask创建了一个返回订单数据的API接口

    app.py

    from flask import Flask, jsonify
    import random, datetime
    import pandas as pd
    app = Flask(__name__)
    
    
    @app.route('/')
    def hello_world():  # put application's code here
      return '世界和平'
    
    
    @app.route('/api/orders', methods=['GET'])
    def get_orders():
      # 生成模拟API数据
      mock_data = [
        {
          "order_id": i,
          "product": random.choice(['手机', '电脑', '平板', '耳机']),
          "amount": random.randint(1, 5),
          "order_date": (datetime.date.today() - datetime.timedelta(days=i)).strftime('%Y/%m/%d')
        }
        for i in range(1, 6)
      ]
      return jsonify(mock_data)
    
    
    # 2.1 从API获取数据
    import requests
    
    # 启动Flask服务(在实际使用中需要在单独终端运行)
    # 此处为演示目的直接通过请求获取
    with app.test_client() as client:
      api_response = client.get('/api/orders')
      api_data = api_response.get_json()
      df_api = pd.DataFrame(api_data)
      print(df_api)
      
    if __name__ == '__main__':
      app.run()
    
    

    生成本地CSV测试文件

    mock/mock_csv.py

    import pandas as pd
    csv_data = {
      'transaction_id': [1001, 1002, 1003, 1004],
      'product_name': ['手机', '电脑', '键盘', '鼠标'],
      'quantity': [2, 1, 3, 4],
      'sale_date': ['2023-01-01', '2023-01-02', '2023-01-03', '2023-01-04']
    }
    
    pd.DataFrame(csv_data).to_csv('../data/local_sales.csv', index=False)
    

    创建MySQL数据库表并插入测试数据

    mock/mock_db.py

    import pymysql
    
    # 创建数据库连接
    conn = pymysql.connect(
      host="36.41.67.11",
      user="root",
      password="lihaozhe",
      database="test_db"
    )
    
    # 使用 cursor() 方法创建一个游标对象 cursor
    cursor = conn.cursor()
    
    # 创建测试表
    cursor.execute('drop database if exists test_db')
    cursor.execute('create database if not exists test_db')
    cursor.execute('use test_db')
    cursor.execute('drop table if exists test_table')
    cursor.execute("""CREATE TABLE IF NOT EXISTS online_orders (
                    order_no INT PRIMARY KEY comment '订单编号',
                    item VARCHAR(50) comment '订单商品',
                    qty INT comment '商品数量',
                    order_time DATE comment '订单时间')""")
    
    # 插入测试数据
    insert_data = [
      (3001, '手机', 1, '2023-01-01'),
      (3002, '电脑', 2, '2023-01-02'),
      (3003, '显示器', 1, '2023-01-03')
    ]
    
    cursor.executemany("INSERT INTO online_orders VALUES (%s, %s, %s, %s)", insert_data)
    conn.commit()
    
    # 关闭数据库连接
    cursor.close()
    conn.close()
    

    数据获取、统一格式、合并

    client.py

    # ========================== 第二部分:数据读取 ==========================
    # ========================== 第二部分:数据读取 ==========================
    import datetime
    
    import pandas as pd
    from sqlalchemy import create_engine
    # 2.1 从API获取数据
    import requests
    
    # API接口
    url_api = 'http://localhost:5000/api/orders'
    
    # 向API接口发送请求获取数据
    response = requests.get(url=url_api)
    # 解析API接口返回数据中为json数据
    api_data = response.json()
    # 将API接口返回的json格式数据封装为 DataFrame
    df_api = pd.DataFrame(api_data)
    print(f'API接口数据:\n{df_api}\n')
    
    # 2.2 从MySQL读取数据
    host = "36.41.67.11"
    port = 3306
    username = "root"
    password = "lihaozhe"
    database = "test_db"
    
    engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{database}')
    query = "SELECT * FROM online_orders"
    df_mysql01 = pd.read_sql(query, engine)
    df_mysql02 = pd.read_sql_table('online_orders', engine)
    print(f'数据库接口数据:\n{df_mysql01}\n')
    print(f'数据库接口数据:\n{df_mysql02}\n')
    
    # 2.3 从本地CSV读取数据
    df_csv = pd.read_csv('./data/local_sales.csv')
    print(f'从本地CSV读取数据:\n{df_csv}\n')
    print('=' * 60)
    
    # ========================== 第三部分:数据清洗 ==========================
    # 3.1 统一列名
    df_api = df_api.rename(columns={
      'order_id': 'order_id',
      'product': 'product_name',
      'amount': 'product_quantity',
      'order_date': 'order_date'
    })
    print(f'df_api数据:\n{df_api}\n')
    
    df_mysql = df_mysql01.rename(columns={
      'order_no': 'order_id',
      'item': 'product_name',
      'qty': 'product_quantity',
      'order_time': 'order_date'
    })
    print(f'df_mysql数据:\n{df_mysql}\n')
    
    df_csv = df_csv.rename(columns={
      'transaction_id': 'order_id',
      'quantity': 'product_quantity',
      'sale_date': 'order_date'
    })
    print(f'df_csv数据:\n{df_csv}\n')
    print('=' * 60)
    
    
    # 3.2 统一日期格式
    def standardize_date(date_str):
      # 处理不同分隔符的日期格式
      formats = ['%Y/%m/%d', '%Y-%m-%d']
      for fmt in formats:
        try:
          return datetime.datetime.strptime(date_str, fmt).date()
        except ValueError:
          continue
      return date_str  # 如果无法解析则保留原始值
    
    
    for df in [df_api, df_mysql, df_csv]:
      df['order_date'] = df['order_date'].apply(lambda x: standardize_date(str(x)))
    
    # 3.4 合并数据集
    combined_df = pd.concat([df_api, df_mysql, df_csv], ignore_index=True)
    
    # 3.5 处理缺失值(如果有)
    combined_df = combined_df.dropna(subset=['order_date', 'product_name'])
    
    # 3.6 类型转换
    combined_df = combined_df.astype({
      'order_id': 'int64',
      'product_quantity': 'int32',
      'order_date': 'datetime64[ns]'
    })
    # ========================== 第四部分:结果输出 ==========================
    print("整合后的数据集:")
    print(combined_df)
    combined_df.to_csv('./data/combined_sales_data.csv', index=False)
    

    作者:李昊哲小课

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python多数据源整合实践指南

    发表回复