1
0

[HUDI-1771] Propagate CDC format for hoodie (#3285)

This commit is contained in:
swuferhong
2021-08-10 20:23:23 +08:00
committed by GitHub
parent b4441abcf7
commit 21db6d7a84
50 changed files with 1081 additions and 199 deletions

View File

@@ -76,6 +76,17 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("The default partition name in case the dynamic partition"
+ " column value is null/empty string");
public static final ConfigOption<Boolean> CHANGELOG_ENABLED = ConfigOptions
.key("changelog.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Whether to keep all the intermediate changes, "
+ "we try to keep all the changes of a record when enabled:\n"
+ "1). The sink accept the UPDATE_BEFORE message;\n"
+ "2). The source try to emit every changes of a record.\n"
+ "The semantics is best effort because the compaction job would finally merge all changes of a record into one.\n"
+ " default false to have UPSERT semantics");
// ------------------------------------------------------------------------
// Metadata table Options
// ------------------------------------------------------------------------

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.sink;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -360,23 +361,26 @@ public class StreamWriteFunction<K, I, O>
private final String key; // record key
private final String instant; // 'U' or 'I'
private final HoodieRecordPayload<?> data; // record payload
private final HoodieOperation operation; // operation
private DataItem(String key, String instant, HoodieRecordPayload<?> data) {
private DataItem(String key, String instant, HoodieRecordPayload<?> data, HoodieOperation operation) {
this.key = key;
this.instant = instant;
this.data = data;
this.operation = operation;
}
public static DataItem fromHoodieRecord(HoodieRecord<?> record) {
return new DataItem(
record.getRecordKey(),
record.getCurrentLocation().getInstantTime(),
record.getData());
record.getData(),
record.getOperation());
}
public HoodieRecord<?> toHoodieRecord(String partitionPath) {
HoodieKey hoodieKey = new HoodieKey(this.key, partitionPath);
HoodieRecord<?> record = new HoodieRecord<>(hoodieKey, data);
HoodieRecord<?> record = new HoodieRecord<>(hoodieKey, data, operation);
HoodieRecordLocation loc = new HoodieRecordLocation(instant, null);
record.setCurrentLocation(loc);
return record;
@@ -417,7 +421,7 @@ public class StreamWriteFunction<K, I, O>
public void preWrite(List<HoodieRecord> records) {
// rewrite the first record with expected fileID
HoodieRecord<?> first = records.get(0);
HoodieRecord<?> record = new HoodieRecord<>(first.getKey(), first.getData());
HoodieRecord<?> record = new HoodieRecord<>(first.getKey(), first.getData(), first.getOperation());
HoodieRecordLocation newLoc = new HoodieRecordLocation(first.getCurrentLocation().getInstantTime(), fileID);
record.setCurrentLocation(newLoc);

View File

@@ -203,7 +203,7 @@ public class BootstrapFunction<I, O extends HoodieRecord>
.filter(logFile -> isValidFile(logFile.getFileStatus()))
.map(logFile -> logFile.getPath().toString())
.collect(toList());
HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(logPaths, schema, latestCommitTime.get().getTimestamp(),
HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
writeConfig, hadoopConf);
try {

View File

@@ -71,7 +71,7 @@ public class BulkInsertWriterHelper {
this.taskPartitionId = taskPartitionId;
this.taskId = taskId;
this.taskEpochId = taskEpochId;
this.rowType = addMetadataFields(rowType); // patch up with metadata fields
this.rowType = addMetadataFields(rowType, writeConfig.allowOperationMetadataField()); // patch up with metadata fields
this.arePartitionRecordsSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION);
this.fileIdPrefix = UUID.randomUUID().toString();
this.keyGen = RowDataKeyGen.instance(conf, rowType);
@@ -141,7 +141,7 @@ public class BulkInsertWriterHelper {
/**
* Adds the Hoodie metadata fields to the given row type.
*/
private static RowType addMetadataFields(RowType rowType) {
private static RowType addMetadataFields(RowType rowType, boolean withOperationField) {
List<RowType.RowField> mergedFields = new ArrayList<>();
LogicalType metadataFieldType = DataTypes.STRING().getLogicalType();
@@ -161,6 +161,13 @@ public class BulkInsertWriterHelper {
mergedFields.add(recordKeyField);
mergedFields.add(partitionPathField);
mergedFields.add(fileNameField);
if (withOperationField) {
RowType.RowField operationField =
new RowType.RowField(HoodieRecord.OPERATION_METADATA_FIELD, metadataFieldType, "operation");
mergedFields.add(operationField);
}
mergedFields.addAll(rowType.getFields());
return new RowType(false, mergedFields);

View File

@@ -20,8 +20,6 @@ package org.apache.hudi.sink.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
@@ -57,26 +55,14 @@ public class CompactionPlanSourceFunction extends AbstractRichFunction implement
/**
* Compaction instant time.
*/
private String compactionInstantTime;
/**
* Hoodie flink table.
*/
private HoodieFlinkTable<?> table;
private final String compactionInstantTime;
/**
* The compaction plan.
*/
private HoodieCompactionPlan compactionPlan;
private final HoodieCompactionPlan compactionPlan;
/**
* Hoodie instant.
*/
private HoodieInstant instant;
public CompactionPlanSourceFunction(HoodieFlinkTable<?> table, HoodieInstant instant, HoodieCompactionPlan compactionPlan, String compactionInstantTime) {
this.table = table;
this.instant = instant;
public CompactionPlanSourceFunction(HoodieCompactionPlan compactionPlan, String compactionInstantTime) {
this.compactionPlan = compactionPlan;
this.compactionInstantTime = compactionInstantTime;
}

View File

@@ -133,9 +133,8 @@ public class HoodieFlinkCompactor {
// Mark instant as compaction inflight
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
table.getMetaClient().reloadActiveTimeline();
env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime))
env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime))
.name("compaction_source")
.uid("uid_compaction_source")
.rebalance()

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.sink.transform;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.configuration.FlinkOptions;
@@ -34,7 +35,6 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import java.io.IOException;
@@ -108,9 +108,9 @@ public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord>
private HoodieRecord toHoodieRecord(I record) throws Exception {
GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record);
final HoodieKey hoodieKey = keyGenerator.getKey(gr);
// nullify the payload insert data to mark the record as a DELETE
final boolean isDelete = record.getRowKind() == RowKind.DELETE;
HoodieRecordPayload payload = payloadCreation.createPayload(gr, isDelete);
return new HoodieRecord<>(hoodieKey, payload);
HoodieRecordPayload payload = payloadCreation.createPayload(gr);
HoodieOperation operation = HoodieOperation.fromValue(record.getRowKind().toByteValue());
return new HoodieRecord<>(hoodieKey, payload, operation);
}
}

View File

@@ -86,6 +86,7 @@ public class HiveSyncContext {
hiveSyncConfig.decodePartition = conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING);
hiveSyncConfig.skipROSuffix = conf.getBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX);
hiveSyncConfig.assumeDatePartitioning = conf.getBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION);
hiveSyncConfig.withOperationField = conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
return hiveSyncConfig;
}
}

