1、增加同步明细应收方法

2、一体化日均价更新城市简称
This commit is contained in:
llllon 2025-09-17 10:46:06 +08:00
parent 445053b513
commit 22021eef26
12 changed files with 269 additions and 7 deletions

View File

@ -191,9 +191,14 @@ public class DataLinkUpController {
// 按时间范围删除
syncService.deleteTargetData(targetTable, targetTimeField, startDate, endDate);
}
if (tablename.equals("应收表")){
syncService.querySourceDataYS(ccId, tableName, timeField, startDate, endDate, partitionField,
fieldMappings, targetTable, jezd);
}else {
syncService.querySourceData(ccId, tableName, timeField, startDate, endDate, partitionField,
fieldMappings, targetTable, jezd);
}
syncService.querySourceData(ccId, tableName, timeField, startDate, endDate, partitionField,
fieldMappings, targetTable, jezd);
}
LOGGER.info("开始使用DataSyncService处理数据同步 ({}条配置)", mainConfigs.size());

View File

@ -69,6 +69,18 @@ public interface DataSyncService {
* @throws RuntimeException 查询失败或参数无效时抛出
*/
void querySourceData(String ccId, String tableName, String timeField, Date startDate, Date endDate, String partitionField, List<BO> fieldMappings, String targetTable,String jezd);
/**
* 跨库查询源表数据-应收
* @param ccId 跨库连接ID
* @param tableName 源表名
* @param timeField 源表时间字段名
* @param startDate 开始时间
* @param endDate 结束时间
* @return 查询结果数据集
* @throws RuntimeException 查询失败或参数无效时抛出
*/
void querySourceDataYS(String ccId, String tableName, String timeField, Date startDate, Date endDate, String partitionField, List<BO> fieldMappings, String targetTable, String jezd);
/**
* 各板块数据汇总
@ -104,8 +116,8 @@ public interface DataSyncService {
/**
* 应收将范围内数据汇总
* @param targetTable
* @param startDate
* @param endDate
* @param startDated
* @param endDated
* @param targetTimeField
*/
void summarizeScopeDataYs(String targetTable, Date startDated, Date endDated, String targetTimeField, String hzb);

View File

@ -802,4 +802,9 @@ public class ProductionDataSyncServiceImpl implements DataSyncService {
public void summarizeScopeDataYs(String targetTable, Date startDated, Date endDated, String targetTimeField, String hzb) {
}
@Override
public void querySourceDataYS(String ccId, String tableName, String timeField, Date startDate, Date endDate, String partitionField, List<BO> fieldMappings, String targetTable, String jezd) {
}
}

View File

@ -943,4 +943,10 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
public void summarizeScopeDataYs(String targetTable, Date startDated, Date endDated, String targetTimeField, String hzb) {
}
@Override
public void querySourceDataYS(String ccId, String tableName, String timeField, Date startDate, Date endDate, String partitionField, List<BO> fieldMappings, String targetTable, String jezd) {
}
}

View File

