RabbitMQ【应用 01】SpringBoot集成RabbitMQ及设置RabbitMQ启动总开关
创始人
2025-05-29 08:22:15
0

1.SpringBoot集成RabbitMQ

1.1 依赖及配置

org.springframework.bootspring-boot-starter-amqp

spring:# 用于接收设备发送的数据rabbitmq:host: xxx.xx.xxx.xxxport: 5672username: guestpassword: guestmq-name: test# 确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated# 确认消息已发送到队列publisher-returns: true

1.2 消息监听与发送

  • 数据获取
@Component
@Slf4j
public class RabbitMessageQueueReceiver {@Autowiredprivate ConfigProperties configProperties;@Autowiredprivate AsyncConfig asyncConfig;@Autowiredprivate DataGsmEquipComparisonManager dataGsmEquipComparisonManager;@RabbitListener(queuesToDeclare = {@Queue(name = "${spring.rabbitmq.mq-name}", durable = "true")}, ackMode = "MANUAL")@RabbitHandler()public void receive(String msg, Channel channel, Message message) throws IOException, InterruptedException {// 获取消息体String jsonString = new String(message.getBody());// 处理数据格式Map dataMap = dealMessageData(jsonString);try {asyncConfig.taskExecutor().execute(() -> {// 根据数据类型处理消息【这里大家根据实际情况进行处理】DealMessageByType.getInstance().dispose(dataMap);});channel.basicQos(5);channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);} catch (Exception e) {e.printStackTrace();log.error("error message:" + jsonString);try {channel.basicQos(5);channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);} catch (Exception e1) {e1.printStackTrace();}}}
  • 数据发送
@Component
@Log4j
public class RabbitMessageQueueSender {public RabbitTemplate rabbitTemplate;public boolean sendMessage(String exchange, String routingKey, String message) {try {rabbitTemplate.convertAndSend(exchange, routingKey, message);} catch (Exception e) {e.printStackTrace();return false;}return true;}
}
  • 确认机制(消息发送到服务回调)
@Component
@Slf4j
public class RabbitmqSendCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {private RabbitTemplate rabbitTemplate;@PostConstructpublic void run() {if (rabbitTemplate != null) {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(this);}}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String failCause) {if (ack) {log.info("消息发送成功");} else {log.info("消息发送失败,进行容错处理");}log.info("消息发送到交换机时的回调函数, ack:" + ack + "FailCause 消息:" + failCause);}@Overridepublic void returnedMessage(ReturnedMessage returned) {log.info("消息从交换机发送到队列时失败的回调函数, 调用失败!!!" + returned);}
}

2.设置RabbitMQ启动总开关

SpringBoot 项目集成了 RabbitMQ 但是有时候又用不到它,比如说:

  • 开发跟 RabbitMQ 服务无关接口时,此时 MQ 服务如果未启动,会有报错信息不断打印出来。
  • 不同的用户部署时,有可能用不到 RabbitMQ,此时没有部署 MQ,启动项目时不能报错。

核心报错信息:

WARN  o.s.boot.actuate.amqp.RabbitHealthIndicator - Rabbit health check failed
Caused by: java.net.ConnectException: Connection refused: connect

详细报错信息:

[2023-03-16 11:18:11.456] traceId= [RMI TCP Connection(8)-xxx.xxx.xx.xxx] WARN  o.s.boot.actuate.amqp.RabbitHealthIndicator - Rabbit health check failed
org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connectat org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:61)at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:602)at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:725)at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:252)at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2173)at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2146)at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2126)at org.springframework.boot.actuate.amqp.RabbitHealthIndicator.getVersion(RabbitHealthIndicator.java:49)at org.springframework.boot.actuate.amqp.RabbitHealthIndicator.doHealthCheck(RabbitHealthIndicator.java:44)at org.springframework.boot.actuate.health.AbstractHealthIndicator.health(AbstractHealthIndicator.java:82)at org.springframework.boot.actuate.health.HealthIndicator.getHealth(HealthIndicator.java:37)at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:77)at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:40)at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:130)at org.springframework.boot.actuate.health.HealthEndpointSupport.getAggregateContribution(HealthEndpointSupport.java:141)at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:126)at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:95)at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:66)at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:71)at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:61)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282)at org.springframework.boot.actuate.endpoint.invoke.reflect.ReflectiveOperationInvoker.invoke(ReflectiveOperationInvoker.java:74)at org.springframework.boot.actuate.endpoint.annotation.AbstractDiscoveredOperation.invoke(AbstractDiscoveredOperation.java:60)at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:122)at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:97)at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801)at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1468)at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76)at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309)at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401)at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829)at sun.reflect.GeneratedMethodAccessor212.invoke(Unknown Source)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357)at sun.rmi.transport.Transport$1.run(Transport.java:200)at sun.rmi.transport.Transport$1.run(Transport.java:197)at java.security.AccessController.doPrivileged(Native Method)at sun.rmi.transport.Transport.serviceCall(Transport.java:196)at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:573)at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:834)at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:688)at java.security.AccessController.doPrivileged(Native Method)at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:687)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused: connectat java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:81)at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:476)at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:218)at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:200)at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162)at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)at java.net.Socket.connect(Socket.java:606)at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60)at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1223)at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1173)at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connectAddresses(AbstractConnectionFactory.java:640)at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connect(AbstractConnectionFactory.java:615)at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:565)... 50 common frames omitted

2.1 总开关配置

添加spring.rabbitmq.enable配置作为总开关:

spring:# 用于接收设备发送的数据rabbitmq:# rabbitmq 的自定义配置 enable 用于开启或关闭 rabbitmq 服务(false关闭,true开启)enable: truehost: 172.81.205.216port: 5672username: guestpassword: guestmq-name: ZRTZ_QUEUE_EFENCE_DEVICE_OBTAIN_STATUS# 确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated# 确认消息已发送到队列publisher-returns: true

2.2 关闭自动配置

@EnableRabbit
@SpringBootApplication(exclude = {RabbitAutoConfiguration.class})
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}

2.3 根据开关进行配置

/*** 用于管理 RabbitAutoConfiguration 是否配置*/
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.enable", havingValue = "true")
public class RabbitMessageQueueEnableAutoConfig extends RabbitAutoConfiguration {
}

2.4 消息监听与发送

  • 监听开关
@Component
@Slf4j
@Data
public class RabbitMessageQueueReceiverConfig {@Value("${spring.rabbitmq.enable}")private boolean enable;@Beanpublic RabbitMessageQueueReceiver initRabbitMessageQueueReceiver() {if (enable) {RabbitMessageQueueReceiver rabbitMessageQueueReceiver = new RabbitMessageQueueReceiver();log.info("【------已启用------】RabbitMessageQueueReceiver");return rabbitMessageQueueReceiver;} else {log.info("【------不启用------】RabbitMessageQueueReceiver");return null;}}
}// 监听代码【去掉@Component】
// @Component
@Log4j
public class RabbitMessageQueueSender {public RabbitTemplate rabbitTemplate;public boolean sendMessage(String exchange, String routingKey, String message) {try {rabbitTemplate.convertAndSend(exchange, routingKey, message);} catch (Exception e) {e.printStackTrace();return false;}return true;}
}
  • 消息发送及回调【添加 (required = false) 防止接口被调用出错】
// 消息发送
@Component
@Log4j
public class RabbitMessageQueueSender {@Autowired(required = false)public RabbitTemplate rabbitTemplate;public boolean sendMessage(String exchange, String routingKey, String message) {try {if (rabbitTemplate != null) {rabbitTemplate.convertAndSend(exchange, routingKey, message);} else {return false;}} catch (Exception e) {e.printStackTrace();return false;}return true;}
}// 发送回调
@Component
@Slf4j
public class RabbitmqSendCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {@Autowired(required = false)private RabbitTemplate rabbitTemplate;@PostConstructpublic void run() {if (rabbitTemplate != null) {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(this);}}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String failCause) {if (ack) {log.info("消息发送成功");} else {log.info("消息发送失败,进行容错处理");}log.info("消息发送到交换机时的回调函数, ack:" + ack + "FailCause 消息:" + failCause);}@Overridepublic void returnedMessage(ReturnedMessage returned) {log.info("消息从交换机发送到队列时失败的回调函数, 调用失败!!!" + returned);}
}

