Fixes HUDI-38: Reduce memory overhead of WriteStatus
- For implicit indexes (e.g. BloomIndex), don't buffer up written records
- By default, only collect 10% of failing records to avoid OOMs
- Improves debuggability via the above, since data errors can now show up in collect()
- Unit tests, fixes to subclasses, and test adjustments
This commit is contained in:
committed by
vinoth chandar
parent
e56c1612e4
commit
f1410bfdcd
@@ -25,12 +25,15 @@ import java.util.HashMap;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Status of a write operation.
|
* Status of a write operation.
|
||||||
*/
|
*/
|
||||||
public class WriteStatus implements Serializable {
|
public class WriteStatus implements Serializable {
|
||||||
|
|
||||||
|
private static final long RANDOM_SEED = 9038412832L;
|
||||||
|
|
||||||
private final HashMap<HoodieKey, Throwable> errors = new HashMap<>();
|
private final HashMap<HoodieKey, Throwable> errors = new HashMap<>();
|
||||||
|
|
||||||
private final List<HoodieRecord> writtenRecords = new ArrayList<>();
|
private final List<HoodieRecord> writtenRecords = new ArrayList<>();
|
||||||
@@ -48,6 +51,16 @@ public class WriteStatus implements Serializable {
|
|||||||
private long totalRecords = 0;
|
private long totalRecords = 0;
|
||||||
private long totalErrorRecords = 0;
|
private long totalErrorRecords = 0;
|
||||||
|
|
||||||
|
private final double failureFraction;
|
||||||
|
private final boolean trackSuccessRecords;
|
||||||
|
private final transient Random random;
|
||||||
|
|
||||||
|
public WriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
|
||||||
|
this.trackSuccessRecords = trackSuccessRecords;
|
||||||
|
this.failureFraction = failureFraction;
|
||||||
|
this.random = new Random(RANDOM_SEED);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Mark write as success, optionally using given parameters for the purpose of calculating some
|
* Mark write as success, optionally using given parameters for the purpose of calculating some
|
||||||
* aggregate metrics. This method is not meant to cache passed arguments, since WriteStatus
|
* aggregate metrics. This method is not meant to cache passed arguments, since WriteStatus
|
||||||
@@ -58,9 +71,10 @@ public class WriteStatus implements Serializable {
|
|||||||
* @param optionalRecordMetadata optional metadata related to data contained in {@link
|
* @param optionalRecordMetadata optional metadata related to data contained in {@link
|
||||||
* HoodieRecord} before deflation.
|
* HoodieRecord} before deflation.
|
||||||
*/
|
*/
|
||||||
public void markSuccess(HoodieRecord record,
|
public void markSuccess(HoodieRecord record, Optional<Map<String, String>> optionalRecordMetadata) {
|
||||||
Optional<Map<String, String>> optionalRecordMetadata) {
|
if (trackSuccessRecords) {
|
||||||
writtenRecords.add(record);
|
writtenRecords.add(record);
|
||||||
|
}
|
||||||
totalRecords++;
|
totalRecords++;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -74,10 +88,11 @@ public class WriteStatus implements Serializable {
|
|||||||
* @param optionalRecordMetadata optional metadata related to data contained in {@link
|
* @param optionalRecordMetadata optional metadata related to data contained in {@link
|
||||||
* HoodieRecord} before deflation.
|
* HoodieRecord} before deflation.
|
||||||
*/
|
*/
|
||||||
public void markFailure(HoodieRecord record, Throwable t,
|
public void markFailure(HoodieRecord record, Throwable t, Optional<Map<String, String>> optionalRecordMetadata) {
|
||||||
Optional<Map<String, String>> optionalRecordMetadata) {
|
if (random.nextDouble() <= failureFraction) {
|
||||||
failedRecords.add(record);
|
failedRecords.add(record);
|
||||||
errors.put(record.getKey(), t);
|
errors.put(record.getKey(), t);
|
||||||
|
}
|
||||||
totalRecords++;
|
totalRecords++;
|
||||||
totalErrorRecords++;
|
totalErrorRecords++;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -52,6 +52,12 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
|
|||||||
// Default file path prefix for spillable file
|
// Default file path prefix for spillable file
|
||||||
public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/";
|
public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/";
|
||||||
|
|
||||||
|
// Property to control what fraction of the failed records and their exceptions we report back to the driver.
|
||||||
|
public static final String WRITESTATUS_FAILURE_FRACTION_PROP = "hoodie.memory.writestatus.failure.fraction";
|
||||||
|
// Default is 10%. If set to 100%, a large number of failures can cause memory pressure and OOMs, and
|
||||||
|
// mask actual data errors.
|
||||||
|
public static final double DEFAULT_WRITESTATUS_FAILURE_FRACTION = 0.1;
|
||||||
|
|
||||||
private HoodieMemoryConfig(Properties props) {
|
private HoodieMemoryConfig(Properties props) {
|
||||||
super(props);
|
super(props);
|
||||||
}
|
}
|
||||||
@@ -97,6 +103,11 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withWriteStatusFailureFraction(double failureFraction) {
|
||||||
|
props.setProperty(WRITESTATUS_FAILURE_FRACTION_PROP, String.valueOf(failureFraction));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Dynamic calculation of max memory to use for spillable map. user.available.memory = spark.executor.memory *
|
* Dynamic calculation of max memory to use for spillable map. user.available.memory = spark.executor.memory *
|
||||||
* (1 - spark.memory.fraction) spillable.available.memory = user.available.memory * hoodie.memory.fraction. Anytime
|
* (1 - spark.memory.fraction) spillable.available.memory = user.available.memory * hoodie.memory.fraction. Anytime
|
||||||
@@ -118,8 +129,8 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
|
|||||||
if (SparkEnv.get() != null) {
|
if (SparkEnv.get() != null) {
|
||||||
// 1 GB is the default conf used by Spark, look at SparkContext.scala
|
// 1 GB is the default conf used by Spark, look at SparkContext.scala
|
||||||
long executorMemoryInBytes = Utils.memoryStringToMb(SparkEnv.get().conf().get(SPARK_EXECUTOR_MEMORY_PROP,
|
long executorMemoryInBytes = Utils.memoryStringToMb(SparkEnv.get().conf().get(SPARK_EXECUTOR_MEMORY_PROP,
|
||||||
DEFAULT_SPARK_EXECUTOR_MEMORY_MB)) * 1024
|
DEFAULT_SPARK_EXECUTOR_MEMORY_MB)) * 1024
|
||||||
* 1024L;
|
* 1024L;
|
||||||
// 0.6 is the default value used by Spark,
|
// 0.6 is the default value used by Spark,
|
||||||
// look at {@link
|
// look at {@link
|
||||||
// https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkConf.scala#L507}
|
// https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkConf.scala#L507}
|
||||||
@@ -159,6 +170,9 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
|
|||||||
setDefaultOnCondition(props,
|
setDefaultOnCondition(props,
|
||||||
!props.containsKey(SPILLABLE_MAP_BASE_PATH_PROP),
|
!props.containsKey(SPILLABLE_MAP_BASE_PATH_PROP),
|
||||||
SPILLABLE_MAP_BASE_PATH_PROP, DEFAULT_SPILLABLE_MAP_BASE_PATH);
|
SPILLABLE_MAP_BASE_PATH_PROP, DEFAULT_SPILLABLE_MAP_BASE_PATH);
|
||||||
|
setDefaultOnCondition(props,
|
||||||
|
!props.containsKey(WRITESTATUS_FAILURE_FRACTION_PROP),
|
||||||
|
WRITESTATUS_FAILURE_FRACTION_PROP, String.valueOf(DEFAULT_WRITESTATUS_FAILURE_FRACTION));
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -406,21 +406,21 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Long getMaxMemoryPerCompaction() {
|
public Long getMaxMemoryPerCompaction() {
|
||||||
return Long
|
return Long.valueOf(props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION_PROP));
|
||||||
.valueOf(
|
|
||||||
props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION_PROP));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getMaxDFSStreamBufferSize() {
|
public int getMaxDFSStreamBufferSize() {
|
||||||
return Integer
|
return Integer.valueOf(props.getProperty(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP));
|
||||||
.valueOf(
|
|
||||||
props.getProperty(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getSpillableMapBasePath() {
|
public String getSpillableMapBasePath() {
|
||||||
return props.getProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP);
|
return props.getProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public double getWriteStatusFailureFraction() {
|
||||||
|
return Double.valueOf(props.getProperty(HoodieMemoryConfig.WRITESTATUS_FAILURE_FRACTION_PROP));
|
||||||
|
}
|
||||||
|
|
||||||
public static class Builder {
|
public static class Builder {
|
||||||
|
|
||||||
private final Properties props = new Properties();
|
private final Properties props = new Properties();
|
||||||
@@ -428,7 +428,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
private boolean isStorageConfigSet = false;
|
private boolean isStorageConfigSet = false;
|
||||||
private boolean isCompactionConfigSet = false;
|
private boolean isCompactionConfigSet = false;
|
||||||
private boolean isMetricsConfigSet = false;
|
private boolean isMetricsConfigSet = false;
|
||||||
private boolean isAutoCommit = true;
|
|
||||||
private boolean isMemoryConfigSet = false;
|
private boolean isMemoryConfigSet = false;
|
||||||
|
|
||||||
public Builder fromFile(File propertiesFile) throws IOException {
|
public Builder fromFile(File propertiesFile) throws IOException {
|
||||||
|
|||||||
@@ -61,7 +61,9 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
|
|||||||
this.originalSchema = new Schema.Parser().parse(config.getSchema());
|
this.originalSchema = new Schema.Parser().parse(config.getSchema());
|
||||||
this.writerSchema = createHoodieWriteSchema(originalSchema);
|
this.writerSchema = createHoodieWriteSchema(originalSchema);
|
||||||
this.timer = new HoodieTimer().startTimer();
|
this.timer = new HoodieTimer().startTimer();
|
||||||
this.writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName());
|
this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(),
|
||||||
|
!hoodieTable.getIndex().isImplicitWithStorage(),
|
||||||
|
config.getWriteStatusFailureFraction());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
|
|||||||
import com.uber.hoodie.common.HoodieCleanStat;
|
import com.uber.hoodie.common.HoodieCleanStat;
|
||||||
import com.uber.hoodie.common.HoodieClientTestUtils;
|
import com.uber.hoodie.common.HoodieClientTestUtils;
|
||||||
import com.uber.hoodie.common.HoodieTestDataGenerator;
|
import com.uber.hoodie.common.HoodieTestDataGenerator;
|
||||||
|
import com.uber.hoodie.common.TestRawTripPayload;
|
||||||
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
|
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
|
||||||
import com.uber.hoodie.common.model.HoodieRecord;
|
import com.uber.hoodie.common.model.HoodieRecord;
|
||||||
import com.uber.hoodie.common.model.HoodieTableType;
|
import com.uber.hoodie.common.model.HoodieTableType;
|
||||||
@@ -144,6 +145,7 @@ public class TestHoodieClientBase implements Serializable {
|
|||||||
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
|
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
|
||||||
.withParallelism(2, 2)
|
.withParallelism(2, 2)
|
||||||
.withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
|
.withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
|
||||||
|
.withWriteStatusClass(TestRawTripPayload.MetadataMergeWriteStatus.class)
|
||||||
.withConsistencyCheckEnabled(true)
|
.withConsistencyCheckEnabled(true)
|
||||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
|
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
|
||||||
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
|
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
|
||||||
|
|||||||
@@ -0,0 +1,53 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.uber.hoodie;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import com.uber.hoodie.common.model.HoodieRecord;
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
public class TestWriteStatus {
|
||||||
|
@Test
|
||||||
|
public void testFailureFraction() throws IOException {
|
||||||
|
WriteStatus status = new WriteStatus(true, 0.1);
|
||||||
|
Throwable t = new Exception("some error in writing");
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
status.markFailure(Mockito.mock(HoodieRecord.class), t, null);
|
||||||
|
}
|
||||||
|
assertTrue(status.getFailedRecords().size() > 0);
|
||||||
|
assertTrue(status.getFailedRecords().size() < 150); //150 instead of 100, to prevent flaky test
|
||||||
|
assertTrue(status.hasErrors());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSuccessRecordTracking() {
|
||||||
|
WriteStatus status = new WriteStatus(false, 1.0);
|
||||||
|
Throwable t = new Exception("some error in writing");
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
status.markSuccess(Mockito.mock(HoodieRecord.class), null);
|
||||||
|
status.markFailure(Mockito.mock(HoodieRecord.class), t, null);
|
||||||
|
}
|
||||||
|
assertEquals(1000, status.getFailedRecords().size());
|
||||||
|
assertTrue(status.hasErrors());
|
||||||
|
assertTrue(status.getWrittenRecords().isEmpty());
|
||||||
|
assertEquals(2000, status.getTotalRecords());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -144,6 +144,10 @@ public class TestRawTripPayload implements HoodieRecordPayload<TestRawTripPayloa
|
|||||||
|
|
||||||
private Map<String, String> mergedMetadataMap = new HashMap<>();
|
private Map<String, String> mergedMetadataMap = new HashMap<>();
|
||||||
|
|
||||||
|
public MetadataMergeWriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
|
||||||
|
super(trackSuccessRecords, failureFraction);
|
||||||
|
}
|
||||||
|
|
||||||
public static Map<String, String> mergeMetadataForWriteStatuses(List<WriteStatus> writeStatuses) {
|
public static Map<String, String> mergeMetadataForWriteStatuses(List<WriteStatus> writeStatuses) {
|
||||||
Map<String, String> allWriteStatusMergedMetadataMap = new HashMap<>();
|
Map<String, String> allWriteStatusMergedMetadataMap = new HashMap<>();
|
||||||
for (WriteStatus writeStatus : writeStatuses) {
|
for (WriteStatus writeStatus : writeStatuses) {
|
||||||
|
|||||||
@@ -337,7 +337,7 @@ public class TestHbaseIndex {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private WriteStatus getSampleWriteStatus(final int numInserts, final int numUpdateWrites) {
|
private WriteStatus getSampleWriteStatus(final int numInserts, final int numUpdateWrites) {
|
||||||
final WriteStatus writeStatus = new WriteStatus();
|
final WriteStatus writeStatus = new WriteStatus(false, 0.1);
|
||||||
HoodieWriteStat hoodieWriteStat = new HoodieWriteStat();
|
HoodieWriteStat hoodieWriteStat = new HoodieWriteStat();
|
||||||
hoodieWriteStat.setNumInserts(numInserts);
|
hoodieWriteStat.setNumInserts(numInserts);
|
||||||
hoodieWriteStat.setNumUpdateWrites(numUpdateWrites);
|
hoodieWriteStat.setNumUpdateWrites(numUpdateWrites);
|
||||||
|
|||||||
@@ -342,10 +342,10 @@ public class TestCopyOnWriteTable {
|
|||||||
assertEquals(2, returnedStatuses.size());
|
assertEquals(2, returnedStatuses.size());
|
||||||
assertEquals("2016/01/31", returnedStatuses.get(0).getPartitionPath());
|
assertEquals("2016/01/31", returnedStatuses.get(0).getPartitionPath());
|
||||||
assertEquals(0, returnedStatuses.get(0).getFailedRecords().size());
|
assertEquals(0, returnedStatuses.get(0).getFailedRecords().size());
|
||||||
assertEquals(10, returnedStatuses.get(0).getWrittenRecords().size());
|
assertEquals(10, returnedStatuses.get(0).getTotalRecords());
|
||||||
assertEquals("2016/02/01", returnedStatuses.get(1).getPartitionPath());
|
assertEquals("2016/02/01", returnedStatuses.get(1).getPartitionPath());
|
||||||
assertEquals(0, returnedStatuses.get(0).getFailedRecords().size());
|
assertEquals(0, returnedStatuses.get(0).getFailedRecords().size());
|
||||||
assertEquals(1, returnedStatuses.get(1).getWrittenRecords().size());
|
assertEquals(1, returnedStatuses.get(1).getTotalRecords());
|
||||||
|
|
||||||
// Case 2:
|
// Case 2:
|
||||||
// 1 record for partition 1, 5 record for partition 2, 1 records for partition 3.
|
// 1 record for partition 1, 5 record for partition 2, 1 records for partition 3.
|
||||||
@@ -358,13 +358,13 @@ public class TestCopyOnWriteTable {
|
|||||||
|
|
||||||
assertEquals(3, returnedStatuses.size());
|
assertEquals(3, returnedStatuses.size());
|
||||||
assertEquals("2016/01/31", returnedStatuses.get(0).getPartitionPath());
|
assertEquals("2016/01/31", returnedStatuses.get(0).getPartitionPath());
|
||||||
assertEquals(1, returnedStatuses.get(0).getWrittenRecords().size());
|
assertEquals(1, returnedStatuses.get(0).getTotalRecords());
|
||||||
|
|
||||||
assertEquals("2016/02/01", returnedStatuses.get(1).getPartitionPath());
|
assertEquals("2016/02/01", returnedStatuses.get(1).getPartitionPath());
|
||||||
assertEquals(5, returnedStatuses.get(1).getWrittenRecords().size());
|
assertEquals(5, returnedStatuses.get(1).getTotalRecords());
|
||||||
|
|
||||||
assertEquals("2016/02/02", returnedStatuses.get(2).getPartitionPath());
|
assertEquals("2016/02/02", returnedStatuses.get(2).getPartitionPath());
|
||||||
assertEquals(1, returnedStatuses.get(2).getWrittenRecords().size());
|
assertEquals(1, returnedStatuses.get(2).getTotalRecords());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user