diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java index 0af9ff2e9..7b859c203 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java @@ -27,9 +27,11 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CleanerUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.avro.AvroRuntimeException; import org.apache.hadoop.fs.Path; import org.apache.hudi.common.util.StringUtils; import org.apache.log4j.Logger; @@ -171,14 +173,21 @@ public class RepairsCommand implements CommandMarker { public void removeCorruptedPendingCleanAction() { HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); - HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); - - activeTimeline.filterInflightsAndRequested().getInstants().forEach(instant -> { + HoodieTimeline cleanerTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline().getCleanerTimeline(); + LOG.info("Inspecting pending clean metadata in timeline for corrupted files"); + cleanerTimeline.filterInflightsAndRequested().getInstants().forEach(instant -> { try { CleanerUtils.getCleanerPlan(client, instant); - } catch (IOException e) { - LOG.warn("try to remove corrupted instant file: " + instant); + } catch (AvroRuntimeException e) { + LOG.warn("Corruption found. Trying to remove corrupted clean instant file: " + instant); FSUtils.deleteInstantFile(client.getFs(), client.getMetaPath(), instant); + } catch (IOException ioe) { + if (ioe.getMessage().contains("Not an Avro data file")) { + LOG.warn("Corruption found. Trying to remove corrupted clean instant file: " + instant); + FSUtils.deleteInstantFile(client.getFs(), client.getMetaPath(), instant); + } else { + throw new HoodieIOException(ioe.getMessage(), ioe); + } } }); } diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java index 9fd44b424..452e24969 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java @@ -188,8 +188,8 @@ public class TestRepairsCommand extends AbstractShellIntegrationTest { // Create four requested files for (int i = 100; i < 104; i++) { String timestamp = String.valueOf(i); - // Write corrupted requested Compaction - HoodieTestCommitMetadataGenerator.createCompactionRequestedFile(tablePath, timestamp, conf); + // Write corrupted requested Clean File + HoodieTestCommitMetadataGenerator.createEmptyCleanRequestedFile(tablePath, timestamp, conf); } // reload meta client diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java index 65b567c77..9a55ade08 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java @@ -315,12 +315,23 @@ public class HoodieTestDataGenerator { }); } + public static void createEmptyCleanRequestedFile(String basePath, String instantTime, Configuration configuration) + throws IOException { + Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + HoodieTimeline.makeRequestedCleanerFileName(instantTime)); + createEmptyFile(basePath, commitFile, configuration); + } + public static void createCompactionRequestedFile(String basePath, String instantTime, Configuration configuration) throws IOException { Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeRequestedCompactionFileName(instantTime)); + createEmptyFile(basePath, commitFile, configuration); + } + + private static void createEmptyFile(String basePath, Path filePath, Configuration configuration) throws IOException { FileSystem fs = FSUtils.getFs(basePath, configuration); - FSDataOutputStream os = fs.create(commitFile, true); + FSDataOutputStream os = fs.create(filePath, true); os.close(); }