1
0

[HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR FileInputFormats (#4556)

This commit is contained in:
Alexey Kudinkin
2022-02-03 14:01:41 -08:00
committed by GitHub
parent 5927bdd1c0
commit 69dfcda116
22 changed files with 493 additions and 197 deletions

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.table;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -189,8 +190,10 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
assertTrue(fileIdToNewSize.entrySet().stream().anyMatch(entry -> fileIdToSize.get(entry.getKey()) < entry.getValue()));
List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
List<String> inputPaths = roView.getLatestBaseFiles()
.map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
.collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths,
basePath(), new JobConf(hadoopConf()), true, false);
// Wrote 20 records in 2 batches
assertEquals(40, recordsRead.size(), "Must contain 40 records");

View File

@@ -22,6 +22,7 @@ package org.apache.hudi.table.functional;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -113,13 +114,14 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
JavaRDD records = jsc().parallelize(dataGen.generateInserts(instant, numRecords), 2);
metaClient = HoodieTableMetaClient.reload(metaClient);
client.startCommitWithTime(instant);
List<WriteStatus> writeStatues = client.upsert(records, instant).collect();
org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatues);
List<WriteStatus> writeStatuses = client.upsert(records, instant).collect();
org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
if (doCommit) {
Assertions.assertTrue(client.commitStats(instant, writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
Option.empty(), metaClient.getCommitActionType()));
List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
boolean committed = client.commitStats(instant, writeStats, Option.empty(), metaClient.getCommitActionType());
Assertions.assertTrue(committed);
}
metaClient = HoodieTableMetaClient.reload(metaClient);
return writeStatues;
return writeStatuses;
}
}

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.table.functional;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
@@ -213,8 +214,11 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent());
List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, basePath(), new JobConf(hadoopConf()), true, false);
List<String> inputPaths = tableView.getLatestBaseFiles()
.map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
.collect(Collectors.toList());
List<GenericRecord> recordsRead =
HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths, basePath(), new JobConf(hadoopConf()), true, false);
// Wrote 20 records and deleted 20 records, so remaining 20-20 = 0
assertEquals(0, recordsRead.size(), "Must contain 0 records");
}

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieRecord;
@@ -40,6 +41,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -64,6 +66,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -172,10 +175,12 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
client.commit(newCommitTime, writeStatusJavaRDD);
List<WriteStatus> statuses = writeStatusJavaRDD.collect();
assertNoWriteErrors(statuses);
client.commit(newCommitTime, jsc().parallelize(statuses));
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -208,8 +213,10 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));
List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
List<String> inputPaths = tableView.getLatestBaseFiles()
.map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
.collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths,
basePath());
assertEquals(200, recordsRead.size());
@@ -225,8 +232,10 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
.contains(commitTime1)).map(fileStatus -> fileStatus.getPath().toString()).collect(Collectors.toList());
assertEquals(0, remainingFiles.size(), "There files should have been rolled-back "
+ "when rolling back commit " + commitTime1 + " but are still remaining. Files: " + remainingFiles);
dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, basePath());
inputPaths = tableView.getLatestBaseFiles()
.map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
.collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths, basePath());
assertEquals(200, recordsRead.size());
}
@@ -241,8 +250,10 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
copyOfRecords = dataGen.generateUpdates(commitTime2, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200));
List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
List<String> inputPaths = tableView.getLatestBaseFiles()
.map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
.collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths,
basePath());
assertEquals(200, recordsRead.size());
@@ -262,8 +273,10 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, basePath());
inputPaths = tableView.getLatestBaseFiles()
.map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
.collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths, basePath());
// check that the number of records read is still correct after rollback operation
assertEquals(200, recordsRead.size());
@@ -275,11 +288,13 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
thirdClient.startCommitWithTime(newCommitTime);
writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime);
statuses = writeStatusJavaRDD.collect();
thirdClient.commit(newCommitTime, writeStatusJavaRDD);
// Verify there are no errors
assertNoWriteErrors(statuses);
thirdClient.commit(newCommitTime, jsc().parallelize(statuses));
metaClient = HoodieTableMetaClient.reload(metaClient);
String compactionInstantTime = thirdClient.scheduleCompaction(Option.empty()).get().toString();
@@ -317,8 +332,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
/*
* Write 1 (only inserts)
*/
@@ -329,20 +344,29 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
client.commit(newCommitTime, writeStatusJavaRDD);
List<WriteStatus> statuses = writeStatusJavaRDD.collect();
assertNoWriteErrors(statuses);
client.commit(newCommitTime, jsc().parallelize(statuses));
client.close();
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantCommitMetadataPairOpt =
metaClient.getActiveTimeline().getLastCommitMetadataWithValidData();
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
assertTrue(deltaCommit.isPresent());
assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001");
assertTrue(instantCommitMetadataPairOpt.isPresent());
HoodieInstant commitInstant = instantCommitMetadataPairOpt.get().getKey();
assertEquals("001", commitInstant.getTimestamp());
assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, commitInstant.getAction());
assertEquals(200, getTotalRecordsWritten(instantCommitMetadataPairOpt.get().getValue()));
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
@@ -352,6 +376,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent(),
"Should list the base files we wrote in the delta commit");
/*
* Write 2 (inserts + updates)
*/
@@ -368,7 +393,9 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));
List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
List<String> dataFiles = tableView.getLatestBaseFiles()
.map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
.collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
basePath());
assertEquals(200, recordsRead.size());
@@ -376,7 +403,9 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
statuses = nClient.upsert(jsc().parallelize(copyOfRecords, 1), newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
nClient.commit(newCommitTime, writeStatusJavaRDD);
nClient.commit(newCommitTime, jsc().parallelize(statuses));
copyOfRecords.clear();
}
@@ -393,11 +422,12 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
writeRecords = jsc().parallelize(records, 1);
writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
client.commit(newCommitTime, writeStatusJavaRDD);
statuses = writeStatusJavaRDD.collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
client.commit(newCommitTime, jsc().parallelize(statuses));
metaClient = HoodieTableMetaClient.reload(metaClient);
String compactionInstantTime = "004";
@@ -414,11 +444,12 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
writeRecords = jsc().parallelize(records, 1);
writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
client.commit(newCommitTime, writeStatusJavaRDD);
statuses = writeStatusJavaRDD.collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
client.commit(newCommitTime, jsc().parallelize(statuses));
metaClient = HoodieTableMetaClient.reload(metaClient);
compactionInstantTime = "006";
@@ -447,7 +478,9 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
statuses = client.upsert(jsc().parallelize(copyOfRecords, 1), newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
client.commit(newCommitTime, writeStatusJavaRDD);
client.commit(newCommitTime, jsc().parallelize(statuses));
copyOfRecords.clear();
// Rollback latest commit first
@@ -471,6 +504,13 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
}
}
private long getTotalRecordsWritten(HoodieCommitMetadata commitMetadata) {
return commitMetadata.getPartitionToWriteStats().values().stream()
.flatMap(Collection::stream)
.map(stat -> stat.getNumWrites() + stat.getNumUpdateWrites())
.reduce(0L, Long::sum);
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testMORTableRestore(boolean restoreAfterCompaction) throws Exception {
@@ -523,8 +563,6 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
client.commit(newCommitTime, writeStatusJavaRDD);
List<WriteStatus> statuses = writeStatusJavaRDD.collect();
assertNoWriteErrors(statuses);
return records;
}
@@ -541,8 +579,10 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
List<String> inputPaths = tableView.getLatestBaseFiles()
.map(hf -> new Path(hf.getPath()).getParent().toString())
.collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths,
basePath());
assertRecords(expectedRecords, recordsRead);
}
@@ -603,9 +643,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime);
// trigger an action
List<WriteStatus> writeStatuses = statuses.collect();
List<WriteStatus> writeStatuses = ((JavaRDD<WriteStatus>) writeClient.insert(recordsRDD, newCommitTime)).collect();
// Ensure that inserts are written to only log files
assertEquals(0,

View File

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi;
/**
 * Utility methods for working with Java's type system.
 */
public class TypeUtils {

  /**
   * Performs an unchecked cast of {@code obj} to the target type {@code T}.
   *
   * <p>Centralizing unsafe casts behind this single method makes it possible to
   * <ul>
   *   <li>Locate every such cast easily (by searching for usages of this method)</li>
   *   <li>Suppress the compiler's unchecked-cast warning in exactly one place</li>
   * </ul>
   *
   * @param obj the object to cast (may be {@code null})
   * @param <T> the target type the caller expects
   * @return {@code obj} cast to {@code T}
   */
  @SuppressWarnings("unchecked")
  public static <T> T unsafeCast(Object obj) {
    return (T) obj;
  }
}

View File

@@ -24,11 +24,13 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ
import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView}
import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.language.implicitConversions
/**
* Common (engine-agnostic) File Index implementation enabling individual query engines to
@@ -87,6 +89,12 @@ abstract class HoodieTableFileIndexBase(engineContext: HoodieEngineContext,
refresh0()
/**
* Returns latest completed instant as seen by this instance of the file-index
*/
def latestCompletedInstant(): Option[HoodieInstant] =
getActiveTimeline.filterCompletedInstants().lastInstant()
/**
* Fetch list of latest base files and log files per partition.
*
@@ -171,11 +179,17 @@ abstract class HoodieTableFileIndexBase(engineContext: HoodieEngineContext,
}
private def getActiveTimeline = {
val timeline = metaClient.getActiveTimeline.getCommitsTimeline
// NOTE: We have to use commits and compactions timeline, to make sure that we're properly
// handling the following case: when records are inserted into the new log-file w/in the file-group
// that is under the pending compaction process, new log-file will bear the compaction's instant (on the
// timeline) in its name, as opposed to the base-file's commit instant. To make sure we're not filtering
// such log-file we have to _always_ include pending compaction instants into consideration
// TODO(HUDI-3302) re-evaluate whether we should not filter any commits in here
val timeline = metaClient.getCommitsAndCompactionTimeline
if (shouldIncludePendingCommits) {
timeline
} else {
timeline.filterCompletedInstants()
timeline.filterCompletedAndCompactionInstants()
}
}
@@ -291,6 +305,16 @@ abstract class HoodieTableFileIndexBase(engineContext: HoodieEngineContext,
}
}
}
/**
* Converts Hudi's internal representation of the {@code Option} into Scala's default one
*/
implicit def asScalaOption[T](opt: org.apache.hudi.common.util.Option[T]): Option[T] =
if (opt.isPresent) {
Some(opt.get)
} else {
None
}
}
trait FileStatusCacheTrait {

View File

@@ -36,7 +36,7 @@ import java.util.List;
*/
public class BaseFileWithLogsSplit extends FileSplit {
// a flag to mark this split is produced by incremental query or not.
private boolean belongToIncrementalSplit = false;
private boolean belongsToIncrementalQuery = false;
// the log file paths of this split.
private List<HoodieLogFile> deltaLogFiles = new ArrayList<>();
// max commit time of current split.
@@ -53,7 +53,7 @@ public class BaseFileWithLogsSplit extends FileSplit {
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeBoolean(belongToIncrementalSplit);
out.writeBoolean(belongsToIncrementalQuery);
Text.writeString(out, maxCommitTime);
Text.writeString(out, basePath);
Text.writeString(out, baseFilePath);
@@ -67,7 +67,7 @@ public class BaseFileWithLogsSplit extends FileSplit {
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
belongToIncrementalSplit = in.readBoolean();
belongsToIncrementalQuery = in.readBoolean();
maxCommitTime = Text.readString(in);
basePath = Text.readString(in);
baseFilePath = Text.readString(in);
@@ -81,12 +81,12 @@ public class BaseFileWithLogsSplit extends FileSplit {
deltaLogFiles = tempDeltaLogs;
}
public boolean getBelongToIncrementalSplit() {
return belongToIncrementalSplit;
public boolean getBelongsToIncrementalQuery() {
return belongsToIncrementalQuery;
}
public void setBelongToIncrementalSplit(boolean belongToIncrementalSplit) {
this.belongToIncrementalSplit = belongToIncrementalSplit;
public void setBelongsToIncrementalQuery(boolean belongsToIncrementalQuery) {
this.belongsToIncrementalQuery = belongsToIncrementalQuery;
}
public List<HoodieLogFile> getDeltaLogFiles() {

View File

@@ -32,6 +32,10 @@ public class BootstrapBaseFileSplit extends FileSplit {
private FileSplit bootstrapFileSplit;
/**
* NOTE: This ctor is necessary for Hive to be able to serialize and
* then instantiate it when deserializing back
*/
public BootstrapBaseFileSplit() {
super();
}

View File

@@ -38,6 +38,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import scala.collection.JavaConverters;
@@ -70,8 +71,6 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWrit
protected Configuration conf;
protected abstract boolean includeLogFilesForSnapShotView();
@Override
public final Configuration getConf() {
return conf;
@@ -82,27 +81,6 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWrit
this.conf = conf;
}
@Nonnull
private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile, Stream<HoodieLogFile> logFiles) {
List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
try {
RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus());
rtFileStatus.setDeltaLogFiles(sortedLogFiles);
return rtFileStatus;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Nonnull
private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFileOpt) {
try {
return HoodieInputFormatUtils.getFileStatus(baseFileOpt.get());
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
@Override
public FileStatus[] listStatus(JobConf job) throws IOException {
// Segregate inputPaths[] to incremental, snapshot and non hoodie paths
@@ -143,6 +121,126 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWrit
return returns.toArray(new FileStatus[0]);
}
private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
checkState(diff.isEmpty(), "Should be empty");
}
@Nonnull
private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
try {
return HoodieInputFormatUtils.getFileStatus(baseFile);
} catch (IOException ioe) {
throw new HoodieIOException("Failed to get file-status", ioe);
}
}
/**
* Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that
* lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified
* as part of provided {@link JobConf}
*/
protected final FileStatus[] doListStatus(JobConf job) throws IOException {
return super.listStatus(job);
}
/**
* Achieves listStatus functionality for an incrementally queried table. Instead of listing all
* partitions and then filtering based on the commits of interest, this logic first extracts the
* partitions touched by the desired commits and then lists only those partitions.
*/
protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
HoodieTableMetaClient tableMetaClient,
List<Path> inputPaths,
String incrementalTable) throws IOException {
Job jobContext = Job.getInstance(job);
Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
if (!timeline.isPresent()) {
return null;
}
Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, incrementalTable, timeline.get());
if (!commitsToCheck.isPresent()) {
return null;
}
Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
// Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
if (!incrementalInputPaths.isPresent()) {
return null;
}
setInputPaths(job, incrementalInputPaths.get());
FileStatus[] fileStatuses = doListStatus(job);
return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
}
protected abstract boolean includeLogFilesForSnapshotView();
@Nonnull
private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
Stream<HoodieLogFile> logFiles,
Option<HoodieInstant> latestCompletedInstantOpt,
HoodieTableMetaClient tableMetaClient) {
List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
try {
RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
rtFileStatus.setDeltaLogFiles(sortedLogFiles);
rtFileStatus.setBaseFilePath(baseFile.getPath());
rtFileStatus.setBasePath(tableMetaClient.getBasePath());
if (latestCompletedInstantOpt.isPresent()) {
HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
checkState(latestCompletedInstant.isCompleted());
rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
}
if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
rtFileStatus.setBootStrapFileStatus(baseFileStatus);
}
return rtFileStatus;
} catch (IOException e) {
throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
}
}
@Nonnull
private List<FileStatus> listStatusForSnapshotModeLegacy(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap, List<Path> snapshotPaths) throws IOException {
return HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapshotView());
}
@Nonnull
private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile,
Stream<HoodieLogFile> logFiles,
Option<HoodieInstant> latestCompletedInstantOpt,
HoodieTableMetaClient tableMetaClient) {
List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
try {
RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus());
rtFileStatus.setDeltaLogFiles(sortedLogFiles);
rtFileStatus.setBasePath(tableMetaClient.getBasePath());
if (latestCompletedInstantOpt.isPresent()) {
HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
checkState(latestCompletedInstant.isCompleted());
rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
}
return rtFileStatus;
} catch (IOException e) {
throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
}
}
private static Option<HoodieInstant> fromScala(scala.Option<HoodieInstant> opt) {
if (opt.isDefined()) {
return Option.of(opt.get());
}
return Option.empty();
}
@Nonnull
private List<FileStatus> listStatusForSnapshotMode(JobConf job,
Map<String, HoodieTableMetaClient> tableMetaClientMap,
@@ -187,13 +285,28 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWrit
.map(fileSlice -> {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
if (baseFileOpt.isPresent()) {
return getFileStatusUnchecked(baseFileOpt);
} else if (includeLogFilesForSnapShotView() && latestLogFileOpt.isPresent()) {
return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), fileSlice.getLogFiles());
Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();
Option<HoodieInstant> latestCompletedInstantOpt =
fromScala(fileIndex.latestCompletedInstant());
// Check if we're reading a MOR table
if (includeLogFilesForSnapshotView()) {
if (baseFileOpt.isPresent()) {
return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, latestCompletedInstantOpt, tableMetaClient);
} else if (latestLogFileOpt.isPresent()) {
return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), logFiles, latestCompletedInstantOpt, tableMetaClient);
} else {
throw new IllegalStateException("Invalid state: either base-file or log-file has to be present");
}
} else {
throw new IllegalStateException("Invalid state: either base-file or log-file should be present");
if (baseFileOpt.isPresent()) {
return getFileStatusUnchecked(baseFileOpt.get());
} else {
throw new IllegalStateException("Invalid state: base-file has to be present");
}
}
})
.collect(Collectors.toList())
);
@@ -204,49 +317,4 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWrit
return targetFiles;
}
private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
checkState(diff.isEmpty(), "Should be empty");
}
@Nonnull
private List<FileStatus> listStatusForSnapshotModeLegacy(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap, List<Path> snapshotPaths) throws IOException {
return HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapShotView());
}
/**
* Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that
* lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified
* as part of provided {@link JobConf}
*/
protected final FileStatus[] doListStatus(JobConf job) throws IOException {
return super.listStatus(job);
}
/**
* Achieves listStatus functionality for an incrementally queried table. Instead of listing all
* partitions and then filtering based on the commits of interest, this logic first extracts the
* partitions touched by the desired commits and then lists only those partitions.
*/
protected List<FileStatus> listStatusForIncrementalMode(JobConf job, HoodieTableMetaClient tableMetaClient,
List<Path> inputPaths, String incrementalTable) throws IOException {
Job jobContext = Job.getInstance(job);
Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
if (!timeline.isPresent()) {
return null;
}
Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, incrementalTable, timeline.get());
if (!commitsToCheck.isPresent()) {
return null;
}
Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
// Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
if (!incrementalInputPaths.isPresent()) {
return null;
}
setInputPaths(job, incrementalInputPaths.get());
FileStatus[] fileStatuses = doListStatus(job);
return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
}
}

