diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java index 1a6ad9e7d..20f9b75a9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java @@ -18,15 +18,15 @@ package org.apache.hudi.client.embedded; -import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.util.NetworkUtils; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.marker.MarkerType; import org.apache.hudi.timeline.service.TimelineService; import org.apache.log4j.LogManager; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 9ba4fab47..122b90f80 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -30,11 +30,12 @@ import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.model.HoodieCleaningPolicy; -import org.apache.hudi.common.model.HoodieFileFormat; import 
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.util.ReflectionUtils; @@ -46,7 +47,6 @@ import org.apache.hudi.metrics.MetricsReporterType; import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite; import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; -import org.apache.hudi.table.marker.MarkerType; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.orc.CompressionKind; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java index cf627d057..1223cb75b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.MarkerUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -149,7 +150,7 @@ public class DirectWriteMarkers extends WriteMarkers { } private String translateMarkerToDataPath(String markerPath) { - String rPath = 
stripMarkerFolderPrefix(markerPath); + String rPath = MarkerUtils.stripMarkerFolderPrefix(markerPath, basePath, instantTime); return stripMarkerSuffix(rPath); } @@ -158,7 +159,7 @@ public class DirectWriteMarkers extends WriteMarkers { Set markerFiles = new HashSet<>(); if (doesMarkerDirExist()) { FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> { - markerFiles.add(stripMarkerFolderPrefix(fileStatus.getPath().toString())); + markerFiles.add(MarkerUtils.stripMarkerFolderPrefix(fileStatus.getPath().toString(), basePath, instantTime)); return true; }, false); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerBasedRollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerBasedRollbackUtils.java new file mode 100644 index 000000000..9d1f37abd --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerBasedRollbackUtils.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.table.marker; + +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.table.marker.MarkerType; +import org.apache.hudi.common.util.MarkerUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.FileSystem; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * A utility class for marker-based rollback. + */ +public class MarkerBasedRollbackUtils { + /** + * Gets all marker paths. + * + * @param table instance of {@code HoodieTable} to use + * @param context instance of {@code HoodieEngineContext} to use + * @param instant instant of interest to rollback + * @param parallelism parallelism to use + * @return a list of all markers + * @throws IOException + */ + public static List getAllMarkerPaths(HoodieTable table, HoodieEngineContext context, + String instant, int parallelism) throws IOException { + String markerDir = table.getMetaClient().getMarkerFolderPath(instant); + FileSystem fileSystem = table.getMetaClient().getFs(); + Option markerTypeOption = MarkerUtils.readMarkerType(fileSystem, markerDir); + + // If there is no marker type file "MARKERS.type", we assume "DIRECT" markers are used + if (!markerTypeOption.isPresent()) { + WriteMarkers writeMarkers = WriteMarkersFactory.get(MarkerType.DIRECT, table, instant); + return new ArrayList<>(writeMarkers.allMarkerFilePaths()); + } + + switch (markerTypeOption.get()) { + case TIMELINE_SERVER_BASED: + // Reads all markers written by the timeline server + Map> markersMap = + MarkerUtils.readTimelineServerBasedMarkersFromFileSystem( + markerDir, fileSystem, context, parallelism); + return markersMap.values().stream().flatMap(Collection::stream) + 
.collect(Collectors.toCollection(ArrayList::new)); + default: + throw new HoodieException( + "The marker type \"" + markerTypeOption.get().name() + "\" is not supported."); + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java index 0d1dba8a2..c243b9b7c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java @@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieIOException; import org.apache.hadoop.fs.Path; @@ -127,23 +126,6 @@ public abstract class WriteMarkers implements Serializable { return new Path(path, markerFileName); } - /** - * Strips the folder prefix of the marker file path. - * - * @param fullMarkerPath the full path of the marker file - * @return marker file name - */ - protected String stripMarkerFolderPrefix(String fullMarkerPath) { - ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN)); - String markerRootPath = Path.getPathWithoutSchemeAndAuthority( - new Path(String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString(); - int begin = - fullMarkerPath.indexOf(markerRootPath); - ValidationUtils.checkArgument(begin >= 0, - "Not in marker dir. Marker Path=" + fullMarkerPath + ", Expected Marker Root=" + markerRootPath); - return fullMarkerPath.substring(begin + markerRootPath.length() + 1); - } - /** * Deletes the marker directory. 
* diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java index d8ec89db0..044b258e8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java @@ -20,6 +20,7 @@ package org.apache.hudi.table.marker; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.StorageSchemes; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.HoodieTable; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java index f664fba1b..bb7ec7600 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java @@ -32,11 +32,10 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.marker.MarkerBasedRollbackUtils; import org.apache.hudi.table.marker.WriteMarkers; -import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hadoop.fs.FileStatus; -import java.util.ArrayList; import java.io.IOException; import java.util.List; @@ -54,8 +53,9 @@ public class FlinkMarkerBasedRollbackStrategy ext @Override public List execute(HoodieInstant instantToRollback) { try { - WriteMarkers writeMarkers = 
WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp()); - List rollbackStats = context.map(new ArrayList<>(writeMarkers.allMarkerFilePaths()), markerFilePath -> { + List markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths( + table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism()); + List rollbackStats = context.map(markerPaths, markerFilePath -> { String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1); IOType type = IOType.valueOf(typeStr); switch (type) { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java index 61a294a74..3996765ca 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java @@ -19,11 +19,12 @@ package org.apache.hudi.table.upgrade; import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -37,7 +38,6 @@ import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest; import org.apache.hudi.table.action.rollback.RollbackUtils; import org.apache.hudi.table.marker.WriteMarkers; import org.apache.hudi.table.marker.WriteMarkersFactory; -import 
org.apache.hudi.table.marker.MarkerType; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java index eed1774e2..150f663cf 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java @@ -30,10 +30,9 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.marker.MarkerBasedRollbackUtils; import org.apache.hudi.table.marker.WriteMarkers; -import org.apache.hudi.table.marker.WriteMarkersFactory; -import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -49,9 +48,9 @@ public class JavaMarkerBasedRollbackStrategy exte @Override public List execute(HoodieInstant instantToRollback) { try { - WriteMarkers writeMarkers = - WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp()); - List rollbackStats = context.map(new ArrayList<>(writeMarkers.allMarkerFilePaths()), markerFilePath -> { + List markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths( + table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism()); + List rollbackStats = context.map(markerPaths, markerFilePath -> { String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1); IOType type = IOType.valueOf(typeStr); switch (type) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java index ba6bf1fe4..0adacd28c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java @@ -33,15 +33,14 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.marker.MarkerBasedRollbackUtils; import org.apache.hudi.table.marker.WriteMarkers; -import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hadoop.fs.FileStatus; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -58,11 +57,11 @@ public class SparkMarkerBasedRollbackStrategy ext public List execute(HoodieInstant instantToRollback) { JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); try { - WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp()); - List markerFilePaths = new ArrayList<>(writeMarkers.allMarkerFilePaths()); - int parallelism = Math.max(Math.min(markerFilePaths.size(), config.getRollbackParallelism()), 1); + List markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths( + table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism()); + int parallelism = Math.max(Math.min(markerPaths.size(), config.getRollbackParallelism()), 1); jsc.setJobGroup(this.getClass().getSimpleName(), "Rolling back using marker files"); - return jsc.parallelize(markerFilePaths, parallelism) + return 
jsc.parallelize(markerPaths, parallelism) .map(markerFilePath -> { String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1); IOType type = IOType.valueOf(typeStr); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java index ad951959b..8470b9685 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -33,13 +34,12 @@ import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper; import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hudi.table.action.rollback.RollbackUtils; import org.apache.hudi.table.marker.WriteMarkers; import org.apache.hudi.table.marker.WriteMarkersFactory; -import org.apache.hudi.table.marker.MarkerType; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import java.util.List; import java.util.stream.Collectors; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestTimelineServerBasedWriteMarkers.java 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestTimelineServerBasedWriteMarkers.java index 3300c95fe..583883ccb 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestTimelineServerBasedWriteMarkers.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestTimelineServerBasedWriteMarkers.java @@ -23,14 +23,15 @@ import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; -import org.apache.hudi.common.testutils.FileSystemTestUtils; import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.MarkerUtils; import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hudi.timeline.service.TimelineService; -import org.apache.hudi.timeline.service.handlers.marker.MarkerDirState; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -40,16 +41,15 @@ import org.apache.spark.api.java.JavaSparkContext; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import java.io.BufferedReader; import java.io.Closeable; import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; +import java.util.Collection; import java.util.List; import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertIterableEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class 
TestTimelineServerBasedWriteMarkers extends TestWriteMarkersBase { TimelineService timelineService; @@ -94,26 +94,10 @@ public class TestTimelineServerBasedWriteMarkers extends TestWriteMarkersBase { @Override void verifyMarkersInFileSystem() throws IOException { - List allMarkers = FileSystemTestUtils.listRecursive(fs, markerFolderPath) - .stream().filter(status -> status.getPath().getName().contains(MarkerDirState.MARKERS_FILENAME_PREFIX)) - .flatMap(status -> { - // Read all markers stored in each marker file maintained by the timeline service - FSDataInputStream fsDataInputStream = null; - BufferedReader bufferedReader = null; - List markers = null; - try { - fsDataInputStream = fs.open(status.getPath()); - bufferedReader = new BufferedReader(new InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8)); - markers = bufferedReader.lines().collect(Collectors.toList()); - } catch (IOException e) { - e.printStackTrace(); - } finally { - closeQuietly(bufferedReader); - closeQuietly(fsDataInputStream); - } - return markers.stream(); - }) - .sorted() + // Verifies the markers + List allMarkers = MarkerUtils.readTimelineServerBasedMarkersFromFileSystem( + markerFolderPath.toString(), fs, context, 1) + .values().stream().flatMap(Collection::stream).sorted() .collect(Collectors.toList()); assertEquals(3, allMarkers.size()); assertIterableEquals(CollectionUtils.createImmutableList( @@ -121,6 +105,13 @@ public class TestTimelineServerBasedWriteMarkers extends TestWriteMarkersBase { "2020/06/02/file2.marker.APPEND", "2020/06/03/file3.marker.CREATE"), allMarkers); + // Verifies the marker type file + Path markerTypeFilePath = new Path(markerFolderPath, MarkerUtils.MARKER_TYPE_FILENAME); + assertTrue(MarkerUtils.doesMarkerTypeFileExist(fs, markerFolderPath.toString())); + FSDataInputStream fsDataInputStream = fs.open(markerTypeFilePath); + assertEquals(MarkerType.TIMELINE_SERVER_BASED.toString(), + FileIOUtils.readAsUTFString(fsDataInputStream)); + 
closeQuietly(fsDataInputStream); } /** diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java index b3ad41754..0298ed37a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.testutils.FileSystemTestUtils; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.MarkerUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hadoop.fs.FileSystem; @@ -100,8 +101,10 @@ public abstract class TestWriteMarkersBase extends HoodieCommonTestHarness { createSomeMarkers(); // add invalid file createInvalidFile("2020/06/01", "invalid_file3"); - int fileSize = FileSystemTestUtils.listRecursive(fs, markerFolderPath).size(); - assertEquals(fileSize,4); + long fileSize = FileSystemTestUtils.listRecursive(fs, markerFolderPath).stream() + .filter(fileStatus -> !fileStatus.getPath().getName().contains(MarkerUtils.MARKER_TYPE_FILENAME)) + .count(); + assertEquals(fileSize, 4); // then assertIterableEquals(CollectionUtils.createImmutableList( @@ -118,7 +121,9 @@ public abstract class TestWriteMarkersBase extends HoodieCommonTestHarness { // then assertIterableEquals(CollectionUtils.createImmutableList("2020/06/01/file1.marker.MERGE", "2020/06/02/file2.marker.APPEND", "2020/06/03/file3.marker.CREATE"), - writeMarkers.allMarkerFilePaths().stream().sorted().collect(Collectors.toList()) + writeMarkers.allMarkerFilePaths().stream() + .filter(path -> !path.contains(MarkerUtils.MARKER_TYPE_FILENAME)) + .sorted().collect(Collectors.toList()) ); } diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerType.java b/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerType.java similarity index 57% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerType.java rename to hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerType.java index 06d7903eb..2b0a28df8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerType.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerType.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.table.marker; +package org.apache.hudi.common.table.marker; /** * Marker type indicating how markers are stored in the file system. 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java new file mode 100644 index 000000000..9d0965922 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.common.util; + +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.marker.MarkerType; +import org.apache.hudi.common.util.collection.ImmutablePair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.util.FileIOUtils.closeQuietly; + +/** + * A utility class for marker related operations. + */ +public class MarkerUtils { + public static final String MARKERS_FILENAME_PREFIX = "MARKERS"; + public static final String MARKER_TYPE_FILENAME = MARKERS_FILENAME_PREFIX + ".type"; + private static final Logger LOG = LogManager.getLogger(MarkerUtils.class); + + /** + * Strips the folder prefix of the marker file path corresponding to a data file. 
+ * + * @param fullMarkerPath the full path of the marker file + * @param basePath the base path + * @param instantTime instant of interest + * @return marker file name + */ + public static String stripMarkerFolderPrefix(String fullMarkerPath, String basePath, String instantTime) { + ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN)); + String markerRootPath = Path.getPathWithoutSchemeAndAuthority( + new Path(String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString(); + return stripMarkerFolderPrefix(fullMarkerPath, markerRootPath); + } + + /** + * Strips the marker folder prefix of any file path under the marker directory. + * + * @param fullMarkerPath the full path of the file + * @param markerDir marker directory + * @return file name + */ + public static String stripMarkerFolderPrefix(String fullMarkerPath, String markerDir) { + int begin = fullMarkerPath.indexOf(markerDir); + ValidationUtils.checkArgument(begin >= 0, + "Not in marker dir. Marker Path=" + fullMarkerPath + ", Expected Marker Root=" + markerDir); + return fullMarkerPath.substring(begin + markerDir.length() + 1); + } + + /** + * @param fileSystem file system to use. + * @param markerDir marker directory. + * @return {@code true} if the MARKERS.type file exists; {@code false} otherwise. + */ + public static boolean doesMarkerTypeFileExist(FileSystem fileSystem, String markerDir) throws IOException { + return fileSystem.exists(new Path(markerDir, MARKER_TYPE_FILENAME)); + } + + /** + * Reads the marker type from `MARKERS.type` file. + * + * @param fileSystem file system to use. + * @param markerDir marker directory. + * @return the marker type, or empty if the marker type file does not exist. 
+ */ + public static Option readMarkerType(FileSystem fileSystem, String markerDir) { + Path markerTypeFilePath = new Path(markerDir, MARKER_TYPE_FILENAME); + FSDataInputStream fsDataInputStream = null; + Option content = Option.empty(); + try { + if (!doesMarkerTypeFileExist(fileSystem, markerDir)) { + return Option.empty(); + } + fsDataInputStream = fileSystem.open(markerTypeFilePath); + content = Option.of(MarkerType.valueOf(FileIOUtils.readAsUTFString(fsDataInputStream))); + } catch (IOException e) { + throw new HoodieIOException("Cannot read marker type file " + markerTypeFilePath.toString() + + "; " + e.getMessage(), e); + } finally { + closeQuietly(fsDataInputStream); + } + return content; + } + + /** + * Writes the marker type to the file `MARKERS.type`. + * + * @param markerType marker type. + * @param fileSystem file system to use. + * @param markerDir marker directory. + */ + public static void writeMarkerTypeToFile(MarkerType markerType, FileSystem fileSystem, String markerDir) { + Path markerTypeFilePath = new Path(markerDir, MARKER_TYPE_FILENAME); + FSDataOutputStream fsDataOutputStream = null; + BufferedWriter bufferedWriter = null; + try { + fsDataOutputStream = fileSystem.create(markerTypeFilePath, false); + bufferedWriter = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)); + bufferedWriter.write(markerType.toString()); + } catch (IOException e) { + throw new HoodieException("Failed to create marker type file " + markerTypeFilePath.toString() + + "; " + e.getMessage(), e); + } finally { + closeQuietly(bufferedWriter); + closeQuietly(fsDataOutputStream); + } + } + + /** + * Deletes `MARKERS.type` file. + * + * @param fileSystem file system to use. + * @param markerDir marker directory. 
+ */ + public static void deleteMarkerTypeFile(FileSystem fileSystem, String markerDir) { + Path markerTypeFilePath = new Path(markerDir, MARKER_TYPE_FILENAME); + try { + fileSystem.delete(markerTypeFilePath, false); + } catch (IOException e) { + throw new HoodieIOException("Cannot delete marker type file " + markerTypeFilePath.toString() + + "; " + e.getMessage(), e); + } + } + + /** + * Reads files containing the markers written by timeline-server-based marker mechanism. + * + * @param markerDir marker directory. + * @param fileSystem file system to use. + * @param context instance of {@link HoodieEngineContext} to use + * @param parallelism parallelism to use + * @return A {@code Map} of file name to the set of markers stored in the file. + */ + public static Map<String, Set<String>> readTimelineServerBasedMarkersFromFileSystem( + String markerDir, FileSystem fileSystem, HoodieEngineContext context, int parallelism) { + Path dirPath = new Path(markerDir); + try { + if (fileSystem.exists(dirPath)) { + FileStatus[] fileStatuses = fileSystem.listStatus(dirPath); + Predicate<String> prefixFilter = pathStr -> pathStr.contains(MARKERS_FILENAME_PREFIX); + Predicate<String> markerTypeFilter = + pathStr -> !stripMarkerFolderPrefix(pathStr, markerDir).equals(MARKER_TYPE_FILENAME); + List<String> markerDirSubPaths = Arrays.stream(fileStatuses) + .map(fileStatus -> fileStatus.getPath().toString()) + .filter(prefixFilter.and(markerTypeFilter)) + .collect(Collectors.toList()); + + if (markerDirSubPaths.size() > 0) { + SerializableConfiguration conf = new SerializableConfiguration(fileSystem.getConf()); + int actualParallelism = Math.min(markerDirSubPaths.size(), parallelism); + return context.mapToPair(markerDirSubPaths, markersFilePathStr -> { + Path markersFilePath = new Path(markersFilePathStr); + FileSystem fs = markersFilePath.getFileSystem(conf.get()); + FSDataInputStream fsDataInputStream = null; + BufferedReader bufferedReader = null; + Set<String> markers = new HashSet<>(); + try { + LOG.debug("Read marker file: " + 
markersFilePathStr); + fsDataInputStream = fs.open(markersFilePath); + bufferedReader = new BufferedReader(new InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8)); + markers = bufferedReader.lines().collect(Collectors.toSet()); + } catch (IOException e) { + throw new HoodieIOException("Failed to read file " + markersFilePathStr, e); + } finally { + closeQuietly(bufferedReader); + closeQuietly(fsDataInputStream); + } + return new ImmutablePair<>(markersFilePathStr, markers); + }, actualParallelism); + } + } + return new HashMap<>(); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + } +} diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java index 7e5bdd818..e793c2043 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java @@ -23,8 +23,8 @@ import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.timeline.service.TimelineService; -import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationFuture; import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationDispatchingRunnable; +import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationFuture; import org.apache.hudi.timeline.service.handlers.marker.MarkerDirState; import io.javalin.Context; diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java index 47c0c5971..9075aae8e 100644 --- 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java @@ -21,15 +21,15 @@ package org.apache.hudi.timeline.service.handlers.marker; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.metrics.Registry; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.MarkerUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -38,10 +38,8 @@ import org.apache.hadoop.util.StringUtils; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.IOException; -import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.Serializable; import java.nio.charset.StandardCharsets; @@ -56,6 +54,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.hudi.common.util.FileIOUtils.closeQuietly; +import static org.apache.hudi.common.util.MarkerUtils.MARKERS_FILENAME_PREFIX; import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult; /** @@ -64,7 +63,6 @@ import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult; * The operations inside this class is designed to be thread-safe. 
*/ public class MarkerDirState implements Serializable { - public static final String MARKERS_FILENAME_PREFIX = "MARKERS"; private static final Logger LOG = LogManager.getLogger(MarkerDirState.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // Marker directory @@ -89,6 +87,7 @@ public class MarkerDirState implements Serializable { // Last underlying file index used, for finding the next file index // in a round-robin fashion private int lastFileIndexUsed = -1; + private boolean isMarkerTypeWritten = false; public MarkerDirState(String markerDirPath, int markerBatchNumThreads, FileSystem fileSystem, Registry metricsRegistry, HoodieEngineContext hoodieEngineContext, int parallelism) { @@ -104,7 +103,7 @@ public class MarkerDirState implements Serializable { } /** - * @return {@code true} if the marker directory exists in the system. + * @return {@code true} if the marker directory exists in the system. */ public boolean exists() { try { @@ -212,6 +211,12 @@ public class MarkerDirState implements Serializable { } future.setResult(!exists); } + + if (!isMarkerTypeWritten) { + // Create marker directory and write marker type to MARKERS.type + writeMarkerTypeToFile(); + isMarkerTypeWritten = true; + } } flushMarkersToFile(fileIndex); markFileAsAvailable(fileIndex); @@ -266,62 +271,49 @@ public class MarkerDirState implements Serializable { * Syncs all markers maintained in the underlying files under the marker directory in the file system. 
*/ private void syncMarkersFromFileSystem() { - Path dirPath = new Path(markerDirPath); - try { - if (fileSystem.exists(dirPath)) { - FileStatus[] fileStatuses = fileSystem.listStatus(dirPath); - List<String> markerDirSubPaths = Arrays.stream(fileStatuses) - .map(fileStatus -> fileStatus.getPath().toString()) - .filter(pathStr -> pathStr.contains(MARKERS_FILENAME_PREFIX)) - .collect(Collectors.toList()); + Map<String, Set<String>> fileMarkersSetMap = MarkerUtils.readTimelineServerBasedMarkersFromFileSystem( + markerDirPath, fileSystem, hoodieEngineContext, parallelism); - if (markerDirSubPaths.size() > 0) { - SerializableConfiguration conf = new SerializableConfiguration(fileSystem.getConf()); - int actualParallelism = Math.min(markerDirSubPaths.size(), parallelism); - Map<String, Set<String>> fileMarkersSetMap = - hoodieEngineContext.mapToPair(markerDirSubPaths, markersFilePathStr -> { - Path markersFilePath = new Path(markersFilePathStr); - FileSystem fileSystem = markersFilePath.getFileSystem(conf.get()); - FSDataInputStream fsDataInputStream = null; - BufferedReader bufferedReader = null; - Set<String> markers = new HashSet<>(); - try { - LOG.debug("Read marker file: " + markersFilePathStr); - fsDataInputStream = fileSystem.open(markersFilePath); - bufferedReader = new BufferedReader(new InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8)); - markers = bufferedReader.lines().collect(Collectors.toSet()); - bufferedReader.close(); - fsDataInputStream.close(); - } catch (IOException e) { - throw new HoodieIOException("Failed to read MARKERS file " + markerDirPath, e); - } finally { - closeQuietly(bufferedReader); - closeQuietly(fsDataInputStream); - } - return new ImmutablePair<>(markersFilePathStr, markers); - }, actualParallelism); - - for (String markersFilePathStr: fileMarkersSetMap.keySet()) { - Set<String> fileMarkers = fileMarkersSetMap.get(markersFilePathStr); - if (!fileMarkers.isEmpty()) { - int index = parseMarkerFileIndex(markersFilePathStr); - - if (index >= 0) { - fileMarkersMap.put(index, new 
StringBuilder(StringUtils.join(",", fileMarkers))); - allMarkers.addAll(fileMarkers); - } - } - } + for (String markersFilePathStr : fileMarkersSetMap.keySet()) { + Set<String> fileMarkers = fileMarkersSetMap.get(markersFilePathStr); + if (!fileMarkers.isEmpty()) { + int index = parseMarkerFileIndex(markersFilePathStr); + if (index >= 0) { + fileMarkersMap.put(index, new StringBuilder(StringUtils.join(",", fileMarkers))); + allMarkers.addAll(fileMarkers); + } } } - } catch (IOException ioe) { - throw new HoodieIOException(ioe.getMessage(), ioe); + } + + try { + if (MarkerUtils.doesMarkerTypeFileExist(fileSystem, markerDirPath)) { + isMarkerTypeWritten = true; + } + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + } + + /** + * Writes marker type, "TIMELINE_SERVER_BASED", to file. + */ + private void writeMarkerTypeToFile() { + Path dirPath = new Path(markerDirPath); + try { + if (!fileSystem.exists(dirPath)) { + // There is no existing marker directory, create a new directory and write marker type + fileSystem.mkdirs(dirPath); + MarkerUtils.writeMarkerTypeToFile(MarkerType.TIMELINE_SERVER_BASED, fileSystem, markerDirPath); + } + } catch (IOException e) { + throw new HoodieIOException("Failed to write marker type file in " + markerDirPath + + ": " + e.getMessage(), e); + } } /** * Parses the marker file index from the marker file path. - * + * <p>

* E.g., if the marker file path is /tmp/table/.hoodie/.temp/000/MARKERS3, the index returned is 3. * * @param markerFilePathStr full path of marker file @@ -350,14 +342,6 @@ public class MarkerDirState implements Serializable { LOG.debug("Write to " + markerDirPath + "/" + MARKERS_FILENAME_PREFIX + markerFileIndex); HoodieTimer timer = new HoodieTimer().startTimer(); Path markersFilePath = new Path(markerDirPath, MARKERS_FILENAME_PREFIX + markerFileIndex); - Path dirPath = markersFilePath.getParent(); - try { - if (!fileSystem.exists(dirPath)) { - fileSystem.mkdirs(dirPath); - } - } catch (IOException e) { - throw new HoodieIOException("Failed to make dir " + dirPath, e); - } FSDataOutputStream fsDataOutputStream = null; BufferedWriter bufferedWriter = null; try {