diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java index 8eac8dd6f..169b9271d 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java @@ -161,7 +161,9 @@ public class MetadataCommand implements CommandMarker { @CliCommand(value = "metadata list-partitions", help = "Print a list of all partitions from the metadata") public String listPartitions() throws IOException { HoodieCLI.getTableMetaClient(); - HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(HoodieCLI.conf, HoodieCLI.basePath, "/tmp", true, false, false); + initJavaSparkContext(); + HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieSparkEngineContext(jsc), + HoodieCLI.basePath, "/tmp", true, false, false, false); StringBuffer out = new StringBuffer("\n"); if (!metadata.enabled()) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java index 30bb48db7..765965fa5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java @@ -119,6 +119,10 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl return config; } + public HoodieEngineContext getEngineContext() { + return context; + } + protected void initWrapperFSMetrics() { // no-op. 
} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index d086b8085..1466a173e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -221,8 +221,9 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta protected abstract void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient); private void initTableMetadata() { - this.metadata = new HoodieBackedTableMetadata(hadoopConf.get(), datasetWriteConfig.getBasePath(), datasetWriteConfig.getSpillableMapBasePath(), - datasetWriteConfig.useFileListingMetadata(), datasetWriteConfig.getFileListingMetadataVerify(), false, + this.metadata = new HoodieBackedTableMetadata(engineContext, datasetWriteConfig.getBasePath(), + datasetWriteConfig.getSpillableMapBasePath(), datasetWriteConfig.useFileListingMetadata(), + datasetWriteConfig.getFileListingMetadataVerify(), false, datasetWriteConfig.shouldAssumeDatePartitioning()); this.metaClient = metadata.getMetaClient(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 454b3dc46..10e6ad95d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -31,6 +31,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.HoodieEngineContext; +import 
org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.ConsistencyGuard; import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility; @@ -93,6 +94,7 @@ public abstract class HoodieTable implem protected final HoodieWriteConfig config; protected final HoodieTableMetaClient metaClient; + protected final transient HoodieEngineContext context; protected final HoodieIndex index; private SerializableConfiguration hadoopConfiguration; @@ -108,6 +110,7 @@ public abstract class HoodieTable implem config.getViewStorageConfig()); this.metaClient = metaClient; this.index = getIndex(config, context); + this.context = context; this.taskContextSupplier = context.getTaskContextSupplier(); } @@ -660,8 +663,16 @@ public abstract class HoodieTable implem public HoodieTableMetadata metadata() { if (metadata == null) { - metadata = HoodieTableMetadata.create(hadoopConfiguration.get(), config.getBasePath(), config.getSpillableMapBasePath(), - config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.isMetricsOn(), config.shouldAssumeDatePartitioning()); + HoodieEngineContext engineContext = context; + if (engineContext == null) { + // This handles scenarios where this is called from executor tasks, which do not have access + // to the engine context, so it ends up being null (as it's not serializable and is marked transient here). 
+ engineContext = new HoodieLocalEngineContext(hadoopConfiguration.get()); + } + + metadata = HoodieTableMetadata.create(engineContext, config.getBasePath(), config.getSpillableMapBasePath(), + config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.isMetricsOn(), + config.shouldAssumeDatePartitioning()); } return metadata; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java index 17d7feef1..a94a4e6ff 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java @@ -67,7 +67,7 @@ public abstract class PartitionAwareClusteringPlanStrategy partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), + List partitionPaths = FSUtils.getAllPartitionPaths(getEngineContext(), metaClient.getFs(), metaClient.getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning()); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java index ee7f4dd71..1ca019619 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java @@ -91,9 +91,10 @@ public class RollbackUtils { * @param config instance of {@link HoodieWriteConfig} to use. 
* @return {@link List} of {@link ListingBasedRollbackRequest}s thus collected. */ - public static List generateRollbackRequestsByListingCOW(FileSystem fs, String basePath, HoodieWriteConfig config) { + public static List generateRollbackRequestsByListingCOW(HoodieEngineContext engineContext, + FileSystem fs, String basePath, HoodieWriteConfig config) { try { - return FSUtils.getAllPartitionPaths(fs, basePath, config.useFileListingMetadata(), + return FSUtils.getAllPartitionPaths(engineContext, fs, basePath, config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning()).stream() .map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction) .collect(Collectors.toList()); @@ -113,7 +114,7 @@ public class RollbackUtils { public static List generateRollbackRequestsUsingFileListingMOR(HoodieInstant instantToRollback, HoodieTable table, HoodieEngineContext context) throws IOException { String commit = instantToRollback.getTimestamp(); HoodieWriteConfig config = table.getConfig(); - List partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), + List partitions = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning()); int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1); context.setJobStatus(RollbackUtils.class.getSimpleName(), "Generate all rollback requests"); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java index d7d1783e3..4b222dae1 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java @@ -90,7 +90,7 @@ public class SavepointActionExecutor ext "Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained); context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime); - List partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), + List partitions = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning() ); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java index cd87bd74a..585b41b3e 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java @@ -65,7 +65,7 @@ public class FlinkCopyOnWriteRollbackActionExecutor executeRollbackUsingFileListing(HoodieInstant instantToRollback) { List rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW( - table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), config); + context, table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), config); return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests); } } diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java index 75e2a0011..5a61c5cdc 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java @@ -93,8 +93,8 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler { // generate rollback stats List rollbackRequests; if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) { - rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), - table.getConfig()); + rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getFs(), + table.getMetaClient().getBasePath(), table.getConfig()); } else { rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java index 8c3ba63e7..27a9131b4 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java @@ -63,7 +63,7 @@ public class SparkHoodieGlobalBloomIndex extends final HoodieTable hoodieTable) { HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); try { - List allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), + List allPartitionPaths = FSUtils.getAllPartitionPaths(context, metaClient.getFs(), metaClient.getBasePath(), 
config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning()); return super.loadInvolvedFiles(allPartitionPaths, context, hoodieTable); } catch (IOException e) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java index f266bb5ab..c58359cc9 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java @@ -104,7 +104,7 @@ public class SparkHoodieGlobalSimpleIndex extends final HoodieTable>, JavaRDD, JavaRDD> hoodieTable) { HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); try { - List allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), + List allPartitionPaths = FSUtils.getAllPartitionPaths(context, metaClient.getFs(), metaClient.getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning()); // Obtain the latest data files from all the partitions. 
return getLatestBaseFilesForAllPartitions(allPartitionPaths, context, hoodieTable); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java index b799db473..039f62834 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java @@ -50,7 +50,7 @@ public class SparkInsertOverwriteTableCommitActionExecutor> getPartitionToReplacedFileIds(JavaRDD writeStatuses) { Map> partitionToExistingFileIds = new HashMap<>(); try { - List partitionPaths = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), + List partitionPaths = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(), false); JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java index 81b110d88..4773bf210 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java @@ -195,7 +195,7 @@ public class HoodieSparkMergeOnReadTableCompactor // TODO - rollback any compactions in flight HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); LOG.info("Compacting " + 
metaClient.getBasePath() + " with commit " + compactionCommitTime); - List partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), + List partitionPaths = FSUtils.getAllPartitionPaths(context, metaClient.getFs(), metaClient.getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning()); // filter the partition paths if needed to reduce list status diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java index b770bbfe5..11d3bafde 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java @@ -66,8 +66,8 @@ public class SparkCopyOnWriteRollbackActionExecutor executeRollbackUsingFileListing(HoodieInstant instantToRollback) { - List rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), - table.getMetaClient().getBasePath(), config); + List rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, + table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), config); return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java index a53c7cca7..bb1a9ad64 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java +++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java @@ -92,8 +92,8 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler { // generate rollback stats List rollbackRequests; if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) { - rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), - table.getConfig()); + rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getFs(), + table.getMetaClient().getBasePath(), table.getConfig()); } else { rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java index eab9bb191..bc639b213 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java @@ -101,7 +101,7 @@ public class TestClientRollback extends HoodieClientTestBase { assertNoWriteErrors(statuses); HoodieWriteConfig config = getConfig(); List partitionPaths = - FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), config.useFileListingMetadata(), + FSUtils.getAllPartitionPaths(context, fs, cfg.getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning()); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieSparkTable table = HoodieSparkTable.create(getConfig(), context, metaClient); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java index 
593223653..7e3dea470 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java @@ -23,6 +23,7 @@ import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.FileSlice; @@ -454,7 +455,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { records = dataGen.generateUniqueUpdates(newCommitTime, 5); writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); assertNoWriteErrors(writeStatuses); - assertFalse(metadata(client).isInSync()); + assertTrue(metadata(client).isInSync()); // updates and inserts newCommitTime = HoodieActiveTimeline.createNewInstantTime(); @@ -462,21 +463,21 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { records = dataGen.generateUpdates(newCommitTime, 10); writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); assertNoWriteErrors(writeStatuses); - assertFalse(metadata(client).isInSync()); + assertTrue(metadata(client).isInSync()); // Compaction if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) { newCommitTime = HoodieActiveTimeline.createNewInstantTime(); client.scheduleCompactionAtInstant(newCommitTime, Option.empty()); client.compact(newCommitTime); - assertFalse(metadata(client).isInSync()); + assertTrue(metadata(client).isInSync()); } // Savepoint restoreToInstant = newCommitTime; if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) { client.savepoint("hoodie", "metadata test"); - 
assertFalse(metadata(client).isInSync()); + assertTrue(metadata(client).isInSync()); } // Deletes @@ -485,12 +486,12 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { JavaRDD deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey()); client.startCommitWithTime(newCommitTime); client.delete(deleteKeys, newCommitTime); - assertFalse(metadata(client).isInSync()); + assertTrue(metadata(client).isInSync()); // Clean newCommitTime = HoodieActiveTimeline.createNewInstantTime(); client.clean(newCommitTime); - assertFalse(metadata(client).isInSync()); + assertTrue(metadata(client).isInSync()); // updates newCommitTime = HoodieActiveTimeline.createNewInstantTime(); @@ -498,8 +499,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { records = dataGen.generateUniqueUpdates(newCommitTime, 10); writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); assertNoWriteErrors(writeStatuses); - assertFalse(metadata(client).isInSync()); - + assertTrue(metadata(client).isInSync()); + // insert overwrite to test replacecommit newCommitTime = HoodieActiveTimeline.createNewInstantTime(); client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION); @@ -507,7 +508,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { HoodieWriteResult replaceResult = client.insertOverwrite(jsc.parallelize(records, 1), newCommitTime); writeStatuses = replaceResult.getWriteStatuses().collect(); assertNoWriteErrors(writeStatuses); - assertFalse(metadata(client).isInSync()); + assertTrue(metadata(client).isInSync()); } // Enable metadata table and ensure it is synced @@ -757,7 +758,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { private void validateMetadata(SparkRDDWriteClient client) throws IOException { HoodieWriteConfig config = client.getConfig(); - HoodieBackedTableMetadata tableMetadata = metadata(client); + HoodieTableMetadata tableMetadata = 
metadata(client); assertNotNull(tableMetadata, "MetadataReader should have been initialized"); if (!config.useFileListingMetadata()) { return; @@ -767,7 +768,9 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); // Partitions should match - List fsPartitions = FSUtils.getAllFoldersWithPartitionMetaFile(fs, basePath); + FileSystemBackedTableMetadata fsBackedTableMetadata = new FileSystemBackedTableMetadata(engineContext, + new SerializableConfiguration(hadoopConf), config.getBasePath(), config.shouldAssumeDatePartitioning()); + List fsPartitions = fsBackedTableMetadata.getAllPartitionPaths(); List metadataPartitions = tableMetadata.getAllPartitionPaths(); Collections.sort(fsPartitions); @@ -849,7 +852,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { // Metadata table has a fixed number of partitions // Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that function filters all directory // in the .hoodie folder. 
- List metadataTablePartitions = FSUtils.getAllPartitionPaths(fs, HoodieTableMetadata.getMetadataTableBasePath(basePath), + List metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, fs, HoodieTableMetadata.getMetadataTableBasePath(basePath), false, false, false); assertEquals(MetadataPartitionType.values().length, metadataTablePartitions.size()); @@ -873,10 +876,11 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { .create(hadoopConf, client.getConfig(), new HoodieSparkEngineContext(jsc)); } - private HoodieBackedTableMetadata metadata(SparkRDDWriteClient client) { + private HoodieTableMetadata metadata(SparkRDDWriteClient client) { HoodieWriteConfig clientConfig = client.getConfig(); - return (HoodieBackedTableMetadata) HoodieTableMetadata.create(hadoopConf, clientConfig.getBasePath(), clientConfig.getSpillableMapBasePath(), - clientConfig.useFileListingMetadata(), clientConfig.getFileListingMetadataVerify(), false, clientConfig.shouldAssumeDatePartitioning()); + return HoodieTableMetadata.create(client.getEngineContext(), clientConfig.getBasePath(), + clientConfig.getSpillableMapBasePath(), clientConfig.useFileListingMetadata(), + clientConfig.getFileListingMetadataVerify(), false, clientConfig.shouldAssumeDatePartitioning()); } // TODO: this can be moved to TestHarness after merge from master @@ -912,4 +916,4 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { protected HoodieTableType getTableType() { return tableType; } -} +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index 02e67e19f..e95712bc0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -39,6 +39,7 @@ public final class HoodieMetadataConfig 
extends DefaultHoodieConfig { // Validate contents of Metadata Table on each access against the actual filesystem public static final String METADATA_VALIDATE_PROP = METADATA_PREFIX + ".validate"; public static final boolean DEFAULT_METADATA_VALIDATE = false; + public static final boolean DEFAULT_METADATA_ENABLE_FOR_READERS = false; // Parallelism for inserts public static final String METADATA_INSERT_PARALLELISM_PROP = METADATA_PREFIX + ".insert.parallelism"; @@ -62,10 +63,6 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig { public static final String CLEANER_COMMITS_RETAINED_PROP = METADATA_PREFIX + ".cleaner.commits.retained"; public static final int DEFAULT_CLEANER_COMMITS_RETAINED = 3; - // We can set the default to true for readers, as it will internally default to listing from filesystem if metadata - // table is not found - public static final boolean DEFAULT_METADATA_ENABLE_FOR_READERS = true; - private HoodieMetadataConfig(Properties props) { super(props); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java new file mode 100644 index 000000000..e80456702 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.engine; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.function.SerializableConsumer; +import org.apache.hudi.common.function.SerializableFunction; +import org.apache.hudi.common.function.SerializablePairFunction; +import org.apache.hudi.common.util.Option; + +import org.apache.hudi.common.util.collection.Pair; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toList; +import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWrapper; +import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper; +import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper; +import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper; + +/** + * A Java-based engine context; use this implementation in the query engine integrations if needed. 
+ */ +public final class HoodieLocalEngineContext extends HoodieEngineContext { + + public HoodieLocalEngineContext(Configuration conf) { + this(conf, new LocalTaskContextSupplier()); + } + + public HoodieLocalEngineContext(Configuration conf, TaskContextSupplier taskContextSupplier) { + super(new SerializableConfiguration(conf), taskContextSupplier); + } + + @Override + public List map(List data, SerializableFunction func, int parallelism) { + return data.stream().parallel().map(throwingMapWrapper(func)).collect(toList()); + } + + @Override + public List flatMap(List data, SerializableFunction> func, int parallelism) { + return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList()); + } + + @Override + public void foreach(List data, SerializableConsumer consumer, int parallelism) { + data.stream().forEach(throwingForeachWrapper(consumer)); + } + + @Override + public Map mapToPair(List data, SerializablePairFunction func, Integer parallelism) { + return data.stream().map(throwingMapToPairWrapper(func)).collect( + Collectors.toMap(Pair::getLeft, Pair::getRight, (oldVal, newVal) -> newVal) + ); + } + + @Override + public void setProperty(EngineProperty key, String value) { + // no operation for now + } + + @Override + public Option getProperty(EngineProperty key) { + return Option.empty(); + } + + @Override + public void setJobStatus(String activeModule, String activityDescription) { + // no operation for now + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java new file mode 100644 index 000000000..0c7ae20e1 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.engine; + +import org.apache.hudi.common.util.Option; + +import java.util.function.Supplier; + +public final class LocalTaskContextSupplier extends TaskContextSupplier { + @Override + public Supplier getPartitionIdSupplier() { + return () -> 0; + } + + @Override + public Supplier getStageIdSupplier() { + return () -> 0; + } + + @Override + public Supplier getAttemptIdSupplier() { + return () -> 0L; + } + + @Override + public Option getProperty(EngineProperty prop) { + return Option.empty(); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 2d638b47e..7978eed6c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.fs; import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodiePartitionMetadata; @@ -252,13 +253,14 @@ public class FSUtils { } } - public static List 
getAllPartitionPaths(FileSystem fs, String basePathStr, boolean useFileListingFromMetadata, boolean verifyListings, + public static List getAllPartitionPaths(HoodieEngineContext engineContext, FileSystem fs, String basePathStr, + boolean useFileListingFromMetadata, boolean verifyListings, boolean assumeDatePartitioning) throws IOException { if (assumeDatePartitioning) { return getAllPartitionFoldersThreeLevelsDown(fs, basePathStr); } else { - HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(fs.getConf(), basePathStr, "/tmp/", useFileListingFromMetadata, - verifyListings, false, false); + HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext, basePathStr, "/tmp/", + useFileListingFromMetadata, verifyListings, false, false); return tableMetadata.getAllPartitionPaths(); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java index c5a31fa3a..6901bc567 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.table.view; import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Functions.Function2; @@ -159,12 +160,11 @@ public class FileSystemViewManager { return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled()); } - public static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieTableMetaClient metaClient, - boolean useFileListingFromMetadata, - boolean verifyListings) { + public static HoodieTableFileSystemView 
createInMemoryFileSystemView(HoodieEngineContext engineContext, + HoodieTableMetaClient metaClient, boolean useFileListingFromMetadata, boolean verifyListings) { LOG.info("Creating InMemory based view for basePath " + metaClient.getBasePath()); if (useFileListingFromMetadata) { - return new HoodieMetadataFileSystemView(metaClient, + return new HoodieMetadataFileSystemView(engineContext, metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), true, verifyListings); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 33371da0b..4ae71deb6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -23,6 +23,7 @@ import org.apache.avro.Schema; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.HoodiePartitionMetadata; @@ -33,7 +34,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -54,6 +54,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024; static final int BUFFER_SIZE = 10 * 1024 * 1024; + protected final transient HoodieEngineContext engineContext; protected final SerializableConfiguration hadoopConf; protected final String datasetBasePath; protected boolean enabled; @@ -66,10 +67,11 @@ public abstract 
class BaseTableMetadata implements HoodieTableMetadata { protected final String spillableMapDirectory; private transient HoodieMetadataMergedInstantRecordScanner timelineRecordScanner; - protected BaseTableMetadata(Configuration hadoopConf, String datasetBasePath, String spillableMapDirectory, + protected BaseTableMetadata(HoodieEngineContext engineContext, String datasetBasePath, String spillableMapDirectory, boolean enabled, boolean validateLookups, boolean enableMetrics, boolean assumeDatePartitioning) { - this.hadoopConf = new SerializableConfiguration(hadoopConf); + this.engineContext = engineContext; + this.hadoopConf = new SerializableConfiguration(engineContext.getHadoopConf()); this.datasetBasePath = datasetBasePath; this.spillableMapDirectory = spillableMapDirectory; @@ -102,7 +104,8 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { LOG.error("Failed to retrieve list of partition from metadata", e); } } - return new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning).getAllPartitionPaths(); + return new FileSystemBackedTableMetadata(engineContext, hadoopConf, datasetBasePath, + assumeDatePartitioning).getAllPartitionPaths(); } /** @@ -155,7 +158,8 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { if (validateLookups) { // Validate the Metadata Table data by listing the partitions from the file system timer.startTimer(); - FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning); + FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(engineContext, + hadoopConf, datasetBasePath, assumeDatePartitioning); List actualPartitions = fileSystemBackedTableMetadata.getAllPartitionPaths(); metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_PARTITIONS_STR, timer.endTimer())); @@ -290,4 +294,8 @@ public abstract class BaseTableMetadata 
implements HoodieTableMetadata { protected void closeReaders() { timelineRecordScanner = null; } + + protected HoodieEngineContext getEngineContext() { + return engineContext; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index 73ce8e487..326d6fd54 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -19,23 +19,36 @@ package org.apache.hudi.metadata; import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodiePartitionMetadata; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; import java.util.List; +import java.util.stream.Collectors; public class FileSystemBackedTableMetadata implements HoodieTableMetadata { + private static final int DEFAULT_LISTING_PARALLELISM = 1500; + + private final transient HoodieEngineContext engineContext; private final SerializableConfiguration hadoopConf; private final String datasetBasePath; private final boolean assumeDatePartitioning; - public FileSystemBackedTableMetadata(SerializableConfiguration conf, String datasetBasePath, boolean assumeDatePartitioning) { + public FileSystemBackedTableMetadata(HoodieEngineContext engineContext, SerializableConfiguration conf, String datasetBasePath, + boolean assumeDatePartitioning) { + this.engineContext = engineContext; this.hadoopConf = 
conf; this.datasetBasePath = datasetBasePath; this.assumeDatePartitioning = assumeDatePartitioning; @@ -49,12 +62,47 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata { @Override public List getAllPartitionPaths() throws IOException { - FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get()); if (assumeDatePartitioning) { + FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get()); return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath); - } else { - return FSUtils.getAllFoldersWithPartitionMetaFile(fs, datasetBasePath); } + + List pathsToList = new LinkedList<>(); + pathsToList.add(new Path(datasetBasePath)); + List partitionPaths = new ArrayList<>(); + + while (!pathsToList.isEmpty()) { + // TODO: Get the parallelism from HoodieWriteConfig + int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size()); + + // List all directories in parallel + List> dirToFileListing = engineContext.map(pathsToList, path -> { + FileSystem fileSystem = path.getFileSystem(hadoopConf.get()); + return Pair.of(path, fileSystem.listStatus(path)); + }, listingParallelism); + pathsToList.clear(); + + // If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to + // the results. + dirToFileListing.forEach(p -> { + Option partitionMetaFile = Option.fromJavaOptional(Arrays.stream(p.getRight()).parallel() + .filter(fs -> fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) + .findFirst()); + + if (partitionMetaFile.isPresent()) { + // Is a partition. 
+ String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), p.getLeft()); + partitionPaths.add(partitionName); + } else { + // Add sub-dirs to the queue + pathsToList.addAll(Arrays.stream(p.getRight()) + .filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) + .map(fs -> fs.getPath()) + .collect(Collectors.toList())); + } + }); + } + return partitionPaths; } @Override @@ -64,6 +112,6 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata { @Override public boolean isInSync() { - throw new UnsupportedOperationException(); + return true; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 65c3244f9..b90266e7c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -21,6 +21,8 @@ package org.apache.hudi.metadata; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieLogFile; @@ -70,15 +72,15 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { private transient HoodieFileReader baseFileReader; private transient HoodieMetadataMergedLogRecordScanner logRecordScanner; - public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, String spillableMapDirectory, - boolean enabled, boolean validateLookups, boolean assumeDatePartitioning) { - this(conf, datasetBasePath, spillableMapDirectory, enabled, validateLookups, 
false, assumeDatePartitioning); + public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, String spillableMapDirectory, boolean enabled, + boolean validateLookups, boolean assumeDatePartitioning) { + this(new HoodieLocalEngineContext(conf), datasetBasePath, spillableMapDirectory, enabled, validateLookups, + false, assumeDatePartitioning); } - public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, String spillableMapDirectory, - boolean enabled, boolean validateLookups, boolean enableMetrics, - boolean assumeDatePartitioning) { - super(conf, datasetBasePath, spillableMapDirectory, enabled, validateLookups, enableMetrics, assumeDatePartitioning); + public HoodieBackedTableMetadata(HoodieEngineContext engineContext, String datasetBasePath, String spillableMapDirectory, + boolean enabled, boolean validateLookups, boolean enableMetrics, boolean assumeDatePartitioning) { + super(engineContext, datasetBasePath, spillableMapDirectory, enabled, validateLookups, enableMetrics, assumeDatePartitioning); this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath); if (enabled) { try { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java index 9f9e4054b..a440419a7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java @@ -22,6 +22,8 @@ import java.io.IOException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; + +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; @@ -40,12 +42,13 @@ public class HoodieMetadataFileSystemView extends 
HoodieTableFileSystemView { this.tableMetadata = tableMetadata; } - public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient, + public HoodieMetadataFileSystemView(HoodieEngineContext engineContext, + HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, boolean useFileListingFromMetadata, boolean verifyListings) { super(metaClient, visibleActiveTimeline); - this.tableMetadata = HoodieTableMetadata.create(metaClient.getHadoopConf(), metaClient.getBasePath(), + this.tableMetadata = HoodieTableMetadata.create(engineContext, metaClient.getBasePath(), FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR, useFileListingFromMetadata, verifyListings, false, false); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java index acb29f79e..2bfbf3964 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java @@ -18,10 +18,11 @@ package org.apache.hudi.metadata; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -68,10 +69,16 @@ public interface HoodieTableMetadata extends Serializable { return basePath.endsWith(METADATA_TABLE_REL_PATH); } - static HoodieTableMetadata create(Configuration conf, String datasetBasePath, String spillableMapPath, boolean useFileListingFromMetadata, - boolean verifyListings, boolean enableMetrics, boolean shouldAssumeDatePartitioning) { - return new HoodieBackedTableMetadata(conf, datasetBasePath, spillableMapPath, useFileListingFromMetadata, verifyListings, - enableMetrics, 
shouldAssumeDatePartitioning); + static HoodieTableMetadata create(HoodieEngineContext engineContext, String datasetBasePath, + String spillableMapPath, boolean useFileListingFromMetadata, boolean verifyListings, + boolean enableMetrics, boolean shouldAssumeDatePartitioning) { + if (useFileListingFromMetadata) { + return new HoodieBackedTableMetadata(engineContext, datasetBasePath, spillableMapPath, useFileListingFromMetadata, + verifyListings, enableMetrics, shouldAssumeDatePartitioning); + } else { + return new FileSystemBackedTableMetadata(engineContext, new SerializableConfiguration(engineContext.getHadoopConf()), + datasetBasePath, shouldAssumeDatePartitioning); + } } /** diff --git a/hudi-common/src/test/java/org/apache/hudi/metadata/TestFileSystemBackedTableMetadata.java b/hudi-common/src/test/java/org/apache/hudi/metadata/TestFileSystemBackedTableMetadata.java new file mode 100644 index 000000000..e44054f3a --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/metadata/TestFileSystemBackedTableMetadata.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.metadata; + +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.HoodieTestTable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.IntStream; + +public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness { + + private static final String DEFAULT_PARTITION = ""; + private static final List DATE_PARTITIONS = Arrays.asList("2019/01/01", "2020/01/02", "2021/03/01"); + private static final List ONE_LEVEL_PARTITIONS = Arrays.asList("2019", "2020", "2021"); + private static final List MULTI_LEVEL_PARTITIONS = Arrays.asList("2019/01", "2020/01", "2021/01"); + private static HoodieTestTable hoodieTestTable; + + @BeforeEach + public void setUp() throws IOException { + initMetaClient(); + hoodieTestTable = HoodieTestTable.of(metaClient); + } + + @AfterEach + public void tearDown() throws IOException { + metaClient.getFs().delete(new Path(metaClient.getBasePath()), true); + } + + /** + * Test non partition hoodie table. 
+ * @throws Exception + */ + @Test + public void testNonPartitionedTable() throws Exception { + // Generate 10 files under basepath + hoodieTestTable.addCommit("100").withBaseFilesInPartition(DEFAULT_PARTITION, IntStream.range(0, 10).toArray()); + HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); + FileSystemBackedTableMetadata fileSystemBackedTableMetadata = + new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); + Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 0); + Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath)).length == 10); + } + + /** + * Test listing of partitions result for date based partitions. + * @throws Exception + */ + @Test + public void testDatePartitionedTable() throws Exception { + String instant = "100"; + hoodieTestTable = hoodieTestTable.addCommit(instant); + // Generate 10 files under each partition + DATE_PARTITIONS.stream().forEach(p -> { + try { + hoodieTestTable = hoodieTestTable.withBaseFilesInPartition(p, IntStream.range(0, 10).toArray()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); + FileSystemBackedTableMetadata fileSystemBackedTableMetadata = + new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, true); + Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 3); + Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + DATE_PARTITIONS.get(0))).length == 10); + } + + /** + * Test listing of partitions result for date based partitions with assumeDatePartitioning = false.
+ * @throws Exception + */ + @Test + public void testDatePartitionedTableWithAssumeDateIsFalse() throws Exception { + String instant = "100"; + hoodieTestTable = hoodieTestTable.addCommit(instant); + // Generate 10 files under each partition + DATE_PARTITIONS.stream().forEach(p -> { + try { + hoodieTestTable = hoodieTestTable.withBaseFilesInPartition(p, IntStream.range(0, 10).toArray()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); + FileSystemBackedTableMetadata fileSystemBackedTableMetadata = + new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); + Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 0); + } + + @Test + public void testOneLevelPartitionedTable() throws Exception { + String instant = "100"; + hoodieTestTable = hoodieTestTable.addCommit(instant); + // Generate 10 files under each partition + ONE_LEVEL_PARTITIONS.stream().forEach(p -> { + try { + hoodieTestTable = hoodieTestTable.withPartitionMetaFiles(p) + .withBaseFilesInPartition(p, IntStream.range(0, 10).toArray()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); + FileSystemBackedTableMetadata fileSystemBackedTableMetadata = + new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); + Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 3); + Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + ONE_LEVEL_PARTITIONS.get(0))).length == 10); + } + + @Test + public void testMultiLevelPartitionedTable() throws Exception { + String instant = "100"; + hoodieTestTable = 
hoodieTestTable.addCommit(instant); + // Generate 10 files under each partition + MULTI_LEVEL_PARTITIONS.stream().forEach(p -> { + try { + hoodieTestTable = hoodieTestTable.withPartitionMetaFiles(p) + .withBaseFilesInPartition(p, IntStream.range(0, 10).toArray()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); + FileSystemBackedTableMetadata fileSystemBackedTableMetadata = + new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); + Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 3); + Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + MULTI_LEVEL_PARTITIONS.get(0))).length == 10); + } + + @Test + public void testMultiLevelEmptyPartitionTable() throws Exception { + String instant = "100"; + hoodieTestTable = hoodieTestTable.addCommit(instant); + // Create only the partition meta file under each partition (no base files) + MULTI_LEVEL_PARTITIONS.stream().forEach(p -> { + try { + hoodieTestTable = hoodieTestTable.withPartitionMetaFiles(p); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); + FileSystemBackedTableMetadata fileSystemBackedTableMetadata = + new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); + Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 3); + Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + MULTI_LEVEL_PARTITIONS.get(0))).length == 0); + } + +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java index baedb1688..dd285ba42 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configurable; import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodiePartitionMetadata; @@ -172,8 +173,9 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial boolean useFileListingFromMetadata = getConf().getBoolean(METADATA_ENABLE_PROP, DEFAULT_METADATA_ENABLE_FOR_READERS); boolean verifyFileListing = getConf().getBoolean(METADATA_VALIDATE_PROP, DEFAULT_METADATA_VALIDATE); - HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(metaClient, - useFileListingFromMetadata, verifyFileListing); + HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(conf.get()); + HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, + metaClient, useFileListingFromMetadata, verifyFileListing); String partition = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), folder); List latestFiles = fsView.getLatestBaseFiles(partition).collect(Collectors.toList()); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java index 019b558dc..2ca7ab5a6 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java @@ -18,6 
+18,7 @@ package org.apache.hudi.hadoop.utils; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -427,8 +428,9 @@ public class HoodieInputFormatUtils { boolean useFileListingFromMetadata = job.getBoolean(METADATA_ENABLE_PROP, DEFAULT_METADATA_ENABLE_FOR_READERS); boolean verifyFileListing = job.getBoolean(METADATA_VALIDATE_PROP, DEFAULT_METADATA_VALIDATE); - HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(metaClient, - useFileListingFromMetadata, verifyFileListing); + HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job); + HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, + metaClient, useFileListingFromMetadata, verifyFileListing); List filteredBaseFiles = new ArrayList<>(); for (Path p : paths) { diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java index 1b272ae89..68d2d2297 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java @@ -18,6 +18,7 @@ package org.apache.hudi.hadoop.utils; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; @@ -81,9 +82,9 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { // for each partition path obtain the data & log file groupings, then map back to inputsplits HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath); if (!fsCache.containsKey(metaClient)) { - - 
HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(metaClient, - useFileListingFromMetadata, verifyFileListing); + HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(conf); + HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, + metaClient, useFileListingFromMetadata, verifyFileListing); fsCache.put(metaClient, fsView); } HoodieTableFileSystemView fsView = fsCache.get(metaClient); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java index a41da2c21..4592ae811 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java @@ -19,6 +19,7 @@ package org.apache.hudi.integ.testsuite.reader; import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; @@ -86,7 +87,8 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader { // Using FSUtils.getFS here instead of metaClient.getFS() since we dont want to count these listStatus // calls in metrics as they are not part of normal HUDI operation. 
FileSystem fs = FSUtils.getFs(metaClient.getBasePath(), metaClient.getHadoopConf()); - List partitionPaths = FSUtils.getAllPartitionPaths(fs, metaClient.getBasePath(), + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + List partitionPaths = FSUtils.getAllPartitionPaths(engineContext, fs, metaClient.getBasePath(), HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false); // Sort partition so we can pick last N partitions by default Collections.sort(partitionPaths); diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java index 521ff0551..fe77e1280 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java @@ -373,7 +373,7 @@ public class TestBootstrap extends HoodieClientTestBase { reloadInputFormats(); List records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat( jsc.hadoopConfiguration(), - FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, + FSUtils.getAllPartitionPaths(context, metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream() .map(f -> basePath + "/" + f).collect(Collectors.toList()), basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>()); @@ -392,7 +392,7 @@ public class TestBootstrap extends HoodieClientTestBase { seenKeys = new HashSet<>(); records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat( jsc.hadoopConfiguration(), - FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, + FSUtils.getAllPartitionPaths(context, 
metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream() .map(f -> basePath + "/" + f).collect(Collectors.toList()), basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>()); @@ -409,7 +409,7 @@ public class TestBootstrap extends HoodieClientTestBase { reloadInputFormats(); records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat( jsc.hadoopConfiguration(), - FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, + FSUtils.getAllPartitionPaths(context, metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream() .map(f -> basePath + "/" + f).collect(Collectors.toList()), basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, @@ -427,7 +427,7 @@ public class TestBootstrap extends HoodieClientTestBase { seenKeys = new HashSet<>(); records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat( jsc.hadoopConfiguration(), - FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, + FSUtils.getAllPartitionPaths(context, metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream() .map(f -> basePath + "/" + f).collect(Collectors.toList()), basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, true, @@ -443,7 +443,7 @@ public class TestBootstrap extends HoodieClientTestBase { reloadInputFormats(); records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat( jsc.hadoopConfiguration(), - FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, + FSUtils.getAllPartitionPaths(context, metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, 
HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream() .map(f -> basePath + "/" + f).collect(Collectors.toList()), basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, true, @@ -461,7 +461,7 @@ public class TestBootstrap extends HoodieClientTestBase { seenKeys = new HashSet<>(); records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat( jsc.hadoopConfiguration(), - FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, + FSUtils.getAllPartitionPaths(context, metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream() .map(f -> basePath + "/" + f).collect(Collectors.toList()), basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, true, diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index f315a2672..730b7d2a7 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -18,10 +18,7 @@ package org.apache.hudi.functional import java.sql.{Date, Timestamp} -import java.util.function.Supplier -import java.util.stream.Stream -import org.apache.hadoop.fs.Path import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.timeline.HoodieInstant diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java index 8c9184847..957477e13 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java 
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java @@ -20,6 +20,8 @@ package org.apache.hudi.sync.common; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; + +import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -127,7 +129,9 @@ public abstract class AbstractSyncHoodieClient { if (!lastCommitTimeSynced.isPresent()) { LOG.info("Last commit time synced is not known, listing all partitions in " + basePath + ",FS :" + fs); try { - return FSUtils.getAllPartitionPaths(fs, basePath, useFileListingFromMetadata, verifyMetadataFileListing, assumeDatePartitioning); + HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); + return FSUtils.getAllPartitionPaths(engineContext, fs, basePath, useFileListingFromMetadata, verifyMetadataFileListing, + assumeDatePartitioning); } catch (IOException e) { throw new HoodieIOException("Failed to list all partitions in " + basePath, e); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java index fe7d02494..0c6b0b339 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java @@ -86,6 +86,7 @@ public class HoodieSnapshotCopier implements Serializable { final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), baseDir); final BaseFileOnlyView fsView = new HoodieTableFileSystemView(tableMetadata, tableMetadata.getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedInstants()); + HoodieEngineContext context = new HoodieSparkEngineContext(jsc); // Get the latest commit Option 
latestCommit = tableMetadata.getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedInstants().lastInstant(); @@ -97,7 +98,7 @@ public class HoodieSnapshotCopier implements Serializable { LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.", latestCommitTimestamp)); - List partitions = FSUtils.getAllPartitionPaths(fs, baseDir, useFileListingFromMetadata, verifyMetadataFileListing, shouldAssumeDatePartitioning); + List partitions = FSUtils.getAllPartitionPaths(context, fs, baseDir, useFileListingFromMetadata, verifyMetadataFileListing, shouldAssumeDatePartitioning); if (partitions.size() > 0) { LOG.info(String.format("The job needs to copy %d partitions.", partitions.size())); @@ -108,7 +109,6 @@ public class HoodieSnapshotCopier implements Serializable { fs.delete(new Path(outputDir), true); } - HoodieEngineContext context = new HoodieSparkEngineContext(jsc); context.setJobStatus(this.getClass().getSimpleName(), "Creating a snapshot"); List> filesToCopy = context.flatMap(partitions, partition -> { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java index c306d0e6b..db80ce96a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java @@ -117,6 +117,7 @@ public class HoodieSnapshotExporter { public void export(JavaSparkContext jsc, Config cfg) throws IOException { FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration()); + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); if (outputPathExists(fs, cfg)) { throw new HoodieSnapshotExporterException("The target output path already exists."); @@ -128,7 +129,7 @@ public class HoodieSnapshotExporter { LOG.info(String.format("Starting to snapshot latest version files 
which are also no-late-than %s.", latestCommitTimestamp)); - final List partitions = getPartitions(fs, cfg); + final List partitions = getPartitions(engineContext, fs, cfg); if (partitions.isEmpty()) { throw new HoodieSnapshotExporterException("The source dataset has 0 partition to snapshot."); } @@ -153,8 +154,8 @@ public class HoodieSnapshotExporter { return latestCommit.isPresent() ? Option.of(latestCommit.get().getTimestamp()) : Option.empty(); } - private List getPartitions(FileSystem fs, Config cfg) throws IOException { - return FSUtils.getAllPartitionPaths(fs, cfg.sourceBasePath, true, false, false); + private List getPartitions(HoodieEngineContext engineContext, FileSystem fs, Config cfg) throws IOException { + return FSUtils.getAllPartitionPaths(engineContext, fs, cfg.sourceBasePath, true, false, false); } private void createSuccessTag(FileSystem fs, Config cfg) throws IOException { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java index 7a1034950..b4813a0ba 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java @@ -85,13 +85,14 @@ public class TimelineServerPerf implements Serializable { } public void run() throws IOException { - - List allPartitionPaths = FSUtils.getAllPartitionPaths(timelineServer.getFs(), cfg.basePath, cfg.useFileListingFromMetadata, - cfg.verifyMetadataFileListing, true); + JavaSparkContext jsc = UtilHelpers.buildSparkContext("hudi-view-perf-" + cfg.basePath, cfg.sparkMaster); + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + List allPartitionPaths = FSUtils.getAllPartitionPaths(engineContext, timelineServer.getFs(), cfg.basePath, + cfg.useFileListingFromMetadata, cfg.verifyMetadataFileListing, true); Collections.shuffle(allPartitionPaths); List 
selected = allPartitionPaths.stream().filter(p -> !p.contains("error")).limit(cfg.maxPartitions) .collect(Collectors.toList()); - JavaSparkContext jsc = UtilHelpers.buildSparkContext("hudi-view-perf-" + cfg.basePath, cfg.sparkMaster); + if (!useExternalTimelineServer) { this.timelineServer.startService(); setHostAddrFromSparkConf(jsc.getConf());