diff --git a/docs/configurations.md b/docs/configurations.md
index 4639fb2e5..74d360854 100644
--- a/docs/configurations.md
+++ b/docs/configurations.md
@@ -25,6 +25,8 @@ summary: "Here we list all possible configurations and what they mean"
Should HoodieWriteClient autoCommit after insert and upsert. The client can choose to turn off auto-commit and commit on a "defined success condition"
- [withAssumeDatePartitioning](#withAssumeDatePartitioning) (assumeDatePartitioning = false)
Should HoodieWriteClient assume the data is partitioned by dates, i.e., three levels from base path. This is a stop-gap to support tables created by versions < 0.3.1. Will be removed eventually
+ - [withConsistencyCheckEnabled](#withConsistencyCheckEnabled) (enabled = false)
+ Should HoodieWriteClient perform additional checks to ensure written files are listable on the underlying filesystem/storage. Set this to true to work around S3's eventual consistency model and ensure all data written as part of a commit is faithfully available to queries.
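+ For example, a minimal sketch of enabling the check via the config builder (builder methods as introduced in this change; `basePath` is a placeholder):
+ ```java
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+     .withPath(basePath) // dataset base path (placeholder)
+     .withConsistencyCheckEnabled(true) // sets hoodie.consistency.check.enabled
+     .build();
+ ```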
- [withIndexConfig](#withIndexConfig) (HoodieIndexConfig)
Hoodie uses an index to help find the FileID which contains an incoming record key. This is pluggable to have an external index (HBase) or use the default bloom filter stored in the Parquet files
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
index 0e63ac167..147ba56a0 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -497,24 +497,25 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-
- List<Tuple2<String, HoodieWriteStat>> stats = writeStatuses.mapToPair(
- (PairFunction<WriteStatus, String, HoodieWriteStat>) writeStatus -> new Tuple2<>(
- writeStatus.getPartitionPath(), writeStatus.getStat())).collect();
-
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
- updateMetadataAndRollingStats(actionType, metadata, stats);
+ List<WriteStatus> writeStatusList = writeStatuses.collect();
+ updateMetadataAndRollingStats(actionType, metadata, writeStatusList);
// Finalize write
final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
- final Optional<Integer> result = table.finalizeWrite(jsc, stats);
- if (finalizeCtx != null && result.isPresent()) {
- Optional<Long> durationInMs = Optional.of(metrics.getDurationInMs(finalizeCtx.stop()));
- durationInMs.ifPresent(duration -> {
- logger.info("Finalize write elapsed time (milliseconds): " + duration);
- metrics.updateFinalizeWriteMetrics(duration, result.get());
- });
+ try {
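+ // finalizeWrite may run a storage consistency check; a HoodieIOException here fails the commit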
+ table.finalizeWrite(jsc, writeStatusList);
+ if (finalizeCtx != null) {
+ Optional<Long> durationInMs = Optional.of(metrics.getDurationInMs(finalizeCtx.stop()));
+ durationInMs.ifPresent(duration -> {
+ logger.info("Finalize write elapsed time (milliseconds): " + duration);
+ metrics.updateFinalizeWriteMetrics(duration, writeStatusList.size());
+ });
+ }
+ } catch (HoodieIOException ioe) {
+ throw new HoodieCommitException(
+ "Failed to complete commit " + commitTime + " due to finalize errors.", ioe);
}
// add in extra metadata
@@ -555,11 +556,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable
logger.info("Committed " + commitTime);
} catch (IOException e) {
throw new HoodieCommitException(
- "Failed to commit " + config.getBasePath() + " at time " + commitTime, e);
+ "Failed to complete commit " + config.getBasePath() + " at time " + commitTime, e);
} catch (ParseException e) {
throw new HoodieCommitException(
- "Commit time is not of valid format.Failed to commit " + config.getBasePath()
- + " at time " + commitTime, e);
+ "Failed to complete commit " + config.getBasePath() + " at time " + commitTime
+ + "Instant time is not of valid format", e);
}
return true;
}
@@ -1258,8 +1259,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable
return compactionInstantTimeOpt;
}
- private void updateMetadataAndRollingStats(String actionType, HoodieCommitMetadata metadata, List<Tuple2<String, HoodieWriteStat>> stats) {
+ private void updateMetadataAndRollingStats(String actionType, HoodieCommitMetadata metadata,
+ List<WriteStatus> writeStatusList) {
// TODO : make sure we cannot rollback / archive last commit file
try {
// Create a Hoodie table which encapsulated the commits and files visible
@@ -1272,12 +1273,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable
// 2. Now, first read the existing rolling stats and merge with the result of current metadata.
// Need to do this on every commit (delta or commit) to support COW and MOR.
- for (Tuple2<String, HoodieWriteStat> stat : stats) {
- metadata.addWriteStat(stat._1(), stat._2());
- HoodieRollingStat hoodieRollingStat = new HoodieRollingStat(stat._2().getFileId(),
- stat._2().getNumWrites() - (stat._2().getNumUpdateWrites() - stat._2.getNumDeletes()),
- stat._2().getNumUpdateWrites(), stat._2.getNumDeletes(), stat._2().getTotalWriteBytes());
- rollingStatMetadata.addRollingStat(stat._1, hoodieRollingStat);
+ for (WriteStatus status : writeStatusList) {
+ HoodieWriteStat stat = status.getStat();
+ // TODO: why is stat.getPartitionPath() null at times here?
+ metadata.addWriteStat(status.getPartitionPath(), stat);
+ HoodieRollingStat hoodieRollingStat = new HoodieRollingStat(stat.getFileId(),
+ stat.getNumWrites() - (stat.getNumUpdateWrites() - stat.getNumDeletes()),
+ stat.getNumUpdateWrites(), stat.getNumDeletes(), stat.getTotalWriteBytes());
+ rollingStatMetadata.addRollingStat(status.getPartitionPath(), hoodieRollingStat);
}
// The last rolling stat should be present in the completed timeline
Optional<HoodieInstant> lastInstant = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
index 2349860b1..07ddc9673 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
@@ -68,6 +68,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE = "false";
private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
+ private static final String CONSISTENCY_CHECK_ENABLED = "hoodie.consistency.check.enabled";
+ private static final String DEFAULT_CONSISTENCY_CHECK_ENABLED = "false";
private HoodieWriteConfig(Properties props) {
super(props);
@@ -150,6 +152,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM));
}
+ public boolean isConsistencyCheckEnabled() {
+ return Boolean.parseBoolean(props.getProperty(CONSISTENCY_CHECK_ENABLED));
+ }
+
/**
* compaction properties
**/
@@ -551,6 +557,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
+ public Builder withConsistencyCheckEnabled(boolean enabled) {
+ props.setProperty(CONSISTENCY_CHECK_ENABLED, String.valueOf(enabled));
+ return this;
+ }
+
public HoodieWriteConfig build() {
HoodieWriteConfig config = new HoodieWriteConfig(props);
// Check for mandatory properties
@@ -581,6 +592,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE);
setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM),
FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM);
+ setDefaultOnCondition(props, !props.containsKey(CONSISTENCY_CHECK_ENABLED),
+ CONSISTENCY_CHECK_ENABLED, DEFAULT_CONSISTENCY_CHECK_ENABLED);
// Make sure the props is propagated
setDefaultOnCondition(props, !isIndexConfigSet,
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/ConsistencyCheck.java b/hoodie-client/src/main/java/com/uber/hoodie/io/ConsistencyCheck.java
new file mode 100644
index 000000000..43913089e
--- /dev/null
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/ConsistencyCheck.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.io;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.uber.hoodie.common.SerializableConfiguration;
+import com.uber.hoodie.common.util.FSUtils;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Checks whether all written paths have consistent metadata on storage and are thus listable by
+ * queries. This is important for cloud stores like AWS S3, which are eventually consistent with
+ * their metadata. Without such checks, we may commit written data before it is actually made
+ * available to queries. In cases like incremental pull, this can lead to downstream readers
+ * failing to ever see some data.
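+ *
+ * <p>A hypothetical usage sketch (relative paths as in the tests added by this change): verify a
+ * path with up to 5 attempts, starting at a 10 ms wait:
+ * <pre>
+ *   List&lt;String&gt; failing = new ConsistencyCheck(basePath,
+ *       Arrays.asList("partition/path/f1_0_000.parquet"), jsc, 2).check(5, 10);
+ * </pre>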
+ */
+public class ConsistencyCheck implements Serializable {
+
+ private static final transient Logger log = LogManager.getLogger(ConsistencyCheck.class);
+
+ private String basePath;
+
+ private List<String> relPaths;
+
+ private transient JavaSparkContext jsc;
+
+ private SerializableConfiguration hadoopConf;
+
+ private int parallelism;
+
+ public ConsistencyCheck(String basePath, List<String> relPaths, JavaSparkContext jsc,
+ int parallelism) {
+ this.basePath = basePath;
+ this.relPaths = relPaths;
+ this.jsc = jsc;
+ this.hadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration());
+ this.parallelism = parallelism;
+ }
+
+ @VisibleForTesting
+ void sleepSafe(long waitMs) {
+ try {
+ Thread.sleep(waitMs);
+ } catch (InterruptedException e) {
+ // ignore & continue next attempt
+ }
+ }
+
+ /**
+ * Repeatedly lists the paths on the filesystem, with exponential backoff, marking paths found
+ * as passing the check.
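+ * With {@code maxAttempts = 7} and {@code initialDelayMs = 2000}, for instance, the waits are
+ * 2s, 4s, ..., 128s, i.e. up to roughly 254 seconds in total.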
+ *
+ * @return list of (relative) paths failing the check
+ */
+ public List<String> check(int maxAttempts, long initialDelayMs) {
+ long waitMs = initialDelayMs;
+ int attempt = 0;
+
+ List<String> remainingPaths = new ArrayList<>(relPaths);
+ while (attempt++ < maxAttempts) {
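+ // re-check only the paths that were still missing after the previous attempt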
+ remainingPaths = jsc.parallelize(remainingPaths, parallelism)
+ .groupBy(p -> new Path(basePath, p).getParent()) // list by partition
+ .map(pair -> {
+ FileSystem fs = FSUtils.getFs(basePath, hadoopConf.get());
+ // list the partition path and obtain all file paths present
+ Set<String> fileNames = Arrays.stream(fs.listStatus(pair._1()))
+ .map(s -> s.getPath().getName())
+ .collect(Collectors.toSet());
+
+ // only return paths that can't be found
+ return StreamSupport.stream(pair._2().spliterator(), false)
+ .filter(p -> !fileNames.contains(new Path(basePath, p).getName()))
+ .collect(Collectors.toList());
+ })
+ .flatMap(itr -> itr.iterator()).collect();
+ if (remainingPaths.isEmpty()) {
+ break; // we are done.
+ }
+
+ log.info("Consistency check, waiting for " + waitMs + " ms , after attempt :" + attempt);
+ sleepSafe(waitMs);
+ waitMs = waitMs * 2; // double the wait before the next attempt
+ }
+
+ return remainingPaths;
+ }
+}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
index d8a0f484b..81efc83d9 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
@@ -29,7 +29,6 @@ import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieRollingStatMetadata;
-import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
@@ -382,44 +381,40 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieTable<T>
*/
@Override
@SuppressWarnings("unchecked")
- public Optional<Integer> finalizeWrite(JavaSparkContext jsc, List writeStatuses) {
- if (!config.shouldUseTempFolderForCopyOnWrite()) {
- return Optional.empty();
+ public void finalizeWrite(JavaSparkContext jsc, List<WriteStatus> writeStatuses)
+ throws HoodieIOException {
+
+ super.finalizeWrite(jsc, writeStatuses);
+
+ if (config.shouldUseTempFolderForCopyOnWrite()) {
+ // This is to rename each data file from temporary path to its final location
+ jsc.parallelize(writeStatuses, config.getFinalizeWriteParallelism())
+ .map(status -> status.getStat())
+ .foreach(writeStat -> {
+ final FileSystem fs = getMetaClient().getFs();
+ final Path finalPath = new Path(config.getBasePath(), writeStat.getPath());
+
+ if (writeStat.getTempPath() != null) {
+ final Path tempPath = new Path(config.getBasePath(), writeStat.getTempPath());
+ boolean success;
+ try {
+ logger.info("Renaming temporary file: " + tempPath + " to " + finalPath);
+ success = fs.rename(tempPath, finalPath);
+ } catch (IOException e) {
+ throw new HoodieIOException(
+ "Failed to rename file: " + tempPath + " to " + finalPath);
+ }
+
+ if (!success) {
+ throw new HoodieIOException(
+ "Failed to rename file: " + tempPath + " to " + finalPath);
+ }
+ }
+ });
+
+ // clean temporary data files
+ cleanTemporaryDataFiles(jsc);
}
-
- // This is to rename each data file from temporary path to its final location
- List<Tuple2<String, Boolean>> results = jsc
- .parallelize(writeStatuses, config.getFinalizeWriteParallelism()).map(writeStatus -> {
- Tuple2<String, HoodieWriteStat> writeStatTuple2 = (Tuple2<String, HoodieWriteStat>)
- writeStatus;
- HoodieWriteStat writeStat = writeStatTuple2._2();
- final FileSystem fs = getMetaClient().getFs();
- final Path finalPath = new Path(config.getBasePath(), writeStat.getPath());
-
- if (writeStat.getTempPath() != null) {
- final Path tempPath = new Path(config.getBasePath(), writeStat.getTempPath());
- boolean success;
- try {
- logger.info("Renaming temporary file: " + tempPath + " to " + finalPath);
- success = fs.rename(tempPath, finalPath);
- } catch (IOException e) {
- throw new HoodieIOException(
- "Failed to rename file: " + tempPath + " to " + finalPath);
- }
-
- if (!success) {
- throw new HoodieIOException(
- "Failed to rename file: " + tempPath + " to " + finalPath);
- }
- }
-
- return new Tuple2<>(writeStat.getPath(), true);
- }).collect();
-
- // clean temporary data files
- cleanTemporaryDataFiles(jsc);
-
- return Optional.of(results.size());
}
/**
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
index 77e6ddf4f..9968d72d4 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
@@ -43,6 +43,7 @@ import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieCompactionException;
import com.uber.hoodie.exception.HoodieException;
+import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieRollbackException;
import com.uber.hoodie.exception.HoodieUpsertException;
import com.uber.hoodie.func.MergeOnReadLazyInsertIterable;
@@ -294,9 +295,10 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends HoodieCopyOnWriteTable<T>
}
@Override
- public Optional<Integer> finalizeWrite(JavaSparkContext jsc, List writeStatuses) {
- // do nothing for MOR tables
- return Optional.empty();
+ public void finalizeWrite(JavaSparkContext jsc, List<WriteStatus> writeStatuses)
+ throws HoodieIOException {
+ // delegate to base class for MOR tables
+ super.finalizeWrite(jsc, writeStatuses);
}
@Override
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
index 4ef33f9b1..28de32b90 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
@@ -23,7 +23,6 @@ import com.uber.hoodie.common.HoodieCleanStat;
import com.uber.hoodie.common.HoodieRollbackStat;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
-import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView;
@@ -33,26 +32,31 @@ import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.AvroUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieException;
+import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieSavepointException;
import com.uber.hoodie.index.HoodieIndex;
+import com.uber.hoodie.io.ConsistencyCheck;
import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
-import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import scala.Tuple2;
/**
* Abstract implementation of a HoodieTable
*/
public abstract class HoodieTable<T extends HoodieRecordPayload> implements Serializable {
+ // initial wait between successive attempts to verify that written data's metadata is consistent on storage
+ private static final long INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L;
+ // maximum number of consistency checks; with exponential backoff, waits up to roughly 254 seconds in total
+ private static final int MAX_CONSISTENCY_CHECKS = 7;
+
protected final HoodieWriteConfig config;
protected final HoodieTableMetaClient metaClient;
protected final HoodieIndex index;
@@ -245,11 +249,26 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Serializable
throws IOException;
/**
- * Finalize the written data files
+ * Finalize the written data on storage, performing any final cleanups.
*
+ * @param jsc Spark Context
* @param writeStatuses List of WriteStatus
- * @return number of files finalized
+ * @throws HoodieIOException if some paths can't be finalized on storage
*/
- public abstract Optional<Integer> finalizeWrite(JavaSparkContext jsc,
- List<Tuple2<String, HoodieWriteStat>> writeStatuses);
+ public void finalizeWrite(JavaSparkContext jsc, List<WriteStatus> writeStatuses)
+ throws HoodieIOException {
+ if (config.isConsistencyCheckEnabled()) {
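+ // when a temp folder was used, the temp path (not the final path) is where the file lives at this point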
+ List<String> pathsToCheck = writeStatuses.stream()
+ .map(ws -> ws.getStat().getTempPath() != null
+ ? ws.getStat().getTempPath() : ws.getStat().getPath())
+ .collect(Collectors.toList());
+
+ List<String> failingPaths = new ConsistencyCheck(config.getBasePath(), pathsToCheck, jsc,
+ config.getFinalizeWriteParallelism())
+ .check(MAX_CONSISTENCY_CHECKS, INITIAL_CONSISTENCY_CHECK_INTERVAL_MS);
+ if (!failingPaths.isEmpty()) {
+ throw new HoodieIOException("Could not verify consistency of paths: " + failingPaths);
+ }
+ }
+ }
}
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
index 2d2417cca..03f895b27 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
@@ -192,6 +192,7 @@ public class TestCleaner extends TestHoodieClientBase {
HoodieCompactionConfig.newBuilder().withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
.retainFileVersions(maxVersions).build())
.withParallelism(1, 1).withBulkInsertParallelism(1)
+ .withFinalizeWriteParallelism(1).withConsistencyCheckEnabled(true)
.build();
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
@@ -271,9 +272,6 @@ public class TestCleaner extends TestHoodieClientBase {
for (HoodieFileGroup fileGroup : fileGroups) {
if (selectedFileIdForCompaction.containsKey(fileGroup.getId())) {
// Ensure latest file-slice selected for compaction is retained
- String oldestCommitRetained =
- fileGroup.getAllDataFiles().map(HoodieDataFile::getCommitTime).sorted().findFirst().get();
-
Optional<HoodieDataFile> dataFileForCompactionPresent =
fileGroup.getAllDataFiles().filter(df -> {
return compactionFileIdToLatestFileSlice.get(fileGroup.getId())
@@ -357,7 +355,8 @@ public class TestCleaner extends TestHoodieClientBase {
HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainCommits(maxCommits).build())
- .withParallelism(1, 1).withBulkInsertParallelism(1).build();
+ .withParallelism(1, 1).withBulkInsertParallelism(1)
+ .withFinalizeWriteParallelism(1).withConsistencyCheckEnabled(true).build();
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
index d33697baf..282601d4d 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
@@ -143,6 +143,8 @@ public class TestHoodieClientBase implements Serializable {
HoodieWriteConfig.Builder getConfigBuilder() {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
+ .withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
+ .withConsistencyCheckEnabled(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
.forTable("test-trip-table")
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
index 684e9ec20..427b6b03e 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
@@ -19,6 +19,7 @@ package com.uber.hoodie;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -40,6 +41,8 @@ import com.uber.hoodie.common.util.ParquetUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.exception.HoodieCommitException;
+import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable;
import java.io.FileInputStream;
@@ -665,6 +668,42 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
}
+ /**
+ * Tests behavior of committing only when consistency is verified
+ */
+ @Test
+ public void testConsistencyCheckDuringFinalize() throws Exception {
+ HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
+ HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
+ basePath);
+
+ String commitTime = "000";
+ client.startCommitWithTime(commitTime);
+ JavaRDD<HoodieRecord> writeRecords = jsc
+ .parallelize(dataGen.generateInserts(commitTime, 200), 1);
+ JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, commitTime);
+
+ // move one of the files & commit should fail
+ WriteStatus status = result.take(1).get(0);
+ Path origPath = new Path(basePath + "/" + status.getStat().getPath());
+ Path hidePath = new Path(basePath + "/" + status.getStat().getPath() + "_hide");
+ metaClient.getFs().rename(origPath, hidePath);
+
+ try {
+ client.commit(commitTime, result);
+ fail("Commit should fail due to consistency check");
+ } catch (HoodieCommitException cme) {
+ assertTrue(cme.getCause() instanceof HoodieIOException);
+ }
+
+ // Re-introduce & commit should succeed
+ metaClient.getFs().rename(hidePath, origPath);
+ assertTrue("Commit should succeed", client.commit(commitTime, result));
+ assertTrue("After explicit commit, commit file should be created",
+ HoodieTestUtils.doesCommitExist(basePath, commitTime));
+ }
+
/**
* Build Hoodie Write Config for small data file sizes
*/
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
index bde2bf6ad..f92466963 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.times;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus;
+import com.uber.hoodie.common.HoodieClientTestUtils;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTestUtils;
@@ -50,7 +51,6 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.After;
@@ -101,8 +101,7 @@ public class TestHbaseIndex {
hbaseConfig = utility.getConnection().getConfiguration();
utility.createTable(TableName.valueOf(tableName), Bytes.toBytes("_s"));
// Initialize a local spark env
- SparkConf sparkConf = new SparkConf().setAppName("TestHbaseIndex").setMaster("local[1]");
- jsc = new JavaSparkContext(sparkConf);
+ jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHbaseIndex"));
jsc.hadoopConfiguration().addResource(utility.getConfiguration());
}
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestConsistencyCheck.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestConsistencyCheck.java
new file mode 100644
index 000000000..9a1c4d5b3
--- /dev/null
+++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestConsistencyCheck.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyList;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.uber.hoodie.common.HoodieClientTestUtils;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestConsistencyCheck {
+
+ private String basePath;
+ private JavaSparkContext jsc;
+
+ @Before
+ public void setup() throws IOException {
+ jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("ConsistencyCheckTest"));
+ TemporaryFolder testFolder = new TemporaryFolder();
+ testFolder.create();
+ basePath = testFolder.getRoot().getAbsolutePath();
+ }
+
+ @After
+ public void teardown() {
+ if (jsc != null) {
+ jsc.stop();
+ }
+ File testFolderPath = new File(basePath);
+ if (testFolderPath.exists()) {
+ testFolderPath.delete();
+ }
+ }
+
+ @Test
+ public void testExponentialBackoff() throws Exception {
+ HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+ JavaSparkContext jscSpy = spy(jsc);
+
+ ConsistencyCheck failing = new ConsistencyCheck(basePath,
+ Arrays.asList("partition/path/f1_0_000.parquet", "partition/path/f2_0_000.parquet"),
+ jscSpy, 2);
+ long startMs = System.currentTimeMillis();
+ assertEquals(1, failing.check(5, 10).size());
+ assertTrue((System.currentTimeMillis() - startMs) > (10 + 20 + 40 + 80));
+ verify(jscSpy, times(5)).parallelize(anyList(), anyInt());
+ }
+
+ @Test
+ public void testCheckPassingAndFailing() throws Exception {
+ HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+ HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f2");
+ HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f3");
+
+ ConsistencyCheck passing = new ConsistencyCheck(basePath,
+ Arrays.asList("partition/path/f1_0_000.parquet", "partition/path/f2_0_000.parquet"),
+ jsc, 2);
+ assertEquals(0, passing.check(1, 1000).size());
+
+ ConsistencyCheck failing = new ConsistencyCheck(basePath,
+ Arrays.asList("partition/path/f1_0_000.parquet", "partition/path/f4_0_000.parquet"),
+ jsc, 2);
+ assertEquals(1, failing.check(1, 1000).size());
+ }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
index 12649c527..a2c990621 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
@@ -207,6 +207,20 @@ public class HoodieTableMetaClient implements Serializable {
return archivedTimeline;
}
+
+ /**
+ * Helper method to initialize a dataset with the given basePath, tableType, tableName and archiveLogFolder.
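+ * (e.g. {@code initTableType(conf, basePath, "COPY_ON_WRITE", "trips", "archived")})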
+ */
+ public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath,
+ String tableType, String tableName, String archiveLogFolder) throws IOException {
+ HoodieTableType type = HoodieTableType.valueOf(tableType);
+ Properties properties = new Properties();
+ properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName);
+ properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, type.name());
+ properties.put(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, archiveLogFolder);
+ return HoodieTableMetaClient.initializePathAsHoodieDataset(hadoopConf, basePath, properties);
+ }
+
/**
* Helper method to initialize a given path, as a given storage type and table name
*/
diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala
index 1a79b352a..68ac868e8 100644
--- a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala
+++ b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala
@@ -19,12 +19,12 @@
package com.uber.hoodie
import java.util
+import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
-import java.util.{Optional, Properties}
import com.uber.hoodie.DataSourceReadOptions._
import com.uber.hoodie.DataSourceWriteOptions._
-import com.uber.hoodie.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import com.uber.hoodie.common.table.HoodieTableMetaClient
import com.uber.hoodie.common.util.{FSUtils, TypedProperties}
import com.uber.hoodie.config.HoodieWriteConfig
import com.uber.hoodie.exception.HoodieException
@@ -205,11 +205,8 @@ class DefaultSource extends RelationProvider
// Create the dataset if not present (APPEND mode)
if (!exists) {
- val properties = new Properties();
- properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tblName.get);
- properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, storageType);
- properties.put(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived");
- HoodieTableMetaClient.initializePathAsHoodieDataset(sparkContext.hadoopConfiguration, path.get, properties);
+ HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, storageType,
+ tblName.get, "archived")
}
// Create a HoodieWriteClient & issue the write.
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
index 211a171e0..835e7fa66 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -54,7 +54,6 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
-import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
@@ -159,11 +158,8 @@ public class HoodieDeltaStreamer implements Serializable {
}
}
} else {
- Properties properties = new Properties();
- properties.put(HoodieWriteConfig.TABLE_NAME, cfg.targetTableName);
- HoodieTableMetaClient
- .initializePathAsHoodieDataset(jssc.hadoopConfiguration(), cfg.targetBasePath,
- properties);
+ HoodieTableMetaClient.initTableType(jssc.hadoopConfiguration(), cfg.targetBasePath,
+ cfg.storageType, cfg.targetTableName, "archived");
}
log.info("Checkpoint to resume from : " + resumeCheckpointStr);
@@ -247,6 +243,10 @@ public class HoodieDeltaStreamer implements Serializable {
@Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true)
public String targetTableName;
+ @Parameter(names = {"--storage-type"}, description = "Type of Storage. "
+ + "COPY_ON_WRITE (or) MERGE_ON_READ", required = true)
+ public String storageType;
+
@Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+ "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
+ "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java
index d2ff8ace1..1a17a687c 100644
--- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java
+++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java
@@ -89,6 +89,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips";
+ cfg.storageType = "COPY_ON_WRITE";
cfg.sourceClassName = TestDataSource.class.getName();
cfg.operation = op;
cfg.sourceOrderingField = "timestamp";
diff --git a/pom.xml b/pom.xml
index c86adfa11..62bfd162b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -625,7 +625,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>4.12</version>
+ <version>${junit.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>