Fixing small file handling, inline compaction defaults
- Small file limit is now 100MB by default
- Turned on inline compaction by default for MOR
- Changes take effect on DataSource and DeltaStreamer
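Net effect: a writer that leaves these properties unset now gets small-file handling with a 100MB threshold, and merge-on-read (MOR) writers additionally get inline compaction after each commit. A minimal sketch of opting back into the old behavior by overriding both properties explicitly; the property keys and builder calls are the ones visible in the diff below, while the package import, class, and variable names are assumptions:

import java.util.Properties;

import com.uber.hoodie.config.HoodieWriteConfig;

public class LegacyDefaultsExample {

  // Hypothetical helper: builds a write config that restores the pre-commit defaults.
  public static HoodieWriteConfig legacyDefaults(String basePath, String schemaStr, String tblName) {
    Properties props = new Properties();
    props.setProperty("hoodie.parquet.small.file.limit", "0"); // old default: small file handling off
    props.setProperty("hoodie.compact.inline", "false");       // old default: inline compaction off
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath).withSchema(schemaStr).forTable(tblName)
        .withProps(props) // explicitly-set props override the new defaults
        .build();
  }
}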
@@ -528,8 +528,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
     try {
       activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, commitTime),
           Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-      // Save was a success
-      // Do a inline compaction if enabled
+      // Save was a success & do an inline compaction if enabled
       if (config.isInlineCompaction()) {
         metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
         forceCompact(extraMetadata);
@@ -1103,7 +1102,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
         HoodieTimeline.compareTimestamps(instant.getTimestamp(), instantTime,
             HoodieTimeline.GREATER_OR_EQUAL)).collect(Collectors.toList());
     Preconditions.checkArgument(conflictingInstants.isEmpty(),
-        "Following instants have timestamps >= compactionInstant. Instants :"
+        "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
            + conflictingInstants);
     HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     HoodieCompactionPlan workload = table.scheduleCompaction(jsc, instantTime);
@@ -1343,8 +1342,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
   }

   /**
-   * Performs a compaction operation on a dataset. WARNING: Compaction operation cannot be executed
-   * asynchronously. Please always use this serially before or after an insert/upsert action.
+   * Performs a compaction operation on a dataset, serially before or after an insert/upsert action.
    */
   private Optional<String> forceCompact(Optional<Map<String, String>> extraMetadata) throws IOException {
     Optional<String> compactionInstantTimeOpt = scheduleCompaction(extraMetadata);
@@ -47,8 +47,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   public static final String MIN_COMMITS_TO_KEEP_PROP = "hoodie.keep.min.commits";
   // Upsert uses this file size to compact new data onto existing files..
   public static final String PARQUET_SMALL_FILE_LIMIT_BYTES = "hoodie.parquet.small.file.limit";
-  // Turned off by default
-  public static final String DEFAULT_PARQUET_SMALL_FILE_LIMIT_BYTES = String.valueOf(0);
+  // By default, treat any file <= 100MB as a small file.
+  public static final String DEFAULT_PARQUET_SMALL_FILE_LIMIT_BYTES = String.valueOf(104857600);
   /**
    * Configs related to specific table types
    **/
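For reference, the new default value is exactly 100MB: 104857600 = 100 * 1024 * 1024. A one-liner showing the same arithmetic (the variable name is hypothetical):

// 100L * 1024 * 1024 == 104857600, the DEFAULT_PARQUET_SMALL_FILE_LIMIT_BYTES value above
long smallFileLimitBytes = 100L * 1024 * 1024;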
@@ -126,6 +126,12 @@ public class DataSourceUtils {

   public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr,
       String basePath, String tblName, Map<String, String> parameters) throws Exception {
+
+    // inline compaction is on by default for MOR
+    boolean inlineCompact = parameters.containsKey(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY())
+        && parameters.get(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY()).equals(DataSourceWriteOptions
+        .MOR_STORAGE_TYPE_OPT_VAL());
+
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().combineInput(true, true)
         .withPath(basePath).withAutoCommit(false)
         .withSchema(schemaStr).forTable(tblName).withIndexConfig(
@@ -134,6 +140,7 @@ public class DataSourceUtils {
             .withPayloadClass(parameters.get(
                 DataSourceWriteOptions
                     .PAYLOAD_CLASS_OPT_KEY()))
+            .withInlineCompaction(inlineCompact)
             .build())
         // override above with Hoodie configs specified as options.
         .withProps(parameters).build();
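With both hunks applied, a datasource caller that passes the MOR storage type gets a client that compacts inline by default. A hedged sketch of that path; the method signature and option keys are taken from the diff, while the wrapper class and the setup of jssc, schemaStr, basePath, tblName, and the other required parameters (e.g. the payload class) are assumed:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaSparkContext;

public class MorClientExample {

  // Hypothetical helper showing the datasource path end to end.
  static HoodieWriteClient morClient(JavaSparkContext jssc, String schemaStr,
      String basePath, String tblName) throws Exception {
    Map<String, String> parameters = new HashMap<>();
    parameters.put(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(),
        DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL());
    // inlineCompact resolves to true inside createHoodieClient for these parameters,
    // so the returned client runs compaction after each commit unless overridden.
    return DataSourceUtils.createHoodieClient(jssc, schemaStr, basePath, tblName, parameters);
  }
}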
@@ -29,9 +29,9 @@ import org.junit.{Before, Test}
 import org.scalatest.junit.AssertionsForJUnit

 import scala.collection.JavaConversions._
+import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
-import scala.concurrent.ExecutionContext.Implicits.global

 /**
  * Basic tests on the spark datasource
@@ -131,6 +131,7 @@ class DataSourceTest extends AssertionsForJUnit {
     val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("com.uber.hoodie")
       .options(commonOpts)
+      .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
       .mode(SaveMode.Overwrite)
@@ -36,6 +36,7 @@ import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.model.HoodieCommitMetadata;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
@@ -323,17 +324,22 @@ public class HoodieDeltaStreamer implements Serializable {
     }
   }

-  private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) throws Exception {
+  private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
     HoodieWriteConfig.Builder builder =
         HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath)
             .withAutoCommit(false)
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
+            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+                .withPayloadClass(cfg.payloadClassName)
+                // turn on inline compaction by default, for MOR tables
+                .withInlineCompaction(HoodieTableType.valueOf(cfg.storageType) == HoodieTableType.MERGE_ON_READ)
+                .build())
             .forTable(cfg.targetTableName)
             .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
             .withProps(props);
     if (null != schemaProvider) {
       builder = builder.withSchema(schemaProvider.getTargetSchema().toString());
     }

     return builder.build();
   }
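The DeltaStreamer decision in isolation: cfg.storageType is parsed into a HoodieTableType, and only MERGE_ON_READ flips inline compaction on. A tiny standalone check (COPY_ON_WRITE is the other table type; the wrapper class is hypothetical):

import com.uber.hoodie.common.model.HoodieTableType;

public class InlineCompactDefault {
  public static void main(String[] args) {
    // Mirrors the expression passed to withInlineCompaction above.
    System.out.println(HoodieTableType.valueOf("MERGE_ON_READ") == HoodieTableType.MERGE_ON_READ); // true
    System.out.println(HoodieTableType.valueOf("COPY_ON_WRITE") == HoodieTableType.MERGE_ON_READ); // false
  }
}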