diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
index 9572af523..f26519a35 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
@@ -24,6 +24,7 @@ import org.apache.hudi.cli.commands.RepairsCommand;
 import org.apache.hudi.cli.commands.TableCommand;
 import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -62,6 +63,7 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
   private String duplicatedPartitionPathWithUpdates;
   private String duplicatedPartitionPathWithUpserts;
   private String repairedOutputPath;
+  private HoodieFileFormat fileFormat;
 
   @BeforeEach
   public void init() throws Exception {
@@ -101,6 +103,7 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
         .withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "8", dupRecords);
 
     metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+    fileFormat = metaClient.getTableConfig().getBaseFileFormat();
   }
 
   /**
@@ -117,7 +120,7 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
 
     // Before deduplicate, all files contain 210 records
     String[] files = filteredStatuses.toArray(new String[0]);
-    Dataset df = sqlContext.read().parquet(files);
+    Dataset df = readFiles(files);
     assertEquals(210, df.count());
 
     String partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
@@ -130,7 +133,7 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
     // After deduplicate, there are 200 records
     FileStatus[] fileStatus = fs.listStatus(new Path(repairedOutputPath));
     files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new);
-    Dataset result = sqlContext.read().parquet(files);
+    Dataset result = readFiles(files);
     assertEquals(200, result.count());
   }
 
@@ -144,7 +147,7 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
 
     // Before deduplicate, all files contain 110 records
     String[] files = filteredStatuses.toArray(new String[0]);
-    Dataset df = sqlContext.read().parquet(files);
+    Dataset df = readFiles(files);
     assertEquals(110, df.count());
 
     String partitionPath = HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
@@ -157,7 +160,7 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
     // After deduplicate, there are 100 records
     FileStatus[] fileStatus = fs.listStatus(new Path(repairedOutputPath));
     files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new);
-    Dataset result = sqlContext.read().parquet(files);
+    Dataset result = readFiles(files);
     assertEquals(100, result.count());
   }
 
@@ -171,7 +174,7 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
 
     // Before deduplicate, all files contain 120 records
     String[] files = filteredStatuses.toArray(new String[0]);
-    Dataset df = sqlContext.read().parquet(files);
+    Dataset df = readFiles(files);
     assertEquals(120, df.count());
 
     String partitionPath = HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
@@ -184,7 +187,7 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
     // After deduplicate, there are 100 records
     FileStatus[] fileStatus = fs.listStatus(new Path(repairedOutputPath));
     files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new);
-    Dataset result = sqlContext.read().parquet(files);
+    Dataset result = readFiles(files);
     assertEquals(100, result.count());
   }
 
@@ -202,7 +205,7 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
 
     // Before deduplicate, all files contain 210 records
     String[] files = filteredStatuses.toArray(new String[0]);
-    Dataset df = sqlContext.read().parquet(files);
+    Dataset df = readFiles(files);
     assertEquals(210, df.count());
 
     String partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
@@ -215,7 +218,16 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
     // After deduplicate, there are 200 records under partition path
     FileStatus[] fileStatus = fs.listStatus(new Path(duplicatedPartitionPath));
     files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new);
-    Dataset result = sqlContext.read().parquet(files);
+    Dataset result = readFiles(files);
     assertEquals(200, result.count());
   }
+
+  private Dataset readFiles(String[] files) {
+    if (HoodieFileFormat.PARQUET.equals(fileFormat)) {
+      return sqlContext.read().parquet(files);
+    } else if (HoodieFileFormat.ORC.equals(fileFormat)) {
+      return sqlContext.read().orc(files);
+    }
+    throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet.");
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
index f0768429d..85d36cc68 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
@@ -23,6 +23,7 @@ import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TY
 import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
 import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -47,7 +48,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.AvroOrcUtils;
 
 public class HoodieOrcWriter
-    implements HoodieFileWriter {
+    implements HoodieFileWriter, Closeable {
 
   private static final AtomicLong RECORD_INDEX = new AtomicLong(1);
   private final long maxFileSize;
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index f02330e9a..3bf8e4f9d 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -23,9 +23,11 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
@@ -34,6 +36,8 @@ import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieOrcConfig;
+import org.apache.hudi.io.storage.HoodieOrcWriter;
 import org.apache.hudi.io.storage.HoodieParquetWriter;
 
 import org.apache.avro.Schema;
@@ -44,6 +48,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.orc.CompressionKind;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -84,22 +89,43 @@ public class HoodieWriteableTestTable extends HoodieTestTable {
     FileCreateUtils.createPartitionMetaFile(basePath, partition);
     String fileName = baseFileName(currentInstantTime, fileId);
 
-    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
-        new AvroSchemaConverter().convert(schema), schema, filter);
-    HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP,
-        ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
-        new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue()));
-    try (HoodieParquetWriter writer = new HoodieParquetWriter(
-        currentInstantTime,
-        new Path(Paths.get(basePath, partition, fileName).toString()),
-        config, schema, contextSupplier)) {
-      int seqId = 1;
-      for (HoodieRecord record : records) {
-        GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
-        HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, currentInstantTime, String.valueOf(seqId++));
-        HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
-        writer.writeAvro(record.getRecordKey(), avroRecord);
-        filter.add(record.getRecordKey());
+    if (HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue().equals(HoodieFileFormat.PARQUET)) {
+      HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+          new AvroSchemaConverter().convert(schema), schema, filter);
+      HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP,
+          ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
+          new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue()));
+      try (HoodieParquetWriter writer = new HoodieParquetWriter(
+          currentInstantTime,
+          new Path(Paths.get(basePath, partition, fileName).toString()),
+          config, schema, contextSupplier)) {
+        int seqId = 1;
+        for (HoodieRecord record : records) {
+          GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
+          HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, currentInstantTime, String.valueOf(seqId++));
+          HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
+          writer.writeAvro(record.getRecordKey(), avroRecord);
+          filter.add(record.getRecordKey());
+        }
+      }
+    } else if (HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue().equals(HoodieFileFormat.ORC)) {
+      Configuration conf = new Configuration();
+      int orcStripSize = Integer.parseInt(HoodieStorageConfig.ORC_STRIPE_SIZE.defaultValue());
+      int orcBlockSize = Integer.parseInt(HoodieStorageConfig.ORC_BLOCK_SIZE.defaultValue());
+      int maxFileSize = Integer.parseInt(HoodieStorageConfig.ORC_FILE_MAX_BYTES.defaultValue());
+      HoodieOrcConfig config = new HoodieOrcConfig(conf, CompressionKind.ZLIB, orcStripSize, orcBlockSize, maxFileSize, filter);
+      try (HoodieOrcWriter writer = new HoodieOrcWriter(
+          currentInstantTime,
+          new Path(Paths.get(basePath, partition, fileName).toString()),
+          config, schema, contextSupplier)) {
+        int seqId = 1;
+        for (HoodieRecord record : records) {
+          GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
+          HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, currentInstantTime, String.valueOf(seqId++));
+          HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
+          writer.writeAvro(record.getRecordKey(), avroRecord);
+          filter.add(record.getRecordKey());
+        }
       }
     }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
index f8cc757eb..fc615e0f8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -144,7 +145,13 @@ public class HoodieReadClient implements Serializ
     // record locations might be same for multiple keys, so need a unique list
     Set uniquePaths = new HashSet<>(paths);
-    Dataset originalDF = sqlContextOpt.get().read().parquet(uniquePaths.toArray(new String[uniquePaths.size()]));
+    Dataset originalDF = null;
+    // read files based on the file extension name
+    if (paths.size() == 0 || paths.get(0).endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+      originalDF = sqlContextOpt.get().read().parquet(uniquePaths.toArray(new String[uniquePaths.size()]));
+    } else if (paths.get(0).endsWith(HoodieFileFormat.ORC.getFileExtension())) {
+      originalDF = sqlContextOpt.get().read().orc(uniquePaths.toArray(new String[uniquePaths.size()]));
+    }
     StructType schema = originalDF.schema();
     JavaPairRDD keyRowRDD = originalDF.javaRDD().mapToPair(row -> {
       HoodieKey key = new HoodieKey(row.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD),
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index 3be0bb340..3a125d230 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -83,7 +83,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
     HoodieWriteConfig config = makeHoodieClientConfigBuilder()
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize)
             .insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1000 * 1024).parquetMaxFileSize(1000 * 1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1000 * 1024).parquetMaxFileSize(1000 * 1024).orcMaxFileSize(1000 * 1024).build())
         .build();
 
     FileCreateUtils.createCommit(basePath, "001");
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
index e8e2e5410..4af39739a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
@@ -76,7 +76,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
             .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder()
-            .hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build())
+            .hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).orcMaxFileSize(1024 * 1024 * 1024).build())
         .forTable("test-trip-table")
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
         .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 2706dce7d..14e26b9d4 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -97,7 +97,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
         .withParallelism(2, 2)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
             .withInlineCompaction(false).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).orcMaxFileSize(1024 * 1024).build())
         .withMemoryConfig(HoodieMemoryConfig.newBuilder().withMaxDFSStreamBufferSize(1 * 1024 * 1024).build())
         .forTable("test-trip-table")
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
index 78cd6d76c..cbf0a2213 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
@@ -43,6 +43,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -158,6 +159,14 @@ public class FunctionalTestHarness implements SparkProvider, DFSProvider, Hoodie
     }
   }
 
+  @AfterEach
+  public synchronized void tearDown() throws Exception {
+    if (spark != null) {
+      spark.stop();
+      spark = null;
+    }
+  }
+
   @AfterAll
   public static synchronized void cleanUpAfterAll() throws IOException {
     Path workDir = dfs.getWorkingDirectory();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 1386edcfe..300feb855 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -146,7 +146,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(cleaningPolicy)
             .compactionSmallFileSize(1024 * 1024).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).orcMaxFileSize(1024 * 1024).build())
         .forTable("test-trip-table")
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
         .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 58722053e..b93583b9c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -114,8 +115,14 @@ public class HoodieClientTestUtils {
       HashMap paths =
           getLatestFileIDsToFullPath(basePath, commitTimeline, Arrays.asList(commitInstant));
       LOG.info("Path :" + paths.values());
-      return sqlContext.read().parquet(paths.values().toArray(new String[paths.size()]))
-          .filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, instantTime));
+      if (HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue().equals(HoodieFileFormat.PARQUET)) {
+        return sqlContext.read().parquet(paths.values().toArray(new String[paths.size()]))
+            .filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, instantTime));
+      } else if (HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue().equals(HoodieFileFormat.ORC)) {
+        return sqlContext.read().orc(paths.values().toArray(new String[paths.size()]))
+            .filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, instantTime));
+      }
+      return sqlContext.emptyDataFrame();
     } catch (Exception e) {
       throw new HoodieException("Error reading commit " + instantTime, e);
     }
@@ -141,6 +148,10 @@ public class HoodieClientTestUtils {
             .filter(gr -> HoodieTimeline.compareTimestamps(lastCommitTime, HoodieActiveTimeline.LESSER_THAN,
                 gr.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()))
             .count();
+      } else if (paths[0].endsWith(HoodieFileFormat.ORC.getFileExtension())) {
+        return sqlContext.read().orc(paths)
+            .filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTime))
+            .count();
       }
       throw new HoodieException("Unsupported base file format for file :" + paths[0]);
     } catch (IOException e) {
@@ -175,7 +186,16 @@ public class HoodieClientTestUtils {
       for (HoodieBaseFile file : latestFiles) {
         filteredPaths.add(file.getPath());
       }
-      return sqlContext.read().parquet(filteredPaths.toArray(new String[filteredPaths.size()]));
+      if (filteredPaths.isEmpty()) {
+        return sqlContext.emptyDataFrame();
+      }
+      String[] filteredPathsToRead = filteredPaths.toArray(new String[filteredPaths.size()]);
+      if (filteredPathsToRead[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+        return sqlContext.read().parquet(filteredPathsToRead);
+      } else if (filteredPathsToRead[0].endsWith(HoodieFileFormat.ORC.getFileExtension())) {
+        return sqlContext.read().orc(filteredPathsToRead);
+      }
+      return sqlContext.emptyDataFrame();
     } catch (Exception e) {
       throw new HoodieException("Error reading hoodie table as a dataframe", e);
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
index cb96e6f3f..c6160a8f1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
@@ -42,6 +42,7 @@ public class HoodieAvroWriteSupport extends AvroWriteSupport {
   public static final String HOODIE_MIN_RECORD_KEY_FOOTER = "hoodie_min_record_key";
   public static final String HOODIE_MAX_RECORD_KEY_FOOTER = "hoodie_max_record_key";
   public static final String HOODIE_BLOOM_FILTER_TYPE_CODE = "hoodie_bloom_filter_type_code";
+  public static final String HOODIE_AVRO_SCHEMA_METADATA_KEY = "orc.avro.schema";
 
   public HoodieAvroWriteSupport(MessageType schema, Schema avroSchema, BloomFilter bloomFilter) {
     super(schema, avroSchema);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
index 9fc49a340..b5d11cb8b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
@@ -19,6 +19,8 @@
 package org.apache.hudi.common.util;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -46,6 +48,8 @@ import org.apache.orc.Reader.Options;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
 
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_SCHEMA_METADATA_KEY;
+
 /**
  * Utility functions for ORC files.
  */
