0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

仿Flow構(gòu)建器創(chuàng)建數(shù)據(jù)流教程

jf_78858299 ? 來源:北洋洋洋 ? 作者:北洋 ? 2023-02-14 15:31 ? 次閱讀

第一步

首先回顧下前面的知識(shí)點(diǎn):

flow提供的只是一個(gè) 「擴(kuò)展函數(shù)」 返回的是一個(gè)保存了這個(gè)方法的類實(shí)例,并且該類提供emit方法以供flow中調(diào)用

構(gòu)建Flow

「flow方法」

object Flow {
    fun  flow(collect: Collector<T>.() -> Unit): SafeFlowCollector {
        return SafeFlowCollector(collect)
    }
}

定義一個(gè)Flow類,內(nèi)部提供flow方法。

「SafeCollector」 類:

class SafeFlowCollector<T>(val collect: Collector.() -> Unit) {
    //將該Function保存在調(diào)用flow后創(chuàng)建的實(shí)例中獲取實(shí)例創(chuàng)建FlowCollector
    fun collectFunction(a: (T) -> Unit) {
        val co = Collector(a)
        co.collect()
    }

「Collector類」

class Collector<T>(val action: (a: T) -> Unit) {
    fun emit(value: T) {
        action(value)
    }
}

這是flow方法需要?jiǎng)?chuàng)建的三個(gè)類,雖然功能不多,但是對(duì)于簡(jiǎn)單的構(gòu)建流還是綽綽有余的。

分析

可以看到flow方法傳入的方法參數(shù)collect被定義為了Collector的擴(kuò)展函數(shù),并且保存在了剛創(chuàng)建的SafeCollector的類中用collect函數(shù)表示。

「第一個(gè)功能」 :flow參數(shù)提交的類型和collect中收到的類型一致,我采用了更加直接的形式定義flow時(shí)需要設(shè)置傳輸?shù)念愋?,在emit和collect中都是對(duì)應(yīng)的類型。

「實(shí)現(xiàn)」

flow定義類型和emit類型保持一致:通過Collector< T >實(shí)現(xiàn)

flow定義類型和收集到的類型一致:通過SafeFlowCollector< T >實(shí)現(xiàn)

第二步

構(gòu)建collect收集器

第一步發(fā)射器設(shè)置好后,我們限制了發(fā)送的類型和接受的類型,并且將發(fā)送邏輯保存在了實(shí)例中。接下來我們需要調(diào)用該實(shí)例對(duì)象以觸發(fā)發(fā)送邏輯,在發(fā)送邏輯中還需要調(diào)用到我們收集的邏輯。

因此收集邏輯需要單獨(dú)存放,因此需要單獨(dú)構(gòu)建一個(gè)類,這個(gè)類還必須可以調(diào)用到發(fā)送邏輯。

?注:Flow中采用的是collect收集觸發(fā)flow流發(fā)送邏輯,本人使用時(shí)是按照collectFunction定義的。兩者邏輯一樣只是名稱不同

?

flow要構(gòu)造成哪個(gè)類的構(gòu)造函數(shù),該類就需要持有collect傳入的方法。這也是Collector的功能

SafeFlowCollector和Collector

Collector保存collect傳入的方法,flow擴(kuò)展Collector以使他可以觸發(fā)發(fā)射邏輯也就是flow傳入的參數(shù)。

class SafeFlowCollector<T>(val collect: Collector.() -> Unit)  //擴(kuò)展函數(shù)

//發(fā)送邏輯emit,這個(gè)action是從哪里來的呢?
class Collector<T>(val action: (a: T) -> Unit) {
    fun emit(value: T) {
        action(value)
    }
}

再解collectFunction方法:

上面說到action方法,接著看:

fun collectFunction(a: (T) -> Unit) {
    //可以看到調(diào)用collectFunction方法將傳入的方法參數(shù)保存到了Collector中也就是action方法
    val co = Collector(a)
    //觸發(fā)collect,collect也就是flow中傳入的方法。
    co.collect()
}

原生flow,collect的demo:

?注:將上述三個(gè)類拷貝到您的項(xiàng)目中即可調(diào)用

?

Flow.flow {
    Log.i(TAG, "emit before")
    emit("1")
    Log.i(TAG, "emit after")
}.collectFunction {
    Log.i(TAG, "collectionFunction is : $it")
}

總結(jié)

因此可以看到調(diào)用collectFunction方法會(huì)調(diào)用到flow傳入的方法中,在flow傳入的方法中調(diào)用emit又會(huì)執(zhí)行collectFunction傳入的方法。

