[MINOR] Updated HoodieMergeOnReadTestUtils for future testing requirements (#1456)
1. getRecordsUsingInputFormat() can take a custom Configuration which can be used to specify HUDI table properties (e.g. <table>.consume.mode or <table>.consume.start.timestamp) 2. Fixed the return to return an empty List rather than raise an Exception if no records are found
This commit is contained in:
@@ -44,9 +44,13 @@ import java.util.stream.Collectors;
|
|||||||
* Utility methods to aid in testing MergeOnRead (workaround for HoodieReadClient for MOR).
|
* Utility methods to aid in testing MergeOnRead (workaround for HoodieReadClient for MOR).
|
||||||
*/
|
*/
|
||||||
public class HoodieMergeOnReadTestUtils {
|
public class HoodieMergeOnReadTestUtils {
|
||||||
|
|
||||||
public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths, String basePath) {
|
public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths, String basePath) {
|
||||||
JobConf jobConf = new JobConf();
|
return getRecordsUsingInputFormat(inputPaths, basePath, new Configuration());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths, String basePath,
|
||||||
|
Configuration conf) {
|
||||||
|
JobConf jobConf = new JobConf(conf);
|
||||||
Schema schema = HoodieAvroUtils.addMetadataFields(
|
Schema schema = HoodieAvroUtils.addMetadataFields(
|
||||||
new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
|
new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
|
||||||
HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat();
|
HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat();
|
||||||
@@ -64,8 +68,10 @@ public class HoodieMergeOnReadTestUtils {
|
|||||||
// writable returns an array with [field1, field2, _hoodie_commit_time,
|
// writable returns an array with [field1, field2, _hoodie_commit_time,
|
||||||
// _hoodie_commit_seqno]
|
// _hoodie_commit_seqno]
|
||||||
Writable[] values = writable.get();
|
Writable[] values = writable.get();
|
||||||
|
final int[] fieldIndex = {0};
|
||||||
|
assert schema.getFields().size() <= values.length;
|
||||||
schema.getFields().forEach(field -> {
|
schema.getFields().forEach(field -> {
|
||||||
newRecord.set(field, values[2]);
|
newRecord.set(field, values[fieldIndex[0]++]);
|
||||||
});
|
});
|
||||||
records.add(newRecord.build());
|
records.add(newRecord.build());
|
||||||
}
|
}
|
||||||
@@ -76,7 +82,7 @@ public class HoodieMergeOnReadTestUtils {
|
|||||||
}).reduce((a, b) -> {
|
}).reduce((a, b) -> {
|
||||||
a.addAll(b);
|
a.addAll(b);
|
||||||
return a;
|
return a;
|
||||||
}).get();
|
}).orElse(new ArrayList<GenericRecord>());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void setPropsForInputFormat(HoodieParquetRealtimeInputFormat inputFormat, JobConf jobConf,
|
private static void setPropsForInputFormat(HoodieParquetRealtimeInputFormat inputFormat, JobConf jobConf,
|
||||||
|
|||||||
Reference in New Issue
Block a user