精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

SpringBoot整合Flink CDC,實時追蹤數據變動,無縫同步至Redis

開發 前端
具體來說,Flink CDC的應用場景包括但不限于實時數據倉庫更新、實時數據同步和遷移、實時數據處理等。它還可以確保數據一致性,并在數據發生變更時能夠準確地捕獲和處理。

環境:SpringBoot2.7.16 + Flink 1.19.0 + JDK21

1. 簡介

Flink CDC(Flink Change Data Capture)是基于數據庫的日志CDC技術,實現了全增量一體化讀取的數據集成框架。它搭配Flink計算框架,能夠高效實現海量數據的實時集成。Flink CDC的核心功能在于實時地監視數據庫或數據流中發生的數據變動,并將這些變動抽取出來,以便進一步的處理和分析。通過使用Flink CDC,用戶可以輕松地構建實時數據管道,對數據變動進行實時響應和處理,為實時分析、實時報表和實時決策等場景提供強大的支持。

具體來說,Flink CDC的應用場景包括但不限于實時數據倉庫更新、實時數據同步和遷移、實時數據處理等。它還可以確保數據一致性,并在數據發生變更時能夠準確地捕獲和處理。此外,Flink CDC支持與多種數據源進行集成,如MySQL、PostgreSQL、Oracle等,并提供了相應的連接器,方便數據的捕獲和處理。

接下來將詳細的介紹關于MySQL CDC的使用。MySQL CDC 連接器允許從 MySQL 數據庫讀取快照數據和增量數據。

支持的數據庫

Connector

Database

Driver

mysql-cdc

  • MySQL:5.6,5.7,8.0.x
  • RDS MYSQL: 5.6,5.7,8.0.x
  • PolarDB MySQL: 5.6,5.7,8.0.x
  • Aurora MySQL 5.6,5.7,8.0.x
  • MariaDB: 10.x
  • PolarDB X: 2.0.1

JDBC Driver 8.0.27

2. 實戰案例

2.1 MySQL開啟Binlog

在MySQL的配置文件中(如Linux的/etc/my.cnf或Windows的\my.ini),需要在[mysqld]部分設置相關參數以開啟binlog功能,如下:

[mysqld]
server-id=1
# 格式,行級格式
binlog-format=Row
# binlog 日志文件的前綴
log-bin=mysql-bin
# 指定哪些數據庫需要記錄二進制日志
binlog_do_db=testjpa

除了開啟binlog功能外,Flink CDC還需要其他配置和權限來確保能夠正常連接到MySQL并讀取數據。例如,需要授予Flink CDC連接MySQL的用戶必要的權限,包括SELECT、REPLICATION SLAVE、REPLICATION CLIENT、SHOW VIEW等。這些權限是Flink CDC讀取數據和元數據所必需的。

查看是否開啟了binlog功能

mysql> SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+

以上就對mysql相關的配置完成了。

2.2 依賴管理

<properties>
  <flink.version>1.19.0</flink.version>
</properties>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-base</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-sql-connector-mysql-cdc</artifactId>
  <version>3.0.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-runtime</artifactId>
  <version>${flink.version}</version>
</dependency>

2.3 代碼實現

@Component
public class MonitorMySQLCDC implements InitializingBean {


  // 該隊列專門用來臨時保存變化的數據(實際生產環境,你應該使用MQ相關的產品)
  public static final LinkedBlockingQueue<Map<String, Object>> queue = new LinkedBlockingQueue<>() ;
  
  private final StringRedisTemplate stringRedisTemplate ;
  // 保存到redis中key的前綴
  private final String PREFIX = "users:" ;
  // 數據發生變化后的sink處理
  private final CustomSink customSink ;
  public MonitorMySQLCDC(CustomSink customSink, StringRedisTemplate stringRedisTemplate) {
    this.customSink = customSink ;
    this.stringRedisTemplate = stringRedisTemplate ;
  }
  
