Seatunnel 是如何將數據行中的元數據信息提取并轉換為普通字段的?
描述
Metadata 轉換插件用于將數據行中的元數據信息提取并轉換為普通字段,方便后續處理和分析。

核心功能:
- 將元數據(如數據庫名、表名、行類型等)提取為可見字段
- 支持自定義輸出字段名稱
- 不改變原有數據字段,只是新增元數據字段
典型應用場景:
- CDC 數據同步時需要記錄數據來源(庫名、表名)
- 需要追蹤數據變更類型(INSERT、UPDATE、DELETE)
- 需要記錄數據的事件時間和延遲信息
- 多表合并時需要標識數據來源
支持的元數據字段
元數據Key | 輸出類型 | 說明 | 數據來源 |
Database | string | 數據所屬的數據庫名稱 | 所有連接器 |
Table | string | 數據所屬的表名稱 | 所有連接器 |
RowKind | string | 行的變更類型,值為:+I(插入)、-U(更新前)、+U(更新后)、-D(刪除) | 所有連接器 |
EventTime | long | 數據變更的事件時間戳(毫秒) | CDC 連接器 |
Delay | long | 數據采集延遲時間(毫秒),即數據抽取時間與數據庫變更時間的差值 | CDC 連接器 |
Partition | string | 數據所屬的分區信息,多個分區字段使用逗號分隔 | 支持分區的連接器 |
重要說明:
- 元數據字段區分大小寫:配置時必須嚴格按照上表中的 Key 名稱(如 Database、Table、RowKind 等)
- CDC 專有字段:EventTime 和 Delay 僅在使用 CDC 連接器時有效(TiDB-CDC 除外)
配置選項
參數名 | 類型 | 是否必填 | 默認值 | 說明 |
metadata_fields | map | 否 | 空映射 | 元數據字段與輸出字段的映射關系,格式為 |
(1) metadata_fields [map]
定義元數據字段到輸出字段的映射關系。
配置格式:
metadata_fields {
<元數據Key> = <輸出字段名>
<元數據Key> = <輸出字段名>
...
}配置示例:
metadata_fields {
Database = source_db # 將數據庫名映射到 source_db 字段
Table = source_table # 將表名映射到 source_table 字段
RowKind = op_type # 將行類型映射到 op_type 字段
EventTime = event_ts # 將事件時間映射到 event_ts 字段
Delay = sync_delay # 將延遲時間映射到 sync_delay 字段
Partition = partition_info # 將分區信息映射到 partition_info 字段
}注意事項:
- 左側必須是支持的元數據 Key(見上表),且嚴格區分大小寫
- 右側是自定義的輸出字段名,不能與原有字段重名
- 可以只選擇需要的元數據字段,不必全部配置
完整示例
(1) 示例 1:MySQL CDC 數據同步,提取所有元數據
從 MySQL 數據庫同步數據,并提取所有可用的元數據信息。
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
MySQL-CDC {
plugin_output = "mysql_cdc_source"
server-id = 5652
username = "root"
password = "your_password"
table-names = ["mydb.users"]
url = "jdbc:mysql://localhost:3306/mydb"
}
}
transform {
Metadata {
plugin_input = "mysql_cdc_source"
plugin_output = "metadata_added"
metadata_fields {
Database = source_database # 提取數據庫名
Table = source_table # 提取表名
RowKind = change_type # 提取變更類型
EventTime = event_timestamp # 提取事件時間
Delay = sync_delay_ms # 提取同步延遲
}
}
}
sink {
Console {
plugin_input = "metadata_added"
}
}輸入數據示例:
原始數據行(來自 mydb.users 表):
id=1, name="張三", age=25
RowKind: +I (INSERT)輸出數據示例:
轉換后的數據行:
id=1, name="張三", age=25, source_database="mydb", source_table="users",
change_type="+I", event_timestamp=1699000000000, sync_delay_ms=100(2) 示例 2:只提取部分元數據
只提取數據來源信息(庫名和表名),用于多表合并場景。
env {
parallelism = 1
job.mode = "STREAMING"
}
source {
MySQL-CDC {
plugin_output = "multi_table_source"
server-id = 5652
username = "root"
password = "your_password"
table-names = ["db1.orders", "db2.orders"]
url = "jdbc:mysql://localhost:3306"
}
}
transform {
Metadata {
plugin_input = "multi_table_source"
plugin_output = "with_source_info"
metadata_fields {
Database = db_name
Table = table_name
}
}
}
sink {
Jdbc {
plugin_input = "with_source_info"
url = "jdbc:mysql://localhost:3306/target_db"
table = "merged_orders"
# 目標表會包含 db_name 和 table_name 字段,用于標識數據來源
}
}



























