java 简单折腾SpringBoot + RabbitMq
之前研究了生产者消费者模式,拓展到了消息队列这一概念。
本来我想折腾一下阿里的(现在归Apache)RocketMq(因为RocketMq是java实现的,不需要另外部署环境),但是SpringBoot官方提供的starter中暂时还没有提供RocketMq的支持,可能要折腾很久。所以我决定试用一下另外一个有名的消息队列RabbitMq。
一、部署RabbitMq
(1)在linux上部署RabbitMq
个人建议在linux上部署RabbitMq。可以参考:
http://www.itfanr.cc/2017/02/21/installation-rabbitmq-on-ubuntu/
如果要在别的操作系统上部署RabbitMq,可以参考官方文档。
(2)虚拟机中部署RabbitMq的要点
如果你像我一样使用虚拟机进行部署,记得开启桥接模式(不能使用NAT模式),并且启用本地连接中的虚拟机网卡。否则本机将无法和虚拟机通讯,连Xshell使用的22端口都无法访问,更不要说RabbitMq提供的5672服务端口和15672管理端口,所有尝试访问虚拟机的链接都会失败。
部署成功后,可以访问http://192.168.1.105(服务器地址):15672,输入注册的账号密码,进入RabbitMq的管理界面。
(3)桥接模式和NAT模式的区别
1.bridged networking(桥接模式)
在这种模式下,VMWare虚拟出来的操作系统就像是局域网中的一台独立的主机,它可以访问网内任何一台机器。
在桥接模式下,你需要手工为虚拟系统配置IP地址、子网掩码,而且还要和宿主机器处于同一网段,这样虚拟系统才能和宿主机器进行通信。同时,配置好网关和DNS的地址后,以实现通过局域网的网关或路由器访问互联。
2.network address translation(NAT模式)
使用NAT模式,就是让虚拟系统借助NAT(网络地址转换)功能,通过宿主机器所在的网络来访问公网。
也就是说,使用NAT模式可以实现在虚拟系统里访问互联网。NAT模式下的虚拟系统的TCP/IP配置信息是由VMnet8(NAT)虚拟网络的DHCP服务器提供的,无法进行手工修改,因此虚拟系统也就无法和本局域网中的其他真实主机进行通讯。
采用NAT模式最大的优势是虚拟系统接入互联网非常简单,只需要宿主机器能访问互联网,你不需要配置IP地址,子网掩码,网关,但是DNS地址需要根据实际情况填写。
二、整合SpringBoot和RabbitMq
项目结构为(重要的包和类被箭头指示):
(1)建立SpringBoot项目,在pom.xml中加入依赖
因为有别的需要,个人使用了如下依赖:
pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>1.5.6.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <version>1.5.6.RELEASE</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.7.0</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.7.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>1.5.6.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <version>1.5.6.RELEASE</version> <scope>test</scope> </dependency> </dependencies> |
最重要的是spring-boot-starter-web(提供web框架服务)、spring-boot-starter-test(提供测试功能)、spring-boot-starter-amqp(整合消息队列)三个依赖,必须加入。
(2)在resources文件夹下建立一个配置文件
建立消息队列的配置文件:
application.properties
1 2 3 4 5 6 7 |
spring.application.name=spirng-boot-rabbitmq spring.rabbitmq.host=192.168.1.105 spring.rabbitmq.port=5672 spring.rabbitmq.username=xie4ever spring.rabbitmq.password=root spring.rabbitmq.publisher-confirms=true |
SpringBoot会自动加载resources文件夹下的配置文件。配置文件不要起奇怪的名字(比如RabbitMq-Config什么的,亲测不行),否则SpringBoot将无法识别,导致加载失败。
(3)写一个消息队列的配置类
RabbitConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
package com.xie.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public Queue queue() { return new Queue("test"); } } |
在Spring中注入一个名为test的消息队列。
(4)使用消息队列
1.往消息队列中加入消息
写一个Sender类,相当于生产者消费者模式中的生产者,负责往消息队列中加入消息。
Sender.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
package com.xie.rabbitmq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class Sender { @Autowired private AmqpTemplate amqpTemplate; public void send(int i) { String context = "test " + i; System.out.println("Sender : " + context); this.amqpTemplate.convertAndSend("test", context); } } |
2.从消息队列中取出消息
写一个Receiver类,相当于生产者消费者模式中的消费者,负责往消息队列中取出消息。
Receiver.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
package com.xie.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "test") public class Receiver { @RabbitHandler public void process(String string) { System.out.println("Receiver : " + string); } } |
3.写一个测试类
TestRabbitMq.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
package com.xie.rabbitmq; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class TestRabbitMq { @Autowired private Sender sender; @Test public void test() throws Exception { for (int i = 0; i < 100; i++) { sender.send(i); } } } |
通过Sender类的send方法,往消息队列中加入100条消息。相应的,Receiver将收到完整的100条消息(因为Receiver只有一个,所有消息都需要由他进行处理)。
之后,可以通过Junit启动这个TestRabbitMq测试类。我们可以看到,Sender将把100条消息加入消费队列中,Receiver将收到一部分消息,还没完全收完测试就结束了(这是因为Junit测试的是test()方法,一旦test()方法执行完了,测试就结束了,不会等待Receiver收完全部消息)。
个人认为,这有一点点像redis的订阅发布模式,一旦发布者发布了一条消息,所有订阅者都会收到消息。不同之处在于,如果RabbitMq中存在多个Receiver,将“平均”获取消息(一个消息只由一个Receiver获取,类似负载均衡),分开进行处理。这也是消息队列的基本特性。
(只要写多几个Receiver,就可以看到这一现象)
至此,SpringBoot + RabbitMq已经整合完毕了。可以从官方文档获取更多使用方法。
三、总结
简单的整合了SpringBoot + RabbitMq。以后我将详细研究RabbitMq的用法。