View File

@@ -71,13 +71,12 @@ public class PayloadCreation implements Serializable {
return new PayloadCreation(shouldCombine, constructor, preCombineField);
}
public HoodieRecordPayload<?> createPayload(GenericRecord record, boolean isDelete) throws Exception {
public HoodieRecordPayload<?> createPayload(GenericRecord record) throws Exception {
if (shouldCombine) {
ValidationUtils.checkState(preCombineField != null);
Comparable<?> orderingVal = (Comparable<?>) HoodieAvroUtils.getNestedFieldVal(record,
preCombineField, false);
return (HoodieRecordPayload<?>) constructor.newInstance(
isDelete ? null : record, orderingVal);
return (HoodieRecordPayload<?>) constructor.newInstance(record, orderingVal);
} else {
return (HoodieRecordPayload<?>) this.constructor.newInstance(Option.of(record));
}

View File

@@ -34,6 +34,7 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.util.StreamerUtil;
@@ -52,7 +53,6 @@ import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode$;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import java.util.Map;
@@ -185,12 +185,11 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
@Override
public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
// ignore RowKind.UPDATE_BEFORE
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.DELETE)
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_AFTER)
.build();
if (conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) {
return ChangelogModes.FULL;
} else {
return ChangelogModes.UPSERT;
}
}
@Override

View File

