diff --git a/hoodie-client/src/main/java/com/uber/hoodie/WriteStatus.java b/hoodie-client/src/main/java/com/uber/hoodie/WriteStatus.java index ac54d890c..54b32f099 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/WriteStatus.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/WriteStatus.java @@ -25,12 +25,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; /** * Status of a write operation. */ public class WriteStatus implements Serializable { + private static final long RANDOM_SEED = 9038412832L; + private final HashMap errors = new HashMap<>(); private final List writtenRecords = new ArrayList<>(); @@ -48,6 +51,16 @@ public class WriteStatus implements Serializable { private long totalRecords = 0; private long totalErrorRecords = 0; + private final double failureFraction; + private final boolean trackSuccessRecords; + private final transient Random random; + + public WriteStatus(Boolean trackSuccessRecords, Double failureFraction) { + this.trackSuccessRecords = trackSuccessRecords; + this.failureFraction = failureFraction; + this.random = new Random(RANDOM_SEED); + } + /** * Mark write as success, optionally using given parameters for the purpose of calculating some * aggregate metrics. This method is not meant to cache passed arguments, since WriteStatus @@ -58,9 +71,10 @@ public class WriteStatus implements Serializable { * @param optionalRecordMetadata optional metadata related to data contained in {@link * HoodieRecord} before deflation. 
*/ - public void markSuccess(HoodieRecord record, - Optional> optionalRecordMetadata) { - writtenRecords.add(record); + public void markSuccess(HoodieRecord record, Optional> optionalRecordMetadata) { + if (trackSuccessRecords) { + writtenRecords.add(record); + } totalRecords++; } @@ -74,10 +88,11 @@ public class WriteStatus implements Serializable { * @param optionalRecordMetadata optional metadata related to data contained in {@link * HoodieRecord} before deflation. */ - public void markFailure(HoodieRecord record, Throwable t, - Optional> optionalRecordMetadata) { - failedRecords.add(record); - errors.put(record.getKey(), t); + public void markFailure(HoodieRecord record, Throwable t, Optional> optionalRecordMetadata) { + if (random.nextDouble() <= failureFraction) { + failedRecords.add(record); + errors.put(record.getKey(), t); + } totalRecords++; totalErrorRecords++; } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMemoryConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMemoryConfig.java index 5746e8d16..051ac16d0 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMemoryConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMemoryConfig.java @@ -52,6 +52,12 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig { // Default file path prefix for spillable file public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/"; + // Property to control what fraction of the failed records and exceptions we report back to the driver. + public static final String WRITESTATUS_FAILURE_FRACTION_PROP = "hoodie.memory.writestatus.failure.fraction"; + // Default is 10%. If set to 100%, with a lot of failures, this can cause memory pressure, cause OOMs and + mask actual data errors. 
+ public static final double DEFAULT_WRITESTATUS_FAILURE_FRACTION = 0.1; + private HoodieMemoryConfig(Properties props) { super(props); } @@ -97,6 +103,11 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig { return this; } + public Builder withWriteStatusFailureFraction(double failureFraction) { + props.setProperty(WRITESTATUS_FAILURE_FRACTION_PROP, String.valueOf(failureFraction)); + return this; + } + /** * Dynamic calculation of max memory to use for for spillable map. user.available.memory = spark.executor.memory * * (1 - spark.memory.fraction) spillable.available.memory = user.available.memory * hoodie.memory.fraction. Anytime @@ -118,8 +129,8 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig { if (SparkEnv.get() != null) { // 1 GB is the default conf used by Spark, look at SparkContext.scala long executorMemoryInBytes = Utils.memoryStringToMb(SparkEnv.get().conf().get(SPARK_EXECUTOR_MEMORY_PROP, - DEFAULT_SPARK_EXECUTOR_MEMORY_MB)) * 1024 - * 1024L; + DEFAULT_SPARK_EXECUTOR_MEMORY_MB)) * 1024 + * 1024L; // 0.6 is the default value used by Spark, // look at {@link // https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkConf.scala#L507} @@ -159,6 +170,9 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig { setDefaultOnCondition(props, !props.containsKey(SPILLABLE_MAP_BASE_PATH_PROP), SPILLABLE_MAP_BASE_PATH_PROP, DEFAULT_SPILLABLE_MAP_BASE_PATH); + setDefaultOnCondition(props, + !props.containsKey(WRITESTATUS_FAILURE_FRACTION_PROP), + WRITESTATUS_FAILURE_FRACTION_PROP, String.valueOf(DEFAULT_WRITESTATUS_FAILURE_FRACTION)); return config; } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index 67b38d553..7156623f6 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java 
@@ -406,21 +406,21 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { } public Long getMaxMemoryPerCompaction() { - return Long - .valueOf( - props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION_PROP)); + return Long.valueOf(props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION_PROP)); } public int getMaxDFSStreamBufferSize() { - return Integer - .valueOf( - props.getProperty(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP)); + return Integer.valueOf(props.getProperty(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP)); } public String getSpillableMapBasePath() { return props.getProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP); } + public double getWriteStatusFailureFraction() { + return Double.valueOf(props.getProperty(HoodieMemoryConfig.WRITESTATUS_FAILURE_FRACTION_PROP)); + } + public static class Builder { private final Properties props = new Properties(); @@ -428,7 +428,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private boolean isStorageConfigSet = false; private boolean isCompactionConfigSet = false; private boolean isMetricsConfigSet = false; - private boolean isAutoCommit = true; private boolean isMemoryConfigSet = false; public Builder fromFile(File propertiesFile) throws IOException { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java index b4a03d180..a1f8f63fd 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java @@ -61,7 +61,9 @@ public abstract class HoodieIOHandle { this.originalSchema = new Schema.Parser().parse(config.getSchema()); this.writerSchema = createHoodieWriteSchema(originalSchema); this.timer = new HoodieTimer().startTimer(); - this.writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName()); + this.writeStatus = (WriteStatus) 
ReflectionUtils.loadClass(config.getWriteStatusClassName(), + !hoodieTable.getIndex().isImplicitWithStorage(), + config.getWriteStatusFailureFraction()); } /** diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java index 282601d4d..2df095e84 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.HoodieClientTestUtils; import com.uber.hoodie.common.HoodieTestDataGenerator; +import com.uber.hoodie.common.TestRawTripPayload; import com.uber.hoodie.common.model.HoodiePartitionMetadata; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieTableType; @@ -144,6 +145,7 @@ public class TestHoodieClientBase implements Serializable { return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2) .withBulkInsertParallelism(2).withFinalizeWriteParallelism(2) + .withWriteStatusClass(TestRawTripPayload.MetadataMergeWriteStatus.class) .withConsistencyCheckEnabled(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestWriteStatus.java b/hoodie-client/src/test/java/com/uber/hoodie/TestWriteStatus.java new file mode 100644 index 000000000..08035ab0c --- /dev/null +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestWriteStatus.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2019 Uber Technologies, Inc. 
(hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.uber.hoodie.common.model.HoodieRecord; +import java.io.IOException; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestWriteStatus { + @Test + public void testFailureFraction() throws IOException { + WriteStatus status = new WriteStatus(true, 0.1); + Throwable t = new Exception("some error in writing"); + for (int i = 0; i < 1000; i++) { + status.markFailure(Mockito.mock(HoodieRecord.class), t, null); + } + assertTrue(status.getFailedRecords().size() > 0); + assertTrue(status.getFailedRecords().size() < 150); //150 instead of 100, to prevent flaky test + assertTrue(status.hasErrors()); + } + + @Test + public void testSuccessRecordTracking() { + WriteStatus status = new WriteStatus(false, 1.0); + Throwable t = new Exception("some error in writing"); + for (int i = 0; i < 1000; i++) { + status.markSuccess(Mockito.mock(HoodieRecord.class), null); + status.markFailure(Mockito.mock(HoodieRecord.class), t, null); + } + assertEquals(1000, status.getFailedRecords().size()); + assertTrue(status.hasErrors()); + assertTrue(status.getWrittenRecords().isEmpty()); + assertEquals(2000, status.getTotalRecords()); + } +} diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/TestRawTripPayload.java 
b/hoodie-client/src/test/java/com/uber/hoodie/common/TestRawTripPayload.java index b73cba791..e0a1916d7 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/TestRawTripPayload.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/TestRawTripPayload.java @@ -144,6 +144,10 @@ public class TestRawTripPayload implements HoodieRecordPayload mergedMetadataMap = new HashMap<>(); + public MetadataMergeWriteStatus(Boolean trackSuccessRecords, Double failureFraction) { + super(trackSuccessRecords, failureFraction); + } + public static Map mergeMetadataForWriteStatuses(List writeStatuses) { Map allWriteStatusMergedMetadataMap = new HashMap<>(); for (WriteStatus writeStatus : writeStatuses) { diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java index f92466963..16e67c403 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java @@ -337,7 +337,7 @@ public class TestHbaseIndex { } private WriteStatus getSampleWriteStatus(final int numInserts, final int numUpdateWrites) { - final WriteStatus writeStatus = new WriteStatus(); + final WriteStatus writeStatus = new WriteStatus(false, 0.1); HoodieWriteStat hoodieWriteStat = new HoodieWriteStat(); hoodieWriteStat.setNumInserts(numInserts); hoodieWriteStat.setNumUpdateWrites(numUpdateWrites); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java index 3db10a493..714f0e5cd 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java @@ -342,10 +342,10 @@ public class TestCopyOnWriteTable { assertEquals(2, returnedStatuses.size()); assertEquals("2016/01/31", returnedStatuses.get(0).getPartitionPath()); 
assertEquals(0, returnedStatuses.get(0).getFailedRecords().size()); - assertEquals(10, returnedStatuses.get(0).getWrittenRecords().size()); + assertEquals(10, returnedStatuses.get(0).getTotalRecords()); assertEquals("2016/02/01", returnedStatuses.get(1).getPartitionPath()); assertEquals(0, returnedStatuses.get(0).getFailedRecords().size()); - assertEquals(1, returnedStatuses.get(1).getWrittenRecords().size()); + assertEquals(1, returnedStatuses.get(1).getTotalRecords()); // Case 2: // 1 record for partition 1, 5 record for partition 2, 1 records for partition 3. @@ -358,13 +358,13 @@ public class TestCopyOnWriteTable { assertEquals(3, returnedStatuses.size()); assertEquals("2016/01/31", returnedStatuses.get(0).getPartitionPath()); - assertEquals(1, returnedStatuses.get(0).getWrittenRecords().size()); + assertEquals(1, returnedStatuses.get(0).getTotalRecords()); assertEquals("2016/02/01", returnedStatuses.get(1).getPartitionPath()); - assertEquals(5, returnedStatuses.get(1).getWrittenRecords().size()); + assertEquals(5, returnedStatuses.get(1).getTotalRecords()); assertEquals("2016/02/02", returnedStatuses.get(2).getPartitionPath()); - assertEquals(1, returnedStatuses.get(2).getWrittenRecords().size()); + assertEquals(1, returnedStatuses.get(2).getTotalRecords()); }