New Feature: Flink-CDC, a Data Synchronization Solution That Can Replace Canal


Original article, 2021-11-08 09:52, by 尚硅谷教育

1. Introduction to CDC

CDC is short for Change Data Capture. The core idea is to monitor and capture changes to a database (inserts, updates, and deletes of rows or tables), record those changes completely and in the order they occur, and write them to message middleware so that other services can subscribe to and consume them.
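To make this concrete: each captured change is a structured record describing one row change. The snippet below is an illustrative, Debezium-style record for an UPDATE; the table, columns, and values are hypothetical, and real records carry additional metadata fields.

{
  "before": { "id": 1, "name": "zhangsan", "phone_num": "13888888888" },
  "after":  { "id": 1, "name": "zhangsan", "phone_num": "13999999999" },
  "source": { "db": "gmall-flink", "table": "z_user_info" },
  "op": "u",
  "ts_ms": 1636336320000
}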

CDC implementations mainly fall into two categories: query-based and binlog-based. The key differences between them are:

                            Query-based CDC             Binlog-based CDC
Open-source products        Sqoop, Kafka JDBC Source    Canal, Maxwell, Debezium
Execution mode              Batch                       Streaming
Captures all data changes   No                          Yes
Latency                     High                        Low
Adds load to the database   Yes                         No

The Flink community has developed the flink-cdc-connectors component, a source connector that can read full snapshot data and incremental change data directly from databases such as MySQL and PostgreSQL. It is open source and available at: https://github.com/ververica/flink-cdc-connectors


2. Hands-On Example: The Flink DataStream Approach

  1. Add the following dependencies to pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>


    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>


    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>


    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>


    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>


    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
    </dependency>
 
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
  2. Write the code
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class FlinkCDC {

    public static void main(String[] args) throws Exception {

        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Flink-CDC stores the binlog reading position as state in checkpoints. To resume from where it
        //   left off, the job must be restarted from a Checkpoint or Savepoint.
        //2.1 Enable checkpointing, taking a checkpoint every 5 seconds
        env.enableCheckpointing(5000L);

        //2.2 Set the consistency semantics of the checkpoints
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        //2.3 Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //2.4 Set the restart strategy used when recovering from checkpoints
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));

        //2.5 Set the state backend
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));

        //2.6 Set the user name used to access HDFS
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        //3. Create the Flink-MySQL-CDC source
        Properties properties = new Properties(); //optional Debezium properties, left empty here

        //initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continues to read the latest binlog.
        //latest-offset: Never performs a snapshot on the monitored database tables upon first startup; it just reads from the end of the binlog, which means it only sees the changes made since the connector was started.
        //timestamp: Never performs a snapshot on the monitored database tables upon first startup; it directly reads the binlog from the specified timestamp. The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.
        //specific-offset: Never performs a snapshot on the monitored database tables upon first startup; it directly reads the binlog from the specified offset.
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall-flink")
                .tableList("gmall-flink.z_user_info") //Optional. If omitted, all tables of the databases listed above are read. Note: tables must be specified as "db.table"
                .debeziumProperties(properties)
                .startupOptions(StartupOptions.initial())
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        //4. Read data from MySQL with the CDC source
        DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);

        //5. Print the data
        mysqlDS.print();

        //6. Execute the job
        env.execute();
    }
}
  3. Test the example

1) Package the job and upload the jar to the Linux server


2) Enable the MySQL binlog and restart MySQL; a sample configuration is sketched below
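A minimal sketch of the binlog settings, assuming MySQL reads its configuration from /etc/my.cnf and that only the example database needs to be captured (the server-id and file paths are deployment-specific):

[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=ROW
binlog-do-db=gmall-flink

After saving the file, restart the MySQL service so that the ROW-format binlog takes effect.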

3) Start the Flink cluster

[atguigu@hadoop102 flink-standalone]$ bin/start-cluster.sh

4) Start the HDFS cluster

[atguigu@hadoop102 flink-standalone]$ start-dfs.sh

5) Start the job

[atguigu@hadoop102 flink-standalone]$ bin/flink run -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar

6) Insert, update, or delete rows in MySQL's gmall-flink.z_user_info table

7) Create a Savepoint for the running Flink job

[atguigu@hadoop102 flink-standalone]$ bin/flink savepoint JobId hdfs://hadoop102:8020/flink/save
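The JobId placeholder above is the ID of the running job. One way to look it up in this standalone setup is the Flink CLI's list command:

[atguigu@hadoop102 flink-standalone]$ bin/flink list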

8) Stop the job, then restart it from the Savepoint

[atguigu@hadoop102 flink-standalone]$ bin/flink run -s hdfs://hadoop102:8020/flink/save/... -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar

3. Hands-On Example: The Flink SQL Approach

  1. Add the following dependency to pom.xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
  2. Code implementation
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSQL_CDC {

    public static void main(String[] args) throws Exception {

        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2. Create the Flink-MySQL-CDC source as a table
        tableEnv.executeSql("CREATE TABLE user_info (" +
                " id INT," +
                " name STRING," +
                " phone_num STRING" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = 'hadoop102'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = '000000'," +
                " 'database-name' = 'gmall-flink'," +
                " 'table-name' = 'z_user_info'" +
                ")");

        //3. Query the table and print the changelog
        tableEnv.executeSql("select * from user_info").print();

        env.execute();
    }
}

4. Custom Deserializer

The default StringDebeziumDeserializationSchema simply emits the SourceRecord's toString() output, which is hard to parse downstream. A custom DebeziumDeserializationSchema can instead convert each change into a structured JSON string. Code implementation:

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Properties;

public class Flink_CDCWithCustomerSchema {

    public static void main(String[] args) throws Exception {

        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Create the Flink-MySQL-CDC source
        Properties properties = new Properties();

        //initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continues to read the latest binlog.
        //latest-offset: Never performs a snapshot on the monitored database tables upon first startup; it just reads from the end of the binlog, which means it only sees the changes made since the connector was started.
        //timestamp: Never performs a snapshot on the monitored database tables upon first startup; it directly reads the binlog from the specified timestamp. The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.
        //specific-offset: Never performs a snapshot on the monitored database tables upon first startup; it directly reads the binlog from the specified offset.
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall-flink")
                .tableList("gmall-flink.z_user_info") //Optional. If omitted, all tables of the databases listed above are read. Note: tables must be specified as "db.table"
                .debeziumProperties(properties)
                .startupOptions(StartupOptions.initial())
                .deserializer(new DebeziumDeserializationSchema<String>() { //Custom deserializer
                    @Override
                    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

                        //The topic carries the database and table name, e.g. mysql_binlog_source.gmall-flink.z_user_info
                        String topic = sourceRecord.topic();
                        String[] arr = topic.split("\\.");
                        String db = arr[1];
                        String tableName = arr[2];

                        //Get the operation type: READ DELETE UPDATE CREATE
                        Envelope.Operation operation = Envelope.operationFor(sourceRecord);

                        //Get the value and cast it to a Struct
                        Struct value = (Struct) sourceRecord.value();

                        //Get the data after the change (null for DELETE events)
                        Struct after = value.getStruct("after");

                        //Build a JSON object holding the row data
                        JSONObject data = new JSONObject();
                        if (after != null) {
                            for (Field field : after.schema().fields()) {
                                Object o = after.get(field);
                                data.put(field.name(), o);
                            }
                        }

                        //Build the JSON object that wraps the final result
                        JSONObject result = new JSONObject();
                        result.put("operation", operation.toString().toLowerCase());
                        result.put("data", data);
                        result.put("database", db);
                        result.put("table", tableName);

                        //Emit the record downstream
                        collector.collect(result.toJSONString());
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return TypeInformation.of(String.class);
                    }
                })
                .build();

        //3. Read data from MySQL with the CDC source
        DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);

        //4. Print the data
        mysqlDS.print();

        //5. Execute the job
        env.execute();
    }
}
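For reference, given the z_user_info table used in these examples (assuming columns id, name, and phone_num, as in the Flink SQL section), an INSERT captured by this deserializer would be emitted as a JSON string shaped roughly as follows; the values are hypothetical and the key order may differ:

{"operation":"create","data":{"id":1,"name":"zhangsan","phone_num":"13888888888"},"database":"gmall-flink","table":"z_user_info"}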
