1
0

[HUDI-2094] Supports hive style partitioning for flink writer (#3178)

This commit is contained in:
Danny Chan
2021-06-29 15:34:26 +08:00
committed by GitHub
parent 0749cc826a
commit b8a8f572d6
8 changed files with 47 additions and 20 deletions

View File

@@ -142,12 +142,6 @@ public class FlinkOptions {
+ "2) payload_combine: read the base file records first, for each record in base file, checks whether the key is in the\n"
+ " log file records(combines the two records with same key for base and log file records), then read the left log file records");
public static final ConfigOption<Boolean> HIVE_STYLE_PARTITION = ConfigOptions
.key("hoodie.datasource.hive_style_partition")
.booleanType()
.defaultValue(false)
.withDescription("Whether the partition path is with Hive style, e.g. '{partition key}={partition value}', default false");
public static final ConfigOption<Boolean> UTC_TIMEZONE = ConfigOptions
.key("read.utc-timezone")
.booleanType()
@@ -260,12 +254,20 @@ public class FlinkOptions {
.withDescription("Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`.\n"
+ "Actual value obtained by invoking .toString(), default ''");
public static final ConfigOption<Boolean> PARTITION_PATH_URL_ENCODE = ConfigOptions
.key("write.partition.url_encode")
public static final ConfigOption<Boolean> URL_ENCODE_PARTITIONING = ConfigOptions
.key(KeyGeneratorOptions.URL_ENCODE_PARTITIONING_OPT_KEY)
.booleanType()
.defaultValue(false)
.withDescription("Whether to encode the partition path url, default false");
public static final ConfigOption<Boolean> HIVE_STYLE_PARTITIONING = ConfigOptions
.key(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY)
.booleanType()
.defaultValue(false)
.withDescription("Whether to use Hive style partitioning.\n"
+ "If set true, the names of partition folders follow <partition_column_name>=<partition_value> format.\n"
+ "By default false (the names of partition folders are only partition values)");
public static final ConfigOption<String> KEYGEN_CLASS = ConfigOptions
.key(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP)
.stringType()

View File

@@ -554,7 +554,7 @@ public class StreamWriteFunction<K, I, O>
&& this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
}
private String instantToWrite() {
private String instantToWrite(boolean hasData) {
String instant = this.writeClient.getLastPendingInstant(this.actionType);
// if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout threshold hits.
@@ -565,7 +565,7 @@ public class StreamWriteFunction<K, I, O>
// wait condition:
// 1. there is no inflight instant
// 2. the inflight instant does not change and the checkpoint has buffering data
while (instant == null || (instant.equals(this.currentInstant) && hasData())) {
while (instant == null || (instant.equals(this.currentInstant) && hasData)) {
// sleep for a while
try {
if (waitingTime > ckpTimeout) {
@@ -588,7 +588,7 @@ public class StreamWriteFunction<K, I, O>
@SuppressWarnings("unchecked, rawtypes")
private boolean flushBucket(DataBucket bucket) {
String instant = instantToWrite();
String instant = instantToWrite(true);
if (instant == null) {
// in case there are empty checkpoints that has no input data
@@ -619,7 +619,7 @@ public class StreamWriteFunction<K, I, O>
@SuppressWarnings("unchecked, rawtypes")
private void flushRemaining(boolean isEndInput) {
this.currentInstant = instantToWrite();
this.currentInstant = instantToWrite(hasData());
if (this.currentInstant == null) {
// in case there are empty checkpoints that has no input data
throw new HoodieException("No inflight instant when flushing data!");

View File

@@ -83,7 +83,7 @@ public class HiveSyncContext {
hiveSyncConfig.ignoreExceptions = conf.getBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS);
hiveSyncConfig.supportTimestamp = conf.getBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP);
hiveSyncConfig.autoCreateDatabase = conf.getBoolean(FlinkOptions.HIVE_SYNC_AUTO_CREATE_DB);
hiveSyncConfig.decodePartition = conf.getBoolean(FlinkOptions.PARTITION_PATH_URL_ENCODE);
hiveSyncConfig.decodePartition = conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING);
hiveSyncConfig.skipROSuffix = conf.getBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX);
hiveSyncConfig.assumeDatePartitioning = conf.getBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION);
return hiveSyncConfig;

View File

@@ -164,6 +164,11 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--write-partition-url-encode"}, description = "Whether to encode the partition path url, default false")
public Boolean writePartitionUrlEncode;
@Parameter(names = {"--hive-style-partitioning"}, description = "Whether to use Hive style partitioning.\n"
+ "If set true, the names of partition folders follow <partition_column_name>=<partition_value> format.\n"
+ "By default false (the names of partition folders are only partition values)")
public Boolean hiveStylePartitioning = false;
@Parameter(names = {"--write-task-max-size"}, description = "Maximum memory in MB for a write task, when the threshold hits,\n"
+ "it flushes the max size data bucket to avoid OOM, default 1GB")
public Double writeTaskMaxSize = 1024D;
@@ -314,7 +319,8 @@ public class FlinkStreamerConfig extends Configuration {
conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, config.avroSchemaPath);
conf.setString(FlinkOptions.READ_AVRO_SCHEMA, config.avroSchema);
conf.setBoolean(FlinkOptions.UTC_TIMEZONE, config.utcTimezone);
conf.setBoolean(FlinkOptions.PARTITION_PATH_URL_ENCODE, config.writePartitionUrlEncode);
conf.setBoolean(FlinkOptions.URL_ENCODE_PARTITIONING, config.writePartitionUrlEncode);
conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, config.hiveStylePartitioning);
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, config.writeTaskMaxSize);
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, config.writeBatchSize);
conf.setInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE, config.writeLogBlockSize);

View File

@@ -218,7 +218,7 @@ public class HoodieTableSource implements
@Override
public Optional<List<Map<String, String>>> listPartitions() {
List<Map<String, String>> partitions = FilePathUtils.getPartitions(path, hadoopConf,
partitionKeys, defaultPartName, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION));
partitionKeys, defaultPartName, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
return Optional.of(partitions);
}
@@ -446,7 +446,7 @@ public class HoodieTableSource implements
return partitionKeys.isEmpty()
? new Path[] {path}
: FilePathUtils.partitionPath2ReadPath(path, partitionKeys, getOrFetchPartitions(),
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION));
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
}
private static class LatestFileFilter extends FilePathFilter {

View File

@@ -347,7 +347,7 @@ public class FilePathUtils {
return new Path[] {path};
} else {
final String defaultParName = conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME);
final boolean hivePartition = conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION);
final boolean hivePartition = conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING);
List<Map<String, String>> partitionPaths =
getPartitions(path, hadoopConf, partitionKeys, defaultParName, hivePartition);
return partitionPath2ReadPath(path, partitionKeys, partitionPaths, hivePartition);

