1
0

HUDI-138 - Meta Files handling also need to support consistency guard

This commit is contained in:
Balaji Varadarajan
2019-06-20 18:05:01 -07:00
committed by Balaji Varadarajan
parent 621c246fa9
commit 5823c1ebd7
21 changed files with 482 additions and 167 deletions

View File

@@ -19,6 +19,8 @@
package com.uber.hoodie;
import com.uber.hoodie.client.embedded.EmbeddedTimelineService;
import com.uber.hoodie.client.utils.ClientUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import java.io.IOException;
@@ -117,4 +119,8 @@ public abstract class AbstractHoodieClient implements Serializable {
public Optional<EmbeddedTimelineService> getTimelineServer() {
return timelineServer;
}
protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
return ClientUtils.createMetaClient(jsc, config, loadActiveTimelineOnLoad);
}
}

View File

@@ -114,7 +114,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
*/
public List<RenameOpResult> unscheduleCompactionPlan(
String compactionInstant, boolean skipValidation, int parallelism, boolean dryRun) throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTableMetaClient metaClient = createMetaClient(false);
List<Pair<HoodieLogFile, HoodieLogFile>> renameActions =
getRenamingActionsForUnschedulingCompactionPlan(metaClient, compactionInstant, parallelism,
Optional.absent(), skipValidation);
@@ -156,7 +156,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
*/
public List<RenameOpResult> unscheduleCompactionFileId(HoodieFileGroupId fgId,
boolean skipValidation, boolean dryRun) throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTableMetaClient metaClient = createMetaClient(false);
List<Pair<HoodieLogFile, HoodieLogFile>> renameActions =
getRenamingActionsForUnschedulingCompactionForFileId(metaClient, fgId,
Optional.absent(), skipValidation);
@@ -198,7 +198,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
*/
public List<RenameOpResult> repairCompaction(String compactionInstant,
int parallelism, boolean dryRun) throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTableMetaClient metaClient = createMetaClient(false);
List<ValidationOpResult> validationResults =
validateCompactionPlan(metaClient, compactionInstant, parallelism);
List<ValidationOpResult> failed = validationResults.stream()

View File

