[HUDI-3451] Delete metadata table when the write client disables MDT (#5186)
* Add checks for metadata table init to avoid possible out-of-sync
* Revise the logic to reuse existing table config
* Revise docs and naming

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
@@ -122,6 +122,8 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
       } catch (IOException e) {
         throw new HoodieMetadataException("Checking existence of metadata table failed", e);
       }
+    } else {
+      maybeDeleteMetadataTable();
     }
 
     return Option.empty();
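The new else branch covers writers that have the metadata table (MDT) disabled: rather than leaving a stale MDT on storage to drift out of sync with the data table, the write path now deletes it. maybeDeleteMetadataTable() itself is not part of this hunk; the following is a minimal sketch only of such a check-and-delete, assuming HoodieTable's existing config and metaClient fields (HoodieTableMetadata.getMetadataTableBasePath and HoodieMetadataException are the same APIs used elsewhere in this commit). Note the committed change also clears the metadata partitions recorded in the table config, which the test below verifies via getMetadataPartitions().

    // Sketch only, not the committed implementation: delete the MDT folder
    // when the writer has metadata disabled but the folder still exists.
    private void maybeDeleteMetadataTable() {
      Path metadataTablePath =
          new Path(HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath()));
      try {
        if (metaClient.getFs().exists(metadataTablePath)) {
          // Recursive delete: the MDT is itself a Hudi table with its own .hoodie directory.
          metaClient.getFs().delete(metadataTablePath, true);
        }
      } catch (IOException e) {
        throw new HoodieMetadataException("Failed to delete the metadata table", e);
      }
    }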
@@ -18,15 +18,6 @@
 
 package org.apache.hudi.client.functional;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.util.Time;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
@@ -76,6 +67,7 @@ import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -100,6 +92,16 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.Time;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroSchemaConverter;
@@ -114,6 +116,7 @@ import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -198,6 +201,69 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     validateMetadata(testTable, true);
   }
 
+  @Test
+  public void testTurnOffMetadataTableAfterEnable() throws Exception {
+    init(COPY_ON_WRITE, true);
+    String instant1 = "0000001";
+    HoodieCommitMetadata hoodieCommitMetadata = doWriteOperationWithMeta(testTable, instant1, INSERT);
+
+    // Simulate the complete data directory including ".hoodie_partition_metadata" file
+    File metaForP1 = new File(metaClient.getBasePath() + "/p1", ".hoodie_partition_metadata");
+    File metaForP2 = new File(metaClient.getBasePath() + "/p2", ".hoodie_partition_metadata");
+    metaForP1.createNewFile();
+    metaForP2.createNewFile();
+
+    // Sync to metadata table
+    metaClient.reloadActiveTimeline();
+    HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
+    Option<HoodieTableMetadataWriter> metadataWriter = table.getMetadataWriter(instant1, Option.of(hoodieCommitMetadata));
+    validateMetadata(testTable, true);
+
+    assertTrue(metadataWriter.isPresent());
+    HoodieTableConfig hoodieTableConfig =
+        new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig.getPayloadClass());
+    assertFalse(hoodieTableConfig.getMetadataPartitions().isEmpty());
+
+    // Turn off metadata table
+    HoodieWriteConfig writeConfig2 = HoodieWriteConfig.newBuilder()
+        .withProperties(this.writeConfig.getProps())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
+    testTable = HoodieTestTable.of(metaClient);
+    String instant2 = "0000002";
+    HoodieCommitMetadata hoodieCommitMetadata2 = doWriteOperationWithMeta(testTable, instant2, INSERT);
+    metaClient.reloadActiveTimeline();
+    HoodieTable table2 = HoodieSparkTable.create(writeConfig2, context, metaClient);
+    Option<HoodieTableMetadataWriter> metadataWriter2 = table2.getMetadataWriter(instant2, Option.of(hoodieCommitMetadata2));
+    assertFalse(metadataWriter2.isPresent());
+
+    HoodieTableConfig hoodieTableConfig2 =
+        new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig2.getPayloadClass());
+    assertEquals(StringUtils.EMPTY_STRING, hoodieTableConfig2.getMetadataPartitions());
+    // Assert metadata table folder is deleted
+    assertFalse(metaClient.getFs().exists(
+        new Path(HoodieTableMetadata.getMetadataTableBasePath(writeConfig2.getBasePath()))));
+
+    // Enable metadata table again and initialize metadata table through
+    // HoodieTable.getMetadataWriter() function
+    HoodieWriteConfig writeConfig3 = HoodieWriteConfig.newBuilder()
+        .withProperties(this.writeConfig.getProps())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
+        .build();
+    testTable = HoodieTestTable.of(metaClient);
+    metaClient.reloadActiveTimeline();
+    String instant3 = "0000003";
+    HoodieCommitMetadata hoodieCommitMetadata3 = doWriteOperationWithMeta(testTable, instant3, INSERT);
+    metaClient.reloadActiveTimeline();
+    HoodieTable table3 = HoodieSparkTable.create(writeConfig3, context, metaClient);
+    Option<HoodieTableMetadataWriter> metadataWriter3 = table3.getMetadataWriter(instant3, Option.of(hoodieCommitMetadata3));
+    validateMetadata(testTable, true);
+    assertTrue(metadataWriter3.isPresent());
+    HoodieTableConfig hoodieTableConfig3 =
+        new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig.getPayloadClass());
+    assertFalse(hoodieTableConfig3.getMetadataPartitions().isEmpty());
+  }
+
   /**
    * Only valid partition directories are added to the metadata.
    */
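For context, the toggle this test exercises maps to the hoodie.metadata.enable write config. Below is a hedged usage sketch, not from this commit, of how a Spark datasource writer would flip the same switch on an existing table; df, basePath, and the table/record-key names are placeholder assumptions:

    // Placeholder names throughout; SaveMode is org.apache.spark.sql.SaveMode.
    df.write().format("hudi")
        .option("hoodie.table.name", "my_table")                   // placeholder
        .option("hoodie.datasource.write.recordkey.field", "uuid") // placeholder
        .option("hoodie.metadata.enable", "false")                 // disable the MDT
        .mode(SaveMode.Append)
        .save(basePath);

On the next commit, a client configured this way takes the else branch shown in the HoodieSparkTable hunk above and deletes the existing metadata table.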
@@ -23,6 +23,7 @@ import org.apache.hudi.client.HoodieTimelineArchiver;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
@@ -176,6 +177,10 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
     testTable.doWriteOperation(commitTime, operationType, emptyList(), asList("p1", "p2"), 3);
   }
 
+  protected HoodieCommitMetadata doWriteOperationWithMeta(HoodieTestTable testTable, String commitTime, WriteOperationType operationType) throws Exception {
+    return testTable.doWriteOperation(commitTime, operationType, emptyList(), asList("p1", "p2"), 3);
+  }
+
   protected void doClean(HoodieTestTable testTable, String commitTime, List<String> commitsToClean) throws IOException {
     doCleanInternal(testTable, commitTime, commitsToClean, false);
   }