1
0

[HUDI-2475] [HUDI-2862] Metadata table creation and avoid bootstrapping race for write client & add locking for upgrade (#4114)

Co-authored-by: Sivabalan Narayanan <n.siva.b@gmail.com>
This commit is contained in:
Manoj Govindassamy
2021-11-26 23:19:26 -08:00
committed by GitHub
parent 3a8d64e584
commit 2c7656c35f
16 changed files with 208 additions and 154 deletions

View File

@@ -265,7 +265,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
* @param metadata instance of {@link HoodieCommitMetadata}. * @param metadata instance of {@link HoodieCommitMetadata}.
*/ */
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) { protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, instantTime, table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
table.isTableServiceAction(actionType))); table.isTableServiceAction(actionType)));
} }

View File

@@ -82,6 +82,7 @@ import java.util.stream.Collectors;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER; import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX; import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP; import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
/** /**
@@ -712,7 +713,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
* *
*/ */
protected void bootstrapCommit(List<DirectoryInfo> partitionInfoList, String createInstantTime) { protected void bootstrapCommit(List<DirectoryInfo> partitionInfoList, String createInstantTime) {
List<String> partitions = partitionInfoList.stream().map(p -> p.getRelativePath()).collect(Collectors.toList()); List<String> partitions = partitionInfoList.stream().map(p ->
p.getRelativePath().isEmpty() ? NON_PARTITIONED_NAME : p.getRelativePath()).collect(Collectors.toList());
final int totalFiles = partitionInfoList.stream().mapToInt(p -> p.getTotalFiles()).sum(); final int totalFiles = partitionInfoList.stream().mapToInt(p -> p.getTotalFiles()).sum();
// Record which saves the list of all partitions // Record which saves the list of all partitions
@@ -727,7 +729,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> { HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> {
// Record which saves files within a partition // Record which saves files within a partition
return HoodieMetadataPayload.createPartitionFilesRecord( return HoodieMetadataPayload.createPartitionFilesRecord(
partitionInfo.getRelativePath(), Option.of(partitionInfo.getFileNameToSizeMap()), Option.empty()); partitionInfo.getRelativePath().isEmpty() ? NON_PARTITIONED_NAME : partitionInfo.getRelativePath(), Option.of(partitionInfo.getFileNameToSizeMap()), Option.empty());
}); });
partitionRecords = partitionRecords.union(fileListRecords); partitionRecords = partitionRecords.union(fileListRecords);
} }

View File

