详细介绍Spring,基于RabbitMQ的延迟队列

望文生义,延迟队列就是步入该队列的音讯会被延迟花费的系列。而相似的行列,新闻一经入队了后来就能够被花费者及时花费。

何为延迟队列?

从名称想到所包涵的意义,延迟队列就是跻身该队列的新闻会被延缓花费的类别。而相似的行列,音讯一经入队了以往就能够被费用者即时成本。

什么样是延迟队列

延迟队列,即新闻发送之后,在一段时间之后延迟被花费端花费的音讯队列。举个例子大家发送一条音信,希望在半小时过后才足以被花费端成本到的这种意况中就足以用到延迟队列了。

推迟队列能做哪些?

延期队列能做怎样?

延迟队列多用来必要延期职业的场景。最遍布的是以下二种现象:

  1. 延期成本。比方:
    • 客户生成订单之后,须求过一段时间校验订单的支付情状,假诺订单仍未支付则必要立即地关闭订单。
    • 详细介绍Spring,基于RabbitMQ的延迟队列。客商注册成功之后,须要过一段时间举个例子七日后校验客户的施用景况,假如开掘客户活跃度异常的低,则发送邮件或许短信来提醒客商使用。
  2. 延期重试。比如费用者从队列里开销音讯时失利了,可是想要延迟一段时间后活动重试。

假诺不接纳延缓队列,那么大家只可以通过三个轮询扫描程序去做到。这种方案既不高雅,也不方便人民群众做成统一的劳务方便开辟人士使用。可是使用延迟队列的话,大家就足以简单地达成。

动用rabbitmq实现延迟队列

rabbitmq的3.6.本子中得以应用三个插件rabbitmq-delayed-message-exchange
创设一个的推移队列
*。

何以设置和行使rabbitmq能够参谋小编事先的稿子:

  • 《CentOS上安装RabbitMQ》
  • 《RabbitMQ的多种调换机》
  • 《RabbitMQ with Spring
    Boot》

延迟队列多用来须要延期工作的场景。最广泛的是以下二种现象:

哪些兑现?

别急,在下文中,大家将详细介绍怎么样接纳Spring
Boot
RabbitMQ来兑现延迟队列。

正文出现的亲自过问代码都已经push到Github饭店中:

在介绍具体的贯彻思路在此以前,大家先来介绍一下RabbitMQ的两性格形,三个是Time-To-Live
Extensions,另三个是Dead Letter Exchanges。

安装

下载
https://dl.bintray.com/rabbitmq/community-plugins/rabbitmq\_delayed\_message\_exchange-0.0.1.ez
插件安装到rabbitmq的插件目录 (平常是/usr/lib/rabbitmq/plugins 可能/usr/lib/rabbitmq/lib/rabbitmq_server-version/plugins 目录)

然后在rabbitmq-server运营的情事下,运转那条命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

推迟费用。举例:

Time-To-Live Extensions

RabbitMQ允许我们为新闻依旧队列设置TTL(time to
live),也正是晚点岁月。TTL注解了一条新闻可在队列中存活的最大日子,单位为纳秒。相当于说,当某条音信被设置了TTL可能当某条音讯进入了设置了TTL的行列时,这条音讯会在通过TTL秒后“过逝”,成为Dead
Letter。假诺既配置了音讯的TTL,又安顿了队列的TTL,那么十分小的百般值会被取用。更加多材质请查阅官方文书档案。

在spring应用中应用这几个特点

public static final String QUEUE_NAME = "delay_queue";

public static final String EXCHANGE_NAME = "delay_exchange";

@Bean
Queue queue() {    
      return new Queue(QUEUE_NAME, true);
}

// 定义一个延迟交换机
@Bean
CustomExchange delayExchange() {    
    Map<String, Object> args = new HashMap<String, Object>();    
    args.put("x-delayed-type", "direct");    
    return new CustomExchange(EXCHANGE_NAME, "x-delayed-message", true, false, args);
}

// 绑定队列到这个延迟交换机上
@Bean
Binding binding(Queue queue, CustomExchange delayExchange) {    
    return BindingBuilder.bind(queue).to(delayExchange).with(QUEUE_NAME).noargs();
}

