1
0

[HUDI-2087] Support Append only in Flink stream (#3252)

Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>
This commit is contained in:
yuzhaojing
2021-07-10 14:49:35 +08:00
committed by GitHub
parent 7c6eebf98c
commit 783c9cb369
14 changed files with 243 additions and 29 deletions

View File

@@ -19,5 +19,5 @@
package org.apache.hudi.table.action.commit;
public enum BucketType {
UPDATE, INSERT
UPDATE, INSERT, APPEND_ONLY
}

View File

@@ -56,6 +56,7 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.action.compact.FlinkCompactHelpers;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade;
import org.apache.hudi.util.FlinkClientUtil;
@@ -408,6 +409,12 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
final HoodieRecordLocation loc = record.getCurrentLocation();
final String fileID = loc.getFileId();
final String partitionPath = record.getPartitionPath();
// append only mode always use FlinkCreateHandle
if (loc.getInstantTime().equals(BucketType.APPEND_ONLY.name())) {
return new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
fileID, table.getTaskContextSupplier());
}
if (bucketToHandles.containsKey(fileID)) {
MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID);
if (lastHandle.shouldReplace()) {
@@ -424,7 +431,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
if (isDelta) {
writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr,
table.getTaskContextSupplier());
} else if (loc.getInstantTime().equals("I")) {
} else if (loc.getInstantTime().equals(BucketType.INSERT.name()) || loc.getInstantTime().equals(BucketType.APPEND_ONLY.name())) {
// use the same handle for insert bucket
writeHandle = new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
fileID, table.getTaskContextSupplier());
} else {

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -80,7 +81,7 @@ public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O>
@Override
protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
return hoodieRecord.getCurrentLocation() != null
&& hoodieRecord.getCurrentLocation().getInstantTime().equals("U");
&& hoodieRecord.getCurrentLocation().getInstantTime().equals(BucketType.UPDATE.name());
}
@Override

View File

