0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

Java中的Reactive編程示例

科技綠洲 ? 來源:Java技術(shù)指北 ? 作者:Java技術(shù)指北 ? 2023-10-08 16:06 ? 次閱讀

相信響應(yīng)式編程經(jīng)常會在各種地方被提到。本篇就為大家從函數(shù)式編程一直到Spring WeFlux做一次簡單的講解,并給出一些示例,希望大家可以更好的理解響應(yīng)式編程,可以在合適的時機運用到實際項目中。

1. 前言

了解響應(yīng)式編程,首先我們需要了解函數(shù)式操作和Stream的操作,下面我們簡單的復(fù)習一下嘍。

1.1 常用函數(shù)式編程

函數(shù)式接口

我們先來回顧一下Java中的函數(shù)式接口。常見的有以下幾種

  • Consumer 一個輸入,無輸出
  • Supplier 無輸入,有輸出
  • Function 輸入T,輸出R
  • BiFunction 輸入T,U 輸出R
  • Predicate 有輸入,輸出boolean類型

上面的簡單函數(shù)式接口示例如下:

Consumer consumer = (i)- > System.out.println("this is " + i);
consumer.accept("consumer");

Supplier supplier  = () - > "this is supplier";
System.out.println(supplier.get());

Function< Integer,Integer > function = (i) - > i*i;
System.out.println(function.apply(8));

BiFunction< Integer,Integer,String > biFunction = (i,j)- > i+"*"+j+"="+i*j;
System.out.println(biFunction.apply(8,8));

Predicate< Integer > predicate = (i) - > i.intValue() >3;
System.out.println(predicate.test(5));

其執(zhí)行結(jié)果如下:

this is consumer
this is supplier
64
8*8=64
true

1.2 Stream操作

對Stream進行操作,主要有幾個關(guān)鍵點:

  • 生成流
  • 流的中間操作 其中中間操作可以有多個,中間操作會返回一個新的流(如 map ,filter,sorted等),然后交給下一個流方法使用。
  • 流的終結(jié)操作 終結(jié)操作只有一個。終結(jié)操作執(zhí)行后,流就到了終止狀態(tài),無法被操作 (如forEach,toArray , findFirst 等)。

創(chuàng)建流的示例:

String[] strArray = {"ss","ss","","sdffg"};

Arrays.stream(strArray).forEach(System.out::println);
Arrays.asList(strArray).stream().forEach(System.out::println);
Stream.of(strArray).forEach(System.out::println);
Stream.iterate(1,(i) - > i+1).limit(10).forEach(System.out::println);
Stream.generate(() - > new Random().nextInt(10)).limit(10).forEach(System.out::println);

簡單的流處理示例:

String[] strArray1 = {"ss","ss","","sdffg","bca-de","fff"};
String collect = Stream.of(strArray1)
        .filter(i - > !i.isEmpty())//過濾空字符串
        .sorted() //排序
        .limit(1) //只取第一個元素
        .map(i - > i.replace("-", ""))//替換 "-"
        .flatMap(i - > Stream.of(i.split("")))//將字符拆成字符數(shù)組
        .sorted() //排序
        .collect(Collectors.joining());//將字符拼接組合到一起
System.out.println(collect);//最后輸出abcde

2. Java響應(yīng)式編程

響應(yīng)式編程會用到一個發(fā)布者和一個訂閱者,然后通過訂閱關(guān)系完成數(shù)據(jù)流的傳輸。訂閱關(guān)系中可以處理一些背壓問題,即調(diào)節(jié)消費者與生產(chǎn)者之間的供需平衡,讓整個程序達到最大效率。

圖片

Java9中java.util.concurrent.Flow接口提供響應(yīng)式流編程類似的功能。

下面我們實現(xiàn)一個基于Java 響應(yīng)式編程的示例:

其中有三個簡單步驟:

  1. 建立生產(chǎn)者
  2. 構(gòu)建消費者
  3. 消費者訂閱生產(chǎn)者
  4. 生產(chǎn)者生產(chǎn)內(nèi)容
