[HUDI-716] Exception: Not an Avro data file when running HoodieCleanClient.runClean (#1432)
This commit is contained in:
@@ -26,8 +26,11 @@ import org.apache.hudi.common.fs.FSUtils;
|
|||||||
import org.apache.hudi.common.model.HoodiePartitionMetadata;
|
import org.apache.hudi.common.model.HoodiePartitionMetadata;
|
||||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||||
|
import org.apache.hudi.common.util.CleanerUtils;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.spark.launcher.SparkLauncher;
|
import org.apache.spark.launcher.SparkLauncher;
|
||||||
import org.springframework.shell.core.CommandMarker;
|
import org.springframework.shell.core.CommandMarker;
|
||||||
import org.springframework.shell.core.annotation.CliCommand;
|
import org.springframework.shell.core.annotation.CliCommand;
|
||||||
@@ -51,6 +54,8 @@ import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME
|
|||||||
@Component
|
@Component
|
||||||
public class RepairsCommand implements CommandMarker {
|
public class RepairsCommand implements CommandMarker {
|
||||||
|
|
||||||
|
private static final Logger LOG = Logger.getLogger(RepairsCommand.class);
|
||||||
|
|
||||||
@CliCommand(value = "repair deduplicate",
|
@CliCommand(value = "repair deduplicate",
|
||||||
help = "De-duplicate a partition path contains duplicates & produce repaired files to replace with")
|
help = "De-duplicate a partition path contains duplicates & produce repaired files to replace with")
|
||||||
public String deduplicate(
|
public String deduplicate(
|
||||||
@@ -137,4 +142,20 @@ public class RepairsCommand implements CommandMarker {
|
|||||||
}
|
}
|
||||||
return HoodiePrintHelper.print(new String[] {"Property", "Old Value", "New Value"}, rows);
|
return HoodiePrintHelper.print(new String[] {"Property", "Old Value", "New Value"}, rows);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@CliCommand(value = "repair corrupted clean files", help = "repair corrupted clean files")
|
||||||
|
public void removeCorruptedPendingCleanAction() {
|
||||||
|
|
||||||
|
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
|
||||||
|
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
|
||||||
|
|
||||||
|
activeTimeline.filterInflightsAndRequested().getInstants().forEach(instant -> {
|
||||||
|
try {
|
||||||
|
CleanerUtils.getCleanerPlan(client, instant);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("try to remove corrupted instant file: " + instant);
|
||||||
|
FSUtils.deleteInstantFile(client.getFs(), client.getMetaPath(), instant);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -85,7 +85,11 @@ public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHo
|
|||||||
// If there are inflight(failed) or previously requested clean operation, first perform them
|
// If there are inflight(failed) or previously requested clean operation, first perform them
|
||||||
table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> {
|
table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> {
|
||||||
LOG.info("There were previously unfinished cleaner operations. Finishing Instant=" + hoodieInstant);
|
LOG.info("There were previously unfinished cleaner operations. Finishing Instant=" + hoodieInstant);
|
||||||
|
try {
|
||||||
runClean(table, hoodieInstant);
|
runClean(table, hoodieInstant);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
Option<HoodieCleanerPlan> cleanerPlanOpt = scheduleClean(startCleanTime);
|
Option<HoodieCleanerPlan> cleanerPlanOpt = scheduleClean(startCleanTime);
|
||||||
|
|||||||
@@ -984,6 +984,25 @@ public class TestCleaner extends TestHoodieClientBase {
|
|||||||
testPendingCompactions(config, 36, 9, retryFailure);
|
testPendingCompactions(config, 36, 9, retryFailure);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test clean previous corrupted cleanFiles.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCleanPreviousCorruptedCleanFiles() {
|
||||||
|
HoodieWriteConfig config =
|
||||||
|
HoodieWriteConfig.newBuilder()
|
||||||
|
.withPath(basePath).withAssumeDatePartitioning(true)
|
||||||
|
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||||
|
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
HoodieTestUtils.createCorruptedPendingCleanFiles(metaClient, getNextInstant());
|
||||||
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||||
|
|
||||||
|
List<HoodieCleanStat> cleanStats = runCleaner(config);
|
||||||
|
assertEquals("Must not clean any files", 0, cleanStats.size());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Common test method for validating pending compactions.
|
* Common test method for validating pending compactions.
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -487,6 +487,15 @@ public class FSUtils {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void deleteInstantFile(FileSystem fs, String metaPath, HoodieInstant instant) {
|
||||||
|
try {
|
||||||
|
LOG.warn("try to delete instant file: " + instant);
|
||||||
|
fs.delete(new Path(metaPath, instant.getFileName()), false);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieIOException("Could not delete instant file" + instant.getFileName(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static void deleteOlderRestoreMetaFiles(FileSystem fs, String metaPath, Stream<HoodieInstant> instants) {
|
public static void deleteOlderRestoreMetaFiles(FileSystem fs, String metaPath, Stream<HoodieInstant> instants) {
|
||||||
// TODO - this should be archived when archival is made general for all meta-data
|
// TODO - this should be archived when archival is made general for all meta-data
|
||||||
// skip MIN_ROLLBACK_TO_KEEP and delete rest
|
// skip MIN_ROLLBACK_TO_KEEP and delete rest
|
||||||
|
|||||||
@@ -179,6 +179,30 @@ public class HoodieTestUtils {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void createCorruptedPendingCleanFiles(HoodieTableMetaClient metaClient, String commitTime) {
|
||||||
|
Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(commitTime),
|
||||||
|
HoodieTimeline.makeInflightCleanerFileName(commitTime)).forEach(f -> {
|
||||||
|
FSDataOutputStream os = null;
|
||||||
|
try {
|
||||||
|
Path commitFile = new Path(
|
||||||
|
metaClient.getBasePath() + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f);
|
||||||
|
os = metaClient.getFs().create(commitFile, true);
|
||||||
|
// Write empty clean metadata
|
||||||
|
os.write(new byte[0]);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
throw new HoodieIOException(ioe.getMessage(), ioe);
|
||||||
|
} finally {
|
||||||
|
if (null != os) {
|
||||||
|
try {
|
||||||
|
os.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieIOException(e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
public static String createNewDataFile(String basePath, String partitionPath, String instantTime)
|
public static String createNewDataFile(String basePath, String partitionPath, String instantTime)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
String fileID = UUID.randomUUID().toString();
|
String fileID = UUID.randomUUID().toString();
|
||||||
|
|||||||
Reference in New Issue
Block a user