1
0

[HUDI-1632] Supports merge on read write mode for Flink writer (#2593)

Also supports async compaction with pluggable strategies.
This commit is contained in:
Danny Chan
2021-03-01 12:29:41 +08:00
committed by GitHub
parent be257b58c6
commit 7a11de1276
38 changed files with 2296 additions and 152 deletions

View File

@@ -37,9 +37,11 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndex;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.FlinkAppendHandle;
import org.apache.hudi.io.FlinkCreateHandle;
import org.apache.hudi.io.FlinkMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
@@ -48,14 +50,19 @@ import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.FlinkCompactHelpers;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade;
import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.ParseException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -64,6 +71,8 @@ import java.util.stream.Collectors;
public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
AbstractHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkWriteClient.class);
/**
* FileID to write handle mapping in order to record the write handles for each file group,
* so that we can append the mini-batch data buffer incrementally.
@@ -127,21 +136,9 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT);
final HoodieRecord<T> record = records.get(0);
final HoodieRecordLocation loc = record.getCurrentLocation();
final String fileID = loc.getFileId();
final boolean isInsert = loc.getInstantTime().equals("I");
final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
if (bucketToHandles.containsKey(fileID)) {
writeHandle = bucketToHandles.get(fileID);
} else {
// create the write handle if not exists
writeHandle = isInsert
? new FlinkCreateHandle<>(getConfig(), instantTime, table, record.getPartitionPath(),
fileID, table.getTaskContextSupplier())
: new FlinkMergeHandle<>(getConfig(), instantTime, table, records.listIterator(), record.getPartitionPath(),
fileID, table.getTaskContextSupplier());
bucketToHandles.put(fileID, writeHandle);
}
final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);
final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(record, isDelta, getConfig(),
instantTime, table, record.getPartitionPath(), records.listIterator());
HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).upsert(context, writeHandle, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -160,7 +157,12 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.INSERT);
HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
// create the write handle if not exists
final HoodieRecord<T> record = records.get(0);
final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);
final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(record, isDelta, getConfig(),
instantTime, table, record.getPartitionPath(), records.listIterator());
HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).insert(context, writeHandle, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
}
@@ -207,13 +209,40 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
}
@Override
public void commitCompaction(String compactionInstantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata) throws IOException {
throw new HoodieNotSupportedException("Compaction is not supported yet");
public void commitCompaction(
String compactionInstantTime,
List<WriteStatus> writeStatuses,
Option<Map<String, String>> extraMetadata) throws IOException {
HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
HoodieCommitMetadata metadata = FlinkCompactHelpers.newInstance().createCompactionMetadata(
table, compactionInstantTime, writeStatuses, config.getSchema());
extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
completeCompaction(metadata, writeStatuses, table, compactionInstantTime);
}
@Override
protected void completeCompaction(HoodieCommitMetadata metadata, List<WriteStatus> writeStatuses, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, String compactionCommitTime) {
throw new HoodieNotSupportedException("Compaction is not supported yet");
public void completeCompaction(
HoodieCommitMetadata metadata,
List<WriteStatus> writeStatuses,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String compactionCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
finalizeWrite(table, compactionCommitTime, writeStats);
LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
FlinkCompactHelpers.newInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
if (compactionTimer != null) {
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
+ config.getBasePath() + " at time " + compactionCommitTime, e);
}
}
LOG.info("Compacted successfully on commit " + compactionCommitTime);
}
@Override
@@ -244,6 +273,46 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
this.bucketToHandles.clear();
}
/**
* Get or create a new write handle in order to reuse the file handles.
*
* @param record The first record in the bucket
* @param isDelta Whether the table is in MOR mode
* @param config Write config
* @param instantTime The instant time
* @param table The table
* @param partitionPath Partition path
* @param recordItr Record iterator
* @return Existing write handle or create a new one
*/
private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(
HoodieRecord<T> record,
boolean isDelta,
HoodieWriteConfig config,
String instantTime,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String partitionPath,
Iterator<HoodieRecord<T>> recordItr) {
final HoodieRecordLocation loc = record.getCurrentLocation();
final String fileID = loc.getFileId();
if (bucketToHandles.containsKey(fileID)) {
return bucketToHandles.get(fileID);
}
final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
if (isDelta) {
writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr,
table.getTaskContextSupplier());
} else if (loc.getInstantTime().equals("I")) {
writeHandle = new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
fileID, table.getTaskContextSupplier());
} else {
writeHandle = new FlinkMergeHandle<>(config, instantTime, table, recordItr, partitionPath,
fileID, table.getTaskContextSupplier());
}
this.bucketToHandles.put(fileID, writeHandle);
return writeHandle;
}
private HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) {
if (operationType == WriteOperationType.DELETE) {
setWriteSchemaForDeletes(metaClient);
@@ -305,4 +374,16 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
activeTimeline.transitionRequestedToInflight(requested, Option.empty(),
config.shouldAllowMultiWriteOnSameInstant());
}
public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
rollbackInflightCompaction(inflightInstant, table);
}
}
public HoodieFlinkTable<T> getHoodieTable() {
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
}
}

