1
0

Create .hoodie_partition_metadata in each partition, linking back to basepath

- Concurreny handled via taskID, failure recovery handled via renames
 - Falls back to search 3 levels up
 - Cli tool has command to add this to existing tables
This commit is contained in:
Vinoth Chandar
2017-03-21 23:57:30 -07:00
committed by vinoth chandar
parent 1e802ad4f2
commit 3129770fd0
10 changed files with 291 additions and 86 deletions

View File

@@ -1,78 +0,0 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.cli.commands;
import com.uber.hoodie.cli.HoodieCLI;
import com.uber.hoodie.cli.utils.InputStreamConsumer;
import com.uber.hoodie.cli.utils.SparkUtil;
import org.apache.spark.launcher.SparkLauncher;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
@Component
public class RecordsCommand implements CommandMarker {
@CliAvailabilityIndicator({"records deduplicate"})
public boolean isRecordsDeduplicateAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliCommand(value = "records deduplicate", help = "De-duplicate a partition path contains duplicates & produce repaired files to replace with")
public String deduplicate(
@CliOption(key = {
"duplicatedPartitionPath"}, help = "Partition Path containing the duplicates")
final String duplicatedPartitionPath,
@CliOption(key = {"repairedOutputPath"}, help = "Location to place the repaired files")
final String repairedOutputPath,
@CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path")
final String sparkPropertiesPath) throws Exception {
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher
.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), duplicatedPartitionPath,
repairedOutputPath, HoodieCLI.tableMetadata.getBasePath());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Deduplicated files placed in: " + repairedOutputPath;
}
return "Deduplication failed ";
}
// @CliCommand(value = "records find", help = "Find Records in a hoodie dataset")
// public String findRecords(
// @CliOption(key = {"keys"}, help = "Keys To Find (Comma seperated)")
// final String hoodieKeys,
// @CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path")
// final String sparkPropertiesPath) throws Exception {
// SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
// sparkLauncher
// .addAppArgs(SparkMain.RECORD_FIND, hoodieKeys, HoodieCLI.tableMetadata.getBasePath());
// Process process = sparkLauncher.launch();
// InputStreamConsumer.captureOutput(process);
// int exitCode = process.waitFor();
//
// if (exitCode != 0) {
// return "Deduplicated files placed in: " + repairedOutputPath;
// }
// return "Deduplication failed ";
// }
}

View File

@@ -0,0 +1,110 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.cli.commands;
import com.uber.hoodie.cli.HoodieCLI;
import com.uber.hoodie.cli.HoodiePrintHelper;
import com.uber.hoodie.cli.utils.InputStreamConsumer;
import com.uber.hoodie.cli.utils.SparkUtil;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.util.FSUtils;
import org.apache.hadoop.fs.Path;
import org.apache.spark.launcher.SparkLauncher;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
@Component
public class RepairsCommand implements CommandMarker {
@CliAvailabilityIndicator({"repair deduplicate"})
public boolean isRepairDeduplicateAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliAvailabilityIndicator({"repair addpartitionmeta"})
public boolean isRepairAddPartitionMetaAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliCommand(value = "repair deduplicate", help = "De-duplicate a partition path contains duplicates & produce repaired files to replace with")
public String deduplicate(
@CliOption(key = {
"duplicatedPartitionPath"}, help = "Partition Path containing the duplicates")
final String duplicatedPartitionPath,
@CliOption(key = {"repairedOutputPath"}, help = "Location to place the repaired files")
final String repairedOutputPath,
@CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path")
final String sparkPropertiesPath) throws Exception {
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher
.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), duplicatedPartitionPath,
repairedOutputPath, HoodieCLI.tableMetadata.getBasePath());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Deduplicated files placed in: " + repairedOutputPath;
}
return "Deduplication failed ";
}
@CliCommand(value = "repair addpartitionmeta", help = "Add partition metadata to a dataset, if not present")
public String addPartitionMeta(
@CliOption(key = {"dryrun"},
help = "Should we actually add or just print what would be done",
unspecifiedDefaultValue = "true")
final boolean dryRun) throws IOException {
String latestCommit = HoodieCLI.tableMetadata.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp();
List<String> partitionPaths = FSUtils.getAllPartitionPaths(HoodieCLI.fs,
HoodieCLI.tableMetadata.getBasePath());
Path basePath = new Path(HoodieCLI.tableMetadata.getBasePath());
String[][] rows = new String[partitionPaths.size() + 1][];
int ind = 0;
for (String partition: partitionPaths) {
Path partitionPath = new Path(basePath, partition);
String[] row = new String[3];
row[0] = partition; row[1] = "Yes"; row[2] = "None";
if (!HoodiePartitionMetadata.hasPartitionMetadata(HoodieCLI.fs, partitionPath)) {
row[1] = "No";
if (!dryRun) {
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(
HoodieCLI.fs,
latestCommit,
basePath,
partitionPath);
partitionMetadata.trySave(0);
}
}
rows[ind++] = row;
}
return HoodiePrintHelper.print(
new String[] {"Partition Path", "Metadata Present?", "Action"}, rows);
}
}

