
[HUDI-679] Make io package Spark free (#1460)

* [HUDI-679] Make io package Spark free
Author: leesf
Date: 2020-03-29 16:54:00 +08:00
Committed by: GitHub
Parent: ac73bdcdc3
Commit: 07c3c5d797
19 changed files with 136 additions and 58 deletions
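
The change is mechanical but uniform: every direct use of org.apache.spark.TaskContext inside the io package is replaced by a call through an injected SparkTaskContextSupplier (the new class in the second file below), so the handles and storage writers no longer import Spark at all. A minimal sketch of the before/after shape (IllustrativeHandle is a made-up stand-in for the real handles, not code from this commit):

import org.apache.hudi.client.SparkTaskContextSupplier;

// Before: int partitionId = org.apache.spark.TaskContext.getPartitionId();
// After: the handle receives a supplier and stays Spark-free itself.
class IllustrativeHandle {
  private final SparkTaskContextSupplier sparkTaskContextSupplier;

  IllustrativeHandle(SparkTaskContextSupplier sparkTaskContextSupplier) {
    this.sparkTaskContextSupplier = sparkTaskContextSupplier;
  }

  protected int getPartitionId() {
    // The Spark lookup is deferred into the supplier; on an executor this
    // resolves to TaskContext.getPartitionId().
    return sparkTaskContextSupplier.getPartitionIdSupplier().get();
  }
}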

View File

@@ -22,6 +22,7 @@ import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.avro.HoodieAvroWriteSupport
+import org.apache.hudi.client.SparkTaskContextSupplier
import org.apache.hudi.common.HoodieJsonPayload
import org.apache.hudi.common.bloom.filter.{BloomFilter, BloomFilterFactory}
import org.apache.hudi.common.model.HoodieRecord
@@ -45,7 +46,7 @@ object SparkHelpers {
HoodieIndexConfig.DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_INDEX_FILTER_TYPE);
val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter)
val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf, HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble)
-val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema)
+val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier())
for (rec <- sourceRecords) {
val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
if (!keysToSkip.contains(key)) {

View File

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client;
import org.apache.spark.TaskContext;
import java.io.Serializable;
import java.util.function.Supplier;
/**
* Spark task context supplier.
*/
public class SparkTaskContextSupplier implements Serializable {
public Supplier<Integer> getPartitionIdSupplier() {
return () -> TaskContext.getPartitionId();
}
public Supplier<Integer> getStageIdSupplier() {
return () -> TaskContext.get().stageId();
}
public Supplier<Long> getAttemptIdSupplier() {
return () -> TaskContext.get().taskAttemptId();
}
}
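
Design note on the class above: it hands out java.util.function.Supplier objects rather than raw IDs, so the TaskContext lookup runs only when a supplier is invoked on an executor, and the SparkTaskContextSupplier instance itself serializes trivially since it holds no state. Be aware that getStageIdSupplier and getAttemptIdSupplier dereference TaskContext.get(), which is null outside a running Spark task, so invoking them off an executor throws a NullPointerException. Because the methods are ordinary overridable instance methods, a non-Spark caller could substitute fixed IDs; the stub below is a hypothetical illustration, not part of this commit:

import org.apache.hudi.client.SparkTaskContextSupplier;
import java.util.function.Supplier;

// Hypothetical: fixed IDs so handles can be exercised without a live TaskContext.
public class FixedTaskContextSupplier extends SparkTaskContextSupplier {
  @Override public Supplier<Integer> getPartitionIdSupplier() { return () -> 0; }
  @Override public Supplier<Integer> getStageIdSupplier() { return () -> 0; }
  @Override public Supplier<Long> getAttemptIdSupplier() { return () -> 0L; }
}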

View File

@@ -51,6 +51,6 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload>
@Override
public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> sortedRecordItr) {
return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, instantTime, hoodieTable,
-fileIDPrefixes.get(partition));
+fileIDPrefixes.get(partition), hoodieTable.getSparkTaskContextSupplier());
}
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.execution;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.model.HoodieRecord;
@@ -50,15 +51,18 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
protected final HoodieTable<T> hoodieTable;
protected final String idPrefix;
protected int numFilesWritten;
+protected SparkTaskContextSupplier sparkTaskContextSupplier;
public CopyOnWriteLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
-String instantTime, HoodieTable<T> hoodieTable, String idPrefix) {
+String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
+SparkTaskContextSupplier sparkTaskContextSupplier) {
super(sortedRecordItr);
this.hoodieConfig = config;
this.instantTime = instantTime;
this.hoodieTable = hoodieTable;
this.idPrefix = idPrefix;
this.numFilesWritten = 0;
+this.sparkTaskContextSupplier = sparkTaskContextSupplier;
}
// Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread.
@@ -137,7 +141,7 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
// lazily initialize the handle, for the first time
if (handle == null) {
handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
-getNextFileId(idPrefix));
+getNextFileId(idPrefix), sparkTaskContextSupplier);
}
if (handle.canWrite(payload.record)) {
@@ -148,7 +152,7 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
statuses.add(handle.close());
// Need to handle the rejected payload & open new handle
handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
-getNextFileId(idPrefix));
+getNextFileId(idPrefix), sparkTaskContextSupplier);
handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
}
}
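
The last two hunks above implement a handle-rollover loop: a HoodieCreateHandle is opened lazily on the first record, reused while canWrite holds, and when a record is rejected the handle is closed (collecting its WriteStatus) and a fresh handle with the next file ID, now also carrying the supplier, takes over. The MergeOnRead variant in the next file follows the same shape with HoodieAppendHandle. In outline (a sketch of the control flow, not the literal Hudi code; newCreateHandle abbreviates the constructor call shown in the diff):

// Sketch of the rollover pattern used by the lazy insert iterables.
if (handle == null) {
  handle = newCreateHandle(getNextFileId(idPrefix), sparkTaskContextSupplier); // first record: lazy open
}
if (handle.canWrite(record)) {
  handle.write(record);              // normal path: the open file still has capacity
} else {
  statuses.add(handle.close());      // seal the full file and keep its WriteStatus
  handle = newCreateHandle(getNextFileId(idPrefix), sparkTaskContextSupplier);
  handle.write(record);              // a fresh handle can always take one record
}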

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.execution;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -35,8 +36,8 @@ import java.util.List;
public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extends CopyOnWriteLazyInsertIterable<T> {
public MergeOnReadLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
-String instantTime, HoodieTable<T> hoodieTable, String idPfx) {
-super(sortedRecordItr, config, instantTime, hoodieTable, idPfx);
+String instantTime, HoodieTable<T> hoodieTable, String idPfx, SparkTaskContextSupplier sparkTaskContextSupplier) {
+super(sortedRecordItr, config, instantTime, hoodieTable, idPfx, sparkTaskContextSupplier);
}
@Override
@@ -53,7 +54,7 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extend
// lazily initialize the handle, for the first time
if (handle == null) {
handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable,
-insertPayload.getPartitionPath(), getNextFileId(idPrefix));
+insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier);
}
if (handle.canWrite(insertPayload)) {
// write the payload, if the handle has capacity
@@ -64,7 +65,7 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extend
statuses.add(handle.getWriteStatus());
// Need to handle the rejected payload & open new handle
handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable,
-insertPayload.getPartitionPath(), getNextFileId(idPrefix));
+insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier);
handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
}
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.io;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
@@ -49,7 +50,6 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.TaskContext;
import org.apache.spark.util.SizeEstimator;
import java.io.IOException;
@@ -101,16 +101,16 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
private long insertRecordsWritten = 0;
public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
-String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
-super(config, instantTime, partitionPath, fileId, hoodieTable);
+String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr, SparkTaskContextSupplier sparkTaskContextSupplier) {
+super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier);
writeStatus.setStat(new HoodieDeltaWriteStat());
this.fileId = fileId;
this.recordItr = recordItr;
}
public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
-String partitionPath, String fileId) {
-this(config, instantTime, hoodieTable, partitionPath, fileId, null);
+String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
+this(config, instantTime, hoodieTable, partitionPath, fileId, null, sparkTaskContextSupplier);
}
private void init(HoodieRecord record) {
@@ -137,7 +137,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
//save hoodie partition meta in the partition path
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, baseInstantTime,
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
-partitionMetadata.trySave(TaskContext.getPartitionId());
+partitionMetadata.trySave(getPartitionId());
this.writer = createLogWriter(fileSlice, baseInstantTime);
this.currentLogFile = writer.getLogFile();
((HoodieDeltaWriteStat) writeStatus.getStat()).setLogVersion(currentLogFile.getLogVersion());
@@ -163,7 +163,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
avroRecord = Option.of(rewriteRecord((GenericRecord) avroRecord.get()));
String seqId =
-HoodieRecord.generateSequenceId(instantTime, TaskContext.getPartitionId(), recordIndex.getAndIncrement());
+HoodieRecord.generateSequenceId(instantTime, getPartitionId(), recordIndex.getAndIncrement());
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord.get(), hoodieRecord.getRecordKey(),
hoodieRecord.getPartitionPath(), fileId);
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord.get(), instantTime, seqId);

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.io;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
@@ -38,7 +39,6 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.TaskContext;
import java.io.IOException;
import java.util.Iterator;
@@ -56,8 +56,8 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
private boolean useWriterSchema = false;
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
-String partitionPath, String fileId) {
-super(config, instantTime, partitionPath, fileId, hoodieTable);
+String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
+super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
@@ -66,10 +66,10 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
try {
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
-partitionMetadata.trySave(TaskContext.getPartitionId());
+partitionMetadata.trySave(getPartitionId());
createMarkerFile(partitionPath);
this.storageWriter =
-HoodieStorageWriterFactory.getStorageWriter(instantTime, path, hoodieTable, config, writerSchema);
+HoodieStorageWriterFactory.getStorageWriter(instantTime, path, hoodieTable, config, writerSchema, this.sparkTaskContextSupplier);
} catch (IOException e) {
throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e);
}
@@ -80,8 +80,8 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
* Called by the compactor code path.
*/
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
-String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordIterator) {
-this(config, instantTime, hoodieTable, partitionPath, fileId);
+String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordIterator, SparkTaskContextSupplier sparkTaskContextSupplier) {
+this(config, instantTime, hoodieTable, partitionPath, fileId, sparkTaskContextSupplier);
this.recordIterator = recordIterator;
this.useWriterSchema = true;
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.io;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkConfigUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -46,7 +47,6 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.TaskContext;
import java.io.IOException;
import java.util.HashSet;
@@ -71,8 +71,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
private boolean useWriterSchema;
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
-Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId) {
-super(config, instantTime, partitionPath, fileId, hoodieTable);
+Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
+super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier);
init(fileId, recordItr);
init(fileId, partitionPath, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
}
@@ -82,8 +82,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
*/
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
-HoodieBaseFile dataFileToBeMerged) {
-super(config, instantTime, partitionPath, fileId, hoodieTable);
+HoodieBaseFile dataFileToBeMerged, SparkTaskContextSupplier sparkTaskContextSupplier) {
+super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier);
this.keyToNewRecords = keyToNewRecords;
this.useWriterSchema = true;
init(fileId, this.partitionPath, dataFileToBeMerged);
@@ -111,7 +111,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
-partitionMetadata.trySave(TaskContext.getPartitionId());
+partitionMetadata.trySave(getPartitionId());
oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath);
String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
@@ -132,7 +132,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
// Create the writer for writing the new version file
storageWriter =
-HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema);
+HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
} catch (IOException io) {
LOG.error("Error in update task at commit " + instantTime, io);
writeStatus.setGlobalError(io);

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.io;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -38,7 +39,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.TaskContext;
import java.io.IOException;
@@ -55,26 +55,27 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
protected final String partitionPath;
protected final String fileId;
protected final String writeToken;
+protected final SparkTaskContextSupplier sparkTaskContextSupplier;
public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
-String fileId, HoodieTable<T> hoodieTable) {
+String fileId, HoodieTable<T> hoodieTable, SparkTaskContextSupplier sparkTaskContextSupplier) {
super(config, instantTime, hoodieTable);
this.partitionPath = partitionPath;
this.fileId = fileId;
-this.writeToken = makeSparkWriteToken();
this.originalSchema = new Schema.Parser().parse(config.getSchema());
this.writerSchema = createHoodieWriteSchema(originalSchema);
this.timer = new HoodieTimer().startTimer();
this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(),
!hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
+this.sparkTaskContextSupplier = sparkTaskContextSupplier;
+this.writeToken = makeWriteToken();
}
/**
* Generate a write token based on the currently running spark task and its place in the spark dag.
*/
-private static String makeSparkWriteToken() {
-return FSUtils.makeWriteToken(TaskContext.getPartitionId(), TaskContext.get().stageId(),
-TaskContext.get().taskAttemptId());
+private String makeWriteToken() {
+return FSUtils.makeWriteToken(getPartitionId(), getStageId(), getAttemptId());
}
public static Schema createHoodieWriteSchema(Schema originalSchema) {
@@ -171,4 +172,16 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
protected FileSystem getFileSystem() {
return hoodieTable.getMetaClient().getFs();
}
+protected int getPartitionId() {
+return sparkTaskContextSupplier.getPartitionIdSupplier().get();
+}
+protected int getStageId() {
+return sparkTaskContextSupplier.getStageIdSupplier().get();
+}
+protected long getAttemptId() {
+return sparkTaskContextSupplier.getAttemptIdSupplier().get();
+}
}
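
The three protected getters above are what let makeWriteToken drop its Spark imports while producing the same token. Hudi write tokens follow a partitionId-stageId-attemptId shape (the test fixture later in this diff, f1_1-0-1_000.parquet, embeds the token 1-0-1), so, assuming FSUtils.makeWriteToken simply dash-joins its three arguments (FSUtils itself is not shown in this diff), the handle now computes roughly:

// Illustrative reconstruction only; the real formatting lives in
// FSUtils.makeWriteToken, which this commit leaves untouched.
int partitionId = sparkTaskContextSupplier.getPartitionIdSupplier().get(); // e.g. 1
int stageId = sparkTaskContextSupplier.getStageIdSupplier().get();         // e.g. 0
long attemptId = sparkTaskContextSupplier.getAttemptIdSupplier().get();    // e.g. 1
String writeToken = partitionId + "-" + stageId + "-" + attemptId;         // "1-0-1"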

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.io.storage;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.io.storage.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -32,7 +33,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.spark.TaskContext;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
@@ -52,9 +52,10 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
private final HoodieAvroWriteSupport writeSupport;
private final String instantTime;
private final Schema schema;
+private final SparkTaskContextSupplier sparkTaskContextSupplier;
-public HoodieParquetWriter(String instantTime, Path file, HoodieParquetConfig parquetConfig, Schema schema)
-throws IOException {
+public HoodieParquetWriter(String instantTime, Path file, HoodieParquetConfig parquetConfig,
+Schema schema, SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(), parquetConfig.getCompressionCodecName(),
parquetConfig.getBlockSize(), parquetConfig.getPageSize(), parquetConfig.getPageSize(),
@@ -72,6 +73,7 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
this.writeSupport = parquetConfig.getWriteSupport();
this.instantTime = instantTime;
this.schema = schema;
+this.sparkTaskContextSupplier = sparkTaskContextSupplier;
}
public static Configuration registerFileSystem(Path file, Configuration conf) {
@@ -85,7 +87,7 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
@Override
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
String seqId =
-HoodieRecord.generateSequenceId(instantTime, TaskContext.getPartitionId(), recordIndex.getAndIncrement());
+HoodieRecord.generateSequenceId(instantTime, sparkTaskContextSupplier.getPartitionIdSupplier().get(), recordIndex.getAndIncrement());
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(),
file.getName());
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.io.storage;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.bloom.filter.BloomFilterFactory;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -39,19 +40,19 @@ import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
public class HoodieStorageWriterFactory {
public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> getStorageWriter(
-String instantTime, Path path, HoodieTable<T> hoodieTable, HoodieWriteConfig config, Schema schema)
-throws IOException {
+String instantTime, Path path, HoodieTable<T> hoodieTable, HoodieWriteConfig config, Schema schema,
+SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
final String name = path.getName();
final String extension = FSUtils.isLogFile(path) ? HOODIE_LOG.getFileExtension() : FSUtils.getFileExtension(name);
if (PARQUET.getFileExtension().equals(extension)) {
-return newParquetStorageWriter(instantTime, path, config, schema, hoodieTable);
+return newParquetStorageWriter(instantTime, path, config, schema, hoodieTable, sparkTaskContextSupplier);
}
throw new UnsupportedOperationException(extension + " format not supported yet.");
}
private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> newParquetStorageWriter(
-String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable)
-throws IOException {
+String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
+SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
BloomFilter filter = BloomFilterFactory
.createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(),
config.getDynamicBloomFilterMaxNumEntries(),
@@ -63,6 +64,6 @@ public class HoodieStorageWriterFactory {
config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(),
hoodieTable.getHadoopConf(), config.getParquetCompressionRatio());
-return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema);
+return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema, sparkTaskContextSupplier);
}
}

View File

@@ -222,13 +222,13 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
-return new HoodieMergeHandle<>(config, instantTime, this, recordItr, partitionPath, fileId);
+return new HoodieMergeHandle<>(config, instantTime, this, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
}
protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords,
-partitionPath, fileId, dataFileToBeMerged);
+partitionPath, fileId, dataFileToBeMerged, sparkTaskContextSupplier);
}
public Iterator<List<WriteStatus>> handleInsert(String instantTime, String idPfx, Iterator<HoodieRecord<T>> recordItr)
@@ -238,13 +238,13 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
LOG.info("Empty partition");
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
-return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx);
+return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx, sparkTaskContextSupplier);
}
public Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
Iterator<HoodieRecord<T>> recordItr) {
HoodieCreateHandle createHandle =
-new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordItr);
+new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordItr, sparkTaskContextSupplier);
createHandle.write();
return Collections.singletonList(Collections.singletonList(createHandle.close())).iterator();
}

View File

@@ -108,7 +108,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
return super.handleUpdate(instantTime, partitionPath, fileId, recordItr);
} else {
HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, instantTime, this,
-partitionPath, fileId, recordItr);
+partitionPath, fileId, recordItr, sparkTaskContextSupplier);
appendHandle.doAppend();
appendHandle.close();
return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
@@ -120,7 +120,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
throws Exception {
// If canIndexLogFiles, write inserts to log files else write inserts to parquet files
if (index.canIndexLogFiles()) {
-return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx);
+return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx, sparkTaskContextSupplier);
} else {
return super.handleInsert(instantTime, idPfx, recordItr);
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.table;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
@@ -84,6 +85,8 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
private SerializableConfiguration hadoopConfiguration;
private transient FileSystemViewManager viewManager;
+protected final SparkTaskContextSupplier sparkTaskContextSupplier = new SparkTaskContextSupplier();
protected HoodieTable(HoodieWriteConfig config, JavaSparkContext jsc) {
this.config = config;
this.hadoopConfiguration = new SerializableConfiguration(jsc.hadoopConfiguration());
@@ -448,4 +451,8 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
private ConsistencyGuard getFailSafeConsistencyGuard(FileSystem fileSystem) {
return new FailSafeConsistencyGuard(fileSystem, config.getConsistencyGuardConfig());
}
+public SparkTaskContextSupplier getSparkTaskContextSupplier() {
+return sparkTaskContextSupplier;
+}
}

View File

@@ -89,7 +89,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
.add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
HoodieCreateHandle createHandle =
new HoodieCreateHandle(config, "100", table, rowChange1.getPartitionPath(), "f1-0", insertRecords.iterator());
new HoodieCreateHandle(config, "100", table, rowChange1.getPartitionPath(), "f1-0", insertRecords.iterator(), supplier);
createHandle.write();
return createHandle.close();
}).collect();
@@ -119,7 +119,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
try {
HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config2, "101", table2,
-updateRecords.iterator(), record1.getPartitionPath(), fileId);
+updateRecords.iterator(), record1.getPartitionPath(), fileId, supplier);
Configuration conf = new Configuration();
AvroReadSupport.setAvroReadSchema(conf, mergeHandle.getWriterSchema());
List<GenericRecord> oldRecords = ParquetUtils.readAvroRecords(conf,

View File

@@ -17,6 +17,7 @@
package org.apache.hudi.common;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.TestHoodieClientBase;
import org.apache.hudi.common.minicluster.HdfsTestService;
import org.apache.hudi.common.model.HoodieTestUtils;
@@ -55,6 +56,8 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
protected transient HoodieTableMetaClient metaClient;
private static AtomicInteger instantGen = new AtomicInteger(1);
+protected final SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
public String getNextInstant() {
return String.format("%09d", instantGen.getAndIncrement());
}

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.common;
import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.bloom.filter.BloomFilter;
@@ -230,7 +231,8 @@ public class HoodieClientTestUtils {
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
HoodieTestUtils.getDefaultHadoopConf(), Double.valueOf(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO));
HoodieParquetWriter writer =
-new HoodieParquetWriter(instantTime, new Path(basePath + "/" + partitionPath + "/" + filename), config, schema);
+new HoodieParquetWriter(instantTime, new Path(basePath + "/" + partitionPath + "/" + filename), config,
+schema, new SparkTaskContextSupplier());
int seqId = 1;
for (HoodieRecord record : records) {
GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get();

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.io.storage;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.TestHoodieClientBase;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -44,15 +45,16 @@ public class TestHoodieStorageWriterFactory extends TestHoodieClientBase {
final Path parquetPath = new Path(basePath + "/partition/path/f1_1-0-1_000.parquet");
final HoodieWriteConfig cfg = getConfig();
HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
HoodieStorageWriter<IndexedRecord> parquetWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime,
-parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA);
+parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
Assert.assertTrue(parquetWriter instanceof HoodieParquetWriter);
// other file format exception.
final Path logPath = new Path(basePath + "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
try {
HoodieStorageWriter<IndexedRecord> logWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime, logPath,
-table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA);
+table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
fail("should fail since log storage writer is not supported yet.");
} catch (Exception e) {
Assert.assertTrue(e instanceof UnsupportedOperationException);

View File

@@ -103,7 +103,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
when(record.getPartitionPath()).thenReturn(partitionPath);
String writeToken = FSUtils.makeWriteToken(TaskContext.getPartitionId(), TaskContext.get().stageId(),
TaskContext.get().taskAttemptId());
-HoodieCreateHandle io = new HoodieCreateHandle(config, instantTime, table, partitionPath, fileName);
+HoodieCreateHandle io = new HoodieCreateHandle(config, instantTime, table, partitionPath, fileName, supplier);
return Pair.of(io.makeNewPath(record.getPartitionPath()), writeToken);
}).collect().get(0);