View File

@@ -23,16 +23,12 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.FlinkHoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -46,15 +42,9 @@ import java.util.List;
public class FlinkInMemoryStateIndex<T extends HoodieRecordPayload> extends FlinkHoodieIndex<T> {
private static final Logger LOG = LogManager.getLogger(FlinkInMemoryStateIndex.class);
private MapState<HoodieKey, HoodieRecordLocation> mapState;
public FlinkInMemoryStateIndex(HoodieFlinkEngineContext context, HoodieWriteConfig config) {
super(config);
if (context.getRuntimeContext() != null) {
MapStateDescriptor<HoodieKey, HoodieRecordLocation> indexStateDesc =
new MapStateDescriptor<>("indexState", TypeInformation.of(HoodieKey.class), TypeInformation.of(HoodieRecordLocation.class));
mapState = context.getRuntimeContext().getMapState(indexStateDesc);
}
}
@Override

View File

@@ -26,11 +26,11 @@ import org.apache.hudi.table.HoodieTable;
/**
* Create handle factory for Flink writer, use the specified write handle directly.
*/
public class ExplicitCreateHandleFactory<T extends HoodieRecordPayload, I, K, O>
extends CreateHandleFactory<T, I, K, O> {
public class ExplicitWriteHandleFactory<T extends HoodieRecordPayload, I, K, O>
extends WriteHandleFactory<T, I, K, O> {
private HoodieWriteHandle<T, I, K, O> writeHandle;
public ExplicitCreateHandleFactory(HoodieWriteHandle<T, I, K, O> writeHandle) {
public ExplicitWriteHandleFactory(HoodieWriteHandle<T, I, K, O> writeHandle) {
this.writeHandle = writeHandle;
}

View File

@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* A {@link HoodieAppendHandle} that supports append write incrementally(mini-batches).
*
* <p>For the first mini-batch, it initialize and set up the next file path to write,
* but does not close the file writer until all the mini-batches write finish. Each mini-batch
* data are appended to this handle, the back-up writer may rollover on condition.
*
* @param <T> Payload type
* @param <I> Input type
* @param <K> Key type
* @param <O> Output type
*/
public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieAppendHandle<T, I, K, O> implements MiniBatchHandle {
private static final Logger LOG = LoggerFactory.getLogger(FlinkAppendHandle.class);
private boolean needBootStrap = true;
// Total number of bytes written to file
private long sizeInBytes = 0;
public FlinkAppendHandle(
HoodieWriteConfig config,
String instantTime,
HoodieTable<T, I, K, O> hoodieTable,
String partitionPath,
String fileId,
Iterator<HoodieRecord<T>> recordItr,
TaskContextSupplier taskContextSupplier) {
super(config, instantTime, hoodieTable, partitionPath, fileId, recordItr, taskContextSupplier);
}
@Override
protected boolean needsUpdateLocation() {
return false;
}
@Override
protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
return hoodieRecord.getCurrentLocation() != null
&& hoodieRecord.getCurrentLocation().getInstantTime().equals("U");
}
/**
* Returns whether there is need to bootstrap this file handle.
* E.G. the first time that the handle is created.
*/
public boolean isNeedBootStrap() {
return this.needBootStrap;
}
/**
* Appends new records into this append handle.
* @param recordItr The new records iterator
*/
public void appendNewRecords(Iterator<HoodieRecord<T>> recordItr) {
this.recordItr = recordItr;
}
@Override
public List<WriteStatus> close() {
needBootStrap = false;
// flush any remaining records to disk
appendDataAndDeleteBlocks(header);
try {
for (WriteStatus status: statuses) {
long logFileSize = FSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath()));
status.getStat().setFileSizeInBytes(logFileSize);
}
} catch (IOException e) {
throw new HoodieUpsertException("Failed to get file size for append handle", e);
}
List<WriteStatus> ret = new ArrayList<>(statuses);
statuses.clear();
return ret;
}
@Override
public void finishWrite() {
LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten);
try {
if (writer != null) {
writer.close();
}
} catch (IOException e) {
throw new HoodieUpsertException("Failed to close append handle", e);
}
}
}

