精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Apache Flink 漫談系列(11) - Temporal Table JOIN

開發 開發工具
在《Apache Flink 漫談系列 - JOIN LATERAL》中提到了Temporal Table JOIN,本篇就向大家詳細介紹什么是Temporal Table JOIN。

一、什么是Temporal Table

在《Apache Flink 漫談系列 - JOIN LATERAL》中提到了Temporal Table JOIN,本篇就向大家詳細介紹什么是Temporal Table JOIN。

ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的數據庫廠商也先后實現了這個標準。Temporal Table記錄了歷史上任何時間點所有的數據改動,Temporal Table的工作流程如下:

emporal Table

上圖示意Temporal Table具有普通table的特性,有具體獨特的DDL/DML/QUERY語法,時間是其核心屬性。歷史意味著時間,意味著快照Snapshot。

二、ANSI-SQL 2011 Temporal Table示例

我們以一個DDL和一套DML示例說明Temporal Table的原理,DDL定義PK是可選的,下面的示例我們以不定義PK的為例進行說明:

1. DDL 示例

  1. CREATE TABLE Emp 
  2. ENo INTEGER, 
  3. Sys_Start TIMESTAMP(12) GENERATED 
  4. ALWAYS AS ROW Start, 
  5. Sys_end TIMESTAMP(12) GENERATED 
  6. ALWAYS AS ROW END, 
  7. EName VARCHAR(30), 
  8. PERIOD FOR SYSTEM_TIME (Sys_Start,Sys_end) 
  9. ) WITH SYSTEM VERSIONING 

2. DML 示例

(1) INSERT

  1. INSERT INTO Emp (ENo, EName) VALUES (22217, 'Joe') 

說明: 其中Sys_Start和Sys_End是數據庫系統默認填充的。

(2) UPDATE

  1. UPDATE Emp SET EName = 'Tom' WHERE ENo = 22217 

說明: 假設是在 2012-02-03 10:00:00 執行的UPDATE,執行之后上一個值"Joe"的Sys_End值由9999-12-31 23:59:59 變成了 2012-02-03 10:00:00, 也就是下一個值"Tom"生效的開始時間。可見我們執行的是UPDATE但是數據庫里面會存在兩條數據,數據值和有效期不同,也就是版本不同。

(3) DELETE (假設執行DELETE之前的表內容如下)

  1. DELETE FROM Emp WHERE ENo = 22217 

說明: 假設我們是在2012-06-01 00:00:00執行的DELETE,則Sys_End值由9999-12-31 23:59:59 變成了 2012-06-01 00:00:00, 也就是在執行DELETE時候沒有真正的刪除符合條件的行,而是系統將符合條件的行的Sys_end修改為執行DELETE的操作時間。標識數據的有效期到DELETE執行那一刻為止。

(4) SELECT

  1. SELECT ENo,EName,Sys_Start,Sys_End FROM Emp 
  2. FOR SYSTEM_TIME AS OF TIMESTAMP '2011-01-02 00:00:00' 

說明: 這個查詢會返回所有Sys_Start <= 2011-01-02 00:00:00 并且 Sys_end > 2011-01-02 00:00:00 的記錄。

三、SQLServer Temporal Table 示例

1. DDL

  1. CREATE TABLE Department 
  2. DeptID int NOT NULL PRIMARY KEY CLUSTERED 
  3. , DeptName varchar(50) NOT NULL 
  4. , ManagerID INT NULL 
  5. , ParentDeptID int NULL 
  6. , SysStartTime datetime2 GENERATED ALWAYS AS ROW Start NOT NULL 
  7. , SysEndTime datetime2 GENERATED ALWAYS AS ROW END NOT NULL 
  8. , PERIOD FOR SYSTEM_TIME (SysStartTime,SysEndTime) 
  9. WITH (SYSTEM_VERSIONING = ON); 

執行上面的語句,在數據庫會創建當前表和歷史表,如下圖:

Department 顯示是有版本控制的,歷史表是默認的名字,我也可以指定名字如:SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.DepartmentHistory)。

2. DML

(1) INSERT - 插入列不包含SysStartTime和SysEndTime列

  1. INSERT INTO [dbo].[Department] ([DeptID] ,[DeptName] ,[ManagerID] ,[ParentDeptID]) 
  2. VALUES(10, 'Marketing', 101, 1); 

VALUES(10, 'Marketing', 101, 1);

執行之后我們分別查詢當前表和歷史表,如下圖:

我們***條INSERT語句數據值的有效時間是操作那一刻2018-06-06 05:50:20.7913985 到永遠 9999-12-31 23:59:59.9999999,但這時刻歷史表還沒有任何信息。我們接下來進行更新操作。

(2) UPDATE

  1. UPDATE [dbo].[Department] SET [ManagerID] = 501 WHERE [DeptID] = 10 

執行之后當前表信息會更新并在歷史表里面產生一條歷史信息,如下:

 

注意當前表的SysStartTime意見發生了變化,歷史表產生了一條記錄,SyStartTIme是原當前表記錄的SysStartTime,SysEndTime是當前表記錄的SystemStartTime。我們再更新一次:

  1. UPDATE [dbo].[Department] SET [ManagerID] = 201 WHERE [DeptID] = 10 