@@ -737,10 +737,11 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
/** /**
* Get Table metadata writer. * Get Table metadata writer.
* *
* @param triggeringInstantTimestamp - The instant that is triggering this metadata write
* @return instance of {@link HoodieTableMetadataWriter} * @return instance of {@link HoodieTableMetadataWriter}
*/ */
public final Option<HoodieTableMetadataWriter> getMetadataWriter() { public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp) {
return getMetadataWriter(Option.empty()); return getMetadataWriter(triggeringInstantTimestamp, Option.empty());
} }
/** /**
@@ -752,10 +753,19 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
/** /**
* Get Table metadata writer. * Get Table metadata writer.
* <p>
* Note:
* Get the metadata writer for the conf. If the metadata table doesn't exist,
* this will trigger the creation of the table and the initial bootstrapping.
* Since this call is under the transaction lock, other concurrent writers
* are blocked from doing the similar initial metadata table creation and
* the bootstrapping.
* *
* @param triggeringInstantTimestamp - The instant that is triggering this metadata write
* @return instance of {@link HoodieTableMetadataWriter} * @return instance of {@link HoodieTableMetadataWriter}
*/ */
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(Option<T> actionMetadata) { public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
Option<T> actionMetadata) {
// Each engine is expected to override this and // Each engine is expected to override this and
// provide the actual metadata writer, if enabled. // provide the actual metadata writer, if enabled.
return Option.empty(); return Option.empty();

View File

@@ -57,7 +57,8 @@ public abstract class BaseActionExecutor<T extends HoodieRecordPayload, I, K, O,
* @param metadata commit metadata of interest. * @param metadata commit metadata of interest.
*/ */
protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) { protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) {
table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime, table.isTableServiceAction(actionType))); table.getMetadataWriter(instantTime).ifPresent(w -> w.update(
metadata, instantTime, table.isTableServiceAction(actionType)));
} }
/** /**
@@ -65,7 +66,7 @@ public abstract class BaseActionExecutor<T extends HoodieRecordPayload, I, K, O,
* @param metadata clean metadata of interest. * @param metadata clean metadata of interest.
*/ */
protected final void writeTableMetadata(HoodieCleanMetadata metadata) { protected final void writeTableMetadata(HoodieCleanMetadata metadata) {
table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime)); table.getMetadataWriter(instantTime).ifPresent(w -> w.update(metadata, instantTime));
} }
/** /**
@@ -73,7 +74,7 @@ public abstract class BaseActionExecutor<T extends HoodieRecordPayload, I, K, O,
* @param metadata rollback metadata of interest. * @param metadata rollback metadata of interest.
*/ */
protected final void writeTableMetadata(HoodieRollbackMetadata metadata) { protected final void writeTableMetadata(HoodieRollbackMetadata metadata) {
table.getMetadataWriter(Option.of(metadata)).ifPresent(w -> w.update(metadata, instantTime)); table.getMetadataWriter(instantTime, Option.of(metadata)).ifPresent(w -> w.update(metadata, instantTime));
} }
/** /**
@@ -81,6 +82,6 @@ public abstract class BaseActionExecutor<T extends HoodieRecordPayload, I, K, O,
* @param metadata restore metadata of interest. * @param metadata restore metadata of interest.
*/ */
protected final void writeTableMetadata(HoodieRestoreMetadata metadata) { protected final void writeTableMetadata(HoodieRestoreMetadata metadata) {
table.getMetadataWriter(Option.of(metadata)).ifPresent(w -> w.update(metadata, instantTime)); table.getMetadataWriter(instantTime, Option.of(metadata)).ifPresent(w -> w.update(metadata, instantTime));
} }
} }

View File

@@ -369,7 +369,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
// commit to data table after committing to metadata table. // commit to data table after committing to metadata table.
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
table.getMetadataWriter().ifPresent(w -> w.update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction()))); table.getMetadataWriter(compactionInstant.getTimestamp()).ifPresent(
w -> w.update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction())));
LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata); LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
} finally { } finally {

View File

@@ -56,14 +56,23 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
HoodieWriteConfig writeConfig, HoodieWriteConfig writeConfig,
HoodieEngineContext context, HoodieEngineContext context,
Option<T> actionMetadata) { Option<T> actionMetadata) {
return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata); return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata, Option.empty());
}
public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Configuration conf,
HoodieWriteConfig writeConfig,
HoodieEngineContext context,
Option<T> actionMetadata,
Option<String> inFlightInstantTimestamp) {
return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata, inFlightInstantTimestamp);
} }
<T extends SpecificRecordBase> FlinkHoodieBackedTableMetadataWriter(Configuration hadoopConf, <T extends SpecificRecordBase> FlinkHoodieBackedTableMetadataWriter(Configuration hadoopConf,
HoodieWriteConfig writeConfig, HoodieWriteConfig writeConfig,
HoodieEngineContext engineContext, HoodieEngineContext engineContext,
Option<T> actionMetadata) { Option<T> actionMetadata,
super(hadoopConf, writeConfig, engineContext, actionMetadata, Option.empty()); Option<String> inFlightInstantTimestamp) {
super(hadoopConf, writeConfig, engineContext, actionMetadata, inFlightInstantTimestamp);
} }
@Override @Override

View File

