0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

一套符合標(biāo)準(zhǔn)SQL語義的開發(fā)語言,link的最新特性

WpOh_rgznai100 ? 來源:lq ? 2019-07-18 14:12 ? 次閱讀

一、Flink SQL 背景

Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設(shè)計的一套符合標(biāo)準(zhǔn) SQL 語義的開發(fā)語言。

自 2015 年開始,阿里巴巴開始調(diào)研開源流計算引擎,最終決定基于 Flink 打造新一代計算引擎,針對 Flink 存在的不足進行優(yōu)化和改進,并且在 2019 年初將最終代碼開源,也就是我們熟知的 Blink。Blink 在原來的 Flink 基礎(chǔ)上最顯著的一個貢獻就是 Flink SQL 的實現(xiàn)。

Flink SQL 是面向用戶的 API 層,在我們傳統(tǒng)的流式計算領(lǐng)域,比如 Storm、Spark Streaming 都會提供一些 Function 或者 Datastream API,用戶通過 Java 或 Scala 寫業(yè)務(wù)邏輯,這種方式雖然靈活,但有一些不足,比如具備一定門檻且調(diào)優(yōu)較難,隨著版本的不斷更新,API 也出現(xiàn)了很多不兼容的地方。

在這個背景下,毫無疑問,SQL 就成了我們最佳選擇,之所以選擇將 SQL 作為核心 API,是因為其具有幾個非常重要的特點:

SQL 屬于設(shè)定式語言,用戶只要表達清楚需求即可,不需要了解具體做法;

SQL 可優(yōu)化,內(nèi)置多種查詢優(yōu)化器,這些查詢優(yōu)化器可為 SQL 翻譯出最優(yōu)執(zhí)行計劃;

SQL 易于理解,不同行業(yè)和領(lǐng)域的人都懂,學(xué)習(xí)成本較低;

SQL 非常穩(wěn)定,在數(shù)據(jù)庫 30 多年的歷史中,SQL 本身變化較少;

流與批的統(tǒng)一,F(xiàn)link 底層 Runtime 本身就是一個流與批統(tǒng)一的引擎,而 SQL 可以做到 API 層的流與批統(tǒng)一。

二、Flink 的最新特性(1.7.0 和 1.8.0 更新)

2.1 Flink 1.7.0 新特性

在 Flink 1.7.0 中,我們更接近實現(xiàn)快速數(shù)據(jù)處理和以無縫方式為 Flink 社區(qū)構(gòu)建數(shù)據(jù)密集型應(yīng)用程序的目標(biāo)。最新版本包括一些新功能和改進,例如對 Scala 2.12 的支持、一次性 S3 文件接收器、復(fù)雜事件處理與流 SQL 的集成等。

Apache Flink 中對 Scala 2.12 的支持(FLINK-7811)

Apache Flink 1.7.0 是第一個完全支持 Scala 2.12 的版本。這允許用戶使用較新的 Scala 版本編寫 Flink 應(yīng)用程序并利用 Scala 2.12 生態(tài)系統(tǒng)。

狀態(tài)演進(FLINK-9376)

許多情況下,由于需求的變化,長期運行的 Flink 應(yīng)用程序需要在其生命周期內(nèi)發(fā)展。在不失去當(dāng)前應(yīng)用程序進度狀態(tài)的情況下更改用戶狀態(tài)是應(yīng)用程序發(fā)展的關(guān)鍵要求。使用 Flink 1.7.0,社區(qū)添加了狀態(tài)演變,允許您靈活地調(diào)整長時間運行的應(yīng)用程序的用戶狀態(tài)模式,同時保持與以前保存點的兼容性。通過狀態(tài)演變,可以在狀態(tài)模式中添加或刪除列,以便更改應(yīng)用程序部署后應(yīng)用程序捕獲的業(yè)務(wù)功能。現(xiàn)在,使用 Avro 生成時,狀態(tài)模式演變現(xiàn)在可以立即使用作為用戶狀態(tài)的類,這意味著可以根據(jù) Avro 的規(guī)范來演變國家的架構(gòu)。雖然 Avro 類型是 Flink 1.7 中唯一支持模式演變的內(nèi)置類型,但社區(qū)仍在繼續(xù)致力于在未來的 Flink 版本中進一步擴展對其他類型的支持。

MATCH RECOGNIZE Streaming SQL 支持(FLINK-6935)

這是 Apache Flink 1.7.0 的一個重要補充,它為 Flink SQL 提供了 MATCH RECOGNIZE 標(biāo)準(zhǔn)的初始支持。此功能結(jié)合了復(fù)雜事件處理(CEP)和 SQL,可以輕松地對數(shù)據(jù)流進行模式匹配,從而實現(xiàn)一整套新的用例。此功能目前處于測試階段,因此我們歡迎社區(qū)提供任何反饋和建議。

流式 SQL 中的時態(tài)表和時間連接(FLINK-9712)

