精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

超詳細講解,帶你零基礎(chǔ)入門kafka!

開發(fā) 后端 Kafka
本文超詳細的講解了零基礎(chǔ)入門kafka,一起來看看吧。

認識 kafka

kafka簡介

Kafka 是一個分布式流媒體平臺,kafka官網(wǎng):http://kafka.apache.org/

1)流媒體平臺有三個關(guān)鍵功能:

  •  「發(fā)布和訂閱記錄流」,類似于消息隊列或企業(yè)消息傳遞系統(tǒng)。
  •  「以容錯的持久方式存儲」 記錄流。
  •  記錄發(fā)生時處理流。

2)Kafka通常用于兩大類應(yīng)用:

  •  構(gòu)建可在 「系統(tǒng)或應(yīng)用程序之間」 可靠獲取數(shù)據(jù)的實時流數(shù)據(jù)管道
  •  構(gòu)建轉(zhuǎn)換或響應(yīng)數(shù)據(jù)流的實時流應(yīng)用程序

3)首先是幾個概念:

  •  Kafka作為一個集群運行在一個或多個可跨多個**「數(shù)據(jù)中心的服務(wù)器」**上。
  •  Kafka集群以稱為** topics主題**的類別存儲記錄流。
  •  每條記錄都包含**「一個鍵,一個值和一個時間戳」**。

4)Kafka有四個核心API:

  •  「Producer API(生產(chǎn)者API」)允許應(yīng)用程序發(fā)布記錄流至一個或多個kafka的topics(主題)。
  •  「Consumer API(消費者API」)允許應(yīng)用程序訂閱一個或多個topics(主題),并處理所產(chǎn)生的對他們記錄的數(shù)據(jù)流。
  •  Streams API(流API) 允許應(yīng)用程序充當(dāng)流處理器,從一個或多個topics(主題)消耗的輸入流,并產(chǎn)生一個輸出流至一個或多個輸出的topics(主題),有效地變換所述輸入流,以輸出流。
  •  「Connector API(連接器API」)允許構(gòu)建和運行kafka topics(主題)連接到現(xiàn)有的應(yīng)用程序或數(shù)據(jù)系統(tǒng)中重用生產(chǎn)者或消費者。例如,關(guān)系數(shù)據(jù)庫的連接器可能捕獲對表的每個更改。

圖片

在Kafka中,客戶端和服務(wù)器之間的通信是通過簡單,高性能,語言無關(guān)的TCP協(xié)議完成的。此協(xié)議已版本化并保持與舊版本的向后兼容性。Kafka提供Java客戶端,但客戶端有多種語言版本。

1.2 Topics主題 和 partitions分區(qū)

我們首先深入了解 Kafka 為記錄流提供的核心抽象 - 主題topics

一個Topic可以認為是一類消息,每個topic將被分成多個partition(區(qū)),每個partition在存儲層面是append log文件

主題是發(fā)布記錄的類別或訂閱源名稱。Kafka的主題總是多用戶; 也就是說,一個主題可以有零個,一個或多個消費者訂閱寫入它的數(shù)據(jù)。

對于每個主題,Kafka群集都維護一個如下所示的分區(qū)日志:

圖片

每個分區(qū)都是一個有序的,不可變的記錄序列,不斷附加到結(jié)構(gòu)化的提交日志中。分區(qū)中的記錄每個都分配了一個稱為偏移的順序ID號,它唯一地標(biāo)識分區(qū)中的每個記錄。

Kafka集群持久保存所有已發(fā)布的記錄 - 無論是否已使用 - 使用可配置的保留期。例如,如果保留策略設(shè)置為兩天,則在發(fā)布記錄后的兩天內(nèi),它可供使用,之后將被丟棄以釋放空間。Kafka的性能在數(shù)據(jù)大小方面實際上是恒定的,因此長時間存儲數(shù)據(jù)不是問題。

圖片

實際上,基于每個消費者保留的唯一元數(shù)據(jù)是該消費者在日志中的偏移或位置。這種偏移由消費者控制:通常消費者在讀取記錄時會線性地提高其偏移量,但事實上,由于該位置由消費者控制,因此它可以按照自己喜歡的任何順序消費記錄。例如,消費者可以重置為較舊的偏移量來重新處理過去的數(shù)據(jù),或者跳到最近的記錄并從“現(xiàn)在”開始消費。

這些功能組合意味著Kafka 消費者consumers 非常cheap - 他們可以來來往往對集群或其他消費者沒有太大影響。例如,您可以使用我們的命令行工具“tail”任何主題的內(nèi)容,而無需更改任何現(xiàn)有使用者所消耗的內(nèi)容。

日志中的分區(qū)有多種用途。首先,它們允許日志擴展到超出適合單個服務(wù)器的大小。每個單獨的分區(qū)必須適合托管它的服務(wù)器,但主題可能有許多分區(qū),因此它可以處理任意數(shù)量的數(shù)據(jù)。其次,它們充當(dāng)了并行性的單位 - 更多的是它。

1.3 Distribution分配

一個Topic的多個partitions,被分布在kafka集群中的多個server上;每個server(kafka實例)負責(zé)partitions中消息的讀寫操作;此外kafka還可以配置partitions需要備份的個數(shù)(replicas),每個partition將會被備份到多臺機器上,以提高可用性.

基于replicated方案,那么就意味著需要對多個備份進行調(diào)度;每個partition都有一個server為"leader";leader負責(zé)所有的讀寫操作,如果leader失效,那么將會有其他follower來接管(成為新的leader);follower只是單調(diào)的和leader跟進,同步消息即可..由此可見作為leader的server承載了全部的請求壓力,因此從集群的整體考慮,有多少個partitions就意味著有多少個"leader",kafka會將"leader"均衡的分散在每個實例上,來確保整體的性能穩(wěn)定。

1.4 Producers生產(chǎn)者 和 Consumers消費者

1.4.1 Producers生產(chǎn)者