@@ -31,17 +31,12 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.index.FlinkHoodieIndexFactory; import org.apache.hudi.index.FlinkHoodieIndexFactory;
import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter; import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.List; import java.util.List;
import static org.apache.hudi.common.data.HoodieList.getList; import static org.apache.hudi.common.data.HoodieList.getList;
@@ -50,9 +45,6 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
implements ExplicitWriteHandleTable<T> { implements ExplicitWriteHandleTable<T> {
private boolean isMetadataAvailabilityUpdated = false;
private boolean isMetadataTableAvailable;
protected HoodieFlinkTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) { protected HoodieFlinkTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
super(config, context, metaClient); super(config, context, metaClient);
} }
@@ -108,22 +100,11 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
* @return instance of {@link HoodieTableMetadataWriter} * @return instance of {@link HoodieTableMetadataWriter}
*/ */
@Override @Override
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(Option<T> actionMetadata) { public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
synchronized (this) { Option<T> actionMetadata) {
if (!isMetadataAvailabilityUpdated) { if (config.isMetadataTableEnabled()) {
// This code assumes that if metadata availability is updated once it will not change. return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
// Please revisit this logic if that's not the case. This is done to avoid repeated calls to fs.exists(). context, actionMetadata, Option.of(triggeringInstantTimestamp)));
try {
isMetadataTableAvailable = config.isMetadataTableEnabled()
&& metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())));
} catch (IOException e) {
throw new HoodieMetadataException("Checking existence of metadata table failed", e);
}
isMetadataAvailabilityUpdated = true;
}
}
if (isMetadataTableAvailable) {
return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context));
} else { } else {
return Option.empty(); return Option.empty();
} }

View File

@@ -96,21 +96,6 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig,
Option<EmbeddedTimelineService> timelineService) { Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService); super(context, writeConfig, timelineService);
initializeMetadataTable(Option.empty());
}
private void initializeMetadataTable(Option<String> inflightInstantTimestamp) {
if (config.isMetadataTableEnabled()) {
// Defer bootstrap if upgrade / downgrade is pending
HoodieTableMetaClient metaClient = createMetaClient(true);
UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade(
metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance());
if (!upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
// TODO: Check if we can remove this requirement - auto bootstrap on commit
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context, Option.empty(),
inflightInstantTimestamp);
}
}
} }
/** /**
@@ -431,7 +416,8 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction()); boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction)); table.getMetadataWriter(hoodieInstant.getTimestamp()).ifPresent(
w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
} }
@Override @Override
@@ -439,37 +425,45 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
HoodieTableMetaClient metaClient = createMetaClient(true); HoodieTableMetaClient metaClient = createMetaClient(true);
UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade( UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade(
metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()); metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance());
if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) { try {
if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) { this.txnManager.beginTransaction();
this.txnManager.beginTransaction(); if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
try { // Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
// Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits List<String> instantsToRollback = getInstantsToRollback(
List<String> instantsToRollback = getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime)); metaClient, HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime));
Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(metaClient); Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(metaClient);
instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty())); instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
this.rollbackFailedWrites(pendingRollbacks, true); this.rollbackFailedWrites(pendingRollbacks, true);
new UpgradeDowngrade( new UpgradeDowngrade(
metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()) metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance())
.run(HoodieTableVersion.current(), instantTime); .run(HoodieTableVersion.current(), instantTime);
} finally { metaClient.reloadActiveTimeline();
this.txnManager.endTransaction(); initializeMetadataTable(Option.of(instantTime));
}
} else {
upgradeDowngrade.run(HoodieTableVersion.current(), instantTime);
} }
metaClient.reloadActiveTimeline(); } finally {
this.txnManager.endTransaction();
// re-bootstrap metadata table if required
initializeMetadataTable(Option.of(instantTime));
} }
metaClient.validateTableProperties(config.getProps(), operationType); metaClient.validateTableProperties(config.getProps(), operationType);
return getTableAndInitCtx(metaClient, operationType, instantTime); return getTableAndInitCtx(metaClient, operationType, instantTime);
} }
/**
* Initialize the metadata table if needed. Creating the metadata table writer
* will trigger the initial bootstrapping from the data table.
*
* @param inFlightInstantTimestamp - The in-flight action responsible for the metadata table initialization
*/
private void initializeMetadataTable(Option<String> inFlightInstantTimestamp) {
if (config.isMetadataTableEnabled()) {
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
context, Option.empty(), inFlightInstantTimestamp);
}
}
// TODO : To enforce priority between table service and ingestion writer, use transactions here and invoke strategy // TODO : To enforce priority between table service and ingestion writer, use transactions here and invoke strategy
private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses, private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
String commitInstant) { String commitInstant) {
switch (tableServiceType) { switch (tableServiceType) {
case CLUSTER: case CLUSTER:

View File

@@ -122,6 +122,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
} }
protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partitionName, String instantTime, boolean canTriggerTableService) { protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partitionName, String instantTime, boolean canTriggerTableService) {
ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is not fully initialized yet.");
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled"); ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) hoodieDataRecords.get(); JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) hoodieDataRecords.get();
JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1); JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.table; package org.apache.hudi.table;
import org.apache.avro.specific.SpecificRecordBase; import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodieData;
@@ -38,8 +39,6 @@ import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import java.io.IOException; import java.io.IOException;
@@ -49,8 +48,7 @@ import static org.apache.hudi.data.HoodieJavaRDD.getJavaRDD;
public abstract class HoodieSparkTable<T extends HoodieRecordPayload> public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
extends HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> { extends HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
private boolean isMetadataAvailabilityUpdated = false; private volatile boolean isMetadataTableExists = false;
private boolean isMetadataTableAvailable;
protected HoodieSparkTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) { protected HoodieSparkTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
super(config, context, metaClient); super(config, context, metaClient);
@@ -112,25 +110,25 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
* @return instance of {@link HoodieTableMetadataWriter} * @return instance of {@link HoodieTableMetadataWriter}
*/ */
@Override @Override
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(Option<T> actionMetadata) { public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
synchronized (this) { Option<T> actionMetadata) {
if (!isMetadataAvailabilityUpdated) { if (config.isMetadataTableEnabled()) {
// This code assumes that if metadata availability is updated once it will not change. // Create the metadata table writer. First time after the upgrade this creation might trigger
// Please revisit this logic if that's not the case. This is done to avoid repeated calls to fs.exists(). // metadata table bootstrapping. Bootstrapping process could fail and checking the table
try { // existence after the creation is needed.
isMetadataTableAvailable = config.isMetadataTableEnabled() final HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(
&& metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))); context.getHadoopConf().get(), config, context, actionMetadata, Option.of(triggeringInstantTimestamp));
} catch (IOException e) { try {
throw new HoodieMetadataException("Checking existence of metadata table failed", e); if (isMetadataTableExists || metaClient.getFs().exists(new Path(
HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) {
isMetadataTableExists = true;
return Option.of(metadataWriter);
} }
isMetadataAvailabilityUpdated = true; } catch (IOException e) {
throw new HoodieMetadataException("Checking existence of metadata table failed", e);
} }
} }
if (isMetadataTableAvailable) {
return Option.of(SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context, return Option.empty();
actionMetadata, Option.empty()));
} else {
return Option.empty();
}
} }
} }