@@ -202,8 +206,7 @@ public class OrcUtils extends BaseFileUtils {
         footerVals.put(footerName, metadata.get(footerName));
       } else if (required) {
         throw new MetadataNotFoundException(
-            "Could not find index in ORC footer. Looked for key " + footerName + " in "
-                + orcFilePath);
+            "Could not find index in ORC footer. Looked for key " + footerName + " in " + orcFilePath);
       }
     }
     return footerVals;
@@ -216,8 +219,9 @@ public class OrcUtils extends BaseFileUtils {
   public Schema readAvroSchema(Configuration conf, Path orcFilePath) {
     try {
       Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf));
-      TypeDescription orcSchema = reader.getSchema();
-      return AvroOrcUtils.createAvroSchema(orcSchema);
+      ByteBuffer schemaBuffer = reader.getMetadataValue(HOODIE_AVRO_SCHEMA_METADATA_KEY);
+      String schemaText = StandardCharsets.UTF_8.decode(schemaBuffer).toString();
+      return new Schema.Parser().parse(schemaText);
     } catch (IOException io) {
       throw new HoodieIOException("Unable to get Avro schema for ORC file:" + orcFilePath, io);
     }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 0706c2a1f..23bd81a85 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -30,7 +31,6 @@ import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ParquetReaderIterator;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieMemoryConfig;
 
