1
0

[HUDI-285] Implement HoodieStorageWriter based on actual file type (#936)

This commit is contained in:
leesf
2019-10-04 22:45:16 +08:00
committed by vinoth chandar
parent 3dedc7e5fd
commit 7dd9c74b1b
2 changed files with 72 additions and 3 deletions

View File

@@ -18,6 +18,9 @@
package org.apache.hudi.io.storage;
import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
@@ -25,6 +28,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.BloomFilter;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.parquet.avro.AvroSchemaConverter;
@@ -34,9 +38,12 @@ public class HoodieStorageWriterFactory {
public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> getStorageWriter(
String commitTime, Path path, HoodieTable<T> hoodieTable,
HoodieWriteConfig config, Schema schema) throws IOException {
//TODO - based on the metadata choose the implementation of HoodieStorageWriter
// Currently only parquet is supported
return newParquetStorageWriter(commitTime, path, config, schema, hoodieTable);
final String name = path.getName();
final String extension = FSUtils.isLogFile(path) ? HOODIE_LOG.getFileExtension() : FSUtils.getFileExtension(name);
if (PARQUET.getFileExtension().equals(extension)) {
return newParquetStorageWriter(commitTime, path, config, schema, hoodieTable);
}
throw new UnsupportedOperationException(extension + " format not supported yet.");
}
private static <T extends HoodieRecordPayload,