1
0

[HUDI-3993] Replacing UDF in Bulk Insert w/ RDD transformation (#5470)

This commit is contained in:
Alexey Kudinkin
2022-07-21 06:20:47 -07:00
committed by GitHub
parent c7fe3fd01d
commit a33bdd32e3
41 changed files with 1180 additions and 870 deletions

View File

@@ -18,6 +18,9 @@
package org.apache.hudi;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
@@ -41,10 +44,6 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -293,7 +292,7 @@ public class DataSourceUtils {
// - {@code HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED} has not been explicitly
// set by the writer
//
// If both of these conditions are true, than we override the default value of {@code
// If both of these conditions are true, then we override the default value of {@code
// HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED} and set it to "true"
LOG.warn("Small Decimal Type found in the persisted schema, reverting default value of 'hoodie.parquet.writelegacyformat.enabled' to true");
properties.put(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key(), "true");

View File

@@ -1,189 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.ComplexKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import scala.collection.JavaConverters;
import static org.apache.spark.sql.functions.callUDF;
/**
* Helper class to assist in preparing {@link Dataset<Row>}s for bulk insert with datasource implementation.
*/
public class HoodieDatasetBulkInsertHelper {
private static final Logger LOG = LogManager.getLogger(HoodieDatasetBulkInsertHelper.class);
private static final String RECORD_KEY_UDF_FN = "hudi_recordkey_gen_function_";
private static final String PARTITION_PATH_UDF_FN = "hudi_partition_gen_function_";
/**
* Prepares input hoodie spark dataset for bulk insert. It does the following steps.
* 1. Uses KeyGenerator to generate hoodie record keys and partition path.
* 2. Add hoodie columns to input spark dataset.
* 3. Reorders input dataset columns so that hoodie columns appear in the beginning.
* 4. Sorts input dataset by hoodie partition path and record key
*
* @param sqlContext SQL Context
* @param config Hoodie Write Config
* @param rows Spark Input dataset
* @return hoodie dataset which is ready for bulk insert.
*/
public static Dataset<Row> prepareHoodieDatasetForBulkInsert(SQLContext sqlContext,
HoodieWriteConfig config, Dataset<Row> rows, String structName, String recordNamespace,
BulkInsertPartitioner<Dataset<Row>> bulkInsertPartitionerRows,
boolean isGlobalIndex, boolean dropPartitionColumns) {
List<Column> originalFields =
Arrays.stream(rows.schema().fields()).map(f -> new Column(f.name())).collect(Collectors.toList());
TypedProperties properties = new TypedProperties();
properties.putAll(config.getProps());
String keyGeneratorClass = properties.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key());
String recordKeyFields = properties.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
String partitionPathFields = properties.containsKey(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
? properties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()) : "";
BuiltinKeyGenerator keyGenerator = (BuiltinKeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, properties);
Dataset<Row> rowDatasetWithRecordKeysAndPartitionPath;
if (keyGeneratorClass.equals(NonpartitionedKeyGenerator.class.getName())) {
// for non partitioned, set partition path to empty.
rowDatasetWithRecordKeysAndPartitionPath = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, functions.col(recordKeyFields))
.withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, functions.lit("").cast(DataTypes.StringType));
} else if (keyGeneratorClass.equals(SimpleKeyGenerator.class.getName())
|| (keyGeneratorClass.equals(ComplexKeyGenerator.class.getName()) && !recordKeyFields.contains(",") && !partitionPathFields.contains(",")
&& (!partitionPathFields.contains("timestamp")))) { // incase of ComplexKeyGen, check partition path type.
// simple fields for both record key and partition path: can directly use withColumn
String partitionPathField = keyGeneratorClass.equals(SimpleKeyGenerator.class.getName()) ? partitionPathFields :
partitionPathFields.substring(partitionPathFields.indexOf(":") + 1);
rowDatasetWithRecordKeysAndPartitionPath = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, functions.col(recordKeyFields).cast(DataTypes.StringType))
.withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, functions.col(partitionPathField).cast(DataTypes.StringType));
} else {
// use udf
String tableName = properties.getString(HoodieWriteConfig.TBL_NAME.key());
String recordKeyUdfFn = RECORD_KEY_UDF_FN + tableName;
String partitionPathUdfFn = PARTITION_PATH_UDF_FN + tableName;
sqlContext.udf().register(recordKeyUdfFn, (UDF1<Row, String>) keyGenerator::getRecordKey, DataTypes.StringType);
sqlContext.udf().register(partitionPathUdfFn, (UDF1<Row, String>) keyGenerator::getPartitionPath, DataTypes.StringType);
final Dataset<Row> rowDatasetWithRecordKeys = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD,
callUDF(recordKeyUdfFn, org.apache.spark.sql.functions.struct(
JavaConverters.collectionAsScalaIterableConverter(originalFields).asScala().toSeq())));
rowDatasetWithRecordKeysAndPartitionPath =
rowDatasetWithRecordKeys.withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD,
callUDF(partitionPathUdfFn,
org.apache.spark.sql.functions.struct(
JavaConverters.collectionAsScalaIterableConverter(originalFields).asScala().toSeq())));
}
// Add other empty hoodie fields which will be populated before writing to parquet.
Dataset<Row> rowDatasetWithHoodieColumns =
rowDatasetWithRecordKeysAndPartitionPath.withColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD,
functions.lit("").cast(DataTypes.StringType))
.withColumn(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
functions.lit("").cast(DataTypes.StringType))
.withColumn(HoodieRecord.FILENAME_METADATA_FIELD,
functions.lit("").cast(DataTypes.StringType));
Dataset<Row> processedDf = rowDatasetWithHoodieColumns;
if (dropPartitionColumns) {
String partitionColumns = String.join(",", keyGenerator.getPartitionPathFields());
for (String partitionField : keyGenerator.getPartitionPathFields()) {
originalFields.remove(new Column(partitionField));
}
processedDf = rowDatasetWithHoodieColumns.drop(partitionColumns);
}
Dataset<Row> dedupedDf = processedDf;
if (config.shouldCombineBeforeInsert()) {
dedupedDf = SparkRowWriteHelper.newInstance().deduplicateRows(processedDf, config.getPreCombineField(), isGlobalIndex);
}
List<Column> orderedFields = Stream.concat(HoodieRecord.HOODIE_META_COLUMNS.stream().map(Column::new),
originalFields.stream()).collect(Collectors.toList());
Dataset<Row> colOrderedDataset = dedupedDf.select(
JavaConverters.collectionAsScalaIterableConverter(orderedFields).asScala().toSeq());
return bulkInsertPartitionerRows.repartitionRecords(colOrderedDataset, config.getBulkInsertShuffleParallelism());
}
/**
* Add empty meta fields and reorder such that meta fields are at the beginning.
*
* @param rows
* @return
*/
public static Dataset<Row> prepareHoodieDatasetForBulkInsertWithoutMetaFields(Dataset<Row> rows) {
// add empty meta cols.
Dataset<Row> rowsWithMetaCols = rows
.withColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD,
functions.lit("").cast(DataTypes.StringType))
.withColumn(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
functions.lit("").cast(DataTypes.StringType))
.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD,
functions.lit("").cast(DataTypes.StringType))
.withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD,
functions.lit("").cast(DataTypes.StringType))
.withColumn(HoodieRecord.FILENAME_METADATA_FIELD,
functions.lit("").cast(DataTypes.StringType));
List<Column> originalFields =
Arrays.stream(rowsWithMetaCols.schema().fields())
.filter(field -> !HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(field.name()))
.map(f -> new Column(f.name())).collect(Collectors.toList());
List<Column> metaFields =
Arrays.stream(rowsWithMetaCols.schema().fields())
.filter(field -> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(field.name()))
.map(f -> new Column(f.name())).collect(Collectors.toList());
// reorder such that all meta columns are at the beginning followed by original columns
List<Column> allCols = new ArrayList<>();
allCols.addAll(metaFields);
allCols.addAll(originalFields);
return rowsWithMetaCols.select(
JavaConverters.collectionAsScalaIterableConverter(allCols).asScala().toSeq());
}
}

