[HUDI-1292] Created a config to enable/disable syncing of metadata table. (#3427)
* [HUDI-1292] Created a config to enable/disable syncing of metadata table. - Metadata Table should only be synced from a single pipeline to prevent conflicts. - Skip syncing metadata table for clustering and compaction - Renamed useFileListingMetadata Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
@@ -403,7 +403,9 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
|||||||
.isPresent()
|
.isPresent()
|
||||||
? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
|
? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
|
||||||
try {
|
try {
|
||||||
syncTableMetadata();
|
if (writeOperationType != WriteOperationType.CLUSTER && writeOperationType != WriteOperationType.COMPACT) {
|
||||||
|
syncTableMetadata();
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
this.txnManager.endTransaction();
|
this.txnManager.endTransaction();
|
||||||
}
|
}
|
||||||
@@ -436,7 +438,9 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
|||||||
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
|
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
|
||||||
archiveLog.archiveIfRequired(context);
|
archiveLog.archiveIfRequired(context);
|
||||||
autoCleanOnCommit();
|
autoCleanOnCommit();
|
||||||
syncTableMetadata();
|
if (operationType != null && operationType != WriteOperationType.CLUSTER && operationType != WriteOperationType.COMPACT) {
|
||||||
|
syncTableMetadata();
|
||||||
|
}
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
throw new HoodieIOException(ioe.getMessage(), ioe);
|
throw new HoodieIOException(ioe.getMessage(), ioe);
|
||||||
} finally {
|
} finally {
|
||||||
|
|||||||
@@ -1265,8 +1265,8 @@ public class HoodieWriteConfig extends HoodieConfig {
|
|||||||
/**
|
/**
|
||||||
* File listing metadata configs.
|
* File listing metadata configs.
|
||||||
*/
|
*/
|
||||||
public boolean useFileListingMetadata() {
|
public boolean isMetadataTableEnabled() {
|
||||||
return metadataConfig.useFileListingMetadata();
|
return metadataConfig.enabled();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean getFileListingMetadataVerify() {
|
public boolean getFileListingMetadataVerify() {
|
||||||
|
|||||||
@@ -97,7 +97,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
|||||||
this.engineContext = engineContext;
|
this.engineContext = engineContext;
|
||||||
this.hadoopConf = new SerializableConfiguration(hadoopConf);
|
this.hadoopConf = new SerializableConfiguration(hadoopConf);
|
||||||
|
|
||||||
if (writeConfig.useFileListingMetadata()) {
|
if (writeConfig.isMetadataTableEnabled()) {
|
||||||
this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
|
this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
|
||||||
this.metadataWriteConfig = createMetadataWriteConfig(writeConfig);
|
this.metadataWriteConfig = createMetadataWriteConfig(writeConfig);
|
||||||
enabled = true;
|
enabled = true;
|
||||||
@@ -107,7 +107,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
|||||||
ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(), "Compaction is controlled internally for metadata table.");
|
ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(), "Compaction is controlled internally for metadata table.");
|
||||||
// Metadata Table cannot have metadata listing turned on. (infinite loop, much?)
|
// Metadata Table cannot have metadata listing turned on. (infinite loop, much?)
|
||||||
ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(), "Auto commit is required for Metadata Table");
|
ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(), "Auto commit is required for Metadata Table");
|
||||||
ValidationUtils.checkArgument(!this.metadataWriteConfig.useFileListingMetadata(), "File listing cannot be used for Metadata Table");
|
ValidationUtils.checkArgument(!this.metadataWriteConfig.isMetadataTableEnabled(), "File listing cannot be used for Metadata Table");
|
||||||
|
|
||||||
initRegistry();
|
initRegistry();
|
||||||
HoodieTableMetaClient datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(datasetWriteConfig.getBasePath()).build();
|
HoodieTableMetaClient datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(datasetWriteConfig.getBasePath()).build();
|
||||||
|
|||||||
@@ -202,7 +202,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
|
|||||||
|
|
||||||
// If metadata table is enabled, do not archive instants which are more recent that the latest synced
|
// If metadata table is enabled, do not archive instants which are more recent that the latest synced
|
||||||
// instant on the metadata table. This is required for metadata table sync.
|
// instant on the metadata table. This is required for metadata table sync.
|
||||||
if (config.useFileListingMetadata()) {
|
if (config.isMetadataTableEnabled()) {
|
||||||
try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(table.getContext(), config.getMetadataConfig(),
|
try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(table.getContext(), config.getMetadataConfig(),
|
||||||
config.getBasePath(), FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue())) {
|
config.getBasePath(), FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue())) {
|
||||||
Option<String> lastSyncedInstantTime = tableMetadata.getSyncedInstantTime();
|
Option<String> lastSyncedInstantTime = tableMetadata.getSyncedInstantTime();
|
||||||
|
|||||||
@@ -52,7 +52,7 @@ public class JavaInsertOverwriteTableCommitActionExecutor<T extends HoodieRecord
|
|||||||
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<List<WriteStatus>> writeResult) {
|
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<List<WriteStatus>> writeResult) {
|
||||||
Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
|
Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
|
||||||
List<String> partitionPaths = FSUtils.getAllPartitionPaths(context,
|
List<String> partitionPaths = FSUtils.getAllPartitionPaths(context,
|
||||||
table.getMetaClient().getBasePath(), config.useFileListingMetadata(),
|
table.getMetaClient().getBasePath(), config.isMetadataTableEnabled(),
|
||||||
config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
|
config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
|
||||||
|
|
||||||
if (partitionPaths != null && partitionPaths.size() > 0) {
|
if (partitionPaths != null && partitionPaths.size() > 0) {
|
||||||
|
|||||||
@@ -447,6 +447,11 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void syncTableMetadata() {
|
public void syncTableMetadata() {
|
||||||
|
if (!config.getMetadataConfig().enableSync()) {
|
||||||
|
LOG.info("Metadata table sync is disabled in the config.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Open up the metadata table again, for syncing
|
// Open up the metadata table again, for syncing
|
||||||
try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context)) {
|
try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context)) {
|
||||||
LOG.info("Successfully synced to metadata table");
|
LOG.info("Successfully synced to metadata table");
|
||||||
|
|||||||
@@ -194,6 +194,54 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test enable/disable sync via the config.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSyncConfig() throws Exception {
|
||||||
|
init(HoodieTableType.COPY_ON_WRITE);
|
||||||
|
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
|
||||||
|
|
||||||
|
// Create the metadata table
|
||||||
|
String firstCommitTime = HoodieActiveTimeline.createNewInstantTime();
|
||||||
|
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
|
||||||
|
client.startCommitWithTime(firstCommitTime);
|
||||||
|
client.insert(jsc.parallelize(dataGen.generateInserts(firstCommitTime, 2)), firstCommitTime);
|
||||||
|
client.syncTableMetadata();
|
||||||
|
assertTrue(fs.exists(new Path(metadataTableBasePath)));
|
||||||
|
validateMetadata(client);
|
||||||
|
}
|
||||||
|
|
||||||
|
// If sync is disabled, the table will not sync
|
||||||
|
HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
|
||||||
|
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
|
||||||
|
.enable(true).enableMetrics(false).enableSync(false).build()).build();
|
||||||
|
final String metadataTableMetaPath = metadataTableBasePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME;
|
||||||
|
String secondCommitTime = HoodieActiveTimeline.createNewInstantTime();
|
||||||
|
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config, true)) {
|
||||||
|
client.startCommitWithTime(secondCommitTime);
|
||||||
|
client.insert(jsc.parallelize(dataGen.generateInserts(secondCommitTime, 2)), secondCommitTime);
|
||||||
|
client.syncTableMetadata();
|
||||||
|
|
||||||
|
// Metadata Table should not have synced
|
||||||
|
assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(firstCommitTime))));
|
||||||
|
assertFalse(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(secondCommitTime))));
|
||||||
|
}
|
||||||
|
|
||||||
|
// If sync is enabled, the table will sync
|
||||||
|
String thirdCommitTime = HoodieActiveTimeline.createNewInstantTime();
|
||||||
|
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
|
||||||
|
client.startCommitWithTime(thirdCommitTime);
|
||||||
|
client.insert(jsc.parallelize(dataGen.generateInserts(thirdCommitTime, 2)), thirdCommitTime);
|
||||||
|
client.syncTableMetadata();
|
||||||
|
|
||||||
|
// Metadata Table should have synced
|
||||||
|
assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(firstCommitTime))));
|
||||||
|
assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(secondCommitTime))));
|
||||||
|
assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(thirdCommitTime))));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Only valid partition directories are added to the metadata.
|
* Only valid partition directories are added to the metadata.
|
||||||
*/
|
*/
|
||||||
@@ -932,7 +980,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
|
|||||||
|
|
||||||
HoodieTableMetadata tableMetadata = metadata(client);
|
HoodieTableMetadata tableMetadata = metadata(client);
|
||||||
assertNotNull(tableMetadata, "MetadataReader should have been initialized");
|
assertNotNull(tableMetadata, "MetadataReader should have been initialized");
|
||||||
if (!config.useFileListingMetadata()) {
|
if (!config.isMetadataTableEnabled()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1033,7 +1081,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
|
|||||||
|
|
||||||
// Validate write config for metadata table
|
// Validate write config for metadata table
|
||||||
HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig();
|
HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig();
|
||||||
assertFalse(metadataWriteConfig.useFileListingMetadata(), "No metadata table for metadata table");
|
assertFalse(metadataWriteConfig.isMetadataTableEnabled(), "No metadata table for metadata table");
|
||||||
assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify for metadata table");
|
assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify for metadata table");
|
||||||
|
|
||||||
// Metadata table should be in sync with the dataset
|
// Metadata table should be in sync with the dataset
|
||||||
|
|||||||
@@ -44,6 +44,13 @@ public final class HoodieMetadataConfig extends HoodieConfig {
|
|||||||
.sinceVersion("0.7.0")
|
.sinceVersion("0.7.0")
|
||||||
.withDocumentation("Enable the internal metadata table which serves table metadata like level file listings");
|
.withDocumentation("Enable the internal metadata table which serves table metadata like level file listings");
|
||||||
|
|
||||||
|
// Enable syncing the Metadata Table
|
||||||
|
public static final ConfigProperty<Boolean> METADATA_SYNC_ENABLE_PROP = ConfigProperty
|
||||||
|
.key(METADATA_PREFIX + ".sync.enable")
|
||||||
|
.defaultValue(true)
|
||||||
|
.sinceVersion("0.9.0")
|
||||||
|
.withDocumentation("Enable syncing of metadata table from actions on the dataset");
|
||||||
|
|
||||||
// Validate contents of Metadata Table on each access against the actual filesystem
|
// Validate contents of Metadata Table on each access against the actual filesystem
|
||||||
public static final ConfigProperty<Boolean> METADATA_VALIDATE_PROP = ConfigProperty
|
public static final ConfigProperty<Boolean> METADATA_VALIDATE_PROP = ConfigProperty
|
||||||
.key(METADATA_PREFIX + ".validate")
|
.key(METADATA_PREFIX + ".validate")
|
||||||
@@ -137,10 +144,14 @@ public final class HoodieMetadataConfig extends HoodieConfig {
|
|||||||
return getBoolean(HoodieMetadataConfig.HOODIE_ASSUME_DATE_PARTITIONING_PROP);
|
return getBoolean(HoodieMetadataConfig.HOODIE_ASSUME_DATE_PARTITIONING_PROP);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean useFileListingMetadata() {
|
public boolean enabled() {
|
||||||
return getBoolean(METADATA_ENABLE_PROP);
|
return getBoolean(METADATA_ENABLE_PROP);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean enableSync() {
|
||||||
|
return enabled() && getBoolean(HoodieMetadataConfig.METADATA_SYNC_ENABLE_PROP);
|
||||||
|
}
|
||||||
|
|
||||||
public boolean validateFileListingMetadata() {
|
public boolean validateFileListingMetadata() {
|
||||||
return getBoolean(METADATA_VALIDATE_PROP);
|
return getBoolean(METADATA_VALIDATE_PROP);
|
||||||
}
|
}
|
||||||
@@ -174,6 +185,11 @@ public final class HoodieMetadataConfig extends HoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder enableSync(boolean enable) {
|
||||||
|
metadataConfig.setValue(METADATA_SYNC_ENABLE_PROP, String.valueOf(enable));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public Builder enableMetrics(boolean enableMetrics) {
|
public Builder enableMetrics(boolean enableMetrics) {
|
||||||
metadataConfig.setValue(METADATA_METRICS_ENABLE_PROP, String.valueOf(enableMetrics));
|
metadataConfig.setValue(METADATA_METRICS_ENABLE_PROP, String.valueOf(enableMetrics));
|
||||||
return this;
|
return this;
|
||||||
|
|||||||
@@ -160,7 +160,7 @@ public class FileSystemViewManager {
|
|||||||
HoodieTableMetaClient metaClient, SerializableSupplier<HoodieTableMetadata> metadataSupplier) {
|
HoodieTableMetaClient metaClient, SerializableSupplier<HoodieTableMetadata> metadataSupplier) {
|
||||||
LOG.info("Creating InMemory based view for basePath " + metaClient.getBasePath());
|
LOG.info("Creating InMemory based view for basePath " + metaClient.getBasePath());
|
||||||
HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
|
HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
|
||||||
if (metadataConfig.useFileListingMetadata()) {
|
if (metadataConfig.enabled()) {
|
||||||
ValidationUtils.checkArgument(metadataSupplier != null, "Metadata supplier is null. Cannot instantiate metadata file system view");
|
ValidationUtils.checkArgument(metadataSupplier != null, "Metadata supplier is null. Cannot instantiate metadata file system view");
|
||||||
return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
|
return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
|
||||||
metadataSupplier.get());
|
metadataSupplier.get());
|
||||||
@@ -181,7 +181,7 @@ public class FileSystemViewManager {
|
|||||||
HoodieMetadataConfig metadataConfig,
|
HoodieMetadataConfig metadataConfig,
|
||||||
HoodieTimeline timeline) {
|
HoodieTimeline timeline) {
|
||||||
LOG.info("Creating InMemory based view for basePath " + metaClient.getBasePath());
|
LOG.info("Creating InMemory based view for basePath " + metaClient.getBasePath());
|
||||||
if (metadataConfig.useFileListingMetadata()) {
|
if (metadataConfig.enabled()) {
|
||||||
return new HoodieMetadataFileSystemView(engineContext, metaClient, timeline, metadataConfig);
|
return new HoodieMetadataFileSystemView(engineContext, metaClient, timeline, metadataConfig);
|
||||||
}
|
}
|
||||||
return new HoodieTableFileSystemView(metaClient, timeline);
|
return new HoodieTableFileSystemView(metaClient, timeline);
|
||||||
|
|||||||
@@ -78,7 +78,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
|
|||||||
this.spillableMapDirectory = spillableMapDirectory;
|
this.spillableMapDirectory = spillableMapDirectory;
|
||||||
this.metadataConfig = metadataConfig;
|
this.metadataConfig = metadataConfig;
|
||||||
|
|
||||||
this.enabled = metadataConfig.useFileListingMetadata();
|
this.enabled = metadataConfig.enabled();
|
||||||
if (metadataConfig.enableMetrics()) {
|
if (metadataConfig.enableMetrics()) {
|
||||||
this.metrics = Option.of(new HoodieMetadataMetrics(Registry.getRegistry("HoodieMetadata")));
|
this.metrics = Option.of(new HoodieMetadataMetrics(Registry.getRegistry("HoodieMetadata")));
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -78,7 +78,7 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
|
|||||||
|
|
||||||
static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath,
|
static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath,
|
||||||
String spillableMapPath, boolean reuse) {
|
String spillableMapPath, boolean reuse) {
|
||||||
if (metadataConfig.useFileListingMetadata()) {
|
if (metadataConfig.enabled()) {
|
||||||
return new HoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath, reuse);
|
return new HoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath, reuse);
|
||||||
} else {
|
} else {
|
||||||
return new FileSystemBackedTableMetadata(engineContext, new SerializableConfiguration(engineContext.getHadoopConf()),
|
return new FileSystemBackedTableMetadata(engineContext, new SerializableConfiguration(engineContext.getHadoopConf()),
|
||||||
|
|||||||
Reference in New Issue
Block a user