@@ -37,6 +37,7 @@ import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
@@ -196,7 +197,12 @@ public class HoodieTableSource implements
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
return conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
&& !conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)
? ChangelogModes.FULL
// when all the changes are persisted or read as batch,
// use INSERT mode.
: ChangelogMode.insertOnly();
}
@Override
@@ -309,7 +315,7 @@ public class HoodieTableSource implements
return new CollectionInputFormat<>(Collections.emptyList(), null);
}
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
final Schema tableAvroSchema;
try {
tableAvroSchema = schemaUtil.getTableAvroSchema();

View File

@@ -19,19 +19,32 @@
package org.apache.hudi.table.format;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -45,6 +58,39 @@ public class FormatUtils {
private FormatUtils() {
}
/**
 * Applies the change flag ({@link RowKind}) resolved from the operation
 * metadata field at position {@code index} to the given row.
 *
 * <p>No-op when {@code index} is -1, i.e. the operation metadata field
 * is not present in the record schema.
 */
public static void setRowKind(RowData rowData, IndexedRecord record, int index) {
  if (index != -1) {
    rowData.setRowKind(getRowKind(record, index));
  }
}
/**
 * Returns the {@link RowKind} of the given record, never null.
 *
 * <p>Returns {@code RowKind.INSERT} when the operation field value is not found,
 * so records written without the operation metadata field still read as inserts.
 *
 * @param record the avro record carrying the operation metadata field
 * @param index  position of the operation metadata field in the record schema
 */
private static RowKind getRowKind(IndexedRecord record, int index) {
  Object val = record.get(index);
  if (val == null) {
    return RowKind.INSERT;
  }
  final HoodieOperation operation = HoodieOperation.fromName(val.toString());
  if (HoodieOperation.isInsert(operation)) {
    return RowKind.INSERT;
  } else if (HoodieOperation.isUpdateBefore(operation)) {
    return RowKind.UPDATE_BEFORE;
  } else if (HoodieOperation.isUpdateAfter(operation)) {
    return RowKind.UPDATE_AFTER;
  } else if (HoodieOperation.isDelete(operation)) {
    return RowKind.DELETE;
  } else {
    // include the unexpected value instead of throwing a bare AssertionError,
    // so the failure is diagnosable from the stack trace alone
    throw new AssertionError("Unexpected operation: " + operation);
  }
}
public static GenericRecord buildAvroRecordBySchema(
IndexedRecord record,
Schema requiredSchema,
@@ -57,10 +103,11 @@ public class FormatUtils {
return recordBuilder.build();
}
public static HoodieMergedLogRecordScanner scanLog(
public static HoodieMergedLogRecordScanner logScanner(
MergeOnReadInputSplit split,
Schema logSchema,
Configuration config) {
Configuration config,
boolean withOperationField) {
FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
return HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
@@ -81,10 +128,88 @@ public class FormatUtils {
config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
.withInstantRange(split.getInstantRange())
.withOperationField(withOperationField)
.build();
}
public static HoodieMergedLogRecordScanner scanLog(
/**
 * Creates an un-merged log record scanner over the log files of the given split,
 * invoking {@code callback} for every record as it is scanned (no compaction /
 * merging of intermediate changes, so every change of a record is emitted).
 *
 * @param split     the MOR input split holding the log file paths
 * @param logSchema reader schema for the log blocks
 * @param config    hadoop configuration, also supplies the realtime reader tuning knobs
 * @param callback  invoked once per scanned record
 */
private static HoodieUnMergedLogRecordScanner unMergedLogScanner(
MergeOnReadInputSplit split,
Schema logSchema,
Configuration config,
HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) {
FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
return HoodieUnMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(split.getTablePath())
.withLogFilePaths(split.getLogPaths().get())
.withReaderSchema(logSchema)
.withLatestInstantTime(split.getLatestCommit())
.withReadBlocksLazily(
string2Boolean(
config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
.withReverseReader(false)
.withBufferSize(
config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
.withInstantRange(split.getInstantRange())
.withLogRecordScannerCallback(callback)
.build();
}
/**
 * Utility to read and buffer the records in the unMerged log record scanner.
 *
 * <p>The scanner produces records into a bounded in-memory queue while the
 * consumer drains them through {@link #getRecordsIterator()}. Callers must
 * invoke {@link #close()} when done to shut down the producer executor.
 */
public static class BoundedMemoryRecords {
// Log Record unmerged scanner
private final HoodieUnMergedLogRecordScanner scanner;
// Executor that runs the above producers in parallel
private final BoundedInMemoryExecutor<HoodieRecord<?>, HoodieRecord<?>, ?> executor;
// Iterator for the buffer consumer
private final Iterator<HoodieRecord<?>> iterator;
public BoundedMemoryRecords(
MergeOnReadInputSplit split,
Schema logSchema,
Configuration hadoopConf) {
// NOTE: construction order matters — the executor (and its queue) must exist
// before the scanner's callback captures it, and producers start only after
// both are wired up.
this.executor = new BoundedInMemoryExecutor<>(
HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(new JobConf(hadoopConf)),
getParallelProducers(),
Option.empty(),
x -> x,
new DefaultSizeEstimator<>());
// Consumer of this record reader
this.iterator = this.executor.getQueue().iterator();
// every scanned record is pushed into the bounded queue
this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, hadoopConf,
record -> executor.getQueue().insertRecord(record));
// Start reading and buffering
this.executor.startProducers();
}
// Returns the consuming iterator over the buffered records.
public Iterator<HoodieRecord<?>> getRecordsIterator() {
return this.iterator;
}
/**
 * Setup log and parquet reading in parallel. Both write to central buffer.
 */
private List<BoundedInMemoryQueueProducer<HoodieRecord<?>>> getParallelProducers() {
List<BoundedInMemoryQueueProducer<HoodieRecord<?>>> producers = new ArrayList<>();
producers.add(new FunctionBasedQueueProducer<>(buffer -> {
// the scan drives the callback above; records reach the queue as a side effect
scanner.scan();
return null;
}));
return producers;
}
// Shuts down the producer executor; safe to call once consumption is finished.
public void close() {
this.executor.shutdownNow();
}
}
public static HoodieMergedLogRecordScanner logScanner(
List<String> logPaths,
Schema logSchema,
String latestInstantTime,

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.table.format.mor;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.InstantRange;
@@ -176,7 +177,11 @@ public class MergeOnReadInputFormat
}
} else if (!split.getBasePath().isPresent()) {
// log files only
this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
if (conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) {
this.iterator = new LogFileOnlyIterator(getUnMergedLogFileIterator(split));
} else {
this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
}
} else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
this.iterator = new SkipMergeIterator(
getRequiredSchemaReader(split.getBasePath().get()),
@@ -190,6 +195,9 @@ public class MergeOnReadInputFormat
new Schema.Parser().parse(this.tableState.getAvroSchema()),
new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
this.requiredPos,
this.emitDelete,
this.conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED),
this.tableState.getOperationPos(),
getFullSchemaReader(split.getBasePath().get()));
} else {
throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for "
@@ -298,7 +306,7 @@ public class MergeOnReadInputFormat
final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
final HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(split, tableSchema, hadoopConf);
final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, hadoopConf, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
final int[] pkOffset = tableState.getPkOffsetsInRequired();
// flag saying whether the pk semantics has been dropped by user specified
@@ -335,18 +343,20 @@ public class MergeOnReadInputFormat
}
delete.setRowKind(RowKind.DELETE);
this.currentRecord = delete;
this.currentRecord = delete;
return true;
}
// skipping if the condition is unsatisfied
// continue;
} else {
final IndexedRecord avroRecord = curAvroRecord.get();
GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
curAvroRecord.get(),
avroRecord,
requiredSchema,
requiredPos,
recordBuilder);
currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
FormatUtils.setRowKind(currentRecord, avroRecord, tableState.getOperationPos());
return true;
}
}
@@ -365,6 +375,55 @@ public class MergeOnReadInputFormat
};
}
/**
 * Returns an iterator over the un-merged log records of the given split,
 * emitting every intermediate change of a record (changelog semantics) and
 * tagging each emitted row with its {@link org.apache.flink.types.RowKind}
 * resolved from the operation metadata field.
 *
 * <p>Records whose insert value is absent (i.e. plain deletes with no payload)
 * are skipped.
 */
