提交 50950d03 authored 作者: 刘海泉's avatar 刘海泉

合并车辆轨迹监控相关代码

上级 f1718561
package com.clx.performance.config;
import com.clx.performance.constant.RabbitKeyTraceConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @Author: aiqingguo
* @Description: 轨迹 MQ
* @Date: 2023-10-12 16:27:30
* @Version: 1.0
*/
@Configuration
public class RabbitTraceConfig {
@Bean
public DirectExchange traceDefaultExchange() {
return new DirectExchange(RabbitKeyTraceConstants.DEFAULT_EXCHANGE);
}
@Bean
public Queue traceDefaultDelayQueue() {
Map<String, Object> params = new HashMap<>(8);
params.put("x-message-ttl", 15 * 60 * 1000);
params.put("x-max-length", 5000000);
params.put("x-dead-letter-exchange", RabbitKeyTraceConstants.DEFAULT_EXCHANGE);
params.put("x-dead-letter-routing-key", RabbitKeyTraceConstants.DEFAULT_DEAD_ROUTING_KEY);
return new Queue(RabbitKeyTraceConstants.DEFAULT_DELAY_QUEUE, true, false, false, params);
}
@Bean
public Binding traceDefaultDelayQueueBinding() {
return BindingBuilder.bind(traceDefaultDelayQueue()).to(traceDefaultExchange()).with(RabbitKeyTraceConstants.DEFAULT_DELAY_ROUTING_KEY);
}
@Bean
public Queue traceDefaultDeadQueue() {
return new Queue(RabbitKeyTraceConstants.DEFAULT_DEAD_QUEUE);
}
@Bean
public Binding traceDefaultDeadQueueBinding() {
return BindingBuilder.bind(traceDefaultDeadQueue()).to(traceDefaultExchange()).with(RabbitKeyTraceConstants.DEFAULT_DEAD_ROUTING_KEY);
}
/**
* 统计更新
*/
@Bean
public Queue traceTruckTraceSyncQueue() {
return new Queue(RabbitKeyTraceConstants.TRUCK_TRACE_SYNC_QUEUE);
}
@Bean
public Binding traceTruckTraceSyncQueueBinding() {
return BindingBuilder.bind(traceTruckTraceSyncQueue()).to(traceDefaultExchange()).with(RabbitKeyTraceConstants.TRUCK_TRACE_SYNC_ROUTING_KEY);
}
}
package com.clx.performance.listener.trace;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.clx.performance.common.MqDelay;
import com.clx.performance.common.MqWrapper;
import com.clx.performance.constant.RabbitKeyTraceConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @Author: 艾庆国
* @Description: 轨迹 mq
* @Date: 2023-10-20 10:30:49
* @Version: 1.0
*/
@Slf4j
@Component
public class TraceDelayListener {
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = RabbitKeyTraceConstants.DEFAULT_DEAD_QUEUE)
public void process(Message message) {
try{
log.info("轨迹-延迟, message:{}", message);
process(JSON.parseObject(new String(message.getBody()), new TypeReference<MqWrapper<MqDelay>>(){}).getData());
}catch (Exception e){
log.info("轨迹-延迟 失败",e);
}
}
void process(MqDelay mq){
Message message = MessageBuilder.withBody(JSON.toJSONString(mq.getData()).getBytes()).build();
rabbitTemplate.send(mq.getExchange(), mq.getRouterKey(), message);
}
}
package com.clx.performance.listener.trace;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.clx.performance.common.MqWrapper;
import com.clx.performance.constant.RabbitKeyTraceConstants;
import com.clx.performance.param.mq.trace.TruckTraceSyncMqParam;
import com.clx.performance.service.trace.TruckTraceMqHandlerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @Author: 艾庆国
* @Description: mq
* @Date: 2023-10-20 16:34:40
* @Version: 1.0
*/
@Slf4j
@Component
public class TruckTraceSyncListener {
@Autowired
private TruckTraceMqHandlerService truckTraceMqHandlerService;
@RabbitListener(queues = RabbitKeyTraceConstants.TRUCK_TRACE_SYNC_QUEUE)
public void process(Message message) {
try{
log.info("轨迹, 车辆轨迹同步, message:{}", new String(message.getBody()));
process(JSON.parseObject(new String(message.getBody()), new TypeReference<MqWrapper<TruckTraceSyncMqParam>>(){}).getData());
}catch (Exception e){
log.info("轨迹, 车辆轨迹同步失败",e);
}
}
private void process(TruckTraceSyncMqParam mq){
truckTraceMqHandlerService.truckTracSync(mq);
}
}
package com.clx.performance.service.impl.trace;
import com.clx.performance.dto.zjxl.TruckTraceDTO;
import com.clx.performance.esplus.mapper.TruckTraceESPlusMapper;
import com.clx.performance.esplus.model.TruckTraceESPlus;
import com.clx.performance.param.mq.trace.TruckTraceSyncMqParam;
import com.clx.performance.service.trace.TruckTraceMqHandlerService;
import com.clx.performance.utils.zjxl.ZjxlGpsService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Service
public class TruckTraceMqHandlerServiceImpl implements TruckTraceMqHandlerService {
@Autowired
private ZjxlGpsService zjxlGpsService;
@Autowired
private TruckTraceESPlusMapper truckTraceESPlusMapper;
@Transactional(rollbackFor = Exception.class)
@Override
public void truckTracSync(TruckTraceSyncMqParam mq) {
List<TruckTraceDTO> list = zjxlGpsService.getTruckTrace(mq.getTruckNo(), mq.getBeginTime(), mq.getEndTime());
if (list.isEmpty()) {return;}
List<TruckTraceESPlus> esList = new ArrayList<>();
for (TruckTraceDTO item : list) {
TruckTraceESPlus es = new TruckTraceESPlus();
es.setTruckNo(item.getTruckNo());
es.setAngle(item.getAngle());
es.setHeight(item.getHeight());
es.setLocation(new BigDecimal[]{item.getLocation()[0], item.getLocation()[1]});
es.setGpsTime(item.getGpsTime());
es.setMileage(item.getMileage());
es.setSpeed(item.getSpeed());
esList.add(es);
}
// 保存
truckTraceESPlusMapper.insertBatch(esList);
}
}
package com.clx.performance.service.trace;
import com.clx.performance.param.mq.trace.TruckTraceSyncMqParam;
public interface TruckTraceMqHandlerService {
void truckTracSync(TruckTraceSyncMqParam mq);
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论