Python多数据源整合实践指南
python 多数据源整合
步骤说明
-
模拟数据生成部分:
- 使用Flask创建了一个返回订单数据的API接口
- 生成本地CSV测试文件
- 创建MySQL数据库表并插入测试数据
-
数据读取部分:
- 通过Requests客户端获取API数据
- 使用pandas直接读取MySQL数据
- 读取本地CSV文件
-
数据清洗部分:
- 统一所有数据源的列名称
- 标准化日期格式(处理不同分隔符)
- 统一产品名称的命名规范
- 合并来自不同数据源的数据
- 处理缺失值并进行数据类型转换
-
结果输出:
- 打印整合后的数据集
- 将结果保存到CSV文件
运行这个代码需要以下准备:
- 升级
pip
:python -m pip install --upgrade pip
- 安装依赖库:
pip install pandas flask sqlalchemy pymysql requests
- 确保MySQL服务已启动并配置正确的连接参数
- API部分需要保持服务运行(正式使用时需要单独运行Flask应用)
最终输出的整合数据包含以下特征:
模拟数据生成部分
使用Flask创建了一个返回订单数据的API接口
app.py
from flask import Flask, jsonify
import random, datetime
import pandas as pd
app = Flask(__name__)
@app.route('/')
def hello_world(): # put application's code here
return '世界和平'
@app.route('/api/orders', methods=['GET'])
def get_orders():
# 生成模拟API数据
mock_data = [
{
"order_id": i,
"product": random.choice(['手机', '电脑', '平板', '耳机']),
"amount": random.randint(1, 5),
"order_date": (datetime.date.today() - datetime.timedelta(days=i)).strftime('%Y/%m/%d')
}
for i in range(1, 6)
]
return jsonify(mock_data)
# 2.1 从API获取数据
import requests
# 启动Flask服务(在实际使用中需要在单独终端运行)
# 此处为演示目的直接通过请求获取
with app.test_client() as client:
api_response = client.get('/api/orders')
api_data = api_response.get_json()
df_api = pd.DataFrame(api_data)
print(df_api)
if __name__ == '__main__':
app.run()
生成本地CSV测试文件
mock/mock_csv.py
import pandas as pd
csv_data = {
'transaction_id': [1001, 1002, 1003, 1004],
'product_name': ['手机', '电脑', '键盘', '鼠标'],
'quantity': [2, 1, 3, 4],
'sale_date': ['2023-01-01', '2023-01-02', '2023-01-03', '2023-01-04']
}
pd.DataFrame(csv_data).to_csv('../data/local_sales.csv', index=False)
创建MySQL数据库表并插入测试数据
mock/mock_db.py
import pymysql
# 创建数据库连接
conn = pymysql.connect(
host="36.41.67.11",
user="root",
password="lihaozhe",
database="test_db"
)
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = conn.cursor()
# 创建测试表
cursor.execute('drop database if exists test_db')
cursor.execute('create database if not exists test_db')
cursor.execute('use test_db')
cursor.execute('drop table if exists test_table')
cursor.execute("""CREATE TABLE IF NOT EXISTS online_orders (
order_no INT PRIMARY KEY comment '订单编号',
item VARCHAR(50) comment '订单商品',
qty INT comment '商品数量',
order_time DATE comment '订单时间')""")
# 插入测试数据
insert_data = [
(3001, '手机', 1, '2023-01-01'),
(3002, '电脑', 2, '2023-01-02'),
(3003, '显示器', 1, '2023-01-03')
]
cursor.executemany("INSERT INTO online_orders VALUES (%s, %s, %s, %s)", insert_data)
conn.commit()
# 关闭数据库连接
cursor.close()
conn.close()
数据获取、统一格式、合并
client.py
# ========================== 第二部分:数据读取 ==========================
# ========================== 第二部分:数据读取 ==========================
import datetime
import pandas as pd
from sqlalchemy import create_engine
# 2.1 从API获取数据
import requests
# API接口
url_api = 'http://localhost:5000/api/orders'
# 向API接口发送请求获取数据
response = requests.get(url=url_api)
# 解析API接口返回数据中为json数据
api_data = response.json()
# 将API接口返回的json格式数据封装为 DataFrame
df_api = pd.DataFrame(api_data)
print(f'API接口数据:\n{df_api}\n')
# 2.2 从MySQL读取数据
host = "36.41.67.11"
port = 3306
username = "root"
password = "lihaozhe"
database = "test_db"
engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{database}')
query = "SELECT * FROM online_orders"
df_mysql01 = pd.read_sql(query, engine)
df_mysql02 = pd.read_sql_table('online_orders', engine)
print(f'数据库接口数据:\n{df_mysql01}\n')
print(f'数据库接口数据:\n{df_mysql02}\n')
# 2.3 从本地CSV读取数据
df_csv = pd.read_csv('./data/local_sales.csv')
print(f'从本地CSV读取数据:\n{df_csv}\n')
print('=' * 60)
# ========================== 第三部分:数据清洗 ==========================
# 3.1 统一列名
df_api = df_api.rename(columns={
'order_id': 'order_id',
'product': 'product_name',
'amount': 'product_quantity',
'order_date': 'order_date'
})
print(f'df_api数据:\n{df_api}\n')
df_mysql = df_mysql01.rename(columns={
'order_no': 'order_id',
'item': 'product_name',
'qty': 'product_quantity',
'order_time': 'order_date'
})
print(f'df_mysql数据:\n{df_mysql}\n')
df_csv = df_csv.rename(columns={
'transaction_id': 'order_id',
'quantity': 'product_quantity',
'sale_date': 'order_date'
})
print(f'df_csv数据:\n{df_csv}\n')
print('=' * 60)
# 3.2 统一日期格式
def standardize_date(date_str):
# 处理不同分隔符的日期格式
formats = ['%Y/%m/%d', '%Y-%m-%d']
for fmt in formats:
try:
return datetime.datetime.strptime(date_str, fmt).date()
except ValueError:
continue
return date_str # 如果无法解析则保留原始值
for df in [df_api, df_mysql, df_csv]:
df['order_date'] = df['order_date'].apply(lambda x: standardize_date(str(x)))
# 3.4 合并数据集
combined_df = pd.concat([df_api, df_mysql, df_csv], ignore_index=True)
# 3.5 处理缺失值(如果有)
combined_df = combined_df.dropna(subset=['order_date', 'product_name'])
# 3.6 类型转换
combined_df = combined_df.astype({
'order_id': 'int64',
'product_quantity': 'int32',
'order_date': 'datetime64[ns]'
})
# ========================== 第四部分:结果输出 ==========================
print("整合后的数据集:")
print(combined_df)
combined_df.to_csv('./data/combined_sales_data.csv', index=False)
作者:李昊哲小课