Producers 將數(shù)據(jù)發(fā)布到指定的topics 主題。同時Producer 也能決定將此消息歸屬于哪個partition;比如基于"round-robin"方式或者通過其他的一些算法等。

1.4.2 Consumers

  •  本質(zhì)上kafka只支持Topic.每個consumer屬于一個consumer group;反過來說,每個group中可以有多個consumer.發(fā)送到Topic的消息,只會被訂閱此Topic的每個group中的一個consumer消費。
  •  如果所有使用者實例具有相同的使用者組,則記錄將有效地在使用者實例上進行負載平衡。
  •  如果所有消費者實例具有不同的消費者組,則每個記錄將廣播到所有消費者進程。

圖片

分析:兩個服務(wù)器Kafka群集,托管四個分區(qū)(P0-P3),包含兩個使用者組。消費者組A有兩個消費者實例,B組有四個消費者實例。

在Kafka中實現(xiàn)消費consumption 的方式是通過在消費者實例上劃分日志中的分區(qū),以便每個實例在任何時間點都是分配的“公平份額”的獨占消費者。維護組中成員資格的過程由Kafka協(xié)議動態(tài)處理。如果新實例加入該組,他們將從該組的其他成員接管一些分區(qū); 如果實例死亡,其分區(qū)將分發(fā)給其余實例。

Kafka僅提供分區(qū)內(nèi)記錄的總訂單,而不是主題中不同分區(qū)之間的記錄。對于大多數(shù)應(yīng)用程序而言,按分區(qū)排序與按鍵分區(qū)數(shù)據(jù)的能力相結(jié)合就足夠了。但是,如果您需要對記錄進行總訂單,則可以使用僅包含一個分區(qū)的主題來實現(xiàn),但這將意味著每個使用者組只有一個使用者進程。

1.5 Consumers kafka確保

  •  發(fā)送到partitions中的消息將會按照它接收的順序追加到日志中。也就是說,如果記錄M1由與記錄M2相同的生成者發(fā)送,并且首先發(fā)送M1,則M1將具有比M2更低的偏移并且在日志中更早出現(xiàn)。
  •  消費者實例按照它們存儲在日志中的順序查看記錄。對于消費者而言,它們消費消息的順序和日志中消息順序一致。
  •  如果Topic的"replicationfactor"為N,那么允許N-1個kafka實例失效,我們將容忍最多N-1個服務(wù)器故障,而不會丟失任何提交到日志的記錄。

1.6 kafka作為消息系統(tǒng)

Kafka的流概念與傳統(tǒng)的企業(yè)郵件系統(tǒng)相比如何?

(1)傳統(tǒng)消息系統(tǒng)

消息傳統(tǒng)上有兩種模型:queuing排隊 and publish-subscribe發(fā)布 - 訂閱。在隊列中,消費者池可以從服務(wù)器讀取并且每個記錄轉(zhuǎn)到其中一個; 在發(fā)布 - 訂閱中,記錄被廣播給所有消費者。這兩種模型中的每一種都有優(yōu)點和缺點。排隊的優(yōu)勢在于它允許您在多個消費者實例上劃分數(shù)據(jù)處理,從而可以擴展您的處理。不幸的是,一旦一個進程讀取它已經(jīng)消失的數(shù)據(jù),隊列就不是多用戶。發(fā)布 - 訂閱允許您將數(shù)據(jù)廣播到多個進程,但由于每條消息都發(fā)送給每個訂閱者,因此無法進行擴展處理。

卡夫卡的消費者群體概念概括了這兩個概念。與隊列一樣,使用者組允許您將處理劃分為一組進程(使用者組的成員)。與發(fā)布 - 訂閱一樣,Kafka允許您向多個消費者組廣播消息。

(2)kafka 的優(yōu)勢

Kafka模型的優(yōu)勢在于每個主題都具有這些屬性 - 它可以擴展處理并且也是多用戶 - 不需要選擇其中一個。

與傳統(tǒng)的消息系統(tǒng)相比,Kafka具有更強的訂購保證。

傳統(tǒng)隊列在服務(wù)器上按順序保留記錄,如果多個消費者從隊列中消耗,則服務(wù)器按照存儲順序分發(fā)記錄。但是,雖然服務(wù)器按順序分發(fā)記錄,但是記錄是異步傳遞給消費者的,因此它們可能會在不同的消費者處出現(xiàn)故障。這實際上意味著在存在并行消耗的情況下丟失記錄的順序。消息傳遞系統(tǒng)通常通過具有“獨占消費者”概念來解決這個問題,該概念只允許一個進程從隊列中消耗,但當(dāng)然這意味著處理中沒有并行性。

kafka做得更好。通過在主題中具有并行性概念 - 分區(qū) - ,Kafka能夠在消費者流程池中提供訂購保證和負載平衡。這是通過將主題中的分區(qū)分配給使用者組中的使用者來實現(xiàn)的,以便每個分區(qū)僅由該組中的一個使用者使用。通過這樣做,我們確保使用者是該分區(qū)的唯一讀者并按順序使用數(shù)據(jù)。由于有許多分區(qū),這仍然可以平衡許多消費者實例的負載。但請注意,消費者組中的消費者實例不能超過分區(qū)。

1.7 kafka作為存儲系統(tǒng)

  •  任何允許發(fā)布與消費消息分離的消息的消息隊列實際上充當(dāng)了正在進行的消息的存儲系統(tǒng)。Kafka的不同之處在于它是一個非常好的存儲系統(tǒng)。
  •  寫入Kafka的數(shù)據(jù)將寫入磁盤并進行復(fù)制以實現(xiàn)容錯。Kafka允許生產(chǎn)者等待確認,以便在完全復(fù)制之前寫入不被認為是完整的,并且即使寫入的服務(wù)器失敗也保證寫入仍然存在。
  •  磁盤結(jié)構(gòu)Kafka很好地使用了規(guī)模 - 無論服務(wù)器上有50 KB還是50 TB的持久數(shù)據(jù),Kafka都會執(zhí)行相同的操作。
  •  由于認真對待存儲并允許客戶端控制其讀取位置,您可以將Kafka視為一種專用于高性能,低延遲提交日志存儲,復(fù)制和傳播的專用分布式文件系統(tǒng)。

