[HUDI-2537] Fix metadata table for flink (#3774)

Danny Chan authored 2021-10-10 09:30:39 +08:00, committed by GitHub
parent 2a392d8e8e
commit ad63938890
12 changed files with 231 additions and 152 deletions

HoodieBackedTableMetadataWriter.java

@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -219,7 +220,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
    */
   protected abstract void initialize(HoodieEngineContext engineContext);
 
-  protected void initTableMetadata() {
+  public void initTableMetadata() {
     try {
       if (this.metadata != null) {
         this.metadata.close();
@@ -533,4 +534,42 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
    * @param instantTime The timestamp to use for the deltacommit.
    */
   protected abstract void commit(List<HoodieRecord> records, String partitionName, String instantTime);
+
+  /**
+   * Perform a compaction on the Metadata Table.
+   *
+   * Cases to be handled:
+   * 1. We cannot perform compaction if there are previous inflight operations on the dataset. This is because
+   * a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx.
+   *
+   * 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
+   * deltacommit.
+   */
+  protected void compactIfNecessary(AbstractHoodieWriteClient writeClient, String instantTime) {
+    String latestDeltacommitTime = metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
+        .get().getTimestamp();
+    List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+        .findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
+
+    if (!pendingInstants.isEmpty()) {
+      LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
+          pendingInstants.size(), latestDeltacommitTime, Arrays.toString(pendingInstants.toArray())));
+      return;
+    }
+
+    // Trigger compaction with suffixes based on the same instant time. This ensures that any future
+    // delta commits synced over will not have an instant time lesser than the last completed instant on the
+    // metadata table.
+    final String compactionInstantTime = latestDeltacommitTime + "001";
+    if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
+      writeClient.compact(compactionInstantTime);
+    }
+  }
+
+  protected void doClean(AbstractHoodieWriteClient writeClient, String instantTime) {
+    // Trigger cleaning with suffixes based on the same instant time. This ensures that any future
+    // delta commits synced over will not have an instant time lesser than the last completed instant on the
+    // metadata table.
+    writeClient.clean(instantTime + "002");
+  }
 }
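The compactIfNecessary and doClean helpers above lean on the suffix trick described in their comments: Hudi instant times are timestamp strings compared lexicographically, so appending "001" or "002" produces an instant that sorts after the triggering deltacommit but before any later commit. A minimal standalone sketch of that ordering (the timestamps are made up; this is plain Java, not Hudi code):

import java.util.Arrays;
import java.util.List;

public class InstantSuffixSketch {
  public static void main(String[] args) {
    String latestDeltacommit = "20211010093039";          // triggering deltacommit on the metadata table
    String compactionInstant = latestDeltacommit + "001"; // compaction scheduled off the deltacommit
    String cleanInstant = latestDeltacommit + "002";      // clean scheduled off the same deltacommit
    String nextCommit = "20211010093040";                 // any commit that arrives later

    List<String> timeline = Arrays.asList(nextCommit, cleanInstant, compactionInstant, latestDeltacommit);
    timeline.sort(String::compareTo);

    // Prints: the deltacommit, its compaction ("...001"), its clean ("...002"), then the next commit.
    timeline.forEach(System.out::println);
  }
}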

HoodieFlinkWriteClient.java

@@ -19,7 +19,6 @@
 package org.apache.hudi.client;
 
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
@@ -51,6 +50,8 @@ import org.apache.hudi.io.FlinkMergeAndReplaceHandle;
 import org.apache.hudi.io.FlinkMergeHandle;
 import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.io.MiniBatchHandle;
+import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
@@ -86,24 +87,18 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
    * FileID to write handle mapping in order to record the write handles for each file group,
    * so that we can append the mini-batch data buffer incrementally.
    */
-  private Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles;
+  private final Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles;
 
-  public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
-    this(context, clientConfig, false);
-  }
+  /**
+   * Cached metadata writer for coordinator to reuse for each commit.
+   */
+  private Option<HoodieBackedTableMetadataWriter> metadataWriterOption = Option.empty();
 
-  @Deprecated
-  public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
+  public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
     super(context, writeConfig);
     this.bucketToHandles = new HashMap<>();
   }
 
-  @Deprecated
-  public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending,
-                                Option<EmbeddedTimelineService> timelineService) {
-    super(context, writeConfig, timelineService);
-  }
-
   /**
    * Complete changes performed at the given instantTime marker with specified action.
    */
@@ -260,6 +255,24 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     // remove the async cleaning
   }
 
+  @Override
+  protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
+    this.metadataWriterOption.ifPresent(w -> {
+      w.initTableMetadata(); // refresh the timeline
+      w.update(metadata, instantTime);
+    });
+  }
+
+  /**
+   * Initialize the table metadata writer, for e.g, bootstrap the metadata table
+   * from the filesystem if it does not exist.
+   */
+  public void initMetadataWriter() {
+    HoodieBackedTableMetadataWriter metadataWriter = (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
+        FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT);
+    this.metadataWriterOption = Option.of(metadataWriter);
+  }
+
   /**
    * Starts async cleaning service for finished commits.
    *
@@ -347,6 +360,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
                                  String compactionCommitTime) {
     this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
     List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+    writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime));
+    // commit to data table after committing to metadata table.
     finalizeWrite(table, compactionCommitTime, writeStats);
     LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
     FlinkCompactHelpers.newInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
@@ -381,6 +396,19 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     throw new HoodieNotSupportedException("Clustering is not supported yet");
   }
 
+  private void writeTableMetadata(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+                                  HoodieCommitMetadata commitMetadata,
+                                  HoodieInstant hoodieInstant) {
+    try {
+      this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty());
+      // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
+      // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
+      table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp()));
+    } finally {
+      this.txnManager.endTransaction();
+    }
+  }
+
   @Override
   protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
@@ -478,6 +506,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     } else {
       writeTimer = metrics.getDeltaCommitCtx();
     }
+    table.getHoodieView().sync();
     return table;
   }
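The writeTableMetadata helper added above takes the transaction lock so that all writes to the metadata table happen under a single writer, and releases it even when the update fails. A standalone sketch of that locking discipline, with a plain ReentrantLock standing in for Hudi's transaction manager:

import java.util.concurrent.locks.ReentrantLock;

public class SingleWriterMetadataUpdate {
  private final ReentrantLock txnLock = new ReentrantLock();

  public void updateMetadata(Runnable metadataUpdate) {
    txnLock.lock();           // beginTransaction(...)
    try {
      metadataUpdate.run();   // metadataWriter.update(commitMetadata, instantTime)
    } finally {
      txnLock.unlock();       // endTransaction(): always released, even on failure
    }
  }

  public static void main(String[] args) {
    new SingleWriterMetadataUpdate().updateMetadata(() -> System.out.println("metadata table updated"));
  }
}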

FlinkHoodieBackedTableMetadataWriter.java

@@ -20,24 +20,17 @@ package org.apache.hudi.metadata;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
-import org.apache.hudi.table.HoodieFlinkTable;
-import org.apache.hudi.table.HoodieTable;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.LogManager;
@@ -86,82 +79,61 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
   @Override
   protected void commit(List<HoodieRecord> records, String partitionName, String instantTime) {
     ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
-    List<HoodieRecord> recordRDD = prepRecords(records, partitionName);
+    List<HoodieRecord> recordList = prepRecords(records, partitionName, 1);
 
-    try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig, true)) {
-      writeClient.startCommitWithTime(instantTime);
-      writeClient.transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
+    try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
+      if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
+        // if this is a new commit being applied to metadata for the first time
+        writeClient.startCommitWithTime(instantTime);
+        writeClient.transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
+      } else {
+        // this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
+        // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+        // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+        // are upserts to metadata table and so only a new delta commit will be created.
+        // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
+        // already part of completed commit. So, we have to manually remove the completed instant and proceed.
+        // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
+        HoodieInstant alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant().get();
+        HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant);
+        metadataMetaClient.reloadActiveTimeline();
+      }
 
-      List<WriteStatus> statuses = writeClient.upsertPreppedRecords(recordRDD, instantTime);
+      List<WriteStatus> statuses = records.size() > 0
+          ? writeClient.upsertPreppedRecords(recordList, instantTime)
+          : Collections.emptyList();
       statuses.forEach(writeStatus -> {
         if (writeStatus.hasErrors()) {
           throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime);
         }
       });
+      // flink does not support auto-commit yet, also the auto commit logic is not complete as AbstractHoodieWriteClient now.
       writeClient.commit(instantTime, statuses, Option.empty(), HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
-      // trigger cleaning, compaction, with suffixes based on the same instant time. This ensures that any future
-      // delta commits synced over will not have an instant time lesser than the last completed instant on the
-      // metadata table.
-      if (writeClient.scheduleCompactionAtInstant(instantTime + "001", Option.empty())) {
-        writeClient.compact(instantTime + "001");
-      }
-      writeClient.clean(instantTime + "002");
+
+      // reload timeline
+      metadataMetaClient.reloadActiveTimeline();
+      compactIfNecessary(writeClient, instantTime);
+      doClean(writeClient, instantTime);
     }
 
     // Update total size of the metadata and count of base/log files
-    metrics.ifPresent(m -> {
-      try {
-        m.updateSizeMetrics(metadataMetaClient, metadata);
-      } catch (HoodieIOException e) {
-        LOG.error("Could not publish metadata size metrics", e);
-      }
-    });
+    metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata));
   }
 
   /**
-   * Tag each record with the location.
-   * <p>
-   * Since we only read the latest base file in a partition, we tag the records with the instant time of the latest
-   * base file.
+   * Tag each record with the location in the given partition.
+   *
+   * The record is tagged with respective file slice's location based on its record key.
    */
-  private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName) {
-    HoodieTable table = HoodieFlinkTable.create(metadataWriteConfig, (HoodieFlinkEngineContext) engineContext);
-    TableFileSystemView.SliceView fsView = table.getSliceView();
-    List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName)
-        .map(FileSlice::getBaseFile)
-        .filter(Option::isPresent)
-        .map(Option::get)
-        .collect(Collectors.toList());
-
-    // All the metadata fits within a single base file
-    if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) {
-      if (baseFiles.size() > 1) {
-        throw new HoodieMetadataException("Multiple base files found in metadata partition");
-      }
-    }
+  private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
+    List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName);
+    ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
 
-    String fileId;
-    String instantTime;
-    if (!baseFiles.isEmpty()) {
-      fileId = baseFiles.get(0).getFileId();
-      instantTime = "U";
-    } else {
-      // If there is a log file then we can assume that it has the data
-      List<HoodieLogFile> logFiles = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath())
-          .map(FileSlice::getLatestLogFile)
-          .filter(Option::isPresent)
-          .map(Option::get)
-          .collect(Collectors.toList());
-      if (logFiles.isEmpty()) {
-        // No base and log files. All are new inserts
-        fileId = FSUtils.createNewFileIdPfx();
-        instantTime = "I";
-      } else {
-        fileId = logFiles.get(0).getFileId();
-        instantTime = "U";
-      }
-    }
-
-    return records.stream().map(r -> r.setCurrentLocation(new HoodieRecordLocation(instantTime, fileId))).collect(Collectors.toList());
+    return records.stream().map(r -> {
+      FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
+      final String instantTime = slice.isEmpty() ? "I" : "U";
+      r.setCurrentLocation(new HoodieRecordLocation(instantTime, slice.getFileId()));
+      return r;
+    }).collect(Collectors.toList());
   }
 }
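The reworked prepRecords tags every record with the file slice picked by HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex. A standalone sketch of that key-to-file-group mapping, assuming a simple hash-and-modulo scheme purely for illustration (the real hash may differ); with numFileGroups = 1, the value this writer passes, every record necessarily lands in the single file group of the FILES partition:

import java.util.Arrays;
import java.util.List;

public class FileGroupMappingSketch {
  // Assumed stand-in for HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex, not Hudi's exact hash.
  static int mapRecordKeyToFileGroupIndex(String recordKey, int numFileGroups) {
    return Math.floorMod(recordKey.hashCode(), numFileGroups);
  }

  public static void main(String[] args) {
    List<String> recordKeys = Arrays.asList("2021/10/01", "2021/10/02", "2021/10/03"); // example keys
    int numFileGroups = 1; // the Flink metadata writer commits with a single file group
    for (String key : recordKeys) {
      System.out.println(key + " -> file group " + mapRecordKeyToFileGroupIndex(key, numFileGroups));
    }
  }
}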

HoodieFlinkTable.java

@@ -29,14 +29,25 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.index.FlinkHoodieIndex;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+
+import org.apache.hadoop.fs.Path;
 
+import java.io.IOException;
 import java.util.List;
 
 public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
     extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
     implements ExplicitWriteHandleTable<T> {
 
+  private boolean isMetadataAvailabilityUpdated = false;
+  private boolean isMetadataTableAvailable;
+
   protected HoodieFlinkTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
     super(config, context, metaClient);
   }
@@ -66,4 +77,31 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
   protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
     return FlinkHoodieIndex.createIndex((HoodieFlinkEngineContext) context, config);
   }
+
+  /**
+   * Fetch instance of {@link HoodieTableMetadataWriter}.
+   *
+   * @return instance of {@link HoodieTableMetadataWriter}
+   */
+  @Override
+  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+    synchronized (this) {
+      if (!isMetadataAvailabilityUpdated) {
+        // this code assumes that if metadata availability is updated once it will not change. please revisit this logic if that's not the case.
+        // this is done to avoid repeated calls to fs.exists().
+        try {
+          isMetadataTableAvailable = config.isMetadataTableEnabled()
+              && metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())));
+        } catch (IOException e) {
+          throw new HoodieMetadataException("Checking existence of metadata table failed", e);
+        }
+        isMetadataAvailabilityUpdated = true;
+      }
+    }
+    if (isMetadataTableAvailable) {
+      return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context));
+    } else {
+      return Option.empty();
+    }
+  }
 }
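getMetadataWriter above runs the filesystem existence check at most once and caches the result, as its inline comment explains. A minimal standalone sketch of that check-once-and-cache pattern, with a BooleanSupplier standing in for fs.exists(metadataTableBasePath):

import java.util.function.BooleanSupplier;

public class CachedAvailabilityCheck {
  private boolean updated = false;
  private boolean available;

  public synchronized boolean isAvailable(BooleanSupplier expensiveCheck) {
    if (!updated) {
      available = expensiveCheck.getAsBoolean(); // e.g. fs.exists(...): runs only on the first call
      updated = true;
    }
    return available;
  }

  public static void main(String[] args) {
    CachedAvailabilityCheck check = new CachedAvailabilityCheck();
    System.out.println(check.isAvailable(() -> { System.out.println("checking filesystem"); return true; }));
    System.out.println(check.isAvailable(() -> { System.out.println("checking filesystem"); return false; })); // cached: still true
  }
}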

SparkHoodieBackedTableMetadataWriter.java

@@ -41,9 +41,7 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
-import java.util.stream.Collectors;
 
 public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {
@@ -129,44 +127,6 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata));
   }
 
-  /**
-   * Perform a compaction on the Metadata Table.
-   *
-   * Cases to be handled:
-   * 1. We cannot perform compaction if there are previous inflight operations on the dataset. This is because
-   * a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx.
-   *
-   * 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
-   * deltacommit.
-   */
-  private void compactIfNecessary(SparkRDDWriteClient writeClient, String instantTime) {
-    String latestDeltacommitTime = metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
-        .get().getTimestamp();
-    List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
-        .findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
-
-    if (!pendingInstants.isEmpty()) {
-      LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
-          pendingInstants.size(), latestDeltacommitTime, Arrays.toString(pendingInstants.toArray())));
-      return;
-    }
-
-    // Trigger compaction with suffixes based on the same instant time. This ensures that any future
-    // delta commits synced over will not have an instant time lesser than the last completed instant on the
-    // metadata table.
-    final String compactionInstantTime = latestDeltacommitTime + "001";
-    if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
-      writeClient.compact(compactionInstantTime);
-    }
-  }
-
-  private void doClean(SparkRDDWriteClient writeClient, String instantTime) {
-    // Trigger cleaning with suffixes based on the same instant time. This ensures that any future
-    // delta commits synced over will not have an instant time lesser than the last completed instant on the
-    // metadata table.
-    writeClient.clean(instantTime + "002");
-  }
-
   /**
    * Tag each record with the location in the given partition.
    *

StreamWriteOperatorCoordinator.java

@@ -126,11 +126,6 @@ public class StreamWriteOperatorCoordinator
    */
   private HiveSyncContext hiveSyncContext;
 
-  /**
-   * A single-thread executor to handle metadata table sync.
-   */
-  private NonThrownExecutor metadataSyncExecutor;
-
   /**
    * The table state.
    */
@@ -294,7 +289,7 @@ public class StreamWriteOperatorCoordinator
   }
 
   private void initMetadataSync() {
-    this.metadataSyncExecutor = new NonThrownExecutor(LOG, true);
+    this.writeClient.initMetadataWriter();
   }
 
   private void reset() {
@@ -498,14 +493,6 @@ public class StreamWriteOperatorCoordinator
     this.executor = executor;
   }
 
-  @VisibleForTesting
-  public void setMetadataSyncExecutor(NonThrownExecutor executor) throws Exception {
-    if (this.metadataSyncExecutor != null) {
-      this.metadataSyncExecutor.close();
-    }
-    this.metadataSyncExecutor = executor;
-  }
-
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------

AppendWriteFunction.java

@@ -72,8 +72,6 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
     // it would check the validity.
     // wait for the buffer data flush out and request a new instant
     flushData(false);
-    // nullify the write helper for next ckp
-    this.writerHelper = null;
   }
 
   @Override
@@ -133,5 +131,10 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
         .endInput(endInput)
         .build();
     this.eventGateway.sendEventToCoordinator(event);
+    // nullify the write helper for next ckp
+    this.writerHelper = null;
+    this.writeStatuses.addAll(writeStatus);
+    // blocks flushing until the coordinator starts a new instant
+    this.confirming = true;
   }
 }

FileIndex.java

@@ -22,6 +22,7 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
@@ -48,10 +49,12 @@ public class FileIndex {
   private final Path path;
   private final HoodieMetadataConfig metadataConfig;
   private List<String> partitionPaths; // cache of partition paths
+  private final boolean tableExists;
 
   private FileIndex(Path path, Configuration conf) {
     this.path = path;
     this.metadataConfig = metadataConfig(conf);
+    this.tableExists = StreamerUtil.tableExists(path.toString(), StreamerUtil.getHadoopConf());
   }
 
   public static FileIndex instance(Path path, Configuration conf) {
@@ -111,6 +114,9 @@ public class FileIndex {
    * Returns all the file statuses under the table base path.
    */
   public FileStatus[] getFilesInPartitions() {
+    if (!tableExists) {
+      return new FileStatus[0];
+    }
     String[] partitions = getOrBuildPartitionPaths().stream().map(p -> fullPartitionPath(path, p)).toArray(String[]::new);
     return FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(),
         partitions, "/tmp/")
@@ -165,8 +171,9 @@ public class FileIndex {
     if (this.partitionPaths != null) {
       return this.partitionPaths;
     }
-    this.partitionPaths = FSUtils.getAllPartitionPaths(HoodieFlinkEngineContext.DEFAULT,
-        metadataConfig, path.toString());
+    this.partitionPaths = this.tableExists
+        ? FSUtils.getAllPartitionPaths(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString())
+        : Collections.emptyList();
     return this.partitionPaths;
   }
@@ -174,7 +181,7 @@ public class FileIndex {
     Properties properties = new Properties();
     // set up metadata.enabled=true in table DDL to enable metadata listing
-    properties.put(HoodieMetadataConfig.ENABLE, conf.getBoolean(FlinkOptions.METADATA_ENABLED));
+    properties.put(HoodieMetadataConfig.ENABLE.key(), conf.getBoolean(FlinkOptions.METADATA_ENABLED));
     return HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
   }
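The last hunk fixes how the metadata config is built: java.util.Properties is looked up by String key, so the old code, which used the ConfigProperty object itself as the key, stored an entry that the later lookup by name never found. A standalone sketch of that behavior (the property name here is illustrative):

import java.util.Properties;

public class PropertiesKeySketch {
  public static void main(String[] args) {
    // Stand-in for a ConfigProperty object whose toString() happens to be the config key.
    Object configPropertyObject = new Object() {
      @Override
      public String toString() {
        return "hoodie.metadata.enable";
      }
    };

    Properties props = new Properties();
    props.put(configPropertyObject, true); // what the old code effectively did
    System.out.println(props.getProperty("hoodie.metadata.enable")); // null: the entry is keyed by the object

    props.put("hoodie.metadata.enable", "true"); // keyed by the String name, as in the fix
    System.out.println(props.getProperty("hoodie.metadata.enable")); // "true"
  }
}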

TestStreamWriteOperatorCoordinator.java

@@ -41,7 +41,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -181,7 +180,7 @@ public class TestStreamWriteOperatorCoordinator {
     assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1));
   }
 
-  @Disabled
+  @Test
   void testSyncMetadataTable() throws Exception {
     // reset
     reset();
@@ -193,7 +192,6 @@ public class TestStreamWriteOperatorCoordinator {
     coordinator = new StreamWriteOperatorCoordinator(conf, context);
     coordinator.start();
     coordinator.setExecutor(new MockCoordinatorExecutor(context));
-    coordinator.setMetadataSyncExecutor(new MockCoordinatorExecutor(context));
 
     final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0);
@@ -209,7 +207,7 @@ public class TestStreamWriteOperatorCoordinator {
     assertThat(completedTimeline.lastInstant().get().getTimestamp(), is("0000000000000"));
 
     // test metadata table compaction
-    // write another 4 commits
+    // write another 3 commits
     for (int i = 1; i < 4; i++) {
       instant = mockWriteWithMetadata();
       metadataTableMetaClient.reloadActiveTimeline();
@@ -247,7 +245,13 @@ public class TestStreamWriteOperatorCoordinator {
                                               double failureFraction) {
     final WriteStatus writeStatus = new WriteStatus(trackSuccessRecords, failureFraction);
     writeStatus.setPartitionPath(partitionPath);
-    writeStatus.setStat(new HoodieWriteStat());
+
+    HoodieWriteStat writeStat = new HoodieWriteStat();
+    writeStat.setPartitionPath(partitionPath);
+    writeStat.setFileId("fileId123");
+    writeStat.setPath("path123");
+    writeStatus.setStat(writeStat);
+
     return WriteMetadataEvent.builder()
         .taskID(taskId)

StreamWriteFunctionWrapper.java

@@ -142,9 +142,6 @@ public class StreamWriteFunctionWrapper<I> {
   public void openFunction() throws Exception {
     this.coordinator.start();
     this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
-    if (conf.getBoolean(FlinkOptions.METADATA_ENABLED)) {
-      this.coordinator.setMetadataSyncExecutor(new MockCoordinatorExecutor(coordinatorContext));
-    }
     toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf);
     toHoodieFunction.setRuntimeContext(runtimeContext);
     toHoodieFunction.open(conf);

TestFileIndex.java

@@ -27,7 +27,6 @@ import org.apache.hudi.utils.TestData;
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -90,7 +89,8 @@ public class TestFileIndex {
     assertTrue(fileStatuses[0].getPath().toString().endsWith(HoodieFileFormat.PARQUET.getFileExtension()));
   }
 
-  @Disabled
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
   void testFileListingEmptyTable(boolean enableMetadata) {
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setBoolean(FlinkOptions.METADATA_ENABLED, enableMetadata);

pom.xml (Flink bundle)

@@ -144,6 +144,10 @@
                   <include>org.apache.flink:flink-sql-connector-hive-2.3.6_${scala.binary.version}</include>
                   <include>org.apache.hbase:hbase-common</include>
+                  <include>org.apache.hbase:hbase-client</include>
+                  <include>org.apache.hbase:hbase-server</include>
+                  <include>org.apache.hbase:hbase-protocol</include>
+                  <include>org.apache.htrace:htrace-core</include>
                   <include>commons-codec:commons-codec</include>
                 </includes>
               </artifactSet>
@@ -594,6 +598,45 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase.version}</version>
+      <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-protocol</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.htrace</groupId>
+      <artifactId>htrace-core</artifactId>
+      <version>${htrace.version}</version>
+    </dependency>
   </dependencies>
 
   <profiles>