1
0

[HUDI-2258] Metadata table for flink (#3381)

This commit is contained in:
Danny Chan
2021-08-04 10:54:55 +08:00
committed by GitHub
parent b4c14eaa29
commit 02331fc223
19 changed files with 645 additions and 119 deletions

View File

@@ -248,7 +248,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
if (!latestMetadataInstant.isPresent()) {
LOG.warn("Metadata Table will need to be re-bootstrapped as no instants were found");
rebootstrap = true;
} else if (datasetMetaClient.getActiveTimeline().isBeforeTimelineStarts(latestMetadataInstant.get().getTimestamp())) {
} else if (!latestMetadataInstant.get().getTimestamp().equals(SOLO_COMMIT_TIMESTAMP)
&& datasetMetaClient.getActiveTimeline().isBeforeTimelineStarts(latestMetadataInstant.get().getTimestamp())) {
LOG.warn("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived."
+ " latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp()
+ ", latestDatasetInstant=" + datasetMetaClient.getActiveTimeline().firstInstant().get().getTimestamp());

View File

@@ -40,7 +40,9 @@ import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndex;
import org.apache.hudi.index.HoodieIndex;
@@ -50,6 +52,8 @@ import org.apache.hudi.io.FlinkMergeAndReplaceHandle;
import org.apache.hudi.io.FlinkMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.MiniBatchHandle;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
@@ -153,7 +157,15 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
@Override
public List<WriteStatus> upsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime) {
throw new HoodieNotSupportedException("UpsertPrepped operation is not supported yet");
// only used for metadata table, the upsert happens in single thread
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient());
final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(preppedRecords.get(0), getConfig(),
instantTime, table, preppedRecords.listIterator());
HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).upsertPrepped(context, writeHandle, instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
@Override
@@ -355,7 +367,14 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
@Override
protected List<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
throw new HoodieNotSupportedException("Compaction is not supported yet");
// only used for metadata table, the compaction happens in single thread
try {
List<WriteStatus> writeStatuses = FlinkCompactHelpers.compact(compactionInstantTime, this);
commitCompaction(compactionInstantTime, writeStatuses, Option.empty());
return writeStatuses;
} catch (IOException e) {
throw new HoodieException("Error while compacting instant: " + compactionInstantTime);
}
}
@Override
@@ -370,6 +389,16 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
return getTableAndInitCtx(metaClient, operationType);
}
@Override
public void syncTableMetadata() {
// Open up the metadata table again, for syncing
// NOTE(review): the sync appears to happen as a side effect of creating/initializing the
// writer (see FlinkHoodieBackedTableMetadataWriter.initialize -> bootstrapIfNeeded);
// the try-with-resources only ensures the writer is closed afterwards — confirm.
try (HoodieTableMetadataWriter writer = FlinkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context)) {
LOG.info("Successfully synced to metadata table");
} catch (Exception e) {
// Wrap any failure (creation, sync, or close) into a metadata-specific exception, keeping the cause.
throw new HoodieMetadataException("Error syncing to metadata table.", e);
}
}
/**
* Clean the write handles within a checkpoint interval.
* All the handles should have been closed already.

View File

@@ -32,6 +32,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -47,7 +48,13 @@ import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper
* A flink engine implementation of HoodieEngineContext.
*/
public class HoodieFlinkEngineContext extends HoodieEngineContext {
private RuntimeContext runtimeContext;
public static final HoodieFlinkEngineContext DEFAULT = new HoodieFlinkEngineContext();
private final RuntimeContext runtimeContext;
private HoodieFlinkEngineContext() {
this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), new DefaultTaskContextSupplier());
}
public HoodieFlinkEngineContext(TaskContextSupplier taskContextSupplier) {
this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), taskContextSupplier);
@@ -97,4 +104,34 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
public void setJobStatus(String activeModule, String activityDescription) {
// no operation for now
}
/**
* Override the flink context supplier to return constant write token.
*
* <p>Partition id, stage id and attempt id are fixed constants so that any write token
* derived from them is deterministic, independent of the actual Flink runtime context.
*/
private static class DefaultTaskContextSupplier extends FlinkTaskContextSupplier {
public DefaultTaskContextSupplier() {
// No runtime context: this supplier never reads it.
this(null);
}
public DefaultTaskContextSupplier(RuntimeContext flinkRuntimeContext) {
super(flinkRuntimeContext);
}
// Constant partition id 0.
public Supplier<Integer> getPartitionIdSupplier() {
return () -> 0;
}
// Constant stage id 1.
public Supplier<Integer> getStageIdSupplier() {
return () -> 1;
}
// Constant attempt id 0.
public Supplier<Long> getAttemptIdSupplier() {
return () -> 0L;
}
// No engine properties are exposed.
public Option<String> getProperty(EngineProperty prop) {
return Option.empty();
}
}
}

