1
0

[HUDI-1828] Update unit tests to support ORC as the base file format (#3237)

This commit is contained in:
Jintao Guan
2021-07-14 09:05:42 -07:00
committed by GitHub
parent 93967404a7
commit 2debb9b3ed
14 changed files with 149 additions and 53 deletions

View File

@@ -23,6 +23,7 @@ import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TY
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -47,7 +48,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.AvroOrcUtils;
public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
implements HoodieFileWriter<R> {
implements HoodieFileWriter<R>, Closeable {
private static final AtomicLong RECORD_INDEX = new AtomicLong(1);
private final long maxFileSize;

View File

@@ -23,9 +23,11 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
@@ -34,6 +36,8 @@ import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
import org.apache.hudi.io.storage.HoodieOrcConfig;
import org.apache.hudi.io.storage.HoodieOrcWriter;
import org.apache.hudi.io.storage.HoodieParquetWriter;
import org.apache.avro.Schema;
@@ -44,6 +48,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.orc.CompressionKind;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -84,22 +89,43 @@ public class HoodieWriteableTestTable extends HoodieTestTable {
FileCreateUtils.createPartitionMetaFile(basePath, partition);
String fileName = baseFileName(currentInstantTime, fileId);
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
new AvroSchemaConverter().convert(schema), schema, filter);
HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP,
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue()));
try (HoodieParquetWriter writer = new HoodieParquetWriter(
currentInstantTime,
new Path(Paths.get(basePath, partition, fileName).toString()),
config, schema, contextSupplier)) {
int seqId = 1;
for (HoodieRecord record : records) {
GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, currentInstantTime, String.valueOf(seqId++));
HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
writer.writeAvro(record.getRecordKey(), avroRecord);
filter.add(record.getRecordKey());
if (HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue().equals(HoodieFileFormat.PARQUET)) {
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
new AvroSchemaConverter().convert(schema), schema, filter);
HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP,
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue()));
try (HoodieParquetWriter writer = new HoodieParquetWriter(
currentInstantTime,
new Path(Paths.get(basePath, partition, fileName).toString()),
config, schema, contextSupplier)) {
int seqId = 1;
for (HoodieRecord record : records) {
GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, currentInstantTime, String.valueOf(seqId++));
HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
writer.writeAvro(record.getRecordKey(), avroRecord);
filter.add(record.getRecordKey());
}
}
} else if (HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue().equals(HoodieFileFormat.ORC)) {
Configuration conf = new Configuration();
int orcStripSize = Integer.parseInt(HoodieStorageConfig.ORC_STRIPE_SIZE.defaultValue());
int orcBlockSize = Integer.parseInt(HoodieStorageConfig.ORC_BLOCK_SIZE.defaultValue());
int maxFileSize = Integer.parseInt(HoodieStorageConfig.ORC_FILE_MAX_BYTES.defaultValue());
HoodieOrcConfig config = new HoodieOrcConfig(conf, CompressionKind.ZLIB, orcStripSize, orcBlockSize, maxFileSize, filter);
try (HoodieOrcWriter writer = new HoodieOrcWriter(
currentInstantTime,
new Path(Paths.get(basePath, partition, fileName).toString()),
config, schema, contextSupplier)) {
int seqId = 1;
for (HoodieRecord record : records) {
GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, currentInstantTime, String.valueOf(seqId++));
HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
writer.writeAvro(record.getRecordKey(), avroRecord);
filter.add(record.getRecordKey());
}
}
}

View File

@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -144,7 +145,13 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
// record locations might be same for multiple keys, so need a unique list
Set<String> uniquePaths = new HashSet<>(paths);
Dataset<Row> originalDF = sqlContextOpt.get().read().parquet(uniquePaths.toArray(new String[uniquePaths.size()]));
Dataset<Row> originalDF = null;
// read files based on the file extension name
if (paths.size() == 0 || paths.get(0).endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
originalDF = sqlContextOpt.get().read().parquet(uniquePaths.toArray(new String[uniquePaths.size()]));
} else if (paths.get(0).endsWith(HoodieFileFormat.ORC.getFileExtension())) {
originalDF = sqlContextOpt.get().read().orc(uniquePaths.toArray(new String[uniquePaths.size()]));
}
StructType schema = originalDF.schema();
JavaPairRDD<HoodieKey, Row> keyRowRDD = originalDF.javaRDD().mapToPair(row -> {
HoodieKey key = new HoodieKey(row.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD),

View File

@@ -83,7 +83,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
HoodieWriteConfig config = makeHoodieClientConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize)
.insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1000 * 1024).parquetMaxFileSize(1000 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1000 * 1024).parquetMaxFileSize(1000 * 1024).orcMaxFileSize(1000 * 1024).build())
.build();
FileCreateUtils.createCommit(basePath, "001");

