From 274aaf49fedc9afd97092e169b970d600eeda7b1 Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Mon, 2 Oct 2017 20:29:07 -0700 Subject: [PATCH] Incorporating code review feedback for DataSource --- docs/configurations.md | 4 +- .../hoodie/common/HoodieClientTestUtils.java | 10 +- .../hoodie/common/util/SchemaTestUtil.java | 8 ++ .../hoodie/hadoop/InputFormatTestUtil.java | 23 +--- .../HoodieRealtimeRecordReaderTest.java | 2 +- .../java/com/uber/hoodie/DataSourceUtils.java | 5 +- .../uber/hoodie/HoodieDataSourceHelpers.java | 2 - .../com/uber/hoodie/DataSourceOptions.scala | 7 +- hoodie-spark/src/test/java/HoodieJavaApp.java | 7 +- .../test/scala/DataSourceDefaultsTest.scala | 110 ++++++++++++++++++ .../src/test/scala/DataSourceTest.scala | 6 +- 11 files changed, 141 insertions(+), 43 deletions(-) create mode 100644 hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala diff --git a/docs/configurations.md b/docs/configurations.md index 88b3a5a24..51f2d315b 100644 --- a/docs/configurations.md +++ b/docs/configurations.md @@ -97,7 +97,7 @@ summary: "Here we list all possible configurations and what they mean" - [OPERATION_OPT_KEY](#OPERATION_OPT_KEY) (Default: upsert)
whether to do upsert, insert or bulkinsert for the write operation - [STORAGE_TYPE_OPT_KEY](#STORAGE_TYPE_OPT_KEY) (Default: COPY_ON_WRITE)
- The storage type for the underlying data, for this write. + The storage type for the underlying data, for this write. This can't change between writes. - [TABLE_NAME_OPT_KEY](#TABLE_NAME_OPT_KEY) (Default: None (mandatory))
Hive table name, to register the dataset into. - [PRECOMBINE_FIELD_OPT_KEY](#PRECOMBINE_FIELD_OPT_KEY) (Default: ts)
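For reference, a minimal write sketch using the options above; `spark`, `inputDF`, `basePath`, the table name and the field names are illustrative assumptions for this sketch, not values mandated by the datasource:

    // Minimal sketch: upsert a DataFrame through the Hoodie datasource.
    // `inputDF` and `basePath` are assumed to be in scope; option values are the documented defaults.
    import com.uber.hoodie.DataSourceWriteOptions
    import org.apache.spark.sql.SaveMode

    inputDF.write.format("com.uber.hoodie")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert")           // default operation
      .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, "COPY_ON_WRITE") // can't change between writes
      .option(DataSourceWriteOptions.TABLE_NAME_OPT_KEY, "hoodie_test")     // illustrative table name
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")        // default precombine field
      .mode(SaveMode.Append)
      .save(basePath)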
@@ -121,7 +121,7 @@ summary: "Here we list all possible configurations and what they mean" - [read options](#readoptions) (read.format.option(...))
Options useful for reading datasets - - [VIEW_TYPE_OPT_KEY](#VIEW_TYPE_OPT_KEY) (Default: = READ_OPTIMIZED)
+ - [VIEW_TYPE_OPT_KEY](#VIEW_TYPE_OPT_KEY) (Default: read_optimized)&#10;
Whether data needs to be read, in incremental mode (new data since an instantTime) (or) Read Optimized mode (obtain latest view, based on columnar data) (or) Real time mode (obtain latest view, based on row & columnar data) diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java index e0148f93b..002b6cd20 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java @@ -107,9 +107,9 @@ public class HoodieClientTestUtils { return HoodieReadClient.addHoodieSupport(sparkConf); } - public static HashMap getLatestFileIsToFullPath(String basePath, - HoodieTimeline commitTimeline, - List commitsToReturn) throws IOException { + public static HashMap getLatestFileIDsToFullPath(String basePath, + HoodieTimeline commitTimeline, + List commitsToReturn) throws IOException { HashMap fileIdToFullPath = new HashMap<>(); for (HoodieInstant commit : commitsToReturn) { HoodieCommitMetadata metadata = @@ -129,7 +129,7 @@ public class HoodieClientTestUtils { new HoodieException("No commit exists at " + commitTime); } try { - HashMap paths = getLatestFileIsToFullPath(basePath, commitTimeline, Arrays.asList(commitInstant)); + HashMap paths = getLatestFileIDsToFullPath(basePath, commitTimeline, Arrays.asList(commitInstant)); return sqlContext.read() .parquet(paths.values().toArray(new String[paths.size()])) .filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitTime)); @@ -150,7 +150,7 @@ public class HoodieClientTestUtils { .getInstants().collect(Collectors.toList()); try { // Go over the commit metadata, and obtain the new files that need to be read. 
- HashMap fileIdToFullPath = getLatestFileIsToFullPath(basePath, commitTimeline, commitsToReturn); + HashMap fileIdToFullPath = getLatestFileIDsToFullPath(basePath, commitTimeline, commitsToReturn); return sqlContext.read() .parquet(fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()])) .filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTime)); diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java index cab3d6d37..e42137391 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java @@ -16,6 +16,7 @@ package com.uber.hoodie.common.util; +import com.uber.hoodie.avro.MercifulJsonConverter; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.exception.HoodieIOException; import java.net.URI; @@ -118,4 +119,11 @@ public class SchemaTestUtil { return new Schema.Parser() .parse(SchemaTestUtil.class.getResourceAsStream("/complex-test-evolved.avro")); } + + public static GenericRecord generateAvroRecordFromJson(Schema schema, int recordNumber, + String commitTime, String fileId) throws IOException { + TestRecord record = new TestRecord(commitTime, recordNumber, fileId); + MercifulJsonConverter converter = new MercifulJsonConverter(schema); + return converter.convert(record.toJsonString()); + } } diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java index d48bfa22c..ae57a4fa5 100644 --- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java +++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java @@ -20,6 +20,7 @@ import com.uber.hoodie.avro.MercifulJsonConverter; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.SchemaTestUtil; import com.uber.hoodie.common.util.TestRecord; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; @@ -120,31 +121,11 @@ public class InputFormatTestUtil { private static Iterable generateAvroRecords(Schema schema, int numberOfRecords, String commitTime, String fileId) throws IOException { List records = new ArrayList<>(numberOfRecords); for(int i=0;i records = new ArrayList<>(); for(int i=0; i < numberOfRecords; i++) { - records.add(InputFormatTestUtil.generateAvroRecordFromJson(schema, i, newCommit, "fileid0")); + records.add(SchemaTestUtil.generateAvroRecordFromJson(schema, i, newCommit, "fileid0")); } Schema writeSchema = records.get(0).getSchema(); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, writeSchema); diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java index b78da7955..2f68c14b9 100644 --- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java +++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java @@ -59,7 +59,8 @@ public class DataSourceUtils { if (i == parts.length - 1) { return val.toString(); } else { - if (val instanceof GenericRecord) { + // VC: Need a test here + if (!(val instanceof GenericRecord)) { throw new HoodieException("Cannot find a record at part value :" + part); } valueNode = (GenericRecord) val; @@ 
-80,7 +81,7 @@ public class DataSourceUtils { } /** - * Create a payload class via reflection, passing in an ordering/precombine value value. + * Create a payload class via reflection, passing in an ordering/precombine value. */ public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal) throws IOException { try { diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/HoodieDataSourceHelpers.java b/hoodie-spark/src/main/java/com/uber/hoodie/HoodieDataSourceHelpers.java index 2d5a6907d..d43a81dd6 100644 --- a/hoodie-spark/src/main/java/com/uber/hoodie/HoodieDataSourceHelpers.java +++ b/hoodie-spark/src/main/java/com/uber/hoodie/HoodieDataSourceHelpers.java @@ -49,7 +49,6 @@ public class HoodieDataSourceHelpers { * Get a list of instant times that have occurred, from the given instant timestamp. * * @param instantTimestamp - * @return */ public static List listCommitsSince(FileSystem fs, String basePath, String instantTimestamp) { HoodieTimeline timeline = allCompletedCommitsCompactions(fs, basePath); @@ -71,7 +70,6 @@ public class HoodieDataSourceHelpers { * * @param fs * @param basePath - * @return */ public static HoodieTimeline allCompletedCommitsCompactions(FileSystem fs, String basePath) { HoodieTable table = HoodieTable diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala index cb799c99b..c4e3307c9 100644 --- a/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala @@ -38,9 +38,9 @@ object DataSourceReadOptions { * Default: READ_OPTIMIZED */ val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type" - val VIEW_TYPE_READ_OPTIMIZED_OPT_VAL = "READ_OPTIMIZED" - val VIEW_TYPE_INCREMENTAL_OPT_VAL = "INCREMENTAL" - val VIEW_TYPE_REALTIME_OPT_VAL = "REALTIME" + val VIEW_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized" + val VIEW_TYPE_INCREMENTAL_OPT_VAL = "incremental" + val VIEW_TYPE_REALTIME_OPT_VAL = "realtime" val DEFAULT_VIEW_TYPE_OPT_VAL = VIEW_TYPE_READ_OPTIMIZED_OPT_VAL @@ -82,6 +82,7 @@ object DataSourceWriteOptions { /** * The storage type for the underlying data, for this write. + * Note that this can't change across writes. * * Default: COPY_ON_WRITE */ diff --git a/hoodie-spark/src/test/java/HoodieJavaApp.java b/hoodie-spark/src/test/java/HoodieJavaApp.java index bf791140a..c61d8cca4 100644 --- a/hoodie-spark/src/test/java/HoodieJavaApp.java +++ b/hoodie-spark/src/test/java/HoodieJavaApp.java @@ -66,9 +66,6 @@ public class HoodieJavaApp { cli.run(); } - - - public void run() throws Exception { // Spark session setup.. 
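To show the renamed view-type values in use, a minimal read sketch (assuming `spark` and `basePath` are in scope; the partition glob mirrors the one used in DataSourceTest):

    // Minimal sketch: read the read-optimized view with the new lowercase value.
    import com.uber.hoodie.DataSourceReadOptions

    val roViewDF = spark.read.format("com.uber.hoodie")
      .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY,
        DataSourceReadOptions.VIEW_TYPE_READ_OPTIMIZED_OPT_VAL) // "read_optimized"
      .load(basePath + "/*/*/*/*") // glob matching the dataset's partition paths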
@@ -103,6 +100,7 @@ public class HoodieJavaApp { .mode(SaveMode.Overwrite) // This will remove any existing data at path below, and create a new dataset if needed .save(tablePath); // ultimately where the dataset will be placed String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); + logger.info("First commit at instant time :" + commitInstantTime1); /** * Commit that updates records @@ -120,6 +118,7 @@ public class HoodieJavaApp { .mode(SaveMode.Append) .save(tablePath); String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); + logger.info("Second commit at instant time :" + commitInstantTime2); /** * Read & do some queries @@ -142,7 +141,7 @@ public class HoodieJavaApp { .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), commitInstantTime1) // Only changes in write 2 above .load(tablePath); // For incremental view, pass in the root/base path of dataset - System.out.println("You will only see records from : " + commitInstantTime2); + logger.info("You will only see records from : " + commitInstantTime2); hoodieIncViewDF.groupBy(hoodieIncViewDF.col("_hoodie_commit_time")).count().show(); } } diff --git a/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala b/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala new file mode 100644 index 000000000..2996c46fd --- /dev/null +++ b/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +import com.uber.hoodie.{DataSourceWriteOptions, OverwriteWithLatestAvroPayload, SimpleKeyGenerator} +import com.uber.hoodie.common.util.SchemaTestUtil +import com.uber.hoodie.exception.HoodieException +import org.apache.avro.generic.GenericRecord +import org.apache.commons.configuration.PropertiesConfiguration +import org.junit.Assert._ +import org.junit.{Before, Test} +import org.scalatest.junit.AssertionsForJUnit + +/** + * Tests on the default key generator, payload classes. 
+ */ +class DataSourceDefaultsTest extends AssertionsForJUnit { + + val schema = SchemaTestUtil.getComplexEvolvedSchema + var baseRecord : GenericRecord = null + + @Before def initialize(): Unit = { + baseRecord = SchemaTestUtil + .generateAvroRecordFromJson(schema, 1, "001", "f1") + } + + + private def getKeyConfig(recordKeyFieldName: String, partitionPathField: String): PropertiesConfiguration = { + val props = new PropertiesConfiguration() + props.addProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyFieldName) + props.addProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionPathField) + props + } + @Test def testSimpleKeyGenerator() = { + // top level, valid fields + val hk1 = new SimpleKeyGenerator(getKeyConfig("field1", "name")).getKey(baseRecord) + assertEquals("field1", hk1.getRecordKey) + assertEquals("name1", hk1.getPartitionPath) + + // partitionPath field not specified + try { + val props = new PropertiesConfiguration() + props.addProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "field1") + new SimpleKeyGenerator(props).getKey(baseRecord) + fail("Should have errored out") + } catch { + case e: HoodieException => { + // do nothing + } + }; + + // partitionPath field is null + try { + new SimpleKeyGenerator(getKeyConfig("field1", null)).getKey(baseRecord) + fail("Should have errored out") + } catch { + case e: HoodieException => { + // do nothing + } + }; + + // nested field as record key and partition path + val hk2 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.isAdmin")) + .getKey(baseRecord) + assertEquals("UserId1@001", hk2.getRecordKey) + assertEquals("false", hk2.getPartitionPath) + + // Nested record key not found + try { + new SimpleKeyGenerator(getKeyConfig("testNestedRecord.NotThere", "testNestedRecord.isAdmin")) + .getKey(baseRecord) + fail("Should have errored out") + } catch { + case e: HoodieException => { + // do nothing + } + }; + } + + @Test def testOverwriteWithLatestAvroPayload() = { + val overWritePayload1 = new OverwriteWithLatestAvroPayload(baseRecord, 1) + val laterRecord = SchemaTestUtil + .generateAvroRecordFromJson(schema, 2, "001", "f1") + val overWritePayload2 = new OverwriteWithLatestAvroPayload(laterRecord, 2) + + // it will provide the record with the greatest ordering (precombine) value + val combinedPayload12 = overWritePayload1.preCombine(overWritePayload2) + val combinedGR12 = combinedPayload12.getInsertValue(schema).get().asInstanceOf[GenericRecord] + assertEquals("field2", combinedGR12.get("field1")) + + // and the result is deterministic, irrespective of the order of processing. 
+ val combinedPayload21 = overWritePayload2.preCombine(overWritePayload1) + val combinedGR21 = combinedPayload21.getInsertValue(schema).get().asInstanceOf[GenericRecord] + assertEquals("field2", combinedGR21.get("field1")) + } +} diff --git a/hoodie-spark/src/test/scala/DataSourceTest.scala b/hoodie-spark/src/test/scala/DataSourceTest.scala index 35e2b3b58..764206f24 100644 --- a/hoodie-spark/src/test/scala/DataSourceTest.scala +++ b/hoodie-spark/src/test/scala/DataSourceTest.scala @@ -21,7 +21,7 @@ import com.uber.hoodie.common.util.FSUtils import com.uber.hoodie.config.HoodieWriteConfig import com.uber.hoodie.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} +import org.apache.spark.sql._ import org.junit.Assert._ import org.junit.{Before, Test} import org.junit.rules.TemporaryFolder @@ -127,9 +127,9 @@ class DataSourceTest extends AssertionsForJUnit { try { val hoodieROViewDF1 = spark.read.format("com.uber.hoodie") .load(basePath + "/*/*/*/*") - fail() // we would error out, since no compaction has yet occurred. + fail("we should error out, since no compaction has yet occurred.") } catch { - case e: Exception => { + case e: AnalysisException => { // do nothing } };
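On the `// VC: Need a test here` note in DataSourceUtils above: the guarded code walks a dotted field name down through nested Avro records. A standalone sketch of that traversal, in Scala for brevity (illustrative only, not the project's API; the exception type and method name here are placeholders):

    import org.apache.avro.generic.GenericRecord

    // Sketch: resolve "a.b.c" against nested GenericRecords; error out when an
    // intermediate part is not itself a record (the case the fixed check catches).
    def nestedFieldValAsString(record: GenericRecord, fieldName: String): String = {
      val parts = fieldName.split("\\.")
      var current = record
      for (i <- 0 until parts.length - 1) {
        val value = current.get(parts(i))
        if (!value.isInstanceOf[GenericRecord])
          throw new IllegalArgumentException("Cannot find a record at part value: " + parts(i))
        current = value.asInstanceOf[GenericRecord]
      }
      String.valueOf(current.get(parts.last))
    }

A unit test for the non-record intermediate case (e.g. resolving `field1.something` against the complex test schema and expecting the exception) would close out that review note.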