Compare commits

...

2 Commits

9 changed files with 180 additions and 35 deletions

View File

@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
/**
* @ClassName: DataLinkUpController
@ -180,6 +181,80 @@ public class DataLinkUpController {
* @return 响应结果
*/
@Mapping("com.awspaas.user.apps.bnbm.datalinkup.controller.DataLinkUpController_calculateSummary")
// public ResponseObject calculateSummary(String dataStr, String sid, String formattedDate) {
// long methodStartTime = System.currentTimeMillis();
// LOGGER.info("【开始】数据计算汇总操作,开始时间:{}", new Date(methodStartTime));
// ResponseObject ro = ResponseObject.newOkResponse();
// JSONArray configArray = new JSONArray(dataStr);
//
// Calendar cal = Calendar.getInstance();
// cal.add(Calendar.DATE, -1); // 昨天
// Date endDate = cal.getTime();
//
// // 解析formattedDate为日期对象并计算时间范围
// DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
// LocalDate date = LocalDate.parse(formattedDate, dateFormatter);
// LocalDateTime startDateTime = date.atStartOfDay(); // 当天00:00:00
// Date startDate = Date.from(startDateTime.atZone(ZoneId.systemDefault()).toInstant());
// DataSummaryService summaryService = null;
// SaleCountDimensionImpl saleCountDimension = null;
// try {
// LOGGER.info("开始执行销售数据多维度汇总计算");
//
// DateRange dateRange = new DateRange();
// dateRange.setStartDate(startDate);
// dateRange.setEndDate(endDate);
// LOGGER.info("汇总计算开始时间为:{},结束时间为:{}",startDate,endDate);
// // 2. 执行汇总计算
// for (int i = 0; i < configArray.length(); i++) {
// JSONObject config = configArray.getJSONObject(i);
// String timeField = config.getString("SJZD");
// String ccId = config.getString("CC_ID");
// String targetTable = config.getString("LDB");
// String partitionField = config.getString("FQBZD");
// String tableName = config.getString("TBB");
// String bkgs = config.getString("SSBK");
// String ssyw = config.getString("SSYW");
//
// // 1. 创建数据汇总服务实例
// if ("销售".equals(ssyw)) {
// summaryService = new SaleDataSummaryServiceImpl();
// saleCountDimension = new SaleCountDimensionImpl();
// LOGGER.info("销售销售的接口");
// }else {
// summaryService = new PurchaseDataSummaryServiceImpl();
// LOGGER.info("采购销售的接口");
// }
//
// List<RowMap> bkgsMaps = DBSql.getMaps("SELECT BKGS FROM " + targetTable + " GROUP BY BKGS");
// if (bkgsMaps!=null) {
// for (RowMap map : bkgsMaps) {
// BO bo = new BO();
// bo.set("BKGS", map.getString("BKGS"));
//// summaryService.calculateSummary(dateRange, bo);
//// if (saleCountDimension!=null){
//// //计算销售的维度
//// LOGGER.info("======== 开始执行销售数据汇总计算 ========");
// saleCountDimension.calculateSummary(dateRange, bo);
//// LOGGER.info("======== 销售数据汇总计算完成 ========");
//// }
// }
// }
// }
//
// ro.put("success", true);
// ro.put("message", "数据汇总计算完成");
// LOGGER.info("销售数据多维度汇总计算完成");
// } catch (Exception e) {
// String errorMsg = "数据汇总计算失败: " + e.getMessage();
// LOGGER.error(errorMsg, e);
// ro.put("success", false);
// ro.put("message", errorMsg);
// }
// long methodEndTime = System.currentTimeMillis();
// LOGGER.info("【完成】数据计算汇总操作,总耗时:{}ms", methodEndTime - methodStartTime);
// return ro;
// }
public ResponseObject calculateSummary(String dataStr, String sid, String formattedDate) {
long methodStartTime = System.currentTimeMillis();
LOGGER.info("【开始】数据计算汇总操作,开始时间:{}", new Date(methodStartTime));
@ -197,6 +272,8 @@ public class DataLinkUpController {
Date startDate = Date.from(startDateTime.atZone(ZoneId.systemDefault()).toInstant());
DataSummaryService summaryService = null;
SaleCountDimensionImpl saleCountDimension = null;
ExecutorService executorService = null; // 线程池用于并行处理销售业务
try {
LOGGER.info("开始执行销售数据多维度汇总计算");
@ -219,26 +296,82 @@ public class DataLinkUpController {
if ("销售".equals(ssyw)) {
summaryService = new SaleDataSummaryServiceImpl();
saleCountDimension = new SaleCountDimensionImpl();
LOGGER.info("销售销售的接口");
executorService = Executors.newFixedThreadPool(2); // 创建固定大小为2的线程池
LOGGER.info("销售业务检测到创建summaryService和saleCountDimension实例初始化线程池");
}else {
summaryService = new PurchaseDataSummaryServiceImpl();
LOGGER.info("采购销售的接口");
LOGGER.info("采购业务检测到创建summaryService实例");
}
List<RowMap> bkgsMaps = DBSql.getMaps("SELECT BKGS FROM " + targetTable + " GROUP BY BKGS");
if (bkgsMaps!=null) {
if (bkgsMaps != null) {
for (RowMap map : bkgsMaps) {
BO bo = new BO();
bo.set("BKGS", map.getString("BKGS"));
summaryService.calculateSummary(dateRange, bo);
if (saleCountDimension!=null){
//计算销售的维度
LOGGER.info("======== 开始执行销售数据汇总计算 ========");
saleCountDimension.calculateSummary(dateRange, bo);
LOGGER.info("======== 销售数据汇总计算完成 ========");
if ("销售".equals(ssyw)) {
LOGGER.info("======== 开始并行执行销售数据汇总计算BKGS: {} ========", bo.get("BKGS"));
// 创建并提交两个并行任务
DataSummaryService finalSummaryService = summaryService;
Future<?> summaryFuture = executorService.submit(() -> {
try {
LOGGER.info("开始执行一体化-销售数据汇总计算BKGS: {}", bo.get("BKGS"));
finalSummaryService.calculateSummary(dateRange, bo);
LOGGER.info("完成一体化-销售数据汇总计算BKGS: {}", bo.get("BKGS"));
} catch (Exception e) {
LOGGER.error("一体化-销售数据汇总计算异常BKGS: {}", bo.get("BKGS"), e);
throw new RuntimeException("一体化-销售数据汇总计算失败: " + e.getMessage(), e);
}
});
SaleCountDimensionImpl finalSaleCountDimension = saleCountDimension;
Future<?> countFuture = executorService.submit(() -> {
try {
LOGGER.info("开始执行销售数据多维度汇总计算BKGS: {}", bo.get("BKGS"));
finalSaleCountDimension.calculateSummary(dateRange, bo);
LOGGER.info("完成销售数据多维度汇总计算BKGS: {}", bo.get("BKGS"));
} catch (Exception e) {
LOGGER.error("销售数据多维度汇总计算异常BKGS: {}", bo.get("BKGS"), e);
throw new RuntimeException("销售数据多维度汇总计算失败: " + e.getMessage(), e);
}
});
// 等待两个任务完成
try {
summaryFuture.get();
countFuture.get();
LOGGER.info("销售数据并行计算完成BKGS: {}", bo.get("BKGS"));
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("销售数据并行计算执行异常BKGS: {}", bo.get("BKGS"), e);
throw new RuntimeException("销售数据并行计算失败: " + e.getMessage(), e);
}
} else {
// 非销售业务单线程执行
LOGGER.info("======== 开始执行采购数据汇总计算BKGS: {} ========", bo.get("BKGS"));
try {
summaryService.calculateSummary(dateRange, bo);
LOGGER.info("======== 采购数据汇总计算完成BKGS: {} ========", bo.get("BKGS"));
} catch (Exception e) {
LOGGER.error("采购数据汇总计算异常BKGS: {}", bo.get("BKGS"), e);
throw new RuntimeException("采购数据汇总计算失败: " + e.getMessage(), e);
}
}
}
}
// 如果是销售业务关闭线程池
if ("销售".equals(ssyw) && executorService != null) {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
LOGGER.info("销售业务线程池已关闭");
}
}
ro.put("success", true);
@ -249,6 +382,16 @@ public class DataLinkUpController {
LOGGER.error(errorMsg, e);
ro.put("success", false);
ro.put("message", errorMsg);
// 确保异常时关闭线程池
if (executorService != null) {
executorService.shutdownNow();
}
}finally {
// 最终确保线程池关闭
if (executorService != null) {
executorService.shutdown();
}
}
long methodEndTime = System.currentTimeMillis();
LOGGER.info("【完成】数据计算汇总操作,总耗时:{}ms", methodEndTime - methodStartTime);

View File

@ -379,12 +379,12 @@ public class PurchaseDataSummaryServiceImpl implements DataSummaryService {
summaryBO.set("JD", base);
List<BO> bnbmCgPzwlfl = SDK.getBOAPI().query("BO_EU_BNBM_CG_PZWLFL").
addQuery("BKMC = ", bkgs).addQuery("WLMC LIKE '%" + wlmc + "%'", null).list();
if (bnbmCgPzwlfl!=null){
String wlfl = bnbmCgPzwlfl.stream().map(o -> o.getString("WLFL")).collect(Collectors.joining("|"));
summaryBO.set("WLMC", wlfl);
}else {
// if (bnbmCgPzwlfl!=null){
// String wlfl = bnbmCgPzwlfl.stream().map(o -> o.getString("WLFL")).collect(Collectors.joining("|"));
// summaryBO.set("WLMC", wlfl);
// }else {
summaryBO.set("WLMC", wlmc);
}
// }
summaryBO.set("YEARMONTH", yearMonth);
summaryBO.set("CGZE", currentMonthRow.getDouble("totalAmount"));
summaryBO.set("CGZL", currentMonthRow.getDouble("totalQuantity"));

View File

@ -117,13 +117,17 @@ public class SaleCountDimensionImpl implements DataSummaryService {
Connection conn = null;
try {
conn = DBSql.open();
conn.setAutoCommit(false);
// conn.setAutoCommit(false);
// 获取时间范围内的所有月份
List<String> yearMonths = getYearMonthsBetweenDates(dateRange.getStartDate(), dateRange.getEndDate());
for (String yearMonth : yearMonths) {
LOGGER.info("开始处理{}月份的数据,板块公司: {}", yearMonth, bkgs);
// 4. 处理区域两金占比
LOGGER.info("开始区域两金占比");
processRegionTwoFundsRatio(conn, yearMonth, bkgs);
// 1. 处理营业收入数据
LOGGER.info("开始营业收入数据");
processRevenueData(conn, yearMonth, bkgs);
@ -136,11 +140,9 @@ public class SaleCountDimensionImpl implements DataSummaryService {
LOGGER.info("开始应收账款数据");
processReceivableData(conn, yearMonth, bkgs);
// 4. 处理区域两金占比
LOGGER.info("开始区域两金占比");
processRegionTwoFundsRatio(conn, yearMonth, bkgs);
}
conn.commit();
// conn.commit();
LOGGER.info("所有月份数据处理完成");
}catch (Exception e) {
try {
@ -203,12 +205,12 @@ public class SaleCountDimensionImpl implements DataSummaryService {
"GROUP BY QYGS, KCZZ, LB_1, LB_2, LB_3, SQ, CS, QY " +
"LIMIT " + PAGE_SIZE + " OFFSET " + offset;
LOGGER.debug("营业收入数据查询第{}页SQL: {}", page + 1, querySql);
LOGGER.info("营业收入数据查询第{}页SQL: {}", page + 1, querySql);
List<RowMap> maps = DBSql.getMaps(conn, querySql, yearMonth.replace("-", ""), bkgs);
if (maps.isEmpty()) {
hasMore = false;
LOGGER.debug("营业收入数据第{}页无数据,停止分页查询", page + 1);
LOGGER.info("营业收入数据第{}页无数据,停止分页查询", page + 1);
} else {
ArrayList<BO> bos = new ArrayList<>();
for (RowMap map : maps) {
@ -285,18 +287,18 @@ public class SaleCountDimensionImpl implements DataSummaryService {
while (hasMore) {
int offset = page * PAGE_SIZE;
String querySql = "SELECT QYGS, KCZZ, LB_1, LB_2, LB_3, SQ, CS, QY, " +
"SUM(ZSSL) as XL, SUM(SSJERMB) as XE ,(SUM(XSSL))/10000 AS TSXL " +
"SUM(ZSSL) as XL, SUM(SSJERMB) as XE ,SUM(XSSL) AS TSXL " +
"FROM " + SALES_DETAIL_TABLE + " " +
"WHERE YEARMONTH = ? AND BKGS = ? " +
"GROUP BY QYGS, KCZZ, LB_1, LB_2, LB_3, SQ, CS, QY " +
"LIMIT " + PAGE_SIZE + " OFFSET " + offset;
LOGGER.debug("销量销额数据查询第{}页SQL: {}", page + 1, querySql);
LOGGER.info("销量销额数据查询第{}页SQL: {}", page + 1, querySql);
List<RowMap> maps = DBSql.getMaps(conn, querySql, yearMonth.replace("-", ""), bkgs);
if (maps.isEmpty()) {
hasMore = false;
LOGGER.debug("销量销额数据第{}页无数据,停止分页查询", page + 1);
LOGGER.info("销量销额数据第{}页无数据,停止分页查询", page + 1);
} else {
ArrayList<BO> bos = new ArrayList<>();
for (RowMap map : maps) {
@ -317,7 +319,7 @@ public class SaleCountDimensionImpl implements DataSummaryService {
bo.set("JD", location.getLongitude());
bo.set("WD", location.getLatitude());
}
if (bkgs.equals("泰山石膏") && "石膏板".equals(map.getString("LB_1"))){
if ("石膏板".equals(map.getString("LB_1"))){
bo.set("XL", map.getDouble("TSXL"));
}else {
bo.set("XL", map.getDouble("XL"));
@ -391,12 +393,12 @@ public class SaleCountDimensionImpl implements DataSummaryService {
"WHERE DATE(RQ) = ? AND BKGS = ? " +
"LIMIT " + PAGE_SIZE + " OFFSET " + offset;
LOGGER.debug("应收账款数据查询第{}页SQL: {}", page + 1, querySql);
LOGGER.info("应收账款数据查询第{}页SQL: {}", page + 1, querySql);
List<RowMap> maps = DBSql.getMaps(conn, querySql, lastDayOfMonth, bkgs);
if (maps.isEmpty()) {
hasMore = false;
LOGGER.debug("应收账款数据第{}页无数据,停止分页查询", page + 1);
LOGGER.info("应收账款数据第{}页无数据,停止分页查询", page + 1);
} else {
ArrayList<BO> bos = new ArrayList<>();
for (RowMap map : maps) {
@ -477,16 +479,16 @@ public class SaleCountDimensionImpl implements DataSummaryService {
String receivableSql = "SELECT QYGS, XSZZ, SUM(YSYE) as YSZK " +
"FROM " + RECEIVABLE_DETAIL_TABLE + " " +
"WHERE YEAR(RQ) = YEAR(?) AND MONTH(RQ) = MONTH(?) AND BKGS = ? " +
"GROUP BY QYGS, BKGS" +
"GROUP BY QYGS, BKGS " +
"LIMIT " + PAGE_SIZE + " OFFSET " + offset;
LOGGER.debug("应收账款数据查询第{}页SQL: {}", page + 1, receivableSql);
LOGGER.info("应收账款数据查询第{}页SQL: {}", page + 1, receivableSql);
List<RowMap> receivableMaps = DBSql.getMaps(conn, receivableSql,
lastDayOfMonth, lastDayOfMonth, bkgs);
if (receivableMaps.isEmpty()) {
hasMore = false;
LOGGER.debug("应收账款数据第{}页无数据,停止分页查询", page + 1);
LOGGER.info("应收账款数据第{}页无数据,停止分页查询", page + 1);
} else {
ArrayList<BO> bos = new ArrayList<>();
@ -506,15 +508,15 @@ public class SaleCountDimensionImpl implements DataSummaryService {
String inventorySql = "SELECT STOCKORGNAME, SUM(BALANCE_AMOUNT) as KCJE " +
"FROM " + BO_EU_DWD_ORDER_KC_HZ + " " +
"WHERE STOCKORGNAME IN (" + placeholders + ") " +
"WHERE STOCKORGNAME IN ('" + placeholders + "') " +
"AND CATEGORY = '产成品' " +
"AND YEAR(INDATE) = YEAR(?) " +
"AND MONTH(INDATE) = MONTH(?) " +
"AND YEAR(INDATE) = YEAR('"+lastDayOfMonth+"') " +
"AND MONTH(INDATE) = MONTH('"+lastDayOfMonth+"') " +
"GROUP BY STOCKORGNAME";
LOGGER.debug("库存金额数据查询SQL: {}", inventorySql);
LOGGER.info("库存金额数据查询SQL: {}", inventorySql);
List<RowMap> inventoryMaps = DBSql.getMaps(conn, inventorySql, lastDayOfMonth,lastDayOfMonth);
List<RowMap> inventoryMaps = DBSql.getMaps(conn, inventorySql);
Map<String, Double> inventoryMap = inventoryMaps.stream()
.collect(Collectors.toMap(
row -> row.getString("STOCKORGNAME"),