diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
index 3b398e3b9..5e31e5cf9 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
@@ -72,4 +72,13 @@ public class HoodieTableHeaderFields {
   public static final String HEADER_DELTA_BASE_UNSCHEDULED = "Delta To Base Ratio" + COMPACTION_UNSCHEDULED_SUFFIX;
   public static final String HEADER_DELTA_FILES_SCHEDULED = "Delta Files" + COMPACTION_SCHEDULED_SUFFIX;
   public static final String HEADER_DELTA_FILES_UNSCHEDULED = "Delta Files" + COMPACTION_UNSCHEDULED_SUFFIX;
+
+  /**
+   * Fields of Repair.
+   */
+  public static final String HEADER_METADATA_PRESENT = "Metadata Present?";
+  public static final String HEADER_REPAIR_ACTION = "Action";
+  public static final String HEADER_HOODIE_PROPERTY = "Property";
+  public static final String HEADER_OLD_VALUE = "Old Value";
+  public static final String HEADER_NEW_VALUE = "New Value";
 }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index 41de7d21c..0af9ff2e9 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -20,6 +20,7 @@ package org.apache.hudi.cli.commands;
 
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
 import org.apache.hudi.cli.utils.InputStreamConsumer;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.common.fs.FSUtils;
@@ -30,12 +31,15 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.CleanerUtils;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.util.Utils;
 import org.springframework.shell.core.CommandMarker;
 import org.springframework.shell.core.annotation.CliCommand;
 import org.springframework.shell.core.annotation.CliOption;
 import org.springframework.stereotype.Component;
+import scala.collection.JavaConverters;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -55,6 +59,7 @@ import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME
 public class RepairsCommand implements CommandMarker {
 
   private static final Logger LOG = Logger.getLogger(RepairsCommand.class);
+  public static final String DEDUPLICATE_RETURN_PREFIX = "Deduplicated files placed in: ";
 
   @CliCommand(value = "repair deduplicate",
       help = "De-duplicate a partition path contains duplicates & produce repaired files to replace with")
@@ -64,19 +69,35 @@ public class RepairsCommand implements CommandMarker {
       @CliOption(key = {"repairedOutputPath"}, help = "Location to place the repaired files",
           mandatory = true) final String repairedOutputPath,
       @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path",
-          mandatory = true) final String sparkPropertiesPath)
+          unspecifiedDefaultValue = "") String sparkPropertiesPath,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
+      @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
+          help = "Spark executor memory") final String sparkMemory,
+      @CliOption(key = {"dryrun"},
+          help = "Should we actually remove duplicates or just run and store result to repairedOutputPath",
+          unspecifiedDefaultValue = "true") final boolean dryRun)
       throws Exception {
+    if (StringUtils.isNullOrEmpty(sparkPropertiesPath)) {
+      sparkPropertiesPath =
+          Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
+    }
+
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), duplicatedPartitionPath, repairedOutputPath,
-        HoodieCLI.getTableMetaClient().getBasePath());
+    sparkLauncher.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), master, sparkMemory,
+        duplicatedPartitionPath, repairedOutputPath, HoodieCLI.getTableMetaClient().getBasePath(),
+        String.valueOf(dryRun));
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
     int exitCode = process.waitFor();
 
     if (exitCode != 0) {
-      return "Deduplicated files placed in: " + repairedOutputPath;
+      return "Deduplication failed!";
+    }
+    if (dryRun) {
+      return DEDUPLICATE_RETURN_PREFIX + repairedOutputPath;
+    } else {
+      return DEDUPLICATE_RETURN_PREFIX + duplicatedPartitionPath;
     }
-    return "Deduplication failed ";
   }
 
   @CliCommand(value = "repair addpartitionmeta", help = "Add partition metadata to a table, if not present")
@@ -106,12 +127,14 @@ public class RepairsCommand implements CommandMarker {
           HoodiePartitionMetadata partitionMetadata =
               new HoodiePartitionMetadata(HoodieCLI.fs, latestCommit, basePath, partitionPath);
           partitionMetadata.trySave(0);
+          row[2] = "Repaired";
         }
       }
       rows[ind++] = row;
     }
 
-    return HoodiePrintHelper.print(new String[] {"Partition Path", "Metadata Present?", "Action"}, rows);
+    return HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
   }
 
   @CliCommand(value = "repair overwrite-hoodie-props", help = "Overwrite hoodie.properties with provided file. Risky operation. Proceed with caution!")
@@ -140,7 +163,8 @@ public class RepairsCommand implements CommandMarker {
       };
       rows[ind++] = row;
     }
-    return HoodiePrintHelper.print(new String[] {"Property", "Old Value", "New Value"}, rows);
+    return HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY,
+        HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows);
   }
 
   @CliCommand(value = "repair corrupted clean files", help = "repair corrupted clean files")
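
A note on the argument contract introduced above (an illustrative sketch, not part of the patch): after this change, the CLI side and SparkMain below must agree on seven positional arguments for DEDUPLICATE. The sample values here are hypothetical:

    // Hypothetical, self-contained sketch of the DEDUPLICATE argv after this change:
    String master = "local[2]";
    String sparkMemory = "4G";
    String[] args = {"DEDUPLICATE", master, sparkMemory,
        "2016/03/15", "/tmp/repaired", "/tmp/hoodie/test_table", "true"};
    // SparkMain asserts args.length == 7; args[1] and args[2] feed the shared
    // master/memory handling (DEDUPLICATE is now in sparkMasterContained), and
    // deduplicatePartitionPath receives args[3] through args[6]:
    //   duplicatedPartitionPath, repairedOutputPath, basePath, dryRun.
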
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 5aa225559..5d8972d0b 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -73,8 +73,8 @@ public class SparkMain {
         returnCode = rollback(jsc, args[1], args[2]);
         break;
       case DEDUPLICATE:
-        assert (args.length == 4);
-        returnCode = deduplicatePartitionPath(jsc, args[1], args[2], args[3]);
+        assert (args.length == 7);
+        returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], args[6]);
         break;
       case ROLLBACK_TO_SAVEPOINT:
         assert (args.length == 3);
@@ -162,7 +162,8 @@ public class SparkMain {
 
   private static boolean sparkMasterContained(SparkCommand command) {
     List<SparkCommand> masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR,
-        SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN);
+        SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN,
+        SparkCommand.DEDUPLICATE);
     return masterContained.contains(command);
   }
@@ -263,10 +264,10 @@ public class SparkMain {
   }
 
   private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplicatedPartitionPath,
-      String repairedOutputPath, String basePath) {
+      String repairedOutputPath, String basePath, String dryRun) {
     DedupeSparkJob job = new DedupeSparkJob(basePath, duplicatedPartitionPath, repairedOutputPath,
         new SQLContext(jsc), FSUtils.getFs(basePath, jsc.hadoopConfiguration()));
-    job.fixDuplicates(true);
+    job.fixDuplicates(Boolean.parseBoolean(dryRun));
     return 0;
   }
diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
index a229a15ce..7de5f42f5 100644
--- a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
+++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
@@ -46,6 +46,10 @@ object SparkHelpers {
       HoodieIndexConfig.DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_INDEX_FILTER_TYPE);
     val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter)
     val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf, HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble)
+
+    // Set the current thread's context class loader on the config; otherwise loading 'HoodieWrapperFileSystem' fails with ClassNotFoundException.
+    parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
+
     val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier())
     for (rec <- sourceRecords) {
       val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
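
For context on the class-loader line above: Hadoop's Configuration resolves class names through its own class loader, which inside a launched CLI session may not see the jars that define HoodieWrapperFileSystem. A minimal Java sketch of the same pattern, assuming only hadoop-common on the classpath:

    import org.apache.hadoop.conf.Configuration;

    public class ContextClassLoaderSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Make conf.getClassByName(...) resolve classes through the caller's
        // context class loader rather than the loader that created conf.
        conf.setClassLoader(Thread.currentThread().getContextClassLoader());
      }
    }
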
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
new file mode 100644
index 000000000..9fd44b424
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
+import org.apache.hudi.cli.common.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for {@link RepairsCommand}.
+ */
+public class TestRepairsCommand extends AbstractShellIntegrationTest {
+
+  private String tablePath;
+
+  @BeforeEach
+  public void init() throws IOException {
+    String tableName = "test_table";
+    tablePath = basePath + File.separator + tableName;
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+  }
+
+  /**
+   * Test case for dry run 'repair addpartitionmeta'.
+   */
+  @Test
+  public void testAddPartitionMetaWithDryRun() throws IOException {
+    // create commit instant
+    Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit"));
+
+    // create partition paths
+    String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+    String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+    assertTrue(fs.mkdirs(new Path(partition1)));
+    assertTrue(fs.mkdirs(new Path(partition2)));
+    assertTrue(fs.mkdirs(new Path(partition3)));
+
+    // the default is a dry run
+    CommandResult cr = getShell().executeCommand("repair addpartitionmeta");
+    assertTrue(cr.isSuccess());
+
+    // expect every partition to report 'No' metadata and action 'None'
+    String[][] rows = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath)
+        .stream()
+        .map(partition -> new String[]{partition, "No", "None"})
+        .toArray(String[][]::new);
+    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
+
+    assertEquals(expected, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for real run 'repair addpartitionmeta'.
+   */
+  @Test
+  public void testAddPartitionMetaWithRealRun() throws IOException {
+    // create commit instant
+    Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit"));
+
+    // create partition paths
+    String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+    String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+    assertTrue(fs.mkdirs(new Path(partition1)));
+    assertTrue(fs.mkdirs(new Path(partition2)));
+    assertTrue(fs.mkdirs(new Path(partition3)));
+
+    CommandResult cr = getShell().executeCommand("repair addpartitionmeta --dryrun false");
+    assertTrue(cr.isSuccess());
+
+    List<String> paths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath);
+    // after the real run, the action is 'Repaired'
+    String[][] rows = paths.stream()
+        .map(partition -> new String[]{partition, "No", "Repaired"})
+        .toArray(String[][]::new);
+    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
+
+    assertEquals(expected, cr.getResult().toString());
+
+    cr = getShell().executeCommand("repair addpartitionmeta");
+
+    // run a dry run again: metadata is present now, so the action is 'None'
+    rows = paths.stream()
+        .map(partition -> new String[]{partition, "Yes", "None"})
+        .toArray(String[][]::new);
+    expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
+    assertEquals(expected, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for 'repair overwrite-hoodie-props'.
+   */
+  @Test
+  public void testOverwriteHoodieProperties() throws IOException {
+    URL newProps = this.getClass().getClassLoader().getResource("table-config.properties");
+    assertNotNull(newProps, "New property file must exist");
+
+    CommandResult cr = getShell().executeCommand("repair overwrite-hoodie-props --new-props-file " + newProps.getPath());
+    assertTrue(cr.isSuccess());
+
+    Map<String, String> oldProps = HoodieCLI.getTableMetaClient().getTableConfig().getProps();
+
+    // after overwrite, the values stored in .hoodie must equal those read from the new properties file
+    Map<String, String> result = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps();
+    Properties expectProps = new Properties();
+    expectProps.load(new FileInputStream(new File(newProps.getPath())));
+
+    Map<String, String> expected = expectProps.entrySet().stream()
+        .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
+    assertEquals(expected, result);
+
+    // check the rendered result table
+    List<String> allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type",
+        "hoodie.archivelog.folder", "hoodie.timeline.layout.version");
+    String[][] rows = allPropsStr.stream().sorted().map(key -> new String[]{key,
+        oldProps.getOrDefault(key, null), result.getOrDefault(key, null)})
+        .toArray(String[][]::new);
+    String expect = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY,
+        HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows);
+
+    assertEquals(expect, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for 'repair corrupted clean files'.
+   */
+  @Test
+  public void testRemoveCorruptedPendingCleanAction() throws IOException {
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    Configuration conf = HoodieCLI.conf;
+
+    metaClient = HoodieCLI.getTableMetaClient();
+
+    // create four requested instants
+    for (int i = 100; i < 104; i++) {
+      String timestamp = String.valueOf(i);
+      // write a corrupted requested compaction file
+      HoodieTestCommitMetadataGenerator.createCompactionRequestedFile(tablePath, timestamp, conf);
+    }
+
+    // reload meta client
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    // before the repair, there are four pending instants
+    assertEquals(4, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
+
+    CommandResult cr = getShell().executeCommand("repair corrupted clean files");
+    assertTrue(cr.isSuccess());
+
+    // reload meta client
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    assertEquals(0, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
+  }
+}
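
The integration tests below pin down the return contract that RepairsCommand now exposes for deduplication: on success, a dry run reports the temporary output location, while a real run reports the partition repaired in place. As a hedged sketch of the expectation the tests assert (assuming the Spark job exits with code 0):

    // Expected CLI result string for 'repair deduplicate':
    static String expectedDedupeResult(boolean dryRun, String repairedOutputPath, String duplicatedPartitionPath) {
      return RepairsCommand.DEDUPLICATE_RETURN_PREFIX
          + (dryRun ? repairedOutputPath : duplicatedPartitionPath);
    }
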
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
new file mode 100644
index 000000000..4f48bc34f
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.RepairsCommand;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.spark.sql.Dataset;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.lit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test class for {@link RepairsCommand#deduplicate}.
+ * <p>
+ * A command that uses SparkLauncher needs the jars under lib, which are generated during mvn package,
+ * so this is covered by an integration test rather than a unit test.
+ */
+public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
+
+  private String duplicatedPartitionPath;
+  private String repairedOutputPath;
+
+  @BeforeEach
+  public void init() throws IOException, URISyntaxException {
+    String tablePath = basePath + File.separator + "test_table";
+    duplicatedPartitionPath = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    repairedOutputPath = basePath + File.separator + "tmp";
+
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    // generate 200 records
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+
+    String fileName1 = "1_0_20160401010101.parquet";
+    String fileName2 = "2_0_20160401010101.parquet";
+
+    List hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName1, hoodieRecords1, schema, null, false);
+    List hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName2, hoodieRecords2, schema, null, false);
+
+    // generate commit file
+    String fileId1 = UUID.randomUUID().toString();
+    String testWriteToken = "1-0-1";
+    String commitTime = FSUtils.getCommitTime(fileName1);
+    Files.createFile(Paths.get(duplicatedPartitionPath + "/"
+        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime, 1, testWriteToken)));
+    Files.createFile(Paths.get(tablePath + "/.hoodie/" + commitTime + ".commit"));
+
+    // read the records back and take 10 of them to create duplicates
+    Dataset df = sqlContext.read().parquet(duplicatedPartitionPath);
+
+    String fileName3 = "3_0_20160401010202.parquet";
+    commitTime = FSUtils.getCommitTime(fileName3);
+    df.limit(10).withColumn("_hoodie_commit_time", lit(commitTime))
+        .write().parquet(duplicatedPartitionPath + File.separator + fileName3);
+    Files.createFile(Paths.get(tablePath + "/.hoodie/" + commitTime + ".commit"));
+
+    metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+  }
+
+  /**
+   * Test case for dry run deduplicate.
+   */
+  @Test
+  public void testDeduplicate() throws IOException {
+    // get the file system view and check the number of latest base files
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(duplicatedPartitionPath)));
+    List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+    assertEquals(3, filteredStatuses.size(), "There should be 3 files.");
+
+    // before deduplication, the files contain 210 records in total
+    String[] files = filteredStatuses.toArray(new String[0]);
+    Dataset df = sqlContext.read().parquet(files);
+    assertEquals(210, df.count());
+
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String cmdStr = String.format("repair deduplicate --duplicatedPartitionPath %s --repairedOutputPath %s --sparkMaster %s",
+        partitionPath, repairedOutputPath, "local");
+    CommandResult cr = getShell().executeCommand(cmdStr);
+    assertTrue(cr.isSuccess());
+    assertEquals(RepairsCommand.DEDUPLICATE_RETURN_PREFIX + repairedOutputPath, cr.getResult().toString());
+
+    // after deduplication, the repaired output contains 200 records
+    FileStatus[] fileStatus = fs.listStatus(new Path(repairedOutputPath));
+    files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new);
+    Dataset result = sqlContext.read().parquet(files);
+    assertEquals(200, result.count());
+  }
+
+  /**
+   * Test case for real run deduplicate.
+   */
+  @Test
+  public void testDeduplicateWithReal() throws IOException {
+    // get the file system view and check the number of latest base files
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(duplicatedPartitionPath)));
+    List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+    assertEquals(3, filteredStatuses.size(), "There should be 3 files.");
+
+    // before deduplication, the files contain 210 records in total
+    String[] files = filteredStatuses.toArray(new String[0]);
+    Dataset df = sqlContext.read().parquet(files);
+    assertEquals(210, df.count());
+
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String cmdStr = String.format("repair deduplicate --duplicatedPartitionPath %s --repairedOutputPath %s"
+        + " --sparkMaster %s --dryrun %s", partitionPath, repairedOutputPath, "local", false);
+    CommandResult cr = getShell().executeCommand(cmdStr);
+    assertTrue(cr.isSuccess());
+    assertEquals(RepairsCommand.DEDUPLICATE_RETURN_PREFIX + partitionPath, cr.getResult().toString());
+
+    // after deduplication, there are 200 records under the partition path
+    FileStatus[] fileStatus = fs.listStatus(new Path(duplicatedPartitionPath));
+    files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new);
+    Dataset result = sqlContext.read().parquet(files);
+    assertEquals(200, result.count());
+  }
+}
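
The resource below backs testOverwriteHoodieProperties in TestRepairsCommand above; the test resolves it from the test classpath and hands its filesystem path to the command, roughly along these lines:

    // Outline (mirrors the test above): resolve the classpath resource, then overwrite.
    URL newProps = TestRepairsCommand.class.getClassLoader().getResource("table-config.properties");
    String cmd = "repair overwrite-hoodie-props --new-props-file " + newProps.getPath();
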
diff --git a/hudi-cli/src/test/resources/table-config.properties b/hudi-cli/src/test/resources/table-config.properties
new file mode 100644
index 000000000..d74c0444a
--- /dev/null
+++ b/hudi-cli/src/test/resources/table-config.properties
@@ -0,0 +1,21 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+hoodie.table.name=test_table
+hoodie.table.type=COPY_ON_WRITE
+hoodie.archivelog.folder=archive
+hoodie.timeline.layout.version=1