diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/HoodieCLI.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/HoodieCLI.java index 67a999a58..ba9c023c8 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/HoodieCLI.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/HoodieCLI.java @@ -19,6 +19,7 @@ package com.uber.hoodie.cli; import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.util.ConsistencyGuardConfig; import com.uber.hoodie.common.util.FSUtils; import java.io.IOException; import org.apache.hadoop.conf.Configuration; @@ -27,8 +28,10 @@ import org.apache.hadoop.fs.FileSystem; public class HoodieCLI { public static Configuration conf; + public static ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build(); public static FileSystem fs; public static CLIState state = CLIState.INIT; + public static String basePath; public static HoodieTableMetaClient tableMetadata; public static HoodieTableMetaClient syncTableMetadata; @@ -37,6 +40,18 @@ public class HoodieCLI { INIT, DATASET, SYNC } + public static void setConsistencyGuardConfig(ConsistencyGuardConfig config) { + consistencyGuardConfig = config; + } + + private static void setTableMetaClient(HoodieTableMetaClient tableMetadata) { + HoodieCLI.tableMetadata = tableMetadata; + } + + private static void setBasePath(String basePath) { + HoodieCLI.basePath = basePath; + } + public static boolean initConf() { if (HoodieCLI.conf == null) { HoodieCLI.conf = FSUtils.prepareHadoopConf(new Configuration()); @@ -47,11 +62,16 @@ public class HoodieCLI { public static void initFS(boolean force) throws IOException { if (fs == null || force) { - fs = FileSystem.get(conf); + fs = (tableMetadata != null) ? 
tableMetadata.getFs() : FileSystem.get(conf); } } - public static void setTableMetadata(HoodieTableMetaClient tableMetadata) { - HoodieCLI.tableMetadata = tableMetadata; + public static void refreshTableMetadata() { + setTableMetaClient(new HoodieTableMetaClient(HoodieCLI.conf, basePath, false, HoodieCLI.consistencyGuardConfig)); + } + + public static void connectTo(String basePath) { + setBasePath(basePath); + refreshTableMetadata(); } } diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CleansCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CleansCommand.java index 79b271e8c..c0f7129a8 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CleansCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CleansCommand.java @@ -23,7 +23,6 @@ import com.uber.hoodie.avro.model.HoodieCleanPartitionMetadata; import com.uber.hoodie.cli.HoodieCLI; import com.uber.hoodie.cli.HoodiePrintHelper; import com.uber.hoodie.cli.TableHeader; -import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; @@ -91,9 +90,8 @@ public class CleansCommand implements CommandMarker { @CliCommand(value = "cleans refresh", help = "Refresh the commits") public String refreshCleans() throws IOException { - HoodieTableMetaClient metadata = new HoodieTableMetaClient(HoodieCLI.conf, HoodieCLI.tableMetadata.getBasePath()); - HoodieCLI.setTableMetadata(metadata); - return "Metadata for table " + metadata.getTableConfig().getTableName() + " refreshed."; + HoodieCLI.refreshTableMetadata(); + return "Metadata for table " + HoodieCLI.tableMetadata.getTableConfig().getTableName() + " refreshed."; } @CliCommand(value = "clean showpartitions", help = "Show partition level details of a clean") diff --git 
a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java index 641cb8095..2360503bb 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java @@ -115,9 +115,8 @@ public class CommitsCommand implements CommandMarker { @CliCommand(value = "commits refresh", help = "Refresh the commits") public String refreshCommits() throws IOException { - HoodieTableMetaClient metadata = new HoodieTableMetaClient(HoodieCLI.conf, HoodieCLI.tableMetadata.getBasePath()); - HoodieCLI.setTableMetadata(metadata); - return "Metadata for table " + metadata.getTableConfig().getTableName() + " refreshed."; + HoodieCLI.refreshTableMetadata(); + return "Metadata for table " + HoodieCLI.tableMetadata.getTableConfig().getTableName() + " refreshed."; } @CliCommand(value = "commit rollback", help = "Rollback a commit") diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/DatasetsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/DatasetsCommand.java index f001f5cb0..257091ef8 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/DatasetsCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/DatasetsCommand.java @@ -23,6 +23,7 @@ import com.uber.hoodie.cli.HoodiePrintHelper; import com.uber.hoodie.cli.TableHeader; import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.util.ConsistencyGuardConfig; import com.uber.hoodie.exception.DatasetNotFoundException; import java.io.IOException; import java.util.ArrayList; @@ -39,11 +40,25 @@ public class DatasetsCommand implements CommandMarker { @CliCommand(value = "connect", help = "Connect to a hoodie dataset") public String connect( - @CliOption(key = {"path"}, mandatory = true, help = "Base Path of the 
dataset") final String path) - throws IOException { - boolean initialized = HoodieCLI.initConf(); - HoodieCLI.initFS(initialized); - HoodieCLI.setTableMetadata(new HoodieTableMetaClient(HoodieCLI.conf, path)); + @CliOption(key = {"path"}, mandatory = true, help = "Base Path of the dataset") final String path, + @CliOption(key = {"eventuallyConsistent"}, mandatory = false, unspecifiedDefaultValue = "false", + help = "Enable eventual consistency") final boolean eventuallyConsistent, + @CliOption(key = {"initialCheckIntervalMs"}, mandatory = false, unspecifiedDefaultValue = "2000", + help = "Initial wait time for eventual consistency") final Integer initialConsistencyIntervalMs, + @CliOption(key = {"maxCheckIntervalMs"}, mandatory = false, unspecifiedDefaultValue = "300000", + help = "Max wait time for eventual consistency") final Integer maxConsistencyIntervalMs, + @CliOption(key = {"maxCheckIntervalMs"}, mandatory = false, unspecifiedDefaultValue = "7", + help = "Max checks for eventual consistency") final Integer maxConsistencyChecks) throws IOException { + HoodieCLI.setConsistencyGuardConfig( + ConsistencyGuardConfig.newBuilder() + .withConsistencyCheckEnabled(eventuallyConsistent) + .withInitialConsistencyCheckIntervalMs(initialConsistencyIntervalMs) + .withMaxConsistencyCheckIntervalMs(maxConsistencyIntervalMs) + .withMaxConsistencyChecks(maxConsistencyChecks) + .build()); + HoodieCLI.initConf(); + HoodieCLI.connectTo(path); + HoodieCLI.initFS(true); HoodieCLI.state = HoodieCLI.CLIState.DATASET; return "Metadata for table " + HoodieCLI.tableMetadata.getTableConfig().getTableName() + " loaded"; } @@ -85,7 +100,7 @@ public class DatasetsCommand implements CommandMarker { HoodieTableMetaClient.initTableType(HoodieCLI.conf, path, tableType, name, payloadClass); // Now connect to ensure loading works - return connect(path); + return connect(path, false, 0, 0, 0); } @CliAvailabilityIndicator({"desc"}) diff --git 
a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SavepointsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SavepointsCommand.java index 82358aac6..87094690c 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SavepointsCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SavepointsCommand.java @@ -23,7 +23,6 @@ import com.uber.hoodie.cli.HoodieCLI; import com.uber.hoodie.cli.HoodiePrintHelper; import com.uber.hoodie.cli.utils.InputStreamConsumer; import com.uber.hoodie.cli.utils.SparkUtil; -import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; @@ -133,9 +132,8 @@ public class SavepointsCommand implements CommandMarker { @CliCommand(value = "savepoints refresh", help = "Refresh the savepoints") public String refreshMetaClient() throws IOException { - HoodieTableMetaClient metadata = new HoodieTableMetaClient(HoodieCLI.conf, HoodieCLI.tableMetadata.getBasePath()); - HoodieCLI.setTableMetadata(metadata); - return "Metadata for table " + metadata.getTableConfig().getTableName() + " refreshed."; + HoodieCLI.refreshTableMetadata(); + return "Metadata for table " + HoodieCLI.tableMetadata.getTableConfig().getTableName() + " refreshed."; } private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java b/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java index 5aa051c48..c03bd218c 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java @@ -19,6 +19,8 @@ package com.uber.hoodie; import com.uber.hoodie.client.embedded.EmbeddedTimelineService; +import 
com.uber.hoodie.client.utils.ClientUtils; +import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; import java.io.IOException; @@ -117,4 +119,8 @@ public abstract class AbstractHoodieClient implements Serializable { public Optional getTimelineServer() { return timelineServer; } + + protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) { + return ClientUtils.createMetaClient(jsc, config, loadActiveTimelineOnLoad); + } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java index d2b007f70..03bec5905 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java @@ -114,7 +114,7 @@ public class CompactionAdminClient extends AbstractHoodieClient { */ public List unscheduleCompactionPlan( String compactionInstant, boolean skipValidation, int parallelism, boolean dryRun) throws Exception { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTableMetaClient metaClient = createMetaClient(false); List> renameActions = getRenamingActionsForUnschedulingCompactionPlan(metaClient, compactionInstant, parallelism, Optional.absent(), skipValidation); @@ -156,7 +156,7 @@ public class CompactionAdminClient extends AbstractHoodieClient { */ public List unscheduleCompactionFileId(HoodieFileGroupId fgId, boolean skipValidation, boolean dryRun) throws Exception { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTableMetaClient metaClient = createMetaClient(false); List> renameActions = getRenamingActionsForUnschedulingCompactionForFileId(metaClient, fgId, Optional.absent(), skipValidation); @@ -198,7 +198,7 @@ public class CompactionAdminClient 
extends AbstractHoodieClient { */ public List repairCompaction(String compactionInstant, int parallelism, boolean dryRun) throws Exception { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTableMetaClient metaClient = createMetaClient(false); List validationResults = validateCompactionPlan(metaClient, compactionInstant, parallelism); List failed = validationResults.stream() diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index a9b548b71..3c8b36cb6 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -76,7 +76,6 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; -import org.apache.hadoop.conf.Configuration; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.Partitioner; @@ -153,7 +152,7 @@ public class HoodieWriteClient extends AbstractHo public JavaRDD> filterExists(JavaRDD> hoodieRecords) { // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + createMetaClient(true), config, jsc); JavaRDD> recordsWithLocation = index.tagLocation(hoodieRecords, jsc, table); return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown()); @@ -471,9 +470,7 @@ public class HoodieWriteClient extends AbstractHo JavaRDD statuses = index.updateLocation(writeStatusRDD, jsc, table); // Trigger the insert and collect statuses statuses = statuses.persist(config.getWriteStatusStorageLevel()); - commitOnAutoCommit(commitTime, statuses, - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true) - .getCommitActionType()); + 
commitOnAutoCommit(commitTime, statuses, table.getMetaClient().getCommitActionType()); return statuses; } @@ -496,7 +493,7 @@ public class HoodieWriteClient extends AbstractHo */ public boolean commit(String commitTime, JavaRDD writeStatuses, Optional> extraMetadata) { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true); + HoodieTableMetaClient metaClient = createMetaClient(false); return commit(commitTime, writeStatuses, extraMetadata, metaClient.getCommitActionType()); } @@ -506,7 +503,7 @@ public class HoodieWriteClient extends AbstractHo logger.info("Commiting " + commitTime); // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + createMetaClient(true), config, jsc); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); HoodieCommitMetadata metadata = new HoodieCommitMetadata(); @@ -536,7 +533,7 @@ public class HoodieWriteClient extends AbstractHo // We cannot have unbounded commit files. Archive commits if we have to archive HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(config, - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true)); + createMetaClient(true)); archiveLog.archiveIfRequired(jsc); if (config.isAutoClean()) { // Call clean to cleanup if there is anything to cleanup after the commit, @@ -580,7 +577,7 @@ public class HoodieWriteClient extends AbstractHo */ public boolean savepoint(String user, String comment) { HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + createMetaClient(true), config, jsc); if (table.getCompletedCommitsTimeline().empty()) { throw new HoodieSavepointException("Could not savepoint. 
Commit timeline is empty"); } @@ -610,7 +607,7 @@ public class HoodieWriteClient extends AbstractHo */ public boolean savepoint(String commitTime, String user, String comment) { HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + createMetaClient(true), config, jsc); if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types"); } @@ -674,7 +671,7 @@ public class HoodieWriteClient extends AbstractHo */ public void deleteSavepoint(String savepointTime) { HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + createMetaClient(true), config, jsc); if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types"); } @@ -705,7 +702,7 @@ public class HoodieWriteClient extends AbstractHo */ private void deleteRequestedCompaction(String compactionTime) { HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + createMetaClient(true), config, jsc); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); HoodieInstant compactionRequestedInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime); @@ -734,7 +731,7 @@ public class HoodieWriteClient extends AbstractHo */ public boolean rollbackToSavepoint(String savepointTime) { HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + createMetaClient(true), config, jsc); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); // Rollback to savepoint is expected to be a manual operation 
and no concurrent write or compaction is expected @@ -788,7 +785,7 @@ public class HoodieWriteClient extends AbstractHo // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + createMetaClient(true), config, jsc); // Get all the commits on the timeline after the provided commit time List instantsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline().getInstants() .filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), instantTime)) @@ -848,7 +845,7 @@ public class HoodieWriteClient extends AbstractHo private List doRollbackAndGetStats(final String commitToRollback) throws IOException { HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + createMetaClient(true), config, jsc); HoodieTimeline inflightCommitTimeline = table.getInflightCommitTimeline(); HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline(); // Check if any of the commits is a savepoint - do not allow rollback on those commits @@ -899,7 +896,7 @@ public class HoodieWriteClient extends AbstractHo private void finishRollback(final Timer.Context context, List rollbackStats, List commitsToRollback, final String startRollbackTime) throws IOException { HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + createMetaClient(true), config, jsc); Optional durationInMs = Optional.empty(); Long numFilesDeleted = rollbackStats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum(); if (context != null) { @@ -925,7 +922,7 @@ public class HoodieWriteClient extends AbstractHo private void finishRestore(final Timer.Context context, Map> commitToStats, List commitsToRollback, final String startRestoreTime, 
final String restoreToInstant) throws IOException { HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + createMetaClient(true), config, jsc); Optional durationInMs = Optional.empty(); Long numFilesDeleted = 0L; for (Map.Entry> commitToStat : commitToStats.entrySet()) { @@ -1001,7 +998,7 @@ public class HoodieWriteClient extends AbstractHo // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + createMetaClient(true), config, jsc); List cleanStats = table.clean(jsc); if (cleanStats.isEmpty()) { @@ -1053,7 +1050,7 @@ public class HoodieWriteClient extends AbstractHo rollbackInflightCommits(); } logger.info("Generate a new instant time " + instantTime); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath()); + HoodieTableMetaClient metaClient = createMetaClient(true); // if there are pending compactions, their instantTime must not be greater than that of this instant time metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending -> { Preconditions.checkArgument( @@ -1086,8 +1083,7 @@ public class HoodieWriteClient extends AbstractHo */ public boolean scheduleCompactionAtInstant(String instantTime, Optional> extraMetadata) throws IOException { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), - config.getBasePath(), true); + HoodieTableMetaClient metaClient = createMetaClient(true); // if there are inflight writes, their instantTime must not be less than that of compaction instant time metaClient.getCommitsTimeline().filterInflightsExcludingCompaction().firstInstant().ifPresent(earliestInflight -> { Preconditions.checkArgument( @@ -1130,8 +1126,7 @@ public class 
HoodieWriteClient extends AbstractHo */ public void commitCompaction(String compactionInstantTime, JavaRDD writeStatuses, Optional> extraMetadata) throws IOException { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), - config.getBasePath(), true); + HoodieTableMetaClient metaClient = createMetaClient(true); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan( @@ -1178,7 +1173,7 @@ public class HoodieWriteClient extends AbstractHo */ private void rollbackInflightCommits() { HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + createMetaClient(true), config, jsc); HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterInflightsExcludingCompaction(); List commits = inflightTimeline.getInstants().map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); @@ -1190,11 +1185,7 @@ public class HoodieWriteClient extends AbstractHo private HoodieTable getTableAndInitCtx(JavaRDD> records) { // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient( - // Clone Configuration here. Otherwise we could see ConcurrentModificationException (race) in multi-threaded - // execution (HoodieDeltaStreamer) when Configuration gets serialized by Spark. 
- new Configuration(jsc.hadoopConfiguration()), config.getBasePath(), true), config, jsc); + HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) { writeContext = metrics.getCommitCtx(); } else { @@ -1214,8 +1205,7 @@ public class HoodieWriteClient extends AbstractHo */ private JavaRDD compact(String compactionInstantTime, boolean autoCommit) throws IOException { // Create a Hoodie table which encapsulated the commits and files visible - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), - config.getBasePath(), true); + HoodieTableMetaClient metaClient = createMetaClient(true); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline(); HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); @@ -1223,7 +1213,7 @@ public class HoodieWriteClient extends AbstractHo //inflight compaction - Needs to rollback first deleting new parquet files before we run compaction. 
rollbackInflightCompaction(inflightInstant, table); // refresh table - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true); + metaClient = createMetaClient(true); table = HoodieTable.getHoodieTable(metaClient, config, jsc); pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline(); } @@ -1253,8 +1243,7 @@ public class HoodieWriteClient extends AbstractHo activeTimeline.transitionCompactionRequestedToInflight(compactionInstant); compactionTimer = metrics.getCompactionCtx(); // Create a Hoodie table which encapsulated the commits and files visible - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), - config.getBasePath(), true); + HoodieTableMetaClient metaClient = createMetaClient(true); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); JavaRDD statuses = table.compact(jsc, compactionInstant.getTimestamp(), compactionPlan); // Force compaction action @@ -1383,7 +1372,7 @@ public class HoodieWriteClient extends AbstractHo try { // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + createMetaClient(true), config, jsc); // 0. 
All of the rolling stat management is only done by the DELTA commit for MOR and COMMIT for COW other wise // there may be race conditions HoodieRollingStatMetadata rollingStatMetadata = new HoodieRollingStatMetadata(actionType); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/client/utils/ClientUtils.java b/hoodie-client/src/main/java/com/uber/hoodie/client/utils/ClientUtils.java new file mode 100644 index 000000000..e9859f914 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/client/utils/ClientUtils.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.client.utils; + +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.config.HoodieWriteConfig; +import org.apache.spark.api.java.JavaSparkContext; + +public class ClientUtils { + + /** + * Create Consistency Aware MetaClient + * + * @param jsc JavaSparkContext + * @param config HoodieWriteConfig + * @param loadActiveTimelineOnLoad early loading of timeline + */ + public static HoodieTableMetaClient createMetaClient(JavaSparkContext jsc, HoodieWriteConfig config, + boolean loadActiveTimelineOnLoad) { + return new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), loadActiveTimelineOnLoad, + config.getConsistencyGuardConfig()); + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index 2d522ad02..e78c35863 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieCleaningPolicy; import com.uber.hoodie.common.table.view.FileSystemViewStorageConfig; +import com.uber.hoodie.common.util.ConsistencyGuardConfig; import com.uber.hoodie.common.util.ReflectionUtils; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.io.compact.strategy.CompactionStrategy; @@ -66,10 +67,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName(); private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism"; private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM; - private static final String CONSISTENCY_CHECK_ENABLED_PROP = "hoodie.consistency.check.enabled"; - private static 
final String DEFAULT_CONSISTENCY_CHECK_ENABLED = "false"; + private static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server"; private static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "false"; + private static final String FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP = "hoodie.fail.on.timeline.archiving"; private static final String DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED = "true"; // time between successive attempts to ensure written data's metadata is consistent on storage @@ -85,6 +86,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks"; private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7; + private ConsistencyGuardConfig consistencyGuardConfig; + // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled // We keep track of original config and rewritten config private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig; @@ -94,6 +97,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { super(props); Properties newProps = new Properties(); newProps.putAll(props); + this.consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build(); this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build(); this.viewStorageConfig = clientSpecifiedViewStorageConfig; } @@ -162,10 +166,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM)); } - public boolean isConsistencyCheckEnabled() { - return Boolean.parseBoolean(props.getProperty(CONSISTENCY_CHECK_ENABLED_PROP)); - } - public boolean isEmbeddedTimelineServerEnabled() { return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED)); } @@ -495,6 +495,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return 
Double.valueOf(props.getProperty(HoodieMemoryConfig.WRITESTATUS_FAILURE_FRACTION_PROP)); } + public ConsistencyGuardConfig getConsistencyGuardConfig() { + return consistencyGuardConfig; + } + + public void setConsistencyGuardConfig(ConsistencyGuardConfig consistencyGuardConfig) { + this.consistencyGuardConfig = consistencyGuardConfig; + } + public FileSystemViewStorageConfig getViewStorageConfig() { return viewStorageConfig; } @@ -520,6 +528,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private boolean isMetricsConfigSet = false; private boolean isMemoryConfigSet = false; private boolean isViewConfigSet = false; + private boolean isConsistencyGuardSet = false; public Builder fromFile(File propertiesFile) throws IOException { FileReader reader = new FileReader(propertiesFile); @@ -639,36 +648,22 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } + public Builder withConsistencyGuardConfig(ConsistencyGuardConfig consistencyGuardConfig) { + props.putAll(consistencyGuardConfig.getProps()); + isConsistencyGuardSet = true; + return this; + } + public Builder withFinalizeWriteParallelism(int parallelism) { props.setProperty(FINALIZE_WRITE_PARALLELISM, String.valueOf(parallelism)); return this; } - public Builder withConsistencyCheckEnabled(boolean enabled) { - props.setProperty(CONSISTENCY_CHECK_ENABLED_PROP, String.valueOf(enabled)); - return this; - } - public Builder withEmbeddedTimelineServerEnabled(boolean enabled) { props.setProperty(EMBEDDED_TIMELINE_SERVER_ENABLED, String.valueOf(enabled)); return this; } - public Builder withInitialConsistencyCheckIntervalMs(int initialIntevalMs) { - props.setProperty(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(initialIntevalMs)); - return this; - } - - public Builder withMaxConsistencyCheckIntervalMs(int maxIntervalMs) { - props.setProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(maxIntervalMs)); - return this; - } - - public Builder 
withMaxConsistencyChecks(int maxConsistencyChecks) { - props.setProperty(MAX_CONSISTENCY_CHECKS_PROP, String.valueOf(maxConsistencyChecks)); - return this; - } - public HoodieWriteConfig build() { // Check for mandatory properties setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM, @@ -691,8 +686,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS); setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM), FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM); - setDefaultOnCondition(props, !props.containsKey(CONSISTENCY_CHECK_ENABLED_PROP), - CONSISTENCY_CHECK_ENABLED_PROP, DEFAULT_CONSISTENCY_CHECK_ENABLED); setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED), EMBEDDED_TIMELINE_SERVER_ENABLED, DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED); setDefaultOnCondition(props, !props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP), @@ -717,6 +710,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { HoodieMemoryConfig.newBuilder().fromProperties(props).build()); setDefaultOnCondition(props, !isViewConfigSet, FileSystemViewStorageConfig.newBuilder().fromProperties(props).build()); + setDefaultOnCondition(props, !isConsistencyGuardSet, + ConsistencyGuardConfig.newBuilder().fromProperties(props).build()); // Build WriteConfig at the end HoodieWriteConfig config = new HoodieWriteConfig(props); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java index e0d33238a..e4b18383e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java @@ -43,7 +43,6 @@ public abstract class HoodieReadHandle extends Ho return hoodieTable.getMetaClient().getFs(); } - public Pair getPartitionPathFilePair() { return 
partitionPathFilePair; } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java index 1adeecc1b..aa3eec2cc 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java @@ -19,14 +19,11 @@ package com.uber.hoodie.io; import com.uber.hoodie.WriteStatus; -import com.uber.hoodie.common.io.storage.HoodieWrapperFileSystem; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.util.FSUtils; -import com.uber.hoodie.common.util.FailSafeConsistencyGuard; import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.common.util.HoodieTimer; -import com.uber.hoodie.common.util.NoOpConsistencyGuard; import com.uber.hoodie.common.util.ReflectionUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieException; @@ -68,13 +65,6 @@ public abstract class HoodieWriteHandle extends H config.getWriteStatusFailureFraction()); } - private static FileSystem getFileSystem(HoodieTable hoodieTable, HoodieWriteConfig config) { - return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(), config.isConsistencyCheckEnabled() - ? new FailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(), - config.getMaxConsistencyChecks(), config.getInitialConsistencyCheckIntervalMs(), - config.getMaxConsistencyCheckIntervalMs()) : new NoOpConsistencyGuard()); - } - /** * Generate a write token based on the currently running spark task and its place in the spark dag. */ @@ -175,9 +165,6 @@ public abstract class HoodieWriteHandle extends H @Override protected FileSystem getFileSystem() { - return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(), config.isConsistencyCheckEnabled() - ? 
new FailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(), - config.getMaxConsistencyChecks(), config.getInitialConsistencyCheckIntervalMs(), - config.getMaxConsistencyCheckIntervalMs()) : new NoOpConsistencyGuard()); + return hoodieTable.getMetaClient().getFs(); } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index 11f545afc..7fd9a29d4 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -21,6 +21,7 @@ package com.uber.hoodie.table; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.avro.model.HoodieCompactionPlan; import com.uber.hoodie.avro.model.HoodieSavepointMetadata; +import com.uber.hoodie.client.utils.ClientUtils; import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.HoodieRollbackStat; import com.uber.hoodie.common.SerializableConfiguration; @@ -83,7 +84,7 @@ public abstract class HoodieTable implements Seri this.hadoopConfiguration = new SerializableConfiguration(jsc.hadoopConfiguration()); this.viewManager = FileSystemViewManager.createViewManager( new SerializableConfiguration(jsc.hadoopConfiguration()), config.getViewStorageConfig()); - this.metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true); + this.metaClient = ClientUtils.createMetaClient(jsc, config, true); this.index = HoodieIndex.createIndex(config, jsc); } @@ -291,7 +292,7 @@ public abstract class HoodieTable implements Seri */ public void finalizeWrite(JavaSparkContext jsc, String instantTs, List stats) throws HoodieIOException { - cleanFailedWrites(jsc, instantTs, stats, config.isConsistencyCheckEnabled()); + cleanFailedWrites(jsc, instantTs, stats, config.getConsistencyGuardConfig().isConsistencyCheckEnabled()); } /** @@ -412,7 +413,7 @@ public abstract class HoodieTable implements Seri private boolean 
waitForCondition(String partitionPath, Stream> partitionFilePaths, FileVisibility visibility) { - final FileSystem fileSystem = metaClient.getFs(); + final FileSystem fileSystem = metaClient.getRawFs(); List fileList = partitionFilePaths.map(Pair::getValue).collect(Collectors.toList()); try { getFailSafeConsistencyGuard(fileSystem).waitTill(partitionPath, fileList, visibility); @@ -424,8 +425,6 @@ public abstract class HoodieTable implements Seri } private ConsistencyGuard getFailSafeConsistencyGuard(FileSystem fileSystem) { - return new FailSafeConsistencyGuard(fileSystem, config.getMaxConsistencyChecks(), - config.getInitialConsistencyCheckIntervalMs(), - config.getMaxConsistencyCheckIntervalMs()); + return new FailSafeConsistencyGuard(fileSystem, config.getConsistencyGuardConfig()); } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java index 7798805ba..ffb6daf23 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java @@ -48,6 +48,7 @@ import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.timeline.HoodieInstant.State; import com.uber.hoodie.common.util.AvroUtils; import com.uber.hoodie.common.util.CompactionUtils; +import com.uber.hoodie.common.util.ConsistencyGuardConfig; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.collection.Pair; import com.uber.hoodie.config.HoodieCompactionConfig; @@ -195,7 +196,8 @@ public class TestCleaner extends TestHoodieClientBase { HoodieCompactionConfig.newBuilder().withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) .retainFileVersions(maxVersions).build()) .withParallelism(1, 1).withBulkInsertParallelism(1) - .withFinalizeWriteParallelism(1).withConsistencyCheckEnabled(true) + .withFinalizeWriteParallelism(1) + 
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) .build(); HoodieWriteClient client = getHoodieWriteClient(cfg); @@ -357,7 +359,9 @@ public class TestCleaner extends TestHoodieClientBase { HoodieCompactionConfig.newBuilder() .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainCommits(maxCommits).build()) .withParallelism(1, 1).withBulkInsertParallelism(1) - .withFinalizeWriteParallelism(1).withConsistencyCheckEnabled(true).build(); + .withFinalizeWriteParallelism(1) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) + .build(); HoodieWriteClient client = getHoodieWriteClient(cfg); final Function2, String, Integer> recordInsertGenWrappedFunction = diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestConsistencyGuard.java b/hoodie-client/src/test/java/com/uber/hoodie/TestConsistencyGuard.java index 64d97f8b8..7712c1c78 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestConsistencyGuard.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestConsistencyGuard.java @@ -20,6 +20,7 @@ package com.uber.hoodie; import com.uber.hoodie.common.HoodieClientTestUtils; import com.uber.hoodie.common.util.ConsistencyGuard; +import com.uber.hoodie.common.util.ConsistencyGuardConfig; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.FailSafeConsistencyGuard; import java.io.IOException; @@ -58,7 +59,7 @@ public class TestConsistencyGuard { HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f2"); HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f3"); - ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 1, 1000, 1000); + ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig(1, 1000, 1000)); passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet")); passing.waitTillFileAppears(new 
Path(basePath + "/partition/path/f2_1-0-1_000.parquet")); passing.waitTillAllFilesAppear(basePath + "/partition/path", @@ -77,7 +78,7 @@ public class TestConsistencyGuard { @Test(expected = TimeoutException.class) public void testCheckFailingAppear() throws Exception { HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); - ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 3, 10, 10); + ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays.asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet")); @@ -87,14 +88,14 @@ public class TestConsistencyGuard { @Test(expected = TimeoutException.class) public void testCheckFailingAppears() throws Exception { HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); - ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 3, 10, 10); + ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet")); } @Test(expected = TimeoutException.class) public void testCheckFailingDisappear() throws Exception { HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); - ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 3, 10, 10); + ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays.asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet")); @@ -104,7 +105,17 @@ public class TestConsistencyGuard { public void testCheckFailingDisappears() throws Exception { HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); - ConsistencyGuard 
passing = new FailSafeConsistencyGuard(fs, 3, 10, 10); + ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet")); } + + private ConsistencyGuardConfig getConsistencyGuardConfig() { + return getConsistencyGuardConfig(3, 10, 10); + } + + private ConsistencyGuardConfig getConsistencyGuardConfig(int maxChecks, int initalSleep, int maxSleep) { + return ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true) + .withInitialConsistencyCheckIntervalMs(initalSleep).withMaxConsistencyCheckIntervalMs(maxSleep) + .withMaxConsistencyChecks(maxChecks).build(); + } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java index a668d3b09..7f0f0d656 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java @@ -26,7 +26,7 @@ import static org.junit.Assert.assertTrue; import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.HoodieClientTestUtils; import com.uber.hoodie.common.HoodieTestDataGenerator; -import com.uber.hoodie.common.TestRawTripPayload; +import com.uber.hoodie.common.TestRawTripPayload.MetadataMergeWriteStatus; import com.uber.hoodie.common.model.HoodiePartitionMetadata; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieTableType; @@ -37,12 +37,14 @@ import com.uber.hoodie.common.table.SyncableFileSystemView; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.view.FileSystemViewStorageConfig; import com.uber.hoodie.common.table.view.FileSystemViewStorageType; +import com.uber.hoodie.common.util.ConsistencyGuardConfig; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieCompactionConfig; import 
com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieStorageConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.index.HoodieIndex; +import com.uber.hoodie.index.HoodieIndex.IndexType; import com.uber.hoodie.table.HoodieTable; import java.io.File; import java.io.IOException; @@ -191,12 +193,12 @@ public class TestHoodieClientBase implements Serializable { return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2) .withBulkInsertParallelism(2).withFinalizeWriteParallelism(2) - .withWriteStatusClass(TestRawTripPayload.MetadataMergeWriteStatus.class) - .withConsistencyCheckEnabled(true) + .withWriteStatusClass(MetadataMergeWriteStatus.class) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) .forTable("test-trip-table") - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build()) .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig( FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE) .build()); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java index aa53d9f29..ad5fae899 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java @@ -39,6 +39,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient; import 
com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.util.ConsistencyGuardConfig; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.ParquetUtils; import com.uber.hoodie.common.util.collection.Pair; @@ -686,8 +687,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { private Pair> testConsistencyCheck(HoodieTableMetaClient metaClient, String commitTime) throws Exception { - HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withMaxConsistencyCheckIntervalMs(1) - .withInitialConsistencyCheckIntervalMs(1).build(); + HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder() + .withConsistencyCheckEnabled(true) + .withMaxConsistencyCheckIntervalMs(1) + .withInitialConsistencyCheckIntervalMs(1) + .build()) + .build(); HoodieWriteClient client = getHoodieWriteClient(cfg); client.startCommitWithTime(commitTime); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/HoodieWrapperFileSystem.java b/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/HoodieWrapperFileSystem.java index 14e76ac73..72e2d5716 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/HoodieWrapperFileSystem.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/HoodieWrapperFileSystem.java @@ -22,6 +22,7 @@ import com.uber.hoodie.common.storage.StorageSchemes; import com.uber.hoodie.common.util.ConsistencyGuard; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.NoOpConsistencyGuard; +import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieIOException; import java.io.IOException; import java.net.URI; @@ -236,7 +237,28 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override 
public boolean rename(Path src, Path dst) throws IOException { - return fileSystem.rename(convertToDefaultPath(src), convertToDefaultPath(dst)); + try { + consistencyGuard.waitTillFileAppears(convertToDefaultPath(src)); + } catch (TimeoutException e) { + throw new HoodieException("Timed out waiting for " + src + " to appear", e); + } + + boolean success = fileSystem.rename(convertToDefaultPath(src), convertToDefaultPath(dst)); + + if (success) { + try { + consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst)); + } catch (TimeoutException e) { + throw new HoodieException("Timed out waiting for " + dst + " to appear", e); + } + + try { + consistencyGuard.waitTillFileDisappears(convertToDefaultPath(src)); + } catch (TimeoutException e) { + throw new HoodieException("Timed out waiting for " + src + " to disappear", e); + } + } + return success; } @Override @@ -247,7 +269,7 @@ public class HoodieWrapperFileSystem extends FileSystem { try { consistencyGuard.waitTillFileDisappears(f); } catch (TimeoutException e) { - return false; + throw new HoodieException("Timed out waiting for " + f + " to disappear", e); } } return success; @@ -270,7 +292,15 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException { - return fileSystem.mkdirs(convertToDefaultPath(f), permission); + boolean success = fileSystem.mkdirs(convertToDefaultPath(f), permission); + if (success) { + try { + consistencyGuard.waitTillFileAppears(convertToDefaultPath(f)); + } catch (TimeoutException e) { + throw new HoodieException("Timed out waiting for directory " + f + " to appear", e); + } + } + return success; } @Override @@ -353,31 +383,39 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { - return fileSystem - 
.createNonRecursive(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize, - progress); + Path p = convertToDefaultPath(f); + return wrapOutputStream(p, fileSystem.createNonRecursive(p, overwrite, bufferSize, replication, blockSize, + progress)); } @Override public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { - return fileSystem - .createNonRecursive(convertToDefaultPath(f), permission, overwrite, bufferSize, replication, - blockSize, progress); + Path p = convertToDefaultPath(f); + return wrapOutputStream(p, fileSystem.createNonRecursive(p, permission, overwrite, bufferSize, replication, + blockSize, progress)); } @Override public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, EnumSet flags, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { - return fileSystem - .createNonRecursive(convertToDefaultPath(f), permission, flags, bufferSize, replication, - blockSize, progress); + Path p = convertToDefaultPath(f); + return wrapOutputStream(p, fileSystem.createNonRecursive(p, permission, flags, bufferSize, replication, + blockSize, progress)); } @Override public boolean createNewFile(Path f) throws IOException { - return fileSystem.createNewFile(convertToDefaultPath(f)); + boolean newFile = fileSystem.createNewFile(convertToDefaultPath(f)); + if (newFile) { + try { + consistencyGuard.waitTillFileAppears(convertToDefaultPath(f)); + } catch (TimeoutException e) { + throw new HoodieException("Timed out waiting for " + f + " to appear", e); + } + } + return newFile; } @Override @@ -394,6 +432,11 @@ public class HoodieWrapperFileSystem extends FileSystem { public void concat(Path trg, Path[] psrcs) throws IOException { Path[] psrcsNew = convertDefaults(psrcs); fileSystem.concat(convertToDefaultPath(trg), psrcsNew); + try { + 
consistencyGuard.waitTillFileAppears(convertToDefaultPath(trg)); + } catch (TimeoutException e) { + throw new HoodieException("Timed out waiting for " + trg + " to appear", e); + } } @Override @@ -408,7 +451,7 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override public boolean delete(Path f) throws IOException { - return fileSystem.delete(convertToDefaultPath(f)); + return delete(f, true); } @Override @@ -493,62 +536,100 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override public boolean mkdirs(Path f) throws IOException { - return fileSystem.mkdirs(convertToDefaultPath(f)); + boolean success = fileSystem.mkdirs(convertToDefaultPath(f)); + if (success) { + try { + consistencyGuard.waitTillFileAppears(convertToDefaultPath(f)); + } catch (TimeoutException e) { + throw new HoodieException("Timed out waiting for directory " + f + " to appear", e); + } + } + return success; } @Override public void copyFromLocalFile(Path src, Path dst) throws IOException { - fileSystem.copyFromLocalFile(convertToDefaultPath(src), convertToDefaultPath(dst)); + fileSystem.copyFromLocalFile(convertToLocalPath(src), convertToDefaultPath(dst)); + try { + consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst)); + } catch (TimeoutException e) { + throw new HoodieException("Timed out waiting for destination " + dst + " to appear", e); + } } @Override public void moveFromLocalFile(Path[] srcs, Path dst) throws IOException { - fileSystem.moveFromLocalFile(convertDefaults(srcs), convertToDefaultPath(dst)); + fileSystem.moveFromLocalFile(convertLocalPaths(srcs), convertToDefaultPath(dst)); + try { + consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst)); + } catch (TimeoutException e) { + throw new HoodieException("Timed out waiting for destination " + dst + " to appear", e); + } } @Override public void moveFromLocalFile(Path src, Path dst) throws IOException { - fileSystem.moveFromLocalFile(convertToDefaultPath(src), convertToDefaultPath(dst)); + 
fileSystem.moveFromLocalFile(convertToLocalPath(src), convertToDefaultPath(dst)); + try { + consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst)); + } catch (TimeoutException e) { + throw new HoodieException("Timed out waiting for destination " + dst + " to appear", e); + } } @Override public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException { - fileSystem.copyFromLocalFile(delSrc, convertToDefaultPath(src), convertToDefaultPath(dst)); + fileSystem.copyFromLocalFile(delSrc, convertToLocalPath(src), convertToDefaultPath(dst)); + try { + consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst)); + } catch (TimeoutException e) { + throw new HoodieException("Timed out waiting for destination " + dst + " to appear", e); + } } @Override public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path[] srcs, Path dst) throws IOException { fileSystem - .copyFromLocalFile(delSrc, overwrite, convertDefaults(srcs), convertToDefaultPath(dst)); + .copyFromLocalFile(delSrc, overwrite, convertLocalPaths(srcs), convertToDefaultPath(dst)); + try { + consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst)); + } catch (TimeoutException e) { + throw new HoodieException("Timed out waiting for destination " + dst + " to appear", e); + } } @Override public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException { fileSystem - .copyFromLocalFile(delSrc, overwrite, convertToDefaultPath(src), convertToDefaultPath(dst)); + .copyFromLocalFile(delSrc, overwrite, convertToLocalPath(src), convertToDefaultPath(dst)); + try { + consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst)); + } catch (TimeoutException e) { + throw new HoodieException("Timed out waiting for destination " + dst + " to appear", e); + } } @Override public void copyToLocalFile(Path src, Path dst) throws IOException { - fileSystem.copyToLocalFile(convertToDefaultPath(src), convertToDefaultPath(dst)); + 
fileSystem.copyToLocalFile(convertToDefaultPath(src), convertToLocalPath(dst)); } @Override public void moveToLocalFile(Path src, Path dst) throws IOException { - fileSystem.moveToLocalFile(convertToDefaultPath(src), convertToDefaultPath(dst)); + fileSystem.moveToLocalFile(convertToDefaultPath(src), convertToLocalPath(dst)); } @Override public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException { - fileSystem.copyToLocalFile(delSrc, convertToDefaultPath(src), convertToDefaultPath(dst)); + fileSystem.copyToLocalFile(delSrc, convertToDefaultPath(src), convertToLocalPath(dst)); } @Override public void copyToLocalFile(boolean delSrc, Path src, Path dst, boolean useRawLocalFileSystem) throws IOException { - fileSystem.copyToLocalFile(delSrc, convertToDefaultPath(src), convertToDefaultPath(dst), + fileSystem.copyToLocalFile(delSrc, convertToDefaultPath(src), convertToLocalPath(dst), useRawLocalFileSystem); } @@ -787,6 +868,22 @@ public class HoodieWrapperFileSystem extends FileSystem { return convertPathWithScheme(oldPath, fileSystem.getScheme()); } + private Path convertToLocalPath(Path oldPath) { + try { + return convertPathWithScheme(oldPath, FileSystem.getLocal(getConf()).getScheme()); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + } + + private Path[] convertLocalPaths(Path[] psrcs) { + Path[] psrcsNew = new Path[psrcs.length]; + for (int i = 0; i < psrcs.length; i++) { + psrcsNew[i] = convertToLocalPath(psrcs[i]); + } + return psrcsNew; + } + private Path[] convertDefaults(Path[] psrcs) { Path[] psrcsNew = new Path[psrcs.length]; for (int i = 0; i < psrcs.length; i++) { @@ -803,4 +900,8 @@ public class HoodieWrapperFileSystem extends FileSystem { throw new IllegalArgumentException(file.toString() + " does not have a open stream. 
Cannot get the bytes written on the stream"); } + + public FileSystem getFileSystem() { + return fileSystem; + } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java index c8dd7d26b..7ed0214af 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java @@ -20,12 +20,17 @@ package com.uber.hoodie.common.table; import static com.uber.hoodie.common.model.HoodieTableType.MERGE_ON_READ; +import com.google.common.base.Preconditions; import com.uber.hoodie.common.SerializableConfiguration; +import com.uber.hoodie.common.io.storage.HoodieWrapperFileSystem; import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieArchivedTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.util.ConsistencyGuardConfig; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.FailSafeConsistencyGuard; +import com.uber.hoodie.common.util.NoOpConsistencyGuard; import com.uber.hoodie.exception.DatasetNotFoundException; import com.uber.hoodie.exception.HoodieException; import java.io.File; @@ -66,13 +71,14 @@ public class HoodieTableMetaClient implements Serializable { public static final String MARKER_EXTN = ".marker"; private String basePath; - private transient FileSystem fs; + private transient HoodieWrapperFileSystem fs; private String metaPath; private SerializableConfiguration hadoopConf; private HoodieTableType tableType; private HoodieTableConfig tableConfig; private HoodieActiveTimeline activeTimeline; private HoodieArchivedTimeline archivedTimeline; + private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build(); 
public HoodieTableMetaClient(Configuration conf, String basePath) throws DatasetNotFoundException { @@ -81,13 +87,19 @@ public class HoodieTableMetaClient implements Serializable { } public HoodieTableMetaClient(Configuration conf, String basePath, - boolean loadActiveTimelineOnLoad) + boolean loadActiveTimelineOnLoad) { + this(conf, basePath, loadActiveTimelineOnLoad, ConsistencyGuardConfig.newBuilder().build()); + } + + public HoodieTableMetaClient(Configuration conf, String basePath, + boolean loadActiveTimelineOnLoad, ConsistencyGuardConfig consistencyGuardConfig) throws DatasetNotFoundException { log.info("Loading HoodieTableMetaClient from " + basePath); this.basePath = basePath; + this.consistencyGuardConfig = consistencyGuardConfig; this.hadoopConf = new SerializableConfiguration(conf); Path basePathDir = new Path(this.basePath); - this.metaPath = basePath + File.separator + METAFOLDER_NAME; + this.metaPath = new Path(basePath, METAFOLDER_NAME).toString(); Path metaPathDir = new Path(this.metaPath); this.fs = getFs(); DatasetNotFoundException.checkValidDataset(fs, basePathDir, metaPathDir); @@ -190,13 +202,25 @@ public class HoodieTableMetaClient implements Serializable { /** * Get the FS implementation for this table */ - public FileSystem getFs() { + public HoodieWrapperFileSystem getFs() { if (fs == null) { - fs = FSUtils.getFs(metaPath, hadoopConf.get()); + FileSystem fileSystem = FSUtils.getFs(metaPath, hadoopConf.get()); + Preconditions.checkArgument(!(fileSystem instanceof HoodieWrapperFileSystem), + "File System not expected to be that of HoodieWrapperFileSystem"); + fs = new HoodieWrapperFileSystem(fileSystem, consistencyGuardConfig.isConsistencyCheckEnabled() + ? 
new FailSafeConsistencyGuard(fileSystem, consistencyGuardConfig) : new NoOpConsistencyGuard()); } return fs; } + /** + * Return raw file-system + * @return + */ + public FileSystem getRawFs() { + return getFs().getFileSystem(); + } + public Configuration getHadoopConf() { return hadoopConf.get(); } @@ -223,6 +247,10 @@ public class HoodieTableMetaClient implements Serializable { return activeTimeline; } + public ConsistencyGuardConfig getConsistencyGuardConfig() { + return consistencyGuardConfig; + } + /** * Get the archived commits as a timeline. This is costly operation, as all data from the archived * files are read. This should not be used, unless for historical debugging purposes diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ConsistencyGuardConfig.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ConsistencyGuardConfig.java new file mode 100644 index 000000000..14181919d --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ConsistencyGuardConfig.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.common.util; + +import com.uber.hoodie.config.DefaultHoodieConfig; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; + +public class ConsistencyGuardConfig extends DefaultHoodieConfig { + + private static final String CONSISTENCY_CHECK_ENABLED_PROP = "hoodie.consistency.check.enabled"; + private static final String DEFAULT_CONSISTENCY_CHECK_ENABLED = "false"; + + // time between successive attempts to ensure written data's metadata is consistent on storage + private static final String INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP = + "hoodie.consistency.check.initial_interval_ms"; + private static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L; + + // max interval time + private static final String MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = "hoodie.consistency.check.max_interval_ms"; + private static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = 300000L; + + // maximum number of checks, for consistency of written data. 
Will wait up to 256 seconds + private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks"; + private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7; + + public ConsistencyGuardConfig(Properties props) { + super(props); + } + + public static ConsistencyGuardConfig.Builder newBuilder() { + return new Builder(); + } + + public boolean isConsistencyCheckEnabled() { + return Boolean.parseBoolean(props.getProperty(CONSISTENCY_CHECK_ENABLED_PROP)); + } + + public int getMaxConsistencyChecks() { + return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECKS_PROP)); + } + + public int getInitialConsistencyCheckIntervalMs() { + return Integer.parseInt(props.getProperty(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP)); + } + + public int getMaxConsistencyCheckIntervalMs() { + return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP)); + } + + public static class Builder { + + private final Properties props = new Properties(); + + public Builder fromFile(File propertiesFile) throws IOException { + FileReader reader = new FileReader(propertiesFile); + try { + props.load(reader); + return this; + } finally { + reader.close(); + } + } + + public Builder fromProperties(Properties props) { + this.props.putAll(props); + return this; + } + + public Builder withConsistencyCheckEnabled(boolean enabled) { + props.setProperty(CONSISTENCY_CHECK_ENABLED_PROP, String.valueOf(enabled)); + return this; + } + + public Builder withInitialConsistencyCheckIntervalMs(int initialIntervalMs) { + props.setProperty(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(initialIntervalMs)); + return this; + } + + public Builder withMaxConsistencyCheckIntervalMs(int maxIntervalMs) { + props.setProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(maxIntervalMs)); + return this; + } + + public Builder withMaxConsistencyChecks(int maxConsistencyChecks) { + props.setProperty(MAX_CONSISTENCY_CHECKS_PROP, 
String.valueOf(maxConsistencyChecks)); + return this; + } + + public ConsistencyGuardConfig build() { + setDefaultOnCondition(props, !props.containsKey(CONSISTENCY_CHECK_ENABLED_PROP), + CONSISTENCY_CHECK_ENABLED_PROP, DEFAULT_CONSISTENCY_CHECK_ENABLED); + setDefaultOnCondition(props, !props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP), + INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS)); + setDefaultOnCondition(props, !props.containsKey(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP), + MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS)); + setDefaultOnCondition(props, !props.containsKey(MAX_CONSISTENCY_CHECKS_PROP), + MAX_CONSISTENCY_CHECKS_PROP, String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECKS)); + + return new ConsistencyGuardConfig(props); + } + } +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FailSafeConsistencyGuard.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FailSafeConsistencyGuard.java index f69fcefd7..ee215be19 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FailSafeConsistencyGuard.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FailSafeConsistencyGuard.java @@ -18,6 +18,7 @@ package com.uber.hoodie.common.util; +import com.google.common.base.Preconditions; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; @@ -40,15 +41,12 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard { private static final transient Logger log = LogManager.getLogger(FailSafeConsistencyGuard.class); private final FileSystem fs; - private final int maxAttempts; - private final long initialDelayMs; - private final long maxDelayMs; + private final ConsistencyGuardConfig consistencyGuardConfig; - public FailSafeConsistencyGuard(FileSystem fs, int maxAttempts, long initalDelayMs, long maxDelayMs) { + public 
FailSafeConsistencyGuard(FileSystem fs, ConsistencyGuardConfig consistencyGuardConfig) { this.fs = fs; - this.maxAttempts = maxAttempts; - this.initialDelayMs = initalDelayMs; - this.maxDelayMs = maxDelayMs; + this.consistencyGuardConfig = consistencyGuardConfig; + Preconditions.checkArgument(consistencyGuardConfig.isConsistencyCheckEnabled()); } @Override @@ -121,13 +119,13 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard { */ private boolean checkFileVisibility(Path filePath, FileVisibility visibility) throws IOException { try { - FileStatus[] status = fs.listStatus(filePath); + FileStatus status = fs.getFileStatus(filePath); switch (visibility) { case APPEAR: - return status.length != 0; + return status != null; case DISAPPEAR: default: - return status.length == 0; + return status == null; } } catch (FileNotFoundException nfe) { switch (visibility) { @@ -147,9 +145,9 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard { * @throws TimeoutException */ private void waitForFileVisibility(Path filePath, FileVisibility visibility) throws TimeoutException { - long waitMs = initialDelayMs; + long waitMs = consistencyGuardConfig.getInitialConsistencyCheckIntervalMs(); int attempt = 0; - while (attempt < maxAttempts) { + while (attempt < consistencyGuardConfig.getMaxConsistencyChecks()) { try { if (checkFileVisibility(filePath, visibility)) { return; @@ -160,7 +158,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard { sleepSafe(waitMs); waitMs = waitMs * 2; // double check interval every attempt - waitMs = waitMs > maxDelayMs ? 
maxDelayMs : waitMs; + waitMs = Math.min(waitMs, consistencyGuardConfig.getMaxConsistencyCheckIntervalMs()); attempt++; } throw new TimeoutException("Timed-out waiting for the file to " + visibility.name()); @@ -173,17 +171,17 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard { * @throws TimeoutException when retries are exhausted */ private void retryTillSuccess(Function predicate, String timedOutMessage) throws TimeoutException { - long waitMs = initialDelayMs; + long waitMs = consistencyGuardConfig.getInitialConsistencyCheckIntervalMs(); int attempt = 0; - log.warn("Max Attempts=" + maxAttempts); - while (attempt < maxAttempts) { + log.info("Max Attempts=" + consistencyGuardConfig.getMaxConsistencyChecks()); + while (attempt < consistencyGuardConfig.getMaxConsistencyChecks()) { boolean success = predicate.apply(attempt); if (success) { return; } sleepSafe(waitMs); waitMs = waitMs * 2; // double check interval every attempt - waitMs = waitMs > maxDelayMs ? maxDelayMs : waitMs; + waitMs = Math.min(waitMs, consistencyGuardConfig.getMaxConsistencyCheckIntervalMs()); attempt++; } throw new TimeoutException(timedOutMessage);