1、同步明细修改为根据时间范围进行同步

This commit is contained in:
llllon 2025-09-04 14:23:53 +08:00
parent 2fdde8355b
commit f8d2b24a0a
6 changed files with 687 additions and 543 deletions

View File

@ -201,42 +201,74 @@ public class ProductionDataSyncServiceImpl implements DataSyncService {
boolean hasMore;
RDSAPI rdsapi = null;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String startDate = simpleDateFormat.format(startDated);
String endDate = simpleDateFormat.format(endDated);
try {
rdsapi = SDK.getCCAPI().getRDSAPI(ccId);
DBUtils.SUPPLY supply = rdsapi.getSupply();
String DBname = supply.getName();
LOGGER.info("数据库为:{}",DBname);
if ("ORACLE".equalsIgnoreCase(DBname)){
// 构建查询条件
StringBuilder conditionBuilder = new StringBuilder();
StringBuilder orderByBuilder = new StringBuilder(); // 用于构建排序子句
List<Object> params = new ArrayList<>(); // 存储查询参数
// 分区字段和时间字段组合查询条件
if (partitionField != null && !partitionField.isEmpty()) {
// 1. 查询最大分区值
String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName;
List<RowMap> maxPartitionResult = rdsapi.getMaps(maxPartitionSql);
// 计算时间范围并拆分为30天一组
List<Date[]> timeRanges = splitTimeRange(startDated, endDated, 30);
LOGGER.info("时间范围拆分为 {} 个查询区间", timeRanges.size());
if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) {
LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField);
return;
}
for (int i = 0; i < timeRanges.size(); i++) {
Date[] range = timeRanges.get(i);
String startDate = simpleDateFormat.format(range[0]);
String endDate = simpleDateFormat.format(range[1]);
String maxPartition = maxPartitionResult.get(0).getString("max_partition");
LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition);
LOGGER.info("正在处理第 {} 个时间区间: {} 至 {}", i + 1, startDate, endDate);
if ("ORACLE".equalsIgnoreCase(DBname)) {
// 构建查询条件
StringBuilder conditionBuilder = new StringBuilder();
StringBuilder orderByBuilder = new StringBuilder(); // 用于构建排序子句
List<Object> params = new ArrayList<>(); // 存储查询参数
// 添加分区条件
conditionBuilder.append(partitionField)
.append(" = '")
.append(maxPartition)
.append("'");
// 分区字段和时间字段组合查询条件
if (partitionField != null && !partitionField.isEmpty()) {
// 1. 查询最大分区值
String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName;
List<RowMap> maxPartitionResult = rdsapi.getMaps(maxPartitionSql);
// 如果时间字段存在添加时间范围条件
if (timeField != null && !timeField.isEmpty()) {
conditionBuilder.append(" AND TO_DATE(")
if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) {
LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField);
return;
}
String maxPartition = maxPartitionResult.get(0).getString("max_partition");
LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition);
// 添加分区条件
conditionBuilder.append(partitionField)
.append(" = '")
.append(maxPartition)
.append("'");
// 如果时间字段存在添加时间范围条件
if (timeField != null && !timeField.isEmpty()) {
conditionBuilder.append(" AND TO_DATE(")
.append(timeField)
.append(", '")
.append(ORACLE_DATE_FORMAT)
.append("') BETWEEN TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("') AND TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("')");
// 构建排序子句
orderByBuilder.append(" ORDER BY ").append(timeField);
if (jezd != null && !jezd.isEmpty()) {
orderByBuilder.append(", ").append(jezd);
}
orderByBuilder.append(" DESC");
params.add(startDate);
params.add(endDate);
}
} else if (timeField != null && !timeField.isEmpty()) {
// 没有分区字段但时间字段存在使用时间范围条件
// 仅时间范围条件使用占位符
conditionBuilder.append("TO_DATE(")
.append(timeField)
.append(", '")
.append(ORACLE_DATE_FORMAT)
@ -245,7 +277,6 @@ public class ProductionDataSyncServiceImpl implements DataSyncService {
.append("') AND TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("')");
// 构建排序子句
orderByBuilder.append(" ORDER BY ").append(timeField);
if (jezd != null && !jezd.isEmpty()) {
@ -254,154 +285,126 @@ public class ProductionDataSyncServiceImpl implements DataSyncService {
orderByBuilder.append(" DESC");
params.add(startDate);
params.add(endDate);
} else {
// 既没有分区字段也没有时间字段查询全表
LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
conditionBuilder.append("1=1");
}
} else if (timeField != null && !timeField.isEmpty()) {
// 没有分区字段但时间字段存在使用时间范围条件
// 仅时间范围条件使用占位符
conditionBuilder.append("TO_DATE(")
.append(timeField)
.append(", '")
.append(ORACLE_DATE_FORMAT)
.append("') BETWEEN TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("') AND TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("')");
// 构建排序子句
orderByBuilder.append(" ORDER BY ").append(timeField);
if (jezd != null && !jezd.isEmpty()) {
orderByBuilder.append(", ").append(jezd);
}
orderByBuilder.append(" DESC");
params.add(startDate);
params.add(endDate);
// 分页查询数据
do {
// 使用Oracle分页语法 (12c+)
String querySql = "SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString() +
orderByBuilder.toString(); // 添加排序子句
LOGGER.debug("执行Oracle查询: {}", querySql);
List<RowMap> pageData;
// 根据条件类型执行查询
if (partitionField != null && !partitionField.isEmpty() &&
timeField != null && !timeField.isEmpty()) {
// 分区+时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else if (timeField != null && !timeField.isEmpty()) {
// 仅时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else {
// 无时间范围查询仅分区或全表
pageData = rdsapi.getMaps(querySql);
}
if (pageData != null && !pageData.isEmpty()) {
// 直接处理当前页数据
int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable);
totalRows += pageData.size();
totalSuccess += successCount;
hasMore = pageData.size() == PAGE_SIZE;
pageNo++;
} else {
hasMore = false;
}
} while (hasMore);
} else {
// 既没有分区字段也没有时间字段查询全表
LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
conditionBuilder.append("1=1");
}
// 构建查询条件
StringBuilder conditionBuilder = new StringBuilder();
// 修改点分区字段和时间字段组合查询条件
if (partitionField != null && !partitionField.isEmpty()) {
// 1. 查询最大分区值
String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName;
List<RowMap> maxPartitionResult = rdsapi.getMaps(maxPartitionSql);
// 分页查询数据
do {
// 使用Oracle分页语法 (12c+)
String querySql = "SELECT * FROM ( " +
"SELECT t.*, ROWNUM rn FROM (" +
"SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString() +
orderByBuilder.toString() + // 添加排序子句
") t WHERE ROWNUM <= " + (pageNo * PAGE_SIZE) +
") WHERE rn > " + ((pageNo - 1) * PAGE_SIZE);
if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) {
LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField);
return;
}
LOGGER.debug("执行Oracle查询: {}", querySql);
String maxPartition = maxPartitionResult.get(0).getString("max_partition");
LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition);
List<RowMap> pageData;
// 根据条件类型执行查询
if (partitionField != null && !partitionField.isEmpty() &&
timeField != null && !timeField.isEmpty()) {
// 分区+时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
// 添加分区条件
conditionBuilder.append(partitionField)
.append(" = '")
.append(maxPartition)
.append("'");
// 如果时间字段存在添加时间范围条件
if (timeField != null && !timeField.isEmpty()) {
conditionBuilder.append(" AND ")
.append(timeField)
.append(" BETWEEN ? AND ? ORDER BY " + timeField + "");
if (jezd != null && !jezd.isEmpty()) {
conditionBuilder.append(", " + jezd + " ");
}
conditionBuilder.append(" DESC");
}
} else if (timeField != null && !timeField.isEmpty()) {
// 仅时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else {
// 无时间范围查询仅分区或全表
pageData = rdsapi.getMaps(querySql);
}
if (pageData != null && !pageData.isEmpty()) {
// 直接处理当前页数据
int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable);
totalRows += pageData.size();
totalSuccess += successCount;
hasMore = pageData.size() == PAGE_SIZE;
pageNo++;
} else {
hasMore = false;
}
} while (hasMore);
}else {
// 构建查询条件
StringBuilder conditionBuilder = new StringBuilder();
// 修改点分区字段和时间字段组合查询条件
if (partitionField != null && !partitionField.isEmpty()) {
// 1. 查询最大分区值
String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName;
List<RowMap> maxPartitionResult = rdsapi.getMaps(maxPartitionSql);
if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) {
LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField);
return;
}
String maxPartition = maxPartitionResult.get(0).getString("max_partition");
LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition);
// 添加分区条件
conditionBuilder.append(partitionField)
.append(" = '")
.append(maxPartition)
.append("'");
// 如果时间字段存在添加时间范围条件
if (timeField != null && !timeField.isEmpty()) {
conditionBuilder.append(" AND ")
.append(timeField)
.append(" BETWEEN ? AND ? ORDER BY "+timeField+"");
if (jezd!=null && !jezd.isEmpty()){
conditionBuilder.append(", "+jezd+" ");
// 没有分区字段但时间字段存在使用时间范围条件
conditionBuilder.append(timeField)
.append(" BETWEEN ? AND ? ORDER BY " + timeField + "");
if (jezd != null && !jezd.isEmpty()) {
conditionBuilder.append(", " + jezd + " ");
}
conditionBuilder.append(" DESC");
} else {
// 既没有分区字段也没有时间字段查询全表实际应避免这种情况
LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
conditionBuilder.append("1=1");
}
} else if (timeField != null && !timeField.isEmpty()) {
// 没有分区字段但时间字段存在使用时间范围条件
conditionBuilder.append(timeField)
.append(" BETWEEN ? AND ? ORDER BY "+timeField+"");
if (jezd!=null && !jezd.isEmpty()){
conditionBuilder.append(", "+jezd+" ");
}
conditionBuilder.append(" DESC");
} else {
// 既没有分区字段也没有时间字段查询全表实际应避免这种情况
LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
conditionBuilder.append("1=1");
// 分页查询数据
do {
String querySql = "SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString();
LOGGER.debug("执行查询querySql: {}", querySql);
List<RowMap> pageData;
// 根据条件类型执行查询
if (partitionField != null && !partitionField.isEmpty() &&
timeField != null && !timeField.isEmpty()) {
// 分区+时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else if (timeField != null && !timeField.isEmpty()) {
// 仅时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else {
// 无时间范围查询仅分区或全表
pageData = rdsapi.getMaps(querySql);
}
if (pageData != null && !pageData.isEmpty()) {
// 直接处理当前页数据
int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable);
totalRows += pageData.size();
totalSuccess += successCount;
hasMore = pageData.size() == PAGE_SIZE;
pageNo++;
} else {
hasMore = false;
}
} while (hasMore);
}
// 分页查询数据
do {
String querySqls = "SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString() +" ";
// " LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
LOGGER.debug("执行查询querySqls: {}", querySqls);
String querySql = SQLPagination.getPaginitionSQL(querySqls, (pageNo - 1) * PAGE_SIZE, PAGE_SIZE,DBname);
LOGGER.debug("执行查询querySql: {}", querySql);
List<RowMap> pageData;
// 根据条件类型执行查询
if (partitionField != null && !partitionField.isEmpty() &&
timeField != null && !timeField.isEmpty()) {
// 分区+时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else if (timeField != null && !timeField.isEmpty()) {
// 仅时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else {
// 无时间范围查询仅分区或全表
pageData = rdsapi.getMaps(querySql);
}
if (pageData != null && !pageData.isEmpty()) {
// 直接处理当前页数据
int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable);
totalRows += pageData.size();
totalSuccess += successCount;
hasMore = pageData.size() == PAGE_SIZE;
pageNo++;
} else {
hasMore = false;
}
} while (hasMore);
}
LOGGER.info("从表[{}]共查询到{}条数据,成功同步{}条数据",
tableName, totalRows, totalSuccess);
} catch (Exception e) {
@ -457,6 +460,56 @@ public class ProductionDataSyncServiceImpl implements DataSyncService {
return successCount;
}
/**
* 将时间范围拆分为多个区间并确保结束时间包含时间部分以覆盖完整日期
* @param startDate 开始时间yyyy-MM-dd格式时间部分为00:00:00
* @param endDate 结束时间yyyy-MM-dd格式时间部分为00:00:00
* @param daysInterval 间隔天数
* @return 时间区间列表每个区间的结束时间调整为23:59:59以确保覆盖完整日期
*/
private static List<Date[]> splitTimeRange(Date startDate, Date endDate, int daysInterval) {
List<Date[]> ranges = new ArrayList<>();
Calendar calendar = Calendar.getInstance();
calendar.setTime(startDate);
// 调整结束时间以包含完整日期
Calendar endCal = Calendar.getInstance();
endCal.setTime(endDate);
endCal.set(Calendar.HOUR_OF_DAY, 23);
endCal.set(Calendar.MINUTE, 59);
endCal.set(Calendar.SECOND, 59);
endCal.set(Calendar.MILLISECOND, 999);
Date adjustedEndDate = endCal.getTime();
while (calendar.getTime().before(adjustedEndDate)) {
Date rangeStart = calendar.getTime();
// 增加间隔天数
calendar.add(Calendar.DAY_OF_MONTH, daysInterval);
Date potentialRangeEnd = calendar.getTime();
// 调整区间结束时间为当前日期的23:59:59
Calendar rangeEndCal = Calendar.getInstance();
rangeEndCal.setTime(potentialRangeEnd);
rangeEndCal.set(Calendar.HOUR_OF_DAY, 23);
rangeEndCal.set(Calendar.MINUTE, 59);
rangeEndCal.set(Calendar.SECOND, 59);
rangeEndCal.set(Calendar.MILLISECOND, 999);
Date rangeEnd = rangeEndCal.getTime();
// 如果调整后的区间结束时间超过最终结束时间使用调整后的结束时间
if (rangeEnd.after(adjustedEndDate)) {
rangeEnd = adjustedEndDate;
}
ranges.add(new Date[]{rangeStart, rangeEnd});
// 准备下一个区间的开始时间当前区间结束时间的下一天00:00:00
calendar.add(Calendar.MILLISECOND, 1);
}
return ranges;
}
/**
* 字段映射转换
* @param source 源数据记录

View File

@ -233,42 +233,73 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
boolean hasMore;
RDSAPI rdsapi = null;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String startDate = simpleDateFormat.format(startDated);
String endDate = simpleDateFormat.format(endDated);
try {
rdsapi = SDK.getCCAPI().getRDSAPI(ccId);
DBUtils.SUPPLY supply = rdsapi.getSupply();
String DBname = supply.getName();
LOGGER.info("数据库为:{}",DBname);
if ("ORACLE".equalsIgnoreCase(DBname)){
// 构建查询条件
StringBuilder conditionBuilder = new StringBuilder();
StringBuilder orderByBuilder = new StringBuilder(); // 用于构建排序子句
List<Object> params = new ArrayList<>(); // 存储查询参数
// 分区字段和时间字段组合查询条件
if (partitionField != null && !partitionField.isEmpty()) {
// 1. 查询最大分区值
String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName;
List<RowMap> maxPartitionResult = rdsapi.getMaps(maxPartitionSql);
// 计算时间范围并拆分为30天一组
List<Date[]> timeRanges = splitTimeRange(startDated, endDated, 30);
LOGGER.info("时间范围拆分为 {} 个查询区间", timeRanges.size());
if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) {
LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField);
return;
}
for (int i = 0; i < timeRanges.size(); i++) {
Date[] range = timeRanges.get(i);
String startDate = simpleDateFormat.format(range[0]);
String endDate = simpleDateFormat.format(range[1]);
String maxPartition = maxPartitionResult.get(0).getString("max_partition");
LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition);
LOGGER.info("正在处理第 {} 个时间区间: {} 至 {}", i + 1, startDate, endDate);
if ("ORACLE".equalsIgnoreCase(DBname)) {
// 构建查询条件
StringBuilder conditionBuilder = new StringBuilder();
StringBuilder orderByBuilder = new StringBuilder(); // 用于构建排序子句
List<Object> params = new ArrayList<>(); // 存储查询参数
// 添加分区条件
conditionBuilder.append(partitionField)
.append(" = '")
.append(maxPartition)
.append("'");
// 分区字段和时间字段组合查询条件
if (partitionField != null && !partitionField.isEmpty()) {
// 1. 查询最大分区值
String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName;
List<RowMap> maxPartitionResult = rdsapi.getMaps(maxPartitionSql);
// 如果时间字段存在添加时间范围条件
if (timeField != null && !timeField.isEmpty()) {
conditionBuilder.append(" AND TO_DATE(")
if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) {
LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField);
return;
}
String maxPartition = maxPartitionResult.get(0).getString("max_partition");
LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition);
// 添加分区条件
conditionBuilder.append(partitionField)
.append(" = '")
.append(maxPartition)
.append("'");
// 如果时间字段存在添加时间范围条件
if (timeField != null && !timeField.isEmpty()) {
conditionBuilder.append(" AND TO_DATE(")
.append(timeField)
.append(", '")
.append(ORACLE_DATE_FORMAT)
.append("') BETWEEN TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("') AND TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("')");
// 构建排序子句
orderByBuilder.append(" ORDER BY ").append(timeField);
if (jezd != null && !jezd.isEmpty()) {
orderByBuilder.append(", ").append(jezd);
}
orderByBuilder.append(" DESC");
params.add(startDate);
params.add(endDate);
}
} else if (timeField != null && !timeField.isEmpty()) {
// 没有分区字段但时间字段存在使用时间范围条件
// 仅时间范围条件使用占位符
conditionBuilder.append("TO_DATE(")
.append(timeField)
.append(", '")
.append(ORACLE_DATE_FORMAT)
@ -277,7 +308,6 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
.append("') AND TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("')");
// 构建排序子句
orderByBuilder.append(" ORDER BY ").append(timeField);
if (jezd != null && !jezd.isEmpty()) {
@ -286,163 +316,183 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
orderByBuilder.append(" DESC");
params.add(startDate);
params.add(endDate);
} else {
// 既没有分区字段也没有时间字段查询全表
LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
conditionBuilder.append("1=1");
}
} else if (timeField != null && !timeField.isEmpty()) {
// 没有分区字段但时间字段存在使用时间范围条件
// 仅时间范围条件使用占位符
conditionBuilder.append("TO_DATE(")
.append(timeField)
.append(", '")
.append(ORACLE_DATE_FORMAT)
.append("') BETWEEN TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("') AND TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("')");
// 构建排序子句
orderByBuilder.append(" ORDER BY ").append(timeField);
if (jezd != null && !jezd.isEmpty()) {
orderByBuilder.append(", ").append(jezd);
}
orderByBuilder.append(" DESC");
params.add(startDate);
params.add(endDate);
// 分页查询数据
do {
String querySql = "SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString() +
orderByBuilder.toString(); // 添加排序子句
LOGGER.debug("执行Oracle查询: {}", querySql);
List<RowMap> pageData;
// 根据条件类型执行查询
if (partitionField != null && !partitionField.isEmpty() &&
timeField != null && !timeField.isEmpty()) {
// 分区+时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else if (timeField != null && !timeField.isEmpty()) {
// 仅时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else {
// 无时间范围查询仅分区或全表
pageData = rdsapi.getMaps(querySql);
}
if (pageData != null && !pageData.isEmpty()) {
// 直接处理当前页数据
int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable);
totalRows += pageData.size();
totalSuccess += successCount;
hasMore = pageData.size() == PAGE_SIZE;
pageNo++;
} else {
hasMore = false;
}
} while (hasMore);
} else {
// 既没有分区字段也没有时间字段查询全表
LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
conditionBuilder.append("1=1");
}
// 构建查询条件
StringBuilder conditionBuilder = new StringBuilder();
// 修改点分区字段和时间字段组合查询条件
if (partitionField != null && !partitionField.isEmpty()) {
// 1. 查询最大分区值
String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName;
List<RowMap> maxPartitionResult = rdsapi.getMaps(maxPartitionSql);
// 分页查询数据
do {
// 使用Oracle分页语法 (12c+)
String querySql = "SELECT * FROM ( " +
"SELECT t.*, ROWNUM rn FROM (" +
"SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString() +
orderByBuilder.toString() + // 添加排序子句
") t WHERE ROWNUM <= " + (pageNo * PAGE_SIZE) +
") WHERE rn > " + ((pageNo - 1) * PAGE_SIZE);
if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) {
LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField);
return;
}
LOGGER.debug("执行Oracle查询: {}", querySql);
String maxPartition = maxPartitionResult.get(0).getString("max_partition");
LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition);
List<RowMap> pageData;
// 根据条件类型执行查询
if (partitionField != null && !partitionField.isEmpty() &&
timeField != null && !timeField.isEmpty()) {
// 分区+时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
// 添加分区条件
conditionBuilder.append(partitionField)
.append(" = '")
.append(maxPartition)
.append("'");
// 如果时间字段存在添加时间范围条件
if (timeField != null && !timeField.isEmpty()) {
conditionBuilder.append(" AND ")
.append(timeField)
.append(" BETWEEN ? AND ? ORDER BY " + timeField + "");
if (jezd != null && !jezd.isEmpty()) {
conditionBuilder.append(", " + jezd + " ");
}
conditionBuilder.append(" DESC");
}
} else if (timeField != null && !timeField.isEmpty()) {
// 仅时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else {
// 无时间范围查询仅分区或全表
pageData = rdsapi.getMaps(querySql);
}
if (pageData != null && !pageData.isEmpty()) {
// 直接处理当前页数据
int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable);
totalRows += pageData.size();
totalSuccess += successCount;
hasMore = pageData.size() == PAGE_SIZE;
pageNo++;
} else {
hasMore = false;
}
} while (hasMore);
}else {
// 构建查询条件
StringBuilder conditionBuilder = new StringBuilder();
// 修改点分区字段和时间字段组合查询条件
if (partitionField != null && !partitionField.isEmpty()) {
// 1. 查询最大分区值
String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName;
List<RowMap> maxPartitionResult = rdsapi.getMaps(maxPartitionSql);
if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) {
LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField);
return;
}
String maxPartition = maxPartitionResult.get(0).getString("max_partition");
LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition);
// 添加分区条件
conditionBuilder.append(partitionField)
.append(" = '")
.append(maxPartition)
.append("'");
// 如果时间字段存在添加时间范围条件
if (timeField != null && !timeField.isEmpty()) {
conditionBuilder.append(" AND ")
.append(timeField)
.append(" BETWEEN ? AND ? ORDER BY "+timeField+"");
if (jezd!=null && !jezd.isEmpty()){
conditionBuilder.append(", "+jezd+" ");
// 没有分区字段但时间字段存在使用时间范围条件
conditionBuilder.append(timeField)
.append(" BETWEEN ? AND ? ORDER BY " + timeField + "");
if (jezd != null && !jezd.isEmpty()) {
conditionBuilder.append(", " + jezd + " ");
}
conditionBuilder.append(" DESC");
} else {
// 既没有分区字段也没有时间字段查询全表实际应避免这种情况
LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
conditionBuilder.append("1=1");
}
} else if (timeField != null && !timeField.isEmpty()) {
// 没有分区字段但时间字段存在使用时间范围条件
conditionBuilder.append(timeField)
.append(" BETWEEN ? AND ? ORDER BY "+timeField+"");
if (jezd!=null && !jezd.isEmpty()){
conditionBuilder.append(", "+jezd+" ");
}
conditionBuilder.append(" DESC");
} else {
// 既没有分区字段也没有时间字段查询全表实际应避免这种情况
LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
conditionBuilder.append("1=1");
// 分页查询数据
do {
String querySql = "SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString();
LOGGER.debug("执行查询querySql: {}", querySql);
List<RowMap> pageData;
// 根据条件类型执行查询
if (partitionField != null && !partitionField.isEmpty() &&
timeField != null && !timeField.isEmpty()) {
// 分区+时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else if (timeField != null && !timeField.isEmpty()) {
// 仅时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else {
// 无时间范围查询仅分区或全表
pageData = rdsapi.getMaps(querySql);
}
if (pageData != null && !pageData.isEmpty()) {
// 直接处理当前页数据
int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable);
totalRows += pageData.size();
totalSuccess += successCount;
hasMore = pageData.size() == PAGE_SIZE;
pageNo++;
} else {
hasMore = false;
}
} while (hasMore);
}
// 分页查询数据
do {
String querySqls = "SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString() +" ";
// " LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
LOGGER.debug("执行查询querySqls: {}", querySqls);
String querySql = SQLPagination.getPaginitionSQL(querySqls, (pageNo - 1) * PAGE_SIZE, PAGE_SIZE,DBname);
LOGGER.debug("执行查询querySql: {}", querySql);
List<RowMap> pageData;
// 根据条件类型执行查询
if (partitionField != null && !partitionField.isEmpty() &&
timeField != null && !timeField.isEmpty()) {
// 分区+时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else if (timeField != null && !timeField.isEmpty()) {
// 仅时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else {
// 无时间范围查询仅分区或全表
pageData = rdsapi.getMaps(querySql);
}
if (pageData != null && !pageData.isEmpty()) {
// 直接处理当前页数据
int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable);
totalRows += pageData.size();
totalSuccess += successCount;
hasMore = pageData.size() == PAGE_SIZE;
pageNo++;
} else {
hasMore = false;
}
} while (hasMore);
}
LOGGER.info("从表[{}]共查询到{}条数据,成功同步{}条数据",
tableName, totalRows, totalSuccess);
} catch (Exception e) {
throw new RuntimeException("查询源表[" + tableName + "]数据失败: " + e.getMessage(), e);
}finally {
}
}
/**
* 将时间范围拆分为多个区间并确保结束时间包含时间部分以覆盖完整日期
* @param startDate 开始时间yyyy-MM-dd格式时间部分为00:00:00
* @param endDate 结束时间yyyy-MM-dd格式时间部分为00:00:00
* @param daysInterval 间隔天数
* @return 时间区间列表每个区间的结束时间调整为23:59:59以确保覆盖完整日期
*/
private static List<Date[]> splitTimeRange(Date startDate, Date endDate, int daysInterval) {
List<Date[]> ranges = new ArrayList<>();
Calendar calendar = Calendar.getInstance();
calendar.setTime(startDate);
// 调整结束时间以包含完整日期
Calendar endCal = Calendar.getInstance();
endCal.setTime(endDate);
endCal.set(Calendar.HOUR_OF_DAY, 23);
endCal.set(Calendar.MINUTE, 59);
endCal.set(Calendar.SECOND, 59);
endCal.set(Calendar.MILLISECOND, 999);
Date adjustedEndDate = endCal.getTime();
while (calendar.getTime().before(adjustedEndDate)) {
Date rangeStart = calendar.getTime();
// 增加间隔天数
calendar.add(Calendar.DAY_OF_MONTH, daysInterval);
Date potentialRangeEnd = calendar.getTime();
// 调整区间结束时间为当前日期的23:59:59
Calendar rangeEndCal = Calendar.getInstance();
rangeEndCal.setTime(potentialRangeEnd);
rangeEndCal.set(Calendar.HOUR_OF_DAY, 23);
rangeEndCal.set(Calendar.MINUTE, 59);
rangeEndCal.set(Calendar.SECOND, 59);
rangeEndCal.set(Calendar.MILLISECOND, 999);
Date rangeEnd = rangeEndCal.getTime();
// 如果调整后的区间结束时间超过最终结束时间使用调整后的结束时间
if (rangeEnd.after(adjustedEndDate)) {
rangeEnd = adjustedEndDate;
}
ranges.add(new Date[]{rangeStart, rangeEnd});
// 准备下一个区间的开始时间当前区间结束时间的下一天00:00:00
calendar.add(Calendar.MILLISECOND, 1);
}
return ranges;
}
/**
* 处理并插入数据到目标表
* @param sourceData 源数据列表
@ -755,61 +805,51 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
bo.set(key, map.get(key));
}
}
// LOGGER.info("明细汇总字段值set完毕-01");
if (bo!=null) {
// 如果是采购_入库单汇总 刷新物料名称
if (hzb.equals("BO_EU_DWD_ORDER_RKD_HZ")) {
String bkgs = bo.getString("BKGS");
String wlmc = bo.getString("WLMC");
String wlbm = bo.getString("WLBM");
String jldw = bo.getString("JLDW");
String wlfl = bo.getString("WLFL");
Double djhyfs = bo.get("DJHYF",Double.class);
double djhyf = djhyfs != null ? djhyfs : 0.0;
// LOGGER.info("采购_入库单汇总,刷新物料名称------物料名称:{},板块公司:{},物料编码:{},入库单位:{},单价:{},物料分类:{}", wlmc, bkgs, wlbm, jldw, djhyf, wlfl);
// 如果是采购_入库单汇总 刷新物料名称
if (hzb.equals("BO_EU_DWD_ORDER_RKD_HZ")) {
String bkgs = bo.getString("BKGS");
String wlmc = bo.getString("WLMC");
String wlbm = bo.getString("WLBM");
String jldw = bo.getString("JLDW");
String wlfl = bo.getString("WLFL");
Double djhyfs = bo.get("DJHYF",Double.class);
double djhyf = djhyfs != null ? djhyfs : 0.0;
String newWlmc = "";
if (StringUtils.isNotBlank(wlmc) || StringUtils.isNotBlank(wlbm)
|| StringUtils.isNotBlank(jldw)
|| StringUtils.isNotBlank(wlfl)) {
try {
newWlmc = purchaseUtil.materialClassificationFiltering(bkgs, wlmc, wlbm, jldw, djhyf, wlfl);
} catch (Exception e) {
e.printStackTrace();
}
if (bkgs.equals("北新嘉宝莉")) {
if (wlmc.contains("") && djhyf < 5.0) {
continue;
} else if (wlmc.contains("桶身") && gaiSum > 0) {
Double djhyf1 = bo.get("DJHYF", Double.class);
bo.set("DJHYF", djhyf1 + gaiAverage);
gaiSum--;
}
}
bo.set("WLMC", newWlmc);
bo.set("OLDWLMC", wlmc);
// LOGGER.info("明细汇总:物料分类更新完毕-02");
} else {
// LOGGER.info("明细汇总:物料名称为空不进行汇总-03");
String newWlmc = "";
if (StringUtils.isNotBlank(wlmc) || StringUtils.isNotBlank(wlbm)
|| StringUtils.isNotBlank(jldw)
|| StringUtils.isNotBlank(wlfl)) {
try {
newWlmc = purchaseUtil.materialClassificationFiltering(bkgs, wlmc, wlbm, jldw, djhyf, wlfl);
} catch (Exception e) {
e.printStackTrace();
}
// LOGGER.info("采购_入库单汇总,刷新物料名称------物料名称:{},板块公司:{},物料编码:{},入库单位:{},单价:{},物料分类:{}", wlmc, bkgs, wlbm, jldw, djhyf, wlfl);
if ("泰山石膏".equals(bkgs)) {
// 泰山石膏处理入库单金额 入库数量*含税单价
Double rksl = bo.get("RKSL", Double.class);// 入库数量
Double hsdjhyf = bo.get("HSDJHYF", Double.class);// 含税单价含运费
// 处理可能为null的值默认设为0.0
double safeRksl = rksl != null ? rksl : 0.0;
double safeHsdjhyf = hsdjhyf != null ? hsdjhyf : 0.0;
BigDecimal multiply = BigDecimal.valueOf(safeRksl).multiply(BigDecimal.valueOf(safeHsdjhyf));
bo.set("DHJE", multiply.doubleValue());
if (bkgs.equals("北新嘉宝莉")) {
if (wlmc.contains("") && djhyf < 5.0) {
continue;
} else if (wlmc.contains("桶身") && gaiSum > 0) {
Double djhyf1 = bo.get("DJHYF", Double.class);
bo.set("DJHYF", djhyf1 + gaiAverage);
gaiSum--;
}
}
bo.set("WLMC", newWlmc);
bo.set("OLDWLMC", wlmc);
}
// LOGGER.info("采购_入库单汇总,刷新物料名称------物料名称:{},板块公司:{},物料编码:{},入库单位:{},单价:{},物料分类:{}", wlmc, bkgs, wlbm, jldw, djhyf, wlfl);
if ("泰山石膏".equals(bkgs)) {
// 泰山石膏处理入库单金额 入库数量*含税单价
Double rksl = bo.get("RKSL", Double.class);// 入库数量
Double hsdjhyf = bo.get("HSDJHYF", Double.class);// 含税单价含运费
// 处理可能为null的值默认设为0.0
double safeRksl = rksl != null ? rksl : 0.0;
double safeHsdjhyf = hsdjhyf != null ? hsdjhyf : 0.0;
bos.add(bo);
// LOGGER.info("明细汇总bo add完毕-04");
BigDecimal multiply = BigDecimal.valueOf(safeRksl).multiply(BigDecimal.valueOf(safeHsdjhyf));
bo.set("DHJE", multiply.doubleValue());
}
}
bos.add(bo);
}
SDK.getBOAPI().createDataBO(hzb, bos, UserContext.fromUID("admin"));

View File

@ -178,42 +178,73 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
boolean hasMore;
RDSAPI rdsapi = null;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String startDate = simpleDateFormat.format(startDated);
String endDate = simpleDateFormat.format(endDated);
try {
rdsapi = SDK.getCCAPI().getRDSAPI(ccId);
DBUtils.SUPPLY supply = rdsapi.getSupply();
String DBname = supply.getName();
LOGGER.info("数据库为:{}",DBname);
if ("ORACLE".equalsIgnoreCase(DBname)){
// 构建查询条件
StringBuilder conditionBuilder = new StringBuilder();
StringBuilder orderByBuilder = new StringBuilder(); // 用于构建排序子句
List<Object> params = new ArrayList<>(); // 存储查询参数
// 分区字段和时间字段组合查询条件
if (partitionField != null && !partitionField.isEmpty()) {
// 1. 查询最大分区值
String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName;
List<RowMap> maxPartitionResult = rdsapi.getMaps(maxPartitionSql);
// 计算时间范围并拆分为30天一组
List<Date[]> timeRanges = splitTimeRange(startDated, endDated, 30);
LOGGER.info("时间范围拆分为 {} 个查询区间", timeRanges.size());
if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) {
LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField);
return;
}
for (int i = 0; i < timeRanges.size(); i++) {
Date[] range = timeRanges.get(i);
String startDate = simpleDateFormat.format(range[0]);
String endDate = simpleDateFormat.format(range[1]);
String maxPartition = maxPartitionResult.get(0).getString("max_partition");
LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition);
LOGGER.info("正在处理第 {} 个时间区间: {} 至 {}", i + 1, startDate, endDate);
if ("ORACLE".equalsIgnoreCase(DBname)) {
// 构建查询条件
StringBuilder conditionBuilder = new StringBuilder();
StringBuilder orderByBuilder = new StringBuilder(); // 用于构建排序子句
List<Object> params = new ArrayList<>(); // 存储查询参数
// 添加分区条件
conditionBuilder.append(partitionField)
.append(" = '")
.append(maxPartition)
.append("'");
// 分区字段和时间字段组合查询条件
if (partitionField != null && !partitionField.isEmpty()) {
// 1. 查询最大分区值
String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName;
List<RowMap> maxPartitionResult = rdsapi.getMaps(maxPartitionSql);
// 如果时间字段存在添加时间范围条件
if (timeField != null && !timeField.isEmpty()) {
conditionBuilder.append(" AND TO_DATE(")
if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) {
LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField);
return;
}
String maxPartition = maxPartitionResult.get(0).getString("max_partition");
LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition);
// 添加分区条件
conditionBuilder.append(partitionField)
.append(" = '")
.append(maxPartition)
.append("'");
// 如果时间字段存在添加时间范围条件
if (timeField != null && !timeField.isEmpty()) {
conditionBuilder.append(" AND TO_DATE(")
.append(timeField)
.append(", '")
.append(ORACLE_DATE_FORMAT)
.append("') BETWEEN TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("') AND TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("')");
// 构建排序子句
orderByBuilder.append(" ORDER BY ").append(timeField);
if (jezd != null && !jezd.isEmpty()) {
orderByBuilder.append(", ").append(jezd);
}
orderByBuilder.append(" DESC");
params.add(startDate);
params.add(endDate);
}
} else if (timeField != null && !timeField.isEmpty()) {
// 没有分区字段但时间字段存在使用时间范围条件
// 仅时间范围条件使用占位符
conditionBuilder.append("TO_DATE(")
.append(timeField)
.append(", '")
.append(ORACLE_DATE_FORMAT)
@ -222,7 +253,6 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
.append("') AND TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("')");
// 构建排序子句
orderByBuilder.append(" ORDER BY ").append(timeField);
if (jezd != null && !jezd.isEmpty()) {
@ -231,160 +261,131 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
orderByBuilder.append(" DESC");
params.add(startDate);
params.add(endDate);
} else {
// 既没有分区字段也没有时间字段查询全表
LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
conditionBuilder.append("1=1");
}
} else if (timeField != null && !timeField.isEmpty()) {
// 没有分区字段但时间字段存在使用时间范围条件
// 仅时间范围条件使用占位符
conditionBuilder.append("TO_DATE(")
.append(timeField)
.append(", '")
.append(ORACLE_DATE_FORMAT)
.append("') BETWEEN TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("') AND TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("')");
// 构建排序子句
orderByBuilder.append(" ORDER BY ").append(timeField);
if (jezd != null && !jezd.isEmpty()) {
orderByBuilder.append(", ").append(jezd);
}
orderByBuilder.append(" DESC");
params.add(startDate);
params.add(endDate);
// 分页查询数据
do {
// 使用Oracle分页语法 (12c+)
String querySql = "SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString() +
orderByBuilder.toString(); // 添加排序子句
LOGGER.debug("执行Oracle查询: {}", querySql);
List<RowMap> pageData;
// 根据条件类型执行查询
if (partitionField != null && !partitionField.isEmpty() &&
timeField != null && !timeField.isEmpty()) {
// 分区+时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else if (timeField != null && !timeField.isEmpty()) {
// 仅时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else {
// 无时间范围查询仅分区或全表
pageData = rdsapi.getMaps(querySql);
}
if (pageData != null && !pageData.isEmpty()) {
// 直接处理当前页数据
int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable);
totalRows += pageData.size();
totalSuccess += successCount;
hasMore = pageData.size() == PAGE_SIZE;
pageNo++;
} else {
hasMore = false;
}
} while (hasMore);
} else {
// 既没有分区字段也没有时间字段查询全表
LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
conditionBuilder.append("1=1");
}
// 构建查询条件
StringBuilder conditionBuilder = new StringBuilder();
// 修改点分区字段和时间字段组合查询条件
if (partitionField != null && !partitionField.isEmpty()) {
// 1. 查询最大分区值
String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName;
List<RowMap> maxPartitionResult = rdsapi.getMaps(maxPartitionSql);
// 分页查询数据
do {
// 使用Oracle分页语法 (12c+)
String querySql = "SELECT * FROM ( " +
"SELECT t.*, ROWNUM rn FROM (" +
"SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString() +
orderByBuilder.toString() + // 添加排序子句
") t WHERE ROWNUM <= " + (pageNo * PAGE_SIZE) +
") WHERE rn > " + ((pageNo - 1) * PAGE_SIZE);
if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) {
LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField);
return;
}
LOGGER.debug("执行Oracle查询: {}", querySql);
String maxPartition = maxPartitionResult.get(0).getString("max_partition");
LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition);
List<RowMap> pageData;
// 根据条件类型执行查询
if (partitionField != null && !partitionField.isEmpty() &&
timeField != null && !timeField.isEmpty()) {
// 分区+时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
// 添加分区条件
conditionBuilder.append(partitionField)
.append(" = '")
.append(maxPartition)
.append("'");
// 如果时间字段存在添加时间范围条件
if (timeField != null && !timeField.isEmpty()) {
conditionBuilder.append(" AND ")
.append(timeField)
.append(" BETWEEN ? AND ? ORDER BY " + timeField + "");
if (jezd != null && !jezd.isEmpty()) {
conditionBuilder.append(", " + jezd + " ");
}
conditionBuilder.append(" DESC");
}
} else if (timeField != null && !timeField.isEmpty()) {
// 仅时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else {
// 无时间范围查询仅分区或全表
pageData = rdsapi.getMaps(querySql);
}
if (pageData != null && !pageData.isEmpty()) {
// 直接处理当前页数据
int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable);
totalRows += pageData.size();
totalSuccess += successCount;
hasMore = pageData.size() == PAGE_SIZE;
pageNo++;
} else {
hasMore = false;
}
} while (hasMore);
}else {
// 构建查询条件
StringBuilder conditionBuilder = new StringBuilder();
// 修改点分区字段和时间字段组合查询条件
if (partitionField != null && !partitionField.isEmpty()) {
// 1. 查询最大分区值
String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName;
List<RowMap> maxPartitionResult = rdsapi.getMaps(maxPartitionSql);
if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) {
LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField);
return;
}
String maxPartition = maxPartitionResult.get(0).getString("max_partition");
LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition);
// 添加分区条件
conditionBuilder.append(partitionField)
.append(" = '")
.append(maxPartition)
.append("'");
// 如果时间字段存在添加时间范围条件
if (timeField != null && !timeField.isEmpty()) {
conditionBuilder.append(" AND ")
.append(timeField)
.append(" BETWEEN ? AND ? ORDER BY "+timeField+"");
if (jezd!=null && !jezd.isEmpty()){
conditionBuilder.append(", "+jezd+" ");
// 没有分区字段但时间字段存在使用时间范围条件
conditionBuilder.append(timeField)
.append(" BETWEEN ? AND ? ORDER BY " + timeField + "");
if (jezd != null && !jezd.isEmpty()) {
conditionBuilder.append(", " + jezd + " ");
}
conditionBuilder.append(" DESC");
} else {
// 既没有分区字段也没有时间字段查询全表实际应避免这种情况
LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
conditionBuilder.append("1=1");
}
} else if (timeField != null && !timeField.isEmpty()) {
// 没有分区字段但时间字段存在使用时间范围条件
conditionBuilder.append(timeField)
.append(" BETWEEN ? AND ? ORDER BY "+timeField+"");
if (jezd!=null && !jezd.isEmpty()){
conditionBuilder.append(", "+jezd+" ");
}
conditionBuilder.append(" DESC");
} else {
// 既没有分区字段也没有时间字段查询全表实际应避免这种情况
LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
conditionBuilder.append("1=1");
// 分页查询数据
do {
String querySql = "SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString();
LOGGER.info("执行查询querySql: {}", querySql);
List<RowMap> pageData;
// 根据条件类型执行查询
if (partitionField != null && !partitionField.isEmpty() &&
timeField != null && !timeField.isEmpty()) {
// 分区+时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else if (timeField != null && !timeField.isEmpty()) {
// 仅时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else {
// 无时间范围查询仅分区或全表
pageData = rdsapi.getMaps(querySql);
}
if (pageData != null && !pageData.isEmpty()) {
// 直接处理当前页数据
int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable);
totalRows += pageData.size();
totalSuccess += successCount;
hasMore = pageData.size() == PAGE_SIZE;
pageNo++;
} else {
hasMore = false;
}
} while (hasMore);
}
// 分页查询数据
do {
String querySqls = "SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString() +" ";
// " LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
LOGGER.info("执行查询querySqls: {}", querySqls);
String querySql = SQLPagination.getPaginitionSQL(querySqls, (pageNo - 1) * PAGE_SIZE, PAGE_SIZE,DBname);
LOGGER.info("执行查询querySql: {}", querySql);
List<RowMap> pageData;
// 根据条件类型执行查询
if (partitionField != null && !partitionField.isEmpty() &&
timeField != null && !timeField.isEmpty()) {
// 分区+时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else if (timeField != null && !timeField.isEmpty()) {
// 仅时间范围查询
pageData = rdsapi.getMaps(querySql, startDate, endDate);
} else {
// 无时间范围查询仅分区或全表
pageData = rdsapi.getMaps(querySql);
}
if (pageData != null && !pageData.isEmpty()) {
// 直接处理当前页数据
int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable);
totalRows += pageData.size();
totalSuccess += successCount;
hasMore = pageData.size() == PAGE_SIZE;
pageNo++;
} else {
hasMore = false;
}
} while (hasMore);
}
LOGGER.info("从表[{}]共查询到{}条数据,成功同步{}条数据",
tableName, totalRows, totalSuccess);
} catch (Exception e) {
throw new RuntimeException("查询源表[" + tableName + "]数据失败: " + e.getMessage(), e);
}finally {
}
}
@ -441,6 +442,56 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
}
}
/**
* 将时间范围拆分为多个区间并确保结束时间包含时间部分以覆盖完整日期
* @param startDate 开始时间yyyy-MM-dd格式时间部分为00:00:00
* @param endDate 结束时间yyyy-MM-dd格式时间部分为00:00:00
* @param daysInterval 间隔天数
* @return 时间区间列表每个区间的结束时间调整为23:59:59以确保覆盖完整日期
*/
private static List<Date[]> splitTimeRange(Date startDate, Date endDate, int daysInterval) {
List<Date[]> ranges = new ArrayList<>();
Calendar calendar = Calendar.getInstance();
calendar.setTime(startDate);
// 调整结束时间以包含完整日期
Calendar endCal = Calendar.getInstance();
endCal.setTime(endDate);
endCal.set(Calendar.HOUR_OF_DAY, 23);
endCal.set(Calendar.MINUTE, 59);
endCal.set(Calendar.SECOND, 59);
endCal.set(Calendar.MILLISECOND, 999);
Date adjustedEndDate = endCal.getTime();
while (calendar.getTime().before(adjustedEndDate)) {
Date rangeStart = calendar.getTime();
// 增加间隔天数
calendar.add(Calendar.DAY_OF_MONTH, daysInterval);
Date potentialRangeEnd = calendar.getTime();
// 调整区间结束时间为当前日期的23:59:59
Calendar rangeEndCal = Calendar.getInstance();
rangeEndCal.setTime(potentialRangeEnd);
rangeEndCal.set(Calendar.HOUR_OF_DAY, 23);
rangeEndCal.set(Calendar.MINUTE, 59);
rangeEndCal.set(Calendar.SECOND, 59);
rangeEndCal.set(Calendar.MILLISECOND, 999);
Date rangeEnd = rangeEndCal.getTime();
// 如果调整后的区间结束时间超过最终结束时间使用调整后的结束时间
if (rangeEnd.after(adjustedEndDate)) {
rangeEnd = adjustedEndDate;
}
ranges.add(new Date[]{rangeStart, rangeEnd});
// 准备下一个区间的开始时间当前区间结束时间的下一天00:00:00
calendar.add(Calendar.MILLISECOND, 1);
}
return ranges;
}
/**
* 处理并插入数据到目标表
* @param sourceData 源数据列表