到這里我們了解到SQLServer里面關于Temporal Table的邏輯是有當前表和歷史表來存儲數據,并且數據庫內部以StartTime和EndTime的方式管理數據的版本。

(3) SELECT

  1. SELECT [DeptID], [DeptName], [SysStartTime],[SysEndTime] 
  2. FROM [dbo].[Department] 
  3. FOR SYSTEM_TIME AS OF '2018-06-06 05:50:21.0000000' ; 

SELECT語句查詢的是Department的表,實際返回的數據是從歷史表里面查詢出來的,查詢的底層邏輯就是 SysStartTime <= '2018-06-06 05:50:21.0000000' and SysEndTime > '2018-06-06 05:50:21.0000000' 。

四、Apache Flink Temporal Table

我們不止一次的提到Apache Flink遵循ANSI-SQL標準,Apache Flink中Temporal Table的概念也源于ANSI-2011的標準語義,但目前的實現在語法層面和ANSI-SQL略有差別,上面看到ANSI-2011中使用FOR SYSTEM_TIME AS OF的語法,目前Apache Flink中使用 LATERAL TABLE(TemporalTableFunction)的語法。這一點后續需要推動社區進行改進。

1. 為啥需要 Temporal Table

我們以具體的查詢示例來說明為啥需要Temporal Table,假設我們有一張實時變化的匯率表(RatesHistory),如下:

RatesHistory代表了Yen匯率(Yen匯率為1),是不斷變化的Append only的匯率表。例如,Euro兌Yen匯率從09:00至10:45的匯率為114。從10點45分到11點15分是116。

假設我們想在10:58輸出所有當前匯率,我們需要以下SQL查詢來計算結果表:

  1. SELECT * 
  2. FROM RatesHistory AS r 
  3. WHERE r.rowtime = ( 
  4. SELECT MAX(rowtime) 
  5. FROM RatesHistory AS r2 
  6. WHERE rr2.currency = r.currency 
  7. AND r2.rowtime <= '10:58'); 