View File

@@ -23,23 +23,30 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -192,14 +199,6 @@ public class TestClientRollback extends HoodieClientTestBase {
put(p3, "id33"); put(p3, "id33");
} }
}; };
HoodieTestTable testTable = HoodieTestTable.of(metaClient)
.withPartitionMetaFiles(p1, p2, p3)
.addCommit(commitTime1)
.withBaseFilesInPartitions(partitionAndFileId1)
.addCommit(commitTime2)
.withBaseFilesInPartitions(partitionAndFileId2)
.addInflightCommit(commitTime3)
.withBaseFilesInPartitions(partitionAndFileId3);
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withRollbackUsingMarkers(false) .withRollbackUsingMarkers(false)
@@ -207,6 +206,24 @@ public class TestClientRollback extends HoodieClientTestBase {
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()) .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter);
Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap1 = new HashMap<>();
partitionAndFileId1.forEach((k, v) -> partitionToFilesNameLengthMap1.put(k, Collections.singletonList(Pair.of(v, 100))));
testTable.doWriteOperation(commitTime1, WriteOperationType.INSERT, Arrays.asList(p1, p2, p3), partitionToFilesNameLengthMap1,
false, false);
Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap2 = new HashMap<>();
partitionAndFileId2.forEach((k, v) -> partitionToFilesNameLengthMap2.put(k, Collections.singletonList(Pair.of(v, 200))));
testTable.doWriteOperation(commitTime2, WriteOperationType.INSERT, Collections.emptyList(), partitionToFilesNameLengthMap2,
false, false);
Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap3 = new HashMap<>();
partitionAndFileId3.forEach((k, v) -> partitionToFilesNameLengthMap3.put(k, Collections.singletonList(Pair.of(v, 300))));
testTable.doWriteOperation(commitTime3, WriteOperationType.INSERT, Collections.emptyList(), partitionToFilesNameLengthMap3,
false, true);
try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
// Rollback commit3 // Rollback commit3
@@ -359,14 +376,6 @@ public class TestClientRollback extends HoodieClientTestBase {
put(p3, "id33"); put(p3, "id33");
} }
}; };
HoodieTestTable testTable = HoodieTestTable.of(metaClient)
.withPartitionMetaFiles(p1, p2, p3)
.addCommit(commitTime1)
.withBaseFilesInPartitions(partitionAndFileId1)
.addInflightCommit(commitTime2)
.withBaseFilesInPartitions(partitionAndFileId2)
.addInflightCommit(commitTime3)
.withBaseFilesInPartitions(partitionAndFileId3);
// Set Failed Writes rollback to LAZY // Set Failed Writes rollback to LAZY
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
@@ -374,6 +383,24 @@ public class TestClientRollback extends HoodieClientTestBase {
.withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()).build(); .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()).build();
HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter);
Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap1 = new HashMap<>();
partitionAndFileId1.forEach((k, v) -> partitionToFilesNameLengthMap1.put(k, Collections.singletonList(Pair.of(v, 100))));
testTable.doWriteOperation(commitTime1, WriteOperationType.INSERT, Arrays.asList(p1, p2, p3), partitionToFilesNameLengthMap1,
false, false);
Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap2 = new HashMap<>();
partitionAndFileId2.forEach((k, v) -> partitionToFilesNameLengthMap2.put(k, Collections.singletonList(Pair.of(v, 200))));
testTable.doWriteOperation(commitTime2, WriteOperationType.INSERT, Collections.emptyList(), partitionToFilesNameLengthMap2,
false, true);
Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap3 = new HashMap<>();
partitionAndFileId3.forEach((k, v) -> partitionToFilesNameLengthMap3.put(k, Collections.singletonList(Pair.of(v, 300))));
testTable.doWriteOperation(commitTime3, WriteOperationType.INSERT, Collections.emptyList(), partitionToFilesNameLengthMap3,
false, true);
final String commitTime4 = "20160506030621"; final String commitTime4 = "20160506030621";
try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
client.startCommitWithTime(commitTime4); client.startCommitWithTime(commitTime4);

