1
0

[HUDI-796] Add deduping logic for upserts case (#1558)

This commit is contained in:
Pratyaksh Sharma
2020-09-18 17:07:52 +05:30
committed by GitHub
parent bf65269f66
commit 73e5b4c7bb
5 changed files with 195 additions and 40 deletions

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.cli.commands; package org.apache.hudi.cli.commands;
import org.apache.hudi.cli.DeDupeType;
import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper; import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.HoodieTableHeaderFields; import org.apache.hudi.cli.HoodieTableHeaderFields;
@@ -77,8 +78,13 @@ public class RepairsCommand implements CommandMarker {
help = "Spark executor memory") final String sparkMemory, help = "Spark executor memory") final String sparkMemory,
@CliOption(key = {"dryrun"}, @CliOption(key = {"dryrun"},
help = "Should we actually remove duplicates or just run and store result to repairedOutputPath", help = "Should we actually remove duplicates or just run and store result to repairedOutputPath",
unspecifiedDefaultValue = "true") final boolean dryRun) unspecifiedDefaultValue = "true") final boolean dryRun,
@CliOption(key = {"dedupeType"}, help = "Valid values are - insert_type, update_type and upsert_type",
unspecifiedDefaultValue = "insert_type") final String dedupeType)
throws Exception { throws Exception {
if (!DeDupeType.values().contains(DeDupeType.withName(dedupeType))) {
throw new IllegalArgumentException("Please provide valid dedupe type!");
}
if (StringUtils.isNullOrEmpty(sparkPropertiesPath)) { if (StringUtils.isNullOrEmpty(sparkPropertiesPath)) {
sparkPropertiesPath = sparkPropertiesPath =
Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala()); Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
@@ -87,7 +93,7 @@ public class RepairsCommand implements CommandMarker {
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), master, sparkMemory, sparkLauncher.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), master, sparkMemory,
duplicatedPartitionPath, repairedOutputPath, HoodieCLI.getTableMetaClient().getBasePath(), duplicatedPartitionPath, repairedOutputPath, HoodieCLI.getTableMetaClient().getBasePath(),
String.valueOf(dryRun)); String.valueOf(dryRun), dedupeType);
Process process = sparkLauncher.launch(); Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process); InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor(); int exitCode = process.waitFor();

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.cli.commands; package org.apache.hudi.cli.commands;
import org.apache.hudi.cli.DeDupeType;
import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.cli.DedupeSparkJob; import org.apache.hudi.cli.DedupeSparkJob;
import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.cli.utils.SparkUtil;
@@ -87,8 +88,8 @@ public class SparkMain {
returnCode = rollback(jsc, args[3], args[4]); returnCode = rollback(jsc, args[3], args[4]);
break; break;
case DEDUPLICATE: case DEDUPLICATE:
assert (args.length == 7); assert (args.length == 8);
returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], args[6]); returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], Boolean.parseBoolean(args[6]), args[7]);
break; break;
case ROLLBACK_TO_SAVEPOINT: case ROLLBACK_TO_SAVEPOINT:
assert (args.length == 5); assert (args.length == 5);
@@ -304,10 +305,10 @@ public class SparkMain {
} }
private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplicatedPartitionPath, private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplicatedPartitionPath,
String repairedOutputPath, String basePath, String dryRun) { String repairedOutputPath, String basePath, boolean dryRun, String dedupeType) {
DedupeSparkJob job = new DedupeSparkJob(basePath, duplicatedPartitionPath, repairedOutputPath, new SQLContext(jsc), DedupeSparkJob job = new DedupeSparkJob(basePath, duplicatedPartitionPath, repairedOutputPath, new SQLContext(jsc),
FSUtils.getFs(basePath, jsc.hadoopConfiguration())); FSUtils.getFs(basePath, jsc.hadoopConfiguration()), DeDupeType.withName(dedupeType));
job.fixDuplicates(Boolean.parseBoolean(dryRun)); job.fixDuplicates(dryRun);
return 0; return 0;
} }

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.cli
/**
  * Strategies the dedupe job can use to decide which duplicate rows to drop.
  *
  * The string passed to each `Value(...)` is the name accepted on the CLI
  * (`--dedupeType`) and resolved back to a member via `withName`.
  */
