diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 4ca1dd770..b13b561f5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -177,7 +177,7 @@ public class HoodieMergeHandle extends H writeStatus.setPartitionPath(partitionPath); writeStatus.getStat().setPartitionPath(partitionPath); writeStatus.getStat().setFileId(fileId); - writeStatus.getStat().setPath(new Path(config.getBasePath()), newFilePath); + setWriteStatusPath(); // Create Marker file createMarkerFile(partitionPath, newFileName); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java index da72b165f..c1e8cbf08 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java @@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -104,8 +105,8 @@ public class HoodieCommitMetadata implements Serializable { public HashMap getFileIdAndRelativePaths() { HashMap filePaths = new HashMap<>(); // list all partitions paths - for (Map.Entry> entry : getPartitionToWriteStats().entrySet()) { - for (HoodieWriteStat stat : entry.getValue()) { + for (List stats : getPartitionToWriteStats().values()) { + for (HoodieWriteStat stat : stats) { filePaths.put(stat.getFileId(), stat.getPath()); } } @@ -142,6 +143,60 @@ public class HoodieCommitMetadata implements Serializable { return fileGroupIdToFullPaths; } + /** + * Extract the file status of all affected files from the commit metadata. If a file has + * been touched multiple times in the given commits, the return value will keep the one + * from the latest commit. + * + * @param basePath The base path + * @return the file full path to file status mapping + */ + public Map getFullPathToFileStatus(String basePath) { + Map fullPathToFileStatus = new HashMap<>(); + for (List stats : getPartitionToWriteStats().values()) { + // Iterate through all the written files. + for (HoodieWriteStat stat : stats) { + String relativeFilePath = stat.getPath(); + Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null; + if (fullPath != null) { + FileStatus fileStatus = new FileStatus(stat.getFileSizeInBytes(), false, 0, 0, + 0, fullPath); + fullPathToFileStatus.put(fullPath.getName(), fileStatus); + } + } + } + return fullPathToFileStatus; + } + + /** + * Extract the file status of all affected files from the commit metadata. If a file has + * been touched multiple times in the given commits, the return value will keep the one + * from the latest commit by file group ID. + * + *