View File

@@ -272,7 +272,7 @@ public class MergeOnReadInputFormat
// generate partition specs.
LinkedHashMap<String, String> partSpec = FilePathUtils.extractPartitionKeyValues(
new org.apache.hadoop.fs.Path(path).getParent(),
this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION),
this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING),
FilePathUtils.extractPartitionKeys(this.conf));
LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
partSpec.forEach((k, v) -> partObjects.put(k, restorePartValueFromType(

View File

@@ -45,7 +45,9 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
import java.util.Collection;
@@ -56,6 +58,7 @@ import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.utils.TestData.assertRowsEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -254,11 +257,14 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
}
@ParameterizedTest
@EnumSource(value = ExecMode.class)
void testWriteAndRead(ExecMode execMode) {
@MethodSource("configParams")
void testWriteAndRead(ExecMode execMode, boolean hiveStylePartitioning) {
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
if (hiveStylePartitioning) {
options.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true");
}
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
tableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 values\n"
@@ -576,6 +582,19 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
BATCH, STREAM
}
/**
* Return test params => (execution mode, hive style partitioning).
*/
private static Stream<Arguments> configParams() {
Object[][] data =
new Object[][] {
{ExecMode.BATCH, false},
{ExecMode.BATCH, true},
{ExecMode.STREAM, false},
{ExecMode.STREAM, true}};
return Stream.of(data).map(Arguments::of);
}
private void execInsertSql(TableEnvironment tEnv, String insert) {
TableResult tableResult = tEnv.executeSql(insert);
// wait to finish