diff --git a/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java b/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java index 6927f7df7..00e0f751e 100644 --- a/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java @@ -219,10 +219,9 @@ public class CompactionAdminClient extends AbstractHoodieClient { */ private static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, String compactionInstant) throws IOException { - HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan( - metaClient.getActiveTimeline().readPlanAsBytes( - HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get()); - return compactionPlan; + return AvroUtils.deserializeCompactionPlan( + metaClient.getActiveTimeline().readPlanAsBytes( + HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get()); } /** diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java index 697dac45e..09e3f58be 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java @@ -77,7 +77,7 @@ import org.apache.spark.storage.StorageLevel; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.text.ParseException; -import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -347,7 +347,7 @@ public class HoodieWriteClient extends AbstractHo // perform index loop up to get existing location of records JavaRDD> taggedRecords = index.tagLocation(dedupedRecords, jsc, table); // filter out non existant keys/records - JavaRDD> taggedValidRecords = taggedRecords.filter(record -> record.isCurrentLocationKnown()); + JavaRDD> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown); if (!taggedValidRecords.isEmpty()) { metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop())); indexTimer = null; @@ -392,7 +392,7 @@ public class HoodieWriteClient extends AbstractHo JavaRDD writeStatusRDD = repartitionedRecords .mapPartitionsWithIndex(new BulkInsertMapFunction(commitTime, config, table, fileIDPrefixes), true) - .flatMap(writeStatuses -> writeStatuses.iterator()); + .flatMap(List::iterator); return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime); } @@ -424,14 +424,14 @@ public class HoodieWriteClient extends AbstractHo throws HoodieCommitException { try { HoodieCommitMetadata metadata = new HoodieCommitMetadata(); - profile.getPartitionPaths().stream().forEach(path -> { + profile.getPartitionPaths().forEach(path -> { WorkloadStat partitionStat = profile.getWorkloadStat(path.toString()); - partitionStat.getUpdateLocationToCount().entrySet().stream().forEach(entry -> { + partitionStat.getUpdateLocationToCount().forEach((key, value) -> { HoodieWriteStat writeStat = new HoodieWriteStat(); - writeStat.setFileId(entry.getKey()); + writeStat.setFileId(key); // TODO : Write baseCommitTime is possible here ? - writeStat.setPrevCommit(entry.getValue().getKey()); - writeStat.setNumUpdateWrites(entry.getValue().getValue()); + writeStat.setPrevCommit(value.getKey()); + writeStat.setNumUpdateWrites(value.getValue()); metadata.addWriteStat(path.toString(), writeStat); }); }); @@ -804,7 +804,7 @@ public class HoodieWriteClient extends AbstractHo ImmutableMap.Builder> instantsToStats = ImmutableMap.builder(); table.getActiveTimeline().createNewInstant( new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRollbackInstant)); - instantsToRollback.stream().forEach(instant -> { + instantsToRollback.forEach(instant -> { try { switch (instant.getAction()) { case HoodieTimeline.COMMIT_ACTION: @@ -850,7 +850,7 @@ public class HoodieWriteClient extends AbstractHo // Check if any of the commits is a savepoint - do not allow rollback on those commits List savepoints = table.getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); - savepoints.stream().forEach(s -> { + savepoints.forEach(s -> { if (s.contains(commitToRollback)) { throw new HoodieRollbackException( "Could not rollback a savepointed commit. Delete savepoint first before rolling back" + s); @@ -864,19 +864,18 @@ public class HoodieWriteClient extends AbstractHo // Make sure only the last n commits are being rolled back // If there is a commit in-between or after that is not rolled back, then abort - String lastCommit = commitToRollback; - if ((lastCommit != null) && !commitTimeline.empty() - && !commitTimeline.findInstantsAfter(lastCommit, Integer.MAX_VALUE).empty()) { + if ((commitToRollback != null) && !commitTimeline.empty() + && !commitTimeline.findInstantsAfter(commitToRollback, Integer.MAX_VALUE).empty()) { throw new HoodieRollbackException( - "Found commits after time :" + lastCommit + ", please rollback greater commits first"); + "Found commits after time :" + commitToRollback + ", please rollback greater commits first"); } List inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); - if ((lastCommit != null) && !inflights.isEmpty() && (inflights.indexOf(lastCommit) != inflights.size() - 1)) { + if ((commitToRollback != null) && !inflights.isEmpty() && (inflights.indexOf(commitToRollback) != inflights.size() - 1)) { throw new HoodieRollbackException( - "Found in-flight commits after time :" + lastCommit + ", please rollback greater commits first"); + "Found in-flight commits after time :" + commitToRollback + ", please rollback greater commits first"); } List stats = table.rollback(jsc, instantToRollback, true); @@ -895,7 +894,7 @@ public class HoodieWriteClient extends AbstractHo List commitsToRollback, final String startRollbackTime) throws IOException { HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); Option durationInMs = Option.empty(); - Long numFilesDeleted = rollbackStats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum(); + long numFilesDeleted = rollbackStats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum(); if (context != null) { durationInMs = Option.of(metrics.getDurationInMs(context.stop())); metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted); @@ -923,7 +922,7 @@ public class HoodieWriteClient extends AbstractHo List commitsToRollback, final String startRestoreTime, final String restoreToInstant) throws IOException { HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); Option durationInMs = Option.empty(); - Long numFilesDeleted = 0L; + long numFilesDeleted = 0L; for (Map.Entry> commitToStat : commitToStats.entrySet()) { List stats = commitToStat.getValue(); numFilesDeleted = stats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum(); @@ -962,7 +961,7 @@ public class HoodieWriteClient extends AbstractHo if (rollbackInstantOpt.isPresent()) { List stats = doRollbackAndGetStats(rollbackInstantOpt.get()); - finishRollback(context, stats, Arrays.asList(commitToRollback), startRollbackTime); + finishRollback(context, stats, Collections.singletonList(commitToRollback), startRollbackTime); } } catch (IOException e) { throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitToRollback, @@ -1124,7 +1123,7 @@ public class HoodieWriteClient extends AbstractHo } /** - * Deduplicate Hoodie records, using the given deduplication funciton. + * Deduplicate Hoodie records, using the given deduplication function. */ JavaRDD> deduplicateRecords(JavaRDD> records, int parallelism) { boolean isIndexingGlobal = index.isGlobal(); @@ -1144,7 +1143,7 @@ public class HoodieWriteClient extends AbstractHo } /** - * Deduplicate Hoodie records, using the given deduplication funciton. + * Deduplicate Hoodie records, using the given deduplication function. */ JavaRDD deduplicateKeys(JavaRDD keys, int parallelism) { boolean isIndexingGlobal = index.isGlobal(); @@ -1342,9 +1341,7 @@ public class HoodieWriteClient extends AbstractHo // Copy extraMetadata extraMetadata.ifPresent(m -> { - m.entrySet().stream().forEach(e -> { - metadata.addMetadata(e.getKey(), e.getValue()); - }); + m.forEach(metadata::addMetadata); }); LOG.info("Committing Compaction {}. Finished with result {}", compactionCommitTime, metadata); diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java index 73dfabd55..714aca409 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java @@ -83,10 +83,9 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig { * Min and Max for HBASE_QPS_FRACTION_PROP to stabilize skewed volume workloads. */ public static final String HBASE_MIN_QPS_FRACTION_PROP = "hoodie.index.hbase.min.qps.fraction"; - public static final String DEFAULT_HBASE_MIN_QPS_FRACTION_PROP = "0.002"; public static final String HBASE_MAX_QPS_FRACTION_PROP = "hoodie.index.hbase.max.qps.fraction"; - public static final String DEFAULT_HBASE_MAX_QPS_FRACTION_PROP = "0.06"; + /** * Hoodie index desired puts operation time in seconds. */ @@ -115,12 +114,9 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig { private final Properties props = new Properties(); public HoodieHBaseIndexConfig.Builder fromFile(File propertiesFile) throws IOException { - FileReader reader = new FileReader(propertiesFile); - try { + try (FileReader reader = new FileReader(propertiesFile)) { this.props.load(reader); return this; - } finally { - reader.close(); } } @@ -194,6 +190,11 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig { return this; } + public Builder hbaseIndexSleepMsBetweenGetBatch(int sleepMsBetweenGetBatch) { + props.setProperty(HBASE_SLEEP_MS_GET_BATCH_PROP, String.valueOf(sleepMsBetweenGetBatch)); + return this; + } + public Builder withQPSResourceAllocatorType(String qpsResourceAllocatorClass) { props.setProperty(HBASE_INDEX_QPS_ALLOCATOR_CLASS, qpsResourceAllocatorClass); return this; @@ -217,7 +218,7 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig { /** *

* Method to set maximum QPS allowed per Region Server. This should be same across various jobs. This is intended to - * limit the aggregate QPS generated across various jobs to an Hbase Region Server. + * limit the aggregate QPS generated across various jobs to an HBase Region Server. *

*

* It is recommended to set this value based on your global indexing throughput needs and most importantly, how much @@ -238,7 +239,7 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig { setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_PROP), HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE)); setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP), - HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, String.valueOf(DEFAULT_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE)); + HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, DEFAULT_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE); setDefaultOnCondition(props, !props.containsKey(HBASE_QPS_FRACTION_PROP), HBASE_QPS_FRACTION_PROP, String.valueOf(DEFAULT_HBASE_QPS_FRACTION)); setDefaultOnCondition(props, !props.containsKey(HBASE_MAX_QPS_PER_REGION_SERVER_PROP), @@ -250,7 +251,7 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig { setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS), HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS, String.valueOf(DEFAULT_HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS)); setDefaultOnCondition(props, !props.containsKey(HBASE_ZK_PATH_QPS_ROOT), HBASE_ZK_PATH_QPS_ROOT, - String.valueOf(DEFAULT_HBASE_ZK_PATH_QPS_ROOT)); + DEFAULT_HBASE_ZK_PATH_QPS_ROOT); setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS), HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS, String.valueOf(DEFAULT_ZK_SESSION_TIMEOUT_MS)); setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS), diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java index f4672e93c..d39fae1e1 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java @@ -90,12 +90,9 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { private final Properties props = new Properties(); public Builder fromFile(File propertiesFile) throws IOException { - FileReader reader = new FileReader(propertiesFile); - try { + try (FileReader reader = new FileReader(propertiesFile)) { this.props.load(reader); return this; - } finally { - reader.close(); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java index f19a64c67..2aca01ecc 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java @@ -76,12 +76,9 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig { private final Properties props = new Properties(); public Builder fromFile(File propertiesFile) throws IOException { - FileReader reader = new FileReader(propertiesFile); - try { + try (FileReader reader = new FileReader(propertiesFile)) { this.props.load(reader); return this; - } finally { - reader.close(); } } @@ -141,9 +138,9 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig { // 0.6 is the default value used by Spark, // look at {@link // https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkConf.scala#L507} - double memoryFraction = Double.valueOf( + double memoryFraction = Double.parseDouble( SparkEnv.get().conf().get(SPARK_EXECUTOR_MEMORY_FRACTION_PROP, DEFAULT_SPARK_EXECUTOR_MEMORY_FRACTION)); - double maxMemoryFractionForMerge = Double.valueOf(maxMemoryFraction); + double maxMemoryFractionForMerge = Double.parseDouble(maxMemoryFraction); double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction); long maxMemoryForMerge = (long) Math.floor(userAvailableMemory * maxMemoryFractionForMerge); return Math.max(DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES, maxMemoryForMerge); diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java index 4bb8a3fae..a21e4cc27 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java @@ -70,12 +70,9 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig { private final Properties props = new Properties(); public Builder fromFile(File propertiesFile) throws IOException { - FileReader reader = new FileReader(propertiesFile); - try { + try (FileReader reader = new FileReader(propertiesFile)) { this.props.load(reader); return this; - } finally { - reader.close(); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java index 24cf19027..5ae221a3a 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java @@ -66,12 +66,9 @@ public class HoodieStorageConfig extends DefaultHoodieConfig { private final Properties props = new Properties(); public Builder fromFile(File propertiesFile) throws IOException { - FileReader reader = new FileReader(propertiesFile); - try { + try (FileReader reader = new FileReader(propertiesFile)) { this.props.load(reader); return this; - } finally { - reader.close(); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 488ad75ff..0e9dab0dc 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -335,11 +335,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { } public int getHbaseIndexGetBatchSize() { - return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HBASE_GET_BATCH_SIZE_PROP)); + return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_GET_BATCH_SIZE_PROP)); } public int getHbaseIndexPutBatchSize() { - return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_PROP)); + return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_PROP)); } public Boolean getHbaseIndexPutBatchSizeAutoCompute() { @@ -363,11 +363,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { } public boolean getHBaseIndexShouldComputeQPSDynamically() { - return Boolean.valueOf(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY)); + return Boolean.parseBoolean(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY)); } public int getHBaseIndexDesiredPutsTime() { - return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS)); + return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS)); } public String getBloomFilterType() { @@ -455,7 +455,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { } public double getParquetCompressionRatio() { - return Double.valueOf(props.getProperty(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO)); + return Double.parseDouble(props.getProperty(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO)); } public CompressionCodecName getParquetCompressionCodec() { @@ -463,7 +463,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { } public double getLogFileToParquetCompressionRatio() { - return Double.valueOf(props.getProperty(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO)); + return Double.parseDouble(props.getProperty(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO)); } /** @@ -517,7 +517,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { } public int getMaxDFSStreamBufferSize() { - return Integer.valueOf(props.getProperty(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP)); + return Integer.parseInt(props.getProperty(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP)); } public String getSpillableMapBasePath() { @@ -525,7 +525,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { } public double getWriteStatusFailureFraction() { - return Double.valueOf(props.getProperty(HoodieMemoryConfig.WRITESTATUS_FAILURE_FRACTION_PROP)); + return Double.parseDouble(props.getProperty(HoodieMemoryConfig.WRITESTATUS_FAILURE_FRACTION_PROP)); } public ConsistencyGuardConfig getConsistencyGuardConfig() { @@ -564,12 +564,9 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private boolean isConsistencyGuardSet = false; public Builder fromFile(File propertiesFile) throws IOException { - FileReader reader = new FileReader(propertiesFile); - try { + try (FileReader reader = new FileReader(propertiesFile)) { this.props.load(reader); return this; - } finally { - reader.close(); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/BloomIndexFileInfo.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/BloomIndexFileInfo.java index 655d99bb7..c04181405 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/BloomIndexFileInfo.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/BloomIndexFileInfo.java @@ -65,6 +65,8 @@ public class BloomIndexFileInfo implements Serializable { * Does the given key fall within the range (inclusive). */ public boolean isKeyInRange(String recordKey) { + assert minRecordKey != null; + assert maxRecordKey != null; return minRecordKey.compareTo(recordKey) <= 0 && maxRecordKey.compareTo(recordKey) >= 0; } diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java index 804b2cea3..fba168539 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java @@ -180,10 +180,10 @@ public class HoodieBloomIndex extends HoodieIndex .mapToPair(t -> t).countByKey(); } else { fileToComparisons = new HashMap<>(); - partitionToFileInfo.entrySet().stream().forEach(e -> { - for (BloomIndexFileInfo fileInfo : e.getValue()) { + partitionToFileInfo.forEach((key, value) -> { + for (BloomIndexFileInfo fileInfo : value) { // each file needs to be compared against all the records coming into the partition - fileToComparisons.put(fileInfo.getFileId(), recordsPerPartition.get(e.getKey())); + fileToComparisons.put(fileInfo.getFileId(), recordsPerPartition.get(key)); } }); } diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java index ea670bb8a..50d31f9cc 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java @@ -46,12 +46,10 @@ class IntervalTreeBasedGlobalIndexFileFilter implements IndexFileFilter { IntervalTreeBasedGlobalIndexFileFilter(final Map> partitionToFileIndexInfo) { List allIndexFiles = new ArrayList<>(); - partitionToFileIndexInfo.forEach((parition, bloomIndexFileInfoList) -> { - bloomIndexFileInfoList.forEach(file -> { - fileIdToPartitionPathMap.put(file.getFileId(), parition); - allIndexFiles.add(file); - }); - }); + partitionToFileIndexInfo.forEach((parition, bloomIndexFileInfoList) -> bloomIndexFileInfoList.forEach(file -> { + fileIdToPartitionPathMap.put(file.getFileId(), parition); + allIndexFiles.add(file); + })); // Note that the interval tree implementation doesn't have auto-balancing to ensure logN search time. // So, we are shuffling the input here hoping the tree will not have any skewness. If not, the tree could be skewed diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/ListBasedGlobalIndexFileFilter.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/ListBasedGlobalIndexFileFilter.java index 6f6fbdaec..dce763a17 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/ListBasedGlobalIndexFileFilter.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/ListBasedGlobalIndexFileFilter.java @@ -39,13 +39,11 @@ class ListBasedGlobalIndexFileFilter extends ListBasedIndexFileFilter { @Override public Set> getMatchingFilesAndPartition(String partitionPath, String recordKey) { Set> toReturn = new HashSet<>(); - partitionToFileIndexInfo.forEach((partition, bloomIndexFileInfoList) -> { - bloomIndexFileInfoList.forEach(file -> { - if (shouldCompareWithFile(file, recordKey)) { - toReturn.add(Pair.of(partition, file.getFileId())); - } - }); - }); + partitionToFileIndexInfo.forEach((partition, bloomIndexFileInfoList) -> bloomIndexFileInfoList.forEach(file -> { + if (shouldCompareWithFile(file, recordKey)) { + toReturn.add(Pair.of(partition, file.getFileId())); + } + })); return toReturn; } } diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java index 1bb46c14f..5308a1052 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java @@ -29,7 +29,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieHBaseIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieDependentSystemUnavailableException; import org.apache.hudi.exception.HoodieIndexException; @@ -39,6 +39,7 @@ import org.apache.hudi.table.HoodieTable; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -89,7 +90,7 @@ public class HBaseIndex extends HoodieIndex { private int maxQpsPerRegionServer; /** * multiPutBatchSize will be computed and re-set in updateLocation if - * {@link HoodieIndexConfig.HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP} is set to true. + * {@link HoodieHBaseIndexConfig#HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP} is set to true. */ private Integer multiPutBatchSize; private Integer numRegionServersForTable; @@ -115,9 +116,8 @@ public class HBaseIndex extends HoodieIndex { public HBaseIndexQPSResourceAllocator createQPSResourceAllocator(HoodieWriteConfig config) { try { LOG.info("createQPSResourceAllocator : {}", config.getHBaseQPSResourceAllocatorClass()); - final HBaseIndexQPSResourceAllocator resourceAllocator = (HBaseIndexQPSResourceAllocator) ReflectionUtils - .loadClass(config.getHBaseQPSResourceAllocatorClass(), config); - return resourceAllocator; + return (HBaseIndexQPSResourceAllocator) ReflectionUtils + .loadClass(config.getHBaseQPSResourceAllocatorClass(), config); } catch (Exception e) { LOG.warn("error while instantiating HBaseIndexQPSResourceAllocator", e); } @@ -149,20 +149,17 @@ public class HBaseIndex extends HoodieIndex { } /** - * Since we are sharing the HbaseConnection across tasks in a JVM, make sure the HbaseConnectio is closed when JVM + * Since we are sharing the HBaseConnection across tasks in a JVM, make sure the HBaseConnection is closed when JVM * exits. */ private void addShutDownHook() { - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - hbaseConnection.close(); - } catch (Exception e) { - // fail silently for any sort of exception - } + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + hbaseConnection.close(); + } catch (Exception e) { + // fail silently for any sort of exception } - }); + })); } /** @@ -197,7 +194,7 @@ public class HBaseIndex extends HoodieIndex { return (Function2>, Iterator>>) (partitionNum, hoodieRecordIterator) -> { - Integer multiGetBatchSize = config.getHbaseIndexGetBatchSize(); + int multiGetBatchSize = config.getHbaseIndexGetBatchSize(); // Grab the global HBase connection synchronized (HBaseIndex.class) { @@ -485,7 +482,7 @@ public class HBaseIndex extends HoodieIndex { try (Connection conn = getHBaseConnection()) { RegionLocator regionLocator = conn.getRegionLocator(TableName.valueOf(tableName)); numRegionServersForTable = Math - .toIntExact(regionLocator.getAllRegionLocations().stream().map(e -> e.getServerName()).distinct().count()); + .toIntExact(regionLocator.getAllRegionLocations().stream().map(HRegionLocation::getServerName).distinct().count()); return numRegionServersForTable; } catch (IOException e) { LOG.error("Error while connecting HBase:", e); diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 58eafac09..33e4417f6 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -207,7 +207,7 @@ public class HoodieAppendHandle extends HoodieWri recordList.clear(); } if (keysToDelete.size() > 0) { - writer = writer.appendBlock(new HoodieDeleteBlock(keysToDelete.stream().toArray(HoodieKey[]::new), header)); + writer = writer.appendBlock(new HoodieDeleteBlock(keysToDelete.toArray(new HoodieKey[keysToDelete.size()]), header)); keysToDelete.clear(); } } catch (Exception e) { diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java index e8a06a6b8..d75df4b8e 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieDataFile; import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTimeline; @@ -101,19 +102,16 @@ public class HoodieCleanHelper> implements Seri && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) { LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed " + "since last cleaned at {}. New Instant to retain : {}", cleanMetadata.getEarliestCommitToRetain(), newInstantToRetain); - return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> { - return HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(), - HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(), - newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER); - }).flatMap(instant -> { - try { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class); - return commitMetadata.getPartitionToWriteStats().keySet().stream(); - } catch (IOException e) { - throw new HoodieIOException(e.getMessage(), e); - } - }).distinct().collect(Collectors.toList()); + return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(), + HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(), + newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER)).flatMap(instant -> { + try { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class); + return commitMetadata.getPartitionToWriteStats().keySet().stream(); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + }).distinct().collect(Collectors.toList()); } } } @@ -127,7 +125,7 @@ public class HoodieCleanHelper> implements Seri * policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a * single file (i.e run it with versionsRetained = 1) */ - private List getFilesToCleanKeepingLatestVersions(String partitionPath) throws IOException { + private List getFilesToCleanKeepingLatestVersions(String partitionPath) { LOG.info("Cleaning {}, retaining latest {} file versions. ", partitionPath, config.getCleanerFileVersionsRetained()); List fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList()); List deletePaths = new ArrayList<>(); @@ -164,7 +162,7 @@ public class HoodieCleanHelper> implements Seri } if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { // If merge on read, then clean the log files for the commits as well - deletePaths.addAll(nextSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList())); + deletePaths.addAll(nextSlice.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList())); } } } @@ -185,7 +183,7 @@ public class HoodieCleanHelper> implements Seri *

* This policy is the default. */ - private List getFilesToCleanKeepingLatestCommits(String partitionPath) throws IOException { + private List getFilesToCleanKeepingLatestCommits(String partitionPath) { int commitsRetained = config.getCleanerCommitsRetained(); LOG.info("Cleaning {}, retaining latest {} commits. ", partitionPath, commitsRetained); List deletePaths = new ArrayList<>(); @@ -235,7 +233,7 @@ public class HoodieCleanHelper> implements Seri aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getFileName())); if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { // If merge on read, then clean the log files for the commits as well - deletePaths.addAll(aSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList())); + deletePaths.addAll(aSlice.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList())); } } } @@ -264,7 +262,7 @@ public class HoodieCleanHelper> implements Seri /** * Returns files to be cleaned for the given partitionPath based on cleaning policy. */ - public List getDeletePaths(String partitionPath) throws IOException { + public List getDeletePaths(String partitionPath) { HoodieCleaningPolicy policy = config.getCleanerPolicy(); List deletePaths; if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) { diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java index c4ce91a56..9baad754d 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java @@ -60,6 +60,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -152,7 +153,7 @@ public class HoodieCommitArchiveLog { } else { return new ArrayList(); } - }).flatMap(i -> i.stream()); + }).flatMap(Collection::stream); // TODO (na) : Add a way to return actions associated with a timeline and then merge/unify // with logic above to avoid Stream.concats @@ -171,9 +172,7 @@ public class HoodieCommitArchiveLog { s.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL)); }).filter(s -> { // Ensure commits >= oldest pending compaction commit is retained - return oldestPendingCompactionInstant.map(instant -> { - return HoodieTimeline.compareTimestamps(instant.getTimestamp(), s.getTimestamp(), HoodieTimeline.GREATER); - }).orElse(true); + return oldestPendingCompactionInstant.map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), s.getTimestamp(), HoodieTimeline.GREATER)).orElse(true); }).limit(commitTimeline.countInstants() - minCommitsToKeep)); } @@ -204,10 +203,8 @@ public class HoodieCommitArchiveLog { } // Remove older meta-data from auxiliary path too - Option latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> { - return i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION) - || (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION))); - }).max(Comparator.comparing(HoodieInstant::getTimestamp))); + Option latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION) + || (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp))); LOG.info("Latest Committed Instant={}", latestCommitted); if (latestCommitted.isPresent()) { success &= deleteAllInstantsOlderorEqualsInAuxMetaFolder(latestCommitted.get()); diff --git a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/BoundedPartitionAwareCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/BoundedPartitionAwareCompactionStrategy.java index abc7d9275..7c23d785b 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/BoundedPartitionAwareCompactionStrategy.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/BoundedPartitionAwareCompactionStrategy.java @@ -50,13 +50,11 @@ public class BoundedPartitionAwareCompactionStrategy extends DayBasedCompactionS String earliestPartitionPathToCompact = dateFormat.format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction())); // Filter out all partitions greater than earliestPartitionPathToCompact - List eligibleCompactionOperations = - operations.stream().collect(Collectors.groupingBy(HoodieCompactionOperation::getPartitionPath)).entrySet() - .stream().sorted(Map.Entry.comparingByKey(comparator)) - .filter(e -> comparator.compare(earliestPartitionPathToCompact, e.getKey()) >= 0) - .flatMap(e -> e.getValue().stream()).collect(Collectors.toList()); - return eligibleCompactionOperations; + return operations.stream().collect(Collectors.groupingBy(HoodieCompactionOperation::getPartitionPath)).entrySet() + .stream().sorted(Map.Entry.comparingByKey(comparator)) + .filter(e -> comparator.compare(earliestPartitionPathToCompact, e.getKey()) >= 0) + .flatMap(e -> e.getValue().stream()).collect(Collectors.toList()); } @Override @@ -65,10 +63,9 @@ public class BoundedPartitionAwareCompactionStrategy extends DayBasedCompactionS String earliestPartitionPathToCompact = dateFormat.format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction())); // Get all partitions and sort them - List filteredPartitionPaths = partitionPaths.stream().map(partition -> partition.replace("/", "-")) + return partitionPaths.stream().map(partition -> partition.replace("/", "-")) .sorted(Comparator.reverseOrder()).map(partitionPath -> partitionPath.replace("-", "/")) .filter(e -> comparator.compare(earliestPartitionPathToCompact, e) >= 0).collect(Collectors.toList()); - return filteredPartitionPaths; } @VisibleForTesting diff --git a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/DayBasedCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/DayBasedCompactionStrategy.java index 0de88bf65..a491818d1 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/DayBasedCompactionStrategy.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/DayBasedCompactionStrategy.java @@ -65,19 +65,17 @@ public class DayBasedCompactionStrategy extends CompactionStrategy { List operations, List pendingCompactionPlans) { // Iterate through the operations and accept operations as long as we are within the configured target partitions // limit - List filteredList = operations.stream() + return operations.stream() .collect(Collectors.groupingBy(HoodieCompactionOperation::getPartitionPath)).entrySet().stream() .sorted(Map.Entry.comparingByKey(comparator)).limit(writeConfig.getTargetPartitionsPerDayBasedCompaction()) .flatMap(e -> e.getValue().stream()).collect(Collectors.toList()); - return filteredList; } @Override public List filterPartitionPaths(HoodieWriteConfig writeConfig, List allPartitionPaths) { - List filteredPartitionPaths = allPartitionPaths.stream().map(partition -> partition.replace("/", "-")) + return allPartitionPaths.stream().map(partition -> partition.replace("/", "-")) .sorted(Comparator.reverseOrder()).map(partitionPath -> partitionPath.replace("-", "/")) .collect(Collectors.toList()).subList(0, writeConfig.getTargetPartitionsPerDayBasedCompaction()); - return filteredPartitionPaths; } /** diff --git a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/LogFileSizeBasedCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/LogFileSizeBasedCompactionStrategy.java index 4c9ee5fd2..11c2752ce 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/LogFileSizeBasedCompactionStrategy.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/LogFileSizeBasedCompactionStrategy.java @@ -49,7 +49,7 @@ public class LogFileSizeBasedCompactionStrategy extends BoundedIOCompactionStrat // Total size of all the log files Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize).filter(size -> size >= 0) - .reduce((size1, size2) -> size1 + size2).orElse(0L); + .reduce(Long::sum).orElse(0L); // save the metrics needed during the order metrics.put(TOTAL_LOG_FILE_SIZE, totalLogFileSize.doubleValue()); return metrics; diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java index a2922fd38..c0dd90502 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java @@ -41,8 +41,8 @@ public class HoodieMetrics { public String finalizeTimerName = null; public String compactionTimerName = null; public String indexTimerName = null; - private HoodieWriteConfig config = null; - private String tableName = null; + private HoodieWriteConfig config; + private String tableName; private Timer rollbackTimer = null; private Timer cleanTimer = null; private Timer commitTimer = null; diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java index d8c4ae6bd..a3e95fe44 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java @@ -40,18 +40,16 @@ public class JmxMetricsReporter extends MetricsReporter { private static final Logger LOG = LoggerFactory.getLogger(JmxMetricsReporter.class); private final JMXConnectorServer connector; - private String host; - private int port; public JmxMetricsReporter(HoodieWriteConfig config) { try { // Check the host and port here - this.host = config.getJmxHost(); - this.port = config.getJmxPort(); + String host = config.getJmxHost(); + int port = config.getJmxPort(); if (host == null || port == 0) { throw new RuntimeException( String.format("Jmx cannot be initialized with host[%s] and port[%s].", - host, port)); + host, port)); } LocateRegistry.createRegistry(port); String serviceUrl = diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java index 24553f5ec..1ccf026d0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java @@ -315,14 +315,14 @@ public class HoodieCopyOnWriteTable extends Hoodi @Override public List clean(JavaSparkContext jsc, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) { int cleanerParallelism = Math.min( - (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(x -> x.size()).count()), + (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()), config.getCleanerParallelism()); LOG.info("Using cleanerParallelism: {}", cleanerParallelism); List> partitionCleanStats = jsc .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream() - .flatMap(x -> x.getValue().stream().map(y -> new Tuple2(x.getKey(), y))) + .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), y))) .collect(Collectors.toList()), cleanerParallelism) - .mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey((e1, e2) -> e1.merge(e2)).collect(); + .mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey(PartitionCleanStat::merge).collect(); Map partitionCleanStatsMap = partitionCleanStats.stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2)); @@ -348,9 +348,8 @@ public class HoodieCopyOnWriteTable extends Hoodi @Override public List rollback(JavaSparkContext jsc, HoodieInstant instant, boolean deleteInstants) throws IOException { - Long startTime = System.currentTimeMillis(); + long startTime = System.currentTimeMillis(); List stats = new ArrayList<>(); - String actionType = metaClient.getCommitActionType(); HoodieActiveTimeline activeTimeline = this.getActiveTimeline(); if (instant.isCompleted()) { @@ -379,9 +378,8 @@ public class HoodieCopyOnWriteTable extends Hoodi private List generateRollbackRequests(HoodieInstant instantToRollback) throws IOException { return FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(), - config.shouldAssumeDatePartitioning()).stream().map(partitionPath -> { - return RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback); - }).collect(Collectors.toList()); + config.shouldAssumeDatePartitioning()).stream().map(partitionPath -> RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback)) + .collect(Collectors.toList()); } @@ -541,7 +539,7 @@ public class HoodieCopyOnWriteTable extends Hoodi /** * List of all small files to be corrected. */ - List smallFiles = new ArrayList(); + List smallFiles = new ArrayList<>(); /** * Total number of RDD partitions, is determined by total buckets we want to pack the incoming workload into. */ @@ -567,7 +565,6 @@ public class HoodieCopyOnWriteTable extends Hoodi * Rolling stats for files. */ protected HoodieRollingStatMetadata rollingStatMetadata; - protected long averageRecordSize; UpsertPartitioner(WorkloadProfile profile) { updateLocationToBucket = new HashMap<>(); diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java index c020cc942..884577294 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java @@ -167,7 +167,7 @@ public class HoodieMergeOnReadTable extends Hoodi @Override public List rollback(JavaSparkContext jsc, HoodieInstant instant, boolean deleteInstants) throws IOException { - Long startTime = System.currentTimeMillis(); + long startTime = System.currentTimeMillis(); String commit = instant.getTimestamp(); LOG.error("Rolling back instant {}", instant); @@ -344,14 +344,9 @@ public class HoodieMergeOnReadTable extends Hoodi // TODO : choose last N small files since there can be multiple small files written to a single partition // by different spark partitions in a single batch Option smallFileSlice = Option.fromJavaOptional(getRTFileSystemView() - .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false) - .filter(fileSlice -> fileSlice.getLogFiles().count() < 1 - && fileSlice.getDataFile().get().getFileSize() < config.getParquetSmallFileLimit()) - .sorted((FileSlice left, - FileSlice right) -> left.getDataFile().get().getFileSize() < right.getDataFile().get().getFileSize() - ? -1 - : 1) - .findFirst()); + .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false) + .filter(fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getDataFile().get().getFileSize() < config.getParquetSmallFileLimit()) + .min((FileSlice left, FileSlice right) -> left.getDataFile().get().getFileSize() < right.getDataFile().get().getFileSize() ? -1 : 1)); if (smallFileSlice.isPresent()) { allSmallFileSlices.add(smallFileSlice.get()); } @@ -362,7 +357,7 @@ public class HoodieMergeOnReadTable extends Hoodi getRTFileSystemView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true) .collect(Collectors.toList()); for (FileSlice fileSlice : allFileSlices) { - if (isSmallFile(partitionPath, fileSlice)) { + if (isSmallFile(fileSlice)) { allSmallFileSlices.add(fileSlice); } } @@ -374,7 +369,7 @@ public class HoodieMergeOnReadTable extends Hoodi // TODO : Move logic of file name, file id, base commit time handling inside file slice String filename = smallFileSlice.getDataFile().get().getFileName(); sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); - sf.sizeBytes = getTotalFileSize(partitionPath, smallFileSlice); + sf.sizeBytes = getTotalFileSize(smallFileSlice); smallFileLocations.add(sf); // Update the global small files list smallFiles.add(sf); @@ -382,7 +377,7 @@ public class HoodieMergeOnReadTable extends Hoodi HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get(); sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), FSUtils.getFileIdFromLogPath(logFile.getPath())); - sf.sizeBytes = getTotalFileSize(partitionPath, smallFileSlice); + sf.sizeBytes = getTotalFileSize(smallFileSlice); smallFileLocations.add(sf); // Update the global small files list smallFiles.add(sf); @@ -397,7 +392,7 @@ public class HoodieMergeOnReadTable extends Hoodi .collect(Collectors.toList()); } - private long getTotalFileSize(String partitionPath, FileSlice fileSlice) { + private long getTotalFileSize(FileSlice fileSlice) { if (!fileSlice.getDataFile().isPresent()) { return convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList())); } else { @@ -406,22 +401,20 @@ public class HoodieMergeOnReadTable extends Hoodi } } - private boolean isSmallFile(String partitionPath, FileSlice fileSlice) { - long totalSize = getTotalFileSize(partitionPath, fileSlice); + private boolean isSmallFile(FileSlice fileSlice) { + long totalSize = getTotalFileSize(fileSlice); return totalSize < config.getParquetMaxFileSize(); } // TODO (NA) : Make this static part of utility @VisibleForTesting public long convertLogFilesSizeToExpectedParquetSize(List hoodieLogFiles) { - long totalSizeOfLogFiles = hoodieLogFiles.stream().map(hoodieLogFile -> hoodieLogFile.getFileSize()) - .filter(size -> size > 0).reduce((a, b) -> (a + b)).orElse(0L); + long totalSizeOfLogFiles = hoodieLogFiles.stream().map(HoodieLogFile::getFileSize) + .filter(size -> size > 0).reduce(Long::sum).orElse(0L); // Here we assume that if there is no base parquet file, all log files contain only inserts. // We can then just get the parquet equivalent size of these log files, compare that with // {@link config.getParquetMaxFileSize()} and decide if there is scope to insert more rows - long logFilesEquivalentParquetFileSize = - (long) (totalSizeOfLogFiles * config.getLogFileToParquetCompressionRatio()); - return logFilesEquivalentParquetFileSize; + return (long) (totalSizeOfLogFiles * config.getLogFileToParquetCompressionRatio()); } } @@ -439,7 +432,7 @@ public class HoodieMergeOnReadTable extends Hoodi return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> { // Filter out stats without prevCommit since they are all inserts - boolean validForRollback = (wStat != null) && (wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT) + boolean validForRollback = (wStat != null) && (!wStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT)) && (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId()); if (validForRollback) { diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java index 2f2105c2d..5bb0ffa70 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -223,7 +223,7 @@ public abstract class HoodieTable implements Seri "Could not get data files for savepoint " + savepointTime + ". No such savepoint."); } HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime); - HoodieSavepointMetadata metadata = null; + HoodieSavepointMetadata metadata; try { metadata = AvroUtils.deserializeHoodieSavepointMetadata(getActiveTimeline().getInstantDetails(instant).get()); } catch (IOException e) { diff --git a/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java index a87b4bed4..236f44028 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java @@ -90,19 +90,18 @@ public class RollbackExecutor implements Serializable { case DELETE_DATA_FILES_ONLY: { deleteCleanedFiles(metaClient, config, filesToDeletedStatus, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath()); - return new Tuple2(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withDeletedFileResults(filesToDeletedStatus).build()); + return new Tuple2<>(rollbackRequest.getPartitionPath(), + HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) + .withDeletedFileResults(filesToDeletedStatus).build()); } case DELETE_DATA_AND_LOG_FILES: { deleteCleanedFiles(metaClient, config, filesToDeletedStatus, rollbackRequest.getPartitionPath(), filter); - return new Tuple2(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withDeletedFileResults(filesToDeletedStatus).build()); + return new Tuple2<>(rollbackRequest.getPartitionPath(), + HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) + .withDeletedFileResults(filesToDeletedStatus).build()); } case APPEND_ROLLBACK_BLOCK: { Writer writer = null; - boolean success = false; try { writer = HoodieLogFormat.newWriterBuilder() .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath())) @@ -114,7 +113,6 @@ public class RollbackExecutor implements Serializable { Map header = generateHeader(instantToRollback.getTimestamp()); // if update belongs to an existing log file writer = writer.appendBlock(new HoodieCommandBlock(header)); - success = true; } catch (IOException | InterruptedException io) { throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io); } finally { @@ -131,10 +129,10 @@ public class RollbackExecutor implements Serializable { // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in // cloud-storage : HUDI-168 Map filesToNumBlocksRollback = new HashMap<>(); - filesToNumBlocksRollback.put(metaClient.getFs().getFileStatus(writer.getLogFile().getPath()), 1L); - return new Tuple2(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withRollbackBlockAppendResults(filesToNumBlocksRollback).build()); + filesToNumBlocksRollback.put(metaClient.getFs().getFileStatus(Preconditions.checkNotNull(writer).getLogFile().getPath()), 1L); + return new Tuple2<>(rollbackRequest.getPartitionPath(), + HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) + .withRollbackBlockAppendResults(filesToNumBlocksRollback).build()); } default: throw new IllegalStateException("Unknown Rollback action " + rollbackRequest); diff --git a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java index 72188b2e9..47459cd5c 100644 --- a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java @@ -72,7 +72,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.function.Predicate; @@ -436,8 +435,7 @@ public class TestCleaner extends TestHoodieClientBase { final HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); HoodieCleanMetadata cleanMetadata2 = writeClient.runClean(table, HoodieTimeline.getCleanInflightInstant(cleanInstantTs)); - Assert.assertTrue( - Objects.equals(cleanMetadata1.getEarliestCommitToRetain(), cleanMetadata2.getEarliestCommitToRetain())); + Assert.assertEquals(cleanMetadata1.getEarliestCommitToRetain(), cleanMetadata2.getEarliestCommitToRetain()); Assert.assertEquals(new Integer(0), cleanMetadata2.getTotalFilesDeleted()); Assert.assertEquals(cleanMetadata1.getPartitionMetadata().keySet(), cleanMetadata2.getPartitionMetadata().keySet()); diff --git a/hudi-client/src/test/java/org/apache/hudi/TestCompactionAdminClient.java b/hudi-client/src/test/java/org/apache/hudi/TestCompactionAdminClient.java index 6edd0f3f2..2ce452ad3 100644 --- a/hudi-client/src/test/java/org/apache/hudi/TestCompactionAdminClient.java +++ b/hudi-client/src/test/java/org/apache/hudi/TestCompactionAdminClient.java @@ -49,6 +49,8 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.CompactionAdminClient.getRenamingActionsToAlignWithCompactionOperation; +import static org.apache.hudi.CompactionAdminClient.renameLogFile; import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ; public class TestCompactionAdminClient extends TestHoodieClientBase { @@ -139,10 +141,10 @@ public class TestCompactionAdminClient extends TestHoodieClientBase { } // Now repair List> undoFiles = - result.stream().flatMap(r -> client.getRenamingActionsToAlignWithCompactionOperation(metaClient, + result.stream().flatMap(r -> getRenamingActionsToAlignWithCompactionOperation(metaClient, compactionInstant, r.getOperation(), Option.empty()).stream()).map(rn -> { try { - client.renameLogFile(metaClient, rn.getKey(), rn.getValue()); + renameLogFile(metaClient, rn.getKey(), rn.getValue()); } catch (IOException e) { throw new HoodieIOException(e.getMessage(), e); } @@ -248,7 +250,7 @@ public class TestCompactionAdminClient extends TestHoodieClientBase { // Do the renaming only but do not touch the compaction plan - Needed for repair tests renameFiles.forEach(lfPair -> { try { - client.renameLogFile(metaClient, lfPair.getLeft(), lfPair.getRight()); + renameLogFile(metaClient, lfPair.getLeft(), lfPair.getRight()); } catch (IOException e) { throw new HoodieIOException(e.getMessage(), e); } diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java index 5a02adc88..29d039a34 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java @@ -62,11 +62,11 @@ public class TestBucketizedBloomCheckPartitioner { }; BucketizedBloomCheckPartitioner partitioner = new BucketizedBloomCheckPartitioner(100, comparisons1, 10); Map> assignments = partitioner.getFileGroupToPartitions(); - assignments.entrySet().stream().forEach(e -> assertEquals(10, e.getValue().size())); + assignments.forEach((key, value) -> assertEquals(10, value.size())); Map partitionToNumBuckets = assignments.entrySet().stream().flatMap(e -> e.getValue().stream().map(p -> Pair.of(p, e.getKey()))) .collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting())); - partitionToNumBuckets.entrySet().stream().forEach(e -> assertEquals(1L, e.getValue().longValue())); + partitionToNumBuckets.forEach((key, value) -> assertEquals(1L, value.longValue())); } @Test