View File

@@ -90,13 +90,12 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
*/
private WriteStatus getIncrementalWriteStatus() {
try {
long fileSizeInBytes = FSUtils.getFileSize(fs, path);
setUpWriteStatus(fileSizeInBytes);
setUpWriteStatus();
// reset the write status
recordsWritten = 0;
recordsDeleted = 0;
insertRecordsWritten = 0;
this.lastFileSize = fileSizeInBytes;
timer = new HoodieTimer().startTimer();
writeStatus.setTotalErrorRecords(0);
return writeStatus;
} catch (IOException e) {
@@ -107,10 +106,12 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
/**
* Set up the write status.
*
* @param fileSizeInBytes File size in bytes
* @throws IOException if error occurs
*/
private void setUpWriteStatus(long fileSizeInBytes) throws IOException {
private void setUpWriteStatus() throws IOException {
long fileSizeInBytes = FSUtils.getFileSize(fs, path);
long incFileSizeInBytes = fileSizeInBytes - lastFileSize;
this.lastFileSize = fileSizeInBytes;
HoodieWriteStat stat = new HoodieWriteStat();
stat.setPartitionPath(writeStatus.getPartitionPath());
stat.setNumWrites(recordsWritten);
@@ -119,13 +120,12 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
stat.setFileId(writeStatus.getFileId());
stat.setPath(new Path(config.getBasePath()), path);
stat.setTotalWriteBytes(fileSizeInBytes - lastFileSize);
stat.setTotalWriteBytes(incFileSizeInBytes);
stat.setFileSizeInBytes(fileSizeInBytes);
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
runtimeStats.setTotalCreateTime(timer.endTimer());
stat.setRuntimeStats(runtimeStats);
timer = new HoodieTimer().startTimer();
writeStatus.setStat(stat);
}

View File

@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
@@ -109,29 +108,10 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
return writeStatus;
}
/**
* The difference with the parent method is that there is no need to set up
* locations for the records.
*
* @param fileId The file ID
* @param newRecordsItr The incremental records iterator
*/
@Override
protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
initializeIncomingRecordsMap();
while (newRecordsItr.hasNext()) {
HoodieRecord<T> record = newRecordsItr.next();
// NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist
keyToNewRecords.put(record.getRecordKey(), record);
}
LOG.info(String.format("Number of entries in MemoryBasedMap => %d\n"
+ "Total size in bytes of MemoryBasedMap => %d\n"
+ "Number of entries in DiskBasedMap => %d\n"
+ "Size of file spilled to disk => %d",
((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries(),
((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize(),
((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries(),
((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes()));
boolean needsUpdateLocation() {
// No need to update location for Flink hoodie records because all the records are pre-tagged
// with the desired locations.
return false;
}
/**

View File

@@ -26,6 +26,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -34,6 +35,10 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieSortedMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
@@ -41,10 +46,17 @@ import org.apache.hudi.table.action.clean.FlinkCleanActionExecutor;
import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkMergeHelper;
import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.rollback.FlinkCopyOnWriteRollbackActionExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -58,7 +70,9 @@ import java.util.Map;
*/
public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieFlinkTable<T> {
protected HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCopyOnWriteTable.class);
public HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
super(config, context, metaClient);
}
@@ -270,4 +284,52 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
throw new HoodieNotSupportedException("Savepoint and restore is not supported yet");
}
// -------------------------------------------------------------------------
// Used for compaction
// -------------------------------------------------------------------------
public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
// these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile);
return handleUpdateInternal(upsertHandle, instantTime, fileId);
}
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,?,?,?> upsertHandle, String instantTime,
String fileId) throws IOException {
if (upsertHandle.getOldFilePath() == null) {
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
FlinkMergeHelper.newInstance().runMerge(this, upsertHandle);
}
// TODO(vc): This needs to be revisited
if (upsertHandle.getPartitionPath() == null) {
LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
+ upsertHandle.writeStatuses());
}
return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
}
protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
if (requireSortedRecords()) {
return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
dataFileToBeMerged, taskContextSupplier);
} else {
return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
dataFileToBeMerged,taskContextSupplier);
}
}
public Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
HoodieCreateHandle<?,?,?,?> createHandle =
new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier);
createHandle.write();
return Collections.singletonList(createHandle.close()).iterator();
}
}

