1
0

[HUDI-1681] Support object storage for Flink writer (#2662)

In order to support object storage, we need these changes:

* Use the Hadoop filesystem so that we can find the plugin filesystem
* Do not fetch file size until the file handle is closed
* Do not close the opened filesystem because we want to use the
  filesystem cache
This commit is contained in:
Danny Chan
2021-03-12 16:39:24 +08:00
committed by GitHub
parent e8e6708aea
commit 20786ab8a2
18 changed files with 443 additions and 164 deletions

View File

@@ -274,6 +274,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
if (!stat.getLogFiles().contains(result.logFile().getFileName())) {
stat.addLogFiles(result.logFile().getFileName());
}
stat.setFileSizeInBytes(result.size());
}
private void updateRuntimeStats(HoodieDeltaWriteStat stat) {
@@ -304,6 +305,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
} else if (stat.getPath().endsWith(result.logFile().getFileName())) {
// append/continued writing to the same log file
stat.setLogOffset(Math.min(stat.getLogOffset(), result.offset()));
stat.setFileSizeInBytes(stat.getFileSizeInBytes() + result.size());
accumulateWriteCounts(stat, result);
accumulateRuntimeStats(stat);
} else {

View File

@@ -33,4 +33,6 @@ public interface HoodieFileWriter<R extends IndexedRecord> {
void close() throws IOException;
void writeAvro(String key, R oldRecord) throws IOException;
long getBytesWritten();
}

View File

@@ -156,4 +156,9 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
writer.close();
writer = null;
}
/**
 * Returns the number of bytes written so far to {@code file}, as reported by
 * the filesystem wrapper rather than by querying the file size from storage.
 * NOTE(review): on object stores the file size is presumably not visible until
 * the handle is closed, which is why the wrapper's counter is used — confirm
 * against the {@code fs.getBytesWritten} implementation.
 */
@Override
public long getBytesWritten() {
return fs.getBytesWritten(file);
}
}

View File

@@ -94,4 +94,9 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
super.write(object);
writeSupport.add(key);
}
/**
 * Returns the number of bytes written so far to {@code file}, as reported by
 * the filesystem wrapper rather than by querying the file size from storage.
 * NOTE(review): on object stores the file size is presumably not visible until
 * the handle is closed, which is why the wrapper's counter is used — confirm
 * against the {@code fs.getBytesWritten} implementation.
 */
@Override
public long getBytesWritten() {
return fs.getBytesWritten(file);
}
}

View File

@@ -46,6 +46,12 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Parquet -->
<dependency>

View File

@@ -29,7 +29,6 @@ import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.hadoop.conf.Configuration;
import java.util.List;
import java.util.Map;
@@ -37,6 +36,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.util.FlinkClientUtil;
import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
@@ -50,7 +50,7 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
private RuntimeContext runtimeContext;
public HoodieFlinkEngineContext(TaskContextSupplier taskContextSupplier) {
this(new SerializableConfiguration(new Configuration()), taskContextSupplier);
this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), taskContextSupplier);
}
public HoodieFlinkEngineContext(SerializableConfiguration hadoopConf, TaskContextSupplier taskContextSupplier) {

View File

@@ -20,14 +20,12 @@ package org.apache.hudi.io;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,14 +96,7 @@ public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O> extends H
needBootStrap = false;
// flush any remaining records to disk
appendDataAndDeleteBlocks(header);
try {
for (WriteStatus status: statuses) {
long logFileSize = FSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath()));
status.getStat().setFileSizeInBytes(logFileSize);
}
} catch (IOException e) {
throw new HoodieUpsertException("Failed to get file size for append handle", e);
}
// TODO: the incremental write size in bytes may be lost here; needs a proper fix
List<WriteStatus> ret = new ArrayList<>(statuses);
statuses.clear();
return ret;

View File

