1
0

[HUDI-2305] Add MARKERS.type and fix marker-based rollback (#3472)

- Rollback infers the directory structure and performs the rollback based on the marker strategy that was used when the markers were written. The "write markers type" setting in the write config determines the marker strategy only for new writes.
This commit is contained in:
Y Ethan Guo
2021-08-14 05:18:49 -07:00
committed by GitHub
parent b7da6cb33d
commit 9056c68744
17 changed files with 403 additions and 143 deletions

View File

@@ -18,15 +18,15 @@
package org.apache.hudi.client.embedded;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.util.NetworkUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.marker.MarkerType;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.log4j.LogManager;

View File

@@ -30,11 +30,12 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -46,7 +47,6 @@ import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.hudi.table.marker.MarkerType;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.orc.CompressionKind;

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -149,7 +150,7 @@ public class DirectWriteMarkers extends WriteMarkers {
}
private String translateMarkerToDataPath(String markerPath) {
String rPath = stripMarkerFolderPrefix(markerPath);
String rPath = MarkerUtils.stripMarkerFolderPrefix(markerPath, basePath, instantTime);
return stripMarkerSuffix(rPath);
}
@@ -158,7 +159,7 @@ public class DirectWriteMarkers extends WriteMarkers {
Set<String> markerFiles = new HashSet<>();
if (doesMarkerDirExist()) {
FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> {
markerFiles.add(stripMarkerFolderPrefix(fileStatus.getPath().toString()));
markerFiles.add(MarkerUtils.stripMarkerFolderPrefix(fileStatus.getPath().toString(), basePath, instantTime));
return true;
}, false);
}

View File

@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.marker;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Utility methods supporting marker-based rollback.
 */
public class MarkerBasedRollbackUtils {

  /**
   * Collects the paths of all markers written for the given instant, regardless of
   * which marker mechanism (direct or timeline-server-based) produced them.
   *
   * @param table       instance of {@code HoodieTable} to use
   * @param context     instance of {@code HoodieEngineContext} to use
   * @param instant     instant of interest to rollback
   * @param parallelism parallelism to use
   * @return a list of all markers
   * @throws IOException if the marker directory cannot be read
   */
  public static List<String> getAllMarkerPaths(HoodieTable table, HoodieEngineContext context,
                                               String instant, int parallelism) throws IOException {
    String markerDir = table.getMetaClient().getMarkerFolderPath(instant);
    FileSystem fs = table.getMetaClient().getFs();
    Option<MarkerType> markerType = MarkerUtils.readMarkerType(fs, markerDir);

    // No "MARKERS.type" file means the markers were written directly as individual files.
    if (!markerType.isPresent()) {
      WriteMarkers directMarkers = WriteMarkersFactory.get(MarkerType.DIRECT, table, instant);
      return new ArrayList<>(directMarkers.allMarkerFilePaths());
    }

    if (markerType.get() == MarkerType.TIMELINE_SERVER_BASED) {
      // The timeline server batches markers into a small number of files; read them all
      // back and flatten the per-file sets into a single list.
      Map<String, Set<String>> markersByFile =
          MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(
              markerDir, fs, context, parallelism);
      List<String> allMarkers = new ArrayList<>();
      for (Set<String> markers : markersByFile.values()) {
        allMarkers.addAll(markers);
      }
      return allMarkers;
    }

    // Any other recorded marker type is unexpected here.
    throw new HoodieException(
        "The marker type \"" + markerType.get().name() + "\" is not supported.");
  }
}

View File

@@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.fs.Path;
@@ -127,23 +126,6 @@ public abstract class WriteMarkers implements Serializable {
return new Path(path, markerFileName);
}
/**
* Strips the folder prefix of the marker file path.
*
* @param fullMarkerPath the full path of the marker file
* @return marker file name
*/
protected String stripMarkerFolderPrefix(String fullMarkerPath) {
ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN));
String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
new Path(String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString();
int begin =
fullMarkerPath.indexOf(markerRootPath);
ValidationUtils.checkArgument(begin >= 0,
"Not in marker dir. Marker Path=" + fullMarkerPath + ", Expected Marker Root=" + markerRootPath);
return fullMarkerPath.substring(begin + markerRootPath.length() + 1);
}
/**
* Deletes the marker directory.
*

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.table.marker;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTable;

View File

@@ -32,11 +32,10 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hadoop.fs.FileStatus;
import java.util.ArrayList;
import java.io.IOException;
import java.util.List;
@@ -54,8 +53,9 @@ public class FlinkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> ext
@Override
public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
try {
WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp());
List<HoodieRollbackStat> rollbackStats = context.map(new ArrayList<>(writeMarkers.allMarkerFilePaths()), markerFilePath -> {
List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
List<HoodieRollbackStat> rollbackStats = context.map(markerPaths, markerFilePath -> {
String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
IOType type = IOType.valueOf(typeStr);
switch (type) {

View File

@@ -19,11 +19,12 @@
package org.apache.hudi.table.upgrade;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -37,7 +38,6 @@ import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.marker.MarkerType;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

View File

@@ -30,10 +30,9 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -49,9 +48,9 @@ public class JavaMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> exte
@Override
public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
try {
WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp());
List<HoodieRollbackStat> rollbackStats = context.map(new ArrayList<>(writeMarkers.allMarkerFilePaths()), markerFilePath -> {
List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
List<HoodieRollbackStat> rollbackStats = context.map(markerPaths, markerFilePath -> {
String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
IOType type = IOType.valueOf(typeStr);
switch (type) {

View File

@@ -33,15 +33,14 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -58,11 +57,11 @@ public class SparkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> ext
public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
try {
WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp());
List<String> markerFilePaths = new ArrayList<>(writeMarkers.allMarkerFilePaths());
int parallelism = Math.max(Math.min(markerFilePaths.size(), config.getRollbackParallelism()), 1);
List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
int parallelism = Math.max(Math.min(markerPaths.size(), config.getRollbackParallelism()), 1);
jsc.setJobGroup(this.getClass().getSimpleName(), "Rolling back using marker files");
return jsc.parallelize(markerFilePaths, parallelism)
return jsc.parallelize(markerPaths, parallelism)
.map(markerFilePath -> {
String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
IOType type = IOType.valueOf(typeStr);

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -33,13 +34,12 @@ import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.marker.MarkerType;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import java.util.List;
import java.util.stream.Collectors;

View File

@@ -23,14 +23,15 @@ import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.testutils.FileSystemTestUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.hudi.timeline.service.handlers.marker.MarkerDirState;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -40,16 +41,15 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestTimelineServerBasedWriteMarkers extends TestWriteMarkersBase {
TimelineService timelineService;
@@ -94,26 +94,10 @@ public class TestTimelineServerBasedWriteMarkers extends TestWriteMarkersBase {
@Override
void verifyMarkersInFileSystem() throws IOException {
List<String> allMarkers = FileSystemTestUtils.listRecursive(fs, markerFolderPath)
.stream().filter(status -> status.getPath().getName().contains(MarkerDirState.MARKERS_FILENAME_PREFIX))
.flatMap(status -> {
// Read all markers stored in each marker file maintained by the timeline service
FSDataInputStream fsDataInputStream = null;
BufferedReader bufferedReader = null;
List<String> markers = null;
try {
fsDataInputStream = fs.open(status.getPath());
bufferedReader = new BufferedReader(new InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8));
markers = bufferedReader.lines().collect(Collectors.toList());
} catch (IOException e) {
e.printStackTrace();
} finally {
closeQuietly(bufferedReader);
closeQuietly(fsDataInputStream);
}
return markers.stream();
})
.sorted()
// Verifies the markers
List<String> allMarkers = MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(
markerFolderPath.toString(), fs, context, 1)
.values().stream().flatMap(Collection::stream).sorted()
.collect(Collectors.toList());
assertEquals(3, allMarkers.size());
assertIterableEquals(CollectionUtils.createImmutableList(
@@ -121,6 +105,13 @@ public class TestTimelineServerBasedWriteMarkers extends TestWriteMarkersBase {
"2020/06/02/file2.marker.APPEND",
"2020/06/03/file3.marker.CREATE"),
allMarkers);
// Verifies the marker type file
Path markerTypeFilePath = new Path(markerFolderPath, MarkerUtils.MARKER_TYPE_FILENAME);
assertTrue(MarkerUtils.doesMarkerTypeFileExist(fs, markerFolderPath.toString()));
FSDataInputStream fsDataInputStream = fs.open(markerTypeFilePath);
assertEquals(MarkerType.TIMELINE_SERVER_BASED.toString(),
FileIOUtils.readAsUTFString(fsDataInputStream));
closeQuietly(fsDataInputStream);
}
/**

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.testutils.FileSystemTestUtils;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hadoop.fs.FileSystem;
@@ -100,8 +101,10 @@ public abstract class TestWriteMarkersBase extends HoodieCommonTestHarness {
createSomeMarkers();
// add invalid file
createInvalidFile("2020/06/01", "invalid_file3");
int fileSize = FileSystemTestUtils.listRecursive(fs, markerFolderPath).size();
assertEquals(fileSize,4);
long fileSize = FileSystemTestUtils.listRecursive(fs, markerFolderPath).stream()
.filter(fileStatus -> !fileStatus.getPath().getName().contains(MarkerUtils.MARKER_TYPE_FILENAME))
.count();
assertEquals(fileSize, 4);
// then
assertIterableEquals(CollectionUtils.createImmutableList(
@@ -118,7 +121,9 @@ public abstract class TestWriteMarkersBase extends HoodieCommonTestHarness {
// then
assertIterableEquals(CollectionUtils.createImmutableList("2020/06/01/file1.marker.MERGE",
"2020/06/02/file2.marker.APPEND", "2020/06/03/file3.marker.CREATE"),
writeMarkers.allMarkerFilePaths().stream().sorted().collect(Collectors.toList())
writeMarkers.allMarkerFilePaths().stream()
.filter(path -> !path.contains(MarkerUtils.MARKER_TYPE_FILENAME))
.sorted().collect(Collectors.toList())
);
}

View File

@@ -7,16 +7,17 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.marker;
package org.apache.hudi.common.table.marker;
/**
* Marker type indicating how markers are stored in the file system.

View File

@@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.common.util;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.FileIOUtils.closeQuietly;
/**
 * A utility class for marker related operations: stripping marker path prefixes,
 * and reading/writing/deleting the {@code MARKERS.type} file that records which
 * {@link MarkerType} mechanism wrote the markers for an instant.
 */