?ps:collectFunction類比于原生Flow中的collect方法即可

?

擴(kuò)展中間轉(zhuǎn)換符

flow和collect我們支持了,現(xiàn)在我們來擴(kuò)展轉(zhuǎn)換操作符。這里還是簡(jiǎn)單實(shí)現(xiàn),原生Flow的實(shí)現(xiàn)看的很繞。

map操作符

map方法接受的是一個(gè)方法。并且該方法的參數(shù)是原數(shù)據(jù),經(jīng)過轉(zhuǎn)換后返回的值是collect接受的值。

首先我們需要確定幾個(gè)點(diǎn):

1、map的參數(shù)如何確定?

?map的參數(shù)即上一個(gè)Flow emit的值,而在我們這個(gè)里面emit的值是通過flow< T >指定的,所以參數(shù)直接寫T就可以。2.map的返回值如何確定?

?

?map的返回值要經(jīng)過兩個(gè)階段,收到上一個(gè)flow發(fā)送的值調(diào)用轉(zhuǎn)換函數(shù)把值傳入得到結(jié)果,因此map中最后一行即為返回值。

?

3.支持鏈?zhǔn)秸{(diào)用map

?map之后還需要再次經(jīng)歷map或者collectFunction方法,因此他返回的也必須是一個(gè)flowSafeCollector。參數(shù)是map傳入方法的返回值。

?

確定好這些來實(shí)現(xiàn),由于需要調(diào)用到上一個(gè)flow的collect方法, 「原生中是擴(kuò)展函數(shù)this即代表上一個(gè)Flow。本demo異曲同工,直接定義為該類的函數(shù)」 。于是實(shí)現(xiàn)如下, 「修改SafeFlowCollector」 類即可:

class SafeFlowCollector<T>(val collect: Collector.() -> Unit) {
    fun collectFunction(a: (T) -> Unit) {
        val co = Collector(a)
        co.collect()
    }

    //對(duì)比發(fā)現(xiàn)只是擴(kuò)展了這個(gè)map方法
    fun  map(mapFunction: (T) -> R): SafeFlowCollector {
                //外層調(diào)用經(jīng)map轉(zhuǎn)換后的collectionFunction會(huì)走到flow里面
        return Flow.flow {
                //this@SafeFlowCollector是為了保證調(diào)用到調(diào)用map函數(shù)的flow觸發(fā)這個(gè)flow的收集
                this@SafeFlowCollector.collectFunction {
                    //當(dāng)最內(nèi)層flow函數(shù)調(diào)用emit,此collectFunction會(huì)收到回調(diào),進(jìn)行轉(zhuǎn)換后調(diào)用flow返回給最外層的collectFunction方法。
                    emit(mapFunction(it))
                }
        }
    }

}

讀者可以直接將上面的SafeFlowCollector換成這個(gè)即可。

可支持map轉(zhuǎn)換Flow的demo

Flow.flow {
    emit("2")
}.map {
    it.toInt()
}.map {
    it.toString()
}.collectFunction {
    Log.i(TAG, "onCreate: it   $it")
}

「最后還是放出這張圖片:」

圖片

1649984327(1).png

擴(kuò)展中間操作符

還是像map一樣,直接把該方法添加到 「SafeFlowCollector」 類中即可

//新增擴(kuò)展zip操作流,用于多個(gè)流發(fā)射,就近原則誰先返回用誰
//tips1:收集器類型及·返回流類型,由于返回值相同。因此復(fù)用調(diào)用方流的泛型即可
//2:開啟收集后觸發(fā)多個(gè)流的收集,利用標(biāo)志位進(jìn)行判斷是否發(fā)射。目前采用這種方式。缺點(diǎn):流發(fā)射后應(yīng)該關(guān)閉但是此處只是限制了流的發(fā)射邏輯
fun zip(twoFlow: SafeFlowCollector<T>): SafeFlowCollector {
    return Flow.flow {
        var a = false //通過定義condition判斷,不支持協(xié)程需要手動(dòng)切換線程測(cè)試效果
        //開啟兩個(gè)劉的收集,誰先收集到誰先返回
        this@SafeFlowCollector.collectFunction {
            if (!a) {
                a = true
                this.emit(it)
            }
        }
        twoFlow.collectFunction {
            if (!a) {
                a = true
                this.emit(it)
            }
        }
    }
}

?該種方案可支持多個(gè)Flow發(fā)射,zip中將參數(shù)變?yōu)榭勺儏?shù)即可。SafeCollector中擴(kuò)展