View File

@@ -76,7 +76,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder()
.hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build())
.hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).orcMaxFileSize(1024 * 1024 * 1024).build())
.forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()

View File

@@ -97,7 +97,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
.withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
.withInlineCompaction(false).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).orcMaxFileSize(1024 * 1024).build())
.withMemoryConfig(HoodieMemoryConfig.newBuilder().withMaxDFSStreamBufferSize(1 * 1024 * 1024).build())
.forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build());

View File

@@ -43,6 +43,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
@@ -158,6 +159,14 @@ public class FunctionalTestHarness implements SparkProvider, DFSProvider, Hoodie
}
}
/**
 * Per-test cleanup hook: if {@code spark} is currently set, stops it and clears the
 * reference; otherwise does nothing.
 *
 * @throws Exception declared by the hook signature
 */
@AfterEach
public synchronized void tearDown() throws Exception {
  if (spark == null) {
    // Nothing was started (or it was already torn down), so there is nothing to stop.
    return;
  }
  spark.stop();
  // Drop the reference so the stopped instance is not reused.
  spark = null;
}
@AfterAll
public static synchronized void cleanUpAfterAll() throws IOException {
Path workDir = dfs.getWorkingDirectory();

View File

@@ -146,7 +146,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(cleaningPolicy)
.compactionSmallFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).orcMaxFileSize(1024 * 1024).build())
.forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -114,8 +115,14 @@ public class HoodieClientTestUtils {
HashMap<String, String> paths =
getLatestFileIDsToFullPath(basePath, commitTimeline, Arrays.asList(commitInstant));
LOG.info("Path :" + paths.values());
return sqlContext.read().parquet(paths.values().toArray(new String[paths.size()]))
.filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, instantTime));
if (HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue().equals(HoodieFileFormat.PARQUET)) {
return sqlContext.read().parquet(paths.values().toArray(new String[paths.size()]))
.filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, instantTime));
} else if (HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue().equals(HoodieFileFormat.ORC)) {
return sqlContext.read().orc(paths.values().toArray(new String[paths.size()]))
.filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, instantTime));
}
return sqlContext.emptyDataFrame();
} catch (Exception e) {
throw new HoodieException("Error reading commit " + instantTime, e);
}
@@ -141,6 +148,10 @@ public class HoodieClientTestUtils {
.filter(gr -> HoodieTimeline.compareTimestamps(lastCommitTime, HoodieActiveTimeline.LESSER_THAN,
gr.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()))
.count();
} else if (paths[0].endsWith(HoodieFileFormat.ORC.getFileExtension())) {
return sqlContext.read().orc(paths)
.filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTime))
.count();
}
throw new HoodieException("Unsupported base file format for file :" + paths[0]);
} catch (IOException e) {
@@ -175,7 +186,16 @@ public class HoodieClientTestUtils {
for (HoodieBaseFile file : latestFiles) {
filteredPaths.add(file.getPath());
}
return sqlContext.read().parquet(filteredPaths.toArray(new String[filteredPaths.size()]));
if (filteredPaths.isEmpty()) {
return sqlContext.emptyDataFrame();
}
String[] filteredPathsToRead = filteredPaths.toArray(new String[filteredPaths.size()]);
if (filteredPathsToRead[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
return sqlContext.read().parquet(filteredPathsToRead);
} else if (filteredPathsToRead[0].endsWith(HoodieFileFormat.ORC.getFileExtension())) {
return sqlContext.read().orc(filteredPathsToRead);
}
return sqlContext.emptyDataFrame();
} catch (Exception e) {
throw new HoodieException("Error reading hoodie table as a dataframe", e);
}