View File

@@ -18,14 +18,74 @@
package org.apache.hudi.table;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.io.FlinkAppendHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
import org.apache.hudi.table.action.compact.FlinkScheduleCompactionActionExecutor;
public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload> extends HoodieFlinkCopyOnWriteTable<T> {
protected HoodieFlinkMergeOnReadTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
import java.util.List;
import java.util.Map;
public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
extends HoodieFlinkCopyOnWriteTable<T> {
HoodieFlinkMergeOnReadTable(
HoodieWriteConfig config,
HoodieEngineContext context,
HoodieTableMetaClient metaClient) {
super(config, context, metaClient);
}
// TODO not support yet.
@Override
public HoodieWriteMetadata<List<WriteStatus>> upsert(
HoodieEngineContext context,
HoodieWriteHandle<?, ?, ?, ?> writeHandle,
String instantTime,
List<HoodieRecord<T>> hoodieRecords) {
ValidationUtils.checkArgument(writeHandle instanceof FlinkAppendHandle,
"MOR write handle should always be a FlinkAppendHandle");
FlinkAppendHandle<?, ?, ?, ?> appendHandle = (FlinkAppendHandle<?, ?, ?, ?>) writeHandle;
return new FlinkUpsertDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, hoodieRecords).execute();
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> insert(
HoodieEngineContext context,
HoodieWriteHandle<?, ?, ?, ?> writeHandle,
String instantTime,
List<HoodieRecord<T>> hoodieRecords) {
ValidationUtils.checkArgument(writeHandle instanceof FlinkAppendHandle,
"MOR write handle should always be a FlinkAppendHandle");
FlinkAppendHandle<?, ?, ?, ?> appendHandle = (FlinkAppendHandle<?, ?, ?, ?>) writeHandle;
return new FlinkUpsertDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, hoodieRecords).execute();
}
@Override
public Option<HoodieCompactionPlan> scheduleCompaction(
HoodieEngineContext context,
String instantTime,
Option<Map<String, String>> extraMetadata) {
BaseScheduleCompactionActionExecutor scheduleCompactionExecutor = new FlinkScheduleCompactionActionExecutor(
context, config, this, instantTime, extraMetadata);
return scheduleCompactionExecutor.execute();
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> compact(HoodieEngineContext context, String compactionInstantTime) {
throw new HoodieNotSupportedException("Compaction is supported as a separate pipeline, "
+ "should not invoke directly through HoodieFlinkMergeOnReadTable");
}
}

View File

@@ -29,7 +29,6 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndex;
import org.apache.hudi.index.HoodieIndex;
@@ -57,7 +56,7 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
case COPY_ON_WRITE:
return new HoodieFlinkCopyOnWriteTable<>(config, context, metaClient);
case MERGE_ON_READ:
throw new HoodieNotSupportedException("MERGE_ON_READ is not supported yet");
return new HoodieFlinkMergeOnReadTable<>(config, context, metaClient);
default:
throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
}