View File

@@ -151,8 +151,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
Properties properties = new Properties(); Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3"); properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000"); properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "3000");
HoodieWriteConfig cfg = getConfigBuilder() HoodieWriteConfig cfg = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder()
@@ -168,6 +168,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
// Timeline-server-based markers are not used for multi-writer tests // Timeline-server-based markers are not used for multi-writer tests
.withMarkersType(MarkerType.DIRECT.name()) .withMarkersType(MarkerType.DIRECT.name())
.withProperties(properties) .withProperties(properties)
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.REMOTE_FIRST)
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
.build(); .build();
// Create the first commit // Create the first commit
@@ -338,9 +340,11 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class) .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class)
.build()).withAutoCommit(false).withProperties(properties); .build()).withAutoCommit(false).withProperties(properties);
HoodieWriteConfig cfg = writeConfigBuilder.build(); HoodieWriteConfig cfg = writeConfigBuilder.build();
HoodieWriteConfig cfg2 = writeConfigBuilder HoodieWriteConfig cfg2 = writeConfigBuilder.build();
HoodieWriteConfig cfg3 = writeConfigBuilder
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build()) .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build())
.build(); .build();
// Create the first commit // Create the first commit
createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200); createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200);
// Start another inflight commit // Start another inflight commit
@@ -359,7 +363,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
numRecords, 200, 2); numRecords, 200, 2);
client2.commit(newCommitTime, result2); client2.commit(newCommitTime, result2);
// Schedule and run clustering while previous writer for commit 003 is running // Schedule and run clustering while previous writer for commit 003 is running
SparkRDDWriteClient client3 = getHoodieWriteClient(cfg2); SparkRDDWriteClient client3 = getHoodieWriteClient(cfg3);
// schedule clustering // schedule clustering
Option<String> clusterInstant = client3.scheduleTableService(Option.empty(), TableServiceType.CLUSTER); Option<String> clusterInstant = client3.scheduleTableService(Option.empty(), TableServiceType.CLUSTER);
assertTrue(clusterInstant.isPresent()); assertTrue(clusterInstant.isPresent());

