1
0

HUDI-123 Rename code packages/constants to org.apache.hudi (#830)

- Rename com.uber.hoodie to org.apache.hudi
- Flag to pass com.uber.hoodie Input formats for hoodie-sync
- Works with HUDI demo. 
- Also tested for backwards compatibility with datasets built by com.uber.hoodie packages
- Migration guide : https://cwiki.apache.org/confluence/display/HUDI/Migration+Guide+From+com.uber.hoodie+to+org.apache.hudi
This commit is contained in:
Balaji Varadarajan
2019-08-11 17:48:17 -07:00
committed by vinoth chandar
parent 722b6be04a
commit a4f9d7575f
546 changed files with 3858 additions and 3562 deletions

View File

@@ -0,0 +1,35 @@
package org.apache.hudi.utilities;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.util.Map;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.SchedulerConfGenerator;
import org.junit.Test;
public class SchedulerConfGeneratorTest {
@Test
public void testGenerateSparkSchedulingConf() throws Exception {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
Map<String, String> configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
assertNull("spark.scheduler.mode not set", configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY));
System.setProperty(SchedulerConfGenerator.SPARK_SCHEDULER_MODE_KEY, "FAIR");
cfg.continuousMode = false;
configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
assertNull("continuousMode is false", configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY));
cfg.continuousMode = true;
cfg.storageType = HoodieTableType.COPY_ON_WRITE.name();
configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
assertNull("storageType is not MERGE_ON_READ", configs.get(
SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY));
cfg.storageType = HoodieTableType.MERGE_ON_READ.name();
configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
assertNotNull("all satisfies", configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY));
}
}

View File