View File

@@ -35,7 +35,7 @@ import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.FlinkLazyInsertIterable;
import org.apache.hudi.io.ExplicitCreateHandleFactory;
import org.apache.hudi.io.ExplicitWriteHandleFactory;
import org.apache.hudi.io.FlinkMergeHandle;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
@@ -74,7 +74,7 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
private static final Logger LOG = LogManager.getLogger(BaseFlinkCommitActionExecutor.class);
private HoodieWriteHandle<?, ?, ?, ?> writeHandle;
protected HoodieWriteHandle<?, ?, ?, ?> writeHandle;
public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
HoodieWriteHandle<?, ?, ?, ?> writeHandle,
@@ -107,22 +107,13 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")
? BucketType.INSERT
: BucketType.UPDATE;
if (WriteOperationType.isChangingRecords(operationType)) {
handleUpsertPartition(
instantTime,
partitionPath,
fileId, bucketType,
inputRecords.iterator())
.forEachRemaining(writeStatuses::addAll);
} else {
handleUpsertPartition(
instantTime,
partitionPath,
fileId,
bucketType,
inputRecords.iterator())
.forEachRemaining(writeStatuses::addAll);
}
handleUpsertPartition(
instantTime,
partitionPath,
fileId,
bucketType,
inputRecords.iterator())
.forEachRemaining(writeStatuses::addAll);
setUpWriteMetadata(writeStatuses, result);
return result;
}
@@ -265,6 +256,6 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx,
taskContextSupplier, new ExplicitCreateHandleFactory<>(writeHandle));
taskContextSupplier, new ExplicitWriteHandleFactory<>(writeHandle));
}
}

View File

@@ -65,14 +65,13 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMer
HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> upsertHandle) throws IOException {
final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
FlinkMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> mergeHandle =
(FlinkMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>) upsertHandle;
HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> mergeHandle = upsertHandle;
HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
final GenericDatumWriter<GenericRecord> gWriter;
final GenericDatumReader<GenericRecord> gReader;
Schema readSchema;
if (mergeHandle.isNeedBootStrap()
if (isNeedBootStrap(mergeHandle)
&& (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent())) {
readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
gWriter = new GenericDatumWriter<>(readSchema);
@@ -87,7 +86,7 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMer
HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
try {
final Iterator<GenericRecord> readerIterator;
if (mergeHandle.isNeedBootStrap() && baseFile.getBootstrapBaseFile().isPresent()) {
if (isNeedBootStrap(mergeHandle) && baseFile.getBootstrapBaseFile().isPresent()) {
readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
} else {
readerIterator = reader.getRecordIterator(readSchema);
@@ -116,4 +115,8 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMer
}
}
private static boolean isNeedBootStrap(HoodieMergeHandle<?, ?, ?, ?> mergeHandle) {
return mergeHandle instanceof FlinkMergeHandle && ((FlinkMergeHandle) mergeHandle).isNeedBootStrap();
}
}

View File

@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit.delta;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.FlinkLazyInsertIterable;
import org.apache.hudi.io.ExplicitWriteHandleFactory;
import org.apache.hudi.io.FlinkAppendHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
public abstract class BaseFlinkDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseFlinkCommitActionExecutor<T> {
public BaseFlinkDeltaCommitActionExecutor(HoodieEngineContext context,
FlinkAppendHandle<?, ?, ?, ?> writeHandle,
HoodieWriteConfig config,
HoodieTable table,
String instantTime,
WriteOperationType operationType) {
super(context, writeHandle, config, table, instantTime, operationType);
}
@Override
public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle;
if (!appendHandle.isNeedBootStrap()) {
appendHandle.appendNewRecords(recordItr);
}
appendHandle.doAppend();
List<WriteStatus> writeStatuses = appendHandle.close();
return Collections.singletonList(writeStatuses).iterator();
}
@Override
public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) {
return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, table,
idPfx, taskContextSupplier, new ExplicitWriteHandleFactory(writeHandle));
}
}