object DeDupeType extends Enumeration {
  type dedupeType = Value

  // Duplicates were created by inserts and have never been updated.
  val INSERT_TYPE: Value = Value("insert_type")

  // Every duplicate has been updated at least once.
  val UPDATE_TYPE: Value = Value("update_type")

  // Mixed case: some duplicates were updated, others never were.
  val UPSERT_TYPE: Value = Value("upsert_type")
}

View File

@@ -26,11 +26,10 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.exception.HoodieException import org.apache.hudi.exception.HoodieException
import org.apache.log4j.Logger import org.apache.log4j.Logger
import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import scala.collection.mutable._ import scala.collection.mutable.{Buffer, HashMap, HashSet, ListBuffer}
/** /**
* Spark job to de-duplicate data present in a partition path * Spark job to de-duplicate data present in a partition path
@@ -39,8 +38,8 @@ class DedupeSparkJob(basePath: String,
duplicatedPartitionPath: String, duplicatedPartitionPath: String,
repairOutputPath: String, repairOutputPath: String,
sqlContext: SQLContext, sqlContext: SQLContext,
fs: FileSystem) { fs: FileSystem,
dedupeType: DeDupeType.Value) {
val sparkHelper = new SparkHelper(sqlContext, fs) val sparkHelper = new SparkHelper(sqlContext, fs)
val LOG = Logger.getLogger(this.getClass) val LOG = Logger.getLogger(this.getClass)
@@ -98,11 +97,32 @@ class DedupeSparkJob(basePath: String,
ON h.`_hoodie_record_key` = d.dupe_key ON h.`_hoodie_record_key` = d.dupe_key
""" """
val dupeMap = sqlContext.sql(dupeDataSql).collectAsList().groupBy(r => r.getString(0)) val dupeMap = sqlContext.sql(dupeDataSql).collectAsList().groupBy(r => r.getString(0))
val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]() getDedupePlan(dupeMap)
}
// Mark all files except the one with latest commits for deletion private def getDedupePlan(dupeMap: Map[String, Buffer[Row]]): HashMap[String, HashSet[String]] = {
val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]()
dupeMap.foreach(rt => { dupeMap.foreach(rt => {
val (key, rows) = rt val (key, rows) = rt
dedupeType match {
case DeDupeType.UPDATE_TYPE =>
/*
This corresponds to the case where all duplicates have been updated at least once.
Once updated, duplicates are bound to have same commit time unless forcefully modified.
*/
rows.init.foreach(r => {
val f = r(2).asInstanceOf[String].split("_")(0)
if (!fileToDeleteKeyMap.contains(f)) {
fileToDeleteKeyMap(f) = HashSet[String]()
}
fileToDeleteKeyMap(f).add(key)
})
case DeDupeType.INSERT_TYPE =>
/*
This corresponds to the case where duplicates got created due to INSERT and have never been updated.
*/
var maxCommit = -1L var maxCommit = -1L
rows.foreach(r => { rows.foreach(r => {
@@ -121,11 +141,48 @@ class DedupeSparkJob(basePath: String,
fileToDeleteKeyMap(f).add(key) fileToDeleteKeyMap(f).add(key)
} }
}) })
case DeDupeType.UPSERT_TYPE =>
/*
This corresponds to the case where duplicates got created as a result of inserts as well as updates,
i.e., some duplicate records have been updated, while others were never updated.
*/
var maxCommit = -1L
rows.foreach(r => {
val c = r(3).asInstanceOf[String].toLong
if (c > maxCommit)
maxCommit = c
}) })
val rowsWithMaxCommit = new ListBuffer[Row]()
rows.foreach(r => {
val c = r(3).asInstanceOf[String].toLong
if (c != maxCommit) {
val f = r(2).asInstanceOf[String].split("_")(0)
if (!fileToDeleteKeyMap.contains(f)) {
fileToDeleteKeyMap(f) = HashSet[String]()
}
fileToDeleteKeyMap(f).add(key)
} else {
rowsWithMaxCommit += r
}
})
rowsWithMaxCommit.toList.init.foreach(r => {
val f = r(2).asInstanceOf[String].split("_")(0)
if (!fileToDeleteKeyMap.contains(f)) {
fileToDeleteKeyMap(f) = HashSet[String]()
}
fileToDeleteKeyMap(f).add(key)
})
case _ => throw new IllegalArgumentException("Please provide valid type for deduping!")
}
})
LOG.debug(s"fileToDeleteKeyMap size: ${fileToDeleteKeyMap.size}, map: $fileToDeleteKeyMap")
fileToDeleteKeyMap fileToDeleteKeyMap
} }
def fixDuplicates(dryRun: Boolean = true) = { def fixDuplicates(dryRun: Boolean = true) = {
val metadata = new HoodieTableMetaClient(fs.getConf, basePath) val metadata = new HoodieTableMetaClient(fs.getConf, basePath)
@@ -152,7 +209,7 @@ class DedupeSparkJob(basePath: String,
val newFilePath = new Path(s"$repairOutputPath/${fileNameToPathMap(fileName).getName}") val newFilePath = new Path(s"$repairOutputPath/${fileNameToPathMap(fileName).getName}")
LOG.info(" Skipping and writing new file for : " + fileName) LOG.info(" Skipping and writing new file for : " + fileName)
SparkHelpers.skipKeysAndWriteNewFile(instantTime, fs, badFilePath, newFilePath, dupeFixPlan(fileName)) SparkHelpers.skipKeysAndWriteNewFile(instantTime, fs, badFilePath, newFilePath, dupeFixPlan(fileName))
fs.delete(badFilePath, false) fs.delete(badFilePath, true)
} }
// 3. Check that there are no duplicates anymore. // 3. Check that there are no duplicates anymore.
@@ -175,7 +232,6 @@ class DedupeSparkJob(basePath: String,
throw new HoodieException("Some records in source are not found in fixed files. Inspect output!!") throw new HoodieException("Some records in source are not found in fixed files. Inspect output!!")
} }
println("No duplicates found & counts are in check!!!! ") println("No duplicates found & counts are in check!!!! ")
// 4. Prepare to copy the fixed files back. // 4. Prepare to copy the fixed files back.
fileNameToPathMap.foreach { case (_, filePath) => fileNameToPathMap.foreach { case (_, filePath) =>

View File

@@ -59,12 +59,16 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class ITTestRepairsCommand extends AbstractShellIntegrationTest { public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
private String duplicatedPartitionPath; private String duplicatedPartitionPath;
private String duplicatedPartitionPathWithUpdates;
private String duplicatedPartitionPathWithUpserts;
private String repairedOutputPath; private String repairedOutputPath;
@BeforeEach @BeforeEach
public void init() throws Exception { public void init() throws Exception {
final String tablePath = Paths.get(basePath, "test_table").toString(); final String tablePath = Paths.get(basePath, "test_table").toString();
duplicatedPartitionPath = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).toString(); duplicatedPartitionPath = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).toString();
duplicatedPartitionPathWithUpdates = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).toString();
duplicatedPartitionPathWithUpserts = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH).toString();
repairedOutputPath = Paths.get(basePath, "tmp").toString(); repairedOutputPath = Paths.get(basePath, "tmp").toString();
HoodieCLI.conf = jsc.hadoopConfiguration(); HoodieCLI.conf = jsc.hadoopConfiguration();
@@ -85,10 +89,16 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "2", hoodieRecords2) .withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "2", hoodieRecords2)
.withLogFile(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); .withLogFile(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
testTable.withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "4", hoodieRecords1)
.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "6", hoodieRecords1);
// read records and get 10 to generate duplicates // read records and get 10 to generate duplicates
HoodieRecord[] dupRecords = Arrays.copyOf(hoodieRecords1, 10); HoodieRecord[] dupRecords = Arrays.copyOf(hoodieRecords1, 10);
testTable.withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "5", dupRecords);
testTable.addCommit("20160401010202") testTable.addCommit("20160401010202")
.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "3", dupRecords); .withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "3", dupRecords);
testTable.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "7", dupRecords)
.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "8", dupRecords);
metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
} }
@@ -97,7 +107,7 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
* Test case for dry run deduplicate. * Test case for dry run deduplicate.
*/ */
@Test @Test
public void testDeduplicate() throws IOException { public void testDeduplicateWithInserts() throws IOException {
// get fs and check number of latest files // get fs and check number of latest files
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(), metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
@@ -124,6 +134,60 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
assertEquals(200, result.count()); assertEquals(200, result.count());
} }
/**
 * Test case for dry run deduplicate with the {@code update_type} dedupe strategy.
 *
 * <p>Runs against the second default partition, for which {@code init()} writes one base file with
 * the full record set and one with 10 duplicated records, and checks that the repaired output
 * holds 100 records.
 *
 * @throws IOException if listing the partition or the repaired output path fails
 */