View File

@@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.metadata;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Flink engine implementation of {@link HoodieBackedTableMetadataWriter}.
 *
 * <p>Commits metadata records to the internal metadata table through a
 * {@link HoodieFlinkWriteClient} as delta commits, and triggers inline
 * compaction/cleaning on the metadata table after each commit.
 */
public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {
private static final Logger LOG = LogManager.getLogger(FlinkHoodieBackedTableMetadataWriter.class);
/**
 * Factory: creates a metadata writer for the given dataset write config and engine context.
 */
public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context) {
return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context);
}
FlinkHoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
super(hadoopConf, writeConfig, engineContext);
}
/**
 * Sets up the metrics registry when metrics are enabled on the metadata write config;
 * otherwise leaves {@code metrics} empty.
 */
@Override
protected void initRegistry() {
if (metadataWriteConfig.isMetricsOn()) {
// should support executor metrics
Registry registry = Registry.getRegistry("HoodieMetadata");
this.metrics = Option.of(new HoodieMetadataMetrics(registry));
} else {
this.metrics = Option.empty();
}
}
/**
 * Bootstraps the metadata table if needed. On any IOException the writer disables
 * itself rather than failing the pipeline.
 */
@Override
protected void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) {
try {
if (enabled) {
bootstrapIfNeeded(engineContext, datasetMetaClient);
}
} catch (IOException e) {
// Best-effort: a broken metadata table must not fail the main write path.
LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
enabled = false;
}
}
/**
 * Commits the given metadata records to the metadata table partition as a delta commit
 * at {@code instantTime}, then runs inline compaction (instant suffix "001") and
 * cleaning (suffix "002"), and finally publishes size/count metrics if enabled.
 *
 * @throws HoodieMetadataException if any write status reports errors
 */
@Override
protected void commit(List<HoodieRecord> records, String partitionName, String instantTime) {
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
// Tag each record with its target file group location before the prepped upsert.
List<HoodieRecord> recordRDD = prepRecords(records, partitionName);
try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig, true)) {
writeClient.startCommitWithTime(instantTime);
writeClient.transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
List<WriteStatus> statuses = writeClient.upsertPreppedRecords(recordRDD, instantTime);
statuses.forEach(writeStatus -> {
if (writeStatus.hasErrors()) {
throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime);
}
});
writeClient.commit(instantTime, statuses, Option.empty(), HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
// trigger cleaning, compaction, with suffixes based on the same instant time. This ensures that any future
// delta commits synced over will not have an instant time lesser than the last completed instant on the
// metadata table.
if (writeClient.scheduleCompactionAtInstant(instantTime + "001", Option.empty())) {
writeClient.compact(instantTime + "001");
}
writeClient.clean(instantTime + "002");
}
// Update total size of the metadata and count of base/log files
metrics.ifPresent(m -> {
try {
Map<String, String> stats = m.getStats(false, metaClient, metadata);
m.updateMetrics(Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE)),
Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_LOG_FILE_SIZE)),
Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_BASE_FILES)),
Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_LOG_FILES)));
} catch (HoodieIOException e) {
// Metrics are advisory; never fail the commit over them.
LOG.error("Could not publish metadata size metrics", e);
}
});
}
/**
 * Return the timestamp of the latest instant synced.
 * <p>
 * To sync an instant on the dataset, we create a corresponding delta-commit on the metadata
 * table. So return the latest completed delta-commit, or empty when the writer is disabled.
 */
@Override
public Option<String> getLatestSyncedInstantTime() {
if (!enabled) {
return Option.empty();
}
HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
return timeline.getDeltaCommitTimeline().filterCompletedInstants()
.lastInstant().map(HoodieInstant::getTimestamp);
}
/**
 * Tag each record with the location.
 * <p>
 * Since we only read the latest base file in a partition, we tag the records with the instant time of the latest
 * base file.
 */