@@ -0,0 +1,287 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.Serializable;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hudi.HoodieReadClient;
import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.minicluster.HdfsTestService;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestHDFSParquetImporter implements Serializable {
private static String dfsBasePath;
private static HdfsTestService hdfsTestService;
private static MiniDFSCluster dfsCluster;
private static DistributedFileSystem dfs;
@BeforeClass
public static void initClass() throws Exception {
hdfsTestService = new HdfsTestService();
dfsCluster = hdfsTestService.start(true);
// Create a temp folder as the base path
dfs = dfsCluster.getFileSystem();
dfsBasePath = dfs.getWorkingDirectory().toString();
dfs.mkdirs(new Path(dfsBasePath));
}
@AfterClass
public static void cleanupClass() throws Exception {
if (hdfsTestService != null) {
hdfsTestService.stop();
}
}
/**
* Test successful data import with retries.
*/
@Test
public void testDatasetImportWithRetries() throws Exception {
JavaSparkContext jsc = null;
try {
jsc = getJavaSparkContext();
// Test root folder.
String basePath = (new Path(dfsBasePath,
Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
// Hoodie root folder
Path hoodieFolder = new Path(basePath, "testTarget");
// Create schema file.
String schemaFile = new Path(basePath, "file.schema").toString();
//Create generic records.
Path srcFolder = new Path(basePath, "testSrc");
createRecords(srcFolder);
HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(),
hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1,
schemaFile);
AtomicInteger retry = new AtomicInteger(3);
AtomicInteger fileCreated = new AtomicInteger(0);
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg) {
@Override
protected int dataImport(JavaSparkContext jsc) throws IOException {
int ret = super.dataImport(jsc);
if (retry.decrementAndGet() == 0) {
fileCreated.incrementAndGet();
createSchemaFile(schemaFile);
}
return ret;
}
};
// Schema file is not created so this operation should fail.
assertEquals(0, dataImporter.dataImport(jsc, retry.get()));
assertEquals(retry.get(), -1);
assertEquals(fileCreated.get(), 1);
// Check if
// 1. .commit file is present
// 2. number of records in each partition == 24
// 3. total number of partitions == 4;
boolean isCommitFilePresent = false;
Map<String, Long> recordCounts = new HashMap<String, Long>();
RemoteIterator<LocatedFileStatus> hoodieFiles = dfs.listFiles(hoodieFolder, true);
while (hoodieFiles.hasNext()) {
LocatedFileStatus f = hoodieFiles.next();
isCommitFilePresent =
isCommitFilePresent || f.getPath().toString().endsWith(HoodieTimeline.COMMIT_EXTENSION);
if (f.getPath().toString().endsWith("parquet")) {
SQLContext sc = new SQLContext(jsc);
String partitionPath = f.getPath().getParent().toString();
long count = sc.read().parquet(f.getPath().toString()).count();
if (!recordCounts.containsKey(partitionPath)) {
recordCounts.put(partitionPath, 0L);
}
recordCounts.put(partitionPath, recordCounts.get(partitionPath) + count);
}
}
assertTrue("commit file is missing", isCommitFilePresent);
assertEquals("partition is missing", 4, recordCounts.size());
for (Entry<String, Long> e : recordCounts.entrySet()) {
assertEquals("missing records", 24, e.getValue().longValue());
}
} finally {
if (jsc != null) {
jsc.stop();
}
}
}
private void createRecords(Path srcFolder) throws ParseException, IOException {
Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
List<GenericRecord> records = new ArrayList<GenericRecord>();
for (long recordNum = 0; recordNum < 96; recordNum++) {
records.add(HoodieTestDataGenerator
.generateGenericRecord(Long.toString(recordNum), "rider-" + recordNum,
"driver-" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
}
ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
.withSchema(HoodieTestDataGenerator.avroSchema)
.withConf(HoodieTestUtils.getDefaultHadoopConf())
.build();
for (GenericRecord record : records) {
writer.write(record);
}
writer.close();
}
private void createSchemaFile(String schemaFile) throws IOException {
FSDataOutputStream schemaFileOS = dfs.create(new Path(schemaFile));
schemaFileOS.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA.getBytes());
schemaFileOS.close();
}
/**
* Tests for scheme file. 1. File is missing. 2. File has invalid data.
*/
@Test
public void testSchemaFile() throws Exception {
JavaSparkContext jsc = null;
try {
jsc = getJavaSparkContext();
// Test root folder.
String basePath = (new Path(dfsBasePath,
Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
// Hoodie root folder
Path hoodieFolder = new Path(basePath, "testTarget");
Path srcFolder = new Path(basePath.toString(), "srcTest");
Path schemaFile = new Path(basePath.toString(), "missingFile.schema");
HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(),
hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1,
schemaFile.toString());
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
// Should fail - return : -1.
assertEquals(-1, dataImporter.dataImport(jsc, 0));
dfs.create(schemaFile).write("Random invalid schema data".getBytes());
// Should fail - return : -1.
assertEquals(-1, dataImporter.dataImport(jsc, 0));
} finally {
if (jsc != null) {
jsc.stop();
}
}
}
/**
* Test for missing rowKey and partitionKey.
*/
@Test
public void testRowAndPartitionKey() throws Exception {
JavaSparkContext jsc = null;
try {
jsc = getJavaSparkContext();
// Test root folder.
String basePath = (new Path(dfsBasePath,
Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
// Hoodie root folder
Path hoodieFolder = new Path(basePath, "testTarget");
//Create generic records.
Path srcFolder = new Path(basePath, "testSrc");
createRecords(srcFolder);
// Create schema file.
Path schemaFile = new Path(basePath.toString(), "missingFile.schema");
createSchemaFile(schemaFile.toString());
HDFSParquetImporter dataImporter;
HDFSParquetImporter.Config cfg;
// Check for invalid row key.
cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(), "testTable",
"COPY_ON_WRITE", "invalidRowKey", "timestamp", 1, schemaFile.toString());
dataImporter = new HDFSParquetImporter(cfg);
assertEquals(-1, dataImporter.dataImport(jsc, 0));
// Check for invalid partition key.
cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(), "testTable",
"COPY_ON_WRITE", "_row_key", "invalidTimeStamp", 1, schemaFile.toString());
dataImporter = new HDFSParquetImporter(cfg);
assertEquals(-1, dataImporter.dataImport(jsc, 0));
} finally {
if (jsc != null) {
jsc.stop();
}
}
}
private HDFSParquetImporter.Config getHDFSParquetImporterConfig(String srcPath, String targetPath,
String tableName, String tableType, String rowKey, String partitionKey, int parallelism,
String schemaFile) {
HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config();
cfg.srcPath = srcPath;
cfg.targetPath = targetPath;
cfg.tableName = tableName;
cfg.tableType = tableType;
cfg.rowKey = rowKey;
cfg.partitionKey = partitionKey;
cfg.parallelism = parallelism;
cfg.schemaFile = schemaFile;
return cfg;
}
private JavaSparkContext getJavaSparkContext() {
// Initialize a local spark env
SparkConf sparkConf = new SparkConf().setAppName("TestConversionCommand").setMaster("local[1]");
sparkConf = HoodieWriteClient.registerClasses(sparkConf);
return new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
}
}

View File

@@ -0,0 +1,569 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.SimpleKeyGenerator;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.DFSPropertiesConfiguration;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.exception.DatasetNotFoundException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.DistributedTestDataSource;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.sources.config.TestSourceConfig;
import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF4;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts,
* upserts, inserts. Check counts at the end.
*/
public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
private static volatile Logger log = LogManager.getLogger(TestHoodieDeltaStreamer.class);
@BeforeClass
public static void initClass() throws Exception {
UtilitiesTestBase.initClass(true);
// prepare the configs.
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs,
dfsBasePath + "/sql-transformer.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc");
TypedProperties props = new TypedProperties();
props.setProperty("include", "sql-transformer.properties");
props.setProperty("hoodie.datasource.write.keygenerator.class", TestGenerator.class.getName());
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
// Hive Configs
props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), "testdb1");
props.setProperty(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), "hive_trips");
props.setProperty(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY(), "false");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "datestr");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
MultiPartKeysValueExtractor.class.getName());
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE);
// Properties used for the delta-streamer which incrementally pulls from upstream Hudi source table and writes to
// downstream hudi table
TypedProperties downstreamProps = new TypedProperties();
downstreamProps.setProperty("include", "base.properties");
downstreamProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
downstreamProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
// Source schema is the target schema of upstream table
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/target.avsc");
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs,
dfsBasePath + "/test-downstream-source.properties");
// Properties used for testing invalid key generator
TypedProperties invalidProps = new TypedProperties();
invalidProps.setProperty("include", "sql-transformer.properties");
invalidProps.setProperty("hoodie.datasource.write.keygenerator.class", "invalid");
invalidProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
invalidProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs,
dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);
}
@AfterClass
public static void cleanupClass() throws Exception {
UtilitiesTestBase.cleanupClass();
}
@Before
public void setup() throws Exception {
super.setup();
TestDataSource.initDataGen();
}
@After
public void teardown() throws Exception {
super.teardown();
TestDataSource.resetDataGen();
}
static class TestHelpers {
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op) {
return makeConfig(basePath, op, TripsWithDistanceTransformer.class.getName());
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName) {
return makeConfig(basePath, op, transformerClassName, PROPS_FILENAME_TEST_SOURCE, false);
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
String propsFilename, boolean enableHiveSync) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips";
cfg.storageType = "COPY_ON_WRITE";
cfg.sourceClassName = TestDataSource.class.getName();
cfg.transformerClassName = transformerClassName;
cfg.operation = op;
cfg.enableHiveSync = enableHiveSync;
cfg.sourceOrderingField = "timestamp";
cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
cfg.sourceLimit = 1000;
cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
return cfg;
}
static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, Operation op,
boolean addReadLatestOnMissingCkpt) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips_copy";
cfg.storageType = "COPY_ON_WRITE";
cfg.sourceClassName = HoodieIncrSource.class.getName();
cfg.operation = op;
cfg.sourceOrderingField = "timestamp";
cfg.propsFilePath = dfsBasePath + "/test-downstream-source.properties";
cfg.sourceLimit = 1000;
List<String> cfgs = new ArrayList<>();
cfgs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" + addReadLatestOnMissingCkpt);
cfgs.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath);
// No partition
cfgs.add("hoodie.deltastreamer.source.hoodieincr.partition.fields=datestr");
cfg.configs = cfgs;
return cfg;
}
static void assertRecordCount(long expected, String datasetPath, SQLContext sqlContext) {
long recordCount = sqlContext.read().format("org.apache.hudi").load(datasetPath).count();
assertEquals(expected, recordCount);
}
static List<Row> countsPerCommit(String datasetPath, SQLContext sqlContext) {
return sqlContext.read().format("org.apache.hudi").load(datasetPath).groupBy("_hoodie_commit_time").count()
.sort("_hoodie_commit_time").collectAsList();
}
static void assertDistanceCount(long expected, String datasetPath, SQLContext sqlContext) {
sqlContext.read().format("org.apache.hudi").load(datasetPath).registerTempTable("tmp_trips");
long recordCount =
sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance is not NULL").count();
assertEquals(expected, recordCount);
}
static void assertDistanceCountWithExactValue(long expected, String datasetPath, SQLContext sqlContext) {
sqlContext.read().format("org.apache.hudi").load(datasetPath).registerTempTable("tmp_trips");
long recordCount =
sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance = 1.0").count();
assertEquals(expected, recordCount);
}
static void assertAtleastNCompactionCommits(int minExpected, String datasetPath, FileSystem fs) {
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
log.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numCompactionCommits = (int)timeline.getInstants().count();
assertTrue("Got=" + numCompactionCommits + ", exp >=" + minExpected, minExpected <= numCompactionCommits);
}
static void assertAtleastNDeltaCommits(int minExpected, String datasetPath, FileSystem fs) {
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
log.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numDeltaCommits = (int)timeline.getInstants().count();
assertTrue("Got=" + numDeltaCommits + ", exp >=" + minExpected, minExpected <= numDeltaCommits);
}
static String assertCommitMetadata(String expected, String datasetPath, FileSystem fs, int totalCommits)
throws IOException {
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieInstant lastInstant = timeline.lastInstant().get();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
timeline.getInstantDetails(lastInstant).get(), HoodieCommitMetadata.class);
assertEquals(totalCommits, timeline.countInstants());
assertEquals(expected, commitMetadata.getMetadata(HoodieDeltaStreamer.CHECKPOINT_KEY));
return lastInstant.getTimestamp();
}
static void waitTillCondition(Function<Boolean, Boolean> condition, long timeoutInSecs) throws Exception {
Future<Boolean> res = Executors.newSingleThreadExecutor().submit(() -> {
boolean ret = false;
while (!ret) {
try {
Thread.sleep(3000);
ret = condition.apply(true);
} catch (Throwable error) {
log.warn("Got error :", error);
ret = false;
}
}
return true;
});
res.get(timeoutInSecs, TimeUnit.SECONDS);
}
}
@Test
public void testProps() throws IOException {
TypedProperties props = new DFSPropertiesConfiguration(
dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getConfig();
assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism"));
assertEquals("_row_key", props.getString("hoodie.datasource.write.recordkey.field"));
assertEquals(
"org.apache.hudi.utilities.TestHoodieDeltaStreamer$TestGenerator",
props.getString("hoodie.datasource.write.keygenerator.class")
);
}
@Test
public void testPropsWithInvalidKeyGenerator() throws Exception {
try {
String datasetBasePath = dfsBasePath + "/test_dataset";
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(
datasetBasePath, Operation.BULK_INSERT, TripsWithDistanceTransformer.class.getName(),
PROPS_FILENAME_TEST_INVALID, false), jsc);
deltaStreamer.sync();
fail("Should error out when setting the key generator class property to an invalid value");
} catch (IOException e) {
//expected
log.error("Expected error during getting the key generator", e);
assertTrue(e.getMessage().contains("Could not load key generator class"));
}
}
@Test
public void testDatasetCreation() throws Exception {
try {
dfs.mkdirs(new Path(dfsBasePath + "/not_a_dataset"));
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(dfsBasePath + "/not_a_dataset", Operation.BULK_INSERT), jsc);
deltaStreamer.sync();
fail("Should error out when pointed out at a dir thats not a dataset");
} catch (DatasetNotFoundException e) {
//expected
log.error("Expected error during dataset creation", e);
}
}
@Test
public void testBulkInsertsAndUpserts() throws Exception {
String datasetBasePath = dfsBasePath + "/test_dataset";
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, Operation.BULK_INSERT);
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1);
// No new data => no commits.
cfg.sourceLimit = 0;
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1);
// upsert() #1
cfg.sourceLimit = 2000;
cfg.operation = Operation.UPSERT;
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2);
List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext);
assertEquals(2000, counts.get(0).getLong(1));
}
@Test
public void testUpsertsCOWContinuousMode() throws Exception {
testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
}
@Test
public void testUpsertsMORContinuousMode() throws Exception {
testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor");
}
private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
String datasetBasePath = dfsBasePath + "/" + tempDir;
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, Operation.UPSERT);
cfg.continuousMode = true;
cfg.storageType = tableType.name();
cfg.configs.add(String.format("%s=%d", TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> {
try {
ds.sync();
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
});
TestHelpers.waitTillCondition((r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
TestHelpers.assertAtleastNDeltaCommits(5, datasetBasePath, dfs);
TestHelpers.assertAtleastNCompactionCommits(2, datasetBasePath, dfs);
} else {
TestHelpers.assertAtleastNCompactionCommits(5, datasetBasePath, dfs);
}
TestHelpers.assertRecordCount(totalRecords, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(totalRecords, datasetBasePath + "/*/*.parquet", sqlContext);
return true;
}, 180);
ds.shutdownGracefully();
dsFuture.get();
}
/**
* Test Bulk Insert and upserts with hive syncing. Tests Hudi incremental processing using a 2 step pipeline
* The first step involves using a SQL template to transform a source
* TEST-DATA-SOURCE ============================> HUDI TABLE 1 ===============> HUDI TABLE 2
* (incr-pull with transform) (incr-pull)
* Hudi Table 1 is synced with Hive.
* @throws Exception
*/
@Test
public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() throws Exception {
String datasetBasePath = dfsBasePath + "/test_dataset2";
String downstreamDatasetBasePath = dfsBasePath + "/test_downstream_dataset2";
HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(datasetBasePath, "hive_trips");
// Initial bulk insert to ingest to first hudi table
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, Operation.BULK_INSERT,
SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true);
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCountWithExactValue(1000, datasetBasePath + "/*/*.parquet", sqlContext);
String lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1);
// Now incrementally pull from the above hudi table and ingest to second table
HoodieDeltaStreamer.Config downstreamCfg =
TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.BULK_INSERT, true);
new HoodieDeltaStreamer(downstreamCfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCountWithExactValue(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamDatasetBasePath, dfs, 1);
// No new data => no commits for upstream table
cfg.sourceLimit = 0;
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCountWithExactValue(1000, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1);
// with no change in upstream table, no change in downstream too when pulled.
new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
TestHelpers.assertRecordCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCountWithExactValue(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamDatasetBasePath, dfs, 1);
// upsert() #1 on upstream hudi table
cfg.sourceLimit = 2000;
cfg.operation = Operation.UPSERT;
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCountWithExactValue(2000, datasetBasePath + "/*/*.parquet", sqlContext);
lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2);
List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext);
assertEquals(2000, counts.get(0).getLong(1));
// Incrementally pull changes in upstream hudi table and apply to downstream table
downstreamCfg =
TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.UPSERT, false);
downstreamCfg.sourceLimit = 2000;
new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
TestHelpers.assertRecordCount(2000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(2000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCountWithExactValue(2000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
String finalInstant =
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamDatasetBasePath, dfs, 2);
counts = TestHelpers.countsPerCommit(downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
assertEquals(2000, counts.get(0).getLong(1));
// Test Hive integration
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), dfs);
assertTrue("Table " + hiveSyncConfig.tableName + " should exist",
hiveClient.doesTableExist());
assertEquals("Table partitions should match the number of partitions we wrote", 1,
hiveClient.scanTablePartitions().size());
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES",
lastInstantForUpstreamTable, hiveClient.getLastCommitTimeSynced().get());
}
@Test
public void testFilterDupes() throws Exception {
String datasetBasePath = dfsBasePath + "/test_dupes_dataset";
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, Operation.BULK_INSERT);
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1);
// Generate the same 1000 records + 1000 new ones for upsert
cfg.filterDupes = true;
cfg.sourceLimit = 2000;
cfg.operation = Operation.UPSERT;
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2);
// 1000 records for commit 00000 & 1000 for commit 00001
List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext);
assertEquals(1000, counts.get(0).getLong(1));
assertEquals(1000, counts.get(1).getLong(1));
}
@Test
public void testDistributedTestDataSource() throws Exception {
TypedProperties props = new TypedProperties();
props.setProperty(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, "1000");
props.setProperty(TestSourceConfig.NUM_SOURCE_PARTITIONS_PROP, "1");
props.setProperty(TestSourceConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS, "true");
DistributedTestDataSource distributedTestDataSource = new DistributedTestDataSource(props,
jsc, sparkSession, null);
InputBatch<JavaRDD<GenericRecord>> batch = distributedTestDataSource.fetchNext(Option.empty(), 10000000);
batch.getBatch().get().cache();
long c = batch.getBatch().get().count();
Assert.assertEquals(1000, c);
}
/**
* UDF to calculate Haversine distance
*/
public static class DistanceUDF implements UDF4<Double, Double, Double, Double, Double> {
/**
*
* Taken from https://stackoverflow.com/questions/3694380/calculating-distance-between-two-points-using-latitude-
* longitude-what-am-i-doi
* Calculate distance between two points in latitude and longitude taking
* into account height difference. If you are not interested in height
* difference pass 0.0. Uses Haversine method as its base.
*
* lat1, lon1 Start point lat2, lon2 End point el1 Start altitude in meters
* el2 End altitude in meters
* @returns Distance in Meters
*/
@Override
public Double call(Double lat1, Double lat2, Double lon1, Double lon2) {
final int R = 6371; // Radius of the earth
double latDistance = Math.toRadians(lat2 - lat1);
double lonDistance = Math.toRadians(lon2 - lon1);
double a = Math.sin(latDistance / 2) * Math.sin(latDistance / 2)
+ Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
* Math.sin(lonDistance / 2) * Math.sin(lonDistance / 2);
double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
double distance = R * c * 1000; // convert to meters
double height = 0;
distance = Math.pow(distance, 2) + Math.pow(height, 2);
return Math.sqrt(distance);
}
}
/**
* Adds a new field "haversine_distance" to the row
*/
public static class TripsWithDistanceTransformer implements Transformer {
@Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
Dataset<Row> rowDataset, TypedProperties properties) {
rowDataset.sqlContext().udf().register("distance_udf", new DistanceUDF(), DataTypes.DoubleType);
return rowDataset.withColumn("haversine_distance",
functions.callUDF("distance_udf", functions.col("begin_lat"),
functions.col("end_lat"), functions.col("begin_lon"), functions.col("end_lat")));
}
}
public static class TestGenerator extends SimpleKeyGenerator {
public TestGenerator(TypedProperties props) {
super(props);
}
}
}

