1
0

Fixing delete util method

This commit is contained in:
Sivabalan Narayanan
2020-01-14 23:32:20 -05:00
committed by Bhavani Sudha Saktheeswaran
parent 87fdb769f0
commit 2b2f23aa60

View File

@@ -28,6 +28,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import java.io.IOException;
import java.util.ArrayList;
@@ -94,7 +95,7 @@ public class QuickstartUtils {
}
public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
double timestamp) {
double timestamp) {
GenericRecord rec = new GenericData.Record(avroSchema);
rec.put("uuid", rowKey);
rec.put("ts", timestamp);
@@ -171,19 +172,15 @@ public class QuickstartUtils {
}
/**
* Generates new Deletes, randomly distributed across the keys above. There can be duplicates within the returned
* list
* Generates delete records for the passed in rows
*
* @param n Number of deletes (including dups)
* @param rows List of {@link Row}s for which delete record need to be generated
* @return list of hoodie records to delete
*/
public List<HoodieKey> generateDeletes(Integer n) throws IOException {
List<HoodieKey> deletes = new ArrayList<>();
for (int i = 0; i < n; i++) {
HoodieKey key = existingKeys.get(rand.nextInt(numExistingKeys));
deletes.add(key);
}
return deletes;
public List<String> generateDeletes(List<Row> rows) {
return rows.stream().map(row ->
convertToString(row.getAs("uuid"), row.getAs("partitionPath"))).filter(os -> os.isPresent()).map(os -> os.get())
.collect(Collectors.toList());
}
public void close() {
@@ -203,6 +200,16 @@ public class QuickstartUtils {
}
}
private static Option<String> convertToString(String uuid, String partitionPath) {
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append("{");
stringBuffer.append("\"ts\": 0.0,");
stringBuffer.append("\"uuid\": \"" + uuid + "\",");
stringBuffer.append("\"partitionpath\": \"" + partitionPath + "\"");
stringBuffer.append("}");
return Option.of(stringBuffer.toString());
}
public static List<String> convertToStringList(List<HoodieRecord> records) {
return records.stream().map(hr -> convertToString(hr)).filter(os -> os.isPresent()).map(os -> os.get())
.collect(Collectors.toList());