用于傳感器分析的KSQL UDF。利用KSQL的新的API特性,用Java輕松地構(gòu)建UDF / UDAF函數(shù),從而使用Apache Kafka進(jìn)行連續(xù)流處理。用例:聯(lián)網(wǎng)汽車——使用深度學(xué)習(xí)的實(shí)時流媒體分析。
我為混合機(jī)器學(xué)習(xí)基礎(chǔ)設(shè)施構(gòu)建了一個場景,利用Apache Kafka作為可伸縮的中樞神經(jīng)系統(tǒng)。使用公共云在極端尺度下訓(xùn)練分析模型(如通過谷歌ML引擎在谷歌云平臺(GCP)上使用TensorFlow和TPUs。預(yù)測(即模型推斷)是在本地Kafka基礎(chǔ)設(shè)施的邊緣前提下執(zhí)行的(例如利用Kafka流或KSQL進(jìn)行流分析)。
這篇文章的重點(diǎn)是在前提部署。我用KSQL UDF創(chuàng)建了一個用于傳感器分析的Github項目。它利用KSQL的新API特性輕松地使用Java構(gòu)建UDF / UDAF函數(shù),對傳入事件進(jìn)行連續(xù)流處理。
用例:聯(lián)網(wǎng)汽車——使用深度學(xué)習(xí)的實(shí)時流媒體分析
連續(xù)處理來自連接設(shè)備(本例中的汽車傳感器)的數(shù)百萬個事件:
我建立了不同的分析模型。他們在公共云上接受訓(xùn)練,利用TensorFlow、H2O和谷歌ML引擎。模型創(chuàng)建不是這個示例的重點(diǎn)。最終的模型已經(jīng)準(zhǔn)備好投入生產(chǎn),并可以部署進(jìn)行實(shí)時預(yù)測。
模型服務(wù)可以通過模型服務(wù)器或原生嵌入到流處理應(yīng)用程序中來完成。查看模型部署中RPC與流處理的權(quán)衡和“TensorFlow + gRPC + Kafka流”示例。
演示:使用MQTT、Kafka和KSQL在邊緣進(jìn)行模型推斷
Github項目生成汽車傳感器數(shù)據(jù),通過Confluent MQTT代理將其轉(zhuǎn)發(fā)到Kafka集群進(jìn)行KSQL處理和實(shí)時分析。
這個項目主要是通過MQTT將數(shù)據(jù)輸入Kafka,通過KSQL對數(shù)據(jù)進(jìn)行處理:
Confluent MQTT代理的一大優(yōu)點(diǎn)是可以簡單地實(shí)現(xiàn)物聯(lián)網(wǎng)場景,而不需要MQTT代理。您可以通過MQTT代理直接將消息從MQTT設(shè)備轉(zhuǎn)發(fā)到Kafka。這大大減少了工作和成本。如果您“只是”希望在Kafka和MQTT設(shè)備之間進(jìn)行通信,那么這是一個完美的解決方案。
如果你想看這個故事的其他部分(與像Elasticsearch / Grafana這樣的sink應(yīng)用的集成),請看看Github項目“KSQL流物聯(lián)網(wǎng)數(shù)據(jù)”。通過Kafka Connect和Elastic connector實(shí)現(xiàn)了與ElasticSearch和Grafana的集成。
KSQL UDF 源代碼
開發(fā)udf非常容易。只需在一個UDF類中實(shí)現(xiàn)一個Java方法:
下面是KSQL UDF異常檢測的完整源代碼。(Anomaly Detection KSQL UDF.)
如何運(yùn)行與Apache Kafka和MQTT代理演示?
在Github項目中描述了執(zhí)行演示的所有步驟。
您只需要安裝Confluent Platform,然后按照以下步驟部署UDF、創(chuàng)建MQTT事件并通過利用分析模型的KSQL處理它們。
我使用mosquito to生成MQTT消息。當(dāng)然,您也可以使用任何其他MQTT客戶機(jī)。這就是開放和標(biāo)準(zhǔn)化協(xié)議的最大好處。
責(zé)任編輯:pj
-
傳感器
+關(guān)注
關(guān)注
2546文章
50498瀏覽量
751195 -
物聯(lián)網(wǎng)
+關(guān)注
關(guān)注
2902文章
44122瀏覽量
370427 -
機(jī)器學(xué)習(xí)
+關(guān)注
關(guān)注
66文章
8357瀏覽量
132328
發(fā)布評論請先 登錄
相關(guān)推薦
評論