?

zip操作符Demo

val flow1 = Flow.flow<Int> {
        emit(2)
}
Flow.flow<String> {
    Thread{
        emit("111")
    }.interrupt()
}.map {
    it.toInt()
}.zip(flow1).collectFunction {
    Log.i(TAG, "onCreate: ZIP操作符下的Flow收集器收集到的數(shù)據(jù)位 $it")
}

?上述方案采用標(biāo)志位實(shí)現(xiàn)不太優(yōu)雅,有更好的方式可以提出~~

?

「不支持協(xié)程~~~,需要調(diào)度手動(dòng)切換線程」

總結(jié)

「轉(zhuǎn)換操作符只是中間操作符的一種,其他中間操作符原理大致都一樣。都是經(jīng)過在封裝一次flow然后觸發(fā)上級(jí)flow的收集,最后調(diào)用到最里層的flow,調(diào)用emit在一層層經(jīng)過中間操作符處理給到最外層」

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 編程
    +關(guān)注

    關(guān)注

    88

    文章

    3521

    瀏覽量

    93269
  • 函數(shù)
    +關(guān)注

    關(guān)注

    3

    文章

    4237

    瀏覽量

    61969
  • Flow
    +關(guān)注

    關(guān)注

    0

    文章

    10

    瀏覽量

    8817