View File

@@ -42,7 +42,7 @@ public class HoodieHFileInputFormat extends HoodieFileInputFormatBase {
}
@Override
protected boolean includeLogFilesForSnapShotView() {
protected boolean includeLogFilesForSnapshotView() {
return false;
}

View File

@@ -66,7 +66,7 @@ public class HoodieParquetInputFormat extends HoodieFileInputFormatBase implemen
return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
}
protected boolean includeLogFilesForSnapShotView() {
protected boolean includeLogFilesForSnapshotView() {
return false;
}

View File

@@ -35,7 +35,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableMetaClientForBasePath;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableMetaClientForBasePathUnchecked;
/**
* InputPathHandler takes in a set of input paths and incremental tables list. Then, classifies the
@@ -107,7 +107,7 @@ public class InputPathHandler {
// This path is for a table that we don't know about yet.
HoodieTableMetaClient metaClient;
try {
metaClient = getTableMetaClientForBasePath(inputPath.getFileSystem(conf), inputPath);
metaClient = getTableMetaClientForBasePathUnchecked(conf, inputPath);
tableMetaClientMap.put(getIncrementalTable(metaClient), metaClient);
tagAsIncrementalOrSnapshot(inputPath, metaClient, incrementalTables);
} catch (TableNotFoundException | InvalidTableException e) {

View File

@@ -31,7 +31,7 @@ import java.util.List;
*/
public class PathWithLogFilePath extends Path {
// a flag to mark this split is produced by incremental query or not.
private boolean belongToIncrementalPath = false;
private boolean belongsToIncrementalPath = false;
// the log files belong this path.
private List<HoodieLogFile> deltaLogFiles = new ArrayList<>();
// max commit time of current path.
@@ -48,8 +48,8 @@ public class PathWithLogFilePath extends Path {
super(parent, child);
}
public void setBelongToIncrementalPath(boolean belongToIncrementalPath) {
this.belongToIncrementalPath = belongToIncrementalPath;
public void setBelongsToIncrementalPath(boolean belongsToIncrementalPath) {
this.belongsToIncrementalPath = belongsToIncrementalPath;
}
public List<HoodieLogFile> getDeltaLogFiles() {
@@ -72,6 +72,10 @@ public class PathWithLogFilePath extends Path {
return basePath;
}
public boolean getBelongsToIncrementalQuery() {
return belongsToIncrementalPath;
}
public void setBasePath(String basePath) {
this.basePath = basePath;
}
@@ -98,7 +102,7 @@ public class PathWithLogFilePath extends Path {
public BaseFileWithLogsSplit buildSplit(Path file, long start, long length, String[] hosts) {
BaseFileWithLogsSplit bs = new BaseFileWithLogsSplit(file, start, length, hosts);
bs.setBelongToIncrementalSplit(belongToIncrementalPath);
bs.setBelongsToIncrementalQuery(belongsToIncrementalPath);
bs.setDeltaLogFiles(deltaLogFiles);
bs.setMaxCommitTime(maxCommitTime);
bs.setBasePath(basePath);

View File

@@ -56,7 +56,7 @@ public class RealtimeFileStatus extends FileStatus {
public Path getPath() {
Path path = super.getPath();
PathWithLogFilePath pathWithLogFilePath = new PathWithLogFilePath(path.getParent(), path.getName());
pathWithLogFilePath.setBelongToIncrementalPath(belongToIncrementalFileStatus);
pathWithLogFilePath.setBelongsToIncrementalPath(belongToIncrementalFileStatus);
pathWithLogFilePath.setDeltaLogFiles(deltaLogFiles);
pathWithLogFilePath.setMaxCommitTime(maxCommitTime);
pathWithLogFilePath.setBasePath(basePath);

View File

@@ -39,7 +39,8 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Arrays;
import java.util.stream.Stream;
import java.util.List;
import java.util.stream.Collectors;
/**
* HoodieRealtimeInputFormat for HUDI datasets which store data in HFile base file format.
@@ -52,7 +53,9 @@ public class HoodieHFileRealtimeInputFormat extends HoodieHFileInputFormat {
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is);
List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits))
.map(is -> (FileSplit) is)
.collect(Collectors.toList());
return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
}

View File

@@ -83,14 +83,11 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList());
boolean isIncrementalSplits = HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits);
return isIncrementalSplits
? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits.stream())
: HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits.stream());
return HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits)
? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits)
: HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
}
/**
@@ -112,8 +109,10 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
* TODO: unify the incremental view code between hive/spark-sql and spark datasource
*/
@Override
protected List<FileStatus> listStatusForIncrementalMode(
JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths, String incrementalTable) throws IOException {
protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
HoodieTableMetaClient tableMetaClient,
List<Path> inputPaths,
String incrementalTable) throws IOException {
List<FileStatus> result = new ArrayList<>();
Job jobContext = Job.getInstance(job);
@@ -217,7 +216,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
}
@Override
protected boolean includeLogFilesForSnapShotView() {
protected boolean includeLogFilesForSnapshotView() {
return true;
}
@@ -251,14 +250,18 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
private FileSplit doMakeSplitForPathWithLogFilePath(PathWithLogFilePath path, long start, long length, String[] hosts, String[] inMemoryHosts) {
if (!path.includeBootstrapFilePath()) {
return path.buildSplit(path, start, length, hosts);
} else {
FileSplit bf =
inMemoryHosts == null
? super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts)
: super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts, inMemoryHosts);
return HoodieRealtimeInputFormatUtils
.createRealtimeBoostrapBaseFileSplit((BootstrapBaseFileSplit) bf, path.getBasePath(), path.getDeltaLogFiles(), path.getMaxCommitTime());
}
FileSplit bf = inMemoryHosts == null
? super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts)
: super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts, inMemoryHosts);
return HoodieRealtimeInputFormatUtils.createRealtimeBoostrapBaseFileSplit(
(BootstrapBaseFileSplit) bf,
path.getBasePath(),
path.getDeltaLogFiles(),
path.getMaxCommitTime(),
path.getBelongsToIncrementalQuery());
}
@Override

View File

@@ -32,6 +32,9 @@ import java.util.stream.Collectors;
/**
* Filesplit that wraps the base split and a list of log files to merge deltas from.
*
* NOTE: If you're adding fields here you need to make sure that you appropriately de-/serialize them
* in {@link #readFromInput(DataInput)} and {@link #writeToOutput(DataOutput)}
*/
public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit {
@@ -44,9 +47,7 @@ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit
private Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
public HoodieRealtimeFileSplit() {
super();
}
public HoodieRealtimeFileSplit() {}
public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<HoodieLogFile> deltaLogFiles, String maxCommitTime,
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo)

View File

@@ -18,11 +18,11 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hudi.hadoop.InputSplitUtils;
import java.io.DataInput;
import java.io.DataOutput;
@@ -33,6 +33,9 @@ import java.util.stream.Collectors;
/**
* Realtime File Split with external base file.
*
* NOTE: If you're adding fields here you need to make sure that you appropriately de-/serialize them
* in {@link #readFromInput(DataInput)} and {@link #writeToOutput(DataOutput)}
*/
public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit implements RealtimeSplit {
@@ -43,29 +46,42 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple
private String basePath;
private boolean belongsToIncrementalSplit;
/**
* NOTE: This ctor is necessary for Hive to be able to serialize and
* then instantiate it when deserializing back
*/
public RealtimeBootstrapBaseFileSplit() {
super();
}
public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit, String basePath, List<HoodieLogFile> deltaLogFiles,
String maxInstantTime, FileSplit externalFileSplit) throws IOException {
public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit,
String basePath,
List<HoodieLogFile> deltaLogFiles,
String maxInstantTime,
FileSplit externalFileSplit,
boolean belongsToIncrementalQuery) throws IOException {
super(baseSplit, externalFileSplit);
this.maxInstantTime = maxInstantTime;
this.deltaLogFiles = deltaLogFiles;
this.deltaLogPaths = deltaLogFiles.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList());
this.basePath = basePath;
this.belongsToIncrementalSplit = belongsToIncrementalQuery;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
writeToOutput(out);
InputSplitUtils.writeBoolean(belongsToIncrementalSplit, out);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
readFromInput(in);
belongsToIncrementalSplit = InputSplitUtils.readBoolean(in);
}
@Override
@@ -93,6 +109,10 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple
return Option.empty();
}
public boolean getBelongsToIncrementalQuery() {
return belongsToIncrementalSplit;
}
@Override
public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;

View File

@@ -90,14 +90,16 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo {
for (String logFilePath : getDeltaLogPaths()) {
InputSplitUtils.writeString(logFilePath, out);
}
if (!getHoodieVirtualKeyInfo().isPresent()) {
Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt = getHoodieVirtualKeyInfo();
if (!virtualKeyInfoOpt.isPresent()) {
InputSplitUtils.writeBoolean(false, out);
} else {
InputSplitUtils.writeBoolean(true, out);
InputSplitUtils.writeString(getHoodieVirtualKeyInfo().get().getRecordKeyField(), out);
InputSplitUtils.writeString(getHoodieVirtualKeyInfo().get().getPartitionPathField(), out);
InputSplitUtils.writeString(String.valueOf(getHoodieVirtualKeyInfo().get().getRecordKeyFieldIndex()), out);
InputSplitUtils.writeString(String.valueOf(getHoodieVirtualKeyInfo().get().getPartitionPathFieldIndex()), out);
InputSplitUtils.writeString(virtualKeyInfoOpt.get().getRecordKeyField(), out);
InputSplitUtils.writeString(virtualKeyInfoOpt.get().getPartitionPathField(), out);
InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getRecordKeyFieldIndex()), out);
InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getPartitionPathFieldIndex()), out);
}
}

