From 5b7bb142dc6712c41fd8ada208ab3186369431f9 Mon Sep 17 00:00:00 2001 From: Suneel Marthi Date: Sun, 2 Feb 2020 11:03:44 +0100 Subject: [PATCH] [HUDI-583] Code Cleanup, remove redundant code, and other changes (#1237) --- .../org/apache/hudi/HoodieWriteClient.java | 11 +- .../hudi/avro/MercifulJsonConverter.java | 20 +- .../filter/InternalDynamicBloomFilter.java | 35 ++-- .../bloom/filter/SimpleBloomFilter.java | 5 +- .../io/storage/HoodieWrapperFileSystem.java | 2 +- .../hudi/common/table/TimelineLayout.java | 4 +- .../table/timeline/HoodieActiveTimeline.java | 12 +- .../table/timeline/HoodieDefaultTimeline.java | 2 +- .../common/table/timeline/HoodieInstant.java | 6 +- .../table/view/FileSystemViewManager.java | 4 +- .../view/FileSystemViewStorageConfig.java | 13 +- .../view/RocksDbBasedFileSystemView.java | 182 +++++++++--------- .../view/SpillableMapBasedFileSystemView.java | 4 +- .../apache/hudi/common/util/AvroUtils.java | 4 +- .../common/util/BufferedRandomAccessFile.java | 4 +- .../apache/hudi/common/util/CleanerUtils.java | 3 +- .../hudi/common/util/CompactionUtils.java | 42 ++-- .../common/util/ConsistencyGuardConfig.java | 5 +- .../org/apache/hudi/common/util/FSUtils.java | 26 +-- .../common/util/FailSafeConsistencyGuard.java | 4 +- .../hudi/common/util/HoodieAvroUtils.java | 10 +- .../hudi/common/util/LogReaderUtils.java | 2 +- .../common/util/ObjectSizeCalculator.java | 8 +- .../apache/hudi/common/util/RocksDBDAO.java | 14 +- .../hudi/common/util/SerializationUtils.java | 4 +- .../util/collection/RocksDBBasedMap.java | 6 +- .../util/queue/BoundedInMemoryQueue.java | 4 +- .../hudi/common/model/HoodieTestUtils.java | 48 +++-- .../table/TestHoodieTableMetaClient.java | 4 +- .../common/table/log/TestHoodieLogFormat.java | 2 +- .../view/TestHoodieTableFileSystemView.java | 6 +- .../table/view/TestIncrementalFSViewSync.java | 104 +++++----- .../apache/hudi/common/util/TestFSUtils.java | 6 +- .../apache/hudi/hadoop/HoodieHiveUtil.java | 4 +- .../hudi/hadoop/HoodieParquetInputFormat.java | 24 ++- .../hive/HoodieCombineHiveInputFormat.java | 86 ++++----- .../AbstractRealtimeRecordReader.java | 18 +- .../HoodieParquetRealtimeInputFormat.java | 4 +- .../realtime/HoodieRealtimeRecordReader.java | 2 +- .../RealtimeCompactedRecordReader.java | 2 +- .../RealtimeUnmergedRecordReader.java | 7 +- .../hudi/hadoop/InputFormatTestUtil.java | 25 +-- .../hudi/hadoop/InputPathHandlerTest.java | 2 +- .../hadoop/TestHoodieParquetInputFormat.java | 6 +- .../TestHoodieRealtimeRecordReader.java | 10 +- .../org/apache/hudi/hive/HiveSyncTool.java | 4 +- .../apache/hudi/hive/HoodieHiveClient.java | 7 +- .../org/apache/hudi/integ/ITTestBase.java | 2 +- .../java/org/apache/hudi/DataSourceUtils.java | 16 +- .../org/apache/hudi/AvroConversionUtils.scala | 4 +- .../scala/org/apache/hudi/DefaultSource.scala | 2 +- .../apache/hudi/HoodieSparkSqlWriter.scala | 4 +- .../test/scala/TestDataSourceDefaults.scala | 46 ++--- .../service/FileSystemViewHandler.java | 3 +- .../hudi/utilities/HDFSParquetImporter.java | 12 +- .../hudi/utilities/HiveIncrementalPuller.java | 11 +- .../apache/hudi/utilities/HoodieCleaner.java | 19 +- .../utilities/HoodieCompactionAdminTool.java | 5 +- .../hudi/utilities/HoodieCompactor.java | 2 +- .../utilities/HoodieWithTimelineServer.java | 14 +- .../apache/hudi/utilities/UtilHelpers.java | 7 +- .../utilities/deltastreamer/Compactor.java | 2 +- .../utilities/deltastreamer/DeltaSync.java | 10 +- .../deltastreamer/HoodieDeltaStreamer.java | 43 ++--- .../HoodieDeltaStreamerMetrics.java | 4 +- .../deltastreamer/SchedulerConfGenerator.java | 2 +- .../utilities/sources/HoodieIncrSource.java | 8 +- .../sources/helpers/AvroConvertor.java | 3 +- .../TestTimestampBasedKeyGenerator.java | 3 +- 69 files changed, 447 insertions(+), 582 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java index 8ba0afd12..7e91df155 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java @@ -869,12 +869,11 @@ public class HoodieWriteClient extends AbstractHo LOG.info("Generate a new instant time " + instantTime); HoodieTableMetaClient metaClient = createMetaClient(true); // if there are pending compactions, their instantTime must not be greater than that of this instant time - metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending -> { - Preconditions.checkArgument( - HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), instantTime, HoodieTimeline.LESSER), - "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :" - + latestPending + ", Ingesting at " + instantTime); - }); + metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending -> + Preconditions.checkArgument( + HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), instantTime, HoodieTimeline.LESSER), + "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :" + + latestPending + ", Ingesting at " + instantTime)); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); String commitActionType = table.getMetaClient().getCommitActionType(); diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java index 2416eba7f..20b00f145 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java @@ -49,15 +49,13 @@ public class MercifulJsonConverter { * Build type processor map for each avro type. */ private static Map getFieldTypeProcessors() { - Map processorMap = - new ImmutableMap.Builder().put(Type.STRING, generateStringTypeHandler()) - .put(Type.BOOLEAN, generateBooleanTypeHandler()).put(Type.DOUBLE, generateDoubleTypeHandler()) - .put(Type.FLOAT, generateFloatTypeHandler()).put(Type.INT, generateIntTypeHandler()) - .put(Type.LONG, generateLongTypeHandler()).put(Type.ARRAY, generateArrayTypeHandler()) - .put(Type.RECORD, generateRecordTypeHandler()).put(Type.ENUM, generateEnumTypeHandler()) - .put(Type.MAP, generateMapTypeHandler()).put(Type.BYTES, generateBytesTypeHandler()) - .put(Type.FIXED, generateFixedTypeHandler()).build(); - return processorMap; + return new ImmutableMap.Builder().put(Type.STRING, generateStringTypeHandler()) + .put(Type.BOOLEAN, generateBooleanTypeHandler()).put(Type.DOUBLE, generateDoubleTypeHandler()) + .put(Type.FLOAT, generateFloatTypeHandler()).put(Type.INT, generateIntTypeHandler()) + .put(Type.LONG, generateLongTypeHandler()).put(Type.ARRAY, generateArrayTypeHandler()) + .put(Type.RECORD, generateRecordTypeHandler()).put(Type.ENUM, generateEnumTypeHandler()) + .put(Type.MAP, generateMapTypeHandler()).put(Type.BYTES, generateBytesTypeHandler()) + .put(Type.FIXED, generateFixedTypeHandler()).build(); } /** @@ -286,7 +284,7 @@ public class MercifulJsonConverter { public Pair convert(Object value, String name, Schema schema) throws HoodieJsonToAvroConversionException { Schema elementSchema = schema.getElementType(); - List listRes = new ArrayList(); + List listRes = new ArrayList<>(); for (Object v : (List) value) { listRes.add(convertJsonToAvroField(v, name, elementSchema)); } @@ -301,7 +299,7 @@ public class MercifulJsonConverter { public Pair convert(Object value, String name, Schema schema) throws HoodieJsonToAvroConversionException { Schema valueSchema = schema.getValueType(); - Map mapRes = new HashMap(); + Map mapRes = new HashMap<>(); for (Map.Entry v : ((Map) value).entrySet()) { mapRes.put(v.getKey(), convertJsonToAvroField(v.getValue(), name, valueSchema)); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/InternalDynamicBloomFilter.java b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/InternalDynamicBloomFilter.java index 5468ae919..a98e38e28 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/InternalDynamicBloomFilter.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/InternalDynamicBloomFilter.java @@ -98,8 +98,7 @@ class InternalDynamicBloomFilter extends InternalFilter { @Override public void and(InternalFilter filter) { - if (filter == null - || !(filter instanceof InternalDynamicBloomFilter) + if (!(filter instanceof InternalDynamicBloomFilter) || filter.vectorSize != this.vectorSize || filter.nbHash != this.nbHash) { throw new IllegalArgumentException("filters cannot be and-ed"); @@ -122,8 +121,8 @@ class InternalDynamicBloomFilter extends InternalFilter { return true; } - for (int i = 0; i < matrix.length; i++) { - if (matrix[i].membershipTest(key)) { + for (BloomFilter bloomFilter : matrix) { + if (bloomFilter.membershipTest(key)) { return true; } } @@ -133,15 +132,14 @@ class InternalDynamicBloomFilter extends InternalFilter { @Override public void not() { - for (int i = 0; i < matrix.length; i++) { - matrix[i].not(); + for (BloomFilter bloomFilter : matrix) { + bloomFilter.not(); } } @Override public void or(InternalFilter filter) { - if (filter == null - || !(filter instanceof InternalDynamicBloomFilter) + if (!(filter instanceof InternalDynamicBloomFilter) || filter.vectorSize != this.vectorSize || filter.nbHash != this.nbHash) { throw new IllegalArgumentException("filters cannot be or-ed"); @@ -159,8 +157,7 @@ class InternalDynamicBloomFilter extends InternalFilter { @Override public void xor(InternalFilter filter) { - if (filter == null - || !(filter instanceof InternalDynamicBloomFilter) + if (!(filter instanceof InternalDynamicBloomFilter) || filter.vectorSize != this.vectorSize || filter.nbHash != this.nbHash) { throw new IllegalArgumentException("filters cannot be xor-ed"); @@ -180,8 +177,8 @@ class InternalDynamicBloomFilter extends InternalFilter { public String toString() { StringBuilder res = new StringBuilder(); - for (int i = 0; i < matrix.length; i++) { - res.append(matrix[i]); + for (BloomFilter bloomFilter : matrix) { + res.append(bloomFilter); res.append(Character.LINE_SEPARATOR); } return res.toString(); @@ -195,8 +192,8 @@ class InternalDynamicBloomFilter extends InternalFilter { out.writeInt(nr); out.writeInt(currentNbRecord); out.writeInt(matrix.length); - for (int i = 0; i < matrix.length; i++) { - matrix[i].write(out); + for (BloomFilter bloomFilter : matrix) { + bloomFilter.write(out); } } @@ -217,13 +214,9 @@ class InternalDynamicBloomFilter extends InternalFilter { * Adds a new row to this dynamic Bloom filter. */ private void addRow() { - org.apache.hadoop.util.bloom.BloomFilter[] tmp = new org.apache.hadoop.util.bloom.BloomFilter[matrix.length + 1]; - - for (int i = 0; i < matrix.length; i++) { - tmp[i] = matrix[i]; - } - - tmp[tmp.length - 1] = new org.apache.hadoop.util.bloom.BloomFilter(vectorSize, nbHash, hashType); + BloomFilter[] tmp = new BloomFilter[matrix.length + 1]; + System.arraycopy(matrix, 0, tmp, 0, matrix.length); + tmp[tmp.length - 1] = new BloomFilter(vectorSize, nbHash, hashType); matrix = tmp; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/SimpleBloomFilter.java b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/SimpleBloomFilter.java index 0610e082e..a5e784811 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/SimpleBloomFilter.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/SimpleBloomFilter.java @@ -41,7 +41,7 @@ import java.nio.charset.StandardCharsets; public class SimpleBloomFilter implements BloomFilter { - private org.apache.hadoop.util.bloom.BloomFilter filter = null; + private org.apache.hadoop.util.bloom.BloomFilter filter; /** * Create a new Bloom filter with the given configurations. @@ -114,8 +114,7 @@ public class SimpleBloomFilter implements BloomFilter { filter.write(os); } - private void readObject(ObjectInputStream is) - throws IOException, ClassNotFoundException { + private void readObject(ObjectInputStream is) throws IOException { filter = new org.apache.hadoop.util.bloom.BloomFilter(); filter.readFields(is); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/io/storage/HoodieWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/io/storage/HoodieWrapperFileSystem.java index 5ddb837d3..63ee7df2b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/io/storage/HoodieWrapperFileSystem.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/io/storage/HoodieWrapperFileSystem.java @@ -114,7 +114,7 @@ public class HoodieWrapperFileSystem extends FileSystem { } @Override - public void initialize(URI uri, Configuration conf) throws IOException { + public void initialize(URI uri, Configuration conf) { // Get the default filesystem to decorate Path path = new Path(uri); // Remove 'hoodie-' prefix from path diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TimelineLayout.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TimelineLayout.java index 99bc2d4d8..e3e31dbdd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TimelineLayout.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TimelineLayout.java @@ -66,8 +66,8 @@ public abstract class TimelineLayout implements Serializable { @Override public Stream filterHoodieInstants(Stream instantStream) { return instantStream.collect(Collectors.groupingBy(instant -> Pair.of(instant.getTimestamp(), - HoodieInstant.getComparableAction(instant.getAction())))).entrySet().stream() - .map(e -> e.getValue().stream().reduce((x, y) -> { + HoodieInstant.getComparableAction(instant.getAction())))).values().stream() + .map(hoodieInstants -> hoodieInstants.stream().reduce((x, y) -> { // Pick the one with the highest state if (x.getState().compareTo(y.getState()) >= 0) { return x; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index a7226a7e6..f322d473a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -64,10 +64,10 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { public static final SimpleDateFormat COMMIT_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmss"); public static final Set VALID_EXTENSIONS_IN_ACTIVE_TIMELINE = new HashSet<>(Arrays.asList( - new String[]{COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, REQUESTED_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION, - INFLIGHT_DELTA_COMMIT_EXTENSION, REQUESTED_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION, - INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION, - INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION})); + COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, REQUESTED_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION, + INFLIGHT_DELTA_COMMIT_EXTENSION, REQUESTED_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION, + INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION, + INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION)); private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class); protected HoodieTableMetaClient metaClient; @@ -79,7 +79,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { */ public static String createNewInstantTime() { return lastInstantTime.updateAndGet((oldVal) -> { - String newCommitTime = null; + String newCommitTime; do { newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()); } while (HoodieTimeline.compareTimestamps(newCommitTime, oldVal, LESSER_OR_EQUAL)); @@ -255,7 +255,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { public void deleteCompactionRequested(HoodieInstant instant) { Preconditions.checkArgument(instant.isRequested()); - Preconditions.checkArgument(instant.getAction() == HoodieTimeline.COMPACTION_ACTION); + Preconditions.checkArgument(instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)); deleteInstantFile(instant); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index c61355ce3..78d6c6f96 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -65,7 +65,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline { final MessageDigest md; try { md = MessageDigest.getInstance(HASHING_ALGORITHM); - this.instants.stream().forEach(i -> md + this.instants.forEach(i -> md .update(StringUtils.joinUsingDelim("_", i.getTimestamp(), i.getAction(), i.getState().name()).getBytes())); } catch (NoSuchAlgorithmException nse) { throw new HoodieException(nse); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java index 460d0c05c..460931ba6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java @@ -45,13 +45,13 @@ public class HoodieInstant implements Serializable, Comparable { .put(HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.COMMIT_ACTION).build(); public static final Comparator ACTION_COMPARATOR = - Comparator.comparing(instant -> getComparableAction(instant.getAction())); + Comparator.comparing(instant -> getComparableAction(instant.getAction())); public static final Comparator COMPARATOR = Comparator.comparing(HoodieInstant::getTimestamp) .thenComparing(ACTION_COMPARATOR).thenComparing(HoodieInstant::getState); - public static final String getComparableAction(String action) { - return COMPARABLE_ACTIONS.containsKey(action) ? COMPARABLE_ACTIONS.get(action) : action; + public static String getComparableAction(String action) { + return COMPARABLE_ACTIONS.getOrDefault(action, action); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java index 2a247acb3..8d50ef598 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java @@ -95,7 +95,7 @@ public class FileSystemViewManager { * Closes all views opened. */ public void close() { - this.globalViewMap.values().stream().forEach(v -> v.close()); + this.globalViewMap.values().forEach(SyncableFileSystemView::close); this.globalViewMap.clear(); } @@ -196,7 +196,7 @@ public class FileSystemViewManager { return new FileSystemViewManager(conf, config, (basePath, viewConfig) -> { RemoteHoodieTableFileSystemView remoteFileSystemView = createRemoteFileSystemView(conf, viewConfig, new HoodieTableMetaClient(conf.newCopy(), basePath)); - SyncableFileSystemView secondaryView = null; + SyncableFileSystemView secondaryView; switch (viewConfig.getSecondaryStorageType()) { case MEMORY: secondaryView = createInMemoryFileSystemView(conf, viewConfig, basePath); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java index b72696a4e..4cf6942b7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java @@ -69,7 +69,7 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig { } public boolean isIncrementalTimelineSyncEnabled() { - return Boolean.valueOf(props.getProperty(FILESYSTEM_VIEW_INCREMENTAL_SYNC_MODE)); + return Boolean.parseBoolean(props.getProperty(FILESYSTEM_VIEW_INCREMENTAL_SYNC_MODE)); } public String getRemoteViewServerHost() { @@ -87,10 +87,8 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig { public long getMaxMemoryForPendingCompaction() { long totalMemory = Long.parseLong(props.getProperty(FILESYSTEM_VIEW_SPILLABLE_MEM)); - long reservedForPendingComaction = - new Double(totalMemory * Double.parseDouble(props.getProperty(FILESYSTEM_VIEW_PENDING_COMPACTION_MEM_FRACTION))) - .longValue(); - return reservedForPendingComaction; + return new Double(totalMemory * Double.parseDouble(props.getProperty(FILESYSTEM_VIEW_PENDING_COMPACTION_MEM_FRACTION))) + .longValue(); } public String getBaseStoreDir() { @@ -113,12 +111,9 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig { private final Properties props = new Properties(); public Builder fromFile(File propertiesFile) throws IOException { - FileReader reader = new FileReader(propertiesFile); - try { + try (FileReader reader = new FileReader(propertiesFile)) { props.load(reader); return this; - } finally { - reader.close(); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java index 1a00bca98..a937647ea 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java @@ -85,7 +85,7 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste @Override protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) { - schemaHelper.getAllColumnFamilies().stream().forEach(rocksDB::addColumnFamily); + schemaHelper.getAllColumnFamilies().forEach(rocksDB::addColumnFamily); super.init(metaClient, visibleActiveTimeline); LOG.info("Created ROCKSDB based file-system view at " + config.getRocksdbBasePath()); } @@ -98,39 +98,39 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste @Override protected void resetPendingCompactionOperations(Stream> operations) { rocksDB.writeBatch(batch -> { - operations.forEach(opPair -> { - rocksDB.putInBatch(batch, schemaHelper.getColFamilyForPendingCompaction(), - schemaHelper.getKeyForPendingCompactionLookup(opPair.getValue().getFileGroupId()), opPair); - }); + operations.forEach(opPair -> + rocksDB.putInBatch(batch, schemaHelper.getColFamilyForPendingCompaction(), + schemaHelper.getKeyForPendingCompactionLookup(opPair.getValue().getFileGroupId()), opPair) + ); LOG.info("Initializing pending compaction operations. Count=" + batch.count()); }); } @Override protected void addPendingCompactionOperations(Stream> operations) { - rocksDB.writeBatch(batch -> { - operations.forEach(opInstantPair -> { - Preconditions.checkArgument(!isPendingCompactionScheduledForFileId(opInstantPair.getValue().getFileGroupId()), - "Duplicate FileGroupId found in pending compaction operations. FgId :" - + opInstantPair.getValue().getFileGroupId()); - rocksDB.putInBatch(batch, schemaHelper.getColFamilyForPendingCompaction(), - schemaHelper.getKeyForPendingCompactionLookup(opInstantPair.getValue().getFileGroupId()), opInstantPair); - }); - }); + rocksDB.writeBatch(batch -> + operations.forEach(opInstantPair -> { + Preconditions.checkArgument(!isPendingCompactionScheduledForFileId(opInstantPair.getValue().getFileGroupId()), + "Duplicate FileGroupId found in pending compaction operations. FgId :" + + opInstantPair.getValue().getFileGroupId()); + rocksDB.putInBatch(batch, schemaHelper.getColFamilyForPendingCompaction(), + schemaHelper.getKeyForPendingCompactionLookup(opInstantPair.getValue().getFileGroupId()), opInstantPair); + }) + ); } @Override void removePendingCompactionOperations(Stream> operations) { - rocksDB.writeBatch(batch -> { - operations.forEach(opInstantPair -> { - Preconditions.checkArgument( - getPendingCompactionOperationWithInstant(opInstantPair.getValue().getFileGroupId()) != null, - "Trying to remove a FileGroupId which is not found in pending compaction operations. FgId :" - + opInstantPair.getValue().getFileGroupId()); - rocksDB.deleteInBatch(batch, schemaHelper.getColFamilyForPendingCompaction(), - schemaHelper.getKeyForPendingCompactionLookup(opInstantPair.getValue().getFileGroupId())); - }); - }); + rocksDB.writeBatch(batch -> + operations.forEach(opInstantPair -> { + Preconditions.checkArgument( + getPendingCompactionOperationWithInstant(opInstantPair.getValue().getFileGroupId()) != null, + "Trying to remove a FileGroupId which is not found in pending compaction operations. FgId :" + + opInstantPair.getValue().getFileGroupId()); + rocksDB.deleteInBatch(batch, schemaHelper.getColFamilyForPendingCompaction(), + schemaHelper.getKeyForPendingCompactionLookup(opInstantPair.getValue().getFileGroupId())); + }) + ); } @Override @@ -170,17 +170,16 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste schemaHelper.getPrefixForDataFileViewByPartition(partitionPath)); // Now add them - fileGroups.stream().forEach(fg -> { - rocksDB.writeBatch(batch -> { - fg.getAllFileSlicesIncludingInflight().forEach(fs -> { - rocksDB.putInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForSliceView(fg, fs), fs); - fs.getBaseFile().ifPresent(df -> { - rocksDB.putInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForDataFileView(fg, fs), - df); - }); - }); - }); - }); + fileGroups.forEach(fg -> + rocksDB.writeBatch(batch -> + fg.getAllFileSlicesIncludingInflight().forEach(fs -> { + rocksDB.putInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForSliceView(fg, fs), fs); + fs.getBaseFile().ifPresent(df -> + rocksDB.putInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForDataFileView(fg, fs), df) + ); + }) + ) + ); // record that partition is loaded. rocksDB.put(schemaHelper.getColFamilyForStoredPartitions(), lookupKey, Boolean.TRUE); @@ -194,69 +193,66 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste */ protected void applyDeltaFileSlicesToPartitionView(String partition, List deltaFileGroups, DeltaApplyMode mode) { - rocksDB.writeBatch(batch -> { - deltaFileGroups.stream().forEach(fg -> { - fg.getAllRawFileSlices().map(fs -> { - FileSlice oldSlice = getFileSlice(partition, fs.getFileId(), fs.getBaseInstantTime()); - if (null == oldSlice) { - return fs; - } else { - // First remove the file-slice - LOG.info("Removing old Slice in DB. FS=" + oldSlice); - rocksDB.deleteInBatch(batch, schemaHelper.getColFamilyForView(), - schemaHelper.getKeyForSliceView(fg, oldSlice)); - rocksDB.deleteInBatch(batch, schemaHelper.getColFamilyForView(), - schemaHelper.getKeyForDataFileView(fg, oldSlice)); + rocksDB.writeBatch(batch -> + deltaFileGroups.forEach(fg -> + fg.getAllRawFileSlices().map(fs -> { + FileSlice oldSlice = getFileSlice(partition, fs.getFileId(), fs.getBaseInstantTime()); + if (null == oldSlice) { + return fs; + } else { + // First remove the file-slice + LOG.info("Removing old Slice in DB. FS=" + oldSlice); + rocksDB.deleteInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForSliceView(fg, oldSlice)); + rocksDB.deleteInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForDataFileView(fg, oldSlice)); - Map logFiles = oldSlice.getLogFiles() - .map(lf -> Pair.of(Path.getPathWithoutSchemeAndAuthority(lf.getPath()).toString(), lf)) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - Map deltaLogFiles = - fs.getLogFiles().map(lf -> Pair.of(Path.getPathWithoutSchemeAndAuthority(lf.getPath()).toString(), lf)) + Map logFiles = oldSlice.getLogFiles() + .map(lf -> Pair.of(Path.getPathWithoutSchemeAndAuthority(lf.getPath()).toString(), lf)) .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + Map deltaLogFiles = + fs.getLogFiles().map(lf -> Pair.of(Path.getPathWithoutSchemeAndAuthority(lf.getPath()).toString(), lf)) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - switch (mode) { - case ADD: { - FileSlice newFileSlice = new FileSlice(oldSlice.getFileGroupId(), oldSlice.getBaseInstantTime()); - oldSlice.getBaseFile().ifPresent(df -> newFileSlice.setBaseFile(df)); - fs.getBaseFile().ifPresent(df -> newFileSlice.setBaseFile(df)); - Map newLogFiles = new HashMap<>(logFiles); - deltaLogFiles.entrySet().stream().filter(e -> !logFiles.containsKey(e.getKey())) - .forEach(p -> newLogFiles.put(p.getKey(), p.getValue())); - newLogFiles.values().stream().forEach(lf -> newFileSlice.addLogFile(lf)); - LOG.info("Adding back new File Slice after add FS=" + newFileSlice); - return newFileSlice; - } - case REMOVE: { - LOG.info("Removing old File Slice =" + fs); - FileSlice newFileSlice = new FileSlice(oldSlice.getFileGroupId(), oldSlice.getBaseInstantTime()); - fs.getBaseFile().orElseGet(() -> { - oldSlice.getBaseFile().ifPresent(df -> newFileSlice.setBaseFile(df)); - return null; - }); + switch (mode) { + case ADD: { + FileSlice newFileSlice = new FileSlice(oldSlice.getFileGroupId(), oldSlice.getBaseInstantTime()); + oldSlice.getBaseFile().ifPresent(newFileSlice::setBaseFile); + fs.getBaseFile().ifPresent(newFileSlice::setBaseFile); + Map newLogFiles = new HashMap<>(logFiles); + deltaLogFiles.entrySet().stream().filter(e -> !logFiles.containsKey(e.getKey())) + .forEach(p -> newLogFiles.put(p.getKey(), p.getValue())); + newLogFiles.values().forEach(newFileSlice::addLogFile); + LOG.info("Adding back new File Slice after add FS=" + newFileSlice); + return newFileSlice; + } + case REMOVE: { + LOG.info("Removing old File Slice =" + fs); + FileSlice newFileSlice = new FileSlice(oldSlice.getFileGroupId(), oldSlice.getBaseInstantTime()); + fs.getBaseFile().orElseGet(() -> { + oldSlice.getBaseFile().ifPresent(newFileSlice::setBaseFile); + return null; + }); - deltaLogFiles.keySet().stream().forEach(p -> logFiles.remove(p)); - // Add remaining log files back - logFiles.values().stream().forEach(lf -> newFileSlice.addLogFile(lf)); - if (newFileSlice.getBaseFile().isPresent() || (newFileSlice.getLogFiles().count() > 0)) { - LOG.info("Adding back new file-slice after remove FS=" + newFileSlice); - return newFileSlice; + deltaLogFiles.keySet().forEach(logFiles::remove); + // Add remaining log files back + logFiles.values().forEach(newFileSlice::addLogFile); + if (newFileSlice.getBaseFile().isPresent() || (newFileSlice.getLogFiles().count() > 0)) { + LOG.info("Adding back new file-slice after remove FS=" + newFileSlice); + return newFileSlice; + } + return null; + } + default: + throw new IllegalStateException("Unknown diff apply mode=" + mode); } - return null; } - default: - throw new IllegalStateException("Unknown diff apply mode=" + mode); - } - } - }).filter(Objects::nonNull).forEach(fs -> { - rocksDB.putInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForSliceView(fg, fs), fs); - fs.getBaseFile().ifPresent(df -> { - rocksDB.putInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForDataFileView(fg, fs), - df); - }); - }); - }); - }); + }).filter(Objects::nonNull).forEach(fs -> { + rocksDB.putInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForSliceView(fg, fs), fs); + fs.getBaseFile().ifPresent(df -> + rocksDB.putInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForDataFileView(fg, fs), df) + ); + }) + ) + ); } @Override @@ -330,7 +326,7 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste private FileSlice getFileSlice(String partitionPath, String fileId, String instantTime) { String key = schemaHelper.getKeyForSliceView(partitionPath, fileId, instantTime); - return rocksDB.get(schemaHelper.getColFamilyForView(), key); + return rocksDB.get(schemaHelper.getColFamilyForView(), key); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java index 3ada17e95..c806523b1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java @@ -106,8 +106,6 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView { @Override public Stream fetchAllStoredFileGroups() { - return ((ExternalSpillableMap) partitionToFileGroupsMap).valueStream().flatMap(fg -> { - return ((List) fg).stream(); - }); + return ((ExternalSpillableMap) partitionToFileGroupsMap).valueStream().flatMap(fg -> ((List) fg).stream()); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java index a856926db..196289463 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java @@ -42,7 +42,7 @@ import org.apache.avro.specific.SpecificRecordBase; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -58,7 +58,7 @@ public class AvroUtils { ImmutableMap.Builder> commitToStatBuilder = ImmutableMap.builder(); for (Map.Entry> commitToStat : commitToStats.entrySet()) { commitToStatBuilder.put(commitToStat.getKey(), - Arrays.asList(convertRollbackMetadata(startRestoreTime, durationInMs, commits, commitToStat.getValue()))); + Collections.singletonList(convertRollbackMetadata(startRestoreTime, durationInMs, commits, commitToStat.getValue()))); } return new HoodieRestoreMetadata(startRestoreTime, durationInMs.orElseGet(() -> -1L), commits, commitToStatBuilder.build(), DEFAULT_VERSION); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BufferedRandomAccessFile.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BufferedRandomAccessFile.java index 416208b8b..25fe7b080 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BufferedRandomAccessFile.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BufferedRandomAccessFile.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -59,7 +59,7 @@ import java.nio.ByteBuffer; public final class BufferedRandomAccessFile extends RandomAccessFile { private static final Logger LOG = Logger.getLogger(BufferedRandomAccessFile.class); static final int DEFAULT_BUFFER_SIZE = (1 << 16); // 64K buffer - static final int BUFFER_BOUNDARY_MASK = ~(DEFAULT_BUFFER_SIZE - 1); + static final int BUFFER_BOUNDARY_MASK = -DEFAULT_BUFFER_SIZE; private int capacity; private ByteBuffer dataBuffer; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java index 84b7ee4c5..1d32b6406 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java @@ -55,10 +55,9 @@ public class CleanerUtils { } } - HoodieCleanMetadata metadata = new HoodieCleanMetadata(startCleanTime, + return new HoodieCleanMetadata(startCleanTime, durationInMs.orElseGet(() -> -1L), totalDeleted, earliestCommitToRetain, partitionMetadataBuilder.build(), CLEAN_METADATA_VERSION_2); - return metadata; } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java index 40988b49d..c5ce52673 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java @@ -90,7 +90,7 @@ public class CompactionUtils { Option> extraMetadata, Option, Map>> metricsCaptureFunction) { HoodieCompactionPlan.Builder builder = HoodieCompactionPlan.newBuilder(); - extraMetadata.ifPresent(m -> builder.setExtraMetadata(m)); + extraMetadata.ifPresent(builder::setExtraMetadata); builder.setOperations(partitionFileSlicePairs.stream() .map(pfPair -> buildFromFileSlice(pfPair.getKey(), pfPair.getValue(), metricsCaptureFunction)) @@ -157,25 +157,23 @@ public class CompactionUtils { Map> fgIdToPendingCompactionWithInstantMap = new HashMap<>(); - pendingCompactionPlanWithInstants.stream().flatMap(instantPlanPair -> { - return getPendingCompactionOperations(instantPlanPair.getKey(), instantPlanPair.getValue()); - }).forEach(pair -> { - // Defensive check to ensure a single-fileId does not have more than one pending compaction with different - // file slices. If we find a full duplicate we assume it is caused by eventual nature of the move operation - // on some DFSs. - if (fgIdToPendingCompactionWithInstantMap.containsKey(pair.getKey())) { - HoodieCompactionOperation operation = pair.getValue().getValue(); - HoodieCompactionOperation anotherOperation = - fgIdToPendingCompactionWithInstantMap.get(pair.getKey()).getValue(); + pendingCompactionPlanWithInstants.stream().flatMap(instantPlanPair -> + getPendingCompactionOperations(instantPlanPair.getKey(), instantPlanPair.getValue())).forEach(pair -> { + // Defensive check to ensure a single-fileId does not have more than one pending compaction with different + // file slices. If we find a full duplicate we assume it is caused by eventual nature of the move operation + // on some DFSs. + if (fgIdToPendingCompactionWithInstantMap.containsKey(pair.getKey())) { + HoodieCompactionOperation operation = pair.getValue().getValue(); + HoodieCompactionOperation anotherOperation = fgIdToPendingCompactionWithInstantMap.get(pair.getKey()).getValue(); - if (!operation.equals(anotherOperation)) { - String msg = "Hudi File Id (" + pair.getKey() + ") has more than 1 pending compactions. Instants: " - + pair.getValue() + ", " + fgIdToPendingCompactionWithInstantMap.get(pair.getKey()); - throw new IllegalStateException(msg); - } - } - fgIdToPendingCompactionWithInstantMap.put(pair.getKey(), pair.getValue()); - }); + if (!operation.equals(anotherOperation)) { + String msg = "Hudi File Id (" + pair.getKey() + ") has more than 1 pending compactions. Instants: " + + pair.getValue() + ", " + fgIdToPendingCompactionWithInstantMap.get(pair.getKey()); + throw new IllegalStateException(msg); + } + } + fgIdToPendingCompactionWithInstantMap.put(pair.getKey(), pair.getValue()); + }); return fgIdToPendingCompactionWithInstantMap; } @@ -183,10 +181,8 @@ public class CompactionUtils { HoodieInstant instant, HoodieCompactionPlan compactionPlan) { List ops = compactionPlan.getOperations(); if (null != ops) { - return ops.stream().map(op -> { - return Pair.of(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()), - Pair.of(instant.getTimestamp(), op)); - }); + return ops.stream().map(op -> Pair.of(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()), + Pair.of(instant.getTimestamp(), op))); } else { return Stream.empty(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ConsistencyGuardConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ConsistencyGuardConfig.java index 152e3f7e7..8a5017af1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConsistencyGuardConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConsistencyGuardConfig.java @@ -78,12 +78,9 @@ public class ConsistencyGuardConfig extends DefaultHoodieConfig { private final Properties props = new Properties(); public Builder fromFile(File propertiesFile) throws IOException { - FileReader reader = new FileReader(propertiesFile); - try { + try (FileReader reader = new FileReader(propertiesFile)) { props.load(reader); return this; - } finally { - reader.close(); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java index d9161e512..43b0030e5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java @@ -70,12 +70,7 @@ public class FSUtils { private static final long MIN_ROLLBACK_TO_KEEP = 10; private static final String HOODIE_ENV_PROPS_PREFIX = "HOODIE_ENV_"; - private static final PathFilter ALLOW_ALL_FILTER = new PathFilter() { - @Override - public boolean accept(Path file) { - return true; - } - }; + private static final PathFilter ALLOW_ALL_FILTER = file -> true; public static Configuration prepareHadoopConf(Configuration conf) { conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); @@ -93,7 +88,7 @@ public class FSUtils { public static FileSystem getFs(String path, Configuration conf) { FileSystem fs; - conf = prepareHadoopConf(conf); + prepareHadoopConf(conf); try { fs = new Path(path).getFileSystem(conf); } catch (IOException e) { @@ -226,8 +221,7 @@ public class FSUtils { boolean excludeMetaFolder) throws IOException { PathFilter pathFilter = excludeMetaFolder ? getExcludeMetaPathFilter() : ALLOW_ALL_FILTER; FileStatus[] topLevelStatuses = fs.listStatus(new Path(basePathStr)); - for (int i = 0; i < topLevelStatuses.length; i++) { - FileStatus child = topLevelStatuses[i]; + for (FileStatus child : topLevelStatuses) { if (child.isFile()) { boolean success = consumer.apply(child); if (!success) { @@ -264,12 +258,7 @@ public class FSUtils { private static PathFilter getExcludeMetaPathFilter() { // Avoid listing and including any folders under the metafolder - return (path) -> { - if (path.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME)) { - return false; - } - return true; - }; + return (path) -> !path.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME); } public static String getInstantTime(String name) { @@ -396,17 +385,14 @@ public class FSUtils { public static boolean isLogFile(Path logPath) { Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName()); - if (!matcher.find()) { - return false; - } - return true; + return matcher.find(); } /** * Get the latest log file written from the list of log files passed in. */ public static Option getLatestLogFile(Stream logFiles) { - return Option.fromJavaOptional(logFiles.sorted(HoodieLogFile.getReverseLogFileComparator()).findFirst()); + return Option.fromJavaOptional(logFiles.min(HoodieLogFile.getReverseLogFileComparator())); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java index b4a099179..70ceed0f9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java @@ -81,7 +81,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard { public void waitForFilesVisibility(String dirPath, List files, FileVisibility event) throws TimeoutException { Path dir = new Path(dirPath); List filesWithoutSchemeAndAuthority = - files.stream().map(f -> Path.getPathWithoutSchemeAndAuthority(new Path(f))).map(p -> p.toString()) + files.stream().map(f -> Path.getPathWithoutSchemeAndAuthority(new Path(f))).map(Path::toString) .collect(Collectors.toList()); retryTillSuccess((retryNum) -> { @@ -89,7 +89,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard { LOG.info("Trying " + retryNum); FileStatus[] entries = fs.listStatus(dir); List gotFiles = Arrays.stream(entries).map(e -> Path.getPathWithoutSchemeAndAuthority(e.getPath())) - .map(p -> p.toString()).collect(Collectors.toList()); + .map(Path::toString).collect(Collectors.toList()); List candidateFiles = new ArrayList<>(filesWithoutSchemeAndAuthority); boolean altered = candidateFiles.removeAll(gotFiles); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java index d030ce810..02e48e3a4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java @@ -43,6 +43,7 @@ import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -84,7 +85,7 @@ public class HoodieAvroUtils { public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOException { BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, reuseDecoder.get()); reuseDecoder.set(decoder); - GenericDatumReader reader = new GenericDatumReader(schema); + GenericDatumReader reader = new GenericDatumReader<>(schema); return reader.read(null, decoder); } @@ -141,7 +142,7 @@ public class HoodieAvroUtils { Schema.Field recordKeyField = new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance()); Schema recordKeySchema = Schema.createRecord("HoodieRecordKey", "", "", false); - recordKeySchema.setFields(Arrays.asList(recordKeyField)); + recordKeySchema.setFields(Collections.singletonList(recordKeyField)); return recordKeySchema; } @@ -166,9 +167,8 @@ public class HoodieAvroUtils { * @param newFieldNames Null Field names to be added */ public static Schema appendNullSchemaFields(Schema schema, List newFieldNames) { - List newFields = schema.getFields().stream().map(field -> { - return new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()); - }).collect(Collectors.toList()); + List newFields = schema.getFields().stream() + .map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultValue())).collect(Collectors.toList()); for (String newField : newFieldNames) { newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "", NullNode.getInstance())); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/LogReaderUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/LogReaderUtils.java index 649396d36..52b30cbc2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/LogReaderUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/LogReaderUtils.java @@ -49,7 +49,7 @@ public class LogReaderUtils { HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); while (reader.hasPrev()) { HoodieLogBlock block = reader.prev(); - if (block instanceof HoodieAvroDataBlock && block != null) { + if (block instanceof HoodieAvroDataBlock) { HoodieAvroDataBlock lastBlock = (HoodieAvroDataBlock) block; if (completedTimeline .containsOrBeforeTimelineStarts(lastBlock.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME))) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java index b11ac6c1d..d39600044 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java @@ -132,7 +132,7 @@ public class ObjectSizeCalculator { private final Set alreadyVisited = Sets.newIdentityHashSet(); - private final Deque pending = new ArrayDeque(16 * 1024); + private final Deque pending = new ArrayDeque<>(16 * 1024); private long size; /** @@ -268,7 +268,7 @@ public class ObjectSizeCalculator { public ClassSizeInfo(Class clazz) { long fieldsSize = 0; - final List referenceFields = new LinkedList(); + final List referenceFields = new LinkedList<>(); for (Field f : clazz.getDeclaredFields()) { if (Modifier.isStatic(f.getModifiers())) { continue; @@ -303,9 +303,7 @@ public class ObjectSizeCalculator { try { calc.enqueue(f.get(obj)); } catch (IllegalAccessException e) { - final AssertionError ae = new AssertionError("Unexpected denial of access to " + f); - ae.initCause(e); - throw ae; + throw new AssertionError("Unexpected denial of access to " + f, e); } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java b/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java index 59af74bf2..0115ec2b9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java @@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.rocksdb.AbstractImmutableNativeReference; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; @@ -61,13 +62,11 @@ public class RocksDBDAO { private transient ConcurrentHashMap managedDescriptorMap; private transient RocksDB rocksDB; private boolean closed = false; - private final String basePath; private final String rocksDBBasePath; public RocksDBDAO(String basePath, String rocksDBBasePath) { - this.basePath = basePath; this.rocksDBBasePath = - String.format("%s/%s/%s", rocksDBBasePath, this.basePath.replace("/", "_"), UUID.randomUUID().toString()); + String.format("%s/%s/%s", rocksDBBasePath, basePath.replace("/", "_"), UUID.randomUUID().toString()); init(); } @@ -153,14 +152,11 @@ public class RocksDBDAO { * Perform a batch write operation. */ public void writeBatch(BatchHandler handler) { - WriteBatch batch = new WriteBatch(); - try { + try (WriteBatch batch = new WriteBatch()) { handler.apply(batch); getRocksDB().write(new WriteOptions(), batch); } catch (RocksDBException re) { throw new HoodieException(re); - } finally { - batch.close(); } } @@ -442,9 +438,7 @@ public class RocksDBDAO { public synchronized void close() { if (!closed) { closed = true; - managedHandlesMap.values().forEach(columnFamilyHandle -> { - columnFamilyHandle.close(); - }); + managedHandlesMap.values().forEach(AbstractImmutableNativeReference::close); managedHandlesMap.clear(); managedDescriptorMap.clear(); getRocksDB().close(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java index 1c17e8334..9096080bb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java @@ -41,7 +41,7 @@ public class SerializationUtils { // Caching kryo serializer to avoid creating kryo instance for every serde operation private static final ThreadLocal SERIALIZER_REF = - ThreadLocal.withInitial(() -> new KryoSerializerInstance()); + ThreadLocal.withInitial(KryoSerializerInstance::new); // Serialize // ----------------------------------------------------------------------- @@ -99,7 +99,7 @@ public class SerializationUtils { kryo.setRegistrationRequired(false); } - byte[] serialize(Object obj) throws IOException { + byte[] serialize(Object obj) { kryo.reset(); baos.reset(); Output output = new Output(baos); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBBasedMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBBasedMap.java index fd211e0f7..38dcaaaf3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBBasedMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBBasedMap.java @@ -84,11 +84,7 @@ public final class RocksDBBasedMap m) { - getRocksDBDAO().writeBatch(batch -> { - m.entrySet().forEach(entry -> { - getRocksDBDAO().putInBatch(batch, columnFamilyName, entry.getKey(), entry.getValue()); - }); - }); + getRocksDBDAO().writeBatch(batch -> m.forEach((key, value) -> getRocksDBDAO().putInBatch(batch, columnFamilyName, key, value))); } private RocksDBDAO getRocksDBDAO() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java index 2c5ce5d97..7ea6d5e1b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java @@ -73,7 +73,7 @@ public class BoundedInMemoryQueue implements Iterable { // it holds the root cause of the exception in case either queueing records (consuming from // inputIterator) fails or // thread reading records from queue fails. - private final AtomicReference hasFailed = new AtomicReference(null); + private final AtomicReference hasFailed = new AtomicReference<>(null); // used for indicating that all the records from queue are read successfully. private final AtomicBoolean isReadDone = new AtomicBoolean(false); // used for indicating that all records have been enqueued @@ -222,7 +222,7 @@ public class BoundedInMemoryQueue implements Iterable { /** * Puts an empty entry to queue to denote termination. */ - public void close() throws InterruptedException { + public void close() { // done queueing records notifying queue-reader. isWriteDone.set(true); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java index 6f27dbce6..71890b196 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java @@ -70,6 +70,7 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.Iterator; @@ -125,7 +126,7 @@ public class HoodieTestUtils { return new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()); } - public static final void createCommitFiles(String basePath, String... commitTimes) throws IOException { + public static void createCommitFiles(String basePath, String... commitTimes) throws IOException { for (String commitTime : commitTimes) { new File( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" @@ -139,11 +140,11 @@ public class HoodieTestUtils { } } - public static final void createMetadataFolder(String basePath) { + public static void createMetadataFolder(String basePath) { new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME).mkdirs(); } - public static final void createInflightCommitFiles(String basePath, String... commitTimes) throws IOException { + public static void createInflightCommitFiles(String basePath, String... commitTimes) throws IOException { for (String commitTime : commitTimes) { new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" @@ -153,7 +154,7 @@ public class HoodieTestUtils { } } - public static final void createPendingCleanFiles(HoodieTableMetaClient metaClient, String... commitTimes) { + public static void createPendingCleanFiles(HoodieTableMetaClient metaClient, String... commitTimes) { for (String commitTime : commitTimes) { Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(commitTime), HoodieTimeline.makeInflightCleanerFileName(commitTime)).forEach(f -> { @@ -180,19 +181,19 @@ public class HoodieTestUtils { } } - public static final String createNewDataFile(String basePath, String partitionPath, String commitTime) + public static String createNewDataFile(String basePath, String partitionPath, String commitTime) throws IOException { String fileID = UUID.randomUUID().toString(); return createDataFile(basePath, partitionPath, commitTime, fileID); } - public static final String createNewMarkerFile(String basePath, String partitionPath, String commitTime) + public static String createNewMarkerFile(String basePath, String partitionPath, String commitTime) throws IOException { String fileID = UUID.randomUUID().toString(); return createMarkerFile(basePath, partitionPath, commitTime, fileID); } - public static final String createDataFile(String basePath, String partitionPath, String commitTime, String fileID) + public static String createDataFile(String basePath, String partitionPath, String commitTime, String fileID) throws IOException { String folderPath = basePath + "/" + partitionPath + "/"; new File(folderPath).mkdirs(); @@ -200,7 +201,7 @@ public class HoodieTestUtils { return fileID; } - public static final String createMarkerFile(String basePath, String partitionPath, String commitTime, String fileID) + public static String createMarkerFile(String basePath, String partitionPath, String commitTime, String fileID) throws IOException { String folderPath = basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME + "/" + commitTime + "/" + partitionPath + "/"; @@ -210,7 +211,7 @@ public class HoodieTestUtils { return f.getAbsolutePath(); } - public static final String createNewLogFile(FileSystem fs, String basePath, String partitionPath, String commitTime, + public static String createNewLogFile(FileSystem fs, String basePath, String partitionPath, String commitTime, String fileID, Option version) throws IOException { String folderPath = basePath + "/" + partitionPath + "/"; boolean makeDir = fs.mkdirs(new Path(folderPath)); @@ -226,7 +227,7 @@ public class HoodieTestUtils { return fileID; } - public static final void createCompactionCommitFiles(FileSystem fs, String basePath, String... commitTimes) + public static void createCompactionCommitFiles(FileSystem fs, String basePath, String... commitTimes) throws IOException { for (String commitTime : commitTimes) { boolean createFile = fs.createNewFile(new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" @@ -237,7 +238,7 @@ public class HoodieTestUtils { } } - public static final void createCompactionRequest(HoodieTableMetaClient metaClient, String instant, + public static void createCompactionRequest(HoodieTableMetaClient metaClient, String instant, List> fileSliceList) throws IOException { HoodieCompactionPlan plan = CompactionUtils.buildFromFileSlices(fileSliceList, Option.empty(), Option.empty()); HoodieInstant compactionInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instant); @@ -245,47 +246,47 @@ public class HoodieTestUtils { AvroUtils.serializeCompactionPlan(plan)); } - public static final String getDataFilePath(String basePath, String partitionPath, String commitTime, String fileID) { + public static String getDataFilePath(String basePath, String partitionPath, String commitTime, String fileID) { return basePath + "/" + partitionPath + "/" + FSUtils.makeDataFileName(commitTime, DEFAULT_WRITE_TOKEN, fileID); } - public static final String getLogFilePath(String basePath, String partitionPath, String commitTime, String fileID, + public static String getLogFilePath(String basePath, String partitionPath, String commitTime, String fileID, Option version) { return basePath + "/" + partitionPath + "/" + FSUtils.makeLogFileName(fileID, ".log", commitTime, version.orElse(DEFAULT_LOG_VERSION), HoodieLogFormat.UNKNOWN_WRITE_TOKEN); } - public static final String getCommitFilePath(String basePath, String commitTime) { + public static String getCommitFilePath(String basePath, String commitTime) { return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitTime + HoodieTimeline.COMMIT_EXTENSION; } - public static final String getInflightCommitFilePath(String basePath, String commitTime) { + public static String getInflightCommitFilePath(String basePath, String commitTime) { return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitTime + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION; } - public static final String getRequestedCompactionFilePath(String basePath, String commitTime) { + public static String getRequestedCompactionFilePath(String basePath, String commitTime) { return basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + commitTime + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION; } - public static final boolean doesDataFileExist(String basePath, String partitionPath, String commitTime, + public static boolean doesDataFileExist(String basePath, String partitionPath, String commitTime, String fileID) { return new File(getDataFilePath(basePath, partitionPath, commitTime, fileID)).exists(); } - public static final boolean doesLogFileExist(String basePath, String partitionPath, String commitTime, String fileID, + public static boolean doesLogFileExist(String basePath, String partitionPath, String commitTime, String fileID, Option version) { return new File(getLogFilePath(basePath, partitionPath, commitTime, fileID, version)).exists(); } - public static final boolean doesCommitExist(String basePath, String commitTime) { + public static boolean doesCommitExist(String basePath, String commitTime) { return new File( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitTime + HoodieTimeline.COMMIT_EXTENSION) .exists(); } - public static final boolean doesInflightExist(String basePath, String commitTime) { + public static boolean doesInflightExist(String basePath, String commitTime) { return new File( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitTime + HoodieTimeline.INFLIGHT_EXTENSION) .exists(); @@ -298,19 +299,16 @@ public class HoodieTestUtils { Path commitFile = new Path( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCleanerFileName(commitTime)); FileSystem fs = FSUtils.getFs(basePath, configuration); - FSDataOutputStream os = fs.create(commitFile, true); - try { + try (FSDataOutputStream os = fs.create(commitFile, true)) { HoodieCleanStat cleanStats = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, DEFAULT_PARTITION_PATHS[rand.nextInt(DEFAULT_PARTITION_PATHS.length)], new ArrayList<>(), new ArrayList<>(), new ArrayList<>(), commitTime); // Create the clean metadata HoodieCleanMetadata cleanMetadata = - CleanerUtils.convertCleanMetadata(metaClient, commitTime, Option.of(0L), Arrays.asList(cleanStats)); + CleanerUtils.convertCleanMetadata(metaClient, commitTime, Option.of(0L), Collections.singletonList(cleanStats)); // Write empty clean metadata os.write(AvroUtils.serializeCleanMetadata(cleanMetadata).get()); - } finally { - os.close(); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java index ab1d95ed8..68646231e 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; -import com.google.common.collect.Lists; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; @@ -34,6 +33,7 @@ import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.util.Arrays; import java.util.stream.Collectors; import static org.junit.Assert.assertArrayEquals; @@ -120,7 +120,7 @@ public class TestHoodieTableMetaClient extends HoodieCommonTestHarness { HoodieInstant instant2 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "2"); HoodieInstant instant3 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "3"); - assertEquals(Lists.newArrayList(instant1, instant2, instant3), + assertEquals(Arrays.asList(instant1, instant2, instant3), archivedTimeline.getInstants().collect(Collectors.toList())); assertArrayEquals(new Text("data1").getBytes(), archivedTimeline.getInstantDetails(instant1).get()); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java index 652cca926..1b9667c1e 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java @@ -86,7 +86,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { private FileSystem fs; private Path partitionPath; private int bufferSize = 4096; - private Boolean readBlocksLazily = true; + private Boolean readBlocksLazily; public TestHoodieLogFormat(Boolean readBlocksLazily) { this.readBlocksLazily = readBlocksLazily; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java index fc05e2f01..2b8f04fff 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java @@ -209,7 +209,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness { * @return */ private Stream getAllRawFileSlices(String partitionPath) { - return fsView.getAllFileGroups(partitionPath).map(group -> group.getAllFileSlicesIncludingInflight()) + return fsView.getAllFileGroups(partitionPath).map(HoodieFileGroup::getAllFileSlicesIncludingInflight) .flatMap(sliceList -> sliceList); } @@ -220,7 +220,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness { * @return */ public Stream getLatestRawFileSlices(String partitionPath) { - return fsView.getAllFileGroups(partitionPath).map(fileGroup -> fileGroup.getLatestFileSlicesIncludingInflight()) + return fsView.getAllFileGroups(partitionPath).map(HoodieFileGroup::getLatestFileSlicesIncludingInflight) .filter(fileSliceOpt -> fileSliceOpt.isPresent()).map(Option::get); } @@ -275,7 +275,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness { partitionFileSlicesPairs.add(Pair.of(partitionPath, fileSlices.get(0))); HoodieCompactionPlan compactionPlan = CompactionUtils.buildFromFileSlices(partitionFileSlicesPairs, Option.empty(), Option.empty()); - HoodieInstant compactionInstant = null; + HoodieInstant compactionInstant; if (isCompactionInFlight) { // Create a Data-file but this should be skipped by view new File(basePath + "/" + partitionPath + "/" + compactDataFileName).createNewFile(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java index 7a4efccb6..e38625a3a 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java @@ -62,6 +62,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -115,13 +116,13 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { unscheduleCompaction(view, "14", "13", "11"); // Add one more delta instant - instantsToFiles.putAll(testMultipleWriteSteps(view, Arrays.asList("15"), true, "11")); + instantsToFiles.putAll(testMultipleWriteSteps(view, Collections.singletonList("15"), true, "11")); // Schedule Compaction again scheduleCompaction(view, "16"); // Run Compaction - This will be the second file-slice - testMultipleWriteSteps(view, Arrays.asList("16"), false, "16", 2); + testMultipleWriteSteps(view, Collections.singletonList("16"), false, "16", 2); // Run 2 more ingest instantsToFiles.putAll(testMultipleWriteSteps(view, Arrays.asList("17", "18"), true, "16", 2)); @@ -130,25 +131,25 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { scheduleCompaction(view, "19"); // Run one more ingestion after pending compaction. THis will be 3rd slice - instantsToFiles.putAll(testMultipleWriteSteps(view, Arrays.asList("20"), true, "19", 3)); + instantsToFiles.putAll(testMultipleWriteSteps(view, Collections.singletonList("20"), true, "19", 3)); // Clean first slice - testCleans(view, Arrays.asList("21"), + testCleans(view, Collections.singletonList("21"), new ImmutableMap.Builder>().put("11", Arrays.asList("12", "13", "15")).build(), - instantsToFiles, Arrays.asList("11")); + instantsToFiles, Collections.singletonList("11")); // Add one more ingestion instant. This should be 2nd slice now - instantsToFiles.putAll(testMultipleWriteSteps(view, Arrays.asList("22"), true, "19", 2)); + instantsToFiles.putAll(testMultipleWriteSteps(view, Collections.singletonList("22"), true, "19", 2)); // Restore last ingestion - testRestore(view, Arrays.asList("23"), true, new HashMap<>(), Arrays.asList("22"), "24", false); + testRestore(view, Collections.singletonList("23"), true, new HashMap<>(), Collections.singletonList("22"), "24", false); // Run one more ingestion. THis is still 2nd slice - instantsToFiles.putAll(testMultipleWriteSteps(view, Arrays.asList("24"), true, "19", 2)); + instantsToFiles.putAll(testMultipleWriteSteps(view, Collections.singletonList("24"), true, "19", 2)); // Finish Compaction - instantsToFiles.putAll(testMultipleWriteSteps(view, Arrays.asList("19"), false, "19", 2, - Arrays.asList(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "24")))); + instantsToFiles.putAll(testMultipleWriteSteps(view, Collections.singletonList("19"), false, "19", 2, + Collections.singletonList(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "24")))); } @Test @@ -198,13 +199,13 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { SyncableFileSystemView view1 = getFileSystemView(metaClient); view1.sync(); - Map> instantsToFiles = null; + Map> instantsToFiles; /** * Case where incremental syncing is catching up on more than one ingestion at a time */ // Run 1 ingestion on MOR table (1 delta commits). View1 is now sync up to this point - instantsToFiles = testMultipleWriteSteps(view1, Arrays.asList("11"), true, "11"); + instantsToFiles = testMultipleWriteSteps(view1, Collections.singletonList("11"), true, "11"); SyncableFileSystemView view2 = getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath())); @@ -213,7 +214,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("12", "13"), true, "11")); // Now Sync view1 and add 1 more ingestion. Check if view1 is able to catchup correctly - instantsToFiles.putAll(testMultipleWriteSteps(view1, Arrays.asList("14"), true, "11")); + instantsToFiles.putAll(testMultipleWriteSteps(view1, Collections.singletonList("14"), true, "11")); view2.sync(); SyncableFileSystemView view3 = @@ -238,8 +239,8 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { scheduleCompaction(view2, "16"); instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("17", "18"), true, "16", 2)); // Compaction - testMultipleWriteSteps(view2, Arrays.asList("16"), false, "16", 2, - Arrays.asList(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "18"))); + testMultipleWriteSteps(view2, Collections.singletonList("16"), false, "16", 2, + Collections.singletonList(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "18"))); view1.sync(); areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size() * 2); SyncableFileSystemView view5 = @@ -249,14 +250,14 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { /** * Case where a clean happened and then rounds of ingestion and compaction happened */ - testCleans(view2, Arrays.asList("19"), + testCleans(view2, Collections.singletonList("19"), new ImmutableMap.Builder>().put("11", Arrays.asList("12", "13", "14")).build(), - instantsToFiles, Arrays.asList("11")); + instantsToFiles, Collections.singletonList("11")); scheduleCompaction(view2, "20"); instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("21", "22"), true, "20", 2)); // Compaction - testMultipleWriteSteps(view2, Arrays.asList("20"), false, "20", 2, - Arrays.asList(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "22"))); + testMultipleWriteSteps(view2, Collections.singletonList("20"), false, "20", 2, + Collections.singletonList(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "22"))); // Run one more round of ingestion instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("23", "24"), true, "20", 2)); view1.sync(); @@ -268,14 +269,14 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { /** * Case where multiple restores and ingestions happened */ - testRestore(view2, Arrays.asList("25"), true, new HashMap<>(), Arrays.asList("24"), "29", true); - testRestore(view2, Arrays.asList("26"), true, new HashMap<>(), Arrays.asList("23"), "29", false); - instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("27"), true, "20", 2)); + testRestore(view2, Collections.singletonList("25"), true, new HashMap<>(), Collections.singletonList("24"), "29", true); + testRestore(view2, Collections.singletonList("26"), true, new HashMap<>(), Collections.singletonList("23"), "29", false); + instantsToFiles.putAll(testMultipleWriteSteps(view2, Collections.singletonList("27"), true, "20", 2)); scheduleCompaction(view2, "28"); - instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("29"), true, "28", 3)); + instantsToFiles.putAll(testMultipleWriteSteps(view2, Collections.singletonList("29"), true, "28", 3)); // Compaction - testMultipleWriteSteps(view2, Arrays.asList("28"), false, "28", 3, - Arrays.asList(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "29"))); + testMultipleWriteSteps(view2, Collections.singletonList("28"), false, "28", 3, + Collections.singletonList(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "29"))); Arrays.asList(view1, view2, view3, view4, view5, view6).forEach(v -> { v.sync(); @@ -371,8 +372,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { LOG.info("Last Instant is :" + view.getLastInstant().get()); if (isRestore) { Assert.assertEquals(newRestoreInstants.get(idx), view.getLastInstant().get().getTimestamp()); - Assert.assertEquals(isRestore ? HoodieTimeline.RESTORE_ACTION : HoodieTimeline.ROLLBACK_ACTION, - view.getLastInstant().get().getAction()); + Assert.assertEquals(HoodieTimeline.RESTORE_ACTION, view.getLastInstant().get().getAction()); } Assert.assertEquals(State.COMPLETED, view.getLastInstant().get().getState()); @@ -532,9 +532,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { view.sync(); Assert.assertEquals(newLastInstant, view.getLastInstant().get().getTimestamp()); - partitions.forEach(p -> view.getLatestFileSlices(p).forEach(fs -> { - Assert.assertEquals(newBaseInstant, fs.getBaseInstantTime()); - })); + partitions.forEach(p -> view.getLatestFileSlices(p).forEach(fs -> Assert.assertEquals(newBaseInstant, fs.getBaseInstantTime()))); } /** @@ -618,17 +616,11 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { final long expTotalFileSlicesPerPartition = fileIdsPerPartition.size() * multiple; partitions.forEach(p -> Assert.assertEquals(expTotalFileSlicesPerPartition, view.getAllFileSlices(p).count())); if (deltaCommit) { - partitions.forEach(p -> { - view.getLatestFileSlices(p).forEach(f -> { - Assert.assertEquals(baseInstantForDeltaCommit, f.getBaseInstantTime()); - }); - }); + partitions.forEach(p -> + view.getLatestFileSlices(p).forEach(f -> Assert.assertEquals(baseInstantForDeltaCommit, f.getBaseInstantTime())) + ); } else { - partitions.forEach(p -> { - view.getLatestBaseFiles(p).forEach(f -> { - Assert.assertEquals(instant, f.getCommitTime()); - }); - }); + partitions.forEach(p -> view.getLatestBaseFiles(p).forEach(f -> Assert.assertEquals(instant, f.getCommitTime()))); } metaClient.reloadActiveTimeline(); @@ -704,23 +696,21 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { private List addInstant(HoodieTableMetaClient metaClient, String instant, boolean deltaCommit, String baseInstant) throws IOException { - List> writeStats = partitions.stream().flatMap(p -> { - return fileIdsPerPartition.stream().map(f -> { - try { - File file = new File(basePath + "/" + p + "/" - + (deltaCommit - ? FSUtils.makeLogFileName(f, ".log", baseInstant, Integer.parseInt(instant), TEST_WRITE_TOKEN) - : FSUtils.makeDataFileName(instant, TEST_WRITE_TOKEN, f))); - file.createNewFile(); - HoodieWriteStat w = new HoodieWriteStat(); - w.setFileId(f); - w.setPath(String.format("%s/%s", p, file.getName())); - return Pair.of(p, w); - } catch (IOException e) { - throw new HoodieException(e); - } - }); - }).collect(Collectors.toList()); + List> writeStats = partitions.stream().flatMap(p -> fileIdsPerPartition.stream().map(f -> { + try { + File file = new File(basePath + "/" + p + "/" + + (deltaCommit + ? FSUtils.makeLogFileName(f, ".log", baseInstant, Integer.parseInt(instant), TEST_WRITE_TOKEN) + : FSUtils.makeDataFileName(instant, TEST_WRITE_TOKEN, f))); + file.createNewFile(); + HoodieWriteStat w = new HoodieWriteStat(); + w.setFileId(f); + w.setPath(String.format("%s/%s", p, file.getName())); + return Pair.of(p, w); + } catch (IOException e) { + throw new HoodieException(e); + } + })).collect(Collectors.toList()); HoodieCommitMetadata metadata = new HoodieCommitMetadata(); writeStats.forEach(e -> metadata.addWriteStat(e.getKey(), e.getValue())); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestFSUtils.java index b919d4236..3b010ca6b 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestFSUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestFSUtils.java @@ -229,8 +229,7 @@ public class TestFSUtils extends HoodieCommonTestHarness { String log1Ver1 = makeOldLogFileName("file1", ".log", "1", 1); String log1base2 = makeOldLogFileName("file1", ".log", "2", 0); List logFiles = Stream.of(log1base2, log1Ver1, log1Ver0).map(HoodieLogFile::new) - .collect(Collectors.toList()); - logFiles.sort(HoodieLogFile.getLogFileComparator()); + .sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); assertEquals(log1Ver0, logFiles.get(0).getFileName()); assertEquals(log1Ver1, logFiles.get(1).getFileName()); assertEquals(log1base2, logFiles.get(2).getFileName()); @@ -250,8 +249,7 @@ public class TestFSUtils extends HoodieCommonTestHarness { List logFiles = Stream.of(log1Ver1W1, log1base2W0, log1base2W1, log1Ver1W0, log1Ver0W1, log1Ver0W0) - .map(HoodieLogFile::new).collect(Collectors.toList()); - logFiles.sort(HoodieLogFile.getLogFileComparator()); + .map(HoodieLogFile::new).sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); assertEquals(log1Ver0W0, logFiles.get(0).getFileName()); assertEquals(log1Ver0W1, logFiles.get(1).getFileName()); assertEquals(log1Ver1W0, logFiles.get(2).getFileName()); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java index ba8e04213..f65e4f11e 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java @@ -21,6 +21,7 @@ package org.apache.hudi.hadoop; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -77,8 +78,7 @@ public class HoodieHiveUtil { return (!matcher.find() ? null : matcher.group(1)); } return null; - }).filter(s -> s != null) - .collect(Collectors.toList()); + }).filter(Objects::nonNull).collect(Collectors.toList()); if (result == null) { // Returns an empty list instead of null. result = new ArrayList<>(); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java index e7eb99014..eaf28fc0f 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java @@ -20,6 +20,7 @@ package org.apache.hudi.hadoop; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -48,6 +49,7 @@ import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieIOException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -93,9 +95,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement if (nonHoodiePaths.size() > 0) { setInputPaths(job, nonHoodiePaths.toArray(new Path[nonHoodiePaths.size()])); FileStatus[] fileStatuses = super.listStatus(job); - for (FileStatus fileStatus: fileStatuses) { - returns.add(fileStatus); - } + returns.addAll(Arrays.asList(fileStatuses)); } // process snapshot queries next. @@ -133,8 +133,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement .getInstants().collect(Collectors.toList()); // Extract partitions touched by the commitsToCheck Set partitionsToList = new HashSet<>(); - for (int i = 0; i < commitsToCheck.size(); i++) { - HoodieInstant commit = commitsToCheck.get(i); + for (HoodieInstant commit : commitsToCheck) { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class); partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet()); @@ -171,15 +170,14 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement return false; }) .collect(Collectors.joining(",")); - if (incrementalInputPaths == null || incrementalInputPaths.isEmpty()) { + if (StringUtils.isNullOrEmpty(incrementalInputPaths)) { return null; } // Mutate the JobConf to set the input paths to only partitions touched by incremental pull. setInputPaths(job, incrementalInputPaths); FileStatus[] fileStatuses = super.listStatus(job); - BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient, timeline, - fileStatuses); - List commitsList = commitsToCheck.stream().map(s -> s.getTimestamp()).collect(Collectors.toList()); + BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient, timeline, fileStatuses); + List commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()); List filteredFiles = roView.getLatestBaseFilesInRange(commitsList).collect(Collectors.toList()); List returns = new ArrayList<>(); for (HoodieBaseFile filteredFile : filteredFiles) { @@ -200,7 +198,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement * @throws IOException */ private Map> groupFileStatusForSnapshotPaths( - FileStatus[] fileStatuses, Collection metaClientList) throws IOException { + FileStatus[] fileStatuses, Collection metaClientList) { // This assumes the paths for different tables are grouped together Map> grouped = new HashMap<>(); HoodieTableMetaClient metadata = null; @@ -231,8 +229,8 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement * Filters data files for a snapshot queried table. */ private List filterFileStatusForSnapshotMode( - HoodieTableMetaClient metadata, List fileStatuses) throws IOException { - FileStatus[] statuses = fileStatuses.toArray(new FileStatus[fileStatuses.size()]); + HoodieTableMetaClient metadata, List fileStatuses) { + FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]); if (LOG.isDebugEnabled()) { LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata); } @@ -258,7 +256,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement * super.listStatus() and gets back a FileStatus[] 2. Then it creates the HoodieTableMetaClient for the paths listed. * 3. Generation of splits looks at FileStatus size to create splits, which skips this file */ - private HoodieBaseFile checkFileStatus(HoodieBaseFile dataFile) throws IOException { + private HoodieBaseFile checkFileStatus(HoodieBaseFile dataFile) { Path dataPath = dataFile.getFileStatus().getPath(); try { if (dataFile.getFileSize() == 0) { diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java index cd1cea32e..506b6cfe2 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java @@ -65,11 +65,12 @@ import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -115,7 +116,7 @@ public class HoodieCombineHiveInputFormat call() throws Exception { - Set nonCombinablePathIndices = new HashSet(); + Set nonCombinablePathIndices = new HashSet<>(); for (int i = 0; i < length; i++) { PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo, paths[i + start], IOPrepareCache.get().allocatePartitionDescMap()); @@ -356,25 +357,21 @@ public class HoodieCombineHiveInputFormat result = new ArrayList(); + ArrayList result = new ArrayList<>(); // combine splits only from same tables and same partitions. Do not combine splits from multiple // tables or multiple partitions. Path[] paths = StringInternUtils.internUriStringsInPathArray(combine.getInputPathsShim(job)); - List inpDirs = new ArrayList(); - List inpFiles = new ArrayList(); - Map poolMap = new HashMap(); - Set poolSet = new HashSet(); + List inpDirs = new ArrayList<>(); + List inpFiles = new ArrayList<>(); + Map poolMap = new HashMap<>(); + Set poolSet = new HashSet<>(); for (Path path : paths) { PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo, path, @@ -414,8 +411,8 @@ public class HoodieCombineHiveInputFormat> opList = null; + CombineFilter f; + List> opList; if (!mrwork.isMapperCannotSpanPartns()) { // if mapper can span partitions, make sure a splits does not contain multiple @@ -441,7 +438,7 @@ public class HoodieCombineHiveInputFormat iss = new ArrayList(); + List iss = new ArrayList<>(); if (!mrwork.isMapperCannotSpanPartns()) { // mapper can span partitions // combine into as few as one split, subject to the PathFilters set @@ -496,14 +493,14 @@ public class HoodieCombineHiveInputFormat>> futureList = new ArrayList>>(numThreads); + List>> futureList = new ArrayList<>(numThreads); try { for (int i = 0; i < numThreads; i++) { int start = i * numPathPerThread; int length = i != numThreads - 1 ? numPathPerThread : paths.length - start; futureList.add(executor.submit(new CheckNonCombinablePathCallable(paths, start, length, job))); } - Set nonCombinablePathIndices = new HashSet(); + Set nonCombinablePathIndices = new HashSet<>(); for (Future> future : futureList) { nonCombinablePathIndices.addAll(future.get()); } @@ -522,12 +519,12 @@ public class HoodieCombineHiveInputFormat result = new ArrayList(); + List result = new ArrayList<>(); Path[] paths = getInputPaths(job); - List nonCombinablePaths = new ArrayList(paths.length / 2); - List combinablePaths = new ArrayList(paths.length / 2); + List nonCombinablePaths = new ArrayList<>(paths.length / 2); + List combinablePaths = new ArrayList<>(paths.length / 2); int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM, (int) Math.ceil((double) paths.length / DEFAULT_NUM_PATH_PER_THREAD)); @@ -561,22 +558,18 @@ public class HoodieCombineHiveInputFormat 0) { - FileInputFormat.setInputPaths(job, nonCombinablePaths.toArray(new Path[nonCombinablePaths.size()])); + FileInputFormat.setInputPaths(job, nonCombinablePaths.toArray(new Path[0])); InputSplit[] splits = super.getSplits(job, numSplits); - for (InputSplit split : splits) { - result.add(split); - } + Collections.addAll(result, splits); } // Process the combine splits if (combinablePaths.size() > 0) { - FileInputFormat.setInputPaths(job, combinablePaths.toArray(new Path[combinablePaths.size()])); + FileInputFormat.setInputPaths(job, combinablePaths.toArray(new Path[0])); Map pathToPartitionInfo = this.pathToPartitionInfo != null ? this.pathToPartitionInfo : Utilities.getMapWork(job).getPathToPartitionInfo(); InputSplit[] splits = getCombineSplits(job, numSplits, pathToPartitionInfo); - for (InputSplit split : splits) { - result.add(split); - } + Collections.addAll(result, splits); } // Restore the old path information back @@ -634,8 +627,8 @@ public class HoodieCombineHiveInputFormat sampleSplits(List splits) { HashMap nameToSamples = mrwork.getNameToSplitSample(); - List retLists = new ArrayList(); - Map> aliasToSplitList = new HashMap>(); + List retLists = new ArrayList<>(); + Map> aliasToSplitList = new HashMap<>(); Map> pathToAliases = mrwork.getPathToAliases(); Map> pathToAliasesNoScheme = removeScheme(pathToAliases); @@ -651,7 +644,7 @@ public class HoodieCombineHiveInputFormat()); + aliasToSplitList.put(alias, new ArrayList<>()); } aliasToSplitList.get(alias).add(split); } else { @@ -727,7 +720,7 @@ public class HoodieCombineHiveInputFormat pStrings = new HashSet(); + private final Set pStrings = new HashSet<>(); // store a path prefix in this TestFilter // PRECONDITION: p should always be a directory @@ -764,7 +757,7 @@ public class HoodieCombineHiveInputFormat(Arrays.asList(input.listStatus(new JobConf(job.getConfiguration())))); + result = new ArrayList<>(Arrays.asList(input.listStatus(new JobConf(job.getConfiguration())))); } else { result = super.listStatus(job); } - Iterator it = result.iterator(); - - while (it.hasNext()) { - FileStatus stat = (FileStatus) it.next(); - if (!stat.isFile()) { - it.remove(); - } - } + result.removeIf(stat -> !stat.isFile()); return result; } @@ -870,12 +856,12 @@ public class HoodieCombineHiveInputFormat inputSplitShims = new ArrayList<>(); - for (int pos = 0; pos < splits.length; ++pos) { - CombineFileSplit split = (CombineFileSplit) splits[pos]; + for (InputSplit inputSplit : splits) { + CombineFileSplit split = (CombineFileSplit) inputSplit; if (split.getPaths().length > 0) { - inputSplitShims.add(new HadoopShimsSecure.InputSplitShim(job, split.getPaths(), split.getStartOffsets(), + inputSplitShims.add(new InputSplitShim(job, split.getPaths(), split.getStartOffsets(), split.getLengths(), split.getLocations())); } } @@ -884,7 +870,7 @@ public class HoodieCombineHiveInputFormat {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229} - Set fieldOrdersSet = new LinkedHashSet<>(); String[] fieldOrdersWithDups = fieldOrderCsv.split(","); - for (String fieldOrder : fieldOrdersWithDups) { - fieldOrdersSet.add(fieldOrder); - } - String[] fieldOrders = fieldOrdersSet.toArray(new String[fieldOrdersSet.size()]); + Set fieldOrdersSet = new LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups)); + String[] fieldOrders = fieldOrdersSet.toArray(new String[0]); List fieldNames = Arrays.stream(fieldNameCsv.split(",")) .filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList()); - Set fieldNamesSet = new LinkedHashSet<>(); - for (String fieldName : fieldNames) { - fieldNamesSet.add(fieldName); - } + Set fieldNamesSet = new LinkedHashSet<>(fieldNames); // Hive does not provide ids for partitioning fields, so check for lengths excluding that. if (fieldNamesSet.size() != fieldOrders.length) { throw new HoodieException(String @@ -189,7 +183,7 @@ public abstract class AbstractRealtimeRecordReader { fieldNames.size(), fieldOrders.length)); } TreeMap orderedFieldMap = new TreeMap<>(); - String[] fieldNamesArray = fieldNamesSet.toArray(new String[fieldNamesSet.size()]); + String[] fieldNamesArray = fieldNamesSet.toArray(new String[0]); for (int ox = 0; ox < fieldOrders.length; ox++) { orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNamesArray[ox]); } @@ -402,7 +396,7 @@ public abstract class AbstractRealtimeRecordReader { public long getMaxCompactionMemoryInBytes() { // jobConf.getMemoryForMapTask() returns in MB return (long) Math - .ceil(Double.valueOf(jobConf.get(COMPACTION_MEMORY_FRACTION_PROP, DEFAULT_COMPACTION_MEMORY_FRACTION)) + .ceil(Double.parseDouble(jobConf.get(COMPACTION_MEMORY_FRACTION_PROP, DEFAULT_COMPACTION_MEMORY_FRACTION)) * jobConf.getMemoryForMapTask() * 1024 * 1024L); } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java index 35d969e8d..bc717a91e 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java @@ -108,7 +108,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i // for all unique split parents, obtain all delta files based on delta commit timeline, // grouped on file id List rtSplits = new ArrayList<>(); - partitionsToParquetSplits.keySet().stream().forEach(partitionPath -> { + partitionsToParquetSplits.keySet().forEach(partitionPath -> { // for each partition path obtain the data & log file groupings, then map back to inputsplits HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath); HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline()); @@ -149,7 +149,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i } }); LOG.info("Returning a total splits of " + rtSplits.size()); - return rtSplits.toArray(new InputSplit[rtSplits.size()]); + return rtSplits.toArray(new InputSplit[0]); } @Override diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java index cb8606e18..c4b79cb10 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java @@ -48,7 +48,7 @@ public class HoodieRealtimeRecordReader implements RecordReader { // convert Hoodie log record to Hadoop AvroWritable and buffer GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema()).get(); @@ -93,7 +93,6 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader /** * Setup log and parquet reading in parallel. Both write to central buffer. */ - @SuppressWarnings("unchecked") private List> getParallelProducers() { List> producers = new ArrayList<>(); producers.add(new FunctionBasedQueueProducer<>(buffer -> { @@ -105,7 +104,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader } @Override - public boolean next(NullWritable key, ArrayWritable value) throws IOException { + public boolean next(NullWritable key, ArrayWritable value) { if (!iterator.hasNext()) { return false; } @@ -125,7 +124,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader } @Override - public long getPos() throws IOException { + public long getPos() { // TODO: vb - No logical way to represent parallel stream pos in a single long. // Should we just return invalid (-1). Where is it used ? return 0; diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java index 405549b03..559a573d5 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java @@ -33,12 +33,12 @@ import org.apache.parquet.avro.AvroParquetWriter; import org.junit.rules.TemporaryFolder; import java.io.File; -import java.io.FilenameFilter; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.UUID; public class InputFormatTestUtil { @@ -59,13 +59,10 @@ public class InputFormatTestUtil { public static void simulateUpdates(File directory, final String originalCommit, int numberOfFilesUpdated, String newCommit, boolean randomize) throws IOException { - List dataFiles = Arrays.asList(directory.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - String commitTs = FSUtils.getCommitTime(name); - return originalCommit.equals(commitTs); - } - })); + List dataFiles = Arrays.asList(Objects.requireNonNull(directory.listFiles((dir, name) -> { + String commitTs = FSUtils.getCommitTime(name); + return originalCommit.equals(commitTs); + }))); if (randomize) { Collections.shuffle(dataFiles); } @@ -183,16 +180,10 @@ public class InputFormatTestUtil { public static void simulateParquetUpdates(File directory, Schema schema, String originalCommit, int totalNumberOfRecords, int numberOfRecordsToUpdate, String newCommit) throws IOException { - File fileToUpdate = directory.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.endsWith("parquet"); - } - })[0]; + File fileToUpdate = Objects.requireNonNull(directory.listFiles((dir, name) -> name.endsWith("parquet")))[0]; String fileId = FSUtils.getFileId(fileToUpdate.getName()); File dataFile = new File(directory, FSUtils.makeDataFileName(newCommit, TEST_WRITE_TOKEN, fileId)); - AvroParquetWriter parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()), schema); - try { + try (AvroParquetWriter parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()), schema)) { for (GenericRecord record : generateAvroRecords(schema, totalNumberOfRecords, originalCommit, fileId)) { if (numberOfRecordsToUpdate > 0) { // update this record @@ -203,8 +194,6 @@ public class InputFormatTestUtil { } parquetWriter.write(record); } - } finally { - parquetWriter.close(); } } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputPathHandlerTest.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputPathHandlerTest.java index 307c18203..4dfaea911 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputPathHandlerTest.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputPathHandlerTest.java @@ -157,7 +157,7 @@ public class InputPathHandlerTest { @Test public void testInputPathHandler() throws IOException { inputPathHandler = new InputPathHandler(dfs.getConf(), inputPaths.toArray( - new Path[inputPaths.size()]), incrementalTables); + new Path[0]), incrementalTables); List actualPaths = inputPathHandler.getGroupedIncrementalPaths().values().stream() .flatMap(List::stream).collect(Collectors.toList()); assertTrue(actualComparesToExpected(actualPaths, incrementalPaths)); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java index ed501e700..0f15f4568 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java @@ -122,7 +122,7 @@ public class TestHoodieParquetInputFormat { throws IOException { List writeStats = HoodieTestUtils.generateFakeHoodieWriteStat(1); HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); - writeStats.stream().forEach(stat -> commitMetadata.addWriteStat(partitionPath, stat)); + writeStats.forEach(stat -> commitMetadata.addWriteStat(partitionPath, stat)); File file = new File(basePath.getRoot().toString() + "/.hoodie/", commitNumber + ".commit"); file.createNewFile(); FileOutputStream fileOutputStream = new FileOutputStream(file); @@ -221,8 +221,8 @@ public class TestHoodieParquetInputFormat { String defaultmode = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips"); conf.set(defaultmode, HoodieHiveUtil.DEFAULT_SCAN_MODE); List actualincrTables = HoodieHiveUtil.getIncrementalTableNames(Job.getInstance(conf)); - for (int i = 0; i < expectedincrTables.length; i++) { - assertTrue(actualincrTables.contains(expectedincrTables[i])); + for (String expectedincrTable : expectedincrTables) { + assertTrue(actualincrTables.contains(expectedincrTable)); } } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index aaadebead..89b71684b 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -68,7 +68,7 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -167,9 +167,9 @@ public class TestHoodieRealtimeRecordReader { private void setHiveColumnNameProps(List fields, JobConf jobConf, boolean isPartitioned) { String names = fields.stream().map(Field::name).collect(Collectors.joining(",")); - String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); + String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names); - jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions); + jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions); String hiveOrderedColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase(PARTITION_COLUMN)) .map(Field::name).collect(Collectors.joining(",")); @@ -286,7 +286,7 @@ public class TestHoodieRealtimeRecordReader { String logFilePath = writer.getLogFile().getPath().toString(); HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + commitTime + ".parquet"), 0, 1, jobConf), - basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime); + basePath.getRoot().getPath(), Collections.singletonList(logFilePath), newCommitTime); // create a RecordReader to be used by HoodieRealtimeRecordReader RecordReader reader = new MapredParquetInputFormat().getRecordReader( @@ -361,7 +361,7 @@ public class TestHoodieRealtimeRecordReader { String logFilePath = writer.getLogFile().getPath().toString(); HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + commitTime + ".parquet"), 0, 1, jobConf), - basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime); + basePath.getRoot().getPath(), Collections.singletonList(logFilePath), newCommitTime); // create a RecordReader to be used by HoodieRealtimeRecordReader RecordReader reader = new MapredParquetInputFormat().getRecordReader( diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 8684f3bd9..bc976b0ea 100644 --- a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -80,7 +80,7 @@ public class HiveSyncTool { } } - public void syncHoodieTable() throws ClassNotFoundException { + public void syncHoodieTable() { try { switch (hoodieHiveClient.getTableType()) { case COPY_ON_WRITE: @@ -193,7 +193,7 @@ public class HiveSyncTool { .collect(Collectors.toList()); } - public static void main(String[] args) throws Exception { + public static void main(String[] args) { // parse the params final HiveSyncConfig cfg = new HiveSyncConfig(); JCommander cmd = new JCommander(cfg, null, args); diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java index 4578bb2fa..208173828 100644 --- a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java +++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java @@ -18,7 +18,6 @@ package org.apache.hudi.hive; -import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieLogFile; @@ -41,6 +40,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; @@ -65,7 +65,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -190,7 +189,7 @@ public class HoodieHiveClient { for (int i = 0; i < syncConfig.partitionFields.size(); i++) { partBuilder.add("`" + syncConfig.partitionFields.get(i) + "`='" + partitionValues.get(i) + "'"); } - return partBuilder.stream().collect(Collectors.joining(",")); + return String.join(",", partBuilder); } private List constructChangePartitions(String tableName, List partitions) { @@ -500,7 +499,7 @@ public class HoodieHiveClient { * @param sql SQL statement to execute */ public CommandProcessorResponse updateHiveSQLUsingHiveDriver(String sql) throws HoodieHiveSyncException { - List responses = updateHiveSQLs(Arrays.asList(sql)); + List responses = updateHiveSQLs(Collections.singletonList(sql)); return responses.get(responses.size() - 1); } diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java index 39ac69484..4a227c6c9 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java @@ -87,7 +87,7 @@ public abstract class ITTestBase { cmd.add("hive.stats.autogather=false"); cmd.add("-e"); cmd.add("\"" + fullCommand + "\""); - return cmd.stream().toArray(String[]::new); + return cmd.toArray(new String[0]); } private static String getHiveConsoleCommandFile(String commandFile, String additionalVar) { diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java index af8bb1697..f7aa67ea2 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -43,7 +43,7 @@ import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -127,7 +127,7 @@ public class DataSourceUtils { } public static void checkRequiredProperties(TypedProperties props, List checkPropNames) { - checkPropNames.stream().forEach(prop -> { + checkPropNames.forEach(prop -> { if (!props.containsKey(prop)) { throw new HoodieNotSupportedException("Required property " + prop + " is missing"); } @@ -182,19 +182,13 @@ public class DataSourceUtils { @SuppressWarnings("unchecked") public static JavaRDD dropDuplicates(JavaSparkContext jssc, JavaRDD incomingHoodieRecords, HoodieWriteConfig writeConfig, Option timelineService) { - HoodieReadClient client = null; - try { - client = new HoodieReadClient<>(jssc, writeConfig, timelineService); + try (HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig, timelineService)) { return client.tagLocation(incomingHoodieRecords) .filter(r -> !((HoodieRecord) r).isCurrentLocationKnown()); } catch (TableNotFoundException e) { // this will be executed when there is no hoodie table yet // so no dups to drop return incomingHoodieRecords; - } finally { - if (null != client) { - client.close(); - } } } @@ -207,12 +201,12 @@ public class DataSourceUtils { } public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath) { - checkRequiredProperties(props, Arrays.asList(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY())); + checkRequiredProperties(props, Collections.singletonList(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY())); HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(); hiveSyncConfig.basePath = basePath; hiveSyncConfig.usePreApacheInputFormat = props.getBoolean(DataSourceWriteOptions.HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY(), - Boolean.valueOf(DataSourceWriteOptions.DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL())); + Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL())); hiveSyncConfig.databaseName = props.getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), DataSourceWriteOptions.DEFAULT_HIVE_DATABASE_OPT_VAL()); hiveSyncConfig.tableName = props.getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY()); diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index 16c2d7548..decab9c64 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -52,7 +52,7 @@ object AvroConversionUtils { } def createRddForDeletes(df: DataFrame, rowField: String, partitionField: String): RDD[HoodieKey] = { - df.rdd.map(row => (new HoodieKey(row.getAs[String](rowField), row.getAs[String](partitionField)))) + df.rdd.map(row => new HoodieKey(row.getAs[String](rowField), row.getAs[String](partitionField))) } def createDataFrame(rdd: RDD[GenericRecord], schemaStr: String, ss: SparkSession): Dataset[Row] = { @@ -67,7 +67,7 @@ object AvroConversionUtils { val convertor = AvroConversionHelper.createConverterToRow(schema, dataType) records.map { x => convertor(x).asInstanceOf[Row] } } - }, convertAvroSchemaToStructType(new Schema.Parser().parse(schemaStr))).asInstanceOf[Dataset[Row]] + }, convertAvroSchemaToStructType(new Schema.Parser().parse(schemaStr))) } } diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala index 23a757db9..931f23bc2 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -63,7 +63,7 @@ class DefaultSource extends RelationProvider sqlContext.sparkContext.hadoopConfiguration.setClass( "mapreduce.input.pathFilter.class", classOf[HoodieROTablePathFilter], - classOf[org.apache.hadoop.fs.PathFilter]); + classOf[org.apache.hadoop.fs.PathFilter]) log.info("Constructing hoodie (as parquet) data source with options :" + parameters) log.warn("Snapshot view not supported yet via data source, for MERGE_ON_READ tables. " + diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index d4d48f960..598e5cd29 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -77,7 +77,7 @@ private[hudi] object HoodieSparkSqlWriter { val jsc = new JavaSparkContext(sparkContext) val basePath = new Path(parameters("path")) - val commitTime = HoodieActiveTimeline.createNewInstantTime(); + val commitTime = HoodieActiveTimeline.createNewInstantTime() val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) @@ -282,7 +282,7 @@ private[hudi] object HoodieSparkSqlWriter { client.close() commitSuccess && syncHiveSucess } else { - log.error(s"$operation failed with ${errorCount} errors :"); + log.error(s"$operation failed with $errorCount errors :") if (log.isTraceEnabled) { log.trace("Printing out the top 100 errors") writeStatuses.rdd.filter(ws => ws.hasErrors) diff --git a/hudi-spark/src/test/scala/TestDataSourceDefaults.scala b/hudi-spark/src/test/scala/TestDataSourceDefaults.scala index 31a04d691..5a307d3ba 100644 --- a/hudi-spark/src/test/scala/TestDataSourceDefaults.scala +++ b/hudi-spark/src/test/scala/TestDataSourceDefaults.scala @@ -31,7 +31,7 @@ import org.scalatest.junit.AssertionsForJUnit class TestDataSourceDefaults extends AssertionsForJUnit { val schema = SchemaTestUtil.getComplexEvolvedSchema - var baseRecord: GenericRecord = null + var baseRecord: GenericRecord = _ @Before def initialize(): Unit = { baseRecord = SchemaTestUtil @@ -60,10 +60,9 @@ class TestDataSourceDefaults extends AssertionsForJUnit { new SimpleKeyGenerator(props).getKey(baseRecord) fail("Should have errored out") } catch { - case e: IllegalArgumentException => { + case e: IllegalArgumentException => // do nothing - } - }; + } // recordkey field not specified try { @@ -72,10 +71,9 @@ class TestDataSourceDefaults extends AssertionsForJUnit { new SimpleKeyGenerator(props).getKey(baseRecord) fail("Should have errored out") } catch { - case e: IllegalArgumentException => { + case e: IllegalArgumentException => // do nothing - } - }; + } // nested field as record key and partition path val hk2 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.isAdmin", "false")) @@ -89,14 +87,13 @@ class TestDataSourceDefaults extends AssertionsForJUnit { .getKey(baseRecord) fail("Should have errored out") } catch { - case e: HoodieException => { + case e: HoodieException => // do nothing - } - }; + } // if partition path can't be found, return default partition path val hk3 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere", "false")) - .getKey(baseRecord); + .getKey(baseRecord) assertEquals("default", hk3.getPartitionPath) // if enable hive style partitioning @@ -155,10 +152,9 @@ class TestDataSourceDefaults extends AssertionsForJUnit { new ComplexKeyGenerator(props).getKey(baseRecord) fail("Should have errored out") } catch { - case e: IllegalArgumentException => { + case e: IllegalArgumentException => // do nothing - } - }; + } // recordkey field not specified try { @@ -167,10 +163,9 @@ class TestDataSourceDefaults extends AssertionsForJUnit { new ComplexKeyGenerator(props).getKey(baseRecord) fail("Should have errored out") } catch { - case e: IllegalArgumentException => { + case e: IllegalArgumentException => // do nothing - } - }; + } // nested field as record key and partition path val hk2 = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId,testNestedRecord.isAdmin", "testNestedRecord.userId,testNestedRecord.isAdmin", "false")) @@ -184,14 +179,13 @@ class TestDataSourceDefaults extends AssertionsForJUnit { .getKey(baseRecord) fail("Should have errored out") } catch { - case e: HoodieException => { + case e: HoodieException => // do nothing - } - }; + } // if partition path can't be found, return default partition path val hk3 = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere", "false")) - .getKey(baseRecord); + .getKey(baseRecord) assertEquals("default", hk3.getPartitionPath) // if enable hive style partitioning @@ -269,10 +263,9 @@ class TestDataSourceDefaults extends AssertionsForJUnit { new GlobalDeleteKeyGenerator(props).getKey(baseRecord) fail("Should have errored out") } catch { - case e: IllegalArgumentException => { + case e: IllegalArgumentException => // do nothing - } - }; + } // Nested record key not found try { @@ -280,10 +273,9 @@ class TestDataSourceDefaults extends AssertionsForJUnit { .getKey(baseRecord) fail("Should have errored out") } catch { - case e: HoodieException => { + case e: HoodieException => // do nothing - } - }; + } // if all parts of the composite record key are null/empty, throw error try { diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java index a25b129ad..a807e05f5 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java @@ -350,8 +350,7 @@ public class FileSystemViewHandler { } finally { long endTs = System.currentTimeMillis(); long timeTakenMillis = endTs - beginTs; - LOG - .info(String.format( + LOG.info(String.format( "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], " + "Success=%s, Query=%s, Host=%s, synced=%s", timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, finalCheckTimeTaken, success, diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java index 856e68247..2cfc91454 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java @@ -56,6 +56,7 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Properties; @@ -81,7 +82,7 @@ public class HDFSParquetImporter implements Serializable { this.cfg = cfg; } - public static void main(String[] args) throws Exception { + public static void main(String[] args) { final Config cfg = new Config(); JCommander cmd = new JCommander(cfg, null, args); if (cfg.help || args.length == 0) { @@ -160,8 +161,7 @@ public class HDFSParquetImporter implements Serializable { AvroReadSupport.setAvroReadSchema(jsc.hadoopConfiguration(), (new Schema.Parser().parse(schemaStr))); ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class)); - return jsc - .newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class, + return jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class, job.getConfiguration()) // To reduce large number of tasks. .coalesce(16 * cfg.parallelism).map(entry -> { @@ -198,7 +198,7 @@ public class HDFSParquetImporter implements Serializable { * @param Type */ protected JavaRDD load(HoodieWriteClient client, String instantTime, - JavaRDD> hoodieRecords) throws Exception { + JavaRDD> hoodieRecords) { switch (cfg.command.toLowerCase()) { case "upsert": { return client.upsert(hoodieRecords, instantTime); @@ -227,7 +227,7 @@ public class HDFSParquetImporter implements Serializable { public static class FormatValidator implements IValueValidator { - List validFormats = Arrays.asList("parquet"); + List validFormats = Collections.singletonList("parquet"); @Override public void validate(String name, String value) throws ParameterException { @@ -241,7 +241,7 @@ public class HDFSParquetImporter implements Serializable { public static class Config implements Serializable { @Parameter(names = {"--command", "-c"}, description = "Write command Valid values are insert(default)/upsert/bulkinsert", - required = false, validateValueWith = CommandValidator.class) + validateValueWith = CommandValidator.class) public String command = "INSERT"; @Parameter(names = {"--src-path", "-sp"}, description = "Base path for the input table", required = true) public String srcPath = null; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java index b08517c4e..7d356a554 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java @@ -62,7 +62,6 @@ import java.util.stream.Collectors; public class HiveIncrementalPuller { private static final Logger LOG = LogManager.getLogger(HiveIncrementalPuller.class); - private static String driverName = "org.apache.hive.jdbc.HiveDriver"; public static class Config implements Serializable { @@ -97,6 +96,7 @@ public class HiveIncrementalPuller { } static { + String driverName = "org.apache.hive.jdbc.HiveDriver"; try { Class.forName(driverName); } catch (ClassNotFoundException e) { @@ -219,8 +219,7 @@ public class HiveIncrementalPuller { // Set the from commit time executeStatement("set hoodie." + config.sourceTable + ".consume.start.timestamp=" + config.fromCommitTime, stmt); // Set number of commits to pull - executeStatement("set hoodie." + config.sourceTable + ".consume.max.commits=" + String.valueOf(config.maxCommits), - stmt); + executeStatement("set hoodie." + config.sourceTable + ".consume.max.commits=" + config.maxCommits, stmt); } private boolean deleteHDFSPath(FileSystem fs, String path) throws IOException { @@ -233,14 +232,14 @@ public class HiveIncrementalPuller { stmt.execute(sql); } - private String inferCommitTime(FileSystem fs) throws SQLException, IOException { + private String inferCommitTime(FileSystem fs) throws IOException { LOG.info("FromCommitTime not specified. Trying to infer it from Hoodie table " + config.targetDb + "." + config.targetTable); String targetDataLocation = getTableLocation(config.targetDb, config.targetTable); return scanForCommitTime(fs, targetDataLocation); } - private String getTableLocation(String db, String table) throws SQLException { + private String getTableLocation(String db, String table) { ResultSet resultSet = null; Statement stmt = null; try { @@ -309,7 +308,7 @@ public class HiveIncrementalPuller { return FileSystem.mkdirs(fs, targetBaseDirPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); } - private String getLastCommitTimePulled(FileSystem fs, String sourceTableLocation) throws IOException { + private String getLastCommitTimePulled(FileSystem fs, String sourceTableLocation) { HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), sourceTableLocation); List commitsToSync = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants() .findInstantsAfter(config.fromCommitTime, config.maxCommits).getInstants().map(HoodieInstant::getTimestamp) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java index 13b712552..11f44e143 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java @@ -31,7 +31,6 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; -import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -45,11 +44,6 @@ public class HoodieCleaner { */ private final Config cfg; - /** - * Filesystem used. - */ - private transient FileSystem fs; - /** * Spark context. */ @@ -60,22 +54,25 @@ public class HoodieCleaner { */ private TypedProperties props; - public HoodieCleaner(Config cfg, JavaSparkContext jssc) throws IOException { + public HoodieCleaner(Config cfg, JavaSparkContext jssc) { this.cfg = cfg; this.jssc = jssc; - this.fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration()); + /* + * Filesystem used. + */ + FileSystem fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration()); this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); LOG.info("Creating Cleaner with configs : " + props.toString()); } - public void run() throws Exception { + public void run() { HoodieWriteConfig hoodieCfg = getHoodieClientConfig(); HoodieWriteClient client = new HoodieWriteClient<>(jssc, hoodieCfg, false); client.clean(); } - private HoodieWriteConfig getHoodieClientConfig() throws Exception { + private HoodieWriteConfig getHoodieClientConfig() { return HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.basePath).withAutoCommit(false) .withProps(props).build(); } @@ -101,7 +98,7 @@ public class HoodieCleaner { public Boolean help = false; } - public static void main(String[] args) throws Exception { + public static void main(String[] args) { final Config cfg = new Config(); JCommander cmd = new JCommander(cfg, null, args); if (cfg.help || args.length == 0) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java index 87e6cecdd..3634362fb 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java @@ -60,8 +60,7 @@ public class HoodieCompactionAdminTool { */ public void run(JavaSparkContext jsc) throws Exception { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath); - final CompactionAdminClient admin = new CompactionAdminClient(jsc, cfg.basePath); - try { + try (CompactionAdminClient admin = new CompactionAdminClient(jsc, cfg.basePath)) { final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration()); if (cfg.outputPath != null && fs.exists(new Path(cfg.outputPath))) { throw new IllegalStateException("Output File Path already exists"); @@ -101,8 +100,6 @@ public class HoodieCompactionAdminTool { default: throw new IllegalStateException("Not yet implemented !!"); } - } finally { - admin.close(); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java index 1547403b4..60cabe57d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java @@ -83,7 +83,7 @@ public class HoodieCompactor { public List configs = new ArrayList<>(); } - public static void main(String[] args) throws Exception { + public static void main(String[] args) { final Config cfg = new Config(); JCommander cmd = new JCommander(cfg, null, args); if (cfg.help || args.length == 0) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java index f89aa78ea..feb2c219a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java @@ -42,30 +42,28 @@ public class HoodieWithTimelineServer implements Serializable { private final Config cfg; - private transient Javalin app = null; - public HoodieWithTimelineServer(Config cfg) { this.cfg = cfg; } public static class Config implements Serializable { - @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false) + @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master") public String sparkMaster = null; @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true) public String sparkMemory = null; - @Parameter(names = {"--num-partitions", "-n"}, description = "Num Partitions", required = false) + @Parameter(names = {"--num-partitions", "-n"}, description = "Num Partitions") public Integer numPartitions = 100; - @Parameter(names = {"--server-port", "-p"}, description = " Server Port", required = false) + @Parameter(names = {"--server-port", "-p"}, description = " Server Port") public Integer serverPort = 26754; - @Parameter(names = {"--delay-secs", "-d"}, description = "Delay(sec) before client connects", required = false) + @Parameter(names = {"--delay-secs", "-d"}, description = "Delay(sec) before client connects") public Integer delaySecs = 30; @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; } public void startService() { - app = Javalin.create().start(cfg.serverPort); + Javalin app = Javalin.create().start(cfg.serverPort); app.get("/", ctx -> ctx.result("Hello World")); } @@ -107,7 +105,7 @@ public class HoodieWithTimelineServer implements Serializable { System.out.println("Response Code from(" + url + ") : " + response.getStatusLine().getStatusCode()); try (BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity().getContent()))) { - StringBuffer result = new StringBuffer(); + StringBuilder result = new StringBuilder(); String line; while ((line = rd.readLine()) != null) { result.append(line); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 4e0c86cf3..3fe00ca0c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -167,9 +167,8 @@ public class UtilHelpers { sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK"); - additionalConfigs.entrySet().forEach(e -> sparkConf.set(e.getKey(), e.getValue())); - SparkConf newSparkConf = HoodieWriteClient.registerClasses(sparkConf); - return newSparkConf; + additionalConfigs.forEach(sparkConf::set); + return HoodieWriteClient.registerClasses(sparkConf); } public static JavaSparkContext buildSparkContext(String appName, String defaultMaster, Map configs) { @@ -200,7 +199,7 @@ public class UtilHelpers { * @param parallelism Parallelism */ public static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr, - int parallelism, Option compactionStrategyClass, TypedProperties properties) throws Exception { + int parallelism, Option compactionStrategyClass, TypedProperties properties) { HoodieCompactionConfig compactionConfig = compactionStrategyClass .map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false) .withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build()) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java index eb3212fa0..f67b62c79 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java @@ -50,7 +50,7 @@ public class Compactor implements Serializable { public void compact(HoodieInstant instant) throws IOException { LOG.info("Compactor executing compaction " + instant); JavaRDD res = compactionClient.compact(instant.getTimestamp()); - long numWriteErrors = res.collect().stream().filter(r -> r.hasErrors()).count(); + long numWriteErrors = res.collect().stream().filter(WriteStatus::hasErrors).count(); if (numWriteErrors != 0) { // We treat even a single error in compaction as fatal LOG.error("Compaction for instant (" + instant + ") failed with write errors. Errors :" + numWriteErrors); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 2dd4138cf..e608a6414 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -376,8 +376,8 @@ public class DeltaSync implements Serializable { throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation); } - long totalErrorRecords = writeStatusRDD.mapToDouble(ws -> ws.getTotalErrorRecords()).sum().longValue(); - long totalRecords = writeStatusRDD.mapToDouble(ws -> ws.getTotalRecords()).sum().longValue(); + long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue(); + long totalRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue(); boolean hasErrors = totalErrorRecords > 0; long hiveSyncTimeMs = 0; if (!hasErrors || cfg.commitOnErrors) { @@ -414,10 +414,10 @@ public class DeltaSync implements Serializable { } else { LOG.error("Delta Sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords); LOG.error("Printing out the top 100 errors"); - writeStatusRDD.filter(ws -> ws.hasErrors()).take(100).forEach(ws -> { + writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> { LOG.error("Global error :", ws.getGlobalError()); if (ws.getErrors().size() > 0) { - ws.getErrors().entrySet().forEach(r -> LOG.trace("Error for key:" + r.getKey() + " is " + r.getValue())); + ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value)); } }); // Rolling back instant @@ -456,7 +456,7 @@ public class DeltaSync implements Serializable { /** * Sync to Hive. */ - private void syncHive() throws ClassNotFoundException { + private void syncHive() { if (cfg.enableHiveSync) { HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath); LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :" diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index b8ff404c8..9c05f315c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -64,7 +64,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; import java.util.stream.IntStream; /** @@ -440,7 +439,7 @@ public class HoodieDeltaStreamer implements Serializable { HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, true); List pending = CompactionUtils.getPendingCompactionInstantTimes(meta); - pending.stream().forEach(hoodieInstant -> asyncCompactService.enqueuePendingCompaction(hoodieInstant)); + pending.forEach(hoodieInstant -> asyncCompactService.enqueuePendingCompaction(hoodieInstant)); asyncCompactService.start((error) -> { // Shutdown DeltaSync shutdown(false); @@ -554,29 +553,27 @@ public class HoodieDeltaStreamer implements Serializable { @Override protected Pair startService() { ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction); - List> compactionFutures = - IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> { - try { - // Set Compactor Pool Name for allowing users to prioritize compaction - LOG.info("Setting Spark Pool name for compaction to " + SchedulerConfGenerator.COMPACT_POOL_NAME); - jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.COMPACT_POOL_NAME); + return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> { + try { + // Set Compactor Pool Name for allowing users to prioritize compaction + LOG.info("Setting Spark Pool name for compaction to " + SchedulerConfGenerator.COMPACT_POOL_NAME); + jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.COMPACT_POOL_NAME); - while (!isShutdownRequested()) { - final HoodieInstant instant = fetchNextCompactionInstant(); - if (null != instant) { - compactor.compact(instant); - } - } - LOG.info("Compactor shutting down properly!!"); - } catch (InterruptedException ie) { - LOG.warn("Compactor executor thread got interrupted exception. Stopping", ie); - } catch (IOException e) { - LOG.error("Compactor executor failed", e); - throw new HoodieIOException(e.getMessage(), e); + while (!isShutdownRequested()) { + final HoodieInstant instant = fetchNextCompactionInstant(); + if (null != instant) { + compactor.compact(instant); } - return true; - }, executor)).collect(Collectors.toList()); - return Pair.of(CompletableFuture.allOf(compactionFutures.stream().toArray(CompletableFuture[]::new)), executor); + } + LOG.info("Compactor shutting down properly!!"); + } catch (InterruptedException ie) { + LOG.warn("Compactor executor thread got interrupted exception. Stopping", ie); + } catch (IOException e) { + LOG.error("Compactor executor failed", e); + throw new HoodieIOException(e.getMessage(), e); + } + return true; + }, executor)).toArray(CompletableFuture[]::new)), executor); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java index 19f8e10ab..a054b277a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java @@ -25,8 +25,8 @@ import com.codahale.metrics.Timer; public class HoodieDeltaStreamerMetrics { - private HoodieWriteConfig config = null; - private String tableName = null; + private HoodieWriteConfig config; + private String tableName; public String overallTimerName = null; public String hiveSyncTimerName = null; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java index c0059ad45..e98b86763 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java @@ -46,7 +46,7 @@ public class SchedulerConfGenerator { public static final String SPARK_SCHEDULER_MODE_KEY = "spark.scheduler.mode"; public static final String SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file"; - private static String SPARK_SCHEDULING_PATTERN = + private static final String SPARK_SCHEDULING_PATTERN = "\n\n \n" + " %s\n %s\n %s\n" + " \n \n %s\n" diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java index a31683b63..deb26b57d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java @@ -36,7 +36,7 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; -import java.util.Arrays; +import java.util.Collections; public class HoodieIncrSource extends RowSource { @@ -87,9 +87,9 @@ public class HoodieIncrSource extends RowSource { @Override public Pair>, String> fetchNextBatch(Option lastCkptStr, long sourceLimit) { - DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.HOODIE_SRC_BASE_PATH)); + DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.HOODIE_SRC_BASE_PATH)); - /** + /* * DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.HOODIE_SRC_BASE_PATH, * Config.HOODIE_SRC_PARTITION_FIELDS)); List partitionFields = * props.getStringList(Config.HOODIE_SRC_PARTITION_FIELDS, ",", new ArrayList<>()); PartitionValueExtractor @@ -121,7 +121,7 @@ public class HoodieIncrSource extends RowSource { Dataset source = reader.load(srcPath); - /** + /* * log.info("Partition Fields are : (" + partitionFields + "). Initial Source Schema :" + source.schema()); * * StructType newSchema = new StructType(source.schema().fields()); for (String field : partitionFields) { newSchema diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java index 9dd2c6a65..988dbd822 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java @@ -25,7 +25,6 @@ import com.twitter.bijection.avro.GenericAvroCodecs; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import java.io.IOException; import java.io.Serializable; /** @@ -80,7 +79,7 @@ public class AvroConvertor implements Serializable { } } - public GenericRecord fromJson(String json) throws IOException { + public GenericRecord fromJson(String json) { initSchema(); initJsonConvertor(); return jsonConverter.convert(json, schema); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestTimestampBasedKeyGenerator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestTimestampBasedKeyGenerator.java index cb0c82244..b358bd6f5 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestTimestampBasedKeyGenerator.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestTimestampBasedKeyGenerator.java @@ -33,13 +33,12 @@ import java.io.IOException; import static org.junit.Assert.assertEquals; public class TestTimestampBasedKeyGenerator { - private Schema schema; private GenericRecord baseRecord; private TypedProperties properties = new TypedProperties(); @Before public void initialize() throws IOException { - schema = SchemaTestUtil.getTimestampEvolvedSchema(); + Schema schema = SchemaTestUtil.getTimestampEvolvedSchema(); baseRecord = SchemaTestUtil .generateAvroRecordFromJson(schema, 1, "001", "f1");