@@ -76,7 +76,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -153,7 +152,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
createMetaClient(true), config, jsc);
JavaRDD<HoodieRecord<T>> recordsWithLocation = index.tagLocation(hoodieRecords, jsc, table);
return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
@@ -471,9 +470,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, jsc, table);
// Trigger the insert and collect statuses
statuses = statuses.persist(config.getWriteStatusStorageLevel());
commitOnAutoCommit(commitTime, statuses,
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true)
.getCommitActionType());
commitOnAutoCommit(commitTime, statuses, table.getMetaClient().getCommitActionType());
return statuses;
}
@@ -496,7 +493,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses,
Optional<Map<String, String>> extraMetadata) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true);
HoodieTableMetaClient metaClient = createMetaClient(false);
return commit(commitTime, writeStatuses, extraMetadata, metaClient.getCommitActionType());
}
@@ -506,7 +503,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
logger.info("Commiting " + commitTime);
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
createMetaClient(true), config, jsc);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
@@ -536,7 +533,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
// We cannot have unbounded commit files. Archive commits if we have to archive
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(config,
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true));
createMetaClient(true));
archiveLog.archiveIfRequired(jsc);
if (config.isAutoClean()) {
// Call clean to cleanup if there is anything to cleanup after the commit,
@@ -580,7 +577,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public boolean savepoint(String user, String comment) {
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
createMetaClient(true), config, jsc);
if (table.getCompletedCommitsTimeline().empty()) {
throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
}
@@ -610,7 +607,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public boolean savepoint(String commitTime, String user, String comment) {
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
createMetaClient(true), config, jsc);
if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
}
@@ -674,7 +671,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public void deleteSavepoint(String savepointTime) {
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
createMetaClient(true), config, jsc);
if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
}
@@ -705,7 +702,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
private void deleteRequestedCompaction(String compactionTime) {
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
createMetaClient(true), config, jsc);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieInstant compactionRequestedInstant =
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime);
@@ -734,7 +731,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public boolean rollbackToSavepoint(String savepointTime) {
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
createMetaClient(true), config, jsc);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
// Rollback to savepoint is expected to be a manual operation and no concurrent write or compaction is expected
@@ -788,7 +785,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
createMetaClient(true), config, jsc);
// Get all the commits on the timeline after the provided commit time
List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline().getInstants()
.filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), instantTime))
@@ -848,7 +845,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
private List<HoodieRollbackStat> doRollbackAndGetStats(final String commitToRollback) throws
IOException {
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
createMetaClient(true), config, jsc);
HoodieTimeline inflightCommitTimeline = table.getInflightCommitTimeline();
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
// Check if any of the commits is a savepoint - do not allow rollback on those commits
@@ -899,7 +896,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
private void finishRollback(final Timer.Context context, List<HoodieRollbackStat> rollbackStats,
List<String> commitsToRollback, final String startRollbackTime) throws IOException {
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
createMetaClient(true), config, jsc);
Optional<Long> durationInMs = Optional.empty();
Long numFilesDeleted = rollbackStats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum();
if (context != null) {
@@ -925,7 +922,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
private void finishRestore(final Timer.Context context, Map<String, List<HoodieRollbackStat>> commitToStats,
List<String> commitsToRollback, final String startRestoreTime, final String restoreToInstant) throws IOException {
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
createMetaClient(true), config, jsc);
Optional<Long> durationInMs = Optional.empty();
Long numFilesDeleted = 0L;
for (Map.Entry<String, List<HoodieRollbackStat>> commitToStat : commitToStats.entrySet()) {
@@ -1001,7 +998,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
createMetaClient(true), config, jsc);
List<HoodieCleanStat> cleanStats = table.clean(jsc);
if (cleanStats.isEmpty()) {
@@ -1053,7 +1050,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
rollbackInflightCommits();
}
logger.info("Generate a new instant time " + instantTime);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath());
HoodieTableMetaClient metaClient = createMetaClient(true);
// if there are pending compactions, their instantTime must not be greater than that of this instant time
metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending -> {
Preconditions.checkArgument(
@@ -1086,8 +1083,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public boolean scheduleCompactionAtInstant(String instantTime, Optional<Map<String, String>> extraMetadata)
throws IOException {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true);
HoodieTableMetaClient metaClient = createMetaClient(true);
// if there are inflight writes, their instantTime must not be less than that of compaction instant time
metaClient.getCommitsTimeline().filterInflightsExcludingCompaction().firstInstant().ifPresent(earliestInflight -> {
Preconditions.checkArgument(
@@ -1130,8 +1126,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses,
Optional<Map<String, String>> extraMetadata) throws IOException {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true);
HoodieTableMetaClient metaClient = createMetaClient(true);
HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
@@ -1178,7 +1173,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
private void rollbackInflightCommits() {
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
createMetaClient(true), config, jsc);
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterInflightsExcludingCompaction();
List<String> commits = inflightTimeline.getInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
@@ -1190,11 +1185,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
private HoodieTable getTableAndInitCtx(JavaRDD<HoodieRecord<T>> records) {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(
// Clone Configuration here. Otherwise we could see ConcurrentModificationException (race) in multi-threaded
// execution (HoodieDeltaStreamer) when Configuration gets serialized by Spark.
new Configuration(jsc.hadoopConfiguration()), config.getBasePath(), true), config, jsc);
HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) {
writeContext = metrics.getCommitCtx();
} else {
@@ -1214,8 +1205,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean autoCommit) throws IOException {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true);
HoodieTableMetaClient metaClient = createMetaClient(true);
HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
@@ -1223,7 +1213,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
//inflight compaction - Needs to rollback first deleting new parquet files before we run compaction.
rollbackInflightCompaction(inflightInstant, table);
// refresh table
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true);
metaClient = createMetaClient(true);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
}
@@ -1253,8 +1243,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
activeTimeline.transitionCompactionRequestedToInflight(compactionInstant);
compactionTimer = metrics.getCompactionCtx();
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true);
HoodieTableMetaClient metaClient = createMetaClient(true);
HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
JavaRDD<WriteStatus> statuses = table.compact(jsc, compactionInstant.getTimestamp(), compactionPlan);
// Force compaction action
@@ -1383,7 +1372,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
try {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
createMetaClient(true), config, jsc);
// 0. All of the rolling stat management is only done by the DELTA commit for MOR and COMMIT for COW other wise
// there may be race conditions
HoodieRollingStatMetadata rollingStatMetadata = new HoodieRollingStatMetadata(actionType);

