1
0

[HUDI-1451] Support bulk insert v2 with Spark 3.0.0 (#2328)

Co-authored-by: Wenning Ding <wenningd@amazon.com>

- Added support for bulk insert v2 with datasource v2 api in Spark 3.0.0.
This commit is contained in:
wenningd
2020-12-25 06:43:34 -08:00
committed by GitHub
parent 89f482eaf2
commit 286055ce34
26 changed files with 1445 additions and 329 deletions

View File

@@ -21,11 +21,7 @@ package org.apache.hudi.internal;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
@@ -40,14 +36,8 @@ import java.util.Optional;
/**
* DataSource V2 implementation for managing internal write logic. Only called internally.
*/
public class DefaultSource implements DataSourceV2, ReadSupport, WriteSupport,
DataSourceRegister {
private static final Logger LOG = LogManager
.getLogger(DefaultSource.class);
private SparkSession sparkSession = null;
private Configuration configuration = null;
public class DefaultSource extends BaseDefaultSource implements DataSourceV2,
ReadSupport, WriteSupport, DataSourceRegister {
@Override
public String shortName() {
@@ -67,25 +57,11 @@ public class DefaultSource implements DataSourceV2, ReadSupport, WriteSupport,
@Override
public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode,
DataSourceOptions options) {
String instantTime = options.get(HoodieDataSourceInternalWriter.INSTANT_TIME_OPT_KEY).get();
String instantTime = options.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY).get();
String path = options.get("path").get();
String tblName = options.get(HoodieWriteConfig.TABLE_NAME).get();
HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(null, path, tblName, options.asMap());
return Optional.of(new HoodieDataSourceInternalWriter(instantTime, config, schema, getSparkSession(),
getConfiguration()));
}
private SparkSession getSparkSession() {
if (sparkSession == null) {
sparkSession = SparkSession.builder().getOrCreate();
}
return sparkSession;
}
private Configuration getConfiguration() {
if (configuration == null) {
this.configuration = getSparkSession().sparkContext().hadoopConfiguration();
}
return configuration;
}
}

View File

@@ -18,102 +18,42 @@
package org.apache.hudi.internal;
import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.HoodieRowCreateHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
* Hoodie's Implementation of {@link DataWriter<InternalRow>}. This is used in data source implementation for bulk insert.
*/
public class HoodieBulkInsertDataInternalWriter implements DataWriter<InternalRow> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(HoodieBulkInsertDataInternalWriter.class);
private final String instantTime;
private final int taskPartitionId;
private final long taskId;
private final long taskEpochId;
private final HoodieTable hoodieTable;
private final HoodieWriteConfig writeConfig;
private final StructType structType;
private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
private HoodieRowCreateHandle handle;
private String lastKnownPartitionPath = null;
private String fileIdPrefix = null;
private int numFilesWritten = 0;
private final BulkInsertDataInternalWriterHelper bulkInsertWriterHelper;
public HoodieBulkInsertDataInternalWriter(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
String instantTime, int taskPartitionId, long taskId, long taskEpochId,
StructType structType) {
this.hoodieTable = hoodieTable;
this.writeConfig = writeConfig;
this.instantTime = instantTime;
this.taskPartitionId = taskPartitionId;
this.taskId = taskId;
this.taskEpochId = taskEpochId;
this.structType = structType;
this.fileIdPrefix = UUID.randomUUID().toString();
this.bulkInsertWriterHelper = new BulkInsertDataInternalWriterHelper(hoodieTable,
writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, structType);
}
@Override
public void write(InternalRow record) throws IOException {
try {
String partitionPath = record.getUTF8String(
HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString();
if ((lastKnownPartitionPath == null) || !lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) {
LOG.info("Creating new file for partition path " + partitionPath);
createNewHandle(partitionPath);
lastKnownPartitionPath = partitionPath;
}
handle.write(record);
} catch (Throwable t) {
LOG.error("Global error thrown while trying to write records in HoodieRowCreateHandle ", t);
throw t;
}
bulkInsertWriterHelper.write(record);
}
@Override
public WriterCommitMessage commit() throws IOException {
close();
return new HoodieWriterCommitMessage(writeStatusList);
return new HoodieWriterCommitMessage(bulkInsertWriterHelper.getWriteStatuses());
}
@Override
public void abort() throws IOException {
}
private void createNewHandle(String partitionPath) throws IOException {
if (null != handle) {
close();
}
handle = new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
instantTime, taskPartitionId, taskId, taskEpochId, structType);
}
public void close() throws IOException {
if (null != handle) {
writeStatusList.add(handle.close());
}
}
protected String getNextFileId() {
return String.format("%s-%d", fileIdPrefix, numFilesWritten++);
public void abort() {
bulkInsertWriterHelper.abort();
}
}

View File

@@ -18,24 +18,12 @@
package org.apache.hudi.internal;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
@@ -53,71 +41,50 @@ import java.util.stream.Collectors;
*/
public class HoodieDataSourceInternalWriter implements DataSourceWriter {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(HoodieDataSourceInternalWriter.class);
public static final String INSTANT_TIME_OPT_KEY = "hoodie.instant.time";
private final String instantTime;
private final HoodieTableMetaClient metaClient;
private final HoodieWriteConfig writeConfig;
private final StructType structType;
private final SparkRDDWriteClient writeClient;
private final HoodieTable hoodieTable;
private final WriteOperationType operationType;
private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper;
public HoodieDataSourceInternalWriter(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
SparkSession sparkSession, Configuration configuration) {
this.instantTime = instantTime;
this.writeConfig = writeConfig;
this.structType = structType;
this.operationType = WriteOperationType.BULK_INSERT;
this.writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), writeConfig, true);
writeClient.setOperationType(operationType);
writeClient.startCommitWithTime(instantTime);
this.metaClient = new HoodieTableMetaClient(configuration, writeConfig.getBasePath());
this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient);
this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType,
sparkSession, configuration);
}
@Override
public DataWriterFactory<InternalRow> createWriterFactory() {
metaClient.getActiveTimeline().transitionRequestedToInflight(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, instantTime), Option.empty());
if (WriteOperationType.BULK_INSERT == operationType) {
return new HoodieBulkInsertDataInternalWriterFactory(hoodieTable, writeConfig, instantTime, structType);
dataSourceInternalWriterHelper.createInflightCommit();
if (WriteOperationType.BULK_INSERT == dataSourceInternalWriterHelper.getWriteOperationType()) {
return new HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(),
writeConfig, instantTime, structType);
} else {
throw new IllegalArgumentException("Write Operation Type + " + operationType + " not supported ");
throw new IllegalArgumentException("Write Operation Type + " + dataSourceInternalWriterHelper.getWriteOperationType() + " not supported ");
}
}
@Override
public boolean useCommitCoordinator() {
return true;
return dataSourceInternalWriterHelper.useCommitCoordinator();
}
@Override
public void onDataWriterCommit(WriterCommitMessage message) {
LOG.info("Received commit of a data writer =" + message);
dataSourceInternalWriterHelper.onDataWriterCommit(message.toString());
}
@Override
public void commit(WriterCommitMessage[] messages) {
List<HoodieWriteStat> writeStatList = Arrays.stream(messages).map(m -> (HoodieWriterCommitMessage) m)
.flatMap(m -> m.getWriteStatuses().stream().map(m2 -> m2.getStat())).collect(Collectors.toList());
try {
writeClient.commitStats(instantTime, writeStatList, Option.empty(),
DataSourceUtils.getCommitActionType(operationType, metaClient.getTableType()));
} catch (Exception ioe) {
throw new HoodieException(ioe.getMessage(), ioe);
} finally {
writeClient.close();
}
.flatMap(m -> m.getWriteStatuses().stream().map(HoodieInternalWriteStatus::getStat)).collect(Collectors.toList());
dataSourceInternalWriterHelper.commit(writeStatList);
}
@Override
public void abort(WriterCommitMessage[] messages) {
LOG.error("Commit " + instantTime + " aborted ");
writeClient.rollback(instantTime);
writeClient.close();
dataSourceInternalWriterHelper.abort();
}
}