View File

@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.util.FSUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class TestHoodieSnapshotCopier {
private static String TEST_WRITE_TOKEN = "1-0-1";
private String rootPath = null;
private String basePath = null;
private String outputPath = null;
private FileSystem fs = null;
private JavaSparkContext jsc = null;
@Before
public void init() throws IOException {
try {
// Prepare directories
TemporaryFolder folder = new TemporaryFolder();
folder.create();
rootPath = "file://" + folder.getRoot().getAbsolutePath();
basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME;
outputPath = rootPath + "/output";
final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
fs = FSUtils.getFs(basePath, hadoopConf);
HoodieTestUtils.init(hadoopConf, basePath);
// Start a local Spark job
SparkConf conf = new SparkConf().setAppName("snapshot-test-job").setMaster("local[2]");
jsc = new JavaSparkContext(conf);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testEmptySnapshotCopy() throws IOException {
// There is no real data (only .hoodie directory)
assertEquals(fs.listStatus(new Path(basePath)).length, 1);
assertFalse(fs.exists(new Path(outputPath)));
// Do the snapshot
HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
copier.snapshot(jsc, basePath, outputPath, true);
// Nothing changed; we just bail out
assertEquals(fs.listStatus(new Path(basePath)).length, 1);
assertFalse(fs.exists(new Path(outputPath + "/_SUCCESS")));
}
//TODO - uncomment this after fixing test failures
//@Test
public void testSnapshotCopy() throws Exception {
// Generate some commits and corresponding parquets
String commitTime1 = "20160501010101";
String commitTime2 = "20160502020601";
String commitTime3 = "20160506030611";
new File(basePath + "/.hoodie").mkdirs();
new File(basePath + "/.hoodie/hoodie.properties").createNewFile();
// Only first two have commit files
new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime3 + ".inflight").createNewFile();
// Some parquet files
new File(basePath + "/2016/05/01/").mkdirs();
new File(basePath + "/2016/05/02/").mkdirs();
new File(basePath + "/2016/05/06/").mkdirs();
HoodieTestDataGenerator
.writePartitionMetadata(fs, new String[] {"2016/05/01", "2016/05/02", "2016/05/06"},
basePath);
// Make commit1
File file11 = new File(
basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id11"));
file11.createNewFile();
File file12 = new File(
basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id12"));
file12.createNewFile();
File file13 = new File(
basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id13"));
file13.createNewFile();
// Make commit2
File file21 = new File(
basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id21"));
file21.createNewFile();
File file22 = new File(
basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id22"));
file22.createNewFile();
File file23 = new File(
basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id23"));
file23.createNewFile();
// Make commit3
File file31 = new File(
basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id31"));
file31.createNewFile();
File file32 = new File(
basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id32"));
file32.createNewFile();
File file33 = new File(
basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id33"));
file33.createNewFile();
// Do a snapshot copy
HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
copier.snapshot(jsc, basePath, outputPath, false);
// Check results
assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file12.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file13.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file21.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file22.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file23.getName())));
assertFalse(fs.exists(new Path(outputPath + "/2016/05/01/" + file31.getName())));
assertFalse(fs.exists(new Path(outputPath + "/2016/05/02/" + file32.getName())));
assertFalse(fs.exists(new Path(outputPath + "/2016/05/06/" + file33.getName())));
assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime1 + ".commit")));
assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime2 + ".commit")));
assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".commit")));
assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".inflight")));
assertTrue(fs.exists(new Path(outputPath + "/.hoodie/hoodie.properties")));
assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS")));
}
@After
public void cleanup() {
if (rootPath != null) {
new File(rootPath).delete();
}
if (jsc != null) {
jsc.stop();
}
}
}