SubmissionPublisher publisher = new SubmissionPublisher<  >();//建立生產(chǎn)者
Flow.Subscriber subscriber = new Flow.Subscriber() {...};//建立消費者 (其中的實現(xiàn)放到下面)
publisher.subscribe(subscriber);//訂閱關(guān)系
for (int i = 0; i < 10; i++) {
 publisher.submit("test reactive java : " +i); //生產(chǎn)者生產(chǎn)內(nèi)容
}

消費者全部代碼如下:

Flow.Subscriber subscriber = new Flow.Subscriber() {
    Flow.Subscription subscription;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("Subscription establish first ");
        this.subscription = subscription;
        this.subscription.request(1);
    }
    @Override
    public void onNext(Object item) {
        subscription.request(10);
        System.out.println("receive :  "+ item);
    }
    @Override
    public void onError(Throwable throwable) {
        System.out.println(" onError ");
    }
    @Override
    public void onComplete() {
        System.out.println(" onComplete ");
    }
};

其中onSubscribe方法表示建立訂閱關(guān)系

onNext接受數(shù)據(jù),并請求生產(chǎn)者的數(shù)據(jù)。

onError,onComplete則是error或者完成之后的處理方法。

帶有中間處理器的響應(yīng)式流

Reactive Stream 通常會基于如下的模型:

圖片

下面我們實現(xiàn)一個帶有中間處理功能的響應(yīng)式模型:

下面的Processor 既有發(fā)布者,又有訂閱者:

public class ReactiveProcessor extends SubmissionPublisher implements Flow.Subscriber {
    private Flow.Subscription subscription;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println( Thread.currentThread().getName() +  " Reactive processor establish connection ");
        this.subscription = subscription;
        this.subscription.request(1);
    }

    @Override
    public void onNext(Object item) {
        System.out.println(Thread.currentThread().getName() + " Reactive processor receive data: "+ item);
        this.submit(item.toString().toUpperCase());
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Reactive processor error ");
        throwable.printStackTrace();
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        System.out.println(Thread.currentThread().getName() + " Reactive processor receive data complete ");
    }
}

如上中間處理器訂閱發(fā)布者, 同時消費者再訂閱中間處理器。中間處理器也可以調(diào)節(jié)發(fā)布訂閱的生產(chǎn)消費速率。

SubmissionPublisher publisher = new SubmissionPublisher<  >(); //創(chuàng)建生產(chǎn)者
ReactiveProcessor reactiveProcessor = new ReactiveProcessor(); // 創(chuàng)建中間處理器
publisher.subscribe(reactiveProcessor); //中間處理器訂閱生產(chǎn)者
Flow.Subscriber subscriber = new Flow.Subscriber() {...}; //創(chuàng)建消費者
reactiveProcessor.subscribe(subscriber); //消費者訂閱中間處理器
for (int i = 0; i < 10; i++) {
    publisher.submit("test reactive java : " +i); //生產(chǎn)者生產(chǎn)數(shù)據(jù)
}

通過上述生產(chǎn)者-> 中間處理器->消費者, 可以將生產(chǎn)者生產(chǎn)的數(shù)據(jù)全部變成大寫,然后再發(fā)送給最終的消費者。

以上式Java中的reactive 編程示例。Java會不同線程來分別處理消費者與生產(chǎn)者的消息處理

3. Reactor

Reactor中兩個比較關(guān)鍵的對象式Flux和Mono, 整個Spring的響應(yīng)式編程均式基于projectreactor項目。Reactor是響應(yīng)式編程的依賴,主要是基于JVM構(gòu)建非阻塞程序。

根據(jù)Reactor的介紹,此類響應(yīng)式編程的的三方庫(Reactor)主要是解決一些JVM經(jīng)典異步編程中的一些缺點,并且還可以專注于一些新的特性,如下:

  • 可組合性與可讀性 (Composability and readability)
  • 可以使用豐富的運算操作符將數(shù)據(jù)作為流進行操作
  • 訂閱之前,不會有任何事
  • 背壓特性(Backpressure ),可以理解為消費者可以向生產(chǎn)者發(fā)送產(chǎn)出率過高的信號,從而調(diào)整生產(chǎn)速率。或者消費者可以選擇一次性拉去一捆數(shù)據(jù)進行消費。
  • 于并發(fā)無關(guān)的高度抽象的高級功能