private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName) {
HoodieTable table = HoodieFlinkTable.create(metadataWriteConfig, (HoodieFlinkEngineContext) engineContext);
TableFileSystemView.SliceView fsView = table.getSliceView();
List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName)
.map(FileSlice::getBaseFile)
.filter(Option::isPresent)
.map(Option::get)
.collect(Collectors.toList());
// All the metadata fits within a single base file
if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) {
if (baseFiles.size() > 1) {
throw new HoodieMetadataException("Multiple base files found in metadata partition");
}
}
String fileId;
String instantTime;
if (!baseFiles.isEmpty()) {
// Route to the existing file group; "U" presumably marks an update location — TODO confirm marker semantics.
fileId = baseFiles.get(0).getFileId();
instantTime = "U";
} else {
// If there is a log file then we can assume that it has the data
List<HoodieLogFile> logFiles = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath())
.map(FileSlice::getLatestLogFile)
.filter(Option::isPresent)
.map(Option::get)
.collect(Collectors.toList());
if (logFiles.isEmpty()) {
// No base and log files. All are new inserts
fileId = FSUtils.createNewFileIdPfx();
instantTime = "I";
} else {
// Append to the file group of the latest log file.
fileId = logFiles.get(0).getFileId();
instantTime = "U";
}
}
return records.stream().map(r -> r.setCurrentLocation(new HoodieRecordLocation(instantTime, fileId))).collect(Collectors.toList());
}
}

View File

@@ -34,6 +34,7 @@ import org.apache.hudi.io.FlinkAppendHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.commit.delta.FlinkUpsertPreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
import org.apache.hudi.table.action.compact.FlinkScheduleCompactionActionExecutor;
import org.apache.hudi.table.action.rollback.FlinkMergeOnReadRollbackActionExecutor;
@@ -63,6 +64,18 @@ public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
return new FlinkUpsertDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, hoodieRecords).execute();
}
/**
 * Upserts records that are already tagged with their target location ("prepped"),
 * delegating to {@link FlinkUpsertPreppedDeltaCommitActionExecutor} as a delta commit.
 *
 * @param writeHandle must be a {@code FlinkAppendHandle} — MOR writes append to log files
 */
@Override
public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(
HoodieEngineContext context,
HoodieWriteHandle<?, ?, ?, ?> writeHandle,
String instantTime,
List<HoodieRecord<T>> preppedRecords) {
ValidationUtils.checkArgument(writeHandle instanceof FlinkAppendHandle,
"MOR write handle should always be a FlinkAppendHandle");
FlinkAppendHandle<?, ?, ?, ?> appendHandle = (FlinkAppendHandle<?, ?, ?, ?>) writeHandle;
return new FlinkUpsertPreppedDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, preppedRecords).execute();
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> insert(
HoodieEngineContext context,

View File

@@ -32,7 +32,7 @@ import java.util.List;
public class FlinkUpsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseFlinkDeltaCommitActionExecutor<T> {
private List<HoodieRecord<T>> inputRecords;
private final List<HoodieRecord<T>> inputRecords;
public FlinkUpsertDeltaCommitActionExecutor(HoodieEngineContext context,
FlinkAppendHandle<?, ?, ?, ?> writeHandle,

View File

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit.delta;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.FlinkAppendHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import java.util.List;
/**
 * Flink delta-commit action executor for UPSERT_PREPPED: upserts records that have
 * already been tagged with their target location, skipping the index lookup step.
 */
public class FlinkUpsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseFlinkDeltaCommitActionExecutor<T> {
// Records already tagged with a HoodieRecordLocation by the caller.
private final List<HoodieRecord<T>> preppedRecords;
public FlinkUpsertPreppedDeltaCommitActionExecutor(HoodieEngineContext context,
FlinkAppendHandle<?, ?, ?, ?> writeHandle,
HoodieWriteConfig config,
HoodieTable table,
String instantTime,
List<HoodieRecord<T>> preppedRecords) {
super(context, writeHandle, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
this.preppedRecords = preppedRecords;
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> execute() {
// The base executor does the actual append; no tagging needed for prepped records.
return super.execute(preppedRecords);
}
}

View File

@@ -19,20 +19,33 @@
package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toList;
/**
* A flink implementation of {@link AbstractCompactHelpers}.
*
@@ -40,6 +53,7 @@ import java.util.stream.Collectors;
*/
public class FlinkCompactHelpers<T extends HoodieRecordPayload> extends
AbstractCompactHelpers<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
private static final Logger LOG = LoggerFactory.getLogger(FlinkCompactHelpers.class);
private FlinkCompactHelpers() {
}
@@ -71,5 +85,63 @@ public class FlinkCompactHelpers<T extends HoodieRecordPayload> extends
}
return metadata;
}
/**
 * Compacts a single {@link CompactionOperation} at the given instant using a
 * copy-on-write table view built from the write client's config and meta client.
 *
 * @return the write statuses produced by the compactor
 * @throws IOException on compaction failure
 */
@SuppressWarnings("unchecked, rawtypes")
public static List<WriteStatus> compact(
HoodieFlinkWriteClient writeClient,
String compactInstantTime,
CompactionOperation compactionOperation) throws IOException {
HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
return compactor.compact(
new HoodieFlinkCopyOnWriteTable<>(
writeClient.getConfig(),
writeClient.getEngineContext(),
writeClient.getHoodieTable().getMetaClient()),
writeClient.getHoodieTable().getMetaClient(),
writeClient.getConfig(),
compactionOperation,
compactInstantTime);
}
/**
 * Called by the metadata table compactor code path.
 *
 * <p>Rolls back any inflight compaction for {@code compactionInstantTime}, loads the
 * compaction plan, transitions the instant to inflight, and compacts each planned
 * operation sequentially (single-threaded), collecting the write statuses.
 *
 * @return the combined write statuses, or an empty list when there is no plan
 * @throws IOException on compaction failure
 */
@SuppressWarnings("unchecked, rawtypes")
public static List<WriteStatus> compact(String compactionInstantTime, HoodieFlinkWriteClient writeClient) throws IOException {
HoodieFlinkTable table = writeClient.getHoodieTable();
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
// A previous attempt left the compaction inflight: roll it back and retry from requested.
writeClient.rollbackInflightCompaction(inflightInstant, table);
table.getMetaClient().reloadActiveTimeline();
}
// generate compaction plan
// should support configurable commit metadata
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
table.getMetaClient(), compactionInstantTime);
if (compactionPlan == null || (compactionPlan.getOperations() == null)
|| (compactionPlan.getOperations().isEmpty())) {
// do nothing.
LOG.info("No compaction plan for instant " + compactionInstantTime);
return Collections.emptyList();
} else {
HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
// Mark instant as compaction inflight
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
table.getMetaClient().reloadActiveTimeline();
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("Compacting " + operations + " files");
List<WriteStatus> writeStatusList = new ArrayList<>();
for (CompactionOperation operation : operations) {
List<WriteStatus> statuses = compact(writeClient, compactionInstantTime, operation);
writeStatusList.addAll(statuses);
}
return writeStatusList;
}
}
}

