Make sure properties set in HoodieWriteConfig are propagated down to the individual configs. Fix a race condition that lets InputFormat think a file's size is 0 when it actually is not
This commit is contained in:
committed by
prazanna
parent
91b088f29f
commit
8974e11161
@@ -114,6 +114,12 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Copies every entry of the supplied properties into this builder's own
 * properties, so values configured elsewhere are carried into the config
 * being built.
 *
 * @param props properties to copy into this builder
 * @return this builder, to allow call chaining
 */
public Builder fromProperties(Properties props) {
|
||||||
|
this.props.putAll(props);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public Builder withAutoClean(Boolean autoClean) {
|
public Builder withAutoClean(Boolean autoClean) {
|
||||||
props.setProperty(AUTO_CLEAN_PROP, String.valueOf(autoClean));
|
props.setProperty(AUTO_CLEAN_PROP, String.valueOf(autoClean));
|
||||||
return this;
|
return this;
|
||||||
|
|||||||
@@ -64,6 +64,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Copies every entry of the supplied properties into this builder's own
 * properties, so values configured elsewhere are carried into the config
 * being built.
 *
 * @param props properties to copy into this builder
 * @return this builder, to allow call chaining
 */
public Builder fromProperties(Properties props) {
|
||||||
|
this.props.putAll(props);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public Builder withIndexType(HoodieIndex.IndexType indexType) {
|
public Builder withIndexType(HoodieIndex.IndexType indexType) {
|
||||||
props.setProperty(INDEX_TYPE_PROP, indexType.name());
|
props.setProperty(INDEX_TYPE_PROP, indexType.name());
|
||||||
return this;
|
return this;
|
||||||
|
|||||||
@@ -68,6 +68,12 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Copies every entry of the supplied properties into this builder's own
 * properties, so values configured elsewhere are carried into the config
 * being built.
 *
 * @param props properties to copy into this builder
 * @return this builder, to allow call chaining
 */
public Builder fromProperties(Properties props) {
|
||||||
|
this.props.putAll(props);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public Builder on(boolean metricsOn) {
|
public Builder on(boolean metricsOn) {
|
||||||
props.setProperty(METRICS_ON, String.valueOf(metricsOn));
|
props.setProperty(METRICS_ON, String.valueOf(metricsOn));
|
||||||
return this;
|
return this;
|
||||||
|
|||||||
@@ -55,6 +55,11 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Copies every entry of the supplied properties into this builder's own
 * properties, so values configured elsewhere are carried into the config
 * being built.
 *
 * @param props properties to copy into this builder
 * @return this builder, to allow call chaining
 */
public Builder fromProperties(Properties props) {
|
||||||
|
this.props.putAll(props);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public Builder limitFileSize(int maxFileSize) {
|
public Builder limitFileSize(int maxFileSize) {
|
||||||
props.setProperty(PARQUET_FILE_MAX_BYTES, String.valueOf(maxFileSize));
|
props.setProperty(PARQUET_FILE_MAX_BYTES, String.valueOf(maxFileSize));
|
||||||
return this;
|
return this;
|
||||||
|
|||||||
@@ -361,14 +361,15 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
setDefaultOnCondition(props, !props.containsKey(HOODIE_ASSUME_DATE_PARTITIONING_PROP),
|
setDefaultOnCondition(props, !props.containsKey(HOODIE_ASSUME_DATE_PARTITIONING_PROP),
|
||||||
HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING);
|
HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING);
|
||||||
|
|
||||||
|
// Make sure the props are propagated to the individual configs
|
||||||
setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().build());
|
setDefaultOnCondition(props, !isIndexConfigSet,
|
||||||
|
HoodieIndexConfig.newBuilder().fromProperties(props).build());
|
||||||
setDefaultOnCondition(props, !isStorageConfigSet,
|
setDefaultOnCondition(props, !isStorageConfigSet,
|
||||||
HoodieStorageConfig.newBuilder().build());
|
HoodieStorageConfig.newBuilder().fromProperties(props).build());
|
||||||
setDefaultOnCondition(props, !isCompactionConfigSet,
|
setDefaultOnCondition(props, !isCompactionConfigSet,
|
||||||
HoodieCompactionConfig.newBuilder().build());
|
HoodieCompactionConfig.newBuilder().fromProperties(props).build());
|
||||||
setDefaultOnCondition(props, !isMetricsConfigSet,
|
setDefaultOnCondition(props, !isMetricsConfigSet,
|
||||||
HoodieMetricsConfig.newBuilder().build());
|
HoodieMetricsConfig.newBuilder().fromProperties(props).build());
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -105,4 +105,11 @@ public interface TableFileSystemView {
|
|||||||
*/
|
*/
|
||||||
Map<HoodieDataFile, List<HoodieLogFile>> groupLatestDataFileWithLogFiles(String partitionPath) throws IOException;
|
Map<HoodieDataFile, List<HoodieLogFile>> groupLatestDataFileWithLogFiles(String partitionPath) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the file status for the specified path.
|
||||||
|
*
|
||||||
|
* @param path path of the file whose status is requested
|
||||||
|
* @return the {@link FileStatus} for {@code path}
|
||||||
|
*/
|
||||||
|
FileStatus getFileStatus(String path);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -217,6 +217,15 @@ public class HoodieTableFileSystemView implements TableFileSystemView, Serializa
|
|||||||
Collectors.toMap(Pair::getKey, Pair::getRight))).orElseGet(Maps::newHashMap);
|
Collectors.toMap(Pair::getKey, Pair::getRight))).orElseGet(Maps::newHashMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FileStatus getFileStatus(String path) {
|
||||||
|
try {
|
||||||
|
return fs.getFileStatus(new Path(path));
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieIOException("Could not get FileStatus on path " + path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
protected Stream<List<HoodieDataFile>> getFilesByFileId(FileStatus[] files,
|
protected Stream<List<HoodieDataFile>> getFilesByFileId(FileStatus[] files,
|
||||||
String maxCommitTime) throws IOException {
|
String maxCommitTime) throws IOException {
|
||||||
|
|||||||
@@ -114,6 +114,7 @@ public class HoodieInputFormat extends MapredParquetInputFormat
|
|||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
for (HoodieDataFile filteredFile : filteredFiles) {
|
for (HoodieDataFile filteredFile : filteredFiles) {
|
||||||
LOG.info("Processing incremental hoodie file - " + filteredFile.getPath());
|
LOG.info("Processing incremental hoodie file - " + filteredFile.getPath());
|
||||||
|
filteredFile = checkFileStatus(fsView, filteredFile);
|
||||||
returns.add(filteredFile.getFileStatus());
|
returns.add(filteredFile.getFileStatus());
|
||||||
}
|
}
|
||||||
LOG.info(
|
LOG.info(
|
||||||
@@ -126,6 +127,7 @@ public class HoodieInputFormat extends MapredParquetInputFormat
|
|||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
|
LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
|
||||||
}
|
}
|
||||||
|
filteredFile = checkFileStatus(fsView, filteredFile);
|
||||||
returns.add(filteredFile.getFileStatus());
|
returns.add(filteredFile.getFileStatus());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -134,6 +136,24 @@ public class HoodieInputFormat extends MapredParquetInputFormat
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks the file status for a race condition which can set the file size to 0.
|
||||||
|
* 1. HiveInputFormat does super.listStatus() and gets back a FileStatus[]
|
||||||
|
* 2. Then it creates the HoodieTableMetaClient for the paths listed.
|
||||||
|
* 3. Generation of splits looks at FileStatus size to create splits, which skips this file
|
||||||
|
*
|
||||||
|
* @param fsView
|
||||||
|
* @param fileStatus
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private HoodieDataFile checkFileStatus(TableFileSystemView fsView, HoodieDataFile fileStatus) {
|
||||||
|
if(fileStatus.getFileSize() == 0) {
|
||||||
|
LOG.info("Refreshing file status " + fileStatus.getPath());
|
||||||
|
return new HoodieDataFile(fsView.getFileStatus(fileStatus.getPath()));
|
||||||
|
}
|
||||||
|
return fileStatus;
|
||||||
|
}
|
||||||
|
|
||||||
private Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatus(FileStatus[] fileStatuses)
|
private Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatus(FileStatus[] fileStatuses)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// This assumes the paths for different tables are grouped together
|
// This assumes the paths for different tables are grouped together
|
||||||
|
|||||||
Reference in New Issue
Block a user