pyflink1.18-实时数据管道(Python+Flink-cdc)

pyflink1.18搭建实时数据管道

  • 实际需求
  • 基础架构
  • 实现代码
  • 定义oracle日志获取与doris写入链接函数
  • 定义创建flink环境(表环境)
  • 启动python-flink程序
  • 部署运行
  • 实际需求

    在数仓或数据湖建设过程中需将不同数据源的数据同步到数仓中,大部分数据统计采用T+1数据更新就能满足,但对于实时看板、报告比如进单运营看板、回款看板等,需要实时数据更新,此时就需要针对实际需求搭建“实时数据管道”,将业务系统的销售、回款数据实时同步到数仓,并实时计算、统计。Pyflink基于flink的python接口,能将python的数据处理功能通过flink流式实现,为实时数仓与数据统计提供处理工具。

    基础架构

    实时数据管道,基于flink流式计算功能搭建实现,在flink基础上开发部署python程序,通过flink-cdc监听业务系统数据库(一般为关系型数据库如oracle、MySQL)增、删、改日志,从而获取发送变化的数据,并将发生变化数据按照统计分析的计算逻辑进行相应计算,之后装入数据仓库数据集市层,供运营看板、回款看板使用。即应用flink的流式计算能力,实现python数据处理功能的实时计算,为业务、运营等提供实时的数据分析功能。
    请添加图片描述

    实现代码

    以oracle数据库作为业务系统源库,以doris作为实时数仓,flink1.18、python3.10作为开发工具,来实现实时数据管道。oracle、doris、flink、python的安装有很多成熟的帖子,此处不做表述(oracle需开启redo日志)。
    此代码只做了简单的数据同步(即从源表实时同步到目标表),在同步过程中可以通过flinksql进行数据处理,然后将处理完的数据装入到目标表;另外也可以引入pandas包对数据进行计算处理然后装到目标表。

    定义oracle日志获取与doris写入链接函数

    直接上代码

    #!/usr/bin/python
    # -*- coding:UTF-8 -*-
    import os
    from pyflink.table import EnvironmentSettings, TableEnvironment
    from pyflink_etl.utils.common import get_jar_file
    
    def oracle_cdc_2_doris(rd_tablename,rd_columns,rd_host,rd_port,rd_username,rd_password,rd_dbname,rd_schema_name,wt_dbtype,wt_tablename,wt_columns,wt_host,wt_port,wt_username,wt_password,wt_db_name, env):
        """
        Flink CDC 实时同步 Oracle 数据到 Doris 案例,参数中包括了读取的表、表头、oracle地址、端口、用户名、密码、库名(oracle用户)、schema,写入的库类型、表名、表头、doris地址、端口、用户名、密码、库名,以及创建的flink进程
        ORACLE -- FLINK 大小写敏感 表名及表头 都要用大写
        开启库、表:DATABASE、ORACLE_SOURCE补充日志
        ALTER DATABASE ADD SUPPLEMENTAL LOG DATA #开启数据库补充日志
        ALTER TABLE FLINKU.ORACLE_SOURCE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS #将要同步的表 开启全日志设置
        """
        # 先处理读取的表名、表头 处理成大写
        all_col = []
        allcname = []
        for ec in rd_columns:
            cname = (ec['colname']).upper()
            ctype = ec['coltype']
            ecol_sql = str(cname)+' '+str(ctype)
            all_col.append(ecol_sql)
            allcname.append(cname)
        colsql = str(all_col).replace('[','').replace(']','').replace("'",'')
        source_oracle = """CREATE TABLE CDC_"""+rd_tablename+""" ("""+colsql+""", PRIMARY key("""+allcname[0]+""") not enforced
        ) WITH (
        'connector' = 'oracle-cdc',
        'hostname' = '"""+rd_host+"""',
        'port' = '"""+rd_port+"""',
        'username' = '"""+rd_username+"""',
        'password' = '"""+rd_password+"""',
        'database-name' = '"""+rd_dbname+"""',
        'schema-name' = '"""+rd_schema_name+"""',
        'table-name' = '"""+rd_tablename+"""',
        'debezium.database.tablename.case.insensitive' = 'false',# 使用 Oracle “大小写不敏感” 的特性
        'debezium.log.mining.strategy' = 'online_catalog', # oracle 监控日志方式
        'debezium.log.mining.continuous.mine' = 'true', # 实时搜集日志的工作交给 Oracle 自动完成
        'scan.startup.mode' = 'latest-offset' # 用于指定启动时从哪个位置开始读取数据
        );
        """
        # sink_print = """create table if not exists sink_print (
        # ID int,
        # NAME STRING
        # )
        # with
        # (
        #      'connector' = 'print'
        # )
        # """
        # 下边处理写入的表名、表头 处理成大写
        wt_col = []
        wt_cn = []
        for ew in wt_columns:
            wname = (ew['colname']).upper()
            wtype = ew['coltype']
            wcol_sql = str(wname)+' '+str(wtype)
            wt_col.append(wcol_sql)
            wt_cn.append(wname)
        wolsql = str(wt_col).replace('[','').replace(']','').replace("'",'')
        # -- 支持同步insert/update/delete事件
        sink_to_doris = """CREATE TABLE """+wt_tablename+""" ("""+wolsql+""")
        WITH (
          'connector' = 'doris',
          'fenodes' = '"""+wt_host+""":"""+wt_port+"""',
          'table.identifier' = '"""+wt_db_name+"""."""+wt_tablename+"""',
          'username' = '"""+wt_username+"""',
          'password' = '"""+wt_password+"""',
          'sink.properties.format' = 'json',
          'sink.properties.read_json_by_line' = 'true',
          'sink.enable-delete' = 'true',  -- 同步删除事件
          'sink.label-prefix' = 'doris_label'
        );
        """
        wt_sql = str(wt_cn).replace('[','').replace(']','').replace("'",'')
        # 从源表装入目标表-此处可以加入数据处理逻辑,比如清洗、转换、聚合等计算逻辑
        insert_sql = "insert into "+wt_tablename+" select "+wt_sql+" from CDC_"+rd_tablename+";"
        # print_sql = "insert into sink_print select * from cdc_oracle_source;"
    
        env.execute_sql(source_oracle) # 创建源表
        # env.execute_sql(sink_print) # 需要打印内容的 创建打印表(sink_print)
        env.execute_sql(sink_to_doris) # 创建目标表
    
        statement_set = env.create_statement_set() # 创建Statementset 实例。 可执行包含多个sink作业
        # statement_set.add_insert_sql(print_sql) # 插入数据
        statement_set.add_insert_sql(insert_sql) # 插入数据
    
        statement_set.execute().wait() # 执行statement set
    

    定义创建flink环境(表环境)

    直接上代码

    def flink_table_env(rd_dbtype,rd_tablename,rd_columns,rd_host,rd_port,rd_username,rd_password,rd_dbname,rd_schema_name,wt_dbtype,wt_tablename,wt_columns,wt_host,wt_port,wt_username,wt_password,wt_db_name,check_interval):
        '''此处一样传入需要的参数,读取的数据库参数、写入的数据库参数,日志检查时间间隔等'''
        pro_dir = str(os.path.dirname(__file__))# 获取当前位置-后续传入本地flink connector jar包使用
        t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
        # 设置重启策略为 "fixed-delay"
        t_env.get_config().set("restart-strategy.type", "fixed-delay")
        t_env.get_config().set("restart-strategy.fixed-delay.attempts", "3")
        t_env.get_config().set("restart-strategy.fixed-delay.delay", "30s")
    
        # 设置并行度(可能未生效)
        t_env.get_config().get_configuration().set_string("execution.parallelism.default", "2")
    
        # 设置 checkpoint 检查点模式为 EXACTLY_ONCE
        t_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
        # t_env.get_config().set("execution.checkpointing.interval", "3s")
        t_env.get_config().get_configuration().set_string("execution.checkpointing.interval", check_interval)# 原程序这么写的 checkpoint的时间间隔(检查的时间间隔,同步一次数据的时间间隔)毫秒
        t_env.get_config().get_configuration().set_string("execution.checkpointing.retention.seconds", "600")
        # 检查点清除策略(正常停止后保留最后一个检查点文件-retain_on_cancellation,DELETE_ON_CANCELLATION则表示cancel任务时删除检查点,只有在任务失败时,才会被保留。)
        t_env.get_config().get_configuration().set_string("execution.checkpointing.cleanup","retain_on_cancellation")
        # 设置故障时,是否从最新的检查点进行恢复数据
        t_env.get_config().get_configuration().set_string('execution.checkpointing.setPreferCheckpointForRecovery', "true")
    
        # 设置 statebackend 类型为 "rocksdb",其他可选项有 "filesystem" 和 "jobmanager"
        # 你也可以将这个属性设置为 StateBackendFactory 的完整类名
        # e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory  ojdbc8-19.3.0.0.jar
        t_env.get_config().set("state.backend.type", "rocksdb")
    
        # 设置 RocksDB statebackend 所需要的 checkpoint 目录
        t_env.get_config().set("execution.checkpointing.dir", "file:///"+pro_dir+"/check_point/tmp/checkpoints/")
        if str(rd_dbtype)=='oracle':
            # flink相应的jar包
            filters = ['flink-connector-jdbc-3.1.2-1.18.jar', 'flink-doris-connector-1.18-1.6.0.jar', 'ojdbc8-19.3.0.0.jar', 'flink-sql-connector-oracle-cdc-2.2.1.jar']
            str_jars = get_jar_file(dir_path=pro_dir+'/jar_dir', need_jars=filters)
            t_env.get_config().set("pipeline.jars", str_jars)
            logger.info('flink_table_env:[create oracle cdc] table <'+rd_tablename+'>')
            # oracle同步到doris
            oracle_cdc_2_doris(rd_tablename,rd_columns,rd_host,rd_port,rd_username,rd_password,rd_dbname,rd_schema_name,wt_dbtype,wt_tablename,wt_columns,wt_host,wt_port,wt_username,wt_password,wt_db_name, env=t_env)
        else:
            # 可再添加其他读取的数据源类型,比如MySQL
            print('flink_table_env:[read db type is not exists]')
    

    启动python-flink程序

    直接上代码

    if __name__ == '__main__':
        rd_dbtype = 'oracle'
        rd_tablename = 'ORACLE_SOURCE'
        rd_columns = [{'colname':'ID','coltype':'INT'},{'colname':'NAME','coltype':'STRING'}]
        rd_host = 'x.x.x.x'
        rd_port = '1521'
        rd_username = 'flinku'
        rd_password = 'xxxxxxxxx'
        rd_dbname = 'flink'
        rd_schema_name = 'FLINKU'
        wt_dbtype = 'doris'
        wt_tablename = 'doris_sink'
        wt_columns = [{'colname':'id','coltype':'INT'},{'colname':'name','coltype':'STRING'}]
        wt_host = 'x.x.x.x'
        wt_port = '8030'
        wt_username = 'root'
        wt_password = 'dorisp'
        wt_db_name = 'ods_flink'
        check_interval = 1000
        flink_table_env(rd_dbtype,rd_tablename,rd_columns,rd_host,rd_port,rd_username,rd_password,rd_dbname,rd_schema_name,wt_dbtype,wt_tablename,wt_columns,wt_host,wt_port,wt_username,wt_password,wt_db_name,check_interval)
    

    部署运行

    pyflink数据同步程序开发完成,需要部署在flink集群上运行。flink集群的部署前边讲过通过python进行安装部署。在flink部署服务器上,以flink运行pyflink代码,将python程序部署到flink集群。可通过以下命令实现:

    /home/python310/lib/python3.10/site-packages/pyflink/bin/flink run -py /home/python/main_flink_cdc.py -pyexec /home/python310/bin/python3.10
    # flink安装路径     pyflink程序路径      python路径
    

    部署运行后结果如下:

    作者:欧阳伯疼

    物联沃分享整理
    物联沃-IOTWORD物联网 » pyflink1.18-实时数据管道(Python+Flink-cdc)

    发表回复