收藏 人收藏

    評(píng)論

    相關(guān)推薦

    SSIS體系結(jié)構(gòu)控制數(shù)據(jù)流技術(shù)對(duì)比分析

    數(shù)據(jù)流(Data Flow)是控制中的核心組件,用于把數(shù)據(jù)提取到服務(wù)內(nèi)存中,轉(zhuǎn)換數(shù)據(jù)并把
    發(fā)表于 11-06 11:19 ?4614次閱讀

    大眾數(shù)據(jù)流分析

    、3.0發(fā)動(dòng)機(jī)數(shù)據(jù)流定義與解釋別克君威2.0發(fā)動(dòng)機(jī)數(shù)據(jù)流定義與解釋凱越數(shù)據(jù)流列表凱越發(fā)動(dòng)機(jī)數(shù)據(jù)流定義賽歐數(shù)據(jù)流列表賽歐
    發(fā)表于 06-15 12:28

    大眾防盜數(shù)據(jù)流

    汽車防盜數(shù)據(jù)流
    發(fā)表于 08-31 16:23

    研究labview的數(shù)據(jù)流

    我在NI上下載了labview一個(gè)程序,運(yùn)行起來還有點(diǎn)小問題,我想運(yùn)行 看看他的數(shù)據(jù)流,深入的研究下,哪位大神能指導(dǎo)下,怎么增加個(gè)仿真信號(hào) 和輸出采集,這樣能清楚的看到其整個(gè)數(shù)據(jù)流的過程,謝謝~
    發(fā)表于 12-31 10:40

    LabVIEW中的數(shù)據(jù)流編程基礎(chǔ)

      LabVIEW按照數(shù)據(jù)流(dataflow)模式運(yùn)行VI。 當(dāng)接受到所有所需的輸入時(shí),程序框圖節(jié)點(diǎn)將運(yùn)行。節(jié)點(diǎn)在運(yùn)行時(shí)產(chǎn)生輸出端數(shù)據(jù)并將該數(shù)據(jù)傳送給數(shù)據(jù)流路徑中的下一個(gè)節(jié)點(diǎn)。
    發(fā)表于 11-20 10:47

    關(guān)于高速數(shù)據(jù)流盤處理技術(shù)看完你就懂了

    本文討論了支持高速數(shù)據(jù)流處理的技術(shù)、最大化系統(tǒng)處理性能的應(yīng)用設(shè)計(jì)和在數(shù)據(jù)流導(dǎo)入磁盤與數(shù)據(jù)流導(dǎo)入存儲(chǔ)應(yīng)用中可獲得的
    發(fā)表于 04-29 06:25

    基于數(shù)據(jù)流的Java字節(jié)碼分析

    本文基于數(shù)據(jù)流框架理論,提出了如何將數(shù)據(jù)流分析方法應(yīng)用于JAVA 字節(jié)碼中,通過建立數(shù)據(jù)流與半格、數(shù)據(jù)流和函數(shù)調(diào)用圖的關(guān)系,從而對(duì)類型信息進(jìn)行分析。實(shí)驗(yàn)表明該
    發(fā)表于 12-25 13:22 ?9次下載

    網(wǎng)絡(luò)數(shù)據(jù)流存儲(chǔ)算法分析與實(shí)現(xiàn)

    針對(duì)網(wǎng)絡(luò)數(shù)據(jù)流存儲(chǔ)的瓶頸問題,提出了一種網(wǎng)絡(luò)數(shù)據(jù)流存儲(chǔ)算法分析與實(shí)現(xiàn)方法,仿真結(jié)果表明,模型能顯著提高網(wǎng)絡(luò)數(shù)據(jù)流的實(shí)時(shí)存儲(chǔ)能力
    發(fā)表于 05-26 15:57 ?21次下載
    網(wǎng)絡(luò)<b class='flag-5'>數(shù)據(jù)流</b>存儲(chǔ)算法分析與實(shí)現(xiàn)

    基于數(shù)據(jù)流特征的電子文件訪問方法

    的概念,提取了常用網(wǎng)絡(luò)協(xié)議傳輸電子文件的特征值,設(shè)計(jì)和構(gòu)建了擴(kuò)展性強(qiáng)的數(shù)據(jù)流特征數(shù)據(jù)庫,并給出了該特征數(shù)據(jù)庫檢測(cè)和防止電子文件網(wǎng)絡(luò)泄漏過程。實(shí)驗(yàn)數(shù)據(jù)
    發(fā)表于 11-15 14:15 ?5次下載

    基于FPGA芯片的數(shù)據(jù)流結(jié)構(gòu)分析

    Virtex 型FPGA 芯片是Xilinx 公司芯片系列中的一種,Virtex 系列的數(shù)據(jù)流及配置邏輯與XC4000 的數(shù)據(jù)流及配置邏輯有顯著不同,但卻與Xilinx 的FPGA 家族保持了很大
    發(fā)表于 11-18 11:37 ?2224次閱讀

    數(shù)據(jù)流編程模型優(yōu)化

    數(shù)據(jù)流編程模型將程序的計(jì)算與通信分離,暴露了應(yīng)用程序潛在的并行性并簡(jiǎn)化了編程難度。分布式計(jì)算框架利用廉價(jià)PC構(gòu)建多核集群解決了大規(guī)模并行計(jì)算問題,但多核集群層次性存儲(chǔ)結(jié)構(gòu)和處理單元對(duì)數(shù)據(jù)流程序的性能
    發(fā)表于 11-23 15:48 ?3次下載
    <b class='flag-5'>數(shù)據(jù)流</b>編程模型優(yōu)化

    基于角度方差的數(shù)據(jù)流異常檢測(cè)算法

    傳統(tǒng)基于歐氏距離的異常檢測(cè)算法在高維數(shù)據(jù)檢測(cè)中存在精度無法保證以及運(yùn)行時(shí)間過長(zhǎng)的問題。為此,結(jié)合高維數(shù)據(jù)流的特點(diǎn)運(yùn)用角度方差的方法,提出一種改進(jìn)的基于角度方差的數(shù)據(jù)流異常檢測(cè)算法。通過構(gòu)建
    發(fā)表于 01-17 11:29 ?1次下載
    基于角度方差的<b class='flag-5'>數(shù)據(jù)流</b>異常檢測(cè)算法

    數(shù)據(jù)流是什么

    數(shù)據(jù)流最初是通信領(lǐng)域使用的概念,代表傳輸中所使用的信息的數(shù)字編碼信號(hào)序列。然而,我們所提到的數(shù)據(jù)流概念與此不同。這個(gè)概念最初在1998年由Henzinger在文獻(xiàn)87中提出,他將數(shù)據(jù)流定義為“只能以事先規(guī)定好的順序被讀取一次的
    的頭像 發(fā)表于 02-27 15:25 ?6888次閱讀

    Labview數(shù)據(jù)流編程的簡(jiǎn)單介紹

    Labview數(shù)據(jù)流編程基本概念視頻教學(xué)
    的頭像 發(fā)表于 08-05 06:05 ?3942次閱讀

    控制數(shù)據(jù)流的區(qū)別

    控制數(shù)據(jù)流的區(qū)別? 在計(jì)算機(jī)科學(xué)中,控制數(shù)據(jù)流是兩個(gè)非常重要的概念。雖然它們經(jīng)常一起使用,但它們具有非常不同的含義。本文將討論控制
    的頭像 發(fā)表于 09-13 11:17 ?4540次閱讀