private ClosableIterator<RowData> getUnMergedLogFileIterator(MergeOnReadInputSplit split) {
final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
final FormatUtils.BoundedMemoryRecords records = new FormatUtils.BoundedMemoryRecords(split, tableSchema, hadoopConf);
final Iterator<HoodieRecord<?>> recordsIterator = records.getRecordsIterator();
return new ClosableIterator<RowData>() {
// the row handed out by the next next() call; set by hasNext()
private RowData currentRecord;
@Override
public boolean hasNext() {
// advance until a record with a present insert value is found
while (recordsIterator.hasNext()) {
Option<IndexedRecord> curAvroRecord = null;
final HoodieRecord<?> hoodieRecord = recordsIterator.next();
try {
curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
} catch (IOException e) {
throw new HoodieException("Get avro insert value error for key: " + hoodieRecord.getRecordKey(), e);
}
if (curAvroRecord.isPresent()) {
final IndexedRecord avroRecord = curAvroRecord.get();
// project down to the required (queried) schema
GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
avroRecord,
requiredSchema,
requiredPos,
recordBuilder);
currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
// propagate +I/-U/+U/-D flag from the operation metadata field
FormatUtils.setRowKind(currentRecord, avroRecord, tableState.getOperationPos());
return true;
}
}
return false;
}
@Override
public RowData next() {
return currentRecord;
}
@Override
public void close() {
// releases the bounded in-memory producer executor
records.close();
}
};
}
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
@@ -544,6 +603,8 @@ public class MergeOnReadInputFormat
private final Schema tableSchema;
private final Schema requiredSchema;
private final int[] requiredPos;
private final boolean emitDelete;
private final int operationPos;
private final RowDataToAvroConverters.RowDataToAvroConverter rowDataToAvroConverter;
private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
private final GenericRecordBuilder recordBuilder;
@@ -557,7 +618,7 @@ public class MergeOnReadInputFormat
// refactor it out once FLINK-22370 is resolved.
private boolean readLogs = false;
private Set<String> keyToSkip = new HashSet<>();
private final Set<String> keyToSkip = new HashSet<>();
private RowData currentRecord;
@@ -569,13 +630,18 @@ public class MergeOnReadInputFormat
Schema tableSchema,
Schema requiredSchema,
int[] requiredPos,
boolean emitDelete,
boolean withOperationField,
int operationPos,
ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
this.tableSchema = tableSchema;
this.reader = reader;
this.scanner = FormatUtils.scanLog(split, tableSchema, hadoopConf);
this.scanner = FormatUtils.logScanner(split, tableSchema, hadoopConf, withOperationField);
this.logKeysIterator = scanner.getRecords().keySet().iterator();
this.requiredSchema = requiredSchema;
this.requiredPos = requiredPos;
this.emitDelete = emitDelete;
this.operationPos = operationPos;
this.recordBuilder = new GenericRecordBuilder(requiredSchema);
this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType);
this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
@@ -602,12 +668,13 @@ public class MergeOnReadInputFormat
// deleted
continue;
} else {
GenericRecord record = buildAvroRecordBySchema(
GenericRecord avroRecord = buildAvroRecordBySchema(
mergedAvroRecord.get(),
requiredSchema,
requiredPos,
recordBuilder);
this.currentRecord = (RowData) avroToRowDataConverter.convert(record);
this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord);
FormatUtils.setRowKind(this.currentRecord, mergedAvroRecord.get(), this.operationPos);
return false;
}
}
@@ -620,16 +687,16 @@ public class MergeOnReadInputFormat
while (logKeysIterator.hasNext()) {
final String curKey = logKeysIterator.next();
if (!keyToSkip.contains(curKey)) {
Option<IndexedRecord> insertAvroRecord =
scanner.getRecords().get(curKey).getData().getInsertValue(tableSchema);
Option<IndexedRecord> insertAvroRecord = getInsertValue(curKey);
if (insertAvroRecord.isPresent()) {
// the record is a DELETE if insertAvroRecord not present, skipping
GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
GenericRecord avroRecord = buildAvroRecordBySchema(
insertAvroRecord.get(),
requiredSchema,
requiredPos,
recordBuilder);
this.currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord);
FormatUtils.setRowKind(this.currentRecord, insertAvroRecord.get(), this.operationPos);
return false;
}
}
@@ -637,6 +704,14 @@ public class MergeOnReadInputFormat
return true;
}
/**
 * Looks up the log record for the given key and returns its insert value,
 * or an empty option when the record is a DELETE and deletes are not emitted.
 */
