1、增加数据库重试机制,三次失败发送邮件

2、增加更新入库单区域公司条数
This commit is contained in:
llllon 2025-09-22 09:07:38 +08:00
parent c8a7d0be46
commit 1d001ea77f
11 changed files with 285 additions and 161 deletions

View File

@ -192,6 +192,7 @@ public class DataLinkUpController {
syncService.deleteTargetData(targetTable, targetTimeField, startDate, endDate);
}
if (tablename.equals("应收表")){
LOGGER.info("开始执行应收的明细汇总");
syncService.querySourceDataYS(ccId, tableName, timeField, startDate, endDate, partitionField,
fieldMappings, targetTable, jezd);
}else {

View File

@ -18,6 +18,9 @@ import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
@ -40,6 +43,9 @@ public class ProductionDataSyncServiceImpl implements DataSyncService {
private static final int PAGE_SIZE = 10000; // 每页查询1000条记录
private static final String ORACLE_DATE_FORMAT = "YYYY-MM-DD HH24:MI:SS";
private static final String SEND_EMAIL_ACCOUNT_NUMBER = SDK.getAppAPI().getProperty("com.awspaas.user.apps.bnbm.datalinkup","sendEmailAccountNumber");
private static final String RECEIVE_EMAIL_ACCOUNT = SDK.getAppAPI().getProperty("com.awspaas.user.apps.bnbm.datalinkup","RECEIVE_EMAIL_ACCOUNT");
@Override
public ArrayList<DateRange> syncDataByConfigs(List<BO> configs) {
ArrayList<DateRange> list = new ArrayList<>();
@ -200,12 +206,45 @@ public class ProductionDataSyncServiceImpl implements DataSyncService {
int pageNo = 1;
boolean hasMore;
RDSAPI rdsapi = null;
DBUtils.SUPPLY supply = null;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
SimpleDateFormat simpleDateFormat2 = new SimpleDateFormat("yyyy-MM-dd");
// 重试配置
int maxRetries = 3;
int retryDelayMs = 2000; // 重试延迟2秒
try {
// 带重试机制的连接获取
for (int retry = 0; retry < maxRetries; retry++) {
try {
rdsapi = SDK.getCCAPI().getRDSAPI(ccId);
DBUtils.SUPPLY supply = rdsapi.getSupply();
supply = rdsapi.getSupply();
if (rdsapi != null && supply!=null) {
break; // 连接成功跳出重试循环
}
} catch (Exception e) {
if (retry == maxRetries - 1) {
String result = fieldMappings.stream()
.filter(bo -> "BKGS".equals(bo.getString("LDBZD"))) // 条件1: LDBZD "BKGS"
.filter(bo -> bo.getString("LDZDMRZ") != null) // 条件2: LDZDMRZ 不为空
.map(bo -> bo.getString("LDZDMRZ")) // 提取 LDZDMRZ 的值
.collect(Collectors.joining()); // 收集到 List
// 最后一次重试仍然失败
String errorMsg = String.format("获取数据库连接失败(CC_ID: %s),板块为:%s已重试%d次同步时间为%s\n错误信息为%s",
ccId, result, maxRetries, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), e.getMessage());
LOGGER.error(errorMsg, e);
// 发送邮件通知
boolean eMailSync = SDK.getNotificationAPI().sendEMailSync(SEND_EMAIL_ACCOUNT_NUMBER, RECEIVE_EMAIL_ACCOUNT, null, "数据贯通数据库连接通知", errorMsg);
if (eMailSync){
LOGGER.info("短信发送成功");
}else {
LOGGER.info("短信发送失败");
}
throw new RuntimeException(errorMsg, e);
}
LOGGER.warn("获取数据库连接失败(CC_ID: {}),第{}次重试...", ccId, retry + 1);
Thread.sleep(retryDelayMs);
}
}
String DBname = supply.getName();
LOGGER.info("数据库为:{}",DBname);

View File

@ -20,6 +20,8 @@ import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
@ -40,6 +42,8 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
* 增加分页大小常量
*/
private static final int PAGE_SIZE = 10000; // 每页查询1000条记录
private static final String SEND_EMAIL_ACCOUNT_NUMBER = SDK.getAppAPI().getProperty("com.awspaas.user.apps.bnbm.datalinkup","sendEmailAccountNumber");
private static final String RECEIVE_EMAIL_ACCOUNT = SDK.getAppAPI().getProperty("com.awspaas.user.apps.bnbm.datalinkup","RECEIVE_EMAIL_ACCOUNT");
private static final String ORACLE_DATE_FORMAT = "YYYY-MM-DD HH24:MI:SS";
@Override
@ -232,11 +236,46 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
int pageNo = 1;
boolean hasMore;
RDSAPI rdsapi = null;
DBUtils.SUPPLY supply = null;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
SimpleDateFormat simpleDateFormat2 = new SimpleDateFormat("yyyy-MM-dd");
// 重试配置
int maxRetries = 3;
int retryDelayMs = 2000; // 重试延迟2秒
try {
// 带重试机制的连接获取
for (int retry = 0; retry < maxRetries; retry++) {
try {
rdsapi = SDK.getCCAPI().getRDSAPI(ccId);
DBUtils.SUPPLY supply = rdsapi.getSupply();
supply = rdsapi.getSupply();
if (rdsapi != null && supply!=null) {
break; // 连接成功跳出重试循环
}
} catch (Exception e) {
if (retry == maxRetries - 1) {
String result = fieldMappings.stream()
.filter(bo -> "BKGS".equals(bo.getString("LDBZD"))) // 条件1: LDBZD "BKGS"
.filter(bo -> bo.getString("LDZDMRZ") != null) // 条件2: LDZDMRZ 不为空
.map(bo -> bo.getString("LDZDMRZ")) // 提取 LDZDMRZ 的值
.collect(Collectors.joining()); // 收集到 List
// 最后一次重试仍然失败
String errorMsg = String.format("获取数据库连接失败(CC_ID: %s),板块为:%s已重试%d次同步时间为%s\n错误信息为%s",
ccId, result, maxRetries, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), e.getMessage());
LOGGER.error(errorMsg, e);
// 发送邮件通知
boolean eMailSync = SDK.getNotificationAPI().sendEMailSync(SEND_EMAIL_ACCOUNT_NUMBER, RECEIVE_EMAIL_ACCOUNT, null, "数据贯通数据库连接通知", errorMsg);
if (eMailSync){
LOGGER.info("短信发送成功");
}else {
LOGGER.info("短信发送失败");
}
throw new RuntimeException(errorMsg, e);
}
LOGGER.warn("获取数据库连接失败(CC_ID: {}),第{}次重试...", ccId, retry + 1);
Thread.sleep(retryDelayMs);
}
}
String DBname = supply.getName();
LOGGER.info("数据库为:{}",DBname);
@ -925,6 +964,12 @@ public class PurchaseDataSyncServiceImpl implements DataSyncService {
bos.add(bo);
}
int update = DBSql.update("UPDATE BO_EU_DWD_ORDER_RKD_HZ rkd " +
" JOIN BO_EU_ZZDYQY zzd ON rkd.BKGS = zzd.BKGS AND rkd.KCZZ = zzd.XSZZ " +
" SET rkd.QYGS = zzd.QYGS " +
" WHERE rkd.QYGS IS NULL");
LOGGER.info("更新入库单区域公司条数:{}",update);
SDK.getBOAPI().createDataBO(hzb, bos, UserContext.fromUID("admin"));
LOGGER.info("已迁移{}条数据到汇总表(页号: {},时间范围: {} - {}",
bos.size(), pageNo, startDate, endDate);

View File

@ -129,8 +129,9 @@ public class SaleCountDimensionImpl implements DataSummaryService {
// 删除已存在的记录
String deleteSql = "DELETE FROM " + BO_EU_XS_YSZK + " WHERE YEARMONTH >= ? AND BKGS = ?";
try {
int deleted = DBSql.update(deleteSql, new Object[]{DATE_FORMAT.format(dateRange.getStartDate()), bkgs});
LOGGER.info("应收账款-已删除{}条记录", deleted);
LOGGER.info("应收账款删除sql{}",deleteSql);
int deleted = DBSql.update(deleteSql, new Object[]{YEAR_MONTH_FORMAT.format(dateRange.getStartDate()), bkgs});
LOGGER.info("应收账款-已删除{}-{}条应收账款记录\nsql为{}", YEAR_MONTH_FORMAT.format(dateRange.getStartDate()), deleted,deleteSql);
} catch (Exception e) {
LOGGER.error("应收账款-删除{}数据错误删除sql为{},请检查数据库链接:{}", bkgs, deleteSql, e.getMessage());
throw e;
@ -142,11 +143,11 @@ public class SaleCountDimensionImpl implements DataSummaryService {
" t.ZLFX3_4 AS ZLFX3_4,t.ZLFX4_5 AS ZLFX4_5,t.ZLFX5 AS ZLFX5" +
" FROM BO_EU_BNBM_DATALINKUP_XS_YSL t" +
" INNER JOIN (" +
" SELECT QYGS,SHIQU,MAX(RQ) AS MaxRQ,YEAR(RQ) AS Year,MONTH(RQ) AS Month" +
" SELECT MAX(RQ) AS MaxRQ,YEAR(RQ) AS Year,MONTH(RQ) AS Month" +
" FROM BO_EU_BNBM_DATALINKUP_XS_YSL" +
" WHERE DATE(RQ) >= ? AND BKGS = ?" +
" GROUP BY QYGS, SHIQU, YEAR(RQ), MONTH(RQ)" +
" ) AS grouped ON t.QYGS = grouped.QYGS AND t.SHIQU = grouped.SHIQU AND t.RQ = grouped.MaxRQ" +
" GROUP BY YEAR(RQ), MONTH(RQ)" +
" ) AS grouped ON t.RQ = grouped.MaxRQ" +
" WHERE t.BKGS = ?" +
" ORDER BY t.QYGS, t.SHIQU, t.RQ";

View File

@ -20,6 +20,7 @@ import java.math.RoundingMode;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.Date;
@ -34,14 +35,13 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
// 时间范围常量同步最近30天数据不包括当天
private static final int DAYS_BACK = Integer.parseInt(SDK.getAppAPI().getProperty("com.awspaas.user.apps.bnbm.datalinkup", "days_back"));
// 高斯数据库配置常量生产环境中应改为从配置文件读取
// private static final String GAUSSIAN_JDBC_URL = SDK.getAppAPI().getProperty("com.awspaas.user.apps.bnbm.datalinkup", "jbl_data_url");
// private static final String GAUSSIAN_USERNAME = SDK.getAppAPI().getProperty("com.awspaas.user.apps.bnbm.datalinkup", "jbl_data_act");
// private static final String GAUSSIAN_PASSWORD = SDK.getAppAPI().getProperty("com.awspaas.user.apps.bnbm.datalinkup", "jbl_data_pw");
// 增加分页大小常量
private static final int PAGE_SIZE = 10000; // 每页查询1000条记录
private static final String ORACLE_DATE_FORMAT = "YYYY-MM-DD HH24:MI:SS";
private static final String SEND_EMAIL_ACCOUNT_NUMBER = SDK.getAppAPI().getProperty("com.awspaas.user.apps.bnbm.datalinkup","sendEmailAccountNumber");
private static final String RECEIVE_EMAIL_ACCOUNT = SDK.getAppAPI().getProperty("com.awspaas.user.apps.bnbm.datalinkup","RECEIVE_EMAIL_ACCOUNT");
/**
* 根据主配置列表执行数据同步
@ -171,11 +171,47 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
int pageNo = 1;
boolean hasMore;
RDSAPI rdsapi = null;
DBUtils.SUPPLY supply = null;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
SimpleDateFormat simpleDateFormat2 = new SimpleDateFormat("yyyy-MM-dd");
// 重试配置
int maxRetries = 3;
int retryDelayMs = 2000; // 重试延迟2秒
try {
// 带重试机制的连接获取
for (int retry = 0; retry < maxRetries; retry++) {
try {
rdsapi = SDK.getCCAPI().getRDSAPI(ccId);
DBUtils.SUPPLY supply = rdsapi.getSupply();
supply = rdsapi.getSupply();
if (rdsapi != null && supply!=null) {
break; // 连接成功跳出重试循环
}
} catch (Exception e) {
if (retry == maxRetries - 1) {
String result = fieldMappings.stream()
.filter(bo -> "BKGS".equals(bo.getString("LDBZD"))) // 条件1: LDBZD "BKGS"
.filter(bo -> bo.getString("LDZDMRZ") != null) // 条件2: LDZDMRZ 不为空
.map(bo -> bo.getString("LDZDMRZ")) // 提取 LDZDMRZ 的值
.collect(Collectors.joining()); // 收集到 List
// 最后一次重试仍然失败
String errorMsg = String.format("获取数据库连接失败(CC_ID: %s),板块为:%s已重试%d次同步时间为%s\n错误信息为%s",
ccId, result, maxRetries, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), e.getMessage());
LOGGER.error(errorMsg, e);
// 发送邮件通知
boolean eMailSync = SDK.getNotificationAPI().sendEMailSync(SEND_EMAIL_ACCOUNT_NUMBER, RECEIVE_EMAIL_ACCOUNT, null, "数据贯通数据库连接通知", errorMsg);
if (eMailSync){
LOGGER.info("短信发送成功");
}else {
LOGGER.info("短信发送失败");
}
throw new RuntimeException(errorMsg, e);
}
LOGGER.warn("获取数据库连接失败(CC_ID: {}),第{}次重试...", ccId, retry + 1);
Thread.sleep(retryDelayMs);
}
}
String DBname = supply.getName();
LOGGER.info("数据库为:{}",DBname);
@ -679,6 +715,7 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
String timeField = mainConfig.getString("SJZD");//时间字段
String bindId = mainConfig.getString("BINDID");//bindid
String tablename = mainConfig.getString("TABLENAME");//同步表名
String hzb = "";
try {
if ("销售表".equals(tablename)){
@ -708,7 +745,11 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
// 根据时间范围增加数据分页查询数据存储到BO_EU_BNBM_DATALINKUP_XS_XSL_HZ
// 全量分页迁移数据到汇总表
if (tablename.equals("应收表")){
summarizeScopeDataYs(targetTable, null, null, null, hzb);
}else {
summarizeScopeData(targetTable, null, null, null, hzb);
}
} else {
// 计算时间范围当前日期-30天 ~ 昨天
Calendar cal = Calendar.getInstance();
@ -731,8 +772,12 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
// 根据时间范围增加数据分页查询数据存储到BO_EU_BNBM_DATALINKUP_XS_XSL_HZ
// 按时间范围分页迁移数据到汇总表
if (tablename.equals("应收表")){
summarizeScopeDataYs(targetTable, startDate, endDate, targetTimeField, hzb);
}else {
summarizeScopeData(targetTable, startDate, endDate, targetTimeField, hzb);
}
}
} catch (Exception e) {
LOGGER.error("处理配置失败 [板块={}, BindID={}]: {}",
plate, mainConfig.getString("BINDID"), e.getMessage(), e);
@ -817,8 +862,8 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
}
//销售汇总表修改区域公司
if ("梦牌".equals(bo.getString("BKGS"))) {
String cs = StringUtils.isBlank(bo.getString("CS"))?"":bo.getString("CS");
String sq = StringUtils.isBlank(bo.getString("SQ"))?"":bo.getString("SQ");
String cs = StringUtils.isBlank(bo.getString("SHIQU"))?"":bo.getString("SHIQU");
String sq = StringUtils.isBlank(bo.getString("SHENGQU"))?"":bo.getString("SHENGQU");
boolean found = false; // 标记是否找到匹配
for (BO bo1 : mpqyList) {
String ss = bo1.getString("SS");
@ -888,7 +933,7 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
String startDate = "";
String endDate = "";
// 查询梦牌区域
List<BO> mpqyList = SDK.getBOAPI().query("BO_EU_DATALINKUP_QYGSED").addQuery("BKGS = ", "梦牌").list();
List<BO> mpqyList = SDK.getBOAPI().query("BO_EU_DATALINKUP_QYGSED").addQuery("SS IS NOT NULL",null).addQuery("BKGS = ", "梦牌").list();
// 查询龙牌区域
List<BO> lpqyList = SDK.getBOAPI().query("BO_EU_QYGX").list();
try {
@ -951,8 +996,8 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
}
//销售汇总表修改区域公司
if ("梦牌".equals(bo.getString("BKGS"))) {
String cs = StringUtils.isBlank(bo.getString("CS"))?"":bo.getString("CS");
String sq = StringUtils.isBlank(bo.getString("SQ"))?"":bo.getString("SQ");
String cs = StringUtils.isBlank(bo.getString("SHIQU"))?"":bo.getString("SHIQU");
String sq = StringUtils.isBlank(bo.getString("SHENGQU"))?"":bo.getString("SHENGQU");
boolean found = false; // 标记是否找到匹配
for (BO bo1 : mpqyList) {
String ss = bo1.getString("SS");
@ -1071,20 +1116,51 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
int pageNo = 1;
boolean hasMore;
RDSAPI rdsapi = null;
DBUtils.SUPPLY supply = null;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
SimpleDateFormat simpleDateFormat2 = new SimpleDateFormat("yyyy-MM-dd");
// 重试配置
int maxRetries = 3;
int retryDelayMs = 2000; // 重试延迟2秒
try {
// 带重试机制的连接获取
for (int retry = 0; retry < maxRetries; retry++) {
try {
rdsapi = SDK.getCCAPI().getRDSAPI(ccId);
DBUtils.SUPPLY supply = rdsapi.getSupply();
supply = rdsapi.getSupply();
if (rdsapi != null && supply!=null) {
break; // 连接成功跳出重试循环
}
} catch (Exception e) {
if (retry == maxRetries - 1) {
String result = fieldMappings.stream()
.filter(bo -> "BKGS".equals(bo.getString("LDBZD"))) // 条件1: LDBZD "BKGS"
.filter(bo -> bo.getString("LDZDMRZ") != null) // 条件2: LDZDMRZ 不为空
.map(bo -> bo.getString("LDZDMRZ")) // 提取 LDZDMRZ 的值
.collect(Collectors.joining()); // 收集到 List
// 最后一次重试仍然失败
String errorMsg = String.format("获取数据库连接失败(CC_ID: %s),板块为:%s已重试%d次同步时间为%s\n错误信息为%s",
ccId, result, maxRetries, LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")), e.getMessage());
LOGGER.error(errorMsg, e);
// 发送邮件通知
boolean eMailSync = SDK.getNotificationAPI().sendEMailSync(SEND_EMAIL_ACCOUNT_NUMBER, RECEIVE_EMAIL_ACCOUNT, null, "数据贯通数据库连接通知", errorMsg);
if (eMailSync){
LOGGER.info("短信发送成功");
}else {
LOGGER.info("短信发送失败");
}
throw new RuntimeException(errorMsg, e);
}
LOGGER.warn("获取数据库连接失败(CC_ID: {}),第{}次重试...", ccId, retry + 1);
Thread.sleep(retryDelayMs);
}
}
String DBname = supply.getName();
LOGGER.info("数据库为:{}",DBname);
LOGGER.info("数据库为:{}", DBname);
// 计算时间范围并拆分为30天一组
List<Date[]> timeRanges = splitTimeRange(startDated, endDated, 30);
LOGGER.info("时间范围拆分为 {} 个查询区间", timeRanges.size());
String maxPartition = "";
if (partitionField != null && !partitionField.isEmpty()) {
// 1. 查询最大分区值
// 查询最大分区值
String maxPartitionSql = "SELECT MAX(" + partitionField + ") AS max_partition FROM " + tableName;
List<RowMap> maxPartitionResult = rdsapi.getMaps(maxPartitionSql);
@ -1097,13 +1173,6 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
LOGGER.info("表[{}]的最大分区为: {}", tableName, maxPartition);
}
for (int i = 0; i < timeRanges.size(); i++) {
Date[] range = timeRanges.get(i);
String startDate = simpleDateFormat2.format(range[0]);
String endDate = simpleDateFormat2.format(range[1]);
LOGGER.info("正在处理第 {} 个时间区间: {} 至 {}", i + 1, startDate, endDate);
// 构建基础查询条件
StringBuilder conditionBuilder = new StringBuilder();
StringBuilder orderByBuilder = new StringBuilder();
@ -1115,101 +1184,67 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
.append(" = '")
.append(maxPartition)
.append("'");
}
// 时间字段条件
if (timeField != null && !timeField.isEmpty()) {
if (conditionBuilder.length() > 0) {
conditionBuilder.append(" AND ");
}
if ("ORACLE".equalsIgnoreCase(DBname)) {
conditionBuilder.append("TO_DATE(")
.append(timeField)
.append(", '")
.append(ORACLE_DATE_FORMAT)
.append("') >= TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("') AND TO_DATE(")
.append(timeField)
.append(", '")
.append(ORACLE_DATE_FORMAT)
.append("') < TO_DATE(?, '")
.append(ORACLE_DATE_FORMAT)
.append("')");
} else {
conditionBuilder.append(timeField).append(" >= ?")
.append(" AND ")
.append(timeField).append(" < ?");
}
params.add(startDate);
params.add(endDate);
// 排序条件
orderByBuilder.append(" ORDER BY ").append(timeField);
if (jezd != null && !jezd.isEmpty()) {
orderByBuilder.append(", ").append(jezd);
}
orderByBuilder.append(" DESC");
} else if (conditionBuilder.length() == 0) {
// 既没有分区字段也没有时间字段查询全表
LOGGER.warn("警告:未配置分区字段和时间字段,将查询全表数据!");
}else {
conditionBuilder.append("1=1");
}
// 添加每月最大日期条件
if (timeField != null && !timeField.isEmpty()) {
if (conditionBuilder.length() > 0) {
conditionBuilder.append(" AND ");
}
// 根据不同数据库类型实现每月最大日期查询
if ("ORACLE".equalsIgnoreCase(DBname)) {
conditionBuilder.append("TO_CHAR(").append(timeField).append(", 'YYYY-MM') = ")
.append("(SELECT TO_CHAR(MAX(").append(timeField).append("), 'YYYY-MM') FROM ")
.append(tableName).append(" WHERE ").append(conditionBuilder.toString()).append(")");
} else if ("POSTGRESQL".equalsIgnoreCase(DBname) || "GAUSS".equalsIgnoreCase(DBname)) {
// 高斯和PG使用相同的语法
conditionBuilder.append("TO_CHAR(").append(timeField).append(", 'YYYY-MM') = ")
.append("(SELECT TO_CHAR(MAX(").append(timeField).append("), 'YYYY-MM') FROM ")
.append(tableName).append(" WHERE ").append(conditionBuilder.toString()).append(")");
} else if ("SQLSERVER".equalsIgnoreCase(DBname)) {
conditionBuilder.append("FORMAT(").append(timeField).append(", 'yyyy-MM') = ")
.append("(SELECT FORMAT(MAX(").append(timeField).append("), 'yyyy-MM') FROM ")
.append(tableName).append(" WHERE ").append(conditionBuilder.toString()).append(")");
} else {
// 其他数据库使用通用方法
conditionBuilder.append("DATE_FORMAT(").append(timeField).append(", '%Y-%m') = ")
.append("(SELECT DATE_FORMAT(MAX(").append(timeField).append("), '%Y-%m') FROM ")
.append(tableName).append(" WHERE ").append(conditionBuilder.toString()).append(")");
}
}
// 分页查询数据
do {
String startDate = simpleDateFormat2.format(startDated);
String endDate = simpleDateFormat2.format(endDated);
// 修改后的querySourceDataYS方法中的SQL查询部分
String querySql;
if ("ORACLE".equalsIgnoreCase(DBname)) {
// 使用Oracle分页语法 (12c+)
querySql = "SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString() +
orderByBuilder.toString();
querySql = "SELECT t1.* FROM " + tableName + " t1 " +
"JOIN (SELECT TRUNC(TO_DATE(" + timeField + ", 'YYYY-MM-DD'), 'MONTH') AS month_start, " +
"MAX(TO_DATE(" + timeField + ", 'YYYY-MM-DD')) AS max_date " +
"FROM " + tableName + " WHERE " + conditionBuilder.toString() +
" AND TO_DATE(" + timeField + ", 'YYYY-MM-DD') BETWEEN TO_DATE('" + startDate + "', 'YYYY-MM-DD') AND TO_DATE('" + endDate + "', 'YYYY-MM-DD') " +
"GROUP BY TRUNC(TO_DATE(" + timeField + ", 'YYYY-MM-DD'), 'MONTH')) t2 " +
"ON TO_DATE(t1." + timeField + ", 'YYYY-MM-DD') = t2.max_date " +
"WHERE " + conditionBuilder.toString() +
" AND TO_DATE(t1." + timeField + ", 'YYYY-MM-DD') BETWEEN TO_DATE('" + startDate + "', 'YYYY-MM-DD') AND TO_DATE('" + endDate + "', 'YYYY-MM-DD')";
} else if ("POSTGRESQL".equalsIgnoreCase(DBname) || "GAUSS".equalsIgnoreCase(DBname)) {
querySql = "SELECT t1.* FROM " + tableName + " t1 " +
"JOIN (SELECT DATE_TRUNC('MONTH', " + timeField + "::DATE) AS month_start, " +
"MAX(" + timeField + "::DATE) AS max_date " +
"FROM " + tableName + " WHERE "+conditionBuilder.toString()+" AND full_date::DATE BETWEEN '"+startDate+"' AND '"+endDate+"' " +
"GROUP BY DATE_TRUNC('MONTH', " + timeField + "::DATE)) t2 " +
"ON t1." + timeField + "::DATE = t2.max_date " +
"WHERE t1." +conditionBuilder.toString()+
" AND t1."+timeField+"::DATE BETWEEN '"+startDate+"' AND '"+endDate+"'";
} else if ("SQLSERVER".equalsIgnoreCase(DBname)) {
querySql = "SELECT t1.* FROM " + tableName + " t1 " +
"JOIN (SELECT DATEADD(MONTH, DATEDIFF(MONTH, 0, CONVERT(DATE, " + timeField + ")), 0) AS month_start, " +
"MAX(CONVERT(DATE, " + timeField + ")) AS max_date " +
"FROM " + tableName + " WHERE " + conditionBuilder.toString() +
" AND CONVERT(DATE, " + timeField + ") BETWEEN CONVERT(DATE, '" + startDate + "') AND CONVERT(DATE, '" + endDate + "') " +
"GROUP BY DATEADD(MONTH, DATEDIFF(MONTH, 0, CONVERT(DATE, " + timeField + ")), 0)) t2 " +
"ON CONVERT(DATE, t1." + timeField + ") = t2.max_date " +
"WHERE " + conditionBuilder.toString() +
" AND CONVERT(DATE, t1." + timeField + ") BETWEEN CONVERT(DATE, '" + startDate + "') AND CONVERT(DATE, '" + endDate + "')";
} else {
querySql = "SELECT * FROM " + tableName +
" WHERE " + conditionBuilder.toString() +
orderByBuilder.toString();
// 默认处理如MySQL
querySql = "SELECT t1.* FROM " + tableName + " t1 " +
"JOIN (SELECT DATE_FORMAT(STR_TO_DATE(" + timeField + ", '%Y-%m-%d'), '%Y-%m-01') AS month_start, " +
"MAX(STR_TO_DATE(" + timeField + ", '%Y-%m-%d')) AS max_date " +
"FROM " + tableName + " WHERE " + conditionBuilder.toString() +
" AND STR_TO_DATE(" + timeField + ", '%Y-%m-%d') BETWEEN STR_TO_DATE('" + startDate + "', '%Y-%m-%d') AND STR_TO_DATE('" + endDate + "', '%Y-%m-%d') " +
"GROUP BY DATE_FORMAT(STR_TO_DATE(" + timeField + ", '%Y-%m-%d'), '%Y-%m-01')) t2 " +
"ON STR_TO_DATE(t1." + timeField + ", '%Y-%m-%d') = t2.max_date " +
"WHERE " + conditionBuilder.toString() +
" AND STR_TO_DATE(t1." + timeField + ", '%Y-%m-%d') BETWEEN STR_TO_DATE('" + startDate + "', '%Y-%m-%d') AND STR_TO_DATE('" + endDate + "', '%Y-%m-%d')";
}
LOGGER.info("执行查询: {}", querySql);
List<RowMap> pageData;
if (params.isEmpty()) {
pageData = rdsapi.getMaps(querySql);
} else {
pageData = rdsapi.getMaps(querySql, params.toArray());
pageData = rdsapi.getMaps(querySql);
}
if (pageData != null && !pageData.isEmpty()) {
// 直接处理当前页数据
int successCount = this.processAndInsertData(pageData, fieldMappings, targetTable);
totalRows += pageData.size();
totalSuccess += successCount;
@ -1219,7 +1254,6 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
hasMore = false;
}
} while (hasMore);
}
//每月1号删除上月去年同期去除每月最后一天的数据删除当月去年同期全量数据后新增当月去年同期全量数据
// 每月1号执行的任务
@ -1229,18 +1263,23 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
// if (dayOfMonth == 1) {
try {
LOGGER.info("开始执行每月1号的特殊数据处理任务");
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
// 计算时间范围
LocalDate lastYearMonthDate = now.minusYears(1).minusMonths(1);
int lastYearMonthValue = lastYearMonthDate.getMonthValue();
int lastyear = lastYearMonthDate.getYear();
// 计算去年上月期第一天当月第一天
LocalDate firstDayOfLastYearLastMonth = lastYearMonthDate.withDayOfMonth(1);
// 计算去年上月个月最后一天
LocalDate firstDayOfLastMonthLastYear = lastYearMonthDate.plusMonths(1).withDayOfMonth(1);
String lastYearLastMonthFirstDayStr = firstDayOfLastYearLastMonth.format(formatter);
String lastYearLastMonthEndDayStr = firstDayOfLastMonthLastYear.format(formatter);
// 1. 删除当月去年同期全量数据
String deleteCurrentMonthLastYearSql = "DELETE FROM " + targetTable +
" WHERE YEAR = " + lastyear +
" AND MONTH = " + lastYearMonthValue;
" WHERE RQ >= '" + lastYearLastMonthFirstDayStr +
"' AND RQ < '" + lastYearLastMonthEndDayStr+"'";
int update = DBSql.update(deleteCurrentMonthLastYearSql);
LOGGER.info("已删除去年同期({}-{})的全量数据,删除了{}条数据", lastyear, lastYearMonthValue,update);
LOGGER.info("已删除去年同期({}-{})的全量数据,删除了{}条数据", lastYearLastMonthFirstDayStr, lastYearLastMonthEndDayStr,update);
// 2. 新增当月去年同期数据
// 去年日期
@ -1250,20 +1289,19 @@ public class SaleDataSyncServiceImpl implements DataSyncService {
// 计算去年下个月第一天
LocalDate firstDayOfNextMonthLastYear = minusYears.plusMonths(1).withDayOfMonth(1);
// 格式化日期为字符串根据数据库格式要求调整
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
String lastYearFirstDayStr = firstDayOfLastYearMonth.format(formatter);
String lastYearNextMonthFirstDayStr = firstDayOfNextMonthLastYear.format(formatter);
//删除清空去年同期数据
int update1 = DBSql.update("DELETE FROM " + targetTable +
" WHERE YEAR = " + firstDayOfLastYearMonth.getYear() +
" AND MONTH = " + firstDayOfLastYearMonth.getMonthValue());
" WHERE RQ = '" + lastYearNextMonthFirstDayStr+"'");
LOGGER.info("先清空{}去年同期数{}",lastYearFirstDayStr,update1);
// 构建插入SQL注意需确保字段匹配且处理可能的主键冲突
String insertCurrentMonthLastYearSql = "INSERT INTO " + targetTable +
" SELECT * FROM " + targetTable +
" WHERE " + timeField + " >= '" + lastYearFirstDayStr +
"' AND " + timeField + " < '" + lastYearNextMonthFirstDayStr + "'";
" WHERE RQ >= '" + lastYearFirstDayStr +
"' AND RQ < '" + lastYearNextMonthFirstDayStr + "'";
// 执行插入操作
rdsapi.update(insertCurrentMonthLastYearSql);
LOGGER.info("{}已新增当月去年同期数据",lastYearFirstDayStr);