View File

@@ -21,7 +21,10 @@ package org.apache.hudi.table.action.rollback;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.IOType;
@@ -31,7 +34,11 @@ import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hadoop.fs.FileStatus;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import scala.Tuple2;
@@ -71,4 +78,11 @@ public class FlinkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> ext
throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
}
}
/**
 * Maps each log file written for the given file id (over the given base commit)
 * to its current length, for rollback accounting.
 */
protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException {
// collect all log files that are supposed to be deleted with this rollback
return FSUtils.getAllLogFiles(table.getMetaClient().getFs(),
FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime)
.collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
}
}

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.rollback;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -116,12 +117,22 @@ public class ListingBasedRollbackHelper implements Serializable {
.withDeletedFileResults(filesToDeletedStatus).build());
}
case APPEND_ROLLBACK_BLOCK: {
String fileId = rollbackRequest.getFileId().get();
String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
// collect all log files that is supposed to be deleted with this rollback
Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
.collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
HoodieLogFormat.Writer writer = null;
try {
writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
.withFileId(rollbackRequest.getFileId().get())
.overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(metaClient.getFs())
.withFileId(fileId)
.overBaseCommit(latestBaseInstant)
.withFs(metaClient.getFs())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
// generate metadata
@@ -151,7 +162,8 @@ public class ListingBasedRollbackHelper implements Serializable {
);
return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
.withRollbackBlockAppendResults(filesToNumBlocksRollback)
.withWrittenLogFileSizeMap(writtenLogFileSizeMap).build());
}
default:
throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);

View File

