From 286055ce34bdbbac68c995a2710fc7be07734b12 Mon Sep 17 00:00:00 2001 From: wenningd Date: Fri, 25 Dec 2020 06:43:34 -0800 Subject: [PATCH] [HUDI-1451] Support bulk insert v2 with Spark 3.0.0 (#2328) Co-authored-by: Wenning Ding - Added support for bulk insert v2 with datasource v2 api in Spark 3.0.0. --- .../apache/hudi/config/HoodieWriteConfig.java | 1 + .../hudi-spark-common/pom.xml | 32 +++ .../hudi/internal/BaseDefaultSource.java | 45 +++ .../internal/BaseWriterCommitMessage.java | 45 +++ .../BulkInsertDataInternalWriterHelper.java | 113 ++++++++ .../DataSourceInternalWriterHelper.java | 107 +++++++ ...oodieBulkInsertInternalWriterTestBase.java | 112 ++++++++ .../apache/hudi/HoodieSparkSqlWriter.scala | 25 +- hudi-spark-datasource/hudi-spark2/pom.xml | 8 + .../apache/hudi/internal/DefaultSource.java | 30 +- .../HoodieBulkInsertDataInternalWriter.java | 74 +---- .../HoodieDataSourceInternalWriter.java | 65 ++--- .../internal/HoodieWriterCommitMessage.java | 20 +- ...estHoodieBulkInsertDataInternalWriter.java | 91 +----- .../TestHoodieDataSourceInternalWriter.java | 92 +----- hudi-spark-datasource/hudi-spark3/pom.xml | 61 ++++ .../hudi/spark3/internal/DefaultSource.java | 54 ++++ .../HoodieBulkInsertDataInternalWriter.java | 64 +++++ ...ieBulkInsertDataInternalWriterFactory.java | 52 ++++ .../HoodieDataSourceInternalBatchWrite.java | 91 ++++++ ...ieDataSourceInternalBatchWriteBuilder.java | 55 ++++ .../HoodieDataSourceInternalTable.java | 77 +++++ .../internal/HoodieWriterCommitMessage.java | 36 +++ ...estHoodieBulkInsertDataInternalWriter.java | 150 ++++++++++ ...estHoodieDataSourceInternalBatchWrite.java | 265 ++++++++++++++++++ pom.xml | 9 +- 26 files changed, 1445 insertions(+), 329 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseDefaultSource.java create mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java create mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java create mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java create mode 100644 hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java create mode 100644 hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java create mode 100644 hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java create mode 100644 hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriterFactory.java create mode 100644 hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java create mode 100644 hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWriteBuilder.java create mode 100644 hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java create mode 100644 hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieWriterCommitMessage.java create mode 100644 hudi-spark-datasource/hudi-spark3/src/test/scala/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java create mode 100644 
hudi-spark-datasource/hudi-spark3/src/test/scala/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index bf9e20362..69544e887 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -70,6 +70,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { public static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism"; public static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"; public static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = "hoodie.bulkinsert.user.defined.partitioner.class"; + public static final String BULKINSERT_INPUT_DATA_SCHEMA_DDL = "hoodie.bulkinsert.schema.ddl"; public static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"; public static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism"; public static final String DEFAULT_ROLLBACK_PARALLELISM = "100"; diff --git a/hudi-spark-datasource/hudi-spark-common/pom.xml b/hudi-spark-datasource/hudi-spark-common/pom.xml index af6403dc5..763d3b2b7 100644 --- a/hudi-spark-datasource/hudi-spark-common/pom.xml +++ b/hudi-spark-datasource/hudi-spark-common/pom.xml @@ -173,6 +173,38 @@ org.apache.spark spark-sql_${scala.binary.version} + + + + org.apache.hudi + hudi-client-common + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-spark-client + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-common + ${project.version} + tests + test-jar + test + + + + org.junit.jupiter + junit-jupiter-api + test + diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseDefaultSource.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseDefaultSource.java new file mode 100644 index 000000000..e75c9a213 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseDefaultSource.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.internal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.sql.SparkSession; + +/** + * Base class for DefaultSource used by Spark datasource v2. 
+ */ +public class BaseDefaultSource { + + protected SparkSession sparkSession = null; + protected Configuration configuration = null; + + protected SparkSession getSparkSession() { + if (sparkSession == null) { + sparkSession = SparkSession.builder().getOrCreate(); + } + return sparkSession; + } + + protected Configuration getConfiguration() { + if (configuration == null) { + this.configuration = getSparkSession().sparkContext().hadoopConfiguration(); + } + return configuration; + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java new file mode 100644 index 000000000..88a792123 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.internal; + +import org.apache.hudi.client.HoodieInternalWriteStatus; + +import java.util.Arrays; +import java.util.List; + +/** + * Base class for HoodieWriterCommitMessage used by Spark datasource v2. + */ +public class BaseWriterCommitMessage { + + private List writeStatuses; + + public BaseWriterCommitMessage(List writeStatuses) { + this.writeStatuses = writeStatuses; + } + + public List getWriteStatuses() { + return writeStatuses; + } + + @Override + public String toString() { + return "HoodieWriterCommitMessage{" + "writeStatuses=" + Arrays.toString(writeStatuses.toArray()) + '}'; + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java new file mode 100644 index 000000000..eb26c4f32 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.internal; + +import org.apache.hudi.client.HoodieInternalWriteStatus; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.HoodieRowCreateHandle; +import org.apache.hudi.table.HoodieTable; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +/** + * Helper class for HoodieBulkInsertDataInternalWriter used by Spark datasource v2. + */ +public class BulkInsertDataInternalWriterHelper { + + private static final Logger LOG = LogManager.getLogger(BulkInsertDataInternalWriterHelper.class); + + private final String instantTime; + private final int taskPartitionId; + private final long taskId; + private final long taskEpochId; + private final HoodieTable hoodieTable; + private final HoodieWriteConfig writeConfig; + private final StructType structType; + private final List writeStatusList = new ArrayList<>(); + + private HoodieRowCreateHandle handle; + private String lastKnownPartitionPath = null; + private String fileIdPrefix; + private int numFilesWritten = 0; + + public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteConfig writeConfig, + String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType) { + this.hoodieTable = hoodieTable; + this.writeConfig = writeConfig; + this.instantTime = instantTime; + this.taskPartitionId = taskPartitionId; + this.taskId = taskId; + this.taskEpochId = taskEpochId; + this.structType = structType; + this.fileIdPrefix = UUID.randomUUID().toString(); + } + + public void write(InternalRow record) throws IOException { + try { + String partitionPath = record.getUTF8String( + HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString(); + + if ((lastKnownPartitionPath == null) || !lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) { + LOG.info("Creating new file for partition path " + partitionPath); + createNewHandle(partitionPath); + lastKnownPartitionPath = partitionPath; + } + handle.write(record); + } catch (Throwable t) { + LOG.error("Global error thrown while trying to write records in HoodieRowCreateHandle ", t); + throw t; + } + } + + public List getWriteStatuses() throws IOException { + close(); + return writeStatusList; + } + + public void abort() { + } + + private void createNewHandle(String partitionPath) throws IOException { + if (null != handle) { + close(); + } + handle = new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(), + instantTime, taskPartitionId, taskId, taskEpochId, structType); + } + + public void close() throws IOException { + if (null != handle) { + writeStatusList.add(handle.close()); + handle = null; + } + } + + private String getNextFileId() { + return String.format("%s-%d", fileIdPrefix, numFilesWritten++); + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java new file mode 100644 index 000000000..b40d36bea --- /dev/null +++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.internal; + +import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieInstant.State; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.StructType; + +import java.util.List; + +/** + * Helper class for HoodieDataSourceInternalWriter used by Spark datasource v2. 
+ */ +public class DataSourceInternalWriterHelper { + + private static final Logger LOG = LogManager.getLogger(DataSourceInternalWriterHelper.class); + public static final String INSTANT_TIME_OPT_KEY = "hoodie.instant.time"; + + private final String instantTime; + private final HoodieTableMetaClient metaClient; + private final SparkRDDWriteClient writeClient; + private final HoodieTable hoodieTable; + private final WriteOperationType operationType; + + public DataSourceInternalWriterHelper(String instantTime, HoodieWriteConfig writeConfig, StructType structType, + SparkSession sparkSession, Configuration configuration) { + this.instantTime = instantTime; + this.operationType = WriteOperationType.BULK_INSERT; + this.writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), writeConfig, true); + writeClient.setOperationType(operationType); + writeClient.startCommitWithTime(instantTime); + this.metaClient = new HoodieTableMetaClient(configuration, writeConfig.getBasePath()); + this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient); + } + + public boolean useCommitCoordinator() { + return true; + } + + public void onDataWriterCommit(String message) { + LOG.info("Received commit of a data writer = " + message); + } + + public void commit(List writeStatList) { + try { + writeClient.commitStats(instantTime, writeStatList, Option.empty(), + DataSourceUtils.getCommitActionType(operationType, metaClient.getTableType())); + } catch (Exception ioe) { + throw new HoodieException(ioe.getMessage(), ioe); + } finally { + writeClient.close(); + } + } + + public void abort() { + LOG.error("Commit " + instantTime + " aborted "); + writeClient.rollback(instantTime); + writeClient.close(); + } + + public void createInflightCommit() { + metaClient.getActiveTimeline().transitionRequestedToInflight( + new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, instantTime), Option.empty()); + } + + public HoodieTable getHoodieTable() { + return hoodieTable; + } + + public WriteOperationType getWriteOperationType() { + return operationType; + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java new file mode 100644 index 000000000..d66a5ee51 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.internal; + +import org.apache.hudi.client.HoodieInternalWriteStatus; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.testutils.HoodieClientTestHarness; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +import java.util.List; +import java.util.Random; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Base class for TestHoodieBulkInsertDataInternalWriter. + */ +public class HoodieBulkInsertInternalWriterTestBase extends HoodieClientTestHarness { + + protected static final Random RANDOM = new Random(); + + @BeforeEach + public void setUp() throws Exception { + initSparkContexts(); + initPath(); + initFileSystem(); + initTestDataGenerator(); + initMetaClient(); + } + + @AfterEach + public void tearDown() throws Exception { + cleanupResources(); + } + + protected void assertWriteStatuses(List writeStatuses, int batches, int size, + Option> fileAbsPaths, Option> fileNames) { + assertEquals(batches, writeStatuses.size()); + int counter = 0; + for (HoodieInternalWriteStatus writeStatus : writeStatuses) { + // verify write status + assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3], writeStatus.getPartitionPath()); + assertEquals(writeStatus.getTotalRecords(), size); + assertNull(writeStatus.getGlobalError()); + assertEquals(writeStatus.getFailedRowsSize(), 0); + assertEquals(writeStatus.getTotalErrorRecords(), 0); + assertFalse(writeStatus.hasErrors()); + assertNotNull(writeStatus.getFileId()); + String fileId = writeStatus.getFileId(); + if (fileAbsPaths.isPresent()) { + fileAbsPaths.get().add(basePath + "/" + writeStatus.getStat().getPath()); + } + if (fileNames.isPresent()) { + fileNames.get().add(writeStatus.getStat().getPath() + .substring(writeStatus.getStat().getPath().lastIndexOf('/') + 1)); + } + HoodieWriteStat writeStat = writeStatus.getStat(); + assertEquals(size, writeStat.getNumInserts()); + assertEquals(size, writeStat.getNumWrites()); + assertEquals(fileId, writeStat.getFileId()); + assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter++ % 3], writeStat.getPartitionPath()); + assertEquals(0, writeStat.getNumDeletes()); + assertEquals(0, writeStat.getNumUpdateWrites()); + assertEquals(0, writeStat.getTotalWriteErrors()); + } + } + + protected void assertOutput(Dataset expectedRows, Dataset actualRows, String instantTime, Option> fileNames) { + // verify 3 meta fields that are filled in within create handle + actualRows.collectAsList().forEach(entry -> { + assertEquals(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).toString(), instantTime); + assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD))); + if (fileNames.isPresent()) { + assertTrue(fileNames.get().contains(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS + .get(HoodieRecord.FILENAME_METADATA_FIELD)))); + } + 
assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD))); + }); + + // after trimming 2 of the meta fields, rest of the fields should match + Dataset trimmedExpected = expectedRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD); + Dataset trimmedActual = actualRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD); + assertEquals(0, trimmedActual.except(trimmedExpected).count()); + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index d66103600..eb7319fd8 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -38,7 +38,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, B import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} -import org.apache.hudi.internal.HoodieDataSourceInternalWriter +import org.apache.hudi.internal.{HoodieDataSourceInternalWriter, DataSourceInternalWriterHelper} import org.apache.hudi.sync.common.AbstractSyncTool import org.apache.log4j.LogManager import org.apache.spark.SPARK_VERSION @@ -130,9 +130,6 @@ private[hudi] object HoodieSparkSqlWriter { // scalastyle:off if (parameters(ENABLE_ROW_WRITER_OPT_KEY).toBoolean && operation == WriteOperationType.BULK_INSERT) { - if (!SPARK_VERSION.startsWith("2.")) { - throw new HoodieException("Bulk insert using row writer is not supported with Spark 3. To use row writer please switch to spark 2.") - } val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName, basePath, path, instantTime) return (success, commitTime, common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig) @@ -299,10 +296,22 @@ private[hudi] object HoodieSparkSqlWriter { val nameSpace = s"hoodie.${tblName}" val writeConfig = DataSourceUtils.createHoodieConfig(null, path.get, tblName, mapAsJavaMap(parameters)) val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace) - hoodieDF.write.format("org.apache.hudi.internal") - .option(HoodieDataSourceInternalWriter.INSTANT_TIME_OPT_KEY, instantTime) - .options(parameters) - .save() + if (SPARK_VERSION.startsWith("2.")) { + hoodieDF.write.format("org.apache.hudi.internal") + .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime) + .options(parameters) + .save() + } else if (SPARK_VERSION.startsWith("3.")) { + hoodieDF.write.format("org.apache.hudi.spark3.internal") + .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime) + .option(HoodieWriteConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL, hoodieDF.schema.toDDL) + .options(parameters) + .mode(SaveMode.Append) + .save() + } else { + throw new HoodieException("Bulk insert using row writer is not supported with current Spark version." 
+ + " To use row writer please switch to spark 2 or spark 3") + } val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) val metaSyncEnabled = parameters.get(META_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) val syncHiveSucess = if (hiveSyncEnabled || metaSyncEnabled) { diff --git a/hudi-spark-datasource/hudi-spark2/pom.xml b/hudi-spark-datasource/hudi-spark2/pom.xml index 8947bb79f..cd44e79ce 100644 --- a/hudi-spark-datasource/hudi-spark2/pom.xml +++ b/hudi-spark-datasource/hudi-spark2/pom.xml @@ -214,6 +214,14 @@ test-jar test + + org.apache.hudi + hudi-spark-common + ${project.version} + tests + test-jar + test + org.junit.jupiter diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java index 5fb71df77..526f0ce47 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java +++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java @@ -21,11 +21,7 @@ package org.apache.hudi.internal; import org.apache.hudi.DataSourceUtils; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hadoop.conf.Configuration; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.sources.DataSourceRegister; import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.DataSourceV2; @@ -40,14 +36,8 @@ import java.util.Optional; /** * DataSource V2 implementation for managing internal write logic. Only called internally. */ -public class DefaultSource implements DataSourceV2, ReadSupport, WriteSupport, - DataSourceRegister { - - private static final Logger LOG = LogManager - .getLogger(DefaultSource.class); - - private SparkSession sparkSession = null; - private Configuration configuration = null; +public class DefaultSource extends BaseDefaultSource implements DataSourceV2, + ReadSupport, WriteSupport, DataSourceRegister { @Override public String shortName() { @@ -67,25 +57,11 @@ public class DefaultSource implements DataSourceV2, ReadSupport, WriteSupport, @Override public Optional createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) { - String instantTime = options.get(HoodieDataSourceInternalWriter.INSTANT_TIME_OPT_KEY).get(); + String instantTime = options.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY).get(); String path = options.get("path").get(); String tblName = options.get(HoodieWriteConfig.TABLE_NAME).get(); HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(null, path, tblName, options.asMap()); return Optional.of(new HoodieDataSourceInternalWriter(instantTime, config, schema, getSparkSession(), getConfiguration())); } - - private SparkSession getSparkSession() { - if (sparkSession == null) { - sparkSession = SparkSession.builder().getOrCreate(); - } - return sparkSession; - } - - private Configuration getConfiguration() { - if (configuration == null) { - this.configuration = getSparkSession().sparkContext().hadoopConfiguration(); - } - return configuration; - } } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java index 
7aa0fc6a3..3ce8d776a 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java +++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java @@ -18,102 +18,42 @@ package org.apache.hudi.internal; -import org.apache.hudi.client.HoodieInternalWriteStatus; -import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.io.HoodieRowCreateHandle; import org.apache.hudi.table.HoodieTable; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.sources.v2.writer.DataWriter; import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; import org.apache.spark.sql.types.StructType; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; /** * Hoodie's Implementation of {@link DataWriter}. This is used in data source implementation for bulk insert. */ public class HoodieBulkInsertDataInternalWriter implements DataWriter { - private static final long serialVersionUID = 1L; - private static final Logger LOG = LogManager.getLogger(HoodieBulkInsertDataInternalWriter.class); - - private final String instantTime; - private final int taskPartitionId; - private final long taskId; - private final long taskEpochId; - private final HoodieTable hoodieTable; - private final HoodieWriteConfig writeConfig; - private final StructType structType; - private final List writeStatusList = new ArrayList<>(); - - private HoodieRowCreateHandle handle; - private String lastKnownPartitionPath = null; - private String fileIdPrefix = null; - private int numFilesWritten = 0; + private final BulkInsertDataInternalWriterHelper bulkInsertWriterHelper; public HoodieBulkInsertDataInternalWriter(HoodieTable hoodieTable, HoodieWriteConfig writeConfig, String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType) { - this.hoodieTable = hoodieTable; - this.writeConfig = writeConfig; - this.instantTime = instantTime; - this.taskPartitionId = taskPartitionId; - this.taskId = taskId; - this.taskEpochId = taskEpochId; - this.structType = structType; - this.fileIdPrefix = UUID.randomUUID().toString(); + this.bulkInsertWriterHelper = new BulkInsertDataInternalWriterHelper(hoodieTable, + writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, structType); } @Override public void write(InternalRow record) throws IOException { - try { - String partitionPath = record.getUTF8String( - HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString(); - - if ((lastKnownPartitionPath == null) || !lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) { - LOG.info("Creating new file for partition path " + partitionPath); - createNewHandle(partitionPath); - lastKnownPartitionPath = partitionPath; - } - handle.write(record); - } catch (Throwable t) { - LOG.error("Global error thrown while trying to write records in HoodieRowCreateHandle ", t); - throw t; - } + bulkInsertWriterHelper.write(record); } @Override public WriterCommitMessage commit() throws IOException { - close(); - return new HoodieWriterCommitMessage(writeStatusList); + return new HoodieWriterCommitMessage(bulkInsertWriterHelper.getWriteStatuses()); } @Override - public void abort() throws IOException { - } - - private void createNewHandle(String partitionPath) throws 
IOException { - if (null != handle) { - close(); - } - handle = new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(), - instantTime, taskPartitionId, taskId, taskEpochId, structType); - } - - public void close() throws IOException { - if (null != handle) { - writeStatusList.add(handle.close()); - } - } - - protected String getNextFileId() { - return String.format("%s-%d", fileIdPrefix, numFilesWritten++); + public void abort() { + bulkInsertWriterHelper.abort(); } } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java index e8cbff80a..4b3dafc62 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java +++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java @@ -18,24 +18,12 @@ package org.apache.hudi.internal; -import org.apache.hadoop.conf.Configuration; -import org.apache.hudi.DataSourceUtils; -import org.apache.hudi.client.SparkRDDWriteClient; -import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.client.HoodieInternalWriteStatus; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieInstant.State; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.table.HoodieSparkTable; -import org.apache.hudi.table.HoodieTable; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.spark.api.java.JavaSparkContext; + +import org.apache.hadoop.conf.Configuration; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; @@ -53,71 +41,50 @@ import java.util.stream.Collectors; */ public class HoodieDataSourceInternalWriter implements DataSourceWriter { - private static final long serialVersionUID = 1L; - private static final Logger LOG = LogManager.getLogger(HoodieDataSourceInternalWriter.class); - public static final String INSTANT_TIME_OPT_KEY = "hoodie.instant.time"; - private final String instantTime; - private final HoodieTableMetaClient metaClient; private final HoodieWriteConfig writeConfig; private final StructType structType; - private final SparkRDDWriteClient writeClient; - private final HoodieTable hoodieTable; - private final WriteOperationType operationType; + private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper; public HoodieDataSourceInternalWriter(String instantTime, HoodieWriteConfig writeConfig, StructType structType, SparkSession sparkSession, Configuration configuration) { this.instantTime = instantTime; this.writeConfig = writeConfig; this.structType = structType; - this.operationType = WriteOperationType.BULK_INSERT; - this.writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), writeConfig, true); - writeClient.setOperationType(operationType); - writeClient.startCommitWithTime(instantTime); - this.metaClient 
= new HoodieTableMetaClient(configuration, writeConfig.getBasePath()); - this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient); + this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType, + sparkSession, configuration); } @Override public DataWriterFactory createWriterFactory() { - metaClient.getActiveTimeline().transitionRequestedToInflight( - new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, instantTime), Option.empty()); - if (WriteOperationType.BULK_INSERT == operationType) { - return new HoodieBulkInsertDataInternalWriterFactory(hoodieTable, writeConfig, instantTime, structType); + dataSourceInternalWriterHelper.createInflightCommit(); + if (WriteOperationType.BULK_INSERT == dataSourceInternalWriterHelper.getWriteOperationType()) { + return new HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(), + writeConfig, instantTime, structType); } else { - throw new IllegalArgumentException("Write Operation Type + " + operationType + " not supported "); + throw new IllegalArgumentException("Write Operation Type + " + dataSourceInternalWriterHelper.getWriteOperationType() + " not supported "); } } @Override public boolean useCommitCoordinator() { - return true; + return dataSourceInternalWriterHelper.useCommitCoordinator(); } @Override public void onDataWriterCommit(WriterCommitMessage message) { - LOG.info("Received commit of a data writer =" + message); + dataSourceInternalWriterHelper.onDataWriterCommit(message.toString()); } @Override public void commit(WriterCommitMessage[] messages) { List writeStatList = Arrays.stream(messages).map(m -> (HoodieWriterCommitMessage) m) - .flatMap(m -> m.getWriteStatuses().stream().map(m2 -> m2.getStat())).collect(Collectors.toList()); - - try { - writeClient.commitStats(instantTime, writeStatList, Option.empty(), - DataSourceUtils.getCommitActionType(operationType, metaClient.getTableType())); - } catch (Exception ioe) { - throw new HoodieException(ioe.getMessage(), ioe); - } finally { - writeClient.close(); - } + .flatMap(m -> m.getWriteStatuses().stream().map(HoodieInternalWriteStatus::getStat)).collect(Collectors.toList()); + dataSourceInternalWriterHelper.commit(writeStatList); } @Override public void abort(WriterCommitMessage[] messages) { - LOG.error("Commit " + instantTime + " aborted "); - writeClient.rollback(instantTime); - writeClient.close(); + dataSourceInternalWriterHelper.abort(); } } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java index 757000c57..240e4b981 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java +++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java @@ -18,28 +18,18 @@ package org.apache.hudi.internal; -import java.util.ArrayList; -import java.util.List; import org.apache.hudi.client.HoodieInternalWriteStatus; import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; +import java.util.List; + /** * Hoodie's {@link WriterCommitMessage} used in datasource implementation. 
*/ -public class HoodieWriterCommitMessage implements WriterCommitMessage { - - private List writeStatuses = new ArrayList<>(); +public class HoodieWriterCommitMessage extends BaseWriterCommitMessage + implements WriterCommitMessage { public HoodieWriterCommitMessage(List writeStatuses) { - this.writeStatuses = writeStatuses; - } - - public List getWriteStatuses() { - return writeStatuses; - } - - @Override - public String toString() { - return "HoodieWriterCommitMessage{" + "writeStatuses=" + writeStatuses + '}'; + super(writeStatuses); } } diff --git a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java index ac69af51e..0b021abeb 100644 --- a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java +++ b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java @@ -18,26 +18,19 @@ package org.apache.hudi.internal; -import org.apache.hudi.client.HoodieInternalWriteStatus; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.testutils.HoodieClientTestHarness; -import org.apache.spark.package$; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.List; -import java.util.Random; import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER; import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE; @@ -45,36 +38,13 @@ import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder; import static org.apache.hudi.testutils.SparkDatasetTestUtils.getInternalRowWithError; import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows; import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import static org.junit.jupiter.api.Assumptions.assumeTrue; /** * Unit tests {@link HoodieBulkInsertDataInternalWriter}. 
*/ -public class TestHoodieBulkInsertDataInternalWriter extends HoodieClientTestHarness { - - private static final Random RANDOM = new Random(); - - @BeforeEach - public void setUp() throws Exception { - // this test is only compatible with spark 2 - assumeTrue(package$.MODULE$.SPARK_VERSION().startsWith("2.")); - initSparkContexts("TestHoodieBulkInsertDataInternalWriter"); - initPath(); - initFileSystem(); - initTestDataGenerator(); - initMetaClient(); - } - - @AfterEach - public void tearDown() throws Exception { - cleanupResources(); - } +public class TestHoodieBulkInsertDataInternalWriter extends + HoodieBulkInsertInternalWriterTestBase { @Test public void testDataInternalWriter() throws Exception { @@ -103,15 +73,15 @@ public class TestHoodieBulkInsertDataInternalWriter extends HoodieClientTestHarn } } - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - List fileAbsPaths = new ArrayList<>(); - List fileNames = new ArrayList<>(); + BaseWriterCommitMessage commitMetadata = (BaseWriterCommitMessage) writer.commit(); + Option> fileAbsPaths = Option.of(new ArrayList<>()); + Option> fileNames = Option.of(new ArrayList<>()); // verify write statuses assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, fileAbsPaths, fileNames); // verify rows - Dataset result = sqlContext.read().parquet(fileAbsPaths.toArray(new String[0])); + Dataset result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0])); assertOutput(totalInputRows, result, instantTime, fileNames); } } @@ -156,15 +126,15 @@ public class TestHoodieBulkInsertDataInternalWriter extends HoodieClientTestHarn // expected } - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); + BaseWriterCommitMessage commitMetadata = (BaseWriterCommitMessage) writer.commit(); - List fileAbsPaths = new ArrayList<>(); - List fileNames = new ArrayList<>(); + Option> fileAbsPaths = Option.of(new ArrayList<>()); + Option> fileNames = Option.of(new ArrayList<>()); // verify write statuses assertWriteStatuses(commitMetadata.getWriteStatuses(), 1, size / 2, fileAbsPaths, fileNames); // verify rows - Dataset result = sqlContext.read().parquet(fileAbsPaths.toArray(new String[0])); + Dataset result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0])); assertOutput(inputRows, result, instantTime, fileNames); } @@ -176,43 +146,4 @@ public class TestHoodieBulkInsertDataInternalWriter extends HoodieClientTestHarn writer.write(internalRow); } } - - private void assertWriteStatuses(List writeStatuses, int batches, int size, List fileAbsPaths, List fileNames) { - assertEquals(batches, writeStatuses.size()); - int counter = 0; - for (HoodieInternalWriteStatus writeStatus : writeStatuses) { - // verify write status - assertEquals(writeStatus.getTotalRecords(), size); - assertNull(writeStatus.getGlobalError()); - assertEquals(writeStatus.getFailedRowsSize(), 0); - assertNotNull(writeStatus.getFileId()); - String fileId = writeStatus.getFileId(); - assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3], writeStatus.getPartitionPath()); - fileAbsPaths.add(basePath + "/" + writeStatus.getStat().getPath()); - fileNames.add(writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf('/') + 1)); - HoodieWriteStat writeStat = writeStatus.getStat(); - assertEquals(size, writeStat.getNumInserts()); - assertEquals(size, writeStat.getNumWrites()); - assertEquals(fileId, writeStat.getFileId()); - 
assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter++ % 3], writeStat.getPartitionPath()); - assertEquals(0, writeStat.getNumDeletes()); - assertEquals(0, writeStat.getNumUpdateWrites()); - assertEquals(0, writeStat.getTotalWriteErrors()); - } - } - - private void assertOutput(Dataset expectedRows, Dataset actualRows, String instantTime, List fileNames) { - // verify 3 meta fields that are filled in within create handle - actualRows.collectAsList().forEach(entry -> { - assertEquals(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).toString(), instantTime); - assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD))); - assertTrue(fileNames.contains(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD)))); - assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD))); - }); - - // after trimming 2 of the meta fields, rest of the fields should match - Dataset trimmedExpected = expectedRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD); - Dataset trimmedActual = actualRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD); - assertEquals(0, trimmedActual.except(trimmedExpected).count()); - } } diff --git a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java index 454c74d96..184ff771c 100644 --- a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java +++ b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java @@ -18,61 +18,32 @@ package org.apache.hudi.internal; -import org.apache.hudi.client.HoodieInternalWriteStatus; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hudi.testutils.HoodieClientTestUtils; -import org.apache.spark.package$; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.sources.v2.writer.DataWriter; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Random; import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER; import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE; import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder; import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows; import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; 
-import static org.junit.jupiter.api.Assumptions.assumeTrue; /** * Unit tests {@link HoodieDataSourceInternalWriter}. */ -public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness { - - private static final Random RANDOM = new Random(); - - @BeforeEach - public void setUp() throws Exception { - // this test is only compatible with spark 2 - assumeTrue(package$.MODULE$.SPARK_VERSION().startsWith("2.")); - initSparkContexts("TestHoodieDataSourceInternalWriter"); - initPath(); - initFileSystem(); - initTestDataGenerator(); - initMetaClient(); - } - - @AfterEach - public void tearDown() throws Exception { - cleanupResources(); - } +public class TestHoodieDataSourceInternalWriter extends + HoodieBulkInsertInternalWriterTestBase { @Test public void testDataSourceWriter() throws Exception { @@ -84,7 +55,7 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf); DataWriter writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(0, RANDOM.nextLong(), RANDOM.nextLong()); - List partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS); + String[] partitionPaths = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS; List partitionPathsAbs = new ArrayList<>(); for (String partitionPath : partitionPaths) { partitionPathsAbs.add(basePath + "/" + partitionPath + "/*"); @@ -112,8 +83,8 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness metaClient.reloadActiveTimeline(); Dataset result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0])); // verify output - assertOutput(totalInputRows, result, instantTime); - assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size); + assertOutput(totalInputRows, result, instantTime, Option.empty()); + assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty()); } @Test @@ -155,8 +126,8 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness Dataset result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime); // verify output - assertOutput(totalInputRows, result, instantTime); - assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size); + assertOutput(totalInputRows, result, instantTime, Option.empty()); + assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty()); } } @@ -199,8 +170,8 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness Dataset result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime); // verify output - assertOutput(totalInputRows, result, instantTime); - assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size); + assertOutput(totalInputRows, result, instantTime, Option.empty()); + assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty()); } } @@ -250,8 +221,8 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness metaClient.reloadActiveTimeline(); Dataset result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0])); // verify rows - assertOutput(totalInputRows, result, instantTime0); - 
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size); + assertOutput(totalInputRows, result, instantTime0, Option.empty()); + assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty()); // 2nd batch. abort in the end String instantTime1 = "00" + 1; @@ -274,7 +245,7 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0])); // verify rows // only rows from first batch should be present - assertOutput(totalInputRows, result, instantTime0); + assertOutput(totalInputRows, result, instantTime0, Option.empty()); } private void writeRows(Dataset inputRows, DataWriter writer) throws Exception { @@ -284,41 +255,4 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness writer.write(internalRow); } } - - private void assertWriteStatuses(List writeStatuses, int batches, int size) { - assertEquals(batches, writeStatuses.size()); - int counter = 0; - for (HoodieInternalWriteStatus writeStatus : writeStatuses) { - assertEquals(writeStatus.getPartitionPath(), HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3]); - assertEquals(writeStatus.getTotalRecords(), size); - assertEquals(writeStatus.getFailedRowsSize(), 0); - assertEquals(writeStatus.getTotalErrorRecords(), 0); - assertFalse(writeStatus.hasErrors()); - assertNull(writeStatus.getGlobalError()); - assertNotNull(writeStatus.getFileId()); - String fileId = writeStatus.getFileId(); - HoodieWriteStat writeStat = writeStatus.getStat(); - assertEquals(size, writeStat.getNumInserts()); - assertEquals(size, writeStat.getNumWrites()); - assertEquals(fileId, writeStat.getFileId()); - assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter++ % 3], writeStat.getPartitionPath()); - assertEquals(0, writeStat.getNumDeletes()); - assertEquals(0, writeStat.getNumUpdateWrites()); - assertEquals(0, writeStat.getTotalWriteErrors()); - } - } - - private void assertOutput(Dataset expectedRows, Dataset actualRows, String instantTime) { - // verify 3 meta fields that are filled in within create handle - actualRows.collectAsList().forEach(entry -> { - assertEquals(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).toString(), instantTime); - assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD))); - assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD))); - }); - - // after trimming 2 of the meta fields, rest of the fields should match - Dataset trimmedExpected = expectedRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD); - Dataset trimmedActual = actualRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD); - assertEquals(0, trimmedActual.except(trimmedExpected).count()); - } } diff --git a/hudi-spark-datasource/hudi-spark3/pom.xml b/hudi-spark-datasource/hudi-spark3/pom.xml index 7b1ffa977..df914098b 100644 --- a/hudi-spark-datasource/hudi-spark3/pom.xml +++ b/hudi-spark-datasource/hudi-spark3/pom.xml @@ -153,11 +153,72 @@ true + + com.fasterxml.jackson.core + jackson-databind + ${fasterxml.spark3.version} + + + com.fasterxml.jackson.core + jackson-annotations + 
${fasterxml.spark3.version} + + + com.fasterxml.jackson.core + jackson-core + ${fasterxml.spark3.version} + + org.apache.hudi hudi-spark-client ${project.version} + + org.apache.hudi + hudi-spark-common + ${project.version} + + + + + org.apache.hudi + hudi-client-common + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-spark-client + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-common + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-spark-common + ${project.version} + tests + test-jar + test + + + + org.junit.jupiter + junit-jupiter-api + test + diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java new file mode 100644 index 000000000..d59b5ad5c --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.spark3.internal; + +import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.internal.BaseDefaultSource; +import org.apache.hudi.internal.DataSourceInternalWriterHelper; + +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.catalog.TableProvider; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +import java.util.Map; + +/** + * DataSource V2 implementation for managing internal write logic. Only called internally. + * This class is only compatible with datasource V2 API in Spark 3. 
+ */ +public class DefaultSource extends BaseDefaultSource implements TableProvider { + + @Override + public StructType inferSchema(CaseInsensitiveStringMap options) { + return StructType.fromDDL(options.get(HoodieWriteConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL)); + } + + @Override + public Table getTable(StructType schema, Transform[] partitioning, Map properties) { + String instantTime = properties.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY); + String path = properties.get("path"); + String tblName = properties.get(HoodieWriteConfig.TABLE_NAME); + HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(null, path, tblName, properties); + return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(), + getConfiguration()); + } +} diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java new file mode 100644 index 000000000..f67187c27 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.spark3.internal; + +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.internal.BulkInsertDataInternalWriterHelper; +import org.apache.hudi.table.HoodieTable; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.types.StructType; + +import java.io.IOException; + +/** + * Hoodie's Implementation of {@link DataWriter}. This is used in data source "hudi.spark3.internal" implementation for bulk insert. 
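This TableProvider is not meant to be called by end users; the row-writer path hands it a prepared Dataset<Row> whose Hudi meta columns are already populated. A minimal, illustrative sketch of that hand-off is below — it reuses the constants referenced by the DefaultSource above (INSTANT_TIME_OPT_KEY, BULKINSERT_INPUT_DATA_SCHEMA_DDL, TABLE_NAME); the literal "path" key and the SaveMode are assumptions for this sketch, not something this diff prescribes.

import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.internal.DataSourceInternalWriterHelper;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

// Hypothetical driver-side sketch: how prepared rows could be routed into the
// "hudi.spark3.internal" source defined above.
public class InternalBulkInsertInvocationSketch {
  public static void bulkInsertRows(Dataset<Row> preppedRows, String basePath,
                                    String tableName, String instantTime) {
    preppedRows.write()
        .format("org.apache.hudi.spark3.internal")
        // instant time is threaded through so the internal table can resolve the commit
        .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
        // DefaultSource.inferSchema() reads this DDL back to rebuild the StructType
        .option(HoodieWriteConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL, preppedRows.schema().toDDL())
        .option(HoodieWriteConfig.TABLE_NAME, tableName)
        .option("path", basePath)   // assumed key for the target base path
        .mode(SaveMode.Append)
        .save();
  }
}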
+ */ +public class HoodieBulkInsertDataInternalWriter implements DataWriter { + + private final BulkInsertDataInternalWriterHelper bulkInsertWriterHelper; + + public HoodieBulkInsertDataInternalWriter(HoodieTable hoodieTable, HoodieWriteConfig writeConfig, + String instantTime, int taskPartitionId, long taskId, StructType structType) { + this.bulkInsertWriterHelper = new BulkInsertDataInternalWriterHelper(hoodieTable, + writeConfig, instantTime, taskPartitionId, taskId, 0, structType); + } + + @Override + public void write(InternalRow record) throws IOException { + bulkInsertWriterHelper.write(record); + } + + @Override + public WriterCommitMessage commit() throws IOException { + return new HoodieWriterCommitMessage(bulkInsertWriterHelper.getWriteStatuses()); + } + + @Override + public void abort() { + bulkInsertWriterHelper.abort(); + } + + @Override + public void close() throws IOException { + bulkInsertWriterHelper.close(); + } +} diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriterFactory.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriterFactory.java new file mode 100644 index 000000000..31b43ea7d --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriterFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.spark3.internal; + +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.DataWriterFactory; +import org.apache.spark.sql.types.StructType; + +/** + * Factory to assist in instantiating {@link HoodieBulkInsertDataInternalWriter}. 
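The writer above is a thin adapter over BulkInsertDataInternalWriterHelper; Spark's DataSourceV2 write path drives it once per task. The sketch below is not part of this patch — it only illustrates the per-task lifecycle (write every row, commit on success, abort on failure, always close) under that assumption.

import java.util.Iterator;

import org.apache.hudi.spark3.internal.HoodieBulkInsertDataInternalWriter;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.WriterCommitMessage;

// Sketch of the per-task protocol Spark follows against a DataWriter like the one above.
public class DataWriterLifecycleSketch {
  public static WriterCommitMessage runTask(HoodieBulkInsertDataInternalWriter writer,
                                            Iterator<InternalRow> rows) throws Exception {
    try {
      while (rows.hasNext()) {
        writer.write(rows.next());   // delegates to BulkInsertDataInternalWriterHelper
      }
      return writer.commit();        // HoodieWriterCommitMessage carrying the task's write statuses
    } catch (Exception e) {
      writer.abort();                // drop partially written files for this task
      throw e;
    } finally {
      writer.close();
    }
  }
}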
+ */ +public class HoodieBulkInsertDataInternalWriterFactory implements DataWriterFactory { + + private final String instantTime; + private final HoodieTable hoodieTable; + private final HoodieWriteConfig writeConfig; + private final StructType structType; + + public HoodieBulkInsertDataInternalWriterFactory(HoodieTable hoodieTable, HoodieWriteConfig writeConfig, + String instantTime, StructType structType) { + this.hoodieTable = hoodieTable; + this.writeConfig = writeConfig; + this.instantTime = instantTime; + this.structType = structType; + } + + @Override + public DataWriter createWriter(int partitionId, long taskId) { + return new HoodieBulkInsertDataInternalWriter(hoodieTable, writeConfig, instantTime, partitionId, taskId, + structType); + } +} diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java new file mode 100644 index 000000000..b0945156d --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.spark3.internal; + +import org.apache.hudi.client.HoodieInternalWriteStatus; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.internal.DataSourceInternalWriterHelper; + +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.write.DataWriterFactory; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.types.StructType; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Implementation of {@link BatchWrite} for datasource "hudi.spark3.internal" to be used in datasource implementation + * of bulk insert. 
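Unlike the Spark 2 factory, which also received an epoch id (see the createDataWriter(partitionId, taskId, epochId) call in the Spark 2 test above), the Spark 3 DataWriterFactory is handed only a partition id and a task id. The fragment below is an assumed executor-side view of how the factory is exercised; it is illustrative only and uses standard Spark TaskContext accessors.

import org.apache.spark.TaskContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.DataWriterFactory;

// Hypothetical executor-side usage: Spark deserializes the factory on each executor
// and asks it for one writer per task.
public class WriterFactoryUsageSketch {
  public static DataWriter<InternalRow> openWriter(DataWriterFactory factory) {
    TaskContext ctx = TaskContext.get();
    return factory.createWriter(ctx.partitionId(), ctx.taskAttemptId());
  }
}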
+ */ +public class HoodieDataSourceInternalBatchWrite implements BatchWrite { + + private final String instantTime; + private final HoodieWriteConfig writeConfig; + private final StructType structType; + private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper; + + public HoodieDataSourceInternalBatchWrite(String instantTime, HoodieWriteConfig writeConfig, StructType structType, + SparkSession jss, Configuration hadoopConfiguration) { + this.instantTime = instantTime; + this.writeConfig = writeConfig; + this.structType = structType; + this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType, + jss, hadoopConfiguration); + } + + @Override + public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { + dataSourceInternalWriterHelper.createInflightCommit(); + if (WriteOperationType.BULK_INSERT == dataSourceInternalWriterHelper.getWriteOperationType()) { + return new HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(), + writeConfig, instantTime, structType); + } else { + throw new IllegalArgumentException("Write Operation Type + " + dataSourceInternalWriterHelper.getWriteOperationType() + " not supported "); + } + } + + @Override + public boolean useCommitCoordinator() { + return dataSourceInternalWriterHelper.useCommitCoordinator(); + } + + @Override + public void onDataWriterCommit(WriterCommitMessage message) { + dataSourceInternalWriterHelper.onDataWriterCommit(message.toString()); + } + + @Override + public void commit(WriterCommitMessage[] messages) { + List writeStatList = Arrays.stream(messages).map(m -> (HoodieWriterCommitMessage) m) + .flatMap(m -> m.getWriteStatuses().stream().map(HoodieInternalWriteStatus::getStat)).collect(Collectors.toList()); + dataSourceInternalWriterHelper.commit(writeStatList); + } + + @Override + public void abort(WriterCommitMessage[] messages) { + dataSourceInternalWriterHelper.abort(); + } +} diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWriteBuilder.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWriteBuilder.java new file mode 100644 index 000000000..10e2e64f1 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWriteBuilder.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
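On the driver side, the BatchWrite above starts the inflight commit when the writer factory is created, records per-task progress through onDataWriterCommit, and either flattens the per-task write statuses into HoodieWriteStats and completes the instant (commit) or rolls it back (abort). The sequence below is a simplified sketch of that driver protocol; in reality each commit message arrives from a separate executor task.

import java.util.List;

import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.WriterCommitMessage;

// Sketch of the driver-side commit/abort sequence for a BatchWrite such as
// HoodieDataSourceInternalBatchWrite (illustrative, not part of this patch).
public class BatchWriteProtocolSketch {
  public static void run(BatchWrite batchWrite, List<WriterCommitMessage> taskMessages) {
    WriterCommitMessage[] messages = taskMessages.toArray(new WriterCommitMessage[0]);
    try {
      for (WriterCommitMessage message : taskMessages) {
        batchWrite.onDataWriterCommit(message);   // per-task progress callback
      }
      batchWrite.commit(messages);                // completes the inflight Hudi instant
    } catch (RuntimeException e) {
      batchWrite.abort(messages);                 // rolls back the inflight commit
      throw e;
    }
  }
}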
+ */ + +package org.apache.hudi.spark3.internal; + +import org.apache.hudi.config.HoodieWriteConfig; + +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * Implementation of {@link WriteBuilder} for datasource "hudi.spark3.internal" to be used in datasource implementation + * of bulk insert. + */ +public class HoodieDataSourceInternalBatchWriteBuilder implements WriteBuilder { + + private final String instantTime; + private final HoodieWriteConfig writeConfig; + private final StructType structType; + private final SparkSession jss; + private final Configuration hadoopConfiguration; + + public HoodieDataSourceInternalBatchWriteBuilder(String instantTime, HoodieWriteConfig writeConfig, StructType structType, + SparkSession jss, Configuration hadoopConfiguration) { + this.instantTime = instantTime; + this.writeConfig = writeConfig; + this.structType = structType; + this.jss = jss; + this.hadoopConfiguration = hadoopConfiguration; + } + + @Override + public BatchWrite buildForBatch() { + return new HoodieDataSourceInternalBatchWrite(instantTime, writeConfig, structType, jss, + hadoopConfiguration); + } +} diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java new file mode 100644 index 000000000..f1fded033 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.spark3.internal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.config.HoodieWriteConfig; + +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.SupportsWrite; +import org.apache.spark.sql.connector.catalog.TableCapability; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.types.StructType; + +import java.util.HashSet; +import java.util.Set; + +/** + * Hoodie's Implementation of {@link SupportsWrite}. This is used in data source "hudi.spark3.internal" implementation for bulk insert. 
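The builder above is the glue between the catalog layer and the batch write: the table returned by DefaultSource hands Spark this builder, and buildForBatch() yields the BatchWrite that ultimately drives the bulk-insert writers. A small sketch of that (assumed) wiring:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWriteBuilder;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.types.StructType;

// Illustrative only: resolving the write chain down to the BatchWrite.
public class WriteBuilderChainSketch {
  public static BatchWrite resolve(String instantTime, HoodieWriteConfig config, StructType schema,
                                   SparkSession spark, Configuration hadoopConf) {
    return new HoodieDataSourceInternalBatchWriteBuilder(instantTime, config, schema, spark, hadoopConf)
        .buildForBatch();
  }
}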
+ */ +class HoodieDataSourceInternalTable implements SupportsWrite { + + private final String instantTime; + private final HoodieWriteConfig writeConfig; + private final StructType structType; + private final SparkSession jss; + private final Configuration hadoopConfiguration; + + public HoodieDataSourceInternalTable(String instantTime, HoodieWriteConfig config, + StructType schema, SparkSession jss, Configuration hadoopConfiguration) { + this.instantTime = instantTime; + this.writeConfig = config; + this.structType = schema; + this.jss = jss; + this.hadoopConfiguration = hadoopConfiguration; + } + + @Override + public String name() { + return this.getClass().toString(); + } + + @Override + public StructType schema() { + return structType; + } + + @Override + public Set capabilities() { + return new HashSet() {{ + add(TableCapability.BATCH_WRITE); + add(TableCapability.TRUNCATE); + }}; + } + + @Override + public WriteBuilder newWriteBuilder(LogicalWriteInfo logicalWriteInfo) { + return new HoodieDataSourceInternalBatchWriteBuilder(instantTime, writeConfig, structType, jss, + hadoopConfiguration); + } +} diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieWriterCommitMessage.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieWriterCommitMessage.java new file mode 100644 index 000000000..7fe787deb --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieWriterCommitMessage.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.spark3.internal; + +import org.apache.hudi.client.HoodieInternalWriteStatus; +import org.apache.hudi.internal.BaseWriterCommitMessage; +import org.apache.spark.sql.connector.write.WriterCommitMessage; + +import java.util.List; + +/** + * Hoodie's {@link WriterCommitMessage} used in datasource "hudi.spark3.internal" implementation. + */ +public class HoodieWriterCommitMessage extends BaseWriterCommitMessage + implements WriterCommitMessage { + + public HoodieWriterCommitMessage(List writeStatuses) { + super(writeStatuses); + } +} diff --git a/hudi-spark-datasource/hudi-spark3/src/test/scala/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark3/src/test/scala/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java new file mode 100644 index 000000000..ffb649bd3 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3/src/test/scala/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.spark3.internal; + +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.internal.HoodieBulkInsertInternalWriterTestBase; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER; +import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE; +import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder; +import static org.apache.hudi.testutils.SparkDatasetTestUtils.getInternalRowWithError; +import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows; +import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * Unit tests {@link HoodieBulkInsertDataInternalWriter}. + */ +public class TestHoodieBulkInsertDataInternalWriter extends + HoodieBulkInsertInternalWriterTestBase { + + @Test + public void testDataInternalWriter() throws Exception { + // init config and table + HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + // execute N rounds + for (int i = 0; i < 5; i++) { + String instantTime = "00" + i; + // init writer + HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), STRUCT_TYPE); + + int size = 10 + RANDOM.nextInt(1000); + // write N rows to partition1, N rows to partition2 and N rows to partition3 ... 
Each batch should create a new RowCreateHandle and a new file + int batches = 5; + Dataset totalInputRows = null; + + for (int j = 0; j < batches; j++) { + String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; + Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); + writeRows(inputRows, writer); + if (totalInputRows == null) { + totalInputRows = inputRows; + } else { + totalInputRows = totalInputRows.union(inputRows); + } + } + + HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); + Option> fileAbsPaths = Option.of(new ArrayList<>()); + Option> fileNames = Option.of(new ArrayList<>()); + + // verify write statuses + assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, fileAbsPaths, fileNames); + + // verify rows + Dataset result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0])); + assertOutput(totalInputRows, result, instantTime, fileNames); + } + } + + + /** + * Issue some corrupted or wrong schematized InternalRow after few valid InternalRows so that global error is thrown. write batch 1 of valid records write batch2 of invalid records which is expected + * to throw Global Error. Verify global error is set appropriately and only first batch of records are written to disk. + */ + @Test + public void testGlobalFailure() throws Exception { + // init config and table + HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0]; + + String instantTime = "001"; + HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), STRUCT_TYPE); + + int size = 10 + RANDOM.nextInt(100); + int totalFailures = 5; + // Generate first batch of valid rows + Dataset inputRows = getRandomRows(sqlContext, size / 2, partitionPath, false); + List internalRows = toInternalRows(inputRows, ENCODER); + + // generate some failures rows + for (int i = 0; i < totalFailures; i++) { + internalRows.add(getInternalRowWithError(partitionPath)); + } + + // generate 2nd batch of valid rows + Dataset inputRows2 = getRandomRows(sqlContext, size / 2, partitionPath, false); + internalRows.addAll(toInternalRows(inputRows2, ENCODER)); + + // issue writes + try { + for (InternalRow internalRow : internalRows) { + writer.write(internalRow); + } + fail("Should have failed"); + } catch (Throwable e) { + // expected + } + + HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); + + Option> fileAbsPaths = Option.of(new ArrayList<>()); + Option> fileNames = Option.of(new ArrayList<>()); + // verify write statuses + assertWriteStatuses(commitMetadata.getWriteStatuses(), 1, size / 2, fileAbsPaths, fileNames); + + // verify rows + Dataset result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0])); + assertOutput(inputRows, result, instantTime, fileNames); + } + + private void writeRows(Dataset inputRows, HoodieBulkInsertDataInternalWriter writer) + throws Exception { + List internalRows = toInternalRows(inputRows, ENCODER); + // issue writes + for (InternalRow internalRow : internalRows) { + writer.write(internalRow); + } + } +} diff --git a/hudi-spark-datasource/hudi-spark3/src/test/scala/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java 
b/hudi-spark-datasource/hudi-spark3/src/test/scala/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java new file mode 100644 index 000000000..69829ec28 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3/src/test/scala/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.spark3.internal; + +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.internal.HoodieBulkInsertInternalWriterTestBase; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.testutils.HoodieClientTestUtils; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.write.DataWriter; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER; +import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE; +import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder; +import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows; +import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows; + +/** + * Unit tests {@link HoodieDataSourceInternalBatchWrite}. 
+ */ +public class TestHoodieDataSourceInternalBatchWrite extends + HoodieBulkInsertInternalWriterTestBase { + + @Test + public void testDataSourceWriter() throws Exception { + // init config and table + HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + String instantTime = "001"; + // init writer + HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = + new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf); + DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong()); + + String[] partitionPaths = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS; + List partitionPathsAbs = new ArrayList<>(); + for (String partitionPath : partitionPaths) { + partitionPathsAbs.add(basePath + "/" + partitionPath + "/*"); + } + + int size = 10 + RANDOM.nextInt(1000); + int batches = 5; + Dataset totalInputRows = null; + + for (int j = 0; j < batches; j++) { + String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; + Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); + writeRows(inputRows, writer); + if (totalInputRows == null) { + totalInputRows = inputRows; + } else { + totalInputRows = totalInputRows.union(inputRows); + } + } + + HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); + List commitMessages = new ArrayList<>(); + commitMessages.add(commitMetadata); + dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0])); + metaClient.reloadActiveTimeline(); + Dataset result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0])); + // verify output + assertOutput(totalInputRows, result, instantTime, Option.empty()); + assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty()); + } + + @Test + public void testMultipleDataSourceWrites() throws Exception { + // init config and table + HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + int partitionCounter = 0; + + // execute N rounds + for (int i = 0; i < 5; i++) { + String instantTime = "00" + i; + // init writer + HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = + new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf); + + List commitMessages = new ArrayList<>(); + Dataset totalInputRows = null; + DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong()); + + int size = 10 + RANDOM.nextInt(1000); + int batches = 5; // one batch per partition + + for (int j = 0; j < batches; j++) { + String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; + Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); + writeRows(inputRows, writer); + if (totalInputRows == null) { + totalInputRows = inputRows; + } else { + totalInputRows = totalInputRows.union(inputRows); + } + } + + HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); + commitMessages.add(commitMetadata); + dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0])); + metaClient.reloadActiveTimeline(); + + Dataset result = 
HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime); + + // verify output + assertOutput(totalInputRows, result, instantTime, Option.empty()); + assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty()); + } + } + + @Test + public void testLargeWrites() throws Exception { + // init config and table + HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + int partitionCounter = 0; + + // execute N rounds + for (int i = 0; i < 3; i++) { + String instantTime = "00" + i; + // init writer + HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = + new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf); + + List commitMessages = new ArrayList<>(); + Dataset totalInputRows = null; + DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong()); + + int size = 10000 + RANDOM.nextInt(10000); + int batches = 3; // one batch per partition + + for (int j = 0; j < batches; j++) { + String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; + Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); + writeRows(inputRows, writer); + if (totalInputRows == null) { + totalInputRows = inputRows; + } else { + totalInputRows = totalInputRows.union(inputRows); + } + } + + HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); + commitMessages.add(commitMetadata); + dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0])); + metaClient.reloadActiveTimeline(); + + Dataset result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime); + + // verify output + assertOutput(totalInputRows, result, instantTime, Option.empty()); + assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty()); + } + } + + /** + * Tests that DataSourceWriter.abort() will abort the written records of interest write and commit batch1 write and abort batch2 Read of entire dataset should show only records from batch1. 
+ * commit batch1 + * abort batch2 + * verify only records from batch1 is available to read + */ + @Test + public void testAbort() throws Exception { + // init config and table + HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + String instantTime0 = "00" + 0; + // init writer + HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = + new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf); + + DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong()); + + List partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS); + List partitionPathsAbs = new ArrayList<>(); + for (String partitionPath : partitionPaths) { + partitionPathsAbs.add(basePath + "/" + partitionPath + "/*"); + } + + int size = 10 + RANDOM.nextInt(100); + int batches = 1; + Dataset totalInputRows = null; + + for (int j = 0; j < batches; j++) { + String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; + Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); + writeRows(inputRows, writer); + if (totalInputRows == null) { + totalInputRows = inputRows; + } else { + totalInputRows = totalInputRows.union(inputRows); + } + } + + HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); + List commitMessages = new ArrayList<>(); + commitMessages.add(commitMetadata); + // commit 1st batch + dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0])); + metaClient.reloadActiveTimeline(); + Dataset result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0])); + // verify rows + assertOutput(totalInputRows, result, instantTime0, Option.empty()); + assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty()); + + // 2nd batch. 
abort in the end + String instantTime1 = "00" + 1; + dataSourceInternalBatchWrite = + new HoodieDataSourceInternalBatchWrite(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf); + writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(1, RANDOM.nextLong()); + + for (int j = 0; j < batches; j++) { + String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; + Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); + writeRows(inputRows, writer); + } + + commitMetadata = (HoodieWriterCommitMessage) writer.commit(); + commitMessages = new ArrayList<>(); + commitMessages.add(commitMetadata); + // commit 1st batch + dataSourceInternalBatchWrite.abort(commitMessages.toArray(new HoodieWriterCommitMessage[0])); + metaClient.reloadActiveTimeline(); + result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0])); + // verify rows + // only rows from first batch should be present + assertOutput(totalInputRows, result, instantTime0, Option.empty()); + } + + private void writeRows(Dataset inputRows, DataWriter writer) throws Exception { + List internalRows = toInternalRows(inputRows, ENCODER); + // issue writes + for (InternalRow internalRow : internalRows) { + writer.write(internalRow); + } + } +} diff --git a/pom.xml b/pom.xml index 8f170883f..0b1053330 100644 --- a/pom.xml +++ b/pom.xml @@ -87,6 +87,7 @@ 2.6.7.3 2.6.7.1 2.7.4 + 2.10.0 2.0.0 2.17 1.10.1 @@ -1355,10 +1356,10 @@ ${scala12.version} 2.12 2.4.1 - 2.10.0 - 2.10.0 - 2.10.0 - 2.10.0 + ${fasterxml.spark3.version} + ${fasterxml.spark3.version} + ${fasterxml.spark3.version} + ${fasterxml.spark3.version}
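From an end user's perspective, the classes added here are expected to be exercised indirectly: a plain "hudi" datasource bulk_insert with the row writer enabled on Spark 3 routes through the hudi.spark3.internal source. The sketch below uses the usual Hudi datasource option keys, which are assumptions in this context and are not introduced by this diff.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

// Hedged end-user sketch: bulk insert via the row-writer path on Spark 3.
public class BulkInsertV2UsageSketch {
  public static void bulkInsert(Dataset<Row> df, String basePath) {
    df.write().format("hudi")
        .option("hoodie.table.name", "trips")                           // assumed table name
        .option("hoodie.datasource.write.operation", "bulk_insert")
        .option("hoodie.datasource.write.row.writer.enable", "true")    // enables the row-based path
        .option("hoodie.datasource.write.recordkey.field", "uuid")
        .option("hoodie.datasource.write.partitionpath.field", "partitionpath")
        .mode(SaveMode.Append)
        .save(basePath);
  }
}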