@@ -20,7 +20,6 @@ package org.apache.hudi.io;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -109,7 +108,7 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
* @throws IOException if error occurs
*/
private void setUpWriteStatus() throws IOException {
long fileSizeInBytes = FSUtils.getFileSize(fs, path);
long fileSizeInBytes = fileWriter.getBytesWritten();
long incFileSizeInBytes = fileSizeInBytes - lastFileSize;
this.lastFileSize = fileSizeInBytes;
HoodieWriteStat stat = new HoodieWriteStat();

View File

@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.util;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.Path;
import java.io.File;
/**
 * Utilities for Hoodie Flink client.
 */
public class FlinkClientUtil {
/**
 * Returns the hadoop configuration with possible hadoop conf paths.
 * E.G. the configurations under path $HADOOP_CONF_DIR and $HADOOP_HOME.
 *
 * @return a Hadoop {@code Configuration}, never null; falls back to a default
 *         instance when no conf directory is found on disk
 */
public static org.apache.hadoop.conf.Configuration getHadoopConf() {
// create hadoop configuration with hadoop conf directory configured.
org.apache.hadoop.conf.Configuration hadoopConf = null;
// Probe each candidate conf dir reported by Flink's HadoopUtils; the first
// directory that exists on disk wins.
for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(new Configuration())) {
hadoopConf = getHadoopConfiguration(possibleHadoopConfPath);
if (hadoopConf != null) {
break;
}
}
// No conf directory found: use a plain Configuration (classpath defaults).
if (hadoopConf == null) {
hadoopConf = new org.apache.hadoop.conf.Configuration();
}
return hadoopConf;
}
/**
 * Returns a new Hadoop Configuration object using the path to the hadoop conf configured.
 *
 * @param hadoopConfDir Hadoop conf directory path.
 * @return A Hadoop configuration instance, or {@code null} when the directory does not exist.
 */
private static org.apache.hadoop.conf.Configuration getHadoopConfiguration(String hadoopConfDir) {
if (new File(hadoopConfDir).exists()) {
org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
// Each site file is optional; only existing ones are added as resources.
File coreSite = new File(hadoopConfDir, "core-site.xml");
if (coreSite.exists()) {
hadoopConfiguration.addResource(new Path(coreSite.getAbsolutePath()));
}
File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
if (hdfsSite.exists()) {
hadoopConfiguration.addResource(new Path(hdfsSite.getAbsolutePath()));
}
File yarnSite = new File(hadoopConfDir, "yarn-site.xml");
if (yarnSite.exists()) {
hadoopConfiguration.addResource(new Path(yarnSite.getAbsolutePath()));
}
// Add mapred-site.xml. We need to read configurations like compression codec.
File mapredSite = new File(hadoopConfDir, "mapred-site.xml");
if (mapredSite.exists()) {
hadoopConfiguration.addResource(new Path(mapredSite.getAbsolutePath()));
}
return hadoopConfiguration;
}
return null;
}
}

View File

@@ -24,7 +24,6 @@ import org.apache.hudi.source.HoodieTableSource;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.RowData;
@@ -35,6 +34,7 @@ import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.hadoop.fs.Path;
import java.util.Collections;
import java.util.HashMap;

View File

