[HUDI-2551] Support DefaultHoodieRecordPayload for flink (#3792)
@@ -65,7 +65,7 @@ public class FlinkLazyInsertIterable<T extends HoodieRecordPayload> extends Hood
     try {
       final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
       bufferedIteratorExecutor =
-          new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema));
+          new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema, hoodieConfig));
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
       assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
       return result;
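The extra `hoodieConfig` argument lets the queue's transform function resolve insert values through the `Properties`-aware payload API, which `DefaultHoodieRecordPayload` needs in order to read settings such as `hoodie.payload.ordering.field`. A minimal sketch of such a two-argument transform, assuming the `getInsertValue(Schema, Properties)` overload on the payload (the helper name and error handling here are illustrative, not the actual Hudi method body):

    import java.io.IOException;
    import java.util.Properties;
    import java.util.function.Function;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.IndexedRecord;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.exception.HoodieException;

    final class TransformSketch {
      // Resolve each record's insert value with the write-config properties in
      // scope, so the payload implementation can see its configured fields.
      static <T extends HoodieRecordPayload> Function<HoodieRecord<T>, Option<IndexedRecord>> transformWithProps(
          Schema schema, HoodieWriteConfig config) {
        Properties props = config.getProps();
        return record -> {
          try {
            return record.getData().getInsertValue(schema, props);
          } catch (IOException e) {
            throw new HoodieException("Could not resolve insert value", e);
          }
        };
      }
    }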
@@ -100,7 +100,7 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Integer> METADATA_COMPACTION_DELTA_COMMITS = ConfigOptions
       .key("metadata.compaction.delta_commits")
       .intType()
-      .defaultValue(24)
+      .defaultValue(10)
       .withDescription("Max delta commits for metadata table to trigger compaction, default 24");
 
   // ------------------------------------------------------------------------
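Dropping the default from 24 to 10 delta commits makes the metadata table compact more eagerly, which fits Flink's frequent checkpoint-driven commits. A usage sketch for tuning it per job through the standard Flink `Configuration` API (the wrapper class and chosen value are illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    class MetadataCompactionConfigSketch {
      static Configuration writerConf() {
        Configuration conf = new Configuration();
        // Compact the metadata table after 10 delta commits (the new default);
        // raise this if compaction fires too often for your checkpoint cadence.
        conf.setInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 10);
        return conf;
      }
    }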
@@ -129,6 +129,13 @@ public class BootstrapOperator<I, O extends HoodieRecord>
         WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)),
         HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)));
 
+    preLoadIndexRecords();
+  }
+
+  /**
+   * Load the index records before {@link #processElement}.
+   */
+  protected void preLoadIndexRecords() throws Exception {
     String basePath = hoodieTable.getMetaClient().getBasePath();
     int taskID = getRuntimeContext().getIndexOfThisSubtask();
     LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskID);
@@ -56,6 +56,11 @@ public class BatchBootstrapOperator<I, O extends HoodieRecord>
     this.haveSuccessfulCommits = StreamerUtil.haveSuccessfulCommits(hoodieTable.getMetaClient());
   }
 
+  @Override
+  protected void preLoadIndexRecords() {
+    // no operation
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public void processElement(StreamRecord<I> element) throws Exception {
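Together, the two bootstrap hunks form a template-method refactor: `BootstrapOperator` funnels its eager index loading through the new overridable `preLoadIndexRecords()` hook, and `BatchBootstrapOperator` blanks the hook out because in batch mode index records are loaded lazily, on first contact with each partition, inside `processElement`. A generic illustration of that lazy per-partition pattern (class and method names here are hypothetical, not Hudi's):

    import java.util.HashSet;
    import java.util.Set;

    class LazyPartitionLoader {
      private final Set<String> loadedPartitions = new HashSet<>();

      // Set#add returns true only on first insertion, so each partition's
      // index records are loaded exactly once, right before first use.
      void onRecord(String partitionPath) {
        if (loadedPartitions.add(partitionPath)) {
          loadIndexRecords(partitionPath);
        }
      }

      private void loadIndexRecords(String partitionPath) {
        // placeholder for the actual index-state loading
      }
    }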
@@ -37,6 +37,7 @@ import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
+import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -189,6 +190,10 @@ public class StreamerUtil {
             .enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED))
             .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
             .build())
+        .withPayloadConfig(HoodiePayloadConfig.newBuilder()
+            .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
+            .withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
+            .build())
         .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
         .withAutoCommit(false)
        .withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
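This wiring is what makes `DefaultHoodieRecordPayload` usable from Flink: the writer now forwards the table's precombine field as both the payload ordering field and the event-time field, so the payload can compare an incoming record against the stored one. Conceptually, the combine step behaves like the sketch below (simplified, not Hudi source; the real payload also handles deletes, nested fields, and metadata columns):

    import org.apache.avro.generic.GenericRecord;

    final class CombineSketch {
      // Keep whichever record carries the greater ordering value; on a tie the
      // incoming record wins. The previous default payload
      // (OverwriteWithLatestAvroPayload) always keeps the latest write instead.
      @SuppressWarnings("unchecked")
      static GenericRecord combine(GenericRecord stored, GenericRecord incoming, String orderingField) {
        Comparable<Object> storedVal = (Comparable<Object>) stored.get(orderingField);
        Object incomingVal = incoming.get(orderingField);
        return storedVal.compareTo(incomingVal) > 0 ? stored : incoming;
      }
    }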
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -584,6 +585,34 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     assertRowsEquals(result, "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par1]]");
   }
 
+  @Test
+  void testUpdateWithDefaultHoodieRecordPayload() {
+    TableEnvironment tableEnv = batchTableEnv;
+    String hoodieTableDDL = sql("t1")
+        .field("id int")
+        .field("name string")
+        .field("price double")
+        .field("ts bigint")
+        .pkField("id")
+        .noPartition()
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.PAYLOAD_CLASS_NAME, DefaultHoodieRecordPayload.class.getName())
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    final String insertInto1 = "insert into t1 values\n"
+        + "(1,'a1',20,20)";
+    execInsertSql(tableEnv, insertInto1);
+
+    final String insertInto4 = "insert into t1 values\n"
+        + "(1,'a1',20,1)";
+    execInsertSql(tableEnv, insertInto4);
+
+    List<Row> result = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result, "[+I[1, a1, 20.0, 20]]");
+  }
+
   @ParameterizedTest
   @MethodSource("executionModeAndTableTypeParams")
   void testWriteNonPartitionedTable(ExecMode execMode, HoodieTableType tableType) {
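The new IT case pins those semantics down: the second insert carries a smaller ordering value (ts = 1 versus the stored ts = 20), so with `DefaultHoodieRecordPayload` the earlier row survives and the query still returns `[+I[1, a1, 20.0, 20]]`. Under the previous default payload the later write would have replaced the row regardless of `ts`.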