1
0

[HUDI-1072] Introduce REPLACE top level action. Implement insert_overwrite operation on top of replace action (#2048)

This commit is contained in:
satishkotha
2020-09-29 17:04:25 -07:00
committed by GitHub
parent 32c9cad52c
commit a99e93bed5
60 changed files with 2129 additions and 380 deletions

View File

@@ -18,14 +18,20 @@
package org.apache.hudi;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -42,10 +48,6 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -174,25 +176,35 @@ public class DataSourceUtils {
return new HoodieWriteClient<>(jssc, createHoodieConfig(schemaStr, basePath, tblName, parameters), true);
}
public static JavaRDD<WriteStatus> doWriteOperation(HoodieWriteClient client, JavaRDD<HoodieRecord> hoodieRecords,
String instantTime, WriteOperationType operation) throws HoodieException {
public static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) {
if (operation == WriteOperationType.INSERT_OVERWRITE) {
return HoodieTimeline.REPLACE_COMMIT_ACTION;
} else {
return CommitUtils.getCommitActionType(tableType);
}
}
public static HoodieWriteResult doWriteOperation(HoodieWriteClient client, JavaRDD<HoodieRecord> hoodieRecords,
String instantTime, WriteOperationType operation) throws HoodieException {
switch (operation) {
case BULK_INSERT:
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner =
createUserDefinedBulkInsertPartitioner(client.getConfig());
return client.bulkInsert(hoodieRecords, instantTime, userDefinedBulkInsertPartitioner);
return new HoodieWriteResult(client.bulkInsert(hoodieRecords, instantTime, userDefinedBulkInsertPartitioner));
case INSERT:
return client.insert(hoodieRecords, instantTime);
return new HoodieWriteResult(client.insert(hoodieRecords, instantTime));
case UPSERT:
return client.upsert(hoodieRecords, instantTime);
return new HoodieWriteResult(client.upsert(hoodieRecords, instantTime));
case INSERT_OVERWRITE:
return client.insertOverwrite(hoodieRecords, instantTime);
default:
throw new HoodieException("Not a valid operation type for doWriteOperation: " + operation.toString());
}
}
public static JavaRDD<WriteStatus> doDeleteOperation(HoodieWriteClient client, JavaRDD<HoodieKey> hoodieKeys,
public static HoodieWriteResult doDeleteOperation(HoodieWriteClient client, JavaRDD<HoodieKey> hoodieKeys,
String instantTime) {
return client.delete(hoodieKeys, instantTime);
return new HoodieWriteResult(client.delete(hoodieKeys, instantTime));
}
public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey,

View File

@@ -73,6 +73,10 @@ public class QuickstartUtils {
this(DEFAULT_PARTITION_PATHS, new HashMap<>());
}
public DataGenerator(String[] partitionPaths) {
this(partitionPaths, new HashMap<>());
}
private DataGenerator(String[] partitionPaths, Map<Integer, HoodieKey> keyPartitionMap) {
this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length);
this.existingKeys = keyPartitionMap;

View File

@@ -18,11 +18,8 @@
package org.apache.hudi.internal;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
@@ -44,6 +41,10 @@ import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
* Implementation of {@link DataSourceWriter} for datasource "hudi.internal" to be used in datasource implementation
* of bulk insert.
@@ -102,7 +103,8 @@ public class HoodieDataSourceInternalWriter implements DataSourceWriter {
.flatMap(m -> m.getWriteStatuses().stream().map(m2 -> m2.getStat())).collect(Collectors.toList());
try {
writeClient.commitStats(instantTime, writeStatList, Option.empty());
writeClient.commitStats(instantTime, writeStatList, Option.empty(),
DataSourceUtils.getCommitActionType(operationType, metaClient.getTableType()));
} catch (Exception ioe) {
throw new HoodieException(ioe.getMessage(), ioe);
} finally {

View File

@@ -142,6 +142,7 @@ object DataSourceWriteOptions {
val UPSERT_OPERATION_OPT_VAL = WriteOperationType.UPSERT.value
val DELETE_OPERATION_OPT_VAL = WriteOperationType.DELETE.value
val BOOTSTRAP_OPERATION_OPT_VAL = WriteOperationType.BOOTSTRAP.value
val INSERT_OVERWRITE_OPERATION_OPT_VAL = WriteOperationType.INSERT_OVERWRITE.value
val DEFAULT_OPERATION_OPT_VAL = UPSERT_OPERATION_OPT_VAL
/**

View File

@@ -27,13 +27,10 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteClient, WriteStatus}
import org.apache.hudi.client.{HoodieWriteClient, HoodieWriteResult}
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.HoodieRecordPayload
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.util.ReflectionUtils
import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP, DEFAULT_BOOTSTRAP_INDEX_CLASS}
@@ -44,7 +41,7 @@ import org.apache.hudi.internal.HoodieDataSourceInternalWriter
import org.apache.hudi.sync.common.AbstractSyncTool
import org.apache.log4j.LogManager
import org.apache.spark.SparkContext
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
@@ -80,7 +77,7 @@ private[hudi] object HoodieSparkSqlWriter {
case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") =>
case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
}
val tableType = parameters(TABLE_TYPE_OPT_KEY)
val tableType = HoodieTableType.valueOf(parameters(TABLE_TYPE_OPT_KEY))
var operation = WriteOperationType.fromValue(parameters(OPERATION_OPT_KEY))
// It does not make sense to allow upsert() operation if INSERT_DROP_DUPS_OPT_KEY is true
// Auto-correct the operation to "insert" if OPERATION_OPT_KEY is set to "upsert" wrongly
@@ -113,11 +110,13 @@ private[hudi] object HoodieSparkSqlWriter {
val archiveLogFolder = parameters.getOrElse(
HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
val tableMetaClient = HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get,
HoodieTableType.valueOf(tableType), tblName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY),
tableType, tblName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY),
null.asInstanceOf[String])
tableConfig = tableMetaClient.getTableConfig
}
val commitActionType = DataSourceUtils.getCommitActionType(operation, tableConfig.getTableType)
// short-circuit if bulk_insert via row is enabled.
// scalastyle:off
if (parameters(ENABLE_ROW_WRITER_OPT_KEY).toBoolean) {
@@ -127,7 +126,7 @@ private[hudi] object HoodieSparkSqlWriter {
}
// scalastyle:on
val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
val (writeResult, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
if (operation != WriteOperationType.DELETE) {
// register classes & schemas
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
@@ -169,9 +168,9 @@ private[hudi] object HoodieSparkSqlWriter {
log.info("new batch has no new records, skipping...")
(true, common.util.Option.empty())
}
client.startCommitWithTime(instantTime)
val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
(writeStatuses, client)
client.startCommitWithTime(instantTime, commitActionType)
val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
(writeResult, client)
} else {
val structName = s"${tblName}_record"
val nameSpace = s"hoodie.${tblName}"
@@ -198,15 +197,15 @@ private[hudi] object HoodieSparkSqlWriter {
}
// Issue deletes
client.startCommitWithTime(instantTime)
client.startCommitWithTime(instantTime, commitActionType)
val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime)
(writeStatuses, client)
}
// Check for errors and commit the write.
val (writeSuccessful, compactionInstant) =
commitAndPerformPostOperations(writeStatuses, parameters, writeClient, tableConfig, instantTime, basePath,
operation, jsc)
commitAndPerformPostOperations(writeResult, parameters, writeClient, tableConfig, jsc,
TableInstantInfo(basePath, instantTime, commitActionType, operation))
(writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, writeClient, tableConfig)
}
}
@@ -383,31 +382,34 @@ private[hudi] object HoodieSparkSqlWriter {
metaSyncSuccess
}
private def commitAndPerformPostOperations(writeStatuses: JavaRDD[WriteStatus],
/**
* Group all table/action specific information into a case class.
*/
case class TableInstantInfo(basePath: Path, instantTime: String, commitActionType: String, operation: WriteOperationType)
private def commitAndPerformPostOperations(writeResult: HoodieWriteResult,
parameters: Map[String, String],
client: HoodieWriteClient[HoodieRecordPayload[Nothing]],
tableConfig: HoodieTableConfig,
instantTime: String,
basePath: Path,
operation: WriteOperationType,
jsc: JavaSparkContext): (Boolean, common.util.Option[java.lang.String]) = {
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
jsc: JavaSparkContext,
tableInstantInfo: TableInstantInfo
): (Boolean, common.util.Option[java.lang.String]) = {
val errorCount = writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count()
if (errorCount == 0) {
log.info("No errors. Proceeding to commit the write.")
val metaMap = parameters.filter(kv =>
kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
val commitSuccess = if (metaMap.isEmpty) {
client.commit(instantTime, writeStatuses)
} else {
client.commit(instantTime, writeStatuses,
common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
}
val commitSuccess =
client.commit(tableInstantInfo.instantTime, writeResult.getWriteStatuses,
common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))),
tableInstantInfo.commitActionType,
writeResult.getPartitionToReplaceFileIds)
if (commitSuccess) {
log.info("Commit " + instantTime + " successful!")
log.info("Commit " + tableInstantInfo.instantTime + " successful!")
}
else {
log.info("Commit " + instantTime + " failed!")
log.info("Commit " + tableInstantInfo.instantTime + " failed!")
}
val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())
@@ -419,7 +421,7 @@ private[hudi] object HoodieSparkSqlWriter {
}
log.info(s"Compaction Scheduled is $compactionInstant")
val metaSyncSuccess = metaSync(parameters, basePath, jsc.hadoopConfiguration())
val metaSyncSuccess = metaSync(parameters, tableInstantInfo.basePath, jsc.hadoopConfiguration())
log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
if (!asyncCompactionEnabled) {
@@ -427,10 +429,10 @@ private[hudi] object HoodieSparkSqlWriter {
}
(commitSuccess && metaSyncSuccess, compactionInstant)
} else {
log.error(s"${operation.toString} failed with $errorCount errors :")
log.error(s"${tableInstantInfo.operation} failed with $errorCount errors :")
if (log.isTraceEnabled) {
log.trace("Printing out the top 100 errors")
writeStatuses.rdd.filter(ws => ws.hasErrors)
writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors)
.take(100)
.foreach(ws => {
log.trace("Global error :", ws.getGlobalError)