@@ -30,6 +30,7 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.table.MarkerFiles;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -70,6 +71,17 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
}
}
@Override
protected void createMarkerFile(String partitionPath, String dataFileName) {
// In some rare cases, the task was pulled up again with same write file name,
// for e.g, reuse the small log files from last commit instant.
// Just skip the marker file creation if it already exists, the new data would append to
// the file directly.
MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType());
}
/**
* The flink checkpoints start in sequence and asynchronously, when one write task finish the checkpoint(A)
* (thus the fs view got the written data files some of which may be invalid),

View File

@@ -102,9 +102,7 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
final HoodieRecord<?> record = inputRecords.get(0);
final String partitionPath = record.getPartitionPath();
final String fileId = record.getCurrentLocation().getFileId();
final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")
? BucketType.INSERT
: BucketType.UPDATE;
final BucketType bucketType = BucketType.valueOf(record.getCurrentLocation().getInstantTime());
handleUpsertPartition(
instantTime,
partitionPath,
@@ -185,6 +183,7 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
} else {
switch (bucketType) {
case INSERT:
case APPEND_ONLY:
return handleInsert(fileIdHint, recordItr);
case UPDATE:
return handleUpdate(partitionPath, fileIdHint, recordItr);

View File

@@ -186,6 +186,12 @@ public class FlinkOptions {
.defaultValue(TABLE_TYPE_COPY_ON_WRITE)
.withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
public static final ConfigOption<Boolean> APPEND_ONLY_ENABLE = ConfigOptions
.key("append_only.enable")
.booleanType()
.defaultValue(false)
.withDescription("Whether to write data to new baseFile without index, only support in COW, default false");
public static final ConfigOption<String> OPERATION = ConfigOptions
.key("write.operation")
.stringType()

View File

@@ -28,11 +28,13 @@ import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.bootstrap.IndexRecord;
import org.apache.hudi.sink.utils.PayloadCreation;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
@@ -55,9 +57,9 @@ import java.util.Objects;
* it then assigns the bucket with ID using the {@link BucketAssigner}.
*
* <p>All the records are tagged with HoodieRecordLocation, instead of real instant time,
* INSERT record uses "I" and UPSERT record uses "U" as instant time. There is no need to keep
* INSERT record uses "INSERT" and UPSERT record uses "UPDATE" as instant time. There is no need to keep
* the "real" instant time for each record, the bucket ID (partition path & fileID) actually decides
* where the record should write to. The "I" and "U" tags are only used for downstream to decide whether
* where the record should write to. The "INSERT" and "UPDATE" tags are only used for downstream to decide whether
* the data bucket is an INSERT or an UPSERT, we should factor the tags out when the underneath writer
* supports specifying the bucket type explicitly.
*
@@ -106,11 +108,18 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
*/
private final boolean globalIndex;
private final boolean appendOnly;
public BucketAssignFunction(Configuration conf) {
this.conf = conf;
this.isChangingRecords = WriteOperationType.isChangingRecords(
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
this.globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
this.appendOnly = conf.getBoolean(FlinkOptions.APPEND_ONLY_ENABLE);
if (appendOnly) {
ValidationUtils.checkArgument(conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.COPY_ON_WRITE.name()),
"APPEND_ONLY mode only support in COPY_ON_WRITE table");
}
}
@Override
@@ -170,25 +179,33 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
final String partitionPath = hoodieKey.getPartitionPath();
final HoodieRecordLocation location;
if (appendOnly) {
location = getNewRecordLocation(partitionPath);
this.context.setCurrentKey(recordKey);
record.setCurrentLocation(location);
out.collect((O) record);
return;
}
// Only changing records need looking up the index for the location,
// append only records are always recognized as INSERT.
HoodieRecordGlobalLocation oldLoc = indexState.value();
if (isChangingRecords && oldLoc != null) {
// Set up the instant time as "U" to mark the bucket as an update bucket.
// Set up the instant time as "UPDATE" to mark the bucket as an update bucket.
if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
if (globalIndex) {
// if partition path changes, emit a delete record for old partition path,
// then update the index state using location with new partition path.
HoodieRecord<?> deleteRecord = new HoodieRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()),
payloadCreation.createDeletePayload((BaseAvroPayload) record.getData()));
deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
deleteRecord.setCurrentLocation(oldLoc.toLocal(BucketType.UPDATE.name()));
deleteRecord.seal();
out.collect((O) deleteRecord);
}
location = getNewRecordLocation(partitionPath);
updateIndexState(partitionPath, location);
} else {
location = oldLoc.toLocal("U");
location = oldLoc.toLocal(BucketType.UPDATE.name());
this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
}
} else {
@@ -203,17 +220,26 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
}
private HoodieRecordLocation getNewRecordLocation(String partitionPath) {
final BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath);
BucketInfo bucketInfo;
if (appendOnly) {
bucketInfo = this.bucketAssigner.addAppendOnly(partitionPath);
} else {
bucketInfo = this.bucketAssigner.addInsert(partitionPath);
}
final HoodieRecordLocation location;
switch (bucketInfo.getBucketType()) {
case INSERT:
// This is an insert bucket, use HoodieRecordLocation instant time as "I".
// This is an insert bucket, use HoodieRecordLocation instant time as "INSERT".
// Downstream operators can then check the instant time to know whether
// a record belongs to an insert bucket.
location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix());
location = new HoodieRecordLocation(BucketType.INSERT.name(), bucketInfo.getFileIdPrefix());
break;
case UPDATE:
location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix());
location = new HoodieRecordLocation(BucketType.UPDATE.name(), bucketInfo.getFileIdPrefix());
break;
case APPEND_ONLY:
location = new HoodieRecordLocation(BucketType.APPEND_ONLY.name(), bucketInfo.getFileIdPrefix());
break;
default:
throw new AssertionError();

View File

