
譯者 | 李睿
審校 | 重樓
梅西百貨公司首席數據工程師Naresh Erukulla是一位勇于迎接挑戰的數據工程師,他擅長用簡潔明了的概念驗證(POC)解決各種問題。最近,Naresh關注到了數據工程師日常工作中普遍遭遇的一個難題,并為此采取行動,為所有批處理和流數據管道設置了警報系統。當錯誤超過閾值或數據管道出現故障時,可以迅速通過電子郵件向數據工程師發送故障通知,確保問題能夠得到及時處理。
一切似乎都在順利進行中,直到他注意到一個關鍵數據集無法加載到BigQuery中。在調查了錯誤日志之后,發現一些“缺少所需數據”提示的消息。當看到用戶輸入文件中頻繁出現的原始數據問題時,他為此感到困惑。
處理數據不一致問題,特別是數據缺失或格式錯誤,會在分析和運營工作流程的后續環節引發嚴重的后果。有一個關鍵的下游報告正是建立在這些輸入數據的基礎之上。該報告在日常業務中發揮著至關重要的作用,它能夠反映出公司在多個領域內的關鍵指標表現,并且為決策制定提供了不可或缺的數據支持。在這份至關重要的報告中,所有高管級別的利益相關者都依賴這些數據來展示業績指標、討論面臨的挑戰以及規劃未來的發展路徑。
Erukulla耗費了數小時檢查源CSV文件,該文件承載了來自另一個上游應用程序的大量事務數據。準確識別并修正問題行顯得至關重要。然而,當他著手處理這些問題時,發現已經錯過截止日期,這無疑令利益相關者深感失望。Erukulla也意識到傳統數據管道的脆弱性。它們很容易出錯,而且往往需要多次人工干預來進行修復,這個過程既耗時又容易出錯。
人們是否也遇到過類似的情況?是否花費了大量時間調試數據管道,結果卻發現根本原因只是一個簡單的格式錯誤或缺少必填字段?事實上,世界各地的數據工程師每天都在努力應對這些挑戰。那么是否有可以構建能夠“自我修復”數據管道的方法?這正是Erukulla追求的目標。
自我修復數據管道的工作原理
自我修復數據管道的想法很簡單:當數據處理過程中出現錯誤時,數據管道應該自動檢測、分析和糾正錯誤,而無需人工干預。傳統上,解決這些問題需要人工干預,這既耗時又容易出錯。
雖然有多種方法可以實現這一點,,但使用人工智能代理是最好的方法,也是數據工程師在未來自我修復故障數據管道并動態自動糾正它們的方法。本文將展示如何使用像GPT-4/DeepSeek R1模型這樣的LLM來自修復數據管道的基本實現,其方法是使用LLM對失敗記錄進行分析并提出建議,并在數據管道運行的過程中應用這些修復措施。所提供的解決方案可以擴展到大型數據管道,并將擴展更多的功能。
以下介紹如何利用OpenAI API在云計算環境中使用GPT-4模型構建一個實用的管道。遵循的基本步驟如下:
- 將源文件上傳到谷歌云存儲桶(Google Cloud Storage Bucket)。如果沒有谷歌云平臺的訪問權限,則可以使用任何本地或其他云存儲。
- 創建數據模型,用于將原始數據提取到BigQuery表中,將錯誤記錄提取到錯誤表中。
- 從CSV中讀取源文件,并從輸入數據中識別干凈(Clean)數據集和無效記錄錯誤行(Error Rows)數據集。
- 將Clean數據集導入BigQuery,并使用提示將Error Rows數據集傳遞給LLM。
- 對于每個錯誤行(Error Rows),OpenAI的GPT API進行分析并提供智能產品ID分配。
- 使用Google BigQuery動態存儲和檢索產品信息。
- 基于Python的自動化無縫集成。
可以參閱Erukulla在GitHub上的完整代碼庫。
1.從云存儲讀取輸入數據
數據管道首先讀取存儲在Cloud Storage中的客戶端上傳的CSV文件,可以利用云函數(無服務器執行管道步驟)在新文件上傳到存儲桶時觸發。該函數使用谷歌云存儲庫(google-cloud-storage)讀取文件,并將其解析為Pandas DataFrame以供進一步處理。
在將數據傳遞到下一步之前,可以實施一些數據質量檢查。然而,現實世界中的數據問題是動態的,無法預測和編寫所有測試用例,這會使代碼變得復雜且難以閱讀。
在這個用例中,CSV文件包含字段ProductID、Price、name、saleAmount。以下是包含數據的示例文件(ProductID和Price字段中也缺少數據)。

