diff --git a/utils/sync/src/main/java/org/apache/hudi/keygen/DefaultPartitionNameKeyGenerator.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/configuration/DefaultPartitionNameKeyGenerator.java similarity index 96% rename from utils/sync/src/main/java/org/apache/hudi/keygen/DefaultPartitionNameKeyGenerator.java rename to utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/configuration/DefaultPartitionNameKeyGenerator.java index 544672a..19f46bd 100644 --- a/utils/sync/src/main/java/org/apache/hudi/keygen/DefaultPartitionNameKeyGenerator.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/configuration/DefaultPartitionNameKeyGenerator.java @@ -1,4 +1,4 @@ -package org.apache.hudi.keygen; +package com.lanyuanxiaoyao.service.sync.configuration; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.TypedProperties; diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/ConfigurationUtils.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/ConfigurationUtils.java index d63eb1b..573d193 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/ConfigurationUtils.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/ConfigurationUtils.java @@ -7,6 +7,7 @@ import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.entity.FlinkJob; import com.lanyuanxiaoyao.service.common.entity.TableMeta; import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper; +import com.lanyuanxiaoyao.service.sync.configuration.DefaultPartitionNameKeyGenerator; import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration; import java.util.Optional; import java.util.stream.Collectors; @@ -14,7 +15,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.hudi.client.TraceWriteStatus; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieTableType; 
-import org.apache.hudi.common.model.TraceOverwriteWithLatestAvroPayload; +import org.apache.hudi.common.model.TraceEventTimeAvroPayload; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.util.collection.Pair; @@ -25,7 +26,6 @@ import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.config.metrics.HoodieMetricsVictoriaConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.index.HoodieIndex; -import org.apache.hudi.keygen.DefaultPartitionNameKeyGenerator; import org.apache.hudi.metrics.MetricsReporterType; import org.apache.hudi.org.apache.avro.Schema; import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy; @@ -128,7 +128,7 @@ public class ConfigurationUtils { configuration.setString(FlinkOptions.PARTITION_PATH_FIELD, partitionPath.get()); } - configuration.setString(FlinkOptions.PAYLOAD_CLASS_NAME, TraceOverwriteWithLatestAvroPayload.class.getName()); + configuration.setString(FlinkOptions.PAYLOAD_CLASS_NAME, TraceEventTimeAvroPayload.class.getName()); configuration.setString(HoodieWriteConfig.WRITE_STATUS_CLASS_NAME.key(), TraceWriteStatus.class.getName()); configuration.setBoolean(FlinkOptions.METADATA_ENABLED, false); diff --git a/utils/sync/src/main/java/org/apache/hudi/client/TraceWriteStatus.java b/utils/sync/src/main/java/org/apache/hudi/client/TraceWriteStatus.java deleted file mode 100644 index e1ef973..0000000 --- a/utils/sync/src/main/java/org/apache/hudi/client/TraceWriteStatus.java +++ /dev/null @@ -1,57 +0,0 @@ -package org.apache.hudi.client; - -import cn.hutool.core.util.StrUtil; -import com.lanyuanxiaoyao.service.common.Constants; -import java.time.LocalDateTime; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; -import java.util.Map; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.util.Option; 
-import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author lanyuanxiaoyao - * @date 2023-04-17 - */ -public class TraceWriteStatus extends WriteStatus { - private static final Logger logger = LoggerFactory.getLogger(TraceWriteStatus.class); - private final static DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - private long latestOpts = 0L; - - public TraceWriteStatus() { - super(); - } - - public TraceWriteStatus(Boolean trackSuccessRecords, Double failureFraction) { - super(trackSuccessRecords, failureFraction); - } - - public long getLatestOpts() { - return latestOpts; - } - - @Override - public void markSuccess(HoodieRecord record, Option<Map<String, String>> optionalRecordMetadata) { - super.markSuccess(record, optionalRecordMetadata); - try { - optionalRecordMetadata.ifPresent(map -> { - if (map.containsKey(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME)) { - String inOpts = map.get(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME); - if (StrUtil.isNotBlank(inOpts)) { - long current = LocalDateTime.parse(inOpts, FORMATTER).toInstant(ZoneOffset.ofHours(8)).toEpochMilli(); - latestOpts = Long.max(latestOpts, current); - } - } - }); - } catch (Throwable throwable) { - logger.error("Parse latest opts failure", throwable); - } - } - - @Override - public void markFailure(HoodieRecord record, Throwable t, Option<Map<String, String>> optionalRecordMetadata) { - super.markFailure(record, t, optionalRecordMetadata); - } -} diff --git a/utils/sync/src/main/java/org/apache/hudi/common/model/TraceOverwriteWithLatestAvroPayload.java b/utils/sync/src/main/java/org/apache/hudi/common/model/TraceOverwriteWithLatestAvroPayload.java deleted file mode 100644 index 50eeec2..0000000 --- a/utils/sync/src/main/java/org/apache/hudi/common/model/TraceOverwriteWithLatestAvroPayload.java +++ /dev/null @@ -1,57 +0,0 @@ -package org.apache.hudi.common.model; - -import com.lanyuanxiaoyao.service.common.Constants; -import java.util.HashMap; -import java.util.Map; -import 
org.apache.hudi.common.util.Option; -import org.apache.hudi.org.apache.avro.generic.GenericRecord; -import org.apache.hudi.org.apache.avro.util.Utf8; -import org.apache.hudi.table.HoodieTableFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * 通过将latest_op_ts值写入payload进行传递,使压缩完成后可以从status中获取该值 - * Hudi默认使用{@link OverwriteWithLatestAvroPayload},由于{@link HoodieTableFactory}156行中的说明 - * 如果使用默认值,则改为使用{@link EventTimeAvroPayload},所以这里的payload直接继承EventTimeAvroPayload - * 从而符合原来的使用逻辑 - * - * @author lanyuanxiaoyao - * @date 2023-04-18 - */ -public class TraceOverwriteWithLatestAvroPayload extends EventTimeAvroPayload { - private static final Logger logger = LoggerFactory.getLogger(TraceOverwriteWithLatestAvroPayload.class); - - private final String latestOpts; - - public TraceOverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) { - super(record, orderingVal); - this.latestOpts = updateLatestOpts(Option.ofNullable(record)); - } - - public TraceOverwriteWithLatestAvroPayload(Option<GenericRecord> record) { - super(record); - this.latestOpts = updateLatestOpts(record); - } - - private String updateLatestOpts(Option<GenericRecord> record) { - try { - return record - .map(r -> ((Utf8) r.get(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME)).toString()) - .orElse(null); - } catch (Throwable throwable) { - logger.error("Get latest opts failure", throwable); - } - return null; - } - - @Override - public Option<Map<String, String>> getMetadata() { - if (this.latestOpts == null) { - return Option.empty(); - } - Map<String, String> metadata = super.getMetadata().orElse(new HashMap<>()); - metadata.put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, this.latestOpts); - return Option.of(metadata); - } -}