fix(api): 修复latest_op_ts无法记录
mysql的greatest函数,null永远是最大值
This commit is contained in:
@@ -250,7 +250,7 @@ public class SyncStateService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void saveCompactionLatestOperationTime(Long flinkJobId, String alias, Long latestOperationTime) {
|
public void saveCompactionLatestOperationTime(Long flinkJobId, String alias, Long latestOperationTime) {
|
||||||
Date operationDate = ObjectUtil.isNull(latestOperationTime) ? null : Date.from(Instant.ofEpochMilli(latestOperationTime));
|
Date operationDate = latestOperationTime == 0 ? null : Date.from(Instant.ofEpochMilli(latestOperationTime));
|
||||||
jdbcTemplate.update(
|
jdbcTemplate.update(
|
||||||
SqlBuilder
|
SqlBuilder
|
||||||
.insertInto(
|
.insertInto(
|
||||||
@@ -260,10 +260,18 @@ public class SyncStateService {
|
|||||||
)
|
)
|
||||||
.values()
|
.values()
|
||||||
.addValue(null, null)
|
.addValue(null, null)
|
||||||
.onDuplicateKeyUpdateSetter(StrUtil.format("{} = GREATEST({}, ?)", TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O, TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O))
|
.onDuplicateKeyUpdateSetter(
|
||||||
|
StrUtil.format(
|
||||||
|
"{} = if({} is null, ?, greatest({}, ?))",
|
||||||
|
TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O,
|
||||||
|
TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O,
|
||||||
|
TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O
|
||||||
|
)
|
||||||
|
)
|
||||||
.precompileSql(),
|
.precompileSql(),
|
||||||
syncStateId(flinkJobId, alias),
|
syncStateId(flinkJobId, alias),
|
||||||
operationDate,
|
operationDate,
|
||||||
|
operationDate,
|
||||||
operationDate
|
operationDate
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -297,8 +297,4 @@ public class YarnCommand extends AbstractUtilShellComponent {
|
|||||||
) : ""
|
) : ""
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ShellMethod("test")
|
|
||||||
public void test() {
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -69,10 +69,23 @@ public class SqlBuilderTests {
|
|||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
System.out.println(SqlUtil.formatSql(
|
System.out.println(SqlUtil.formatSql(
|
||||||
SqlBuilder.update(TbAppHudiSyncState._origin_)
|
SqlBuilder
|
||||||
.set(TbAppHudiSyncState.PULSAR_BACK_LOG_O, "?")
|
.insertInto(
|
||||||
.whereEq(TbAppHudiSyncState.ID_O, Column.as("?"))
|
TbAppHudiSyncState._origin_,
|
||||||
.build()
|
TbAppHudiSyncState.ID_O,
|
||||||
|
TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O
|
||||||
|
)
|
||||||
|
.values()
|
||||||
|
.addValue(null, null)
|
||||||
|
.onDuplicateKeyUpdateSetter(
|
||||||
|
StrUtil.format(
|
||||||
|
"{} = if({} is null, ?, greatest({}, ?))",
|
||||||
|
TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O,
|
||||||
|
TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O,
|
||||||
|
TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.precompileSql()
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ public class CompactionEventHandler implements CompactEventHandler, Serializable
|
|||||||
public void success(String instant, List<WriteStatus> statuses, HoodieCommitMetadata metadata) {
|
public void success(String instant, List<WriteStatus> statuses, HoodieCommitMetadata metadata) {
|
||||||
StatusUtils.compactionCommit(configuration, flinkJob, tableMeta, instant, metadata);
|
StatusUtils.compactionCommit(configuration, flinkJob, tableMeta, instant, metadata);
|
||||||
// Trace latest_op_ts
|
// Trace latest_op_ts
|
||||||
Long max = statuses.stream()
|
statuses.stream()
|
||||||
.map(status -> {
|
.map(status -> {
|
||||||
if (status instanceof TraceWriteStatus) {
|
if (status instanceof TraceWriteStatus) {
|
||||||
TraceWriteStatus s = (TraceWriteStatus) status;
|
TraceWriteStatus s = (TraceWriteStatus) status;
|
||||||
@@ -49,9 +49,10 @@ public class CompactionEventHandler implements CompactEventHandler, Serializable
|
|||||||
return 0L;
|
return 0L;
|
||||||
})
|
})
|
||||||
.max(Long::compare)
|
.max(Long::compare)
|
||||||
.orElse(0L);
|
.ifPresent(max -> {
|
||||||
logger.info("Latest op ts: {}", max);
|
logger.info("Latest op ts: {}", max);
|
||||||
StatusUtils.compactionLatestOpTs(configuration, flinkJob, tableMeta, max);
|
StatusUtils.compactionLatestOpTs(configuration, flinkJob, tableMeta, max);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -58,7 +58,7 @@ public class StatusUtils {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static void syncCheckpoint(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String messageId, Long publishTime) {
|
public static void syncCheckpoint(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String messageId, Long publishTime) {
|
||||||
logger.info("Enter method: syncCheckpoint[configuration, flinkJob, tableMeta, messageId, publishTime]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "messageId:" + messageId + "," + "publishTime:" + publishTime);
|
logger.info("Enter method: syncCheckpoint[configuration, flinkJob, tableMeta, messageId, publishTime]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob.getId() + "," + "tableMeta:" + tableMeta.getAlias() + "," + "messageId:" + messageId + "," + "publishTime:" + publishTime);
|
||||||
try {
|
try {
|
||||||
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
|
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
|
||||||
.run(() -> HttpUtil.createGet(
|
.run(() -> HttpUtil.createGet(
|
||||||
@@ -229,7 +229,7 @@ public class StatusUtils {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static void versionUpdate(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String version, String opts) {
|
public static void versionUpdate(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String version, String opts) {
|
||||||
logger.info("Enter method: versionUpdate[configuration, flinkJob, tableMeta, version, opts]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "version:" + version + "," + "opts:" + opts);
|
logger.info("Enter method: versionUpdate[configuration, flinkJob, tableMeta, version, opts]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob.getId() + "," + "tableMeta:" + tableMeta.getAlias() + "," + "version:" + version + "," + "opts:" + opts);
|
||||||
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
|
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
|
||||||
.run(() ->
|
.run(() ->
|
||||||
HttpUtil.createGet(
|
HttpUtil.createGet(
|
||||||
@@ -250,7 +250,7 @@ public class StatusUtils {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static void compactionLatestOpTs(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long latestOpTs) {
|
public static void compactionLatestOpTs(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long latestOpTs) {
|
||||||
logger.info("Enter method: compactionLatestOpTs[configuration, flinkJob, tableMeta, latestOpTs]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "latestOpTs:" + latestOpTs);
|
logger.info("Enter method: compactionLatestOpTs[configuration, flinkJob, tableMeta, latestOpTs]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob.getId() + "," + "tableMeta:" + tableMeta.getAlias() + "," + "latestOpTs:" + latestOpTs);
|
||||||
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
|
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
|
||||||
.run(() ->
|
.run(() ->
|
||||||
HttpUtil.createGet(
|
HttpUtil.createGet(
|
||||||
|
|||||||
Reference in New Issue
Block a user