From 9f18a1ca80ec1d08253688d9b4d4538a86068559 Mon Sep 17 00:00:00 2001 From: Balaji Varadarajan Date: Fri, 28 Jun 2019 17:49:23 -0700 Subject: [PATCH] Fixing bugs found during running hoodie demo (#760) --- .../view/AbstractTableFileSystemView.java | 2 +- .../utilities/deltastreamer/DeltaSync.java | 8 +++--- .../deltastreamer/HoodieDeltaStreamer.java | 26 +++++++++++++++++-- 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java index 129a584c1..8b05b695b 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java @@ -216,7 +216,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV log.info("Building file system view for partition (" + partitionPathStr + ")"); // Create the path if it does not exist already - Path partitionPath = new Path(metaClient.getBasePath(), partitionPathStr); + Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPathStr); FSUtils.createPathIfNotExists(metaClient.getFs(), partitionPath); long beginLsTs = System.currentTimeMillis(); FileStatus[] statuses = metaClient.getFs().listStatus(partitionPath); diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java index c372b3c1a..89e5c7311 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java @@ -151,6 +151,7 @@ public class DeltaSync implements Serializable { */ private final HoodieTableType tableType; + public 
DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieTableType tableType, TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf, @@ -359,9 +360,8 @@ public class DeltaSync implements Serializable { log.info("Commit " + commitTime + " successful!"); // Schedule compaction if needed - if (tableType.equals(HoodieTableType.MERGE_ON_READ) && cfg.continuousMode) { - scheduledCompactionInstant = writeClient - .scheduleCompaction(Optional.of(checkpointCommitMetadata)); + if (cfg.isAsyncCompactionEnabled()) { + scheduledCompactionInstant = writeClient.scheduleCompaction(Optional.of(checkpointCommitMetadata)); } // Sync to hive if enabled @@ -458,7 +458,7 @@ public class DeltaSync implements Serializable { .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withPayloadClass(cfg.payloadClassName) // Inline compaction is disabled for continuous mode. otherwise enabled for MOR - .withInlineCompaction(!cfg.continuousMode && tableType.equals(HoodieTableType.MERGE_ON_READ)).build()) + .withInlineCompaction(cfg.isInlineCompactionEnabled()).build()) .forTable(cfg.targetTableName) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) .withAutoCommit(false); diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java index 61594143e..c49f3f836 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -24,6 +24,7 @@ import com.beust.jcommander.IStringConverter; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; +import com.google.common.base.Preconditions; import 
com.uber.hoodie.HoodieWriteClient; import com.uber.hoodie.OverwriteWithLatestAvroPayload; import com.uber.hoodie.SimpleKeyGenerator; @@ -252,8 +253,26 @@ public class HoodieDeltaStreamer implements Serializable { + "https://spark.apache.org/docs/latest/job-scheduling.html") public Integer compactSchedulingMinShare = 0; + /** + * Compaction is enabled for MoR table by default. This flag disables both async and inline compaction. + */ + @Parameter(names = {"--disable-compaction"}, description = "Compaction is enabled for MoR table by default. " + + "This flag disables it") + public Boolean forceDisableCompaction = false; + @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; + + + public boolean isAsyncCompactionEnabled() { + return continuousMode && !forceDisableCompaction + && HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(storageType)); + } + + public boolean isInlineCompactionEnabled() { + return !continuousMode && !forceDisableCompaction + && HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(storageType)); + } } public static void main(String[] args) throws Exception { @@ -325,6 +344,9 @@ public class HoodieDeltaStreamer implements Serializable { HoodieTableMetaClient meta = new HoodieTableMetaClient( new Configuration(fs.getConf()), cfg.targetBasePath, false); tableType = meta.getTableType(); + // This will guarantee there is no surprise with table type + Preconditions.checkArgument(tableType.equals(HoodieTableType.valueOf(cfg.storageType)), + "Hoodie table is of type " + tableType + " but passed in CLI argument is " + cfg.storageType); } else { tableType = HoodieTableType.valueOf(cfg.storageType); } @@ -350,7 +372,7 @@ public class HoodieDeltaStreamer implements Serializable { ExecutorService executor = Executors.newFixedThreadPool(1); return Pair.of(CompletableFuture.supplyAsync(() -> { boolean error = false; - if (cfg.continuousMode && tableType.equals(HoodieTableType.MERGE_ON_READ)) { + if (cfg.isAsyncCompactionEnabled()) { // set 
Scheduler Pool. log.info("Setting Spark Pool name for delta-sync to " + SchedulerConfGenerator.DELTASYNC_POOL_NAME); jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.DELTASYNC_POOL_NAME); @@ -395,7 +417,7 @@ public class HoodieDeltaStreamer implements Serializable { * @return */ protected Boolean onInitializingWriteClient(HoodieWriteClient writeClient) { - if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { + if (cfg.isAsyncCompactionEnabled()) { asyncCompactService = new AsyncCompactService(jssc, writeClient); // Enqueue existing pending compactions first HoodieTableMetaClient meta = new HoodieTableMetaClient(