1
0

[MINOR] Add more configuration to Kafka setup script (#3992)

* [MINOR] Add more configuration to Kafka setup script

* Add option to reuse Kafka topic

* Minor fixes to README
This commit is contained in:
Y Ethan Guo
2021-11-22 18:03:38 -08:00
committed by GitHub
parent e22150fe15
commit 6aa710eae0
2 changed files with 70 additions and 34 deletions

View File

@@ -21,11 +21,14 @@
#########################
usage() {
echo "Usage: $0"
echo " -n |--num-kafka-records, (required) number of kafka records to generate"
echo " -n |--num-kafka-records, (required) number of kafka records to generate in a batch"
echo " -b |--num-batch, (optional) number of batches of records to generate (default is 1)"
echo " -t |--reuse-topic, (optional) reuses the Kafka topic (default deletes and recreate the topic)"
echo " -f |--raw-file, (optional) raw file for the kafka records"
echo " -k |--kafka-topic, (optional) Topic name for Kafka"
echo " -m |--num-kafka-partitions, (optional) number of kafka partitions"
echo " -r |--record-key, (optional) field to use as record key"
echo " -o |--record-key-offset, (optional) record key offset to start with (default is 0)"
echo " -l |--num-hudi-partitions, (optional) number of hudi partitions"
echo " -p |--partition-key, (optional) field to use as partition"
echo " -s |--schema-file, (optional) path of the file containing the schema of the records"
@@ -53,12 +56,23 @@ recordKey=volume
numHudiPartitions=5
partitionField=date
schemaFile=${HUDI_DIR}/docker/demo/config/schema.avsc
numBatch=1
recordValue=0
recreateTopic="Y"
while getopts ":n:f:k:m:r:l:p:s:-:" opt; do
while getopts ":n:b:tf:k:m:r:o:l:p:s:-:" opt; do
case $opt in
n)
num_records="$OPTARG"
printf "Argument num-kafka-records is %s\n" "$num_records"
numRecords="$OPTARG"
printf "Argument num-kafka-records is %s\n" "$numRecords"
;;
b)
numBatch="$OPTARG"
printf "Argument num-batch is %s\n" "$numBatch"
;;
t)
recreateTopic="N"
printf "Argument recreate-topic is N (reuse Kafka topic) \n"
;;
k)
rawDataFile="$OPTARG"
@@ -76,6 +90,10 @@ while getopts ":n:f:k:m:r:l:p:s:-:" opt; do
recordKey="$OPTARG"
printf "Argument record-key is %s\n" "$recordKey"
;;
o)
recordValue="$OPTARG"
printf "Argument record-key-offset is %s\n" "$recordValue"
;;
l)
numHudiPartitions="$OPTARG"
printf "Argument num-hudi-partitions is %s\n" "$numHudiPartitions"
@@ -84,7 +102,7 @@ while getopts ":n:f:k:m:r:l:p:s:-:" opt; do
partitionField="$OPTARG"
printf "Argument partition-key is %s\n" "$partitionField"
;;
p)
s)
schemaFile="$OPTARG"
printf "Argument schema-file is %s\n" "$schemaFile"
;;
@@ -94,11 +112,15 @@ while getopts ":n:f:k:m:r:l:p:s:-:" opt; do
esac
done
# First delete the existing topic
${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092
if [ $recreateTopic = "Y" ]; then
# First delete the existing topic
echo "Delete Kafka topic $kafkaTopicName ..."
${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092
# Create the topic with 4 partitions
${KAFKA_HOME}/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092
# Create the topic with 4 partitions
echo "Create Kafka topic $kafkaTopicName ..."
${KAFKA_HOME}/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092
fi
# Setup the schema registry
export SCHEMA=$(sed 's|/\*|\n&|g;s|*/|&\n|g' ${schemaFile} | sed '/\/\*/,/*\//d' | jq tostring)
@@ -115,32 +137,38 @@ done
events_file=/tmp/kcat-input.events
rm -f ${events_file}
recordValue=0
num_records=$((num_records + 0))
totalNumRecords=$((numRecords + recordValue))
for (( ; ; )); do
while IFS= read line; do
for partitionValue in "${partitions[@]}"; do
echo $line | jq --arg recordKey $recordKey --arg recordValue $recordValue --arg partitionField $partitionField --arg partitionValue $partitionValue -c '.[$recordKey] = $recordValue | .[$partitionField] = $partitionValue' >>${events_file}
((recordValue = recordValue + 1))
for ((i = 1;i<=numBatch;i++)); do
rm -f ${events_file}
date
echo "Start batch $i ..."
batchRecordSeq=0
for (( ; ; )); do
while IFS= read line; do
for partitionValue in "${partitions[@]}"; do
echo $line | jq --arg recordKey $recordKey --arg recordValue $recordValue --arg partitionField $partitionField --arg partitionValue $partitionValue -c '.[$recordKey] = $recordValue | .[$partitionField] = $partitionValue' >>${events_file}
((recordValue = recordValue + 1))
((batchRecordSeq = batchRecordSeq + 1))
if [ $recordValue -gt $num_records ]; then
if [ $batchRecordSeq -eq $numRecords ]; then
break
fi
done
if [ $batchRecordSeq -eq $numRecords ]; then
break
fi
done
done <"$rawDataFile"
if [ $recordValue -gt $num_records ]; then
break
fi
if [ $batchRecordSeq -eq $numRecords ]; then
date
echo " Record key until $recordValue"
sleep 20
break
fi
done
if [ $(($recordValue % 1000)) -eq 0 ]; then
sleep 1
fi
done <"$rawDataFile"
if [ $recordValue -gt $num_records ]; then
break
fi
echo "publish to Kafka ..."
grep -v '^$' ${events_file} | kcat -P -b localhost:9092 -t hudi-test-topic
done
grep -v '^$' ${events_file} | kcat -P -b localhost:9092 -t hudi-test-topic