0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

xxl-job通信設(shè)計(jì)流程

jf_ro2CN3Fa ? 來源:c1n.cn ? 2024-01-30 09:34 ? 次閱讀

通信底層介紹

xxl-job 使用 netty http 的方式進(jìn)行通信,雖然也支持 Mina,jetty,netty tcp 等方式,但是代碼里面固定寫死的是 netty http。

通信整體流程

我以調(diào)度器通知執(zhí)行器執(zhí)行任務(wù)為例,繪制的活動(dòng)圖:

2417a940-b8c3-11ee-8b88-92fbcf53809c.png

活動(dòng)圖

驚艷的設(shè)計(jì)

看完了整個(gè)處理流程代碼,設(shè)計(jì)上可以說獨(dú)具匠心,將 netty,多線程的知識(shí)運(yùn)用得行云流水。

我現(xiàn)在就將這些設(shè)計(jì)上出彩的點(diǎn)總結(jié)如下:

使用動(dòng)態(tài)代理模式,隱藏通信細(xì)節(jié)

xxl-job 定義了兩個(gè)接口 ExecutorBiz,AdminBiz,ExecutorBiz 接口中封裝了向心跳,暫停,觸發(fā)執(zhí)行等操作,AdminBiz 封裝了回調(diào),注冊(cè),取消注冊(cè)操作,接口的實(shí)現(xiàn)類中,并沒有通信相關(guān)的處理。

XxlRpcReferenceBean 類的 getObject() 方法會(huì)生成一個(gè)代理類,這個(gè)代理類會(huì)進(jìn)行遠(yuǎn)程通信。

全異步處理

執(zhí)行器收到消息進(jìn)行反序列化,并沒有同步執(zhí)行任務(wù)代碼,而是將任務(wù)信息存儲(chǔ)在 LinkedBlockingQueue 中,異步線程從這個(gè)隊(duì)列中獲取任務(wù)信息,然后執(zhí)行。

而任務(wù)的處理結(jié)果,也不是說處理完之后,同步返回的,也是放到回調(diào)線程的阻塞隊(duì)列中,異步的將處理結(jié)果返回回去。

這樣處理的好處就是減少了 netty 工作線程的處理時(shí)間,提升了吞吐量。

對(duì)異步處理的包裝

對(duì)異步處理進(jìn)行了包裝,代碼看起來是同步調(diào)用的。

我們看下調(diào)度器,XxlJobTrigger 類觸發(fā)任務(wù)執(zhí)行的代碼:

