[HUDI-2952] Fixing metadata table for non-partitioned dataset (#4243)
This commit is contained in:
committed by
GitHub
parent
f194566ed4
commit
be368264f4
@@ -149,7 +149,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class);
|
||||
|
||||
public static List<Arguments> bootstrapAndTableOperationTestArgs() {
|
||||
public static List<Arguments> tableTypeAndEnableOperationArgs() {
|
||||
return asList(
|
||||
Arguments.of(COPY_ON_WRITE, true),
|
||||
Arguments.of(COPY_ON_WRITE, false),
|
||||
@@ -162,7 +162,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
* Metadata Table bootstrap scenarios.
|
||||
*/
|
||||
@ParameterizedTest
|
||||
@MethodSource("bootstrapAndTableOperationTestArgs")
|
||||
@MethodSource("tableTypeAndEnableOperationArgs")
|
||||
public void testMetadataTableBootstrap(HoodieTableType tableType, boolean addRollback) throws Exception {
|
||||
init(tableType, false);
|
||||
// bootstrap with few commits
|
||||
@@ -243,7 +243,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
* Test various table operations sync to Metadata Table correctly.
|
||||
*/
|
||||
@ParameterizedTest
|
||||
@MethodSource("bootstrapAndTableOperationTestArgs")
|
||||
@MethodSource("tableTypeAndEnableOperationArgs")
|
||||
public void testTableOperations(HoodieTableType tableType, boolean enableFullScan) throws Exception {
|
||||
init(tableType, true, enableFullScan, false, false);
|
||||
doWriteInsertAndUpsert(testTable);
|
||||
@@ -319,6 +319,16 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
validateMetadata(testTable, emptyList(), true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetadataInsertUpsertCleanNonPartitioned() throws Exception {
|
||||
HoodieTableType tableType = COPY_ON_WRITE;
|
||||
init(tableType);
|
||||
doWriteOperationNonPartitioned(testTable, "0000001", INSERT);
|
||||
doWriteOperationNonPartitioned(testTable, "0000002", UPSERT);
|
||||
testTable.doCleanBasedOnCommits("0000003", Arrays.asList("0000001"));
|
||||
validateMetadata(testTable, emptyList(), true);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(HoodieTableType.class)
|
||||
public void testInsertUpsertCluster(HoodieTableType tableType) throws Exception {
|
||||
@@ -509,7 +519,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
doWriteInsertAndUpsert(testTable);
|
||||
|
||||
// trigger an upsert
|
||||
doWriteOperationAndValidate(testTable, "0000003");
|
||||
doWriteOperation(testTable, "0000003", UPSERT);
|
||||
|
||||
// trigger a commit and rollback
|
||||
doWriteOperation(testTable, "0000004");
|
||||
@@ -549,6 +559,27 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
validateMetadata(testTable, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRollbackOperationsNonPartitioned() throws Exception {
|
||||
HoodieTableType tableType = COPY_ON_WRITE;
|
||||
init(tableType);
|
||||
doWriteInsertAndUpsertNonPartitioned(testTable);
|
||||
|
||||
// trigger an upsert
|
||||
doWriteOperationNonPartitioned(testTable, "0000003", UPSERT);
|
||||
|
||||
// trigger a commit and rollback
|
||||
doWriteOperationNonPartitioned(testTable, "0000004", UPSERT);
|
||||
doRollback(testTable, "0000004", "0000005");
|
||||
validateMetadata(testTable);
|
||||
|
||||
// trigger few upserts and validate
|
||||
for (int i = 6; i < 10; i++) {
|
||||
doWriteOperationNonPartitioned(testTable, "000000" + i, UPSERT);
|
||||
}
|
||||
validateMetadata(testTable);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that manual rollbacks work correctly and enough timeline history is maintained on the metadata table
|
||||
* timeline.
|
||||
@@ -573,7 +604,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
.build();
|
||||
|
||||
initWriteConfigAndMetatableWriter(writeConfig, true);
|
||||
doWriteInsertAndUpsert(testTable, "000001", "000002");
|
||||
doWriteInsertAndUpsert(testTable, "000001", "000002", false);
|
||||
|
||||
for (int i = 3; i < 10; i++) {
|
||||
doWriteOperation(testTable, "00000" + i);
|
||||
@@ -674,8 +705,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(HoodieTableType.class)
|
||||
public void testMetadataBootstrapLargeCommitList(HoodieTableType tableType) throws Exception {
|
||||
@MethodSource("tableTypeAndEnableOperationArgs")
|
||||
public void testMetadataBootstrapLargeCommitList(HoodieTableType tableType, boolean nonPartitionedDataset) throws Exception {
|
||||
init(tableType, true, true, true, false);
|
||||
long baseCommitTime = Long.parseLong(HoodieActiveTimeline.createNewInstantTime());
|
||||
for (int i = 1; i < 25; i += 7) {
|
||||
@@ -687,17 +718,17 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
long commitTime6 = getNextCommitTime(commitTime5);
|
||||
long commitTime7 = getNextCommitTime(commitTime6);
|
||||
baseCommitTime = commitTime7;
|
||||
doWriteOperation(testTable, Long.toString(commitTime1), INSERT);
|
||||
doWriteOperation(testTable, Long.toString(commitTime2));
|
||||
doWriteOperation(testTable, Long.toString(commitTime1), INSERT, nonPartitionedDataset);
|
||||
doWriteOperation(testTable, Long.toString(commitTime2), UPSERT, nonPartitionedDataset);
|
||||
doClean(testTable, Long.toString(commitTime3), Arrays.asList(Long.toString(commitTime1)));
|
||||
doWriteOperation(testTable, Long.toString(commitTime4));
|
||||
doWriteOperation(testTable, Long.toString(commitTime4), UPSERT, nonPartitionedDataset);
|
||||
if (tableType == MERGE_ON_READ) {
|
||||
doCompaction(testTable, Long.toString(commitTime5));
|
||||
doCompaction(testTable, Long.toString(commitTime5), nonPartitionedDataset);
|
||||
}
|
||||
doWriteOperation(testTable, Long.toString(commitTime6));
|
||||
doWriteOperation(testTable, Long.toString(commitTime6), UPSERT, nonPartitionedDataset);
|
||||
doRollback(testTable, Long.toString(commitTime6), Long.toString(commitTime7));
|
||||
}
|
||||
validateMetadata(testTable, emptyList(), true);
|
||||
validateMetadata(testTable, emptyList(), nonPartitionedDataset);
|
||||
}
|
||||
|
||||
// Some operations are not feasible with test table infra. hence using write client to test those cases.
|
||||
@@ -1563,8 +1594,12 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
validateMetadata(testTable);
|
||||
}
|
||||
|
||||
private void doWriteInsertAndUpsertNonPartitioned(HoodieTestTable testTable) throws Exception {
|
||||
doWriteInsertAndUpsert(testTable, "0000001", "0000002", true);
|
||||
}
|
||||
|
||||
private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws Exception {
|
||||
doWriteInsertAndUpsert(testTable, "0000001", "0000002");
|
||||
doWriteInsertAndUpsert(testTable, "0000001", "0000002", false);
|
||||
}
|
||||
|
||||
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts) {
|
||||
|
||||
@@ -61,7 +61,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
|
||||
}
|
||||
|
||||
private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws Exception {
|
||||
doWriteInsertAndUpsert(testTable, "0000001", "0000002");
|
||||
doWriteInsertAndUpsert(testTable, "0000001", "0000002", false);
|
||||
}
|
||||
|
||||
private void verifyBaseMetadataTable() throws IOException {
|
||||
|
||||
@@ -109,10 +109,10 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
|
||||
cleanupResources();
|
||||
}
|
||||
|
||||
protected void doWriteInsertAndUpsert(HoodieTestTable testTable, String commit1, String commit2) throws Exception {
|
||||
testTable.doWriteOperation(commit1, INSERT, asList("p1", "p2"), asList("p1", "p2"),
|
||||
protected void doWriteInsertAndUpsert(HoodieTestTable testTable, String commit1, String commit2, boolean nonPartitioned) throws Exception {
|
||||
testTable.doWriteOperation(commit1, INSERT, nonPartitioned ? asList("") : asList("p1", "p2"), nonPartitioned ? asList("") : asList("p1", "p2"),
|
||||
4, false);
|
||||
testTable.doWriteOperation(commit2, UPSERT, asList("p1", "p2"),
|
||||
testTable.doWriteOperation(commit2, UPSERT, nonPartitioned ? asList("") : asList("p1", "p2"),
|
||||
4, false);
|
||||
validateMetadata(testTable);
|
||||
}
|
||||
@@ -135,6 +135,18 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
|
||||
validateMetadata(testTable);
|
||||
}
|
||||
|
||||
protected void doWriteOperationNonPartitioned(HoodieTestTable testTable, String commitTime, WriteOperationType operationType) throws Exception {
|
||||
testTable.doWriteOperation(commitTime, operationType, emptyList(), asList(""), 3);
|
||||
}
|
||||
|
||||
protected void doWriteOperation(HoodieTestTable testTable, String commitTime, WriteOperationType operationType, boolean nonPartitioned) throws Exception {
|
||||
if (nonPartitioned) {
|
||||
doWriteOperationNonPartitioned(testTable, commitTime, operationType);
|
||||
} else {
|
||||
doWriteOperation(testTable, commitTime, operationType);
|
||||
}
|
||||
}
|
||||
|
||||
protected void doWriteOperation(HoodieTestTable testTable, String commitTime, WriteOperationType operationType) throws Exception {
|
||||
testTable.doWriteOperation(commitTime, operationType, emptyList(), asList("p1", "p2"), 3);
|
||||
}
|
||||
@@ -154,16 +166,28 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
|
||||
}
|
||||
}
|
||||
|
||||
protected void doCompactionNonPartitioned(HoodieTestTable testTable, String commitTime) throws Exception {
|
||||
doCompactionInternal(testTable, commitTime, false, true);
|
||||
}
|
||||
|
||||
protected void doCompaction(HoodieTestTable testTable, String commitTime, boolean nonPartitioned) throws Exception {
|
||||
doCompactionInternal(testTable, commitTime, false, nonPartitioned);
|
||||
}
|
||||
|
||||
protected void doCompaction(HoodieTestTable testTable, String commitTime) throws Exception {
|
||||
doCompactionInternal(testTable, commitTime, false);
|
||||
doCompactionInternal(testTable, commitTime, false, false);
|
||||
}
|
||||
|
||||
protected void doCompactionNonPartitionedAndValidate(HoodieTestTable testTable, String commitTime) throws Exception {
|
||||
doCompactionInternal(testTable, commitTime, true, true);
|
||||
}
|
||||
|
||||
protected void doCompactionAndValidate(HoodieTestTable testTable, String commitTime) throws Exception {
|
||||
doCompactionInternal(testTable, commitTime, true);
|
||||
doCompactionInternal(testTable, commitTime, true, false);
|
||||
}
|
||||
|
||||
private void doCompactionInternal(HoodieTestTable testTable, String commitTime, boolean validate) throws Exception {
|
||||
testTable.doCompaction(commitTime, asList("p1", "p2"));
|
||||
private void doCompactionInternal(HoodieTestTable testTable, String commitTime, boolean validate, boolean nonPartitioned) throws Exception {
|
||||
testTable.doCompaction(commitTime, nonPartitioned ? asList("") : asList("p1", "p2"));
|
||||
if (validate) {
|
||||
validateMetadata(testTable);
|
||||
}
|
||||
|
||||
@@ -235,7 +235,7 @@ public class TestHoodieMetadataBootstrap extends TestHoodieMetadataBase {
|
||||
}
|
||||
|
||||
private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws Exception {
|
||||
doWriteInsertAndUpsert(testTable, "0000100", "0000101");
|
||||
doWriteInsertAndUpsert(testTable, "0000100", "0000101", false);
|
||||
}
|
||||
|
||||
private HoodieWriteConfig getWriteConfig(int minArchivalCommits, int maxArchivalCommits) throws Exception {
|
||||
|
||||
@@ -532,6 +532,9 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
|
||||
List<java.nio.file.Path> fsPartitionPaths = testTable.getAllPartitionPaths();
|
||||
List<String> fsPartitions = new ArrayList<>();
|
||||
fsPartitionPaths.forEach(entry -> fsPartitions.add(entry.getFileName().toString()));
|
||||
if (fsPartitions.isEmpty()) {
|
||||
fsPartitions.add("");
|
||||
}
|
||||
List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
|
||||
|
||||
Collections.sort(fsPartitions);
|
||||
@@ -618,7 +621,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
|
||||
}
|
||||
}
|
||||
}
|
||||
assertEquals(fsStatuses.length, partitionToFilesMap.get(basePath + "/" + partition).length);
|
||||
assertEquals(fsStatuses.length, partitionToFilesMap.get(partitionPath.toString()).length);
|
||||
|
||||
// Block sizes should be valid
|
||||
Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getBlockSize() > 0));
|
||||
|
||||
@@ -48,6 +48,7 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
|
||||
String RECORDKEY_PARTITION_LIST = "__all_partitions__";
|
||||
// The partition name used for non-partitioned tables
|
||||
String NON_PARTITIONED_NAME = ".";
|
||||
String EMPTY_PARTITION_NAME = "";
|
||||
|
||||
// Base path of the Metadata Table relative to the dataset (.hoodie/metadata)
|
||||
static final String METADATA_TABLE_REL_PATH = HoodieTableMetaClient.METAFOLDER_NAME + Path.SEPARATOR + "metadata";
|
||||
|
||||
@@ -52,6 +52,7 @@ import java.util.function.BiFunction;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME;
|
||||
import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
|
||||
|
||||
/**
|
||||
@@ -89,7 +90,7 @@ public class HoodieTableMetadataUtil {
|
||||
List<HoodieRecord> records = new LinkedList<>();
|
||||
List<String> allPartitions = new LinkedList<>();
|
||||
commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
|
||||
final String partition = partitionStatName.equals("") ? NON_PARTITIONED_NAME : partitionStatName;
|
||||
final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName;
|
||||
allPartitions.add(partition);
|
||||
|
||||
Map<String, Long> newFiles = new HashMap<>(writeStats.size());
|
||||
@@ -133,7 +134,8 @@ public class HoodieTableMetadataUtil {
|
||||
public static List<HoodieRecord> convertMetadataToRecords(HoodieCleanMetadata cleanMetadata, String instantTime) {
|
||||
List<HoodieRecord> records = new LinkedList<>();
|
||||
int[] fileDeleteCount = {0};
|
||||
cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
|
||||
cleanMetadata.getPartitionMetadata().forEach((partitionName, partitionMetadata) -> {
|
||||
final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
|
||||
// Files deleted from a partition
|
||||
List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
|
||||
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
|
||||
@@ -282,12 +284,13 @@ public class HoodieTableMetadataUtil {
|
||||
List<HoodieRecord> records = new LinkedList<>();
|
||||
int[] fileChangeCount = {0, 0}; // deletes, appends
|
||||
|
||||
partitionToDeletedFiles.forEach((partition, deletedFiles) -> {
|
||||
partitionToDeletedFiles.forEach((partitionName, deletedFiles) -> {
|
||||
fileChangeCount[0] += deletedFiles.size();
|
||||
final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
|
||||
|
||||
Option<Map<String, Long>> filesAdded = Option.empty();
|
||||
if (partitionToAppendedFiles.containsKey(partition)) {
|
||||
filesAdded = Option.of(partitionToAppendedFiles.remove(partition));
|
||||
if (partitionToAppendedFiles.containsKey(partitionName)) {
|
||||
filesAdded = Option.of(partitionToAppendedFiles.remove(partitionName));
|
||||
}
|
||||
|
||||
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded,
|
||||
@@ -295,7 +298,8 @@ public class HoodieTableMetadataUtil {
|
||||
records.add(record);
|
||||
});
|
||||
|
||||
partitionToAppendedFiles.forEach((partition, appendedFileMap) -> {
|
||||
partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
|
||||
final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
|
||||
fileChangeCount[1] += appendedFileMap.size();
|
||||
|
||||
// Validate that no appended file has been deleted
|
||||
|
||||
@@ -367,7 +367,9 @@ public class FileCreateUtils {
|
||||
if (Files.notExists(basePath)) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return Files.list(basePath).filter(entry -> !entry.getFileName().toString().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList());
|
||||
return Files.list(basePath).filter(entry -> (!entry.getFileName().toString().equals(HoodieTableMetaClient.METAFOLDER_NAME)
|
||||
&& !entry.getFileName().toString().contains("parquet") && !entry.getFileName().toString().contains("log"))
|
||||
&& !entry.getFileName().toString().endsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -602,7 +602,7 @@ public class HoodieTestTable {
|
||||
}
|
||||
|
||||
public List<java.nio.file.Path> getAllPartitionPaths() throws IOException {
|
||||
java.nio.file.Path basePathPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).getParent().getParent();
|
||||
java.nio.file.Path basePathPath = Paths.get(basePath);
|
||||
return FileCreateUtils.getPartitionPaths(basePathPath);
|
||||
}
|
||||
|
||||
@@ -660,8 +660,10 @@ public class HoodieTestTable {
|
||||
return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString())).stream()
|
||||
.filter(entry -> {
|
||||
boolean toReturn = true;
|
||||
String filePath = entry.getPath().toString();
|
||||
String fileName = entry.getPath().getName();
|
||||
if (fileName.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
|
||||
if (fileName.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE) || (!fileName.contains("log") && !fileName.contains("parquet"))
|
||||
|| filePath.contains("metadata")) {
|
||||
toReturn = false;
|
||||
} else {
|
||||
for (String inflight : inflightCommits) {
|
||||
|
||||
Reference in New Issue
Block a user