diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 6a40d62dd..9cd5301fb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -248,7 +248,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta if (!latestMetadataInstant.isPresent()) { LOG.warn("Metadata Table will need to be re-bootstrapped as no instants were found"); rebootstrap = true; - } else if (datasetMetaClient.getActiveTimeline().isBeforeTimelineStarts(latestMetadataInstant.get().getTimestamp())) { + } else if (!latestMetadataInstant.get().getTimestamp().equals(SOLO_COMMIT_TIMESTAMP) + && datasetMetaClient.getActiveTimeline().isBeforeTimelineStarts(latestMetadataInstant.get().getTimestamp())) { LOG.warn("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived." + " latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp() + ", latestDatasetInstant=" + datasetMetaClient.getActiveTimeline().firstInstant().get().getTimestamp()); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 45701bbba..07b419e59 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -40,7 +40,9 @@ import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.index.FlinkHoodieIndex; import org.apache.hudi.index.HoodieIndex; @@ -50,6 +52,8 @@ import org.apache.hudi.io.FlinkMergeAndReplaceHandle; import org.apache.hudi.io.FlinkMergeHandle; import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.io.MiniBatchHandle; +import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter; +import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieTable; @@ -153,7 +157,15 @@ public class HoodieFlinkWriteClient extends @Override public List upsertPreppedRecords(List> preppedRecords, String instantTime) { - throw new HoodieNotSupportedException("UpsertPrepped operation is not supported yet"); + // only used for metadata table, the upsert happens in single thread + HoodieTable>, List, List> table = + getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); + table.validateUpsertSchema(); + preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient()); + final HoodieWriteHandle writeHandle = getOrCreateWriteHandle(preppedRecords.get(0), getConfig(), + instantTime, table, preppedRecords.listIterator()); + HoodieWriteMetadata> result = ((HoodieFlinkTable) table).upsertPrepped(context, writeHandle, 
instantTime, preppedRecords); + return postWrite(result, instantTime, table); } @Override @@ -355,7 +367,14 @@ public class HoodieFlinkWriteClient extends @Override protected List compact(String compactionInstantTime, boolean shouldComplete) { - throw new HoodieNotSupportedException("Compaction is not supported yet"); + // only used for metadata table, the compaction happens in single thread + try { + List writeStatuses = FlinkCompactHelpers.compact(compactionInstantTime, this); + commitCompaction(compactionInstantTime, writeStatuses, Option.empty()); + return writeStatuses; + } catch (IOException e) { + throw new HoodieException("Error while compacting instant: " + compactionInstantTime); + } } @Override @@ -370,6 +389,16 @@ public class HoodieFlinkWriteClient extends return getTableAndInitCtx(metaClient, operationType); } + @Override + public void syncTableMetadata() { + // Open up the metadata table again, for syncing + try (HoodieTableMetadataWriter writer = FlinkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context)) { + LOG.info("Successfully synced to metadata table"); + } catch (Exception e) { + throw new HoodieMetadataException("Error syncing to metadata table.", e); + } + } + /** * Clean the write handles within a checkpoint interval. * All the handles should have been closed already. diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java index 0bd0d432c..2fc5af19b 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java @@ -32,6 +32,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -47,7 +48,13 @@ import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper * A flink engine implementation of HoodieEngineContext. */ public class HoodieFlinkEngineContext extends HoodieEngineContext { - private RuntimeContext runtimeContext; + public static final HoodieFlinkEngineContext DEFAULT = new HoodieFlinkEngineContext(); + + private final RuntimeContext runtimeContext; + + private HoodieFlinkEngineContext() { + this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), new DefaultTaskContextSupplier()); + } public HoodieFlinkEngineContext(TaskContextSupplier taskContextSupplier) { this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), taskContextSupplier); @@ -97,4 +104,34 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext { public void setJobStatus(String activeModule, String activityDescription) { // no operation for now } + + /** + * Override the flink context supplier to return constant write token. 
+ */ + private static class DefaultTaskContextSupplier extends FlinkTaskContextSupplier { + + public DefaultTaskContextSupplier() { + this(null); + } + + public DefaultTaskContextSupplier(RuntimeContext flinkRuntimeContext) { + super(flinkRuntimeContext); + } + + public Supplier getPartitionIdSupplier() { + return () -> 0; + } + + public Supplier getStageIdSupplier() { + return () -> 1; + } + + public Supplier getAttemptIdSupplier() { + return () -> 0L; + } + + public Option getProperty(EngineProperty prop) { + return Option.empty(); + } + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java new file mode 100644 index 000000000..298113054 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.metadata; + +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.metrics.Registry; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.view.TableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieMetadataException; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter { + + private static final Logger LOG = LogManager.getLogger(FlinkHoodieBackedTableMetadataWriter.class); + + public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context) { + return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context); + } + + FlinkHoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) { + super(hadoopConf, writeConfig, engineContext); + } + + @Override + protected void initRegistry() { + if (metadataWriteConfig.isMetricsOn()) { + // should support executor metrics + Registry registry = Registry.getRegistry("HoodieMetadata"); + this.metrics = Option.of(new HoodieMetadataMetrics(registry)); + } else { + this.metrics = Option.empty(); + } + } + + @Override + protected void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) { + try { + if (enabled) { + bootstrapIfNeeded(engineContext, datasetMetaClient); + } + } catch (IOException e) { + LOG.error("Failed to initialize metadata table. 
Disabling the writer.", e); + enabled = false; + } + } + + @Override + protected void commit(List records, String partitionName, String instantTime) { + ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled"); + List recordRDD = prepRecords(records, partitionName); + + try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig, true)) { + writeClient.startCommitWithTime(instantTime); + writeClient.transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime); + + List statuses = writeClient.upsertPreppedRecords(recordRDD, instantTime); + statuses.forEach(writeStatus -> { + if (writeStatus.hasErrors()) { + throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime); + } + }); + writeClient.commit(instantTime, statuses, Option.empty(), HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap()); + // trigger cleaning, compaction, with suffixes based on the same instant time. This ensures that any future + // delta commits synced over will not have an instant time lesser than the last completed instant on the + // metadata table. + if (writeClient.scheduleCompactionAtInstant(instantTime + "001", Option.empty())) { + writeClient.compact(instantTime + "001"); + } + writeClient.clean(instantTime + "002"); + } + + // Update total size of the metadata and count of base/log files + metrics.ifPresent(m -> { + try { + Map stats = m.getStats(false, metaClient, metadata); + m.updateMetrics(Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE)), + Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_LOG_FILE_SIZE)), + Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_BASE_FILES)), + Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_LOG_FILES))); + } catch (HoodieIOException e) { + LOG.error("Could not publish metadata size metrics", e); + } + }); + } + + /** + * Return the timestamp of the latest instant synced. + *
+ * To sync an instant on the dataset, we create a corresponding delta-commit on the metadata table. So return the latest + * delta-commit. + */ + @Override + public Option getLatestSyncedInstantTime() { + if (!enabled) { + return Option.empty(); + } + + HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline(); + return timeline.getDeltaCommitTimeline().filterCompletedInstants() + .lastInstant().map(HoodieInstant::getTimestamp); + } + + /** + * Tag each record with the location. + *
+ * Since we only read the latest base file in a partition, we tag the records with the instant time of the latest + * base file. + */ + private List prepRecords(List records, String partitionName) { + HoodieTable table = HoodieFlinkTable.create(metadataWriteConfig, (HoodieFlinkEngineContext) engineContext); + TableFileSystemView.SliceView fsView = table.getSliceView(); + List baseFiles = fsView.getLatestFileSlices(partitionName) + .map(FileSlice::getBaseFile) + .filter(Option::isPresent) + .map(Option::get) + .collect(Collectors.toList()); + + // All the metadata fits within a single base file + if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) { + if (baseFiles.size() > 1) { + throw new HoodieMetadataException("Multiple base files found in metadata partition"); + } + } + + String fileId; + String instantTime; + if (!baseFiles.isEmpty()) { + fileId = baseFiles.get(0).getFileId(); + instantTime = "U"; + } else { + // If there is a log file then we can assume that it has the data + List logFiles = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()) + .map(FileSlice::getLatestLogFile) + .filter(Option::isPresent) + .map(Option::get) + .collect(Collectors.toList()); + if (logFiles.isEmpty()) { + // No base and log files. All are new inserts + fileId = FSUtils.createNewFileIdPfx(); + instantTime = "I"; + } else { + fileId = logFiles.get(0).getFileId(); + instantTime = "U"; + } + } + + return records.stream().map(r -> r.setCurrentLocation(new HoodieRecordLocation(instantTime, fileId))).collect(Collectors.toList()); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java index 8bcb979aa..de485dbaa 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java @@ -34,6 +34,7 @@ import org.apache.hudi.io.FlinkAppendHandle; import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExecutor; +import org.apache.hudi.table.action.commit.delta.FlinkUpsertPreppedDeltaCommitActionExecutor; import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor; import org.apache.hudi.table.action.compact.FlinkScheduleCompactionActionExecutor; import org.apache.hudi.table.action.rollback.FlinkMergeOnReadRollbackActionExecutor; @@ -63,6 +64,18 @@ public class HoodieFlinkMergeOnReadTable return new FlinkUpsertDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, hoodieRecords).execute(); } + @Override + public HoodieWriteMetadata> upsertPrepped( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> preppedRecords) { + ValidationUtils.checkArgument(writeHandle instanceof FlinkAppendHandle, + "MOR write handle should always be a FlinkAppendHandle"); + FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle; + return new FlinkUpsertPreppedDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, preppedRecords).execute(); + } + @Override public HoodieWriteMetadata> insert( HoodieEngineContext context, diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertDeltaCommitActionExecutor.java 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertDeltaCommitActionExecutor.java index eb850602a..5fdf46f6e 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertDeltaCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertDeltaCommitActionExecutor.java @@ -32,7 +32,7 @@ import java.util.List; public class FlinkUpsertDeltaCommitActionExecutor> extends BaseFlinkDeltaCommitActionExecutor { - private List> inputRecords; + private final List> inputRecords; public FlinkUpsertDeltaCommitActionExecutor(HoodieEngineContext context, FlinkAppendHandle writeHandle, diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertPreppedDeltaCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertPreppedDeltaCommitActionExecutor.java new file mode 100644 index 000000000..493c894c8 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertPreppedDeltaCommitActionExecutor.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.commit.delta; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.FlinkAppendHandle; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import java.util.List; + +public class FlinkUpsertPreppedDeltaCommitActionExecutor> + extends BaseFlinkDeltaCommitActionExecutor { + + private final List> preppedRecords; + + public FlinkUpsertPreppedDeltaCommitActionExecutor(HoodieEngineContext context, + FlinkAppendHandle writeHandle, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + List> preppedRecords) { + super(context, writeHandle, config, table, instantTime, WriteOperationType.UPSERT_PREPPED); + this.preppedRecords = preppedRecords; + } + + @Override + public HoodieWriteMetadata> execute() { + return super.execute(preppedRecords); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkCompactHelpers.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkCompactHelpers.java index 274f691c4..68a42a557 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkCompactHelpers.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkCompactHelpers.java @@ -19,20 +19,33 @@ package org.apache.hudi.table.action.compact; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.CompactionOperation; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable; +import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; +import static java.util.stream.Collectors.toList; + /** * A flink implementation of {@link AbstractCompactHelpers}. 
* @@ -40,6 +53,7 @@ import java.util.stream.Collectors; */ public class FlinkCompactHelpers extends AbstractCompactHelpers>, List, List> { + private static final Logger LOG = LoggerFactory.getLogger(FlinkCompactHelpers.class); private FlinkCompactHelpers() { } @@ -71,5 +85,63 @@ public class FlinkCompactHelpers extends } return metadata; } + + @SuppressWarnings("unchecked, rawtypes") + public static List compact( + HoodieFlinkWriteClient writeClient, + String compactInstantTime, + CompactionOperation compactionOperation) throws IOException { + HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor(); + return compactor.compact( + new HoodieFlinkCopyOnWriteTable<>( + writeClient.getConfig(), + writeClient.getEngineContext(), + writeClient.getHoodieTable().getMetaClient()), + writeClient.getHoodieTable().getMetaClient(), + writeClient.getConfig(), + compactionOperation, + compactInstantTime); + } + + /** + * Called by the metadata table compactor code path. + */ + @SuppressWarnings("unchecked, rawtypes") + public static List compact(String compactionInstantTime, HoodieFlinkWriteClient writeClient) throws IOException { + HoodieFlinkTable table = writeClient.getHoodieTable(); + HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); + HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); + if (pendingCompactionTimeline.containsInstant(inflightInstant)) { + writeClient.rollbackInflightCompaction(inflightInstant, table); + table.getMetaClient().reloadActiveTimeline(); + } + + // generate compaction plan + // should support configurable commit metadata + HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( + table.getMetaClient(), compactionInstantTime); + + if (compactionPlan == null || (compactionPlan.getOperations() == null) + || (compactionPlan.getOperations().isEmpty())) { + // do nothing. 
+ LOG.info("No compaction plan for instant " + compactionInstantTime); + return Collections.emptyList(); + } else { + HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); + // Mark instant as compaction inflight + table.getActiveTimeline().transitionCompactionRequestedToInflight(instant); + table.getMetaClient().reloadActiveTimeline(); + + List operations = compactionPlan.getOperations().stream() + .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList()); + LOG.info("Compacting " + operations + " files"); + List writeStatusList = new ArrayList<>(); + for (CompactionOperation operation : operations) { + List statuses = compact(writeClient, compactionInstantTime, operation); + writeStatusList.addAll(statuses); + } + return writeStatusList; + } + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java index 8ed66bdda..1ffcb72a5 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java @@ -21,7 +21,10 @@ package org.apache.hudi.table.action.rollback; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.IOType; @@ -31,7 +34,11 @@ import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; +import org.apache.hadoop.fs.FileStatus; + +import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import scala.Tuple2; @@ -71,4 +78,11 @@ public class FlinkMarkerBasedRollbackStrategy ext throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e); } } + + protected Map getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException { + // collect all log files that is supposed to be deleted with this rollback + return FSUtils.getAllLogFiles(table.getMetaClient().getFs(), + FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime) + .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen())); + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java index 562acfd20..f03b211bf 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java @@ -21,6 +21,7 @@ package org.apache.hudi.table.action.rollback; import 
org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; @@ -116,12 +117,22 @@ public class ListingBasedRollbackHelper implements Serializable { .withDeletedFileResults(filesToDeletedStatus).build()); } case APPEND_ROLLBACK_BLOCK: { + String fileId = rollbackRequest.getFileId().get(); + String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get(); + + // collect all log files that is supposed to be deleted with this rollback + Map writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(), + FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()), + fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant) + .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen())); + HoodieLogFormat.Writer writer = null; try { writer = HoodieLogFormat.newWriterBuilder() .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath())) - .withFileId(rollbackRequest.getFileId().get()) - .overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(metaClient.getFs()) + .withFileId(fileId) + .overBaseCommit(latestBaseInstant) + .withFs(metaClient.getFs()) .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); // generate metadata @@ -151,7 +162,8 @@ public class ListingBasedRollbackHelper implements Serializable { ); return new ImmutablePair<>(rollbackRequest.getPartitionPath(), HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withRollbackBlockAppendResults(filesToNumBlocksRollback).build()); + .withRollbackBlockAppendResults(filesToNumBlocksRollback) + .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build()); } default: throw new IllegalStateException("Unknown Rollback action " + rollbackRequest); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java index 65daf782f..3850ec8ac 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java @@ -38,14 +38,6 @@ public class FlinkClientUtil { return HoodieTableMetaClient.builder().setBasePath(basePath).setConf(FlinkClientUtil.getHadoopConf()).build(); } - /** - * Parses the file name from path. - */ - public static String parseFileName(String path) { - int slash = path.lastIndexOf(Path.SEPARATOR); - return path.substring(slash + 1); - } - /** * Returns the hadoop configuration with possible hadoop conf paths. * E.G. the configurations under path $HADOOP_CONF_DIR and $HADOOP_HOME. 
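Note: the new Flink metadata-table pieces above (FlinkHoodieBackedTableMetadataWriter, HoodieFlinkEngineContext.DEFAULT) are wired together by HoodieFlinkWriteClient#syncTableMetadata. The snippet below is a minimal illustrative sketch, not part of this patch, showing how the metadata writer could be opened directly for a one-off sync, assuming a Hudi table already exists at the base path. The base path, table name, and class name are placeholders; the builder calls mirror those used in StreamerUtil and in syncTableMetadata within this change.

import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.util.FlinkClientUtil;

public class MetadataSyncSketch {

  public static void main(String[] args) throws Exception {
    // Placeholder base path and table name -- substitute the real dataset location.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/t1")
        .forTable("t1")
        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
            .enable(true)                               // the switch FlinkOptions 'metadata.enabled' maps to
            .withMaxNumDeltaCommitsBeforeCompaction(24) // the value 'metadata.compaction.delta_commits' maps to
            .build())
        .build();

    // Creating the writer bootstraps the metadata table if needed and syncs un-synced instants;
    // closing it releases the embedded write client, as in HoodieFlinkWriteClient#syncTableMetadata.
    try (HoodieTableMetadataWriter ignored = FlinkHoodieBackedTableMetadataWriter.create(
        FlinkClientUtil.getHadoopConf(), writeConfig, HoodieFlinkEngineContext.DEFAULT)) {
      // no further calls needed: the sync happens on creation
    }
  }
}

In the streaming path, the same sync is triggered by StreamWriteOperatorCoordinator on the metadata sync executor, as shown in the coordinator changes further down.
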
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index f30e8b23b..f839b5ef9 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -76,6 +76,22 @@ public class FlinkOptions extends HoodieConfig { .withDescription("The default partition name in case the dynamic partition" + " column value is null/empty string"); + // ------------------------------------------------------------------------ + // Metadata table Options + // ------------------------------------------------------------------------ + + public static final ConfigOption METADATA_ENABLED = ConfigOptions + .key("metadata.enabled") + .booleanType() + .defaultValue(false) + .withDescription("Enable the internal metadata table which serves table metadata like level file listings, default false"); + + public static final ConfigOption METADATA_COMPACTION_DELTA_COMMITS = ConfigOptions + .key("metadata.compaction.delta_commits") + .intType() + .defaultValue(24) + .withDescription("Max delta commits for metadata table to trigger compaction, default 24"); + // ------------------------------------------------------------------------ // Index Options // ------------------------------------------------------------------------ diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 891193b73..375265a4b 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -324,13 +324,7 @@ public class StreamWriteFunction } private void sendBootstrapEvent() { - WriteMetadataEvent event = WriteMetadataEvent.builder() - .taskID(taskID) - .writeStatus(Collections.emptyList()) - .instantTime("") - .bootstrap(true) - .build(); - this.eventGateway.sendEventToCoordinator(event); + this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID)); LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index e032c6f59..53122c3ec 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -94,7 +94,7 @@ public class StreamWriteOperatorCoordinator /** * Current REQUESTED instant, for validation. */ - private volatile String instant = ""; + private volatile String instant = WriteMetadataEvent.BOOTSTRAP_INSTANT; /** * Event buffer for one round of checkpointing. When all the elements are non-null and have the same @@ -107,11 +107,6 @@ public class StreamWriteOperatorCoordinator */ private final int parallelism; - /** - * Whether to schedule compaction plan on finished checkpoints. - */ - private final boolean scheduleCompaction; - /** * A single-thread executor to handle all the asynchronous jobs of the coordinator. */ @@ -127,6 +122,11 @@ public class StreamWriteOperatorCoordinator */ private HiveSyncContext hiveSyncContext; + /** + * A single-thread executor to handle metadata table sync. + */ + private NonThrownExecutor metadataSyncExecutor; + /** * The table state. 
*/ @@ -144,23 +144,25 @@ public class StreamWriteOperatorCoordinator this.conf = conf; this.context = context; this.parallelism = context.currentParallelism(); - this.scheduleCompaction = StreamerUtil.needsScheduleCompaction(conf); } @Override public void start() throws Exception { // initialize event buffer reset(); - this.writeClient = StreamerUtil.createWriteClient(conf, null); + this.writeClient = StreamerUtil.createWriteClient(conf); this.tableState = TableState.create(conf); // init table, create it if not exists. initTableIfNotExists(this.conf); // start the executor this.executor = new CoordinatorExecutor(this.context, LOG); // start the executor if required - if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) { + if (tableState.syncHive) { initHiveSync(); } + if (tableState.syncMetadata) { + initMetadataSync(); + } } @Override @@ -205,13 +207,15 @@ public class StreamWriteOperatorCoordinator final boolean committed = commitInstant(this.instant); if (committed) { // if async compaction is on, schedule the compaction - if (scheduleCompaction) { + if (tableState.scheduleCompaction) { writeClient.scheduleCompaction(Option.empty()); } // start new instant. startInstant(); // sync Hive if is enabled syncHiveIfEnabled(); + // sync metadata if is enabled + syncMetadataIfEnabled(); } }, "commits the instant %s", this.instant ); @@ -263,7 +267,7 @@ public class StreamWriteOperatorCoordinator } private void syncHiveIfEnabled() { - if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) { + if (tableState.syncHive) { this.hiveSyncExecutor.execute(this::syncHive, "sync hive metadata for instant %s", this.instant); } } @@ -275,6 +279,27 @@ public class StreamWriteOperatorCoordinator hiveSyncContext.hiveSyncTool().syncHoodieTable(); } + private void initMetadataSync() { + this.metadataSyncExecutor = new NonThrownExecutor(LOG, true); + } + + /** + * Sync the write metadata to the metadata table. + */ + private void syncMetadataIfEnabled() { + if (tableState.syncMetadata) { + this.metadataSyncExecutor.execute(this::syncMetadata, + "sync metadata table for instant %s", this.instant); + } + } + + /** + * Sync the write metadata to the metadata table. + */ + private void syncMetadata() { + this.writeClient.syncTableMetadata(); + } + private void reset() { this.eventBuffer = new WriteMetadataEvent[this.parallelism]; } @@ -324,6 +349,11 @@ public class StreamWriteOperatorCoordinator LOG.info("Recommit instant {}", instant); commitInstant(instant); } + if (tableState.syncMetadata) { + // initialize metadata table first if enabled + // condition: the data set timeline has committed instants + syncMetadata(); + } // starts a new instant startInstant(); }, "initialize instant %s", instant); @@ -344,6 +374,8 @@ public class StreamWriteOperatorCoordinator commitInstant(this.instant); // sync Hive if is enabled in batch mode. syncHiveIfEnabled(); + // sync metadata if is enabled in batch mode. 
+ syncMetadataIfEnabled(); } } @@ -480,6 +512,14 @@ public class StreamWriteOperatorCoordinator this.executor = executor; } + @VisibleForTesting + public void setMetadataSyncExecutor(NonThrownExecutor executor) throws Exception { + if (this.metadataSyncExecutor != null) { + this.metadataSyncExecutor.close(); + } + this.metadataSyncExecutor = executor; + } + // ------------------------------------------------------------------------- // Inner Class // ------------------------------------------------------------------------- @@ -513,15 +553,21 @@ public class StreamWriteOperatorCoordinator private static class TableState implements Serializable { private static final long serialVersionUID = 1L; - private final WriteOperationType operationType; - private final String commitAction; - private final boolean isOverwrite; + final WriteOperationType operationType; + final String commitAction; + final boolean isOverwrite; + final boolean scheduleCompaction; + final boolean syncHive; + final boolean syncMetadata; private TableState(Configuration conf) { this.operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)); this.commitAction = CommitUtils.getCommitActionType(this.operationType, HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase(Locale.ROOT))); this.isOverwrite = WriteOperationType.isOverwrite(this.operationType); + this.scheduleCompaction = StreamerUtil.needsScheduleCompaction(conf); + this.syncHive = conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED); + this.syncMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED); } public static TableState create(Configuration conf) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java index 4598583cc..591624429 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java @@ -22,8 +22,7 @@ import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.CompactionOperation; import org.apache.hudi.sink.utils.NonThrownExecutor; -import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable; -import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor; +import org.apache.hudi.table.action.compact.FlinkCompactHelpers; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; @@ -99,16 +98,7 @@ public class CompactFunction extends ProcessFunction collector) throws IOException { - HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor(); - List writeStatuses = compactor.compact( - new HoodieFlinkCopyOnWriteTable<>( - this.writeClient.getConfig(), - this.writeClient.getEngineContext(), - this.writeClient.getHoodieTable().getMetaClient()), - this.writeClient.getHoodieTable().getMetaClient(), - this.writeClient.getConfig(), - compactionOperation, - instantTime); + List writeStatuses = FlinkCompactHelpers.compact(writeClient, instantTime, compactionOperation); collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID)); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java b/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java index a63232a45..7aec6be41 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java +++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; @@ -33,6 +34,8 @@ import java.util.Objects; public class WriteMetadataEvent implements OperatorEvent { private static final long serialVersionUID = 1L; + public static final String BOOTSTRAP_INSTANT = ""; + private List writeStatuses; private int taskID; private String instantTime; @@ -57,11 +60,11 @@ public class WriteMetadataEvent implements OperatorEvent { * @param taskID The task ID * @param instantTime The instant time under which to write the data * @param writeStatuses The write statues list - * @param lastBatch Whether the event reports the last batch + * @param lastBatch Whether the event reports the last batch * within an checkpoint interval, * if true, the whole data set of the checkpoint * has been flushed successfully - * @param bootstrap Whether the event comes from the bootstrap + * @param bootstrap Whether the event comes from the bootstrap */ private WriteMetadataEvent( int taskID, @@ -79,7 +82,8 @@ public class WriteMetadataEvent implements OperatorEvent { } // default constructor for efficient serialization - public WriteMetadataEvent() {} + public WriteMetadataEvent() { + } /** * Returns the builder for {@link WriteMetadataEvent}. @@ -159,6 +163,25 @@ public class WriteMetadataEvent implements OperatorEvent { return lastBatch && this.instantTime.equals(currentInstant); } + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + /** + * Creates empty bootstrap event for task {@code taskId}. + * + *
The event indicates that the new instant can start directly, + * there is no old instant write statuses to recover. + */ + public static WriteMetadataEvent emptyBootstrap(int taskId) { + return WriteMetadataEvent.builder() + .taskID(taskId) + .instantTime(BOOTSTRAP_INSTANT) + .writeStatus(Collections.emptyList()) + .bootstrap(true) + .build(); + } + // ------------------------------------------------------------------------- // Builder // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java index 44e0baf06..12eb03989 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java @@ -78,8 +78,7 @@ public class HiveSyncContext { hiveSyncConfig.partitionFields = Arrays.asList(FilePathUtils.extractPartitionKeys(conf)); hiveSyncConfig.partitionValueExtractorClass = conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS); hiveSyncConfig.useJdbc = conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC); - // needs to support metadata table for flink - hiveSyncConfig.useFileListingFromMetadata = false; + hiveSyncConfig.useFileListingFromMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED); hiveSyncConfig.verifyMetadataFileListing = false; hiveSyncConfig.ignoreExceptions = conf.getBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS); hiveSyncConfig.supportTimestamp = conf.getBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP); diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index a01cad498..d42993b9a 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -22,6 +22,7 @@ import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.config.DFSPropertiesConfiguration; +import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.EngineType; @@ -170,6 +171,10 @@ public class StreamerUtil { .logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024) .logFileMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024) .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED)) + .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS)) + .build()) .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton .withAutoCommit(false) .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf))); @@ -265,11 +270,18 @@ public class StreamerUtil { && conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED); } + /** + * Creates the meta client. + */ + public static HoodieTableMetaClient createMetaClient(String basePath) { + return HoodieTableMetaClient.builder().setBasePath(basePath).setConf(FlinkClientUtil.getHadoopConf()).build(); + } + /** * Creates the meta client. 
*/ public static HoodieTableMetaClient createMetaClient(Configuration conf) { - return HoodieTableMetaClient.builder().setBasePath(conf.getString(FlinkOptions.PATH)).setConf(FlinkClientUtil.getHadoopConf()).build(); + return createMetaClient(conf.getString(FlinkOptions.PATH)); } /** @@ -284,6 +296,15 @@ public class StreamerUtil { return new HoodieFlinkWriteClient<>(context, getHoodieClientConfig(conf)); } + /** + * Creates the Flink write client. + * + *
The task context supplier is a constant: the write token is always '0-1-0'. + */ + public static HoodieFlinkWriteClient createWriteClient(Configuration conf) { + return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, getHoodieClientConfig(conf)); + } + /** * Return the median instant time between the given two instant time. */ diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index 2fd0ca404..f37989322 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -23,7 +23,9 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.sink.utils.MockCoordinatorExecutor; import org.apache.hudi.util.StreamerUtil; @@ -34,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.util.FileUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.AfterEach; @@ -71,22 +74,8 @@ public class TestStreamWriteOperatorCoordinator { coordinator.start(); coordinator.setExecutor(new MockCoordinatorExecutor(context)); - final WriteMetadataEvent event0 = WriteMetadataEvent.builder() - .taskID(0) - .instantTime("") - .writeStatus(Collections.emptyList()) - .bootstrap(true) - .build(); - - final WriteMetadataEvent event1 = WriteMetadataEvent.builder() - .taskID(1) - .instantTime("") - .writeStatus(Collections.emptyList()) - .bootstrap(true) - .build(); - - coordinator.handleEventFromOperator(0, event0); - coordinator.handleEventFromOperator(1, event1); + coordinator.handleEventFromOperator(0, WriteMetadataEvent.emptyBootstrap(0)); + coordinator.handleEventFromOperator(1, WriteMetadataEvent.emptyBootstrap(1)); } @AfterEach @@ -99,25 +88,8 @@ public class TestStreamWriteOperatorCoordinator { String instant = coordinator.getInstant(); assertNotEquals("", instant); - WriteStatus writeStatus = new WriteStatus(true, 0.1D); - writeStatus.setPartitionPath("par1"); - writeStatus.setStat(new HoodieWriteStat()); - OperatorEvent event0 = WriteMetadataEvent.builder() - .taskID(0) - .instantTime(instant) - .writeStatus(Collections.singletonList(writeStatus)) - .lastBatch(true) - .build(); - - WriteStatus writeStatus1 = new WriteStatus(false, 0.2D); - writeStatus1.setPartitionPath("par2"); - writeStatus1.setStat(new HoodieWriteStat()); - OperatorEvent event1 = WriteMetadataEvent.builder() - .taskID(1) - .instantTime(instant) - .writeStatus(Collections.singletonList(writeStatus1)) - .lastBatch(true) - .build(); + OperatorEvent event0 = createOperatorEvent(0, instant, "par1", true, 0.1); + OperatorEvent event1 = createOperatorEvent(1, instant, "par2", false, 0.2); coordinator.handleEventFromOperator(0, event0); coordinator.handleEventFromOperator(1, 
event1); @@ -177,15 +149,7 @@ public class TestStreamWriteOperatorCoordinator { assertNull(lastCompleted, "Returns early for empty write results"); assertNull(coordinator.getEventBuffer()[0]); - WriteStatus writeStatus1 = new WriteStatus(false, 0.2D); - writeStatus1.setPartitionPath("par2"); - writeStatus1.setStat(new HoodieWriteStat()); - OperatorEvent event1 = WriteMetadataEvent.builder() - .taskID(1) - .instantTime(instant) - .writeStatus(Collections.singletonList(writeStatus1)) - .lastBatch(true) - .build(); + OperatorEvent event1 = createOperatorEvent(1, instant, "par2", false, 0.2); coordinator.handleEventFromOperator(1, event1); assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(2), "Commits the instant with partial events anyway"); @@ -195,6 +159,8 @@ public class TestStreamWriteOperatorCoordinator { @Test public void testHiveSyncInvoked() throws Exception { + // reset + reset(); // override the default configuration Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); conf.setBoolean(FlinkOptions.HIVE_SYNC_ENABLED, true); @@ -203,39 +169,97 @@ public class TestStreamWriteOperatorCoordinator { coordinator.start(); coordinator.setExecutor(new MockCoordinatorExecutor(context)); - final WriteMetadataEvent event0 = WriteMetadataEvent.builder() - .taskID(0) - .instantTime("") - .writeStatus(Collections.emptyList()) - .bootstrap(true) - .build(); + final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0); + + coordinator.handleEventFromOperator(0, event0); + + String instant = mockWriteWithMetadata(); + assertNotEquals("", instant); + + // never throw for hive synchronization now + assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1)); + } + + @Test + void testSyncMetadataTable() throws Exception { + // reset + reset(); + // override the default configuration + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setBoolean(FlinkOptions.METADATA_ENABLED, true); + conf.setInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 5); + OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1); + coordinator = new StreamWriteOperatorCoordinator(conf, context); + coordinator.start(); + coordinator.setExecutor(new MockCoordinatorExecutor(context)); + coordinator.setMetadataSyncExecutor(new MockCoordinatorExecutor(context)); + + final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0); coordinator.handleEventFromOperator(0, event0); String instant = coordinator.getInstant(); assertNotEquals("", instant); - WriteStatus writeStatus = new WriteStatus(true, 0.1D); - writeStatus.setPartitionPath("par1"); - writeStatus.setStat(new HoodieWriteStat()); + final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath()); + HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient(metadataTableBasePath); + HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); + assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(1L)); + assertThat(completedTimeline.lastInstant().get().getTimestamp(), is("0000000000000")); - OperatorEvent event1 = WriteMetadataEvent.builder() - .taskID(0) - .instantTime(instant) - .writeStatus(Collections.singletonList(writeStatus)) - .lastBatch(true) - .build(); - - coordinator.handleEventFromOperator(0, event1); - - // never throw for hive synchronization now - 
assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1)); + // test metadata table compaction + // write another 4 commits + for (int i = 1; i < 4; i++) { + instant = mockWriteWithMetadata(); + metadataTableMetaClient.reloadActiveTimeline(); + completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); + assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(i + 1L)); + assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant)); + } + // the 5th commit triggers the compaction + instant = mockWriteWithMetadata(); + metadataTableMetaClient.reloadActiveTimeline(); + completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); + assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(6L)); + assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant + "001")); + assertThat(completedTimeline.lastInstant().get().getAction(), is(HoodieTimeline.COMMIT_ACTION)); } // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- + private String mockWriteWithMetadata() { + final String instant = coordinator.getInstant(); + OperatorEvent event = createOperatorEvent(0, instant, "par1", true, 0.1); + + coordinator.handleEventFromOperator(0, event); + coordinator.notifyCheckpointComplete(0); + return instant; + } + + private static WriteMetadataEvent createOperatorEvent( + int taskId, + String instant, + String partitionPath, + boolean trackSuccessRecords, + double failureFraction) { + final WriteStatus writeStatus = new WriteStatus(trackSuccessRecords, failureFraction); + writeStatus.setPartitionPath(partitionPath); + writeStatus.setStat(new HoodieWriteStat()); + + return WriteMetadataEvent.builder() + .taskID(taskId) + .instantTime(instant) + .writeStatus(Collections.singletonList(writeStatus)) + .lastBatch(true) + .build(); + } + + private void reset() throws Exception { + FileUtils.cleanDirectory(tempFile); + } + private void assertError(Runnable runnable, String message) { runnable.run(); // wait a little while for the task to finish