1
0

[HUDI-1308] Harden RFC-15 Implementation based on production testing (#2441)

Addresses leaks, perf degradation observed during testing. These were regressions from the original rfc-15 PoC implementation.

* Pass a single instance of HoodieTableMetadata everywhere
* Fix tests and add config for enabling metrics
 - Removed special casing of assumeDatePartitioning inside FSUtils#getAllPartitionPaths()
 - Consequently, IOException is never thrown and many files had to be adjusted
- More diligent handling of open file handles in metadata table
 - Added config for controlling reuse of connections
 - Added config for turning off fallback to listing, so we can see tests fail
 - Changed all InputFormat ("ipf") listing code to cache/amortize the open/close for better performance
 - Timelineserver also reuses connections, for better performance
 - Without timelineserver, when metadata table is opened from executors, reuse is not allowed
 - HoodieMetadataConfig passed into HoodieTableMetadata#create as argument.
 - Fix TestHoodieBackedTableMetadata#testSync
This commit is contained in:
vinoth chandar
2021-01-19 21:20:28 -08:00
committed by GitHub
parent e23967b9e9
commit 5ca0625b27
55 changed files with 767 additions and 570 deletions

View File

@@ -49,8 +49,8 @@ public class EmbeddedTimelineServerHelper {
// Run Embedded Timeline Server
LOG.info("Starting Timeline service !!");
Option<String> hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);
timelineServer = Option.of(new EmbeddedTimelineService(context, hostAddr.orElse(null),
config.getEmbeddedTimelineServerPort(), config.getClientSpecifiedViewStorageConfig()));
timelineServer = Option.of(new EmbeddedTimelineService(context, hostAddr.orElse(null), config.getEmbeddedTimelineServerPort(),
config.getMetadataConfig(), config.getClientSpecifiedViewStorageConfig(), config.getBasePath()));
timelineServer.get().startServer();
updateWriteConfigWithTimelineServer(timelineServer.get(), config);
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.client.embedded;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.table.view.FileSystemViewManager;
@@ -41,14 +42,22 @@ public class EmbeddedTimelineService {
private int serverPort;
private int preferredPort;
private String hostAddr;
private HoodieEngineContext context;
private final SerializableConfiguration hadoopConf;
private final FileSystemViewStorageConfig config;
private final HoodieMetadataConfig metadataConfig;
private final String basePath;
private transient FileSystemViewManager viewManager;
private transient TimelineService server;
public EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, int embeddedTimelineServerPort, FileSystemViewStorageConfig config) {
public EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, int embeddedTimelineServerPort,
HoodieMetadataConfig metadataConfig, FileSystemViewStorageConfig config, String basePath) {
setHostAddr(embeddedTimelineServiceHostAddr);
this.context = context;
this.config = config;
this.basePath = basePath;
this.metadataConfig = metadataConfig;
this.hadoopConf = context.getHadoopConf();
this.viewManager = createViewManager();
this.preferredPort = embeddedTimelineServerPort;
@@ -64,7 +73,7 @@ public class EmbeddedTimelineService {
// Reset to default if set to Remote
builder.withStorageType(FileSystemViewStorageType.MEMORY);
}
return FileSystemViewManager.createViewManager(hadoopConf, builder.build());
return FileSystemViewManager.createViewManager(context, metadataConfig, builder.build(), basePath);
}
public void startServer() throws IOException {

View File

@@ -82,7 +82,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String BULKINSERT_INPUT_DATA_SCHEMA_DDL = "hoodie.bulkinsert.schema.ddl";
public static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
public static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
public static final String FILE_LISTING_PARALLELISM = "hoodie.file.listing.parallelism";
public static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
public static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
public static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
@@ -97,8 +96,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
public static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit";
public static final String DEFAULT_HOODIE_AUTO_COMMIT = "true";
public static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning";
public static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
public static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
public static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
@@ -157,6 +155,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig;
private FileSystemViewStorageConfig viewStorageConfig;
private HoodiePayloadConfig hoodiePayloadConfig;
private HoodieMetadataConfig metadataConfig;
private EngineType engineType;
@@ -176,6 +175,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
this.viewStorageConfig = clientSpecifiedViewStorageConfig;
this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build();
this.metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(props).build();
}
public static HoodieWriteConfig.Builder newBuilder() {
@@ -222,7 +222,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
}
public Boolean shouldAssumeDatePartitioning() {
return Boolean.parseBoolean(props.getProperty(HOODIE_ASSUME_DATE_PARTITIONING_PROP));
return metadataConfig.shouldAssumeDatePartitioning();
}
public boolean shouldUseExternalSchemaTransformation() {
@@ -258,7 +258,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
}
public int getFileListingParallelism() {
return Math.max(Integer.parseInt(props.getProperty(FILE_LISTING_PARALLELISM)), 1);
return metadataConfig.getFileListingParallelism();
}
public boolean shouldRollbackUsingMarkers() {
@@ -837,6 +837,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return hoodiePayloadConfig;
}
public HoodieMetadataConfig getMetadataConfig() {
return metadataConfig;
}
/**
* Commit call back configs.
*/
@@ -888,11 +892,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
* File listing metadata configs.
*/
public boolean useFileListingMetadata() {
return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_ENABLE_PROP));
return metadataConfig.useFileListingMetadata();
}
public boolean getFileListingMetadataVerify() {
return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_VALIDATE_PROP));
return metadataConfig.validateFileListingMetadata();
}
public int getMetadataInsertParallelism() {
@@ -1007,11 +1011,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
public Builder withFileListingParallelism(int parallelism) {
props.setProperty(FILE_LISTING_PARALLELISM, String.valueOf(parallelism));
return this;
}
public Builder withUserDefinedBulkInsertPartitionerClass(String className) {
props.setProperty(BULKINSERT_USER_DEFINED_PARTITIONER_CLASS, className);
return this;
@@ -1118,11 +1117,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
public Builder withAssumeDatePartitioning(boolean assumeDatePartitioning) {
props.setProperty(HOODIE_ASSUME_DATE_PARTITIONING_PROP, String.valueOf(assumeDatePartitioning));
return this;
}
public Builder withWriteStatusClass(Class<? extends WriteStatus> writeStatusClass) {
props.setProperty(HOODIE_WRITE_STATUS_CLASS_PROP, writeStatusClass.getName());
return this;
@@ -1198,8 +1192,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(FILE_LISTING_PARALLELISM), FILE_LISTING_PARALLELISM,
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM,
DEFAULT_ROLLBACK_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(KEYGENERATOR_CLASS_PROP),
@@ -1220,8 +1213,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
DEFAULT_WRITE_STATUS_STORAGE_LEVEL);
setDefaultOnCondition(props, !props.containsKey(HOODIE_AUTO_COMMIT_PROP), HOODIE_AUTO_COMMIT_PROP,
DEFAULT_HOODIE_AUTO_COMMIT);
setDefaultOnCondition(props, !props.containsKey(HOODIE_ASSUME_DATE_PARTITIONING_PROP),
HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING);
setDefaultOnCondition(props, !props.containsKey(HOODIE_WRITE_STATUS_CLASS_PROP), HOODIE_WRITE_STATUS_CLASS_PROP,
DEFAULT_HOODIE_WRITE_STATUS_CLASS);
setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM), FINALIZE_WRITE_PARALLELISM,

View File

@@ -46,6 +46,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMetricsConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
@@ -108,9 +109,6 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf, datasetWriteConfig.getBasePath());
initialize(engineContext, datasetMetaClient);
if (enabled) {
// (re) init the metadata for reading.
initTableMetadata();
// This is always called even in case the table was created for the first time. This is because
// initFromFilesystem() does file listing and hence may take a long time during which some new updates
// may have occurred on the table. Hence, calling this always ensures that the metadata is brought in sync
@@ -148,7 +146,6 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
.withAutoCommit(true)
.withAvroSchemaValidate(true)
.withEmbeddedTimelineServerEnabled(false)
.withAssumeDatePartitioning(false)
.withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
.withSchema(HoodieMetadataRecord.getClassSchema().toString())
.forTable(tableName)
@@ -220,12 +217,17 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
*/
protected abstract void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient);
private void initTableMetadata() {
this.metadata = new HoodieBackedTableMetadata(engineContext, datasetWriteConfig.getBasePath(),
datasetWriteConfig.getSpillableMapBasePath(), datasetWriteConfig.useFileListingMetadata(),
datasetWriteConfig.getFileListingMetadataVerify(), false,
datasetWriteConfig.shouldAssumeDatePartitioning());
this.metaClient = metadata.getMetaClient();
protected void initTableMetadata() {
try {
if (this.metadata != null) {
this.metadata.close();
}
this.metadata = new HoodieBackedTableMetadata(engineContext, datasetWriteConfig.getMetadataConfig(),
datasetWriteConfig.getBasePath(), datasetWriteConfig.getSpillableMapBasePath());
this.metaClient = metadata.getMetaClient();
} catch (Exception e) {
throw new HoodieException("Error initializing metadata table for reads", e);
}
}
protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) throws IOException {
@@ -355,9 +357,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
*/
private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
ValidationUtils.checkState(enabled, "Metadata table cannot be synced as it is not enabled");
// (re) init the metadata for reading.
initTableMetadata();
try {
List<HoodieInstant> instantsToSync = metadata.findInstantsToSync(datasetMetaClient);
List<HoodieInstant> instantsToSync = metadata.findInstantsToSync();
if (instantsToSync.isEmpty()) {
return;
}
@@ -373,7 +376,6 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
commit(records.get(), MetadataPartitionType.FILES.partitionPath(), instant.getTimestamp());
}
}
// re-init the table metadata, for any future writes.
initTableMetadata();
} catch (IOException ioe) {
throw new HoodieIOException("Unable to sync instants from data to metadata table.", ioe);
@@ -450,6 +452,13 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
}
}
@Override
public void close() throws Exception {
if (metadata != null) {
metadata.close();
}
}
/**
* Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit.
*

View File

@@ -29,7 +29,7 @@ import java.io.Serializable;
/**
* Interface that supports updating metadata for a given table, as actions complete.
*/
public interface HoodieTableMetadataWriter extends Serializable {
public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {
void update(HoodieCommitMetadata commitMetadata, String instantTime);

View File

@@ -29,6 +29,7 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
@@ -49,8 +50,8 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
@@ -63,7 +64,6 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
@@ -94,23 +94,28 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
protected final HoodieWriteConfig config;
protected final HoodieTableMetaClient metaClient;
protected final transient HoodieEngineContext context;
protected final HoodieIndex<T, I, K, O> index;
private SerializableConfiguration hadoopConfiguration;
private transient FileSystemViewManager viewManager;
private HoodieTableMetadata metadata;
protected final TaskContextSupplier taskContextSupplier;
private final HoodieTableMetadata metadata;
private transient FileSystemViewManager viewManager;
protected final transient HoodieEngineContext context;
protected HoodieTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
this.config = config;
this.hadoopConfiguration = context.getHadoopConf();
this.viewManager = FileSystemViewManager.createViewManager(hadoopConfiguration,
config.getViewStorageConfig());
this.context = context;
// disable reuse of resources, given there is no close() called on the executors ultimately
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(config.getMetadataConfig().getProps())
.enableReuse(false).build();
this.metadata = HoodieTableMetadata.create(context, metadataConfig, config.getBasePath(),
FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR);
this.viewManager = FileSystemViewManager.createViewManager(context, config.getMetadataConfig(), config.getViewStorageConfig(), () -> metadata);
this.metaClient = metaClient;
this.index = getIndex(config, context);
this.context = context;
this.taskContextSupplier = context.getTaskContextSupplier();
}
@@ -118,7 +123,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
private synchronized FileSystemViewManager getViewManager() {
if (null == viewManager) {
viewManager = FileSystemViewManager.createViewManager(hadoopConfiguration, config.getViewStorageConfig());
viewManager = FileSystemViewManager.createViewManager(getContext(), config.getMetadataConfig(), config.getViewStorageConfig(), () -> metadata);
}
return viewManager;
}
@@ -249,41 +254,28 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
* Get the view of the file system for this table.
*/
public TableFileSystemView getFileSystemView() {
if (config.useFileListingMetadata()) {
return getFileSystemViewInternal(getCompletedCommitsTimeline());
} else {
return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline());
}
return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline());
}
/**
* Get the base file only view of the file system for this table.
*/
public BaseFileOnlyView getBaseFileOnlyView() {
return getFileSystemViewInternal(metaClient.getActiveTimeline().filterCompletedAndCompactionInstants());
return getViewManager().getFileSystemView(metaClient);
}
/**
* Get the full view of the file system for this table.
*/
public SliceView getSliceView() {
return getFileSystemViewInternal(metaClient.getActiveTimeline().filterCompletedAndCompactionInstants());
return getViewManager().getFileSystemView(metaClient);
}
/**
* Get complete view of the file system for this table with ability to force sync.
*/
public SyncableFileSystemView getHoodieView() {
return getFileSystemViewInternal(metaClient.getActiveTimeline().filterCompletedAndCompactionInstants());
}
private SyncableFileSystemView getFileSystemViewInternal(HoodieTimeline timeline) {
if (config.useFileListingMetadata()) {
FileSystemViewStorageConfig viewConfig = config.getViewStorageConfig();
return new HoodieMetadataFileSystemView(metaClient, this.metadata(), timeline, viewConfig.isIncrementalTimelineSyncEnabled());
} else {
return getViewManager().getFileSystemView(metaClient);
}
return getViewManager().getFileSystemView(metaClient);
}
/**
@@ -661,19 +653,9 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
return getBaseFileFormat() == HoodieFileFormat.HFILE;
}
public HoodieTableMetadata metadata() {
if (metadata == null) {
HoodieEngineContext engineContext = context;
if (engineContext == null) {
// This is to handle scenarios where this is called at the executor tasks which do not have access
// to engine context, and it ends up being null (as its not serializable and marked transient here).
engineContext = new HoodieLocalEngineContext(hadoopConfiguration.get());
}
metadata = HoodieTableMetadata.create(engineContext, config.getBasePath(), config.getSpillableMapBasePath(),
config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.isMetricsOn(),
config.shouldAssumeDatePartitioning());
}
return metadata;
public HoodieEngineContext getContext() {
// This is to handle scenarios where this is called at the executor tasks which do not have access
// to engine context, and it ends up being null (as its not serializable and marked transient here).
return context == null ? new HoodieLocalEngineContext(hadoopConfiguration.get()) : context;
}
}

View File

@@ -46,6 +46,7 @@ import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CollectionUtils;
@@ -56,6 +57,8 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -198,14 +201,20 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
// If metadata table is enabled, do not archive instants which are more recent that the latest synced
// instant on the metadata table. This is required for metadata table sync.
if (config.useFileListingMetadata()) {
Option<String> lastSyncedInstantTime = table.metadata().getSyncedInstantTime();
if (lastSyncedInstantTime.isPresent()) {
LOG.info("Limiting archiving of instants to last synced instant on metadata table at " + lastSyncedInstantTime.get());
instants = instants.filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(), HoodieTimeline.LESSER_THAN,
lastSyncedInstantTime.get()));
} else {
LOG.info("Not archiving as there is no instants yet on the metadata table");
instants = Stream.empty();
try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(table.getContext(), config.getMetadataConfig(),
config.getBasePath(), FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR)) {
Option<String> lastSyncedInstantTime = tableMetadata.getSyncedInstantTime();
if (lastSyncedInstantTime.isPresent()) {
LOG.info("Limiting archiving of instants to last synced instant on metadata table at " + lastSyncedInstantTime.get());
instants = instants.filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(), HoodieTimeline.LESSER_THAN,
lastSyncedInstantTime.get()));
} else {
LOG.info("Not archiving as there is no instants yet on the metadata table");
instants = Stream.empty();
}
} catch (Exception e) {
throw new HoodieException("Error limiting instant archival based on metadata table", e);
}
}

View File

@@ -67,7 +67,7 @@ public abstract class BaseCleanActionExecutor<T extends HoodieRecordPayload, I,
*/
HoodieCleanerPlan requestClean(HoodieEngineContext context) {
try {
CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(table, config);
CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config);
Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);

