1、增加生产同步数据逻辑
2、同步数据分页方法修改使用工具类 3、同步数据日志输出增加板块
This commit is contained in:
parent
a74b12f055
commit
afc1ad251b
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -124,10 +124,13 @@ public class DataLinkUpController {
|
|||||||
String ssyw = config.getString("SSYW");
|
String ssyw = config.getString("SSYW");
|
||||||
if ("销售".equals(ssyw)) {
|
if ("销售".equals(ssyw)) {
|
||||||
syncService = new SaleDataSyncServiceImpl();
|
syncService = new SaleDataSyncServiceImpl();
|
||||||
LOGGER.info("销售的接口");
|
LOGGER.info("同步数据——销售的接口");
|
||||||
}else {
|
}else if ("运营".equals(ssyw)) {
|
||||||
|
syncService = new ProductionDataSyncServiceImpl();
|
||||||
|
LOGGER.info("同步数据——生产运营的接口");
|
||||||
|
} else {
|
||||||
syncService = new PurchaseDataSyncServiceImpl();
|
syncService = new PurchaseDataSyncServiceImpl();
|
||||||
LOGGER.info("采购的接口");
|
LOGGER.info("同步数据——采购的接口");
|
||||||
}
|
}
|
||||||
String timeField = config.getString("SJZD");
|
String timeField = config.getString("SJZD");
|
||||||
String ccId = config.getString("CC_ID");
|
String ccId = config.getString("CC_ID");
|
||||||
@ -181,80 +184,6 @@ public class DataLinkUpController {
|
|||||||
* @return 响应结果
|
* @return 响应结果
|
||||||
*/
|
*/
|
||||||
@Mapping("com.awspaas.user.apps.bnbm.datalinkup.controller.DataLinkUpController_calculateSummary")
|
@Mapping("com.awspaas.user.apps.bnbm.datalinkup.controller.DataLinkUpController_calculateSummary")
|
||||||
// public ResponseObject calculateSummary(String dataStr, String sid, String formattedDate) {
|
|
||||||
// long methodStartTime = System.currentTimeMillis();
|
|
||||||
// LOGGER.info("【开始】数据计算汇总操作,开始时间:{}", new Date(methodStartTime));
|
|
||||||
// ResponseObject ro = ResponseObject.newOkResponse();
|
|
||||||
// JSONArray configArray = new JSONArray(dataStr);
|
|
||||||
//
|
|
||||||
// Calendar cal = Calendar.getInstance();
|
|
||||||
// cal.add(Calendar.DATE, -1); // 昨天
|
|
||||||
// Date endDate = cal.getTime();
|
|
||||||
//
|
|
||||||
// // 解析formattedDate为日期对象并计算时间范围
|
|
||||||
// DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
|
|
||||||
// LocalDate date = LocalDate.parse(formattedDate, dateFormatter);
|
|
||||||
// LocalDateTime startDateTime = date.atStartOfDay(); // 当天00:00:00
|
|
||||||
// Date startDate = Date.from(startDateTime.atZone(ZoneId.systemDefault()).toInstant());
|
|
||||||
// DataSummaryService summaryService = null;
|
|
||||||
// SaleCountDimensionImpl saleCountDimension = null;
|
|
||||||
// try {
|
|
||||||
// LOGGER.info("开始执行销售数据多维度汇总计算");
|
|
||||||
//
|
|
||||||
// DateRange dateRange = new DateRange();
|
|
||||||
// dateRange.setStartDate(startDate);
|
|
||||||
// dateRange.setEndDate(endDate);
|
|
||||||
// LOGGER.info("汇总计算开始时间为:{},结束时间为:{}",startDate,endDate);
|
|
||||||
// // 2. 执行汇总计算
|
|
||||||
// for (int i = 0; i < configArray.length(); i++) {
|
|
||||||
// JSONObject config = configArray.getJSONObject(i);
|
|
||||||
// String timeField = config.getString("SJZD");
|
|
||||||
// String ccId = config.getString("CC_ID");
|
|
||||||
// String targetTable = config.getString("LDB");
|
|
||||||
// String partitionField = config.getString("FQBZD");
|
|
||||||
// String tableName = config.getString("TBB");
|
|
||||||
// String bkgs = config.getString("SSBK");
|
|
||||||
// String ssyw = config.getString("SSYW");
|
|
||||||
//
|
|
||||||
// // 1. 创建数据汇总服务实例
|
|
||||||
// if ("销售".equals(ssyw)) {
|
|
||||||
// summaryService = new SaleDataSummaryServiceImpl();
|
|
||||||
// saleCountDimension = new SaleCountDimensionImpl();
|
|
||||||
// LOGGER.info("销售销售的接口");
|
|
||||||
// }else {
|
|
||||||
// summaryService = new PurchaseDataSummaryServiceImpl();
|
|
||||||
// LOGGER.info("采购销售的接口");
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// List<RowMap> bkgsMaps = DBSql.getMaps("SELECT BKGS FROM " + targetTable + " GROUP BY BKGS");
|
|
||||||
// if (bkgsMaps!=null) {
|
|
||||||
// for (RowMap map : bkgsMaps) {
|
|
||||||
// BO bo = new BO();
|
|
||||||
// bo.set("BKGS", map.getString("BKGS"));
|
|
||||||
//// summaryService.calculateSummary(dateRange, bo);
|
|
||||||
//// if (saleCountDimension!=null){
|
|
||||||
//// //计算销售的维度
|
|
||||||
//// LOGGER.info("======== 开始执行销售数据汇总计算 ========");
|
|
||||||
// saleCountDimension.calculateSummary(dateRange, bo);
|
|
||||||
//// LOGGER.info("======== 销售数据汇总计算完成 ========");
|
|
||||||
//// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// ro.put("success", true);
|
|
||||||
// ro.put("message", "数据汇总计算完成");
|
|
||||||
// LOGGER.info("销售数据多维度汇总计算完成");
|
|
||||||
// } catch (Exception e) {
|
|
||||||
// String errorMsg = "数据汇总计算失败: " + e.getMessage();
|
|
||||||
// LOGGER.error(errorMsg, e);
|
|
||||||
// ro.put("success", false);
|
|
||||||
// ro.put("message", errorMsg);
|
|
||||||
// }
|
|
||||||
// long methodEndTime = System.currentTimeMillis();
|
|
||||||
// LOGGER.info("【完成】数据计算汇总操作,总耗时:{}ms", methodEndTime - methodStartTime);
|
|
||||||
// return ro;
|
|
||||||
// }
|
|
||||||
public ResponseObject calculateSummary(String dataStr, String sid, String formattedDate) {
|
public ResponseObject calculateSummary(String dataStr, String sid, String formattedDate) {
|
||||||
long methodStartTime = System.currentTimeMillis();
|
long methodStartTime = System.currentTimeMillis();
|
||||||
LOGGER.info("【开始】数据计算汇总操作,开始时间:{}", new Date(methodStartTime));
|
LOGGER.info("【开始】数据计算汇总操作,开始时间:{}", new Date(methodStartTime));
|
||||||
@ -397,6 +326,80 @@ public class DataLinkUpController {
|
|||||||
LOGGER.info("【完成】数据计算汇总操作,总耗时:{}ms", methodEndTime - methodStartTime);
|
LOGGER.info("【完成】数据计算汇总操作,总耗时:{}ms", methodEndTime - methodStartTime);
|
||||||
return ro;
|
return ro;
|
||||||
}
|
}
|
||||||
|
// public ResponseObject calculateSummary(String dataStr, String sid, String formattedDate) {
|
||||||
|
// long methodStartTime = System.currentTimeMillis();
|
||||||
|
// LOGGER.info("【开始】数据计算汇总操作,开始时间:{}", new Date(methodStartTime));
|
||||||
|
// ResponseObject ro = ResponseObject.newOkResponse();
|
||||||
|
// JSONArray configArray = new JSONArray(dataStr);
|
||||||
|
//
|
||||||
|
// Calendar cal = Calendar.getInstance();
|
||||||
|
// cal.add(Calendar.DATE, -1); // 昨天
|
||||||
|
// Date endDate = cal.getTime();
|
||||||
|
//
|
||||||
|
// // 解析formattedDate为日期对象并计算时间范围
|
||||||
|
// DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
|
||||||
|
// LocalDate date = LocalDate.parse(formattedDate, dateFormatter);
|
||||||
|
// LocalDateTime startDateTime = date.atStartOfDay(); // 当天00:00:00
|
||||||
|
// Date startDate = Date.from(startDateTime.atZone(ZoneId.systemDefault()).toInstant());
|
||||||
|
// DataSummaryService summaryService = null;
|
||||||
|
// SaleCountDimensionImpl saleCountDimension = null;
|
||||||
|
// try {
|
||||||
|
// LOGGER.info("开始执行销售数据多维度汇总计算");
|
||||||
|
//
|
||||||
|
// DateRange dateRange = new DateRange();
|
||||||
|
// dateRange.setStartDate(startDate);
|
||||||
|
// dateRange.setEndDate(endDate);
|
||||||
|
// LOGGER.info("汇总计算开始时间为:{},结束时间为:{}",startDate,endDate);
|
||||||
|
// // 2. 执行汇总计算
|
||||||
|
// for (int i = 0; i < configArray.length(); i++) {
|
||||||
|
// JSONObject config = configArray.getJSONObject(i);
|
||||||
|
// String timeField = config.getString("SJZD");
|
||||||
|
// String ccId = config.getString("CC_ID");
|
||||||
|
// String targetTable = config.getString("LDB");
|
||||||
|
// String partitionField = config.getString("FQBZD");
|
||||||
|
// String tableName = config.getString("TBB");
|
||||||
|
// String bkgs = config.getString("SSBK");
|
||||||
|
// String ssyw = config.getString("SSYW");
|
||||||
|
//
|
||||||
|
// // 1. 创建数据汇总服务实例
|
||||||
|
// if ("销售".equals(ssyw)) {
|
||||||
|
// summaryService = new SaleDataSummaryServiceImpl();
|
||||||
|
// saleCountDimension = new SaleCountDimensionImpl();
|
||||||
|
// LOGGER.info("销售销售的接口");
|
||||||
|
// }else {
|
||||||
|
// summaryService = new PurchaseDataSummaryServiceImpl();
|
||||||
|
// LOGGER.info("采购销售的接口");
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// List<RowMap> bkgsMaps = DBSql.getMaps("SELECT BKGS FROM " + targetTable + " GROUP BY BKGS");
|
||||||
|
// if (bkgsMaps!=null) {
|
||||||
|
// for (RowMap map : bkgsMaps) {
|
||||||
|
// BO bo = new BO();
|
||||||
|
// bo.set("BKGS", map.getString("BKGS"));
|
||||||
|
//// summaryService.calculateSummary(dateRange, bo);
|
||||||
|
//// if (saleCountDimension!=null){
|
||||||
|
//// //计算销售的维度
|
||||||
|
//// LOGGER.info("======== 开始执行销售数据汇总计算 ========");
|
||||||
|
// saleCountDimension.calculateSummary(dateRange, bo);
|
||||||
|
//// LOGGER.info("======== 销售数据汇总计算完成 ========");
|
||||||
|
//// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// ro.put("success", true);
|
||||||
|
// ro.put("message", "数据汇总计算完成");
|
||||||
|
// LOGGER.info("销售数据多维度汇总计算完成");
|
||||||
|
// } catch (Exception e) {
|
||||||
|
// String errorMsg = "数据汇总计算失败: " + e.getMessage();
|
||||||
|
// LOGGER.error(errorMsg, e);
|
||||||
|
// ro.put("success", false);
|
||||||
|
// ro.put("message", errorMsg);
|
||||||
|
// }
|
||||||
|
// long methodEndTime = System.currentTimeMillis();
|
||||||
|
// LOGGER.info("【完成】数据计算汇总操作,总耗时:{}ms", methodEndTime - methodStartTime);
|
||||||
|
// return ro;
|
||||||
|
// }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 各板块数据汇总
|
* 各板块数据汇总
|
||||||
@ -449,8 +452,14 @@ public class DataLinkUpController {
|
|||||||
}else {
|
}else {
|
||||||
hzb = "BO_EU_BNBM_DATALINKUP_XS_YSL";
|
hzb = "BO_EU_BNBM_DATALINKUP_XS_YSL";
|
||||||
}
|
}
|
||||||
LOGGER.info("销售销售的接口");
|
LOGGER.info("汇总板块数据——销售的接口");
|
||||||
}else {
|
} else if ("运营".equals(ssyw)) {
|
||||||
|
dataSyncService = new ProductionDataSyncServiceImpl();
|
||||||
|
if ("原材料".equals(tablename)){
|
||||||
|
hzb = "BO_EU_DWS_ORDER_YCLXH_HZ";
|
||||||
|
}
|
||||||
|
LOGGER.info("汇总板块数据——运营的接口");
|
||||||
|
} else {
|
||||||
dataSyncService = new PurchaseDataSyncServiceImpl();
|
dataSyncService = new PurchaseDataSyncServiceImpl();
|
||||||
if ("采购单".equals(tablename)){
|
if ("采购单".equals(tablename)){
|
||||||
hzb = "BO_EU_DWD_ORDER_CGDD_HZ";
|
hzb = "BO_EU_DWD_ORDER_CGDD_HZ";
|
||||||
@ -461,7 +470,7 @@ public class DataLinkUpController {
|
|||||||
} else {
|
} else {
|
||||||
hzb = "BO_EU_DWD_ORDER_RKD_HZ";
|
hzb = "BO_EU_DWD_ORDER_RKD_HZ";
|
||||||
}
|
}
|
||||||
LOGGER.info("采购销售的接口");
|
LOGGER.info("汇总板块数据——采购的接口");
|
||||||
}
|
}
|
||||||
|
|
||||||
LOGGER.info("即将同步的数据:{}",plate);
|
LOGGER.info("即将同步的数据:{}",plate);
|
||||||
|
|||||||
@ -0,0 +1,690 @@
|
|||||||
|
package com.awspaas.user.apps.bnbm.datalinkup.service.impl;
|
||||||
|
|
||||||
|
import com.actionsoft.bpms.bo.engine.BO;
|
||||||
|
import com.actionsoft.bpms.commons.database.DBUtils;
|
||||||
|
import com.actionsoft.bpms.commons.database.RowMap;
|
||||||
|
import com.actionsoft.bpms.commons.pagination.SQLPagination;
|
||||||
|
import com.actionsoft.bpms.server.UserContext;
|
||||||
|
import com.actionsoft.bpms.util.DBSql;
|
||||||
|
import com.actionsoft.bpms.util.UtilDate;
|
||||||
|
import com.actionsoft.sdk.local.SDK;
|
||||||
|
import com.actionsoft.sdk.local.api.cc.RDSAPI;
|
||||||
|
import com.awspaas.user.apps.bnbm.datalinkup.entity.DateRange;
|
||||||
|
import com.awspaas.user.apps.bnbm.datalinkup.service.DataSyncService;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.math.BigDecimal;
|
||||||
|
import java.math.RoundingMode;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @ClassName: ProductionDataSyncServiceImpl
|
||||||
|
* @Description: 生产实现类
|
||||||
|
* @date: 2025/8/25 13:41
|
||||||
|
* @Blog: https://
|
||||||
|
*/
|
||||||
|
public class ProductionDataSyncServiceImpl implements DataSyncService {
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(ProductionDataSyncServiceImpl.class);
|
||||||
|
/**
|
||||||
|
* 时间范围常量:同步最近60天数据(不包括当天)
|
||||||
|
*/
|
||||||
|
private static final int DAYS_BACK = Integer.parseInt(SDK.getAppAPI().getProperty("com.awspaas.user.apps.bnbm.datalinkup", "days_back"));
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 增加分页大小常量
|
||||||
|
*/
|
||||||
|
private static final int PAGE_SIZE = 1000; // 每页查询1000条记录
|
||||||
|
|
||||||
|
private static final String ORACLE_DATE_FORMAT = "YYYY-MM-DD HH24:MI:SS";
|
||||||
|
@Override
|
||||||
|
public ArrayList<DateRange> syncDataByConfigs(List<BO> configs) {
|
||||||
|
ArrayList<DateRange> list = new ArrayList<>();
|
||||||
|
if (configs.isEmpty()) {
|
||||||
|
LOGGER.info("未找到有效的同步配置");
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 按所属板块(SSBK)分组配置
|
||||||
|
Map<String, List<BO>> configsByPlate = configs.stream()
|
||||||
|
.collect(Collectors.groupingBy(bo -> bo.getString("SSBK")));
|
||||||
|
// 遍历处理每个板块的配置
|
||||||
|
for (Map.Entry<String, List<BO>> entry : configsByPlate.entrySet()) {
|
||||||
|
String plate = entry.getKey();
|
||||||
|
List<BO> plateConfigs = entry.getValue();
|
||||||
|
LOGGER.info("处理板块【{}】的{}条配置", plate, plateConfigs.size());
|
||||||
|
|
||||||
|
// 处理当前板块的每条配置
|
||||||
|
boolean connectionFailed = false;
|
||||||
|
String errorMsg = "";
|
||||||
|
for (BO mainConfig : plateConfigs) {
|
||||||
|
try {
|
||||||
|
DateRange dateRange = processMainConfig(mainConfig);
|
||||||
|
list.add(dateRange);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOGGER.error("处理配置失败 [板块={}, BindID={}]: {}",
|
||||||
|
plate, mainConfig.getString("BINDID"), e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DateRange processMainConfig(BO mainConfig) {
|
||||||
|
String bindId = mainConfig.getString("BINDID");
|
||||||
|
String tableName = mainConfig.getString("TBB");
|
||||||
|
String timeField = mainConfig.getString("SJZD");
|
||||||
|
String targetTable = mainConfig.getString("LDB");
|
||||||
|
String ccId = mainConfig.getString("CC_ID");
|
||||||
|
String partitionField = mainConfig.getString("FQBZD");
|
||||||
|
String bkgs = mainConfig.getString("BKGS");
|
||||||
|
DateRange dateRange = new DateRange();
|
||||||
|
|
||||||
|
LOGGER.info("处理配置:BindID={}, 源表={}, 目标表={}, CC_ID={}, 时间字段={}, 分区字段配置={}",
|
||||||
|
bindId, tableName, targetTable, ccId,timeField,partitionField);
|
||||||
|
|
||||||
|
// 查询子表字段映射配置
|
||||||
|
List<BO> fieldMappings = SDK.getBOAPI()
|
||||||
|
.query("BO_EU_BNBM_DATALINKUP_SJGTPZ_SUB")
|
||||||
|
.addQuery("BINDID =", bindId)
|
||||||
|
.list();
|
||||||
|
|
||||||
|
if (fieldMappings.isEmpty()) {
|
||||||
|
LOGGER.warn("未找到BindID={}的字段映射配置", bindId);
|
||||||
|
return dateRange;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 根据时间字段是否为空设置日期范围
|
||||||
|
Date startDate = null;
|
||||||
|
Date endDate = null;
|
||||||
|
|
||||||
|
// 删除目标表数据(根据时间字段是否为空决定删除范围)
|
||||||
|
if (timeField == null || timeField.isEmpty()) {
|
||||||
|
// 全量删除
|
||||||
|
deleteAllTargetData(targetTable);
|
||||||
|
} else {
|
||||||
|
// 计算时间范围(当前日期-30天 ~ 昨天)
|
||||||
|
Calendar cal = Calendar.getInstance();
|
||||||
|
cal.add(Calendar.DATE, -1); // 昨天
|
||||||
|
endDate = cal.getTime();
|
||||||
|
cal.add(Calendar.DATE, -DAYS_BACK + 1); // 30天前(含)
|
||||||
|
startDate = cal.getTime();
|
||||||
|
|
||||||
|
// 获取目标表时间字段名
|
||||||
|
String targetTimeField = getTargetTimeField(fieldMappings, timeField);
|
||||||
|
if (targetTimeField == null) {
|
||||||
|
LOGGER.error("无法找到源时间字段[{}]对应的目标表字段,跳过同步", timeField);
|
||||||
|
return dateRange;
|
||||||
|
}
|
||||||
|
// 按时间范围删除
|
||||||
|
deleteTargetData(targetTable, targetTimeField, startDate, endDate);
|
||||||
|
}
|
||||||
|
querySourceData(ccId, tableName, timeField, startDate, endDate, partitionField,
|
||||||
|
fieldMappings, targetTable);
|
||||||
|
dateRange.setStartDate(startDate);
|
||||||
|
dateRange.setEndDate(endDate);
|
||||||
|
return dateRange;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取目标表时间字段名
|
||||||
|
* @param mappings 字段映射配置列表
|
||||||
|
* @param sourceTimeField 源表时间字段名
|
||||||
|
* @return 目标表时间字段名,未找到返回null
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public String getTargetTimeField(List<BO> mappings, String sourceTimeField) {
|
||||||
|
for (BO mapping : mappings) {
|
||||||
|
if (sourceTimeField.equals(mapping.getString("TBBZD"))) {
|
||||||
|
return mapping.getString("LDBZD");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除目标表中指定时间范围的数据
|
||||||
|
* @param targetTable 目标表名
|
||||||
|
* @param targetTimeField 目标表时间字段名
|
||||||
|
* @param startDate 开始时间
|
||||||
|
* @param endDate 结束时间
|
||||||
|
* @throws RuntimeException 删除失败时抛出
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void deleteTargetData(String targetTable, String targetTimeField,
|
||||||
|
Date startDate, Date endDate) {
|
||||||
|
try {
|
||||||
|
String deleteSql = "DELETE FROM " + targetTable +
|
||||||
|
" WHERE " + targetTimeField + " BETWEEN ? AND ?";
|
||||||
|
int deletedCount = DBSql.update(deleteSql, new Object[]{startDate, endDate});
|
||||||
|
LOGGER.info("已删除目标表[{}]中{}条数据(时间范围: {} - {})",
|
||||||
|
targetTable, deletedCount, startDate, endDate);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException("删除目标表数据失败: " + e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteAllTargetData(String targetTable) {
|
||||||
|
try {
|
||||||
|
String deleteSql = "DELETE FROM " + targetTable ;
|
||||||
|
int deletedCount = DBSql.update(deleteSql);
|
||||||
|
LOGGER.info("已全量删除目标表[{}]中{}条数据", targetTable, deletedCount);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException("全量删除目标表数据失败: " + e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 跨库查询源表数据
|
||||||
|
* @param ccId 跨库连接ID
|
||||||
|
* @param tableName 源表名
|
||||||
|
* @param timeField 源表时间字段名
|
||||||
|
* @param startDated 开始时间
|
||||||
|
* @param endDated 结束时间
|
||||||
|
* @return 查询结果数据集
|
||||||
|
* @throws RuntimeException 查询失败或参数无效时抛出
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void querySourceData(String ccId, String tableName,
|
||||||
|
String timeField, Date startDated, Date endDated,String partitionField,
|
||||||
|
List<BO> fieldMappings, String targetTable) {
|
||||||
|
int totalRows = 0; // 总查询行数
|
||||||
|
int totalSuccess = 0; // 总成功插入行数
|
||||||
|
int pageNo = 1;
|
||||||
|
boolean hasMore;
|
||||||
|
RDSAPI rdsapi = null;
|
||||||
|
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||||
|
String startDate = simpleDateFormat.format(startDated);
|
||||||
|
String endDate = simpleDateFormat.format(endDated);
|
||||||
|
try {
|
||||||
|
rdsapi = SDK.getCCAPI().getRDSAPI(ccId);
|
||||||
|
DBUtils.SUPPLY supply = rdsapi.getSupply();
|
||||||
|
String DBname = supply.getName();
|
||||||
|
LOGGER.info("数据库为:{}",DBname);
|
||||||
|
if ("ORACLE".equalsIgnoreCase(DBname)){
|
||||||
|
// 构建查询条件
|
||||||
|
StringBuilder conditionBuilder = new StringBuilder();
|
||||||
|
List<Object> params = new ArrayList<>(); // 存储查询参数
|
||||||
|
|
||||||
|
// 分区字段和时间字段组合查询条件
|
||||||
|
if (partitionField != null && !partitionField.isEmpty()) {
|
||||||
|
// 1. 查询最大分区值
|
||||||
|
String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName;
|
||||||
|
List<RowMap> maxPartitionResult = rdsapi.getMaps(maxPartitionSql);
|
||||||
|
|
||||||
|
if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) {
|
||||||
|
LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
String maxPartition = maxPartitionResult.get(0).getString("max_partition");
|
||||||
|
LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition);
|
||||||
|
|
||||||
|
// 添加分区条件
|
||||||
|
conditionBuilder.append(partitionField)
|
||||||
|
.append(" = '")
|
||||||
|
.append(maxPartition)
|
||||||
|
.append("'");
|
||||||
|
|
||||||
|
// 如果时间字段存在,添加时间范围条件
|
||||||
|
if (timeField != null && !timeField.isEmpty()) {
|
||||||
|
conditionBuilder.append(" AND TO_DATE(")
|
||||||
|
.append(timeField)
|
||||||
|
.append(", '")
|
||||||
|
.append(ORACLE_DATE_FORMAT)
|
||||||
|
.append("') BETWEEN TO_DATE(?, '")
|
||||||
|
.append(ORACLE_DATE_FORMAT)
|
||||||
|
.append("') AND TO_DATE(?, '")
|
||||||
|
.append(ORACLE_DATE_FORMAT)
|
||||||
|
.append("')");
|
||||||
|
params.add(startDate);
|
||||||
|
params.add(endDate);
|
||||||
|
}
|
||||||
|
} else if (timeField != null && !timeField.isEmpty()) {
|
||||||
|
// 没有分区字段,但时间字段存在,使用时间范围条件
|
||||||
|
// 仅时间范围条件(使用占位符)
|
||||||
|
conditionBuilder.append("TO_DATE(")
|
||||||
|
.append(timeField)
|
||||||
|
.append(", '")
|
||||||
|
.append(ORACLE_DATE_FORMAT)
|
||||||
|
.append("') BETWEEN TO_DATE(?, '")
|
||||||
|
.append(ORACLE_DATE_FORMAT)
|
||||||
|
.append("') AND TO_DATE(?, '")
|
||||||
|
.append(ORACLE_DATE_FORMAT)
|
||||||
|
.append("')");
|
||||||
|
params.add(startDate);
|
||||||
|
params.add(endDate);
|
||||||
|
} else {
|
||||||
|
// 既没有分区字段也没有时间字段,查询全表
|
||||||
|
LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
|
||||||
|
conditionBuilder.append("1=1");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 分页查询数据
|
||||||
|
do {
|
||||||
|
// 使用Oracle分页语法 (12c+)
|
||||||
|
String querySql = "SELECT * FROM ( " +
|
||||||
|
"SELECT t.*, ROWNUM rn FROM " + tableName + " t " +
|
||||||
|
"WHERE " + conditionBuilder.toString() + " AND ROWNUM <= " + (pageNo * PAGE_SIZE) +
|
||||||
|
") WHERE rn > " + ((pageNo - 1) * PAGE_SIZE);
|
||||||
|
|
||||||
|
LOGGER.debug("执行Oracle查询: {}", querySql);
|
||||||
|
|
||||||
|
List<RowMap> pageData;
|
||||||
|
// 根据条件类型执行查询
|
||||||
|
if (partitionField != null && !partitionField.isEmpty() &&
|
||||||
|
timeField != null && !timeField.isEmpty()) {
|
||||||
|
// 分区+时间范围查询
|
||||||
|
pageData = rdsapi.getMaps(querySql, startDate, endDate);
|
||||||
|
} else if (timeField != null && !timeField.isEmpty()) {
|
||||||
|
// 仅时间范围查询
|
||||||
|
pageData = rdsapi.getMaps(querySql, startDate, endDate);
|
||||||
|
} else {
|
||||||
|
// 无时间范围查询(仅分区或全表)
|
||||||
|
pageData = rdsapi.getMaps(querySql);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pageData != null && !pageData.isEmpty()) {
|
||||||
|
// 直接处理当前页数据
|
||||||
|
int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable);
|
||||||
|
totalRows += pageData.size();
|
||||||
|
totalSuccess += successCount;
|
||||||
|
hasMore = pageData.size() == PAGE_SIZE;
|
||||||
|
pageNo++;
|
||||||
|
} else {
|
||||||
|
hasMore = false;
|
||||||
|
}
|
||||||
|
} while (hasMore);
|
||||||
|
}else {
|
||||||
|
// 构建查询条件
|
||||||
|
StringBuilder conditionBuilder = new StringBuilder();
|
||||||
|
// 修改点:分区字段和时间字段组合查询条件
|
||||||
|
if (partitionField != null && !partitionField.isEmpty()) {
|
||||||
|
// 1. 查询最大分区值
|
||||||
|
String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName;
|
||||||
|
List<RowMap> maxPartitionResult = rdsapi.getMaps(maxPartitionSql);
|
||||||
|
|
||||||
|
if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) {
|
||||||
|
LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
String maxPartition = maxPartitionResult.get(0).getString("max_partition");
|
||||||
|
LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition);
|
||||||
|
|
||||||
|
// 添加分区条件
|
||||||
|
conditionBuilder.append(partitionField)
|
||||||
|
.append(" = '")
|
||||||
|
.append(maxPartition)
|
||||||
|
.append("'");
|
||||||
|
|
||||||
|
// 如果时间字段存在,添加时间范围条件
|
||||||
|
if (timeField != null && !timeField.isEmpty()) {
|
||||||
|
conditionBuilder.append(" AND ")
|
||||||
|
.append(timeField)
|
||||||
|
.append(" BETWEEN ? AND ?");
|
||||||
|
}
|
||||||
|
} else if (timeField != null && !timeField.isEmpty()) {
|
||||||
|
// 没有分区字段,但时间字段存在,使用时间范围条件
|
||||||
|
conditionBuilder.append(timeField)
|
||||||
|
.append(" BETWEEN ? AND ?");
|
||||||
|
} else {
|
||||||
|
// 既没有分区字段也没有时间字段,查询全表(实际应避免这种情况)
|
||||||
|
LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
|
||||||
|
conditionBuilder.append("1=1");
|
||||||
|
}
|
||||||
|
// 分页查询数据
|
||||||
|
do {
|
||||||
|
String querySqls = "SELECT * FROM " + tableName +
|
||||||
|
" WHERE " + conditionBuilder.toString() +" ";
|
||||||
|
// " LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
|
||||||
|
LOGGER.debug("执行查询querySqls: {}", querySqls);
|
||||||
|
String querySql = SQLPagination.getPaginitionSQL(querySqls, (pageNo - 1) * PAGE_SIZE, PAGE_SIZE,DBname);
|
||||||
|
|
||||||
|
LOGGER.debug("执行查询querySql: {}", querySql);
|
||||||
|
|
||||||
|
List<RowMap> pageData;
|
||||||
|
// 根据条件类型执行查询
|
||||||
|
if (partitionField != null && !partitionField.isEmpty() &&
|
||||||
|
timeField != null && !timeField.isEmpty()) {
|
||||||
|
// 分区+时间范围查询
|
||||||
|
pageData = rdsapi.getMaps(querySql, startDate, endDate);
|
||||||
|
} else if (timeField != null && !timeField.isEmpty()) {
|
||||||
|
// 仅时间范围查询
|
||||||
|
pageData = rdsapi.getMaps(querySql, startDate, endDate);
|
||||||
|
} else {
|
||||||
|
// 无时间范围查询(仅分区或全表)
|
||||||
|
pageData = rdsapi.getMaps(querySql);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pageData != null && !pageData.isEmpty()) {
|
||||||
|
// 直接处理当前页数据
|
||||||
|
int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable);
|
||||||
|
totalRows += pageData.size();
|
||||||
|
totalSuccess += successCount;
|
||||||
|
hasMore = pageData.size() == PAGE_SIZE;
|
||||||
|
pageNo++;
|
||||||
|
} else {
|
||||||
|
hasMore = false;
|
||||||
|
}
|
||||||
|
} while (hasMore);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOGGER.info("从表[{}]共查询到{}条数据,成功同步{}条数据",
|
||||||
|
tableName, totalRows, totalSuccess);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException("查询源表[" + tableName + "]数据失败: " + e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理并插入数据到目标表
|
||||||
|
* @param sourceData 源数据列表
|
||||||
|
* @param mappings 字段映射配置
|
||||||
|
* @param targetTable 目标表名
|
||||||
|
*/
|
||||||
|
public int processAndInsertData(List<RowMap> sourceData,
|
||||||
|
List<BO> mappings, String targetTable) {
|
||||||
|
String bkgs = "";
|
||||||
|
if (sourceData.isEmpty()) {
|
||||||
|
LOGGER.info("没有需要同步的数据");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<BO> batchList = new ArrayList<>();
|
||||||
|
int successCount = 0;
|
||||||
|
int totalCount = sourceData.size();
|
||||||
|
int processedCount = 0; // 已处理记录数
|
||||||
|
|
||||||
|
for (int i = 0; i < totalCount; i++) {
|
||||||
|
RowMap record = sourceData.get(i);
|
||||||
|
processedCount++; // 增加已处理计数
|
||||||
|
try {
|
||||||
|
// 字段映射转换
|
||||||
|
BO targetData = convertFields(record, mappings);
|
||||||
|
bkgs = targetData.getString("BKGS");
|
||||||
|
batchList.add(targetData);
|
||||||
|
|
||||||
|
// 批量插入条件:达到批处理大小或最后一条
|
||||||
|
if (batchList.size() >= PAGE_SIZE || i == totalCount - 1) {
|
||||||
|
// 使用管理员权限批量插入
|
||||||
|
SDK.getBOAPI().createDataBO(targetTable, batchList, UserContext.fromUID("admin"));
|
||||||
|
successCount += batchList.size();
|
||||||
|
batchList.clear(); // 清空批次
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOGGER.error("数据处理失败: {}", e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// 增加详细日志输出:共处理多少条,成功同步多少条
|
||||||
|
LOGGER.info("同步板块为:{},落地表为:{},本次处理{}条数据,成功同步{}条数据到表[{}]",
|
||||||
|
bkgs,targetTable,processedCount, successCount, targetTable);
|
||||||
|
return successCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 字段映射转换
|
||||||
|
* @param source 源数据记录
|
||||||
|
* @param mappings 字段映射配置
|
||||||
|
* @return 转换后的BO对象
|
||||||
|
*/
|
||||||
|
public BO convertFields(RowMap source, List<BO> mappings) {
|
||||||
|
BO target = new BO();
|
||||||
|
|
||||||
|
for (BO mapping : mappings) {
|
||||||
|
String sourceField = mapping.getString("TBBZD");
|
||||||
|
String targetField = mapping.getString("LDBZD");
|
||||||
|
|
||||||
|
// if (!source.containsKey(sourceField)) {
|
||||||
|
// LOGGER.debug("源字段[{}]不存在于查询结果中", sourceField);
|
||||||
|
// continue;
|
||||||
|
// }
|
||||||
|
|
||||||
|
String operationExpr = mapping.getString("TBBZDJSLJ");
|
||||||
|
if (StringUtils.isNotBlank(operationExpr)) {
|
||||||
|
// 解析运算表达式 (格式: [运算符][数字])
|
||||||
|
char operator = operationExpr.charAt(0);
|
||||||
|
String numberPart = operationExpr.substring(1);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 获取源值并转换为BigDecimal
|
||||||
|
String sourceValue = source.getString(sourceField);
|
||||||
|
if (StringUtils.isBlank(sourceValue)) {
|
||||||
|
target.set(targetField, null);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ("GG".equals(sourceField)){
|
||||||
|
String string = source.getString(sourceField);
|
||||||
|
if (StringUtils.isNotBlank(string) && string.contains("×")){
|
||||||
|
String[] split = string.split("×");
|
||||||
|
String s = split[split.length - 1];
|
||||||
|
target.set(targetField, s);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
BigDecimal sourceNum = new BigDecimal(sourceValue);
|
||||||
|
BigDecimal operand = new BigDecimal(numberPart);
|
||||||
|
BigDecimal result;
|
||||||
|
|
||||||
|
// 执行相应运算
|
||||||
|
switch (operator) {
|
||||||
|
case '*':
|
||||||
|
result = sourceNum.multiply(operand);
|
||||||
|
break;
|
||||||
|
case '/':
|
||||||
|
if (BigDecimal.ZERO.compareTo(operand) == 0) {
|
||||||
|
LOGGER.error("除零错误: 源字段[{}] 除数为0", sourceField);
|
||||||
|
result = sourceNum; // 避免除零异常
|
||||||
|
} else {
|
||||||
|
// 除法保留10位小数并四舍五入
|
||||||
|
result = sourceNum.divide(operand, 10, RoundingMode.HALF_UP);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case '+':
|
||||||
|
result = sourceNum.add(operand);
|
||||||
|
break;
|
||||||
|
case '-':
|
||||||
|
result = sourceNum.subtract(operand);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
LOGGER.error("未知运算符: {} 字段[{}]", operator, sourceField);
|
||||||
|
result = sourceNum;
|
||||||
|
}
|
||||||
|
target.set(targetField, result);
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
LOGGER.error("数值转换失败: 源字段[{}]={}, 操作数={}",
|
||||||
|
sourceField, source.getString(sourceField), numberPart, e);
|
||||||
|
target.set(targetField, source.getString(sourceField));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if ("GG".equals(sourceField)){
|
||||||
|
String string = source.getString(sourceField);
|
||||||
|
if (StringUtils.isNotBlank(string) && string.contains("×")){
|
||||||
|
String[] split = string.split("×");
|
||||||
|
String s = split[split.length - 1];
|
||||||
|
target.set(targetField, s);
|
||||||
|
}else {
|
||||||
|
// 无运算表达式时直接复制原始值
|
||||||
|
target.set(targetField, StringUtils.isNotBlank(string)?string:"");
|
||||||
|
}
|
||||||
|
}else {
|
||||||
|
// 无运算表达式时直接复制原始值
|
||||||
|
target.set(targetField, source.getString(sourceField));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
String ldzdmrz = mapping.getString("LDZDMRZ");
|
||||||
|
if (StringUtils.isNotBlank(ldzdmrz)){
|
||||||
|
target.set(mapping.getString("LDBZD"),ldzdmrz);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return target;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 销售各板块数据汇总表
|
||||||
|
* @param configs
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void sumBkTable(List<BO> configs) {
|
||||||
|
Map<String, List<BO>> configsByPlate = configs.stream()
|
||||||
|
.collect(Collectors.groupingBy(bo -> bo.getString("SSBK")));
|
||||||
|
|
||||||
|
// 遍历处理每个板块的配置
|
||||||
|
for (Map.Entry<String, List<BO>> entry : configsByPlate.entrySet()) {
|
||||||
|
String plate = entry.getKey();
|
||||||
|
List<BO> plateConfigs = entry.getValue();
|
||||||
|
LOGGER.info("处理板块【{}】的{}条配置", plate, plateConfigs.size());
|
||||||
|
|
||||||
|
// 处理当前板块的每条配置
|
||||||
|
for (BO mainConfig : plateConfigs) {
|
||||||
|
String targetTable = mainConfig.getString("LDB");//落地表
|
||||||
|
String timeField = mainConfig.getString("SJZD");//时间字段
|
||||||
|
String bindId = mainConfig.getString("BINDID");//bindid
|
||||||
|
String tablename = mainConfig.getString("TABLENAME");//同步表名
|
||||||
|
String hzb = "";
|
||||||
|
try {
|
||||||
|
if ("原材料".equals(tablename)){
|
||||||
|
hzb = "BO_EU_DWS_ORDER_YCLXH_HZ";
|
||||||
|
}else {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// 查询子表字段映射配置
|
||||||
|
List<BO> fieldMappings = SDK.getBOAPI()
|
||||||
|
.query("BO_EU_BNBM_DATALINKUP_SJGTPZ_SUB")
|
||||||
|
.addQuery("BINDID =", bindId)
|
||||||
|
.list();
|
||||||
|
//获取板块公司
|
||||||
|
String bkgs = DBSql.getString("SELECT BKGS FROM " + targetTable, "BKGS");
|
||||||
|
|
||||||
|
// 根据时间字段是否为空设置日期范围
|
||||||
|
Date startDate = null;
|
||||||
|
Date endDate = null;
|
||||||
|
|
||||||
|
// 删除目标表数据(根据时间字段是否为空决定删除范围)
|
||||||
|
if (timeField == null || timeField.isEmpty()) {
|
||||||
|
// 全量删除
|
||||||
|
String deleteSql = "DELETE FROM "+hzb+" WHERE BKGS = '"+bkgs+"'";
|
||||||
|
int deletedCount = DBSql.update(deleteSql);
|
||||||
|
LOGGER.info("已删除目标表[{}}]中{}条数据(时间范围: {} - {})",
|
||||||
|
hzb,deletedCount, startDate, endDate);
|
||||||
|
|
||||||
|
// 根据时间范围增加数据分页查询数据存储到BO_EU_BNBM_DATALINKUP_XS_XSL_HZ
|
||||||
|
// 全量分页迁移数据到汇总表
|
||||||
|
summarizeScopeData(targetTable, null, null, null, hzb);
|
||||||
|
} else {
|
||||||
|
// 计算时间范围(当前日期-30天 ~ 昨天)
|
||||||
|
Calendar cal = Calendar.getInstance();
|
||||||
|
cal.add(Calendar.DATE, -1); // 昨天
|
||||||
|
endDate = cal.getTime();
|
||||||
|
cal.add(Calendar.DATE, -DAYS_BACK + 1); // 30天前(含)
|
||||||
|
startDate = cal.getTime();
|
||||||
|
|
||||||
|
// 获取目标表时间字段名
|
||||||
|
String targetTimeField = getTargetTimeField(fieldMappings, timeField);
|
||||||
|
if (targetTimeField == null) {
|
||||||
|
LOGGER.error("无法找到源时间字段[{}]对应的目标表字段,跳过同步", timeField);
|
||||||
|
}
|
||||||
|
// 按时间范围删除
|
||||||
|
String deleteSql = "DELETE FROM " + hzb +
|
||||||
|
" WHERE BKGS = '"+bkgs+"' AND " + targetTimeField + " BETWEEN ? AND ?";
|
||||||
|
int deletedCount = DBSql.update(deleteSql, new Object[]{startDate, endDate});
|
||||||
|
LOGGER.info("已删除目标表[{}]中{}条数据(时间范围: {} - {})",
|
||||||
|
hzb,deletedCount, startDate, endDate);
|
||||||
|
|
||||||
|
// 根据时间范围增加数据分页查询数据存储到BO_EU_BNBM_DATALINKUP_XS_XSL_HZ
|
||||||
|
// 按时间范围分页迁移数据到汇总表
|
||||||
|
summarizeScopeData(targetTable, startDate, endDate, targetTimeField, hzb);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOGGER.error("处理配置失败 [板块={}, BindID={}]: {}",
|
||||||
|
plate, mainConfig.getString("BINDID"), e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 汇总各板块销售数据汇总
|
||||||
|
* @param targetTable
|
||||||
|
* @param startDated
|
||||||
|
* @param endDated
|
||||||
|
* @param targetTimeField
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void summarizeScopeData(String targetTable, Date startDated, Date endDated, String targetTimeField, String hzb) {
|
||||||
|
int pageNo = 1;
|
||||||
|
boolean hasMore;
|
||||||
|
String pageSql = "";
|
||||||
|
List<RowMap> pageData = null;
|
||||||
|
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||||
|
String startDate = "";
|
||||||
|
String endDate = "";
|
||||||
|
try {
|
||||||
|
do {
|
||||||
|
if (startDated == null || endDated == null) {
|
||||||
|
pageSql = "SELECT * FROM " + targetTable +
|
||||||
|
" LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
|
||||||
|
pageData = DBSql.getMaps(pageSql);
|
||||||
|
} else {
|
||||||
|
startDate = simpleDateFormat.format(startDated);
|
||||||
|
endDate = simpleDateFormat.format(endDated);
|
||||||
|
pageSql = "SELECT * FROM " + targetTable +
|
||||||
|
" WHERE " + targetTimeField + " BETWEEN '" + startDate + "' AND '" + endDate + "' " +
|
||||||
|
" LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
|
||||||
|
LOGGER.info("执行查询的sql:{}", pageSql);
|
||||||
|
pageData = DBSql.getMaps(pageSql);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pageData.isEmpty()) break;
|
||||||
|
|
||||||
|
List<BO> bos = new ArrayList<>();
|
||||||
|
for (RowMap map : pageData) {
|
||||||
|
BO bo = new BO();
|
||||||
|
// 复制所有字段(排除系统字段)
|
||||||
|
for (String key : map.keySet()) {
|
||||||
|
if (!key.equalsIgnoreCase("ID") &&
|
||||||
|
!key.equalsIgnoreCase("ORGID") &&
|
||||||
|
!key.equalsIgnoreCase("CREATEDATE") &&
|
||||||
|
!key.equalsIgnoreCase("CREATEUSER") &&
|
||||||
|
!key.equalsIgnoreCase("UPDATEDATE") &&
|
||||||
|
!key.equalsIgnoreCase("UPDATEUSER") &&
|
||||||
|
!key.equalsIgnoreCase("ISEND") &&
|
||||||
|
!key.equalsIgnoreCase("BINDID")) {
|
||||||
|
if (StringUtils.isNotBlank(targetTimeField)) {
|
||||||
|
String targetTimeField1 = map.getString(targetTimeField);
|
||||||
|
Date parse = UtilDate.parse(targetTimeField1);
|
||||||
|
int year = UtilDate.getYear(parse);
|
||||||
|
String monthFormat = UtilDate.monthFormat(parse);
|
||||||
|
int day = UtilDate.getDay(parse);
|
||||||
|
bo.set("YEARMONTH", year + monthFormat);
|
||||||
|
bo.set("YEAR", year);
|
||||||
|
bo.set("MONTH", monthFormat);
|
||||||
|
bo.set("DAY", day);
|
||||||
|
}
|
||||||
|
bo.set(key, map.get(key));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bos.add(bo);
|
||||||
|
}
|
||||||
|
|
||||||
|
SDK.getBOAPI().createDataBO(hzb, bos, UserContext.fromUID("admin"));
|
||||||
|
LOGGER.info("已迁移{}条数据到汇总表(页号: {},时间范围: {} - {})",
|
||||||
|
bos.size(), pageNo, startDate, endDate);
|
||||||
|
|
||||||
|
hasMore = pageData.size() == PAGE_SIZE;
|
||||||
|
pageNo++;
|
||||||
|
} while (hasMore);
|
||||||
|
}catch (Exception e){
|
||||||
|
LOGGER.error("汇总数据失败 [汇总表={}, 第几页={}]: {}",
|
||||||
|
hzb, PAGE_SIZE, e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -3,6 +3,7 @@ package com.awspaas.user.apps.bnbm.datalinkup.service.impl;
|
|||||||
import com.actionsoft.bpms.bo.engine.BO;
|
import com.actionsoft.bpms.bo.engine.BO;
|
||||||
import com.actionsoft.bpms.commons.database.DBUtils;
|
import com.actionsoft.bpms.commons.database.DBUtils;
|
||||||
import com.actionsoft.bpms.commons.database.RowMap;
|
import com.actionsoft.bpms.commons.database.RowMap;
|
||||||
|
import com.actionsoft.bpms.commons.pagination.SQLPagination;
|
||||||
import com.actionsoft.bpms.server.UserContext;
|
import com.actionsoft.bpms.server.UserContext;
|
||||||
import com.actionsoft.bpms.util.DBSql;
|
import com.actionsoft.bpms.util.DBSql;
|
||||||
import com.actionsoft.bpms.util.UtilDate;
|
import com.actionsoft.bpms.util.UtilDate;
|
||||||
@ -22,15 +23,14 @@ import java.util.stream.Collectors;
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* @ClassName: PurchaseDataSyncServiceImpl
|
* @ClassName: PurchaseDataSyncServiceImpl
|
||||||
* @Description:
|
* @Description: 采购实现类
|
||||||
* @author: 李春洋
|
|
||||||
* @date: 2025/8/15 15:46
|
* @date: 2025/8/15 15:46
|
||||||
* @Blog: https://
|
* @Blog: https://
|
||||||
*/
|
*/
|
||||||
public class PurchaseDataSyncServiceImpl implements DataSyncService {
|
public class PurchaseDataSyncServiceImpl implements DataSyncService {
|
||||||
private static final Logger LOGGER = LoggerFactory.getLogger(SaleDataSyncServiceImpl.class);
|
private static final Logger LOGGER = LoggerFactory.getLogger(SaleDataSyncServiceImpl.class);
|
||||||
/**
|
/**
|
||||||
* 时间范围常量:同步最近30天数据(不包括当天)
|
* 时间范围常量:同步最近60天数据(不包括当天)
|
||||||
*/
|
*/
|
||||||
private static final int DAYS_BACK = Integer.parseInt(SDK.getAppAPI().getProperty("com.awspaas.user.apps.bnbm.datalinkup", "days_back"));
|
private static final int DAYS_BACK = Integer.parseInt(SDK.getAppAPI().getProperty("com.awspaas.user.apps.bnbm.datalinkup", "days_back"));
|
||||||
|
|
||||||
@ -125,7 +125,6 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
|
|||||||
}
|
}
|
||||||
querySourceData(ccId, tableName, timeField, startDate, endDate, partitionField,
|
querySourceData(ccId, tableName, timeField, startDate, endDate, partitionField,
|
||||||
fieldMappings, targetTable);
|
fieldMappings, targetTable);
|
||||||
// }
|
|
||||||
dateRange.setStartDate(startDate);
|
dateRange.setStartDate(startDate);
|
||||||
dateRange.setEndDate(endDate);
|
dateRange.setEndDate(endDate);
|
||||||
return dateRange;
|
return dateRange;
|
||||||
@ -341,11 +340,13 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
|
|||||||
}
|
}
|
||||||
// 分页查询数据
|
// 分页查询数据
|
||||||
do {
|
do {
|
||||||
String querySql = "SELECT * FROM " + tableName +
|
String querySqls = "SELECT * FROM " + tableName +
|
||||||
" WHERE " + conditionBuilder.toString() +
|
" WHERE " + conditionBuilder.toString() +" ";
|
||||||
" LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
|
// " LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
|
||||||
|
LOGGER.debug("执行查询querySqls: {}", querySqls);
|
||||||
|
String querySql = SQLPagination.getPaginitionSQL(querySqls, (pageNo - 1) * PAGE_SIZE, PAGE_SIZE,DBname);
|
||||||
|
|
||||||
LOGGER.debug("执行查询: {}", querySql);
|
LOGGER.debug("执行查询querySql: {}", querySql);
|
||||||
|
|
||||||
List<RowMap> pageData;
|
List<RowMap> pageData;
|
||||||
// 根据条件类型执行查询
|
// 根据条件类型执行查询
|
||||||
@ -391,6 +392,7 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
|
|||||||
*/
|
*/
|
||||||
public int processAndInsertData(List<RowMap> sourceData,
|
public int processAndInsertData(List<RowMap> sourceData,
|
||||||
List<BO> mappings, String targetTable) {
|
List<BO> mappings, String targetTable) {
|
||||||
|
String bkgs = "";
|
||||||
if (sourceData.isEmpty()) {
|
if (sourceData.isEmpty()) {
|
||||||
LOGGER.info("没有需要同步的数据");
|
LOGGER.info("没有需要同步的数据");
|
||||||
return 0;
|
return 0;
|
||||||
@ -407,6 +409,7 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
|
|||||||
try {
|
try {
|
||||||
// 字段映射转换
|
// 字段映射转换
|
||||||
BO targetData = convertFields(record, mappings);
|
BO targetData = convertFields(record, mappings);
|
||||||
|
bkgs = targetData.getString("BKGS");
|
||||||
batchList.add(targetData);
|
batchList.add(targetData);
|
||||||
|
|
||||||
// 批量插入条件:达到批处理大小或最后一条
|
// 批量插入条件:达到批处理大小或最后一条
|
||||||
@ -421,8 +424,8 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// 增加详细日志输出:共处理多少条,成功同步多少条
|
// 增加详细日志输出:共处理多少条,成功同步多少条
|
||||||
LOGGER.info("本次处理{}条数据,成功同步{}条数据到表[{}]",
|
LOGGER.info("同步板块为:{},落地表为:{},本次处理{}条数据,成功同步{}条数据到表[{}]",
|
||||||
processedCount, successCount, targetTable);
|
bkgs,targetTable,processedCount, successCount, targetTable);
|
||||||
return successCount;
|
return successCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -155,11 +155,6 @@ public class SaleCountDimensionImpl implements DataSummaryService {
|
|||||||
throw new RuntimeException(errorMsg, e);
|
throw new RuntimeException(errorMsg, e);
|
||||||
} finally {
|
} finally {
|
||||||
DBSql.close(conn);
|
DBSql.close(conn);
|
||||||
try {
|
|
||||||
LOGGER.info("销售数据汇总计算完成:关闭连接,conn状态为:{}",conn.getClientInfo());
|
|
||||||
} catch (SQLException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -489,11 +489,6 @@ public class SaleDataSummaryServiceImpl implements DataSummaryService {
|
|||||||
saveUnitPriceData(date, lb_1, year, month, day, bkgs);
|
saveUnitPriceData(date, lb_1, year, month, day, bkgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 处理石膏板数据
|
|
||||||
// saveUnitPriceData(date, "石膏板", year, month, day, bkgs);
|
|
||||||
// 处理轻钢龙骨数据
|
|
||||||
// saveUnitPriceData(date, "轻钢龙骨", year, month, day, bkgs);
|
|
||||||
|
|
||||||
LOGGER.info("产品单价日明细数据保存成功");
|
LOGGER.info("产品单价日明细数据保存成功");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String errorMsg = String.format("产品单价日明细数据计算失败: %s", e.getMessage());
|
String errorMsg = String.format("产品单价日明细数据计算失败: %s", e.getMessage());
|
||||||
@ -529,7 +524,6 @@ public class SaleDataSummaryServiceImpl implements DataSummaryService {
|
|||||||
|
|
||||||
if (day > 0) {
|
if (day > 0) {
|
||||||
sql += " AND DAY(DZRQ) = '"+day+"'";
|
sql += " AND DAY(DZRQ) = '"+day+"'";
|
||||||
params = new Object[]{year, month, day, productType,bkgs};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
RowMap result = DBSql.getMap(sql);
|
RowMap result = DBSql.getMap(sql);
|
||||||
@ -580,12 +574,6 @@ public class SaleDataSummaryServiceImpl implements DataSummaryService {
|
|||||||
*/
|
*/
|
||||||
private BO createBaseBO(String bkgs) {
|
private BO createBaseBO(String bkgs) {
|
||||||
BO bo = new BO();
|
BO bo = new BO();
|
||||||
// bo.set("ORGID", "");
|
|
||||||
// bo.set("BINDID", "");
|
|
||||||
// bo.set("CREATEDATE", new Timestamp(System.currentTimeMillis()));
|
|
||||||
// bo.set("CREATEUSER", "admin");
|
|
||||||
// bo.set("PROCESSDEFID", "");
|
|
||||||
// bo.set("ISEND", 0);
|
|
||||||
bo.set("BKGS", bkgs); // 新增BKGS字段
|
bo.set("BKGS", bkgs); // 新增BKGS字段
|
||||||
return bo;
|
return bo;
|
||||||
}
|
}
|
||||||
@ -647,13 +635,6 @@ public class SaleDataSummaryServiceImpl implements DataSummaryService {
|
|||||||
.append("WHERE YEAR(RQ) = '"+year+"' AND MONTH(RQ) BETWEEN 1 AND '"+month+"' ")
|
.append("WHERE YEAR(RQ) = '"+year+"' AND MONTH(RQ) BETWEEN 1 AND '"+month+"' ")
|
||||||
.append("AND BKGS = '"+bkgs+"' ");
|
.append("AND BKGS = '"+bkgs+"' ");
|
||||||
|
|
||||||
// 构建品牌列表参数占位符
|
|
||||||
// for (int i = 0; i < brands.size(); i++) {
|
|
||||||
// sql.append(brands.get(i));
|
|
||||||
// if (i < brands.size() - 1) sql.append(",");
|
|
||||||
// }
|
|
||||||
// sql.append(")");
|
|
||||||
|
|
||||||
double value = DBSql.getDouble(sql.toString(), "receivable");
|
double value = DBSql.getDouble(sql.toString(), "receivable");
|
||||||
return BigDecimal.valueOf(value);
|
return BigDecimal.valueOf(value);
|
||||||
}
|
}
|
||||||
@ -697,20 +678,6 @@ public class SaleDataSummaryServiceImpl implements DataSummaryService {
|
|||||||
.append("FROM " + RECEIVABLE_DETAIL_TABLE + " ")
|
.append("FROM " + RECEIVABLE_DETAIL_TABLE + " ")
|
||||||
.append("WHERE YEAR(RQ) = '"+year+"' AND MONTH(RQ) = '"+month+"' AND BKGS = '"+bkgs+"' ");
|
.append("WHERE YEAR(RQ) = '"+year+"' AND MONTH(RQ) = '"+month+"' AND BKGS = '"+bkgs+"' ");
|
||||||
|
|
||||||
// for (int i = 0; i < brands.size(); i++) {
|
|
||||||
// sql.append(brands.get(i));
|
|
||||||
// if (i < brands.size() - 1) sql.append(",");
|
|
||||||
// }
|
|
||||||
|
|
||||||
// 构建参数数组
|
|
||||||
// Object[] params = new Object[brands.size() + 3];
|
|
||||||
// params[0] = year;
|
|
||||||
// params[1] = month;
|
|
||||||
// params[2] = bkgs;
|
|
||||||
// for (int i = 0; i < brands.size(); i++) {
|
|
||||||
// params[i + 3] = brands.get(i);
|
|
||||||
// }
|
|
||||||
|
|
||||||
Map<String, BigDecimal> result = new HashMap<>();
|
Map<String, BigDecimal> result = new HashMap<>();
|
||||||
List<RowMap> rows = DBSql.getMaps(sql.toString());
|
List<RowMap> rows = DBSql.getMaps(sql.toString());
|
||||||
for (RowMap row : rows) {
|
for (RowMap row : rows) {
|
||||||
|
|||||||
@ -3,6 +3,7 @@ package com.awspaas.user.apps.bnbm.datalinkup.service.impl;
|
|||||||
import com.actionsoft.bpms.bo.engine.BO;
|
import com.actionsoft.bpms.bo.engine.BO;
|
||||||
import com.actionsoft.bpms.commons.database.DBUtils;
|
import com.actionsoft.bpms.commons.database.DBUtils;
|
||||||
import com.actionsoft.bpms.commons.database.RowMap;
|
import com.actionsoft.bpms.commons.database.RowMap;
|
||||||
|
import com.actionsoft.bpms.commons.pagination.SQLPagination;
|
||||||
import com.actionsoft.bpms.server.UserContext;
|
import com.actionsoft.bpms.server.UserContext;
|
||||||
import com.actionsoft.bpms.util.DBSql;
|
import com.actionsoft.bpms.util.DBSql;
|
||||||
import com.actionsoft.bpms.util.UtilDate;
|
import com.actionsoft.bpms.util.UtilDate;
|
||||||
@ -317,11 +318,13 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
|
|||||||
}
|
}
|
||||||
// 分页查询数据
|
// 分页查询数据
|
||||||
do {
|
do {
|
||||||
String querySql = "SELECT * FROM " + tableName +
|
String querySqls = "SELECT * FROM " + tableName +
|
||||||
" WHERE " + conditionBuilder.toString() +
|
" WHERE " + conditionBuilder.toString() +" ";
|
||||||
" LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
|
// " LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
|
||||||
|
LOGGER.debug("执行查询querySqls: {}", querySqls);
|
||||||
|
String querySql = SQLPagination.getPaginitionSQL(querySqls, (pageNo - 1) * PAGE_SIZE, PAGE_SIZE,DBname);
|
||||||
|
|
||||||
LOGGER.debug("执行查询: {}", querySql);
|
LOGGER.debug("执行查询querySql: {}", querySql);
|
||||||
|
|
||||||
List<RowMap> pageData;
|
List<RowMap> pageData;
|
||||||
// 根据条件类型执行查询
|
// 根据条件类型执行查询
|
||||||
@ -420,6 +423,7 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
|
|||||||
*/
|
*/
|
||||||
public int processAndInsertData(List<RowMap> sourceData,
|
public int processAndInsertData(List<RowMap> sourceData,
|
||||||
List<BO> mappings, String targetTable) {
|
List<BO> mappings, String targetTable) {
|
||||||
|
String bkgs = "";
|
||||||
if (sourceData.isEmpty()) {
|
if (sourceData.isEmpty()) {
|
||||||
LOGGER.info("没有需要同步的数据");
|
LOGGER.info("没有需要同步的数据");
|
||||||
return 0;
|
return 0;
|
||||||
@ -436,6 +440,7 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
|
|||||||
try {
|
try {
|
||||||
// 字段映射转换
|
// 字段映射转换
|
||||||
BO targetData = convertFields(record, mappings);
|
BO targetData = convertFields(record, mappings);
|
||||||
|
bkgs = targetData.getString("BKGS");
|
||||||
batchList.add(targetData);
|
batchList.add(targetData);
|
||||||
|
|
||||||
// 批量插入条件:达到批处理大小或最后一条
|
// 批量插入条件:达到批处理大小或最后一条
|
||||||
@ -450,8 +455,8 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// 增加详细日志输出:共处理多少条,成功同步多少条
|
// 增加详细日志输出:共处理多少条,成功同步多少条
|
||||||
LOGGER.info("本次处理{}条数据,成功同步{}条数据到表[{}]",
|
LOGGER.info("同步板块为:{},落地表为:{},本次处理{}条数据,成功同步{}条数据到表[{}]",
|
||||||
processedCount, successCount, targetTable);
|
bkgs,targetTable,processedCount, successCount, targetTable);
|
||||||
return successCount;
|
return successCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -639,10 +644,11 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 汇总各板块销售数据汇总
|
* 汇总各板块销售数据汇总
|
||||||
* @param targetTable
|
* @param targetTable 落地表字段
|
||||||
* @param startDated
|
* @param startDated 开始时间
|
||||||
* @param endDated
|
* @param endDated 结束时间
|
||||||
* @param targetTimeField
|
* @param targetTimeField 时间范围字段
|
||||||
|
* @param hzb 汇总表
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void summarizeScopeData(String targetTable, Date startDated, Date endDated, String targetTimeField, String hzb) {
|
public void summarizeScopeData(String targetTable, Date startDated, Date endDated, String targetTimeField, String hzb) {
|
||||||
@ -714,141 +720,4 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 高斯数据库连接池(简化版)
|
|
||||||
// public static class GaussDataSource {
|
|
||||||
// private static final String URL = GAUSSIAN_JDBC_URL;
|
|
||||||
// private static final String USER = GAUSSIAN_USERNAME;
|
|
||||||
// private static final String PASSWORD = GAUSSIAN_PASSWORD;
|
|
||||||
//
|
|
||||||
// public static Connection getConnection() throws SQLException {
|
|
||||||
// String driver = "com.huawei.gaussdb.jdbc.Driver";
|
|
||||||
// try {
|
|
||||||
// //加载数据库驱动。
|
|
||||||
// Class.forName(driver).newInstance();
|
|
||||||
// } catch (Exception e) {
|
|
||||||
// e.printStackTrace();
|
|
||||||
// return null;
|
|
||||||
// }
|
|
||||||
// Connection connection = DriverManager.getConnection(URL, USER, PASSWORD);
|
|
||||||
// boolean autoCommit = connection.getAutoCommit();
|
|
||||||
// LOGGER.info("autoCommit:{}",autoCommit);
|
|
||||||
// return connection;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// /**
|
|
||||||
// * 高斯数据库专用查询方法(支持分区和分页)
|
|
||||||
// */
|
|
||||||
// @Override
|
|
||||||
// public void queryGaussDataWithCondition(String tableName, String timeField,
|
|
||||||
// Date startDate, Date endDate, String partitionField,
|
|
||||||
// List<BO> fieldMappings, String targetTable) {
|
|
||||||
// int totalRows = 0; // 总查询行数
|
|
||||||
// int totalSuccess = 0; // 总成功插入行数
|
|
||||||
// int pageNo = 1;
|
|
||||||
// boolean hasMore;
|
|
||||||
// Connection conn = null;
|
|
||||||
//
|
|
||||||
// try {
|
|
||||||
// conn = GaussDataSource.getConnection();
|
|
||||||
// LOGGER.info("成功连接高斯数据库");
|
|
||||||
//
|
|
||||||
// // 构建查询条件
|
|
||||||
// StringBuilder conditionBuilder = new StringBuilder();
|
|
||||||
//
|
|
||||||
// // 修改点:分区字段和时间字段组合查询条件
|
|
||||||
// if (partitionField != null && !partitionField.isEmpty()) {
|
|
||||||
// // 1. 查询最大分区值
|
|
||||||
// String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName;
|
|
||||||
// try (Statement stmt = conn.createStatement();
|
|
||||||
// ResultSet rs = stmt.executeQuery(maxPartitionSql)) {
|
|
||||||
//
|
|
||||||
// if (rs.next()) {
|
|
||||||
// String maxPartition = rs.getString("max_partition");
|
|
||||||
// // 添加分区条件
|
|
||||||
// conditionBuilder.append(partitionField)
|
|
||||||
// .append(" = '")
|
|
||||||
// .append(maxPartition)
|
|
||||||
// .append("'");
|
|
||||||
// LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition);
|
|
||||||
// } else {
|
|
||||||
// LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField);
|
|
||||||
// return;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // 添加时间范围条件(无论是否有分区字段,只要时间字段存在)
|
|
||||||
// if (timeField != null && !timeField.isEmpty()) {
|
|
||||||
// if (conditionBuilder.length() > 0) {
|
|
||||||
// conditionBuilder.append(" AND ");
|
|
||||||
// }
|
|
||||||
// conditionBuilder.append(timeField)
|
|
||||||
// .append(" BETWEEN '")
|
|
||||||
// .append(new Timestamp(startDate.getTime()))
|
|
||||||
// .append("' AND '")
|
|
||||||
// .append(new Timestamp(endDate.getTime()))
|
|
||||||
// .append("'");
|
|
||||||
// } else if (conditionBuilder.length() == 0) {
|
|
||||||
// // 既没有分区字段也没有时间字段,查询全表(实际应避免)
|
|
||||||
// LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
|
|
||||||
// conditionBuilder.append("1=1");
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // 分页查询数据
|
|
||||||
// do {
|
|
||||||
// String querySql = "SELECT * FROM " + tableName;
|
|
||||||
// // 如果有条件则添加WHERE子句
|
|
||||||
// if (conditionBuilder.length() > 0) {
|
|
||||||
// querySql += " WHERE " + conditionBuilder.toString();
|
|
||||||
// }
|
|
||||||
// querySql += " LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
|
|
||||||
//
|
|
||||||
// LOGGER.debug("执行高斯查询: {}", querySql);
|
|
||||||
//
|
|
||||||
// try (Statement stmt = conn.createStatement();
|
|
||||||
// ResultSet rs = stmt.executeQuery(querySql)) {
|
|
||||||
//
|
|
||||||
// List<RowMap> pageData = new ArrayList<>();
|
|
||||||
// ResultSetMetaData metaData = rs.getMetaData();
|
|
||||||
// int columnCount = metaData.getColumnCount();
|
|
||||||
//
|
|
||||||
// while (rs.next()) {
|
|
||||||
// BO row = new BO();
|
|
||||||
// for (int i = 1; i <= columnCount; i++) {
|
|
||||||
// String colName = metaData.getColumnName(i);
|
|
||||||
// Object value = rs.getObject(i);
|
|
||||||
// row.set(colName, value);
|
|
||||||
// }
|
|
||||||
// RowMap map = new RowMap(row.asMap());
|
|
||||||
// pageData.add(map);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (!pageData.isEmpty()) {
|
|
||||||
// // 直接处理当前页数据
|
|
||||||
// int successCount = processAndInsertData(pageData, fieldMappings, targetTable);
|
|
||||||
// totalRows += pageData.size();
|
|
||||||
// totalSuccess += successCount;
|
|
||||||
// hasMore = pageData.size() == PAGE_SIZE;
|
|
||||||
// pageNo++;
|
|
||||||
// } else {
|
|
||||||
// hasMore = false;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// } while (hasMore);
|
|
||||||
//
|
|
||||||
// LOGGER.info("从高斯表[{}]共查询到{}条数据,成功同步{}条数据",
|
|
||||||
// tableName, totalRows, totalSuccess);
|
|
||||||
// } catch (SQLException e) {
|
|
||||||
// throw new RuntimeException("高斯数据库查询失败: " + e.getMessage(), e);
|
|
||||||
// } finally {
|
|
||||||
// if (conn != null) {
|
|
||||||
// try {
|
|
||||||
// conn.close();
|
|
||||||
// } catch (SQLException e) {
|
|
||||||
// LOGGER.error("关闭高斯数据库连接失败", e);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
Loading…
Reference in New Issue
Block a user