From 9ca6f91e97d542eebf91756c3c78fa20f41d4247 Mon Sep 17 00:00:00 2001
From: vinothchandar
Date: Thu, 20 Sep 2018 17:50:27 +0530
Subject: [PATCH] Perform consistency checks during write finalize

- Check to ensure written files are listable on storage
- Docs updated to capture how this helps with S3 storage
- Unit tests added, corrections to existing tests
- Fix DeltaStreamer to manage archived commits in a separate folder
---
 docs/configurations.md                        |   2 +
 .../com/uber/hoodie/HoodieWriteClient.java    |  51 ++++----
 .../uber/hoodie/config/HoodieWriteConfig.java |  13 ++
 .../com/uber/hoodie/io/ConsistencyCheck.java  | 112 ++++++++++++++++++
 .../hoodie/table/HoodieCopyOnWriteTable.java  |  71 ++++++-----
 .../hoodie/table/HoodieMergeOnReadTable.java  |   8 +-
 .../com/uber/hoodie/table/HoodieTable.java    |  33 ++++--
 .../java/com/uber/hoodie/TestCleaner.java     |   7 +-
 .../com/uber/hoodie/TestHoodieClientBase.java |   2 +
 .../TestHoodieClientOnCopyOnWriteStorage.java |  39 ++++++
 .../com/uber/hoodie/index/TestHbaseIndex.java |   5 +-
 .../uber/hoodie/io/TestConsistencyCheck.java  |  91 ++++++++++++++
 .../common/table/HoodieTableMetaClient.java   |  14 +++
 .../scala/com/uber/hoodie/DefaultSource.scala |  11 +-
 .../deltastreamer/HoodieDeltaStreamer.java    |  12 +-
 .../utilities/TestHoodieDeltaStreamer.java    |   1 +
 pom.xml                                       |   2 +-
 17 files changed, 381 insertions(+), 93 deletions(-)
 create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/io/ConsistencyCheck.java
 create mode 100644 hoodie-client/src/test/java/com/uber/hoodie/io/TestConsistencyCheck.java

diff --git a/docs/configurations.md b/docs/configurations.md
index 4639fb2e5..74d360854 100644
--- a/docs/configurations.md
+++ b/docs/configurations.md
@@ -25,6 +25,8 @@ summary: "Here we list all possible configurations and what they mean"
 Should HoodieWriteClient autoCommit after insert and upsert. The client can choose to turn off auto-commit and commit on a "defined success condition"
 - [withAssumeDatePartitioning](#withAssumeDatePartitioning) (assumeDatePartitioning = false)<br/>
 Should HoodieWriteClient assume the data is partitioned by dates, i.e., three levels from base path. This is a stop-gap to support tables created by versions < 0.3.1. Will be removed eventually
+ - [withConsistencyCheckEnabled](#withConsistencyCheckEnabled) (enabled = false)<br/>
+ Should HoodieWriteClient perform additional checks to ensure written files are listable on the underlying filesystem/storage. Set this to true to work around S3's eventual consistency model, and to ensure all data written as part of a commit is faithfully available for queries.
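For illustration, a minimal sketch of turning the check on through the write config builder (the builder methods are the ones this patch adds; the base path is hypothetical):

```java
// Sketch: enable the post-write consistency check for an S3-backed dataset
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath("s3://bucket/hoodie/trips")   // hypothetical base path
    .withConsistencyCheckEnabled(true)      // verify written files are listable before commit
    .withFinalizeWriteParallelism(2)        // parallelism used for finalize/listing checks
    .build();
```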
 - [withIndexConfig](#withIndexConfig) (HoodieIndexConfig)<br/>
Hoodie uses an index to help find the FileID which contains an incoming record key. This is pluggable to have an external index (HBase) or use the default bloom filter stored in the Parquet files

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
index 0e63ac167..147ba56a0 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -497,24 +497,25 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
         new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-
-    List<Tuple2<String, HoodieWriteStat>> stats = writeStatuses.mapToPair(
-        (PairFunction<WriteStatus, String, HoodieWriteStat>) writeStatus -> new Tuple2<>(
-            writeStatus.getPartitionPath(), writeStatus.getStat())).collect();
-
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
-    updateMetadataAndRollingStats(actionType, metadata, stats);
+    List<WriteStatus> writeStatusList = writeStatuses.collect();
+    updateMetadataAndRollingStats(actionType, metadata, writeStatusList);
 
     // Finalize write
     final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
-    final Optional<Integer> result = table.finalizeWrite(jsc, stats);
-    if (finalizeCtx != null && result.isPresent()) {
-      Optional<Long> durationInMs = Optional.of(metrics.getDurationInMs(finalizeCtx.stop()));
-      durationInMs.ifPresent(duration -> {
-        logger.info("Finalize write elapsed time (milliseconds): " + duration);
-        metrics.updateFinalizeWriteMetrics(duration, result.get());
-      });
+    try {
+      table.finalizeWrite(jsc, writeStatusList);
+      if (finalizeCtx != null) {
+        Optional<Long> durationInMs = Optional.of(metrics.getDurationInMs(finalizeCtx.stop()));
+        durationInMs.ifPresent(duration -> {
+          logger.info("Finalize write elapsed time (milliseconds): " + duration);
+          metrics.updateFinalizeWriteMetrics(duration, writeStatusList.size());
+        });
+      }
+    } catch (HoodieIOException ioe) {
+      throw new HoodieCommitException(
+          "Failed to complete commit " + commitTime + " due to finalize errors.", ioe);
     }
 
     // add in extra metadata
@@ -555,11 +556,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
       logger.info("Committed " + commitTime);
     } catch (IOException e) {
       throw new HoodieCommitException(
-          "Failed to commit " + config.getBasePath() + " at time " + commitTime, e);
+          "Failed to complete commit " + config.getBasePath() + " at time " + commitTime, e);
     } catch (ParseException e) {
       throw new HoodieCommitException(
-          "Commit time is not of valid format.Failed to commit " + config.getBasePath()
-              + " at time " + commitTime, e);
+          "Failed to complete commit " + config.getBasePath() + " at time " + commitTime
+              + ". Instant time is not of valid format", e);
     }
     return true;
   }
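From a writer's perspective, the new failure mode surfaces at commit time: finalize (and the consistency check, when enabled) runs before the commit file is written, so an unlistable file turns into a HoodieCommitException. A minimal sketch, using only calls that appear in this patch's tests:

```java
// Sketch: write first, then commit explicitly; finalizeWrite() runs inside commit()
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); // cfg built with withAutoCommit(false)
client.startCommitWithTime(commitTime);
JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, commitTime);
boolean success = client.commit(commitTime, result); // throws HoodieCommitException on finalize errors
```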
@@ -1258,8 +1259,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
     return compactionInstantTimeOpt;
   }
 
-  private void updateMetadataAndRollingStats(String actionType, HoodieCommitMetadata metadata, List<Tuple2<String, HoodieWriteStat>> stats) {
+  private void updateMetadataAndRollingStats(String actionType, HoodieCommitMetadata metadata,
+      List<WriteStatus> writeStatusList) {
     // TODO : make sure we cannot rollback / archive last commit file
     try {
       // Create a Hoodie table which encapsulated the commits and files visible
@@ -1272,12 +1273,14 @@
       // 2. Now, first read the existing rolling stats and merge with the result of current metadata.
       // Need to do this on every commit (delta or commit) to support COW and MOR.
-      for (Tuple2<String, HoodieWriteStat> stat : stats) {
-        metadata.addWriteStat(stat._1(), stat._2());
-        HoodieRollingStat hoodieRollingStat = new HoodieRollingStat(stat._2().getFileId(),
-            stat._2().getNumWrites() - (stat._2().getNumUpdateWrites() - stat._2.getNumDeletes()),
-            stat._2().getNumUpdateWrites(), stat._2.getNumDeletes(), stat._2().getTotalWriteBytes());
-        rollingStatMetadata.addRollingStat(stat._1, hoodieRollingStat);
+      for (WriteStatus status : writeStatusList) {
+        HoodieWriteStat stat = status.getStat();
+        // TODO: why is stat.getPartitionPath() null at times here.
+        metadata.addWriteStat(status.getPartitionPath(), stat);
+        HoodieRollingStat hoodieRollingStat = new HoodieRollingStat(stat.getFileId(),
+            stat.getNumWrites() - (stat.getNumUpdateWrites() - stat.getNumDeletes()),
+            stat.getNumUpdateWrites(), stat.getNumDeletes(), stat.getTotalWriteBytes());
+        rollingStatMetadata.addRollingStat(status.getPartitionPath(), hoodieRollingStat);
       }
       // The last rolling stat should be present in the completed timeline
       Optional<HoodieInstant> lastInstant = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
index 2349860b1..07ddc9673 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
@@ -68,6 +68,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE = "false";
   private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
   private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
+  private static final String CONSISTENCY_CHECK_ENABLED = "hoodie.consistency.check.enabled";
+  private static final String DEFAULT_CONSISTENCY_CHECK_ENABLED = "false";
 
   private HoodieWriteConfig(Properties props) {
     super(props);
@@ -150,6 +152,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM));
   }
 
+  public boolean isConsistencyCheckEnabled() {
+    return Boolean.parseBoolean(props.getProperty(CONSISTENCY_CHECK_ENABLED));
+  }
+
   /**
    * compaction properties
    **/
@@ -551,6 +557,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withConsistencyCheckEnabled(boolean enabled) {
+      props.setProperty(CONSISTENCY_CHECK_ENABLED, String.valueOf(enabled));
+      return this;
+    }
+
     public HoodieWriteConfig build() {
       HoodieWriteConfig config = new HoodieWriteConfig(props);
       // Check for mandatory properties
@@ -581,6 +592,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
           DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE);
       setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM),
           FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM);
+      setDefaultOnCondition(props, !props.containsKey(CONSISTENCY_CHECK_ENABLED),
+          CONSISTENCY_CHECK_ENABLED, DEFAULT_CONSISTENCY_CHECK_ENABLED);
 
       // Make sure the props is propagated
       setDefaultOnCondition(props, !isIndexConfigSet,
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/ConsistencyCheck.java b/hoodie-client/src/main/java/com/uber/hoodie/io/ConsistencyCheck.java
new file mode 100644
index 000000000..43913089e
--- /dev/null
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/ConsistencyCheck.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.io;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.uber.hoodie.common.SerializableConfiguration;
+import com.uber.hoodie.common.util.FSUtils;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Checks whether all the written paths have their metadata consistent on storage, and are thus
+ * listable by queries. This is important for cloud stores like AWS S3, whose metadata is only
+ * eventually consistent. Without such checks, we may proceed to commit even though the written
+ * data is not yet available to queries. In cases like incremental pull, this can lead to
+ * downstream readers failing to ever see some data.
+ */
+public class ConsistencyCheck implements Serializable {
+
+  private static final transient Logger log = LogManager.getLogger(ConsistencyCheck.class);
+
+  private String basePath;
+
+  private List<String> relPaths;
+
+  private transient JavaSparkContext jsc;
+
+  private SerializableConfiguration hadoopConf;
+
+  private int parallelism;
+
+  public ConsistencyCheck(String basePath, List<String> relPaths, JavaSparkContext jsc,
+      int parallelism) {
+    this.basePath = basePath;
+    this.relPaths = relPaths;
+    this.jsc = jsc;
+    this.hadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration());
+    this.parallelism = parallelism;
+  }
+
+  @VisibleForTesting
+  void sleepSafe(long waitMs) {
+    try {
+      Thread.sleep(waitMs);
+    } catch (InterruptedException e) {
+      // ignore & continue next attempt
+    }
+  }
+
+  /**
+   * Repeatedly lists the paths on the filesystem with exponential backoff, marking the paths
+   * that are found as having passed the check.
+ * + * @return list of (relative) paths failing the check + */ + public List check(int maxAttempts, long initalDelayMs) { + long waitMs = initalDelayMs; + int attempt = 0; + + List remainingPaths = new ArrayList<>(relPaths); + while (attempt++ < maxAttempts) { + remainingPaths = jsc.parallelize(remainingPaths, parallelism) + .groupBy(p -> new Path(basePath, p).getParent()) // list by partition + .map(pair -> { + FileSystem fs = FSUtils.getFs(basePath, hadoopConf.get()); + // list the partition path and obtain all file paths present + Set fileNames = Arrays.stream(fs.listStatus(pair._1())) + .map(s -> s.getPath().getName()) + .collect(Collectors.toSet()); + + // only return paths that can't be found + return StreamSupport.stream(pair._2().spliterator(), false) + .filter(p -> !fileNames.contains(new Path(basePath, p).getName())) + .collect(Collectors.toList()); + }) + .flatMap(itr -> itr.iterator()).collect(); + if (remainingPaths.size() == 0) { + break; // we are done. + } + + log.info("Consistency check, waiting for " + waitMs + " ms , after attempt :" + attempt); + sleepSafe(waitMs); + waitMs = waitMs * 2; // double check interval every attempt + } + + return remainingPaths; + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index d8a0f484b..81efc83d9 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -29,7 +29,6 @@ import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieRollingStatMetadata; -import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; @@ -382,44 +381,40 @@ public class HoodieCopyOnWriteTable extends Hoodi */ @Override @SuppressWarnings("unchecked") - public Optional finalizeWrite(JavaSparkContext jsc, List writeStatuses) { - if (!config.shouldUseTempFolderForCopyOnWrite()) { - return Optional.empty(); + public void finalizeWrite(JavaSparkContext jsc, List writeStatuses) + throws HoodieIOException { + + super.finalizeWrite(jsc, writeStatuses); + + if (config.shouldUseTempFolderForCopyOnWrite()) { + // This is to rename each data file from temporary path to its final location + jsc.parallelize(writeStatuses, config.getFinalizeWriteParallelism()) + .map(status -> status.getStat()) + .foreach(writeStat -> { + final FileSystem fs = getMetaClient().getFs(); + final Path finalPath = new Path(config.getBasePath(), writeStat.getPath()); + + if (writeStat.getTempPath() != null) { + final Path tempPath = new Path(config.getBasePath(), writeStat.getTempPath()); + boolean success; + try { + logger.info("Renaming temporary file: " + tempPath + " to " + finalPath); + success = fs.rename(tempPath, finalPath); + } catch (IOException e) { + throw new HoodieIOException( + "Failed to rename file: " + tempPath + " to " + finalPath); + } + + if (!success) { + throw new HoodieIOException( + "Failed to rename file: " + tempPath + " to " + finalPath); + } + } + }); + + // clean temporary data files + cleanTemporaryDataFiles(jsc); } - - // This is to rename each data file from temporary path to its final location - List> results = 
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
index d8a0f484b..81efc83d9 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
@@ -29,7 +29,6 @@ import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordLocation;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.common.model.HoodieRollingStatMetadata;
-import com.uber.hoodie.common.model.HoodieWriteStat;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
@@ -382,44 +381,40 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
    */
   @Override
   @SuppressWarnings("unchecked")
-  public Optional<Integer> finalizeWrite(JavaSparkContext jsc, List writeStatuses) {
-    if (!config.shouldUseTempFolderForCopyOnWrite()) {
-      return Optional.empty();
+  public void finalizeWrite(JavaSparkContext jsc, List<WriteStatus> writeStatuses)
+      throws HoodieIOException {
+
+    super.finalizeWrite(jsc, writeStatuses);
+
+    if (config.shouldUseTempFolderForCopyOnWrite()) {
+      // This is to rename each data file from temporary path to its final location
+      jsc.parallelize(writeStatuses, config.getFinalizeWriteParallelism())
+          .map(status -> status.getStat())
+          .foreach(writeStat -> {
+            final FileSystem fs = getMetaClient().getFs();
+            final Path finalPath = new Path(config.getBasePath(), writeStat.getPath());
+
+            if (writeStat.getTempPath() != null) {
+              final Path tempPath = new Path(config.getBasePath(), writeStat.getTempPath());
+              boolean success;
+              try {
+                logger.info("Renaming temporary file: " + tempPath + " to " + finalPath);
+                success = fs.rename(tempPath, finalPath);
+              } catch (IOException e) {
+                throw new HoodieIOException(
+                    "Failed to rename file: " + tempPath + " to " + finalPath);
+              }
+
+              if (!success) {
+                throw new HoodieIOException(
+                    "Failed to rename file: " + tempPath + " to " + finalPath);
+              }
+            }
+          });
+
+      // clean temporary data files
+      cleanTemporaryDataFiles(jsc);
     }
-
-    // This is to rename each data file from temporary path to its final location
-    List<Tuple2<String, Boolean>> results = jsc
-        .parallelize(writeStatuses, config.getFinalizeWriteParallelism()).map(writeStatus -> {
-          Tuple2<String, HoodieWriteStat> writeStatTuple2 = (Tuple2<String, HoodieWriteStat>)
-              writeStatus;
-          HoodieWriteStat writeStat = writeStatTuple2._2();
-          final FileSystem fs = getMetaClient().getFs();
-          final Path finalPath = new Path(config.getBasePath(), writeStat.getPath());
-
-          if (writeStat.getTempPath() != null) {
-            final Path tempPath = new Path(config.getBasePath(), writeStat.getTempPath());
-            boolean success;
-            try {
-              logger.info("Renaming temporary file: " + tempPath + " to " + finalPath);
-              success = fs.rename(tempPath, finalPath);
-            } catch (IOException e) {
-              throw new HoodieIOException(
-                  "Failed to rename file: " + tempPath + " to " + finalPath);
-            }
-
-            if (!success) {
-              throw new HoodieIOException(
-                  "Failed to rename file: " + tempPath + " to " + finalPath);
-            }
-          }
-
-          return new Tuple2<>(writeStat.getPath(), true);
-        }).collect();
-
-    // clean temporary data files
-    cleanTemporaryDataFiles(jsc);
-
-    return Optional.of(results.size());
   }
 
   /**
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
index 77e6ddf4f..9968d72d4 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
@@ -43,6 +43,7 @@ import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.HoodieCompactionException;
 import com.uber.hoodie.exception.HoodieException;
+import com.uber.hoodie.exception.HoodieIOException;
 import com.uber.hoodie.exception.HoodieRollbackException;
 import com.uber.hoodie.exception.HoodieUpsertException;
 import com.uber.hoodie.func.MergeOnReadLazyInsertIterable;
@@ -294,9 +295,10 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  public Optional<Integer> finalizeWrite(JavaSparkContext jsc, List writeStatuses) {
-    // do nothing for MOR tables
-    return Optional.empty();
+  public void finalizeWrite(JavaSparkContext jsc, List<WriteStatus> writeStatuses)
+      throws HoodieIOException {
+    // delegate to base class for MOR tables
+    super.finalizeWrite(jsc, writeStatuses);
   }
 
   @Override
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
index 4ef33f9b1..28de32b90 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
@@ -23,7 +23,6 @@ import com.uber.hoodie.common.HoodieCleanStat;
 import com.uber.hoodie.common.HoodieRollbackStat;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
-import com.uber.hoodie.common.model.HoodieWriteStat;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.TableFileSystemView;
@@ -33,26 +32,31 @@ import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
 import com.uber.hoodie.common.util.AvroUtils;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.HoodieException;
+import com.uber.hoodie.exception.HoodieIOException;
 import com.uber.hoodie.exception.HoodieSavepointException;
 import com.uber.hoodie.index.HoodieIndex;
+import com.uber.hoodie.io.ConsistencyCheck;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import scala.Tuple2;
 
 /**
  * Abstract implementation of a HoodieTable
 */
 public abstract class HoodieTable<T extends HoodieRecordPayload> implements Serializable {
 
+  // time between successive attempts to ensure written data's metadata is consistent on storage
+  private static long INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L;
+  // maximum number of consistency checks; with doubling intervals, waits up to ~256 seconds in total
+  private static int MAX_CONSISTENCY_CHECKS = 7;
+
   protected final HoodieWriteConfig config;
   protected final HoodieTableMetaClient metaClient;
   protected final HoodieIndex<T> index;
@@ -245,11 +249,26 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
       throws IOException;
 
   /**
-   * Finalize the written data files
+   * Finalize the written data onto storage. Perform any final cleanups.
    *
+   * @param jsc Spark Context
    * @param writeStatuses List of WriteStatus
-   * @return number of files finalized
+   * @throws HoodieIOException if some paths can't be finalized on storage
    */
-  public abstract Optional<Integer> finalizeWrite(JavaSparkContext jsc,
-      List<Tuple2<String, HoodieWriteStat>> writeStatuses);
+  public void finalizeWrite(JavaSparkContext jsc, List<WriteStatus> writeStatuses)
+      throws HoodieIOException {
+    if (config.isConsistencyCheckEnabled()) {
+      List<String> pathsToCheck = writeStatuses.stream()
+          .map(ws -> ws.getStat().getTempPath() != null
+              ? ws.getStat().getTempPath() : ws.getStat().getPath())
+          .collect(Collectors.toList());
+
+      List<String> failingPaths = new ConsistencyCheck(config.getBasePath(), pathsToCheck, jsc,
+          config.getFinalizeWriteParallelism())
+          .check(MAX_CONSISTENCY_CHECKS, INITIAL_CONSISTENCY_CHECK_INTERVAL_MS);
+      if (failingPaths.size() > 0) {
+        throw new HoodieIOException("Could not verify consistency of paths : " + failingPaths);
+      }
+    }
+  }
 }
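The two constants above bound the worst case: an unlistable path is checked 7 times, sleeping 2s, then 4s, up to 128s between attempts. A small sketch of that arithmetic, with the constants inlined for clarity:

```java
// Sketch: worst-case wait before finalize gives up on an unlistable path
long waitMs = 2000L; // INITIAL_CONSISTENCY_CHECK_INTERVAL_MS
long totalMs = 0;
for (int attempt = 1; attempt <= 7; attempt++) { // MAX_CONSISTENCY_CHECKS
  totalMs += waitMs; // a sleep follows each failed attempt
  waitMs *= 2;       // exponential backoff
}
// totalMs == 254000 ms, i.e. the ~256 seconds noted in the comment above
```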
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
index 2d2417cca..03f895b27 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
@@ -192,6 +192,7 @@ public class TestCleaner extends TestHoodieClientBase {
         HoodieCompactionConfig.newBuilder().withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
             .retainFileVersions(maxVersions).build())
         .withParallelism(1, 1).withBulkInsertParallelism(1)
+        .withFinalizeWriteParallelism(1).withConsistencyCheckEnabled(true)
         .build();
     HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
 
@@ -271,9 +272,6 @@ public class TestCleaner extends TestHoodieClientBase {
     for (HoodieFileGroup fileGroup : fileGroups) {
       if (selectedFileIdForCompaction.containsKey(fileGroup.getId())) {
         // Ensure latest file-slice selected for compaction is retained
-        String oldestCommitRetained =
-            fileGroup.getAllDataFiles().map(HoodieDataFile::getCommitTime).sorted().findFirst().get();
-
         Optional<HoodieDataFile> dataFileForCompactionPresent = fileGroup.getAllDataFiles().filter(df -> {
           return compactionFileIdToLatestFileSlice.get(fileGroup.getId())
@@ -357,7 +355,8 @@ public class TestCleaner extends TestHoodieClientBase {
     HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig(
         HoodieCompactionConfig.newBuilder()
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainCommits(maxCommits).build())
-        .withParallelism(1, 1).withBulkInsertParallelism(1).build();
+        .withParallelism(1, 1).withBulkInsertParallelism(1)
+        .withFinalizeWriteParallelism(1).withConsistencyCheckEnabled(true).build();
     HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
 
     final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
index d33697baf..282601d4d 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
@@ -143,6 +143,8 @@ public class TestHoodieClientBase implements Serializable {
   HoodieWriteConfig.Builder getConfigBuilder() {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2)
+        .withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
+        .withConsistencyCheckEnabled(true)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
         .forTable("test-trip-table")
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
index 684e9ec20..427b6b03e 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
@@ -19,6 +19,7 @@ package com.uber.hoodie;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -40,6 +41,8 @@ import com.uber.hoodie.common.util.ParquetUtils;
 import com.uber.hoodie.config.HoodieCompactionConfig;
 import com.uber.hoodie.config.HoodieStorageConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.exception.HoodieCommitException;
+import com.uber.hoodie.exception.HoodieIOException;
 import com.uber.hoodie.index.HoodieIndex;
 import com.uber.hoodie.table.HoodieTable;
 import java.io.FileInputStream;
@@ -665,6 +668,42 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
   }
 
+  /**
+   * Tests behavior of committing only when consistency is verified
+   */
+  @Test
+  public void testConsistencyCheckDuringFinalize() throws Exception {
+    HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
+    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
+        basePath);
+
+    String commitTime = "000";
+    client.startCommitWithTime(commitTime);
+    JavaRDD<HoodieRecord> writeRecords = jsc
+        .parallelize(dataGen.generateInserts(commitTime, 200), 1);
+    JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, commitTime);
+
+    // move one of the files & commit should fail
+    WriteStatus status = result.take(1).get(0);
+    Path origPath = new Path(basePath + "/" + status.getStat().getPath());
+    Path hidePath = new Path(basePath + "/" + status.getStat().getPath() + "_hide");
+    metaClient.getFs().rename(origPath, hidePath);
+
+    try {
+      client.commit(commitTime, result);
+      fail("Commit should fail due to consistency check");
+    } catch (HoodieCommitException cme) {
+      assertTrue(cme.getCause() instanceof HoodieIOException);
+    }
+
+    // Re-introduce & commit should succeed
+    metaClient.getFs().rename(hidePath, origPath);
+    assertTrue("Commit should succeed", client.commit(commitTime, result));
+    assertTrue("After explicit commit, commit file should be created",
+        HoodieTestUtils.doesCommitExist(basePath, commitTime));
+  }
+
   /**
    * Build Hoodie Write Config for small data file sizes
    */
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
index bde2bf6ad..f92466963 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.times;
 
 import com.uber.hoodie.HoodieWriteClient;
 import com.uber.hoodie.WriteStatus;
+import com.uber.hoodie.common.HoodieClientTestUtils;
 import com.uber.hoodie.common.HoodieTestDataGenerator;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieTestUtils;
@@ -50,7 +51,6 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.junit.After;
@@ -101,8 +101,7 @@ public class TestHbaseIndex {
     hbaseConfig = utility.getConnection().getConfiguration();
     utility.createTable(TableName.valueOf(tableName), Bytes.toBytes("_s"));
     // Initialize a local spark env
-    SparkConf sparkConf = new SparkConf().setAppName("TestHbaseIndex").setMaster("local[1]");
-    jsc = new JavaSparkContext(sparkConf);
+    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHbaseIndex"));
     jsc.hadoopConfiguration().addResource(utility.getConfiguration());
   }
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestConsistencyCheck.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestConsistencyCheck.java
new file mode 100644
index 000000000..9a1c4d5b3
--- /dev/null
+++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestConsistencyCheck.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.uber.hoodie.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyList; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.uber.hoodie.common.HoodieClientTestUtils; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestConsistencyCheck { + + private String basePath; + private JavaSparkContext jsc; + + @Before + public void setup() throws IOException { + jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("ConsistencyCheckTest")); + TemporaryFolder testFolder = new TemporaryFolder(); + testFolder.create(); + basePath = testFolder.getRoot().getAbsolutePath(); + } + + @After + public void teardown() { + if (jsc != null) { + jsc.stop(); + } + File testFolderPath = new File(basePath); + if (testFolderPath.exists()) { + testFolderPath.delete(); + } + } + + @Test + public void testExponentialBackoff() throws Exception { + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + JavaSparkContext jscSpy = spy(jsc); + + ConsistencyCheck failing = new ConsistencyCheck(basePath, + Arrays.asList("partition/path/f1_0_000.parquet", "partition/path/f2_0_000.parquet"), + jscSpy, 2); + long startMs = System.currentTimeMillis(); + assertEquals(1, failing.check(5, 10).size()); + assertTrue((System.currentTimeMillis() - startMs) > (10 + 20 + 40 + 80)); + verify(jscSpy, times(5)).parallelize(anyList(), anyInt()); + } + + @Test + public void testCheckPassingAndFailing() throws Exception { + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f2"); + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f3"); + + ConsistencyCheck passing = new ConsistencyCheck(basePath, + Arrays.asList("partition/path/f1_0_000.parquet", "partition/path/f2_0_000.parquet"), + jsc, 2); + assertEquals(0, passing.check(1, 1000).size()); + + ConsistencyCheck failing = new ConsistencyCheck(basePath, + Arrays.asList("partition/path/f1_0_000.parquet", "partition/path/f4_0_000.parquet"), + jsc, 2); + assertEquals(1, failing.check(1, 1000).size()); + } +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java index 12649c527..a2c990621 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java @@ -207,6 +207,20 @@ public class HoodieTableMetaClient implements Serializable { return archivedTimeline; } + + /** + * Helper method to initialize a dataset, with given basePath, tableType, name, archiveFolder + */ + public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, + String tableType, String tableName, String archiveLogFolder) throws IOException { + HoodieTableType type = HoodieTableType.valueOf(tableType); + Properties properties = new Properties(); + properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName); + 
+    properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, type.name());
+    properties.put(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, archiveLogFolder);
+    return HoodieTableMetaClient.initializePathAsHoodieDataset(hadoopConf, basePath, properties);
+  }
+
   /**
    * Helper method to initialize a given path, as a given storage type and table name
    */
diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala
index 1a79b352a..68ac868e8 100644
--- a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala
+++ b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala
@@ -19,12 +19,12 @@ package com.uber.hoodie
 
 import java.util
+import java.util.Optional
 import java.util.concurrent.ConcurrentHashMap
-import java.util.{Optional, Properties}
 
 import com.uber.hoodie.DataSourceReadOptions._
 import com.uber.hoodie.DataSourceWriteOptions._
-import com.uber.hoodie.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import com.uber.hoodie.common.table.HoodieTableMetaClient
 import com.uber.hoodie.common.util.{FSUtils, TypedProperties}
 import com.uber.hoodie.config.HoodieWriteConfig
 import com.uber.hoodie.exception.HoodieException
@@ -205,11 +205,8 @@ class DefaultSource extends RelationProvider
 
     // Create the dataset if not present (APPEND mode)
     if (!exists) {
-      val properties = new Properties();
-      properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tblName.get);
-      properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, storageType);
-      properties.put(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived");
-      HoodieTableMetaClient.initializePathAsHoodieDataset(sparkContext.hadoopConfiguration, path.get, properties);
+      HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, storageType,
+        tblName.get, "archived")
     }
 
     // Create a HoodieWriteClient & issue the write.
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
index 211a171e0..835e7fa66 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -54,7 +54,6 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Optional;
-import java.util.Properties;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FileSystem;
@@ -159,11 +158,8 @@ public class HoodieDeltaStreamer implements Serializable {
         }
       }
     } else {
-      Properties properties = new Properties();
-      properties.put(HoodieWriteConfig.TABLE_NAME, cfg.targetTableName);
-      HoodieTableMetaClient
-          .initializePathAsHoodieDataset(jssc.hadoopConfiguration(), cfg.targetBasePath,
-              properties);
+      HoodieTableMetaClient.initTableType(jssc.hadoopConfiguration(), cfg.targetBasePath,
+          cfg.storageType, cfg.targetTableName, "archived");
     }
 
     log.info("Checkpoint to resume from : " + resumeCheckpointStr);
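Both call sites now funnel through the new HoodieTableMetaClient helper, which also places archived commits under a separate folder. A hedged sketch of calling it directly (the base path and table name here are hypothetical; the signature is the one added in this patch):

```java
// Sketch: initialize a dataset with archived commits kept under "archived"
HoodieTableMetaClient metaClient = HoodieTableMetaClient.initTableType(
    jssc.hadoopConfiguration(), "/tmp/hoodie/trips", "MERGE_ON_READ", "trips", "archived");
```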
" + + "COPY_ON_WRITE (or) MERGE_ON_READ", required = true) + public String storageType; + @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for " + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are " + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer" diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java index d2ff8ace1..1a17a687c 100644 --- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java @@ -89,6 +89,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); cfg.targetBasePath = basePath; cfg.targetTableName = "hoodie_trips"; + cfg.storageType = "COPY_ON_WRITE"; cfg.sourceClassName = TestDataSource.class.getName(); cfg.operation = op; cfg.sourceOrderingField = "timestamp"; diff --git a/pom.xml b/pom.xml index c86adfa11..62bfd162b 100644 --- a/pom.xml +++ b/pom.xml @@ -625,7 +625,7 @@ junit junit - 4.12 + ${junit.version} org.apache.hadoop