1.8 kafka用于流處理

  •  僅僅讀取,寫入和存儲數(shù)據(jù)流是不夠的,目的是實現(xiàn)流的實時處理。
  •  在Kafka中,流處理器是指從輸入主題獲取連續(xù)數(shù)據(jù)流,對此輸入執(zhí)行某些處理以及生成連續(xù)數(shù)據(jù)流以輸出主題的任何內(nèi)容。
  •  例如,零售應(yīng)用程序可能會接收銷售和發(fā)貨的輸入流,并輸出重新排序流和根據(jù)此數(shù)據(jù)計算的價格調(diào)整。
  •  可以使用生產(chǎn)者和消費者API直接進行簡單處理。但是,對于更復(fù)雜的轉(zhuǎn)換,Kafka提供了完全集成的Streams API。這允許構(gòu)建執(zhí)行非平凡處理的應(yīng)用程序,這些應(yīng)用程序可以計算流的聚合或?qū)⒘鬟B接在一起。
  •  此工具有助于解決此類應(yīng)用程序面臨的難題:處理無序數(shù)據(jù),在代碼更改時重新處理輸入,執(zhí)行有狀態(tài)計算等。
  •  流API構(gòu)建在Kafka提供的核心原語上:它使用生產(chǎn)者和消費者API進行輸入,使用Kafka進行有狀態(tài)存儲,并在流處理器實例之間使用相同的組機制來實現(xiàn)容錯。

2、kafka使用場景

2.1 消息Messaging

Kafka可以替代更傳統(tǒng)的消息代理。消息代理的使用有多種原因(將處理與數(shù)據(jù)生成器分離,緩沖未處理的消息等)。與大多數(shù)消息傳遞系統(tǒng)相比,Kafka具有更好的吞吐量,內(nèi)置分區(qū),復(fù)制和容錯功能,這使其成為大規(guī)模消息處理應(yīng)用程序的理想解決方案。

根據(jù)經(jīng)驗,消息傳遞的使用通常相對較低,但可能需要較低的端到端延遲,并且通常取決于Kafka提供的強大的耐用性保證。

在這個領(lǐng)域,Kafka可與傳統(tǒng)的消息傳遞系統(tǒng)(如ActiveMQ或 RabbitMQ)相媲美。

2.2 網(wǎng)站活動跟蹤

Kafka的原始用例是能夠?qū)⒂脩艋顒痈櫣艿乐亟橐唤M實時發(fā)布 - 訂閱源。這意味著站點活動(頁面查看,搜索或用戶可能采取的其他操作)將發(fā)布到中心主題,每個活動類型包含一個主題。這些源可用于訂購一系列用例,包括實時處理,實時監(jiān)控以及加載到Hadoop或離線數(shù)據(jù)倉庫系統(tǒng)以進行脫機處理和報告。

活動跟蹤通常非常高,因為為每個用戶頁面視圖生成了許多活動消息。

2.3 度量Metrics

Kafka通常用于運營監(jiān)控數(shù)據(jù)。這涉及從分布式應(yīng)用程序聚合統(tǒng)計信息以生成操作數(shù)據(jù)的集中式提要。

2.4 日志聚合

許多人使用Kafka作為日志聚合解決方案的替代品。日志聚合通常從服務(wù)器收集物理日志文件,并將它們放在中央位置(可能是文件服務(wù)器或HDFS)進行處理。Kafka抽象出文件的細節(jié),并將日志或事件數(shù)據(jù)作為消息流更清晰地抽象出來。這允許更低延遲的處理并更容易支持多個數(shù)據(jù)源和分布式數(shù)據(jù)消耗。與Scribe或Flume等以日志為中心的系統(tǒng)相比,Kafka提供了同樣出色的性能,由于復(fù)制而具有更強的耐用性保證,以及更低的端到端延遲。

2.5 流處理

許多Kafka用戶在處理由多個階段組成的管道時處理數(shù)據(jù),其中原始輸入數(shù)據(jù)從Kafka主題中消費,然后聚合,豐富或以其他方式轉(zhuǎn)換為新主題以供進一步消費或后續(xù)處理。

例如,用于推薦新聞文章的處理管道可以從RSS訂閱源抓取文章內(nèi)容并將其發(fā)布到“文章”主題; 進一步處理可能會對此內(nèi)容進行規(guī)范化或重復(fù)數(shù)據(jù)刪除,并將已清理的文章內(nèi)容發(fā)布到新主題; 最終處理階段可能會嘗試向用戶推薦此內(nèi)容。此類處理管道基于各個主題創(chuàng)建實時數(shù)據(jù)流的圖形。從0.10.0.0開始,這是一個輕量級但功能強大的流處理庫,名為Kafka Streams 在Apache Kafka中可用于執(zhí)行如上所述的此類數(shù)據(jù)處理。除了Kafka Streams之外,其他開源流處理工具包括Apache Storm和 Apache Samza。

2.6 Event Sourcing

Event Sourcing是一種應(yīng)用程序設(shè)計風(fēng)格,其中狀態(tài)更改記錄為按時間排序的記錄序列。Kafka對非常大的存儲日志數(shù)據(jù)的支持使其成為以這種風(fēng)格構(gòu)建的應(yīng)用程序的出色后端。

2.7 提交日志

Kafka可以作為分布式系統(tǒng)的一種外部提交日志。該日志有助于在節(jié)點之間復(fù)制數(shù)據(jù),并充當(dāng)故障節(jié)點恢復(fù)其數(shù)據(jù)的重新同步機制。Kafka中的日志壓縮功能有助于支持此用法。在這種用法中,Kafka類似于Apache BookKeeper項目。

3、kafka安裝

3.1 下載安裝

到官網(wǎng)http://kafka.apache.org/downloads.html下載想要的版本。

