diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index 1f59bab26..4773ef157 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.List;

 public class KeyGenUtils {
@@ -41,6 +42,32 @@ public class KeyGenUtils {
   protected static final String DEFAULT_PARTITION_PATH = "default";
   protected static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";

+  /**
+   * Extracts the record key fields as strings from the given record key;
+   * this is the reverse operation of {@link #getRecordKey(GenericRecord, String)}.
+   *
+   * @see SimpleAvroKeyGenerator
+   * @see org.apache.hudi.keygen.ComplexAvroKeyGenerator
+   */
+  public static String[] extractRecordKeys(String recordKey) {
+    String[] fieldKV = recordKey.split(",");
+    if (fieldKV.length == 1) {
+      return fieldKV;
+    } else {
+      // a complex key
+      return Arrays.stream(fieldKV).map(kv -> {
+        final String[] kvArray = kv.split(":");
+        if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) {
+          return null;
+        } else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) {
+          return "";
+        } else {
+          return kvArray[1];
+        }
+      }).toArray(String[]::new);
+    }
+  }
+
   public static String getRecordKey(GenericRecord record, List<String> recordKeyFields) {
     boolean keyIsNullEmpty = true;
     StringBuilder recordKey = new StringBuilder();
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index de4d8ca65..6a6bcedbf 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -429,8 +429,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     HoodieFlinkTable<T> table = getHoodieTable();
     String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
     HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline();
-    activeTimeline.deletePending(HoodieInstant.State.INFLIGHT, commitType, instant);
-    activeTimeline.deletePending(HoodieInstant.State.REQUESTED, commitType, instant);
+    activeTimeline.deletePendingIfExists(HoodieInstant.State.INFLIGHT, commitType, instant);
+    activeTimeline.deletePendingIfExists(HoodieInstant.State.REQUESTED, commitType, instant);
   }

   public void transitionRequestedToInflight(String tableType, String inFlightInstant) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 25ea95e69..9ff50fe61 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -173,10 +173,10 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     deleteInstantFile(instant);
   }

-  public void deletePending(HoodieInstant.State state, String action, String instantStr) {
+  public void
deletePendingIfExists(HoodieInstant.State state, String action, String instantStr) { HoodieInstant instant = new HoodieInstant(state, action, instantStr); ValidationUtils.checkArgument(!instant.isCompleted()); - deleteInstantFile(instant); + deleteInstantFileIfExists(instant); } public void deleteCompactionRequested(HoodieInstant instant) { @@ -185,6 +185,25 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { deleteInstantFile(instant); } + private void deleteInstantFileIfExists(HoodieInstant instant) { + LOG.info("Deleting instant " + instant); + Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), instant.getFileName()); + try { + if (metaClient.getFs().exists(inFlightCommitFilePath)) { + boolean result = metaClient.getFs().delete(inFlightCommitFilePath, false); + if (result) { + LOG.info("Removed instant " + instant); + } else { + throw new HoodieIOException("Could not delete instant " + instant); + } + } else { + LOG.warn("The commit " + inFlightCommitFilePath + " to remove does not exist"); + } + } catch (IOException e) { + throw new HoodieIOException("Could not remove inflight commit " + inFlightCommitFilePath, e); + } + } + private void deleteInstantFile(HoodieInstant instant) { LOG.info("Deleting instant " + instant); Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), instant.getFileName()); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 53104fbc0..364d28ebb 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -18,11 +18,8 @@ package org.apache.hudi.sink; -import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.ObjectSizeCalculator; @@ -160,8 +157,8 @@ public class StreamWriteFunction @Override public void open(Configuration parameters) throws IOException { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); + this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); initBuffer(); - initWriteClient(); initWriteFunction(); } @@ -254,15 +251,6 @@ public class StreamWriteFunction this.addToBufferCondition = this.bufferLock.newCondition(); } - private void initWriteClient() { - HoodieFlinkEngineContext context = - new HoodieFlinkEngineContext( - new SerializableConfiguration(StreamerUtil.getHadoopConf()), - new FlinkTaskContextSupplier(getRuntimeContext())); - - writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(this.config)); - } - private void initWriteFunction() { final String writeOperation = this.config.get(FlinkOptions.OPERATION); switch (WriteOperationType.fromValue(writeOperation)) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index ad0661771..51149e2dc 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -18,11 +18,11 @@ 
 package org.apache.hudi.sink;

-import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
@@ -54,7 +54,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;

 import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists;
@@ -138,7 +137,7 @@ public class StreamWriteOperatorCoordinator
     // initialize event buffer
     reset();
     // writeClient
-    initWriteClient();
+    this.writeClient = StreamerUtil.createWriteClient(conf, null);
     // init table, create it if not exists.
     initTableIfNotExists(this.conf);
     // start a new instant
@@ -178,7 +177,10 @@ public class StreamWriteOperatorCoordinator
   @Override
   public void notifyCheckpointComplete(long checkpointId) {
     // start to commit the instant.
-    checkAndCommitWithRetry();
+    final String errorMsg = String.format("Instant [%s] has a complete checkpoint [%d],\n"
+        + "but the coordinator has not received full write success events,\n"
+        + "rolls back the instant and rethrow", this.instant, checkpointId);
+    checkAndForceCommit(errorMsg);
     // if async compaction is on, schedule the compaction
     if (needsScheduleCompaction) {
       writeClient.scheduleCompaction(Option.empty());
@@ -226,10 +228,14 @@ public class StreamWriteOperatorCoordinator
   @Override
   public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
     // no event to handle
-    Preconditions.checkState(operatorEvent instanceof BatchWriteSuccessEvent,
+    ValidationUtils.checkState(operatorEvent instanceof BatchWriteSuccessEvent,
         "The coordinator can only handle BatchWriteSuccessEvent");
     BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent;
-    Preconditions.checkState(event.getInstantTime().equals(this.instant),
+    // The write task does not block after checkpointing (and before it receives a checkpoint success event);
+    // if it checkpoints successfully and then flushes the data buffer again before this coordinator receives the
+    // checkpoint success event, the data buffer would be flushed with an older instant time.
+ ValidationUtils.checkState( + HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()), String.format("Receive an unexpected event for instant %s from task %d", event.getInstantTime(), event.getTaskID())); if (this.eventBuffer[event.getTaskID()] != null) { @@ -258,14 +264,6 @@ public class StreamWriteOperatorCoordinator // Utilities // ------------------------------------------------------------------------- - @SuppressWarnings("rawtypes") - private void initWriteClient() { - writeClient = new HoodieFlinkWriteClient( - new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)), - StreamerUtil.getHoodieClientConfig(this.conf), - true); - } - private void initHiveSync() { this.executor = new NonThrownExecutor(); this.hiveSyncContext = HiveSyncContext.create(conf); @@ -338,51 +336,6 @@ public class StreamWriteOperatorCoordinator doCommit(); } - @SuppressWarnings("unchecked") - private void checkAndCommitWithRetry() { - int retryTimes = this.conf.getInteger(FlinkOptions.RETRY_TIMES); - if (retryTimes < 0) { - retryTimes = 1; - } - long retryIntervalMillis = this.conf.getLong(FlinkOptions.RETRY_INTERVAL_MS); - int tryTimes = 0; - while (tryTimes++ < retryTimes) { - try { - if (!checkReady()) { - // Do not throw if the try times expires but the event buffer are still not ready, - // because we have a force check when next checkpoint starts. - if (tryTimes == retryTimes) { - // Throw if the try times expires but the event buffer are still not ready - throw new HoodieException("Try " + retryTimes + " to commit instant [" + this.instant + "] failed"); - } - sleepFor(retryIntervalMillis); - continue; - } - doCommit(); - return; - } catch (Throwable throwable) { - String cause = throwable.getCause() == null ? "" : throwable.getCause().toString(); - LOG.warn("Try to commit the instant {} failed, with times {} and cause {}", this.instant, tryTimes, cause); - if (tryTimes == retryTimes) { - throw new HoodieException("Not all write tasks finish the batch write to commit", throwable); - } - sleepFor(retryIntervalMillis); - } - } - } - - /** - * Sleep {@code intervalMillis} milliseconds in current thread. - */ - private void sleepFor(long intervalMillis) { - try { - TimeUnit.MILLISECONDS.sleep(intervalMillis); - } catch (InterruptedException e) { - LOG.error("Thread interrupted while waiting to retry the instant commits"); - throw new HoodieException(e); - } - } - /** Checks the buffer is ready to commit. 
 */
  private boolean checkReady() {
    return Arrays.stream(eventBuffer)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
index d0fa51baa..7f4f7b97b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
@@ -18,11 +18,8 @@
 package org.apache.hudi.sink.compact;

-import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
 import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
@@ -62,7 +59,7 @@ public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEvent, CompactionCommitEvent>
   @Override
   public void open(Configuration parameters) throws Exception {
-    initWriteClient();
+    this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
   }

-  private void initWriteClient() {
-    HoodieFlinkEngineContext context =
-        new HoodieFlinkEngineContext(
-            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
-            new FlinkTaskContextSupplier(getRuntimeContext()));
-
-    writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(conf));
-  }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 86529a124..86dae20d2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -19,11 +19,8 @@
 package org.apache.hudi.sink.compact;

 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -86,7 +83,7 @@ public class CompactionCommitSink extends CleanFunction {
   @Override
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
-    initWriteClient();
+    this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
     this.commitBuffer = new ArrayList<>();
   }

@@ -142,13 +139,4 @@ public class CompactionCommitSink extends CleanFunction {
     this.commitBuffer.clear();
     this.compactionInstantTime = null;
   }
-
-  private void initWriteClient() {
-    HoodieFlinkEngineContext context =
-        new HoodieFlinkEngineContext(
-            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
-            new FlinkTaskContextSupplier(getRuntimeContext()));
-
-    writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(this.conf));
-  }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index 6eebe9445..e48f4ed55 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -19,15 +19,14 @@
 package org.apache.hudi.sink.compact;

 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import
org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.StreamerUtil; @@ -37,6 +36,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.hadoop.fs.Path; import java.io.IOException; import java.util.List; @@ -74,7 +74,7 @@ public class CompactionPlanOperator extends AbstractStreamOperator> output) { this.output = output; } - - private void initWriteClient() { - HoodieFlinkEngineContext context = - new HoodieFlinkEngineContext( - new SerializableConfiguration(StreamerUtil.getHadoopConf()), - new FlinkTaskContextSupplier(getRuntimeContext())); - - writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(this.conf)); - } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java b/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java index dd40de2a8..186a470ea 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java @@ -35,7 +35,7 @@ public class BatchWriteSuccessEvent implements OperatorEvent { private List writeStatuses; private final int taskID; - private final String instantTime; + private String instantTime; private boolean isLastBatch; /** * Flag saying whether the event comes from the end of input, e.g. the source @@ -102,8 +102,9 @@ public class BatchWriteSuccessEvent implements OperatorEvent { * @param other The event to be merged */ public void mergeWith(BatchWriteSuccessEvent other) { - ValidationUtils.checkArgument(this.instantTime.equals(other.instantTime)); ValidationUtils.checkArgument(this.taskID == other.taskID); + // the instant time could be monotonically increasing + this.instantTime = other.instantTime; this.isLastBatch |= other.isLastBatch; // true if one of the event isLastBatch true. 
List statusList = new ArrayList<>(); statusList.addAll(this.writeStatuses); diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index f92e1a8b5..ada9e01cd 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -191,9 +191,9 @@ public class HoodieTableSource implements } else { InputFormatSourceFunction func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo); DataStreamSource source = execEnv.addSource(func, asSummaryString(), typeInfo); - return source.name("streaming_source") + return source.name("bounded_source") .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)) - .uid("uid_streaming_source"); + .uid("uid_bounded_source"); } } }; @@ -363,21 +363,26 @@ public class HoodieTableSource implements requiredRowType, tableAvroSchema.toString(), AvroSchemaConverter.convertToSchema(requiredRowType).toString(), - inputSplits); - return new MergeOnReadInputFormat( - this.conf, - FilePathUtils.toFlinkPaths(paths), - hoodieTableState, - rowDataType.getChildren(), // use the explicit fields data type because the AvroSchemaConverter is not very stable. - "default", - this.limit); + inputSplits, + conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",")); + return MergeOnReadInputFormat.builder() + .config(this.conf) + .paths(FilePathUtils.toFlinkPaths(paths)) + .tableState(hoodieTableState) + // use the explicit fields data type because the AvroSchemaConverter + // is not very stable. + .fieldTypes(rowDataType.getChildren()) + .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME)) + .limit(this.limit) + .emitDelete(isStreaming) + .build(); case COPY_ON_WRITE: FileInputFormat format = new CopyOnWriteInputFormat( FilePathUtils.toFlinkPaths(paths), this.schema.getFieldNames(), this.schema.getFieldDataTypes(), this.requiredPos, - "default", + this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME), this.limit == NO_LIMIT_CONSTANT ? 
Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
          getParquetConf(this.conf, this.hadoopConf),
          this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index a3a6f858a..12bebdf4d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader;
@@ -30,22 +31,28 @@ import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
 import org.apache.hudi.util.AvroToRowDataConverters;
 import org.apache.hudi.util.RowDataToAvroConverters;
 import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.util.StringToRowDataConverter;

 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;

 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -116,13 +123,20 @@ public class MergeOnReadInputFormat
    */
   private long currentReadCount = 0;

-  public MergeOnReadInputFormat(
+  /**
+   * Flag saying whether to emit the deletes. In streaming read mode, downstream
+   * operators need the delete messages to retract the legacy accumulator.
+   */
+  private boolean emitDelete;
+
+  private MergeOnReadInputFormat(
       Configuration conf,
       Path[] paths,
       MergeOnReadTableState tableState,
       List<DataType> fieldTypes,
       String defaultPartName,
-      long limit) {
+      long limit,
+      boolean emitDelete) {
     this.conf = conf;
     this.paths = paths;
     this.tableState = tableState;
@@ -133,6 +147,14 @@ public class MergeOnReadInputFormat
     // because we need to
     this.requiredPos = tableState.getRequiredPositions();
     this.limit = limit;
+    this.emitDelete = emitDelete;
+  }
+
+  /**
+   * Returns the builder for {@link MergeOnReadInputFormat}.
+   */
+  public static Builder builder() {
+    return new Builder();
   }

   @Override
@@ -177,7 +199,7 @@ public class MergeOnReadInputFormat
     if (filePath == null) {
       throw new IllegalArgumentException("File path was not specified in input format or configuration.");
     } else {
-      this.paths = new Path[] { new Path(filePath) };
+      this.paths = new Path[] {new Path(filePath)};
     }
   }
   // may support nested files in the future.
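Aside: the now-private constructor above is only reachable through the new Builder. A minimal usage sketch follows; the variables conf, paths, tableState and rowDataType are illustrative stand-ins for whatever the caller already holds (see HoodieTableSource#getInputFormat above and the tests below for the real call sites):

    // Hedged sketch: assembling the MOR input format via the new builder API.
    MergeOnReadInputFormat inputFormat = MergeOnReadInputFormat.builder()
        .config(conf)                      // Flink Configuration carrying the Hudi options
        .paths(FilePathUtils.toFlinkPaths(paths))
        .tableState(tableState)            // schemas, input splits and pk fields
        .fieldTypes(rowDataType.getChildren())
        .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
        .limit(1000L)                      // optional; the builder defaults to -1 (no limit)
        .emitDelete(true)                  // streaming read: emit DELETE rows so downstream can retract
        .build();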
@@ -269,6 +291,13 @@ public class MergeOnReadInputFormat
     final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords = FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
     final Iterator<String> logRecordsKeyIterator = logRecords.keySet().iterator();
+    final int[] pkOffset = tableState.getPkOffsetsInRequired();
+    // Flag saying whether the pk semantics has been dropped by user-specified projections.
+    // For example, if the pk fields are [a, b] but the user only selects a,
+    // then the pk semantics is lost.
+    final boolean pkSemanticLost = Arrays.stream(pkOffset).anyMatch(offset -> offset == -1);
+    final LogicalType[] pkTypes = pkSemanticLost ? null : tableState.getPkTypes(pkOffset);
+    final StringToRowDataConverter converter = pkSemanticLost ? null : new StringToRowDataConverter(pkTypes);

     return new Iterator<RowData>() {
       private RowData currentRecord;
@@ -278,14 +307,30 @@ public class MergeOnReadInputFormat
         if (logRecordsKeyIterator.hasNext()) {
           String curAvrokey = logRecordsKeyIterator.next();
           Option<IndexedRecord> curAvroRecord = null;
+          final HoodieRecord<?> hoodieRecord = logRecords.get(curAvrokey);
           try {
-            curAvroRecord = logRecords.get(curAvrokey).getData().getInsertValue(tableSchema);
+            curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
           } catch (IOException e) {
             throw new HoodieException("Get avro insert value error for key: " + curAvrokey, e);
           }
           if (!curAvroRecord.isPresent()) {
-            // delete record found, skipping
-            return hasNext();
+            if (emitDelete && !pkSemanticLost) {
+              GenericRowData delete = new GenericRowData(tableState.getRequiredRowType().getFieldCount());
+
+              final String recordKey = hoodieRecord.getRecordKey();
+              final String[] pkFields = KeyGenUtils.extractRecordKeys(recordKey);
+              final Object[] converted = converter.convert(pkFields);
+              for (int i = 0; i < pkOffset.length; i++) {
+                delete.setField(pkOffset[i], converted[i]);
+              }
+              delete.setRowKind(RowKind.DELETE);
+
+              this.currentRecord = delete;
+              return true;
+            } else {
+              // delete record found, skipping
+              return hasNext();
+            }
           } else {
             // should improve the code when log scanner supports
             // seeking by log blocks with commit time which is more
@@ -318,6 +363,10 @@ public class MergeOnReadInputFormat
     };
   }

+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+
   private interface RecordIterator {
     boolean reachedEnd() throws IOException;

@@ -521,4 +570,62 @@ public class MergeOnReadInputFormat
       return logRecords.get(curKey).getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
     }
   }
+
+  /**
+   * Builder for {@link MergeOnReadInputFormat}.
+ */ + public static class Builder { + private Configuration conf; + private Path[] paths; + private MergeOnReadTableState tableState; + private List fieldTypes; + private String defaultPartName; + private long limit = -1; + private boolean emitDelete = false; + + public Builder config(Configuration conf) { + this.conf = conf; + return this; + } + + public Builder paths(Path[] paths) { + this.paths = paths; + return this; + } + + public Builder tableState(MergeOnReadTableState tableState) { + this.tableState = tableState; + return this; + } + + public Builder fieldTypes(List fieldTypes) { + this.fieldTypes = fieldTypes; + return this; + } + + public Builder defaultPartName(String defaultPartName) { + this.defaultPartName = defaultPartName; + return this; + } + + public Builder limit(long limit) { + this.limit = limit; + return this; + } + + public Builder emitDelete(boolean emitDelete) { + this.emitDelete = emitDelete; + return this; + } + + public MergeOnReadInputFormat build() { + return new MergeOnReadInputFormat(conf, paths, tableState, + fieldTypes, defaultPartName, limit, emitDelete); + } + } + + @VisibleForTesting + public void isEmitDelete(boolean emitDelete) { + this.emitDelete = emitDelete; + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java index 7dedcdaa3..9a32af636 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java @@ -18,9 +18,11 @@ package org.apache.hudi.table.format.mor; +import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import java.io.Serializable; +import java.util.Arrays; import java.util.List; /** @@ -35,18 +37,21 @@ public class MergeOnReadTableState implements Serializable { private final String avroSchema; private final String requiredAvroSchema; private final List inputSplits; + private final String[] pkFields; public MergeOnReadTableState( RowType rowType, RowType requiredRowType, String avroSchema, String requiredAvroSchema, - List inputSplits) { + List inputSplits, + String[] pkFields) { this.rowType = rowType; this.requiredRowType = requiredRowType; this.avroSchema = avroSchema; this.requiredAvroSchema = requiredAvroSchema; this.inputSplits = inputSplits; + this.pkFields = pkFields; } public RowType getRowType() { @@ -76,4 +81,30 @@ public class MergeOnReadTableState implements Serializable { .mapToInt(i -> i) .toArray(); } + + /** + * Get the primary key positions in required row type. + */ + public int[] getPkOffsetsInRequired() { + final List fieldNames = requiredRowType.getFieldNames(); + return Arrays.stream(pkFields) + .map(fieldNames::indexOf) + .mapToInt(i -> i) + .toArray(); + } + + /** + * Returns the primary key fields logical type with given offsets. 
+   *
+   * @param pkOffsets the pk offsets in required row type
+   * @return pk field logical types
+   *
+   * @see #getPkOffsetsInRequired()
+   */
+  public LogicalType[] getPkTypes(int[] pkOffsets) {
+    final LogicalType[] requiredTypes = requiredRowType.getFields().stream()
+        .map(RowType.RowField::getType).toArray(LogicalType[]::new);
+    return Arrays.stream(pkOffsets).mapToObj(offset -> requiredTypes[offset])
+        .toArray(LogicalType[]::new);
+  }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java b/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
new file mode 100644
index 000000000..b30d6d6e7
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.util.Arrays;
+
+/**
+ * A converter that converts a string array into internal row data fields.
+ * The converter is designed to be stateful (not a pure stateless tool)
+ * in order to reuse the specific converters.
+ */ +@Internal +public class StringToRowDataConverter { + private final Converter[] converters; + + public StringToRowDataConverter(LogicalType[] fieldTypes) { + this.converters = Arrays.stream(fieldTypes) + .map(StringToRowDataConverter::getConverter) + .toArray(Converter[]::new); + } + + public Object[] convert(String[] fields) { + ValidationUtils.checkArgument(converters.length == fields.length, + "Field types and values should equal with number"); + + Object[] converted = new Object[fields.length]; + for (int i = 0; i < fields.length; i++) { + converted[i] = converters[i].convert(fields[i]); + } + return converted; + } + + private interface Converter { + Object convert(String field); + } + + private static Converter getConverter(LogicalType logicalType) { + switch (logicalType.getTypeRoot()) { + case NULL: + return field -> null; + case TINYINT: + return Byte::parseByte; + case SMALLINT: + return Short::parseShort; + case BOOLEAN: + return Boolean::parseBoolean; + case INTEGER: + case TIME_WITHOUT_TIME_ZONE: + return Integer::parseInt; + case BIGINT: + return Long::parseLong; + case FLOAT: + return Float::parseFloat; + case DOUBLE: + return Double::parseDouble; + case DATE: + // see HoodieAvroUtils#convertValueForAvroLogicalTypes + return field -> (int) LocalDate.parse(field).toEpochDay(); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return field -> TimestampData.fromEpochMillis(Long.parseLong(field)); + case CHAR: + case VARCHAR: + return StringData::fromString; + case BINARY: + case VARBINARY: + return field -> field.getBytes(StandardCharsets.UTF_8); + case DECIMAL: + DecimalType decimalType = (DecimalType) logicalType; + return field -> + DecimalData.fromBigDecimal( + new BigDecimal(field), + decimalType.getPrecision(), + decimalType.getScale()); + default: + throw new UnsupportedOperationException( + "Unsupported type " + logicalType.getTypeRoot() + " for " + StringToRowDataConverter.class.getName()); + } + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index 90874f1e6..321abfad7 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -137,7 +137,7 @@ public class TestStreamWriteOperatorCoordinator { } @Test - public void testCheckpointCompleteWithRetry() { + public void testCheckpointCompleteWithException() { final CompletableFuture future = new CompletableFuture<>(); coordinator.checkpointCoordinator(1, future); String inflightInstant = coordinator.getInstant(); @@ -149,7 +149,9 @@ public class TestStreamWriteOperatorCoordinator { coordinator.handleEventFromOperator(0, event); assertThrows(HoodieException.class, () -> coordinator.notifyCheckpointComplete(1), - "Try 3 to commit instant"); + "org.apache.hudi.exception.HoodieException: Instant [20210330153432] has a complete checkpoint [1],\n" + + "but the coordinator has not received full write success events,\n" + + "rolls back the instant and rethrow"); } @Test diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java index aaeed94e2..4abc79acd 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java @@ -260,15 +260,17 @@ public class 
TestStreamReadOperator { TestConfigurations.ROW_TYPE, tableAvroSchema.toString(), AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(), - Collections.emptyList()); + Collections.emptyList(), + new String[0]); Path[] paths = FilePathUtils.getReadPaths(new Path(basePath), conf, hadoopConf, partitionKeys); - MergeOnReadInputFormat inputFormat = new MergeOnReadInputFormat( - conf, - FilePathUtils.toFlinkPaths(paths), - hoodieTableState, - rowDataType.getChildren(), - "default", - 1000L); + MergeOnReadInputFormat inputFormat = MergeOnReadInputFormat.builder() + .config(conf) + .paths(FilePathUtils.toFlinkPaths(paths)) + .tableState(hoodieTableState) + .fieldTypes(rowDataType.getChildren()) + .defaultPartName("default").limit(1000L) + .emitDelete(true) + .build(); OneInputStreamOperatorFactory factory = StreamReadOperator.factory(inputFormat); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>( diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 218c7420f..ffb985915 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -208,6 +208,41 @@ public class HoodieDataSourceITCase extends AbstractTestBase { "some commits should be cleaned"); } + @Test + void testStreamReadWithDeletes() throws Exception { + // create filesystem table named source + + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setString(FlinkOptions.TABLE_NAME, "t1"); + conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + + // write one commit + TestData.writeData(TestData.DATA_SET_INSERT, conf); + // write another commit with deletes + TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf); + + String latestCommit = StreamerUtil.createWriteClient(conf, null) + .getLastCompletedInstant(FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + + Map options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); + options.put(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.key(), "2"); + options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), latestCommit); + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); + streamTableEnv.executeSql(hoodieTableDDL); + + List result = execSelectSql(streamTableEnv, "select * from t1", 10); + final String expected = "[" + + "id1,Danny,24,1970-01-01T00:00:00.001,par1, " + + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, " + + "id3,null,null,null,null, " + + "id5,null,null,null,null, " + + "id9,null,null,null,null]"; + assertRowsEquals(result, expected); + } + @ParameterizedTest @EnumSource(value = ExecMode.class) void testWriteAndRead(ExecMode execMode) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index ae15abab3..5e324de67 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -21,6 +21,7 @@ package org.apache.hudi.table.format; import org.apache.hudi.common.model.HoodieTableType; import 
org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.table.HoodieTableSource; +import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; @@ -43,6 +44,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -151,6 +153,29 @@ public class TestInputFormat { assertThat(actual, is(expected)); } + @Test + void testReadWithDeletes() throws Exception { + beforeEach(HoodieTableType.MERGE_ON_READ); + + // write another commit to read again + TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf); + + InputFormat inputFormat = this.tableSource.getInputFormat(); + assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class)); + ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true); + + List result = readData(inputFormat); + + final String actual = TestData.rowDataToString(result); + final String expected = "[" + + "id1,Danny,24,1970-01-01T00:00:00.001,par1, " + + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, " + + "id3,null,null,null,null, " + + "id5,null,null,null,null, " + + "id9,null,null,null,null]"; + assertThat(actual, is(expected)); + } + @ParameterizedTest @EnumSource(value = HoodieTableType.class) void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception { diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index f8d4e7d6a..2201aeb37 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -224,6 +224,10 @@ public class TestData { funcWrapper.close(); } + private static String toStringSafely(Object obj) { + return obj == null ? "null" : obj.toString(); + } + /** * Sort the {@code rows} using field at index 0 and asserts * it equals with the expected string {@code expected}. @@ -233,7 +237,7 @@ public class TestData { */ public static void assertRowsEquals(List rows, String expected) { String rowsString = rows.stream() - .sorted(Comparator.comparing(o -> o.getField(0).toString())) + .sorted(Comparator.comparing(o -> toStringSafely(o.getField(0)))) .collect(Collectors.toList()).toString(); assertThat(rowsString, is(expected)); } @@ -247,7 +251,7 @@ public class TestData { */ public static void assertRowsEquals(List rows, List expected) { String rowsString = rows.stream() - .sorted(Comparator.comparing(o -> o.getField(0).toString())) + .sorted(Comparator.comparing(o -> toStringSafely(o.getField(0)))) .collect(Collectors.toList()).toString(); assertThat(rowsString, is(rowDataToString(expected))); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java new file mode 100644 index 000000000..dde19b848 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utils; + +import org.apache.hudi.keygen.KeyGenUtils; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.RowDataToAvroConverters; +import org.apache.hudi.util.StringToRowDataConverter; + +import org.apache.avro.generic.GenericRecord; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.temporal.ChronoField; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +/** + * Test cases for {@link StringToRowDataConverter}. + */ +public class TestStringToRowDataConverter { + @Test + void testConvert() { + String[] fields = new String[] {"1.1", "3.4", "2021-03-30", "56669000", "1617119069000", "12345.67"}; + LogicalType[] fieldTypes = new LogicalType[] { + DataTypes.FLOAT().getLogicalType(), + DataTypes.DOUBLE().getLogicalType(), + DataTypes.DATE().getLogicalType(), + DataTypes.TIME(3).getLogicalType(), + DataTypes.TIMESTAMP().getLogicalType(), + DataTypes.DECIMAL(7, 2).getLogicalType() + }; + StringToRowDataConverter converter = new StringToRowDataConverter(fieldTypes); + Object[] converted = converter.convert(fields); + Object[] expected = new Object[] { + 1.1f, 3.4D, (int) LocalDate.parse("2021-03-30").toEpochDay(), + LocalTime.parse("15:44:29").get(ChronoField.MILLI_OF_DAY), + TimestampData.fromEpochMillis(Instant.parse("2021-03-30T15:44:29Z").toEpochMilli()), + DecimalData.fromBigDecimal(new BigDecimal("12345.67"), 7, 2) + }; + assertArrayEquals(expected, converted); + } + + @Test + void testRowDataToAvroStringToRowData() { + GenericRowData rowData = new GenericRowData(6); + rowData.setField(0, 1.1f); + rowData.setField(1, 3.4D); + rowData.setField(2, (int) LocalDate.parse("2021-03-30").toEpochDay()); + rowData.setField(3, LocalTime.parse("15:44:29").get(ChronoField.MILLI_OF_DAY)); + rowData.setField(4, TimestampData.fromEpochMillis(Instant.parse("2021-03-30T15:44:29Z").toEpochMilli())); + rowData.setField(5, DecimalData.fromBigDecimal(new BigDecimal("12345.67"), 7, 2)); + + DataType dataType = DataTypes.ROW( + DataTypes.FIELD("f_float", DataTypes.FLOAT()), + DataTypes.FIELD("f_double", DataTypes.DOUBLE()), + DataTypes.FIELD("f_date", DataTypes.DATE()), + DataTypes.FIELD("f_time", DataTypes.TIME(3)), + DataTypes.FIELD("f_timestamp", DataTypes.TIMESTAMP(3)), + DataTypes.FIELD("f_decimal", DataTypes.DECIMAL(7, 2)) + ); + RowType rowType = (RowType) dataType.getLogicalType(); + 
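+    // Round trip under test: RowData -> Avro record (RowDataToAvroConverters)
+    // -> record key string (KeyGenUtils.getRecordKey) -> string fields
+    // (KeyGenUtils.extractRecordKeys) -> RowData fields (StringToRowDataConverter);
+    // the reassembled row must equal the original.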
RowDataToAvroConverters.RowDataToAvroConverter converter = + RowDataToAvroConverters.createConverter(rowType); + GenericRecord avroRecord = + (GenericRecord) converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData); + StringToRowDataConverter stringToRowDataConverter = + new StringToRowDataConverter(rowType.getChildren().toArray(new LogicalType[0])); + final String recordKey = KeyGenUtils.getRecordKey(avroRecord, rowType.getFieldNames()); + final String[] recordKeys = KeyGenUtils.extractRecordKeys(recordKey); + Object[] convertedKeys = stringToRowDataConverter.convert(recordKeys); + + GenericRowData converted = new GenericRowData(6); + for (int i = 0; i < 6; i++) { + converted.setField(i, convertedKeys[i]); + } + assertThat(converted, is(rowData)); + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java index b8d7de6d8..68e183b87 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java @@ -143,14 +143,9 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory { @Override public void invoke(RowData value, SinkFunction.Context context) { - if (value.getRowKind() == RowKind.INSERT) { - Row row = (Row) converter.toExternal(value); - assert row != null; - RESULT.get(taskID).add(row); - } else { - throw new RuntimeException( - "CollectSinkFunction received " + value.getRowKind() + " messages."); - } + Row row = (Row) converter.toExternal(value); + assert row != null; + RESULT.get(taskID).add(row); } @Override
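To summarize the key-string format the delete path depends on, here is a small sketch of the extractRecordKeys round trip. The literal placeholder values __null__ and __empty__ are assumptions standing in for KeyGenUtils' NULL_RECORDKEY_PLACEHOLDER and EMPTY_RECORDKEY_PLACEHOLDER; check the constants in your Hudi version:

    // Simple key: a single value, returned as-is.
    String[] simple = KeyGenUtils.extractRecordKeys("id1");   // ["id1"]

    // Complex key: "field:value" pairs joined by commas, with placeholder
    // tokens for null/empty values (placeholder literals assumed here).
    String[] complex = KeyGenUtils.extractRecordKeys("uuid:id3,name:__empty__,age:__null__");
    // -> ["id3", "", null]

This is how the MOR delete rows above recover their pk fields, which then surface as the "id3,null,null,null,null" style rows asserted in the tests.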