1
0

Incorporating code review feedback for DataSource

This commit is contained in:
Vinoth Chandar
2017-10-02 20:29:07 -07:00
committed by vinoth chandar
parent 64e0573aca
commit 274aaf49fe
11 changed files with 141 additions and 43 deletions

View File

@@ -97,7 +97,7 @@ summary: "Here we list all possible configurations and what they mean"
- [OPERATION_OPT_KEY](#OPERATION_OPT_KEY) (Default: upsert) <br/> - [OPERATION_OPT_KEY](#OPERATION_OPT_KEY) (Default: upsert) <br/>
<span style="color:grey">whether to do upsert, insert or bulkinsert for the write operation</span> <span style="color:grey">whether to do upsert, insert or bulkinsert for the write operation</span>
- [STORAGE_TYPE_OPT_KEY](#STORAGE_TYPE_OPT_KEY) (Default: COPY_ON_WRITE) <br/> - [STORAGE_TYPE_OPT_KEY](#STORAGE_TYPE_OPT_KEY) (Default: COPY_ON_WRITE) <br/>
<span style="color:grey">The storage type for the underlying data, for this write.</span> <span style="color:grey">The storage type for the underlying data, for this write. This can't change between writes.</span>
- [TABLE_NAME_OPT_KEY](#TABLE_NAME_OPT_KEY) (Default: None (mandatory)) <br/> - [TABLE_NAME_OPT_KEY](#TABLE_NAME_OPT_KEY) (Default: None (mandatory)) <br/>
<span style="color:grey">Hive table name, to register the dataset into.</span> <span style="color:grey">Hive table name, to register the dataset into.</span>
- [PRECOMBINE_FIELD_OPT_KEY](#PRECOMBINE_FIELD_OPT_KEY) (Default: ts) <br/> - [PRECOMBINE_FIELD_OPT_KEY](#PRECOMBINE_FIELD_OPT_KEY) (Default: ts) <br/>
@@ -121,7 +121,7 @@ summary: "Here we list all possible configurations and what they mean"
- [read options](#readoptions) (read.format.option(...)) <br/> - [read options](#readoptions) (read.format.option(...)) <br/>
<span style="color:grey">Options useful for reading datasets</span> <span style="color:grey">Options useful for reading datasets</span>
- [VIEW_TYPE_OPT_KEY](#VIEW_TYPE_OPT_KEY) (Default: = READ_OPTIMIZED) <br/> - [VIEW_TYPE_OPT_KEY](#VIEW_TYPE_OPT_KEY) (Default: = read_optimized) <br/>
<span style="color:grey">Whether data needs to be read, in incremental mode (new data since an instantTime) <span style="color:grey">Whether data needs to be read, in incremental mode (new data since an instantTime)
(or) Read Optimized mode (obtain latest view, based on columnar data) (or) Read Optimized mode (obtain latest view, based on columnar data)
(or) Real time mode (obtain latest view, based on row & columnar data)</span> (or) Real time mode (obtain latest view, based on row & columnar data)</span>

View File

@@ -107,7 +107,7 @@ public class HoodieClientTestUtils {
return HoodieReadClient.addHoodieSupport(sparkConf); return HoodieReadClient.addHoodieSupport(sparkConf);
} }
public static HashMap<String, String> getLatestFileIsToFullPath(String basePath, public static HashMap<String, String> getLatestFileIDsToFullPath(String basePath,
HoodieTimeline commitTimeline, HoodieTimeline commitTimeline,
List<HoodieInstant> commitsToReturn) throws IOException { List<HoodieInstant> commitsToReturn) throws IOException {
HashMap<String, String> fileIdToFullPath = new HashMap<>(); HashMap<String, String> fileIdToFullPath = new HashMap<>();
@@ -129,7 +129,7 @@ public class HoodieClientTestUtils {
new HoodieException("No commit exists at " + commitTime); new HoodieException("No commit exists at " + commitTime);
} }
try { try {
HashMap<String, String> paths = getLatestFileIsToFullPath(basePath, commitTimeline, Arrays.asList(commitInstant)); HashMap<String, String> paths = getLatestFileIDsToFullPath(basePath, commitTimeline, Arrays.asList(commitInstant));
return sqlContext.read() return sqlContext.read()
.parquet(paths.values().toArray(new String[paths.size()])) .parquet(paths.values().toArray(new String[paths.size()]))
.filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitTime)); .filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitTime));
@@ -150,7 +150,7 @@ public class HoodieClientTestUtils {
.getInstants().collect(Collectors.toList()); .getInstants().collect(Collectors.toList());
try { try {
// Go over the commit metadata, and obtain the new files that need to be read. // Go over the commit metadata, and obtain the new files that need to be read.
HashMap<String, String> fileIdToFullPath = getLatestFileIsToFullPath(basePath, commitTimeline, commitsToReturn); HashMap<String, String> fileIdToFullPath = getLatestFileIDsToFullPath(basePath, commitTimeline, commitsToReturn);
return sqlContext.read() return sqlContext.read()
.parquet(fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()])) .parquet(fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()]))
.filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTime)); .filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTime));