其中有這么一段解釋,可以形象的說明響應(yīng)式編程。

Reactive的程序可以想象成車間的流水線,reactor既是流水線上的傳送帶,又是處理工作站。原料從一個原始的生產(chǎn)者出發(fā),最終成為產(chǎn)品被推總給消費者。

3.1 Flux & Mono

下面我們介紹一下Flux和Mono。

在Reactor中Flux和Mono均是Publisher,即生產(chǎn)者。兩者也有不同。Flux對象表示0到N個異步的響應(yīng)序列,而Mono只代表0個(empty)或者1個結(jié)果。

Reactor官網(wǎng)上介紹的Flux示意如下:

圖片

Mono示意如下:

圖片

3.2 Flux Mono創(chuàng)建與使用

我們也可以單獨引用其依賴。

使用maven依賴

< dependencies >
    < dependency >
        < groupId >io.projectreactor< /groupId >
        < artifactId >reactor-core< /artifactId > 
    < /dependency >
    < dependency >
        < groupId >io.projectreactor< /groupId >
        < artifactId >reactor-test< /artifactId > 
        < scope >test< /scope >
    < /dependency >
< /dependencies >
Mono創(chuàng)建

分別創(chuàng)建空Mono和一個包含一個String的Mono,并由消費者消費打印。

Mono.empty().subscribe(System.out::println);
Mono.just("Hello Mono Java North").subscribe(System.out::print);
Flux創(chuàng)建

Flux創(chuàng)建有如下的一些方法,

  • just(通過不定參數(shù)創(chuàng)建)
  • range(從某個整數(shù)開始,往后的整數(shù)數(shù)量)
  • fromArray,fromIterable,fromStream,從名稱上就可以看出來,通過數(shù)組,迭代器,Stream流創(chuàng)建Flux

下面式一些Java代碼示例

Flux.just(1,2,3,4,5).subscribe(System.out::print);
Flux.range(1,20).subscribe(System.out::print);
Flux.fromArray(new String[]{"a1","a2","a3","a4","a5","a6"}).skip(2).subscribe(System.out::print);
Flux.fromIterable(Arrays.asList(1,2,3,4,5,6,7)).subscribe(System.out::println);
Flux.fromStream(Stream.of(Arrays.asList(1,2,3,4,5,6,7))).subscribe(System.out::print);

我們再舉一個generate的例子

public static  T, S > Flux< T > generate(Callable< S > stateSupplier, BiFunction< S, SynchronousSink< T >, S > generator)

如上代碼所示,generate需要一個Callable參數(shù),而且是supplier (即沒有輸入值,只有一個輸出)

另一個參數(shù)是BiFunction (前面我們也介紹過,需要兩個輸入值,一個輸出值)。BiFunction中的其中一個輸入值是SynchronousSink,下面我們給出一個generate創(chuàng)建Flux的示例。

Flux.generate(
 () - > 0, //提供一個初始狀態(tài)值0
 (i, sink) - > {
    sink.next("3*" + i + "=" + 3 * i);//使用初始值去生產(chǎn)一個3的乘法
    if (i > 9) sink.complete();//設(shè)置停止條件
    return i + 1;//返回一個新的狀態(tài)值,以便在下一次的生產(chǎn)中使用,除非響應(yīng)序列終止
}).subscribe(System.out::println);

下面我們在看一個Flux嵌套處理示例:

需求:將字符串去空格,并去重,然后排序輸出。

String str = "qa ws ed rf tg yh uj i k ol p za sx dc vf bg hn jm k loi yt ";
Flux.fromArray(str.split(" "))//通過數(shù)組創(chuàng)建Flux
    .flatMap(i - > Flux.fromArray(i.split(""))) 
    .distinct() // 去重
    .sort() //排序
    .subscribe(System.out::print); 
    //flatMap與Stream中的flatMap類似,接受Function作為參數(shù),輸入一個值,輸出一個值,此處輸出均為Publisher,