@@ -38,14 +38,6 @@ public class FlinkClientUtil {
return HoodieTableMetaClient.builder().setBasePath(basePath).setConf(FlinkClientUtil.getHadoopConf()).build();
}
/**
* Parses the file name from path.
*/
public static String parseFileName(String path) {
int slash = path.lastIndexOf(Path.SEPARATOR);
return path.substring(slash + 1);
}
/**
* Returns the hadoop configuration with possible hadoop conf paths.
* E.G. the configurations under path $HADOOP_CONF_DIR and $HADOOP_HOME.

View File

@@ -76,6 +76,22 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("The default partition name in case the dynamic partition"
+ " column value is null/empty string");
// ------------------------------------------------------------------------
// Metadata table Options
// ------------------------------------------------------------------------
public static final ConfigOption<Boolean> METADATA_ENABLED = ConfigOptions
.key("metadata.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Enable the internal metadata table which serves table metadata like level file listings, default false");
public static final ConfigOption<Integer> METADATA_COMPACTION_DELTA_COMMITS = ConfigOptions
.key("metadata.compaction.delta_commits")
.intType()
.defaultValue(24)
.withDescription("Max delta commits for metadata table to trigger compaction, default 24");
// ------------------------------------------------------------------------
// Index Options
// ------------------------------------------------------------------------

View File

@@ -324,13 +324,7 @@ public class StreamWriteFunction<K, I, O>
}
private void sendBootstrapEvent() {
WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.writeStatus(Collections.emptyList())
.instantTime("")
.bootstrap(true)
.build();
this.eventGateway.sendEventToCoordinator(event);
this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID));
LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
}

View File

@@ -94,7 +94,7 @@ public class StreamWriteOperatorCoordinator
/**
* Current REQUESTED instant, for validation.
*/
private volatile String instant = "";
private volatile String instant = WriteMetadataEvent.BOOTSTRAP_INSTANT;
/**
* Event buffer for one round of checkpointing. When all the elements are non-null and have the same
@@ -107,11 +107,6 @@ public class StreamWriteOperatorCoordinator
*/
private final int parallelism;
/**
* Whether to schedule compaction plan on finished checkpoints.
*/
private final boolean scheduleCompaction;
/**
* A single-thread executor to handle all the asynchronous jobs of the coordinator.
*/
@@ -127,6 +122,11 @@ public class StreamWriteOperatorCoordinator
*/
private HiveSyncContext hiveSyncContext;
/**
* A single-thread executor to handle metadata table sync.
*/
private NonThrownExecutor metadataSyncExecutor;
/**
* The table state.
*/
@@ -144,23 +144,25 @@ public class StreamWriteOperatorCoordinator
this.conf = conf;
this.context = context;
this.parallelism = context.currentParallelism();
this.scheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
}
@Override
public void start() throws Exception {
// initialize event buffer
reset();
this.writeClient = StreamerUtil.createWriteClient(conf, null);
this.writeClient = StreamerUtil.createWriteClient(conf);
this.tableState = TableState.create(conf);
// init table, create it if not exists.
initTableIfNotExists(this.conf);
// start the executor
this.executor = new CoordinatorExecutor(this.context, LOG);
// start the executor if required
if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) {
if (tableState.syncHive) {
initHiveSync();
}
if (tableState.syncMetadata) {
initMetadataSync();
}
}
@Override
@@ -205,13 +207,15 @@ public class StreamWriteOperatorCoordinator
final boolean committed = commitInstant(this.instant);
if (committed) {
// if async compaction is on, schedule the compaction
if (scheduleCompaction) {
if (tableState.scheduleCompaction) {
writeClient.scheduleCompaction(Option.empty());
}
// start new instant.
startInstant();
// sync Hive if is enabled
syncHiveIfEnabled();
// sync metadata if is enabled
syncMetadataIfEnabled();
}
}, "commits the instant %s", this.instant
);
@@ -263,7 +267,7 @@ public class StreamWriteOperatorCoordinator
}
private void syncHiveIfEnabled() {
if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) {
if (tableState.syncHive) {
this.hiveSyncExecutor.execute(this::syncHive, "sync hive metadata for instant %s", this.instant);
}
}
@@ -275,6 +279,27 @@ public class StreamWriteOperatorCoordinator
hiveSyncContext.hiveSyncTool().syncHoodieTable();
}
/** Lazily creates the single-thread executor used for metadata table sync. */
private void initMetadataSync() {
this.metadataSyncExecutor = new NonThrownExecutor(LOG, true);
}
/**
 * Asynchronously syncs the write metadata to the metadata table, if metadata
 * sync is enabled in the table state; no-op otherwise.
 */
private void syncMetadataIfEnabled() {
if (tableState.syncMetadata) {
// Runs on the dedicated metadata executor so it cannot block the coordinator thread.
this.metadataSyncExecutor.execute(this::syncMetadata,
"sync metadata table for instant %s", this.instant);
}
}
/**
 * Sync the write metadata to the metadata table (delegates to the write client).
 */
