1
0

[HUDI-3561] Avoid including whole MultipleSparkJobExecutionStrategy object into the closure for Spark to serialize (#4954)

- Avoid capturing the whole MultipleSparkJobExecutionStrategy object in the closure that Spark has to serialize
This commit is contained in:
Alexey Kudinkin
2022-03-07 10:42:03 -08:00
committed by GitHub
parent 3539578ccb
commit f0bcee3c01
2 changed files with 30 additions and 24 deletions

View File

@@ -1051,7 +1051,7 @@ public class HoodieWriteConfig extends HoodieConfig {
} }
public BulkInsertSortMode getBulkInsertSortMode() { public BulkInsertSortMode getBulkInsertSortMode() {
String sortMode = getString(BULK_INSERT_SORT_MODE); String sortMode = getStringOrDefault(BULK_INSERT_SORT_MODE);
return BulkInsertSortMode.valueOf(sortMode.toUpperCase()); return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
} }

View File

@@ -18,6 +18,10 @@
package org.apache.hudi.client.clustering.run.strategy; package org.apache.hudi.client.clustering.run.strategy;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup; import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -25,7 +29,7 @@ import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.ConcatenatingIterator; import org.apache.hudi.client.utils.ConcatenatingIterator;
import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringOperation; import org.apache.hudi.common.model.ClusteringOperation;
@@ -58,11 +62,6 @@ import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy; import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
@@ -246,12 +245,18 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
*/ */
private JavaRDD<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc, private JavaRDD<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
List<ClusteringOperation> clusteringOps) { List<ClusteringOperation> clusteringOps) {
return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> { SerializableConfiguration hadoopConf = new SerializableConfiguration(getHoodieTable().getHadoopConf());
HoodieWriteConfig writeConfig = getWriteConfig();
// NOTE: It's crucial to make sure that we don't capture whole "this" object into the
// closure, as this might lead to issues attempting to serialize its nested fields
return jsc.parallelize(clusteringOps, clusteringOps.size())
.mapPartitions(clusteringOpsPartition -> {
List<Iterator<IndexedRecord>> iteratorsForPartition = new ArrayList<>(); List<Iterator<IndexedRecord>> iteratorsForPartition = new ArrayList<>();
clusteringOpsPartition.forEachRemaining(clusteringOp -> { clusteringOpsPartition.forEachRemaining(clusteringOp -> {
try { try {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema())); Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath())); HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(clusteringOp.getDataFilePath()));
iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema)); iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema));
} catch (IOException e) { } catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
@@ -260,7 +265,8 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
}); });
return new ConcatenatingIterator<>(iteratorsForPartition); return new ConcatenatingIterator<>(iteratorsForPartition);
}).map(this::transform); })
.map(record -> transform(record, writeConfig));
} }
/** /**
@@ -279,12 +285,12 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
/** /**
* Transform IndexedRecord into HoodieRecord. * Transform IndexedRecord into HoodieRecord.
*/ */
private HoodieRecord<T> transform(IndexedRecord indexedRecord) { private static <T> HoodieRecord<T> transform(IndexedRecord indexedRecord, HoodieWriteConfig writeConfig) {
GenericRecord record = (GenericRecord) indexedRecord; GenericRecord record = (GenericRecord) indexedRecord;
Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty(); Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
if (!getWriteConfig().populateMetaFields()) { if (!writeConfig.populateMetaFields()) {
try { try {
keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(getWriteConfig().getProps()))); keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
} catch (IOException e) { } catch (IOException e) {
throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e); throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e);
} }