diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index bf9e20362..69544e887 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -70,6 +70,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
public static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
public static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = "hoodie.bulkinsert.user.defined.partitioner.class";
+ public static final String BULKINSERT_INPUT_DATA_SCHEMA_DDL = "hoodie.bulkinsert.schema.ddl";
public static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
public static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
public static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
diff --git a/hudi-spark-datasource/hudi-spark-common/pom.xml b/hudi-spark-datasource/hudi-spark-common/pom.xml
index af6403dc5..763d3b2b7 100644
--- a/hudi-spark-datasource/hudi-spark-common/pom.xml
+++ b/hudi-spark-datasource/hudi-spark-common/pom.xml
@@ -173,6 +173,38 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
     </dependency>
+
+    <!-- Hoodie - Tests -->
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-client-common</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark-client</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-common</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- JUnit -->
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <scope>test</scope>
+    </dependency>
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseDefaultSource.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseDefaultSource.java
new file mode 100644
index 000000000..e75c9a213
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseDefaultSource.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Base class for DefaultSource used by Spark datasource v2.
+ */
+public class BaseDefaultSource {
+
+ protected SparkSession sparkSession = null;
+ protected Configuration configuration = null;
+
+ protected SparkSession getSparkSession() {
+ if (sparkSession == null) {
+ sparkSession = SparkSession.builder().getOrCreate();
+ }
+ return sparkSession;
+ }
+
+ protected Configuration getConfiguration() {
+ if (configuration == null) {
+ this.configuration = getSparkSession().sparkContext().hadoopConfiguration();
+ }
+ return configuration;
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java
new file mode 100644
index 000000000..88a792123
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Base class for HoodieWriterCommitMessage used by Spark datasource v2.
+ */
+public class BaseWriterCommitMessage {
+
+  private List<HoodieInternalWriteStatus> writeStatuses;
+
+  public BaseWriterCommitMessage(List<HoodieInternalWriteStatus> writeStatuses) {
+ this.writeStatuses = writeStatuses;
+ }
+
+  public List<HoodieInternalWriteStatus> getWriteStatuses() {
+ return writeStatuses;
+ }
+
+ @Override
+ public String toString() {
+ return "HoodieWriterCommitMessage{" + "writeStatuses=" + Arrays.toString(writeStatuses.toArray()) + '}';
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
new file mode 100644
index 000000000..eb26c4f32
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.HoodieRowCreateHandle;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Helper class for HoodieBulkInsertDataInternalWriter used by Spark datasource v2.
+ */
+public class BulkInsertDataInternalWriterHelper {
+
+ private static final Logger LOG = LogManager.getLogger(BulkInsertDataInternalWriterHelper.class);
+
+ private final String instantTime;
+ private final int taskPartitionId;
+ private final long taskId;
+ private final long taskEpochId;
+ private final HoodieTable hoodieTable;
+ private final HoodieWriteConfig writeConfig;
+ private final StructType structType;
+  private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
+
+ private HoodieRowCreateHandle handle;
+ private String lastKnownPartitionPath = null;
+ private String fileIdPrefix;
+ private int numFilesWritten = 0;
+
+ public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
+ String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType) {
+ this.hoodieTable = hoodieTable;
+ this.writeConfig = writeConfig;
+ this.instantTime = instantTime;
+ this.taskPartitionId = taskPartitionId;
+ this.taskId = taskId;
+ this.taskEpochId = taskEpochId;
+ this.structType = structType;
+ this.fileIdPrefix = UUID.randomUUID().toString();
+ }
+
+ public void write(InternalRow record) throws IOException {
+ try {
+ String partitionPath = record.getUTF8String(
+ HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString();
+
+ if ((lastKnownPartitionPath == null) || !lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) {
+ LOG.info("Creating new file for partition path " + partitionPath);
+ createNewHandle(partitionPath);
+ lastKnownPartitionPath = partitionPath;
+ }
+ handle.write(record);
+ } catch (Throwable t) {
+ LOG.error("Global error thrown while trying to write records in HoodieRowCreateHandle ", t);
+ throw t;
+ }
+ }
+
+  public List<HoodieInternalWriteStatus> getWriteStatuses() throws IOException {
+ close();
+ return writeStatusList;
+ }
+
+ public void abort() {
+ }
+
+ private void createNewHandle(String partitionPath) throws IOException {
+ if (null != handle) {
+ close();
+ }
+ handle = new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
+ instantTime, taskPartitionId, taskId, taskEpochId, structType);
+ }
+
+ public void close() throws IOException {
+ if (null != handle) {
+ writeStatusList.add(handle.close());
+ handle = null;
+ }
+ }
+
+ private String getNextFileId() {
+ return String.format("%s-%d", fileIdPrefix, numFilesWritten++);
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
new file mode 100644
index 000000000..b40d36bea
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+
+/**
+ * Helper class for HoodieDataSourceInternalWriter used by Spark datasource v2.
+ */
+public class DataSourceInternalWriterHelper {
+
+ private static final Logger LOG = LogManager.getLogger(DataSourceInternalWriterHelper.class);
+ public static final String INSTANT_TIME_OPT_KEY = "hoodie.instant.time";
+
+ private final String instantTime;
+ private final HoodieTableMetaClient metaClient;
+ private final SparkRDDWriteClient writeClient;
+ private final HoodieTable hoodieTable;
+ private final WriteOperationType operationType;
+
+ public DataSourceInternalWriterHelper(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
+ SparkSession sparkSession, Configuration configuration) {
+ this.instantTime = instantTime;
+ this.operationType = WriteOperationType.BULK_INSERT;
+ this.writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), writeConfig, true);
+ writeClient.setOperationType(operationType);
+ writeClient.startCommitWithTime(instantTime);
+ this.metaClient = new HoodieTableMetaClient(configuration, writeConfig.getBasePath());
+ this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient);
+ }
+
+ public boolean useCommitCoordinator() {
+ return true;
+ }
+
+ public void onDataWriterCommit(String message) {
+ LOG.info("Received commit of a data writer = " + message);
+ }
+
+  public void commit(List<HoodieWriteStat> writeStatList) {
+ try {
+ writeClient.commitStats(instantTime, writeStatList, Option.empty(),
+ DataSourceUtils.getCommitActionType(operationType, metaClient.getTableType()));
+ } catch (Exception ioe) {
+ throw new HoodieException(ioe.getMessage(), ioe);
+ } finally {
+ writeClient.close();
+ }
+ }
+
+ public void abort() {
+ LOG.error("Commit " + instantTime + " aborted ");
+ writeClient.rollback(instantTime);
+ writeClient.close();
+ }
+
+ public void createInflightCommit() {
+ metaClient.getActiveTimeline().transitionRequestedToInflight(
+ new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, instantTime), Option.empty());
+ }
+
+ public HoodieTable getHoodieTable() {
+ return hoodieTable;
+ }
+
+ public WriteOperationType getWriteOperationType() {
+ return operationType;
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java
new file mode 100644
index 000000000..d66a5ee51
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class for TestHoodieBulkInsertDataInternalWriter.
+ */
+public class HoodieBulkInsertInternalWriterTestBase extends HoodieClientTestHarness {
+
+ protected static final Random RANDOM = new Random();
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ initSparkContexts();
+ initPath();
+ initFileSystem();
+ initTestDataGenerator();
+ initMetaClient();
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ cleanupResources();
+ }
+
+  protected void assertWriteStatuses(List<HoodieInternalWriteStatus> writeStatuses, int batches, int size,
+      Option<List<String>> fileAbsPaths, Option<List<String>> fileNames) {
+ assertEquals(batches, writeStatuses.size());
+ int counter = 0;
+ for (HoodieInternalWriteStatus writeStatus : writeStatuses) {
+ // verify write status
+ assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3], writeStatus.getPartitionPath());
+ assertEquals(writeStatus.getTotalRecords(), size);
+ assertNull(writeStatus.getGlobalError());
+ assertEquals(writeStatus.getFailedRowsSize(), 0);
+ assertEquals(writeStatus.getTotalErrorRecords(), 0);
+ assertFalse(writeStatus.hasErrors());
+ assertNotNull(writeStatus.getFileId());
+ String fileId = writeStatus.getFileId();
+ if (fileAbsPaths.isPresent()) {
+ fileAbsPaths.get().add(basePath + "/" + writeStatus.getStat().getPath());
+ }
+ if (fileNames.isPresent()) {
+ fileNames.get().add(writeStatus.getStat().getPath()
+ .substring(writeStatus.getStat().getPath().lastIndexOf('/') + 1));
+ }
+ HoodieWriteStat writeStat = writeStatus.getStat();
+ assertEquals(size, writeStat.getNumInserts());
+ assertEquals(size, writeStat.getNumWrites());
+ assertEquals(fileId, writeStat.getFileId());
+ assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter++ % 3], writeStat.getPartitionPath());
+ assertEquals(0, writeStat.getNumDeletes());
+ assertEquals(0, writeStat.getNumUpdateWrites());
+ assertEquals(0, writeStat.getTotalWriteErrors());
+ }
+ }
+
+  protected void assertOutput(Dataset<Row> expectedRows, Dataset<Row> actualRows, String instantTime, Option<List<String>> fileNames) {
+ // verify 3 meta fields that are filled in within create handle
+ actualRows.collectAsList().forEach(entry -> {
+ assertEquals(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).toString(), instantTime);
+ assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD)));
+ if (fileNames.isPresent()) {
+ assertTrue(fileNames.get().contains(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS
+ .get(HoodieRecord.FILENAME_METADATA_FIELD))));
+ }
+ assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)));
+ });
+
+ // after trimming 2 of the meta fields, rest of the fields should match
+    Dataset<Row> trimmedExpected = expectedRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD);
+    Dataset<Row> trimmedActual = actualRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD);
+ assertEquals(0, trimmedActual.except(trimmedExpected).count());
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index d66103600..eb7319fd8 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -38,7 +38,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, B
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
-import org.apache.hudi.internal.HoodieDataSourceInternalWriter
+import org.apache.hudi.internal.{HoodieDataSourceInternalWriter, DataSourceInternalWriterHelper}
import org.apache.hudi.sync.common.AbstractSyncTool
import org.apache.log4j.LogManager
import org.apache.spark.SPARK_VERSION
@@ -130,9 +130,6 @@ private[hudi] object HoodieSparkSqlWriter {
// scalastyle:off
if (parameters(ENABLE_ROW_WRITER_OPT_KEY).toBoolean &&
operation == WriteOperationType.BULK_INSERT) {
- if (!SPARK_VERSION.startsWith("2.")) {
- throw new HoodieException("Bulk insert using row writer is not supported with Spark 3. To use row writer please switch to spark 2.")
- }
val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName,
basePath, path, instantTime)
return (success, commitTime, common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
@@ -299,10 +296,22 @@ private[hudi] object HoodieSparkSqlWriter {
val nameSpace = s"hoodie.${tblName}"
val writeConfig = DataSourceUtils.createHoodieConfig(null, path.get, tblName, mapAsJavaMap(parameters))
val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace)
- hoodieDF.write.format("org.apache.hudi.internal")
- .option(HoodieDataSourceInternalWriter.INSTANT_TIME_OPT_KEY, instantTime)
- .options(parameters)
- .save()
+ if (SPARK_VERSION.startsWith("2.")) {
+ hoodieDF.write.format("org.apache.hudi.internal")
+ .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
+ .options(parameters)
+ .save()
+ } else if (SPARK_VERSION.startsWith("3.")) {
+ hoodieDF.write.format("org.apache.hudi.spark3.internal")
+ .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
+ .option(HoodieWriteConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL, hoodieDF.schema.toDDL)
+ .options(parameters)
+ .mode(SaveMode.Append)
+ .save()
+ } else {
+ throw new HoodieException("Bulk insert using row writer is not supported with current Spark version."
+ + " To use row writer please switch to spark 2 or spark 3")
+ }
val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
val metaSyncEnabled = parameters.get(META_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
val syncHiveSucess = if (hiveSyncEnabled || metaSyncEnabled) {
diff --git a/hudi-spark-datasource/hudi-spark2/pom.xml b/hudi-spark-datasource/hudi-spark2/pom.xml
index 8947bb79f..cd44e79ce 100644
--- a/hudi-spark-datasource/hudi-spark2/pom.xml
+++ b/hudi-spark-datasource/hudi-spark2/pom.xml
@@ -214,6 +214,14 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark-common</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.junit.jupiter</groupId>
index 5fb71df77..526f0ce47 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
@@ -21,11 +21,7 @@ package org.apache.hudi.internal;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
@@ -40,14 +36,8 @@ import java.util.Optional;
/**
* DataSource V2 implementation for managing internal write logic. Only called internally.
*/
-public class DefaultSource implements DataSourceV2, ReadSupport, WriteSupport,
- DataSourceRegister {
-
- private static final Logger LOG = LogManager
- .getLogger(DefaultSource.class);
-
- private SparkSession sparkSession = null;
- private Configuration configuration = null;
+public class DefaultSource extends BaseDefaultSource implements DataSourceV2,
+ ReadSupport, WriteSupport, DataSourceRegister {
@Override
public String shortName() {
@@ -67,25 +57,11 @@ public class DefaultSource implements DataSourceV2, ReadSupport, WriteSupport,
@Override
   public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode,
DataSourceOptions options) {
- String instantTime = options.get(HoodieDataSourceInternalWriter.INSTANT_TIME_OPT_KEY).get();
+ String instantTime = options.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY).get();
String path = options.get("path").get();
String tblName = options.get(HoodieWriteConfig.TABLE_NAME).get();
HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(null, path, tblName, options.asMap());
return Optional.of(new HoodieDataSourceInternalWriter(instantTime, config, schema, getSparkSession(),
getConfiguration()));
}
-
- private SparkSession getSparkSession() {
- if (sparkSession == null) {
- sparkSession = SparkSession.builder().getOrCreate();
- }
- return sparkSession;
- }
-
- private Configuration getConfiguration() {
- if (configuration == null) {
- this.configuration = getSparkSession().sparkContext().hadoopConfiguration();
- }
- return configuration;
- }
}
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java
index 7aa0fc6a3..3ce8d776a 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java
@@ -18,102 +18,42 @@
package org.apache.hudi.internal;
-import org.apache.hudi.client.HoodieInternalWriteStatus;
-import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.io.HoodieRowCreateHandle;
import org.apache.hudi.table.HoodieTable;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
/**
* Hoodie's Implementation of {@link DataWriter}. This is used in data source implementation for bulk insert.
*/
 public class HoodieBulkInsertDataInternalWriter implements DataWriter<InternalRow> {
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LogManager.getLogger(HoodieBulkInsertDataInternalWriter.class);
-
- private final String instantTime;
- private final int taskPartitionId;
- private final long taskId;
- private final long taskEpochId;
- private final HoodieTable hoodieTable;
- private final HoodieWriteConfig writeConfig;
- private final StructType structType;
- private final List writeStatusList = new ArrayList<>();
-
- private HoodieRowCreateHandle handle;
- private String lastKnownPartitionPath = null;
- private String fileIdPrefix = null;
- private int numFilesWritten = 0;
+ private final BulkInsertDataInternalWriterHelper bulkInsertWriterHelper;
public HoodieBulkInsertDataInternalWriter(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
String instantTime, int taskPartitionId, long taskId, long taskEpochId,
StructType structType) {
- this.hoodieTable = hoodieTable;
- this.writeConfig = writeConfig;
- this.instantTime = instantTime;
- this.taskPartitionId = taskPartitionId;
- this.taskId = taskId;
- this.taskEpochId = taskEpochId;
- this.structType = structType;
- this.fileIdPrefix = UUID.randomUUID().toString();
+ this.bulkInsertWriterHelper = new BulkInsertDataInternalWriterHelper(hoodieTable,
+ writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, structType);
}
@Override
public void write(InternalRow record) throws IOException {
- try {
- String partitionPath = record.getUTF8String(
- HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString();
-
- if ((lastKnownPartitionPath == null) || !lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) {
- LOG.info("Creating new file for partition path " + partitionPath);
- createNewHandle(partitionPath);
- lastKnownPartitionPath = partitionPath;
- }
- handle.write(record);
- } catch (Throwable t) {
- LOG.error("Global error thrown while trying to write records in HoodieRowCreateHandle ", t);
- throw t;
- }
+ bulkInsertWriterHelper.write(record);
}
@Override
public WriterCommitMessage commit() throws IOException {
- close();
- return new HoodieWriterCommitMessage(writeStatusList);
+ return new HoodieWriterCommitMessage(bulkInsertWriterHelper.getWriteStatuses());
}
@Override
- public void abort() throws IOException {
- }
-
- private void createNewHandle(String partitionPath) throws IOException {
- if (null != handle) {
- close();
- }
- handle = new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
- instantTime, taskPartitionId, taskId, taskEpochId, structType);
- }
-
- public void close() throws IOException {
- if (null != handle) {
- writeStatusList.add(handle.close());
- }
- }
-
- protected String getNextFileId() {
- return String.format("%s-%d", fileIdPrefix, numFilesWritten++);
+ public void abort() {
+ bulkInsertWriterHelper.abort();
}
}
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
index e8cbff80a..4b3dafc62 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
@@ -18,24 +18,12 @@
package org.apache.hudi.internal;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieInstant.State;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.table.HoodieSparkTable;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaSparkContext;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
@@ -53,71 +41,50 @@ import java.util.stream.Collectors;
*/
public class HoodieDataSourceInternalWriter implements DataSourceWriter {
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LogManager.getLogger(HoodieDataSourceInternalWriter.class);
- public static final String INSTANT_TIME_OPT_KEY = "hoodie.instant.time";
-
private final String instantTime;
- private final HoodieTableMetaClient metaClient;
private final HoodieWriteConfig writeConfig;
private final StructType structType;
- private final SparkRDDWriteClient writeClient;
- private final HoodieTable hoodieTable;
- private final WriteOperationType operationType;
+ private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper;
public HoodieDataSourceInternalWriter(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
SparkSession sparkSession, Configuration configuration) {
this.instantTime = instantTime;
this.writeConfig = writeConfig;
this.structType = structType;
- this.operationType = WriteOperationType.BULK_INSERT;
- this.writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), writeConfig, true);
- writeClient.setOperationType(operationType);
- writeClient.startCommitWithTime(instantTime);
- this.metaClient = new HoodieTableMetaClient(configuration, writeConfig.getBasePath());
- this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient);
+ this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType,
+ sparkSession, configuration);
}
@Override
public DataWriterFactory createWriterFactory() {
- metaClient.getActiveTimeline().transitionRequestedToInflight(
- new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, instantTime), Option.empty());
- if (WriteOperationType.BULK_INSERT == operationType) {
- return new HoodieBulkInsertDataInternalWriterFactory(hoodieTable, writeConfig, instantTime, structType);
+ dataSourceInternalWriterHelper.createInflightCommit();
+ if (WriteOperationType.BULK_INSERT == dataSourceInternalWriterHelper.getWriteOperationType()) {
+ return new HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(),
+ writeConfig, instantTime, structType);
} else {
- throw new IllegalArgumentException("Write Operation Type + " + operationType + " not supported ");
+ throw new IllegalArgumentException("Write Operation Type + " + dataSourceInternalWriterHelper.getWriteOperationType() + " not supported ");
}
}
@Override
public boolean useCommitCoordinator() {
- return true;
+ return dataSourceInternalWriterHelper.useCommitCoordinator();
}
@Override
public void onDataWriterCommit(WriterCommitMessage message) {
- LOG.info("Received commit of a data writer =" + message);
+ dataSourceInternalWriterHelper.onDataWriterCommit(message.toString());
}
@Override
public void commit(WriterCommitMessage[] messages) {
List writeStatList = Arrays.stream(messages).map(m -> (HoodieWriterCommitMessage) m)
- .flatMap(m -> m.getWriteStatuses().stream().map(m2 -> m2.getStat())).collect(Collectors.toList());
-
- try {
- writeClient.commitStats(instantTime, writeStatList, Option.empty(),
- DataSourceUtils.getCommitActionType(operationType, metaClient.getTableType()));
- } catch (Exception ioe) {
- throw new HoodieException(ioe.getMessage(), ioe);
- } finally {
- writeClient.close();
- }
+ .flatMap(m -> m.getWriteStatuses().stream().map(HoodieInternalWriteStatus::getStat)).collect(Collectors.toList());
+ dataSourceInternalWriterHelper.commit(writeStatList);
}
@Override
public void abort(WriterCommitMessage[] messages) {
- LOG.error("Commit " + instantTime + " aborted ");
- writeClient.rollback(instantTime);
- writeClient.close();
+ dataSourceInternalWriterHelper.abort();
}
}
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java
index 757000c57..240e4b981 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java
@@ -18,28 +18,18 @@
package org.apache.hudi.internal;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import java.util.List;
+
/**
* Hoodie's {@link WriterCommitMessage} used in datasource implementation.
*/
-public class HoodieWriterCommitMessage implements WriterCommitMessage {
-
- private List writeStatuses = new ArrayList<>();
+public class HoodieWriterCommitMessage extends BaseWriterCommitMessage
+ implements WriterCommitMessage {
public HoodieWriterCommitMessage(List writeStatuses) {
- this.writeStatuses = writeStatuses;
- }
-
- public List getWriteStatuses() {
- return writeStatuses;
- }
-
- @Override
- public String toString() {
- return "HoodieWriterCommitMessage{" + "writeStatuses=" + writeStatuses + '}';
+ super(writeStatuses);
}
}
diff --git a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
index ac69af51e..0b021abeb 100644
--- a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
+++ b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
@@ -18,26 +18,19 @@
package org.apache.hudi.internal;
-import org.apache.hudi.client.HoodieInternalWriteStatus;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.testutils.HoodieClientTestHarness;
-import org.apache.spark.package$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
-import java.util.Random;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
@@ -45,36 +38,13 @@ import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getInternalRowWithError;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
-import static org.junit.jupiter.api.Assumptions.assumeTrue;
/**
* Unit tests {@link HoodieBulkInsertDataInternalWriter}.
*/
-public class TestHoodieBulkInsertDataInternalWriter extends HoodieClientTestHarness {
-
- private static final Random RANDOM = new Random();
-
- @BeforeEach
- public void setUp() throws Exception {
- // this test is only compatible with spark 2
- assumeTrue(package$.MODULE$.SPARK_VERSION().startsWith("2."));
- initSparkContexts("TestHoodieBulkInsertDataInternalWriter");
- initPath();
- initFileSystem();
- initTestDataGenerator();
- initMetaClient();
- }
-
- @AfterEach
- public void tearDown() throws Exception {
- cleanupResources();
- }
+public class TestHoodieBulkInsertDataInternalWriter extends
+ HoodieBulkInsertInternalWriterTestBase {
@Test
public void testDataInternalWriter() throws Exception {
@@ -103,15 +73,15 @@ public class TestHoodieBulkInsertDataInternalWriter extends HoodieClientTestHarn
}
}
- HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
- List fileAbsPaths = new ArrayList<>();
- List fileNames = new ArrayList<>();
+ BaseWriterCommitMessage commitMetadata = (BaseWriterCommitMessage) writer.commit();
+ Option> fileAbsPaths = Option.of(new ArrayList<>());
+ Option> fileNames = Option.of(new ArrayList<>());
// verify write statuses
assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, fileAbsPaths, fileNames);
// verify rows
- Dataset result = sqlContext.read().parquet(fileAbsPaths.toArray(new String[0]));
+ Dataset result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
assertOutput(totalInputRows, result, instantTime, fileNames);
}
}
@@ -156,15 +126,15 @@ public class TestHoodieBulkInsertDataInternalWriter extends HoodieClientTestHarn
// expected
}
- HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
+ BaseWriterCommitMessage commitMetadata = (BaseWriterCommitMessage) writer.commit();
- List fileAbsPaths = new ArrayList<>();
- List fileNames = new ArrayList<>();
+ Option> fileAbsPaths = Option.of(new ArrayList<>());
+ Option> fileNames = Option.of(new ArrayList<>());
// verify write statuses
assertWriteStatuses(commitMetadata.getWriteStatuses(), 1, size / 2, fileAbsPaths, fileNames);
// verify rows
- Dataset result = sqlContext.read().parquet(fileAbsPaths.toArray(new String[0]));
+ Dataset result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
assertOutput(inputRows, result, instantTime, fileNames);
}
@@ -176,43 +146,4 @@ public class TestHoodieBulkInsertDataInternalWriter extends HoodieClientTestHarn
writer.write(internalRow);
}
}
-
- private void assertWriteStatuses(List writeStatuses, int batches, int size, List fileAbsPaths, List fileNames) {
- assertEquals(batches, writeStatuses.size());
- int counter = 0;
- for (HoodieInternalWriteStatus writeStatus : writeStatuses) {
- // verify write status
- assertEquals(writeStatus.getTotalRecords(), size);
- assertNull(writeStatus.getGlobalError());
- assertEquals(writeStatus.getFailedRowsSize(), 0);
- assertNotNull(writeStatus.getFileId());
- String fileId = writeStatus.getFileId();
- assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3], writeStatus.getPartitionPath());
- fileAbsPaths.add(basePath + "/" + writeStatus.getStat().getPath());
- fileNames.add(writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf('/') + 1));
- HoodieWriteStat writeStat = writeStatus.getStat();
- assertEquals(size, writeStat.getNumInserts());
- assertEquals(size, writeStat.getNumWrites());
- assertEquals(fileId, writeStat.getFileId());
- assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter++ % 3], writeStat.getPartitionPath());
- assertEquals(0, writeStat.getNumDeletes());
- assertEquals(0, writeStat.getNumUpdateWrites());
- assertEquals(0, writeStat.getTotalWriteErrors());
- }
- }
-
- private void assertOutput(Dataset expectedRows, Dataset actualRows, String instantTime, List fileNames) {
- // verify 3 meta fields that are filled in within create handle
- actualRows.collectAsList().forEach(entry -> {
- assertEquals(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).toString(), instantTime);
- assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD)));
- assertTrue(fileNames.contains(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD))));
- assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)));
- });
-
- // after trimming 2 of the meta fields, rest of the fields should match
- Dataset trimmedExpected = expectedRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD);
- Dataset trimmedActual = actualRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD);
- assertEquals(0, trimmedActual.except(trimmedExpected).count());
- }
}
diff --git a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
index 454c74d96..184ff771c 100644
--- a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
+++ b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
@@ -18,61 +18,32 @@
package org.apache.hudi.internal;
-import org.apache.hudi.client.HoodieInternalWriteStatus;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.HoodieClientTestUtils;
-import org.apache.spark.package$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Random;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assumptions.assumeTrue;
/**
* Unit tests {@link HoodieDataSourceInternalWriter}.
*/
-public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness {
-
- private static final Random RANDOM = new Random();
-
- @BeforeEach
- public void setUp() throws Exception {
- // this test is only compatible with spark 2
- assumeTrue(package$.MODULE$.SPARK_VERSION().startsWith("2."));
- initSparkContexts("TestHoodieDataSourceInternalWriter");
- initPath();
- initFileSystem();
- initTestDataGenerator();
- initMetaClient();
- }
-
- @AfterEach
- public void tearDown() throws Exception {
- cleanupResources();
- }
+public class TestHoodieDataSourceInternalWriter extends
+ HoodieBulkInsertInternalWriterTestBase {
@Test
public void testDataSourceWriter() throws Exception {
@@ -84,7 +55,7 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf);
DataWriter writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(0, RANDOM.nextLong(), RANDOM.nextLong());
- List partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS);
+ String[] partitionPaths = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
List partitionPathsAbs = new ArrayList<>();
for (String partitionPath : partitionPaths) {
partitionPathsAbs.add(basePath + "/" + partitionPath + "/*");
@@ -112,8 +83,8 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
metaClient.reloadActiveTimeline();
Dataset result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0]));
// verify output
- assertOutput(totalInputRows, result, instantTime);
- assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size);
+ assertOutput(totalInputRows, result, instantTime, Option.empty());
+ assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
}
@Test
@@ -155,8 +126,8 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
Dataset result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime);
// verify output
- assertOutput(totalInputRows, result, instantTime);
- assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size);
+ assertOutput(totalInputRows, result, instantTime, Option.empty());
+ assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
}
}
@@ -199,8 +170,8 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
Dataset result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime);
// verify output
- assertOutput(totalInputRows, result, instantTime);
- assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size);
+ assertOutput(totalInputRows, result, instantTime, Option.empty());
+ assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
}
}
@@ -250,8 +221,8 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
metaClient.reloadActiveTimeline();
Dataset result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0]));
// verify rows
- assertOutput(totalInputRows, result, instantTime0);
- assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size);
+ assertOutput(totalInputRows, result, instantTime0, Option.empty());
+ assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
// 2nd batch. abort in the end
String instantTime1 = "00" + 1;
@@ -274,7 +245,7 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0]));
// verify rows
// only rows from first batch should be present
- assertOutput(totalInputRows, result, instantTime0);
+ assertOutput(totalInputRows, result, instantTime0, Option.empty());
}
private void writeRows(Dataset inputRows, DataWriter writer) throws Exception {
@@ -284,41 +255,4 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
writer.write(internalRow);
}
}
-
- private void assertWriteStatuses(List writeStatuses, int batches, int size) {
- assertEquals(batches, writeStatuses.size());
- int counter = 0;
- for (HoodieInternalWriteStatus writeStatus : writeStatuses) {
- assertEquals(writeStatus.getPartitionPath(), HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3]);
- assertEquals(writeStatus.getTotalRecords(), size);
- assertEquals(writeStatus.getFailedRowsSize(), 0);
- assertEquals(writeStatus.getTotalErrorRecords(), 0);
- assertFalse(writeStatus.hasErrors());
- assertNull(writeStatus.getGlobalError());
- assertNotNull(writeStatus.getFileId());
- String fileId = writeStatus.getFileId();
- HoodieWriteStat writeStat = writeStatus.getStat();
- assertEquals(size, writeStat.getNumInserts());
- assertEquals(size, writeStat.getNumWrites());
- assertEquals(fileId, writeStat.getFileId());
- assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter++ % 3], writeStat.getPartitionPath());
- assertEquals(0, writeStat.getNumDeletes());
- assertEquals(0, writeStat.getNumUpdateWrites());
- assertEquals(0, writeStat.getTotalWriteErrors());
- }
- }
-
- private void assertOutput(Dataset expectedRows, Dataset actualRows, String instantTime) {
- // verify 3 meta fields that are filled in within create handle
- actualRows.collectAsList().forEach(entry -> {
- assertEquals(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).toString(), instantTime);
- assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD)));
- assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)));
- });
-
- // after trimming 2 of the meta fields, rest of the fields should match
- Dataset trimmedExpected = expectedRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD);
- Dataset trimmedActual = actualRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD);
- assertEquals(0, trimmedActual.except(trimmedExpected).count());
- }
}
diff --git a/hudi-spark-datasource/hudi-spark3/pom.xml b/hudi-spark-datasource/hudi-spark3/pom.xml
index 7b1ffa977..df914098b 100644
--- a/hudi-spark-datasource/hudi-spark3/pom.xml
+++ b/hudi-spark-datasource/hudi-spark3/pom.xml
@@ -153,11 +153,72 @@
true
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${fasterxml.spark3.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ ${fasterxml.spark3.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+ ${fasterxml.spark3.version}
+
+
org.apache.hudi
hudi-spark-client
${project.version}
+
+ org.apache.hudi
+ hudi-spark-common
+ ${project.version}
+
+
+
+
+ org.apache.hudi
+ hudi-client-common
+ ${project.version}
+ tests
+ test-jar
+ test
+
+
+ org.apache.hudi
+ hudi-spark-client
+ ${project.version}
+ tests
+ test-jar
+ test
+
+
+ org.apache.hudi
+ hudi-common
+ ${project.version}
+ tests
+ test-jar
+ test
+
+
+ org.apache.hudi
+ hudi-spark-common
+ ${project.version}
+ tests
+ test-jar
+ test
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ test
+
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
new file mode 100644
index 000000000..d59b5ad5c
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.spark3.internal;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.internal.BaseDefaultSource;
+import org.apache.hudi.internal.DataSourceInternalWriterHelper;
+
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import java.util.Map;
+
+/**
+ * DataSource V2 implementation for managing internal write logic. Only called internally.
+ * This class is only compatible with datasource V2 API in Spark 3.
+ */
+public class DefaultSource extends BaseDefaultSource implements TableProvider {
+
+ @Override
+ public StructType inferSchema(CaseInsensitiveStringMap options) {
+ return StructType.fromDDL(options.get(HoodieWriteConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL));
+ }
+
+ @Override
+ public Table getTable(StructType schema, Transform[] partitioning, Map properties) {
+ String instantTime = properties.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY);
+ String path = properties.get("path");
+ String tblName = properties.get(HoodieWriteConfig.TABLE_NAME);
+ HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(null, path, tblName, properties);
+ return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(),
+ getConfiguration());
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java
new file mode 100644
index 000000000..f67187c27
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.spark3.internal;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.internal.BulkInsertDataInternalWriterHelper;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+
+/**
+ * Hoodie's Implementation of {@link DataWriter}. This is used in data source "hudi.spark3.internal" implementation for bulk insert.
+ */
+public class HoodieBulkInsertDataInternalWriter implements DataWriter {
+
+ private final BulkInsertDataInternalWriterHelper bulkInsertWriterHelper;
+
+ public HoodieBulkInsertDataInternalWriter(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
+ String instantTime, int taskPartitionId, long taskId, StructType structType) {
+ this.bulkInsertWriterHelper = new BulkInsertDataInternalWriterHelper(hoodieTable,
+ writeConfig, instantTime, taskPartitionId, taskId, 0, structType);
+ }
+
+ @Override
+ public void write(InternalRow record) throws IOException {
+ bulkInsertWriterHelper.write(record);
+ }
+
+ @Override
+ public WriterCommitMessage commit() throws IOException {
+ return new HoodieWriterCommitMessage(bulkInsertWriterHelper.getWriteStatuses());
+ }
+
+ @Override
+ public void abort() {
+ bulkInsertWriterHelper.abort();
+ }
+
+ @Override
+ public void close() throws IOException {
+ bulkInsertWriterHelper.close();
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriterFactory.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriterFactory.java
new file mode 100644
index 000000000..31b43ea7d
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriterFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.spark3.internal;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Factory to assist in instantiating {@link HoodieBulkInsertDataInternalWriter}.
+ */
+public class HoodieBulkInsertDataInternalWriterFactory implements DataWriterFactory {
+
+ private final String instantTime;
+ private final HoodieTable hoodieTable;
+ private final HoodieWriteConfig writeConfig;
+ private final StructType structType;
+
+ public HoodieBulkInsertDataInternalWriterFactory(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
+ String instantTime, StructType structType) {
+ this.hoodieTable = hoodieTable;
+ this.writeConfig = writeConfig;
+ this.instantTime = instantTime;
+ this.structType = structType;
+ }
+
+ @Override
+ public DataWriter createWriter(int partitionId, long taskId) {
+ return new HoodieBulkInsertDataInternalWriter(hoodieTable, writeConfig, instantTime, partitionId, taskId,
+ structType);
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java
new file mode 100644
index 000000000..b0945156d
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.spark3.internal;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.internal.DataSourceInternalWriterHelper;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of {@link BatchWrite} for datasource "hudi.spark3.internal", used in the datasource implementation
+ * of bulk insert. Delegates commit lifecycle handling to {@link DataSourceInternalWriterHelper}.
+ */
+public class HoodieDataSourceInternalBatchWrite implements BatchWrite {
+
+  private final String instantTime;
+  private final HoodieWriteConfig writeConfig;
+  private final StructType structType;
+  private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper;
+
+  public HoodieDataSourceInternalBatchWrite(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
+                                            SparkSession jss, Configuration hadoopConfiguration) {
+    this.instantTime = instantTime;
+    this.writeConfig = writeConfig;
+    this.structType = structType;
+    this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType,
+        jss, hadoopConfiguration);
+  }
+
+  @Override
+  public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
+    dataSourceInternalWriterHelper.createInflightCommit();
+    if (WriteOperationType.BULK_INSERT == dataSourceInternalWriterHelper.getWriteOperationType()) {
+      return new HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(),
+          writeConfig, instantTime, structType);
+    } else { // only bulk insert is supported by this datasource
+      throw new IllegalArgumentException("Write Operation Type " + dataSourceInternalWriterHelper.getWriteOperationType() + " not supported");
+    }
+  }
+
+  @Override
+  public boolean useCommitCoordinator() {
+    return dataSourceInternalWriterHelper.useCommitCoordinator();
+  }
+
+  @Override
+  public void onDataWriterCommit(WriterCommitMessage message) {
+    dataSourceInternalWriterHelper.onDataWriterCommit(message.toString());
+  }
+
+  @Override
+  public void commit(WriterCommitMessage[] messages) { // collect per-task stats and finalize the commit
+    List<HoodieWriteStat> writeStatList = Arrays.stream(messages).map(m -> (HoodieWriterCommitMessage) m)
+        .flatMap(m -> m.getWriteStatuses().stream().map(HoodieInternalWriteStatus::getStat)).collect(Collectors.toList());
+    dataSourceInternalWriterHelper.commit(writeStatList);
+  }
+
+  @Override
+  public void abort(WriterCommitMessage[] messages) {
+    dataSourceInternalWriterHelper.abort();
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWriteBuilder.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWriteBuilder.java
new file mode 100644
index 000000000..10e2e64f1
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWriteBuilder.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.spark3.internal;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * {@link WriteBuilder} for the "hudi.spark3.internal" datasource (bulk insert). Captures the write
+ * parameters (instant time, write config, schema, Spark session, Hadoop conf) and materializes the
+ * {@link BatchWrite} in {@link #buildForBatch()}.
+ */
+public class HoodieDataSourceInternalBatchWriteBuilder implements WriteBuilder {
+
+  private final String instantTime;
+  private final HoodieWriteConfig writeConfig;
+  private final StructType structType;
+  private final SparkSession jss;
+  private final Configuration hadoopConfiguration;
+
+  public HoodieDataSourceInternalBatchWriteBuilder(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
+                                                   SparkSession jss, Configuration hadoopConfiguration) {
+    this.instantTime = instantTime;
+    this.writeConfig = writeConfig;
+    this.structType = structType;
+    this.jss = jss;
+    this.hadoopConfiguration = hadoopConfiguration;
+  }
+
+  @Override
+  public BatchWrite buildForBatch() { // instantiated lazily, once per logical write
+    return new HoodieDataSourceInternalBatchWrite(instantTime, writeConfig, structType, jss,
+        hadoopConfiguration);
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java
new file mode 100644
index 000000000..f1fded033
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.spark3.internal;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.SupportsWrite;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Hoodie's implementation of {@link SupportsWrite}, used by the "hudi.spark3.internal" datasource for bulk insert.
+ */
+class HoodieDataSourceInternalTable implements SupportsWrite {
+
+  private final String instantTime;
+  private final HoodieWriteConfig writeConfig;
+  private final StructType structType;
+  private final SparkSession jss;
+  private final Configuration hadoopConfiguration;
+
+  public HoodieDataSourceInternalTable(String instantTime, HoodieWriteConfig config,
+                                       StructType schema, SparkSession jss, Configuration hadoopConfiguration) {
+    this.instantTime = instantTime;
+    this.writeConfig = config;
+    this.structType = schema;
+    this.jss = jss;
+    this.hadoopConfiguration = hadoopConfiguration;
+  }
+
+  @Override
+  public String name() {
+    return this.getClass().toString();
+  }
+
+  @Override
+  public StructType schema() {
+    return structType;
+  }
+
+  @Override
+  public Set<TableCapability> capabilities() {
+    Set<TableCapability> capabilities = new HashSet<>(); // avoid double-brace init (anonymous subclass)
+    capabilities.add(TableCapability.BATCH_WRITE);
+    capabilities.add(TableCapability.TRUNCATE);
+    return capabilities;
+  }
+
+  @Override
+  public WriteBuilder newWriteBuilder(LogicalWriteInfo logicalWriteInfo) {
+    return new HoodieDataSourceInternalBatchWriteBuilder(instantTime, writeConfig, structType, jss,
+        hadoopConfiguration);
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieWriterCommitMessage.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieWriterCommitMessage.java
new file mode 100644
index 000000000..7fe787deb
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieWriterCommitMessage.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.spark3.internal;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.internal.BaseWriterCommitMessage;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
+
+import java.util.List;
+
+/**
+ * Hoodie's {@link WriterCommitMessage} carrying per-task write statuses, used by the "hudi.spark3.internal" datasource.
+ */
+public class HoodieWriterCommitMessage extends BaseWriterCommitMessage
+    implements WriterCommitMessage {
+
+  public HoodieWriterCommitMessage(List<HoodieInternalWriteStatus> writeStatuses) {
+    super(writeStatuses);
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark3/src/test/scala/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark3/src/test/scala/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
new file mode 100644
index 000000000..ffb649bd3
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/test/scala/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.spark3.internal;
+
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.internal.HoodieBulkInsertInternalWriterTestBase;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
+import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
+import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder;
+import static org.apache.hudi.testutils.SparkDatasetTestUtils.getInternalRowWithError;
+import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows;
+import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Unit tests {@link HoodieBulkInsertDataInternalWriter}.
+ */
+public class TestHoodieBulkInsertDataInternalWriter extends
+    HoodieBulkInsertInternalWriterTestBase {
+
+  @Test
+  public void testDataInternalWriter() throws Exception {
+    // init config and table
+    HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    // execute N rounds
+    for (int i = 0; i < 5; i++) {
+      String instantTime = "00" + i;
+      // init writer
+      HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), STRUCT_TYPE);
+
+      int size = 10 + RANDOM.nextInt(1000);
+      // write N rows to partition1, N rows to partition2 and N rows to partition3 ... Each batch should create a new RowCreateHandle and a new file
+      int batches = 5;
+      Dataset<Row> totalInputRows = null;
+
+      for (int j = 0; j < batches; j++) {
+        String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
+        Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
+        writeRows(inputRows, writer);
+        if (totalInputRows == null) {
+          totalInputRows = inputRows;
+        } else {
+          totalInputRows = totalInputRows.union(inputRows);
+        }
+      }
+
+      HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
+      Option<List<String>> fileAbsPaths = Option.of(new ArrayList<>());
+      Option<List<String>> fileNames = Option.of(new ArrayList<>());
+
+      // verify write statuses
+      assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, fileAbsPaths, fileNames);
+
+      // verify rows
+      Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
+      assertOutput(totalInputRows, result, instantTime, fileNames);
+    }
+  }
+
+
+  /**
+   * Issue some corrupted or wrong schematized InternalRow after few valid InternalRows so that global error is thrown. write batch 1 of valid records write batch2 of invalid records which is expected
+   * to throw Global Error. Verify global error is set appropriately and only first batch of records are written to disk.
+   */
+  @Test
+  public void testGlobalFailure() throws Exception {
+    // init config and table
+    HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+
+    String instantTime = "001";
+    HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), STRUCT_TYPE);
+
+    int size = 10 + RANDOM.nextInt(100);
+    int totalFailures = 5;
+    // Generate first batch of valid rows
+    Dataset<Row> inputRows = getRandomRows(sqlContext, size / 2, partitionPath, false);
+    List<InternalRow> internalRows = toInternalRows(inputRows, ENCODER);
+
+    // generate some failures rows
+    for (int i = 0; i < totalFailures; i++) {
+      internalRows.add(getInternalRowWithError(partitionPath));
+    }
+
+    // generate 2nd batch of valid rows
+    Dataset<Row> inputRows2 = getRandomRows(sqlContext, size / 2, partitionPath, false);
+    internalRows.addAll(toInternalRows(inputRows2, ENCODER));
+
+    // issue writes
+    try {
+      for (InternalRow internalRow : internalRows) {
+        writer.write(internalRow);
+      }
+      fail("Should have failed");
+    } catch (Throwable e) {
+      // expected: corrupt row triggers a global failure in the writer
+    }
+
+    HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
+
+    Option<List<String>> fileAbsPaths = Option.of(new ArrayList<>());
+    Option<List<String>> fileNames = Option.of(new ArrayList<>());
+    // verify write statuses
+    assertWriteStatuses(commitMetadata.getWriteStatuses(), 1, size / 2, fileAbsPaths, fileNames);
+
+    // verify rows
+    Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
+    assertOutput(inputRows, result, instantTime, fileNames);
+  }
+
+  private void writeRows(Dataset<Row> inputRows, HoodieBulkInsertDataInternalWriter writer)
+      throws Exception {
+    List<InternalRow> internalRows = toInternalRows(inputRows, ENCODER);
+    // issue writes
+    for (InternalRow internalRow : internalRows) {
+      writer.write(internalRow);
+    }
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark3/src/test/scala/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java b/hudi-spark-datasource/hudi-spark3/src/test/scala/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
new file mode 100644
index 000000000..69829ec28
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/test/scala/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.spark3.internal;
+
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.internal.HoodieBulkInsertInternalWriterTestBase;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.write.DataWriter;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
+import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
+import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder;
+import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows;
+import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows;
+
+/**
+ * Unit tests {@link HoodieDataSourceInternalBatchWrite}.
+ */
+public class TestHoodieDataSourceInternalBatchWrite extends
+    HoodieBulkInsertInternalWriterTestBase {
+
+  @Test
+  public void testDataSourceWriter() throws Exception {
+    // init config and table
+    HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    String instantTime = "001";
+    // init writer
+    HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
+        new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf);
+    DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong());
+
+    String[] partitionPaths = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
+    List<String> partitionPathsAbs = new ArrayList<>();
+    for (String partitionPath : partitionPaths) {
+      partitionPathsAbs.add(basePath + "/" + partitionPath + "/*");
+    }
+
+    int size = 10 + RANDOM.nextInt(1000);
+    int batches = 5;
+    Dataset<Row> totalInputRows = null;
+
+    for (int j = 0; j < batches; j++) {
+      String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
+      Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
+      writeRows(inputRows, writer);
+      if (totalInputRows == null) {
+        totalInputRows = inputRows;
+      } else {
+        totalInputRows = totalInputRows.union(inputRows);
+      }
+    }
+
+    HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
+    List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
+    commitMessages.add(commitMetadata);
+    dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
+    metaClient.reloadActiveTimeline();
+    Dataset<Row> result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0]));
+    // verify output
+    assertOutput(totalInputRows, result, instantTime, Option.empty());
+    assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
+  }
+
+  @Test
+  public void testMultipleDataSourceWrites() throws Exception {
+    // init config and table
+    HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    int partitionCounter = 0;
+
+    // execute N rounds
+    for (int i = 0; i < 5; i++) {
+      String instantTime = "00" + i;
+      // init writer
+      HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
+          new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf);
+
+      List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
+      Dataset<Row> totalInputRows = null;
+      DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong());
+
+      int size = 10 + RANDOM.nextInt(1000);
+      int batches = 5; // one batch per partition
+
+      for (int j = 0; j < batches; j++) {
+        String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
+        Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
+        writeRows(inputRows, writer);
+        if (totalInputRows == null) {
+          totalInputRows = inputRows;
+        } else {
+          totalInputRows = totalInputRows.union(inputRows);
+        }
+      }
+
+      HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
+      commitMessages.add(commitMetadata);
+      dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
+      metaClient.reloadActiveTimeline();
+
+      Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime);
+
+      // verify output
+      assertOutput(totalInputRows, result, instantTime, Option.empty());
+      assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
+    }
+  }
+
+  @Test
+  public void testLargeWrites() throws Exception {
+    // init config and table
+    HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    int partitionCounter = 0;
+
+    // execute N rounds
+    for (int i = 0; i < 3; i++) {
+      String instantTime = "00" + i;
+      // init writer
+      HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
+          new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf);
+
+      List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
+      Dataset<Row> totalInputRows = null;
+      DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong());
+
+      int size = 10000 + RANDOM.nextInt(10000);
+      int batches = 3; // one batch per partition
+
+      for (int j = 0; j < batches; j++) {
+        String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
+        Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
+        writeRows(inputRows, writer);
+        if (totalInputRows == null) {
+          totalInputRows = inputRows;
+        } else {
+          totalInputRows = totalInputRows.union(inputRows);
+        }
+      }
+
+      HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
+      commitMessages.add(commitMetadata);
+      dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
+      metaClient.reloadActiveTimeline();
+
+      Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime);
+
+      // verify output
+      assertOutput(totalInputRows, result, instantTime, Option.empty());
+      assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
+    }
+  }
+
+  /**
+   * Tests that DataSourceWriter.abort() will abort the written records of interest write and commit batch1 write and abort batch2 Read of entire dataset should show only records from batch1.
+   * commit batch1
+   * abort batch2
+   * verify only records from batch1 is available to read
+   */
+  @Test
+  public void testAbort() throws Exception {
+    // init config and table
+    HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    String instantTime0 = "00" + 0;
+    // init writer
+    HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
+        new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf);
+
+    DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong());
+
+    List<String> partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS);
+    List<String> partitionPathsAbs = new ArrayList<>();
+    for (String partitionPath : partitionPaths) {
+      partitionPathsAbs.add(basePath + "/" + partitionPath + "/*");
+    }
+
+    int size = 10 + RANDOM.nextInt(100);
+    int batches = 1;
+    Dataset<Row> totalInputRows = null;
+
+    for (int j = 0; j < batches; j++) {
+      String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
+      Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
+      writeRows(inputRows, writer);
+      if (totalInputRows == null) {
+        totalInputRows = inputRows;
+      } else {
+        totalInputRows = totalInputRows.union(inputRows);
+      }
+    }
+
+    HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
+    List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
+    commitMessages.add(commitMetadata);
+    // commit 1st batch
+    dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
+    metaClient.reloadActiveTimeline();
+    Dataset<Row> result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0]));
+    // verify rows
+    assertOutput(totalInputRows, result, instantTime0, Option.empty());
+    assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
+
+    // 2nd batch. abort in the end
+    String instantTime1 = "00" + 1;
+    dataSourceInternalBatchWrite =
+        new HoodieDataSourceInternalBatchWrite(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf);
+    writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(1, RANDOM.nextLong());
+
+    for (int j = 0; j < batches; j++) {
+      String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
+      Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
+      writeRows(inputRows, writer);
+    }
+
+    commitMetadata = (HoodieWriterCommitMessage) writer.commit();
+    commitMessages = new ArrayList<>();
+    commitMessages.add(commitMetadata);
+    // abort 2nd batch
+    dataSourceInternalBatchWrite.abort(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
+    metaClient.reloadActiveTimeline();
+    result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0]));
+    // verify rows
+    // only rows from first batch should be present
+    assertOutput(totalInputRows, result, instantTime0, Option.empty());
+  }
+
+  private void writeRows(Dataset<Row> inputRows, DataWriter<InternalRow> writer) throws Exception {
+    List<InternalRow> internalRows = toInternalRows(inputRows, ENCODER);
+    // issue writes
+    for (InternalRow internalRow : internalRows) {
+      writer.write(internalRow);
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 8f170883f..0b1053330 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,6 +87,7 @@
2.6.7.3
2.6.7.1
2.7.4
+ 2.10.0
2.0.0
2.17
1.10.1
@@ -1355,10 +1356,10 @@
${scala12.version}
2.12
2.4.1
- 2.10.0
- 2.10.0
- 2.10.0
- 2.10.0
+ ${fasterxml.spark3.version}
+ ${fasterxml.spark3.version}
+ ${fasterxml.spark3.version}
+ ${fasterxml.spark3.version}