View File

@@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities;
import com.google.common.collect.ImmutableList;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.server.HiveServer2;
import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.minicluster.HdfsTestService;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.util.HiveTestService;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
/**
* Abstract test that provides a dfs & spark contexts.
*
* TODO(vc): this needs to be done across the board.
*/
public class UtilitiesTestBase {
protected static String dfsBasePath;
protected static HdfsTestService hdfsTestService;
protected static MiniDFSCluster dfsCluster;
protected static DistributedFileSystem dfs;
protected transient JavaSparkContext jsc = null;
protected transient SparkSession sparkSession = null;
protected transient SQLContext sqlContext;
protected static HiveServer2 hiveServer;
@BeforeClass
public static void initClass() throws Exception {
initClass(false);
}
static void initClass(boolean startHiveService) throws Exception {
hdfsTestService = new HdfsTestService();
dfsCluster = hdfsTestService.start(true);
dfs = dfsCluster.getFileSystem();
dfsBasePath = dfs.getWorkingDirectory().toString();
dfs.mkdirs(new Path(dfsBasePath));
if (startHiveService) {
HiveTestService hiveService = new HiveTestService(hdfsTestService.getHadoopConf());
hiveServer = hiveService.start();
clearHiveDb();
}
}
@AfterClass
public static void cleanupClass() throws Exception {
if (hdfsTestService != null) {
hdfsTestService.stop();
}
if (hiveServer != null) {
hiveServer.stop();
}
}
@Before
public void setup() throws Exception {
TestDataSource.initDataGen();
jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]");
sqlContext = new SQLContext(jsc);
sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
}
@After
public void teardown() throws Exception {
TestDataSource.resetDataGen();
if (jsc != null) {
jsc.stop();
}
}
/**
* Helper to get hive sync config
* @param basePath
* @param tableName
* @return
*/
protected static HiveSyncConfig getHiveSyncConfig(String basePath, String tableName) {
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
hiveSyncConfig.jdbcUrl = "jdbc:hive2://127.0.0.1:9999/";
hiveSyncConfig.hiveUser = "";
hiveSyncConfig.hivePass = "";
hiveSyncConfig.databaseName = "testdb1";
hiveSyncConfig.tableName = tableName;
hiveSyncConfig.basePath = basePath;
hiveSyncConfig.assumeDatePartitioning = false;
hiveSyncConfig.usePreApacheInputFormat = false;
hiveSyncConfig.partitionFields = new ImmutableList.Builder<String>().add("datestr").build();
return hiveSyncConfig;
}
/**
* Initialize Hive DB
* @throws IOException
*/
private static void clearHiveDb() throws IOException {
HiveConf hiveConf = new HiveConf();
// Create Dummy hive sync config
HiveSyncConfig hiveSyncConfig = getHiveSyncConfig("/dummy", "dummy");
hiveConf.addResource(hiveServer.getHiveConf());
HoodieTableMetaClient.initTableType(dfs.getConf(), hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE,
hiveSyncConfig.tableName, null);
HoodieHiveClient client = new HoodieHiveClient(hiveSyncConfig, hiveConf, dfs);
client.updateHiveSQL("drop database if exists " + hiveSyncConfig.databaseName);
client.updateHiveSQL("create database " + hiveSyncConfig.databaseName);
client.close();
}
public static class Helpers {
// to get hold of resources bundled with jar
private static ClassLoader classLoader = Helpers.class.getClassLoader();
public static void copyToDFS(String testResourcePath, FileSystem fs, String targetPath) throws IOException {
BufferedReader reader = new BufferedReader(
new InputStreamReader(classLoader.getResourceAsStream(testResourcePath)));
PrintStream os = new PrintStream(fs.create(new Path(targetPath), true));
String line;
while ((line = reader.readLine()) != null) {
os.println(line);
}
os.flush();
os.close();
}
public static void savePropsToDFS(TypedProperties props, FileSystem fs, String targetPath) throws IOException {
String[] lines = props.keySet().stream().map(k -> String.format("%s=%s", k, props.get(k))).toArray(String[]::new);
saveStringsToDFS(lines, fs, targetPath);
}
public static void saveStringsToDFS(String[] lines, FileSystem fs, String targetPath) throws IOException {
PrintStream os = new PrintStream(fs.create(new Path(targetPath), true));
for (String l : lines) {
os.println(l);
}
os.flush();
os.close();
}
public static TypedProperties setupSchemaOnDFS() throws IOException {
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
return props;
}
public static String toJsonString(HoodieRecord hr) {
try {
return ((TestRawTripPayload) hr.getData()).getJsonData();
} catch (IOException ioe) {
return null;
}
}
public static String[] jsonifyRecords(List<HoodieRecord> records) throws IOException {
return records.stream().map(Helpers::toJsonString).toArray(String[]::new);
}
}
}