publicstaticReturnTrunExecutor(TriggerParamtriggerParam,Stringaddress){
ReturnTrunResult=null;
try{
ExecutorBizexecutorBiz=XxlJobScheduler.getExecutorBiz(address);
//這里面做了很多異步處理,最終同步得到處理結(jié)果
runResult=executorBiz.run(triggerParam);
}catch(Exceptione){
logger.error(">>>>>>>>>>>xxl-jobtriggererror,pleasecheckiftheexecutor[{}]isrunning.",address,e);
runResult=newReturnT(ReturnT.FAIL_CODE,ThrowableUtil.toString(e));
}

StringBufferrunResultSB=newStringBuffer(I18nUtil.getString("jobconf_trigger_run")+":");
runResultSB.append("
address:").append(address); runResultSB.append("
code:").append(runResult.getCode()); runResultSB.append("
msg:").append(runResult.getMsg()); runResult.setMsg(runResultSB.toString()); returnrunResult; }

ExecutorBiz.run 方法我們說過了,是走的動(dòng)態(tài)代理,和執(zhí)行器進(jìn)行通信,執(zhí)行器執(zhí)行結(jié)果也是異步處理完,才返回的,而這里看到的 run 方法是同步等待處理結(jié)果返回。

我們看下xxl-job是如何同步獲取處理結(jié)果的:調(diào)度器向執(zhí)行器發(fā)出消息后,該線程阻塞。等到執(zhí)行器處理完畢后,將處理結(jié)果返回,喚醒被阻塞的線程,調(diào)用處拿到返回值。

動(dòng)態(tài)代理代碼如下:

//代理類中的觸發(fā)調(diào)用
if(CallType.SYNC==callType){
//future-responseset
XxlRpcFutureResponsefutureResponse=newXxlRpcFutureResponse(invokerFactory,xxlRpcRequest,null);
try{
//doinvoke
client.asyncSend(finalAddress,xxlRpcRequest);

//futureget
XxlRpcResponsexxlRpcResponse=futureResponse.get(timeout,TimeUnit.MILLISECONDS);
if(xxlRpcResponse.getErrorMsg()!=null){
thrownewXxlRpcException(xxlRpcResponse.getErrorMsg());
}
returnxxlRpcResponse.getResult();
}catch(Exceptione){
logger.info(">>>>>>>>>>>xxl-rpc,invokeerror,address:{},XxlRpcRequest{}",finalAddress,xxlRpcRequest);

throw(einstanceofXxlRpcException)?e:newXxlRpcException(e);
}finally{
//future-responseremove
futureResponse.removeInvokerFuture();
}
}

XxlRpcFutureResponse 類中實(shí)現(xiàn)了線程的等待,和線程喚醒的處理:

//返回結(jié)果,喚醒線程
publicvoidsetResponse(XxlRpcResponseresponse){
this.response=response;
synchronized(lock){
done=true;
lock.notifyAll();
}
}

@Override
publicXxlRpcResponseget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException{
if(!done){
synchronized(lock){
try{
if(timeout

有的同學(xué)可能會(huì)問了,調(diào)度器接收到返回結(jié)果,怎么確定喚醒哪個(gè)線程呢?

每一次遠(yuǎn)程調(diào)用,都會(huì)生成 uuid 的請(qǐng)求 id,這個(gè) id 是在整個(gè)調(diào)用過程中一直傳遞的,就像一把鑰匙,在你回家的的時(shí)候,拿著它就帶開門。

這里拿著請(qǐng)求 id 這把鑰匙,就能找到對(duì)應(yīng)的 XxlRpcFutureResponse,然后調(diào)用 setResponse 方法,設(shè)置返回值,喚醒線程。

publicvoidnotifyInvokerFuture(StringrequestId,finalXxlRpcResponsexxlRpcResponse){


//通過requestId找到XxlRpcFutureResponse,
finalXxlRpcFutureResponsefutureResponse=futureResponsePool.get(requestId);
if(futureResponse==null){
return;
}
if(futureResponse.getInvokeCallback()!=null){

//callbacktype
try{
executeResponseCallback(newRunnable(){
@Override
publicvoidrun(){
if(xxlRpcResponse.getErrorMsg()!=null){
futureResponse.getInvokeCallback().onFailure(newXxlRpcException(xxlRpcResponse.getErrorMsg()));
}else{
futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
}
}
});
}catch(Exceptione){
logger.error(e.getMessage(),e);
}
}else{
//里面調(diào)用lock的notify方法
futureResponse.setResponse(xxlRpcResponse);
}

//doremove
futureResponsePool.remove(requestId);

}
審核編輯:黃飛

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 通信技術(shù)
    +關(guān)注

    關(guān)注

    20

    文章

    1093

    瀏覽量

    92041
  • 線程
    +關(guān)注

    關(guān)注

    0

    文章

    501

    瀏覽量

    19580
  • 調(diào)度器
    +關(guān)注

    關(guān)注

    0

    文章

    98

    瀏覽量

    5209

原文標(biāo)題:xxl-job驚艷的設(shè)計(jì),怎能叫人不愛

文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏

    評(píng)論

    相關(guān)推薦

    《分布式_Job》——定時(shí)XXL_JOB_使用總結(jié)

    《分布式_Job》——定時(shí)XXL_JOB_使用總結(jié)
    發(fā)表于 07-08 17:56

    Spark job是怎么被調(diào)度執(zhí)行的

    Spark job 的執(zhí)行流程簡(jiǎn)介
    發(fā)表于 08-22 08:24

    運(yùn)行調(diào)度中心后訪問出現(xiàn)500錯(cuò)誤怎么解決

    XXL-Job 訪問調(diào)度中心出現(xiàn) 500 應(yīng)用程序異常
    發(fā)表于 11-08 09:39

    關(guān)于XXL-JOB定時(shí)調(diào)度器的使用總結(jié)

    XXL-JOB 定時(shí)調(diào)度器 使用小結(jié)
    發(fā)表于 04-23 14:53

    xxl conf admin在linux下面的自啟動(dòng)

    【配置中心】xxl-conf配置3 - xxl-conf-admin在linux下面的自啟動(dòng)
    發(fā)表于 06-10 17:30

    適用于RS-232通訊的ADM2xxL系列特性和典型應(yīng)用實(shí)例

    適用于RS-232通訊的ADM2xxL系列特性和典型應(yīng)用實(shí)例:
    發(fā)表于 06-17 11:26 ?22次下載
    適用于RS-232通訊的ADM2<b class='flag-5'>xxL</b>系列特性和典型應(yīng)用實(shí)例

    GSM通信流程

    GSM通信流程包括兩方面的內(nèi)容:呼叫基本流程,信令基本流程。其中,呼叫流程主要包含:移動(dòng)主叫流程
    發(fā)表于 07-29 16:34 ?42次下載

    AN-375:用于RS-232通信的ADM2xxL系列

    AN-375:用于RS-232通信的ADM2xxL系列
    發(fā)表于 05-07 19:32 ?8次下載
    AN-375:用于RS-232<b class='flag-5'>通信</b>的ADM2<b class='flag-5'>xxL</b>系列

    BK-JOB運(yùn)維腳本管理系統(tǒng)

    bk-job.zip
    發(fā)表于 04-28 10:31 ?0次下載
    BK-<b class='flag-5'>JOB</b>運(yùn)維腳本管理系統(tǒng)

    如何通過Output Job輸出原理圖變量

    在Output Job中選擇[Project Physical Documents],將會(huì)顯示編譯后的原理圖圖紙,其中包含圖紙上不同的變量元素。
    的頭像 發(fā)表于 09-08 11:20 ?837次閱讀

    什么是定時(shí)任務(wù) xxl-job架構(gòu)設(shè)計(jì)方案

    同一個(gè)執(zhí)行器集群內(nèi)AppName(xxl.job.executor.appname)需要保持一致;調(diào)度中心根據(jù)該配置動(dòng)態(tài)發(fā)現(xiàn)不同集群的在線執(zhí)行器列表。
    發(fā)表于 11-14 12:44 ?1240次閱讀

    xxl-job驚艷的設(shè)計(jì),怎能叫人不愛

    xxl-job 定義了兩個(gè)接口 ExecutorBiz,AdminBiz,ExecutorBiz 接口中封裝了向心跳,暫停,觸發(fā)執(zhí)行等操作,AdminBiz 封裝了回調(diào),注冊(cè),取消注冊(cè)操作,接口的實(shí)現(xiàn)類中,并沒有通信相關(guān)的處理。
    的頭像 發(fā)表于 12-22 14:43 ?645次閱讀

    xxl-job任務(wù)調(diào)度中間件解決定時(shí)任務(wù)的調(diào)度問題

    xxl-job是一款非常優(yōu)秀的任務(wù)調(diào)度中間件,輕量級(jí)、使用簡(jiǎn)單、支持分布式等優(yōu)點(diǎn),讓它廣泛應(yīng)用在我們的項(xiàng)目中,解決了不少定時(shí)任務(wù)的調(diào)度問題。
    的頭像 發(fā)表于 01-31 09:53 ?1621次閱讀

    單向ESD保護(hù)二極管-PESD9XXL_SER

    單向ESD保護(hù)二極管-PESD9XXL_SER
    發(fā)表于 02-09 21:31 ?0次下載
    單向ESD保護(hù)二極管-PESD9<b class='flag-5'>XXL</b>_SER

    分布式定時(shí)調(diào)度:xxl-job最佳實(shí)踐方法

    定時(shí)任務(wù)是按照指定時(shí)間周期運(yùn)行任務(wù)。使用場(chǎng)景為在某個(gè)固定時(shí)間點(diǎn)執(zhí)行,或者周期性的去執(zhí)行某個(gè)任務(wù),比如:每天晚上24點(diǎn)做數(shù)據(jù)匯總,定時(shí)發(fā)送短信等。
    的頭像 發(fā)表于 11-30 11:06 ?1144次閱讀
    分布式定時(shí)調(diào)度:<b class='flag-5'>xxl-job</b>最佳實(shí)踐方法