1
0

[HUDI-345] Fix used deprecated function (#1024)

- Schema.parse() with new Schema.Parser().parse
- FSDataOutputStream constructor
This commit is contained in:
hongdd
2019-11-22 19:32:09 +08:00
committed by vinoth chandar
parent 17eaf41c54
commit 7bc08cbfdc
6 changed files with 9 additions and 8 deletions

View File

@@ -46,7 +46,8 @@ public class HoodieMergeOnReadTestUtils {
public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths, String basePath) public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths, String basePath)
throws IOException { throws IOException {
JobConf jobConf = new JobConf(); JobConf jobConf = new JobConf();
Schema schema = HoodieAvroUtils.addMetadataFields(Schema.parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)); Schema schema = HoodieAvroUtils.addMetadataFields(
new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat(); HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat();
setPropsForInputFormat(inputFormat, jobConf, schema, basePath); setPropsForInputFormat(inputFormat, jobConf, schema, basePath);
return inputPaths.stream().map(path -> { return inputPaths.stream().map(path -> {

View File

@@ -100,7 +100,7 @@ public class TestUpdateMapFunction extends HoodieClientTestHarness {
// Now try an update with an evolved schema // Now try an update with an evolved schema
// Evolved schema does not have guarantee on preserving the original field ordering // Evolved schema does not have guarantee on preserving the original field ordering
final HoodieWriteConfig config2 = makeHoodieClientConfig("/exampleEvolvedSchema.txt"); final HoodieWriteConfig config2 = makeHoodieClientConfig("/exampleEvolvedSchema.txt");
final Schema schema = Schema.parse(config2.getSchema()); final Schema schema = new Schema.Parser().parse(config2.getSchema());
final WriteStatus insertResult = statuses.get(0); final WriteStatus insertResult = statuses.get(0);
String fileId = insertResult.getFileId(); String fileId = insertResult.getFileId();

View File

@@ -43,7 +43,7 @@ public class SizeAwareFSDataOutputStream extends FSDataOutputStream {
public SizeAwareFSDataOutputStream(Path path, FSDataOutputStream out, ConsistencyGuard consistencyGuard, public SizeAwareFSDataOutputStream(Path path, FSDataOutputStream out, ConsistencyGuard consistencyGuard,
Runnable closeCallback) throws IOException { Runnable closeCallback) throws IOException {
super(out); super(out, null);
this.path = path; this.path = path;
this.closeCallback = closeCallback; this.closeCallback = closeCallback;
this.consistencyGuard = consistencyGuard; this.consistencyGuard = consistencyGuard;

View File

@@ -62,7 +62,7 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
@Nonnull Map<HeaderMetadataType, String> footer) { @Nonnull Map<HeaderMetadataType, String> footer) {
super(header, footer, Option.empty(), Option.empty(), null, false); super(header, footer, Option.empty(), Option.empty(), null, false);
this.records = records; this.records = records;
this.schema = Schema.parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); this.schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
} }
public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) { public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
@@ -97,7 +97,7 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
createRecordsFromContentBytes(); createRecordsFromContentBytes();
} }
Schema schema = Schema.parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); Schema schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema); GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema);
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(baos); DataOutputStream output = new DataOutputStream(baos);

View File

@@ -51,7 +51,7 @@ public class LogReaderUtils {
HoodieAvroDataBlock lastBlock = (HoodieAvroDataBlock) block; HoodieAvroDataBlock lastBlock = (HoodieAvroDataBlock) block;
if (completedTimeline if (completedTimeline
.containsOrBeforeTimelineStarts(lastBlock.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME))) { .containsOrBeforeTimelineStarts(lastBlock.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME))) {
writerSchema = Schema.parse(lastBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); writerSchema = new Schema.Parser().parse(lastBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
break; break;
} }
} }

View File

@@ -48,12 +48,12 @@ object AvroConversionUtils {
ss.createDataFrame(rdd.mapPartitions { records => ss.createDataFrame(rdd.mapPartitions { records =>
if (records.isEmpty) Iterator.empty if (records.isEmpty) Iterator.empty
else { else {
val schema = Schema.parse(schemaStr) val schema = new Schema.Parser().parse(schemaStr)
val dataType = convertAvroSchemaToStructType(schema) val dataType = convertAvroSchemaToStructType(schema)
val convertor = AvroConversionHelper.createConverterToRow(schema, dataType) val convertor = AvroConversionHelper.createConverterToRow(schema, dataType)
records.map { x => convertor(x).asInstanceOf[Row] } records.map { x => convertor(x).asInstanceOf[Row] }
} }
}, convertAvroSchemaToStructType(Schema.parse(schemaStr))).asInstanceOf[Dataset[Row]] }, convertAvroSchemaToStructType(new Schema.Parser().parse(schemaStr))).asInstanceOf[Dataset[Row]]
} }
} }