1 # Read CSV from GCS
2 client = storage.Client()
3 bucket = client.bucket(bucket_name)
4 blob = bucket.blob(file_name)
5 data = blob.download_as_text()
6 df = pd.read_csv(io.StringIO(data))
72.將數據導入BigQuery
接下來,數據管道嘗試將數據導入到BigQuery中。如果由于模式不匹配、數據類型錯誤或缺少字段而導致任何行失敗,則捕獲并記錄它們以供進一步分析。這一步驟對于檢測底層錯誤信息至關重要,這些錯誤信息將用于識別OpenAI API的可能解決方案。
1 # Function to clean and preprocess data
2 def clean_data(df):
3 avg_price = get_average_price()
4
5 df["Price"] = df["Price"].fillna(avg_price)
6
7 # Log and remove rows with issues
8 error_rows = df[df["ProductID"].isna()]
9 clean_df = df.dropna(subset=["ProductID"])
10
11 return clean_df, error_rows
12
13 # Function to query BigQuery for an average price
14 def get_average_price():
15 client = bigquery.Client()
16 query = f"SELECT AVG(Price) AS avg_price FROM `{BQ_PROJECT_ID}.{BQ_DATASET_ID}.Product_Info`"
17
18 try:
19 df = client.query(query).to_dataframe()
20 avg_price = df["avg_price"][0]
21 print(f"Fetched Average Price: {avg_price}")
22 return avg_price
23 except Exception as e:
24 print(f"Error fetching average price: {e}")
25 return None
26注意,avg_price = get_average_price()是從BigQuery查詢中獲取的。
在插入干凈的數據集之后如下圖所示:

3.使用LLM分析錯誤
分析錯誤是整個流程中的關鍵步驟,這就是采用LLM的神奇之處。失敗的記錄被發送到GPT-4或DeepSeek R1等LLM進行分析。LLM檢查錯誤并提出更正建議和修正后的記錄。
例如,假設日期字段的格式不正確。在這種情況下,LLM可能會建議從字符串到整數轉換或從字符串到日期/時間戳轉換的正確格式記錄,反之亦然。在數據是預期的但發現為空的情況下,根據代碼強制執行的規則,帶有“平均”(Average)或“默認”(Default)值的缺失值將被修復,以確保數據攝取成功。
通過重試機制實現ChatCompletion請求。
為了確保彈性,利用tenacity實現了重試機制。該函數將錯誤細節發送給GPT并檢索建議的修復程序。在本文的示例中,創建了‘functions’(函數)列表,并使用ChatCompletion Request將其傳遞給JSON有效負載。
需要注意,‘functions’列表是使用在管道代碼中創建的Python函數來修復已知或可能問題的所有函數的列表。GPT分析輸入提示符和錯誤消息,以確定是否調用‘functions’列表中列出的特定函數來修復問題。
因此,GPT的響應提供了指示應該調用哪個函數的結構化數據。GPT不會直接執行函數,而是由數據管道來執行。
1 @retry(wait=wait_random_exponential(min=1, max=40), stop=stop_after_attempt(3))
2 def chat_completion_request(messages, functinotallow=None, model=GPT_MODEL):
3 headers = {
4 "Content-Type": "application/json",
5 "Authorization": "Bearer " + openai.api_key,
6 }
7 json_data = {"model": model, "messages": messages}
8 if functions is not None:
9 json_data.update({"functions": functions})
10 try:
11 response = requests.post(
12 "https://api.openai.com/v1/chat/completions",
13 headers=headers,
14 jsnotallow=json_data,
15 )
16 return response.json()
17 except Exception as e:
18 print("Unable to generate ChatCompletion response")
19 print(f"Exception: {e}")
20 return e
21 # Function to send ChatCompletion request to OpenAI API
22 functions = [
23 {
24 "name": "assign_product_id",
25 "description": "assigning a unique ProductID",
26 "parameters": {
27 "type": "object",
28 "properties": {
29 "ProductID": {
30 "type": "integer",
31 "description": "The product ID to assign."
32 },
33 }
34 },
35 }
36 ]
37assign_product_id是‘functions’列表中列出的函數,GPT可以在需要時調用它。在這個示例中,CSV文件的最后兩行缺少ProductID。因此,GPT調用特定的assign_product_id函數來確定ProductID值。
assign_product_id函數從BigQuery中獲取最高分配的ProductID,并將其遞增以供后續使用。如果它是首次運行,或者BigQuery表中沒有可用的數據,它將分配默認的99999作為ProductID。
1 def assign_product_id():
2 client = bigquery.Client()
3 # table_ref = client.dataset(BQ_DATASET_ID).table(BQ_TABLE_ID)
4
5 query = f"""
6 Select max(ProductID) as max_id from `{BQ_PROJECT_ID}.{BQ_DATASET_ID}.{BQ_TABLE_ID}` WHERE ProductID < 99999
7 """
8 df = client
9 try:
10 df = client.query(query).to_dataframe()
11 except Exception as e:
12 print(f"Error fetching max ProductID: {e}")
13 return None
14 return df["max_id"][0] + 1 if not df.empty else 99999
154.應用自動更正
數據管道將GPT的建議應用于失敗的記錄,并重新嘗試將它們導入到BigQuery中。如果更正成功,數據將存儲在主表中。如果沒有,不可修復的記錄將被記錄到一個單獨的錯誤表中,以供人工檢查。
在字段是必需且唯一的情況下,GPT可以從BigQuery獲得唯一的ProductID值,并在此值的基礎上加1,以確保其唯一性。考慮管道中有多個錯誤行的情況;每個記錄都按照GPT響應提供的修復程序順序處理,并用建議值更新錯誤記錄。
在以下的代碼中,ProductID被從assign_product_id()BigQuery表中獲取的值替換。當有多個錯誤行時,每個錯誤行都會通過遞增ProductID獲得一個唯一的數字。
1 # Function to send error data to GPT-4 for analysis
2 def analyze_errors_with_gpt(error_rows):
3 if error_rows.empty:
4 return error_rows
5
6 new_product_id = assign_product_id()
7
8 for index, row in error_rows.iterrows():
9 prompt = f"""
10 Fix the following ProductID by assigning a unique ProductID from the bigquery table Product_Info:
11 {row.to_json()}
12 """
13 chat_response = chat_completion_request(
14 model=GPT_MODEL,
15 messages=[{"role": "user", "content": prompt}],
16 functions=functions
17 )
18
19 if chat_response is not None:
20 try:
21 if chat_response["choices"][0]["message"]:
22 response_content = chat_response["choices"][0]["message"]
23 else:
24 print("Chat response content is None")
25 continue
26 except json.JSONDecodeError as e:
27 print(f"Error decoding JSON response: {e}")
28 continue
29
30 if 'function_call' in response_content:
31 if response_content['function_call']['name'] == 'assign_product_id':
32 res = json.loads(response_content['function_call']['arguments'])
33 res['product_id'] = new_product_id
34 error_rows.at[index, "ProductID"] = res['product_id']
35 new_product_id += 1 # Increment the ProductID for the next row
36
37 print(f"Assigned ProductID: {res['product_id']}")
38 else:
39 print("Function not supported")
40 else:
41 chat.add_prompt('assistant', response_content['content'])
42 else:
43 print("ChatCompletion request failed. Retrying...")
44
45 return error_rows
465.將已修改的行導入到BigQuery表中
main函數從谷歌云存儲(Google Cloud Storage)讀取數據并進行清理,并將有效數據導入到BigQuery中,同時動態修復錯誤。
1 # Main function to execute the pipeline
2 def main():
3 bucket_name = "self-healing-91"
4 file_name = "query_results.csv"
5
6 # Read CSV from GCS
7 client = storage.Client()
8 bucket = client.bucket(bucket_name)
9 blob = bucket.blob(file_name)
10 data = blob.download_as_text()
11 df = pd.read_csv(io.StringIO(data))
12
13 # Clean data and identify errors
14 clean_df, error_rows = clean_data(df)
15
16 # Load valid data into BigQuery
17 load_to_bigquery(clean_df, BQ_TABLE_ID)
18
19 # Process errors if any
20 if not error_rows.empty:
21
22 # Analyze errors with GPT-4
23 error_rows = analyze_errors_with_gpt(error_rows)
24
25 load_to_bigquery(error_rows, BQ_TABLE_ID)
26
27 print("Fixed Errors loaded successfully into BigQuery original table.")
28在修復數據錯誤之后,需要特別檢查第66至68行。從BigQuery表中獲取最大值10000 ProductID后,對這些ID逐一進行遞增處理。此外,錯誤行中缺少信息的Price字段被BigQuery表中的Avg(Price)替換。

