如何使用Python、Apache Kafka和云平臺構建健壯的實時數據管道
譯文譯者 | 李睿
審校 | 重樓
在當今競爭激烈的市場環境中,為了生存和發展,企業必須能夠實時收集、處理和響應數據。無論是檢測欺詐、個性化用戶體驗還是監控系統,現在都需要接近即時的數據。

然而,構建和運行任務關鍵型實時數據管道具有挑戰性?;A設施必須具有容錯性、無限可擴展性,并與各種數據源和應用程序集成。這就是ApacheKafka、Python和云平臺的用武之地。
這個綜合指南中將介紹:
- 概述Apache Kafka架構
- 在云中運行Kafka集群
- 使用Python構建實時數據管道
- 使用PySpark進行擴展處理
- 實際示例,例如用戶活動跟蹤、物聯網數據管道,并支持聊天分析
這里將包括大量的代碼片段、配置示例和文檔鏈接,以便獲得這些非常有用的技術的實踐經驗。
Apache Kafka架構介紹
Apache Kafka是一個分布式、分區、復制的提交日志,用于可靠且大規模地存儲數據流。Apache Kafka的核心是提供以下功能:
- 發布-訂閱消息:Kafka允許廣播來自生產者的數據流,例如頁面瀏覽量、交易、用戶事件等,并支持消費者實時消費。
- 消息存儲:Kafka在消息到達時將其持久保存在磁盤上,并在指定的時間內保留它們。消息通過指示日志中位置的偏移量來存儲和索引。
- 容錯:數據在可配置數量的服務器上復制。如果一臺服務器宕機,另一臺服務器可以保證持續運行。
- 橫向可擴展性:Kafka集群可以通過簡單地添加更多的服務器來彈性擴展。這允許無限的存儲和處理能力。
Kafka架構由以下主要組件組成:
(1)主題
消息被發布到名為“主題”的類別中。每個主題都充當消息提要或消息隊列。常見的場景是每個消息類型或數據流的一個主題。Kafka主題中的每條消息都有一個唯一的標識符,稱為偏移量,它代表了在主題中的位置。一個主題可以分為多個分區,這些分區是可以存儲在不同代理上的主題片段。分區允許Kafka通過在多個消費者之間分配負載來擴展和并行化數據處理。
(2)生產者
生產者是向Kafka主題發布消息的應用程序。它們連接到Kafka集群,序列化數據(例如JSON或Avro),分配一個密鑰,并將其發送到適當的主題。
例如,一個Web應用程序可以產生點擊流事件,或者一個移動應用程序可以產生使用統計。
(3)消費者
消費者從Kafka主題中讀取消息并進行處理。處理可能涉及解析數據、驗證、聚合、過濾、存儲到數據庫等。
消費者連接到Kafka集群,并訂閱一個或多個主題來獲取消息提要,然后根據用例需求進行處理。
(4)代理
這是一個Kafka服務器,它接收來自生產者的消息,分配偏移量,將消息提交到存儲中,并將數據提供給消費者。Kafka集群由多個代理組成,以實現可擴展性和容錯性。
(5)ZooKeeper
ZooKeeper處理代理之間的協調和共識,例如控制器選舉和主題配置。它維護Kafka操作所需的集群狀態和配置信息。
這涵蓋了Kafka的基礎知識。要深入了解,可以參考一些Kafka文檔。
以下了解如何通過在云中運行Kafka來簡化管理。
在云中運行Kafka
雖然Kafka具有高度可擴展性和可靠性,但它的運行涉及部署、基礎設施管理、監控、安全、故障處理、升級等方面的大量工作。
值得慶幸的是,Kafka現在是所有主要云計算提供商提供的完全托管服務:
服務 | 描述 | 定價 |
AWS MSK | 在AWS上完全托管、高可用的Apache Kafka集群。處理基礎設施,擴展,安全,故障處理等。 | 基于代理的數量 |
Google Cloud Pub/Sub | 基于Kafka的無服務器實時消息服務。自動擴展,至少一次交付保證。 | 基于使用指標 |
Confluent Cloud | 完全管理的事件流平臺,由Apache Kafka提供支持。提供免費層。 | 基于功能的分層定價 |
Azure Event Hubs | Apache Kafka的高吞吐量事件攝取服務。與Azure數據服務的集成。 | 基于吞吐量單位 |
托管服務抽象了Kafka操作的復雜性,可以讓用戶專注數據管道。
接下來,將使用Python、Kafka和云平臺構建一個實時管道。也可以參考以下的指南作為另一個示例。
構建實時數據管道
Kafka的基本實時管道有兩個主要組件:向Kafka發布消息的生產者和訂閱主題并處理消息的消費者。
其架構遵循以下流程:

