[HUDI-995] Migrate HoodieTestUtils APIs to HoodieTestTable (#2143)
* [HUDI-995] Migrate HoodieTestUtils APIs to HoodieTestTable

Remove APIs in `HoodieTestUtils`:
- listAllDataFilesAndLogFilesInPath
- listAllLogFilesInPath
- listAllDataFilesInPath
- writeRecordsToLogFiles
- createCleanFiles
- createPendingCleanFiles

Migrate the callers to use `HoodieTestTable` and `HoodieWriteableTestTable`, with new APIs added:
- listAllBaseAndLogFiles
- listAllLogFiles
- listAllBaseFiles
- withLogAppends
- addClean
- addInflightClean

Also added related APIs in `FileCreateUtils`:
- createCleanFile
- createRequestedCleanFile
- createInflightCleanFile
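To make the migration concrete, here is a minimal sketch of how a test now seeds a table with the new fluent APIs (a hedged example: `metaClient`, `SCHEMA`, `record1`/`record2`, and the instant times are assumed from the usual test-harness setup):

```java
// Writeable variant: writes real parquet files and returns the generated file id.
HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(metaClient, SCHEMA);
String fileId = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1, record2);

// Metadata-only variant: drives timeline fixtures without writing data files.
HoodieTestTable.of(metaClient)
    .addRequestedCommit("002")
    .withMarkerFiles("default", 10, IOType.MERGE);
```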
@@ -281,9 +281,9 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
     // We create three parquet file, each having one record. (two different partitions)
     HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, SCHEMA);
-    String fileId1 = testTable.addCommit("001").withInserts(p1, record1);
-    String fileId2 = testTable.addCommit("002").withInserts(p1, record2);
-    String fileId3 = testTable.addCommit("003").withInserts(p2, record4);
+    String fileId1 = testTable.addCommit("001").getFileIdWithInserts(p1, record1);
+    String fileId2 = testTable.addCommit("002").getFileIdWithInserts(p1, record2);
+    String fileId3 = testTable.addCommit("003").getFileIdWithInserts(p2, record4);

     // We do the tag again
     metaClient = HoodieTableMetaClient.reload(metaClient);

@@ -375,7 +375,7 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
         incomingPayloadSamePartition);

     // We have some records to be tagged (two different partitions)
-    testTable.addCommit("1000").withInserts(p1, originalRecord);
+    testTable.addCommit("1000").getFileIdWithInserts(p1, originalRecord);

     // test against incoming record with a different partition
     JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Collections.singletonList(incomingRecord));

@@ -223,7 +223,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
     filter.add(record3.getRecordKey());
     HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(metaClient, SCHEMA, filter);
-    String fileId = testTable.addCommit("000").withInserts(partition, record1, record2);
+    String fileId = testTable.addCommit("000").getFileIdWithInserts(partition, record1, record2);
     String filename = testTable.getBaseFileNameById(fileId);

     // The bloom filter contains 3 records

@@ -310,9 +310,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     }

     // We create three parquet file, each having one record. (two different partitions)
-    String fileId1 = testTable.addCommit("001").withInserts("2016/01/31", record1);
-    String fileId2 = testTable.addCommit("002").withInserts("2016/01/31", record2);
-    String fileId3 = testTable.addCommit("003").withInserts("2015/01/31", record4);
+    String fileId1 = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1);
+    String fileId2 = testTable.addCommit("002").getFileIdWithInserts("2016/01/31", record2);
+    String fileId3 = testTable.addCommit("003").getFileIdWithInserts("2015/01/31", record4);

     // We do the tag again
     taggedRecordRDD = bloomIndex.tagLocation(recordRDD, context, HoodieSparkTable.create(config, context, metaClient));

@@ -380,9 +380,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     }

     // We create three parquet file, each having one record. (two different partitions)
-    String fileId1 = testTable.addCommit("001").withInserts("2016/01/31", record1);
-    String fileId2 = testTable.addCommit("002").withInserts("2016/01/31", record2);
-    String fileId3 = testTable.addCommit("003").withInserts("2015/01/31", record4);
+    String fileId1 = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1);
+    String fileId2 = testTable.addCommit("002").getFileIdWithInserts("2016/01/31", record2);
+    String fileId3 = testTable.addCommit("003").getFileIdWithInserts("2015/01/31", record4);

     // We do the tag again
     metaClient = HoodieTableMetaClient.reload(metaClient);

@@ -433,7 +433,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
         BloomFilterTypeCode.SIMPLE.name());
     filter.add(record2.getRecordKey());
     HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(metaClient, SCHEMA, filter);
-    String fileId = testTable.addCommit("000").withInserts("2016/01/31", record1);
+    String fileId = testTable.addCommit("000").getFileIdWithInserts("2016/01/31", record1);
     assertTrue(filter.mightContain(record1.getRecordKey()));
     assertTrue(filter.mightContain(record2.getRecordKey()));

@@ -217,10 +217,10 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
     JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5));

     // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
-    String fileId1 = testTable.addCommit("1000").withInserts("2016/04/01", record1);
-    String fileId2 = testTable.addCommit("2000").withInserts("2015/03/12");
-    String fileId3 = testTable.addCommit("3000").withInserts("2015/03/12", record2);
-    String fileId4 = testTable.addCommit("4000").withInserts("2015/03/12", record4);
+    String fileId1 = testTable.addCommit("1000").getFileIdWithInserts("2016/04/01", record1);
+    String fileId2 = testTable.addCommit("2000").getFileIdWithInserts("2015/03/12");
+    String fileId3 = testTable.addCommit("3000").getFileIdWithInserts("2015/03/12", record2);
+    String fileId4 = testTable.addCommit("4000").getFileIdWithInserts("2015/03/12", record4);

     // partitions will NOT be respected by this loadInvolvedFiles(...) call
     JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, context, hoodieTable);

@@ -299,7 +299,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
         new HoodieKey(incomingPayloadSamePartition.getRowKey(), incomingPayloadSamePartition.getPartitionPath()),
         incomingPayloadSamePartition);

-    testTable.addCommit("1000").withInserts(p1, originalRecord);
+    testTable.addCommit("1000").getFileIdWithInserts(p1, originalRecord);

     // test against incoming record with a different partition
     JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Collections.singletonList(incomingRecord));

@@ -130,7 +130,7 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness {

       for (List<HoodieRecord> recordsPerSlice : recordsForFileSlices) {
         String instantTime = makeNewCommitTime();
-        String fileId = testTable.addCommit(instantTime).withInserts(entry.getKey(), recordsPerSlice.toArray(new HoodieRecord[0]));
+        String fileId = testTable.addCommit(instantTime).getFileIdWithInserts(entry.getKey(), recordsPerSlice.toArray(new HoodieRecord[0]));
         Tuple2<String, String> fileIdInstantTimePair = new Tuple2<>(fileId, instantTime);
         List<Tuple2<HoodieKey, HoodieRecordLocation>> expectedEntries = new ArrayList<>();
         for (HoodieRecord record : recordsPerSlice) {

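The rename is not cosmetic: the old `withInserts(partition, records...)` looked like a fluent builder step but actually returned a generated file id as a `String`. The new name `getFileIdWithInserts` says so, while `withInserts(partition, fileId, records...)` keeps the fluent, table-returning contract (both signatures appear in the `HoodieWriteableTestTable` hunks at the end of this diff):

```java
// Convenience for index tests: generates a random file id, writes records, returns the id.
String fileId = testTable.addCommit("001").getFileIdWithInserts(partition, record1);

// Fluent form: caller supplies the file id and keeps chaining on the table.
testTable.addCommit("002").withInserts(partition, fileId, record2);
```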
@@ -18,11 +18,13 @@

 package org.apache.hudi.io;

-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.WriteOperationType;

@@ -32,15 +34,21 @@ import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.HoodieTimelineArchiveLog;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;

@@ -48,11 +56,14 @@ import org.junit.jupiter.api.Test;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;

+import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;

@@ -146,13 +157,14 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {

     assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");

-    HoodieTestUtils.createCleanFiles(metaClient, basePath, "100", wrapperFs.getConf());
-    HoodieTestUtils.createCleanFiles(metaClient, basePath, "101", wrapperFs.getConf());
-    HoodieTestUtils.createCleanFiles(metaClient, basePath, "102", wrapperFs.getConf());
-    HoodieTestUtils.createCleanFiles(metaClient, basePath, "103", wrapperFs.getConf());
-    HoodieTestUtils.createCleanFiles(metaClient, basePath, "104", wrapperFs.getConf());
-    HoodieTestUtils.createCleanFiles(metaClient, basePath, "105", wrapperFs.getConf());
-    HoodieTestUtils.createPendingCleanFiles(metaClient, "106", "107");
+    createCleanMetadata("100", false);
+    createCleanMetadata("101", false);
+    createCleanMetadata("102", false);
+    createCleanMetadata("103", false);
+    createCleanMetadata("104", false);
+    createCleanMetadata("105", false);
+    createCleanMetadata("106", true);
+    createCleanMetadata("107", true);

     // reload the timeline and get all the commmits before archive
     timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();

@@ -227,7 +239,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     int numCommits = 4;
     int commitInstant = 100;
     for (int i = 0; i < numCommits; i++) {
-      createReplaceMetadata(commitInstant);
+      createReplaceMetadata(String.valueOf(commitInstant));
       commitInstant += 100;
     }

@@ -478,17 +490,34 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     assertEquals(expectedCommitMetadata.getOperationType(), WriteOperationType.INSERT.toString());
   }

-  private void createReplaceMetadata(int commitInstant) throws Exception {
-    String commitTime = "" + commitInstant;
-    String fileId1 = "file-" + commitInstant + "-1";
-    String fileId2 = "file-" + commitInstant + "-2";
+  private void createReplaceMetadata(String instantTime) throws Exception {
+    String fileId1 = "file-" + instantTime + "-1";
+    String fileId2 = "file-" + instantTime + "-2";

     // create replace instant to mark fileId1 as deleted
     HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
     replaceMetadata.addReplaceFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1);
     replaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
-    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
-    testTable.addReplaceCommit(commitTime, replaceMetadata);
-    testTable.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
+    HoodieTestTable.of(metaClient)
+        .addReplaceCommit(instantTime, replaceMetadata)
+        .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
   }
+
+  private void createCleanMetadata(String instantTime, boolean inflightOnly) throws IOException {
+    HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
+        CleanPlanV2MigrationHandler.VERSION, new HashMap<>());
+    if (inflightOnly) {
+      HoodieTestTable.of(metaClient).addInflightClean(instantTime, cleanerPlan);
+    } else {
+      HoodieCleanStat cleanStats = new HoodieCleanStat(
+          HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
+          HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],
+          Collections.emptyList(),
+          Collections.emptyList(),
+          Collections.emptyList(),
+          instantTime);
+      HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
+      HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata);
+    }
+  }
 }

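Per the commit message, `addClean` and `addInflightClean` are backed by the new `FileCreateUtils` helpers (`createCleanFile`, `createRequestedCleanFile`, `createInflightCleanFile`), which presumably write the completed, requested, and inflight clean instant files under the `.hoodie` timeline folder. A hedged sketch of the two call patterns used by `createCleanMetadata` above:

```java
// Completed clean: requested, inflight, and completed instant files are created.
HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata);

// Pending clean (replaces HoodieTestUtils.createPendingCleanFiles): only
// requested and inflight instant files are created.
HoodieTestTable.of(metaClient).addInflightClean(instantTime, cleanerPlan);
```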
@@ -982,7 +982,7 @@ public class TestCleaner extends HoodieClientTestBase {
     HoodieTestTable testTable = HoodieTestTable.of(metaClient)
         .addRequestedCommit("000")
         .withMarkerFiles("default", 10, IOType.MERGE);
-    final int numTempFilesBefore = testTable.listAllFilesInTempFolder().size();
+    final int numTempFilesBefore = testTable.listAllFilesInTempFolder().length;
     assertEquals(10, numTempFilesBefore, "Some marker files are created.");

     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();

@@ -992,7 +992,7 @@ public class TestCleaner extends HoodieClientTestBase {
         new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty());
     metaClient.reloadActiveTimeline();
     table.rollback(context, "001", new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"), true);
-    final int numTempFilesAfter = testTable.listAllFilesInTempFolder().size();
+    final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length;
     assertEquals(0, numTempFilesAfter, "All temp files are deleted.");
   }

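The `.size()` → `.length` changes in this file follow from `listAllFilesInTempFolder` now returning a `FileStatus[]` instead of a `List<FileStatus>`. Callers needing stream-style operations can wrap the array; a small illustrative sketch:

```java
// Count marker files by wrapping the returned array in a stream.
long markerCount = Arrays.stream(testTable.listAllFilesInTempFolder()).count();
```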
@@ -42,6 +42,7 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.Transformations;
 import org.apache.hudi.common.util.Option;

@@ -63,6 +64,7 @@ import org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExec
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
 import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
+import org.apache.hudi.testutils.HoodieWriteableTestTable;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;

 import org.apache.avro.generic.GenericRecord;

@@ -159,7 +161,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     client.compact(compactionCommitTime);

     HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
-    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
     tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
     HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
     Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();

@@ -207,7 +209,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     client.compact(compactionCommitTime);

     HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
-    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
     tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
     HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
     Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();

@@ -377,7 +379,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());

-    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
     tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
     Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
     assertFalse(dataFilesToRead.findAny().isPresent());

@@ -418,7 +420,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());

-    allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    allFiles = listAllBaseFilesInPath(hoodieTable);
     tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
     dataFilesToRead = tableView.getLatestBaseFiles();
     assertTrue(dataFilesToRead.findAny().isPresent());

@@ -476,7 +478,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {

     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
-    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
     tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);

     final String absentCommit = newCommitTime;

@@ -524,7 +526,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());

-    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
     tableView =
         getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
     Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();

@@ -558,7 +560,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {

     // Test failed delta commit rollback
     secondClient.rollback(commitTime1);
-    allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    allFiles = listAllBaseFilesInPath(hoodieTable);
     // After rollback, there should be no base file with the failed commit time
     List<String> remainingFiles = Arrays.stream(allFiles).filter(file -> file.getPath().getName()
         .contains(commitTime1)).map(fileStatus -> fileStatus.getPath().toString()).collect(Collectors.toList());

@@ -593,7 +595,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {

     // Test successful delta commit rollback
     thirdClient.rollback(commitTime2);
-    allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    allFiles = listAllBaseFilesInPath(hoodieTable);
     // After rollback, there should be no parquet file with the failed commit time
     assertEquals(0, Arrays.stream(allFiles)
         .filter(file -> file.getPath().getName().contains(commitTime2)).count());

@@ -624,16 +626,16 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     String compactionInstantTime = thirdClient.scheduleCompaction(Option.empty()).get().toString();
     thirdClient.compact(compactionInstantTime);

-    allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    allFiles = listAllBaseFilesInPath(hoodieTable);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);

     final String compactedCommitTime = metaClient.getActiveTimeline().reload().lastInstant().get().getTimestamp();
-    assertTrue(Arrays.stream(listAllDataFilesInPath(hoodieTable, cfg.getBasePath()))
+    assertTrue(Arrays.stream(listAllBaseFilesInPath(hoodieTable))
         .anyMatch(file -> compactedCommitTime.equals(new HoodieBaseFile(file).getCommitTime())));
     thirdClient.rollbackInflightCompaction(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactedCommitTime),
         hoodieTable);
-    allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    allFiles = listAllBaseFilesInPath(hoodieTable);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);

@@ -680,7 +682,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());

-    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
     tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
     Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
     assertFalse(dataFilesToRead.findAny().isPresent());

@@ -759,7 +761,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     JavaRDD<WriteStatus> ws = (JavaRDD<WriteStatus>) client.compact(compactionInstantTime);
     client.commitCompaction(compactionInstantTime, ws, Option.empty());

-    allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    allFiles = listAllBaseFilesInPath(hoodieTable);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);

@@ -787,7 +789,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     client.restoreToInstant("000");

     metaClient = HoodieTableMetaClient.reload(metaClient);
-    allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    allFiles = listAllBaseFilesInPath(hoodieTable);
     tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
     dataFilesToRead = tableView.getLatestBaseFiles();
     assertFalse(dataFilesToRead.findAny().isPresent());

@@ -842,7 +844,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());

-    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
     BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient,
         metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
     Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();

@@ -876,7 +878,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());

-    allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    allFiles = listAllBaseFilesInPath(hoodieTable);
     roView = getHoodieTableFileSystemView(metaClient,
         hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles);
     dataFilesToRead = roView.getLatestBaseFiles();

@@ -919,15 +921,14 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     updatedRecords = readClient.tagLocation(updatedRecordsRDD).collect();

     // Write them to corresponding avro logfiles
-    HoodieTestUtils.writeRecordsToLogFiles(metaClient.getFs(), metaClient.getBasePath(),
-        HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);
-
-    // Verify that all data file has one log file
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
+    HoodieWriteableTestTable.of(table, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS)
+        .withLogAppends(updatedRecords);
     // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
     ((SyncableFileSystemView) (table.getSliceView())).reset();

+    // Verify that all data file has one log file
     for (String partitionPath : dataGen.getPartitionPaths()) {
       List<FileSlice> groupedLogFiles =
           table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());

@@ -1400,7 +1401,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());

-    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
     BaseFileOnlyView roView =
         getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
     Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();

@@ -1499,7 +1500,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().lastInstant();
     assertFalse(commit.isPresent());

-    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
     BaseFileOnlyView roView =
         getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
     Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();

@@ -1533,7 +1534,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());
     HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
-    return listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    return listAllBaseFilesInPath(hoodieTable);
   }

   private FileStatus[] getROSnapshotFiles(String partitionPath)

@@ -1598,8 +1599,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     assertEquals(expectedCommitsSet, actualCommits);
   }

-  private FileStatus[] listAllDataFilesInPath(HoodieTable table, String basePath) throws IOException {
-    return HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), basePath, table.getBaseFileExtension());
+  private FileStatus[] listAllBaseFilesInPath(HoodieTable table) throws IOException {
+    return HoodieTestTable.of(table.getMetaClient()).listAllBaseFiles(table.getBaseFileExtension());
   }

   private FileStatus[] listStatus(JobConf jobConf, boolean realtime) throws IOException {

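The per-test private helpers now delegate to `HoodieTestTable`, which exposes both a no-arg `listAllBaseFiles()` and an extension-filtered overload, as the two call sites in this diff show. An illustrative standalone use (the `.parquet` extension string is an assumption; tests obtain it from `table.getBaseFileExtension()`):

```java
// List every base file of the test table across all partitions.
FileStatus[] allBaseFiles = HoodieTestTable.of(metaClient).listAllBaseFiles(".parquet");
```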
@@ -35,7 +35,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;

@@ -130,7 +130,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
     assertNoWriteErrors(statusList);
     metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
     HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
-    List<HoodieBaseFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg);
+    List<HoodieBaseFile> dataFilesToRead = getCurrentLatestBaseFiles(hoodieTable);
     assertTrue(dataFilesToRead.stream().findAny().isPresent(),
         "should list the parquet files we wrote in the delta commit");
     validateDeltaCommit(firstInstant, fgIdToCompactionOperation, cfg);

@@ -225,10 +225,10 @@ public class CompactionTestBase extends HoodieClientTestBase {
     return statusList;
   }

-  protected List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
-    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
+  protected List<HoodieBaseFile> getCurrentLatestBaseFiles(HoodieTable table) throws IOException {
+    FileStatus[] allBaseFiles = HoodieTestTable.of(table.getMetaClient()).listAllBaseFiles();
     HoodieTableFileSystemView view =
-        getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
+        getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allBaseFiles);
     return view.getLatestBaseFiles().collect(Collectors.toList());
   }

@@ -44,6 +44,7 @@ import org.apache.hudi.index.bloom.SparkHoodieBloomIndex;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.HoodieWriteableTestTable;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaRDD;

@@ -154,8 +155,8 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
     updatedRecords = ((JavaRDD<HoodieRecord>)index.tagLocation(updatedRecordsRDD, context, table)).collect();

     // Write them to corresponding avro logfiles. Also, set the state transition properly.
-    HoodieTestUtils.writeRecordsToLogFiles(fs, metaClient.getBasePath(),
-        HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);
+    HoodieWriteableTestTable.of(table, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS)
+        .withLogAppends(updatedRecords);
     metaClient.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
         HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
     writeClient.commit(newCommitTime, jsc.emptyRDD(), Option.empty());

@@ -33,6 +33,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;

 import java.util.List;
+import java.util.stream.Stream;

 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;

@@ -75,11 +76,11 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
     // then: ensure files are deleted correctly, non-existent files reported as failed deletes
     assertEquals(2, stats.size());

-    List<FileStatus> partAFiles = testTable.listAllFiles("partA");
-    List<FileStatus> partBFiles = testTable.listAllFiles("partB");
+    FileStatus[] partAFiles = testTable.listAllFilesInPartition("partA");
+    FileStatus[] partBFiles = testTable.listAllFilesInPartition("partB");

-    assertEquals(0, partBFiles.size());
-    assertEquals(1, partAFiles.size());
+    assertEquals(0, partBFiles.length);
+    assertEquals(1, partAFiles.length);
     assertEquals(2, stats.stream().mapToInt(r -> r.getSuccessDeleteFiles().size()).sum());
     assertEquals(1, stats.stream().mapToInt(r -> r.getFailedDeleteFiles().size()).sum());
   }

@@ -108,15 +109,15 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
     // then: ensure files are deleted, rollback block is appended (even if append does not exist)
     assertEquals(2, stats.size());
     // will have the log file
-    List<FileStatus> partBFiles = testTable.listAllFiles("partB");
-    assertEquals(1, partBFiles.size());
-    assertTrue(partBFiles.get(0).getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()));
-    assertTrue(partBFiles.get(0).getLen() > 0);
+    FileStatus[] partBFiles = testTable.listAllFilesInPartition("partB");
+    assertEquals(1, partBFiles.length);
+    assertTrue(partBFiles[0].getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()));
+    assertTrue(partBFiles[0].getLen() > 0);

-    List<FileStatus> partAFiles = testTable.listAllFiles("partA");
-    assertEquals(3, partAFiles.size());
-    assertEquals(2, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count());
-    assertEquals(1, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f -> f.getLen() > 0).count());
+    FileStatus[] partAFiles = testTable.listAllFilesInPartition("partA");
+    assertEquals(3, partAFiles.length);
+    assertEquals(2, Stream.of(partAFiles).filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count());
+    assertEquals(1, Stream.of(partAFiles).filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f -> f.getLen() > 0).count());

     // only partB/f1_001 will be deleted
     assertEquals(1, stats.stream().mapToInt(r -> r.getSuccessDeleteFiles().size()).sum());

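`listAllFiles(partition)` is likewise replaced by `listAllFilesInPartition(partition)`, which returns a `FileStatus[]`; the array pairs naturally with `Stream.of` for assertions, as a short sketch of the pattern used above:

```java
// Count log files in a partition from the returned FileStatus array.
long logFileCount = Stream.of(testTable.listAllFilesInPartition("partA"))
    .filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()))
    .count();
```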
@@ -25,8 +25,13 @@ import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.config.HoodieStorageConfig;

@@ -36,19 +41,29 @@ import org.apache.hudi.table.HoodieTable;

 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;

 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;

 import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;

 public class HoodieWriteableTestTable extends HoodieTestTable {
+  private static final Logger LOG = LogManager.getLogger(HoodieWriteableTestTable.class);
+
   private final Schema schema;
   private final BloomFilter filter;

@@ -89,11 +104,15 @@ public class HoodieWriteableTestTable extends HoodieTestTable {
     return (HoodieWriteableTestTable) super.forCommit(instantTime);
   }

-  public String withInserts(String partition) throws Exception {
-    return withInserts(partition, new HoodieRecord[0]);
+  public String getFileIdWithInserts(String partition) throws Exception {
+    return getFileIdWithInserts(partition, new HoodieRecord[0]);
   }

-  public String withInserts(String partition, HoodieRecord... records) throws Exception {
+  public String getFileIdWithInserts(String partition, HoodieRecord... records) throws Exception {
+    return getFileIdWithInserts(partition, Arrays.asList(records));
+  }
+
+  public String getFileIdWithInserts(String partition, List<HoodieRecord> records) throws Exception {
     String fileId = UUID.randomUUID().toString();
     withInserts(partition, fileId, records);
     return fileId;

@@ -104,6 +123,10 @@ public class HoodieWriteableTestTable extends HoodieTestTable {
   }

   public HoodieWriteableTestTable withInserts(String partition, String fileId, HoodieRecord... records) throws Exception {
+    return withInserts(partition, fileId, Arrays.asList(records));
+  }
+
+  public HoodieWriteableTestTable withInserts(String partition, String fileId, List<HoodieRecord> records) throws Exception {
     FileCreateUtils.createPartitionMetaFile(basePath, partition);
     String fileName = baseFileName(currentInstantTime, fileId);

@@ -128,4 +151,38 @@ public class HoodieWriteableTestTable extends HoodieTestTable {

     return this;
   }
+
+  public HoodieWriteableTestTable withLogAppends(HoodieRecord... records) throws Exception {
+    return withLogAppends(Arrays.asList(records));
+  }
+
+  public HoodieWriteableTestTable withLogAppends(List<HoodieRecord> records) throws Exception {
+    for (List<HoodieRecord> groupedRecords: records.stream()
+        .collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation)).values()) {
+      appendRecordsToLogFile(groupedRecords);
+    }
+    return this;
+  }
+
+  private void appendRecordsToLogFile(List<HoodieRecord> groupedRecords) throws Exception {
+    String partitionPath = groupedRecords.get(0).getPartitionPath();
+    HoodieRecordLocation location = groupedRecords.get(0).getCurrentLocation();
+    try (HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath))
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
+        .overBaseCommit(location.getInstantTime()).withFs(fs).build()) {
+      Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, location.getInstantTime());
+      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+      logWriter.appendBlock(new HoodieAvroDataBlock(groupedRecords.stream().map(r -> {
+        try {
+          GenericRecord val = (GenericRecord) r.getData().getInsertValue(schema).get();
+          HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(), r.getPartitionPath(), "");
+          return (IndexedRecord) val;
+        } catch (IOException e) {
+          LOG.warn("Failed to convert record " + r.toString(), e);
+          return null;
+        }
+      }).collect(Collectors.toList()), header));
+    }
+  }
 }

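`withLogAppends` replaces the removed `HoodieTestUtils.writeRecordsToLogFiles`: it groups records by their current `HoodieRecordLocation` and appends one Avro data block per file group, so each record must already be tagged with a file id and base-instant time (typically via `tagLocation`). A hedged usage sketch mirroring the call sites earlier in this diff:

```java
// Records must carry a current location (file id + base instant) before appending.
List<HoodieRecord> taggedRecords = readClient.tagLocation(updatedRecordsRDD).collect();
HoodieWriteableTestTable.of(table, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS)
    .withLogAppends(taggedRecords);
```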