時態(tài)表是 Apache Flink 中的一個新概念,它為表的更改歷史提供(參數(shù)化)視圖,并在特定時間點返回表的內(nèi)容。例如,我們可以使用具有歷史貨幣匯率的表格。隨著時間的推移,這種表格不斷增長/發(fā)展,并且增加了新的更新匯率。時態(tài)表是一種視圖,可以將這些匯率的實際狀態(tài)返回到任何給定的時間點。使用這樣的表,可以使用正確的匯率將不同貨幣的訂單流轉(zhuǎn)換為通用貨幣。時間聯(lián)接允許使用不斷變化/更新的表來進行內(nèi)存和計算有效的流數(shù)據(jù)連接。

Streaming SQL 的其他功能

除了上面提到的主要功能外,F(xiàn)link 的 Table&SQL API 已經(jīng)擴展到更多用例。以下內(nèi)置函數(shù)被添加到 API:TO_BASE64、LOG2、LTRIM、REPEAT、REPLACE、COSH、SINH、TANH SQL Client 現(xiàn)在支持在環(huán)境文件和 CLI 會話中定義視圖。此外,CLI 中添加了基本的 SQL 語句自動完成功能。社區(qū)添加了一個 Elasticsearch 6 表接收器,允許存儲動態(tài)表的更新結(jié)果。

Kafka 2.0 連接器(FLINK-10598)

Apache Flink 1.7.0 繼續(xù)添加更多連接器,使其更容易與更多外部系統(tǒng)進行交互。在此版本中,社區(qū)添加了 Kafka 2.0 連接器,該連接器允許通過一次性保證讀取和寫入 Kafka 2.0。

本地恢復(fù)(FLINK-9635)

Apache Flink 1.7.0 通過擴展 Flink 的調(diào)度來完成本地恢復(fù)功能,以便在恢復(fù)時考慮以前的部署位置。如果啟用了本地恢復(fù),F(xiàn)link 將保留最新檢查點的本地副本任務(wù)運行的機器。通過將任務(wù)調(diào)度到以前的位置,F(xiàn)link 將通過從本地磁盤讀取檢查點狀態(tài)來最小化恢復(fù)狀態(tài)的網(wǎng)絡(luò)流量。此功能大大提高了恢復(fù)速度。

2.2 Flink 1.8.0 新特性

Flink 1.8.0 引入對狀態(tài)的清理

使用 TTL(生存時間)連續(xù)增量清除舊的 Key 狀態(tài) Flink 1.8 引入了對 RocksDB 狀態(tài)后端(FLINK-10471)和堆狀態(tài)后端(FLINK-10473)的舊數(shù)據(jù)的連續(xù)清理。這意味著舊的數(shù)據(jù)將(根據(jù) TTL 設(shè)置)不斷被清理掉。

新增和刪除一些 Table API

1) 引入新的 CSV 格式符(FLINK-9964)

此版本為符合 RFC4180 的 CSV 文件引入了新的格式符。新描述符可以使用 org.apache.flink.table.descriptors.Csv。目前,只能與 Kafka 一起使用。舊描述符 org.apache.flink.table.descriptors.OldCsv 用于文件系統(tǒng)連接器。

2) 靜態(tài)生成器方法在 TableEnvironment(FLINK-11445)上的棄用

為了將 API 與實際實現(xiàn)分開,TableEnvironment.getTableEnvironment() 不推薦使用靜態(tài)方法?,F(xiàn)在推薦使用Batch/StreamTableEnvironment.create()。

3) 表 API Maven 模塊中的更改(FLINK-11064)

之前具有 flink-table 依賴關(guān)系的用戶需要更新其依賴關(guān)系 flink-table-planner,以及正確的依賴關(guān)系 flink-table-api-*,具體取決于是使用 Java 還是 Scala: flink-table-api-java-bridge 或者 flink-table-api-scala-bridge。

Kafka Connector 的修改

引入可直接訪問 ConsumerRecord 的新KafkaDeserializationSchema(FLINK-8354),對于 FlinkKafkaConsumers 推出了一個新的 KafkaDeserializationSchema,可以直接訪問 KafkaConsumerRecord。

三、Flink SQL 的編程模型

Flink 的編程模型基礎(chǔ)構(gòu)建模塊是流(streams)與轉(zhuǎn)換 (transformations),每一個數(shù)據(jù)流起始于一個或多個 source,并終止于一個或多個 sink。

相信大家對上面的圖已經(jīng)十分熟悉了,當(dāng)然基于 Flink SQL 編寫的 Flink 程序也離不開讀取原始數(shù)據(jù),計算邏輯和寫入計算結(jié)果數(shù)據(jù)三部分。

一個完整的 Flink SQL 編寫的程序包括如下三部分:

Source Operator:Soruce operator 是對外部數(shù)據(jù)源的抽象, 目前 Apache Flink 內(nèi)置了很多常用的數(shù)據(jù)源實現(xiàn)例如 MySQL、Kafka 等;

Transformation Operators:算子操作主要完成例如查詢、聚合操作等,目前 Flink SQL 支持了 Union、Join、Projection、Difference、Intersection 及 window 等大多數(shù)傳統(tǒng)數(shù)據(jù)庫支持的操作;

Sink Operator:Sink operator 是對外結(jié)果表的抽象,目前 Apache Flink 也內(nèi)置了很多常用的結(jié)果表的抽象,比如 Kafka Sink 等

