基于Spring Boot自定义RabbitMQ配置实例
MQ是生产者-消费者模型的一个典型代表:一端不断往消息队列中写入消息,另一端则可以读取或者订阅队列中的消息。下面通过一个实例来说明怎么使用它,以及它的优势所在:
由于本文基于Spring Boot来说明RabbitMQ的使用,所以首先需要配置好RabbitMQ。
依赖包
<!-- Spring Boot AMQP starter; the version is resolved from the springboot.version build property. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${springboot.version}</version>
<exclusions>
<!-- Keep org.springframework:spring-web out of this starter's transitive dependencies. -->
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
</exclusion>
</exclusions>
</dependency>
配置文件准备
在 src/main/resources/下面添加如下properties文件:
application.properties
spring.profiles.active=@profiles.active@
rabbitMQ.host=${rabbitMQ.host}
rabbitMQ.port=${rabbitMQ.port}
rabbitMQ.username=${rabbitMQ.username}
rabbitMQ.password=${rabbitMQ.password}
rabbitMQ.exchange=${rabbitMQ.exchange}
application-develop.properties(测试用)
rabbitMQ.host=192.168.100.117
rabbitMQ.port=5672
rabbitMQ.username=guest
rabbitMQ.password=guest
rabbitMQ.exchange=xxx.xxx.xxx
application-production.properties(生产用)
rabbitMQ.host=192.168.100.117
rabbitMQ.port=5672
rabbitMQ.username=guest
rabbitMQ.password=guest
rabbitMQ.exchange=xxx.xxx.xxx
RabbitMQ自定义配置文件实现
在src/main/java/com.jmust.service.demo下创建“RabbitMQConfiguration.java”文件
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
/**
 * Spring configuration that builds the application's {@link RabbitTemplate} from the
 * {@code rabbitMQ.*} properties exposed through the {@link Environment}.
 */
@Configuration
public class RabbitMQConfiguration {

    @Autowired
    private Environment env;

    /**
     * Creates the {@link RabbitTemplate} bean.
     *
     * <p>{@code rabbitMQ.host}, {@code rabbitMQ.port}, {@code rabbitMQ.username} and
     * {@code rabbitMQ.password} are required; {@code rabbitMQ.exchange} is optional and is
     * handed to the template as-is (possibly {@code null}).
     *
     * @return a template bound to a caching connection factory for the configured broker
     * @throws IllegalStateException if one of the required properties is not defined
     */
    @Bean
    public RabbitTemplate instance() {
        com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory =
                new com.rabbitmq.client.ConnectionFactory();
        // getRequiredProperty fails fast and names the missing key, instead of passing null
        // to the client or failing inside Integer.parseInt with an unhelpful message.
        rabbitConnectionFactory.setUsername(env.getRequiredProperty("rabbitMQ.username"));
        rabbitConnectionFactory.setPassword(env.getRequiredProperty("rabbitMQ.password"));
        rabbitConnectionFactory.setHost(env.getRequiredProperty("rabbitMQ.host"));
        rabbitConnectionFactory.setPort(env.getRequiredProperty("rabbitMQ.port", Integer.class));

        // NOTE(review): this connection factory is created inline and is not registered as a
        // bean of its own, so the container does not manage its lifecycle. Consider exposing
        // it through a dedicated @Bean method if it should be closed on context shutdown.
        ConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitConnectionFactory);

        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        rabbitTemplate.setExchange(env.getProperty("rabbitMQ.exchange"));
        return rabbitTemplate;
    }
}
使用
1、自定义接口类(DemoMQResource),位于src/main/java/com.jmust.service.demo.resource,定义操作契约
import com.jmust.service.demo.resource.entity.DemoToMQ;

/**
 * Contract for handing a {@link DemoToMQ} payload over to the message queue.
 */
@FunctionalInterface
public interface DemoMQResource {

    /**
     * Sends the given payload to the message queue.
     *
     * @param demoToMQ the payload to send
     */
    void sendDemoMQ(DemoToMQ demoToMQ);
}
2、实现接口类,业务逻辑包含在内(src/main/java/com.jmust.service.demo.resource)
import com.jmust.server.profile.ProfileProduction; import com.jmust.service.demo.LogFailMqUtil; import com.jmust.service.demo.resource.entity.DemoToMQ; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; @Component @ProfileProduction public class DemoMQResourceImpl implements DemoMQResource { private static Logger logger = LoggerFactory.getLogger(DemoMQResourceImpl.class); @Autowired private RabbitTemplate rabbitTemplate; @Override public void sendDemoMQ(DemoToMQ demoToMQ) { //发送mq MessageProperties messageProperties = new MessageProperties(); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); messageProperties.setContentEncoding("JSON"); messageProperties.setContentType( "xxx.xxx.xxx, xxx.xxx, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"); String body = demoToMQ.toString(); Message message = new Message(body.getBytes(StandardCharsets.UTF_8), messageProperties); try { rabbitTemplate.send(message); logger.info("send success to mq:{}", body); } catch (Exception e) { logger.error("send fail to mq:", e); } } }
项目测试
import com.jmust.service.demo.resource.entity.DemoToMQ;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
/**
 * Unit test for {@link DemoMQResourceImpl} that runs against a mocked {@link RabbitTemplate}.
 */
public class DemoMQResourceTest {

    @InjectMocks
    private DemoMQResourceImpl demoMQResourceImpl;

    @Mock
    private RabbitTemplate rabbitTemplate;

    /** Creates the object under test and lets Mockito inject the mocked template into it. */
    @BeforeClass
    public void init() {
        demoMQResourceImpl = new DemoMQResourceImpl();
        MockitoAnnotations.initMocks(this);
    }

    /**
     * A failing template must not make {@code sendDemoMQ} throw, and the send must actually
     * have been attempted.
     */
    @Test
    public void sendMq() {
        doThrow(new RuntimeException("send mq is validate")).when(rabbitTemplate).send(any(Message.class));
        demoMQResourceImpl.sendDemoMQ(new DemoToMQ());
        // Without this check the test would also pass if sendDemoMQ never used the template.
        verify(rabbitTemplate).send(any(Message.class));
    }
}