
[HUDI-842] Implementation of HUDI RFC-15.

- Introduced an internal metadata table that stores file listings.
 - The metadata table is kept up to date with the instants on the dataset timeline.
 - Fixed handling of CleanerPlan.
 - [HUDI-842] Reduce parallelism to speed up the test.
 - [HUDI-842] Implementation of CLI commands for metadata operations and lookups.
 - [HUDI-842] Extend rollback metadata to include the files which have been appended to.
 - [HUDI-842] Support for rollbacks in MOR Table.
 - MarkerBasedRollbackStrategy needs to correctly provide the list of files for which rollback blocks were appended.
 - [HUDI-842] Added unit test for rollback of partial commits (inflight but not completed yet).
 - [HUDI-842] Handled the error case where metadata update succeeds but dataset commit fails.
 - [HUDI-842] Schema evolution strategy for the Metadata Table. Each type of metadata saved (FilesystemMetadata, ColumnIndexMetadata, etc.) will be a separate field with default null. The type of the record will identify the valid field. This way, we can grow the schema when a new type of information is saved, while still keeping it backward compatible.
 - [HUDI-842] Fix the non-partitioned case and speed up initial creation of the metadata table. Choose only one partition for the jsc as the number of records is low (hundreds to thousands). There is more overhead in creating a large number of partitions for the JavaRDD, and it slows down operations like WorkloadProfile.
For the non-partitioned case, use "." as the name of the partition to prevent empty keys in HFile.
 - [HUDI-842] Reworked metrics publishing.
 - Code has been split into reader and writer sides. HoodieMetadata code is accessed via HoodieTable.metadata(), which returns the metadata instance for the table.
The code is serializable so that executors can use the functionality.
 - [RFC-15] Add metrics to track the time for each file system call.
 - [RFC-15] Added a distributed metrics registry for Spark which can be used to collect metrics from executors. This helps create a stats dashboard which shows the metadata table improvements in real time for production tables.
 - [HUDI-1321] Created HoodieMetadataConfig to specify configuration for the metadata table. This is safer than exposing full-fledged write properties (like HoodieWriteConfig) for the metadata table, which would make it burdensome to tune. With a limited set of configurations, we can control the performance of the metadata table closely (see the usage sketch after this list).
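
A minimal usage sketch (illustrative only; basePath is an assumed variable) of enabling the metadata table and plugging it into the write config via the builder methods added in this change:

    // Illustrative only: enable the internal metadata table for a writer.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)                                // base path of the dataset (assumed variable)
        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
            .enable(true)                                  // hoodie.metadata.enable
            .validate(false)                               // hoodie.metadata.validate
            .withMaxNumDeltaCommitsBeforeCompaction(24)    // hoodie.metadata.compact.max.delta.commits
            .build())
        .build();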

[HUDI-1319][RFC-15] Adding interfaces for HoodieMetadata, HoodieMetadataWriter (apache#2266)
 - moved MetadataReader to HoodieBackedTableMetadata, under the HoodieTableMetadata interface
 - moved MetadataWriter to HoodieBackedTableMetadataWriter, under the HoodieTableMetadataWriter
 - Pulled all the metrics into HoodieMetadataMetrics
 - Writer now wraps the metadata, instead of extending it
 - New enum for MetadataPartitionType
 - Streamlined code flow inside HoodieBackedTableMetadataWriter w.r.t initializing metadata state
 - [HUDI-1319] Make async operations work with metadata table (apache#2332)
 - Changes the syncing model to only move over completed instants on data timeline
 - Syncing happens postCommit and on writeClient initialization
 - Latest delta commit on the metadata table is sufficient as the watermark for data timeline archival
 - Cleaning/Compaction use a suffix appended to the last instant written to the metadata table, so that we keep the 1-1 mapping between data and metadata timelines (see the sketch after this list).
 - Got rid of a lot of the complexity around checking for valid commits during open of base/log files
 - Tests now use local FS, to simulate more failure scenarios
 - Some failure scenarios exposed HUDI-1434, which is needed for MOR to work correctly
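
An illustrative sketch of the suffixing convention; the concrete suffix values are internal to the writer and the ones below are placeholders only:

    // Placeholder suffixes for illustration; the writer defines its own constants.
    String lastSyncedDataInstant = "20201230182955";                   // last data-table instant synced
    String metadataCleanInstant = lastSyncedDataInstant + "001";       // hypothetical clean suffix
    String metadataCompactionInstant = lastSyncedDataInstant + "002";  // hypothetical compaction suffix
    // Since each metadata-table instant embeds the data-table instant it derives from,
    // the two timelines stay in 1-1 correspondence.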

Co-authored-by: Vinoth Chandar <vinoth@apache.org>

Author: Prashant Wason
Date: 2020-12-30 18:29:55 -08:00
Committed by: Vinoth Chandar
Parent: c3e9243ea1
Commit: 298808baaf
45 changed files with 4060 additions and 242 deletions


@@ -71,6 +71,7 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
this.timelineServer = timelineServer;
shouldStopTimelineServer = !timelineServer.isPresent();
startEmbeddedServerView();
initWrapperFSMetrics();
}
/**
@@ -118,6 +119,10 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
return config;
}
protected void initWrapperFSMetrics() {
// no-op.
}
protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
return new HoodieTableMetaClient(hadoopConf, config.getBasePath(), loadActiveTimelineOnLoad,
config.getConsistencyGuardConfig(),


@@ -134,6 +134,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
this.metrics = new HoodieMetrics(config, config.getTableName());
this.rollbackPending = rollbackPending;
this.index = createIndex(writeConfig);
syncTableMetadata();
}
protected abstract HoodieIndex<T, I, K, O> createIndex(HoodieWriteConfig writeConfig);
@@ -220,6 +221,10 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
}
}
protected void syncTableMetadata() {
// no-op
}
/**
* Filter out HoodieRecords that already exists in the output folder. This is useful in deduplication.
*
@@ -407,7 +412,9 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
// We cannot have unbounded commit files. Archive commits if we have to archive
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
archiveLog.archiveIfRequired(context);
autoCleanOnCommit(instantTime);
autoCleanOnCommit();
syncTableMetadata();
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
@@ -434,9 +441,8 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
/**
* Handle auto clean during commit.
*
* @param instantTime
*/
protected void autoCleanOnCommit(String instantTime) {
protected void autoCleanOnCommit() {
if (config.isAutoClean()) {
// Call clean to cleanup if there is anything to cleanup after the commit,
if (config.isAsyncClean()) {
@@ -444,8 +450,9 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
AsyncCleanerService.waitForCompletion(asyncCleanerService);
LOG.info("Cleaner has finished");
} else {
// Do not reuse instantTime for clean as metadata table requires all changes to have unique instant timestamps.
LOG.info("Auto cleaning is enabled. Running cleaner now");
clean(instantTime);
clean();
}
}
}
@@ -599,8 +606,14 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
* Provides a new commit time for a write operation (insert/update/delete).
*/
public String startCommit() {
// NOTE : Need to ensure that rollback is done before a new commit is started
if (rollbackPending) {
// Only rollback pending commit/delta-commits. Do not touch compaction commits
rollbackPendingCommits();
}
String instantTime = HoodieActiveTimeline.createNewInstantTime();
startCommitWithTime(instantTime);
HoodieTableMetaClient metaClient = createMetaClient(true);
startCommit(instantTime, metaClient.getCommitActionType(), metaClient);
return instantTime;
}


@@ -19,6 +19,7 @@
package org.apache.hudi.client;
import org.apache.hudi.async.HoodieAsyncService;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
@@ -52,11 +53,11 @@ class AsyncCleanerService extends HoodieAsyncService {
}), executor);
}
public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient,
String instantTime) {
public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient) {
AsyncCleanerService asyncCleanerService = null;
if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
LOG.info("Auto cleaning is enabled. Running cleaner async to write operation");
String instantTime = HoodieActiveTimeline.createNewInstantTime();
LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime);
asyncCleanerService = new AsyncCleanerService(writeClient, instantTime);
asyncCleanerService.start(null);
} else {


@@ -481,6 +481,11 @@ public class CompactionAdminClient extends AbstractHoodieClient {
throw new HoodieException("FileGroupId " + fgId + " not in pending compaction");
}
@Override
protected void initWrapperFSMetrics() {
// no-op
}
/**
* Holds Operation result for Renaming.
*/


@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.config;
import org.apache.hudi.common.config.DefaultHoodieConfig;
import javax.annotation.concurrent.Immutable;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
/**
* Configurations used by the HUDI Metadata Table.
*/
@Immutable
public class HoodieMetadataConfig extends DefaultHoodieConfig {
public static final String METADATA_PREFIX = "hoodie.metadata";
// Enable the internal Metadata Table which saves file listings
public static final String METADATA_ENABLE_PROP = METADATA_PREFIX + ".enable";
public static final boolean DEFAULT_METADATA_ENABLE = false;
// Validate contents of Metadata Table on each access against the actual filesystem
public static final String METADATA_VALIDATE_PROP = METADATA_PREFIX + ".validate";
public static final boolean DEFAULT_METADATA_VALIDATE = false;
// Parallelism for inserts
public static final String METADATA_INSERT_PARALLELISM_PROP = METADATA_PREFIX + ".insert.parallelism";
public static final int DEFAULT_METADATA_INSERT_PARALLELISM = 1;
// Async clean
public static final String METADATA_ASYNC_CLEAN_PROP = METADATA_PREFIX + ".clean.async";
public static final boolean DEFAULT_METADATA_ASYNC_CLEAN = false;
// Maximum delta commits before compaction occurs
public static final String METADATA_COMPACT_NUM_DELTA_COMMITS_PROP = METADATA_PREFIX + ".compact.max.delta.commits";
public static final int DEFAULT_METADATA_COMPACT_NUM_DELTA_COMMITS = 24;
// Archival settings
public static final String MIN_COMMITS_TO_KEEP_PROP = METADATA_PREFIX + ".keep.min.commits";
public static final int DEFAULT_MIN_COMMITS_TO_KEEP = 20;
public static final String MAX_COMMITS_TO_KEEP_PROP = METADATA_PREFIX + ".keep.max.commits";
public static final int DEFAULT_MAX_COMMITS_TO_KEEP = 30;
// Cleaner commits retained
public static final String CLEANER_COMMITS_RETAINED_PROP = METADATA_PREFIX + ".cleaner.commits.retained";
public static final int DEFAULT_CLEANER_COMMITS_RETAINED = 3;
private HoodieMetadataConfig(Properties props) {
super(props);
}
public static HoodieMetadataConfig.Builder newBuilder() {
return new Builder();
}
public static class Builder {
private final Properties props = new Properties();
public Builder fromFile(File propertiesFile) throws IOException {
try (FileReader reader = new FileReader(propertiesFile)) {
this.props.load(reader);
return this;
}
}
public Builder fromProperties(Properties props) {
this.props.putAll(props);
return this;
}
public Builder enable(boolean enable) {
props.setProperty(METADATA_ENABLE_PROP, String.valueOf(enable));
return this;
}
public Builder validate(boolean validate) {
props.setProperty(METADATA_VALIDATE_PROP, String.valueOf(validate));
return this;
}
public Builder withInsertParallelism(int parallelism) {
props.setProperty(METADATA_INSERT_PARALLELISM_PROP, String.valueOf(parallelism));
return this;
}
public Builder withAsyncClean(boolean asyncClean) {
props.setProperty(METADATA_ASYNC_CLEAN_PROP, String.valueOf(asyncClean));
return this;
}
public Builder withMaxNumDeltaCommitsBeforeCompaction(int maxNumDeltaCommitsBeforeCompaction) {
props.setProperty(METADATA_COMPACT_NUM_DELTA_COMMITS_PROP, String.valueOf(maxNumDeltaCommitsBeforeCompaction));
return this;
}
public Builder archiveCommitsWith(int minToKeep, int maxToKeep) {
props.setProperty(MIN_COMMITS_TO_KEEP_PROP, String.valueOf(minToKeep));
props.setProperty(MAX_COMMITS_TO_KEEP_PROP, String.valueOf(maxToKeep));
return this;
}
public Builder retainCommits(int commitsRetained) {
props.setProperty(CLEANER_COMMITS_RETAINED_PROP, String.valueOf(commitsRetained));
return this;
}
public HoodieMetadataConfig build() {
HoodieMetadataConfig config = new HoodieMetadataConfig(props);
setDefaultOnCondition(props, !props.containsKey(METADATA_ENABLE_PROP), METADATA_ENABLE_PROP,
String.valueOf(DEFAULT_METADATA_ENABLE));
setDefaultOnCondition(props, !props.containsKey(METADATA_VALIDATE_PROP), METADATA_VALIDATE_PROP,
String.valueOf(DEFAULT_METADATA_VALIDATE));
setDefaultOnCondition(props, !props.containsKey(METADATA_INSERT_PARALLELISM_PROP), METADATA_INSERT_PARALLELISM_PROP,
String.valueOf(DEFAULT_METADATA_INSERT_PARALLELISM));
setDefaultOnCondition(props, !props.containsKey(METADATA_ASYNC_CLEAN_PROP), METADATA_ASYNC_CLEAN_PROP,
String.valueOf(DEFAULT_METADATA_ASYNC_CLEAN));
setDefaultOnCondition(props, !props.containsKey(METADATA_COMPACT_NUM_DELTA_COMMITS_PROP),
METADATA_COMPACT_NUM_DELTA_COMMITS_PROP, String.valueOf(DEFAULT_METADATA_COMPACT_NUM_DELTA_COMMITS));
setDefaultOnCondition(props, !props.containsKey(CLEANER_COMMITS_RETAINED_PROP), CLEANER_COMMITS_RETAINED_PROP,
String.valueOf(DEFAULT_CLEANER_COMMITS_RETAINED));
setDefaultOnCondition(props, !props.containsKey(MAX_COMMITS_TO_KEEP_PROP), MAX_COMMITS_TO_KEEP_PROP,
String.valueOf(DEFAULT_MAX_COMMITS_TO_KEEP));
setDefaultOnCondition(props, !props.containsKey(MIN_COMMITS_TO_KEEP_PROP), MIN_COMMITS_TO_KEEP_PROP,
String.valueOf(DEFAULT_MIN_COMMITS_TO_KEEP));
return config;
}
}
}


@@ -62,6 +62,9 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig {
public static final String METRICS_REPORTER_CLASS = METRIC_PREFIX + ".reporter.class";
public static final String DEFAULT_METRICS_REPORTER_CLASS = "";
// Enable metrics collection from executors
public static final String ENABLE_EXECUTOR_METRICS = METRIC_PREFIX + ".executor.enable";
private HoodieMetricsConfig(Properties props) {
super(props);
}
@@ -126,6 +129,11 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig {
return this;
}
public Builder withExecutorMetrics(boolean enable) {
props.setProperty(ENABLE_EXECUTOR_METRICS, String.valueOf(enable));
return this;
}
public HoodieMetricsConfig build() {
HoodieMetricsConfig config = new HoodieMetricsConfig(props);
setDefaultOnCondition(props, !props.containsKey(METRICS_ON), METRICS_ON, String.valueOf(DEFAULT_METRICS_ON));


@@ -689,6 +689,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Boolean.parseBoolean(props.getProperty(HoodieMetricsConfig.METRICS_ON));
}
public boolean isExecutorMetricsEnabled() {
return Boolean.parseBoolean(props.getProperty(HoodieMetricsConfig.ENABLE_EXECUTOR_METRICS, "false"));
}
public MetricsReporterType getMetricsReporterType() {
return MetricsReporterType.valueOf(props.getProperty(HoodieMetricsConfig.METRICS_REPORTER_TYPE));
}
@@ -874,6 +878,41 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Long.valueOf(props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE_PROP));
}
/**
* File listing metadata configs.
*/
public boolean useFileListingMetadata() {
return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_ENABLE_PROP));
}
public boolean getFileListingMetadataVerify() {
return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_VALIDATE_PROP));
}
public int getMetadataInsertParallelism() {
return Integer.parseInt(props.getProperty(HoodieMetadataConfig.METADATA_INSERT_PARALLELISM_PROP));
}
public int getMetadataCompactDeltaCommitMax() {
return Integer.parseInt(props.getProperty(HoodieMetadataConfig.METADATA_COMPACT_NUM_DELTA_COMMITS_PROP));
}
public boolean isMetadataAsyncClean() {
return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_ASYNC_CLEAN_PROP));
}
public int getMetadataMaxCommitsToKeep() {
return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MAX_COMMITS_TO_KEEP_PROP));
}
public int getMetadataMinCommitsToKeep() {
return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MIN_COMMITS_TO_KEEP_PROP));
}
public int getMetadataCleanerCommitsRetained() {
return Integer.parseInt(props.getProperty(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED_PROP));
}
public static class Builder {
protected final Properties props = new Properties();
@@ -889,6 +928,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private boolean isConsistencyGuardSet = false;
private boolean isCallbackConfigSet = false;
private boolean isPayloadConfigSet = false;
private boolean isMetadataConfigSet = false;
public Builder withEngineType(EngineType engineType) {
this.engineType = engineType;
@@ -1056,6 +1096,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
public Builder withMetadataConfig(HoodieMetadataConfig metadataConfig) {
props.putAll(metadataConfig.getProps());
isMetadataConfigSet = true;
return this;
}
public Builder withAutoCommit(boolean autoCommit) {
props.setProperty(HOODIE_AUTO_COMMIT_PROP, String.valueOf(autoCommit));
return this;
@@ -1204,6 +1250,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
HoodieWriteCommitCallbackConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isPayloadConfigSet,
HoodiePayloadConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isMetadataConfigSet,
HoodieMetadataConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !props.containsKey(EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION),
EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION, DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION);


@@ -0,0 +1,647 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.metadata;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMetadataConfig;
import org.apache.hudi.config.HoodieMetricsConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
/**
* Writer implementation backed by an internal Hudi table. Partition and file listings are saved within an internal MOR table
* called the Metadata Table. This table is created by listing files and partitions (first time)
* and kept in sync using the instants on the main dataset.
*/
public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWriter {
private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadataWriter.class);
protected HoodieWriteConfig metadataWriteConfig;
protected HoodieWriteConfig datasetWriteConfig;
protected String tableName;
protected HoodieBackedTableMetadata metadata;
protected HoodieTableMetaClient metaClient;
protected Option<HoodieMetadataMetrics> metrics;
protected boolean enabled;
protected SerializableConfiguration hadoopConf;
protected final transient HoodieEngineContext engineContext;
protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
this.datasetWriteConfig = writeConfig;
this.engineContext = engineContext;
this.hadoopConf = new SerializableConfiguration(hadoopConf);
if (writeConfig.useFileListingMetadata()) {
this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
this.metadataWriteConfig = createMetadataWriteConfig(writeConfig);
enabled = true;
// Cleaning and compaction are handled internally since we don't expose this table outside; auto clean and inline compaction must be disabled in this write config.
ValidationUtils.checkArgument(!this.metadataWriteConfig.isAutoClean(), "Cleaning is controlled internally for Metadata table.");
ValidationUtils.checkArgument(!this.metadataWriteConfig.isInlineCompaction(), "Compaction is controlled internally for metadata table.");
// Metadata Table cannot have metadata listing turned on. (infinite loop, much?)
ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(), "Auto commit is required for Metadata Table");
ValidationUtils.checkArgument(!this.metadataWriteConfig.useFileListingMetadata(), "File listing cannot be used for Metadata Table");
initRegistry();
HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf, datasetWriteConfig.getBasePath());
initialize(engineContext, datasetMetaClient);
if (enabled) {
// (re) init the metadata for reading.
initTableMetadata();
// This is always called, even when the table was created for the first time. This is because
// bootstrapFromFilesystem() does a file listing and hence may take a long time, during which some new updates
// may have occurred on the table. Hence, calling this always ensures that the metadata is brought in sync
// with the active timeline.
HoodieTimer timer = new HoodieTimer().startTimer();
syncFromInstants(datasetMetaClient);
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.SYNC_STR, timer.endTimer()));
}
} else {
enabled = false;
this.metrics = Option.empty();
}
}
protected abstract void initRegistry();
/**
* Create a {@code HoodieWriteConfig} to use for the Metadata Table.
*
* @param writeConfig {@code HoodieWriteConfig} of the main dataset writer
*/
private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfig) {
int parallelism = writeConfig.getMetadataInsertParallelism();
// Create the write config for the metadata table by borrowing options from the main write config.
HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
.withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
.withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled())
.withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs())
.withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs())
.withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
.build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.withAutoCommit(true)
.withAvroSchemaValidate(true)
.withEmbeddedTimelineServerEnabled(false)
.withAssumeDatePartitioning(false)
.withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
.withSchema(HoodieMetadataRecord.getClassSchema().toString())
.forTable(tableName)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withAsyncClean(writeConfig.isMetadataAsyncClean())
// we will trigger cleaning manually, to control the instant times
.withAutoClean(false)
.withCleanerParallelism(parallelism)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.retainCommits(writeConfig.getMetadataCleanerCommitsRetained())
.archiveCommitsWith(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMetadataMaxCommitsToKeep())
// we will trigger compaction manually, to control the instant times
.withInlineCompaction(false)
.withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build())
.withParallelism(parallelism, parallelism)
.withDeleteParallelism(parallelism)
.withRollbackParallelism(parallelism)
.withFinalizeWriteParallelism(parallelism);
if (writeConfig.isMetricsOn()) {
HoodieMetricsConfig.Builder metricsConfig = HoodieMetricsConfig.newBuilder()
.withReporterType(writeConfig.getMetricsReporterType().toString())
.withExecutorMetrics(writeConfig.isExecutorMetricsEnabled())
.on(true);
switch (writeConfig.getMetricsReporterType()) {
case GRAPHITE:
metricsConfig.onGraphitePort(writeConfig.getGraphiteServerPort())
.toGraphiteHost(writeConfig.getGraphiteServerHost())
.usePrefix(writeConfig.getGraphiteMetricPrefix());
break;
case JMX:
metricsConfig.onJmxPort(writeConfig.getJmxPort())
.toJmxHost(writeConfig.getJmxHost());
break;
case DATADOG:
// TODO:
break;
case CONSOLE:
case INMEMORY:
break;
default:
throw new HoodieMetadataException("Unsupported Metrics Reporter type " + writeConfig.getMetricsReporterType());
}
builder.withMetricsConfig(metricsConfig.build());
}
return builder.build();
}
public HoodieWriteConfig getWriteConfig() {
return metadataWriteConfig;
}
public HoodieTableMetadata metadata() {
return metadata;
}
/**
* Initialize the metadata table if it does not exist. Update the metadata to bring it in sync with the file system.
*
* This can happen in two ways:
* 1. If the metadata table did not exist, then file and partition listing is used
* 2. If the metadata table exists, the instants from active timeline are read in order and changes applied
*
* The above logic has been chosen because it is faster to perform #1 at scale rather than read all the Instants
* which are large in size (AVRO or JSON encoded and not compressed) and incur considerable IO for de-serialization
* and decoding.
*/
protected abstract void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient);
private void initTableMetadata() {
this.metadata = new HoodieBackedTableMetadata(hadoopConf.get(), datasetWriteConfig.getBasePath(), datasetWriteConfig.getSpillableMapBasePath(),
datasetWriteConfig.useFileListingMetadata(), datasetWriteConfig.getFileListingMetadataVerify(), false,
datasetWriteConfig.shouldAssumeDatePartitioning());
this.metaClient = metadata.getMetaClient();
}
protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) throws IOException {
HoodieTimer timer = new HoodieTimer().startTimer();
boolean exists = datasetMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME));
if (!exists) {
// Initialize for the first time by listing partitions and files directly from the file system
bootstrapFromFilesystem(engineContext, datasetMetaClient);
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
}
}
/**
* Initialize the Metadata Table by listing files and partitions from the file system.
*
* @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
*/
private void bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) throws IOException {
ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled");
// If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
// Otherwise, we use the timestamp of the instant which does not have any non-completed instants before it.
Option<HoodieInstant> latestInstant = Option.empty();
boolean foundNonComplete = false;
for (HoodieInstant instant : datasetMetaClient.getActiveTimeline().getInstants().collect(Collectors.toList())) {
if (!instant.isCompleted()) {
foundNonComplete = true;
} else if (!foundNonComplete) {
latestInstant = Option.of(instant);
}
}
String createInstantTime = latestInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
HoodieTableMetaClient.initTableType(hadoopConf.get(), metadataWriteConfig.getBasePath(),
HoodieTableType.MERGE_ON_READ, tableName, "archived", HoodieMetadataPayload.class.getName(),
HoodieFileFormat.HFILE.toString());
initTableMetadata();
// List all partitions in the basePath of the containing dataset
FileSystem fs = datasetMetaClient.getFs();
List<String> partitions = FSUtils.getAllPartitionPaths(fs, datasetWriteConfig.getBasePath(), datasetWriteConfig.shouldAssumeDatePartitioning());
LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions");
// List all partitions in parallel and collect the files in them
int parallelism = Math.max(partitions.size(), 1);
List<Pair<String, FileStatus[]>> partitionFileList = engineContext.map(partitions, partition -> {
FileSystem fsys = datasetMetaClient.getFs();
FileStatus[] statuses = FSUtils.getAllDataFilesInPartition(fsys, new Path(datasetWriteConfig.getBasePath(), partition));
return Pair.of(partition, statuses);
}, parallelism);
// Create a HoodieCommitMetadata with writeStats for all discovered files
int[] stats = {0};
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
partitionFileList.forEach(t -> {
final String partition = t.getKey();
try {
if (!fs.exists(new Path(datasetWriteConfig.getBasePath(), partition + Path.SEPARATOR + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))) {
return;
}
} catch (IOException e) {
throw new HoodieMetadataException("Failed to check partition " + partition, e);
}
// Filter the statuses to only include files which were created before or on createInstantTime
Arrays.stream(t.getValue()).filter(status -> {
String filename = status.getPath().getName();
if (filename.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
return false;
}
if (HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), HoodieTimeline.GREATER_THAN,
createInstantTime)) {
return false;
}
return true;
}).forEach(status -> {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setPath(partition + Path.SEPARATOR + status.getPath().getName());
writeStat.setPartitionPath(partition);
writeStat.setTotalWriteBytes(status.getLen());
commitMetadata.addWriteStat(partition, writeStat);
stats[0] += 1;
});
// If the partition has no files then create a writeStat with no file path
if (commitMetadata.getWriteStats(partition) == null) {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setPartitionPath(partition);
commitMetadata.addWriteStat(partition, writeStat);
}
});
LOG.info("Committing " + partitionFileList.size() + " partitions and " + stats[0] + " files to metadata");
update(commitMetadata, createInstantTime);
}
/**
* Sync the Metadata Table from the instants created on the dataset.
*
* @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
*/
private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
ValidationUtils.checkState(enabled, "Metadata table cannot be synced as it is not enabled");
try {
List<HoodieInstant> instantsToSync = metadata.findInstantsToSync(datasetMetaClient);
if (instantsToSync.isEmpty()) {
return;
}
LOG.info("Syncing " + instantsToSync.size() + " instants to metadata table: " + instantsToSync);
// Read each instant in order and sync it to metadata table
final HoodieActiveTimeline timeline = datasetMetaClient.getActiveTimeline();
for (HoodieInstant instant : instantsToSync) {
LOG.info("Syncing instant " + instant + " to metadata table");
ValidationUtils.checkArgument(instant.isCompleted(), "Only completed instants can be synced.");
switch (instant.getAction()) {
case HoodieTimeline.CLEAN_ACTION:
HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(datasetMetaClient, instant);
update(cleanMetadata, instant.getTimestamp());
break;
case HoodieTimeline.DELTA_COMMIT_ACTION:
case HoodieTimeline.COMMIT_ACTION:
case HoodieTimeline.COMPACTION_ACTION:
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
update(commitMetadata, instant.getTimestamp());
break;
case HoodieTimeline.ROLLBACK_ACTION:
HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
timeline.getInstantDetails(instant).get());
update(rollbackMetadata, instant.getTimestamp());
break;
case HoodieTimeline.RESTORE_ACTION:
HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
timeline.getInstantDetails(instant).get());
update(restoreMetadata, instant.getTimestamp());
break;
case HoodieTimeline.SAVEPOINT_ACTION:
// Nothing to be done here
break;
default:
throw new HoodieException("Unknown type of action " + instant.getAction());
}
}
// re-init the table metadata, for any future writes.
initTableMetadata();
} catch (IOException ioe) {
throw new HoodieIOException("Unable to sync instants from data to metadata table.", ioe);
}
}
/**
* Update from {@code HoodieCommitMetadata}.
*
* @param commitMetadata {@code HoodieCommitMetadata}
* @param instantTime Timestamp at which the commit was performed
*/
@Override
public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
if (!enabled) {
return;
}
List<HoodieRecord> records = new LinkedList<>();
List<String> allPartitions = new LinkedList<>();
commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
final String partition = partitionStatName.equals("") ? NON_PARTITIONED_NAME : partitionStatName;
allPartitions.add(partition);
Map<String, Long> newFiles = new HashMap<>(writeStats.size());
writeStats.forEach(hoodieWriteStat -> {
String pathWithPartition = hoodieWriteStat.getPath();
if (pathWithPartition == null) {
throw new HoodieMetadataException("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
}
int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1;
String filename = pathWithPartition.substring(offset);
ValidationUtils.checkState(!newFiles.containsKey(filename), "Duplicate files in HoodieCommitMetadata");
newFiles.put(filename, hoodieWriteStat.getTotalWriteBytes());
});
// New files added to a partition
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
partition, Option.of(newFiles), Option.empty());
records.add(record);
});
// New partitions created
HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new ArrayList<>(allPartitions));
records.add(record);
LOG.info("Updating at " + instantTime + " from Commit/" + commitMetadata.getOperationType()
+ ". #partitions_updated=" + records.size());
commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
}
/**
* Update from {@code HoodieCleanerPlan}.
*
* @param cleanerPlan {@code HoodieCleanerPlan}
* @param instantTime Timestamp at which the clean plan was generated
*/
@Override
public void update(HoodieCleanerPlan cleanerPlan, String instantTime) {
if (!enabled) {
return;
}
List<HoodieRecord> records = new LinkedList<>();
int[] fileDeleteCount = {0};
cleanerPlan.getFilePathsToBeDeletedPerPartition().forEach((partition, deletedPathInfo) -> {
fileDeleteCount[0] += deletedPathInfo.size();
// Files deleted from a partition
List<String> deletedFilenames = deletedPathInfo.stream().map(p -> new Path(p.getFilePath()).getName())
.collect(Collectors.toList());
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
Option.of(deletedFilenames));
records.add(record);
});
LOG.info("Updating at " + instantTime + " from CleanerPlan. #partitions_updated=" + records.size()
+ ", #files_deleted=" + fileDeleteCount[0]);
commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
}
/**
* Update from {@code HoodieCleanMetadata}.
*
* @param cleanMetadata {@code HoodieCleanMetadata}
* @param instantTime Timestamp at which the clean was completed
*/
@Override
public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
if (!enabled) {
return;
}
List<HoodieRecord> records = new LinkedList<>();
int[] fileDeleteCount = {0};
cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
// Files deleted from a partition
List<String> deletedFiles = partitionMetadata.getSuccessDeleteFiles();
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
Option.of(new ArrayList<>(deletedFiles)));
records.add(record);
fileDeleteCount[0] += deletedFiles.size();
});
LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size()
+ ", #files_deleted=" + fileDeleteCount[0]);
commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
}
/**
* Update from {@code HoodieRestoreMetadata}.
*
* @param restoreMetadata {@code HoodieRestoreMetadata}
* @param instantTime Timestamp at which the restore was performed
*/
@Override
public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
if (!enabled) {
return;
}
Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
rms.forEach(rm -> processRollbackMetadata(rm, partitionToDeletedFiles, partitionToAppendedFiles));
});
commitRollback(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Restore");
}
/**
* Update from {@code HoodieRollbackMetadata}.
*
* @param rollbackMetadata {@code HoodieRollbackMetadata}
* @param instantTime Timestamp at which the rollback was performed
*/
@Override
public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) {
if (!enabled) {
return;
}
Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles);
commitRollback(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Rollback");
}
/**
* Extracts information about the deleted and append files from the {@code HoodieRollbackMetadata}.
*
* During a rollback, files may be deleted (COW, MOR) or rollback blocks may be appended (MOR only) to files. This
* function extracts these changes for each partition.
*
* @param rollbackMetadata {@code HoodieRollbackMetadata}
* @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition.
* @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes.
*/
private void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata,
Map<String, List<String>> partitionToDeletedFiles,
Map<String, Map<String, Long>> partitionToAppendedFiles) {
rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
final String partition = pm.getPartitionPath();
if (!pm.getSuccessDeleteFiles().isEmpty()) {
if (!partitionToDeletedFiles.containsKey(partition)) {
partitionToDeletedFiles.put(partition, new ArrayList<>());
}
// Extract deleted file name from the absolute paths saved in getSuccessDeleteFiles()
List<String> deletedFiles = pm.getSuccessDeleteFiles().stream().map(p -> new Path(p).getName())
.collect(Collectors.toList());
partitionToDeletedFiles.get(partition).addAll(deletedFiles);
}
if (!pm.getAppendFiles().isEmpty()) {
if (!partitionToAppendedFiles.containsKey(partition)) {
partitionToAppendedFiles.put(partition, new HashMap<>());
}
// Extract appended file name from the absolute paths saved in getAppendFiles()
pm.getAppendFiles().forEach((path, size) -> {
partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, (oldSize, newSizeCopy) -> {
return size + oldSize;
});
});
}
});
}
/**
* Create file delete records and commit.
*
* @param partitionToDeletedFiles {@code Map} of partitions and the deleted files
* @param instantTime Timestamp at which the deletes took place
* @param operation Type of the operation which caused the files to be deleted
*/
private void commitRollback(Map<String, List<String>> partitionToDeletedFiles,
Map<String, Map<String, Long>> partitionToAppendedFiles, String instantTime,
String operation) {
List<HoodieRecord> records = new LinkedList<>();
int[] fileChangeCount = {0, 0}; // deletes, appends
partitionToDeletedFiles.forEach((partition, deletedFiles) -> {
// A rollback deletes instants from the timeline. The instant being rolled back may not have been synced to the
// metadata table. Hence, the deleted files need to be checked against the metadata.
try {
FileStatus[] existingStatuses = metadata.fetchAllFilesInPartition(new Path(metadata.getDatasetBasePath(), partition));
Set<String> currentFiles =
Arrays.stream(existingStatuses).map(s -> s.getPath().getName()).collect(Collectors.toSet());
int origCount = deletedFiles.size();
deletedFiles.removeIf(f -> !currentFiles.contains(f));
if (deletedFiles.size() != origCount) {
LOG.warn("Some Files to be deleted as part of " + operation + " at " + instantTime + " were not found in the "
+ " metadata for partition " + partition
+ ". To delete = " + origCount + ", found=" + deletedFiles.size());
}
fileChangeCount[0] += deletedFiles.size();
Option<Map<String, Long>> filesAdded = Option.empty();
if (partitionToAppendedFiles.containsKey(partition)) {
filesAdded = Option.of(partitionToAppendedFiles.remove(partition));
}
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded,
Option.of(new ArrayList<>(deletedFiles)));
records.add(record);
} catch (IOException e) {
throw new HoodieMetadataException("Failed to commit rollback deletes at instant " + instantTime, e);
}
});
partitionToAppendedFiles.forEach((partition, appendedFileMap) -> {
fileChangeCount[1] += appendedFileMap.size();
// Validate that no appended file has been deleted
ValidationUtils.checkState(
!appendedFileMap.keySet().removeAll(partitionToDeletedFiles.getOrDefault(partition, Collections.emptyList())),
"Rollback file cannot both be appended and deleted");
// New files added to a partition
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.of(appendedFileMap),
Option.empty());
records.add(record);
});
LOG.info("Updating at " + instantTime + " from " + operation + ". #partitions_updated=" + records.size()
+ ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" + fileChangeCount[1]);
commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
}
/**
* Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit.
*
*/
protected abstract void commit(List<HoodieRecord> records, String partitionName, String instantTime);
}


@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.metadata;
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.table.HoodieTable;
/**
* {@code HoodieTableFileSystemView} implementation that retrieves partition listings from the Metadata Table.
*/
public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView {
private HoodieTable hoodieTable;
public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient, HoodieTable table,
HoodieTimeline visibleActiveTimeline, boolean enableIncrementalTimelineSync) {
super(metaClient, visibleActiveTimeline, enableIncrementalTimelineSync);
this.hoodieTable = table;
}
/**
* Return all the files in the partition by reading from the Metadata Table.
*
* @param partitionPath The absolute path of the partition
* @throws IOException
*/
@Override
protected FileStatus[] listPartition(Path partitionPath) throws IOException {
return hoodieTable.metadata().getAllFilesInPartition(partitionPath);
}
}


@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.metadata;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import java.io.Serializable;
/**
* Interface that supports updating metadata for a given table, as actions complete.
*/
public interface HoodieTableMetadataWriter extends Serializable {
void update(HoodieCommitMetadata commitMetadata, String instantTime);
void update(HoodieCleanerPlan cleanerPlan, String instantTime);
void update(HoodieCleanMetadata cleanMetadata, String instantTime);
void update(HoodieRestoreMetadata restoreMetadata, String instantTime);
void update(HoodieRollbackMetadata rollbackMetadata, String instantTime);
}


@@ -49,6 +49,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
@@ -61,6 +62,8 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.log4j.LogManager;
@@ -94,6 +97,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
private SerializableConfiguration hadoopConfiguration;
private transient FileSystemViewManager viewManager;
private HoodieTableMetadata metadata;
protected final TaskContextSupplier taskContextSupplier;
@@ -242,28 +246,41 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
* Get the view of the file system for this table.
*/
public TableFileSystemView getFileSystemView() {
return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline());
if (config.useFileListingMetadata()) {
return getFileSystemViewInternal(getCompletedCommitsTimeline());
} else {
return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline());
}
}
/**
* Get the base file only view of the file system for this table.
*/
public BaseFileOnlyView getBaseFileOnlyView() {
return getViewManager().getFileSystemView(metaClient);
return getFileSystemViewInternal(metaClient.getActiveTimeline().filterCompletedAndCompactionInstants());
}
/**
* Get the full view of the file system for this table.
*/
public SliceView getSliceView() {
return getViewManager().getFileSystemView(metaClient);
return getFileSystemViewInternal(metaClient.getActiveTimeline().filterCompletedAndCompactionInstants());
}
/**
* Get complete view of the file system for this table with ability to force sync.
*/
public SyncableFileSystemView getHoodieView() {
return getViewManager().getFileSystemView(metaClient);
return getFileSystemViewInternal(metaClient.getActiveTimeline().filterCompletedAndCompactionInstants());
}
private SyncableFileSystemView getFileSystemViewInternal(HoodieTimeline timeline) {
if (config.useFileListingMetadata()) {
FileSystemViewStorageConfig viewConfig = config.getViewStorageConfig();
return new HoodieMetadataFileSystemView(metaClient, this, timeline, viewConfig.isIncrementalTimelineSyncEnabled());
} else {
return getViewManager().getFileSystemView(metaClient);
}
}
/**
@@ -640,4 +657,12 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
public boolean requireSortedRecords() {
return getBaseFileFormat() == HoodieFileFormat.HFILE;
}
public HoodieTableMetadata metadata() {
if (metadata == null) {
metadata = HoodieTableMetadata.create(hadoopConfiguration.get(), config.getBasePath(), config.getSpillableMapBasePath(),
config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.isMetricsOn(), config.shouldAssumeDatePartitioning());
}
return metadata;
}
}
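
For orientation (not part of the diff), a minimal sketch of how callers can use the new HoodieTable.metadata() accessor, assuming an initialized HoodieTable named table and its HoodieWriteConfig named config:

    // Sketch only: partition and file listings served through the metadata table
    // when hoodie.metadata.enable is set on the write config.
    HoodieTableMetadata tableMetadata = table.metadata();
    List<String> partitions = tableMetadata.getAllPartitionPaths();        // may throw IOException
    FileStatus[] files = tableMetadata.getAllFilesInPartition(
        new Path(config.getBasePath(), partitions.get(0)));                // may throw IOException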


@@ -88,6 +88,11 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
private final HoodieTable<T, I, K, O> table;
private final HoodieTableMetaClient metaClient;
/*
public HoodieTimelineArchiveLog(HoodieWriteConfig config, Configuration configuration) {
this(config, HoodieTable.create(config, configuration));
}*/
public HoodieTimelineArchiveLog(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
this.config = config;
this.table = table;
@@ -195,6 +200,20 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
.collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
HoodieInstant.getComparableAction(i.getAction()))));
// If metadata table is enabled, do not archive instants which are more recent than the latest synced
// instant on the metadata table. This is required for metadata table sync.
if (config.useFileListingMetadata()) {
Option<String> lastSyncedInstantTime = table.metadata().getSyncedInstantTime();
if (lastSyncedInstantTime.isPresent()) {
LOG.info("Limiting archiving of instants to last synced instant on metadata table at " + lastSyncedInstantTime.get());
instants = instants.filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(), HoodieTimeline.LESSER_THAN,
lastSyncedInstantTime.get()));
} else {
LOG.info("Not archiving as there is no instants yet on the metadata table");
instants = Stream.empty();
}
}
return instants.flatMap(hoodieInstant ->
groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
HoodieInstant.getComparableAction(hoodieInstant.getAction()))).stream());


@@ -20,7 +20,6 @@ package org.apache.hudi.table.action.clean;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
@@ -180,14 +179,13 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
}
/**
* Scan and list all paritions for cleaning.
* Scan and list all partitions for cleaning.
* @return all partitions paths for the dataset.
* @throws IOException
*/
private List<String> getPartitionPathsForFullCleaning() throws IOException {
// Go to brute force mode of scanning all partitions
return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(), hoodieTable.getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning());
return hoodieTable.metadata().getAllPartitionPaths();
}
/**


@@ -18,6 +18,7 @@
package org.apache.hudi.table.action.rollback;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.HoodieRollbackStat;
@@ -37,6 +38,7 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
/**
* Performs rollback using marker files generated during the write.
@@ -119,10 +121,17 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord
}
}
Map<FileStatus, Long> filesToNumBlocksRollback = Collections.emptyMap();
if (config.useFileListingMetadata()) {
// When metadata is enabled, the information of files appended to is required
filesToNumBlocksRollback = Collections.singletonMap(
table.getMetaClient().getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
1L);
}
return HoodieRollbackStat.newBuilder()
.withPartitionPath(partitionPath)
// we don't use this field per se. Avoiding the extra file status call.
.withRollbackBlockAppendResults(Collections.emptyMap())
.withRollbackBlockAppendResults(filesToNumBlocksRollback)
.build();
}
}


@@ -107,7 +107,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata<List<WriteStatus>> result = table.upsert(context, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -126,7 +126,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.INSERT);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());

View File

@@ -98,7 +98,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata<List<WriteStatus>> result = table.upsert(context, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -113,7 +113,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT_PREPPED);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata<List<WriteStatus>> result = table.upsertPrepped(context,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
@@ -124,7 +124,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.INSERT);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -139,7 +139,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT_PREPPED);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata<List<WriteStatus>> result = table.insertPrepped(context,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}

View File

@@ -23,6 +23,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -41,6 +43,8 @@ import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndex;
import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.hudi.metrics.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
@@ -51,6 +55,7 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -136,7 +141,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.upsert(context, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -150,7 +155,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT_PREPPED);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.upsertPrepped(context,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
@@ -161,7 +166,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.insert(context,instantTime, records);
return postWrite(result, instantTime, table);
}
@@ -172,7 +177,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT_PREPPED);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.insertPrepped(context,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
@@ -188,7 +193,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT_OVERWRITE);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata result = table.insertOverwrite(context, instantTime, records);
return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
}
@@ -205,7 +210,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata result = table.insertOverwriteTable(context, instantTime, records);
return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
}
@@ -221,7 +226,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
getTableAndInitCtx(WriteOperationType.BULK_INSERT, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.BULK_INSERT);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.bulkInsert(context,instantTime, records, userDefinedBulkInsertPartitioner);
return postWrite(result, instantTime, table);
}
@@ -232,7 +237,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.bulkInsertPrepped(context,instantTime, preppedRecords, bulkInsertPartitioner);
return postWrite(result, instantTime, table);
}
@@ -394,4 +399,34 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
}
return table;
}
@Override
public void syncTableMetadata() {
// Open up the metadata table again, for syncing
SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
}
@Override
protected void initWrapperFSMetrics() {
if (config.isMetricsOn()) {
Registry registry;
Registry registryMeta;
JavaSparkContext jsc = ((HoodieSparkEngineContext) context).getJavaSparkContext();
if (config.isExecutorMetricsEnabled()) {
// Create a distributed registry for HoodieWrapperFileSystem
registry = Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName(),
DistributedRegistry.class.getName());
((DistributedRegistry)registry).register(jsc);
registryMeta = Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() + "MetaFolder",
DistributedRegistry.class.getName());
((DistributedRegistry)registryMeta).register(jsc);
} else {
registry = Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName());
registryMeta = Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() + "MetaFolder");
}
HoodieWrapperFileSystem.setMetricsRegistry(registry, registryMeta);
}
}
}

View File

@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.metrics;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hudi.common.metrics.Registry;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.AccumulatorV2;
/**
* Lightweight Metrics Registry to track Hudi events.
*/
public class DistributedRegistry extends AccumulatorV2<Map<String, Long>, Map<String, Long>>
implements Registry, Serializable {
private String name;
ConcurrentHashMap<String, Long> counters = new ConcurrentHashMap<>();
public DistributedRegistry(String name) {
this.name = name;
}
public void register(JavaSparkContext jsc) {
if (!isRegistered()) {
jsc.sc().register(this);
}
}
@Override
public void clear() {
counters.clear();
}
@Override
public void increment(String name) {
counters.merge(name, 1L, (oldValue, newValue) -> oldValue + newValue);
}
@Override
public void add(String name, long value) {
counters.merge(name, value, (oldValue, newValue) -> oldValue + newValue);
}
/**
* Get all Counter type metrics.
*/
@Override
public Map<String, Long> getAllCounts(boolean prefixWithRegistryName) {
HashMap<String, Long> countersMap = new HashMap<>();
counters.forEach((k, v) -> {
String key = prefixWithRegistryName ? name + "." + k : k;
countersMap.put(key, v);
});
return countersMap;
}
@Override
public void add(Map<String, Long> arg) {
arg.forEach((key, value) -> add(key, value));
}
@Override
public AccumulatorV2<Map<String, Long>, Map<String, Long>> copy() {
DistributedRegistry registry = new DistributedRegistry(name);
counters.forEach((key, value) -> registry.add(key, value));
return registry;
}
@Override
public boolean isZero() {
return counters.isEmpty();
}
@Override
public void merge(AccumulatorV2<Map<String, Long>, Map<String, Long>> acc) {
acc.value().forEach((key, value) -> add(key, value));
}
@Override
public void reset() {
counters.clear();
}
@Override
public Map<String, Long> value() {
return counters;
}
}
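A minimal usage sketch for the registry above, assuming a live JavaSparkContext named jsc; the counter names are illustrative only.

// Obtain a registry backed by the distributed (accumulator) implementation and register it with Spark,
// so counts recorded on executors are merged back on the driver.
Registry registry = Registry.getRegistry("HoodieMetadata", DistributedRegistry.class.getName());
((DistributedRegistry) registry).register(jsc);
// Executors update counters through the captured reference; Spark merges the per-task copies.
jsc.parallelize(Arrays.asList("p1", "p2", "p3"), 3)
    .foreach(partition -> registry.increment("partition.lookups"));
// Driver-side view of the merged counts, e.g. {HoodieMetadata.partition.lookups=3}.
Map<String, Long> counts = registry.getAllCounts(true);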

View File

@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.metrics;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {
private static final Logger LOG = LogManager.getLogger(SparkHoodieBackedTableMetadataWriter.class);
public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context) {
return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context);
}
SparkHoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
super(hadoopConf, writeConfig, engineContext);
}
@Override
protected void initRegistry() {
if (metadataWriteConfig.isMetricsOn()) {
Registry registry;
if (metadataWriteConfig.isExecutorMetricsEnabled()) {
registry = Registry.getRegistry("HoodieMetadata", DistributedRegistry.class.getName());
} else {
registry = Registry.getRegistry("HoodieMetadata");
}
this.metrics = Option.of(new HoodieMetadataMetrics(registry));
} else {
this.metrics = Option.empty();
}
}
@Override
protected void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) {
try {
metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> {
if (registry instanceof DistributedRegistry) {
HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext;
((DistributedRegistry) registry).register(sparkEngineContext.getJavaSparkContext());
}
});
if (enabled) {
bootstrapIfNeeded(engineContext, datasetMetaClient);
}
} catch (IOException e) {
LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
enabled = false;
}
}
@Override
protected void commit(List<HoodieRecord> records, String partitionName, String instantTime) {
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
metadata.closeReaders();
JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName);
try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
writeClient.startCommitWithTime(instantTime);
List<WriteStatus> statuses = writeClient.upsertPreppedRecords(recordRDD, instantTime).collect();
statuses.forEach(writeStatus -> {
if (writeStatus.hasErrors()) {
throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime);
}
});
// Trigger compaction and then cleaning, with suffixes based on the same instant time. This ensures that any future
// delta commits synced over will not have an instant time less than the last completed instant on the
// metadata table.
if (writeClient.scheduleCompactionAtInstant(instantTime + "001", Option.empty())) {
writeClient.compact(instantTime + "001");
}
writeClient.clean(instantTime + "002");
}
// Update total size of the metadata and count of base/log files
metrics.ifPresent(m -> {
try {
Map<String, String> stats = m.getStats(false, metaClient, metadata);
m.updateMetrics(Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE)),
Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_LOG_FILE_SIZE)),
Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_BASE_FILES)),
Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_LOG_FILES)));
} catch (HoodieIOException e) {
LOG.error("Could not publish metadata size metrics", e);
}
});
}
/**
* Tag each record with the location.
*
* Since we only read the latest base file in a partition, we tag the records with the instant time of the latest
* base file.
*/
private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName) {
HoodieTable table = HoodieSparkTable.create(metadataWriteConfig, engineContext);
TableFileSystemView.SliceView fsView = table.getSliceView();
List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName)
.map(FileSlice::getBaseFile)
.filter(Option::isPresent)
.map(Option::get)
.collect(Collectors.toList());
// All the metadata fits within a single base file
if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) {
if (baseFiles.size() > 1) {
throw new HoodieMetadataException("Multiple base files found in metadata partition");
}
}
JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext();
String fileId;
String instantTime;
if (!baseFiles.isEmpty()) {
fileId = baseFiles.get(0).getFileId();
instantTime = baseFiles.get(0).getCommitTime();
} else {
// If there is a log file then we can assume that it has the data
List<HoodieLogFile> logFiles = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath())
.map(FileSlice::getLatestLogFile)
.filter(Option::isPresent)
.map(Option::get)
.collect(Collectors.toList());
if (logFiles.isEmpty()) {
// No base and log files. All are new inserts
return jsc.parallelize(records, 1);
}
fileId = logFiles.get(0).getFileId();
instantTime = logFiles.get(0).getBaseCommitTime();
}
return jsc.parallelize(records, 1).map(r -> r.setCurrentLocation(new HoodieRecordLocation(instantTime, fileId)));
}
}
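To make the instant-time suffixing in commit() above concrete, a small worked example (the timestamp is made up):

// Illustration only: service instants on the metadata table reuse the synced delta commit time plus a suffix.
String instantTime = "20201110153045";              // delta commit synced from the data timeline
String compactionInstantTime = instantTime + "001"; // "20201110153045001" - inline compaction on the metadata table
String cleanInstantTime = instantTime + "002";      // "20201110153045002" - clean on the metadata table
// A later delta commit synced over carries a newer data-timeline timestamp (e.g. "20201110153046"),
// which still sorts after both suffixed instants, so it is never less than the last completed metadata instant.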

View File

@@ -0,0 +1,801 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.metadata;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieMetadataConfig;
import org.apache.hudi.config.HoodieMetricsConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
public class TestHoodieFsMetadata extends HoodieClientTestHarness {
private static final Logger LOG = LogManager.getLogger(TestHoodieFsMetadata.class);
@TempDir
public java.nio.file.Path tempFolder;
private String metadataTableBasePath;
private HoodieTableType tableType;
public void init(HoodieTableType tableType) throws IOException {
this.tableType = tableType;
initPath();
initSparkContexts("TestHoodieMetadata");
initFileSystem();
fs.mkdirs(new Path(basePath));
initMetaClient();
initTestDataGenerator();
metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
}
@AfterEach
public void clean() throws IOException {
cleanupResources();
}
/**
* Metadata Table should not be created unless it is enabled in config.
*/
@Test
public void testDefaultNoMetadataTable() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
// Metadata table should not exist until created for the first time
assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist");
assertThrows(TableNotFoundException.class, () -> new HoodieTableMetaClient(hadoopConf, metadataTableBasePath));
// Metadata table is not created if disabled by config
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
client.startCommitWithTime("001");
assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not be created");
assertThrows(TableNotFoundException.class, () -> new HoodieTableMetaClient(hadoopConf, metadataTableBasePath));
}
// Metadata table created when enabled by config
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
client.startCommitWithTime("001");
assertTrue(fs.exists(new Path(metadataTableBasePath)));
validateMetadata(client);
}
}
/**
* Only valid partition directories are added to the metadata.
*/
@Test
public void testOnlyValidPartitionsAdded() throws Exception {
// This test requires local file system
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
// Create an empty directory which is not a partition directory (lacks partition metadata)
final String nonPartitionDirectory = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0] + "-nonpartition";
Files.createDirectories(Paths.get(basePath, nonPartitionDirectory));
// Create some commits
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
testTable.withPartitionMetaFiles("p1", "p2")
.addCommit("001").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10)
.addCommit("002").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10, 10)
.addInflightCommit("003").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10);
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
client.startCommitWithTime("005");
List<String> partitions = metadataWriter(client).metadata().getAllPartitionPaths();
assertFalse(partitions.contains(nonPartitionDirectory),
"Must not contain the non-partition " + nonPartitionDirectory);
assertTrue(partitions.contains("p1"), "Must contain partition p1");
assertTrue(partitions.contains("p2"), "Must contain partition p2");
FileStatus[] statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p1"));
assertTrue(statuses.length == 2);
statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p2"));
assertTrue(statuses.length == 5);
}
}
/**
* Test that various table operations sync to the Metadata Table correctly.
*/
//@ParameterizedTest
//@EnumSource(HoodieTableType.class)
//public void testTableOperations(HoodieTableType tableType) throws Exception {
@Test
public void testTableOperations() throws Exception {
//FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
// Write 1 (Bulk insert)
String newCommitTime = "001";
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
// Write 2 (inserts)
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
validateMetadata(client);
records = dataGen.generateInserts(newCommitTime, 20);
writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
// Write 3 (updates)
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUniqueUpdates(newCommitTime, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
// Write 4 (updates and inserts)
newCommitTime = "004";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
// Compaction
if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
newCommitTime = "005";
client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
client.compact(newCommitTime);
validateMetadata(client);
}
// Write 5 (updates and inserts)
newCommitTime = "006";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 5);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
// Compaction
if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
newCommitTime = "007";
client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
client.compact(newCommitTime);
validateMetadata(client);
}
// Deletes
newCommitTime = "008";
records = dataGen.generateDeletes(newCommitTime, 10);
JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
client.startCommitWithTime(newCommitTime);
client.delete(deleteKeys, newCommitTime);
validateMetadata(client);
// Clean
newCommitTime = "009";
client.clean(newCommitTime);
validateMetadata(client);
// Restore
client.restoreToInstant("006");
validateMetadata(client);
}
}
/**
* Test that rollbacks of various table operations sync to the Metadata Table correctly.
*/
//@ParameterizedTest
//@EnumSource(HoodieTableType.class)
//public void testRollbackOperations(HoodieTableType tableType) throws Exception {
@Test
public void testRollbackOperations() throws Exception {
//FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
// Write 1 (Bulk insert)
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
// Rollback of inserts
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 20);
writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
client.rollback(newCommitTime);
client.syncTableMetadata();
validateMetadata(client);
// Rollback of updates
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUniqueUpdates(newCommitTime, 20);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
client.rollback(newCommitTime);
client.syncTableMetadata();
validateMetadata(client);
// Rollback of updates and inserts
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
client.rollback(newCommitTime);
client.syncTableMetadata();
validateMetadata(client);
// Rollback of Compaction
if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
client.compact(newCommitTime);
validateMetadata(client);
}
// Rollback of Deletes
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
records = dataGen.generateDeletes(newCommitTime, 10);
JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
client.startCommitWithTime(newCommitTime);
writeStatuses = client.delete(deleteKeys, newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
client.rollback(newCommitTime);
client.syncTableMetadata();
validateMetadata(client);
// Rollback of Clean
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.clean(newCommitTime);
validateMetadata(client);
client.rollback(newCommitTime);
client.syncTableMetadata();
validateMetadata(client);
}
// Rollback of partial commits
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(false).build())) {
// Write updates and inserts
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
client.rollback(newCommitTime);
client.syncTableMetadata();
validateMetadata(client);
}
// Marker based rollback of partial commits
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(true).build())) {
// Write updates and inserts
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
client.rollback(newCommitTime);
client.syncTableMetadata();
validateMetadata(client);
}
}
/**
* Test sync of table operations.
*/
//@ParameterizedTest
//@EnumSource(HoodieTableType.class)
//public void testSync(HoodieTableType tableType) throws Exception {
@Test
public void testSync() throws Exception {
//FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
String newCommitTime;
List<HoodieRecord> records;
List<WriteStatus> writeStatuses;
// Initial commits without metadata table enabled
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
records = dataGen.generateInserts(newCommitTime, 5);
client.startCommitWithTime(newCommitTime);
writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
records = dataGen.generateInserts(newCommitTime, 5);
client.startCommitWithTime(newCommitTime);
writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
}
// Enable metadata table so it is initialized by listing from the file system
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
// inserts
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 5);
writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
assertTrue(metadata(client).isInSync());
}
// Various table operations without metadata table enabled
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
// updates
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUniqueUpdates(newCommitTime, 5);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
assertFalse(metadata(client).isInSync());
// updates and inserts
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
assertFalse(metadata(client).isInSync());
// Compaction
if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
client.compact(newCommitTime);
assertFalse(metadata(client).isInSync());
}
// Savepoint
String savepointInstant = newCommitTime;
if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
client.savepoint("hoodie", "metadata test");
assertFalse(metadata(client).isInSync());
}
// Deletes
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
records = dataGen.generateDeletes(newCommitTime, 5);
JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
client.startCommitWithTime(newCommitTime);
client.delete(deleteKeys, newCommitTime);
assertFalse(metadata(client).isInSync());
// Clean
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.clean(newCommitTime);
assertFalse(metadata(client).isInSync());
// updates
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUniqueUpdates(newCommitTime, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
assertFalse(metadata(client).isInSync());
client.restoreToInstant(savepointInstant);
assertFalse(metadata(client).isInSync());
}
// Enable metadata table and ensure it is synced
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
validateMetadata(client);
assertTrue(metadata(client).isInSync());
}
}
/**
* Instants on Metadata Table should be archived as per config.
* Metadata Table should be automatically compacted as per config.
*/
@ParameterizedTest
@ValueSource(booleans = {false})
public void testCleaningArchivingAndCompaction(boolean asyncClean) throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
final int maxDeltaCommitsBeforeCompaction = 4;
HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
.archiveCommitsWith(6, 8).retainCommits(1)
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3)
.retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(asyncClean).build())
.build();
List<HoodieRecord> records;
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config)) {
for (int i = 1; i < 10; ++i) {
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
if (i == 1) {
records = dataGen.generateInserts(newCommitTime, 5);
} else {
records = dataGen.generateUpdates(newCommitTime, 2);
}
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
}
}
HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
HoodieActiveTimeline metadataTimeline = metadataMetaClient.getActiveTimeline();
// check that there are 2 compactions.
assertEquals(2, metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants());
// check that cleaning has happened once after each compaction. There will be more instants on the timeline, since it's less aggressively archived
assertEquals(4, metadataTimeline.getCleanerTimeline().filterCompletedInstants().countInstants());
// ensure archiving has happened
List<HoodieInstant> instants = metadataTimeline.getCommitsAndCompactionTimeline()
.getInstants().collect(Collectors.toList());
Collections.reverse(instants);
long numDeltaCommits = instants.stream().filter(instant -> instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)).count();
assertEquals(5, numDeltaCommits);
}
/**
* Test various error scenarios.
*/
@Test
public void testErrorCases() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
// TESTCASE: If commit on the metadata table succeeds but fails on the dataset, then on next init the metadata table
// should be rolled back to last valid commit.
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 5);
writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
// There is no way to simulate a failed commit on the main dataset, hence we simply delete the completed
// instant so that only the inflight is left over.
String commitInstantFileName = HoodieTimeline.makeCommitFileName(newCommitTime);
assertTrue(fs.delete(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME,
commitInstantFileName), false));
}
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
// Start the next commit, which will roll back the previous one and also update the metadata table
// with HoodieRollbackMetadata.
String newCommitTime = client.startCommit();
// Dangling commit but metadata should be valid at this time
validateMetadata(client);
// Next insert
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 5);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
// Post rollback commit and metadata should be valid
validateMetadata(client);
}
}
/**
* Test non-partitioned datasets.
*/
@Test
public void testNonPartitioned() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
HoodieTestDataGenerator nonPartitionedGenerator = new HoodieTestDataGenerator(new String[] {""});
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
// Write 1 (Bulk insert)
String newCommitTime = "001";
List<HoodieRecord> records = nonPartitionedGenerator.generateInserts(newCommitTime, 10);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
validateMetadata(client);
List<String> metadataPartitions = metadata(client).getAllPartitionPaths();
assertTrue(metadataPartitions.contains(""), "Must contain empty partition");
}
}
/**
* Test various metrics published by metadata table.
*/
@Test
public void testMetadataMetrics() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfigBuilder(true, true, true).build())) {
// Write
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
Registry metricsRegistry = Registry.getRegistry("HoodieMetadata");
assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".count"));
assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".totalDuration"));
assertEquals(metricsRegistry.getAllCounts().get(HoodieMetadataMetrics.INITIALIZE_STR + ".count"), 1L);
assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.size"));
assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.size"));
assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.count"));
assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.count"));
}
}
/**
* Validate the metadata table's contents to ensure they match what is on the file system.
*
* @throws IOException
*/
private void validateMetadata(SparkRDDWriteClient client) throws IOException {
HoodieWriteConfig config = client.getConfig();
HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
assertNotNull(metadataWriter, "MetadataWriter should have been initialized");
if (!config.useFileListingMetadata()) {
return;
}
HoodieTimer timer = new HoodieTimer().startTimer();
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
// Validate write config for metadata table
HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig();
assertFalse(metadataWriteConfig.useFileListingMetadata(), "File listing metadata should not be enabled for the metadata table itself");
assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "Listing verification should not be enabled for the metadata table");
// Metadata table should be in sync with the dataset
assertTrue(metadata(client).isInSync());
// Partitions should match
List<String> fsPartitions = FSUtils.getAllFoldersWithPartitionMetaFile(fs, basePath);
List<String> metadataPartitions = metadataWriter.metadata().getAllPartitionPaths();
Collections.sort(fsPartitions);
Collections.sort(metadataPartitions);
assertEquals(fsPartitions.size(), metadataPartitions.size(), "Partitions should match");
assertTrue(fsPartitions.equals(metadataPartitions), "Partitions should match");
// Files within each partition should match
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(config, engineContext);
TableFileSystemView tableView = table.getHoodieView();
fsPartitions.forEach(partition -> {
try {
Path partitionPath;
if (partition.equals("")) {
// Should be the non-partitioned case
partitionPath = new Path(basePath);
} else {
partitionPath = new Path(basePath, partition);
}
FileStatus[] fsStatuses = FSUtils.getAllDataFilesInPartition(fs, partitionPath);
FileStatus[] metaStatuses = metadataWriter.metadata().getAllFilesInPartition(partitionPath);
List<String> fsFileNames = Arrays.stream(fsStatuses)
.map(s -> s.getPath().getName()).collect(Collectors.toList());
List<String> metadataFilenames = Arrays.stream(metaStatuses)
.map(s -> s.getPath().getName()).collect(Collectors.toList());
Collections.sort(fsFileNames);
Collections.sort(metadataFilenames);
// File sizes should be valid
Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getLen() > 0));
if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) {
LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray()));
LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray()));
}
assertEquals(fsFileNames.size(), metadataFilenames.size(), "Files within partition " + partition + " should match");
assertTrue(fsFileNames.equals(metadataFilenames), "Files within partition " + partition + " should match");
// FileSystemView should expose the same data
List<HoodieFileGroup> fileGroups = tableView.getAllFileGroups(partition).collect(Collectors.toList());
fileGroups.forEach(g -> LogManager.getLogger(TestHoodieFsMetadata.class).info(g));
fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(TestHoodieFsMetadata.class).info(b)));
fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> LogManager.getLogger(TestHoodieFsMetadata.class).info(s)));
long numFiles = fileGroups.stream()
.mapToLong(g -> g.getAllBaseFiles().count() + g.getAllFileSlices().mapToLong(s -> s.getLogFiles().count()).sum())
.sum();
assertEquals(metadataFilenames.size(), numFiles);
} catch (IOException e) {
e.printStackTrace();
assertTrue(false, "Exception should not be raised: " + e);
}
});
HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
// Metadata table should be in sync with the dataset
assertTrue(metadataWriter.metadata().isInSync());
// Metadata table is MOR
assertEquals(metadataMetaClient.getTableType(), HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR");
// Metadata table is HFile format
assertEquals(metadataMetaClient.getTableConfig().getBaseFileFormat(), HoodieFileFormat.HFILE,
"Metadata Table base file format should be HFile");
// Metadata table has a fixed number of partitions
// Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that function filters out all directories
// in the .hoodie folder.
List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(fs, HoodieTableMetadata.getMetadataTableBasePath(basePath),
false);
assertEquals(MetadataPartitionType.values().length, metadataTablePartitions.size());
// Metadata table should automatically compact and clean
// versions are +1 as auto-clean / compaction happens at the end of commits
int numFileVersions = metadataWriteConfig.getCleanerFileVersionsRetained() + 1;
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline());
metadataTablePartitions.forEach(partition -> {
List<FileSlice> latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList());
assertTrue(latestSlices.stream().map(FileSlice::getBaseFile).count() <= 1, "Should have a single latest base file");
assertTrue(latestSlices.size() <= 1, "Should have a single latest file slice");
assertTrue(latestSlices.size() <= numFileVersions, "Should limit file slice to "
+ numFileVersions + " but was " + latestSlices.size());
});
LOG.info("Validation time=" + timer.endTimer());
}
private HoodieBackedTableMetadataWriter metadataWriter(SparkRDDWriteClient client) {
return (HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter
.create(hadoopConf, client.getConfig(), new HoodieSparkEngineContext(jsc));
}
private HoodieBackedTableMetadata metadata(SparkRDDWriteClient client) {
HoodieWriteConfig clientConfig = client.getConfig();
return (HoodieBackedTableMetadata) HoodieTableMetadata.create(hadoopConf, clientConfig.getBasePath(), clientConfig.getSpillableMapBasePath(),
clientConfig.useFileListingMetadata(), clientConfig.getFileListingMetadataVerify(), false, clientConfig.shouldAssumeDatePartitioning());
}
// TODO: this can be moved to TestHarness after merge from master
private void assertNoWriteErrors(List<WriteStatus> statuses) {
// Verify there are no errors
for (WriteStatus status : statuses) {
assertFalse(status.hasErrors(), "Errors found in write of " + status.getFileId());
}
}
private HoodieWriteConfig getWriteConfig(boolean autoCommit, boolean useFileListingMetadata) {
return getWriteConfigBuilder(autoCommit, useFileListingMetadata, false).build();
}
private HoodieWriteConfig.Builder getWriteConfigBuilder(boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2)
.withAutoCommit(autoCommit).withAssumeDatePartitioning(false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
.withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).build())
.withEmbeddedTimelineServerEnabled(false).forTable("test-trip-table")
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
.withEnableBackupForRemoteFileSystemView(false).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(useFileListingMetadata).build())
.withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
.withExecutorMetrics(true).usePrefix("unit-test").build());
}
@Override
protected HoodieTableType getTableType() {
return tableType;
}
}