diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/CompactionEventHandler.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/CompactionEventHandler.java index ec7a286..20de2f8 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/CompactionEventHandler.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/CompactionEventHandler.java @@ -1,14 +1,12 @@ package com.lanyuanxiaoyao.service.sync.functions; -import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.entity.FlinkJob; import com.lanyuanxiaoyao.service.common.entity.TableMeta; -import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper; import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration; -import com.lanyuanxiaoyao.service.sync.configuration.TraceWriteStatus; import com.lanyuanxiaoyao.service.sync.utils.StatusUtils; import java.io.Serializable; import java.util.List; +import org.apache.hudi.client.TraceWriteStatus; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.sink.compact.CompactEventHandler; @@ -41,21 +39,19 @@ public class CompactionEventHandler implements CompactEventHandler, Serializable @Override public void success(String instant, List statuses, HoodieCommitMetadata metadata) { StatusUtils.compactionCommit(configuration, flinkJob, tableMeta, instant, metadata); - logger.info("WriteStatus: {}", statuses); - if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_TRACE_LATEST_OP_TS)) { - Long max = statuses.stream() - .map(status -> { - if (status instanceof TraceWriteStatus) { - TraceWriteStatus s = (TraceWriteStatus) status; - return s.getLatestOpts(); - } - return 0L; - }) - .max(Long::compare) - .orElse(0L); - logger.info("Latest op ts: {}", max); - StatusUtils.compactionLatestOpTs(configuration, flinkJob, tableMeta, max); - } + // Trace latest_op_ts + Long max = statuses.stream() + .map(status -> { + if (status instanceof TraceWriteStatus) { + TraceWriteStatus s = (TraceWriteStatus) status; + return s.getLatestOpts(); + } + return 0L; + }) + .max(Long::compare) + .orElse(0L); + logger.info("Latest op ts: {}", max); + StatusUtils.compactionLatestOpTs(configuration, flinkJob, tableMeta, max); } @Override diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/StatusUtils.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/StatusUtils.java index 3f782e5..b595bf9 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/StatusUtils.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/StatusUtils.java @@ -30,7 +30,7 @@ public class StatusUtils { private static final int HTTP_TIMEOUT = (int) Constants.MINUTE; public static void syncStart(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta) { - logger.info("Enter method: syncStart[configuration, flinkJob, tableMeta]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta); + // logger.info("Enter method: syncStart[configuration, flinkJob, tableMeta]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta); try { Failsafe.with(RetryPolicyProvider.HTTP_RETRY) .run(() -> @@ -86,7 +86,7 @@ public class StatusUtils { } public static void syncOperation(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long operationTime) { - logger.info("Enter method: syncOperation[configuration, flinkJob, tableMeta, operationTime]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "operationTime:" + operationTime); + // logger.info("Enter method: syncOperation[configuration, flinkJob, tableMeta, operationTime]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "operationTime:" + operationTime); try { Failsafe.with(RetryPolicyProvider.HTTP_RETRY) .run(() -> { @@ -125,7 +125,7 @@ public class StatusUtils { } public static void compactionStart(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta) { - logger.info("Enter method: compactionStart[configuration, flinkJob, tableMeta]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta); + // logger.info("Enter method: compactionStart[configuration, flinkJob, tableMeta]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta); try { Failsafe.with(RetryPolicyProvider.HTTP_RETRY) .run(() -> @@ -151,7 +151,7 @@ public class StatusUtils { } public static void compactionPreCommit(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String instant, Map metadata) { - logger.info("Enter method: compactionPreCommit[configuration, flinkJob, tableMeta, instant, metadata]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "instant:" + instant + "," + "metadata:" + metadata); + // logger.info("Enter method: compactionPreCommit[configuration, flinkJob, tableMeta, instant, metadata]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "instant:" + instant + "," + "metadata:" + metadata); try { Failsafe.with(RetryPolicyProvider.HTTP_RETRY) .run(() -> HttpUtil.createPost( @@ -177,7 +177,7 @@ public class StatusUtils { } public static void compactionCommit(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String instant, HoodieCommitMetadata metadata) { - logger.info("Enter method: compactionCommit[configuration, flinkJob, tableMeta, instant, metadata]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "instant:" + instant + "," + "metadata:" + metadata); + // logger.info("Enter method: compactionCommit[configuration, flinkJob, tableMeta, instant, metadata]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "instant:" + instant + "," + "metadata:" + metadata); try { Failsafe.with(RetryPolicyProvider.HTTP_RETRY) .run(() -> HttpUtil.createPost( @@ -203,7 +203,7 @@ public class StatusUtils { } public static void compactionFinish(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String message, Exception exception) { - logger.info("Enter method: compactionFinish[configuration, flinkJob, tableMeta, message, exception]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "message:" + message + "," + "exception:" + exception); + // logger.info("Enter method: compactionFinish[configuration, flinkJob, tableMeta, message, exception]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "message:" + message + "," + "exception:" + exception); try { Failsafe.with(RetryPolicyProvider.HTTP_RETRY) .run(() -> { diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java index bb84c8f..a6f10e3 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java @@ -8,10 +8,7 @@ import com.lanyuanxiaoyao.service.common.entity.FlinkJob; import com.lanyuanxiaoyao.service.common.entity.Record; import com.lanyuanxiaoyao.service.common.entity.TableMeta; import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper; -import com.lanyuanxiaoyao.service.sync.configuration.DefaultPartitionNameKeyGenerator; import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration; -import com.lanyuanxiaoyao.service.sync.configuration.TraceOverwriteWithLatestAvroPayload; -import com.lanyuanxiaoyao.service.sync.configuration.TraceWriteStatus; import com.lanyuanxiaoyao.service.sync.functions.OperationTypeFilter; import com.lanyuanxiaoyao.service.sync.functions.Record2RowDataFunction; import com.lanyuanxiaoyao.service.sync.functions.type.TypeConverter; @@ -28,9 +25,11 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; +import org.apache.hudi.client.TraceWriteStatus; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.TraceOverwriteWithLatestAvroPayload; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.util.collection.Pair; @@ -42,6 +41,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsVictoriaConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.keygen.DefaultPartitionNameKeyGenerator; import org.apache.hudi.metrics.MetricsReporterType; import org.apache.hudi.org.apache.avro.Schema; import org.apache.hudi.sink.utils.Pipelines; @@ -77,7 +77,7 @@ public class SyncUtils { configuration = inputConfiguration; } - configuration.setBoolean(HoodieMetricsConfig.TURN_METRICS_ON.key(), false); + configuration.setBoolean(HoodieMetricsConfig.TURN_METRICS_ON.key(), true); configuration.setString(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), MetricsReporterType.VICTORIA.name()); configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_ENDPOINT.key(), globalConfiguration.getMetricPublishPrometheusUrl()); configuration.setInteger(HoodieMetricsVictoriaConfig.VICTORIA_TIMEOUT.key(), 60000); @@ -102,7 +102,7 @@ public class SyncUtils { configuration = inputConfiguration; } - configuration.setBoolean(HoodieMetricsConfig.TURN_METRICS_ON.key(), false); + configuration.setBoolean(HoodieMetricsConfig.TURN_METRICS_ON.key(), true); configuration.setString(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), MetricsReporterType.VICTORIA.name()); configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_ENDPOINT.key(), globalConfiguration.getMetricPublishPrometheusUrl()); configuration.setInteger(HoodieMetricsVictoriaConfig.VICTORIA_TIMEOUT.key(), 60000); @@ -153,11 +153,8 @@ public class SyncUtils { configuration.setString(FlinkOptions.PARTITION_PATH_FIELD, partitionPath.get()); } - if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_TRACE_LATEST_OP_TS)) { - logger.info("Enable trace latest op ts"); - configuration.setString(FlinkOptions.PAYLOAD_CLASS_NAME, TraceOverwriteWithLatestAvroPayload.class.getName()); - configuration.setString(HoodieWriteConfig.WRITE_STATUS_CLASS_NAME.key(), TraceWriteStatus.class.getName()); - } + configuration.setString(FlinkOptions.PAYLOAD_CLASS_NAME, TraceOverwriteWithLatestAvroPayload.class.getName()); + configuration.setString(HoodieWriteConfig.WRITE_STATUS_CLASS_NAME.key(), TraceWriteStatus.class.getName()); configuration.setBoolean(FlinkOptions.METADATA_ENABLED, false); configuration.setInteger(HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE.key(), Integer.MAX_VALUE); diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/configuration/TraceWriteStatus.java b/utils/sync/src/main/java/org/apache/hudi/client/TraceWriteStatus.java similarity index 95% rename from utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/configuration/TraceWriteStatus.java rename to utils/sync/src/main/java/org/apache/hudi/client/TraceWriteStatus.java index 2b18575..e1ef973 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/configuration/TraceWriteStatus.java +++ b/utils/sync/src/main/java/org/apache/hudi/client/TraceWriteStatus.java @@ -1,4 +1,4 @@ -package com.lanyuanxiaoyao.service.sync.configuration; +package org.apache.hudi.client; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.common.Constants; @@ -6,7 +6,6 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.Map; -import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.slf4j.Logger; diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/configuration/TraceOverwriteWithLatestAvroPayload.java b/utils/sync/src/main/java/org/apache/hudi/common/model/TraceOverwriteWithLatestAvroPayload.java similarity index 74% rename from utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/configuration/TraceOverwriteWithLatestAvroPayload.java rename to utils/sync/src/main/java/org/apache/hudi/common/model/TraceOverwriteWithLatestAvroPayload.java index 8744632..50eeec2 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/configuration/TraceOverwriteWithLatestAvroPayload.java +++ b/utils/sync/src/main/java/org/apache/hudi/common/model/TraceOverwriteWithLatestAvroPayload.java @@ -1,20 +1,25 @@ -package com.lanyuanxiaoyao.service.sync.configuration; +package org.apache.hudi.common.model; import com.lanyuanxiaoyao.service.common.Constants; import java.util.HashMap; import java.util.Map; -import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.org.apache.avro.generic.GenericRecord; import org.apache.hudi.org.apache.avro.util.Utf8; +import org.apache.hudi.table.HoodieTableFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** + * 通过将latest_op_ts值写入payload进行传递,使压缩完成后可以从status中获取该值 + * Hudi默认使用{@link OverwriteWithLatestAvroPayload},由于{@link HoodieTableFactory}156行中的说明 + * 如果使用默认值,则改为使用{@link EventTimeAvroPayload},所以这里的payload直接继承EventTimeAvroPayload + * 从而符合原来的使用逻辑 + * * @author lanyuanxiaoyao * @date 2023-04-18 */ -public class TraceOverwriteWithLatestAvroPayload extends OverwriteWithLatestAvroPayload { +public class TraceOverwriteWithLatestAvroPayload extends EventTimeAvroPayload { private static final Logger logger = LoggerFactory.getLogger(TraceOverwriteWithLatestAvroPayload.class); private final String latestOpts; diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/configuration/DefaultPartitionNameKeyGenerator.java b/utils/sync/src/main/java/org/apache/hudi/keygen/DefaultPartitionNameKeyGenerator.java similarity index 96% rename from utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/configuration/DefaultPartitionNameKeyGenerator.java rename to utils/sync/src/main/java/org/apache/hudi/keygen/DefaultPartitionNameKeyGenerator.java index 19f46bd..544672a 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/configuration/DefaultPartitionNameKeyGenerator.java +++ b/utils/sync/src/main/java/org/apache/hudi/keygen/DefaultPartitionNameKeyGenerator.java @@ -1,4 +1,4 @@ -package com.lanyuanxiaoyao.service.sync.configuration; +package org.apache.hudi.keygen; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.TypedProperties;