[HUDI-3654] Preparations for hudi metastore. (#5572)
* [HUDI-3654] Preparations for hudi metastore. Co-authored-by: gengxiaoyu <gengxiaoyu@bytedance.com>
This commit is contained in:
@@ -135,7 +135,8 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
|
||||
return HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath())
|
||||
.setLoadActiveTimelineOnLoad(loadActiveTimelineOnLoad).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
|
||||
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
|
||||
.setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
|
||||
.setFileSystemRetryConfig(config.getFileSystemRetryConfig())
|
||||
.setProperties(config.getProps()).build();
|
||||
}
|
||||
|
||||
public Option<EmbeddedTimelineService> getTimelineServer() {
|
||||
|
||||
@@ -459,6 +459,9 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
|
||||
|
||||
private Stream<HoodieInstant> getInstantsToArchive() {
|
||||
Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
|
||||
if (config.isMetastoreEnabled()) {
|
||||
return Stream.empty();
|
||||
}
|
||||
|
||||
// For archiving and cleaning instants, we need to include intermediate state files if they exist
|
||||
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
|
||||
|
||||
@@ -28,6 +28,7 @@ import org.apache.hudi.common.config.ConfigProperty;
|
||||
import org.apache.hudi.common.config.HoodieCommonConfig;
|
||||
import org.apache.hudi.common.config.HoodieConfig;
|
||||
import org.apache.hudi.common.config.HoodieMetadataConfig;
|
||||
import org.apache.hudi.common.config.HoodieMetastoreConfig;
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.common.engine.EngineType;
|
||||
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
|
||||
@@ -495,6 +496,7 @@ public class HoodieWriteConfig extends HoodieConfig {
|
||||
private FileSystemViewStorageConfig viewStorageConfig;
|
||||
private HoodiePayloadConfig hoodiePayloadConfig;
|
||||
private HoodieMetadataConfig metadataConfig;
|
||||
private HoodieMetastoreConfig metastoreConfig;
|
||||
private HoodieCommonConfig commonConfig;
|
||||
private EngineType engineType;
|
||||
|
||||
@@ -886,6 +888,7 @@ public class HoodieWriteConfig extends HoodieConfig {
|
||||
this.viewStorageConfig = clientSpecifiedViewStorageConfig;
|
||||
this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build();
|
||||
this.metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(props).build();
|
||||
this.metastoreConfig = HoodieMetastoreConfig.newBuilder().fromProperties(props).build();
|
||||
this.commonConfig = HoodieCommonConfig.newBuilder().fromProperties(props).build();
|
||||
}
|
||||
|
||||
@@ -2100,6 +2103,13 @@ public class HoodieWriteConfig extends HoodieConfig {
|
||||
return HoodieStorageLayout.LayoutType.valueOf(getString(HoodieLayoutConfig.LAYOUT_TYPE));
|
||||
}
|
||||
|
||||
/**
|
||||
* Metastore configs.
|
||||
*/
|
||||
public boolean isMetastoreEnabled() {
|
||||
return metastoreConfig.enableMetastore();
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
|
||||
protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
|
||||
|
||||
@@ -63,7 +63,8 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
|
||||
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
|
||||
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
|
||||
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
|
||||
.setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
|
||||
.setFileSystemRetryConfig(config.getFileSystemRetryConfig())
|
||||
.setProperties(config.getProps()).build();
|
||||
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, refreshTimeline);
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,93 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.common.config;
|
||||
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Configurations used by the HUDI Metastore.
|
||||
*/
|
||||
@Immutable
|
||||
@ConfigClassProperty(name = "Metastore Configs",
|
||||
groupName = ConfigGroups.Names.WRITE_CLIENT,
|
||||
description = "Configurations used by the Hudi Metastore.")
|
||||
public class HoodieMetastoreConfig extends HoodieConfig {
|
||||
|
||||
public static final String METASTORE_PREFIX = "hoodie.metastore";
|
||||
|
||||
public static final ConfigProperty<Boolean> METASTORE_ENABLE = ConfigProperty
|
||||
.key(METASTORE_PREFIX + ".enable")
|
||||
.defaultValue(false)
|
||||
.withDocumentation("Use metastore server to store hoodie table metadata");
|
||||
|
||||
public static final ConfigProperty<String> METASTORE_URLS = ConfigProperty
|
||||
.key(METASTORE_PREFIX + ".uris")
|
||||
.defaultValue("thrift://localhost:9090")
|
||||
.withDocumentation("Metastore server uris");
|
||||
|
||||
public static final ConfigProperty<Integer> METASTORE_CONNECTION_RETRIES = ConfigProperty
|
||||
.key(METASTORE_PREFIX + ".connect.retries")
|
||||
.defaultValue(3)
|
||||
.withDocumentation("Number of retries while opening a connection to metastore");
|
||||
|
||||
public static final ConfigProperty<Integer> METASTORE_CONNECTION_RETRY_DELAY = ConfigProperty
|
||||
.key(METASTORE_PREFIX + ".connect.retry.delay")
|
||||
.defaultValue(1)
|
||||
.withDocumentation("Number of seconds for the client to wait between consecutive connection attempts");
|
||||
|
||||
public static HoodieMetastoreConfig.Builder newBuilder() {
|
||||
return new HoodieMetastoreConfig.Builder();
|
||||
}
|
||||
|
||||
public boolean enableMetastore() {
|
||||
return getBoolean(METASTORE_ENABLE);
|
||||
}
|
||||
|
||||
public String getMetastoreUris() {
|
||||
return getStringOrDefault(METASTORE_URLS);
|
||||
}
|
||||
|
||||
public int getConnectionRetryLimit() {
|
||||
return getIntOrDefault(METASTORE_CONNECTION_RETRIES);
|
||||
}
|
||||
|
||||
public int getConnectionRetryDelay() {
|
||||
return getIntOrDefault(METASTORE_CONNECTION_RETRY_DELAY);
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
private final HoodieMetastoreConfig config = new HoodieMetastoreConfig();
|
||||
|
||||
public Builder fromProperties(Properties props) {
|
||||
this.config.getProps().putAll(props);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setUris(String uris) {
|
||||
config.setValue(METASTORE_URLS, uris);
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieMetastoreConfig build() {
|
||||
config.setDefaults(HoodieMetastoreConfig.class.getName());
|
||||
return config;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -19,6 +19,7 @@
|
||||
package org.apache.hudi.common.table;
|
||||
|
||||
import org.apache.hudi.common.config.HoodieConfig;
|
||||
import org.apache.hudi.common.config.HoodieMetastoreConfig;
|
||||
import org.apache.hudi.common.config.SerializableConfiguration;
|
||||
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
@@ -38,6 +39,7 @@ import org.apache.hudi.common.table.timeline.TimelineLayout;
|
||||
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
||||
import org.apache.hudi.common.util.CommitUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ReflectionUtils;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
@@ -98,21 +100,22 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
// NOTE: Since those two parameters lay on the hot-path of a lot of computations, we
|
||||
// use tailored extension of the {@code Path} class allowing to avoid repetitive
|
||||
// computations secured by its immutability
|
||||
private SerializablePath basePath;
|
||||
private SerializablePath metaPath;
|
||||
protected SerializablePath basePath;
|
||||
protected SerializablePath metaPath;
|
||||
|
||||
private transient HoodieWrapperFileSystem fs;
|
||||
private boolean loadActiveTimelineOnLoad;
|
||||
private SerializableConfiguration hadoopConf;
|
||||
protected SerializableConfiguration hadoopConf;
|
||||
private HoodieTableType tableType;
|
||||
private TimelineLayoutVersion timelineLayoutVersion;
|
||||
private HoodieTableConfig tableConfig;
|
||||
private HoodieActiveTimeline activeTimeline;
|
||||
protected HoodieTableConfig tableConfig;
|
||||
protected HoodieActiveTimeline activeTimeline;
|
||||
private HoodieArchivedTimeline archivedTimeline;
|
||||
private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
|
||||
private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build();
|
||||
protected HoodieMetastoreConfig metastoreConfig;
|
||||
|
||||
private HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
|
||||
protected HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
|
||||
ConsistencyGuardConfig consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion,
|
||||
String payloadClassName, FileSystemRetryConfig fileSystemRetryConfig) {
|
||||
LOG.info("Loading HoodieTableMetaClient from " + basePath);
|
||||
@@ -367,6 +370,13 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
return archivedTimeline;
|
||||
}
|
||||
|
||||
public HoodieMetastoreConfig getMetastoreConfig() {
|
||||
if (metastoreConfig == null) {
|
||||
metastoreConfig = new HoodieMetastoreConfig();
|
||||
}
|
||||
return metastoreConfig;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns fresh new archived commits as a timeline from startTs (inclusive).
|
||||
*
|
||||
@@ -451,7 +461,8 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
HoodieTableConfig.create(fs, metaPathDir, props);
|
||||
// We should not use fs.getConf as this might be different from the original configuration
|
||||
// used to create the fs in unit tests
|
||||
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
|
||||
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath)
|
||||
.setProperties(props).build();
|
||||
LOG.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath);
|
||||
return metaClient;
|
||||
}
|
||||
@@ -620,6 +631,21 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
initializeBootstrapDirsIfNotExists(getHadoopConf(), basePath.toString(), getFs());
|
||||
}
|
||||
|
||||
private static HoodieTableMetaClient newMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
|
||||
ConsistencyGuardConfig consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion,
|
||||
String payloadClassName, FileSystemRetryConfig fileSystemRetryConfig, Properties props) {
|
||||
HoodieMetastoreConfig metastoreConfig = null == props
|
||||
? new HoodieMetastoreConfig.Builder().build()
|
||||
: new HoodieMetastoreConfig.Builder().fromProperties(props).build();
|
||||
return metastoreConfig.enableMetastore()
|
||||
? (HoodieTableMetaClient) ReflectionUtils.loadClass("org.apache.hudi.common.table.HoodieTableMetastoreClient",
|
||||
new Class<?>[]{Configuration.class, ConsistencyGuardConfig.class, FileSystemRetryConfig.class, String.class, String.class, HoodieMetastoreConfig.class},
|
||||
conf, consistencyGuardConfig, fileSystemRetryConfig,
|
||||
props.getProperty(HoodieTableConfig.DATABASE_NAME.key()), props.getProperty(HoodieTableConfig.NAME.key()), metastoreConfig)
|
||||
: new HoodieTableMetaClient(conf, basePath,
|
||||
loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName, fileSystemRetryConfig);
|
||||
}
|
||||
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
@@ -636,6 +662,7 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
|
||||
private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build();
|
||||
private Option<TimelineLayoutVersion> layoutVersion = Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION);
|
||||
private Properties props;
|
||||
|
||||
public Builder setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
@@ -672,11 +699,16 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setProperties(Properties properties) {
|
||||
this.props = properties;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieTableMetaClient build() {
|
||||
ValidationUtils.checkArgument(conf != null, "Configuration needs to be set to init HoodieTableMetaClient");
|
||||
ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init HoodieTableMetaClient");
|
||||
return new HoodieTableMetaClient(conf, basePath,
|
||||
loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName, fileSystemRetryConfig);
|
||||
return newMetaClient(conf, basePath,
|
||||
loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName, fileSystemRetryConfig, props);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -245,7 +245,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteInstantFile(HoodieInstant instant) {
|
||||
protected void deleteInstantFile(HoodieInstant instant) {
|
||||
LOG.info("Deleting instant " + instant);
|
||||
Path inFlightCommitFilePath = getInstantFileNamePath(instant.getFileName());
|
||||
try {
|
||||
@@ -536,7 +536,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
transitionState(fromInstant, toInstant, data, false);
|
||||
}
|
||||
|
||||
private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data,
|
||||
protected void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data,
|
||||
boolean allowRedundantTransitions) {
|
||||
ValidationUtils.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()));
|
||||
try {
|
||||
@@ -566,7 +566,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
}
|
||||
}
|
||||
|
||||
private void revertCompleteToInflight(HoodieInstant completed, HoodieInstant inflight) {
|
||||
protected void revertCompleteToInflight(HoodieInstant completed, HoodieInstant inflight) {
|
||||
ValidationUtils.checkArgument(completed.getTimestamp().equals(inflight.getTimestamp()));
|
||||
Path inFlightCommitFilePath = getInstantFileNamePath(inflight.getFileName());
|
||||
Path commitFilePath = getInstantFileNamePath(completed.getFileName());
|
||||
@@ -632,7 +632,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
}
|
||||
|
||||
/**
|
||||
* Saves content for inflight/requested REPLACE instant.
|
||||
* Saves content for requested REPLACE instant.
|
||||
*/
|
||||
public void saveToPendingReplaceCommit(HoodieInstant instant, Option<byte[]> content) {
|
||||
ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
|
||||
@@ -719,7 +719,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
createFileInMetaPath(instant.getFileName(), content, false);
|
||||
}
|
||||
|
||||
private void createFileInMetaPath(String filename, Option<byte[]> content, boolean allowOverwrite) {
|
||||
protected void createFileInMetaPath(String filename, Option<byte[]> content, boolean allowOverwrite) {
|
||||
Path fullPath = getInstantFileNamePath(filename);
|
||||
if (allowOverwrite || metaClient.getTimelineLayoutVersion().isNullVersion()) {
|
||||
FileIOUtils.createFileInPath(metaClient.getFs(), fullPath, content);
|
||||
|
||||
@@ -20,12 +20,14 @@ package org.apache.hudi.common.table.view;
|
||||
|
||||
import org.apache.hudi.common.config.HoodieCommonConfig;
|
||||
import org.apache.hudi.common.config.HoodieMetadataConfig;
|
||||
import org.apache.hudi.common.config.HoodieMetastoreConfig;
|
||||
import org.apache.hudi.common.config.SerializableConfiguration;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.common.function.SerializableSupplier;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.util.Functions.Function2;
|
||||
import org.apache.hudi.common.util.ReflectionUtils;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
|
||||
import org.apache.hudi.metadata.HoodieTableMetadata;
|
||||
@@ -59,6 +61,8 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
public class FileSystemViewManager {
|
||||
private static final Logger LOG = LogManager.getLogger(FileSystemViewManager.class);
|
||||
|
||||
private static final String HOODIE_METASTORE_FILE_SYSTEM_VIEW_CLASS = "org.apache.hudi.common.table.view.HoodieMetastoreFileSystemView";
|
||||
|
||||
private final SerializableConfiguration conf;
|
||||
// The View Storage config used to store file-system views
|
||||
private final FileSystemViewStorageConfig viewStorageConfig;
|
||||
@@ -165,6 +169,11 @@ public class FileSystemViewManager {
|
||||
return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(),
|
||||
metadataSupplier.get());
|
||||
}
|
||||
if (metaClient.getMetastoreConfig().enableMetastore()) {
|
||||
return (HoodieTableFileSystemView) ReflectionUtils.loadClass(HOODIE_METASTORE_FILE_SYSTEM_VIEW_CLASS,
|
||||
new Class<?>[] {HoodieTableMetaClient.class, HoodieTimeline.class, HoodieMetastoreConfig.class},
|
||||
metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metaClient.getMetastoreConfig());
|
||||
}
|
||||
return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled());
|
||||
}
|
||||
|
||||
@@ -184,6 +193,11 @@ public class FileSystemViewManager {
|
||||
if (metadataConfig.enabled()) {
|
||||
return new HoodieMetadataFileSystemView(engineContext, metaClient, timeline, metadataConfig);
|
||||
}
|
||||
if (metaClient.getMetastoreConfig().enableMetastore()) {
|
||||
return (HoodieTableFileSystemView) ReflectionUtils.loadClass(HOODIE_METASTORE_FILE_SYSTEM_VIEW_CLASS,
|
||||
new Class<?>[] {HoodieTableMetaClient.class, HoodieTimeline.class, HoodieMetadataConfig.class},
|
||||
metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metaClient.getMetastoreConfig());
|
||||
}
|
||||
return new HoodieTableFileSystemView(metaClient, timeline);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user