1
0

[HUDI-1504] Allow log files generated during restore/rollback to be synced as well

- TestHoodieBackedMetadata#testSync etc now run for MOR tables
- HUDI-1502 is still pending and has issues for MOR/rollbacks
- Also addressed a bunch of code review comments.
This commit is contained in:
Vinoth Chandar
2021-01-04 01:15:49 -08:00
committed by vinoth chandar
parent 1a0579ca7d
commit 31e674eb57
10 changed files with 53 additions and 68 deletions

View File

@@ -29,7 +29,7 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieBackedTableMetadata; import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metrics.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.shell.core.CommandMarker; import org.springframework.shell.core.CommandMarker;
@@ -117,7 +117,7 @@ public class MetadataCommand implements CommandMarker {
// Metadata directory does not exist // Metadata directory does not exist
} }
return String.format("Removed Metdata Table from %s", metadataPath); return String.format("Removed Metadata Table from %s", metadataPath);
} }
@CliCommand(value = "metadata init", help = "Update the metadata table from commits since the creation") @CliCommand(value = "metadata init", help = "Update the metadata table from commits since the creation")

View File

@@ -481,11 +481,6 @@ public class CompactionAdminClient extends AbstractHoodieClient {
throw new HoodieException("FileGroupId " + fgId + " not in pending compaction"); throw new HoodieException("FileGroupId " + fgId + " not in pending compaction");
} }
@Override
protected void initWrapperFSMetrics() {
// no-op
}
/** /**
* Holds Operation result for Renaming. * Holds Operation result for Renaming.
*/ */

View File

