[MINOR] Private the NoArgsConstructor of SparkMergeHelper and code clean (#2194)

wangxianghu
2020-10-26 12:22:11 +08:00
committed by GitHub
parent 8545ea3856
commit e206ddd431
3 changed files with 6 additions and 13 deletions

BaseSparkCommitActionExecutor.java

@@ -199,6 +199,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
     commitOnAutoCommit(result);
   }
 
+  @Override
   protected String getCommitActionType() {
     return table.getMetaClient().getCommitActionType();
   }
@@ -276,14 +277,6 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
     return handleUpdateInternal(upsertHandle, fileId);
   }
 
-  public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
-                                                  Map<String, HoodieRecord<T>> keyToNewRecords,
-                                                  HoodieBaseFile oldDataFile) throws IOException {
-    // these are updates
-    HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, keyToNewRecords, oldDataFile);
-    return handleUpdateInternal(upsertHandle, fileId);
-  }
-
   protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId)
       throws IOException {
     if (upsertHandle.getOldFilePath() == null) {

SparkInsertOverwriteCommitActionExecutor.java

@@ -28,8 +28,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaRDD;
 import scala.Tuple2;
@@ -41,8 +39,6 @@ import java.util.stream.Collectors;
 public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkCommitActionExecutor<T> {
 
-  private static final Logger LOG = LogManager.getLogger(SparkInsertOverwriteCommitActionExecutor.class);
-
   private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
 
   public SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
@@ -53,7 +49,7 @@ public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayl
   }
 
   @Override
-  public HoodieWriteMetadata execute() {
+  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
     return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
         config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
   }
@@ -68,6 +64,7 @@ public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayl
     return HoodieTimeline.REPLACE_COMMIT_ACTION;
   }
 
+  @Override
   protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
     return writeStatuses.map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
         new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();

SparkMergeHelper.java

@@ -46,6 +46,9 @@ import java.util.Iterator;
 public class SparkMergeHelper<T extends HoodieRecordPayload> extends AbstractMergeHelper<T, JavaRDD<HoodieRecord<T>>,
     JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
 
+  private SparkMergeHelper() {
+  }
+
   private static class MergeHelperHolder {
     private static final SparkMergeHelper SPARK_MERGE_HELPER = new SparkMergeHelper();
   }
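
Note on the SparkMergeHelper change: making the no-args constructor private completes the initialization-on-demand holder idiom that the existing MergeHelperHolder class already uses, so the helper can no longer be instantiated from outside and the single instance is created lazily when the holder class is first loaded. Below is a minimal, self-contained sketch of the same idiom; the class name LazyHelper and the newInstance() accessor are illustrative only (the accessor name mirrors the SparkWriteHelper.newInstance() call visible in the diff above), not code copied from Hudi.

// Minimal sketch of the lazy-holder singleton idiom; names are illustrative,
// not copied from Hudi.
public class LazyHelper {

  // Private no-args constructor: the helper cannot be instantiated by callers.
  private LazyHelper() {
  }

  // The nested holder class is only initialized on the first call to
  // newInstance(), so the singleton is created lazily and thread-safely
  // by the class loader.
  private static class HelperHolder {
    private static final LazyHelper INSTANCE = new LazyHelper();
  }

  // Single entry point for obtaining the helper instance.
  public static LazyHelper newInstance() {
    return HelperHolder.INSTANCE;
  }

  public String doWork(String input) {
    return "processed: " + input;
  }

  public static void main(String[] args) {
    LazyHelper helper = LazyHelper.newInstance();
    System.out.println(helper.doWork("records"));            // processed: records
    System.out.println(LazyHelper.newInstance() == helper);  // true: same instance
  }
}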