提交 e0c3bfb3 authored 作者: huyufan's avatar huyufan

修改BUG

上级 721ba393
package com.clx.performance.config;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
//常用的三个配置如下
//1---设置手动应答(acknowledge-mode: manual)
// 2---设置生产者消息发送的确认回调机制 ( #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调
// publisher-confirm-type: correlated
// #保证交换机能把消息推送到队列中
// publisher-returns: true
// template:
// #以下是rabbitmqTemplate配置
// mandatory: true)
// 3---设置重试
//TODO rabbitMQ 配置,后面压测可能会调整相关参数,未完善
@Configuration
@Order(-1)
public class RabbitConfig implements RabbitListenerConfigurer {
@Autowired
private ConnectionFactory rabbitConnectionFactory;
//@Bean 缓存连接池
//public CachingConnectionFactory rabbitConnectionFactory
@Autowired
private RabbitProperties properties;
//这里因为使用自动配置的connectionFactory,所以把自定义的connectionFactory注解掉
// 存在此名字的bean 自带的连接工厂会不加载(也就是说yml中rabbitmq下一级不生效),如果想自定义来区分开 需要改变bean 的名称
// @Bean
// public ConnectionFactory connectionFactory() throws Exception {
// //创建工厂类
// CachingConnectionFactory cachingConnectionFactory=new CachingConnectionFactory();
// //用户名
// cachingConnectionFactory.setUsername("gust");
// //密码
// cachingConnectionFactory.setPassword("gust");
// //rabbitMQ地址
// cachingConnectionFactory.setHost("127.0.0.1");
// //rabbitMQ端口
// cachingConnectionFactory.setPort(Integer.parseInt("5672"));
//
// //设置发布消息后回调
// cachingConnectionFactory.setPublisherReturns(true);
// //设置发布后确认类型,此处确认类型为交互
// cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
//
// cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
// return cachingConnectionFactory;
// }
@Bean
public MessageHandlerMethodFactory messageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
return messageHandlerMethodFactory;
}
@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
return new MappingJackson2MessageConverter();
}
// 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的listener下的simple配置),如果想自定义来区分开 需要改变bean 的名称
// @Bean
// public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
// SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
// containerFactory.setConnectionFactory(rabbitConnectionFactory);
//
// // 预加载消息数量 -- QOS
// containerFactory.setPrefetchCount(1);
// // 应答模式(此处设置为手动)
// containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// //消息序列化方式
// containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
// // 设置通知调用链 (这里设置的是重试机制的调用链)
// containerFactory.setAdviceChain(
// RetryInterceptorBuilder
// .stateless()
// .recoverer(new RejectAndDontRequeueRecoverer())
// //.retryOperations(rabbitRetryTemplate())
// .build()
// );
// return containerFactory;
// }
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(SimpleRabbitListenerContainerFactory connectionFactory) {
SimpleMessageListenerContainer bean = connectionFactory.createListenerContainer();
return bean;
}
// 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的template的配置),如果想自定义来区分开 需要改变bean 的名称
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
//默认是用jdk序列化
//数据转换为json存入消息队列,方便可视化界面查看消息数据
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
//此处设置重试template后,会再生产者发送消息的时候,调用该template中的调用链
// rabbitTemplate.setRetryTemplate(rabbitRetryTemplate());
//CorrelationData correlationData, boolean b, String s
// rabbitTemplate.setConfirmCallback(
// new BaseConfirmCallback(messageRecordComponent));
// rabbitTemplate.setReturnCallback(
// new BaseReturnCallback(messageRecordComponent));
return rabbitTemplate;
}
//重试的Template
//
// @Bean
// public ExponentialBackOffPolicy backOffPolicyByProperties() {
// ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
// long maxInterval = properties.getListener().getSimple().getRetry().getMaxInterval().getSeconds();
// long initialInterval = properties.getListener().getSimple().getRetry().getInitialInterval().getSeconds();
// double multiplier = properties.getListener().getSimple().getRetry().getMultiplier();
// // 重试间隔
// backOffPolicy.setInitialInterval(initialInterval * 1000);
// // 重试最大间隔
// backOffPolicy.setMaxInterval(maxInterval * 1000);
// // 重试间隔乘法策略
// backOffPolicy.setMultiplier(multiplier);
// return backOffPolicy;
// }
//
// @Bean
// public SimpleRetryPolicy retryPolicyByProperties() {
// SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
// int maxAttempts = properties.getListener().getSimple().getRetry().getMaxAttempts();
// retryPolicy.setMaxAttempts(maxAttempts);
// return retryPolicy;
// }
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}
}
\ No newline at end of file
package com.clx.performance;
import com.clx.order.feign.OrderFeign;
import com.clx.performance.constant.RabbitKeyConstants;
import com.clx.performance.mapper.OrderGoodsMapper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
......@@ -17,8 +21,15 @@ public class JobTest {
@Autowired
private OrderFeign orderFeign;
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test1() {
orderFeign.updateOrderInfoResidueWeight(5,new BigDecimal(32));
Message message = MessageBuilder.withBody("PT2023091900049".getBytes()).build();
message.getMessageProperties().setExpiration("10000");
rabbitTemplate.send(
RabbitKeyConstants.ORDER_GOODS_ON_EXCHANGE, RabbitKeyConstants.ORDER_GOODS_ON_ROUTE_KEY, message
);
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论