提交 58b4d690 authored 作者: huyufan's avatar huyufan

开发:增加订单状态MQ

上级 6a59458b
package com.clx.order.params;
import io.swagger.annotations.ApiModelProperty;
import lombok.Getter;
import lombok.Setter;
import java.math.BigDecimal;
import java.time.LocalDateTime;
@Getter
@Setter
public class OrderGoodsChildParams {
@ApiModelProperty("挂单运费")
private BigDecimal pendingOrderFreight;
@ApiModelProperty("提取方式 1提取全部 2提取部分")
private Integer extractWay;
@ApiModelProperty("提取吨数")
private BigDecimal extractWeight;
@ApiModelProperty("需要车辆/辆")
private Integer needTruckNum;
@ApiModelProperty("挂单方式 1公开派单 2定向派单")
private Integer pendingOrderWay;
@ApiModelProperty(value = "最晚到达货源地时间/运单的最晚装货时间", example = "2023-09-09 12:00:01", required = true)
private String lastArriveSendTime;
@ApiModelProperty(value = "挂单时间", example = "2023-09-09 12:00:01", required = true)
private String pendingOrderTime;
@ApiModelProperty("高级物流经理id")
private Integer seniorLogisticsManagerId;
@ApiModelProperty("高级物流经理姓名")
private String seniorLogisticsManagerName;
@ApiModelProperty(value = "定向运单失效时间", example = "2023-09-09 12:00:01", required = true)
private String directionalExpireTime;
@ApiModelProperty("创建人用户编号")
private Long userNo;
@ApiModelProperty("创建人姓名")
private String userName;
}
package com.clx.order.params;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import javax.validation.constraints.NotBlank;
import java.util.List;
@Data
public class OrderGoodsParams {
@ApiModelProperty("订单编号")
@NotBlank(message = "订单编号不能为空")
private String orderNo;
private List<OrderGoodsChildParams> orderGoodsChildParams;
}
package com.clx.order.params;
import com.msl.common.base.PageParam;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class PageCarrierOrderListParam extends PageParam {
@ApiModelProperty(value = "订单编号")
private String orderNo;
@ApiModelProperty(value = "联系人(货主)id")
private Integer contactId;
@ApiModelProperty(value = "订单状态:2报价已确认 3平台已承接 7 挂单中 8运输中 11 挂单中")
private Integer orderStatus;
@ApiModelProperty(value = "开始时间", example = "2023-09-09 12:00:01", required = true)
private String beginTime;
@ApiModelProperty(value = "结束", example = "2023-09-09 12:00:01", required = true)
private String endTime;
@ApiModelProperty(value = "时间类型: 1创建时间 2拉运开始时间 3拉运结束时间 ")
@NotNull(message = "时间类型不能为空")
private Integer timeType;
@ApiModelProperty(value = "用车类型: 1平台车辆 2部分平台车辆 3自有车辆")
private Integer truckDemand;
}
package com.clx.performance.config;
import com.clx.performance.constant.RabbitKeyConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
*
* @author xujianke
* @date 2017年9月14日
* @description rabbit配置文件
*/
@Configuration
public class RabbitBeanConfig {
/**
* 订单已挂单队列
*/
@Bean
public Queue orderPostedQueue() {
return new Queue(RabbitKeyConstants.ORDER_POSTED_QUEUE, true);
}
/**
* 订单已挂单交换机
**/
@Bean
public DirectExchange orderPostedExchange() {
return new DirectExchange(RabbitKeyConstants.ORDER_POSTED_EXCHANGE);
}
/**
* 订单已挂单绑定
*/
@Bean
public Binding orderPostedExchangeBind() {
return BindingBuilder.bind(orderPostedQueue()).to(orderPostedExchange()).with(RabbitKeyConstants.ORDER_POSTED_ROUTE_KEY);
}
@Bean
public Queue orderOnQueue() {
Map<String, Object> params = new HashMap<>(6);
params.put("x-dead-letter-exchange", RabbitKeyConstants.ORDER_ON_DEAD_EXCHANGE);
params.put("x-dead-letter-routing-key", RabbitKeyConstants.ORDER_ON_ROUTE_KEY);
return new Queue(RabbitKeyConstants.ORDER_ON_QUEUE, true, false, false, params);
}
/**
* 订单挂单中交换机
**/
@Bean
public DirectExchange orderOnExchange() {
return new DirectExchange(RabbitKeyConstants.ORDER_ON_EXCHANGE);
}
/**
* 订单挂单中绑定
*/
@Bean
public Binding orderOnExchangeBind() {
return BindingBuilder.bind(orderOnQueue()).to(orderOnExchange()).with(RabbitKeyConstants.ORDER_ON_ROUTE_KEY);
}
/**
* 死信队列:死信队列处理延迟消息
* @return
*/
@Bean
public Queue orderOnDeadQueue() {
return new Queue(RabbitKeyConstants.ORDER_ON_DEAD_QUEUE, true, false, false);
}
/**
* 订单挂单中交换机:死信队列处理延迟消息
**/
@Bean
public DirectExchange orderOnDeadExchange() {
return new DirectExchange(RabbitKeyConstants.ORDER_ON_DEAD_EXCHANGE);
}
/**
* 订单挂单中绑定:死信队列处理延迟消息
*/
@Bean
public Binding orderDeadExchangeBind() {
return BindingBuilder.bind(orderOnDeadQueue()).to(orderOnDeadExchange()).with(RabbitKeyConstants.ORDER_ON_ROUTE_KEY);
}
}
package com.clx.performance.config; package com.clx.performance.config;
import com.clx.performance.constant.RabbitKeyConstants; import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding; import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import java.util.HashMap; //常用的三个配置如下
import java.util.Map; //1---设置手动应答(acknowledge-mode: manual)
// 2---设置生产者消息发送的确认回调机制 ( #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调
/** // publisher-confirm-type: correlated
* // #保证交换机能把消息推送到队列中
* @author xujianke // publisher-returns: true
* @date 2017年9月14日 // template:
* @description rabbit配置文件 // #以下是rabbitmqTemplate配置
*/ // mandatory: true)
// 3---设置重试
//TODO rabbitMQ 配置,后面压测可能会调整相关参数,未完善
@Configuration @Configuration
public class RabbitConfig { @Order(-1)
/** public class RabbitConfig implements RabbitListenerConfigurer {
* 订单已挂单队列
*/ @Autowired
@Bean private ConnectionFactory rabbitConnectionFactory;
public Queue orderPostedQueue() {
return new Queue(RabbitKeyConstants.ORDER_POSTED_QUEUE, true);
} //@Bean 缓存连接池
/** //public CachingConnectionFactory rabbitConnectionFactory
* 订单已挂单交换机
**/ @Autowired
private RabbitProperties properties;
//这里因为使用自动配置的connectionFactory,所以把自定义的connectionFactory注解掉
// 存在此名字的bean 自带的连接工厂会不加载(也就是说yml中rabbitmq下一级不生效),如果想自定义来区分开 需要改变bean 的名称
// @Bean
// public ConnectionFactory connectionFactory() throws Exception {
// //创建工厂类
// CachingConnectionFactory cachingConnectionFactory=new CachingConnectionFactory();
// //用户名
// cachingConnectionFactory.setUsername("gust");
// //密码
// cachingConnectionFactory.setPassword("gust");
// //rabbitMQ地址
// cachingConnectionFactory.setHost("127.0.0.1");
// //rabbitMQ端口
// cachingConnectionFactory.setPort(Integer.parseInt("5672"));
//
// //设置发布消息后回调
// cachingConnectionFactory.setPublisherReturns(true);
// //设置发布后确认类型,此处确认类型为交互
// cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
//
// cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
// return cachingConnectionFactory;
// }
@Bean @Bean
public DirectExchange orderPostedExchange() { public MessageHandlerMethodFactory messageHandlerMethodFactory() {
return new DirectExchange(RabbitKeyConstants.ORDER_POSTED_EXCHANGE); DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
return messageHandlerMethodFactory;
} }
/**
* 订单已挂单绑定
*/
@Bean @Bean
public Binding orderPostedExchangeBind() { public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
return BindingBuilder.bind(orderPostedQueue()).to(orderPostedExchange()).with(RabbitKeyConstants.ORDER_POSTED_ROUTE_KEY); return new MappingJackson2MessageConverter();
} }
// 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的listener下的simple配置),如果想自定义来区分开 需要改变bean 的名称
@Bean @Bean
public Queue orderOnQueue() { public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
Map<String, Object> params = new HashMap<>(6); SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
params.put("x-dead-letter-exchange", RabbitKeyConstants.ORDER_ON_DEAD_EXCHANGE); containerFactory.setConnectionFactory(rabbitConnectionFactory);
params.put("x-dead-letter-routing-key", RabbitKeyConstants.ORDER_ON_ROUTE_KEY);
return new Queue(RabbitKeyConstants.ORDER_ON_QUEUE, true, false, false, params);
}
/** // 预加载消息数量 -- QOS
* 订单挂单中交换机 containerFactory.setPrefetchCount(1);
**/ // 应答模式(此处设置为手动)
@Bean containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
public DirectExchange orderOnExchange() { //消息序列化方式
return new DirectExchange(RabbitKeyConstants.ORDER_ON_EXCHANGE); containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
// 设置通知调用链 (这里设置的是重试机制的调用链)
containerFactory.setAdviceChain(
RetryInterceptorBuilder
.stateless()
.recoverer(new RejectAndDontRequeueRecoverer())
//.retryOperations(rabbitRetryTemplate())
.build()
);
return containerFactory;
} }
/**
* 订单挂单中绑定
*/
@Bean @Bean
public Binding orderOnExchangeBind() { public SimpleMessageListenerContainer simpleMessageListenerContainer(SimpleRabbitListenerContainerFactory connectionFactory) {
return BindingBuilder.bind(orderOnQueue()).to(orderOnExchange()).with(RabbitKeyConstants.ORDER_ON_ROUTE_KEY); SimpleMessageListenerContainer bean = connectionFactory.createListenerContainer();
return bean;
} }
/** // 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的template的配置),如果想自定义来区分开 需要改变bean 的名称
* 死信队列:死信队列处理延迟消息
* @return
*/
@Bean @Bean
public Queue orderOnDeadQueue() { public RabbitTemplate rabbitTemplate() {
return new Queue(RabbitKeyConstants.ORDER_ON_DEAD_QUEUE, true, false, false); RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
//默认是用jdk序列化
//数据转换为json存入消息队列,方便可视化界面查看消息数据
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
//此处设置重试template后,会再生产者发送消息的时候,调用该template中的调用链
// rabbitTemplate.setRetryTemplate(rabbitRetryTemplate());
//CorrelationData correlationData, boolean b, String s
// rabbitTemplate.setConfirmCallback(
// new BaseConfirmCallback(messageRecordComponent));
// rabbitTemplate.setReturnCallback(
// new BaseReturnCallback(messageRecordComponent));
return rabbitTemplate;
} }
/** //重试的Template
* 订单挂单中交换机:死信队列处理延迟消息
**/
@Bean @Bean
public DirectExchange orderOnDeadExchange() { public ExponentialBackOffPolicy backOffPolicyByProperties() {
return new DirectExchange(RabbitKeyConstants.ORDER_ON_DEAD_EXCHANGE); ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
long maxInterval = properties.getListener().getSimple().getRetry().getMaxInterval().getSeconds();
long initialInterval = properties.getListener().getSimple().getRetry().getInitialInterval().getSeconds();
double multiplier = properties.getListener().getSimple().getRetry().getMultiplier();
// 重试间隔
backOffPolicy.setInitialInterval(initialInterval * 1000);
// 重试最大间隔
backOffPolicy.setMaxInterval(maxInterval * 1000);
// 重试间隔乘法策略
backOffPolicy.setMultiplier(multiplier);
return backOffPolicy;
} }
/**
* 订单挂单中绑定:死信队列处理延迟消息
*/
@Bean @Bean
public Binding orderDeadExchangeBind() { public SimpleRetryPolicy retryPolicyByProperties() {
return BindingBuilder.bind(orderOnDeadQueue()).to(orderOnDeadExchange()).with(RabbitKeyConstants.ORDER_ON_ROUTE_KEY); SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
int maxAttempts = properties.getListener().getSimple().getRetry().getMaxAttempts();
retryPolicy.setMaxAttempts(maxAttempts);
return retryPolicy;
} }
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}
} }
\ No newline at end of file
package com.clx.performance.controller.pc; package com.clx.performance.controller.pc;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import com.clx.order.feign.OrderFeign; import com.clx.order.feign.OrderFeign;
import com.clx.order.param.feign.FeignPageCarrierOrderListParam; import com.clx.order.param.feign.FeignPageCarrierOrderListParam;
...@@ -77,7 +78,7 @@ public class GoodsOrderController { ...@@ -77,7 +78,7 @@ public class GoodsOrderController {
@Autowired @Autowired
private OrderGoodsIdGenerate orderGoodsIdGenerate; private OrderGoodsIdGenerate orderGoodsIdGenerate;
@ApiOperation(value = "提交货单", notes = "<br>By:刘海泉") @ApiOperation(value = "提交货单", notes = "<br>By:胡宇帆")
@PostMapping("/saveGoodsOrder") @PostMapping("/saveGoodsOrder")
public Result<Object> saveGoodName(@RequestBody @Validated OrderGoodsParams orderGoodsParams) { public Result<Object> saveGoodName(@RequestBody @Validated OrderGoodsParams orderGoodsParams) {
String orderNo = orderGoodsParams.getOrderNo(); String orderNo = orderGoodsParams.getOrderNo();
...@@ -235,8 +236,10 @@ public class GoodsOrderController { ...@@ -235,8 +236,10 @@ public class GoodsOrderController {
log.error("redis 分布式锁释放异常!", e); log.error("redis 分布式锁释放异常!", e);
} }
} }
rabbitTemplate.send(RabbitKeyConstants.ORDER_POSTED_EXCHANGE, RabbitKeyConstants.ORDER_POSTED_ROUTE_KEY, MessageBuilder.withBody(orderNo.getBytes()).build()); JSONObject jsonObject = new JSONObject();
Message message = MessageBuilder.withBody(orderNo.getBytes()).build(); jsonObject.set("orderNo", orderNo);
rabbitTemplate.send(RabbitKeyConstants.ORDER_POSTED_EXCHANGE, RabbitKeyConstants.ORDER_POSTED_ROUTE_KEY, MessageBuilder.withBody(jsonObject.toString().getBytes()).build());
Message message = MessageBuilder.withBody(jsonObject.toString().getBytes()).build();
long epochMilli = sendLazyTime.toInstant(ZoneOffset.of("+8")).toEpochMilli(); long epochMilli = sendLazyTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
message.getMessageProperties().setExpiration(String.valueOf(epochMilli)); message.getMessageProperties().setExpiration(String.valueOf(epochMilli));
rabbitTemplate.send( rabbitTemplate.send(
...@@ -246,7 +249,7 @@ public class GoodsOrderController { ...@@ -246,7 +249,7 @@ public class GoodsOrderController {
return Result.ok(); return Result.ok();
} }
@ApiOperation(value = "创建货单-查看订单列表", notes = "<br>By:胡宇帆")
@PostMapping("/pageCarrierOrderList") @PostMapping("/pageCarrierOrderList")
@UnitCovert(param = false) @UnitCovert(param = false)
public Result<PageData<FeignPageOrderVO>> pageCarrierOrderList(@RequestBody @Validated PageCarrierOrderListParam param) { public Result<PageData<FeignPageOrderVO>> pageCarrierOrderList(@RequestBody @Validated PageCarrierOrderListParam param) {
......
...@@ -3,7 +3,7 @@ server: ...@@ -3,7 +3,7 @@ server:
spring: spring:
rabbitmq: rabbitmq:
host: 192.168.9.20 host: 39.101.187.243
port: 5672 port: 5672
username: clxmq username: clxmq
password: clxmq711 password: clxmq711
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论