地方定义了二个x-delayed-message项指标沟通机,由于Spring AMQP中并未有那个类其他沟通机,所以我们使用叁个CustomExchange来定义那些插件构建的调换机。

图片 1

查看那几个沟通机的习性

发送新闻到这几个交流机上:

MessageProperties properties = new MessageProperties();

properties.setDelay(5000); 

Message message = new Message("delay_test_message".getBytes(), properties);

rabbitTemplate.send(RabbitMQConfiguration.EXCHANGE_NAME, RabbitMQConfiguration.QUEUE_NAME, message);

自身傻眼的开掘,Spring
AMQP居然已经在措施上支撑了x-delay以此天性,不过奇异的是他们不曾提供适配延迟队列的置换机类,还供给和谐定义贰个CustomExchange。其余四只自身定义了叁个Listener类:

@Component
public class DelayListener {   

    // 消息转换器    
    @RabbitListener(queues = RabbitMQConfiguration.QUEUE_NAME)    
    public void consumer(Message message) {        
        System.out.println(new Date() + " ---> " + new String(message.getBody()));    
    }

}

自个儿发送出那条音信:

send amqp message in Mon Oct 09 20:46:02 CST 2017

在5秒未来,确实在接到到了那条新闻:

Mon Oct 09 20:46:07 CST 2017 —> delay_test_message

顾客生成订单之后,供给过一段时间校验订单的开垦意况,假若订单仍未支付则须要立即地关闭订单。

Dead Letter Exchange

刚才提到了,被安装了TTL的新闻在逾期后会成为Dead
Letter。其实在RabbitMQ中,一共有三种音信的“驾鹤归西”情势:

  1. 音信被拒绝。通过调用basic.reject恐怕basic.nack並且安装的requeue参数为false。
  2. 音信因为安装了TTL而过期。
  3. 音讯进入了一条已经达成最大尺寸的行列。

比方队列设置了Dead Letter Exchange,那么这个Dead
Letter就能够被再一次publish到Dead Letter Exchange,通过Dead Letter
Exchange路由到另外队列。越多质地请查阅官方文书档案。

rabbitmq之外的方案

前边小编也设想使用redisexpire指令同盟公布/订阅模型创设延迟队列,末了开掘redis成就这么些地方有贰个一点都不小的流弊,一旦音讯被订阅,费用端的服务未有拍卖成功,那么那条新闻就相当有比不小概率再也心余力绌管理了。除却,RocketMQ放到了延迟队列的意义,可是只可以支持1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h那样三种固定时期的推迟新闻,自定义不是可怜有利。

客户注册成功之后,需求过一段时间比如七日后校验客户的行使意况,假诺开采纳户活跃度好低,则发送邮件大概短信来提醒顾客使用。

流程图

精明能干的您早晚已经想到了,如何将RabbitMQ的TTL和DLX本性结合在协同,完毕三个延迟队列。

针对于上述的延迟队列的四个场景,大家独家有以下二种流程图:

延期成本是延迟队列最为常用的选用方式。如下图所示,生产者发生的音信首先会进来缓冲队列。通过RabbitMQ提供的TTL增加,那么些音讯会被设置过期时间,也便是延迟开销的岁月。等音信过期从此,那么些音讯会通过配备好的DLX转载到实际费用队列,以此达到延迟花费的意义。

图片 2image

推迟重试本质上也是延迟开支的一种,不过这种方式的布局与日常的推移花费的流程图较为差别,所以单独拎出来介绍。

日常来讲图所示,花费者发掘该信息管理出现了丰富,举个例子是因为互联网波动引起的那几个。那么一旦不等待一段时间,直接就重试的话,很可能会产生在这里面内一贯不能成功,变成一定的财富浪费。那么大家得以将其先放在缓冲队列中,等新闻经过一段的延迟时间后再行进入实际花费队列中,此时由于已通过了“较长”的时刻了,格外的一部分骚动平常已经还原,那些消息能够被符合规律地花费。

图片 3image

接下去我们将介绍怎么样在Spring
Boot中贯彻基于RabbitMQ的延期队列。我们倘诺读者已经有所了Spring
Boot与RabbitMQ的基本知识。要是想急忙精通Spring
Boot的有关基础知识,能够参照笔者事先写的一篇小说。

