flink系列之:使用flink cdc3从mysql数据库同步数据到doris和starrocks

news/2025/2/23 6:37:30

flink系列之:使用flink cdc3从mysql数据库同步数据到dorisstarrocks

  • 一、下载部署flink
  • 二、下载部署flink cdc3
  • 三、下载mysql-connector-java到flink和flink cdc的lib目录
  • 四、flink设置checkpoint支持增量同步数据
  • 五、mysql到doris和starrocks的yaml配置文件
  • 六、启动flink和flink cdc
  • 七、查看flink cdc任务同步日志
  • 八、查看mysql表和starrocks
  • 九、flink cdc技术生产环境应用

一、下载部署flink

  • 下载flink

解压flink

tar -zxvf flink-1.19.1-bin-scala_2.12.tgz

修改flink配置文件config.yaml

taskmanager:
  bind-host: localhost
  host: localhost
  numberOfTaskSlots: 6
  memory:
    process:
      size: 1728m

parallelism:
  default: 1
rest:
  address: 10.66.77.104
  # network interface, such as 0.0.0.0.
  bind-address: 10.66.77.104
  # port: 8081
  # # Port range for the REST and web server to bind to.
  # bind-port: 8080-8090

设置flink 环境变零

cd /etc/profile.d
cat flink.sh 

#export HADOOP_CLASSPATH=`hadoop classpath`
FLINK_HOME=/data/src/flink/flink-1.19.1
PATH=$PATH:$FLINK_HOME/bin:$FLINK_HOME/sbin

export PATH
export FLINK_HOME

启动flink

./start-cluster.sh

查看jps

jps
760234 StandaloneSessionClusterEntrypoint
390132 Jps
760880 TaskManagerRunner

查看flink web ui,{ip}:{port}
在这里插入图片描述

二、下载部署flink cdc3

  • https://github.com/apache/flink-cdc/releases
    在这里插入图片描述
    解压flink-cdc3
tar -zxvf flink-cdc-3.3.0-bin.tar.gz

下载Pipeline Connectors Jars和Source Connector Jars到lib目录

/data/src/flink/flink-cdc-3.3.0/lib   ls
flink-cdc-dist-3.3.0.jar                              flink-cdc-pipeline-connector-maxcompute-3.3.0.jar  flink-sql-connector-tidb-cdc-3.3.0.jar
flink-cdc-pipeline-connector-doris-3.3.0.jar          flink-cdc-pipeline-connector-mysql-3.3.0.jar       mysql-connector-java-8.0.28.jar
flink-cdc-pipeline-connector-elasticsearch-3.3.0.jar  flink-cdc-pipeline-connector-paimon-3.3.0.jar
flink-cdc-pipeline-connector-kafka-3.3.0.jar          flink-cdc-pipeline-connector-starrocks-3.3.0.jar

三、下载mysql-connector-java到flink和flink cdc的lib目录

https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28

在这里插入图片描述

wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar

四、flink设置checkpoint支持增量同步数据

  • execution.checkpointing.interval: 3000

参数说明

  • execution.checkpointing.interval: 这个参数用于指定 Flink 作业执行检查点的频率。检查点是 Flink 用于实现容错机制的一种机制,通过定期保存作业的状态,可以在发生故障时恢复到最近的一个检查点。
  • 3000: 这个值表示检查点的间隔时间,单位是毫秒(ms)。因此,3000 毫秒等于 3 秒。

starrocksyaml_100">五、mysql到doris和starrocks的yaml配置文件

放到任意目录下

mysql-to-doris.yaml

   source:
     type: mysql
     hostname: ip
     port: 3306
     username: *********
     password: ************
     tables: data_entry_test.debeziumOfflineClusterInfo,data_entry_test.debeziumRealtimeClusterInfo
     server-id: 5400-5404
     server-time-zone: Asia/Shanghai

   sink:
     type: doris
     fenodes: ip:8030
     username: ***********
     password: *************

   route:
     - source-table: data_entry_test.debeziumOfflineClusterInfo
       sink-table: optics.debeziumOfflineClusterInfo
     - source-table: data_entry_test.debeziumRealtimeClusterInfo
       sink-table: optics.debeziumRealtimeClusterInfo


   pipeline:
     name: Sync MySQL Database to Doris
     parallelism: 2

