1
0

[MINOR]Optimize hudi-client module (#1139)

This commit is contained in:
SteNicholas
2020-01-05 02:57:08 +08:00
committed by vinoth chandar
parent a733f4ef72
commit 726ae47ce2
28 changed files with 152 additions and 199 deletions

View File

@@ -219,10 +219,9 @@ public class CompactionAdminClient extends AbstractHoodieClient {
*/ */
private static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, String compactionInstant) private static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, String compactionInstant)
throws IOException { throws IOException {
HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan( return AvroUtils.deserializeCompactionPlan(
metaClient.getActiveTimeline().readPlanAsBytes( metaClient.getActiveTimeline().readPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get()); HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get());
return compactionPlan;
} }
/** /**

View File

@@ -77,7 +77,7 @@ import org.apache.spark.storage.StorageLevel;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.text.ParseException; import java.text.ParseException;
import java.util.Arrays; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -347,7 +347,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
// perform index loop up to get existing location of records // perform index loop up to get existing location of records
JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, jsc, table); JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, jsc, table);
// filter out non existant keys/records // filter out non existant keys/records
JavaRDD<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(record -> record.isCurrentLocationKnown()); JavaRDD<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown);
if (!taggedValidRecords.isEmpty()) { if (!taggedValidRecords.isEmpty()) {
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop())); metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
indexTimer = null; indexTimer = null;
@@ -392,7 +392,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table, fileIDPrefixes), true) .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table, fileIDPrefixes), true)
.flatMap(writeStatuses -> writeStatuses.iterator()); .flatMap(List::iterator);
return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime); return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime);
} }
@@ -424,14 +424,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
throws HoodieCommitException { throws HoodieCommitException {
try { try {
HoodieCommitMetadata metadata = new HoodieCommitMetadata(); HoodieCommitMetadata metadata = new HoodieCommitMetadata();
profile.getPartitionPaths().stream().forEach(path -> { profile.getPartitionPaths().forEach(path -> {
WorkloadStat partitionStat = profile.getWorkloadStat(path.toString()); WorkloadStat partitionStat = profile.getWorkloadStat(path.toString());
partitionStat.getUpdateLocationToCount().entrySet().stream().forEach(entry -> { partitionStat.getUpdateLocationToCount().forEach((key, value) -> {
HoodieWriteStat writeStat = new HoodieWriteStat(); HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId(entry.getKey()); writeStat.setFileId(key);
// TODO : Write baseCommitTime is possible here ? // TODO : Write baseCommitTime is possible here ?
writeStat.setPrevCommit(entry.getValue().getKey()); writeStat.setPrevCommit(value.getKey());
writeStat.setNumUpdateWrites(entry.getValue().getValue()); writeStat.setNumUpdateWrites(value.getValue());
metadata.addWriteStat(path.toString(), writeStat); metadata.addWriteStat(path.toString(), writeStat);
}); });
}); });
@@ -804,7 +804,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
ImmutableMap.Builder<String, List<HoodieRollbackStat>> instantsToStats = ImmutableMap.builder(); ImmutableMap.Builder<String, List<HoodieRollbackStat>> instantsToStats = ImmutableMap.builder();
table.getActiveTimeline().createNewInstant( table.getActiveTimeline().createNewInstant(
new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRollbackInstant)); new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRollbackInstant));
instantsToRollback.stream().forEach(instant -> { instantsToRollback.forEach(instant -> {
try { try {
switch (instant.getAction()) { switch (instant.getAction()) {
case HoodieTimeline.COMMIT_ACTION: case HoodieTimeline.COMMIT_ACTION:
@@ -850,7 +850,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
// Check if any of the commits is a savepoint - do not allow rollback on those commits // Check if any of the commits is a savepoint - do not allow rollback on those commits
List<String> savepoints = table.getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp) List<String> savepoints = table.getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList()); .collect(Collectors.toList());
savepoints.stream().forEach(s -> { savepoints.forEach(s -> {
if (s.contains(commitToRollback)) { if (s.contains(commitToRollback)) {
throw new HoodieRollbackException( throw new HoodieRollbackException(
"Could not rollback a savepointed commit. Delete savepoint first before rolling back" + s); "Could not rollback a savepointed commit. Delete savepoint first before rolling back" + s);
@@ -864,19 +864,18 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
// Make sure only the last n commits are being rolled back // Make sure only the last n commits are being rolled back
// If there is a commit in-between or after that is not rolled back, then abort // If there is a commit in-between or after that is not rolled back, then abort
String lastCommit = commitToRollback;
if ((lastCommit != null) && !commitTimeline.empty() if ((commitToRollback != null) && !commitTimeline.empty()
&& !commitTimeline.findInstantsAfter(lastCommit, Integer.MAX_VALUE).empty()) { && !commitTimeline.findInstantsAfter(commitToRollback, Integer.MAX_VALUE).empty()) {
throw new HoodieRollbackException( throw new HoodieRollbackException(
"Found commits after time :" + lastCommit + ", please rollback greater commits first"); "Found commits after time :" + commitToRollback + ", please rollback greater commits first");
} }
List<String> inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp) List<String> inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList()); .collect(Collectors.toList());
if ((lastCommit != null) && !inflights.isEmpty() && (inflights.indexOf(lastCommit) != inflights.size() - 1)) { if ((commitToRollback != null) && !inflights.isEmpty() && (inflights.indexOf(commitToRollback) != inflights.size() - 1)) {
throw new HoodieRollbackException( throw new HoodieRollbackException(
"Found in-flight commits after time :" + lastCommit + ", please rollback greater commits first"); "Found in-flight commits after time :" + commitToRollback + ", please rollback greater commits first");
} }
List<HoodieRollbackStat> stats = table.rollback(jsc, instantToRollback, true); List<HoodieRollbackStat> stats = table.rollback(jsc, instantToRollback, true);
@@ -895,7 +894,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
List<String> commitsToRollback, final String startRollbackTime) throws IOException { List<String> commitsToRollback, final String startRollbackTime) throws IOException {
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
Option<Long> durationInMs = Option.empty(); Option<Long> durationInMs = Option.empty();
Long numFilesDeleted = rollbackStats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum(); long numFilesDeleted = rollbackStats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum();
if (context != null) { if (context != null) {
durationInMs = Option.of(metrics.getDurationInMs(context.stop())); durationInMs = Option.of(metrics.getDurationInMs(context.stop()));
metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted); metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted);
@@ -923,7 +922,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
List<String> commitsToRollback, final String startRestoreTime, final String restoreToInstant) throws IOException { List<String> commitsToRollback, final String startRestoreTime, final String restoreToInstant) throws IOException {
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
Option<Long> durationInMs = Option.empty(); Option<Long> durationInMs = Option.empty();
Long numFilesDeleted = 0L; long numFilesDeleted = 0L;
for (Map.Entry<String, List<HoodieRollbackStat>> commitToStat : commitToStats.entrySet()) { for (Map.Entry<String, List<HoodieRollbackStat>> commitToStat : commitToStats.entrySet()) {
List<HoodieRollbackStat> stats = commitToStat.getValue(); List<HoodieRollbackStat> stats = commitToStat.getValue();
numFilesDeleted = stats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum(); numFilesDeleted = stats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum();
@@ -962,7 +961,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
if (rollbackInstantOpt.isPresent()) { if (rollbackInstantOpt.isPresent()) {
List<HoodieRollbackStat> stats = doRollbackAndGetStats(rollbackInstantOpt.get()); List<HoodieRollbackStat> stats = doRollbackAndGetStats(rollbackInstantOpt.get());
finishRollback(context, stats, Arrays.asList(commitToRollback), startRollbackTime); finishRollback(context, stats, Collections.singletonList(commitToRollback), startRollbackTime);
} }
} catch (IOException e) { } catch (IOException e) {
throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitToRollback, throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitToRollback,
@@ -1124,7 +1123,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
} }
/** /**
* Deduplicate Hoodie records, using the given deduplication funciton. * Deduplicate Hoodie records, using the given deduplication function.
*/ */
JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records, int parallelism) { JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records, int parallelism) {
boolean isIndexingGlobal = index.isGlobal(); boolean isIndexingGlobal = index.isGlobal();
@@ -1144,7 +1143,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
} }
/** /**
* Deduplicate Hoodie records, using the given deduplication funciton. * Deduplicate Hoodie records, using the given deduplication function.
*/ */
JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys, int parallelism) { JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys, int parallelism) {
boolean isIndexingGlobal = index.isGlobal(); boolean isIndexingGlobal = index.isGlobal();
@@ -1342,9 +1341,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
// Copy extraMetadata // Copy extraMetadata
extraMetadata.ifPresent(m -> { extraMetadata.ifPresent(m -> {
m.entrySet().stream().forEach(e -> { m.forEach(metadata::addMetadata);
metadata.addMetadata(e.getKey(), e.getValue());
});
}); });
LOG.info("Committing Compaction {}. Finished with result {}", compactionCommitTime, metadata); LOG.info("Committing Compaction {}. Finished with result {}", compactionCommitTime, metadata);

View File

@@ -83,10 +83,9 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
* Min and Max for HBASE_QPS_FRACTION_PROP to stabilize skewed volume workloads. * Min and Max for HBASE_QPS_FRACTION_PROP to stabilize skewed volume workloads.
*/ */
public static final String HBASE_MIN_QPS_FRACTION_PROP = "hoodie.index.hbase.min.qps.fraction"; public static final String HBASE_MIN_QPS_FRACTION_PROP = "hoodie.index.hbase.min.qps.fraction";
public static final String DEFAULT_HBASE_MIN_QPS_FRACTION_PROP = "0.002";
public static final String HBASE_MAX_QPS_FRACTION_PROP = "hoodie.index.hbase.max.qps.fraction"; public static final String HBASE_MAX_QPS_FRACTION_PROP = "hoodie.index.hbase.max.qps.fraction";
public static final String DEFAULT_HBASE_MAX_QPS_FRACTION_PROP = "0.06";
/** /**
* Hoodie index desired puts operation time in seconds. * Hoodie index desired puts operation time in seconds.
*/ */
@@ -115,12 +114,9 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
private final Properties props = new Properties(); private final Properties props = new Properties();
public HoodieHBaseIndexConfig.Builder fromFile(File propertiesFile) throws IOException { public HoodieHBaseIndexConfig.Builder fromFile(File propertiesFile) throws IOException {
FileReader reader = new FileReader(propertiesFile); try (FileReader reader = new FileReader(propertiesFile)) {
try {
this.props.load(reader); this.props.load(reader);
return this; return this;
} finally {
reader.close();
} }
} }
@@ -194,6 +190,11 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
return this; return this;
} }
public Builder hbaseIndexSleepMsBetweenGetBatch(int sleepMsBetweenGetBatch) {
props.setProperty(HBASE_SLEEP_MS_GET_BATCH_PROP, String.valueOf(sleepMsBetweenGetBatch));
return this;
}
public Builder withQPSResourceAllocatorType(String qpsResourceAllocatorClass) { public Builder withQPSResourceAllocatorType(String qpsResourceAllocatorClass) {
props.setProperty(HBASE_INDEX_QPS_ALLOCATOR_CLASS, qpsResourceAllocatorClass); props.setProperty(HBASE_INDEX_QPS_ALLOCATOR_CLASS, qpsResourceAllocatorClass);
return this; return this;
@@ -217,7 +218,7 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
/** /**
* <p> * <p>
* Method to set maximum QPS allowed per Region Server. This should be same across various jobs. This is intended to * Method to set maximum QPS allowed per Region Server. This should be same across various jobs. This is intended to
* limit the aggregate QPS generated across various jobs to an Hbase Region Server. * limit the aggregate QPS generated across various jobs to an HBase Region Server.
* </p> * </p>
* <p> * <p>
* It is recommended to set this value based on your global indexing throughput needs and most importantly, how much * It is recommended to set this value based on your global indexing throughput needs and most importantly, how much
@@ -238,7 +239,7 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_PROP), HBASE_PUT_BATCH_SIZE_PROP, setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_PROP), HBASE_PUT_BATCH_SIZE_PROP,
String.valueOf(DEFAULT_HBASE_BATCH_SIZE)); String.valueOf(DEFAULT_HBASE_BATCH_SIZE));
setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP), setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP),
HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, String.valueOf(DEFAULT_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE)); HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, DEFAULT_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE);
setDefaultOnCondition(props, !props.containsKey(HBASE_QPS_FRACTION_PROP), HBASE_QPS_FRACTION_PROP, setDefaultOnCondition(props, !props.containsKey(HBASE_QPS_FRACTION_PROP), HBASE_QPS_FRACTION_PROP,
String.valueOf(DEFAULT_HBASE_QPS_FRACTION)); String.valueOf(DEFAULT_HBASE_QPS_FRACTION));
setDefaultOnCondition(props, !props.containsKey(HBASE_MAX_QPS_PER_REGION_SERVER_PROP), setDefaultOnCondition(props, !props.containsKey(HBASE_MAX_QPS_PER_REGION_SERVER_PROP),
@@ -250,7 +251,7 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS), setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS),
HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS, String.valueOf(DEFAULT_HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS)); HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS, String.valueOf(DEFAULT_HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS));
setDefaultOnCondition(props, !props.containsKey(HBASE_ZK_PATH_QPS_ROOT), HBASE_ZK_PATH_QPS_ROOT, setDefaultOnCondition(props, !props.containsKey(HBASE_ZK_PATH_QPS_ROOT), HBASE_ZK_PATH_QPS_ROOT,
String.valueOf(DEFAULT_HBASE_ZK_PATH_QPS_ROOT)); DEFAULT_HBASE_ZK_PATH_QPS_ROOT);
setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS), setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS),
HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS, String.valueOf(DEFAULT_ZK_SESSION_TIMEOUT_MS)); HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS, String.valueOf(DEFAULT_ZK_SESSION_TIMEOUT_MS));
setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS), setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS),

View File

@@ -90,12 +90,9 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
private final Properties props = new Properties(); private final Properties props = new Properties();
public Builder fromFile(File propertiesFile) throws IOException { public Builder fromFile(File propertiesFile) throws IOException {
FileReader reader = new FileReader(propertiesFile); try (FileReader reader = new FileReader(propertiesFile)) {
try {
this.props.load(reader); this.props.load(reader);
return this; return this;
} finally {
reader.close();
} }
} }

View File

@@ -76,12 +76,9 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
private final Properties props = new Properties(); private final Properties props = new Properties();
public Builder fromFile(File propertiesFile) throws IOException { public Builder fromFile(File propertiesFile) throws IOException {
FileReader reader = new FileReader(propertiesFile); try (FileReader reader = new FileReader(propertiesFile)) {
try {
this.props.load(reader); this.props.load(reader);
return this; return this;
} finally {
reader.close();
} }
} }
@@ -141,9 +138,9 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
// 0.6 is the default value used by Spark, // 0.6 is the default value used by Spark,
// look at {@link // look at {@link
// https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkConf.scala#L507} // https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkConf.scala#L507}
double memoryFraction = Double.valueOf( double memoryFraction = Double.parseDouble(
SparkEnv.get().conf().get(SPARK_EXECUTOR_MEMORY_FRACTION_PROP, DEFAULT_SPARK_EXECUTOR_MEMORY_FRACTION)); SparkEnv.get().conf().get(SPARK_EXECUTOR_MEMORY_FRACTION_PROP, DEFAULT_SPARK_EXECUTOR_MEMORY_FRACTION));
double maxMemoryFractionForMerge = Double.valueOf(maxMemoryFraction); double maxMemoryFractionForMerge = Double.parseDouble(maxMemoryFraction);
double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction); double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction);
long maxMemoryForMerge = (long) Math.floor(userAvailableMemory * maxMemoryFractionForMerge); long maxMemoryForMerge = (long) Math.floor(userAvailableMemory * maxMemoryFractionForMerge);
return Math.max(DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES, maxMemoryForMerge); return Math.max(DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES, maxMemoryForMerge);

View File

@@ -70,12 +70,9 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig {
private final Properties props = new Properties(); private final Properties props = new Properties();
public Builder fromFile(File propertiesFile) throws IOException { public Builder fromFile(File propertiesFile) throws IOException {
FileReader reader = new FileReader(propertiesFile); try (FileReader reader = new FileReader(propertiesFile)) {
try {
this.props.load(reader); this.props.load(reader);
return this; return this;
} finally {
reader.close();
} }
} }

View File

@@ -66,12 +66,9 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
private final Properties props = new Properties(); private final Properties props = new Properties();
public Builder fromFile(File propertiesFile) throws IOException { public Builder fromFile(File propertiesFile) throws IOException {
FileReader reader = new FileReader(propertiesFile); try (FileReader reader = new FileReader(propertiesFile)) {
try {
this.props.load(reader); this.props.load(reader);
return this; return this;
} finally {
reader.close();
} }
} }

View File

@@ -335,11 +335,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
} }
public int getHbaseIndexGetBatchSize() { public int getHbaseIndexGetBatchSize() {
return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HBASE_GET_BATCH_SIZE_PROP)); return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_GET_BATCH_SIZE_PROP));
} }
public int getHbaseIndexPutBatchSize() { public int getHbaseIndexPutBatchSize() {
return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_PROP)); return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_PROP));
} }
public Boolean getHbaseIndexPutBatchSizeAutoCompute() { public Boolean getHbaseIndexPutBatchSizeAutoCompute() {
@@ -363,11 +363,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
} }
public boolean getHBaseIndexShouldComputeQPSDynamically() { public boolean getHBaseIndexShouldComputeQPSDynamically() {
return Boolean.valueOf(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY)); return Boolean.parseBoolean(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY));
} }
public int getHBaseIndexDesiredPutsTime() { public int getHBaseIndexDesiredPutsTime() {
return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS)); return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS));
} }
public String getBloomFilterType() { public String getBloomFilterType() {
@@ -455,7 +455,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
} }
public double getParquetCompressionRatio() { public double getParquetCompressionRatio() {
return Double.valueOf(props.getProperty(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO)); return Double.parseDouble(props.getProperty(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO));
} }
public CompressionCodecName getParquetCompressionCodec() { public CompressionCodecName getParquetCompressionCodec() {
@@ -463,7 +463,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
} }
public double getLogFileToParquetCompressionRatio() { public double getLogFileToParquetCompressionRatio() {
return Double.valueOf(props.getProperty(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO)); return Double.parseDouble(props.getProperty(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO));
} }
/** /**
@@ -517,7 +517,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
} }
public int getMaxDFSStreamBufferSize() { public int getMaxDFSStreamBufferSize() {
return Integer.valueOf(props.getProperty(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP)); return Integer.parseInt(props.getProperty(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP));
} }
public String getSpillableMapBasePath() { public String getSpillableMapBasePath() {
@@ -525,7 +525,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
} }
public double getWriteStatusFailureFraction() { public double getWriteStatusFailureFraction() {
return Double.valueOf(props.getProperty(HoodieMemoryConfig.WRITESTATUS_FAILURE_FRACTION_PROP)); return Double.parseDouble(props.getProperty(HoodieMemoryConfig.WRITESTATUS_FAILURE_FRACTION_PROP));
} }
public ConsistencyGuardConfig getConsistencyGuardConfig() { public ConsistencyGuardConfig getConsistencyGuardConfig() {
@@ -564,12 +564,9 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private boolean isConsistencyGuardSet = false; private boolean isConsistencyGuardSet = false;
public Builder fromFile(File propertiesFile) throws IOException { public Builder fromFile(File propertiesFile) throws IOException {
FileReader reader = new FileReader(propertiesFile); try (FileReader reader = new FileReader(propertiesFile)) {
try {
this.props.load(reader); this.props.load(reader);
return this; return this;
} finally {
reader.close();
} }
} }

View File

@@ -65,6 +65,8 @@ public class BloomIndexFileInfo implements Serializable {
* Does the given key fall within the range (inclusive). * Does the given key fall within the range (inclusive).
*/ */
public boolean isKeyInRange(String recordKey) { public boolean isKeyInRange(String recordKey) {
assert minRecordKey != null;
assert maxRecordKey != null;
return minRecordKey.compareTo(recordKey) <= 0 && maxRecordKey.compareTo(recordKey) >= 0; return minRecordKey.compareTo(recordKey) <= 0 && maxRecordKey.compareTo(recordKey) >= 0;
} }

View File

@@ -180,10 +180,10 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
.mapToPair(t -> t).countByKey(); .mapToPair(t -> t).countByKey();
} else { } else {
fileToComparisons = new HashMap<>(); fileToComparisons = new HashMap<>();
partitionToFileInfo.entrySet().stream().forEach(e -> { partitionToFileInfo.forEach((key, value) -> {
for (BloomIndexFileInfo fileInfo : e.getValue()) { for (BloomIndexFileInfo fileInfo : value) {
// each file needs to be compared against all the records coming into the partition // each file needs to be compared against all the records coming into the partition
fileToComparisons.put(fileInfo.getFileId(), recordsPerPartition.get(e.getKey())); fileToComparisons.put(fileInfo.getFileId(), recordsPerPartition.get(key));
} }
}); });
} }

