From 20786ab8a2a1e7735ab846e92802fb9f4449adc9 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Fri, 12 Mar 2021 16:39:24 +0800 Subject: [PATCH] [HUDI-1681] Support object storage for Flink writer (#2662) In order to support object storage, we need these changes: * Use the Hadoop filesystem so that we can find the plugin filesystem * Do not fetch file size until the file handle is closed * Do not close the opened filesystem because we want to use the filesystem cache --- .../apache/hudi/io/HoodieAppendHandle.java | 2 + .../hudi/io/storage/HoodieFileWriter.java | 2 + .../hudi/io/storage/HoodieHFileWriter.java | 5 + .../hudi/io/storage/HoodieParquetWriter.java | 5 + hudi-client/hudi-flink-client/pom.xml | 6 + .../common/HoodieFlinkEngineContext.java | 4 +- .../org/apache/hudi/io/FlinkAppendHandle.java | 11 +- .../org/apache/hudi/io/FlinkCreateHandle.java | 3 +- .../org/apache/hudi/util/FlinkClientUtil.java | 81 ++++++ .../hudi/factory/HoodieTableFactory.java | 2 +- .../apache/hudi/source/HoodieTableSource.java | 25 +- .../hudi/source/format/FilePathUtils.java | 126 ++++----- .../format/cow/CopyOnWriteInputFormat.java | 253 +++++++++++++++++- .../format/mor/MergeOnReadInputFormat.java | 4 +- .../org/apache/hudi/util/StreamerUtil.java | 64 +---- .../hudi/source/TestHoodieTableSource.java | 2 +- .../hudi/source/TestStreamReadOperator.java | 10 +- .../hudi/source/format/TestInputFormat.java | 2 +- 18 files changed, 443 insertions(+), 164 deletions(-) create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 986afe64e..b248a2f68 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -274,6 
+274,7 @@ public class HoodieAppendHandle extends if (!stat.getLogFiles().contains(result.logFile().getFileName())) { stat.addLogFiles(result.logFile().getFileName()); } + stat.setFileSizeInBytes(result.size()); } private void updateRuntimeStats(HoodieDeltaWriteStat stat) { @@ -304,6 +305,7 @@ public class HoodieAppendHandle extends } else if (stat.getPath().endsWith(result.logFile().getFileName())) { // append/continued writing to the same log file stat.setLogOffset(Math.min(stat.getLogOffset(), result.offset())); + stat.setFileSizeInBytes(stat.getFileSizeInBytes() + result.size()); accumulateWriteCounts(stat, result); accumulateRuntimeStats(stat); } else { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java index ea9ecad6e..1aaa389ed 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java @@ -33,4 +33,6 @@ public interface HoodieFileWriter { void close() throws IOException; void writeAvro(String key, R oldRecord) throws IOException; + + long getBytesWritten(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java index d77483caa..352c51c73 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java @@ -156,4 +156,9 @@ public class HoodieHFileWriterorg.apache.flink flink-clients_${scala.binary.version} + + org.apache.flink + flink-hadoop-compatibility_${scala.binary.version} + ${flink.version} + provided + diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java index 6c7f44b57..7713c1e63 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java @@ -29,7 +29,6 @@ import org.apache.hudi.common.function.SerializablePairFunction; import org.apache.hudi.common.util.Option; import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.hadoop.conf.Configuration; import java.util.List; import java.util.Map; @@ -37,6 +36,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.util.FlinkClientUtil; import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper; @@ -50,7 +50,7 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext { private RuntimeContext runtimeContext; public HoodieFlinkEngineContext(TaskContextSupplier taskContextSupplier) { - this(new SerializableConfiguration(new Configuration()), taskContextSupplier); + this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), taskContextSupplier); } public HoodieFlinkEngineContext(SerializableConfiguration hadoopConf, TaskContextSupplier taskContextSupplier) { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java index 9d56d4770..b827bf232 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java @@ 
-20,14 +20,12 @@ package org.apache.hudi.io; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.table.HoodieTable; -import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,14 +96,7 @@ public class FlinkAppendHandle extends H needBootStrap = false; // flush any remaining records to disk appendDataAndDeleteBlocks(header); - try { - for (WriteStatus status: statuses) { - long logFileSize = FSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath())); - status.getStat().setFileSizeInBytes(logFileSize); - } - } catch (IOException e) { - throw new HoodieUpsertException("Failed to get file size for append handle", e); - } + // need to fix that the incremental write size in bytes may be lost List ret = new ArrayList<>(statuses); statuses.clear(); return ret; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java index ce3725b01..6f4638e85 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java @@ -20,7 +20,6 @@ package org.apache.hudi.io; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; @@ -109,7 +108,7 @@ public class FlinkCreateHandle * @throws IOException if error occurs 
*/ private void setUpWriteStatus() throws IOException { - long fileSizeInBytes = FSUtils.getFileSize(fs, path); + long fileSizeInBytes = fileWriter.getBytesWritten(); long incFileSizeInBytes = fileSizeInBytes - lastFileSize; this.lastFileSize = fileSizeInBytes; HoodieWriteStat stat = new HoodieWriteStat(); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java new file mode 100644 index 000000000..c38c1f16d --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.hadoop.fs.Path; + +import java.io.File; + +/** + * Utilities for Hoodie Flink client. + */ +public class FlinkClientUtil { + + /** + * Returns the hadoop configuration with possible hadoop conf paths. + * E.G. the configurations under path $HADOOP_CONF_DIR and $HADOOP_HOME. 
+ */ + public static org.apache.hadoop.conf.Configuration getHadoopConf() { + // create hadoop configuration with hadoop conf directory configured. + org.apache.hadoop.conf.Configuration hadoopConf = null; + for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(new Configuration())) { + hadoopConf = getHadoopConfiguration(possibleHadoopConfPath); + if (hadoopConf != null) { + break; + } + } + if (hadoopConf == null) { + hadoopConf = new org.apache.hadoop.conf.Configuration(); + } + return hadoopConf; + } + + /** + * Returns a new Hadoop Configuration object using the path to the hadoop conf configured. + * + * @param hadoopConfDir Hadoop conf directory path. + * @return A Hadoop configuration instance. + */ + private static org.apache.hadoop.conf.Configuration getHadoopConfiguration(String hadoopConfDir) { + if (new File(hadoopConfDir).exists()) { + org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration(); + File coreSite = new File(hadoopConfDir, "core-site.xml"); + if (coreSite.exists()) { + hadoopConfiguration.addResource(new Path(coreSite.getAbsolutePath())); + } + File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml"); + if (hdfsSite.exists()) { + hadoopConfiguration.addResource(new Path(hdfsSite.getAbsolutePath())); + } + File yarnSite = new File(hadoopConfDir, "yarn-site.xml"); + if (yarnSite.exists()) { + hadoopConfiguration.addResource(new Path(yarnSite.getAbsolutePath())); + } + // Add mapred-site.xml. We need to read configurations like compression codec. 
+ File mapredSite = new File(hadoopConfDir, "mapred-site.xml"); + if (mapredSite.exists()) { + hadoopConfiguration.addResource(new Path(mapredSite.getAbsolutePath())); + } + return hadoopConfiguration; + } + return null; + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/factory/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/factory/HoodieTableFactory.java index 4fbe4cdc5..28883ba04 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/factory/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/factory/HoodieTableFactory.java @@ -24,7 +24,6 @@ import org.apache.hudi.source.HoodieTableSource; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.Path; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.data.RowData; @@ -35,6 +34,7 @@ import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.utils.TableSchemaUtils; +import org.apache.hadoop.fs.Path; import java.util.Collections; import java.util.HashMap; diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java index 2a8fbe009..e78c65613 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java @@ -49,7 +49,6 @@ import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.io.CollectionInputFormat; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import 
org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -70,6 +69,7 @@ import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -166,7 +166,7 @@ public class HoodieTableSource implements (TypeInformation) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType()); if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) { StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction( - conf, path, metaClient, maxCompactionMemoryInBytes); + conf, FilePathUtils.toFlinkPath(path), metaClient, maxCompactionMemoryInBytes); OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) getInputFormat(true)); SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, "streaming_source") .setParallelism(1) @@ -213,7 +213,8 @@ public class HoodieTableSource implements @Override public List> getPartitions() { - return FilePathUtils.getPartitions(path, conf, partitionKeys, defaultPartName); + return FilePathUtils.getPartitions(path, hadoopConf, partitionKeys, defaultPartName, + conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION)); } @Override @@ -253,7 +254,8 @@ public class HoodieTableSource implements private List buildFileIndex(Path[] paths) { FileStatus[] fileStatuses = Arrays.stream(paths) - .flatMap(path -> Arrays.stream(FilePathUtils.getHadoopFileStatusRecursively(path, 1, hadoopConf))) + .flatMap(path -> + Arrays.stream(FilePathUtils.getFileStatusRecursively(path, 1, hadoopConf))) .toArray(FileStatus[]::new); if (fileStatuses.length == 0) { throw new HoodieException("No files found for reading in user provided path."); @@ -281,9 +283,7 @@ public 
class HoodieTableSource implements } else { // all the files are logs return Arrays.stream(paths).map(partitionPath -> { - String relPartitionPath = FSUtils.getRelativePartitionPath( - new org.apache.hadoop.fs.Path(path.toUri()), - new org.apache.hadoop.fs.Path(partitionPath.toUri())); + String relPartitionPath = FSUtils.getRelativePartitionPath(path, partitionPath); return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit) .map(fileSlice -> { Option> logPaths = Option.ofNullable(fileSlice.getLogFiles() @@ -351,14 +351,14 @@ public class HoodieTableSource implements inputSplits); return new MergeOnReadInputFormat( this.conf, - paths, + FilePathUtils.toFlinkPaths(paths), hoodieTableState, rowDataType.getChildren(), // use the explicit fields data type because the AvroSchemaConverter is not very stable. "default", this.limit); case COPY_ON_WRITE: FileInputFormat format = new CopyOnWriteInputFormat( - paths, + FilePathUtils.toFlinkPaths(paths), this.schema.getFieldNames(), this.schema.getFieldDataTypes(), this.requiredPos, @@ -398,7 +398,8 @@ public class HoodieTableSource implements public Path[] getReadPaths() { return partitionKeys.isEmpty() ? 
new Path[] {path} - : FilePathUtils.partitionPath2ReadPath(path, conf, partitionKeys, getOrFetchPartitions()); + : FilePathUtils.partitionPath2ReadPath(path, partitionKeys, getOrFetchPartitions(), + conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION)); } private static class LatestFileFilter extends FilePathFilter { @@ -409,8 +410,8 @@ public class HoodieTableSource implements } @Override - public boolean filterPath(Path filePath) { - return !this.hoodieFilter.accept(new org.apache.hadoop.fs.Path(filePath.toUri())); + public boolean filterPath(org.apache.flink.core.fs.Path filePath) { + return !this.hoodieFilter.accept(new Path(filePath.toUri())); } } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java b/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java index 03bf53df8..1b029b225 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java @@ -22,14 +22,16 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.operator.FlinkOptions; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.fs.FileStatus; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; import org.apache.flink.table.api.TableException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.BitSet; import java.util.LinkedHashMap; import java.util.List; @@ -77,14 +79,14 @@ public class FilePathUtils { /** * Make partition path from partition spec. * - * @param partitionKVs The partition key value mapping - * @param hiveStylePartition Whether the partition path is with Hive style, - * e.g. 
{partition key} = {partition value} + * @param partitionKVs The partition key value mapping + * @param hivePartition Whether the partition path is with Hive style, + * e.g. {partition key} = {partition value} * @return an escaped, valid partition name */ public static String generatePartitionPath( LinkedHashMap partitionKVs, - boolean hiveStylePartition) { + boolean hivePartition) { if (partitionKVs.isEmpty()) { return ""; } @@ -92,16 +94,16 @@ public class FilePathUtils { int i = 0; for (Map.Entry e : partitionKVs.entrySet()) { if (i > 0) { - suffixBuf.append(Path.SEPARATOR); + suffixBuf.append(File.separator); } - if (hiveStylePartition) { + if (hivePartition) { suffixBuf.append(escapePathName(e.getKey())); suffixBuf.append('='); } suffixBuf.append(escapePathName(e.getValue())); i++; } - suffixBuf.append(Path.SEPARATOR); + suffixBuf.append(File.separator); return suffixBuf.toString(); } @@ -235,7 +237,11 @@ public class FilePathUtils { return ret; } - private static FileStatus[] getFileStatusRecursively(Path path, int expectLevel, FileSystem fs) { + public static FileStatus[] getFileStatusRecursively(Path path, int expectLevel, Configuration conf) { + return getFileStatusRecursively(path, expectLevel, FSUtils.getFs(path.toString(), conf)); + } + + public static FileStatus[] getFileStatusRecursively(Path path, int expectLevel, FileSystem fs) { ArrayList result = new ArrayList<>(); try { @@ -267,54 +273,6 @@ public class FilePathUtils { } private static boolean isHiddenFile(FileStatus fileStatus) { - String name = fileStatus.getPath().getName(); - return name.startsWith("_") || name.startsWith("."); - } - - /** - * Same as getFileStatusRecursively but returns hadoop {@link org.apache.hadoop.fs.FileStatus}s. 
- */ - public static org.apache.hadoop.fs.FileStatus[] getHadoopFileStatusRecursively( - Path path, int expectLevel, Configuration hadoopConf) { - ArrayList result = new ArrayList<>(); - - org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri()); - org.apache.hadoop.fs.FileSystem fs = FSUtils.getFs(path.getPath(), hadoopConf); - - try { - org.apache.hadoop.fs.FileStatus fileStatus = fs.getFileStatus(hadoopPath); - listStatusRecursivelyV2(fs, fileStatus, 0, expectLevel, result); - } catch (IOException ignore) { - return new org.apache.hadoop.fs.FileStatus[0]; - } - - return result.toArray(new org.apache.hadoop.fs.FileStatus[0]); - } - - private static void listStatusRecursivelyV2( - org.apache.hadoop.fs.FileSystem fs, - org.apache.hadoop.fs.FileStatus fileStatus, - int level, - int expectLevel, - List results) throws IOException { - if (isHiddenFileV2(fileStatus)) { - // do nothing - return; - } - - if (expectLevel == level) { - results.add(fileStatus); - return; - } - - if (fileStatus.isDirectory()) { - for (org.apache.hadoop.fs.FileStatus stat : fs.listStatus(fileStatus.getPath())) { - listStatusRecursivelyV2(fs, stat, level + 1, expectLevel, results); - } - } - } - - private static boolean isHiddenFileV2(org.apache.hadoop.fs.FileStatus fileStatus) { String name = fileStatus.getPath().getName(); // the log files is hidden file return name.startsWith("_") || name.startsWith(".") && !name.contains(".log."); @@ -333,21 +291,23 @@ public class FilePathUtils { *

The return list should be [{key1:val1, key2:val2, key3:val3}, {key1:val4, key2:val5, key3:val6}]. * * @param path The base path - * @param conf The configuration + * @param hadoopConf The hadoop configuration * @param partitionKeys The partition key list * @param defaultParName The default partition name for nulls + * @param hivePartition Whether the partition path is in Hive style */ public static List> getPartitions( Path path, - org.apache.flink.configuration.Configuration conf, + Configuration hadoopConf, List partitionKeys, - String defaultParName) { + String defaultParName, + boolean hivePartition) { try { return FilePathUtils .searchPartKeyValueAndPaths( - path.getFileSystem(), + FSUtils.getFs(path.toString(), hadoopConf), path, - conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION), + hivePartition, partitionKeys.toArray(new String[0])) .stream() .map(tuple2 -> tuple2.f0) @@ -386,21 +346,23 @@ public class FilePathUtils { * Returns all the file paths that is the parents of the data files. 
* * @param path The base path - * @param conf The configuration + * @param conf The Flink configuration + * @param hadoopConf The hadoop configuration * @param partitionKeys The partition key list - * @param defaultParName The default partition name for nulls */ public static Path[] getReadPaths( Path path, org.apache.flink.configuration.Configuration conf, - List partitionKeys, - String defaultParName) { + Configuration hadoopConf, + List partitionKeys) { if (partitionKeys.isEmpty()) { return new Path[] {path}; } else { + final String defaultParName = conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME); + final boolean hivePartition = conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION); List> partitionPaths = - getPartitions(path, conf, partitionKeys, defaultParName); - return partitionPath2ReadPath(path, conf, partitionKeys, partitionPaths); + getPartitions(path, hadoopConf, partitionKeys, defaultParName, hivePartition); + return partitionPath2ReadPath(path, partitionKeys, partitionPaths, hivePartition); } } @@ -408,21 +370,37 @@ public class FilePathUtils { * Transforms the given partition key value mapping to read paths. 
* * @param path The base path - * @param conf The hadoop configuration * @param partitionKeys The partition key list * @param partitionPaths The partition key value mapping + * @param hivePartition Whether the partition path is in Hive style * * @see #getReadPaths */ public static Path[] partitionPath2ReadPath( Path path, - org.apache.flink.configuration.Configuration conf, List partitionKeys, - List> partitionPaths) { + List> partitionPaths, + boolean hivePartition) { return partitionPaths.stream() .map(m -> validateAndReorderPartitions(m, partitionKeys)) - .map(kvs -> FilePathUtils.generatePartitionPath(kvs, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION))) + .map(kvs -> FilePathUtils.generatePartitionPath(kvs, hivePartition)) .map(n -> new Path(path, n)) .toArray(Path[]::new); } + + /** + * Transforms the array of Hadoop paths to Flink paths. + */ + public static org.apache.flink.core.fs.Path[] toFlinkPaths(Path[] paths) { + return Arrays.stream(paths) + .map(p -> toFlinkPath(p)) + .toArray(org.apache.flink.core.fs.Path[]::new); + } + + /** + * Transforms the Hadoop path to Flink path. 
+ */ + public static org.apache.flink.core.fs.Path toFlinkPath(Path path) { + return new org.apache.flink.core.fs.Path(path.toUri()); + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/CopyOnWriteInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/CopyOnWriteInputFormat.java index 2a28d85c5..709c32e1d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/CopyOnWriteInputFormat.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/cow/CopyOnWriteInputFormat.java @@ -18,7 +18,12 @@ package org.apache.hudi.source.format.cow; +import org.apache.hudi.common.fs.FSUtils; + import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.io.FilePathFilter; +import org.apache.flink.api.common.io.GlobFilePathFilter; +import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.parquet.utils.SerializableConfiguration; @@ -26,11 +31,19 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.utils.PartitionPathUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; +import java.util.Set; import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE; import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType; @@ -43,11 +56,16 @@ import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePart * {@code 
org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory.ParquetInputFormat} * to support TIMESTAMP_MILLIS. * + *

Note: Override the {@link #createInputSplits} method from parent to rewrite the logic creating the FileSystem, + * use {@link FSUtils#getFs} to get a plugin filesystem. + * * @see ParquetSplitReaderUtil */ public class CopyOnWriteInputFormat extends FileInputFormat { private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteInputFormat.class); + private final String[] fullFieldNames; private final DataType[] fullFieldTypes; private final int[] selectedFields; @@ -59,6 +77,11 @@ public class CopyOnWriteInputFormat extends FileInputFormat { private transient ParquetColumnarRowSplitReader reader; private transient long currentReadCount; + /** + * Files filter for determining what files/directories should be included. + */ + private FilePathFilter localFilesFilter = new GlobFilePathFilter(); + public CopyOnWriteInputFormat( Path[] paths, String[] fullFieldNames, @@ -98,12 +121,138 @@ public class CopyOnWriteInputFormat extends FileInputFormat { partObjects, selectedFields, DEFAULT_SIZE, - new Path(fileSplit.getPath().toString()), + fileSplit.getPath(), fileSplit.getStart(), fileSplit.getLength()); this.currentReadCount = 0L; } + @Override + public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException { + if (minNumSplits < 1) { + throw new IllegalArgumentException("Number of input splits has to be at least 1."); + } + + // take the desired number of splits into account + minNumSplits = Math.max(minNumSplits, this.numSplits); + + final List inputSplits = new ArrayList(minNumSplits); + + // get all the files that are involved in the splits + List files = new ArrayList<>(); + long totalLength = 0; + + for (Path path : getFilePaths()) { + final org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri()); + final FileSystem fs = FSUtils.getFs(hadoopPath.toString(), this.conf.conf()); + final FileStatus pathFile = fs.getFileStatus(hadoopPath); + + if (pathFile.isDir()) { + 
totalLength += addFilesInDir(hadoopPath, files, true); + } else { + testForUnsplittable(pathFile); + + files.add(pathFile); + totalLength += pathFile.getLen(); + } + } + + // returns if unsplittable + if (unsplittable) { + int splitNum = 0; + for (final FileStatus file : files) { + final FileSystem fs = FSUtils.getFs(file.getPath().toString(), this.conf.conf()); + final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen()); + Set hosts = new HashSet(); + for (BlockLocation block : blocks) { + hosts.addAll(Arrays.asList(block.getHosts())); + } + long len = file.getLen(); + if (testForUnsplittable(file)) { + len = READ_WHOLE_SPLIT_FLAG; + } + FileInputSplit fis = new FileInputSplit(splitNum++, new Path(file.getPath().toUri()), 0, len, + hosts.toArray(new String[hosts.size()])); + inputSplits.add(fis); + } + return inputSplits.toArray(new FileInputSplit[inputSplits.size()]); + } + + + final long maxSplitSize = totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 : 1); + + // now that we have the files, generate the splits + int splitNum = 0; + for (final FileStatus file : files) { + + final FileSystem fs = FSUtils.getFs(file.getPath().toString(), this.conf.conf()); + final long len = file.getLen(); + final long blockSize = file.getBlockSize(); + + final long minSplitSize; + if (this.minSplitSize <= blockSize) { + minSplitSize = this.minSplitSize; + } else { + if (LOG.isWarnEnabled()) { + LOG.warn("Minimal split size of " + this.minSplitSize + " is larger than the block size of " + + blockSize + ". 
Decreasing minimal split size to block size."); + } + minSplitSize = blockSize; + } + + final long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize)); + final long halfSplit = splitSize >>> 1; + + final long maxBytesForLastSplit = (long) (splitSize * 1.1f); + + if (len > 0) { + + // get the block locations and make sure they are in order with respect to their offset + final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len); + Arrays.sort(blocks); + + long bytesUnassigned = len; + long position = 0; + + int blockIndex = 0; + + while (bytesUnassigned > maxBytesForLastSplit) { + // get the block containing the majority of the data + blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex); + // create a new split + FileInputSplit fis = new FileInputSplit(splitNum++, new Path(file.getPath().toUri()), position, splitSize, + blocks[blockIndex].getHosts()); + inputSplits.add(fis); + + // adjust the positions + position += splitSize; + bytesUnassigned -= splitSize; + } + + // assign the last split + if (bytesUnassigned > 0) { + blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex); + final FileInputSplit fis = new FileInputSplit(splitNum++, new Path(file.getPath().toUri()), position, + bytesUnassigned, blocks[blockIndex].getHosts()); + inputSplits.add(fis); + } + } else { + // special case with a file of zero bytes size + final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, 0); + String[] hosts; + if (blocks.length > 0) { + hosts = blocks[0].getHosts(); + } else { + hosts = new String[0]; + } + final FileInputSplit fis = new FileInputSplit(splitNum++, new Path(file.getPath().toUri()), 0, 0, hosts); + inputSplits.add(fis); + } + } + + return inputSplits.toArray(new FileInputSplit[inputSplits.size()]); + } + @Override public boolean supportsMultiPaths() { return true; @@ -131,4 +280,106 @@ public class CopyOnWriteInputFormat extends FileInputFormat { } this.reader = null; } + + /** + 
* Enumerate all files in the directory and recurse if enumerateNestedFiles is true. + * + * @return the total length of accepted files. + */ + private long addFilesInDir(org.apache.hadoop.fs.Path path, List files, boolean logExcludedFiles) + throws IOException { + final org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri()); + final FileSystem fs = FSUtils.getFs(hadoopPath.toString(), this.conf.conf()); + + long length = 0; + + for (FileStatus dir : fs.listStatus(hadoopPath)) { + if (dir.isDir()) { + if (acceptFile(dir) && enumerateNestedFiles) { + length += addFilesInDir(dir.getPath(), files, logExcludedFiles); + } else { + if (logExcludedFiles && LOG.isDebugEnabled()) { + LOG.debug("Directory " + dir.getPath().toString() + " did not pass the file-filter and is excluded."); + } + } + } else { + if (acceptFile(dir)) { + files.add(dir); + length += dir.getLen(); + testForUnsplittable(dir); + } else { + if (logExcludedFiles && LOG.isDebugEnabled()) { + LOG.debug("Directory " + dir.getPath().toString() + " did not pass the file-filter and is excluded."); + } + } + } + } + return length; + } + + @Override + public void setFilesFilter(FilePathFilter filesFilter) { + this.localFilesFilter = filesFilter; + super.setFilesFilter(filesFilter); + } + + /** + * A simple hook to filter files and directories from the input. + * The method may be overridden. Hadoop's FileInputFormat has a similar mechanism and applies the + * same filters by default. + * + * @param fileStatus The file status to check. + * @return true, if the given file or directory is accepted + */ + public boolean acceptFile(FileStatus fileStatus) { + final String name = fileStatus.getPath().getName(); + return !name.startsWith("_") + && !name.startsWith(".") + && !localFilesFilter.filterPath(new Path(fileStatus.getPath().toUri())); + } + + /** + * Retrieves the index of the BlockLocation that contains the part of the file described by the given + * offset. 
+ * + * @param blocks The different blocks of the file. Must be ordered by their offset. + * @param offset The offset of the position in the file. + * @param startIndex The earliest index to look at. + * @return The index of the block containing the given position. + */ + private int getBlockIndexForPosition(BlockLocation[] blocks, long offset, long halfSplitSize, int startIndex) { + // go over all indexes after the startIndex + for (int i = startIndex; i < blocks.length; i++) { + long blockStart = blocks[i].getOffset(); + long blockEnd = blockStart + blocks[i].getLength(); + + if (offset >= blockStart && offset < blockEnd) { + // got the block where the split starts + // check if the next block contains more than this one does + if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) { + return i + 1; + } else { + return i; + } + } + } + throw new IllegalArgumentException("The given offset is not contained in the any block."); + } + + private boolean testForUnsplittable(FileStatus pathFile) { + if (getInflaterInputStreamFactory(pathFile.getPath()) != null) { + unsplittable = true; + return true; + } + return false; + } + + private InflaterInputStreamFactory getInflaterInputStreamFactory(org.apache.hadoop.fs.Path path) { + String fileExtension = extractFileExtension(path.getName()); + if (fileExtension != null) { + return getInflaterInputStreamFactory(fileExtension); + } else { + return null; + } + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java index 510b5b549..e2951ab49 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java @@ -26,6 +26,7 @@ import org.apache.hudi.operator.FlinkOptions; import org.apache.hudi.source.format.FilePathUtils; import 
org.apache.hudi.source.format.FormatUtils; import org.apache.hudi.source.format.cow.ParquetColumnarRowSplitReader; +import org.apache.hudi.source.format.cow.ParquetSplitReaderUtil; import org.apache.hudi.util.AvroToRowDataConverters; import org.apache.hudi.util.RowDataToAvroConverters; import org.apache.hudi.util.StreamerUtil; @@ -40,7 +41,6 @@ import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.hudi.source.format.cow.ParquetSplitReaderUtil; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; @@ -238,7 +238,7 @@ public class MergeOnReadInputFormat private ParquetColumnarRowSplitReader getReader(String path, int[] requiredPos) throws IOException { // generate partition specs. LinkedHashMap partSpec = FilePathUtils.extractPartitionKeyValues( - new org.apache.flink.core.fs.Path(path).getParent(), + new org.apache.hadoop.fs.Path(path).getParent(), this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION), this.conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")); LinkedHashMap partObjects = new LinkedHashMap<>(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index b5326bd07..536423060 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -42,7 +42,6 @@ import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Preconditions; @@ 
-53,7 +52,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; -import java.io.File; import java.io.IOException; import java.io.StringReader; import java.util.List; @@ -127,50 +125,9 @@ public class StreamerUtil { return conf; } + // Keep to avoid too many modifications. public static org.apache.hadoop.conf.Configuration getHadoopConf() { - // create hadoop configuration with hadoop conf directory configured. - org.apache.hadoop.conf.Configuration hadoopConf = null; - for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(new Configuration())) { - hadoopConf = getHadoopConfiguration(possibleHadoopConfPath); - if (hadoopConf != null) { - break; - } - } - if (hadoopConf == null) { - hadoopConf = new org.apache.hadoop.conf.Configuration(); - } - return hadoopConf; - } - - /** - * Returns a new Hadoop Configuration object using the path to the hadoop conf configured. - * - * @param hadoopConfDir Hadoop conf directory path. - * @return A Hadoop configuration instance. - */ - private static org.apache.hadoop.conf.Configuration getHadoopConfiguration(String hadoopConfDir) { - if (new File(hadoopConfDir).exists()) { - org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration(); - File coreSite = new File(hadoopConfDir, "core-site.xml"); - if (coreSite.exists()) { - hadoopConfiguration.addResource(new Path(coreSite.getAbsolutePath())); - } - File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml"); - if (hdfsSite.exists()) { - hadoopConfiguration.addResource(new Path(hdfsSite.getAbsolutePath())); - } - File yarnSite = new File(hadoopConfDir, "yarn-site.xml"); - if (yarnSite.exists()) { - hadoopConfiguration.addResource(new Path(yarnSite.getAbsolutePath())); - } - // Add mapred-site.xml. We need to read configurations like compression codec. 
- File mapredSite = new File(hadoopConfDir, "mapred-site.xml"); - if (mapredSite.exists()) { - hadoopConfiguration.addResource(new Path(mapredSite.getAbsolutePath())); - } - return hadoopConfiguration; - } - return null; + return FlinkClientUtil.getHadoopConf(); } /** @@ -291,21 +248,22 @@ public class StreamerUtil { final String basePath = conf.getString(FlinkOptions.PATH); final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(); // Hadoop FileSystem - try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) { - if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) { - HoodieTableMetaClient.withPropertyBuilder() + FileSystem fs = FSUtils.getFs(basePath, hadoopConf); + if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) { + HoodieTableMetaClient.withPropertyBuilder() .setTableType(conf.getString(FlinkOptions.TABLE_TYPE)) .setTableName(conf.getString(FlinkOptions.TABLE_NAME)) .setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS)) .setArchiveLogFolder(DEFAULT_ARCHIVE_LOG_FOLDER) .setTimelineLayoutVersion(1) .initTable(hadoopConf, basePath); - LOG.info("Table initialized under base path {}", basePath); - } else { - LOG.info("Table [{}/{}] already exists, no need to initialize the table", - basePath, conf.getString(FlinkOptions.TABLE_NAME)); - } + LOG.info("Table initialized under base path {}", basePath); + } else { + LOG.info("Table [{}/{}] already exists, no need to initialize the table", + basePath, conf.getString(FlinkOptions.TABLE_NAME)); } + // Do not close the filesystem in order to use the CACHE, + // some of the filesystems release the handles in #close method. } /** Generates the bucket ID using format {partition path}_{fileID}. 
*/ diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java index 48bd350db..c1e234855 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java @@ -28,8 +28,8 @@ import org.apache.hudi.util.StreamerUtil; import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.Path; import org.apache.flink.table.data.RowData; +import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java index 2daa6c38f..4ada381ba 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java @@ -36,7 +36,6 @@ import org.apache.hudi.utils.TestUtils; import org.apache.avro.Schema; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; @@ -48,6 +47,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; +import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -242,8 +242,9 @@ public class 
TestStreamReadOperator { private OneInputStreamOperatorTestHarness createReader() throws Exception { final String basePath = tempFile.getAbsolutePath(); + final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(); final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() - .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); + .setConf(hadoopConf).setBasePath(basePath).build(); final List partitionKeys = Collections.singletonList("partition"); // This input format is used to opening the emitted split. @@ -262,11 +263,10 @@ public class TestStreamReadOperator { tableAvroSchema.toString(), AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(), Collections.emptyList()); - Path[] paths = FilePathUtils.getReadPaths( - new Path(basePath), conf, partitionKeys, conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME)); + Path[] paths = FilePathUtils.getReadPaths(new Path(basePath), conf, hadoopConf, partitionKeys); MergeOnReadInputFormat inputFormat = new MergeOnReadInputFormat( conf, - paths, + FilePathUtils.toFlinkPaths(paths), hoodieTableState, rowDataType.getChildren(), "default", diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java index 343f2939b..1d92dbb68 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java @@ -27,9 +27,9 @@ import org.apache.hudi.util.StreamerUtil; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.InputSplit; import org.apache.flink.table.data.RowData; +import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest;