[HUDI-809] Migrate CommonTestHarness to JUnit 5 (#1530)
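The diff below applies the standard JUnit 4 to JUnit 5 migration across these Hadoop/Hive input format tests: @Rule TemporaryFolder becomes a @TempDir-injected java.nio.file.Path, @Before/@BeforeClass/@AfterClass/@Ignore become @BeforeEach/@BeforeAll/@AfterAll/@Disabled, and assertions move from org.junit.Assert to org.junit.jupiter.api.Assertions, which takes the failure message as the last argument instead of the first. A minimal sketch of the target style (the class, method, and file names here are illustrative only, not part of this commit):

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.nio.file.Files;
import java.nio.file.Path;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ExampleJUnit5StyleTest {

  // Replaces "@Rule public TemporaryFolder basePath = new TemporaryFolder();"
  @TempDir
  public Path basePath;

  // Replaces @Before; @BeforeClass/@AfterClass become @BeforeAll/@AfterAll
  @BeforeEach
  public void setUp() {
    // per-test initialization, as in the setUp() methods below
  }

  @Test
  public void createsFileUnderTempDir() throws Exception {
    Path file = Files.createFile(basePath.resolve("example.commit"));
    assertTrue(Files.exists(file));
    // JUnit 5 assertions take the message last: assertEquals(expected, actual, message)
    assertEquals("example.commit", file.getFileName().toString(), "file name should match");
  }
}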
@@ -39,10 +39,11 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.parquet.avro.AvroParquetWriter;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -57,19 +58,19 @@ public class InputFormatTestUtil {

private static String TEST_WRITE_TOKEN = "1-0-1";

public static File prepareTable(TemporaryFolder basePath, int numberOfFiles, String commitNumber)
public static File prepareTable(java.nio.file.Path basePath, int numberOfFiles, String commitNumber)
throws IOException {
basePath.create();
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.getRoot().toString());
File partitionPath = basePath.newFolder("2016", "05", "01");
return simulateInserts(partitionPath, "fileId1", numberOfFiles, commitNumber);
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString());
java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01"));
Files.createDirectories(partitionPath);
return simulateInserts(partitionPath.toFile(), "fileId1", numberOfFiles, commitNumber);
}

public static File simulateInserts(File partitionPath, String fileId, int numberOfFiles, String commitNumber)
throws IOException {
throws IOException {
for (int i = 0; i < numberOfFiles; i++) {
File dataFile = new File(partitionPath, FSUtils.makeDataFileName(commitNumber, TEST_WRITE_TOKEN, fileId + i));
dataFile.createNewFile();
Files.createFile(partitionPath.toPath()
.resolve(FSUtils.makeDataFileName(commitNumber, TEST_WRITE_TOKEN, fileId + i)));
}
return partitionPath;
}
@@ -86,19 +87,18 @@ public class InputFormatTestUtil {
List<File> toUpdateList = dataFiles.subList(0, Math.min(numberOfFilesUpdated, dataFiles.size()));
for (File file : toUpdateList) {
String fileId = FSUtils.getFileId(file.getName());
File dataFile = new File(directory, FSUtils.makeDataFileName(newCommit, TEST_WRITE_TOKEN, fileId));
dataFile.createNewFile();
Files.createFile(directory.toPath().resolve(FSUtils.makeDataFileName(newCommit, TEST_WRITE_TOKEN, fileId)));
}
}

public static void commit(TemporaryFolder basePath, String commitNumber) throws IOException {
public static void commit(java.nio.file.Path basePath, String commitNumber) throws IOException {
// create the commit
new File(basePath.getRoot().toString() + "/.hoodie/", commitNumber + ".commit").createNewFile();
Files.createFile(basePath.resolve(Paths.get(".hoodie", commitNumber + ".commit")));
}

public static void deltaCommit(TemporaryFolder basePath, String commitNumber) throws IOException {
public static void deltaCommit(java.nio.file.Path basePath, String commitNumber) throws IOException {
// create the commit
new File(basePath.getRoot().toString() + "/.hoodie/", commitNumber + ".deltacommit").createNewFile();
Files.createFile(basePath.resolve(Paths.get(".hoodie", commitNumber + ".deltacommit")));
}

public static void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) {
@@ -119,40 +119,35 @@ public class InputFormatTestUtil {
return new Schema.Parser().parse(InputFormatTestUtil.class.getResourceAsStream(location));
}

public static File prepareParquetTable(TemporaryFolder basePath, Schema schema, int numberOfFiles,
public static File prepareParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles,
int numberOfRecords, String commitNumber) throws IOException {
basePath.create();
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.getRoot().toString());
File partitionPath = basePath.newFolder("2016", "05", "01");
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString());
java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01"));
createData(schema, partitionPath, numberOfFiles, numberOfRecords, commitNumber);
return partitionPath;
return partitionPath.toFile();
}

public static File prepareSimpleParquetTable(TemporaryFolder basePath, Schema schema, int numberOfFiles,
public static File prepareSimpleParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles,
int numberOfRecords, String commitNumber) throws Exception {
basePath.create();
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.getRoot().toString());
File partitionPath = basePath.newFolder("2016", "05", "01");
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString());
java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01"));
createSimpleData(schema, partitionPath, numberOfFiles, numberOfRecords, commitNumber);
return partitionPath;
return partitionPath.toFile();
}

public static File prepareNonPartitionedParquetTable(TemporaryFolder baseDir, Schema schema, int numberOfFiles,
public static File prepareNonPartitionedParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles,
int numberOfRecords, String commitNumber) throws IOException {
baseDir.create();
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), baseDir.getRoot().toString());
File basePath = baseDir.getRoot();
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString());
createData(schema, basePath, numberOfFiles, numberOfRecords, commitNumber);
return basePath;
return basePath.toFile();
}

private static void createData(Schema schema, File partitionPath, int numberOfFiles, int numberOfRecords,
private static void createData(Schema schema, java.nio.file.Path partitionPath, int numberOfFiles, int numberOfRecords,
String commitNumber) throws IOException {
AvroParquetWriter parquetWriter;
for (int i = 0; i < numberOfFiles; i++) {
String fileId = FSUtils.makeDataFileName(commitNumber, TEST_WRITE_TOKEN, "fileid" + i);
File dataFile = new File(partitionPath, fileId);
parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()), schema);
parquetWriter = new AvroParquetWriter(new Path(partitionPath.resolve(fileId).toString()), schema);
try {
for (GenericRecord record : generateAvroRecords(schema, numberOfRecords, commitNumber, fileId)) {
parquetWriter.write(record);
@@ -163,13 +158,12 @@ public class InputFormatTestUtil {
}
}

private static void createSimpleData(Schema schema, File partitionPath, int numberOfFiles, int numberOfRecords,
private static void createSimpleData(Schema schema, java.nio.file.Path partitionPath, int numberOfFiles, int numberOfRecords,
String commitNumber) throws Exception {
AvroParquetWriter parquetWriter;
for (int i = 0; i < numberOfFiles; i++) {
String fileId = FSUtils.makeDataFileName(commitNumber, "1", "fileid" + i);
File dataFile = new File(partitionPath, fileId);
parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()), schema);
parquetWriter = new AvroParquetWriter(new Path(partitionPath.resolve(fileId).toString()), schema);
try {
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, numberOfRecords);
Schema hoodieFieldsSchema = HoodieAvroUtils.addMetadataFields(schema);
@@ -212,7 +206,6 @@ public class InputFormatTestUtil {
parquetWriter.write(record);
}
}

}

public static HoodieLogFormat.Writer writeRollback(File partitionDir, FileSystem fs, String fileId, String baseCommit,

@@ -23,6 +23,11 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
@@ -33,15 +38,9 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.FileOutputStream;
@@ -50,15 +49,12 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestHoodieParquetInputFormat {

@Rule
public TemporaryFolder basePath = new TemporaryFolder();

private HoodieParquetInputFormat inputFormat;
private JobConf jobConf;
@@ -70,16 +66,19 @@ public class TestHoodieParquetInputFormat {
count++;
}
}
assertEquals(msg, expected, count);
assertEquals(expected, count, msg);
}

@Before
@BeforeEach
public void setUp() {
inputFormat = new HoodieParquetInputFormat();
jobConf = new JobConf();
inputFormat.setConf(jobConf);
}

@TempDir
public java.nio.file.Path basePath;

// Verify that HoodieParquetInputFormat does not return instants after pending compaction
@Test
public void testPendingCompactionWithActiveCommits() throws IOException {
@@ -98,7 +97,7 @@ public class TestHoodieParquetInputFormat {
instants.add(t4);
instants.add(t5);
instants.add(t6);
HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.getRoot().getAbsolutePath().toString());
HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.toString());
HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient);
timeline.setInstants(instants);
@@ -196,16 +195,16 @@ public class TestHoodieParquetInputFormat {
InputFormatTestUtil.setupIncremental(jobConf, "100", 1);

FileStatus[] files = inputFormat.listStatus(jobConf);
assertEquals("We should exclude commit 100 when returning incremental pull with start commit time as 100", 0,
files.length);
assertEquals(0, files.length,
"We should exclude commit 100 when returning incremental pull with start commit time as 100");
}

private void createCommitFile(TemporaryFolder basePath, String commitNumber, String partitionPath)
private void createCommitFile(java.nio.file.Path basePath, String commitNumber, String partitionPath)
throws IOException {
List<HoodieWriteStat> writeStats = HoodieTestUtils.generateFakeHoodieWriteStat(1);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
writeStats.forEach(stat -> commitMetadata.addWriteStat(partitionPath, stat));
File file = new File(basePath.getRoot().toString() + "/.hoodie/", commitNumber + ".commit");
File file = basePath.resolve(".hoodie").resolve(commitNumber + ".commit").toFile();
file.createNewFile();
FileOutputStream fileOutputStream = new FileOutputStream(file);
fileOutputStream.write(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
@@ -213,10 +212,10 @@ public class TestHoodieParquetInputFormat {
fileOutputStream.close();
}

private File createCompactionFile(TemporaryFolder basePath, String commitTime)
throws IOException {
File file = new File(basePath.getRoot().toString() + "/.hoodie/",
HoodieTimeline.makeRequestedCompactionFileName(commitTime));
private File createCompactionFile(java.nio.file.Path basePath, String commitTime)
throws IOException {
File file = basePath.resolve(".hoodie")
.resolve(HoodieTimeline.makeRequestedCompactionFileName(commitTime)).toFile();
assertTrue(file.createNewFile());
FileOutputStream os = new FileOutputStream(file);
try {
@@ -255,14 +254,14 @@ public class TestHoodieParquetInputFormat {

InputFormatTestUtil.setupIncremental(jobConf, "100", 1);
FileStatus[] files = inputFormat.listStatus(jobConf);
assertEquals("Pulling 1 commit from 100, should get us the 5 files committed at 200", 5, files.length);
assertEquals(5, files.length, "Pulling 1 commit from 100, should get us the 5 files committed at 200");
ensureFilesInCommit("Pulling 1 commit from 100, should get us the 5 files committed at 200", files, "200", 5);

InputFormatTestUtil.setupIncremental(jobConf, "100", 3);
files = inputFormat.listStatus(jobConf);

assertEquals("Pulling 3 commits from 100, should get us the 3 files from 400 commit, 1 file from 300 "
+ "commit and 1 file from 200 commit", 5, files.length);
assertEquals(5, files.length, "Pulling 3 commits from 100, should get us the 3 files from 400 commit, 1 file from 300 "
+ "commit and 1 file from 200 commit");
ensureFilesInCommit("Pulling 3 commits from 100, should get us the 3 files from 400 commit", files, "400", 3);
ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 300 commit", files, "300", 1);
ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 200 commit", files, "200", 1);
@@ -270,8 +269,8 @@ public class TestHoodieParquetInputFormat {
InputFormatTestUtil.setupIncremental(jobConf, "100", HoodieHiveUtil.MAX_COMMIT_ALL);
files = inputFormat.listStatus(jobConf);

assertEquals("Pulling all commits from 100, should get us the 1 file from each of 200,300,400,500,400 commits",
5, files.length);
assertEquals(5, files.length,
"Pulling all commits from 100, should get us the 1 file from each of 200,300,400,500,400 commits");
ensureFilesInCommit("Pulling all commits from 100, should get us the 1 files from 600 commit", files, "600", 1);
ensureFilesInCommit("Pulling all commits from 100, should get us the 1 files from 500 commit", files, "500", 1);
ensureFilesInCommit("Pulling all commits from 100, should get us the 1 files from 400 commit", files, "400", 1);
@@ -335,29 +334,29 @@ public class TestHoodieParquetInputFormat {
File compactionFile = createCompactionFile(basePath, "300");

// write inserts into new bucket
InputFormatTestUtil.simulateInserts(partitionDir, "fileId2", 10, "400");
InputFormatTestUtil.simulateInserts(partitionDir, "fileId2", 10, "400");
createCommitFile(basePath, "400", "2016/05/01");

// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
InputFormatTestUtil.setupIncremental(jobConf, "0", -1);
FileStatus[] files = inputFormat.listStatus(jobConf);
assertEquals("Pulling all commit from beginning, should not return instants after begin compaction",
10, files.length);
assertEquals(10, files.length,
"Pulling all commit from beginning, should not return instants after begin compaction");
ensureFilesInCommit("Pulling all commit from beginning, should not return instants after begin compaction",
files, "100", 10);
files, "100", 10);

// delete compaction and verify inserts show up
compactionFile.delete();
InputFormatTestUtil.setupIncremental(jobConf, "0", -1);
files = inputFormat.listStatus(jobConf);
assertEquals("after deleting compaction, should get all inserted files",
20, files.length);
assertEquals(20, files.length,
"after deleting compaction, should get all inserted files");

ensureFilesInCommit("Pulling all commit from beginning, should return instants before requested compaction",
files, "100", 10);
files, "100", 10);
ensureFilesInCommit("Pulling all commit from beginning, should return instants after requested compaction",
files, "400", 10);
files, "400", 10);

}
@@ -381,7 +380,7 @@ public class TestHoodieParquetInputFormat {
totalCount++;
}
}
assertEquals(msg, expectedNumberOfRecordsInCommit, actualCount);
assertEquals(msg, totalExpected, totalCount);
assertEquals(expectedNumberOfRecordsInCommit, actualCount, msg);
assertEquals(totalExpected, totalCount, msg);
}
}
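In the test utility and test class above, the JUnit 4 TemporaryFolder calls (basePath.create(), basePath.getRoot(), basePath.newFolder(...)) are replaced by plain java.nio.file operations on the injected Path. A rough sketch of the correspondence, with hypothetical helper names and assuming basePath is provided by @TempDir:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

class TempDirEquivalents {

  // TemporaryFolder.newFolder("2016", "05", "01") roughly maps to:
  static File newFolder(Path basePath) throws IOException {
    Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01"));
    return Files.createDirectories(partitionPath).toFile();
  }

  // new File(dir, name).createNewFile() roughly maps to:
  static Path newFile(Path dir, String name) throws IOException {
    return Files.createFile(dir.resolve(name));
  }

  // TemporaryFolder.getRoot().toString() roughly maps to:
  static String rootAsString(Path basePath) {
    return basePath.toString();
  }
}

The explicit create() and delete() calls disappear because JUnit 5 creates the @TempDir directory before each test and removes it afterwards.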

@@ -18,28 +18,28 @@

package org.apache.hudi.hadoop;

import org.apache.hudi.common.HoodieCommonTestHarness;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieCommonTestHarnessJunit5;

import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
*
*/
public class TestHoodieROTablePathFilter extends HoodieCommonTestHarness {
public class TestHoodieROTablePathFilter extends HoodieCommonTestHarnessJunit5 {

@Before
@BeforeEach
public void setUp() throws Exception {
initMetaClient();
}
@@ -61,7 +61,7 @@ public class TestHoodieROTablePathFilter extends HoodieCommonTestHarness {

HoodieROTablePathFilter pathFilter = new HoodieROTablePathFilter();
Path partitionPath = new Path("file://" + basePath + File.separator + "2017/01/01");
assertTrue("Directories should be accepted", pathFilter.accept(partitionPath));
assertTrue(pathFilter.accept(partitionPath), "Directories should be accepted");

assertTrue(
pathFilter.accept(new Path("file:///" + HoodieTestUtils.getDataFilePath(basePath, "2017/01/01", "001", "f1"))));
@@ -87,10 +87,8 @@ public class TestHoodieROTablePathFilter extends HoodieCommonTestHarness {
}

@Test
public void testNonHoodiePaths() throws IOException {
TemporaryFolder folder = new TemporaryFolder();
folder.create();
String basePath = folder.getRoot().getAbsolutePath();
public void testNonHoodiePaths(@TempDir java.nio.file.Path tempDir) throws IOException {
String basePath = tempDir.toAbsolutePath().toString();
HoodieROTablePathFilter pathFilter = new HoodieROTablePathFilter();

String path = basePath + File.separator + "nonhoodiefolder";
@@ -100,7 +98,5 @@ public class TestHoodieROTablePathFilter extends HoodieCommonTestHarness {
path = basePath + File.separator + "nonhoodiefolder/somefile";
new File(path).createNewFile();
assertTrue(pathFilter.accept(new Path("file:///" + path)));

folder.delete();
}
}
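Besides field injection, @TempDir can be injected directly as a test method parameter, which is what testNonHoodiePaths above switches to. A minimal sketch (class and method names are illustrative, not part of this commit):

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.nio.file.Path;

import static org.junit.jupiter.api.Assertions.assertTrue;

class TempDirParameterExample {

  @Test
  void usesPerTestTempDir(@TempDir Path tempDir) {
    // JUnit 5 creates the directory before the test and deletes it afterwards,
    // so there is no folder.create() or folder.delete() to call.
    assertTrue(tempDir.toFile().isDirectory());
  }
}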

@@ -18,19 +18,15 @@

package org.apache.hudi.hadoop.realtime;

import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_MAP_WORK;
import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_MAPPER_CLASS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.HoodieCommonTestHarness;
import org.apache.hudi.common.minicluster.MiniClusterUtil;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.testutils.HoodieCommonTestHarnessJunit5;
import org.apache.hudi.common.util.SchemaTestUtil;
import org.apache.hudi.hadoop.InputFormatTestUtil;
import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
@@ -47,60 +43,60 @@ import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.util.LinkedHashMap;

public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_MAP_WORK;
import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_MAPPER_CLASS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarnessJunit5 {

@Rule
public TemporaryFolder basePath = new TemporaryFolder();
private JobConf jobConf;
private FileSystem fs;
private Configuration hadoopConf;

@BeforeClass
@BeforeAll
public static void setUpClass() throws IOException, InterruptedException {
// Append is not supported in LocalFileSystem. HDFS needs to be setup.
MiniClusterUtil.setUp();
}

@AfterClass
@AfterAll
public static void tearDownClass() {
MiniClusterUtil.shutdown();
}

@Before
@BeforeEach
public void setUp() throws IOException, InterruptedException {
this.fs = MiniClusterUtil.fileSystem;
jobConf = new JobConf();
hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
assertTrue(fs.mkdirs(new Path(folder.getRoot().getPath())));
HoodieTestUtils.init(MiniClusterUtil.configuration, basePath.getRoot().getPath(), HoodieTableType.MERGE_ON_READ);
HoodieTestUtils.init(MiniClusterUtil.configuration, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
}

@Test
@Ignore
@Disabled
public void testHoodieRealtimeCombineHoodieInputFormat() throws Exception {

Configuration conf = new Configuration();
// initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils.init(hadoopConf, basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
String commitTime = "100";
final int numRecords = 1000;
// Create 3 parquet files with 1000 records each
File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 3, numRecords, commitTime);
InputFormatTestUtil.commit(basePath, commitTime);
File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime);
InputFormatTestUtil.commit(tempDir, commitTime);

// insert 1000 update records to log file 0
String newCommitTime = "101";
@@ -124,10 +120,10 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
tblDesc.setInputFileFormatClass(HoodieCombineHiveInputFormat.class);
PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
pt.put(new Path(basePath.getRoot().getAbsolutePath()), partDesc);
pt.put(new Path(tempDir.toAbsolutePath().toString()), partDesc);
MapredWork mrwork = new MapredWork();
mrwork.getMapWork().setPathToPartitionInfo(pt);
Path mapWorkPath = new Path(basePath.getRoot().getAbsolutePath());
Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
jobConf = new JobConf(conf);
// Add the paths
@@ -143,7 +139,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
InputFormatTestUtil.setPropsForInputFormat(jobConf, schema, tripsHiveColumnTypes);
InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1);
// Since the SPLIT_SIZE is 3, we should create only 1 split with all 3 file groups
assertEquals(splits.length, 1);
assertEquals(1, splits.length);
RecordReader<NullWritable, ArrayWritable> recordReader =
combineHiveInputFormat.getRecordReader(splits[0], jobConf, null);
NullWritable nullWritable = recordReader.createKey();

@@ -52,11 +52,9 @@ import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;
@@ -68,29 +66,29 @@ import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.REALTIME_SKIP_MERGE_PROP;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestHoodieRealtimeRecordReader {

private static final String PARTITION_COLUMN = "datestr";
@Rule
public TemporaryFolder basePath = new TemporaryFolder();
private JobConf jobConf;
private FileSystem fs;
private Configuration hadoopConf;

@Before
@BeforeEach
public void setUp() {
jobConf = new JobConf();
jobConf.set(AbstractRealtimeRecordReader.MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1024 * 1024));
hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
fs = FSUtils.getFs(basePath.getRoot().getAbsolutePath(), hadoopConf);
fs = FSUtils.getFs(basePath.toString(), hadoopConf);
}

private Writer writeLogFile(File partitionDir, Schema schema, String fileId, String baseCommit, String
newCommit,
@TempDir
public java.nio.file.Path basePath;

private Writer writeLogFile(File partitionDir, Schema schema, String fileId, String baseCommit, String newCommit,
int numberOfRecords) throws InterruptedException, IOException {
return InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, fileId, baseCommit, newCommit,
numberOfRecords, 0,
@@ -125,7 +123,7 @@ public class TestHoodieRealtimeRecordReader {
private void testReader(boolean partitioned) throws Exception {
// initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils.init(hadoopConf, basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
String baseInstant = "100";
File partitionDir = partitioned ? InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, 100, baseInstant)
: InputFormatTestUtil.prepareNonPartitionedParquetTable(basePath, schema, 1, 100, baseInstant);
@@ -139,7 +137,7 @@ public class TestHoodieRealtimeRecordReader {
// TODO: HUDI-154 Once Hive 2.x PR (PR-674) is merged, enable this change
// logVersionsWithAction.add(Pair.of(HoodieTimeline.ROLLBACK_ACTION, 3));
FileSlice fileSlice =
new FileSlice(partitioned ? FSUtils.getRelativePartitionPath(new Path(basePath.getRoot().getAbsolutePath()),
new FileSlice(partitioned ? FSUtils.getRelativePartitionPath(new Path(basePath.toString()),
new Path(partitionDir.getAbsolutePath())) : "default", baseInstant, "fileid0");
logVersionsWithAction.forEach(logVersionWithAction -> {
try {
@@ -163,13 +161,13 @@ public class TestHoodieRealtimeRecordReader {
}
long size = writer.getCurrentSize();
writer.close();
assertTrue("block - size should be > 0", size > 0);
assertTrue(size > 0, "block - size should be > 0");

// create a split with baseFile (parquet file written earlier) and new log file(s)
fileSlice.addLogFile(writer.getLogFile());
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant + ".parquet"), 0, 1, jobConf),
basePath.getRoot().getPath(), fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
basePath.toString(), fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
.map(h -> h.getPath().toString()).collect(Collectors.toList()),
instantTime);

@@ -210,7 +208,7 @@ public class TestHoodieRealtimeRecordReader {
public void testUnMergedReader() throws Exception {
// initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils.init(hadoopConf, basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
String instantTime = "100";
final int numRecords = 1000;
final int firstBatchLastRecordKey = numRecords - 1;
@@ -227,13 +225,13 @@ public class TestHoodieRealtimeRecordReader {
numRecords, numRecords, 0);
long size = writer.getCurrentSize();
writer.close();
assertTrue("block - size should be > 0", size > 0);
assertTrue(size > 0, "block - size should be > 0");

// create a split with baseFile (parquet file written earlier) and new log file(s)
String logFilePath = writer.getLogFile().getPath().toString();
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, jobConf),
basePath.getRoot().getPath(), Collections.singletonList(logFilePath), newCommitTime);
basePath.toString(), Collections.singletonList(logFilePath), newCommitTime);

// create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(
@@ -262,17 +260,17 @@ public class TestHoodieRealtimeRecordReader {
int gotKey = Integer.parseInt(keyStr.substring("key".length()));
if (gotCommit.equals(newCommitTime)) {
numRecordsAtCommit2++;
Assert.assertTrue(gotKey > firstBatchLastRecordKey);
Assert.assertTrue(gotKey <= secondBatchLastRecordKey);
assertTrue(gotKey > firstBatchLastRecordKey);
assertTrue(gotKey <= secondBatchLastRecordKey);
assertEquals(gotKey, lastSeenKeyFromLog + 1);
lastSeenKeyFromLog++;
} else {
numRecordsAtCommit1++;
Assert.assertTrue(gotKey >= 0);
Assert.assertTrue(gotKey <= firstBatchLastRecordKey);
assertTrue(gotKey >= 0);
assertTrue(gotKey <= firstBatchLastRecordKey);
}
// Ensure unique key
Assert.assertFalse(seenKeys.contains(gotKey));
assertFalse(seenKeys.contains(gotKey));
seenKeys.add(gotKey);
key = recordReader.createKey();
value = recordReader.createValue();
@@ -288,7 +286,7 @@ public class TestHoodieRealtimeRecordReader {
public void testReaderWithNestedAndComplexSchema() throws Exception {
// initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getComplexEvolvedSchema());
HoodieTestUtils.init(hadoopConf, basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
String instantTime = "100";
int numberOfRecords = 100;
int numberOfLogRecords = numberOfRecords / 2;
@@ -303,14 +301,14 @@ public class TestHoodieRealtimeRecordReader {
writeLogFile(partitionDir, schema, "fileid0", instantTime, newCommitTime, numberOfLogRecords);
long size = writer.getCurrentSize();
writer.close();
assertTrue("block - size should be > 0", size > 0);
assertTrue(size > 0, "block - size should be > 0");
InputFormatTestUtil.deltaCommit(basePath, newCommitTime);

// create a split with baseFile (parquet file written earlier) and new log file(s)
String logFilePath = writer.getLogFile().getPath().toString();
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, jobConf),
basePath.getRoot().getPath(), Collections.singletonList(logFilePath), newCommitTime);
basePath.toString(), Collections.singletonList(logFilePath), newCommitTime);

// create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(
@@ -345,66 +343,69 @@ public class TestHoodieRealtimeRecordReader {
value = recordReader.createValue();

// Assert type STRING
assertEquals("test value for field: field1", values[5].toString(), "field" + currentRecordNo);
assertEquals("test value for field: field2", values[6].toString(),
"field" + currentRecordNo + recordCommitTimeSuffix);
assertEquals("test value for field: name", values[7].toString(), "name" + currentRecordNo);
assertEquals(values[5].toString(), "field" + currentRecordNo, "test value for field: field1");
assertEquals(values[6].toString(), "field" + currentRecordNo + recordCommitTimeSuffix,
"test value for field: field2");
assertEquals(values[7].toString(), "name" + currentRecordNo,
"test value for field: name");

// Assert type INT
IntWritable intWritable = (IntWritable) values[8];
assertEquals("test value for field: favoriteIntNumber", intWritable.get(),
currentRecordNo + recordCommitTime.hashCode());
assertEquals(intWritable.get(), currentRecordNo + recordCommitTime.hashCode(),
"test value for field: favoriteIntNumber");

// Assert type LONG
LongWritable longWritable = (LongWritable) values[9];
assertEquals("test value for field: favoriteNumber", longWritable.get(),
currentRecordNo + recordCommitTime.hashCode());
assertEquals(longWritable.get(), currentRecordNo + recordCommitTime.hashCode(),
"test value for field: favoriteNumber");

// Assert type FLOAT
FloatWritable floatWritable = (FloatWritable) values[10];
assertEquals("test value for field: favoriteFloatNumber", floatWritable.get(),
(float) ((currentRecordNo + recordCommitTime.hashCode()) / 1024.0), 0);
assertEquals(floatWritable.get(), (float) ((currentRecordNo + recordCommitTime.hashCode()) / 1024.0), 0,
"test value for field: favoriteFloatNumber");

// Assert type DOUBLE
DoubleWritable doubleWritable = (DoubleWritable) values[11];
assertEquals("test value for field: favoriteDoubleNumber", doubleWritable.get(),
(currentRecordNo + recordCommitTime.hashCode()) / 1024.0, 0);
assertEquals(doubleWritable.get(), (currentRecordNo + recordCommitTime.hashCode()) / 1024.0, 0,
"test value for field: favoriteDoubleNumber");
// Assert type MAP
ArrayWritable mapItem = (ArrayWritable) values[12];
Writable mapItemValue1 = mapItem.get()[0];
Writable mapItemValue2 = mapItem.get()[1];

assertEquals("test value for field: tags", ((ArrayWritable) mapItemValue1).get()[0].toString(),
"mapItem1");
assertEquals("test value for field: tags", ((ArrayWritable) mapItemValue2).get()[0].toString(),
"mapItem2");
assertEquals("test value for field: tags", ((ArrayWritable) mapItemValue1).get().length, 2);
assertEquals("test value for field: tags", ((ArrayWritable) mapItemValue2).get().length, 2);
assertEquals(((ArrayWritable) mapItemValue1).get()[0].toString(), "mapItem1",
"test value for field: tags");
assertEquals(((ArrayWritable) mapItemValue2).get()[0].toString(), "mapItem2",
"test value for field: tags");
assertEquals(((ArrayWritable) mapItemValue1).get().length, 2,
"test value for field: tags");
assertEquals(((ArrayWritable) mapItemValue2).get().length, 2,
"test value for field: tags");
Writable mapItemValue1value = ((ArrayWritable) mapItemValue1).get()[1];
Writable mapItemValue2value = ((ArrayWritable) mapItemValue2).get()[1];
assertEquals("test value for field: tags[\"mapItem1\"].item1",
((ArrayWritable) mapItemValue1value).get()[0].toString(), "item" + currentRecordNo);
assertEquals("test value for field: tags[\"mapItem2\"].item1",
((ArrayWritable) mapItemValue2value).get()[0].toString(), "item2" + currentRecordNo);
assertEquals("test value for field: tags[\"mapItem1\"].item2",
((ArrayWritable) mapItemValue1value).get()[1].toString(), "item" + currentRecordNo + recordCommitTimeSuffix);
assertEquals("test value for field: tags[\"mapItem2\"].item2",
((ArrayWritable) mapItemValue2value).get()[1].toString(), "item2" + currentRecordNo + recordCommitTimeSuffix);
assertEquals(((ArrayWritable) mapItemValue1value).get()[0].toString(), "item" + currentRecordNo,
"test value for field: tags[\"mapItem1\"].item1");
assertEquals(((ArrayWritable) mapItemValue2value).get()[0].toString(), "item2" + currentRecordNo,
"test value for field: tags[\"mapItem2\"].item1");
assertEquals(((ArrayWritable) mapItemValue1value).get()[1].toString(), "item" + currentRecordNo + recordCommitTimeSuffix,
"test value for field: tags[\"mapItem1\"].item2");
assertEquals(((ArrayWritable) mapItemValue2value).get()[1].toString(), "item2" + currentRecordNo + recordCommitTimeSuffix,
"test value for field: tags[\"mapItem2\"].item2");

// Assert type RECORD
ArrayWritable recordItem = (ArrayWritable) values[13];
Writable[] nestedRecord = recordItem.get();
assertFalse("test value for field: testNestedRecord.isAdmin", ((BooleanWritable) nestedRecord[0]).get());
assertEquals("test value for field: testNestedRecord.userId", nestedRecord[1].toString(),
"UserId" + currentRecordNo + recordCommitTimeSuffix);
assertFalse(((BooleanWritable) nestedRecord[0]).get(), "test value for field: testNestedRecord.isAdmin");
assertEquals(nestedRecord[1].toString(), "UserId" + currentRecordNo + recordCommitTimeSuffix,
"test value for field: testNestedRecord.userId");

// Assert type ARRAY
ArrayWritable arrayValue = (ArrayWritable) values[14];
Writable[] arrayValues = arrayValue.get();
for (int i = 0; i < arrayValues.length; i++) {
assertEquals("test value for field: stringArray", "stringArray" + i + recordCommitTimeSuffix,
arrayValues[i].toString());
assertEquals("stringArray" + i + recordCommitTimeSuffix, arrayValues[i].toString(),
"test value for field: stringArray");
}
}
}
@@ -414,7 +415,7 @@ public class TestHoodieRealtimeRecordReader {
// initial commit
List<String> logFilePaths = new ArrayList<>();
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
HoodieTestUtils.init(hadoopConf, basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
String instantTime = "100";
int numberOfRecords = 100;
int numberOfLogRecords = numberOfRecords / 2;
@@ -434,7 +435,7 @@ public class TestHoodieRealtimeRecordReader {
long size = writer.getCurrentSize();
logFilePaths.add(writer.getLogFile().getPath().toString());
writer.close();
assertTrue("block - size should be > 0", size > 0);
assertTrue(size > 0, "block - size should be > 0");

// write rollback for the previous block in new log file version
newCommitTime = "102";
@@ -447,7 +448,7 @@ public class TestHoodieRealtimeRecordReader {
// create a split with baseFile (parquet file written earlier) and new log file(s)
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
new FileSplit(new Path(partitionDir + "/fileid0_1_" + instantTime + ".parquet"), 0, 1, jobConf),
basePath.getRoot().getPath(), logFilePaths, newCommitTime);
basePath.toString(), logFilePaths, newCommitTime);

// create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(