精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Apache Flink 漫談系列(15) - DataStream Connectors之Kafka

開發 開發工具 Kafka
為了滿足本系列讀者的需求,我先介紹一下Kafka在Apache Flink中的使用。所以本篇以一個簡單的示例,向大家介紹在Apache Flink中如何使用Kafka。

一、聊什么

為了滿足本系列讀者的需求,我先介紹一下Kafka在Apache Flink中的使用。所以本篇以一個簡單的示例,向大家介紹在Apache Flink中如何使用Kafka。

二、Kafka 簡介

Apache Kafka是一個分布式發布-訂閱消息傳遞系統。 它最初由LinkedIn公司開發,LinkedIn于2010年貢獻給了Apache基金會并成為***開源項目。Kafka用于構建實時數據管道和流式應用程序。它具有水平擴展性、容錯性、極快的速度,目前也得到了廣泛的應用。

Kafka不但是分布式消息系統而且也支持流式計算,所以在介紹Kafka在Apache Flink中的應用之前,先以一個Kafka的簡單示例直觀了解什么是Kafka。

1. 安裝

本篇不是系統的,詳盡的介紹Kafka,而是想讓大家直觀認識Kafka,以便在Apahe Flink中進行很好的應用,所以我們以最簡單的方式安裝Kafka。

(1) 下載二進制包:

  1. curl -L -O http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz 

(2) 解壓安裝

Kafka安裝只需要將下載的tgz解壓即可,如下:

  1. jincheng:kafka jincheng.sunjc$ tar -zxf kafka_2.11-2.1.0.tgz 
  2. jincheng:kafka jincheng.sunjc$ cd kafka_2.11-2.1.0 
  3. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ ls 
  4. LICENSE        NOTICE        bin        config        libs        site-docs 

其中bin包含了所有Kafka的管理命令,如接下來我們要啟動的Kafka的Server。

(3) 啟動Kafka Server

Kafka是一個發布訂閱系統,消息訂閱首先要有個服務存在。我們啟動一個Kafka Server 實例。 Kafka需要使用ZooKeeper,要進行投產部署我們需要安裝ZooKeeper集群,這不在本篇的介紹范圍內,所以我們利用Kafka提供的腳本,安裝一個只有一個節點的ZooKeeper實例。如下:

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/zookeeper-server-start.sh config/zookeeper.properties & 
  2.  
  3. [2019-01-13 09:06:19,985] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) 
  4. .... 
  5. .... 
  6. [2019-01-13 09:06:20,061] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory) 

啟動之后,ZooKeeper會綁定2181端口(默認)。接下來我們啟動Kafka Server,如下:

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-server-start.sh config/server.properties 
  2. [2019-01-13 09:09:16,937] INFO Registered kafkakafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) 
  3. [2019-01-13 09:09:17,267] INFO starting (kafka.server.KafkaServer) 
  4. [2019-01-13 09:09:17,267] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer) 
  5. [2019-01-13 09:09:17,284] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient) 
  6. ... 
  7. ... 
  8. [2019-01-13 09:09:18,253] INFO [KafkaServer id=0] started (kafka.server.KafkaServer) 

如果上面一切順利,Kafka的安裝就完成了。

2. 創建Topic

Kafka是消息訂閱系統,首先創建可以被訂閱的Topic,我們創建一個名為flink-tipic的Topic,在一個新的terminal中,執行如下命令:

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic 
  2.  
  3. Created topic "flink-tipic". 

在Kafka Server的terminal中也會輸出如下成功創建信息:

  1. ... 
  2. [2019-01-13 09:13:31,156] INFO Created log for partition flink-tipic-0 in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 2.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)... 

上面顯示了flink-topic的基本屬性配置,如消息壓縮方式,消息格式,備份數量等等。

除了看日志,我們可以用命令顯示的查詢我們是否成功的創建了flink-topic,如下:

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --list --zookeeper localhost:2181 
  2.  
  3. flink-tipic 

如果輸出flink-tipic那么說明我們的Topic成功創建了。

那么Topic是保存在哪里?Kafka是怎樣進行消息的發布和訂閱的呢?為了直觀,我們看如下Kafka架構示意圖簡單理解一下:

簡單介紹一下,Kafka利用ZooKeeper來存儲集群信息,也就是上面我們啟動的Kafka Server 實例,一個集群中可以有多個Kafka Server 實例,Kafka Server叫做Broker,我們創建的Topic可以在一個或多個Broker中。Kafka利用Push模式發送消息,利用Pull方式拉取消息。

3. 發送消息

如何向已經存在的Topic中發送消息呢,當然我們可以API的方式編寫代碼發送消息。同時,還可以利用命令方式來便捷的發送消息,如下:

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic 
  2. >Kafka test msg 
  3. >Kafka connector 

上面我們發送了兩條消息Kafka test msg 和 Kafka connector 到 flink-topic Topic中。