mysql-to-starrocks.yaml

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
 type: mysql
 hostname: ip
 port: 3306
 username: *********
 password: **********
 tables: data_entry_test.debeziumOfflineClusterInfo,data_entry_test.debeziumRealtimeClusterInfo
 server-id: 5400-5404
 server-time-zone: Asia/Shanghai

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://ip:9030
  load-url: ip:8030
  username: ****************
  password: ****************
route:
  - source-table: data_entry_test.debeziumOfflineClusterInfo
    sink-table: dd_test_starrocks.debeziumOfflineClusterInfo
  - source-table: data_entry_test.debeziumRealtimeClusterInfo
    sink-table: dd_test_starrocks.debeziumRealtimeClusterInfo
pipeline:
   name: MySQL to StarRocks Pipeline
   parallelism: 6

六、启动flink和flink cdc

启动flink

./start-cluster.sh

启动flink cdc

/data/src/flink/flink-cdc-3.3.0/bin/flink-cdc.sh
/data/src/flink/flink-cdc-3.3.0/conf/mysql-to-starrocks.yaml

flink web ui查看任务
在这里插入图片描述

七、查看flink cdc任务同步日志

2025-02-18 13:48:49,973 INFO  com.starrocks.connector.flink.catalog.StarRocksCatalog       [] - Success to create table dd_test_starrocks.dd_test_starrocks, sql: CREATE TABLE IF NOT EXISTS dd_test_starrocks.debeziumOfflineClusterInfo (
id VARCHAR(21) NOT NULL,
servername VARCHAR(6168) NOT NULL,
connectorname VARCHAR(6168) NOT NULL,
databasename VARCHAR(6168) NOT NULL,
url VARCHAR(6168) NOT NULL,
topicname VARCHAR(6168) NOT NULL,
clustername VARCHAR(6168) NOT NULL
) PRIMARY KEY (id)
DISTRIBUTED BY HASH (id);
2025-02-18 14:04:25,298 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Flink CDC Event Source: mysql -> SchemaOperator -> PrePartition (1/2)#0 (2069f3b2a289abd02012736f795a34b7_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to RUNNING.
2025-02-18 14:04:25,333 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Flink CDC Event Source: mysql -> SchemaOperator -> PrePartition (2/2)#0 (2069f3b2a289abd02012736f795a34b7_cbc357ccb763df2852fee8c4fc7d55f2_1_0) switched from INITIALIZING to RUNNING.
2025-02-18 14:09:35,729 INFO  com.starrocks.data.load.stream.DefaultStreamLoader           [] - Stream load completed, label : flink-84c2fdac-3341-4b5b-8bf1-3946098c0a97, database : dd_test_starrocks, table : debeziumOfflineClusterInfo, body : {
    "Status": "OK",
    "Message": "",
    "Label": "flink-84c2fdac-3341-4b5b-8bf1-3946098c0a97",
    "TxnId": 108875857,
    "LoadBytes": 133959,
    "StreamLoadPlanTimeMs": 0,
    "ReceivedDataTimeMs": 0
}

starrocks_209">八、查看mysql表和starrocks

mysql表

-- data_entry_test.debeziumOfflineClusterInfo definition

CREATE TABLE `debeziumOfflineClusterInfo` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `servername` varchar(2056) NOT NULL COMMENT 'connector标识名',
  `connectorname` varchar(2056) NOT NULL COMMENT 'connector名称',
  `databasename` varchar(2056) NOT NULL COMMENT '数据库名',
  `url` varchar(2056) NOT NULL COMMENT '数据库名',
  `topicname` varchar(2056) NOT NULL COMMENT 'topic名称',
  `clustername` varchar(2056) NOT NULL COMMENT '集群名称',
  `database_server_id` varchar(256) NOT NULL COMMENT '集群名称',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=765 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

starrocks

-- dd_test_starrocks.debeziumOfflineClusterInfo definition

