diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java index 7b61548af..13f2ebc2e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java @@ -114,6 +114,12 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { } } + public Builder fromProperties(Properties props) { + this.props.putAll(props); + return this; + } + + public Builder withAutoClean(Boolean autoClean) { props.setProperty(AUTO_CLEAN_PROP, String.valueOf(autoClean)); return this; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java index a392ad8a7..bbcc065de 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java @@ -64,6 +64,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { } } + public Builder fromProperties(Properties props) { + this.props.putAll(props); + return this; + } + public Builder withIndexType(HoodieIndex.IndexType indexType) { props.setProperty(INDEX_TYPE_PROP, indexType.name()); return this; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMetricsConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMetricsConfig.java index 48dcdfe3c..0ef107de5 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMetricsConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMetricsConfig.java @@ -68,6 +68,12 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig { } } + public Builder fromProperties(Properties props) { + this.props.putAll(props); + return this; + } + + public Builder on(boolean metricsOn) { props.setProperty(METRICS_ON, String.valueOf(metricsOn)); return this; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieStorageConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieStorageConfig.java index 50ed232cf..b9ce48aca 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieStorageConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieStorageConfig.java @@ -55,6 +55,11 @@ public class HoodieStorageConfig extends DefaultHoodieConfig { } } + public Builder fromProperties(Properties props) { + this.props.putAll(props); + return this; + } + public Builder limitFileSize(int maxFileSize) { props.setProperty(PARQUET_FILE_MAX_BYTES, String.valueOf(maxFileSize)); return this; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index ce70942a3..e38637130 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -361,14 +361,15 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { setDefaultOnCondition(props, !props.containsKey(HOODIE_ASSUME_DATE_PARTITIONING_PROP), HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING); - - setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().build()); + // Make sure the props is propagated + setDefaultOnCondition(props, !isIndexConfigSet, + HoodieIndexConfig.newBuilder().fromProperties(props).build()); setDefaultOnCondition(props, !isStorageConfigSet, - HoodieStorageConfig.newBuilder().build()); + HoodieStorageConfig.newBuilder().fromProperties(props).build()); setDefaultOnCondition(props, !isCompactionConfigSet, - HoodieCompactionConfig.newBuilder().build()); + HoodieCompactionConfig.newBuilder().fromProperties(props).build()); setDefaultOnCondition(props, !isMetricsConfigSet, - HoodieMetricsConfig.newBuilder().build()); + HoodieMetricsConfig.newBuilder().fromProperties(props).build()); return config; } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java index 7814c3c8e..d1af640fe 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java @@ -105,4 +105,11 @@ public interface TableFileSystemView { */ Map> groupLatestDataFileWithLogFiles(String partitionPath) throws IOException; + /** + * Get the file Status for the path specified + * + * @param path + * @return + */ + FileStatus getFileStatus(String path); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java index 542906372..b65f3dba4 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java @@ -217,6 +217,15 @@ public class HoodieTableFileSystemView implements TableFileSystemView, Serializa Collectors.toMap(Pair::getKey, Pair::getRight))).orElseGet(Maps::newHashMap); } + @Override + public FileStatus getFileStatus(String path) { + try { + return fs.getFileStatus(new Path(path)); + } catch (IOException e) { + throw new HoodieIOException("Could not get FileStatus on path " + path); + } + } + protected Stream> getFilesByFileId(FileStatus[] files, String maxCommitTime) throws IOException { diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java index 7404e20ac..e0fe295e6 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java @@ -114,6 +114,7 @@ public class HoodieInputFormat extends MapredParquetInputFormat .collect(Collectors.toList()); for (HoodieDataFile filteredFile : filteredFiles) { LOG.info("Processing incremental hoodie file - " + filteredFile.getPath()); + filteredFile = checkFileStatus(fsView, filteredFile); returns.add(filteredFile.getFileStatus()); } LOG.info( @@ -126,6 +127,7 @@ public class HoodieInputFormat extends MapredParquetInputFormat if (LOG.isDebugEnabled()) { LOG.debug("Processing latest hoodie file - " + filteredFile.getPath()); } + filteredFile = checkFileStatus(fsView, filteredFile); returns.add(filteredFile.getFileStatus()); } } @@ -134,6 +136,24 @@ public class HoodieInputFormat extends MapredParquetInputFormat } + /** + * Checks the file status for a race condition which can set the file size to 0. + * 1. HiveInputFormat does super.listStatus() and gets back a FileStatus[] + * 2. Then it creates the HoodieTableMetaClient for the paths listed. + * 3. Generation of splits looks at FileStatus size to create splits, which skips this file + * + * @param fsView + * @param fileStatus + * @return + */ + private HoodieDataFile checkFileStatus(TableFileSystemView fsView, HoodieDataFile fileStatus) { + if(fileStatus.getFileSize() == 0) { + LOG.info("Refreshing file status " + fileStatus.getPath()); + return new HoodieDataFile(fsView.getFileStatus(fileStatus.getPath())); + } + return fileStatus; + } + private Map> groupFileStatus(FileStatus[] fileStatuses) throws IOException { // This assumes the paths for different tables are grouped together