@ -82,7 +82,7 @@ public class SaleDataSummaryServiceImpl implements DataSummaryService {
LOGGER.error("主配置中BKGS为空无法进行汇总计算");
return;
}
if (!"龙牌".equals(bkgs) || !"梦牌".equals(bkgs) || !"泰山石膏".equals(bkgs) || !"北新嘉宝莉".equals(bkgs) || !"北新涂料".equals(bkgs)){
if ((!"龙牌,梦牌,泰山石膏,北新嘉宝莉,北新涂料".contains(bkgs))){
LOGGER.error("主配置中板块公司为:{},无需进行计算",bkgs);
return;
}
@ -257,6 +257,15 @@ public class SaleDataSummaryServiceImpl implements DataSummaryService {
list.add(priceBO);
}
SDK.getBOAPI().createDataBO(UNIT_PRICE_DAILY_TABLE,list,UserContext.fromUID("admin"));
//更新城市名称
int update = DBSql.update("UPDATE" +
" BO_EU_DATALINKUP_FACT_UNIT_PRICE_DAILY a " +
" INNER JOIN " +
" BO_EU_YTH_CCJCWH b ON a.CITY = b.CS " +
" SET " +
" a.CITY_SHORT = b.CSJC");
LOGGER.info("一体化产品单价日明细更新城市名称");
}
}
LOGGER.info("产品单价日明细数据保存成功");
@ -450,10 +459,10 @@ public class SaleDataSummaryServiceImpl implements DataSummaryService {
// 根据省份value值查询数据
String sfSql = sfMap.get(key);
String sql = "SELECT BKGS,LB_1,LB_2 AS brand,CONCAT(YEAR, '-', LPAD(MONTH, 2, '0')) ASyear_month," +
" SUM(ZSSL) AS total_zssl, SUM(XSSL)/10000 AS total_xssl_10k, SUM(SSJERMB) AS total_ssjermb," +
" SUM(ZSSL) AS total_zssl, SUM(XSSL)/10000 AS total_xssl_10k, SUM(NMNY) AS total_ssjermb," +
" (SUM(ZSSL)/(SUM(SUM(ZSSL)) OVER (PARTITION BY LB_1, YEAR(DZRQ), MONTH(DZRQ)))) *100 AS lb1_month_zssl_total," +
" ((SUM(XSSL)/10000)/(SUM(SUM(XSSL)) OVER (PARTITION BY LB_1, YEAR(DZRQ), MONTH(DZRQ)) / 10000))*100 AS lb1_month_xssl_total_10k," +
" (SUM(SSJERMB)/(SUM(SUM(SSJERMB)) OVER (PARTITION BY LB_1, YEAR(DZRQ), MONTH(DZRQ))))*100 AS lb1_month_ssjermb_total" +
" (SUM(NMNY)/(SUM(SUM(NMNY)) OVER (PARTITION BY LB_1, YEAR(DZRQ), MONTH(DZRQ))))*100 AS lb1_month_ssjermb_total" +
" FROM BO_EU_BNBM_DATALINKUP_XS_XSL_HZ " +
" WHERE DZRQ>? AND BKGS = ? AND LB_1 IN('石膏板','轻钢龙骨','涂料') AND ("+sfSql+") " +
" GROUP BY LB_2,year, month" +

View File

@ -1054,4 +1054,229 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
}
}
/**
* 跨库查询源表数据增加每月最大日期查询功能
* @param ccId 跨库连接ID
* @param tableName 源表名
* @param timeField 源表时间字段名
* @param startDated 开始时间
* @param endDated 结束时间
* @return 查询结果数据集
* @throws RuntimeException 查询失败或参数无效时抛出
*/
@Override
public void querySourceDataYS(String ccId, String tableName, String timeField, Date startDated, Date endDated, String partitionField, List<BO> fieldMappings, String targetTable, String jezd) {
int totalRows = 0; // 总查询行数
int totalSuccess = 0; // 总成功插入行数
int pageNo = 1;
boolean hasMore;
RDSAPI rdsapi = null;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
SimpleDateFormat simpleDateFormat2 = new SimpleDateFormat("yyyy-MM-dd");
try {
rdsapi = SDK.getCCAPI().getRDSAPI(ccId);
DBUtils.SUPPLY supply = rdsapi.getSupply();
String DBname = supply.getName();
LOGGER.info("数据库为:{}",DBname);
// 计算时间范围并拆分为30天一组
List<Date[]> timeRanges = splitTimeRange(startDated, endDated, 30);
LOGGER.info("时间范围拆分为 {} 个查询区间", timeRanges.size());
String maxPartition = "";
if (partitionField != null && !partitionField.isEmpty()) {
// 1. 查询最大分区值
String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName;
List<RowMap> maxPartitionResult = rdsapi.getMaps(maxPartitionSql);
if (maxPartitionResult.isEmpty() || maxPartitionResult.get(0).get("max_partition") == null) {
LOGGER.warn("表[{}]没有找到分区字段[{}]的数据", tableName, partitionField);
return;
}
maxPartition = maxPartitionResult.get(0).getString("max_partition");
LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition);
}
for (int i = 0; i < timeRanges.size(); i++) {
Date[] range = timeRanges.get(i);
String startDate = simpleDateFormat2.format(range[0]);
String endDate = simpleDateFormat2.format(range[1]);
LOGGER.info("正在处理第 {} 个时间区间: {} 至 {}", i + 1, startDate, endDate);
// 构建基础查询条件
StringBuilder conditionBuilder = new StringBuilder();
StringBuilder orderByBuilder = new StringBuilder();
List<Object> params = new ArrayList<>();
// 分区字段条件
if (partitionField != null && !partitionField.isEmpty()) {
conditionBuilder.append(partitionField)
.append(" = '")
.append(maxPartition)
.append("'");
}
// 时间字段条件
if (timeField != null && !timeField.isEmpty()) {
if (conditionBuilder.length() > 0) {
conditionBuilder.append(" AND ");
}
if ("ORACLE".equalsIgnoreCase(DBname)) {
conditionBuilder.append("TO_DATE(")
.append(timeField)
.append(", '")
.append(ORACLE_DATE_FORMAT)
.append("') >= TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("') AND TO_DATE(")
.append(timeField)
.append(", '")
.append(ORACLE_DATE_FORMAT)
.append("') < TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("')");
} else {
conditionBuilder.append(timeField).append(" >= ?")
.append(" AND ")
.append(timeField).append(" < ?");
}
params.add(startDate);
params.add(endDate);
// 排序条件
orderByBuilder.append(" ORDER BY ").append(timeField);
if (jezd != null && !jezd.isEmpty()) {
orderByBuilder.append(", ").append(jezd);
}
orderByBuilder.append(" DESC");
} else if (conditionBuilder.length() == 0) {
// 既没有分区字段也没有时间字段查询全表
LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
conditionBuilder.append("1=1");
}
// 添加每月最大日期条件
if (timeField != null && !timeField.isEmpty()) {
if (conditionBuilder.length() > 0) {
conditionBuilder.append(" AND ");
}
// 根据不同数据库类型实现每月最大日期查询
if ("ORACLE".equalsIgnoreCase(DBname)) {
conditionBuilder.append("TO_CHAR(").append(timeField).append(", 'YYYY-MM') = ")
.append("(SELECT TO_CHAR(MAX(").append(timeField).append("), 'YYYY-MM') FROM ")
.append(tableName).append(" WHERE ").append(conditionBuilder.toString()).append(")");
} else if ("POSTGRESQL".equalsIgnoreCase(DBname) || "GAUSS".equalsIgnoreCase(DBname)) {
// 高斯和PG使用相同的语法
conditionBuilder.append("TO_CHAR(").append(timeField).append(", 'YYYY-MM') = ")
.append("(SELECT TO_CHAR(MAX(").append(timeField).append("), 'YYYY-MM') FROM ")
.append(tableName).append(" WHERE ").append(conditionBuilder.toString()).append(")");
} else if ("SQLSERVER".equalsIgnoreCase(DBname)) {
conditionBuilder.append("FORMAT(").append(timeField).append(", 'yyyy-MM') = ")
.append("(SELECT FORMAT(MAX(").append(timeField).append("), 'yyyy-MM') FROM ")
.append(tableName).append(" WHERE ").append(conditionBuilder.toString()).append(")");
} else {
// 其他数据库使用通用方法
conditionBuilder.append("DATE_FORMAT(").append(timeField).append(", '%Y-%m') = ")
.append("(SELECT DATE_FORMAT(MAX(").append(timeField).append("), '%Y-%m') FROM ")
.append(tableName).append(" WHERE ").append(conditionBuilder.toString()).append(")");
}
}
// 分页查询数据
do {
String querySql;
if ("ORACLE".equalsIgnoreCase(DBname)) {
// 使用Oracle分页语法 (12c+)
querySql = "SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString() +
orderByBuilder.toString();
} else {
querySql = "SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString() +
orderByBuilder.toString();
}
LOGGER.info("执行查询: {}", querySql);
List<RowMap> pageData;
if (params.isEmpty()) {
pageData = rdsapi.getMaps(querySql);
} else {
pageData = rdsapi.getMaps(querySql, params.toArray());
}
if (pageData != null && !pageData.isEmpty()) {
// 直接处理当前页数据
int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable);
totalRows += pageData.size();
totalSuccess += successCount;
hasMore = pageData.size() == PAGE_SIZE;
pageNo++;
} else {
hasMore = false;
}
} while (hasMore);
}
//每月1号删除上月去年同期去除每月最后一天的数据删除当月去年同期全量数据后新增当月去年同期全量数据
// 每月1号执行的任务
LocalDate now = LocalDate.now();
int dayOfMonth = now.getDayOfMonth();
// if (dayOfMonth == 1) {
try {
LOGGER.info("开始执行每月1号的特殊数据处理任务");
// 计算时间范围
LocalDate lastYearMonthDate = now.minusYears(1).minusMonths(1);
int lastYearMonthValue = lastYearMonthDate.getMonthValue();
int lastyear = lastYearMonthDate.getYear();
// 1. 删除当月去年同期全量数据
String deleteCurrentMonthLastYearSql = "DELETE FROM " + targetTable +
" WHERE YEAR = " + lastyear +
" AND MONTH = " + lastYearMonthValue;
int update = DBSql.update(deleteCurrentMonthLastYearSql);
LOGGER.info("已删除去年同期({}-{})的全量数据,删除了{}条数据", lastyear, lastYearMonthValue,update);
// 2. 新增当月去年同期数据
// 去年日期
LocalDate minusYears = now.minusYears(1);
// 计算去年同期第一天当月第一天
LocalDate firstDayOfLastYearMonth = minusYears.withDayOfMonth(1);
// 计算去年下个月第一天
LocalDate firstDayOfNextMonthLastYear = minusYears.plusMonths(1).withDayOfMonth(1);
// 格式化日期为字符串根据数据库格式要求调整
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
String lastYearFirstDayStr = firstDayOfLastYearMonth.format(formatter);
String lastYearNextMonthFirstDayStr = firstDayOfNextMonthLastYear.format(formatter);
//删除清空去年同期数据
int update1 = DBSql.update("DELETE FROM " + targetTable +
" WHERE YEAR = " + firstDayOfLastYearMonth.getYear() +
" AND MONTH = " + firstDayOfLastYearMonth.getMonthValue());
LOGGER.info("先清空{}去年同期数{}",lastYearFirstDayStr,update1);
// 构建插入SQL注意需确保字段匹配且处理可能的主键冲突
String insertCurrentMonthLastYearSql = "INSERT INTO " + targetTable +
" SELECT * FROM " + targetTable +
" WHERE " + timeField + " >= '" + lastYearFirstDayStr +
"' AND " + timeField + " < '" + lastYearNextMonthFirstDayStr + "'";
// 执行插入操作
rdsapi.update(insertCurrentMonthLastYearSql);
LOGGER.info("{}已新增当月去年同期数据",lastYearFirstDayStr);
} catch (Exception e) {
LOGGER.error("每月1号任务执行失败: {}", e.getMessage(), e);
}
// }
LOGGER.info("从表[{}]共查询到{}条数据,成功同步{}条数据",
tableName, totalRows, totalSuccess);
} catch (Exception e) {
throw new RuntimeException("查询源表[" + tableName + "]数据失败: " + e.getMessage(), e);
}
}
}