From 73e5b4c7bbd1306a6a2686f6e8d85a2c871ac7ff Mon Sep 17 00:00:00 2001 From: Pratyaksh Sharma Date: Fri, 18 Sep 2020 17:07:52 +0530 Subject: [PATCH] [HUDI-796] Add deduping logic for upserts case (#1558) --- .../hudi/cli/commands/RepairsCommand.java | 10 +- .../apache/hudi/cli/commands/SparkMain.java | 11 +- .../org/apache/hudi/cli/DeDupeType.scala | 28 ++++ .../org/apache/hudi/cli/DedupeSparkJob.scala | 120 +++++++++++++----- .../hudi/cli/integ/ITTestRepairsCommand.java | 66 +++++++++- 5 files changed, 195 insertions(+), 40 deletions(-) create mode 100644 hudi-cli/src/main/scala/org/apache/hudi/cli/DeDupeType.scala diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java index 00ccf7013..40dddfc72 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java @@ -18,6 +18,7 @@ package org.apache.hudi.cli.commands; +import org.apache.hudi.cli.DeDupeType; import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.HoodiePrintHelper; import org.apache.hudi.cli.HoodieTableHeaderFields; @@ -77,8 +78,13 @@ public class RepairsCommand implements CommandMarker { help = "Spark executor memory") final String sparkMemory, @CliOption(key = {"dryrun"}, help = "Should we actually remove duplicates or just run and store result to repairedOutputPath", - unspecifiedDefaultValue = "true") final boolean dryRun) + unspecifiedDefaultValue = "true") final boolean dryRun, + @CliOption(key = {"dedupeType"}, help = "Valid values are - insert_type, update_type and upsert_type", + unspecifiedDefaultValue = "insert_type") final String dedupeType) throws Exception { + if (!DeDupeType.values().contains(DeDupeType.withName(dedupeType))) { + throw new IllegalArgumentException("Please provide valid dedupe type!"); + } if (StringUtils.isNullOrEmpty(sparkPropertiesPath)) { sparkPropertiesPath = Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala()); @@ -87,7 +93,7 @@ public class RepairsCommand implements CommandMarker { SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); sparkLauncher.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), master, sparkMemory, duplicatedPartitionPath, repairedOutputPath, HoodieCLI.getTableMetaClient().getBasePath(), - String.valueOf(dryRun)); + String.valueOf(dryRun), dedupeType); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); int exitCode = process.waitFor(); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index e6a3f529c..ce567b98f 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -18,6 +18,7 @@ package org.apache.hudi.cli.commands; +import org.apache.hudi.cli.DeDupeType; import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.cli.DedupeSparkJob; import org.apache.hudi.cli.utils.SparkUtil; @@ -87,8 +88,8 @@ public class SparkMain { returnCode = rollback(jsc, args[3], args[4]); break; case DEDUPLICATE: - assert (args.length == 7); - returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], args[6]); + assert (args.length == 8); + returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], Boolean.parseBoolean(args[6]), args[7]); break; case ROLLBACK_TO_SAVEPOINT: assert (args.length == 5); @@ -304,10 +305,10 @@ public class SparkMain { } private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplicatedPartitionPath, - String repairedOutputPath, String basePath, String dryRun) { + String repairedOutputPath, String basePath, boolean dryRun, String dedupeType) { DedupeSparkJob job = new DedupeSparkJob(basePath, duplicatedPartitionPath, repairedOutputPath, new SQLContext(jsc), - FSUtils.getFs(basePath, jsc.hadoopConfiguration())); - job.fixDuplicates(Boolean.parseBoolean(dryRun)); + FSUtils.getFs(basePath, jsc.hadoopConfiguration()), DeDupeType.withName(dedupeType)); + job.fixDuplicates(dryRun); return 0; } diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/DeDupeType.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/DeDupeType.scala new file mode 100644 index 000000000..b020f5ddb --- /dev/null +++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/DeDupeType.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.cli + +object DeDupeType extends Enumeration { + + type dedupeType = Value + + val INSERT_TYPE = Value("insert_type") + val UPDATE_TYPE = Value("update_type") + val UPSERT_TYPE = Value("upsert_type") +} diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala index 5f4974c86..96944c5c0 100644 --- a/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala +++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala @@ -26,11 +26,10 @@ import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.exception.HoodieException import org.apache.log4j.Logger -import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.{DataFrame, Row, SQLContext} import scala.collection.JavaConversions._ -import scala.collection.mutable._ - +import scala.collection.mutable.{Buffer, HashMap, HashSet, ListBuffer} /** * Spark job to de-duplicate data present in a partition path @@ -39,8 +38,8 @@ class DedupeSparkJob(basePath: String, duplicatedPartitionPath: String, repairOutputPath: String, sqlContext: SQLContext, - fs: FileSystem) { - + fs: FileSystem, + dedupeType: DeDupeType.Value) { val sparkHelper = new SparkHelper(sqlContext, fs) val LOG = Logger.getLogger(this.getClass) @@ -98,33 +97,91 @@ class DedupeSparkJob(basePath: String, ON h.`_hoodie_record_key` = d.dupe_key """ val dupeMap = sqlContext.sql(dupeDataSql).collectAsList().groupBy(r => r.getString(0)) - val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]() - - // Mark all files except the one with latest commits for deletion - dupeMap.foreach(rt => { - val (key, rows) = rt - var maxCommit = -1L - - rows.foreach(r => { - val c = r(3).asInstanceOf[String].toLong - if (c > maxCommit) - maxCommit = c - }) - - rows.foreach(r => { - val c = r(3).asInstanceOf[String].toLong - if (c != maxCommit) { - val f = r(2).asInstanceOf[String].split("_")(0) - if (!fileToDeleteKeyMap.contains(f)) { - fileToDeleteKeyMap(f) = HashSet[String]() - } - fileToDeleteKeyMap(f).add(key) - } - }) - }) - fileToDeleteKeyMap + getDedupePlan(dupeMap) } + private def getDedupePlan(dupeMap: Map[String, Buffer[Row]]): HashMap[String, HashSet[String]] = { + val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]() + dupeMap.foreach(rt => { + val (key, rows) = rt + + dedupeType match { + case DeDupeType.UPDATE_TYPE => + /* + This corresponds to the case where all duplicates have been updated at least once. + Once updated, duplicates are bound to have same commit time unless forcefully modified. + */ + rows.init.foreach(r => { + val f = r(2).asInstanceOf[String].split("_")(0) + if (!fileToDeleteKeyMap.contains(f)) { + fileToDeleteKeyMap(f) = HashSet[String]() + } + fileToDeleteKeyMap(f).add(key) + }) + + case DeDupeType.INSERT_TYPE => + /* + This corresponds to the case where duplicates got created due to INSERT and have never been updated. + */ + var maxCommit = -1L + + rows.foreach(r => { + val c = r(3).asInstanceOf[String].toLong + if (c > maxCommit) + maxCommit = c + }) + + rows.foreach(r => { + val c = r(3).asInstanceOf[String].toLong + if (c != maxCommit) { + val f = r(2).asInstanceOf[String].split("_")(0) + if (!fileToDeleteKeyMap.contains(f)) { + fileToDeleteKeyMap(f) = HashSet[String]() + } + fileToDeleteKeyMap(f).add(key) + } + }) + + case DeDupeType.UPSERT_TYPE => + /* + This corresponds to the case where duplicates got created as a result of inserts as well as updates, + i.e few duplicate records have been updated, while others were never updated. + */ + var maxCommit = -1L + + rows.foreach(r => { + val c = r(3).asInstanceOf[String].toLong + if (c > maxCommit) + maxCommit = c + }) + val rowsWithMaxCommit = new ListBuffer[Row]() + rows.foreach(r => { + val c = r(3).asInstanceOf[String].toLong + if (c != maxCommit) { + val f = r(2).asInstanceOf[String].split("_")(0) + if (!fileToDeleteKeyMap.contains(f)) { + fileToDeleteKeyMap(f) = HashSet[String]() + } + fileToDeleteKeyMap(f).add(key) + } else { + rowsWithMaxCommit += r + } + }) + + rowsWithMaxCommit.toList.init.foreach(r => { + val f = r(2).asInstanceOf[String].split("_")(0) + if (!fileToDeleteKeyMap.contains(f)) { + fileToDeleteKeyMap(f) = HashSet[String]() + } + fileToDeleteKeyMap(f).add(key) + }) + + case _ => throw new IllegalArgumentException("Please provide valid type for deduping!") + } + }) + LOG.debug(s"fileToDeleteKeyMap size: ${fileToDeleteKeyMap.size}, map: $fileToDeleteKeyMap") + fileToDeleteKeyMap + } def fixDuplicates(dryRun: Boolean = true) = { val metadata = new HoodieTableMetaClient(fs.getConf, basePath) @@ -152,7 +209,7 @@ class DedupeSparkJob(basePath: String, val newFilePath = new Path(s"$repairOutputPath/${fileNameToPathMap(fileName).getName}") LOG.info(" Skipping and writing new file for : " + fileName) SparkHelpers.skipKeysAndWriteNewFile(instantTime, fs, badFilePath, newFilePath, dupeFixPlan(fileName)) - fs.delete(badFilePath, false) + fs.delete(badFilePath, true) } // 3. Check that there are no duplicates anymore. @@ -175,7 +232,6 @@ class DedupeSparkJob(basePath: String, throw new HoodieException("Some records in source are not found in fixed files. Inspect output!!") } - println("No duplicates found & counts are in check!!!! ") // 4. Prepare to copy the fixed files back. fileNameToPathMap.foreach { case (_, filePath) => diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java index 57238f70e..f277e33b6 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java @@ -59,12 +59,16 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class ITTestRepairsCommand extends AbstractShellIntegrationTest { private String duplicatedPartitionPath; + private String duplicatedPartitionPathWithUpdates; + private String duplicatedPartitionPathWithUpserts; private String repairedOutputPath; @BeforeEach public void init() throws Exception { final String tablePath = Paths.get(basePath, "test_table").toString(); duplicatedPartitionPath = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).toString(); + duplicatedPartitionPathWithUpdates = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).toString(); + duplicatedPartitionPathWithUpserts = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH).toString(); repairedOutputPath = Paths.get(basePath, "tmp").toString(); HoodieCLI.conf = jsc.hadoopConfiguration(); @@ -85,10 +89,16 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest { .withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "2", hoodieRecords2) .withLogFile(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); + testTable.withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "4", hoodieRecords1) + .withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "6", hoodieRecords1); + // read records and get 10 to generate duplicates HoodieRecord[] dupRecords = Arrays.copyOf(hoodieRecords1, 10); + testTable.withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "5", dupRecords); testTable.addCommit("20160401010202") .withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "3", dupRecords); + testTable.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "7", dupRecords) + .withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "8", dupRecords); metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); } @@ -97,7 +107,7 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest { * Test case for dry run deduplicate. */ @Test - public void testDeduplicate() throws IOException { + public void testDeduplicateWithInserts() throws IOException { // get fs and check number of latest files HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(), @@ -124,6 +134,60 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest { assertEquals(200, result.count()); } + @Test + public void testDeduplicateWithUpdates() throws IOException { + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, + metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(), + fs.listStatus(new Path(duplicatedPartitionPathWithUpdates))); + List filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); + assertEquals(2, filteredStatuses.size(), "There should be 2 files."); + + // Before deduplicate, all files contain 110 records + String[] files = filteredStatuses.toArray(new String[0]); + Dataset df = sqlContext.read().parquet(files); + assertEquals(110, df.count()); + + String partitionPath = HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH; + String cmdStr = String.format("repair deduplicate --duplicatedPartitionPath %s --repairedOutputPath %s --sparkMaster %s --dedupeType %s", + partitionPath, repairedOutputPath, "local", "update_type"); + CommandResult cr = getShell().executeCommand(cmdStr); + assertTrue(cr.isSuccess()); + assertEquals(RepairsCommand.DEDUPLICATE_RETURN_PREFIX + repairedOutputPath, cr.getResult().toString()); + + // After deduplicate, there are 100 records + FileStatus[] fileStatus = fs.listStatus(new Path(repairedOutputPath)); + files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new); + Dataset result = sqlContext.read().parquet(files); + assertEquals(100, result.count()); + } + + @Test + public void testDeduplicateWithUpserts() throws IOException { + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, + metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(), + fs.listStatus(new Path(duplicatedPartitionPathWithUpserts))); + List filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); + assertEquals(3, filteredStatuses.size(), "There should be 3 files."); + + // Before deduplicate, all files contain 120 records + String[] files = filteredStatuses.toArray(new String[0]); + Dataset df = sqlContext.read().parquet(files); + assertEquals(120, df.count()); + + String partitionPath = HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH; + String cmdStr = String.format("repair deduplicate --duplicatedPartitionPath %s --repairedOutputPath %s --sparkMaster %s --dedupeType %s", + partitionPath, repairedOutputPath, "local", "upsert_type"); + CommandResult cr = getShell().executeCommand(cmdStr); + assertTrue(cr.isSuccess()); + assertEquals(RepairsCommand.DEDUPLICATE_RETURN_PREFIX + repairedOutputPath, cr.getResult().toString()); + + // After deduplicate, there are 100 records + FileStatus[] fileStatus = fs.listStatus(new Path(repairedOutputPath)); + files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new); + Dataset result = sqlContext.read().parquet(files); + assertEquals(100, result.count()); + } + /** * Test case for real run deduplicate. */