@@ -49,7 +49,6 @@ import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -70,6 +69,7 @@ import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -166,7 +166,7 @@ public class HoodieTableSource implements
(TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
conf, path, metaClient, maxCompactionMemoryInBytes);
conf, FilePathUtils.toFlinkPath(path), metaClient, maxCompactionMemoryInBytes);
OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) getInputFormat(true));
SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, "streaming_source")
.setParallelism(1)
@@ -213,7 +213,8 @@ public class HoodieTableSource implements
@Override
public List<Map<String, String>> getPartitions() {
return FilePathUtils.getPartitions(path, conf, partitionKeys, defaultPartName);
return FilePathUtils.getPartitions(path, hadoopConf, partitionKeys, defaultPartName,
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION));
}
@Override
@@ -253,7 +254,8 @@ public class HoodieTableSource implements
private List<MergeOnReadInputSplit> buildFileIndex(Path[] paths) {
FileStatus[] fileStatuses = Arrays.stream(paths)
.flatMap(path -> Arrays.stream(FilePathUtils.getHadoopFileStatusRecursively(path, 1, hadoopConf)))
.flatMap(path ->
Arrays.stream(FilePathUtils.getFileStatusRecursively(path, 1, hadoopConf)))
.toArray(FileStatus[]::new);
if (fileStatuses.length == 0) {
throw new HoodieException("No files found for reading in user provided path.");
@@ -281,9 +283,7 @@ public class HoodieTableSource implements
} else {
// all the files are logs
return Arrays.stream(paths).map(partitionPath -> {
String relPartitionPath = FSUtils.getRelativePartitionPath(
new org.apache.hadoop.fs.Path(path.toUri()),
new org.apache.hadoop.fs.Path(partitionPath.toUri()));
String relPartitionPath = FSUtils.getRelativePartitionPath(path, partitionPath);
return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit)
.map(fileSlice -> {
Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
@@ -351,14 +351,14 @@ public class HoodieTableSource implements
inputSplits);
return new MergeOnReadInputFormat(
this.conf,
paths,
FilePathUtils.toFlinkPaths(paths),
hoodieTableState,
rowDataType.getChildren(), // use the explicit fields data type because the AvroSchemaConverter is not very stable.
"default",
this.limit);
case COPY_ON_WRITE:
FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
paths,
FilePathUtils.toFlinkPaths(paths),
this.schema.getFieldNames(),
this.schema.getFieldDataTypes(),
this.requiredPos,
@@ -398,7 +398,8 @@ public class HoodieTableSource implements
public Path[] getReadPaths() {
return partitionKeys.isEmpty()
? new Path[] {path}
: FilePathUtils.partitionPath2ReadPath(path, conf, partitionKeys, getOrFetchPartitions());
: FilePathUtils.partitionPath2ReadPath(path, partitionKeys, getOrFetchPartitions(),
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION));
}
private static class LatestFileFilter extends FilePathFilter {
@@ -409,8 +410,8 @@ public class HoodieTableSource implements
}
@Override
public boolean filterPath(Path filePath) {
return !this.hoodieFilter.accept(new org.apache.hadoop.fs.Path(filePath.toUri()));
public boolean filterPath(org.apache.flink.core.fs.Path filePath) {
return !this.hoodieFilter.accept(new Path(filePath.toUri()));
}
}
}

View File

@@ -22,14 +22,16 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.operator.FlinkOptions;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.TableException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -77,14 +79,14 @@ public class FilePathUtils {
/**
* Make partition path from partition spec.
*
* @param partitionKVs The partition key value mapping
* @param hiveStylePartition Whether the partition path is with Hive style,
* e.g. {partition key} = {partition value}
* @param partitionKVs The partition key value mapping
* @param hivePartition Whether the partition path is with Hive style,
* e.g. {partition key} = {partition value}
* @return an escaped, valid partition name
*/
public static String generatePartitionPath(
LinkedHashMap<String, String> partitionKVs,
boolean hiveStylePartition) {
boolean hivePartition) {
if (partitionKVs.isEmpty()) {
return "";
}
@@ -92,16 +94,16 @@ public class FilePathUtils {
int i = 0;
for (Map.Entry<String, String> e : partitionKVs.entrySet()) {
if (i > 0) {
suffixBuf.append(Path.SEPARATOR);
suffixBuf.append(File.separator);
}
if (hiveStylePartition) {
if (hivePartition) {
suffixBuf.append(escapePathName(e.getKey()));
suffixBuf.append('=');
}
suffixBuf.append(escapePathName(e.getValue()));
i++;
}
suffixBuf.append(Path.SEPARATOR);
suffixBuf.append(File.separator);
return suffixBuf.toString();
}
@@ -235,7 +237,11 @@ public class FilePathUtils {
return ret;
}
private static FileStatus[] getFileStatusRecursively(Path path, int expectLevel, FileSystem fs) {
public static FileStatus[] getFileStatusRecursively(Path path, int expectLevel, Configuration conf) {
return getFileStatusRecursively(path, expectLevel, FSUtils.getFs(path.toString(), conf));
}
public static FileStatus[] getFileStatusRecursively(Path path, int expectLevel, FileSystem fs) {
ArrayList<FileStatus> result = new ArrayList<>();
try {
@@ -267,54 +273,6 @@ public class FilePathUtils {
}
private static boolean isHiddenFile(FileStatus fileStatus) {
String name = fileStatus.getPath().getName();
return name.startsWith("_") || name.startsWith(".");
}
/**
* Same as getFileStatusRecursively but returns hadoop {@link org.apache.hadoop.fs.FileStatus}s.
*/
public static org.apache.hadoop.fs.FileStatus[] getHadoopFileStatusRecursively(
Path path, int expectLevel, Configuration hadoopConf) {
ArrayList<org.apache.hadoop.fs.FileStatus> result = new ArrayList<>();
org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri());
org.apache.hadoop.fs.FileSystem fs = FSUtils.getFs(path.getPath(), hadoopConf);
try {
org.apache.hadoop.fs.FileStatus fileStatus = fs.getFileStatus(hadoopPath);
listStatusRecursivelyV2(fs, fileStatus, 0, expectLevel, result);
} catch (IOException ignore) {
return new org.apache.hadoop.fs.FileStatus[0];
}
return result.toArray(new org.apache.hadoop.fs.FileStatus[0]);
}
private static void listStatusRecursivelyV2(
org.apache.hadoop.fs.FileSystem fs,
org.apache.hadoop.fs.FileStatus fileStatus,
int level,
int expectLevel,
List<org.apache.hadoop.fs.FileStatus> results) throws IOException {
if (isHiddenFileV2(fileStatus)) {
// do nothing
return;
}
if (expectLevel == level) {
results.add(fileStatus);
return;
}
if (fileStatus.isDirectory()) {
for (org.apache.hadoop.fs.FileStatus stat : fs.listStatus(fileStatus.getPath())) {
listStatusRecursivelyV2(fs, stat, level + 1, expectLevel, results);
}
}
}
private static boolean isHiddenFileV2(org.apache.hadoop.fs.FileStatus fileStatus) {
String name = fileStatus.getPath().getName();
// log files start with a dot but should not be treated as hidden files
return name.startsWith("_") || name.startsWith(".") && !name.contains(".log.");
@@ -333,21 +291,23 @@ public class FilePathUtils {
* <p>The return list should be [{key1:val1, key2:val2, key3:val3}, {key1:val4, key2:val5, key3:val6}].
*
* @param path The base path
* @param conf The configuration
* @param hadoopConf The hadoop configuration
* @param partitionKeys The partition key list
* @param defaultParName The default partition name for nulls
* @param hivePartition Whether the partition path is in Hive style
*/
public static List<Map<String, String>> getPartitions(
Path path,
org.apache.flink.configuration.Configuration conf,
Configuration hadoopConf,
List<String> partitionKeys,
String defaultParName) {
String defaultParName,
boolean hivePartition) {
try {
return FilePathUtils
.searchPartKeyValueAndPaths(
path.getFileSystem(),
FSUtils.getFs(path.toString(), hadoopConf),
path,
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION),
hivePartition,
partitionKeys.toArray(new String[0]))
.stream()
.map(tuple2 -> tuple2.f0)
@@ -386,21 +346,23 @@ public class FilePathUtils {
* Returns all the file paths that is the parents of the data files.
*
* @param path The base path
* @param conf The configuration
* @param conf The Flink configuration
* @param hadoopConf The hadoop configuration
* @param partitionKeys The partition key list
* @param defaultParName The default partition name for nulls
*/
public static Path[] getReadPaths(
Path path,
org.apache.flink.configuration.Configuration conf,
List<String> partitionKeys,
String defaultParName) {
Configuration hadoopConf,
List<String> partitionKeys) {
if (partitionKeys.isEmpty()) {
return new Path[] {path};
} else {
final String defaultParName = conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME);
final boolean hivePartition = conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION);
List<Map<String, String>> partitionPaths =
getPartitions(path, conf, partitionKeys, defaultParName);
return partitionPath2ReadPath(path, conf, partitionKeys, partitionPaths);
getPartitions(path, hadoopConf, partitionKeys, defaultParName, hivePartition);
return partitionPath2ReadPath(path, partitionKeys, partitionPaths, hivePartition);
}
}
@@ -408,21 +370,37 @@ public class FilePathUtils {
* Transforms the given partition key value mapping to read paths.
*
* @param path The base path
* @param conf The hadoop configuration
* @param partitionKeys The partition key list
* @param partitionPaths The partition key value mapping
* @param hivePartition Whether the partition path is in Hive style
*
* @see #getReadPaths
*/
public static Path[] partitionPath2ReadPath(
Path path,
org.apache.flink.configuration.Configuration conf,
List<String> partitionKeys,
List<Map<String, String>> partitionPaths) {
List<Map<String, String>> partitionPaths,
boolean hivePartition) {
return partitionPaths.stream()
.map(m -> validateAndReorderPartitions(m, partitionKeys))
.map(kvs -> FilePathUtils.generatePartitionPath(kvs, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION)))
.map(kvs -> FilePathUtils.generatePartitionPath(kvs, hivePartition))
.map(n -> new Path(path, n))
.toArray(Path[]::new);
}
/**
 * Transforms the array of Hadoop paths to Flink paths.
 *
 * @param paths the Hadoop paths to convert
 * @return the equivalent Flink paths, in the same order
 */
public static org.apache.flink.core.fs.Path[] toFlinkPaths(Path[] paths) {
return Arrays.stream(paths)
// method reference is the idiomatic form of p -> toFlinkPath(p)
.map(FilePathUtils::toFlinkPath)
.toArray(org.apache.flink.core.fs.Path[]::new);
}
/**
 * Transforms the Hadoop path to Flink path.
 *
 * <p>The conversion goes through the path URI, so scheme and authority
 * are carried over to the Flink path.
 *
 * @param path the Hadoop path to convert
 * @return the equivalent Flink path
 */
public static org.apache.flink.core.fs.Path toFlinkPath(Path path) {
return new org.apache.flink.core.fs.Path(path.toUri());
}
}

View File

@@ -18,7 +18,12 @@
package org.apache.hudi.source.format.cow;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.io.GlobFilePathFilter;
import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
@@ -26,11 +31,19 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType;
@@ -43,11 +56,16 @@ import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePart
* {@code org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory.ParquetInputFormat}
* to support TIMESTAMP_MILLIS.
*
* <p>Note: Override the {@link #createInputSplits} method from parent to rewrite the logic creating the FileSystem,
* use {@link FSUtils#getFs} to get a plugin filesystem.
*
* @see ParquetSplitReaderUtil
*/
public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteInputFormat.class);
private final String[] fullFieldNames;
private final DataType[] fullFieldTypes;
private final int[] selectedFields;
@@ -59,6 +77,11 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
private transient ParquetColumnarRowSplitReader reader;
private transient long currentReadCount;
/**
* Files filter for determining what files/directories should be included.
*/
private FilePathFilter localFilesFilter = new GlobFilePathFilter();
public CopyOnWriteInputFormat(
Path[] paths,
String[] fullFieldNames,
@@ -98,12 +121,138 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
partObjects,
selectedFields,
DEFAULT_SIZE,
new Path(fileSplit.getPath().toString()),
fileSplit.getPath(),
fileSplit.getStart(),
fileSplit.getLength());
this.currentReadCount = 0L;
}
/**
 * Creates the input splits for the configured paths.
 *
 * <p>Overrides the parent's split creation so the {@link FileSystem} is obtained
 * via {@link FSUtils#getFs} (which can resolve plugin filesystems such as object
 * stores) instead of Flink's own filesystem factory; the split computation itself
 * follows the parent {@code FileInputFormat} logic.
 *
 * @param minNumSplits minimum number of splits to generate (at least 1)
 * @return the generated file input splits
 * @throws IOException if listing files or fetching block locations fails
 */
@Override
public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
if (minNumSplits < 1) {
throw new IllegalArgumentException("Number of input splits has to be at least 1.");
}
// take the desired number of splits into account
minNumSplits = Math.max(minNumSplits, this.numSplits);
final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>(minNumSplits);
// get all the files that are involved in the splits
List<FileStatus> files = new ArrayList<>();
long totalLength = 0;
for (Path path : getFilePaths()) {
// resolve through FSUtils.getFs so plugin filesystems (e.g. object storage) work
final org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri());
final FileSystem fs = FSUtils.getFs(hadoopPath.toString(), this.conf.conf());
final FileStatus pathFile = fs.getFileStatus(hadoopPath);
if (pathFile.isDir()) {
totalLength += addFilesInDir(hadoopPath, files, true);
} else {
testForUnsplittable(pathFile);
files.add(pathFile);
totalLength += pathFile.getLen();
}
}
// returns if unsplittable
// (one split per file, reading the whole file regardless of block layout)
if (unsplittable) {
int splitNum = 0;
for (final FileStatus file : files) {
final FileSystem fs = FSUtils.getFs(file.getPath().toString(), this.conf.conf());
final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());
Set<String> hosts = new HashSet<String>();
for (BlockLocation block : blocks) {
hosts.addAll(Arrays.asList(block.getHosts()));
}
long len = file.getLen();
if (testForUnsplittable(file)) {
len = READ_WHOLE_SPLIT_FLAG;
}
FileInputSplit fis = new FileInputSplit(splitNum++, new Path(file.getPath().toUri()), 0, len,
hosts.toArray(new String[hosts.size()]));
inputSplits.add(fis);
}
return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
}
// target split size: total length divided over the requested splits, rounded up
final long maxSplitSize = totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 : 1);
// now that we have the files, generate the splits
int splitNum = 0;
for (final FileStatus file : files) {
final FileSystem fs = FSUtils.getFs(file.getPath().toString(), this.conf.conf());
final long len = file.getLen();
final long blockSize = file.getBlockSize();
final long minSplitSize;
if (this.minSplitSize <= blockSize) {
minSplitSize = this.minSplitSize;
} else {
if (LOG.isWarnEnabled()) {
LOG.warn("Minimal split size of " + this.minSplitSize + " is larger than the block size of "
+ blockSize + ". Decreasing minimal split size to block size.");
}
minSplitSize = blockSize;
}
final long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
final long halfSplit = splitSize >>> 1;
// the last split may be up to 10% larger so a tiny trailing remainder
// is merged into it rather than emitted as its own split
final long maxBytesForLastSplit = (long) (splitSize * 1.1f);
if (len > 0) {
// get the block locations and make sure they are in order with respect to their offset
final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len);
Arrays.sort(blocks);
long bytesUnassigned = len;
long position = 0;
int blockIndex = 0;
while (bytesUnassigned > maxBytesForLastSplit) {
// get the block containing the majority of the data
blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
// create a new split
FileInputSplit fis = new FileInputSplit(splitNum++, new Path(file.getPath().toUri()), position, splitSize,
blocks[blockIndex].getHosts());
inputSplits.add(fis);
// adjust the positions
position += splitSize;
bytesUnassigned -= splitSize;
}
// assign the last split
if (bytesUnassigned > 0) {
blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
final FileInputSplit fis = new FileInputSplit(splitNum++, new Path(file.getPath().toUri()), position,
bytesUnassigned, blocks[blockIndex].getHosts());
inputSplits.add(fis);
}
} else {
// special case with a file of zero bytes size
final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, 0);
String[] hosts;
if (blocks.length > 0) {
hosts = blocks[0].getHosts();
} else {
hosts = new String[0];
}
final FileInputSplit fis = new FileInputSplit(splitNum++, new Path(file.getPath().toUri()), 0, 0, hosts);
inputSplits.add(fis);
}
}
return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
}
@Override
public boolean supportsMultiPaths() {
return true;
@@ -131,4 +280,106 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
}
this.reader = null;
}
/**
 * Enumerate all files in the directory and recursive if enumerateNestedFiles is true.
 *
 * <p>Uses {@link FSUtils#getFs} to obtain the filesystem so that plugin
 * filesystems (e.g. object storage) are resolved.
 *
 * @param path             the directory to enumerate
 * @param files            collector for the accepted file statuses
 * @param logExcludedFiles whether rejected paths are logged at debug level
 * @return the total length of accepted files.
 */
private long addFilesInDir(org.apache.hadoop.fs.Path path, List<FileStatus> files, boolean logExcludedFiles)
throws IOException {
final org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri());
final FileSystem fs = FSUtils.getFs(hadoopPath.toString(), this.conf.conf());
long length = 0;
for (FileStatus dir : fs.listStatus(hadoopPath)) {
if (dir.isDir()) {
// recurse only into accepted directories, and only when nested enumeration is on
if (acceptFile(dir) && enumerateNestedFiles) {
length += addFilesInDir(dir.getPath(), files, logExcludedFiles);
} else {
if (logExcludedFiles && LOG.isDebugEnabled()) {
LOG.debug("Directory " + dir.getPath().toString() + " did not pass the file-filter and is excluded.");
}
}
} else {
if (acceptFile(dir)) {
files.add(dir);
length += dir.getLen();
// side effect: may flip the format-wide `unsplittable` flag
testForUnsplittable(dir);
} else {
if (logExcludedFiles && LOG.isDebugEnabled()) {
LOG.debug("Directory " + dir.getPath().toString() + " did not pass the file-filter and is excluded.");
}
}
}
}
return length;
}
/**
 * Registers the file filter, keeping a local reference in addition to the
 * parent's because {@link #acceptFile} evaluates the filter itself.
 *
 * @param filesFilter the filter applied to candidate input files
 */