3.总结

  • 关闭自动配置。
  • 根据自定义的标志进行bean对象装配。
  • 防止未装配导致的报错。

相关内容

热门资讯

原创 撒... 文/阳阳 阳光明媚的五月,三亚的海边再次成为了央妈名嘴撒贝宁一家四口度假的热议焦点! 网友们...
松软香甜可口的绿豆饼 糕点美食... 松软香甜可口的绿豆饼 糕点美食 老人小孩都爱吃美食教程
夏天没食欲,试试这几道开胃菜,... 这天气一热起来,厨房简直成了桑拿房,连带着吃饭的胃口都跑得无影无踪。看着桌上热腾腾的饭菜,愣是提不起...
糊塌子 “山西黑暗料理”:本地... 在美食江湖里,山西糊塌子顶着 “黑暗料理” 的名号,却深受本地人追捧,一句 “配醋绝了” 道出其独特...
上海 “网红” 美食:咸蛋黄青... 上海这座国际化大都市,不仅有着繁华的都市风貌,更藏着令人垂涎的 “网红” 美食。咸蛋黄青团与鲜肉月饼...
原创 3... 一锅牛肉见江湖 "检验厨艺不是看炒菜,而是看炖肉"——这是国营饭店老张师傅退休前教我的真理。炖牛肉...
我妈包了40年蒸饺的秘诀!水温... 朋友们,你们有没有这种体验?外面买的蒸饺,热乎时勉强能吃,一凉就硬得像小石头,嚼得腮帮子都累!可我妈...
原创 这... 导语:这菜很不起眼,竟是天然“杀菌菜”!夏天要多吃,鲜嫩营养高,用来烧汤,鲜掉眉毛 大家好,我是傻姐...
迪士尼电影和电视业务部门再裁员... 迪士尼(DIS.US)正于电影和电视业务部门展开新一轮裁员,涉及数百人,凸显娱乐业萎缩态势未止。此次...
曲靖会泽举行“徒步牯牛寨、登顶... 大牯牛寨峰,位于曲靖市会泽县与昆明市东川区交界处,海拔4017.3米,峰状如昂首的牯牛,岿然屹立,昂...
不止是“东海小渔村”!——考古... 新华社北京6月3日电 6月3日,《新华每日电讯》发表题为《不止是“东海小渔村”!——考古描绘上海六千...
桂林攀岩:新手必选 5 条线路... 本文是桂林攀岩新手的实用攻略,围绕新手必选的 5 条线路、装备穿戴及保护绳使用展开。详细介绍了象鼻山...
原创 女... 编辑丨苏木 文丨苏木 本文陈述所有内容皆有可靠信息来源,赘述在文章结尾 “和医生说被蛇咬了,医生看...
土耳其“最佳旅游乡村”比尔吉 ... 位于土耳其İzmir(伊兹密尔)的历史名村Birgi(比尔吉)正在通过“SENTRUM项目”(基于可...
端午假期江苏接待游客超1240... 端午假期江苏接待游客超1240万人次 江南时报讯(记者 钱海盈) 记者从江苏省文旅厅了解到,端午假期...
广汽传祺向往S7“向往的生活”... 旗舰风范,以智驾智启春城。6月1日,广汽传祺向往S7“向往的生活”智趣家庭露营节昆明站在春城野奢户外...
西沙群岛旅游开放吗如何申请与行... 第一次听说西沙群岛时,我的脑海中浮现的是《中国国家地理》里描述的“玻璃海”与“珊瑚王国”。这片仅对中...
乐山美食街区火爆出圈,游客沉醉... 端午小长假,乐山两大美食地标——古韵流淌的上中顺特色街区和烟火升腾的张公桥美食街,化身人潮涌动的美食...