View File

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit.delta;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.FlinkAppendHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
import java.util.List;
public class FlinkUpsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseFlinkDeltaCommitActionExecutor<T> {
private List<HoodieRecord<T>> inputRecords;
public FlinkUpsertDeltaCommitActionExecutor(HoodieEngineContext context,
FlinkAppendHandle<?, ?, ?, ?> writeHandle,
HoodieWriteConfig config,
HoodieTable table,
String instantTime,
List<HoodieRecord<T>> inputRecords) {
super(context, writeHandle, config, table, instantTime, WriteOperationType.UPSERT);
this.inputRecords = inputRecords;
}
@Override
public HoodieWriteMetadata execute() {
return FlinkWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true);
}
}

View File

@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.table.HoodieTable;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
/**
* A flink implementation of {@link AbstractCompactHelpers}.
*
* @param <T>
*/
public class FlinkCompactHelpers<T extends HoodieRecordPayload> extends
AbstractCompactHelpers<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
private FlinkCompactHelpers() {
}
private static class CompactHelperHolder {
private static final FlinkCompactHelpers FLINK_COMPACT_HELPERS = new FlinkCompactHelpers();
}
public static FlinkCompactHelpers newInstance() {
return CompactHelperHolder.FLINK_COMPACT_HELPERS;
}
@Override
public HoodieCommitMetadata createCompactionMetadata(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String compactionInstantTime,
List<WriteStatus> writeStatuses,
String schema) throws IOException {
byte[] planBytes = table.getActiveTimeline().readCompactionPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get();
HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(planBytes);
List<HoodieWriteStat> updateStatusMap = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
org.apache.hudi.common.model.HoodieCommitMetadata metadata = new org.apache.hudi.common.model.HoodieCommitMetadata(true);
for (HoodieWriteStat stat : updateStatusMap) {
metadata.addWriteStat(stat.getPartitionPath(), stat);
}
metadata.addMetadata(org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY, schema);
if (compactionPlan.getExtraMetadata() != null) {
compactionPlan.getExtraMetadata().forEach(metadata::addMetadata);
}
return metadata;
}
}

View File

