[HUDI-2891] Fix write configs for Java engine in Kafka Connect Sink (#4161)
This commit is contained in:
@@ -10,7 +10,6 @@
|
|||||||
"topics": "hudi-test-topic",
|
"topics": "hudi-test-topic",
|
||||||
"hoodie.table.name": "hudi-test-topic",
|
"hoodie.table.name": "hudi-test-topic",
|
||||||
"hoodie.table.type": "MERGE_ON_READ",
|
"hoodie.table.type": "MERGE_ON_READ",
|
||||||
"hoodie.metadata.enable": "false",
|
|
||||||
"hoodie.base.path": "hdfs://namenode:8020/user/hive/warehouse/hudi-test-topic",
|
"hoodie.base.path": "hdfs://namenode:8020/user/hive/warehouse/hudi-test-topic",
|
||||||
"hoodie.datasource.write.recordkey.field": "volume",
|
"hoodie.datasource.write.recordkey.field": "volume",
|
||||||
"hoodie.datasource.write.partitionpath.field": "date",
|
"hoodie.datasource.write.partitionpath.field": "date",
|
||||||
|
|||||||
@@ -10,7 +10,6 @@
|
|||||||
"topics": "hudi-test-topic",
|
"topics": "hudi-test-topic",
|
||||||
"hoodie.table.name": "hudi-test-topic",
|
"hoodie.table.name": "hudi-test-topic",
|
||||||
"hoodie.table.type": "MERGE_ON_READ",
|
"hoodie.table.type": "MERGE_ON_READ",
|
||||||
"hoodie.metadata.enable": "false",
|
|
||||||
"hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic",
|
"hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic",
|
||||||
"hoodie.datasource.write.recordkey.field": "volume",
|
"hoodie.datasource.write.recordkey.field": "volume",
|
||||||
"hoodie.datasource.write.partitionpath.field": "date",
|
"hoodie.datasource.write.partitionpath.field": "date",
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ import org.apache.hudi.client.HoodieJavaWriteClient;
|
|||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
import org.apache.hudi.client.common.HoodieJavaEngineContext;
|
import org.apache.hudi.client.common.HoodieJavaEngineContext;
|
||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
|
import org.apache.hudi.common.engine.EngineType;
|
||||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||||
import org.apache.hudi.common.model.HoodieAvroPayload;
|
import org.apache.hudi.common.model.HoodieAvroPayload;
|
||||||
import org.apache.hudi.common.util.ReflectionUtils;
|
import org.apache.hudi.common.util.ReflectionUtils;
|
||||||
@@ -74,6 +75,7 @@ public class KafkaConnectWriterProvider implements ConnectWriterProvider<WriteSt
|
|||||||
|
|
||||||
// Create the write client to write some records in
|
// Create the write client to write some records in
|
||||||
writeConfig = HoodieWriteConfig.newBuilder()
|
writeConfig = HoodieWriteConfig.newBuilder()
|
||||||
|
.withEngineType(EngineType.JAVA)
|
||||||
.withProperties(connectConfigs.getProps())
|
.withProperties(connectConfigs.getProps())
|
||||||
.withFileIdPrefixProviderClassName(KafkaConnectFileIdPrefixProvider.class.getName())
|
.withFileIdPrefixProviderClassName(KafkaConnectFileIdPrefixProvider.class.getName())
|
||||||
.withProps(Collections.singletonMap(
|
.withProps(Collections.singletonMap(
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ package org.apache.hudi.writers;
|
|||||||
|
|
||||||
import org.apache.hudi.client.HoodieJavaWriteClient;
|
import org.apache.hudi.client.HoodieJavaWriteClient;
|
||||||
import org.apache.hudi.client.common.HoodieJavaEngineContext;
|
import org.apache.hudi.client.common.HoodieJavaEngineContext;
|
||||||
|
import org.apache.hudi.common.engine.EngineType;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
@@ -62,6 +63,7 @@ public class TestBufferedConnectWriter {
|
|||||||
configs = KafkaConnectConfigs.newBuilder().build();
|
configs = KafkaConnectConfigs.newBuilder().build();
|
||||||
schemaProvider = new TestAbstractConnectWriter.TestSchemaProvider();
|
schemaProvider = new TestAbstractConnectWriter.TestSchemaProvider();
|
||||||
writeConfig = HoodieWriteConfig.newBuilder()
|
writeConfig = HoodieWriteConfig.newBuilder()
|
||||||
|
.withEngineType(EngineType.JAVA)
|
||||||
.withPath("/tmp")
|
.withPath("/tmp")
|
||||||
.withSchema(schemaProvider.getSourceSchema().toString())
|
.withSchema(schemaProvider.getSourceSchema().toString())
|
||||||
.build();
|
.build();
|
||||||
|
|||||||
Reference in New Issue
Block a user