@Test
public void testDeduplicateWithUpdates() throws IOException {
  // Resolve the latest base files of the partition that holds the duplicates.
  HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
      metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
      fs.listStatus(new Path(duplicatedPartitionPathWithUpdates)));
  List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
  assertEquals(2, filteredStatuses.size(), "There should be 2 files.");

  // Before deduplicate, all files contain 110 records
  String[] files = filteredStatuses.toArray(new String[0]);
  // Wildcard instead of a raw type: only count() is needed, so the element type is irrelevant.
  Dataset<?> df = sqlContext.read().parquet(files);
  assertEquals(110, df.count());

  // The CLI takes the partition path relative to the table base path; dryrun keeps its default.
  String partitionPath = HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
  String cmdStr = String.format("repair deduplicate --duplicatedPartitionPath %s --repairedOutputPath %s --sparkMaster %s --dedupeType %s",
      partitionPath, repairedOutputPath, "local", "update_type");
  CommandResult cr = getShell().executeCommand(cmdStr);
  assertTrue(cr.isSuccess());
  assertEquals(RepairsCommand.DEDUPLICATE_RETURN_PREFIX + repairedOutputPath, cr.getResult().toString());

  // After deduplicate, there are 100 records
  FileStatus[] fileStatus = fs.listStatus(new Path(repairedOutputPath));
  files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new);
  Dataset<?> result = sqlContext.read().parquet(files);
  assertEquals(100, result.count());
}
/**
 * Test case for dry run deduplicate with the {@code upsert_type} dedupe strategy.
 *
 * <p>Runs against the third default partition, for which {@code init()} writes one base file with
 * the full record set and two further files that each hold the same 10 duplicated records, and
 * checks that the repaired output holds 100 records.
 *
 * @throws IOException if listing the partition or the repaired output path fails
 */