View File

@@ -20,6 +20,8 @@ package org.apache.hudi.table.action.clean;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
@@ -76,8 +78,10 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
private final Map<HoodieFileGroupId, CompactionOperation> fgIdToPendingCompactionOperations;
private HoodieTable<T, I, K, O> hoodieTable;
private HoodieWriteConfig config;
private transient HoodieEngineContext context;
public CleanPlanner(HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config) {
public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config) {
this.context = context;
this.hoodieTable = hoodieTable;
this.fileSystemView = hoodieTable.getHoodieView();
this.commitTimeline = hoodieTable.getCompletedCommitsTimeline();
@@ -161,7 +165,7 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
*/
private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata,
Option<HoodieInstant> newInstantToRetain) {
LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
LOG.info("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
+ "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+ ". New Instant to retain : " + newInstantToRetain);
return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(
@@ -190,9 +194,9 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
* @return all partitions paths for the dataset.
* @throws IOException
*/
private List<String> getPartitionPathsForFullCleaning() throws IOException {
private List<String> getPartitionPathsForFullCleaning() {
// Go to brute force mode of scanning all partitions
return hoodieTable.metadata().getAllPartitionPaths();
return FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), config.getBasePath());
}
/**

View File

@@ -28,12 +28,10 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -63,48 +61,42 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
@Override
public Option<HoodieClusteringPlan> generateClusteringPlan() {
try {
HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
LOG.info("Scheduling clustering for " + metaClient.getBasePath());
HoodieWriteConfig config = getWriteConfig();
List<String> partitionPaths = FSUtils.getAllPartitionPaths(getEngineContext(), metaClient.getFs(), metaClient.getBasePath(),
config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
config.shouldAssumeDatePartitioning());
HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
LOG.info("Scheduling clustering for " + metaClient.getBasePath());
HoodieWriteConfig config = getWriteConfig();
List<String> partitionPaths = FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath());
// filter the partition paths if needed to reduce list status
partitionPaths = filterPartitionPaths(partitionPaths);
// filter the partition paths if needed to reduce list status
partitionPaths = filterPartitionPaths(partitionPaths);
if (partitionPaths.isEmpty()) {
// In case no partitions could be picked, return no clustering plan
return Option.empty();
}
List<HoodieClusteringGroup> clusteringGroups = getEngineContext().flatMap(partitionPaths,
partitionPath -> {
List<FileSlice> fileSlicesEligible = getFileSlicesEligibleForClustering(partitionPath).collect(Collectors.toList());
return buildClusteringGroupsForPartition(partitionPath, fileSlicesEligible).limit(getWriteConfig().getClusteringMaxNumGroups());
},
partitionPaths.size())
.stream().limit(getWriteConfig().getClusteringMaxNumGroups()).collect(Collectors.toList());
if (clusteringGroups.isEmpty()) {
LOG.info("No data available to cluster");
return Option.empty();
}
HoodieClusteringStrategy strategy = HoodieClusteringStrategy.newBuilder()
.setStrategyClassName(getWriteConfig().getClusteringExecutionStrategyClass())
.setStrategyParams(getStrategyParams())
.build();
return Option.of(HoodieClusteringPlan.newBuilder()
.setStrategy(strategy)
.setInputGroups(clusteringGroups)
.setExtraMetadata(getExtraMetadata())
.setVersion(getPlanVersion())
.build());
} catch (IOException e) {
throw new HoodieIOException("Unable to create clustering plan", e);
if (partitionPaths.isEmpty()) {
// In case no partitions could be picked, return no clustering plan
return Option.empty();
}
List<HoodieClusteringGroup> clusteringGroups = getEngineContext().flatMap(partitionPaths,
partitionPath -> {
List<FileSlice> fileSlicesEligible = getFileSlicesEligibleForClustering(partitionPath).collect(Collectors.toList());
return buildClusteringGroupsForPartition(partitionPath, fileSlicesEligible).limit(getWriteConfig().getClusteringMaxNumGroups());
},
partitionPaths.size())
.stream().limit(getWriteConfig().getClusteringMaxNumGroups()).collect(Collectors.toList());
if (clusteringGroups.isEmpty()) {
LOG.info("No data available to cluster");
return Option.empty();
}
HoodieClusteringStrategy strategy = HoodieClusteringStrategy.newBuilder()
.setStrategyClassName(getWriteConfig().getClusteringExecutionStrategyClass())
.setStrategyParams(getStrategyParams())
.build();
return Option.of(HoodieClusteringPlan.newBuilder()
.setStrategy(strategy)
.setInputGroups(clusteringGroups)
.setExtraMetadata(getExtraMetadata())
.setVersion(getPlanVersion())
.build());
}
}

View File

@@ -94,15 +94,10 @@ public class RollbackUtils {
* @return {@link List} of {@link ListingBasedRollbackRequest}s thus collected.
*/
public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(HoodieEngineContext engineContext,
FileSystem fs, String basePath, HoodieWriteConfig config) {
try {
return FSUtils.getAllPartitionPaths(engineContext, fs, basePath, config.useFileListingMetadata(),
config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning()).stream()
.map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction)
.collect(Collectors.toList());
} catch (IOException e) {
throw new HoodieIOException("Error generating rollback requests", e);
}
String basePath, HoodieWriteConfig config) {
return FSUtils.getAllPartitionPaths(engineContext, config.getMetadataConfig(), basePath).stream()
.map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction)
.collect(Collectors.toList());
}
/**
@@ -116,8 +111,7 @@ public class RollbackUtils {
public static List<ListingBasedRollbackRequest> generateRollbackRequestsUsingFileListingMOR(HoodieInstant instantToRollback, HoodieTable table, HoodieEngineContext context) throws IOException {
String commit = instantToRollback.getTimestamp();
HoodieWriteConfig config = table.getConfig();
List<String> partitions = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
List<String> partitions = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
context.setJobStatus(RollbackUtils.class.getSimpleName(), "Generate all rollback requests");
return context.flatMap(partitions, partitionPath -> {

View File

@@ -90,10 +90,7 @@ public class SavepointActionExecutor<T extends HoodieRecordPayload, I, K, O> ext
"Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime);
List<String> partitions = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getFs(),
table.getMetaClient().getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
config.shouldAssumeDatePartitioning()
);
List<String> partitions = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
Map<String, List<String>> latestFilesMap = context.mapToPair(partitions, partitionPath -> {
// Scan all partitions files with this commit time
LOG.info("Collecting latest files in partition path " + partitionPath);