@Override
public void setFilesFilter(FilePathFilter filesFilter) {
this.localFilesFilter = filesFilter;
super.setFilesFilter(filesFilter);
}
/**
 * A simple hook to filter files and directories from the input.
 * The method may be overridden. Hadoop's FileInputFormat has a similar mechanism and applies the
 * same filters by default.
 *
 * <p>A file is accepted when its name is neither underscore- nor dot-prefixed
 * and the configured {@code localFilesFilter} does not reject its (Flink) path.
 *
 * @param fileStatus The file status to check.
 * @return true, if the given file or directory is accepted
 */
public boolean acceptFile(FileStatus fileStatus) {
final String name = fileStatus.getPath().getName();
return !name.startsWith("_")
&& !name.startsWith(".")
&& !localFilesFilter.filterPath(new Path(fileStatus.getPath().toUri()));
}
/**
 * Retrieves the index of the <tt>BlockLocation</tt> that contains the part of the file described by the given
 * offset.
 *
 * @param blocks The different blocks of the file. Must be ordered by their offset.
 * @param offset The offset of the position in the file.
 * @param halfSplitSize Half of the split size; when the offset falls within the last
 *                      {@code halfSplitSize} bytes of a block (and a next block exists),
 *                      the next block is returned so it holds the majority of the split.
 * @param startIndex The earliest index to look at.
 * @return The index of the block containing the given position.
 * @throws IllegalArgumentException when no block from {@code startIndex} on contains the offset.
 */