View File

@@ -17,6 +17,7 @@
package com.uber.hoodie.io;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
@@ -55,6 +56,11 @@ public class HoodieInsertHandle<T extends HoodieRecordPayload> extends HoodieIOH
this.path = makeNewPath(partitionPath, TaskContext.getPartitionId(), status.getFileId());
try {
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs,
commitTime,
new Path(config.getBasePath()),
new Path(config.getBasePath(), partitionPath));
partitionMetadata.trySave(TaskContext.getPartitionId());
this.storageWriter =
HoodieStorageWriterFactory.getStorageWriter(commitTime, path, hoodieTable, config, schema);
} catch (IOException e) {

View File

@@ -16,6 +16,7 @@
package com.uber.hoodie.io;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieRecord;
@@ -82,6 +83,13 @@ public class HoodieUpdateHandle <T extends HoodieRecordPayload> extends HoodieIO
.getLatestDataFilesForFileId(record.getPartitionPath(), fileId).findFirst()
.get().getFileName();
writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath));
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs,
commitTime,
new Path(config.getBasePath()),
new Path(config.getBasePath(), record.getPartitionPath()));
partitionMetadata.trySave(TaskContext.getPartitionId());
oldFilePath = new Path(
config.getBasePath() + "/" + record.getPartitionPath() + "/"
+ latestValidFilePath);

View File

@@ -0,0 +1,135 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.model;
import com.uber.hoodie.exception.HoodieException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Properties;
/**
* The metadata that goes into the meta file in each partition
*/
public class HoodiePartitionMetadata {
public static final String HOODIE_PARTITION_METAFILE = ".hoodie_partition_metadata";
public static final String PARTITION_DEPTH_KEY = "partitionDepth";
public static final String COMMIT_TIME_KEY = "commitTime";
/**
* Contents of the metadata
*/
private final Properties props;
/**
* Path to the partition, about which we have the metadata
*/
private final Path partitionPath;
private final FileSystem fs;
private static Logger log = LogManager.getLogger(HoodiePartitionMetadata.class);
/**
* Construct metadata from existing partition
*/
public HoodiePartitionMetadata(FileSystem fs, Path partitionPath) {
this.fs = fs;
this.props = new Properties();
this.partitionPath = partitionPath;
}
/**
* Construct metadata object to be written out.
*/
public HoodiePartitionMetadata(FileSystem fs, String commitTime, Path basePath, Path partitionPath) {
this(fs, partitionPath);
props.setProperty(COMMIT_TIME_KEY, commitTime);
props.setProperty(PARTITION_DEPTH_KEY, String.valueOf(partitionPath.depth() - basePath.depth()));
}
public int getPartitionDepth() {
if (!props.contains(PARTITION_DEPTH_KEY)) {
throw new HoodieException("Could not find partitionDepth in partition metafile");
}
return Integer.parseInt(props.getProperty(PARTITION_DEPTH_KEY));
}
/**
* Write the metadata safely into partition
*/
public void trySave(int taskPartitionId) {
Path tmpMetaPath = new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE + "_" + taskPartitionId);
Path metaPath = new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
boolean metafileExists = false;
try {
metafileExists = fs.exists(metaPath);
if (!metafileExists) {
// write to temporary file
FSDataOutputStream os = fs.create(tmpMetaPath, true);
props.store(os, "partition metadata");
os.hsync();
os.hflush();
os.close();
// move to actual path
fs.rename(tmpMetaPath, metaPath);
}
} catch (IOException ioe) {
log.warn("Error trying to save partition metadata (this is okay, as long as atleast 1 of these succced), " +
partitionPath, ioe);
} finally {
if (!metafileExists) {
try {
// clean up tmp file, if still lying around
if (fs.exists(tmpMetaPath)) {
fs.delete(tmpMetaPath, false);
}
} catch (IOException ioe) {
log.warn("Error trying to clean up temporary files for " + partitionPath, ioe);
}
}
}
}
/**
* Read out the metadata for this partition
*/
public void readFromFS() {
try {
Path metaFile = new Path(partitionPath, HOODIE_PARTITION_METAFILE);
FSDataInputStream is = fs.open(metaFile);
props.load(is);
} catch (IOException ioe) {
throw new HoodieException("Error reading Hoodie partition metadata for " + partitionPath, ioe);
}
}
// methods related to partition meta data
public static boolean hasPartitionMetadata(FileSystem fs, Path partitionPath) throws IOException {
return fs.exists(new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
}
}

