1 前言
良好的系統(tǒng)設(shè)計(jì)必須要做到開閉原則,隨著業(yè)務(wù)的不斷迭代更新,核心代碼也會(huì)被不斷改動(dòng),出錯(cuò)的概率也會(huì)大大增加。但是大部分增加的功能都是在擴(kuò)展原有的功能,既要保證性能又要保證質(zhì)量,我們往往都會(huì)使用異步線程池來處理,然而卻增加了很多不確定性因素。
由此我設(shè)計(jì)了一套通用的異步處理SDK,可以很輕松的實(shí)現(xiàn)各種異步處理
2 目的
通過異步處理不僅能夠保證方法能夠得到有效的執(zhí)行而且不影響主流程
更重要的是各種兜底方法保證數(shù)據(jù)不丟失,從而達(dá)到最終一致性
3 優(yōu)點(diǎn)
無侵入設(shè)計(jì),獨(dú)立數(shù)據(jù)庫(kù),獨(dú)立定時(shí)任務(wù),獨(dú)立消息隊(duì)列,獨(dú)立人工執(zhí)行界面(統(tǒng)一登錄認(rèn)證)
使用spring事務(wù)事件機(jī)制,即使異步策略解析失敗也不會(huì)影響業(yè)務(wù)
如果你的方法正在運(yùn)行事務(wù),會(huì)等事務(wù)提交后或回滾后再處理事件
就算事務(wù)提交了,異步策略解析失敗了,我們還有兜底方案執(zhí)行(除非數(shù)據(jù)庫(kù)有問題,消息隊(duì)列有問題,方法有bug)
4 原理
容器初始化bean完成后遍歷所有方法,把有@AsyncExec注解的方法緩存起來
方法運(yùn)行時(shí)通過AOP切面發(fā)布事件
事務(wù)事件監(jiān)聽處理異步執(zhí)行策略
@TransactionalEventListener(fallbackExecution?=?true,?phase?=?TransactionPhase.AFTER_COMPLETION)
fallbackExecution=true 沒有事務(wù)正在運(yùn)行,依然處理事件
TransactionPhase.AFTER_COMPLETION 事務(wù)提交后和事務(wù)回滾后都處理事件
5 組件
kafka 消息隊(duì)列
xxl job 定時(shí)任務(wù)
mysql 數(shù)據(jù)庫(kù)
spring 切面
vue 界面
6 設(shè)計(jì)模式
策略
模板方法
動(dòng)態(tài)代理
7 流程圖
8 數(shù)據(jù)庫(kù)腳本
CREATE?TABLE?`async_req`?( ??`id`?bigint?NOT?NULL?AUTO_INCREMENT?COMMENT?'主鍵ID', ??`application_name`?varchar(100)?NOT?NULL?DEFAULT?''?COMMENT?'應(yīng)用名稱', ??`sign`?varchar(50)?NOT?NULL?DEFAULT?''?COMMENT?'方法簽名', ??`class_name`?varchar(200)?NOT?NULL?DEFAULT?''?COMMENT?'全路徑類名稱', ??`method_name`?varchar(100)?NOT?NULL?DEFAULT?''?COMMENT?'方法名稱', ??`async_type`?varchar(50)?NOT?NULL?DEFAULT?''?COMMENT?'異步策略類型', ??`exec_status`?tinyint?NOT?NULL?DEFAULT?'0'?COMMENT?'執(zhí)行狀態(tài)?0:初始化 1:執(zhí)行失敗 2:執(zhí)行成功', ??`exec_count`?int?NOT?NULL?DEFAULT?'0'?COMMENT?'執(zhí)行次數(shù)', ??`param_json`?longtext?COMMENT?'請(qǐng)求參數(shù)', ??`remark`?varchar(200)?NOT?NULL?DEFAULT?''?COMMENT?'業(yè)務(wù)描述', ??`create_time`?datetime?NOT?NULL?DEFAULT?CURRENT_TIMESTAMP?COMMENT?'創(chuàng)建時(shí)間', ??`update_time`?datetime?NOT?NULL?DEFAULT?CURRENT_TIMESTAMP?COMMENT?'更新時(shí)間', ??PRIMARY?KEY?(`id`)?USING?BTREE, ??KEY?`idx_applocation_name`?(`application_name`)?USING?BTREE, ??KEY?`idx_exec_status`?(`exec_status`)?USING?BTREE )?ENGINE=InnoDB?DEFAULT?CHARSET=utf8mb4?COMMENT='異步處理請(qǐng)求'; CREATE?TABLE?`async_log`?( ??`id`?bigint?NOT?NULL?AUTO_INCREMENT?COMMENT?'主鍵ID', ??`async_id`?bigint?NOT?NULL?DEFAULT?'0'?COMMENT?'異步請(qǐng)求ID', ??`error_data`?longtext?COMMENT?'執(zhí)行錯(cuò)誤信息', ??`create_time`?datetime?NOT?NULL?DEFAULT?CURRENT_TIMESTAMP?COMMENT?'創(chuàng)建時(shí)間', ??PRIMARY?KEY?(`id`)?USING?BTREE, ??KEY?`idx_async_id`?(`async_id`)?USING?BTREE )?ENGINE=InnoDB?DEFAULT?CHARSET=utf8mb4?COMMENT='異步處理日志';
9 異步策略
10 安全級(jí)別
11 執(zhí)行狀態(tài)
12 流程圖
?
?
13 apollo 配置
#?開關(guān):默認(rèn)關(guān)閉 async.enabled=true #?應(yīng)用名稱 spring.application.name=xxx #?數(shù)據(jù)源?druid? spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.datasource.url=jdbc//127.0.0.1:3306/fc_async?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true spring.datasource.username=user spring.datasource.password=xxxx spring.datasource.filters=config spring.datasource.connectionProperties=config.decrypt=true;config.decrypt.key=yyy #靜態(tài)地址 spring.resources.add-mappings=true spring.resources.static-locations=classpath:/static/ #?以下配置都有默認(rèn)值 #?核心線程數(shù) async.executor.thread.corePoolSize=10 #?最大線程數(shù) async.executor.thread.maxPoolSize=50 #?隊(duì)列容量 async.executor.thread.queueCapacity=10000 #?活躍時(shí)間 async.executor.thread.keepAliveSeconds=600 #?執(zhí)行成功是否刪除記錄:默認(rèn)刪除 async.exec.deleted=true ? #?自定義隊(duì)列名稱前綴:默認(rèn)應(yīng)用名稱 async.topic=${spring.application.name} ? #?重試執(zhí)行次數(shù):默認(rèn)5次 async.exec.count=5 ? #?重試最大查詢數(shù)量 async.retry.limit=100 #?補(bǔ)償最大查詢數(shù)量 async.comp.limit=100 #?登錄攔截:默認(rèn)false async.login=false
14 用法
1,異步開關(guān)
scm.async.enabled=true
2,在需要異步執(zhí)行的方法加注解 (必須是spring代理方法)
@AsyncExec(type?=?AsyncExecEnum.SAVE_ASYNC,?remark?=?"數(shù)據(jù)字典")
3,人工處理地址
http://localhost:8004/async/index.html
15 注意
1,應(yīng)用名稱
spring.application.name
2,隊(duì)列名稱
${async.topic:${spring.application.name}}_async_queue
自定義topic:async.topic=xxx
3,自己業(yè)務(wù)要做冪等
4,一個(gè)應(yīng)用公用一個(gè)隊(duì)列
自產(chǎn)自消
5,定時(shí)任務(wù)
異步重試定時(shí)任務(wù)(2分鐘重試一次,可配置重試次數(shù))
異步補(bǔ)償定時(shí)任務(wù)(一小時(shí)補(bǔ)償一次,創(chuàng)建時(shí)間在一小時(shí)之前的)
16 效果展示
編輯:黃飛
?
評(píng)論
查看更多