[HUDI-3140] Fix bulk_insert failure on Spark 3.2.0 (#4498)
This commit is contained in:
@@ -31,6 +31,7 @@ import org.apache.spark.sql.connector.expressions.Transform;
|
|||||||
import org.apache.spark.sql.types.StructType;
|
import org.apache.spark.sql.types.StructType;
|
||||||
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
|
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.apache.hudi.DataSourceUtils.mayBeOverwriteParquetWriteLegacyFormatProp;
|
import static org.apache.hudi.DataSourceUtils.mayBeOverwriteParquetWriteLegacyFormatProp;
|
||||||
@@ -55,11 +56,13 @@ public class DefaultSource extends BaseDefaultSource implements TableProvider {
|
|||||||
HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()));
|
HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()));
|
||||||
boolean arePartitionRecordsSorted = Boolean.parseBoolean(properties.getOrDefault(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED,
|
boolean arePartitionRecordsSorted = Boolean.parseBoolean(properties.getOrDefault(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED,
|
||||||
Boolean.toString(HoodieInternalConfig.DEFAULT_BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED)));
|
Boolean.toString(HoodieInternalConfig.DEFAULT_BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED)));
|
||||||
|
// Create a new map as the properties is an unmodifiableMap on Spark 3.2.0
|
||||||
|
Map<String, String> newProps = new HashMap<>(properties);
|
||||||
// Auto set the value of "hoodie.parquet.writeLegacyFormat.enabled"
|
// Auto set the value of "hoodie.parquet.writeLegacyFormat.enabled"
|
||||||
mayBeOverwriteParquetWriteLegacyFormatProp(properties, schema);
|
mayBeOverwriteParquetWriteLegacyFormatProp(newProps, schema);
|
||||||
// 1st arg to createHoodieConfig is not really required to be set. but passing it anyways.
|
// 1st arg to createHoodieConfig is not really required to be set. but passing it anyways.
|
||||||
HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(properties.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()), path, tblName, properties);
|
HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(newProps.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()), path, tblName, newProps);
|
||||||
return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(),
|
return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(),
|
||||||
getConfiguration(), properties, populateMetaFields, arePartitionRecordsSorted);
|
getConfiguration(), newProps, populateMetaFields, arePartitionRecordsSorted);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user