[HUDI-2468] Metadata table support for rolling back the first commit (#3843)
- Fix is to make Metadata table writer creation aware of the currently inflight action so that it can make some informed decision about whether bootstrapping is needed for the table and whether any pending action on the data timeline can be ignored.
This commit is contained in:
committed by
GitHub
parent
5ed35bff83
commit
c9d641cc30
@@ -18,7 +18,9 @@
|
|||||||
|
|
||||||
package org.apache.hudi.metadata;
|
package org.apache.hudi.metadata;
|
||||||
|
|
||||||
|
import org.apache.avro.specific.SpecificRecordBase;
|
||||||
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
||||||
|
import org.apache.hudi.avro.model.HoodieInstantInfo;
|
||||||
import org.apache.hudi.avro.model.HoodieMetadataRecord;
|
import org.apache.hudi.avro.model.HoodieMetadataRecord;
|
||||||
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
||||||
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
||||||
@@ -67,6 +69,7 @@ import org.apache.log4j.Logger;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -98,8 +101,19 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
|||||||
protected SerializableConfiguration hadoopConf;
|
protected SerializableConfiguration hadoopConf;
|
||||||
protected final transient HoodieEngineContext engineContext;
|
protected final transient HoodieEngineContext engineContext;
|
||||||
|
|
||||||
protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig,
|
/**
|
||||||
HoodieEngineContext engineContext) {
|
* Hudi backed table metadata writer.
|
||||||
|
*
|
||||||
|
* @param hadoopConf - Hadoop configuration to use for the metadata writer
|
||||||
|
* @param writeConfig - Writer config
|
||||||
|
* @param engineContext - Engine context
|
||||||
|
* @param actionMetadata - Optional action metadata to help decide bootstrap operations
|
||||||
|
* @param <T> - Action metadata types extending Avro generated SpecificRecordBase
|
||||||
|
*/
|
||||||
|
protected <T extends SpecificRecordBase> HoodieBackedTableMetadataWriter(Configuration hadoopConf,
|
||||||
|
HoodieWriteConfig writeConfig,
|
||||||
|
HoodieEngineContext engineContext,
|
||||||
|
Option<T> actionMetadata) {
|
||||||
this.dataWriteConfig = writeConfig;
|
this.dataWriteConfig = writeConfig;
|
||||||
this.engineContext = engineContext;
|
this.engineContext = engineContext;
|
||||||
this.hadoopConf = new SerializableConfiguration(hadoopConf);
|
this.hadoopConf = new SerializableConfiguration(hadoopConf);
|
||||||
@@ -110,15 +124,20 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
|||||||
enabled = true;
|
enabled = true;
|
||||||
|
|
||||||
// Inline compaction and auto clean is required as we dont expose this table outside
|
// Inline compaction and auto clean is required as we dont expose this table outside
|
||||||
ValidationUtils.checkArgument(!this.metadataWriteConfig.isAutoClean(), "Cleaning is controlled internally for Metadata table.");
|
ValidationUtils.checkArgument(!this.metadataWriteConfig.isAutoClean(),
|
||||||
ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(), "Compaction is controlled internally for metadata table.");
|
"Cleaning is controlled internally for Metadata table.");
|
||||||
|
ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(),
|
||||||
|
"Compaction is controlled internally for metadata table.");
|
||||||
// Metadata Table cannot have metadata listing turned on. (infinite loop, much?)
|
// Metadata Table cannot have metadata listing turned on. (infinite loop, much?)
|
||||||
ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(), "Auto commit is required for Metadata Table");
|
ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(),
|
||||||
ValidationUtils.checkArgument(!this.metadataWriteConfig.isMetadataTableEnabled(), "File listing cannot be used for Metadata Table");
|
"Auto commit is required for Metadata Table");
|
||||||
|
ValidationUtils.checkArgument(!this.metadataWriteConfig.isMetadataTableEnabled(),
|
||||||
|
"File listing cannot be used for Metadata Table");
|
||||||
|
|
||||||
initRegistry();
|
initRegistry();
|
||||||
this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
|
this.dataMetaClient =
|
||||||
initialize(engineContext);
|
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
|
||||||
|
initialize(engineContext, actionMetadata);
|
||||||
initTableMetadata();
|
initTableMetadata();
|
||||||
} else {
|
} else {
|
||||||
enabled = false;
|
enabled = false;
|
||||||
@@ -215,10 +234,11 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize the metadata table if it does not exist.
|
* Initialize the metadata table if it does not exist.
|
||||||
*
|
* <p>
|
||||||
* If the metadata table did not exist, then file and partition listing is used to bootstrap the table.
|
* If the metadata table does not exist, then file and partition listing is used to bootstrap the table.
|
||||||
*/
|
*/
|
||||||
protected abstract void initialize(HoodieEngineContext engineContext);
|
protected abstract <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext,
|
||||||
|
Option<T> actionMetadata);
|
||||||
|
|
||||||
public void initTableMetadata() {
|
public void initTableMetadata() {
|
||||||
try {
|
try {
|
||||||
@@ -233,26 +253,33 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient) throws IOException {
|
/**
|
||||||
|
* Bootstrap the metadata table if needed.
|
||||||
|
*
|
||||||
|
* @param engineContext - Engine context
|
||||||
|
* @param dataMetaClient - Meta client for the data table
|
||||||
|
* @param actionMetadata - Optional action metadata
|
||||||
|
* @param <T> - Action metadata types extending Avro generated SpecificRecordBase
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
protected <T extends SpecificRecordBase> void bootstrapIfNeeded(HoodieEngineContext engineContext,
|
||||||
|
HoodieTableMetaClient dataMetaClient,
|
||||||
|
Option<T> actionMetadata) throws IOException {
|
||||||
HoodieTimer timer = new HoodieTimer().startTimer();
|
HoodieTimer timer = new HoodieTimer().startTimer();
|
||||||
boolean exists = dataMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME));
|
|
||||||
|
boolean exists = dataMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(),
|
||||||
|
HoodieTableMetaClient.METAFOLDER_NAME));
|
||||||
boolean rebootstrap = false;
|
boolean rebootstrap = false;
|
||||||
|
|
||||||
|
// If the un-synced instants have been archived, then
|
||||||
|
// the metadata table will need to be bootstrapped again.
|
||||||
if (exists) {
|
if (exists) {
|
||||||
// If the un-synched instants have been archived then the metadata table will need to be bootstrapped again
|
final HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get())
|
||||||
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get())
|
|
||||||
.setBasePath(metadataWriteConfig.getBasePath()).build();
|
.setBasePath(metadataWriteConfig.getBasePath()).build();
|
||||||
Option<HoodieInstant> latestMetadataInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
|
final Option<HoodieInstant> latestMetadataInstant =
|
||||||
if (!latestMetadataInstant.isPresent()) {
|
metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
|
||||||
LOG.warn("Metadata Table will need to be re-bootstrapped as no instants were found");
|
|
||||||
rebootstrap = true;
|
rebootstrap = isBootstrapNeeded(latestMetadataInstant, actionMetadata);
|
||||||
} else if (!latestMetadataInstant.get().getTimestamp().equals(SOLO_COMMIT_TIMESTAMP)
|
|
||||||
&& dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(latestMetadataInstant.get().getTimestamp())) {
|
|
||||||
// TODO: Revisit this logic and validate that filtering for all commits timeline is the right thing to do
|
|
||||||
LOG.warn("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived."
|
|
||||||
+ " latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp()
|
|
||||||
+ ", latestDataInstant=" + dataMetaClient.getActiveTimeline().firstInstant().get().getTimestamp());
|
|
||||||
rebootstrap = true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rebootstrap) {
|
if (rebootstrap) {
|
||||||
@@ -270,6 +297,52 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether bootstrap operation needed for this metadata table.
|
||||||
|
* <p>
|
||||||
|
* Rollback of the first commit would look like un-synced instants in the metadata table.
|
||||||
|
* Action metadata is needed to verify the instant time and avoid erroneous bootstrapping.
|
||||||
|
* <p>
|
||||||
|
* TODO: Revisit this logic and validate that filtering for all
|
||||||
|
* commits timeline is the right thing to do
|
||||||
|
*
|
||||||
|
* @return True if the bootstrap is not needed, False otherwise
|
||||||
|
*/
|
||||||
|
private <T extends SpecificRecordBase> boolean isBootstrapNeeded(Option<HoodieInstant> latestMetadataInstant,
|
||||||
|
Option<T> actionMetadata) {
|
||||||
|
if (!latestMetadataInstant.isPresent()) {
|
||||||
|
LOG.warn("Metadata Table will need to be re-bootstrapped as no instants were found");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
final String latestMetadataInstantTimestamp = latestMetadataInstant.get().getTimestamp();
|
||||||
|
if (latestMetadataInstantTimestamp.equals(SOLO_COMMIT_TIMESTAMP)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isRollbackAction = false;
|
||||||
|
List<String> rollbackedTimestamps = Collections.emptyList();
|
||||||
|
if (actionMetadata.isPresent() && actionMetadata.get() instanceof HoodieRollbackMetadata) {
|
||||||
|
isRollbackAction = true;
|
||||||
|
List<HoodieInstantInfo> rollbackedInstants =
|
||||||
|
((HoodieRollbackMetadata) actionMetadata.get()).getInstantsRollback();
|
||||||
|
rollbackedTimestamps = rollbackedInstants.stream().map(instant -> {
|
||||||
|
return instant.getCommitTime().toString();
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(
|
||||||
|
latestMetadataInstant.get().getTimestamp())
|
||||||
|
&& (!isRollbackAction || !rollbackedTimestamps.contains(latestMetadataInstantTimestamp))) {
|
||||||
|
LOG.warn("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived."
|
||||||
|
+ " latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp()
|
||||||
|
+ ", latestDataInstant=" + dataMetaClient.getActiveTimeline().firstInstant().get().getTimestamp());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize the Metadata Table by listing files and partitions from the file system.
|
* Initialize the Metadata Table by listing files and partitions from the file system.
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.table;
|
package org.apache.hudi.table;
|
||||||
|
|
||||||
|
import org.apache.avro.specific.SpecificRecordBase;
|
||||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||||
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
||||||
import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
||||||
@@ -718,11 +719,23 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetch instance of {@link HoodieTableMetadataWriter}.
|
* Get Table metadata writer.
|
||||||
|
*
|
||||||
|
* @return instance of {@link HoodieTableMetadataWriter
|
||||||
|
*/
|
||||||
|
public final Option<HoodieTableMetadataWriter> getMetadataWriter() {
|
||||||
|
return getMetadataWriter(Option.empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get Table metadata writer.
|
||||||
|
*
|
||||||
* @return instance of {@link HoodieTableMetadataWriter}
|
* @return instance of {@link HoodieTableMetadataWriter}
|
||||||
*/
|
*/
|
||||||
public Option<HoodieTableMetadataWriter> getMetadataWriter() {
|
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(Option<T> actionMetadata) {
|
||||||
// Each engine is expected to override this and provide the actual metadata writer if enabled.
|
// Each engine is expected to override this and
|
||||||
|
// provide the actual metadata writer, if enabled.
|
||||||
return Option.empty();
|
return Option.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
|||||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
@@ -72,7 +73,7 @@ public abstract class BaseActionExecutor<T extends HoodieRecordPayload, I, K, O,
|
|||||||
* @param metadata rollback metadata of interest.
|
* @param metadata rollback metadata of interest.
|
||||||
*/
|
*/
|
||||||
protected final void writeTableMetadata(HoodieRollbackMetadata metadata) {
|
protected final void writeTableMetadata(HoodieRollbackMetadata metadata) {
|
||||||
table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime));
|
table.getMetadataWriter(Option.of(metadata)).ifPresent(w -> w.update(metadata, instantTime));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.metadata;
|
package org.apache.hudi.metadata;
|
||||||
|
|
||||||
|
import org.apache.avro.specific.SpecificRecordBase;
|
||||||
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||||
@@ -45,12 +46,23 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
|
|||||||
|
|
||||||
private static final Logger LOG = LogManager.getLogger(FlinkHoodieBackedTableMetadataWriter.class);
|
private static final Logger LOG = LogManager.getLogger(FlinkHoodieBackedTableMetadataWriter.class);
|
||||||
|
|
||||||
public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context) {
|
public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig,
|
||||||
return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context);
|
HoodieEngineContext context) {
|
||||||
|
return create(conf, writeConfig, context, Option.empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
FlinkHoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
|
public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Configuration conf,
|
||||||
super(hadoopConf, writeConfig, engineContext);
|
HoodieWriteConfig writeConfig,
|
||||||
|
HoodieEngineContext context,
|
||||||
|
Option<T> actionMetadata) {
|
||||||
|
return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata);
|
||||||
|
}
|
||||||
|
|
||||||
|
<T extends SpecificRecordBase> FlinkHoodieBackedTableMetadataWriter(Configuration hadoopConf,
|
||||||
|
HoodieWriteConfig writeConfig,
|
||||||
|
HoodieEngineContext engineContext,
|
||||||
|
Option<T> actionMetadata) {
|
||||||
|
super(hadoopConf, writeConfig, engineContext, actionMetadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -65,10 +77,11 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void initialize(HoodieEngineContext engineContext) {
|
protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext,
|
||||||
|
Option<T> actionMetadata) {
|
||||||
try {
|
try {
|
||||||
if (enabled) {
|
if (enabled) {
|
||||||
bootstrapIfNeeded(engineContext, dataMetaClient);
|
bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
|
LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.table;
|
package org.apache.hudi.table;
|
||||||
|
|
||||||
|
import org.apache.avro.specific.SpecificRecordBase;
|
||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
||||||
import org.apache.hudi.common.data.HoodieData;
|
import org.apache.hudi.common.data.HoodieData;
|
||||||
@@ -107,11 +108,11 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
|
|||||||
* @return instance of {@link HoodieTableMetadataWriter}
|
* @return instance of {@link HoodieTableMetadataWriter}
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Option<HoodieTableMetadataWriter> getMetadataWriter() {
|
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(Option<T> actionMetadata) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (!isMetadataAvailabilityUpdated) {
|
if (!isMetadataAvailabilityUpdated) {
|
||||||
// this code assumes that if metadata availability is updated once it will not change. please revisit this logic if that's not the case.
|
// This code assumes that if metadata availability is updated once it will not change.
|
||||||
// this is done to avoid repeated calls to fs.exists().
|
// Please revisit this logic if that's not the case. This is done to avoid repeated calls to fs.exists().
|
||||||
try {
|
try {
|
||||||
isMetadataTableAvailable = config.isMetadataTableEnabled()
|
isMetadataTableAvailable = config.isMetadataTableEnabled()
|
||||||
&& metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())));
|
&& metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())));
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.metadata;
|
package org.apache.hudi.metadata;
|
||||||
|
|
||||||
|
import org.apache.avro.specific.SpecificRecordBase;
|
||||||
import org.apache.hudi.client.SparkRDDWriteClient;
|
import org.apache.hudi.client.SparkRDDWriteClient;
|
||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
||||||
@@ -47,12 +48,23 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
|
|||||||
|
|
||||||
private static final Logger LOG = LogManager.getLogger(SparkHoodieBackedTableMetadataWriter.class);
|
private static final Logger LOG = LogManager.getLogger(SparkHoodieBackedTableMetadataWriter.class);
|
||||||
|
|
||||||
public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context) {
|
public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig,
|
||||||
return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context);
|
HoodieEngineContext context) {
|
||||||
|
return create(conf, writeConfig, context, Option.empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
SparkHoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
|
public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Configuration conf,
|
||||||
super(hadoopConf, writeConfig, engineContext);
|
HoodieWriteConfig writeConfig,
|
||||||
|
HoodieEngineContext context,
|
||||||
|
Option<T> actionMetadata) {
|
||||||
|
return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata);
|
||||||
|
}
|
||||||
|
|
||||||
|
<T extends SpecificRecordBase> SparkHoodieBackedTableMetadataWriter(Configuration hadoopConf,
|
||||||
|
HoodieWriteConfig writeConfig,
|
||||||
|
HoodieEngineContext engineContext,
|
||||||
|
Option<T> actionMetadata) {
|
||||||
|
super(hadoopConf, writeConfig, engineContext, actionMetadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -71,7 +83,8 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void initialize(HoodieEngineContext engineContext) {
|
protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext,
|
||||||
|
Option<T> actionMetadata) {
|
||||||
try {
|
try {
|
||||||
metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> {
|
metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> {
|
||||||
if (registry instanceof DistributedRegistry) {
|
if (registry instanceof DistributedRegistry) {
|
||||||
@@ -81,7 +94,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
|
|||||||
});
|
});
|
||||||
|
|
||||||
if (enabled) {
|
if (enabled) {
|
||||||
bootstrapIfNeeded(engineContext, dataMetaClient);
|
bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
|
LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.table;
|
package org.apache.hudi.table;
|
||||||
|
|
||||||
|
import org.apache.avro.specific.SpecificRecordBase;
|
||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
||||||
import org.apache.hudi.common.data.HoodieData;
|
import org.apache.hudi.common.data.HoodieData;
|
||||||
@@ -111,11 +112,11 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
|
|||||||
* @return instance of {@link HoodieTableMetadataWriter}
|
* @return instance of {@link HoodieTableMetadataWriter}
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Option<HoodieTableMetadataWriter> getMetadataWriter() {
|
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(Option<T> actionMetadata) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (!isMetadataAvailabilityUpdated) {
|
if (!isMetadataAvailabilityUpdated) {
|
||||||
// this code assumes that if metadata availability is updated once it will not change. please revisit this logic if that's not the case.
|
// This code assumes that if metadata availability is updated once it will not change.
|
||||||
// this is done to avoid repeated calls to fs.exists().
|
// Please revisit this logic if that's not the case. This is done to avoid repeated calls to fs.exists().
|
||||||
try {
|
try {
|
||||||
isMetadataTableAvailable = config.isMetadataTableEnabled()
|
isMetadataTableAvailable = config.isMetadataTableEnabled()
|
||||||
&& metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())));
|
&& metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())));
|
||||||
@@ -126,7 +127,8 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (isMetadataTableAvailable) {
|
if (isMetadataTableAvailable) {
|
||||||
return Option.of(SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context));
|
return Option.of(SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context,
|
||||||
|
actionMetadata));
|
||||||
} else {
|
} else {
|
||||||
return Option.empty();
|
return Option.empty();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -337,7 +337,7 @@ public class TestHBaseIndex extends SparkClientFunctionalTestHarness {
|
|||||||
public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
|
public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
|
||||||
// Load to memory
|
// Load to memory
|
||||||
HoodieWriteConfig config = getConfigBuilder(100, false, false)
|
HoodieWriteConfig config = getConfigBuilder(100, false, false)
|
||||||
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
|
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
|
||||||
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
|
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
|
||||||
SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
|
SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
|
||||||
|
|
||||||
@@ -425,7 +425,7 @@ public class TestHBaseIndex extends SparkClientFunctionalTestHarness {
|
|||||||
public void testEnsureTagLocationUsesCommitTimeline() throws Exception {
|
public void testEnsureTagLocationUsesCommitTimeline() throws Exception {
|
||||||
// Load to memory
|
// Load to memory
|
||||||
HoodieWriteConfig config = getConfigBuilder(100, false, false)
|
HoodieWriteConfig config = getConfigBuilder(100, false, false)
|
||||||
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
|
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
|
||||||
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
|
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
|
||||||
SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
|
SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
|
||||||
|
|
||||||
|
|||||||
@@ -459,6 +459,39 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
|||||||
|
|
||||||
// Some operations are not feasible with test table infra. hence using write client to test those cases.
|
// Some operations are not feasible with test table infra. hence using write client to test those cases.
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Rollback of the first commit should not trigger bootstrap errors at the metadata table.
|
||||||
|
*/
|
||||||
|
@ParameterizedTest
|
||||||
|
@EnumSource(HoodieTableType.class)
|
||||||
|
public void testFirstCommitRollback(HoodieTableType tableType) throws Exception {
|
||||||
|
init(tableType);
|
||||||
|
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
|
||||||
|
|
||||||
|
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
|
||||||
|
|
||||||
|
// Write 1
|
||||||
|
String commitTime = "0000001";
|
||||||
|
List<HoodieRecord> records = dataGen.generateInserts(commitTime, 20);
|
||||||
|
client.startCommitWithTime(commitTime);
|
||||||
|
List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 1), commitTime).collect();
|
||||||
|
assertNoWriteErrors(writeStatuses);
|
||||||
|
validateMetadata(client);
|
||||||
|
|
||||||
|
// Rollback the first commit
|
||||||
|
client.rollback(commitTime);
|
||||||
|
|
||||||
|
// Write 2
|
||||||
|
commitTime = "0000002";
|
||||||
|
records = dataGen.generateInserts(commitTime, 10);
|
||||||
|
client.startCommitWithTime(commitTime);
|
||||||
|
writeStatuses = client.upsert(jsc.parallelize(records, 1), commitTime).collect();
|
||||||
|
assertNoWriteErrors(writeStatuses);
|
||||||
|
validateMetadata(client);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test several table operations with restore. This test uses SparkRDDWriteClient.
|
* Test several table operations with restore. This test uses SparkRDDWriteClient.
|
||||||
* Once the restore support is ready in HoodieTestTable, then rewrite this test.
|
* Once the restore support is ready in HoodieTestTable, then rewrite this test.
|
||||||
|
|||||||
Reference in New Issue
Block a user