1、修改拉取数据重复值问题,增加分页查询,增加字段金额字段

2、修改涂料防水营收计算逻辑,取同步本地的值,不查询第三方库
This commit is contained in:
llllon 2025-09-01 19:21:19 +08:00
parent cab88d2282
commit e6856253dc
15 changed files with 184 additions and 96 deletions

View File

@ -27,6 +27,7 @@ import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* @ClassName: DataLinkUpController
@ -139,6 +140,7 @@ public class DataLinkUpController {
String tableName = config.getString("TBB");
String tablename = config.getString("TABLENAME");
String bkgs = config.getString("SSBK");
String jezd = config.getString("JEZD");// 金额字段
List<BO> fieldMappings = SDK.getBOAPI()
.query("BO_EU_BNBM_DATALINKUP_SJGTPZ_SUB")
@ -191,7 +193,7 @@ public class DataLinkUpController {
}
syncService.querySourceData(ccId, tableName, timeField, startDate, endDate, partitionField,
fieldMappings, targetTable);
fieldMappings, targetTable, jezd);
}
LOGGER.info("开始使用DataSyncService处理数据同步 ({}条配置)", mainConfigs.size());
@ -441,7 +443,9 @@ public class DataLinkUpController {
.addQuery("BINDID =", bindId)
.list();
//获取板块公司
String bkgs = DBSql.getString("SELECT BKGS FROM " + targetTable, "BKGS");
List<RowMap> maps = DBSql.getMaps("SELECT BKGS FROM " + targetTable + "GROUP BY BKGS");
String bkgss = maps.stream().map(o -> o.getString("BKGS")).collect(Collectors.joining(","));
String[] bkgsArr = bkgss.split(",");
LOGGER.info("字段映射配置查询完成,耗时:{}ms", System.currentTimeMillis() - queryStartTime);
@ -449,11 +453,12 @@ public class DataLinkUpController {
long deleteStartTime = System.currentTimeMillis();
if (timeField == null || timeField.isEmpty()) {
// 全量删除
for (String bkgs : bkgsArr) {
String deleteSql = "DELETE FROM "+hzb+" WHERE BKGS = '"+bkgs+"'";
int deletedCount = DBSql.update(deleteSql);
LOGGER.info("已删除目标表["+hzb+"]中{}条数据(时间范围: {} - {}),耗时:{}ms",
deletedCount, startDate, endDate, System.currentTimeMillis() - deleteStartTime);
}
// 根据时间范围增加数据分页查询数据存储到BO_EU_BNBM_DATALINKUP_XS_XSL_HZ
// 全量分页迁移数据到汇总表
long summarizeStartTime = System.currentTimeMillis();
@ -466,12 +471,13 @@ public class DataLinkUpController {
LOGGER.error("无法找到源时间字段[{}]对应的目标表字段,跳过同步", timeField);
}
// 按时间范围删除
for (String bkgs : bkgsArr) {
String deleteSql = "DELETE FROM " + hzb +
" WHERE BKGS = '"+bkgs+"' AND " + targetTimeField + " BETWEEN ? AND ?";
int deletedCount = DBSql.update(deleteSql, new Object[]{startDate, endDate});
LOGGER.info("已删除目标表["+hzb+"]中{}条数据(时间范围: {} - {})",
deletedCount, startDate, endDate);
}
// 根据时间范围增加数据分页查询数据存储到BO_EU_BNBM_DATALINKUP_XS_XSL_HZ
// 按时间范围分页迁移数据到汇总表
long summarizeStartTime = System.currentTimeMillis();

View File

@ -11,15 +11,15 @@ public enum Section {
// 防水板块配置
WATERPROOF("防水", "北新防水",
"BO_EU_XS_YSL_BXFS",
"T_FS_SalesData_YSD",
"T_FS_SalesData_SKD",
"T_FS_SalesData_SKTKD"),
"BO_EU_SALESDATA_SKD",
"BO_EU_FS_SALESDATA_SKD",
"BO_EU_SALESDATA_SKTKD"),
// 涂料板块配置
PAINT("涂料", "北新涂料",
"BO_EU_XS_YSL_BXTL",
"T_TL_SalesData_YSD",
"T_TL_SalesData_SKD",
"T_TL_SalesData_SKTKD");
"BO_EU_TL_SALESDATA_SKD",
"BO_EU_SALESDATA_SKD_TL",
"BO_EU_TL_SALESDATA_SKTKD");
private final String name;
private final String bkgs;

View File

@ -5,11 +5,9 @@ import com.actionsoft.bpms.commons.database.RowMap;
import com.actionsoft.bpms.schedule.IJob;
import com.actionsoft.bpms.server.UserContext;
import com.actionsoft.bpms.util.DBSql;
import com.actionsoft.bpms.util.UtilDate;
import com.actionsoft.sdk.local.SDK;
import com.actionsoft.sdk.local.api.cc.RDSAPI;
import com.awspaas.user.apps.bnbm.datalinkup.enums.Section;
import com.awspaas.user.apps.bnbm.datalinkup.service.impl.SaleDataSyncServiceImpl;
import org.apache.commons.lang3.StringUtils;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@ -21,8 +19,6 @@ import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @ClassName: WaterproofPaintAccountsReceivableJob
@ -107,11 +103,12 @@ public class WaterproofPaintAccountsReceivableJob implements IJob {
section.getTargetTable(), delete, startDateFormat, nowDateFormat);
// 获取RDSAPI实例
RDSAPI rdsapi = SDK.getCCAPI().getRDSAPI(RDS_ID);
// RDSAPI rdsapi = SDK.getCCAPI().getRDSAPI(RDS_ID);
// 查询销售组织和客户分组信息
List<RowMap> maps = rdsapi.getMaps("SELECT FManageRegionName, FSaleOrgUnit, FCustName FROM " +
section.getYsdTable() + " GROUP BY FManageRegionName, FSaleOrgUnit, FCustName");
List<RowMap> maps = DBSql.getMaps("SELECT FMANAGEREGIONNAME, FCUSTNAME, FPROVINCE,FCITY FROM " +
section.getYsdTable() + " GROUP BY FMANAGEREGIONNAME, FCUSTNAME, FPROVINCE,FCITY");
if (maps == null || maps.isEmpty()) {
LOGGER.warn("{}板块未查询到销售组织和客户数据", sectionName);
@ -131,33 +128,33 @@ public class WaterproofPaintAccountsReceivableJob implements IJob {
// 遍历每个销售组织-客户组合
for (RowMap row : maps) {
try {
String manageRegionName = row.getString("FManageRegionName");
String saleOrgUnit = row.getString("FSaleOrgUnit");
String custName = row.getString("FCustName");
String manageRegionName = row.getString("FMANAGEREGIONNAME");
String saleOrgUnit = row.getString("FSALEORGUNIT");
String custName = row.getString("FCUSTNAME");
// 查询省市区信息
RowMap locationInfo = rdsapi.getMap(
"SELECT FProvince, FCity FROM " + section.getYsdTable() +
" WHERE FSaleOrgUnit = ? AND FCustName = ? ",
saleOrgUnit, custName
);
// RowMap locationInfo = rdsapi.getMap(
// "SELECT FProvince, FCity FROM " + section.getYsdTable() +
// " WHERE FSaleOrgUnit = ? AND FCustName = ? ",
// saleOrgUnit, custName
// );
String province = locationInfo != null ? locationInfo.getString("FProvince") : "";
String city = locationInfo != null ? locationInfo.getString("FCity") : "";
String district = locationInfo != null ? locationInfo.getString("FDistrict") : "";
String province = row != null ? row.getString("FPROVINCE") : "";
String city = row != null ? row.getString("FCITY") : "";
String district = row != null ? row.getString("FDISTRICT")!=null ? row.getString("FDISTRICT"):"": "";
// 计算期初余额上一年度
LocalDate previousYearStart = currentDate.minusYears(1).withDayOfYear(1);
LocalDate previousYearEnd = currentDate.minusYears(1).withDayOfYear(365);
BigDecimal qcye = calculateInitialBalance(rdsapi, section, saleOrgUnit, custName, previousYearStart, previousYearEnd);
BigDecimal qcye = calculateInitialBalance(section, saleOrgUnit, custName, previousYearStart, previousYearEnd);
// 计算累计销售本年年初到当前日期
LocalDate currentYearStart = currentDate.withDayOfYear(1);
BigDecimal ljxs = calculateTotalSales(rdsapi, section, saleOrgUnit, custName, currentYearStart, currentDate);
BigDecimal ljxs = calculateTotalSales(section, saleOrgUnit, custName, currentYearStart, currentDate);
// 计算累计还款本年年初到当前日期
BigDecimal ljhk = calculateTotalRepayment(rdsapi, section, saleOrgUnit, custName, currentYearStart, currentDate);
BigDecimal ljhk = calculateTotalRepayment(section, saleOrgUnit, custName, currentYearStart, currentDate);
// 计算应收余额
BigDecimal ysye = qcye.add(ljxs).subtract(ljhk);
@ -212,13 +209,13 @@ public class WaterproofPaintAccountsReceivableJob implements IJob {
/**
* 计算期初余额
*/
private BigDecimal calculateInitialBalance(RDSAPI rdsapi, Section section, String saleOrgUnit, String custName,
private BigDecimal calculateInitialBalance(Section section, String saleOrgUnit, String custName,
LocalDate startDate, LocalDate endDate) {
try {
// 查询应收单总额
RowMap ysResult = rdsapi.getMap(
"SELECT SUM(FSaleAmountRMB) as total FROM " + section.getYsdTable() +
" WHERE FSaleOrgUnit = ? AND FCustName = ? AND FBillDate BETWEEN ? AND ?",
RowMap ysResult = DBSql.getMap(
"SELECT SUM(FSALEAMOUNTRMB) as total FROM " + section.getYsdTable() +
" WHERE FSALEORGUNIT = ? AND FCUSTNAME = ? AND FBILLDATE BETWEEN ? AND ?",
saleOrgUnit, custName, java.sql.Date.valueOf(startDate), java.sql.Date.valueOf(endDate)
);
@ -226,9 +223,9 @@ public class WaterproofPaintAccountsReceivableJob implements IJob {
new BigDecimal(ysResult.get("total").toString()) : BigDecimal.ZERO;
// 查询收款单总额
RowMap skResult = rdsapi.getMap(
"SELECT SUM(FSaleAmountRMB) as total FROM " + section.getSkdTable() +
" WHERE FSaleOrgUnit = ? AND FCustName = ? AND FBillDate BETWEEN ? AND ?",
RowMap skResult = DBSql.getMap(
"SELECT SUM(FSALEAMOUNTRMB) as total FROM " + section.getSkdTable() +
" WHERE FSALEORGUNIT = ? AND FCUSTNAME = ? AND FBILLDATE BETWEEN ? AND ?",
saleOrgUnit, custName, java.sql.Date.valueOf(startDate), java.sql.Date.valueOf(endDate)
);
@ -236,9 +233,9 @@ public class WaterproofPaintAccountsReceivableJob implements IJob {
new BigDecimal(skResult.get("total").toString()) : BigDecimal.ZERO;
// 查询收款退款单总额
RowMap sktkResult = rdsapi.getMap(
"SELECT SUM(FSaleAmountRMB) as total FROM " + section.getSktkdTable() +
" WHERE FSaleOrgUnit = ? AND FCustName = ? AND FBillDate BETWEEN ? AND ?",
RowMap sktkResult = DBSql.getMap(
"SELECT SUM(FSALEAMOUNTRMB) as total FROM " + section.getSktkdTable() +
" WHERE FSALEORGUNIT = ? AND FCUSTNAME = ? AND FBILLDATE BETWEEN ? AND ?",
saleOrgUnit, custName, java.sql.Date.valueOf(startDate), java.sql.Date.valueOf(endDate)
);
@ -257,12 +254,12 @@ public class WaterproofPaintAccountsReceivableJob implements IJob {
/**
* 计算累计销售
*/
private BigDecimal calculateTotalSales(RDSAPI rdsapi, Section section, String saleOrgUnit, String custName,
private BigDecimal calculateTotalSales(Section section, String saleOrgUnit, String custName,
LocalDate startDate, LocalDate endDate) {
try {
RowMap result = rdsapi.getMap(
"SELECT SUM(FSaleAmountRMB) as total FROM " + section.getYsdTable() +
" WHERE FSaleOrgUnit = ? AND FCustName = ? AND FBillDate BETWEEN ? AND ?",
RowMap result = DBSql.getMap(
"SELECT SUM(FSALEAMOUNTRMB) as total FROM " + section.getYsdTable() +
" WHERE FSALEORGUNIT = ? AND FCUSTNAME = ? AND FBILLDATE BETWEEN ? AND ?",
saleOrgUnit, custName, java.sql.Date.valueOf(startDate), java.sql.Date.valueOf(endDate)
);
@ -278,13 +275,13 @@ public class WaterproofPaintAccountsReceivableJob implements IJob {
/**
* 计算累计还款
*/
private BigDecimal calculateTotalRepayment(RDSAPI rdsapi, Section section, String saleOrgUnit, String custName,
private BigDecimal calculateTotalRepayment(Section section, String saleOrgUnit, String custName,
LocalDate startDate, LocalDate endDate) {
try {
// 查询收款单总额
RowMap skResult = rdsapi.getMap(
"SELECT SUM(FSaleAmountRMB) as total FROM " + section.getSkdTable() +
" WHERE FSaleOrgUnit = ? AND FCustName = ? AND FBillDate BETWEEN ? AND ?",
RowMap skResult = DBSql.getMap(
"SELECT SUM(FSALEAMOUNTRMB) as total FROM " + section.getSkdTable() +
" WHERE FSALEORGUNIT = ? AND FCUSTNAME = ? AND FBILLDATE BETWEEN ? AND ?",
saleOrgUnit, custName, java.sql.Date.valueOf(startDate), java.sql.Date.valueOf(endDate)
);
@ -292,9 +289,9 @@ public class WaterproofPaintAccountsReceivableJob implements IJob {
new BigDecimal(skResult.get("total").toString()) : BigDecimal.ZERO;
// 查询收款退款单总额
RowMap sktkResult = rdsapi.getMap(
"SELECT SUM(FSaleAmountRMB) as total FROM " + section.getSktkdTable() +
" WHERE FSaleOrgUnit = ? AND FCustName = ? AND FBillDate BETWEEN ? AND ?",
RowMap sktkResult = DBSql.getMap(
"SELECT SUM(FSALEAMOUNTRMB) as total FROM " + section.getSktkdTable() +
" WHERE FSALEORGUNIT = ? AND FCUSTNAME = ? AND FBILLDATE BETWEEN ? AND ?",
saleOrgUnit, custName, java.sql.Date.valueOf(startDate), java.sql.Date.valueOf(endDate)
);

View File

@ -68,7 +68,7 @@ public interface DataSyncService {
* @return 查询结果数据集
* @throws RuntimeException 查询失败或参数无效时抛出
*/
void querySourceData(String ccId, String tableName, String timeField, Date startDate, Date endDate, String partitionField, List<BO> fieldMappings, String targetTable);
void querySourceData(String ccId, String tableName, String timeField, Date startDate, Date endDate, String partitionField, List<BO> fieldMappings, String targetTable,String jezd);
/**
* 各板块数据汇总

View File

@ -37,7 +37,7 @@ public class ProductionDataSyncServiceImpl implements DataSyncService {
/**
* 增加分页大小常量
*/
private static final int PAGE_SIZE = 1000; // 每页查询1000条记录
private static final int PAGE_SIZE = 10000; // 每页查询1000条记录
private static final String ORACLE_DATE_FORMAT = "YYYY-MM-DD HH24:MI:SS";
@Override
@ -82,6 +82,7 @@ public class ProductionDataSyncServiceImpl implements DataSyncService {
String ccId = mainConfig.getString("CC_ID");
String partitionField = mainConfig.getString("FQBZD");
String bkgs = mainConfig.getString("BKGS");
String jezd = mainConfig.getString("JEZD");
DateRange dateRange = new DateRange();
LOGGER.info("处理配置BindID={}, 源表={}, 目标表={}, CC_ID={}, 时间字段={}, 分区字段配置={}",
@ -124,7 +125,7 @@ public class ProductionDataSyncServiceImpl implements DataSyncService {
deleteTargetData(targetTable, targetTimeField, startDate, endDate);
}
querySourceData(ccId, tableName, timeField, startDate, endDate, partitionField,
fieldMappings, targetTable);
fieldMappings, targetTable,jezd);
dateRange.setStartDate(startDate);
dateRange.setEndDate(endDate);
return dateRange;
@ -193,7 +194,7 @@ public class ProductionDataSyncServiceImpl implements DataSyncService {
@Override
public void querySourceData(String ccId, String tableName,
String timeField, Date startDated, Date endDated,String partitionField,
List<BO> fieldMappings, String targetTable) {
List<BO> fieldMappings, String targetTable,String jezd) {
int totalRows = 0; // 总查询行数
int totalSuccess = 0; // 总成功插入行数
int pageNo = 1;
@ -210,6 +211,7 @@ public class ProductionDataSyncServiceImpl implements DataSyncService {
if ("ORACLE".equalsIgnoreCase(DBname)){
// 构建查询条件
StringBuilder conditionBuilder = new StringBuilder();
StringBuilder orderByBuilder = new StringBuilder(); // 用于构建排序子句
List<Object> params = new ArrayList<>(); // 存储查询参数
// 分区字段和时间字段组合查询条件
@ -243,6 +245,13 @@ public class ProductionDataSyncServiceImpl implements DataSyncService {
.append("') AND TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("')");
// 构建排序子句
orderByBuilder.append(" ORDER BY ").append(timeField);
if (jezd != null && !jezd.isEmpty()) {
orderByBuilder.append(", ").append(jezd);
}
orderByBuilder.append(" DESC");
params.add(startDate);
params.add(endDate);
}
@ -258,6 +267,12 @@ public class ProductionDataSyncServiceImpl implements DataSyncService {
.append("') AND TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("')");
// 构建排序子句
orderByBuilder.append(" ORDER BY ").append(timeField);
if (jezd != null && !jezd.isEmpty()) {
orderByBuilder.append(", ").append(jezd);
}
orderByBuilder.append(" DESC");
params.add(startDate);
params.add(endDate);
} else {
@ -270,8 +285,11 @@ public class ProductionDataSyncServiceImpl implements DataSyncService {
do {
// 使用Oracle分页语法 (12c+)
String querySql = "SELECT * FROM ( " +
"SELECT t.*, ROWNUM rn FROM " + tableName + " t " +
"WHERE " + conditionBuilder.toString() + " AND ROWNUM <= " + (pageNo * PAGE_SIZE) +
"SELECT t.*, ROWNUM rn FROM (" +
"SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString() +
orderByBuilder.toString() + // 添加排序子句
") t WHERE ROWNUM <= " + (pageNo * PAGE_SIZE) +
") WHERE rn > " + ((pageNo - 1) * PAGE_SIZE);
LOGGER.debug("执行Oracle查询: {}", querySql);
@ -328,12 +346,20 @@ public class ProductionDataSyncServiceImpl implements DataSyncService {
if (timeField != null && !timeField.isEmpty()) {
conditionBuilder.append(" AND ")
.append(timeField)
.append(" BETWEEN ? AND ?");
.append(" BETWEEN ? AND ? ORDER BY "+timeField+"");
if (jezd!=null && !jezd.isEmpty()){
conditionBuilder.append(", "+jezd+" ");
}
conditionBuilder.append(" DESC");
}
} else if (timeField != null && !timeField.isEmpty()) {
// 没有分区字段但时间字段存在使用时间范围条件
conditionBuilder.append(timeField)
.append(" BETWEEN ? AND ?");
.append(" BETWEEN ? AND ? ORDER BY "+timeField+"");
if (jezd!=null && !jezd.isEmpty()){
conditionBuilder.append(", "+jezd+" ");
}
conditionBuilder.append(" DESC");
} else {
// 既没有分区字段也没有时间字段查询全表实际应避免这种情况
LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
@ -634,14 +660,14 @@ public class ProductionDataSyncServiceImpl implements DataSyncService {
do {
if (startDated == null || endDated == null) {
pageSql = "SELECT * FROM " + targetTable +
" LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
" ORDER BY ID LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
pageData = DBSql.getMaps(pageSql);
} else {
startDate = simpleDateFormat.format(startDated);
endDate = simpleDateFormat.format(endDated);
pageSql = "SELECT * FROM " + targetTable +
" WHERE " + targetTimeField + " BETWEEN '" + startDate + "' AND '" + endDate + "' " +
" LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
" ORDER BY ID LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
LOGGER.info("执行查询的sql{}", pageSql);
pageData = DBSql.getMaps(pageSql);
}

View File

@ -39,7 +39,7 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
/**
* 增加分页大小常量
*/
private static final int PAGE_SIZE = 1000; // 每页查询1000条记录
private static final int PAGE_SIZE = 10000; // 每页查询1000条记录
private static final String ORACLE_DATE_FORMAT = "YYYY-MM-DD HH24:MI:SS";
@Override
@ -85,6 +85,7 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
String partitionField = mainConfig.getString("FQBZD");
String tablename = mainConfig.getString("TABLENAME");
String bkgs = mainConfig.getString("BKGS");
String jezd = mainConfig.getString("JEZD");
DateRange dateRange = new DateRange();
LOGGER.info("处理配置BindID={}, 源表={}, 目标表={}, CC_ID={}, 时间字段={}, 分区字段配置={}",
@ -157,7 +158,7 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
deleteTargetData(targetTable, targetTimeField, startDate, endDate);
}
querySourceData(ccId, tableName, timeField, startDate, endDate, partitionField,
fieldMappings, targetTable);
fieldMappings, targetTable,jezd);
dateRange.setStartDate(startDate);
dateRange.setEndDate(endDate);
return dateRange;
@ -225,7 +226,7 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
@Override
public void querySourceData(String ccId, String tableName,
String timeField, Date startDated, Date endDated,String partitionField,
List<BO> fieldMappings, String targetTable) {
List<BO> fieldMappings, String targetTable,String jezd) {
int totalRows = 0; // 总查询行数
int totalSuccess = 0; // 总成功插入行数
int pageNo = 1;
@ -242,6 +243,7 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
if ("ORACLE".equalsIgnoreCase(DBname)){
// 构建查询条件
StringBuilder conditionBuilder = new StringBuilder();
StringBuilder orderByBuilder = new StringBuilder(); // 用于构建排序子句
List<Object> params = new ArrayList<>(); // 存储查询参数
// 分区字段和时间字段组合查询条件
@ -275,6 +277,13 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
.append("') AND TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("')");
// 构建排序子句
orderByBuilder.append(" ORDER BY ").append(timeField);
if (jezd != null && !jezd.isEmpty()) {
orderByBuilder.append(", ").append(jezd);
}
orderByBuilder.append(" DESC");
params.add(startDate);
params.add(endDate);
}
@ -290,6 +299,12 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
.append("') AND TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("')");
// 构建排序子句
orderByBuilder.append(" ORDER BY ").append(timeField);
if (jezd != null && !jezd.isEmpty()) {
orderByBuilder.append(", ").append(jezd);
}
orderByBuilder.append(" DESC");
params.add(startDate);
params.add(endDate);
} else {
@ -302,8 +317,11 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
do {
// 使用Oracle分页语法 (12c+)
String querySql = "SELECT * FROM ( " +
"SELECT t.*, ROWNUM rn FROM " + tableName + " t " +
"WHERE " + conditionBuilder.toString() + " AND ROWNUM <= " + (pageNo * PAGE_SIZE) +
"SELECT t.*, ROWNUM rn FROM (" +
"SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString() +
orderByBuilder.toString() + // 添加排序子句
") t WHERE ROWNUM <= " + (pageNo * PAGE_SIZE) +
") WHERE rn > " + ((pageNo - 1) * PAGE_SIZE);
LOGGER.debug("执行Oracle查询: {}", querySql);
@ -360,12 +378,20 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
if (timeField != null && !timeField.isEmpty()) {
conditionBuilder.append(" AND ")
.append(timeField)
.append(" BETWEEN ? AND ?");
.append(" BETWEEN ? AND ? ORDER BY "+timeField+"");
if (jezd!=null && !jezd.isEmpty()){
conditionBuilder.append(", "+jezd+" ");
}
conditionBuilder.append(" DESC");
}
} else if (timeField != null && !timeField.isEmpty()) {
// 没有分区字段但时间字段存在使用时间范围条件
conditionBuilder.append(timeField)
.append(" BETWEEN ? AND ?");
.append(" BETWEEN ? AND ? ORDER BY "+timeField+"");
if (jezd!=null && !jezd.isEmpty()){
conditionBuilder.append(", "+jezd+" ");
}
conditionBuilder.append(" DESC");
} else {
// 既没有分区字段也没有时间字段查询全表实际应避免这种情况
LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
@ -688,14 +714,14 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
do {
if (startDated == null || endDated == null) {
pageSql = "SELECT * FROM " + targetTable +
" LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
" ORDER BY ID LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
pageData = DBSql.getMaps(pageSql);
} else {
startDate = simpleDateFormat.format(startDated);
endDate = simpleDateFormat.format(endDated);
pageSql = "SELECT * FROM " + targetTable +
" WHERE " + targetTimeField + " BETWEEN '" + startDate + "' AND '" + endDate + "' " +
" LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
" ORDER BY ID LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
LOGGER.info("执行查询的sql{}", pageSql);
pageData = DBSql.getMaps(pageSql);
}

View File

@ -507,18 +507,25 @@ public class SaleCountDimensionImpl implements DataSummaryService {
Map<String, RowMap> receivableMap = new HashMap<>();
for (RowMap map : receivableMaps) {
String xszz = map.getString("XSZZ");
String qygs = map.getString("QYGS");
List<RowMap> maps = DBSql.getMaps("SELECT * FROM BO_EU_BNBM_QYGSDYB WHERE QYGS = '" + qygs + "'");
for (RowMap rowMap : maps) {
String xszz = rowMap.getString("XSZZ");
xszzList.add(xszz);
receivableMap.put(xszz, map);
}
}
// 第二个SQL查询库存金额数据
if (!xszzList.isEmpty()) {
String placeholders = String.join(",", Collections.nCopies(xszzList.size(), "?"));
String inClause = xszzList.stream()
.map(s -> "'" + s.replace("'", "''") + "'") // 转义单引号防止SQL注入
.collect(Collectors.joining(","));
String inventorySql = "SELECT STOCKORGNAME, SUM(BALANCE_AMOUNT) as KCJE " +
"FROM " + BO_EU_DWD_ORDER_KC_HZ + " " +
"WHERE STOCKORGNAME IN ('" + placeholders + "') " +
"WHERE STOCKORGNAME IN (" + inClause + ") " +
"AND CATEGORY = '产成品' " +
"AND YEAR(INDATE) = YEAR('"+lastDayOfMonth+"') " +
"AND MONTH(INDATE) = MONTH('"+lastDayOfMonth+"') " +

View File

@ -37,7 +37,7 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
// private static final String GAUSSIAN_USERNAME = SDK.getAppAPI().getProperty("com.awspaas.user.apps.bnbm.datalinkup", "jbl_data_act");
// private static final String GAUSSIAN_PASSWORD = SDK.getAppAPI().getProperty("com.awspaas.user.apps.bnbm.datalinkup", "jbl_data_pw");
// 增加分页大小常量
private static final int PAGE_SIZE = 1000; // 每页查询1000条记录
private static final int PAGE_SIZE = 10000; // 每页查询1000条记录
private static final String ORACLE_DATE_FORMAT = "YYYY-MM-DD HH24:MI:SS";
@ -100,6 +100,7 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
String ccId = mainConfig.getString("CC_ID");
String partitionField = mainConfig.getString("FQBZD");
String bkgs = mainConfig.getString("BKGS");
String jezd = mainConfig.getString("JEZD");
DateRange dateRange = new DateRange();
LOGGER.info("处理配置BindID={}, 源表={}, 目标表={}, CC_ID={}, 时间字段={}, 分区字段配置={}",
@ -150,7 +151,7 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
// } else {
// 查询源表数据跨库查询
querySourceData(ccId, tableName, timeField, startDate, endDate, partitionField,
fieldMappings, targetTable);
fieldMappings, targetTable,jezd);
// }
dateRange.setStartDate(startDate);
dateRange.setEndDate(endDate);
@ -170,7 +171,7 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
@Override
public void querySourceData(String ccId, String tableName,
String timeField, Date startDated, Date endDated,String partitionField,
List<BO> fieldMappings, String targetTable) {
List<BO> fieldMappings, String targetTable,String jezd) {
int totalRows = 0; // 总查询行数
int totalSuccess = 0; // 总成功插入行数
int pageNo = 1;
@ -187,6 +188,7 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
if ("ORACLE".equalsIgnoreCase(DBname)){
// 构建查询条件
StringBuilder conditionBuilder = new StringBuilder();
StringBuilder orderByBuilder = new StringBuilder(); // 用于构建排序子句
List<Object> params = new ArrayList<>(); // 存储查询参数
// 分区字段和时间字段组合查询条件
@ -220,6 +222,13 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
.append("') AND TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("')");
// 构建排序子句
orderByBuilder.append(" ORDER BY ").append(timeField);
if (jezd != null && !jezd.isEmpty()) {
orderByBuilder.append(", ").append(jezd);
}
orderByBuilder.append(" DESC");
params.add(startDate);
params.add(endDate);
}
@ -235,6 +244,12 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
.append("') AND TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("')");
// 构建排序子句
orderByBuilder.append(" ORDER BY ").append(timeField);
if (jezd != null && !jezd.isEmpty()) {
orderByBuilder.append(", ").append(jezd);
}
orderByBuilder.append(" DESC");
params.add(startDate);
params.add(endDate);
} else {
@ -247,8 +262,11 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
do {
// 使用Oracle分页语法 (12c+)
String querySql = "SELECT * FROM ( " +
"SELECT t.*, ROWNUM rn FROM " + tableName + " t " +
"WHERE " + conditionBuilder.toString() + " AND ROWNUM <= " + (pageNo * PAGE_SIZE) +
"SELECT t.*, ROWNUM rn FROM (" +
"SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString() +
orderByBuilder.toString() + // 添加排序子句
") t WHERE ROWNUM <= " + (pageNo * PAGE_SIZE) +
") WHERE rn > " + ((pageNo - 1) * PAGE_SIZE);
LOGGER.debug("执行Oracle查询: {}", querySql);
@ -305,12 +323,20 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
if (timeField != null && !timeField.isEmpty()) {
conditionBuilder.append(" AND ")
.append(timeField)
.append(" BETWEEN ? AND ?");
.append(" BETWEEN ? AND ? ORDER BY "+timeField+"");
if (jezd!=null && !jezd.isEmpty()){
conditionBuilder.append(", "+jezd+" ");
}
conditionBuilder.append(" DESC");
}
} else if (timeField != null && !timeField.isEmpty()) {
// 没有分区字段但时间字段存在使用时间范围条件
conditionBuilder.append(timeField)
.append(" BETWEEN ? AND ?");
.append(" BETWEEN ? AND ? ORDER BY "+timeField+"");
if (jezd!=null && !jezd.isEmpty()){
conditionBuilder.append(", "+jezd+" ");
}
conditionBuilder.append(" DESC");
} else {
// 既没有分区字段也没有时间字段查询全表实际应避免这种情况
LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
@ -321,10 +347,10 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
String querySqls = "SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString() +" ";
// " LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
LOGGER.debug("执行查询querySqls: {}", querySqls);
LOGGER.info("执行查询querySqls: {}", querySqls);
String querySql = SQLPagination.getPaginitionSQL(querySqls, (pageNo - 1) * PAGE_SIZE, PAGE_SIZE,DBname);
LOGGER.debug("执行查询querySql: {}", querySql);
LOGGER.info("执行查询querySql: {}", querySql);
List<RowMap> pageData;
// 根据条件类型执行查询
@ -456,7 +482,7 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
}
}
// 增加详细日志输出共处理多少条成功同步多少条
bkgs = bkgsSet.toArray().toString();
bkgs = Arrays.toString(bkgsSet.toArray());
LOGGER.info("同步板块为:{};落地表为:{},本次处理{}条数据,成功同步{}条数据到表[{}]",
bkgs,targetTable,processedCount, successCount, targetTable);
return successCount;
@ -665,14 +691,14 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
do {
if (startDated == null || endDated == null) {
pageSql = "SELECT * FROM " + targetTable +
" LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
" ORDER BY ID LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
pageData = DBSql.getMaps(pageSql);
} else {
startDate = simpleDateFormat.format(startDated);
endDate = simpleDateFormat.format(endDated);
pageSql = "SELECT * FROM " + targetTable +
" WHERE " + targetTimeField + " BETWEEN '" + startDate + "' AND '" + endDate + "' " +
" LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
" ORDER BY ID LIMIT " + PAGE_SIZE + " OFFSET " + (pageNo - 1) * PAGE_SIZE;
LOGGER.info("执行查询的sql{}", pageSql);
pageData = DBSql.getMaps(pageSql);
}