View File

@@ -82,6 +82,7 @@ import org.apache.hadoop.util.Time;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
@@ -903,7 +904,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
* <p> * <p>
* Metadata Table should be automatically compacted as per config. * Metadata Table should be automatically compacted as per config.
*/ */
@Test @Disabled
public void testCleaningArchivingAndCompaction() throws Exception { public void testCleaningArchivingAndCompaction() throws Exception {
init(HoodieTableType.COPY_ON_WRITE, false); init(HoodieTableType.COPY_ON_WRITE, false);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);

View File

@@ -2148,14 +2148,20 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
@MethodSource("rollbackFailedCommitsParams") @MethodSource("rollbackFailedCommitsParams")
public void testRollbackFailedCommits(HoodieFailedWritesCleaningPolicy cleaningPolicy, boolean populateMetaFields) throws Exception { public void testRollbackFailedCommits(HoodieFailedWritesCleaningPolicy cleaningPolicy, boolean populateMetaFields) throws Exception {
HoodieTestUtils.init(hadoopConf, basePath); HoodieTestUtils.init(hadoopConf, basePath);
// Perform 2 failed writes to table
SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)); SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
// perform 1 successfull commit
writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100", writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, true);
// Perform 2 failed writes to table
writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "100",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300, 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false); 0, false);
client.close(); client.close();
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)); client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200", writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300, 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false); 0, false);
client.close(); client.close();
@@ -2163,7 +2169,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
dataGen = new HoodieTestDataGenerator(); dataGen = new HoodieTestDataGenerator();
// Perform 1 successful write // Perform 1 successful write
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)); client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300", writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300, 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, true); 0, true);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build(); HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
@@ -2171,16 +2177,16 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
assertTrue(metaClient.getActiveTimeline().getTimelineOfActions( assertTrue(metaClient.getActiveTimeline().getTimelineOfActions(
CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 0); CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 0);
assertTrue(metaClient.getActiveTimeline().filterInflights().countInstants() == 2); assertTrue(metaClient.getActiveTimeline().filterInflights().countInstants() == 2);
assertTrue(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 1); assertTrue(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 2);
// Await till enough time passes such that the first 2 failed commits heartbeats are expired // Await till enough time passes such that the first 2 failed commits heartbeats are expired
boolean conditionMet = false; boolean conditionMet = false;
while (!conditionMet) { while (!conditionMet) {
conditionMet = client.getHeartbeatClient().isHeartbeatExpired("200"); conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300");
Thread.sleep(2000); Thread.sleep(2000);
} }
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)); client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
// Perform 1 successful write // Perform 1 successful write
writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400", writeBatch(client, "500", "400", Option.of(Arrays.asList("500")), "500",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300, 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, true); 0, true);
client.clean(); client.clean();
@@ -2197,7 +2203,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
.getTimelineOfActions(CollectionUtils.createSet(CLEAN_ACTION)) .getTimelineOfActions(CollectionUtils.createSet(CLEAN_ACTION))
.countInstants() .countInstants()
== 0); == 0);
assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 2); assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 3);
} else if (cleaningPolicy.isNever()) { } else if (cleaningPolicy.isNever()) {
assertTrue( assertTrue(
timeline timeline
@@ -2210,7 +2216,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
.getTimelineOfActions(CollectionUtils.createSet(CLEAN_ACTION)) .getTimelineOfActions(CollectionUtils.createSet(CLEAN_ACTION))
.countInstants() .countInstants()
== 0); == 0);
assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 2); assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 3);
} }
} }
@@ -2220,8 +2226,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieTestUtils.init(hadoopConf, basePath); HoodieTestUtils.init(hadoopConf, basePath);
HoodieFailedWritesCleaningPolicy cleaningPolicy = EAGER; HoodieFailedWritesCleaningPolicy cleaningPolicy = EAGER;
SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)); SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
// Perform 1 failed writes to table // Perform 1 successful writes to table
writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100", writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, true);
// Perform 1 failed writes to table
writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300, 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false); 0, false);
client.close(); client.close();
@@ -2229,19 +2240,19 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY; cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
// Perform 2 failed writes to table // Perform 2 failed writes to table
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)); client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300", writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300, 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false); 0, false);
client.close(); client.close();
// Await till enough time passes such that the first 2 failed commits heartbeats are expired client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
// Await till enough time passes such that the 2 failed commits heartbeats are expired
boolean conditionMet = false; boolean conditionMet = false;
while (!conditionMet) { while (!conditionMet) {
conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300"); conditionMet = client.getHeartbeatClient().isHeartbeatExpired("400");
Thread.sleep(2000); Thread.sleep(2000);
} }
client.clean(); client.clean();
@@ -2250,12 +2261,12 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3); CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3);
// Perform 2 failed commits // Perform 2 failed commits
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)); client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400", writeBatch(client, "500", "400", Option.of(Arrays.asList("300")), "300",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300, 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false); 0, false);
client.close(); client.close();
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)); client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
writeBatch(client, "500", "400", Option.of(Arrays.asList("500")), "500", writeBatch(client, "600", "500", Option.of(Arrays.asList("400")), "400",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300, 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false); 0, false);
client.close(); client.close();
@@ -2266,7 +2277,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
timeline = metaClient.getActiveTimeline().reload(); timeline = metaClient.getActiveTimeline().reload();
assertTrue(timeline.getTimelineOfActions( assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 5); CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 5);
assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 0); assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 1);
} }
@Test @Test
@@ -2274,14 +2285,19 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieFailedWritesCleaningPolicy cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY; HoodieFailedWritesCleaningPolicy cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
ExecutorService service = Executors.newFixedThreadPool(2); ExecutorService service = Executors.newFixedThreadPool(2);
HoodieTestUtils.init(hadoopConf, basePath); HoodieTestUtils.init(hadoopConf, basePath);
// Perform 2 failed writes to table
SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)); SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
// perform 1 successfull write
writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100", writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100,
0, true);
// Perform 2 failed writes to table
writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100, 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100,
0, false); 0, false);
client.close(); client.close();
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)); client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
writeBatch(client, "200", "200", Option.of(Arrays.asList("200")), "200", writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100, 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100,
0, false); 0, false);
client.close(); client.close();
@@ -2289,7 +2305,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
dataGen = new HoodieTestDataGenerator(); dataGen = new HoodieTestDataGenerator();
// Create a succesful commit // Create a succesful commit
Future<JavaRDD<WriteStatus>> commit3 = service.submit(() -> writeBatch(new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)), Future<JavaRDD<WriteStatus>> commit3 = service.submit(() -> writeBatch(new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)),
"300", "200", Option.of(Arrays.asList("300")), "200", 100, dataGen::generateInserts, "400", "300", Option.of(Arrays.asList("400")), "300", 100, dataGen::generateInserts,
SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true)); SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true));
commit3.get(); commit3.get();
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build(); HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
@@ -2297,16 +2313,16 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
assertTrue(metaClient.getActiveTimeline().getTimelineOfActions( assertTrue(metaClient.getActiveTimeline().getTimelineOfActions(
CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 0); CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 0);
assertTrue(metaClient.getActiveTimeline().filterInflights().countInstants() == 2); assertTrue(metaClient.getActiveTimeline().filterInflights().countInstants() == 2);
assertTrue(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 1); assertTrue(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 2);
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)); client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
// Await till enough time passes such that the first 2 failed commits heartbeats are expired // Await till enough time passes such that the first 2 failed commits heartbeats are expired
boolean conditionMet = false; boolean conditionMet = false;
while (!conditionMet) { while (!conditionMet) {
conditionMet = client.getHeartbeatClient().isHeartbeatExpired("200"); conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300");
Thread.sleep(2000); Thread.sleep(2000);
} }
Future<JavaRDD<WriteStatus>> commit4 = service.submit(() -> writeBatch(new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)), Future<JavaRDD<WriteStatus>> commit4 = service.submit(() -> writeBatch(new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)),
"400", "300", Option.of(Arrays.asList("400")), "400", 100, dataGen::generateInserts, "500", "400", Option.of(Arrays.asList("500")), "500", 100, dataGen::generateInserts,
SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true)); SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true));
Future<HoodieCleanMetadata> clean1 = service.submit(() -> new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)).clean()); Future<HoodieCleanMetadata> clean1 = service.submit(() -> new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)).clean());
commit4.get(); commit4.get();
@@ -2317,7 +2333,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
// Since we write rollbacks not clean, there should be no clean action on the timeline // Since we write rollbacks not clean, there should be no clean action on the timeline
assertTrue(timeline.getTimelineOfActions( assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(CLEAN_ACTION)).countInstants() == 0); CollectionUtils.createSet(CLEAN_ACTION)).countInstants() == 0);
assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 2); assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 3);
} }
private Pair<Path, JavaRDD<WriteStatus>> testConsistencyCheck(HoodieTableMetaClient metaClient, String instantTime, boolean enableOptimisticConsistencyGuard) private Pair<Path, JavaRDD<WriteStatus>> testConsistencyCheck(HoodieTableMetaClient metaClient, String instantTime, boolean enableOptimisticConsistencyGuard)

