From b34a204a527a156406908686e54484a0c3d8a3d7 Mon Sep 17 00:00:00 2001
From: Vinoth Chandar
Date: Tue, 12 Mar 2019 15:59:41 -0700
Subject: [PATCH] Fixing small file handling, inline compaction defaults

- Small file limit is now 100MB by default
- Turned on inline compaction by default for MOR
- Changes take effect on DataSource and DeltaStreamer
---
 .../main/java/com/uber/hoodie/HoodieWriteClient.java   |  8 +++-----
 .../com/uber/hoodie/config/HoodieCompactionConfig.java |  4 ++--
 .../src/main/java/com/uber/hoodie/DataSourceUtils.java |  7 +++++++
 hoodie-spark/src/test/scala/DataSourceTest.scala       |  3 ++-
 .../utilities/deltastreamer/HoodieDeltaStreamer.java   | 10 ++++++++--
 5 files changed, 22 insertions(+), 10 deletions(-)

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
index 1f584de20..632c13df9 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -528,8 +528,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable {
     try {
       activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, commitTime),
           Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-      // Save was a success
-      // Do a inline compaction if enabled
+      // Save was a success; do an inline compaction if enabled
       if (config.isInlineCompaction()) {
         metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
         forceCompact(extraMetadata);
@@ -1103,7 +1102,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable {
         HoodieTimeline.compareTimestamps(instant.getTimestamp(), instantTime,
             HoodieTimeline.GREATER_OR_EQUAL)).collect(Collectors.toList());
     Preconditions.checkArgument(conflictingInstants.isEmpty(),
-        "Following instants have timestamps >= compactionInstant. Instants :"
+        "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
            + conflictingInstants);
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     HoodieCompactionPlan workload = table.scheduleCompaction(jsc, instantTime);
@@ -1343,8 +1342,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable {
   }

   /**
-   * Performs a compaction operation on a dataset. WARNING: Compaction operation cannot be executed
-   * asynchronously. Please always use this serially before or after an insert/upsert action.
+   * Performs a compaction operation on a dataset, serially before or after an insert/upsert action.
    */
   private Optional<String> forceCompact(Optional<Map<String, String>> extraMetadata) throws IOException {
     Optional<String> compactionInstantTimeOpt = scheduleCompaction(extraMetadata);
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
index 2cde40138..95e0c9b9e 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
@@ -47,8 +47,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   public static final String MIN_COMMITS_TO_KEEP_PROP = "hoodie.keep.min.commits";
   // Upsert uses this file size to compact new data onto existing files..
   public static final String PARQUET_SMALL_FILE_LIMIT_BYTES = "hoodie.parquet.small.file.limit";
-  // Turned off by default
-  public static final String DEFAULT_PARQUET_SMALL_FILE_LIMIT_BYTES = String.valueOf(0);
+  // By default, treat any file <= 100MB (104857600 bytes) as a small file.
+  public static final String DEFAULT_PARQUET_SMALL_FILE_LIMIT_BYTES = String.valueOf(104857600);
   /**
    * Configs related to specific table types
    **/
diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
index 429c43953..28811bbfd 100644
--- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
@@ -126,6 +126,12 @@ public class DataSourceUtils {

   public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr,
       String basePath, String tblName, Map<String, String> parameters) throws Exception {
+
+    // inline compaction is on by default for MOR
+    boolean inlineCompact = parameters.containsKey(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY())
+        && parameters.get(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY()).equals(DataSourceWriteOptions
+            .MOR_STORAGE_TYPE_OPT_VAL());
+
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().combineInput(true, true)
         .withPath(basePath).withAutoCommit(false)
         .withSchema(schemaStr).forTable(tblName).withIndexConfig(
@@ -134,6 +140,7 @@ public class DataSourceUtils {
             .withPayloadClass(parameters.get(
                 DataSourceWriteOptions
                     .PAYLOAD_CLASS_OPT_KEY()))
+            .withInlineCompaction(inlineCompact)
             .build())
         // override above with Hoodie configs specified as options.
         .withProps(parameters).build();
diff --git a/hoodie-spark/src/test/scala/DataSourceTest.scala b/hoodie-spark/src/test/scala/DataSourceTest.scala
index 42def5bf1..1520f2f11 100644
--- a/hoodie-spark/src/test/scala/DataSourceTest.scala
+++ b/hoodie-spark/src/test/scala/DataSourceTest.scala
@@ -29,9 +29,9 @@ import org.junit.{Before, Test}
 import org.scalatest.junit.AssertionsForJUnit

 import scala.collection.JavaConversions._
+import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
-import scala.concurrent.ExecutionContext.Implicits.global

 /**
  * Basic tests on the spark datasource
@@ -131,6 +131,7 @@ class DataSourceTest extends AssertionsForJUnit {
     val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("com.uber.hoodie")
       .options(commonOpts)
+      .option("hoodie.compact.inline", "false") // otherwise the test fails: compaction and deltacommit get the same instant time
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
       .mode(SaveMode.Overwrite)
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
index 665fcff98..0ad10def1 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -36,6 +36,7 @@ import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.model.HoodieCommitMetadata;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
@@ -323,17 +324,22 @@ public class HoodieDeltaStreamer implements Serializable {
     }
   }

-  private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) throws Exception {
+  private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
     HoodieWriteConfig.Builder builder =
         HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath)
             .withAutoCommit(false)
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
+            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+                .withPayloadClass(cfg.payloadClassName)
+                // turn on inline compaction by default, for MOR tables
+                .withInlineCompaction(HoodieTableType.valueOf(cfg.storageType) == HoodieTableType.MERGE_ON_READ)
+                .build())
             .forTable(cfg.targetTableName)
             .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
             .withProps(props);

     if (null != schemaProvider) {
       builder = builder.withSchema(schemaProvider.getTargetSchema().toString());
     }
+
     return builder.build();
   }
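
Usage note: with these defaults in place, the Spark datasource turns on inline compaction whenever DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY is set to the MOR value, and upserts start packing new records into any parquet file at or below 100MB. A minimal Scala sketch of opting back out at write time, against the same datasource API exercised in DataSourceTest above (the `df`, `commonOpts`, and `basePath` names are hypothetical; the option keys are the ones this patch touches):

    df.write.format("com.uber.hoodie")
      .options(commonOpts)                                // hypothetical map with the usual required keys
      .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY,
        DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)  // MOR now implies inline compaction
      .option("hoodie.compact.inline", "false")           // opt out of the new inline compaction default
      .option("hoodie.parquet.small.file.limit", "0")     // restore the old behavior (small file handling off)
      .mode(SaveMode.Append)
      .save(basePath)

Because DataSourceUtils applies .withProps(parameters) after the builder defaults ("override above with Hoodie configs specified as options"), options passed this way win over the new defaults.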