View File

@@ -318,7 +318,7 @@ public class HoodieInputFormatUtils {
Map<Path, HoodieTableMetaClient> metaClientMap = new HashMap<>();
return partitions.stream().collect(Collectors.toMap(Function.identity(), p -> {
try {
HoodieTableMetaClient metaClient = getTableMetaClientForBasePath(p.getFileSystem(conf), p);
HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, p);
metaClientMap.put(p, metaClient);
return metaClient;
} catch (IOException e) {
@@ -328,20 +328,17 @@ public class HoodieInputFormatUtils {
}
/**
* Extract HoodieTableMetaClient from a partition path(not base path).
* @param fs
* @param dataPath
* @return
* @throws IOException
* Extract HoodieTableMetaClient from a partition path (not base path)
*/
public static HoodieTableMetaClient getTableMetaClientForBasePath(FileSystem fs, Path dataPath) throws IOException {
public static HoodieTableMetaClient getTableMetaClientForBasePathUnchecked(Configuration conf, Path partitionPath) throws IOException {
FileSystem fs = partitionPath.getFileSystem(conf);
int levels = HoodieHiveUtils.DEFAULT_LEVELS_TO_BASEPATH;
if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
if (HoodiePartitionMetadata.hasPartitionMetadata(fs, partitionPath)) {
HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, partitionPath);
metadata.readFromFS();
levels = metadata.getPartitionDepth();
}
Path baseDir = HoodieHiveUtils.getNthParent(dataPath, levels);
Path baseDir = HoodieHiveUtils.getNthParent(partitionPath, levels);
LOG.info("Reading hoodie metadata from path " + baseDir.toString());
return HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(baseDir.toString()).build();
}
@@ -440,6 +437,9 @@ public class HoodieInputFormatUtils {
.build();
}
/**
* @deprecated
*/
public static List<FileStatus> filterFileStatusForSnapshotMode(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap,
List<Path> snapshotPaths, boolean includeLogFiles) throws IOException {
HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job);

View File

@@ -18,6 +18,13 @@
package org.apache.hudi.hadoop.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
@@ -42,14 +49,6 @@ import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
import org.apache.hudi.hadoop.realtime.RealtimeBootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
@@ -57,6 +56,7 @@ import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -65,11 +65,70 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.TypeUtils.unsafeCast;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) {
public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
if (fileSplits.isEmpty()) {
return new InputSplit[0];
}
FileSplit fileSplit = fileSplits.get(0);
// Pre-process table-config to fetch virtual key info
Path partitionPath = fileSplit.getPath().getParent();
HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, partitionPath);
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt = getHoodieVirtualKeyInfo(metaClient);
// NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase}
HoodieInstant latestCommitInstant =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
InputSplit[] finalSplits = fileSplits.stream()
.map(split -> {
// There are 4 types of splits could we have to handle here
// - {@code BootstrapBaseFileSplit}: in case base file does have associated bootstrap file,
// but does NOT have any log files appended (convert it to {@code RealtimeBootstrapBaseFileSplit})
// - {@code RealtimeBootstrapBaseFileSplit}: in case base file does have associated bootstrap file
// and does have log files appended
// - {@code BaseFileWithLogsSplit}: in case base file does NOT have associated bootstrap file
// and does have log files appended;
// - {@code FileSplit}: in case Hive passed down non-Hudi path
if (split instanceof RealtimeBootstrapBaseFileSplit) {
return split;
} else if (split instanceof BootstrapBaseFileSplit) {
BootstrapBaseFileSplit bootstrapBaseFileSplit = unsafeCast(split);
return createRealtimeBoostrapBaseFileSplit(
bootstrapBaseFileSplit,
metaClient.getBasePath(),
Collections.emptyList(),
latestCommitInstant.getTimestamp(),
false);
} else if (split instanceof BaseFileWithLogsSplit) {
BaseFileWithLogsSplit baseFileWithLogsSplit = unsafeCast(split);
return createHoodieRealtimeSplitUnchecked(baseFileWithLogsSplit, hoodieVirtualKeyInfoOpt);
} else {
// Non-Hudi paths might result in just generic {@code FileSplit} being
// propagated up to this point
return split;
}
})
.toArray(InputSplit[]::new);
LOG.info("Returning a total splits of " + finalSplits.length);
return finalSplits;
}
/**
* @deprecated
*/
public static InputSplit[] getRealtimeSplitsLegacy(Configuration conf, Stream<FileSplit> fileSplits) {
Map<Path, List<FileSplit>> partitionsToParquetSplits =
fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent()));
// TODO(vc): Should we handle also non-hoodie splits here?
@@ -124,7 +183,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
.collect(Collectors.toList());
if (split instanceof BootstrapBaseFileSplit) {
BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split;
rtSplits.add(createRealtimeBoostrapBaseFileSplit(eSplit, metaClient.getBasePath(), logFiles, maxCommitTime));
rtSplits.add(createRealtimeBoostrapBaseFileSplit(eSplit, metaClient.getBasePath(), logFiles, maxCommitTime, false));
} else {
rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFiles, maxCommitTime, finalHoodieVirtualKeyInfo));
}
@@ -144,11 +203,16 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
return rtSplits.toArray(new InputSplit[0]);
}
/**
* @deprecated will be replaced w/ {@link #getRealtimeSplits(Configuration, List)}
*/
// get IncrementalRealtimeSplits
public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
checkState(fileSplits.stream().allMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery),
"All splits have to belong to incremental query");
List<InputSplit> rtSplits = new ArrayList<>();
List<FileSplit> fileSplitList = fileSplits.collect(Collectors.toList());
Set<Path> partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
Set<Path> partitionSet = fileSplits.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet);
// Pre process tableConfig from first partition to fetch virtual key info
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
@@ -156,14 +220,12 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next()));
}
Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
fileSplitList.stream().forEach(s -> {
fileSplits.stream().forEach(s -> {
// deal with incremental query.
try {
if (s instanceof BaseFileWithLogsSplit) {
BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s;
if (bs.getBelongToIncrementalSplit()) {
rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
}
BaseFileWithLogsSplit bs = unsafeCast(s);
rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
} else if (s instanceof RealtimeBootstrapBaseFileSplit) {
rtSplits.add(s);
}
@@ -191,22 +253,30 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
return Option.empty();
}
private static boolean doesBelongToIncrementalQuery(FileSplit s) {
if (s instanceof BaseFileWithLogsSplit) {
BaseFileWithLogsSplit bs = unsafeCast(s);
return bs.getBelongsToIncrementalQuery();
} else if (s instanceof RealtimeBootstrapBaseFileSplit) {
RealtimeBootstrapBaseFileSplit bs = unsafeCast(s);
return bs.getBelongsToIncrementalQuery();
}
return false;
}
public static boolean isIncrementalQuerySplits(List<FileSplit> fileSplits) {
if (fileSplits == null || fileSplits.size() == 0) {
return false;
}
return fileSplits.stream().anyMatch(s -> {
if (s instanceof BaseFileWithLogsSplit) {
BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s;
return bs.getBelongToIncrementalSplit();
} else {
return s instanceof RealtimeBootstrapBaseFileSplit;
}
});
return fileSplits.stream().anyMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery);
}
public static RealtimeBootstrapBaseFileSplit createRealtimeBoostrapBaseFileSplit(
BootstrapBaseFileSplit split, String basePath, List<HoodieLogFile> logFiles, String maxInstantTime) {
public static RealtimeBootstrapBaseFileSplit createRealtimeBoostrapBaseFileSplit(BootstrapBaseFileSplit split,
String basePath,
List<HoodieLogFile> logFiles,
String maxInstantTime,
boolean belongsToIncrementalQuery) {
try {
String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())
.filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[0];
@@ -214,7 +284,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
.filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0];
FileSplit baseSplit = new FileSplit(split.getPath(), split.getStart(), split.getLength(),
hosts, inMemoryHosts);
return new RealtimeBootstrapBaseFileSplit(baseSplit, basePath, logFiles, maxInstantTime, split.getBootstrapFileSplit());
return new RealtimeBootstrapBaseFileSplit(baseSplit, basePath, logFiles, maxInstantTime, split.getBootstrapFileSplit(), belongsToIncrementalQuery);
} catch (IOException e) {
throw new HoodieIOException("Error creating hoodie real time split ", e);
}
@@ -330,4 +400,18 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
}
}
}
private static HoodieRealtimeFileSplit createHoodieRealtimeSplitUnchecked(BaseFileWithLogsSplit baseFileWithLogsSplit,
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt) {
try {
return new HoodieRealtimeFileSplit(
baseFileWithLogsSplit,
baseFileWithLogsSplit.getBasePath(),
baseFileWithLogsSplit.getDeltaLogFiles(),
baseFileWithLogsSplit.getMaxCommitTime(),
hoodieVirtualKeyInfoOpt);
} catch (IOException e) {
throw new HoodieIOException(String.format("Failed to init %s", HoodieRealtimeFileSplit.class.getSimpleName()), e);
}
}
}

View File

@@ -245,7 +245,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
}
@Test
public void testMutilReaderRealtimeComineHoodieInputFormat() throws Exception {
public void testMultiReaderRealtimeCombineHoodieInputFormat() throws Exception {
// test for hudi-1722
Configuration conf = new Configuration();
// initial commit