From 7cc75e0be25b7c3345efd890b1a353d49db5bace Mon Sep 17 00:00:00 2001 From: satishkotha Date: Thu, 4 Mar 2021 17:43:31 -0800 Subject: [PATCH] [HUDI-1646] Provide mechanism to read uncommitted data through InputFormat (#2611) --- .../table/timeline/HoodieDefaultTimeline.java | 7 ++++ .../common/table/timeline/HoodieTimeline.java | 5 +++ .../table/view/FileSystemViewManager.java | 17 +++++--- .../hudi/hadoop/utils/HoodieHiveUtils.java | 22 ++++++++++ .../hadoop/utils/HoodieInputFormatUtils.java | 3 +- .../hadoop/TestHoodieParquetInputFormat.java | 41 +++++++++++++++++++ .../hadoop/testutils/InputFormatTestUtil.java | 20 +++++++++ 7 files changed, 109 insertions(+), 6 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index e31f7c19c..fa494931a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -157,6 +157,13 @@ public class HoodieDefaultTimeline implements HoodieTimeline { details); } + @Override + public HoodieDefaultTimeline findInstantsBeforeOrEquals(String instantTime) { + return new HoodieDefaultTimeline(instants.stream() + .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, instantTime)), + details); + } + @Override public HoodieTimeline filter(Predicate filter) { return new HoodieDefaultTimeline(instants.stream().filter(filter), details); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java index 116e9ed09..b48543c3d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java +++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java @@ -176,6 +176,11 @@ public interface HoodieTimeline extends Serializable { */ HoodieTimeline findInstantsBefore(String instantTime); + /** + * Create new timeline with all instants before or equal to the specified time. + */ + HoodieTimeline findInstantsBeforeOrEquals(String instantTime); + /** + * Custom Filter of Instants. + */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java index f614df560..0c218ee44 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java @@ -169,14 +169,21 @@ public class FileSystemViewManager { public static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieEngineContext engineContext, HoodieTableMetaClient metaClient, HoodieMetadataConfig metadataConfig) { + + return createInMemoryFileSystemViewWithTimeline(engineContext, metaClient, metadataConfig, + metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()); + + } + + public static HoodieTableFileSystemView createInMemoryFileSystemViewWithTimeline(HoodieEngineContext engineContext, + HoodieTableMetaClient metaClient, + HoodieMetadataConfig metadataConfig, + HoodieTimeline timeline) { LOG.info("Creating InMemory based view for basePath " + metaClient.getBasePath()); if (metadataConfig.useFileListingMetadata()) { - return new HoodieMetadataFileSystemView(engineContext, metaClient, - metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), - metadataConfig); + return new HoodieMetadataFileSystemView(engineContext, metaClient, timeline, metadataConfig); } - return new HoodieTableFileSystemView(metaClient, - metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()); + return 
new HoodieTableFileSystemView(metaClient, timeline); } /** diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java index 8dbe08075..dae43602b 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java @@ -19,8 +19,12 @@ package org.apache.hudi.hadoop.utils; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.exception.HoodieIOException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -40,6 +44,8 @@ public class HoodieHiveUtils { public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode"; public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp"; public static final String HOODIE_MAX_COMMIT_PATTERN = "hoodie.%s.consume.max.commits"; + public static final String HOODIE_CONSUME_PENDING_COMMITS = "hoodie.%s.consume.pending.commits"; + public static final String HOODIE_CONSUME_COMMIT = "hoodie.%s.consume.commit"; public static final Set VIRTUAL_COLUMN_NAMES = CollectionUtils.createImmutableSet( "INPUT__FILE__NAME", "BLOCK__OFFSET__INSIDE__FILE", "ROW__OFFSET__INSIDE__BLOCK", "RAW__DATA__SIZE", "ROW__ID", "GROUPING__ID"); @@ -115,4 +121,20 @@ public class HoodieHiveUtils { } return result; } + + public static HoodieTimeline getTableTimeline(final String tableName, final JobConf job, final HoodieTableMetaClient metaClient) { + boolean includePendingCommits = job.getBoolean(String.format(HOODIE_CONSUME_PENDING_COMMITS, tableName), false); + if (includePendingCommits) { + HoodieTimeline timeline = 
metaClient.getActiveTimeline().getCommitsTimeline(); + String maxCommit = job.get(String.format(HOODIE_CONSUME_COMMIT, tableName)); + if (maxCommit == null || !timeline.containsInstant(maxCommit)) { + LOG.info("Timestamp configured for validation: " + maxCommit + " commits timeline:" + timeline + " table: " + tableName); + throw new HoodieIOException("Valid timestamp is required for " + HOODIE_CONSUME_COMMIT + " in validate mode"); + } + return timeline.findInstantsBeforeOrEquals(maxCommit); + } + + // by default return all completed commits. + return metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java index 8585a32ad..f95ca6f4c 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java @@ -439,8 +439,9 @@ public class HoodieInputFormatUtils { LOG.debug("Hoodie Metadata initialized with completed commit instant as :" + metaClient); } + HoodieTimeline timeline = HoodieHiveUtils.getTableTimeline(metaClient.getTableConfig().getTableName(), job, metaClient); HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(metaClient, tableMetaClient -> - FileSystemViewManager.createInMemoryFileSystemView(engineContext, tableMetaClient, buildMetadataConfig(job))); + FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, tableMetaClient, buildMetadataConfig(job), timeline)); List filteredBaseFiles = new ArrayList<>(); for (Path p : entry.getValue()) { String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), p); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java index 59214ac44..cb59db978 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java @@ -28,7 +28,9 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; import org.apache.hudi.hadoop.utils.HoodieHiveUtils; @@ -57,6 +59,7 @@ import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResou import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; public class TestHoodieParquetInputFormat { @@ -429,6 +432,44 @@ public class TestHoodieParquetInputFormat { } + @Test + public void testSnapshotPreCommitValidate() throws IOException { + // initial commit + File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100"); + createCommitFile(basePath, "100", "2016/05/01"); + + // Add the paths + FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); + FileStatus[] files = inputFormat.listStatus(jobConf); + assertEquals(10, files.length, "Snapshot read must return all files in partition"); + + // add more files + InputFormatTestUtil.simulateInserts(partitionDir, baseFileExtension, "fileId2-", 5, "200"); + FileCreateUtils.createInflightCommit(basePath.toString(), "200"); + + // Verify that validate mode reads 
uncommitted files + InputFormatTestUtil.setupSnapshotIncludePendingCommits(jobConf, "200"); + files = inputFormat.listStatus(jobConf); + assertEquals(15, files.length, "Must return uncommitted files"); + ensureFilesInCommit("Pulling 1 commit from 100, should get us the 5 files committed at 200", files, "200", 5); + ensureFilesInCommit("Pulling 1 commit from 100, should get us the 10 files committed at 100", files, "100", 10); + + try { + // Verify that Validate mode throws error with invalid commit time + InputFormatTestUtil.setupSnapshotIncludePendingCommits(jobConf, "300"); + inputFormat.listStatus(jobConf); + fail("Expected list status to fail when validate is called with unknown timestamp"); + } catch (HoodieIOException e) { + // expected because validate is called with invalid instantTime + } + + // verify that snapshot mode skips uncommitted files + InputFormatTestUtil.setupSnapshotScanMode(jobConf); + files = inputFormat.listStatus(jobConf); + assertEquals(10, files.length, "snapshot scan mode must not return uncommitted files"); + ensureFilesInCommit("Pulling 1 commit from 100, should get us the 10 files committed at 100", files, "100", 10); + } + private void ensureRecordsInCommit(String msg, String commit, int expectedNumberOfRecordsInCommit, int totalExpected) throws IOException { int actualCount = 0; diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index a79d980ac..7a6332eb3 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -122,6 +122,26 @@ public class InputFormatTestUtil { String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); jobConf.setInt(maxCommitPulls, numberOfCommitsToPull); } + + public static void 
setupSnapshotIncludePendingCommits(JobConf jobConf, String instantTime) { + setupSnapshotScanMode(jobConf, true); + String validateTimestampName = + String.format(HoodieHiveUtils.HOODIE_CONSUME_COMMIT, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + jobConf.set(validateTimestampName, instantTime); + } + + public static void setupSnapshotScanMode(JobConf jobConf) { + setupSnapshotScanMode(jobConf, false); + } + + private static void setupSnapshotScanMode(JobConf jobConf, boolean includePending) { + String modePropertyName = + String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + jobConf.set(modePropertyName, HoodieHiveUtils.SNAPSHOT_SCAN_MODE); + String includePendingCommitsName = + String.format(HoodieHiveUtils.HOODIE_CONSUME_PENDING_COMMITS, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + jobConf.setBoolean(includePendingCommitsName, includePending); + } public static File prepareParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles, int numberOfRecords, String commitNumber) throws IOException {