[HUDI-1533] Make SerializableSchema work for large schemas and add ability to sortBy numeric values (#2453)
This commit is contained in:
@@ -20,6 +20,7 @@ package org.apache.hudi.execution.bulkinsert;
|
|||||||
|
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
|
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||||
import org.apache.hudi.common.config.SerializableSchema;
|
import org.apache.hudi.common.config.SerializableSchema;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
@@ -58,18 +59,22 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
|
|||||||
public boolean arePartitionRecordsSorted() {
|
public boolean arePartitionRecordsSorted() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String getRecordSortColumnValues(HoodieRecord<? extends HoodieRecordPayload> record,
|
private static Object getRecordSortColumnValues(HoodieRecord<? extends HoodieRecordPayload> record,
|
||||||
String[] sortColumns,
|
String[] sortColumns,
|
||||||
SerializableSchema schema) {
|
SerializableSchema schema) {
|
||||||
try {
|
try {
|
||||||
GenericRecord genericRecord = (GenericRecord) record.getData().getInsertValue(schema.get()).get();
|
GenericRecord genericRecord = (GenericRecord) record.getData().getInsertValue(schema.get()).get();
|
||||||
StringBuilder sb = new StringBuilder();
|
if (sortColumns.length == 1) {
|
||||||
for (String col : sortColumns) {
|
return HoodieAvroUtils.getNestedFieldVal(genericRecord, sortColumns[0], true);
|
||||||
sb.append(genericRecord.get(col));
|
} else {
|
||||||
}
|
StringBuilder sb = new StringBuilder();
|
||||||
|
for (String col : sortColumns) {
|
||||||
|
sb.append(HoodieAvroUtils.getNestedFieldValAsString(genericRecord, col, true));
|
||||||
|
}
|
||||||
|
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new HoodieIOException("Unable to read record with key:" + record.getKey(), e);
|
throw new HoodieIOException("Unable to read record with key:" + record.getKey(), e);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -62,12 +62,17 @@ public class SerializableSchema implements Serializable {
|
|||||||
|
|
||||||
// create a public write method for unit test
|
// create a public write method for unit test
|
||||||
public void writeObjectTo(ObjectOutputStream out) throws IOException {
|
public void writeObjectTo(ObjectOutputStream out) throws IOException {
|
||||||
out.writeUTF(schema.toString());
|
// Note: writeUTF cannot support string length > 64K. So use writeObject which has small overhead (relatively).
|
||||||
|
out.writeObject(schema.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
// create a public read method for unit test
|
// create a public read method for unit test
|
||||||
public void readObjectFrom(ObjectInputStream in) throws IOException {
|
public void readObjectFrom(ObjectInputStream in) throws IOException {
|
||||||
schema = new Schema.Parser().parse(in.readUTF());
|
try {
|
||||||
|
schema = new Schema.Parser().parse(in.readObject().toString());
|
||||||
|
} catch (ClassNotFoundException e) {
|
||||||
|
throw new IOException("unable to parse schema", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -49,6 +49,11 @@ public class TestSerializableSchema {
|
|||||||
verifySchema(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS);
|
verifySchema(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLargeSchema() throws IOException {
|
||||||
|
verifySchema(new Schema.Parser().parse(generateLargeSchema()));
|
||||||
|
}
|
||||||
|
|
||||||
private void verifySchema(Schema schema) throws IOException {
|
private void verifySchema(Schema schema) throws IOException {
|
||||||
SerializableSchema serializableSchema = new SerializableSchema(schema);
|
SerializableSchema serializableSchema = new SerializableSchema(schema);
|
||||||
assertEquals(schema, serializableSchema.get());
|
assertEquals(schema, serializableSchema.get());
|
||||||
@@ -65,4 +70,20 @@ public class TestSerializableSchema {
|
|||||||
newSchema.readObjectFrom(new ObjectInputStream(new ByteArrayInputStream(bytesWritten)));
|
newSchema.readObjectFrom(new ObjectInputStream(new ByteArrayInputStream(bytesWritten)));
|
||||||
assertEquals(schema, newSchema.get());
|
assertEquals(schema, newSchema.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// generate large schemas (>64K which is limitation of ObjectOutputStream#writeUTF) to validate it can be serialized
|
||||||
|
private String generateLargeSchema() {
|
||||||
|
StringBuilder schema = new StringBuilder();
|
||||||
|
schema.append(HoodieTestDataGenerator.TRIP_SCHEMA_PREFIX);
|
||||||
|
int fieldNum = 1;
|
||||||
|
while (schema.length() < 80 * 1024) {
|
||||||
|
String fieldName = "field" + fieldNum;
|
||||||
|
schema.append("{\"name\": \"" + fieldName + "\",\"type\": {\"type\":\"record\", \"name\":\"" + fieldName + "\",\"fields\": ["
|
||||||
|
+ "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},");
|
||||||
|
fieldNum++;
|
||||||
|
}
|
||||||
|
|
||||||
|
schema.append(HoodieTestDataGenerator.TRIP_SCHEMA_SUFFIX);
|
||||||
|
return schema.toString();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user