一、背景
限流對于一個微服務(wù)架構(gòu)系統(tǒng)來說具有非常重要的意義,否則其中的某個微服務(wù)將成為整個系統(tǒng)隱藏的雪崩因素,為什么這么說?
舉例來講,某個SAAS平臺有100多個微服務(wù)應(yīng)用,但是作為底層的某個或某幾個應(yīng)用來說,將會被所有上層應(yīng)用頻繁調(diào)用,業(yè)務(wù)高峰期時,如果底層應(yīng)用不做限流處理,該應(yīng)用必將面臨著巨大的壓力,尤其是那些個別被高頻調(diào)用的接口來說,最直接的表現(xiàn)就是導(dǎo)致后續(xù)新進來的請求阻塞、排隊、響應(yīng)超時...最后直到該服務(wù)所在JVM資源被耗盡。
二、限流概述
在大多數(shù)的微服務(wù)架構(gòu)在設(shè)計之初,比如在技術(shù)選型階段,架構(gòu)師會從一個全局的視角去規(guī)劃技術(shù)棧的組合,比如結(jié)合當(dāng)前產(chǎn)品的現(xiàn)狀考慮是使用dubbo?還是springcloud?作為微服務(wù)治理的底層框架。甚至為了滿足快速的上線、迭代和交付,直接以springboot為基座進行開發(fā),后續(xù)再引入新的技術(shù)棧等...
所以在談?wù)撃硞€業(yè)務(wù)場景具體的技術(shù)解決方案時不可一概而論,而是需要結(jié)合產(chǎn)品和業(yè)務(wù)的現(xiàn)狀綜合評估,以限流來說,在下面的不同的技術(shù)架構(gòu)下具體在選擇的時候可能也不一樣。
2.1 dubbo 服務(wù)治理模式
選擇dubbo框架作為基礎(chǔ)服務(wù)治理對于那種偏向內(nèi)部平臺的應(yīng)用還是不錯的,dubbo底層走netty,這一點相比http協(xié)議來說,在一定場景下還是具有優(yōu)勢的,如果選擇dubbo,在選擇限流方案上可以做如下的參考。
2.1.1 dubbo框架級限流
dubbo官方提供了完善的服務(wù)治理,能夠滿足大多數(shù)開發(fā)場景中的需求,針對限流這個場景,具體來說包括如下手段,具體的配置,可以參考官方手冊;
客戶端限流
信號量限流 (通過統(tǒng)計的方式)
連接數(shù)限流 (socket->tcp)
服務(wù)端限流
線程池限流 (隔離手段)
信號量限流 (非隔離手段)
接收數(shù)限流 (socket->tcp)
2.1.2 線程池設(shè)置
多線程并發(fā)操作一定離不開線程池,Dubbo自身提供了支持了四種線程池類型支持。生產(chǎn)者
2.1.3 集成第三方組件
如果是springboot框架的項目,可以考慮直接引入地方的組件或SDK,比如hystrix,guava,sentinel原生SDK等,如果技術(shù)實力足夠強甚至可以考慮自己造輪子。
2.2 springcloud 服務(wù)治理模式
如果你的服務(wù)治理框架選用的是springcloud或springcloud-alibaba,其框架自身的生態(tài)中已經(jīng)包含了相應(yīng)的限流組件,可以實現(xiàn)開箱即用,下面列舉幾種常用的基于springcloud框架的限流組件。
2.2.1 hystrix
Hystrix是Netflix開源的一款容錯框架,在springcloud早期推出市場的時候,作為springcloud生態(tài)中用于限流、熔斷、降級的一款組件。
Hystrix提供了限流功能,在springcloud架構(gòu)的系統(tǒng)中,可以在網(wǎng)關(guān)啟用Hystrix,進行限流處理,每個微服務(wù)也可以各自啟用Hystrix進行限流。
Hystrix默認使用線程隔離模式,可以通過線程數(shù)+隊列大小進行限流,具體參數(shù)配置可以參考官網(wǎng)相關(guān)資料。
2.2.2 sentinel
Sentinel 號稱分布式系統(tǒng)的流量防衛(wèi)兵,屬于springcloud-alibaba生態(tài)中的重要組件,面向分布式服務(wù)架構(gòu)的流量控制組件,主要以流量為切入點,從限流、流量整形、熔斷降級、系統(tǒng)負載保護、熱點防護等多個維度來幫助開發(fā)者保障微服務(wù)的穩(wěn)定性。
2.3 網(wǎng)關(guān)層限流
隨著微服務(wù)規(guī)模的增加,整個系統(tǒng)中很多微服務(wù)都需要實現(xiàn)限流這種需求時,就可以考慮在網(wǎng)關(guān)這一層進行限流了,通常來說,網(wǎng)關(guān)層的限流面向的是通用的業(yè)務(wù),比如那些惡意的請求,爬蟲,攻擊等,簡單來說,網(wǎng)關(guān)層面的限流提供了一層對系統(tǒng)整體的保護措施。
三、常用限流策略
3.1 限流常用的算法
不管是哪種限流組件,其底層的限流實現(xiàn)算法大同小異,這里列舉幾種常用的限流算法以供了解。
3.1.1 令牌桶算法
令牌桶算法是目前應(yīng)用最為廣泛的限流算法,顧名思義,它有以下兩個關(guān)鍵角色:
令牌 :獲取到令牌的Request才會被處理,其他Requests要么排隊要么被直接丟棄;
桶 :用來裝令牌的地方,所有Request都從這個桶里面獲取令牌
令牌桶主要涉及到2個過程,即令牌的生成,令牌的獲取
3.1.2 漏桶算法
漏桶算法的前半段和令牌桶類似,但是操作的對象不同,結(jié)合下圖進行理解。
令牌桶是將令牌放入桶里,而漏桶是將訪問請求的數(shù)據(jù)包放到桶里。同樣的是,如果桶滿了,那么后面新來的數(shù)據(jù)包將被丟棄。
3.1.3 滑動時間窗口
根據(jù)下圖,簡單描述下滑動時間窗口這種過程:
黑色大框為時間窗口,可以設(shè)定窗口時間單位為5秒,它會隨著時間推移向后滑動。我們將窗口內(nèi)的時間劃分為五個小格子,每個格子代表1秒鐘,同時這個格子還包含一個計數(shù)器,用來計算在當(dāng)前時間內(nèi)訪問的請求數(shù)量。那么這個時間窗口內(nèi)的總訪問量就是所有格子計數(shù)器累加后的數(shù)值;
比如說,我們在每一秒內(nèi)有5個用戶訪問,第5秒內(nèi)有10個用戶訪問,那么在0到5秒這個時間窗口內(nèi)訪問量就是15。如果我們的接口設(shè)置了時間窗口內(nèi)訪問上限是20,那么當(dāng)時間到第六秒的時候,這個時間窗口內(nèi)的計數(shù)總和就變成了10,因為1秒的格子已經(jīng)退出了時間窗口,因此在第六秒內(nèi)可以接收的訪問量就是20-10=10個;
滑動窗口其實也是一種計算器算法,它有一個顯著特點,當(dāng)時間窗口的跨度越長時,限流效果就越平滑。打個比方,如果當(dāng)前時間窗口只有兩秒,而訪問請求全部集中在第一秒的時候,當(dāng)時間向后滑動一秒后,當(dāng)前窗口的計數(shù)量將發(fā)生較大的變化,拉長時間窗口可以降低這種情況的發(fā)生概率
四、通用限流實現(xiàn)方案
拋開網(wǎng)關(guān)層的限流先不說,在微服務(wù)應(yīng)用中,考慮到技術(shù)棧的組合,團隊人員的開發(fā)水平,以及易維護性等因素,一個比較通用的做法是,利用AOP技術(shù)+自定義注解實現(xiàn)對特定的方法或接口進行限流,下面基于這個思路來分別介紹下幾種常用的限流方案的實現(xiàn)。
4.1 基于guava限流實現(xiàn)
guava為谷歌開源的一個比較實用的組件,利用這個組件可以幫助開發(fā)人員完成常規(guī)的限流操作,接下來看具體的實現(xiàn)步驟。
4.1.1 引入guava依賴
版本可以選擇更高的或其他版本
com.google.guava guava 23.0
4.1.2 自定義限流注解
自定義一個限流用的注解,后面在需要限流的方法或接口上面只需添加該注解即可;
importjava.lang.annotation.ElementType; importjava.lang.annotation.Retention; importjava.lang.annotation.RetentionPolicy; importjava.lang.annotation.Target; @Target(value=ElementType.METHOD) @Retention(value=RetentionPolicy.RUNTIME) public@interfaceRateConfigAnno{ StringlimitType(); doublelimitCount()default5d; }
4.1.3 限流AOP類
通過AOP前置通知的方式攔截添加了上述自定義限流注解的方法,解析注解中的屬性值,并以該屬性值作為guava提供的限流參數(shù),該類為整個實現(xiàn)的核心所在。
importcom.alibaba.fastjson2.JSONObject; importcom.google.common.util.concurrent.RateLimiter; importorg.aspectj.lang.JoinPoint; importorg.aspectj.lang.annotation.Aspect; importorg.aspectj.lang.annotation.Before; importorg.slf4j.Logger; importorg.slf4j.LoggerFactory; importorg.springframework.stereotype.Component; importorg.springframework.web.context.request.RequestContextHolder; importorg.springframework.web.context.request.ServletRequestAttributes; importjavax.servlet.ServletOutputStream; importjavax.servlet.http.HttpServletResponse; importjava.io.IOException; importjava.lang.reflect.Method; importjava.util.Objects; @Aspect @Component publicclassGuavaLimitAop{ privatestaticLoggerlogger=LoggerFactory.getLogger(GuavaLimitAop.class); @Before("execution(@RateConfigAnno**(..))") publicvoidlimit(JoinPointjoinPoint){ //1、獲取當(dāng)前的調(diào)用方法 MethodcurrentMethod=getCurrentMethod(joinPoint); if(Objects.isNull(currentMethod)){ return; } //2、從方法注解定義上獲取限流的類型 StringlimitType=currentMethod.getAnnotation(RateConfigAnno.class).limitType(); doublelimitCount=currentMethod.getAnnotation(RateConfigAnno.class).limitCount(); //使用guava的令牌桶算法獲取一個令牌,獲取不到先等待 RateLimiterrateLimiter=RateLimitHelper.getRateLimiter(limitType,limitCount); booleanb=rateLimiter.tryAcquire(); if(b){ System.out.println("獲取到令牌"); }else{ HttpServletResponseresp=((ServletRequestAttributes)RequestContextHolder.getRequestAttributes()).getResponse(); JSONObjectjsonObject=newJSONObject(); jsonObject.put("success",false); jsonObject.put("msg","限流中"); try{ output(resp,jsonObject.toJSONString()); }catch(Exceptione){ logger.error("error,e:{}",e); } } } privateMethodgetCurrentMethod(JoinPointjoinPoint){ Method[]methods=joinPoint.getTarget().getClass().getMethods(); Methodtarget=null; for(Methodmethod:methods){ if(method.getName().equals(joinPoint.getSignature().getName())){ target=method; break; } } returntarget; } publicvoidoutput(HttpServletResponseresponse,Stringmsg)throwsIOException{ response.setContentType("application/json;charset=UTF-8"); ServletOutputStreamoutputStream=null; try{ outputStream=response.getOutputStream(); outputStream.write(msg.getBytes("UTF-8")); }catch(IOExceptione){ e.printStackTrace(); }finally{ outputStream.flush(); outputStream.close(); } } }
其中限流的核心API即為RateLimiter這個對象,涉及到的RateLimitHelper類如下
importcom.google.common.util.concurrent.RateLimiter; importjava.util.HashMap; importjava.util.Map; publicclassRateLimitHelper{ privateRateLimitHelper(){} privatestaticMaprateMap=newHashMap<>(); publicstaticRateLimitergetRateLimiter(StringlimitType,doublelimitCount){ RateLimiterrateLimiter=rateMap.get(limitType); if(rateLimiter==null){ rateLimiter=RateLimiter.create(limitCount); rateMap.put(limitType,rateLimiter); } returnrateLimiter; } }
4.1.4 測試接口
下面添加一個測試接口,測試一下上面的代碼是否生效
@RestController publicclassOrderController{ //localhost:8081/save @GetMapping("/save") @RateConfigAnno(limitType="saveOrder",limitCount=1) publicStringsave(){ return"success"; } }
在接口中為了模擬出效果,我們將參數(shù)設(shè)置的非常小,即QPS為1,可以預(yù)想當(dāng)每秒請求超過1時將會出現(xiàn)被限流的提示,啟動工程并驗證接口,每秒1次的請求,可以正常得到結(jié)果,效果如下:
快速刷接口,將會看到下面的效果
4.2 基于sentinel限流實現(xiàn)
在不少同學(xué)的意識中,sentinel通常是需要結(jié)合springcloud-alibaba框架一起實用的,而且與框架集成之后,可以配合控制臺一起使用達到更好的效果,實際上,sentinel官方也提供了相對原生的SDK可供使用,接下來就以這種方式進行整合。
4.2.1 引入sentinel核心依賴包
com.alibaba.csp sentinel-core 1.8.0
4.2.2 自定義限流注解
可以根據(jù)需要,添加更多的屬性
importjava.lang.annotation.ElementType; importjava.lang.annotation.Retention; importjava.lang.annotation.RetentionPolicy; importjava.lang.annotation.Target; @Target(value=ElementType.METHOD) @Retention(value=RetentionPolicy.RUNTIME) public@interfaceSentinelLimitAnnotation{ StringresourceName(); intlimitCount()default5; }
4.2.3 自定義AOP類實現(xiàn)限流
該類的實現(xiàn)思路與上述使用guava類似,不同的是,這里使用的是sentinel原生的限流相關(guān)的API,對此不夠?qū)傩缘目梢圆殚喒俜降奈臋n進行學(xué)習(xí),這里就不展開來說了。
importcom.alibaba.csp.sentinel.Entry; importcom.alibaba.csp.sentinel.SphU; importcom.alibaba.csp.sentinel.Tracer; importcom.alibaba.csp.sentinel.slots.block.BlockException; importcom.alibaba.csp.sentinel.slots.block.RuleConstant; importcom.alibaba.csp.sentinel.slots.block.flow.FlowRule; importcom.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; importorg.apache.commons.lang3.StringUtils; importorg.aspectj.lang.JoinPoint; importorg.aspectj.lang.ProceedingJoinPoint; importorg.aspectj.lang.annotation.Around; importorg.aspectj.lang.annotation.Aspect; importorg.aspectj.lang.annotation.Pointcut; importorg.springframework.stereotype.Component; importjava.lang.reflect.Method; importjava.util.ArrayList; importjava.util.List; importjava.util.Objects; @Aspect @Component publicclassSentinelMethodLimitAop{ privatestaticvoidinitFlowRule(StringresourceName,intlimitCount){ Listrules=newArrayList<>(); FlowRulerule=newFlowRule(); //設(shè)置受保護的資源 rule.setResource(resourceName); //設(shè)置流控規(guī)則QPS rule.setGrade(RuleConstant.FLOW_GRADE_QPS); //設(shè)置受保護的資源閾值 rule.setCount(limitCount); rules.add(rule); //加載配置好的規(guī)則 FlowRuleManager.loadRules(rules); } @Pointcut(value="@annotation(com.congge.sentinel.SentinelLimitAnnotation)") publicvoidrateLimit(){ } @Around("rateLimit()") publicObjectaround(ProceedingJoinPointjoinPoint){ //1、獲取當(dāng)前的調(diào)用方法 MethodcurrentMethod=getCurrentMethod(joinPoint); if(Objects.isNull(currentMethod)){ returnnull; } //2、從方法注解定義上獲取限流的類型 StringresourceName=currentMethod.getAnnotation(SentinelLimitAnnotation.class).resourceName(); if(StringUtils.isEmpty(resourceName)){ thrownewRuntimeException("資源名稱為空"); } intlimitCount=currentMethod.getAnnotation(SentinelLimitAnnotation.class).limitCount(); initFlowRule(resourceName,limitCount); Entryentry=null; Objectresult=null; try{ entry=SphU.entry(resourceName); try{ result=joinPoint.proceed(); }catch(Throwablethrowable){ throwable.printStackTrace(); } }catch(BlockExceptionex){ //資源訪問阻止,被限流或被降級 //在此處進行相應(yīng)的處理操作 System.out.println("blocked"); return"被限流了"; }catch(Exceptione){ Tracer.traceEntry(e,entry); }finally{ if(entry!=null){ entry.exit(); } } returnresult; } privateMethodgetCurrentMethod(JoinPointjoinPoint){ Method[]methods=joinPoint.getTarget().getClass().getMethods(); Methodtarget=null; for(Methodmethod:methods){ if(method.getName().equals(joinPoint.getSignature().getName())){ target=method; break; } } returntarget; } }
4.2.4 自定義測試接口
為了模擬效果,這里將QPS的數(shù)量設(shè)置為1
//localhost:8081/limit @GetMapping("/limit") @SentinelLimitAnnotation(limitCount=1,resourceName="sentinelLimit") publicStringsentinelLimit(){ return"sentinelLimit"; }
啟動工程之后,瀏覽器調(diào)用接口測試一下,每秒一個請求,可以正常通過
快速刷接口,超過每秒1次時,效果如下
這里只是為了演示出效果,建議在真實的項目中使用時,對返回結(jié)果做一個封裝。
4.3 基于redis+lua限流實現(xiàn)
redis是線程安全的,天然具有線程安全的特性,支持原子性操作,限流服務(wù)不僅需要承接超高QPS,還要保證限流邏輯的執(zhí)行層面具備線程安全的特性,利用Redis這些特性做限流,既能保證線程安全,也能保證性能?;趓edis的限流實現(xiàn)完整流程如下圖:
結(jié)合上面的流程圖,這里梳理出一個整體的實現(xiàn)思路:
編寫lua腳本,指定入?yún)⒌南蘖饕?guī)則,比如對特定的接口限流時,可以根據(jù)某個或幾個參數(shù)進行判定,調(diào)用該接口的請求,在一定的時間窗口內(nèi)監(jiān)控請求次數(shù);
既然是限流,最好能夠通用,可將限流規(guī)則應(yīng)用到任何接口上,那么最合適的方式就是通過自定義注解形式切入;
提供一個配置類,被spring的容器管理,redisTemplate中提供了DefaultRedisScript這個bean;
提供一個能動態(tài)解析接口參數(shù)的類,根據(jù)接口參數(shù)進行規(guī)則匹配后觸發(fā)限流;
4.3.1 引入redis依賴
org.springframework.boot spring-boot-starter-data-redis
4.3.2 自定義注解
@Target({ElementType.METHOD,ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public@interfaceRedisLimitAnnotation{ /** *key */ Stringkey()default""; /** *Key的前綴 */ Stringprefix()default""; /** *一定時間內(nèi)最多訪問次數(shù) */ intcount(); /** *給定的時間范圍單位(秒) */ intperiod(); /** *限流的類型(用戶自定義key或者請求ip) */ LimitTypelimitType()defaultLimitType.CUSTOMER; }
4.3.3 自定義redis配置類
importorg.springframework.context.annotation.Bean; importorg.springframework.core.io.ClassPathResource; importorg.springframework.data.redis.connection.RedisConnectionFactory; importorg.springframework.data.redis.core.RedisTemplate; importorg.springframework.data.redis.core.script.DefaultRedisScript; importorg.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; importorg.springframework.data.redis.serializer.StringRedisSerializer; importorg.springframework.scripting.support.ResourceScriptSource; importorg.springframework.stereotype.Component; importjava.io.Serializable; @Component publicclassRedisConfiguration{ @Bean publicDefaultRedisScriptredisluaScript(){ DefaultRedisScript redisScript=newDefaultRedisScript<>(); redisScript.setScriptSource(newResourceScriptSource(newClassPathResource("limit.lua"))); redisScript.setResultType(Number.class); returnredisScript; } @Bean("redisTemplate") publicRedisTemplate redisTemplate(RedisConnectionFactoryredisConnectionFactory){ RedisTemplate redisTemplate=newRedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); Jackson2JsonRedisSerializerjackson2JsonRedisSerializer=newJackson2JsonRedisSerializer(Object.class); //設(shè)置value的序列化方式為JSOn redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); //設(shè)置key的序列化方式為String redisTemplate.setKeySerializer(newStringRedisSerializer()); redisTemplate.setHashKeySerializer(newStringRedisSerializer()); redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); redisTemplate.afterPropertiesSet(); returnredisTemplate; } }
4.3.4 自定義限流AOP類
importorg.aspectj.lang.ProceedingJoinPoint; importorg.aspectj.lang.annotation.Around; importorg.aspectj.lang.annotation.Aspect; importorg.aspectj.lang.annotation.Pointcut; importorg.aspectj.lang.reflect.MethodSignature; importorg.slf4j.Logger; importorg.slf4j.LoggerFactory; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.context.annotation.Configuration; importorg.springframework.data.redis.core.RedisTemplate; importorg.springframework.data.redis.core.script.DefaultRedisScript; importorg.springframework.web.context.request.RequestContextHolder; importorg.springframework.web.context.request.ServletRequestAttributes; importjavax.servlet.http.HttpServletRequest; importjava.io.Serializable; importjava.lang.reflect.Method; importjava.util.Collections; importjava.util.List; @Aspect @Configuration publicclassLimitRestAspect{ privatestaticfinalLoggerlogger=LoggerFactory.getLogger(LimitRestAspect.class); @Autowired privateRedisTemplateredisTemplate; @Autowired privateDefaultRedisScript redisluaScript; @Pointcut(value="@annotation(com.congge.config.limit.RedisLimitAnnotation)") publicvoidrateLimit(){ } @Around("rateLimit()") publicObjectinterceptor(ProceedingJoinPointjoinPoint)throwsThrowable{ MethodSignaturesignature=(MethodSignature)joinPoint.getSignature(); Methodmethod=signature.getMethod(); Class>targetClass=method.getDeclaringClass(); RedisLimitAnnotationrateLimit=method.getAnnotation(RedisLimitAnnotation.class); if(rateLimit!=null){ HttpServletRequestrequest=((ServletRequestAttributes)RequestContextHolder.getRequestAttributes()).getRequest(); StringipAddress=getIpAddr(request); StringBufferstringBuffer=newStringBuffer(); stringBuffer.append(ipAddress).append("-") .append(targetClass.getName()).append("-") .append(method.getName()).append("-") .append(rateLimit.key()); List keys=Collections.singletonList(stringBuffer.toString()); //調(diào)用lua腳本,獲取返回結(jié)果,這里即為請求的次數(shù) Numbernumber=redisTemplate.execute( redisluaScript, keys, rateLimit.count(), rateLimit.period() ); if(number!=null&&number.intValue()!=0&&number.intValue()<=?rateLimit.count())?{ ????????????????logger.info("限流時間段內(nèi)訪問了第:{}?次",?number.toString()); ????????????????return?joinPoint.proceed(); ????????????} ????????}?else?{ ????????????return?joinPoint.proceed(); ????????} ????????throw?new?RuntimeException("訪問頻率過快,被限流了"); ????} ? ????/** ?????*?獲取請求的IP方法 ?????*?@param?request ?????*?@return ?????*/ ????private?static?String?getIpAddr(HttpServletRequest?request)?{ ????????String?ipAddress?=?null; ????????try?{ ????????????ipAddress?=?request.getHeader("x-forwarded-for"); ????????????if?(ipAddress?==?null?||?ipAddress.length()?==?0?||?"unknown".equalsIgnoreCase(ipAddress))?{ ????????????????ipAddress?=?request.getHeader("Proxy-Client-IP"); ????????????} ????????????if?(ipAddress?==?null?||?ipAddress.length()?==?0?||?"unknown".equalsIgnoreCase(ipAddress))?{ ????????????????ipAddress?=?request.getHeader("WL-Proxy-Client-IP"); ????????????} ????????????if?(ipAddress?==?null?||?ipAddress.length()?==?0?||?"unknown".equalsIgnoreCase(ipAddress))?{ ????????????????ipAddress?=?request.getRemoteAddr(); ????????????} ????????????//?對于通過多個代理的情況,第一個IP為客戶端真實IP,多個IP按照','分割 ????????????if?(ipAddress?!=?null?&&?ipAddress.length()?>15){ if(ipAddress.indexOf(",")>0){ ipAddress=ipAddress.substring(0,ipAddress.indexOf(",")); } } }catch(Exceptione){ ipAddress=""; } returnipAddress; } }
該類要做的事情和上面的兩種限流措施類似,不過在這里核心的限流是通過讀取lua腳步,通過參數(shù)傳遞給lua腳步實現(xiàn)的。
4.3.5 自定義lua腳本
在工程的resources目錄下,添加如下的lua腳本
localkey="rate.limit:"..KEYS[1] locallimit=tonumber(ARGV[1]) localcurrent=tonumber(redis.call('get',key)or"0") ifcurrent+1>limitthen return0 else --沒有超閾值,將當(dāng)前訪問數(shù)量+1,并設(shè)置2秒過期(可根據(jù)自己的業(yè)務(wù)情況調(diào)整) redis.call("INCRBY",key,"1") redis.call("expire",key,"2") returncurrent+1 end
4.3.6 添加測試接口
@RestController publicclassRedisController{ //localhost:8081/redis/limit @GetMapping("/redis/limit") @RedisLimitAnnotation(key="queryFromRedis",period=1,count=1) publicStringqueryFromRedis(){ return"success"; } }
為了模擬效果,這里將QPS設(shè)置為1 ,啟動工程后(提前啟動redis服務(wù)),調(diào)用一下接口,正常的效果如下:
快速刷接口,超過每秒1次的請求時看到如下效果
五、自定義starter限流實現(xiàn)
上面通過案例介紹了幾種常用的限流實現(xiàn),不過細心的同學(xué)可以看到,這些限流的實現(xiàn)都是在具體的工程模塊中嵌入的,事實上,在真實的微服務(wù)開發(fā)中,一個項目可能包含了眾多的微服務(wù)模塊,為了減少重復(fù)造輪子,避免每個微服務(wù)模塊中單獨實現(xiàn),可以考慮將限流的邏輯實現(xiàn)封裝成一個SDK,即作為一個springboot的starter的方式被其他微服務(wù)模塊進行引用即可。這也是目前很多生產(chǎn)實踐中比較通用的做法,接下來看看具體的實現(xiàn)吧。
5.1 前置準備
創(chuàng)建一個空的springboot工程,工程目錄結(jié)構(gòu)如下圖,目錄說明:
annotation:存放自定義的限流相關(guān)的注解;
aop:存放不同的限流實現(xiàn),比如基于guava的aop,基于sentinel的aop實現(xiàn)等;
spring.factories:自定義待裝配的aop實現(xiàn)類;
5.2 代碼整合完成步驟
5.2.1 導(dǎo)入基礎(chǔ)的依賴
這里包括如下幾個必須的依賴,其他的依賴可以結(jié)合自身的情況合理選擇;
spring-boot-starter;
guava;
spring-boot-autoconfigure;
sentinel-core;
org.springframework.boot spring-boot-starter-parent 2.2.1.RELEASE UTF-8 UTF-8 1.8 org.springframework.boot spring-boot-starter-aop log4j log4j 1.2.17 org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-web org.projectlombok lombok com.google.guava guava 23.0 org.springframework.boot spring-boot-autoconfigure 2.2.1.RELEASE org.springframework.boot spring-boot-configuration-processor 2.2.1.RELEASE com.alibaba.csp sentinel-core 1.8.0 org.apache.commons commons-lang3 3.4 com.alibaba.fastjson2 fastjson2 2.0.22 src/main/resources **/**
5.2.2 自定義注解
目前該SDK支持三種限流方式,即后續(xù)其他微服務(wù)工程中可以通過添加這3種注解即可實現(xiàn)限流,分別是基于guava的令牌桶,基于sentinel的限流,基于java自帶的Semaphore限流,三個自定義注解類如下:
令牌桶
@Documented @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public@interfaceTokenBucketLimiter{ intvalue()default50; }
Semaphore
@Documented @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public@interfaceShLimiter{ intvalue()default50; }
sentinel
@Target(value=ElementType.METHOD) @Retention(value=RetentionPolicy.RUNTIME) public@interfaceSentinelLimiter{ StringresourceName(); intlimitCount()default50; }
5.2.3 限流實現(xiàn)AOP類
具體的限流在AOP中進行實現(xiàn),思路和上一章節(jié)類似,即通過環(huán)繞通知的方式,先解析那些添加了限流注解的方法,然后解析里面的參數(shù),進行限流的業(yè)務(wù)實現(xiàn)。
基于guava的aop實現(xiàn)
importcom.alibaba.fastjson2.JSONObject; importcom.congge.annotation.TokenBucketLimiter; importcom.google.common.util.concurrent.RateLimiter; importlombok.extern.slf4j.Slf4j; importorg.aspectj.lang.ProceedingJoinPoint; importorg.aspectj.lang.annotation.Around; importorg.aspectj.lang.annotation.Aspect; importorg.aspectj.lang.annotation.Pointcut; importorg.springframework.cglib.core.ReflectUtils; importorg.springframework.stereotype.Component; importorg.springframework.web.context.request.RequestContextHolder; importorg.springframework.web.context.request.ServletRequestAttributes; importjavax.servlet.ServletOutputStream; importjavax.servlet.http.HttpServletResponse; importjava.io.IOException; importjava.lang.reflect.Method; importjava.util.Arrays; importjava.util.Map; importjava.util.concurrent.ConcurrentHashMap; @Aspect @Component @Slf4j publicclassGuavaLimiterAop{ privatefinalMaprateLimiters=newConcurrentHashMap (); @Pointcut("@annotation(com.congge.annotation.TokenBucketLimiter)") publicvoidaspect(){ } @Around(value="aspect()") publicObjectaround(ProceedingJoinPointpoint)throwsThrowable{ log.debug("準備限流"); Objecttarget=point.getTarget(); StringtargetName=target.getClass().getName(); StringmethodName=point.getSignature().getName(); Object[]arguments=point.getArgs(); Class>targetClass=Class.forName(targetName); Class>[]argTypes=ReflectUtils.getClasses(arguments); Methodmethod=targetClass.getDeclaredMethod(methodName,argTypes); //獲取目標method上的限流注解@Limiter TokenBucketLimiterlimiter=method.getAnnotation(TokenBucketLimiter.class); RateLimiterrateLimiter=null; Objectresult=null; if(null!=limiter){ //以class+method+parameters為key,避免重載、重寫帶來的混亂 Stringkey=targetName+"."+methodName+Arrays.toString(argTypes); rateLimiter=rateLimiters.get(key); if(null==rateLimiter){ //獲取限定的流量 //為了防止并發(fā) rateLimiters.putIfAbsent(key,RateLimiter.create(limiter.value())); rateLimiter=rateLimiters.get(key); } booleanb=rateLimiter.tryAcquire(); if(b){ log.debug("得到令牌,準備執(zhí)行業(yè)務(wù)"); result=point.proceed(); }else{ HttpServletResponseresp=((ServletRequestAttributes)RequestContextHolder.getRequestAttributes()).getResponse(); JSONObjectjsonObject=newJSONObject(); jsonObject.put("success",false); jsonObject.put("msg","限流中"); try{ output(resp,jsonObject.toJSONString()); }catch(Exceptione){ log.error("error,e:{}",e); } } }else{ result=point.proceed(); } log.debug("退出限流"); returnresult; } publicvoidoutput(HttpServletResponseresponse,Stringmsg)throwsIOException{ response.setContentType("application/json;charset=UTF-8"); ServletOutputStreamoutputStream=null; try{ outputStream=response.getOutputStream(); outputStream.write(msg.getBytes("UTF-8")); }catch(IOExceptione){ e.printStackTrace(); }finally{ outputStream.flush(); outputStream.close(); } } }
基于Semaphore的aop實現(xiàn)
importcom.congge.annotation.ShLimiter; importlombok.extern.slf4j.Slf4j; importorg.aspectj.lang.ProceedingJoinPoint; importorg.aspectj.lang.annotation.Around; importorg.aspectj.lang.annotation.Aspect; importorg.aspectj.lang.annotation.Pointcut; importorg.slf4j.Logger; importorg.slf4j.LoggerFactory; importorg.springframework.cglib.core.ReflectUtils; importorg.springframework.stereotype.Component; importjava.lang.reflect.Method; importjava.util.Arrays; importjava.util.Map; importjava.util.concurrent.ConcurrentHashMap; importjava.util.concurrent.Semaphore; @Aspect @Component @Slf4j publicclassSemaphoreLimiterAop{ privatefinalMapsemaphores=newConcurrentHashMap (); privatefinalstaticLoggerLOG=LoggerFactory.getLogger(SemaphoreLimiterAop.class); @Pointcut("@annotation(com.congge.annotation.ShLimiter)") publicvoidaspect(){ } @Around(value="aspect()") publicObjectaround(ProceedingJoinPointpoint)throwsThrowable{ log.debug("進入限流aop"); Objecttarget=point.getTarget(); StringtargetName=target.getClass().getName(); StringmethodName=point.getSignature().getName(); Object[]arguments=point.getArgs(); Class>targetClass=Class.forName(targetName); Class>[]argTypes=ReflectUtils.getClasses(arguments); Methodmethod=targetClass.getDeclaredMethod(methodName,argTypes); //獲取目標method上的限流注解@Limiter ShLimiterlimiter=method.getAnnotation(ShLimiter.class); Objectresult=null; if(null!=limiter){ //以class+method+parameters為key,避免重載、重寫帶來的混亂 Stringkey=targetName+"."+methodName+Arrays.toString(argTypes); //獲取限定的流量 Semaphoresemaphore=semaphores.get(key); if(null==semaphore){ semaphores.putIfAbsent(key,newSemaphore(limiter.value())); semaphore=semaphores.get(key); } try{ semaphore.acquire(); result=point.proceed(); }finally{ if(null!=semaphore){ semaphore.release(); } } }else{ result=point.proceed(); } log.debug("退出限流"); returnresult; } }
基于sentinel的aop實現(xiàn)
importcom.alibaba.csp.sentinel.Entry; importcom.alibaba.csp.sentinel.SphU; importcom.alibaba.csp.sentinel.Tracer; importcom.alibaba.csp.sentinel.slots.block.BlockException; importcom.alibaba.csp.sentinel.slots.block.RuleConstant; importcom.alibaba.csp.sentinel.slots.block.flow.FlowRule; importcom.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; importcom.congge.annotation.SentinelLimiter; importorg.apache.commons.lang3.StringUtils; importorg.aspectj.lang.JoinPoint; importorg.aspectj.lang.ProceedingJoinPoint; importorg.aspectj.lang.annotation.Around; importorg.aspectj.lang.annotation.Aspect; importorg.aspectj.lang.annotation.Pointcut; importorg.springframework.stereotype.Component; importjava.lang.reflect.Method; importjava.util.ArrayList; importjava.util.List; importjava.util.Objects; @Aspect @Component publicclassSentinelLimiterAop{ privatestaticvoidinitFlowRule(StringresourceName,intlimitCount){ Listrules=newArrayList<>(); FlowRulerule=newFlowRule(); //設(shè)置受保護的資源 rule.setResource(resourceName); //設(shè)置流控規(guī)則QPS rule.setGrade(RuleConstant.FLOW_GRADE_QPS); //設(shè)置受保護的資源閾值 rule.setCount(limitCount); rules.add(rule); //加載配置好的規(guī)則 FlowRuleManager.loadRules(rules); } @Pointcut(value="@annotation(com.congge.annotation.SentinelLimiter)") publicvoidrateLimit(){ } @Around("rateLimit()") publicObjectaround(ProceedingJoinPointjoinPoint){ //1、獲取當(dāng)前的調(diào)用方法 MethodcurrentMethod=getCurrentMethod(joinPoint); if(Objects.isNull(currentMethod)){ returnnull; } //2、從方法注解定義上獲取限流的類型 StringresourceName=currentMethod.getAnnotation(SentinelLimiter.class).resourceName(); if(StringUtils.isEmpty(resourceName)){ thrownewRuntimeException("資源名稱為空"); } intlimitCount=currentMethod.getAnnotation(SentinelLimiter.class).limitCount(); initFlowRule(resourceName,limitCount); Entryentry=null; Objectresult=null; try{ entry=SphU.entry(resourceName); try{ result=joinPoint.proceed(); }catch(Throwablethrowable){ throwable.printStackTrace(); } }catch(BlockExceptionex){ //資源訪問阻止,被限流或被降級 //在此處進行相應(yīng)的處理操作 System.out.println("blocked"); return"被限流了"; }catch(Exceptione){ Tracer.traceEntry(e,entry); }finally{ if(entry!=null){ entry.exit(); } } returnresult; } privateMethodgetCurrentMethod(JoinPointjoinPoint){ Method[]methods=joinPoint.getTarget().getClass().getMethods(); Methodtarget=null; for(Methodmethod:methods){ if(method.getName().equals(joinPoint.getSignature().getName())){ target=method; break; } } returntarget; } }
5.2.4 配置自動裝配AOP實現(xiàn)
在resources目錄下創(chuàng)建上述的spring.factories文件,內(nèi)容如下,通過這種方式配置后,其他應(yīng)用模塊引入了當(dāng)前的SDK的jar之后,就可以實現(xiàn)開箱即用了;
org.springframework.boot.autoconfigure.EnableAutoConfiguration= com.congge.aop.SemaphoreLimiterAop, com.congge.aop.GuavaLimiterAop, com.congge.aop.SemaphoreLimiterAop
5.2.5 將工程打成jar進行安裝
這一步比較簡單就跳過了
5.2.6 在其他的工程中引入上述SDK
cm.congge biz-limit 1.0-SNAPSHOT
5.2.7 編寫測試接口
在其他工程中,編寫一個測試接口,并使用上面的注解,這里以guava的限流注解為例進行說明
importcom.congge.annotation.TokenBucketLimiter; importorg.springframework.web.bind.annotation.GetMapping; importorg.springframework.web.bind.annotation.RestController; @RestController publicclassSdkController{ //localhost:8081/query @GetMapping("/query") @TokenBucketLimiter(1) publicStringqueryUser(){ return"queryUser"; } }
5.2.8 功能測試
啟動當(dāng)前的工程后,正常調(diào)用接口,每秒一次的請求,可以正常得到結(jié)果
快速刷接口,QPS超過1之后,將會觸發(fā)限流,看到如下效果
通過上面這種方式,也可以得到預(yù)期的效果,其他兩種限流注解有興趣的同學(xué)也可以繼續(xù)測試驗證,篇幅原因就不再贅述了。
上述通過starter的方式實現(xiàn)了一種更優(yōu)雅的限流集成方式,也是生產(chǎn)中比較推薦的一種方式,不過當(dāng)前的案例還比較粗糙,需要使用的同學(xué)還需根據(jù)自己的情況完善里面的邏輯,進一步的封裝以期得到更好的效果。
審核編輯:劉清
-
接收機
+關(guān)注
關(guān)注
8文章
1178瀏覽量
53377 -
計數(shù)器
+關(guān)注
關(guān)注
32文章
2253瀏覽量
94287 -
JVM
+關(guān)注
關(guān)注
0文章
157瀏覽量
12197 -
QPS
+關(guān)注
關(guān)注
0文章
24瀏覽量
8785 -
負載保護器
+關(guān)注
關(guān)注
0文章
4瀏覽量
5424 -
SpringBoot
+關(guān)注
關(guān)注
0文章
173瀏覽量
161
原文標題:SpringBoot 通用限流方案(VIP珍藏版)
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論