1
0

[HUDI-2591] Bootstrap metadata table only if upgrade / downgrade is not required. (#3836)

This commit is contained in:
Prashant Wason
2021-11-09 07:26:20 -08:00
committed by GitHub
parent e057a10499
commit 2f95967dfe
6 changed files with 100 additions and 64 deletions

View File

@@ -84,22 +84,32 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
@Deprecated
public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
super(context, writeConfig);
this(context, writeConfig, Option.empty());
}
@Deprecated
public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService);
this(context, writeConfig, timelineService);
}
public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService);
initializeMetadataTable(Option.empty());
}
private void initializeMetadataTable(Option<String> inflightInstantTimestamp) {
if (config.isMetadataTableEnabled()) {
// If the metadata table does not exist, it should be bootstrapped here
// TODO: Check if we can remove this requirement - auto bootstrap on commit
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context);
// Defer bootstrap if upgrade / downgrade is pending
HoodieTableMetaClient metaClient = createMetaClient(true);
UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade(
metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance());
if (!upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
// TODO: Check if we can remove this requirement - auto bootstrap on commit
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context, Option.empty(),
inflightInstantTimestamp);
}
}
}
@@ -213,7 +223,6 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
}
/**
* Removes all existing records of the Hoodie table and inserts the given HoodieRecords, into the table.
@@ -369,7 +378,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
String clusteringCommitTime) {
List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
e.getValue().stream()).collect(Collectors.toList());
@@ -440,6 +449,9 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
upgradeDowngrade.run(HoodieTableVersion.current(), instantTime);
}
metaClient.reloadActiveTimeline();
// re-bootstrap metadata table if required
initializeMetadataTable(Option.of(instantTime));
}
metaClient.validateTableProperties(config.getProps(), operationType);
return getTableAndInitCtx(metaClient, operationType, instantTime);

View File

@@ -48,23 +48,41 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
private static final Logger LOG = LogManager.getLogger(SparkHoodieBackedTableMetadataWriter.class);
public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig,
HoodieEngineContext context) {
return create(conf, writeConfig, context, Option.empty());
}
/**
* Return a Spark based implementation of {@code HoodieTableMetadataWriter} which can be used to
* write to the metadata table.
*
* If the metadata table does not exist, an attempt is made to bootstrap it, but there is no guarantee that the
* table will end up being bootstrapped at this time.
*
* @param conf
* @param writeConfig
* @param context
* @param actionMetadata
* @param inflightInstantTimestamp Timestamp of an instant which is in-progress. This instant is ignored while
* attempting to bootstrap the table.
* @return An instance of the {@code HoodieTableMetadataWriter}
*/
public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Configuration conf,
HoodieWriteConfig writeConfig,
HoodieEngineContext context,
Option<T> actionMetadata) {
return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata);
Option<T> actionMetadata,
Option<String> inflightInstantTimestamp) {
return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata,
inflightInstantTimestamp);
}
public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig,
HoodieEngineContext context) {
return create(conf, writeConfig, context, Option.empty(), Option.empty());
}
<T extends SpecificRecordBase> SparkHoodieBackedTableMetadataWriter(Configuration hadoopConf,
HoodieWriteConfig writeConfig,
HoodieEngineContext engineContext,
Option<T> actionMetadata) {
super(hadoopConf, writeConfig, engineContext, actionMetadata);
Option<T> actionMetadata,
Option<String> inflightInstantTimestamp) {
super(hadoopConf, writeConfig, engineContext, actionMetadata, inflightInstantTimestamp);
}
@Override
@@ -84,7 +102,8 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
@Override
protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext,
Option<T> actionMetadata) {
Option<T> actionMetadata,
Option<String> inflightInstantTimestamp) {
try {
metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> {
if (registry instanceof DistributedRegistry) {
@@ -94,7 +113,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
});
if (enabled) {
bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata);
bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata, inflightInstantTimestamp);
}
} catch (IOException e) {
LOG.error("Failed to initialize metadata table. Disabling the writer.", e);

View File

@@ -128,7 +128,7 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
}
if (isMetadataTableAvailable) {
return Option.of(SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context,
actionMetadata));
actionMetadata, Option.empty()));
} else {
return Option.empty();
}

View File

@@ -968,25 +968,19 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
// set hoodie.table.version to 2 in hoodie.properties file
changeTableVersion(HoodieTableVersion.TWO);
// With next commit the table should be deleted (as part of upgrade)
// With next commit the table should be deleted (as part of upgrade) and then re-bootstrapped automatically
commitTimestamp = HoodieActiveTimeline.createNewInstantTime();
metaClient.reloadActiveTimeline();
FileStatus prevStatus = fs.getFileStatus(new Path(metadataTableBasePath));
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
records = dataGen.generateInserts(commitTimestamp, 5);
client.startCommitWithTime(commitTimestamp);
writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), commitTimestamp).collect();
assertNoWriteErrors(writeStatuses);
}
assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist");
// With next commit the table should be re-bootstrapped (currently in the constructor. To be changed)
commitTimestamp = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
records = dataGen.generateInserts(commitTimestamp, 5);
client.startCommitWithTime(commitTimestamp);
writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), commitTimestamp).collect();
assertNoWriteErrors(writeStatuses);
}
assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist");
FileStatus currentStatus = fs.getFileStatus(new Path(metadataTableBasePath));
assertTrue(currentStatus.getModificationTime() > prevStatus.getModificationTime());
initMetaClient();
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.THREE.versionCode());
@@ -1060,7 +1054,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
.withProperties(properties)
.build();
// With next commit the table should be deleted (as part of upgrade) and partial commit should be rolled back.
// With next commit the table should be re-bootstrapped and partial commit should be rolled back.
metaClient.reloadActiveTimeline();
commitTimestamp = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
@@ -1069,17 +1063,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp);
assertNoWriteErrors(writeStatuses.collect());
}
assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist");
// With next commit the table should be re-bootstrapped (currently in the constructor. To be changed)
commitTimestamp = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
records = dataGen.generateInserts(commitTimestamp, 5);
client.startCommitWithTime(commitTimestamp);
writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp);
assertNoWriteErrors(writeStatuses.collect());
}
assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist");
initMetaClient();
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.THREE.versionCode());