CREATE TABLE `debeziumOfflineClusterInfo` (
  `id` varchar(21) NOT NULL COMMENT "",
  `servername` varchar(6168) NOT NULL COMMENT "",
  `connectorname` varchar(6168) NOT NULL COMMENT "",
  `databasename` varchar(6168) NOT NULL COMMENT "",
  `url` varchar(6168) NOT NULL COMMENT "",
  `topicname` varchar(6168) NOT NULL COMMENT "",
  `clustername` varchar(6168) NOT NULL COMMENT ""
) ENGINE=OLAP 
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`)
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "false",
"compression" = "LZ4"
);

如上所示,成功在starrocks表中创建了表,并完成了历史数据和增量数据的同步

九、flink cdc技术生产环境应用

  • 阿里云基于 Flink CDC 的现代数据栈云上实践

细粒度变更策略控制:

  • 支持新增表、新增列、修改列名、修改列定义、删除列、删除表和清空表等操作
    在这里插入图片描述

当上游数据库新增表时,CDC YAML 能够自动识别并同步这些表的数据,而无需重新配置作业。此功能分为两种情况:

  • 历史数据同步:通过开启 scan.newly-added-table.enabled 选项,并通过 savepoint 重启作业来读取新增表的历史数据。
  • 增量数据同步:只需开启 scan.binlog.newly-added-table.enabled 选项,自动同步新增表的增量数据。

在这里插入图片描述


http://www.niftyadmin.cn/n/5863130.html

相关文章

MT7628基于原厂的SDK包, 修改ra1网卡的MAC方法。

1、在/etc/config/wireless文件添加多个WIFI网卡的方法。 2、修改WIFI驱动,在src/embedded/ap/ap.c文件里面,从系统文件信息来修改ra1网卡的MAC内容,添加红色部分源代码。 RTMP_IO_WRITE32(pAd, RMAC_RMACDR, Value); if (idx > 0) …

edge浏览器将书签栏顶部显示

追求效果,感觉有点丑,但总归方便多了 操作路径:设置-外观-显示收藏夹栏-始终

uniapp 中使用天地图,安卓端、h5

背景:项目需要将高德地图换成天地图,pc端已经更换,但app端用uniapp写的,就有点茫然了,毕竟uniapp官方给出的地图组件也不支持啊,网上找吧,也没什么例子,算了,自己写吧。 …

图论 之 弗洛伊德算法求解全源最短路径

文章目录 题目1334.阈值距离内邻居最少的城市 Floyd算法适合用于求解多源的最短路径的问题,相比之下,Dijkstra算法适合用于求解单源的最短路径的问题,并且,当边的权值只有1的时候,我们还能使用BFS求解最短路径的问题 …

Uniapp 开发中遇到的坑与注意事项:全面指南

文章目录 1. 引言Uniapp 简介开发中的常见问题本文的目标与结构 2. 环境配置与项目初始化环境配置问题解决方案 项目初始化注意事项解决方案 常见错误与解决方案 3. 页面与组件开发页面生命周期注意事项示例代码 组件通信与复用注意事项示例代码 样式与布局问题注意事项示例代码…

tcpdump 用法示例

server.py 源码: import socket import sys# 这里创建了一个UDP套接字。socket.AF_INET指定了IPv4地址族,socket.SOCK_DGRAM指定了这个套接字是UDP协议的。 sock socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # 这里定义了服务器将要监听的地址和…

Docker下的Elastic search

一、安装 (一)Elastic search 1.创建配置文件 :我是在win系统中,创建文件【G:\dockermount\es\elasticsearch.yml】 添加【http.host: 0.0.0.0】 2. 拉取镜像:docker pull elasticsearch 3. 创建容器(注意我挂载的…

游戏引擎学习第118天

仓库:https://gitee.com/mrxiao_com/2d_game_3 优化工作概述 这次我们正在进行一些非常有趣的工作,主要是对游戏进行优化。这是首次进行优化,我们正在将一个常规的标量C代码例程转换为内建指令,以便利用AIX 64位处理器的SIMD指令集进行加速…