1
0

[HUDI-1789] Support reading older snapshots (#2809)

* [HUDI-1789] In HoodieParquetInoutFormat we currently default to the latest version of base files.
This PR attempts to add a new jobConf
 `hoodie.%s.consume.snapshot.time`

This new config will allow us to read older snapshots.

-  Reusing hoodie.%s.consume.commit for point in time snapshot queries as well.
-  Adding javadocs and some more tests
This commit is contained in:
jsbali
2021-05-11 03:56:49 +05:30
committed by GitHub
parent 8a48d16e41
commit aa398f77f1
3 changed files with 115 additions and 14 deletions

View File

@@ -59,6 +59,7 @@ import java.util.List;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -120,7 +121,6 @@ public class TestHoodieParquetInputFormat {
assertFalse(filteredTimeline.containsInstant(t4));
assertFalse(filteredTimeline.containsInstant(t5));
assertFalse(filteredTimeline.containsInstant(t6));
// remove compaction instant and setup timeline again
instants.remove(t3);
timeline = new HoodieActiveTimeline(metaClient);
@@ -193,6 +193,55 @@ public class TestHoodieParquetInputFormat {
+ "files from 200 commit", files, "100", 5);
}
@Test
public void testSnapshotWithInvalidCommitShouldThrowException() throws IOException {
File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
InputFormatTestUtil.commit(basePath, "100");
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
InputFormatTestUtil.setupSnapshotIncludePendingCommits(jobConf, "1");
Exception exception = assertThrows(HoodieIOException.class, () -> inputFormat.listStatus(jobConf));
assertEquals("Valid timestamp is required for hoodie.%s.consume.commit in snapshot mode", exception.getMessage());
InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "1");
exception = assertThrows(HoodieIOException.class, () -> inputFormat.listStatus(jobConf));
assertEquals("Valid timestamp is required for hoodie.%s.consume.commit in snapshot mode", exception.getMessage());
}
@Test
public void testPointInTimeQueryWithUpdates() throws IOException {
// initial commit
File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
InputFormatTestUtil.commit(basePath, "100");
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
FileStatus[] files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length);
// update files
InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 5, "200", true);
// Before the commit
files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length);
ensureFilesInCommit("Commit 200 has not been committed. We should not see files from this commit", files, "200", 0);
InputFormatTestUtil.commit(basePath, "200");
InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "100");
files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length);
ensureFilesInCommit("We shouldn't have any file pertaining to commit 200", files, "200", 0);
ensureFilesInCommit("All files should be from commit 100", files, "100", 10);
InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "200");
files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length);
ensureFilesInCommit("5 files for commit 200", files, "200", 5);
ensureFilesInCommit("5 files for commit 100", files, "100", 5);
}
@Test
public void testInputFormatWithCompaction() throws IOException {
// initial commit
@@ -496,6 +545,11 @@ public class TestHoodieParquetInputFormat {
// expected because validate is called with invalid instantTime
}
//Creating a new jobCOnf Object because old one has hoodie.%.consume.commit set
jobConf = new JobConf();
inputFormat.setConf(jobConf);
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
// verify that snapshot mode skips uncommitted files
InputFormatTestUtil.setupSnapshotScanMode(jobConf);
files = inputFormat.listStatus(jobConf);

View File

@@ -141,19 +141,30 @@ public class InputFormatTestUtil {
jobConf.set(validateTimestampName, instantTime);
}
public static void setupSnapshotMaxCommitTimeQueryMode(JobConf jobConf, String maxInstantTime) {
setUpScanMode(jobConf);
String validateTimestampName =
String.format(HoodieHiveUtils.HOODIE_CONSUME_COMMIT, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.set(validateTimestampName, maxInstantTime);
}
public static void setupSnapshotScanMode(JobConf jobConf) {
setupSnapshotScanMode(jobConf, false);
}
private static void setupSnapshotScanMode(JobConf jobConf, boolean includePending) {
String modePropertyName =
String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.set(modePropertyName, HoodieHiveUtils.SNAPSHOT_SCAN_MODE);
setUpScanMode(jobConf);
String includePendingCommitsName =
String.format(HoodieHiveUtils.HOODIE_CONSUME_PENDING_COMMITS, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.setBoolean(includePendingCommitsName, includePending);
}
private static void setUpScanMode(JobConf jobConf) {
String modePropertyName =
String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.set(modePropertyName, HoodieHiveUtils.SNAPSHOT_SCAN_MODE);
}
public static File prepareParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles,
int numberOfRecords, String commitNumber) throws IOException {
return prepareParquetTable(basePath, schema, numberOfFiles, numberOfRecords, commitNumber, HoodieTableType.COPY_ON_WRITE);