相應Flink代碼如下:

  • 定義數據源-genRatesHistorySource
    1. def genRatesHistorySource: CsvTableSource = { 
    2.  
    3. val csvRecords = Seq
    4. "rowtime ,currency ,rate", 
    5. "09:00:00 ,US Dollar , 102", 
    6. "09:00:00 ,Euro , 114", 
    7. "09:00:00 ,Yen , 1", 
    8. "10:45:00 ,Euro , 116", 
    9. "11:15:00 ,Euro , 119", 
    10. "11:49:00 ,Pounds , 108" 
    11. // 測試數據寫入臨時文件 
    12. val tempFilePath = 
    13. writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp") 
    14.  
    15. // 創建Source connector 
    16. new CsvTableSource( 
    17. tempFilePath, 
    18. Array("rowtime","currency","rate"), 
    19. Array( 
    20. Types.STRING,Types.STRING,Types.STRING 
    21. ), 
    22. fieldDelim = ","
    23. rowDelim = "$"
    24. ignoreFirstLine = true
    25. ignoreComments = "%" 
    26. def writeToTempFile( 
    27. contents: String, 
    28. filePrefix: String, 
    29. fileSuffix: String, 
    30. charset: String = "UTF-8"): String = { 
    31. val tempFile = File.createTempFile(filePrefix, fileSuffix) 
    32. val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), charset) 
    33. tmpWriter.write(contents) 
    34. tmpWriter.close() 
    35. tempFile.getAbsolutePath} 
  • 主程序代碼
    1. def main(args: Array[String]): Unit = { 
    2. // Streaming 環境 
    3. val env = StreamExecutionEnvironment.getExecutionEnvironment 
    4. val tEnv = TableEnvironment.getTableEnvironment(env) 
    5.  
    6. //方便我們查出輸出數據 
    7. env.setParallelism(1) 
    8.  
    9. val sourceTableName = "RatesHistory" 
    10. // 創建CSV source數據結構 
    11. val tableSource = CsvTableSourceUtils.genRatesHistorySource 
    12. // 注冊source 
    13. tEnv.registerTableSource(sourceTableName, tableSource) 
    14.  
    15. // 注冊retract sink 
    16. val sinkTableName = "retractSink" 
    17. val fieldNames = Array("rowtime", "currency", "rate") 
    18. val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.STRING, Types.STRING) 
    19.  
    20. tEnv.registerTableSink( 
    21. sinkTableName, 
    22. fieldNames, 
    23. fieldTypes, 
    24. new MemoryRetractSink) 
    25.  
    26. val SQL = 
    27. ""
    28. |SELECT * 
    29. |FROM RatesHistory AS r 
    30. |WHERE r.rowtime = ( 
    31. | SELECT MAX(rowtime) 
    32. | FROM RatesHistory AS r2 
    33. | WHERE rr2.currency = r.currency 
    34. | AND r2.rowtime <= '10:58:00' ) 
    35. """.stripMargin 
    36.  
    37. // 執行查詢 
    38. val result = tEnv.SQLQuery(SQL) 
    39.  
    40. // 將結果插入sink 
    41. result.insertInto(sinkTableName) 
    42. env.execute() 
  • 執行結果如下圖:

結果表格化一下:

Temporal Table的概念旨在簡化此類查詢,加速它們的執行。Temporal Table是Append Only表上的參數化視圖,它把Append Only的表變化解釋為表的Changelog,并在特定時間點提供該表的版本(時間版本)。將Applend Only表解釋為changelog需要指定主鍵屬性和時間戳屬性。主鍵確定覆蓋哪些行,時間戳確定行有效的時間,也就是數據版本,與上面SQL Server示例的有效期的概念一致。

在上面的示例中,currency是RatesHistory表的主鍵,而rowtime是timestamp屬性。

2. 如何定義Temporal Table

在Apache Flink中擴展了TableFunction的接口,在TableFunction接口的基礎上添加了時間屬性和pk屬性。

(1) 內部TemporalTableFunction定義如下:

  1. class TemporalTableFunction private( 
  2. @transient private val underlyingHistoryTable: Table, 
  3. // 時間屬性,相當于版本信息 
  4. private val timeAttribute: Expression, 
  5. // 主鍵定義 
  6. private val primaryKey: String, 
  7. private val resultType: RowTypeInfo) 
  8. extends TableFunction[Row] { 
  9. ...} 

(2) 用戶創建TemporalTableFunction方式

在Table中添加了createTemporalTableFunction方法,該方法需要傳入時間屬性和主鍵,接口定義如下:

  1. // Creates TemporalTableFunction backed up by this table as a history table. 
  2.  
  3. def createTemporalTableFunction( 
  4. timeAttribute: Expression, 
  5. primaryKey: Expression): TemporalTableFunction = { 
  6. ...} 

用戶通過如下方式調用就可以得到一個TemporalTableFunction的實例,代碼如下:

  1. val tab = ... 
  2. val temporalTableFunction = tab.createTemporalTableFunction('time, 'pk) 
  3. ... 

3. 案例代碼

(1) 需求描述

假設我們有一張訂單表Orders和一張匯率表Rates,那么訂單來自于不同的地區,所以支付的幣種各不一樣,那么假設需要統計每個訂單在下單時候Yen幣種對應的金額。

(2) Orders 數據

(3) Rates 數據

(4) 統計需求對應的SQL

  1. SELECT o.currency, o.amount, r.rate 
  2. o.amount * r.rate AS yen_amount 
  3. FROM 
  4. Orders AS o, 
  5. LATERAL TABLE (Rates(o.rowtime)) AS r 
  6. WHERE r.currency = o.currency 

(5) 預期結果

4. Without connnector 實現代碼

  1. object TemporalTableJoinTest { 
  2. def main(args: Array[String]): Unit = { 
  3. val env = StreamExecutionEnvironment.getExecutionEnvironment 
  4. val tEnv = TableEnvironment.getTableEnvironment(env) 
  5. env.setParallelism(1) 
  6. // 設置時間類型是 event-time env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
  7. // 構造訂單數據 
  8. val ordersData = new mutable.MutableList[(Long, String, Timestamp)] 
  9. ordersData.+=((2L, "Euro", new Timestamp(2L))) 
  10. ordersData.+=((1L, "US Dollar", new Timestamp(3L))) 
  11. ordersData.+=((50L, "Yen", new Timestamp(4L))) 
  12. ordersData.+=((3L, "Euro", new Timestamp(5L))) 
  13.  
  14. //構造匯率數據 
  15. val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)] 
  16. ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L))) 
  17. ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L))) 
  18. ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L))) 
  19. ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L))) 
  20. ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L))) 
  21.  
  22. // 進行訂單表 event-time 的提取 
  23. val orders = env 
  24. .fromCollection(ordersData) 
  25. .assignTimestampsAndWatermarks(new OrderTimestampExtractor[Long, String]()) 
  26. .toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime) 
  27.  
  28. // 進行匯率表 event-time 的提取 
  29. val ratesHistory = env 
  30. .fromCollection(ratesHistoryData) 
  31. .assignTimestampsAndWatermarks(new OrderTimestampExtractor[String, Long]()) 
  32. .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime) 
  33.  
  34. // 注冊訂單表和匯率表 
  35. tEnv.registerTable("Orders", orders) 
  36. tEnv.registerTable("RatesHistory", ratesHistory) 
  37. val tab = tEnv.scan("RatesHistory"); 
  38. // 創建TemporalTableFunction 
  39. val temporalTableFunction = tab.createTemporalTableFunction('rowtime, 'currency) 
  40. //注冊TemporalTableFunction 
  41. tEnv.registerFunction("Rates",temporalTableFunction) 
  42.  
  43. val SQLQuery = 
  44. ""
  45. |SELECT o.currency, o.amount, r.rate, 
  46. | o.amount * r.rate AS yen_amount 
  47. |FROM 
  48. | Orders AS o, 
  49. | LATERAL TABLE (Rates(o.rowtime)) AS r 
  50. |WHERE r.currency = o.currency 
  51. |""".stripMargin 
  52.  
  53. tEnv.registerTable("TemporalJoinResult", tEnv.SQLQuery(SQLQuery)) 
  54.  
  55. val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row] 
  56. // 打印查詢結果 
  57. result.print() 
  58. env.execute() 