private void syncMetadata() {
this.writeClient.syncTableMetadata();
}
/** Clears the event buffer for the next round of checkpointing, one slot per subtask. */
private void reset() {
this.eventBuffer = new WriteMetadataEvent[this.parallelism];
}
@@ -324,6 +349,11 @@ public class StreamWriteOperatorCoordinator
LOG.info("Recommit instant {}", instant);
commitInstant(instant);
}
if (tableState.syncMetadata) {
// initialize metadata table first if enabled
// condition: the data set timeline has committed instants
syncMetadata();
}
// starts a new instant
startInstant();
}, "initialize instant %s", instant);
@@ -344,6 +374,8 @@ public class StreamWriteOperatorCoordinator
commitInstant(this.instant);
// sync Hive if is enabled in batch mode.
syncHiveIfEnabled();
// sync metadata if is enabled in batch mode.
syncMetadataIfEnabled();
}
}
@@ -480,6 +512,14 @@ public class StreamWriteOperatorCoordinator
this.executor = executor;
}
/** Test hook: replaces the metadata sync executor, closing any previous one first. */
@VisibleForTesting
public void setMetadataSyncExecutor(NonThrownExecutor executor) throws Exception {
if (this.metadataSyncExecutor != null) {
this.metadataSyncExecutor.close();
}
this.metadataSyncExecutor = executor;
}
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
@@ -513,15 +553,21 @@ public class StreamWriteOperatorCoordinator
private static class TableState implements Serializable {
private static final long serialVersionUID = 1L;
private final WriteOperationType operationType;
private final String commitAction;
private final boolean isOverwrite;
final WriteOperationType operationType;
final String commitAction;
final boolean isOverwrite;
final boolean scheduleCompaction;
final boolean syncHive;
final boolean syncMetadata;
private TableState(Configuration conf) {
this.operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
this.commitAction = CommitUtils.getCommitActionType(this.operationType,
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase(Locale.ROOT)));
this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
this.scheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
this.syncHive = conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED);
this.syncMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
}
public static TableState create(Configuration conf) {

View File

@@ -22,8 +22,7 @@ import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
import org.apache.hudi.table.action.compact.FlinkCompactHelpers;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
@@ -99,16 +98,7 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, Compac
}
private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
List<WriteStatus> writeStatuses = compactor.compact(
new HoodieFlinkCopyOnWriteTable<>(
this.writeClient.getConfig(),
this.writeClient.getEngineContext(),
this.writeClient.getHoodieTable().getMetaClient()),
this.writeClient.getHoodieTable().getMetaClient(),
this.writeClient.getConfig(),
compactionOperation,
instantTime);
List<WriteStatus> writeStatuses = FlinkCompactHelpers.compact(writeClient, instantTime, compactionOperation);
collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID));
}

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -33,6 +34,8 @@ import java.util.Objects;
public class WriteMetadataEvent implements OperatorEvent {
private static final long serialVersionUID = 1L;
public static final String BOOTSTRAP_INSTANT = "";
private List<WriteStatus> writeStatuses;
private int taskID;
private String instantTime;
@@ -57,11 +60,11 @@ public class WriteMetadataEvent implements OperatorEvent {
* @param taskID The task ID
* @param instantTime The instant time under which to write the data
* @param writeStatuses The write statues list
* @param lastBatch Whether the event reports the last batch
* @param lastBatch Whether the event reports the last batch
* within an checkpoint interval,
* if true, the whole data set of the checkpoint
* has been flushed successfully
* @param bootstrap Whether the event comes from the bootstrap
* @param bootstrap Whether the event comes from the bootstrap
*/
private WriteMetadataEvent(
int taskID,
@@ -79,7 +82,8 @@ public class WriteMetadataEvent implements OperatorEvent {
}
// default constructor for efficient serialization
public WriteMetadataEvent() {}
public WriteMetadataEvent() {
}
/**
* Returns the builder for {@link WriteMetadataEvent}.
@@ -159,6 +163,25 @@ public class WriteMetadataEvent implements OperatorEvent {
return lastBatch && this.instantTime.equals(currentInstant);
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
/**
 * Creates an empty bootstrap event for task {@code taskId}.
 *
 * <p>Such an event carries no old-instant write statuses to recover, which
 * tells the coordinator that a new instant can be started right away.
 *
 * @param taskId the ID of the write task emitting the event
 * @return a bootstrap {@link WriteMetadataEvent} with an empty status list
 */
public static WriteMetadataEvent emptyBootstrap(int taskId) {
  return WriteMetadataEvent.builder()
      .bootstrap(true)
      .taskID(taskId)
      .instantTime(BOOTSTRAP_INSTANT)
      .writeStatus(Collections.emptyList())
      .build();
}
// -------------------------------------------------------------------------
// Builder
// -------------------------------------------------------------------------

View File

@@ -78,8 +78,7 @@ public class HiveSyncContext {
hiveSyncConfig.partitionFields = Arrays.asList(FilePathUtils.extractPartitionKeys(conf));
hiveSyncConfig.partitionValueExtractorClass = conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS);
hiveSyncConfig.useJdbc = conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC);
// needs to support metadata table for flink
hiveSyncConfig.useFileListingFromMetadata = false;
hiveSyncConfig.useFileListingFromMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
hiveSyncConfig.verifyMetadataFileListing = false;
hiveSyncConfig.ignoreExceptions = conf.getBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS);
hiveSyncConfig.supportTimestamp = conf.getBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP);

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
@@ -170,6 +171,10 @@ public class StreamerUtil {
.logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024)
.logFileMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024)
.build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED))
.withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
.build())
.withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
.withAutoCommit(false)
.withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
@@ -265,11 +270,18 @@ public class StreamerUtil {
&& conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED);
}
/**
 * Creates the meta client for the Hoodie table rooted at the given base path.
 *
 * <p>Uses the Hadoop configuration resolved by {@code FlinkClientUtil.getHadoopConf()}.
 *
 * @param basePath the base path of the Hoodie table (also works for the
 *                 metadata table's base path)
 * @return a new {@link HoodieTableMetaClient}
 */
