1
0

Fixing bugs found during running hoodie demo (#760)

This commit is contained in:
Balaji Varadarajan
2019-06-28 17:49:23 -07:00
committed by vinoth chandar
parent e48e35385a
commit 9f18a1ca80
3 changed files with 29 additions and 7 deletions

View File

@@ -216,7 +216,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
log.info("Building file system view for partition (" + partitionPathStr + ")");
// Create the path if it does not exist already
Path partitionPath = new Path(metaClient.getBasePath(), partitionPathStr);
Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPathStr);
FSUtils.createPathIfNotExists(metaClient.getFs(), partitionPath);
long beginLsTs = System.currentTimeMillis();
FileStatus[] statuses = metaClient.getFs().listStatus(partitionPath);

View File

@@ -151,6 +151,7 @@ public class DeltaSync implements Serializable {
*/
private final HoodieTableType tableType;
public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
SchemaProvider schemaProvider, HoodieTableType tableType, TypedProperties props,
JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
@@ -359,9 +360,8 @@ public class DeltaSync implements Serializable {
log.info("Commit " + commitTime + " successful!");
// Schedule compaction if needed
if (tableType.equals(HoodieTableType.MERGE_ON_READ) && cfg.continuousMode) {
scheduledCompactionInstant = writeClient
.scheduleCompaction(Optional.of(checkpointCommitMetadata));
if (cfg.isAsyncCompactionEnabled()) {
scheduledCompactionInstant = writeClient.scheduleCompaction(Optional.of(checkpointCommitMetadata));
}
// Sync to hive if enabled
@@ -458,7 +458,7 @@ public class DeltaSync implements Serializable {
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withPayloadClass(cfg.payloadClassName)
// Inline compaction is disabled in continuous mode or when --disable-compaction is set; otherwise it is enabled for MOR tables
.withInlineCompaction(!cfg.continuousMode && tableType.equals(HoodieTableType.MERGE_ON_READ)).build())
.withInlineCompaction(cfg.isInlineCompactionEnabled()).build())
.forTable(cfg.targetTableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withAutoCommit(false);

View File

@@ -24,6 +24,7 @@ import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.google.common.base.Preconditions;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.OverwriteWithLatestAvroPayload;
import com.uber.hoodie.SimpleKeyGenerator;
@@ -252,8 +253,26 @@ public class HoodieDeltaStreamer implements Serializable {
+ "https://spark.apache.org/docs/latest/job-scheduling.html")
public Integer compactSchedulingMinShare = 0;
/**
 * Compaction is enabled for MoR (MERGE_ON_READ) tables by default; setting this flag turns it off.
 *
 * <p>The flag is consulted by both {@code isAsyncCompactionEnabled()} and
 * {@code isInlineCompactionEnabled()}, so it suppresses compaction in either mode.
 */
@Parameter(names = {"--disable-compaction"}, description = "Compaction is enabled for MoR table by default. "
    + "This flag disables it.")
public Boolean forceDisableCompaction = false;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
/**
 * Tells whether compaction should be scheduled and run asynchronously.
 *
 * @return {@code true} only when continuous mode is on, compaction has not been force-disabled,
 *         and the configured storage type resolves to {@code MERGE_ON_READ}
 */
public boolean isAsyncCompactionEnabled() {
  // Bail out before parsing the storage type, mirroring the short-circuit order of the checks.
  if (!continuousMode || forceDisableCompaction) {
    return false;
  }
  final HoodieTableType configuredType = HoodieTableType.valueOf(storageType);
  return configuredType == HoodieTableType.MERGE_ON_READ;
}
/**
 * Tells whether compaction should run inline as part of the write.
 *
 * @return {@code true} only when continuous mode is off, compaction has not been force-disabled,
 *         and the configured storage type resolves to {@code MERGE_ON_READ}
 */
public boolean isInlineCompactionEnabled() {
  // Either condition rules out inline compaction; the storage type is only parsed afterwards.
  if (continuousMode || forceDisableCompaction) {
    return false;
  }
  final HoodieTableType configuredType = HoodieTableType.valueOf(storageType);
  return configuredType == HoodieTableType.MERGE_ON_READ;
}
}
public static void main(String[] args) throws Exception {
@@ -325,6 +344,9 @@ public class HoodieDeltaStreamer implements Serializable {
HoodieTableMetaClient meta = new HoodieTableMetaClient(
new Configuration(fs.getConf()), cfg.targetBasePath, false);
tableType = meta.getTableType();
// This will guarantee there is no surprise with table type
Preconditions.checkArgument(tableType.equals(HoodieTableType.valueOf(cfg.storageType)),
"Hoodie table is of type " + tableType + " but passed in CLI argument is " + cfg.storageType);
} else {
tableType = HoodieTableType.valueOf(cfg.storageType);
}
@@ -350,7 +372,7 @@ public class HoodieDeltaStreamer implements Serializable {
ExecutorService executor = Executors.newFixedThreadPool(1);
return Pair.of(CompletableFuture.supplyAsync(() -> {
boolean error = false;
if (cfg.continuousMode && tableType.equals(HoodieTableType.MERGE_ON_READ)) {
if (cfg.isAsyncCompactionEnabled()) {
// set Scheduler Pool.
log.info("Setting Spark Pool name for delta-sync to " + SchedulerConfGenerator.DELTASYNC_POOL_NAME);
jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.DELTASYNC_POOL_NAME);
@@ -395,7 +417,7 @@ public class HoodieDeltaStreamer implements Serializable {
* @return
*/
protected Boolean onInitializingWriteClient(HoodieWriteClient writeClient) {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
if (cfg.isAsyncCompactionEnabled()) {
asyncCompactService = new AsyncCompactService(jssc, writeClient);
// Enqueue existing pending compactions first
HoodieTableMetaClient meta = new HoodieTableMetaClient(