View File

@@ -1,72 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import java.util.List;
import java.util.stream.Collectors;
/**
* Helper class to assist in deduplicating Rows for BulkInsert with Rows.
*/
public class SparkRowWriteHelper {
private SparkRowWriteHelper() {
}
private static class WriteHelperHolder {
private static final SparkRowWriteHelper SPARK_WRITE_HELPER = new SparkRowWriteHelper();
}
public static SparkRowWriteHelper newInstance() {
return SparkRowWriteHelper.WriteHelperHolder.SPARK_WRITE_HELPER;
}
public Dataset<Row> deduplicateRows(Dataset<Row> inputDf, String preCombineField, boolean isGlobalIndex) {
return inputDf.groupByKey((MapFunction<Row, String>) value ->
isGlobalIndex
? (value.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD))
: (value.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + value.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD)), Encoders.STRING())
.reduceGroups((ReduceFunction<Row>) (v1, v2) ->
((Comparable) v1.getAs(preCombineField)).compareTo(v2.getAs(preCombineField)) >= 0 ? v1 : v2)
.map((MapFunction<Tuple2<String, Row>, Row>) value -> value._2, getEncoder(inputDf.schema()));
}
private ExpressionEncoder getEncoder(StructType schema) {
List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream()
.map(Attribute::toAttribute).collect(Collectors.toList());
return RowEncoder.apply(schema)
.resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(),
SimpleAnalyzer$.MODULE$);
}
}