為了進行簡化,將使用Confluent Kafka Python客戶端庫。
1. Python生產者
生產者應用程序從數據源收集數據并將其發布到Kafka主題。作為一個例子,假設有一個Python服務從一個Web應用程序收集用戶點擊流事件。
在Web應用程序中,當用戶的行為像是頁面瀏覽或產品評級時,可以捕獲這些事件并將它們發送給Kafka。
可以抽象出Web應用程序如何收集數據的實現細節。
Python
from confluent_kafka import Producer
import json
# User event data
event = {
"timestamp": "2022-01-01T12:22:25",
"userid": "user123",
"page": "/product123",
"action": "view"
}
# Convert to JSON
event_json = json.dumps(event)
# Kafka producer configuration
conf = {
'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092',
'client.id': 'clickstream-producer'
}
# Create producer instance
producer = Producer(conf)
# Publish event
producer.produce(topic='clickstream', value=event_json)
# Flush and close producer
producer.flush()
producer.close()這將事件發布到云托管Kafka集群上的clickstream主題。
Confluent_Kafka Python客戶端在將消息發送到Kafka之前使用內部緩沖區來批處理消息。與單獨發送每條消息相比,這提高了效率。
在默認情況下,消息會在緩沖區中累積,直到:
(1)已達到緩沖區大小限制(默認為32MB)。
(2)調用flush()方法。
當調用flush()時,緩沖區中的任何消息都會立即發送到Kafka代理。
如果不調用flush(),而是依賴于緩沖區大小限制,那么在下一次自動刷新之前,如果發生故障,就有丟失事件的風險。調用flush()能夠更好地控制最小化潛在的消息丟失。
但是,在每次生產后調用flush()會帶來額外的開銷。找到合適的緩沖配置取決于特定的可靠性需求和吞吐量需求。
可以在事件發生時不斷添加事件來構建實時流。這為下游數據消費者提供了連續的事件提要。
2.Python消費者
接下來,有一個消費者應用程序來從Kafka攝取事件并處理它們。
例如,可能想要解析事件,篩選特定的子類型,并驗證模式。
Python
from confluent_kafka import Consumer
import json
# Kafka consumer configuration
conf = {'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092',
'group.id': 'clickstream-processor',
'auto.offset.reset': 'earliest'}
# Create consumer instance
consumer = Consumer(conf)
# Subscribe to 'clickstream' topic
consumer.subscribe(['clickstream'])
# Poll Kafka for messages infinitely
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
# Parse JSON from message value
event = json.loads(msg.value())
# Process event based on business logic
if event['action'] == 'view':
print('User viewed product page')
elif event['action'] == 'rating':
# Validate rating, insert to DB etc
pass
print(event) # Print event
# Close consumer
consumer.close()這個輪詢clickstream主題以獲取新消息,使用它們,并根據事件類型采取行動——打印、更新數據庫等。
對于一個簡單的管道來說,這很有效。但如果每秒事件數增加100倍呢?消費者將無法跟上其增長。這就是像PySpark這樣的工具可以幫助擴展處理的地方。
3.使用PySpark進行擴展
PySpark為Apache Spark提供了一個Python API,Apache Spark是一個為大規模數據處理優化的分布式計算框架。
使用PySpark,可以利用Spark的內存計算和并行執行來更快地使用Kafka流。
首先,將Kafka數據加載到DataFrame中,DataFrame可以使用Spark SQL或Python進行操作。
Python
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
.appName('clickstream-consumer') \
.getOrCreate()
# Read stream from Kafka 'clickstream'
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
.option("subscribe", "clickstream") \
.load()
# Parse JSON from value
df = df.selectExpr("CAST(value AS STRING)")
df = df.select(from_json(col("value"), schema).alias("data"))
Next, we can express whatever processing logic we need using DataFrame transformations:
from pyspark.sql.functions import *
# Filter for 'page view' events
views = df.filter(col("data.action") == "view")
# Count views per page URL
counts = views.groupBy(col("data.page"))
.count()
.orderBy("count")
# Print the stream
query = counts.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()它利用Spark的分布式運行時,在數據流上實時應用過濾、聚合和排序等操作。
還可以使用多個消費者組并行化消費,并將輸出接收器寫入數據庫、云存儲等。
這允許在Kafka的數據上構建可擴展的流處理。
現在已經介紹了端到端管道,以下了解應用它的一些實際用例。
實際用例
以下探索一些實際用例,在這些用例中,這些技術可以幫助大規模地處理大量實時數據。
1.用戶活動跟蹤
許多現代網絡和移動應用程序跟蹤用戶的行為,例如頁面瀏覽量、按鈕點擊、交易等,以收集使用情況分析。
(1)問題
- 數據量可以隨著數百萬活躍用戶而大規模擴展。
- 需要實時洞察以檢測問題并個性化內容。
- 希望為歷史報表存儲匯總數據。
(2)解決方案
- 使用Python或任何語言將點擊流事件攝取到Kafka主題中。
- 使用PySpark進行清理、聚合和分析。
- 將輸出保存到數據庫,例如Cassandra的儀表板。
- 使用Spark ML實時警報檢測異常。
2.物聯網數據管道
物聯網傳感器產生大量的實時遙測數據,例如溫度、壓力、位置等。
(1)問題
- 每秒產生數百萬個傳感器事件。
- 需要清洗、改造、豐富。
- 需要實時監控和歷史存儲。
(2)解決方案
- 使用語言SDK收集Kafka主題中的傳感器數據。
- 使用PySpark進行數據整理和連接外部數據。
- 將數據流輸入機器學習模型進行實時預測。
- 將聚合數據存儲在時間序列數據庫中以實現可視化。
3.客戶支持聊天分析
像Zendesk這樣的聊天平臺捕獲了大量的客戶支持對話。
(1)問題
- 每月產生數百萬條聊天信息。
- 需要了解客戶痛點和代理表現。
- 必須發現負面情緒和緊急問題。
(2)解決方案
- 使用連接器將聊天記錄導入Kafka主題。
- 使用PySpark SQL和DataFrames進行聚合和處理。
- 將數據輸入NLP模型,對情緒和意圖進行分類。
- 存儲洞察到數據庫的歷史報告。
- 為聯絡中心操作提供實時儀表板。
這個用例演示了如何將這些技術應用于涉及大量快速移動數據的實際業務問題。
結論
綜上所述, Python、Kafka和云平臺為構建健壯的、可擴展的實時數據管道提供了一個很好的組合。
原文標題:Building Robust Real-Time Data Pipelines With Python, Apache Kafka, and the Cloud,作者:Dmitrii Mitiaev
