Note: different with {@link #getFullPathToFileStatus(String)}, + * only the latest commit file for a file group is returned, + * this is an optimization for COPY_ON_WRITE table to eliminate legacy files for filesystem view. + * + * @param basePath The base path + * @return the file ID to file status mapping + */ + public Map getFileIdToFileStatus(String basePath) { + Map fileIdToFileStatus = new HashMap<>(); + for (List stats : getPartitionToWriteStats().values()) { + // Iterate through all the written files. + for (HoodieWriteStat stat : stats) { + String relativeFilePath = stat.getPath(); + Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null; + if (fullPath != null) { + FileStatus fileStatus = new FileStatus(stat.getFileSizeInBytes(), false, 0, 0, + 0, fullPath); + fileIdToFileStatus.put(stat.getFileId(), fileStatus); + } + } + } + return fileIdToFileStatus; + } + public String toJsonString() throws IOException { if (partitionToWriteStats.containsKey(null)) { LOG.info("partition path is null for " + partitionToWriteStats.get(null)); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java index 6b05eddcc..4926b2a55 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java @@ -41,6 +41,7 @@ import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -208,9 +209,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { List instantsInRange = new ArrayList<>(); for (FileStatus fs : fsStatuses) { //read the archived file - HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(), - new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema()); - try { + try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(), + new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) { int instantsInPreviousFile = instantsInRange.size(); //read the avro blocks while (reader.hasNext()) { @@ -220,8 +220,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { List records = blk.getRecords(); // filter blocks in desired time window Stream instantsInBlkStream = records.stream() - .filter(r -> commitsFilter.apply((GenericRecord) r)) - .map(r -> readCommit((GenericRecord) r, loadInstantDetails)); + .filter(r -> commitsFilter.apply((GenericRecord) r)) + .map(r -> readCommit((GenericRecord) r, loadInstantDetails)); if (filter != null) { instantsInBlkStream = instantsInBlkStream.filter(filter::isInRange); @@ -238,11 +238,10 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { break; } } - } finally { - reader.close(); } } + Collections.sort(instantsInRange); return instantsInRange; } catch (IOException e) { throw new HoodieIOException( diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index f5f25bc15..81bd51748 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -371,8 +371,8 @@ public class FlinkOptions extends HoodieConfig { public static final ConfigOption WRITE_BATCH_SIZE = ConfigOptions .key("write.batch.size") .doubleType() - .defaultValue(64D) // 64MB - .withDescription("Batch buffer size in MB to flush data into the underneath filesystem, default 64MB"); + .defaultValue(256D) // 256MB + .withDescription("Batch buffer size in MB to flush data into the underneath filesystem, default 256MB"); public static final ConfigOption WRITE_LOG_BLOCK_SIZE = ConfigOptions .key("write.log_block.size") diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index c71a91a82..f8eea2e89 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -139,7 +139,7 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction { public void close() { if (this.writeClient != null) { this.writeClient.cleanHandlesGracefully(); - this.writeClient.close(); + // this.writeClient.close(); } } @@ -378,6 +378,8 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction { k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value)); final DataItem item = DataItem.fromHoodieRecord(value); + bucket.records.add(item); + boolean flushBucket = bucket.detector.detect(item); boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize); if (flushBucket) { @@ -398,7 +400,6 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction { LOG.warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed!", this.tracer.maxBufferSize); } } - bucket.records.add(item); } private boolean hasData() { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index f2844a608..51280c3da 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -211,11 +211,13 @@ public class StreamWriteOperatorCoordinator // the stream write task snapshot and flush the data buffer synchronously in sequence, // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract) final boolean committed = commitInstant(this.instant); - if (committed) { + if (tableState.scheduleCompaction) { // if async compaction is on, schedule the compaction - if (tableState.scheduleCompaction) { + if (committed || tableState.timeCompactionTriggerStrategy) { writeClient.scheduleCompaction(Option.empty()); } + } + if (committed) { // start new instant. startInstant(); // sync Hive if is enabled @@ -530,6 +532,7 @@ public class StreamWriteOperatorCoordinator final String commitAction; final boolean isOverwrite; final boolean scheduleCompaction; + final boolean timeCompactionTriggerStrategy; final boolean syncHive; final boolean syncMetadata; @@ -539,6 +542,7 @@ public class StreamWriteOperatorCoordinator HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase(Locale.ROOT))); this.isOverwrite = WriteOperationType.isOverwrite(this.operationType); this.scheduleCompaction = StreamerUtil.needsScheduleCompaction(conf); + this.timeCompactionTriggerStrategy = StreamerUtil.isTimeCompactionTriggerStrategy(conf); this.syncHive = conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED); this.syncMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java index 4d953c29f..441125b7d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java @@ -225,7 +225,7 @@ public class WriteProfile { .orElse(null))) .filter(Objects::nonNull) .collect(Collectors.toList()); - FileStatus[] commitFiles = WriteProfiles.getWritePathsOfInstants(basePath, hadoopConf, metadataList); + FileStatus[] commitFiles = WriteProfiles.getWritePathsOfInstants(basePath, hadoopConf, metadataList, table.getMetaClient().getTableType()); fsView = new HoodieTableFileSystemView(table.getMetaClient(), commitTimeline, commitFiles); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java index 0ab8f12de..e8aafd830 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java @@ -21,11 +21,13 @@ package org.apache.hudi.sink.partitioner.profile; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.core.fs.Path; @@ -37,11 +39,9 @@ import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; import java.io.IOException; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.stream.Collectors; /** @@ -83,57 +83,67 @@ public class WriteProfiles { } /** - * Returns all the incremental write file path statuses with the given commits metadata. + * Returns all the incremental write file statuses with the given commits metadata. * * @param basePath Table base path * @param hadoopConf The hadoop conf * @param metadataList The commits metadata - * @return the file statuses array + * @param tableType The table type + * @return the file status array */ public static FileStatus[] getWritePathsOfInstants( Path basePath, Configuration hadoopConf, - List metadataList) { + List metadataList, + HoodieTableType tableType) { FileSystem fs = FSUtils.getFs(basePath.toString(), hadoopConf); - return metadataList.stream().map(metadata -> getWritePathsOfInstant(basePath, metadata, fs)) - .flatMap(Collection::stream).toArray(FileStatus[]::new); + Map uniqueIdToFileStatus = new HashMap<>(); + metadataList.forEach(metadata -> + uniqueIdToFileStatus.putAll(getFilesToReadOfInstant(basePath, metadata, fs, tableType))); + return uniqueIdToFileStatus.values().toArray(new FileStatus[0]); } /** - * Returns the commit file paths with given metadata. + * Returns the commit file status info with given metadata. * - * @param basePath Table base path - * @param metadata The metadata - * @param fs The filesystem - * @return the commit file status list + * @param basePath Table base path + * @param metadata The metadata + * @param fs The filesystem + * @param tableType The table type + * @return the commit file status info grouping by specific ID */ - private static List getWritePathsOfInstant(Path basePath, HoodieCommitMetadata metadata, FileSystem fs) { - return metadata.getFileIdAndFullPaths(basePath.toString()).values().stream() - .map(org.apache.hadoop.fs.Path::new) + private static Map getFilesToReadOfInstant( + Path basePath, + HoodieCommitMetadata metadata, + FileSystem fs, + HoodieTableType tableType) { + return getFilesToRead(metadata, basePath.toString(), tableType).entrySet().stream() // filter out the file paths that does not exist, some files may be cleaned by // the cleaner. - .filter(path -> { + .filter(entry -> { try { - return fs.exists(path); + return fs.exists(entry.getValue().getPath()); } catch (IOException e) { - LOG.error("Checking exists of path: {} error", path); - throw new HoodieException(e); - } - }).map(path -> { - try { - return fs.getFileStatus(path); - } catch (FileNotFoundException fe) { - LOG.warn("File {} was deleted by the cleaner, ignore", path); - return null; - } catch (IOException e) { - LOG.error("Get write status of path: {} error", path); + LOG.error("Checking exists of path: {} error", entry.getValue().getPath()); throw new HoodieException(e); } }) - // filter out crushed files - .filter(Objects::nonNull) - .filter(StreamerUtil::isValidFile) - .collect(Collectors.toList()); + .filter(entry -> StreamerUtil.isValidFile(entry.getValue())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private static Map getFilesToRead( + HoodieCommitMetadata metadata, + String basePath, + HoodieTableType tableType) { + switch (tableType) { + case COPY_ON_WRITE: + return metadata.getFileIdToFileStatus(basePath); + case MERGE_ON_READ: + return metadata.getFullPathToFileStatus(basePath); + default: + throw new AssertionError(); + } } /** @@ -178,9 +188,8 @@ public class WriteProfiles { Path basePath, HoodieInstant instant, HoodieTimeline timeline) { - byte[] data = timeline.getInstantDetails(instant).get(); try { - return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); + return HoodieInputFormatUtils.getCommitMetadata(instant, timeline); } catch (IOException e) { LOG.error("Get write metadata for table {} with instant {} and path: {} error", tableName, instant.getTimestamp(), basePath); diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java index 72d8dd6e2..653e182bf 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.sink.partitioner.profile.WriteProfiles; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; @@ -159,23 +160,24 @@ public class IncrementalInputSplits implements Serializable { .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList()); List archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName); if (archivedMetadataList.size() > 0) { - LOG.warn("" + LOG.warn("\n" + "--------------------------------------------------------------------------------\n" + "---------- caution: the reader has fall behind too much from the writer,\n" + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n" + "--------------------------------------------------------------------------------"); } List metadataList = archivedMetadataList.size() > 0 - ? mergeList(activeMetadataList, archivedMetadataList) + // IMPORTANT: the merged metadata list must be in ascending order by instant time + ? mergeList(archivedMetadataList, activeMetadataList) : activeMetadataList; - Set writePartitions = getWritePartitionPaths(metadataList); + Set writePartitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList); // apply partition push down if (this.requiredPartitions != null) { writePartitions = writePartitions.stream() .filter(this.requiredPartitions::contains).collect(Collectors.toSet()); } - FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList); + FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType()); if (fileStatuses.length == 0) { LOG.warn("No files found for reading in user provided path."); return Result.EMPTY; @@ -275,19 +277,6 @@ public class IncrementalInputSplits implements Serializable { return instantStream.collect(Collectors.toList()); } - /** - * Returns all the incremental write partition paths as a set with the given commits metadata. - * - * @param metadataList The commits metadata - * @return the partition path set - */ - private Set getWritePartitionPaths(List metadataList) { - return metadataList.stream() - .map(HoodieCommitMetadata::getWritePartitionPaths) - .flatMap(Collection::stream) - .collect(Collectors.toSet()); - } - private static List mergeList(List list1, List list2) { List merged = new ArrayList<>(list1); merged.addAll(list2); diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java index bfd745288..c7bcc399e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java @@ -201,7 +201,7 @@ public class StreamReadMonitoringFunction } // update the issues instant time this.issuedInstant = result.getEndInstant(); - LOG.info("" + LOG.info("\n" + "------------------------------------------------------------\n" + "---------- consumed to instant: {}\n" + "------------------------------------------------------------", diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 898ba88fd..c552bed22 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -184,8 +184,8 @@ public class FlinkStreamerConfig extends Configuration { public Double writeTaskMaxSize = 1024D; @Parameter(names = {"--write-batch-size"}, - description = "Batch buffer size in MB to flush data into the underneath filesystem, default 64MB") - public Double writeBatchSize = 64D; + description = "Batch buffer size in MB to flush data into the underneath filesystem, default 256MB") + public Double writeBatchSize = 256D; @Parameter(names = {"--write-log-block-size"}, description = "Max log block size in MB for log file, default 128MB") public Integer writeLogBlockSize = 128; diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index b71726880..cfa29801c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -286,7 +286,7 @@ public class StreamerUtil { } /** - * Returns whether needs to schedule the compaction plan. + * Returns whether there is need to schedule the compaction plan. * * @param conf The flink configuration. */ @@ -297,6 +297,16 @@ public class StreamerUtil { && conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED); } + /** + * Returns whether the compaction trigger strategy is time based. + * + * @param conf The flink configuration. + */ + public static boolean isTimeCompactionTriggerStrategy(Configuration conf) { + final String strategy = conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY); + return FlinkOptions.TIME_ELAPSED.equalsIgnoreCase(strategy) || FlinkOptions.NUM_OR_TIME.equalsIgnoreCase(strategy); + } + /** * Creates the meta client for reader. * diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index b403f3c65..624a8e8c4 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -420,9 +420,9 @@ public class TestWriteCopyOnWrite { Map> dataBuffer = funcWrapper.getDataBuffer(); assertThat("Should have 1 data bucket", dataBuffer.size(), is(1)); - assertThat("3 records expect to flush out as a mini-batch", + assertThat("2 records expect to flush out as a mini-batch", dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), - is(3)); + is(2)); // this triggers the data write and event send funcWrapper.checkpointFunction(1); @@ -483,9 +483,9 @@ public class TestWriteCopyOnWrite { Map> dataBuffer = funcWrapper.getDataBuffer(); assertThat("Should have 1 data bucket", dataBuffer.size(), is(1)); - assertThat("3 records expect to flush out as a mini-batch", + assertThat("2 records expect to flush out as a mini-batch", dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), - is(3)); + is(2)); // this triggers the data write and event send funcWrapper.checkpointFunction(1); @@ -615,9 +615,9 @@ public class TestWriteCopyOnWrite { Map> dataBuffer = funcWrapper.getDataBuffer(); assertThat("Should have 1 data bucket", dataBuffer.size(), is(1)); - assertThat("3 records expect to flush out as a mini-batch", + assertThat("2 records expect to flush out as a mini-batch", dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), - is(3)); + is(2)); // this triggers the data write and event send funcWrapper.checkpointFunction(1); @@ -665,6 +665,7 @@ public class TestWriteCopyOnWrite { Map expected = new HashMap<>(); // the last 2 lines are merged expected.put("par1", "[" + + "id1,par1,id1,Danny,23,1,par1, " + "id1,par1,id1,Danny,23,1,par1, " + "id1,par1,id1,Danny,23,1,par1" + "]"); return expected; diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index a5812aa58..e31f974f3 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -79,8 +79,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase { streamTableEnv = TableEnvironmentImpl.create(settings); streamTableEnv.getConfig().getConfiguration() .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); - streamTableEnv.getConfig().getConfiguration() - .setString("execution.checkpointing.interval", "2s"); + Configuration execConf = streamTableEnv.getConfig().getConfiguration(); + execConf.setString("execution.checkpointing.interval", "2s"); + // configure not to retry after failure + execConf.setString("restart-strategy", "fixed-delay"); + execConf.setString("restart-strategy.fixed-delay.attempts", "0"); settings = EnvironmentSettings.newInstance().inBatchMode().build(); batchTableEnv = TableEnvironmentImpl.create(settings); @@ -529,12 +532,37 @@ public class HoodieDataSourceITCase extends AbstractTestBase { } @ParameterizedTest - @EnumSource(value = ExecMode.class) - void testUpsertWithMiniBatches(ExecMode execMode) { + @EnumSource(value = HoodieTableType.class) + void testStreamWriteAndReadWithMiniBatches(HoodieTableType tableType) throws Exception { + // create filesystem table named source + String createSource = TestConfigurations.getFileSourceDDL("source", 4); + streamTableEnv.executeSql(createSource); + + String hoodieTableDDL = sql("t1") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.READ_AS_STREAMING, true) + .option(FlinkOptions.TABLE_TYPE, tableType) + .option(FlinkOptions.READ_START_COMMIT, "earliest") + .option(FlinkOptions.WRITE_BATCH_SIZE, 0.00001) + .noPartition() + .end(); + streamTableEnv.executeSql(hoodieTableDDL); + String insertInto = "insert into t1 select * from source"; + execInsertSql(streamTableEnv, insertInto); + + // reading from the earliest commit instance. + List rows = execSelectSql(streamTableEnv, "select * from t1", 20); + assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); + } + + @ParameterizedTest + @MethodSource("executionModeAndTableTypeParams") + void testBatchUpsertWithMiniBatches(ExecMode execMode, HoodieTableType tableType) { TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv; String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) .option(FlinkOptions.WRITE_BATCH_SIZE, "0.001") + .option(FlinkOptions.TABLE_TYPE, tableType) .end(); tableEnv.executeSql(hoodieTableDDL); @@ -958,7 +986,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { try { tableResult.getJobClient().get().getJobExecutionResult().get(); } catch (InterruptedException | ExecutionException ex) { - throw new RuntimeException(ex); + // ignored } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java index 73573043c..c5b97f99f 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodiePartitionMetadata; -import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -487,43 +486,50 @@ public class HoodieInputFormatUtils { } /** - * Iterate through a list of commits in ascending order, and extract the file status of - * all affected files from the commits metadata grouping by partition path. If the files has + * Iterate through a list of commit metadata in natural order, and extract the file status of + * all affected files from the commits metadata grouping by file full path. If the files has * been touched multiple times in the given commits, the return value will keep the one * from the latest commit. - * @param basePath - * @param commitsToCheck - * @param timeline - * @return HashMap> - * @throws IOException + * + * @param basePath The table base path + * @param metadataList The metadata list to read the data from + * + * @return the affected file status array */ - public static HashMap> listAffectedFilesForCommits( - Path basePath, List commitsToCheck, HoodieTimeline timeline) throws IOException { + public static FileStatus[] listAffectedFilesForCommits(Path basePath, List metadataList) { // TODO: Use HoodieMetaTable to extract affected file directly. - HashMap> partitionToFileStatusesMap = new HashMap<>(); - List sortedCommitsToCheck = new ArrayList<>(commitsToCheck); - sortedCommitsToCheck.sort(HoodieInstant::compareTo); + HashMap fullPathToFileStatus = new HashMap<>(); // Iterate through the given commits. - for (HoodieInstant commit: sortedCommitsToCheck) { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(), - HoodieCommitMetadata.class); - // Iterate through all the affected partitions of a commit. - for (Map.Entry> entry: commitMetadata.getPartitionToWriteStats().entrySet()) { - if (!partitionToFileStatusesMap.containsKey(entry.getKey())) { - partitionToFileStatusesMap.put(entry.getKey(), new HashMap<>()); - } - // Iterate through all the written files of this partition. - for (HoodieWriteStat stat : entry.getValue()) { - String relativeFilePath = stat.getPath(); - Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null; - if (fullPath != null) { - FileStatus fs = new FileStatus(stat.getFileSizeInBytes(), false, 0, 0, - 0, fullPath); - partitionToFileStatusesMap.get(entry.getKey()).put(fullPath.getName(), fs); - } - } - } + for (HoodieCommitMetadata metadata: metadataList) { + fullPathToFileStatus.putAll(metadata.getFullPathToFileStatus(basePath.toString())); } - return partitionToFileStatusesMap; + return fullPathToFileStatus.values().toArray(new FileStatus[0]); + } + + /** + * Returns all the incremental write partition paths as a set with the given commits metadata. + * + * @param metadataList The commits metadata + * @return the partition path set + */ + public static Set getWritePartitionPaths(List metadataList) { + return metadataList.stream() + .map(HoodieCommitMetadata::getWritePartitionPaths) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + } + + /** + * Returns the commit metadata of the given instant. + * + * @param instant The hoodie instant + * @param timeline The timeline + * @return the commit metadata + */ + public static HoodieCommitMetadata getCommitMetadata( + HoodieInstant instant, + HoodieTimeline timeline) throws IOException { + byte[] data = timeline.getInstantDetails(instant).get(); + return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index f1bc84751..b4a9800d9 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -22,8 +22,10 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getCommitMetadata +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getWritePartitionPaths import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes -import org.apache.hadoop.fs.{FileStatus, GlobPattern, Path} +import org.apache.hadoop.fs.{GlobPattern, Path} import org.apache.hadoop.mapred.JobConf import org.apache.log4j.LogManager import org.apache.spark.rdd.RDD @@ -35,7 +37,6 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SQLContext} import scala.collection.JavaConversions._ -import scala.collection.mutable.ListBuffer /** * Experimental. @@ -162,16 +163,12 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, } def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = { - val partitionsWithFileStatus = listAffectedFilesForCommits(new Path(metaClient.getBasePath), - commitsToReturn, commitsTimelineToReturn) - val affectedFileStatus = new ListBuffer[FileStatus] - partitionsWithFileStatus.iterator.foreach(p => - p._2.iterator.foreach(status => affectedFileStatus += status._2)) - val fsView = new HoodieTableFileSystemView(metaClient, - commitsTimelineToReturn, affectedFileStatus.toArray) + val metadataList = commitsToReturn.map(instant => getCommitMetadata(instant, commitsTimelineToReturn)) + val affectedFileStatus = listAffectedFilesForCommits(new Path(metaClient.getBasePath), metadataList) + val fsView = new HoodieTableFileSystemView(metaClient, commitsTimelineToReturn, affectedFileStatus) // Iterate partitions to create splits - val fileGroup = partitionsWithFileStatus.keySet().flatMap(partitionPath => + val fileGroup = getWritePartitionPaths(metadataList).flatMap(partitionPath => fsView.getAllFileGroups(partitionPath).iterator() ).toList val latestCommit = fsView.getLastInstant.get().getTimestamp