View File

@@ -16,6 +16,7 @@
package com.uber.hoodie.common.util; package com.uber.hoodie.common.util;
import com.uber.hoodie.avro.MercifulJsonConverter;
import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieIOException;
import java.net.URI; import java.net.URI;
@@ -118,4 +119,11 @@ public class SchemaTestUtil {
return new Schema.Parser() return new Schema.Parser()
.parse(SchemaTestUtil.class.getResourceAsStream("/complex-test-evolved.avro")); .parse(SchemaTestUtil.class.getResourceAsStream("/complex-test-evolved.avro"));
} }
public static GenericRecord generateAvroRecordFromJson(Schema schema, int recordNumber,
String commitTime, String fileId) throws IOException {
TestRecord record = new TestRecord(commitTime, recordNumber, fileId);
MercifulJsonConverter converter = new MercifulJsonConverter(schema);
return converter.convert(record.toJsonString());
}
} }

View File

@@ -20,6 +20,7 @@ import com.uber.hoodie.avro.MercifulJsonConverter;
import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.SchemaTestUtil;
import com.uber.hoodie.common.util.TestRecord; import com.uber.hoodie.common.util.TestRecord;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData;
@@ -120,31 +121,11 @@ public class InputFormatTestUtil {
private static Iterable<? extends GenericRecord> generateAvroRecords(Schema schema, int numberOfRecords, String commitTime, String fileId) throws IOException { private static Iterable<? extends GenericRecord> generateAvroRecords(Schema schema, int numberOfRecords, String commitTime, String fileId) throws IOException {
List<GenericRecord> records = new ArrayList<>(numberOfRecords); List<GenericRecord> records = new ArrayList<>(numberOfRecords);
for(int i=0;i<numberOfRecords;i++) { for(int i=0;i<numberOfRecords;i++) {
records.add(generateAvroRecordFromJson(schema, i, commitTime, fileId)); records.add(SchemaTestUtil.generateAvroRecordFromJson(schema, i, commitTime, fileId));
} }
return records; return records;
} }
public static GenericRecord generateAvroRecord(Schema schema, int recordNumber,
String commitTime, String fileId) {
GenericRecord record = new GenericData.Record(schema);
record.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitTime);
record.put("field1", "field" + recordNumber);
record.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, "key_" + recordNumber);
record.put("field2", "field" + recordNumber);
record.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, commitTime);
record.put(HoodieRecord.FILENAME_METADATA_FIELD, fileId);
record.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, commitTime + "_" + recordNumber);
return record;
}
public static GenericRecord generateAvroRecordFromJson(Schema schema, int recordNumber,
String commitTime, String fileId) throws IOException {
TestRecord record = new TestRecord(commitTime, recordNumber, fileId);
MercifulJsonConverter converter = new MercifulJsonConverter(schema);
return converter.convert(record.toJsonString());
}
public static void simulateParquetUpdates(File directory, Schema schema, String originalCommit, public static void simulateParquetUpdates(File directory, Schema schema, String originalCommit,
int totalNumberOfRecords, int numberOfRecordsToUpdate, int totalNumberOfRecords, int numberOfRecordsToUpdate,
String newCommit) throws IOException { String newCommit) throws IOException {

View File

@@ -78,7 +78,7 @@ public class HoodieRealtimeRecordReaderTest {
.overBaseCommit(baseCommit).withFs(FSUtils.getFs()).build(); .overBaseCommit(baseCommit).withFs(FSUtils.getFs()).build();
List<IndexedRecord> records = new ArrayList<>(); List<IndexedRecord> records = new ArrayList<>();
for(int i=0; i < numberOfRecords; i++) { for(int i=0; i < numberOfRecords; i++) {
records.add(InputFormatTestUtil.generateAvroRecordFromJson(schema, i, newCommit, "fileid0")); records.add(SchemaTestUtil.generateAvroRecordFromJson(schema, i, newCommit, "fileid0"));
} }
Schema writeSchema = records.get(0).getSchema(); Schema writeSchema = records.get(0).getSchema();
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, writeSchema); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, writeSchema);

View File

@@ -59,7 +59,8 @@ public class DataSourceUtils {
if (i == parts.length - 1) { if (i == parts.length - 1) {
return val.toString(); return val.toString();
} else { } else {
if (val instanceof GenericRecord) { // VC: Need a test here
if (!(val instanceof GenericRecord)) {
throw new HoodieException("Cannot find a record at part value :" + part); throw new HoodieException("Cannot find a record at part value :" + part);
} }
valueNode = (GenericRecord) val; valueNode = (GenericRecord) val;
@@ -80,7 +81,7 @@ public class DataSourceUtils {
} }
/** /**
* Create a payload class via reflection, passing in an ordering/precombine value value. * Create a payload class via reflection, passing in an ordering/precombine value.
*/ */
public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal) throws IOException { public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal) throws IOException {
try { try {

View File

@@ -49,7 +49,6 @@ public class HoodieDataSourceHelpers {
* Get a list of instant times that have occurred, from the given instant timestamp. * Get a list of instant times that have occurred, from the given instant timestamp.
* *
* @param instantTimestamp * @param instantTimestamp
* @return
*/ */
public static List<String> listCommitsSince(FileSystem fs, String basePath, String instantTimestamp) { public static List<String> listCommitsSince(FileSystem fs, String basePath, String instantTimestamp) {
HoodieTimeline timeline = allCompletedCommitsCompactions(fs, basePath); HoodieTimeline timeline = allCompletedCommitsCompactions(fs, basePath);
@@ -71,7 +70,6 @@ public class HoodieDataSourceHelpers {
* *
* @param fs * @param fs
* @param basePath * @param basePath
* @return
*/ */
public static HoodieTimeline allCompletedCommitsCompactions(FileSystem fs, String basePath) { public static HoodieTimeline allCompletedCommitsCompactions(FileSystem fs, String basePath) {
HoodieTable table = HoodieTable HoodieTable table = HoodieTable

View File

@@ -38,9 +38,9 @@ object DataSourceReadOptions {
* Default: READ_OPTIMIZED * Default: READ_OPTIMIZED
*/ */
val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type" val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type"
val VIEW_TYPE_READ_OPTIMIZED_OPT_VAL = "READ_OPTIMIZED" val VIEW_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized"
val VIEW_TYPE_INCREMENTAL_OPT_VAL = "INCREMENTAL" val VIEW_TYPE_INCREMENTAL_OPT_VAL = "incremental"
val VIEW_TYPE_REALTIME_OPT_VAL = "REALTIME" val VIEW_TYPE_REALTIME_OPT_VAL = "realtime"
val DEFAULT_VIEW_TYPE_OPT_VAL = VIEW_TYPE_READ_OPTIMIZED_OPT_VAL val DEFAULT_VIEW_TYPE_OPT_VAL = VIEW_TYPE_READ_OPTIMIZED_OPT_VAL
@@ -82,6 +82,7 @@ object DataSourceWriteOptions {
/** /**
* The storage type for the underlying data, for this write. * The storage type for the underlying data, for this write.
* Note that this can't change across writes.
* *
* Default: COPY_ON_WRITE * Default: COPY_ON_WRITE
*/ */

View File

@@ -66,9 +66,6 @@ public class HoodieJavaApp {
cli.run(); cli.run();
} }
public void run() throws Exception { public void run() throws Exception {
// Spark session setup.. // Spark session setup..
@@ -103,6 +100,7 @@ public class HoodieJavaApp {
.mode(SaveMode.Overwrite) // This will remove any existing data at path below, and create a new dataset if needed .mode(SaveMode.Overwrite) // This will remove any existing data at path below, and create a new dataset if needed
.save(tablePath); // ultimately where the dataset will be placed .save(tablePath); // ultimately where the dataset will be placed
String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
logger.info("First commit at instant time :" + commitInstantTime1);
/** /**
* Commit that updates records * Commit that updates records
@@ -120,6 +118,7 @@ public class HoodieJavaApp {
.mode(SaveMode.Append) .mode(SaveMode.Append)
.save(tablePath); .save(tablePath);
String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
// Report the instant created by the second (update) write, not the first one.
logger.info("Second commit at instant time :" + commitInstantTime2);
/** /**
* Read & do some queries * Read & do some queries
@@ -142,7 +141,7 @@ public class HoodieJavaApp {
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), commitInstantTime1) // Only changes in write 2 above .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), commitInstantTime1) // Only changes in write 2 above
.load(tablePath); // For incremental view, pass in the root/base path of dataset .load(tablePath); // For incremental view, pass in the root/base path of dataset
System.out.println("You will only see records from : " + commitInstantTime2); logger.info("You will only see records from : " + commitInstantTime2);
hoodieIncViewDF.groupBy(hoodieIncViewDF.col("_hoodie_commit_time")).count().show(); hoodieIncViewDF.groupBy(hoodieIncViewDF.col("_hoodie_commit_time")).count().show();
} }
} }

View File

@@ -0,0 +1,110 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
import com.uber.hoodie.{DataSourceWriteOptions, OverwriteWithLatestAvroPayload, SimpleKeyGenerator}
import com.uber.hoodie.common.util.SchemaTestUtil
import com.uber.hoodie.exception.HoodieException
import org.apache.avro.generic.GenericRecord
import org.apache.commons.configuration.PropertiesConfiguration
import org.junit.Assert._
import org.junit.{Before, Test}
import org.scalatest.junit.AssertionsForJUnit
/**
  * Tests on the default key generator, payload classes.
  *
  * Every test starts from `baseRecord`, a record generated from the complex evolved
  * test schema with record number 1, commit time "001" and file id "f1".
  */
class DataSourceDefaultsTest extends AssertionsForJUnit {

  val schema = SchemaTestUtil.getComplexEvolvedSchema

  // Rebuilt by initialize() before each test, hence a mutable JUnit-style fixture.
  var baseRecord: GenericRecord = null

  @Before def initialize(): Unit = {
    baseRecord = SchemaTestUtil.generateAvroRecordFromJson(schema, 1, "001", "f1")
  }

  /**
    * Builds a key generator configuration.
    *
    * @param recordKeyFieldName value stored under RECORDKEY_FIELD_OPT_KEY
    * @param partitionPathField value stored under PARTITIONPATH_FIELD_OPT_KEY
    * @return a configuration holding exactly these two properties
    */
  private def getKeyConfig(recordKeyFieldName: String, partitionPathField: String): PropertiesConfiguration = {
    val props = new PropertiesConfiguration()
    props.addProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyFieldName)
    props.addProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionPathField)
    props
  }

  /**
    * Evaluates `body` and fails the test unless it throws a HoodieException.
    * Any other exception type is left to propagate and fail the test.
    */
  private def expectHoodieException(body: => Unit): Unit = {
    try {
      body
      fail("Should have errored out")
    } catch {
      case _: HoodieException => // expected, nothing to do
    }
  }

  @Test def testSimpleKeyGenerator(): Unit = {
    // top level, valid fields
    val hk1 = new SimpleKeyGenerator(getKeyConfig("field1", "name")).getKey(baseRecord)
    assertEquals("field1", hk1.getRecordKey)
    assertEquals("name1", hk1.getPartitionPath)

    // partition path field not specified (only the record key field is configured)
    expectHoodieException {
      val props = new PropertiesConfiguration()
      props.addProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "field1")
      new SimpleKeyGenerator(props).getKey(baseRecord)
    }

    // partitionPath field is null
    expectHoodieException {
      new SimpleKeyGenerator(getKeyConfig("field1", null)).getKey(baseRecord)
    }

    // nested field as record key and partition path
    val hk2 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.isAdmin"))
      .getKey(baseRecord)
    assertEquals("UserId1@001", hk2.getRecordKey)
    assertEquals("false", hk2.getPartitionPath)

    // Nested record key not found
    expectHoodieException {
      new SimpleKeyGenerator(getKeyConfig("testNestedRecord.NotThere", "testNestedRecord.isAdmin"))
        .getKey(baseRecord)
    }
  }

  @Test def testOverwriteWithLatestAvroPayload(): Unit = {
    val overWritePayload1 = new OverwriteWithLatestAvroPayload(baseRecord, 1)
    val laterRecord = SchemaTestUtil.generateAvroRecordFromJson(schema, 2, "001", "f1")
    val overWritePayload2 = new OverwriteWithLatestAvroPayload(laterRecord, 2)

    // it will provide the record with greatest combine value
    val combinedPayload12 = overWritePayload1.preCombine(overWritePayload2)
    val combinedGR12 = combinedPayload12.getInsertValue(schema).get().asInstanceOf[GenericRecord]
    assertEquals("field2", combinedGR12.get("field1"))

    // and the outcome does not depend on the order in which the payloads are combined
    val combinedPayload21 = overWritePayload2.preCombine(overWritePayload1)
    val combinedGR21 = combinedPayload21.getInsertValue(schema).get().asInstanceOf[GenericRecord]
    assertEquals("field2", combinedGR21.get("field1"))
  }
}

View File

@@ -21,7 +21,7 @@ import com.uber.hoodie.common.util.FSUtils
import com.uber.hoodie.config.HoodieWriteConfig import com.uber.hoodie.config.HoodieWriteConfig
import com.uber.hoodie.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} import com.uber.hoodie.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} import org.apache.spark.sql._
import org.junit.Assert._ import org.junit.Assert._
import org.junit.{Before, Test} import org.junit.{Before, Test}
import org.junit.rules.TemporaryFolder import org.junit.rules.TemporaryFolder
@@ -127,9 +127,9 @@ class DataSourceTest extends AssertionsForJUnit {
try { try {
val hoodieROViewDF1 = spark.read.format("com.uber.hoodie") val hoodieROViewDF1 = spark.read.format("com.uber.hoodie")
.load(basePath + "/*/*/*/*") .load(basePath + "/*/*/*/*")
fail() // we would error out, since no compaction has yet occurred. fail("we should error out, since no compaction has yet occurred.")
} catch { } catch {
case e: Exception => { case e: AnalysisException => {
// do nothing // do nothing
} }
}; };