From 21db6d7a84d4a83ec98c110e92ff9c92d05dd530 Mon Sep 17 00:00:00 2001 From: swuferhong <337361684@qq.com> Date: Tue, 10 Aug 2021 20:23:23 +0800 Subject: [PATCH] [HUDI-1771] Propagate CDC format for hoodie (#3285) --- .../apache/hudi/config/HoodieWriteConfig.java | 16 +++ .../apache/hudi/io/HoodieAppendHandle.java | 15 +- .../apache/hudi/io/HoodieCreateHandle.java | 4 + .../org/apache/hudi/io/HoodieMergeHandle.java | 4 + .../org/apache/hudi/io/HoodieWriteHandle.java | 4 +- .../hudi/client/model/HoodieRowData.java | 54 +++---- .../org/apache/hudi/io/FlinkAppendHandle.java | 3 + .../row/HoodieRowDataCreateHandle.java | 4 +- .../table/HoodieFlinkCopyOnWriteTable.java | 2 +- .../table/action/commit/FlinkWriteHelper.java | 19 ++- .../HoodieFlinkMergeOnReadTableCompactor.java | 4 +- .../org/apache/hudi/avro/HoodieAvroUtils.java | 48 ++++++- .../hudi/common/model/HoodieOperation.java | 125 ++++++++++++++++ .../hudi/common/model/HoodieRecord.java | 23 +++ .../common/table/TableSchemaResolver.java | 12 +- .../log/AbstractHoodieLogRecordScanner.java | 20 ++- .../table/log/HoodieFileSliceReader.java | 4 +- .../log/HoodieMergedLogRecordScanner.java | 26 ++-- .../log/HoodieUnMergedLogRecordScanner.java | 13 +- .../hudi/common/util/SpillableMapUtils.java | 16 ++- .../metadata/HoodieBackedTableMetadata.java | 10 +- .../HoodieMetadataMergedLogRecordScanner.java | 2 +- .../hudi/configuration/FlinkOptions.java | 11 ++ .../apache/hudi/sink/StreamWriteFunction.java | 12 +- .../sink/bootstrap/BootstrapFunction.java | 2 +- .../sink/bulk/BulkInsertWriterHelper.java | 11 +- .../compact/CompactionPlanSourceFunction.java | 20 +-- .../sink/compact/HoodieFlinkCompactor.java | 3 +- .../transform/RowDataToHoodieFunction.java | 10 +- .../hudi/sink/utils/HiveSyncContext.java | 1 + .../hudi/sink/utils/PayloadCreation.java | 5 +- .../apache/hudi/table/HoodieTableSink.java | 13 +- .../apache/hudi/table/HoodieTableSource.java | 10 +- .../apache/hudi/table/format/FormatUtils.java | 131 ++++++++++++++++- .../format/mor/MergeOnReadInputFormat.java | 105 ++++++++++++-- .../format/mor/MergeOnReadTableState.java | 8 ++ .../org/apache/hudi/util/ChangelogModes.java | 46 ++++++ .../org/apache/hudi/util/StreamerUtil.java | 1 + .../apache/hudi/sink/StreamWriteITCase.java | 12 +- .../hudi/sink/TestWriteCopyOnWrite.java | 47 +++--- .../hudi/sink/bulk/TestRowDataKeyGen.java | 4 +- .../hudi/table/HoodieDataSourceITCase.java | 29 ++-- .../hudi/table/format/TestInputFormat.java | 122 ++++++++++++++-- .../java/org/apache/hudi/utils/TestData.java | 85 ++++++++++- .../apache/hudi/utils/TestHoodieRowData.java | 136 ++++++++++++++++++ .../factory/CollectSinkTableFactory.java | 1 + .../org/apache/hudi/dla/HoodieDLAClient.java | 2 +- .../org/apache/hudi/hive/HiveSyncConfig.java | 5 + .../apache/hudi/hive/HoodieHiveClient.java | 2 +- .../sync/common/AbstractSyncHoodieClient.java | 18 ++- 50 files changed, 1081 insertions(+), 199 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/model/HoodieOperation.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/util/ChangelogModes.java create mode 100644 hudi-flink/src/test/java/org/apache/hudi/utils/TestHoodieRowData.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 7e9c0cc66..9616b552c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -373,6 +373,13 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("Whether to allow generation of empty commits, even if no data was written in the commit. " + "It's useful in cases where extra metadata needs to be published regardless, e.g. tracking source offsets when ingesting data"); + public static final ConfigProperty ALLOW_OPERATION_METADATA_FIELD = ConfigProperty + .key("hoodie.allow.operation.metadata.field") + .defaultValue(false) + .sinceVersion("0.9") + .withDocumentation("Whether to include '_hoodie_operation' in the metadata fields. " + + "Once enabled, all the changes of a record are persisted to the delta log directly without merge"); + private ConsistencyGuardConfig consistencyGuardConfig; // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled @@ -1309,6 +1316,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getBooleanOrDefault(ALLOW_EMPTY_COMMIT); } + public boolean allowOperationMetadataField() { + return getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD); + } + public static class Builder { protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig(); @@ -1615,6 +1626,11 @@ public class HoodieWriteConfig extends HoodieConfig { return this; } + public Builder withAllowOperationMetadataField(boolean allowOperationMetadataField) { + writeConfig.setValue(ALLOW_OPERATION_METADATA_FIELD, Boolean.toString(allowOperationMetadataField)); + return this; + } + public Builder withProperties(Properties properties) { this.writeConfig.getProps().putAll(properties); return this;
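
For context, a minimal sketch (not part of this patch) of how a writer opts in to the new '_hoodie_operation' meta field through the builder above; the base path and schema string are placeholders:

    import org.apache.hudi.config.HoodieWriteConfig;

    public class OperationFieldConfigSketch {
      public static void main(String[] args) {
        String avroSchemaString = "..."; // placeholder: the table's Avro schema as JSON
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi/t1")              // placeholder base path
            .withSchema(avroSchemaString)
            .withAllowOperationMetadataField(true) // persist '_hoodie_operation' per record
            .build();
        System.out.println(config.allowOperationMetadataField()); // prints: true
      }
    }
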
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index fe45d4b3f..1315c9940 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieDeltaWriteStat; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; @@ -197,20 +198,26 @@ public class HoodieAppendHandle extends // Pass the isUpdateRecord to the props for HoodieRecordPayload to judge // Whether it is an update or insert record. boolean isUpdateRecord = isUpdateRecord(hoodieRecord); + // If the format cannot record the operation field, nullify the DELETE payload manually. + boolean nullifyPayload = HoodieOperation.isDelete(hoodieRecord.getOperation()) && !config.allowOperationMetadataField(); recordProperties.put(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, String.valueOf(isUpdateRecord)); - Option avroRecord = hoodieRecord.getData().getInsertValue(tableSchema, recordProperties); + Option avroRecord = nullifyPayload ? Option.empty() : hoodieRecord.getData().getInsertValue(tableSchema, recordProperties); if (avroRecord.isPresent()) { if (avroRecord.get().equals(IGNORE_RECORD)) { return avroRecord; } // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema - avroRecord = Option.of(rewriteRecord((GenericRecord) avroRecord.get())); + GenericRecord rewriteRecord = rewriteRecord((GenericRecord) avroRecord.get()); + avroRecord = Option.of(rewriteRecord); String seqId = HoodieRecord.generateSequenceId(instantTime, getPartitionId(), RECORD_COUNTER.getAndIncrement()); if (config.populateMetaFields()) { - HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord.get(), hoodieRecord.getRecordKey(), + HoodieAvroUtils.addHoodieKeyToRecord(rewriteRecord, hoodieRecord.getRecordKey(), hoodieRecord.getPartitionPath(), fileId); - HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord.get(), instantTime, seqId); + HoodieAvroUtils.addCommitMetadataToRecord(rewriteRecord, instantTime, seqId); + } + if (config.allowOperationMetadataField()) { + HoodieAvroUtils.addOperationToRecord(rewriteRecord, hoodieRecord.getOperation()); } if (isUpdateRecord(hoodieRecord)) { updatedRecordsWritten++; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index eef5b3d38..01ad45342 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -22,6 +22,7 @@ import org.apache.avro.Schema; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; @@ -127,6 +128,9 @@ public class HoodieCreateHandle extends @Override public void write(HoodieRecord record, Option avroRecord) { Option recordMetadata = record.getData().getMetadata(); + if (HoodieOperation.isDelete(record.getOperation())) { + avroRecord = Option.empty(); + } try { if (avroRecord.isPresent()) { if (avroRecord.get().equals(IGNORE_RECORD)) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 347f8cc59..3e2014126 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -22,6 +22,7 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; @@ -264,6 +265,9 @@ public class HoodieMergeHandle extends H writeStatus.markFailure(hoodieRecord, failureEx, recordMetadata); return false; } + if (HoodieOperation.isDelete(hoodieRecord.getOperation())) { + indexedRecord = Option.empty(); + } try { if (indexedRecord.isPresent()) { // Convert GenericRecord
to GenericRecord with hoodie commit metadata in schema diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 0337b0e5e..306021bee 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -112,9 +112,9 @@ public abstract class HoodieWriteHandle this.partitionPath = partitionPath; this.fileId = fileId; this.tableSchema = overriddenSchema.orElseGet(() -> getSpecifiedTableSchema(config)); - this.tableSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(tableSchema); + this.tableSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(tableSchema, config.allowOperationMetadataField()); this.writeSchema = overriddenSchema.orElseGet(() -> getWriteSchema(config)); - this.writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema); + this.writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema, config.allowOperationMetadataField()); this.timer = new HoodieTimer().startTimer(); this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(), !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction()); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java index 86acc1c1d..dfc425c80 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java @@ -18,7 +18,7 @@ package org.apache.hudi.client.model; -import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieOperation; import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.DecimalData; @@ -35,12 +35,7 @@ import org.apache.flink.types.RowKind; * copy rather than fetching from {@link RowData}. */ public class HoodieRowData implements RowData { - - private final String commitTime; - private final String commitSeqNumber; - private final String recordKey; - private final String partitionPath; - private final String fileName; + private final String[] metaColumns; private final RowData row; private final int metaColumnsNum; @@ -49,14 +44,19 @@ public class HoodieRowData implements RowData { String recordKey, String partitionPath, String fileName, - RowData row) { - this.commitTime = commitTime; - this.commitSeqNumber = commitSeqNumber; - this.recordKey = recordKey; - this.partitionPath = partitionPath; - this.fileName = fileName; + RowData row, + boolean withOperation) { + this.metaColumnsNum = withOperation ? 
6 : 5; + this.metaColumns = new String[metaColumnsNum]; + metaColumns[0] = commitTime; + metaColumns[1] = commitSeqNumber; + metaColumns[2] = recordKey; + metaColumns[3] = partitionPath; + metaColumns[4] = fileName; + if (withOperation) { + metaColumns[5] = HoodieOperation.fromValue(row.getRowKind().toByteValue()).getName(); + } this.row = row; - this.metaColumnsNum = HoodieRecord.HOODIE_META_COLUMNS.size(); } @@ -74,28 +74,6 @@ public class HoodieRowData implements RowData { this.row.setRowKind(kind); } - private String getMetaColumnVal(int ordinal) { - switch (ordinal) { - case 0: { - return commitTime; - } - case 1: { - return commitSeqNumber; - } - case 2: { - return recordKey; - } - case 3: { - return partitionPath; - } - case 4: { - return fileName; - } - default: - throw new IllegalArgumentException("Not expected"); - } - } - @Override public boolean isNullAt(int ordinal) { if (ordinal < metaColumnsNum) { @@ -181,4 +159,8 @@ public class HoodieRowData implements RowData { public MapData getMap(int ordinal) { return row.getMap(ordinal - metaColumnsNum); } + + private String getMetaColumnVal(int ordinal) { + return this.metaColumns[ordinal]; + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java index 987f3350d..81bd5985e 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java @@ -79,6 +79,9 @@ public class FlinkAppendHandle @Override protected boolean isUpdateRecord(HoodieRecord hoodieRecord) { + // do not use the HoodieRecord operation because the hoodie writer has its own + // INSERT/MERGE bucket for 'UPSERT' semantics. E.g., a hoodie record with a fresh new key + // and operation HoodieOperation.DELETE would be put into either an INSERT bucket or an UPDATE bucket. return hoodieRecord.getCurrentLocation() != null && hoodieRecord.getCurrentLocation().getInstantTime().equals("U"); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java index 76fad8b1e..238367b5d 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java @@ -117,7 +117,7 @@ public class HoodieRowDataCreateHandle implements Serializable { try { String seqId = HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement()); HoodieRowData rowData = new HoodieRowData(instantTime, seqId, recordKey, partitionPath, path.getName(), - record); + record, writeConfig.allowOperationMetadataField()); try { fileWriter.writeRow(recordKey, rowData); writeStatus.markSuccess(recordKey); @@ -131,7 +131,7 @@ public class HoodieRowDataCreateHandle implements Serializable { } /** - * @returns {@code true} if this handle can take in more writes. else {@code false}. + * Returns {@code true} if this handle can take in more writes, else {@code false}.
*/ public boolean canWrite() { return fileWriter.canWrite(); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index 9776870d9..8a9b4bf9b 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -354,7 +354,7 @@ public class HoodieFlinkCopyOnWriteTable extends dataFileToBeMerged, taskContextSupplier, Option.empty()); } else { return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId, - dataFileToBeMerged,taskContextSupplier, Option.empty()); + dataFileToBeMerged, taskContextSupplier, Option.empty()); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java index 52381230d..5cb1b80a5 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java @@ -21,6 +21,7 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.collection.Pair; @@ -45,7 +46,7 @@ import java.util.stream.Collectors; *

Computing the record batch locations all at once puts pressure on the engine, * so we should avoid that in a streaming system. */ -public class FlinkWriteHelper extends AbstractWriteHelper>, +public class FlinkWriteHelper extends AbstractWriteHelper>, List, List, R> { private FlinkWriteHelper() { @@ -80,8 +81,8 @@ public class FlinkWriteHelper extends AbstractW @Override public List> deduplicateRecords(List> records, - HoodieIndex>, List, List> index, - int parallelism) { + HoodieIndex>, List, List> index, + int parallelism) { Map>>> keyedRecords = records.stream().map(record -> { // If index used is global, then records are expected to differ in their partitionPath final Object key = record.getKey().getRecordKey(); @@ -89,13 +90,17 @@ public class FlinkWriteHelper extends AbstractW }).collect(Collectors.groupingBy(Pair::getLeft)); return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> { - @SuppressWarnings("unchecked") - T reducedData = (T) rec1.getData().preCombine(rec2.getData()); + final T data1 = rec1.getData(); + final T data2 = rec2.getData(); + + @SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1); // we cannot allow the user to change the key or partitionPath, since that will affect // everything // so pick it from one of the records. - HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey(); - HoodieRecord hoodieRecord = new HoodieRecord<>(reducedKey, reducedData); + boolean choosePrev = data1.equals(reducedData); + HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey(); + HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation(); + HoodieRecord hoodieRecord = new HoodieRecord<>(reducedKey, reducedData, operation); // reuse the location from the first record.
hoodieRecord.setCurrentLocation(rec1.getCurrentLocation()); return hoodieRecord; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java index f08c8b5d1..1f4a52484 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java @@ -86,7 +86,7 @@ public class HoodieFlinkMergeOnReadTableCompactor @Override public List compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan, - HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException { + HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException { throw new UnsupportedOperationException("HoodieFlinkMergeOnReadTableCompactor does not support compact directly, " + "the function works as a separate pipeline"); } @@ -98,7 +98,7 @@ public class HoodieFlinkMergeOnReadTableCompactor String instantTime) throws IOException { FileSystem fs = metaClient.getFs(); - Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); + Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField()); LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames() + " for commit " + instantTime); // TODO - FIX THIS diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 695f97e8a..239eed563 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -19,7 +19,10 @@ package org.apache.hudi.avro; import org.apache.avro.specific.SpecificRecordBase; + +import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; @@ -151,7 +154,8 @@ public class HoodieAvroUtils { || HoodieRecord.COMMIT_SEQNO_METADATA_FIELD.equals(fieldName) || HoodieRecord.RECORD_KEY_METADATA_FIELD.equals(fieldName) || HoodieRecord.PARTITION_PATH_METADATA_FIELD.equals(fieldName) - || HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName); + || HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName) + || HoodieRecord.OPERATION_METADATA_FIELD.equals(fieldName); } public static Schema createHoodieWriteSchema(Schema originalSchema) { @@ -164,8 +168,20 @@ public class HoodieAvroUtils { /** * Adds the Hoodie metadata fields to the given schema. + * + * @param schema The schema */ public static Schema addMetadataFields(Schema schema) { + return addMetadataFields(schema, false); + } + + /** + * Adds the Hoodie metadata fields to the given schema. 
+ * + * @param schema The schema + * @param withOperationField Whether to include the '_hoodie_operation' field + */ + public static Schema addMetadataFields(Schema schema, boolean withOperationField) { List parentFields = new ArrayList<>(); Schema.Field commitTimeField = @@ -184,6 +200,13 @@ public class HoodieAvroUtils { parentFields.add(recordKeyField); parentFields.add(partitionPathField); parentFields.add(fileNameField); + + if (withOperationField) { + final Schema.Field operationField = + new Schema.Field(HoodieRecord.OPERATION_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE); + parentFields.add(operationField); + } + for (Schema.Field field : schema.getFields()) { if (!isMetadataField(field.name())) { Schema.Field newField = new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()); @@ -202,7 +225,7 @@ public class HoodieAvroUtils { public static Schema removeMetadataFields(Schema schema) { List filteredFields = schema.getFields() .stream() - .filter(field -> !HoodieRecord.HOODIE_META_COLUMNS.contains(field.name())) + .filter(field -> !HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(field.name())) .map(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())) .collect(Collectors.toList()); Schema filteredSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false); @@ -268,6 +291,11 @@ public class HoodieAvroUtils { return record; } + public static GenericRecord addOperationToRecord(GenericRecord record, HoodieOperation operation) { + record.put(HoodieRecord.OPERATION_METADATA_FIELD, operation.getName()); + return record; + } + /** * Add null fields to passed in schema. Caller is responsible for ensuring there is no duplicates. As different query * engines have varying constraints regarding treating the case-sensitivity of fields, its best to let caller @@ -453,6 +481,22 @@ public class HoodieAvroUtils { } } + /** + * Returns the string value of the given record {@code rec} and field {@code fieldName}. + * The field and value both could be missing. + * + * @param rec The record + * @param fieldName The field name + * + * @return the string form of the field + * or empty if the schema does not contain the field name or the value is null + */ + public static Option getNullableValAsString(GenericRecord rec, String fieldName) { + Schema.Field field = rec.getSchema().getField(fieldName); + String fieldVal = field == null ? null : StringUtils.objToString(rec.get(field.pos())); + return Option.ofNullable(fieldVal); + } + /** * This method converts values for fields with certain Avro/Parquet data types that require special handling. * diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieOperation.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieOperation.java new file mode 100644 index 000000000..0da40eb0a --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieOperation.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.hudi.common.util.Option; + +/** + * Represents the changes that a row can describe in a changelog. + */ +public enum HoodieOperation { + /** + * Insert operation. + */ + INSERT("I", (byte) 0), + /** + * Update operation with previous record content, + * should be used together with {@link #UPDATE_AFTER} for modeling an update operation. + */ + UPDATE_BEFORE("-U", (byte) 1), + /** + * Update operation with new record content. + */ + UPDATE_AFTER("U", (byte) 2), + /** + * Delete operation. + */ + DELETE("D", (byte) 3); + + private final String name; + private final byte value; + + HoodieOperation(String name, byte value) { + this.name = name; + this.value = value; + } + + public String getName() { + return name; + } + + public byte getValue() { + return value; + } + + public static HoodieOperation fromValue(byte value) { + switch (value) { + case 0: + return INSERT; + case 1: + return UPDATE_BEFORE; + case 2: + return UPDATE_AFTER; + case 3: + return DELETE; + default: + throw new AssertionError(); + } + } + + public static HoodieOperation fromName(Option nameOpt) { + if (!nameOpt.isPresent()) { + return null; + } + return fromName(nameOpt.get()); + } + + public static HoodieOperation fromName(String name) { + switch (name) { + case "I": + return INSERT; + case "-U": + return UPDATE_BEFORE; + case "U": + return UPDATE_AFTER; + case "D": + return DELETE; + default: + throw new AssertionError(); + } + } + + /** + * Returns whether the operation is INSERT. + */ + public static boolean isInsert(HoodieOperation operation) { + return operation == INSERT; + } + + /** + * Returns whether the operation is UPDATE_BEFORE. + */ + public static boolean isUpdateBefore(HoodieOperation operation) { + return operation == UPDATE_BEFORE; + } + + /** + * Returns whether the operation is UPDATE_AFTER. + */ + public static boolean isUpdateAfter(HoodieOperation operation) { + return operation == UPDATE_AFTER; + } + + /** + * Returns whether the operation is DELETE. + */ + public static boolean isDelete(HoodieOperation operation) { + return operation == DELETE; + } +}
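
For context, a minimal sketch (not part of this patch) of the intended round-trip between Flink's RowKind byte values and this enum — the reason DELETE must carry byte value 3, matching RowKind.DELETE.toByteValue():

    import org.apache.flink.types.RowKind;
    import org.apache.hudi.common.model.HoodieOperation;

    public class HoodieOperationRoundTripSketch {
      public static void main(String[] args) {
        // RowKind byte values: INSERT=0, UPDATE_BEFORE=1, UPDATE_AFTER=2, DELETE=3
        for (RowKind kind : RowKind.values()) {
          HoodieOperation op = HoodieOperation.fromValue(kind.toByteValue());
          // fromName(getName()) should be the identity for every operation
          if (HoodieOperation.fromName(op.getName()) != op) {
            throw new AssertionError("Round trip failed for " + kind);
          }
          System.out.println(kind + " -> " + op.getName());
        }
      }
    }
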
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java index 6484c5ce4..17427781e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java @@ -39,11 +39,19 @@ public class HoodieRecord implements Serializable public static final String RECORD_KEY_METADATA_FIELD = "_hoodie_record_key"; public static final String PARTITION_PATH_METADATA_FIELD = "_hoodie_partition_path"; public static final String FILENAME_METADATA_FIELD = "_hoodie_file_name"; + public static final String OPERATION_METADATA_FIELD = "_hoodie_operation"; public static final List HOODIE_META_COLUMNS = CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD, RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD); + // Temporary to support the '_hoodie_operation' field; once we solve + // the compatibility problem, it can be removed. + public static final List HOODIE_META_COLUMNS_WITH_OPERATION = + CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD, + RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD, + OPERATION_METADATA_FIELD); + public static final Map HOODIE_META_COLUMNS_NAME_TO_POS = IntStream.range(0, HOODIE_META_COLUMNS.size()).mapToObj(idx -> Pair.of(HOODIE_META_COLUMNS.get(idx), idx)) .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); @@ -73,12 +81,22 @@ public class HoodieRecord implements Serializable */ private boolean sealed; + /** + * The CDC operation.
+ */ + private HoodieOperation operation; + public HoodieRecord(HoodieKey key, T data) { + this(key, data, null); + } + + public HoodieRecord(HoodieKey key, T data, HoodieOperation operation) { this.key = key; this.data = data; this.currentLocation = null; this.newLocation = null; this.sealed = false; + this.operation = operation; } public HoodieRecord(HoodieRecord record) { @@ -86,6 +104,7 @@ public class HoodieRecord implements Serializable this.currentLocation = record.currentLocation; this.newLocation = record.newLocation; this.sealed = record.sealed; + this.operation = record.operation; } public HoodieRecord() { @@ -95,6 +114,10 @@ public class HoodieRecord implements Serializable return key; } + public HoodieOperation getOperation() { + return operation; + } + public T getData() { if (data == null) { throw new IllegalStateException("Payload already deflated for record."); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 20e5a82c6..8d16e91c2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -55,10 +55,16 @@ import org.apache.parquet.schema.MessageType; public class TableSchemaResolver { private static final Logger LOG = LogManager.getLogger(TableSchemaResolver.class); - private HoodieTableMetaClient metaClient; + private final HoodieTableMetaClient metaClient; + private final boolean withOperationField; public TableSchemaResolver(HoodieTableMetaClient metaClient) { + this(metaClient, false); + } + + public TableSchemaResolver(HoodieTableMetaClient metaClient, boolean withOperationField) { this.metaClient = metaClient; + this.withOperationField = withOperationField; } /** @@ -170,7 +176,7 @@ public class TableSchemaResolver { Option schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema(); if (schemaFromTableConfig.isPresent()) { if (includeMetadataFields) { - return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get()); + return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), withOperationField); } else { return schemaFromTableConfig.get(); } @@ -256,7 +262,7 @@ public class TableSchemaResolver { Schema schema = new Schema.Parser().parse(existingSchemaStr); if (includeMetadataFields) { - schema = HoodieAvroUtils.addMetadataFields(schema); + schema = HoodieAvroUtils.addMetadataFields(schema, withOperationField); } return Option.of(schema); } catch (Exception e) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java index 95ed80afa..79d582126 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java @@ -96,6 +96,8 @@ public abstract class AbstractHoodieLogRecordScanner { private final int bufferSize; // optional instant range for incremental block filtering private final Option instantRange; + // Read the operation metadata field from the avro record + private final boolean withOperationField; // FileSystem private final FileSystem fs; // Total log files read - for metrics @@ -114,7 +116,8 @@ public abstract class AbstractHoodieLogRecordScanner { private float progress = 0.0f; protected 
AbstractHoodieLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, - String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize, Option instantRange) { + String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, + int bufferSize, Option instantRange, boolean withOperationField) { this.readerSchema = readerSchema; this.latestInstantTime = latestInstantTime; this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build(); @@ -131,6 +134,7 @@ public abstract class AbstractHoodieLogRecordScanner { this.fs = fs; this.bufferSize = bufferSize; this.instantRange = instantRange; + this.withOperationField = withOperationField; } /** @@ -294,7 +298,7 @@ public abstract class AbstractHoodieLogRecordScanner { private boolean isNewInstantBlock(HoodieLogBlock logBlock) { return currentInstantLogBlocks.size() > 0 && currentInstantLogBlocks.peek().getBlockType() != CORRUPT_BLOCK && !logBlock.getLogBlockHeader().get(INSTANT_TIME) - .contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME)); + .contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME)); } /** @@ -312,9 +316,9 @@ public abstract class AbstractHoodieLogRecordScanner { protected HoodieRecord createHoodieRecord(IndexedRecord rec) { if (!simpleKeyGenFields.isPresent()) { - return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN); + return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.withOperationField); } else { - return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.simpleKeyGenFields.get()); + return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.simpleKeyGenFields.get(), this.withOperationField); } } @@ -392,6 +396,10 @@ public abstract class AbstractHoodieLogRecordScanner { return totalCorruptBlocks.get(); } + public boolean isWithOperationField() { + return withOperationField; + } + /** * Builder used to build {@code AbstractHoodieLogRecordScanner}. */ @@ -417,6 +425,10 @@ public abstract class AbstractHoodieLogRecordScanner { throw new UnsupportedOperationException(); } + public Builder withOperationField(boolean withOperationField) { + throw new UnsupportedOperationException(); + } + public abstract AbstractHoodieLogRecordScanner build(); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java index 76d1cc3d0..d840565c6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java @@ -45,8 +45,8 @@ public class HoodieFileSliceReader implements Ite while (baseIterator.hasNext()) { GenericRecord record = (GenericRecord) baseIterator.next(); HoodieRecord hoodieRecord = simpleKeyGenFieldsOpt.isPresent() - ? SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, simpleKeyGenFieldsOpt.get()) - : SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass); + ? 
SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField()) + : SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, scanner.isWithOperationField()); scanner.processNextRecord(hoodieRecord); } return new HoodieFileSliceReader(scanner.iterator()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index 4d51fbb85..bc08fe744 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -43,7 +43,7 @@ import java.util.Map; /** * Scans through all the blocks in a list of HoodieLogFile and builds up a compacted/merged list of records which will * be used as a lookup table when merging the base columnar file with the redo log file. - * + *

* NOTE: If readBlockLazily is turned on, does not merge, instead keeps reading log blocks and merges everything at once * This is an optimization to avoid seek() back and forth to read new block (forward seek()) and lazily read content of * seen block (reverse and forward seek()) during merge | | Read Block 1 Metadata | | Read Block 1 Data | | | Read Block @@ -72,11 +72,12 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner @SuppressWarnings("unchecked") protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, - String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily, - boolean reverseReader, int bufferSize, String spillableMapBasePath, - Option instantRange, boolean autoScan, - ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled) { - super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange); + String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily, + boolean reverseReader, int bufferSize, String spillableMapBasePath, + Option instantRange, boolean autoScan, + ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled, + boolean withOperationField) { + super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField); try { // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(), @@ -132,8 +133,10 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner if (records.containsKey(key)) { // Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be // done when a delete (empty payload) is encountered before or after an insert/update. + + // Always use the natural order now. 
HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData()); - records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue)); + records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, hoodieRecord.getOperation())); } else { // Put the record as is records.put(key, hoodieRecord); @@ -177,6 +180,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner private Option instantRange = Option.empty(); // auto scan default true private boolean autoScan = true; + // operation field default false + private boolean withOperationField = false; public Builder withFileSystem(FileSystem fs) { this.fs = fs; @@ -248,12 +253,17 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner return this; } + public Builder withOperationField(boolean withOperationField) { + this.withOperationField = withOperationField; + return this; + } + @Override public HoodieMergedLogRecordScanner build() { return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader, bufferSize, spillableMapBasePath, instantRange, autoScan, - diskMapType, isBitCaskDiskMapCompressionEnabled); + diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField); } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index dd0edd99a..8b26f7257 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -36,8 +36,9 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann private final LogRecordScannerCallback callback; private HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, - String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize, LogRecordScannerCallback callback) { - super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, Option.empty()); + String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize, + LogRecordScannerCallback callback, Option instantRange) { + super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, false); this.callback = callback; } @@ -80,6 +81,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann private boolean readBlocksLazily; private boolean reverseReader; private int bufferSize; + private Option instantRange = Option.empty(); // specific configurations private LogRecordScannerCallback callback; @@ -123,6 +125,11 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann return this; } + public Builder withInstantRange(Option instantRange) { + this.instantRange = instantRange; + return this; + } + public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) { this.callback = callback; return this; @@ -131,7 +138,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann @Override public HoodieUnMergedLogRecordScanner build() { return new HoodieUnMergedLogRecordScanner(fs, 
basePath, logFilePaths, readerSchema, - latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback); + latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback, instantRange); } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java index 43f8ba5e2..0a716e0e6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java @@ -20,6 +20,7 @@ package org.apache.hudi.common.util; import org.apache.hudi.common.fs.SizeAwareDataOutputStream; import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.collection.BitCaskDiskMap.FileEntry; @@ -32,6 +33,8 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.util.zip.CRC32; +import static org.apache.hudi.avro.HoodieAvroUtils.getNullableValAsString; + /** * A utility class supports spillable map. */ @@ -110,18 +113,23 @@ public class SpillableMapUtils { /** * Utility method to convert bytes to HoodieRecord using schema and payload class. */ - public static R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz) { - return convertToHoodieRecordPayload(rec, payloadClazz, Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD)); + public static R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, boolean withOperationField) { + return convertToHoodieRecordPayload(rec, payloadClazz, Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD), withOperationField); } /** * Utility method to convert bytes to HoodieRecord using schema and payload class. */ - public static R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, Pair recordKeyPartitionPathPair) { + public static R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, + Pair recordKeyPartitionPathPair, + boolean withOperationField) { String recKey = rec.get(recordKeyPartitionPathPair.getLeft()).toString(); String partitionPath = rec.get(recordKeyPartitionPathPair.getRight()).toString(); + HoodieOperation operation = withOperationField + ? HoodieOperation.fromName(getNullableValAsString(rec, HoodieRecord.OPERATION_METADATA_FIELD)) : null; HoodieRecord hoodieRecord = new HoodieRecord<>(new HoodieKey(recKey, partitionPath), - ReflectionUtils.loadPayload(payloadClazz, new Object[] {Option.of(rec)}, Option.class)); + ReflectionUtils.loadPayload(payloadClazz, new Object[] {Option.of(rec)}, Option.class), operation); + return (R) hoodieRecord; } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 8995ab491..962b8cac8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -132,10 +132,12 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { HoodieTimer readTimer = new HoodieTimer().startTimer(); Option baseRecord = baseFileReader.getRecordByKey(key); if (baseRecord.isPresent()) { - hoodieRecord = tableConfig.populateMetaFields() ? 
SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), - tableConfig.getPayloadClass()) : SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), - tableConfig.getPayloadClass(), Pair.of(tableConfig.getRecordKeyFieldProp(), - tableConfig.getPartitionFieldProp())); + hoodieRecord = tableConfig.populateMetaFields() + ? SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), tableConfig.getPayloadClass(), false) + : SpillableMapUtils.convertToHoodieRecordPayload( + baseRecord.get(), + tableConfig.getPayloadClass(), + Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()), false); metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer())); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java index 51b031564..a3c3e086f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java @@ -45,7 +45,7 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS String spillableMapBasePath, Set mergeKeyFilter, ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled) { super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, false, false, bufferSize, - spillableMapBasePath, Option.empty(), false, diskMapType, isBitCaskDiskMapCompressionEnabled); + spillableMapBasePath, Option.empty(), false, diskMapType, isBitCaskDiskMapCompressionEnabled, false); this.mergeKeyFilter = mergeKeyFilter; performScan(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index c0d721ad7..21fdb8421 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -76,6 +76,17 @@ public class FlinkOptions extends HoodieConfig { .withDescription("The default partition name in case the dynamic partition" + " column value is null/empty string"); + public static final ConfigOption CHANGELOG_ENABLED = ConfigOptions + .key("changelog.enabled") + .booleanType() + .defaultValue(false) + .withDescription("Whether to keep all the intermediate changes; " + + "when enabled, we try to keep all the changes of a record:\n" + + "1). The sink accepts the UPDATE_BEFORE message;\n" + + "2). The source tries to emit every change of a record.\n" + + "The semantics is best-effort because the compaction job will eventually merge all changes of a record into one.\n" + + " Default is false, for UPSERT semantics"); + // ------------------------------------------------------------------------ // Metadata table Options // ------------------------------------------------------------------------
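
For context, a brief sketch (not part of this patch) of how a Flink job opts in to the new changelog mode; FlinkOptions.PATH already exists upstream, and the table path is a placeholder:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    public class ChangelogModeSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(FlinkOptions.PATH, "/tmp/hudi/t1");    // placeholder base path
        conf.set(FlinkOptions.CHANGELOG_ENABLED, true); // keep UPDATE_BEFORE/UPDATE_AFTER instead of collapsing to upserts
        // the equivalent SQL table option is 'changelog.enabled' = 'true'
        System.out.println(conf.get(FlinkOptions.CHANGELOG_ENABLED)); // prints: true
      }
    }
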
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 375265a4b..71b20ba27 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -21,6 +21,7 @@ package org.apache.hudi.sink; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -360,23 +361,26 @@ public class StreamWriteFunction private final String key; // record key private final String instant; // 'U' or 'I' private final HoodieRecordPayload data; // record payload + private final HoodieOperation operation; // operation - private DataItem(String key, String instant, HoodieRecordPayload data) { + private DataItem(String key, String instant, HoodieRecordPayload data, HoodieOperation operation) { this.key = key; this.instant = instant; this.data = data; + this.operation = operation; } public static DataItem fromHoodieRecord(HoodieRecord record) { return new DataItem( record.getRecordKey(), record.getCurrentLocation().getInstantTime(), - record.getData()); + record.getData(), + record.getOperation()); } public HoodieRecord toHoodieRecord(String partitionPath) { HoodieKey hoodieKey = new HoodieKey(this.key, partitionPath); - HoodieRecord record = new HoodieRecord<>(hoodieKey, data); + HoodieRecord record = new HoodieRecord<>(hoodieKey, data, operation); HoodieRecordLocation loc = new HoodieRecordLocation(instant, null); record.setCurrentLocation(loc); return record; @@ -417,7 +421,7 @@ public class StreamWriteFunction public void preWrite(List records) { // rewrite the first record with expected fileID HoodieRecord first = records.get(0); - HoodieRecord record = new HoodieRecord<>(first.getKey(), first.getData()); + HoodieRecord record = new HoodieRecord<>(first.getKey(), first.getData(), first.getOperation()); HoodieRecordLocation newLoc = new HoodieRecordLocation(first.getCurrentLocation().getInstantTime(), fileID); record.setCurrentLocation(newLoc); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java index bcf6c3a00..49233626f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java @@ -203,7 +203,7 @@ public class BootstrapFunction .filter(logFile -> isValidFile(logFile.getFileStatus())) .map(logFile -> logFile.getPath().toString()) .collect(toList()); - HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(logPaths, schema, latestCommitTime.get().getTimestamp(), + HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
writeConfig, hadoopConf); try { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java index 0e3ecb16f..fbe7678b7 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java @@ -71,7 +71,7 @@ public class BulkInsertWriterHelper { this.taskPartitionId = taskPartitionId; this.taskId = taskId; this.taskEpochId = taskEpochId; - this.rowType = addMetadataFields(rowType); // patch up with metadata fields + this.rowType = addMetadataFields(rowType, writeConfig.allowOperationMetadataField()); // patch up with metadata fields this.arePartitionRecordsSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION); this.fileIdPrefix = UUID.randomUUID().toString(); this.keyGen = RowDataKeyGen.instance(conf, rowType); @@ -141,7 +141,7 @@ public class BulkInsertWriterHelper { /** * Adds the Hoodie metadata fields to the given row type. */ - private static RowType addMetadataFields(RowType rowType) { + private static RowType addMetadataFields(RowType rowType, boolean withOperationField) { List mergedFields = new ArrayList<>(); LogicalType metadataFieldType = DataTypes.STRING().getLogicalType(); @@ -161,6 +161,13 @@ public class BulkInsertWriterHelper { mergedFields.add(recordKeyField); mergedFields.add(partitionPathField); mergedFields.add(fileNameField); + + if (withOperationField) { + RowType.RowField operationField = + new RowType.RowField(HoodieRecord.OPERATION_METADATA_FIELD, metadataFieldType, "operation"); + mergedFields.add(operationField); + } + mergedFields.addAll(rowType.getFields()); return new RowType(false, mergedFields); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java index ed916c874..fe5508998 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java @@ -20,8 +20,6 @@ package org.apache.hudi.sink.compact; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.model.CompactionOperation; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.table.HoodieFlinkTable; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.configuration.Configuration; @@ -57,26 +55,14 @@ public class CompactionPlanSourceFunction extends AbstractRichFunction implement /** * Compaction instant time. */ - private String compactionInstantTime; - - /** - * Hoodie flink table. - */ - private HoodieFlinkTable table; + private final String compactionInstantTime; /** * The compaction plan. */ - private HoodieCompactionPlan compactionPlan; + private final HoodieCompactionPlan compactionPlan; - /** - * Hoodie instant. 
- */ - private HoodieInstant instant; - - public CompactionPlanSourceFunction(HoodieFlinkTable table, HoodieInstant instant, HoodieCompactionPlan compactionPlan, String compactionInstantTime) { - this.table = table; - this.instant = instant; + public CompactionPlanSourceFunction(HoodieCompactionPlan compactionPlan, String compactionInstantTime) { this.compactionPlan = compactionPlan; this.compactionInstantTime = compactionInstantTime; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index 84b68136d..18d49f1be 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -133,9 +133,8 @@ public class HoodieFlinkCompactor { // Mark instant as compaction inflight table.getActiveTimeline().transitionCompactionRequestedToInflight(instant); - table.getMetaClient().reloadActiveTimeline(); - env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime)) + env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime)) .name("compaction_source") .uid("uid_compaction_source") .rebalance() diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java index f57571377..b600a5d2f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java @@ -19,6 +19,7 @@ package org.apache.hudi.sink.transform; import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.configuration.FlinkOptions; @@ -34,7 +35,6 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.types.RowKind; import java.io.IOException; @@ -108,9 +108,9 @@ public class RowDataToHoodieFunction private HoodieRecord toHoodieRecord(I record) throws Exception { GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record); final HoodieKey hoodieKey = keyGenerator.getKey(gr); - // nullify the payload insert data to mark the record as a DELETE - final boolean isDelete = record.getRowKind() == RowKind.DELETE; - HoodieRecordPayload payload = payloadCreation.createPayload(gr, isDelete); - return new HoodieRecord<>(hoodieKey, payload); + + HoodieRecordPayload payload = payloadCreation.createPayload(gr); + HoodieOperation operation = HoodieOperation.fromValue(record.getRowKind().toByteValue()); + return new HoodieRecord<>(hoodieKey, payload, operation); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java index 12eb03989..192feb5ec 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java @@ -86,6 +86,7 @@ public class HiveSyncContext { hiveSyncConfig.decodePartition = conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING); 
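A note on the RowKind-to-operation conversion in RowDataToHoodieFunction above: it relies on Flink's RowKind byte values lining up one-to-one with Hudi's operation codes. A minimal, self-contained sketch that prints the correspondence (the class name is illustrative; only flink-core is required):

import org.apache.flink.types.RowKind;

// Prints the byte values consumed by HoodieOperation.fromValue(...):
// INSERT=0, UPDATE_BEFORE=1, UPDATE_AFTER=2, DELETE=3.
public class RowKindByteValues {
  public static void main(String[] args) {
    for (RowKind kind : RowKind.values()) {
      System.out.println(kind.name() + " (" + kind.shortString() + ") -> " + kind.toByteValue());
    }
  }
}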
hiveSyncConfig.skipROSuffix = conf.getBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX); hiveSyncConfig.assumeDatePartitioning = conf.getBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION); + hiveSyncConfig.withOperationField = conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED); return hiveSyncConfig; } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java index 831da25e5..20070b3eb 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java @@ -71,13 +71,12 @@ public class PayloadCreation implements Serializable { return new PayloadCreation(shouldCombine, constructor, preCombineField); } - public HoodieRecordPayload createPayload(GenericRecord record, boolean isDelete) throws Exception { + public HoodieRecordPayload createPayload(GenericRecord record) throws Exception { if (shouldCombine) { ValidationUtils.checkState(preCombineField != null); Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(record, preCombineField, false); - return (HoodieRecordPayload) constructor.newInstance( - isDelete ? null : record, orderingVal); + return (HoodieRecordPayload) constructor.newInstance(record, orderingVal); } else { return (HoodieRecordPayload) this.constructor.newInstance(Option.of(record)); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index 49cd9e7b6..8a16c6d4f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -34,6 +34,7 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent; import org.apache.hudi.sink.compact.CompactionPlanOperator; import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.partitioner.BucketAssignOperator; +import org.apache.hudi.util.ChangelogModes; import org.apache.hudi.sink.transform.RowDataToHoodieFunctions; import org.apache.hudi.table.format.FilePathUtils; import org.apache.hudi.util.StreamerUtil; @@ -52,7 +53,6 @@ import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode$; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.types.RowKind; import java.util.Map; @@ -185,12 +185,11 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, @Override public ChangelogMode getChangelogMode(ChangelogMode changelogMode) { - // ignore RowKind.UPDATE_BEFORE - return ChangelogMode.newBuilder() - .addContainedKind(RowKind.DELETE) - .addContainedKind(RowKind.INSERT) - .addContainedKind(RowKind.UPDATE_AFTER) - .build(); + if (conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) { + return ChangelogModes.FULL; + } else { + return ChangelogModes.UPSERT; + } } @Override diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index af2327df5..51d3c8dcd 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -37,6 +37,7 @@ import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; import 
org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.table.format.mor.MergeOnReadTableState;
 import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.ChangelogModes;
 import org.apache.hudi.util.StreamerUtil;

 import org.apache.avro.Schema;
@@ -196,7 +197,12 @@ public class HoodieTableSource implements
   @Override
   public ChangelogMode getChangelogMode() {
-    return ChangelogMode.insertOnly();
+    return conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
+        && !conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)
+        ? ChangelogModes.FULL
+        // when all the changes are persisted or read as batch,
+        // use INSERT mode.
+        : ChangelogMode.insertOnly();
   }

   @Override
@@ -309,7 +315,7 @@ public class HoodieTableSource implements
       return new CollectionInputFormat<>(Collections.emptyList(), null);
     }

-    TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
+    TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
     final Schema tableAvroSchema;
     try {
       tableAvroSchema = schemaUtil.getTableAvroSchema();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index cbf1ea700..14f7eb3e2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -19,19 +19,32 @@ package org.apache.hudi.table.format;

 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
+import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;

 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -45,6 +58,39 @@ public class FormatUtils {
   private FormatUtils() {
   }

+  /**
+   * Sets the row kind of the row data {@code rowData} based on the resolved operation.
+   */
+  public static void setRowKind(RowData rowData, IndexedRecord record, int index) {
+    if (index == -1) {
+      return;
+    }
+    rowData.setRowKind(getRowKind(record, index));
+  }
+
+  /**
+   * Returns the RowKind of the given record, never null.
+   * Returns RowKind.INSERT when the given field value is not found.
+ */ + private static RowKind getRowKind(IndexedRecord record, int index) { + Object val = record.get(index); + if (val == null) { + return RowKind.INSERT; + } + final HoodieOperation operation = HoodieOperation.fromName(val.toString()); + if (HoodieOperation.isInsert(operation)) { + return RowKind.INSERT; + } else if (HoodieOperation.isUpdateBefore(operation)) { + return RowKind.UPDATE_BEFORE; + } else if (HoodieOperation.isUpdateAfter(operation)) { + return RowKind.UPDATE_AFTER; + } else if (HoodieOperation.isDelete(operation)) { + return RowKind.DELETE; + } else { + throw new AssertionError(); + } + } + public static GenericRecord buildAvroRecordBySchema( IndexedRecord record, Schema requiredSchema, @@ -57,10 +103,11 @@ public class FormatUtils { return recordBuilder.build(); } - public static HoodieMergedLogRecordScanner scanLog( + public static HoodieMergedLogRecordScanner logScanner( MergeOnReadInputSplit split, Schema logSchema, - Configuration config) { + Configuration config, + boolean withOperationField) { FileSystem fs = FSUtils.getFs(split.getTablePath(), config); return HoodieMergedLogRecordScanner.newBuilder() .withFileSystem(fs) @@ -81,10 +128,88 @@ public class FormatUtils { config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)) .withInstantRange(split.getInstantRange()) + .withOperationField(withOperationField) .build(); } - public static HoodieMergedLogRecordScanner scanLog( + private static HoodieUnMergedLogRecordScanner unMergedLogScanner( + MergeOnReadInputSplit split, + Schema logSchema, + Configuration config, + HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) { + FileSystem fs = FSUtils.getFs(split.getTablePath(), config); + return HoodieUnMergedLogRecordScanner.newBuilder() + .withFileSystem(fs) + .withBasePath(split.getTablePath()) + .withLogFilePaths(split.getLogPaths().get()) + .withReaderSchema(logSchema) + .withLatestInstantTime(split.getLatestCommit()) + .withReadBlocksLazily( + string2Boolean( + config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, + HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))) + .withReverseReader(false) + .withBufferSize( + config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, + HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)) + .withInstantRange(split.getInstantRange()) + .withLogRecordScannerCallback(callback) + .build(); + } + + /** + * Utility to read and buffer the records in the unMerged log record scanner. 
+   */
+  public static class BoundedMemoryRecords {
+    // The unmerged log record scanner
+    private final HoodieUnMergedLogRecordScanner scanner;
+
+    // Executor that runs the above producers in parallel
+    private final BoundedInMemoryExecutor<HoodieRecord<?>, HoodieRecord<?>, ?> executor;
+
+    // Iterator for the buffer consumer
+    private final Iterator<HoodieRecord<?>> iterator;
+
+    public BoundedMemoryRecords(
+        MergeOnReadInputSplit split,
+        Schema logSchema,
+        Configuration hadoopConf) {
+      this.executor = new BoundedInMemoryExecutor<>(
+          HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(new JobConf(hadoopConf)),
+          getParallelProducers(),
+          Option.empty(),
+          x -> x,
+          new DefaultSizeEstimator<>());
+      // Consumer of this record reader
+      this.iterator = this.executor.getQueue().iterator();
+      this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, hadoopConf,
+          record -> executor.getQueue().insertRecord(record));
+      // Start reading and buffering
+      this.executor.startProducers();
+    }
+
+    public Iterator<HoodieRecord<?>> getRecordsIterator() {
+      return this.iterator;
+    }
+
+    /**
+     * Sets up the log scanning as a queue producer; the scanned records are written to the central buffer.
+     */
+    private List<BoundedInMemoryQueueProducer<HoodieRecord<?>>> getParallelProducers() {
+      List<BoundedInMemoryQueueProducer<HoodieRecord<?>>> producers = new ArrayList<>();
+      producers.add(new FunctionBasedQueueProducer<>(buffer -> {
+        scanner.scan();
+        return null;
+      }));
+      return producers;
+    }
+
+    public void close() {
+      this.executor.shutdownNow();
+    }
+  }
+
+  public static HoodieMergedLogRecordScanner logScanner(
       List<String> logPaths,
       Schema logSchema,
       String latestInstantTime,
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 4d68242d6..d0e1c33d1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -18,6 +18,7 @@
 package org.apache.hudi.table.format.mor;

+import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.InstantRange;
@@ -176,7 +177,11 @@ public class MergeOnReadInputFormat
       }
     } else if (!split.getBasePath().isPresent()) {
       // log files only
-      this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
+      if (conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) {
+        this.iterator = new LogFileOnlyIterator(getUnMergedLogFileIterator(split));
+      } else {
+        this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
+      }
     } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
       this.iterator = new SkipMergeIterator(
           getRequiredSchemaReader(split.getBasePath().get()),
@@ -190,6 +195,9 @@ public class MergeOnReadInputFormat
           new Schema.Parser().parse(this.tableState.getAvroSchema()),
           new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
           this.requiredPos,
+          this.emitDelete,
+          this.conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED),
+          this.tableState.getOperationPos(),
           getFullSchemaReader(split.getBasePath().get()));
     } else {
       throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for "
@@ -298,7 +306,7 @@ public class MergeOnReadInputFormat
     final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
     final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType()); - final HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(split, tableSchema, hadoopConf); + final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, hadoopConf, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)); final Iterator logRecordsKeyIterator = scanner.getRecords().keySet().iterator(); final int[] pkOffset = tableState.getPkOffsetsInRequired(); // flag saying whether the pk semantics has been dropped by user specified @@ -335,18 +343,20 @@ public class MergeOnReadInputFormat } delete.setRowKind(RowKind.DELETE); - this.currentRecord = delete; + this.currentRecord = delete; return true; } // skipping if the condition is unsatisfied // continue; } else { + final IndexedRecord avroRecord = curAvroRecord.get(); GenericRecord requiredAvroRecord = buildAvroRecordBySchema( - curAvroRecord.get(), + avroRecord, requiredSchema, requiredPos, recordBuilder); currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord); + FormatUtils.setRowKind(currentRecord, avroRecord, tableState.getOperationPos()); return true; } } @@ -365,6 +375,55 @@ public class MergeOnReadInputFormat }; } + private ClosableIterator getUnMergedLogFileIterator(MergeOnReadInputSplit split) { + final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema()); + final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema()); + final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema); + final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter = + AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType()); + final FormatUtils.BoundedMemoryRecords records = new FormatUtils.BoundedMemoryRecords(split, tableSchema, hadoopConf); + final Iterator> recordsIterator = records.getRecordsIterator(); + + return new ClosableIterator() { + private RowData currentRecord; + + @Override + public boolean hasNext() { + while (recordsIterator.hasNext()) { + Option curAvroRecord = null; + final HoodieRecord hoodieRecord = recordsIterator.next(); + try { + curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema); + } catch (IOException e) { + throw new HoodieException("Get avro insert value error for key: " + hoodieRecord.getRecordKey(), e); + } + if (curAvroRecord.isPresent()) { + final IndexedRecord avroRecord = curAvroRecord.get(); + GenericRecord requiredAvroRecord = buildAvroRecordBySchema( + avroRecord, + requiredSchema, + requiredPos, + recordBuilder); + currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord); + FormatUtils.setRowKind(currentRecord, avroRecord, tableState.getOperationPos()); + return true; + } + } + return false; + } + + @Override + public RowData next() { + return currentRecord; + } + + @Override + public void close() { + records.close(); + } + }; + } + // ------------------------------------------------------------------------- // Inner Class // ------------------------------------------------------------------------- @@ -544,6 +603,8 @@ public class MergeOnReadInputFormat private final Schema tableSchema; private final Schema requiredSchema; private final int[] requiredPos; + private final boolean emitDelete; + private final int operationPos; private final RowDataToAvroConverters.RowDataToAvroConverter rowDataToAvroConverter; private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter; private final GenericRecordBuilder recordBuilder; 
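Both log-file iterators above defer the change-flag resolution to FormatUtils.setRowKind. A stripped-down sketch of that resolution, assuming the operation field stores HoodieOperation's short names ('I', '-U', 'U', 'D'):

import org.apache.flink.types.RowKind;

// Sketch only: resolve a persisted '_hoodie_operation' value back to a RowKind.
// A missing value falls back to INSERT, mirroring FormatUtils#getRowKind.
public final class OperationResolver {
  private OperationResolver() {
  }

  public static RowKind resolve(Object operationField) {
    if (operationField == null) {
      return RowKind.INSERT; // data written without the operation field
    }
    switch (operationField.toString()) {
      case "I": return RowKind.INSERT;
      case "-U": return RowKind.UPDATE_BEFORE;
      case "U": return RowKind.UPDATE_AFTER;
      case "D": return RowKind.DELETE;
      default: throw new AssertionError("unknown operation: " + operationField);
    }
  }
}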
@@ -557,7 +618,7 @@ public class MergeOnReadInputFormat // refactor it out once FLINK-22370 is resolved. private boolean readLogs = false; - private Set keyToSkip = new HashSet<>(); + private final Set keyToSkip = new HashSet<>(); private RowData currentRecord; @@ -569,13 +630,18 @@ public class MergeOnReadInputFormat Schema tableSchema, Schema requiredSchema, int[] requiredPos, + boolean emitDelete, + boolean withOperationField, + int operationPos, ParquetColumnarRowSplitReader reader) { // the reader should be with full schema this.tableSchema = tableSchema; this.reader = reader; - this.scanner = FormatUtils.scanLog(split, tableSchema, hadoopConf); + this.scanner = FormatUtils.logScanner(split, tableSchema, hadoopConf, withOperationField); this.logKeysIterator = scanner.getRecords().keySet().iterator(); this.requiredSchema = requiredSchema; this.requiredPos = requiredPos; + this.emitDelete = emitDelete; + this.operationPos = operationPos; this.recordBuilder = new GenericRecordBuilder(requiredSchema); this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType); this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType); @@ -602,12 +668,13 @@ public class MergeOnReadInputFormat // deleted continue; } else { - GenericRecord record = buildAvroRecordBySchema( + GenericRecord avroRecord = buildAvroRecordBySchema( mergedAvroRecord.get(), requiredSchema, requiredPos, recordBuilder); - this.currentRecord = (RowData) avroToRowDataConverter.convert(record); + this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord); + FormatUtils.setRowKind(this.currentRecord, mergedAvroRecord.get(), this.operationPos); return false; } } @@ -620,16 +687,16 @@ public class MergeOnReadInputFormat while (logKeysIterator.hasNext()) { final String curKey = logKeysIterator.next(); if (!keyToSkip.contains(curKey)) { - Option insertAvroRecord = - scanner.getRecords().get(curKey).getData().getInsertValue(tableSchema); + Option insertAvroRecord = getInsertValue(curKey); if (insertAvroRecord.isPresent()) { // the record is a DELETE if insertAvroRecord not present, skipping - GenericRecord requiredAvroRecord = buildAvroRecordBySchema( + GenericRecord avroRecord = buildAvroRecordBySchema( insertAvroRecord.get(), requiredSchema, requiredPos, recordBuilder); - this.currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord); + this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord); + FormatUtils.setRowKind(this.currentRecord, insertAvroRecord.get(), this.operationPos); return false; } } @@ -637,6 +704,14 @@ public class MergeOnReadInputFormat return true; } + private Option getInsertValue(String curKey) throws IOException { + final HoodieRecord record = scanner.getRecords().get(curKey); + if (!emitDelete && HoodieOperation.isDelete(record.getOperation())) { + return Option.empty(); + } + return record.getData().getInsertValue(tableSchema); + } + @Override public RowData nextRecord() { return currentRecord; @@ -655,8 +730,12 @@ public class MergeOnReadInputFormat private Option mergeRowWithLog( RowData curRow, String curKey) throws IOException { + final HoodieRecord record = scanner.getRecords().get(curKey); + if (!emitDelete && HoodieOperation.isDelete(record.getOperation())) { + return Option.empty(); + } GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow); - return scanner.getRecords().get(curKey).getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema); + return 
record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
     }
   }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
index 9a32af636..0a63c9125 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
@@ -18,6 +18,8 @@
 package org.apache.hudi.table.format.mor;

+import org.apache.hudi.common.model.HoodieRecord;
+
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;

@@ -38,6 +40,7 @@ public class MergeOnReadTableState implements Serializable {
   private final String requiredAvroSchema;
   private final List<MergeOnReadInputSplit> inputSplits;
   private final String[] pkFields;
+  private final int operationPos;

   public MergeOnReadTableState(
       RowType rowType,
@@ -52,6 +55,7 @@ public class MergeOnReadTableState implements Serializable {
     this.requiredAvroSchema = requiredAvroSchema;
     this.inputSplits = inputSplits;
     this.pkFields = pkFields;
+    this.operationPos = rowType.getFieldIndex(HoodieRecord.OPERATION_METADATA_FIELD);
   }

   public RowType getRowType() {
@@ -74,6 +78,10 @@ public class MergeOnReadTableState implements Serializable {
     return inputSplits;
   }

+  public int getOperationPos() {
+    return operationPos;
+  }
+
   public int[] getRequiredPositions() {
     final List<String> fieldNames = rowType.getFieldNames();
     return requiredRowType.getFieldNames().stream()
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/ChangelogModes.java b/hudi-flink/src/main/java/org/apache/hudi/util/ChangelogModes.java
new file mode 100644
index 000000000..164815b4a
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/ChangelogModes.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Utilities for all kinds of common {@link org.apache.flink.table.connector.ChangelogMode}s.
+ */
+public class ChangelogModes {
+  public static final ChangelogMode FULL = ChangelogMode.newBuilder()
+      .addContainedKind(RowKind.INSERT)
+      .addContainedKind(RowKind.UPDATE_BEFORE)
+      .addContainedKind(RowKind.UPDATE_AFTER)
+      .addContainedKind(RowKind.DELETE)
+      .build();
+
+  /**
+   * Changelog mode that ignores UPDATE_BEFORE, i.e. UPSERT.
+ */ + public static final ChangelogMode UPSERT = ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + + private ChangelogModes() { + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 86de90e5b..06305e2dc 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -186,6 +186,7 @@ public class StreamerUtil { .build()) .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton .withAutoCommit(false) + .withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf))); builder = builder.withSchema(getSourceSchema(conf).toString()); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java index 6d802fefd..ade43c571 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java @@ -38,9 +38,9 @@ import org.apache.hudi.sink.compact.CompactionPlanSourceFunction; import org.apache.hudi.sink.compact.FlinkCompactionConfig; import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.partitioner.BucketAssignOperator; +import org.apache.hudi.sink.transform.ChainedTransformer; import org.apache.hudi.sink.transform.RowDataToHoodieFunction; import org.apache.hudi.sink.transform.Transformer; -import org.apache.hudi.sink.transform.ChainedTransformer; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.CompactionUtil; @@ -85,6 +85,8 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; +import static org.junit.jupiter.api.Assertions.assertTrue; + /** * Integration test for Flink Hoodie stream sink. */ @@ -200,7 +202,9 @@ public class StreamWriteITCase extends TestLogger { // To compute the compaction instant time and do compaction. 
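As a side note on the ChangelogModes utility above: the sink-side selection between the two constants reduces to a single switch on the option, as in this sketch (same logic as HoodieTableSink#getChangelogMode earlier in this patch):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.connector.ChangelogMode;

import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.ChangelogModes;

// Sketch: full changelog only when the option is on, otherwise UPSERT semantics.
public class SinkChangelogModeChooser {
  public static ChangelogMode select(Configuration conf) {
    return conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)
        ? ChangelogModes.FULL
        : ChangelogModes.UPSERT;
  }
}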
String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient); HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null); - writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty()); + boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty()); + + assertTrue(scheduled, "The compaction plan should be scheduled"); HoodieFlinkTable table = writeClient.getHoodieTable(); // generate compaction plan @@ -209,8 +213,10 @@ public class StreamWriteITCase extends TestLogger { table.getMetaClient(), compactionInstantTime); HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); + // Mark instant as compaction inflight + table.getActiveTimeline().transitionCompactionRequestedToInflight(instant); - env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime)) + env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime)) .name("compaction_source") .uid("uid_compaction_source") .rebalance() diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index d5639c580..8e7f8099b 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -399,33 +399,29 @@ public class TestWriteCopyOnWrite { // the coordinator checkpoint commits the inflight instant. checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); - Map expected = new HashMap<>(); - // id3, id5 were deleted and id9 is ignored - expected.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]"); - expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]"); - expected.put("par3", "[id6,par3,id6,Emma,20,6,par3]"); - expected.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]"); + Map expected = getUpsertWithDeleteExpected(); checkWrittenData(tempFile, expected); } @Test public void testInsertWithMiniBatches() throws Exception { // reset the config option - conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size + conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch size funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); // open the function and ingest data funcWrapper.openFunction(); - // Each record is 208 bytes. so 4 records expect to trigger a mini-batch write + // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes. 
+    // so 3 records are expected to trigger a mini-batch write
     for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
       funcWrapper.invoke(rowData);
     }

     Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
     assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
-    assertThat("2 records expect to flush out as a mini-batch",
+    assertThat("3 records expect to flush out as a mini-batch",
         dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
-        is(2));
+        is(3));

     // this triggers the data write and event send
     funcWrapper.checkpointFunction(1);
@@ -472,22 +468,23 @@ public class TestWriteCopyOnWrite {
   @Test
   public void testInsertWithDeduplication() throws Exception {
     // reset the config option
-    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
+    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch size
     conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
     funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);

     // open the function and ingest data
     funcWrapper.openFunction();
-    // Each record is 208 bytes. so 4 records expect to trigger a mini-batch write
+    // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
+    // so 3 records are expected to trigger a mini-batch write
     for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
       funcWrapper.invoke(rowData);
     }

     Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
     assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
-    assertThat("2 records expect to flush out as a mini-batch",
+    assertThat("3 records expect to flush out as a mini-batch",
         dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
-        is(2));
+        is(3));

     // this triggers the data write and event send
     funcWrapper.checkpointFunction(1);
@@ -612,12 +609,13 @@ public class TestWriteCopyOnWrite {
   @Test
   public void testInsertWithSmallBufferSize() throws Exception {
     // reset the config option
-    conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
+    conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008); // 839 bytes buffer size
     funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);

     // open the function and ingest data
     funcWrapper.openFunction();
-    // each record is 208 bytes. so 4 records expect to trigger buffer flush:
+    // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
+    // so 3 records are expected to trigger the buffer flush:
     // flush the max size bucket once at a time.
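The adjusted thresholds in these tests follow from simple arithmetic: 0.0008 MB is 838 bytes (the comments round to 839), which holds two 304-byte 'I' records, so the third record crosses the limit and triggers the flush. A quick check of those numbers:

// Back-of-the-envelope check for the WRITE_BATCH_SIZE value used in the tests.
public class BatchSizeCheck {
  public static void main(String[] args) {
    double batchSizeMb = 0.0008;                          // value set on FlinkOptions.WRITE_BATCH_SIZE
    long batchBytes = (long) (batchSizeMb * 1024 * 1024); // 838 bytes
    long insertRecordBytes = 304;                         // record with operation 'I', per the comments above
    // two records fit (608 bytes); the third one overflows the batch
    System.out.println("batch bytes = " + batchBytes);
    System.out.println("records to trigger a flush = " + (batchBytes / insertRecordBytes + 1));
  }
}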
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { funcWrapper.invoke(rowData); @@ -625,9 +623,9 @@ public class TestWriteCopyOnWrite { Map> dataBuffer = funcWrapper.getDataBuffer(); assertThat("Should have 1 data bucket", dataBuffer.size(), is(1)); - assertThat("2 records expect to flush out as a mini-batch", + assertThat("3 records expect to flush out as a mini-batch", dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), - is(2)); + is(3)); // this triggers the data write and event send funcWrapper.checkpointFunction(1); @@ -676,8 +674,17 @@ public class TestWriteCopyOnWrite { // the last 2 lines are merged expected.put("par1", "[" + "id1,par1,id1,Danny,23,1,par1, " - + "id1,par1,id1,Danny,23,1,par1, " - + "id1,par1,id1,Danny,23,1,par1]"); + + "id1,par1,id1,Danny,23,1,par1" + "]"); + return expected; + } + + protected Map getUpsertWithDeleteExpected() { + Map expected = new HashMap<>(); + // id3, id5 were deleted and id9 is ignored + expected.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]"); + expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]"); + expected.put("par3", "[id6,par3,id6,Emma,20,6,par3]"); + expected.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]"); return expected; } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java b/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java index 0d445d60d..32ee72541 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java @@ -47,7 +47,7 @@ public class TestRowDataKeyGen { assertThat(keyGen1.getPartitionPath(rowData1), is("par1")); // null record key and partition path - final RowData rowData2 = insertRow(null, StringData.fromString("Danny"), 23, + final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE, null, StringData.fromString("Danny"), 23, TimestampData.fromEpochMillis(1), null); assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2)); assertThat(keyGen1.getPartitionPath(rowData2), is("default")); @@ -77,7 +77,7 @@ public class TestRowDataKeyGen { assertThat(keyGen1.getPartitionPath(rowData1), is("par1/1970-01-01T00:00:00.001")); // null record key and partition path - final RowData rowData2 = insertRow(null, null, 23, null, null); + final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE,null, null, 23, null, null); assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2)); assertThat(keyGen1.getPartitionPath(rowData2), is("default/default")); // empty record key and partition path diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 957b9b891..f8dc01825 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -261,6 +261,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); conf.setString(FlinkOptions.TABLE_NAME, "t1"); conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + conf.setBoolean(FlinkOptions.CHANGELOG_ENABLED, true); // write one commit TestData.writeData(TestData.DATA_SET_INSERT, conf); @@ -276,17 +277,20 @@ public class HoodieDataSourceITCase extends AbstractTestBase { 
options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); options.put(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.key(), "2"); options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), latestCommit); + options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true"); String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); streamTableEnv.executeSql(hoodieTableDDL); - List result = execSelectSql(streamTableEnv, "select * from t1", 10); - final String expected = "[" - + "id1,Danny,24,1970-01-01T00:00:00.001,par1, " - + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, " - + "id3,null,null,null,null, " - + "id5,null,null,null,null, " - + "id9,null,null,null,null]"; - assertRowsEquals(result, expected); + final String sinkDDL = "create table sink(\n" + + " name varchar(20),\n" + + " age_sum int\n" + + ") with (\n" + + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'" + + ")"; + List result = execSelectSql(streamTableEnv, + "select name, sum(age) from t1 group by name", sinkDDL, 10); + final String expected = "[+I(Danny,24), +I(Stephen,34)]"; + assertRowsEquals(result, expected, true); } @ParameterizedTest @@ -724,6 +728,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase { } else { sinkDDL = TestConfigurations.getCollectSinkDDL("sink"); } + return execSelectSql(tEnv, select, sinkDDL, timeout); + } + + private List execSelectSql(TableEnvironment tEnv, String select, String sinkDDL, long timeout) + throws InterruptedException { tEnv.executeSql(sinkDDL); TableResult tableResult = tEnv.executeSql("insert into sink " + select); // wait for the timeout then cancels the job @@ -731,7 +740,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { tableResult.getJobClient().ifPresent(JobClient::cancel); tEnv.executeSql("DROP TABLE IF EXISTS sink"); return CollectSinkTableFactory.RESULT.values().stream() - .flatMap(Collection::stream) - .collect(Collectors.toList()); + .flatMap(Collection::stream) + .collect(Collectors.toList()); } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index c8885b043..55359451e 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -21,6 +21,7 @@ package org.apache.hudi.table.format; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.table.HoodieTableSource; +import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat; import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; @@ -60,9 +61,14 @@ public class TestInputFormat { File tempFile; void beforeEach(HoodieTableType tableType) throws IOException { + beforeEach(tableType, Collections.emptyMap()); + } + + void beforeEach(HoodieTableType tableType, Map options) throws IOException { conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); conf.setString(FlinkOptions.TABLE_TYPE, tableType.name()); conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // close the async compaction + options.forEach((key, value) -> conf.setString(key, value)); StreamerUtil.initTableIfNotExists(conf); this.tableSource = new HoodieTableSource( @@ -163,8 +169,62 @@ public class TestInputFormat { } @Test - void testReadWithDeletes() throws Exception { - 
beforeEach(HoodieTableType.MERGE_ON_READ); + void testReadBaseAndLogFilesWithDeletes() throws Exception { + Map options = new HashMap<>(); + options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true"); + beforeEach(HoodieTableType.MERGE_ON_READ, options); + + // write base first with compaction. + conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true); + conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1); + TestData.writeData(TestData.DATA_SET_INSERT, conf); + + // write another commit using logs and read again. + conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); + TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf); + + InputFormat inputFormat = this.tableSource.getInputFormat(); + assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class)); + + // when isEmitDelete is false. + List result1 = readData(inputFormat); + + final String actual1 = TestData.rowDataToString(result1, true); + final String expected1 = "[" + + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), " + + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1), " + + "+I(id4,Fabian,31,1970-01-01T00:00:00.004,par2), " + + "+I(id6,Emma,20,1970-01-01T00:00:00.006,par3), " + + "+I(id7,Bob,44,1970-01-01T00:00:00.007,par4), " + + "+I(id8,Han,56,1970-01-01T00:00:00.008,par4)]"; + assertThat(actual1, is(expected1)); + + // refresh the input format and set isEmitDelete to true. + this.tableSource.reset(); + inputFormat = this.tableSource.getInputFormat(); + ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true); + + List result2 = readData(inputFormat); + + final String actual2 = TestData.rowDataToString(result2, true); + final String expected2 = "[" + + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), " + + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1), " + + "-D(id3,Julian,53,1970-01-01T00:00:00.003,par2), " + + "+I(id4,Fabian,31,1970-01-01T00:00:00.004,par2), " + + "-D(id5,Sophia,18,1970-01-01T00:00:00.005,par3), " + + "+I(id6,Emma,20,1970-01-01T00:00:00.006,par3), " + + "+I(id7,Bob,44,1970-01-01T00:00:00.007,par4), " + + "+I(id8,Han,56,1970-01-01T00:00:00.008,par4), " + + "-D(id9,Jane,19,1970-01-01T00:00:00.006,par3)]"; + assertThat(actual2, is(expected2)); + } + + @Test + void testReadWithDeletesMOR() throws Exception { + Map options = new HashMap<>(); + options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true"); + beforeEach(HoodieTableType.MERGE_ON_READ, options); // write another commit to read again TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf); @@ -175,13 +235,32 @@ public class TestInputFormat { List result = readData(inputFormat); - final String actual = TestData.rowDataToString(result); + final String actual = TestData.rowDataToString(result, true); final String expected = "[" - + "id1,Danny,24,1970-01-01T00:00:00.001,par1, " - + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, " - + "id3,null,null,null,null, " - + "id5,null,null,null,null, " - + "id9,null,null,null,null]"; + + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), " + + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1), " + + "-D(id3,Julian,53,1970-01-01T00:00:00.003,par2), " + + "-D(id5,Sophia,18,1970-01-01T00:00:00.005,par3), " + + "-D(id9,Jane,19,1970-01-01T00:00:00.006,par3)]"; + assertThat(actual, is(expected)); + } + + @Test + void testReadWithDeletesCOW() throws Exception { + beforeEach(HoodieTableType.COPY_ON_WRITE); + + // write another commit to read again + TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf); + + InputFormat inputFormat = this.tableSource.getInputFormat(); + assertThat(inputFormat, 
instanceOf(CopyOnWriteInputFormat.class)); + + List result = readData(inputFormat); + + final String actual = TestData.rowDataToString(result, true); + final String expected = "[" + + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), " + + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1)]"; assertThat(actual, is(expected)); } @@ -205,6 +284,33 @@ public class TestInputFormat { assertThat(actual, is(expected)); } + @Test + void testReadChangesUnMergedMOR() throws Exception { + Map options = new HashMap<>(); + options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true"); + beforeEach(HoodieTableType.MERGE_ON_READ, options); + + // write another commit to read again + TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, conf); + + InputFormat inputFormat = this.tableSource.getInputFormat(); + assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class)); + + List result = readData(inputFormat); + + final String actual = TestData.rowDataToString(result, true); + final String expected = "[" + + "+I(id1,Danny,19,1970-01-01T00:00:00.001,par1), " + + "-U(id1,Danny,19,1970-01-01T00:00:00.001,par1), " + + "+U(id1,Danny,20,1970-01-01T00:00:00.002,par1), " + + "-U(id1,Danny,20,1970-01-01T00:00:00.002,par1), " + + "+U(id1,Danny,21,1970-01-01T00:00:00.003,par1), " + + "-U(id1,Danny,21,1970-01-01T00:00:00.003,par1), " + + "+U(id1,Danny,22,1970-01-01T00:00:00.004,par1), " + + "-D(id1,Danny,22,1970-01-01T00:00:00.005,par1)]"; + assertThat(actual, is(expected)); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index f5ac9c529..d3e32e699 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -234,15 +234,53 @@ public class TestData { TimestampData.fromEpochMillis(6), StringData.fromString("par3")) ); + public static List DATA_SET_INSERT_UPDATE_DELETE = Arrays.asList( + // INSERT + insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 19, + TimestampData.fromEpochMillis(1), StringData.fromString("par1")), + // UPDATE + updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 19, + TimestampData.fromEpochMillis(1), StringData.fromString("par1")), + updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20, + TimestampData.fromEpochMillis(2), StringData.fromString("par1")), + updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20, + TimestampData.fromEpochMillis(2), StringData.fromString("par1")), + updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21, + TimestampData.fromEpochMillis(3), StringData.fromString("par1")), + updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21, + TimestampData.fromEpochMillis(3), StringData.fromString("par1")), + updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22, + TimestampData.fromEpochMillis(4), StringData.fromString("par1")), + // DELETE + deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22, + TimestampData.fromEpochMillis(5), StringData.fromString("par1")) + ); + /** * Returns string format of a list of RowData. 
*/ public static String rowDataToString(List rows) { + return rowDataToString(rows, false); + } + + /** + * Returns string format of a list of RowData. + * + * @param withChangeFlag whether to print the change flag + */ + public static String rowDataToString(List rows, boolean withChangeFlag) { DataStructureConverter converter = DataStructureConverters.getConverter(TestConfigurations.ROW_DATA_TYPE); return rows.stream() - .map(row -> converter.toExternal(row).toString()) - .sorted(Comparator.naturalOrder()) + .sorted(Comparator.comparing(o -> toStringSafely(o.getString(0)))) + .map(row -> { + final String rowStr = converter.toExternal(row).toString(); + if (withChangeFlag) { + return row.getRowKind().shortString() + "(" + rowStr + ")"; + } else { + return rowStr; + } + }) .collect(Collectors.toList()).toString(); } @@ -287,7 +325,30 @@ public class TestData { * @param expected Expected string of the sorted rows */ public static void assertRowsEquals(List rows, String expected) { - assertRowsEquals(rows, expected, 0); + assertRowsEquals(rows, expected, false); + } + + /** + * Sort the {@code rows} using field at index 0 and asserts + * it equals with the expected string {@code expected}. + * + * @param rows Actual result rows + * @param expected Expected string of the sorted rows + * @param withChangeFlag Whether compares with change flags + */ + public static void assertRowsEquals(List rows, String expected, boolean withChangeFlag) { + String rowsString = rows.stream() + .sorted(Comparator.comparing(o -> toStringSafely(o.getField(0)))) + .map(row -> { + final String rowStr = row.toString(); + if (withChangeFlag) { + return row.getKind().shortString() + "(" + rowStr + ")"; + } else { + return rowStr; + } + }) + .collect(Collectors.toList()).toString(); + assertThat(rowsString, is(expected)); } /** @@ -573,7 +634,11 @@ public class TestData { } public static BinaryRowData insertRow(Object... fields) { - LogicalType[] types = TestConfigurations.ROW_TYPE.getFields().stream().map(RowType.RowField::getType) + return insertRow(TestConfigurations.ROW_TYPE, fields); + } + + public static BinaryRowData insertRow(RowType rowType, Object... fields) { + LogicalType[] types = rowType.getFields().stream().map(RowType.RowField::getType) .toArray(LogicalType[]::new); assertEquals( "Filed count inconsistent with type information", @@ -599,4 +664,16 @@ public class TestData { rowData.setRowKind(RowKind.DELETE); return rowData; } + + private static BinaryRowData updateBeforeRow(Object... fields) { + BinaryRowData rowData = insertRow(fields); + rowData.setRowKind(RowKind.UPDATE_BEFORE); + return rowData; + } + + private static BinaryRowData updateAfterRow(Object... fields) { + BinaryRowData rowData = insertRow(fields); + rowData.setRowKind(RowKind.UPDATE_AFTER); + return rowData; + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestHoodieRowData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestHoodieRowData.java new file mode 100644 index 000000000..772904283 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestHoodieRowData.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utils; + +import org.apache.hudi.client.model.HoodieRowData; +import org.apache.hudi.common.model.HoodieRecord; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.util.Random; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Unit tests {@link HoodieRowData}. + */ +public class TestHoodieRowData { + private final int metaColumnsNum = HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.size(); + private static final Random RANDOM = new Random(); + private static final int INTEGER_INDEX = 0; + private static final int STRING_INDEX = 1; + private static final int BOOLEAN_INDEX = 2; + private static final int SHORT_INDEX = 3; + private static final int BYTE_INDEX = 4; + private static final int LONG_INDEX = 5; + private static final int FLOAT_INDEX = 6; + private static final int DOUBLE_INDEX = 7; + private static final int DECIMAL_INDEX = 8; + private static final int BINARY_INDEX = 9; + private static final int ROW_INDEX = 10; + + private static final DataType BASIC_DATA_TYPE = DataTypes.ROW( + DataTypes.FIELD("integer", DataTypes.INT()), + DataTypes.FIELD("string", DataTypes.STRING()), + DataTypes.FIELD("boolean", DataTypes.BOOLEAN()), + DataTypes.FIELD("short", DataTypes.SMALLINT()), + DataTypes.FIELD("byte", DataTypes.TINYINT()), + DataTypes.FIELD("long", DataTypes.BIGINT()), + DataTypes.FIELD("float", DataTypes.FLOAT()), + DataTypes.FIELD("double", DataTypes.DOUBLE()), + DataTypes.FIELD("decimal", DataTypes.DECIMAL(10, 4)), + DataTypes.FIELD("binary", DataTypes.BYTES()), + DataTypes.FIELD("row", DataTypes.ROW())) + .notNull(); + private static final RowType ROW_TYPE = (RowType) BASIC_DATA_TYPE.getLogicalType(); + + @Test + public void testGet() { + Object[] values = getRandomValue(true); + RowData rowData = TestData.insertRow(ROW_TYPE, values); + + HoodieRowData hoodieRowData = new HoodieRowData("commitTime", "commitSeqNo", "recordKey", "partitionPath", "fileName", + rowData, true); + assertValues(hoodieRowData, "commitTime", "commitSeqNo", "recordKey", "partitionPath", + "fileName", values); + } + + /** + * Fetches a random Object[] of values for testing. 
+   *
+   * @param haveRowType true if rowType needs to be added as one of the elements in the Object[]
+   * @return the random Object[] thus generated
+   */
+  private Object[] getRandomValue(boolean haveRowType) {
+    Object[] values = new Object[11];
+    values[INTEGER_INDEX] = RANDOM.nextInt();
+    values[STRING_INDEX] = StringData.fromString(UUID.randomUUID().toString());
+    values[BOOLEAN_INDEX] = RANDOM.nextBoolean();
+    values[SHORT_INDEX] = (short) RANDOM.nextInt(2);
+    byte[] bytes = new byte[1];
+    RANDOM.nextBytes(bytes);
+    values[BYTE_INDEX] = bytes[0];
+    values[LONG_INDEX] = RANDOM.nextLong();
+    values[FLOAT_INDEX] = RANDOM.nextFloat();
+    values[DOUBLE_INDEX] = RANDOM.nextDouble();
+    values[DECIMAL_INDEX] = DecimalData.fromBigDecimal(new BigDecimal("1005.12313"), 10, 4);
+    bytes = new byte[20];
+    RANDOM.nextBytes(bytes);
+    values[BINARY_INDEX] = bytes;
+    if (haveRowType) {
+      Object[] rowField = getRandomValue(false);
+      values[ROW_INDEX] = TestData.insertRow(ROW_TYPE, rowField);
+    }
+    return values;
+  }
+
+  private void assertValues(HoodieRowData hoodieRowData, String commitTime, String commitSeqNo, String recordKey, String partitionPath,
+                            String filename, Object[] values) {
+    assertEquals(commitTime, hoodieRowData.getString(0).toString());
+    assertEquals(commitSeqNo, hoodieRowData.getString(1).toString());
+    assertEquals(recordKey, hoodieRowData.getString(2).toString());
+    assertEquals(partitionPath, hoodieRowData.getString(3).toString());
+    assertEquals(filename, hoodieRowData.getString(4).toString());
+    assertEquals("I", hoodieRowData.getString(5).toString());
+    // row data.
+    assertEquals(values[INTEGER_INDEX], hoodieRowData.getInt(INTEGER_INDEX + metaColumnsNum));
+    assertEquals(values[STRING_INDEX], hoodieRowData.getString(STRING_INDEX + metaColumnsNum));
+    assertEquals(values[BOOLEAN_INDEX], hoodieRowData.getBoolean(BOOLEAN_INDEX + metaColumnsNum));
+    assertEquals(values[SHORT_INDEX], hoodieRowData.getShort(SHORT_INDEX + metaColumnsNum));
+    assertEquals(values[BYTE_INDEX], hoodieRowData.getByte(BYTE_INDEX + metaColumnsNum));
+    assertEquals(values[LONG_INDEX], hoodieRowData.getLong(LONG_INDEX + metaColumnsNum));
+    assertEquals(values[FLOAT_INDEX], hoodieRowData.getFloat(FLOAT_INDEX + metaColumnsNum));
+    assertEquals(values[DOUBLE_INDEX], hoodieRowData.getDouble(DOUBLE_INDEX + metaColumnsNum));
+    assertEquals(values[DECIMAL_INDEX], hoodieRowData.getDecimal(DECIMAL_INDEX + metaColumnsNum, 10, 4));
+    byte[] expectedBinary = (byte[]) values[BINARY_INDEX];
+    byte[] binary = hoodieRowData.getBinary(BINARY_INDEX + metaColumnsNum);
+    for (int i = 0; i < expectedBinary.length; i++) {
+      assertEquals(expectedBinary[i], binary[i]);
+    }
+    assertEquals(values[ROW_INDEX], hoodieRowData.getRow(ROW_INDEX + metaColumnsNum, values.length));
+  }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
index 6dc9325fb..33e9d3765 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
@@ -150,6 +150,7 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
     public void invoke(RowData value, SinkFunction.Context context) {
       Row row = (Row) converter.toExternal(value);
       assert row != null;
+      row.setKind(value.getRowKind());
       RESULT.get(taskID).add(row);
     }
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
index cd84c3b76..6af011945 100644
--- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
+++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
@@ -71,7 +71,7 @@ public class HoodieDLAClient extends AbstractSyncHoodieClient {
 
   public HoodieDLAClient(DLASyncConfig syncConfig, FileSystem fs) {
     super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata,
-        syncConfig.verifyMetadataFileListing, fs);
+        syncConfig.verifyMetadataFileListing, false, fs);
     this.dlaConfig = syncConfig;
     try {
       this.partitionValueExtractor =
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 560563cb7..d9d833d39 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -120,6 +120,9 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore.")
   public int sparkSchemaLengthThreshold = 4000;
 
+  @Parameter(names = {"--with-operation-field"}, description = "Whether to include the '_hoodie_operation' field in the metadata fields")
+  public Boolean withOperationField = false;
+
   // enhance the similar function in child class
   public static HiveSyncConfig copy(HiveSyncConfig cfg) {
     HiveSyncConfig newConfig = new HiveSyncConfig();
@@ -143,6 +146,7 @@ public class HiveSyncConfig implements Serializable {
     newConfig.batchSyncNum = cfg.batchSyncNum;
     newConfig.syncAsSparkDataSourceTable = cfg.syncAsSparkDataSourceTable;
     newConfig.sparkSchemaLengthThreshold = cfg.sparkSchemaLengthThreshold;
+    newConfig.withOperationField = cfg.withOperationField;
     return newConfig;
   }
 
@@ -174,6 +178,7 @@ public class HiveSyncConfig implements Serializable {
         + ", createManagedTable=" + createManagedTable
         + ", syncAsSparkDataSourceTable=" + syncAsSparkDataSourceTable
         + ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold
+        + ", withOperationField=" + withOperationField
         + '}';
   }
 }
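Both new pieces of HiveSyncConfig above — the --with-operation-field parameter and its propagation through copy() — can be exercised directly. A short sketch, assuming the config is built programmatically rather than parsed by JCommander; the base path value is purely illustrative:

    import org.apache.hudi.hive.HiveSyncConfig;

    public class WithOperationFieldSketch {
      public static void main(String[] args) {
        HiveSyncConfig cfg = new HiveSyncConfig();
        cfg.basePath = "/tmp/hoodie_table";  // illustrative path
        cfg.withOperationField = true;       // expose '_hoodie_operation' to hive sync
        HiveSyncConfig copied = HiveSyncConfig.copy(cfg);
        System.out.println(copied.withOperationField); // true: copy() carries the flag over, per the hunk above
      }
    }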
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 4a22bb883..13e48f512 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -62,7 +62,7 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
   private final HiveSyncConfig syncConfig;
 
   public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
-    super(cfg.basePath, cfg.assumeDatePartitioning, cfg.useFileListingFromMetadata, cfg.verifyMetadataFileListing, fs);
+    super(cfg.basePath, cfg.assumeDatePartitioning, cfg.useFileListingFromMetadata, cfg.verifyMetadataFileListing, cfg.withOperationField, fs);
     this.syncConfig = cfg;
     // Support JDBC, HiveQL and metastore based implementations for backwards compatibility. Future users should
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
index 1107d744e..11ff74528 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
@@ -51,19 +51,21 @@ public abstract class AbstractSyncHoodieClient {
   protected final HoodieTableMetaClient metaClient;
   protected final HoodieTableType tableType;
   protected final FileSystem fs;
-  private String basePath;
-  private boolean assumeDatePartitioning;
-  private boolean useFileListingFromMetadata;
-  private boolean verifyMetadataFileListing;
+  private final String basePath;
+  private final boolean assumeDatePartitioning;
+  private final boolean useFileListingFromMetadata;
+  private final boolean verifyMetadataFileListing;
+  private final boolean withOperationField;
 
   public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
-                                  boolean verifyMetadataFileListing, FileSystem fs) {
+                                  boolean verifyMetadataFileListing, boolean withOperationField, FileSystem fs) {
     this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     this.tableType = metaClient.getTableType();
     this.basePath = basePath;
     this.assumeDatePartitioning = assumeDatePartitioning;
     this.useFileListingFromMetadata = useFileListingFromMetadata;
     this.verifyMetadataFileListing = verifyMetadataFileListing;
+    this.withOperationField = withOperationField;
     this.fs = fs;
   }
 
@@ -139,7 +141,11 @@ public abstract class AbstractSyncHoodieClient {
    */
   public MessageType getDataSchema() {
     try {
-      return new TableSchemaResolver(metaClient).getTableParquetSchema();
+      if (withOperationField) {
+        return new TableSchemaResolver(metaClient, true).getTableParquetSchema();
+      } else {
+        return new TableSchemaResolver(metaClient).getTableParquetSchema();
+      }
     } catch (Exception e) {
       throw new HoodieSyncException("Failed to read data schema", e);
     }
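The branch added to getDataSchema() above is the consumer of the flag: with the operation field enabled, the two-argument TableSchemaResolver constructor shown in the hunk is used so '_hoodie_operation' is folded into the resolved Parquet schema. A behaviorally equivalent condensed form, as a sketch (the helper class and method name are illustrative):

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.TableSchemaResolver;
    import org.apache.parquet.schema.MessageType;

    public class SchemaResolutionSketch {
      // Same branch as getDataSchema() above, written as a ternary; both
      // constructor calls appear in the hunk, so only the shape differs.
      static MessageType resolve(HoodieTableMetaClient metaClient, boolean withOperationField) throws Exception {
        TableSchemaResolver resolver = withOperationField
            ? new TableSchemaResolver(metaClient, true)
            : new TableSchemaResolver(metaClient);
        return resolver.getTableParquetSchema();
      }
    }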