注:由于Kafka控制臺腳本對于基于Unix和Windows的平臺是不同的,因此在Windows平臺上使用bin\windows\ 而不是bin/ 將腳本擴展名更改為.bat。 

  1. [root@along ~]# wget http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz    
  2. [root@along ~]# tar -C /data/ -xvf kafka_2.11-2.1.0.tgz    
  3. [root@along ~]# cd /data/kafka_2.11-2.1.0/   

3.2 配置啟動zookeeper

kafka正常運行,必須配置zookeeper,否則無論是kafka集群還是客戶端的生存者和消費者都無法正常的工作的;所以需要配置啟動zookeeper服務(wù)。

(1)zookeeper需要java環(huán)境 

  1. [root@along ~]# yum -y install java-1.8.0   

(2)這里kafka下載包已經(jīng)包括zookeeper服務(wù),所以只需修改配置文件,啟動即可。

如果需要下載指定zookeeper版本;可以單獨去zookeeper官網(wǎng)http://mirrors.shu.edu.cn/apache/zookeeper/下載指定版本。 

  1. [root@along ~]# cd /data/kafka_2.11-2.1.0/    
  2. [root@along kafka_2.11-2.1.0]# grep "^[^#]" config/zookeeper.properties    
  3. dataDir=/tmp/zookeeper   #數(shù)據(jù)存儲目錄    
  4. clientPort=2181   #zookeeper端口    
  5. maxClientCnxns=0   

注:可自行添加修改zookeeper配置

3.3 配置kafka

(1)修改配置文件 

  1. [root@along kafka_2.11-2.1.0]# grep "^[^#]" config/server.properties    
  2. broker.id=0    
  3. listeners=PLAINTEXT://localhost:9092    
  4. num.network.threads=3    
  5. num.io.threads=8    
  6. socket.send.buffer.bytes=102400    
  7. socket.receive.buffer.bytes=102400   
  8. socket.request.max.bytes=104857600    
  9. log.dirs=/tmp/kafka-logs    
  10. num.partitions=1    
  11. num.recovery.threads.per.data.dir=1  
  12. offsets.topic.replication.factor=1    
  13. transaction.state.log.replication.factor=1    
  14. transaction.state.log.min.isr=1    
  15. log.retention.hours=168    
  16. log.segment.bytes=1073741824    
  17. log.retention.check.interval.ms=300000    
  18. zookeeper.connect=localhost:2181    
  19. zookeeper.connection.timeout.ms=6000    
  20. group.initial.rebalance.delay.ms=0   

注:可根據(jù)自己需求修改配置文件 

  1. broker.id:#唯一標(biāo)識ID    
  2. listeners=PLAINTEXT://localhost:9092:#kafka服務(wù)監(jiān)聽地址和端口    
  3. log.dirs:#日志存儲目錄   
  4. zookeeper.connect:#指定zookeeper服務(wù)   

(2)配置環(huán)境變量 

  1. [root@along ~]# vim /etc/profile.d/kafka.sh    
  2. export KAFKA_HOME="/data/kafka_2.11-2.1.0"    
  3. export PATH="${KAFKA_HOME}/bin:$PATH"    
  4. [root@along ~]# source /etc/profile.d/kafka.sh   

(3)配置服務(wù)啟動腳本 

  1. [root@along ~]# vim /etc/init.d/kafka    
  2. #!/bin/sh    
  3. #    
  4. # chkconfig: 345 99 01   
  5. # description: Kafka    
  6. #    
  7. # File : Kafka    
  8. #    
  9. # Description: Starts and stops the Kafka server    
  10. #    
  11. source /etc/rc.d/init.d/functions   
  12. KAFKA_HOME=/data/kafka_2.11-2.1.0    
  13. KAFKA_USER=root    
  14. export LOG_DIR=/tmp/kafka-logs    
  15. [ -e /etc/sysconfig/kafka ] && . /etc/sysconfig/kafka  
  16.  # See how we were called.    
  17. case "$1" in   
  18.    start)    
  19.     echo -n "Starting Kafka:"    
  20.     /sbin/runuser -s /bin/sh $KAFKA_USER -c "nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_DIR/server.out 2> $LOG_DIR/server.err &"   
  21.     echo " done."    
  22.     exit 0    
  23.     ;;     
  24.    stop)  
  25.      echo -n "Stopping Kafka: "    
  26.     /sbin/runuser -s /bin/sh $KAFKA_USER  -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill \-9"   
  27.      echo " done."    
  28.     exit 0    
  29.     ;;    
  30.   hardstop)    
  31.     echo -n "Stopping (hard) Kafka: "  
  32.     /sbin/runuser -s /bin/sh $KAFKA_USER  -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill -9"   
  33.      echo " done."   
  34.     exit 0   
  35.      ;;     
  36.    status)   
  37.      c_pid=`ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}'`   
  38.     if [ "$c_pid" = "" ] ; then    
  39.       echo "Stopped"    
  40.       exit 3    
  41.     else    
  42.       echo "Running $c_pid"    
  43.       exit 0    
  44.     fi    
  45.     ;;     
  46.    restart)    
  47.     stop    
  48.     start    
  49.     ;;    
  50.    *)    
  51.     echo "Usage: kafka {start|stop|hardstop|status|restart}"   
  52.      exit 1    
  53.     ;;    
  54.  esac   

3.4 啟動kafka服務(wù)

(1)后臺啟動zookeeper服務(wù) 

  1. [root@along ~]# nohup zookeeper-server-start.sh /data/kafka_2.11-2.1.0/config/zookeeper.properties &   

(2)啟動kafka服務(wù) 

  1. [root@along ~]# service kafka start    
  2. Starting kafka (via systemctl):                            [  OK  ]    
  3. [root@along ~]# service kafka status    
  4. Running 86018    
  5. [root@along ~]# ss -nutl    
  6. Netid State      Recv-Q Send-Q     Local Address:Port                    Peer Address:Port                             
  7. tcp   LISTEN     0      50                    :::9092                              :::*               
  8. tcp   LISTEN     0      50                    :::2181                              :::*    

4、kafka使用簡單入門