4. 讀取消息

如果讀取指定Topic的消息呢?同樣可以API和命令兩種方式都可以完成,我們以命令方式讀取flink-topic的消息,如下:

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning 
  2. Kafka test msg 
  3. Kafka connector 

其中--from-beginning 描述了我們從Topic開始位置讀取消息。

三、Flink Kafka Connector

前面我們以最簡單的方式安裝了Kafka環境,那么我們以上面的環境介紹Flink Kafka Connector的使用。Flink Connector相關的基礎知識會在《Apache Flink 漫談系列(14) - Connectors》中介紹,這里我們直接介紹與Kafka Connector相關的內容。

Apache Flink 中提供了多個版本的Kafka Connector,本篇以flink-1.7.0版本為例進行介紹。

1. mvn 依賴

要使用Kakfa Connector需要在我們的pom中增加對Kafka Connector的依賴,如下:

  1. <dependency> 
  2. <groupId>org.apache.flink</groupId> 
  3. <artifactId>flink-connector-kafka_2.11</artifactId> 
  4. <version>1.7.0</version> 
  5. </dependency> 

Flink Kafka Consumer需要知道如何將Kafka中的二進制數據轉換為Java / Scala對象。 DeserializationSchema允許用戶指定這樣的模式。 為每個Kafka消息調用 T deserialize(byte [] message)方法,從Kafka傳遞值。

2. Examples

我們示例讀取Kafka的數據,再將數據做簡單處理之后寫入到Kafka中。我們需要再創建一個用于寫入的Topic,如下:

  1. bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic-output 

所以示例中我們Source利用flink-topic, Sink用slink-topic-output。

(1) Simple ETL

我們假設Kafka中存儲的就是一個簡單的字符串,所以我們需要一個用于對字符串進行serialize和deserialize的實現,也就是我們要定義一個實現DeserializationSchema和SerializationSchema 的序列化和反序列化的類。因為我們示例中是字符串,所以我們自定義一個KafkaMsgSchema實現類,然后在編寫Flink主程序。

  • KafkaMsgSchema - 完整代碼
    1. import org.apache.flink.api.common.serialization.DeserializationSchema; 
    2. import org.apache.flink.api.common.serialization.SerializationSchema; 
    3. import org.apache.flink.api.common.typeinfo.BasicTypeInfo; 
    4. import org.apache.flink.api.common.typeinfo.TypeInformation; 
    5. import org.apache.flink.util.Preconditions; 
    6.  
    7. import java.io.IOException; 
    8. import java.io.ObjectInputStream; 
    9. import java.io.ObjectOutputStream; 
    10. import java.nio.charset.Charset; 
    11.  
    12. public class KafkaMsgSchema implements DeserializationSchema<String>, SerializationSchema<String> { 
    13.     private static final long serialVersionUID = 1L
    14.     private transient Charset charset; 
    15.  
    16.     public KafkaMsgSchema() { 
    17. // 默認UTF-8編碼 
    18.         this(Charset.forName("UTF-8")); 
    19.     } 
    20.  
    21.     public KafkaMsgSchema(Charset charset) { 
    22.         this.charset = Preconditions.checkNotNull(charset); 
    23.     } 
    24.  
    25.     public Charset getCharset() { 
    26.         return this.charset; 
    27.     } 
    28.  
    29.     public String deserialize(byte[] message) { 
    30. // 將Kafka的消息反序列化為java對象 
    31.         return new String(message, charset); 
    32.     } 
    33.  
    34.     public boolean isEndOfStream(String nextElement) { 
    35. // 流永遠不結束 
    36.         return false; 
    37.     } 
    38.  
    39.     public byte[] serialize(String element) { 
    40. // 將java對象序列化為Kafka的消息 
    41.         return element.getBytes(this.charset); 
    42.     } 
    43.  
    44.     public TypeInformation<String> getProducedType() { 
    45. // 定義產生的數據Typeinfo 
    46.         return BasicTypeInfo.STRING_TYPE_INFO; 
    47.     } 
    48.  
    49.     private void writeObject(ObjectOutputStream out) throws IOException { 
    50.         out.defaultWriteObject(); 
    51.         out.writeUTF(this.charset.name()); 
    52.     } 
    53.  
    54.     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { 
    55.         in.defaultReadObject(); 
    56.         String charsetName = in.readUTF(); 
    57.         this.charset = Charset.forName(charsetName); 
    58.     } 
  • 主程序 - 完整代碼
    1. import org.apache.flink.api.common.functions.MapFunction; 
    2. import org.apache.flink.api.java.utils.ParameterTool; 
    3. import org.apache.flink.streaming.api.datastream.DataStream; 
    4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
    5. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; 
    6. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; 
    7. import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; 
    8.  
    9. import java.util.Properties; 
    10.  
    11. public class KafkaExample { 
    12.     public static void main(String[] args) throws Exception { 
    13.         // 用戶參數獲取 
    14.         final ParameterTool parameterTool = ParameterTool.fromArgs(args); 
    15.         // Stream 環境 
    16.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    17.  
    18.         // Source的topic 
    19.         String sourceTopic = "flink-topic"
    20.         // Sink的topic 
    21.         String sinkTopic = "flink-topic-output"
    22.         // broker 地址 
    23.         String broker = "localhost:9092"
    24.  
    25.         // 屬性參數 - 實際投產可以在命令行傳入 
    26.         Properties p = parameterTool.getProperties(); 
    27.         p.putAll(parameterTool.getProperties()); 
    28.         p.put("bootstrap.servers", broker); 
    29.  
    30.         env.getConfig().setGlobalJobParameters(parameterTool); 
    31.  
    32.         // 創建消費者 
    33.         FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>
    34.                 sourceTopic, 
    35.                 new KafkaMsgSchema(), 
    36.                 p); 
    37.         // 設置讀取最早的數據 
    38. //        consumer.setStartFromEarliest(); 
    39.  
    40.         // 讀取Kafka消息 
    41.         DataStream<String> input = env.addSource(consumer); 
    42.  
    43.  
    44.         // 數據處理 
    45.         DataStream<String> result = input.map(new MapFunction<String, String>() { 
    46.             public String map(String s) throws Exception { 
    47.                 String msg = "Flink study ".concat(s); 
    48.                 System.out.println(msg); 
    49.                 return msg; 
    50.             } 
    51.         }); 
    52.  
    53.         // 創建生產者 
    54.         FlinkKafkaProducer producer = new FlinkKafkaProducer<String>
    55.                 sinkTopic, 
    56.                 new KeyedSerializationSchemaWrapper<String>(new KafkaMsgSchema()), 
    57.                 p, 
    58.                 FlinkKafkaProducer.Semantic.AT_LEAST_ONCE); 
    59.  
    60.         // 將數據寫入Kafka指定Topic中 
    61.         result.addSink(producer); 
    62.  
    63.         // 執行job 
    64.         env.execute("Kafka Example"); 
    65.     } 

