From ebbe56e8622441a57fda63b0392cd6f7c265ec1e Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Wed, 18 May 2022 09:30:09 +0800 Subject: [PATCH] [minor] Some code refactoring for LogFileComparator and Instant instantiation (#5600) --- .../hudi/table/action/compact/CompactHelpers.java | 3 +-- .../org/apache/hudi/client/HoodieFlinkWriteClient.java | 2 +- .../org/apache/hudi/client/SparkRDDWriteClient.java | 6 +++--- .../org/apache/hudi/common/model/HoodieLogFile.java | 7 +++++-- .../hudi/configuration/HadoopConfigurations.java | 10 ++++++++-- 5 files changed, 18 insertions(+), 10 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java index a348eb0ed..3379d16f4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; -import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.Option; @@ -77,7 +76,7 @@ public class CompactHelpers { HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); try { activeTimeline.transitionCompactionInflightToComplete( - new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime), + HoodieTimeline.getCompactionInflightInstant(compactionCommitTime), Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); } catch (IOException e) { throw new HoodieCompactionException( diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index f62592a49..ce75452d2 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -358,7 +358,7 @@ public class HoodieFlinkWriteClient extends String compactionCommitTime) { this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName()); List writeStats = metadata.getWriteStats(); - final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime); + final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime); try { this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty()); finalizeWrite(table, compactionCommitTime, writeStats); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index df82e75db..7f9ec05e3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -305,7 +305,7 @@ public class SparkRDDWriteClient extends String compactionCommitTime) { this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName()); List writeStats = metadata.getWriteStats(); - final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime); + final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime); try { this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty()); finalizeWrite(table, compactionCommitTime, writeStats); @@ -382,7 +382,7 @@ public class SparkRDDWriteClient extends + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(","))); } - final HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime); + final HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime); try { this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty()); @@ -393,7 +393,7 @@ public class SparkRDDWriteClient extends LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata); table.getActiveTimeline().transitionReplaceInflightToComplete( - HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime), + clusteringInstant, Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); } catch (Exception e) { throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java index 5b5a6432e..d4ad2cae1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java @@ -40,6 +40,9 @@ public class HoodieLogFile implements Serializable { public static final String DELTA_EXTENSION = ".log"; public static final Integer LOGFILE_BASE_VERSION = 1; + private static final Comparator LOG_FILE_COMPARATOR = new LogFileComparator(); + private static final Comparator LOG_FILE_COMPARATOR_REVERSED = new LogFileComparator().reversed(); + private transient FileStatus fileStatus; private final String pathStr; private long fileLen; @@ -129,11 +132,11 @@ public class HoodieLogFile implements Serializable { } public static Comparator getLogFileComparator() { - return new LogFileComparator(); + return LOG_FILE_COMPARATOR; } public static Comparator getReverseLogFileComparator() { - return new LogFileComparator().reversed(); + return LOG_FILE_COMPARATOR_REVERSED; } /** diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java index 7784e7caa..72f203115 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java @@ -23,10 +23,16 @@ import org.apache.hudi.util.FlinkClientUtil; import java.util.Map; +/** + * Utilities for fetching hadoop configurations. + */ public class HadoopConfigurations { private static final String HADOOP_PREFIX = "hadoop."; private static final String PARQUET_PREFIX = "parquet."; + /** + * Creates a merged hadoop configuration with given flink configuration and hadoop configuration. + */ public static org.apache.hadoop.conf.Configuration getParquetConf( org.apache.flink.configuration.Configuration options, org.apache.hadoop.conf.Configuration hadoopConf) { @@ -37,12 +43,12 @@ public class HadoopConfigurations { } /** - * Create a new hadoop configuration that is initialized with the given flink configuration. + * Creates a new hadoop configuration that is initialized with the given flink configuration. */ public static org.apache.hadoop.conf.Configuration getHadoopConf(Configuration conf) { org.apache.hadoop.conf.Configuration hadoopConf = FlinkClientUtil.getHadoopConf(); Map options = FlinkOptions.getPropertiesWithPrefix(conf.toMap(), HADOOP_PREFIX); - options.forEach((k, v) -> hadoopConf.set(k, v)); + options.forEach(hadoopConf::set); return hadoopConf; } }