From 2b2f23aa60bde21ab6ef2404ece1cbfa6ab1324c Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Tue, 14 Jan 2020 23:32:20 -0500 Subject: [PATCH] Fixing delete util method --- .../java/org/apache/hudi/QuickstartUtils.java | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java b/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java index 2ef90afd3..3d5da9a60 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java @@ -28,6 +28,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.spark.sql.Row; import java.io.IOException; import java.util.ArrayList; @@ -94,7 +95,7 @@ public class QuickstartUtils { } public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName, - double timestamp) { + double timestamp) { GenericRecord rec = new GenericData.Record(avroSchema); rec.put("uuid", rowKey); rec.put("ts", timestamp); @@ -171,19 +172,15 @@ public class QuickstartUtils { } /** - * Generates new Deletes, randomly distributed across the keys above. There can be duplicates within the returned - * list + * Generates delete records for the passed in rows * - * @param n Number of deletes (including dups) + * @param rows List of {@link Row}s for which delete record need to be generated * @return list of hoodie records to delete */ - public List generateDeletes(Integer n) throws IOException { - List deletes = new ArrayList<>(); - for (int i = 0; i < n; i++) { - HoodieKey key = existingKeys.get(rand.nextInt(numExistingKeys)); - deletes.add(key); - } - return deletes; + public List generateDeletes(List rows) { + return rows.stream().map(row -> + convertToString(row.getAs("uuid"), row.getAs("partitionPath"))).filter(os -> os.isPresent()).map(os -> os.get()) + .collect(Collectors.toList()); } public void close() { @@ -203,6 +200,16 @@ public class QuickstartUtils { } } + private static Option convertToString(String uuid, String partitionPath) { + StringBuffer stringBuffer = new StringBuffer(); + stringBuffer.append("{"); + stringBuffer.append("\"ts\": 0.0,"); + stringBuffer.append("\"uuid\": \"" + uuid + "\","); + stringBuffer.append("\"partitionpath\": \"" + partitionPath + "\""); + stringBuffer.append("}"); + return Option.of(stringBuffer.toString()); + } + public static List convertToStringList(List records) { return records.stream().map(hr -> convertToString(hr)).filter(os -> os.isPresent()).map(os -> os.get()) .collect(Collectors.toList());