View File

@@ -46,12 +46,10 @@ class IntervalTreeBasedGlobalIndexFileFilter implements IndexFileFilter {
IntervalTreeBasedGlobalIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) { IntervalTreeBasedGlobalIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
List<BloomIndexFileInfo> allIndexFiles = new ArrayList<>(); List<BloomIndexFileInfo> allIndexFiles = new ArrayList<>();
partitionToFileIndexInfo.forEach((parition, bloomIndexFileInfoList) -> { partitionToFileIndexInfo.forEach((parition, bloomIndexFileInfoList) -> bloomIndexFileInfoList.forEach(file -> {
bloomIndexFileInfoList.forEach(file -> {
fileIdToPartitionPathMap.put(file.getFileId(), parition); fileIdToPartitionPathMap.put(file.getFileId(), parition);
allIndexFiles.add(file); allIndexFiles.add(file);
}); }));
});
// Note that the interval tree implementation doesn't have auto-balancing to ensure logN search time. // Note that the interval tree implementation doesn't have auto-balancing to ensure logN search time.
// So, we are shuffling the input here hoping the tree will not have any skewness. If not, the tree could be skewed // So, we are shuffling the input here hoping the tree will not have any skewness. If not, the tree could be skewed

View File

@@ -39,13 +39,11 @@ class ListBasedGlobalIndexFileFilter extends ListBasedIndexFileFilter {
@Override @Override
public Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey) { public Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey) {
Set<Pair<String, String>> toReturn = new HashSet<>(); Set<Pair<String, String>> toReturn = new HashSet<>();
partitionToFileIndexInfo.forEach((partition, bloomIndexFileInfoList) -> { partitionToFileIndexInfo.forEach((partition, bloomIndexFileInfoList) -> bloomIndexFileInfoList.forEach(file -> {
bloomIndexFileInfoList.forEach(file -> {
if (shouldCompareWithFile(file, recordKey)) { if (shouldCompareWithFile(file, recordKey)) {
toReturn.add(Pair.of(partition, file.getFileId())); toReturn.add(Pair.of(partition, file.getFileId()));
} }
}); }));
});
return toReturn; return toReturn;
} }
} }