View File

@@ -0,0 +1,105 @@
package org.apache.hudi.utilities.sources;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.common.util.collection.RocksDBBasedMap;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.config.TestSourceConfig;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
public abstract class AbstractBaseTestSource extends AvroSource {
static final int DEFAULT_PARTITION_NUM = 0;
// Static instance, helps with reuse across a test.
protected static transient Map<Integer, HoodieTestDataGenerator> dataGeneratorMap = new HashMap<>();
public static void initDataGen() {
dataGeneratorMap.putIfAbsent(DEFAULT_PARTITION_NUM,
new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS));
}
public static void initDataGen(TypedProperties props, int partition) {
try {
boolean useRocksForTestDataGenKeys = props.getBoolean(TestSourceConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS,
TestSourceConfig.DEFAULT_USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS);
String baseStoreDir = props.getString(TestSourceConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS,
File.createTempFile("test_data_gen", ".keys").getParent()) + "/" + partition;
log.info("useRocksForTestDataGenKeys=" + useRocksForTestDataGenKeys + ", BaseStoreDir=" + baseStoreDir);
dataGeneratorMap.put(partition, new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS,
useRocksForTestDataGenKeys ? new RocksDBBasedMap<>(baseStoreDir) : new HashMap<>()));
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}
public static void resetDataGen() {
for (HoodieTestDataGenerator dataGenerator : dataGeneratorMap.values()) {
dataGenerator.close();
}
dataGeneratorMap.clear();
}
protected AbstractBaseTestSource(TypedProperties props,
JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
}
protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String commitTime,
int partition) {
int maxUniqueKeys = props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP,
TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS);
HoodieTestDataGenerator dataGenerator = dataGeneratorMap.get(partition);
// generate `sourceLimit` number of upserts each time.
int numExistingKeys = dataGenerator.getNumExistingKeys();
log.info("NumExistingKeys=" + numExistingKeys);
int numUpdates = Math.min(numExistingKeys, sourceLimit / 2);
int numInserts = sourceLimit - numUpdates;
log.info("Before adjustments => numInserts=" + numInserts + ", numUpdates=" + numUpdates);
if (numInserts + numExistingKeys > maxUniqueKeys) {
// Limit inserts so that maxUniqueRecords is maintained
numInserts = Math.max(0, maxUniqueKeys - numExistingKeys);
}
if ((numInserts + numUpdates) < sourceLimit) {
// try to expand updates to safe limit
numUpdates = Math.min(numExistingKeys, sourceLimit - numInserts);
}
log.info("NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys);
long memoryUsage1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
log.info("Before DataGen. Memory Usage=" + memoryUsage1 + ", Total Memory=" + Runtime.getRuntime().totalMemory()
+ ", Free Memory=" + Runtime.getRuntime().freeMemory());
Stream<GenericRecord> updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates)
.map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts)
.map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
return Stream.concat(updateStream, insertStream);
}
private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) {
try {
Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema);
return (GenericRecord) recordOpt.get();
} catch (IOException e) {
return null;
}
}
}