@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@SuppressWarnings("checkstyle:LineLength")
public class FlinkScheduleCompactionActionExecutor<T extends HoodieRecordPayload> extends
BaseScheduleCompactionActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(FlinkScheduleCompactionActionExecutor.class);
public FlinkScheduleCompactionActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String instantTime,
Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime, extraMetadata);
}
@Override
protected HoodieCompactionPlan scheduleCompaction() {
LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
// judge if we need to compact according to num delta commits and time elapsed
boolean compactable = needCompact(config.getInlineCompactTriggerStrategy());
if (compactable) {
LOG.info("Generating compaction plan for merge on read table " + config.getBasePath());
HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
try {
SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();
Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations()
.map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
.collect(Collectors.toSet());
// exclude files in pending clustering from compaction.
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
return compactor.generateCompactionPlan(context, table, config, instantTime, fgInPendingCompactionAndClustering);
} catch (IOException e) {
throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
}
}
return new HoodieCompactionPlan();
}
public Pair<Integer, String> getLatestDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline()
.filterCompletedInstants().lastInstant();
HoodieTimeline deltaCommits = table.getActiveTimeline().getDeltaCommitTimeline();
String latestInstantTs;
int deltaCommitsSinceLastCompaction = 0;
if (lastCompaction.isPresent()) {
latestInstantTs = lastCompaction.get().getTimestamp();
deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfter(latestInstantTs, Integer.MAX_VALUE).countInstants();
} else {
latestInstantTs = deltaCommits.firstInstant().get().getTimestamp();
deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfterOrEquals(latestInstantTs, Integer.MAX_VALUE).countInstants();
}
return Pair.of(deltaCommitsSinceLastCompaction, latestInstantTs);
}
public boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) {
boolean compactable;
// get deltaCommitsSinceLastCompaction and lastCompactionTs
Pair<Integer, String> latestDeltaCommitInfo = getLatestDeltaCommitInfo(compactionTriggerStrategy);
int inlineCompactDeltaCommitMax = config.getInlineCompactDeltaCommitMax();
int inlineCompactDeltaSecondsMax = config.getInlineCompactDeltaSecondsMax();
switch (compactionTriggerStrategy) {
case NUM_COMMITS:
compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft();
if (compactable) {
LOG.info(String.format("The delta commits >= %s, trigger compaction scheduler.", inlineCompactDeltaCommitMax));
}
break;
case TIME_ELAPSED:
compactable = inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight());
if (compactable) {
LOG.info(String.format("The elapsed time >=%ss, trigger compaction scheduler.", inlineCompactDeltaSecondsMax));
}
break;
case NUM_OR_TIME:
compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft()
|| inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight());
if (compactable) {
LOG.info(String.format("The delta commits >= %s or elapsed_time >=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax,
inlineCompactDeltaSecondsMax));
}
break;
case NUM_AND_TIME:
compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft()
&& inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight());
if (compactable) {
LOG.info(String.format("The delta commits >= %s and elapsed_time >=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax,
inlineCompactDeltaSecondsMax));
}
break;
default:
throw new HoodieCompactionException("Unsupported compaction trigger strategy: " + config.getInlineCompactTriggerStrategy());
}
return compactable;
}
public Long parsedToSeconds(String time) {
long timestamp;
try {
timestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(time).getTime() / 1000;
} catch (ParseException e) {
throw new HoodieCompactionException(e.getMessage(), e);
}
return timestamp;
}
}

View File