我們通過用一個最經(jīng)典的 WordCount 程序作為入門,看一下傳統(tǒng)的基于 DataSet/DataStream API 開發(fā)和基于 SQL 開發(fā)有哪些不同?

DataStream/DataSetAPI

public class WordCount { public static void main(String[] args) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet text = env.fromElements( "Hello", "Flink", "Hello", "Blink" ); DataSet> counts = text.flatMap(new LineSplitter()) .groupBy(0) .sum(1); counts.print(); } public static final class LineSplitter implements FlatMapFunction> { @Override public void flatMap(String value, Collector> out) { String[] tokens = value.toLowerCase().split("\W+"); for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2(token, 1)); } } } }}

Flink SQL

//省略掉初始化環(huán)境等公共代碼SELECT word, COUNT(word) FROM table GROUP BY word;

我們已經(jīng)可以直觀體會到,SQL 開發(fā)的快捷和便利性了。

四、Flink SQL 的語法和算子

4.1 Flink SQL 支持的語法

Flink SQL 核心算子的語義設(shè)計參考了 1992、2011 等 ANSI-SQL 標(biāo)準(zhǔn),F(xiàn)link 使用 Apache Calcite 解析 SQL ,Calcite 支持標(biāo)準(zhǔn)的 ANSI SQL。

那么 Flink 自身支持的 SQL 語法有哪些呢?

insert: INSERT INTO tableReference queryquery: values | { select | selectWithoutFrom | query UNION [ ALL ] query | query EXCEPT query | query INTERSECT query } [ ORDER BY orderItem [, orderItem ]* ] [ LIMIT { count | ALL } ] [ OFFSET start { ROW | ROWS } ] [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]orderItem: expression [ ASC | DESC ]select: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ] [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]selectWithoutFrom: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* }projectItem: expression [ [ AS ] columnAlias ] | tableAlias . *tableExpression: tableReference [, tableReference ]* | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]joinCondition: ON booleanExpression | USING '(' column [, column ]* ')'tableReference: tablePrimary [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]tablePrimary: [ TABLE ] [ [ catalogName . ] schemaName . ] tableName | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')' | UNNEST '(' expression ')'values: VALUES expression [, expression ]*groupItem: expression | '(' ')' | '(' expression [, expression ]* ')' | CUBE '(' expression [, expression ]* ')' | ROLLUP '(' expression [, expression ]* ')' | GROUPING SETS '(' groupItem [, groupItem ]* ')'windowRef: windowName | windowSpecwindowSpec: [ windowName ] '(' [ ORDER BY orderItem [, orderItem ]* ] [ PARTITION BY expression [, expression ]* ] [ RANGE numericOrIntervalExpression {PRECEDING} | ROWS numericExpression {PRECEDING} ] ')'

上面 SQL 的語法支持也已經(jīng)表明了 Flink SQL 對算子的支持,接下來我們對 Flink SQL 中最常見的算子語義進行介紹。

4.2 Flink SQL 常用算子

SELECT

SELECT 用于從 DataSet/DataStream 中選擇數(shù)據(jù),用于篩選出某些列。

示例:

SELECT * FROM Table;// 取出表中的所有列SELECT name,age FROM Table;// 取出表中 name 和 age 兩列

與此同時 SELECT 語句中可以使用函數(shù)和別名,例如我們上面提到的 WordCount 中:

SELECT word, COUNT(word) FROM table GROUP BY word;

WHERE

WHERE 用于從數(shù)據(jù)集/流中過濾數(shù)據(jù),與 SELECT 一起使用,用于根據(jù)某些條件對關(guān)系做水平分割,即選擇符合條件的記錄。

示例:

SELECT name,age FROM Table where name LIKE ‘% 小明 %’;SELECT * FROM Table WHERE age = 20;

WHERE 是從原數(shù)據(jù)中進行過濾,那么在 WHERE 條件中,F(xiàn)link SQL 同樣支持 =、<、>、<>、>=、<=,以及 AND、OR 等表達式的組合,最終滿足過濾條件的數(shù)據(jù)會被選擇出來。并且 WHERE 可以結(jié)合 IN、NOT IN 聯(lián)合使用。舉個負責(zé)的例子:

SELECT name, ageFROM TableWHERE name IN (SELECT name FROM Table2)

DISTINCT

DISTINCT 用于從數(shù)據(jù)集/流中去重根據(jù) SELECT 的結(jié)果進行去重。

示例:

SELECT DISTINCT name FROM Table;

對于流式查詢,計算查詢結(jié)果所需的 State 可能會無限增長,用戶需要自己控制查詢的狀態(tài)范圍,以防止?fàn)顟B(tài)過大。

GROUP BY

GROUP BY 是對數(shù)據(jù)進行分組操作。例如我們需要計算成績明細表中,每個學(xué)生的總分。

SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;

UNION和UNION ALL

UNION 用于將兩個結(jié)果集合并起來,要求兩個結(jié)果集字段完全一致,包括字段類型、字段順序。不同于 UNION ALL 的是,UNION 會對結(jié)果數(shù)據(jù)去重。

示例:

SELECT * FROM T1 UNION (ALL) SELECT * FROM T2;

JOIN