View File

@@ -233,7 +233,10 @@ public class HoodieTableFileSystemView implements TableFileSystemView, Serializa
*/
private Map<String, List<HoodieDataFile>> groupFilesByFileId(FileStatus[] files,
String maxCommitTime) throws IOException {
return Arrays.stream(files).flatMap(fileStatus -> {
return Arrays.stream(files)
// filter out files starting with "."
.filter(file -> !file.getPath().getName().startsWith("."))
.flatMap(fileStatus -> {
HoodieDataFile dataFile = new HoodieDataFile(fileStatus);
if (visibleActiveCommitTimeline.containsOrBeforeTimelineStarts(dataFile.getCommitTime())
&& visibleActiveCommitTimeline

View File

@@ -111,10 +111,6 @@ public class FSUtils {
return fs.listStatus(path)[0].getLen();
}
public static String globAllFiles(String basePath) {
return String.format("%s/*/*/*/*", basePath);
}
// TODO (weiy): rename the function for better readability
public static String getFileId(String fullFileName) {
return fullFileName.split("_")[0];

View File

@@ -16,6 +16,7 @@
package com.uber.hoodie.hadoop;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -32,6 +33,7 @@ public class HoodieHiveUtil {
public static final String DEFAULT_SCAN_MODE = LATEST_SCAN_MODE;
public static final int DEFAULT_MAX_COMMITS = 1;
public static final int MAX_COMMIT_ALL = -1;
public static final int DEFAULT_LEVELS_TO_BASEPATH = 3;
public static Integer readMaxCommits(JobContext job, String tableName) {
String maxCommitName = String.format(HOODIE_MAX_COMMIT_PATTERN, tableName);
@@ -55,4 +57,12 @@ public class HoodieHiveUtil {
LOG.info(modePropertyName + ": " + mode);
return mode;
}
public static Path getNthParent(Path path, int n) {
Path parent = path;
for (int i = 0; i < n; i++) {
parent = parent.getParent();
}
return parent;
}
}

View File

@@ -17,6 +17,7 @@
package com.uber.hoodie.hadoop;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
@@ -259,8 +260,13 @@ public class HoodieInputFormat extends MapredParquetInputFormat
*/
private HoodieTableMetaClient getTableMetaClient(Path dataPath) throws IOException {
FileSystem fs = dataPath.getFileSystem(conf);
// TODO - remove this hard-coding. Pass this in job conf, somehow. Or read the Table Location
Path baseDir = dataPath.getParent().getParent().getParent();
int levels = HoodieHiveUtil.DEFAULT_LEVELS_TO_BASEPATH;
if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
metadata.readFromFS();
levels = metadata.getPartitionDepth();
}
Path baseDir = HoodieHiveUtil.getNthParent(dataPath, levels);
LOG.info("Reading hoodie metadata from path " + baseDir.toString());
return new HoodieTableMetaClient(fs, baseDir.toString());

View File

@@ -16,6 +16,7 @@
package com.uber.hoodie.hadoop;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.exception.DatasetNotFoundException;
@@ -117,7 +118,15 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
}
// Perform actual checking.
Path baseDir = safeGetParentsParent(folder);
Path baseDir;
if (HoodiePartitionMetadata.hasPartitionMetadata(fs, folder)) {
HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, folder);
metadata.readFromFS();
baseDir = HoodieHiveUtil.getNthParent(folder, metadata.getPartitionDepth());
} else {
baseDir = safeGetParentsParent(folder);
}
if (baseDir != null) {
try {
HoodieTableMetaClient metaClient =