[HUDI-1738] Emit deletes for flink MOR table streaming read (#2742)
Currently we do a soft delete for DELETE row data when writing into a hoodie table. For streaming read of a MOR table, the Flink reader now detects the delete records and still emits them, as long as the record key semantics are kept. This is useful, and actually a must, for incremental computation in streaming ETL pipelines.
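As a usage sketch (the table name and path are illustrative; the helpers mirror the new ITCase below), a downstream job enables the behavior simply by turning on streaming read for a MERGE_ON_READ table:

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), "/tmp/hoodie/t1"); // hypothetical base path
    options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
    streamTableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", options));
    // Delete records in the log files now arrive downstream as RowKind.DELETE rows
    // (with only the record key fields populated), so accumulators can be retracted.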
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.List;
 
 public class KeyGenUtils {
@@ -41,6 +42,32 @@ public class KeyGenUtils {
   protected static final String DEFAULT_PARTITION_PATH = "default";
   protected static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
 
+  /**
+   * Extracts the record key fields in strings out of the given record key;
+   * this is the reverse operation of {@link #getRecordKey(GenericRecord, String)}.
+   *
+   * @see SimpleAvroKeyGenerator
+   * @see org.apache.hudi.keygen.ComplexAvroKeyGenerator
+   */
+  public static String[] extractRecordKeys(String recordKey) {
+    String[] fieldKV = recordKey.split(",");
+    if (fieldKV.length == 1) {
+      return fieldKV;
+    } else {
+      // a complex key
+      return Arrays.stream(fieldKV).map(kv -> {
+        final String[] kvArray = kv.split(":");
+        if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) {
+          return null;
+        } else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) {
+          return "";
+        } else {
+          return kvArray[1];
+        }
+      }).toArray(String[]::new);
+    }
+  }
+
   public static String getRecordKey(GenericRecord record, List<String> recordKeyFields) {
     boolean keyIsNullEmpty = true;
     StringBuilder recordKey = new StringBuilder();

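A minimal sketch of the intended round-trip for the new extractRecordKeys helper (key values are illustrative; as the code above shows, the null/empty placeholders map back to null and the empty string):

    // Simple key: returned as-is.
    String[] simple = KeyGenUtils.extractRecordKeys("id1");             // ["id1"]
    // Complex key "field:value,...": only the values are extracted.
    String[] complex = KeyGenUtils.extractRecordKeys("id:id1,name:n1"); // ["id1", "n1"]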
@@ -429,8 +429,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     HoodieFlinkTable<T> table = getHoodieTable();
     String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
     HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline();
-    activeTimeline.deletePending(HoodieInstant.State.INFLIGHT, commitType, instant);
-    activeTimeline.deletePending(HoodieInstant.State.REQUESTED, commitType, instant);
+    activeTimeline.deletePendingIfExists(HoodieInstant.State.INFLIGHT, commitType, instant);
+    activeTimeline.deletePendingIfExists(HoodieInstant.State.REQUESTED, commitType, instant);
   }
 
   public void transitionRequestedToInflight(String tableType, String inFlightInstant) {

@@ -173,10 +173,10 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     deleteInstantFile(instant);
   }
 
-  public void deletePending(HoodieInstant.State state, String action, String instantStr) {
+  public void deletePendingIfExists(HoodieInstant.State state, String action, String instantStr) {
     HoodieInstant instant = new HoodieInstant(state, action, instantStr);
     ValidationUtils.checkArgument(!instant.isCompleted());
-    deleteInstantFile(instant);
+    deleteInstantFileIfExists(instant);
   }
 
   public void deleteCompactionRequested(HoodieInstant instant) {
@@ -185,6 +185,25 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     deleteInstantFile(instant);
   }
 
+  private void deleteInstantFileIfExists(HoodieInstant instant) {
+    LOG.info("Deleting instant " + instant);
+    Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), instant.getFileName());
+    try {
+      if (metaClient.getFs().exists(inFlightCommitFilePath)) {
+        boolean result = metaClient.getFs().delete(inFlightCommitFilePath, false);
+        if (result) {
+          LOG.info("Removed instant " + instant);
+        } else {
+          throw new HoodieIOException("Could not delete instant " + instant);
+        }
+      } else {
+        LOG.warn("The commit " + inFlightCommitFilePath + " to remove does not exist");
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Could not remove inflight commit " + inFlightCommitFilePath, e);
+    }
+  }
+
   private void deleteInstantFile(HoodieInstant instant) {
     LOG.info("Deleting instant " + instant);
     Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), instant.getFileName());

@@ -18,11 +18,8 @@
 
 package org.apache.hudi.sink;
 
-import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.ObjectSizeCalculator;
@@ -160,8 +157,8 @@ public class StreamWriteFunction<K, I, O>
   @Override
   public void open(Configuration parameters) throws IOException {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+    this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
     initBuffer();
-    initWriteClient();
     initWriteFunction();
   }
 
@@ -254,15 +251,6 @@ public class StreamWriteFunction<K, I, O>
     this.addToBufferCondition = this.bufferLock.newCondition();
   }
 
-  private void initWriteClient() {
-    HoodieFlinkEngineContext context =
-        new HoodieFlinkEngineContext(
-            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
-            new FlinkTaskContextSupplier(getRuntimeContext()));
-
-    writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(this.config));
-  }
-
   private void initWriteFunction() {
     final String writeOperation = this.config.get(FlinkOptions.OPERATION);
     switch (WriteOperationType.fromValue(writeOperation)) {

@@ -18,11 +18,11 @@
 
 package org.apache.hudi.sink;
 
-import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
@@ -54,7 +54,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists;
@@ -138,7 +137,7 @@ public class StreamWriteOperatorCoordinator
     // initialize event buffer
     reset();
     // writeClient
-    initWriteClient();
+    this.writeClient = StreamerUtil.createWriteClient(conf, null);
     // init table, create it if not exists.
     initTableIfNotExists(this.conf);
     // start a new instant
@@ -178,7 +177,10 @@ public class StreamWriteOperatorCoordinator
   @Override
   public void notifyCheckpointComplete(long checkpointId) {
     // start to commit the instant.
-    checkAndCommitWithRetry();
+    final String errorMsg = String.format("Instant [%s] has a complete checkpoint [%d],\n"
+        + "but the coordinator has not received full write success events,\n"
+        + "rolls back the instant and rethrow", this.instant, checkpointId);
+    checkAndForceCommit(errorMsg);
     // if async compaction is on, schedule the compaction
     if (needsScheduleCompaction) {
       writeClient.scheduleCompaction(Option.empty());
@@ -226,10 +228,14 @@ public class StreamWriteOperatorCoordinator
   @Override
   public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
     // no event to handle
-    Preconditions.checkState(operatorEvent instanceof BatchWriteSuccessEvent,
+    ValidationUtils.checkState(operatorEvent instanceof BatchWriteSuccessEvent,
         "The coordinator can only handle BatchWriteSuccessEvent");
     BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent;
-    Preconditions.checkState(event.getInstantTime().equals(this.instant),
+    // the write task does not block after checkpointing (and before it receives a checkpoint success event);
+    // if its checkpoint succeeds and it then flushes the data buffer again before this coordinator receives
+    // the checkpoint success event, the data buffer would flush with an older instant time.
+    ValidationUtils.checkState(
+        HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()),
         String.format("Receive an unexpected event for instant %s from task %d",
             event.getInstantTime(), event.getTaskID()));
     if (this.eventBuffer[event.getTaskID()] != null) {
@@ -258,14 +264,6 @@ public class StreamWriteOperatorCoordinator
   //  Utilities
   // -------------------------------------------------------------------------
 
-  @SuppressWarnings("rawtypes")
-  private void initWriteClient() {
-    writeClient = new HoodieFlinkWriteClient(
-        new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)),
-        StreamerUtil.getHoodieClientConfig(this.conf),
-        true);
-  }
-
   private void initHiveSync() {
     this.executor = new NonThrownExecutor();
     this.hiveSyncContext = HiveSyncContext.create(conf);
@@ -338,51 +336,6 @@ public class StreamWriteOperatorCoordinator
     doCommit();
   }
 
-  @SuppressWarnings("unchecked")
-  private void checkAndCommitWithRetry() {
-    int retryTimes = this.conf.getInteger(FlinkOptions.RETRY_TIMES);
-    if (retryTimes < 0) {
-      retryTimes = 1;
-    }
-    long retryIntervalMillis = this.conf.getLong(FlinkOptions.RETRY_INTERVAL_MS);
-    int tryTimes = 0;
-    while (tryTimes++ < retryTimes) {
-      try {
-        if (!checkReady()) {
-          // Do not throw if the try times expires but the event buffer are still not ready,
-          // because we have a force check when next checkpoint starts.
-          if (tryTimes == retryTimes) {
-            // Throw if the try times expires but the event buffer are still not ready
-            throw new HoodieException("Try " + retryTimes + " to commit instant [" + this.instant + "] failed");
-          }
-          sleepFor(retryIntervalMillis);
-          continue;
-        }
-        doCommit();
-        return;
-      } catch (Throwable throwable) {
-        String cause = throwable.getCause() == null ? "" : throwable.getCause().toString();
-        LOG.warn("Try to commit the instant {} failed, with times {} and cause {}", this.instant, tryTimes, cause);
-        if (tryTimes == retryTimes) {
-          throw new HoodieException("Not all write tasks finish the batch write to commit", throwable);
-        }
-        sleepFor(retryIntervalMillis);
-      }
-    }
-  }
-
-  /**
-   * Sleep {@code intervalMillis} milliseconds in current thread.
-   */
-  private void sleepFor(long intervalMillis) {
-    try {
-      TimeUnit.MILLISECONDS.sleep(intervalMillis);
-    } catch (InterruptedException e) {
-      LOG.error("Thread interrupted while waiting to retry the instant commits");
-      throw new HoodieException(e);
-    }
-  }
-
   /** Checks the buffer is ready to commit. */
   private boolean checkReady() {
     return Arrays.stream(eventBuffer)

@@ -18,11 +18,8 @@
 
 package org.apache.hudi.sink.compact;
 
-import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
 import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
@@ -62,7 +59,7 @@ public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEv
   @Override
   public void open(Configuration parameters) throws Exception {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
-    initWriteClient();
+    this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
   }
 
   @Override
@@ -82,13 +79,4 @@ public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEv
         instantTime);
     collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID));
   }
-
-  private void initWriteClient() {
-    HoodieFlinkEngineContext context =
-        new HoodieFlinkEngineContext(
-            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
-            new FlinkTaskContextSupplier(getRuntimeContext()));
-
-    writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(conf));
-  }
 }

@@ -19,11 +19,8 @@
 package org.apache.hudi.sink.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -86,7 +83,7 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
   @Override
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
-    initWriteClient();
+    this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
     this.commitBuffer = new ArrayList<>();
   }
 
@@ -142,13 +139,4 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
     this.commitBuffer.clear();
     this.compactionInstantTime = null;
   }
-
-  private void initWriteClient() {
-    HoodieFlinkEngineContext context =
-        new HoodieFlinkEngineContext(
-            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
-            new FlinkTaskContextSupplier(getRuntimeContext()));
-
-    writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(this.conf));
-  }
 }

@@ -19,15 +19,14 @@
 package org.apache.hudi.sink.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -37,6 +36,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.List;
@@ -74,7 +74,7 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
   @Override
   public void open() throws Exception {
     super.open();
-    initWriteClient();
+    this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
   }
 
   @Override
@@ -113,8 +113,16 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
     HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
     HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
     if (!pendingCompactionTimeline.containsInstant(instant)) {
-      throw new IllegalStateException(
-          "No Compaction request available at " + compactionInstantTime + " to run compaction");
+      // this means that the compaction plan was written to the auxiliary path (.tmp)
+      // but not the meta path (.hoodie), which usually happens when the job crashes
+      // abnormally.
+
+      // clean the compaction plan in the auxiliary path and cancel the compaction.
+
+      LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
+          + "Clean the compaction plan in auxiliary path and cancels the compaction");
+      cleanInstant(table.getMetaClient(), instant);
+      return;
     }
 
     // Mark instant as compaction inflight
@@ -130,17 +138,24 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
     }
   }
 
+  private void cleanInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) {
+    Path commitFilePath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());
+    try {
+      if (metaClient.getFs().exists(commitFilePath)) {
+        boolean deleted = metaClient.getFs().delete(commitFilePath, false);
+        if (deleted) {
+          LOG.info("Removed instant " + instant);
+        } else {
+          throw new HoodieIOException("Could not delete instant " + instant);
+        }
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Could not remove requested commit " + commitFilePath, e);
+    }
+  }
+
   @VisibleForTesting
   public void setOutput(Output<StreamRecord<CompactionPlanEvent>> output) {
     this.output = output;
   }
-
-  private void initWriteClient() {
-    HoodieFlinkEngineContext context =
-        new HoodieFlinkEngineContext(
-            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
-            new FlinkTaskContextSupplier(getRuntimeContext()));
-
-    writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(this.conf));
-  }
 }

@@ -35,7 +35,7 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
 
   private List<WriteStatus> writeStatuses;
   private final int taskID;
-  private final String instantTime;
+  private String instantTime;
   private boolean isLastBatch;
   /**
    * Flag saying whether the event comes from the end of input, e.g. the source
@@ -102,8 +102,9 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
    * @param other The event to be merged
    */
   public void mergeWith(BatchWriteSuccessEvent other) {
-    ValidationUtils.checkArgument(this.instantTime.equals(other.instantTime));
     ValidationUtils.checkArgument(this.taskID == other.taskID);
+    // the instant time could be monotonically increasing
+    this.instantTime = other.instantTime;
     this.isLastBatch |= other.isLastBatch; // true if one of the events has isLastBatch set.
     List<WriteStatus> statusList = new ArrayList<>();
     statusList.addAll(this.writeStatuses);

@@ -191,9 +191,9 @@ public class HoodieTableSource implements
       } else {
         InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
         DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo);
-        return source.name("streaming_source")
+        return source.name("bounded_source")
             .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS))
-            .uid("uid_streaming_source");
+            .uid("uid_bounded_source");
       }
     }
   };
@@ -363,21 +363,26 @@ public class HoodieTableSource implements
             requiredRowType,
             tableAvroSchema.toString(),
             AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
-            inputSplits);
-        return new MergeOnReadInputFormat(
-            this.conf,
-            FilePathUtils.toFlinkPaths(paths),
-            hoodieTableState,
-            rowDataType.getChildren(), // use the explicit fields data type because the AvroSchemaConverter is not very stable.
-            "default",
-            this.limit);
+            inputSplits,
+            conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
+        return MergeOnReadInputFormat.builder()
+            .config(this.conf)
+            .paths(FilePathUtils.toFlinkPaths(paths))
+            .tableState(hoodieTableState)
+            // use the explicit fields' data type because the AvroSchemaConverter
+            // is not very stable.
+            .fieldTypes(rowDataType.getChildren())
+            .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
+            .limit(this.limit)
+            .emitDelete(isStreaming)
+            .build();
       case COPY_ON_WRITE:
         FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
             FilePathUtils.toFlinkPaths(paths),
             this.schema.getFieldNames(),
            this.schema.getFieldDataTypes(),
            this.requiredPos,
-            "default",
+            this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
            this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
            getParquetConf(this.conf, this.hadoopConf),
            this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)

@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader;
@@ -30,22 +31,28 @@ import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
 import org.apache.hudi.util.AvroToRowDataConverters;
 import org.apache.hudi.util.RowDataToAvroConverters;
 import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.util.StringToRowDataConverter;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -116,13 +123,20 @@ public class MergeOnReadInputFormat
    */
   private long currentReadCount = 0;
 
-  public MergeOnReadInputFormat(
+  /**
+   * Flag saying whether to emit the deletes. In streaming read mode, downstream
+   * operators need the delete messages to retract the legacy accumulator.
+   */
+  private boolean emitDelete;
+
+  private MergeOnReadInputFormat(
       Configuration conf,
       Path[] paths,
       MergeOnReadTableState tableState,
       List<DataType> fieldTypes,
       String defaultPartName,
-      long limit) {
+      long limit,
+      boolean emitDelete) {
     this.conf = conf;
     this.paths = paths;
     this.tableState = tableState;
@@ -133,6 +147,14 @@ public class MergeOnReadInputFormat
     // because we need to
     this.requiredPos = tableState.getRequiredPositions();
     this.limit = limit;
+    this.emitDelete = emitDelete;
+  }
+
+  /**
+   * Returns the builder for {@link MergeOnReadInputFormat}.
+   */
+  public static Builder builder() {
+    return new Builder();
   }
 
   @Override
@@ -177,7 +199,7 @@ public class MergeOnReadInputFormat
       if (filePath == null) {
         throw new IllegalArgumentException("File path was not specified in input format or configuration.");
       } else {
-        this.paths = new Path[] { new Path(filePath) };
+        this.paths = new Path[] {new Path(filePath)};
       }
     }
    // may support nested files in the future.
@@ -269,6 +291,13 @@ public class MergeOnReadInputFormat
     final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords =
         FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
     final Iterator<String> logRecordsKeyIterator = logRecords.keySet().iterator();
+    final int[] pkOffset = tableState.getPkOffsetsInRequired();
+    // flag saying whether the pk semantics have been dropped by user-specified
+    // projections. E.g., if the pk fields are [a, b] but the user only selects a,
+    // then the pk semantics are lost.
+    final boolean pkSemanticLost = Arrays.stream(pkOffset).anyMatch(offset -> offset == -1);
+    final LogicalType[] pkTypes = pkSemanticLost ? null : tableState.getPkTypes(pkOffset);
+    final StringToRowDataConverter converter = pkSemanticLost ? null : new StringToRowDataConverter(pkTypes);
 
     return new Iterator<RowData>() {
       private RowData currentRecord;
@@ -278,14 +307,30 @@ public class MergeOnReadInputFormat
         if (logRecordsKeyIterator.hasNext()) {
           String curAvrokey = logRecordsKeyIterator.next();
           Option<IndexedRecord> curAvroRecord = null;
+          final HoodieRecord<?> hoodieRecord = logRecords.get(curAvrokey);
           try {
-            curAvroRecord = logRecords.get(curAvrokey).getData().getInsertValue(tableSchema);
+            curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
           } catch (IOException e) {
             throw new HoodieException("Get avro insert value error for key: " + curAvrokey, e);
           }
           if (!curAvroRecord.isPresent()) {
-            // delete record found, skipping
-            return hasNext();
+            if (emitDelete && !pkSemanticLost) {
+              GenericRowData delete = new GenericRowData(tableState.getRequiredRowType().getFieldCount());
+
+              final String recordKey = hoodieRecord.getRecordKey();
+              final String[] pkFields = KeyGenUtils.extractRecordKeys(recordKey);
+              final Object[] converted = converter.convert(pkFields);
+              for (int i = 0; i < pkOffset.length; i++) {
+                delete.setField(pkOffset[i], converted[i]);
+              }
+              delete.setRowKind(RowKind.DELETE);
+
+              this.currentRecord = delete;
+              return true;
+            } else {
+              // delete record found, skipping
+              return hasNext();
+            }
           } else {
             // should improve the code when log scanner supports
             // seeking by log blocks with commit time which is more
@@ -318,6 +363,10 @@ public class MergeOnReadInputFormat
     };
   }
 
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+
   private interface RecordIterator {
     boolean reachedEnd() throws IOException;
 
@@ -521,4 +570,62 @@ public class MergeOnReadInputFormat
       return logRecords.get(curKey).getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
     }
   }
+
+  /**
+   * Builder for {@link MergeOnReadInputFormat}.
+   */
+  public static class Builder {
+    private Configuration conf;
+    private Path[] paths;
+    private MergeOnReadTableState tableState;
+    private List<DataType> fieldTypes;
+    private String defaultPartName;
+    private long limit = -1;
+    private boolean emitDelete = false;
+
+    public Builder config(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    public Builder paths(Path[] paths) {
+      this.paths = paths;
+      return this;
+    }
+
+    public Builder tableState(MergeOnReadTableState tableState) {
+      this.tableState = tableState;
+      return this;
+    }
+
+    public Builder fieldTypes(List<DataType> fieldTypes) {
+      this.fieldTypes = fieldTypes;
+      return this;
+    }
+
+    public Builder defaultPartName(String defaultPartName) {
+      this.defaultPartName = defaultPartName;
+      return this;
+    }
+
+    public Builder limit(long limit) {
+      this.limit = limit;
+      return this;
+    }
+
+    public Builder emitDelete(boolean emitDelete) {
+      this.emitDelete = emitDelete;
+      return this;
+    }
+
+    public MergeOnReadInputFormat build() {
+      return new MergeOnReadInputFormat(conf, paths, tableState,
+          fieldTypes, defaultPartName, limit, emitDelete);
+    }
+  }
+
+  @VisibleForTesting
+  public void isEmitDelete(boolean emitDelete) {
+    this.emitDelete = emitDelete;
+  }
 }

@@ -18,9 +18,11 @@
 
 package org.apache.hudi.table.format.mor;
 
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -35,18 +37,21 @@ public class MergeOnReadTableState implements Serializable {
   private final String avroSchema;
   private final String requiredAvroSchema;
   private final List<MergeOnReadInputSplit> inputSplits;
+  private final String[] pkFields;
 
   public MergeOnReadTableState(
       RowType rowType,
       RowType requiredRowType,
       String avroSchema,
       String requiredAvroSchema,
-      List<MergeOnReadInputSplit> inputSplits) {
+      List<MergeOnReadInputSplit> inputSplits,
+      String[] pkFields) {
     this.rowType = rowType;
     this.requiredRowType = requiredRowType;
     this.avroSchema = avroSchema;
     this.requiredAvroSchema = requiredAvroSchema;
     this.inputSplits = inputSplits;
+    this.pkFields = pkFields;
   }
 
   public RowType getRowType() {
@@ -76,4 +81,30 @@ public class MergeOnReadTableState implements Serializable {
         .mapToInt(i -> i)
         .toArray();
   }
+
+  /**
+   * Gets the primary key positions in the required row type.
+   */
+  public int[] getPkOffsetsInRequired() {
+    final List<String> fieldNames = requiredRowType.getFieldNames();
+    return Arrays.stream(pkFields)
+        .map(fieldNames::indexOf)
+        .mapToInt(i -> i)
+        .toArray();
+  }
+
+  /**
+   * Returns the primary key fields' logical types for the given offsets.
+   *
+   * @param pkOffsets the pk offsets in the required row type
+   * @return pk field logical types
+   *
+   * @see #getPkOffsetsInRequired()
+   */
+  public LogicalType[] getPkTypes(int[] pkOffsets) {
+    final LogicalType[] requiredTypes = requiredRowType.getFields().stream()
+        .map(RowType.RowField::getType).toArray(LogicalType[]::new);
+    return Arrays.stream(pkOffsets).mapToObj(offset -> requiredTypes[offset])
+        .toArray(LogicalType[]::new);
+  }
 }

@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.util.Arrays;
+
+/**
+ * A converter that converts a string array into internal row data fields.
+ * The converter is designed to be stateful (not a pure stateless tool)
+ * in order to reuse the specific converters.
+ */
+@Internal
+public class StringToRowDataConverter {
+  private final Converter[] converters;
+
+  public StringToRowDataConverter(LogicalType[] fieldTypes) {
+    this.converters = Arrays.stream(fieldTypes)
+        .map(StringToRowDataConverter::getConverter)
+        .toArray(Converter[]::new);
+  }
+
+  public Object[] convert(String[] fields) {
+    ValidationUtils.checkArgument(converters.length == fields.length,
+        "Field types and values should equal with number");
+
+    Object[] converted = new Object[fields.length];
+    for (int i = 0; i < fields.length; i++) {
+      converted[i] = converters[i].convert(fields[i]);
+    }
+    return converted;
+  }
+
+  private interface Converter {
+    Object convert(String field);
+  }
+
+  private static Converter getConverter(LogicalType logicalType) {
+    switch (logicalType.getTypeRoot()) {
+      case NULL:
+        return field -> null;
+      case TINYINT:
+        return Byte::parseByte;
+      case SMALLINT:
+        return Short::parseShort;
+      case BOOLEAN:
+        return Boolean::parseBoolean;
+      case INTEGER:
+      case TIME_WITHOUT_TIME_ZONE:
+        return Integer::parseInt;
+      case BIGINT:
+        return Long::parseLong;
+      case FLOAT:
+        return Float::parseFloat;
+      case DOUBLE:
+        return Double::parseDouble;
+      case DATE:
+        // see HoodieAvroUtils#convertValueForAvroLogicalTypes
+        return field -> (int) LocalDate.parse(field).toEpochDay();
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        return field -> TimestampData.fromEpochMillis(Long.parseLong(field));
+      case CHAR:
+      case VARCHAR:
+        return StringData::fromString;
+      case BINARY:
+      case VARBINARY:
+        return field -> field.getBytes(StandardCharsets.UTF_8);
+      case DECIMAL:
+        DecimalType decimalType = (DecimalType) logicalType;
+        return field ->
+            DecimalData.fromBigDecimal(
+                new BigDecimal(field),
+                decimalType.getPrecision(),
+                decimalType.getScale());
+      default:
+        throw new UnsupportedOperationException(
+            "Unsupported type " + logicalType.getTypeRoot() + " for " + StringToRowDataConverter.class.getName());
+    }
+  }
+}

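A minimal usage sketch for the new converter (field values are illustrative):

    // Convert the string-encoded pk fields back into Flink internal structures.
    StringToRowDataConverter converter = new StringToRowDataConverter(new LogicalType[] {
        DataTypes.INT().getLogicalType(),
        DataTypes.VARCHAR(10).getLogicalType()
    });
    Object[] converted = converter.convert(new String[] {"1", "abc"});
    // converted[0] is the Integer 1; converted[1] is StringData.fromString("abc").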
@@ -137,7 +137,7 @@ public class TestStreamWriteOperatorCoordinator {
   }
 
   @Test
-  public void testCheckpointCompleteWithRetry() {
+  public void testCheckpointCompleteWithException() {
     final CompletableFuture<byte[]> future = new CompletableFuture<>();
     coordinator.checkpointCoordinator(1, future);
     String inflightInstant = coordinator.getInstant();
@@ -149,7 +149,9 @@ public class TestStreamWriteOperatorCoordinator {
     coordinator.handleEventFromOperator(0, event);
     assertThrows(HoodieException.class,
         () -> coordinator.notifyCheckpointComplete(1),
-        "Try 3 to commit instant");
+        "org.apache.hudi.exception.HoodieException: Instant [20210330153432] has a complete checkpoint [1],\n"
+            + "but the coordinator has not received full write success events,\n"
+            + "rolls back the instant and rethrow");
   }
 
   @Test

@@ -260,15 +260,17 @@ public class TestStreamReadOperator {
         TestConfigurations.ROW_TYPE,
         tableAvroSchema.toString(),
         AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(),
-        Collections.emptyList());
+        Collections.emptyList(),
+        new String[0]);
     Path[] paths = FilePathUtils.getReadPaths(new Path(basePath), conf, hadoopConf, partitionKeys);
-    MergeOnReadInputFormat inputFormat = new MergeOnReadInputFormat(
-        conf,
-        FilePathUtils.toFlinkPaths(paths),
-        hoodieTableState,
-        rowDataType.getChildren(),
-        "default",
-        1000L);
+    MergeOnReadInputFormat inputFormat = MergeOnReadInputFormat.builder()
+        .config(conf)
+        .paths(FilePathUtils.toFlinkPaths(paths))
+        .tableState(hoodieTableState)
+        .fieldTypes(rowDataType.getChildren())
+        .defaultPartName("default").limit(1000L)
+        .emitDelete(true)
+        .build();
 
     OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory(inputFormat);
     OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness = new OneInputStreamOperatorTestHarness<>(

@@ -208,6 +208,41 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
         "some commits should be cleaned");
   }
 
+  @Test
+  void testStreamReadWithDeletes() throws Exception {
+    // create filesystem table named source
+
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.TABLE_NAME, "t1");
+    conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+
+    // write one commit
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    // write another commit with deletes
+    TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
+
+    String latestCommit = StreamerUtil.createWriteClient(conf, null)
+        .getLastCompletedInstant(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
+    options.put(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.key(), "2");
+    options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), latestCommit);
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    streamTableEnv.executeSql(hoodieTableDDL);
+
+    List<Row> result = execSelectSql(streamTableEnv, "select * from t1", 10);
+    final String expected = "["
+        + "id1,Danny,24,1970-01-01T00:00:00.001,par1, "
+        + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
+        + "id3,null,null,null,null, "
+        + "id5,null,null,null,null, "
+        + "id9,null,null,null,null]";
+    assertRowsEquals(result, expected);
+  }
+
   @ParameterizedTest
   @EnumSource(value = ExecMode.class)
   void testWriteAndRead(ExecMode execMode) {

@@ -21,6 +21,7 @@ package org.apache.hudi.table.format;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.table.HoodieTableSource;
+import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
@@ -43,6 +44,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -151,6 +153,29 @@ public class TestInputFormat {
     assertThat(actual, is(expected));
   }
 
+  @Test
+  void testReadWithDeletes() throws Exception {
+    beforeEach(HoodieTableType.MERGE_ON_READ);
+
+    // write another commit to read again
+    TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
+
+    InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+    assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+    ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true);
+
+    List<RowData> result = readData(inputFormat);
+
+    final String actual = TestData.rowDataToString(result);
+    final String expected = "["
+        + "id1,Danny,24,1970-01-01T00:00:00.001,par1, "
+        + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
+        + "id3,null,null,null,null, "
+        + "id5,null,null,null,null, "
+        + "id9,null,null,null,null]";
+    assertThat(actual, is(expected));
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception {

@@ -224,6 +224,10 @@ public class TestData {
     funcWrapper.close();
   }
 
+  private static String toStringSafely(Object obj) {
+    return obj == null ? "null" : obj.toString();
+  }
+
   /**
    * Sorts the {@code rows} using the field at index 0 and asserts
    * it equals the expected string {@code expected}.
@@ -233,7 +237,7 @@ public class TestData {
    */
   public static void assertRowsEquals(List<Row> rows, String expected) {
     String rowsString = rows.stream()
-        .sorted(Comparator.comparing(o -> o.getField(0).toString()))
+        .sorted(Comparator.comparing(o -> toStringSafely(o.getField(0))))
         .collect(Collectors.toList()).toString();
     assertThat(rowsString, is(expected));
   }
@@ -247,7 +251,7 @@ public class TestData {
    */
   public static void assertRowsEquals(List<Row> rows, List<RowData> expected) {
     String rowsString = rows.stream()
-        .sorted(Comparator.comparing(o -> o.getField(0).toString()))
+        .sorted(Comparator.comparing(o -> toStringSafely(o.getField(0))))
         .collect(Collectors.toList()).toString();
     assertThat(rowsString, is(rowDataToString(expected)));
   }

@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils;
+
+import org.apache.hudi.keygen.KeyGenUtils;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.hudi.util.StringToRowDataConverter;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoField;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+/**
+ * Test cases for {@link StringToRowDataConverter}.
+ */
+public class TestStringToRowDataConverter {
+  @Test
+  void testConvert() {
+    String[] fields = new String[] {"1.1", "3.4", "2021-03-30", "56669000", "1617119069000", "12345.67"};
+    LogicalType[] fieldTypes = new LogicalType[] {
+        DataTypes.FLOAT().getLogicalType(),
+        DataTypes.DOUBLE().getLogicalType(),
+        DataTypes.DATE().getLogicalType(),
+        DataTypes.TIME(3).getLogicalType(),
+        DataTypes.TIMESTAMP().getLogicalType(),
+        DataTypes.DECIMAL(7, 2).getLogicalType()
+    };
+    StringToRowDataConverter converter = new StringToRowDataConverter(fieldTypes);
+    Object[] converted = converter.convert(fields);
+    Object[] expected = new Object[] {
+        1.1f, 3.4D, (int) LocalDate.parse("2021-03-30").toEpochDay(),
+        LocalTime.parse("15:44:29").get(ChronoField.MILLI_OF_DAY),
+        TimestampData.fromEpochMillis(Instant.parse("2021-03-30T15:44:29Z").toEpochMilli()),
+        DecimalData.fromBigDecimal(new BigDecimal("12345.67"), 7, 2)
+    };
+    assertArrayEquals(expected, converted);
+  }
+
+  @Test
+  void testRowDataToAvroStringToRowData() {
+    GenericRowData rowData = new GenericRowData(6);
+    rowData.setField(0, 1.1f);
+    rowData.setField(1, 3.4D);
+    rowData.setField(2, (int) LocalDate.parse("2021-03-30").toEpochDay());
+    rowData.setField(3, LocalTime.parse("15:44:29").get(ChronoField.MILLI_OF_DAY));
+    rowData.setField(4, TimestampData.fromEpochMillis(Instant.parse("2021-03-30T15:44:29Z").toEpochMilli()));
+    rowData.setField(5, DecimalData.fromBigDecimal(new BigDecimal("12345.67"), 7, 2));
+
+    DataType dataType = DataTypes.ROW(
+        DataTypes.FIELD("f_float", DataTypes.FLOAT()),
+        DataTypes.FIELD("f_double", DataTypes.DOUBLE()),
+        DataTypes.FIELD("f_date", DataTypes.DATE()),
+        DataTypes.FIELD("f_time", DataTypes.TIME(3)),
+        DataTypes.FIELD("f_timestamp", DataTypes.TIMESTAMP(3)),
+        DataTypes.FIELD("f_decimal", DataTypes.DECIMAL(7, 2))
+    );
+    RowType rowType = (RowType) dataType.getLogicalType();
+    RowDataToAvroConverters.RowDataToAvroConverter converter =
+        RowDataToAvroConverters.createConverter(rowType);
+    GenericRecord avroRecord =
+        (GenericRecord) converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData);
+    StringToRowDataConverter stringToRowDataConverter =
+        new StringToRowDataConverter(rowType.getChildren().toArray(new LogicalType[0]));
+    final String recordKey = KeyGenUtils.getRecordKey(avroRecord, rowType.getFieldNames());
+    final String[] recordKeys = KeyGenUtils.extractRecordKeys(recordKey);
+    Object[] convertedKeys = stringToRowDataConverter.convert(recordKeys);
+
+    GenericRowData converted = new GenericRowData(6);
+    for (int i = 0; i < 6; i++) {
+      converted.setField(i, convertedKeys[i]);
+    }
+    assertThat(converted, is(rowData));
+  }
+}

@@ -143,14 +143,9 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
 
     @Override
    public void invoke(RowData value, SinkFunction.Context context) {
-      if (value.getRowKind() == RowKind.INSERT) {
-        Row row = (Row) converter.toExternal(value);
-        assert row != null;
-        RESULT.get(taskID).add(row);
-      } else {
-        throw new RuntimeException(
-            "CollectSinkFunction received " + value.getRowKind() + " messages.");
-      }
+      Row row = (Row) converter.toExternal(value);
+      assert row != null;
+      RESULT.get(taskID).add(row);
    }
 
    @Override