[HUDI-1055] Remove hardcoded parquet in tests (#2740)
* Remove hardcoded parquet in tests * Use DataFileUtils.getInstance * Renaming DataFileUtils to BaseFileUtils Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
@@ -25,8 +25,9 @@ import org.apache.hudi.avro.HoodieAvroWriteSupport
|
||||
import org.apache.hudi.client.SparkTaskContextSupplier
|
||||
import org.apache.hudi.common.HoodieJsonPayload
|
||||
import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
|
||||
import org.apache.hudi.common.model.HoodieFileFormat
|
||||
import org.apache.hudi.common.model.HoodieRecord
|
||||
import org.apache.hudi.common.util.ParquetUtils
|
||||
import org.apache.hudi.common.util.BaseFileUtils
|
||||
import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
|
||||
import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieParquetWriter}
|
||||
import org.apache.parquet.avro.AvroSchemaConverter
|
||||
@@ -40,7 +41,7 @@ import scala.collection.mutable._
|
||||
object SparkHelpers {
|
||||
@throws[Exception]
|
||||
def skipKeysAndWriteNewFile(instantTime: String, fs: FileSystem, sourceFile: Path, destinationFile: Path, keysToSkip: Set[String]) {
|
||||
val sourceRecords = ParquetUtils.readAvroRecords(fs.getConf, sourceFile)
|
||||
val sourceRecords = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readAvroRecords(fs.getConf, sourceFile)
|
||||
val schema: Schema = sourceRecords.get(0).getSchema
|
||||
val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble,
|
||||
HoodieIndexConfig.DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_INDEX_FILTER_TYPE);
|
||||
@@ -125,7 +126,7 @@ class SparkHelper(sqlContext: SQLContext, fs: FileSystem) {
|
||||
* @return
|
||||
*/
|
||||
def fileKeysAgainstBF(conf: Configuration, sqlContext: SQLContext, file: String): Boolean = {
|
||||
val bf = ParquetUtils.readBloomFilterFromParquetMetadata(conf, new Path(file))
|
||||
val bf = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readBloomFilterFromMetadata(conf, new Path(file))
|
||||
val foundCount = sqlContext.parquetFile(file)
|
||||
.select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`")
|
||||
.collect().count(r => !bf.mightContain(r.getString(0)))
|
||||
|
||||
@@ -70,7 +70,7 @@ public class TestRollbacksCommand extends AbstractShellIntegrationTest {
|
||||
tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
|
||||
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
|
||||
metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
|
||||
//Create some commits files and parquet files
|
||||
//Create some commits files and base files
|
||||
Map<String, String> partitionAndFileId = new HashMap<String, String>() {
|
||||
{
|
||||
put(DEFAULT_FIRST_PARTITION_PATH, "file-1");
|
||||
|
||||
@@ -60,7 +60,7 @@ public class TestUpgradeDowngradeCommand extends AbstractShellIntegrationTest {
|
||||
tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
|
||||
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
|
||||
metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
|
||||
//Create some commits files and parquet files
|
||||
//Create some commits files and base files
|
||||
HoodieTestTable.of(metaClient)
|
||||
.withPartitionMetaFiles(DEFAULT_PARTITION_PATHS)
|
||||
.addCommit("100")
|
||||
|
||||
@@ -71,7 +71,7 @@ public class ITTestCommitsCommand extends AbstractShellIntegrationTest {
|
||||
*/
|
||||
@Test
|
||||
public void testRollbackCommit() throws Exception {
|
||||
//Create some commits files and parquet files
|
||||
//Create some commits files and base files
|
||||
Map<String, String> partitionAndFileId = new HashMap<String, String>() {
|
||||
{
|
||||
put(DEFAULT_FIRST_PARTITION_PATH, "file-1");
|
||||
|
||||
@@ -22,7 +22,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.util.ParquetUtils;
|
||||
import org.apache.hudi.common.util.BaseFileUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
@@ -48,7 +48,8 @@ public class HoodieKeyLocationFetchHandle<T extends HoodieRecordPayload, I, K, O
|
||||
|
||||
public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
|
||||
HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
|
||||
return ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hoodieTable.getHadoopConf(), new Path(baseFile.getPath())).stream()
|
||||
return BaseFileUtils.getInstance(baseFile.getPath()).fetchRecordKeyPartitionPath(
|
||||
hoodieTable.getHadoopConf(), new Path(baseFile.getPath())).stream()
|
||||
.map(entry -> Pair.of(entry,
|
||||
new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId())));
|
||||
}
|
||||
|
||||
@@ -135,7 +135,7 @@ public class RollbackUtils {
|
||||
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1).empty();
|
||||
if (higherDeltaCommits) {
|
||||
// Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled
|
||||
// and has not yet finished. In this scenario we should delete only the newly created parquet files
|
||||
// and has not yet finished. In this scenario we should delete only the newly created base files
|
||||
// and not corresponding base commit log files created with this as baseCommit since updates would
|
||||
// have been written to the log files.
|
||||
LOG.info("Rolling back compaction. There are higher delta commits. So only deleting data files");
|
||||
@@ -168,13 +168,13 @@ public class RollbackUtils {
|
||||
// ---------------------------------------------------------------------------------------------------
|
||||
// (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal
|
||||
// ---------------------------------------------------------------------------------------------------
|
||||
// (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries.
|
||||
// In this scenario, we delete all the parquet files written for the failed commit.
|
||||
// (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In
|
||||
// (B.1) Failed first commit - Inserts were written to base files and HoodieWriteStat has no entries.
|
||||
// In this scenario, we delete all the base files written for the failed commit.
|
||||
// (B.2) Failed recurring commits - Inserts were written to base files and updates to log files. In
|
||||
// this scenario, perform (A.1) and for updates written to log files, write rollback blocks.
|
||||
// (B.3) Rollback triggered for first commit - Same as (B.1)
|
||||
// (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files
|
||||
// as well if the base parquet file gets deleted.
|
||||
// as well if the base base file gets deleted.
|
||||
try {
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
|
||||
table.getMetaClient().getCommitTimeline()
|
||||
@@ -183,7 +183,7 @@ public class RollbackUtils {
|
||||
HoodieCommitMetadata.class);
|
||||
|
||||
// In case all data was inserts and the commit failed, delete the file belonging to that commit
|
||||
// We do not know fileIds for inserts (first inserts are either log files or parquet files),
|
||||
// We do not know fileIds for inserts (first inserts are either log files or base files),
|
||||
// delete all files for the corresponding failed commit, if present (same as COW)
|
||||
partitionRollbackRequests.add(
|
||||
ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
|
||||
@@ -211,7 +211,7 @@ public class RollbackUtils {
|
||||
// wStat.getPrevCommit() might not give the right commit time in the following
|
||||
// scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be
|
||||
// used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
|
||||
// But the index (global) might store the baseCommit of the parquet and not the requested, hence get the
|
||||
// But the index (global) might store the baseCommit of the base and not the requested, hence get the
|
||||
// baseCommit always by listing the file slice
|
||||
Map<String, String> fileIdToBaseCommitTimeForLogMap = table.getSliceView().getLatestFileSlices(partitionPath)
|
||||
.collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
|
||||
|
||||
@@ -221,7 +221,7 @@ public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness {
|
||||
HoodieRecord record4 =
|
||||
new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4);
|
||||
|
||||
// We write record1, record2 to a parquet file, but the bloom filter contains (record1,
|
||||
// We write record1, record2 to a base file, but the bloom filter contains (record1,
|
||||
// record2, record3).
|
||||
BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
|
||||
filter.add(record3.getRecordKey());
|
||||
@@ -311,7 +311,7 @@ public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness {
|
||||
assertFalse(record.isCurrentLocationKnown());
|
||||
}
|
||||
|
||||
// We create three parquet file, each having one record. (two different partitions)
|
||||
// We create three base file, each having one record. (two different partitions)
|
||||
String fileId1 = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1);
|
||||
String fileId2 = testTable.addCommit("002").getFileIdWithInserts("2016/01/31", record2);
|
||||
String fileId3 = testTable.addCommit("003").getFileIdWithInserts("2015/01/31", record4);
|
||||
@@ -385,7 +385,7 @@ public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness {
|
||||
assertTrue(!record.isPresent());
|
||||
}
|
||||
|
||||
// We create three parquet file, each having one record. (two different partitions)
|
||||
// We create three base file, each having one record. (two different partitions)
|
||||
String fileId1 = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1);
|
||||
String fileId2 = testTable.addCommit("002").getFileIdWithInserts("2016/01/31", record2);
|
||||
String fileId3 = testTable.addCommit("003").getFileIdWithInserts("2015/01/31", record4);
|
||||
@@ -433,7 +433,7 @@ public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness {
|
||||
String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
|
||||
+ "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
|
||||
|
||||
// We write record1 to a parquet file, using a bloom filter having both records
|
||||
// We write record1 to a base file, using a bloom filter having both records
|
||||
RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1);
|
||||
HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1);
|
||||
RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2);
|
||||
|
||||
@@ -31,8 +31,8 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||
import org.apache.hudi.common.testutils.RawTripTestPayload;
|
||||
import org.apache.hudi.common.testutils.Transformations;
|
||||
import org.apache.hudi.common.util.BaseFileUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ParquetUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.config.HoodieStorageConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
@@ -125,6 +125,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase
|
||||
HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
|
||||
writeClient.startCommitWithTime(firstCommitTime);
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
|
||||
|
||||
String partitionPath = "2016/01/31";
|
||||
HoodieJavaCopyOnWriteTable table = (HoodieJavaCopyOnWriteTable) HoodieJavaTable.create(config, context, metaClient);
|
||||
@@ -155,14 +156,14 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase
|
||||
assertEquals(1, allFiles.length);
|
||||
|
||||
// Read out the bloom filter and make sure filter can answer record exist or not
|
||||
Path parquetFilePath = allFiles[0].getPath();
|
||||
BloomFilter filter = ParquetUtils.readBloomFilterFromParquetMetadata(hadoopConf, parquetFilePath);
|
||||
Path filePath = allFiles[0].getPath();
|
||||
BloomFilter filter = fileUtils.readBloomFilterFromMetadata(hadoopConf, filePath);
|
||||
for (HoodieRecord record : records) {
|
||||
assertTrue(filter.mightContain(record.getRecordKey()));
|
||||
}
|
||||
|
||||
// Read the parquet file, check the record content
|
||||
List<GenericRecord> fileRecords = ParquetUtils.readAvroRecords(hadoopConf, parquetFilePath);
|
||||
// Read the base file, check the record content
|
||||
List<GenericRecord> fileRecords = fileUtils.readAvroRecords(hadoopConf, filePath);
|
||||
GenericRecord newRecord;
|
||||
int index = 0;
|
||||
for (GenericRecord record : fileRecords) {
|
||||
@@ -193,12 +194,12 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase
|
||||
allFiles = getIncrementalFiles(partitionPath, firstCommitTime, -1);
|
||||
assertEquals(1, allFiles.length);
|
||||
// verify new incremental file group is same as the previous one
|
||||
assertEquals(FSUtils.getFileId(parquetFilePath.getName()), FSUtils.getFileId(allFiles[0].getPath().getName()));
|
||||
assertEquals(FSUtils.getFileId(filePath.getName()), FSUtils.getFileId(allFiles[0].getPath().getName()));
|
||||
|
||||
// Check whether the record has been updated
|
||||
Path updatedParquetFilePath = allFiles[0].getPath();
|
||||
Path updatedfilePath = allFiles[0].getPath();
|
||||
BloomFilter updatedFilter =
|
||||
ParquetUtils.readBloomFilterFromParquetMetadata(hadoopConf, updatedParquetFilePath);
|
||||
fileUtils.readBloomFilterFromMetadata(hadoopConf, updatedfilePath);
|
||||
for (HoodieRecord record : records) {
|
||||
// No change to the _row_key
|
||||
assertTrue(updatedFilter.mightContain(record.getRecordKey()));
|
||||
@@ -207,7 +208,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase
|
||||
assertTrue(updatedFilter.mightContain(insertedRecord1.getRecordKey()));
|
||||
records.add(insertedRecord1);// add this so it can further check below
|
||||
|
||||
ParquetReader updatedReader = ParquetReader.builder(new AvroReadSupport<>(), updatedParquetFilePath).build();
|
||||
ParquetReader updatedReader = ParquetReader.builder(new AvroReadSupport<>(), updatedfilePath).build();
|
||||
index = 0;
|
||||
while ((newRecord = (GenericRecord) updatedReader.read()) != null) {
|
||||
assertEquals(newRecord.get("_row_key").toString(), records.get(index).getRecordKey());
|
||||
@@ -397,7 +398,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase
|
||||
// Check the updated file
|
||||
int counts = 0;
|
||||
for (File file : Paths.get(basePath, "2016/01/31").toFile().listFiles()) {
|
||||
if (file.getName().endsWith(".parquet") && FSUtils.getCommitTime(file.getName()).equals(instantTime)) {
|
||||
if (file.getName().endsWith(table.getBaseFileExtension()) && FSUtils.getCommitTime(file.getName()).equals(instantTime)) {
|
||||
LOG.info(file.getName() + "-" + file.length());
|
||||
counts++;
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ public class HoodieSparkBootstrapSchemaProvider extends HoodieBootstrapSchemaPro
|
||||
MessageType parquetSchema = partitions.stream().flatMap(p -> p.getValue().stream()).map(fs -> {
|
||||
try {
|
||||
Path filePath = FileStatusUtils.toPath(fs.getPath());
|
||||
return ParquetUtils.readSchema(context.getHadoopConf().get(), filePath);
|
||||
return new ParquetUtils().readSchema(context.getHadoopConf().get(), filePath);
|
||||
} catch (Exception ex) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -163,7 +163,7 @@ public class TestClientRollback extends HoodieClientTestBase {
|
||||
*/
|
||||
@Test
|
||||
public void testRollbackCommit() throws Exception {
|
||||
// Let's create some commit files and parquet files
|
||||
// Let's create some commit files and base files
|
||||
final String p1 = "2016/05/01";
|
||||
final String p2 = "2016/05/02";
|
||||
final String p3 = "2016/05/06";
|
||||
@@ -251,7 +251,7 @@ public class TestClientRollback extends HoodieClientTestBase {
|
||||
*/
|
||||
@Test
|
||||
public void testAutoRollbackInflightCommit() throws Exception {
|
||||
// Let's create some commit files and parquet files
|
||||
// Let's create some commit files and base files
|
||||
final String p1 = "2016/05/01";
|
||||
final String p2 = "2016/05/02";
|
||||
final String p3 = "2016/05/06";
|
||||
|
||||
@@ -49,9 +49,9 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||
import org.apache.hudi.common.testutils.RawTripTestPayload;
|
||||
import org.apache.hudi.common.util.ClusteringUtils;
|
||||
import org.apache.hudi.common.util.CollectionUtils;
|
||||
import org.apache.hudi.common.util.BaseFileUtils;
|
||||
import org.apache.hudi.common.util.FileIOUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ParquetUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.config.HoodieClusteringConfig;
|
||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
@@ -115,7 +115,6 @@ import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.NULL_SCHE
|
||||
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
|
||||
import static org.apache.hudi.common.testutils.Transformations.randomSelectAsHoodieKeys;
|
||||
import static org.apache.hudi.common.testutils.Transformations.recordsToRecordKeySet;
|
||||
import static org.apache.hudi.common.util.ParquetUtils.readRowKeysFromParquet;
|
||||
import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY;
|
||||
import static org.apache.hudi.config.HoodieClusteringConfig.DEFAULT_CLUSTERING_EXECUTION_STRATEGY_CLASS;
|
||||
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
|
||||
@@ -424,23 +423,24 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
|
||||
String basePathStr = basePath;
|
||||
HoodieTable table = getHoodieTable(metaClient, cfg);
|
||||
String extension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
|
||||
jsc.parallelize(Arrays.asList(1)).map(e -> {
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
||||
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(
|
||||
metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get()).get(),
|
||||
HoodieCommitMetadata.class);
|
||||
String filePath = commitMetadata.getPartitionToWriteStats().values().stream()
|
||||
.flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(".parquet")).findAny()
|
||||
.flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(extension)).findAny()
|
||||
.map(ee -> ee.getPath()).orElse(null);
|
||||
String partitionPath = commitMetadata.getPartitionToWriteStats().values().stream()
|
||||
.flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(".parquet")).findAny()
|
||||
.flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(extension)).findAny()
|
||||
.map(ee -> ee.getPartitionPath()).orElse(null);
|
||||
Path parquetFilePath = new Path(basePathStr, filePath);
|
||||
HoodieBaseFile baseFile = new HoodieBaseFile(parquetFilePath.toString());
|
||||
Path baseFilePath = new Path(basePathStr, filePath);
|
||||
HoodieBaseFile baseFile = new HoodieBaseFile(baseFilePath.toString());
|
||||
|
||||
try {
|
||||
HoodieMergeHandle handle = new HoodieMergeHandle(cfg, instantTime, table, new HashMap<>(),
|
||||
partitionPath, FSUtils.getFileId(parquetFilePath.getName()), baseFile, new SparkTaskContextSupplier());
|
||||
partitionPath, FSUtils.getFileId(baseFilePath.getName()), baseFile, new SparkTaskContextSupplier());
|
||||
WriteStatus writeStatus = new WriteStatus(false, 0.0);
|
||||
writeStatus.setStat(new HoodieWriteStat());
|
||||
writeStatus.getStat().setNumWrites(0);
|
||||
@@ -454,7 +454,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
cfg.getProps().setProperty("hoodie.merge.data.validation.enabled", "true");
|
||||
HoodieWriteConfig cfg2 = HoodieWriteConfig.newBuilder().withProps(cfg.getProps()).build();
|
||||
HoodieMergeHandle handle = new HoodieMergeHandle(cfg2, newInstantTime, table, new HashMap<>(),
|
||||
partitionPath, FSUtils.getFileId(parquetFilePath.getName()), baseFile, new SparkTaskContextSupplier());
|
||||
partitionPath, FSUtils.getFileId(baseFilePath.getName()), baseFile, new SparkTaskContextSupplier());
|
||||
WriteStatus writeStatus = new WriteStatus(false, 0.0);
|
||||
writeStatus.setStat(new HoodieWriteStat());
|
||||
writeStatus.getStat().setNumWrites(0);
|
||||
@@ -850,6 +850,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max
|
||||
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
|
||||
SparkRDDWriteClient client = getHoodieWriteClient(config);
|
||||
BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
|
||||
|
||||
// Inserts => will write file1
|
||||
String commitTime1 = "001";
|
||||
@@ -865,7 +866,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
|
||||
String file1 = statuses.get(0).getFileId();
|
||||
assertEquals(100,
|
||||
readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath()))
|
||||
fileUtils.readRowKeys(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath()))
|
||||
.size(), "file should contain 100 records");
|
||||
|
||||
// Update + Inserts such that they just expand file1
|
||||
@@ -885,10 +886,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be expanded");
|
||||
assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded");
|
||||
Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
|
||||
assertEquals(140, readRowKeysFromParquet(hadoopConf, newFile).size(),
|
||||
assertEquals(140, fileUtils.readRowKeys(hadoopConf, newFile).size(),
|
||||
"file should contain 140 records");
|
||||
|
||||
List<GenericRecord> records = ParquetUtils.readAvroRecords(hadoopConf, newFile);
|
||||
List<GenericRecord> records = fileUtils.readAvroRecords(hadoopConf, newFile);
|
||||
for (GenericRecord record : records) {
|
||||
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
|
||||
assertEquals(commitTime2, record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(), "only expect commit2");
|
||||
@@ -919,7 +920,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
for (HoodieBaseFile file : files) {
|
||||
if (file.getFileName().contains(file1)) {
|
||||
assertEquals(commitTime3, file.getCommitTime(), "Existing file should be expanded");
|
||||
records = ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath()));
|
||||
records = fileUtils.readAvroRecords(hadoopConf, new Path(file.getPath()));
|
||||
for (GenericRecord record : records) {
|
||||
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
|
||||
String recordCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
|
||||
@@ -935,7 +936,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
assertEquals(0, keys2.size(), "All keys added in commit 2 must be updated in commit3 correctly");
|
||||
} else {
|
||||
assertEquals(commitTime3, file.getCommitTime(), "New file must be written for commit 3");
|
||||
records = ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath()));
|
||||
records = fileUtils.readAvroRecords(hadoopConf, new Path(file.getPath()));
|
||||
for (GenericRecord record : records) {
|
||||
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
|
||||
assertEquals(commitTime3, record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(),
|
||||
@@ -961,6 +962,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, false, mergeAllowDuplicateInserts); // hold upto 200 records max
|
||||
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
|
||||
SparkRDDWriteClient client = getHoodieWriteClient(config);
|
||||
BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
|
||||
|
||||
// Inserts => will write file1
|
||||
String commitTime1 = "001";
|
||||
@@ -974,7 +976,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
|
||||
String file1 = statuses.get(0).getFileId();
|
||||
assertEquals(100,
|
||||
readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath()))
|
||||
fileUtils.readRowKeys(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath()))
|
||||
.size(), "file should contain 100 records");
|
||||
|
||||
// Second, set of Inserts should just expand file1
|
||||
@@ -990,9 +992,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded");
|
||||
|
||||
Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
|
||||
assertEquals(140, readRowKeysFromParquet(hadoopConf, newFile).size(),
|
||||
assertEquals(140, fileUtils.readRowKeys(hadoopConf, newFile).size(),
|
||||
"file should contain 140 records");
|
||||
List<GenericRecord> records = ParquetUtils.readAvroRecords(hadoopConf, newFile);
|
||||
List<GenericRecord> records = fileUtils.readAvroRecords(hadoopConf, newFile);
|
||||
for (GenericRecord record : records) {
|
||||
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
|
||||
String recCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
|
||||
@@ -1011,8 +1013,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
assertNoWriteErrors(statuses);
|
||||
assertEquals(2, statuses.size(), "2 files needs to be committed.");
|
||||
assertEquals(340,
|
||||
readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())).size()
|
||||
+ readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(1).getStat().getPath())).size(),
|
||||
fileUtils.readRowKeys(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())).size()
|
||||
+ fileUtils.readRowKeys(hadoopConf, new Path(basePath, statuses.get(1).getStat().getPath())).size(),
|
||||
"file should contain 340 records");
|
||||
|
||||
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
|
||||
@@ -1024,7 +1026,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
int totalInserts = 0;
|
||||
for (HoodieBaseFile file : files) {
|
||||
assertEquals(commitTime3, file.getCommitTime(), "All files must be at commit 3");
|
||||
totalInserts += ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath())).size();
|
||||
totalInserts += fileUtils.readAvroRecords(hadoopConf, new Path(file.getPath())).size();
|
||||
}
|
||||
assertEquals(totalInserts, inserts1.size() + inserts2.size() + inserts3.size(), "Total number of records must add up");
|
||||
}
|
||||
@@ -1056,7 +1058,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
|
||||
String file1 = statuses.get(0).getFileId();
|
||||
assertEquals(100,
|
||||
readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath()))
|
||||
BaseFileUtils.getInstance(metaClient).readRowKeys(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath()))
|
||||
.size(), "file should contain 100 records");
|
||||
|
||||
// Delete 20 among 100 inserted
|
||||
@@ -1356,13 +1358,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify data in parquet files matches expected records and commit time.
|
||||
* Verify data in base files matches expected records and commit time.
|
||||
*/
|
||||
private void verifyRecordsWritten(String commitTime, List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus) {
|
||||
List<GenericRecord> records = new ArrayList<>();
|
||||
for (WriteStatus status : allStatus) {
|
||||
Path filePath = new Path(basePath, status.getStat().getPath());
|
||||
records.addAll(ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), filePath));
|
||||
records.addAll(BaseFileUtils.getInstance(metaClient).readAvroRecords(jsc.hadoopConfiguration(), filePath));
|
||||
}
|
||||
|
||||
Set<String> expectedKeys = recordsToRecordKeySet(expectedRecords);
|
||||
@@ -1410,7 +1412,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
}
|
||||
|
||||
private void testDeletes(SparkRDDWriteClient client, List<HoodieRecord> previousRecords, int sizeToDelete,
|
||||
String existingFile, String instantTime, int exepctedRecords, List<String> keys) {
|
||||
String existingFile, String instantTime, int expectedRecords, List<String> keys) {
|
||||
client.startCommitWithTime(instantTime);
|
||||
|
||||
List<HoodieKey> hoodieKeysToDelete = randomSelectAsHoodieKeys(previousRecords, sizeToDelete);
|
||||
@@ -1427,16 +1429,16 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
for (int i = 0; i < fullPartitionPaths.length; i++) {
|
||||
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
|
||||
}
|
||||
assertEquals(exepctedRecords,
|
||||
assertEquals(expectedRecords,
|
||||
HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
|
||||
"Must contain " + exepctedRecords + " records");
|
||||
"Must contain " + expectedRecords + " records");
|
||||
|
||||
Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
|
||||
assertEquals(exepctedRecords,
|
||||
readRowKeysFromParquet(hadoopConf, newFile).size(),
|
||||
assertEquals(expectedRecords,
|
||||
BaseFileUtils.getInstance(metaClient).readRowKeys(hadoopConf, newFile).size(),
|
||||
"file should contain 110 records");
|
||||
|
||||
List<GenericRecord> records = ParquetUtils.readAvroRecords(hadoopConf, newFile);
|
||||
List<GenericRecord> records = BaseFileUtils.getInstance(metaClient).readAvroRecords(hadoopConf, newFile);
|
||||
for (GenericRecord record : records) {
|
||||
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
|
||||
assertTrue(keys.contains(recordKey), "key expected to be part of " + instantTime);
|
||||
@@ -1491,7 +1493,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
assertTrue(testTable.commitExists(instantTime),
|
||||
"After explicit commit, commit file should be created");
|
||||
|
||||
// Get parquet file paths from commit metadata
|
||||
// Get base file paths from commit metadata
|
||||
String actionType = metaClient.getCommitActionType();
|
||||
HoodieInstant commitInstant = new HoodieInstant(false, actionType, instantTime);
|
||||
HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
|
||||
|
||||
@@ -113,7 +113,7 @@ public class TestHoodieReadClient extends HoodieClientTestBase {
|
||||
assertEquals(100, filteredRDD.collect().size());
|
||||
|
||||
JavaRDD<HoodieRecord> smallRecordsRDD = jsc.parallelize(records.subList(0, 75), 1);
|
||||
// We create three parquet file, each having one record. (3 different partitions)
|
||||
// We create three base file, each having one record. (3 different partitions)
|
||||
List<WriteStatus> statuses = writeFn.apply(writeClient, smallRecordsRDD, newCommitTime).collect();
|
||||
// Verify there are no errors
|
||||
assertNoWriteErrors(statuses);
|
||||
|
||||
@@ -25,7 +25,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||
import org.apache.hudi.common.testutils.RawTripTestPayload;
|
||||
import org.apache.hudi.common.util.ParquetUtils;
|
||||
import org.apache.hudi.common.util.BaseFileUtils;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieUpsertException;
|
||||
import org.apache.hudi.io.HoodieCreateHandle;
|
||||
@@ -36,7 +36,6 @@ import org.apache.hudi.testutils.HoodieClientTestHarness;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.parquet.avro.AvroReadSupport;
|
||||
import org.apache.parquet.io.InvalidRecordException;
|
||||
import org.apache.parquet.io.ParquetDecodingException;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
@@ -123,9 +122,10 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
|
||||
Executable executable = () -> {
|
||||
HoodieMergeHandle mergeHandle = new HoodieMergeHandle(updateTable.getConfig(), "101", updateTable,
|
||||
updateRecords.iterator(), updateRecords.get(0).getPartitionPath(), insertResult.getFileId(), supplier);
|
||||
AvroReadSupport.setAvroReadSchema(updateTable.getHadoopConf(), mergeHandle.getWriterSchemaWithMetafields());
|
||||
List<GenericRecord> oldRecords = ParquetUtils.readAvroRecords(updateTable.getHadoopConf(),
|
||||
new Path(updateTable.getConfig().getBasePath() + "/" + insertResult.getStat().getPath()));
|
||||
List<GenericRecord> oldRecords = BaseFileUtils.getInstance(updateTable.getBaseFileFormat())
|
||||
.readAvroRecords(updateTable.getHadoopConf(),
|
||||
new Path(updateTable.getConfig().getBasePath() + "/" + insertResult.getStat().getPath()),
|
||||
mergeHandle.getWriterSchemaWithMetafields());
|
||||
for (GenericRecord rec : oldRecords) {
|
||||
mergeHandle.write(rec);
|
||||
}
|
||||
|
||||
@@ -754,7 +754,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
List<HoodieCleanStat> hoodieCleanStats = runCleaner(config);
|
||||
assertEquals(3,
|
||||
getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles()
|
||||
.size(), "Must clean three files, one parquet and 2 log files");
|
||||
.size(), "Must clean three files, one base and 2 log files");
|
||||
assertFalse(testTable.baseFileExists(p0, "000", file1P0));
|
||||
assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2));
|
||||
assertTrue(testTable.baseFileExists(p0, "001", file1P0));
|
||||
@@ -797,7 +797,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
List<HoodieCleanStat> hoodieCleanStats = runCleaner(config);
|
||||
assertEquals(3,
|
||||
getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles()
|
||||
.size(), "Must clean three files, one parquet and 2 log files");
|
||||
.size(), "Must clean three files, one base and 2 log files");
|
||||
assertFalse(testTable.baseFileExists(p0, "000", file1P0));
|
||||
assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2));
|
||||
assertTrue(testTable.baseFileExists(p0, "001", file1P0));
|
||||
@@ -935,8 +935,9 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
String partition1 = DEFAULT_PARTITION_PATHS[0];
|
||||
String partition2 = DEFAULT_PARTITION_PATHS[1];
|
||||
|
||||
String fileName1 = "data1_1_000.parquet";
|
||||
String fileName2 = "data2_1_000.parquet";
|
||||
String extension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
|
||||
String fileName1 = "data1_1_000" + extension;
|
||||
String fileName2 = "data2_1_000" + extension;
|
||||
|
||||
String filePath1 = metaClient.getBasePath() + "/" + partition1 + "/" + fileName1;
|
||||
String filePath2 = metaClient.getBasePath() + "/" + partition1 + "/" + fileName2;
|
||||
@@ -1025,8 +1026,9 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
String partition1 = DEFAULT_PARTITION_PATHS[0];
|
||||
String partition2 = DEFAULT_PARTITION_PATHS[1];
|
||||
|
||||
String fileName1 = "data1_1_000.parquet";
|
||||
String fileName2 = "data2_1_000.parquet";
|
||||
String extension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
|
||||
String fileName1 = "data1_1_000" + extension;
|
||||
String fileName2 = "data2_1_000" + extension;
|
||||
|
||||
Map<String, List<String>> filesToBeCleanedPerPartition = new HashMap<>();
|
||||
filesToBeCleanedPerPartition.put(partition1, Arrays.asList(fileName1));
|
||||
@@ -1314,7 +1316,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
|
||||
.build();
|
||||
// Deletions:
|
||||
// . FileId Parquet Logs Total Retained Commits
|
||||
// . FileId Base Logs Total Retained Commits
|
||||
// FileId7 5 10 15 009, 011
|
||||
// FileId6 5 10 15 009
|
||||
// FileId5 3 6 9 005
|
||||
@@ -1338,7 +1340,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(2).build())
|
||||
.build();
|
||||
// Deletions:
|
||||
// . FileId Parquet Logs Total Retained Commits
|
||||
// . FileId Base Logs Total Retained Commits
|
||||
// FileId7 5 10 15 009, 011
|
||||
// FileId6 4 8 12 007, 009
|
||||
// FileId5 2 4 6 003 005
|
||||
|
||||
@@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.ConsistencyGuard;
|
||||
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
|
||||
import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
|
||||
import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.testutils.FileCreateUtils;
|
||||
import org.apache.hudi.testutils.HoodieClientTestHarness;
|
||||
|
||||
@@ -44,6 +45,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
*/
|
||||
public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
|
||||
private static final String BASE_FILE_EXTENSION = HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension();
|
||||
|
||||
// multiple parameters, uses Collection<Object[]>
|
||||
public static List<Arguments> consistencyGuardType() {
|
||||
return Arrays.asList(
|
||||
@@ -73,17 +76,19 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
ConsistencyGuardConfig config = getConsistencyGuardConfig(1, 1000, 1000);
|
||||
ConsistencyGuard passing = consistencyGuardType.equals(FailSafeConsistencyGuard.class.getName())
|
||||
? new FailSafeConsistencyGuard(fs, config) : new OptimisticConsistencyGuard(fs, config);
|
||||
passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
|
||||
passing.waitTillFileAppears(new Path(basePath + "/partition/path/f2_1-0-1_000.parquet"));
|
||||
passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-1_000" + BASE_FILE_EXTENSION));
|
||||
passing.waitTillFileAppears(new Path(basePath + "/partition/path/f2_1-0-1_000" + BASE_FILE_EXTENSION));
|
||||
passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays
|
||||
.asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-1_000.parquet"));
|
||||
.asList(basePath + "/partition/path/f1_1-0-1_000" + BASE_FILE_EXTENSION,
|
||||
basePath + "/partition/path/f2_1-0-1_000" + BASE_FILE_EXTENSION));
|
||||
|
||||
fs.delete(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"), false);
|
||||
fs.delete(new Path(basePath + "/partition/path/f2_1-0-1_000.parquet"), false);
|
||||
passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
|
||||
passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f2_1-0-1_000.parquet"));
|
||||
fs.delete(new Path(basePath + "/partition/path/f1_1-0-1_000" + BASE_FILE_EXTENSION), false);
|
||||
fs.delete(new Path(basePath + "/partition/path/f2_1-0-1_000" + BASE_FILE_EXTENSION), false);
|
||||
passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000" + BASE_FILE_EXTENSION));
|
||||
passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f2_1-0-1_000" + BASE_FILE_EXTENSION));
|
||||
passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays
|
||||
.asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-1_000.parquet"));
|
||||
.asList(basePath + "/partition/path/f1_1-0-1_000" + BASE_FILE_EXTENSION,
|
||||
basePath + "/partition/path/f2_1-0-1_000" + BASE_FILE_EXTENSION));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -92,7 +97,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
assertThrows(TimeoutException.class, () -> {
|
||||
passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays
|
||||
.asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet"));
|
||||
.asList(basePath + "/partition/path/f1_1-0-2_000" + BASE_FILE_EXTENSION,
|
||||
basePath + "/partition/path/f2_1-0-2_000" + BASE_FILE_EXTENSION));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -101,7 +107,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays
|
||||
.asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet"));
|
||||
.asList(basePath + "/partition/path/f1_1-0-2_000" + BASE_FILE_EXTENSION,
|
||||
basePath + "/partition/path/f2_1-0-2_000" + BASE_FILE_EXTENSION));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -109,7 +116,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
assertThrows(TimeoutException.class, () -> {
|
||||
passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet"));
|
||||
passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000" + BASE_FILE_EXTENSION));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -117,7 +124,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
public void testCheckFailingAppearsTimedWait() throws Exception {
|
||||
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet"));
|
||||
passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000" + BASE_FILE_EXTENSION));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -126,7 +133,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
assertThrows(TimeoutException.class, () -> {
|
||||
passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays
|
||||
.asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet"));
|
||||
.asList(basePath + "/partition/path/f1_1-0-1_000" + BASE_FILE_EXTENSION,
|
||||
basePath + "/partition/path/f2_1-0-2_000" + BASE_FILE_EXTENSION));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -135,7 +143,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays
|
||||
.asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet"));
|
||||
.asList(basePath + "/partition/path/f1_1-0-1_000" + BASE_FILE_EXTENSION,
|
||||
basePath + "/partition/path/f2_1-0-2_000" + BASE_FILE_EXTENSION));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -144,7 +153,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
assertThrows(TimeoutException.class, () -> {
|
||||
passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
|
||||
passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000" + BASE_FILE_EXTENSION));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -153,7 +162,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
|
||||
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
|
||||
passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000" + BASE_FILE_EXTENSION));
|
||||
}
|
||||
|
||||
private ConsistencyGuardConfig getConsistencyGuardConfig() {
|
||||
|
||||
@@ -33,6 +33,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
@@ -131,7 +132,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
|
||||
@BeforeEach
|
||||
public void init() throws IOException {
|
||||
init(HoodieFileFormat.PARQUET);
|
||||
init(HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
@@ -345,7 +346,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
List<HoodieRecord> records004 = dataGen.generateUpdates(updateTime, 100);
|
||||
updateRecords(records004, client, cfg, updateTime);
|
||||
|
||||
// verify RO incremental reads - only one parquet file shows up because updates to into log files
|
||||
// verify RO incremental reads - only one base file shows up because updates to into log files
|
||||
incrementalROFiles = getROIncrementalFiles(partitionPath, false);
|
||||
validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1);
|
||||
assertEquals(firstFilePath, incrementalROFiles[0].getPath());
|
||||
@@ -358,7 +359,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
String compactionCommitTime = "005";
|
||||
client.scheduleCompactionAtInstant("005", Option.empty());
|
||||
|
||||
// verify RO incremental reads - only one parquet file shows up because updates go into log files
|
||||
// verify RO incremental reads - only one base file shows up because updates go into log files
|
||||
incrementalROFiles = getROIncrementalFiles(partitionPath, true);
|
||||
validateFiles(partitionPath,1, incrementalROFiles, false, roJobConf, 200, commitTime1);
|
||||
|
||||
@@ -436,7 +437,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
|
||||
|
||||
/**
|
||||
* Write 1 (only inserts, written as parquet file)
|
||||
* Write 1 (only inserts, written as base file)
|
||||
*/
|
||||
String newCommitTime = "001";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
@@ -465,7 +466,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
|
||||
dataFilesToRead = tableView.getLatestBaseFiles();
|
||||
assertTrue(dataFilesToRead.findAny().isPresent(),
|
||||
"should list the parquet files we wrote in the delta commit");
|
||||
"should list the base files we wrote in the delta commit");
|
||||
|
||||
/**
|
||||
* Write 2 (only updates, written to .log file)
|
||||
@@ -613,7 +614,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
|
||||
dataFilesToRead = tableView.getLatestBaseFiles();
|
||||
assertTrue(dataFilesToRead.findAny().isPresent(),
|
||||
"should list the parquet files we wrote in the delta commit");
|
||||
"should list the base files we wrote in the delta commit");
|
||||
|
||||
/**
|
||||
* Write 2 (inserts + updates - testing failed delta commit)
|
||||
@@ -630,7 +631,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
|
||||
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles,
|
||||
basePath);
|
||||
assertEquals(recordsRead.size(), 200);
|
||||
assertEquals(200, recordsRead.size());
|
||||
|
||||
statuses = secondClient.upsert(jsc.parallelize(copyOfRecords, 1), commitTime1).collect();
|
||||
// Verify there are no errors
|
||||
@@ -674,7 +675,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
// Test successful delta commit rollback
|
||||
thirdClient.rollback(commitTime2);
|
||||
allFiles = listAllBaseFilesInPath(hoodieTable);
|
||||
// After rollback, there should be no parquet file with the failed commit time
|
||||
// After rollback, there should be no base file with the failed commit time
|
||||
assertEquals(0, Arrays.stream(allFiles)
|
||||
.filter(file -> file.getPath().getName().contains(commitTime2)).count());
|
||||
|
||||
@@ -768,7 +769,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
|
||||
dataFilesToRead = tableView.getLatestBaseFiles();
|
||||
assertTrue(dataFilesToRead.findAny().isPresent(),
|
||||
"Should list the parquet files we wrote in the delta commit");
|
||||
"Should list the base files we wrote in the delta commit");
|
||||
|
||||
/**
|
||||
* Write 2 (inserts + updates)
|
||||
@@ -901,7 +902,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
|
||||
|
||||
/**
|
||||
* Write 1 (only inserts, written as parquet file)
|
||||
* Write 1 (only inserts, written as base file)
|
||||
*/
|
||||
String newCommitTime = "001";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
@@ -926,17 +927,17 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient,
|
||||
metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
|
||||
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
|
||||
Map<String, Long> parquetFileIdToSize =
|
||||
Map<String, Long> fileIdToSize =
|
||||
dataFilesToRead.collect(Collectors.toMap(HoodieBaseFile::getFileId, HoodieBaseFile::getFileSize));
|
||||
|
||||
roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
|
||||
dataFilesToRead = roView.getLatestBaseFiles();
|
||||
List<HoodieBaseFile> dataFilesList = dataFilesToRead.collect(Collectors.toList());
|
||||
assertTrue(dataFilesList.size() > 0,
|
||||
"Should list the parquet files we wrote in the delta commit");
|
||||
"Should list the base files we wrote in the delta commit");
|
||||
|
||||
/**
|
||||
* Write 2 (only updates + inserts, written to .log file + correction of existing parquet file size)
|
||||
* Write 2 (only updates + inserts, written to .log file + correction of existing base file size)
|
||||
*/
|
||||
newCommitTime = "002";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
@@ -961,10 +962,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles);
|
||||
dataFilesToRead = roView.getLatestBaseFiles();
|
||||
List<HoodieBaseFile> newDataFilesList = dataFilesToRead.collect(Collectors.toList());
|
||||
Map<String, Long> parquetFileIdToNewSize =
|
||||
Map<String, Long> fileIdToNewSize =
|
||||
newDataFilesList.stream().collect(Collectors.toMap(HoodieBaseFile::getFileId, HoodieBaseFile::getFileSize));
|
||||
|
||||
assertTrue(parquetFileIdToNewSize.entrySet().stream().anyMatch(entry -> parquetFileIdToSize.get(entry.getKey()) < entry.getValue()));
|
||||
assertTrue(fileIdToNewSize.entrySet().stream().anyMatch(entry -> fileIdToSize.get(entry.getKey()) < entry.getValue()));
|
||||
|
||||
List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
|
||||
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles,
|
||||
@@ -1082,8 +1083,9 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
// Do a compaction
|
||||
String instantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
|
||||
statuses = (JavaRDD<WriteStatus>) writeClient.compact(instantTime);
|
||||
assertEquals(statuses.map(status -> status.getStat().getPath().contains("parquet")).count(), numLogFiles);
|
||||
assertEquals(statuses.count(), numLogFiles);
|
||||
String extension = table.getBaseFileExtension();
|
||||
assertEquals(numLogFiles, statuses.map(status -> status.getStat().getPath().contains(extension)).count());
|
||||
assertEquals(numLogFiles, statuses.count());
|
||||
writeClient.commitCompaction(instantTime, statuses, Option.empty());
|
||||
}
|
||||
}
|
||||
@@ -1215,9 +1217,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
// Do a compaction
|
||||
newCommitTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
|
||||
statuses = (JavaRDD<WriteStatus>) writeClient.compact(newCommitTime);
|
||||
// Ensure all log files have been compacted into parquet files
|
||||
assertEquals(statuses.map(status -> status.getStat().getPath().contains("parquet")).count(), numLogFiles);
|
||||
assertEquals(statuses.count(), numLogFiles);
|
||||
// Ensure all log files have been compacted into base files
|
||||
String extension = table.getBaseFileExtension();
|
||||
assertEquals(numLogFiles, statuses.map(status -> status.getStat().getPath().contains(extension)).count());
|
||||
assertEquals(numLogFiles, statuses.count());
|
||||
//writeClient.commitCompaction(newCommitTime, statuses, Option.empty());
|
||||
// Trigger a rollback of compaction
|
||||
table.getActiveTimeline().reload();
|
||||
@@ -1463,7 +1466,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
|
||||
|
||||
/**
|
||||
* Write 1 (only inserts, written as parquet file)
|
||||
* Write 1 (only inserts, written as base file)
|
||||
*/
|
||||
String newCommitTime = "001";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
@@ -1493,7 +1496,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
|
||||
dataFilesToRead = roView.getLatestBaseFiles();
|
||||
assertTrue(dataFilesToRead.findAny().isPresent(),
|
||||
"should list the parquet files we wrote in the delta commit");
|
||||
"should list the base files we wrote in the delta commit");
|
||||
|
||||
/**
|
||||
* Write 2 (only updates, written to .log file)
|
||||
@@ -1603,7 +1606,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
|
||||
dataFilesToRead = roView.getLatestBaseFiles();
|
||||
assertTrue(dataFilesToRead.findAny().isPresent(),
|
||||
"should list the parquet files we wrote in the delta commit");
|
||||
"should list the base files we wrote in the delta commit");
|
||||
}
|
||||
|
||||
private void updateRecords(List<HoodieRecord> records, SparkRDDWriteClient client,
|
||||
|
||||
@@ -18,6 +18,8 @@
|
||||
|
||||
package org.apache.hudi.table.action.bootstrap;
|
||||
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import org.apache.hudi.avro.model.HoodieFileStatus;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
@@ -47,13 +49,15 @@ public class TestBootstrapUtils extends HoodieClientTestBase {
|
||||
});
|
||||
|
||||
// Files inside partitions and marker directories
|
||||
List<String> files = Arrays.asList(
|
||||
"2016/04/15/1_1-0-1_20190528120000.parquet",
|
||||
"2016/04/15/2_1-0-1_20190528120000.parquet",
|
||||
"2016/05/16/3_1-0-1_20190528120000.parquet",
|
||||
"2016/05/16/4_1-0-1_20190528120000.parquet",
|
||||
"2016/04/17/5_1-0-1_20190528120000.parquet",
|
||||
"2016/04/17/6_1-0-1_20190528120000.parquet");
|
||||
List<String> files = Stream.of(
|
||||
"2016/04/15/1_1-0-1_20190528120000",
|
||||
"2016/04/15/2_1-0-1_20190528120000",
|
||||
"2016/05/16/3_1-0-1_20190528120000",
|
||||
"2016/05/16/4_1-0-1_20190528120000",
|
||||
"2016/04/17/5_1-0-1_20190528120000",
|
||||
"2016/04/17/6_1-0-1_20190528120000")
|
||||
.map(file -> file + metaClient.getTableConfig().getBaseFileFormat().getFileExtension())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
files.forEach(f -> {
|
||||
try {
|
||||
|
||||
@@ -29,8 +29,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||
import org.apache.hudi.common.testutils.RawTripTestPayload;
|
||||
import org.apache.hudi.common.testutils.Transformations;
|
||||
import org.apache.hudi.common.util.BaseFileUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ParquetUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.config.HoodieStorageConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
@@ -154,14 +154,14 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
|
||||
assertEquals(1, allFiles.length);
|
||||
|
||||
// Read out the bloom filter and make sure filter can answer record exist or not
|
||||
Path parquetFilePath = allFiles[0].getPath();
|
||||
BloomFilter filter = ParquetUtils.readBloomFilterFromParquetMetadata(hadoopConf, parquetFilePath);
|
||||
Path filePath = allFiles[0].getPath();
|
||||
BloomFilter filter = BaseFileUtils.getInstance(table.getBaseFileFormat()).readBloomFilterFromMetadata(hadoopConf, filePath);
|
||||
for (HoodieRecord record : records) {
|
||||
assertTrue(filter.mightContain(record.getRecordKey()));
|
||||
}
|
||||
|
||||
// Read the parquet file, check the record content
|
||||
List<GenericRecord> fileRecords = ParquetUtils.readAvroRecords(hadoopConf, parquetFilePath);
|
||||
// Read the base file, check the record content
|
||||
List<GenericRecord> fileRecords = BaseFileUtils.getInstance(table.getBaseFileFormat()).readAvroRecords(hadoopConf, filePath);
|
||||
GenericRecord newRecord;
|
||||
int index = 0;
|
||||
for (GenericRecord record : fileRecords) {
|
||||
@@ -192,12 +192,12 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
|
||||
allFiles = getIncrementalFiles(partitionPath, firstCommitTime, -1);
|
||||
assertEquals(1, allFiles.length);
|
||||
// verify new incremental file group is same as the previous one
|
||||
assertEquals(FSUtils.getFileId(parquetFilePath.getName()), FSUtils.getFileId(allFiles[0].getPath().getName()));
|
||||
assertEquals(FSUtils.getFileId(filePath.getName()), FSUtils.getFileId(allFiles[0].getPath().getName()));
|
||||
|
||||
// Check whether the record has been updated
|
||||
Path updatedParquetFilePath = allFiles[0].getPath();
|
||||
Path updatedFilePath = allFiles[0].getPath();
|
||||
BloomFilter updatedFilter =
|
||||
ParquetUtils.readBloomFilterFromParquetMetadata(hadoopConf, updatedParquetFilePath);
|
||||
BaseFileUtils.getInstance(metaClient).readBloomFilterFromMetadata(hadoopConf, updatedFilePath);
|
||||
for (HoodieRecord record : records) {
|
||||
// No change to the _row_key
|
||||
assertTrue(updatedFilter.mightContain(record.getRecordKey()));
|
||||
@@ -206,7 +206,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
|
||||
assertTrue(updatedFilter.mightContain(insertedRecord1.getRecordKey()));
|
||||
records.add(insertedRecord1);// add this so it can further check below
|
||||
|
||||
ParquetReader updatedReader = ParquetReader.builder(new AvroReadSupport<>(), updatedParquetFilePath).build();
|
||||
ParquetReader updatedReader = ParquetReader.builder(new AvroReadSupport<>(), updatedFilePath).build();
|
||||
index = 0;
|
||||
while ((newRecord = (GenericRecord) updatedReader.read()) != null) {
|
||||
assertEquals(newRecord.get("_row_key").toString(), records.get(index).getRecordKey());
|
||||
@@ -393,7 +393,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
|
||||
// Check the updated file
|
||||
int counts = 0;
|
||||
for (File file : Paths.get(basePath, "2016/01/31").toFile().listFiles()) {
|
||||
if (file.getName().endsWith(".parquet") && FSUtils.getCommitTime(file.getName()).equals(instantTime)) {
|
||||
if (file.getName().endsWith(table.getBaseFileExtension()) && FSUtils.getCommitTime(file.getName()).equals(instantTime)) {
|
||||
LOG.info(file.getName() + "-" + file.length());
|
||||
counts++;
|
||||
}
|
||||
|
||||
@@ -135,7 +135,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
|
||||
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
|
||||
List<HoodieBaseFile> dataFilesToRead = getCurrentLatestBaseFiles(hoodieTable);
|
||||
assertTrue(dataFilesToRead.stream().findAny().isPresent(),
|
||||
"should list the parquet files we wrote in the delta commit");
|
||||
"should list the base files we wrote in the delta commit");
|
||||
validateDeltaCommit(firstInstant, fgIdToCompactionOperation, cfg);
|
||||
}
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.FileSlice;
|
||||
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||
import org.apache.hudi.common.model.HoodieFileGroupId;
|
||||
import org.apache.hudi.common.model.HoodieLogFile;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
@@ -274,7 +275,7 @@ public class TestHoodieCompactionStrategy {
|
||||
private final long size;
|
||||
|
||||
public TestHoodieBaseFile(long size) {
|
||||
super("/tmp/XYXYXYXYXYYX_11_20180918020003.parquet");
|
||||
super("/tmp/XYXYXYXYXYYX_11_20180918020003" + HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension());
|
||||
this.size = size;
|
||||
}
|
||||
|
||||
|
||||
@@ -67,7 +67,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
|
||||
final String p1 = "2015/03/16";
|
||||
final String p2 = "2015/03/17";
|
||||
final String p3 = "2016/03/15";
|
||||
// Let's create some commit files and parquet files
|
||||
// Let's create some commit files and base files
|
||||
HoodieTestTable testTable = HoodieTestTable.of(metaClient)
|
||||
.withPartitionMetaFiles(p1, p2, p3)
|
||||
.addCommit("001")
|
||||
|
||||
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hudi.common.HoodieRollbackStat;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
@@ -37,6 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
|
||||
|
||||
public class TestRollbackUtils {
|
||||
private static final String BASE_FILE_EXTENSION = HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension();
|
||||
|
||||
private FileStatus generateFileStatus(String filePath) {
|
||||
Path dataFile1Path = new Path(filePath);
|
||||
@@ -62,15 +64,15 @@ public class TestRollbackUtils {
|
||||
String partitionPath2 = "/partitionPath2/";
|
||||
//prepare HoodieRollbackStat for different partition
|
||||
Map<FileStatus, Boolean> dataFilesOnlyStat1Files = new HashMap<>();
|
||||
dataFilesOnlyStat1Files.put(generateFileStatus(partitionPath1 + "dataFile1.parquet"), true);
|
||||
dataFilesOnlyStat1Files.put(generateFileStatus(partitionPath1 + "dataFile2.parquet"), true);
|
||||
dataFilesOnlyStat1Files.put(generateFileStatus(partitionPath1 + "dataFile1" + BASE_FILE_EXTENSION), true);
|
||||
dataFilesOnlyStat1Files.put(generateFileStatus(partitionPath1 + "dataFile2" + BASE_FILE_EXTENSION), true);
|
||||
HoodieRollbackStat dataFilesOnlyStat1 = HoodieRollbackStat.newBuilder()
|
||||
.withPartitionPath(partitionPath1)
|
||||
.withDeletedFileResults(dataFilesOnlyStat1Files).build();
|
||||
|
||||
Map<FileStatus, Boolean> dataFilesOnlyStat2Files = new HashMap<>();
|
||||
dataFilesOnlyStat2Files.put(generateFileStatus(partitionPath2 + "dataFile1.parquet"), true);
|
||||
dataFilesOnlyStat2Files.put(generateFileStatus(partitionPath2 + "dataFile2.parquet"), true);
|
||||
dataFilesOnlyStat2Files.put(generateFileStatus(partitionPath2 + "dataFile1" + BASE_FILE_EXTENSION), true);
|
||||
dataFilesOnlyStat2Files.put(generateFileStatus(partitionPath2 + "dataFile2" + BASE_FILE_EXTENSION), true);
|
||||
HoodieRollbackStat dataFilesOnlyStat2 = HoodieRollbackStat.newBuilder()
|
||||
.withPartitionPath(partitionPath2)
|
||||
.withDeletedFileResults(dataFilesOnlyStat1Files).build();
|
||||
@@ -83,7 +85,7 @@ public class TestRollbackUtils {
|
||||
//prepare HoodieRollbackStat for failed and block append
|
||||
Map<FileStatus, Boolean> dataFilesOnlyStat3Files = new HashMap<>();
|
||||
dataFilesOnlyStat3Files.put(generateFileStatus(partitionPath1 + "dataFile1.log"), true);
|
||||
dataFilesOnlyStat3Files.put(generateFileStatus(partitionPath1 + "dataFile3.parquet"), false);
|
||||
dataFilesOnlyStat3Files.put(generateFileStatus(partitionPath1 + "dataFile3" + BASE_FILE_EXTENSION), false);
|
||||
HoodieRollbackStat dataFilesOnlyStat3 = HoodieRollbackStat.newBuilder()
|
||||
.withPartitionPath(partitionPath1)
|
||||
.withDeletedFileResults(dataFilesOnlyStat3Files).build();
|
||||
@@ -98,10 +100,10 @@ public class TestRollbackUtils {
|
||||
HoodieRollbackStat dataFilesOnlyStatMerge1 =
|
||||
RollbackUtils.mergeRollbackStat(dataFilesOnlyStat1, dataFilesOnlyStat3);
|
||||
assertEquals(partitionPath1, dataFilesOnlyStatMerge1.getPartitionPath());
|
||||
assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile3.parquet"),
|
||||
assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile3" + BASE_FILE_EXTENSION),
|
||||
dataFilesOnlyStatMerge1.getFailedDeleteFiles());
|
||||
assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile1.parquet",
|
||||
partitionPath1 + "dataFile2.parquet", partitionPath1 + "dataFile1.log").stream().sorted().collect(Collectors.toList()),
|
||||
assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile1" + BASE_FILE_EXTENSION,
|
||||
partitionPath1 + "dataFile2" + BASE_FILE_EXTENSION, partitionPath1 + "dataFile1.log").stream().sorted().collect(Collectors.toList()),
|
||||
dataFilesOnlyStatMerge1.getSuccessDeleteFiles().stream().sorted().collect(Collectors.toList()));
|
||||
assertEquals(0, dataFilesOnlyStatMerge1.getCommandBlocksCount().size());
|
||||
|
||||
@@ -109,10 +111,10 @@ public class TestRollbackUtils {
|
||||
HoodieRollbackStat dataFilesOnlyStatMerge2 =
|
||||
RollbackUtils.mergeRollbackStat(dataFilesOnlyStatMerge1, dataFilesOnlyStat4);
|
||||
assertEquals(partitionPath1, dataFilesOnlyStatMerge1.getPartitionPath());
|
||||
assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile3.parquet").stream().sorted().collect(Collectors.toList()),
|
||||
assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile3" + BASE_FILE_EXTENSION).stream().sorted().collect(Collectors.toList()),
|
||||
dataFilesOnlyStatMerge2.getFailedDeleteFiles().stream().sorted().collect(Collectors.toList()));
|
||||
assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile1.parquet",
|
||||
partitionPath1 + "dataFile2.parquet", partitionPath1 + "dataFile1.log").stream().sorted().collect(Collectors.toList()),
|
||||
assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile1" + BASE_FILE_EXTENSION,
|
||||
partitionPath1 + "dataFile2" + BASE_FILE_EXTENSION, partitionPath1 + "dataFile1.log").stream().sorted().collect(Collectors.toList()),
|
||||
dataFilesOnlyStatMerge2.getSuccessDeleteFiles().stream().sorted().collect(Collectors.toList()));
|
||||
assertEquals(CollectionUtils.createImmutableMap(generateFileStatus(partitionPath1 + "dataFile1.log"), 10L),
|
||||
dataFilesOnlyStatMerge2.getCommandBlocksCount());
|
||||
|
||||
@@ -49,7 +49,6 @@ import org.junit.jupiter.api.io.TempDir;
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
|
||||
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
|
||||
import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
|
||||
|
||||
@@ -120,7 +119,6 @@ public class FunctionalTestHarness implements SparkProvider, DFSProvider, Hoodie
|
||||
.setTableName(RAW_TRIPS_TEST_NAME)
|
||||
.setTableType(COPY_ON_WRITE)
|
||||
.setPayloadClass(HoodieAvroPayload.class)
|
||||
.setBaseFileFormat(PARQUET.toString())
|
||||
.fromProperties(props)
|
||||
.build();
|
||||
return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
|
||||
|
||||
@@ -32,6 +32,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.model.HoodieLogFile;
|
||||
import org.apache.hudi.common.model.HoodiePartitionMetadata;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||
@@ -130,7 +131,8 @@ public class FSUtils {
|
||||
|
||||
// TODO: this should be removed
|
||||
public static String makeDataFileName(String instantTime, String writeToken, String fileId) {
|
||||
return String.format("%s_%s_%s%s", fileId, writeToken, instantTime, HoodieFileFormat.PARQUET.getFileExtension());
|
||||
return String.format("%s_%s_%s%s", fileId, writeToken, instantTime,
|
||||
HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension());
|
||||
}
|
||||
|
||||
public static String makeDataFileName(String instantTime, String writeToken, String fileId, String fileExtension) {
|
||||
@@ -142,7 +144,7 @@ public class FSUtils {
|
||||
}
|
||||
|
||||
public static String maskWithoutFileId(String instantTime, int taskPartitionId) {
|
||||
return String.format("*_%s_%s%s", taskPartitionId, instantTime, HoodieFileFormat.PARQUET.getFileExtension());
|
||||
return String.format("*_%s_%s%s", taskPartitionId, instantTime, HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension());
|
||||
}
|
||||
|
||||
public static String getCommitFromCommitFile(String commitFileName) {
|
||||
@@ -329,7 +331,7 @@ public class FSUtils {
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the file is a parquet file of a log file. Then get the fileId appropriately.
|
||||
* Check if the file is a base file of a log file. Then get the fileId appropriately.
|
||||
*/
|
||||
public static String getFileIdFromFilePath(Path filePath) {
|
||||
if (FSUtils.isLogFile(filePath)) {
|
||||
|
||||
@@ -0,0 +1,73 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.common.util;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.bloom.BloomFilter;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
|
||||
public abstract class BaseFileUtils {
|
||||
|
||||
public static BaseFileUtils getInstance(String path) {
|
||||
if (path.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
|
||||
return new ParquetUtils();
|
||||
}
|
||||
throw new UnsupportedOperationException("The format for file " + path + " is not supported yet.");
|
||||
}
|
||||
|
||||
public static BaseFileUtils getInstance(HoodieFileFormat fileFormat) {
|
||||
if (HoodieFileFormat.PARQUET.equals(fileFormat)) {
|
||||
return new ParquetUtils();
|
||||
}
|
||||
throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet.");
|
||||
}
|
||||
|
||||
public static BaseFileUtils getInstance(HoodieTableMetaClient metaClient) {
|
||||
return getInstance(metaClient.getTableConfig().getBaseFileFormat());
|
||||
}
|
||||
|
||||
public abstract Set<String> readRowKeys(Configuration configuration, Path filePath);
|
||||
|
||||
public abstract Set<String> filterRowKeys(Configuration configuration, Path filePath, Set<String> filter);
|
||||
|
||||
public abstract List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath);
|
||||
|
||||
public abstract Schema readAvroSchema(Configuration configuration, Path filePath);
|
||||
|
||||
public abstract BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path filePath);
|
||||
|
||||
public abstract String[] readMinMaxRecordKeys(Configuration configuration, Path filePath);
|
||||
|
||||
public abstract List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath);
|
||||
|
||||
public abstract List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath, Schema schema);
|
||||
|
||||
public abstract Map<String, String> readFooter(Configuration conf, boolean required, Path orcFilePath,
|
||||
String... footerNames);
|
||||
|
||||
public abstract long getRowCount(Configuration conf, Path filePath);
|
||||
}
|
||||
@@ -55,7 +55,7 @@ import java.util.function.Function;
|
||||
/**
|
||||
* Utility functions involving with parquet.
|
||||
*/
|
||||
public class ParquetUtils {
|
||||
public class ParquetUtils extends BaseFileUtils {
|
||||
|
||||
/**
|
||||
* Read the rowKey list from the given parquet file.
|
||||
@@ -64,8 +64,9 @@ public class ParquetUtils {
|
||||
* @param configuration configuration to build fs object
|
||||
* @return Set Set of row keys
|
||||
*/
|
||||
public static Set<String> readRowKeysFromParquet(Configuration configuration, Path filePath) {
|
||||
return filterParquetRowKeys(configuration, filePath, new HashSet<>());
|
||||
@Override
|
||||
public Set<String> readRowKeys(Configuration configuration, Path filePath) {
|
||||
return filterRowKeys(configuration, filePath, new HashSet<>());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -77,7 +78,8 @@ public class ParquetUtils {
|
||||
* @param filter record keys filter
|
||||
* @return Set Set of row keys matching candidateRecordKeys
|
||||
*/
|
||||
public static Set<String> filterParquetRowKeys(Configuration configuration, Path filePath, Set<String> filter) {
|
||||
@Override
|
||||
public Set<String> filterRowKeys(Configuration configuration, Path filePath, Set<String> filter) {
|
||||
return filterParquetRowKeys(configuration, filePath, filter, HoodieAvroUtils.getRecordKeySchema());
|
||||
}
|
||||
|
||||
@@ -128,7 +130,8 @@ public class ParquetUtils {
|
||||
* @param configuration configuration to build fs object
|
||||
* @return {@link List} of {@link HoodieKey}s fetched from the parquet file
|
||||
*/
|
||||
public static List<HoodieKey> fetchRecordKeyPartitionPathFromParquet(Configuration configuration, Path filePath) {
|
||||
@Override
|
||||
public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath) {
|
||||
List<HoodieKey> hoodieKeys = new ArrayList<>();
|
||||
try {
|
||||
if (!filePath.getFileSystem(configuration).exists(filePath)) {
|
||||
@@ -156,7 +159,7 @@ public class ParquetUtils {
|
||||
return hoodieKeys;
|
||||
}
|
||||
|
||||
public static ParquetMetadata readMetadata(Configuration conf, Path parquetFilePath) {
|
||||
public ParquetMetadata readMetadata(Configuration conf, Path parquetFilePath) {
|
||||
ParquetMetadata footer;
|
||||
try {
|
||||
// TODO(vc): Should we use the parallel reading version here?
|
||||
@@ -170,11 +173,12 @@ public class ParquetUtils {
|
||||
/**
|
||||
* Get the schema of the given parquet file.
|
||||
*/
|
||||
public static MessageType readSchema(Configuration configuration, Path parquetFilePath) {
|
||||
public MessageType readSchema(Configuration configuration, Path parquetFilePath) {
|
||||
return readMetadata(configuration, parquetFilePath).getFileMetaData().getSchema();
|
||||
}
|
||||
|
||||
private static Map<String, String> readParquetFooter(Configuration configuration, boolean required,
|
||||
@Override
|
||||
public Map<String, String> readFooter(Configuration configuration, boolean required,
|
||||
Path parquetFilePath, String... footerNames) {
|
||||
Map<String, String> footerVals = new HashMap<>();
|
||||
ParquetMetadata footer = readMetadata(configuration, parquetFilePath);
|
||||
@@ -190,16 +194,18 @@ public class ParquetUtils {
|
||||
return footerVals;
|
||||
}
|
||||
|
||||
public static Schema readAvroSchema(Configuration configuration, Path parquetFilePath) {
|
||||
@Override
|
||||
public Schema readAvroSchema(Configuration configuration, Path parquetFilePath) {
|
||||
return new AvroSchemaConverter(configuration).convert(readSchema(configuration, parquetFilePath));
|
||||
}
|
||||
|
||||
/**
|
||||
* Read out the bloom filter from the parquet file meta data.
|
||||
*/
|
||||
public static BloomFilter readBloomFilterFromParquetMetadata(Configuration configuration, Path parquetFilePath) {
|
||||
@Override
|
||||
public BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path parquetFilePath) {
|
||||
Map<String, String> footerVals =
|
||||
readParquetFooter(configuration, false, parquetFilePath,
|
||||
readFooter(configuration, false, parquetFilePath,
|
||||
HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
|
||||
HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
|
||||
HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE);
|
||||
@@ -220,8 +226,9 @@ public class ParquetUtils {
|
||||
return toReturn;
|
||||
}
|
||||
|
||||
public static String[] readMinMaxRecordKeys(Configuration configuration, Path parquetFilePath) {
|
||||
Map<String, String> minMaxKeys = readParquetFooter(configuration, true, parquetFilePath,
|
||||
@Override
|
||||
public String[] readMinMaxRecordKeys(Configuration configuration, Path parquetFilePath) {
|
||||
Map<String, String> minMaxKeys = readFooter(configuration, true, parquetFilePath,
|
||||
HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER);
|
||||
if (minMaxKeys.size() != 2) {
|
||||
throw new HoodieException(
|
||||
@@ -235,7 +242,8 @@ public class ParquetUtils {
|
||||
/**
|
||||
* NOTE: This literally reads the entire file contents, thus should be used with caution.
|
||||
*/
|
||||
public static List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath) {
|
||||
@Override
|
||||
public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath) {
|
||||
ParquetReader reader = null;
|
||||
List<GenericRecord> records = new ArrayList<>();
|
||||
try {
|
||||
@@ -262,13 +270,20 @@ public class ParquetUtils {
|
||||
return records;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath, Schema schema) {
|
||||
AvroReadSupport.setAvroReadSchema(configuration, schema);
|
||||
return readAvroRecords(configuration, filePath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of records in the parquet file.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @param parquetFilePath path of the file
|
||||
*/
|
||||
public static long getRowCount(Configuration conf, Path parquetFilePath) {
|
||||
@Override
|
||||
public long getRowCount(Configuration conf, Path parquetFilePath) {
|
||||
ParquetMetadata footer;
|
||||
long rowCount = 0;
|
||||
footer = readMetadata(conf, parquetFilePath);
|
||||
|
||||
@@ -27,8 +27,9 @@ import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.bloom.BloomFilter;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.util.BaseFileUtils;
|
||||
import org.apache.hudi.common.util.ParquetReaderIterator;
|
||||
import org.apache.hudi.common.util.ParquetUtils;
|
||||
import org.apache.parquet.avro.AvroParquetReader;
|
||||
import org.apache.parquet.avro.AvroReadSupport;
|
||||
import org.apache.parquet.hadoop.ParquetReader;
|
||||
@@ -36,24 +37,26 @@ import org.apache.parquet.hadoop.ParquetReader;
|
||||
public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileReader {
|
||||
private Path path;
|
||||
private Configuration conf;
|
||||
private final BaseFileUtils parquetUtils;
|
||||
|
||||
public HoodieParquetReader(Configuration configuration, Path path) {
|
||||
this.conf = configuration;
|
||||
this.path = path;
|
||||
this.parquetUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
|
||||
}
|
||||
|
||||
public String[] readMinMaxRecordKeys() {
|
||||
return ParquetUtils.readMinMaxRecordKeys(conf, path);
|
||||
return parquetUtils.readMinMaxRecordKeys(conf, path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BloomFilter readBloomFilter() {
|
||||
return ParquetUtils.readBloomFilterFromParquetMetadata(conf, path);
|
||||
return parquetUtils.readBloomFilterFromMetadata(conf, path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> filterRowKeys(Set candidateRowKeys) {
|
||||
return ParquetUtils.filterParquetRowKeys(conf, path, candidateRowKeys);
|
||||
return parquetUtils.filterRowKeys(conf, path, candidateRowKeys);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -65,7 +68,7 @@ public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileR
|
||||
|
||||
@Override
|
||||
public Schema getSchema() {
|
||||
return ParquetUtils.readAvroSchema(conf, path);
|
||||
return parquetUtils.readAvroSchema(conf, path);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -74,6 +77,6 @@ public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileR
|
||||
|
||||
@Override
|
||||
public long getTotalRecords() {
|
||||
return ParquetUtils.getRowCount(conf, path);
|
||||
return parquetUtils.getRowCount(conf, path);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ import org.apache.hudi.common.bootstrap.index.BootstrapIndex.IndexWriter;
|
||||
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
|
||||
import org.apache.hudi.common.model.BootstrapFileMapping;
|
||||
import org.apache.hudi.common.model.HoodieFileGroupId;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
@@ -62,7 +63,7 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness {
|
||||
|
||||
private static final String[] PARTITIONS = {"2020/03/18", "2020/03/19", "2020/03/20", "2020/03/21"};
|
||||
private static final Set<String> PARTITION_SET = Arrays.stream(PARTITIONS).collect(Collectors.toSet());
|
||||
private static final String BOOTSTRAP_BASE_PATH = "/tmp/source/parquet_tables/table1";
|
||||
private static final String BOOTSTRAP_BASE_PATH = "/tmp/source/data_tables/table1";
|
||||
|
||||
@BeforeEach
|
||||
public void init() throws IOException {
|
||||
@@ -168,7 +169,7 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness {
|
||||
return Arrays.stream(partitions).map(partition -> {
|
||||
return Pair.of(partition, IntStream.range(0, numEntriesPerPartition).mapToObj(idx -> {
|
||||
String hudiFileId = UUID.randomUUID().toString();
|
||||
String sourceFileName = idx + ".parquet";
|
||||
String sourceFileName = idx + HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension();
|
||||
HoodieFileStatus sourceFileStatus = HoodieFileStatus.newBuilder()
|
||||
.setPath(HoodiePath.newBuilder().setUri(sourceBasePath + "/" + partition + "/" + sourceFileName).build())
|
||||
.setLength(256 * 1024 * 1024L)
|
||||
|
||||
@@ -21,6 +21,7 @@ package org.apache.hudi.common.fs;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.model.HoodieLogFile;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
|
||||
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||
@@ -56,6 +57,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
|
||||
private final long minCleanToKeep = 10;
|
||||
|
||||
private static String TEST_WRITE_TOKEN = "1-0-1";
|
||||
private static final String BASE_FILE_EXTENSION = HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension();
|
||||
|
||||
@Rule
|
||||
public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
|
||||
@@ -69,14 +71,14 @@ public class TestFSUtils extends HoodieCommonTestHarness {
|
||||
public void testMakeDataFileName() {
|
||||
String instantTime = COMMIT_FORMATTER.format(new Date());
|
||||
String fileName = UUID.randomUUID().toString();
|
||||
assertEquals(FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName), fileName + "_" + TEST_WRITE_TOKEN + "_" + instantTime + ".parquet");
|
||||
assertEquals(FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName), fileName + "_" + TEST_WRITE_TOKEN + "_" + instantTime + BASE_FILE_EXTENSION);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaskFileName() {
|
||||
String instantTime = COMMIT_FORMATTER.format(new Date());
|
||||
int taskPartitionId = 2;
|
||||
assertEquals(FSUtils.maskWithoutFileId(instantTime, taskPartitionId), "*_" + taskPartitionId + "_" + instantTime + ".parquet");
|
||||
assertEquals(FSUtils.maskWithoutFileId(instantTime, taskPartitionId), "*_" + taskPartitionId + "_" + instantTime + BASE_FILE_EXTENSION);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -100,9 +102,12 @@ public class TestFSUtils extends HoodieCommonTestHarness {
|
||||
});
|
||||
|
||||
// Files inside partitions and marker directories
|
||||
List<String> files = Arrays.asList("2016/04/15/1_1-0-1_20190528120000.parquet",
|
||||
"2016/05/16/2_1-0-1_20190528120000.parquet", ".hoodie/.temp/2/2016/05/16/2_1-0-1_20190528120000.parquet",
|
||||
".hoodie/.temp/2/2016/04/15/1_1-0-1_20190528120000.parquet");
|
||||
List<String> files = Stream.of("2016/04/15/1_1-0-1_20190528120000",
|
||||
"2016/05/16/2_1-0-1_20190528120000",
|
||||
".hoodie/.temp/2/2016/05/16/2_1-0-1_20190528120000",
|
||||
".hoodie/.temp/2/2016/04/15/1_1-0-1_20190528120000")
|
||||
.map(fileName -> fileName + BASE_FILE_EXTENSION)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
files.forEach(f -> {
|
||||
try {
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hudi.common.model;
|
||||
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@@ -33,7 +34,7 @@ public class TestHoodieDeltaWriteStat {
|
||||
@Test
|
||||
public void testBaseFileAndLogFiles() {
|
||||
HoodieDeltaWriteStat writeStat = new HoodieDeltaWriteStat();
|
||||
String baseFile = "file1.parquet";
|
||||
String baseFile = "file1" + HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension();
|
||||
String logFile1 = ".log1.log";
|
||||
String logFile2 = ".log2.log";
|
||||
|
||||
|
||||
@@ -284,7 +284,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
|
||||
HoodieWriteStat stat = new HoodieWriteStat();
|
||||
stat.setFileId(i + "");
|
||||
stat.setPartitionPath(Paths.get(basePath, partition).toString());
|
||||
stat.setPath(commitTs + "." + i + ".parquet");
|
||||
stat.setPath(commitTs + "." + i + metaClient.getTableConfig().getBaseFileFormat().getFileExtension());
|
||||
commit.addWriteStat(partition, stat);
|
||||
}
|
||||
for (Map.Entry<String, String> extraEntries : extraMetadata.entrySet()) {
|
||||
@@ -303,7 +303,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
|
||||
HoodieWriteStat stat = new HoodieWriteStat();
|
||||
stat.setFileId(i + "");
|
||||
stat.setPartitionPath(Paths.get(basePath, newFilePartition).toString());
|
||||
stat.setPath(commitTs + "." + i + ".parquet");
|
||||
stat.setPath(commitTs + "." + i + metaClient.getTableConfig().getBaseFileFormat().getFileExtension());
|
||||
commit.addWriteStat(newFilePartition, stat);
|
||||
}
|
||||
Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>();
|
||||
|
||||
@@ -306,7 +306,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
|
||||
String partitionPath = "2016/05/01";
|
||||
new File(basePath + "/" + partitionPath).mkdirs();
|
||||
String fileId = UUID.randomUUID().toString();
|
||||
String srcName = "part_0000.parquet";
|
||||
String srcName = "part_0000" + metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
|
||||
HoodieFileStatus srcFileStatus = HoodieFileStatus.newBuilder()
|
||||
.setPath(HoodiePath.newBuilder().setUri(BOOTSTRAP_SOURCE_PATH + partitionPath + "/" + srcName).build())
|
||||
.setLength(256 * 1024 * 1024L)
|
||||
|
||||
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.CompactionOperation;
|
||||
import org.apache.hudi.common.model.FileSlice;
|
||||
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||
import org.apache.hudi.common.model.HoodieFileGroup;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.testutils.MockHoodieTimeline;
|
||||
@@ -66,7 +67,8 @@ public class TestPriorityBasedFileSystemView {
|
||||
public void setUp() {
|
||||
fsView = new PriorityBasedFileSystemView(primary, secondary);
|
||||
testBaseFileStream = Stream.of(new HoodieBaseFile("test"));
|
||||
testFileSliceStream = Stream.of(new FileSlice("2020-01-01", "20:20", "file0001.parquet"));
|
||||
testFileSliceStream = Stream.of(new FileSlice("2020-01-01", "20:20",
|
||||
"file0001" + HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension()));
|
||||
}
|
||||
|
||||
private void resetMocks() {
|
||||
|
||||
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.model.HoodiePartitionMetadata;
|
||||
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
|
||||
import org.apache.hudi.common.model.IOType;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
||||
@@ -54,9 +55,10 @@ import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serial
|
||||
public class FileCreateUtils {
|
||||
|
||||
private static final String WRITE_TOKEN = "1-0-1";
|
||||
private static final String BASE_FILE_EXTENSION = HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension();
|
||||
|
||||
public static String baseFileName(String instantTime, String fileId) {
|
||||
return baseFileName(instantTime, fileId, HoodieFileFormat.PARQUET.getFileExtension());
|
||||
return baseFileName(instantTime, fileId, BASE_FILE_EXTENSION);
|
||||
}
|
||||
|
||||
public static String baseFileName(String instantTime, String fileId, String fileExtension) {
|
||||
@@ -72,7 +74,7 @@ public class FileCreateUtils {
|
||||
}
|
||||
|
||||
public static String markerFileName(String instantTime, String fileId, IOType ioType) {
|
||||
return markerFileName(instantTime, fileId, ioType, HoodieFileFormat.PARQUET.getFileExtension());
|
||||
return markerFileName(instantTime, fileId, ioType, BASE_FILE_EXTENSION);
|
||||
}
|
||||
|
||||
public static String markerFileName(String instantTime, String fileId, IOType ioType, String fileExtension) {
|
||||
|
||||
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
|
||||
import org.apache.hudi.common.model.IOType;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
@@ -437,7 +438,7 @@ public class HoodieTestTable {
|
||||
}
|
||||
|
||||
public FileStatus[] listAllBaseFiles() throws IOException {
|
||||
return listAllBaseFiles(HoodieFileFormat.PARQUET.getFileExtension());
|
||||
return listAllBaseFiles(HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension());
|
||||
}
|
||||
|
||||
public FileStatus[] listAllBaseFiles(String fileExtension) throws IOException {
|
||||
|
||||
@@ -97,6 +97,8 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
|
||||
|
||||
@Test
|
||||
public void testBuildFromFileSlice() {
|
||||
String extension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
|
||||
|
||||
// Empty File-Slice with no data and log files
|
||||
FileSlice emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "empty1");
|
||||
HoodieCompactionOperation op =
|
||||
@@ -106,7 +108,7 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
|
||||
|
||||
// File Slice with data-file but no log files
|
||||
FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noLog1");
|
||||
noLogFileSlice.setBaseFile(new DummyHoodieBaseFile("/tmp/noLog_1_000.parquet"));
|
||||
noLogFileSlice.setBaseFile(new DummyHoodieBaseFile("/tmp/noLog_1_000" + extension));
|
||||
op = CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], noLogFileSlice, Option.of(metricsCaptureFn));
|
||||
testFileSliceCompactionOpEquality(noLogFileSlice, op, DEFAULT_PARTITION_PATHS[0],
|
||||
LATEST_COMPACTION_METADATA_VERSION);
|
||||
@@ -122,7 +124,7 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
|
||||
|
||||
// File Slice with data-file and log files present
|
||||
FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1");
|
||||
fileSlice.setBaseFile(new DummyHoodieBaseFile("/tmp/noLog_1_000.parquet"));
|
||||
fileSlice.setBaseFile(new DummyHoodieBaseFile("/tmp/noLog_1_000" + extension));
|
||||
fileSlice.addLogFile(
|
||||
new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))));
|
||||
fileSlice.addLogFile(
|
||||
@@ -135,16 +137,18 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
|
||||
* Generate input for compaction plan tests.
|
||||
*/
|
||||
private Pair<List<Pair<String, FileSlice>>, HoodieCompactionPlan> buildCompactionPlan() {
|
||||
String extension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
|
||||
|
||||
Path fullPartitionPath = new Path(new Path(metaClient.getBasePath()), DEFAULT_PARTITION_PATHS[0]);
|
||||
FileSlice emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "empty1");
|
||||
FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1");
|
||||
fileSlice.setBaseFile(new DummyHoodieBaseFile(fullPartitionPath.toString() + "/data1_1_000.parquet"));
|
||||
fileSlice.setBaseFile(new DummyHoodieBaseFile(fullPartitionPath.toString() + "/data1_1_000" + extension));
|
||||
fileSlice.addLogFile(new HoodieLogFile(
|
||||
new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN)))));
|
||||
fileSlice.addLogFile(new HoodieLogFile(
|
||||
new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN)))));
|
||||
FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noLog1");
|
||||
noLogFileSlice.setBaseFile(new DummyHoodieBaseFile(fullPartitionPath.toString() + "/noLog_1_000.parquet"));
|
||||
noLogFileSlice.setBaseFile(new DummyHoodieBaseFile(fullPartitionPath.toString() + "/noLog_1_000" + extension));
|
||||
FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1");
|
||||
noDataFileSlice.addLogFile(new HoodieLogFile(
|
||||
new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN)))));
|
||||
|
||||
@@ -58,6 +58,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
*/
|
||||
public class TestParquetUtils extends HoodieCommonTestHarness {
|
||||
|
||||
private ParquetUtils parquetUtils = new ParquetUtils();
|
||||
|
||||
public static List<Arguments> bloomFilterTypeCodes() {
|
||||
return Arrays.asList(
|
||||
Arguments.of(BloomFilterTypeCode.SIMPLE.name()),
|
||||
@@ -83,13 +85,13 @@ public class TestParquetUtils extends HoodieCommonTestHarness {
|
||||
|
||||
// Read and verify
|
||||
List<String> rowKeysInFile = new ArrayList<>(
|
||||
ParquetUtils.readRowKeysFromParquet(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath)));
|
||||
parquetUtils.readRowKeys(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath)));
|
||||
Collections.sort(rowKeysInFile);
|
||||
Collections.sort(rowKeys);
|
||||
|
||||
assertEquals(rowKeys, rowKeysInFile, "Did not read back the expected list of keys");
|
||||
BloomFilter filterInFile =
|
||||
ParquetUtils.readBloomFilterFromParquetMetadata(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath));
|
||||
parquetUtils.readBloomFilterFromMetadata(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath));
|
||||
for (String rowKey : rowKeys) {
|
||||
assertTrue(filterInFile.mightContain(rowKey), "key should be found in bloom filter");
|
||||
}
|
||||
@@ -113,7 +115,7 @@ public class TestParquetUtils extends HoodieCommonTestHarness {
|
||||
|
||||
// Read and verify
|
||||
Set<String> filtered =
|
||||
ParquetUtils.filterParquetRowKeys(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath), filter);
|
||||
parquetUtils.filterRowKeys(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath), filter);
|
||||
|
||||
assertEquals(filter.size(), filtered.size(), "Filtered count does not match");
|
||||
|
||||
@@ -140,7 +142,7 @@ public class TestParquetUtils extends HoodieCommonTestHarness {
|
||||
|
||||
// Read and verify
|
||||
List<HoodieKey> fetchedRows =
|
||||
ParquetUtils.fetchRecordKeyPartitionPathFromParquet(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath));
|
||||
parquetUtils.fetchRecordKeyPartitionPath(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath));
|
||||
assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not match");
|
||||
|
||||
for (HoodieKey entry : fetchedRows) {
|
||||
@@ -157,7 +159,7 @@ public class TestParquetUtils extends HoodieCommonTestHarness {
|
||||
}
|
||||
writeParquetFile(BloomFilterTypeCode.SIMPLE.name(), filePath, rowKeys);
|
||||
|
||||
assertEquals(123, ParquetUtils.getRowCount(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath)));
|
||||
assertEquals(123, parquetUtils.getRowCount(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath)));
|
||||
}
|
||||
|
||||
private void writeParquetFile(String typeCode, String filePath, List<String> rowKeys) throws Exception {
|
||||
|
||||
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.model.HoodiePartitionMetadata;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -36,6 +37,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public final class TestTablePathUtils {
|
||||
private static final String BASE_FILE_EXTENSION = HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension();
|
||||
|
||||
@TempDir
|
||||
static File tempDir;
|
||||
@@ -73,9 +75,9 @@ public final class TestTablePathUtils {
|
||||
partitionMetadata2.trySave(2);
|
||||
|
||||
// Create files
|
||||
URI filePathURI1 = Paths.get(partitionPathURI1.getPath(), "data1.parquet").toUri();
|
||||
URI filePathURI1 = Paths.get(partitionPathURI1.getPath(), "data1" + BASE_FILE_EXTENSION).toUri();
|
||||
filePath1 = new Path(filePathURI1);
|
||||
URI filePathURI2 = Paths.get(partitionPathURI2.getPath(), "data2.parquet").toUri();
|
||||
URI filePathURI2 = Paths.get(partitionPathURI2.getPath(), "data2" + BASE_FILE_EXTENSION).toUri();
|
||||
filePath2 = new Path(filePathURI2);
|
||||
|
||||
assertTrue(new File(filePathURI1).createNewFile());
|
||||
|
||||
@@ -27,7 +27,7 @@ import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.common.util.ParquetUtils;
|
||||
import org.apache.hudi.common.util.BaseFileUtils;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
@@ -221,6 +221,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
||||
private void loadRecords(String partitionPath) throws Exception {
|
||||
LOG.info("Start loading records under partition {} into the index state", partitionPath);
|
||||
HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
|
||||
BaseFileUtils fileUtils = BaseFileUtils.getInstance(hoodieTable.getBaseFileFormat());
|
||||
List<HoodieBaseFile> latestBaseFiles =
|
||||
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, hoodieTable);
|
||||
final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
|
||||
@@ -230,7 +231,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
||||
final List<HoodieKey> hoodieKeys;
|
||||
try {
|
||||
hoodieKeys =
|
||||
ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
|
||||
fileUtils.fetchRecordKeyPartitionPath(hadoopConf, new Path(baseFile.getPath()));
|
||||
} catch (Exception e) {
|
||||
// in case there was some empty parquet file when the pipeline
|
||||
// crushes exceptionally.
|
||||
|
||||
@@ -116,7 +116,7 @@ public class TestInputFormat {
|
||||
void testReadBaseAndLogFiles() throws Exception {
|
||||
beforeEach(HoodieTableType.MERGE_ON_READ);
|
||||
|
||||
// write parquet first with compaction
|
||||
// write base first with compaction
|
||||
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
|
||||
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
|
||||
TestData.writeData(TestData.DATA_SET_INSERT, conf);
|
||||
|
||||
@@ -172,7 +172,7 @@ public class InputFormatTestUtil {
|
||||
|
||||
public static File prepareParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles,
|
||||
int numberOfRecords, String commitNumber, HoodieTableType tableType) throws IOException {
|
||||
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), tableType);
|
||||
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), tableType, HoodieFileFormat.PARQUET);
|
||||
java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01"));
|
||||
createData(schema, partitionPath, numberOfFiles, numberOfRecords, commitNumber);
|
||||
return partitionPath.toFile();
|
||||
@@ -185,7 +185,7 @@ public class InputFormatTestUtil {
|
||||
|
||||
public static File prepareSimpleParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles,
|
||||
int numberOfRecords, String commitNumber, HoodieTableType tableType) throws Exception {
|
||||
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), tableType);
|
||||
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), tableType, HoodieFileFormat.PARQUET);
|
||||
java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01"));
|
||||
createSimpleData(schema, partitionPath, numberOfFiles, numberOfRecords, commitNumber);
|
||||
return partitionPath.toFile();
|
||||
@@ -198,7 +198,7 @@ public class InputFormatTestUtil {
|
||||
|
||||
public static File prepareNonPartitionedParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles,
|
||||
int numberOfRecords, String commitNumber, HoodieTableType tableType) throws IOException {
|
||||
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), tableType);
|
||||
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), tableType, HoodieFileFormat.PARQUET);
|
||||
createData(schema, basePath, numberOfFiles, numberOfRecords, commitNumber);
|
||||
return basePath.toFile();
|
||||
}
|
||||
@@ -207,7 +207,7 @@ public class InputFormatTestUtil {
|
||||
String commitNumber) throws IOException {
|
||||
AvroParquetWriter parquetWriter;
|
||||
for (int i = 0; i < numberOfFiles; i++) {
|
||||
String fileId = FSUtils.makeDataFileName(commitNumber, TEST_WRITE_TOKEN, "fileid" + i);
|
||||
String fileId = FSUtils.makeDataFileName(commitNumber, TEST_WRITE_TOKEN, "fileid" + i, HoodieFileFormat.PARQUET.getFileExtension());
|
||||
parquetWriter = new AvroParquetWriter(new Path(partitionPath.resolve(fileId).toString()), schema);
|
||||
try {
|
||||
for (GenericRecord record : generateAvroRecords(schema, numberOfRecords, commitNumber, fileId)) {
|
||||
@@ -223,7 +223,7 @@ public class InputFormatTestUtil {
|
||||
String commitNumber) throws Exception {
|
||||
AvroParquetWriter parquetWriter;
|
||||
for (int i = 0; i < numberOfFiles; i++) {
|
||||
String fileId = FSUtils.makeDataFileName(commitNumber, "1", "fileid" + i);
|
||||
String fileId = FSUtils.makeDataFileName(commitNumber, "1", "fileid" + i, HoodieFileFormat.PARQUET.getFileExtension());
|
||||
parquetWriter = new AvroParquetWriter(new Path(partitionPath.resolve(fileId).toString()), schema);
|
||||
try {
|
||||
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, numberOfRecords);
|
||||
@@ -254,7 +254,8 @@ public class InputFormatTestUtil {
|
||||
int totalNumberOfRecords, int numberOfRecordsToUpdate, String newCommit) throws IOException {
|
||||
File fileToUpdate = Objects.requireNonNull(directory.listFiles((dir, name) -> name.endsWith("parquet")))[0];
|
||||
String fileId = FSUtils.getFileId(fileToUpdate.getName());
|
||||
File dataFile = new File(directory, FSUtils.makeDataFileName(newCommit, TEST_WRITE_TOKEN, fileId));
|
||||
File dataFile = new File(directory,
|
||||
FSUtils.makeDataFileName(newCommit, TEST_WRITE_TOKEN, fileId, HoodieFileFormat.PARQUET.getFileExtension()));
|
||||
try (AvroParquetWriter parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()), schema)) {
|
||||
for (GenericRecord record : generateAvroRecords(schema, totalNumberOfRecords, originalCommit, fileId)) {
|
||||
if (numberOfRecordsToUpdate > 0) {
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
package org.apache.hudi.utilities.checkpointing;
|
||||
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
|
||||
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
@@ -33,6 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness {
|
||||
private static final String BASE_FILE_EXTENSION = HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension();
|
||||
|
||||
@Test
|
||||
public void testValidKafkaConnectPath() throws Exception {
|
||||
@@ -46,19 +48,19 @@ public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness {
|
||||
// kafka connect tmp folder
|
||||
new File(topicPath + "/TMP").mkdirs();
|
||||
// tmp file that being written
|
||||
new File(topicPath + "/TMP/" + "topic1+0+301+400.parquet").createNewFile();
|
||||
// regular parquet files
|
||||
new File(topicPath + "/TMP/" + "topic1+0+301+400" + BASE_FILE_EXTENSION).createNewFile();
|
||||
// regular base files
|
||||
new File(topicPath + "/year=2016/month=05/day=01/"
|
||||
+ "topic1+0+100+200.parquet").createNewFile();
|
||||
+ "topic1+0+100+200" + BASE_FILE_EXTENSION).createNewFile();
|
||||
new File(topicPath + "/year=2016/month=05/day=01/"
|
||||
+ "topic1+1+100+200.parquet").createNewFile();
|
||||
+ "topic1+1+100+200" + BASE_FILE_EXTENSION).createNewFile();
|
||||
new File(topicPath + "/year=2016/month=05/day=02/"
|
||||
+ "topic1+0+201+300.parquet").createNewFile();
|
||||
// noise parquet file
|
||||
+ "topic1+0+201+300" + BASE_FILE_EXTENSION).createNewFile();
|
||||
// noise base file
|
||||
new File(topicPath + "/year=2016/month=05/day=01/"
|
||||
+ "random_snappy_1.parquet").createNewFile();
|
||||
+ "random_snappy_1" + BASE_FILE_EXTENSION).createNewFile();
|
||||
new File(topicPath + "/year=2016/month=05/day=02/"
|
||||
+ "random_snappy_2.parquet").createNewFile();
|
||||
+ "random_snappy_2" + BASE_FILE_EXTENSION).createNewFile();
|
||||
final TypedProperties props = new TypedProperties();
|
||||
props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath.toString());
|
||||
final InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(props);
|
||||
@@ -73,13 +75,13 @@ public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness {
|
||||
// create regular kafka connect hdfs dirs
|
||||
new File(topicPath + "/year=2016/month=05/day=01/").mkdirs();
|
||||
new File(topicPath + "/year=2016/month=05/day=02/").mkdirs();
|
||||
// parquet files with missing partition
|
||||
// base files with missing partition
|
||||
new File(topicPath + "/year=2016/month=05/day=01/"
|
||||
+ "topic1+0+100+200.parquet").createNewFile();
|
||||
+ "topic1+0+100+200" + BASE_FILE_EXTENSION).createNewFile();
|
||||
new File(topicPath + "/year=2016/month=05/day=01/"
|
||||
+ "topic1+2+100+200.parquet").createNewFile();
|
||||
+ "topic1+2+100+200" + BASE_FILE_EXTENSION).createNewFile();
|
||||
new File(topicPath + "/year=2016/month=05/day=02/"
|
||||
+ "topic1+0+201+300.parquet").createNewFile();
|
||||
+ "topic1+0+201+300" + BASE_FILE_EXTENSION).createNewFile();
|
||||
final TypedProperties props = new TypedProperties();
|
||||
props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath.toString());
|
||||
final InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(props);
|
||||
|
||||
Reference in New Issue
Block a user