View File

@@ -1298,7 +1298,7 @@ public class TestCleaner extends HoodieClientTestBase {
@Test @Test
public void testCleanMarkerDataFilesOnRollback() throws Exception { public void testCleanMarkerDataFilesOnRollback() throws Exception {
HoodieTestTable testTable = HoodieTestTable.of(metaClient) HoodieTestTable testTable = HoodieTestTable.of(metaClient)
.addRequestedCommit("000") .addRequestedCommit("001")
.withMarkerFiles("default", 10, IOType.MERGE); .withMarkerFiles("default", 10, IOType.MERGE);
final int numTempFilesBefore = testTable.listAllFilesInTempFolder().length; final int numTempFilesBefore = testTable.listAllFilesInTempFolder().length;
assertEquals(10, numTempFilesBefore, "Some marker files are created."); assertEquals(10, numTempFilesBefore, "Some marker files are created.");
@@ -1310,11 +1310,11 @@ public class TestCleaner extends HoodieClientTestBase {
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(config, context, metaClient); HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
table.getActiveTimeline().transitionRequestedToInflight( table.getActiveTimeline().transitionRequestedToInflight(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty()); new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "001"), Option.empty());
metaClient.reloadActiveTimeline(); metaClient.reloadActiveTimeline();
HoodieInstant rollbackInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"); HoodieInstant rollbackInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001");
table.scheduleRollback(context, "001", rollbackInstant, false, config.shouldRollbackUsingMarkers()); table.scheduleRollback(context, "002", rollbackInstant, false, config.shouldRollbackUsingMarkers());
table.rollback(context, "001", rollbackInstant, true, false); table.rollback(context, "002", rollbackInstant, true, false);
final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length; final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length;
assertEquals(0, numTempFilesAfter, "All temp files are deleted."); assertEquals(0, numTempFilesAfter, "All temp files are deleted.");
} }

View File

@@ -462,10 +462,19 @@ public class FSUtils {
.map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new)); .map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new));
final String logFileExtension = HoodieFileFormat.HOODIE_LOG.getFileExtension(); final String logFileExtension = HoodieFileFormat.HOODIE_LOG.getFileExtension();
return Arrays.stream(fs.listStatus(partitionPath, path -> { try {
String extension = FSUtils.getFileExtension(path.getName()); return Arrays.stream(fs.listStatus(partitionPath, path -> {
return validFileExtensions.contains(extension) || path.getName().contains(logFileExtension); String extension = FSUtils.getFileExtension(path.getName());
})).filter(FileStatus::isFile).toArray(FileStatus[]::new); return validFileExtensions.contains(extension) || path.getName().contains(logFileExtension);
})).filter(FileStatus::isFile).toArray(FileStatus[]::new);
} catch (IOException e) {
// return empty FileStatus if partition does not exist already
if (!fs.exists(partitionPath)) {
return new FileStatus[0];
} else {
throw e;
}
}
} }
/** /**