View File

@@ -27,18 +27,17 @@ import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.row.HoodieRowCreateHandle;
import org.apache.hudi.io.storage.row.HoodieRowCreateHandleWithoutMetaFields;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import java.io.IOException;
import java.util.ArrayList;
@@ -64,16 +63,20 @@ public class BulkInsertDataInternalWriterHelper {
private final StructType structType;
private final Boolean arePartitionRecordsSorted;
private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
private HoodieRowCreateHandle handle;
private String lastKnownPartitionPath = null;
private String fileIdPrefix;
private int numFilesWritten = 0;
private Map<String, HoodieRowCreateHandle> handles = new HashMap<>();
private final String fileIdPrefix;
private final Map<String, HoodieRowCreateHandle> handles = new HashMap<>();
private final boolean populateMetaFields;
private Option<BuiltinKeyGenerator> keyGeneratorOpt = null;
private boolean simpleKeyGen = false;
private int simplePartitionFieldIndex = -1;
private DataType simplePartitionFieldDataType;
private final Option<BuiltinKeyGenerator> keyGeneratorOpt;
private final boolean simpleKeyGen;
private final int simplePartitionFieldIndex;
private final DataType simplePartitionFieldDataType;
/**
* NOTE: This is stored as Catalyst's internal {@link UTF8String} to avoid
* conversion (deserialization) b/w {@link UTF8String} and {@link String}
*/
private String lastKnownPartitionPath = null;
private HoodieRowCreateHandle handle;
private int numFilesWritten = 0;
public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType,
@@ -88,13 +91,21 @@ public class BulkInsertDataInternalWriterHelper {
this.populateMetaFields = populateMetaFields;
this.arePartitionRecordsSorted = arePartitionRecordsSorted;
this.fileIdPrefix = UUID.randomUUID().toString();
if (!populateMetaFields) {
this.keyGeneratorOpt = getKeyGenerator(writeConfig.getProps());
if (keyGeneratorOpt.isPresent() && keyGeneratorOpt.get() instanceof SimpleKeyGenerator) {
simpleKeyGen = true;
simplePartitionFieldIndex = (Integer) structType.getFieldIndex((keyGeneratorOpt.get()).getPartitionPathFields().get(0)).get();
simplePartitionFieldDataType = structType.fields()[simplePartitionFieldIndex].dataType();
}
} else {
this.keyGeneratorOpt = Option.empty();
}
if (keyGeneratorOpt.isPresent() && keyGeneratorOpt.get() instanceof SimpleKeyGenerator) {
this.simpleKeyGen = true;
this.simplePartitionFieldIndex = (Integer) structType.getFieldIndex(keyGeneratorOpt.get().getPartitionPathFields().get(0)).get();
this.simplePartitionFieldDataType = structType.fields()[simplePartitionFieldIndex].dataType();
} else {
this.simpleKeyGen = false;
this.simplePartitionFieldIndex = -1;
this.simplePartitionFieldDataType = null;
}
}
@@ -120,32 +131,16 @@ public class BulkInsertDataInternalWriterHelper {
}
}
public void write(InternalRow record) throws IOException {
public void write(InternalRow row) throws IOException {
try {
String partitionPath = null;
if (populateMetaFields) { // usual path where meta fields are pre populated in prep step.
partitionPath = String.valueOf(record.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_POS));
} else { // if meta columns are disabled.
if (!keyGeneratorOpt.isPresent()) { // NoPartitionerKeyGen
partitionPath = "";
} else if (simpleKeyGen) { // SimpleKeyGen
Object parititionPathValue = record.get(simplePartitionFieldIndex, simplePartitionFieldDataType);
partitionPath = parititionPathValue != null ? parititionPathValue.toString() : PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
if (writeConfig.isHiveStylePartitioningEnabled()) {
partitionPath = (keyGeneratorOpt.get()).getPartitionPathFields().get(0) + "=" + partitionPath;
}
} else {
// only BuiltIn key generators are supported if meta fields are disabled.
partitionPath = keyGeneratorOpt.get().getPartitionPath(record, structType);
}
}
if ((lastKnownPartitionPath == null) || !lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) {
String partitionPath = extractPartitionPath(row);
if (lastKnownPartitionPath == null || !lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) {
LOG.info("Creating new file for partition path " + partitionPath);
handle = getRowCreateHandle(partitionPath);
lastKnownPartitionPath = partitionPath;
}
handle.write(record);
handle.write(row);
} catch (Throwable t) {
LOG.error("Global error thrown while trying to write records in HoodieRowCreateHandle ", t);
throw t;
@@ -157,30 +152,7 @@ public class BulkInsertDataInternalWriterHelper {
return writeStatusList;
}
public void abort() {
}
private HoodieRowCreateHandle getRowCreateHandle(String partitionPath) throws IOException {
if (!handles.containsKey(partitionPath)) { // if there is no handle corresponding to the partition path
// if records are sorted, we can close all existing handles
if (arePartitionRecordsSorted) {
close();
}
HoodieRowCreateHandle rowCreateHandle = populateMetaFields ? new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
instantTime, taskPartitionId, taskId, taskEpochId, structType) : new HoodieRowCreateHandleWithoutMetaFields(hoodieTable, writeConfig, partitionPath, getNextFileId(),
instantTime, taskPartitionId, taskId, taskEpochId, structType);
handles.put(partitionPath, rowCreateHandle);
} else if (!handles.get(partitionPath).canWrite()) {
// even if there is a handle to the partition path, it could have reached its max size threshold. So, we close the handle here and
// create a new one.
writeStatusList.add(handles.remove(partitionPath).close());
HoodieRowCreateHandle rowCreateHandle = populateMetaFields ? new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
instantTime, taskPartitionId, taskId, taskEpochId, structType) : new HoodieRowCreateHandleWithoutMetaFields(hoodieTable, writeConfig, partitionPath, getNextFileId(),
instantTime, taskPartitionId, taskId, taskEpochId, structType);
handles.put(partitionPath, rowCreateHandle);
}
return handles.get(partitionPath);
}
public void abort() {}
public void close() throws IOException {
for (HoodieRowCreateHandle rowCreateHandle : handles.values()) {
@@ -190,6 +162,56 @@ public class BulkInsertDataInternalWriterHelper {
handle = null;
}
private String extractPartitionPath(InternalRow row) {
String partitionPath;
if (populateMetaFields) {
// In case meta-fields are materialized w/in the table itself, we can just simply extract
// partition path from there
//
// NOTE: Helper keeps track of [[lastKnownPartitionPath]] as [[UTF8String]] to avoid
// conversion from Catalyst internal representation into a [[String]]
partitionPath = row.getString(HoodieRecord.PARTITION_PATH_META_FIELD_POS);
} else if (keyGeneratorOpt.isPresent()) {
// TODO(HUDI-4039) this should be handled by the SimpleKeyGenerator itself
if (simpleKeyGen) {
String partitionPathValue = row.get(simplePartitionFieldIndex, simplePartitionFieldDataType).toString();
partitionPath = partitionPathValue != null ? partitionPathValue : PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
if (writeConfig.isHiveStylePartitioningEnabled()) {
partitionPath = (keyGeneratorOpt.get()).getPartitionPathFields().get(0) + "=" + partitionPath;
}
} else {
// only BuiltIn key generators are supported if meta fields are disabled.
partitionPath = keyGeneratorOpt.get().getPartitionPath(row, structType);
}
} else {
partitionPath = "";
}
return partitionPath;
}
private HoodieRowCreateHandle getRowCreateHandle(String partitionPath) throws IOException {
if (!handles.containsKey(partitionPath)) { // if there is no handle corresponding to the partition path
// if records are sorted, we can close all existing handles
if (arePartitionRecordsSorted) {
close();
}
HoodieRowCreateHandle rowCreateHandle = createHandle(partitionPath);
handles.put(partitionPath, rowCreateHandle);
} else if (!handles.get(partitionPath).canWrite()) {
// even if there is a handle to the partition path, it could have reached its max size threshold. So, we close the handle here and
// create a new one.
writeStatusList.add(handles.remove(partitionPath).close());
HoodieRowCreateHandle rowCreateHandle = createHandle(partitionPath);
handles.put(partitionPath, rowCreateHandle);
}
return handles.get(partitionPath);
}
private HoodieRowCreateHandle createHandle(String partitionPath) {
return new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
instantTime, taskPartitionId, taskId, taskEpochId, structType, populateMetaFields);
}
private String getNextFileId() {
return String.format("%s-%d", fileIdPrefix, numFilesWritten++);
}

View File

@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.hudi.client.model.HoodieInternalRow
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.util.ReflectionUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.index.SparkHoodieIndexFactory
import org.apache.hudi.keygen.BuiltinKeyGenerator
import org.apache.hudi.table.BulkInsertPartitioner
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.HoodieUnsafeRDDUtils.createDataFrame
import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeRDDUtils, Row}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.mutable
object HoodieDatasetBulkInsertHelper extends Logging {
/**
* Prepares [[DataFrame]] for bulk-insert into Hudi table, taking following steps:
*
* <ol>
* <li>Invoking configured [[KeyGenerator]] to produce record key, alas partition-path value</li>
* <li>Prepends Hudi meta-fields to every row in the dataset</li>
* <li>Dedupes rows (if necessary)</li>
* <li>Partitions dataset using provided [[partitioner]]</li>
* </ol>
*/
def prepareForBulkInsert(df: DataFrame,
config: HoodieWriteConfig,
partitioner: BulkInsertPartitioner[Dataset[Row]],
shouldDropPartitionColumns: Boolean): Dataset[Row] = {
val populateMetaFields = config.populateMetaFields()
val schema = df.schema
val keyGeneratorClassName = config.getStringOrThrow(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME,
"Key-generator class name is required")
val prependedRdd: RDD[InternalRow] =
df.queryExecution.toRdd.mapPartitions { iter =>
val keyGenerator =
ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps))
.asInstanceOf[BuiltinKeyGenerator]
iter.map { row =>
val (recordKey, partitionPath) =
if (populateMetaFields) {
(UTF8String.fromString(keyGenerator.getRecordKey(row, schema)),
UTF8String.fromString(keyGenerator.getPartitionPath(row, schema)))
} else {
(UTF8String.EMPTY_UTF8, UTF8String.EMPTY_UTF8)
}
val commitTimestamp = UTF8String.EMPTY_UTF8
val commitSeqNo = UTF8String.EMPTY_UTF8
val filename = UTF8String.EMPTY_UTF8
// TODO use mutable row, avoid re-allocating
new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, partitionPath, filename, row, false)
}
}
val metaFields = Seq(
StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType),
StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType),
StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType),
StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType))
val updatedSchema = StructType(metaFields ++ schema.fields)
val updatedDF = if (populateMetaFields && config.shouldCombineBeforeInsert) {
val dedupedRdd = dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config))
HoodieUnsafeRDDUtils.createDataFrame(df.sparkSession, dedupedRdd, updatedSchema)
} else {
HoodieUnsafeRDDUtils.createDataFrame(df.sparkSession, prependedRdd, updatedSchema)
}
val trimmedDF = if (shouldDropPartitionColumns) {
dropPartitionColumns(updatedDF, config)
} else {
updatedDF
}
partitioner.repartitionRecords(trimmedDF, config.getBulkInsertShuffleParallelism)
}
private def dedupeRows(rdd: RDD[InternalRow], schema: StructType, preCombineFieldRef: String, isGlobalIndex: Boolean): RDD[InternalRow] = {
val recordKeyMetaFieldOrd = schema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)
val partitionPathMetaFieldOrd = schema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
// NOTE: Pre-combine field could be a nested field
val preCombineFieldPath = composeNestedFieldPath(schema, preCombineFieldRef)
rdd.map { row =>
val rowKey = if (isGlobalIndex) {
row.getString(recordKeyMetaFieldOrd)
} else {
val partitionPath = row.getString(partitionPathMetaFieldOrd)
val recordKey = row.getString(recordKeyMetaFieldOrd)
s"$partitionPath:$recordKey"
}
// NOTE: It's critical whenever we keep the reference to the row, to make a copy
// since Spark might be providing us with a mutable copy (updated during the iteration)
(rowKey, row.copy())
}
.reduceByKey {
(oneRow, otherRow) =>
val onePreCombineVal = getNestedInternalRowValue(oneRow, preCombineFieldPath).asInstanceOf[Comparable[AnyRef]]
val otherPreCombineVal = getNestedInternalRowValue(otherRow, preCombineFieldPath).asInstanceOf[Comparable[AnyRef]]
if (onePreCombineVal.compareTo(otherPreCombineVal.asInstanceOf[AnyRef]) >= 0) {
oneRow
} else {
otherRow
}
}
.values
}
private def dropPartitionColumns(df: DataFrame, config: HoodieWriteConfig): DataFrame = {
val partitionPathFields = getPartitionPathFields(config).toSet
val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))
if (nestedPartitionPathFields.nonEmpty) {
logWarning(s"Can not drop nested partition path fields: $nestedPartitionPathFields")
}
val partitionPathCols = (partitionPathFields -- nestedPartitionPathFields).toSeq
df.drop(partitionPathCols: _*)
}
private def getPartitionPathFields(config: HoodieWriteConfig): Seq[String] = {
val keyGeneratorClassName = config.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)
val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
keyGenerator.getPartitionPathFields.asScala
}
}

View File

@@ -515,8 +515,8 @@ object HoodieSparkSqlWriter {
instantTime: String,
partitionColumns: String): (Boolean, common.util.Option[String]) = {
val sparkContext = sqlContext.sparkContext
val populateMetaFields = java.lang.Boolean.parseBoolean((parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))))
val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())))
val dropPartitionColumns = parameters.get(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key()).map(_.toBoolean)
.getOrElse(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.defaultValue())
// register classes & schemas
@@ -556,12 +556,9 @@ object HoodieSparkSqlWriter {
} else {
false
}
val hoodieDF = if (populateMetaFields) {
HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace,
bulkInsertPartitionerRows, isGlobalIndex, dropPartitionColumns)
} else {
HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsertWithoutMetaFields(df)
}
val hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(df, writeConfig, bulkInsertPartitionerRows, dropPartitionColumns)
if (HoodieSparkUtils.isSpark2) {
hoodieDF.write.format("org.apache.hudi.internal")
.option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)