pyflink1.18-实时数据管道(Python+Flink-cdc)
pyflink1.18搭建实时数据管道
实际需求
在数仓或数据湖建设过程中需将不同数据源的数据同步到数仓中,大部分数据统计采用T+1数据更新就能满足,但对于实时看板、报告比如进单运营看板、回款看板等,需要实时数据更新,此时就需要针对实际需求搭建“实时数据管道”,将业务系统的销售、回款数据实时同步到数仓,并实时计算、统计。Pyflink基于flink的python接口,能将python的数据处理功能通过flink流式实现,为实时数仓与数据统计提供处理工具。
基础架构
实时数据管道,基于flink流式计算功能搭建实现,在flink基础上开发部署python程序,通过flink-cdc监听业务系统数据库(一般为关系型数据库如oracle、MySQL)增、删、改日志,从而获取发送变化的数据,并将发生变化数据按照统计分析的计算逻辑进行相应计算,之后装入数据仓库数据集市层,供运营看板、回款看板使用。即应用flink的流式计算能力,实现python数据处理功能的实时计算,为业务、运营等提供实时的数据分析功能。
实现代码
以oracle数据库作为业务系统源库,以doris作为实时数仓,flink1.18、python3.10作为开发工具,来实现实时数据管道。oracle、doris、flink、python的安装有很多成熟的帖子,此处不做表述(oracle需开启redo日志)。
此代码只做了简单的数据同步(即从源表实时同步到目标表),在同步过程中可以通过flinksql进行数据处理,然后将处理完的数据装入到目标表;另外也可以引入pandas包对数据进行计算处理然后装到目标表。
定义oracle日志获取与doris写入链接函数
直接上代码
#!/usr/bin/python
# -*- coding:UTF-8 -*-
import os
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink_etl.utils.common import get_jar_file
def oracle_cdc_2_doris(rd_tablename,rd_columns,rd_host,rd_port,rd_username,rd_password,rd_dbname,rd_schema_name,wt_dbtype,wt_tablename,wt_columns,wt_host,wt_port,wt_username,wt_password,wt_db_name, env):
"""
Flink CDC 实时同步 Oracle 数据到 Doris 案例,参数中包括了读取的表、表头、oracle地址、端口、用户名、密码、库名(oracle用户)、schema,写入的库类型、表名、表头、doris地址、端口、用户名、密码、库名,以及创建的flink进程
ORACLE -- FLINK 大小写敏感 表名及表头 都要用大写
开启库、表:DATABASE、ORACLE_SOURCE补充日志
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA #开启数据库补充日志
ALTER TABLE FLINKU.ORACLE_SOURCE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS #将要同步的表 开启全日志设置
"""
# 先处理读取的表名、表头 处理成大写
all_col = []
allcname = []
for ec in rd_columns:
cname = (ec['colname']).upper()
ctype = ec['coltype']
ecol_sql = str(cname)+' '+str(ctype)
all_col.append(ecol_sql)
allcname.append(cname)
colsql = str(all_col).replace('[','').replace(']','').replace("'",'')
source_oracle = """CREATE TABLE CDC_"""+rd_tablename+""" ("""+colsql+""", PRIMARY key("""+allcname[0]+""") not enforced
) WITH (
'connector' = 'oracle-cdc',
'hostname' = '"""+rd_host+"""',
'port' = '"""+rd_port+"""',
'username' = '"""+rd_username+"""',
'password' = '"""+rd_password+"""',
'database-name' = '"""+rd_dbname+"""',
'schema-name' = '"""+rd_schema_name+"""',
'table-name' = '"""+rd_tablename+"""',
'debezium.database.tablename.case.insensitive' = 'false',# 使用 Oracle “大小写不敏感” 的特性
'debezium.log.mining.strategy' = 'online_catalog', # oracle 监控日志方式
'debezium.log.mining.continuous.mine' = 'true', # 实时搜集日志的工作交给 Oracle 自动完成
'scan.startup.mode' = 'latest-offset' # 用于指定启动时从哪个位置开始读取数据
);
"""
# sink_print = """create table if not exists sink_print (
# ID int,
# NAME STRING
# )
# with
# (
# 'connector' = 'print'
# )
# """
# 下边处理写入的表名、表头 处理成大写
wt_col = []
wt_cn = []
for ew in wt_columns:
wname = (ew['colname']).upper()
wtype = ew['coltype']
wcol_sql = str(wname)+' '+str(wtype)
wt_col.append(wcol_sql)
wt_cn.append(wname)
wolsql = str(wt_col).replace('[','').replace(']','').replace("'",'')
# -- 支持同步insert/update/delete事件
sink_to_doris = """CREATE TABLE """+wt_tablename+""" ("""+wolsql+""")
WITH (
'connector' = 'doris',
'fenodes' = '"""+wt_host+""":"""+wt_port+"""',
'table.identifier' = '"""+wt_db_name+"""."""+wt_tablename+"""',
'username' = '"""+wt_username+"""',
'password' = '"""+wt_password+"""',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.enable-delete' = 'true', -- 同步删除事件
'sink.label-prefix' = 'doris_label'
);
"""
wt_sql = str(wt_cn).replace('[','').replace(']','').replace("'",'')
# 从源表装入目标表-此处可以加入数据处理逻辑,比如清洗、转换、聚合等计算逻辑
insert_sql = "insert into "+wt_tablename+" select "+wt_sql+" from CDC_"+rd_tablename+";"
# print_sql = "insert into sink_print select * from cdc_oracle_source;"
env.execute_sql(source_oracle) # 创建源表
# env.execute_sql(sink_print) # 需要打印内容的 创建打印表(sink_print)
env.execute_sql(sink_to_doris) # 创建目标表
statement_set = env.create_statement_set() # 创建Statementset 实例。 可执行包含多个sink作业
# statement_set.add_insert_sql(print_sql) # 插入数据
statement_set.add_insert_sql(insert_sql) # 插入数据
statement_set.execute().wait() # 执行statement set
定义创建flink环境(表环境)
直接上代码
def flink_table_env(rd_dbtype,rd_tablename,rd_columns,rd_host,rd_port,rd_username,rd_password,rd_dbname,rd_schema_name,wt_dbtype,wt_tablename,wt_columns,wt_host,wt_port,wt_username,wt_password,wt_db_name,check_interval):
'''此处一样传入需要的参数,读取的数据库参数、写入的数据库参数,日志检查时间间隔等'''
pro_dir = str(os.path.dirname(__file__))# 获取当前位置-后续传入本地flink connector jar包使用
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# 设置重启策略为 "fixed-delay"
t_env.get_config().set("restart-strategy.type", "fixed-delay")
t_env.get_config().set("restart-strategy.fixed-delay.attempts", "3")
t_env.get_config().set("restart-strategy.fixed-delay.delay", "30s")
# 设置并行度(可能未生效)
t_env.get_config().get_configuration().set_string("execution.parallelism.default", "2")
# 设置 checkpoint 检查点模式为 EXACTLY_ONCE
t_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
# t_env.get_config().set("execution.checkpointing.interval", "3s")
t_env.get_config().get_configuration().set_string("execution.checkpointing.interval", check_interval)# 原程序这么写的 checkpoint的时间间隔(检查的时间间隔,同步一次数据的时间间隔)毫秒
t_env.get_config().get_configuration().set_string("execution.checkpointing.retention.seconds", "600")
# 检查点清除策略(正常停止后保留最后一个检查点文件-retain_on_cancellation,DELETE_ON_CANCELLATION则表示cancel任务时删除检查点,只有在任务失败时,才会被保留。)
t_env.get_config().get_configuration().set_string("execution.checkpointing.cleanup","retain_on_cancellation")
# 设置故障时,是否从最新的检查点进行恢复数据
t_env.get_config().get_configuration().set_string('execution.checkpointing.setPreferCheckpointForRecovery', "true")
# 设置 statebackend 类型为 "rocksdb",其他可选项有 "filesystem" 和 "jobmanager"
# 你也可以将这个属性设置为 StateBackendFactory 的完整类名
# e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory ojdbc8-19.3.0.0.jar
t_env.get_config().set("state.backend.type", "rocksdb")
# 设置 RocksDB statebackend 所需要的 checkpoint 目录
t_env.get_config().set("execution.checkpointing.dir", "file:///"+pro_dir+"/check_point/tmp/checkpoints/")
if str(rd_dbtype)=='oracle':
# flink相应的jar包
filters = ['flink-connector-jdbc-3.1.2-1.18.jar', 'flink-doris-connector-1.18-1.6.0.jar', 'ojdbc8-19.3.0.0.jar', 'flink-sql-connector-oracle-cdc-2.2.1.jar']
str_jars = get_jar_file(dir_path=pro_dir+'/jar_dir', need_jars=filters)
t_env.get_config().set("pipeline.jars", str_jars)
logger.info('flink_table_env:[create oracle cdc] table <'+rd_tablename+'>')
# oracle同步到doris
oracle_cdc_2_doris(rd_tablename,rd_columns,rd_host,rd_port,rd_username,rd_password,rd_dbname,rd_schema_name,wt_dbtype,wt_tablename,wt_columns,wt_host,wt_port,wt_username,wt_password,wt_db_name, env=t_env)
else:
# 可再添加其他读取的数据源类型,比如MySQL
print('flink_table_env:[read db type is not exists]')
启动python-flink程序
直接上代码
if __name__ == '__main__':
rd_dbtype = 'oracle'
rd_tablename = 'ORACLE_SOURCE'
rd_columns = [{'colname':'ID','coltype':'INT'},{'colname':'NAME','coltype':'STRING'}]
rd_host = 'x.x.x.x'
rd_port = '1521'
rd_username = 'flinku'
rd_password = 'xxxxxxxxx'
rd_dbname = 'flink'
rd_schema_name = 'FLINKU'
wt_dbtype = 'doris'
wt_tablename = 'doris_sink'
wt_columns = [{'colname':'id','coltype':'INT'},{'colname':'name','coltype':'STRING'}]
wt_host = 'x.x.x.x'
wt_port = '8030'
wt_username = 'root'
wt_password = 'dorisp'
wt_db_name = 'ods_flink'
check_interval = 1000
flink_table_env(rd_dbtype,rd_tablename,rd_columns,rd_host,rd_port,rd_username,rd_password,rd_dbname,rd_schema_name,wt_dbtype,wt_tablename,wt_columns,wt_host,wt_port,wt_username,wt_password,wt_db_name,check_interval)
部署运行
pyflink数据同步程序开发完成,需要部署在flink集群上运行。flink集群的部署前边讲过通过python进行安装部署。在flink部署服务器上,以flink运行pyflink代码,将python程序部署到flink集群。可通过以下命令实现:
/home/python310/lib/python3.10/site-packages/pyflink/bin/flink run -py /home/python/main_flink_cdc.py -pyexec /home/python310/bin/python3.10
# flink安装路径 pyflink程序路径 python路径
部署运行后结果如下:
作者:欧阳伯疼