運行主程序如下:

我測試操作的過程如下:

  • 啟動flink-topic和flink-topic-output的消費拉取;
  • 通過命令向flink-topic中添加測試消息only for test;
  • 通過命令打印驗證添加的測試消息 only for test;
  • 最簡單的FlinkJob source->map->sink 對測試消息進行map處理:"Flink study ".concat(s);
  • 通過命令打印sink的數據;

(2) 內置Schemas

Apache Flink 內部提供了如下3種內置的常用消息格式的Schemas:

  • TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema) 它基于Flink的TypeInformation創建模式。 如果數據由Flink寫入和讀取,這將非常有用。
  • JsonDeserializationSchema (and JSONKeyValueDeserializationSchema) 它將序列化的JSON轉換為ObjectNode對象,可以使用objectNode.get(“field”)作為(Int / String / ...)()從中訪問字段。 KeyValue objectNode包含“key”和“value”字段,其中包含所有字段以及可選的"metadata"字段,該字段公開此消息的偏移量/分區/主題。
  • AvroDeserializationSchema 它使用靜態提供的模式讀取使用Avro格式序列化的數據。 它可以從Avro生成的類(AvroDeserializationSchema.forSpecific(...))推斷出模式,或者它可以與GenericRecords一起使用手動提供的模式(使用AvroDeserializationSchema.forGeneric(...))

要使用內置的Schemas需要添加如下依賴:

  1. <dependency> 
  2. <groupId>org.apache.flink</groupId> 
  3. <artifactId>flink-avro</artifactId> 
  4. <version>1.7.0</version> 
  5. </dependency> 

(3) 讀取位置配置

我們在消費Kafka數據時候,可能需要指定消費的位置,Apache Flink 的FlinkKafkaConsumer提供很多便利的位置設置,如下:

  • consumer.setStartFromEarliest() - 從最早的記錄開始;
  • consumer.setStartFromLatest() - 從***記錄開始;
  • consumer.setStartFromTimestamp(...); // 從指定的epoch時間戳(毫秒)開始;
  • consumer.setStartFromGroupOffsets(); // 默認行為,從上次消費的偏移量進行繼續消費。

上面的位置指定可以精確到每個分區,比如如下代碼:

  1. Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>(); 
  2. specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); // ***個分區從23L開始 
  3. specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);// 第二個分區從31L開始 
  4. specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);// 第三個分區從43L開始 
  5.  
  6. consumer.setStartFromSpecificOffsets(specificStartOffsets); 

對于沒有指定的分區還是默認的setStartFromGroupOffsets方式。

(4) Topic發現