View File

@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.client.utils;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaSparkContext;
public class ClientUtils {
/**
* Create Consistency Aware MetaClient
*
* @param jsc JavaSparkContext
* @param config HoodieWriteConfig
* @param loadActiveTimelineOnLoad early loading of timeline
*/
public static HoodieTableMetaClient createMetaClient(JavaSparkContext jsc, HoodieWriteConfig config,
boolean loadActiveTimelineOnLoad) {
return new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), loadActiveTimelineOnLoad,
config.getConsistencyGuardConfig());
}
}

View File

@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieCleaningPolicy;
import com.uber.hoodie.common.table.view.FileSystemViewStorageConfig;
import com.uber.hoodie.common.util.ConsistencyGuardConfig;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.compact.strategy.CompactionStrategy;
@@ -66,10 +67,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
private static final String CONSISTENCY_CHECK_ENABLED_PROP = "hoodie.consistency.check.enabled";
private static final String DEFAULT_CONSISTENCY_CHECK_ENABLED = "false";
private static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
private static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "false";
private static final String FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP = "hoodie.fail.on.timeline.archiving";
private static final String DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED = "true";
// time between successive attempts to ensure written data's metadata is consistent on storage
@@ -85,6 +86,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
private ConsistencyGuardConfig consistencyGuardConfig;
// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
// We keep track of original config and rewritten config
private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig;
@@ -94,6 +97,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
super(props);
Properties newProps = new Properties();
newProps.putAll(props);
this.consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build();
this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
this.viewStorageConfig = clientSpecifiedViewStorageConfig;
}
@@ -162,10 +166,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM));
}
public boolean isConsistencyCheckEnabled() {
return Boolean.parseBoolean(props.getProperty(CONSISTENCY_CHECK_ENABLED_PROP));
}
public boolean isEmbeddedTimelineServerEnabled() {
return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
}
@@ -495,6 +495,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Double.valueOf(props.getProperty(HoodieMemoryConfig.WRITESTATUS_FAILURE_FRACTION_PROP));
}
public ConsistencyGuardConfig getConsistencyGuardConfig() {
return consistencyGuardConfig;
}
public void setConsistencyGuardConfig(ConsistencyGuardConfig consistencyGuardConfig) {
this.consistencyGuardConfig = consistencyGuardConfig;
}
public FileSystemViewStorageConfig getViewStorageConfig() {
return viewStorageConfig;
}
@@ -520,6 +528,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private boolean isMetricsConfigSet = false;
private boolean isMemoryConfigSet = false;
private boolean isViewConfigSet = false;
private boolean isConsistencyGuardSet = false;
public Builder fromFile(File propertiesFile) throws IOException {
FileReader reader = new FileReader(propertiesFile);
@@ -639,36 +648,22 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
public Builder withConsistencyGuardConfig(ConsistencyGuardConfig consistencyGuardConfig) {
props.putAll(consistencyGuardConfig.getProps());
isConsistencyGuardSet = true;
return this;
}
public Builder withFinalizeWriteParallelism(int parallelism) {
props.setProperty(FINALIZE_WRITE_PARALLELISM, String.valueOf(parallelism));
return this;
}
public Builder withConsistencyCheckEnabled(boolean enabled) {
props.setProperty(CONSISTENCY_CHECK_ENABLED_PROP, String.valueOf(enabled));
return this;
}
public Builder withEmbeddedTimelineServerEnabled(boolean enabled) {
props.setProperty(EMBEDDED_TIMELINE_SERVER_ENABLED, String.valueOf(enabled));
return this;
}
public Builder withInitialConsistencyCheckIntervalMs(int initialIntevalMs) {
props.setProperty(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(initialIntevalMs));
return this;
}
public Builder withMaxConsistencyCheckIntervalMs(int maxIntervalMs) {
props.setProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(maxIntervalMs));
return this;
}
public Builder withMaxConsistencyChecks(int maxConsistencyChecks) {
props.setProperty(MAX_CONSISTENCY_CHECKS_PROP, String.valueOf(maxConsistencyChecks));
return this;
}
public HoodieWriteConfig build() {
// Check for mandatory properties
setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM,
@@ -691,8 +686,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS);
setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM),
FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(CONSISTENCY_CHECK_ENABLED_PROP),
CONSISTENCY_CHECK_ENABLED_PROP, DEFAULT_CONSISTENCY_CHECK_ENABLED);
setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED),
EMBEDDED_TIMELINE_SERVER_ENABLED, DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED);
setDefaultOnCondition(props, !props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
@@ -717,6 +710,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
HoodieMemoryConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isViewConfigSet,
FileSystemViewStorageConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isConsistencyGuardSet,
ConsistencyGuardConfig.newBuilder().fromProperties(props).build());
// Build WriteConfig at the end
HoodieWriteConfig config = new HoodieWriteConfig(props);

