1
0

FSUtils.getAllPartitionPaths() works based on .hoodie_partition_metadata

- clean/rollback/write paths covered by existing tests
 - Snapshot copier fixed to copy metadata file also, and test fixed
 - Existing tables need to be repaired by addition of metadata, before this can be rolled out
This commit is contained in:
Vinoth Chandar
2017-03-26 17:15:52 -07:00
committed by vinoth chandar
parent 3129770fd0
commit f9fd16069d
9 changed files with 214 additions and 207 deletions

View File

@@ -80,7 +80,7 @@ public class RepairsCommand implements CommandMarker {
final boolean dryRun) throws IOException {
String latestCommit = HoodieCLI.tableMetadata.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp();
List<String> partitionPaths = FSUtils.getAllPartitionPaths(HoodieCLI.fs,
List<String> partitionPaths = FSUtils.getAllFoldersThreeLevelsDown(HoodieCLI.fs,
HoodieCLI.tableMetadata.getBasePath());
Path basePath = new Path(HoodieCLI.tableMetadata.getBasePath());
String[][] rows = new String[partitionPaths.size() + 1][];

View File

@@ -18,31 +18,30 @@ package com.uber.hoodie;
import com.google.common.collect.Iterables;
import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
import com.uber.hoodie.common.HoodieClientTestUtils;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.HoodieCleaningPolicy;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.ParquetUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieRollbackException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -60,7 +59,6 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -122,6 +120,15 @@ public class TestHoodieClient implements Serializable {
}
}
/**
 * Asserts that every given partition (relative to basePath) carries a partition metadata
 * file, and that the recorded partition depth is 3 (the year/month/day layout).
 */
private void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException {
  for (String relativePath : partitionPaths) {
    Path fullPartitionPath = new Path(basePath, relativePath);
    assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, fullPartitionPath));
    HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, fullPartitionPath);
    partitionMetadata.readFromFS();
    assertEquals(3, partitionMetadata.getPartitionDepth());
  }
}
private void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) {
for (HoodieRecord rec : taggedRecords) {
assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown());
@@ -203,6 +210,9 @@ public class TestHoodieClient implements Serializable {
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses);
// check the partition metadata is written out
assertPartitionMetadata(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, fs);
// verify that there is a commit
HoodieReadClient readClient = new HoodieReadClient(jsc, basePath, sqlContext);
assertEquals("Expecting a single commit.", readClient.listCommitsSince("000").size(), 1);
@@ -348,6 +358,7 @@ public class TestHoodieClient implements Serializable {
.build()).build();
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
FileSystem fs = FSUtils.getFs();
HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
/**
* Write 1 (only inserts)
@@ -433,6 +444,7 @@ public class TestHoodieClient implements Serializable {
.build()).build();
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
FileSystem fs = FSUtils.getFs();
HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
/**
* Write 1 (only inserts)
@@ -712,6 +724,10 @@ public class TestHoodieClient implements Serializable {
String commitTime2 = "20160502020601";
String commitTime3 = "20160506030611";
new File(basePath + "/.hoodie").mkdirs();
HoodieTestDataGenerator.writePartitionMetadata(FSUtils.getFs(),
new String[] {"2016/05/01", "2016/05/02", "2016/05/06"},
basePath);
// Only first two have commit files
HoodieTestUtils.createCommitFiles(basePath, commitTime1, commitTime2);
@@ -801,6 +817,9 @@ public class TestHoodieClient implements Serializable {
String commitTime2 = "20160502020601";
String commitTime3 = "20160506030611";
new File(basePath + "/.hoodie").mkdirs();
HoodieTestDataGenerator.writePartitionMetadata(FSUtils.getFs(),
new String[] {"2016/05/01", "2016/05/02", "2016/05/06"},
basePath);
// One good commit
HoodieTestUtils.createCommitFiles(basePath, commitTime1);
@@ -984,7 +1003,6 @@ public class TestHoodieClient implements Serializable {
// setup the small file handling params
HoodieWriteConfig config = getSmallInsertWriteConfig(INSERT_SPLIT_LIMIT); // hold upto 20 records max
dataGen = new HoodieTestDataGenerator(new String[] {TEST_PARTITION_PATH});
HoodieWriteClient client = new HoodieWriteClient(jsc, config);
// Inserts => will write file1
@@ -995,6 +1013,7 @@ public class TestHoodieClient implements Serializable {
List<WriteStatus> statuses= client.insert(insertRecordsRDD1, commitTime1).collect();
assertNoWriteErrors(statuses);
assertPartitionMetadata(new String[]{TEST_PARTITION_PATH}, FSUtils.getFs());
assertEquals("Just 1 file needs to be added.", 1, statuses.size());
String file1 = statuses.get(0).getFileId();

View File

@@ -18,6 +18,7 @@ package com.uber.hoodie.common;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
@@ -63,11 +64,19 @@ public class HoodieTestDataGenerator {
// based on examination of sample file, the schema produces the following per record size
public static final int SIZE_PER_RECORD = 50 * 1024;
public static final String[] DEFAULT_PARTITION_PATHS = {"2016/03/15", "2015/03/16", "2015/03/17"};
/**
 * Seeds each of the given partitions with a partition metadata file (using commit time
 * "000"), so that partition discovery based on the metafile can find them in tests.
 */
public static void writePartitionMetadata(FileSystem fs, String[] partitionPaths, String basePath) {
  Path base = new Path(basePath);
  for (String relativePath : partitionPaths) {
    HoodiePartitionMetadata metadata =
        new HoodiePartitionMetadata(fs, "000", base, new Path(basePath, relativePath));
    metadata.trySave(0);
  }
}
private List<KeyPartition> existingKeysList = new ArrayList<>();
public static Schema avroSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA));
private static Random rand = new Random(46474747);
private String[] partitionPaths = {"2016/03/15", "2015/03/16", "2015/03/17"};
private String[] partitionPaths = DEFAULT_PARTITION_PATHS;
public HoodieTestDataGenerator(String[] partitionPaths) {
this.partitionPaths = partitionPaths;

View File

@@ -60,7 +60,6 @@ import static org.junit.Assert.assertTrue;
public class TestHoodieCompactor {
private transient JavaSparkContext jsc = null;
private transient SQLContext sqlContext;
private String basePath = null;
private HoodieCompactor compactor;
private transient HoodieTestDataGenerator dataGen = null;

View File

@@ -71,7 +71,7 @@ public class HoodiePartitionMetadata {
}
public int getPartitionDepth() {
if (!props.contains(PARTITION_DEPTH_KEY)) {
if (!props.containsKey(PARTITION_DEPTH_KEY)) {
throw new HoodieException("Could not find partitionDepth in partition metafile");
}
return Integer.parseInt(props.getProperty(PARTITION_DEPTH_KEY));

View File

@@ -19,6 +19,7 @@ package com.uber.hoodie.common.util;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.uber.hoodie.avro.model.HoodieCleanMetadata;
import com.uber.hoodie.avro.model.HoodieCleanPartitionMetadata;
import com.uber.hoodie.avro.model.HoodieRollbackMetadata;
@@ -28,10 +29,10 @@ import com.uber.hoodie.avro.model.HoodieSavepointPartitionMetadata;
import com.uber.hoodie.common.HoodieCleanStat;
import com.uber.hoodie.common.HoodieRollbackStat;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
@@ -40,29 +41,20 @@ import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.mapred.FsInput;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.fs.AvroFSInput;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.ByteArrayOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
public class AvroUtils {

View File

@@ -18,14 +18,19 @@ package com.uber.hoodie.common.util;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.table.log.HoodieLogFile;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.InvalidHoodiePathException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -75,7 +80,7 @@ public class FSUtils {
fs = FileSystem.get(conf);
} catch (IOException e) {
throw new HoodieIOException("Failed to get instance of " + FileSystem.class.getName(),
e);
e);
}
LOG.info(String.format("Hadoop Configuration: fs.defaultFS: [%s], Config:[%s], FileSystem: [%s]",
conf.getRaw("fs.defaultFS"), conf.toString(), fs.toString()));
@@ -111,25 +116,43 @@ public class FSUtils {
return fs.listStatus(path)[0].getLen();
}
// TODO (weiy): rename the function for better readability
/**
 * Returns the fileId portion of a hoodie data file name, i.e. everything before the
 * first '_' separator (the full name if no separator is present).
 */
public static String getFileId(String fullFileName) {
  String[] tokens = fullFileName.split("_");
  return tokens[0];
}
/**
* Obtain all the partition paths, that are present in this table.
* Gets all partition paths assuming date partitioning (year, month, day) three levels down.
*/
public static List<String> getAllPartitionPaths(FileSystem fs, String basePath)
throws IOException {
List<String> partitionsToClean = new ArrayList<>();
// TODO(vc): For now, assume partitions are two levels down from base path.
public static List<String> getAllFoldersThreeLevelsDown(FileSystem fs, String basePath) throws IOException {
List<String> datePartitions = new ArrayList<>();
FileStatus[] folders = fs.globStatus(new Path(basePath + "/*/*/*"));
for (FileStatus status : folders) {
Path path = status.getPath();
partitionsToClean.add(String.format("%s/%s/%s", path.getParent().getParent().getName(),
path.getParent().getName(), path.getName()));
datePartitions.add(String.format("%s/%s/%s", path.getParent().getParent().getName(),
path.getParent().getName(), path.getName()));
}
return partitionsToClean;
return datePartitions;
}
/**
 * Obtain all the partition paths, that are present in this table, denoted by presence of {@link
 * com.uber.hoodie.common.model.HoodiePartitionMetadata#HOODIE_PARTITION_METAFILE}
 *
 * <p>Recursively lists every file under the base path and, for each partition metafile found,
 * collects its parent directory's path relative to the base path.
 *
 * @param fs          filesystem the table lives on
 * @param basePathStr base path of the hoodie table
 * @return list of partition paths, relative to the base path
 * @throws IOException if the recursive listing fails
 */
public static List<String> getAllPartitionPaths(FileSystem fs, String basePathStr)
throws IOException {
List<String> partitions = new ArrayList<>();
Path basePath = new Path(basePathStr);
// recursive=true: walks all files under the base path
RemoteIterator<LocatedFileStatus> allFiles = fs.listFiles(new Path(basePathStr), true);
while (allFiles.hasNext()) {
Path filePath = allFiles.next().getPath();
if (filePath.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
// the partition is the directory that holds the metafile
String partitionFullPath = filePath.getParent().toString();
// NOTE(review): the relative path is recovered by searching for the base directory *name*
// (not the full base path) in the fully-qualified listing path — presumably because listed
// paths carry a scheme/authority prefix absent from basePathStr. This assumes the base
// directory name does not also occur inside a partition path — TODO confirm.
int partitionStartIndex = partitionFullPath.lastIndexOf(basePath.getName()) ;
partitions.add(partitionFullPath.substring(partitionStartIndex + basePath.getName().length() + 1));
}
}
return partitions;
}
public static String getFileExtension(String fullName) {
@@ -146,42 +169,34 @@ public class FSUtils {
/**
* Get the file extension from the log file
* @param logPath
* @return
*/
public static String getFileExtensionFromLog(Path logPath) {
Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName());
if(!matcher.find()) {
if (!matcher.find()) {
throw new InvalidHoodiePathException(logPath, "LogFile");
}
return matcher.group(3) + "." + matcher.group(4);
}
/**
* Get the first part of the file name in the log file. That will be the fileId.
* Log file do not have commitTime in the file name.
*
* @param path
* @return
* Get the first part of the file name in the log file. That will be the fileId. Log file do not
* have commitTime in the file name.
*/
public static String getFileIdFromLogPath(Path path) {
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
if(!matcher.find()) {
if (!matcher.find()) {
throw new InvalidHoodiePathException(path, "LogFile");
}
return matcher.group(1);
}
/**
* Get the first part of the file name in the log file. That will be the fileId.
* Log file do not have commitTime in the file name.
*
* @param path
* @return
* Get the first part of the file name in the log file. That will be the fileId. Log file do not
* have commitTime in the file name.
*/
public static String getBaseCommitTimeFromLogPath(Path path) {
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
if(!matcher.find()) {
if (!matcher.find()) {
throw new InvalidHoodiePathException(path, "LogFile");
}
return matcher.group(2);
@@ -189,20 +204,17 @@ public class FSUtils {
/**
* Get the last part of the file name in the log file and convert to int.
*
* @param logPath
* @return
*/
public static int getFileVersionFromLog(Path logPath) {
Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName());
if(!matcher.find()) {
if (!matcher.find()) {
throw new InvalidHoodiePathException(logPath, "LogFile");
}
return Integer.parseInt(matcher.group(5));
}
public static String makeLogFileName(String fileId, String logFileExtension,
String baseCommitTime, int version) {
String baseCommitTime, int version) {
return String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version);
}
@@ -213,46 +225,30 @@ public class FSUtils {
/**
* Get the latest log file written from the list of log files passed in
*
* @param logFiles
* @return
*/
public static Optional<HoodieLogFile> getLatestLogFile(Stream<HoodieLogFile> logFiles) {
return logFiles.sorted(Comparator
.comparing(s -> s.getLogVersion(),
Comparator.reverseOrder())).findFirst();
.comparing(s -> s.getLogVersion(),
Comparator.reverseOrder())).findFirst();
}
/**
* Get all the log files for the passed in FileId in the partition path
*
* @param fs
* @param partitionPath
* @param fileId
* @param logFileExtension
* @return
*/
public static Stream<HoodieLogFile> getAllLogFiles(FileSystem fs, Path partitionPath,
final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException {
final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException {
return Arrays.stream(fs.listStatus(partitionPath,
path -> path.getName().startsWith(fileId) && path.getName().contains(logFileExtension)))
.map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime));
path -> path.getName().startsWith(fileId) && path.getName().contains(logFileExtension)))
.map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime));
}
/**
* Get the latest log version for the fileId in the partition path
*
* @param fs
* @param partitionPath
* @param fileId
* @param logFileExtension
* @return
* @throws IOException
*/
public static Optional<Integer> getLatestLogVersion(FileSystem fs, Path partitionPath,
final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException {
final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException {
Optional<HoodieLogFile> latestLogFile =
getLatestLogFile(getAllLogFiles(fs, partitionPath, fileId, logFileExtension, baseCommitTime));
getLatestLogFile(getAllLogFiles(fs, partitionPath, fileId, logFileExtension, baseCommitTime));
if (latestLogFile.isPresent()) {
return Optional.of(latestLogFile.get().getLogVersion());
}
@@ -260,26 +256,20 @@ public class FSUtils {
}
public static int getCurrentLogVersion(FileSystem fs, Path partitionPath,
final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException {
final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException {
Optional<Integer> currentVersion =
getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime);
getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime);
// handle potential overflow
return (currentVersion.isPresent()) ? currentVersion.get() : 1;
}
/**
* computes the next log version for the specified fileId in the partition path
*
* @param fs
* @param partitionPath
* @param fileId
* @return
* @throws IOException
*/
public static int computeNextLogVersion(FileSystem fs, Path partitionPath, final String fileId,
final String logFileExtension, final String baseCommitTime) throws IOException {
final String logFileExtension, final String baseCommitTime) throws IOException {
Optional<Integer> currentVersion =
getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime);
getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime);
// handle potential overflow
return (currentVersion.isPresent()) ? currentVersion.get() + 1 : 1;
}
@@ -297,17 +287,13 @@ public class FSUtils {
}
/**
* When a file was opened and the task died without closing the stream, another task executor cannot open because the existing lease will be active.
* We will try to recover the lease, from HDFS. If a data node went down, it takes about 10 minutes for the lease to be rocovered.
* But if the client dies, this should be instant.
*
* @param dfs
* @param p
* @return
* @throws IOException
* When a file was opened and the task died without closing the stream, another task executor
* cannot open because the existing lease will be active. We will try to recover the lease, from
HDFS. If a data node went down, it takes about 10 minutes for the lease to be recovered. But
* if the client dies, this should be instant.
*/
public static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p)
throws IOException, InterruptedException {
throws IOException, InterruptedException {
LOG.info("Recover lease on dfs file " + p);
// initiate the recovery
boolean recovered = false;
@@ -324,7 +310,7 @@ public class FSUtils {
}
public static void deleteOlderCleanMetaFiles(FileSystem fs, String metaPath,
Stream<HoodieInstant> instants) {
Stream<HoodieInstant> instants) {
//TODO - this should be archived when archival is made general for all meta-data
// skip MIN_CLEAN_TO_KEEP and delete rest
instants.skip(MIN_CLEAN_TO_KEEP).map(s -> {
@@ -332,13 +318,13 @@ public class FSUtils {
return fs.delete(new Path(metaPath, s.getFileName()), false);
} catch (IOException e) {
throw new HoodieIOException("Could not delete clean meta files" + s.getFileName(),
e);
e);
}
});
}
public static void deleteOlderRollbackMetaFiles(FileSystem fs, String metaPath,
Stream<HoodieInstant> instants) {
Stream<HoodieInstant> instants) {
//TODO - this should be archived when archival is made general for all meta-data
// skip MIN_ROLLBACK_TO_KEEP and delete rest
instants.skip(MIN_ROLLBACK_TO_KEEP).map(s -> {
@@ -346,7 +332,7 @@ public class FSUtils {
return fs.delete(new Path(metaPath, s.getFileName()), false);
} catch (IOException e) {
throw new HoodieIOException(
"Could not delete rollback meta files " + s.getFileName(), e);
"Could not delete rollback meta files " + s.getFileName(), e);
}
});
}

View File

@@ -22,6 +22,7 @@ import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.table.HoodieTableConfig;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
@@ -50,6 +51,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Hoodie snapshot copy job which copies latest files from all partitions to another place, for snapshot backup.
@@ -71,13 +73,14 @@ public class HoodieSnapshotCopier implements Serializable {
final TableFileSystemView fsView = new HoodieTableFileSystemView(tableMetadata,
tableMetadata.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
// Get the latest commit
final Optional<HoodieInstant>
latestCommit = tableMetadata.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
Optional<HoodieInstant> latestCommit = tableMetadata.getActiveTimeline()
.getCommitTimeline().filterCompletedInstants().lastInstant();
if(!latestCommit.isPresent()) {
logger.warn("No commits present. Nothing to snapshot");
} else {
logger.info(String.format("Starting to snapshot latest version files which are also no-late-than %targetBasePath.", latestCommit.get()));
return;
}
final String latestCommitTimestamp = latestCommit.get().getTimestamp();
logger.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.", latestCommitTimestamp));
List<String> partitions = FSUtils.getAllPartitionPaths(fs, baseDir);
if (partitions.size() > 0) {
@@ -90,51 +93,47 @@ public class HoodieSnapshotCopier implements Serializable {
fs.delete(new Path(outputDir), true);
}
jsc.parallelize(partitions, partitions.size()).flatMap(new FlatMapFunction<String, Tuple2<String, String>>() {
@Override
public Iterator<Tuple2<String, String>> call(String partition) throws Exception {
// Only take latest version files <= latestCommit.
FileSystem fs = FSUtils.getFs();
List<Tuple2<String, String>> filePaths = new ArrayList<>();
for (HoodieDataFile hoodieDataFile : fsView
.getLatestVersionInPartition(partition, latestCommit.get().getTimestamp())
.collect(Collectors.toList())) {
filePaths.add(new Tuple2<>(partition, hoodieDataFile.getPath()));
}
return filePaths.iterator();
}
}).foreach(new VoidFunction<Tuple2<String, String>>() {
@Override
public void call(Tuple2<String, String> tuple) throws Exception {
String partition = tuple._1();
Path sourceFilePath = new Path(tuple._2());
Path toPartitionPath = new Path(outputDir, partition);
FileSystem fs = FSUtils.getFs();
jsc.parallelize(partitions, partitions.size())
.flatMap(partition -> {
// Only take latest version files <= latestCommit.
FileSystem fs1 = FSUtils.getFs();
List<Tuple2<String, String>> filePaths = new ArrayList<>();
Stream<HoodieDataFile> dataFiles = fsView.getLatestVersionInPartition(partition, latestCommitTimestamp);
dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile.getPath())));
if (!fs.exists(toPartitionPath)) {
fs.mkdirs(toPartitionPath);
}
FileUtil.copy(fs, sourceFilePath, fs, new Path(toPartitionPath, sourceFilePath.getName()),
false, fs.getConf());
}
// also need to copy over partition metadata
Path partitionMetaFile = new Path(new Path(baseDir, partition), HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
if (fs1.exists(partitionMetaFile)) {
filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString()));
}
return filePaths.iterator();
}).foreach(tuple -> {
String partition = tuple._1();
Path sourceFilePath = new Path(tuple._2());
Path toPartitionPath = new Path(outputDir, partition);
FileSystem fs1 = FSUtils.getFs();
if (!fs1.exists(toPartitionPath)) {
fs1.mkdirs(toPartitionPath);
}
FileUtil.copy(fs1, sourceFilePath, fs1,
new Path(toPartitionPath, sourceFilePath.getName()), false, fs1.getConf());
});
// Also copy the .commit files
logger.info(String.format("Copying .commit files which are no-late-than %targetBasePath.", latestCommit.get()));
logger.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
FileStatus[] commitFilesToCopy = fs.listStatus(
new Path(baseDir + "/" + HoodieTableMetaClient.METAFOLDER_NAME), new PathFilter() {
@Override
public boolean accept(Path commitFilePath) {
if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
return true;
} else {
String commitTime =
FSUtils.getCommitFromCommitFile(commitFilePath.getName());
return tableMetadata.getActiveTimeline().getCommitTimeline()
.compareTimestamps(commitTime, latestCommit.get().getTimestamp(), HoodieTimeline.GREATER);
}
}
});
new Path(baseDir + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
return true;
} else {
String commitTime =
FSUtils.getCommitFromCommitFile(commitFilePath.getName());
return tableMetadata.getActiveTimeline().getCommitTimeline()
.compareTimestamps(commitTime, latestCommitTimestamp, HoodieTimeline.LESSER_OR_EQUAL);
}
});
for (FileStatus commitStatus : commitFilesToCopy) {
Path targetFilePath = new Path(
outputDir + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitStatus

View File

@@ -16,6 +16,7 @@
package com.uber.hoodie.utilities;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.util.FSUtils;
import org.apache.hadoop.fs.FileSystem;
@@ -64,77 +65,79 @@ public class TestHoodieSnapshotCopier {
HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
copier.snapshot(jsc, basePath, outputPath);
// Nothing changed except _SUCCESS
// Nothing changed; we just bail out
assertEquals(fs.listStatus(new Path(basePath)).length, 1);
assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS")));
assertFalse(fs.exists(new Path(outputPath + "/_SUCCESS")));
}
//TODO - uncomment this after fixing test failures
// @Test
// public void testSnapshotCopy() throws Exception {
// // Generate some commits and corresponding parquets
// String commitTime1 = "20160501010101";
// String commitTime2 = "20160502020601";
// String commitTime3 = "20160506030611";
// new File(basePath + "/.hoodie").mkdirs();
// new File(basePath + "/.hoodie/hoodie.properties").createNewFile();
// // Only first two have commit files
// new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
// new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
// new File(basePath + "/.hoodie/" + commitTime3 + ".inflight").createNewFile();
//
// // Some parquet files
// new File(basePath + "/2016/05/01/").mkdirs();
// new File(basePath + "/2016/05/02/").mkdirs();
// new File(basePath + "/2016/05/06/").mkdirs();
//
// // Make commit1
// File file11 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime1, 1, "id11"));
// file11.createNewFile();
// File file12 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime1, 1, "id12"));
// file12.createNewFile();
// File file13 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime1, 1, "id13"));
// file13.createNewFile();
//
// // Make commit2
// File file21 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime2, 1, "id21"));
// file21.createNewFile();
// File file22 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime2, 1, "id22"));
// file22.createNewFile();
// File file23 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime2, 1, "id23"));
// file23.createNewFile();
//
// // Make commit3
// File file31 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime3, 1, "id31"));
// file31.createNewFile();
// File file32 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime3, 1, "id32"));
// file32.createNewFile();
// File file33 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime3, 1, "id33"));
// file33.createNewFile();
//
// // Do a snapshot copy
// HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
// copier.snapshot(jsc, basePath, outputPath);
//
// // Check results
// assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName())));
// assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file12.getName())));
// assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file13.getName())));
// assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file21.getName())));
// assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file22.getName())));
// assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file23.getName())));
// assertFalse(fs.exists(new Path(outputPath + "/2016/05/01/" + file31.getName())));
// assertFalse(fs.exists(new Path(outputPath + "/2016/05/02/" + file32.getName())));
// assertFalse(fs.exists(new Path(outputPath + "/2016/05/06/" + file33.getName())));
//
// assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime1 + ".commit")));
// assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime2 + ".commit")));
// assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".commit")));
// assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".inflight")));
// assertTrue(fs.exists(new Path(outputPath + "/.hoodie/hoodie.properties")));
//
// assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS")));
// }
/**
 * End-to-end check of HoodieSnapshotCopier: builds a table with three commits (the third only
 * inflight), snapshots it, and verifies that only files from completed commits are copied,
 * along with the .hoodie meta files and a _SUCCESS marker.
 */
@Test
public void testSnapshotCopy() throws Exception {
// Generate some commits and corresponding parquets
String commitTime1 = "20160501010101";
String commitTime2 = "20160502020601";
String commitTime3 = "20160506030611";
new File(basePath + "/.hoodie").mkdirs();
new File(basePath + "/.hoodie/hoodie.properties").createNewFile();
// Only first two have commit files
new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime3 + ".inflight").createNewFile();
// Some parquet files
new File(basePath + "/2016/05/01/").mkdirs();
new File(basePath + "/2016/05/02/").mkdirs();
new File(basePath + "/2016/05/06/").mkdirs();
// Partition metafiles are required for the copier's partition discovery to find these dirs
HoodieTestDataGenerator.writePartitionMetadata(fs,
new String[] {"2016/05/01", "2016/05/02", "2016/05/06"},
basePath);
// Make commit1
File file11 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime1, 1, "id11"));
file11.createNewFile();
File file12 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime1, 1, "id12"));
file12.createNewFile();
File file13 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime1, 1, "id13"));
file13.createNewFile();
// Make commit2
File file21 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime2, 1, "id21"));
file21.createNewFile();
File file22 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime2, 1, "id22"));
file22.createNewFile();
File file23 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime2, 1, "id23"));
file23.createNewFile();
// Make commit3 (inflight only — its files must NOT be snapshotted)
File file31 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime3, 1, "id31"));
file31.createNewFile();
File file32 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime3, 1, "id32"));
file32.createNewFile();
File file33 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime3, 1, "id33"));
file33.createNewFile();
// Do a snapshot copy
HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
copier.snapshot(jsc, basePath, outputPath);
// Check results: files from the two completed commits copied, inflight commit's files skipped
assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file12.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file13.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file21.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file22.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file23.getName())));
assertFalse(fs.exists(new Path(outputPath + "/2016/05/01/" + file31.getName())));
assertFalse(fs.exists(new Path(outputPath + "/2016/05/02/" + file32.getName())));
assertFalse(fs.exists(new Path(outputPath + "/2016/05/06/" + file33.getName())));
// Meta files: completed commits and hoodie.properties copied, inflight commit's meta skipped
assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime1 + ".commit")));
assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime2 + ".commit")));
assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".commit")));
assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".inflight")));
assertTrue(fs.exists(new Path(outputPath + "/.hoodie/hoodie.properties")));
assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS")));
}
@After
public void cleanup() {