diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RecordsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RecordsCommand.java deleted file mode 100644 index f48611197..000000000 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RecordsCommand.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.uber.hoodie.cli.commands; - -import com.uber.hoodie.cli.HoodieCLI; -import com.uber.hoodie.cli.utils.InputStreamConsumer; -import com.uber.hoodie.cli.utils.SparkUtil; -import org.apache.spark.launcher.SparkLauncher; -import org.springframework.shell.core.CommandMarker; -import org.springframework.shell.core.annotation.CliAvailabilityIndicator; -import org.springframework.shell.core.annotation.CliCommand; -import org.springframework.shell.core.annotation.CliOption; -import org.springframework.stereotype.Component; - -@Component -public class RecordsCommand implements CommandMarker { - - @CliAvailabilityIndicator({"records deduplicate"}) - public boolean isRecordsDeduplicateAvailable() { - return HoodieCLI.tableMetadata != null; - } - - @CliCommand(value = "records deduplicate", help = "De-duplicate a partition path contains duplicates & produce repaired files to replace with") - public String deduplicate( - @CliOption(key = { - "duplicatedPartitionPath"}, help = "Partition Path containing the 
duplicates") - final String duplicatedPartitionPath, - @CliOption(key = {"repairedOutputPath"}, help = "Location to place the repaired files") - final String repairedOutputPath, - @CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path") - final String sparkPropertiesPath) throws Exception { - SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); - sparkLauncher - .addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), duplicatedPartitionPath, - repairedOutputPath, HoodieCLI.tableMetadata.getBasePath()); - Process process = sparkLauncher.launch(); - InputStreamConsumer.captureOutput(process); - int exitCode = process.waitFor(); - - if (exitCode != 0) { - return "Deduplicated files placed in: " + repairedOutputPath; - } - return "Deduplication failed "; - } - -// @CliCommand(value = "records find", help = "Find Records in a hoodie dataset") -// public String findRecords( -// @CliOption(key = {"keys"}, help = "Keys To Find (Comma seperated)") -// final String hoodieKeys, -// @CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path") -// final String sparkPropertiesPath) throws Exception { -// SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); -// sparkLauncher -// .addAppArgs(SparkMain.RECORD_FIND, hoodieKeys, HoodieCLI.tableMetadata.getBasePath()); -// Process process = sparkLauncher.launch(); -// InputStreamConsumer.captureOutput(process); -// int exitCode = process.waitFor(); -// -// if (exitCode != 0) { -// return "Deduplicated files placed in: " + repairedOutputPath; -// } -// return "Deduplication failed "; -// } -} diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java new file mode 100644 index 000000000..8104de96a --- /dev/null +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2016 Uber Technologies, 
Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.cli.commands; + +import com.uber.hoodie.cli.HoodieCLI; +import com.uber.hoodie.cli.HoodiePrintHelper; +import com.uber.hoodie.cli.utils.InputStreamConsumer; +import com.uber.hoodie.cli.utils.SparkUtil; +import com.uber.hoodie.common.model.HoodiePartitionMetadata; +import com.uber.hoodie.common.util.FSUtils; + +import org.apache.hadoop.fs.Path; +import org.apache.spark.launcher.SparkLauncher; +import org.springframework.shell.core.CommandMarker; +import org.springframework.shell.core.annotation.CliAvailabilityIndicator; +import org.springframework.shell.core.annotation.CliCommand; +import org.springframework.shell.core.annotation.CliOption; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.List; + +@Component +public class RepairsCommand implements CommandMarker { + + @CliAvailabilityIndicator({"repair deduplicate"}) + public boolean isRepairDeduplicateAvailable() { + return HoodieCLI.tableMetadata != null; + } + + @CliAvailabilityIndicator({"repair addpartitionmeta"}) + public boolean isRepairAddPartitionMetaAvailable() { + return HoodieCLI.tableMetadata != null; + } + + @CliCommand(value = "repair deduplicate", help = "De-duplicate a partition path contains duplicates & produce repaired files to replace with") + public String deduplicate( + @CliOption(key = { + "duplicatedPartitionPath"}, help = 
"Partition Path containing the duplicates") + final String duplicatedPartitionPath, + @CliOption(key = {"repairedOutputPath"}, help = "Location to place the repaired files") + final String repairedOutputPath, + @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") + final String sparkPropertiesPath) throws Exception { + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + sparkLauncher + .addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), duplicatedPartitionPath, + repairedOutputPath, HoodieCLI.tableMetadata.getBasePath()); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + + if (exitCode == 0) { + return "Deduplicated files placed in: " + repairedOutputPath; + } + return "Deduplication failed "; + } + + + + @CliCommand(value = "repair addpartitionmeta", help = "Add partition metadata to a dataset, if not present") + public String addPartitionMeta( + @CliOption(key = {"dryrun"}, + help = "Should we actually add or just print what would be done", + unspecifiedDefaultValue = "true") + final boolean dryRun) throws IOException { + + String latestCommit = HoodieCLI.tableMetadata.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(); + List<String> partitionPaths = FSUtils.getAllPartitionPaths(HoodieCLI.fs, + HoodieCLI.tableMetadata.getBasePath()); + Path basePath = new Path(HoodieCLI.tableMetadata.getBasePath()); + String[][] rows = new String[partitionPaths.size()][]; + + int ind = 0; + for (String partition: partitionPaths) { + Path partitionPath = new Path(basePath, partition); + String[] row = new String[3]; + row[0] = partition; row[1] = "Yes"; row[2] = "None"; + if (!HoodiePartitionMetadata.hasPartitionMetadata(HoodieCLI.fs, partitionPath)) { + row[1] = "No"; + if (!dryRun) { + HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata( + HoodieCLI.fs, + latestCommit, + basePath, + partitionPath); + 
partitionMetadata.trySave(0); + } + } + rows[ind++] = row; + } + + return HoodiePrintHelper.print( + new String[] {"Partition Path", "Metadata Present?", "Action"}, rows); + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java index ed66fc586..944d38930 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java @@ -17,6 +17,7 @@ package com.uber.hoodie.io; import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.model.HoodiePartitionMetadata; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; @@ -55,6 +56,11 @@ public class HoodieInsertHandle extends HoodieIOH this.path = makeNewPath(partitionPath, TaskContext.getPartitionId(), status.getFileId()); try { + HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, + commitTime, + new Path(config.getBasePath()), + new Path(config.getBasePath(), partitionPath)); + partitionMetadata.trySave(TaskContext.getPartitionId()); this.storageWriter = HoodieStorageWriterFactory.getStorageWriter(commitTime, path, hoodieTable, config, schema); } catch (IOException e) { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java index 09c6a834e..0c4b16498 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java @@ -16,6 +16,7 @@ package com.uber.hoodie.io; +import com.uber.hoodie.common.model.HoodiePartitionMetadata; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieRecord; @@ -82,6 +83,13 @@ public class 
HoodieUpdateHandle extends HoodieIO .getLatestDataFilesForFileId(record.getPartitionPath(), fileId).findFirst() .get().getFileName(); writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath)); + + HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, + commitTime, + new Path(config.getBasePath()), + new Path(config.getBasePath(), record.getPartitionPath())); + partitionMetadata.trySave(TaskContext.getPartitionId()); + oldFilePath = new Path( config.getBasePath() + "/" + record.getPartitionPath() + "/" + latestValidFilePath); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodiePartitionMetadata.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodiePartitionMetadata.java new file mode 100644 index 000000000..76c76a59b --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodiePartitionMetadata.java @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.common.model; + +import com.uber.hoodie.exception.HoodieException; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Properties; + +/** + * The metadata that goes into the meta file in each partition + */ +public class HoodiePartitionMetadata { + + public static final String HOODIE_PARTITION_METAFILE = ".hoodie_partition_metadata"; + public static final String PARTITION_DEPTH_KEY = "partitionDepth"; + public static final String COMMIT_TIME_KEY = "commitTime"; + + /** + * Contents of the metadata + */ + private final Properties props; + + /** + * Path to the partition, about which we have the metadata + */ + private final Path partitionPath; + + private final FileSystem fs; + + private static Logger log = LogManager.getLogger(HoodiePartitionMetadata.class); + + + /** + * Construct metadata from existing partition + */ + public HoodiePartitionMetadata(FileSystem fs, Path partitionPath) { + this.fs = fs; + this.props = new Properties(); + this.partitionPath = partitionPath; + } + + /** + * Construct metadata object to be written out. 
+ */ + public HoodiePartitionMetadata(FileSystem fs, String commitTime, Path basePath, Path partitionPath) { + this(fs, partitionPath); + props.setProperty(COMMIT_TIME_KEY, commitTime); + props.setProperty(PARTITION_DEPTH_KEY, String.valueOf(partitionPath.depth() - basePath.depth())); + } + + public int getPartitionDepth() { + if (!props.containsKey(PARTITION_DEPTH_KEY)) { + throw new HoodieException("Could not find partitionDepth in partition metafile"); + } + return Integer.parseInt(props.getProperty(PARTITION_DEPTH_KEY)); + } + + /** + * Write the metadata safely into partition + */ + public void trySave(int taskPartitionId) { + Path tmpMetaPath = new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE + "_" + taskPartitionId); + Path metaPath = new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE); + boolean metafileExists = false; + + try { + metafileExists = fs.exists(metaPath); + if (!metafileExists) { + // write to temporary file + FSDataOutputStream os = fs.create(tmpMetaPath, true); + props.store(os, "partition metadata"); + os.hflush(); + os.hsync(); + os.close(); + + // move to actual path + fs.rename(tmpMetaPath, metaPath); + } + } catch (IOException ioe) { + log.warn("Error trying to save partition metadata (this is okay, as long as at least 1 of these succeed), " + + partitionPath, ioe); + } finally { + if (!metafileExists) { + try { + // clean up tmp file, if still lying around + if (fs.exists(tmpMetaPath)) { + fs.delete(tmpMetaPath, false); + } + } catch (IOException ioe) { + log.warn("Error trying to clean up temporary files for " + partitionPath, ioe); + } + } + } + } + + /** + * Read out the metadata for this partition + */ + public void readFromFS() { + try { + Path metaFile = new Path(partitionPath, HOODIE_PARTITION_METAFILE); + FSDataInputStream is = fs.open(metaFile); + props.load(is); + } catch (IOException ioe) { + throw new HoodieException("Error reading Hoodie partition metadata for " + partitionPath, 
ioe); + } + } + + // methods related to partition meta data + public static boolean hasPartitionMetadata(FileSystem fs, Path partitionPath) throws IOException { + return fs.exists(new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); + } +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java index 653539ee5..542906372 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java @@ -233,7 +233,10 @@ public class HoodieTableFileSystemView implements TableFileSystemView, Serializa */ private Map> groupFilesByFileId(FileStatus[] files, String maxCommitTime) throws IOException { - return Arrays.stream(files).flatMap(fileStatus -> { + return Arrays.stream(files) + // filter out files starting with "." 
+ .filter(file -> !file.getPath().getName().startsWith(".")) + .flatMap(fileStatus -> { HoodieDataFile dataFile = new HoodieDataFile(fileStatus); if (visibleActiveCommitTimeline.containsOrBeforeTimelineStarts(dataFile.getCommitTime()) && visibleActiveCommitTimeline diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java index 3ab346713..936b663d5 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java @@ -111,10 +111,6 @@ public class FSUtils { return fs.listStatus(path)[0].getLen(); } - public static String globAllFiles(String basePath) { - return String.format("%s/*/*/*/*", basePath); - } - // TODO (weiy): rename the function for better readability public static String getFileId(String fullFileName) { return fullFileName.split("_")[0]; diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieHiveUtil.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieHiveUtil.java index b1666d4c6..3eed58d67 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieHiveUtil.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieHiveUtil.java @@ -16,6 +16,7 @@ package com.uber.hoodie.hadoop; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.JobContext; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -32,6 +33,7 @@ public class HoodieHiveUtil { public static final String DEFAULT_SCAN_MODE = LATEST_SCAN_MODE; public static final int DEFAULT_MAX_COMMITS = 1; public static final int MAX_COMMIT_ALL = -1; + public static final int DEFAULT_LEVELS_TO_BASEPATH = 3; public static Integer readMaxCommits(JobContext job, String tableName) { String maxCommitName = String.format(HOODIE_MAX_COMMIT_PATTERN, tableName); @@ -55,4 +57,12 @@ public class HoodieHiveUtil { LOG.info(modePropertyName + ": " 
+ mode); return mode; } + + public static Path getNthParent(Path path, int n) { + Path parent = path; + for (int i = 0; i < n; i++) { + parent = parent.getParent(); + } + return parent; + } } diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java index d18602341..7404e20ac 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java @@ -17,6 +17,7 @@ package com.uber.hoodie.hadoop; import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.model.HoodiePartitionMetadata; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; @@ -259,8 +260,13 @@ public class HoodieInputFormat extends MapredParquetInputFormat */ private HoodieTableMetaClient getTableMetaClient(Path dataPath) throws IOException { FileSystem fs = dataPath.getFileSystem(conf); - // TODO - remove this hard-coding. Pass this in job conf, somehow. 
Or read the Table Location - Path baseDir = dataPath.getParent().getParent().getParent(); + int levels = HoodieHiveUtil.DEFAULT_LEVELS_TO_BASEPATH; + if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) { + HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath); + metadata.readFromFS(); + levels = metadata.getPartitionDepth(); + } + Path baseDir = HoodieHiveUtil.getNthParent(dataPath, levels); LOG.info("Reading hoodie metadata from path " + baseDir.toString()); return new HoodieTableMetaClient(fs, baseDir.toString()); diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java index 00f003ff6..e6a280751 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java @@ -16,6 +16,7 @@ package com.uber.hoodie.hadoop; import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.model.HoodiePartitionMetadata; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.exception.DatasetNotFoundException; @@ -117,7 +118,15 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable { } // Perform actual checking. - Path baseDir = safeGetParentsParent(folder); + Path baseDir; + if (HoodiePartitionMetadata.hasPartitionMetadata(fs, folder)) { + HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, folder); + metadata.readFromFS(); + baseDir = HoodieHiveUtil.getNthParent(folder, metadata.getPartitionDepth()); + } else { + baseDir = safeGetParentsParent(folder); + } + if (baseDir != null) { try { HoodieTableMetaClient metaClient =