以上就是Flux和Mono的一些簡單介紹,同時Ractor也支持JDK中的FlowPubliser 和FlowSubscriber與 Reactor中的publisher, subscriber的適配等.

4. WebFlux

SpringBoot 2之后支持的Reactive響應(yīng)式編程。

關(guān)于Reactive技術(shù)棧和經(jīng)典的Servlet技術(shù)棧對比,Spring官網(wǎng)的這張圖比較清晰。

圖片

Spring響應(yīng)式編程主要依賴于Reactor第三方庫,即上面講的Flux和Mono的庫。

WebFlux主要有以下幾個要點:

  • 反應(yīng)式棧web框架
  • 完全異步非阻塞
  • 運行在netty,undertow,Servlet3.1 + 容器
  • 核心反應(yīng)式庫 Reactor
  • 返回 Flux 或Mono
  • 支持注解和函數(shù)編程兩種編程模式

Spring WebFlux示例

下面我們給出幾個SpringBoot 的響應(yīng)式web示例。

可以去https://start.spring.io/ 新建webflux的項目也可以。

項目中的主要依賴就是spring-boot-starter-webflux

< dependency >
   < groupId >org.springframework.boot< /groupId >
   < artifactId >spring-boot-starter-webflux< /artifactId >
  < /dependency >
基于注解的WebFlux:

以下是一個最簡單的基于注解的WebFlux

@GetMapping("/hello/mono1")
public Mono< String > mono(){
    return Mono.just("Hello Mono -  Java North");
}

@GetMapping("/hello/flux1")
public Flux< String > flux(){
    return Flux.just("Hello Flux","Hello Java North");
}
基于函數(shù)式編程的WebFlux:

創(chuàng)建RouterFunction,將其注入到Spring中即可。

@Bean
public RouterFunction< ServerResponse > testRoutes1() {
    return RouterFunctions.route().GET("/flux/function", new HandlerFunction< ServerResponse >() {
        @Override
        public Mono< ServerResponse > handle(ServerRequest request) {
            return ServerResponse.ok().bodyValue("hello web flux , Hello Java North");
        }
    }).build();
}

//上面的方法使用函數(shù)式編程替換之后如下
@Bean
public RouterFunction< ServerResponse > testRoutes() {
    return RouterFunctions.route().GET("/flux/function",
         request - > ServerResponse.ok()
                    .bodyValue("Hello web flux , Hello Java North")).build();
}
Flux與Mono的響應(yīng)式編程延遲示例

下面我們編寫一段返回Mono的響應(yīng)式Web服務(wù)。

@GetMapping("/hello/mono")
public Mono< String > stringMono(){
    Mono< String > from = Mono.fromSupplier(() - > {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "Hello, Spring Reactive  date time:"+ LocalDateTime.now();
    });
    System.out.println( "thread : " + Thread.currentThread().getName()+ " ===  " + LocalDateTime.now() +"  ==========Mono function complete==========");
    return from;
}

使用postman請求如下, 5秒鐘后返回數(shù)據(jù)。后臺卻在5秒中之前已經(jīng)處理完整個方法。

圖片

后臺打印日志:

圖片

再看一組Flux

@GetMapping(value = "/hello/flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux< String > flux1(){
    Flux< String > stringFlux = Flux.fromStream(IntStream.range(1,6).mapToObj(i - >{
        mySleep(1);//表示睡1秒
        return "java north flux" + i + "date time: " +LocalDateTime.now();
    }));
    System.out.println("thread : " + Thread.currentThread().getName()+ " ===  " + LocalDateTime.now() + "  ==========Flux function complete=========");
    return stringFlux;
}

此次使用谷歌瀏覽器請求此服務(wù):

可以發(fā)現(xiàn)每隔一秒就會有一條消息被生產(chǎn)出來。

圖片

后臺完成時間同樣是在一開始就完成整個方法:

圖片

通過上述對Flux 與 Mono的例子,可以好好體會一下響應(yīng)式編程。

總結(jié)

本篇回顧了函數(shù)式編程,Stream操作等,然后再舉例講了Java中的Reactive編程示例, 同時也給處理Reactor三方庫的Flux于Mono的示例。