Kafka支持Topic自動發現,也就是用正則的方式創建FlinkKafkaConsumer,比如:

  1. // 創建消費者 
  2. FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(            java.util.regex.Pattern.compile(sourceTopic.concat("-[0-9]")), 
  3. new KafkaMsgSchema(), 
  4. p); 

在上面的示例中,當作業開始運行時,消費者將訂閱名稱與指定正則表達式匹配的所有Topic(以sourceTopic的值開頭并以單個數字結尾)。

3. 定義Watermark(Window)

對Kafka Connector的應用不僅限于上面的簡單數據提取,我們更多時候是期望對Kafka數據進行Event-time的窗口操作,那么就需要在Flink Kafka Source中定義Watermark。

要定義Event-time,首先是Kafka數據里面攜帶時間屬性,假設我們數據是String#Long的格式,如only for test#1000。那么我們將Long作為時間列。

  • KafkaWithTsMsgSchema - 完整代碼

要想解析上面的Kafka的數據格式,我們需要開發一個自定義的Schema,比如叫KafkaWithTsMsgSchema,將String#Long解析為一個Java的Tuple2

  1. import org.apache.flink.api.common.serialization.DeserializationSchema; 
  2. import org.apache.flink.api.common.serialization.SerializationSchema; 
  3. import org.apache.flink.api.common.typeinfo.BasicTypeInfo; 
  4. import org.apache.flink.api.common.typeinfo.TypeInformation; 
  5. import org.apache.flink.api.java.tuple.Tuple2; 
  6. import org.apache.flink.api.java.typeutils.TupleTypeInfo; 
  7. import org.apache.flink.util.Preconditions; 
  8.  
  9. import java.io.IOException; 
  10. import java.io.ObjectInputStream; 
  11. import java.io.ObjectOutputStream; 
  12. import java.nio.charset.Charset; 
  13.  
  14. public class KafkaWithTsMsgSchema implements DeserializationSchema<Tuple2<String, Long>>, SerializationSchema<Tuple2<String, Long>> { 
  15.     private static final long serialVersionUID = 1L
  16.     private transient Charset charset; 
  17.  
  18.     public KafkaWithTsMsgSchema() { 
  19.         this(Charset.forName("UTF-8")); 
  20.     } 
  21.  
  22.     public KafkaWithTsMsgSchema(Charset charset) { 
  23.         this.charset = Preconditions.checkNotNull(charset); 
  24.     } 
  25.  
  26.     public Charset getCharset() { 
  27.         return this.charset; 
  28.     } 
  29.  
  30.     public Tuple2<String, Long> deserialize(byte[] message) { 
  31.         String msg = new String(message, charset); 
  32.         String[] dataAndTs = msg.split("#"); 
  33.         if(dataAndTs.length == 2){ 
  34.             return new Tuple2<String, Long>(dataAndTs[0], Long.parseLong(dataAndTs[1].trim())); 
  35.         }else{ 
  36.             // 實際生產上需要拋出runtime異常 
  37.             System.out.println("Fail due to invalid msg format.. ["+msg+"]"); 
  38.             return new Tuple2<String, Long>(msg, 0L); 
  39.         } 
  40.     } 
  41.  
  42.     @Override 
  43.     public boolean isEndOfStream(Tuple2<String, Long> stringLongTuple2) { 
  44.         return false; 
  45.     } 
  46.  
  47.     public byte[] serialize(Tuple2<String, Long> element) { 
  48.         return "MAX - ".concat(element.f0).concat("#").concat(String.valueOf(element.f1)).getBytes(this.charset); 
  49.     } 
  50.  
  51.     private void writeObject(ObjectOutputStream out) throws IOException { 
  52.         out.defaultWriteObject(); 
  53.         out.writeUTF(this.charset.name()); 
  54.     } 
  55.  
  56.     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { 
  57.         in.defaultReadObject(); 
  58.         String charsetName = in.readUTF(); 
  59.         this.charset = Charset.forName(charsetName); 
  60.     } 
  61.  
  62.     @Override 
  63.     public TypeInformation<Tuple2<String, Long>> getProducedType() { 
  64.         return new TupleTypeInfo<Tuple2<String, Long>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO); 
  65.     }} 
  • Watermark生成

提取時間戳和創建Watermark,需要實現一個自定義的時間提取和Watermark生成器。在Apache Flink 內部有2種方式如下:

  • AssignerWithPunctuatedWatermarks - 每條記錄都產生Watermark。
  • AssignerWithPeriodicWatermarks - 周期性的生成Watermark。