View File

@@ -0,0 +1,79 @@
/*
* Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package org.apache.hudi.utilities.sources;
import java.util.Iterator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.config.TestSourceConfig;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
/**
* A Test DataSource which scales test-data generation by using spark parallelism.
*/
public class DistributedTestDataSource extends AbstractBaseTestSource {
private final int numTestSourcePartitions;
public DistributedTestDataSource(TypedProperties props,
JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
this.numTestSourcePartitions = props.getInteger(TestSourceConfig.NUM_SOURCE_PARTITIONS_PROP,
TestSourceConfig.DEFAULT_NUM_SOURCE_PARTITIONS);
}
@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
int nextCommitNum = lastCkptStr.map(s -> Integer.parseInt(s) + 1).orElse(0);
String commitTime = String.format("%05d", nextCommitNum);
log.info("Source Limit is set to " + sourceLimit);
// No new data.
if (sourceLimit <= 0) {
return new InputBatch<>(Option.empty(), commitTime);
}
TypedProperties newProps = new TypedProperties();
newProps.putAll(props);
// Set the maxUniqueRecords per partition for TestDataSource
int maxUniqueRecords = props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP,
TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS);
String maxUniqueRecordsPerPartition = String.valueOf(Math.max(1, maxUniqueRecords / numTestSourcePartitions));
newProps.setProperty(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, maxUniqueRecordsPerPartition);
int perPartitionSourceLimit = Math.max(1, (int) (sourceLimit / numTestSourcePartitions));
JavaRDD<GenericRecord> avroRDD = sparkContext.parallelize(IntStream.range(0, numTestSourcePartitions).boxed()
.collect(Collectors.toList()), numTestSourcePartitions).mapPartitionsWithIndex((p, idx) -> {
log.info("Initializing source with newProps=" + newProps);
if (!dataGeneratorMap.containsKey(p)) {
initDataGen(newProps, p);
}
Iterator<GenericRecord> itr = fetchNextBatch(newProps, perPartitionSourceLimit, commitTime, p).iterator();
return itr;
}, true);
return new InputBatch<>(Option.of(avroRDD), commitTime);
}
}

View File

