[HUDI-820] cleaner repair command should only inspect clean metadata files (#1542)
This commit is contained in:
committed by
GitHub
parent
f92b9fdcc4
commit
8d0e23173b
@@ -27,9 +27,11 @@ import org.apache.hudi.common.fs.FSUtils;
|
|||||||
import org.apache.hudi.common.model.HoodiePartitionMetadata;
|
import org.apache.hudi.common.model.HoodiePartitionMetadata;
|
||||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
import org.apache.hudi.common.util.CleanerUtils;
|
import org.apache.hudi.common.util.CleanerUtils;
|
||||||
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
|
|
||||||
|
import org.apache.avro.AvroRuntimeException;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hudi.common.util.StringUtils;
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
@@ -171,14 +173,21 @@ public class RepairsCommand implements CommandMarker {
|
|||||||
public void removeCorruptedPendingCleanAction() {
|
public void removeCorruptedPendingCleanAction() {
|
||||||
|
|
||||||
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
|
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
|
||||||
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
|
HoodieTimeline cleanerTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline().getCleanerTimeline();
|
||||||
|
LOG.info("Inspecting pending clean metadata in timeline for corrupted files");
|
||||||
activeTimeline.filterInflightsAndRequested().getInstants().forEach(instant -> {
|
cleanerTimeline.filterInflightsAndRequested().getInstants().forEach(instant -> {
|
||||||
try {
|
try {
|
||||||
CleanerUtils.getCleanerPlan(client, instant);
|
CleanerUtils.getCleanerPlan(client, instant);
|
||||||
} catch (IOException e) {
|
} catch (AvroRuntimeException e) {
|
||||||
LOG.warn("try to remove corrupted instant file: " + instant);
|
LOG.warn("Corruption found. Trying to remove corrupted clean instant file: " + instant);
|
||||||
FSUtils.deleteInstantFile(client.getFs(), client.getMetaPath(), instant);
|
FSUtils.deleteInstantFile(client.getFs(), client.getMetaPath(), instant);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
if (ioe.getMessage().contains("Not an Avro data file")) {
|
||||||
|
LOG.warn("Corruption found. Trying to remove corrupted clean instant file: " + instant);
|
||||||
|
FSUtils.deleteInstantFile(client.getFs(), client.getMetaPath(), instant);
|
||||||
|
} else {
|
||||||
|
throw new HoodieIOException(ioe.getMessage(), ioe);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -188,8 +188,8 @@ public class TestRepairsCommand extends AbstractShellIntegrationTest {
|
|||||||
// Create four requested files
|
// Create four requested files
|
||||||
for (int i = 100; i < 104; i++) {
|
for (int i = 100; i < 104; i++) {
|
||||||
String timestamp = String.valueOf(i);
|
String timestamp = String.valueOf(i);
|
||||||
// Write corrupted requested Compaction
|
// Write corrupted requested Clean File
|
||||||
HoodieTestCommitMetadataGenerator.createCompactionRequestedFile(tablePath, timestamp, conf);
|
HoodieTestCommitMetadataGenerator.createEmptyCleanRequestedFile(tablePath, timestamp, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
// reload meta client
|
// reload meta client
|
||||||
|
|||||||
@@ -315,12 +315,23 @@ public class HoodieTestDataGenerator {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void createEmptyCleanRequestedFile(String basePath, String instantTime, Configuration configuration)
|
||||||
|
throws IOException {
|
||||||
|
Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
|
||||||
|
+ HoodieTimeline.makeRequestedCleanerFileName(instantTime));
|
||||||
|
createEmptyFile(basePath, commitFile, configuration);
|
||||||
|
}
|
||||||
|
|
||||||
public static void createCompactionRequestedFile(String basePath, String instantTime, Configuration configuration)
|
public static void createCompactionRequestedFile(String basePath, String instantTime, Configuration configuration)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
|
Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
|
||||||
+ HoodieTimeline.makeRequestedCompactionFileName(instantTime));
|
+ HoodieTimeline.makeRequestedCompactionFileName(instantTime));
|
||||||
|
createEmptyFile(basePath, commitFile, configuration);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void createEmptyFile(String basePath, Path filePath, Configuration configuration) throws IOException {
|
||||||
FileSystem fs = FSUtils.getFs(basePath, configuration);
|
FileSystem fs = FSUtils.getFs(basePath, configuration);
|
||||||
FSDataOutputStream os = fs.create(commitFile, true);
|
FSDataOutputStream os = fs.create(filePath, true);
|
||||||
os.close();
|
os.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user