View File

@@ -18,28 +18,18 @@
package org.apache.hudi.internal;
import java.util.ArrayList;
import java.util.List;
import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import java.util.List;
/**
* Hoodie's {@link WriterCommitMessage} used in datasource implementation.
*/
public class HoodieWriterCommitMessage implements WriterCommitMessage {
private List<HoodieInternalWriteStatus> writeStatuses = new ArrayList<>();
public class HoodieWriterCommitMessage extends BaseWriterCommitMessage
implements WriterCommitMessage {
public HoodieWriterCommitMessage(List<HoodieInternalWriteStatus> writeStatuses) {
this.writeStatuses = writeStatuses;
}
public List<HoodieInternalWriteStatus> getWriteStatuses() {
return writeStatuses;
}
@Override
public String toString() {
return "HoodieWriterCommitMessage{" + "writeStatuses=" + writeStatuses + '}';
super(writeStatuses);
}
}

View File

@@ -18,26 +18,19 @@
package org.apache.hudi.internal;
import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.spark.package$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
@@ -45,36 +38,13 @@ import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getInternalRowWithError;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
/**
* Unit tests {@link HoodieBulkInsertDataInternalWriter}.
*/
public class TestHoodieBulkInsertDataInternalWriter extends HoodieClientTestHarness {
private static final Random RANDOM = new Random();
@BeforeEach
public void setUp() throws Exception {
// this test is only compatible with spark 2
assumeTrue(package$.MODULE$.SPARK_VERSION().startsWith("2."));
initSparkContexts("TestHoodieBulkInsertDataInternalWriter");
initPath();
initFileSystem();
initTestDataGenerator();
initMetaClient();
}
@AfterEach
public void tearDown() throws Exception {
cleanupResources();
}
public class TestHoodieBulkInsertDataInternalWriter extends
HoodieBulkInsertInternalWriterTestBase {
@Test
public void testDataInternalWriter() throws Exception {
@@ -103,15 +73,15 @@ public class TestHoodieBulkInsertDataInternalWriter extends HoodieClientTestHarn
}
}
HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
List<String> fileAbsPaths = new ArrayList<>();
List<String> fileNames = new ArrayList<>();
BaseWriterCommitMessage commitMetadata = (BaseWriterCommitMessage) writer.commit();
Option<List<String>> fileAbsPaths = Option.of(new ArrayList<>());
Option<List<String>> fileNames = Option.of(new ArrayList<>());
// verify write statuses
assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, fileAbsPaths, fileNames);
// verify rows
Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.toArray(new String[0]));
Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
assertOutput(totalInputRows, result, instantTime, fileNames);
}
}
@@ -156,15 +126,15 @@ public class TestHoodieBulkInsertDataInternalWriter extends HoodieClientTestHarn
// expected
}
HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
BaseWriterCommitMessage commitMetadata = (BaseWriterCommitMessage) writer.commit();
List<String> fileAbsPaths = new ArrayList<>();
List<String> fileNames = new ArrayList<>();
Option<List<String>> fileAbsPaths = Option.of(new ArrayList<>());
Option<List<String>> fileNames = Option.of(new ArrayList<>());
// verify write statuses
assertWriteStatuses(commitMetadata.getWriteStatuses(), 1, size / 2, fileAbsPaths, fileNames);
// verify rows
Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.toArray(new String[0]));
Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
assertOutput(inputRows, result, instantTime, fileNames);
}
@@ -176,43 +146,4 @@ public class TestHoodieBulkInsertDataInternalWriter extends HoodieClientTestHarn
writer.write(internalRow);
}
}
private void assertWriteStatuses(List<HoodieInternalWriteStatus> writeStatuses, int batches, int size, List<String> fileAbsPaths, List<String> fileNames) {
assertEquals(batches, writeStatuses.size());
int counter = 0;
for (HoodieInternalWriteStatus writeStatus : writeStatuses) {
// verify write status
assertEquals(writeStatus.getTotalRecords(), size);
assertNull(writeStatus.getGlobalError());
assertEquals(writeStatus.getFailedRowsSize(), 0);
assertNotNull(writeStatus.getFileId());
String fileId = writeStatus.getFileId();
assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3], writeStatus.getPartitionPath());
fileAbsPaths.add(basePath + "/" + writeStatus.getStat().getPath());
fileNames.add(writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf('/') + 1));
HoodieWriteStat writeStat = writeStatus.getStat();
assertEquals(size, writeStat.getNumInserts());
assertEquals(size, writeStat.getNumWrites());
assertEquals(fileId, writeStat.getFileId());
assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter++ % 3], writeStat.getPartitionPath());
assertEquals(0, writeStat.getNumDeletes());
assertEquals(0, writeStat.getNumUpdateWrites());
assertEquals(0, writeStat.getTotalWriteErrors());
}
}
private void assertOutput(Dataset<Row> expectedRows, Dataset<Row> actualRows, String instantTime, List<String> fileNames) {
// verify 3 meta fields that are filled in within create handle
actualRows.collectAsList().forEach(entry -> {
assertEquals(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).toString(), instantTime);
assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD)));
assertTrue(fileNames.contains(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD))));
assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)));
});
// after trimming 2 of the meta fields, rest of the fields should match
Dataset<Row> trimmedExpected = expectedRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD);
Dataset<Row> trimmedActual = actualRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD);
assertEquals(0, trimmedActual.except(trimmedExpected).count());
}
}

