1
0

[HUDI-1013] Adding Bulk Insert V2 implementation (#1834)

- Adding ability to use native spark row writing for bulk_insert
 - Controlled by `ENABLE_ROW_WRITER_OPT_KEY` datasource write option
 - Introduced KeyGeneratorInterface in hudi-client, moved KeyGenerator back to hudi-spark
 - Simplified the new API additions to just two new methods : getRecordKey(row), getPartitionPath(row)
 - Fixed all built-in key generators with new APIs
 - Made the field position map lazily created upon the first call to row based apis
 - Implemented native row based key generators for CustomKeyGenerator
 - Fixed all the tests, with these new APIs

Co-authored-by: Balaji Varadarajan <varadarb@uber.com>
Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
Sivabalan Narayanan
2020-08-13 03:33:39 -04:00
committed by GitHub
parent 8d04268264
commit 379cf0786f
62 changed files with 4682 additions and 485 deletions

View File

@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client;
import org.junit.jupiter.api.Test;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Unit tests {@link HoodieInternalWriteStatus}.
*/
public class TestHoodieInternalWriteStatus {
@Test
public void testFailureFraction() {
HoodieInternalWriteStatus status = new HoodieInternalWriteStatus(true, 0.1);
String fileId = UUID.randomUUID().toString();
String partitionPath = UUID.randomUUID().toString();
status.setFileId(fileId);
status.setPartitionPath(partitionPath);
Throwable t = new Exception("some error in writing");
for (int i = 0; i < 1000; i++) {
status.markFailure(UUID.randomUUID().toString(), t);
}
// verification
assertEquals(fileId, status.getFileId());
assertEquals(partitionPath, status.getPartitionPath());
assertTrue(status.getFailedRecordKeys().size() > 0);
assertTrue(status.getFailedRecordKeys().size() < 150); // 150 instead of 100, to prevent flaky test
assertTrue(status.hasErrors());
}
@Test
public void testSuccessRecordTracking() {
boolean[] vals = {true, false};
for (boolean trackSuccess : vals) {
HoodieInternalWriteStatus status = new HoodieInternalWriteStatus(trackSuccess, 1.0);
String fileId = UUID.randomUUID().toString();
status.setFileId(fileId);
String partitionPath = UUID.randomUUID().toString();
status.setPartitionPath(partitionPath);
Throwable t = new Exception("some error in writing");
for (int i = 0; i < 1000; i++) {
status.markSuccess(UUID.randomUUID().toString());
status.markFailure(UUID.randomUUID().toString(), t);
}
// verification
assertEquals(fileId, status.getFileId());
assertEquals(partitionPath, status.getPartitionPath());
assertEquals(1000, status.getFailedRecordKeys().size());
assertTrue(status.hasErrors());
if (trackSuccess) {
assertEquals(1000, status.getSuccessRecordKeys().size());
} else {
assertTrue(status.getSuccessRecordKeys().isEmpty());
}
assertEquals(2000, status.getTotalRecords());
}
}
@Test
public void testGlobalError() {
HoodieInternalWriteStatus status = new HoodieInternalWriteStatus(true, 0.1);
Throwable t = new Exception("some error in writing");
status.setGlobalError(t);
assertEquals(t, status.getGlobalError());
}
}

View File

@@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.model;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.DataTypes;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Unit tests {@link HoodieInternalRow}.
*/
public class TestHoodieInternalRow {
private static final Random RANDOM = new Random();
private static final int INTEGER_INDEX = 5;
private static final int STRING_INDEX = 6;
private static final int BOOLEAN_INDEX = 7;
private static final int SHORT_INDEX = 8;
private static final int BYTE_INDEX = 9;
private static final int LONG_INDEX = 10;
private static final int FLOAT_INDEX = 11;
private static final int DOUBLE_INDEX = 12;
private static final int DECIMAL_INDEX = 13;
private static final int BINARY_INDEX = 14;
private static final int STRUCT_INDEX = 15;
// to do array and map
private static final int ARRAY_INDEX = 16;
private static final int MAP_INDEX = 17;
private List<Integer> nullIndices;
public TestHoodieInternalRow() {
this.nullIndices = new ArrayList<>();
}
@Test
public void testGet() {
Object[] values = getRandomValue(true);
InternalRow row = new GenericInternalRow(values);
HoodieInternalRow hoodieInternalRow = new HoodieInternalRow("commitTime", "commitSeqNo", "recordKey", "partitionPath", "fileName", row);
assertValues(hoodieInternalRow, "commitTime", "commitSeqNo", "recordKey", "partitionPath",
"fileName", values, nullIndices);
}
@Test
public void testUpdate() {
Object[] values = getRandomValue(true);
InternalRow row = new GenericInternalRow(values);
HoodieInternalRow hoodieInternalRow = new HoodieInternalRow("commitTime", "commitSeqNo", "recordKey", "partitionPath", "fileName", row);
hoodieInternalRow.update(0, "commitTime_updated");
hoodieInternalRow.update(1, "commitSeqNo_updated");
hoodieInternalRow.update(2, "recordKey_updated");
hoodieInternalRow.update(3, "partitionPath_updated");
hoodieInternalRow.update(4, "fileName_updated");
values = getRandomValue(true);
hoodieInternalRow.update(INTEGER_INDEX, values[INTEGER_INDEX]);
hoodieInternalRow.update(BOOLEAN_INDEX, values[BOOLEAN_INDEX]);
hoodieInternalRow.update(SHORT_INDEX, values[SHORT_INDEX]);
hoodieInternalRow.update(BYTE_INDEX, values[BYTE_INDEX]);
hoodieInternalRow.update(LONG_INDEX, values[LONG_INDEX]);
hoodieInternalRow.update(FLOAT_INDEX, values[FLOAT_INDEX]);
hoodieInternalRow.update(DOUBLE_INDEX, values[DOUBLE_INDEX]);
//hoodieInternalRow.update(decimalIndex, values[decimalIndex]);
hoodieInternalRow.update(BINARY_INDEX, values[BINARY_INDEX]);
hoodieInternalRow.update(STRUCT_INDEX, values[STRUCT_INDEX]);
hoodieInternalRow.update(STRING_INDEX, values[STRING_INDEX].toString());
assertValues(hoodieInternalRow, "commitTime_updated", "commitSeqNo_updated", "recordKey_updated", "partitionPath_updated",
"fileName_updated", values, nullIndices);
}
@Test
public void testIsNullCheck() {
for (int i = 0; i < 16; i++) {
Object[] values = getRandomValue(true);
InternalRow row = new GenericInternalRow(values);
HoodieInternalRow hoodieInternalRow = new HoodieInternalRow("commitTime", "commitSeqNo", "recordKey", "partitionPath", "fileName", row);
hoodieInternalRow.setNullAt(i);
nullIndices.clear();
nullIndices.add(i);
assertValues(hoodieInternalRow, "commitTime", "commitSeqNo", "recordKey", "partitionPath",
"fileName", values, nullIndices);
}
// try setting multiple values as null
// run it for 5 rounds
for (int i = 0; i < 5; i++) {
int numNullValues = 1 + RANDOM.nextInt(4);
List<Integer> nullsSoFar = new ArrayList<>();
while (nullsSoFar.size() < numNullValues) {
int randomIndex = RANDOM.nextInt(16);
if (!nullsSoFar.contains(randomIndex)) {
nullsSoFar.add(randomIndex);
}
}
Object[] values = getRandomValue(true);
InternalRow row = new GenericInternalRow(values);
HoodieInternalRow hoodieInternalRow = new HoodieInternalRow("commitTime", "commitSeqNo", "recordKey", "partitionPath", "fileName", row);
nullIndices.clear();
for (Integer index : nullsSoFar) {
hoodieInternalRow.setNullAt(index);
nullIndices.add(index);
}
assertValues(hoodieInternalRow, "commitTime", "commitSeqNo", "recordKey", "partitionPath",
"fileName", values, nullIndices);
}
}
/**
* Fetches a random Object[] of values for testing.
*
* @param withStructType true if structType need to be added as one of the elements in the Object[]
* @return the random Object[] thus generated
*/
private Object[] getRandomValue(boolean withStructType) {
Object[] values = new Object[16];
values[INTEGER_INDEX] = RANDOM.nextInt();
values[STRING_INDEX] = UUID.randomUUID().toString();
values[BOOLEAN_INDEX] = RANDOM.nextBoolean();
values[SHORT_INDEX] = (short) RANDOM.nextInt(2);
byte[] bytes = new byte[1];
RANDOM.nextBytes(bytes);
values[BYTE_INDEX] = bytes[0];
values[LONG_INDEX] = RANDOM.nextLong();
values[FLOAT_INDEX] = RANDOM.nextFloat();
values[DOUBLE_INDEX] = RANDOM.nextDouble();
// TODO fix decimal type.
values[DECIMAL_INDEX] = RANDOM.nextFloat();
bytes = new byte[20];
RANDOM.nextBytes(bytes);
values[BINARY_INDEX] = bytes;
if (withStructType) {
Object[] structField = getRandomValue(false);
values[STRUCT_INDEX] = new GenericInternalRow(structField);
}
return values;
}
private void assertValues(HoodieInternalRow hoodieInternalRow, String commitTime, String commitSeqNo, String recordKey, String partitionPath, String filename, Object[] values,
List<Integer> nullIndexes) {
for (Integer index : nullIndexes) {
assertTrue(hoodieInternalRow.isNullAt(index));
}
for (int i = 0; i < 16; i++) {
if (!nullIndexes.contains(i)) {
assertFalse(hoodieInternalRow.isNullAt(i));
}
}
if (!nullIndexes.contains(0)) {
assertEquals(commitTime, hoodieInternalRow.get(0, DataTypes.StringType).toString());
}
if (!nullIndexes.contains(1)) {
assertEquals(commitSeqNo, hoodieInternalRow.get(1, DataTypes.StringType).toString());
}
if (!nullIndexes.contains(2)) {
assertEquals(recordKey, hoodieInternalRow.get(2, DataTypes.StringType).toString());
}
if (!nullIndexes.contains(3)) {
assertEquals(partitionPath, hoodieInternalRow.get(3, DataTypes.StringType).toString());
}
if (!nullIndexes.contains(4)) {
assertEquals(filename, hoodieInternalRow.get(4, DataTypes.StringType).toString());
}
if (!nullIndexes.contains(INTEGER_INDEX)) {
assertEquals(values[INTEGER_INDEX], hoodieInternalRow.getInt(INTEGER_INDEX));
assertEquals(values[INTEGER_INDEX], hoodieInternalRow.get(INTEGER_INDEX, DataTypes.IntegerType));
}
if (!nullIndexes.contains(STRING_INDEX)) {
assertEquals(values[STRING_INDEX].toString(), hoodieInternalRow.get(STRING_INDEX, DataTypes.StringType));
}
if (!nullIndexes.contains(BOOLEAN_INDEX)) {
assertEquals(values[BOOLEAN_INDEX], hoodieInternalRow.getBoolean(BOOLEAN_INDEX));
assertEquals(values[BOOLEAN_INDEX], hoodieInternalRow.get(BOOLEAN_INDEX, DataTypes.BooleanType));
}
if (!nullIndexes.contains(SHORT_INDEX)) {
assertEquals(values[SHORT_INDEX], hoodieInternalRow.getShort(SHORT_INDEX));
assertEquals(values[SHORT_INDEX], hoodieInternalRow.get(SHORT_INDEX, DataTypes.ShortType));
}
if (!nullIndexes.contains(BYTE_INDEX)) {
assertEquals(values[BYTE_INDEX], hoodieInternalRow.getByte(BYTE_INDEX));
assertEquals(values[BYTE_INDEX], hoodieInternalRow.get(BYTE_INDEX, DataTypes.ByteType));
}
if (!nullIndexes.contains(LONG_INDEX)) {
assertEquals(values[LONG_INDEX], hoodieInternalRow.getLong(LONG_INDEX));
assertEquals(values[LONG_INDEX], hoodieInternalRow.get(LONG_INDEX, DataTypes.LongType));
}
if (!nullIndexes.contains(FLOAT_INDEX)) {
assertEquals(values[FLOAT_INDEX], hoodieInternalRow.getFloat(FLOAT_INDEX));
assertEquals(values[FLOAT_INDEX], hoodieInternalRow.get(FLOAT_INDEX, DataTypes.FloatType));
}
if (!nullIndexes.contains(DOUBLE_INDEX)) {
assertEquals(values[DOUBLE_INDEX], hoodieInternalRow.getDouble(DOUBLE_INDEX));
assertEquals(values[DOUBLE_INDEX], hoodieInternalRow.get(DOUBLE_INDEX, DataTypes.DoubleType));
}
if (!nullIndexes.contains(BINARY_INDEX)) {
assertEquals(values[BINARY_INDEX], hoodieInternalRow.getBinary(BINARY_INDEX));
assertEquals(values[BINARY_INDEX], hoodieInternalRow.get(BINARY_INDEX, DataTypes.BinaryType));
}
if (!nullIndexes.contains(STRUCT_INDEX)) {
assertEquals(values[STRUCT_INDEX], hoodieInternalRow.getStruct(STRUCT_INDEX, 18));
}
}
}

View File

@@ -0,0 +1,231 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io;
import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getInternalRowWithError;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
* Unit tests {@link HoodieRowCreateHandle}.
*/
public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
private static final Random RANDOM = new Random();
@BeforeEach
public void setUp() throws Exception {
initSparkContexts("TestHoodieRowCreateHandle");
initPath();
initFileSystem();
initTestDataGenerator();
initMetaClient();
}
@AfterEach
public void tearDown() throws Exception {
cleanupResources();
}
@Test
public void testRowCreateHandle() throws IOException {
// init config and table
HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
List<String> fileNames = new ArrayList<>();
List<String> fileAbsPaths = new ArrayList<>();
Dataset<Row> totalInputRows = null;
// one round per partition
for (int i = 0; i < 5; i++) {
String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[i % 3];
// init some args
String fileId = UUID.randomUUID().toString();
String instantTime = "000";
HoodieRowCreateHandle handle = new HoodieRowCreateHandle(table, cfg, partitionPath, fileId, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), STRUCT_TYPE);
int size = 10 + RANDOM.nextInt(1000);
// Generate inputs
Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
if (totalInputRows == null) {
totalInputRows = inputRows;
} else {
totalInputRows = totalInputRows.union(inputRows);
}
// issue writes
HoodieInternalWriteStatus writeStatus = writeAndGetWriteStatus(inputRows, handle);
fileAbsPaths.add(basePath + "/" + writeStatus.getStat().getPath());
fileNames.add(handle.getFileName());
// verify output
assertOutput(writeStatus, size, fileId, partitionPath, instantTime, totalInputRows, fileNames, fileAbsPaths);
}
}
/**
* Issue some corrupted or wrong schematized InternalRow after few valid InternalRows so that global error is thrown. write batch 1 of valid records write batch 2 of invalid records Global Error
* should be thrown.
*/
@Test
public void testGlobalFailure() throws IOException {
// init config and table
HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
// init some args
String fileId = UUID.randomUUID().toString();
String instantTime = "000";
HoodieRowCreateHandle handle = new HoodieRowCreateHandle(table, cfg, partitionPath, fileId, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), STRUCT_TYPE);
int size = 10 + RANDOM.nextInt(1000);
int totalFailures = 5;
// Generate first batch of valid rows
Dataset<Row> inputRows = getRandomRows(sqlContext, size / 2, partitionPath, false);
List<InternalRow> internalRows = toInternalRows(inputRows, ENCODER);
// generate some failures rows
for (int i = 0; i < totalFailures; i++) {
internalRows.add(getInternalRowWithError(partitionPath));
}
// generate 2nd batch of valid rows
Dataset<Row> inputRows2 = getRandomRows(sqlContext, size / 2, partitionPath, false);
internalRows.addAll(toInternalRows(inputRows2, ENCODER));
// issue writes
try {
for (InternalRow internalRow : internalRows) {
handle.write(internalRow);
}
fail("Should have failed");
} catch (Throwable e) {
// expected
}
// close the create handle
HoodieInternalWriteStatus writeStatus = handle.close();
List<String> fileNames = new ArrayList<>();
fileNames.add(handle.getFileName());
// verify write status
assertNotNull(writeStatus.getGlobalError());
assertTrue(writeStatus.getGlobalError().getMessage().contains("java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String"));
assertEquals(writeStatus.getFileId(), fileId);
assertEquals(writeStatus.getPartitionPath(), partitionPath);
// verify rows
Dataset<Row> result = sqlContext.read().parquet(basePath + "/" + partitionPath);
// passing only first batch of inputRows since after first batch global error would have been thrown
assertRows(inputRows, result, instantTime, fileNames);
}
@Test
public void testInstantiationFailure() throws IOException {
// init config and table
HoodieWriteConfig cfg = getConfigBuilder(basePath).withPath("/dummypath/abc/").build();
HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
try {
new HoodieRowCreateHandle(table, cfg, " def", UUID.randomUUID().toString(), "001", RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), STRUCT_TYPE);
fail("Should have thrown exception");
} catch (HoodieInsertException ioe) {
// expected
}
}
private HoodieInternalWriteStatus writeAndGetWriteStatus(Dataset<Row> inputRows, HoodieRowCreateHandle handle) throws IOException {
List<InternalRow> internalRows = toInternalRows(inputRows, ENCODER);
// issue writes
for (InternalRow internalRow : internalRows) {
handle.write(internalRow);
}
// close the create handle
return handle.close();
}
private void assertOutput(HoodieInternalWriteStatus writeStatus, int size, String fileId, String partitionPath, String instantTime, Dataset<Row> inputRows, List<String> filenames,
List<String> fileAbsPaths) {
assertEquals(writeStatus.getPartitionPath(), partitionPath);
assertEquals(writeStatus.getTotalRecords(), size);
assertEquals(writeStatus.getFailedRowsSize(), 0);
assertEquals(writeStatus.getTotalErrorRecords(), 0);
assertFalse(writeStatus.hasErrors());
assertNull(writeStatus.getGlobalError());
assertEquals(writeStatus.getFileId(), fileId);
HoodieWriteStat writeStat = writeStatus.getStat();
assertEquals(size, writeStat.getNumInserts());
assertEquals(size, writeStat.getNumWrites());
assertEquals(fileId, writeStat.getFileId());
assertEquals(partitionPath, writeStat.getPartitionPath());
assertEquals(0, writeStat.getNumDeletes());
assertEquals(0, writeStat.getNumUpdateWrites());
assertEquals(0, writeStat.getTotalWriteErrors());
// verify rows
Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.toArray(new String[0]));
assertRows(inputRows, result, instantTime, filenames);
}
private void assertRows(Dataset<Row> expectedRows, Dataset<Row> actualRows, String instantTime, List<String> filenames) {
// verify 3 meta fields that are filled in within create handle
actualRows.collectAsList().forEach(entry -> {
assertEquals(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).toString(), instantTime);
assertTrue(filenames.contains(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD)).toString()));
assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)));
});
// after trimming 2 of the meta fields, rest of the fields should match
Dataset<Row> trimmedExpected = expectedRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD);
Dataset<Row> trimmedActual = actualRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD);
assertEquals(0, trimmedActual.except(trimmedExpected).count());
}
}