延期重试。譬喻成本者从队列里消费消息时失败了,然而想要延迟一段时间后活动重试。

初阶化学工业程

第一大家在速龙lij中创立三个Spring
Boot工程,何况增加spring-boot-starter-amqp扩展。

一旦不利用延缓队列,那么我们只可以通过五个轮询扫描程序去做到。这种方案既倒霉看,也不方便人民群众做成统一的劳务方便人民群众开拓人士使用。不过采纳延迟队列的话,我们就能够轻便地完结。

配置队列

从上述的流程图中大家能够观看,一个延迟队列的兑现,须求一个缓冲队列以及三个实际的花费队列。又由于在RabbitMQ中,我们具备三种消息过期的安排情势,所以在代码中,大家总括安顿了三条队列:

  • delay_queue_per_message_ttl:TTL配置在消息上的缓冲队列。
  • delay_queue_per_queue_ttl:TTL配置在队列上的缓冲队列。
  • delay_process_queue:实际费用队列。

我们经过Java
Config的主意将上述的连串配置为Bean。由于大家增添了spring-boot-starter-amqp强大,Spring
Boot在运维时会根据大家的布局活动创造那些队列。为了便利接下去的测验,咱们将delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的DLX配置为同三个,且过期的新闻都会通过DLX转载到delay_process_queue。

第一介绍delay_queue_per_message_ttl的配备代码:

@BeanQueue delayQueuePerMessageTTL() { return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME) .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX,dead letter发送到的exchange .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key .build();}

其中,x-dead-letter-exchange评释了队列里的死信转发到的DLX名称,x-dead-letter-routing-key宣示了那么些死信在转会时指点的routing-key名称。

类似地,delay_queue_per_queue_ttl的安排代码:

@BeanQueue delayQueuePerQueueTTL() { return QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL_NAME) .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key .withArgument("x-message-ttl", QUEUE_EXPIRATION) // 设置队列的过期时间 .build();}

delay_queue_per_queue_ttl队列的安顿比delay_queue_per_message_ttl队列的配备多了一个x-message-ttl,该配置用来设置队列的超时时间。

delay_process_queue的配置最为简练:

@BeanQueue delayProcessQueue() { return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME) .build();}

什么样兑现?

配置Exchange

率先,大家必要配备DLX,代码如下:

@BeanDirectExchange delayExchange() { return new DirectExchange(DELAY_EXCHANGE_NAME);}

接下来再将该DLX绑定到实际费用队列即delay_process_queue上。那样具备的死信都会由此DLX被转载到delay_process_queue:

@BeanBinding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) { return BindingBuilder.bind(delayProcessQueue) .to(delayExchange) .with(DELAY_PROCESS_QUEUE_NAME);}

从延迟重试的流程图中咱们得以看到,音信管理退步以往,大家供给将音讯转载到缓冲队列,所以缓冲队列也急需绑定叁个Exchange。在本例中,我们将delay_process_per_queue_ttl作为延迟重试里的缓冲队列。具体代码是怎么安插的,这里就不赘述了,我们能够查看本身Github中的代码。

别急,在下文中,大家将详细介绍怎么样选择Spring
Boot
RabbitMQ来兑现延迟队列。

概念花费者

大家成立贰个最简便的主顾ProcessReceiver,那几个花费者监听delay_process_queue队列,对于收受到的音讯,他会:

  • 一经消息里的音信体不等于FAIL_MESSAGE,那么他会输出音信体。
  • 假使音信里的音讯体恰好是FAIL_MESSAGE,那么他会效仿抛出十一分,然后将该音信重定向到缓冲队列。

另外,我们还供给新建贰个监听容器用于存放花费者,代码如下:

@BeanSimpleMessageListenerContainer processContainer(ConnectionFactory connectionFactory, ProcessReceiver processReceiver) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(DELAY_PROCESS_QUEUE_NAME); // 监听delay_process_queue container.setMessageListener(new MessageListenerAdapter(processReceiver)); return container;}

至此,大家松开的计划代码已经全副编写制定作而成功,接下去大家必要编写制定测量检验用例来测验大家的延期队列。