@@ -88,11 +88,6 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
private final HoodieTable<T, I, K, O> table; private final HoodieTable<T, I, K, O> table;
private final HoodieTableMetaClient metaClient; private final HoodieTableMetaClient metaClient;
/*
public HoodieTimelineArchiveLog(HoodieWriteConfig config, Configuration configuration) {
this(config, HoodieTable.create(config, configuration));
}*/
public HoodieTimelineArchiveLog(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) { public HoodieTimelineArchiveLog(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
this.config = config; this.config = config;
this.table = table; this.table = table;

View File

@@ -44,7 +44,7 @@ import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndex; import org.apache.hudi.index.SparkHoodieIndex;
import org.apache.hudi.metrics.DistributedRegistry; import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.hudi.metrics.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;

View File

@@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hudi.metrics; package org.apache.hudi.metadata;
import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
@@ -35,10 +35,7 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;

View File

@@ -18,24 +18,6 @@
package org.apache.hudi.metadata; package org.apache.hudi.metadata;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -66,10 +48,12 @@ import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
@@ -77,8 +61,24 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource; import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieBackedMetadata extends HoodieClientTestHarness { public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class); private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class);
@@ -172,13 +172,10 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
/** /**
* Test various table operations sync to Metadata Table correctly. * Test various table operations sync to Metadata Table correctly.
*/ */
//@ParameterizedTest @ParameterizedTest
//@EnumSource(HoodieTableType.class) @EnumSource(HoodieTableType.class)
//public void testTableOperations(HoodieTableType tableType) throws Exception { public void testTableOperations(HoodieTableType tableType) throws Exception {
@Test init(tableType);
public void testTableOperations() throws Exception {
//FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
@@ -281,7 +278,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
assertNoWriteErrors(writeStatuses); assertNoWriteErrors(writeStatuses);
validateMetadata(client); validateMetadata(client);
// Rollback of inserts // Write 2 (inserts) + Rollback of inserts
newCommitTime = HoodieActiveTimeline.createNewInstantTime(); newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime); client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 20); records = dataGen.generateInserts(newCommitTime, 20);
@@ -292,7 +289,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
client.syncTableMetadata(); client.syncTableMetadata();
validateMetadata(client); validateMetadata(client);
// Rollback of updates // Write 3 (updates) + Rollback of updates
newCommitTime = HoodieActiveTimeline.createNewInstantTime(); newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime); client.startCommitWithTime(newCommitTime);
records = dataGen.generateUniqueUpdates(newCommitTime, 20); records = dataGen.generateUniqueUpdates(newCommitTime, 20);
@@ -341,7 +338,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
client.rollback(newCommitTime); client.rollback(newCommitTime);
client.syncTableMetadata(); client.syncTableMetadata();
validateMetadata(client); validateMetadata(client);
} }
// Rollback of partial commits // Rollback of partial commits
@@ -411,13 +407,10 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
/** /**
* Test sync of table operations. * Test sync of table operations.
*/ */
//@ParameterizedTest @ParameterizedTest
//@EnumSource(HoodieTableType.class) @EnumSource(HoodieTableType.class)
//public void testSync(HoodieTableType tableType) throws Exception { public void testSync(HoodieTableType tableType) throws Exception {
@Test init(tableType);
public void testSync() throws Exception {
//FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
String newCommitTime; String newCommitTime;
@@ -453,6 +446,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
} }
// Various table operations without metadata table enabled // Various table operations without metadata table enabled
String restoreToInstant;
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
// updates // updates
newCommitTime = HoodieActiveTimeline.createNewInstantTime(); newCommitTime = HoodieActiveTimeline.createNewInstantTime();
@@ -479,7 +473,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
} }
// Savepoint // Savepoint
String savepointInstant = newCommitTime; restoreToInstant = newCommitTime;
if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) { if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
client.savepoint("hoodie", "metadata test"); client.savepoint("hoodie", "metadata test");
assertFalse(metadata(client).isInSync()); assertFalse(metadata(client).isInSync());
@@ -505,21 +499,20 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses); assertNoWriteErrors(writeStatuses);
assertFalse(metadata(client).isInSync()); assertFalse(metadata(client).isInSync());
client.restoreToInstant(savepointInstant);
assertFalse(metadata(client).isInSync());
} }
// Enable metadata table and ensure it is synced // Enable metadata table and ensure it is synced
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
// Restore cannot be done until the metadata table is in sync. See HUDI-1502 for details
client.restoreToInstant(restoreToInstant);
assertFalse(metadata(client).isInSync());
newCommitTime = HoodieActiveTimeline.createNewInstantTime(); newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime); client.startCommitWithTime(newCommitTime);
validateMetadata(client); validateMetadata(client);
assertTrue(metadata(client).isInSync()); assertTrue(metadata(client).isInSync());
} }
} }
/** /**
@@ -673,8 +666,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
/** /**
* Test when reading from metadata table which is out of sync with dataset that results are still consistent. * Test when reading from metadata table which is out of sync with dataset that results are still consistent.
*/ */
// @ParameterizedTest
// @EnumSource(HoodieTableType.class)
@Test @Test
public void testMetadataOutOfSync() throws Exception { public void testMetadataOutOfSync() throws Exception {
init(HoodieTableType.COPY_ON_WRITE); init(HoodieTableType.COPY_ON_WRITE);

View File

@@ -206,7 +206,7 @@ public class FSUtils {
public static List<String> getAllFoldersWithPartitionMetaFile(FileSystem fs, String basePathStr) throws IOException { public static List<String> getAllFoldersWithPartitionMetaFile(FileSystem fs, String basePathStr) throws IOException {
// If the basePathStr is a folder within the .hoodie directory then we are listing partitions within an // If the basePathStr is a folder within the .hoodie directory then we are listing partitions within an
// internal table. // internal table.
final boolean isMetadataTable = basePathStr.contains(HoodieTableMetaClient.METAFOLDER_NAME); final boolean isMetadataTable = HoodieTableMetadata.isMetadataTable(basePathStr);
final Path basePath = new Path(basePathStr); final Path basePath = new Path(basePathStr);
final List<String> partitions = new ArrayList<>(); final List<String> partitions = new ArrayList<>();
processFiles(fs, basePathStr, (locatedFileStatus) -> { processFiles(fs, basePathStr, (locatedFileStatus) -> {

View File

@@ -270,10 +270,10 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
} }
HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath); HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
List<HoodieInstant> unsyncedInstants = findInstantsToSync(datasetMetaClient); List<HoodieInstant> unSyncedInstants = findInstantsToSync(datasetMetaClient);
Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()); Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
timelineRecordScanner = timelineRecordScanner =
new HoodieMetadataMergedInstantRecordScanner(datasetMetaClient, unsyncedInstants, getSyncedInstantTime(), schema, MAX_MEMORY_SIZE_IN_BYTES, spillableMapDirectory, null); new HoodieMetadataMergedInstantRecordScanner(datasetMetaClient, unSyncedInstants, getSyncedInstantTime(), schema, MAX_MEMORY_SIZE_IN_BYTES, spillableMapDirectory, null);
} }
protected List<HoodieInstant> findInstantsToSync() { protected List<HoodieInstant> findInstantsToSync() {

View File

@@ -171,7 +171,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
* Returns the list of filenames deleted as part of this record. * Returns the list of filenames deleted as part of this record.
*/ */
public List<String> getDeletions() { public List<String> getDeletions() {
return filterFileInfoEntries(true).map(e -> e.getKey()).sorted().collect(Collectors.toList()); return filterFileInfoEntries(true).map(Map.Entry::getKey).sorted().collect(Collectors.toList());
} }
/** /**

View File

@@ -238,13 +238,20 @@ public class HoodieTableMetadataUtil {
Option<String> lastSyncTs) { Option<String> lastSyncTs) {
rollbackMetadata.getPartitionMetadata().values().forEach(pm -> { rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
// Has this rollback produced new files?
boolean hasAppendFiles = pm.getAppendFiles().values().stream().mapToLong(Long::longValue).sum() > 0;
// If commit being rolled back has not been synced to metadata table yet then there is no need to update metadata // If commit being rolled back has not been synced to metadata table yet then there is no need to update metadata
if (lastSyncTs.isPresent() && HoodieTimeline.compareTimestamps(rollbackMetadata.getCommitsRollback().get(0), HoodieTimeline.GREATER_THAN, lastSyncTs.get())) { boolean shouldSkip = lastSyncTs.isPresent()
&& HoodieTimeline.compareTimestamps(rollbackMetadata.getCommitsRollback().get(0), HoodieTimeline.GREATER_THAN, lastSyncTs.get());
if (!hasAppendFiles && shouldSkip) {
LOG.info(String.format("Skipping syncing of rollbackMetadata at %s, given metadata table is already synced upto to %s",
rollbackMetadata.getCommitsRollback().get(0), lastSyncTs.get()));
return; return;
} }
final String partition = pm.getPartitionPath(); final String partition = pm.getPartitionPath();
if (!pm.getSuccessDeleteFiles().isEmpty()) { if (!pm.getSuccessDeleteFiles().isEmpty() && !shouldSkip) {
if (!partitionToDeletedFiles.containsKey(partition)) { if (!partitionToDeletedFiles.containsKey(partition)) {
partitionToDeletedFiles.put(partition, new ArrayList<>()); partitionToDeletedFiles.put(partition, new ArrayList<>());
} }