private Option<IndexedRecord> getInsertValue(String curKey) throws IOException {
  final HoodieRecord<?> hoodieRecord = scanner.getRecords().get(curKey);
  final boolean skipAsDelete = !emitDelete && HoodieOperation.isDelete(hoodieRecord.getOperation());
  return skipAsDelete ? Option.empty() : hoodieRecord.getData().getInsertValue(tableSchema);
}
@Override
public RowData nextRecord() {
return currentRecord;
@@ -655,8 +730,12 @@ public class MergeOnReadInputFormat
private Option<IndexedRecord> mergeRowWithLog(
RowData curRow,
String curKey) throws IOException {
final HoodieRecord<?> record = scanner.getRecords().get(curKey);
if (!emitDelete && HoodieOperation.isDelete(record.getOperation())) {
return Option.empty();
}
GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
return scanner.getRecords().get(curKey).getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
}
}

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.table.format.mor;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -38,6 +40,7 @@ public class MergeOnReadTableState implements Serializable {
private final String requiredAvroSchema;
private final List<MergeOnReadInputSplit> inputSplits;
private final String[] pkFields;
private final int operationPos;
public MergeOnReadTableState(
RowType rowType,
@@ -52,6 +55,7 @@ public class MergeOnReadTableState implements Serializable {
this.requiredAvroSchema = requiredAvroSchema;
this.inputSplits = inputSplits;
this.pkFields = pkFields;
this.operationPos = rowType.getFieldIndex(HoodieRecord.OPERATION_METADATA_FIELD);
}
public RowType getRowType() {
@@ -74,6 +78,10 @@ public class MergeOnReadTableState implements Serializable {
return inputSplits;
}
public int getOperationPos() {
return operationPos;
}
public int[] getRequiredPositions() {
final List<String> fieldNames = rowType.getFieldNames();
return requiredRowType.getFieldNames().stream()

View File

@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.util;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;
/**
 * Utilities for all kinds of common {@link org.apache.flink.table.connector.ChangelogMode}s.
 */
public class ChangelogModes {
/**
 * Full change log mode that contains all the {@link RowKind}s,
 * including UPDATE_BEFORE.
 */
public static final ChangelogMode FULL = ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
/**
 * Change log mode that ignores UPDATE_BEFORE, e.g. UPSERT.
 */
public static final ChangelogMode UPSERT = ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
// static utility holder, not meant to be instantiated
private ChangelogModes() {
}
}

View File

@@ -186,6 +186,7 @@ public class StreamerUtil {
.build())
.withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
.withAutoCommit(false)
.withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
.withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
builder = builder.withSchema(getSourceSchema(conf).toString());

View File

@@ -38,9 +38,9 @@ import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.CompactionUtil;
@@ -85,6 +85,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Integration test for Flink Hoodie stream sink.
*/
@@ -200,7 +202,9 @@ public class StreamWriteITCase extends TestLogger {
// To compute the compaction instant time and do compaction.
String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
assertTrue(scheduled, "The compaction plan should be scheduled");
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
// generate compaction plan
@@ -209,8 +213,10 @@ public class StreamWriteITCase extends TestLogger {
table.getMetaClient(), compactionInstantTime);
HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
// Mark instant as compaction inflight
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime))
env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime))
.name("compaction_source")
.uid("uid_compaction_source")
.rebalance()

View File

@@ -399,33 +399,29 @@ public class TestWriteCopyOnWrite {
// the coordinator checkpoint commits the inflight instant.
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
Map<String, String> expected = new HashMap<>();
// id3, id5 were deleted and id9 is ignored
expected.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]");
expected.put("par3", "[id6,par3,id6,Emma,20,6,par3]");
expected.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
Map<String, String> expected = getUpsertWithDeleteExpected();
checkWrittenData(tempFile, expected);
}
@Test
public void testInsertWithMiniBatches() throws Exception {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch size
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data
funcWrapper.openFunction();
// Each record is 208 bytes. so 4 records expect to trigger a mini-batch write
// record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
// so 3 records expect to trigger a mini-batch write
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData);
}
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
assertThat("2 records expect to flush out as a mini-batch",
assertThat("3 records expect to flush out as a mini-batch",
dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
is(2));
is(3));
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
@@ -472,22 +468,23 @@ public class TestWriteCopyOnWrite {
@Test
public void testInsertWithDeduplication() throws Exception {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch size
conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data
funcWrapper.openFunction();
// Each record is 208 bytes. so 4 records expect to trigger a mini-batch write
// record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
// so 3 records expect to trigger a mini-batch write
for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
funcWrapper.invoke(rowData);
}
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
assertThat("2 records expect to flush out as a mini-batch",
assertThat("3 records expect to flush out as a mini-batch",
dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
is(2));
is(3));
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
@@ -612,12 +609,13 @@ public class TestWriteCopyOnWrite {
@Test
public void testInsertWithSmallBufferSize() throws Exception {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008); // 839 bytes buffer size
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data
funcWrapper.openFunction();
// each record is 208 bytes. so 4 records expect to trigger buffer flush:
// record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
// so 3 records expect to trigger a mini-batch write
// flush the max size bucket once at a time.
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData);
@@ -625,9 +623,9 @@ public class TestWriteCopyOnWrite {
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
assertThat("2 records expect to flush out as a mini-batch",
assertThat("3 records expect to flush out as a mini-batch",
dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
is(2));
is(3));
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
@@ -676,8 +674,17 @@ public class TestWriteCopyOnWrite {
// the last 2 lines are merged
expected.put("par1", "["
+ "id1,par1,id1,Danny,23,1,par1, "
+ "id1,par1,id1,Danny,23,1,par1, "
+ "id1,par1,id1,Danny,23,1,par1]");
+ "id1,par1,id1,Danny,23,1,par1" + "]");
return expected;
}
protected Map<String, String> getUpsertWithDeleteExpected() {
  // Expected partition -> written-rows snapshot after the upsert-with-delete
  // data set: id3 and id5 were deleted and id9 is ignored.
  final Map<String, String> expectedByPartition = new HashMap<>();
  expectedByPartition.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
  expectedByPartition.put("par3", "[id6,par3,id6,Emma,20,6,par3]");
  expectedByPartition.put("par2", "[id4,par2,id4,Fabian,31,4,par2]");
  expectedByPartition.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
  return expectedByPartition;
}

View File

@@ -47,7 +47,7 @@ public class TestRowDataKeyGen {
assertThat(keyGen1.getPartitionPath(rowData1), is("par1"));
// null record key and partition path
final RowData rowData2 = insertRow(null, StringData.fromString("Danny"), 23,
final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE, null, StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), null);
assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2));
assertThat(keyGen1.getPartitionPath(rowData2), is("default"));
@@ -77,7 +77,7 @@ public class TestRowDataKeyGen {
assertThat(keyGen1.getPartitionPath(rowData1), is("par1/1970-01-01T00:00:00.001"));
// null record key and partition path
final RowData rowData2 = insertRow(null, null, 23, null, null);
final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE,null, null, 23, null, null);
assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2));
assertThat(keyGen1.getPartitionPath(rowData2), is("default/default"));
// empty record key and partition path

View File

