From 69dfcda116584c59983f682253998f2c5b0dce6e Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 3 Feb 2022 14:01:41 -0800 Subject: [PATCH] [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s (#4556) --- .../table/TestHoodieMergeOnReadTable.java | 7 +- ...HoodieSparkMergeOnReadTableCompaction.java | 12 +- ...arkMergeOnReadTableInsertUpdateDelete.java | 8 +- ...stHoodieSparkMergeOnReadTableRollback.java | 93 +++++--- .../main/java/org/apache/hudi/TypeUtils.java | 35 +++ .../hudi/HoodieTableFileIndexBase.scala | 28 ++- .../hudi/hadoop/BaseFileWithLogsSplit.java | 14 +- .../hudi/hadoop/BootstrapBaseFileSplit.java | 4 + .../hadoop/HoodieFileInputFormatBase.java | 214 ++++++++++++------ .../hudi/hadoop/HoodieHFileInputFormat.java | 2 +- .../hudi/hadoop/HoodieParquetInputFormat.java | 2 +- .../apache/hudi/hadoop/InputPathHandler.java | 4 +- .../hudi/hadoop/PathWithLogFilePath.java | 12 +- .../hudi/hadoop/RealtimeFileStatus.java | 2 +- .../HoodieHFileRealtimeInputFormat.java | 7 +- .../HoodieParquetRealtimeInputFormat.java | 35 +-- .../realtime/HoodieRealtimeFileSplit.java | 7 +- .../RealtimeBootstrapBaseFileSplit.java | 28 ++- .../hudi/hadoop/realtime/RealtimeSplit.java | 12 +- .../hadoop/utils/HoodieInputFormatUtils.java | 20 +- .../utils/HoodieRealtimeInputFormatUtils.java | 142 +++++++++--- .../TestHoodieCombineHiveInputFormat.java | 2 +- 22 files changed, 493 insertions(+), 197 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/TypeUtils.java diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index ac77126fb..fb484f4be 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -18,6 +18,7 @@ package org.apache.hudi.table; +import org.apache.hadoop.fs.Path; import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; @@ -189,8 +190,10 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness assertTrue(fileIdToNewSize.entrySet().stream().anyMatch(entry -> fileIdToSize.get(entry.getKey()) < entry.getValue())); - List dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); - List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, + List inputPaths = roView.getLatestBaseFiles() + .map(baseFile -> new Path(baseFile.getPath()).getParent().toString()) + .collect(Collectors.toList()); + List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths, basePath(), new JobConf(hadoopConf()), true, false); // Wrote 20 records in 2 batches assertEquals(40, recordsRead.size(), "Must contain 40 records"); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java index 13903bf54..f4f47d375 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java +++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java @@ -22,6 +22,7 @@ package org.apache.hudi.table.functional; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; @@ -113,13 +114,14 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti JavaRDD records = jsc().parallelize(dataGen.generateInserts(instant, numRecords), 2); metaClient = HoodieTableMetaClient.reload(metaClient); client.startCommitWithTime(instant); - List writeStatues = client.upsert(records, instant).collect(); - org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatues); + List writeStatuses = client.upsert(records, instant).collect(); + org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses); if (doCommit) { - Assertions.assertTrue(client.commitStats(instant, writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()), - Option.empty(), metaClient.getCommitActionType())); + List writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()); + boolean committed = client.commitStats(instant, writeStats, Option.empty(), metaClient.getCommitActionType()); + Assertions.assertTrue(committed); } metaClient = HoodieTableMetaClient.reload(metaClient); - return writeStatues; + return writeStatuses; } } \ No newline at end of file diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java index 62ce00749..dac32b7bb 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java @@ -19,6 +19,7 @@ package org.apache.hudi.table.functional; +import org.apache.hadoop.fs.Path; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.FileSlice; @@ -213,8 +214,11 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie dataFilesToRead = tableView.getLatestBaseFiles(); assertTrue(dataFilesToRead.findAny().isPresent()); - List dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); - List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, basePath(), new JobConf(hadoopConf()), true, false); + List inputPaths = tableView.getLatestBaseFiles() + .map(baseFile -> new Path(baseFile.getPath()).getParent().toString()) + .collect(Collectors.toList()); + List recordsRead = + HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths, basePath(), new JobConf(hadoopConf()), true, false); // Wrote 20 records and deleted 20 records, so remaining 20-20 = 0 assertEquals(0, recordsRead.size(), "Must contain 0 records"); } diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java index 3b5ce0fa1..8097741dd 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java @@ -24,6 +24,7 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieRecord; @@ -40,6 +41,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; @@ -64,6 +66,7 @@ import java.io.IOException; import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -172,10 +175,12 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction JavaRDD writeRecords = jsc().parallelize(records, 1); JavaRDD writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime); - client.commit(newCommitTime, writeStatusJavaRDD); + List statuses = writeStatusJavaRDD.collect(); assertNoWriteErrors(statuses); + client.commit(newCommitTime, jsc().parallelize(statuses)); + HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); @@ -208,8 +213,10 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords); copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200)); - List dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); - List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, + List inputPaths = tableView.getLatestBaseFiles() + .map(baseFile -> new Path(baseFile.getPath()).getParent().toString()) + .collect(Collectors.toList()); + List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths, basePath()); assertEquals(200, recordsRead.size()); @@ -225,8 +232,10 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction .contains(commitTime1)).map(fileStatus -> fileStatus.getPath().toString()).collect(Collectors.toList()); assertEquals(0, remainingFiles.size(), "There files should have been rolled-back " + "when rolling back commit " + commitTime1 + " but are still remaining. 
Files: " + remainingFiles); - dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); - recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, basePath()); + inputPaths = tableView.getLatestBaseFiles() + .map(baseFile -> new Path(baseFile.getPath()).getParent().toString()) + .collect(Collectors.toList()); + recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths, basePath()); assertEquals(200, recordsRead.size()); } @@ -241,8 +250,10 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction copyOfRecords = dataGen.generateUpdates(commitTime2, copyOfRecords); copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200)); - List dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); - List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, + List inputPaths = tableView.getLatestBaseFiles() + .map(baseFile -> new Path(baseFile.getPath()).getParent().toString()) + .collect(Collectors.toList()); + List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths, basePath()); assertEquals(200, recordsRead.size()); @@ -262,8 +273,10 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction metaClient = HoodieTableMetaClient.reload(metaClient); hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); - dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); - recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, basePath()); + inputPaths = tableView.getLatestBaseFiles() + .map(baseFile -> new Path(baseFile.getPath()).getParent().toString()) + .collect(Collectors.toList()); + recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths, basePath()); // check that the number of records read is still correct after rollback operation assertEquals(200, recordsRead.size()); @@ -275,11 +288,13 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction thirdClient.startCommitWithTime(newCommitTime); writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime); + statuses = writeStatusJavaRDD.collect(); - thirdClient.commit(newCommitTime, writeStatusJavaRDD); // Verify there are no errors assertNoWriteErrors(statuses); + thirdClient.commit(newCommitTime, jsc().parallelize(statuses)); + metaClient = HoodieTableMetaClient.reload(metaClient); String compactionInstantTime = thirdClient.scheduleCompaction(Option.empty()).get().toString(); @@ -317,8 +332,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties); try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { - HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + /* * Write 1 (only inserts) */ @@ -329,20 +344,29 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction JavaRDD writeRecords = jsc().parallelize(records, 1); JavaRDD writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime); - client.commit(newCommitTime, writeStatusJavaRDD); + List statuses = writeStatusJavaRDD.collect(); assertNoWriteErrors(statuses); + + 
client.commit(newCommitTime, jsc().parallelize(statuses)); client.close(); - HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); + Option> instantCommitMetadataPairOpt = + metaClient.getActiveTimeline().getLastCommitMetadataWithValidData(); - Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); - assertTrue(deltaCommit.isPresent()); - assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001"); + assertTrue(instantCommitMetadataPairOpt.isPresent()); + + HoodieInstant commitInstant = instantCommitMetadataPairOpt.get().getKey(); + + assertEquals("001", commitInstant.getTimestamp()); + assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, commitInstant.getAction()); + assertEquals(200, getTotalRecordsWritten(instantCommitMetadataPairOpt.get().getValue())); Option commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); assertFalse(commit.isPresent()); + HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); + FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); Stream dataFilesToRead = tableView.getLatestBaseFiles(); @@ -352,6 +376,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction dataFilesToRead = tableView.getLatestBaseFiles(); assertTrue(dataFilesToRead.findAny().isPresent(), "Should list the base files we wrote in the delta commit"); + /* * Write 2 (inserts + updates) */ @@ -368,7 +393,9 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords); copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200)); - List dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList()); + List dataFiles = tableView.getLatestBaseFiles() + .map(baseFile -> new Path(baseFile.getPath()).getParent().toString()) + .collect(Collectors.toList()); List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, basePath()); assertEquals(200, recordsRead.size()); @@ -376,7 +403,9 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction statuses = nClient.upsert(jsc().parallelize(copyOfRecords, 1), newCommitTime).collect(); // Verify there are no errors assertNoWriteErrors(statuses); - nClient.commit(newCommitTime, writeStatusJavaRDD); + + nClient.commit(newCommitTime, jsc().parallelize(statuses)); + copyOfRecords.clear(); } @@ -393,11 +422,12 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction writeRecords = jsc().parallelize(records, 1); writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime); - client.commit(newCommitTime, writeStatusJavaRDD); statuses = writeStatusJavaRDD.collect(); // Verify there are no errors assertNoWriteErrors(statuses); + client.commit(newCommitTime, jsc().parallelize(statuses)); + metaClient = HoodieTableMetaClient.reload(metaClient); String compactionInstantTime = "004"; @@ -414,11 +444,12 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction writeRecords = jsc().parallelize(records, 1); writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime); - client.commit(newCommitTime, writeStatusJavaRDD); statuses = writeStatusJavaRDD.collect(); // Verify there are no errors assertNoWriteErrors(statuses); + 
client.commit(newCommitTime, jsc().parallelize(statuses)); + metaClient = HoodieTableMetaClient.reload(metaClient); compactionInstantTime = "006"; @@ -447,7 +478,9 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction statuses = client.upsert(jsc().parallelize(copyOfRecords, 1), newCommitTime).collect(); // Verify there are no errors assertNoWriteErrors(statuses); - client.commit(newCommitTime, writeStatusJavaRDD); + + client.commit(newCommitTime, jsc().parallelize(statuses)); + copyOfRecords.clear(); // Rollback latest commit first @@ -471,6 +504,13 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction } } + private long getTotalRecordsWritten(HoodieCommitMetadata commitMetadata) { + return commitMetadata.getPartitionToWriteStats().values().stream() + .flatMap(Collection::stream) + .map(stat -> stat.getNumWrites() + stat.getNumUpdateWrites()) + .reduce(0L, Long::sum); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void testMORTableRestore(boolean restoreAfterCompaction) throws Exception { @@ -523,8 +563,6 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction JavaRDD writeRecords = jsc().parallelize(records, 1); JavaRDD writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime); client.commit(newCommitTime, writeStatusJavaRDD); - List statuses = writeStatusJavaRDD.collect(); - assertNoWriteErrors(statuses); return records; } @@ -541,8 +579,10 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); - List dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList()); - List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, + List inputPaths = tableView.getLatestBaseFiles() + .map(hf -> new Path(hf.getPath()).getParent().toString()) + .collect(Collectors.toList()); + List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths, basePath()); assertRecords(expectedRecords, recordsRead); } @@ -603,9 +643,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction List records = dataGen.generateInserts(newCommitTime, 100); JavaRDD recordsRDD = jsc().parallelize(records, 1); - JavaRDD statuses = writeClient.insert(recordsRDD, newCommitTime); // trigger an action - List writeStatuses = statuses.collect(); + List writeStatuses = ((JavaRDD) writeClient.insert(recordsRDD, newCommitTime)).collect(); // Ensure that inserts are written to only log files assertEquals(0, diff --git a/hudi-common/src/main/java/org/apache/hudi/TypeUtils.java b/hudi-common/src/main/java/org/apache/hudi/TypeUtils.java new file mode 100644 index 000000000..6e7d2c874 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/TypeUtils.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi; + +public class TypeUtils { + + /** + * This utility abstracts unsafe type-casting in a way that allows to + *
<ul>
+ *   <li>Search for such type-casts more easily (just searching for usages of this method)</li>
+ *   <li>Avoid type-cast warnings from the compiler</li>
+ * </ul>
+ */ + @SuppressWarnings("unchecked") + public static T unsafeCast(Object o) { + return (T) o; + } + +} diff --git a/hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala b/hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala index f25c7d99d..f46b79f38 100644 --- a/hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala +++ b/hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala @@ -24,11 +24,13 @@ import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType} import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView} import scala.collection.JavaConverters._ import scala.collection.JavaConversions._ import scala.collection.mutable +import scala.language.implicitConversions /** * Common (engine-agnostic) File Index implementation enabling individual query engines to @@ -87,6 +89,12 @@ abstract class HoodieTableFileIndexBase(engineContext: HoodieEngineContext, refresh0() + /** + * Returns latest completed instant as seen by this instance of the file-index + */ + def latestCompletedInstant(): Option[HoodieInstant] = + getActiveTimeline.filterCompletedInstants().lastInstant() + /** * Fetch list of latest base files and log files per partition. * @@ -171,11 +179,17 @@ abstract class HoodieTableFileIndexBase(engineContext: HoodieEngineContext, } private def getActiveTimeline = { - val timeline = metaClient.getActiveTimeline.getCommitsTimeline + // NOTE: We have to use commits and compactions timeline, to make sure that we're properly + // handling the following case: when records are inserted into the new log-file w/in the file-group + // that is under the pending compaction process, new log-file will bear the compaction's instant (on the + // timeline) in its name, as opposed to the base-file's commit instant. To make sure we're not filtering + // such log-file we have to _always_ include pending compaction instants into consideration + // TODO(HUDI-3302) re-evaluate whether we should not filter any commits in here + val timeline = metaClient.getCommitsAndCompactionTimeline if (shouldIncludePendingCommits) { timeline } else { - timeline.filterCompletedInstants() + timeline.filterCompletedAndCompactionInstants() } } @@ -291,6 +305,16 @@ abstract class HoodieTableFileIndexBase(engineContext: HoodieEngineContext, } } } + + /** + * Converts Hudi's internal representation of the {@code Option} into Scala's default one + */ + implicit def asScalaOption[T](opt: org.apache.hudi.common.util.Option[T]): Option[T] = + if (opt.isPresent) { + Some(opt.get) + } else { + None + } } trait FileStatusCacheTrait { diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java index c9afa9119..d0b168f29 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java @@ -36,7 +36,7 @@ import java.util.List; */ public class BaseFileWithLogsSplit extends FileSplit { // a flag to mark this split is produced by incremental query or not. 
- private boolean belongToIncrementalSplit = false; + private boolean belongsToIncrementalQuery = false; // the log file paths of this split. private List deltaLogFiles = new ArrayList<>(); // max commit time of current split. @@ -53,7 +53,7 @@ public class BaseFileWithLogsSplit extends FileSplit { @Override public void write(DataOutput out) throws IOException { super.write(out); - out.writeBoolean(belongToIncrementalSplit); + out.writeBoolean(belongsToIncrementalQuery); Text.writeString(out, maxCommitTime); Text.writeString(out, basePath); Text.writeString(out, baseFilePath); @@ -67,7 +67,7 @@ public class BaseFileWithLogsSplit extends FileSplit { @Override public void readFields(DataInput in) throws IOException { super.readFields(in); - belongToIncrementalSplit = in.readBoolean(); + belongsToIncrementalQuery = in.readBoolean(); maxCommitTime = Text.readString(in); basePath = Text.readString(in); baseFilePath = Text.readString(in); @@ -81,12 +81,12 @@ public class BaseFileWithLogsSplit extends FileSplit { deltaLogFiles = tempDeltaLogs; } - public boolean getBelongToIncrementalSplit() { - return belongToIncrementalSplit; + public boolean getBelongsToIncrementalQuery() { + return belongsToIncrementalQuery; } - public void setBelongToIncrementalSplit(boolean belongToIncrementalSplit) { - this.belongToIncrementalSplit = belongToIncrementalSplit; + public void setBelongsToIncrementalQuery(boolean belongsToIncrementalQuery) { + this.belongsToIncrementalQuery = belongsToIncrementalQuery; } public List getDeltaLogFiles() { diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BootstrapBaseFileSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BootstrapBaseFileSplit.java index 437304fb0..1a609e042 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BootstrapBaseFileSplit.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BootstrapBaseFileSplit.java @@ -32,6 +32,10 @@ public class BootstrapBaseFileSplit extends FileSplit { private FileSplit bootstrapFileSplit; + /** + * NOTE: This ctor is necessary for Hive to be able to serialize and + * then instantiate it when deserializing back + */ public BootstrapBaseFileSplit() { super(); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java index a35eb5094..010b8d6ac 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java @@ -38,6 +38,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import scala.collection.JavaConverters; @@ -70,8 +71,6 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat logFiles) { - List sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); - try { - RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus()); - rtFileStatus.setDeltaLogFiles(sortedLogFiles); - return rtFileStatus; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Nonnull - private static FileStatus 
getFileStatusUnchecked(Option baseFileOpt) { - try { - return HoodieInputFormatUtils.getFileStatus(baseFileOpt.get()); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - @Override public FileStatus[] listStatus(JobConf job) throws IOException { // Segregate inputPaths[] to incremental, snapshot and non hoodie paths @@ -143,6 +121,126 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat targetFiles, List legacyFileStatuses) { + List diff = CollectionUtils.diff(targetFiles, legacyFileStatuses); + checkState(diff.isEmpty(), "Should be empty"); + } + + @Nonnull + private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) { + try { + return HoodieInputFormatUtils.getFileStatus(baseFile); + } catch (IOException ioe) { + throw new HoodieIOException("Failed to get file-status", ioe); + } + } + + /** + * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that + * lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified + * as part of provided {@link JobConf} + */ + protected final FileStatus[] doListStatus(JobConf job) throws IOException { + return super.listStatus(job); + } + + /** + * Achieves listStatus functionality for an incrementally queried table. Instead of listing all + * partitions and then filtering based on the commits of interest, this logic first extracts the + * partitions touched by the desired commits and then lists only those partitions. + */ + protected List listStatusForIncrementalMode(JobConf job, + HoodieTableMetaClient tableMetaClient, + List inputPaths, + String incrementalTable) throws IOException { + Job jobContext = Job.getInstance(job); + Option timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient); + if (!timeline.isPresent()) { + return null; + } + Option> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, incrementalTable, timeline.get()); + if (!commitsToCheck.isPresent()) { + return null; + } + Option incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths); + // Mutate the JobConf to set the input paths to only partitions touched by incremental pull. 
+ if (!incrementalInputPaths.isPresent()) { + return null; + } + setInputPaths(job, incrementalInputPaths.get()); + FileStatus[] fileStatuses = doListStatus(job); + return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get()); + } + + protected abstract boolean includeLogFilesForSnapshotView(); + + @Nonnull + private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile, + Stream logFiles, + Option latestCompletedInstantOpt, + HoodieTableMetaClient tableMetaClient) { + List sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); + FileStatus baseFileStatus = getFileStatusUnchecked(baseFile); + try { + RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus); + rtFileStatus.setDeltaLogFiles(sortedLogFiles); + rtFileStatus.setBaseFilePath(baseFile.getPath()); + rtFileStatus.setBasePath(tableMetaClient.getBasePath()); + + if (latestCompletedInstantOpt.isPresent()) { + HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get(); + checkState(latestCompletedInstant.isCompleted()); + + rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp()); + } + + if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) { + rtFileStatus.setBootStrapFileStatus(baseFileStatus); + } + + return rtFileStatus; + } catch (IOException e) { + throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e); + } + } + + @Nonnull + private List listStatusForSnapshotModeLegacy(JobConf job, Map tableMetaClientMap, List snapshotPaths) throws IOException { + return HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapshotView()); + } + + @Nonnull + private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile, + Stream logFiles, + Option latestCompletedInstantOpt, + HoodieTableMetaClient tableMetaClient) { + List sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); + try { + RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus()); + rtFileStatus.setDeltaLogFiles(sortedLogFiles); + rtFileStatus.setBasePath(tableMetaClient.getBasePath()); + + if (latestCompletedInstantOpt.isPresent()) { + HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get(); + checkState(latestCompletedInstant.isCompleted()); + + rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp()); + } + + return rtFileStatus; + } catch (IOException e) { + throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e); + } + } + + private static Option fromScala(scala.Option opt) { + if (opt.isDefined()) { + return Option.of(opt.get()); + } + + return Option.empty(); + } + @Nonnull private List listStatusForSnapshotMode(JobConf job, Map tableMetaClientMap, @@ -187,13 +285,28 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat { Option baseFileOpt = fileSlice.getBaseFile(); Option latestLogFileOpt = fileSlice.getLatestLogFile(); - if (baseFileOpt.isPresent()) { - return getFileStatusUnchecked(baseFileOpt); - } else if (includeLogFilesForSnapShotView() && latestLogFileOpt.isPresent()) { - return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), fileSlice.getLogFiles()); + Stream logFiles = 
fileSlice.getLogFiles(); + + Option latestCompletedInstantOpt = + fromScala(fileIndex.latestCompletedInstant()); + + // Check if we're reading a MOR table + if (includeLogFilesForSnapshotView()) { + if (baseFileOpt.isPresent()) { + return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, latestCompletedInstantOpt, tableMetaClient); + } else if (latestLogFileOpt.isPresent()) { + return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), logFiles, latestCompletedInstantOpt, tableMetaClient); + } else { + throw new IllegalStateException("Invalid state: either base-file or log-file has to be present"); + } } else { - throw new IllegalStateException("Invalid state: either base-file or log-file should be present"); + if (baseFileOpt.isPresent()) { + return getFileStatusUnchecked(baseFileOpt.get()); + } else { + throw new IllegalStateException("Invalid state: base-file has to be present"); + } } + }) .collect(Collectors.toList()) ); @@ -204,49 +317,4 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat targetFiles, List legacyFileStatuses) { - List diff = CollectionUtils.diff(targetFiles, legacyFileStatuses); - checkState(diff.isEmpty(), "Should be empty"); - } - - @Nonnull - private List listStatusForSnapshotModeLegacy(JobConf job, Map tableMetaClientMap, List snapshotPaths) throws IOException { - return HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapShotView()); - } - - /** - * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that - * lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified - * as part of provided {@link JobConf} - */ - protected final FileStatus[] doListStatus(JobConf job) throws IOException { - return super.listStatus(job); - } - - /** - * Achieves listStatus functionality for an incrementally queried table. Instead of listing all - * partitions and then filtering based on the commits of interest, this logic first extracts the - * partitions touched by the desired commits and then lists only those partitions. - */ - protected List listStatusForIncrementalMode(JobConf job, HoodieTableMetaClient tableMetaClient, - List inputPaths, String incrementalTable) throws IOException { - Job jobContext = Job.getInstance(job); - Option timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient); - if (!timeline.isPresent()) { - return null; - } - Option> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, incrementalTable, timeline.get()); - if (!commitsToCheck.isPresent()) { - return null; - } - Option incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths); - // Mutate the JobConf to set the input paths to only partitions touched by incremental pull. 
- if (!incrementalInputPaths.isPresent()) { - return null; - } - setInputPaths(job, incrementalInputPaths.get()); - FileStatus[] fileStatuses = doListStatus(job); - return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get()); - } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java index 2baf140e2..cb13bf026 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java @@ -42,7 +42,7 @@ public class HoodieHFileInputFormat extends HoodieFileInputFormatBase { } @Override - protected boolean includeLogFilesForSnapShotView() { + protected boolean includeLogFilesForSnapshotView() { return false; } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java index f63352829..078258f3e 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java @@ -66,7 +66,7 @@ public class HoodieParquetInputFormat extends HoodieFileInputFormatBase implemen return HoodieInputFormatUtils.filterInstantsTimeline(timeline); } - protected boolean includeLogFilesForSnapShotView() { + protected boolean includeLogFilesForSnapshotView() { return false; } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java index 07bd82afa..24d190700 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java @@ -35,7 +35,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableMetaClientForBasePath; +import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableMetaClientForBasePathUnchecked; /** * InputPathHandler takes in a set of input paths and incremental tables list. Then, classifies the @@ -107,7 +107,7 @@ public class InputPathHandler { // This path is for a table that we don't know about yet. HoodieTableMetaClient metaClient; try { - metaClient = getTableMetaClientForBasePath(inputPath.getFileSystem(conf), inputPath); + metaClient = getTableMetaClientForBasePathUnchecked(conf, inputPath); tableMetaClientMap.put(getIncrementalTable(metaClient), metaClient); tagAsIncrementalOrSnapshot(inputPath, metaClient, incrementalTables); } catch (TableNotFoundException | InvalidTableException e) { diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java index 8f9ac8b03..7983e0929 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java @@ -31,7 +31,7 @@ import java.util.List; */ public class PathWithLogFilePath extends Path { // a flag to mark this split is produced by incremental query or not. - private boolean belongToIncrementalPath = false; + private boolean belongsToIncrementalPath = false; // the log files belong this path. 
private List deltaLogFiles = new ArrayList<>(); // max commit time of current path. @@ -48,8 +48,8 @@ public class PathWithLogFilePath extends Path { super(parent, child); } - public void setBelongToIncrementalPath(boolean belongToIncrementalPath) { - this.belongToIncrementalPath = belongToIncrementalPath; + public void setBelongsToIncrementalPath(boolean belongsToIncrementalPath) { + this.belongsToIncrementalPath = belongsToIncrementalPath; } public List getDeltaLogFiles() { @@ -72,6 +72,10 @@ public class PathWithLogFilePath extends Path { return basePath; } + public boolean getBelongsToIncrementalQuery() { + return belongsToIncrementalPath; + } + public void setBasePath(String basePath) { this.basePath = basePath; } @@ -98,7 +102,7 @@ public class PathWithLogFilePath extends Path { public BaseFileWithLogsSplit buildSplit(Path file, long start, long length, String[] hosts) { BaseFileWithLogsSplit bs = new BaseFileWithLogsSplit(file, start, length, hosts); - bs.setBelongToIncrementalSplit(belongToIncrementalPath); + bs.setBelongsToIncrementalQuery(belongsToIncrementalPath); bs.setDeltaLogFiles(deltaLogFiles); bs.setMaxCommitTime(maxCommitTime); bs.setBasePath(basePath); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RealtimeFileStatus.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RealtimeFileStatus.java index e8e1a2898..1d732f5a6 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RealtimeFileStatus.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RealtimeFileStatus.java @@ -56,7 +56,7 @@ public class RealtimeFileStatus extends FileStatus { public Path getPath() { Path path = super.getPath(); PathWithLogFilePath pathWithLogFilePath = new PathWithLogFilePath(path.getParent(), path.getName()); - pathWithLogFilePath.setBelongToIncrementalPath(belongToIncrementalFileStatus); + pathWithLogFilePath.setBelongsToIncrementalPath(belongToIncrementalFileStatus); pathWithLogFilePath.setDeltaLogFiles(deltaLogFiles); pathWithLogFilePath.setMaxCommitTime(maxCommitTime); pathWithLogFilePath.setBasePath(basePath); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java index 525bec613..9c8d5561f 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java @@ -39,7 +39,8 @@ import org.apache.log4j.Logger; import java.io.IOException; import java.util.Arrays; -import java.util.stream.Stream; +import java.util.List; +import java.util.stream.Collectors; /** * HoodieRealtimeInputFormat for HUDI datasets which store data in HFile base file format. 
@@ -52,7 +53,9 @@ public class HoodieHFileRealtimeInputFormat extends HoodieHFileInputFormat { @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - Stream fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is); + List fileSplits = Arrays.stream(super.getSplits(job, numSplits)) + .map(is -> (FileSplit) is) + .collect(Collectors.toList()); return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java index f3cf4ffa8..d8bfce67d 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java @@ -83,14 +83,11 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - List fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList()); - boolean isIncrementalSplits = HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits); - - return isIncrementalSplits - ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits.stream()) - : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits.stream()); + return HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits) + ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits) + : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits); } /** @@ -112,8 +109,10 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i * TODO: unify the incremental view code between hive/spark-sql and spark datasource */ @Override - protected List listStatusForIncrementalMode( - JobConf job, HoodieTableMetaClient tableMetaClient, List inputPaths, String incrementalTable) throws IOException { + protected List listStatusForIncrementalMode(JobConf job, + HoodieTableMetaClient tableMetaClient, + List inputPaths, + String incrementalTable) throws IOException { List result = new ArrayList<>(); Job jobContext = Job.getInstance(job); @@ -217,7 +216,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i } @Override - protected boolean includeLogFilesForSnapShotView() { + protected boolean includeLogFilesForSnapshotView() { return true; } @@ -251,14 +250,18 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i private FileSplit doMakeSplitForPathWithLogFilePath(PathWithLogFilePath path, long start, long length, String[] hosts, String[] inMemoryHosts) { if (!path.includeBootstrapFilePath()) { return path.buildSplit(path, start, length, hosts); - } else { - FileSplit bf = - inMemoryHosts == null - ? super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts) - : super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts, inMemoryHosts); - return HoodieRealtimeInputFormatUtils - .createRealtimeBoostrapBaseFileSplit((BootstrapBaseFileSplit) bf, path.getBasePath(), path.getDeltaLogFiles(), path.getMaxCommitTime()); } + + FileSplit bf = inMemoryHosts == null + ? 
super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts) + : super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts, inMemoryHosts); + + return HoodieRealtimeInputFormatUtils.createRealtimeBoostrapBaseFileSplit( + (BootstrapBaseFileSplit) bf, + path.getBasePath(), + path.getDeltaLogFiles(), + path.getMaxCommitTime(), + path.getBelongsToIncrementalQuery()); } @Override diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java index a39ec3550..2b45fe3f3 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java @@ -32,6 +32,9 @@ import java.util.stream.Collectors; /** * Filesplit that wraps the base split and a list of log files to merge deltas from. + * + * NOTE: If you're adding fields here you need to make sure that you appropriately de-/serialize them + * in {@link #readFromInput(DataInput)} and {@link #writeToOutput(DataOutput)} */ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit { @@ -44,9 +47,7 @@ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit private Option hoodieVirtualKeyInfo = Option.empty(); - public HoodieRealtimeFileSplit() { - super(); - } + public HoodieRealtimeFileSplit() {} public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List deltaLogFiles, String maxCommitTime, Option hoodieVirtualKeyInfo) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java index 79d2d815e..2ac720446 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java @@ -18,11 +18,11 @@ package org.apache.hudi.hadoop.realtime; +import org.apache.hadoop.mapred.FileSplit; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.util.Option; import org.apache.hudi.hadoop.BootstrapBaseFileSplit; - -import org.apache.hadoop.mapred.FileSplit; +import org.apache.hudi.hadoop.InputSplitUtils; import java.io.DataInput; import java.io.DataOutput; @@ -33,6 +33,9 @@ import java.util.stream.Collectors; /** * Realtime File Split with external base file. 
+ * + * NOTE: If you're adding fields here you need to make sure that you appropriately de-/serialize them + * in {@link #readFromInput(DataInput)} and {@link #writeToOutput(DataOutput)} */ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit implements RealtimeSplit { @@ -43,29 +46,42 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple private String basePath; + private boolean belongsToIncrementalSplit; + + /** + * NOTE: This ctor is necessary for Hive to be able to serialize and + * then instantiate it when deserializing back + */ public RealtimeBootstrapBaseFileSplit() { super(); } - public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit, String basePath, List deltaLogFiles, - String maxInstantTime, FileSplit externalFileSplit) throws IOException { + public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit, + String basePath, + List deltaLogFiles, + String maxInstantTime, + FileSplit externalFileSplit, + boolean belongsToIncrementalQuery) throws IOException { super(baseSplit, externalFileSplit); this.maxInstantTime = maxInstantTime; this.deltaLogFiles = deltaLogFiles; this.deltaLogPaths = deltaLogFiles.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList()); this.basePath = basePath; + this.belongsToIncrementalSplit = belongsToIncrementalQuery; } @Override public void write(DataOutput out) throws IOException { super.write(out); writeToOutput(out); + InputSplitUtils.writeBoolean(belongsToIncrementalSplit, out); } @Override public void readFields(DataInput in) throws IOException { super.readFields(in); readFromInput(in); + belongsToIncrementalSplit = InputSplitUtils.readBoolean(in); } @Override @@ -93,6 +109,10 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple return Option.empty(); } + public boolean getBelongsToIncrementalQuery() { + return belongsToIncrementalSplit; + } + @Override public void setDeltaLogPaths(List deltaLogPaths) { this.deltaLogPaths = deltaLogPaths; diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java index a7f0d2cc2..6c1e02cf6 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java @@ -90,14 +90,16 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo { for (String logFilePath : getDeltaLogPaths()) { InputSplitUtils.writeString(logFilePath, out); } - if (!getHoodieVirtualKeyInfo().isPresent()) { + + Option virtualKeyInfoOpt = getHoodieVirtualKeyInfo(); + if (!virtualKeyInfoOpt.isPresent()) { InputSplitUtils.writeBoolean(false, out); } else { InputSplitUtils.writeBoolean(true, out); - InputSplitUtils.writeString(getHoodieVirtualKeyInfo().get().getRecordKeyField(), out); - InputSplitUtils.writeString(getHoodieVirtualKeyInfo().get().getPartitionPathField(), out); - InputSplitUtils.writeString(String.valueOf(getHoodieVirtualKeyInfo().get().getRecordKeyFieldIndex()), out); - InputSplitUtils.writeString(String.valueOf(getHoodieVirtualKeyInfo().get().getPartitionPathFieldIndex()), out); + InputSplitUtils.writeString(virtualKeyInfoOpt.get().getRecordKeyField(), out); + InputSplitUtils.writeString(virtualKeyInfoOpt.get().getPartitionPathField(), out); + InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getRecordKeyFieldIndex()), out); + 
InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getPartitionPathFieldIndex()), out); } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java index a5a3f7e21..63abfc874 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java @@ -318,7 +318,7 @@ public class HoodieInputFormatUtils { Map metaClientMap = new HashMap<>(); return partitions.stream().collect(Collectors.toMap(Function.identity(), p -> { try { - HoodieTableMetaClient metaClient = getTableMetaClientForBasePath(p.getFileSystem(conf), p); + HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, p); metaClientMap.put(p, metaClient); return metaClient; } catch (IOException e) { @@ -328,20 +328,17 @@ public class HoodieInputFormatUtils { } /** - * Extract HoodieTableMetaClient from a partition path(not base path). - * @param fs - * @param dataPath - * @return - * @throws IOException + * Extract HoodieTableMetaClient from a partition path (not base path) */ - public static HoodieTableMetaClient getTableMetaClientForBasePath(FileSystem fs, Path dataPath) throws IOException { + public static HoodieTableMetaClient getTableMetaClientForBasePathUnchecked(Configuration conf, Path partitionPath) throws IOException { + FileSystem fs = partitionPath.getFileSystem(conf); int levels = HoodieHiveUtils.DEFAULT_LEVELS_TO_BASEPATH; - if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) { - HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath); + if (HoodiePartitionMetadata.hasPartitionMetadata(fs, partitionPath)) { + HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, partitionPath); metadata.readFromFS(); levels = metadata.getPartitionDepth(); } - Path baseDir = HoodieHiveUtils.getNthParent(dataPath, levels); + Path baseDir = HoodieHiveUtils.getNthParent(partitionPath, levels); LOG.info("Reading hoodie metadata from path " + baseDir.toString()); return HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(baseDir.toString()).build(); } @@ -440,6 +437,9 @@ public class HoodieInputFormatUtils { .build(); } + /** + * @deprecated + */ public static List filterFileStatusForSnapshotMode(JobConf job, Map tableMetaClientMap, List snapshotPaths, boolean includeLogFiles) throws IOException { HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java index 6718642d2..7dc58d1e2 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java @@ -18,6 +18,13 @@ package org.apache.hudi.hadoop.utils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; import 
org.apache.hudi.common.model.FileSlice;
@@ -42,14 +49,6 @@ import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
 import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
 import org.apache.hudi.hadoop.realtime.RealtimeBootstrapBaseFileSplit;
 import org.apache.hudi.hadoop.realtime.RealtimeSplit;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.MessageType;
@@ -57,6 +56,7 @@ import org.apache.parquet.schema.MessageType;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -65,11 +65,70 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
 
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
 
-  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) {
+  public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    if (fileSplits.isEmpty()) {
+      return new InputSplit[0];
+    }
+
+    FileSplit fileSplit = fileSplits.get(0);
+
+    // Pre-process table-config to fetch virtual key info
+    Path partitionPath = fileSplit.getPath().getParent();
+    HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, partitionPath);
+
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt = getHoodieVirtualKeyInfo(metaClient);
+
+    // NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase}
+    HoodieInstant latestCommitInstant =
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+
+    InputSplit[] finalSplits = fileSplits.stream()
+        .map(split -> {
+          // There are 4 types of splits we could have to handle here
+          //    - {@code BootstrapBaseFileSplit}: in case base file does have associated bootstrap file,
+          //      but does NOT have any log files appended (convert it to {@code RealtimeBootstrapBaseFileSplit})
+          //    - {@code RealtimeBootstrapBaseFileSplit}: in case base file does have associated bootstrap file
+          //      and does have log files appended
+          //    - {@code BaseFileWithLogsSplit}: in case base file does NOT have associated bootstrap file
+          //      and does have log files appended;
+          //    - {@code FileSplit}: in case Hive passed down a non-Hudi path
+          if (split instanceof RealtimeBootstrapBaseFileSplit) {
+            return split;
+          } else if (split instanceof BootstrapBaseFileSplit) {
+            BootstrapBaseFileSplit bootstrapBaseFileSplit = unsafeCast(split);
+            return createRealtimeBoostrapBaseFileSplit(
+                bootstrapBaseFileSplit,
+                metaClient.getBasePath(),
+                Collections.emptyList(),
+                latestCommitInstant.getTimestamp(),
+                false);
+          } else if (split instanceof BaseFileWithLogsSplit) {
+            BaseFileWithLogsSplit baseFileWithLogsSplit = unsafeCast(split);
+            return createHoodieRealtimeSplitUnchecked(baseFileWithLogsSplit, hoodieVirtualKeyInfoOpt);
+          } else {
+            // Non-Hudi paths might result in just generic {@code FileSplit} being
+            // propagated up to this point
+            return split;
+          }
+        })
+        .toArray(InputSplit[]::new);
+
+    LOG.info("Returning a total of " + finalSplits.length + " splits");
+
+    return finalSplits;
+  }
+
+  /**
+   * @deprecated
+   */
+  public static InputSplit[] getRealtimeSplitsLegacy(Configuration conf, Stream<FileSplit> fileSplits) {
     Map<Path, List<FileSplit>> partitionsToParquetSplits =
         fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent()));
     // TODO(vc): Should we handle also non-hoodie splits here?
@@ -124,7 +183,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
                 .collect(Collectors.toList());
             if (split instanceof BootstrapBaseFileSplit) {
               BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split;
-              rtSplits.add(createRealtimeBoostrapBaseFileSplit(eSplit, metaClient.getBasePath(), logFiles, maxCommitTime));
+              rtSplits.add(createRealtimeBoostrapBaseFileSplit(eSplit, metaClient.getBasePath(), logFiles, maxCommitTime, false));
             } else {
               rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFiles, maxCommitTime, finalHoodieVirtualKeyInfo));
             }
@@ -144,11 +203,16 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
     return rtSplits.toArray(new InputSplit[0]);
   }
 
+  /**
+   * @deprecated will be replaced w/ {@link #getRealtimeSplits(Configuration, List)}
+   */
   // get IncrementalRealtimeSplits
-  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
+  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    checkState(fileSplits.stream().allMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery),
+        "All splits have to belong to incremental query");
+
     List<InputSplit> rtSplits = new ArrayList<>();
-    List<FileSplit> fileSplitList = fileSplits.collect(Collectors.toList());
-    Set<Path> partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
+    Set<Path> partitionSet = fileSplits.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
     Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet);
     // Pre process tableConfig from first partition to fetch virtual key info
     Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
@@ -156,14 +220,12 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
       hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next()));
     }
     Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
-    fileSplitList.stream().forEach(s -> {
+    fileSplits.stream().forEach(s -> {
       // deal with incremental query.
       try {
         if (s instanceof BaseFileWithLogsSplit) {
-          BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s;
-          if (bs.getBelongToIncrementalSplit()) {
-            rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
-          }
+          BaseFileWithLogsSplit bs = unsafeCast(s);
+          rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
         } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
           rtSplits.add(s);
         }
@@ -191,22 +253,30 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
     return Option.empty();
   }
 
+  private static boolean doesBelongToIncrementalQuery(FileSplit s) {
+    if (s instanceof BaseFileWithLogsSplit) {
+      BaseFileWithLogsSplit bs = unsafeCast(s);
+      return bs.getBelongsToIncrementalQuery();
+    } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
+      RealtimeBootstrapBaseFileSplit bs = unsafeCast(s);
+      return bs.getBelongsToIncrementalQuery();
+    }
+
+    return false;
+  }
+
   public static boolean isIncrementalQuerySplits(List<FileSplit> fileSplits) {
     if (fileSplits == null || fileSplits.size() == 0) {
      return false;
    }
-    return fileSplits.stream().anyMatch(s -> {
-      if (s instanceof BaseFileWithLogsSplit) {
-        BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s;
-        return bs.getBelongToIncrementalSplit();
-      } else {
-        return s instanceof RealtimeBootstrapBaseFileSplit;
-      }
-    });
+    return fileSplits.stream().anyMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery);
   }
 
-  public static RealtimeBootstrapBaseFileSplit createRealtimeBoostrapBaseFileSplit(
-      BootstrapBaseFileSplit split, String basePath, List<HoodieLogFile> logFiles, String maxInstantTime) {
+  public static RealtimeBootstrapBaseFileSplit createRealtimeBoostrapBaseFileSplit(BootstrapBaseFileSplit split,
+                                                                                   String basePath,
+                                                                                   List<HoodieLogFile> logFiles,
+                                                                                   String maxInstantTime,
+                                                                                   boolean belongsToIncrementalQuery) {
     try {
       String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())
          .filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[0];
@@ -214,7 +284,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
          .filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0];
       FileSplit baseSplit = new FileSplit(split.getPath(), split.getStart(), split.getLength(),
          hosts, inMemoryHosts);
-      return new RealtimeBootstrapBaseFileSplit(baseSplit, basePath, logFiles, maxInstantTime, split.getBootstrapFileSplit());
+      return new RealtimeBootstrapBaseFileSplit(baseSplit, basePath, logFiles, maxInstantTime, split.getBootstrapFileSplit(), belongsToIncrementalQuery);
     } catch (IOException e) {
       throw new HoodieIOException("Error creating hoodie real time split ", e);
     }
@@ -330,4 +400,18 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
       }
     }
   }
+
+  private static HoodieRealtimeFileSplit createHoodieRealtimeSplitUnchecked(BaseFileWithLogsSplit baseFileWithLogsSplit,
+                                                                            Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt) {
+    try {
+      return new HoodieRealtimeFileSplit(
+          baseFileWithLogsSplit,
+          baseFileWithLogsSplit.getBasePath(),
+          baseFileWithLogsSplit.getDeltaLogFiles(),
+          baseFileWithLogsSplit.getMaxCommitTime(),
+          hoodieVirtualKeyInfoOpt);
+    } catch (IOException e) {
+      throw new HoodieIOException(String.format("Failed to init %s", HoodieRealtimeFileSplit.class.getSimpleName()), e);
+    }
+  }
 }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java
index 50c3f2e1c..32d7f1e3b 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java
@@ -245,7 +245,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
   }
 
   @Test
-  public void testMutilReaderRealtimeComineHoodieInputFormat() throws Exception {
+  public void testMultiReaderRealtimeCombineHoodieInputFormat() throws Exception {
     // test for hudi-1722
     Configuration conf = new Configuration();
     // initial commit
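Note on the HoodieRealtimeInputFormatUtils hunks above: unsafeCast comes from the org.apache.hudi.TypeUtils static import added at the top of that file; its definition is not shown in this part of the patch. A minimal sketch of what such a helper presumably looks like (a hypothetical reconstruction under that assumption, not the literal file contents):

package org.apache.hudi;

/**
 * Sketch of a generic cast helper matching the unsafeCast(...) call sites above.
 */
public class TypeUtils {

  private TypeUtils() {
  }

  /**
   * Casts the given object to the type expected by the caller without requiring a class literal.
   * The unchecked warning is suppressed deliberately: the call sites above guard the cast with an
   * instanceof check before invoking it.
   */
  @SuppressWarnings("unchecked")
  public static <T> T unsafeCast(Object object) {
    return (T) object;
  }
}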