我們以AssignerWithPunctuatedWatermarks為例寫一個自定義的時間提取和Watermark生成器。代碼如下:

  1. import org.apache.flink.api.java.tuple.Tuple2; 
  2. import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; 
  3. import org.apache.flink.streaming.api.watermark.Watermark; 
  4.  
  5. import javax.annotation.Nullable; 
  6.  
  7. public class KafkaAssignerWithPunctuatedWatermarks 
  8.         implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> { 
  9.     @Nullable 
  10.     @Override 
  11.     public Watermark checkAndGetNextWatermark(Tuple2<String, Long> o, long l) { 
  12. // 利用提取的時間戳創建Watermark 
  13.         return new Watermark(l); 
  14.     } 
  15.  
  16.     @Override 
  17.     public long extractTimestamp(Tuple2<String, Long> o, long l) { 
  18. // 提取時間戳 
  19.         return o.f1; 
  20.     }} 

主程序 - 完整程序

我們計算一個大小為1秒的Tumble窗口,計算窗口內***的值。完整的程序如下:

  1. import org.apache.flink.api.common.typeinfo.BasicTypeInfo; 
  2. import org.apache.flink.api.common.typeinfo.TypeInformation; 
  3. import org.apache.flink.api.java.tuple.Tuple2; 
  4. import org.apache.flink.api.java.typeutils.TupleTypeInfo; 
  5. import org.apache.flink.api.java.utils.ParameterTool; 
  6. import org.apache.flink.streaming.api.TimeCharacteristic; 
  7. import org.apache.flink.streaming.api.datastream.DataStream; 
  8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
  9. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; 
  10. import org.apache.flink.streaming.api.windowing.time.Time; 
  11. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; 
  12. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; 
  13. import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; 
  14.  
  15. import java.util.Properties; 
  16.  
  17. public class KafkaWithEventTimeExample { 
  18.     public static void main(String[] args) throws Exception { 
  19.         // 用戶參數獲取 
  20.         final ParameterTool parameterTool = ParameterTool.fromArgs(args); 
  21.         // Stream 環境 
  22.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
  23.         // 設置 Event-time 
  24.         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
  25.  
  26.         // Source的topic 
  27.         String sourceTopic = "flink-topic"
  28.         // Sink的topic 
  29.         String sinkTopic = "flink-topic-output"
  30.         // broker 地址 
  31.         String broker = "localhost:9092"
  32.  
  33.         // 屬性參數 - 實際投產可以在命令行傳入 
  34.         Properties p = parameterTool.getProperties(); 
  35.         p.putAll(parameterTool.getProperties()); 
  36.         p.put("bootstrap.servers", broker); 
  37.  
  38.         env.getConfig().setGlobalJobParameters(parameterTool); 
  39.         // 創建消費者 
  40.         FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<Tuple2<String, Long>>
  41.                 sourceTopic, 
  42.                 new KafkaWithTsMsgSchema(), 
  43.                 p); 
  44.  
  45.         // 讀取Kafka消息 
  46.         TypeInformation<Tuple2<String, Long>> typeInformation = new TupleTypeInfo<Tuple2<String, Long>>
  47.                 BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO); 
  48.  
  49.         DataStream<Tuple2<String, Long>> input = env 
  50.                 .addSource(consumer).returns(typeInformation) 
  51.                 // 提取時間戳,并生產Watermark 
  52.                 .assignTimestampsAndWatermarks(new KafkaAssignerWithPunctuatedWatermarks()); 
  53.  
  54.         // 數據處理 
  55.         DataStream<Tuple2<String, Long>> result = input 
  56.                 .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))) 
  57.                 .max(0); 
  58.  
  59.         // 創建生產者 
  60.         FlinkKafkaProducer producer = new FlinkKafkaProducer<Tuple2<String, Long>>
  61.                 sinkTopic, 
  62.                 new KeyedSerializationSchemaWrapper<Tuple2<String, Long>>(new KafkaWithTsMsgSchema()), 
  63.                 p, 
  64.                 FlinkKafkaProducer.Semantic.AT_LEAST_ONCE); 
  65.  
  66.         // 將數據寫入Kafka指定Topic中 
  67.         result.addSink(producer); 
  68.  
  69.         // 執行job 
  70.         env.execute("Kafka With Event-time Example"); 
  71.     }} 

測試運行如下:

簡單解釋一下,我們輸入數如下:

我們看的5000000~7000000之間的數據,其中B#5000000, C#5000100和E#5000120是同一個窗口的內容。計算MAX值,按字符串比較,***的消息就是輸出的E#5000120。

4. Kafka攜帶Timestamps

在Kafka-0.10+ 消息可以攜帶timestamps,也就是說不用單獨的在msg中顯示添加一個數據列作為timestamps。只有在寫入和讀取都用Flink時候簡單一些。一般情況用上面的示例方式已經足夠了。

四、小結

本篇重點是向大家介紹Kafka如何在Flink中進行應用,開篇介紹了Kafka的簡單安裝和收發消息的命令演示,然后以一個簡單的數據提取和一個Event-time的窗口示例讓大家直觀的感受如何在Apache Flink中使用Kafka。愿介紹的內容對您有所幫助!

關于點贊和評論