4.1 創(chuàng)建主題topics

創(chuàng)建一個名為“along”的主題,它只包含一個分區(qū),只有一個副本: 

  1. [root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic along    
  2. Created topic "along".   

如果我們運行l(wèi)ist topic命令,我們現(xiàn)在可以看到該主題: 

  1. [root@along ~]# kafka-topics.sh --list --zookeeper localhost:2181    
  2. along   

4.2 發(fā)送一些消息

Kafka附帶一個命令行客戶端,它將從文件或標(biāo)準輸入中獲取輸入,并將其作為消息發(fā)送到Kafka集群。默認情況下,每行將作為單獨的消息發(fā)送。

運行生產(chǎn)者,然后在控制臺中鍵入一些消息以發(fā)送到服務(wù)器。 

  1. [root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic along    
  2. >This is a message    
  3. >This is another message   

4.3 啟動消費者

Kafka還有一個命令行使用者,它會將消息轉(zhuǎn)儲到標(biāo)準輸出。 

  1. [root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic along --from-beginning    
  2. This is a message    
  3. This is another message   

所有命令行工具都有其他選項; 運行不帶參數(shù)的命令將顯示更詳細地記錄它們的使用信息。

5、設(shè)置多代理kafka群集

到目前為止,我們一直在與一個broker運行,但這并不好玩。對于Kafka,單個代理只是一個大小為1的集群,因此除了啟動一些代理實例之外沒有太多變化。但是為了感受它,讓我們將我們的集群擴展到三個節(jié)點(仍然在我們的本地機器上)。

5.1 準備配置文件 

  1. [root@along kafka_2.11-2.1.0]# cd /data/kafka_2.11-2.1.0/    
  2. [root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-1.properties    
  3. [root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-2.properties    
  4. [root@along kafka_2.11-2.1.0]# vim config/server-1.properties    
  5.     broker.id=1    
  6.     listeners=PLAINTEXT://:9093    
  7.     log.dirs=/tmp/kafka-logs-1    
  8. [root@along kafka_2.11-2.1.0]# vim config/server-2.properties    
  9.     broker.id=2    
  10.     listeners=PLAINTEXT://:9094    
  11.     log.dirs=/tmp/kafka-logs-2   

注:該broker.id 屬性是群集中每個節(jié)點的唯一且永久的名稱。我們必須覆蓋端口和日志目錄,因為我們在同一臺機器上運行這些,并且我們希望讓所有代理嘗試在同一端口上注冊或覆蓋彼此的數(shù)據(jù)。

5.2 開啟集群另2個kafka服務(wù) 

  1. [root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-1.properties &    
  2. [root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-2.properties &    
  3. [root@along ~]# ss -nutl    
  4. Netid State      Recv-Q Send-Q     Local Address:Port                    Peer Address:Port                          
  5. tcp   LISTEN     0      50      ::ffff:127.0.0.1:9092                              :::*                    
  6. tcp   LISTEN     0      50      ::ffff:127.0.0.1:9093                              :::*                                   
  7. tcp   LISTEN     0      50      ::ffff:127.0.0.1:9094                              :::*   

5.3 在集群中進行操作

1)現(xiàn)在創(chuàng)建一個復(fù)制因子為3的新主題my-replicated-topic 

  1. [root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic    
  2. Created topic "my-replicated-topic".   

2)在一個集群中,運行“describe topics”命令查看哪個broker正在做什么 

  1. [root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic    
  2. Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:    
  3.     Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 2,0,1 Isr: 2,0,1     
  4. #注釋:第一行給出了所有分區(qū)的摘要,每個附加行提供有關(guān)一個分區(qū)的信息。由于我們只有一個分區(qū)用于此主題,因此只有一行。     
  5. #“leader”是負責(zé)給定分區(qū)的所有讀取和寫入的節(jié)點。每個節(jié)點將成為隨機選擇的分區(qū)部分的領(lǐng)導(dǎo)者。      
  6. #“replicas”是復(fù)制此分區(qū)日志的節(jié)點列表,無論它們是否為領(lǐng)導(dǎo)者,或者即使它們當(dāng)前處于活動狀態(tài)。    
  7. # “isr”是“同步”復(fù)制品的集合。這是副本列表的子集,該列表當(dāng)前處于活躍狀態(tài)并且已經(jīng)被領(lǐng)導(dǎo)者捕獲。   
  8. #請注意,Leader: 2,在我的示例中,節(jié)點2 是該主題的唯一分區(qū)的Leader。   

3)可以在我們創(chuàng)建的原始主題上運行相同的命令,以查看它的位置 

  1. [root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic along    
  2. Topic:along PartitionCount:1    ReplicationFactor:1 Configs:    
  3.     Topic: along    Partition: 0    Leader: 0   Replicas: 0 Isr: 0   

4)向我們的新主題發(fā)布一些消息: 

  1. [root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic    
  2. >my test message 1    
  3. >my test message 2    
  4. >^C   

5)現(xiàn)在讓我們使用這些消息: 

  1. [root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic    
  2. my test message 1    
  3. my test message 2   

5.4 測試集群的容錯性

1)現(xiàn)在讓我們測試一下容錯性。Broker 2 充當(dāng)leader 所以讓我們殺了它: 

  1. [root@along ~]# ps aux | grep server-2.properties |awk '{print $2}'    
  2. 106737    
  3. [root@along ~]# kill -9 106737    
  4. [root@along ~]# ss -nutl    
  5. tcp   LISTEN     0      50      ::ffff:127.0.0.1:9092                              :::*                       
  6. tcp   LISTEN     0      50      ::ffff:127.0.0.1:9093                              :::*   

2)leader 已切換到其中一個從屬節(jié)點,節(jié)點2不再位于同步副本集中: 

  1. [root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic    
  2. Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:   
  3.     Topic: my-replicated-topic  Partition: 0    Leader: 0   Replicas: 2,0,1 Isr: 0,1   

3)即使最初接受寫入的leader 已經(jīng)失敗,這些消息仍可供消費: 

  1. [root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic  
  2. my test message 1    
  3. my test message 2   

6、使用Kafka Connect導(dǎo)入/導(dǎo)出數(shù)據(jù)

從控制臺寫入數(shù)據(jù)并將其寫回控制臺是一個方便的起點,但有時候可能希望使用其他來源的數(shù)據(jù)或?qū)?shù)據(jù)從Kafka導(dǎo)出到其他系統(tǒng)。對于許多系統(tǒng),您可以使用Kafka Connect導(dǎo)入或?qū)С鰯?shù)據(jù),而不是編寫自定義集成代碼。

Kafka Connect是Kafka附帶的工具,用于向Kafka導(dǎo)入和導(dǎo)出數(shù)據(jù)。它是一個可擴展的工具,運行連接器,實現(xiàn)與外部系統(tǒng)交互的自定義邏輯。在本快速入門中,我們將了解如何使用簡單的連接器運行Kafka Connect,這些連接器將數(shù)據(jù)從文件導(dǎo)入Kafka主題并將數(shù)據(jù)從Kafka主題導(dǎo)出到文件。

1)首先創(chuàng)建一些種子數(shù)據(jù)進行測試: 

  1. [root@along ~]# echo -e "foo\nbar" > test.txt    
  2. 或者在Windows上:    
  3. > echo foo> test.txt    
  4. > echo bar>> test.txt  

2)接下來,啟動兩個以獨立模式運行的連接器,這意味著它們在單個本地專用進程中運行。提供三個配置文件作為參數(shù)。