JOIN 用于把來自兩個表的數(shù)據(jù)聯(lián)合起來形成結(jié)果表,F(xiàn)link 支持的 JOIN 類型包括:

JOIN - INNER JOIN

LEFT JOIN - LEFT OUTER JOIN

RIGHT JOIN - RIGHT OUTER JOIN

FULL JOIN - FULL OUTER JOIN

這里的 JOIN 的語義和我們在關(guān)系型數(shù)據(jù)庫中使用的 JOIN 語義一致。

示例:

JOIN(將訂單表數(shù)據(jù)和商品表進行關(guān)聯(lián))SELECT * FROM Orders INNER JOIN Product ON Orders.productId = [Product.id](http://product.id/)

LEFT JOIN 與 JOIN 的區(qū)別是當(dāng)右表沒有與左邊相 JOIN 的數(shù)據(jù)時候,右邊對應(yīng)的字段補 NULL 輸出,RIGHT JOIN 相當(dāng)于 LEFT JOIN 左右兩個表交互一下位置。FULL JOIN 相當(dāng)于 RIGHT JOIN 和 LEFT JOIN 之后進行 UNION ALL 操作。

示例:

SELECT *FROM Orders LEFT JOIN Product ON Orders.productId = [Product.id](http://product.id/)SELECT *FROM Orders RIGHT JOIN Product ON Orders.productId = [Product.id](http://product.id/)SELECT *FROM Orders FULL OUTER JOIN Product ON Orders.productId = [Product.id](http://product.id/)

Group Window

根據(jù)窗口數(shù)據(jù)劃分的不同,目前 Apache Flink 有如下 3 種 Bounded Window:

Tumble,滾動窗口,窗口數(shù)據(jù)有固定的大小,窗口數(shù)據(jù)無疊加;

Hop,滑動窗口,窗口數(shù)據(jù)有固定大小,并且有固定的窗口重建頻率,窗口數(shù)據(jù)有疊加;

Session,會話窗口,窗口數(shù)據(jù)沒有固定的大小,根據(jù)窗口數(shù)據(jù)活躍程度劃分窗口,窗口數(shù)據(jù)無疊加。

Tumble Window

Tumble 滾動窗口有固定大小,窗口數(shù)據(jù)不重疊,具體語義如下:

Tumble 滾動窗口對應(yīng)的語法如下:

SELECT [gk], [TUMBLE_START(timeCol, size)], [TUMBLE_END(timeCol, size)], agg1(col1), ... aggn(colN)FROM Tab1GROUP BY [gk], TUMBLE(timeCol, size)

其中:

[gk] 決定了是否需要按照字段進行聚合;

TUMBLE_START 代表窗口開始時間;

TUMBLE_END 代表窗口結(jié)束時間;

timeCol 是流表中表示時間字段;

size 表示窗口的大小,如 秒、分鐘、小時、天。

舉個例子,假如我們要計算每個人每天的訂單量,按照 user 進行聚合分組:

SELECT user, TUMBLE_START(rowtime, INTERVAL ‘1’ DAY) as wStart, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL ‘1’ DAY), user;

Hop Window

Hop 滑動窗口和滾動窗口類似,窗口有固定的 size,與滾動窗口不同的是滑動窗口可以通過 slide 參數(shù)控制滑動窗口的新建頻率。因此當(dāng) slide 值小于窗口 size 的值的時候多個滑動窗口會重疊,具體語義如下:

Hop 滑動窗口對應(yīng)語法如下:

SELECT [gk], [HOP_START(timeCol, slide, size)] , [HOP_END(timeCol, slide, size)], agg1(col1), ... aggN(colN) FROM Tab1GROUP BY [gk], HOP(timeCol, slide, size)

每次字段的意思和 Tumble 窗口類似:

[gk] 決定了是否需要按照字段進行聚合;

HOP_START 表示窗口開始時間;

HOP_END 表示窗口結(jié)束時間;

timeCol 表示流表中表示時間字段;

slide 表示每次窗口滑動的大?。?/p>

size 表示整個窗口的大小,如 秒、分鐘、小時、天。

舉例說明,我們要每過一小時計算一次過去 24 小時內(nèi)每個商品的銷量:

SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product

Session Window

會話時間窗口沒有固定的持續(xù)時間,但它們的界限由 interval 不活動時間定義,即如果在定義的間隙期間沒有出現(xiàn)事件,則會話窗口關(guān)閉。

Seeeion 會話窗口對應(yīng)語法如下:

SELECT [gk], SESSION_START(timeCol, gap) AS winStart, SESSION_END(timeCol, gap) AS winEnd, agg1(col1), ... aggn(colN)FROM Tab1GROUP BY [gk], SESSION(timeCol, gap)

[gk] 決定了是否需要按照字段進行聚合;

SESSION_START 表示窗口開始時間;

SESSION_END 表示窗口結(jié)束時間;

timeCol 表示流表中表示時間字段;

gap 表示窗口數(shù)據(jù)非活躍周期的時長。

例如,我們需要計算每個用戶訪問時間 12 小時內(nèi)的訂單量:

SELECT user, SESSION_START(rowtime, INTERVAL ‘12’ HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL ‘12’ HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(rowtime, INTERVAL ‘12’ HOUR), user

五、Flink SQL 的內(nèi)置函數(shù)

Flink 提供大量的內(nèi)置函數(shù)供我們直接使用,我們常用的內(nèi)置函數(shù)分類如下:

比較函數(shù)

邏輯函數(shù)

算術(shù)函數(shù)

字符串處理函數(shù)

時間函數(shù)

我們接下來對每種函數(shù)舉例進行講解。

5.1 比較函數(shù)

比較函數(shù)

描述

value1=value2 如果 value1 等于 value2,則返回 TRUE ; 如果 value1 或 value2 為 NULL,則返回 UNKNOWN
value1<>value2 如果 value1 不等于 value2,則返回 TRUE ; 如果 value1 或 value2 為 NULL,則返回 UNKNOWN
value1>value2 如果 value1 大于 value2,則返回 TRUE ; 如果 value1 或 value2 為 NULL,則返回 UNKNOWN
value1 < value2 如果 value1 小于 value2,則返回 TRUE ; 如果 value1 或 value2 為 NULL,則返回 UNKNOWN
value IS NULL 如果 value 為 NULL,則返回 TRUE
value IS NOT NULL 如果 value 不為 NULL,則返回 TRUE
string1 LIKE string2 如果 string1 匹配模式 string2,則返回 TRUE ; 如果 string1 或 string2為 NULL,則返回UNKNOWN
value1 IN (value2, value3…) 如果給定列表中存在 value1 (value2,value3,…),則返回 TRUE 。當(dāng)(value2,value3,…)包含 NULL,如果可以找到該數(shù)據(jù)元則返回 TRUE,否則返回 UNKNOWN。如果 value1 為 NULL,則始終返回 UNKNOWN

5.2 邏輯函數(shù)

邏輯函數(shù)

描述

A OR B 如果 A 為 TRUE 或 B 為 TRUE,則返回 TRUE
A AND B 如果 A 和 B 都為 TRUE,則返回 TRUE
NOT boolean 如果 boolean 為 FALSE,則返回 TRUE,否則返回 TRUE。如果 boolean 為 TRUE,則返回 FALSE
A IS TRUE 或 FALSE 判斷 A 是否為真

5.3 算術(shù)函數(shù)

算術(shù)函數(shù)

描述

numeric1 ±*/ numeric2 分別代表兩個數(shù)值加減乘除
ABS(numeric) 返回 numeric 的絕對值
POWER(numeric1, numeric2) 返回 numeric1 上升到 numeric2 的冪

除了上述表中的函數(shù),F(xiàn)link SQL 還支持種類豐富的函數(shù)計算。

5.4 字符串處理函數(shù)

字符串函數(shù)

描述

UPPER/LOWER 以大寫 / 小寫形式返回字符串
LTRIM(string) 返回一個字符串,從去除左空格的字符串, 類似還有 RTRIM
CONCAT(string1, string2,…) 返回連接 string1,string2,…的字符串

5.5 時間函數(shù)

時間函數(shù)

描述

DATE string 返回以“yyyy-MM-dd”形式從字符串解析的 SQL 日期
TIMESTAMP string 返回以字符串形式解析的 SQL 時間戳,格式為“yyyy-MM-dd HH:mm:ss [.SSS]”
CURRENT_DATE 返回 UTC 時區(qū)中的當(dāng)前 SQL 日期
DATE_FORMAT(timestamp, string) 返回使用指定格式字符串格式化時間戳的字符串

六、Flink SQL 實戰(zhàn)應(yīng)用

上面我們分別介紹了 Flink SQL 的背景、新特性、編程模型和常用算子,這部分我們將模擬一個真實的案例為大家使用 Flink SQL 提供一個完整的 Demo。

相信這里應(yīng)該有很多 NBA 的球迷,假設(shè)我們有一份數(shù)據(jù)記錄了每個賽季的得分王的數(shù)據(jù),包括賽季、球員、出場、首發(fā)、時間、助攻、搶斷、蓋帽、得分等?,F(xiàn)在我們要統(tǒng)計獲得得分王榮譽最多的三名球員。

原數(shù)據(jù)存在 score.csv 文件中,如下:

17-18,詹姆斯-哈登,72,72,35.4,8.8,1.8,0.7,30.416-17,拉塞爾-威斯布魯克,81,81,34.6,10.4,1.6,0.4,31.615-16,斯蒂芬-庫里,79,79,34.2,6.7,2.1,0.2,30.114-15,拉塞爾-威斯布魯克,67,67,34.4,8.6,2.1,0.2,28.113-14,凱文-杜蘭特,81,81,38.5,5.5,1.3,0.7,3212-13,卡梅羅-安東尼,67,67,37,2.6,0.8,0.5,28.711-12,凱文-杜蘭特,66,66,38.6,3.5,1.3,1.2,2810-11,凱文-杜蘭特,78,78,38.9,2.7,1.1,1,27.709-10,凱文-杜蘭特,82,82,39.5,2.8,1.4,1,30.108-09,德維恩-韋德,79,79,38.6,7.5,2.2,1.3,30.207-08,勒布朗-詹姆斯,75,74,40.4,7.2,1.8,1.1,3006-07,科比-布萊恩特,77,77,40.8,5.4,1.4,0.5,31.605-06,科比-布萊恩特,80,80,41,4.5,1.8,0.4,35.404-05,阿倫-艾弗森,75,75,42.3,7.9,2.4,0.1,30.703-04,特雷西·麥克格雷迪,67,67,39.9,5.5,1.4,0.6,2802-03,特雷西·麥克格雷迪,75,74,39.4,5.5,1.7,0.8,32.101-02,阿倫-艾弗森,60,59,43.7,5.5,2.8,0.2,31.400-01,阿倫-艾弗森,71,71,42,4.6,2.5,0.3,31.199-00,沙奎爾-奧尼爾,79,79,40,3.8,0.5,3,29.798-99,阿倫-艾弗森,48,48,41.5,4.6,2.3,0.1,26.897-98,邁克爾-喬丹,82,82,38.8,3.5,1.7,0.5,28.796-97,邁克爾-喬丹,82,82,37.9,4.3,1.7,0.5,29.695-96,邁克爾-喬丹,82,82,37.7,4.3,2.2,0.5,30.494-95,沙奎爾-奧尼爾,79,79,37,2.7,0.9,2.4,29.393-94,大衛(wèi)-羅賓遜,80,80,40.5,4.8,1.7,3.3,29.892-93,邁克爾-喬丹,78,78,39.3,5.5,2.8,0.8,32.691-92,邁克爾-喬丹,80,80,38.8,6.1,2.3,0.9,30.190-91,邁克爾-喬丹,82,82,37,5.5,2.7,1,31.589-90,邁克爾-喬丹,82,82,39,6.3,2.8,0.7,33.688-89,邁克爾-喬丹,81,81,40.2,8,2.9,0.8,32.587-88,邁克爾-喬丹,82,82,40.4,5.9,3.2,1.6,3586-87,邁克爾-喬丹,82,82,40,4.6,2.9,1.5,37.185-86,多米尼克-威爾金斯,78,78,39.1,2.6,1.8,0.6,30.384-85,伯納德-金,55,55,37.5,3.7,1.3,0.3,32.983-84,阿德里安-丹特利,79,79,37.8,3.9,0.8,0.1,30.682-83,阿歷克斯-英格利什,82,82,36.4,4.8,1.4,1.5,28.481-82,喬治-格文,79,79,35.7,2.4,1,0.6,32.3

首先我們需要創(chuàng)建一個工程,并且在 Maven 中有如下依賴:

UTF-8 1.7.1 1.7.7 1.2.17 2.11 org.apache.flink flink-core ${flink.version} org.apache.flink flink-java ${flink.version} org.apache.flink flink-clients_${scala.binary.version} ${flink.version} org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} org.apache.flink flink-table_2.11 1.7.1 org.apache.flink flink-streaming-scala_${scala.binary.version} 1.7.1 org.slf4j slf4j-log4j12 ${slf4j.version} log4j log4j ${log4j.version}

第一步,創(chuàng)建上下文環(huán)境:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);

