
[HUDI-2330][HUDI-2335] Adding support for merge-on-read tables (#3679)

- Inserts go into logs, hashed by Kafka and Hudi partitions (see the sketch after the change summary below)
- Fixed issues with the setupKafka script
 - Bumped up the default commit interval to 300 seconds
 - Minor renaming
Authored by vinoth chandar on 2021-09-16 15:24:34 -07:00, committed by GitHub
parent b8dad628e5
commit 57d5da68aa
16 changed files with 315 additions and 124 deletions
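
The hashing mentioned in the first bullet can be sketched as follows. This is an illustrative, stand-alone snippet rather than the committed code (the commit's own implementation is KafkaConnectUtils.hashDigest, used from AbstractConnectWriter further down): records from the same Kafka partition and Hudi partition path map to the same MD5-derived file id, so their inserts land in the same log file group.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Minimal sketch of the file-id hashing idea: the same (kafka partition, hudi
// partition path) pair always yields the same hex-encoded MD5 file id.
public class FileIdHashSketch {

  static String fileIdFor(int kafkaPartition, String hudiPartitionPath) {
    try {
      MessageDigest md = MessageDigest.getInstance("MD5");
      byte[] digest = md.digest(
          String.format("%s-%s", kafkaPartition, hudiPartitionPath).getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder();
      for (byte b : digest) {
        hex.append(String.format("%02x", b & 0xff));
      }
      return hex.toString().toUpperCase();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    // Prints the same id twice: inserts for this pair go to the same log file group.
    System.out.println(fileIdFor(3, "partition-0"));
    System.out.println(fileIdFor(3, "partition-0"));
  }
}
```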


@@ -201,6 +201,7 @@ public class WriteStatus implements Serializable {
   public String toString() {
     final StringBuilder sb = new StringBuilder("WriteStatus {");
     sb.append("fileId=").append(fileId);
+    sb.append(", writeStat=").append(stat);
     sb.append(", globalError='").append(globalError).append('\'');
     sb.append(", hasErrors='").append(hasErrors()).append('\'');
     sb.append(", errorCount='").append(totalErrorRecords).append('\'');


@@ -18,14 +18,40 @@
 package org.apache.hudi.table;

+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.JavaUpsertPreppedDeltaCommitActionExecutor;
+
+import java.util.List;

 public class HoodieJavaMergeOnReadTable<T extends HoodieRecordPayload> extends HoodieJavaCopyOnWriteTable<T> {

   protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
     super(config, context, metaClient);
   }

-  // TODO not support yet.
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context,
+                                                              String instantTime,
+                                                              List<HoodieRecord<T>> preppedRecords) {
+    return new JavaUpsertPreppedDeltaCommitActionExecutor<>((HoodieJavaEngineContext) context, config,
+        this, instantTime, preppedRecords).execute();
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context,
+                                                                  String instantTime,
+                                                                  List<HoodieRecord<T>> preppedRecords,
+                                                                  Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+    return new JavaBulkInsertPreppedCommitActionExecutor((HoodieJavaEngineContext) context, config,
+        this, instantTime, preppedRecords, bulkInsertPartitioner).execute();
+  }
 }


@@ -29,9 +29,8 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hudi.index.JavaHoodieIndex;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.JavaHoodieIndex;

 import java.util.List;

@@ -56,7 +55,7 @@ public abstract class HoodieJavaTable<T extends HoodieRecordPayload>
       case COPY_ON_WRITE:
         return new HoodieJavaCopyOnWriteTable<>(config, context, metaClient);
       case MERGE_ON_READ:
-        throw new HoodieNotSupportedException("MERGE_ON_READ is not supported yet");
+        return new HoodieJavaMergeOnReadTable<>(config, context, metaClient);
       default:
         throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
     }


@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.action.deltacommit;

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor;

public abstract class BaseJavaDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaCommitActionExecutor<T> {

  public BaseJavaDeltaCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table,
                                           String instantTime, WriteOperationType operationType) {
    super(context, config, table, instantTime, operationType);
  }
}


@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.action.deltacommit;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieAppendHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.JavaBulkInsertHelper;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;

public class JavaUpsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaDeltaCommitActionExecutor<T> {

  private static final Logger LOG = LogManager.getLogger(JavaUpsertPreppedDeltaCommitActionExecutor.class);

  private final List<HoodieRecord<T>> preppedInputRecords;

  public JavaUpsertPreppedDeltaCommitActionExecutor(HoodieJavaEngineContext context, HoodieWriteConfig config, HoodieTable table,
                                                    String instantTime, List<HoodieRecord<T>> preppedInputRecords) {
    super(context, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
    this.preppedInputRecords = preppedInputRecords;
  }

  @Override
  public HoodieWriteMetadata<List<WriteStatus>> execute() {
    HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();

    // First group by target file id.
    HashMap<Pair<String, String>, List<HoodieRecord<T>>> recordsByFileId = new HashMap<>();
    List<HoodieRecord<T>> insertedRecords = new LinkedList<>();

    // Split records into inserts and updates.
    for (HoodieRecord<T> record : preppedInputRecords) {
      if (!record.isCurrentLocationKnown()) {
        insertedRecords.add(record);
      } else {
        Pair<String, String> fileIdPartitionPath = Pair.of(record.getCurrentLocation().getFileId(), record.getPartitionPath());
        if (!recordsByFileId.containsKey(fileIdPartitionPath)) {
          recordsByFileId.put(fileIdPartitionPath, new LinkedList<>());
        }
        recordsByFileId.get(fileIdPartitionPath).add(record);
      }
    }
    LOG.info(String.format("Total update fileIDs %s, total inserts %s for commit %s",
        recordsByFileId.size(), insertedRecords.size(), instantTime));

    List<WriteStatus> allWriteStatuses = new ArrayList<>();
    try {
      recordsByFileId.forEach((k, v) -> {
        HoodieAppendHandle<?, ?, ?, ?> appendHandle = new HoodieAppendHandle(config, instantTime, table,
            k.getRight(), k.getLeft(), v.iterator(), taskContextSupplier);
        appendHandle.doAppend();
        allWriteStatuses.addAll(appendHandle.close());
      });

      if (insertedRecords.size() > 0) {
        HoodieWriteMetadata<List<WriteStatus>> insertResult = JavaBulkInsertHelper.newInstance()
            .bulkInsert(insertedRecords, instantTime, table, config, this, false, Option.empty());
        allWriteStatuses.addAll(insertResult.getWriteStatuses());
      }
    } catch (Throwable e) {
      if (e instanceof HoodieUpsertException) {
        throw e;
      }
      throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
    }

    updateIndex(allWriteStatuses, result);
    return result;
  }
}


@@ -70,9 +70,9 @@ Wait until the kafka cluster is up and running.
 ### 2 - Set up the schema registry

-Hudi leverages schema registry to obtain the latest schema when writing records. While it supports most popular schema registries,
-we use Confluent schema registry. Download the latest confluent schema registry code from https://github.com/confluentinc/schema-registry
-and start the schema registry service.
+Hudi leverages schema registry to obtain the latest schema when writing records. While it supports most popular schema
+registries, we use Confluent schema registry. Download the latest confluent platform and run the schema registry
+service.

 ```bash
 cd $CONFLUENT_DIR

@@ -120,7 +120,7 @@ that can be changed based on the desired properties.

 ```bash
 curl -X DELETE http://localhost:8083/connectors/hudi-sink
-curl -X POST -H "Content-Type:application/json" -d @$HUDI-DIR/hudi-kafka-connect/demo/config-sink.json http://localhost:8083/connectors
+curl -X POST -H "Content-Type:application/json" -d @${HUDI_DIR}/hudi-kafka-connect/demo/config-sink.json http://localhost:8083/connectors
 ```

 Now, you should see that the connector is created and tasks are running.
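
One way to confirm that, assuming the default Connect REST port (8083) and the `hudi-sink` connector name from the demo config, is to query the Kafka Connect REST API:

```bash
# List registered connectors, then check the connector and task states
curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/hudi-sink/status
```

A RUNNING state for the connector and each of its tasks indicates the sink is consuming from the topic.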


@@ -9,10 +9,11 @@
"value.converter.schemas.enable": "false", "value.converter.schemas.enable": "false",
"topics": "hudi-test-topic", "topics": "hudi-test-topic",
"hoodie.table.name": "hudi-test-topic", "hoodie.table.name": "hudi-test-topic",
"hoodie.table.type": "MERGE_ON_READ",
"hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic", "hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic",
"hoodie.datasource.write.recordkey.field": "volume", "hoodie.datasource.write.recordkey.field": "volume",
"hoodie.datasource.write.partitionpath.field": "date", "hoodie.datasource.write.partitionpath.field": "date",
"hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider", "hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
"hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8081/subjects/hudi-test-topic/versions/latest" "hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8081/subjects/hudi-test-topic/versions/latest"
} }
} }

hudi-kafka-connect/demo/setupKafka.sh Normal file → Executable file

@@ -16,38 +16,33 @@
 #!/bin/bash

-## Directories
-HOME_DIR=~
-HUDI_DIR=${HOME_DIR}/hudi
-KAFKA_HOME=${HOME_DIR}/kafka
-
 #########################
 # The command line help #
 #########################
 usage() {
   echo "Usage: $0"
   echo "   -n |--num-kafka-records, (required) number of kafka records to generate"
   echo "   -f |--raw-file, (optional) raw file for the kafka records"
   echo "   -k |--kafka-topic, (optional) Topic name for Kafka"
   echo "   -m |--num-kafka-partitions, (optional) number of kafka partitions"
   echo "   -r |--record-key, (optional) field to use as record key"
   echo "   -l |--num-hudi-partitions, (optional) number of hudi partitions"
   echo "   -p |--partition-key, (optional) field to use as partition"
   echo "   -s |--schema-file, (optional) path of the file containing the schema of the records"
   exit 1
 }

 case "$1" in
   --help)
     usage
     exit 0
     ;;
 esac

 if [ $# -lt 1 ]; then
   echo "Illegal number of parameters"
   usage
   exit 0
 fi

 ## defaults
@@ -61,71 +56,91 @@ schemaFile=${HUDI_DIR}/docker/demo/config/schema.avsc
 while getopts ":n:f:k:m:r:l:p:s:-:" opt; do
   case $opt in
-    n) num_records="$OPTARG"
+    n)
+      num_records="$OPTARG"
       printf "Argument num-kafka-records is %s\n" "$num_records"
       ;;
-    k) rawDataFile="$OPTARG"
+    k)
+      rawDataFile="$OPTARG"
       printf "Argument raw-file is %s\n" "$rawDataFile"
       ;;
-    f) kafkaTopicName="$OPTARG"
+    f)
+      kafkaTopicName="$OPTARG"
       printf "Argument kafka-topic is %s\n" "$kafkaTopicName"
       ;;
-    m) numKafkaPartitions="$OPTARG"
+    m)
+      numKafkaPartitions="$OPTARG"
       printf "Argument num-kafka-partitions is %s\n" "$numKafkaPartitions"
       ;;
-    r) recordKey="$OPTARG"
+    r)
+      recordKey="$OPTARG"
       printf "Argument record-key is %s\n" "$recordKey"
       ;;
-    l) numHudiPartitions="$OPTARG"
+    l)
+      numHudiPartitions="$OPTARG"
       printf "Argument num-hudi-partitions is %s\n" "$numHudiPartitions"
       ;;
-    p) partitionField="$OPTARG"
+    p)
+      partitionField="$OPTARG"
       printf "Argument partition-key is %s\n" "$partitionField"
       ;;
-    p) schemaFile="$OPTARG"
+    p)
+      schemaFile="$OPTARG"
       printf "Argument schema-file is %s\n" "$schemaFile"
       ;;
-    -) echo "Invalid option -$OPTARG" >&2
+    -)
+      echo "Invalid option -$OPTARG" >&2
       ;;
   esac
 done

 # First delete the existing topic
-$KAFKA_HOME/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092
+#${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092

 # Create the topic with 4 partitions
-$KAFKA_HOME/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092
+#${KAFKA_HOME}/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092

 # Setup the schema registry
-export SCHEMA=`sed 's|/\*|\n&|g;s|*/|&\n|g' ${schemaFile} | sed '/\/\*/,/*\//d' | jq tostring`
+export SCHEMA=$(sed 's|/\*|\n&|g;s|*/|&\n|g' ${schemaFile} | sed '/\/\*/,/*\//d' | jq tostring)
 curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data "{\"schema\": $SCHEMA}" http://localhost:8081/subjects/${kafkaTopicName}/versions
 curl -X GET http://localhost:8081/subjects/${kafkaTopicName}/versions/latest

 # Generate kafka messages from raw records
 # Each records with unique keys and generate equal messages across each hudi partition
 partitions={}
-for ((i=0; i<${numHudiPartitions}; i++))
-do
-  partitions[$i]="partition-"$i;
+for ((i = 0; i < ${numHudiPartitions}; i++)); do
+  partitions[$i]="partition-"$i
 done

-for ((recordValue=0; recordValue<=${num_records}; ))
-do
-  while IFS= read line
-  do
-    for partitionValue in "${partitions[@]}"
-    do
-      echo $line | jq --arg recordKey $recordKey --arg recordValue $recordValue --arg partitionField $partitionField --arg partitionValue $partitionValue -c '.[$recordKey] = $recordValue | .[$partitionField] = $partitionValue' | kafkacat -P -b localhost:9092 -t hudi-test-topic;
-      ((recordValue++));
-      if [ $recordValue -gt ${num_records} ]; then
-        exit 0
-      fi
-    done
-
-    if [ $(( $recordValue % 1000 )) -eq 0 ]
-    then sleep 1
-    fi
-  done < "$rawDataFile"
-done
+events_file=/tmp/kcat-input.events
+rm -f ${events_file}
+
+recordValue=0
+num_records=$((num_records + 0))
+
+for (( ; ; )); do
+  while IFS= read line; do
+    for partitionValue in "${partitions[@]}"; do
+      echo $line | jq --arg recordKey $recordKey --arg recordValue $recordValue --arg partitionField $partitionField --arg partitionValue $partitionValue -c '.[$recordKey] = $recordValue | .[$partitionField] = $partitionValue' >>${events_file}
+      ((recordValue = recordValue + 1))
+      if [ $recordValue -gt $num_records ]; then
+        break
+      fi
+    done
+
+    if [ $recordValue -gt $num_records ]; then
+      break
+    fi
+
+    if [ $(($recordValue % 1000)) -eq 0 ]; then
+      sleep 1
+    fi
+  done <"$rawDataFile"
+
+  if [ $recordValue -gt $num_records ]; then
+    break
+  fi
+done
+
+grep -v '^$' ${events_file} | kcat -P -b localhost:9092 -t hudi-test-topic


@@ -18,17 +18,13 @@
 package org.apache.hudi.connect;

-import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.connect.utils.KafkaConnectUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.FileIdPrefixProvider;

 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;

-import java.nio.charset.StandardCharsets;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Objects;
 import java.util.Properties;

 public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider {
@@ -52,18 +48,9 @@ public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider {
     // We use a combination of kafka partition and partition path as the file id, and then hash it
     // to generate a fixed sized hash.
     String rawFileIdPrefix = kafkaPartition + partitionPath;
-    MessageDigest md;
-    try {
-      md = MessageDigest.getInstance("MD5");
-    } catch (NoSuchAlgorithmException e) {
-      LOG.error("Fatal error selecting hash algorithm", e);
-      throw new HoodieException(e);
-    }
-    byte[] digest = Objects.requireNonNull(md).digest(rawFileIdPrefix.getBytes(StandardCharsets.UTF_8));
+    String hashedPrefix = KafkaConnectUtils.hashDigest(rawFileIdPrefix);
     LOG.info("CreateFileId for Kafka Partition " + kafkaPartition + " : " + partitionPath + " = " + rawFileIdPrefix
-        + " === " + StringUtils.toHexString(digest).toUpperCase());
-    return StringUtils.toHexString(digest).toUpperCase();
+        + " === " + hashedPrefix);
+    return hashedPrefix;
   }
 }


@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.CustomAvroKeyGenerator;
@@ -41,8 +42,12 @@ import org.apache.kafka.common.KafkaFuture;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;

+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.stream.Collectors;
@@ -137,4 +142,16 @@ public class KafkaConnectUtils {
       return Option.empty();
     }
   }
+
+  public static String hashDigest(String stringToHash) {
+    MessageDigest md;
+    try {
+      md = MessageDigest.getInstance("MD5");
+    } catch (NoSuchAlgorithmException e) {
+      LOG.error("Fatal error selecting hash algorithm", e);
+      throw new HoodieException(e);
+    }
+    byte[] digest = Objects.requireNonNull(md).digest(stringToHash.getBytes(StandardCharsets.UTF_8));
+    return StringUtils.toHexString(digest).toUpperCase();
+  }
 }


@@ -21,7 +21,9 @@ package org.apache.hudi.connect.writers;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.connect.utils.KafkaConnectUtils;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
@@ -46,17 +48,19 @@ public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus
   public static final String KAFKA_JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
   public static final String KAFKA_STRING_CONVERTER = "org.apache.kafka.connect.storage.StringConverter";
   private static final Logger LOG = LogManager.getLogger(AbstractConnectWriter.class);
+  protected final String instantTime;

-  private final KafkaConnectConfigs connectConfigs;
   private final KeyGenerator keyGenerator;
   private final SchemaProvider schemaProvider;
+  protected final KafkaConnectConfigs connectConfigs;

   public AbstractConnectWriter(KafkaConnectConfigs connectConfigs,
                                KeyGenerator keyGenerator,
-                               SchemaProvider schemaProvider) {
+                               SchemaProvider schemaProvider, String instantTime) {
     this.connectConfigs = connectConfigs;
     this.keyGenerator = keyGenerator;
     this.schemaProvider = schemaProvider;
+    this.instantTime = instantTime;
   }

   @Override
@@ -76,16 +80,22 @@ public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus
       throw new IOException("Unsupported Kafka Format type (" + connectConfigs.getKafkaValueConverter() + ")");
     }

-    HoodieRecord hoodieRecord = new HoodieRecord<>(keyGenerator.getKey(avroRecord.get()), new HoodieAvroPayload(avroRecord));
+    // Tag records with a file ID based on kafka partition and hudi partition.
+    HoodieRecord<?> hoodieRecord = new HoodieRecord<>(keyGenerator.getKey(avroRecord.get()), new HoodieAvroPayload(avroRecord));
+    String fileId = KafkaConnectUtils.hashDigest(String.format("%s-%s", record.kafkaPartition(), hoodieRecord.getPartitionPath()));
+    hoodieRecord.unseal();
+    hoodieRecord.setCurrentLocation(new HoodieRecordLocation(instantTime, fileId));
+    hoodieRecord.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
+    hoodieRecord.seal();
     writeHudiRecord(hoodieRecord);
   }

   @Override
   public List<WriteStatus> close() throws IOException {
-    return flushHudiRecords();
+    return flushRecords();
   }

-  protected abstract void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record);
+  protected abstract void writeHudiRecord(HoodieRecord<?> record);

-  protected abstract List<WriteStatus> flushHudiRecords() throws IOException;
+  protected abstract List<WriteStatus> flushRecords() throws IOException;
 }


@@ -21,8 +21,9 @@ package org.apache.hudi.connect.writers;
 import org.apache.hudi.client.HoodieJavaWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
@@ -39,8 +40,8 @@ import org.apache.log4j.Logger;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.stream.Collectors;

 /**
  * Specific implementation of a Hudi Writer that buffers all incoming records,
@@ -52,9 +53,8 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
   private final HoodieEngineContext context;
   private final HoodieJavaWriteClient writeClient;
-  private final String instantTime;
   private final HoodieWriteConfig config;
-  private ExternalSpillableMap<String, HoodieRecord<HoodieAvroPayload>> bufferedRecords;
+  private ExternalSpillableMap<String, HoodieRecord<?>> bufferedRecords;

   public BufferedConnectWriter(HoodieEngineContext context,
                                HoodieJavaWriteClient writeClient,
@@ -63,10 +63,9 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
                                HoodieWriteConfig config,
                                KeyGenerator keyGenerator,
                                SchemaProvider schemaProvider) {
-    super(connectConfigs, keyGenerator, schemaProvider);
+    super(connectConfigs, keyGenerator, schemaProvider, instantTime);
     this.context = context;
     this.writeClient = writeClient;
-    this.instantTime = instantTime;
     this.config = config;
     init();
   }
@@ -88,12 +87,12 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
   }

   @Override
-  public void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record) {
+  public void writeHudiRecord(HoodieRecord<?> record) {
     bufferedRecords.put(record.getRecordKey(), record);
   }

   @Override
-  public List<WriteStatus> flushHudiRecords() throws IOException {
+  public List<WriteStatus> flushRecords() throws IOException {
     try {
       LOG.info("Number of entries in MemoryBasedMap => "
           + bufferedRecords.getInMemoryMapNumEntries()
@@ -102,15 +101,25 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
           + bufferedRecords.getDiskBasedMapNumEntries() + "Size of file spilled to disk => "
           + bufferedRecords.getSizeOfFileOnDiskInBytes());
       List<WriteStatus> writeStatuses = new ArrayList<>();
+      boolean isMorTable = Option.ofNullable(connectConfigs.getString(HoodieTableConfig.TYPE))
+          .map(t -> t.equals(HoodieTableType.MERGE_ON_READ.name()))
+          .orElse(false);
       // Write out all records if non-empty
       if (!bufferedRecords.isEmpty()) {
-        writeStatuses = writeClient.bulkInsertPreppedRecords(
-            bufferedRecords.values().stream().collect(Collectors.toList()),
-            instantTime, Option.empty());
+        if (isMorTable) {
+          writeStatuses = writeClient.upsertPreppedRecords(
+              new LinkedList<>(bufferedRecords.values()),
+              instantTime);
+        } else {
+          writeStatuses = writeClient.bulkInsertPreppedRecords(
+              new LinkedList<>(bufferedRecords.values()),
+              instantTime, Option.empty());
+        }
       }
       bufferedRecords.close();
-      LOG.info("Flushed hudi records and got writeStatuses: "
-          + writeStatuses);
+      LOG.info("Flushed hudi records and got writeStatuses: " + writeStatuses);
       return writeStatuses;
     } catch (Exception e) {
       throw new IOException("Write records failed", e);


@@ -67,7 +67,7 @@ public class KafkaConnectConfigs extends HoodieConfig {
   public static final ConfigProperty<String> COORDINATOR_WRITE_TIMEOUT_SECS = ConfigProperty
       .key("hoodie.kafka.coordinator.write.timeout.secs")
-      .defaultValue("60")
+      .defaultValue("300")
       .withDocumentation("The timeout after sending an END_COMMIT until when "
           + "the coordinator will wait for the write statuses from all the partitions"
           + "to ignore the current commit and start a new commit.");


@@ -23,12 +23,10 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.connect.transaction.TransactionCoordinator;
@@ -38,7 +36,6 @@ import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;

 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -54,19 +51,16 @@ import java.util.Map;
 public class KafkaConnectTransactionServices implements ConnectTransactionServices {

   private static final Logger LOG = LogManager.getLogger(KafkaConnectTransactionServices.class);
-  private static final String TABLE_FORMAT = "PARQUET";

   private final Option<HoodieTableMetaClient> tableMetaClient;
   private final Configuration hadoopConf;
-  private final FileSystem fs;
   private final String tableBasePath;
   private final String tableName;

   private final HoodieEngineContext context;
   private final HoodieJavaWriteClient<HoodieAvroPayload> javaClient;

-  public KafkaConnectTransactionServices(
-      KafkaConnectConfigs connectConfigs) throws HoodieException {
+  public KafkaConnectTransactionServices(KafkaConnectConfigs connectConfigs) throws HoodieException {
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
         .withProperties(connectConfigs.getProps()).build();
@@ -74,29 +68,25 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
     tableName = writeConfig.getTableName();
     hadoopConf = KafkaConnectUtils.getDefaultHadoopConf();
     context = new HoodieJavaEngineContext(hadoopConf);
-    fs = FSUtils.getFs(tableBasePath, hadoopConf);

     try {
       KeyGenerator keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(
           new TypedProperties(connectConfigs.getProps()));

       String recordKeyFields = KafkaConnectUtils.getRecordKeyColumns(keyGenerator);
       String partitionColumns = KafkaConnectUtils.getPartitionColumns(keyGenerator,
           new TypedProperties(connectConfigs.getProps()));

-      LOG.info(String.format("Setting record key %s and partitionfields %s for table %s",
-          recordKeyFields,
-          partitionColumns,
-          tableBasePath + tableName));
+      LOG.info(String.format("Setting record key %s and partition fields %s for table %s",
+          recordKeyFields, partitionColumns, tableBasePath + tableName));

       tableMetaClient = Option.of(HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(HoodieTableType.COPY_ON_WRITE.name())
           .setTableName(tableName)
           .setPayloadClassName(HoodieAvroPayload.class.getName())
-          .setBaseFileFormat(TABLE_FORMAT)
           .setRecordKeyFields(recordKeyFields)
           .setPartitionFields(partitionColumns)
           .setKeyGeneratorClassProp(writeConfig.getKeyGeneratorClass())
+          .fromProperties(connectConfigs.getProps())
           .initTable(hadoopConf, tableBasePath));

       javaClient = new HoodieJavaWriteClient<>(context, writeConfig);
@@ -113,8 +103,7 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
   }

   public void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
-    javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata),
-        HoodieActiveTimeline.COMMIT_ACTION, Collections.emptyMap());
+    javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata));
     LOG.info("Ending Hudi commit " + commitTime);
   }


@@ -148,7 +148,7 @@ public class TestAbstractConnectWriter {
     private List<HoodieRecord> writtenRecords;

     public AbstractHudiConnectWriterTestWrapper(KafkaConnectConfigs connectConfigs, KeyGenerator keyGenerator, SchemaProvider schemaProvider) {
-      super(connectConfigs, keyGenerator, schemaProvider);
+      super(connectConfigs, keyGenerator, schemaProvider, "000");
       writtenRecords = new ArrayList<>();
     }
@@ -157,12 +157,12 @@ public class TestAbstractConnectWriter {
     }

     @Override
-    protected void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record) {
+    protected void writeHudiRecord(HoodieRecord<?> record) {
       writtenRecords.add(record);
     }

     @Override
-    protected List<WriteStatus> flushHudiRecords() {
+    protected List<WriteStatus> flushRecords() {
       return null;
     }
   }


@@ -88,7 +88,7 @@ public class TestBufferedConnectWriter {
     Mockito.verify(mockHoodieJavaWriteClient, times(0))
         .bulkInsertPreppedRecords(anyList(), eq(COMMIT_TIME), eq(Option.empty()));

-    writer.flushHudiRecords();
+    writer.flushRecords();

     final ArgumentCaptor<List<HoodieRecord>> actualRecords = ArgumentCaptor.forClass(List.class);
     Mockito.verify(mockHoodieJavaWriteClient, times(1))
         .bulkInsertPreppedRecords(actualRecords.capture(), eq(COMMIT_TIME), eq(Option.empty()));