本系列文章難免有很多缺陷和不足,真誠希望讀者對有收獲的篇章給予點贊鼓勵,對有不足的篇章給予反饋和建議,先行感謝大家!

作者:孫金城,花名 金竹,目前就職于阿里巴巴,自2015年以來一直投入于基于Apache Flink的阿里巴巴計算平臺Blink的設計研發工作。

【本文為51CTO專欄作者“金竹”原創稿件,轉載請聯系原作者】

戳這里,看該作者更多好文

責任編輯:趙寧寧 來源: 51CTO專欄
相關推薦

2022-06-10 17:26:07

數據集計算

2018-09-26 08:44:22

Apache Flin流計算計算模式

2018-09-26 07:50:52

Apache Flin流計算計算模式

2018-10-16 08:54:35

Apache Flin流計算State

2018-10-09 10:55:52

Apache FlinWatermark流計算

2022-07-13 12:53:59

數據存儲

2018-11-14 09:01:23

Apache FlinSQL代碼

2018-10-22 21:43:39

Apache Flin流計算Fault Toler

2018-11-20 07:59:43

Apache Flin JOIN算子代碼

2018-11-29 09:01:26

Apache FlinJOIN代碼

2019-01-03 10:17:53

Apache FlinTable API代碼

2018-12-11 17:28:22

Apache FlinJOIN代碼

2022-07-13 13:03:29

流計算亂序

2022-07-12 10:38:25

分布式框架

2018-11-07 08:48:31

Apache Flin持續查詢流計算

2018-10-30 14:08:45

Apache Flin流表對偶duality

2018-12-29 08:16:32

Apache FlinJOIN代碼

2012-05-18 15:52:49

JavaApacheTomcat

2023-12-11 08:00:00

架構FlinkDruid

2020-12-18 05:53:57

SQL
點贊
收藏

51CTO技術棧公眾號

