
[HUDI-1479] Use HoodieEngineContext to parallelize fetching of partition paths (#2417)

* [HUDI-1479] Use HoodieEngineContext to parallelize fetching of partition paths

* Adding testClass for FileSystemBackedTableMetadata

Co-authored-by: Nishith Agarwal <nagarwal@uber.com>
Author: Udit Mehrotra
Date: 2021-01-10 21:19:52 -08:00
Committed by: GitHub
Parent: 23e93d05c0
Commit: 7ce3ac778e
38 changed files with 509 additions and 100 deletions
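
The heart of this change is a signature change: FSUtils.getAllPartitionPaths now takes a HoodieEngineContext as its first argument, so partition listing can be fanned out by the engine (Spark, Flink, or a local context) rather than walking the file system serially on the driver. A minimal before/after sketch of a typical call site, with variable names chosen for illustration:

    // before: listing is driven only by the FileSystem handle
    List<String> partitions = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
        config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());

    // after: the engine context is threaded through so listing can be parallelized
    List<String> partitions = FSUtils.getAllPartitionPaths(context, metaClient.getFs(), metaClient.getBasePath(),
        config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());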

View File

@@ -119,6 +119,10 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
return config;
}
+public HoodieEngineContext getEngineContext() {
+return context;
+}
protected void initWrapperFSMetrics() {
// no-op.
}

View File

@@ -221,8 +221,9 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
protected abstract void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient);
private void initTableMetadata() {
-this.metadata = new HoodieBackedTableMetadata(hadoopConf.get(), datasetWriteConfig.getBasePath(), datasetWriteConfig.getSpillableMapBasePath(),
-datasetWriteConfig.useFileListingMetadata(), datasetWriteConfig.getFileListingMetadataVerify(), false,
+this.metadata = new HoodieBackedTableMetadata(engineContext, datasetWriteConfig.getBasePath(),
+datasetWriteConfig.getSpillableMapBasePath(), datasetWriteConfig.useFileListingMetadata(),
+datasetWriteConfig.getFileListingMetadataVerify(), false,
datasetWriteConfig.shouldAssumeDatePartitioning());
this.metaClient = metadata.getMetaClient();
}

View File

@@ -31,6 +31,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.ConsistencyGuard;
import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility;
@@ -93,6 +94,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
protected final HoodieWriteConfig config;
protected final HoodieTableMetaClient metaClient;
+protected final transient HoodieEngineContext context;
protected final HoodieIndex<T, I, K, O> index;
private SerializableConfiguration hadoopConfiguration;
@@ -108,6 +110,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
config.getViewStorageConfig());
this.metaClient = metaClient;
this.index = getIndex(config, context);
+this.context = context;
this.taskContextSupplier = context.getTaskContextSupplier();
}
@@ -660,8 +663,16 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
public HoodieTableMetadata metadata() {
if (metadata == null) {
-metadata = HoodieTableMetadata.create(hadoopConfiguration.get(), config.getBasePath(), config.getSpillableMapBasePath(),
-config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.isMetricsOn(), config.shouldAssumeDatePartitioning());
+HoodieEngineContext engineContext = context;
+if (engineContext == null) {
+// This is to handle scenarios where this is called at the executor tasks which do not have access
+// to engine context, and it ends up being null (as its not serializable and marked transient here).
+engineContext = new HoodieLocalEngineContext(hadoopConfiguration.get());
+}
+metadata = HoodieTableMetadata.create(engineContext, config.getBasePath(), config.getSpillableMapBasePath(),
+config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.isMetricsOn(),
+config.shouldAssumeDatePartitioning());
}
return metadata;
}
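
The comment in the hunk above explains the why: context is transient, so when metadata() is invoked inside an executor task the field deserializes to null, and a HoodieLocalEngineContext built from the Hadoop configuration takes over as a sequential fallback. The same pattern in isolation, as a hedged sketch (resolveEngineContext is a hypothetical helper, not part of this commit):

    // Sketch: prefer the real engine context on the driver, fall back to a local one on executors.
    private HoodieEngineContext resolveEngineContext(HoodieEngineContext maybeContext,
                                                     SerializableConfiguration hadoopConf) {
      if (maybeContext != null) {
        return maybeContext;  // driver side: distributed context (e.g. Spark) is available
      }
      // executor side: the transient context was not serialized, so list sequentially
      return new HoodieLocalEngineContext(hadoopConf.get());
    }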

View File

@@ -67,7 +67,7 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
LOG.info("Scheduling clustering for " + metaClient.getBasePath());
HoodieWriteConfig config = getWriteConfig();
-List<String> partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
+List<String> partitionPaths = FSUtils.getAllPartitionPaths(getEngineContext(), metaClient.getFs(), metaClient.getBasePath(),
config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
config.shouldAssumeDatePartitioning());

View File

@@ -91,9 +91,10 @@ public class RollbackUtils {
* @param config instance of {@link HoodieWriteConfig} to use.
* @return {@link List} of {@link ListingBasedRollbackRequest}s thus collected.
*/
-public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(FileSystem fs, String basePath, HoodieWriteConfig config) {
+public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(HoodieEngineContext engineContext,
+FileSystem fs, String basePath, HoodieWriteConfig config) {
try {
-return FSUtils.getAllPartitionPaths(fs, basePath, config.useFileListingMetadata(),
+return FSUtils.getAllPartitionPaths(engineContext, fs, basePath, config.useFileListingMetadata(),
config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning()).stream()
.map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction)
.collect(Collectors.toList());
@@ -113,7 +114,7 @@ public class RollbackUtils {
public static List<ListingBasedRollbackRequest> generateRollbackRequestsUsingFileListingMOR(HoodieInstant instantToRollback, HoodieTable table, HoodieEngineContext context) throws IOException {
String commit = instantToRollback.getTimestamp();
HoodieWriteConfig config = table.getConfig();
-List<String> partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
+List<String> partitions = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
context.setJobStatus(RollbackUtils.class.getSimpleName(), "Generate all rollback requests");
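
The parallelism cap computed here is what bounds the fan-out once the partition list is handed to the engine context. A rough sketch of how per-partition work might be distributed through HoodieEngineContext; the map(data, func, parallelism) primitive is assumed from the engine-context abstraction and is not shown in this diff:

    // Sketch only: fan per-partition rollback-request generation out through the engine context.
    int parallelism = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
    context.setJobStatus(RollbackUtils.class.getSimpleName(), "Generate all rollback requests");
    List<ListingBasedRollbackRequest> requests = context.map(partitions,
        ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction, parallelism);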

View File

@@ -90,7 +90,7 @@ public class SavepointActionExecutor<T extends HoodieRecordPayload, I, K, O> ext
"Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime);
-List<String> partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
+List<String> partitions = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getFs(),
table.getMetaClient().getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
config.shouldAssumeDatePartitioning()
);

View File

@@ -65,7 +65,7 @@ public class FlinkCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayloa
@Override
protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
-table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), config);
+context, table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), config);
return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
}
}

View File

@@ -93,8 +93,8 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
// generate rollback stats
List<ListingBasedRollbackRequest> rollbackRequests;
if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
-rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-table.getConfig());
+rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getFs(),
+table.getMetaClient().getBasePath(), table.getConfig());
} else {
rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context);
}

View File

@@ -63,7 +63,7 @@ public class SparkHoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends
final HoodieTable hoodieTable) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
try {
-List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
+List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, metaClient.getFs(), metaClient.getBasePath(),
config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
return super.loadInvolvedFiles(allPartitionPaths, context, hoodieTable);
} catch (IOException e) {

View File

@@ -104,7 +104,7 @@ public class SparkHoodieGlobalSimpleIndex<T extends HoodieRecordPayload> extends
final HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
try {
-List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
+List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, metaClient.getFs(), metaClient.getBasePath(),
config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
// Obtain the latest data files from all the partitions.
return getLatestBaseFilesForAllPartitions(allPartitionPaths, context, hoodieTable);

View File

@@ -50,7 +50,7 @@ public class SparkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecor
protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
try {
-List<String> partitionPaths = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
+List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getFs(),
table.getMetaClient().getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
false);
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);

View File

@@ -195,7 +195,7 @@ public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
// TODO - rollback any compactions in flight
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime);
-List<String> partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
+List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, metaClient.getFs(), metaClient.getBasePath(),
config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
// filter the partition paths if needed to reduce list status

View File

@@ -66,8 +66,8 @@ public class SparkCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayloa
@Override
protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
-List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(),
-table.getMetaClient().getBasePath(), config);
+List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context,
+table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), config);
return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
}
}

View File

@@ -92,8 +92,8 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
// generate rollback stats
List<ListingBasedRollbackRequest> rollbackRequests;
if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
-rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-table.getConfig());
+rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getFs(),
+table.getMetaClient().getBasePath(), table.getConfig());
} else {
rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context);
}

View File

@@ -101,7 +101,7 @@ public class TestClientRollback extends HoodieClientTestBase {
assertNoWriteErrors(statuses);
HoodieWriteConfig config = getConfig();
List<String> partitionPaths =
-FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), config.useFileListingMetadata(),
+FSUtils.getAllPartitionPaths(context, fs, cfg.getBasePath(), config.useFileListingMetadata(),
config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieSparkTable table = HoodieSparkTable.create(getConfig(), context, metaClient);

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
@@ -454,7 +455,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
records = dataGen.generateUniqueUpdates(newCommitTime, 5);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
-assertFalse(metadata(client).isInSync());
+assertTrue(metadata(client).isInSync());
// updates and inserts
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
@@ -462,21 +463,21 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
records = dataGen.generateUpdates(newCommitTime, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
-assertFalse(metadata(client).isInSync());
+assertTrue(metadata(client).isInSync());
// Compaction
if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
client.compact(newCommitTime);
-assertFalse(metadata(client).isInSync());
+assertTrue(metadata(client).isInSync());
}
// Savepoint
restoreToInstant = newCommitTime;
if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
client.savepoint("hoodie", "metadata test");
-assertFalse(metadata(client).isInSync());
+assertTrue(metadata(client).isInSync());
}
// Deletes
@@ -485,12 +486,12 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
client.startCommitWithTime(newCommitTime);
client.delete(deleteKeys, newCommitTime);
-assertFalse(metadata(client).isInSync());
+assertTrue(metadata(client).isInSync());
// Clean
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.clean(newCommitTime);
-assertFalse(metadata(client).isInSync());
+assertTrue(metadata(client).isInSync());
// updates
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
@@ -498,8 +499,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
records = dataGen.generateUniqueUpdates(newCommitTime, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
-assertFalse(metadata(client).isInSync());
+assertTrue(metadata(client).isInSync());
// insert overwrite to test replacecommit
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
@@ -507,7 +508,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
HoodieWriteResult replaceResult = client.insertOverwrite(jsc.parallelize(records, 1), newCommitTime);
writeStatuses = replaceResult.getWriteStatuses().collect();
assertNoWriteErrors(writeStatuses);
-assertFalse(metadata(client).isInSync());
+assertTrue(metadata(client).isInSync());
}
// Enable metadata table and ensure it is synced
@@ -757,7 +758,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
private void validateMetadata(SparkRDDWriteClient client) throws IOException {
HoodieWriteConfig config = client.getConfig();
-HoodieBackedTableMetadata tableMetadata = metadata(client);
+HoodieTableMetadata tableMetadata = metadata(client);
assertNotNull(tableMetadata, "MetadataReader should have been initialized");
if (!config.useFileListingMetadata()) {
return;
@@ -767,7 +768,9 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
// Partitions should match
-List<String> fsPartitions = FSUtils.getAllFoldersWithPartitionMetaFile(fs, basePath);
+FileSystemBackedTableMetadata fsBackedTableMetadata = new FileSystemBackedTableMetadata(engineContext,
+new SerializableConfiguration(hadoopConf), config.getBasePath(), config.shouldAssumeDatePartitioning());
+List<String> fsPartitions = fsBackedTableMetadata.getAllPartitionPaths();
List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
Collections.sort(fsPartitions);
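
The new assertions compare partitions listed straight off the file system with those served by the metadata table. Outside of tests, FileSystemBackedTableMetadata can be used the same way; a small sketch based only on the constructor and calls visible above (jsc, hadoopConf, and basePath are illustrative):

    // Sketch: list partitions directly from storage, parallelized via the Spark engine context.
    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
    FileSystemBackedTableMetadata fsMetadata = new FileSystemBackedTableMetadata(engineContext,
        new SerializableConfiguration(hadoopConf), basePath, /* assumeDatePartitioning */ false);
    List<String> fsPartitions = fsMetadata.getAllPartitionPaths();  // may throw IOException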
@@ -849,7 +852,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
// Metadata table has a fixed number of partitions
// Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that function filters all directory
// in the .hoodie folder.
-List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(fs, HoodieTableMetadata.getMetadataTableBasePath(basePath),
+List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, fs, HoodieTableMetadata.getMetadataTableBasePath(basePath),
false, false, false);
assertEquals(MetadataPartitionType.values().length, metadataTablePartitions.size());
@@ -873,10 +876,11 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
.create(hadoopConf, client.getConfig(), new HoodieSparkEngineContext(jsc));
}
-private HoodieBackedTableMetadata metadata(SparkRDDWriteClient client) {
+private HoodieTableMetadata metadata(SparkRDDWriteClient client) {
HoodieWriteConfig clientConfig = client.getConfig();
-return (HoodieBackedTableMetadata) HoodieTableMetadata.create(hadoopConf, clientConfig.getBasePath(), clientConfig.getSpillableMapBasePath(),
-clientConfig.useFileListingMetadata(), clientConfig.getFileListingMetadataVerify(), false, clientConfig.shouldAssumeDatePartitioning());
+return HoodieTableMetadata.create(client.getEngineContext(), clientConfig.getBasePath(),
+clientConfig.getSpillableMapBasePath(), clientConfig.useFileListingMetadata(),
+clientConfig.getFileListingMetadataVerify(), false, clientConfig.shouldAssumeDatePartitioning());
}
// TODO: this can be moved to TestHarness after merge from master
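
HoodieTableMetadata.create likewise switches from a raw Hadoop Configuration to an engine context as its first argument, and the helper now returns the HoodieTableMetadata interface so callers stay agnostic to whether the metadata table or plain file listing backs it. A hedged usage sketch built from the signatures shown above (engineContext and config are illustrative):

    // Sketch: obtain a metadata reader and list partitions through it.
    HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext, config.getBasePath(),
        config.getSpillableMapBasePath(), config.useFileListingMetadata(),
        config.getFileListingMetadataVerify(), /* enableMetrics */ false, config.shouldAssumeDatePartitioning());
    List<String> partitionPaths = tableMetadata.getAllPartitionPaths();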
@@ -912,4 +916,4 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
protected HoodieTableType getTableType() {
return tableType;
}
-}
+}