最后使用了SpringBoot WebFlux 創(chuàng)建簡單的響應(yīng)式web服務(wù)。希望能讓大家更好的理解響應(yīng)式編程。

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學習之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 接口
    +關(guān)注

    關(guān)注

    33

    文章

    8254

    瀏覽量

    149942
  • JAVA
    +關(guān)注

    關(guān)注

    19

    文章

    2943

    瀏覽量

    104085
  • 編程
    +關(guān)注

    關(guān)注

    88

    文章

    3521

    瀏覽量

    93263
  • 函數(shù)
    +關(guān)注

    關(guān)注

    3

    文章

    4235

    瀏覽量

    61965
收藏 人收藏

    評論

    相關(guān)推薦

    java編程思想中文版 (電子書)

    java編程思想中文版第4版特點: 適合初學者與專業(yè)人員的經(jīng)典的面向?qū)ο髷⑹龇绞剑瑸楦碌?b class='flag-5'>Java SE5/6增加了新的示例和章節(jié)。=550; 測驗框架顯示程序輸出。=550; 設(shè)計模
    發(fā)表于 12-06 12:17 ?0次下載

    java編程思想第二版(第2版)

    java編程思想第二版的內(nèi)容組織、講授方法、選用示例和附帶練習都別具特色。作者Bruce Eckel根據(jù)多年教學實踐中發(fā)現(xiàn)的問題,通過簡練的示例和敘述,闡明了在學習
    發(fā)表于 12-06 13:37 ?0次下載

    java 網(wǎng)絡(luò)編程語言

    java 網(wǎng)絡(luò)編程語言
    發(fā)表于 12-09 15:41 ?0次下載

    Java編程思想_中文版4

    Java編程思想》這本書贏得了全球程序員的廣泛贊譽,即使是最晦澀的概念,在Bruce Eckel的文字親和力和小而直接的編程示例面前也會化解于無形。從
    發(fā)表于 03-09 11:20 ?0次下載

    JAVA教程之控件的排布示例

    JAVA教程之控件的排布示例,很好的學習資料。
    發(fā)表于 03-31 11:13 ?1次下載

    Java并發(fā)編程實戰(zhàn)

    Java并發(fā)編程實戰(zhàn)
    發(fā)表于 03-19 11:24 ?7次下載

    JAVA優(yōu)化編程

    JAVA優(yōu)化編程
    發(fā)表于 03-19 11:24 ?1次下載

    Java編程指南

    Java編程指南
    發(fā)表于 03-19 11:26 ?3次下載

    Java編程100例

    Java詳細編程 100例
    發(fā)表于 05-24 11:20 ?23次下載

    10個Java編程異常處理最佳實踐

    這里是我收集的10個Java編程中進行異常處理的10最佳實踐。在Java編程對于檢查異常有褒有貶,強制處理異常是一門語言的功能。在本文中,
    的頭像 發(fā)表于 05-03 17:49 ?1859次閱讀

    JAVA并發(fā)編程實踐

    JAVA并發(fā)編程實踐資料免費下載。
    發(fā)表于 06-01 15:31 ?15次下載

    Java并發(fā)編程的藝術(shù)

    Java并發(fā)編程的藝術(shù)說明。
    發(fā)表于 06-01 15:31 ?16次下載

    Java編程思想練習題源碼

    Java編程思想練習題源碼,配合《Java編程思想》進行學習。
    發(fā)表于 09-26 14:24 ?0次下載

    UM1864_Java編寫游戲示例

    UM1864_Java編寫游戲示例
    發(fā)表于 11-22 19:09 ?0次下載
    UM1864_<b class='flag-5'>Java</b>編寫游戲<b class='flag-5'>示例</b>

    移動數(shù)據(jù)的編程示例

    編程示例 在此編程示例,將移動在生產(chǎn)班次期間為示例收集的數(shù)據(jù)值以作進一步處理。收集的數(shù)據(jù)放在
    的頭像 發(fā)表于 08-23 10:10 ?1145次閱讀
    移動數(shù)據(jù)的<b class='flag-5'>編程</b><b class='flag-5'>示例</b>