第一個始終是Kafka Connect流程的配置,包含常見配置,例如要連接的Kafka代理和數(shù)據(jù)的序列化格式。

其余配置文件均指定要創(chuàng)建的連接器。這些文件包括唯一的連接器名稱,要實例化的連接器類以及連接器所需的任何其他配置。 

  1. [root@along ~]# connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties    
  2. [2019-01-16 16:16:31,884] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:67)    
  3. [2019-01-16 16:16:31,903] INFO WorkerInfo values:    
  4. ... ...    
  5. #注:Kafka附帶的這些示例配置文件使用您之前啟動的默認本地群集配置并創(chuàng)建兩個連接器:第一個是源連接器,它從輸入文件讀取行并生成每個Kafka主題,第二個是宿連接器從Kafka主題讀取消息并將每個消息生成為輸出文件中的一行。   

3)驗證是否導(dǎo)入成功(另起終端)

在啟動過程中,您將看到許多日志消息,包括一些指示正在實例化連接器的日志消息。

① 一旦Kafka Connect進程啟動,源連接器應(yīng)該開始從test.txt主題讀取行并將其生成到主題connect-test,并且接收器連接器應(yīng)該開始從主題讀取消息connect-test 并將它們寫入文件test.sink.txt。我們可以通過檢查輸出文件的內(nèi)容來驗證數(shù)據(jù)是否已通過整個管道傳遞: 

  1. [root@along ~]# cat test.sink.txt    
  2. foo    
  3. bar   

