提交 a9aff16b authored 作者: liruixin's avatar liruixin

dts监听订单表

上级 e2772cc7
package com.clx.performance.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
@Getter
@AllArgsConstructor
public enum DtsOperationTypeEnum {
// 数据操作类型
INSERT(1, "INSERT"),
UPDATE(2, "UPDATE"),
DELETE(3, "DELETE"),
DDL(4, "DDL"),
BEGIN(5, "BEGIN"),
COMMIT(6, "COMMIT"),
HEARTBEAT(7, "HEARTBEAT"),
CHECKPOINT(8, "CHECKPOINT"),
UNKNOWN(9, "UNKNOWN"),
;
private final Integer code;
private final String msg;
public static String getMsg(Integer code) {
return Arrays.stream(values())
.filter(e -> Objects.equals(e.code, code))
.findFirst()
.map(DtsOperationTypeEnum::getMsg)
.orElse("");
}
public static Optional<DtsOperationTypeEnum> getByMsg(String msg) {
return Stream.of(values()).filter(q -> Objects.equals(q.getMsg(), msg)).findFirst();
}
}
package com.clx.performance.dto.zjxl.dts;
import com.clx.performance.enums.DtsOperationTypeEnum;
import lombok.Data;
import java.io.Serializable;
import java.util.Map;
@Data
public class DataTransportDTO implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 数据库名称
*/
String databaseName;
/**
* 表名
*/
String tableName;
/**
* 操作类型
*/
DtsOperationTypeEnum operationType;
/**
* 操作记录id 非记录id
*/
String recordID;
/**
* 操作时间戳
*/
String recordTimestamp;
/**
* 修改之前数据
*/
Map<String, Object> beforeMap;
/**
* 修改之后数据
*/
Map<String, Object> afterMap;
}
package com.clx.performance.listener; package com.clx.performance.listener;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.clx.performance.constant.RabbitKeyConstants; import com.clx.performance.constant.RabbitKeyConstants;
import com.clx.performance.data.OrderChildData; import com.clx.performance.data.OrderChildData;
import com.clx.performance.dto.zjxl.dts.DataTransportDTO;
import com.clx.performance.enums.DtsOperationTypeEnum;
import com.clx.performance.enums.OrderChildEnum; import com.clx.performance.enums.OrderChildEnum;
import com.msl.common.utils.DtsMapConvertUtil; import com.msl.common.utils.DtsMapConvertUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Objects; import java.util.Objects;
...@@ -24,23 +25,20 @@ public class OrderChildDtsListener { ...@@ -24,23 +25,20 @@ public class OrderChildDtsListener {
String msg = new String(message.getBody()); String msg = new String(message.getBody());
log.info("DTS消息同步开始, database:clx_performance.order_child, msg:{}", msg); log.info("DTS消息同步开始, database:clx_performance.order_child, msg:{}", msg);
JSONObject object = JSON.parseObject(msg); DataTransportDTO dataTransportDTO = JSON.parseObject(msg, DataTransportDTO.class);
JSONObject beforeMap = object.getJSONObject("beforeMap"); OrderChildData before = DtsMapConvertUtil.convert(dataTransportDTO.getBeforeMap(), new OrderChildData());
JSONObject afterMap = object.getJSONObject("afterMap"); OrderChildData after = DtsMapConvertUtil.convert(dataTransportDTO.getAfterMap(), new OrderChildData());
OrderChildData before = DtsMapConvertUtil.convert(beforeMap, new OrderChildData()); if(Objects.equals(DtsOperationTypeEnum.INSERT.getCode(),dataTransportDTO.getOperationType().getCode())){
OrderChildData after = DtsMapConvertUtil.convert(afterMap, new OrderChildData());
if(Objects.isNull(before) && Objects.nonNull(after)){
//同步after数据 //同步after数据
System.out.println("新增:"+after); System.out.println("新增:"+after);
}else if(Objects.nonNull(before) && Objects.nonNull(after)){ }else if(Objects.equals(DtsOperationTypeEnum.UPDATE.getCode(),dataTransportDTO.getOperationType().getCode())){
if(OrderChildEnum.SYNC_STATUS_lIST.contains(after.getStatus()) && !Objects.equals(before.getStatus(),after.getStatus())){ if(OrderChildEnum.SYNC_STATUS_lIST.contains(after.getStatus()) && !Objects.equals(before.getStatus(),after.getStatus())){
//同步after数据 //同步after数据
System.out.println("修改:"+after); System.out.println("修改:"+after);
} }
}else if(Objects.nonNull(before) && Objects.isNull(after)){ }else if(Objects.equals(DtsOperationTypeEnum.UPDATE.getCode(),dataTransportDTO.getOperationType().getCode())){
//数据删除 //数据删除
System.out.println("删除:"+before); System.out.println("删除:"+before);
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论