From 46a2399a45803e1b0d863eec47168a09c37ab920 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Tue, 18 May 2021 13:55:38 +0800 Subject: [PATCH] [HUDI-1902] Global index for flink writer (#2958) Supports deduplication for record keys with different partition path. --- .../hudi/common/model/BaseAvroPayload.java | 2 +- .../model/HoodieRecordGlobalLocation.java | 97 +++++++++++++++++ .../common/model/HoodieRecordLocation.java | 4 +- .../hudi/configuration/FlinkOptions.java | 7 ++ .../partitioner/BucketAssignFunction.java | 100 +++++++++++++----- .../transform/RowDataToHoodieFunction.java | 59 +---------- .../hudi/sink/utils/PayloadCreation.java | 93 ++++++++++++++++ .../hudi/table/HoodieDataSourceITCase.java | 50 +++++++++ .../java/org/apache/hudi/utils/TestData.java | 14 ++- .../src/test/resources/test_source_4.data | 8 ++ 10 files changed, 345 insertions(+), 89 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java create mode 100644 hudi-flink/src/test/resources/test_source_4.data diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java index 3b35b0d4d..cd3a95e6b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java @@ -37,7 +37,7 @@ public abstract class BaseAvroPayload implements Serializable { /** * For purposes of preCombining. */ - protected final Comparable orderingVal; + public final Comparable orderingVal; /** * Instantiate {@link BaseAvroPayload}. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java new file mode 100644 index 000000000..f469a1ab4 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import java.util.Objects; + +/** + * Similar with {@link org.apache.hudi.common.model.HoodieRecordLocation} but with partition path. + */ +public class HoodieRecordGlobalLocation extends HoodieRecordLocation { + private static final long serialVersionUID = 1L; + + private String partitionPath; + + public HoodieRecordGlobalLocation() { + } + + public HoodieRecordGlobalLocation(String partitionPath, String instantTime, String fileId) { + super(instantTime, fileId); + this.partitionPath = partitionPath; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("HoodieGlobalRecordLocation {"); + sb.append("partitionPath=").append(partitionPath).append(", "); + sb.append("instantTime=").append(instantTime).append(", "); + sb.append("fileId=").append(fileId); + sb.append('}'); + return sb.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + HoodieRecordGlobalLocation otherLoc = (HoodieRecordGlobalLocation) o; + return Objects.equals(partitionPath, otherLoc.partitionPath) + && Objects.equals(instantTime, otherLoc.instantTime) + && Objects.equals(fileId, otherLoc.fileId); + } + + @Override + public int hashCode() { + return Objects.hash(partitionPath, instantTime, fileId); + } + + public String getPartitionPath() { + return partitionPath; + } + + public void setPartitionPath(String partitionPath) { + this.partitionPath = partitionPath; + } + + /** + * Returns the global record location from local. + */ + public static HoodieRecordGlobalLocation fromLocal(String partitionPath, HoodieRecordLocation localLoc) { + return new HoodieRecordGlobalLocation(partitionPath, localLoc.getInstantTime(), localLoc.getFileId()); + } + + /** + * Returns the record location as local. + */ + public HoodieRecordLocation toLocal(String instantTime) { + return new HoodieRecordLocation(instantTime, fileId); + } + + /** + * Copy the location with given partition path. + */ + public HoodieRecordGlobalLocation copy(String partitionPath) { + return new HoodieRecordGlobalLocation(partitionPath, instantTime, fileId); + } +} + diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java index 1692cfbaa..2b1feab39 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java @@ -26,8 +26,8 @@ import java.util.Objects; */ public class HoodieRecordLocation implements Serializable { - private String instantTime; - private String fileId; + protected String instantTime; + protected String fileId; public HoodieRecordLocation() { } diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 4bed143de..33a16c0dd 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -86,6 +86,13 @@ public class FlinkOptions { .defaultValue(1.5D) .withDescription("Index state ttl in days, default 1.5 day"); + public static final ConfigOption INDEX_GLOBAL_ENABLED = ConfigOptions + .key("index.global.enabled") + .booleanType() + .defaultValue(true) + .withDescription("Whether to update index for the old partition path\n" + + "if same key record with different partition path came in, default true"); + // ------------------------------------------------------------------------ // Read Options // ------------------------------------------------------------------------ diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index 0f599d2cc..5ad20d5be 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -21,9 +21,11 @@ package org.apache.hudi.sink.partitioner; import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.model.BaseAvroPayload; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordGlobalLocation; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; @@ -32,6 +34,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.sink.utils.PayloadCreation; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.BucketInfo; import org.apache.hudi.util.StreamerUtil; @@ -90,7 +93,7 @@ public class BucketAssignFunction> *
  • If it does not, use the {@link BucketAssigner} to generate a new bucket ID
  • * */ - private MapState indexState; + private MapState indexState; /** * Bucket assigner to assign new bucket IDs or reuse existing ones. @@ -110,11 +113,23 @@ public class BucketAssignFunction> */ private MapState partitionLoadState; + /** + * Used to create DELETE payload. + */ + private PayloadCreation payloadCreation; + + /** + * If the index is global, update the index for the old partition path + * if same key record with different partition path came in. + */ + private final boolean globalIndex; + public BucketAssignFunction(Configuration conf) { this.conf = conf; this.isChangingRecords = WriteOperationType.isChangingRecords( WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))); this.bootstrapIndex = conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED); + this.globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED); } @Override @@ -132,6 +147,7 @@ public class BucketAssignFunction> HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)), context, writeConfig); + this.payloadCreation = PayloadCreation.instance(this.conf); } @Override @@ -141,11 +157,11 @@ public class BucketAssignFunction> @Override public void initializeState(FunctionInitializationContext context) { - MapStateDescriptor indexStateDesc = + MapStateDescriptor indexStateDesc = new MapStateDescriptor<>( "indexState", - TypeInformation.of(HoodieKey.class), - TypeInformation.of(HoodieRecordLocation.class)); + Types.STRING, + TypeInformation.of(HoodieRecordGlobalLocation.class)); double ttl = conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 1000; if (ttl > 0) { indexStateDesc.enableTimeToLive(StateTtlConfigUtil.createTtlConfig((long) ttl)); @@ -166,38 +182,41 @@ public class BucketAssignFunction> // 3. if it is an INSERT, decide the location using the BucketAssigner then send it out. HoodieRecord record = (HoodieRecord) value; final HoodieKey hoodieKey = record.getKey(); - final BucketInfo bucketInfo; + final String recordKey = hoodieKey.getRecordKey(); + final String partitionPath = hoodieKey.getPartitionPath(); final HoodieRecordLocation location; // The dataset may be huge, thus the processing would block for long, // disabled by default. - if (bootstrapIndex && !partitionLoadState.contains(hoodieKey.getPartitionPath())) { + if (bootstrapIndex && !partitionLoadState.contains(partitionPath)) { // If the partition records are never loaded, load the records first. - loadRecords(hoodieKey.getPartitionPath()); + loadRecords(partitionPath); } // Only changing records need looking up the index for the location, // append only records are always recognized as INSERT. - if (isChangingRecords && this.indexState.contains(hoodieKey)) { + if (isChangingRecords && indexState.contains(recordKey)) { // Set up the instant time as "U" to mark the bucket as an update bucket. - location = new HoodieRecordLocation("U", this.indexState.get(hoodieKey).getFileId()); - this.bucketAssigner.addUpdate(record.getPartitionPath(), location.getFileId()); - } else { - bucketInfo = this.bucketAssigner.addInsert(hoodieKey.getPartitionPath()); - switch (bucketInfo.getBucketType()) { - case INSERT: - // This is an insert bucket, use HoodieRecordLocation instant time as "I". - // Downstream operators can then check the instant time to know whether - // a record belongs to an insert bucket. - location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix()); - break; - case UPDATE: - location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix()); - break; - default: - throw new AssertionError(); + HoodieRecordGlobalLocation oldLoc = this.indexState.get(recordKey); + if (!StreamerUtil.equal(oldLoc.getPartitionPath(), partitionPath)) { + if (globalIndex) { + // if partition path changes, emit a delete record for old partition path, + // then update the index state using location with new partition path. + HoodieRecord deleteRecord = new HoodieRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()), + payloadCreation.createDeletePayload((BaseAvroPayload) record.getData())); + deleteRecord.setCurrentLocation(oldLoc.toLocal("U")); + deleteRecord.seal(); + out.collect((O) deleteRecord); + } + location = getNewRecordLocation(partitionPath); + updateIndexState(recordKey, partitionPath, location); + } else { + location = oldLoc.toLocal("U"); + this.bucketAssigner.addUpdate(partitionPath, location.getFileId()); } + } else { + location = getNewRecordLocation(partitionPath); if (isChangingRecords) { - this.indexState.put(hoodieKey, location); + updateIndexState(recordKey, partitionPath, location); } } record.unseal(); @@ -206,6 +225,32 @@ public class BucketAssignFunction> out.collect((O) record); } + private HoodieRecordLocation getNewRecordLocation(String partitionPath) { + final BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath); + final HoodieRecordLocation location; + switch (bucketInfo.getBucketType()) { + case INSERT: + // This is an insert bucket, use HoodieRecordLocation instant time as "I". + // Downstream operators can then check the instant time to know whether + // a record belongs to an insert bucket. + location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix()); + break; + case UPDATE: + location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix()); + break; + default: + throw new AssertionError(); + } + return location; + } + + private void updateIndexState( + String recordKey, + String partitionPath, + HoodieRecordLocation localLoc) throws Exception { + this.indexState.put(recordKey, HoodieRecordGlobalLocation.fromLocal(partitionPath, localLoc)); + } + @Override public void notifyCheckpointComplete(long l) { // Refresh the table state when there are new commits. @@ -245,7 +290,8 @@ public class BucketAssignFunction> boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator( hoodieKey.getRecordKey(), maxParallelism, parallelism) == taskID; if (shouldLoad) { - this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId())); + this.indexState.put(hoodieKey.getRecordKey(), + new HoodieRecordGlobalLocation(hoodieKey.getPartitionPath(), baseFile.getCommitTime(), baseFile.getFileId())); } } catch (Exception e) { LOG.error("Error when putting record keys into the state from file: {}", baseFile); @@ -265,7 +311,7 @@ public class BucketAssignFunction> @VisibleForTesting public boolean isKeyInState(HoodieKey hoodieKey) { try { - return this.indexState.contains(hoodieKey); + return this.indexState.contains(hoodieKey.getRecordKey()); } catch (Exception e) { throw new HoodieException(e); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java index 5bd3c687e..fcf77dbab 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java @@ -18,16 +18,12 @@ package org.apache.hudi.sink.transform; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ReflectionUtils; -import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.sink.utils.PayloadCreation; import org.apache.hudi.util.RowDataToAvroConverters; import org.apache.hudi.util.StreamerUtil; @@ -39,11 +35,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; -import javax.annotation.Nullable; - import java.io.IOException; -import java.io.Serializable; -import java.lang.reflect.Constructor; /** * Function that transforms RowData to HoodieRecord. @@ -116,53 +108,4 @@ public class RowDataToHoodieFunction(hoodieKey, payload); } - - /** - * Util to create hoodie pay load instance. - */ - private static class PayloadCreation implements Serializable { - private static final long serialVersionUID = 1L; - - private final boolean shouldCombine; - private final Constructor constructor; - private final String preCombineField; - - private PayloadCreation( - boolean shouldCombine, - Constructor constructor, - @Nullable String preCombineField) { - this.shouldCombine = shouldCombine; - this.constructor = constructor; - this.preCombineField = preCombineField; - } - - public static PayloadCreation instance(Configuration conf) throws Exception { - boolean shouldCombine = conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS) - || WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT; - String preCombineField = null; - final Class[] argTypes; - final Constructor constructor; - if (shouldCombine) { - preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD); - argTypes = new Class[] {GenericRecord.class, Comparable.class}; - } else { - argTypes = new Class[] {Option.class}; - } - final String clazz = conf.getString(FlinkOptions.PAYLOAD_CLASS); - constructor = ReflectionUtils.getClass(clazz).getConstructor(argTypes); - return new PayloadCreation(shouldCombine, constructor, preCombineField); - } - - public HoodieRecordPayload createPayload(GenericRecord record, boolean isDelete) throws Exception { - if (shouldCombine) { - ValidationUtils.checkState(preCombineField != null); - Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(record, - preCombineField, false); - return (HoodieRecordPayload) constructor.newInstance( - isDelete ? null : record, orderingVal); - } else { - return (HoodieRecordPayload) this.constructor.newInstance(Option.of(record)); - } - } - } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java new file mode 100644 index 000000000..831da25e5 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.utils; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.model.BaseAvroPayload; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.configuration.FlinkOptions; + +import org.apache.avro.generic.GenericRecord; +import org.apache.flink.configuration.Configuration; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.lang.reflect.Constructor; + +/** + * Util to create hoodie pay load instance. + */ +public class PayloadCreation implements Serializable { + private static final long serialVersionUID = 1L; + + private final boolean shouldCombine; + private final Constructor constructor; + private final String preCombineField; + + private PayloadCreation( + boolean shouldCombine, + Constructor constructor, + @Nullable String preCombineField) { + this.shouldCombine = shouldCombine; + this.constructor = constructor; + this.preCombineField = preCombineField; + } + + public static PayloadCreation instance(Configuration conf) throws Exception { + boolean shouldCombine = conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS) + || WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT; + String preCombineField = null; + final Class[] argTypes; + final Constructor constructor; + if (shouldCombine) { + preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD); + argTypes = new Class[] {GenericRecord.class, Comparable.class}; + } else { + argTypes = new Class[] {Option.class}; + } + final String clazz = conf.getString(FlinkOptions.PAYLOAD_CLASS); + constructor = ReflectionUtils.getClass(clazz).getConstructor(argTypes); + return new PayloadCreation(shouldCombine, constructor, preCombineField); + } + + public HoodieRecordPayload createPayload(GenericRecord record, boolean isDelete) throws Exception { + if (shouldCombine) { + ValidationUtils.checkState(preCombineField != null); + Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(record, + preCombineField, false); + return (HoodieRecordPayload) constructor.newInstance( + isDelete ? null : record, orderingVal); + } else { + return (HoodieRecordPayload) this.constructor.newInstance(Option.of(record)); + } + } + + public HoodieRecordPayload createDeletePayload(BaseAvroPayload payload) throws Exception { + if (shouldCombine) { + return (HoodieRecordPayload) constructor.newInstance(null, payload.orderingVal); + } else { + return (HoodieRecordPayload) this.constructor.newInstance(Option.empty()); + } + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 41c587c7a..af8707034 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -392,6 +392,56 @@ public class HoodieDataSourceITCase extends AbstractTestBase { assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par5]"); } + @Test + void testWriteGlobalIndex() { + // the source generates 4 commits + String createSource = TestConfigurations.getFileSourceDDL( + "source", "test_source_4.data", 4); + streamTableEnv.executeSql(createSource); + + Map options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.INSERT_DROP_DUPS.key(), "true"); + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); + streamTableEnv.executeSql(hoodieTableDDL); + + final String insertInto2 = "insert into t1 select * from source"; + + execInsertSql(streamTableEnv, insertInto2); + + List result = CollectionUtil.iterableToList( + () -> streamTableEnv.sqlQuery("select * from t1").execute().collect()); + assertRowsEquals(result, "[id1,Phoebe,52,1970-01-01T00:00:08,par4]"); + } + + @Test + void testWriteLocalIndex() { + // the source generates 4 commits + String createSource = TestConfigurations.getFileSourceDDL( + "source", "test_source_4.data", 4); + streamTableEnv.executeSql(createSource); + + Map options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.INDEX_GLOBAL_ENABLED.key(), "false"); + options.put(FlinkOptions.INSERT_DROP_DUPS.key(), "true"); + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); + streamTableEnv.executeSql(hoodieTableDDL); + + final String insertInto2 = "insert into t1 select * from source"; + + execInsertSql(streamTableEnv, insertInto2); + + List result = CollectionUtil.iterableToList( + () -> streamTableEnv.sqlQuery("select * from t1").execute().collect()); + final String expected = "[" + + "id1,Stephen,34,1970-01-01T00:00:02,par1, " + + "id1,Fabian,32,1970-01-01T00:00:04,par2, " + + "id1,Jane,19,1970-01-01T00:00:06,par3, " + + "id1,Phoebe,52,1970-01-01T00:00:08,par4]"; + assertRowsEquals(result, expected, 3); + } + @Test void testStreamReadEmptyTablePath() throws Exception { // create an empty table diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index 4a2466c94..bb6766180 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -256,8 +256,20 @@ public class TestData { * @param expected Expected string of the sorted rows */ public static void assertRowsEquals(List rows, String expected) { + assertRowsEquals(rows, expected, 0); + } + + /** + * Sort the {@code rows} using field at index {@code orderingPos} and asserts + * it equals with the expected string {@code expected}. + * + * @param rows Actual result rows + * @param expected Expected string of the sorted rows + * @param orderingPos Field position for ordering + */ + public static void assertRowsEquals(List rows, String expected, int orderingPos) { String rowsString = rows.stream() - .sorted(Comparator.comparing(o -> toStringSafely(o.getField(0)))) + .sorted(Comparator.comparing(o -> toStringSafely(o.getField(orderingPos)))) .collect(Collectors.toList()).toString(); assertThat(rowsString, is(expected)); } diff --git a/hudi-flink/src/test/resources/test_source_4.data b/hudi-flink/src/test/resources/test_source_4.data new file mode 100644 index 000000000..1ed4d19fb --- /dev/null +++ b/hudi-flink/src/test/resources/test_source_4.data @@ -0,0 +1,8 @@ +{"uuid": "id1", "name": "Danny", "age": 24, "ts": "1970-01-01T00:00:01", "partition": "par1"} +{"uuid": "id1", "name": "Stephen", "age": 34, "ts": "1970-01-01T00:00:02", "partition": "par1"} +{"uuid": "id1", "name": "Julian", "age": 54, "ts": "1970-01-01T00:00:03", "partition": "par2"} +{"uuid": "id1", "name": "Fabian", "age": 32, "ts": "1970-01-01T00:00:04", "partition": "par2"} +{"uuid": "id1", "name": "Sophia", "age": 18, "ts": "1970-01-01T00:00:05", "partition": "par3"} +{"uuid": "id1", "name": "Jane", "age": 19, "ts": "1970-01-01T00:00:06", "partition": "par3"} +{"uuid": "id1", "name": "Ella", "age": 38, "ts": "1970-01-01T00:00:07", "partition": "par4"} +{"uuid": "id1", "name": "Phoebe", "age": 52, "ts": "1970-01-01T00:00:08", "partition": "par4"}