View File

@@ -29,7 +29,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieHBaseIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieDependentSystemUnavailableException; import org.apache.hudi.exception.HoodieDependentSystemUnavailableException;
import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.exception.HoodieIndexException;
@@ -39,6 +39,7 @@ import org.apache.hudi.table.HoodieTable;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -89,7 +90,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
private int maxQpsPerRegionServer; private int maxQpsPerRegionServer;
/** /**
* multiPutBatchSize will be computed and re-set in updateLocation if * multiPutBatchSize will be computed and re-set in updateLocation if
* {@link HoodieIndexConfig.HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP} is set to true. * {@link HoodieHBaseIndexConfig#HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP} is set to true.
*/ */
private Integer multiPutBatchSize; private Integer multiPutBatchSize;
private Integer numRegionServersForTable; private Integer numRegionServersForTable;
@@ -115,9 +116,8 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
public HBaseIndexQPSResourceAllocator createQPSResourceAllocator(HoodieWriteConfig config) { public HBaseIndexQPSResourceAllocator createQPSResourceAllocator(HoodieWriteConfig config) {
try { try {
LOG.info("createQPSResourceAllocator : {}", config.getHBaseQPSResourceAllocatorClass()); LOG.info("createQPSResourceAllocator : {}", config.getHBaseQPSResourceAllocatorClass());
final HBaseIndexQPSResourceAllocator resourceAllocator = (HBaseIndexQPSResourceAllocator) ReflectionUtils return (HBaseIndexQPSResourceAllocator) ReflectionUtils
.loadClass(config.getHBaseQPSResourceAllocatorClass(), config); .loadClass(config.getHBaseQPSResourceAllocatorClass(), config);
return resourceAllocator;
} catch (Exception e) { } catch (Exception e) {
LOG.warn("error while instantiating HBaseIndexQPSResourceAllocator", e); LOG.warn("error while instantiating HBaseIndexQPSResourceAllocator", e);
} }
@@ -149,20 +149,17 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
} }
/** /**
* Since we are sharing the HbaseConnection across tasks in a JVM, make sure the HbaseConnectio is closed when JVM * Since we are sharing the HBaseConnection across tasks in a JVM, make sure the HBaseConnection is closed when JVM
* exits. * exits.
*/ */
private void addShutDownHook() { private void addShutDownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() { Runtime.getRuntime().addShutdownHook(new Thread(() -> {
@Override
public void run() {
try { try {
hbaseConnection.close(); hbaseConnection.close();
} catch (Exception e) { } catch (Exception e) {
// fail silently for any sort of exception // fail silently for any sort of exception
} }
} }));
});
} }
/** /**
@@ -197,7 +194,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
return (Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>>) (partitionNum, return (Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>>) (partitionNum,
hoodieRecordIterator) -> { hoodieRecordIterator) -> {
Integer multiGetBatchSize = config.getHbaseIndexGetBatchSize(); int multiGetBatchSize = config.getHbaseIndexGetBatchSize();
// Grab the global HBase connection // Grab the global HBase connection
synchronized (HBaseIndex.class) { synchronized (HBaseIndex.class) {
@@ -485,7 +482,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
try (Connection conn = getHBaseConnection()) { try (Connection conn = getHBaseConnection()) {
RegionLocator regionLocator = conn.getRegionLocator(TableName.valueOf(tableName)); RegionLocator regionLocator = conn.getRegionLocator(TableName.valueOf(tableName));
numRegionServersForTable = Math numRegionServersForTable = Math
.toIntExact(regionLocator.getAllRegionLocations().stream().map(e -> e.getServerName()).distinct().count()); .toIntExact(regionLocator.getAllRegionLocations().stream().map(HRegionLocation::getServerName).distinct().count());
return numRegionServersForTable; return numRegionServersForTable;
} catch (IOException e) { } catch (IOException e) {
LOG.error("Error while connecting HBase:", e); LOG.error("Error while connecting HBase:", e);

View File

@@ -207,7 +207,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
recordList.clear(); recordList.clear();
} }
if (keysToDelete.size() > 0) { if (keysToDelete.size() > 0) {
writer = writer.appendBlock(new HoodieDeleteBlock(keysToDelete.stream().toArray(HoodieKey[]::new), header)); writer = writer.appendBlock(new HoodieDeleteBlock(keysToDelete.toArray(new HoodieKey[keysToDelete.size()]), header));
keysToDelete.clear(); keysToDelete.clear();
} }
} catch (Exception e) { } catch (Exception e) {

View File

@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieDataFile; import org.apache.hudi.common.model.HoodieDataFile;
import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.HoodieTimeline;
@@ -101,14 +102,11 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> implements Seri
&& (cleanMetadata.getEarliestCommitToRetain().length() > 0)) { && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed " LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
+ "since last cleaned at {}. New Instant to retain : {}", cleanMetadata.getEarliestCommitToRetain(), newInstantToRetain); + "since last cleaned at {}. New Instant to retain : {}", cleanMetadata.getEarliestCommitToRetain(), newInstantToRetain);
return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> { return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
return HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER); newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER)).flatMap(instant -> {
}).flatMap(instant -> {
try { try {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
return commitMetadata.getPartitionToWriteStats().keySet().stream(); return commitMetadata.getPartitionToWriteStats().keySet().stream();
} catch (IOException e) { } catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e); throw new HoodieIOException(e.getMessage(), e);
@@ -127,7 +125,7 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> implements Seri
* policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a * policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a
* single file (i.e run it with versionsRetained = 1) * single file (i.e run it with versionsRetained = 1)
*/ */
private List<String> getFilesToCleanKeepingLatestVersions(String partitionPath) throws IOException { private List<String> getFilesToCleanKeepingLatestVersions(String partitionPath) {
LOG.info("Cleaning {}, retaining latest {} file versions. ", partitionPath, config.getCleanerFileVersionsRetained()); LOG.info("Cleaning {}, retaining latest {} file versions. ", partitionPath, config.getCleanerFileVersionsRetained());
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList()); List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
List<String> deletePaths = new ArrayList<>(); List<String> deletePaths = new ArrayList<>();
@@ -164,7 +162,7 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> implements Seri
} }
if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well // If merge on read, then clean the log files for the commits as well
deletePaths.addAll(nextSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList())); deletePaths.addAll(nextSlice.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList()));
} }
} }
} }
@@ -185,7 +183,7 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> implements Seri
* <p> * <p>
* This policy is the default. * This policy is the default.
*/ */
private List<String> getFilesToCleanKeepingLatestCommits(String partitionPath) throws IOException { private List<String> getFilesToCleanKeepingLatestCommits(String partitionPath) {
int commitsRetained = config.getCleanerCommitsRetained(); int commitsRetained = config.getCleanerCommitsRetained();
LOG.info("Cleaning {}, retaining latest {} commits. ", partitionPath, commitsRetained); LOG.info("Cleaning {}, retaining latest {} commits. ", partitionPath, commitsRetained);
List<String> deletePaths = new ArrayList<>(); List<String> deletePaths = new ArrayList<>();
@@ -235,7 +233,7 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> implements Seri
aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getFileName())); aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getFileName()));
if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well // If merge on read, then clean the log files for the commits as well
deletePaths.addAll(aSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList())); deletePaths.addAll(aSlice.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList()));
} }
} }
} }
@@ -264,7 +262,7 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> implements Seri
/** /**
* Returns files to be cleaned for the given partitionPath based on cleaning policy. * Returns files to be cleaned for the given partitionPath based on cleaning policy.
*/ */
public List<String> getDeletePaths(String partitionPath) throws IOException { public List<String> getDeletePaths(String partitionPath) {
HoodieCleaningPolicy policy = config.getCleanerPolicy(); HoodieCleaningPolicy policy = config.getCleanerPolicy();
List<String> deletePaths; List<String> deletePaths;
if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) { if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {

View File

@@ -60,6 +60,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -152,7 +153,7 @@ public class HoodieCommitArchiveLog {
} else { } else {
return new ArrayList<HoodieInstant>(); return new ArrayList<HoodieInstant>();
} }
}).flatMap(i -> i.stream()); }).flatMap(Collection::stream);
// TODO (na) : Add a way to return actions associated with a timeline and then merge/unify // TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
// with logic above to avoid Stream.concats // with logic above to avoid Stream.concats
@@ -171,9 +172,7 @@ public class HoodieCommitArchiveLog {
s.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL)); s.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL));
}).filter(s -> { }).filter(s -> {
// Ensure commits >= oldest pending compaction commit is retained // Ensure commits >= oldest pending compaction commit is retained
return oldestPendingCompactionInstant.map(instant -> { return oldestPendingCompactionInstant.map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), s.getTimestamp(), HoodieTimeline.GREATER)).orElse(true);
return HoodieTimeline.compareTimestamps(instant.getTimestamp(), s.getTimestamp(), HoodieTimeline.GREATER);
}).orElse(true);
}).limit(commitTimeline.countInstants() - minCommitsToKeep)); }).limit(commitTimeline.countInstants() - minCommitsToKeep));
} }
@@ -204,10 +203,8 @@ public class HoodieCommitArchiveLog {
} }
// Remove older meta-data from auxiliary path too // Remove older meta-data from auxiliary path too
Option<HoodieInstant> latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> { Option<HoodieInstant> latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION)
return i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION) || (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp)));
|| (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)));
}).max(Comparator.comparing(HoodieInstant::getTimestamp)));
LOG.info("Latest Committed Instant={}", latestCommitted); LOG.info("Latest Committed Instant={}", latestCommitted);
if (latestCommitted.isPresent()) { if (latestCommitted.isPresent()) {
success &= deleteAllInstantsOlderorEqualsInAuxMetaFolder(latestCommitted.get()); success &= deleteAllInstantsOlderorEqualsInAuxMetaFolder(latestCommitted.get());

View File

@@ -50,13 +50,11 @@ public class BoundedPartitionAwareCompactionStrategy extends DayBasedCompactionS
String earliestPartitionPathToCompact = String earliestPartitionPathToCompact =
dateFormat.format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction())); dateFormat.format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction()));
// Filter out all partitions greater than earliestPartitionPathToCompact // Filter out all partitions greater than earliestPartitionPathToCompact
List<HoodieCompactionOperation> eligibleCompactionOperations =
operations.stream().collect(Collectors.groupingBy(HoodieCompactionOperation::getPartitionPath)).entrySet() return operations.stream().collect(Collectors.groupingBy(HoodieCompactionOperation::getPartitionPath)).entrySet()
.stream().sorted(Map.Entry.comparingByKey(comparator)) .stream().sorted(Map.Entry.comparingByKey(comparator))
.filter(e -> comparator.compare(earliestPartitionPathToCompact, e.getKey()) >= 0) .filter(e -> comparator.compare(earliestPartitionPathToCompact, e.getKey()) >= 0)
.flatMap(e -> e.getValue().stream()).collect(Collectors.toList()); .flatMap(e -> e.getValue().stream()).collect(Collectors.toList());
return eligibleCompactionOperations;
} }
@Override @Override
@@ -65,10 +63,9 @@ public class BoundedPartitionAwareCompactionStrategy extends DayBasedCompactionS
String earliestPartitionPathToCompact = String earliestPartitionPathToCompact =
dateFormat.format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction())); dateFormat.format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction()));
// Get all partitions and sort them // Get all partitions and sort them
List<String> filteredPartitionPaths = partitionPaths.stream().map(partition -> partition.replace("/", "-")) return partitionPaths.stream().map(partition -> partition.replace("/", "-"))
.sorted(Comparator.reverseOrder()).map(partitionPath -> partitionPath.replace("-", "/")) .sorted(Comparator.reverseOrder()).map(partitionPath -> partitionPath.replace("-", "/"))
.filter(e -> comparator.compare(earliestPartitionPathToCompact, e) >= 0).collect(Collectors.toList()); .filter(e -> comparator.compare(earliestPartitionPathToCompact, e) >= 0).collect(Collectors.toList());
return filteredPartitionPaths;
} }
@VisibleForTesting @VisibleForTesting