public class MarkerUtils {
  // Prefix shared by all marker-tracking files the timeline server maintains under the
  // marker directory; "MARKERS.type" also starts with this prefix.
  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
  // Name of the file that records the marker type used for the instant.
  public static final String MARKER_TYPE_FILENAME = MARKERS_FILENAME_PREFIX + ".type";
  private static final Logger LOG = LogManager.getLogger(MarkerUtils.class);

  /**
   * Strips the folder prefix of the marker file path corresponding to a data file.
   *
   * @param fullMarkerPath the full path of the marker file
   * @param basePath the base path
   * @param instantTime instant of interest
   * @return marker file name
   */
  public static String stripMarkerFolderPrefix(String fullMarkerPath, String basePath, String instantTime) {
    ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN));
    // Marker root is "<basePath>/<TEMPFOLDER_NAME>/<instantTime>"; scheme/authority are
    // removed so the prefix match below also works on fully-qualified marker paths.
    String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
        new Path(String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString();
    return stripMarkerFolderPrefix(fullMarkerPath, markerRootPath);
  }

  /**
   * Strips the marker folder prefix of any file path under the marker directory.
   *
   * @param fullMarkerPath the full path of the file
   * @param markerDir marker directory
   * @return file name (the path relative to {@code markerDir})
   */
  public static String stripMarkerFolderPrefix(String fullMarkerPath, String markerDir) {
    int begin = fullMarkerPath.indexOf(markerDir);
    ValidationUtils.checkArgument(begin >= 0,
        "Not in marker dir. Marker Path=" + fullMarkerPath + ", Expected Marker Root=" + markerDir);
    // +1 skips the path separator following the marker-directory prefix.
    return fullMarkerPath.substring(begin + markerDir.length() + 1);
  }

  /**
   * @param fileSystem file system to use.
   * @param markerDir marker directory.
   * @return {@code true} if the MARKERS.type file exists; {@code false} otherwise.
   */
  public static boolean doesMarkerTypeFileExist(FileSystem fileSystem, String markerDir) throws IOException {
    return fileSystem.exists(new Path(markerDir, MARKER_TYPE_FILENAME));
  }

  /**
   * Reads the marker type from `MARKERS.type` file.
   *
   * @param fileSystem file system to use.
   * @param markerDir marker directory.
   * @return the marker type, or empty if the marker type file does not exist.
   */
  public static Option<MarkerType> readMarkerType(FileSystem fileSystem, String markerDir) {
    Path markerTypeFilePath = new Path(markerDir, MARKER_TYPE_FILENAME);
    FSDataInputStream fsDataInputStream = null;
    Option<MarkerType> content = Option.empty();
    try {
      // Missing file is not an error: it simply means no marker type was recorded.
      if (!doesMarkerTypeFileExist(fileSystem, markerDir)) {
        return Option.empty();
      }
      fsDataInputStream = fileSystem.open(markerTypeFilePath);
      // NOTE(review): MarkerType.valueOf throws IllegalArgumentException (uncaught here)
      // if the file content is not a valid enum name.
      content = Option.of(MarkerType.valueOf(FileIOUtils.readAsUTFString(fsDataInputStream)));
    } catch (IOException e) {
      throw new HoodieIOException("Cannot read marker type file " + markerTypeFilePath.toString()
          + "; " + e.getMessage(), e);
    } finally {
      closeQuietly(fsDataInputStream);
    }
    return content;
  }

  /**
   * Writes the marker type to the file `MARKERS.type`.
   *
   * @param markerType marker type.
   * @param fileSystem file system to use.
   * @param markerDir marker directory.
   */
  public static void writeMarkerTypeToFile(MarkerType markerType, FileSystem fileSystem, String markerDir) {
    Path markerTypeFilePath = new Path(markerDir, MARKER_TYPE_FILENAME);
    FSDataOutputStream fsDataOutputStream = null;
    BufferedWriter bufferedWriter = null;
    try {
      // overwrite=false: fails with IOException if the file already exists.
      fsDataOutputStream = fileSystem.create(markerTypeFilePath, false);
      bufferedWriter = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
      bufferedWriter.write(markerType.toString());
    } catch (IOException e) {
      throw new HoodieException("Failed to create marker type file " + markerTypeFilePath.toString()
          + "; " + e.getMessage(), e);
    } finally {
      closeQuietly(bufferedWriter);
      closeQuietly(fsDataOutputStream);
    }
  }

  /**
   * Deletes `MARKERS.type` file.
   *
   * @param fileSystem file system to use.
   * @param markerDir marker directory.
   */
  public static void deleteMarkerTypeFile(FileSystem fileSystem, String markerDir) {
    Path markerTypeFilePath = new Path(markerDir, MARKER_TYPE_FILENAME);
    try {
      // recursive=false: only the single file is deleted.
      fileSystem.delete(markerTypeFilePath, false);
    } catch (IOException e) {
      throw new HoodieIOException("Cannot delete marker type file " + markerTypeFilePath.toString()
          + "; " + e.getMessage(), e);
    }
  }

  /**
   * Reads files containing the markers written by timeline-server-based marker mechanism.
   *
   * @param markerDir marker directory.
   * @param fileSystem file system to use.
   * @param context instance of {@link HoodieEngineContext} to use
   * @param parallelism parallelism to use
   * @return A {@code Map} of file name to the set of markers stored in the file.
   */
  public static Map<String, Set<String>> readTimelineServerBasedMarkersFromFileSystem(
      String markerDir, FileSystem fileSystem, HoodieEngineContext context, int parallelism) {
    Path dirPath = new Path(markerDir);
    try {
      if (fileSystem.exists(dirPath)) {
        FileStatus[] fileStatuses = fileSystem.listStatus(dirPath);
        // Keep only files whose path contains the "MARKERS" prefix...
        Predicate<String> prefixFilter = pathStr -> pathStr.contains(MARKERS_FILENAME_PREFIX);
        // ...but exclude "MARKERS.type" itself, which stores the marker type, not markers.
        Predicate<String> markerTypeFilter =
            pathStr -> !stripMarkerFolderPrefix(pathStr, markerDir).equals(MARKER_TYPE_FILENAME);
        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
            .map(fileStatus -> fileStatus.getPath().toString())
            .filter(prefixFilter.and(markerTypeFilter))
            .collect(Collectors.toList());
        if (markerDirSubPaths.size() > 0) {
          SerializableConfiguration conf = new SerializableConfiguration(fileSystem.getConf());
          // Never spin up more tasks than there are marker files to read.
          int actualParallelism = Math.min(markerDirSubPaths.size(), parallelism);
          return context.mapToPair(markerDirSubPaths, markersFilePathStr -> {
            // Executed possibly on a remote executor: re-resolve the FileSystem from the
            // serialized configuration rather than capturing the non-serializable one.
            Path markersFilePath = new Path(markersFilePathStr);
            FileSystem fs = markersFilePath.getFileSystem(conf.get());
            FSDataInputStream fsDataInputStream = null;
            BufferedReader bufferedReader = null;
            Set<String> markers = new HashSet<>();
            try {
              LOG.debug("Read marker file: " + markersFilePathStr);
              fsDataInputStream = fs.open(markersFilePath);
              bufferedReader = new BufferedReader(new InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8));
              // One marker per line in each batched marker file.
              markers = bufferedReader.lines().collect(Collectors.toSet());
            } catch (IOException e) {
              throw new HoodieIOException("Failed to read file " + markersFilePathStr, e);
            } finally {
              closeQuietly(bufferedReader);
              closeQuietly(fsDataInputStream);
            }
            return new ImmutablePair<>(markersFilePathStr, markers);
          }, actualParallelism);
        }
      }
      // Marker directory missing or no marker files found.
      return new HashMap<>();
    } catch (IOException ioe) {
      throw new HoodieIOException(ioe.getMessage(), ioe);
    }
  }
}

