[HUDI-1646] Provide mechanism to read uncommitted data through InputFormat (#2611)
This commit is contained in:
@@ -157,6 +157,13 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
|
||||
details);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieDefaultTimeline findInstantsBeforeOrEquals(String instantTime) {
|
||||
return new HoodieDefaultTimeline(instants.stream()
|
||||
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, instantTime)),
|
||||
details);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieTimeline filter(Predicate<HoodieInstant> filter) {
|
||||
return new HoodieDefaultTimeline(instants.stream().filter(filter), details);
|
||||
|
||||
@@ -176,6 +176,11 @@ public interface HoodieTimeline extends Serializable {
|
||||
*/
|
||||
HoodieTimeline findInstantsBefore(String instantTime);
|
||||
|
||||
/**
|
||||
* Create a new timeline with all instants before or equal to the specified time (inclusive).
|
||||
*/
|
||||
HoodieTimeline findInstantsBeforeOrEquals(String instantTime);
|
||||
|
||||
/**
|
||||
* Custom Filter of Instants.
|
||||
*/
|
||||
|
||||
@@ -169,14 +169,21 @@ public class FileSystemViewManager {
|
||||
|
||||
public static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieEngineContext engineContext, HoodieTableMetaClient metaClient,
|
||||
HoodieMetadataConfig metadataConfig) {
|
||||
|
||||
return createInMemoryFileSystemViewWithTimeline(engineContext, metaClient, metadataConfig,
|
||||
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
|
||||
|
||||
}
|
||||
|
||||
public static HoodieTableFileSystemView createInMemoryFileSystemViewWithTimeline(HoodieEngineContext engineContext,
|
||||
HoodieTableMetaClient metaClient,
|
||||
HoodieMetadataConfig metadataConfig,
|
||||
HoodieTimeline timeline) {
|
||||
LOG.info("Creating InMemory based view for basePath " + metaClient.getBasePath());
|
||||
if (metadataConfig.useFileListingMetadata()) {
|
||||
return new HoodieMetadataFileSystemView(engineContext, metaClient,
|
||||
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
|
||||
metadataConfig);
|
||||
return new HoodieMetadataFileSystemView(engineContext, metaClient, timeline, metadataConfig);
|
||||
}
|
||||
return new HoodieTableFileSystemView(metaClient,
|
||||
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
|
||||
return new HoodieTableFileSystemView(metaClient, timeline);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -19,8 +19,12 @@
|
||||
package org.apache.hudi.hadoop.utils;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapreduce.JobContext;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.util.CollectionUtils;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
@@ -40,6 +44,8 @@ public class HoodieHiveUtils {
|
||||
public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode";
|
||||
public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp";
|
||||
public static final String HOODIE_MAX_COMMIT_PATTERN = "hoodie.%s.consume.max.commits";
|
||||
public static final String HOODIE_CONSUME_PENDING_COMMITS = "hoodie.%s.consume.pending.commits";
|
||||
public static final String HOODIE_CONSUME_COMMIT = "hoodie.%s.consume.commit";
|
||||
public static final Set<String> VIRTUAL_COLUMN_NAMES = CollectionUtils.createImmutableSet(
|
||||
"INPUT__FILE__NAME", "BLOCK__OFFSET__INSIDE__FILE", "ROW__OFFSET__INSIDE__BLOCK", "RAW__DATA__SIZE",
|
||||
"ROW__ID", "GROUPING__ID");
|
||||
@@ -115,4 +121,20 @@ public class HoodieHiveUtils {
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public static HoodieTimeline getTableTimeline(final String tableName, final JobConf job, final HoodieTableMetaClient metaClient) {
|
||||
boolean includePendingCommits = job.getBoolean(String.format(HOODIE_CONSUME_PENDING_COMMITS, tableName), false);
|
||||
if (includePendingCommits) {
|
||||
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline();
|
||||
String maxCommit = job.get(String.format(HOODIE_CONSUME_COMMIT, tableName));
|
||||
if (maxCommit == null || !timeline.containsInstant(maxCommit)) {
|
||||
LOG.info("Timestamp configured for validation: " + maxCommit + " commits timeline:" + timeline + " table: " + tableName);
|
||||
throw new HoodieIOException("Valid timestamp is required for " + HOODIE_CONSUME_COMMIT + " in validate mode");
|
||||
}
|
||||
return timeline.findInstantsBeforeOrEquals(maxCommit);
|
||||
}
|
||||
|
||||
// by default return all completed commits.
|
||||
return metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -439,8 +439,9 @@ public class HoodieInputFormatUtils {
|
||||
LOG.debug("Hoodie Metadata initialized with completed commit instant as :" + metaClient);
|
||||
}
|
||||
|
||||
HoodieTimeline timeline = HoodieHiveUtils.getTableTimeline(metaClient.getTableConfig().getTableName(), job, metaClient);
|
||||
HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(metaClient, tableMetaClient ->
|
||||
FileSystemViewManager.createInMemoryFileSystemView(engineContext, tableMetaClient, buildMetadataConfig(job)));
|
||||
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, tableMetaClient, buildMetadataConfig(job), timeline));
|
||||
List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();
|
||||
for (Path p : entry.getValue()) {
|
||||
String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), p);
|
||||
|
||||
@@ -28,7 +28,9 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
|
||||
import org.apache.hudi.common.testutils.FileCreateUtils;
|
||||
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
|
||||
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
|
||||
|
||||
@@ -57,6 +59,7 @@ import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResou
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
||||
public class TestHoodieParquetInputFormat {
|
||||
|
||||
@@ -429,6 +432,44 @@ public class TestHoodieParquetInputFormat {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSnapshotPreCommitValidate() throws IOException {
|
||||
// initial commit
|
||||
File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
|
||||
createCommitFile(basePath, "100", "2016/05/01");
|
||||
|
||||
// Add the paths
|
||||
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
|
||||
FileStatus[] files = inputFormat.listStatus(jobConf);
|
||||
assertEquals(10, files.length, "Snapshot read must return all files in partition");
|
||||
|
||||
// add more files
|
||||
InputFormatTestUtil.simulateInserts(partitionDir, baseFileExtension, "fileId2-", 5, "200");
|
||||
FileCreateUtils.createInflightCommit(basePath.toString(), "200");
|
||||
|
||||
// Verify that validate mode reads uncommitted files
|
||||
InputFormatTestUtil.setupSnapshotIncludePendingCommits(jobConf, "200");
|
||||
files = inputFormat.listStatus(jobConf);
|
||||
assertEquals(15, files.length, "Must return uncommitted files");
|
||||
ensureFilesInCommit("Pulling 1 commit from 100, should get us the 5 files committed at 200", files, "200", 5);
|
||||
ensureFilesInCommit("Pulling 1 commit from 100, should get us the 10 files committed at 100", files, "100", 10);
|
||||
|
||||
try {
|
||||
// Verify that Validate mode throws error with invalid commit time
|
||||
InputFormatTestUtil.setupSnapshotIncludePendingCommits(jobConf, "300");
|
||||
inputFormat.listStatus(jobConf);
|
||||
fail("Expected list status to fail when validate is called with unknown timestamp");
|
||||
} catch (HoodieIOException e) {
|
||||
// expected because validate is called with invalid instantTime
|
||||
}
|
||||
|
||||
// verify that snapshot mode skips uncommitted files
|
||||
InputFormatTestUtil.setupSnapshotScanMode(jobConf);
|
||||
files = inputFormat.listStatus(jobConf);
|
||||
assertEquals(10, files.length, "snapshot scan mode must not return uncommitted files");
|
||||
ensureFilesInCommit("Pulling 1 commit from 100, should get us the 10 files committed at 100", files, "100", 10);
|
||||
}
|
||||
|
||||
private void ensureRecordsInCommit(String msg, String commit, int expectedNumberOfRecordsInCommit,
|
||||
int totalExpected) throws IOException {
|
||||
int actualCount = 0;
|
||||
|
||||
@@ -122,6 +122,26 @@ public class InputFormatTestUtil {
|
||||
String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
|
||||
jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
|
||||
}
|
||||
|
||||
public static void setupSnapshotIncludePendingCommits(JobConf jobConf, String instantTime) {
|
||||
setupSnapshotScanMode(jobConf, true);
|
||||
String validateTimestampName =
|
||||
String.format(HoodieHiveUtils.HOODIE_CONSUME_COMMIT, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
|
||||
jobConf.set(validateTimestampName, instantTime);
|
||||
}
|
||||
|
||||
public static void setupSnapshotScanMode(JobConf jobConf) {
|
||||
setupSnapshotScanMode(jobConf, false);
|
||||
}
|
||||
|
||||
private static void setupSnapshotScanMode(JobConf jobConf, boolean includePending) {
|
||||
String modePropertyName =
|
||||
String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
|
||||
jobConf.set(modePropertyName, HoodieHiveUtils.SNAPSHOT_SCAN_MODE);
|
||||
String includePendingCommitsName =
|
||||
String.format(HoodieHiveUtils.HOODIE_CONSUME_PENDING_COMMITS, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
|
||||
jobConf.setBoolean(includePendingCommitsName, includePending);
|
||||
}
|
||||
|
||||
public static File prepareParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles,
|
||||
int numberOfRecords, String commitNumber) throws IOException {
|
||||
|
||||
Reference in New Issue
Block a user