@@ -261,6 +261,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setString(FlinkOptions.TABLE_NAME, "t1");
conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
conf.setBoolean(FlinkOptions.CHANGELOG_ENABLED, true);
// write one commit
TestData.writeData(TestData.DATA_SET_INSERT, conf);
@@ -276,17 +277,20 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
options.put(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.key(), "2");
options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), latestCommit);
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
streamTableEnv.executeSql(hoodieTableDDL);
List<Row> result = execSelectSql(streamTableEnv, "select * from t1", 10);
final String expected = "["
+ "id1,Danny,24,1970-01-01T00:00:00.001,par1, "
+ "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
+ "id3,null,null,null,null, "
+ "id5,null,null,null,null, "
+ "id9,null,null,null,null]";
assertRowsEquals(result, expected);
final String sinkDDL = "create table sink(\n"
+ " name varchar(20),\n"
+ " age_sum int\n"
+ ") with (\n"
+ " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'"
+ ")";
List<Row> result = execSelectSql(streamTableEnv,
"select name, sum(age) from t1 group by name", sinkDDL, 10);
final String expected = "[+I(Danny,24), +I(Stephen,34)]";
assertRowsEquals(result, expected, true);
}
@ParameterizedTest
@@ -724,6 +728,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
} else {
sinkDDL = TestConfigurations.getCollectSinkDDL("sink");
}
return execSelectSql(tEnv, select, sinkDDL, timeout);
}
private List<Row> execSelectSql(TableEnvironment tEnv, String select, String sinkDDL, long timeout)
throws InterruptedException {
tEnv.executeSql(sinkDDL);
TableResult tableResult = tEnv.executeSql("insert into sink " + select);
// wait for the timeout then cancels the job
@@ -731,7 +740,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
tableResult.getJobClient().ifPresent(JobClient::cancel);
tEnv.executeSql("DROP TABLE IF EXISTS sink");
return CollectSinkTableFactory.RESULT.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
}

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.table.format;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.HoodieTableSource;
import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
@@ -60,9 +61,14 @@ public class TestInputFormat {
File tempFile;
void beforeEach(HoodieTableType tableType) throws IOException {
beforeEach(tableType, Collections.emptyMap());
}
void beforeEach(HoodieTableType tableType, Map<String, String> options) throws IOException {
conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // close the async compaction
options.forEach((key, value) -> conf.setString(key, value));
StreamerUtil.initTableIfNotExists(conf);
this.tableSource = new HoodieTableSource(
@@ -163,8 +169,62 @@ public class TestInputFormat {
}
@Test
void testReadWithDeletes() throws Exception {
beforeEach(HoodieTableType.MERGE_ON_READ);
void testReadBaseAndLogFilesWithDeletes() throws Exception {
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
beforeEach(HoodieTableType.MERGE_ON_READ, options);
// write base first with compaction.
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
TestData.writeData(TestData.DATA_SET_INSERT, conf);
// write another commit using logs and read again.
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
// when isEmitDelete is false.
List<RowData> result1 = readData(inputFormat);
final String actual1 = TestData.rowDataToString(result1, true);
final String expected1 = "["
+ "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), "
+ "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1), "
+ "+I(id4,Fabian,31,1970-01-01T00:00:00.004,par2), "
+ "+I(id6,Emma,20,1970-01-01T00:00:00.006,par3), "
+ "+I(id7,Bob,44,1970-01-01T00:00:00.007,par4), "
+ "+I(id8,Han,56,1970-01-01T00:00:00.008,par4)]";
assertThat(actual1, is(expected1));
// refresh the input format and set isEmitDelete to true.
this.tableSource.reset();
inputFormat = this.tableSource.getInputFormat();
((MergeOnReadInputFormat) inputFormat).isEmitDelete(true);
List<RowData> result2 = readData(inputFormat);
final String actual2 = TestData.rowDataToString(result2, true);
final String expected2 = "["
+ "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), "
+ "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1), "
+ "-D(id3,Julian,53,1970-01-01T00:00:00.003,par2), "
+ "+I(id4,Fabian,31,1970-01-01T00:00:00.004,par2), "
+ "-D(id5,Sophia,18,1970-01-01T00:00:00.005,par3), "
+ "+I(id6,Emma,20,1970-01-01T00:00:00.006,par3), "
+ "+I(id7,Bob,44,1970-01-01T00:00:00.007,par4), "
+ "+I(id8,Han,56,1970-01-01T00:00:00.008,par4), "
+ "-D(id9,Jane,19,1970-01-01T00:00:00.006,par3)]";
assertThat(actual2, is(expected2));
}
@Test
void testReadWithDeletesMOR() throws Exception {
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
beforeEach(HoodieTableType.MERGE_ON_READ, options);
// write another commit to read again
TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
@@ -175,13 +235,32 @@ public class TestInputFormat {
List<RowData> result = readData(inputFormat);
final String actual = TestData.rowDataToString(result);
final String actual = TestData.rowDataToString(result, true);
final String expected = "["
+ "id1,Danny,24,1970-01-01T00:00:00.001,par1, "
+ "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
+ "id3,null,null,null,null, "
+ "id5,null,null,null,null, "
+ "id9,null,null,null,null]";
+ "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), "
+ "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1), "
+ "-D(id3,Julian,53,1970-01-01T00:00:00.003,par2), "
+ "-D(id5,Sophia,18,1970-01-01T00:00:00.005,par3), "
+ "-D(id9,Jane,19,1970-01-01T00:00:00.006,par3)]";
assertThat(actual, is(expected));
}
@Test
void testReadWithDeletesCOW() throws Exception {
  // COPY_ON_WRITE table: deletes are applied when the base files are rewritten,
  // so the snapshot read returns only the surviving rows — no -D rows appear,
  // even when the result is rendered with change flags.
  beforeEach(HoodieTableType.COPY_ON_WRITE);

  // write another commit to read again
  TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);

  InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
  // COW tables must be served by the copy-on-write input format.
  assertThat(inputFormat, instanceOf(CopyOnWriteInputFormat.class));

  List<RowData> result = readData(inputFormat);

  // Render with change flags (second arg true): all remaining rows are +I.
  final String actual = TestData.rowDataToString(result, true);
  final String expected = "["
      + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), "
      + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1)]";
  assertThat(actual, is(expected));
}
@@ -205,6 +284,33 @@ public class TestInputFormat {
assertThat(actual, is(expected));
}
@Test
void testReadChangesUnMergedMOR() throws Exception {
  // MERGE_ON_READ with changelog mode enabled: the read should emit every
  // intermediate change of the record un-merged (+I, then -U/+U pairs, then -D)
  // instead of collapsing them into one final value.
  Map<String, String> options = new HashMap<>();
  options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
  beforeEach(HoodieTableType.MERGE_ON_READ, options);

  // write another commit to read again
  TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, conf);

  InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
  assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));

  List<RowData> result = readData(inputFormat);

  // Render with change flags: the full changelog of id1 — insert at age 19,
  // three update (before/after) pairs up to age 22, then the delete.
  final String actual = TestData.rowDataToString(result, true);
  final String expected = "["
      + "+I(id1,Danny,19,1970-01-01T00:00:00.001,par1), "
      + "-U(id1,Danny,19,1970-01-01T00:00:00.001,par1), "
      + "+U(id1,Danny,20,1970-01-01T00:00:00.002,par1), "
      + "-U(id1,Danny,20,1970-01-01T00:00:00.002,par1), "
      + "+U(id1,Danny,21,1970-01-01T00:00:00.003,par1), "
      + "-U(id1,Danny,21,1970-01-01T00:00:00.003,par1), "
      + "+U(id1,Danny,22,1970-01-01T00:00:00.004,par1), "
      + "-D(id1,Danny,22,1970-01-01T00:00:00.005,par1)]";
  assertThat(actual, is(expected));
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------

