修改销售同步汇总数据-增加判断销售增加线程池;同步sql错误

This commit is contained in:
llllon 2025-08-24 09:31:21 +08:00
parent 02139884bf
commit a90df695a4
7 changed files with 99 additions and 28 deletions

View File

@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Calendar; import java.util.Calendar;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.*;
/** /**
* @ClassName: DataLinkUpController * @ClassName: DataLinkUpController
@ -197,6 +198,8 @@ public class DataLinkUpController {
Date startDate = Date.from(startDateTime.atZone(ZoneId.systemDefault()).toInstant()); Date startDate = Date.from(startDateTime.atZone(ZoneId.systemDefault()).toInstant());
DataSummaryService summaryService = null; DataSummaryService summaryService = null;
SaleCountDimensionImpl saleCountDimension = null; SaleCountDimensionImpl saleCountDimension = null;
ExecutorService executorService = null; // 线程池用于并行处理销售业务
try { try {
LOGGER.info("开始执行销售数据多维度汇总计算"); LOGGER.info("开始执行销售数据多维度汇总计算");
@ -219,26 +222,82 @@ public class DataLinkUpController {
if ("销售".equals(ssyw)) { if ("销售".equals(ssyw)) {
summaryService = new SaleDataSummaryServiceImpl(); summaryService = new SaleDataSummaryServiceImpl();
saleCountDimension = new SaleCountDimensionImpl(); saleCountDimension = new SaleCountDimensionImpl();
LOGGER.info("销售销售的接口"); executorService = Executors.newFixedThreadPool(2); // 创建固定大小为2的线程池
LOGGER.info("销售业务检测到创建summaryService和saleCountDimension实例初始化线程池");
}else { }else {
summaryService = new PurchaseDataSummaryServiceImpl(); summaryService = new PurchaseDataSummaryServiceImpl();
LOGGER.info("采购销售的接口"); LOGGER.info("采购业务检测到创建summaryService实例");
} }
List<RowMap> bkgsMaps = DBSql.getMaps("SELECT BKGS FROM " + targetTable + " GROUP BY BKGS"); List<RowMap> bkgsMaps = DBSql.getMaps("SELECT BKGS FROM " + targetTable + " GROUP BY BKGS");
if (bkgsMaps!=null) { if (bkgsMaps != null) {
for (RowMap map : bkgsMaps) { for (RowMap map : bkgsMaps) {
BO bo = new BO(); BO bo = new BO();
bo.set("BKGS", map.getString("BKGS")); bo.set("BKGS", map.getString("BKGS"));
summaryService.calculateSummary(dateRange, bo);
if (saleCountDimension!=null){ if ("销售".equals(ssyw)) {
//计算销售的维度 LOGGER.info("======== 开始并行执行销售数据汇总计算BKGS: {} ========", bo.get("BKGS"));
LOGGER.info("======== 开始执行销售数据汇总计算 ========");
saleCountDimension.calculateSummary(dateRange, bo); // 创建并提交两个并行任务
LOGGER.info("======== 销售数据汇总计算完成 ========"); DataSummaryService finalSummaryService = summaryService;
Future<?> summaryFuture = executorService.submit(() -> {
try {
LOGGER.info("开始执行一体化-销售数据汇总计算BKGS: {}", bo.get("BKGS"));
finalSummaryService.calculateSummary(dateRange, bo);
LOGGER.info("完成一体化-销售数据汇总计算BKGS: {}", bo.get("BKGS"));
} catch (Exception e) {
LOGGER.error("一体化-销售数据汇总计算异常BKGS: {}", bo.get("BKGS"), e);
throw new RuntimeException("一体化-销售数据汇总计算失败: " + e.getMessage(), e);
}
});
SaleCountDimensionImpl finalSaleCountDimension = saleCountDimension;
Future<?> countFuture = executorService.submit(() -> {
try {
LOGGER.info("开始执行销售数据多维度汇总计算BKGS: {}", bo.get("BKGS"));
finalSaleCountDimension.calculateSummary(dateRange, bo);
LOGGER.info("完成销售数据多维度汇总计算BKGS: {}", bo.get("BKGS"));
} catch (Exception e) {
LOGGER.error("销售数据多维度汇总计算异常BKGS: {}", bo.get("BKGS"), e);
throw new RuntimeException("销售数据多维度汇总计算失败: " + e.getMessage(), e);
}
});
// 等待两个任务完成
try {
summaryFuture.get();
countFuture.get();
LOGGER.info("销售数据并行计算完成BKGS: {}", bo.get("BKGS"));
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("销售数据并行计算执行异常BKGS: {}", bo.get("BKGS"), e);
throw new RuntimeException("销售数据并行计算失败: " + e.getMessage(), e);
}
} else {
// 非销售业务单线程执行
LOGGER.info("======== 开始执行采购数据汇总计算BKGS: {} ========", bo.get("BKGS"));
try {
summaryService.calculateSummary(dateRange, bo);
LOGGER.info("======== 采购数据汇总计算完成BKGS: {} ========", bo.get("BKGS"));
} catch (Exception e) {
LOGGER.error("采购数据汇总计算异常BKGS: {}", bo.get("BKGS"), e);
throw new RuntimeException("采购数据汇总计算失败: " + e.getMessage(), e);
}
} }
} }
} }
// 如果是销售业务关闭线程池
if ("销售".equals(ssyw) && executorService != null) {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
LOGGER.info("销售业务线程池已关闭");
}
} }
ro.put("success", true); ro.put("success", true);
@ -249,6 +308,16 @@ public class DataLinkUpController {
LOGGER.error(errorMsg, e); LOGGER.error(errorMsg, e);
ro.put("success", false); ro.put("success", false);
ro.put("message", errorMsg); ro.put("message", errorMsg);
// 确保异常时关闭线程池
if (executorService != null) {
executorService.shutdownNow();
}
}finally {
// 最终确保线程池关闭
if (executorService != null) {
executorService.shutdown();
}
} }
long methodEndTime = System.currentTimeMillis(); long methodEndTime = System.currentTimeMillis();
LOGGER.info("【完成】数据计算汇总操作,总耗时:{}ms", methodEndTime - methodStartTime); LOGGER.info("【完成】数据计算汇总操作,总耗时:{}ms", methodEndTime - methodStartTime);

View File

@ -117,13 +117,17 @@ public class SaleCountDimensionImpl implements DataSummaryService {
Connection conn = null; Connection conn = null;
try { try {
conn = DBSql.open(); conn = DBSql.open();
conn.setAutoCommit(false); // conn.setAutoCommit(false);
// 获取时间范围内的所有月份 // 获取时间范围内的所有月份
List<String> yearMonths = getYearMonthsBetweenDates(dateRange.getStartDate(), dateRange.getEndDate()); List<String> yearMonths = getYearMonthsBetweenDates(dateRange.getStartDate(), dateRange.getEndDate());
for (String yearMonth : yearMonths) { for (String yearMonth : yearMonths) {
LOGGER.info("开始处理{}月份的数据,板块公司: {}", yearMonth, bkgs); LOGGER.info("开始处理{}月份的数据,板块公司: {}", yearMonth, bkgs);
// 4. 处理区域两金占比
LOGGER.info("开始区域两金占比");
processRegionTwoFundsRatio(conn, yearMonth, bkgs);
// 1. 处理营业收入数据 // 1. 处理营业收入数据
LOGGER.info("开始营业收入数据"); LOGGER.info("开始营业收入数据");
processRevenueData(conn, yearMonth, bkgs); processRevenueData(conn, yearMonth, bkgs);
@ -136,11 +140,9 @@ public class SaleCountDimensionImpl implements DataSummaryService {
LOGGER.info("开始应收账款数据"); LOGGER.info("开始应收账款数据");
processReceivableData(conn, yearMonth, bkgs); processReceivableData(conn, yearMonth, bkgs);
// 4. 处理区域两金占比
LOGGER.info("开始区域两金占比");
processRegionTwoFundsRatio(conn, yearMonth, bkgs);
} }
conn.commit(); // conn.commit();
LOGGER.info("所有月份数据处理完成"); LOGGER.info("所有月份数据处理完成");
}catch (Exception e) { }catch (Exception e) {
try { try {
@ -203,12 +205,12 @@ public class SaleCountDimensionImpl implements DataSummaryService {
"GROUP BY QYGS, KCZZ, LB_1, LB_2, LB_3, SQ, CS, QY " + "GROUP BY QYGS, KCZZ, LB_1, LB_2, LB_3, SQ, CS, QY " +
"LIMIT " + PAGE_SIZE + " OFFSET " + offset; "LIMIT " + PAGE_SIZE + " OFFSET " + offset;
LOGGER.debug("营业收入数据查询第{}页SQL: {}", page + 1, querySql); LOGGER.info("营业收入数据查询第{}页SQL: {}", page + 1, querySql);
List<RowMap> maps = DBSql.getMaps(conn, querySql, yearMonth.replace("-", ""), bkgs); List<RowMap> maps = DBSql.getMaps(conn, querySql, yearMonth.replace("-", ""), bkgs);
if (maps.isEmpty()) { if (maps.isEmpty()) {
hasMore = false; hasMore = false;
LOGGER.debug("营业收入数据第{}页无数据,停止分页查询", page + 1); LOGGER.info("营业收入数据第{}页无数据,停止分页查询", page + 1);
} else { } else {
ArrayList<BO> bos = new ArrayList<>(); ArrayList<BO> bos = new ArrayList<>();
for (RowMap map : maps) { for (RowMap map : maps) {
@ -291,12 +293,12 @@ public class SaleCountDimensionImpl implements DataSummaryService {
"GROUP BY QYGS, KCZZ, LB_1, LB_2, LB_3, SQ, CS, QY " + "GROUP BY QYGS, KCZZ, LB_1, LB_2, LB_3, SQ, CS, QY " +
"LIMIT " + PAGE_SIZE + " OFFSET " + offset; "LIMIT " + PAGE_SIZE + " OFFSET " + offset;
LOGGER.debug("销量销额数据查询第{}页SQL: {}", page + 1, querySql); LOGGER.info("销量销额数据查询第{}页SQL: {}", page + 1, querySql);
List<RowMap> maps = DBSql.getMaps(conn, querySql, yearMonth.replace("-", ""), bkgs); List<RowMap> maps = DBSql.getMaps(conn, querySql, yearMonth.replace("-", ""), bkgs);
if (maps.isEmpty()) { if (maps.isEmpty()) {
hasMore = false; hasMore = false;
LOGGER.debug("销量销额数据第{}页无数据,停止分页查询", page + 1); LOGGER.info("销量销额数据第{}页无数据,停止分页查询", page + 1);
} else { } else {
ArrayList<BO> bos = new ArrayList<>(); ArrayList<BO> bos = new ArrayList<>();
for (RowMap map : maps) { for (RowMap map : maps) {
@ -391,12 +393,12 @@ public class SaleCountDimensionImpl implements DataSummaryService {
"WHERE DATE(RQ) = ? AND BKGS = ? " + "WHERE DATE(RQ) = ? AND BKGS = ? " +
"LIMIT " + PAGE_SIZE + " OFFSET " + offset; "LIMIT " + PAGE_SIZE + " OFFSET " + offset;
LOGGER.debug("应收账款数据查询第{}页SQL: {}", page + 1, querySql); LOGGER.info("应收账款数据查询第{}页SQL: {}", page + 1, querySql);
List<RowMap> maps = DBSql.getMaps(conn, querySql, lastDayOfMonth, bkgs); List<RowMap> maps = DBSql.getMaps(conn, querySql, lastDayOfMonth, bkgs);
if (maps.isEmpty()) { if (maps.isEmpty()) {
hasMore = false; hasMore = false;
LOGGER.debug("应收账款数据第{}页无数据,停止分页查询", page + 1); LOGGER.info("应收账款数据第{}页无数据,停止分页查询", page + 1);
} else { } else {
ArrayList<BO> bos = new ArrayList<>(); ArrayList<BO> bos = new ArrayList<>();
for (RowMap map : maps) { for (RowMap map : maps) {
@ -477,16 +479,16 @@ public class SaleCountDimensionImpl implements DataSummaryService {
String receivableSql = "SELECT QYGS, XSZZ, SUM(YSYE) as YSZK " + String receivableSql = "SELECT QYGS, XSZZ, SUM(YSYE) as YSZK " +
"FROM " + RECEIVABLE_DETAIL_TABLE + " " + "FROM " + RECEIVABLE_DETAIL_TABLE + " " +
"WHERE YEAR(RQ) = YEAR(?) AND MONTH(RQ) = MONTH(?) AND BKGS = ? " + "WHERE YEAR(RQ) = YEAR(?) AND MONTH(RQ) = MONTH(?) AND BKGS = ? " +
"GROUP BY QYGS, BKGS" + "GROUP BY QYGS, BKGS " +
"LIMIT " + PAGE_SIZE + " OFFSET " + offset; "LIMIT " + PAGE_SIZE + " OFFSET " + offset;
LOGGER.debug("应收账款数据查询第{}页SQL: {}", page + 1, receivableSql); LOGGER.info("应收账款数据查询第{}页SQL: {}", page + 1, receivableSql);
List<RowMap> receivableMaps = DBSql.getMaps(conn, receivableSql, List<RowMap> receivableMaps = DBSql.getMaps(conn, receivableSql,
lastDayOfMonth, lastDayOfMonth, bkgs); lastDayOfMonth, lastDayOfMonth, bkgs);
if (receivableMaps.isEmpty()) { if (receivableMaps.isEmpty()) {
hasMore = false; hasMore = false;
LOGGER.debug("应收账款数据第{}页无数据,停止分页查询", page + 1); LOGGER.info("应收账款数据第{}页无数据,停止分页查询", page + 1);
} else { } else {
ArrayList<BO> bos = new ArrayList<>(); ArrayList<BO> bos = new ArrayList<>();
@ -506,15 +508,15 @@ public class SaleCountDimensionImpl implements DataSummaryService {
String inventorySql = "SELECT STOCKORGNAME, SUM(BALANCE_AMOUNT) as KCJE " + String inventorySql = "SELECT STOCKORGNAME, SUM(BALANCE_AMOUNT) as KCJE " +
"FROM " + BO_EU_DWD_ORDER_KC_HZ + " " + "FROM " + BO_EU_DWD_ORDER_KC_HZ + " " +
"WHERE STOCKORGNAME IN (" + placeholders + ") " + "WHERE STOCKORGNAME IN ('" + placeholders + "') " +
"AND CATEGORY = '产成品' " + "AND CATEGORY = '产成品' " +
"AND YEAR(INDATE) = YEAR(?) " + "AND YEAR(INDATE) = YEAR('"+lastDayOfMonth+"') " +
"AND MONTH(INDATE) = MONTH(?) " + "AND MONTH(INDATE) = MONTH('"+lastDayOfMonth+"') " +
"GROUP BY STOCKORGNAME"; "GROUP BY STOCKORGNAME";
LOGGER.debug("库存金额数据查询SQL: {}", inventorySql); LOGGER.info("库存金额数据查询SQL: {}", inventorySql);
List<RowMap> inventoryMaps = DBSql.getMaps(conn, inventorySql, lastDayOfMonth,lastDayOfMonth); List<RowMap> inventoryMaps = DBSql.getMaps(conn, inventorySql);
Map<String, Double> inventoryMap = inventoryMaps.stream() Map<String, Double> inventoryMap = inventoryMaps.stream()
.collect(Collectors.toMap( .collect(Collectors.toMap(
row -> row.getString("STOCKORGNAME"), row -> row.getString("STOCKORGNAME"),