[HUDI-811] Restructure test packages (#1607)

* restructure hudi-spark tests
* restructure hudi-timeline-service tests
* restructure hudi-hadoop-mr hudi-utilities tests
* restructure hudi-hive-sync tests
Raymond Xu
2020-05-13 15:37:03 -07:00
committed by GitHub
parent 32bada29dc
commit 0d4848b68b
36 changed files with 106 additions and 80 deletions
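Every hunk below applies one of the same few mechanical changes: shared test helpers move into dedicated testutils subpackages (org.apache.hudi.hadoop.testutils, org.apache.hudi.hive.testutils, org.apache.hudi.testutils, org.apache.hudi.utilities.testutils), functional tests move under functional subpackages, test classes are renamed from the *Test suffix to the Test* prefix, and members that were package-private are widened to public or protected so the relocated classes can still reach them. A minimal sketch of the resulting layout (the class name and body are illustrative, not taken from the diff):

// Illustrative only: shape of a utilities test after this commit.
package org.apache.hudi.utilities.functional;

// The harness moved from org.apache.hudi.utilities into the testutils subpackage,
// so it now needs an explicit import and public/protected hooks.
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;

public class TestExample extends UtilitiesTestBase {
  // initClass(boolean) is widened to public in this commit, which is what
  // makes cross-package subclassing like this work.
}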

View File

@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileStatus;

View File

@@ -42,7 +42,7 @@ import java.util.stream.Collectors;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-public class InputPathHandlerTest {
+public class TestInputPathHandler {
   // Incremental Table
   public static final String RAW_TRIPS_TEST_NAME = "raw_trips";

View File

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
-package org.apache.hudi.hadoop.realtime;
+package org.apache.hudi.hadoop.hive;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.minicluster.MiniClusterUtil;
@@ -25,8 +25,7 @@ import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.util.SchemaTestUtil;
-import org.apache.hudi.hadoop.InputFormatTestUtil;
-import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
+import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;

View File

@@ -30,7 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.SchemaTestUtil;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.InputFormatTestUtil;
+import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;

View File

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
-package org.apache.hudi.hadoop;
+package org.apache.hudi.hadoop.testutils;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.hudi.hadoop.HoodieHiveUtil;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SchemaTestUtil;
 import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
 import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType;
+import org.apache.hudi.hive.testutils.TestUtil;
 import org.apache.hudi.hive.util.HiveSchemaUtil;
 import org.apache.hadoop.hive.metastore.api.Partition;

View File

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
-package org.apache.hudi.hive.util;
+package org.apache.hudi.hive.testutils;
 
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.util.FileIOUtils;

View File

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
-package org.apache.hudi.hive;
+package org.apache.hudi.hive.testutils;
 
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
@@ -41,7 +41,9 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.SchemaTestUtil;
-import org.apache.hudi.hive.util.HiveTestService;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.HiveSyncTool;
+import org.apache.hudi.hive.HoodieHiveClient;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
@@ -85,9 +87,9 @@ public class TestUtil {
   private static HiveServer2 hiveServer;
   private static HiveTestService hiveTestService;
   private static Configuration configuration;
-  static HiveSyncConfig hiveSyncConfig;
+  public static HiveSyncConfig hiveSyncConfig;
   private static DateTimeFormatter dtfOut;
-  static FileSystem fileSystem;
+  public static FileSystem fileSystem;
   private static Set<String> createdTablesSet = new HashSet<>();
 
   public static void setUp() throws IOException, InterruptedException {
@@ -122,7 +124,7 @@ public class TestUtil {
     clear();
   }
 
-  static void clear() throws IOException {
+  public static void clear() throws IOException {
     fileSystem.delete(new Path(hiveSyncConfig.basePath), true);
     HoodieTableMetaClient.initTableType(configuration, hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE,
         hiveSyncConfig.tableName, HoodieAvroPayload.class.getName());
@@ -136,7 +138,7 @@ public class TestUtil {
     client.updateHiveSQL("create database " + hiveSyncConfig.databaseName);
   }
 
-  static HiveConf getHiveConf() {
+  public static HiveConf getHiveConf() {
     return hiveServer.getHiveConf();
   }
@@ -155,7 +157,7 @@ public class TestUtil {
     }
   }
 
-  static void createCOWTable(String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata)
+  public static void createCOWTable(String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata)
       throws IOException, URISyntaxException {
     Path path = new Path(hiveSyncConfig.basePath);
     FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
@@ -170,7 +172,7 @@ public class TestUtil {
     createCommitFile(commitMetadata, instantTime);
   }
 
-  static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions,
+  public static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions,
       boolean createDeltaCommit, boolean useSchemaFromCommitMetadata)
       throws IOException, URISyntaxException, InterruptedException {
     Path path = new Path(hiveSyncConfig.basePath);
@@ -200,7 +202,7 @@ public class TestUtil {
     }
   }
 
-  static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
+  public static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
      boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
    HoodieCommitMetadata commitMetadata =
        createPartitions(numberOfPartitions, isParquetSchemaSimple, useSchemaFromCommitMetadata, startFrom, instantTime);
@@ -208,7 +210,7 @@ public class TestUtil {
     createCommitFile(commitMetadata, instantTime);
   }
 
-  static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
+  public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
      boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime)
      throws IOException, URISyntaxException, InterruptedException {
    HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, isParquetSchemaSimple,

View File

@@ -28,6 +28,7 @@ import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hudi.hive.NonPartitionedExtractor;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.testutils.DataSourceTestUtils;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
+import org.apache.hudi.testutils.DataSourceTestUtils;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;

View File

@@ -16,13 +16,14 @@
  * limitations under the License.
  */
-import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.DataSourceWriteOptions;
+package org.apache.hudi;
+
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.DataSourceTestUtils;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -52,7 +53,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
-public class DataSourceUtilsTest {
+public class TestDataSourceUtils {
 
   @Mock
   private HoodieWriteClient hoodieWriteClient;

View File

@@ -16,12 +16,15 @@
  * limitations under the License.
  */
+package org.apache.hudi.testutils;
+
 import org.apache.hudi.common.TestRawTripPayload;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
 import org.apache.spark.api.java.JavaRDD;
 
 import java.io.IOException;

View File

@@ -15,8 +15,9 @@
  * limitations under the License.
  */
+package org.apache.hudi
+
 import org.apache.avro.generic.GenericRecord
-import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.model.{EmptyHoodieRecordPayload, OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.util.{Option, SchemaTestUtil}

View File

@@ -15,12 +15,13 @@
  * limitations under the License.
  */
-package org.apache.hudi
+package org.apache.hudi.functional
 
 import java.util.{Date, UUID}
 
 import org.apache.commons.io.FileUtils
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.spark.sql.{SaveMode, SparkSession}

View File

@@ -15,10 +15,13 @@
  * limitations under the License.
  */
+package org.apache.hudi.functional
+
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.common.HoodieTestDataGenerator
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.DataSourceTestUtils
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.col

View File

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
-package org.apache.hudi.timeline.table.view;
+package org.apache.hudi.timeline.service.functional;
 
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;

View File

@@ -82,8 +82,8 @@ public class HoodieSnapshotExporter {
   public static class OutputFormatValidator implements IValueValidator<String> {
 
-    static final String HUDI = "hudi";
-    static final List<String> FORMATS = CollectionUtils.createImmutableList("json", "parquet", HUDI);
+    public static final String HUDI = "hudi";
+    public static final List<String> FORMATS = CollectionUtils.createImmutableList("json", "parquet", HUDI);
 
     @Override
     public void validate(String name, String value) {
@@ -97,20 +97,20 @@ public class HoodieSnapshotExporter {
   public static class Config implements Serializable {
 
     @Parameter(names = {"--source-base-path"}, description = "Base path for the source Hudi dataset to be snapshotted", required = true)
-    String sourceBasePath;
+    public String sourceBasePath;
 
     @Parameter(names = {"--target-output-path"}, description = "Base path for the target output files (snapshots)", required = true)
-    String targetOutputPath;
+    public String targetOutputPath;
 
     @Parameter(names = {"--output-format"}, description = "Output format for the exported dataset; accept these values: json|parquet|hudi", required = true,
         validateValueWith = OutputFormatValidator.class)
-    String outputFormat;
+    public String outputFormat;
 
     @Parameter(names = {"--output-partition-field"}, description = "A field to be used by Spark repartitioning")
-    String outputPartitionField = null;
+    public String outputPartitionField = null;
 
     @Parameter(names = {"--output-partitioner"}, description = "A class to facilitate custom repartitioning")
-    String outputPartitioner = null;
+    public String outputPartitioner = null;
   }
 
   public void export(JavaSparkContext jsc, Config cfg) throws IOException {
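The widened visibility above is presumably what lets the exporter test, relocated to org.apache.hudi.utilities.functional later in this diff, configure and drive HoodieSnapshotExporter directly instead of going through the CLI. A hedged usage sketch (the paths and the jsc variable are assumed, not taken from the diff):

// Assumed usage from a test in another package; jsc is an existing JavaSparkContext.
HoodieSnapshotExporter.Config cfg = new HoodieSnapshotExporter.Config();
cfg.sourceBasePath = "/tmp/hudi/source";   // fields are public after this commit
cfg.targetOutputPath = "/tmp/hudi/target";
cfg.outputFormat = HoodieSnapshotExporter.OutputFormatValidator.HUDI;
new HoodieSnapshotExporter().export(jsc, cfg);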

View File

@@ -16,11 +16,9 @@
  * limitations under the License.
  */
-package org.apache.hudi.utilities;
+package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
-import org.apache.hudi.utilities.deltastreamer.SchedulerConfGenerator;
 import org.junit.jupiter.api.Test;

View File

@@ -16,9 +16,10 @@
  * limitations under the License.
  */
-package org.apache.hudi.utilities;
+package org.apache.hudi.utilities.functional;
 
 import org.apache.hudi.payload.AWSDmsAvroPayload;
+import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.transform.AWSDmsTransformer;
 import org.apache.avro.Schema;

View File

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
-package org.apache.hudi.utilities;
+package org.apache.hudi.utilities.functional;
 
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.HoodieWriteClient;
@@ -26,6 +26,7 @@ import org.apache.hudi.common.minicluster.HdfsTestService;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.utilities.HDFSParquetImporter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FSDataOutputStream;

View File

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
-package org.apache.hudi.utilities;
+package org.apache.hudi.utilities.functional;
 
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.HoodieTestDataGenerator;
@@ -43,12 +43,13 @@ import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.CsvDFSSource;
-import org.apache.hudi.utilities.sources.DistributedTestDataSource;
 import org.apache.hudi.utilities.sources.HoodieIncrSource;
 import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.ParquetDFSSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
-import org.apache.hudi.utilities.sources.config.TestSourceConfig;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource;
+import org.apache.hudi.utilities.testutils.sources.config.TestSourceConfig;
 import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
@@ -389,7 +390,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
         new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getConfig();
     assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism"));
     assertEquals("_row_key", props.getString("hoodie.datasource.write.recordkey.field"));
-    assertEquals("org.apache.hudi.utilities.TestHoodieDeltaStreamer$TestGenerator",
+    assertEquals("org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestGenerator",
         props.getString("hoodie.datasource.write.keygenerator.class"));
   }

View File

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
-package org.apache.hudi.utilities;
+package org.apache.hudi.utilities.functional;
 
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.HoodieTestDataGenerator;

View File

@@ -16,12 +16,13 @@
  * limitations under the License.
  */
-package org.apache.hudi.utilities;
+package org.apache.hudi.utilities.functional;
 
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.utilities.HoodieSnapshotCopier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;

View File

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
-package org.apache.hudi.utilities;
+package org.apache.hudi.utilities.functional;
 
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.common.HoodieClientTestHarness;
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.utilities.HoodieSnapshotExporter;
 import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;
 import org.apache.hudi.utilities.HoodieSnapshotExporter.OutputFormatValidator;
 import org.apache.hudi.utilities.HoodieSnapshotExporter.Partitioner;

View File

@@ -16,11 +16,13 @@
  * limitations under the License.
  */
-package org.apache.hudi.utilities;
+package org.apache.hudi.utilities.functional;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.schema.JdbcbasedSchemaProvider;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.avro.Schema;
 import org.apache.log4j.LogManager;

View File

@@ -16,13 +16,12 @@
  * limitations under the License.
  */
-package org.apache.hudi.utilities;
+package org.apache.hudi.utilities.keygen;
 
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.util.SchemaTestUtil;
-import org.apache.hudi.utilities.keygen.TimestampBasedKeyGenerator;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;

View File

@@ -20,8 +20,9 @@ package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.utilities.UtilitiesTestBase;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.apache.hudi.utilities.testutils.sources.AbstractDFSSourceTestBase;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.BeforeEach;
@@ -45,7 +46,7 @@ public class TestCsvDFSSource extends AbstractDFSSourceTestBase {
   }
 
   @Override
-  Source prepareDFSSource() {
+  public Source prepareDFSSource() {
     TypedProperties props = new TypedProperties();
     props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
     props.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(true));
@@ -54,7 +55,7 @@ public class TestCsvDFSSource extends AbstractDFSSourceTestBase {
   }
 
   @Override
-  void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
+  public void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
     UtilitiesTestBase.Helpers.saveCsvToDFS(
         true, '\t', Helpers.jsonifyRecords(records), dfs, path.toString());
   }

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.log4j.LogManager;

View File

@@ -20,7 +20,8 @@ package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.utilities.UtilitiesTestBase;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.apache.hudi.utilities.testutils.sources.AbstractDFSSourceTestBase;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.BeforeEach;
@@ -41,14 +42,14 @@ public class TestJsonDFSSource extends AbstractDFSSourceTestBase {
   }
 
   @Override
-  Source prepareDFSSource() {
+  public Source prepareDFSSource() {
     TypedProperties props = new TypedProperties();
     props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
     return new JsonDFSSource(props, jsc, sparkSession, schemaProvider);
   }
 
   @Override
-  void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
+  public void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
     UtilitiesTestBase.Helpers.saveStringsToDFS(
         Helpers.jsonifyRecords(records), dfs, path.toString());
   }

View File

@@ -22,11 +22,11 @@ import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.utilities.UtilitiesTestBase;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerConfig;

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.utilities.testutils.sources.AbstractDFSSourceTestBase;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.BeforeEach;
@@ -40,14 +41,14 @@ public class TestParquetDFSSource extends AbstractDFSSourceTestBase {
   }
 
   @Override
-  Source prepareDFSSource() {
+  public Source prepareDFSSource() {
     TypedProperties props = new TypedProperties();
     props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
     return new ParquetDFSSource(props, jsc, sparkSession, schemaProvider);
   }
 
   @Override
-  void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
+  public void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
     Helpers.saveParquetToDFS(Helpers.toGenericRecords(records), path);
   }
 }

View File

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
-package org.apache.hudi.utilities;
+package org.apache.hudi.utilities.testutils;
 
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.TestRawTripPayload;
@@ -31,7 +31,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveClient;
-import org.apache.hudi.hive.util.HiveTestService;
+import org.apache.hudi.hive.testutils.HiveTestService;
+import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.sources.TestDataSource;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -91,7 +92,7 @@ public class UtilitiesTestBase {
     initClass(false);
   }
 
-  static void initClass(boolean startHiveService) throws Exception {
+  public static void initClass(boolean startHiveService) throws Exception {
     hdfsTestService = new HdfsTestService();
     dfsCluster = hdfsTestService.start(true);
     dfs = dfsCluster.getFileSystem();

View File

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
-package org.apache.hudi.utilities.sources;
+package org.apache.hudi.utilities.testutils.sources;
 
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.config.TypedProperties;
@@ -25,7 +25,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.RocksDBBasedMap;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.sources.config.TestSourceConfig;
+import org.apache.hudi.utilities.sources.AvroSource;
+import org.apache.hudi.utilities.testutils.sources.config.TestSourceConfig;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
@@ -44,7 +45,7 @@ public abstract class AbstractBaseTestSource extends AvroSource {
   private static final Logger LOG = LogManager.getLogger(AbstractBaseTestSource.class);
 
-  static final int DEFAULT_PARTITION_NUM = 0;
+  public static final int DEFAULT_PARTITION_NUM = 0;
 
   // Static instance, helps with reuse across a test.
   protected static transient Map<Integer, HoodieTestDataGenerator> dataGeneratorMap = new HashMap<>();

View File

@@ -16,15 +16,17 @@
  * limitations under the License.
  */
-package org.apache.hudi.utilities.sources;
+package org.apache.hudi.utilities.testutils.sources;
 
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.utilities.UtilitiesTestBase;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.Source;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FileStatus;
@@ -52,11 +54,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase {
 
-  FilebasedSchemaProvider schemaProvider;
-  String dfsRoot;
-  String fileSuffix;
-  HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-  boolean useFlattenedSchema = false;
+  protected FilebasedSchemaProvider schemaProvider;
+  protected String dfsRoot;
+  protected String fileSuffix;
+  protected HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+  protected boolean useFlattenedSchema = false;
 
   @BeforeAll
   public static void initClass() throws Exception {
@@ -84,16 +86,15 @@ public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase {
   *
   * @return A {@link Source} using DFS as the file system.
   */
-  abstract Source prepareDFSSource();
+  protected abstract Source prepareDFSSource();
 
  /**
   * Writes test data, i.e., a {@link List} of {@link HoodieRecord}, to a file on DFS.
   *
   * @param records Test data.
   * @param path The path in {@link Path} of the file to write.
-   * @throws IOException
   */
-  abstract void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException;
+  protected abstract void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException;
 
  /**
   * Generates a batch of test data and writes the data to a file.
@@ -102,9 +103,8 @@ public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase {
   * @param instantTime The commit time.
   * @param n The number of records to generate.
   * @return The file path.
-   * @throws IOException
   */
-  Path generateOneFile(String filename, String instantTime, int n) throws IOException {
+  protected Path generateOneFile(String filename, String instantTime, int n) throws IOException {
    Path path = new Path(dfsRoot, filename + fileSuffix);
    writeNewDataToFile(dataGenerator.generateInserts(instantTime, n, useFlattenedSchema), path);
    return path;

View File

@@ -16,12 +16,13 @@
  * limitations under the License.
  */
-package org.apache.hudi.utilities.sources;
+package org.apache.hudi.utilities.testutils.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.sources.config.TestSourceConfig;
+import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.testutils.sources.config.TestSourceConfig;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.log4j.LogManager;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;

View File

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
-package org.apache.hudi.utilities.sources.config;
+package org.apache.hudi.utilities.testutils.sources.config;
 
 /**
  * Configurations for Test Data Sources.