在運行上面代碼之前需要注意上面代碼中對EventTime時間提取的過程,也就是說Apache Flink的TimeCharacteristic.EventTime 模式,需要調用assignTimestampsAndWatermarks方法設置EventTime的生成方式,這種方式也非常靈活,用戶可以控制業務數據的EventTime的值和WaterMark的產生,WaterMark相關內容可以查閱《Apache Flink 漫談系列(03) - Watermark》。 在本示例中提取EventTime的完整代碼如下:

  1. import java.SQL.Timestamp 
  2.  
  3. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor 
  4. import org.apache.flink.streaming.api.windowing.time.Time 
  5.  
  6. class OrderTimestampExtractor[T1, T2] 
  7. extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) { 
  8. override def extractTimestamp(element: (T1, T2, Timestamp)): Long = { 
  9. element._3.getTime 

查看運行結果:

5. With CSVConnector 實現代碼

在實際的生產開發中,都需要實際的Connector的定義,下面我們以CSV格式的Connector定義來開發Temporal Table JOIN Demo。

(1) genEventRatesHistorySource

  1. def genEventRatesHistorySource: CsvTableSource = { 
  2.  
  3. val csvRecords = Seq
  4. "ts#currency#rate", 
  5. "1#US Dollar#102", 
  6. "1#Euro#114", 
  7. "1#Yen#1", 
  8. "3#Euro#116", 
  9. "5#Euro#119", 
  10. "7#Pounds#108" 
  11. // 測試數據寫入臨時文件 
  12. val tempFilePath = 
  13. FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp") 
  14.  
  15. // 創建Source connector 
  16. new CsvTableSource( 
  17. tempFilePath, 
  18. Array("ts","currency","rate"), 
  19. Array( 
  20. Types.LONG,Types.STRING,Types.LONG 
  21. ), 
  22. fieldDelim = "#"
  23. rowDelim = CommonUtils.line, 
  24. ignoreFirstLine = true
  25. ignoreComments = "%" 
  26. )} 

(2) genRatesOrderSource

  1. def genRatesOrderSource: CsvTableSource = { 
  2.  
  3. val csvRecords = Seq
  4. "ts#currency#amount", 
  5. "2#Euro#10", 
  6. "4#Euro#10" 
  7. // 測試數據寫入臨時文件 
  8. val tempFilePath = 
  9. FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp") 
  10.  
  11. // 創建Source connector 
  12. new CsvTableSource( 
  13. tempFilePath, 
  14. Array("ts","currency", "amount"), 
  15. Array( 
  16. Types.LONG,Types.STRING,Types.LONG 
  17. ), 
  18. fieldDelim = "#"
  19. rowDelim = CommonUtils.line, 
  20. ignoreFirstLine = true
  21. ignoreComments = "%" 

(3) 主程序代碼

  1. /* 
  2.  * Licensed to the Apache Software Foundation (ASF) under one 
  3.  * or more contributor license agreements.  See the NOTICE file 
  4.  * distributed with this work for additional information 
  5.  * regarding copyright ownership.  The ASF licenses this file 
  6.  * to you under the Apache License, Version 2.0 (the 
  7.  * "License"); you may not use this file except in compliance 
  8.  * with the License.  You may obtain a copy of the License at 
  9.  * 
  10.  *     http://www.apache.org/licenses/LICENSE-2.0 
  11.  * 
  12.  * Unless required by applicable law or agreed to in writing, software 
  13.  * distributed under the License is distributed on an "AS IS" BASIS, 
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
  15.  * See the License for the specific language governing permissions and 
  16.  * limitations under the License. 
  17.  */ 
  18.  
  19. package org.apache.flink.book.connectors 
  20.  
  21. import java.io.File 
  22.  
  23. import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} 
  24. import org.apache.flink.book.utils.{CommonUtils, FileUtils} 
  25. import org.apache.flink.table.sinks.{CsvTableSink, TableSink} 
  26. import org.apache.flink.table.sources.CsvTableSource 
  27. import org.apache.flink.types.Row 
  28.  
  29. object CsvTableSourceUtils { 
  30.  
  31.   def genWordCountSource: CsvTableSource = { 
  32.     val csvRecords = Seq
  33.       "words", 
  34.       "Hello Flink", 
  35.       "Hi, Apache Flink", 
  36.       "Apache FlinkBook" 
  37.     ) 
  38.     // 測試數據寫入臨時文件 
  39.     val tempFilePath = 
  40.       FileUtils.writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp") 
  41.  
  42.     // 創建Source connector 
  43.     new CsvTableSource( 
  44.       tempFilePath, 
  45.       Array("words"), 
  46.       Array( 
  47.         Types.STRING 
  48.       ), 
  49.       fieldDelim = "#"
  50.       rowDelim = "$"
  51.       ignoreFirstLine = true
  52.       ignoreComments = "%" 
  53.     ) 
  54.   } 
  55.  
  56.  
  57.   def genRatesHistorySource: CsvTableSource = { 
  58.  
  59.     val csvRecords = Seq
  60.       "rowtime ,currency   ,rate", 
  61.     "09:00:00   ,US Dollar  , 102", 
  62.     "09:00:00   ,Euro       , 114", 
  63.     "09:00:00  ,Yen        ,   1", 
  64.     "10:45:00   ,Euro       , 116", 
  65.     "11:15:00   ,Euro       , 119", 
  66.     "11:49:00   ,Pounds     , 108" 
  67.     ) 
  68.     // 測試數據寫入臨時文件 
  69.     val tempFilePath = 
  70.       FileUtils.writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp") 
  71.  
  72.     // 創建Source connector 
  73.     new CsvTableSource( 
  74.       tempFilePath, 
  75.       Array("rowtime","currency","rate"), 
  76.       Array( 
  77.         Types.STRING,Types.STRING,Types.STRING 
  78.       ), 
  79.       fieldDelim = ","
  80.       rowDelim = "$"
  81.       ignoreFirstLine = true
  82.       ignoreComments = "%" 
  83.     ) 
  84.   } 
  85.  
  86.   def genEventRatesHistorySource: CsvTableSource = { 
  87.  
  88.     val csvRecords = Seq
  89.       "ts#currency#rate", 
  90.       "1#US Dollar#102", 
  91.       "1#Euro#114", 
  92.       "1#Yen#1", 
  93.       "3#Euro#116", 
  94.       "5#Euro#119", 
  95.       "7#Pounds#108" 
  96.     ) 
  97.     // 測試數據寫入臨時文件 
  98.     val tempFilePath = 
  99.       FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp") 
  100.  
  101.     // 創建Source connector 
  102.     new CsvTableSource( 
  103.       tempFilePath, 
  104.       Array("ts","currency","rate"), 
  105.       Array( 
  106.         Types.LONG,Types.STRING,Types.LONG 
  107.       ), 
  108.       fieldDelim = "#"
  109.       rowDelim = CommonUtils.line, 
  110.       ignoreFirstLine = true
  111.       ignoreComments = "%" 
  112.     ) 
  113.   } 
  114.  
  115.   def genRatesOrderSource: CsvTableSource = { 
  116.  
  117.     val csvRecords = Seq
  118.       "ts#currency#amount", 
  119.       "2#Euro#10", 
  120.       "4#Euro#10" 
  121.     ) 
  122.     // 測試數據寫入臨時文件 
  123.     val tempFilePath = 
  124.       FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp") 
  125.  
  126.     // 創建Source connector 
  127.     new CsvTableSource( 
  128.       tempFilePath, 
  129.       Array("ts","currency", "amount"), 
  130.       Array( 
  131.         Types.LONG,Types.STRING,Types.LONG 
  132.       ), 
  133.       fieldDelim = "#"
  134.       rowDelim = CommonUtils.line, 
  135.       ignoreFirstLine = true
  136.       ignoreComments = "%" 
  137.     ) 
  138.   } 
  139.  
  140.  
  141.   /** 
  142.     * Example: 
  143.     * genCsvSink( 
  144.     *   Array[String]("word", "count"), 
  145.     *   Array[TypeInformation[_] ](Types.STRING, Types.LONG)) 
  146.     */ 
  147.   def genCsvSink(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = { 
  148.     val tempFile = File.createTempFile("csv_sink_", "tem") 
  149.     if (tempFile.exists()) { 
  150.       tempFile.delete() 
  151.     } 
  152.     new CsvTableSink(tempFile.getAbsolutePath).configure(fieldNames, fieldTypes) 
  153.   } 
  154.  

運行結果如下 :

6. 內部實現原理

我們還是以訂單和匯率關系示例來說明Apache Flink內部實現Temporal Table JOIN的原理,如下圖所示:

五、Temporal Table JOIN vs 雙流JOIN vs Lateral JOIN

在《Apache Flink 漫談系列(09) - JOIN算子》中我們介紹了雙流JOIN,在《Apache Flink 漫談系列(10) - JOIN LATERAL 》中我們介紹了 JOIN LATERAL(TableFunction),那么本篇介紹的Temporal Table JOIN和雙流JOIN/JOIN LATERAL(TableFunction)有什么本質區別呢?

  • 雙流JOIN - 雙流JOIN本質很明確是 Stream JOIN Stream,雙流驅動。
  • LATERAL JOIN - Lateral JOIN的本質是Steam JOIN Table Function, 是單流驅動。
  • Temporal Table JOIN - Temporal Table JOIN 的本質就是 Stream JOIN Temporal Table 或者 Stream JOIN Table with snapshot。Temporal Table JOIN 特點單流驅動,Temporal Table 是被動查詢。

1. Temporal Table JOIN vs LATERAL JOIN

從功能上說Temporal Table JOIN和 LATERAL JOIN都是由左流一條數據獲取多行數據,也就是單流驅動,并且都是被動查詢,那么Temporal JOIN和LATERAL JOIN最本質的區別是什么呢?這里我們說最關鍵的一點是 State 的管理,LATERAL JOIN是一個TableFunction,不具備state的管理能力,數據不具備版本特性。而Temporal Table JOIN是一個具備版本信息的數據表。

2. Temporal Table JOIN vs 雙流 JOIN

Temporal Table JOIN 和 雙流 JOIN都可以管理State,那么他們的本質區別是什么? 那就是計算驅動的差別,Temporal Table JOIN是單邊驅動,Temporal Table是被動的查詢,而雙流JOIN是雙邊驅動,兩邊都是主動的進行JOIN計算。

3. Temporal Table JOIN改進

個人認為Apache Flink的Temporal Table JOIN功能不論在語法和語義上面都要遵循ANSI-SQL標準,后期會推動社區在Temporal Table上面支持ANSI-SQL的FOR SYSTEM_TIME AS OF標準語法。改進后的處理邏輯示意圖:

其中cache是一種性能考慮的優化,詳細內容待社區完善后再細述。

六、小結

本篇結合ANSI-SQL標準和SQL Server對Temporal Table的支持來開篇,然后介紹目前Apache Flink對Temporal Table的支持現狀,以代碼示例和內部處理邏輯示意圖的方式讓大家直觀體驗Temporal Table JOIN的語法和語義。

關于點贊和評論

本系列文章難免有很多缺陷和不足,真誠希望讀者對有收獲的篇章給予點贊鼓勵,對有不足的篇章給予反饋和建議,先行感謝大家!

作者:孫金城,花名 金竹,目前就職于阿里巴巴,自2015年以來一直投入于基于Apache Flink的阿里巴巴計算平臺Blink的設計研發工作。

【本文為51CTO專欄作者“金竹”原創稿件,轉載請聯系原作者】

戳這里,看該作者更多好文

責任編輯:趙寧寧 來源: 51CTO專欄
相關推薦

2022-07-13 12:53:59

數據存儲

2018-11-20 07:59:43

Apache Flin JOIN算子代碼

2018-11-29 09:01:26

Apache FlinJOIN代碼

2019-01-03 10:17:53

Apache FlinTable API代碼

2022-06-10 17:26:07

數據集計算

2018-12-29 08:16:32

Apache FlinJOIN代碼

2018-10-09 10:55:52

Apache FlinWatermark流計算

2018-09-26 08:44:22

Apache Flin流計算計算模式

2018-10-16 08:54:35

Apache Flin流計算State

2018-09-26 07:50:52

Apache Flin流計算計算模式

2018-11-14 09:01:23

Apache FlinSQL代碼

2018-10-22 21:43:39

Apache Flin流計算Fault Toler

2022-07-13 13:03:29

流計算亂序

2022-07-12 10:38:25

分布式框架

2018-11-07 08:48:31

Apache Flin持續查詢流計算

2019-01-15 08:50:12

Apache FlinKafka分布式

2018-10-30 14:08:45

Apache Flin流表對偶duality

2020-04-09 11:08:30

PyFlinkJAR依賴

2022-06-20 05:52:27

FlinkTTL流查詢

2018-10-30 11:10:05

Flink數據集計算
點贊
收藏

51CTO技術棧公眾號

久久精品美女| 日韩在线免费高清视频| 中文字幕无码精品亚洲35| 亚洲三级黄色片| 日韩一级不卡| 一本大道久久加勒比香蕉| 天天爽夜夜爽视频| 水蜜桃在线视频| 国产精品久久久久久久久免费桃花| 国产免费成人av| 久久久久亚洲AV| 精品国产99| 亚洲大胆美女视频| 激情六月丁香婷婷| 日韩激情av| 99re6这里只有精品视频在线观看 99re8在线精品视频免费播放 | 国产九一精品| 精品国产麻豆免费人成网站| 亚洲视频在线观看一区二区三区| 人人超在线公开视频| 国产欧美日韩在线| 国产在线一区二区三区播放| 97超碰人人草| 日本欧美一区二区三区| 国内精品小视频在线观看| 亚洲色图欧美色| 色综合久久中文| 欧美变态tickle挠乳网站| 中文字幕天天干| 蜜桃视频在线观看免费视频| 亚洲女同ⅹxx女同tv| 少妇特黄a一区二区三区| 精品国产无码AV| 久久99精品国产麻豆不卡| 国产91色在线|| 羞羞影院体验区| 伊人精品成人久久综合软件| 美女视频久久黄| av黄色免费在线观看| 国产毛片一区二区三区| 亚洲成人激情在线观看| 无码人妻一区二区三区在线视频| 日韩电影免费观看高清完整版在线观看 | 日韩久久久久久久| 国产精品影院在线| 久久久久久免费| 欧美午夜视频在线| 青青草在线视频免费观看| 成人激情午夜影院| 国产精品加勒比| 亚洲AV无码精品自拍| 国产成人综合精品三级| 91沈先生播放一区二区| av免费观看在线| 国产精品69毛片高清亚洲| 亚洲直播在线一区| 国产丰满美女做爰| 国产成人免费视| 官网99热精品| 欧美特级特黄aaaaaa在线看| 成人小视频在线| 极品校花啪啪激情久久| 香蕉久久一区二区三区| 91麻豆国产自产在线观看| 蜜桃视频在线观看91| 国产视频三级在线观看播放| 国产欧美日韩不卡| 一区二区免费在线观看| 国产精品剧情| 亚洲国产一二三| 欧美网站免费观看| 国产精品扒开腿做爽爽爽视频软件| 91福利视频在线| 91av视频免费观看| 一区二区三区在线资源| 亚洲韩国青草视频| 久久美女免费视频| 2023国产精品久久久精品双| 欧美精品激情在线观看| 亚洲精品男人的天堂| 日本美女一区二区三区视频| 91社区国产高清| 欧洲av在线播放| 日本一区二区三区免费乱视频| 一区二区免费在线观看| 国产精品一区hongkong| 色999日韩国产欧美一区二区| 日韩成人精品视频在线观看| 成人av动漫| 亚洲毛片在线观看.| 激情高潮到大叫狂喷水| 狠狠爱综合网| 国产精品一区二区3区| www.黄色av| 国产三区在线成人av| 公共露出暴露狂另类av| 天堂中文在线播放| 91精品国产综合久久精品麻豆| 国产女人18毛片水真多18| 第四色成人网| 97视频在线观看播放| 在线观看视频二区| av亚洲精华国产精华精华| 婷婷亚洲婷婷综合色香五月| 国产极品人妖在线观看| 欧美日韩视频在线一区二区| xfplay5566色资源网站| 欧美黄色大片在线观看| 欧美亚洲午夜视频在线观看| 一级做a爱片性色毛片| 99久久精品免费看国产 | 亚洲性图久久| 国产噜噜噜噜久久久久久久久| 欧美一级片免费| 最近日本中文字幕| 午夜日韩福利| 成人av在线网址| 精品久久av| 午夜a成v人精品| 日本55丰满熟妇厨房伦| 久久不卡国产精品一区二区| 久久久爽爽爽美女图片| 国产精品爽爽久久| 国产视频一区二区在线| 免费成人在线视频网站| 超碰精品在线观看| 欧美刺激性大交免费视频| 又色又爽又黄无遮挡的免费视频| 91网站在线播放| 欧美精品久久久久久久久久久| 久久91超碰青草在哪里看| 亚洲男人的天堂网站| 国产黄色片视频| 国产盗摄一区二区三区| ijzzijzzij亚洲大全| 开心久久婷婷综合中文字幕| 国产亚洲综合久久| 精产国品一区二区| 久久天堂av综合合色蜜桃网| 俄罗斯av网站| 里番精品3d一二三区| 久久久免费精品视频| 高h震动喷水双性1v1| 一区二区三区四区国产精品| 中文字幕av一区二区三区人妻少妇| 久久亚洲成人| 91精品中国老女人| 精人妻一区二区三区| 天天综合天天综合| 日韩美女啊v在线免费观看| 中文字幕av专区| 日韩理论电影大全| 国产女人精品视频| 黄色小网站在线观看| 制服视频三区第一页精品| 91麻豆精品成人一区二区| 精品一区二区在线播放| 国产树林野战在线播放| 日韩精品三级| 韩国精品美女www爽爽爽视频| 日本人妻熟妇久久久久久| 精品国产户外野外| 久久av无码精品人妻系列试探| 老牛国产精品一区的观看方式| 日本高清一区| 亚洲日韩中文字幕一区| 久久99精品久久久久久琪琪| 成人免费一级视频| 欧美色道久久88综合亚洲精品| 精品人妻一区二区三区视频| 日本中文在线一区| 大桥未久一区二区| 精品久久对白| 国产精品久久久久久av| 成年视频在线观看| 亚洲成人动漫在线播放| av手机天堂网| 国产精品国产成人国产三级 | 成人av网站在线观看| 鲁一鲁一鲁一鲁一色| 狠狠做深爱婷婷综合一区| 国产在线拍揄自揄视频不卡99| 伊人影院在线视频| 亚洲乱码一区av黑人高潮| 亚洲无码精品在线播放| 亚洲国产精品欧美一二99| 在线免费看黄视频| 国产精品主播直播| 国产综合免费视频| 午夜精品网站| 欧美亚洲一级二级| 精品国产18久久久久久二百| 欧美在线视频一区二区| 免费超碰在线| 日韩激情视频在线| 国产高中女学生第一次| 日韩欧美精品免费在线| 人妻少妇精品一区二区三区| 久久免费精品国产久精品久久久久| 亚洲 激情 在线| 国产日韩亚洲| 日本免费成人网| 欧美激情黄色片| 麻豆成人av| 4438全国亚洲精品观看视频| 国产精品久久不能| 99爱在线视频| 久久深夜福利免费观看| 国产专区在线播放| 精品国产一区二区三区av性色| 免费av中文字幕| 亚洲第一狼人社区| 国产免费久久久久| 国产欧美精品区一区二区三区 | 欧美日韩国产中字| 91久久国产综合| 欧美激情资源网| 久久亚洲AV成人无码国产野外| 国产一区二区女| 天堂网在线免费观看| 欧美专区18| 国产原创中文在线观看 | 美日韩中文字幕| 国产精品一区二区三区免费| 精品欧美视频| 国产精品普通话| 欧美亚洲大片| 欧美一级大片视频| 成人高潮aa毛片免费| 欧美精品在线免费播放| 国内精品不卡| 久久九九有精品国产23| 婷婷成人激情| 中日韩美女免费视频网址在线观看 | 欧美激情15p| 国产精品一区二区三区免费| 日韩一区二区三区在线看| 91综合免费在线| 国产激情综合| 亚洲free性xxxx护士白浆| 亚洲欧美专区| 2022国产精品| 中文字幕日韩在线| 懂色av一区二区三区在线播放| 国产一区91精品张津瑜| 国产丝袜不卡| 亚洲成人五区| 91精品国产综合久久久久久丝袜 | 黑丝av在线播放| 成人一区在线看| 99re这里只有| 久久久久亚洲综合| 九九热免费在线| 国产精品久久久一区麻豆最新章节| 男人的天堂官网| 国产精品每日更新| 国产suv精品一区二区68| 亚洲靠逼com| 动漫精品一区一码二码三码四码| 亚洲一区二区三区四区在线观看| 国产无精乱码一区二区三区| 精品成人在线视频| 欧美brazzers| 69堂亚洲精品首页| 亚洲爱情岛论坛永久| 亚洲第一男人天堂| 理论在线观看| 久久精品最新地址| 国产精品69xx| 国产精国产精品| 亚洲毛片在线免费| 高清国产在线一区| 国产精品日韩精品中文字幕| 亚洲日本精品一区| 欧美久久一级| 国产成人无码一二三区视频| 男女性色大片免费观看一区二区| 婷婷中文字幕在线观看| 国产99久久久久| 久久中文字幕人妻| 中文字幕五月欧美| 日韩毛片在线视频| 精品视频色一区| 超碰在线播放97| 亚洲人高潮女人毛茸茸| 国精产品一区| 欧美亚洲在线播放| 国产一区一区| 欧美日韩高清免费| 欧美在线高清| 国产极品美女高潮无套久久久| 极品少妇xxxx精品少妇偷拍 | 久久精品色综合| 一区二区在线中文字幕电影视频| 欧美另类专区| 国产 porn| 99精品一区二区| 日韩a级片在线观看| 日韩欧美在线第一页| 国产婷婷在线视频| 亚洲欧洲视频在线| 爱情岛亚洲播放路线| 国产自摸综合网| 国产成人精品999在线观看| 欧美中文字幕在线观看视频 | 精品美女在线观看视频在线观看| 97免费视频在线播放| 日本免费在线一区| 日本不卡在线观看| 国产精品尤物| 一级黄色免费视频| 中文字幕综合网| 无码人妻久久一区二区三区 | 亚洲影院色无极综合| 国产欧美日韩在线观看视频| 日本大片免费看| 国产一区二区三区免费看| 中字幕一区二区三区乱码| 欧美日韩国产精品一区| 亚洲爱爱综合网| 久久91亚洲精品中文字幕| 国产精品xxx| 日本高清视频一区二区三区 | 国产成人aa精品一区在线播放| 91欧美极品| 亚洲色婷婷久久精品av蜜桃| 老汉av免费一区二区三区| 国产jjizz一区二区三区视频| 五月婷婷欧美视频| 肥臀熟女一区二区三区| 欧美成人网在线| gogo大尺度成人免费视频| 天堂精品视频| 奇米影视一区二区三区| 精品一区二区三区蜜桃在线| 色成人在线视频| 激情综合闲人网| 日本午夜精品理论片a级appf发布| 久久夜色精品国产噜噜av小说| 久久99久久99精品| 国产iv一区二区三区| 久久免费视频播放| 精品国产一区二区亚洲人成毛片| 里番在线播放| 国产一区二区三区四区五区加勒比| 国产精品v亚洲精品v日韩精品| 99热这里只有精品2| 一区二区三区在线视频观看58| 99这里有精品视频| 欧美极品少妇xxxxⅹ喷水| gogo久久日韩裸体艺术| 日韩欧美不卡在线| 久久综合中文字幕| 69视频免费看| 日韩一二三在线视频播| 久久综合偷偷噜噜噜色| 青春草国产视频| 97久久精品人人澡人人爽| 麻豆成人免费视频| 在线亚洲国产精品网| 91久久青草| av无码久久久久久不卡网站| 92国产精品观看| 青青国产在线视频| 精品国产区一区二区三区在线观看| 精品国产亚洲一区二区在线观看| av日韩在线看| 久久综合视频网| 亚洲综合一区中| 欧美富婆性猛交| 伊人成综合网yiren22| 欧美成年人视频在线观看| 有码一区二区三区| 亚洲AV成人无码一二三区在线| 日本精品久久久久影院| 国产精品毛片一区二区在线看| 中文字幕永久免费| 色婷婷av久久久久久久| 国产网站在线免费观看| 久久国产手机看片| 美女精品一区二区| 国产一级做a爰片在线看免费| 亚洲欧美国产日韩中文字幕| 伊人久久大香| 丰满爆乳一区二区三区| 国产精品国产精品国产专区不片| 黑人乱码一区二区三区av| 国产成人精品av在线| 在线观看日韩| 在线不卡av电影| 欧美mv日韩mv| 99久久婷婷国产综合精品首页 | 国产一区二区在线观看免费| 日韩熟女精品一区二区三区| 色哟哟入口国产精品| 日韩美女国产精品| 天天久久综合网| 91久久免费观看| 岛国毛片av在线| 在线日韩av永久免费观看|