@@ -0,0 +1,235 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static java.util.stream.Collectors.toList;
/**
* Compacts a hoodie table with merge on read storage. Computes all possible compactions,
* passes it through a CompactionFilter and executes all the compactions and writes a new version of base files and make
* a normal commit.
*
* <p>Note: the compaction logic is invoked through the flink pipeline.
*/
@SuppressWarnings("checkstyle:LineLength")
public class HoodieFlinkMergeOnReadTableCompactor<T extends HoodieRecordPayload> implements HoodieCompactor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(HoodieFlinkMergeOnReadTableCompactor.class);
// Accumulator to keep track of total log files for a table
private AtomicLong totalLogFiles;
// Accumulator to keep track of total log file slices for a table
private AtomicLong totalFileSlices;
@Override
public List<WriteStatus> compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
throw new UnsupportedOperationException("HoodieFlinkMergeOnReadTableCompactor does not support compact directly, "
+ "the function works as a separate pipeline");
}
public List<WriteStatus> compact(HoodieFlinkCopyOnWriteTable hoodieCopyOnWriteTable,
HoodieTableMetaClient metaClient,
HoodieWriteConfig config,
CompactionOperation operation,
String instantTime) throws IOException {
FileSystem fs = metaClient.getFs();
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames()
+ " for commit " + instantTime);
// TODO - FIX THIS
// Reads the entire avro file. Always only specific blocks should be read from the avro file
// (failure recover).
// Load all the delta commits since the last compaction commit and get all the blocks to be
// loaded and load it using CompositeAvroLogReader
// Since a DeltaCommit is not defined yet, reading all the records. revisit this soon.
String maxInstantTime = metaClient
.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp();
// TODO(danny): make it configurable
long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new FlinkTaskContextSupplier(null), config.getProps());
LOG.info("MaxMemoryPerCompaction => " + maxMemoryPerCompaction);
List<String> logFiles = operation.getDeltaFileNames().stream().map(
p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString())
.collect(toList());
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(metaClient.getBasePath())
.withLogFilePaths(logFiles)
.withReaderSchema(readerSchema)
.withLatestInstantTime(maxInstantTime)
.withMaxMemorySizeInBytes(maxMemoryPerCompaction)
.withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
.withReverseReader(config.getCompactionReverseLogReadEnabled())
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.build();
if (!scanner.iterator().hasNext()) {
return new ArrayList<>();
}
Option<HoodieBaseFile> oldDataFileOpt =
operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());
// Compacting is very similar to applying updates to existing file
Iterator<List<WriteStatus>> result;
// If the dataFile is present, perform updates else perform inserts into a new base file.
if (oldDataFileOpt.isPresent()) {
result = hoodieCopyOnWriteTable.handleUpdate(instantTime, operation.getPartitionPath(),
operation.getFileId(), scanner.getRecords(),
oldDataFileOpt.get());
} else {
result = hoodieCopyOnWriteTable.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(),
scanner.getRecords());
}
Iterable<List<WriteStatus>> resultIterable = () -> result;
return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> {
s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles());
s.getStat().setTotalLogRecords(scanner.getTotalLogRecords());
s.getStat().setPartitionPath(operation.getPartitionPath());
s.getStat()
.setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks());
s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks());
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
s.getStat().setRuntimeStats(runtimeStats);
}).collect(toList());
}
@Override
public HoodieCompactionPlan generateCompactionPlan(HoodieEngineContext context,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable,
HoodieWriteConfig config, String compactionCommitTime,
Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering)
throws IOException {
totalLogFiles = new AtomicLong(0);
totalFileSlices = new AtomicLong(0);
ValidationUtils.checkArgument(hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ,
"Can only compact table of type " + HoodieTableType.MERGE_ON_READ + " and not "
+ hoodieTable.getMetaClient().getTableType().name());
// TODO : check if maxMemory is not greater than JVM or flink.executor memory
// TODO - rollback any compactions in flight
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime);
List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath());
// filter the partition paths if needed to reduce list status
partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths);
if (partitionPaths.isEmpty()) {
// In case no partitions could be picked, return no compaction plan
return null;
}
SliceView fileSystemView = hoodieTable.getSliceView();
LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
context.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact");
List<HoodieCompactionOperation> operations = context.flatMap(partitionPaths, partitionPath -> fileSystemView
.getLatestFileSlices(partitionPath)
.filter(slice -> !fgIdsInPendingCompactionAndClustering.contains(slice.getFileGroupId()))
.map(s -> {
List<HoodieLogFile> logFiles =
s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
totalLogFiles.addAndGet(logFiles.size());
totalFileSlices.addAndGet(1L);
// Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO
// for flink Map operations and collecting them finally in Avro generated classes for storing
// into meta files.
Option<HoodieBaseFile> dataFile = s.getBaseFile();
return new CompactionOperation(dataFile, partitionPath, logFiles,
config.getCompactionStrategy().captureMetrics(config, s));
})
.filter(c -> !c.getDeltaFileNames().isEmpty()), partitionPaths.size()).stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList());
LOG.info("Total of " + operations.size() + " compactions are retrieved");
LOG.info("Total number of latest files slices " + totalFileSlices.get());
LOG.info("Total number of log files " + totalLogFiles.get());
LOG.info("Total number of file slices " + totalFileSlices.get());
// Filter the compactions with the passed in filter. This lets us choose most effective
// compactions only
HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations,
CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()));
ValidationUtils.checkArgument(
compactionPlan.getOperations().stream().noneMatch(
op -> fgIdsInPendingCompactionAndClustering.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))),
"Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. "
+ "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering
+ ", Selected workload :" + compactionPlan);
if (compactionPlan.getOperations().isEmpty()) {
LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
}
return compactionPlan;
}
}