延遲任務(wù)
最近有一個需求,基于消息隊列對數(shù)據(jù)消費,并根據(jù)多次消費的結(jié)果對數(shù)據(jù)進(jìn)行重新組裝,如果在指定時間內(nèi),需要的數(shù)據(jù)全部到達(dá),則進(jìn)行數(shù)據(jù)組裝以及后續(xù)邏輯。簡單的說,設(shè)置一個超時時間,如果在該時間內(nèi)由MQ中消費到完整的數(shù)據(jù)則直接處理,否則進(jìn)入其他流程。
針對這種場景使用了延遲任務(wù)來實現(xiàn),以此為契機對延遲任務(wù)相關(guān)的技術(shù)做了個簡單了解...
簡介
延遲任務(wù)是一種指定任務(wù)在未來某個時間點或一定時間后執(zhí)行的方式。通常情況下,延遲任務(wù)可以通過設(shè)置任務(wù)的執(zhí)行時間或延遲時間來實現(xiàn)。
延遲任務(wù)可以用于異步操作、定時任務(wù)和任務(wù)調(diào)度等場景。例如,在用戶注冊后發(fā)送歡迎郵件或者在用戶下單后發(fā)送訂單確認(rèn)短信,可以通過延遲任務(wù)來實現(xiàn)異步操作。定時檢查服務(wù)器狀態(tài)、定時備份數(shù)據(jù)等任務(wù),也可以通過延遲任務(wù)來實現(xiàn)定時任務(wù)。在某個時間點觸發(fā)某個任務(wù)、在某個時間段內(nèi)重復(fù)執(zhí)行某個任務(wù)等,可以通過延遲任務(wù)來實現(xiàn)任務(wù)調(diào)度。
延遲任務(wù)通常使用隊列或者定時器來實現(xiàn)。在隊列中,任務(wù)會被添加到一個等待隊列中,等待隊列中的任務(wù)會在指定的時間點或延遲時間后被取出執(zhí)行。在定時器中,任務(wù)會被添加到一個定時器中,定時器會在指定的時間點觸發(fā)任務(wù)執(zhí)行。
總之,延遲任務(wù)是一種非常實用的技術(shù),可以幫助我們更好地管理系統(tǒng)中的異步操作、定時任務(wù)和任務(wù)調(diào)度等場景。
使用場景
異步操作:延遲任務(wù)可以用于異步操作,例如在用戶注冊后發(fā)送歡迎郵件或者在用戶下單后發(fā)送訂單確認(rèn)短信。通過使用延遲任務(wù),可以將這些操作推遲到后臺處理,從而提高系統(tǒng)的響應(yīng)速度和并發(fā)能力。
定時任務(wù):延遲任務(wù)可以用于定時任務(wù),例如定時檢查服務(wù)器狀態(tài)、定時備份數(shù)據(jù)等。通過使用延遲任務(wù),可以在指定的時間點自動觸發(fā)任務(wù),避免手動操作的繁瑣和容易出錯。
任務(wù)調(diào)度:延遲任務(wù)可以用于任務(wù)調(diào)度,例如在某個時間點觸發(fā)某個任務(wù)、在某個時間段內(nèi)重復(fù)執(zhí)行某個任務(wù)等。通過使用延遲任務(wù),可以方便地進(jìn)行任務(wù)調(diào)度,提高系統(tǒng)的可靠性和穩(wěn)定性。
技術(shù)實現(xiàn)
- 基于內(nèi)存,應(yīng)用重啟(或宕機)會導(dǎo)致任務(wù)丟失
- 基于內(nèi)存存放隊列,不支持集群
- 依據(jù)compareTo方法排列隊列,調(diào)用take阻塞式的取出第一個任務(wù)(不調(diào)用則不取出),比較不靈活,會影響時間的準(zhǔn)確性
- ScheduledThreadPoolExecutor
- 基于內(nèi)存,應(yīng)用重啟(或宕機)會導(dǎo)致任務(wù)丟失
- 基于內(nèi)存存放任務(wù),不支持集群
- 一個任務(wù)就要新建一個線程綁定任務(wù)的執(zhí)行,容易造成資源浪費
- Redis過期監(jiān)聽 基于Redis過期訂閱
- 客戶端斷開后重連會導(dǎo)致所有事件丟失
- 高并發(fā)場景下,存在大量的失效key場景會導(dǎo)出失效時間存在延遲
- 若有多個監(jiān)聽器監(jiān)聽該key,是會重復(fù)消費這個過期事件的,需要特定邏輯判斷
- MQ延遲隊列 基于消息死信隊列實現(xiàn) 支持集群,分布式,高并發(fā)場景;缺點:引入額外的消息隊列,增加項目的部署和維護的復(fù)雜度。
- HashedWheelTimer 基于Netty提供的工具類HashedWheelTimer HashedWheelTimer 是使用定時輪實現(xiàn)的,定時輪其實就是一種環(huán)型的數(shù)據(jù)結(jié)構(gòu),可以把它想象成一個時鐘, 分成了許多格子,每個格子代表一定的時間,在這個格子上用一個鏈表來保存要執(zhí)行的超時任務(wù),同時有一個指針一格一格的走,走到那個格子時就執(zhí)行格子對應(yīng)的延遲任務(wù),
其中前三種Timer、DelayQueue、ScheduledThreadPoolExecutor實現(xiàn)比較簡單,只不過只適用于單體應(yīng)用,任務(wù)數(shù)據(jù)都在內(nèi)存中,在系統(tǒng)崩潰后數(shù)據(jù)丟失;后兩張實現(xiàn)相對復(fù)雜,并且需要依賴于第三方應(yīng)用,在系統(tǒng)整體結(jié)構(gòu)上更加復(fù)雜且消耗更多資源,但能支持分布式系統(tǒng),且有較高的容錯性。
示例
定義延遲任務(wù)對象:
@Getter
public class DelayTask implements Serializable{
private static final long serialVersionUID = -5062977578344039366L;
private long delaySeconds;
private TaskExecute taskExecute;
public DelayTask(long delaySeconds, TaskExecute taskExecute) {
this.delaySeconds = delaySeconds;
this.taskExecute = taskExecute;
}
/**
*
*/
public void execute(){
taskExecute.run();
}
public interface TaskExecute extends Runnable, Serializable {
}
}
調(diào)度器:
public interface ScheduleTrigger {
/**
* 延遲任務(wù)調(diào)度
* @param delayTask
*/
void schedule(DelayTask delayTask);
}
- Timer
public class JavaTrigger implements ScheduleTrigger{
private Timer timer;
public JavaTimer(){
this.timer = new Timer();
}
/**
*
* @param delayTask
*/
public void schedule(DelayTask delayTask){
timer.schedule(buildTimerTask(delayTask.getTaskExecute()), toMillis(delayTask.getDelaySeconds()));
}
private TimerTask buildTimerTask(Runnable runnable){
return new TimerTask() {
@Override
public void run() {
runnable.run();
}
};
}
}
- DelayQueue
public class DelayQueueTrigger implements ScheduleTrigger{
private DelayQueue< Task > queue = new DelayQueue< >();
public DelayQueueTrigger() {
Thread thread = new Thread(() - > {
while (true) {
try {
Task task = queue.take();
if(task != null)
task.execute();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
thread.setDaemon(true);
thread.start();
}
/**
* @param delayTask
*/
public void schedule(DelayTask delayTask){
if( delayTask instanceof Task ){
queue.put((Task) delayTask);
}
}
}
class Task extends DelayTask implements Delayed{
private long execTime;
public Task(long delaySeconds, TaskExecute taskExecute) {
super(delaySeconds, taskExecute);
this.execTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delaySeconds);
}
/**
* 輪詢執(zhí)行該方法判斷是否滿足執(zhí)行條件(<=0)
* 同時該返回作為等待時長
* @param unit the time unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return this.execTime - System.currentTimeMillis(); // ms
}
public long getExecTime() {
return execTime;
}
@Override
public int compareTo(Delayed other) {
if(this.getExecTime() == ((Task)other).getExecTime()){
return 0;
}
return this.getExecTime() > ((Task)other).getExecTime() ? 1: -1;
}
}
- ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor實現(xiàn)也是基于延遲隊列BlockingQueue實現(xiàn)
public class ScheduledExecutorTrigger implements ScheduleTrigger{
private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
public void schedule(DelayTask delayTask){
executorService.schedule(delayTask.getTaskExecute(), delayTask.getDelaySeconds(), TimeUnit.SECONDS);
}
}
- Redis過期監(jiān)聽
需要修改redis配置文件:notify-keyspace-events Ex
public class RedisTimer{
private static final String EXPIRATION_KEY = "REDIS_EXPIRATION_KEY";
@Configuration
@Import(RedisAutoConfiguration.class)
public static class Config{
@Bean(name = "redisTemplate")
public RedisTemplate< Object, Object > redisTemplate(RedisConnectionFactory factory) {
RedisTemplate< Object, Object > template = new RedisTemplate< >();
RedisSerializer< String > keySerializer = new StringRedisSerializer();
RedisSerializer< Object > valueSerializer = new ObjectRedisSerializer();
template.setConnectionFactory(factory);
template.setKeySerializer(keySerializer);
template.setValueSerializer(valueSerializer);
return template;
}
/**
* 消息監(jiān)聽器容器bean
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
@Bean
public RedisKeyExpirationListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer){
RedisKeyExpirationListener redisKeyExpirationListener = new RedisKeyExpirationListener(redisMessageListenerContainer);
redisKeyExpirationListener.setContext(context());
return redisKeyExpirationListener;
}
@Bean
public Context context(){
return new Context();
}
@Bean
public RedisTrigger redisTrigger(RedisTemplate redisTemplate){
return new RedisTrigger(redisTemplate, context());
}
class ObjectRedisSerializer implements RedisSerializer{
@Override
public byte[] serialize(Object o) throws SerializationException {
return SerializeUtils.serialize(o);
}
@Override
public Object deserialize(byte[] bytes) throws SerializationException {
return SerializeUtils.deserialize(bytes);
}
}
}
public static class RedisTrigger implements ScheduleTrigger{
private RedisTemplate redisTemplate;
private Context context;
public RedisTrigger(RedisTemplate redisTemplate, Context context){
this.redisTemplate = redisTemplate;
this.context = context;
}
public void schedule(DelayTask delayTask){
context.put(EXPIRATION_KEY, delayTask);
redisTemplate.opsForValue().set(EXPIRATION_KEY, delayTask, delayTask.getDelaySeconds(), TimeUnit.SECONDS);
}
}
@Slf4j
public static class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
private Context context;
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* 這里沒法拿到過期值
* @param message never {@literal null}.
*/
@SneakyThrows
@Override
public void doHandleMessage(Message message) {
try {
String topic = new String(message.getChannel(), "utf-8");
String key = new String(message.getBody(), "utf-8");
if (EXPIRATION_KEY.equals(key)) {
Object object = context.get(EXPIRATION_KEY);
if( object instanceof DelayTask ){
log.info("redis key[{}] 過期回調(diào)", key);
((DelayTask) object).execute();
}
}
} catch (Exception e) {
log.error("處理Redis延遲任務(wù)異常:{}", e.getMessage() ,e);
}
}
public void setContext(Context context) {
this.context = context;
}
}
public static class Context{
private Map< String,Object > context = new ConcurrentHashMap< >();
public void put(String key, Object value){
context.put(key, value);
}
public Object get(String key){
return context.get(key);
}
}
}
- MQ延遲隊列
這里MQ選擇的是RabbitMq,要知道在RabbitMq中是沒有延遲隊列的,但可以通過延遲消息插件rabbitmq_delayed_message_exchange實現(xiàn),另外一種是基于死信來實現(xiàn)。
什么時候消息進(jìn)入死信?
- 1)消息消費方調(diào)用了basicNack() 或 basicReject(),并且參數(shù)都是 requeue = false,則消息會路由進(jìn)死信隊列
- 2)消息消費過期,過了TTL(消息、或隊列設(shè)置超時時間) 存活時間,就是消費方在 TTL 時間之內(nèi)沒有消費,則消息會路由進(jìn)死信隊列
- 3)隊列設(shè)置了x-max-length 最大消息數(shù)量且當(dāng)前隊列中的消息已經(jīng)達(dá)到了這個數(shù)量,再次投遞,消息將被擠掉,被擠掉的消息會路由進(jìn)死信隊列
public class RabbitTimer{
@Configuration
@Import(RabbitAutoConfiguration.class)
public static class Config{
static final String TTL_EXCHANGE_FOR_SCHEDULE = "TTL_EXCHANGE_FOR_SCHEDULE";
static final String TTL_QUEUE_FOR_SCHEDULE = "TTL_QUEUE_FOR_SCHEDULE";
static final String TTL_ROUTING_KEY_FOR_SCHEDULE = "TTL_ROUTING_KEY_FOR_SCHEDULE";
static final String COMMON_QUEUE_FOR_SCHEDULE = "COMMON_QUEUE_FOR_SCHEDULE";
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable(TTL_QUEUE_FOR_SCHEDULE).build();
}
@Bean
public Exchange ttlExchange(){
return ExchangeBuilder.directExchange(TTL_EXCHANGE_FOR_SCHEDULE).build();
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with(TTL_ROUTING_KEY_FOR_SCHEDULE).noargs();
}
@Bean
public Queue commonQueue(){
return QueueBuilder.durable(COMMON_QUEUE_FOR_SCHEDULE)
.deadLetterExchange(TTL_EXCHANGE_FOR_SCHEDULE)
.deadLetterRoutingKey(TTL_ROUTING_KEY_FOR_SCHEDULE)
.build();
}
@Bean
public TtlMessageConsumer ttlMessageConsumer(){
return new TtlMessageConsumer();
}
@Bean
public RabbitTrigger rabbitTrigger(RabbitTemplate rabbitTemplate){
return new RabbitTrigger(rabbitTemplate);
}
}
@Slf4j
@RabbitListener(queues=TTL_QUEUE_FOR_SCHEDULE)
public static class TtlMessageConsumer{
@RabbitHandler
public void handle(byte [] message){
Object deserialize = SerializeUtils.deserialize(message);
if( deserialize instanceof DelayTask ){
((DelayTask) deserialize).execute();
}
}
}
public static class RabbitTrigger implements ScheduleTrigger{
@Autowired
private RabbitTemplate rabbitTemplate;
public RabbitTrigger(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void schedule(DelayTask delayTask){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration( String.valueOf(TimeUnit.SECONDS.toMillis(delayTask.getDelaySeconds())));
Message message = new Message(SerializeUtils.serialize(delayTask), messageProperties);
rabbitTemplate.send(COMMON_QUEUE_FOR_SCHEDULE, message);
}
}
}
- HashedWheelTimer
public class NettyTrigger implements ScheduleTrigger {
HashedWheelTimer timer = new HashedWheelTimer(200,
TimeUnit.MILLISECONDS,
100); // 時間輪中的槽數(shù)
/**
*
*/
@Override
public void schedule(DelayTask delayTask){
TimerTask task = timeout - > delayTask.execute();
//
timer.newTimeout(task, delayTask.getDelaySeconds(), TimeUnit.SECONDS);
}
}
測試:
ScheduleTrigger.schedule(DelayTask delayTask);
結(jié)束語
通過幾個簡單的示例了解延遲隊列的實現(xiàn)方式,可以根據(jù)實際業(yè)務(wù)場景以及應(yīng)用架構(gòu)做出合理的選擇。
-
數(shù)據(jù)
+關(guān)注
關(guān)注
8文章
6808瀏覽量
88743 -
服務(wù)器
+關(guān)注
關(guān)注
12文章
8958瀏覽量
85085 -
內(nèi)存
+關(guān)注
關(guān)注
8文章
2966瀏覽量
73815 -
定時器
+關(guān)注
關(guān)注
23文章
3231瀏覽量
114329 -
延遲
+關(guān)注
關(guān)注
1文章
70瀏覽量
13504
發(fā)布評論請先 登錄
相關(guān)推薦
評論