Flink + YARN + Gitlab 自動提交代碼全流程詳解
在大數據實時計算領域,Apache Flink 憑借其流批一體、低延遲、高吞吐的特性,成為企業級實時計算的主流選擇。FlinkSQL 作為 Flink 的關系型 API,降低了實時開發的門檻,讓開發者通過 SQL 即可完成復雜流處理邏輯。而 YARN 作為 Hadoop 生態的資源調度平臺,為 Flink 作業提供了穩定的資源管理與隔離能力。Gitlab 則作為代碼托管與 CI/CD 平臺,實現了代碼版本控制與自動化流程的串聯。
本文將詳細介紹 Flink + FlinkSQL + YARN + Gitlab 的自動提交代碼全流程,涵蓋環境準備、代碼管理、作業開發、CI/CD 流程設計、自動提交與監控等核心環節,幫助企業構建實時計算的自動化開發與部署體系。

一、環境準備
1. 組件版本選擇
為確保各組件兼容性,推薦以下版本組合(基于企業級穩定實踐):
組件 | 版本 | 說明 |
Hadoop | 3.3.1 | 包含 YARN 資源調度器 |
Flink | 1.16.0 | 支持 FlinkSQL 與 YARN 集成 |
Gitlab | 14.9.0 | 代碼托管與 CI/CD |
Java | 1.8 | Flink 運行基礎環境 |
Maven | 3.8.6 | 項目構建工具 |
2. 環境安裝與配置
(1) Hadoop & YARN 集群搭建
假設已部署 Hadoop 集群(需包含 HDFS 和 YARN),核心配置如下:
? core-site.xml(HDFS 配置):
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://namenode:8020</value>
</property>
</configuration>? yarn-site.xml(YARN 配置):
<configuration>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>resourcemanager</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>啟動 YARN 后,可通過 http://<resourcemanager>:8088 訪問 YARN Web UI,確認集群狀態正常。
(2) Flink on YARN 配置
下載 Flink 1.16.0 二進制包并解壓,修改 conf/flink-conf.yaml:
# Flink on YARN 核心配置
jobmanager.rpc.address: localhost
rest.port: 8081
# YARN 隊列配置(需與 YARN 隊列名稱一致)
yarn.application.queue: flink_queue
# 狀態后端(推薦使用 RocksDB)
state.backend: rocksdb
# Checkpoint 存儲(HDFS 路徑)
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints將 Hadoop 配置文件(core-site.xml、hdfs-site.xml、yarn-site.xml)軟鏈至 Flink 的 conf 目錄:
ln -s $HADOOP_HOME/etc/hadoop/core-site.xml $FLINK_HOME/conf/
ln -s $HADOOP_HOME/etc/hadoop/hdfs-site.xml $FLINK_HOME/conf/
ln -s $HADOOP_HOME/etc/hadoop/yarn-site.xml $FLINK_HOME/conf/(3) Gitlab 部署與基礎配置
? 部署 Gitlab:可通過 Docker 快速部署(推薦使用 Gitlab 官方鏡像):
docker run -d --name gitlab \
-p 8080:80 -p 2222:22 \
-v /srv/gitlab/config:/etc/gitlab \
-v /srv/gitlab/logs:/var/log/gitlab \
-v /srv/gitlab/data:/var/opt/gitlab \
gitlab/gitlab-ce:14.9.0-ce.0訪問 http://<gitlab-ip>:8080,初始化管理員密碼后創建項目(如 flink-realtime)。
? 配置 SSH 密鑰:本地生成 SSH 密鑰(ssh-keygen -t rsa),將公鑰(~/.ssh/id_rsa.pub)添加到 Gitlab 用戶設置中,確保代碼可免密推送。
二、Gitlab 代碼管理規范
1. 項目結構設計
基于 Maven 構建Flink作業,推薦項目結構如下:
flink-realtime/
├── src/
│ ├── main/
│ │ ├── java/ # Java/Scala 代碼(如 UDF、自定義 Source/Sink)
│ │ ├── resources/ # 配置文件與 SQL 腳本
│ │ │ ├── sql/ # FlinkSQL 腳本(如 user_behavior.sql)
│ │ │ ├── application.properties # 作業配置(并行度、Kafka 地址等)
│ │ │ └── log4j2.xml # 日志配置
│ │ └── scala/ # Scala 代碼(可選)
│ └── test/ # 單元測試
├── .gitlab-ci.yml # Gitlab CI/CD 配置文件
├── pom.xml # Maven 依賴配置
└── README.md # 項目說明文檔2. 分支管理策略
采用 Git Flow 分支模型,核心分支如下:
分支類型 | 名稱 | 用途 | 合并目標 |
主分支 | master | 生產環境代碼,僅允許 CI/CD 自動更新 | 無 |
開發分支 | develop | 開發環境集成分支 | master |
功能分支 | feature/xxx | 新功能開發(如 feature/user_behavior) | develop |
修復分支 | hotfix/xxx | 生產問題修復 | master/develop |
3. 代碼提交規范
為便于 CI/CD 流程追蹤,提交信息需遵循以下格式:
<type>(<scope>): <description>
# 示例
feat(sql): 新增用戶行為實時統計SQL
fix(config): 修復Kafka消費組配置錯誤
docs(ci): 更新Gitlab CI部署文檔- type:類型(feat 新功能、fix 修復、docs 文檔、style 格式、refactor 重構等)
- scope:影響范圍(如 sql、config、ci)
- description:簡潔描述(不超過50字符)
三、Flink 作業開發(以 FlinkSQL 為核心)
1. 依賴配置(pom.xml)
核心依賴包括 Flink 核心、FlinkSQL、連接器(如 Kafka、MySQL)等:
<dependencies>
<!-- Flink 核心 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.16.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.16.0</version>
<scope>provided</scope>
</dependency>
<!-- FlinkSQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.16.0</version>
<scope>provided</scope>
</dependency>
<!-- Kafka 連接器 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.16.0</version>
</dependency>
<!-- MySQL 連接器(用于結果寫入) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.16.0</version>
</dependency>
</dependencies>2. FlinkSQL 腳本開發
以“用戶行為實時統計”為例,開發 FlinkSQL 腳本(resources/sql/user_behavior.sql):
-- 1. 創建 Kafka 數據源表(用戶行為日志)
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
behavior STRING, -- 行為類型:pv(瀏覽)、buy(購買)、cart(加購)、fav(收藏)
ts TIMESTAMP(3)
) WITH (
'connector'='kafka',
'topic'='user_behavior',
'properties.bootstrap.servers'='kafka1:9092,kafka2:9092',
'properties.group.id'='flink_consumer_group',
'format'='json',
'scan.startup.mode'='latest-offset'
);
-- 2. 創建 MySQL 結果表(每小時行為統計)
CREATE TABLE behavior_hourly_stats (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
behavior STRING,
count BIGINT,
PRIMARY KEY (window_start, behavior) NOT ENFORCED
) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://mysql:3306/realtime_stats',
'table-name'='behavior_hourly_stats',
'username'='flink',
'password'='flink123'
);
-- 3. 執行統計邏輯(每小時窗口計數)
INSERT INTO behavior_hourly_stats
SELECT
TUMBLE_START(ts, INTERVAL'1'HOUR) AS window_start,
TUMBLE_END(ts, INTERVAL'1'HOUR) AS window_end,
behavior,
COUNT(*) AS count
FROM user_behavior
GROUPBY
TUMBLE(ts, INTERVAL'1'HOUR),
behavior;3. 作業主程序開發
通過 Java 代碼加載 SQL 腳本并執行,實現作業提交(src/main/java/com/example/FlinkSQLJob.java):
package com.example;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import java.io.File;
publicclassFlinkSQLJob {
publicstaticvoidmain(String[] args)throws Exception {
// 1. 創建流執行環境(使用 YARN 模式)
finalStreamExecutionEnvironmentenv= StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // 設置全局并行度
// 2. 創建 TableEnvironment(Blink Planner)
EnvironmentSettingssettings= EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
TableEnvironmenttableEnv= TableEnvironment.create(settings);
// 3. 加載 SQL 腳本文件
StringsqlPath=newFile("resources/sql/user_behavior.sql").getAbsolutePath();
StringsqlScript=newString(java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(sqlPath)));
// 4. 執行 SQL 腳本(按語句分割并逐條執行)
String[] sqlStatements = sqlScript.split(";(?=(?:[^']*'[^']*')*[^']*$)");
for (String sql : sqlStatements) {
if (!sql.trim().isEmpty()) {
TableResultresult= tableEnv.executeSql(sql.trim());
System.out.println("SQL executed: " + sql.trim());
}
}
// 5. 提交作業(FlinkSQL 的 INSERT INTO 會自動觸發執行)
env.execute("FlinkSQL User Behavior Hourly Stats");
}
}4. 作業配置(application.properties)
將動態配置(如 Kafka 地址、并行度)提取到配置文件中,避免硬編碼:
# 作業名稱
job.name=flink_sql_user_behavior
# 并行度
job.parallelism=4
# Kafka 配置
kafka.bootstrap.servers=kafka1:9092,kafka2:9092
kafka.topic=user_behavior
kafka.group.id=flink_consumer_group
# MySQL 配置
mysql.url=jdbc:mysql://mysql:3306/realtime_stats
mysql.username=flink
mysql.password=flink123
# Checkpoint 配置
checkpoint.interval=60000 # 1分鐘一次 Checkpoint
checkpoint.timeout=300000 # Checkpoint 超時時間5分鐘四、YARN 資源調度與作業提交
1. Flink on YARN 模式選擇
Flink on YARN 支持三種模式,需根據場景選擇:
模式 | 特點 | 適用場景 |
Session Mode | 預啟動 YARN Application,共享 JobManager | 短作業、低資源消耗場景 |
Per-Job Mode | 每個作業獨立啟動 YARN Application | 作業間資源隔離要求高 |
Application Mode | 推薦:作業主程序在 YARN 中執行,客戶端僅提交 | 生產環境主流模式 |
本文以 Application Mode 為例,該模式下作業主邏輯在 YARN 的 Application Master 中運行,客戶端只需提交 JAR 包,避免客戶端資源占用。
2. 手動提交作業到 YARN
通過 flink run -yarn 命令提交作業,核心參數如下:
flink run -t yarn-application \
-Dyarn.application.name=flink_sql_user_behavior \
-Dyarn.application.queue=flink_queue \
-Dparallelism.default=4 \
-Djobmanager.memory.process.size=1600m \
-Dtaskmanager.memory.process.size=1728m \
-Dtaskmanager.numberOfTaskSlots=4 \
-c com.example.FlinkSQLJob \
/path/to/flink-realtime-1.0-SNAPSHOT.jar參數說明:
- -t yarn-application:指定 Application Mode
- -Dyarn.application.name:YARN 應用名稱
- -Dyarn.application.queue:YARN 隊列(需與 YARN 配置一致)
- -Dparallelism.default:默認并行度
- -c:主程序全限定類名
提交后,可通過 YARN Web UI(http://<resourcemanager>:8088)查看作業狀態,點擊“Tracking UI”進入 Flink Web UI 監控作業指標。
五、Gitlab CI/CD 自動提交流程設計
1. Gitlab CI/CD 原理
Gitlab CI/CD 通過 .gitlab-ci.yml 定義流水線(Pipeline),流水線包含多個階段(Stage),每個階段包含多個作業(Job)。當代碼提交/合并到指定分支時,Gitlab Runner 自動執行流水線,完成構建、測試、部署等流程。
2. Gitlab Runner 配置
(1) 安裝 Gitlab Runner
在可訪問 YARN 集群的節點上安裝 Gitlab Runner(以 Linux 為例):
# 添加 Gitlab Runner 官方倉庫
curl -L https://packages.gitlab.com/install/repositories/runner/gitlab-runner/script.rpm.sh | sudo bash
# 安裝 Runner
sudo yum install -y gitlab-runner
# 注冊 Runner(需從 Gitlab 項目獲取注冊 URL 和 Token)
sudo gitlab-runner register注冊時需配置:
- Gitlab instance URL:Gitlab 地址(如 http://<gitlab-ip>:8080)
- Registration token:從 Gitlab 項目 Settings -> CI/CD -> Runners 獲取
- Executor type:選擇 shell(需確保 Runner 節點已安裝 Java、Maven、Flink、Hadoop 客戶端)
(2) Runner 權限配置
確保 Runner 用戶(默認 gitlab-runner)有權限訪問 HDFS 和 YARN:
# 將 gitlab-runner 用戶加入 hadoop 用戶組
sudo usermod -aG hadoop gitlab-runner
# 配置 HDFS 代理用戶(在 core-site.xml 中添加)
<property>
<name>hadoop.proxyuser.gitlab-runner.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.gitlab-runner.hosts</name>
<value>*</value>
</property>重啟 HDFS 和 YARN 使代理用戶配置生效。
3. .gitlab-ci.yml 流水線配置
在項目根目錄創建 .gitlab-ci.yml,定義以下階段:
階段 | 作用 | 作業示例 |
build | Maven 打包生成 JAR | build_job |
test | 單元測試、SQL 語法校驗 | test_job |
deploy | 提交作業到 YARN(生產/開發) | deploy_prod_job |
完整配置示例:
# 定義流水線階段
stages:
-build
-test
-deploy
# 全局變量(避免硬編碼)
variables:
MAVEN_OPTS:"-Dmaven.repo.local=$CI_PROJECT_DIR/.m2/repository"
FLINK_HOME:"/opt/flink-1.16.0"
HADOOP_HOME:"/opt/hadoop-3.3.1"
YARN_QUEUE:"flink_queue"
# 緩存 Maven 依賴(加速構建)
cache:
paths:
-.m2/repository
# 1. 構建階段:Maven 打包
build_job:
stage:build
script:
-echo"Building Flink job..."
-mvncleanpackage-DskipTests
artifacts:
paths:
-target/*.jar# 保存 JAR 包供后續階段使用
expire_in:1hour# 1小時后過期
# 2. 測試階段:單元測試 + SQL 語法校驗
test_job:
stage:test
script:
-echo"Running unit tests..."
-mvntest
-echo"Validating FlinkSQL syntax..."
# 使用 Flink SQL Parser 校驗語法(需提前編寫校驗腳本)
-bashscripts/validate_sql.shresources/sql/user_behavior.sql
dependencies:
-build_job# 依賴構建階段的 JAR 包
# 3. 部署階段:提交到 YARN(僅 master 分支觸發)
deploy_prod_job:
stage:deploy
script:
-echo"Deploying to YARN production queue..."
# 從構建產物中獲取 JAR 包名稱
-JAR_FILE=$(findtarget-name"*.jar"|head-n1)
-echo"JAR file: $JAR_FILE"
# 提交作業到 YARN(Application Mode)
-$FLINK_HOME/bin/flinkrun-tyarn-application\
-Dyarn.application.name=flink_sql_user_behavior\
-Dyarn.application.queue=$YARN_QUEUE\
-Dparallelism.default=4\
-Djobmanager.memory.process.size=1600m\
-Dtaskmanager.memory.process.size=1728m\
-Dtaskmanager.numberOfTaskSlots=4\
-ccom.example.FlinkSQLJob\
$JAR_FILE
dependencies:
-build_job
only:
-master# 僅 master 分支提交時觸發
when:manual # 手動觸發(可選,避免誤部署)4. SQL 語法校驗腳本
為避免 SQL 語法錯誤導致作業提交失敗,可編寫校驗腳本(scripts/validate_sql.sh):
#!/bin/bash
SQL_FILE=$1
if [ ! -f "$SQL_FILE" ]; then
echo"Error: SQL file $SQL_FILE not found."
exit 1
fi
# 使用 Flink 內置的 SQL Parser 校驗語法(需 Flink 環境變量)
$FLINK_HOME/bin/sql-client.sh -f "$SQL_FILE" -d
if [ $? -eq 0 ]; then
echo"FlinkSQL syntax validation passed."
else
echo"Error: FlinkSQL syntax validation failed."
exit 1
fi賦予腳本執行權限:chmod +x scripts/validate_sql.sh。
六、自動提交全流程實踐
1. 開發與提交代碼
創建功能分支:從 develop 分支切出功能分支:
git checkout -b feature/user_behavior_stat develop開發代碼:編寫 FlinkSQL 腳本、Java 主程序及配置文件,本地測試通過后提交:
git add .
git commit -m "feat(sql): 新增用戶行為實時統計SQL"
git push origin feature/user_behavior_stat提交 Merge Request:在 Gitlab 上創建從 feature/user_behavior_stat 到 develop 的 Merge Request(MR),觸發流水線自動執行 build 和 test 階段。
2. 流水線執行過程
- 構建階段:Maven 自動編譯打包,生成 flink-realtime-1.0-SNAPSHOT.jar,并保存為流水線產物。
- 測試階段:執行單元測試(如 UDF 測試)和 SQL 語法校驗,若測試失敗,流水線終止并通知開發者。
- 合并到 develop:MR 審核通過后,合并到 develop 分支,此時不觸發部署。
- 發布到生產:將 develop 分支合并到 master 分支,觸發 deploy_prod_job,自動提交作業到 YARN 生產隊列。
3. 作業狀態監控
- YARN 監控:通過 YARN Web UI 查看作業狀態(運行中、成功、失敗),點擊“Logs”查看 YARN 日志。
- Flink 監控:點擊作業的“Tracking UI”進入 Flink Web UI,監控 Checkpoint、反壓、吞吐量等指標。
- 日志收集:可將 Flink 作業日志輸出到 HDFS 或 ELK 集群,便于問題排查。
七、常見問題與優化
1. 常見問題排查
(1) 問題1:作業提交到 YARN 失敗,報“YARN application not found”
原因:Flink 與 Hadoop 版本不兼容,或 YARN 配置文件未正確軟鏈。解決:確保 Flink 版本支持 Hadoop 3.x,檢查 $FLINK_HOME/conf 下是否有 yarn-site.xml 等配置文件。
(2) 問題2:SQL 語法校驗通過,但作業運行時報“Table not found”
原因:SQL 腳本中表名大小寫與實際不一致,或連接器配置錯誤(如 Kafka topic 不存在)。解決:檢查 SQL 表名大小寫(FlinkSQL 默認不區分大小寫,但存儲系統可能區分),確認 Kafka/MySQL 連接參數。
(3) 問題3:Gitlab Runner 執行部署時報“Permission denied”
原因:Runner 用戶無權限訪問 HDFS 或提交 YARN 作業。解決:檢查 Hadoop 代理用戶配置,確保 gitlab-runner 用戶屬于 hadoop 用戶組。
2. 流程優化建議
- 多環境部署:通過 Gitlab 變量區分開發/測試/生產環境(如 $YARN_QUEUE_DEV、$YARN_QUEUE_PROD),實現一套代碼多環境部署。
- 版本回滾:在 .gitlab-ci.yml 中添加回滾作業,通過 yarn application -kill <app_id> 停止舊作業,再提交指定版本的 JAR 包。
- 通知機制:集成釘釘/飛書機器人,流水線成功/失敗時發送消息通知開發團隊。
- 資源動態調整:根據作業負載,通過 Flink REST API 動態調整并行度或資源,提升資源利用率。
八、總結
本文詳細介紹了 Flink + FlinkSQL + YARN + Gitlab 的自動提交代碼全流程,從環境準備、代碼管理、作業開發到 CI/CD 流程設計,覆蓋了實時計算自動化部署的核心環節。通過該流程,企業可實現:
- 開發效率提升:代碼提交后自動構建、測試、部署,減少人工操作。
- 質量管控:通過單元測試、SQL 校驗等環節,降低作業上線風險。
- 資源隔離:YARN 提供多隊列資源管理,實現作業間資源隔離與公平調度。
未來可進一步擴展與監控系統集成(如 Prometheus + Grafana)、實現作業自動擴縮容,構建更完善的實時計算運維體系。


