贯彻思路

编排测量检验用例

首先大家编辑用于测验TTL设置在音信上的测量检验代码。

咱俩借助spring-rabbit包下提供的RabbitTemplate类来发送消息。由于大家增加了spring-boot-starter-amqp恢宏,Spring
Boot会在早先化时自动地将RabbitTemplate当成bean加载到容器中。

涸泽而渔了新闻的出殡难题,那么又该怎么为各种新闻设置TTL呢?这里我们须要信赖MessagePostProcessor。MessagePostProcessor平时用来安装新闻的Header以及音讯的属性。大家新建一个ExpirationMessagePostProcessor类来负担安装新闻的TTL属性:

/** * 设置消息的失效时间 */public class ExpirationMessagePostProcessor implements MessagePostProcessor { private final Long ttl; // 毫秒 public ExpirationMessagePostProcessor { this.ttl = ttl; } @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties() .setExpiration(ttl.toString; // 设置per-message的失效时间 return message; }}

然后在调用RabbitTemplate的convertAndSend方法时,传入ExpirationMessagePostPorcessor就可以。大家向缓冲队列中发送3条消息,过期时光顺序为1秒,2秒和3秒。具体的代码如下所示:

@Testpublic void testDelayQueuePerMessageTTL() throws InterruptedException { ProcessReceiver.latch = new CountDownLatch; for (int i = 1; i <= 3; i++) { long expiration = i * 1000; rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME,  ("Message From delay_queue_per_message_ttl with expiration " + expiration), new ExpirationMessagePostProcessor(expiration)); } ProcessReceiver.latch.await();}

留意的意中人料定会问,为啥要在代码中加三个CountDownLatch呢?那是因为只要未有latch阻塞住测量试验方法的话,测量试验用例会直接结束,程序退出,大家就看不到音讯被延迟开支的呈现了。

那正是说看似地,测量检验TTL设置在队列上的代码如下:

@Testpublic void testDelayQueuePerQueueTTL() throws InterruptedException { ProcessReceiver.latch = new CountDownLatch; for (int i = 1; i <= 3; i++) { rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_QUEUE_TTL_NAME, "Message From delay_queue_per_queue_ttl with expiration " + QueueConfig.QUEUE_EXPIRATION); } ProcessReceiver.latch.await();}

大家向缓冲队列中发送3条音信。理论上那3条音讯会在4秒后还要过期。

我们一致还需测验延迟重试场景。

@Testpublic void testFailMessage() throws InterruptedException { ProcessReceiver.latch = new CountDownLatch; for (int i = 1; i <= 3; i++) { rabbitTemplate.convertAndSend(QueueConfig.DELAY_PROCESS_QUEUE_NAME, ProcessReceiver.FAIL_MESSAGE); } ProcessReceiver.latch.await();}

我们向delay_process_queue发送3条会触发FAIL的消息,理论上那3条新闻会在4秒后自动重试。

在介绍具体的贯彻思路从前,大家先来介绍一下RabbitMQ的七个特点,三个是Time-To-Live
Extensions,另多个是Dead Letter Exchanges。

翻开测验结果

推迟开销的风貌测量检验我们分为了TTL设置在音讯上和TTL设置在队列上三种。首先,我们先看一下TTL设置在音信上的测量检验结果:

图片 4image

从上航海用教室中大家能够看来,ProcessReceiver分别通过1秒、2秒、3秒收到消息。测验结果申明音讯不但被推迟花费了,並且每条消息的延迟时间是足以被天性化设置的。TTL设置在音讯上的延期花费现象测量试验成功。然后,TTL设置在队列上的测量检验结果如下图:

图片 5image

从上海体育场合中大家能够看出,ProcessReceiver经过了4秒的推移之后,同临时间接受了3条音信。测验结果申明音讯不但被推迟成本了,同有毛病间也认证了当TTL设置在队列上的时候,音信的晚点时间是定点的。TTL设置在队列上的推移成本现象测验成功。

接下去,大家再来看一下推迟重试的测验结果:

图片 6image

ProcessReceiver首先接受了3条会触发FAIL的新闻,然后将其活动到缓冲队列之后,过了4秒,又接到了刚刚的那3条音信。延迟重试场景测量试验成功。