View File

@@ -65,19 +65,17 @@ public class DayBasedCompactionStrategy extends CompactionStrategy {
List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) { List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
// Iterate through the operations and accept operations as long as we are within the configured target partitions // Iterate through the operations and accept operations as long as we are within the configured target partitions
// limit // limit
List<HoodieCompactionOperation> filteredList = operations.stream() return operations.stream()
.collect(Collectors.groupingBy(HoodieCompactionOperation::getPartitionPath)).entrySet().stream() .collect(Collectors.groupingBy(HoodieCompactionOperation::getPartitionPath)).entrySet().stream()
.sorted(Map.Entry.comparingByKey(comparator)).limit(writeConfig.getTargetPartitionsPerDayBasedCompaction()) .sorted(Map.Entry.comparingByKey(comparator)).limit(writeConfig.getTargetPartitionsPerDayBasedCompaction())
.flatMap(e -> e.getValue().stream()).collect(Collectors.toList()); .flatMap(e -> e.getValue().stream()).collect(Collectors.toList());
return filteredList;
} }
@Override @Override
public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) { public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) {
List<String> filteredPartitionPaths = allPartitionPaths.stream().map(partition -> partition.replace("/", "-")) return allPartitionPaths.stream().map(partition -> partition.replace("/", "-"))
.sorted(Comparator.reverseOrder()).map(partitionPath -> partitionPath.replace("-", "/")) .sorted(Comparator.reverseOrder()).map(partitionPath -> partitionPath.replace("-", "/"))
.collect(Collectors.toList()).subList(0, writeConfig.getTargetPartitionsPerDayBasedCompaction()); .collect(Collectors.toList()).subList(0, writeConfig.getTargetPartitionsPerDayBasedCompaction());
return filteredPartitionPaths;
} }
/** /**

View File

@@ -49,7 +49,7 @@ public class LogFileSizeBasedCompactionStrategy extends BoundedIOCompactionStrat
// Total size of all the log files // Total size of all the log files
Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize).filter(size -> size >= 0) Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize).filter(size -> size >= 0)
.reduce((size1, size2) -> size1 + size2).orElse(0L); .reduce(Long::sum).orElse(0L);
// save the metrics needed during the order // save the metrics needed during the order
metrics.put(TOTAL_LOG_FILE_SIZE, totalLogFileSize.doubleValue()); metrics.put(TOTAL_LOG_FILE_SIZE, totalLogFileSize.doubleValue());
return metrics; return metrics;

View File

@@ -41,8 +41,8 @@ public class HoodieMetrics {
public String finalizeTimerName = null; public String finalizeTimerName = null;
public String compactionTimerName = null; public String compactionTimerName = null;
public String indexTimerName = null; public String indexTimerName = null;
private HoodieWriteConfig config = null; private HoodieWriteConfig config;
private String tableName = null; private String tableName;
private Timer rollbackTimer = null; private Timer rollbackTimer = null;
private Timer cleanTimer = null; private Timer cleanTimer = null;
private Timer commitTimer = null; private Timer commitTimer = null;

View File

@@ -40,14 +40,12 @@ public class JmxMetricsReporter extends MetricsReporter {
private static final Logger LOG = LoggerFactory.getLogger(JmxMetricsReporter.class); private static final Logger LOG = LoggerFactory.getLogger(JmxMetricsReporter.class);
private final JMXConnectorServer connector; private final JMXConnectorServer connector;
private String host;
private int port;
public JmxMetricsReporter(HoodieWriteConfig config) { public JmxMetricsReporter(HoodieWriteConfig config) {
try { try {
// Check the host and port here // Check the host and port here
this.host = config.getJmxHost(); String host = config.getJmxHost();
this.port = config.getJmxPort(); int port = config.getJmxPort();
if (host == null || port == 0) { if (host == null || port == 0) {
throw new RuntimeException( throw new RuntimeException(
String.format("Jmx cannot be initialized with host[%s] and port[%s].", String.format("Jmx cannot be initialized with host[%s] and port[%s].",

View File

@@ -315,14 +315,14 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
@Override @Override
public List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) { public List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) {
int cleanerParallelism = Math.min( int cleanerParallelism = Math.min(
(int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(x -> x.size()).count()), (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
config.getCleanerParallelism()); config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: {}", cleanerParallelism); LOG.info("Using cleanerParallelism: {}", cleanerParallelism);
List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
.parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream() .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream()
.flatMap(x -> x.getValue().stream().map(y -> new Tuple2<String, String>(x.getKey(), y))) .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), y)))
.collect(Collectors.toList()), cleanerParallelism) .collect(Collectors.toList()), cleanerParallelism)
.mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey((e1, e2) -> e1.merge(e2)).collect(); .mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey(PartitionCleanStat::merge).collect();
Map<String, PartitionCleanStat> partitionCleanStatsMap = Map<String, PartitionCleanStat> partitionCleanStatsMap =
partitionCleanStats.stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2)); partitionCleanStats.stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
@@ -348,9 +348,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
@Override @Override
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, HoodieInstant instant, boolean deleteInstants) public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, HoodieInstant instant, boolean deleteInstants)
throws IOException { throws IOException {
Long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
List<HoodieRollbackStat> stats = new ArrayList<>(); List<HoodieRollbackStat> stats = new ArrayList<>();
String actionType = metaClient.getCommitActionType();
HoodieActiveTimeline activeTimeline = this.getActiveTimeline(); HoodieActiveTimeline activeTimeline = this.getActiveTimeline();
if (instant.isCompleted()) { if (instant.isCompleted()) {
@@ -379,9 +378,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
private List<RollbackRequest> generateRollbackRequests(HoodieInstant instantToRollback) private List<RollbackRequest> generateRollbackRequests(HoodieInstant instantToRollback)
throws IOException { throws IOException {
return FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(), return FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning()).stream().map(partitionPath -> { config.shouldAssumeDatePartitioning()).stream().map(partitionPath -> RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback))
return RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback); .collect(Collectors.toList());
}).collect(Collectors.toList());
} }
@@ -541,7 +539,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
/** /**
* List of all small files to be corrected. * List of all small files to be corrected.
*/ */
List<SmallFile> smallFiles = new ArrayList<SmallFile>(); List<SmallFile> smallFiles = new ArrayList<>();
/** /**
* Total number of RDD partitions, is determined by total buckets we want to pack the incoming workload into. * Total number of RDD partitions, is determined by total buckets we want to pack the incoming workload into.
*/ */
@@ -567,7 +565,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
* Rolling stats for files. * Rolling stats for files.
*/ */
protected HoodieRollingStatMetadata rollingStatMetadata; protected HoodieRollingStatMetadata rollingStatMetadata;
protected long averageRecordSize;
UpsertPartitioner(WorkloadProfile profile) { UpsertPartitioner(WorkloadProfile profile) {
updateLocationToBucket = new HashMap<>(); updateLocationToBucket = new HashMap<>();

View File

@@ -167,7 +167,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
@Override @Override
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, HoodieInstant instant, public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, HoodieInstant instant,
boolean deleteInstants) throws IOException { boolean deleteInstants) throws IOException {
Long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
String commit = instant.getTimestamp(); String commit = instant.getTimestamp();
LOG.error("Rolling back instant {}", instant); LOG.error("Rolling back instant {}", instant);
@@ -345,13 +345,8 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
// by different spark partitions in a single batch // by different spark partitions in a single batch
Option<FileSlice> smallFileSlice = Option.fromJavaOptional(getRTFileSystemView() Option<FileSlice> smallFileSlice = Option.fromJavaOptional(getRTFileSystemView()
.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false) .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
.filter(fileSlice -> fileSlice.getLogFiles().count() < 1 .filter(fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getDataFile().get().getFileSize() < config.getParquetSmallFileLimit())
&& fileSlice.getDataFile().get().getFileSize() < config.getParquetSmallFileLimit()) .min((FileSlice left, FileSlice right) -> left.getDataFile().get().getFileSize() < right.getDataFile().get().getFileSize() ? -1 : 1));
.sorted((FileSlice left,
FileSlice right) -> left.getDataFile().get().getFileSize() < right.getDataFile().get().getFileSize()
? -1
: 1)
.findFirst());
if (smallFileSlice.isPresent()) { if (smallFileSlice.isPresent()) {
allSmallFileSlices.add(smallFileSlice.get()); allSmallFileSlices.add(smallFileSlice.get());
} }
@@ -362,7 +357,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
getRTFileSystemView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true) getRTFileSystemView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
.collect(Collectors.toList()); .collect(Collectors.toList());
for (FileSlice fileSlice : allFileSlices) { for (FileSlice fileSlice : allFileSlices) {
if (isSmallFile(partitionPath, fileSlice)) { if (isSmallFile(fileSlice)) {
allSmallFileSlices.add(fileSlice); allSmallFileSlices.add(fileSlice);
} }
} }
@@ -374,7 +369,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
// TODO : Move logic of file name, file id, base commit time handling inside file slice // TODO : Move logic of file name, file id, base commit time handling inside file slice
String filename = smallFileSlice.getDataFile().get().getFileName(); String filename = smallFileSlice.getDataFile().get().getFileName();
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
sf.sizeBytes = getTotalFileSize(partitionPath, smallFileSlice); sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf); smallFileLocations.add(sf);
// Update the global small files list // Update the global small files list
smallFiles.add(sf); smallFiles.add(sf);
@@ -382,7 +377,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get(); HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
FSUtils.getFileIdFromLogPath(logFile.getPath())); FSUtils.getFileIdFromLogPath(logFile.getPath()));
sf.sizeBytes = getTotalFileSize(partitionPath, smallFileSlice); sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf); smallFileLocations.add(sf);
// Update the global small files list // Update the global small files list
smallFiles.add(sf); smallFiles.add(sf);
@@ -397,7 +392,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
private long getTotalFileSize(String partitionPath, FileSlice fileSlice) { private long getTotalFileSize(FileSlice fileSlice) {
if (!fileSlice.getDataFile().isPresent()) { if (!fileSlice.getDataFile().isPresent()) {
return convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList())); return convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
} else { } else {
@@ -406,22 +401,20 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
} }
} }
private boolean isSmallFile(String partitionPath, FileSlice fileSlice) { private boolean isSmallFile(FileSlice fileSlice) {
long totalSize = getTotalFileSize(partitionPath, fileSlice); long totalSize = getTotalFileSize(fileSlice);
return totalSize < config.getParquetMaxFileSize(); return totalSize < config.getParquetMaxFileSize();
} }
// TODO (NA) : Make this static part of utility // TODO (NA) : Make this static part of utility
@VisibleForTesting @VisibleForTesting
public long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles) { public long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles) {
long totalSizeOfLogFiles = hoodieLogFiles.stream().map(hoodieLogFile -> hoodieLogFile.getFileSize()) long totalSizeOfLogFiles = hoodieLogFiles.stream().map(HoodieLogFile::getFileSize)
.filter(size -> size > 0).reduce((a, b) -> (a + b)).orElse(0L); .filter(size -> size > 0).reduce(Long::sum).orElse(0L);
// Here we assume that if there is no base parquet file, all log files contain only inserts. // Here we assume that if there is no base parquet file, all log files contain only inserts.
// We can then just get the parquet equivalent size of these log files, compare that with // We can then just get the parquet equivalent size of these log files, compare that with
// {@link config.getParquetMaxFileSize()} and decide if there is scope to insert more rows // {@link config.getParquetMaxFileSize()} and decide if there is scope to insert more rows
long logFilesEquivalentParquetFileSize = return (long) (totalSizeOfLogFiles * config.getLogFileToParquetCompressionRatio());
(long) (totalSizeOfLogFiles * config.getLogFileToParquetCompressionRatio());
return logFilesEquivalentParquetFileSize;
} }
} }
@@ -439,7 +432,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> { return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> {
// Filter out stats without prevCommit since they are all inserts // Filter out stats without prevCommit since they are all inserts
boolean validForRollback = (wStat != null) && (wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT) boolean validForRollback = (wStat != null) && (!wStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT))
&& (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId()); && (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId());
if (validForRollback) { if (validForRollback) {

View File

@@ -223,7 +223,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
"Could not get data files for savepoint " + savepointTime + ". No such savepoint."); "Could not get data files for savepoint " + savepointTime + ". No such savepoint.");
} }
HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime); HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
HoodieSavepointMetadata metadata = null; HoodieSavepointMetadata metadata;
try { try {
metadata = AvroUtils.deserializeHoodieSavepointMetadata(getActiveTimeline().getInstantDetails(instant).get()); metadata = AvroUtils.deserializeHoodieSavepointMetadata(getActiveTimeline().getInstantDetails(instant).get());
} catch (IOException e) { } catch (IOException e) {

View File

@@ -90,19 +90,18 @@ public class RollbackExecutor implements Serializable {
case DELETE_DATA_FILES_ONLY: { case DELETE_DATA_FILES_ONLY: {
deleteCleanedFiles(metaClient, config, filesToDeletedStatus, instantToRollback.getTimestamp(), deleteCleanedFiles(metaClient, config, filesToDeletedStatus, instantToRollback.getTimestamp(),
rollbackRequest.getPartitionPath()); rollbackRequest.getPartitionPath());
return new Tuple2<String, HoodieRollbackStat>(rollbackRequest.getPartitionPath(), return new Tuple2<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withDeletedFileResults(filesToDeletedStatus).build()); .withDeletedFileResults(filesToDeletedStatus).build());
} }
case DELETE_DATA_AND_LOG_FILES: { case DELETE_DATA_AND_LOG_FILES: {
deleteCleanedFiles(metaClient, config, filesToDeletedStatus, rollbackRequest.getPartitionPath(), filter); deleteCleanedFiles(metaClient, config, filesToDeletedStatus, rollbackRequest.getPartitionPath(), filter);
return new Tuple2<String, HoodieRollbackStat>(rollbackRequest.getPartitionPath(), return new Tuple2<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withDeletedFileResults(filesToDeletedStatus).build()); .withDeletedFileResults(filesToDeletedStatus).build());
} }
case APPEND_ROLLBACK_BLOCK: { case APPEND_ROLLBACK_BLOCK: {
Writer writer = null; Writer writer = null;
boolean success = false;
try { try {
writer = HoodieLogFormat.newWriterBuilder() writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath())) .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
@@ -114,7 +113,6 @@ public class RollbackExecutor implements Serializable {
Map<HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp()); Map<HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
// if update belongs to an existing log file // if update belongs to an existing log file
writer = writer.appendBlock(new HoodieCommandBlock(header)); writer = writer.appendBlock(new HoodieCommandBlock(header));
success = true;
} catch (IOException | InterruptedException io) { } catch (IOException | InterruptedException io) {
throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io); throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
} finally { } finally {
@@ -131,8 +129,8 @@ public class RollbackExecutor implements Serializable {
// getFileStatus would reflect correct stats and FileNotFoundException is not thrown in // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
// cloud-storage : HUDI-168 // cloud-storage : HUDI-168
Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>(); Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();
filesToNumBlocksRollback.put(metaClient.getFs().getFileStatus(writer.getLogFile().getPath()), 1L); filesToNumBlocksRollback.put(metaClient.getFs().getFileStatus(Preconditions.checkNotNull(writer).getLogFile().getPath()), 1L);
return new Tuple2<String, HoodieRollbackStat>(rollbackRequest.getPartitionPath(), return new Tuple2<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withRollbackBlockAppendResults(filesToNumBlocksRollback).build()); .withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
} }

View File

@@ -72,7 +72,6 @@ import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.function.Predicate; import java.util.function.Predicate;
@@ -436,8 +435,7 @@ public class TestCleaner extends TestHoodieClientBase {
final HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); final HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieCleanMetadata cleanMetadata2 = writeClient.runClean(table, HoodieCleanMetadata cleanMetadata2 = writeClient.runClean(table,
HoodieTimeline.getCleanInflightInstant(cleanInstantTs)); HoodieTimeline.getCleanInflightInstant(cleanInstantTs));
Assert.assertTrue( Assert.assertEquals(cleanMetadata1.getEarliestCommitToRetain(), cleanMetadata2.getEarliestCommitToRetain());
Objects.equals(cleanMetadata1.getEarliestCommitToRetain(), cleanMetadata2.getEarliestCommitToRetain()));
Assert.assertEquals(new Integer(0), cleanMetadata2.getTotalFilesDeleted()); Assert.assertEquals(new Integer(0), cleanMetadata2.getTotalFilesDeleted());
Assert.assertEquals(cleanMetadata1.getPartitionMetadata().keySet(), Assert.assertEquals(cleanMetadata1.getPartitionMetadata().keySet(),
cleanMetadata2.getPartitionMetadata().keySet()); cleanMetadata2.getPartitionMetadata().keySet());

View File

@@ -49,6 +49,8 @@ import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.apache.hudi.CompactionAdminClient.getRenamingActionsToAlignWithCompactionOperation;
import static org.apache.hudi.CompactionAdminClient.renameLogFile;
import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ; import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
public class TestCompactionAdminClient extends TestHoodieClientBase { public class TestCompactionAdminClient extends TestHoodieClientBase {
@@ -139,10 +141,10 @@ public class TestCompactionAdminClient extends TestHoodieClientBase {
} }
// Now repair // Now repair
List<Pair<HoodieLogFile, HoodieLogFile>> undoFiles = List<Pair<HoodieLogFile, HoodieLogFile>> undoFiles =
result.stream().flatMap(r -> client.getRenamingActionsToAlignWithCompactionOperation(metaClient, result.stream().flatMap(r -> getRenamingActionsToAlignWithCompactionOperation(metaClient,
compactionInstant, r.getOperation(), Option.empty()).stream()).map(rn -> { compactionInstant, r.getOperation(), Option.empty()).stream()).map(rn -> {
try { try {
client.renameLogFile(metaClient, rn.getKey(), rn.getValue()); renameLogFile(metaClient, rn.getKey(), rn.getValue());
} catch (IOException e) { } catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e); throw new HoodieIOException(e.getMessage(), e);
} }
@@ -248,7 +250,7 @@ public class TestCompactionAdminClient extends TestHoodieClientBase {
// Do the renaming only but do not touch the compaction plan - Needed for repair tests // Do the renaming only but do not touch the compaction plan - Needed for repair tests
renameFiles.forEach(lfPair -> { renameFiles.forEach(lfPair -> {
try { try {
client.renameLogFile(metaClient, lfPair.getLeft(), lfPair.getRight()); renameLogFile(metaClient, lfPair.getLeft(), lfPair.getRight());
} catch (IOException e) { } catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e); throw new HoodieIOException(e.getMessage(), e);
} }

View File

@@ -62,11 +62,11 @@ public class TestBucketizedBloomCheckPartitioner {
}; };
BucketizedBloomCheckPartitioner partitioner = new BucketizedBloomCheckPartitioner(100, comparisons1, 10); BucketizedBloomCheckPartitioner partitioner = new BucketizedBloomCheckPartitioner(100, comparisons1, 10);
Map<String, List<Integer>> assignments = partitioner.getFileGroupToPartitions(); Map<String, List<Integer>> assignments = partitioner.getFileGroupToPartitions();
assignments.entrySet().stream().forEach(e -> assertEquals(10, e.getValue().size())); assignments.forEach((key, value) -> assertEquals(10, value.size()));
Map<Integer, Long> partitionToNumBuckets = Map<Integer, Long> partitionToNumBuckets =
assignments.entrySet().stream().flatMap(e -> e.getValue().stream().map(p -> Pair.of(p, e.getKey()))) assignments.entrySet().stream().flatMap(e -> e.getValue().stream().map(p -> Pair.of(p, e.getKey())))
.collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting())); .collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting()));
partitionToNumBuckets.entrySet().stream().forEach(e -> assertEquals(1L, e.getValue().longValue())); partitionToNumBuckets.forEach((key, value) -> assertEquals(1L, value.longValue()));
} }
@Test @Test