public static HoodieTableMetaClient createMetaClient(String basePath) {
  return HoodieTableMetaClient.builder().setBasePath(basePath).setConf(FlinkClientUtil.getHadoopConf()).build();
}
/**
* Creates the meta client.
*/
public static HoodieTableMetaClient createMetaClient(Configuration conf) {
return HoodieTableMetaClient.builder().setBasePath(conf.getString(FlinkOptions.PATH)).setConf(FlinkClientUtil.getHadoopConf()).build();
return createMetaClient(conf.getString(FlinkOptions.PATH));
}
/**
@@ -284,6 +296,15 @@ public class StreamerUtil {
return new HoodieFlinkWriteClient<>(context, getHoodieClientConfig(conf));
}
/**
 * Creates the Flink write client from the Flink configuration, using the
 * default engine context.
 *
 * <p>The task context supplier is a constant: the write token is always '0-1-0'.
 *
 * @param conf the Flink job configuration the write config is derived from
 * @return a new {@link HoodieFlinkWriteClient}
 */
public static HoodieFlinkWriteClient createWriteClient(Configuration conf) {
  return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, getHoodieClientConfig(conf));
}
/**
* Return the median instant time between the given two instant time.
*/

View File

@@ -23,7 +23,9 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
import org.apache.hudi.util.StreamerUtil;
@@ -34,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
@@ -71,22 +74,8 @@ public class TestStreamWriteOperatorCoordinator {
coordinator.start();
coordinator.setExecutor(new MockCoordinatorExecutor(context));
final WriteMetadataEvent event0 = WriteMetadataEvent.builder()
.taskID(0)
.instantTime("")
.writeStatus(Collections.emptyList())
.bootstrap(true)
.build();
final WriteMetadataEvent event1 = WriteMetadataEvent.builder()
.taskID(1)
.instantTime("")
.writeStatus(Collections.emptyList())
.bootstrap(true)
.build();
coordinator.handleEventFromOperator(0, event0);
coordinator.handleEventFromOperator(1, event1);
coordinator.handleEventFromOperator(0, WriteMetadataEvent.emptyBootstrap(0));
coordinator.handleEventFromOperator(1, WriteMetadataEvent.emptyBootstrap(1));
}
@AfterEach
@@ -99,25 +88,8 @@ public class TestStreamWriteOperatorCoordinator {
String instant = coordinator.getInstant();
assertNotEquals("", instant);
WriteStatus writeStatus = new WriteStatus(true, 0.1D);
writeStatus.setPartitionPath("par1");
writeStatus.setStat(new HoodieWriteStat());
OperatorEvent event0 = WriteMetadataEvent.builder()
.taskID(0)
.instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus))
.lastBatch(true)
.build();
WriteStatus writeStatus1 = new WriteStatus(false, 0.2D);
writeStatus1.setPartitionPath("par2");
writeStatus1.setStat(new HoodieWriteStat());
OperatorEvent event1 = WriteMetadataEvent.builder()
.taskID(1)
.instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus1))
.lastBatch(true)
.build();
OperatorEvent event0 = createOperatorEvent(0, instant, "par1", true, 0.1);
OperatorEvent event1 = createOperatorEvent(1, instant, "par2", false, 0.2);
coordinator.handleEventFromOperator(0, event0);
coordinator.handleEventFromOperator(1, event1);
@@ -177,15 +149,7 @@ public class TestStreamWriteOperatorCoordinator {
assertNull(lastCompleted, "Returns early for empty write results");
assertNull(coordinator.getEventBuffer()[0]);
WriteStatus writeStatus1 = new WriteStatus(false, 0.2D);
writeStatus1.setPartitionPath("par2");
writeStatus1.setStat(new HoodieWriteStat());
OperatorEvent event1 = WriteMetadataEvent.builder()
.taskID(1)
.instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus1))
.lastBatch(true)
.build();
OperatorEvent event1 = createOperatorEvent(1, instant, "par2", false, 0.2);
coordinator.handleEventFromOperator(1, event1);
assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(2),
"Commits the instant with partial events anyway");
@@ -195,6 +159,8 @@ public class TestStreamWriteOperatorCoordinator {
@Test
public void testHiveSyncInvoked() throws Exception {
// reset
reset();
// override the default configuration
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setBoolean(FlinkOptions.HIVE_SYNC_ENABLED, true);
@@ -203,39 +169,97 @@ public class TestStreamWriteOperatorCoordinator {
coordinator.start();
coordinator.setExecutor(new MockCoordinatorExecutor(context));
final WriteMetadataEvent event0 = WriteMetadataEvent.builder()
.taskID(0)
.instantTime("")
.writeStatus(Collections.emptyList())
.bootstrap(true)
.build();
final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0);
coordinator.handleEventFromOperator(0, event0);
String instant = mockWriteWithMetadata();
assertNotEquals("", instant);
// never throw for hive synchronization now
assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1));
}
@Test
void testSyncMetadataTable() throws Exception {
  // start from a clean table directory
  reset();
  // override the default configuration: enable the metadata table and compact
  // the metadata table every 5 delta commits
  Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
  conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
  conf.setInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 5);
  OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
  coordinator = new StreamWriteOperatorCoordinator(conf, context);
  coordinator.start();
  // synchronous mock executors so the asserts below see completed actions
  coordinator.setExecutor(new MockCoordinatorExecutor(context));
  coordinator.setMetadataSyncExecutor(new MockCoordinatorExecutor(context));
  final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0);
  coordinator.handleEventFromOperator(0, event0);
  String instant = coordinator.getInstant();
  assertNotEquals("", instant);
  WriteStatus writeStatus = new WriteStatus(true, 0.1D);
  writeStatus.setPartitionPath("par1");
  writeStatus.setStat(new HoodieWriteStat());
  final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
  HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient(metadataTableBasePath);
  HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
  // the bootstrap event alone should have triggered the initial metadata table
  // sync, written under the solo bootstrap timestamp
  assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(1L));
  assertThat(completedTimeline.lastInstant().get().getTimestamp(), is("0000000000000"));
  OperatorEvent event1 = WriteMetadataEvent.builder()
      .taskID(0)
      .instantTime(instant)
      .writeStatus(Collections.singletonList(writeStatus))
      .lastBatch(true)
      .build();
  coordinator.handleEventFromOperator(0, event1);
  // checkpoint completion commits the instant and syncs the metadata table;
  // it should not throw
  assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1));
  // test metadata table compaction
  // write 3 more commits in the loop; together with the post-loop write below
  // that makes 4 additional commits after the first one
  for (int i = 1; i < 4; i++) {
    instant = mockWriteWithMetadata();
    metadataTableMetaClient.reloadActiveTimeline();
    completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
    // each data-table commit lands as one synced instant on the metadata table
    assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(i + 1L));
    assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
  }
  // the 5th commit triggers the compaction
  instant = mockWriteWithMetadata();
  metadataTableMetaClient.reloadActiveTimeline();
  completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
  assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(6L));
  // the compaction instant is suffixed "001" relative to the delta instant and
  // shows up as a COMMIT action on the metadata table timeline
  assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant + "001"));
  assertThat(completedTimeline.lastInstant().get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
/**
 * Simulates one successful write round: sends a last-batch event for the
 * current instant and completes a checkpoint so the coordinator commits it.
 *
 * @return the instant time that was written and committed
 */
private String mockWriteWithMetadata() {
  final String currentInstant = coordinator.getInstant();
  final OperatorEvent event = createOperatorEvent(0, currentInstant, "par1", true, 0.1);
  coordinator.handleEventFromOperator(0, event);
  coordinator.notifyCheckpointComplete(0);
  return currentInstant;
}
/**
 * Builds a last-batch {@link WriteMetadataEvent} carrying a single write
 * status for the given task, instant and partition.
 *
 * @param taskId              the reporting task's ID
 * @param instant             the instant time the event belongs to
 * @param partitionPath       the partition path recorded on the write status
 * @param trackSuccessRecords whether the status tracks successful records
 * @param failureFraction     the injected failure fraction for the status
 */
private static WriteMetadataEvent createOperatorEvent(
    int taskId,
    String instant,
    String partitionPath,
    boolean trackSuccessRecords,
    double failureFraction) {
  final WriteStatus status = new WriteStatus(trackSuccessRecords, failureFraction);
  status.setPartitionPath(partitionPath);
  status.setStat(new HoodieWriteStat());
  return WriteMetadataEvent.builder()
      .taskID(taskId)
      .instantTime(instant)
      .writeStatus(Collections.singletonList(status))
      .lastBatch(true)
      .build();
}
/**
 * Wipes the temporary table directory so each test case starts from a clean
 * base path (no leftover timeline or metadata table from a previous test).
 */
private void reset() throws Exception {
  FileUtils.cleanDirectory(tempFile);
}
private void assertError(Runnable runnable, String message) {
runnable.run();
// wait a little while for the task to finish