6.日志記錄和監控
在整個過程中,使用云日志(Cloud Logging)記錄錯誤和數據管道的活動。這確保工程師可以監控數據管道的運行狀況并排查問題。
采用的工具和技術
以下是用來構建和測試數據管道的關鍵工具和技術:
- 云存儲:用于存儲輸入的CSV文件。
- 云函數:用于無服務器執行管道步驟。
- BigQuery:用于存儲清理過的數據和錯誤日志。
- GPT-4/DeepSeek R1:用于分析失敗記錄并提出更正建議。
- 云日志:用于監視和故障排除。
- 云編排器:它用于使用Apache氣流編排管道。
面臨的挑戰
1. LLM集成
將LLM集成到數據管道中頗具挑戰性。必須確保API調用是有效的,LLM的響應是準確的。此外,還有成本方面的考慮,由于為LLM配置API對于大型數據集來說可能成本高昂。因此,只需知道必須為該服務設置一個API密鑰。
例如,對于OpenAI,必須訪問https://platform.openai.com/來注冊和生成新的API密鑰,并在發送帶有提示的API調用時在數據管道中使用它。

2.錯誤處理
設計一個穩健的錯誤處理機制具有挑戰性。必須考慮各種錯誤,從模式不匹配到網絡問題,并確保數據管道能夠優雅地處理它們。數據管道可能會面臨許多問題,而且所有問題都不能動態解決,例如文件為空或BigQuery表不存在等問題。
3.可擴展性
隨著數據量的增長,必須優化數據管道以實現可擴展性。這涉及到在BigQuery中對數據進行分區,并使用Dataflow進行大規模處理。
4.成本管理
雖然谷歌云平臺提供了強大的工具,但使用這些工具需要支付費用。因此必須仔細監控使用情況并優化數據管道,以避免額外的成本。OpenAI API成本是需要仔細監控的另一個因素。
結論和要點
對于數據工程師來說,構建自我修復的數據管道是一個改變游戲規則的方法。它可以減少人工干預,提高效率,保證數據質量。然而,這并不是靈丹妙藥。雖然自我修復數據管道可以節省時間,但它們會帶來額外的成本,例如LLM API費用和增加的云函數的使用量。因此,權衡這些成本與收益至關重要。
對于自我修復數據管道領域的新手來說,建議從小型項目著手,首先嘗試集成大型語言模型(LLM)和處理基本錯誤,然后再逐步擴展。在這一過程中,定期監控數據管道的性能和成本。使用云監控和云日志之類的工具來識別瓶頸并進行相應的優化。最后,要與數據科學家、分析師和業務利益相關者緊密合作,了解他們的實際需求,并確保當業務需求發生變化時,其數據管道能夠持續創造價值。
總之,自我修復的數據管道代表著數據工程的未來。通過利用歌云平臺和LLM等工具,可以構建健壯、高效、智能的管道,從而最大限度地減少停機時間并提升生產效率。如果曾經受到脆弱的數據管道的困擾,可以探索和采用這一方法,而前期的努力將帶來長期的收益。
原文標題:Self-Healing Data Pipelines: The Next Big Thing in Data Engineering?,作者:Naresh Erukulla