View File

@@ -234,15 +234,53 @@ public class TestData {
TimestampData.fromEpochMillis(6), StringData.fromString("par3"))
);
// Changelog for a single key "id1": one insert, three UPDATE_BEFORE/UPDATE_AFTER
// pairs (age 19 -> 20 -> 21 -> 22), and a final delete. Used to verify that
// every intermediate change of a record is kept when changelog mode is enabled.
public static List<RowData> DATA_SET_INSERT_UPDATE_DELETE = Arrays.asList(
    // INSERT
    insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 19,
        TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
    // UPDATE: each change is a (-U old values, +U new values) pair
    updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 19,
        TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
    updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20,
        TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
    updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20,
        TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
    updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
        TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
    updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
        TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
    updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
        TimestampData.fromEpochMillis(4), StringData.fromString("par1")),
    // DELETE
    deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
        TimestampData.fromEpochMillis(5), StringData.fromString("par1"))
);
/**
 * Returns the string format of a list of {@link RowData} without change flags.
 *
 * <p>Delegates to the two-argument variant with {@code withChangeFlag} disabled,
 * so each row is rendered as plain field values only.
 */
public static String rowDataToString(List<RowData> rows) {
  return rowDataToString(rows, false);
}
/**
* Returns string format of a list of RowData.
*
* @param withChangeFlag whether to print the change flag
*/
public static String rowDataToString(List<RowData> rows, boolean withChangeFlag) {
DataStructureConverter<Object, Object> converter =
DataStructureConverters.getConverter(TestConfigurations.ROW_DATA_TYPE);
return rows.stream()
.map(row -> converter.toExternal(row).toString())
.sorted(Comparator.naturalOrder())
.sorted(Comparator.comparing(o -> toStringSafely(o.getString(0))))
.map(row -> {
final String rowStr = converter.toExternal(row).toString();
if (withChangeFlag) {
return row.getRowKind().shortString() + "(" + rowStr + ")";
} else {
return rowStr;
}
})
.collect(Collectors.toList()).toString();
}
@@ -287,7 +325,30 @@ public class TestData {
* @param expected Expected string of the sorted rows
*/
public static void assertRowsEquals(List<Row> rows, String expected) {
assertRowsEquals(rows, expected, 0);
assertRowsEquals(rows, expected, false);
}
/**
* Sort the {@code rows} using field at index 0 and asserts
* it equals with the expected string {@code expected}.
*
* @param rows Actual result rows
* @param expected Expected string of the sorted rows
* @param withChangeFlag Whether compares with change flags
*/
public static void assertRowsEquals(List<Row> rows, String expected, boolean withChangeFlag) {
String rowsString = rows.stream()
.sorted(Comparator.comparing(o -> toStringSafely(o.getField(0))))
.map(row -> {
final String rowStr = row.toString();
if (withChangeFlag) {
return row.getKind().shortString() + "(" + rowStr + ")";
} else {
return rowStr;
}
})
.collect(Collectors.toList()).toString();
assertThat(rowsString, is(expected));
}
/**
@@ -573,7 +634,11 @@ public class TestData {
}
public static BinaryRowData insertRow(Object... fields) {
LogicalType[] types = TestConfigurations.ROW_TYPE.getFields().stream().map(RowType.RowField::getType)
return insertRow(TestConfigurations.ROW_TYPE, fields);
}
public static BinaryRowData insertRow(RowType rowType, Object... fields) {
LogicalType[] types = rowType.getFields().stream().map(RowType.RowField::getType)
.toArray(LogicalType[]::new);
assertEquals(
"Filed count inconsistent with type information",
@@ -599,4 +664,16 @@ public class TestData {
rowData.setRowKind(RowKind.DELETE);
return rowData;
}
/**
 * Creates a row flagged as {@link RowKind#UPDATE_BEFORE}: the retraction half of an
 * update change, carrying the previous values of the record.
 */
private static BinaryRowData updateBeforeRow(Object... fields) {
  BinaryRowData rowData = insertRow(fields);
  rowData.setRowKind(RowKind.UPDATE_BEFORE);
  return rowData;
}
/**
 * Creates a row flagged as {@link RowKind#UPDATE_AFTER}: the accumulation half of an
 * update change, carrying the new values of the record.
 */
private static BinaryRowData updateAfterRow(Object... fields) {
  BinaryRowData rowData = insertRow(fields);
  rowData.setRowKind(RowKind.UPDATE_AFTER);
  return rowData;
}
}

View File

@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utils;
import org.apache.hudi.client.model.HoodieRowData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
import java.util.Random;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
 * Unit tests {@link HoodieRowData}.
 *
 * <p>Verifies that the five Hoodie meta strings plus the operation column are exposed at
 * the head of the row, and that every user column of every supported physical type is
 * readable at its position shifted by the meta-column count.
 */
public class TestHoodieRowData {
  // User columns start after the meta columns (which include the operation column).
  private final int metaColumnsNum = HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.size();
  private static final Random RANDOM = new Random();
  // Index of each typed field inside the generated value array (and ROW_TYPE).
  private static final int INTEGER_INDEX = 0;
  private static final int STRING_INDEX = 1;
  private static final int BOOLEAN_INDEX = 2;
  private static final int SHORT_INDEX = 3;
  private static final int BYTE_INDEX = 4;
  private static final int LONG_INDEX = 5;
  private static final int FLOAT_INDEX = 6;
  private static final int DOUBLE_INDEX = 7;
  private static final int DECIMAL_INDEX = 8;
  private static final int BINARY_INDEX = 9;
  private static final int ROW_INDEX = 10;

