diff --git a/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/build/classes/java/main/com/awspaas/user/apps/bnbm/datalinkup/service/impl/PurchaseDataSyncServiceImpl.class b/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/build/classes/java/main/com/awspaas/user/apps/bnbm/datalinkup/service/impl/PurchaseDataSyncServiceImpl.class index 700e0d3..6efdef6 100644 Binary files a/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/build/classes/java/main/com/awspaas/user/apps/bnbm/datalinkup/service/impl/PurchaseDataSyncServiceImpl.class and b/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/build/classes/java/main/com/awspaas/user/apps/bnbm/datalinkup/service/impl/PurchaseDataSyncServiceImpl.class differ diff --git a/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/build/classes/java/main/com/awspaas/user/apps/bnbm/datalinkup/service/impl/SaleDataSyncServiceImpl.class b/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/build/classes/java/main/com/awspaas/user/apps/bnbm/datalinkup/service/impl/SaleDataSyncServiceImpl.class index a5e3a55..3882eb8 100644 Binary files a/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/build/classes/java/main/com/awspaas/user/apps/bnbm/datalinkup/service/impl/SaleDataSyncServiceImpl.class and b/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/build/classes/java/main/com/awspaas/user/apps/bnbm/datalinkup/service/impl/SaleDataSyncServiceImpl.class differ diff --git a/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/build/tmp/compileJava/previous-compilation-data.bin b/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/build/tmp/compileJava/previous-compilation-data.bin index 7ddf5b0..60727c5 100644 Binary files a/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/build/tmp/compileJava/previous-compilation-data.bin and b/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/build/tmp/compileJava/previous-compilation-data.bin differ diff --git a/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/src/main/java/com/awspaas/user/apps/bnbm/datalinkup/service/impl/ProductionDataSyncServiceImpl.java b/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/src/main/java/com/awspaas/user/apps/bnbm/datalinkup/service/impl/ProductionDataSyncServiceImpl.java index 451313c..cb57fb4 100644 --- a/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/src/main/java/com/awspaas/user/apps/bnbm/datalinkup/service/impl/ProductionDataSyncServiceImpl.java +++ b/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/src/main/java/com/awspaas/user/apps/bnbm/datalinkup/service/impl/ProductionDataSyncServiceImpl.java @@ -201,42 +201,74 @@ public class ProductionDataSyncServiceImpl implements DataSyncService { boolean hasMore; RDSAPI rdsapi = null; SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - String startDate = simpleDateFormat.format(startDated); - String endDate = simpleDateFormat.format(endDated); + try { rdsapi = SDK.getCCAPI().getRDSAPI(ccId); DBUtils.SUPPLY supply = rdsapi.getSupply(); String DBname = supply.getName(); LOGGER.info("数据库为:{}",DBname); - if ("ORACLE".equalsIgnoreCase(DBname)){ - // 构建查询条件 - StringBuilder conditionBuilder = new StringBuilder(); - StringBuilder orderByBuilder = new StringBuilder(); // 用于构建排序子句 - List params = new ArrayList<>(); // 存储查询参数 - // 分区字段和时间字段组合查询条件 - if (partitionField != null && !partitionField.isEmpty()) { - // 1. 查询最大分区值 - String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName; - List maxPartitionResult = rdsapi.getMaps(maxPartitionSql); + // 计算时间范围并拆分为30天一组 + List timeRanges = splitTimeRange(startDated, endDated, 30); + LOGGER.info("时间范围拆分为 {} 个查询区间", timeRanges.size()); - if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) { - LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField); - return; - } + for (int i = 0; i < timeRanges.size(); i++) { + Date[] range = timeRanges.get(i); + String startDate = simpleDateFormat.format(range[0]); + String endDate = simpleDateFormat.format(range[1]); - String maxPartition = maxPartitionResult.get(0).getString("max_partition"); - LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition); + LOGGER.info("正在处理第 {} 个时间区间: {} 至 {}", i + 1, startDate, endDate); + if ("ORACLE".equalsIgnoreCase(DBname)) { + // 构建查询条件 + StringBuilder conditionBuilder = new StringBuilder(); + StringBuilder orderByBuilder = new StringBuilder(); // 用于构建排序子句 + List params = new ArrayList<>(); // 存储查询参数 - // 添加分区条件 - conditionBuilder.append(partitionField) - .append(" = '") - .append(maxPartition) - .append("'"); + // 分区字段和时间字段组合查询条件 + if (partitionField != null && !partitionField.isEmpty()) { + // 1. 查询最大分区值 + String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName; + List maxPartitionResult = rdsapi.getMaps(maxPartitionSql); - // 如果时间字段存在,添加时间范围条件 - if (timeField != null && !timeField.isEmpty()) { - conditionBuilder.append(" AND TO_DATE(") + if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) { + LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField); + return; + } + + String maxPartition = maxPartitionResult.get(0).getString("max_partition"); + LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition); + + // 添加分区条件 + conditionBuilder.append(partitionField) + .append(" = '") + .append(maxPartition) + .append("'"); + + // 如果时间字段存在,添加时间范围条件 + if (timeField != null && !timeField.isEmpty()) { + conditionBuilder.append(" AND TO_DATE(") + .append(timeField) + .append(", '") + .append(ORACLE_DATE_FORMAT) + .append("') BETWEEN TO_DATE(?, '") + .append(ORACLE_DATE_FORMAT) + .append("') AND TO_DATE(?, '") + .append(ORACLE_DATE_FORMAT) + .append("')"); + + // 构建排序子句 + orderByBuilder.append(" ORDER BY ").append(timeField); + if (jezd != null && !jezd.isEmpty()) { + orderByBuilder.append(", ").append(jezd); + } + orderByBuilder.append(" DESC"); + params.add(startDate); + params.add(endDate); + } + } else if (timeField != null && !timeField.isEmpty()) { + // 没有分区字段,但时间字段存在,使用时间范围条件 + // 仅时间范围条件(使用占位符) + conditionBuilder.append("TO_DATE(") .append(timeField) .append(", '") .append(ORACLE_DATE_FORMAT) @@ -245,7 +277,6 @@ public class ProductionDataSyncServiceImpl implements DataSyncService { .append("') AND TO_DATE(?, '") .append(ORACLE_DATE_FORMAT) .append("')"); - // 构建排序子句 orderByBuilder.append(" ORDER BY ").append(timeField); if (jezd != null && !jezd.isEmpty()) { @@ -254,154 +285,126 @@ public class ProductionDataSyncServiceImpl implements DataSyncService { orderByBuilder.append(" DESC"); params.add(startDate); params.add(endDate); + } else { + // 既没有分区字段也没有时间字段,查询全表 + LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!"); + conditionBuilder.append("1=1"); } - } else if (timeField != null && !timeField.isEmpty()) { - // 没有分区字段,但时间字段存在,使用时间范围条件 - // 仅时间范围条件(使用占位符) - conditionBuilder.append("TO_DATE(") - .append(timeField) - .append(", '") - .append(ORACLE_DATE_FORMAT) - .append("') BETWEEN TO_DATE(?, '") - .append(ORACLE_DATE_FORMAT) - .append("') AND TO_DATE(?, '") - .append(ORACLE_DATE_FORMAT) - .append("')"); - // 构建排序子句 - orderByBuilder.append(" ORDER BY ").append(timeField); - if (jezd != null && !jezd.isEmpty()) { - orderByBuilder.append(", ").append(jezd); - } - orderByBuilder.append(" DESC"); - params.add(startDate); - params.add(endDate); + + // 分页查询数据 + do { + // 使用Oracle分页语法 (12c+) + String querySql = "SELECT * FROM " + tableName + + " WHERE " + conditionBuilder.toString() + + orderByBuilder.toString(); // 添加排序子句 + + LOGGER.debug("执行Oracle查询: {}", querySql); + + List pageData; + // 根据条件类型执行查询 + if (partitionField != null && !partitionField.isEmpty() && + timeField != null && !timeField.isEmpty()) { + // 分区+时间范围查询 + pageData = rdsapi.getMaps(querySql, startDate, endDate); + } else if (timeField != null && !timeField.isEmpty()) { + // 仅时间范围查询 + pageData = rdsapi.getMaps(querySql, startDate, endDate); + } else { + // 无时间范围查询(仅分区或全表) + pageData = rdsapi.getMaps(querySql); + } + + if (pageData != null && !pageData.isEmpty()) { + // 直接处理当前页数据 + int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable); + totalRows += pageData.size(); + totalSuccess += successCount; + hasMore = pageData.size() == PAGE_SIZE; + pageNo++; + } else { + hasMore = false; + } + } while (hasMore); } else { - // 既没有分区字段也没有时间字段,查询全表 - LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!"); - conditionBuilder.append("1=1"); - } + // 构建查询条件 + StringBuilder conditionBuilder = new StringBuilder(); + // 修改点:分区字段和时间字段组合查询条件 + if (partitionField != null && !partitionField.isEmpty()) { + // 1. 查询最大分区值 + String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName; + List maxPartitionResult = rdsapi.getMaps(maxPartitionSql); - // 分页查询数据 - do { - // 使用Oracle分页语法 (12c+) - String querySql = "SELECT * FROM ( " + - "SELECT t.*, ROWNUM rn FROM (" + - "SELECT * FROM " + tableName + - " WHERE " + conditionBuilder.toString() + - orderByBuilder.toString() + // 添加排序子句 - ") t WHERE ROWNUM <= " + (pageNo * PAGE_SIZE) + - ") WHERE rn > " + ((pageNo - 1) * PAGE_SIZE); + if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) { + LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField); + return; + } - LOGGER.debug("执行Oracle查询: {}", querySql); + String maxPartition = maxPartitionResult.get(0).getString("max_partition"); + LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition); - List pageData; - // 根据条件类型执行查询 - if (partitionField != null && !partitionField.isEmpty() && - timeField != null && !timeField.isEmpty()) { - // 分区+时间范围查询 - pageData = rdsapi.getMaps(querySql, startDate, endDate); + // 添加分区条件 + conditionBuilder.append(partitionField) + .append(" = '") + .append(maxPartition) + .append("'"); + + // 如果时间字段存在,添加时间范围条件 + if (timeField != null && !timeField.isEmpty()) { + conditionBuilder.append(" AND ") + .append(timeField) + .append(" BETWEEN ? AND ? ORDER BY " + timeField + ""); + if (jezd != null && !jezd.isEmpty()) { + conditionBuilder.append(", " + jezd + " "); + } + conditionBuilder.append(" DESC"); + } } else if (timeField != null && !timeField.isEmpty()) { - // 仅时间范围查询 - pageData = rdsapi.getMaps(querySql, startDate, endDate); - } else { - // 无时间范围查询(仅分区或全表) - pageData = rdsapi.getMaps(querySql); - } - - if (pageData != null && !pageData.isEmpty()) { - // 直接处理当前页数据 - int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable); - totalRows += pageData.size(); - totalSuccess += successCount; - hasMore = pageData.size() == PAGE_SIZE; - pageNo++; - } else { - hasMore = false; - } - } while (hasMore); - }else { - // 构建查询条件 - StringBuilder conditionBuilder = new StringBuilder(); - // 修改点:分区字段和时间字段组合查询条件 - if (partitionField != null && !partitionField.isEmpty()) { - // 1. 查询最大分区值 - String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName; - List maxPartitionResult = rdsapi.getMaps(maxPartitionSql); - - if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) { - LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField); - return; - } - - String maxPartition = maxPartitionResult.get(0).getString("max_partition"); - LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition); - - // 添加分区条件 - conditionBuilder.append(partitionField) - .append(" = '") - .append(maxPartition) - .append("'"); - - // 如果时间字段存在,添加时间范围条件 - if (timeField != null && !timeField.isEmpty()) { - conditionBuilder.append(" AND ") - .append(timeField) - .append(" BETWEEN ? AND ? ORDER BY "+timeField+""); - if (jezd!=null && !jezd.isEmpty()){ - conditionBuilder.append(", "+jezd+" "); + // 没有分区字段,但时间字段存在,使用时间范围条件 + conditionBuilder.append(timeField) + .append(" BETWEEN ? AND ? ORDER BY " + timeField + ""); + if (jezd != null && !jezd.isEmpty()) { + conditionBuilder.append(", " + jezd + " "); } conditionBuilder.append(" DESC"); + } else { + // 既没有分区字段也没有时间字段,查询全表(实际应避免这种情况) + LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!"); + conditionBuilder.append("1=1"); } - } else if (timeField != null && !timeField.isEmpty()) { - // 没有分区字段,但时间字段存在,使用时间范围条件 - conditionBuilder.append(timeField) - .append(" BETWEEN ? AND ? ORDER BY "+timeField+""); - if (jezd!=null && !jezd.isEmpty()){ - conditionBuilder.append(", "+jezd+" "); - } - conditionBuilder.append(" DESC"); - } else { - // 既没有分区字段也没有时间字段,查询全表(实际应避免这种情况) - LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!"); - conditionBuilder.append("1=1"); + // 分页查询数据 + do { + String querySql = "SELECT * FROM " + tableName + + " WHERE " + conditionBuilder.toString(); + + LOGGER.debug("执行查询querySql: {}", querySql); + + List pageData; + // 根据条件类型执行查询 + if (partitionField != null && !partitionField.isEmpty() && + timeField != null && !timeField.isEmpty()) { + // 分区+时间范围查询 + pageData = rdsapi.getMaps(querySql, startDate, endDate); + } else if (timeField != null && !timeField.isEmpty()) { + // 仅时间范围查询 + pageData = rdsapi.getMaps(querySql, startDate, endDate); + } else { + // 无时间范围查询(仅分区或全表) + pageData = rdsapi.getMaps(querySql); + } + + if (pageData != null && !pageData.isEmpty()) { + // 直接处理当前页数据 + int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable); + totalRows += pageData.size(); + totalSuccess += successCount; + hasMore = pageData.size() == PAGE_SIZE; + pageNo++; + } else { + hasMore = false; + } + } while (hasMore); } - // 分页查询数据 - do { - String querySqls = "SELECT * FROM " + tableName + - " WHERE " + conditionBuilder.toString() +" "; -// " LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE; - LOGGER.debug("执行查询querySqls: {}", querySqls); - String querySql = SQLPagination.getPaginitionSQL(querySqls, (pageNo - 1) * PAGE_SIZE, PAGE_SIZE,DBname); - - LOGGER.debug("执行查询querySql: {}", querySql); - - List pageData; - // 根据条件类型执行查询 - if (partitionField != null && !partitionField.isEmpty() && - timeField != null && !timeField.isEmpty()) { - // 分区+时间范围查询 - pageData = rdsapi.getMaps(querySql, startDate, endDate); - } else if (timeField != null && !timeField.isEmpty()) { - // 仅时间范围查询 - pageData = rdsapi.getMaps(querySql, startDate, endDate); - } else { - // 无时间范围查询(仅分区或全表) - pageData = rdsapi.getMaps(querySql); - } - - if (pageData != null && !pageData.isEmpty()) { - // 直接处理当前页数据 - int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable); - totalRows += pageData.size(); - totalSuccess += successCount; - hasMore = pageData.size() == PAGE_SIZE; - pageNo++; - } else { - hasMore = false; - } - } while (hasMore); } - LOGGER.info("从表[{}]共查询到{}条数据,成功同步{}条数据", tableName, totalRows, totalSuccess); } catch (Exception e) { @@ -457,6 +460,56 @@ public class ProductionDataSyncServiceImpl implements DataSyncService { return successCount; } + /** + * 将时间范围拆分为多个区间,并确保结束时间包含时间部分以覆盖完整日期 + * @param startDate 开始时间(yyyy-MM-dd格式,时间部分为00:00:00) + * @param endDate 结束时间(yyyy-MM-dd格式,时间部分为00:00:00) + * @param daysInterval 间隔天数 + * @return 时间区间列表,每个区间的结束时间调整为23:59:59以确保覆盖完整日期 + */ + private static List splitTimeRange(Date startDate, Date endDate, int daysInterval) { + List ranges = new ArrayList<>(); + Calendar calendar = Calendar.getInstance(); + calendar.setTime(startDate); + + // 调整结束时间以包含完整日期 + Calendar endCal = Calendar.getInstance(); + endCal.setTime(endDate); + endCal.set(Calendar.HOUR_OF_DAY, 23); + endCal.set(Calendar.MINUTE, 59); + endCal.set(Calendar.SECOND, 59); + endCal.set(Calendar.MILLISECOND, 999); + Date adjustedEndDate = endCal.getTime(); + + while (calendar.getTime().before(adjustedEndDate)) { + Date rangeStart = calendar.getTime(); + + // 增加间隔天数 + calendar.add(Calendar.DAY_OF_MONTH, daysInterval); + Date potentialRangeEnd = calendar.getTime(); + + // 调整区间结束时间为当前日期的23:59:59 + Calendar rangeEndCal = Calendar.getInstance(); + rangeEndCal.setTime(potentialRangeEnd); + rangeEndCal.set(Calendar.HOUR_OF_DAY, 23); + rangeEndCal.set(Calendar.MINUTE, 59); + rangeEndCal.set(Calendar.SECOND, 59); + rangeEndCal.set(Calendar.MILLISECOND, 999); + Date rangeEnd = rangeEndCal.getTime(); + + // 如果调整后的区间结束时间超过最终结束时间,使用调整后的结束时间 + if (rangeEnd.after(adjustedEndDate)) { + rangeEnd = adjustedEndDate; + } + + ranges.add(new Date[]{rangeStart, rangeEnd}); + + // 准备下一个区间的开始时间:当前区间结束时间的下一天00:00:00 + calendar.add(Calendar.MILLISECOND, 1); + } + return ranges; + } + /** * 字段映射转换 * @param source 源数据记录 diff --git a/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/src/main/java/com/awspaas/user/apps/bnbm/datalinkup/service/impl/PurchaseDataSyncServiceImpl.java b/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/src/main/java/com/awspaas/user/apps/bnbm/datalinkup/service/impl/PurchaseDataSyncServiceImpl.java index f89fd8a..9aacf52 100644 --- a/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/src/main/java/com/awspaas/user/apps/bnbm/datalinkup/service/impl/PurchaseDataSyncServiceImpl.java +++ b/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/src/main/java/com/awspaas/user/apps/bnbm/datalinkup/service/impl/PurchaseDataSyncServiceImpl.java @@ -233,42 +233,73 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService { boolean hasMore; RDSAPI rdsapi = null; SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - String startDate = simpleDateFormat.format(startDated); - String endDate = simpleDateFormat.format(endDated); try { rdsapi = SDK.getCCAPI().getRDSAPI(ccId); DBUtils.SUPPLY supply = rdsapi.getSupply(); String DBname = supply.getName(); LOGGER.info("数据库为:{}",DBname); - if ("ORACLE".equalsIgnoreCase(DBname)){ - // 构建查询条件 - StringBuilder conditionBuilder = new StringBuilder(); - StringBuilder orderByBuilder = new StringBuilder(); // 用于构建排序子句 - List params = new ArrayList<>(); // 存储查询参数 - // 分区字段和时间字段组合查询条件 - if (partitionField != null && !partitionField.isEmpty()) { - // 1. 查询最大分区值 - String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName; - List maxPartitionResult = rdsapi.getMaps(maxPartitionSql); + // 计算时间范围并拆分为30天一组 + List timeRanges = splitTimeRange(startDated, endDated, 30); + LOGGER.info("时间范围拆分为 {} 个查询区间", timeRanges.size()); - if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) { - LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField); - return; - } + for (int i = 0; i < timeRanges.size(); i++) { + Date[] range = timeRanges.get(i); + String startDate = simpleDateFormat.format(range[0]); + String endDate = simpleDateFormat.format(range[1]); - String maxPartition = maxPartitionResult.get(0).getString("max_partition"); - LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition); + LOGGER.info("正在处理第 {} 个时间区间: {} 至 {}", i + 1, startDate, endDate); + if ("ORACLE".equalsIgnoreCase(DBname)) { + // 构建查询条件 + StringBuilder conditionBuilder = new StringBuilder(); + StringBuilder orderByBuilder = new StringBuilder(); // 用于构建排序子句 + List params = new ArrayList<>(); // 存储查询参数 - // 添加分区条件 - conditionBuilder.append(partitionField) - .append(" = '") - .append(maxPartition) - .append("'"); + // 分区字段和时间字段组合查询条件 + if (partitionField != null && !partitionField.isEmpty()) { + // 1. 查询最大分区值 + String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName; + List maxPartitionResult = rdsapi.getMaps(maxPartitionSql); - // 如果时间字段存在,添加时间范围条件 - if (timeField != null && !timeField.isEmpty()) { - conditionBuilder.append(" AND TO_DATE(") + if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) { + LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField); + return; + } + + String maxPartition = maxPartitionResult.get(0).getString("max_partition"); + LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition); + + // 添加分区条件 + conditionBuilder.append(partitionField) + .append(" = '") + .append(maxPartition) + .append("'"); + + // 如果时间字段存在,添加时间范围条件 + if (timeField != null && !timeField.isEmpty()) { + conditionBuilder.append(" AND TO_DATE(") + .append(timeField) + .append(", '") + .append(ORACLE_DATE_FORMAT) + .append("') BETWEEN TO_DATE(?, '") + .append(ORACLE_DATE_FORMAT) + .append("') AND TO_DATE(?, '") + .append(ORACLE_DATE_FORMAT) + .append("')"); + + // 构建排序子句 + orderByBuilder.append(" ORDER BY ").append(timeField); + if (jezd != null && !jezd.isEmpty()) { + orderByBuilder.append(", ").append(jezd); + } + orderByBuilder.append(" DESC"); + params.add(startDate); + params.add(endDate); + } + } else if (timeField != null && !timeField.isEmpty()) { + // 没有分区字段,但时间字段存在,使用时间范围条件 + // 仅时间范围条件(使用占位符) + conditionBuilder.append("TO_DATE(") .append(timeField) .append(", '") .append(ORACLE_DATE_FORMAT) @@ -277,7 +308,6 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService { .append("') AND TO_DATE(?, '") .append(ORACLE_DATE_FORMAT) .append("')"); - // 构建排序子句 orderByBuilder.append(" ORDER BY ").append(timeField); if (jezd != null && !jezd.isEmpty()) { @@ -286,163 +316,183 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService { orderByBuilder.append(" DESC"); params.add(startDate); params.add(endDate); + } else { + // 既没有分区字段也没有时间字段,查询全表 + LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!"); + conditionBuilder.append("1=1"); } - } else if (timeField != null && !timeField.isEmpty()) { - // 没有分区字段,但时间字段存在,使用时间范围条件 - // 仅时间范围条件(使用占位符) - conditionBuilder.append("TO_DATE(") - .append(timeField) - .append(", '") - .append(ORACLE_DATE_FORMAT) - .append("') BETWEEN TO_DATE(?, '") - .append(ORACLE_DATE_FORMAT) - .append("') AND TO_DATE(?, '") - .append(ORACLE_DATE_FORMAT) - .append("')"); - // 构建排序子句 - orderByBuilder.append(" ORDER BY ").append(timeField); - if (jezd != null && !jezd.isEmpty()) { - orderByBuilder.append(", ").append(jezd); - } - orderByBuilder.append(" DESC"); - params.add(startDate); - params.add(endDate); + + // 分页查询数据 + do { + String querySql = "SELECT * FROM " + tableName + + " WHERE " + conditionBuilder.toString() + + orderByBuilder.toString(); // 添加排序子句 + + LOGGER.debug("执行Oracle查询: {}", querySql); + + List pageData; + // 根据条件类型执行查询 + if (partitionField != null && !partitionField.isEmpty() && + timeField != null && !timeField.isEmpty()) { + // 分区+时间范围查询 + pageData = rdsapi.getMaps(querySql, startDate, endDate); + } else if (timeField != null && !timeField.isEmpty()) { + // 仅时间范围查询 + pageData = rdsapi.getMaps(querySql, startDate, endDate); + } else { + // 无时间范围查询(仅分区或全表) + pageData = rdsapi.getMaps(querySql); + } + + if (pageData != null && !pageData.isEmpty()) { + // 直接处理当前页数据 + int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable); + totalRows += pageData.size(); + totalSuccess += successCount; + hasMore = pageData.size() == PAGE_SIZE; + pageNo++; + } else { + hasMore = false; + } + } while (hasMore); } else { - // 既没有分区字段也没有时间字段,查询全表 - LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!"); - conditionBuilder.append("1=1"); - } + // 构建查询条件 + StringBuilder conditionBuilder = new StringBuilder(); + // 修改点:分区字段和时间字段组合查询条件 + if (partitionField != null && !partitionField.isEmpty()) { + // 1. 查询最大分区值 + String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName; + List maxPartitionResult = rdsapi.getMaps(maxPartitionSql); - // 分页查询数据 - do { - // 使用Oracle分页语法 (12c+) - String querySql = "SELECT * FROM ( " + - "SELECT t.*, ROWNUM rn FROM (" + - "SELECT * FROM " + tableName + - " WHERE " + conditionBuilder.toString() + - orderByBuilder.toString() + // 添加排序子句 - ") t WHERE ROWNUM <= " + (pageNo * PAGE_SIZE) + - ") WHERE rn > " + ((pageNo - 1) * PAGE_SIZE); + if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) { + LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField); + return; + } - LOGGER.debug("执行Oracle查询: {}", querySql); + String maxPartition = maxPartitionResult.get(0).getString("max_partition"); + LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition); - List pageData; - // 根据条件类型执行查询 - if (partitionField != null && !partitionField.isEmpty() && - timeField != null && !timeField.isEmpty()) { - // 分区+时间范围查询 - pageData = rdsapi.getMaps(querySql, startDate, endDate); + // 添加分区条件 + conditionBuilder.append(partitionField) + .append(" = '") + .append(maxPartition) + .append("'"); + + // 如果时间字段存在,添加时间范围条件 + if (timeField != null && !timeField.isEmpty()) { + conditionBuilder.append(" AND ") + .append(timeField) + .append(" BETWEEN ? AND ? ORDER BY " + timeField + ""); + if (jezd != null && !jezd.isEmpty()) { + conditionBuilder.append(", " + jezd + " "); + } + conditionBuilder.append(" DESC"); + } } else if (timeField != null && !timeField.isEmpty()) { - // 仅时间范围查询 - pageData = rdsapi.getMaps(querySql, startDate, endDate); - } else { - // 无时间范围查询(仅分区或全表) - pageData = rdsapi.getMaps(querySql); - } - - if (pageData != null && !pageData.isEmpty()) { - // 直接处理当前页数据 - int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable); - totalRows += pageData.size(); - totalSuccess += successCount; - hasMore = pageData.size() == PAGE_SIZE; - pageNo++; - } else { - hasMore = false; - } - } while (hasMore); - }else { - // 构建查询条件 - StringBuilder conditionBuilder = new StringBuilder(); - // 修改点:分区字段和时间字段组合查询条件 - if (partitionField != null && !partitionField.isEmpty()) { - // 1. 查询最大分区值 - String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName; - List maxPartitionResult = rdsapi.getMaps(maxPartitionSql); - - if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) { - LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField); - return; - } - - String maxPartition = maxPartitionResult.get(0).getString("max_partition"); - LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition); - - // 添加分区条件 - conditionBuilder.append(partitionField) - .append(" = '") - .append(maxPartition) - .append("'"); - - // 如果时间字段存在,添加时间范围条件 - if (timeField != null && !timeField.isEmpty()) { - conditionBuilder.append(" AND ") - .append(timeField) - .append(" BETWEEN ? AND ? ORDER BY "+timeField+""); - if (jezd!=null && !jezd.isEmpty()){ - conditionBuilder.append(", "+jezd+" "); + // 没有分区字段,但时间字段存在,使用时间范围条件 + conditionBuilder.append(timeField) + .append(" BETWEEN ? AND ? ORDER BY " + timeField + ""); + if (jezd != null && !jezd.isEmpty()) { + conditionBuilder.append(", " + jezd + " "); } conditionBuilder.append(" DESC"); + } else { + // 既没有分区字段也没有时间字段,查询全表(实际应避免这种情况) + LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!"); + conditionBuilder.append("1=1"); } - } else if (timeField != null && !timeField.isEmpty()) { - // 没有分区字段,但时间字段存在,使用时间范围条件 - conditionBuilder.append(timeField) - .append(" BETWEEN ? AND ? ORDER BY "+timeField+""); - if (jezd!=null && !jezd.isEmpty()){ - conditionBuilder.append(", "+jezd+" "); - } - conditionBuilder.append(" DESC"); - } else { - // 既没有分区字段也没有时间字段,查询全表(实际应避免这种情况) - LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!"); - conditionBuilder.append("1=1"); + // 分页查询数据 + do { + String querySql = "SELECT * FROM " + tableName + + " WHERE " + conditionBuilder.toString(); + + LOGGER.debug("执行查询querySql: {}", querySql); + + List pageData; + // 根据条件类型执行查询 + if (partitionField != null && !partitionField.isEmpty() && + timeField != null && !timeField.isEmpty()) { + // 分区+时间范围查询 + pageData = rdsapi.getMaps(querySql, startDate, endDate); + } else if (timeField != null && !timeField.isEmpty()) { + // 仅时间范围查询 + pageData = rdsapi.getMaps(querySql, startDate, endDate); + } else { + // 无时间范围查询(仅分区或全表) + pageData = rdsapi.getMaps(querySql); + } + + if (pageData != null && !pageData.isEmpty()) { + // 直接处理当前页数据 + int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable); + totalRows += pageData.size(); + totalSuccess += successCount; + hasMore = pageData.size() == PAGE_SIZE; + pageNo++; + } else { + hasMore = false; + } + } while (hasMore); } - // 分页查询数据 - do { - String querySqls = "SELECT * FROM " + tableName + - " WHERE " + conditionBuilder.toString() +" "; -// " LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE; - LOGGER.debug("执行查询querySqls: {}", querySqls); - String querySql = SQLPagination.getPaginitionSQL(querySqls, (pageNo - 1) * PAGE_SIZE, PAGE_SIZE,DBname); - - LOGGER.debug("执行查询querySql: {}", querySql); - - List pageData; - // 根据条件类型执行查询 - if (partitionField != null && !partitionField.isEmpty() && - timeField != null && !timeField.isEmpty()) { - // 分区+时间范围查询 - pageData = rdsapi.getMaps(querySql, startDate, endDate); - } else if (timeField != null && !timeField.isEmpty()) { - // 仅时间范围查询 - pageData = rdsapi.getMaps(querySql, startDate, endDate); - } else { - // 无时间范围查询(仅分区或全表) - pageData = rdsapi.getMaps(querySql); - } - - if (pageData != null && !pageData.isEmpty()) { - // 直接处理当前页数据 - int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable); - totalRows += pageData.size(); - totalSuccess += successCount; - hasMore = pageData.size() == PAGE_SIZE; - pageNo++; - } else { - hasMore = false; - } - } while (hasMore); } LOGGER.info("从表[{}]共查询到{}条数据,成功同步{}条数据", tableName, totalRows, totalSuccess); } catch (Exception e) { throw new RuntimeException("查询源表[" + tableName + "]数据失败: " + e.getMessage(), e); - }finally { - } } + /** + * 将时间范围拆分为多个区间,并确保结束时间包含时间部分以覆盖完整日期 + * @param startDate 开始时间(yyyy-MM-dd格式,时间部分为00:00:00) + * @param endDate 结束时间(yyyy-MM-dd格式,时间部分为00:00:00) + * @param daysInterval 间隔天数 + * @return 时间区间列表,每个区间的结束时间调整为23:59:59以确保覆盖完整日期 + */ + private static List splitTimeRange(Date startDate, Date endDate, int daysInterval) { + List ranges = new ArrayList<>(); + Calendar calendar = Calendar.getInstance(); + calendar.setTime(startDate); + + // 调整结束时间以包含完整日期 + Calendar endCal = Calendar.getInstance(); + endCal.setTime(endDate); + endCal.set(Calendar.HOUR_OF_DAY, 23); + endCal.set(Calendar.MINUTE, 59); + endCal.set(Calendar.SECOND, 59); + endCal.set(Calendar.MILLISECOND, 999); + Date adjustedEndDate = endCal.getTime(); + + while (calendar.getTime().before(adjustedEndDate)) { + Date rangeStart = calendar.getTime(); + + // 增加间隔天数 + calendar.add(Calendar.DAY_OF_MONTH, daysInterval); + Date potentialRangeEnd = calendar.getTime(); + + // 调整区间结束时间为当前日期的23:59:59 + Calendar rangeEndCal = Calendar.getInstance(); + rangeEndCal.setTime(potentialRangeEnd); + rangeEndCal.set(Calendar.HOUR_OF_DAY, 23); + rangeEndCal.set(Calendar.MINUTE, 59); + rangeEndCal.set(Calendar.SECOND, 59); + rangeEndCal.set(Calendar.MILLISECOND, 999); + Date rangeEnd = rangeEndCal.getTime(); + + // 如果调整后的区间结束时间超过最终结束时间,使用调整后的结束时间 + if (rangeEnd.after(adjustedEndDate)) { + rangeEnd = adjustedEndDate; + } + + ranges.add(new Date[]{rangeStart, rangeEnd}); + + // 准备下一个区间的开始时间:当前区间结束时间的下一天00:00:00 + calendar.add(Calendar.MILLISECOND, 1); + } + return ranges; + } + /** * 处理并插入数据到目标表 * @param sourceData 源数据列表 @@ -755,61 +805,51 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService { bo.set(key, map.get(key)); } } -// LOGGER.info("明细汇总:字段值set完毕-01"); - if (bo!=null) { - // 如果是采购_入库单汇总 刷新物料名称 - if (hzb.equals("BO_EU_DWD_ORDER_RKD_HZ")) { - String bkgs = bo.getString("BKGS"); - String wlmc = bo.getString("WLMC"); - String wlbm = bo.getString("WLBM"); - String jldw = bo.getString("JLDW"); - String wlfl = bo.getString("WLFL"); - Double djhyfs = bo.get("DJHYF",Double.class); - double djhyf = djhyfs != null ? djhyfs : 0.0; -// LOGGER.info("采购_入库单汇总,刷新物料名称------物料名称:{},板块公司:{},物料编码:{},入库单位:{},单价:{},物料分类:{}", wlmc, bkgs, wlbm, jldw, djhyf, wlfl); + // 如果是采购_入库单汇总 刷新物料名称 + if (hzb.equals("BO_EU_DWD_ORDER_RKD_HZ")) { + String bkgs = bo.getString("BKGS"); + String wlmc = bo.getString("WLMC"); + String wlbm = bo.getString("WLBM"); + String jldw = bo.getString("JLDW"); + String wlfl = bo.getString("WLFL"); + Double djhyfs = bo.get("DJHYF",Double.class); + double djhyf = djhyfs != null ? djhyfs : 0.0; - String newWlmc = ""; - if (StringUtils.isNotBlank(wlmc) || StringUtils.isNotBlank(wlbm) - || StringUtils.isNotBlank(jldw) - || StringUtils.isNotBlank(wlfl)) { - try { - newWlmc = purchaseUtil.materialClassificationFiltering(bkgs, wlmc, wlbm, jldw, djhyf, wlfl); - } catch (Exception e) { - e.printStackTrace(); - } - if (bkgs.equals("北新嘉宝莉")) { - if (wlmc.contains("盖") && djhyf < 5.0) { - continue; - } else if (wlmc.contains("桶身") && gaiSum > 0) { - Double djhyf1 = bo.get("DJHYF", Double.class); - bo.set("DJHYF", djhyf1 + gaiAverage); - gaiSum--; - } - } - bo.set("WLMC", newWlmc); - bo.set("OLDWLMC", wlmc); -// LOGGER.info("明细汇总:物料分类更新完毕-02"); - } else { -// LOGGER.info("明细汇总:物料名称为空不进行汇总-03"); + String newWlmc = ""; + if (StringUtils.isNotBlank(wlmc) || StringUtils.isNotBlank(wlbm) + || StringUtils.isNotBlank(jldw) + || StringUtils.isNotBlank(wlfl)) { + try { + newWlmc = purchaseUtil.materialClassificationFiltering(bkgs, wlmc, wlbm, jldw, djhyf, wlfl); + } catch (Exception e) { + e.printStackTrace(); } -// LOGGER.info("采购_入库单汇总,刷新物料名称------物料名称:{},板块公司:{},物料编码:{},入库单位:{},单价:{},物料分类:{}", wlmc, bkgs, wlbm, jldw, djhyf, wlfl); - - if ("泰山石膏".equals(bkgs)) { - // 泰山石膏处理入库单金额 入库数量*含税单价 - Double rksl = bo.get("RKSL", Double.class);// 入库数量 - Double hsdjhyf = bo.get("HSDJHYF", Double.class);// 含税单价(含运费) - // 处理可能为null的值,默认设为0.0 - double safeRksl = rksl != null ? rksl : 0.0; - double safeHsdjhyf = hsdjhyf != null ? hsdjhyf : 0.0; - - BigDecimal multiply = BigDecimal.valueOf(safeRksl).multiply(BigDecimal.valueOf(safeHsdjhyf)); - bo.set("DHJE", multiply.doubleValue()); + if (bkgs.equals("北新嘉宝莉")) { + if (wlmc.contains("盖") && djhyf < 5.0) { + continue; + } else if (wlmc.contains("桶身") && gaiSum > 0) { + Double djhyf1 = bo.get("DJHYF", Double.class); + bo.set("DJHYF", djhyf1 + gaiAverage); + gaiSum--; + } } + bo.set("WLMC", newWlmc); + bo.set("OLDWLMC", wlmc); } +// LOGGER.info("采购_入库单汇总,刷新物料名称------物料名称:{},板块公司:{},物料编码:{},入库单位:{},单价:{},物料分类:{}", wlmc, bkgs, wlbm, jldw, djhyf, wlfl); + if ("泰山石膏".equals(bkgs)) { + // 泰山石膏处理入库单金额 入库数量*含税单价 + Double rksl = bo.get("RKSL", Double.class);// 入库数量 + Double hsdjhyf = bo.get("HSDJHYF", Double.class);// 含税单价(含运费) + // 处理可能为null的值,默认设为0.0 + double safeRksl = rksl != null ? rksl : 0.0; + double safeHsdjhyf = hsdjhyf != null ? hsdjhyf : 0.0; - bos.add(bo); -// LOGGER.info("明细汇总:bo add完毕-04"); + BigDecimal multiply = BigDecimal.valueOf(safeRksl).multiply(BigDecimal.valueOf(safeHsdjhyf)); + bo.set("DHJE", multiply.doubleValue()); + } } + bos.add(bo); } SDK.getBOAPI().createDataBO(hzb, bos, UserContext.fromUID("admin")); diff --git a/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/src/main/java/com/awspaas/user/apps/bnbm/datalinkup/service/impl/SaleDataSyncServiceImpl.java b/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/src/main/java/com/awspaas/user/apps/bnbm/datalinkup/service/impl/SaleDataSyncServiceImpl.java index 0123f0f..2a0f14c 100644 --- a/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/src/main/java/com/awspaas/user/apps/bnbm/datalinkup/service/impl/SaleDataSyncServiceImpl.java +++ b/com.awspaas.user.apps.bnbm.datalinkup/com.awspaas.user.apps.bnbm.datalinkup/src/main/java/com/awspaas/user/apps/bnbm/datalinkup/service/impl/SaleDataSyncServiceImpl.java @@ -178,42 +178,73 @@ public class SaleDataSyncServiceImpl implements DataSyncService { boolean hasMore; RDSAPI rdsapi = null; SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - String startDate = simpleDateFormat.format(startDated); - String endDate = simpleDateFormat.format(endDated); try { rdsapi = SDK.getCCAPI().getRDSAPI(ccId); DBUtils.SUPPLY supply = rdsapi.getSupply(); String DBname = supply.getName(); LOGGER.info("数据库为:{}",DBname); - if ("ORACLE".equalsIgnoreCase(DBname)){ - // 构建查询条件 - StringBuilder conditionBuilder = new StringBuilder(); - StringBuilder orderByBuilder = new StringBuilder(); // 用于构建排序子句 - List params = new ArrayList<>(); // 存储查询参数 - // 分区字段和时间字段组合查询条件 - if (partitionField != null && !partitionField.isEmpty()) { - // 1. 查询最大分区值 - String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName; - List maxPartitionResult = rdsapi.getMaps(maxPartitionSql); + // 计算时间范围并拆分为30天一组 + List timeRanges = splitTimeRange(startDated, endDated, 30); + LOGGER.info("时间范围拆分为 {} 个查询区间", timeRanges.size()); - if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) { - LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField); - return; - } + for (int i = 0; i < timeRanges.size(); i++) { + Date[] range = timeRanges.get(i); + String startDate = simpleDateFormat.format(range[0]); + String endDate = simpleDateFormat.format(range[1]); - String maxPartition = maxPartitionResult.get(0).getString("max_partition"); - LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition); + LOGGER.info("正在处理第 {} 个时间区间: {} 至 {}", i + 1, startDate, endDate); + if ("ORACLE".equalsIgnoreCase(DBname)) { + // 构建查询条件 + StringBuilder conditionBuilder = new StringBuilder(); + StringBuilder orderByBuilder = new StringBuilder(); // 用于构建排序子句 + List params = new ArrayList<>(); // 存储查询参数 - // 添加分区条件 - conditionBuilder.append(partitionField) - .append(" = '") - .append(maxPartition) - .append("'"); + // 分区字段和时间字段组合查询条件 + if (partitionField != null && !partitionField.isEmpty()) { + // 1. 查询最大分区值 + String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName; + List maxPartitionResult = rdsapi.getMaps(maxPartitionSql); - // 如果时间字段存在,添加时间范围条件 - if (timeField != null && !timeField.isEmpty()) { - conditionBuilder.append(" AND TO_DATE(") + if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) { + LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField); + return; + } + + String maxPartition = maxPartitionResult.get(0).getString("max_partition"); + LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition); + + // 添加分区条件 + conditionBuilder.append(partitionField) + .append(" = '") + .append(maxPartition) + .append("'"); + + // 如果时间字段存在,添加时间范围条件 + if (timeField != null && !timeField.isEmpty()) { + conditionBuilder.append(" AND TO_DATE(") + .append(timeField) + .append(", '") + .append(ORACLE_DATE_FORMAT) + .append("') BETWEEN TO_DATE(?, '") + .append(ORACLE_DATE_FORMAT) + .append("') AND TO_DATE(?, '") + .append(ORACLE_DATE_FORMAT) + .append("')"); + + // 构建排序子句 + orderByBuilder.append(" ORDER BY ").append(timeField); + if (jezd != null && !jezd.isEmpty()) { + orderByBuilder.append(", ").append(jezd); + } + orderByBuilder.append(" DESC"); + params.add(startDate); + params.add(endDate); + } + } else if (timeField != null && !timeField.isEmpty()) { + // 没有分区字段,但时间字段存在,使用时间范围条件 + // 仅时间范围条件(使用占位符) + conditionBuilder.append("TO_DATE(") .append(timeField) .append(", '") .append(ORACLE_DATE_FORMAT) @@ -222,7 +253,6 @@ public class SaleDataSyncServiceImpl implements DataSyncService { .append("') AND TO_DATE(?, '") .append(ORACLE_DATE_FORMAT) .append("')"); - // 构建排序子句 orderByBuilder.append(" ORDER BY ").append(timeField); if (jezd != null && !jezd.isEmpty()) { @@ -231,160 +261,131 @@ public class SaleDataSyncServiceImpl implements DataSyncService { orderByBuilder.append(" DESC"); params.add(startDate); params.add(endDate); + } else { + // 既没有分区字段也没有时间字段,查询全表 + LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!"); + conditionBuilder.append("1=1"); } - } else if (timeField != null && !timeField.isEmpty()) { - // 没有分区字段,但时间字段存在,使用时间范围条件 - // 仅时间范围条件(使用占位符) - conditionBuilder.append("TO_DATE(") - .append(timeField) - .append(", '") - .append(ORACLE_DATE_FORMAT) - .append("') BETWEEN TO_DATE(?, '") - .append(ORACLE_DATE_FORMAT) - .append("') AND TO_DATE(?, '") - .append(ORACLE_DATE_FORMAT) - .append("')"); - // 构建排序子句 - orderByBuilder.append(" ORDER BY ").append(timeField); - if (jezd != null && !jezd.isEmpty()) { - orderByBuilder.append(", ").append(jezd); - } - orderByBuilder.append(" DESC"); - params.add(startDate); - params.add(endDate); + + // 分页查询数据 + do { + // 使用Oracle分页语法 (12c+) + String querySql = "SELECT * FROM " + tableName + + " WHERE " + conditionBuilder.toString() + + orderByBuilder.toString(); // 添加排序子句 + + LOGGER.debug("执行Oracle查询: {}", querySql); + + List pageData; + // 根据条件类型执行查询 + if (partitionField != null && !partitionField.isEmpty() && + timeField != null && !timeField.isEmpty()) { + // 分区+时间范围查询 + pageData = rdsapi.getMaps(querySql, startDate, endDate); + } else if (timeField != null && !timeField.isEmpty()) { + // 仅时间范围查询 + pageData = rdsapi.getMaps(querySql, startDate, endDate); + } else { + // 无时间范围查询(仅分区或全表) + pageData = rdsapi.getMaps(querySql); + } + + if (pageData != null && !pageData.isEmpty()) { + // 直接处理当前页数据 + int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable); + totalRows += pageData.size(); + totalSuccess += successCount; + hasMore = pageData.size() == PAGE_SIZE; + pageNo++; + } else { + hasMore = false; + } + } while (hasMore); } else { - // 既没有分区字段也没有时间字段,查询全表 - LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!"); - conditionBuilder.append("1=1"); - } + // 构建查询条件 + StringBuilder conditionBuilder = new StringBuilder(); + // 修改点:分区字段和时间字段组合查询条件 + if (partitionField != null && !partitionField.isEmpty()) { + // 1. 查询最大分区值 + String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName; + List maxPartitionResult = rdsapi.getMaps(maxPartitionSql); - // 分页查询数据 - do { - // 使用Oracle分页语法 (12c+) - String querySql = "SELECT * FROM ( " + - "SELECT t.*, ROWNUM rn FROM (" + - "SELECT * FROM " + tableName + - " WHERE " + conditionBuilder.toString() + - orderByBuilder.toString() + // 添加排序子句 - ") t WHERE ROWNUM <= " + (pageNo * PAGE_SIZE) + - ") WHERE rn > " + ((pageNo - 1) * PAGE_SIZE); + if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) { + LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField); + return; + } - LOGGER.debug("执行Oracle查询: {}", querySql); + String maxPartition = maxPartitionResult.get(0).getString("max_partition"); + LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition); - List pageData; - // 根据条件类型执行查询 - if (partitionField != null && !partitionField.isEmpty() && - timeField != null && !timeField.isEmpty()) { - // 分区+时间范围查询 - pageData = rdsapi.getMaps(querySql, startDate, endDate); + // 添加分区条件 + conditionBuilder.append(partitionField) + .append(" = '") + .append(maxPartition) + .append("'"); + + // 如果时间字段存在,添加时间范围条件 + if (timeField != null && !timeField.isEmpty()) { + conditionBuilder.append(" AND ") + .append(timeField) + .append(" BETWEEN ? AND ? ORDER BY " + timeField + ""); + if (jezd != null && !jezd.isEmpty()) { + conditionBuilder.append(", " + jezd + " "); + } + conditionBuilder.append(" DESC"); + } } else if (timeField != null && !timeField.isEmpty()) { - // 仅时间范围查询 - pageData = rdsapi.getMaps(querySql, startDate, endDate); - } else { - // 无时间范围查询(仅分区或全表) - pageData = rdsapi.getMaps(querySql); - } - - if (pageData != null && !pageData.isEmpty()) { - // 直接处理当前页数据 - int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable); - totalRows += pageData.size(); - totalSuccess += successCount; - hasMore = pageData.size() == PAGE_SIZE; - pageNo++; - } else { - hasMore = false; - } - } while (hasMore); - }else { - // 构建查询条件 - StringBuilder conditionBuilder = new StringBuilder(); - // 修改点:分区字段和时间字段组合查询条件 - if (partitionField != null && !partitionField.isEmpty()) { - // 1. 查询最大分区值 - String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName; - List maxPartitionResult = rdsapi.getMaps(maxPartitionSql); - - if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) { - LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField); - return; - } - - String maxPartition = maxPartitionResult.get(0).getString("max_partition"); - LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition); - - // 添加分区条件 - conditionBuilder.append(partitionField) - .append(" = '") - .append(maxPartition) - .append("'"); - - // 如果时间字段存在,添加时间范围条件 - if (timeField != null && !timeField.isEmpty()) { - conditionBuilder.append(" AND ") - .append(timeField) - .append(" BETWEEN ? AND ? ORDER BY "+timeField+""); - if (jezd!=null && !jezd.isEmpty()){ - conditionBuilder.append(", "+jezd+" "); + // 没有分区字段,但时间字段存在,使用时间范围条件 + conditionBuilder.append(timeField) + .append(" BETWEEN ? AND ? ORDER BY " + timeField + ""); + if (jezd != null && !jezd.isEmpty()) { + conditionBuilder.append(", " + jezd + " "); } conditionBuilder.append(" DESC"); + } else { + // 既没有分区字段也没有时间字段,查询全表(实际应避免这种情况) + LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!"); + conditionBuilder.append("1=1"); } - } else if (timeField != null && !timeField.isEmpty()) { - // 没有分区字段,但时间字段存在,使用时间范围条件 - conditionBuilder.append(timeField) - .append(" BETWEEN ? AND ? ORDER BY "+timeField+""); - if (jezd!=null && !jezd.isEmpty()){ - conditionBuilder.append(", "+jezd+" "); - } - conditionBuilder.append(" DESC"); - } else { - // 既没有分区字段也没有时间字段,查询全表(实际应避免这种情况) - LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!"); - conditionBuilder.append("1=1"); + // 分页查询数据 + do { + String querySql = "SELECT * FROM " + tableName + + " WHERE " + conditionBuilder.toString(); + + LOGGER.info("执行查询querySql: {}", querySql); + + List pageData; + // 根据条件类型执行查询 + if (partitionField != null && !partitionField.isEmpty() && + timeField != null && !timeField.isEmpty()) { + // 分区+时间范围查询 + pageData = rdsapi.getMaps(querySql, startDate, endDate); + } else if (timeField != null && !timeField.isEmpty()) { + // 仅时间范围查询 + pageData = rdsapi.getMaps(querySql, startDate, endDate); + } else { + // 无时间范围查询(仅分区或全表) + pageData = rdsapi.getMaps(querySql); + } + + if (pageData != null && !pageData.isEmpty()) { + // 直接处理当前页数据 + int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable); + totalRows += pageData.size(); + totalSuccess += successCount; + hasMore = pageData.size() == PAGE_SIZE; + pageNo++; + } else { + hasMore = false; + } + } while (hasMore); } - // 分页查询数据 - do { - String querySqls = "SELECT * FROM " + tableName + - " WHERE " + conditionBuilder.toString() +" "; -// " LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE; - LOGGER.info("执行查询querySqls: {}", querySqls); - String querySql = SQLPagination.getPaginitionSQL(querySqls, (pageNo - 1) * PAGE_SIZE, PAGE_SIZE,DBname); - - LOGGER.info("执行查询querySql: {}", querySql); - - List pageData; - // 根据条件类型执行查询 - if (partitionField != null && !partitionField.isEmpty() && - timeField != null && !timeField.isEmpty()) { - // 分区+时间范围查询 - pageData = rdsapi.getMaps(querySql, startDate, endDate); - } else if (timeField != null && !timeField.isEmpty()) { - // 仅时间范围查询 - pageData = rdsapi.getMaps(querySql, startDate, endDate); - } else { - // 无时间范围查询(仅分区或全表) - pageData = rdsapi.getMaps(querySql); - } - - if (pageData != null && !pageData.isEmpty()) { - // 直接处理当前页数据 - int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable); - totalRows += pageData.size(); - totalSuccess += successCount; - hasMore = pageData.size() == PAGE_SIZE; - pageNo++; - } else { - hasMore = false; - } - } while (hasMore); } LOGGER.info("从表[{}]共查询到{}条数据,成功同步{}条数据", tableName, totalRows, totalSuccess); } catch (Exception e) { throw new RuntimeException("查询源表[" + tableName + "]数据失败: " + e.getMessage(), e); - }finally { - } } @@ -441,6 +442,56 @@ public class SaleDataSyncServiceImpl implements DataSyncService { } } + /** + * 将时间范围拆分为多个区间,并确保结束时间包含时间部分以覆盖完整日期 + * @param startDate 开始时间(yyyy-MM-dd格式,时间部分为00:00:00) + * @param endDate 结束时间(yyyy-MM-dd格式,时间部分为00:00:00) + * @param daysInterval 间隔天数 + * @return 时间区间列表,每个区间的结束时间调整为23:59:59以确保覆盖完整日期 + */ + private static List splitTimeRange(Date startDate, Date endDate, int daysInterval) { + List ranges = new ArrayList<>(); + Calendar calendar = Calendar.getInstance(); + calendar.setTime(startDate); + + // 调整结束时间以包含完整日期 + Calendar endCal = Calendar.getInstance(); + endCal.setTime(endDate); + endCal.set(Calendar.HOUR_OF_DAY, 23); + endCal.set(Calendar.MINUTE, 59); + endCal.set(Calendar.SECOND, 59); + endCal.set(Calendar.MILLISECOND, 999); + Date adjustedEndDate = endCal.getTime(); + + while (calendar.getTime().before(adjustedEndDate)) { + Date rangeStart = calendar.getTime(); + + // 增加间隔天数 + calendar.add(Calendar.DAY_OF_MONTH, daysInterval); + Date potentialRangeEnd = calendar.getTime(); + + // 调整区间结束时间为当前日期的23:59:59 + Calendar rangeEndCal = Calendar.getInstance(); + rangeEndCal.setTime(potentialRangeEnd); + rangeEndCal.set(Calendar.HOUR_OF_DAY, 23); + rangeEndCal.set(Calendar.MINUTE, 59); + rangeEndCal.set(Calendar.SECOND, 59); + rangeEndCal.set(Calendar.MILLISECOND, 999); + Date rangeEnd = rangeEndCal.getTime(); + + // 如果调整后的区间结束时间超过最终结束时间,使用调整后的结束时间 + if (rangeEnd.after(adjustedEndDate)) { + rangeEnd = adjustedEndDate; + } + + ranges.add(new Date[]{rangeStart, rangeEnd}); + + // 准备下一个区间的开始时间:当前区间结束时间的下一天00:00:00 + calendar.add(Calendar.MILLISECOND, 1); + } + return ranges; + } + /** * 处理并插入数据到目标表 * @param sourceData 源数据列表