diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 07b419e59..5b751e4f1 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -437,6 +437,12 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     final HoodieRecordLocation loc = record.getCurrentLocation();
     final String fileID = loc.getFileId();
     final String partitionPath = record.getPartitionPath();
+    // Always use FlinkCreateHandle when insert duplication is turned on
+    if (config.allowDuplicateInserts()) {
+      return new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
+          fileID, table.getTaskContextSupplier());
+    }
+
     if (bucketToHandles.containsKey(fileID)) {
       MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID);
       if (lastHandle.shouldReplace()) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index f839b5ef9..34b050c92 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -209,6 +209,12 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(TABLE_TYPE_COPY_ON_WRITE)
       .withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
 
+  public static final ConfigOption<Boolean> INSERT_ALLOW_DUP = ConfigOptions
+      .key("write.insert.allow_dup")
+      .booleanType()
+      .defaultValue(true)
+      .withDescription("Whether to allow duplicate records for the INSERT operation; if enabled, writes new base files directly instead of merging into existing small files, default true");
+
   public static final ConfigOption<String> OPERATION = ConfigOptions
       .key("write.operation")
       .stringType()
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 688cf1baf..e6c59b1ef 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -91,8 +91,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
 
   private final Configuration conf;
 
-  private transient org.apache.hadoop.conf.Configuration hadoopConf;
-
   private final boolean isChangingRecords;
 
   /**
@@ -117,21 +115,25 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
-    this.hadoopConf = StreamerUtil.getHadoopConf();
     HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
-        new SerializableConfiguration(this.hadoopConf),
+        new SerializableConfiguration(StreamerUtil.getHadoopConf()),
         new FlinkTaskContextSupplier(getRuntimeContext()));
     this.bucketAssigner = BucketAssigners.create(
         getRuntimeContext().getIndexOfThisSubtask(),
         getRuntimeContext().getMaxNumberOfParallelSubtasks(),
         getRuntimeContext().getNumberOfParallelSubtasks(),
-        WriteOperationType.isOverwrite(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))),
+        ignoreSmallFiles(writeConfig),
        HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
        context,
        writeConfig);
     this.payloadCreation = PayloadCreation.instance(this.conf);
   }
+
+  private boolean ignoreSmallFiles(HoodieWriteConfig writeConfig) {
+    WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+    return WriteOperationType.isOverwrite(operationType) || writeConfig.allowDuplicateInserts();
+  }
+
   @Override
   public void snapshotState(FunctionSnapshotContext context) {
     this.bucketAssigner.reset();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
index 8d304db3e..13d458760 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
@@ -35,25 +35,25 @@ public abstract class BucketAssigners {
 
   /**
    * Creates a {@code BucketAssigner}.
    *
-   * @param taskID         The task ID
-   * @param maxParallelism The max parallelism
-   * @param numTasks       The number of tasks
-   * @param overwrite      Whether the write operation is OVERWRITE
-   * @param tableType      The table type
-   * @param context        The engine context
-   * @param config         The configuration
+   * @param taskID           The task ID
+   * @param maxParallelism   The max parallelism
+   * @param numTasks         The number of tasks
+   * @param ignoreSmallFiles Whether to ignore the existing small files
+   * @param tableType        The table type
+   * @param context          The engine context
+   * @param config           The configuration
    * @return the bucket assigner instance
    */
   public static BucketAssigner create(
       int taskID,
       int maxParallelism,
       int numTasks,
-      boolean overwrite,
+      boolean ignoreSmallFiles,
       HoodieTableType tableType,
       HoodieFlinkEngineContext context,
       HoodieWriteConfig config) {
     boolean delta = tableType.equals(HoodieTableType.MERGE_ON_READ);
-    WriteProfile writeProfile = WriteProfiles.singleton(overwrite, delta, config, context);
+    WriteProfile writeProfile = WriteProfiles.singleton(ignoreSmallFiles, delta, config, context);
     return new BucketAssigner(taskID, maxParallelism, numTasks, writeProfile, config);
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/OverwriteWriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/EmptyWriteProfile.java
similarity index 74%
rename from hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/OverwriteWriteProfile.java
rename to hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/EmptyWriteProfile.java
index 8b084462c..3cdd798e2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/OverwriteWriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/EmptyWriteProfile.java
@@ -26,13 +26,18 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * WriteProfile for INSERT OVERWRITE and INSERT OVERWRITE TABLE operations,
- * this WriteProfile always skip the existing small files because of the 'OVERWRITE' semantics.
+ * WriteProfile that always returns empty small files.
+ *
+ * <p>This write profile is used for two cases:
+ * i). INSERT OVERWRITE and INSERT OVERWRITE TABLE operations,
+ * where the existing small files are ignored because of the 'OVERWRITE' semantics;
+ * ii). INSERT operation when the data file merge is disabled.
+ *
  *
 * <p>Note: assumes the index can always index log files for Flink write.
 */
-public class OverwriteWriteProfile extends WriteProfile {
-  public OverwriteWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
+public class EmptyWriteProfile extends WriteProfile {
+  public EmptyWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
     super(config, context);
   }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
index 093cef5be..1277b190d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
@@ -53,22 +53,22 @@ public class WriteProfiles {
 
   private WriteProfiles() {}
 
-  public static synchronized WriteProfile singleton(
-      boolean overwrite,
+  public static synchronized WriteProfile singleton(
+      boolean ignoreSmallFiles,
       boolean delta,
       HoodieWriteConfig config,
       HoodieFlinkEngineContext context) {
     return PROFILES.computeIfAbsent(config.getBasePath(),
-        k -> getWriteProfile(overwrite, delta, config, context));
+        k -> getWriteProfile(ignoreSmallFiles, delta, config, context));
   }
 
   private static WriteProfile getWriteProfile(
-      boolean overwrite,
+      boolean ignoreSmallFiles,
       boolean delta,
       HoodieWriteConfig config,
       HoodieFlinkEngineContext context) {
-    if (overwrite) {
-      return new OverwriteWriteProfile(config, context);
+    if (ignoreSmallFiles) {
+      return new EmptyWriteProfile(config, context);
     } else if (delta) {
       return new DeltaWriteProfile(config, context);
     } else {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 4597d0902..a7ef14fa4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -69,6 +69,9 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ.", required = true)
   public String tableType;
 
+  @Parameter(names = {"--insert-allow-dup"}, description = "Whether to allow duplicate records for the INSERT operation; if enabled, writes the base files directly, default true.")
+  public Boolean insertAllowDup = true;
+
   @Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for "
       + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
       + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
@@ -305,6 +308,7 @@ public class FlinkStreamerConfig extends Configuration {
     conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
     // copy_on_write works same as COPY_ON_WRITE
     conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
+    conf.setBoolean(FlinkOptions.INSERT_ALLOW_DUP, config.insertAllowDup);
     conf.setString(FlinkOptions.OPERATION, config.operation.value());
     conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
     conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index a8fe93b12..d3c538825 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -62,7 +62,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
     Configuration conf = (Configuration) helper.getOptions();
     TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
-    validateRequiredFields(conf, schema);
+    sanityCheck(conf, schema);
     setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
 
     Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
@@ -79,7 +79,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
   public DynamicTableSink createDynamicTableSink(Context context) {
     Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
     TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
-    validateRequiredFields(conf, schema);
+    sanityCheck(conf, schema);
     setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
     return new HoodieTableSink(conf, schema);
   }
@@ -103,12 +103,13 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
   // Utilities
   // -------------------------------------------------------------------------
 
-  /** Validate required options. For e.g, record key and pre_combine key.
+  /**
+   * Sanity checks the table options and schema.
    *
    * @param conf The table options
    * @param schema The table schema
    */
-  private void validateRequiredFields(Configuration conf, TableSchema schema) {
+  private void sanityCheck(Configuration conf, TableSchema schema) {
     List<String> fields = Arrays.stream(schema.getFieldNames()).collect(Collectors.toList());
 
     // validate record key in pk absence.
@@ -128,6 +129,11 @@
       throw new ValidationException("Field " + preCombineField + " does not exist in the table schema."
+ "Please check 'write.precombine.field' option."); } + + if (conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase().equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ) + && conf.getBoolean(FlinkOptions.INSERT_ALLOW_DUP)) { + throw new ValidationException("Option 'write.insert.allow_dup' is only allowed for COPY_ON_WRITE table."); + } } /** diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index d42993b9a..7967b69d9 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.util.Option; @@ -145,6 +146,7 @@ public class StreamerUtil { .withEngineType(EngineType.FLINK) .withPath(conf.getString(FlinkOptions.PATH)) .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true) + .withMergeAllowDuplicateOnInserts(allowDuplicateInserts(conf)) .withCompactionConfig( HoodieCompactionConfig.newBuilder() .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS)) @@ -345,4 +347,9 @@ public class StreamerUtil { throw new IOException("Could not load transformer class(es) " + classNames, e); } } + + public static boolean allowDuplicateInserts(Configuration conf) { + WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)); + return operationType == WriteOperationType.INSERT && conf.getBoolean(FlinkOptions.INSERT_ALLOW_DUP); + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 7e060f710..29a7455b6 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -23,6 +23,7 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; @@ -532,6 +533,81 @@ public class TestWriteCopyOnWrite { checkWrittenData(tempFile, expected, 1); } + @Test + public void testInsertAllowsDuplication() throws Exception { + // reset the config option + conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size + conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value()); + funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); + + // open the function and ingest data + funcWrapper.openFunction(); + // Each record is 208 bytes. 
so 4 records expect to trigger a mini-batch write + for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { + funcWrapper.invoke(rowData); + } + + // this triggers the data write and event send + funcWrapper.checkpointFunction(1); + Map> dataBuffer = funcWrapper.getDataBuffer(); + assertThat("All data should be flushed out", dataBuffer.size(), is(0)); + + final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first + final OperatorEvent event2 = funcWrapper.getNextEvent(); + assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class)); + + funcWrapper.getCoordinator().handleEventFromOperator(0, event1); + funcWrapper.getCoordinator().handleEventFromOperator(0, event2); + assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); + + String instant = funcWrapper.getWriteClient() + .getLastPendingInstant(getTableType()); + + funcWrapper.checkpointComplete(1); + + Map expected = new HashMap<>(); + + expected.put("par1", "[" + + "id1,par1,id1,Danny,23,0,par1, " + + "id1,par1,id1,Danny,23,1,par1, " + + "id1,par1,id1,Danny,23,2,par1, " + + "id1,par1,id1,Danny,23,3,par1, " + + "id1,par1,id1,Danny,23,4,par1]"); + + TestData.checkWrittenAllData(tempFile, expected, 1); + + // started a new instant already + checkInflightInstant(funcWrapper.getWriteClient()); + checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + + // insert duplicates again + for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { + funcWrapper.invoke(rowData); + } + + funcWrapper.checkpointFunction(2); + + final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first + final OperatorEvent event4 = funcWrapper.getNextEvent(); + funcWrapper.getCoordinator().handleEventFromOperator(0, event3); + funcWrapper.getCoordinator().handleEventFromOperator(0, event4); + funcWrapper.checkpointComplete(2); + + // same with the original base file content. 
+ expected.put("par1", "[" + + "id1,par1,id1,Danny,23,0,par1, " + + "id1,par1,id1,Danny,23,0,par1, " + + "id1,par1,id1,Danny,23,1,par1, " + + "id1,par1,id1,Danny,23,1,par1, " + + "id1,par1,id1,Danny,23,2,par1, " + + "id1,par1,id1,Danny,23,2,par1, " + + "id1,par1,id1,Danny,23,3,par1, " + + "id1,par1,id1,Danny,23,3,par1, " + + "id1,par1,id1,Danny,23,4,par1, " + + "id1,par1,id1,Danny,23,4,par1]"); + TestData.checkWrittenAllData(tempFile, expected, 1); + } + @Test public void testInsertWithSmallBufferSize() throws Exception { // reset the config option diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java index 07e23b56e..fa4f92bdc 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java @@ -37,6 +37,7 @@ import org.apache.avro.Schema; import org.apache.flink.configuration.Configuration; import org.apache.hadoop.fs.FileSystem; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.io.File; import java.util.Comparator; @@ -67,6 +68,11 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite { conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); } + @Test + public void testInsertAllowsDuplication() { + // ignore the test because only COW table supports INSERT duplication + } + @Override protected void checkWrittenData(File baseFile, Map expected, int partitions) throws Exception { HoodieTableMetaClient metaClient = HoodieFlinkTable.create(writeConfig, context).getMetaClient(); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java index 13a71ecb8..de54d90c3 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java @@ -22,7 +22,6 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.configuration.FlinkOptions; import org.apache.flink.configuration.Configuration; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.HashMap; @@ -39,10 +38,14 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite { conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1); } - @Disabled @Test - public void testIndexStateBootstrap() { - // Ignore the index bootstrap because we only support parquet load now. 
+  public void testInsertAllowsDuplication() {
+    // ignore the test because only COW table supports INSERT duplication
+  }
+
+  @Override
+  protected Map<String, String> getExpectedBeforeCheckpointComplete() {
+    return EXPECTED1;
   }
 
   protected Map<String, String> getMiniBatchExpected() {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 799739cfc..21f1647a9 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -340,6 +340,24 @@ public class TestHoodieTableFactory {
     assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(45));
   }
 
+  @Test
+  void testMorTableInsertAllowDuplication() {
+    TableSchema schema = TableSchema.builder()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
+        .primaryKey("f0")
+        .build();
+    // overwrite the operation
+    this.conf.setString(FlinkOptions.OPERATION.key(), "insert");
+    this.conf.setString(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+
+    final MockContext sinkContext = MockContext.getInstance(this.conf, schema, "f2");
+    assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sinkContext),
+        "Option 'write.insert.allow_dup' is only allowed for COPY_ON_WRITE table.");
+  }
+
   // -------------------------------------------------------------------------
   // Inner Class
   // -------------------------------------------------------------------------
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 5ddb99c5d..f5ac9c529 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -394,6 +394,48 @@ public class TestData {
     }
   }
 
+  /**
+   * Checks that the source data set is written as expected.
+   * Different from {@link #checkWrittenData}, it reads all the data files.
+   *
+   * <p>Note: Replace it with the Flink reader when it is supported.
+   *
+   * @param baseFile   The file base to check, should be a directory
+   * @param expected   The expected results mapping, the key should be the partition path
+   *                   and the value should be the list of values in that partition
+   * @param partitions The expected partition number
+   */
+  public static void checkWrittenAllData(
+      File baseFile,
+      Map<String, String> expected,
+      int partitions) throws IOException {
+    assert baseFile.isDirectory();
+    FileFilter filter = file -> !file.getName().startsWith(".");
+    File[] partitionDirs = baseFile.listFiles(filter);
+
+    assertNotNull(partitionDirs);
+    assertThat(partitionDirs.length, is(partitions));
+
+    for (File partitionDir : partitionDirs) {
+      File[] dataFiles = partitionDir.listFiles(filter);
+      assertNotNull(dataFiles);
+
+      List<String> readBuffer = new ArrayList<>();
+      for (File dataFile : dataFiles) {
+        ParquetReader<GenericRecord> reader = AvroParquetReader
+            .<GenericRecord>builder(new Path(dataFile.getAbsolutePath())).build();
+        GenericRecord nextRecord = reader.read();
+        while (nextRecord != null) {
+          readBuffer.add(filterOutVariables(nextRecord));
+          nextRecord = reader.read();
+        }
+      }
+
+      readBuffer.sort(Comparator.naturalOrder());
+      assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName())));
+    }
+  }
+
   /**
    * Checks the source data are written as expected.
    *
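---

Reviewer note: a minimal sketch of how the new option is expected to be wired up from a job configuration. The base path and class name here are hypothetical; the option keys, the COW-only restriction enforced by `sanityCheck`, and `StreamerUtil.allowDuplicateInserts` all come from this patch.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.StreamerUtil;

public class InsertAllowDupSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "/tmp/hudi/demo_table"); // hypothetical base path
    conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE); // MOR is rejected by sanityCheck
    conf.setString(FlinkOptions.OPERATION, "insert");          // the option only takes effect for INSERT
    conf.setBoolean(FlinkOptions.INSERT_ALLOW_DUP, true);      // new option from this patch, default true

    // True only for INSERT with allow_dup enabled; UPSERT and the OVERWRITE
    // variants fall through to the existing write-profile handling.
    boolean allowDup = StreamerUtil.allowDuplicateInserts(conf);
    System.out.println("duplicate inserts enabled: " + allowDup);
  }
}
```

When this evaluates to true, the bucket assigner picks `EmptyWriteProfile` (no small-file candidates to merge into) and `HoodieFlinkWriteClient` routes every bucket to `FlinkCreateHandle`, so each flush appends a fresh base file instead of rewriting an existing one — which is exactly why duplicate keys survive, as exercised by `testInsertAllowsDuplication`.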