对rabbitmq的封装,有几个目标:
1 提供send接口
2 提供consume接口
3 保证消息的事务性处理
所谓事务性处理,是指对一个消息的处理必须严格可控,必须满足原子性,只有两种可能的处理结果:
(1) 处理成功,从队列中删除消息
(2) 处理失败(网络问题,程序问题,服务挂了),将消息重新放回队列
为了做到这点,我们使用rabbitmq的手动ack模式,这个后面细说。
1 send接口
send接口相对简单,我们使用spring的RabbitTemplate来实现,代码如下:
2 consume接口
在consume接口中,会调用用户自己的MessageProcess,接口定义如下:
consume的实现相对来说复杂一点,代码如下:
3 保证消息的事务性处理
rabbitmq默认的处理方式为auto ack,这意味着当你从消息队列取出一个消息时,ack自动发送,mq就会将消息删除。而为了保证消息的正确处理,我们需要将消息处理修改为手动确认的方式。
(1) sender的手工确认模式
首先将ConnectionFactory的模式设置为publisherConfirms,如下
之后设置rabbitTemplate的confirmCallback,如下:
只有在消息处理成功后发送ack确认,或失败后发送nack使信息重新投递
4 自动重连机制
为了保证rabbitmq的高可用性,我们使用rabbitmq Cluster模式,并配合haproxy。这样,在一台机器down掉时或者网络发生抖动时,就会发生当前连接失败的情况,如果不对这种情况做处理,就会造成当前的服务不可用。
在spring-rabbitmq中,已实现了connection的自动重连,但是connection重连后,channel的状态并不正确。因此我们需要自己捕捉ShutdownSignalException异常,并重新生成channel。如下:
5 consumer线程池
在对消息处理的过程中,我们期望多线程并行执行来增加效率,因此对consumer做了一个线程池的封装。
线程池通过builder模式构造,需要准备如下参数:
核心循环也较为简单,代码如下:
6 使用示例
最后,我们还是用一个例子做结。
(1) 定义model
(3) sender示例
(4) MessageProcess(用户自定义处理接口)示例,本例中我们只是简单的将信息打印出来
(5) consumer示例
7 github地址
https://github.com/littlersmall/rabbitmq-access
到此这篇enoent解决办法rabbitmq(enoent解决办法 安卓)的文章就介绍到这了,更多相关内容请继续浏览下面的相关推荐文章,希望大家都能在编程的领域有一番成就!版权声明:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若内容造成侵权、违法违规、事实不符,请将相关资料发送至xkadmin@xkablog.com进行投诉反馈,一经查实,立即处理!
转载请注明出处,原文链接:https://www.xkablog.com/rfx/36693.html