  // Schema covering every physical type whose accessor HoodieRowData must delegate.
  private static final DataType BASIC_DATA_TYPE = DataTypes.ROW(
      DataTypes.FIELD("integer", DataTypes.INT()),
      DataTypes.FIELD("string", DataTypes.STRING()),
      DataTypes.FIELD("boolean", DataTypes.BOOLEAN()),
      DataTypes.FIELD("short", DataTypes.SMALLINT()),
      DataTypes.FIELD("byte", DataTypes.TINYINT()),
      DataTypes.FIELD("long", DataTypes.BIGINT()),
      DataTypes.FIELD("float", DataTypes.FLOAT()),
      DataTypes.FIELD("double", DataTypes.DOUBLE()),
      DataTypes.FIELD("decimal", DataTypes.DECIMAL(10, 4)),
      DataTypes.FIELD("binary", DataTypes.BYTES()),
      DataTypes.FIELD("row", DataTypes.ROW()))
      .notNull();
  private static final RowType ROW_TYPE = (RowType) BASIC_DATA_TYPE.getLogicalType();

  @Test
  public void testGet() {
    // Wrap a random user row and check that every meta and user column reads back.
    Object[] values = getRandomValue(true);
    RowData rowData = TestData.insertRow(ROW_TYPE, values);
    HoodieRowData hoodieRowData = new HoodieRowData("commitTime", "commitSeqNo", "recordKey", "partitionPath", "fileName",
        rowData, true);
    assertValues(hoodieRowData, "commitTime", "commitSeqNo", "recordKey", "partitionPath",
        "fileName", values);
  }

  /**
   * Fetches a random Object[] of values for testing.
   *
   * @param haveRowType whether to also generate a nested row for the "row" field;
   *                    pass false for the nested row itself so the recursion stops
   *                    (the nested row's ROW_INDEX element is then left null)
   * @return the random Object[] thus generated
   */
  private Object[] getRandomValue(boolean haveRowType) {
    Object[] values = new Object[11];
    values[INTEGER_INDEX] = RANDOM.nextInt();
    values[STRING_INDEX] = StringData.fromString(UUID.randomUUID().toString());
    values[BOOLEAN_INDEX] = RANDOM.nextBoolean();
    values[SHORT_INDEX] = (short) RANDOM.nextInt(2);
    byte[] bytes = new byte[1];
    RANDOM.nextBytes(bytes);
    values[BYTE_INDEX] = bytes[0];
    values[LONG_INDEX] = RANDOM.nextLong();
    values[FLOAT_INDEX] = RANDOM.nextFloat();
    values[DOUBLE_INDEX] = RANDOM.nextDouble();
    values[DECIMAL_INDEX] = DecimalData.fromBigDecimal(new BigDecimal("1005.12313"), 10, 4);
    bytes = new byte[20];
    RANDOM.nextBytes(bytes);
    values[BINARY_INDEX] = bytes;
    if (haveRowType) {
      Object[] rowField = getRandomValue(false);
      values[ROW_INDEX] = TestData.insertRow(ROW_TYPE, rowField);
    }
    return values;
  }

  /**
   * Asserts the meta columns at positions 0..5 and every user column at its
   * meta-shifted position against the expected {@code values}.
   */
  private void assertValues(HoodieRowData hoodieRowData, String commitTime, String commitSeqNo, String recordKey, String partitionPath,
                            String filename, Object[] values) {
    // Meta columns come first, in declaration order.
    assertEquals(commitTime, hoodieRowData.getString(0).toString());
    assertEquals(commitSeqNo, hoodieRowData.getString(1).toString());
    assertEquals(recordKey, hoodieRowData.getString(2).toString());
    assertEquals(partitionPath, hoodieRowData.getString(3).toString());
    assertEquals(filename, hoodieRowData.getString(4).toString());
    // The row was constructed with the trailing boolean flag set to true,
    // which is expected to surface as operation "I" here.
    assertEquals("I", hoodieRowData.getString(5).toString());
    // User columns: each typed accessor must apply the meta-column offset.
    assertEquals(values[INTEGER_INDEX], hoodieRowData.getInt(INTEGER_INDEX + metaColumnsNum));
    assertEquals(values[STRING_INDEX], hoodieRowData.getString(STRING_INDEX + metaColumnsNum));
    assertEquals(values[BOOLEAN_INDEX], hoodieRowData.getBoolean(BOOLEAN_INDEX + metaColumnsNum));
    assertEquals(values[SHORT_INDEX], hoodieRowData.getShort(SHORT_INDEX + metaColumnsNum));
    assertEquals(values[BYTE_INDEX], hoodieRowData.getByte(BYTE_INDEX + metaColumnsNum));
    assertEquals(values[LONG_INDEX], hoodieRowData.getLong(LONG_INDEX + metaColumnsNum));
    assertEquals(values[FLOAT_INDEX], hoodieRowData.getFloat(FLOAT_INDEX + metaColumnsNum));
    assertEquals(values[DOUBLE_INDEX], hoodieRowData.getDouble(DOUBLE_INDEX + metaColumnsNum));
    assertEquals(values[DECIMAL_INDEX], hoodieRowData.getDecimal(DECIMAL_INDEX + metaColumnsNum, 10, 4));
    // Fix: assert the lengths match before comparing element-wise — the original
    // only checked the first expectedBinary.length bytes, so a longer stored
    // array would have passed silently.
    byte[] expectedBinary = (byte[]) values[BINARY_INDEX];
    byte[] binary = hoodieRowData.getBinary(BINARY_INDEX + metaColumnsNum);
    assertEquals(expectedBinary.length, binary.length);
    for (int i = 0; i < expectedBinary.length; i++) {
      assertEquals(expectedBinary[i], binary[i]);
    }
    assertEquals(values[ROW_INDEX], hoodieRowData.getRow(ROW_INDEX + metaColumnsNum, values.length));
  }
}

View File

@@ -150,6 +150,7 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
public void invoke(RowData value, SinkFunction.Context context) {
  // Convert the internal RowData to an external Row and copy its change flag
  // (RowKind) across, so tests can assert on +I/-U/+U/-D semantics; the
  // converter alone does not carry the kind over.
  Row row = (Row) converter.toExternal(value);
  assert row != null;
  row.setKind(value.getRowKind());
  RESULT.get(taskID).add(row);
}