[HUDI-4003] Try to read all the log file to parse schema (#5473)
This commit is contained in:
@@ -61,6 +61,7 @@ import org.apache.parquet.schema.MessageType;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
|
import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
|
||||||
@@ -98,8 +99,8 @@ public class TableSchemaResolver {
|
|||||||
// For COW table, the file has data written must be in parquet or orc format currently.
|
// For COW table, the file has data written must be in parquet or orc format currently.
|
||||||
if (instantAndCommitMetadata.isPresent()) {
|
if (instantAndCommitMetadata.isPresent()) {
|
||||||
HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
|
HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
|
||||||
String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny().get();
|
Iterator<String> filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().iterator();
|
||||||
return readSchemaFromBaseFile(filePath);
|
return fetchSchemaFromFiles(filePaths);
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalArgumentException("Could not find any data file written for commit, "
|
throw new IllegalArgumentException("Could not find any data file written for commit, "
|
||||||
+ "so could not get schema for table " + metaClient.getBasePath());
|
+ "so could not get schema for table " + metaClient.getBasePath());
|
||||||
@@ -109,13 +110,8 @@ public class TableSchemaResolver {
|
|||||||
// Determine the file format based on the file name, and then extract schema from it.
|
// Determine the file format based on the file name, and then extract schema from it.
|
||||||
if (instantAndCommitMetadata.isPresent()) {
|
if (instantAndCommitMetadata.isPresent()) {
|
||||||
HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
|
HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
|
||||||
String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny().get();
|
Iterator<String> filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().iterator();
|
||||||
if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
|
return fetchSchemaFromFiles(filePaths);
|
||||||
// this is a log file
|
|
||||||
return readSchemaFromLogFile(new Path(filePath));
|
|
||||||
} else {
|
|
||||||
return readSchemaFromBaseFile(filePath);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalArgumentException("Could not find any data file written for commit, "
|
throw new IllegalArgumentException("Could not find any data file written for commit, "
|
||||||
+ "so could not get schema for table " + metaClient.getBasePath());
|
+ "so could not get schema for table " + metaClient.getBasePath());
|
||||||
@@ -129,6 +125,20 @@ public class TableSchemaResolver {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private MessageType fetchSchemaFromFiles(Iterator<String> filePaths) throws IOException {
|
||||||
|
MessageType type = null;
|
||||||
|
while (filePaths.hasNext() && type == null) {
|
||||||
|
String filePath = filePaths.next();
|
||||||
|
if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
|
||||||
|
// this is a log file
|
||||||
|
type = readSchemaFromLogFile(new Path(filePath));
|
||||||
|
} else {
|
||||||
|
type = readSchemaFromBaseFile(filePath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
private MessageType readSchemaFromBaseFile(String filePath) throws IOException {
|
private MessageType readSchemaFromBaseFile(String filePath) throws IOException {
|
||||||
if (filePath.contains(HoodieFileFormat.PARQUET.getFileExtension())) {
|
if (filePath.contains(HoodieFileFormat.PARQUET.getFileExtension())) {
|
||||||
// this is a parquet file
|
// this is a parquet file
|
||||||
|
|||||||
Reference in New Issue
Block a user