private int getBlockIndexForPosition(BlockLocation[] blocks, long offset, long halfSplitSize, int startIndex) {
// go over all indexes after the startIndex
for (int i = startIndex; i < blocks.length; i++) {
long blockStart = blocks[i].getOffset();
long blockEnd = blockStart + blocks[i].getLength();
if (offset >= blockStart && offset < blockEnd) {
// got the block where the split starts
// check if the next block contains more than this one does
if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
return i + 1;
} else {
return i;
}
}
}
// fixed message grammar: was "not contained in the any block"
throw new IllegalArgumentException("The given offset is not contained in any block.");
}
/**
 * Checks whether an inflater (decompression) factory exists for the file's
 * extension; if so, marks the whole format as {@code unsplittable} as a side effect.
 *
 * @param pathFile the file status to probe
 * @return true when the file cannot be split
 */
private boolean testForUnsplittable(FileStatus pathFile) {
if (getInflaterInputStreamFactory(pathFile.getPath()) != null) {
unsplittable = true;
return true;
}
return false;
}
/**
 * Looks up the decompression stream factory registered for the file's
 * name extension.
 *
 * @param path the file path whose extension is inspected
 * @return the matching factory, or {@code null} when the name carries no
 *         recognizable extension
 */
private InflaterInputStreamFactory<?> getInflaterInputStreamFactory(org.apache.hadoop.fs.Path path) {
final String extension = extractFileExtension(path.getName());
return extension == null ? null : getInflaterInputStreamFactory(extension);
}
}