@@ -140,6 +140,14 @@ public class BucketAssigner implements AutoCloseable {
}
// if we have anything more, create new insert buckets, like normal
return getOrCreateNewFileBucket(partitionPath, BucketType.INSERT);
}
public BucketInfo addAppendOnly(String partitionPath) {
return getOrCreateNewFileBucket(partitionPath, BucketType.APPEND_ONLY);
}
private BucketInfo getOrCreateNewFileBucket(String partitionPath, BucketType bucketType) {
if (newFileAssignStates.containsKey(partitionPath)) {
NewFileAssignState newFileAssignState = newFileAssignStates.get(partitionPath);
if (newFileAssignState.canAssign()) {
@@ -148,7 +156,7 @@ public class BucketAssigner implements AutoCloseable {
final String key = StreamerUtil.generateBucketKey(partitionPath, newFileAssignState.fileId);
return bucketInfoMap.get(key);
}
BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath);
BucketInfo bucketInfo = new BucketInfo(bucketType, FSUtils.createNewFileIdPfx(), partitionPath);
final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix());
bucketInfoMap.put(key, bucketInfo);
newFileAssignStates.put(partitionPath, new NewFileAssignState(bucketInfo.getFileIdPrefix(), writeProfile.getRecordsPerBucket()));

View File

@@ -69,6 +69,9 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ.", required = true)
public String tableType;
@Parameter(names = {"--append-only"}, description = "Write data to new parquet in every checkpoint. Only support in COPY_ON_WRITE table.", required = true)
public Boolean appendOnly = false;
@Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for "
+ "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
+ "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
@@ -290,7 +293,13 @@ public class FlinkStreamerConfig extends Configuration {
conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
// copy_on_write works same as COPY_ON_WRITE
conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
conf.setString(FlinkOptions.OPERATION, config.operation.value());
conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, config.appendOnly);
if (config.appendOnly) {
// append only should use insert operation
conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
} else {
conf.setString(FlinkOptions.OPERATION, config.operation.value());
}
conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes);

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.table;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
@@ -144,6 +145,11 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
TableSchema schema) {
// table name
conf.setString(FlinkOptions.TABLE_NAME.key(), tableName);
// append only
if (conf.getBoolean(FlinkOptions.APPEND_ONLY_ENABLE)) {
// append only should use insert operation
conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
}
// hoodie key about options
setupHoodieKeyOptions(conf, table);
// cleaning options

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
@@ -44,6 +45,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -387,12 +389,12 @@ public class TestWriteCopyOnWrite {
@Test
public void testInsertWithMiniBatches() throws Exception {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0007); // 734 bytes batch size
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data
funcWrapper.openFunction();
// Each record is 208 bytes. so 4 records expect to trigger a mini-batch write
// Each record is 216 bytes. so 4 records expect to trigger a mini-batch write
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData);
}
@@ -448,13 +450,13 @@ public class TestWriteCopyOnWrite {
@Test
public void testInsertWithDeduplication() throws Exception {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0007); // 734 bytes batch size
conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data
funcWrapper.openFunction();
// Each record is 208 bytes. so 4 records expect to trigger a mini-batch write
// Each record is 216 bytes. so 4 records expect to trigger a mini-batch write
for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
funcWrapper.invoke(rowData);
}
@@ -510,14 +512,91 @@ public class TestWriteCopyOnWrite {
}
@Test
public void testInsertWithSmallBufferSize() throws Exception {
public void testAppendOnly() throws Exception {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0007); // 734 bytes batch size
conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, false);
conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true);
conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data
funcWrapper.openFunction();
// each record is 208 bytes. so 4 records expect to trigger buffer flush:
// Each record is 216 bytes. so 4 records expect to trigger a mini-batch write
for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
funcWrapper.invoke(rowData);
}
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
assertThat("All data should be flushed out", dataBuffer.size(), is(0));
final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
final OperatorEvent event2 = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
String instant = funcWrapper.getWriteClient()
.getLastPendingInstant(getTableType());
funcWrapper.checkpointComplete(1);
Map<String, List<String>> expected = new HashMap<>();
expected.put("par1", Arrays.asList(
"id1,par1,id1,Danny,23,0,par1",
"id1,par1,id1,Danny,23,1,par1",
"id1,par1,id1,Danny,23,2,par1",
"id1,par1,id1,Danny,23,3,par1",
"id1,par1,id1,Danny,23,4,par1"));
TestData.checkWrittenAllData(tempFile, expected, 1);
// started a new instant already
checkInflightInstant(funcWrapper.getWriteClient());
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
// insert duplicates again
for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
funcWrapper.invoke(rowData);
}
funcWrapper.checkpointFunction(2);
final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first
final OperatorEvent event4 = funcWrapper.getNextEvent();
funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
funcWrapper.checkpointComplete(2);
// Same the original base file content.
expected.put("par1", Arrays.asList(
"id1,par1,id1,Danny,23,0,par1",
"id1,par1,id1,Danny,23,0,par1",
"id1,par1,id1,Danny,23,1,par1",
"id1,par1,id1,Danny,23,1,par1",
"id1,par1,id1,Danny,23,2,par1",
"id1,par1,id1,Danny,23,2,par1",
"id1,par1,id1,Danny,23,3,par1",
"id1,par1,id1,Danny,23,3,par1",
"id1,par1,id1,Danny,23,4,par1",
"id1,par1,id1,Danny,23,4,par1"));
TestData.checkWrittenAllData(tempFile, expected, 1);
}
@Test
public void testInsertWithSmallBufferSize() throws Exception {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0007); // 734 bytes buffer size
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data
funcWrapper.openFunction();
// each record is 216 bytes. so 4 records expect to trigger buffer flush:
// flush the max size bucket once at a time.
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData);
@@ -660,7 +739,7 @@ public class TestWriteCopyOnWrite {
public void testWriteExactlyOnce() throws Exception {
// reset the config option
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 3);
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0007); // 734 bytes buffer size
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data

