1
0

[HUDI-2790] Fix the changelog mode of HoodieTableSource (#4029)

This commit is contained in:
Danny Chan
2021-11-18 16:40:48 +08:00
committed by GitHub
parent 71a2ae0fd6
commit 8772cec4bd
3 changed files with 16 additions and 7 deletions

View File

@@ -100,4 +100,14 @@ public class OptionsResolver {
public static boolean isPartitionedTable(Configuration conf) {
return FilePathUtils.extractPartitionKeys(conf).length > 0;
}
/**
* Returns whether the source should emit changelog.
*
* @return true if the source is read as streaming with changelog mode enabled
*/
public static boolean emitChangelog(Configuration conf) {
return conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
&& conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
}
}

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieROTablePathFilter;
import org.apache.hudi.source.FileIndex;
@@ -196,11 +197,9 @@ public class HoodieTableSource implements
@Override
public ChangelogMode getChangelogMode() {
return conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)
? ChangelogModes.FULL
// when all the changes are persisted or read as batch,
// use INSERT mode.
: ChangelogMode.insertOnly();
// when read as streaming and changelog mode is enabled, emit as FULL mode;
// when all the changes are compacted or read as batch, emit as INSERT mode.
return OptionsResolver.emitChangelog(conf) ? ChangelogModes.FULL : ChangelogMode.insertOnly();
}
@Override

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.table.format.FilePathUtils;
@@ -179,8 +180,7 @@ public class MergeOnReadInputFormat
}
} else if (!split.getBasePath().isPresent()) {
// log files only
if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
&& conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) {
if (OptionsResolver.emitChangelog(conf)) {
this.iterator = new LogFileOnlyIterator(getUnMergedLogFileIterator(split));
} else {
this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));