View File

@@ -43,7 +43,6 @@ public abstract class HoodieReadHandle<T extends HoodieRecordPayload> extends Ho
return hoodieTable.getMetaClient().getFs();
}
public Pair<String, String> getPartitionPathFilePair() {
return partitionPathFilePair;
}

View File

@@ -19,14 +19,11 @@
package com.uber.hoodie.io;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.io.storage.HoodieWrapperFileSystem;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.FailSafeConsistencyGuard;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.HoodieTimer;
import com.uber.hoodie.common.util.NoOpConsistencyGuard;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieException;
@@ -68,13 +65,6 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
config.getWriteStatusFailureFraction());
}
private static FileSystem getFileSystem(HoodieTable hoodieTable, HoodieWriteConfig config) {
return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(), config.isConsistencyCheckEnabled()
? new FailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(),
config.getMaxConsistencyChecks(), config.getInitialConsistencyCheckIntervalMs(),
config.getMaxConsistencyCheckIntervalMs()) : new NoOpConsistencyGuard());
}
/**
* Generate a write token based on the currently running spark task and its place in the spark dag.
*/
@@ -175,9 +165,6 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
@Override
protected FileSystem getFileSystem() {
return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(), config.isConsistencyCheckEnabled()
? new FailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(),
config.getMaxConsistencyChecks(), config.getInitialConsistencyCheckIntervalMs(),
config.getMaxConsistencyCheckIntervalMs()) : new NoOpConsistencyGuard());
return hoodieTable.getMetaClient().getFs();
}
}

View File

@@ -21,6 +21,7 @@ package com.uber.hoodie.table;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
import com.uber.hoodie.client.utils.ClientUtils;
import com.uber.hoodie.common.HoodieCleanStat;
import com.uber.hoodie.common.HoodieRollbackStat;
import com.uber.hoodie.common.SerializableConfiguration;
@@ -83,7 +84,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
this.hadoopConfiguration = new SerializableConfiguration(jsc.hadoopConfiguration());
this.viewManager = FileSystemViewManager.createViewManager(
new SerializableConfiguration(jsc.hadoopConfiguration()), config.getViewStorageConfig());
this.metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true);
this.metaClient = ClientUtils.createMetaClient(jsc, config, true);
this.index = HoodieIndex.createIndex(config, jsc);
}
@@ -291,7 +292,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
*/
public void finalizeWrite(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats)
throws HoodieIOException {
cleanFailedWrites(jsc, instantTs, stats, config.isConsistencyCheckEnabled());
cleanFailedWrites(jsc, instantTs, stats, config.getConsistencyGuardConfig().isConsistencyCheckEnabled());
}
/**
@@ -412,7 +413,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
private boolean waitForCondition(String partitionPath, Stream<Pair<String, String>> partitionFilePaths,
FileVisibility visibility) {
final FileSystem fileSystem = metaClient.getFs();
final FileSystem fileSystem = metaClient.getRawFs();
List<String> fileList = partitionFilePaths.map(Pair::getValue).collect(Collectors.toList());
try {
getFailSafeConsistencyGuard(fileSystem).waitTill(partitionPath, fileList, visibility);
@@ -424,8 +425,6 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
}
private ConsistencyGuard getFailSafeConsistencyGuard(FileSystem fileSystem) {
return new FailSafeConsistencyGuard(fileSystem, config.getMaxConsistencyChecks(),
config.getInitialConsistencyCheckIntervalMs(),
config.getMaxConsistencyCheckIntervalMs());
return new FailSafeConsistencyGuard(fileSystem, config.getConsistencyGuardConfig());
}
}