第二步,讀取 score.csv 并且作為 source 輸入:

DataSet input = env.readTextFile("score.csv"); DataSet topInput = input.map(new MapFunction() { @Override public PlayerData map(String s) throws Exception { String[] split = s.split(","); return new PlayerData(String.valueOf(split[0]), String.valueOf(split[1]), String.valueOf(split[2]), Integer.valueOf(split[3]), Double.valueOf(split[4]), Double.valueOf(split[5]), Double.valueOf(split[6]), Double.valueOf(split[7]), Double.valueOf(split[8]) ); } });其中的PlayerData類為自定義類:public static class PlayerData { /** * 賽季,球員,出場,首發(fā),時間,助攻,搶斷,蓋帽,得分 */ public String season; public String player; public String play_num; public Integer first_court; public Double time; public Double assists; public Double steals; public Double blocks; public Double scores; public PlayerData() { super(); } public PlayerData(String season, String player, String play_num, Integer first_court, Double time, Double assists, Double steals, Double blocks, Double scores ) { this.season = season; this.player = player; this.play_num = play_num; this.first_court = first_court; this.time = time; this.assists = assists; this.steals = steals; this.blocks = blocks; this.scores = scores; } }

第三步,將 source 數(shù)據(jù)注冊成表:

Table topScore = tableEnv.fromDataSet(topInput);tableEnv.registerTable("score", topScore);

