[HUDI-684] Introduced abstraction for writing and reading different types of base file formats. (#1687)

Notable changes:
    1. HoodieFileWriter and HoodieFileReader abstractions for the writer/reader side of a base file format (a usage sketch follows this list)
    2. HoodieDataBlock abstraction for creating format-specific data blocks for base file formats (e.g. the Parquet base format maps to HoodieAvroDataBlock)
    3. All hardcoded references to Parquet / Parquet-based classes have been abstracted into methods that accept a base file format
    4. HiveSyncTool accepts the base file format as a CLI parameter
    5. HoodieDeltaStreamer accepts the base file format as a CLI parameter
    6. HoodieSparkSqlWriter accepts the base file format as a parameter
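
For orientation, here is a minimal usage sketch of the new abstractions, pieced together from the call sites changed in this diff. It is illustrative only: the package locations (notably SparkTaskContextSupplier) and the exact generic signatures are assumptions taken from the surrounding code, not something this commit adds.

// Hedged sketch, not part of the commit: exercises the factory calls visible in this diff.
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkTaskContextSupplier; // package location assumed
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.table.HoodieTable;

import java.io.IOException;
import java.util.Iterator;

public class BaseFileFormatUsageSketch {

  // Write path: handles no longer construct a Parquet writer directly. The factory picks
  // the implementation from the extension of `path` (currently only .parquet is supported).
  static <T extends HoodieRecordPayload> void writeOne(String instantTime, Path path,
      HoodieTable<T> table, HoodieWriteConfig config, Schema schema,
      SparkTaskContextSupplier taskContextSupplier, IndexedRecord avroRecord,
      HoodieRecord record) throws IOException {
    HoodieFileWriter<IndexedRecord> writer = HoodieFileWriterFactory.getFileWriter(
        instantTime, path, table, config, schema, taskContextSupplier);
    try {
      if (writer.canWrite()) { // whether the current file can take more records
        writer.writeAvroWithMetadata(avroRecord, record);
      }
    } finally {
      writer.close();
    }
  }

  // Read path: index lookups and merges go through HoodieFileReaderFactory instead of
  // ParquetUtils / AvroParquetReader. The same reader also exposes readBloomFilter(),
  // readMinMaxRecordKeys() and filterRowKeys(keys), per the handle changes below.
  static Iterator<IndexedRecord> openRecordIterator(Configuration hadoopConf, Path baseFilePath,
      Schema readerSchema) throws IOException {
    HoodieFileReader<IndexedRecord> reader =
        HoodieFileReaderFactory.getFileReader(hadoopConf, baseFilePath);
    return reader.getRecordIterator(readerSchema);
  }

  // Log path (kept as a comment to stay short): append handles now build the data block via
  // HoodieDataBlock.getBlock(table.getLogDataBlockFormat(), recordList, header) rather than
  // instantiating HoodieAvroDataBlock directly.
}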
Prashant Wason
2020-06-25 23:46:55 -07:00
committed by GitHub
parent 5e47673341
commit 2603cfb33e
55 changed files with 1086 additions and 466 deletions

@@ -1,77 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.utils;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueue;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.parquet.hadoop.ParquetReader;
import java.io.IOException;
import java.util.Iterator;
/**
* This class wraps a parquet reader and provides an iterator based api to read from a parquet file. This is used in
* {@link BoundedInMemoryQueue}
*/
public class ParquetReaderIterator<T> implements Iterator<T> {
// Parquet reader for an existing parquet file
private final ParquetReader<T> parquetReader;
// Holds the next entry returned by the parquet reader
private T next;
public ParquetReaderIterator(ParquetReader<T> parquetReader) {
this.parquetReader = parquetReader;
}
@Override
public boolean hasNext() {
try {
// To handle when hasNext() is called multiple times for idempotency and/or the first time
if (this.next == null) {
this.next = parquetReader.read();
}
return this.next != null;
} catch (IOException io) {
throw new HoodieIOException("unable to read next record from parquet file ", io);
}
}
@Override
public T next() {
try {
// To handle case when next() is called before hasNext()
if (this.next == null) {
if (!hasNext()) {
throw new HoodieIOException("No more records left to read from parquet file");
}
}
T retVal = this.next;
this.next = parquetReader.read();
return retVal;
} catch (IOException io) {
throw new HoodieIOException("unable to read next record from parquet file ", io);
}
}
public void close() throws IOException {
parquetReader.close();
}
}

@@ -34,7 +34,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
@@ -207,7 +207,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchema.toString());
if (recordList.size() > 0) {
writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, header));
writer = writer.appendBlock(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header));
recordList.clear();
}
if (keysToDelete.size() > 0) {

@@ -30,8 +30,7 @@ import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.io.storage.HoodieStorageWriter;
import org.apache.hudi.io.storage.HoodieStorageWriterFactory;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.generic.GenericRecord;
@@ -47,7 +46,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
private static final Logger LOG = LogManager.getLogger(HoodieCreateHandle.class);
private final HoodieStorageWriter<IndexedRecord> storageWriter;
private final HoodieFileWriter<IndexedRecord> fileWriter;
private final Path path;
private long recordsWritten = 0;
private long insertRecordsWritten = 0;
@@ -68,8 +67,8 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
partitionMetadata.trySave(getPartitionId());
createMarkerFile(partitionPath);
this.storageWriter =
HoodieStorageWriterFactory.getStorageWriter(instantTime, path, hoodieTable, config, writerSchema, this.sparkTaskContextSupplier);
this.fileWriter = createNewFileWriter(instantTime, path, hoodieTable, config, writerSchema,
this.sparkTaskContextSupplier);
} catch (IOException e) {
throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e);
}
@@ -88,7 +87,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
@Override
public boolean canWrite(HoodieRecord record) {
return storageWriter.canWrite() && record.getPartitionPath().equals(writeStatus.getPartitionPath());
return fileWriter.canWrite() && record.getPartitionPath().equals(writeStatus.getPartitionPath());
}
/**
@@ -101,7 +100,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
if (avroRecord.isPresent()) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
// update the new location of record, so we know where to find it next
record.unseal();
record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));
@@ -156,7 +155,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten);
try {
storageWriter.close();
fileWriter.close();
HoodieWriteStat stat = new HoodieWriteStat();
stat.setPartitionPath(writeStatus.getPartitionPath());

@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
@@ -34,6 +33,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -61,23 +61,26 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload> extends Hoodie
this.candidateRecordKeys = new ArrayList<>();
this.totalKeysChecked = 0;
HoodieTimer timer = new HoodieTimer().startTimer();
this.bloomFilter = ParquetUtils.readBloomFilterFromParquetMetadata(hoodieTable.getHadoopConf(),
new Path(getLatestDataFile().getPath()));
try {
this.bloomFilter = createNewFileReader().readBloomFilter();
} catch (IOException e) {
throw new HoodieIndexException(String.format("Error reading bloom filter from %s: %s", partitionPathFilePair, e));
}
LOG.info(String.format("Read bloom filter from %s in %d ms", partitionPathFilePair, timer.endTimer()));
}
/**
* Given a list of row keys and one file, return only row keys existing in that file.
*/
public static List<String> checkCandidatesAgainstFile(Configuration configuration, List<String> candidateRecordKeys,
Path filePath) throws HoodieIndexException {
public List<String> checkCandidatesAgainstFile(Configuration configuration, List<String> candidateRecordKeys,
Path filePath) throws HoodieIndexException {
List<String> foundRecordKeys = new ArrayList<>();
try {
// Load all rowKeys from the file, to double-confirm
if (!candidateRecordKeys.isEmpty()) {
HoodieTimer timer = new HoodieTimer().startTimer();
Set<String> fileRowKeys =
ParquetUtils.filterParquetRowKeys(configuration, filePath, new HashSet<>(candidateRecordKeys));
Set<String> fileRowKeys = createNewFileReader().filterRowKeys(new HashSet<>(candidateRecordKeys));
foundRecordKeys.addAll(fileRowKeys);
LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size()));

@@ -37,8 +37,7 @@ import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.storage.HoodieStorageWriter;
import org.apache.hudi.io.storage.HoodieStorageWriterFactory;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
@@ -61,7 +60,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
private Map<String, HoodieRecord<T>> keyToNewRecords;
private Set<String> writtenRecordKeys;
private HoodieStorageWriter<IndexedRecord> storageWriter;
private HoodieFileWriter<IndexedRecord> fileWriter;
private Path newFilePath;
private Path oldFilePath;
private long recordsWritten = 0;
@@ -115,7 +114,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath);
String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
+ FSUtils.makeDataFileName(instantTime, writeToken, fileId)).toString();
+ FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension())).toString();
newFilePath = new Path(config.getBasePath(), relativePath);
LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(),
@@ -131,8 +130,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
createMarkerFile(partitionPath);
// Create the writer for writing the new version file
storageWriter =
HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
} catch (IOException io) {
LOG.error("Error in update task at commit " + instantTime, io);
writeStatus.setGlobalError(io);
@@ -190,7 +189,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
if (indexedRecord.isPresent()) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get());
storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
recordsWritten++;
} else {
recordsDeleted++;
@@ -243,7 +242,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
String errMsg = "Failed to merge old record into new file for key " + key + " from old file " + getOldFilePath()
+ " to new file " + newFilePath;
try {
storageWriter.writeAvro(key, oldRecord);
fileWriter.writeAvro(key, oldRecord);
} catch (ClassCastException e) {
LOG.error("Schema mismatch when rewriting old record " + oldRecord + " from file " + getOldFilePath()
+ " to file " + newFilePath + " with writerSchema " + writerSchema.toString(true));
@@ -277,8 +276,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
keyToNewRecords.clear();
writtenRecordKeys.clear();
if (storageWriter != null) {
storageWriter.close();
if (fileWriter != null) {
fileWriter.close();
}
long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);

@@ -18,14 +18,12 @@
package org.apache.hudi.io;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
/**
* Extract range information for a given file slice.
@@ -37,8 +35,7 @@ public class HoodieRangeInfoHandle<T extends HoodieRecordPayload> extends Hoodie
super(config, null, hoodieTable, partitionPathFilePair);
}
public String[] getMinMaxKeys() {
HoodieBaseFile dataFile = getLatestDataFile();
return ParquetUtils.readMinMaxRecordKeys(hoodieTable.getHadoopConf(), new Path(dataFile.getPath()));
public String[] getMinMaxKeys() throws IOException {
return createNewFileReader().readMinMaxRecordKeys();
}
}

@@ -22,9 +22,14 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* Base class for read operations done logically on the file group.
@@ -56,4 +61,9 @@ public abstract class HoodieReadHandle<T extends HoodieRecordPayload> extends Ho
return hoodieTable.getBaseFileOnlyView()
.getLatestBaseFile(partitionPathFilePair.getLeft(), partitionPathFilePair.getRight()).get();
}
protected HoodieFileReader createNewFileReader() throws IOException {
return HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(),
new Path(getLatestDataFile().getPath()));
}
}

@@ -30,6 +30,8 @@ import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
@@ -86,7 +88,8 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
throw new HoodieIOException("Failed to make dir " + path, e);
}
return new Path(path.toString(), FSUtils.makeDataFileName(instantTime, writeToken, fileId));
return new Path(path.toString(), FSUtils.makeDataFileName(instantTime, writeToken, fileId,
hoodieTable.getMetaClient().getTableConfig().getBaseFileFormat().getFileExtension()));
}
/**
@@ -180,4 +183,9 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
protected long getAttemptId() {
return sparkTaskContextSupplier.getAttemptIdSupplier().get();
}
protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, HoodieTable<T> hoodieTable,
HoodieWriteConfig config, Schema schema, SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, sparkTaskContextSupplier);
}
}

@@ -24,7 +24,7 @@ import org.apache.avro.generic.IndexedRecord;
import java.io.IOException;
public interface HoodieStorageWriter<R extends IndexedRecord> {
public interface HoodieFileWriter<R extends IndexedRecord> {
void writeAvroWithMetadata(R newRecord, HoodieRecord record) throws IOException;

@@ -34,23 +34,21 @@ import org.apache.parquet.avro.AvroSchemaConverter;
import java.io.IOException;
import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
public class HoodieStorageWriterFactory {
public class HoodieFileWriterFactory {
public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> getStorageWriter(
public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> getFileWriter(
String instantTime, Path path, HoodieTable<T> hoodieTable, HoodieWriteConfig config, Schema schema,
SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
final String name = path.getName();
final String extension = FSUtils.isLogFile(path) ? HOODIE_LOG.getFileExtension() : FSUtils.getFileExtension(name);
final String extension = FSUtils.getFileExtension(path.getName());
if (PARQUET.getFileExtension().equals(extension)) {
return newParquetStorageWriter(instantTime, path, config, schema, hoodieTable, sparkTaskContextSupplier);
return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, sparkTaskContextSupplier);
}
throw new UnsupportedOperationException(extension + " format not supported yet.");
}
private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> newParquetStorageWriter(
private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newParquetFileWriter(
String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
BloomFilter filter = BloomFilterFactory

@@ -29,7 +29,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
@@ -42,7 +41,7 @@ import java.util.concurrent.atomic.AtomicLong;
* the current file can take more records with the <code>canWrite()</code>
*/
public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
extends ParquetWriter<IndexedRecord> implements HoodieStorageWriter<R> {
extends ParquetWriter<IndexedRecord> implements HoodieFileWriter<R> {
private static AtomicLong recordIndex = new AtomicLong(1);
@@ -51,7 +50,6 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
private final long maxFileSize;
private final HoodieAvroWriteSupport writeSupport;
private final String instantTime;
private final Schema schema;
private final SparkTaskContextSupplier sparkTaskContextSupplier;
public HoodieParquetWriter(String instantTime, Path file, HoodieParquetConfig parquetConfig,
@@ -60,10 +58,10 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(), parquetConfig.getCompressionCodecName(),
parquetConfig.getBlockSize(), parquetConfig.getPageSize(), parquetConfig.getPageSize(),
ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED, ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
ParquetWriter.DEFAULT_WRITER_VERSION, registerFileSystem(file, parquetConfig.getHadoopConf()));
ParquetWriter.DEFAULT_WRITER_VERSION, FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf());
this.fs =
(HoodieWrapperFileSystem) this.file.getFileSystem(registerFileSystem(file, parquetConfig.getHadoopConf()));
(HoodieWrapperFileSystem) this.file.getFileSystem(FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
// We cannot accurately measure the snappy compressed output file size. We are choosing a
// conservative 10%
// TODO - compute this compression ratio dynamically by looking at the bytes written to the
@@ -72,18 +70,9 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
+ Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
this.writeSupport = parquetConfig.getWriteSupport();
this.instantTime = instantTime;
this.schema = schema;
this.sparkTaskContextSupplier = sparkTaskContextSupplier;
}
public static Configuration registerFileSystem(Path file, Configuration conf) {
Configuration returnConf = new Configuration(conf);
String scheme = FSUtils.getFs(file.toString(), conf).getScheme();
returnConf.set("fs." + HoodieWrapperFileSystem.getHoodieScheme(scheme) + ".impl",
HoodieWrapperFileSystem.class.getName());
return returnConf;
}
@Override
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
String seqId =

@@ -27,7 +27,6 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.ParquetReaderIterator;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -44,6 +43,8 @@ import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.action.clean.CleanActionExecutor;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BulkInsertCommitActionExecutor;
@@ -58,9 +59,6 @@ import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -150,11 +148,13 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getWriterSchema());
BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
try (ParquetReader<IndexedRecord> reader =
AvroParquetReader.<IndexedRecord>builder(upsertHandle.getOldFilePath()).withConf(getHadoopConf()).build()) {
wrapper = new SparkBoundedInMemoryExecutor(config, new ParquetReaderIterator(reader),
HoodieFileReader<IndexedRecord> storageReader =
HoodieFileReaderFactory.getFileReader(getHadoopConf(), upsertHandle.getOldFilePath());
try {
wrapper =
new SparkBoundedInMemoryExecutor(config, storageReader.getRecordIterator(upsertHandle.getWriterSchema()),
new UpdateHandler(upsertHandle), x -> x);
wrapper.execute();
} catch (Exception e) {

@@ -34,12 +34,14 @@ import org.apache.hudi.common.fs.ConsistencyGuard;
import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -310,7 +312,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
/**
* Schedule compaction for the instant time.
*
*
* @param jsc Spark Context
* @param instantTime Instant Time for scheduling compaction
* @param extraMetadata additional metadata to write into plan
@@ -381,7 +383,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
/**
* Delete Marker directory corresponding to an instant.
*
*
* @param instantTs Instant Time
*/
public void deleteMarkerDir(String instantTs) {
@@ -422,9 +424,11 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
return;
}
List<String> invalidDataPaths = FSUtils.getAllDataFilesForMarkers(fs, basePath, instantTs, markerDir.toString());
final String baseFileExtension = getBaseFileFormat().getFileExtension();
List<String> invalidDataPaths = FSUtils.getAllDataFilesForMarkers(fs, basePath, instantTs, markerDir.toString(),
baseFileExtension);
List<String> validDataPaths = stats.stream().map(w -> String.format("%s/%s", basePath, w.getPath()))
.filter(p -> p.endsWith(".parquet")).collect(Collectors.toList());
.filter(p -> p.endsWith(baseFileExtension)).collect(Collectors.toList());
// Contains list of partially created files. These needs to be cleaned up.
invalidDataPaths.removeAll(validDataPaths);
if (!invalidDataPaths.isEmpty()) {
@@ -478,7 +482,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
/**
* Ensures all files passed either appear or disappear.
*
*
* @param jsc JavaSparkContext
* @param groupByPartition Files grouped by partition
* @param visibility Appear/Disappear
@@ -562,4 +566,26 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
throw new HoodieInsertException("Failed insert schema compability check.", e);
}
}
public HoodieFileFormat getBaseFileFormat() {
return metaClient.getTableConfig().getBaseFileFormat();
}
public HoodieFileFormat getLogFileFormat() {
return metaClient.getTableConfig().getLogFileFormat();
}
public HoodieLogBlockType getLogDataBlockFormat() {
switch (getBaseFileFormat()) {
case PARQUET:
return HoodieLogBlockType.AVRO_DATA_BLOCK;
default:
throw new HoodieException("Base file format " + getBaseFileFormat()
+ " does not have associated log block format");
}
}
public String getBaseFileExtension() {
return getBaseFileFormat().getFileExtension();
}
}

View File

@@ -19,7 +19,6 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.ParquetReaderIterator;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -32,6 +31,8 @@ import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.LazyInsertIterable;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
@@ -39,9 +40,6 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaSparkContext;
@@ -89,11 +87,12 @@ public abstract class CommitActionExecutor<T extends HoodieRecordPayload<T>>
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), upsertHandle.getWriterSchema());
BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
try (ParquetReader<IndexedRecord> reader =
AvroParquetReader.<IndexedRecord>builder(upsertHandle.getOldFilePath()).withConf(table.getHadoopConf()).build()) {
wrapper = new SparkBoundedInMemoryExecutor(config, new ParquetReaderIterator(reader),
try {
HoodieFileReader<IndexedRecord> storageReader =
HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), upsertHandle.getOldFilePath());
wrapper =
new SparkBoundedInMemoryExecutor(config, storageReader.getRecordIterator(upsertHandle.getWriterSchema()),
new UpdateHandler(upsertHandle), x -> x);
wrapper.execute();
} catch (Exception e) {

@@ -83,7 +83,7 @@ public abstract class DeltaCommitActionExecutor<T extends HoodieRecordPayload<T>
@Override
public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
throws Exception {
// If canIndexLogFiles, write inserts to log files else write inserts to parquet files
// If canIndexLogFiles, write inserts to log files else write inserts to base files
if (table.getIndex().canIndexLogFiles()) {
return new LazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
sparkTaskContextSupplier, new AppendHandleFactory<>());

@@ -71,8 +71,9 @@ public class RollbackHelper implements Serializable {
*/
public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback, List<RollbackRequest> rollbackRequests) {
String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
SerializablePathFilter filter = (path) -> {
if (path.toString().contains(".parquet")) {
if (path.toString().contains(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return instantToRollback.getTimestamp().equals(fileCommitTime);
} else if (path.toString().contains(".log")) {
@@ -184,8 +185,9 @@ public class RollbackHelper implements Serializable {
Map<FileStatus, Boolean> results, String commit, String partitionPath) throws IOException {
LOG.info("Cleaning path " + partitionPath);
FileSystem fs = metaClient.getFs();
String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
PathFilter filter = (path) -> {
if (path.toString().contains(".parquet")) {
if (path.toString().contains(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
}

@@ -1,64 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.utils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.parquet.hadoop.ParquetReader;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestParquetReaderIterator {
@Test
public void testParquetIteratorIdempotency() throws IOException {
ParquetReader reader = mock(ParquetReader.class);
// only 1 record in reader
when(reader.read()).thenReturn(1).thenReturn(null);
ParquetReaderIterator<Integer> iterator = new ParquetReaderIterator<>(reader);
int idempotencyCheckCounter = 0;
// call hasNext() 3 times
while (idempotencyCheckCounter < 3) {
assertTrue(iterator.hasNext());
idempotencyCheckCounter++;
}
}
@Test
public void testParquetIterator() throws IOException {
ParquetReader reader = mock(ParquetReader.class);
// only one record to read
when(reader.read()).thenReturn(1).thenReturn(null);
ParquetReaderIterator<Integer> iterator = new ParquetReaderIterator<>(reader);
// should return value even though hasNext() hasn't been called
assertEquals(1, iterator.next());
// no more entries to iterate on
assertFalse(iterator.hasNext());
assertThrows(HoodieIOException.class, iterator::next, "should throw an exception since there is only 1 record");
}
}

@@ -257,7 +257,11 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
List<String> uuids =
Arrays.asList(record1.getRecordKey(), record2.getRecordKey(), record3.getRecordKey(), record4.getRecordKey());
List<String> results = HoodieKeyLookupHandle.checkCandidatesAgainstFile(hadoopConf, uuids,
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
HoodieKeyLookupHandle keyHandle = new HoodieKeyLookupHandle<>(config, table,
Pair.of("2016/01/31/", FSUtils.getFileId(filename)));
List<String> results = keyHandle.checkCandidatesAgainstFile(hadoopConf, uuids,
new Path(basePath + "/2016/01/31/" + filename));
assertEquals(results.size(), 2);
assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")

@@ -34,26 +34,26 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Tests for {@link HoodieStorageWriterFactory}.
* Tests for {@link HoodieFileWriterFactory}.
*/
public class TestHoodieStorageWriterFactory extends HoodieClientTestBase {
public class TestHoodieFileWriterFactory extends HoodieClientTestBase {
@Test
public void testGetStorageWriter() throws IOException {
public void testGetFileWriter() throws IOException {
// parquet file format.
final String instantTime = "100";
final Path parquetPath = new Path(basePath + "/partition/path/f1_1-0-1_000.parquet");
final HoodieWriteConfig cfg = getConfig();
HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
HoodieStorageWriter<IndexedRecord> parquetWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime,
HoodieFileWriter<IndexedRecord> parquetWriter = HoodieFileWriterFactory.getFileWriter(instantTime,
parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
assertTrue(parquetWriter instanceof HoodieParquetWriter);
// other file format exception.
final Path logPath = new Path(basePath + "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> {
HoodieStorageWriter<IndexedRecord> logWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime, logPath,
HoodieFileWriter<IndexedRecord> logWriter = HoodieFileWriterFactory.getFileWriter(instantTime, logPath,
table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
}, "should fail since log storage writer is not supported yet.");
assertTrue(thrown.getMessage().contains("format not supported yet."));

@@ -24,6 +24,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -46,7 +47,9 @@ import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.index.HoodieIndex;
@@ -66,9 +69,9 @@ import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
import java.io.IOException;
@@ -90,37 +93,32 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
private HoodieParquetInputFormat roSnapshotInputFormat;
private JobConf roSnapshotJobConf;
private HoodieParquetInputFormat roInputFormat;
private JobConf roJobConf;
private HoodieParquetRealtimeInputFormat rtInputFormat;
private JobConf rtJobConf;
@BeforeEach
public void init() throws IOException {
@TempDir
public java.nio.file.Path tempFolder;
private HoodieFileFormat baseFileFormat;
static Stream<HoodieFileFormat> argumentsProvider() {
return Stream.of(HoodieFileFormat.PARQUET);
}
public void init(HoodieFileFormat baseFileFormat) throws IOException {
this.baseFileFormat = baseFileFormat;
initDFS();
initSparkContexts("TestHoodieMergeOnReadTable");
hadoopConf.addResource(dfs.getConf());
initPath();
dfs.mkdirs(new Path(basePath));
HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, baseFileFormat);
initTestDataGenerator();
// initialize parquet input format
roSnapshotInputFormat = new HoodieParquetInputFormat();
roSnapshotJobConf = new JobConf(jsc.hadoopConfiguration());
roSnapshotInputFormat.setConf(roSnapshotJobConf);
roInputFormat = new HoodieParquetInputFormat();
roSnapshotJobConf = new JobConf(hadoopConf);
roJobConf = new JobConf(hadoopConf);
roInputFormat.setConf(roJobConf);
rtInputFormat = new HoodieParquetRealtimeInputFormat();
rtJobConf = new JobConf(hadoopConf);
rtInputFormat.setConf(rtJobConf);
}
@AfterEach
@@ -128,8 +126,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
cleanupResources();
}
@Test
public void testSimpleInsertAndUpdate() throws Exception {
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testSimpleInsertAndUpdate(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -153,9 +154,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString();
client.compact(compactionCommitTime);
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent());
@@ -174,8 +176,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// test incremental read does not go past compaction instant for RO views
// For RT views, incremental read can go past compaction
@Test
public void testIncrementalReadsWithCompaction() throws Exception {
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testIncrementalReadsWithCompaction(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
String partitionPath = "2020/02/20"; // use only one partition for this test
dataGen = new HoodieTestDataGenerator(new String[] { partitionPath });
HoodieWriteConfig cfg = getConfig(true);
@@ -190,19 +195,17 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
List<HoodieRecord> records001 = dataGen.generateInserts(commitTime1, 200);
insertAndGetFilePaths(records001, client, cfg, commitTime1);
// verify only one parquet file shows up with commit time 001
// verify only one base file shows up with commit time 001
FileStatus[] snapshotROFiles = getROSnapshotFiles(partitionPath);
validateFiles(partitionPath,1, snapshotROFiles, roSnapshotInputFormat,
roSnapshotJobConf,200, commitTime1);
validateFiles(partitionPath, 1, snapshotROFiles, false, roSnapshotJobConf, 200, commitTime1);
FileStatus[] incrementalROFiles = getROIncrementalFiles(partitionPath, true);
validateFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
roJobConf,200, commitTime1);
validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1);
Path firstFilePath = incrementalROFiles[0].getPath();
FileStatus[] incrementalRTFiles = getRTIncrementalFiles(partitionPath);
validateFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
rtJobConf,200, commitTime1);
validateFiles(partitionPath, 1, incrementalRTFiles, true, rtJobConf,200, commitTime1);
assertEquals(firstFilePath, incrementalRTFiles[0].getPath());
/**
@@ -215,14 +218,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// verify RO incremental reads - only one parquet file shows up because updates to into log files
incrementalROFiles = getROIncrementalFiles(partitionPath, false);
validateFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
roJobConf, 200, commitTime1);
validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1);
assertEquals(firstFilePath, incrementalROFiles[0].getPath());
// verify RT incremental reads includes updates also
incrementalRTFiles = getRTIncrementalFiles(partitionPath);
validateFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
rtJobConf, 200, commitTime1, updateTime);
validateFiles(partitionPath, 1, incrementalRTFiles, true, rtJobConf, 200, commitTime1, updateTime);
// request compaction, but do not perform compaction
String compactionCommitTime = "005";
@@ -230,13 +231,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// verify RO incremental reads - only one parquet file shows up because updates go into log files
incrementalROFiles = getROIncrementalFiles(partitionPath, true);
validateFiles(partitionPath,1, incrementalROFiles, roInputFormat,
roJobConf, 200, commitTime1);
validateFiles(partitionPath,1, incrementalROFiles, false, roJobConf, 200, commitTime1);
// verify RT incremental reads includes updates also
incrementalRTFiles = getRTIncrementalFiles(partitionPath);
validateFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
rtJobConf, 200, commitTime1, updateTime);
validateFiles(partitionPath, 1, incrementalRTFiles, true, rtJobConf, 200, commitTime1, updateTime);
// write 3 - more inserts
String insertsTime = "006";
@@ -246,44 +245,44 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// verify new write shows up in snapshot mode even though there is pending compaction
snapshotROFiles = getROSnapshotFiles(partitionPath);
validateFiles(partitionPath, 2, snapshotROFiles, roSnapshotInputFormat,
roSnapshotJobConf,400, commitTime1, insertsTime);
validateFiles(partitionPath, 2, snapshotROFiles, false, roSnapshotJobConf,400, commitTime1, insertsTime);
incrementalROFiles = getROIncrementalFiles(partitionPath, true);
assertEquals(firstFilePath, incrementalROFiles[0].getPath());
// verify 006 does not show up in RO mode because of pending compaction
validateFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
roJobConf, 200, commitTime1);
validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1);
// verify that if stopAtCompaction is disabled, inserts from "insertsTime" show up
incrementalROFiles = getROIncrementalFiles(partitionPath, false);
validateFiles(partitionPath,2, incrementalROFiles, roInputFormat,
roJobConf, 400, commitTime1, insertsTime);
validateFiles(partitionPath,2, incrementalROFiles, false, roJobConf, 400, commitTime1, insertsTime);
// verify 006 shows up in RT views
incrementalRTFiles = getRTIncrementalFiles(partitionPath);
validateFiles(partitionPath, 2, incrementalRTFiles, rtInputFormat,
rtJobConf, 400, commitTime1, updateTime, insertsTime);
validateFiles(partitionPath, 2, incrementalRTFiles, true, rtJobConf, 400, commitTime1, updateTime, insertsTime);
// perform the scheduled compaction
client.compact(compactionCommitTime);
// verify new write shows up in snapshot mode after compaction is complete
snapshotROFiles = getROSnapshotFiles(partitionPath);
validateFiles(partitionPath,2, snapshotROFiles, roSnapshotInputFormat,
roSnapshotJobConf,400, commitTime1, compactionCommitTime, insertsTime);
validateFiles(partitionPath,2, snapshotROFiles, false, roSnapshotJobConf,400, commitTime1, compactionCommitTime,
insertsTime);
incrementalROFiles = getROIncrementalFiles(partitionPath, "002", -1, true);
assertTrue(incrementalROFiles.length == 2);
// verify 006 shows up because of pending compaction
validateFiles(partitionPath, 2, incrementalROFiles, roInputFormat,
roJobConf, 400, commitTime1, compactionCommitTime, insertsTime);
validateFiles(partitionPath, 2, incrementalROFiles, false, roJobConf, 400, commitTime1, compactionCommitTime,
insertsTime);
}
}
// Check if record level metadata is aggregated properly at the end of write.
@Test
public void testMetadataAggregateFromWriteStatus() throws Exception {
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testMetadataAggregateFromWriteStatus(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -305,8 +304,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
@Test
public void testSimpleInsertUpdateAndDelete() throws Exception {
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testSimpleInsertUpdateAndDelete(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -332,7 +334,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
@@ -373,23 +375,25 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent());
List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, basePath);
// Wrote 20 records and deleted 20 records, so remaining 20-20 = 0
assertEquals(0, recordsRead.size(), "Must contain 0 records");
}
}
@Test
public void testCOWToMORConvertedTableRollback() throws Exception {
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testCOWToMORConvertedTableRollback(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
// Set TableType to COW
HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE, baseFileFormat);
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -425,14 +429,14 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
assertNoWriteErrors(statuses);
// Set TableType to MOR
HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, baseFileFormat);
// rollback a COW commit when TableType is MOR
client.rollback(newCommitTime);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
final String absentCommit = newCommitTime;
@@ -440,8 +444,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
@Test
public void testRollbackWithDeltaAndCompactionCommit() throws Exception {
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testRollbackWithDeltaAndCompactionCommit(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
HoodieWriteConfig cfg = getConfig(false);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -471,7 +477,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
tableView =
getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
@@ -495,7 +501,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));
List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles,
basePath);
assertEquals(recordsRead.size(), 200);
statuses = secondClient.upsert(jsc.parallelize(copyOfRecords, 1), commitTime1).collect();
@@ -504,12 +511,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// Test failed delta commit rollback
secondClient.rollback(commitTime1);
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
// After rollback, there should be no parquet file with the failed commit time
allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
// After rollback, there should be no base file with the failed commit time
assertEquals(0, Arrays.stream(allFiles)
.filter(file -> file.getPath().getName().contains(commitTime1)).count());
dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, basePath);
assertEquals(200, recordsRead.size());
}
@@ -525,7 +532,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200));
List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles,
basePath);
assertEquals(200, recordsRead.size());
writeRecords = jsc.parallelize(copyOfRecords, 1);
@@ -537,7 +545,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// Test successful delta commit rollback
thirdClient.rollback(commitTime2);
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
// After rollback, there should be no parquet file with the failed commit time
assertEquals(0, Arrays.stream(allFiles)
.filter(file -> file.getPath().getName().contains(commitTime2)).count());
@@ -546,7 +554,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, basePath);
// check that the number of records read is still correct after rollback operation
assertEquals(200, recordsRead.size());
@@ -569,7 +577,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
JavaRDD<WriteStatus> ws = thirdClient.compact(compactionInstantTime);
thirdClient.commitCompaction(compactionInstantTime, ws, Option.empty());
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
@@ -580,7 +588,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
thirdClient.rollback(compactedCommitTime);
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
@@ -589,8 +597,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
@Test
public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testMultiRollbackWithDeltaAndCompactionCommit(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
HoodieWriteConfig cfg = getConfig(false);
try (final HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -618,7 +628,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
@@ -641,7 +651,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));
List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles,
basePath);
assertEquals(200, recordsRead.size());
statuses = nClient.upsert(jsc.parallelize(copyOfRecords, 1), newCommitTime).collect();
@@ -696,7 +707,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
JavaRDD<WriteStatus> ws = client.compact(compactionInstantTime);
client.commitCompaction(compactionInstantTime, ws, Option.empty());
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
@@ -724,7 +735,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
client.restoreToInstant("000");
metaClient = HoodieTableMetaClient.reload(metaClient);
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
dataFilesToRead = tableView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
@@ -751,8 +762,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
.build();
}
@Test
public void testUpsertPartitioner() throws Exception {
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testUpsertPartitioner(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -778,7 +792,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient,
metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
@@ -812,7 +826,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
roView = getHoodieTableFileSystemView(metaClient,
hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles();
@@ -823,14 +837,18 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
assertTrue(parquetFileIdToNewSize.entrySet().stream().anyMatch(entry -> parquetFileIdToSize.get(entry.getKey()) < entry.getValue()));
List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles,
basePath);
// Wrote 20 records in 2 batches
assertEquals(40, recordsRead.size(), "Must contain 40 records");
}
}
@Test
public void testLogFileCountsAfterCompaction() throws Exception {
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testLogFileCountsAfterCompaction(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
// insert 100 records
HoodieWriteConfig config = getConfig(true);
try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
@@ -902,8 +920,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
@Test
public void testSimpleInsertsGeneratedIntoLogFiles() throws Exception {
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testSimpleInsertsGeneratedIntoLogFiles(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
// insert 100 records
// Setting IndexType to be InMemory to simulate Global Index nature
HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
@@ -939,8 +960,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
@Test
public void testInsertsGeneratedIntoLogFilesRollback(@TempDir java.nio.file.Path tempFolder) throws Exception {
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testInsertsGeneratedIntoLogFilesRollback(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
// insert 100 records
// Setting IndexType to be InMemory to simulate Global Index nature
HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
@@ -1010,8 +1034,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
@Test
public void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction() throws Exception {
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
// insert 100 records
// Setting IndexType to be InMemory to simulate Global Index nature
HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
@@ -1063,8 +1090,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
/**
* Test to ensure rolling stats are correctly written to metadata file.
*/
@Test
public void testRollingStatsInMetadata() throws Exception {
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testRollingStatsInMetadata(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -1163,8 +1192,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
/**
* Test to ensure rolling stats are correctly written to the metadata file, identifies small files and corrects them.
*/
@Test
public void testRollingStatsWithSmallFileHandling() throws Exception {
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testRollingStatsWithSmallFileHandling(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
Map<String, Long> fileIdToInsertsMap = new HashMap<>();
@@ -1296,8 +1328,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
/**
* Test to validate invoking table.handleUpdate() with input records from multiple partitions will fail.
*/
@Test
public void testHandleUpdateWithMultiplePartitions() throws Exception {
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testHandleUpdateWithMultiplePartitions(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -1323,7 +1358,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
BaseFileOnlyView roView =
getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
@@ -1401,7 +1436,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
assertFalse(status.hasErrors(), "Errors found in write of " + status.getFileId());
}
}
private FileStatus[] insertAndGetFilePaths(List<HoodieRecord> records, HoodieWriteClient client,
HoodieWriteConfig cfg, String commitTime) throws IOException {
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
@@ -1419,7 +1454,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().lastInstant();
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
BaseFileOnlyView roView =
getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
@@ -1452,14 +1487,14 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
return HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
return listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
}
private FileStatus[] getROSnapshotFiles(String partitionPath)
throws Exception {
HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
FileInputFormat.setInputPaths(roSnapshotJobConf, basePath + "/" + partitionPath);
return roSnapshotInputFormat.listStatus(roSnapshotJobConf);
return listStatus(roSnapshotJobConf, false);
}
private FileStatus[] getROIncrementalFiles(String partitionPath, boolean stopAtCompaction)
@@ -1469,10 +1504,9 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
private FileStatus[] getROIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull, boolean stopAtCompaction)
throws Exception {
HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
setupIncremental(roJobConf, startCommitTime, numCommitsToPull, stopAtCompaction);
FileInputFormat.setInputPaths(roJobConf, Paths.get(basePath, partitionPath).toString());
return roInputFormat.listStatus(roJobConf);
return listStatus(roJobConf, false);
}
private FileStatus[] getRTIncrementalFiles(String partitionPath)
@@ -1482,10 +1516,9 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
private FileStatus[] getRTIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull)
throws Exception {
HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
setupIncremental(rtJobConf, startCommitTime, numCommitsToPull, false);
FileInputFormat.setInputPaths(rtJobConf, Paths.get(basePath, partitionPath).toString());
return rtInputFormat.listStatus(rtJobConf);
return listStatus(rtJobConf, true);
}
private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, boolean stopAtCompaction) {
@@ -1507,16 +1540,37 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
private void validateFiles(String partitionPath, int expectedNumFiles,
FileStatus[] files, HoodieParquetInputFormat inputFormat,
JobConf jobConf, int expectedRecords, String... expectedCommits) {
FileStatus[] files, boolean realtime, JobConf jobConf,
int expectedRecords, String... expectedCommits) {
assertEquals(expectedNumFiles, files.length);
Set<String> expectedCommitsSet = Arrays.stream(expectedCommits).collect(Collectors.toSet());
List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
Collections.singletonList(Paths.get(basePath, partitionPath).toString()), basePath, jobConf, inputFormat);
List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf,
Collections.singletonList(Paths.get(basePath, partitionPath).toString()), basePath, jobConf, realtime);
assertEquals(expectedRecords, records.size());
Set<String> actualCommits = records.stream().map(r ->
r.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()).collect(Collectors.toSet());
assertEquals(expectedCommitsSet, actualCommits);
}
private FileStatus[] listAllDataFilesInPath(HoodieTable table, String basePath) throws IOException {
return HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), basePath, table.getBaseFileExtension());
}
private FileStatus[] listStatus(JobConf jobConf, boolean realtime) throws IOException {
// This is required as Hoodie InputFormats do not extend a common base class and FileInputFormat's
// listStatus() is protected.
FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(baseFileFormat, realtime, jobConf);
switch (baseFileFormat) {
case PARQUET:
if (realtime) {
return ((HoodieParquetRealtimeInputFormat)inputFormat).listStatus(jobConf);
} else {
return ((HoodieParquetInputFormat)inputFormat).listStatus(jobConf);
}
default:
throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat);
}
}
}
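The parameterized tests above pull their HoodieFileFormat from a JUnit 5 @MethodSource named argumentsProvider. A minimal sketch of such a provider is shown below; the method name comes from the diff, but the body and the wrapper class are assumptions, not part of this commit.

import java.util.stream.Stream;

import org.apache.hudi.common.model.HoodieFileFormat;
import org.junit.jupiter.params.provider.Arguments;

public class ArgumentsProviderSketch {
  // Hypothetical provider backing @MethodSource("argumentsProvider") in the tests above.
  // In practice it would be a static member of TestHoodieMergeOnReadTable; only PARQUET is
  // listed here, with further formats added as they gain input format support.
  private static Stream<Arguments> argumentsProvider() {
    return Stream.of(Arguments.of(HoodieFileFormat.PARQUET));
  }
}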
View File
@@ -29,6 +29,7 @@ import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -181,15 +182,20 @@ public class HoodieClientTestUtils {
/**
* Obtain all new data written into the Hoodie table since the given timestamp.
*/
public static Dataset<Row> readSince(String basePath, SQLContext sqlContext, HoodieTimeline commitTimeline,
String lastCommitTime) {
public static Dataset<Row> readSince(String basePath, SQLContext sqlContext,
HoodieTimeline commitTimeline, String lastCommitTime) {
List<HoodieInstant> commitsToReturn =
commitTimeline.findInstantsAfter(lastCommitTime, Integer.MAX_VALUE).getInstants().collect(Collectors.toList());
try {
// Go over the commit metadata, and obtain the new files that need to be read.
HashMap<String, String> fileIdToFullPath = getLatestFileIDsToFullPath(basePath, commitTimeline, commitsToReturn);
return sqlContext.read().parquet(fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()]))
.filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTime));
String[] paths = fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()]);
Dataset<Row> rows = null;
if (paths[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
rows = sqlContext.read().parquet(paths);
}
return rows.filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTime));
} catch (IOException e) {
throw new HoodieException("Error pulling data incrementally from commitTimestamp :" + lastCommitTime, e);
}
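For context, a hedged sketch of calling the reworked readSince from a test follows. Only the four-argument readSince signature comes from the diff; the Spark session wiring, the HoodieClientTestUtils package location, and the timeline setup shown here are assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.testutils.HoodieClientTestUtils; // package location assumed
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadSinceSketch {
  // Returns rows written after lastCommitTime, loading the latest base files per file group
  // in whatever base file format the helper recognizes (currently Parquet, per the hunk above).
  public static Dataset<Row> newRowsSince(SparkSession spark, String basePath, String lastCommitTime) {
    Configuration hadoopConf = spark.sparkContext().hadoopConfiguration();
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
    HoodieTimeline completedCommits =
        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
    return HoodieClientTestUtils.readSince(basePath, spark.sqlContext(), completedCommits, lastCommitTime);
  }
}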
View File
@@ -19,18 +19,19 @@
package org.apache.hudi.testutils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -45,20 +46,19 @@ import java.util.stream.Collectors;
* Utility methods to aid in testing MergeOnRead (workaround for HoodieReadClient for MOR).
*/
public class HoodieMergeOnReadTestUtils {
public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths, String basePath) {
return getRecordsUsingInputFormat(inputPaths, basePath, new Configuration());
public static List<GenericRecord> getRecordsUsingInputFormat(Configuration conf, List<String> inputPaths,
String basePath) {
return getRecordsUsingInputFormat(conf, inputPaths, basePath, new JobConf(conf), true);
}
public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths, String basePath,
Configuration conf) {
JobConf jobConf = new JobConf(conf);
return getRecordsUsingInputFormat(inputPaths, basePath, jobConf, new HoodieParquetRealtimeInputFormat());
}
public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths,
public static List<GenericRecord> getRecordsUsingInputFormat(Configuration conf, List<String> inputPaths,
String basePath,
JobConf jobConf,
HoodieParquetInputFormat inputFormat) {
boolean realtime) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf, basePath);
FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(metaClient.getTableConfig().getBaseFileFormat(),
realtime, jobConf);
Schema schema = HoodieAvroUtils.addMetadataFields(
new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
setPropsForInputFormat(inputFormat, jobConf, schema, basePath);
@@ -93,8 +93,8 @@ public class HoodieMergeOnReadTestUtils {
}).orElse(new ArrayList<GenericRecord>());
}
private static void setPropsForInputFormat(HoodieParquetInputFormat inputFormat, JobConf jobConf,
Schema schema, String basePath) {
private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf jobConf, Schema schema,
String basePath) {
List<Schema.Field> fields = schema.getFields();
String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
@@ -116,7 +116,10 @@ public class HoodieMergeOnReadTestUtils {
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
conf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
conf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, hiveColumnTypes);
inputFormat.setConf(conf);
// Hoodie Input formats are also configurable
Configurable configurable = (Configurable)inputFormat;
configurable.setConf(conf);
jobConf.addResource(conf);
}
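Finally, a minimal sketch of driving the reworked test utility. The three-argument getRecordsUsingInputFormat signature and the org.apache.hudi.testutils package come from the diff above; the table and partition paths are assumptions.

import java.util.Collections;
import java.util.List;

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;

public class MergeOnReadReadSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    String basePath = "/tmp/hoodie/sample_mor_table";          // assumed table location
    List<String> partitionPaths =
        Collections.singletonList(basePath + "/2020/06/25");   // assumed partition path
    // The utility now resolves the table's base file format from its table config and picks
    // the matching (realtime) input format, so callers no longer pass an input format instance.
    List<GenericRecord> records =
        HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(conf, partitionPaths, basePath);
    System.out.println("records read: " + records.size());
  }
}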