From 23ced7a2e1c2535c73fb256602c9657042bb7d9e Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Wed, 8 May 2024 14:16:46 +0800 Subject: [PATCH] =?UTF-8?q?fix(api):=20=E4=BF=AE=E5=A4=8Dlatest=5Fop=5Fts?= =?UTF-8?q?=E6=97=A0=E6=B3=95=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit mysql的greatest函数,null永远是最大值 --- .../service/api/service/SyncStateService.java | 12 +++++++++-- .../service/command/commands/YarnCommand.java | 4 ---- .../test/java/com/test/SqlBuilderTests.java | 21 +++++++++++++++---- .../functions/CompactionEventHandler.java | 9 ++++---- .../service/sync/utils/StatusUtils.java | 6 +++--- 5 files changed, 35 insertions(+), 17 deletions(-) diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java index 28c4ffa..8728c5f 100644 --- a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java +++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java @@ -250,7 +250,7 @@ public class SyncStateService { } public void saveCompactionLatestOperationTime(Long flinkJobId, String alias, Long latestOperationTime) { - Date operationDate = ObjectUtil.isNull(latestOperationTime) ? null : Date.from(Instant.ofEpochMilli(latestOperationTime)); + Date operationDate = latestOperationTime == 0 ? null : Date.from(Instant.ofEpochMilli(latestOperationTime)); jdbcTemplate.update( SqlBuilder .insertInto( @@ -260,10 +260,18 @@ public class SyncStateService { ) .values() .addValue(null, null) - .onDuplicateKeyUpdateSetter(StrUtil.format("{} = GREATEST({}, ?)", TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O, TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O)) + .onDuplicateKeyUpdateSetter( + StrUtil.format( + "{} = if({} is null, ?, greatest({}, ?))", + TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O, + TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O, + TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O + ) + ) .precompileSql(), syncStateId(flinkJobId, alias), operationDate, + operationDate, operationDate ); } diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/YarnCommand.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/YarnCommand.java index 54ed29c..269f3f1 100644 --- a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/YarnCommand.java +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/YarnCommand.java @@ -297,8 +297,4 @@ public class YarnCommand extends AbstractUtilShellComponent { ) : "" ); } - - @ShellMethod("test") - public void test() { - } } diff --git a/service-info-query/src/test/java/com/test/SqlBuilderTests.java b/service-info-query/src/test/java/com/test/SqlBuilderTests.java index b991ac8..26a14ea 100644 --- a/service-info-query/src/test/java/com/test/SqlBuilderTests.java +++ b/service-info-query/src/test/java/com/test/SqlBuilderTests.java @@ -69,10 +69,23 @@ public class SqlBuilderTests { public static void main(String[] args) { System.out.println(SqlUtil.formatSql( - SqlBuilder.update(TbAppHudiSyncState._origin_) - .set(TbAppHudiSyncState.PULSAR_BACK_LOG_O, "?") - .whereEq(TbAppHudiSyncState.ID_O, Column.as("?")) - .build() + SqlBuilder + .insertInto( + TbAppHudiSyncState._origin_, + TbAppHudiSyncState.ID_O, + TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O + ) + .values() + .addValue(null, null) + .onDuplicateKeyUpdateSetter( + StrUtil.format( + "{} = if({} is null, ?, greatest({}, ?))", + TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O, + TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O, + TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O + ) + ) + .precompileSql() )); } } diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/CompactionEventHandler.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/CompactionEventHandler.java index 20de2f8..1cad1cf 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/CompactionEventHandler.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/CompactionEventHandler.java @@ -40,7 +40,7 @@ public class CompactionEventHandler implements CompactEventHandler, Serializable public void success(String instant, List statuses, HoodieCommitMetadata metadata) { StatusUtils.compactionCommit(configuration, flinkJob, tableMeta, instant, metadata); // Trace latest_op_ts - Long max = statuses.stream() + statuses.stream() .map(status -> { if (status instanceof TraceWriteStatus) { TraceWriteStatus s = (TraceWriteStatus) status; @@ -49,9 +49,10 @@ public class CompactionEventHandler implements CompactEventHandler, Serializable return 0L; }) .max(Long::compare) - .orElse(0L); - logger.info("Latest op ts: {}", max); - StatusUtils.compactionLatestOpTs(configuration, flinkJob, tableMeta, max); + .ifPresent(max -> { + logger.info("Latest op ts: {}", max); + StatusUtils.compactionLatestOpTs(configuration, flinkJob, tableMeta, max); + }); } @Override diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/StatusUtils.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/StatusUtils.java index b595bf9..08e3647 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/StatusUtils.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/StatusUtils.java @@ -58,7 +58,7 @@ public class StatusUtils { } public static void syncCheckpoint(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String messageId, Long publishTime) { - logger.info("Enter method: syncCheckpoint[configuration, flinkJob, tableMeta, messageId, publishTime]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "messageId:" + messageId + "," + "publishTime:" + publishTime); + logger.info("Enter method: syncCheckpoint[configuration, flinkJob, tableMeta, messageId, publishTime]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob.getId() + "," + "tableMeta:" + tableMeta.getAlias() + "," + "messageId:" + messageId + "," + "publishTime:" + publishTime); try { Failsafe.with(RetryPolicyProvider.HTTP_RETRY) .run(() -> HttpUtil.createGet( @@ -229,7 +229,7 @@ public class StatusUtils { } public static void versionUpdate(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String version, String opts) { - logger.info("Enter method: versionUpdate[configuration, flinkJob, tableMeta, version, opts]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "version:" + version + "," + "opts:" + opts); + logger.info("Enter method: versionUpdate[configuration, flinkJob, tableMeta, version, opts]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob.getId() + "," + "tableMeta:" + tableMeta.getAlias() + "," + "version:" + version + "," + "opts:" + opts); Failsafe.with(RetryPolicyProvider.HTTP_RETRY) .run(() -> HttpUtil.createGet( @@ -250,7 +250,7 @@ public class StatusUtils { } public static void compactionLatestOpTs(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long latestOpTs) { - logger.info("Enter method: compactionLatestOpTs[configuration, flinkJob, tableMeta, latestOpTs]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "latestOpTs:" + latestOpTs); + logger.info("Enter method: compactionLatestOpTs[configuration, flinkJob, tableMeta, latestOpTs]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob.getId() + "," + "tableMeta:" + tableMeta.getAlias() + "," + "latestOpTs:" + latestOpTs); Failsafe.with(RetryPolicyProvider.HTTP_RETRY) .run(() -> HttpUtil.createGet(