
[HUDI-1349] spark sql support overwrite use insert_overwrite_table (#2196)

lw0090
2020-12-04 04:26:21 +08:00
committed by GitHub
parent 78fd122594
commit 1f0d5c077e
14 changed files with 224 additions and 10 deletions

View File

@@ -205,6 +205,17 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
*/
public abstract HoodieWriteMetadata<O> insertOverwrite(HoodieEngineContext context, String instantTime, I records);
/**
* Deletes all the existing records of the Hoodie table and inserts the specified new records into the Hoodie table at the supplied instantTime.
* Unlike insertOverwrite, this replaces the data in every existing partition of the table, not just the partitions present in the input records.
*
* @param context HoodieEngineContext
* @param instantTime Instant time for the replace action
* @param records input records
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> insertOverwriteTable(HoodieEngineContext context, String instantTime, I records);
public HoodieWriteConfig getConfig() {
return config;
}

View File

@@ -106,6 +106,11 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
throw new HoodieNotSupportedException("InsertOverWrite is not supported yet");
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> insertOverwriteTable(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> records) {
throw new HoodieNotSupportedException("insertOverwriteTable is not supported yet");
}
@Override
public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");

View File

@@ -191,6 +191,23 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
}
/**
* Removes all existing records of the Hoodie table and inserts the given HoodieRecords into the table.
* @param records HoodieRecords to insert
* @param instantTime Instant time of the commit
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public HoodieWriteResult insertOverwriteTable(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
HoodieWriteMetadata result = table.insertOverwriteTable(context, instantTime, records);
return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
}
@Override
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
return bulkInsert(records, instantTime, Option.empty());
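A minimal caller-level sketch of the new client API (Scala), assuming a configured SparkRDDWriteClient `client`, a JavaRDD of HoodieRecords `records`, and an already started replace instant `instantTime`; those three names are illustrative, only insertOverwriteTable and getPartitionToReplaceFileIds come from this commit:

// records/instantTime are assumed to exist; replace every existing file group in the table with the new records
val result = client.insertOverwriteTable(records, instantTime)
// per-partition list of file ids that the resulting replacecommit marks as replaced
val replacedFileIds = result.getPartitionToReplaceFileIds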

View File

@@ -45,6 +45,7 @@ import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.action.bootstrap.SparkBootstrapCommitActionExecutor;
import org.apache.hudi.table.action.clean.SparkCleanActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkBulkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkDeleteCommitActionExecutor;
@@ -129,6 +130,11 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
return new SparkInsertOverwriteCommitActionExecutor(context, config, this, instantTime, records).execute();
}
@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> insertOverwriteTable(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
return new SparkInsertOverwriteTableCommitActionExecutor(context, config, this, instantTime, records).execute();
}
@Override
public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");

View File

@@ -44,7 +44,14 @@ public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayl
public SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
-super(context, config, table, instantTime, WriteOperationType.INSERT_OVERWRITE);
+this(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE);
}
public SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
WriteOperationType writeOperationType) {
super(context, config, table, instantTime, writeOperationType);
this.inputRecordsRDD = inputRecordsRDD;
}

View File

@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class SparkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends SparkInsertOverwriteCommitActionExecutor<T> {
public SparkInsertOverwriteTableCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
super(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE_TABLE);
}
protected List<String> getAllExistingFileIds(String partitionPath) {
return table.getSliceView().getLatestFileSlices(partitionPath)
.map(fg -> fg.getFileId()).distinct().collect(Collectors.toList());
}
@Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
try {
List<String> partitionPaths = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
table.getMetaClient().getBasePath(), false);
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
if (partitionPaths != null && partitionPaths.size() > 0) {
context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions");
JavaRDD<String> partitionPathRdd = jsc.parallelize(partitionPaths, partitionPaths.size());
partitionToExistingFileIds = partitionPathRdd.mapToPair(
partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
}
} catch (IOException e) {
throw new HoodieCommitException("insert_overwrite_table failed to get the existing fileIds of all partitions for "
+ config.getBasePath() + " at time " + instantTime, e);
}
return partitionToExistingFileIds;
}
}
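For illustration only, the map returned above covers every partition currently in the table, not just the partitions touched by the incoming records; for a table with two partitions it would look roughly like this (Scala, with hypothetical partition paths and file group ids):

// hypothetical shape of the replaced-file-ids metadata recorded in the replacecommit
Map(
  "2020/01/01" -> List("fileId-1", "fileId-2"),
  "2020/01/02" -> List("fileId-3")
)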

View File

@@ -38,10 +38,12 @@ public enum WriteOperationType {
// delete
DELETE("delete"),
BOOTSTRAP("bootstrap"),
-// insert overwrite
+// insert overwrite with static partitioning
INSERT_OVERWRITE("insert_overwrite"),
// cluster
CLUSTER("cluster"),
+// insert overwrite with dynamic partitioning
+INSERT_OVERWRITE_TABLE("insert_overwrite_table"),
// used for old version
UNKNOWN("unknown");
@@ -72,6 +74,8 @@ public enum WriteOperationType {
return DELETE;
case "insert_overwrite":
return INSERT_OVERWRITE;
case "insert_overwrite_table":
return INSERT_OVERWRITE_TABLE;
default:
throw new HoodieException("Invalid value of Type.");
}
@@ -88,4 +92,4 @@ public enum WriteOperationType {
public static boolean isChangingRecords(WriteOperationType operationType) {
return operationType == UPSERT || operationType == UPSERT_PREPPED || operationType == DELETE;
}
}
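A small sketch of how the new value round-trips (Scala), assuming the switch shown above lives in WriteOperationType's string-to-enum lookup method, here taken to be fromValue:

// fromValue: assumed name of the lookup method containing the switch above
assert(WriteOperationType.fromValue("insert_overwrite_table") == WriteOperationType.INSERT_OVERWRITE_TABLE)
assert(WriteOperationType.INSERT_OVERWRITE_TABLE.value == "insert_overwrite_table")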

View File

@@ -451,6 +451,10 @@ public class HoodieTestDataGenerator {
return generateInsertsStream(instantTime, n, false, TRIP_EXAMPLE_SCHEMA, true).collect(Collectors.toList());
}
public List<HoodieRecord> generateInsertsForPartition(String instantTime, Integer n, String partition) {
return generateInsertsStream(instantTime, n, false, TRIP_EXAMPLE_SCHEMA, false, () -> partition, () -> UUID.randomUUID().toString()).collect(Collectors.toList());
}
public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer n, boolean isFlattened, String schemaStr, boolean containsAllPartitions) {
return generateInsertsStream(commitTime, n, isFlattened, schemaStr, containsAllPartitions,
() -> partitionPaths[RAND.nextInt(partitionPaths.length)],

View File

@@ -86,7 +86,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
.collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName())));
// Get the maxCommit from the last delta or compaction or commit - when bootstrapped from COW table
String maxCommitTime = metaClient.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
-HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
+HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp();
latestFileSlices.forEach(fileSlice -> {
List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());

View File

@@ -191,7 +191,7 @@ public class DataSourceUtils {
}
public static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) {
-if (operation == WriteOperationType.INSERT_OVERWRITE) {
+if (operation == WriteOperationType.INSERT_OVERWRITE || operation == WriteOperationType.INSERT_OVERWRITE_TABLE) {
return HoodieTimeline.REPLACE_COMMIT_ACTION;
} else {
return CommitUtils.getCommitActionType(tableType);
@@ -211,6 +211,8 @@ public class DataSourceUtils {
return new HoodieWriteResult(client.upsert(hoodieRecords, instantTime));
case INSERT_OVERWRITE:
return client.insertOverwrite(hoodieRecords, instantTime);
case INSERT_OVERWRITE_TABLE:
return client.insertOverwriteTable(hoodieRecords, instantTime);
default:
throw new HoodieException("Not a valid operation type for doWriteOperation: " + operation.toString());
}
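A minimal sketch of what the change above implies for the timeline (Scala; the table type is illustrative): both overwrite operations are now recorded as replace actions rather than plain commits.

// insert_overwrite_table produces a replacecommit, like insert_overwrite
val action = DataSourceUtils.getCommitActionType(WriteOperationType.INSERT_OVERWRITE_TABLE, HoodieTableType.COPY_ON_WRITE)
assert(action == HoodieTimeline.REPLACE_COMMIT_ACTION)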

View File

@@ -74,7 +74,8 @@ public class HoodieDataSourceHelpers {
if (metaClient.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
return metaClient.getActiveTimeline().getTimelineOfActions(
CollectionUtils.createSet(HoodieActiveTimeline.COMMIT_ACTION,
-HoodieActiveTimeline.DELTA_COMMIT_ACTION)).filterCompletedInstants();
+HoodieActiveTimeline.DELTA_COMMIT_ACTION,
+HoodieActiveTimeline.REPLACE_COMMIT_ACTION)).filterCompletedInstants();
} else {
return metaClient.getCommitTimeline().filterCompletedInstants();
}

View File

@@ -157,6 +157,7 @@ object DataSourceWriteOptions {
val DELETE_OPERATION_OPT_VAL = WriteOperationType.DELETE.value
val BOOTSTRAP_OPERATION_OPT_VAL = WriteOperationType.BOOTSTRAP.value
val INSERT_OVERWRITE_OPERATION_OPT_VAL = WriteOperationType.INSERT_OVERWRITE.value
val INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL = WriteOperationType.INSERT_OVERWRITE_TABLE.value
val DEFAULT_OPERATION_OPT_VAL = UPSERT_OPERATION_OPT_VAL
/**
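The new operation value can also be requested explicitly through the datasource options, independent of the save mode; a hedged sketch (Scala), assuming a DataFrame `df`, a target `basePath`, and the usual record key / precombine / partition options in a hypothetical `hudiOpts` map:

df.write.format("org.apache.hudi")
  .options(hudiOpts) // hudiOpts: hypothetical map of the usual record key / precombine / partition options
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)
  .mode(SaveMode.Append)
  .save(basePath)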

View File

@@ -93,6 +93,13 @@ private[hudi] object HoodieSparkSqlWriter {
operation = WriteOperationType.INSERT
}
// If the mode is Overwrite, set the operation to INSERT_OVERWRITE_TABLE.
// DataSourceUtils.doWriteOperation will then use client.insertOverwriteTable to overwrite
// the table, replacing the old fs.delete(tablePath) behavior.
if (mode == SaveMode.Overwrite && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) {
operation = WriteOperationType.INSERT_OVERWRITE_TABLE
}
val jsc = new JavaSparkContext(sparkContext)
val basePath = new Path(path.get)
val instantTime = HoodieActiveTimeline.createNewInstantTime()
@@ -319,10 +326,6 @@ private[hudi] object HoodieSparkSqlWriter {
if (operation != WriteOperationType.DELETE) {
if (mode == SaveMode.ErrorIfExists && tableExists) {
throw new HoodieException(s"hoodie table at $tablePath already exists.")
-} else if (mode == SaveMode.Overwrite && tableExists) {
-log.warn(s"hoodie table at $tablePath already exists. Deleting existing data & overwriting with new data.")
-fs.delete(tablePath, true)
-tableExists = false
}
} else {
// Delete Operation only supports Append mode
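In user terms, the two hunks above mean an Overwrite-mode save on an existing table now goes through the replace action instead of deleting the base path; a hedged sketch under the same assumptions as the earlier snippet (existing table at `basePath`, new DataFrame `df`, hypothetical `hudiOpts`):

df.write.format("org.apache.hudi")
  .options(hudiOpts) // hudiOpts: hypothetical map of the usual write options
  .mode(SaveMode.Overwrite) // internally mapped to WriteOperationType.INSERT_OVERWRITE_TABLE
  .save(basePath)
// the table directory and its timeline survive; the overwrite appears as a new replacecommit instant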

View File

@@ -18,7 +18,13 @@
package org.apache.hudi.functional
import java.sql.{Date, Timestamp}
import java.util.function.Supplier
import java.util.stream.Stream
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.HoodieClientTestBase
@@ -156,6 +162,79 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled
}
@Test def testOverWriteModeUseReplaceAction(): Unit = {
val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)
val records2 = recordsToStrings(dataGen.generateInserts("002", 5)).toList
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)
val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
.map(instant => (instant.asInstanceOf[HoodieInstant]).getAction)
assertEquals(2, commits.size)
assertEquals("commit", commits(0))
assertEquals("replacecommit", commits(1))
}
@Test def testOverWriteModeUseReplaceActionOnDisJointPartitions(): Unit = {
// step1: Write 5 records to hoodie table for partition1 DEFAULT_FIRST_PARTITION_PATH
val records1 = recordsToStrings(dataGen.generateInsertsForPartition("001", 5, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)
// step2: Write 7 more records using SaveMode.Overwrite for partition2 DEFAULT_SECOND_PARTITION_PATH
val records2 = recordsToStrings(dataGen.generateInsertsForPartition("002", 7, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).toList
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
.mode(SaveMode.Overwrite)
.save(basePath)
val allRecords = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*")
allRecords.registerTempTable("tmpTable")
spark.sql(String.format("select count(*) from tmpTable")).show()
// step3: Query the rows count from hoodie table for partition1 DEFAULT_FIRST_PARTITION_PATH
val recordCountForPartition1 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).collect()
assertEquals("0", recordCountForPartition1(0).get(0).toString)
// step4: Query the rows count from hoodie table for partition2 DEFAULT_SECOND_PARTITION_PATH
val recordCountForPartition2 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).collect()
assertEquals("7", recordCountForPartition2(0).get(0).toString)
// step5: Query the total rows count from hoodie table
val recordCount = spark.sql(String.format("select count(*) from tmpTable")).collect()
assertEquals("7", recordCount(0).get(0).toString)
// step6: Query the rows count for partition2 DEFAULT_SECOND_PARTITION_PATH by collecting the partition column and filtering
val recordsForPartitionColumn = spark.sql(String.format("select partition from tmpTable")).collect()
val filterSecondPartitionCount = recordsForPartitionColumn.filter(row => row.get(0).equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).size
assertEquals(7, filterSecondPartitionCount)
val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
.map(instant => instant.asInstanceOf[HoodieInstant].getAction)
assertEquals(2, commits.size)
assertEquals("commit", commits(0))
assertEquals("replacecommit", commits(1))
}
@Test def testDropInsertDup(): Unit = {
val insert1Cnt = 10
val insert2DupKeyCnt = 9