From e35733a2d81eef08d3e3717f4d5f867fc2eee8be Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Thu, 9 May 2024 18:22:42 +0800 Subject: [PATCH] =?UTF-8?q?feat(api):=20=E5=AF=B9=E4=BA=8E=E9=9C=80?= =?UTF-8?q?=E8=A6=81=E8=AE=B0=E5=BD=95=E6=9C=80=E5=90=8E=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E7=9A=84=E5=AD=97=E6=AE=B5=E4=BD=BF=E7=94=A8=E6=AF=94=E8=BE=83?= =?UTF-8?q?=E6=96=B9=E5=BC=8F=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/api/service/SyncStateService.java | 45 +++++++++++++++++-- 1 file changed, 41 insertions(+), 4 deletions(-) diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java index 440a7d1..f8f7e17 100644 --- a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java +++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java @@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.api.service; import club.kingon.sql.builder.SqlBuilder; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.api.entity.HoodieCommitMetadata; import com.lanyuanxiaoyao.service.common.Constants; import java.time.Instant; @@ -119,9 +120,18 @@ public class SyncStateService { ) .values() .addValue(null, null) - .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.SOURCE_PUBLISH_TIME_O) + .onDuplicateKeyUpdateSetter( + StrUtil.format( + "{} = if({} is null, ?, greatest({}, ?))", + TbAppHudiSyncState.SOURCE_PUBLISH_TIME_O, + TbAppHudiSyncState.SOURCE_PUBLISH_TIME_O, + TbAppHudiSyncState.SOURCE_PUBLISH_TIME_O + ) + ) .precompileSql(), syncStateId(flinkJobId, alias), + publishDate, + publishDate, publishDate ); } @@ -137,9 +147,18 @@ public class SyncStateService { ) .values() .addValue(null, null) - .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.SOURCE_RECEIVE_TIME_O) + .onDuplicateKeyUpdateSetter( + StrUtil.format( + "{} = if({} is null, ?, greatest({}, ?))", + TbAppHudiSyncState.SOURCE_RECEIVE_TIME_O, + TbAppHudiSyncState.SOURCE_RECEIVE_TIME_O, + TbAppHudiSyncState.SOURCE_RECEIVE_TIME_O + ) + ) .precompileSql(), syncStateId(flinkJobId, alias), + receiveDate, + receiveDate, receiveDate ); } @@ -155,9 +174,18 @@ public class SyncStateService { ) .values() .addValue(null, null) - .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.SOURCE_OP_TIME_O) + .onDuplicateKeyUpdateSetter( + StrUtil.format( + "{} = if({} is null, ?, greatest({}, ?))", + TbAppHudiSyncState.SOURCE_OP_TIME_O, + TbAppHudiSyncState.SOURCE_OP_TIME_O, + TbAppHudiSyncState.SOURCE_OP_TIME_O + ) + ) .precompileSql(), syncStateId(flinkJobId, alias), + operationDate, + operationDate, operationDate ); } @@ -299,9 +327,18 @@ public class SyncStateService { ) .values() .addValue(null, null) - .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O) + .onDuplicateKeyUpdateSetter( + StrUtil.format( + "{} = if({} is null, ?, greatest({}, ?))", + TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O, + TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O, + TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O + ) + ) .precompileSql(), syncStateId(flinkJobId, alias), + operationDate, + operationDate, operationDate ); }