From 57668d02a0aa723dd4b2245dc7659fe18113eb59 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Sat, 28 Aug 2021 20:16:54 +0800 Subject: [PATCH] [HUDI-2371] Improvement flink streaming reader (#3552) - Support reading empty table - Fix filtering by partition path - Support reading from earliest commit --- .../hudi/configuration/FlinkOptions.java | 1 + .../source/StreamReadMonitoringFunction.java | 44 ++++- .../apache/hudi/table/HoodieTableSource.java | 153 ++++++++++++------ .../hudi/table/format/FilePathUtils.java | 30 +++- .../org/apache/hudi/util/StreamerUtil.java | 45 +++++- .../TestStreamReadMonitoringFunction.java | 31 ++++ .../hudi/table/HoodieDataSourceITCase.java | 77 ++++++++- .../hudi/table/TestHoodieTableSource.java | 36 ++++- .../java/org/apache/hudi/utils/TestUtils.java | 5 +- 9 files changed, 348 insertions(+), 74 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index a1c441735..0e2b0b338 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -196,6 +196,7 @@ public class FlinkOptions extends HoodieConfig { .defaultValue(60)// default 1 minute .withDescription("Check interval for streaming read of SECOND, default 1 minute"); + public static final String START_COMMIT_EARLIEST = "earliest"; public static final ConfigOption READ_STREAMING_START_COMMIT = ConfigOptions .key("read.streaming.start-commit") .stringType() diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java index 92c06e951..c7dcc0a38 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java @@ -48,6 +48,8 @@ import org.apache.hadoop.fs.FileStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -105,20 +107,23 @@ public class StreamReadMonitoringFunction private transient org.apache.hadoop.conf.Configuration hadoopConf; - private final HoodieTableMetaClient metaClient; + private HoodieTableMetaClient metaClient; private final long maxCompactionMemoryInBytes; + // for partition pruning + private final Set requiredPartitionPaths; + public StreamReadMonitoringFunction( Configuration conf, Path path, - HoodieTableMetaClient metaClient, - long maxCompactionMemoryInBytes) { + long maxCompactionMemoryInBytes, + Set requiredPartitionPaths) { this.conf = conf; this.path = path; - this.metaClient = metaClient; this.interval = conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL); this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes; + this.requiredPartitionPaths = requiredPartitionPaths; } @Override @@ -180,8 +185,26 @@ public class StreamReadMonitoringFunction } } + @Nullable + private HoodieTableMetaClient getOrCreateMetaClient() { + if (this.metaClient != null) { + return this.metaClient; + } + if (StreamerUtil.tableExists(this.path.toString(), hadoopConf)) { + this.metaClient = StreamerUtil.createMetaClient(this.path.toString(), hadoopConf); + return this.metaClient; + } + // fallback + return null; + } + @VisibleForTesting public void monitorDirAndForwardSplits(SourceContext context) { + 
HoodieTableMetaClient metaClient = getOrCreateMetaClient(); + if (metaClient == null) { + // table does not exist + return; + } metaClient.reloadActiveTimeline(); HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); if (commitTimeline.empty()) { @@ -200,8 +223,9 @@ public class StreamReadMonitoringFunction } else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) { // first time consume and has a start commit final String specifiedStart = this.conf.getString(FlinkOptions.READ_STREAMING_START_COMMIT); - instantRange = InstantRange.getInstance(specifiedStart, instantToIssue.getTimestamp(), - InstantRange.RangeType.CLOSE_CLOSE); + instantRange = specifiedStart.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST) + ? null + : InstantRange.getInstance(specifiedStart, instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE); } else { // first time consume and no start commit, consumes the latest incremental data set. HoodieInstant latestCommitInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get(); @@ -222,6 +246,11 @@ public class StreamReadMonitoringFunction List metadataList = instants.stream() .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList()); Set writePartitions = getWritePartitionPaths(metadataList); + // apply partition push down + if (this.requiredPartitionPaths.size() > 0) { + writePartitions = writePartitions.stream() + .filter(this.requiredPartitionPaths::contains).collect(Collectors.toSet()); + } FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList); if (fileStatuses.length == 0) { LOG.warn("No files found for reading in user provided path."); @@ -310,7 +339,8 @@ public class StreamReadMonitoringFunction return commitTimeline.getInstants() .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant)) .collect(Collectors.toList()); - } else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) { + } else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent() + && !this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)) { String definedStartCommit = this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT); return commitTimeline.getInstants() .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, definedStartCommit)) diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 1d58e4e24..fc4239442 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -18,6 +18,7 @@ package org.apache.hudi.table; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.BaseFile; import org.apache.hudi.common.model.HoodieLogFile; @@ -85,6 +86,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -151,9 +153,8 @@ public class HoodieTableSource implements : requiredPos; this.limit = limit == null ? 
NO_LIMIT_CONSTANT : limit; this.filters = filters == null ? Collections.emptyList() : filters; - final String basePath = this.conf.getString(FlinkOptions.PATH); this.hadoopConf = StreamerUtil.getHadoopConf(); - this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build(); + this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf); this.maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new JobConf(this.hadoopConf)); } @@ -173,11 +174,8 @@ public class HoodieTableSource implements (TypeInformation) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType()); if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) { StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction( - conf, FilePathUtils.toFlinkPath(path), metaClient, maxCompactionMemoryInBytes); + conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths()); InputFormat inputFormat = getInputFormat(true); - if (!(inputFormat instanceof MergeOnReadInputFormat)) { - throw new HoodieException("No successful commits under path " + path); - } OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat); SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, "streaming_source") .uid("uid_streaming_source_" + conf.getString(FlinkOptions.TABLE_NAME)) @@ -218,7 +216,8 @@ public class HoodieTableSource implements @Override public Result applyFilters(List filters) { this.filters = new ArrayList<>(filters); - return Result.of(new ArrayList<>(filters), new ArrayList<>(filters)); + // refuse all the filters now + return Result.of(Collections.emptyList(), new ArrayList<>(filters)); } @Override @@ -266,7 +265,18 @@ public class HoodieTableSource implements return requiredPartitions; } + private Set getRequiredPartitionPaths() { + if (this.requiredPartitions == null) { + return Collections.emptySet(); + } + return FilePathUtils.toRelativePartitionPaths(this.partitionKeys, this.requiredPartitions, + conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)); + } + private List buildFileIndex(Path[] paths) { + if (paths.length == 0) { + return Collections.emptyList(); + } FileStatus[] fileStatuses = Arrays.stream(paths) .flatMap(path -> Arrays.stream(FilePathUtils.getFileStatusRecursively(path, 1, hadoopConf))) @@ -304,6 +314,10 @@ public class HoodieTableSource implements @VisibleForTesting public InputFormat getInputFormat(boolean isStreaming) { + return isStreaming ? getStreamInputFormat() : getBatchInputFormat(); + } + + private InputFormat getBatchInputFormat() { // When this table has no partition, just return an empty source. 
if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) { return new CollectionInputFormat<>(Collections.emptyList(), null); @@ -314,13 +328,7 @@ public class HoodieTableSource implements return new CollectionInputFormat<>(Collections.emptyList(), null); } - TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)); - final Schema tableAvroSchema; - try { - tableAvroSchema = schemaUtil.getTableAvroSchema(); - } catch (Exception e) { - throw new HoodieException("Get table avro schema error", e); - } + final Schema tableAvroSchema = getTableAvroSchema(); final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema); final RowType rowType = (RowType) rowDataType.getLogicalType(); final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType(); @@ -331,17 +339,11 @@ public class HoodieTableSource implements final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)); switch (tableType) { case MERGE_ON_READ: - final List inputSplits; - if (!isStreaming) { - inputSplits = buildFileIndex(paths); - if (inputSplits.size() == 0) { - // When there is no input splits, just return an empty source. - LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead"); - return new CollectionInputFormat<>(Collections.emptyList(), null); - } - } else { - // streaming reader would build the splits automatically. - inputSplits = Collections.emptyList(); + final List inputSplits = buildFileIndex(paths); + if (inputSplits.size() == 0) { + // When there is no input splits, just return an empty source. + LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead"); + return new CollectionInputFormat<>(Collections.emptyList(), null); } final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState( rowType, @@ -359,28 +361,9 @@ public class HoodieTableSource implements .fieldTypes(rowDataType.getChildren()) .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME)) .limit(this.limit) - .emitDelete(isStreaming) + .emitDelete(false) .build(); case COPY_ON_WRITE: - if (isStreaming) { - final MergeOnReadTableState hoodieTableState2 = new MergeOnReadTableState( - rowType, - requiredRowType, - tableAvroSchema.toString(), - AvroSchemaConverter.convertToSchema(requiredRowType).toString(), - Collections.emptyList(), - conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",")); - return MergeOnReadInputFormat.builder() - .config(this.conf) - .paths(FilePathUtils.toFlinkPaths(paths)) - .tableState(hoodieTableState2) - // use the explicit fields data type because the AvroSchemaConverter - // is not very stable. - .fieldTypes(rowDataType.getChildren()) - .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME)) - .limit(this.limit) - .build(); - } FileInputFormat format = new CopyOnWriteInputFormat( FilePathUtils.toFlinkPaths(paths), this.schema.getColumnNames().toArray(new String[0]), @@ -416,6 +399,86 @@ public class HoodieTableSource implements } } + private InputFormat getStreamInputFormat() { + // if table does not exist, use schema from the DDL + Schema tableAvroSchema = this.metaClient == null ? 
inferSchemaFromDdl() : getTableAvroSchema(); + final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema); + final RowType rowType = (RowType) rowDataType.getLogicalType(); + final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType(); + + final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE); + org.apache.flink.core.fs.Path[] paths = new org.apache.flink.core.fs.Path[0]; + if (FlinkOptions.QUERY_TYPE_SNAPSHOT.equals(queryType)) { + final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)); + switch (tableType) { + case MERGE_ON_READ: + final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState( + rowType, + requiredRowType, + tableAvroSchema.toString(), + AvroSchemaConverter.convertToSchema(requiredRowType).toString(), + Collections.emptyList(), + conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",")); + return MergeOnReadInputFormat.builder() + .config(this.conf) + .paths(paths) + .tableState(hoodieTableState) + // use the explicit fields data type because the AvroSchemaConverter + // is not very stable. + .fieldTypes(rowDataType.getChildren()) + .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME)) + .limit(this.limit) + .emitDelete(true) + .build(); + case COPY_ON_WRITE: + final MergeOnReadTableState hoodieTableState2 = new MergeOnReadTableState( + rowType, + requiredRowType, + tableAvroSchema.toString(), + AvroSchemaConverter.convertToSchema(requiredRowType).toString(), + Collections.emptyList(), + conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",")); + return MergeOnReadInputFormat.builder() + .config(this.conf) + .paths(paths) + .tableState(hoodieTableState2) + // use the explicit fields data type because the AvroSchemaConverter + // is not very stable. 
+ .fieldTypes(rowDataType.getChildren()) + .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME)) + .limit(this.limit) + .build(); + default: + throw new HoodieException("Unexpected table type: " + this.conf.getString(FlinkOptions.TABLE_TYPE)); + } + } + String errMsg = String.format("Invalid query type : '%s', options ['%s'] are supported now", queryType, + FlinkOptions.QUERY_TYPE_SNAPSHOT); + throw new HoodieException(errMsg); + } + + private Schema inferSchemaFromDdl() { + Schema schema = AvroSchemaConverter.convertToSchema(this.schema.toSourceRowDataType().getLogicalType()); + return HoodieAvroUtils.addMetadataFields(schema, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)); + } + + @VisibleForTesting + public Schema getTableAvroSchema() { + try { + TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)); + return schemaUtil.getTableAvroSchema(); + } catch (Throwable e) { + // table exists but has no written data + LOG.warn("Get table avro schema error, use schema from the DDL instead", e); + return inferSchemaFromDdl(); + } + } + + @VisibleForTesting + public HoodieTableMetaClient getMetaClient() { + return this.metaClient; + } + @VisibleForTesting public Configuration getConf() { return this.conf; diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java index 83607cd9c..8f1347f30 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java @@ -36,6 +36,7 @@ import java.util.BitSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -82,11 +83,13 @@ public class FilePathUtils { * @param partitionKVs The partition key value mapping * @param hivePartition Whether the partition path is with Hive style, * e.g. {partition key} = {partition value} + * @param sepSuffix Whether to append the file separator as suffix * @return an escaped, valid partition name */ public static String generatePartitionPath( LinkedHashMap partitionKVs, - boolean hivePartition) { + boolean hivePartition, + boolean sepSuffix) { if (partitionKVs.isEmpty()) { return ""; } @@ -103,7 +106,9 @@ public class FilePathUtils { suffixBuf.append(escapePathName(e.getValue())); i++; } - suffixBuf.append(File.separator); + if (sepSuffix) { + suffixBuf.append(File.separator); + } return suffixBuf.toString(); } @@ -371,11 +376,30 @@ public class FilePathUtils { boolean hivePartition) { return partitionPaths.stream() .map(m -> validateAndReorderPartitions(m, partitionKeys)) - .map(kvs -> FilePathUtils.generatePartitionPath(kvs, hivePartition)) + .map(kvs -> FilePathUtils.generatePartitionPath(kvs, hivePartition, true)) .map(n -> new Path(path, n)) .toArray(Path[]::new); } + /** + * Transforms the given partition key value mapping to relative partition paths. 
+ * + * @param partitionKeys The partition key list + * @param partitionPaths The partition key value mapping + * @param hivePartition Whether the partition path is in Hive style + * + * @see #getReadPaths + */ + public static Set toRelativePartitionPaths( + List partitionKeys, + List> partitionPaths, + boolean hivePartition) { + return partitionPaths.stream() + .map(m -> validateAndReorderPartitions(m, partitionKeys)) + .map(kvs -> FilePathUtils.generatePartitionPath(kvs, hivePartition, false)) + .collect(Collectors.toSet()); + } + /** * Transforms the array of Hadoop paths to Flink paths. */ diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 6dc6adde2..9f625baa1 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -229,9 +229,7 @@ public class StreamerUtil { public static void initTableIfNotExists(Configuration conf) throws IOException { final String basePath = conf.getString(FlinkOptions.PATH); final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(); - // Hadoop FileSystem - FileSystem fs = FSUtils.getFs(basePath, hadoopConf); - if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) { + if (!tableExists(basePath, hadoopConf)) { HoodieTableMetaClient.withPropertyBuilder() .setTableType(conf.getString(FlinkOptions.TABLE_TYPE)) .setTableName(conf.getString(FlinkOptions.TABLE_NAME)) @@ -251,6 +249,19 @@ public class StreamerUtil { // some of the filesystems release the handles in #close method. } + /** + * Returns whether the hoodie table exists under given path {@code basePath}. + */ + public static boolean tableExists(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) { + // Hadoop FileSystem + FileSystem fs = FSUtils.getFs(basePath, hadoopConf); + try { + return fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)); + } catch (IOException e) { + throw new HoodieException("Error while checking whether table exists under path:" + basePath, e); + } + } + /** * Generates the bucket ID using format {partition path}_{fileID}. */ @@ -282,11 +293,37 @@ public class StreamerUtil { && conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED); } + /** + * Creates the meta client for reader. + * + *
<p>
The streaming pipeline process is long running, so empty table path is allowed, + * the reader would then check and refresh the meta client. + * + * @see org.apache.hudi.source.StreamReadMonitoringFunction + */ + public static HoodieTableMetaClient metaClientForReader( + Configuration conf, + org.apache.hadoop.conf.Configuration hadoopConf) { + final String basePath = conf.getString(FlinkOptions.PATH); + if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING) && !tableExists(basePath, hadoopConf)) { + return null; + } else { + return createMetaClient(basePath, hadoopConf); + } + } + + /** + * Creates the meta client. + */ + public static HoodieTableMetaClient createMetaClient(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) { + return HoodieTableMetaClient.builder().setBasePath(basePath).setConf(hadoopConf).build(); + } + /** * Creates the meta client. */ public static HoodieTableMetaClient createMetaClient(String basePath) { - return HoodieTableMetaClient.builder().setBasePath(basePath).setConf(FlinkClientUtil.getHadoopConf()).build(); + return createMetaClient(basePath, FlinkClientUtil.getHadoopConf()); } /** diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java index f14574446..d13f68319 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java @@ -168,6 +168,37 @@ public class TestStreamReadMonitoringFunction { } } + @Test + public void testConsumeFromEarliestCommit() throws Exception { + // write 2 commits first, then specify the start commit as 'earliest', + // all the splits should come from the earliest commit. + TestData.writeData(TestData.DATA_SET_INSERT, conf); + TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf); + String specifiedCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath()); + conf.setString(FlinkOptions.READ_STREAMING_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST); + StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf); + try (AbstractStreamOperatorTestHarness harness = createHarness(function)) { + harness.setup(); + harness.open(); + + CountDownLatch latch = new CountDownLatch(4); + CollectingSourceContext sourceContext = new CollectingSourceContext(latch); + + runAsync(sourceContext, function); + + assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation"); + assertThat("Should produce the expected splits", + sourceContext.getPartitionPaths(), is("par1,par2,par3,par4")); + assertTrue(sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()), + "No instants should have range limit"); + assertTrue(sourceContext.splits.stream().allMatch(split -> split.getLatestCommit().equals(specifiedCommit)), + "All the splits should be with specified instant time"); + + // Stop the stream task. 
+ function.close(); + } + } + @Test public void testCheckpointRestore() throws Exception { TestData.writeData(TestData.DATA_SET_INSERT, conf); diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 9effdcc8c..26e0be6ee 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -21,7 +21,6 @@ package org.apache.hudi.table; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; @@ -124,6 +123,13 @@ public class HoodieDataSourceITCase extends AbstractTestBase { execInsertSql(streamTableEnv, insertInto); List rows2 = execSelectSql(streamTableEnv, "select * from t1", 10); assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT); + + streamTableEnv.getConfig().getConfiguration() + .setBoolean("table.dynamic-table-options.enabled", true); + // specify the start commit as earliest + List rows3 = execSelectSql(streamTableEnv, + "select * from t1/*+options('read.streaming.start-commit'='earliest')*/", 10); + assertRowsEquals(rows3, TestData.DATA_SET_SOURCE_INSERT); } @ParameterizedTest @@ -300,6 +306,34 @@ public class HoodieDataSourceITCase extends AbstractTestBase { assertRowsEquals(result, expected, true); } + @ParameterizedTest + @MethodSource("tableTypeAndPartitioningParams") + void testStreamReadFilterByPartition(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception { + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setString(FlinkOptions.TABLE_NAME, "t1"); + conf.setString(FlinkOptions.TABLE_TYPE, tableType.name()); + conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning); + + // write one commit + TestData.writeData(TestData.DATA_SET_INSERT, conf); + + String hoodieTableDDL = sql("t1") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ) + .option(FlinkOptions.READ_AS_STREAMING, "true") + .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, "2") + .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning) + .end(); + streamTableEnv.executeSql(hoodieTableDDL); + + List result = execSelectSql(streamTableEnv, + "select * from t1 where `partition`='par1'", 10); + final String expected = "[" + + "+I(+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1]), " + + "+I(+I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1])]"; + assertRowsEquals(result, expected, true); + } + @ParameterizedTest @MethodSource("executionModeAndPartitioningParams") void testWriteAndRead(ExecMode execMode, boolean hiveStylePartitioning) { @@ -568,10 +602,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { @Test void testStreamReadEmptyTablePath() throws Exception { - // create an empty table - Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); - StreamerUtil.initTableIfNotExists(conf); - + // case1: table metadata path does not exists // create a flink source table String createHoodieTable = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) @@ -580,10 +611,39 @@ public class 
HoodieDataSourceITCase extends AbstractTestBase { .end(); streamTableEnv.executeSql(createHoodieTable); - // execute query and assert throws exception - assertThrows(HoodieException.class, () -> execSelectSql(streamTableEnv, "select * from t1", 10), - "No successful commits under path " + tempFile.getAbsolutePath()); + // no exception expects to be thrown + List rows1 = execSelectSql(streamTableEnv, "select * from t1", 10); + assertRowsEquals(rows1, "[]"); + // case2: empty table without data files + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + StreamerUtil.initTableIfNotExists(conf); + + List rows2 = execSelectSql(streamTableEnv, "select * from t1", 10); + assertRowsEquals(rows2, "[]"); + } + + @Test + void testBatchReadEmptyTablePath() throws Exception { + // case1: table metadata path does not exists + // create a flink source table + String createHoodieTable = sql("t1") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ) + .end(); + batchTableEnv.executeSql(createHoodieTable); + + // no exception expects to be thrown + assertThrows(Exception.class, + () -> execSelectSql(batchTableEnv, "select * from t1", 10), + "Exception should throw when querying non-exists table in batch mode"); + + // case2: empty table without data files + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + StreamerUtil.initTableIfNotExists(conf); + + List rows2 = CollectionUtil.iteratorToList(batchTableEnv.executeSql("select * from t1").collect()); + assertRowsEquals(rows2, "[]"); } @ParameterizedTest @@ -781,6 +841,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { private List execSelectSql(TableEnvironment tEnv, String select, String sinkDDL, long timeout) throws InterruptedException { + tEnv.executeSql("DROP TABLE IF EXISTS sink"); tEnv.executeSql(sinkDDL); TableResult tableResult = tEnv.executeSql("insert into sink " + select); // wait for the timeout then cancels the job diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java index fed3748ad..25742a7fa 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java @@ -25,12 +25,12 @@ import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; +import org.apache.avro.Schema; import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.data.RowData; import org.apache.hadoop.fs.Path; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.slf4j.Logger; @@ -43,12 +43,14 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; /** @@ -62,17 +64,17 @@ public class 
TestHoodieTableSource { @TempDir File tempFile; - @BeforeEach void beforeEach() throws IOException { final String path = tempFile.getAbsolutePath(); - conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf = TestConfigurations.getDefaultConf(path); StreamerUtil.initTableIfNotExists(conf); IntStream.range(1, 5) .forEach(i -> new File(path + File.separator + "par" + i).mkdirs()); } @Test - void testGetReadPaths() { + void testGetReadPaths() throws Exception { + beforeEach(); HoodieTableSource tableSource = new HoodieTableSource( TestConfigurations.TABLE_SCHEMA, new Path(tempFile.getPath()), @@ -99,6 +101,7 @@ public class TestHoodieTableSource { @Test void testGetInputFormat() throws Exception { + beforeEach(); // write some data to let the TableSchemaResolver get the right instant TestData.writeData(TestData.DATA_SET_INSERT, conf); @@ -118,4 +121,29 @@ public class TestHoodieTableSource { () -> tableSource.getInputFormat(), "Invalid query type : 'incremental'. Only 'snapshot' is supported now"); } + + @Test + void testGetTableAvroSchema() { + final String path = tempFile.getAbsolutePath(); + conf = TestConfigurations.getDefaultConf(path); + conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true); + + HoodieTableSource tableSource = new HoodieTableSource( + TestConfigurations.TABLE_SCHEMA, + new Path(tempFile.getPath()), + Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")), + "default-par", + conf); + assertNull(tableSource.getMetaClient(), "Streaming source with empty table path is allowed"); + final String schemaFields = tableSource.getTableAvroSchema().getFields().stream() + .map(Schema.Field::name) + .collect(Collectors.joining(",")); + final String expected = "_hoodie_commit_time," + + "_hoodie_commit_seqno," + + "_hoodie_record_key," + + "_hoodie_partition_path," + + "_hoodie_file_name," + + "uuid,name,age,ts,partition"; + assertThat(schemaFields, is(expected)); + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java index ecc86e720..4e9ad5123 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import java.io.File; +import java.util.Collections; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -57,8 +58,6 @@ public class TestUtils { public static StreamReadMonitoringFunction getMonitorFunc(Configuration conf) { final String basePath = conf.getString(FlinkOptions.PATH); - final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() - .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); - return new StreamReadMonitoringFunction(conf, new Path(basePath), metaClient, 1024 * 1024L); + return new StreamReadMonitoringFunction(conf, new Path(basePath), 1024 * 1024L, Collections.emptySet()); } }
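
For readers who want to see the new behaviour end to end, here is a minimal, hypothetical Flink Table API sketch exercising what this patch adds: streaming read from the earliest commit and pruning by partition (the latter is what testStreamReadFilterByPartition covers). The table name, schema and path are illustrative; the option keys and values ('read.streaming.start-commit' = 'earliest', 'read.streaming.enabled', 'read.streaming.check-interval', table.dynamic-table-options.enabled) come from the patch and its tests. Treat it as a usage sketch, not part of the patch.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class EarliestCommitStreamingReadExample {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // Enable /*+ OPTIONS(...) */ hints, mirroring what HoodieDataSourceITCase does.
    tEnv.getConfig().getConfiguration()
        .setBoolean("table.dynamic-table-options.enabled", true);

    // Hypothetical table; with this patch the base path may be empty or not yet
    // initialized and the streaming source still starts, emitting rows once data arrives.
    tEnv.executeSql(
        "CREATE TABLE t1 ("
            + " uuid VARCHAR(20), name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20)"
            + ") PARTITIONED BY (`partition`) WITH ("
            + " 'connector' = 'hudi',"
            + " 'path' = '/tmp/hudi/t1',"
            + " 'table.type' = 'MERGE_ON_READ',"
            + " 'read.streaming.enabled' = 'true',"
            + " 'read.streaming.check-interval' = '2'"
            + ")");

    // Consume from the earliest commit and prune to a single partition; the partition
    // predicate is pushed down to the streaming monitor, as the new IT case asserts.
    tEnv.executeSql(
        "SELECT * FROM t1 /*+ OPTIONS('read.streaming.start-commit' = 'earliest') */"
            + " WHERE `partition` = 'par1'")
        .print();
  }
}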
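A second, self-contained sketch of the existence check that makes the empty-table case work. It mirrors the new StreamerUtil#tableExists helper but deliberately uses the plain Hadoop FileSystem API and a hard-coded ".hoodie" metafolder name instead of FSUtils and HoodieTableMetaClient.METAFOLDER_NAME, so it is an approximation of the patched code rather than a copy of it.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HoodieTableExistsCheck {

  // A hoodie table "exists" once its metadata folder is present under the base path.
  // In the patch, StreamerUtil#metaClientForReader returns null for streaming reads
  // when this check fails, and StreamReadMonitoringFunction lazily creates the meta
  // client on a later monitoring pass once the table has been initialized.
  static boolean tableExists(String basePath, Configuration hadoopConf) throws IOException {
    Path metaPath = new Path(basePath, ".hoodie");
    FileSystem fs = metaPath.getFileSystem(hadoopConf);
    return fs.exists(metaPath);
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical base path, for illustration only.
    System.out.println(tableExists("/tmp/hudi/t1", new Configuration()));
  }
}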