View File

@@ -23,12 +23,14 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestData;
@@ -37,6 +39,7 @@ import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.util.Comparator;
@@ -44,6 +47,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertThrows;
/**
* Test cases for delta stream write.
*/
@@ -86,6 +91,16 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
return EXPECTED1;
}
@Test
public void testAppendOnly() throws Exception {
conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true);
conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
assertThrows(IllegalArgumentException.class, () -> {
funcWrapper.openFunction();
}, "APPEND_ONLY mode only support in COPY_ON_WRITE table");
}
protected Map<String, String> getMiniBatchExpected() {
Map<String, String> expected = new HashMap<>();
// MOR mode merges the messages with the same key.

View File

@@ -19,15 +19,18 @@
package org.apache.hudi.sink;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertThrows;
/**
* Test cases for delta stream write with compaction.
*/
@@ -39,10 +42,19 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
}
@Disabled
@Override
protected Map<String, String> getExpectedBeforeCheckpointComplete() {
return EXPECTED1;
}
@Test
public void testIndexStateBootstrap() {
// Ignore the index bootstrap because we only support parquet load now.
public void testAppendOnly() throws Exception {
conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true);
conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
assertThrows(IllegalArgumentException.class, () -> {
funcWrapper.openFunction();
}, "APPEND_ONLY mode only support in COPY_ON_WRITE table");
}
protected Map<String, String> getMiniBatchExpected() {

View File

@@ -360,8 +360,10 @@ public class TestData {
assert baseFile.isDirectory();
FileFilter filter = file -> !file.getName().startsWith(".");
File[] partitionDirs = baseFile.listFiles(filter);
assertNotNull(partitionDirs);
assertThat(partitionDirs.length, is(partitions));
for (File partitionDir : partitionDirs) {
File[] dataFiles = partitionDir.listFiles(filter);
assertNotNull(dataFiles);
@@ -381,6 +383,37 @@ public class TestData {
}
}
public static void checkWrittenAllData(
File baseFile,
Map<String, List<String>> expected,
int partitions) throws IOException {
assert baseFile.isDirectory();
FileFilter filter = file -> !file.getName().startsWith(".");
File[] partitionDirs = baseFile.listFiles(filter);
assertNotNull(partitionDirs);
assertThat(partitionDirs.length, is(partitions));
for (File partitionDir : partitionDirs) {
File[] dataFiles = partitionDir.listFiles(filter);
assertNotNull(dataFiles);
List<String> readBuffer = new ArrayList<>();
for (File dataFile : dataFiles) {
ParquetReader<GenericRecord> reader = AvroParquetReader
.<GenericRecord>builder(new Path(dataFile.getAbsolutePath())).build();
GenericRecord nextRecord = reader.read();
while (nextRecord != null) {
readBuffer.add(filterOutVariables(nextRecord));
nextRecord = reader.read();
}
readBuffer.sort(Comparator.naturalOrder());
}
assertThat(readBuffer, is(expected.get(partitionDir.getName())));
}
}
/**
* Checks the source data are written as expected.
*