@Test
public void testDeduplicateWithUpserts() throws IOException {
  // Resolve the latest base files of the partition that holds the duplicates.
  HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
      metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
      fs.listStatus(new Path(duplicatedPartitionPathWithUpserts)));
  List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
  assertEquals(3, filteredStatuses.size(), "There should be 3 files.");

  // Before deduplicate, all files contain 120 records
  String[] files = filteredStatuses.toArray(new String[0]);
  // Wildcard instead of a raw type: only count() is needed, so the element type is irrelevant.
  Dataset<?> df = sqlContext.read().parquet(files);
  assertEquals(120, df.count());

  // The CLI takes the partition path relative to the table base path; dryrun keeps its default.
  String partitionPath = HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
  String cmdStr = String.format("repair deduplicate --duplicatedPartitionPath %s --repairedOutputPath %s --sparkMaster %s --dedupeType %s",
      partitionPath, repairedOutputPath, "local", "upsert_type");
  CommandResult cr = getShell().executeCommand(cmdStr);
  assertTrue(cr.isSuccess());
  assertEquals(RepairsCommand.DEDUPLICATE_RETURN_PREFIX + repairedOutputPath, cr.getResult().toString());

  // After deduplicate, there are 100 records
  FileStatus[] fileStatus = fs.listStatus(new Path(repairedOutputPath));
  files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new);
  Dataset<?> result = sqlContext.read().parquet(files);
  assertEquals(100, result.count());
}
/** /**
* Test case for real run deduplicate. * Test case for real run deduplicate.
*/ */