[minor] Some code refactoring for LogFileComparator and Instant instantiation (#5600)
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.Option;
@@ -77,7 +76,7 @@ public class CompactHelpers<T extends HoodieRecordPayload, I, K, O> {
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     try {
       activeTimeline.transitionCompactionInflightToComplete(
-          new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime),
+          HoodieTimeline.getCompactionInflightInstant(compactionCommitTime),
           Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
     } catch (IOException e) {
       throw new HoodieCompactionException(
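For context, the factory helpers called here replace inline `new HoodieInstant(...)` constructions at every call site. Their bodies are not shown in this diff; the sketch below infers them from the constructions they replace, so the actual Hudi implementation may differ in detail:

```java
// Sketch of the factory helpers used above, as they would live in HoodieTimeline.
// Bodies are inferred from the inline constructions this diff replaces.
public static HoodieInstant getCompactionInflightInstant(final String timestamp) {
  return new HoodieInstant(HoodieInstant.State.INFLIGHT, COMPACTION_ACTION, timestamp);
}

public static HoodieInstant getReplaceCommitInflightInstant(final String timestamp) {
  return new HoodieInstant(HoodieInstant.State.INFLIGHT, REPLACE_COMMIT_ACTION, timestamp);
}
```

Centralizing the construction means the call site no longer names `HoodieInstant` directly, which is why the import can be dropped from CompactHelpers in the first hunk. The same replacement is applied in the Flink and Spark write clients below.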
@@ -358,7 +358,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
       String compactionCommitTime) {
     this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
     List<HoodieWriteStat> writeStats = metadata.getWriteStats();
-    final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
+    final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
     try {
       this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
       finalizeWrite(table, compactionCommitTime, writeStats);
@@ -305,7 +305,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
       String compactionCommitTime) {
     this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
     List<HoodieWriteStat> writeStats = metadata.getWriteStats();
-    final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
+    final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
     try {
       this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
       finalizeWrite(table, compactionCommitTime, writeStats);
@@ -382,7 +382,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
           + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
     }
 
-    final HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
+    final HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime);
     try {
       this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
 
@@ -393,7 +393,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
|||||||
LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);
|
LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);
|
||||||
|
|
||||||
table.getActiveTimeline().transitionReplaceInflightToComplete(
|
table.getActiveTimeline().transitionReplaceInflightToComplete(
|
||||||
HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
|
clusteringInstant,
|
||||||
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
|
throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
|
||||||
|
|||||||
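Taken together, the two SparkRDDWriteClient hunks mean the clustering path now constructs the inflight instant exactly once and reuses it for both the transaction and the timeline transition. A condensed view, drawn only from the lines in this diff with the intermediate steps elided:

```java
// Net effect of the two clustering hunks (surrounding code elided): the
// inflight instant is built once through the factory helper and reused, rather
// than being constructed a second time at the transition call.
final HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime);
try {
  this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
  // ... finalize writes, build metadata ...
  table.getActiveTimeline().transitionReplaceInflightToComplete(
      clusteringInstant,
      Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
  throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
}
```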
@@ -40,6 +40,9 @@ public class HoodieLogFile implements Serializable {
   public static final String DELTA_EXTENSION = ".log";
   public static final Integer LOGFILE_BASE_VERSION = 1;
 
+  private static final Comparator<HoodieLogFile> LOG_FILE_COMPARATOR = new LogFileComparator();
+  private static final Comparator<HoodieLogFile> LOG_FILE_COMPARATOR_REVERSED = new LogFileComparator().reversed();
+
   private transient FileStatus fileStatus;
   private final String pathStr;
   private long fileLen;
@@ -129,11 +132,11 @@ public class HoodieLogFile implements Serializable {
   }
 
   public static Comparator<HoodieLogFile> getLogFileComparator() {
-    return new LogFileComparator();
+    return LOG_FILE_COMPARATOR;
   }
 
   public static Comparator<HoodieLogFile> getReverseLogFileComparator() {
-    return new LogFileComparator().reversed();
+    return LOG_FILE_COMPARATOR_REVERSED;
   }
 
   /**
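The HoodieLogFile change caches both comparators in static final fields instead of allocating a new `LogFileComparator` (and a reversed wrapper) on every getter call, which is safe because a stateless comparator can be shared freely. A self-contained illustration of the pattern, using hypothetical names rather than Hudi code:

```java
import java.util.Comparator;

// Illustrative example of the caching pattern applied above: a stateless
// Comparator can be shared safely, so build it (and its reversed view) once.
class LogRecord {
  final int version;

  LogRecord(int version) {
    this.version = version;
  }

  private static final Comparator<LogRecord> BY_VERSION =
      Comparator.comparingInt(r -> r.version);
  private static final Comparator<LogRecord> BY_VERSION_REVERSED = BY_VERSION.reversed();

  static Comparator<LogRecord> byVersion() {
    return BY_VERSION;
  }

  static Comparator<LogRecord> byVersionReversed() {
    return BY_VERSION_REVERSED;
  }
}
```

Note that `Comparator.reversed()` allocates a wrapper around the original comparator, so caching the reversed view as well avoids one allocation per call on that path too.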
@@ -23,10 +23,16 @@ import org.apache.hudi.util.FlinkClientUtil;
 
 import java.util.Map;
 
+/**
+ * Utilities for fetching hadoop configurations.
+ */
 public class HadoopConfigurations {
   private static final String HADOOP_PREFIX = "hadoop.";
   private static final String PARQUET_PREFIX = "parquet.";
 
+  /**
+   * Creates a merged hadoop configuration with given flink configuration and hadoop configuration.
+   */
   public static org.apache.hadoop.conf.Configuration getParquetConf(
       org.apache.flink.configuration.Configuration options,
       org.apache.hadoop.conf.Configuration hadoopConf) {
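The new Javadoc documents getParquetConf as merging the flink options into a hadoop configuration. Its body is not part of this diff; based only on the documented contract and the PARQUET_PREFIX constant, a plausible shape might be the following, which is an assumption and not the actual Hudi implementation:

```java
// Hypothetical sketch of the documented contract: copy the base hadoop conf,
// then overlay the flink options carried under the "parquet." prefix.
// The real method body is not shown in this diff.
public static org.apache.hadoop.conf.Configuration getParquetConf(
    org.apache.flink.configuration.Configuration options,
    org.apache.hadoop.conf.Configuration hadoopConf) {
  org.apache.hadoop.conf.Configuration copy = new org.apache.hadoop.conf.Configuration(hadoopConf);
  Map<String, String> parquetOptions = FlinkOptions.getPropertiesWithPrefix(options.toMap(), PARQUET_PREFIX);
  parquetOptions.forEach((k, v) -> copy.set(PARQUET_PREFIX + k, v));
  return copy;
}
```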
@@ -37,12 +43,12 @@ public class HadoopConfigurations {
   }
 
   /**
-   * Create a new hadoop configuration that is initialized with the given flink configuration.
+   * Creates a new hadoop configuration that is initialized with the given flink configuration.
    */
   public static org.apache.hadoop.conf.Configuration getHadoopConf(Configuration conf) {
     org.apache.hadoop.conf.Configuration hadoopConf = FlinkClientUtil.getHadoopConf();
     Map<String, String> options = FlinkOptions.getPropertiesWithPrefix(conf.toMap(), HADOOP_PREFIX);
-    options.forEach((k, v) -> hadoopConf.set(k, v));
+    options.forEach(hadoopConf::set);
     return hadoopConf;
   }
 }
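The forEach change swaps a lambda for an equivalent method reference: `Map.forEach` takes a `BiConsumer<K, V>`, and `hadoopConf::set` binds directly to `Configuration.set(String, String)`, so both forms do exactly the same thing. A minimal standalone illustration with a hypothetical setter, not Hudi code:

```java
import java.util.HashMap;
import java.util.Map;

// Standalone illustration of the lambda-to-method-reference change above.
public class MethodRefDemo {
  static class Settings {
    private final Map<String, String> props = new HashMap<>();

    // Matches BiConsumer<String, String>, so it can be used as settings::set.
    void set(String key, String value) {
      props.put(key, value);
    }
  }

  public static void main(String[] args) {
    Map<String, String> options = Map.of("hadoop.tmp.dir", "/tmp");
    Settings settings = new Settings();
    options.forEach((k, v) -> settings.set(k, v)); // lambda form
    options.forEach(settings::set);                // equivalent method reference
  }
}
```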