本文首先介绍了延迟队列的定义以及用途,况兼经过代码详细批注了如何通过Spring
Boot和RabbitMQ实现贰个延缓队列。希望本文可以对我们通常的就学和办事能有所启发和帮衬。有何样观点或许难点迎接在议论纷纷下方留言,多谢!

本文首发于

Time-To-Live Extensions

RabbitMQ允许大家为新闻照旧队列设置TTL(time to
live),也便是逾期时光。TTL评释了一条新闻可在队列中幸存的最大时间,单位为微秒。也等于说,当某条新闻棉被服装置了TTL可能当某条音讯进入了安装了TTL的系列时,那条消息会在经过TTL秒后“驾鹤归西”,成为Dead
Letter。若是既配置了音信的TTL,又铺排了队列的TTL,那么十分小的不胜值会被取用。更加多质感请查阅
官方文书档案 。

Dead Letter Exchange

刚刚提到了,被安装了TTL的音信在逾期后会成为Dead
Letter。其实在RabbitMQ中,一共有三种音讯的“与世长辞”情势:

音讯被驳回。通过调用basic.reject也许basic.nack并且安装的requeue参数为false。

音信因为设置了TTL而过期。

音讯步向了一条已经达到规定的标准最大尺寸的行列。

要是队列设置了Dead Letter Exchange,那么这一个Dead
Letter就能够被重新publish到Dead Letter Exchange,通过Dead Letter
Exchange路由到任何队列。更多材料请查阅 官方文书档案 。

流程图

智慧的你势必已经想到了,怎么样将RabbitMQ的TTL和DLX个性结合在一同,完毕八个推迟队列。

本着于上述的延迟队列的多少个现象,大家分别有以下二种流程图:

推迟开支

延期消费是延迟队列最为常用的利用情势。如下图所示,生产者发生的新闻首先会跻身缓冲队列。通过RabbitMQ提供的TTL扩张,那个消息会被设置过期时间,也正是延迟开支的时间。等新闻过期过后,这个音讯会通过配备好的DLX转发到骨子里花费队列,以此达到延迟开支的效率。

图片 7Java架构进级群:554355695

延期重试

延迟重试本质上也是延迟开销的一种,可是这种方式的构造与平日的延迟花费的流程图较为差别,所以单独拎出来介绍。

正如图所示,成本者开掘该新闻管理出现了特别,比方是因为互连网波动引起的拾壹分。那么一旦不等待一段时间,直接就重试的话,很恐怕会招致在那中间内平昔不或许成功,形成一定的财富浪费。那么大家得以将其先放在缓冲队列中,等新闻经过一段的延迟时间后重新步入实际花费队列中,此时是因为已透过了“较长”的年月了,十分的局地不定日常已经复苏,这一个消息能够被符合规律地花费。

图片 8Java架构进级群:554355695

一旦您想上学Java工程化、高品质及布满式、高质量、深入显出。质量调优、Spring,MyBatis,Netty源码解析和大数据等知识点能够来找作者。

而前几天自己就有三个平台能够提须求你们学习,令你在实践中积聚经验了然规律。首要矛头是JAVA架构师。假如你想拿高薪,想突破瓶颈,想跟外人竞争能获得优势的,想进BAT可是有忧郁面试然而的,能够加小编的Java架构进级群:554355695

代码达成

接下去我们将介绍怎么着在Spring
Boot中贯彻基于RabbitMQ的延期队列。大家只要读者已经具有了Spring
Boot与RabbitMQ的基本知识。借使想飞快明白Spring
Boot的有关基础知识,可以参照作者在此以前写的一篇小说。

初阶化学工业程

先是我们在英特尔lij中创设叁个Spring
Boot工程,何况拉长spring-boot-starter-amqp增加。

配备队列

从上述的流程图中大家能够见到,七个延迟队列的落到实处,须求一个缓冲队列以及一个实际上的开销队列。又由于在RabbitMQ中,我们有着二种音讯过期的布署格局,所以在代码中,我们一共布置了三条队列:

delay_queue_per_message_ttl:TTL配置在音讯上的缓冲队列。

发表评论

电子邮件地址不会被公开。 必填项已用*标注