国产乱人伦精品一区| 欧美人与禽猛交乱配| 久热精品在线| 久久手机免费视频| 亚洲啪av永久无码精品放毛片 | 国产精品久久久久久久久久久久久| 亚洲无人区码一码二码三码的含义| 国产精品99久久免费| 精品久久久久久久久中文字幕| 亚洲国产一区在线| 秋霞欧美在线观看| 美女视频免费一区| 午夜精品一区二区三区在线视频| 国产三级短视频| 精品成人自拍视频| 欧美老女人在线| 国产乱子伦农村叉叉叉| 黄色动漫在线观看| 国产日产亚洲精品系列| 国产精品一区二区不卡视频| 中文字幕 欧美激情| 一区精品久久| 久久国产精品视频| 长河落日免费高清观看| 色综合中文网| 亚洲国产精品推荐| 天天爽夜夜爽视频| 久久三级毛片| 色婷婷av一区二区三区之一色屋| 国产xxxx振车| 91精品久久久久久粉嫩| 国产日韩欧美精品在线| 精品欧美一区二区久久久伦| 精品乱子伦一区二区| 久久精品久久综合| 国产精品专区h在线观看| 伦av综合一区| 国产精品毛片在线| 91精品国产91久久久久福利| 国产一级片视频| 正在播放日韩欧美一页| 日韩中文字幕在线| 色婷婷国产精品免| 精品久久久久久久| 在线观看日韩视频| 五月天精品在线| 精品国产一区二区三区小蝌蚪 | 日本在线www| 国产精品免费视频观看| 日韩欧美电影一区二区| 噜噜噜噜噜在线视频| 91蝌蚪porny| 欧美日韩成人一区二区三区 | 欧美丰满老妇厨房牲生活 | 欧美视频不卡中文| 免费观看美女裸体网站| eeuss鲁一区二区三区| 亚洲一区在线免费观看| 免费看日本黄色| 女子免费在线观看视频www| 亚洲综合激情网| 国产精品无码电影在线观看| 24小时免费看片在线观看| 亚洲国产婷婷综合在线精品| 国产69精品久久久久999小说| 成人女同在线观看| 天天影视网天天综合色在线播放| 国产一区二区在线视频播放| 亚洲天堂资源| 欧美性生活一区| 婷婷免费在线观看| 国产精品中文| 亚洲丁香久久久| 亚洲熟妇无码av| 日韩1区2区| 欧美精品在线极品| 日韩熟女精品一区二区三区| 久久综合九色| 国产日韩精品在线观看| 99热这里只有精品99| 成人精品视频一区| 开心色怡人综合网站| av中文字幕在线| 亚洲精品国产a久久久久久 | 日本一区二区精品| 无遮挡的视频在线观看| 一区二区三区四区精品在线视频 | 亚洲国内精品在线| 谁有免费的黄色网址| 亚洲精品888| 日av在线播放中文不卡| 91tv国产成人福利| 99热在这里有精品免费| 亚洲午夜精品久久久久久浪潮| av在线下载| 色香色香欲天天天影视综合网| 免费av不卡在线| 欧美日韩破处| 北条麻妃99精品青青久久| 日本一本高清视频| 久久精品国产亚洲高清剧情介绍| 成人自拍视频网站| 超碰国产在线观看| 亚洲成人免费看| 欧美美女性视频| 欧美日韩一区二区三区不卡视频| 丝袜亚洲欧美日韩综合| 日韩 欧美 精品| 国产自产高清不卡| 日本精品一区二区三区高清 久久 日本精品一区二区三区不卡无字幕 | 尤物在线视频| 日韩欧美在线中文字幕| 日本黄色一级网站| 日韩黄色大片网站| 91精品国产自产91精品| 99国产精品一区二区三区 | 一区视频免费观看| 免费在线观看日韩欧美| 精品欧美一区二区精品久久| 亚洲性图自拍| 欧美精品三级日韩久久| 人妻一区二区视频| 国产精品久久久久久久久久妞妞 | 亚州视频一区二区三区| 亚洲欧美成人一区二区三区| 尤蜜粉嫩av国产一区二区三区| 欧美重口另类| 国产综合在线看| 国产肥老妇视频| 亚洲婷婷国产精品电影人久久| 天堂社区在线视频| 蜜臀91精品国产高清在线观看| 久久久久国产精品免费网站| 国产色视频在线| 椎名由奈av一区二区三区| 在线免费观看av的网站| 久久不见久久见免费视频7| 88国产精品欧美一区二区三区| 亚洲爱情岛论坛永久| 一区二区在线观看免费| 性欧美在线视频| 婷婷久久综合| 91在线视频精品| 黄色片免费在线观看| 欧美精品一二三| 国产高清视频免费在线观看| 麻豆精品视频在线观看免费| 香蕉久久免费影视| 国产精品一区二区免费福利视频| 在线日韩av观看| 这里只有精品999| 亚洲国产精品ⅴa在线观看| 久久99爱视频| 中文字幕日韩欧美精品高清在线| 成人有码在线视频| 日本天码aⅴ片在线电影网站| 日韩一区二区三区视频| 久久免费公开视频| 成年人国产精品| 一本大道熟女人妻中文字幕在线| 亚洲国产国产| 国产精品视频免费观看www| 婷婷视频在线| 日韩三级.com| 日韩和一区二区| 久久久久久久久岛国免费| 性生交免费视频| 天天av综合| 国产精品一区二区a| 性欧美18xxxhd| 这里只有精品在线观看| 国产精品女人久久久| 亚洲国产成人porn| 中文幕无线码中文字蜜桃| 美日韩一区二区| 日韩在线观看a| 国产精品欧美在线观看| 国产在线98福利播放视频| 好久没做在线观看| 亚洲欧洲在线播放| 国产熟女精品视频| 欧美日韩精品在线观看| 在线观看免费黄色网址| 国产精品一二三区在线| 一本大道熟女人妻中文字幕在线| 99久久亚洲精品蜜臀| 国产精选在线观看91| 色婷婷综合久久久中字幕精品久久| www.欧美免费| 毛片在线播放网站| 日韩欧美一区二区三区在线| 成人免费毛片男人用品| 亚洲另类色综合网站| 国产精品1000部啪视频| 精品亚洲国内自在自线福利| 久久综合九色综合88i| 欧美gay男男猛男无套| 国产精品国产精品国产专区蜜臀ah | 中文字幕第88页| 99国产成+人+综合+亚洲欧美| 亚洲一区在线免费| 美女久久久久| 国产精品二区三区四区| 日韩福利影视| 奇门遁甲1982国语版免费观看高清| 黄色网在线免费观看| 亚洲日韩中文字幕| 国产自产一区二区| 欧美精选一区二区| 丁香社区五月天| 亚洲成av人在线观看| 日本一级片免费| 国产欧美精品日韩区二区麻豆天美| 一区二区三区四区影院| 极品销魂美女一区二区三区| 日韩av播放器| 国产日韩欧美一区二区三区在线观看| 蜜臀av.com| 日韩在线欧美| 日韩精品欧美在线| 日韩最新在线| 国产精品手机在线| 国产一区二区三区亚洲综合| 国产精品视频区| 成人va天堂| 日韩av色综合| 成人爱爱网址| 98精品在线视频| www成人免费观看| 久久99久久亚洲国产| 快射av在线播放一区| 中文字幕亚洲欧美日韩在线不卡| 深夜福利视频在线免费观看| 亚洲成avwww人| 性生活视频软件| 欧美一卡二卡在线| 国产同性人妖ts口直男| 91精品国产全国免费观看| 97成人在线观看| 欧美日韩1234| 国产色片在线观看| 日韩欧美综合一区| 国产91免费在线观看| 精品久久久久久无| 欧美一级视频免费| 亚洲国产欧美一区二区三区同亚洲| 精品久久久中文字幕人妻| 欧美va亚洲va在线观看蝴蝶网| 超碰在线观看av| 精品国产区一区| 可以免费看毛片的网站| 精品国产乱码久久久久久夜甘婷婷 | 七七婷婷婷婷精品国产| 奇米影视四色在线| 精品一区二区久久久| 国产三级精品三级在线| 国产美女娇喘av呻吟久久| 在线成人免费av| 成人高清在线视频| 中文字幕一区二区三区人妻| 久久精品在这里| 国产视频精品免费| 亚洲精品伦理在线| 日韩精品一卡二卡| 丁香五六月婷婷久久激情| 天堂网免费视频| 欧美男男青年gay1069videost| av网站在线免费看| 亚洲国产日韩欧美综合久久| 久久久资源网| 久久精品视频中文字幕| 国产第一页在线视频| 全亚洲最色的网站在线观看| av免费在线一区| 9a蜜桃久久久久久免费| 久久亚州av| 亚洲精品二区| 欧美私人啪啪vps| 一本色道无码道dvd在线观看| 激情深爱一区二区| 亚洲天堂美女视频| 国产精品福利电影一区二区三区四区| 欧美成人精品欧美一级| 色综合久久综合中文综合网| 国产精品久久久久久久免费看| 精品电影一区二区| avav免费在线观看| 国内精品一区二区三区| 78精品国产综合久久香蕉| 97自拍视频| 狠狠操综合网| 欧美中文字幕在线观看视频| 美女精品在线观看| 深夜视频在线观看| 欧美经典一区二区三区| 日本中文字幕免费观看| 欧美色精品天天在线观看视频| 亚洲精品综合网| 色偷偷偷综合中文字幕;dd| 草草视频在线观看| 成人在线激情视频| 精品国产一区二区三区av片| 国产a级片网站| 麻豆成人综合网| 亚洲av综合一区二区| 亚洲国产视频网站| 91精东传媒理伦片在线观看| 国产视频精品va久久久久久| 午夜成年人在线免费视频| 国产精品中文字幕久久久| 欧美丝袜美腿| 妞干网在线播放| 久久电影国产免费久久电影| 90岁老太婆乱淫| 天天色图综合网| 乱色精品无码一区二区国产盗| 中文字幕欧美日韩va免费视频| 五月天av在线| 精品国产乱码久久久久久丨区2区| 欧美永久精品| 99热一区二区| 国产精品天美传媒| 日批视频免费在线观看| 日韩电视剧免费观看网站| 欧美人动性xxxxz0oz| 亚洲综合日韩在线| 天天射综合网视频| 国产一区二区在线免费播放| 久久影院午夜片一区| 亚洲免费激情视频| 亚洲第一福利视频| zzzwww在线看片免费| 成人综合色站| 在线不卡欧美| 国产人妻黑人一区二区三区| 一区二区欧美国产| 超碰在线人人干| 欧美日韩爱爱视频| 在这里有精品| 全黄性性激高免费视频| 成人va在线观看| 亚洲一区欧美在线| 亚洲精品国产精品国自产观看浪潮| 操喷在线视频| 激情小说网站亚洲综合网| 亚洲视频成人| 成人免费av片| 欧美性xxxxx极品| 理论视频在线| 国产一区在线播放| 综合激情婷婷| 久久精品女同亚洲女同13| 精品久久久一区二区| 水中色av综合| 国产精品小说在线| 久久人人99| 男插女视频网站| 婷婷久久综合九色综合绿巨人| 日本大臀精品| 国产精品亚洲一区二区三区| 久久久五月天| 一区二区免费在线观看视频| 欧美性精品220| 3p视频在线观看| 亚洲精品欧美一区二区三区| 影音先锋亚洲电影| 国产精品密蕾丝袜| 4438x成人网最大色成网站| 一二三四区在线观看| 久久99精品国产一区二区三区| 麻豆9191精品国产| 韩国一级黄色录像| 亚洲国产成人精品电影| 精品国模一区二区三区| 在线视频精品一区| 成人黄色网址在线观看| 天码人妻一区二区三区在线看| 日韩中文字幕欧美| 岛国av一区| www日韩在线观看| 一区二区三区四区在线播放| 日韩黄色影片| 91系列在线播放| 亚洲欧美bt| 欧美高清视频一区二区三区| 亚洲开心激情网| 九九九九九九精品任你躁| 亚洲熟妇国产熟妇肥婆| 国产精品久久久久一区二区三区共| 亚洲精品国产一区二| 国产精品成人播放| 亚洲一级网站| 国产精品69久久久久孕妇欧美| 欧美精品一区二区在线观看| 成人四虎影院| 国产女大学生av| 亚洲夂夂婷婷色拍ww47| yw193.com尤物在线| 国产一区二区在线观看免费播放| 老司机精品视频一区二区三区|