  @Override
  public void afterPropertiesSet() throws Exception {
    // 啟動異步線程,實時處理隊列中的數據
    new Thread(() -> {
      while(true) {
        try {
          Map<String, Object> result = queue.take();
          this.doAction(result) ;
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }).start() ;
    Properties jdbcProperties = new Properties() ;
    jdbcProperties.setProperty("useSSL", "false") ;
    MySqlSource<String> source = MySqlSource.<String>builder()
        .hostname("127.0.0.1")
        .port(3306)
        // 可配置多個數據庫
        .databaseList("testjpa")
        // 可配置多個表
        .tableList("testjpa.users")
        .username("root")
        .password("123123")
        .jdbcProperties(jdbcProperties)
        // 包括schema的改變
        .includeSchemaChanges(true)
        // 反序列化設置
        // .deserializer(new StringDebeziumDeserializationSchema())
        .deserializer(new JsonDebeziumDeserializationSchema(true))
        // 啟動模式;關于啟動模式下面詳細介紹
        .startupOptions(StartupOptions.initial())
        .build() ;
    // 環境配置
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;
    // 設置 6s 的 checkpoint 間隔
    env.enableCheckpointing(6000) ;
    // 設置 source 節點的并行度為 4
    env.setParallelism(4) ;
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL")
        // 添加Sink
        .addSink(this.customSink) ;
    env.execute() ;
  }
  
  @SuppressWarnings("unchecked")
  private void doAction(Map<String, Object> result) throws Exception {
    Map<String, Object> payload = (Map<String, Object>) result.get("payload") ;
    String op = (String) payload.get("op") ;
    switch (op) {
      // 更新和插入操作
      case "u", "c" -> {
        Map<String, Object> after = (Map<String, Object>) payload.get("after") ;
        String id = after.get("id").toString();
        System.out.printf("操作:%s, ID: %s%n", op, id) ;
        stringRedisTemplate.opsForValue().set(PREFIX + id, new ObjectMapper().writeValueAsString(after)) ;
      }
      // 刪除操作
      case "d" -> {
        Map<String, Object> after = (Map<String, Object>) payload.get("before") ;
        String id = after.get("id").toString();
        stringRedisTemplate.delete(PREFIX + id) ;
      } 
    }
  }
  
}

啟動模式:

  • initial (默認):在第一次啟動時對受監視的數據庫表執行初始快照,并繼續讀取最新的 binlog。
  • earliest-offset:跳過快照階段,從可讀取的最早 binlog 位點開始讀取
  • latest-offset:首次啟動時,從不對受監視的數據庫表執行快照, 連接器僅從 binlog 的結尾處開始讀取,這意味著連接器只能讀取在連接器啟動之后的數據更改。
  • specific-offset:跳過快照階段,從指定的 binlog 位點開始讀取。位點可通過 binlog 文件名和位置指定,或者在 GTID 在集群上啟用時通過 GTID 集合指定。
  • timestamp:跳過快照階段,從指定的時間戳開始讀取 binlog 事件。

數據處理Sink

@Component
public class CustomSink extends RichSinkFunction<String> {


  private ObjectMapper mapper = new ObjectMapper();


  @Override
  public void invoke(String value, Context context) throws Exception {
    System.out.printf("數據發生變化: %s%n", value);
    TypeReference<Map<String, Object>> valueType = new TypeReference<Map<String, Object>>() {
    };
    Map<String, Object> result = mapper.readValue(value, valueType);
    Map<String, Object> payload = (Map<String, Object>) result.get("payload");
    String op = (String) payload.get("op") ;
    // 不對讀操作處理
    if (!"r".equals(op)) {
      MonitorMySQLCDC.queue.put(result);
    }
  }
}

以上就是實現通過FlinkCDC實時通過數據到Redis的所有代碼。

2.4 Web監控頁面

引入flink web依賴

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime-web</artifactId>
  <version>${flink.version}</version>
</dependency>

環境配置

Configuration config = new Configuration() ;
config.set(RestOptions.PORT, 9090) ;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config) ;

web監聽9090端口。

圖片圖片

通過web控制臺你可以管理查看到更多的信息。

責任編輯:武曉燕 來源: Spring全家桶實戰案例源碼
相關推薦

2022-07-20 23:15:11

Flink數據集CDC

2025-07-10 08:46:21

ConnectSpringBoot數據

2024-10-18 11:39:55

MySQL數據檢索

2023-05-03 08:58:46

數據庫開源

2021-06-04 07:24:14

Flink CDC數據

2025-04-01 08:38:41

2013-05-13 13:49:43

大數據

2021-08-17 06:48:43

SpringbootKafkaStream

2021-07-07 23:25:18

RedisFlinkSQL

2025-04-29 08:36:28

SpringCanal數據庫

2022-01-05 18:18:01

Flink 數倉連接器

2023-09-08 10:13:30

開發技術

2024-02-01 12:32:35

MySQL數據鎖數據庫

2020-01-10 15:42:13

SpringBootRedis數據庫

2015-06-09 22:25:06

SAP大道至簡

2023-05-31 08:56:24

2025-04-25 08:34:52

2022-05-23 08:23:24

鏈路追蹤SleuthSpring

2020-06-29 07:43:12

緩存RedisSpringBoot

2025-09-26 08:46:30

點贊
收藏

51CTO技術棧公眾號

日韩在线视频不卡| 波多野结衣一二三区| 日本精品一区二区三区在线播放| 美女视频一区二区三区| 日韩在线播放一区| 26uuu国产| 日本乱码一区二区三区不卡| 久久精品在线免费观看| 久久久久久亚洲精品中文字幕| 国产一区二区在线观看免费视频| 牛牛精品在线| 久久久久久亚洲综合影院红桃| 国产精品video| www.色小姐com| 日韩深夜影院| 欧美日韩dvd在线观看| www.日本在线播放| 成人黄视频在线观看| 久久久久久久久岛国免费| 91精品国产高清久久久久久91裸体 | 日韩欧美视频网站| 九色porny在线| 久久久国产精品午夜一区ai换脸| 99re6热在线精品视频播放速度| 青青草视频在线观看免费| 狠狠色综合网| 北条麻妃在线一区二区| 亚洲av综合一区二区| 嗯用力啊快一点好舒服小柔久久| 欧美疯狂做受xxxx富婆| 亚洲精品一二三四五区| 亚洲小少妇裸体bbw| 亚洲午夜视频在线观看| 最新视频 - x88av| 97在线观看免费观看高清 | 九九99玖玖| www.国产免费| 国产一区二区网址| 成人激情电影一区二区| 一二三四区视频| 免费观看在线综合| 国产精品美女免费视频| 成人免费视频国产免费| 午夜在线一区| 日韩av日韩在线观看| 久久99精品波多结衣一区| 亚洲高清二区| 高清欧美性猛交| 久久精品国产亚洲av麻豆色欲| 欧美 日韩 国产精品免费观看| www.国产精品一二区| 青青青视频在线免费观看| 成人网18免费网站| 色av中文字幕一区| 肉色超薄丝袜脚交69xx图片 | 伊人免费在线观看高清版| 男女av一区三区二区色多| 国产不卡视频在线| 久久影视中文字幕| 日本欧美一区二区在线观看| 国产欧美日韩免费看aⅴ视频| 亚洲图片小说视频| 精品一区二区在线看| 91视频九色网站| 国产黄色av网站| 成人av午夜影院| 含羞草久久爱69一区| 色综合888| 国产精品伦一区二区三级视频| 亚洲一区二区三区精品动漫| 大地资源网3页在线观看| 亚洲一区二区欧美日韩| 精品无码国模私拍视频| 欧美性suv| 欧美日韩精品电影| 免费黄视频在线观看| 欧美自拍视频| 日韩在线视频一区| 国产一级中文字幕| 日韩av午夜在线观看| 成人免费在线网址| 午夜小视频免费| 91麻豆国产自产在线观看| 乱一区二区三区在线播放| 国产大学生校花援交在线播放| 成人欧美一区二区三区白人 | 91精品国产闺蜜国产在线闺蜜| 亚洲视频久久| 国产精品久久久久久久午夜| 99久久久久久久| 99精品视频中文字幕| 日本一区免费在线观看| 日本天码aⅴ片在线电影网站| 精品久久久久久久久久久久 | caoporm超碰国产精品| 日韩精品电影网站| 久久大胆人体| 欧美视频一区二区三区| 久久精品无码专区| 日本一区二区在线看| 欧美精品www| 中文字幕 国产| www.成人在线| 异国色恋浪漫潭| 97se综合| 精品福利一二区| 多男操一女视频| 久久久久久穴| 国产欧美丝袜| 欧美高清视频| 欧美性极品xxxx娇小| 制服.丝袜.亚洲.中文.综合懂| 精品国产乱码久久久久久果冻传媒 | 日韩精品每日更新| 国产亚洲情侣一区二区无| 视频免费一区| 狠狠色狠狠色综合日日小说| www.偷拍.com| 日本一区二区高清不卡| 欧美最顶级的aⅴ艳星| 午夜精品久久久久久久91蜜桃| 欧美经典一区二区| 你懂的av在线| 大香伊人久久精品一区二区| 久久激情视频免费观看| 国产字幕在线观看| 久久婷婷成人综合色| 一二三四视频社区在线| 51社区在线成人免费视频| 日韩中文字幕不卡视频| 欧美一区免费看| 久久色.com| 青青艹视频在线| 岛国精品一区| 久久久久久国产精品美女| 国产成人麻豆精品午夜在线 | 91视频在线观看免费| 国产乱子伦精品无码专区| 欧美9999| 欧美激情亚洲另类| 免费国产羞羞网站视频| 亚洲午夜在线视频| 中文字幕人妻一区| 亚洲一级电影| 国产伦精品一区二区三区四区视频 | 国产精品久久久久久久久久久久午夜片 | 欧洲一区在线| 欧美美女操人视频| 朝桐光av在线一区二区三区| 亚洲永久免费视频| av免费观看不卡| 一本久道久久综合狠狠爱| 久久综合久久久| 345成人影院| 亚洲欧洲中文天堂| 中文字幕乱码视频| 亚洲免费av观看| 成人一区二区三区仙踪林| 亚洲调教视频在线观看| 精品国产乱码久久久久久郑州公司 | 精品一区不卡| 成人免费直播live| 懂色av一区| 精品视频久久久久久久| 日本高清不卡码| 欧美激情在线一区二区| 国内国产精品天干天干| 欧美亚洲不卡| 久久综合狠狠综合久久综青草| 久久野战av| 欧美巨大黑人极品精男| 色婷婷综合视频| 在线视频你懂得一区二区三区| 性生交大片免费全黄| 成人一二三区视频| 麻豆传传媒久久久爱| 五月天综合网站| 国产一区二区三区高清视频| 日韩视频网站在线观看| 欧美成人黑人xx视频免费观看| 天天操天天操天天操| 欧美性感一区二区三区| 欧美日韩偷拍视频| 国产亚洲午夜高清国产拍精品| 一二三级黄色片| 一区二区福利| 视频一区二区视频| 日韩福利视频一区| 91亚洲人电影| 欧美大胆成人| 欧美精品激情在线| 午夜看片在线免费| 亚洲精品天天看| www.日韩在线观看| 欧美写真视频网站| xxxxxx国产| 亚洲人成网站色在线观看| 日韩av一二区| 粉嫩高潮美女一区二区三区 | 欧美成人在线免费视频| 青春草在线观看 | 国产精品一二三区视频| 精品久久久久久综合日本欧美| 男操女视频网站| 午夜久久久久久| 我要看黄色一级片| 亚洲国产精品黑人久久久| 男人网站在线观看| 国内外成人在线| 亚洲福利精品视频| 亚洲综合二区| 国产真实老熟女无套内射| 婷婷六月综合| 亚洲精品影院| 国产欧美亚洲精品a| 黄色99视频| 中文字幕视频精品一区二区三区| 国产精品网红直播| 不卡av影片| 992tv成人免费影院| 欧美1234区| 久精品免费视频| 乱人伦中文视频在线| 在线精品高清中文字幕| 内衣办公室在线| 精品视频中文字幕| 天天射,天天干| 欧美精品一区二区三区四区 | 裸体女人亚洲精品一区| 99riav在线| 中文字幕亚洲一区二区三区| 国产乱子伦三级在线播放| 亚洲欧美一区二区精品久久久| 全部免费毛片在线播放一个| 精品成人一区二区三区四区| 国产三区在线播放| 91精品国产入口在线| 97久久人国产精品婷婷| 欧美乱妇一区二区三区不卡视频| 青青国产在线视频| 在线一区二区三区四区五区| 99re这里只有精品在线| 在线精品观看国产| 亚洲中文字幕无码爆乳av| 在线视频你懂得一区| 中文字幕91爱爱| 欧美性高清videossexo| 一级片免费观看视频| 欧美日韩国产一级| 国产美女无遮挡永久免费| 欧美一级专区免费大片| 国产成人三级在线观看视频| 亚洲国产精品久久精品怡红院 | 色琪琪综合男人的天堂aⅴ视频| 9i精品一二三区| 另类美女黄大片| 美女网站视频在线| 欧美夜福利tv在线| 朝桐光一区二区| 91欧美精品午夜性色福利在线| av在线亚洲一区| 鬼打鬼之黄金道士1992林正英| 国产精东传媒成人av电影| 就去色蜜桃综合| 欧美oldwomenvideos| 桥本有菜av在线| 亚洲精品视频啊美女在线直播| 日韩久久一级片| 蜜臀久久久久久久| 蜜桃色一区二区三区| 久久久久亚洲综合| 老熟妇高潮一区二区三区| 亚洲综合激情小说| 好吊色在线视频| 欧美一级黄色大片| 婷婷在线观看视频| 三级精品视频久久久久| wwww在线观看免费视频| 国产精品h片在线播放| 国产高清亚洲| 免费成人av网站| 亚洲综合自拍| 欧美污视频网站| 国产美女在线观看一区| 亚洲狠狠婷婷综合久久久久图片| 中文字幕一区二区三区精华液 | 久草网站在线观看| 色噜噜久久综合| 精品欧美一区二区精品少妇| 亚洲精品视频网上网址在线观看| 久久精品视频观看| 日本久久中文字幕| h视频久久久| 亚洲一卡二卡三卡| 国产精品久久久久久久久久妞妞| 色婷婷.com| 久久久久一区二区三区四区| 免费在线观看h片| 色综合久久久久综合体| 成人激情四射网| 色偷偷偷亚洲综合网另类 | 色综合激情久久| 国产视频一区二区三区四区五区| 亚洲人成自拍网站| 国产极品人妖在线观看| 国产综合色香蕉精品| 色天下一区二区三区| 日本一本草久p| 日本午夜一本久久久综合| 天堂www中文在线资源| 综合久久国产九一剧情麻豆| 黄色污污网站在线观看| 亚洲国产成人精品女人久久久 | 国产精品免费久久久| 欧美高清视频看片在线观看| 日韩精品久久一区二区| 久久se精品一区精品二区| 美女被到爽高潮视频| 欧美日韩日本国产| 秋霞网一区二区| 国内精品久久久久久久| 中文字幕一区图| 久久久久久久9| 高清国产一区二区| 人妻久久一区二区| 欧美一区在线视频| a天堂中文在线官网在线| 成人激情视频在线播放| 91日韩视频| 91极品视频在线观看| 久久精品一二三| 成人一级免费视频| 在线电影中文日韩| av免费在线一区| 视频一区二区在线观看| 日韩电影在线一区二区三区| 精品无码国产污污污免费网站| 色网站国产精品| 国产女主播在线写真| 国产精品青草久久久久福利99| 精品精品99| 啊啊啊国产视频| 中文av字幕一区| 国产精品视频久久久久久| www.亚洲男人天堂| 精品一区视频| 欧美一级爱爱视频| 成人综合在线视频| 日韩字幕在线观看| 亚洲精品一区av在线播放| 日韩pacopacomama| 亚洲一区二区免费视频软件合集| 精品一区二区三区免费| 五月婷婷一区二区| 亚洲黄色www| 韩国三级一区| 香蕉视频在线网址| 国产成人亚洲综合a∨婷婷 | 秋霞在线午夜| 国产欧美在线一区二区| 国产日韩欧美三级| 亚洲一区二区自偷自拍| 91麻豆精品国产综合久久久久久| 中文字幕在线播放网址| 99中文字幕| 午夜一级久久| 成年人网站在线观看视频| 欧美一区二区三区在| aaa在线播放视频| 日韩国产高清一区| 国产福利一区在线| 4438国产精品一区二区| www国产精品视频| 国产主播性色av福利精品一区| 国产又大又硬又粗| 自拍偷在线精品自拍偷无码专区| 隣の若妻さん波多野结衣| 国产精品扒开腿做爽爽爽的视频| 91精品国产自产在线观看永久∴| 中文字幕一区二区三区人妻在线视频 | 天天干天天操天天爱| 美女福利精品视频| 亚洲日本三级| 超碰在线资源站| 精品久久久精品| 黄av在线播放| 精品综合在线| 黑人巨大精品欧美黑白配亚洲| 偷偷操不一样的久久| 日韩中文字幕在线视频| 噜噜噜狠狠夜夜躁精品仙踪林| 一级在线免费视频| 亚洲成人精品影院| 免费日本一区二区三区视频| 国产在线播放一区二区| 国内外成人在线| 欧美一级黄视频| 992tv在线成人免费观看| 久久久久久久久丰满| 成都免费高清电影|