diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
index a73138440..8f7485866 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
@@ -201,6 +201,7 @@ public class WriteStatus implements Serializable {
   public String toString() {
     final StringBuilder sb = new StringBuilder("WriteStatus {");
     sb.append("fileId=").append(fileId);
+    sb.append(", writeStat=").append(stat);
     sb.append(", globalError='").append(globalError).append('\'');
     sb.append(", hasErrors='").append(hasErrors()).append('\'');
     sb.append(", errorCount='").append(totalErrorRecords).append('\'');
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
index 4995af0d6..a78b71b24 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
@@ -18,14 +18,40 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.JavaUpsertPreppedDeltaCommitActionExecutor;
+
+import java.util.List;
 
 public class HoodieJavaMergeOnReadTable<T extends HoodieRecordPayload> extends HoodieJavaCopyOnWriteTable<T> {
   protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
     super(config, context, metaClient);
   }
 
-  // TODO not support yet.
+
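+  /**
+   * Upserts records that have already been prepped (deduplicated and tagged with their target
+   * file group); updates are written to log files through a delta commit.
+   */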
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context,
+                                                              String instantTime,
+                                                              List<HoodieRecord<T>> preppedRecords) {
+    return new JavaUpsertPreppedDeltaCommitActionExecutor<>((HoodieJavaEngineContext) context, config,
+        this, instantTime, preppedRecords).execute();
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context,
+                                                                  String instantTime,
+                                                                  List<HoodieRecord<T>> preppedRecords,
+                                                                  Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+    return new JavaBulkInsertPreppedCommitActionExecutor((HoodieJavaEngineContext) context, config,
+        this, instantTime, preppedRecords, bulkInsertPartitioner).execute();
+  }
 }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
index 219dec4e2..9cf9a6dc1 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
@@ -29,9 +29,8 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hudi.index.JavaHoodieIndex;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.JavaHoodieIndex;
 
 import java.util.List;
 
@@ -56,7 +55,7 @@ public abstract class HoodieJavaTable
       case COPY_ON_WRITE:
         return new HoodieJavaCopyOnWriteTable<>(config, context, metaClient);
       case MERGE_ON_READ:
-        throw new HoodieNotSupportedException("MERGE_ON_READ is not supported yet");
+        return new HoodieJavaMergeOnReadTable<>(config, context, metaClient);
       default:
         throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
     }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseJavaDeltaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseJavaDeltaCommitActionExecutor.java
new file mode 100644
index 000000000..0b4a65407
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseJavaDeltaCommitActionExecutor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor;
+
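+/**
+ * Base class for Java-client action executors that write delta commits (MERGE_ON_READ tables),
+ * reusing the commit flow of BaseJavaCommitActionExecutor.
+ */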
+public abstract class BaseJavaDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaCommitActionExecutor<T> {
+
+  public BaseJavaDeltaCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table,
+                                           String instantTime, WriteOperationType operationType) {
+    super(context, config, table, instantTime, operationType);
+  }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/JavaUpsertPreppedDeltaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/JavaUpsertPreppedDeltaCommitActionExecutor.java
new file mode 100644
index 000000000..f6faa28bb
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/JavaUpsertPreppedDeltaCommitActionExecutor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.io.HoodieAppendHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.commit.JavaBulkInsertHelper;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+public class JavaUpsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaDeltaCommitActionExecutor<T> {
+
+  private static final Logger LOG = LogManager.getLogger(JavaUpsertPreppedDeltaCommitActionExecutor.class);
+
+  private final List<HoodieRecord<T>> preppedInputRecords;
+
+  public JavaUpsertPreppedDeltaCommitActionExecutor(HoodieJavaEngineContext context, HoodieWriteConfig config, HoodieTable table,
+                                                    String instantTime, List<HoodieRecord<T>> preppedInputRecords) {
+    super(context, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
+    this.preppedInputRecords = preppedInputRecords;
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
+    // First group by target file id.
+    HashMap<Pair<String, String>, List<HoodieRecord<T>>> recordsByFileId = new HashMap<>();
+    List<HoodieRecord<T>> insertedRecords = new LinkedList<>();
+
+    // Split records into inserts and updates.
+    for (HoodieRecord<T> record : preppedInputRecords) {
+      if (!record.isCurrentLocationKnown()) {
+        insertedRecords.add(record);
+      } else {
+        Pair<String, String> fileIdPartitionPath = Pair.of(record.getCurrentLocation().getFileId(), record.getPartitionPath());
+        if (!recordsByFileId.containsKey(fileIdPartitionPath)) {
+          recordsByFileId.put(fileIdPartitionPath, new LinkedList<>());
+        }
+        recordsByFileId.get(fileIdPartitionPath).add(record);
+      }
+    }
+    LOG.info(String.format("Total update fileIDs %s, total inserts %s for commit %s",
+        recordsByFileId.size(), insertedRecords.size(), instantTime));
+
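+    // Tagged records are appended to their existing file groups as log blocks; untagged records
+    // fall through to bulk insert, which writes new base files.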
+    List<WriteStatus> allWriteStatuses = new ArrayList<>();
+    try {
+      recordsByFileId.forEach((k, v) -> {
+        HoodieAppendHandle appendHandle = new HoodieAppendHandle(config, instantTime, table,
+            k.getRight(), k.getLeft(), v.iterator(), taskContextSupplier);
+        appendHandle.doAppend();
+        allWriteStatuses.addAll(appendHandle.close());
+      });
+
+      if (insertedRecords.size() > 0) {
+        HoodieWriteMetadata<List<WriteStatus>> insertResult = JavaBulkInsertHelper.newInstance()
+            .bulkInsert(insertedRecords, instantTime, table, config, this, false, Option.empty());
+        allWriteStatuses.addAll(insertResult.getWriteStatuses());
+      }
+    } catch (Throwable e) {
+      if (e instanceof HoodieUpsertException) {
+        throw e;
+      }
+      throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
+    }
+
+    updateIndex(allWriteStatuses, result);
+    return result;
+  }
+}
diff --git a/hudi-kafka-connect/README.md b/hudi-kafka-connect/README.md
index 85fc009ad..584fddf22 100644
--- a/hudi-kafka-connect/README.md
+++ b/hudi-kafka-connect/README.md
@@ -70,9 +70,9 @@ Wait until the kafka cluster is up and running.
 
 ### 2 - Set up the schema registry
 
-Hudi leverages schema registry to obtain the latest schema when writing records. While it supports most popular schema registries,
-we use Confluent schema registry. Download the latest confluent schema registry code from https://github.com/confluentinc/schema-registry
-and start the schema registry service.
+Hudi leverages a schema registry to obtain the latest schema when writing records. While it supports most popular schema
+registries, we use the Confluent schema registry. Download the latest Confluent platform and run the schema registry
+service.
 
 ```bash
 cd $CONFLUENT_DIR
@@ -120,7 +120,7 @@ that can be changed based on the desired properties.
 
 ```bash
 curl -X DELETE http://localhost:8083/connectors/hudi-sink
-curl -X POST -H "Content-Type:application/json" -d @$HUDI-DIR/hudi-kafka-connect/demo/config-sink.json http://localhost:8083/connectors
+curl -X POST -H "Content-Type:application/json" -d @${HUDI_DIR}/hudi-kafka-connect/demo/config-sink.json http://localhost:8083/connectors
 ```
 
 Now, you should see that the connector is created and tasks are running.
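+
+You can verify the connector state via the Kafka Connect REST API (assuming the default REST
+listener on port 8083):
+
+```bash
+curl -s http://localhost:8083/connectors/hudi-sink/status
+```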
diff --git a/hudi-kafka-connect/demo/config-sink.json b/hudi-kafka-connect/demo/config-sink.json
index 75e6d8448..2d2be00f8 100644
--- a/hudi-kafka-connect/demo/config-sink.json
+++ b/hudi-kafka-connect/demo/config-sink.json
@@ -9,10 +9,11 @@
     "value.converter.schemas.enable": "false",
     "topics": "hudi-test-topic",
     "hoodie.table.name": "hudi-test-topic",
+    "hoodie.table.type": "MERGE_ON_READ",
     "hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic",
     "hoodie.datasource.write.recordkey.field": "volume",
     "hoodie.datasource.write.partitionpath.field": "date",
     "hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
     "hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8081/subjects/hudi-test-topic/versions/latest"
-  }
+  }
 }
diff --git a/hudi-kafka-connect/demo/setupKafka.sh b/hudi-kafka-connect/demo/setupKafka.sh
old mode 100644
new mode 100755
index f2c173596..81968a4e7
--- a/hudi-kafka-connect/demo/setupKafka.sh
+++ b/hudi-kafka-connect/demo/setupKafka.sh
@@ -16,38 +16,33 @@
 
 #!/bin/bash
 
-## Directories
-HOME_DIR=~
-HUDI_DIR=${HOME_DIR}/hudi
-KAFKA_HOME=${HOME_DIR}/kafka
-
 #########################
 # The command line help #
 #########################
 usage() {
-    echo "Usage: $0"
-    echo "   -n |--num-kafka-records, (required) number of kafka records to generate"
-    echo "   -f |--raw-file, (optional) raw file for the kafka records"
-    echo "   -k |--kafka-topic, (optional) Topic name for Kafka"
-    echo "   -m |--num-kafka-partitions, (optional) number of kafka partitions"
-    echo "   -r |--record-key, (optional) field to use as record key"
-    echo "   -l |--num-hudi-partitions, (optional) number of hudi partitions"
-    echo "   -p |--partition-key, (optional) field to use as partition"
-    echo "   -s |--schema-file, (optional) path of the file containing the schema of the records"
-    exit 1
+  echo "Usage: $0"
+  echo "   -n |--num-kafka-records, (required) number of kafka records to generate"
+  echo "   -f |--raw-file, (optional) raw file for the kafka records"
+  echo "   -k |--kafka-topic, (optional) Topic name for Kafka"
+  echo "   -m |--num-kafka-partitions, (optional) number of kafka partitions"
+  echo "   -r |--record-key, (optional) field to use as record key"
+  echo "   -l |--num-hudi-partitions, (optional) number of hudi partitions"
+  echo "   -p |--partition-key, (optional) field to use as partition"
+  echo "   -s |--schema-file, (optional) path of the file containing the schema of the records"
+  exit 1
 }
 
 case "$1" in
-    --help)
-    usage
-    exit 0
-    ;;
+--help)
+  usage
+  exit 0
+  ;;
 esac
 
 if [ $# -lt 1 ]; then
-    echo "Illegal number of parameters"
-    usage
-    exit 0
+  echo "Illegal number of parameters"
+  usage
+  exit 0
 fi
 
 ## defaults
@@ -61,71 +56,91 @@ schemaFile=${HUDI_DIR}/docker/demo/config/schema.avsc
 
 while getopts ":n:f:k:m:r:l:p:s:-:" opt; do
   case $opt in
-  n) num_records="$OPTARG"
+  n)
+    num_records="$OPTARG"
    printf "Argument num-kafka-records is %s\n" "$num_records"
     ;;
-  k) rawDataFile="$OPTARG"
+  f)
+    rawDataFile="$OPTARG"
     printf "Argument raw-file is %s\n" "$rawDataFile"
     ;;
-  f) kafkaTopicName="$OPTARG"
+  k)
+    kafkaTopicName="$OPTARG"
     printf "Argument kafka-topic is %s\n" "$kafkaTopicName"
     ;;
-  m) numKafkaPartitions="$OPTARG"
+  m)
+    numKafkaPartitions="$OPTARG"
     printf "Argument num-kafka-partitions is %s\n" "$numKafkaPartitions"
     ;;
-  r) recordKey="$OPTARG"
+  r)
+    recordKey="$OPTARG"
     printf "Argument record-key is %s\n" "$recordKey"
     ;;
-  l) numHudiPartitions="$OPTARG"
+  l)
+    numHudiPartitions="$OPTARG"
     printf "Argument num-hudi-partitions is %s\n" "$numHudiPartitions"
     ;;
-  p) partitionField="$OPTARG"
+  p)
+    partitionField="$OPTARG"
     printf "Argument partition-key is %s\n" "$partitionField"
     ;;
-  p) schemaFile="$OPTARG"
+  s)
+    schemaFile="$OPTARG"
     printf "Argument schema-file is %s\n" "$schemaFile"
     ;;
-  -) echo "Invalid option -$OPTARG" >&2
+  -)
+    echo "Invalid option -$OPTARG" >&2
     ;;
-esac
+  esac
 done
 
 # First delete the existing topic
-$KAFKA_HOME/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092
+#${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092
 
 # Create the topic with 4 partitions
-$KAFKA_HOME/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092
-
+#${KAFKA_HOME}/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092
 
 # Setup the schema registry
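+# Strip block comments from the avsc file and JSON-encode its contents for the registry request payload.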
-export SCHEMA=`sed 's|/\*|\n&|g;s|*/|&\n|g' ${schemaFile} | sed '/\/\*/,/*\//d' | jq tostring`
+export SCHEMA=$(sed 's|/\*|\n&|g;s|*/|&\n|g' ${schemaFile} | sed '/\/\*/,/*\//d' | jq tostring)
 
 curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data "{\"schema\": $SCHEMA}" http://localhost:8081/subjects/${kafkaTopicName}/versions
 curl -X GET http://localhost:8081/subjects/${kafkaTopicName}/versions/latest
 
-
 # Generate kafka messages from raw records
 # Each records with unique keys and generate equal messages across each hudi partition
 partitions={}
-for ((i=0; i<${numHudiPartitions}; i++))
-do
-  partitions[$i]="partition-"$i;
+for ((i = 0; i < ${numHudiPartitions}; i++)); do
+  partitions[$i]="partition-"$i
 done
 
-for ((recordValue=0; recordValue<=${num_records}; ))
-do
-  while IFS= read line
-  do
-    for partitionValue in "${partitions[@]}"
-    do
-      echo $line | jq --arg recordKey $recordKey --arg recordValue $recordValue --arg partitionField $partitionField --arg partitionValue $partitionValue -c '.[$recordKey] = $recordValue | .[$partitionField] = $partitionValue' | kafkacat -P -b localhost:9092 -t hudi-test-topic;
-      ((recordValue++));
-      if [ $recordValue -gt ${num_records} ]; then
-        exit 0
-      fi
-    done
-
-    if [ $(( $recordValue % 1000 )) -eq 0 ]
-      then sleep 1
-    fi
-  done < "$rawDataFile"
-done
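+# Buffer the generated events in a file and produce them in a single batch at the end,
+# instead of forking one producer process per record.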
+events_file=/tmp/kcat-input.events
+rm -f ${events_file}
+
+recordValue=0
+num_records=$((num_records + 0))
+
+for (( ; ; )); do
+  while IFS= read line; do
+    for partitionValue in "${partitions[@]}"; do
+      echo $line | jq --arg recordKey $recordKey --arg recordValue $recordValue --arg partitionField $partitionField --arg partitionValue $partitionValue -c '.[$recordKey] = $recordValue | .[$partitionField] = $partitionValue' >>${events_file}
+      ((recordValue = recordValue + 1))
+
+      if [ $recordValue -gt $num_records ]; then
+        break
+      fi
+    done
+
+    if [ $recordValue -gt $num_records ]; then
+      break
+    fi
+
+    if [ $(($recordValue % 1000)) -eq 0 ]; then
+      sleep 1
+    fi
+  done <"$rawDataFile"
+
+  if [ $recordValue -gt $num_records ]; then
+    break
+  fi
+done
+
+grep -v '^$' ${events_file} | kcat -P -b localhost:9092 -t hudi-test-topic
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java
index 536ad4a80..9c4674706 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java
@@ -18,17 +18,13 @@
 
 package org.apache.hudi.connect;
 
-import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.connect.utils.KafkaConnectUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.FileIdPrefixProvider;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.nio.charset.StandardCharsets;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Objects;
 import java.util.Properties;
 
 public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider {
@@ -52,18 +48,9 @@ public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider {
     // We use a combination of kafka partition and partition path as the file id, and then hash it
     // to generate a fixed sized hash.
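+  /**
+   * Hashes the given string with MD5 and returns the uppercase hex encoding; used to derive
+   * fixed-length file ID prefixes from the kafka partition and hudi partition path.
+   */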
     String rawFileIdPrefix = kafkaPartition + partitionPath;
-    MessageDigest md;
-    try {
-      md = MessageDigest.getInstance("MD5");
-    } catch (NoSuchAlgorithmException e) {
-      LOG.error("Fatal error selecting hash algorithm", e);
-      throw new HoodieException(e);
-    }
-
-    byte[] digest = Objects.requireNonNull(md).digest(rawFileIdPrefix.getBytes(StandardCharsets.UTF_8));
-
+    String hashedPrefix = KafkaConnectUtils.hashDigest(rawFileIdPrefix);
     LOG.info("CreateFileId for Kafka Partition " + kafkaPartition + " : " + partitionPath + " = " + rawFileIdPrefix
-        + " === " + StringUtils.toHexString(digest).toUpperCase());
-    return StringUtils.toHexString(digest).toUpperCase();
+        + " === " + hashedPrefix);
+    return hashedPrefix;
   }
 }
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
index 593cfb124..34a44c8d0 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.CustomAvroKeyGenerator;
@@ -41,8 +42,12 @@ import org.apache.kafka.common.KafkaFuture;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
@@ -137,4 +142,16 @@ public class KafkaConnectUtils {
       return Option.empty();
     }
   }
+
+  public static String hashDigest(String stringToHash) {
+    MessageDigest md;
+    try {
+      md = MessageDigest.getInstance("MD5");
+    } catch (NoSuchAlgorithmException e) {
+      LOG.error("Fatal error selecting hash algorithm", e);
+      throw new HoodieException(e);
+    }
+    byte[] digest = Objects.requireNonNull(md).digest(stringToHash.getBytes(StandardCharsets.UTF_8));
+    return StringUtils.toHexString(digest).toUpperCase();
+  }
 }
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
index 3d8e5f8ab..9888fd1d5 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
@@ -21,7 +21,9 @@ package org.apache.hudi.connect.writers;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.connect.utils.KafkaConnectUtils;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
@@ -46,17 +48,19 @@ public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus
-    HoodieRecord<HoodieAvroPayload> hoodieRecord = new HoodieRecord<>(keyGenerator.getKey(avroRecord.get()), new HoodieAvroPayload(avroRecord));
+    // Tag records with a file ID based on kafka partition and hudi partition.
+    HoodieRecord<?> hoodieRecord = new HoodieRecord<>(keyGenerator.getKey(avroRecord.get()), new HoodieAvroPayload(avroRecord));
+    String fileId = KafkaConnectUtils.hashDigest(String.format("%s-%s", record.kafkaPartition(), hoodieRecord.getPartitionPath()));
+    hoodieRecord.unseal();
+    hoodieRecord.setCurrentLocation(new HoodieRecordLocation(instantTime, fileId));
+    hoodieRecord.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
+    hoodieRecord.seal();
     writeHudiRecord(hoodieRecord);
   }
 
   @Override
   public List<WriteStatus> close() throws IOException {
-    return flushHudiRecords();
+    return flushRecords();
   }
 
-  protected abstract void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record);
+  protected abstract void writeHudiRecord(HoodieRecord<?> record);
 
-  protected abstract List<WriteStatus> flushHudiRecords() throws IOException;
+  protected abstract List<WriteStatus> flushRecords() throws IOException;
 }
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
index a60293d00..0449f071d 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
@@ -21,8 +21,9 @@ package org.apache.hudi.connect.writers;
 import org.apache.hudi.client.HoodieJavaWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
@@ -39,8 +40,8 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * Specific implementation of a Hudi Writer that buffers all incoming records,
@@ -52,9 +53,8 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
 
   private final HoodieEngineContext context;
   private final HoodieJavaWriteClient writeClient;
-  private final String instantTime;
   private final HoodieWriteConfig config;
-  private ExternalSpillableMap<String, HoodieRecord<HoodieAvroPayload>> bufferedRecords;
+  private ExternalSpillableMap<String, HoodieRecord<?>> bufferedRecords;
 
   public BufferedConnectWriter(HoodieEngineContext context,
                                HoodieJavaWriteClient writeClient,
@@ -63,10 +63,9 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
                                HoodieWriteConfig config,
                                KeyGenerator keyGenerator,
                                SchemaProvider schemaProvider) {
-    super(connectConfigs, keyGenerator, schemaProvider);
+    super(connectConfigs, keyGenerator, schemaProvider, instantTime);
     this.context = context;
     this.writeClient = writeClient;
-    this.instantTime = instantTime;
     this.config = config;
     init();
   }
@@ -88,12 +87,12 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
   }
 
   @Override
-  public void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record) {
+  public void writeHudiRecord(HoodieRecord<?> record) {
     bufferedRecords.put(record.getRecordKey(), record);
   }
 
   @Override
-  public List<WriteStatus> flushHudiRecords() throws IOException {
+  public List<WriteStatus> flushRecords() throws IOException {
     try {
       LOG.info("Number of entries in MemoryBasedMap => " + bufferedRecords.getInMemoryMapNumEntries()
@@ -102,15 +101,25 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
           + bufferedRecords.getDiskBasedMapNumEntries() + "Size of file spilled to disk => "
           + bufferedRecords.getSizeOfFileOnDiskInBytes());
       List<WriteStatus> writeStatuses = new ArrayList<>();
+
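+      // MERGE_ON_READ tables route buffered records through upsertPrepped so updates land in
+      // log files; COPY_ON_WRITE keeps the original bulkInsertPrepped path.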
+      boolean isMorTable = Option.ofNullable(connectConfigs.getString(HoodieTableConfig.TYPE))
+          .map(t -> t.equals(HoodieTableType.MERGE_ON_READ.name()))
+          .orElse(false);
+
       // Write out all records if non-empty
       if (!bufferedRecords.isEmpty()) {
-        writeStatuses = writeClient.bulkInsertPreppedRecords(
-            bufferedRecords.values().stream().collect(Collectors.toList()),
-            instantTime, Option.empty());
+        if (isMorTable) {
+          writeStatuses = writeClient.upsertPreppedRecords(
+              new LinkedList<>(bufferedRecords.values()),
+              instantTime);
+        } else {
+          writeStatuses = writeClient.bulkInsertPreppedRecords(
+              new LinkedList<>(bufferedRecords.values()),
+              instantTime, Option.empty());
+        }
       }
       bufferedRecords.close();
-      LOG.info("Flushed hudi records and got writeStatuses: "
-          + writeStatuses);
+      LOG.info("Flushed hudi records and got writeStatuses: " + writeStatuses);
       return writeStatuses;
     } catch (Exception e) {
       throw new IOException("Write records failed", e);
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
index ae6b5d1d3..e5662bdfd 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
@@ -67,7 +67,7 @@ public class KafkaConnectConfigs extends HoodieConfig {
 
   public static final ConfigProperty<String> COORDINATOR_WRITE_TIMEOUT_SECS = ConfigProperty
       .key("hoodie.kafka.coordinator.write.timeout.secs")
-      .defaultValue("60")
+      .defaultValue("300")
       .withDocumentation("The timeout after sending an END_COMMIT until when "
           + "the coordinator will wait for the write statuses from all the partitions"
           + "to ignore the current commit and start a new commit.");
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
index ad40ebcb7..8039e56d3 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
@@ -23,12 +23,10 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.connect.transaction.TransactionCoordinator;
@@ -38,7 +36,6 @@ import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -54,19 +51,16 @@ import java.util.Map;
 public class KafkaConnectTransactionServices implements ConnectTransactionServices {
 
   private static final Logger LOG = LogManager.getLogger(KafkaConnectTransactionServices.class);
-  private static final String TABLE_FORMAT = "PARQUET";
 
   private final Option<HoodieTableMetaClient> tableMetaClient;
   private final Configuration hadoopConf;
-  private final FileSystem fs;
   private final String tableBasePath;
   private final String tableName;
   private final HoodieEngineContext context;
 
   private final HoodieJavaWriteClient<HoodieAvroPayload> javaClient;
 
-  public KafkaConnectTransactionServices(
-      KafkaConnectConfigs connectConfigs) throws HoodieException {
+  public KafkaConnectTransactionServices(KafkaConnectConfigs connectConfigs) throws HoodieException {
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
         .withProperties(connectConfigs.getProps()).build();
 
@@ -74,29 +68,25 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServices {
     tableName = writeConfig.getTableName();
     hadoopConf = KafkaConnectUtils.getDefaultHadoopConf();
     context = new HoodieJavaEngineContext(hadoopConf);
-    fs = FSUtils.getFs(tableBasePath, hadoopConf);
 
     try {
       KeyGenerator keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(
           new TypedProperties(connectConfigs.getProps()));
-
       String recordKeyFields = KafkaConnectUtils.getRecordKeyColumns(keyGenerator);
       String partitionColumns = KafkaConnectUtils.getPartitionColumns(keyGenerator,
           new TypedProperties(connectConfigs.getProps()));
 
-      LOG.info(String.format("Setting record key %s and partitionfields %s for table %s",
-          recordKeyFields,
-          partitionColumns,
-          tableBasePath + tableName));
+      LOG.info(String.format("Setting record key %s and partition fields %s for table %s",
+          recordKeyFields, partitionColumns, tableBasePath + tableName));
 
       tableMetaClient = Option.of(HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(HoodieTableType.COPY_ON_WRITE.name())
           .setTableName(tableName)
           .setPayloadClassName(HoodieAvroPayload.class.getName())
-          .setBaseFileFormat(TABLE_FORMAT)
           .setRecordKeyFields(recordKeyFields)
           .setPartitionFields(partitionColumns)
           .setKeyGeneratorClassProp(writeConfig.getKeyGeneratorClass())
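+          // Forward the connector properties so settings like hoodie.table.type override the defaults above.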
+          .fromProperties(connectConfigs.getProps())
           .initTable(hadoopConf, tableBasePath));
 
       javaClient = new HoodieJavaWriteClient<>(context, writeConfig);
@@ -113,8 +103,7 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServices {
   }
 
   public void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
-    javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata),
-        HoodieActiveTimeline.COMMIT_ACTION, Collections.emptyMap());
+    javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata));
     LOG.info("Ending Hudi commit " + commitTime);
   }
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
index 3ca64c33d..c8a3ad6ff 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
@@ -148,7 +148,7 @@ public class TestAbstractConnectWriter {
     private List<HoodieRecord> writtenRecords;
 
     public AbstractHudiConnectWriterTestWrapper(KafkaConnectConfigs connectConfigs, KeyGenerator keyGenerator, SchemaProvider schemaProvider) {
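+      // AbstractConnectWriter now takes an instant time; any fixed value works for this test.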
-      super(connectConfigs, keyGenerator, schemaProvider);
+      super(connectConfigs, keyGenerator, schemaProvider, "000");
       writtenRecords = new ArrayList<>();
     }
 
@@ -157,12 +157,12 @@ public class TestAbstractConnectWriter {
     }
 
     @Override
-    protected void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record) {
+    protected void writeHudiRecord(HoodieRecord<?> record) {
       writtenRecords.add(record);
     }
 
     @Override
-    protected List<WriteStatus> flushHudiRecords() {
+    protected List<WriteStatus> flushRecords() {
       return null;
     }
   }
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestBufferedConnectWriter.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestBufferedConnectWriter.java
index d1813e1a6..b0dcf38f4 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestBufferedConnectWriter.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestBufferedConnectWriter.java
@@ -88,7 +88,7 @@ public class TestBufferedConnectWriter {
     Mockito.verify(mockHoodieJavaWriteClient, times(0))
         .bulkInsertPreppedRecords(anyList(), eq(COMMIT_TIME), eq(Option.empty()));
 
-    writer.flushHudiRecords();
+    writer.flushRecords();
 
     final ArgumentCaptor<List> actualRecords = ArgumentCaptor.forClass(List.class);
     Mockito.verify(mockHoodieJavaWriteClient, times(1))
         .bulkInsertPreppedRecords(actualRecords.capture(), eq(COMMIT_TIME), eq(Option.empty()));