@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.sources;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.utilities.UtilitiesTestBase;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Basic tests against all subclasses of {@link JsonDFSSource}
*/
public class TestDFSSource extends UtilitiesTestBase {
private FilebasedSchemaProvider schemaProvider;
@BeforeClass
public static void initClass() throws Exception {
UtilitiesTestBase.initClass();
}
@AfterClass
public static void cleanupClass() throws Exception {
UtilitiesTestBase.cleanupClass();
}
@Before
public void setup() throws Exception {
super.setup();
schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
}
@After
public void teardown() throws Exception {
super.teardown();
}
@Test
public void testJsonDFSSource() throws IOException {
dfs.mkdirs(new Path(dfsBasePath + "/jsonFiles"));
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + "/jsonFiles");
JsonDFSSource jsonDFSSource = new JsonDFSSource(props, jsc, sparkSession, schemaProvider);
SourceFormatAdapter jsonSource = new SourceFormatAdapter(jsonDFSSource);
// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
UtilitiesTestBase.Helpers.saveStringsToDFS(
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 100)), dfs,
dfsBasePath + "/jsonFiles/1.json");
assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), 10).getBatch());
InputBatch<JavaRDD<GenericRecord>> fetch1 =
jsonSource.fetchNewDataInAvroFormat(Option.empty(), 1000000);
assertEquals(100, fetch1.getBatch().get().count());
// Test json -> Row format
InputBatch<Dataset<Row>> fetch1AsRows =
jsonSource.fetchNewDataInRowFormat(Option.empty(), 1000000);
assertEquals(100, fetch1AsRows.getBatch().get().count());
// Test Avro -> Row format
Dataset<Row> fetch1Rows = AvroConversionUtils.createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()),
schemaProvider.getSourceSchema().toString(), jsonDFSSource.getSparkSession());
assertEquals(100, fetch1Rows.count());
// 2. Produce new data, extract new data
UtilitiesTestBase.Helpers.saveStringsToDFS(
Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 10000)),
dfs, dfsBasePath + "/jsonFiles/2.json");
InputBatch<Dataset<Row>> fetch2 = jsonSource.fetchNewDataInRowFormat(
Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(10000, fetch2.getBatch().get().count());
// 3. Extract with previous checkpoint => gives same data back (idempotent)
InputBatch<Dataset<Row>> fetch3 = jsonSource.fetchNewDataInRowFormat(
Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(10000, fetch3.getBatch().get().count());
assertEquals(fetch2.getCheckpointForNextBatch(), fetch3.getCheckpointForNextBatch());
fetch3.getBatch().get().registerTempTable("test_dfs_table");
Dataset<Row> rowDataset = new SQLContext(jsc.sc()).sql("select * from test_dfs_table");
assertEquals(10000, rowDataset.count());
// 4. Extract with latest checkpoint => no new data returned
InputBatch<JavaRDD<GenericRecord>> fetch4 = jsonSource.fetchNewDataInAvroFormat(
Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Option.empty(), fetch4.getBatch());
}
}

View File

@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.sources;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
/**
* An implementation of {@link Source}, that emits test upserts.
*/
public class TestDataSource extends AbstractBaseTestSource {
private static volatile Logger log = LogManager.getLogger(TestDataSource.class);
public TestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
initDataGen();
}
@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr,
long sourceLimit) {
int nextCommitNum = lastCheckpointStr.map(s -> Integer.parseInt(s) + 1).orElse(0);
String commitTime = String.format("%05d", nextCommitNum);
log.info("Source Limit is set to " + sourceLimit);
// No new data.
if (sourceLimit <= 0) {
return new InputBatch<>(Option.empty(), commitTime);
}
List<GenericRecord> records = fetchNextBatch(props, (int)sourceLimit, commitTime, DEFAULT_PARTITION_NUM)
.collect(Collectors.toList());
JavaRDD<GenericRecord> avroRDD = sparkContext.<GenericRecord>parallelize(records, 4);
return new InputBatch<>(Option.of(avroRDD), commitTime);
}
}

View File