View File

@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io.storage;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Unit tests {@link HoodieInternalRowParquetWriter}.
*/
public class TestHoodieInternalRowParquetWriter extends HoodieClientTestHarness {
private static final Random RANDOM = new Random();
@BeforeEach
public void setUp() throws Exception {
initSparkContexts("TestHoodieInternalRowParquetWriter");
initPath();
initFileSystem();
initTestDataGenerator();
initMetaClient();
}
@AfterEach
public void tearDown() throws Exception {
cleanupResources();
}
@Test
public void endToEndTest() throws IOException {
HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
for (int i = 0; i < 5; i++) {
// init write support and parquet config
HoodieRowParquetWriteSupport writeSupport = getWriteSupport(cfg, hadoopConf);
HoodieRowParquetConfig parquetConfig = new HoodieRowParquetConfig(writeSupport,
CompressionCodecName.SNAPPY, cfg.getParquetBlockSize(), cfg.getParquetPageSize(), cfg.getParquetMaxFileSize(),
writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio());
// prepare path
String fileId = UUID.randomUUID().toString();
Path filePath = new Path(basePath + "/" + fileId);
String partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
metaClient.getFs().mkdirs(new Path(basePath));
// init writer
HoodieInternalRowParquetWriter writer = new HoodieInternalRowParquetWriter(filePath, parquetConfig);
// generate input
int size = 10 + RANDOM.nextInt(100);
// Generate inputs
Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
List<InternalRow> internalRows = toInternalRows(inputRows, ENCODER);
// issue writes
for (InternalRow internalRow : internalRows) {
writer.write(internalRow);
}
// close the writer
writer.close();
// verify rows
Dataset<Row> result = sqlContext.read().parquet(basePath);
assertEquals(0, inputRows.except(result).count());
}
}
private HoodieRowParquetWriteSupport getWriteSupport(HoodieWriteConfig writeConfig, Configuration hadoopConf) {
BloomFilter filter = BloomFilterFactory.createBloomFilter(
writeConfig.getBloomFilterNumEntries(),
writeConfig.getBloomFilterFPP(),
writeConfig.getDynamicBloomFilterMaxNumEntries(),
writeConfig.getBloomFilterType());
return new HoodieRowParquetWriteSupport(hadoopConf, STRUCT_TYPE, filter);
}
}