View File

@@ -23,8 +23,8 @@ import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationFuture;
import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationDispatchingRunnable;
import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationFuture;
import org.apache.hudi.timeline.service.handlers.marker.MarkerDirState;
import io.javalin.Context;

View File

@@ -21,15 +21,15 @@ package org.apache.hudi.timeline.service.handlers.marker;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -38,10 +38,8 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
@@ -56,6 +54,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.util.FileIOUtils.closeQuietly;
import static org.apache.hudi.common.util.MarkerUtils.MARKERS_FILENAME_PREFIX;
import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
/**
@@ -64,7 +63,6 @@ import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
* The operations inside this class is designed to be thread-safe.
*/
public class MarkerDirState implements Serializable {
public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
private static final Logger LOG = LogManager.getLogger(MarkerDirState.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
// Marker directory
@@ -89,6 +87,7 @@ public class MarkerDirState implements Serializable {
// Last underlying file index used, for finding the next file index
// in a round-robin fashion
private int lastFileIndexUsed = -1;
private boolean isMarkerTypeWritten = false;
public MarkerDirState(String markerDirPath, int markerBatchNumThreads, FileSystem fileSystem,
Registry metricsRegistry, HoodieEngineContext hoodieEngineContext, int parallelism) {
@@ -104,7 +103,7 @@ public class MarkerDirState implements Serializable {
}
  /**
   * @return {@code true} if the marker directory exists in the system.
   */
public boolean exists() {
try {
@@ -212,6 +211,12 @@ public class MarkerDirState implements Serializable {
}
future.setResult(!exists);
}
if (!isMarkerTypeWritten) {
// Create marker directory and write marker type to MARKERS.type
writeMarkerTypeToFile();
isMarkerTypeWritten = true;
}
}
flushMarkersToFile(fileIndex);
markFileAsAvailable(fileIndex);
@@ -266,62 +271,49 @@ public class MarkerDirState implements Serializable {
* Syncs all markers maintained in the underlying files under the marker directory in the file system.
*/
private void syncMarkersFromFileSystem() {
Path dirPath = new Path(markerDirPath);
try {
if (fileSystem.exists(dirPath)) {
FileStatus[] fileStatuses = fileSystem.listStatus(dirPath);
List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
.map(fileStatus -> fileStatus.getPath().toString())
.filter(pathStr -> pathStr.contains(MARKERS_FILENAME_PREFIX))
.collect(Collectors.toList());
Map<String, Set<String>> fileMarkersSetMap = MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(
markerDirPath, fileSystem, hoodieEngineContext, parallelism);
if (markerDirSubPaths.size() > 0) {
SerializableConfiguration conf = new SerializableConfiguration(fileSystem.getConf());
int actualParallelism = Math.min(markerDirSubPaths.size(), parallelism);
Map<String, Set<String>> fileMarkersSetMap =
hoodieEngineContext.mapToPair(markerDirSubPaths, markersFilePathStr -> {
Path markersFilePath = new Path(markersFilePathStr);
FileSystem fileSystem = markersFilePath.getFileSystem(conf.get());
FSDataInputStream fsDataInputStream = null;
BufferedReader bufferedReader = null;
Set<String> markers = new HashSet<>();
try {
LOG.debug("Read marker file: " + markersFilePathStr);
fsDataInputStream = fileSystem.open(markersFilePath);
bufferedReader = new BufferedReader(new InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8));
markers = bufferedReader.lines().collect(Collectors.toSet());
bufferedReader.close();
fsDataInputStream.close();
} catch (IOException e) {
throw new HoodieIOException("Failed to read MARKERS file " + markerDirPath, e);
} finally {
closeQuietly(bufferedReader);
closeQuietly(fsDataInputStream);
}
return new ImmutablePair<>(markersFilePathStr, markers);
}, actualParallelism);
for (String markersFilePathStr: fileMarkersSetMap.keySet()) {
Set<String> fileMarkers = fileMarkersSetMap.get(markersFilePathStr);
if (!fileMarkers.isEmpty()) {
int index = parseMarkerFileIndex(markersFilePathStr);
if (index >= 0) {
fileMarkersMap.put(index, new StringBuilder(StringUtils.join(",", fileMarkers)));
allMarkers.addAll(fileMarkers);
}
}
}
for (String markersFilePathStr : fileMarkersSetMap.keySet()) {
Set<String> fileMarkers = fileMarkersSetMap.get(markersFilePathStr);
if (!fileMarkers.isEmpty()) {
int index = parseMarkerFileIndex(markersFilePathStr);
if (index >= 0) {
fileMarkersMap.put(index, new StringBuilder(StringUtils.join(",", fileMarkers)));
allMarkers.addAll(fileMarkers);
}
}
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
try {
if (MarkerUtils.doesMarkerTypeFileExist(fileSystem, markerDirPath)) {
isMarkerTypeWritten = true;
}
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}
/**
* Writes marker type, "TIMELINE_SERVER_BASED", to file.
*/
private void writeMarkerTypeToFile() {
Path dirPath = new Path(markerDirPath);
try {
if (!fileSystem.exists(dirPath)) {
// There is no existing marker directory, create a new directory and write marker type
fileSystem.mkdirs(dirPath);
MarkerUtils.writeMarkerTypeToFile(MarkerType.TIMELINE_SERVER_BASED, fileSystem, markerDirPath);
}
} catch (IOException e) {
throw new HoodieIOException("Failed to write marker type file in " + markerDirPath
+ ": " + e.getMessage(), e);
}
}
/**
* Parses the marker file index from the marker file path.
*
* <p>
* E.g., if the marker file path is /tmp/table/.hoodie/.temp/000/MARKERS3, the index returned is 3.
*
* @param markerFilePathStr full path of marker file
@@ -350,14 +342,6 @@ public class MarkerDirState implements Serializable {
LOG.debug("Write to " + markerDirPath + "/" + MARKERS_FILENAME_PREFIX + markerFileIndex);
HoodieTimer timer = new HoodieTimer().startTimer();
Path markersFilePath = new Path(markerDirPath, MARKERS_FILENAME_PREFIX + markerFileIndex);
Path dirPath = markersFilePath.getParent();
try {
if (!fileSystem.exists(dirPath)) {
fileSystem.mkdirs(dirPath);
}
} catch (IOException e) {
throw new HoodieIOException("Failed to make dir " + dirPath, e);
}
FSDataOutputStream fsDataOutputStream = null;
BufferedWriter bufferedWriter = null;
try {