diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java index 440a7d1..f8f7e17 100644 --- a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java +++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java @@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.api.service; import club.kingon.sql.builder.SqlBuilder; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.api.entity.HoodieCommitMetadata; import com.lanyuanxiaoyao.service.common.Constants; import java.time.Instant; @@ -119,9 +120,18 @@ public class SyncStateService { ) .values() .addValue(null, null) - .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.SOURCE_PUBLISH_TIME_O) + .onDuplicateKeyUpdateSetter( + StrUtil.format( + "{} = if({} is null, ?, greatest({}, ?))", + TbAppHudiSyncState.SOURCE_PUBLISH_TIME_O, + TbAppHudiSyncState.SOURCE_PUBLISH_TIME_O, + TbAppHudiSyncState.SOURCE_PUBLISH_TIME_O + ) + ) .precompileSql(), syncStateId(flinkJobId, alias), + publishDate, + publishDate, publishDate ); } @@ -137,9 +147,18 @@ public class SyncStateService { ) .values() .addValue(null, null) - .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.SOURCE_RECEIVE_TIME_O) + .onDuplicateKeyUpdateSetter( + StrUtil.format( + "{} = if({} is null, ?, greatest({}, ?))", + TbAppHudiSyncState.SOURCE_RECEIVE_TIME_O, + TbAppHudiSyncState.SOURCE_RECEIVE_TIME_O, + TbAppHudiSyncState.SOURCE_RECEIVE_TIME_O + ) + ) .precompileSql(), syncStateId(flinkJobId, alias), + receiveDate, + receiveDate, receiveDate ); } @@ -155,9 +174,18 @@ public class SyncStateService { ) .values() .addValue(null, null) - .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.SOURCE_OP_TIME_O) + .onDuplicateKeyUpdateSetter( + StrUtil.format( + "{} = if({} is null, ?, greatest({}, ?))", + TbAppHudiSyncState.SOURCE_OP_TIME_O, + TbAppHudiSyncState.SOURCE_OP_TIME_O, + TbAppHudiSyncState.SOURCE_OP_TIME_O + ) + ) .precompileSql(), syncStateId(flinkJobId, alias), + operationDate, + operationDate, operationDate ); } @@ -299,9 +327,18 @@ public class SyncStateService { ) .values() .addValue(null, null) - .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O) + .onDuplicateKeyUpdateSetter( + StrUtil.format( + "{} = if({} is null, ?, greatest({}, ?))", + TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O, + TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O, + TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O + ) + ) .precompileSql(), syncStateId(flinkJobId, alias), + operationDate, + operationDate, operationDate ); }