From dce35ff0d74da19edf0e0ffdac3c5d01c3440b7d Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Sun, 26 Mar 2017 17:40:20 -0700 Subject: [PATCH] Adding a config to control whether date partitioning can be assumed - false by default - CAUTION: If you have existing tables without partition metadata, you need to set this to "true" --- .../main/java/com/uber/hoodie/HoodieWriteClient.java | 6 +++--- .../com/uber/hoodie/config/HoodieWriteConfig.java | 8 ++++++++ .../io/compact/HoodieRealtimeTableCompactor.java | 2 +- .../test/java/com/uber/hoodie/TestHoodieClient.java | 4 ++-- .../java/com/uber/hoodie/common/util/FSUtils.java | 11 ++++++++++- .../uber/hoodie/utilities/HoodieSnapshotCopier.java | 9 ++++++--- .../hoodie/utilities/TestHoodieSnapshotCopier.java | 4 ++-- 7 files changed, 32 insertions(+), 12 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 677f78129..f734da622 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -476,7 +476,7 @@ public class HoodieWriteClient implements Seriali + lastCommitRetained); Map> latestFilesMap = jsc.parallelize( - FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath())) + FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning())) .mapToPair((PairFunction>) partitionPath -> { // Scan all partitions files with this commit time logger.info("Collecting latest files in partition path " + partitionPath); @@ -650,7 +650,7 @@ public class HoodieWriteClient implements Seriali logger.info("Clean out all parquet files generated for commits: " + commits); final LongAccumulator numFilesDeletedCounter = jsc.sc().longAccumulator(); List stats = jsc.parallelize( - FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath())) +
FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning())) .map((Function) partitionPath -> { // Scan all partitions files with this commit time logger.info("Cleaning path " + partitionPath); @@ -739,7 +739,7 @@ public class HoodieWriteClient implements Seriali .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); List partitionsToClean = - FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath()); + FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()); // shuffle to distribute cleaning work across partitions evenly Collections.shuffle(partitionsToClean); logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index f6f0d84f3..a6790be05 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -51,6 +51,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER"; private static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit"; private static final String DEFAULT_HOODIE_AUTO_COMMIT = "true"; + private static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning"; + private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false"; private HoodieWriteConfig(Properties props) { @@ -76,6 +78,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Boolean.parseBoolean(props.getProperty(HOODIE_AUTO_COMMIT_PROP)); } + public Boolean shouldAssumeDatePartitioning() { + return Boolean.parseBoolean(props.getProperty(HOODIE_ASSUME_DATE_PARTITIONING_PROP)); + } + public int 
getInsertShuffleParallelism() { return Integer.parseInt(props.getProperty(INSERT_PARALLELISM)); } @@ -337,6 +343,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { WRITE_STATUS_STORAGE_LEVEL, DEFAULT_WRITE_STATUS_STORAGE_LEVEL); setDefaultOnCondition(props, !props.containsKey(HOODIE_AUTO_COMMIT_PROP), HOODIE_AUTO_COMMIT_PROP, DEFAULT_HOODIE_AUTO_COMMIT); + setDefaultOnCondition(props, !props.containsKey(HOODIE_ASSUME_DATE_PARTITIONING_PROP), + HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING); setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().build()); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index 32a70ecbf..3cf9d7869 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -77,7 +77,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { String compactionCommit = startCompactionCommit(hoodieTable); log.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommit); List partitionPaths = - FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath()); + FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), config.shouldAssumeDatePartitioning()); log.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); List operations = diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java index 30035990f..0f3be9901 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java @@ -403,7 +403,7 @@ public class TestHoodieClient implements 
Serializable { // Verify there are no errors assertNoWriteErrors(statuses); - List partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath()); + List partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); final TableFileSystemView view = table.getFileSystemView(); @@ -475,7 +475,7 @@ public class TestHoodieClient implements Serializable { statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); // Verify there are no errors assertNoWriteErrors(statuses); - List partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath()); + List partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); final TableFileSystemView view1 = table.getFileSystemView(); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java index 35308171f..d973cdbf1 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java @@ -139,7 +139,7 @@ public class FSUtils { * Obtain all the partition paths, that are present in this table, denoted by presence of {@link * com.uber.hoodie.common.model.HoodiePartitionMetadata#HOODIE_PARTITION_METAFILE} */ - public static List getAllPartitionPaths(FileSystem fs, String basePathStr) + public static List getAllFoldersWithPartitionMetaFile(FileSystem fs, String basePathStr) throws IOException { List partitions = new ArrayList<>(); Path basePath = new Path(basePathStr); @@ -155,6 +155,15 @@ public class FSUtils { return partitions; } 
+ public static List getAllPartitionPaths(FileSystem fs, String basePathStr, boolean assumeDatePartitioning) + throws IOException { + if (assumeDatePartitioning) { + return getAllFoldersThreeLevelsDown(fs, basePathStr); + } else { + return getAllFoldersWithPartitionMetaFile(fs, basePathStr); + } + } + public static String getFileExtension(String fullName) { Preconditions.checkNotNull(fullName); String fileName = (new File(fullName)).getName(); diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java index ce7ec2b3b..40796070c 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java @@ -65,9 +65,12 @@ public class HoodieSnapshotCopier implements Serializable { @Parameter(names = {"--output-path", "-op"}, description = "The snapshot output path", required = true) String outputPath = null; + + @Parameter(names = {"--date-partitioned", "-dp"}, description = "Can we assume date partitioning?") + boolean shouldAssumeDatePartitioning = false; } - public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDir) throws IOException { + public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDir, final boolean shouldAssumeDatePartitioning) throws IOException { FileSystem fs = FSUtils.getFs(); final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs, baseDir); final TableFileSystemView fsView = new HoodieTableFileSystemView(tableMetadata, @@ -82,7 +85,7 @@ public class HoodieSnapshotCopier implements Serializable { final String latestCommitTimestamp = latestCommit.get().getTimestamp(); logger.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.", latestCommitTimestamp)); - List partitions = FSUtils.getAllPartitionPaths(fs, baseDir); + 
List partitions = FSUtils.getAllPartitionPaths(fs, baseDir, shouldAssumeDatePartitioning); if (partitions.size() > 0) { logger.info(String.format("The job needs to copy %d partitions.", partitions.size())); @@ -172,7 +175,7 @@ public class HoodieSnapshotCopier implements Serializable { // Copy HoodieSnapshotCopier copier = new HoodieSnapshotCopier(); - copier.snapshot(jsc, cfg.basePath, cfg.outputPath); + copier.snapshot(jsc, cfg.basePath, cfg.outputPath, cfg.shouldAssumeDatePartitioning); // Stop the job jsc.stop(); diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java index 64e568105..f359619a3 100644 --- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java @@ -63,7 +63,7 @@ public class TestHoodieSnapshotCopier { // Do the snapshot HoodieSnapshotCopier copier = new HoodieSnapshotCopier(); - copier.snapshot(jsc, basePath, outputPath); + copier.snapshot(jsc, basePath, outputPath, true); // Nothing changed; we just bail out assertEquals(fs.listStatus(new Path(basePath)).length, 1); @@ -117,7 +117,7 @@ public class TestHoodieSnapshotCopier { // Do a snapshot copy HoodieSnapshotCopier copier = new HoodieSnapshotCopier(); - copier.snapshot(jsc, basePath, outputPath); + copier.snapshot(jsc, basePath, outputPath, false); // Check results assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName())));