View File

@@ -18,61 +18,32 @@
package org.apache.hudi.internal;
import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.spark.package$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
/**
* Unit tests {@link HoodieDataSourceInternalWriter}.
*/
public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness {
private static final Random RANDOM = new Random();
@BeforeEach
public void setUp() throws Exception {
// this test is only compatible with spark 2
assumeTrue(package$.MODULE$.SPARK_VERSION().startsWith("2."));
initSparkContexts("TestHoodieDataSourceInternalWriter");
initPath();
initFileSystem();
initTestDataGenerator();
initMetaClient();
}
@AfterEach
public void tearDown() throws Exception {
cleanupResources();
}
public class TestHoodieDataSourceInternalWriter extends
HoodieBulkInsertInternalWriterTestBase {
@Test
public void testDataSourceWriter() throws Exception {
@@ -84,7 +55,7 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf);
DataWriter<InternalRow> writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(0, RANDOM.nextLong(), RANDOM.nextLong());
List<String> partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS);
String[] partitionPaths = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
List<String> partitionPathsAbs = new ArrayList<>();
for (String partitionPath : partitionPaths) {
partitionPathsAbs.add(basePath + "/" + partitionPath + "/*");
@@ -112,8 +83,8 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
metaClient.reloadActiveTimeline();
Dataset<Row> result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0]));
// verify output
assertOutput(totalInputRows, result, instantTime);
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size);
assertOutput(totalInputRows, result, instantTime, Option.empty());
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
}
@Test
@@ -155,8 +126,8 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime);
// verify output
assertOutput(totalInputRows, result, instantTime);
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size);
assertOutput(totalInputRows, result, instantTime, Option.empty());
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
}
}
@@ -199,8 +170,8 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime);
// verify output
assertOutput(totalInputRows, result, instantTime);
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size);
assertOutput(totalInputRows, result, instantTime, Option.empty());
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
}
}
@@ -250,8 +221,8 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
metaClient.reloadActiveTimeline();
Dataset<Row> result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0]));
// verify rows
assertOutput(totalInputRows, result, instantTime0);
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size);
assertOutput(totalInputRows, result, instantTime0, Option.empty());
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
// 2nd batch. abort in the end
String instantTime1 = "00" + 1;
@@ -274,7 +245,7 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0]));
// verify rows
// only rows from first batch should be present
assertOutput(totalInputRows, result, instantTime0);
assertOutput(totalInputRows, result, instantTime0, Option.empty());
}
private void writeRows(Dataset<Row> inputRows, DataWriter<InternalRow> writer) throws Exception {
@@ -284,41 +255,4 @@ public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness
writer.write(internalRow);
}
}
private void assertWriteStatuses(List<HoodieInternalWriteStatus> writeStatuses, int batches, int size) {
assertEquals(batches, writeStatuses.size());
int counter = 0;
for (HoodieInternalWriteStatus writeStatus : writeStatuses) {
assertEquals(writeStatus.getPartitionPath(), HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3]);
assertEquals(writeStatus.getTotalRecords(), size);
assertEquals(writeStatus.getFailedRowsSize(), 0);
assertEquals(writeStatus.getTotalErrorRecords(), 0);
assertFalse(writeStatus.hasErrors());
assertNull(writeStatus.getGlobalError());
assertNotNull(writeStatus.getFileId());
String fileId = writeStatus.getFileId();
HoodieWriteStat writeStat = writeStatus.getStat();
assertEquals(size, writeStat.getNumInserts());
assertEquals(size, writeStat.getNumWrites());
assertEquals(fileId, writeStat.getFileId());
assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter++ % 3], writeStat.getPartitionPath());
assertEquals(0, writeStat.getNumDeletes());
assertEquals(0, writeStat.getNumUpdateWrites());
assertEquals(0, writeStat.getTotalWriteErrors());
}
}
private void assertOutput(Dataset<Row> expectedRows, Dataset<Row> actualRows, String instantTime) {
// verify 3 meta fields that are filled in within create handle
actualRows.collectAsList().forEach(entry -> {
assertEquals(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).toString(), instantTime);
assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD)));
assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)));
});
// after trimming 2 of the meta fields, rest of the fields should match
Dataset<Row> trimmedExpected = expectedRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD);
Dataset<Row> trimmedActual = actualRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD);
assertEquals(0, trimmedActual.except(trimmedExpected).count());
}
}