View File

@@ -26,6 +26,7 @@ import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.source.format.FilePathUtils;
import org.apache.hudi.source.format.FormatUtils;
import org.apache.hudi.source.format.cow.ParquetColumnarRowSplitReader;
import org.apache.hudi.source.format.cow.ParquetSplitReaderUtil;
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.hudi.util.StreamerUtil;
@@ -40,7 +41,6 @@ import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.hudi.source.format.cow.ParquetSplitReaderUtil;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
@@ -238,7 +238,7 @@ public class MergeOnReadInputFormat
private ParquetColumnarRowSplitReader getReader(String path, int[] requiredPos) throws IOException {
// generate partition specs.
LinkedHashMap<String, String> partSpec = FilePathUtils.extractPartitionKeyValues(
new org.apache.flink.core.fs.Path(path).getParent(),
new org.apache.hadoop.fs.Path(path).getParent(),
this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION),
this.conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(","));
LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();

View File

@@ -42,7 +42,6 @@ import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;
@@ -53,7 +52,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
@@ -127,50 +125,9 @@ public class StreamerUtil {
return conf;
}
// Keep to avoid too many modifications.
public static org.apache.hadoop.conf.Configuration getHadoopConf() {
// create hadoop configuration with hadoop conf directory configured.
org.apache.hadoop.conf.Configuration hadoopConf = null;
for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(new Configuration())) {
hadoopConf = getHadoopConfiguration(possibleHadoopConfPath);
if (hadoopConf != null) {
break;
}
}
if (hadoopConf == null) {
hadoopConf = new org.apache.hadoop.conf.Configuration();
}
return hadoopConf;
}
/**
* Returns a new Hadoop Configuration object using the path to the hadoop conf configured.
*
* @param hadoopConfDir Hadoop conf directory path.
* @return A Hadoop configuration instance.
*/
private static org.apache.hadoop.conf.Configuration getHadoopConfiguration(String hadoopConfDir) {
if (new File(hadoopConfDir).exists()) {
org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
File coreSite = new File(hadoopConfDir, "core-site.xml");
if (coreSite.exists()) {
hadoopConfiguration.addResource(new Path(coreSite.getAbsolutePath()));
}
File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
if (hdfsSite.exists()) {
hadoopConfiguration.addResource(new Path(hdfsSite.getAbsolutePath()));
}
File yarnSite = new File(hadoopConfDir, "yarn-site.xml");
if (yarnSite.exists()) {
hadoopConfiguration.addResource(new Path(yarnSite.getAbsolutePath()));
}
// Add mapred-site.xml. We need to read configurations like compression codec.
File mapredSite = new File(hadoopConfDir, "mapred-site.xml");
if (mapredSite.exists()) {
hadoopConfiguration.addResource(new Path(mapredSite.getAbsolutePath()));
}
return hadoopConfiguration;
}
return null;
return FlinkClientUtil.getHadoopConf();
}
/**
@@ -291,21 +248,22 @@ public class StreamerUtil {
final String basePath = conf.getString(FlinkOptions.PATH);
final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
// Hadoop FileSystem
try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) {
HoodieTableMetaClient.withPropertyBuilder()
FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) {
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(conf.getString(FlinkOptions.TABLE_TYPE))
.setTableName(conf.getString(FlinkOptions.TABLE_NAME))
.setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS))
.setArchiveLogFolder(DEFAULT_ARCHIVE_LOG_FOLDER)
.setTimelineLayoutVersion(1)
.initTable(hadoopConf, basePath);
LOG.info("Table initialized under base path {}", basePath);
} else {
LOG.info("Table [{}/{}] already exists, no need to initialize the table",
basePath, conf.getString(FlinkOptions.TABLE_NAME));
}
LOG.info("Table initialized under base path {}", basePath);
} else {
LOG.info("Table [{}/{}] already exists, no need to initialize the table",
basePath, conf.getString(FlinkOptions.TABLE_NAME));
}
// Do not close the filesystem in order to use the CACHE,
// some of the filesystems release the handles in #close method.
}
/** Generates the bucket ID using format {partition path}_{fileID}. */

View File

@@ -28,8 +28,8 @@ import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

View File

@@ -36,7 +36,6 @@ import org.apache.hudi.utils.TestUtils;
import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
@@ -48,6 +47,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -242,8 +242,9 @@ public class TestStreamReadOperator {
private OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> createReader() throws Exception {
final String basePath = tempFile.getAbsolutePath();
final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
.setConf(hadoopConf).setBasePath(basePath).build();
final List<String> partitionKeys = Collections.singletonList("partition");
// This input format is used to opening the emitted split.
@@ -262,11 +263,10 @@ public class TestStreamReadOperator {
tableAvroSchema.toString(),
AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(),
Collections.emptyList());
Path[] paths = FilePathUtils.getReadPaths(
new Path(basePath), conf, partitionKeys, conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME));
Path[] paths = FilePathUtils.getReadPaths(new Path(basePath), conf, hadoopConf, partitionKeys);
MergeOnReadInputFormat inputFormat = new MergeOnReadInputFormat(
conf,
paths,
FilePathUtils.toFlinkPaths(paths),
hoodieTableState,
rowDataType.getChildren(),
"default",

View File

@@ -27,9 +27,9 @@ import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;