@@ -38,8 +38,8 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.avro.AvroParquetReader;
-import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -113,8 +113,14 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
 
   @Override
   protected long analyzeSingleFile(String filePath) {
-    return SparkBasedReader.readParquet(new SparkSession(jsc.sc()), Arrays.asList(filePath),
-        Option.empty(), Option.empty()).count();
+    if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+      return SparkBasedReader.readParquet(new SparkSession(jsc.sc()), Arrays.asList(filePath),
+          Option.empty(), Option.empty()).count();
+    } else if (filePath.endsWith(HoodieFileFormat.ORC.getFileExtension())) {
+      return SparkBasedReader.readOrc(new SparkSession(jsc.sc()), Arrays.asList(filePath),
+          Option.empty(), Option.empty()).count();
+    }
+    throw new UnsupportedOperationException("Format for " + filePath + " is not supported yet.");
   }
 
   private JavaRDD fetchAnyRecordsFromDataset(Option numRecordsToUpdate) throws IOException {
@@ -149,7 +155,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
 
     // TODO : read record count from metadata
     // Read the records in a single file
-    long recordsInSingleFile = iteratorSize(readParquetOrLogFiles(getSingleSliceFromRDD(partitionToFileSlice)));
+    long recordsInSingleFile = iteratorSize(readColumnarOrLogFiles(getSingleSliceFromRDD(partitionToFileSlice)));
     int numFilesToUpdate;
     long numRecordsToUpdatePerFile;
     if (!numFiles.isPresent() || numFiles.get() <= 0) {
@@ -205,9 +211,9 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
       return iteratorLimit(p._2, maxFilesToRead);
     }).flatMap(p -> p).repartition(numFiles).map(fileSlice -> {
       if (numRecordsToReadPerFile > 0) {
-        return iteratorLimit(readParquetOrLogFiles(fileSlice), numRecordsToReadPerFile);
+        return iteratorLimit(readColumnarOrLogFiles(fileSlice), numRecordsToReadPerFile);
       } else {
-        return readParquetOrLogFiles(fileSlice);
+        return readColumnarOrLogFiles(fileSlice);
       }
     }).flatMap(p -> p).map(i -> (GenericRecord) i);
   }
