[HUDI-770] Organize upsert/insert API implementation under a single package (#1495)

Balaji Varadarajan
2020-04-12 23:11:00 -07:00
committed by GitHub
parent 447ba3bae6
commit 17bf930342
39 changed files with 2703 additions and 859 deletions
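
Before this change, HoodieWriteClient inlined the full write pipeline for every operation (dedup, index tagging, partitioning, bucket handling, commit). After it, each public API resolves the HoodieTable, delegates to a per-operation method backed by a commit action executor under org.apache.hudi.table.action.commit (or .deltacommit for MOR tables), and funnels the resulting HoodieWriteMetadata through a shared postWrite(). A minimal caller-side sketch of the unchanged public surface (the helper class is illustrative, not part of this commit):

    import org.apache.hudi.client.HoodieWriteClient;
    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.spark.api.java.JavaRDD;

    // Hypothetical helper: the client API is unchanged, but upsert() now runs
    // table.upsert(...) -> UpsertCommitActionExecutor.execute() -> postWrite(),
    // instead of the old inline dedup/tag/partition logic.
    final class WritePathSketch {
      static <T extends HoodieRecordPayload> JavaRDD<WriteStatus> upsertBatch(
          HoodieWriteClient<T> client, JavaRDD<HoodieRecord<T>> records) {
        String instantTime = client.startCommit();  // request a new commit instant
        return client.upsert(records, instantTime); // delegates to the executor stack
      }
    }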

View File

@@ -166,6 +166,18 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
postCommit(metadata, instantTime, extraMetadata);
emitCommitMetrics(instantTime, metadata, actionType);
LOG.info("Committed " + instantTime);
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
e);
}
return true;
}
void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
try {
if (writeContext != null) {
long durationInMs = metrics.getDurationInMs(writeContext.stop());
@@ -173,15 +185,10 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
metadata, actionType);
writeContext = null;
}
LOG.info("Committed " + instantTime);
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
e);
} catch (ParseException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime
+ ". Instant time is not of valid format", e);
}
return true;
}
/**
@@ -189,10 +196,9 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
* @param metadata Commit Metadata corresponding to committed instant
* @param instantTime Instant Time
* @param extraMetadata Additional Metadata passed by user
* @throws IOException in case of error
*/
protected abstract void postCommit(HoodieCommitMetadata metadata, String instantTime,
Option<Map<String, String>> extraMetadata) throws IOException;
Option<Map<String, String>> extraMetadata);
/**
* Finalize Write operation.

View File

@@ -26,7 +26,6 @@ import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.utils.SparkConfigUtils;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
@@ -50,28 +49,22 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.BulkInsertMapFunction;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieCommitArchiveLog;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import com.codahale.metrics.Timer;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -81,7 +74,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import scala.Tuple2;
@@ -176,22 +168,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT);
setOperationType(WriteOperationType.UPSERT);
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
combineOnCondition(config.shouldCombineBeforeUpsert(), records, config.getUpsertShuffleParallelism());
Timer.Context indexTimer = metrics.getIndexCtx();
// perform index lookup to get existing location of records
JavaRDD<HoodieRecord<T>> taggedRecords = getIndex().tagLocation(dedupedRecords, jsc, table);
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
return upsertRecordsInternal(taggedRecords, instantTime, table, true);
} catch (Throwable e) {
if (e instanceof HoodieUpsertException) {
throw (HoodieUpsertException) e;
}
throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
HoodieWriteMetadata result = table.upsert(jsc, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
}
return postWrite(result, instantTime, table);
}
/**
@@ -206,14 +187,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED);
setOperationType(WriteOperationType.UPSERT_PREPPED);
try {
return upsertRecordsInternal(preppedRecords, instantTime, table, true);
} catch (Throwable e) {
if (e instanceof HoodieUpsertException) {
throw (HoodieUpsertException) e;
}
throw new HoodieUpsertException("Failed to upsert prepared records for commit time " + instantTime, e);
}
HoodieWriteMetadata result = table.upsertPrepped(jsc, instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
/**
@@ -229,18 +204,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT);
setOperationType(WriteOperationType.INSERT);
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
combineOnCondition(config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism());
return upsertRecordsInternal(dedupedRecords, instantTime, table, false);
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
}
throw new HoodieInsertException("Failed to insert for commit time " + instantTime, e);
}
HoodieWriteMetadata result = table.insert(jsc, instantTime, records);
return postWrite(result, instantTime, table);
}
/**
@@ -257,14 +222,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED);
setOperationType(WriteOperationType.INSERT_PREPPED);
try {
return upsertRecordsInternal(preppedRecords, instantTime, table, false);
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
}
throw new HoodieInsertException("Failed to insert prepared records for commit time " + instantTime, e);
}
HoodieWriteMetadata result = table.insertPrepped(jsc, instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
/**
@@ -301,18 +260,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT);
setOperationType(WriteOperationType.BULK_INSERT);
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
combineOnCondition(config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism());
return bulkInsertInternal(dedupedRecords, instantTime, table, bulkInsertPartitioner);
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
}
throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
}
HoodieWriteMetadata result = table.bulkInsert(jsc, instantTime, records, bulkInsertPartitioner);
return postWrite(result, instantTime, table);
}
/**
@@ -335,14 +284,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED);
setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
try {
return bulkInsertInternal(preppedRecords, instantTime, table, bulkInsertPartitioner);
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
}
throw new HoodieInsertException("Failed to bulk insert prepared records for commit time " + instantTime, e);
}
HoodieWriteMetadata result = table.bulkInsertPrepped(jsc, instantTime, preppedRecords, bulkInsertPartitioner);
return postWrite(result, instantTime, table);
}
/**
@@ -356,170 +299,59 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.DELETE);
setOperationType(WriteOperationType.DELETE);
try {
// De-dupe/merge if needed
JavaRDD<HoodieKey> dedupedKeys =
config.shouldCombineBeforeDelete() ? deduplicateKeys(keys) : keys;
JavaRDD<HoodieRecord<T>> dedupedRecords =
dedupedKeys.map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
Timer.Context indexTimer = metrics.getIndexCtx();
// perform index lookup to get existing location of records
JavaRDD<HoodieRecord<T>> taggedRecords = getIndex().tagLocation(dedupedRecords, jsc, table);
// filter out non-existent keys/records
JavaRDD<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown);
if (!taggedValidRecords.isEmpty()) {
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
return upsertRecordsInternal(taggedValidRecords, instantTime, table, true);
} else {
// if entire set of keys are non existent
saveWorkloadProfileMetadataToInflight(new WorkloadProfile(jsc.emptyRDD()), table, instantTime);
JavaRDD<WriteStatus> writeStatusRDD = jsc.emptyRDD();
commitOnAutoCommit(instantTime, writeStatusRDD, table.getMetaClient().getCommitActionType());
return writeStatusRDD;
}
} catch (Throwable e) {
if (e instanceof HoodieUpsertException) {
throw (HoodieUpsertException) e;
}
throw new HoodieUpsertException("Failed to delete for commit time " + instantTime, e);
}
}
private JavaRDD<WriteStatus> bulkInsertInternal(JavaRDD<HoodieRecord<T>> dedupedRecords, String instantTime,
HoodieTable<T> table, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
final JavaRDD<HoodieRecord<T>> repartitionedRecords;
final int parallelism = config.getBulkInsertShuffleParallelism();
if (bulkInsertPartitioner.isPresent()) {
repartitionedRecords = bulkInsertPartitioner.get().repartitionRecords(dedupedRecords, parallelism);
} else {
// Now, sort the records and line them up nicely for loading.
repartitionedRecords = dedupedRecords.sortBy(record -> {
// Let's use "partitionPath + key" as the sort key. Spark, will ensure
// the records split evenly across RDD partitions, such that small partitions fit
// into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
return String.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
}, true, parallelism);
}
// generate new file ID prefixes for each output partition
final List<String> fileIDPrefixes =
IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
table.getMetaClient().getCommitActionType(), instantTime), Option.empty());
JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(instantTime, config, table, fileIDPrefixes), true)
.flatMap(List::iterator);
return updateIndexAndCommitIfNeeded(writeStatusRDD, table, instantTime);
}
private JavaRDD<HoodieRecord<T>> combineOnCondition(boolean condition, JavaRDD<HoodieRecord<T>> records,
int parallelism) {
return condition ? deduplicateRecords(records, parallelism) : records;
HoodieWriteMetadata result = table.delete(jsc, instantTime, keys);
return postWrite(result, instantTime, table);
}
/**
* Save the workload profile in an intermediate file (here re-using commit files) This is useful when performing
* rollback for MOR tables. Only updates are recorded in the workload profile metadata since updates to log blocks
* are unknown across batches. Inserts (which are new parquet files) are rolled back based on commit time. // TODO :
* Create a new WorkloadProfile metadata file instead of using HoodieCommitMetadata
* Common method containing steps to be performed after write (upsert/insert/..) operations including auto-commit.
* @param result Commit Action Result
* @param instantTime Instant Time
* @param hoodieTable Hoodie Table
* @return Write Status
*/
private void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, HoodieTable<T> table, String instantTime)
throws HoodieCommitException {
try {
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
profile.getPartitionPaths().forEach(path -> {
WorkloadStat partitionStat = profile.getWorkloadStat(path.toString());
partitionStat.getUpdateLocationToCount().forEach((key, value) -> {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId(key);
// TODO : Is writing baseCommitTime possible here?
writeStat.setPrevCommit(value.getKey());
writeStat.setNumUpdateWrites(value.getValue());
metadata.addWriteStat(path.toString(), writeStat);
});
});
metadata.setOperationType(getOperationType());
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
String commitActionType = table.getMetaClient().getCommitActionType();
HoodieInstant requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime);
activeTimeline.transitionRequestedToInflight(requested,
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (IOException io) {
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
private JavaRDD<WriteStatus> postWrite(HoodieWriteMetadata result, String instantTime, HoodieTable<T> hoodieTable) {
if (result.getIndexUpdateDuration().isPresent()) {
metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
}
}
private JavaRDD<WriteStatus> upsertRecordsInternal(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime,
HoodieTable<T> hoodieTable, final boolean isUpsert) {
// Cache the tagged records, so we don't end up computing both
// TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
if (preppedRecords.getStorageLevel() == StorageLevel.NONE()) {
preppedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER());
} else {
LOG.info("RDD PreppedRecords was persisted at: " + preppedRecords.getStorageLevel());
}
WorkloadProfile profile = null;
if (hoodieTable.isWorkloadProfileNeeded()) {
profile = new WorkloadProfile(preppedRecords);
LOG.info("Workload profile :" + profile);
saveWorkloadProfileMetadataToInflight(profile, hoodieTable, instantTime);
}
// partition using the insert partitioner
final Partitioner partitioner = getPartitioner(hoodieTable, isUpsert, profile);
JavaRDD<HoodieRecord<T>> partitionedRecords = partition(preppedRecords, partitioner);
JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {
if (isUpsert) {
return hoodieTable.handleUpsertPartition(instantTime, partition, recordItr, partitioner);
} else {
return hoodieTable.handleInsertPartition(instantTime, partition, recordItr, partitioner);
if (result.isCommitted()) {
// Perform post commit operations.
if (result.getFinalizeDuration().isPresent()) {
metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
result.getWriteStats().get().size());
}
}, true).flatMap(List::iterator);
return updateIndexAndCommitIfNeeded(writeStatusRDD, hoodieTable, instantTime);
}
postCommit(result.getCommitMetadata().get(), instantTime, Option.empty());
private Partitioner getPartitioner(HoodieTable table, boolean isUpsert, WorkloadProfile profile) {
if (isUpsert) {
return table.getUpsertPartitioner(profile, jsc);
} else {
return table.getInsertPartitioner(profile, jsc);
emitCommitMetrics(instantTime, result.getCommitMetadata().get(),
hoodieTable.getMetaClient().getCommitActionType());
}
}
private JavaRDD<HoodieRecord<T>> partition(JavaRDD<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
return dedupedRecords.mapToPair(
record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record))
.partitionBy(partitioner).map(Tuple2::_2);
return result.getWriteStatuses();
}
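
postWrite() above centralizes what every operation previously did by hand: report index and finalize timings, auto-commit, and return the write statuses. Since each timing on HoodieWriteMetadata is an Option, a guard must read the same Option it checked. A minimal consumption sketch using only the accessors visible in this diff (the printing is a stand-in for a metrics sink):

    // Fragment; assumes an import of org.apache.hudi.table.action.commit.HoodieWriteMetadata.
    static void logWriteResult(HoodieWriteMetadata result) {
      if (result.getIndexLookupDuration().isPresent()) {
        System.out.println("index lookup ms=" + result.getIndexLookupDuration().get().toMillis());
      }
      if (result.isCommitted() && result.getFinalizeDuration().isPresent()) {
        System.out.println("finalize ms=" + result.getFinalizeDuration().get().toMillis()
            + ", writeStats=" + result.getWriteStats().get().size());
      }
    }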
@Override
protected void postCommit(HoodieCommitMetadata metadata, String instantTime,
Option<Map<String, String>> extraMetadata) throws IOException {
// Do an inline compaction if enabled
if (config.isInlineCompaction()) {
metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
forceCompact(extraMetadata);
} else {
metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
}
// We cannot have unbounded commit files. Archive commits if we have to archive
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(config, createMetaClient(true));
archiveLog.archiveIfRequired(jsc);
if (config.isAutoClean()) {
// Call clean to cleanup if there is anything to cleanup after the commit,
LOG.info("Auto cleaning is enabled. Running cleaner now");
clean(instantTime);
} else {
LOG.info("Auto cleaning is not enabled. Not running cleaner now");
Option<Map<String, String>> extraMetadata) {
try {
// Do an inline compaction if enabled
if (config.isInlineCompaction()) {
metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
forceCompact(extraMetadata);
} else {
metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
}
// We cannot have unbounded commit files. Archive commits if we have to archive
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(config, createMetaClient(true));
archiveLog.archiveIfRequired(jsc);
if (config.isAutoClean()) {
// Call clean to cleanup if there is anything to cleanup after the commit,
LOG.info("Auto cleaning is enabled. Running cleaner now");
clean(instantTime);
} else {
LOG.info("Auto cleaning is not enabled. Not running cleaner now");
}
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
}
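
The postCommit() sequence is fixed: optionally inline-compact, then archive the timeline if needed, then auto-clean if enabled; the first and last steps are driven by write-config flags. A sketch of the corresponding configuration, assuming the usual builder methods on these config classes (the base path is a hypothetical placeholder):

    import org.apache.hudi.config.HoodieCompactionConfig;
    import org.apache.hudi.config.HoodieWriteConfig;

    // Sketch only: the flags match the getters read in postCommit() above.
    final class PostCommitConfigSketch {
      static HoodieWriteConfig build() {
        return HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hoodie/table")          // hypothetical base path
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .withInlineCompaction(true)         // read back via config.isInlineCompaction()
                .withAutoClean(true)                // read back via config.isAutoClean()
                .build())
            .build();
      }
    }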
@@ -977,47 +809,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
commitCompaction(writeStatuses, table, compactionInstantTime, true, mergedMetaData);
}
/**
* Deduplicate Hoodie records, merging payloads for duplicate keys via the payload's preCombine.
*
* @param records hoodieRecords to deduplicate
* @param parallelism parallelism or partitions to be used while reducing/deduplicating
* @return RDD of deduplicated HoodieRecords
*/
JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records, int parallelism) {
boolean isIndexingGlobal = getIndex().isGlobal();
return records.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
// If index used is global, then records are expected to differ in their partitionPath
Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
return new Tuple2<>(key, record);
}).reduceByKey((rec1, rec2) -> {
@SuppressWarnings("unchecked")
T reducedData = (T) rec1.getData().preCombine(rec2.getData());
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
return new HoodieRecord<T>(rec1.getKey(), reducedData);
}, parallelism).map(Tuple2::_2);
}
/**
* Deduplicate Hoodie keys, honoring whether the configured index is global.
*
* @param keys RDD of HoodieKey to deduplicate
* @return RDD of deduplicated HoodieKeys
*/
JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys) {
boolean isIndexingGlobal = getIndex().isGlobal();
if (isIndexingGlobal) {
return keys.keyBy(HoodieKey::getRecordKey)
.reduceByKey((key1, key2) -> key1)
.values();
} else {
return keys.distinct();
}
}
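
The two dedup methods removed here encode a contract worth keeping in mind as the logic moves under the new action package: the dedup key is the record key alone for a global index (duplicates across partitions collapse into one), the full HoodieKey otherwise, and colliding payloads are merged through preCombine while the surviving key and partitionPath stay fixed. A plain-Java restatement of that contract (not the relocated implementation):

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;

    final class DedupSketch {
      // Global index: duplicates are judged on record key alone, across partitions.
      // Non-global: the full HoodieKey (record key + partition path) must match.
      static Object dedupKey(HoodieRecord<?> record, boolean isIndexingGlobal) {
        return isIndexingGlobal ? record.getKey().getRecordKey() : record.getKey();
      }

      // Colliding payloads are merged via preCombine; the surviving key (and thus
      // partitionPath) is pinned to one input so it cannot change mid-dedup.
      static <T extends HoodieRecordPayload<T>> HoodieRecord<T> mergeDuplicates(
          HoodieRecord<T> left, HoodieRecord<T> right) {
        T reduced = left.getData().preCombine(right.getData());
        return new HoodieRecord<>(left.getKey(), reduced);
      }
    }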
/**
* Cleanup all pending commits.
*/

View File

@@ -32,26 +32,30 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.table.action.clean.CleanActionExecutor;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BulkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.BulkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.DeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.InsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.InsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.UpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.UpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.rollback.RollbackHelper;
import org.apache.hudi.table.rollback.RollbackRequest;
import org.apache.log4j.LogManager;
@@ -59,21 +63,16 @@ import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -94,21 +93,44 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
@Override
public Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
if (profile == null) {
throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
}
return new UpsertPartitioner(profile, jsc);
public HoodieWriteMetadata upsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records) {
return new UpsertCommitActionExecutor<>(jsc, config, this, instantTime, records).execute();
}
@Override
public Partitioner getInsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
return getUpsertPartitioner(profile, jsc);
public HoodieWriteMetadata insert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records) {
return new InsertCommitActionExecutor<>(jsc, config, this, instantTime, records).execute();
}
@Override
public boolean isWorkloadProfileNeeded() {
return true;
public HoodieWriteMetadata bulkInsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
return new BulkInsertCommitActionExecutor<>(jsc, config,
this, instantTime, records, bulkInsertPartitioner).execute();
}
@Override
public HoodieWriteMetadata delete(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieKey> keys) {
return new DeleteCommitActionExecutor<>(jsc, config, this, instantTime, keys).execute();
}
@Override
public HoodieWriteMetadata upsertPrepped(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> preppedRecords) {
return new UpsertPreppedCommitActionExecutor<>(jsc, config, this, instantTime, preppedRecords).execute();
}
@Override
public HoodieWriteMetadata insertPrepped(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> preppedRecords) {
return new InsertPreppedCommitActionExecutor<>(jsc, config, this, instantTime, preppedRecords).execute();
}
@Override
public HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> preppedRecords, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
return new BulkInsertPreppedCommitActionExecutor<>(jsc, config,
this, instantTime, preppedRecords, bulkInsertPartitioner).execute();
}
@Override
@@ -122,19 +144,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table");
}
public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
Iterator<HoodieRecord<T>> recordItr)
throws IOException {
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
if (!recordItr.hasNext()) {
LOG.info("Empty partition with fileId => " + fileId);
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
// these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, recordItr);
return handleUpdateInternal(upsertHandle, instantTime, fileId);
}
public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
// these are updates
@@ -173,26 +182,12 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();
}
protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
return new HoodieMergeHandle<>(config, instantTime, this, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
}
protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords,
partitionPath, fileId, dataFileToBeMerged, sparkTaskContextSupplier);
}
public Iterator<List<WriteStatus>> handleInsert(String instantTime, String idPfx, Iterator<HoodieRecord<T>> recordItr)
throws Exception {
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
if (!recordItr.hasNext()) {
LOG.info("Empty partition");
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx, sparkTaskContextSupplier);
}
public Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
Iterator<HoodieRecord<T>> recordItr) {
HoodieCreateHandle createHandle =
@@ -201,34 +196,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
return Collections.singletonList(Collections.singletonList(createHandle.close())).iterator();
}
@SuppressWarnings("unchecked")
@Override
public Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
Partitioner partitioner) {
UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
BucketType btype = binfo.bucketType;
try {
if (btype.equals(BucketType.INSERT)) {
return handleInsert(instantTime, binfo.fileIdPrefix, recordItr);
} else if (btype.equals(BucketType.UPDATE)) {
return handleUpdate(instantTime, binfo.partitionPath, binfo.fileIdPrefix, recordItr);
} else {
throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
}
} catch (Throwable t) {
String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
LOG.error(msg, t);
throw new HoodieUpsertException(msg, t);
}
}
@Override
public Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr,
Partitioner partitioner) {
return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
}
@Override
public HoodieCleanMetadata clean(JavaSparkContext jsc, String cleanInstantTime) {
return new CleanActionExecutor(jsc, config, this, cleanInstantTime).execute();
@@ -389,242 +356,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
}
/**
* Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD partition).
*/
class UpsertPartitioner extends Partitioner {
/**
* List of all small files to be corrected.
*/
List<SmallFile> smallFiles = new ArrayList<>();
/**
* Total number of RDD partitions, is determined by total buckets we want to pack the incoming workload into.
*/
private int totalBuckets = 0;
/**
* Stat for the current workload. Helps in determining total inserts, upserts etc.
*/
private WorkloadStat globalStat;
/**
* Helps decide which bucket an incoming update should go to.
*/
private HashMap<String, Integer> updateLocationToBucket;
/**
* Helps us pack inserts into 1 or more buckets depending on number of incoming records.
*/
private HashMap<String, List<InsertBucket>> partitionPathToInsertBuckets;
/**
* Remembers what type each bucket is for later.
*/
private HashMap<Integer, BucketInfo> bucketInfoMap;
/**
* Rolling stats for files.
*/
protected HoodieRollingStatMetadata rollingStatMetadata;
UpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
updateLocationToBucket = new HashMap<>();
partitionPathToInsertBuckets = new HashMap<>();
bucketInfoMap = new HashMap<>();
globalStat = profile.getGlobalStat();
rollingStatMetadata = getRollingStats();
assignUpdates(profile);
assignInserts(profile, jsc);
LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
+ "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n"
+ "UpdateLocations mapped to buckets =>" + updateLocationToBucket);
}
private void assignUpdates(WorkloadProfile profile) {
// each update location gets a partition
Set<Map.Entry<String, WorkloadStat>> partitionStatEntries = profile.getPartitionPathStatMap().entrySet();
for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
}
}
}
private int addUpdateBucket(String partitionPath, String fileIdHint) {
int bucket = totalBuckets;
updateLocationToBucket.put(fileIdHint, bucket);
BucketInfo bucketInfo = new BucketInfo();
bucketInfo.bucketType = BucketType.UPDATE;
bucketInfo.fileIdPrefix = fileIdHint;
bucketInfo.partitionPath = partitionPath;
bucketInfoMap.put(totalBuckets, bucketInfo);
totalBuckets++;
return bucket;
}
private void assignInserts(WorkloadProfile profile, JavaSparkContext jsc) {
// for new inserts, compute buckets depending on how many records we have for each partition
Set<String> partitionPaths = profile.getPartitionPaths();
long averageRecordSize =
averageBytesPerRecord(metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
config.getCopyOnWriteRecordSizeEstimate());
LOG.info("AvgRecordSize => " + averageRecordSize);
Map<String, List<SmallFile>> partitionSmallFilesMap =
getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), jsc);
for (String partitionPath : partitionPaths) {
WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
if (pStat.getNumInserts() > 0) {
List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
this.smallFiles.addAll(smallFiles);
LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
long totalUnassignedInserts = pStat.getNumInserts();
List<Integer> bucketNumbers = new ArrayList<>();
List<Long> recordsPerBucket = new ArrayList<>();
// first try packing this into one of the smallFiles
for (SmallFile smallFile : smallFiles) {
long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize,
totalUnassignedInserts);
if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
// create a new bucket or re-use an existing bucket
int bucket;
if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
bucket = updateLocationToBucket.get(smallFile.location.getFileId());
LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
} else {
bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
}
bucketNumbers.add(bucket);
recordsPerBucket.add(recordsToAppend);
totalUnassignedInserts -= recordsToAppend;
}
}
// if we have anything more, create new insert buckets, like normal
if (totalUnassignedInserts > 0) {
long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
if (config.shouldAutoTuneInsertSplits()) {
insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
}
int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket);
LOG.info("After small file assignment: unassignedInserts => " + totalUnassignedInserts
+ ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket);
for (int b = 0; b < insertBuckets; b++) {
bucketNumbers.add(totalBuckets);
recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
BucketInfo bucketInfo = new BucketInfo();
bucketInfo.bucketType = BucketType.INSERT;
bucketInfo.partitionPath = partitionPath;
bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
bucketInfoMap.put(totalBuckets, bucketInfo);
totalBuckets++;
}
}
// Go over all such buckets, and assign weights as per amount of incoming inserts.
List<InsertBucket> insertBuckets = new ArrayList<>();
for (int i = 0; i < bucketNumbers.size(); i++) {
InsertBucket bkt = new InsertBucket();
bkt.bucketNumber = bucketNumbers.get(i);
bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts();
insertBuckets.add(bkt);
}
LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
partitionPathToInsertBuckets.put(partitionPath, insertBuckets);
}
}
}
private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> partitionPaths, JavaSparkContext jsc) {
Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
if (partitionPaths != null && partitionPaths.size() > 0) {
JavaRDD<String> partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());
partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction<String, String, List<SmallFile>>)
partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath))).collectAsMap();
}
return partitionSmallFilesMap;
}
/**
* Returns a list of small files in the given partition path.
*/
protected List<SmallFile> getSmallFiles(String partitionPath) {
// smallFiles only for partitionPath
List<SmallFile> smallFileLocations = new ArrayList<>();
HoodieTimeline commitTimeline = getCompletedCommitsTimeline();
if (!commitTimeline.empty()) { // if we have some commits
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
List<HoodieBaseFile> allFiles = getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
for (HoodieBaseFile file : allFiles) {
if (file.getFileSize() < config.getParquetSmallFileLimit()) {
String filename = file.getFileName();
SmallFile sf = new SmallFile();
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
sf.sizeBytes = file.getFileSize();
smallFileLocations.add(sf);
}
}
}
return smallFileLocations;
}
public BucketInfo getBucketInfo(int bucketNumber) {
return bucketInfoMap.get(bucketNumber);
}
public List<InsertBucket> getInsertBuckets(String partitionPath) {
return partitionPathToInsertBuckets.get(partitionPath);
}
@Override
public int numPartitions() {
return totalBuckets;
}
@Override
public int getPartition(Object key) {
Tuple2<HoodieKey, Option<HoodieRecordLocation>> keyLocation =
(Tuple2<HoodieKey, Option<HoodieRecordLocation>>) key;
if (keyLocation._2().isPresent()) {
HoodieRecordLocation location = keyLocation._2().get();
return updateLocationToBucket.get(location.getFileId());
} else {
List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(keyLocation._1().getPartitionPath());
// pick the target bucket to use based on the weights.
double totalWeight = 0.0;
final long totalInserts = Math.max(1, globalStat.getNumInserts());
final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
for (InsertBucket insertBucket : targetBuckets) {
totalWeight += insertBucket.weight;
if (r <= totalWeight) {
return insertBucket.bucketNumber;
}
}
// return first one, by default
return targetBuckets.get(0).bucketNumber;
}
}
}
protected HoodieRollingStatMetadata getRollingStats() {
return null;
}
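
getPartition() above routes an update straight to its file's bucket, while an insert is hashed into r in [0, 1) and matched against cumulative insert-bucket weights. A plain-Java restatement of that weighted pick (the array inputs are illustrative; per partition the weights sum to roughly 1.0):

    // Hash the record key into r in [0, 1), then return the first bucket whose
    // cumulative weight covers r, falling back to the first bucket as above.
    static int pickInsertBucket(long hashOfKey, long totalInserts,
                                double[] weights, int[] bucketNumbers) {
      long inserts = Math.max(1, totalInserts);
      double r = 1.0 * Math.floorMod(hashOfKey, inserts) / inserts;
      double cumulativeWeight = 0.0;
      for (int i = 0; i < weights.length; i++) {
        cumulativeWeight += weights[i];
        if (r <= cumulativeWeight) {
          return bucketNumbers[i];
        }
      }
      return bucketNumbers[0];
    }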
/**
* Obtains the average record size based on records written during previous commits. Used for estimating how many
* records pack into one file.

View File

@@ -24,9 +24,8 @@ import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -39,24 +38,26 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.MergeOnReadLazyInsertIterable;
import org.apache.hudi.io.HoodieAppendHandle;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.hudi.table.action.deltacommit.BulkInsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.BulkInsertPreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.DeleteDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.InsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.InsertPreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.UpsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.UpsertPreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor;
import org.apache.hudi.table.rollback.RollbackHelper;
import org.apache.hudi.table.rollback.RollbackRequest;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -82,49 +83,49 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
private static final Logger LOG = LogManager.getLogger(HoodieMergeOnReadTable.class);
// UpsertPartitioner for MergeOnRead table type
private MergeOnReadUpsertPartitioner mergeOnReadUpsertPartitioner;
HoodieMergeOnReadTable(HoodieWriteConfig config, JavaSparkContext jsc, HoodieTableMetaClient metaClient) {
super(config, jsc, metaClient);
}
@Override
public Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
if (profile == null) {
throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
}
mergeOnReadUpsertPartitioner = new MergeOnReadUpsertPartitioner(profile, jsc);
return mergeOnReadUpsertPartitioner;
public HoodieWriteMetadata upsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records) {
return new UpsertDeltaCommitActionExecutor<>(jsc, config, this, instantTime, records).execute();
}
@Override
public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath,
String fileId, Iterator<HoodieRecord<T>> recordItr)
throws IOException {
LOG.info("Merging updates for commit " + instantTime + " for file " + fileId);
if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
LOG.info("Small file corrections for updates for commit " + instantTime + " for file " + fileId);
return super.handleUpdate(instantTime, partitionPath, fileId, recordItr);
} else {
HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, instantTime, this,
partitionPath, fileId, recordItr, sparkTaskContextSupplier);
appendHandle.doAppend();
appendHandle.close();
return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
}
public HoodieWriteMetadata insert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records) {
return new InsertDeltaCommitActionExecutor<>(jsc, config, this, instantTime, records).execute();
}
@Override
public Iterator<List<WriteStatus>> handleInsert(String instantTime, String idPfx, Iterator<HoodieRecord<T>> recordItr)
throws Exception {
// If canIndexLogFiles, write inserts to log files else write inserts to parquet files
if (index.canIndexLogFiles()) {
return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx, sparkTaskContextSupplier);
} else {
return super.handleInsert(instantTime, idPfx, recordItr);
}
public HoodieWriteMetadata bulkInsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
return new BulkInsertDeltaCommitActionExecutor<>(jsc, config,
this, instantTime, records, bulkInsertPartitioner).execute();
}
@Override
public HoodieWriteMetadata delete(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieKey> keys) {
return new DeleteDeltaCommitActionExecutor<>(jsc, config, this, instantTime, keys).execute();
}
@Override
public HoodieWriteMetadata upsertPrepped(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> preppedRecords) {
return new UpsertPreppedDeltaCommitActionExecutor<>(jsc, config, this, instantTime, preppedRecords).execute();
}
@Override
public HoodieWriteMetadata insertPrepped(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> preppedRecords) {
return new InsertPreppedDeltaCommitActionExecutor<>(jsc, config, this, instantTime, preppedRecords).execute();
}
@Override
public HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> preppedRecords, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
return new BulkInsertPreppedDeltaCommitActionExecutor<>(jsc, config,
this, instantTime, preppedRecords, bulkInsertPartitioner).execute();
}
@Override
@@ -320,105 +321,6 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
super.finalizeWrite(jsc, instantTs, stats);
}
/**
* UpsertPartitioner for MergeOnRead table type, this allows auto correction of small parquet files to larger ones
* without the need for an index in the logFile.
*/
class MergeOnReadUpsertPartitioner extends HoodieCopyOnWriteTable.UpsertPartitioner {
MergeOnReadUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
super(profile, jsc);
}
@Override
protected List<SmallFile> getSmallFiles(String partitionPath) {
// smallFiles only for partitionPath
List<SmallFile> smallFileLocations = new ArrayList<>();
// Init here since this class (and member variables) might not have been initialized
HoodieTimeline commitTimeline = getCompletedCommitsTimeline();
// Find out all eligible small file slices
if (!commitTimeline.empty()) {
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
// find smallest file in partition and append to it
List<FileSlice> allSmallFileSlices = new ArrayList<>();
// If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
// it. Doing this over time for a partition, we ensure that we handle small file issues
if (!index.canIndexLogFiles()) {
// TODO : choose last N small files since there can be multiple small files written to a single partition
// by different spark partitions in a single batch
Option<FileSlice> smallFileSlice = Option.fromJavaOptional(getSliceView()
.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
.filter(fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config.getParquetSmallFileLimit())
.min((FileSlice left, FileSlice right) -> left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1));
if (smallFileSlice.isPresent()) {
allSmallFileSlices.add(smallFileSlice.get());
}
} else {
// If we can index log files, we can add more inserts to log files for fileIds including those under
// pending compaction.
List<FileSlice> allFileSlices =
getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
.collect(Collectors.toList());
for (FileSlice fileSlice : allFileSlices) {
if (isSmallFile(fileSlice)) {
allSmallFileSlices.add(fileSlice);
}
}
}
// Create SmallFiles from the eligible file slices
for (FileSlice smallFileSlice : allSmallFileSlices) {
SmallFile sf = new SmallFile();
if (smallFileSlice.getBaseFile().isPresent()) {
// TODO : Move logic of file name, file id, base commit time handling inside file slice
String filename = smallFileSlice.getBaseFile().get().getFileName();
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf);
} else {
HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
FSUtils.getFileIdFromLogPath(logFile.getPath()));
sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf);
}
}
}
return smallFileLocations;
}
public List<String> getSmallFileIds() {
return (List<String>) smallFiles.stream().map(smallFile -> ((SmallFile) smallFile).location.getFileId())
.collect(Collectors.toList());
}
private long getTotalFileSize(FileSlice fileSlice) {
if (!fileSlice.getBaseFile().isPresent()) {
return convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
} else {
return fileSlice.getBaseFile().get().getFileSize()
+ convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
}
}
private boolean isSmallFile(FileSlice fileSlice) {
long totalSize = getTotalFileSize(fileSlice);
return totalSize < config.getParquetMaxFileSize();
}
// TODO (NA) : Make this static part of utility
public long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles) {
long totalSizeOfLogFiles = hoodieLogFiles.stream().map(HoodieLogFile::getFileSize)
.filter(size -> size > 0).reduce(Long::sum).orElse(0L);
// Here we assume that if there is no base parquet file, all log files contain only inserts.
// We can then just get the parquet equivalent size of these log files, compare that with
// {@link config.getParquetMaxFileSize()} and decide if there is scope to insert more rows
return (long) (totalSizeOfLogFiles * config.getLogFileToParquetCompressionRatio());
}
}
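
convertLogFilesSizeToExpectedParquetSize() underpins the MOR small-file check: with no base file, the total log bytes are scaled by the configured log-to-parquet compression ratio to estimate the equivalent parquet size. A worked instance with hypothetical numbers:

    // Three 40 MB log files at a 0.35 compression ratio estimate ~42 MB of
    // parquet, below a 120 MB parquet max size, so the slice stays "small".
    static boolean sliceStillSmall() {
      long totalLogBytes = 3L * 40 * 1024 * 1024;    // 120 MB of log files
      double logToParquetRatio = 0.35;               // config.getLogFileToParquetCompressionRatio()
      long expectedParquetBytes = (long) (totalLogBytes * logToParquetRatio);  // ~42 MB
      return expectedParquetBytes < 120L * 1024 * 1024;  // config.getParquetMaxFileSize()
    }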
private List<RollbackRequest> generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant,
HoodieCommitMetadata commitMetadata) {
ValidationUtils.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));

View File

@@ -32,6 +32,7 @@ import org.apache.hudi.common.fs.ConsistencyGuard;
import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -54,16 +55,15 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
@@ -127,19 +127,83 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
}
/**
* Provides a partitioner to perform the upsert operation, based on the workload profile.
* Upsert a batch of new records into Hoodie table at the supplied instantTime.
* @param jsc Java Spark Context jsc
* @param instantTime Instant Time for the action
* @param records JavaRDD of hoodieRecords to upsert
* @return HoodieWriteMetadata
*/
public abstract Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc);
public abstract HoodieWriteMetadata upsert(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> records);
/**
* Provides a partitioner to perform the insert operation, based on the workload profile.
* Insert a batch of new records into Hoodie table at the supplied instantTime.
* @param jsc Java Spark Context jsc
* @param instantTime Instant Time for the action
* @param records JavaRDD of hoodieRecords to insert
* @return HoodieWriteMetadata
*/
public abstract Partitioner getInsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc);
public abstract HoodieWriteMetadata insert(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> records);
/**
* Return whether this HoodieTable implementation can benefit from workload profiling.
* Bulk Insert a batch of new records into Hoodie table at the supplied instantTime.
* @param jsc Java Spark Context jsc
* @param instantTime Instant Time for the action
* @param records JavaRDD of hoodieRecords to bulk insert
* @param bulkInsertPartitioner User Defined Partitioner
* @return HoodieWriteMetadata
*/
public abstract boolean isWorkloadProfileNeeded();
public abstract HoodieWriteMetadata bulkInsert(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> records, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner);
/**
* Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime. {@link HoodieKey}s will be
* de-duped and non-existent keys will be removed before deleting.
*
* @param jsc Java Spark Context jsc
* @param instantTime Instant Time for the action
* @param keys JavaRDD of {@link HoodieKey}s to be deleted
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata delete(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieKey> keys);
/**
* Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
* <p>
* This implementation requires that the input records are already tagged, and de-duped if needed.
* @param jsc Java Spark Context jsc
* @param instantTime Instant Time for the action
* @param preppedRecords JavaRDD of hoodieRecords to upsert
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata upsertPrepped(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> preppedRecords);
/**
* Inserts the given prepared records into the Hoodie table, at the supplied instantTime.
* <p>
* This implementation requires that the input records are already tagged, and de-duped if needed.
* @param jsc Java Spark Context jsc
* @param instantTime Instant Time for the action
* @param preppedRecords JavaRDD of hoodieRecords to insert
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata insertPrepped(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> preppedRecords);
/**
* Bulk Insert the given prepared records into the Hoodie table, at the supplied instantTime.
* <p>
* This implementation requires that the input records are already tagged, and de-duped if needed.
* @param jsc Java Spark Context jsc
* @param instantTime Instant Time for the action
* @param preppedRecords JavaRDD of hoodieRecords to bulk insert
* @param bulkInsertPartitioner User Defined Partitioner
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> preppedRecords, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner);
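
With these abstract methods, HoodieTable becomes the single entry point for every write operation, each returning a HoodieWriteMetadata regardless of table type. A sketch of dispatching on operation type (the dispatcher itself is hypothetical; the method names are as declared above):

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.common.model.WriteOperationType;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    final class TableWriteDispatch {
      static <T extends HoodieRecordPayload> HoodieWriteMetadata write(
          HoodieTable<T> table, JavaSparkContext jsc, String instantTime,
          JavaRDD<HoodieRecord<T>> records, WriteOperationType op) {
        switch (op) {
          case UPSERT:
            return table.upsert(jsc, instantTime, records);
          case INSERT:
            return table.insert(jsc, instantTime, records);
          case BULK_INSERT:
            return table.bulkInsert(jsc, instantTime, records, Option.empty()); // no custom partitioner
          default:
            throw new IllegalArgumentException("Unhandled operation: " + op);
        }
      }
    }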
public HoodieWriteConfig getConfig() {
return config;
@@ -259,18 +323,6 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
return index;
}
/**
* Perform the ultimate IO for a given upserted (RDD) partition.
*/
public abstract Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition,
Iterator<HoodieRecord<T>> recordIterator, Partitioner partitioner);
/**
* Perform the ultimate IO for a given inserted (RDD) partition.
*/
public abstract Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition,
Iterator<HoodieRecord<T>> recordIterator, Partitioner partitioner);
/**
* Schedule compaction for the instant time.
*

View File

@@ -18,13 +18,14 @@
package org.apache.hudi.table.action;
import java.io.Serializable;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaSparkContext;
public abstract class BaseActionExecutor<R> {
public abstract class BaseActionExecutor<R> implements Serializable {
protected final JavaSparkContext jsc;
protected final transient JavaSparkContext jsc;
protected final HoodieWriteConfig config;

View File

@@ -0,0 +1,291 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkConfigUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import scala.Tuple2;
public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseActionExecutor<HoodieWriteMetadata> {
private static final Logger LOG = LogManager.getLogger(BaseCommitActionExecutor.class);
private final WriteOperationType operationType;
protected final SparkTaskContextSupplier sparkTaskContextSupplier = new SparkTaskContextSupplier();
public BaseCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config,
HoodieTable table, String instantTime, WriteOperationType operationType) {
this(jsc, config, table, instantTime, operationType, null);
}
public BaseCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config,
HoodieTable table, String instantTime, WriteOperationType operationType,
JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
super(jsc, config, table, instantTime);
this.operationType = operationType;
}
public HoodieWriteMetadata execute(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
HoodieWriteMetadata result = new HoodieWriteMetadata();
// Cache the tagged records, so we don't end up computing both
// TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
if (inputRecordsRDD.getStorageLevel() == StorageLevel.NONE()) {
inputRecordsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
} else {
LOG.info("RDD PreppedRecords was persisted at: " + inputRecordsRDD.getStorageLevel());
}
WorkloadProfile profile = null;
if (isWorkloadProfileNeeded()) {
profile = new WorkloadProfile(inputRecordsRDD);
LOG.info("Workload profile :" + profile);
saveWorkloadProfileMetadataToInflight(profile, instantTime);
}
// partition using the upsert/insert partitioner
final Partitioner partitioner = getPartitioner(profile);
JavaRDD<HoodieRecord<T>> partitionedRecords = partition(inputRecordsRDD, partitioner);
JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {
if (WriteOperationType.isChangingRecords(operationType)) {
return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
} else {
return handleInsertPartition(instantTime, partition, recordItr, partitioner);
}
}, true).flatMap(List::iterator);
updateIndexAndCommitIfNeeded(writeStatusRDD, result);
return result;
}
/**
* Save the workload profile in an intermediate file (here re-using commit files). This is useful when performing
* rollback for MOR tables. Only updates are recorded in the workload profile metadata, since updates to log blocks
* are unknown across batches; inserts (which create new parquet files) are rolled back based on commit time.
* TODO: create a new WorkloadProfile metadata file instead of re-using HoodieCommitMetadata.
*/
void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String instantTime)
throws HoodieCommitException {
try {
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
profile.getPartitionPaths().forEach(path -> {
WorkloadStat partitionStat = profile.getWorkloadStat(path.toString());
partitionStat.getUpdateLocationToCount().forEach((key, value) -> {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId(key);
// TODO: is writing baseCommitTime possible here?
writeStat.setPrevCommit(value.getKey());
writeStat.setNumUpdateWrites(value.getValue());
metadata.addWriteStat(path.toString(), writeStat);
});
});
metadata.setOperationType(operationType);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
String commitActionType = table.getMetaClient().getCommitActionType();
HoodieInstant requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime);
activeTimeline.transitionRequestedToInflight(requested,
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (IOException io) {
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
}
}
private Partitioner getPartitioner(WorkloadProfile profile) {
if (WriteOperationType.isChangingRecords(operationType)) {
return getUpsertPartitioner(profile);
} else {
return getInsertPartitioner(profile);
}
}
private JavaRDD<HoodieRecord<T>> partition(JavaRDD<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
return dedupedRecords.mapToPair(
record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record))
.partitionBy(partitioner).map(Tuple2::_2);
}
protected void updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata result) {
// cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
// RDD actions that are performed after updating the index.
writeStatusRDD = writeStatusRDD.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
Instant indexStartTime = Instant.now();
// Update the index back
JavaRDD<WriteStatus> statuses = ((HoodieTable<T>)table).getIndex().updateLocation(writeStatusRDD, jsc,
(HoodieTable<T>)table);
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
result.setWriteStatuses(statuses);
// Trigger the commit (when auto-commit is enabled) with the collected statuses
commitOnAutoCommit(result);
}
protected void commitOnAutoCommit(HoodieWriteMetadata result) {
if (config.shouldAutoCommit()) {
LOG.info("Auto commit enabled: Committing " + instantTime);
commit(Option.empty(), result);
} else {
LOG.info("Auto commit disabled for " + instantTime);
}
}
private void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) {
String actionType = table.getMetaClient().getCommitActionType();
LOG.info("Committing " + instantTime + ", action Type " + actionType);
// Create a Hoodie table which encapsulates the commits and files visible
HoodieTable<T> table = HoodieTable.create(config, jsc);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
result.setCommitted(true);
List<HoodieWriteStat> stats = result.getWriteStatuses().map(WriteStatus::getStat).collect();
result.setWriteStats(stats);
updateMetadataAndRollingStats(metadata, stats);
// Finalize write
finalizeWrite(instantTime, stats, result);
// add in extra metadata
if (extraMetadata.isPresent()) {
extraMetadata.get().forEach(metadata::addMetadata);
}
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
metadata.setOperationType(operationType);
try {
activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
LOG.info("Committed " + instantTime);
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
e);
}
result.setCommitMetadata(Option.of(metadata));
}
/**
* Finalize Write operation.
* @param instantTime Instant Time
* @param stats Hoodie Write Stats
* @param result Write metadata, on which the finalize duration is recorded
*/
protected void finalizeWrite(String instantTime, List<HoodieWriteStat> stats, HoodieWriteMetadata result) {
try {
Instant start = Instant.now();
table.finalizeWrite(jsc, instantTime, stats);
result.setFinalizeDuration(Duration.between(start, Instant.now()));
} catch (HoodieIOException ioe) {
throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
}
}
private void updateMetadataAndRollingStats(HoodieCommitMetadata metadata, List<HoodieWriteStat> writeStats) {
// 1. Look up the previous compaction/commit and get the HoodieCommitMetadata from there.
// 2. Now, first read the existing rolling stats and merge with the result of current metadata.
// Need to do this on every commit (delta or commit) to support COW and MOR.
for (HoodieWriteStat stat : writeStats) {
String partitionPath = stat.getPartitionPath();
// TODO: why is stat.getPartitionPath() null at times here?
metadata.addWriteStat(partitionPath, stat);
}
}
protected boolean isWorkloadProfileNeeded() {
return true;
}
@SuppressWarnings("unchecked")
protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
Partitioner partitioner) {
UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
BucketType btype = binfo.bucketType;
try {
if (btype.equals(BucketType.INSERT)) {
return handleInsert(binfo.fileIdPrefix, recordItr);
} else if (btype.equals(BucketType.UPDATE)) {
return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
} else {
throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
}
} catch (Throwable t) {
String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
LOG.error(msg, t);
throw new HoodieUpsertException(msg, t);
}
}
protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr,
Partitioner partitioner) {
return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
}
/**
* Provides a partitioner to perform the upsert operation, based on the workload profile.
*/
protected abstract Partitioner getUpsertPartitioner(WorkloadProfile profile);
/**
* Provides a partitioner to perform the insert operation, based on the workload profile.
*/
protected abstract Partitioner getInsertPartitioner(WorkloadProfile profile);
protected abstract Iterator<List<WriteStatus>> handleInsert(String idPfx,
Iterator<HoodieRecord<T>> recordItr) throws Exception;
protected abstract Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
Iterator<HoodieRecord<T>> recordItr) throws IOException;
}
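/*
 * End-to-end flow of this executor, as a usage sketch (assumes a HoodieTable
 * named `table` and already-tagged records; UPSERT_PREPPED shown, the other
 * operation types follow the same path):
 *
 * JavaRDD<HoodieRecord<MyPayload>> taggedRecords = ...; // index-tagged upstream
 * HoodieWriteMetadata result = new UpsertPreppedCommitActionExecutor<>(
 *     jsc, config, table, instantTime, taggedRecords).execute();
 * // execute() above: persist records -> build WorkloadProfile -> save inflight
 * // metadata -> partition via UpsertPartitioner -> handleUpsert/InsertPartition
 * // -> update index -> commit if auto-commit is enabled.
 */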


@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import java.io.Serializable;
/**
* Helper class for a bucket's type (INSERT or UPDATE) and its file location.
*/
public class BucketInfo implements Serializable {
BucketType bucketType;
String fileIdPrefix;
String partitionPath;
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("BucketInfo {");
sb.append("bucketType=").append(bucketType).append(", ");
sb.append("fileIdPrefix=").append(fileIdPrefix).append(", ");
sb.append("partitionPath=").append(partitionPath);
sb.append('}');
return sb.toString();
}
}


@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
public enum BucketType {
UPDATE, INSERT
}


@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class BulkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends CommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
private final Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner;
public BulkInsertCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
this.inputRecordsRDD = inputRecordsRDD;
this.bulkInsertPartitioner = bulkInsertPartitioner;
}
@Override
public HoodieWriteMetadata execute() {
try {
return BulkInsertHelper.bulkInsert(inputRecordsRDD, instantTime, (HoodieTable<T>) table, config,
this, true, bulkInsertPartitioner);
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
}
throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
}
}
}


@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.BulkInsertMapFunction;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
import org.apache.spark.api.java.JavaRDD;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class BulkInsertHelper<T extends HoodieRecordPayload<T>> {
public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata bulkInsert(
JavaRDD<HoodieRecord<T>> inputRecords, String instantTime,
HoodieTable<T> table, HoodieWriteConfig config,
CommitActionExecutor<T> executor, boolean performDedupe,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
HoodieWriteMetadata result = new HoodieWriteMetadata();
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords = inputRecords;
if (performDedupe) {
dedupedRecords = WriteHelper.combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
config.getInsertShuffleParallelism(), ((HoodieTable<T>)table));
}
final JavaRDD<HoodieRecord<T>> repartitionedRecords;
final int parallelism = config.getBulkInsertShuffleParallelism();
if (bulkInsertPartitioner.isPresent()) {
repartitionedRecords = bulkInsertPartitioner.get().repartitionRecords(dedupedRecords, parallelism);
} else {
// Now, sort the records and line them up nicely for loading.
repartitionedRecords = dedupedRecords.sortBy(record -> {
// Let's use "partitionPath + key" as the sort key. Spark, will ensure
// the records split evenly across RDD partitions, such that small partitions fit
// into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
return String.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
}, true, parallelism);
}
// generate new file ID prefixes for each output partition
final List<String> fileIDPrefixes =
IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
table.getMetaClient().getCommitActionType(), instantTime), Option.empty());
JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(instantTime, config, table, fileIDPrefixes), true)
.flatMap(List::iterator);
executor.updateIndexAndCommitIfNeeded(writeStatusRDD, result);
return result;
}
}
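/*
 * The fallback sort key above is just "partitionPath+recordKey", so an ordinary
 * lexicographic sort clusters records by partition and orders them by key within
 * each partition. A tiny self-contained illustration (sample values made up):
 *
 * List<String> sortKeys = new ArrayList<>(Arrays.asList(
 *     "2020/04/01+uuid-42", "2020/03/31+uuid-07", "2020/04/01+uuid-11"));
 * Collections.sort(sortKeys);
 * // => [2020/03/31+uuid-07, 2020/04/01+uuid-11, 2020/04/01+uuid-42]
 * // All 2020/03/31 records come first, then 2020/04/01 records in key order,
 * // which lets sortBy range-partition them evenly across `parallelism` buckets.
 */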


@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class BulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends CommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
private final Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner;
public BulkInsertPreppedCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
this.preppedInputRecordRdd = preppedInputRecordRdd;
this.bulkInsertPartitioner = bulkInsertPartitioner;
}
@Override
public HoodieWriteMetadata execute() {
try {
return BulkInsertHelper.bulkInsert(preppedInputRecordRdd, instantTime, (HoodieTable<T>) table, config,
this, false, bulkInsertPartitioner);
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
}
throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
}
}
}


@@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.ParquetReaderIterator;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public abstract class CommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseCommitActionExecutor<T> {
private static final Logger LOG = LogManager.getLogger(CommitActionExecutor.class);
public CommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, WriteOperationType operationType) {
super(jsc, config, table, instantTime, operationType);
}
@Override
public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
Iterator<HoodieRecord<T>> recordItr)
throws IOException {
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
if (!recordItr.hasNext()) {
LOG.info("Empty partition with fileId => " + fileId);
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
// these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, recordItr);
return handleUpdateInternal(upsertHandle, fileId);
}
public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
// these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, keyToNewRecords, oldDataFile);
return handleUpdateInternal(upsertHandle, fileId);
}
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId)
throws IOException {
if (upsertHandle.getOldFilePath() == null) {
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), upsertHandle.getWriterSchema());
BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
try (ParquetReader<IndexedRecord> reader =
AvroParquetReader.<IndexedRecord>builder(upsertHandle.getOldFilePath()).withConf(table.getHadoopConf()).build()) {
wrapper = new SparkBoundedInMemoryExecutor(config, new ParquetReaderIterator(reader),
new UpdateHandler(upsertHandle), x -> x);
wrapper.execute();
} catch (Exception e) {
throw new HoodieException(e);
} finally {
upsertHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
}
}
}
// TODO(vc): This needs to be revisited
if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
+ upsertHandle.getWriteStatus());
}
return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();
}
protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
return new HoodieMergeHandle<>(config, instantTime, (HoodieTable<T>)table, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
}
protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
return new HoodieMergeHandle<>(config, instantTime, (HoodieTable<T>)table, keyToNewRecords,
partitionPath, fileId, dataFileToBeMerged, sparkTaskContextSupplier);
}
@Override
public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
throws Exception {
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
if (!recordItr.hasNext()) {
LOG.info("Empty partition");
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
sparkTaskContextSupplier);
}
@Override
public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
if (profile == null) {
throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
}
return new UpsertPartitioner(profile, jsc, table, config);
}
@Override
public Partitioner getInsertPartitioner(WorkloadProfile profile) {
return getUpsertPartitioner(profile);
}
/**
* Consumer that dequeues records from the queue and sends them to the Merge Handle.
*/
private static class UpdateHandler extends BoundedInMemoryQueueConsumer<GenericRecord, Void> {
private final HoodieMergeHandle upsertHandle;
private UpdateHandler(HoodieMergeHandle upsertHandle) {
this.upsertHandle = upsertHandle;
}
@Override
protected void consumeOneRecord(GenericRecord record) {
upsertHandle.write(record);
}
@Override
protected void finish() {}
@Override
protected Void getResult() {
return null;
}
}
}
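/*
 * handleUpdateInternal above wires a classic producer/consumer pair: a
 * ParquetReaderIterator produces the existing records of the old base file while
 * UpdateHandler pushes each one into the merge handle. Reduced to a standalone
 * analogue (plain Java, deliberately not Hudi's actual queue classes):
 *
 * Iterator<GenericRecord> producer = ...;                  // old base-file records
 * UpdateHandler consumer = new UpdateHandler(mergeHandle); // `mergeHandle` is hypothetical
 * while (producer.hasNext()) {
 *   consumer.consumeOneRecord(producer.next()); // merge handle decides old vs new value
 * }
 * consumer.finish();
 * // The real BoundedInMemoryExecutor performs the same hand-off through a bounded
 * // queue, so reading and writing proceed concurrently with capped memory use.
 */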


@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class DeleteCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends CommitActionExecutor<T> {
private final JavaRDD<HoodieKey> keys;
public DeleteCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieKey> keys) {
super(jsc, config, table, instantTime, WriteOperationType.DELETE);
this.keys = keys;
}
public HoodieWriteMetadata execute() {
return DeleteHelper.execute(instantTime, keys, jsc, config, (HoodieTable<T>)table, this);
}
}


@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.time.Duration;
import java.time.Instant;
/**
* Helper class to perform deletes of keys on a hoodie table.
* @param <T> payload type of the records being deleted
*/
public class DeleteHelper<T extends HoodieRecordPayload<T>> {
/**
* Deduplicate the incoming keys: with a global index, keys are de-duped on the record key alone;
* otherwise on the full (record key, partition path) pair.
*
* @param keys RDD of HoodieKey to deduplicate
* @return deduplicated RDD of HoodieKey
*/
private static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys,
HoodieTable<T> table) {
boolean isIndexingGlobal = table.getIndex().isGlobal();
if (isIndexingGlobal) {
return keys.keyBy(HoodieKey::getRecordKey)
.reduceByKey((key1, key2) -> key1)
.values();
} else {
return keys.distinct();
}
}
public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata execute(String instantTime,
JavaRDD<HoodieKey> keys, JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<T> table,
CommitActionExecutor<T> deleteExecutor) {
try {
HoodieWriteMetadata result = null;
// De-dupe/merge if needed
JavaRDD<HoodieKey> dedupedKeys = config.shouldCombineBeforeDelete() ? deduplicateKeys(keys, table) : keys;
JavaRDD<HoodieRecord<T>> dedupedRecords =
dedupedKeys.map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
Instant beginTag = Instant.now();
// perform index lookup to get the existing location of records
JavaRDD<HoodieRecord<T>> taggedRecords =
((HoodieTable<T>)table).getIndex().tagLocation(dedupedRecords, jsc, (HoodieTable<T>)table);
Duration tagLocationDuration = Duration.between(beginTag, Instant.now());
// filter out non-existent keys/records
JavaRDD<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown);
if (!taggedValidRecords.isEmpty()) {
result = deleteExecutor.execute(taggedValidRecords);
result.setIndexLookupDuration(tagLocationDuration);
} else {
// if the entire set of keys is non-existent
deleteExecutor.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(jsc.emptyRDD()), instantTime);
result = new HoodieWriteMetadata();
result.setWriteStatuses(jsc.emptyRDD());
deleteExecutor.commitOnAutoCommit(result);
}
return result;
} catch (Throwable e) {
if (e instanceof HoodieUpsertException) {
throw (HoodieUpsertException) e;
}
throw new HoodieUpsertException("Failed to delete for commit time " + instantTime, e);
}
}
}
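/*
 * Dedup semantics, illustrated with hypothetical keys: under a global index two
 * keys sharing a record key in different partitions collapse into one delete,
 * while a non-global index keeps them distinct.
 *
 * HoodieKey a = new HoodieKey("uuid-1", "2020/04/01");
 * HoodieKey b = new HoodieKey("uuid-1", "2020/04/02");
 * // global index:     keyBy(HoodieKey::getRecordKey).reduceByKey(..) -> only one of {a, b} survives
 * // non-global index: distinct()                                     -> both a and b survive
 */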


@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import java.util.List;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.Option;
import org.apache.spark.api.java.JavaRDD;
import java.time.Duration;
/**
* Contains metadata, write-statuses and latency times corresponding to a commit/delta-commit action.
*/
public class HoodieWriteMetadata {
private JavaRDD<WriteStatus> writeStatuses;
private Option<Duration> indexLookupDuration = Option.empty();
// Will be set when auto-commit happens
private boolean isCommitted;
private Option<HoodieCommitMetadata> commitMetadata = Option.empty();
private Option<List<HoodieWriteStat>> writeStats = Option.empty();
private Option<Duration> indexUpdateDuration = Option.empty();
private Option<Duration> finalizeDuration = Option.empty();
public HoodieWriteMetadata() {
}
public JavaRDD<WriteStatus> getWriteStatuses() {
return writeStatuses;
}
public Option<HoodieCommitMetadata> getCommitMetadata() {
return commitMetadata;
}
public void setWriteStatuses(JavaRDD<WriteStatus> writeStatuses) {
this.writeStatuses = writeStatuses;
}
public void setCommitMetadata(Option<HoodieCommitMetadata> commitMetadata) {
this.commitMetadata = commitMetadata;
}
public Option<Duration> getFinalizeDuration() {
return finalizeDuration;
}
public void setFinalizeDuration(Duration finalizeDuration) {
this.finalizeDuration = Option.ofNullable(finalizeDuration);
}
public Option<Duration> getIndexUpdateDuration() {
return indexUpdateDuration;
}
public void setIndexUpdateDuration(Duration indexUpdateDuration) {
this.indexUpdateDuration = Option.ofNullable(indexUpdateDuration);
}
public boolean isCommitted() {
return isCommitted;
}
public void setCommitted(boolean committed) {
isCommitted = committed;
}
public Option<List<HoodieWriteStat>> getWriteStats() {
return writeStats;
}
public void setWriteStats(List<HoodieWriteStat> writeStats) {
this.writeStats = Option.of(writeStats);
}
public Option<Duration> getIndexLookupDuration() {
return indexLookupDuration;
}
public void setIndexLookupDuration(Duration indexLookupDuration) {
this.indexLookupDuration = Option.ofNullable(indexLookupDuration);
}
}
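/*
 * Typical read side of this result object, sketched (the `executor`, `result`
 * and LOG names are hypothetical):
 *
 * HoodieWriteMetadata result = executor.execute();
 * if (result.isCommitted() && result.getIndexLookupDuration().isPresent()) {
 *   LOG.info("Tagging took " + result.getIndexLookupDuration().get().toMillis() + " ms");
 * }
 */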


@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import java.io.Serializable;
/**
* Helper class for an insert bucket along with the weight [0.0, 1.0] that defines the fraction of incoming inserts
* that should be allocated to the bucket.
*/
public class InsertBucket implements Serializable {
int bucketNumber;
// fraction of total inserts that should go into this bucket
double weight;
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("WorkloadStat {");
sb.append("bucketNumber=").append(bucketNumber).append(", ");
sb.append("weight=").append(weight);
sb.append('}');
return sb.toString();
}
}


@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class InsertCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends CommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
public InsertCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
super(jsc, config, table, instantTime, WriteOperationType.INSERT);
this.inputRecordsRDD = inputRecordsRDD;
}
@Override
public HoodieWriteMetadata execute() {
return WriteHelper.write(instantTime, inputRecordsRDD, jsc, (HoodieTable<T>) table,
config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
}
}


@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class InsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends CommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> preppedRecords;
public InsertPreppedCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
super(jsc, config, table, instantTime, WriteOperationType.INSERT_PREPPED);
this.preppedRecords = preppedRecords;
}
public HoodieWriteMetadata execute() {
return super.execute(preppedRecords);
}
}


@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import java.io.Serializable;
import org.apache.hudi.common.model.HoodieRecordLocation;
/**
* Helper class for a small file's location and its actual size on disk.
*/
public class SmallFile implements Serializable {
public HoodieRecordLocation location;
public long sizeBytes;
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("SmallFile {");
sb.append("location=").append(location).append(", ");
sb.append("sizeBytes=").append(sizeBytes);
sb.append('}');
return sb.toString();
}
}


@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class UpsertCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends CommitActionExecutor<T> {
private JavaRDD<HoodieRecord<T>> inputRecordsRDD;
public UpsertCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
super(jsc, config, table, instantTime, WriteOperationType.UPSERT);
this.inputRecordsRDD = inputRecordsRDD;
}
@Override
public HoodieWriteMetadata execute() {
return WriteHelper.write(instantTime, inputRecordsRDD, jsc, (HoodieTable<T>)table,
config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true);
}
}


@@ -0,0 +1,316 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import scala.Tuple2;
/**
* Packs incoming records to be upserted into buckets (1 bucket = 1 RDD partition).
*/
public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partitioner {
private static final Logger LOG = LogManager.getLogger(UpsertPartitioner.class);
/**
* List of all small files to be corrected, i.e. packed with enough new inserts to reach a healthy size.
*/
protected List<SmallFile> smallFiles = new ArrayList<>();
/**
* Total number of RDD partitions; determined by the total number of buckets we want to pack the incoming workload into.
*/
private int totalBuckets = 0;
/**
* Stat for the current workload. Helps in determining total inserts, upserts etc.
*/
private WorkloadStat globalStat;
/**
* Helps decide which bucket an incoming update should go to.
*/
private HashMap<String, Integer> updateLocationToBucket;
/**
* Helps us pack inserts into 1 or more buckets depending on number of incoming records.
*/
private HashMap<String, List<InsertBucket>> partitionPathToInsertBuckets;
/**
* Remembers what type each bucket is for later.
*/
private HashMap<Integer, BucketInfo> bucketInfoMap;
protected final HoodieTable<T> table;
protected final HoodieWriteConfig config;
public UpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc, HoodieTable<T> table,
HoodieWriteConfig config) {
updateLocationToBucket = new HashMap<>();
partitionPathToInsertBuckets = new HashMap<>();
bucketInfoMap = new HashMap<>();
globalStat = profile.getGlobalStat();
this.table = table;
this.config = config;
assignUpdates(profile);
assignInserts(profile, jsc);
LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
+ "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n"
+ "UpdateLocations mapped to buckets =>" + updateLocationToBucket);
}
private void assignUpdates(WorkloadProfile profile) {
// each update location gets a partition
Set<Entry<String, WorkloadStat>> partitionStatEntries = profile.getPartitionPathStatMap().entrySet();
for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
}
}
}
private int addUpdateBucket(String partitionPath, String fileIdHint) {
int bucket = totalBuckets;
updateLocationToBucket.put(fileIdHint, bucket);
BucketInfo bucketInfo = new BucketInfo();
bucketInfo.bucketType = BucketType.UPDATE;
bucketInfo.fileIdPrefix = fileIdHint;
bucketInfo.partitionPath = partitionPath;
bucketInfoMap.put(totalBuckets, bucketInfo);
totalBuckets++;
return bucket;
}
private void assignInserts(WorkloadProfile profile, JavaSparkContext jsc) {
// for new inserts, compute buckets depending on how many records we have for each partition
Set<String> partitionPaths = profile.getPartitionPaths();
long averageRecordSize =
averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
config.getCopyOnWriteRecordSizeEstimate());
LOG.info("AvgRecordSize => " + averageRecordSize);
Map<String, List<SmallFile>> partitionSmallFilesMap =
getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), jsc);
for (String partitionPath : partitionPaths) {
WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
if (pStat.getNumInserts() > 0) {
List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
this.smallFiles.addAll(smallFiles);
LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
long totalUnassignedInserts = pStat.getNumInserts();
List<Integer> bucketNumbers = new ArrayList<>();
List<Long> recordsPerBucket = new ArrayList<>();
// first try packing this into one of the smallFiles
for (SmallFile smallFile : smallFiles) {
long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize,
totalUnassignedInserts);
if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
// create a new bucket or re-use an existing bucket
int bucket;
if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
bucket = updateLocationToBucket.get(smallFile.location.getFileId());
LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
} else {
bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
}
bucketNumbers.add(bucket);
recordsPerBucket.add(recordsToAppend);
totalUnassignedInserts -= recordsToAppend;
}
}
// if we have anything more, create new insert buckets, like normal
if (totalUnassignedInserts > 0) {
long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
if (config.shouldAutoTuneInsertSplits()) {
insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
}
int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket);
LOG.info("After small file assignment: unassignedInserts => " + totalUnassignedInserts
+ ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket);
for (int b = 0; b < insertBuckets; b++) {
bucketNumbers.add(totalBuckets);
recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
BucketInfo bucketInfo = new BucketInfo();
bucketInfo.bucketType = BucketType.INSERT;
bucketInfo.partitionPath = partitionPath;
bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
bucketInfoMap.put(totalBuckets, bucketInfo);
totalBuckets++;
}
}
// Go over all such buckets, and assign weights as per amount of incoming inserts.
List<InsertBucket> insertBuckets = new ArrayList<>();
for (int i = 0; i < bucketNumbers.size(); i++) {
InsertBucket bkt = new InsertBucket();
bkt.bucketNumber = bucketNumbers.get(i);
bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts();
insertBuckets.add(bkt);
}
LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
partitionPathToInsertBuckets.put(partitionPath, insertBuckets);
}
}
}
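/*
 * Worked example of the packing above (all numbers made up): with
 * parquetMaxFileSize = 120MB, one small file of 100MB, averageRecordSize = 1024
 * bytes and 50,000 incoming inserts for the partition:
 *   recordsToAppend = min((120MB - 100MB) / 1024, 50,000) = min(20,480, 50,000) = 20,480
 * leaving 29,520 unassigned inserts. With insertRecordsPerBucket = 20,000 that
 * yields ceil(29,520 / 20,000) = 2 new INSERT buckets, so 3 buckets in total
 * with weights 20480/50000, 14760/50000 and 14760/50000.
 */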
private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> partitionPaths, JavaSparkContext jsc) {
Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
if (partitionPaths != null && partitionPaths.size() > 0) {
JavaRDD<String> partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());
partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction<String, String, List<SmallFile>>)
partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath))).collectAsMap();
}
return partitionSmallFilesMap;
}
/**
* Returns a list of small files in the given partition path.
*/
protected List<SmallFile> getSmallFiles(String partitionPath) {
// smallFiles only for partitionPath
List<SmallFile> smallFileLocations = new ArrayList<>();
HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
if (!commitTimeline.empty()) { // if we have some commits
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
for (HoodieBaseFile file : allFiles) {
if (file.getFileSize() < config.getParquetSmallFileLimit()) {
String filename = file.getFileName();
SmallFile sf = new SmallFile();
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
sf.sizeBytes = file.getFileSize();
smallFileLocations.add(sf);
}
}
}
return smallFileLocations;
}
public BucketInfo getBucketInfo(int bucketNumber) {
return bucketInfoMap.get(bucketNumber);
}
public List<InsertBucket> getInsertBuckets(String partitionPath) {
return partitionPathToInsertBuckets.get(partitionPath);
}
@Override
public int numPartitions() {
return totalBuckets;
}
@Override
public int getPartition(Object key) {
Tuple2<HoodieKey, Option<HoodieRecordLocation>> keyLocation =
(Tuple2<HoodieKey, Option<HoodieRecordLocation>>) key;
if (keyLocation._2().isPresent()) {
HoodieRecordLocation location = keyLocation._2().get();
return updateLocationToBucket.get(location.getFileId());
} else {
List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(keyLocation._1().getPartitionPath());
// pick the target bucket to use based on the weights.
double totalWeight = 0.0;
final long totalInserts = Math.max(1, globalStat.getNumInserts());
final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
for (InsertBucket insertBucket : targetBuckets) {
totalWeight += insertBucket.weight;
if (r <= totalWeight) {
return insertBucket.bucketNumber;
}
}
// return first one, by default
return targetBuckets.get(0).bucketNumber;
}
}
/**
* Obtains the average record size based on records written during previous commits. Used for estimating how many
* records pack into one file.
*/
protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, int defaultRecordSizeEstimate) {
long avgSize = defaultRecordSizeEstimate;
try {
if (!commitTimeline.empty()) {
// Go over the reverse ordered commits to get a more recent estimate of average record size.
Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
while (instants.hasNext()) {
HoodieInstant instant = instants.next();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
if (totalBytesWritten > 0 && totalRecordsWritten > 0) {
avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
break;
}
}
}
} catch (Throwable t) {
// make this fail safe.
LOG.error("Error trying to compute average bytes/record ", t);
}
return avgSize;
}
}
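For intuition, the weighted bucket selection in getPartition above maps each record key to a stable value in [0, 1) and walks the cumulative bucket weights until that value is passed. A minimal standalone sketch (the class and helper names are invented here, and md5Hash only approximates NumericUtils.getMessageDigestHash):

import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

class WeightedBucketSelectionSketch {
  // Hash the record key into a long so that the same key always maps to the
  // same insert bucket for a given workload profile.
  static long md5Hash(String key) {
    try {
      MessageDigest md = MessageDigest.getInstance("MD5");
      return new BigInteger(md.digest(key.getBytes())).longValue();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e);
    }
  }

  // buckets and weights mirror the per-partition InsertBuckets built in
  // assignInserts; the weights are expected to sum to ~1.0.
  static int selectBucket(String recordKey, List<Integer> buckets, List<Double> weights, long totalInserts) {
    long total = Math.max(1, totalInserts);
    double r = 1.0 * Math.floorMod(md5Hash(recordKey), total) / total;
    double cumulativeWeight = 0.0;
    for (int i = 0; i < buckets.size(); i++) {
      cumulativeWeight += weights.get(i);
      if (r <= cumulativeWeight) {
        return buckets.get(i);
      }
    }
    return buckets.get(0); // fall back to the first bucket, as above
  }
}

Hashing the key instead of drawing a random number keeps the assignment deterministic across Spark task retries.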
View File
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class UpsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends CommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> preppedRecords;
public UpsertPreppedCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
super(jsc, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
this.preppedRecords = preppedRecords;
}
  @Override
  public HoodieWriteMetadata execute() {
return super.execute(preppedRecords);
}
}
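These executors all follow the same shape: construct with the Spark context, write config, table, instant time and input, then call execute() to get a HoodieWriteMetadata back. A hedged usage sketch, assuming jsc, config, table, instantTime and preppedRecords are already in scope:

// Illustrative only; the surrounding write client normally drives this.
HoodieWriteMetadata result = new UpsertPreppedCommitActionExecutor<>(
    jsc, config, table, instantTime, preppedRecords).execute();
// result carries the WriteStatus RDD plus timings such as the index lookup duration.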
View File
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.time.Duration;
import java.time.Instant;
import scala.Tuple2;
public class WriteHelper<T extends HoodieRecordPayload<T>> {
public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata write(String instantTime,
JavaRDD<HoodieRecord<T>> inputRecordsRDD, JavaSparkContext jsc,
HoodieTable<T> table, boolean shouldCombine,
int shuffleParallelism, CommitActionExecutor<T> executor, boolean performTagging) {
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
combineOnCondition(shouldCombine, inputRecordsRDD, shuffleParallelism, table);
Instant lookupBegin = Instant.now();
JavaRDD<HoodieRecord<T>> taggedRecords = dedupedRecords;
if (performTagging) {
        // perform index lookup to get the existing location of the records
taggedRecords = tag(dedupedRecords, jsc, table);
}
Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());
HoodieWriteMetadata result = executor.execute(taggedRecords);
result.setIndexLookupDuration(indexLookupDuration);
return result;
} catch (Throwable e) {
if (e instanceof HoodieUpsertException) {
throw (HoodieUpsertException) e;
}
throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
}
}
private static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> tag(
JavaRDD<HoodieRecord<T>> dedupedRecords, JavaSparkContext jsc, HoodieTable<T> table) {
    // perform index lookup to get the existing location of the records
return table.getIndex().tagLocation(dedupedRecords, jsc, table);
}
public static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> combineOnCondition(
boolean condition, JavaRDD<HoodieRecord<T>> records, int parallelism, HoodieTable<T> table) {
return condition ? deduplicateRecords(records, table, parallelism) : records;
}
  /**
   * Deduplicate Hoodie records by key, merging duplicates with the payload's preCombine.
   *
   * @param records hoodieRecords to deduplicate
   * @param table table whose index determines the deduplication key
   * @param parallelism parallelism or partitions to be used while reducing/deduplicating
   * @return RDD of HoodieRecords, already deduplicated
   */
public static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> deduplicateRecords(
JavaRDD<HoodieRecord<T>> records, HoodieTable<T> table, int parallelism) {
return deduplicateRecords(records, table.getIndex(), parallelism);
}
public static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> deduplicateRecords(
JavaRDD<HoodieRecord<T>> records, HoodieIndex<T> index, int parallelism) {
boolean isIndexingGlobal = index.isGlobal();
return records.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
// If index used is global, then records are expected to differ in their partitionPath
Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
return new Tuple2<>(key, record);
}).reduceByKey((rec1, rec2) -> {
@SuppressWarnings("unchecked")
T reducedData = (T) rec1.getData().preCombine(rec2.getData());
      // we cannot allow the user to change the key or partitionPath, since that
      // will affect everything, so pick it from one of the records.
return new HoodieRecord<T>(rec1.getKey(), reducedData);
}, parallelism).map(Tuple2::_2);
}
}
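For intuition on deduplicateRecords: records sharing a grouping key are pairwise reduced through the payload's preCombine, and the HoodieKey of one input is kept unchanged. A hedged sketch of invoking the dedupe step on its own (DedupeSketch is invented for illustration):

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.action.commit.WriteHelper;
import org.apache.spark.api.java.JavaRDD;

class DedupeSketch {
  // With a global index the record key alone is the grouping key, so two
  // records with the same key in different partition paths collapse into one
  // via preCombine; with a non-global index the full HoodieKey is used and
  // they stay separate.
  static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> dedupe(
      JavaRDD<HoodieRecord<T>> input, HoodieIndex<T> index) {
    return WriteHelper.deduplicateRecords(input, index, 2 /* shuffle parallelism */);
  }
}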
View File
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.deltacommit;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
import org.apache.hudi.table.action.commit.BulkInsertHelper;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class BulkInsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends DeltaCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
private final Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner;
public BulkInsertDeltaCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
this.inputRecordsRDD = inputRecordsRDD;
this.bulkInsertPartitioner = bulkInsertPartitioner;
}
@Override
public HoodieWriteMetadata execute() {
try {
return BulkInsertHelper.bulkInsert(inputRecordsRDD, instantTime, (HoodieTable<T>) table, config,
this, true, bulkInsertPartitioner);
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
}
throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
}
}
}
View File
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.deltacommit;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
import org.apache.hudi.table.action.commit.BulkInsertHelper;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class BulkInsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends DeltaCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
private final Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner;
public BulkInsertPreppedDeltaCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
this.preppedInputRecordRdd = preppedInputRecordRdd;
this.bulkInsertPartitioner = bulkInsertPartitioner;
}
@Override
public HoodieWriteMetadata execute() {
try {
return BulkInsertHelper.bulkInsert(preppedInputRecordRdd, instantTime, (HoodieTable<T>) table, config,
this, false, bulkInsertPartitioner);
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
}
throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
}
}
}
View File
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.deltacommit;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.DeleteHelper;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class DeleteDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends DeltaCommitActionExecutor<T> {
private final JavaRDD<HoodieKey> keys;
public DeleteDeltaCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieKey> keys) {
super(jsc, config, table, instantTime, WriteOperationType.DELETE);
this.keys = keys;
}
  @Override
  public HoodieWriteMetadata execute() {
    return DeleteHelper.execute(instantTime, keys, jsc, config, (HoodieTable<T>) table, this);
}
}
View File
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.deltacommit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.MergeOnReadLazyInsertIterable;
import org.apache.hudi.io.HoodieAppendHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.action.commit.CommitActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
public abstract class DeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends CommitActionExecutor<T> {
private static final Logger LOG = LogManager.getLogger(DeltaCommitActionExecutor.class);
// UpsertPartitioner for MergeOnRead table type
private UpsertDeltaCommitPartitioner mergeOnReadUpsertPartitioner;
public DeltaCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, WriteOperationType operationType) {
super(jsc, config, table, instantTime, operationType);
}
@Override
public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
if (profile == null) {
throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
}
mergeOnReadUpsertPartitioner = new UpsertDeltaCommitPartitioner(profile, jsc, table, config);
return mergeOnReadUpsertPartitioner;
}
@Override
public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
Iterator<HoodieRecord<T>> recordItr) throws IOException {
LOG.info("Merging updates for commit " + instantTime + " for file " + fileId);
if (!table.getIndex().canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
LOG.info("Small file corrections for updates for commit " + instantTime + " for file " + fileId);
return super.handleUpdate(partitionPath, fileId, recordItr);
} else {
      HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, instantTime, (HoodieTable<T>) table,
partitionPath, fileId, recordItr, sparkTaskContextSupplier);
appendHandle.doAppend();
appendHandle.close();
return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
}
}
@Override
public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
throws Exception {
// If canIndexLogFiles, write inserts to log files else write inserts to parquet files
if (table.getIndex().canIndexLogFiles()) {
      return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>) table, idPfx,
sparkTaskContextSupplier);
} else {
return super.handleInsert(idPfx, recordItr);
}
}
}
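The routing in handleUpdate above reduces to one predicate: take the parquet merge path only when the index cannot look records up in log files and the file is one of the small files the partitioner is growing; otherwise append to the file group's delta log. A sketch of just that decision (names invented for illustration):

class UpdateRoutingSketch {
  // Small files tracked by the partitioner are rewritten via the parquet merge
  // path so they grow toward the configured size; everything else is appended
  // to the log, which is cheaper for MergeOnRead tables.
  static boolean mergeAsParquet(boolean canIndexLogFiles, boolean isTrackedSmallFile) {
    return !canIndexLogFiles && isTrackedSmallFile;
  }
}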
View File
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.deltacommit;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.WriteHelper;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class InsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends DeltaCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
public InsertDeltaCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
super(jsc, config, table, instantTime, WriteOperationType.INSERT);
this.inputRecordsRDD = inputRecordsRDD;
}
@Override
public HoodieWriteMetadata execute() {
    return WriteHelper.write(instantTime, inputRecordsRDD, jsc, (HoodieTable<T>) table,
        config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
}
}
View File
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.deltacommit;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class InsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends DeltaCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> preppedRecords;
public InsertPreppedDeltaCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
super(jsc, config, table, instantTime, WriteOperationType.INSERT_PREPPED);
this.preppedRecords = preppedRecords;
}
  @Override
  public HoodieWriteMetadata execute() {
return super.execute(preppedRecords);
}
}
View File
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.deltacommit;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.WriteHelper;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class UpsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends DeltaCommitActionExecutor<T> {
  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
public UpsertDeltaCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
super(jsc, config, table, instantTime, WriteOperationType.UPSERT);
this.inputRecordsRDD = inputRecordsRDD;
}
@Override
public HoodieWriteMetadata execute() {
    return WriteHelper.write(instantTime, inputRecordsRDD, jsc, (HoodieTable<T>) table,
        config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true);
}
}
View File
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.deltacommit;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.action.commit.SmallFile;
import org.apache.hudi.table.action.commit.UpsertPartitioner;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
 * UpsertPartitioner for the MergeOnRead table type. This allows auto-correction of small parquet files into larger
 * ones, without requiring an index on the log files.
 */
public class UpsertDeltaCommitPartitioner<T extends HoodieRecordPayload<T>> extends UpsertPartitioner<T> {
UpsertDeltaCommitPartitioner(WorkloadProfile profile, JavaSparkContext jsc, HoodieTable<T> table,
HoodieWriteConfig config) {
super(profile, jsc, table, config);
}
@Override
protected List<SmallFile> getSmallFiles(String partitionPath) {
    // Collect small files only within the given partitionPath
List<SmallFile> smallFileLocations = new ArrayList<>();
// Init here since this class (and member variables) might not have been initialized
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
// Find out all eligible small file slices
if (!commitTimeline.empty()) {
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
// find smallest file in partition and append to it
List<FileSlice> allSmallFileSlices = new ArrayList<>();
      // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
      // it. Doing this over time for a partition ensures that we handle the small file problem.
if (!table.getIndex().canIndexLogFiles()) {
// TODO : choose last N small files since there can be multiple small files written to a single partition
// by different spark partitions in a single batch
Option<FileSlice> smallFileSlice = Option.fromJavaOptional(table.getSliceView()
.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
.filter(
fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config
.getParquetSmallFileLimit())
.min((FileSlice left, FileSlice right) ->
left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1));
if (smallFileSlice.isPresent()) {
allSmallFileSlices.add(smallFileSlice.get());
}
} else {
// If we can index log files, we can add more inserts to log files for fileIds including those under
// pending compaction.
List<FileSlice> allFileSlices =
table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
.collect(Collectors.toList());
for (FileSlice fileSlice : allFileSlices) {
if (isSmallFile(fileSlice)) {
allSmallFileSlices.add(fileSlice);
}
}
}
// Create SmallFiles from the eligible file slices
for (FileSlice smallFileSlice : allSmallFileSlices) {
SmallFile sf = new SmallFile();
if (smallFileSlice.getBaseFile().isPresent()) {
// TODO : Move logic of file name, file id, base commit time handling inside file slice
String filename = smallFileSlice.getBaseFile().get().getFileName();
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf);
} else {
HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
FSUtils.getFileIdFromLogPath(logFile.getPath()));
sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf);
}
}
}
return smallFileLocations;
}
  public List<String> getSmallFileIds() {
    return smallFiles.stream().map(smallFile -> smallFile.location.getFileId())
        .collect(Collectors.toList());
  }
private long getTotalFileSize(FileSlice fileSlice) {
if (!fileSlice.getBaseFile().isPresent()) {
return convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
} else {
return fileSlice.getBaseFile().get().getFileSize()
+ convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
}
}
private boolean isSmallFile(FileSlice fileSlice) {
long totalSize = getTotalFileSize(fileSlice);
return totalSize < config.getParquetMaxFileSize();
}
// TODO (NA) : Make this static part of utility
public long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles) {
long totalSizeOfLogFiles = hoodieLogFiles.stream().map(HoodieLogFile::getFileSize)
.filter(size -> size > 0).reduce(Long::sum).orElse(0L);
// Here we assume that if there is no base parquet file, all log files contain only inserts.
// We can then just get the parquet equivalent size of these log files, compare that with
// {@link config.getParquetMaxFileSize()} and decide if there is scope to insert more rows
return (long) (totalSizeOfLogFiles * config.getLogFileToParquetCompressionRatio());
}
}
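As a rough worked example of convertLogFilesSizeToExpectedParquetSize (the numbers are illustrative, not Hudi defaults):

class LogToParquetSizeSketch {
  public static void main(String[] args) {
    long logBytes = (40L + 24L) * 1024 * 1024;    // two delta log files: 40 MB + 24 MB
    double ratio = 0.35;                          // assumed log-to-parquet compression ratio
    long expected = (long) (logBytes * ratio);    // ~22.4 MB of parquet-equivalent data
    // isSmallFile would then compare this against config.getParquetMaxFileSize().
    System.out.println(expected + " bytes of estimated parquet-equivalent data");
  }
}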
View File
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.deltacommit;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class UpsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends DeltaCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> preppedRecords;
public UpsertPreppedDeltaCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
super(jsc, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
this.preppedRecords = preppedRecords;
}
  @Override
  public HoodieWriteMetadata execute() {
return super.execute(preppedRecords);
}
}