From 9056c68744a3f31ac2625e004ec6e155d2e86be9 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Sat, 14 Aug 2021 05:18:49 -0700 Subject: [PATCH] [HUDI-2305] Add MARKERS.type and fix marker-based rollback (#3472) - Rollback infers the directory structure and does rollback based on the strategy used while markers were written. "write markers type" in write config is used to determine marker strategy only for new writes. --- .../embedded/EmbeddedTimelineService.java | 4 +- .../apache/hudi/config/HoodieWriteConfig.java | 4 +- .../hudi/table/marker/DirectWriteMarkers.java | 5 +- .../marker/MarkerBasedRollbackUtils.java | 78 +++++++ .../hudi/table/marker/WriteMarkers.java | 18 -- .../table/marker/WriteMarkersFactory.java | 1 + .../FlinkMarkerBasedRollbackStrategy.java | 8 +- .../upgrade/ZeroToOneUpgradeHandler.java | 4 +- .../JavaMarkerBasedRollbackStrategy.java | 9 +- .../SparkMarkerBasedRollbackStrategy.java | 11 +- .../upgrade/ZeroToOneUpgradeHandler.java | 8 +- .../TestTimelineServerBasedWriteMarkers.java | 41 ++-- .../table/marker/TestWriteMarkersBase.java | 11 +- .../hudi/common}/table/marker/MarkerType.java | 15 +- .../apache/hudi/common/util/MarkerUtils.java | 219 ++++++++++++++++++ .../service/handlers/MarkerHandler.java | 2 +- .../handlers/marker/MarkerDirState.java | 108 ++++----- 17 files changed, 403 insertions(+), 143 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerBasedRollbackUtils.java rename {hudi-client/hudi-client-common/src/main/java/org/apache/hudi => hudi-common/src/main/java/org/apache/hudi/common}/table/marker/MarkerType.java (57%) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java index 1a6ad9e7d..20f9b75a9 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java @@ -18,15 +18,15 @@ package org.apache.hudi.client.embedded; -import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.util.NetworkUtils; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.marker.MarkerType; import org.apache.hudi.timeline.service.TimelineService; import org.apache.log4j.LogManager; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 9ba4fab47..122b90f80 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -30,11 +30,12 @@ import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.model.HoodieCleaningPolicy; -import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.WriteConcurrencyMode; import 
org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.util.ReflectionUtils; @@ -46,7 +47,6 @@ import org.apache.hudi.metrics.MetricsReporterType; import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite; import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; -import org.apache.hudi.table.marker.MarkerType; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.orc.CompressionKind; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java index cf627d057..1223cb75b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.MarkerUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -149,7 +150,7 @@ public class DirectWriteMarkers extends WriteMarkers { } private String translateMarkerToDataPath(String markerPath) { - String rPath = stripMarkerFolderPrefix(markerPath); + String rPath = MarkerUtils.stripMarkerFolderPrefix(markerPath, basePath, instantTime); return stripMarkerSuffix(rPath); } @@ -158,7 +159,7 @@ public class DirectWriteMarkers extends WriteMarkers { Set markerFiles = 
new HashSet<>(); if (doesMarkerDirExist()) { FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> { - markerFiles.add(stripMarkerFolderPrefix(fileStatus.getPath().toString())); + markerFiles.add(MarkerUtils.stripMarkerFolderPrefix(fileStatus.getPath().toString(), basePath, instantTime)); return true; }, false); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerBasedRollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerBasedRollbackUtils.java new file mode 100644 index 000000000..9d1f37abd --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerBasedRollbackUtils.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.table.marker; + +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.table.marker.MarkerType; +import org.apache.hudi.common.util.MarkerUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.FileSystem; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * A utility class for marker-based rollback. + */ +public class MarkerBasedRollbackUtils { + /** + * Gets all marker paths. + * + * @param table instance of {@code HoodieTable} to use + * @param context instance of {@code HoodieEngineContext} to use + * @param instant instant of interest to rollback + * @param parallelism parallelism to use + * @return a list of all markers + * @throws IOException + */ + public static List getAllMarkerPaths(HoodieTable table, HoodieEngineContext context, + String instant, int parallelism) throws IOException { + String markerDir = table.getMetaClient().getMarkerFolderPath(instant); + FileSystem fileSystem = table.getMetaClient().getFs(); + Option markerTypeOption = MarkerUtils.readMarkerType(fileSystem, markerDir); + + // If there is no marker type file "MARKERS.type", we assume "DIRECT" markers are used + if (!markerTypeOption.isPresent()) { + WriteMarkers writeMarkers = WriteMarkersFactory.get(MarkerType.DIRECT, table, instant); + return new ArrayList<>(writeMarkers.allMarkerFilePaths()); + } + + switch (markerTypeOption.get()) { + case TIMELINE_SERVER_BASED: + // Reads all markers written by the timeline server + Map> markersMap = + MarkerUtils.readTimelineServerBasedMarkersFromFileSystem( + markerDir, fileSystem, context, parallelism); + return markersMap.values().stream().flatMap(Collection::stream) + 
.collect(Collectors.toCollection(ArrayList::new)); + default: + throw new HoodieException( + "The marker type \"" + markerTypeOption.get().name() + "\" is not supported."); + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java index 0d1dba8a2..c243b9b7c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java @@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieIOException; import org.apache.hadoop.fs.Path; @@ -127,23 +126,6 @@ public abstract class WriteMarkers implements Serializable { return new Path(path, markerFileName); } - /** - * Strips the folder prefix of the marker file path. - * - * @param fullMarkerPath the full path of the marker file - * @return marker file name - */ - protected String stripMarkerFolderPrefix(String fullMarkerPath) { - ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN)); - String markerRootPath = Path.getPathWithoutSchemeAndAuthority( - new Path(String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString(); - int begin = - fullMarkerPath.indexOf(markerRootPath); - ValidationUtils.checkArgument(begin >= 0, - "Not in marker dir. Marker Path=" + fullMarkerPath + ", Expected Marker Root=" + markerRootPath); - return fullMarkerPath.substring(begin + markerRootPath.length() + 1); - } - /** * Deletes the marker directory. 
* diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java index d8ec89db0..044b258e8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java @@ -20,6 +20,7 @@ package org.apache.hudi.table.marker; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.StorageSchemes; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.HoodieTable; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java index f664fba1b..bb7ec7600 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java @@ -32,11 +32,10 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.marker.MarkerBasedRollbackUtils; import org.apache.hudi.table.marker.WriteMarkers; -import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hadoop.fs.FileStatus; -import java.util.ArrayList; import java.io.IOException; import java.util.List; @@ -54,8 +53,9 @@ public class FlinkMarkerBasedRollbackStrategy ext @Override public List execute(HoodieInstant instantToRollback) { try { - WriteMarkers writeMarkers = 
WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp()); - List rollbackStats = context.map(new ArrayList<>(writeMarkers.allMarkerFilePaths()), markerFilePath -> { + List markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths( + table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism()); + List rollbackStats = context.map(markerPaths, markerFilePath -> { String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1); IOType type = IOType.valueOf(typeStr); switch (type) { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java index 61a294a74..3996765ca 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java @@ -19,11 +19,12 @@ package org.apache.hudi.table.upgrade; import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -37,7 +38,6 @@ import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest; import org.apache.hudi.table.action.rollback.RollbackUtils; import org.apache.hudi.table.marker.WriteMarkers; import org.apache.hudi.table.marker.WriteMarkersFactory; -import 
org.apache.hudi.table.marker.MarkerType; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java index eed1774e2..150f663cf 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java @@ -30,10 +30,9 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.marker.MarkerBasedRollbackUtils; import org.apache.hudi.table.marker.WriteMarkers; -import org.apache.hudi.table.marker.WriteMarkersFactory; -import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -49,9 +48,9 @@ public class JavaMarkerBasedRollbackStrategy exte @Override public List execute(HoodieInstant instantToRollback) { try { - WriteMarkers writeMarkers = - WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp()); - List rollbackStats = context.map(new ArrayList<>(writeMarkers.allMarkerFilePaths()), markerFilePath -> { + List markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths( + table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism()); + List rollbackStats = context.map(markerPaths, markerFilePath -> { String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1); IOType type = IOType.valueOf(typeStr); switch (type) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java index ba6bf1fe4..0adacd28c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java @@ -33,15 +33,14 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.marker.MarkerBasedRollbackUtils; import org.apache.hudi.table.marker.WriteMarkers; -import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hadoop.fs.FileStatus; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -58,11 +57,11 @@ public class SparkMarkerBasedRollbackStrategy ext public List execute(HoodieInstant instantToRollback) { JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); try { - WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp()); - List markerFilePaths = new ArrayList<>(writeMarkers.allMarkerFilePaths()); - int parallelism = Math.max(Math.min(markerFilePaths.size(), config.getRollbackParallelism()), 1); + List markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths( + table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism()); + int parallelism = Math.max(Math.min(markerPaths.size(), config.getRollbackParallelism()), 1); jsc.setJobGroup(this.getClass().getSimpleName(), "Rolling back using marker files"); - return jsc.parallelize(markerFilePaths, parallelism) + return 
jsc.parallelize(markerPaths, parallelism) .map(markerFilePath -> { String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1); IOType type = IOType.valueOf(typeStr); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java index ad951959b..8470b9685 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -33,13 +34,12 @@ import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper; import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hudi.table.action.rollback.RollbackUtils; import org.apache.hudi.table.marker.WriteMarkers; import org.apache.hudi.table.marker.WriteMarkersFactory; -import org.apache.hudi.table.marker.MarkerType; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import java.util.List; import java.util.stream.Collectors; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestTimelineServerBasedWriteMarkers.java 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestTimelineServerBasedWriteMarkers.java index 3300c95fe..583883ccb 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestTimelineServerBasedWriteMarkers.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestTimelineServerBasedWriteMarkers.java @@ -23,14 +23,15 @@ import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; -import org.apache.hudi.common.testutils.FileSystemTestUtils; import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.MarkerUtils; import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hudi.timeline.service.TimelineService; -import org.apache.hudi.timeline.service.handlers.marker.MarkerDirState; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -40,16 +41,15 @@ import org.apache.spark.api.java.JavaSparkContext; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import java.io.BufferedReader; import java.io.Closeable; import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; +import java.util.Collection; import java.util.List; import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertIterableEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class 
TestTimelineServerBasedWriteMarkers extends TestWriteMarkersBase { TimelineService timelineService; @@ -94,26 +94,10 @@ public class TestTimelineServerBasedWriteMarkers extends TestWriteMarkersBase { @Override void verifyMarkersInFileSystem() throws IOException { - List allMarkers = FileSystemTestUtils.listRecursive(fs, markerFolderPath) - .stream().filter(status -> status.getPath().getName().contains(MarkerDirState.MARKERS_FILENAME_PREFIX)) - .flatMap(status -> { - // Read all markers stored in each marker file maintained by the timeline service - FSDataInputStream fsDataInputStream = null; - BufferedReader bufferedReader = null; - List markers = null; - try { - fsDataInputStream = fs.open(status.getPath()); - bufferedReader = new BufferedReader(new InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8)); - markers = bufferedReader.lines().collect(Collectors.toList()); - } catch (IOException e) { - e.printStackTrace(); - } finally { - closeQuietly(bufferedReader); - closeQuietly(fsDataInputStream); - } - return markers.stream(); - }) - .sorted() + // Verifies the markers + List allMarkers = MarkerUtils.readTimelineServerBasedMarkersFromFileSystem( + markerFolderPath.toString(), fs, context, 1) + .values().stream().flatMap(Collection::stream).sorted() .collect(Collectors.toList()); assertEquals(3, allMarkers.size()); assertIterableEquals(CollectionUtils.createImmutableList( @@ -121,6 +105,13 @@ public class TestTimelineServerBasedWriteMarkers extends TestWriteMarkersBase { "2020/06/02/file2.marker.APPEND", "2020/06/03/file3.marker.CREATE"), allMarkers); + // Verifies the marker type file + Path markerTypeFilePath = new Path(markerFolderPath, MarkerUtils.MARKER_TYPE_FILENAME); + assertTrue(MarkerUtils.doesMarkerTypeFileExist(fs, markerFolderPath.toString())); + FSDataInputStream fsDataInputStream = fs.open(markerTypeFilePath); + assertEquals(MarkerType.TIMELINE_SERVER_BASED.toString(), + FileIOUtils.readAsUTFString(fsDataInputStream)); + 
closeQuietly(fsDataInputStream); } /** diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java index b3ad41754..0298ed37a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.testutils.FileSystemTestUtils; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.MarkerUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hadoop.fs.FileSystem; @@ -100,8 +101,10 @@ public abstract class TestWriteMarkersBase extends HoodieCommonTestHarness { createSomeMarkers(); // add invalid file createInvalidFile("2020/06/01", "invalid_file3"); - int fileSize = FileSystemTestUtils.listRecursive(fs, markerFolderPath).size(); - assertEquals(fileSize,4); + long fileSize = FileSystemTestUtils.listRecursive(fs, markerFolderPath).stream() + .filter(fileStatus -> !fileStatus.getPath().getName().contains(MarkerUtils.MARKER_TYPE_FILENAME)) + .count(); + assertEquals(fileSize, 4); // then assertIterableEquals(CollectionUtils.createImmutableList( @@ -118,7 +121,9 @@ public abstract class TestWriteMarkersBase extends HoodieCommonTestHarness { // then assertIterableEquals(CollectionUtils.createImmutableList("2020/06/01/file1.marker.MERGE", "2020/06/02/file2.marker.APPEND", "2020/06/03/file3.marker.CREATE"), - writeMarkers.allMarkerFilePaths().stream().sorted().collect(Collectors.toList()) + writeMarkers.allMarkerFilePaths().stream() + .filter(path -> !path.contains(MarkerUtils.MARKER_TYPE_FILENAME)) + .sorted().collect(Collectors.toList()) ); } diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerType.java b/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerType.java similarity index 57% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerType.java rename to hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerType.java index 06d7903eb..2b0a28df8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerType.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerType.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.table.marker; +package org.apache.hudi.common.table.marker; /** * Marker type indicating how markers are stored in the file system. 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java new file mode 100644 index 000000000..9d0965922 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.common.util; + +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.marker.MarkerType; +import org.apache.hudi.common.util.collection.ImmutablePair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.util.FileIOUtils.closeQuietly; + +/** + * A utility class for marker related operations. + */ +public class MarkerUtils { + public static final String MARKERS_FILENAME_PREFIX = "MARKERS"; + public static final String MARKER_TYPE_FILENAME = MARKERS_FILENAME_PREFIX + ".type"; + private static final Logger LOG = LogManager.getLogger(MarkerUtils.class); + + /** + * Strips the folder prefix of the marker file path corresponding to a data file. 
+ * + * @param fullMarkerPath the full path of the marker file + * @param basePath the base path + * @param instantTime instant of interest + * @return marker file name + */ + public static String stripMarkerFolderPrefix(String fullMarkerPath, String basePath, String instantTime) { + ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN)); + String markerRootPath = Path.getPathWithoutSchemeAndAuthority( + new Path(String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString(); + return stripMarkerFolderPrefix(fullMarkerPath, markerRootPath); + } + + /** + * Strips the marker folder prefix of any file path under the marker directory. + * + * @param fullMarkerPath the full path of the file + * @param markerDir marker directory + * @return file name + */ + public static String stripMarkerFolderPrefix(String fullMarkerPath, String markerDir) { + int begin = fullMarkerPath.indexOf(markerDir); + ValidationUtils.checkArgument(begin >= 0, + "Not in marker dir. Marker Path=" + fullMarkerPath + ", Expected Marker Root=" + markerDir); + return fullMarkerPath.substring(begin + markerDir.length() + 1); + } + + /** + * @param fileSystem file system to use. + * @param markerDir marker directory. + * @return {@code true} if the MARKERS.type file exists; {@code false} otherwise. + */ + public static boolean doesMarkerTypeFileExist(FileSystem fileSystem, String markerDir) throws IOException { + return fileSystem.exists(new Path(markerDir, MARKER_TYPE_FILENAME)); + } + + /** + * Reads the marker type from `MARKERS.type` file. + * + * @param fileSystem file system to use. + * @param markerDir marker directory. + * @return the marker type, or empty if the marker type file does not exist. 
+ */ + public static Option<MarkerType> readMarkerType(FileSystem fileSystem, String markerDir) { + Path markerTypeFilePath = new Path(markerDir, MARKER_TYPE_FILENAME); + FSDataInputStream fsDataInputStream = null; + Option<MarkerType> content = Option.empty(); + try { + if (!doesMarkerTypeFileExist(fileSystem, markerDir)) { + return Option.empty(); + } + fsDataInputStream = fileSystem.open(markerTypeFilePath); + content = Option.of(MarkerType.valueOf(FileIOUtils.readAsUTFString(fsDataInputStream))); + } catch (IOException e) { + throw new HoodieIOException("Cannot read marker type file " + markerTypeFilePath.toString() + + "; " + e.getMessage(), e); + } finally { + closeQuietly(fsDataInputStream); + } + return content; + } + + /** + * Writes the marker type to the file `MARKERS.type`. + * + * @param markerType marker type. + * @param fileSystem file system to use. + * @param markerDir marker directory. + */ + public static void writeMarkerTypeToFile(MarkerType markerType, FileSystem fileSystem, String markerDir) { + Path markerTypeFilePath = new Path(markerDir, MARKER_TYPE_FILENAME); + FSDataOutputStream fsDataOutputStream = null; + BufferedWriter bufferedWriter = null; + try { + fsDataOutputStream = fileSystem.create(markerTypeFilePath, false); + bufferedWriter = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)); + bufferedWriter.write(markerType.toString()); + } catch (IOException e) { + throw new HoodieException("Failed to create marker type file " + markerTypeFilePath.toString() + + "; " + e.getMessage(), e); + } finally { + closeQuietly(bufferedWriter); + closeQuietly(fsDataOutputStream); + } + } + + /** + * Deletes `MARKERS.type` file. + * + * @param fileSystem file system to use. + * @param markerDir marker directory. 
+ */ + public static void deleteMarkerTypeFile(FileSystem fileSystem, String markerDir) { + Path markerTypeFilePath = new Path(markerDir, MARKER_TYPE_FILENAME); + try { + fileSystem.delete(markerTypeFilePath, false); + } catch (IOException e) { + throw new HoodieIOException("Cannot delete marker type file " + markerTypeFilePath.toString() + + "; " + e.getMessage(), e); + } + } + + /** + * Reads files containing the markers written by timeline-server-based marker mechanism. + * + * @param markerDir marker directory. + * @param fileSystem file system to use. + * @param context instance of {@link HoodieEngineContext} to use + * @param parallelism parallelism to use + * @return A {@code Map} of file name to the set of markers stored in the file. + */ + public static Map<String, Set<String>> readTimelineServerBasedMarkersFromFileSystem( + String markerDir, FileSystem fileSystem, HoodieEngineContext context, int parallelism) { + Path dirPath = new Path(markerDir); + try { + if (fileSystem.exists(dirPath)) { + FileStatus[] fileStatuses = fileSystem.listStatus(dirPath); + Predicate<String> prefixFilter = pathStr -> pathStr.contains(MARKERS_FILENAME_PREFIX); + Predicate<String> markerTypeFilter = + pathStr -> !stripMarkerFolderPrefix(pathStr, markerDir).equals(MARKER_TYPE_FILENAME); + List<String> markerDirSubPaths = Arrays.stream(fileStatuses) + .map(fileStatus -> fileStatus.getPath().toString()) + .filter(prefixFilter.and(markerTypeFilter)) + .collect(Collectors.toList()); + + if (markerDirSubPaths.size() > 0) { + SerializableConfiguration conf = new SerializableConfiguration(fileSystem.getConf()); + int actualParallelism = Math.min(markerDirSubPaths.size(), parallelism); + return context.mapToPair(markerDirSubPaths, markersFilePathStr -> { + Path markersFilePath = new Path(markersFilePathStr); + FileSystem fs = markersFilePath.getFileSystem(conf.get()); + FSDataInputStream fsDataInputStream = null; + BufferedReader bufferedReader = null; + Set<String> markers = new HashSet<>(); + try { + LOG.debug("Read marker file: " + 
markersFilePathStr); + fsDataInputStream = fs.open(markersFilePath); + bufferedReader = new BufferedReader(new InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8)); + markers = bufferedReader.lines().collect(Collectors.toSet()); + } catch (IOException e) { + throw new HoodieIOException("Failed to read file " + markersFilePathStr, e); + } finally { + closeQuietly(bufferedReader); + closeQuietly(fsDataInputStream); + } + return new ImmutablePair<>(markersFilePathStr, markers); + }, actualParallelism); + } + } + return new HashMap<>(); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + } +} diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java index 7e5bdd818..e793c2043 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java @@ -23,8 +23,8 @@ import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.timeline.service.TimelineService; -import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationFuture; import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationDispatchingRunnable; +import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationFuture; import org.apache.hudi.timeline.service.handlers.marker.MarkerDirState; import io.javalin.Context; diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java index 47c0c5971..9075aae8e 100644 --- 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java @@ -21,15 +21,15 @@ package org.apache.hudi.timeline.service.handlers.marker; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.metrics.Registry; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.MarkerUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -38,10 +38,8 @@ import org.apache.hadoop.util.StringUtils; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.IOException; -import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.Serializable; import java.nio.charset.StandardCharsets; @@ -56,6 +54,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.hudi.common.util.FileIOUtils.closeQuietly; +import static org.apache.hudi.common.util.MarkerUtils.MARKERS_FILENAME_PREFIX; import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult; /** @@ -64,7 +63,6 @@ import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult; * The operations inside this class is designed to be thread-safe. 
*/ public class MarkerDirState implements Serializable { - public static final String MARKERS_FILENAME_PREFIX = "MARKERS"; private static final Logger LOG = LogManager.getLogger(MarkerDirState.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // Marker directory @@ -89,6 +87,7 @@ public class MarkerDirState implements Serializable { // Last underlying file index used, for finding the next file index // in a round-robin fashion private int lastFileIndexUsed = -1; + private boolean isMarkerTypeWritten = false; public MarkerDirState(String markerDirPath, int markerBatchNumThreads, FileSystem fileSystem, Registry metricsRegistry, HoodieEngineContext hoodieEngineContext, int parallelism) { @@ -104,7 +103,7 @@ public class MarkerDirState implements Serializable { } /** - * @return {@code true} if the marker directory exists in the system. + * @return {@code true} if the marker directory exists in the system. */ public boolean exists() { try { @@ -212,6 +211,12 @@ public class MarkerDirState implements Serializable { } future.setResult(!exists); } + + if (!isMarkerTypeWritten) { + // Create marker directory and write marker type to MARKERS.type + writeMarkerTypeToFile(); + isMarkerTypeWritten = true; + } } flushMarkersToFile(fileIndex); markFileAsAvailable(fileIndex); @@ -266,62 +271,49 @@ public class MarkerDirState implements Serializable { * Syncs all markers maintained in the underlying files under the marker directory in the file system. 
*/ private void syncMarkersFromFileSystem() { - Path dirPath = new Path(markerDirPath); - try { - if (fileSystem.exists(dirPath)) { - FileStatus[] fileStatuses = fileSystem.listStatus(dirPath); - List<String> markerDirSubPaths = Arrays.stream(fileStatuses) - .map(fileStatus -> fileStatus.getPath().toString()) - .filter(pathStr -> pathStr.contains(MARKERS_FILENAME_PREFIX)) - .collect(Collectors.toList()); + Map<String, Set<String>> fileMarkersSetMap = MarkerUtils.readTimelineServerBasedMarkersFromFileSystem( + markerDirPath, fileSystem, hoodieEngineContext, parallelism); - if (markerDirSubPaths.size() > 0) { - SerializableConfiguration conf = new SerializableConfiguration(fileSystem.getConf()); - int actualParallelism = Math.min(markerDirSubPaths.size(), parallelism); - Map<String, Set<String>> fileMarkersSetMap = - hoodieEngineContext.mapToPair(markerDirSubPaths, markersFilePathStr -> { - Path markersFilePath = new Path(markersFilePathStr); - FileSystem fileSystem = markersFilePath.getFileSystem(conf.get()); - FSDataInputStream fsDataInputStream = null; - BufferedReader bufferedReader = null; - Set<String> markers = new HashSet<>(); - try { - LOG.debug("Read marker file: " + markersFilePathStr); - fsDataInputStream = fileSystem.open(markersFilePath); - bufferedReader = new BufferedReader(new InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8)); - markers = bufferedReader.lines().collect(Collectors.toSet()); - bufferedReader.close(); - fsDataInputStream.close(); - } catch (IOException e) { - throw new HoodieIOException("Failed to read MARKERS file " + markerDirPath, e); - } finally { - closeQuietly(bufferedReader); - closeQuietly(fsDataInputStream); - } - return new ImmutablePair<>(markersFilePathStr, markers); - }, actualParallelism); - - for (String markersFilePathStr: fileMarkersSetMap.keySet()) { - Set<String> fileMarkers = fileMarkersSetMap.get(markersFilePathStr); - if (!fileMarkers.isEmpty()) { - int index = parseMarkerFileIndex(markersFilePathStr); - - if (index >= 0) { - fileMarkersMap.put(index, new 
StringBuilder(StringUtils.join(",", fileMarkers))); - allMarkers.addAll(fileMarkers); - } - } - } + for (String markersFilePathStr : fileMarkersSetMap.keySet()) { + Set<String> fileMarkers = fileMarkersSetMap.get(markersFilePathStr); + if (!fileMarkers.isEmpty()) { + int index = parseMarkerFileIndex(markersFilePathStr); + if (index >= 0) { + fileMarkersMap.put(index, new StringBuilder(StringUtils.join(",", fileMarkers))); + allMarkers.addAll(fileMarkers); + } } } - } catch (IOException ioe) { - throw new HoodieIOException(ioe.getMessage(), ioe); + } + + try { + if (MarkerUtils.doesMarkerTypeFileExist(fileSystem, markerDirPath)) { + isMarkerTypeWritten = true; + } + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + } + + /** + * Writes marker type, "TIMELINE_SERVER_BASED", to file. + */ + private void writeMarkerTypeToFile() { + Path dirPath = new Path(markerDirPath); + try { + if (!fileSystem.exists(dirPath)) { + // There is no existing marker directory, create a new directory and write marker type + fileSystem.mkdirs(dirPath); + MarkerUtils.writeMarkerTypeToFile(MarkerType.TIMELINE_SERVER_BASED, fileSystem, markerDirPath); + } + } catch (IOException e) { + throw new HoodieIOException("Failed to write marker type file in " + markerDirPath + + ": " + e.getMessage(), e); } } /** * Parses the marker file index from the marker file path. - * + *

* E.g., if the marker file path is /tmp/table/.hoodie/.temp/000/MARKERS3, the index returned is 3. * * @param markerFilePathStr full path of marker file @@ -350,14 +342,6 @@ public class MarkerDirState implements Serializable { LOG.debug("Write to " + markerDirPath + "/" + MARKERS_FILENAME_PREFIX + markerFileIndex); HoodieTimer timer = new HoodieTimer().startTimer(); Path markersFilePath = new Path(markerDirPath, MARKERS_FILENAME_PREFIX + markerFileIndex); - Path dirPath = markersFilePath.getParent(); - try { - if (!fileSystem.exists(dirPath)) { - fileSystem.mkdirs(dirPath); - } - } catch (IOException e) { - throw new HoodieIOException("Failed to make dir " + dirPath, e); - } FSDataOutputStream fsDataOutputStream = null; BufferedWriter bufferedWriter = null; try {