diff --git a/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java b/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java new file mode 100644 index 000000000..c21e410a1 --- /dev/null +++ b/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.HoodieAvroUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieIOException; + +/** + * Class to be used in quickstart guide for generating inserts and updates against a corpus. + * Test data uses a toy Uber trips, data model. + */ +public class QuickstartUtils { + + public static class DataGenerator { + private static final String DEFAULT_FIRST_PARTITION_PATH = "americas/united_states/san_francisco"; + private static final String DEFAULT_SECOND_PARTITION_PATH = "americas/brazil/sao_paulo"; + private static final String DEFAULT_THIRD_PARTITION_PATH = "asia/india/chennai"; + + private static final String[] DEFAULT_PARTITION_PATHS = { + DEFAULT_FIRST_PARTITION_PATH, + DEFAULT_SECOND_PARTITION_PATH, + DEFAULT_THIRD_PARTITION_PATH + }; + static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ " + + "{\"name\": \"ts\",\"type\": \"double\"}," + + "{\"name\": \"uuid\", \"type\": \"string\"}," + + "{\"name\": \"rider\", \"type\": \"string\"}," + + "{\"name\": \"driver\", \"type\": \"string\"}," + + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + + "{\"name\": \"begin_lon\", \"type\": \"double\"}," + + "{\"name\": \"end_lat\", \"type\": \"double\"}," + + "{\"name\": \"end_lon\", \"type\": \"double\"}," + + "{\"name\":\"fare\",\"type\": \"double\"}]}"; + static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); + + private static Random rand = new Random(46474747); + + private final Map existingKeys; + private final String[] partitionPaths; + private int numExistingKeys; + + public DataGenerator() { + this(DEFAULT_PARTITION_PATHS, new HashMap<>()); + } + + private DataGenerator(String[] partitionPaths, Map keyPartitionMap) { + this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length); + this.existingKeys = keyPartitionMap; + } + + private static String generateRandomString() { + int leftLimit = 48; // ascii for 0 + int rightLimit = 57; // ascii for 9 + int stringLength = 3; + StringBuilder buffer = new StringBuilder(stringLength); + for (int i = 0; i < stringLength; i++) { + int randomLimitedInt = leftLimit + (int) + (rand.nextFloat() * (rightLimit - leftLimit + 1)); + buffer.append((char) randomLimitedInt); + } + return buffer.toString(); + } + + public int getNumExistingKeys() { + return numExistingKeys; + } + + public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName, + double timestamp) { + GenericRecord rec = new GenericData.Record(avroSchema); + rec.put("uuid", rowKey); + rec.put("ts", timestamp); + rec.put("rider", riderName); + rec.put("driver", driverName); + rec.put("begin_lat", rand.nextDouble()); + rec.put("begin_lon", rand.nextDouble()); + rec.put("end_lat", rand.nextDouble()); + rec.put("end_lon", rand.nextDouble()); + rec.put("fare", rand.nextDouble() * 100); + return rec; + } + + /** + * Generates a new avro record of the above schema format, retaining the key if optionally provided. + * The riderDriverSuffix string is a random String to simulate updates by changing the rider driver fields + * for records belonging to the same commit. It is purely used for demo purposes. In real world, the actual + * updates are assumed to be provided based on the application requirements. + */ + public static OverwriteWithLatestAvroPayload generateRandomValue(HoodieKey key, String riderDriverSuffix) throws + IOException { + GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + riderDriverSuffix, "driver-" + + riderDriverSuffix, 0.0); + return new OverwriteWithLatestAvroPayload(Option.of(rec)); + } + + /** + * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys. + */ + public Stream generateInsertsStream(String randomString, Integer n) { + int currSize = getNumExistingKeys(); + + return IntStream.range(0, n).boxed().map(i -> { + String partitionPath = partitionPaths[rand.nextInt(partitionPaths.length)]; + HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath); + existingKeys.put(currSize + i, key); + numExistingKeys++; + try { + return new HoodieRecord(key, generateRandomValue(key, randomString)); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + }); + } + + /** + * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys. + */ + public List generateInserts(Integer n) throws IOException { + String randomString = generateRandomString(); + return generateInsertsStream(randomString, n).collect(Collectors.toList()); + } + + public HoodieRecord generateUpdateRecord(HoodieKey key, String randomString) throws IOException { + return new HoodieRecord(key, generateRandomValue(key, randomString)); + } + + /** + * Generates new updates, randomly distributed across the keys above. There can be duplicates within the returned + * list + * + * @param n Number of updates (including dups) + * @return list of hoodie record updates + */ + public List generateUpdates(Integer n) throws IOException { + String randomString = generateRandomString(); + List updates = new ArrayList<>(); + for (int i = 0; i < n; i++) { + HoodieKey key = existingKeys.get(rand.nextInt(numExistingKeys - 1)); + HoodieRecord record = generateUpdateRecord(key, randomString); + updates.add(record); + } + return updates; + } + + public void close() { + existingKeys.clear(); + } + } + + private static Option convertToString(HoodieRecord record) { + try { + String str = HoodieAvroUtils.bytesToAvro(((OverwriteWithLatestAvroPayload) record.getData()).recordBytes, + DataGenerator.avroSchema).toString(); + str = "{" + str.substring(str.indexOf("\"ts\":")); + return Option.of(str.replaceAll("}", + ", \"partitionpath\": \"" + record.getPartitionPath() + "\"}")); + } catch (IOException e) { + return Option.empty(); + } + } + + public static List convertToStringList(List records) { + return records.stream().map(hr -> convertToString(hr)).filter(os -> os.isPresent()) + .map(os -> os.get()).collect(Collectors.toList()); + } + + public static Map getQuickstartWriteConfigs() { + Map demoConfigs = new HashMap<>(); + demoConfigs.put("hoodie.insert.shuffle.parallelism", "2"); + demoConfigs.put("hoodie.upsert.shuffle.parallelism", "2"); + return demoConfigs; + } +}