@@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.sources;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.HashMap;
import kafka.common.TopicAndPartition;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.utilities.UtilitiesTestBase;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset;
import org.apache.spark.streaming.kafka.KafkaTestUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests against {@link AvroKafkaSource}
*/
public class TestKafkaSource extends UtilitiesTestBase {
private static String TEST_TOPIC_NAME = "hoodie_test";
private FilebasedSchemaProvider schemaProvider;
private KafkaTestUtils testUtils;
@BeforeClass
public static void initClass() throws Exception {
UtilitiesTestBase.initClass();
}
@AfterClass
public static void cleanupClass() throws Exception {
UtilitiesTestBase.cleanupClass();
}
@Before
public void setup() throws Exception {
super.setup();
schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
testUtils = new KafkaTestUtils();
testUtils.setup();
}
@After
public void teardown() throws Exception {
super.teardown();
testUtils.teardown();
}
@Test
public void testJsonKafkaSource() throws IOException {
// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
props.setProperty("metadata.broker.list", testUtils.brokerAddress());
props.setProperty("auto.offset.reset", "smallest");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
assertEquals(900, fetch1.getBatch().get().count());
// Test Avro To DataFrame<Row> path
Dataset<Row> fetch1AsRows = AvroConversionUtils.createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()),
schemaProvider.getSourceSchema().toString(), jsonSource.getSparkSession());
assertEquals(900, fetch1AsRows.count());
// 2. Produce new data, extract new data
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
InputBatch<Dataset<Row>> fetch2 = kafkaSource.fetchNewDataInRowFormat(
Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(1100, fetch2.getBatch().get().count());
// 3. Extract with previous checkpoint => gives same data back (idempotent)
InputBatch<JavaRDD<GenericRecord>> fetch3 = kafkaSource.fetchNewDataInAvroFormat(
Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(fetch2.getBatch().get().count(), fetch3.getBatch().get().count());
assertEquals(fetch2.getCheckpointForNextBatch(), fetch3.getCheckpointForNextBatch());
// Same using Row API
InputBatch<Dataset<Row>> fetch3AsRows =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(fetch2.getBatch().get().count(), fetch3AsRows.getBatch().get().count());
assertEquals(fetch2.getCheckpointForNextBatch(), fetch3AsRows.getCheckpointForNextBatch());
// 4. Extract with latest checkpoint => no new data returned
InputBatch<JavaRDD<GenericRecord>> fetch4 = kafkaSource.fetchNewDataInAvroFormat(
Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Option.empty(), fetch4.getBatch());
// Same using Row API
InputBatch<Dataset<Row>> fetch4AsRows =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Option.empty(), fetch4AsRows.getBatch());
}
private static HashMap<TopicAndPartition, LeaderOffset> makeOffsetMap(int[] partitions, long[] offsets) {
HashMap<TopicAndPartition, LeaderOffset> map = new HashMap<>();
for (int i = 0; i < partitions.length; i++) {
map.put(new TopicAndPartition(TEST_TOPIC_NAME, partitions[i]), new LeaderOffset("", -1, offsets[i]));
}
return map;
}
@Test
public void testComputeOffsetRanges() {
// test totalNewMessages()
long totalMsgs = CheckpointUtils.totalNewMessages(new OffsetRange[]{
OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200)
});
assertEquals(200, totalMsgs);
// should consume all the full data
OffsetRange[] ranges = CheckpointUtils.computeOffsetRanges(
makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}),
1000000L
);
assertEquals(200000, CheckpointUtils.totalNewMessages(ranges));
// should only consume upto limit
ranges = CheckpointUtils.computeOffsetRanges(
makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}),
10000
);
assertEquals(10000, CheckpointUtils.totalNewMessages(ranges));
assertEquals(200000, ranges[0].fromOffset());
assertEquals(205000, ranges[0].untilOffset());
assertEquals(250000, ranges[1].fromOffset());
assertEquals(255000, ranges[1].untilOffset());
// should also consume from new partitions.
ranges = CheckpointUtils.computeOffsetRanges(
makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
makeOffsetMap(new int[]{0, 1, 2}, new long[]{300000, 350000, 100000}),
1000000L
);
assertEquals(300000, CheckpointUtils.totalNewMessages(ranges));
assertEquals(3, ranges.length);
// for skewed offsets, does not starve any partition & can catch up
ranges = CheckpointUtils.computeOffsetRanges(
makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}),
100000
);
assertEquals(100000, CheckpointUtils.totalNewMessages(ranges));
assertEquals(10, ranges[0].count());
assertEquals(89990, ranges[1].count());
assertEquals(10000, ranges[2].count());
ranges = CheckpointUtils.computeOffsetRanges(
makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}),
1000000
);
assertEquals(110010, CheckpointUtils.totalNewMessages(ranges));
assertEquals(10, ranges[0].count());
assertEquals(100000, ranges[1].count());
assertEquals(10000, ranges[2].count());
}
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package org.apache.hudi.utilities.sources.config;
/**
* Configurations for Test Data Sources
*/
public class TestSourceConfig {
// Used by DistributedTestDataSource only. Number of partitions where each partitions generates test-data
public static final String NUM_SOURCE_PARTITIONS_PROP = "hoodie.deltastreamer.source.test.num_partitions";
public static final Integer DEFAULT_NUM_SOURCE_PARTITIONS = 10;
// Maximum number of unique records generated for the run
public static final String MAX_UNIQUE_RECORDS_PROP = "hoodie.deltastreamer.source.test.max_unique_records";
public static final Integer DEFAULT_MAX_UNIQUE_RECORDS = Integer.MAX_VALUE;
// Use Rocks DB for storing datagen keys
public static final String USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS =
"hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys";
public static final Boolean DEFAULT_USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS = false;
// Base Dir for storing datagen keys
public static final String ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS =
"hoodie.deltastreamer.source.test.datagen.rocksdb_base_dir";
}

View File

@@ -0,0 +1,8 @@
CREATE TABLE <tempDbTable>
<storedAsClause>
LOCATION '<tempDbTablePath>'
AS
<incrementalSQL>

View File

@@ -0,0 +1,20 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2

View File

@@ -0,0 +1,27 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
include=base.properties
# Key generator props
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.partitionpath.field=driver
# Schema provider props (change to absolute path based on your installation)
hoodie.deltastreamer.filebased.schemaprovider.source.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/delta-streamer-props/source.avsc
hoodie.deltastreamer.filebased.schemaprovider.target.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/delta-streamer-props/target.avsc
# DFS Source
hoodie.deltastreamer.source.dfs.root=file:///tmp/hoodie-dfs-input

View File

@@ -0,0 +1,30 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
include=base.properties
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=impressionid
hoodie.datasource.write.partitionpath.field=userid
# schema provider configs
hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
# Kafka Source
#hoodie.deltastreamer.source.kafka.topic=uber_trips
hoodie.deltastreamer.source.kafka.topic=impressions
#Kafka props
metadata.broker.list=localhost:9092
auto.offset.reset=smallest
schema.registry.url=http://localhost:8081

View File

@@ -0,0 +1,34 @@
{
"type" : "record",
"name" : "triprec",
"fields" : [
{
"name" : "timestamp",
"type" : "double"
}, {
"name" : "_row_key",
"type" : "string"
}, {
"name" : "rider",
"type" : "string"
}, {
"name" : "driver",
"type" : "string"
}, {
"name" : "begin_lat",
"type" : "double"
}, {
"name" : "begin_lon",
"type" : "double"
}, {
"name" : "end_lat",
"type" : "double"
}, {
"name" : "end_lon",
"type" : "double"
}, {
"name" : "fare",
"type" : "double"
} ]
}

View File

@@ -0,0 +1,19 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
include=base.properties
hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.fare, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a

View File

@@ -0,0 +1,37 @@
{
"type" : "record",
"name" : "triprec",
"fields" : [
{
"name" : "timestamp",
"type" : "double"
}, {
"name" : "_row_key",
"type" : "string"
}, {
"name" : "rider",
"type" : "string"
}, {
"name" : "driver",
"type" : "string"
}, {
"name" : "begin_lat",
"type" : "double"
}, {
"name" : "begin_lon",
"type" : "double"
}, {
"name" : "end_lat",
"type" : "double"
}, {
"name" : "end_lon",
"type" : "double"
}, {
"name" : "fare",
"type" : "double"
}, {
"name" : "haversine_distance",
"type" : "double"
}]
}

View File

@@ -0,0 +1,23 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
log4j.rootLogger=WARN, A1
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=[%-5p] %d %c %x - %m%n

View File

@@ -0,0 +1,25 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
log4j.rootLogger=WARN, A1
log4j.category.com.uber=INFO
log4j.category.org.apache.parquet.hadoop=WARN
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n