② 請注意,數(shù)據(jù)存儲在Kafka主題中connect-test,因此我們還可以運行控制臺使用者來查看主題中的數(shù)據(jù)(或使用自定義使用者代碼來處理它): 

  1. [root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning    
  2. {"schema":{"type":"string","optional":false},"payload":"foo"}    
  3. {"schema":{"type":"string","optional":false},"payload":"bar"}   

4)繼續(xù)追加數(shù)據(jù),驗證 

  1. [root@along ~]# echo Another line>> test.txt        
  2. [root@along ~]# cat test.sink.txt    
  3. foo    
  4. bar    
  5. Another line    
  6. [root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning    
  7. {"schema":{"type":"string","optional":false},"payload":"foo"}    
  8. {"schema":{"type":"string","optional":false},"payload":"bar"}    
  9. {"schema":{"type":"string","optional":false},"payload":"Another line"}  

 

責(zé)任編輯:龐桂玉 來源: Java知音
相關(guān)推薦

2021-11-18 08:09:40

Python爬蟲Python基礎(chǔ)

2024-06-04 12:49:58

神經(jīng)網(wǎng)絡(luò)AI

2018-04-18 10:50:37

Python入門知識點匯總

2018-08-14 13:15:39

UI設(shè)計師網(wǎng)頁設(shè)計師

2025-08-27 03:22:00

AI智能體系統(tǒng)

2017-09-18 10:48:06

深度學(xué)習(xí)零基礎(chǔ)入門

2018-08-31 14:51:30

人工智能深度學(xué)習(xí)機器學(xué)習(xí)

2022-02-10 16:11:26

EventMysql數(shù)據(jù)庫

2020-04-28 10:40:54

Python開發(fā)工具

2018-06-13 10:23:27

編程語言Python數(shù)據(jù)庫

2021-05-29 10:11:00

Kafa數(shù)據(jù)業(yè)務(wù)

2024-01-16 08:09:28

PythonMongoDB數(shù)據(jù)存儲

2022-03-15 09:31:17

ESLint工作原理前端

2017-09-21 15:31:49

2016-11-25 13:05:18

2012-02-22 10:33:36

Wi-Fi

2021-08-27 07:13:52

UI計算機圖形

2020-06-10 10:50:48

C++開發(fā)編程

2021-04-30 11:33:09

Python變量數(shù)據(jù)

2025-08-11 07:41:59

點贊
收藏

51CTO技術(shù)棧公眾號

一区二区三区在线观看欧美| 丝袜美腿亚洲一区| 精品国产成人在线影院| 亚洲中文字幕无码不卡电影| 国产精品久久久久一区二区国产| 久久99精品一区二区三区三区| 欧美成人免费在线观看| 国产毛片毛片毛片毛片毛片毛片| 亚洲天堂1区| 夜夜嗨av一区二区三区中文字幕| 久久一区二区精品| 国产又爽又黄免费软件| 一区在线视频| 最近2019中文免费高清视频观看www99| 麻豆传媒在线看| 欧美大片1688| 亚洲成人午夜影院| 国产高清精品软男同| 少妇一级淫片免费看| 精品一区二区三区久久| 91精品国产91久久久久久吃药 | 久久久久久久性潮| 亚洲成a人v欧美综合天堂下载 | 毛片免费不卡| 久久久亚洲精品石原莉奈| 亚洲最大av在线| 亚洲av无码乱码国产精品fc2| 欧美日韩亚洲一区三区| 中文字幕亚洲欧美日韩2019| 国产精品探花一区二区在线观看| 国产精品高清一区二区 | 国产三级第一页| 天堂va蜜桃一区二区三区漫画版 | 日本免费久久| 亚洲二区在线观看| 久久国产精品免费观看| 国产精品99999| jizz一区二区| 国产精华一区二区三区| 99热精品在线播放| 精品一区二区免费视频| 国产精品国产福利国产秒拍| 国产免费av一区| 99国产精品久久久久久久成人热| 欧美精品免费看| 国产精品suv一区二区88| 国产精品入口久久| 亚洲欧美激情另类校园| 三级黄色片网站| 久久人人爽人人爽人人片av不| 欧美成人精品福利| 黑人无套内谢中国美女| 久久av网站| 7777精品伊人久久久大香线蕉| 欧美婷婷精品激情| 福利一区在线| 色婷婷av一区二区三区gif| 中国丰满人妻videoshd| 欧亚av在线| 色综合久久中文字幕| 成人黄色片视频| 都市激情亚洲一区| 在线观看一区二区视频| 九色porny91| 国产精品久久久久久久久免费高清| 91精品福利视频| 美女喷白浆视频| 免费视频成人| 欧美一区二区精美| www.四虎精品| 亚洲bt欧美bt精品777| 亚洲人高潮女人毛茸茸| 懂色av粉嫩av浪潮av| 91精品国产福利在线观看麻豆| 久久精品99久久久久久久久| 人妻少妇精品一区二区三区| 亚洲黄页一区| 日韩av观看网址| 亚洲一区精品在线观看| 国产中文一区二区三区| 肥熟一91porny丨九色丨| 午夜在线观看视频18| 久久久久久久性| 精品少妇人妻av一区二区| 免费污视频在线观看| 欧美午夜激情小视频| 五月天亚洲视频| 一区二区三区高清在线观看| 精品五月天久久| 一级片黄色录像| 国内精品久久久久久久影视麻豆 | 国产超碰人人爽人人做人人爱| 久久精品九九| 91久久中文字幕| 婷婷色在线视频| 中文字幕一区在线观看视频| 欧美久久在线观看| 国产亚洲精彩久久| 亚洲第一av在线| 色噜噜噜噜噜噜| 亚洲国产1区| 中文字幕日韩欧美一区二区三区| 亚洲码在线观看| 国产传媒在线看| 激情欧美一区二区三区| 国产精品久久99久久| 99热这里只有精品9| 久久综合色播五月| 亚洲国产一二三精品无码| 肉色欧美久久久久久久免费看| 777色狠狠一区二区三区| 噜噜噜在线视频| 欧美精品18| 国产精品黄页免费高清在线观看| 亚洲黄色在线观看视频| 国产欧美视频一区二区| 国产中文字幕二区| 欧美成人精品一级| 国产亚洲一区二区精品| 久久精品女人毛片国产| 紧缚奴在线一区二区三区| 久久综合婷婷综合| 波多野结依一区| 欧美一级理论片| 精品一区二区三孕妇视频| 亚洲欧美日韩综合国产aⅴ| 亚洲aⅴ男人的天堂在线观看| 黄色av网站在线免费观看| 亚洲大片在线观看| 久久久久99人妻一区二区三区 | 亚洲午夜激情| 桃花岛成人影院| 亚洲国产欧美一区| 久久久99精品| 国产伦精品一区二区三区视频青涩 | 久久久精品国产亚洲| 中文字幕一区二区免费| 久久嫩草精品久久久精品| cao在线观看| 日本免费一区二区三区视频| 久久视频在线观看免费| 亚洲天堂视频网| 亚洲国产激情av| 欧美伦理视频在线观看| 亚洲男人都懂第一日本| 91av成人在线| 视频国产在线观看| 精品成人在线视频| 国产精品无码永久免费不卡| 国产农村妇女精品一二区| 九九久久99| 黄色在线网站噜噜噜| 亚洲精品电影网在线观看| 日韩成人免费在线观看| 99精品桃花视频在线观看| 欧美三级在线观看视频| 国产精品自在线拍| 18性欧美xxxⅹ性满足| 日本亚洲一区| 日本韩国欧美国产| 女人黄色一级片| 老司机精品视频导航| 一级黄色免费在线观看| 成人噜噜噜噜| 欧美日韩不卡合集视频| 欧美视频久久久| 欧美性生交大片免费| 亚洲一级中文字幕| 日韩黄色小视频| 亚洲欧洲另类精品久久综合| 99精品女人在线观看免费视频| 久久精品人人爽| 囯产精品久久久久久| 精品久久久久久国产| 三上悠亚ssⅰn939无码播放| 日本视频中文字幕一区二区三区| 亚洲人成人77777线观看| 91精品一区| 久久久免费电影| 九色国产在线观看| 欧美日韩的一区二区| 一级黄色录像视频| 99re热视频精品| 污污网站在线观看视频| 欧美日韩一视频区二区| 欧美日韩另类丝袜其他| 日韩欧美专区| 97国产精品视频| 91sp网站在线观看入口| 欧美va在线播放| 国产一级片av| 一区二区三区在线免费观看| 欧美性xxxx图片| 九九热在线视频观看这里只有精品| 欧美 日韩 国产精品| 免费一区二区| 亚洲v日韩v综合v精品v| 不卡一二三区| 欧美成人免费大片| 第一福利在线| 亚洲国产另类久久精品| 亚洲图片视频小说| 天天做天天摸天天爽国产一区| 中国美女黄色一级片| av一本久道久久综合久久鬼色| 久久婷婷综合色| 国产欧美短视频| www.亚洲一区二区| 国产一区二区观看| 国产日本一区二区三区| 欧美黄页在线免费观看| 97在线视频精品| 在线h片观看| 在线播放国产一区二区三区| 内射无码专区久久亚洲| 欧美一区二区在线播放| 国产美女www爽爽爽| 亚洲成国产人片在线观看| 日韩一卡二卡在线观看| 国产亚洲精品中文字幕| 水蜜桃av无码| 丰满白嫩尤物一区二区| 国产福利精品一区二区三区| 久久av最新网址| 国产精品入口芒果| 欧美日韩免费| 2021狠狠干| 999国产精品| 亚洲日本无吗高清不卡| 精品一级毛片| 日本高清不卡三区| 国产精品欧美在线观看| 久久综合九色99| 西野翔中文久久精品字幕| 精品无人乱码一区二区三区的优势| 欧美日韩国产一区二区在线观看| 91亚洲精品一区二区| 96视频在线观看欧美| 国产原创欧美精品| 欧美日韩卡一| 成人黄色免费看| 羞羞视频在线观看一区二区| 国产欧美日韩精品专区| 成人亚洲免费| 国产九九精品视频| 婷婷成人av| 91久久久在线| 一区二区三区在线免费看| 2019国产精品视频| ccyy激情综合| 精品久久sese| 精品在线播放| 亚洲一区二区三区免费观看| 欧美成人自拍| 黄黄视频在线观看| 激情婷婷亚洲| 国产中文字幕乱人伦在线观看| 亚洲国产免费| 精品一卡二卡三卡| 久久综合图片| 99热这里只有精品在线播放| 日韩国产欧美在线播放| 97公开免费视频| 精一区二区三区| 亚洲最大天堂网| 国产欧美91| 日本www.色| 日韩成人免费电影| 四虎成人在线播放| 国产福利一区二区三区视频 | 国产午夜精品麻豆| 偷拍25位美女撒尿视频在线观看| 国产日韩欧美一区| www.av91| 亚洲欧洲午夜| 能在线观看的av| 亚洲免费综合| 日韩精品免费播放| 26uuu精品一区二区三区四区在线 26uuu精品一区二区在线观看 | 麻豆成人在线看| 男人和女人做事情在线视频网站免费观看| 国产亚洲视频在线| 国内精品久久久久国产| 久久全国免费视频| 成人一区福利| 成人黄色免费网站在线观看| 伊人久久噜噜噜躁狠狠躁| 国产一区二区不卡视频| 美女毛片一区二区三区四区| 青少年xxxxx性开放hg| 欧美激情日韩| 99蜜桃臀久久久欧美精品网站| 久久99九九99精品| 伊人久久一区二区三区| 91影院在线观看| 翔田千里88av中文字幕| 亚洲成av人在线观看| 日本高清www免费视频| 色视频一区二区| 超碰在线播放97| 亚洲四色影视在线观看| 国产精品一区二区三区视频网站| 26uuu国产精品视频| 亚洲综合123| 国产美女娇喘av呻吟久久| 在线免费看黄色片| 国产欧美日韩卡一| 日韩激情在线播放| 欧美区在线观看| 婷婷国产在线| 欧美精品在线免费| 向日葵视频成人app网址| 成人av播放| 久久理论电影| 亚洲色成人一区二区三区小说| 狠狠色狠狠色合久久伊人| 亚洲av无码一区二区三区网址 | 91视频免费看片| 欧美性猛交xxxx富婆弯腰| 国产美女主播在线观看| 亚洲国内高清视频| 超碰在线最新| 国产精品扒开腿爽爽爽视频| 亚洲精品动态| 欧美国产日韩激情| 国产乱人伦偷精品视频不卡| 国产三级aaa| 欧美色综合久久| 男人av在线| 国产成人91久久精品| 另类在线视频| 国产精品69久久久| 国产精品一区二区黑丝| 国产性猛交xx乱| 欧美性猛片aaaaaaa做受| 亚洲欧美丝袜中文综合| 欧美人与性动交| 国产色99精品9i| 中文字幕欧美日韩一区二区| 久久精品国产精品亚洲综合| 91网站免费视频| 日韩欧美999| 国产一二三区在线| 欧美中文字幕在线| 欧美大胆视频| 激情综合网婷婷| 91理论电影在线观看| 久草视频一区二区| 亚洲精品一区二三区不卡| 97天天综合网| 久久av一区二区三区漫画| 一区在线播放| 初高中福利视频网站| 亚洲欧洲制服丝袜| 国产精品高潮呻吟AV无码| 久久亚洲春色中文字幕| 自拍偷拍亚洲图片| 久久免费视频2| 国产一区二区三区免费看| 午夜国产小视频| 精品区一区二区| 色呦呦在线免费观看| 91精品视频在线看| 一区视频在线| 99久久人妻精品免费二区| 亚洲国产欧美在线人成| 亚洲三区在线观看无套内射| 欧美极品少妇xxxxx| 神马香蕉久久| 人妻丰满熟妇av无码区app| 国产精品久久久久三级| 国产精品福利电影| 日韩中文字幕精品视频| 成人国产精品久久| 国产传媒久久久| 久久久久久免费毛片精品| 姑娘第5集在线观看免费好剧| 亚洲美女动态图120秒| 99久久婷婷国产综合精品首页 | 欧美福利电影在线观看| 任你躁av一区二区三区| 色先锋aa成人| 91网页在线观看| 国产区一区二区三区| 久久精品亚洲| 亚洲精品成人av久久| 日韩午夜电影在线观看| 密臀av在线播放| 国产精品夜夜夜爽张柏芝| 国产99久久久久| 久久久久久久黄色片| 在线精品国产欧美| 九九九九九九精品任你躁| 四虎永久在线精品无码视频| 欧美国产日韩亚洲一区| 后进极品白嫩翘臀在线视频| 日本高清久久天堂| 亚洲经典一区| 久久只有这里有精品| 日韩欧美一级精品久久|