
[HUDI-2789] Flink batch upsert for non partitioned table does not work (#4028)

Danny Chan
2021-11-18 13:59:03 +08:00
committed by GitHub
parent 2d3f2a3275
commit 71a2ae0fd6
4 changed files with 25 additions and 11 deletions

OptionsResolver.java

@@ -20,6 +20,7 @@ package org.apache.hudi.configuration;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.flink.configuration.Configuration;
@@ -92,4 +93,11 @@ public class OptionsResolver {
     final String strategy = conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toLowerCase(Locale.ROOT);
     return FlinkOptions.TIME_ELAPSED.equals(strategy) || FlinkOptions.NUM_OR_TIME.equals(strategy);
   }
+
+  /**
+   * Returns whether the table is partitioned.
+   */
+  public static boolean isPartitionedTable(Configuration conf) {
+    return FilePathUtils.extractPartitionKeys(conf).length > 0;
+  }
 }
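Note: a minimal sketch of the new helper's contract, assuming FilePathUtils.extractPartitionKeys derives the partition keys from the FlinkOptions.PARTITION_PATH_FIELD setting (that option name is an assumption here; verify against your Hudi version):

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;

// Sketch: non-partitioned tables yield no partition keys, so the helper returns false.
public class IsPartitionedDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "dt");
    System.out.println(OptionsResolver.isPartitionedTable(conf)); // expected: true

    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "");
    System.out.println(OptionsResolver.isPartitionedTable(conf)); // expected: false
  }
}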

Pipelines.java

@@ -20,6 +20,7 @@ package org.apache.hudi.sink.utils;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.sink.StreamWriteOperator;
 import org.apache.hudi.sink.append.AppendWriteOperator;
@@ -129,10 +130,10 @@ public class Pipelines {
     final boolean globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
     if (overwrite) {
       return rowDataToHoodieRecord(conf, rowType, dataStream);
-    } else if (bounded && !globalIndex) {
+    } else if (bounded && !globalIndex && OptionsResolver.isPartitionedTable(conf)) {
       return boundedBootstrap(conf, rowType, defaultParallelism, dataStream);
     } else {
-      return streamBootstrap(conf, rowType, defaultParallelism, dataStream);
+      return streamBootstrap(conf, rowType, defaultParallelism, dataStream, bounded);
     }
   }
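This guard change is the heart of the fix: a bounded (batch) write into a non-partitioned table used to take the boundedBootstrap branch, which assumes partition keys exist; it now falls through to streamBootstrap. A simplified restatement of the routing, for reading convenience only (not the actual code):

// Condensed sketch of Pipelines.bootstrap's routing after this hunk.
static String chooseBootstrap(boolean overwrite, boolean bounded, boolean globalIndex, boolean partitioned) {
  if (overwrite) {
    return "no bootstrap";        // INSERT OVERWRITE: plain RowData -> HoodieRecord conversion
  } else if (bounded && !globalIndex && partitioned) {
    return "bounded bootstrap";   // batch write into a partitioned table
  } else {
    return "stream bootstrap";    // streaming, global index, or non-partitioned batch upsert
  }
}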
@@ -140,10 +141,11 @@ public class Pipelines {
       Configuration conf,
       RowType rowType,
       int defaultParallelism,
-      DataStream<RowData> dataStream) {
+      DataStream<RowData> dataStream,
+      boolean bounded) {
     DataStream<HoodieRecord> dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream);
-    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) {
       dataStream1 = dataStream1
           .transform(
               "index_bootstrap",
@@ -161,13 +163,10 @@ public class Pipelines {
       RowType rowType,
       int defaultParallelism,
       DataStream<RowData> dataStream) {
-    final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
-    if (partitionFields.length > 0) {
-      RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
-      // shuffle by partition keys
-      dataStream = dataStream
-          .keyBy(rowDataKeyGen::getPartitionPath);
-    }
+    final RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+    // shuffle by partition keys
+    dataStream = dataStream
+        .keyBy(rowDataKeyGen::getPartitionPath);
     return rowDataToHoodieRecord(conf, rowType, dataStream)
         .transform(
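With non-partitioned batch upserts diverted to streamBootstrap, boundedBootstrap is only ever reached for partitioned tables, so the old partitionFields.length guard became dead code and the shuffle by partition path can run unconditionally. For intuition, a fragment under one assumption (that RowDataKeyGen.getPartitionPath returns the empty string for a non-partitioned table):

// If every row yielded the same (empty) partition path, this keyBy would send
// all records to a single subtask, the degenerate shuffle the old guard avoided.
KeyedStream<RowData, String> keyed = dataStream.keyBy(row -> "");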

HoodieDataSourceITCase.java

@@ -424,6 +424,8 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.TABLE_NAME, tableType.name())
+        .option("hoodie.parquet.small.file.limit", "0") // invalidate the small file strategy
+        .option("hoodie.parquet.max.file.size", "0")
         .noPartition()
         .end();
     tableEnv.executeSql(hoodieTableDDL);
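Per the inline comment, the two added options invalidate the small-file strategy: with a small-file limit of 0, no existing file qualifies for appends, so a second batch write must resolve existing records through the index, which is exactly the path this commit fixes. A hypothetical follow-up check (statements, schema, and values are illustrative, not taken from the test):

// Hypothetical batch-upsert assertion: writing the same record key twice should
// update in place, not duplicate. A (uuid, name, age, ts) schema is assumed here.
tableEnv.executeSql("insert into t1 values('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01')").await();
tableEnv.executeSql("insert into t1 values('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:02')").await();
// Expect exactly one row for 'id1', carrying age 24 after the second write.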

TestConfigurations.java

@@ -247,6 +247,11 @@ public class TestConfigurations {
       return this;
     }
 
+    public Sql option(String key, Object val) {
+      this.options.put(key, val.toString());
+      return this;
+    }
+
     public Sql options(Map<String, String> options) {
       this.options.putAll(options);
       return this;
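The new overload accepts any value and stringifies it, which is what lets the ITCase above pass raw hoodie.parquet.* keys without wrapping them in ConfigOption constants. Usage mirroring that test:

// Using the new option(String, Object) overload; non-string values go through toString().
String ddl = sql("t1")
    .option("hoodie.parquet.small.file.limit", 0)
    .option("hoodie.parquet.max.file.size", 0)
    .noPartition()
    .end();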