@@ -253,15 +259,13 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
     }).take(1).get(0);
   }
 
-  private Iterator readParquetOrLogFiles(FileSlice fileSlice) throws IOException {
+  private Iterator readColumnarOrLogFiles(FileSlice fileSlice) throws IOException {
     if (fileSlice.getBaseFile().isPresent()) {
-      // Read the parquet files using the latest writer schema.
-      Schema schema = new Schema.Parser().parse(schemaStr);
-      AvroReadSupport.setAvroReadSchema(metaClient.getHadoopConf(), HoodieAvroUtils.addMetadataFields(schema));
-      Iterator itr =
-          new ParquetReaderIterator(AvroParquetReader.builder(new
-              Path(fileSlice.getBaseFile().get().getPath())).withConf(metaClient.getHadoopConf()).build());
-      return itr;
+      // Read the base files using the latest writer schema.
+      Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
+      HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(metaClient.getHadoopConf(),
+          new Path(fileSlice.getBaseFile().get().getPath()));
+      return reader.getRecordIterator(schema);
     } else {
       // If there is no data file, fall back to reading log files
       HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/SparkBasedReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/SparkBasedReader.java
index fc23a47b3..5318e93b9 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/SparkBasedReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/SparkBasedReader.java
@@ -67,4 +67,16 @@ public class SparkBasedReader {
         .toJavaRDD();
   }
 
+  public static JavaRDD readOrc(SparkSession sparkSession, List
+      listOfPaths, Option structName, Option nameSpace) {
+
+    Dataset dataSet = sparkSession.read()
+        .orc((JavaConverters.asScalaIteratorConverter(listOfPaths.iterator()).asScala().toSeq()));
+
+    return HoodieSparkUtils
+        .createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME),
+            RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE)
+        .toJavaRDD();
+  }
+
 }