第四步,核心處理邏輯 SQL 的編寫:

Table queryResult = tableEnv.sqlQuery("select player, count(season) as num FROM score GROUP BY player ORDER BY num desc LIMIT 3");

第五步,輸出結(jié)果:

DataSet result = tableEnv.toDataSet(queryResult, Result.class);result.print();

我們直接運行整個程序,觀察輸出結(jié)果:

...16:28:06,162 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Shut down complete.16:28:06,162 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Stop job leader service.16:28:06,164 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopped TaskExecutor akka://flink/user/taskmanager_2.16:28:06,166 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.16:28:06,166 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.16:28:06,169 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.16:28:06,177 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopping Akka RPC service.16:28:06,187 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache16:28:06,187 INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache16:28:06,188 INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:5170316:28:06,188 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service.邁克爾-喬丹:10凱文-杜蘭特:4阿倫-艾弗森:4

我們看到控制臺已經(jīng)輸出結(jié)果了:

完整的代碼如下:

import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.DataSet;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.java.BatchTableEnvironment;public class TableSQL { public static void main(String[] args) throws Exception{ //1. 獲取上下文環(huán)境 table的環(huán)境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env); //2. 讀取score.csv DataSet input = env.readTextFile("score.csv"); input.print(); DataSet topInput = input.map(new MapFunction() { @Override public PlayerData map(String s) throws Exception { String[] split = s.split(","); return new PlayerData(String.valueOf(split[0]), String.valueOf(split[1]), String.valueOf(split[2]), Integer.valueOf(split[3]), Double.valueOf(split[4]), Double.valueOf(split[5]), Double.valueOf(split[6]), Double.valueOf(split[7]), Double.valueOf(split[8]) ); } }); //3. 注冊成內(nèi)存表 Table topScore = tableEnv.fromDataSet(topInput); tableEnv.registerTable("score", topScore); //4. 編寫sql 然后提交執(zhí)行 //select player, count(season) as num from score group by player order by num desc; Table queryResult = tableEnv.sqlQuery("select player, count(season) as num from score group by player order by num desc limit 3"); //5. 結(jié)果進行打印 DataSet result = tableEnv.toDataSet(queryResult, Result.class); result.print(); } public static class PlayerData { /** * 賽季,球員,出場,首發(fā),時間,助攻,搶斷,蓋帽,得分 */ public String season; public String player; public String play_num; public Integer first_court; public Double time; public Double assists; public Double steals; public Double blocks; public Double scores; public PlayerData() { super(); } public PlayerData(String season, String player, String play_num, Integer first_court, Double time, Double assists, Double steals, Double blocks, Double scores ) { this.season = season; this.player = player; this.play_num = play_num; this.first_court = first_court; this.time = time; this.assists = assists; this.steals = steals; this.blocks = blocks; this.scores = scores; } } public static class Result { public String player; public Long num; public Result() { super(); } public Result(String player, Long num) { this.player = player; this.num = num; } @Override public String toString() { return player + ":" + num; } }}//

當(dāng)然我們也可以自定義一個 Sink,將結(jié)果輸出到一個文件中,例如:

TableSink sink = new CsvTableSink("/home/result.csv", ","); String[] fieldNames = {"name", "num"}; TypeInformation[] fieldTypes = {Types.STRING, Types.INT}; tableEnv.registerTableSink("result", fieldNames, fieldTypes, sink); sqlQuery.insertInto("result"); env.execute();

然后我們運行程序,可以看到 /home 目錄下生成的 result.csv,查看結(jié)果:

邁克爾-喬丹,10凱文-杜蘭特,4阿倫-艾弗森,4

七、總結(jié)

本篇向大家介紹了 Flink SQL 產(chǎn)生的背景,F(xiàn)link SQL 大部分核心功能,并且分別介紹了 Flink SQL 的編程模型和常用算子及內(nèi)置函數(shù)。最后以一個完整的示例展示了如何編寫 Flink SQL 程序。Flink SQL 的簡便易用極大地降低了 Flink 編程的門檻,是我們必需掌握的使用 Flink 解決流式計算問題最鋒利的武器!

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 編程
    +關(guān)注

    關(guān)注

    88

    文章

    3565

    瀏覽量

    93536
  • SQL
    SQL
    +關(guān)注

    關(guān)注

    1

    文章

    753

    瀏覽量

    44032
  • 應(yīng)用程序
    +關(guān)注

    關(guān)注

    37

    文章

    3237

    瀏覽量

    57547

原文標(biāo)題:Flink最鋒利的武器:Flink SQL入門和實戰(zhàn) | 附完整實現(xiàn)代碼

文章出處:【微信號:rgznai100,微信公眾號:rgznai100】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

收藏 人收藏

    評論

    相關(guān)推薦

    一套常用的AD封裝庫

    一套常用的AD封裝庫
    發(fā)表于 01-24 15:22

    NLPIR語義分析是對自然語言處理的完美理解

    ?! ∪祟愖匀?b class='flag-5'>語言通常以詞為基本構(gòu)成單位,進而構(gòu)成句子,再由句子形成篇章。篇章的語義由篇章中包含的所有句子的語義綜合而成,而句子的語義又由句中的詞語
    發(fā)表于 10-19 11:34

    一套完整的PCB布線設(shè)計

    來源:互聯(lián)網(wǎng)PCB 布線設(shè)計中,對于布通率的的提高有一套完整的方法,在此,我們?yōu)榇蠹姨峁┨岣?PCB 設(shè)計布通率以及設(shè)計效率的有效技巧,不僅能為客戶節(jié)省項目開發(fā)周期,還能最大限度的保證設(shè)計成品的質(zhì)量。
    發(fā)表于 10-22 08:00

    如何制作一套波形發(fā)生系統(tǒng)?

    本設(shè)計中涉及到單片機匯編語言、VHDL語言的運用,充分地利用了二者的優(yōu)點,制作了一套波形發(fā)生系統(tǒng)。
    發(fā)表于 04-21 06:09

    分享一套通用的開發(fā)環(huán)境搭建教程

    針對嵌入式的開發(fā),不同的開發(fā)者使用不同的操作系統(tǒng),可能是Windows 或者 Linux,隨之而來所搭建的開發(fā)環(huán)境亦不同。所以在這里希望分享一套通用的
    發(fā)表于 11-05 06:23

    一套支持中文C語言編程的鴻蒙Hi3861智能硬件開發(fā)套件

    由于目前學(xué)習(xí)和開發(fā)開源鴻蒙Hi3861難度比較大,整了一套支持中文C語言編程的鴻蒙Hi3861智能硬件開發(fā)套件,難度直接從專業(yè)級降到入門級,主要在開源鴻蒙
    發(fā)表于 04-10 20:34

    慕尼黑大學(xué)開發(fā)一套RFID應(yīng)用模型

    慕尼黑科技大學(xué)了開發(fā)一套RFID應(yīng)用模型:在木材采伐機上安裝支工業(yè)訂槍,將RFID標(biāo)簽訂在切割下來的圓木的表面上。這套系統(tǒng)可以幫助木材公司追蹤木材的多重處理流程和運往加工廠的運送過程。 慕尼黑
    發(fā)表于 12-13 14:26 ?755次閱讀

    關(guān)系數(shù)據(jù)庫標(biāo)準(zhǔn)語言SQL的資料說明

    本文檔的主要內(nèi)容詳細介紹的是關(guān)系數(shù)據(jù)庫標(biāo)準(zhǔn)語言SQL的資料說明。
    發(fā)表于 03-23 16:51 ?6次下載
    關(guān)系數(shù)據(jù)庫<b class='flag-5'>標(biāo)準(zhǔn)語言</b><b class='flag-5'>SQL</b>的資料說明

    如何開發(fā)一套高精密的LCR阻抗標(biāo)準(zhǔn)的研究說明

    針對當(dāng)前LCR阻抗標(biāo)準(zhǔn)器存在的關(guān)鍵技術(shù)問題,通過研究,開發(fā)一套高精密的LCR阻抗標(biāo)準(zhǔn).
    發(fā)表于 12-08 08:00 ?1次下載
    如何<b class='flag-5'>開發(fā)</b><b class='flag-5'>一套</b>高精密的LCR阻抗<b class='flag-5'>標(biāo)準(zhǔn)</b>的研究說明

    一套基于ARM的卷煙防偽識別系統(tǒng)設(shè)計與實現(xiàn)

    為有效打擊假冒、走私、串貨卷煙等違法行為,采用信息技術(shù),設(shè)計并開發(fā)一套卷煙防偽識別系統(tǒng)。以卷煙32位代碼作
    的頭像 發(fā)表于 05-05 09:05 ?1765次閱讀
    <b class='flag-5'>一套</b>基于ARM的卷煙防偽識別系統(tǒng)設(shè)計與實現(xiàn)

    altiumdesigner多少錢一套

    altiumdesigner多少錢一套 altiumdesigner是很多工程師都要用到的設(shè)計工具,那么altiumdesigner多少錢一套? altiumdesigner口碑評價很好
    的頭像 發(fā)表于 04-07 11:36 ?3.2w次閱讀
    altiumdesigner多少錢<b class='flag-5'>一套</b>

    開發(fā)支持符合AUTOSAR標(biāo)準(zhǔn)的軟件組件的建模特定領(lǐng)域的語言

    開發(fā)支持符合AUTOSAR標(biāo)準(zhǔn)的軟件組件的建模特定領(lǐng)域的語言。支持建模軟件組件的應(yīng)用是基于TextX python模塊和內(nèi)部開發(fā)的建??蚣堋?/div>
    的頭像 發(fā)表于 10-08 11:19 ?1045次閱讀

    PLC常見的5種標(biāo)準(zhǔn)編程語言

    IEC 1131-3的編程語言是IEC工作組 對世界范圍的PLC廠家的編程語言合理地吸收、借鑒的基礎(chǔ)上形成的一套針對工業(yè)控制系統(tǒng)的國際編程語言標(biāo)準(zhǔn)
    發(fā)表于 10-17 14:21 ?1.2w次閱讀

    文掌握MyBatis的動態(tài)SQL使用與原理

    摘要:使用動態(tài) SQL 并非件易事,但借助可用于任何 SQL 映射語句中的強大的動態(tài) SQL 語言,MyBatis 顯著地提升了這
    的頭像 發(fā)表于 01-06 11:27 ?921次閱讀

    如何用java語言開發(fā)一套數(shù)字化產(chǎn)科系統(tǒng)? 數(shù)字化產(chǎn)科管理平臺源碼

    如何用java語言開發(fā)一套數(shù)字化產(chǎn)科系統(tǒng) 數(shù)字化產(chǎn)科管理平臺源碼
    的頭像 發(fā)表于 07-06 09:38 ?962次閱讀
    如何用java<b class='flag-5'>語言</b><b class='flag-5'>開發(fā)</b><b class='flag-5'>一套</b>數(shù)字化產(chǎn)科系統(tǒng)? 數(shù)字化產(chǎn)科管理平臺源碼