[HUDI-3782] Fixing table config when any of the index is disabled (#5222)
This commit is contained in:
@@ -95,6 +95,7 @@ import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
|
|||||||
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
|
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
|
||||||
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
|
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
|
||||||
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
|
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
|
||||||
|
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
|
||||||
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
|
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -377,7 +378,25 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
|||||||
if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
|
if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
|
||||||
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
|
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
|
||||||
}
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if metadata table exists, then check if any of the enabled partition types needs to be initialized
|
||||||
|
Set<String> inflightAndCompletedPartitions = getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
|
||||||
|
List<MetadataPartitionType> partitionsToInit = this.enabledPartitionTypes.stream()
|
||||||
|
.filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
// if there are no partitions to initialize or there is a pending operation, then don't initialize in this round
|
||||||
|
if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
|
||||||
|
initTableMetadata(); // re-init certain flags in BaseTableMetadata
|
||||||
|
initializeEnabledFileGroups(dataMetaClient, createInstantTime, partitionsToInit);
|
||||||
|
initialCommit(createInstantTime, partitionsToInit);
|
||||||
|
updateInitializedPartitionsInTableConfig(partitionsToInit);
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T extends SpecificRecordBase> boolean metadataTableExists(HoodieTableMetaClient dataMetaClient,
|
private <T extends SpecificRecordBase> boolean metadataTableExists(HoodieTableMetaClient dataMetaClient,
|
||||||
@@ -502,26 +521,11 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
|||||||
*/
|
*/
|
||||||
private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient,
|
private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient,
|
||||||
Option<String> inflightInstantTimestamp) throws IOException {
|
Option<String> inflightInstantTimestamp) throws IOException {
|
||||||
ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled");
|
if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
|
||||||
|
|
||||||
// We can only initialize if there are no pending operations on the dataset
|
|
||||||
List<HoodieInstant> pendingDataInstant = dataMetaClient.getActiveTimeline()
|
|
||||||
.getInstants().filter(i -> !i.isCompleted())
|
|
||||||
.filter(i -> !inflightInstantTimestamp.isPresent() || !i.getTimestamp().equals(inflightInstantTimestamp.get()))
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
|
|
||||||
if (!pendingDataInstant.isEmpty()) {
|
|
||||||
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
|
|
||||||
LOG.warn("Cannot initialize metadata table as operation(s) are in progress on the dataset: "
|
|
||||||
+ Arrays.toString(pendingDataInstant.toArray()));
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
|
String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
|
||||||
// Otherwise, we use the timestamp of the latest completed action.
|
|
||||||
String createInstantTime = dataMetaClient.getActiveTimeline().filterCompletedInstants()
|
|
||||||
.getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
|
|
||||||
LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
|
|
||||||
|
|
||||||
initializeMetaClient(dataWriteConfig.getMetadataConfig().populateMetaFields());
|
initializeMetaClient(dataWriteConfig.getMetadataConfig().populateMetaFields());
|
||||||
initTableMetadata();
|
initTableMetadata();
|
||||||
@@ -535,15 +539,38 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
|||||||
enabledPartitionTypes = this.enabledPartitionTypes;
|
enabledPartitionTypes = this.enabledPartitionTypes;
|
||||||
}
|
}
|
||||||
initializeEnabledFileGroups(dataMetaClient, createInstantTime, enabledPartitionTypes);
|
initializeEnabledFileGroups(dataMetaClient, createInstantTime, enabledPartitionTypes);
|
||||||
|
|
||||||
// During cold startup, the list of files to be committed can be huge. So creating a HoodieCommitMetadata out
|
|
||||||
// of these large number of files and calling the existing update(HoodieCommitMetadata) function does not scale
|
|
||||||
// well. Hence, we have a special commit just for the initialization scenario.
|
|
||||||
initialCommit(createInstantTime, enabledPartitionTypes);
|
initialCommit(createInstantTime, enabledPartitionTypes);
|
||||||
updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
|
updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getInitialCommitInstantTime(HoodieTableMetaClient dataMetaClient) {
|
||||||
|
// If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
|
||||||
|
// Otherwise, we use the timestamp of the latest completed action.
|
||||||
|
String createInstantTime = dataMetaClient.getActiveTimeline().filterCompletedInstants()
|
||||||
|
.getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
|
||||||
|
LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
|
||||||
|
return createInstantTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Option<String> inflightInstantTimestamp) {
|
||||||
|
ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled");
|
||||||
|
|
||||||
|
// We can only initialize if there are no pending operations on the dataset
|
||||||
|
List<HoodieInstant> pendingDataInstant = dataMetaClient.getActiveTimeline()
|
||||||
|
.getInstants().filter(i -> !i.isCompleted())
|
||||||
|
.filter(i -> !inflightInstantTimestamp.isPresent() || !i.getTimestamp().equals(inflightInstantTimestamp.get()))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
if (!pendingDataInstant.isEmpty()) {
|
||||||
|
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
|
||||||
|
LOG.warn("Cannot initialize metadata table as operation(s) are in progress on the dataset: "
|
||||||
|
+ Arrays.toString(pendingDataInstant.toArray()));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
private void updateInitializedPartitionsInTableConfig(List<MetadataPartitionType> partitionTypes) {
|
private void updateInitializedPartitionsInTableConfig(List<MetadataPartitionType> partitionTypes) {
|
||||||
Set<String> completedPartitions = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
|
Set<String> completedPartitions = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
|
||||||
completedPartitions.addAll(partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()));
|
completedPartitions.addAll(partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()));
|
||||||
@@ -973,8 +1000,12 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is invoked to initialize metadata table for a dataset. Bootstrap Commit has special handling mechanism due to its scale compared to
|
* This is invoked to initialize metadata table for a dataset.
|
||||||
* other regular commits.
|
* Initial commit has special handling mechanism due to its scale compared to other regular commits.
|
||||||
|
* During cold startup, the list of files to be committed can be huge.
|
||||||
|
* So creating a HoodieCommitMetadata out of these large number of files,
|
||||||
|
* and calling the existing update(HoodieCommitMetadata) function does not scale well.
|
||||||
|
* Hence, we have a special commit just for the initialization scenario.
|
||||||
*/
|
*/
|
||||||
private void initialCommit(String createInstantTime, List<MetadataPartitionType> partitionTypes) {
|
private void initialCommit(String createInstantTime, List<MetadataPartitionType> partitionTypes) {
|
||||||
// List all partitions in the basePath of the containing dataset
|
// List all partitions in the basePath of the containing dataset
|
||||||
@@ -992,18 +1023,17 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
|||||||
}).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
|
}).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
|
||||||
final Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
|
final Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
|
||||||
|
|
||||||
// Record which saves the list of all partitions
|
|
||||||
HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
|
|
||||||
if (partitions.isEmpty()) {
|
|
||||||
// in case of initializing of a fresh table, there won't be any partitions, but we need to make a boostrap commit
|
|
||||||
final HoodieData<HoodieRecord> allPartitionRecordsRDD = engineContext.parallelize(
|
|
||||||
Collections.singletonList(allPartitionRecord), 1);
|
|
||||||
partitionToRecordsMap.put(MetadataPartitionType.FILES, allPartitionRecordsRDD);
|
|
||||||
commit(createInstantTime, partitionToRecordsMap, false);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (partitionTypes.contains(MetadataPartitionType.FILES)) {
|
if (partitionTypes.contains(MetadataPartitionType.FILES)) {
|
||||||
|
// Record which saves the list of all partitions
|
||||||
|
HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
|
||||||
|
if (partitions.isEmpty()) {
|
||||||
|
// in case of initializing of a fresh table, there won't be any partitions, but we need to make a boostrap commit
|
||||||
|
final HoodieData<HoodieRecord> allPartitionRecordsRDD = engineContext.parallelize(
|
||||||
|
Collections.singletonList(allPartitionRecord), 1);
|
||||||
|
partitionToRecordsMap.put(MetadataPartitionType.FILES, allPartitionRecordsRDD);
|
||||||
|
commit(createInstantTime, partitionToRecordsMap, false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord);
|
HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord);
|
||||||
ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1));
|
ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1));
|
||||||
partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
|
partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
|
||||||
|
|||||||
@@ -70,7 +70,6 @@ import org.apache.hudi.exception.HoodieMetadataException;
|
|||||||
import org.apache.hudi.exception.HoodieUpsertException;
|
import org.apache.hudi.exception.HoodieUpsertException;
|
||||||
import org.apache.hudi.index.HoodieIndex;
|
import org.apache.hudi.index.HoodieIndex;
|
||||||
import org.apache.hudi.metadata.HoodieTableMetadata;
|
import org.apache.hudi.metadata.HoodieTableMetadata;
|
||||||
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
|
|
||||||
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
|
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
|
||||||
import org.apache.hudi.metadata.MetadataPartitionType;
|
import org.apache.hudi.metadata.MetadataPartitionType;
|
||||||
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
||||||
@@ -99,6 +98,13 @@ import java.util.function.Function;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
|
||||||
|
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
|
||||||
|
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
|
||||||
|
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
|
||||||
|
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
|
||||||
|
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Abstract implementation of a HoodieTable.
|
* Abstract implementation of a HoodieTable.
|
||||||
*
|
*
|
||||||
@@ -818,14 +824,60 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
|
|||||||
if (shouldExecuteMetadataTableDeletion()) {
|
if (shouldExecuteMetadataTableDeletion()) {
|
||||||
try {
|
try {
|
||||||
LOG.info("Deleting metadata table because it is disabled in writer.");
|
LOG.info("Deleting metadata table because it is disabled in writer.");
|
||||||
HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
|
deleteMetadataTable(config.getBasePath(), context);
|
||||||
clearMetadataTablePartitionsConfig();
|
clearMetadataTablePartitionsConfig(Option.empty(), true);
|
||||||
} catch (HoodieMetadataException e) {
|
} catch (HoodieMetadataException e) {
|
||||||
throw new HoodieException("Failed to delete metadata table.", e);
|
throw new HoodieException("Failed to delete metadata table.", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes the metadata partition if the writer disables any metadata index.
|
||||||
|
*/
|
||||||
|
public void deleteMetadataIndexIfNecessary() {
|
||||||
|
Stream.of(MetadataPartitionType.values()).forEach(partitionType -> {
|
||||||
|
if (shouldDeleteMetadataPartition(partitionType)) {
|
||||||
|
try {
|
||||||
|
LOG.info("Deleting metadata partition because it is disabled in writer: " + partitionType.name());
|
||||||
|
if (metadataPartitionExists(metaClient.getBasePath(), context, partitionType)) {
|
||||||
|
deleteMetadataPartition(metaClient.getBasePath(), context, partitionType);
|
||||||
|
}
|
||||||
|
clearMetadataTablePartitionsConfig(Option.of(partitionType), false);
|
||||||
|
} catch (HoodieMetadataException e) {
|
||||||
|
throw new HoodieException("Failed to delete metadata partition: " + partitionType.name(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean shouldDeleteMetadataPartition(MetadataPartitionType partitionType) {
|
||||||
|
// Only delete metadata table partition when all the following conditions are met:
|
||||||
|
// (1) This is data table.
|
||||||
|
// (2) Index corresponding to this metadata partition is disabled in HoodieWriteConfig.
|
||||||
|
// (3) The completed metadata partitions in table config contains this partition.
|
||||||
|
// NOTE: Inflight metadata partitions are not considered as they could have been inflight due to async indexer.
|
||||||
|
if (HoodieTableMetadata.isMetadataTable(metaClient.getBasePath()) || !config.isMetadataTableEnabled()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
boolean metadataIndexDisabled;
|
||||||
|
switch (partitionType) {
|
||||||
|
// NOTE: FILES partition type is always considered in sync with hoodie.metadata.enable.
|
||||||
|
// It cannot be the case that metadata is enabled but FILES is disabled.
|
||||||
|
case COLUMN_STATS:
|
||||||
|
metadataIndexDisabled = !config.isMetadataColumnStatsIndexEnabled();
|
||||||
|
break;
|
||||||
|
case BLOOM_FILTERS:
|
||||||
|
metadataIndexDisabled = !config.isMetadataBloomFilterIndexEnabled();
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
LOG.debug("Not a valid metadata partition type: " + partitionType.name());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return metadataIndexDisabled
|
||||||
|
&& getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(partitionType.getPartitionPath());
|
||||||
|
}
|
||||||
|
|
||||||
private boolean shouldExecuteMetadataTableDeletion() {
|
private boolean shouldExecuteMetadataTableDeletion() {
|
||||||
// Only execute metadata table deletion when all the following conditions are met
|
// Only execute metadata table deletion when all the following conditions are met
|
||||||
// (1) This is data table
|
// (1) This is data table
|
||||||
@@ -835,17 +887,23 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
|
|||||||
// partitions are ready to use
|
// partitions are ready to use
|
||||||
return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
|
return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
|
||||||
&& !config.isMetadataTableEnabled()
|
&& !config.isMetadataTableEnabled()
|
||||||
&& (!metaClient.getTableConfig().contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS)
|
&& (!metaClient.getTableConfig().contains(TABLE_METADATA_PARTITIONS)
|
||||||
|| !StringUtils.isNullOrEmpty(metaClient.getTableConfig().getMetadataPartitions()));
|
|| !StringUtils.isNullOrEmpty(metaClient.getTableConfig().getMetadataPartitions()));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clears hoodie.table.metadata.partitions in hoodie.properties
|
* Clears hoodie.table.metadata.partitions in hoodie.properties
|
||||||
*/
|
*/
|
||||||
private void clearMetadataTablePartitionsConfig() {
|
private void clearMetadataTablePartitionsConfig(Option<MetadataPartitionType> partitionType, boolean clearAll) {
|
||||||
LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
|
if (clearAll) {
|
||||||
metaClient.getTableConfig().setValue(
|
LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
|
||||||
HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), StringUtils.EMPTY_STRING);
|
metaClient.getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), EMPTY_STRING);
|
||||||
|
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Set<String> completedPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
|
||||||
|
completedPartitions.remove(partitionType.get().getPartitionPath());
|
||||||
|
metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions));
|
||||||
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
|
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -105,6 +105,9 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
|
|||||||
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
|
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
|
||||||
Option<T> actionMetadata) {
|
Option<T> actionMetadata) {
|
||||||
if (config.isMetadataTableEnabled()) {
|
if (config.isMetadataTableEnabled()) {
|
||||||
|
// even with metadata enabled, some index could have been disabled
|
||||||
|
// delete metadata partitions corresponding to such indexes
|
||||||
|
deleteMetadataIndexIfNecessary();
|
||||||
return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
|
return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
|
||||||
context, actionMetadata, Option.of(triggeringInstantTimestamp)));
|
context, actionMetadata, Option.of(triggeringInstantTimestamp)));
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -113,6 +113,9 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
|
|||||||
// existence after the creation is needed.
|
// existence after the creation is needed.
|
||||||
final HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(
|
final HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(
|
||||||
context.getHadoopConf().get(), config, context, actionMetadata, Option.of(triggeringInstantTimestamp));
|
context.getHadoopConf().get(), config, context, actionMetadata, Option.of(triggeringInstantTimestamp));
|
||||||
|
// even with metadata enabled, some index could have been disabled
|
||||||
|
// delete metadata partitions corresponding to such indexes
|
||||||
|
deleteMetadataIndexIfNecessary();
|
||||||
try {
|
try {
|
||||||
if (isMetadataTableExists || metaClient.getFs().exists(new Path(
|
if (isMetadataTableExists || metaClient.getFs().exists(new Path(
|
||||||
HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) {
|
HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) {
|
||||||
|
|||||||
@@ -146,6 +146,10 @@ import static org.apache.hudi.common.model.WriteOperationType.DELETE;
|
|||||||
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
|
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
|
||||||
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
|
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
|
||||||
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
|
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
|
||||||
|
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
|
||||||
|
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
|
||||||
|
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
|
||||||
|
import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
|
||||||
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
|
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
|
||||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
@@ -202,6 +206,119 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
|||||||
validateMetadata(testTable, true);
|
validateMetadata(testTable, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTurnOffMetadataIndexAfterEnable() throws Exception {
|
||||||
|
initPath();
|
||||||
|
HoodieWriteConfig cfg = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
|
||||||
|
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
|
||||||
|
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
|
||||||
|
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
|
||||||
|
.build();
|
||||||
|
init(COPY_ON_WRITE);
|
||||||
|
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
|
||||||
|
// metadata enabled with only FILES partition
|
||||||
|
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, cfg)) {
|
||||||
|
// Insert
|
||||||
|
String commitTime = "0000001";
|
||||||
|
List<HoodieRecord> records = dataGen.generateInserts(commitTime, 20);
|
||||||
|
client.startCommitWithTime(commitTime);
|
||||||
|
List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 1), commitTime).collect();
|
||||||
|
assertNoWriteErrors(writeStatuses);
|
||||||
|
|
||||||
|
// Upsert
|
||||||
|
commitTime = "0000002";
|
||||||
|
client.startCommitWithTime(commitTime);
|
||||||
|
records = dataGen.generateUniqueUpdates(commitTime, 10);
|
||||||
|
writeStatuses = client.upsert(jsc.parallelize(records, 1), commitTime).collect();
|
||||||
|
assertNoWriteErrors(writeStatuses);
|
||||||
|
validateMetadata(client);
|
||||||
|
}
|
||||||
|
// check table config
|
||||||
|
HoodieTableMetaClient.reload(metaClient);
|
||||||
|
HoodieTableConfig tableConfig = metaClient.getTableConfig();
|
||||||
|
assertFalse(tableConfig.getMetadataPartitions().isEmpty());
|
||||||
|
assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath()));
|
||||||
|
assertFalse(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath()));
|
||||||
|
assertFalse(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath()));
|
||||||
|
|
||||||
|
// enable column stats and run 1 upserts
|
||||||
|
HoodieWriteConfig cfgWithColStatsEnabled = HoodieWriteConfig.newBuilder()
|
||||||
|
.withProperties(cfg.getProps())
|
||||||
|
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
|
||||||
|
.withProperties(cfg.getMetadataConfig().getProps())
|
||||||
|
.withMetadataIndexColumnStats(true)
|
||||||
|
.build())
|
||||||
|
.build();
|
||||||
|
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, cfgWithColStatsEnabled)) {
|
||||||
|
// Upsert
|
||||||
|
String commitTime = "0000003";
|
||||||
|
client.startCommitWithTime(commitTime);
|
||||||
|
List<HoodieRecord> records = dataGen.generateUniqueUpdates(commitTime, 10);
|
||||||
|
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), commitTime).collect();
|
||||||
|
assertNoWriteErrors(writeStatuses);
|
||||||
|
validateMetadata(client);
|
||||||
|
}
|
||||||
|
// check table config
|
||||||
|
HoodieTableMetaClient.reload(metaClient);
|
||||||
|
tableConfig = metaClient.getTableConfig();
|
||||||
|
assertFalse(tableConfig.getMetadataPartitions().isEmpty());
|
||||||
|
assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath()));
|
||||||
|
assertTrue(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath()));
|
||||||
|
assertFalse(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath()));
|
||||||
|
|
||||||
|
// disable column stats and run 1 upsert
|
||||||
|
HoodieWriteConfig cfgWithColStatsDisabled = HoodieWriteConfig.newBuilder()
|
||||||
|
.withProperties(cfg.getProps())
|
||||||
|
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
|
||||||
|
.withProperties(cfg.getMetadataConfig().getProps())
|
||||||
|
.withMetadataIndexColumnStats(false)
|
||||||
|
.build())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, cfgWithColStatsDisabled)) {
|
||||||
|
// Upsert
|
||||||
|
String commitTime = "0000004";
|
||||||
|
client.startCommitWithTime(commitTime);
|
||||||
|
List<HoodieRecord> records = dataGen.generateUniqueUpdates(commitTime, 10);
|
||||||
|
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), commitTime).collect();
|
||||||
|
assertNoWriteErrors(writeStatuses);
|
||||||
|
validateMetadata(client);
|
||||||
|
}
|
||||||
|
// check table config
|
||||||
|
HoodieTableMetaClient.reload(metaClient);
|
||||||
|
tableConfig = metaClient.getTableConfig();
|
||||||
|
assertFalse(tableConfig.getMetadataPartitions().isEmpty());
|
||||||
|
assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath()));
|
||||||
|
assertFalse(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath()));
|
||||||
|
assertFalse(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath()));
|
||||||
|
|
||||||
|
// enable bloom filter as well as column stats and run 1 upsert
|
||||||
|
HoodieWriteConfig cfgWithBloomFilterEnabled = HoodieWriteConfig.newBuilder()
|
||||||
|
.withProperties(cfgWithColStatsEnabled.getProps())
|
||||||
|
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
|
||||||
|
.withProperties(cfgWithColStatsEnabled.getMetadataConfig().getProps())
|
||||||
|
.withMetadataIndexBloomFilter(true)
|
||||||
|
.build())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, cfgWithBloomFilterEnabled)) {
|
||||||
|
// Upsert
|
||||||
|
String commitTime = "0000005";
|
||||||
|
client.startCommitWithTime(commitTime);
|
||||||
|
List<HoodieRecord> records = dataGen.generateUniqueUpdates(commitTime, 10);
|
||||||
|
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), commitTime).collect();
|
||||||
|
assertNoWriteErrors(writeStatuses);
|
||||||
|
validateMetadata(client);
|
||||||
|
}
|
||||||
|
// check table config
|
||||||
|
HoodieTableMetaClient.reload(metaClient);
|
||||||
|
tableConfig = metaClient.getTableConfig();
|
||||||
|
assertFalse(tableConfig.getMetadataPartitions().isEmpty());
|
||||||
|
assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath()));
|
||||||
|
assertTrue(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath()));
|
||||||
|
assertTrue(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath()));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTurnOffMetadataTableAfterEnable() throws Exception {
|
public void testTurnOffMetadataTableAfterEnable() throws Exception {
|
||||||
init(COPY_ON_WRITE, true);
|
init(COPY_ON_WRITE, true);
|
||||||
@@ -549,13 +666,13 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
|||||||
// metadata writer to delete column_stats partition
|
// metadata writer to delete column_stats partition
|
||||||
HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
|
HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
|
||||||
assertNotNull(metadataWriter, "MetadataWriter should have been initialized");
|
assertNotNull(metadataWriter, "MetadataWriter should have been initialized");
|
||||||
metadataWriter.deletePartitions("0000003", Arrays.asList(MetadataPartitionType.COLUMN_STATS));
|
metadataWriter.deletePartitions("0000003", Arrays.asList(COLUMN_STATS));
|
||||||
|
|
||||||
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
|
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
|
||||||
List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, metadataMetaClient.getBasePath(), false, false);
|
List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, metadataMetaClient.getBasePath(), false, false);
|
||||||
// partition should be physically deleted
|
// partition should be physically deleted
|
||||||
assertEquals(metadataWriter.getEnabledPartitionTypes().size(), metadataTablePartitions.size());
|
assertEquals(metadataWriter.getEnabledPartitionTypes().size(), metadataTablePartitions.size());
|
||||||
assertFalse(metadataTablePartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath()));
|
assertFalse(metadataTablePartitions.contains(COLUMN_STATS.getPartitionPath()));
|
||||||
|
|
||||||
Option<HoodieInstant> completedReplaceInstant = metadataMetaClient.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant();
|
Option<HoodieInstant> completedReplaceInstant = metadataMetaClient.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant();
|
||||||
assertTrue(completedReplaceInstant.isPresent());
|
assertTrue(completedReplaceInstant.isPresent());
|
||||||
@@ -566,7 +683,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
|||||||
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline());
|
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline());
|
||||||
metadataTablePartitions.forEach(partition -> {
|
metadataTablePartitions.forEach(partition -> {
|
||||||
List<FileSlice> latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList());
|
List<FileSlice> latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList());
|
||||||
if (MetadataPartitionType.COLUMN_STATS.getPartitionPath().equals(partition)) {
|
if (COLUMN_STATS.getPartitionPath().equals(partition)) {
|
||||||
// there should not be any file slice in column_stats partition
|
// there should not be any file slice in column_stats partition
|
||||||
assertTrue(latestSlices.isEmpty());
|
assertTrue(latestSlices.isEmpty());
|
||||||
} else {
|
} else {
|
||||||
@@ -819,7 +936,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
|||||||
// Compaction should not be triggered yet. Let's verify no base file
|
// Compaction should not be triggered yet. Let's verify no base file
|
||||||
// and few log files available.
|
// and few log files available.
|
||||||
List<FileSlice> fileSlices = table.getSliceView()
|
List<FileSlice> fileSlices = table.getSliceView()
|
||||||
.getLatestFileSlices(MetadataPartitionType.FILES.getPartitionPath()).collect(Collectors.toList());
|
.getLatestFileSlices(FILES.getPartitionPath()).collect(Collectors.toList());
|
||||||
if (fileSlices.isEmpty()) {
|
if (fileSlices.isEmpty()) {
|
||||||
throw new IllegalStateException("LogFile slices are not available!");
|
throw new IllegalStateException("LogFile slices are not available!");
|
||||||
}
|
}
|
||||||
@@ -912,7 +1029,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
|||||||
.withBasePath(metadataMetaClient.getBasePath())
|
.withBasePath(metadataMetaClient.getBasePath())
|
||||||
.withLogFilePaths(logFilePaths)
|
.withLogFilePaths(logFilePaths)
|
||||||
.withLatestInstantTime(latestCommitTimestamp)
|
.withLatestInstantTime(latestCommitTimestamp)
|
||||||
.withPartition(MetadataPartitionType.FILES.getPartitionPath())
|
.withPartition(FILES.getPartitionPath())
|
||||||
.withReaderSchema(schema)
|
.withReaderSchema(schema)
|
||||||
.withMaxMemorySizeInBytes(100000L)
|
.withMaxMemorySizeInBytes(100000L)
|
||||||
.withBufferSize(4096)
|
.withBufferSize(4096)
|
||||||
@@ -942,7 +1059,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
|||||||
private void verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(HoodieTable table, boolean enableMetaFields) throws IOException {
|
private void verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(HoodieTable table, boolean enableMetaFields) throws IOException {
|
||||||
table.getHoodieView().sync();
|
table.getHoodieView().sync();
|
||||||
List<FileSlice> fileSlices = table.getSliceView()
|
List<FileSlice> fileSlices = table.getSliceView()
|
||||||
.getLatestFileSlices(MetadataPartitionType.FILES.getPartitionPath()).collect(Collectors.toList());
|
.getLatestFileSlices(FILES.getPartitionPath()).collect(Collectors.toList());
|
||||||
if (!fileSlices.get(0).getBaseFile().isPresent()) {
|
if (!fileSlices.get(0).getBaseFile().isPresent()) {
|
||||||
throw new IllegalStateException("Base file not available!");
|
throw new IllegalStateException("Base file not available!");
|
||||||
}
|
}
|
||||||
@@ -2005,7 +2122,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
|||||||
assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".count"));
|
assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".count"));
|
||||||
assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".totalDuration"));
|
assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".totalDuration"));
|
||||||
assertTrue(metricsRegistry.getAllCounts().get(HoodieMetadataMetrics.INITIALIZE_STR + ".count") >= 1L);
|
assertTrue(metricsRegistry.getAllCounts().get(HoodieMetadataMetrics.INITIALIZE_STR + ".count") >= 1L);
|
||||||
final String prefix = MetadataPartitionType.FILES.getPartitionPath() + ".";
|
final String prefix = FILES.getPartitionPath() + ".";
|
||||||
assertTrue(metricsRegistry.getAllCounts().containsKey(prefix + HoodieMetadataMetrics.STAT_COUNT_BASE_FILES));
|
assertTrue(metricsRegistry.getAllCounts().containsKey(prefix + HoodieMetadataMetrics.STAT_COUNT_BASE_FILES));
|
||||||
assertTrue(metricsRegistry.getAllCounts().containsKey(prefix + HoodieMetadataMetrics.STAT_COUNT_LOG_FILES));
|
assertTrue(metricsRegistry.getAllCounts().containsKey(prefix + HoodieMetadataMetrics.STAT_COUNT_LOG_FILES));
|
||||||
assertTrue(metricsRegistry.getAllCounts().containsKey(prefix + HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE));
|
assertTrue(metricsRegistry.getAllCounts().containsKey(prefix + HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE));
|
||||||
@@ -2218,10 +2335,10 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
|||||||
+ numFileVersions + " per file group, but was " + latestSlices.size());
|
+ numFileVersions + " per file group, but was " + latestSlices.size());
|
||||||
List<HoodieLogFile> logFiles = latestSlices.get(0).getLogFiles().collect(Collectors.toList());
|
List<HoodieLogFile> logFiles = latestSlices.get(0).getLogFiles().collect(Collectors.toList());
|
||||||
try {
|
try {
|
||||||
if (MetadataPartitionType.FILES.getPartitionPath().equals(partition)) {
|
if (FILES.getPartitionPath().equals(partition)) {
|
||||||
verifyMetadataRawRecords(table, logFiles, false);
|
verifyMetadataRawRecords(table, logFiles, false);
|
||||||
}
|
}
|
||||||
if (MetadataPartitionType.COLUMN_STATS.getPartitionPath().equals(partition)) {
|
if (COLUMN_STATS.getPartitionPath().equals(partition)) {
|
||||||
verifyMetadataColumnStatsRecords(logFiles);
|
verifyMetadataColumnStatsRecords(logFiles);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
|||||||
@@ -95,6 +95,10 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
|
|||||||
init(tableType, enableMetadataTable, true, false, false);
|
init(tableType, enableMetadataTable, true, false, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableColumnStats) throws IOException {
|
||||||
|
init(tableType, enableMetadataTable, true, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan, boolean enableMetrics, boolean
|
public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan, boolean enableMetrics, boolean
|
||||||
validateMetadataPayloadStateConsistency) throws IOException {
|
validateMetadataPayloadStateConsistency) throws IOException {
|
||||||
init(tableType, Option.empty(), enableMetadataTable, enableFullScan, enableMetrics,
|
init(tableType, Option.empty(), enableMetadataTable, enableFullScan, enableMetrics,
|
||||||
|
|||||||
@@ -444,6 +444,11 @@ public final class HoodieMetadataConfig extends HoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withProperties(Properties properties) {
|
||||||
|
this.metadataConfig.getProps().putAll(properties);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public HoodieMetadataConfig build() {
|
public HoodieMetadataConfig build() {
|
||||||
metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType));
|
metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType));
|
||||||
metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
|
metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
|
||||||
|
|||||||
Reference in New Issue
Block a user