
[HUDI-3280] Cleaning up Hive-related hierarchies after refactoring (#4743)

This commit is contained in:
Alexey Kudinkin
2022-02-16 15:36:37 -08:00
committed by GitHub
parent 3363c66468
commit aaddaf524a
27 changed files with 743 additions and 1013 deletions

View File

@@ -36,6 +36,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -74,6 +75,7 @@ public abstract class BaseHoodieTableFileIndex {
protected final List<Path> queryPaths;
private final boolean shouldIncludePendingCommits;
private final boolean shouldValidateInstant;
private final HoodieTableType tableType;
protected final String basePath;
@@ -98,6 +100,7 @@ public abstract class BaseHoodieTableFileIndex {
* @param queryPaths target DFS paths being queried
* @param specifiedQueryInstant instant as of which table is being queried
* @param shouldIncludePendingCommits flags whether file-index should include any pending operations
* @param shouldValidateInstant flags whether to validate that the query instant is present in the timeline
* @param fileStatusCache transient cache of fetched [[FileStatus]]es
*/
public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
@@ -107,6 +110,7 @@ public abstract class BaseHoodieTableFileIndex {
List<Path> queryPaths,
Option<String> specifiedQueryInstant,
boolean shouldIncludePendingCommits,
boolean shouldValidateInstant,
FileStatusCache fileStatusCache) {
this.partitionColumns = metaClient.getTableConfig().getPartitionFields()
.orElse(new String[0]);
@@ -122,6 +126,7 @@ public abstract class BaseHoodieTableFileIndex {
this.queryPaths = queryPaths;
this.specifiedQueryInstant = specifiedQueryInstant;
this.shouldIncludePendingCommits = shouldIncludePendingCommits;
this.shouldValidateInstant = shouldValidateInstant;
this.tableType = metaClient.getTableType();
this.basePath = metaClient.getBasePath();
@@ -142,6 +147,13 @@ public abstract class BaseHoodieTableFileIndex {
return getActiveTimeline().filterCompletedInstants().lastInstant();
}
/**
* Returns table's base-path
*/
public String getBasePath() {
return metaClient.getBasePath();
}
/**
* Fetch list of latest base files and log files per partition.
*
@@ -264,6 +276,8 @@ public abstract class BaseHoodieTableFileIndex {
Option<String> queryInstant =
specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::getTimestamp));
validate(activeTimeline, queryInstant);
if (tableType.equals(HoodieTableType.MERGE_ON_READ) && queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
cachedAllInputFileSlices = partitionFiles.keySet().stream()
.collect(Collectors.toMap(
@@ -277,15 +291,15 @@ public abstract class BaseHoodieTableFileIndex {
)
);
} else {
// TODO re-align with the (MOR, snapshot) branch above
cachedAllInputFileSlices = partitionFiles.keySet().stream()
.collect(Collectors.toMap(
Function.identity(),
partitionPath ->
specifiedQueryInstant.map(instant ->
fileSystemView.getLatestFileSlicesBeforeOrOn(partitionPath.path, instant, true))
.orElse(fileSystemView.getLatestFileSlices(partitionPath.path))
.collect(Collectors.toList())
queryInstant.map(instant ->
fileSystemView.getLatestFileSlicesBeforeOrOn(partitionPath.path, instant, true)
)
.orElse(fileSystemView.getLatestFileSlices(partitionPath.path))
.collect(Collectors.toList())
)
);
}
@@ -303,6 +317,14 @@ public abstract class BaseHoodieTableFileIndex {
LOG.info(String.format("Refresh table %s, spent: %d ms", metaClient.getTableConfig().getTableName(), duration));
}
private void validate(HoodieTimeline activeTimeline, Option<String> queryInstant) {
if (shouldValidateInstant) {
if (queryInstant.isPresent() && !activeTimeline.containsInstant(queryInstant.get())) {
throw new HoodieIOException(String.format("Query instant (%s) not found in the timeline", queryInstant.get()));
}
}
}
private static long fileSliceSize(FileSlice fileSlice) {
long logFileSize = fileSlice.getLogFiles().map(HoodieLogFile::getFileSize)
.filter(s -> s > 0)
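
For orientation, the net behavior of the new shouldValidateInstant flag boils down to the check below, a minimal sketch re-stating the validate(...) hunk above outside of BaseHoodieTableFileIndex (the class and method names here are illustrative only):

import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;

final class QueryInstantValidation {
  // Mirrors the validate(...) added above: only enforced when the new
  // shouldValidateInstant flag is set, and it rejects query instants that
  // are not present in the active timeline.
  static void validate(boolean shouldValidateInstant,
                       HoodieTimeline activeTimeline,
                       Option<String> queryInstant) {
    if (shouldValidateInstant && queryInstant.isPresent()
        && !activeTimeline.containsInstant(queryInstant.get())) {
      throw new HoodieIOException(
          String.format("Query instant (%s) not found in the timeline", queryInstant.get()));
    }
  }
}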

View File

@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
@@ -153,10 +154,12 @@ public class HoodieCommitMetadata implements Serializable {
* been touched multiple times in the given commits, the return value will keep the one
* from the latest commit.
*
*
* @param hadoopConf Hadoop configuration, used to resolve the file system and its default block size
* @param basePath The base path
* @return the file full path to file status mapping
*/
public Map<String, FileStatus> getFullPathToFileStatus(String basePath) {
public Map<String, FileStatus> getFullPathToFileStatus(Configuration hadoopConf, String basePath) {
Map<String, FileStatus> fullPathToFileStatus = new HashMap<>();
for (List<HoodieWriteStat> stats : getPartitionToWriteStats().values()) {
// Iterate through all the written files.
@@ -164,7 +167,8 @@ public class HoodieCommitMetadata implements Serializable {
String relativeFilePath = stat.getPath();
Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
if (fullPath != null) {
FileStatus fileStatus = new FileStatus(stat.getFileSizeInBytes(), false, 0, 0,
long blockSize = FSUtils.getFs(fullPath.toString(), hadoopConf).getDefaultBlockSize(fullPath);
FileStatus fileStatus = new FileStatus(stat.getFileSizeInBytes(), false, 0, blockSize,
0, fullPath);
fullPathToFileStatus.put(fullPath.getName(), fileStatus);
}
@@ -178,14 +182,16 @@ public class HoodieCommitMetadata implements Serializable {
* been touched multiple times in the given commits, the return value will keep the one
* from the latest commit by file group ID.
*
* <p>Note: different with {@link #getFullPathToFileStatus(String)},
* <p>Note: different with {@link #getFullPathToFileStatus(Configuration, String)},
* only the latest commit file for a file group is returned,
* this is an optimization for COPY_ON_WRITE table to eliminate legacy files for filesystem view.
*
*
* @param hadoopConf Hadoop configuration, used to resolve the file system and its default block size
* @param basePath The base path
* @return the file ID to file status mapping
*/
public Map<String, FileStatus> getFileIdToFileStatus(String basePath) {
public Map<String, FileStatus> getFileIdToFileStatus(Configuration hadoopConf, String basePath) {
Map<String, FileStatus> fileIdToFileStatus = new HashMap<>();
for (List<HoodieWriteStat> stats : getPartitionToWriteStats().values()) {
// Iterate through all the written files.
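
For callers of HoodieCommitMetadata, the signature change above just means threading a Hadoop Configuration through; a minimal caller-side sketch (the wrapper class, method name, and printed output are illustrative, not part of the commit):

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;

final class HoodieCommitMetadataInspector {
  // Both lookups now take the Configuration so that the default block size of
  // the target FileSystem can be filled into the synthesized FileStatus objects.
  static void printWrittenFiles(HoodieCommitMetadata metadata, Configuration hadoopConf, String basePath) {
    Map<String, FileStatus> fullPathToStatus = metadata.getFullPathToFileStatus(hadoopConf, basePath);
    Map<String, FileStatus> fileIdToStatus = metadata.getFileIdToFileStatus(hadoopConf, basePath);
    fullPathToStatus.forEach((name, status) -> System.out.println(name + " -> " + status.getLen()));
    System.out.println("distinct file groups: " + fileIdToStatus.size());
  }
}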

View File

@@ -18,6 +18,10 @@
package org.apache.hudi.sink.partitioner.profile;
import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -29,11 +33,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -117,7 +116,7 @@ public class WriteProfiles {
HoodieCommitMetadata metadata,
FileSystem fs,
HoodieTableType tableType) {
return getFilesToRead(metadata, basePath.toString(), tableType).entrySet().stream()
return getFilesToRead(fs.getConf(), metadata, basePath.toString(), tableType).entrySet().stream()
// filter out the file paths that do not exist, some files may be cleaned by
// the cleaner.
.filter(entry -> {
@@ -133,14 +132,16 @@ public class WriteProfiles {
}
private static Map<String, FileStatus> getFilesToRead(
Configuration hadoopConf,
HoodieCommitMetadata metadata,
String basePath,
HoodieTableType tableType) {
HoodieTableType tableType
) {
switch (tableType) {
case COPY_ON_WRITE:
return metadata.getFileIdToFileStatus(basePath);
return metadata.getFileIdToFileStatus(hadoopConf, basePath);
case MERGE_ON_READ:
return metadata.getFullPathToFileStatus(basePath);
return metadata.getFullPathToFileStatus(hadoopConf, basePath);
default:
throw new AssertionError();
}

View File

@@ -1,123 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hadoop;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Encode additional information in split to track matching log file and base files.
* Hence, this class tracks a log/base file split.
*/
public class BaseFileWithLogsSplit extends FileSplit {
// a flag to mark this split is produced by incremental query or not.
private boolean belongsToIncrementalQuery = false;
// the log file paths of this split.
private List<HoodieLogFile> deltaLogFiles = new ArrayList<>();
// max commit time of current split.
private String maxCommitTime = "";
// the basePath of current hoodie table.
private String basePath = "";
// the base file belong to this split.
private String baseFilePath = "";
public BaseFileWithLogsSplit(Path file, long start, long length, String[] hosts) {
super(file, start, length, hosts);
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeBoolean(belongsToIncrementalQuery);
Text.writeString(out, maxCommitTime);
Text.writeString(out, basePath);
Text.writeString(out, baseFilePath);
out.writeInt(deltaLogFiles.size());
for (HoodieLogFile logFile : deltaLogFiles) {
Text.writeString(out, logFile.getPath().toString());
out.writeLong(logFile.getFileSize());
}
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
belongsToIncrementalQuery = in.readBoolean();
maxCommitTime = Text.readString(in);
basePath = Text.readString(in);
baseFilePath = Text.readString(in);
int deltaLogSize = in.readInt();
List<HoodieLogFile> tempDeltaLogs = new ArrayList<>();
for (int i = 0; i < deltaLogSize; i++) {
String logPath = Text.readString(in);
long logFileSize = in.readLong();
tempDeltaLogs.add(new HoodieLogFile(new Path(logPath), logFileSize));
}
deltaLogFiles = tempDeltaLogs;
}
public boolean getBelongsToIncrementalQuery() {
return belongsToIncrementalQuery;
}
public void setBelongsToIncrementalQuery(boolean belongsToIncrementalQuery) {
this.belongsToIncrementalQuery = belongsToIncrementalQuery;
}
public List<HoodieLogFile> getDeltaLogFiles() {
return deltaLogFiles;
}
public void setDeltaLogFiles(List<HoodieLogFile> deltaLogFiles) {
this.deltaLogFiles = deltaLogFiles;
}
public String getMaxCommitTime() {
return maxCommitTime;
}
public void setMaxCommitTime(String maxCommitTime) {
this.maxCommitTime = maxCommitTime;
}
public String getBasePath() {
return basePath;
}
public void setBasePath(String basePath) {
this.basePath = basePath;
}
public String getBaseFilePath() {
return baseFilePath;
}
public void setBaseFilePath(String baseFilePath) {
this.baseFilePath = baseFilePath;
}
}

View File

@@ -36,9 +36,7 @@ public class BootstrapBaseFileSplit extends FileSplit {
* NOTE: This ctor is necessary for Hive to be able to serialize and
* then instantiate it when deserializing back
*/
public BootstrapBaseFileSplit() {
super();
}
public BootstrapBaseFileSplit() {}
public BootstrapBaseFileSplit(FileSplit baseSplit, FileSplit bootstrapFileSplit)
throws IOException {

View File

@@ -53,6 +53,7 @@ public class HiveHoodieTableFileIndex extends BaseHoodieTableFileIndex {
queryPaths,
specifiedQueryInstant,
shouldIncludePendingCommits,
true,
new NoopCache());
}

View File

@@ -18,8 +18,6 @@
package org.apache.hudi.hadoop;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -36,16 +34,20 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableQueryType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.parquet.schema.MessageType;
import javax.annotation.Nonnull;
import java.io.IOException;
@@ -57,7 +59,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -73,20 +74,7 @@ import static org.apache.hudi.common.util.ValidationUtils.checkState;
*
* NOTE: This class is invariant of the underlying file-format of the files being read
*/
public class HoodieCopyOnWriteTableInputFormat extends FileInputFormat<NullWritable, ArrayWritable>
implements Configurable {
protected Configuration conf;
@Override
public final Configuration getConf() {
return conf;
}
@Override
public final void setConf(Configuration conf) {
this.conf = conf;
}
public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat {
@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
@@ -159,10 +147,6 @@ public class HoodieCopyOnWriteTableInputFormat extends FileInputFormat<NullWrita
throw new UnsupportedEncodingException("not implemented");
}
protected boolean includeLogFilesForSnapshotView() {
return false;
}
/**
* Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that
* lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified
@@ -200,6 +184,16 @@ public class HoodieCopyOnWriteTableInputFormat extends FileInputFormat<NullWrita
return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
}
protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
if (baseFileOpt.isPresent()) {
return getFileStatusUnchecked(baseFileOpt.get());
} else {
throw new IllegalStateException("Invalid state: base-file has to be present");
}
}
private BootstrapBaseFileSplit makeExternalFileSplit(PathWithBootstrapFileStatus file, FileSplit split) {
try {
LOG.info("Making external data split for " + file);
@@ -212,11 +206,6 @@ public class HoodieCopyOnWriteTableInputFormat extends FileInputFormat<NullWrita
}
}
@Nonnull
private List<FileStatus> listStatusForSnapshotModeLegacy(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap, List<Path> snapshotPaths) throws IOException {
return HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapshotView());
}
@Nonnull
private List<FileStatus> listStatusForSnapshotMode(JobConf job,
Map<String, HoodieTableMetaClient> tableMetaClientMap,
@@ -253,42 +242,17 @@ public class HoodieCopyOnWriteTableInputFormat extends FileInputFormat<NullWrita
Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();
Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt = getHoodieVirtualKeyInfo(tableMetaClient);
targetFiles.addAll(
partitionedFileSlices.values()
.stream()
.flatMap(Collection::stream)
.map(fileSlice -> {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();
Option<HoodieInstant> latestCompletedInstantOpt = fileIndex.getLatestCompletedInstant();
// Check if we're reading a MOR table
if (includeLogFilesForSnapshotView()) {
if (baseFileOpt.isPresent()) {
return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, latestCompletedInstantOpt, tableMetaClient);
} else if (latestLogFileOpt.isPresent()) {
return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), logFiles, latestCompletedInstantOpt, tableMetaClient);
} else {
throw new IllegalStateException("Invalid state: either base-file or log-file has to be present");
}
} else {
if (baseFileOpt.isPresent()) {
return getFileStatusUnchecked(baseFileOpt.get());
} else {
throw new IllegalStateException("Invalid state: base-file has to be present");
}
}
})
.map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, virtualKeyInfoOpt))
.collect(Collectors.toList())
);
}
// TODO(HUDI-3280) cleanup
validate(targetFiles, listStatusForSnapshotModeLegacy(job, tableMetaClientMap, snapshotPaths));
return targetFiles;
}
@@ -298,7 +262,7 @@ public class HoodieCopyOnWriteTableInputFormat extends FileInputFormat<NullWrita
}
@Nonnull
private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
protected static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
try {
return HoodieInputFormatUtils.getFileStatus(baseFile);
} catch (IOException ioe) {
@@ -306,57 +270,20 @@ public class HoodieCopyOnWriteTableInputFormat extends FileInputFormat<NullWrita
}
}
@Nonnull
private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
Stream<HoodieLogFile> logFiles,
Option<HoodieInstant> latestCompletedInstantOpt,
HoodieTableMetaClient tableMetaClient) {
List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
try {
RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
rtFileStatus.setDeltaLogFiles(sortedLogFiles);
rtFileStatus.setBaseFilePath(baseFile.getPath());
rtFileStatus.setBasePath(tableMetaClient.getBasePath());
if (latestCompletedInstantOpt.isPresent()) {
HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
checkState(latestCompletedInstant.isCompleted());
rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
}
if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
rtFileStatus.setBootStrapFileStatus(baseFileStatus);
}
return rtFileStatus;
} catch (IOException e) {
throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
protected static Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) {
HoodieTableConfig tableConfig = metaClient.getTableConfig();
if (tableConfig.populateMetaFields()) {
return Option.empty();
}
}
@Nonnull
private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile,
Stream<HoodieLogFile> logFiles,
Option<HoodieInstant> latestCompletedInstantOpt,
HoodieTableMetaClient tableMetaClient) {
List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
try {
RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus());
rtFileStatus.setDeltaLogFiles(sortedLogFiles);
rtFileStatus.setBasePath(tableMetaClient.getBasePath());
if (latestCompletedInstantOpt.isPresent()) {
HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
checkState(latestCompletedInstant.isCompleted());
rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
}
return rtFileStatus;
} catch (IOException e) {
throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema();
return Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(),
tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()),
parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp())));
} catch (Exception exception) {
throw new HoodieException("Fetching table schema failed with exception ", exception);
}
}
}
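
With this refactoring, createFileStatusUnchecked becomes the per-file-slice extension point of the hierarchy; a hypothetical subclass that only ever serves base files would look roughly like this (illustrative sketch, not part of the commit):

import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.HiveHoodieTableFileIndex;
import org.apache.hudi.hadoop.HoodieCopyOnWriteTableInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;

// Hypothetical subclass mirroring the default COW behaviour introduced above:
// every file-slice is resolved to its base file, log files are ignored.
public class BaseFileOnlyTableInputFormat extends HoodieCopyOnWriteTableInputFormat {
  @Override
  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice,
                                                 HiveHoodieTableFileIndex fileIndex,
                                                 Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
    Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
    if (baseFileOpt.isPresent()) {
      return getFileStatusUnchecked(baseFileOpt.get());
    }
    throw new IllegalStateException("Invalid state: base-file has to be present");
  }
}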

View File

@@ -41,11 +41,6 @@ public class HoodieHFileInputFormat extends HoodieCopyOnWriteTableInputFormat {
return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
}
@Override
protected boolean includeLogFilesForSnapshotView() {
return false;
}
@Override
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
final Reporter reporter) throws IOException {

View File

@@ -47,7 +47,7 @@ import java.io.IOException;
*/
public abstract class HoodieParquetInputFormatBase extends MapredParquetInputFormat implements Configurable {
private final HoodieCopyOnWriteTableInputFormat inputFormatDelegate;
private final HoodieTableInputFormat inputFormatDelegate;
protected HoodieParquetInputFormatBase(HoodieCopyOnWriteTableInputFormat inputFormatDelegate) {
this.inputFormatDelegate = inputFormatDelegate;

View File

@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hadoop;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import java.io.IOException;
/**
* Abstract base class for Hive's {@link FileInputFormat} implementations allowing for reading of Hudi's
* Copy-on-Write (COW) and Merge-on-Read (MOR) tables
*/
public abstract class HoodieTableInputFormat extends FileInputFormat<NullWritable, ArrayWritable>
implements Configurable {
protected Configuration conf;
@Override
public final Configuration getConf() {
return conf;
}
@Override
public final void setConf(Configuration conf) {
this.conf = conf;
}
@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
return super.isSplitable(fs, filename);
}
@Override
protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
return super.makeSplit(file, start, length, hosts);
}
@Override
protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
return super.makeSplit(file, start, length, hosts, inMemoryHosts);
}
@Override
protected FileStatus[] listStatus(JobConf job) throws IOException {
return super.listStatus(job);
}
}

View File

@@ -1,112 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hadoop;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hadoop.fs.Path;
import java.util.ArrayList;
import java.util.List;
/**
* Encode additional information in Path to track matching log file and base files.
* Hence, this class tracks a log/base file status.
*/
public class PathWithLogFilePath extends Path {
// a flag to mark this split is produced by incremental query or not.
private boolean belongsToIncrementalPath = false;
// the log files belong this path.
private List<HoodieLogFile> deltaLogFiles = new ArrayList<>();
// max commit time of current path.
private String maxCommitTime = "";
// the basePath of current hoodie table.
private String basePath = "";
// the base file belong to this path;
private String baseFilePath = "";
// the bootstrap file belong to this path.
// only if current query table is bootstrap table, this field is used.
private PathWithBootstrapFileStatus pathWithBootstrapFileStatus;
public PathWithLogFilePath(Path parent, String child) {
super(parent, child);
}
public void setBelongsToIncrementalPath(boolean belongsToIncrementalPath) {
this.belongsToIncrementalPath = belongsToIncrementalPath;
}
public List<HoodieLogFile> getDeltaLogFiles() {
return deltaLogFiles;
}
public void setDeltaLogFiles(List<HoodieLogFile> deltaLogFiles) {
this.deltaLogFiles = deltaLogFiles;
}
public String getMaxCommitTime() {
return maxCommitTime;
}
public void setMaxCommitTime(String maxCommitTime) {
this.maxCommitTime = maxCommitTime;
}
public String getBasePath() {
return basePath;
}
public boolean getBelongsToIncrementalQuery() {
return belongsToIncrementalPath;
}
public void setBasePath(String basePath) {
this.basePath = basePath;
}
public void setBaseFilePath(String baseFilePath) {
this.baseFilePath = baseFilePath;
}
public boolean splitable() {
return !baseFilePath.isEmpty();
}
public PathWithBootstrapFileStatus getPathWithBootstrapFileStatus() {
return pathWithBootstrapFileStatus;
}
public void setPathWithBootstrapFileStatus(PathWithBootstrapFileStatus pathWithBootstrapFileStatus) {
this.pathWithBootstrapFileStatus = pathWithBootstrapFileStatus;
}
public boolean includeBootstrapFilePath() {
return pathWithBootstrapFileStatus != null;
}
public BaseFileWithLogsSplit buildSplit(Path file, long start, long length, String[] hosts) {
BaseFileWithLogsSplit bs = new BaseFileWithLogsSplit(file, start, length, hosts);
bs.setBelongsToIncrementalQuery(belongsToIncrementalPath);
bs.setDeltaLogFiles(deltaLogFiles);
bs.setMaxCommitTime(maxCommitTime);
bs.setBasePath(basePath);
bs.setBaseFilePath(baseFilePath);
return bs;
}
}

View File

@@ -18,13 +18,14 @@
package org.apache.hudi.hadoop;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.realtime.HoodieRealtimePath;
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
@@ -34,51 +35,62 @@ import java.util.List;
* in Path.
*/
public class RealtimeFileStatus extends FileStatus {
// a flag to mark this split is produced by incremental query or not.
private boolean belongToIncrementalFileStatus = false;
// the log files belong this fileStatus.
private List<HoodieLogFile> deltaLogFiles = new ArrayList<>();
// max commit time of current fileStatus.
/**
* Base path of the table this path belongs to
*/
private final String basePath;
/**
* List of delta log-files holding updated records for this base-file
*/
private final List<HoodieLogFile> deltaLogFiles;
/**
* Marks whether this path was produced as part of an Incremental Query
*/
private final boolean belongsToIncrementalQuery;
/**
* Latest commit instant available at the time of the query in which all of the files
* pertaining to this split are represented
*/
private String maxCommitTime = "";
// the basePath of current hoodie table.
private String basePath = "";
// the base file belong to this status;
private String baseFilePath = "";
// the bootstrap file belong to this status.
// only if current query table is bootstrap table, this field is used.
/**
* File status for the Bootstrap file (only relevant if this table is a bootstrapped table)
*/
private FileStatus bootStrapFileStatus;
/**
* Virtual key configuration of the table this split belongs to
*/
private final Option<HoodieVirtualKeyInfo> virtualKeyInfo;
public RealtimeFileStatus(FileStatus fileStatus) throws IOException {
public RealtimeFileStatus(FileStatus fileStatus,
String basePath,
List<HoodieLogFile> deltaLogFiles,
boolean belongsToIncrementalQuery,
Option<HoodieVirtualKeyInfo> virtualKeyInfo) throws IOException {
super(fileStatus);
this.basePath = basePath;
this.deltaLogFiles = deltaLogFiles;
this.belongsToIncrementalQuery = belongsToIncrementalQuery;
this.virtualKeyInfo = virtualKeyInfo;
}
@Override
public Path getPath() {
Path path = super.getPath();
PathWithLogFilePath pathWithLogFilePath = new PathWithLogFilePath(path.getParent(), path.getName());
pathWithLogFilePath.setBelongsToIncrementalPath(belongToIncrementalFileStatus);
pathWithLogFilePath.setDeltaLogFiles(deltaLogFiles);
pathWithLogFilePath.setMaxCommitTime(maxCommitTime);
pathWithLogFilePath.setBasePath(basePath);
pathWithLogFilePath.setBaseFilePath(baseFilePath);
if (bootStrapFileStatus != null) {
pathWithLogFilePath.setPathWithBootstrapFileStatus((PathWithBootstrapFileStatus)bootStrapFileStatus.getPath());
}
return pathWithLogFilePath;
}
public void setBelongToIncrementalFileStatus(boolean belongToIncrementalFileStatus) {
this.belongToIncrementalFileStatus = belongToIncrementalFileStatus;
HoodieRealtimePath realtimePath = new HoodieRealtimePath(path.getParent(), path.getName(), basePath,
deltaLogFiles, maxCommitTime, belongsToIncrementalQuery, virtualKeyInfo);
if (bootStrapFileStatus != null) {
realtimePath.setPathWithBootstrapFileStatus((PathWithBootstrapFileStatus)bootStrapFileStatus.getPath());
}
return realtimePath;
}
public List<HoodieLogFile> getDeltaLogFiles() {
return deltaLogFiles;
}
public void setDeltaLogFiles(List<HoodieLogFile> deltaLogFiles) {
this.deltaLogFiles = deltaLogFiles;
}
public String getMaxCommitTime() {
return maxCommitTime;
}
@@ -87,18 +99,6 @@ public class RealtimeFileStatus extends FileStatus {
this.maxCommitTime = maxCommitTime;
}
public String getBasePath() {
return basePath;
}
public void setBasePath(String basePath) {
this.basePath = basePath;
}
public void setBaseFilePath(String baseFilePath) {
this.baseFilePath = baseFilePath;
}
public void setBootStrapFileStatus(FileStatus bootStrapFileStatus) {
this.bootStrapFileStatus = bootStrapFileStatus;
}
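
After the rework above, everything except maxCommitTime is supplied at construction time; a small hedged sketch of building a RealtimeFileStatus the way the MOR input format now does (the helper class, method name, and inputs are illustrative):

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.RealtimeFileStatus;

final class RealtimeFileStatusFactory {
  // Wraps a plain base-file FileStatus into a RealtimeFileStatus; the delta
  // log files and table base path now have to be passed up front.
  static RealtimeFileStatus wrap(FileStatus baseFileStatus,
                                 String tableBasePath,
                                 List<HoodieLogFile> deltaLogFiles,
                                 String maxCommitTime) throws IOException {
    RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(
        baseFileStatus,       // status of the base file itself
        tableBasePath,        // base path of the Hudi table
        deltaLogFiles,        // log files to be merged on read
        false,                // not produced by an incremental query
        Option.empty());      // no virtual-key info (meta fields populated)
    rtFileStatus.setMaxCommitTime(maxCommitTime);
    return rtFileStatus;
  }
}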

View File

@@ -26,8 +26,10 @@ import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -37,11 +39,12 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.HiveHoodieTableFileIndex;
import org.apache.hudi.hadoop.HoodieCopyOnWriteTableInputFormat;
import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.PathWithLogFilePath;
import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
@@ -53,6 +56,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* Base implementation of Hive's {@link FileInputFormat} allowing for reading of Hudi's
@@ -63,18 +69,38 @@ import java.util.stream.Collectors;
* <li>Incremental mode: reading table's state as of particular timestamp (or instant, in Hudi's terms)</li>
* <li>External mode: reading non-Hudi partitions</li>
* </ul>
*
* <p>
* NOTE: This class is invariant of the underlying file-format of the files being read
*/
public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInputFormat implements Configurable {
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList());
List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits))
.map(is -> (FileSplit) is)
.collect(Collectors.toList());
return HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits)
? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits)
: HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
return (containsIncrementalQuerySplits(fileSplits) ? filterIncrementalQueryFileSplits(fileSplits) : fileSplits)
.toArray(new FileSplit[0]);
}
@Override
protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();
Option<HoodieInstant> latestCompletedInstantOpt = fileIndex.getLatestCompletedInstant();
String tableBasePath = fileIndex.getBasePath();
// Check whether the file-slice has a base file, or is comprised of log files only
if (baseFileOpt.isPresent()) {
return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, virtualKeyInfoOpt);
} else if (latestLogFileOpt.isPresent()) {
return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, virtualKeyInfoOpt);
} else {
throw new IllegalStateException("Invalid state: either base-file or log-file has to be present");
}
}
/**
@@ -126,7 +152,7 @@ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInp
// build fileGroup from fsView
List<FileStatus> affectedFileStatus = Arrays.asList(HoodieInputFormatUtils
.listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), metadataList));
.listAffectedFilesForCommits(job, new Path(tableMetaClient.getBasePath()), metadataList));
// step3
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0]));
// build fileGroup from fsView
@@ -152,21 +178,17 @@ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInp
candidateFileStatus.put(key, fileStatuses[i]);
}
Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt = getHoodieVirtualKeyInfo(tableMetaClient);
String maxCommitTime = fsView.getLastInstant().get().getTimestamp();
// step6
result.addAll(collectAllIncrementalFiles(fileGroups, maxCommitTime, basePath.toString(), candidateFileStatus));
result.addAll(collectAllIncrementalFiles(fileGroups, maxCommitTime, basePath.toString(), candidateFileStatus, virtualKeyInfoOpt));
return result;
}
@Override
protected boolean includeLogFilesForSnapshotView() {
return true;
}
@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
if (filename instanceof PathWithLogFilePath) {
return ((PathWithLogFilePath)filename).splitable();
if (filename instanceof HoodieRealtimePath) {
return ((HoodieRealtimePath) filename).isSplitable();
}
return super.isSplitable(fs, filename);
@@ -177,21 +199,26 @@ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInp
// PathWithLogFilePath, so those bootstrap files should be processed in this function.
@Override
protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
if (file instanceof PathWithLogFilePath) {
return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, null);
if (file instanceof HoodieRealtimePath) {
return doMakeSplitForRealtimePath((HoodieRealtimePath) file, start, length, hosts, null);
}
return super.makeSplit(file, start, length, hosts);
}
@Override
protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
if (file instanceof PathWithLogFilePath) {
return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, inMemoryHosts);
if (file instanceof HoodieRealtimePath) {
return doMakeSplitForRealtimePath((HoodieRealtimePath) file, start, length, hosts, inMemoryHosts);
}
return super.makeSplit(file, start, length, hosts, inMemoryHosts);
}
private List<FileStatus> collectAllIncrementalFiles(List<HoodieFileGroup> fileGroups, String maxCommitTime, String basePath, Map<String, FileStatus> candidateFileStatus) {
private static List<FileStatus> collectAllIncrementalFiles(List<HoodieFileGroup> fileGroups,
String maxCommitTime,
String basePath,
Map<String, FileStatus> candidateFileStatus,
Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
List<FileStatus> result = new ArrayList<>();
fileGroups.stream().forEach(f -> {
try {
@@ -202,15 +229,12 @@ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInp
if (!candidateFileStatus.containsKey(baseFilePath)) {
throw new HoodieException("Error obtaining fileStatus for file: " + baseFilePath);
}
List<HoodieLogFile> deltaLogFiles = f.getLatestFileSlice().get().getLogFiles().collect(Collectors.toList());
// We cannot use baseFileStatus.getPath() here, since baseFileStatus.getPath() is missing file size information.
// So we use candidateFileStatus.get(baseFileStatus.getPath()) to get a correct path.
RealtimeFileStatus fileStatus = new RealtimeFileStatus(candidateFileStatus.get(baseFilePath));
RealtimeFileStatus fileStatus = new RealtimeFileStatus(candidateFileStatus.get(baseFilePath),
basePath, deltaLogFiles, true, virtualKeyInfoOpt);
fileStatus.setMaxCommitTime(maxCommitTime);
fileStatus.setBelongToIncrementalFileStatus(true);
fileStatus.setBasePath(basePath);
fileStatus.setBaseFilePath(baseFilePath);
fileStatus.setDeltaLogFiles(f.getLatestFileSlice().get().getLogFiles().collect(Collectors.toList()));
// try to set bootstrapfileStatus
if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
fileStatus.setBootStrapFileStatus(baseFileStatus);
}
@@ -220,11 +244,10 @@ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInp
if (f.getLatestFileSlice().isPresent() && baseFiles.isEmpty()) {
List<FileStatus> logFileStatus = f.getLatestFileSlice().get().getLogFiles().map(logFile -> logFile.getFileStatus()).collect(Collectors.toList());
if (logFileStatus.size() > 0) {
RealtimeFileStatus fileStatus = new RealtimeFileStatus(logFileStatus.get(0));
fileStatus.setBelongToIncrementalFileStatus(true);
fileStatus.setDeltaLogFiles(logFileStatus.stream().map(l -> new HoodieLogFile(l.getPath(), l.getLen())).collect(Collectors.toList()));
List<HoodieLogFile> deltaLogFiles = logFileStatus.stream().map(l -> new HoodieLogFile(l.getPath(), l.getLen())).collect(Collectors.toList());
RealtimeFileStatus fileStatus = new RealtimeFileStatus(logFileStatus.get(0), basePath,
deltaLogFiles, true, virtualKeyInfoOpt);
fileStatus.setMaxCommitTime(maxCommitTime);
fileStatus.setBasePath(basePath);
result.add(fileStatus);
}
}
@@ -235,20 +258,117 @@ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInp
return result;
}
private FileSplit doMakeSplitForPathWithLogFilePath(PathWithLogFilePath path, long start, long length, String[] hosts, String[] inMemoryHosts) {
if (!path.includeBootstrapFilePath()) {
return path.buildSplit(path, start, length, hosts);
} else {
private FileSplit doMakeSplitForRealtimePath(HoodieRealtimePath path, long start, long length, String[] hosts, String[] inMemoryHosts) {
if (path.includeBootstrapFilePath()) {
FileSplit bf =
inMemoryHosts == null
? super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts)
: super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts, inMemoryHosts);
return HoodieRealtimeInputFormatUtils.createRealtimeBoostrapBaseFileSplit(
return createRealtimeBoostrapBaseFileSplit(
(BootstrapBaseFileSplit) bf,
path.getBasePath(),
path.getDeltaLogFiles(),
path.getMaxCommitTime(),
path.getBelongsToIncrementalQuery());
path.getBelongsToIncrementalQuery(),
path.getVirtualKeyInfo()
);
}
return createRealtimeFileSplit(path, start, length, hosts);
}
private static boolean containsIncrementalQuerySplits(List<FileSplit> fileSplits) {
return fileSplits.stream().anyMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery);
}
private static List<FileSplit> filterIncrementalQueryFileSplits(List<FileSplit> fileSplits) {
return fileSplits.stream().filter(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery)
.collect(Collectors.toList());
}
private static HoodieRealtimeFileSplit createRealtimeFileSplit(HoodieRealtimePath path, long start, long length, String[] hosts) {
try {
return new HoodieRealtimeFileSplit(new FileSplit(path, start, length, hosts), path);
} catch (IOException e) {
throw new HoodieIOException(String.format("Failed to create instance of %s", HoodieRealtimeFileSplit.class.getName()), e);
}
}
private static HoodieRealtimeBootstrapBaseFileSplit createRealtimeBoostrapBaseFileSplit(BootstrapBaseFileSplit split,
String basePath,
List<HoodieLogFile> logFiles,
String maxInstantTime,
boolean belongsToIncrementalQuery,
Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
try {
String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())
.filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[0];
String[] inMemoryHosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())
.filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0];
FileSplit baseSplit = new FileSplit(split.getPath(), split.getStart(), split.getLength(),
hosts, inMemoryHosts);
return new HoodieRealtimeBootstrapBaseFileSplit(baseSplit, basePath, logFiles, maxInstantTime, split.getBootstrapFileSplit(),
belongsToIncrementalQuery, virtualKeyInfoOpt);
} catch (IOException e) {
throw new HoodieIOException("Error creating hoodie real time split ", e);
}
}
/**
* Creates {@link RealtimeFileStatus} for the file-slice where base file is present
*/
private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
Stream<HoodieLogFile> logFiles,
String basePath,
Option<HoodieInstant> latestCompletedInstantOpt,
Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
try {
RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus, basePath, sortedLogFiles,
false, virtualKeyInfoOpt);
if (latestCompletedInstantOpt.isPresent()) {
HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
checkState(latestCompletedInstant.isCompleted());
rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
}
if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
rtFileStatus.setBootStrapFileStatus(baseFileStatus);
}
return rtFileStatus;
} catch (IOException e) {
throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
}
}
/**
* Creates {@link RealtimeFileStatus} for the file-slice where base file is NOT present
*/
private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile,
Stream<HoodieLogFile> logFiles,
String basePath,
Option<HoodieInstant> latestCompletedInstantOpt,
Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
try {
RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus(), basePath,
sortedLogFiles, false, virtualKeyInfoOpt);
if (latestCompletedInstantOpt.isPresent()) {
HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
checkState(latestCompletedInstant.isCompleted());
rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
}
return rtFileStatus;
} catch (IOException e) {
throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
}
}
}
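
Tying the pieces of this file together: the realtime metadata now travels on the HoodieRealtimePath, so turning a listed path into a split only needs the vanilla FileSplit plus that path. A condensed sketch of that relationship, mirroring the createRealtimeFileSplit helper above (the wrapper class name is illustrative):

import java.io.IOException;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimePath;

final class RealtimeSplitFactory {
  // Same shape as createRealtimeFileSplit above: the path already carries the
  // base path, delta log files, max commit time and virtual-key info.
  static HoodieRealtimeFileSplit toRealtimeSplit(HoodieRealtimePath path,
                                                 long start, long length, String[] hosts) {
    try {
      return new HoodieRealtimeFileSplit(new FileSplit(path, start, length, hosts), path);
    } catch (IOException e) {
      throw new HoodieIOException(
          String.format("Failed to create instance of %s", HoodieRealtimeFileSplit.class.getName()), e);
    }
  }
}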

View File

@@ -94,7 +94,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat {
// TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction
// time.
if (!realtimeSplit.getDeltaLogPaths().isEmpty()) {
HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, realtimeSplit.getHoodieVirtualKeyInfo());
HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, realtimeSplit.getVirtualKeyInfo());
}
jobConf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
setConf(jobConf);

View File

@@ -22,71 +22,73 @@ import org.apache.hadoop.mapred.FileSplit;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.InputSplitUtils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* Realtime File Split with external base file.
* Realtime {@link FileSplit} with external base file
*
* NOTE: If you're adding fields here you need to make sure that you appropriately de-/serialize them
* in {@link #readFromInput(DataInput)} and {@link #writeToOutput(DataOutput)}
*/
public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit implements RealtimeSplit {
private List<String> deltaLogPaths;
public class HoodieRealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit implements RealtimeSplit {
/**
* Marks whether this path was produced as part of an Incremental Query
*/
private boolean belongsToIncrementalQuery = false;
/**
* List of delta log-files holding updated records for this base-file
*/
private List<HoodieLogFile> deltaLogFiles = new ArrayList<>();
private String maxInstantTime;
/**
* Latest commit instant available at the time of the query in which all of the files
* pertaining to this split are represented
*/
private String maxCommitTime;
/**
* Base path of the table this path belongs to
*/
private String basePath;
private boolean belongsToIncrementalSplit;
/**
* Virtual key configuration of the table this split belongs to
*/
private Option<HoodieVirtualKeyInfo> virtualKeyInfo = Option.empty();
/**
* NOTE: This ctor is necessary for Hive to be able to serialize and
* then instantiate it when deserializing back
*/
public RealtimeBootstrapBaseFileSplit() {
super();
}
public HoodieRealtimeBootstrapBaseFileSplit() {}
public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit,
String basePath,
List<HoodieLogFile> deltaLogFiles,
String maxInstantTime,
FileSplit externalFileSplit,
boolean belongsToIncrementalQuery) throws IOException {
public HoodieRealtimeBootstrapBaseFileSplit(FileSplit baseSplit,
String basePath,
List<HoodieLogFile> deltaLogFiles,
String maxInstantTime,
FileSplit externalFileSplit,
boolean belongsToIncrementalQuery,
Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) throws IOException {
super(baseSplit, externalFileSplit);
this.maxInstantTime = maxInstantTime;
this.maxCommitTime = maxInstantTime;
this.deltaLogFiles = deltaLogFiles;
this.deltaLogPaths = deltaLogFiles.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList());
this.basePath = basePath;
this.belongsToIncrementalSplit = belongsToIncrementalQuery;
this.belongsToIncrementalQuery = belongsToIncrementalQuery;
this.virtualKeyInfo = virtualKeyInfoOpt;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
writeToOutput(out);
InputSplitUtils.writeBoolean(belongsToIncrementalSplit, out);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
readFromInput(in);
belongsToIncrementalSplit = InputSplitUtils.readBoolean(in);
}
@Override
public List<String> getDeltaLogPaths() {
return deltaLogPaths;
}
@Override
@@ -94,9 +96,14 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple
return deltaLogFiles;
}
@Override
public void setDeltaLogFiles(List<HoodieLogFile> deltaLogFiles) {
this.deltaLogFiles = deltaLogFiles;
}
@Override
public String getMaxCommitTime() {
return maxInstantTime;
return maxCommitTime;
}
@Override
@@ -105,22 +112,23 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple
}
@Override
public Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo() {
return Option.empty();
}
public boolean getBelongsToIncrementalQuery() {
return belongsToIncrementalSplit;
public Option<HoodieVirtualKeyInfo> getVirtualKeyInfo() {
return virtualKeyInfo;
}
@Override
public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;
public boolean getBelongsToIncrementalQuery() {
return belongsToIncrementalQuery;
}
@Override
public void setBelongsToIncrementalQuery(boolean belongsToIncrementalPath) {
this.belongsToIncrementalQuery = belongsToIncrementalPath;
}
@Override
public void setMaxCommitTime(String maxInstantTime) {
this.maxInstantTime = maxInstantTime;
this.maxCommitTime = maxInstantTime;
}
@Override
@@ -129,6 +137,7 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple
}
@Override
public void setHoodieVirtualKeyInfo(Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo) {}
public void setVirtualKeyInfo(Option<HoodieVirtualKeyInfo> virtualKeyInfo) {
this.virtualKeyInfo = virtualKeyInfo;
}
}
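
The class-level NOTE above is worth illustrating: any new field has to be mirrored in both directions of Hive's Writable round-trip. A hedged sketch of what that would look like (the subclass and its myNewFlag field are purely hypothetical):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hudi.hadoop.InputSplitUtils;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeBootstrapBaseFileSplit;

// Hypothetical subclass adding one extra flag: every new field must be
// written and read back in the same order.
public class FlaggedRealtimeBootstrapBaseFileSplit extends HoodieRealtimeBootstrapBaseFileSplit {
  private boolean myNewFlag = false; // purely illustrative field

  public FlaggedRealtimeBootstrapBaseFileSplit() {} // required for Hive deserialization

  @Override
  public void write(DataOutput out) throws IOException {
    super.write(out);                              // base split + RealtimeSplit fields
    InputSplitUtils.writeBoolean(myNewFlag, out);  // append the new field last
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    super.readFields(in);                          // must mirror write() ordering
    myNewFlag = InputSplitUtils.readBoolean(in);
  }
}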

View File

@@ -18,86 +18,127 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hadoop.mapred.FileSplit;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* Filesplit that wraps the base split and a list of log files to merge deltas from.
* {@link FileSplit} implementation that holds
* <ol>
* <li>Split corresponding to the base file</li>
* <li>List of {@link HoodieLogFile} that holds the delta to be merged (upon reading)</li>
* </ol>
*
* This split corresponds to a single file-slice in Hudi terminology.
*
* NOTE: If you're adding fields here you need to make sure that you appropriately de-/serialize them
* in {@link #readFromInput(DataInput)} and {@link #writeToOutput(DataOutput)}
*/
public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit {
private List<String> deltaLogPaths;
/**
* List of delta log-files holding updated records for this base-file
*/
private List<HoodieLogFile> deltaLogFiles = new ArrayList<>();
private String maxCommitTime;
/**
* Base path of the table this path belongs to
*/
private String basePath;
private Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
/**
* Latest commit instant available at the time of the query in which all of the files
* pertaining to this split are represented
*/
private String maxCommitTime;
/**
* Marks whether this path was produced as part of an Incremental Query
*/
private boolean belongsToIncrementalQuery = false;
/**
* Virtual key configuration of the table this split belongs to
*/
private Option<HoodieVirtualKeyInfo> virtualKeyInfo = Option.empty();
public HoodieRealtimeFileSplit() {}
public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<HoodieLogFile> deltaLogFiles, String maxCommitTime,
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo)
public HoodieRealtimeFileSplit(FileSplit baseSplit,
HoodieRealtimePath path)
throws IOException {
this(baseSplit,
path.getBasePath(),
path.getDeltaLogFiles(),
path.getMaxCommitTime(),
path.getBelongsToIncrementalQuery(),
path.getVirtualKeyInfo());
}
/**
* @VisibleInTesting
*/
public HoodieRealtimeFileSplit(FileSplit baseSplit,
String basePath,
List<HoodieLogFile> deltaLogFiles,
String maxCommitTime,
boolean belongsToIncrementalQuery,
Option<HoodieVirtualKeyInfo> virtualKeyInfo)
throws IOException {
super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(), baseSplit.getLocations());
this.deltaLogFiles = deltaLogFiles;
this.deltaLogPaths = deltaLogFiles.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList());
this.maxCommitTime = maxCommitTime;
this.basePath = basePath;
this.hoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
}
public List<String> getDeltaLogPaths() {
return deltaLogPaths;
this.maxCommitTime = maxCommitTime;
this.belongsToIncrementalQuery = belongsToIncrementalQuery;
this.virtualKeyInfo = virtualKeyInfo;
}
public List<HoodieLogFile> getDeltaLogFiles() {
return deltaLogFiles;
}
@Override
public void setDeltaLogFiles(List<HoodieLogFile> deltaLogFiles) {
this.deltaLogFiles = deltaLogFiles;
}
public String getMaxCommitTime() {
return maxCommitTime;
}
public String getBasePath() {
return basePath;
}
@Override
public void setHoodieVirtualKeyInfo(Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo) {
this.hoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
}
@Override
public Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo() {
return hoodieVirtualKeyInfo;
}
public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;
}
public void setMaxCommitTime(String maxCommitTime) {
this.maxCommitTime = maxCommitTime;
}
public String getBasePath() {
return basePath;
}
public void setBasePath(String basePath) {
this.basePath = basePath;
}
@Override
public void setVirtualKeyInfo(Option<HoodieVirtualKeyInfo> virtualKeyInfo) {
this.virtualKeyInfo = virtualKeyInfo;
}
@Override
public Option<HoodieVirtualKeyInfo> getVirtualKeyInfo() {
return virtualKeyInfo;
}
@Override
public boolean getBelongsToIncrementalQuery() {
return belongsToIncrementalQuery;
}
@Override
public void setBelongsToIncrementalQuery(boolean belongsToIncrementalPath) {
this.belongsToIncrementalQuery = belongsToIncrementalPath;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
@@ -112,7 +153,7 @@ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit
@Override
public String toString() {
return "HoodieRealtimeFileSplit{DataPath=" + getPath() + ", deltaLogPaths=" + deltaLogPaths
return "HoodieRealtimeFileSplit{DataPath=" + getPath() + ", deltaLogPaths=" + getDeltaLogPaths()
+ ", maxCommitTime='" + maxCommitTime + '\'' + ", basePath='" + basePath + '\'' + '}';
}
}

View File

@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hadoop.realtime;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.PathWithBootstrapFileStatus;
import java.util.List;
/**
* {@link Path} implementation encoding additional information necessary to appropriately read
* base files of the MOR tables, such as list of delta log files (holding updated records) associated
* w/ the base file, etc.
*/
public class HoodieRealtimePath extends Path {
/**
* Marks whether this path was produced as part of an Incremental Query
*/
private final boolean belongsToIncrementalQuery;
/**
* List of delta log-files holding updated records for this base-file
*/
private final List<HoodieLogFile> deltaLogFiles;
/**
* Latest commit instant available at the time of the query in which all of the files
* pertaining to this split are represented
*/
private final String maxCommitTime;
/**
* Base path of the table this path belongs to
*/
private final String basePath;
/**
* Virtual key configuration of the table this split belongs to
*/
private final Option<HoodieVirtualKeyInfo> virtualKeyInfo;
/**
* File status for the bootstrap file (only relevant if this table is a bootstrapped table)
*/
private PathWithBootstrapFileStatus pathWithBootstrapFileStatus;
public HoodieRealtimePath(Path parent,
String child,
String basePath,
List<HoodieLogFile> deltaLogFiles,
String maxCommitTime,
boolean belongsToIncrementalQuery,
Option<HoodieVirtualKeyInfo> virtualKeyInfo) {
super(parent, child);
this.basePath = basePath;
this.deltaLogFiles = deltaLogFiles;
this.maxCommitTime = maxCommitTime;
this.belongsToIncrementalQuery = belongsToIncrementalQuery;
this.virtualKeyInfo = virtualKeyInfo;
}
public List<HoodieLogFile> getDeltaLogFiles() {
return deltaLogFiles;
}
public String getMaxCommitTime() {
return maxCommitTime;
}
public String getBasePath() {
return basePath;
}
public boolean getBelongsToIncrementalQuery() {
return belongsToIncrementalQuery;
}
public boolean isSplitable() {
return !toString().isEmpty();
}
public PathWithBootstrapFileStatus getPathWithBootstrapFileStatus() {
return pathWithBootstrapFileStatus;
}
public void setPathWithBootstrapFileStatus(PathWithBootstrapFileStatus pathWithBootstrapFileStatus) {
this.pathWithBootstrapFileStatus = pathWithBootstrapFileStatus;
}
public boolean includeBootstrapFilePath() {
return pathWithBootstrapFileStatus != null;
}
public Option<HoodieVirtualKeyInfo> getVirtualKeyInfo() {
return virtualKeyInfo;
}
}
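For orientation, the two classes above are meant to compose as follows: a HoodieRealtimePath carries the realtime metadata (base path, delta log files, max commit time, incremental flag, virtual-key info), and the path-based HoodieRealtimeFileSplit constructor simply copies it over. Below is a minimal, hedged sketch of that wiring; the constructors and the HoodieLogFile(Path, long) call follow the signatures shown in this diff, while the file names and instant values are illustrative only.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimePath;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
class HoodieRealtimePathExample {
  static HoodieRealtimeFileSplit buildSplit() throws IOException {
    String basePath = "/tmp/hudi_trips";                                        // illustrative table base path
    List<HoodieLogFile> deltaLogFiles = Collections.singletonList(
        new HoodieLogFile(new Path(basePath + "/.fileid0_20220101000000.log.1"), 512L));
    HoodieRealtimePath realtimePath = new HoodieRealtimePath(
        new Path(basePath),                                                     // parent
        "fileid0_1-0-1_20220101000000.parquet",                                 // child (base file name)
        basePath,
        deltaLogFiles,
        "20220101000000",                                                       // maxCommitTime
        false,                                                                  // belongsToIncrementalQuery
        Option.empty());                                                        // virtualKeyInfo
    // The split copies the metadata carried by the path (see the delegating constructor above).
    return new HoodieRealtimeFileSplit(new FileSplit(realtimePath, 0, 0, new String[0]), realtimePath);
  }
}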

View File

@@ -18,6 +18,12 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
@@ -29,13 +35,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -55,7 +54,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
private final Set<String> deltaRecordKeys;
private final HoodieMergedLogRecordScanner mergedLogRecordScanner;
private int recordKeyIndex = HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
private final int recordKeyIndex;
private Iterator<String> deltaItr;
public RealtimeCompactedRecordReader(RealtimeSplit split, JobConf job,
@@ -65,9 +64,9 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
this.mergedLogRecordScanner = getMergedLogRecordScanner();
this.deltaRecordMap = mergedLogRecordScanner.getRecords();
this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet());
if (split.getHoodieVirtualKeyInfo().isPresent()) {
this.recordKeyIndex = split.getHoodieVirtualKeyInfo().get().getRecordKeyFieldIndex();
}
this.recordKeyIndex = split.getVirtualKeyInfo()
.map(HoodieVirtualKeyInfo::getRecordKeyFieldIndex)
.orElse(HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS);
}
/**

View File

@@ -18,18 +18,18 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.InputSplitUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* Realtime Input Split Interface.
@@ -40,10 +40,14 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo {
* Return Log File Paths.
* @return
*/
List<String> getDeltaLogPaths();
default List<String> getDeltaLogPaths() {
return getDeltaLogFiles().stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList());
}
List<HoodieLogFile> getDeltaLogFiles();
void setDeltaLogFiles(List<HoodieLogFile> deltaLogFiles);
/**
* Return Max Instant Time.
* @return
@@ -60,14 +64,12 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo {
* Returns Virtual key info if meta fields are disabled.
* @return
*/
Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo();
Option<HoodieVirtualKeyInfo> getVirtualKeyInfo();
/**
* Update Log File Paths.
*
* @param deltaLogPaths
* Returns the flag whether this split belongs to an Incremental Query
*/
void setDeltaLogPaths(List<String> deltaLogPaths);
boolean getBelongsToIncrementalQuery();
/**
* Update Maximum valid instant time.
@@ -81,17 +83,25 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo {
*/
void setBasePath(String basePath);
void setHoodieVirtualKeyInfo(Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo);
/**
* Sets the flag whether this split belongs to an Incremental Query
*/
void setBelongsToIncrementalQuery(boolean belongsToIncrementalQuery);
void setVirtualKeyInfo(Option<HoodieVirtualKeyInfo> virtualKeyInfo);
default void writeToOutput(DataOutput out) throws IOException {
InputSplitUtils.writeString(getBasePath(), out);
InputSplitUtils.writeString(getMaxCommitTime(), out);
out.writeInt(getDeltaLogPaths().size());
for (String logFilePath : getDeltaLogPaths()) {
InputSplitUtils.writeString(logFilePath, out);
InputSplitUtils.writeBoolean(getBelongsToIncrementalQuery(), out);
out.writeInt(getDeltaLogFiles().size());
for (HoodieLogFile logFile : getDeltaLogFiles()) {
InputSplitUtils.writeString(logFile.getPath().toString(), out);
out.writeLong(logFile.getFileSize());
}
Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt = getHoodieVirtualKeyInfo();
Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt = getVirtualKeyInfo();
if (!virtualKeyInfoOpt.isPresent()) {
InputSplitUtils.writeBoolean(false, out);
} else {
@@ -106,34 +116,39 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo {
default void readFromInput(DataInput in) throws IOException {
setBasePath(InputSplitUtils.readString(in));
setMaxCommitTime(InputSplitUtils.readString(in));
setBelongsToIncrementalQuery(InputSplitUtils.readBoolean(in));
int totalLogFiles = in.readInt();
List<String> deltaLogPaths = new ArrayList<>(totalLogFiles);
List<HoodieLogFile> deltaLogPaths = new ArrayList<>(totalLogFiles);
for (int i = 0; i < totalLogFiles; i++) {
deltaLogPaths.add(InputSplitUtils.readString(in));
String logFilePath = InputSplitUtils.readString(in);
long logFileSize = in.readLong();
deltaLogPaths.add(new HoodieLogFile(new Path(logFilePath), logFileSize));
}
setDeltaLogPaths(deltaLogPaths);
setDeltaLogFiles(deltaLogPaths);
boolean hoodieVirtualKeyPresent = InputSplitUtils.readBoolean(in);
if (hoodieVirtualKeyPresent) {
String recordKeyField = InputSplitUtils.readString(in);
String partitionPathField = InputSplitUtils.readString(in);
int recordFieldIndex = Integer.parseInt(InputSplitUtils.readString(in));
int partitionPathIndex = Integer.parseInt(InputSplitUtils.readString(in));
setHoodieVirtualKeyInfo(Option.of(new HoodieVirtualKeyInfo(recordKeyField, partitionPathField, recordFieldIndex, partitionPathIndex)));
setVirtualKeyInfo(Option.of(new HoodieVirtualKeyInfo(recordKeyField, partitionPathField, recordFieldIndex, partitionPathIndex)));
}
}
/**
* The file containing this split's data.
*/
public Path getPath();
Path getPath();
/**
* The position of the first byte in the file to process.
*/
public long getStart();
long getStart();
/**
* The number of bytes in the file to process.
*/
public long getLength();
long getLength();
}
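To make the serialization contract above concrete, here is a hedged sketch of a round trip through the default writeToOutput/readFromInput methods, which write base path, max commit time, the incremental flag, the delta log files with their sizes, and then the optional virtual-key info. It assumes HoodieRealtimeFileSplit implements this interface as shown earlier in this change.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
class RealtimeSplitSerDeExample {
  static HoodieRealtimeFileSplit roundTrip(HoodieRealtimeFileSplit split) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buffer)) {
      split.writeToOutput(out);   // base path, max commit time, incremental flag, log files, key info
    }
    HoodieRealtimeFileSplit copy = new HoodieRealtimeFileSplit();
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
      copy.readFromInput(in);     // restores the same fields onto the empty split
    }
    return copy;
  }
}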

View File

@@ -19,15 +19,11 @@
package org.apache.hudi.hadoop.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -148,39 +144,6 @@ public class HoodieHiveUtils {
return result;
}
/**
* Returns the timeline to consume based on the job configs hoodie.%s.consume.pending.commits and hoodie.%s.consume.commit:
*
* (hoodie.<tableName>.consume.pending.commits, hoodie.<tableName>.consume.commit) ->
* (true, validCommit) -> returns activeTimeline filtered until validCommit
* (true, invalidCommit) -> raises HoodieIOException
* (true, notSet) -> raises HoodieIOException
* (false, validCommit) -> returns completedTimeline filtered until validCommit
* (false, invalidCommit) -> raises HoodieIOException
* (false or notSet, notSet) -> returns completedTimeline unfiltered
*
* A validCommit is one that exists in the timeline being checked; an invalidCommit is one that does not.
*/
public static HoodieTimeline getTableTimeline(final String tableName, final JobConf job, final HoodieTableMetaClient metaClient) {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline();
boolean includePendingCommits = shouldIncludePendingCommits(job, tableName);
Option<String> maxCommit = getMaxCommit(job, tableName);
HoodieTimeline finalizedTimeline = includePendingCommits ? timeline : timeline.filterCompletedInstants();
return !maxCommit.isPresent() ? finalizedTimeline : filterIfInstantExists(tableName, finalizedTimeline, maxCommit.get());
}
private static HoodieTimeline filterIfInstantExists(String tableName, HoodieTimeline timeline, String maxCommit) {
if (maxCommit == null || !timeline.containsInstant(maxCommit)) {
LOG.info("Timestamp " + maxCommit + " doesn't exist in the commits timeline:" + timeline + " table: " + tableName);
throw new HoodieIOException("Valid timestamp is required for " + HOODIE_CONSUME_COMMIT + " in snapshot mode");
}
return timeline.findInstantsBeforeOrEquals(maxCommit);
}
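The removed helpers above are driven by two job-level properties. As a hedged sketch of how a job would request each of the behaviors listed in the javadoc, the snippet below sets them directly; the property names follow the hoodie.<tableName>.consume.* pattern named in the comment, and the actual constants live in HoodieHiveUtils.
import org.apache.hadoop.mapred.JobConf;
class ConsumeModeExample {
  // Snapshot query up to a specific instant, optionally including pending commits.
  static void configure(JobConf job, String tableName, String instant, boolean includePendingCommits) {
    job.setBoolean(String.format("hoodie.%s.consume.pending.commits", tableName), includePendingCommits);
    job.set(String.format("hoodie.%s.consume.commit", tableName), instant);
  }
}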
public static boolean isIncrementalUseDatabase(Configuration conf) {
return conf.getBoolean(HOODIE_INCREMENTAL_USE_DATABASE, false);
}

View File

@@ -18,34 +18,6 @@
package org.apache.hudi.hadoop.utils;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.HoodieHFileInputFormat;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -57,8 +29,29 @@ import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.HoodieHFileInputFormat;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -437,70 +430,6 @@ public class HoodieInputFormatUtils {
.build();
}
/**
* @deprecated
*/
public static List<FileStatus> filterFileStatusForSnapshotMode(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap,
List<Path> snapshotPaths, boolean includeLogFiles) throws IOException {
HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job);
List<FileStatus> returns = new ArrayList<>();
Map<HoodieTableMetaClient, List<Path>> groupedPaths =
HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsViewCache = new HashMap<>();
LOG.info("Found a total of " + groupedPaths.size() + " groups");
try {
for (Map.Entry<HoodieTableMetaClient, List<Path>> entry : groupedPaths.entrySet()) {
HoodieTableMetaClient metaClient = entry.getKey();
if (LOG.isDebugEnabled()) {
LOG.debug("Hoodie Metadata initialized with completed commit instant as :" + metaClient);
}
HoodieTimeline timeline = HoodieHiveUtils.getTableTimeline(metaClient.getTableConfig().getTableName(), job, metaClient);
HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(metaClient, tableMetaClient ->
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, tableMetaClient, buildMetadataConfig(job), timeline));
List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();
Map<FileStatus, List<HoodieLogFile>> filteredLogs = new HashMap<>();
for (Path p : entry.getValue()) {
String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), p);
List<HoodieBaseFile> matched = fsView.getLatestBaseFiles(relativePartitionPath).collect(Collectors.toList());
filteredBaseFiles.addAll(matched);
if (includeLogFiles) {
List<FileSlice> logMatched = fsView.getLatestFileSlices(relativePartitionPath)
.filter(f -> !f.getBaseFile().isPresent() && f.getLatestLogFile().isPresent())
.collect(Collectors.toList());
logMatched.forEach(f -> {
List<HoodieLogFile> logPathSizePairs = f.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
filteredLogs.put(f.getLatestLogFile().get().getFileStatus(), logPathSizePairs);
});
}
}
LOG.info("Total paths to process after hoodie filter " + filteredBaseFiles.size());
for (HoodieBaseFile filteredFile : filteredBaseFiles) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
}
filteredFile = refreshFileStatus(job, filteredFile);
returns.add(getFileStatus(filteredFile));
}
for (Map.Entry<FileStatus, List<HoodieLogFile>> filterLogEntry : filteredLogs.entrySet()) {
RealtimeFileStatus rs = new RealtimeFileStatus(filterLogEntry.getKey());
rs.setDeltaLogFiles(filterLogEntry.getValue());
returns.add(rs);
}
}
} finally {
fsViewCache.forEach(((metaClient, fsView) -> fsView.close()));
}
return returns;
}
/**
* Checks the file status for a race condition which can set the file size to 0. 1. HiveInputFormat does
* super.listStatus() and gets back a FileStatus[] 2. Then it creates the HoodieTableMetaClient for the paths listed.
@@ -534,12 +463,12 @@ public class HoodieInputFormatUtils {
*
* @return the affected file status array
*/
public static FileStatus[] listAffectedFilesForCommits(Path basePath, List<HoodieCommitMetadata> metadataList) {
public static FileStatus[] listAffectedFilesForCommits(Configuration hadoopConf, Path basePath, List<HoodieCommitMetadata> metadataList) {
// TODO: Use HoodieMetaTable to extract affected file directly.
HashMap<String, FileStatus> fullPathToFileStatus = new HashMap<>();
// Iterate through the given commits.
for (HoodieCommitMetadata metadata: metadataList) {
fullPathToFileStatus.putAll(metadata.getFullPathToFileStatus(basePath.toString()));
fullPathToFileStatus.putAll(metadata.getFullPathToFileStatus(hadoopConf, basePath.toString()));
}
return fullPathToFileStatus.values().toArray(new FileStatus[0]);
}
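A hedged sketch of a caller adapting to the new signature, which now threads the Hadoop Configuration through to HoodieCommitMetadata#getFullPathToFileStatus; the Scala call site later in this change does the same.
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
class AffectedFilesExample {
  static FileStatus[] affectedFiles(Configuration hadoopConf, String basePath, List<HoodieCommitMetadata> commits) {
    // The extra Configuration argument is the only change relative to the old overload.
    return HoodieInputFormatUtils.listAffectedFilesForCommits(hadoopConf, new Path(basePath), commits);
  }
}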

View File

@@ -22,42 +22,26 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.BaseFileWithLogsSplit;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeBootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
import org.apache.hudi.hadoop.realtime.RealtimeBootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -66,230 +50,23 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.TypeUtils.unsafeCast;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
if (fileSplits.isEmpty()) {
return new InputSplit[0];
}
FileSplit fileSplit = fileSplits.get(0);
// Pre-process table-config to fetch virtual key info
Path partitionPath = fileSplit.getPath().getParent();
HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, partitionPath);
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt = getHoodieVirtualKeyInfo(metaClient);
// NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase}
HoodieInstant latestCommitInstant =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
InputSplit[] finalSplits = fileSplits.stream()
.map(split -> {
// There are 4 types of splits we could have to handle here
// - {@code BootstrapBaseFileSplit}: in case base file does have associated bootstrap file,
// but does NOT have any log files appended (convert it to {@code RealtimeBootstrapBaseFileSplit})
// - {@code RealtimeBootstrapBaseFileSplit}: in case base file does have associated bootstrap file
// and does have log files appended
// - {@code BaseFileWithLogsSplit}: in case base file does NOT have associated bootstrap file
// and does have log files appended;
// - {@code FileSplit}: in case Hive passed down non-Hudi path
if (split instanceof RealtimeBootstrapBaseFileSplit) {
return split;
} else if (split instanceof BootstrapBaseFileSplit) {
BootstrapBaseFileSplit bootstrapBaseFileSplit = unsafeCast(split);
return createRealtimeBoostrapBaseFileSplit(
bootstrapBaseFileSplit,
metaClient.getBasePath(),
Collections.emptyList(),
latestCommitInstant.getTimestamp(),
false);
} else if (split instanceof BaseFileWithLogsSplit) {
BaseFileWithLogsSplit baseFileWithLogsSplit = unsafeCast(split);
return createHoodieRealtimeSplitUnchecked(baseFileWithLogsSplit, hoodieVirtualKeyInfoOpt);
} else {
// Non-Hudi paths might result in just generic {@code FileSplit} being
// propagated up to this point
return split;
}
})
.toArray(InputSplit[]::new);
LOG.info("Returning a total splits of " + finalSplits.length);
return finalSplits;
}
/**
* @deprecated
*/
public static InputSplit[] getRealtimeSplitsLegacy(Configuration conf, Stream<FileSplit> fileSplits) {
Map<Path, List<FileSplit>> partitionsToParquetSplits =
fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent()));
// TODO(vc): Should we handle also non-hoodie splits here?
Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionsToParquetSplits.keySet());
// Create file system cache so metadata table is only instantiated once. Also can benefit normal file listing if
// partition path is listed twice so file groups will already be loaded in file system
Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsCache = new HashMap<>();
// for all unique split parents, obtain all delta files based on delta commit timeline,
// grouped on file id
List<InputSplit> rtSplits = new ArrayList<>();
try {
// Pre process tableConfig from first partition to fetch virtual key info
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
if (partitionsToParquetSplits.size() > 0) {
HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionsToParquetSplits.keySet().iterator().next());
hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(metaClient);
}
Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
partitionsToParquetSplits.keySet().forEach(partitionPath -> {
// for each partition path obtain the data & log file groupings, then map back to inputsplits
HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
if (!fsCache.containsKey(metaClient)) {
HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(conf);
HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext,
metaClient, HoodieInputFormatUtils.buildMetadataConfig(conf), metaClient.getActiveTimeline());
fsCache.put(metaClient, fsView);
}
HoodieTableFileSystemView fsView = fsCache.get(metaClient);
String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
// Both commit and delta-commits are included - pick the latest completed one
Option<HoodieInstant> latestCompletedInstant =
metaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant();
Stream<FileSlice> latestFileSlices = latestCompletedInstant
.map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp()))
.orElse(Stream.empty());
// subgroup splits again by file id & match with log files.
Map<String, List<FileSplit>> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream()
.collect(Collectors.groupingBy(split -> FSUtils.getFileIdFromFilePath(split.getPath())));
// Get the maxCommit from the last delta or compaction or commit - when bootstrapped from COW table
String maxCommitTime = metaClient.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp();
latestFileSlices.forEach(fileSlice -> {
List<FileSplit> dataFileSplits = groupedInputSplits.getOrDefault(fileSlice.getFileId(), new ArrayList<>());
dataFileSplits.forEach(split -> {
try {
List<HoodieLogFile> logFiles = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
.collect(Collectors.toList());
if (split instanceof BootstrapBaseFileSplit) {
BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split;
rtSplits.add(createRealtimeBoostrapBaseFileSplit(eSplit, metaClient.getBasePath(), logFiles, maxCommitTime, false));
} else {
rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFiles, maxCommitTime, finalHoodieVirtualKeyInfo));
}
} catch (IOException e) {
throw new HoodieIOException("Error creating hoodie real time split ", e);
}
});
});
});
} catch (Exception e) {
throw new HoodieException("Error obtaining data file/log file grouping ", e);
} finally {
// close all the open fs views.
fsCache.forEach((k, view) -> view.close());
}
LOG.info("Returning a total splits of " + rtSplits.size());
return rtSplits.toArray(new InputSplit[0]);
}
/**
* @deprecated will be replaced w/ {@link #getRealtimeSplits(Configuration, List)}
*/
// get IncrementalRealtimeSplits
public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
checkState(fileSplits.stream().allMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery),
"All splits have to belong to incremental query");
List<InputSplit> rtSplits = new ArrayList<>();
Set<Path> partitionSet = fileSplits.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet);
// Pre process tableConfig from first partition to fetch virtual key info
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
if (partitionSet.size() > 0) {
hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next()));
}
Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
fileSplits.stream().forEach(s -> {
// deal with incremental query.
try {
if (s instanceof BaseFileWithLogsSplit) {
BaseFileWithLogsSplit bs = unsafeCast(s);
rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
} else if (s instanceof RealtimeBootstrapBaseFileSplit) {
rtSplits.add(s);
}
} catch (IOException e) {
throw new HoodieIOException("Error creating hoodie real time split ", e);
}
});
LOG.info("Returning a total splits of " + rtSplits.size());
return rtSplits.toArray(new InputSplit[0]);
}
public static Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) {
HoodieTableConfig tableConfig = metaClient.getTableConfig();
if (!tableConfig.populateMetaFields()) {
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
try {
MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema();
return Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(),
tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()),
parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp())));
} catch (Exception exception) {
throw new HoodieException("Fetching table schema failed with exception ", exception);
}
}
return Option.empty();
}
private static boolean doesBelongToIncrementalQuery(FileSplit s) {
if (s instanceof BaseFileWithLogsSplit) {
BaseFileWithLogsSplit bs = unsafeCast(s);
public static boolean doesBelongToIncrementalQuery(FileSplit s) {
if (s instanceof HoodieRealtimeFileSplit) {
HoodieRealtimeFileSplit bs = unsafeCast(s);
return bs.getBelongsToIncrementalQuery();
} else if (s instanceof RealtimeBootstrapBaseFileSplit) {
RealtimeBootstrapBaseFileSplit bs = unsafeCast(s);
} else if (s instanceof HoodieRealtimeBootstrapBaseFileSplit) {
HoodieRealtimeBootstrapBaseFileSplit bs = unsafeCast(s);
return bs.getBelongsToIncrementalQuery();
}
return false;
}
public static boolean isIncrementalQuerySplits(List<FileSplit> fileSplits) {
if (fileSplits == null || fileSplits.size() == 0) {
return false;
}
return fileSplits.stream().anyMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery);
}
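A hedged sketch of how an input format could branch between the two entry points using the helpers above; method names and signatures are as shown in this file, while the surrounding input-format plumbing is assumed.
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
class RealtimeSplitDispatchExample {
  static InputSplit[] toRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
    // Incremental-query splits go through the dedicated (deprecated) path; everything else goes
    // through getRealtimeSplits, which also passes non-Hudi FileSplits through untouched.
    return HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits)
        ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(conf, fileSplits)
        : HoodieRealtimeInputFormatUtils.getRealtimeSplits(conf, fileSplits);
  }
}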
public static RealtimeBootstrapBaseFileSplit createRealtimeBoostrapBaseFileSplit(BootstrapBaseFileSplit split,
String basePath,
List<HoodieLogFile> logFiles,
String maxInstantTime,
boolean belongsToIncrementalQuery) {
try {
String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())
.filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[0];
String[] inMemoryHosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())
.filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0];
FileSplit baseSplit = new FileSplit(split.getPath(), split.getStart(), split.getLength(),
hosts, inMemoryHosts);
return new RealtimeBootstrapBaseFileSplit(baseSplit, basePath, logFiles, maxInstantTime, split.getBootstrapFileSplit(), belongsToIncrementalQuery);
} catch (IOException e) {
throw new HoodieIOException("Error creating hoodie real time split ", e);
}
}
// Return parquet file with a list of log files in the same file group.
public static List<Pair<Option<HoodieBaseFile>, List<String>>> groupLogsByBaseFile(Configuration conf, List<Path> partitionPaths) {
Set<Path> partitionSet = new HashSet<>(partitionPaths);
@@ -382,7 +159,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
public static boolean canAddProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) {
return jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null
|| (!realtimeSplit.getDeltaLogPaths().isEmpty() && !HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(jobConf, realtimeSplit.getHoodieVirtualKeyInfo()));
|| (!realtimeSplit.getDeltaLogPaths().isEmpty() && !HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(jobConf, realtimeSplit.getVirtualKeyInfo()));
}
/**
@@ -400,18 +177,4 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
}
}
}
private static HoodieRealtimeFileSplit createHoodieRealtimeSplitUnchecked(BaseFileWithLogsSplit baseFileWithLogsSplit,
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt) {
try {
return new HoodieRealtimeFileSplit(
baseFileWithLogsSplit,
baseFileWithLogsSplit.getBasePath(),
baseFileWithLogsSplit.getDeltaLogFiles(),
baseFileWithLogsSplit.getMaxCommitTime(),
hoodieVirtualKeyInfoOpt);
} catch (IOException e) {
throw new HoodieIOException(String.format("Failed to init %s", HoodieRealtimeFileSplit.class.getSimpleName()), e);
}
}
}

View File

@@ -18,6 +18,15 @@
package org.apache.hudi.hadoop;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -34,16 +43,6 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
@@ -203,11 +202,11 @@ public class TestHoodieParquetInputFormat {
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
InputFormatTestUtil.setupSnapshotIncludePendingCommits(jobConf, "1");
Exception exception = assertThrows(HoodieIOException.class, () -> inputFormat.listStatus(jobConf));
assertEquals("Valid timestamp is required for hoodie.%s.consume.commit in snapshot mode", exception.getMessage());
assertEquals("Query instant (1) not found in the timeline", exception.getMessage());
InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "1");
exception = assertThrows(HoodieIOException.class, () -> inputFormat.listStatus(jobConf));
assertEquals("Valid timestamp is required for hoodie.%s.consume.commit in snapshot mode", exception.getMessage());
assertEquals("Query instant (1) not found in the timeline", exception.getMessage());
}
@Test

View File

@@ -18,12 +18,11 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -72,7 +71,7 @@ public class TestHoodieRealtimeFileSplit {
baseFileSplit = new FileSplit(new Path(fileSplitName), 0, 100, new String[] {});
maxCommitTime = "10001";
split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogFiles, maxCommitTime, Option.empty());
split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogFiles, maxCommitTime, false, Option.empty());
}
@Test

View File

@@ -18,41 +18,10 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.BaseFileWithLogsSplit;
import org.apache.hudi.hadoop.PathWithLogFilePath;
import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -68,8 +37,36 @@ import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -84,12 +81,12 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -229,7 +226,9 @@ public class TestHoodieRealtimeRecordReader {
new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant + ".parquet"), 0, 1, baseJobConf),
basePath.toUri().toString(), fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
.collect(Collectors.toList()),
instantTime, Option.empty());
instantTime,
false,
Option.empty());
// create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(
@@ -309,7 +308,7 @@ public class TestHoodieRealtimeRecordReader {
// create a split with baseFile (parquet file written earlier) and new log file(s)
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf),
basePath.toUri().toString(), Collections.singletonList(writer.getLogFile()), newCommitTime, Option.empty());
basePath.toUri().toString(), Collections.singletonList(writer.getLogFile()), newCommitTime, false, Option.empty());
// create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(
@@ -388,7 +387,7 @@ public class TestHoodieRealtimeRecordReader {
// create a split with baseFile (parquet file written earlier) and new log file(s)
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf),
basePath.toUri().toString(), Collections.singletonList(writer.getLogFile()), newCommitTime, Option.empty());
basePath.toUri().toString(), Collections.singletonList(writer.getLogFile()), newCommitTime, false, Option.empty());
// create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(
@@ -535,7 +534,7 @@ public class TestHoodieRealtimeRecordReader {
// create a split with baseFile (parquet file written earlier) and new log file(s)
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
new FileSplit(new Path(partitionDir + "/fileid0_1_" + instantTime + ".parquet"), 0, 1, baseJobConf),
basePath.toUri().toString(), logFiles, newCommitTime, Option.empty());
basePath.toUri().toString(), logFiles, newCommitTime, false, Option.empty());
// create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(
@@ -615,7 +614,7 @@ public class TestHoodieRealtimeRecordReader {
HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat();
inputFormat.setConf(baseJobConf);
InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1);
assertTrue(splits.length == 1);
assertEquals(1, splits.length);
JobConf newJobConf = new JobConf(baseJobConf);
List<Schema.Field> fields = schema.getFields();
setHiveColumnNameProps(fields, newJobConf, false);
@@ -769,13 +768,16 @@ public class TestHoodieRealtimeRecordReader {
FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime);
// create a split with new log file(s)
fileSlice.addLogFile(new HoodieLogFile(writer.getLogFile().getPath(), size));
RealtimeFileStatus realtimeFileStatus = new RealtimeFileStatus(new FileStatus(writer.getLogFile().getFileSize(), false, 1, 1, 0, writer.getLogFile().getPath()));
RealtimeFileStatus realtimeFileStatus = new RealtimeFileStatus(
new FileStatus(writer.getLogFile().getFileSize(), false, 1, 1, 0, writer.getLogFile().getPath()),
basePath.toString(),
fileSlice.getLogFiles().collect(Collectors.toList()),
false,
Option.empty());
realtimeFileStatus.setMaxCommitTime(instantTime);
realtimeFileStatus.setBasePath(basePath.toString());
realtimeFileStatus.setDeltaLogFiles(fileSlice.getLogFiles().collect(Collectors.toList()));
PathWithLogFilePath pathWithLogFileStatus = (PathWithLogFilePath) realtimeFileStatus.getPath();
BaseFileWithLogsSplit bs = pathWithLogFileStatus.buildSplit(pathWithLogFileStatus, 0, 0, new String[] {""});
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), Option.empty());
HoodieRealtimePath realtimePath = (HoodieRealtimePath) realtimeFileStatus.getPath();
HoodieRealtimeFileSplit split =
new HoodieRealtimeFileSplit(new FileSplit(realtimePath, 0, 0, new String[] {""}), realtimePath);
JobConf newJobConf = new JobConf(baseJobConf);
List<Schema.Field> fields = schema.getFields();

View File

@@ -17,24 +17,17 @@
package org.apache.hudi
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getCommitMetadata
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getWritePartitionPaths
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.hadoop.fs.{GlobPattern, Path}
import org.apache.hadoop.mapred.JobConf
import org.apache.log4j.LogManager
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getCommitMetadata, getWritePartitionPaths, listAffectedFilesForCommits}
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}
@@ -167,7 +160,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
val metadataList = commitsToReturn.map(instant => getCommitMetadata(instant, commitsTimelineToReturn))
val affectedFileStatus = listAffectedFilesForCommits(new Path(metaClient.getBasePath), metadataList)
val affectedFileStatus = listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), metadataList)
val fsView = new HoodieTableFileSystemView(metaClient, commitsTimelineToReturn, affectedFileStatus)
// Iterate partitions to create splits

View File

@@ -65,6 +65,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
queryPaths.asJava,
toJavaOption(specifiedQueryInstant),
false,
false,
SparkHoodieTableFileIndex.adapt(fileStatusCache)
)
with SparkAdapterSupport