View File

@@ -39,7 +39,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.IOType;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
import org.apache.hudi.io.storage.HoodieParquetWriter;
import org.apache.avro.Schema;
@@ -255,7 +255,7 @@ public class HoodieClientTestUtils {
HoodieAvroWriteSupport writeSupport =
new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter);
String instantTime = FSUtils.getCommitTime(filename);
HoodieParquetConfig config = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP,
HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP,
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
HoodieTestUtils.getDefaultHadoopConf(), Double.valueOf(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO));
HoodieParquetWriter writer =

View File

@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.testutils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
/**
* Dataset test utils.
*/
public class SparkDatasetTestUtils {
public static final StructType STRUCT_TYPE = new StructType(new StructField[] {
new StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, DataTypes.StringType, false, Metadata.empty()),
new StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, DataTypes.StringType, false, Metadata.empty()),
new StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, DataTypes.StringType, false, Metadata.empty()),
new StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, DataTypes.StringType, false, Metadata.empty()),
new StructField(HoodieRecord.FILENAME_METADATA_FIELD, DataTypes.StringType, false, Metadata.empty()),
new StructField("randomInt", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("randomLong", DataTypes.LongType, false, Metadata.empty())});
public static final StructType ERROR_STRUCT_TYPE = new StructType(new StructField[] {
new StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, DataTypes.StringType, false, Metadata.empty()),
new StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, DataTypes.LongType, false, Metadata.empty()),
new StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, DataTypes.StringType, false, Metadata.empty()),
new StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, DataTypes.StringType, false, Metadata.empty()),
new StructField(HoodieRecord.FILENAME_METADATA_FIELD, DataTypes.StringType, false, Metadata.empty()),
new StructField("randomInt", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("randomStr", DataTypes.StringType, false, Metadata.empty())});
public static final ExpressionEncoder ENCODER = getEncoder(STRUCT_TYPE);
public static final ExpressionEncoder ERROR_ENCODER = getEncoder(ERROR_STRUCT_TYPE);
/**
* Generate Encode for the passed in {@link StructType}.
*
* @param schema instance of {@link StructType} for which encoder is requested.
* @return the encoder thus generated.
*/
private static ExpressionEncoder getEncoder(StructType schema) {
List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream()
.map(Attribute::toAttribute).collect(Collectors.toList());
return RowEncoder.apply(schema)
.resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(),
SimpleAnalyzer$.MODULE$);
}
/**
* Generate random Rows.
*
* @param count total number of Rows to be generated.
* @param partitionPath partition path to be set
* @return the Dataset<Row>s thus generated.
*/
public static Dataset<Row> getRandomRows(SQLContext sqlContext, int count, String partitionPath, boolean isError) {
List<Row> records = new ArrayList<>();
for (long recordNum = 0; recordNum < count; recordNum++) {
records.add(getRandomValue(partitionPath, isError));
}
return sqlContext.createDataFrame(records, isError ? ERROR_STRUCT_TYPE : STRUCT_TYPE);
}
/**
* Generate random Row.
*
* @param partitionPath partition path to be set in the Row.
* @return the Row thus generated.
*/
public static Row getRandomValue(String partitionPath, boolean isError) {
// order commit time, seq no, record key, partition path, file name
Object[] values = new Object[7];
values[0] = ""; //commit time
if (!isError) {
values[1] = ""; // commit seq no
} else {
values[1] = RANDOM.nextLong();
}
values[2] = UUID.randomUUID().toString();
values[3] = partitionPath;
values[4] = ""; // filename
values[5] = RANDOM.nextInt();
if (!isError) {
values[6] = RANDOM.nextLong();
} else {
values[6] = UUID.randomUUID().toString();
}
return new GenericRow(values);
}
/**
* Convert Dataset<Row>s to List of {@link InternalRow}s.
*
* @param rows Dataset<Row>s to be converted
* @return the List of {@link InternalRow}s thus converted.
*/
public static List<InternalRow> toInternalRows(Dataset<Row> rows, ExpressionEncoder encoder) {
List<InternalRow> toReturn = new ArrayList<>();
List<Row> rowList = rows.collectAsList();
for (Row row : rowList) {
toReturn.add(encoder.toRow(row).copy());
}
return toReturn;
}
public static InternalRow getInternalRowWithError(String partitionPath) {
// order commit time, seq no, record key, partition path, file name
String recordKey = UUID.randomUUID().toString();
Object[] values = new Object[7];
values[0] = "";
values[1] = "";
values[2] = recordKey;
values[3] = partitionPath;
values[4] = "";
values[5] = RANDOM.nextInt();
values[6] = RANDOM.nextBoolean();
return new GenericInternalRow(values);
}
public static HoodieWriteConfig.Builder getConfigBuilder(String basePath) {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
.forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withBulkInsertParallelism(2);
}
}