
[HUDI-288]: Add support for ingesting multiple kafka streams in a single DeltaStreamer deployment (#1150)

* [HUDI-288]: Add support for ingesting multiple kafka streams in a single DeltaStreamer deployment
Author: Pratyaksh Sharma
Date: 2020-04-08 04:40:26 +05:30
Committed by: GitHub
Parent: b5d093a21b
Commit: d610252d6b
26 changed files with 1184 additions and 78 deletions
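
The HoodieMultiTableDeltaStreamer added by this commit discovers tables through a common properties file plus a per-table config folder. Below is a minimal sketch of the keys involved; the property names come from the Constants class later in this diff, while the database/table names and paths (db1.table1, file:///configs/...) are purely illustrative:

import org.apache.hudi.common.config.TypedProperties;

// Common props consumed by HoodieMultiTableDeltaStreamer (values are illustrative).
TypedProperties commonProps = new TypedProperties();
// Comma-separated <database>.<table> entries to ingest.
commonProps.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "db1.table1,db1.table2");
// Optional per-table config file override; otherwise
// <configFolder>/db1_table1_config.properties is assumed.
commonProps.setProperty("hoodie.deltastreamer.ingestion.db1.table1.configFile",
    "file:///configs/db1_table1_config.properties");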

View File

@@ -292,7 +292,8 @@ public class HoodieCommitArchiveLog {
        archivedMetaWrapper.setActionType(ActionType.clean.name());
        break;
      }
-     case HoodieTimeline.COMMIT_ACTION: {
+     case HoodieTimeline.COMMIT_ACTION:
+     case HoodieTimeline.DELTA_COMMIT_ACTION: {
        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
            .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);
        archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
@@ -311,13 +312,6 @@ public class HoodieCommitArchiveLog {
        archivedMetaWrapper.setActionType(ActionType.savepoint.name());
        break;
      }
-     case HoodieTimeline.DELTA_COMMIT_ACTION: {
-       HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-           .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);
-       archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
-       archivedMetaWrapper.setActionType(ActionType.commit.name());
-       break;
-     }
      case HoodieTimeline.COMPACTION_ACTION: {
        HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, hoodieInstant.getTimestamp());
        archivedMetaWrapper.setHoodieCompactionPlan(plan);

View File

@@ -68,7 +68,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
    cleanupTestDataGenerator();
  }

- private HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig config) throws Exception {
+ private HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig config) {
    return new HoodieWriteClient(jsc, config);
  }
@@ -89,7 +89,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
    HoodieWriteConfig localConfig = getHoodieWriteConfig(tablePath);
    try (HoodieWriteClient hdfsWriteClient = getHoodieWriteClient(cfg);
-       HoodieWriteClient localWriteClient = getHoodieWriteClient(localConfig);) {
+       HoodieWriteClient localWriteClient = getHoodieWriteClient(localConfig)) {

      // Write generated data to hdfs (only inserts)
      String readCommitTime = hdfsWriteClient.startCommit();

View File

@@ -41,6 +41,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.Serializable;
@@ -69,6 +71,7 @@ public class HoodieTestDataGenerator {
  // based on examination of sample file, the schema produces the following per record size
  public static final int SIZE_PER_RECORD = 50 * 1024;
+ private static Logger logger = LogManager.getLogger(HoodieTestDataGenerator.class);
  public static final String DEFAULT_FIRST_PARTITION_PATH = "2016/03/15";
  public static final String DEFAULT_SECOND_PARTITION_PATH = "2015/03/16";
  public static final String DEFAULT_THIRD_PARTITION_PATH = "2015/03/17";
@@ -89,12 +92,18 @@ public class HoodieTestDataGenerator {
  public static final String TIP_NESTED_SCHEMA = "{\"name\": \"tip_history\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"record\", \"name\": \"tip_history\", \"fields\": ["
      + "{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},";
  public static final String MAP_TYPE_SCHEMA = "{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},";

  public static final String TRIP_EXAMPLE_SCHEMA =
      TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
  public static final String TRIP_FLATTENED_SCHEMA =
      TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX;
+ public static final String TRIP_SCHEMA = "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":["
+     + "{\"name\":\"timestamp\",\"type\":\"double\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},"
+     + "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
+ public static final String SHORT_TRIP_SCHEMA = "{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":["
+     + "{\"name\":\"timestamp\",\"type\":\"double\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},"
+     + "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";

  public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
  public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,"
      + "map<string,string>,struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
@@ -102,13 +111,17 @@ public class HoodieTestDataGenerator {
  public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
  public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS =
      HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
+ public static final Schema AVRO_SHORT_TRIP_SCHEMA = new Schema.Parser().parse(SHORT_TRIP_SCHEMA);
+ public static final Schema AVRO_TRIP_SCHEMA = new Schema.Parser().parse(TRIP_SCHEMA);
  public static final Schema FLATTENED_AVRO_SCHEMA = new Schema.Parser().parse(TRIP_FLATTENED_SCHEMA);

  private static final Random RAND = new Random(46474747);

- private final Map<Integer, KeyPartition> existingKeys;
+ // Maintains all the existing keys schema wise
+ private final Map<String, Map<Integer, KeyPartition>> existingKeysBySchema;
  private final String[] partitionPaths;
- private int numExistingKeys;
+ // Maintains the count of existing keys schema wise
+ private Map<String, Integer> numKeysBySchema;

  public HoodieTestDataGenerator(String[] partitionPaths) {
    this(partitionPaths, new HashMap<>());
@@ -120,7 +133,9 @@ public class HoodieTestDataGenerator {
  public HoodieTestDataGenerator(String[] partitionPaths, Map<Integer, KeyPartition> keyPartitionMap) {
    this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length);
-   this.existingKeys = keyPartitionMap;
+   this.existingKeysBySchema = new HashMap<>();
+   existingKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap);
+   numKeysBySchema = new HashMap<>();
  }

  public static void writePartitionMetadata(FileSystem fs, String[] partitionPaths, String basePath) {
@@ -129,6 +144,18 @@ public class HoodieTestDataGenerator {
    }
  }

+ public TestRawTripPayload generateRandomValueAsPerSchema(String schemaStr, HoodieKey key, String commitTime, boolean isFlattened) throws IOException {
+   if (TRIP_EXAMPLE_SCHEMA.equals(schemaStr)) {
+     return generateRandomValue(key, commitTime, isFlattened);
+   } else if (TRIP_SCHEMA.equals(schemaStr)) {
+     return generatePayloadForTripSchema(key, commitTime);
+   } else if (SHORT_TRIP_SCHEMA.equals(schemaStr)) {
+     return generatePayloadForShortTripSchema(key, commitTime);
+   }
+   return null;
+ }

  /**
   * Generates a new avro record of the above nested schema format,
   * retaining the key if optionally provided.
@@ -160,6 +187,19 @@ public class HoodieTestDataGenerator {
    return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
  }

+ /**
+  * Generates a new avro record with TRIP_SCHEMA, retaining the key if optionally provided.
+  */
+ public TestRawTripPayload generatePayloadForTripSchema(HoodieKey key, String commitTime) throws IOException {
+   GenericRecord rec = generateRecordForTripSchema(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0);
+   return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_SCHEMA);
+ }
+
+ public TestRawTripPayload generatePayloadForShortTripSchema(HoodieKey key, String commitTime) throws IOException {
+   GenericRecord rec = generateRecordForShortTripSchema(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0);
+   return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), SHORT_TRIP_SCHEMA);
+ }

  /**
   * Generates a new avro record of the above schema format for a delete.
   */
@@ -223,6 +263,31 @@ public class HoodieTestDataGenerator {
    return rec;
  }

+ /*
+  * Generate random record using TRIP_SCHEMA
+  */
+ public GenericRecord generateRecordForTripSchema(String rowKey, String riderName, String driverName, double timestamp) {
+   GenericRecord rec = new GenericData.Record(AVRO_TRIP_SCHEMA);
+   rec.put("_row_key", rowKey);
+   rec.put("timestamp", timestamp);
+   rec.put("rider", riderName);
+   rec.put("driver", driverName);
+   rec.put("fare", RAND.nextDouble() * 100);
+   rec.put("_hoodie_is_deleted", false);
+   return rec;
+ }
+
+ public GenericRecord generateRecordForShortTripSchema(String rowKey, String riderName, String driverName, double timestamp) {
+   GenericRecord rec = new GenericData.Record(AVRO_SHORT_TRIP_SCHEMA);
+   rec.put("_row_key", rowKey);
+   rec.put("timestamp", timestamp);
+   rec.put("rider", riderName);
+   rec.put("driver", driverName);
+   rec.put("fare", RAND.nextDouble() * 100);
+   rec.put("_hoodie_is_deleted", false);
+   return rec;
+ }

  public static void createCommitFile(String basePath, String instantTime, Configuration configuration) {
    Arrays.asList(HoodieTimeline.makeCommitFileName(instantTime), HoodieTimeline.makeInflightCommitFileName(instantTime),
        HoodieTimeline.makeRequestedCommitFileName(instantTime))
@@ -283,6 +348,10 @@ public class HoodieTestDataGenerator {
    }
  }

+ public List<HoodieRecord> generateInsertsAsPerSchema(String commitTime, Integer n, String schemaStr) {
+   return generateInsertsStream(commitTime, n, schemaStr).collect(Collectors.toList());
+ }

  /**
   * Generates new inserts with nested schema, uniformly across the partition paths above.
   * It also updates the list of existing keys.
@@ -301,15 +370,22 @@ public class HoodieTestDataGenerator {
   * @return List of {@link HoodieRecord}s
   */
  public List<HoodieRecord> generateInserts(String instantTime, Integer n, boolean isFlattened) {
-   return generateInsertsStream(instantTime, n, isFlattened).collect(Collectors.toList());
+   return generateInsertsStream(instantTime, n, isFlattened, TRIP_EXAMPLE_SCHEMA).collect(Collectors.toList());
+ }
+
+ /**
+  * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
+  */
+ public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer n, String schemaStr) {
+   return generateInsertsStream(commitTime, n, false, schemaStr);
  }

  /**
   * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
   */
  public Stream<HoodieRecord> generateInsertsStream(
-     String instantTime, Integer n, boolean isFlattened) {
-   int currSize = getNumExistingKeys();
+     String instantTime, Integer n, boolean isFlattened, String schemaStr) {
+   int currSize = getNumExistingKeys(schemaStr);

    return IntStream.range(0, n).boxed().map(i -> {
      String partitionPath = partitionPaths[RAND.nextInt(partitionPaths.length)];
@@ -317,16 +393,36 @@ public class HoodieTestDataGenerator {
      KeyPartition kp = new KeyPartition();
      kp.key = key;
      kp.partitionPath = partitionPath;
-     existingKeys.put(currSize + i, kp);
-     numExistingKeys++;
+     populateKeysBySchema(schemaStr, currSize + i, kp);
+     incrementNumExistingKeysBySchema(schemaStr);
      try {
-       return new HoodieRecord(key, generateRandomValue(key, instantTime, isFlattened));
+       return new HoodieRecord(key, generateRandomValueAsPerSchema(schemaStr, key, instantTime, isFlattened));
      } catch (IOException e) {
        throw new HoodieIOException(e.getMessage(), e);
      }
    });
  }

+ /*
+  * Takes care of populating keys schema wise
+  */
+ private void populateKeysBySchema(String schemaStr, int i, KeyPartition kp) {
+   if (existingKeysBySchema.containsKey(schemaStr)) {
+     existingKeysBySchema.get(schemaStr).put(i, kp);
+   } else {
+     existingKeysBySchema.put(schemaStr, new HashMap<>());
+     existingKeysBySchema.get(schemaStr).put(i, kp);
+   }
+ }
+
+ private void incrementNumExistingKeysBySchema(String schemaStr) {
+   if (numKeysBySchema.containsKey(schemaStr)) {
+     numKeysBySchema.put(schemaStr, numKeysBySchema.get(schemaStr) + 1);
+   } else {
+     numKeysBySchema.put(schemaStr, 1);
+   }
+ }

  public List<HoodieRecord> generateSameKeyInserts(String instantTime, List<HoodieRecord> origin) throws IOException {
    List<HoodieRecord> copy = new ArrayList<>();
    for (HoodieRecord r : origin) {
@@ -339,7 +435,7 @@ public class HoodieTestDataGenerator {
  public List<HoodieRecord> generateInsertsWithHoodieAvroPayload(String instantTime, int limit) {
    List<HoodieRecord> inserts = new ArrayList<>();
-   int currSize = getNumExistingKeys();
+   int currSize = getNumExistingKeys(TRIP_EXAMPLE_SCHEMA);
    for (int i = 0; i < limit; i++) {
      String partitionPath = partitionPaths[RAND.nextInt(partitionPaths.length)];
      HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
@@ -349,8 +445,8 @@ public class HoodieTestDataGenerator {
      KeyPartition kp = new KeyPartition();
      kp.key = key;
      kp.partitionPath = partitionPath;
-     existingKeys.put(currSize + i, kp);
-     numExistingKeys++;
+     populateKeysBySchema(TRIP_EXAMPLE_SCHEMA, currSize + i, kp);
+     incrementNumExistingKeysBySchema(TRIP_EXAMPLE_SCHEMA);
    }
    return inserts;
  }
@@ -431,6 +527,8 @@ public class HoodieTestDataGenerator {
  public List<HoodieRecord> generateUpdates(String instantTime, Integer n) throws IOException {
    List<HoodieRecord> updates = new ArrayList<>();
    for (int i = 0; i < n; i++) {
+     Map<Integer, KeyPartition> existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
+     Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
      KeyPartition kp = existingKeys.get(RAND.nextInt(numExistingKeys - 1));
      HoodieRecord record = generateUpdateRecord(kp.key, instantTime);
      updates.add(record);
@@ -438,6 +536,10 @@ public class HoodieTestDataGenerator {
    return updates;
  }

+ public List<HoodieRecord> generateUpdatesAsPerSchema(String commitTime, Integer n, String schemaStr) {
+   return generateUniqueUpdatesStream(commitTime, n, schemaStr).collect(Collectors.toList());
+ }

  /**
   * Generates deduped updates of keys previously inserted, randomly distributed across the keys above.
   *
@@ -446,7 +548,7 @@ public class HoodieTestDataGenerator {
   * @return list of hoodie record updates
   */
  public List<HoodieRecord> generateUniqueUpdates(String instantTime, Integer n) {
-   return generateUniqueUpdatesStream(instantTime, n).collect(Collectors.toList());
+   return generateUniqueUpdatesStream(instantTime, n, TRIP_EXAMPLE_SCHEMA).collect(Collectors.toList());
  }

  /**
@@ -466,8 +568,10 @@ public class HoodieTestDataGenerator {
   * @param n Number of unique records
   * @return stream of hoodie record updates
   */
- public Stream<HoodieRecord> generateUniqueUpdatesStream(String instantTime, Integer n) {
+ public Stream<HoodieRecord> generateUniqueUpdatesStream(String instantTime, Integer n, String schemaStr) {
    final Set<KeyPartition> used = new HashSet<>();
+   int numExistingKeys = numKeysBySchema.getOrDefault(schemaStr, 0);
+   Map<Integer, KeyPartition> existingKeys = existingKeysBySchema.get(schemaStr);
    if (n > numExistingKeys) {
      throw new IllegalArgumentException("Requested unique updates is greater than number of available keys");
    }
@@ -480,9 +584,10 @@ public class HoodieTestDataGenerator {
        index = (index + 1) % numExistingKeys;
        kp = existingKeys.get(index);
      }
+     logger.debug("key getting updated: " + kp.key.getRecordKey());
      used.add(kp);
      try {
-       return new HoodieRecord(kp.key, generateRandomValue(kp.key, instantTime));
+       return new HoodieRecord(kp.key, generateRandomValueAsPerSchema(schemaStr, kp.key, instantTime, false));
      } catch (IOException e) {
        throw new HoodieIOException(e.getMessage(), e);
      }
@@ -497,6 +602,8 @@ public class HoodieTestDataGenerator {
   */
  public Stream<HoodieKey> generateUniqueDeleteStream(Integer n) {
    final Set<KeyPartition> used = new HashSet<>();
+   Map<Integer, KeyPartition> existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
+   Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
    if (n > numExistingKeys) {
      throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys");
    }
@@ -514,6 +621,7 @@ public class HoodieTestDataGenerator {
      used.add(kp);
      result.add(kp.key);
    }
+   numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, numExistingKeys);
    return result.stream();
  }
@@ -526,6 +634,8 @@ public class HoodieTestDataGenerator {
   */
  public Stream<HoodieRecord> generateUniqueDeleteRecordStream(String instantTime, Integer n) {
    final Set<KeyPartition> used = new HashSet<>();
+   Map<Integer, KeyPartition> existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
+   Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
    if (n > numExistingKeys) {
      throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys");
    }
@@ -536,7 +646,7 @@ public class HoodieTestDataGenerator {
      while (!existingKeys.containsKey(index)) {
        index = (index + 1) % numExistingKeys;
      }
      // swap chosen index with last index and remove last entry.
      KeyPartition kp = existingKeys.remove(index);
      existingKeys.put(index, existingKeys.get(numExistingKeys - 1));
      existingKeys.remove(numExistingKeys - 1);
@@ -548,6 +658,7 @@ public class HoodieTestDataGenerator {
        throw new HoodieIOException(e.getMessage(), e);
      }
    }
+   numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, numExistingKeys);
    return result.stream();
  }
@@ -555,8 +666,8 @@ public class HoodieTestDataGenerator {
    return partitionPaths;
  }

- public int getNumExistingKeys() {
-   return numExistingKeys;
+ public int getNumExistingKeys(String schemaStr) {
+   return numKeysBySchema.getOrDefault(schemaStr, 0);
  }

  public static class KeyPartition implements Serializable {
@@ -566,6 +677,6 @@ public class HoodieTestDataGenerator {
  }

  public void close() {
-   existingKeys.clear();
+   existingKeysBySchema.clear();
  }
}
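
With keys and counts now bucketed per schema string, a single HoodieTestDataGenerator instance can feed several simulated tables at once. A short usage sketch of the schema-aware methods added above (commit times and counts are illustrative):

// Inserts/updates tracked under SHORT_TRIP_SCHEMA, independent of TRIP_EXAMPLE_SCHEMA keys.
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(
    new String[] {HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH});
List<HoodieRecord> inserts = dataGen.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA);
List<HoodieRecord> updates = dataGen.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA);
int numKeys = dataGen.getNumExistingKeys(HoodieTestDataGenerator.SHORT_TRIP_SCHEMA); // 10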

View File

@@ -31,6 +31,7 @@ import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.joda.time.DateTime;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -70,6 +71,11 @@ public class TestHiveSyncTool {
    TestUtil.clear();
  }

+ @AfterClass
+ public static void cleanUpClass() {
+   TestUtil.shutdown();
+ }

  /**
   * Testing converting array types to Hive field declaration strings. According to the Parquet-113 spec:
   * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists

View File

@@ -83,6 +83,7 @@ public class TestUtil {
  private static MiniDFSCluster dfsCluster;
  private static ZooKeeperServer zkServer;
  private static HiveServer2 hiveServer;
+ private static HiveTestService hiveTestService;
  private static Configuration configuration;
  static HiveSyncConfig hiveSyncConfig;
  private static DateTimeFormatter dtfOut;
@@ -100,8 +101,8 @@ public class TestUtil {
      zkServer = zkService.start();
    }
    if (hiveServer == null) {
-     HiveTestService hiveService = new HiveTestService(configuration);
-     hiveServer = hiveService.start();
+     hiveTestService = new HiveTestService(configuration);
+     hiveServer = hiveTestService.start();
    }
    fileSystem = FileSystem.get(configuration);
@@ -139,11 +140,13 @@ public class TestUtil {
    return hiveServer.getHiveConf();
  }

- @SuppressWarnings("unused")
  public static void shutdown() {
    if (hiveServer != null) {
      hiveServer.stop();
    }
+   if (hiveTestService != null) {
+     hiveTestService.stop();
+   }
    if (dfsCluster != null) {
      dfsCluster.shutdown();
    }

View File

@@ -121,6 +121,20 @@ public class HiveTestService {
    return hiveServer;
  }

+ public void stop() {
+   resetSystemProperties();
+   if (tServer != null) {
+     tServer.stop();
+   }
+   if (hiveServer != null) {
+     hiveServer.stop();
+   }
+   LOG.info("Hive Minicluster service shut down.");
+   tServer = null;
+   hiveServer = null;
+   hadoopConf = null;
+ }

  private HiveConf configureHive(Configuration conf, String localHiveLocation) throws IOException {
    conf.set("hive.metastore.local", "false");
    conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://" + bindIP + ":" + metastorePort);
@@ -183,6 +197,17 @@ public class HiveTestService {
    }
  }

+ private void resetSystemProperties() {
+   for (Map.Entry<String, String> entry : sysProps.entrySet()) {
+     if (entry.getValue() != null) {
+       System.setProperty(entry.getKey(), entry.getValue());
+     } else {
+       System.getProperties().remove(entry.getKey());
+     }
+   }
+   sysProps.clear();
+ }

  private static String getHiveLocation(String baseLocation) {
    return baseLocation + Path.SEPARATOR + "hive";
  }
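
The new stop() makes the Hive mini-cluster restartable within a single JVM: it shuts down the thrift metastore server and HiveServer2, then restores whatever system properties start() had overridden. A lifecycle sketch, assuming a Hadoop Configuration is at hand:

HiveTestService hiveService = new HiveTestService(configuration);
HiveServer2 server = hiveService.start();
try {
  // exercise hive-sync tests against thrift://<bindIP>:<metastorePort>
} finally {
  hiveService.stop(); // tears down both servers and resets the saved system properties
}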

View File

@@ -121,14 +121,14 @@ public class ITTestHoodieDemo extends ITTestBase {
+ " --table-type COPY_ON_WRITE " + " --table-type COPY_ON_WRITE "
+ " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts " + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
+ " --target-base-path " + COW_BASE_PATH + " --target-table " + COW_TABLE_NAME + " --target-base-path " + COW_BASE_PATH + " --target-table " + COW_TABLE_NAME
+ " --props /var/demo/config/dfs-source.properties " + " --props /var/demo/config/dfs-source.properties"
+ " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider " + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
+ String.format(HIVE_SYNC_CMD_FMT, "dt", COW_TABLE_NAME), + String.format(HIVE_SYNC_CMD_FMT, "dt", COW_TABLE_NAME),
("spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE ("spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE
+ " --table-type MERGE_ON_READ " + " --table-type MERGE_ON_READ "
+ " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts " + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
+ " --target-base-path " + MOR_BASE_PATH + " --target-table " + MOR_TABLE_NAME + " --target-base-path " + MOR_BASE_PATH + " --target-table " + MOR_TABLE_NAME
+ " --props /var/demo/config/dfs-source.properties " + " --props /var/demo/config/dfs-source.properties"
+ " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider " + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
+ " --disable-compaction " + String.format(HIVE_SYNC_CMD_FMT, "dt", MOR_TABLE_NAME))); + " --disable-compaction " + String.format(HIVE_SYNC_CMD_FMT, "dt", MOR_TABLE_NAME)));
@@ -173,14 +173,14 @@ public class ITTestHoodieDemo extends ITTestBase {
+ " --table-type COPY_ON_WRITE " + " --table-type COPY_ON_WRITE "
+ " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts " + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
+ " --target-base-path " + COW_BASE_PATH + " --target-table " + COW_TABLE_NAME + " --target-base-path " + COW_BASE_PATH + " --target-table " + COW_TABLE_NAME
+ " --props /var/demo/config/dfs-source.properties " + " --props /var/demo/config/dfs-source.properties"
+ " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider " + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
+ String.format(HIVE_SYNC_CMD_FMT, "dt", COW_TABLE_NAME)), + String.format(HIVE_SYNC_CMD_FMT, "dt", COW_TABLE_NAME)),
("spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE ("spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE
+ " --table-type MERGE_ON_READ " + " --table-type MERGE_ON_READ "
+ " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts " + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
+ " --target-base-path " + MOR_BASE_PATH + " --target-table " + MOR_TABLE_NAME + " --target-base-path " + MOR_BASE_PATH + " --target-table " + MOR_TABLE_NAME
+ " --props /var/demo/config/dfs-source.properties " + " --props /var/demo/config/dfs-source.properties"
+ " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider " + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
+ " --disable-compaction " + String.format(HIVE_SYNC_CMD_FMT, "dt", MOR_TABLE_NAME))); + " --disable-compaction " + String.format(HIVE_SYNC_CMD_FMT, "dt", MOR_TABLE_NAME)));
executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds); executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds);

View File

@@ -262,6 +262,7 @@ object DataSourceWriteOptions {
  val HIVE_URL_OPT_KEY = "hoodie.datasource.hive_sync.jdbcurl"
  val HIVE_PARTITION_FIELDS_OPT_KEY = "hoodie.datasource.hive_sync.partition_fields"
  val HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY = "hoodie.datasource.hive_sync.partition_extractor_class"
+ val HIVE_ASSUME_DATE_PARTITION_OPT_KEY = "hoodie.datasource.hive_sync.assume_date_partitioning"
  val HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY = "hoodie.datasource.hive_sync.use_pre_apache_input_format"

  // DEFAULT FOR HIVE SPECIFIC CONFIGS

View File

@@ -26,7 +26,6 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -153,20 +152,14 @@ public class DeltaSync implements Serializable {
   */
  private transient HoodieWriteClient writeClient;

- /**
-  * Table Type.
-  */
- private final HoodieTableType tableType;

  public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
-     HoodieTableType tableType, TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
+     TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
      Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {

    this.cfg = cfg;
    this.jssc = jssc;
    this.sparkSession = sparkSession;
    this.fs = fs;
-   this.tableType = tableType;
    this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
    this.props = props;
    this.schemaProvider = schemaProvider;

View File

@@ -93,6 +93,17 @@ public class HoodieDeltaStreamer implements Serializable {
        getDefaultHiveConf(jssc.hadoopConfiguration()));
  }

+ public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, TypedProperties props) throws IOException {
+   this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()),
+       getDefaultHiveConf(jssc.hadoopConfiguration()), props);
+ }
+
+ public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
+     TypedProperties properties) throws IOException {
+   this.cfg = cfg;
+   this.deltaSyncService = new DeltaSyncService(cfg, jssc, fs, hiveConf, properties);
+ }

  public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf) throws IOException {
    this.cfg = cfg;
    this.deltaSyncService = new DeltaSyncService(cfg, jssc, fs, hiveConf);
@@ -142,7 +153,7 @@ public class HoodieDeltaStreamer implements Serializable {
    UPSERT, INSERT, BULK_INSERT
  }

- private static class OperationConvertor implements IStringConverter<Operation> {
+ protected static class OperationConvertor implements IStringConverter<Operation> {

    @Override
    public Operation convert(String value) throws ParameterException {
@@ -150,7 +161,7 @@ public class HoodieDeltaStreamer implements Serializable {
    }
  }

- private static class TransformersConverter implements IStringConverter<List<String>> {
+ protected static class TransformersConverter implements IStringConverter<List<String>> {

    @Override
    public List<String> convert(String value) throws ParameterException {
@@ -169,6 +180,7 @@ public class HoodieDeltaStreamer implements Serializable {
        required = true)
    public String targetBasePath;

+   // TODO: How to obtain hive configs to register?
    @Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true)
    public String targetTableName;
@@ -359,8 +371,8 @@ public class HoodieDeltaStreamer implements Serializable {
     */
    private transient DeltaSync deltaSync;

-   public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf)
-       throws IOException {
+   public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
+       TypedProperties properties) throws IOException {
      this.cfg = cfg;
      this.jssc = jssc;
      this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate();
@@ -376,7 +388,7 @@ public class HoodieDeltaStreamer implements Serializable {
        tableType = HoodieTableType.valueOf(cfg.tableType);
      }

-     this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
+     this.props = properties != null ? properties : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
      LOG.info("Creating delta streamer with configs : " + props.toString());
      this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);
@@ -384,8 +396,14 @@ public class HoodieDeltaStreamer implements Serializable {
        cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
      }

-     deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, tableType, props, jssc, fs, hiveConf,
+     deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, hiveConf,
          this::onInitializingWriteClient);
    }

+   public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf)
+       throws IOException {
+     this(cfg, jssc, fs, hiveConf, null);
+   }
    public DeltaSync getDeltaSync() {
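
Together, the props-taking constructors and the DeltaSyncService overload let a caller inject an already-merged TypedProperties, so DeltaSyncService only falls back to reading cfg.propsFilePath when properties is null. A sketch of per-table use, mirroring what HoodieMultiTableDeltaStreamer.sync() does below; jssc and all config values here are illustrative:

HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = "/data/hudi/db1/table1";
cfg.targetTableName = "db1.table1";
cfg.tableType = "COPY_ON_WRITE";
TypedProperties tableProps = new TypedProperties();
tableProps.setProperty("hoodie.deltastreamer.source.kafka.topic", "topic1"); // hypothetical topic
new HoodieDeltaStreamer(cfg, jssc, tableProps).sync();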

View File

@@ -0,0 +1,393 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.deltastreamer;
import com.beust.jcommander.Parameter;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import com.beust.jcommander.JCommander;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.utilities.sources.JsonDFSSource;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* Wrapper over HoodieDeltaStreamer.java class.
* Helps with ingesting incremental data into hoodie datasets for multiple tables.
* Currently supports only COPY_ON_WRITE storage type.
*/
public class HoodieMultiTableDeltaStreamer {
private static Logger logger = LogManager.getLogger(HoodieMultiTableDeltaStreamer.class);
private List<TableExecutionContext> tableExecutionContexts;
private transient JavaSparkContext jssc;
private Set<String> successTables;
private Set<String> failedTables;
public HoodieMultiTableDeltaStreamer(Config config, JavaSparkContext jssc) throws IOException {
this.tableExecutionContexts = new ArrayList<>();
this.successTables = new HashSet<>();
this.failedTables = new HashSet<>();
this.jssc = jssc;
String commonPropsFile = config.propsFilePath;
String configFolder = config.configFolder;
FileSystem fs = FSUtils.getFs(commonPropsFile, jssc.hadoopConfiguration());
configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ? configFolder.substring(0, configFolder.length() - 1) : configFolder;
checkIfPropsFileAndConfigFolderExist(commonPropsFile, configFolder, fs);
TypedProperties properties = UtilHelpers.readConfig(fs, new Path(commonPropsFile), new ArrayList<>()).getConfig();
//get the tables to be ingested and their corresponding config files from this properties instance
populateTableExecutionContextList(properties, configFolder, fs, config);
}
private void checkIfPropsFileAndConfigFolderExist(String commonPropsFile, String configFolder, FileSystem fs) throws IOException {
if (!fs.exists(new Path(commonPropsFile))) {
throw new IllegalArgumentException("Please provide valid common config file path!");
}
if (!fs.exists(new Path(configFolder))) {
fs.mkdirs(new Path(configFolder));
}
}
private void checkIfTableConfigFileExists(String configFolder, FileSystem fs, String configFilePath) throws IOException {
if (!fs.exists(new Path(configFilePath)) || !fs.isFile(new Path(configFilePath))) {
throw new IllegalArgumentException("Please provide valid table config file path!");
}
Path path = new Path(configFilePath);
Path filePathInConfigFolder = new Path(configFolder, path.getName());
if (!fs.exists(filePathInConfigFolder)) {
FileUtil.copy(fs, path, fs, filePathInConfigFolder, false, fs.getConf());
}
}
//commonProps are passed as parameter which contain table to config file mapping
private void populateTableExecutionContextList(TypedProperties properties, String configFolder, FileSystem fs, Config config) throws IOException {
List<String> tablesToBeIngested = getTablesToBeIngested(properties);
logger.info("tables to be ingested via MultiTableDeltaStreamer : " + tablesToBeIngested);
TableExecutionContext executionContext;
for (String table : tablesToBeIngested) {
String[] tableWithDatabase = table.split("\\.");
String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : "default";
String currentTable = tableWithDatabase.length > 1 ? tableWithDatabase[1] : table;
String configProp = Constants.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
checkIfTableConfigFileExists(configFolder, fs, configFilePath);
TypedProperties tableProperties = UtilHelpers.readConfig(fs, new Path(configFilePath), new ArrayList<>()).getConfig();
properties.forEach((k,v) -> {
tableProperties.setProperty(k.toString(), v.toString());
});
final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
//copy all the values from config to cfg
String targetBasePath = resetTarget(config, database, currentTable);
Helpers.deepCopyConfigs(config, cfg);
String overriddenTargetBasePath = tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) ? targetBasePath : overriddenTargetBasePath;
if (cfg.enableHiveSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), ""))) {
throw new HoodieException("Hive sync table field not provided!");
}
populateSchemaProviderProps(cfg, tableProperties);
executionContext = new TableExecutionContext();
executionContext.setProperties(tableProperties);
executionContext.setConfig(cfg);
executionContext.setDatabase(database);
executionContext.setTableName(currentTable);
this.tableExecutionContexts.add(executionContext);
}
}
private List<String> getTablesToBeIngested(TypedProperties properties) {
String combinedTablesString = properties.getString(Constants.TABLES_TO_BE_INGESTED_PROP);
if (combinedTablesString == null) {
return new ArrayList<>();
}
String[] tablesArray = combinedTablesString.split(Constants.COMMA_SEPARATOR);
return Arrays.asList(tablesArray);
}
private void populateSchemaProviderProps(HoodieDeltaStreamer.Config cfg, TypedProperties typedProperties) {
if (cfg.schemaProviderClassName.equals(SchemaRegistryProvider.class.getName())) {
String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP);
String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP);
typedProperties.setProperty(Constants.SOURCE_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + schemaRegistrySuffix);
typedProperties.setProperty(Constants.TARGET_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + schemaRegistrySuffix);
}
}
public static class Helpers {
static String getDefaultConfigFilePath(String configFolder, String database, String currentTable) {
return configFolder + Constants.FILEDELIMITER + database + Constants.UNDERSCORE + currentTable + Constants.DEFAULT_CONFIG_FILE_NAME_SUFFIX;
}
static String getTableWithDatabase(TableExecutionContext context) {
return context.getDatabase() + Constants.DELIMITER + context.getTableName();
}
static void deepCopyConfigs(Config globalConfig, HoodieDeltaStreamer.Config tableConfig) {
tableConfig.enableHiveSync = globalConfig.enableHiveSync;
tableConfig.schemaProviderClassName = globalConfig.schemaProviderClassName;
tableConfig.sourceOrderingField = globalConfig.sourceOrderingField;
tableConfig.sourceClassName = globalConfig.sourceClassName;
tableConfig.tableType = globalConfig.tableType;
tableConfig.targetTableName = globalConfig.targetTableName;
tableConfig.operation = globalConfig.operation;
tableConfig.sourceLimit = globalConfig.sourceLimit;
tableConfig.checkpoint = globalConfig.checkpoint;
tableConfig.continuousMode = globalConfig.continuousMode;
tableConfig.filterDupes = globalConfig.filterDupes;
tableConfig.payloadClassName = globalConfig.payloadClassName;
tableConfig.forceDisableCompaction = globalConfig.forceDisableCompaction;
tableConfig.maxPendingCompactions = globalConfig.maxPendingCompactions;
tableConfig.minSyncIntervalSeconds = globalConfig.minSyncIntervalSeconds;
tableConfig.transformerClassNames = globalConfig.transformerClassNames;
tableConfig.commitOnErrors = globalConfig.commitOnErrors;
tableConfig.compactSchedulingMinShare = globalConfig.compactSchedulingMinShare;
tableConfig.compactSchedulingWeight = globalConfig.compactSchedulingWeight;
tableConfig.deltaSyncSchedulingMinShare = globalConfig.deltaSyncSchedulingMinShare;
tableConfig.deltaSyncSchedulingWeight = globalConfig.deltaSyncSchedulingWeight;
tableConfig.sparkMaster = globalConfig.sparkMaster;
}
}
public static void main(String[] args) throws IOException {
final Config config = new Config();
JCommander cmd = new JCommander(config, null, args);
if (config.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
JavaSparkContext jssc = UtilHelpers.buildSparkContext("multi-table-delta-streamer", Constants.LOCAL_SPARK_MASTER);
try {
new HoodieMultiTableDeltaStreamer(config, jssc).sync();
} finally {
jssc.stop();
}
}
public static class Config implements Serializable {
@Parameter(names = {"--base-path-prefix"},
description = "base path prefix for multi table support via HoodieMultiTableDeltaStreamer class")
public String basePathPrefix;
@Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true)
public String targetTableName;
@Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ", required = true)
public String tableType;
@Parameter(names = {"--config-folder"}, description = "Path to folder which contains all the properties file", required = true)
public String configFolder;
@Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+ "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
+ "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
+ "to individual classes, for supported properties.")
public String propsFilePath =
"file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
@Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+ "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter")
public List<String> configs = new ArrayList<>();
@Parameter(names = {"--source-class"},
description = "Subclass of org.apache.hudi.utilities.sources to read data. "
+ "Built-in options: org.apache.hudi.utilities.sources.{JsonDFSSource (default), AvroDFSSource, "
+ "JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}")
public String sourceClassName = JsonDFSSource.class.getName();
@Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how"
+ " to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record")
public String sourceOrderingField = "ts";
@Parameter(names = {"--payload-class"}, description = "subclass of HoodieRecordPayload, that works off "
+ "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value")
public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
@Parameter(names = {"--schemaprovider-class"}, description = "subclass of org.apache.hudi.utilities.schema"
+ ".SchemaProvider to attach schemas to input & target table data, built in options: "
+ "org.apache.hudi.utilities.schema.FilebasedSchemaProvider."
+ "Source (See org.apache.hudi.utilities.sources.Source) implementation can implement their own SchemaProvider."
+ " For Sources that return Dataset<Row>, the schema is obtained implicitly. "
+ "However, this CLI option allows overriding the schemaprovider returned by Source.")
public String schemaProviderClassName = null;
@Parameter(names = {"--transformer-class"},
description = "A subclass or a list of subclasses of org.apache.hudi.utilities.transform.Transformer"
+ ". Allows transforming raw source Dataset to a target Dataset (conforming to target schema) before "
+ "writing. Default : Not set. E:g - org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which "
+ "allows a SQL query templated to be passed as a transformation function). "
+ "Pass a comma-separated list of subclass names to chain the transformations.",
converter = HoodieDeltaStreamer.TransformersConverter.class)
public List<String> transformerClassNames = null;
@Parameter(names = {"--source-limit"}, description = "Maximum amount of data to read from source. "
+ "Default: No limit For e.g: DFS-Source => max bytes to read, Kafka-Source => max events to read")
public long sourceLimit = Long.MAX_VALUE;
@Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
+ "is purely new data/inserts to gain speed)", converter = HoodieDeltaStreamer.OperationConvertor.class)
public HoodieDeltaStreamer.Operation operation = HoodieDeltaStreamer.Operation.UPSERT;
@Parameter(names = {"--filter-dupes"},
description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
public Boolean filterDupes = false;
@Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
public Boolean enableHiveSync = false;
@Parameter(names = {"--max-pending-compactions"},
description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless"
+ "outstanding compactions is less than this number")
public Integer maxPendingCompactions = 5;
@Parameter(names = {"--continuous"}, description = "Delta Streamer runs in continuous mode running"
+ " source-fetch -> Transform -> Hudi Write in loop")
public Boolean continuousMode = false;
@Parameter(names = {"--min-sync-interval-seconds"},
description = "the min sync interval of each sync in continuous mode")
public Integer minSyncIntervalSeconds = 0;
@Parameter(names = {"--spark-master"}, description = "spark master to use.")
public String sparkMaster = "local[2]";
@Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written")
public Boolean commitOnErrors = false;
@Parameter(names = {"--delta-sync-scheduling-weight"},
description = "Scheduling weight for delta sync as defined in "
+ "https://spark.apache.org/docs/latest/job-scheduling.html")
public Integer deltaSyncSchedulingWeight = 1;
@Parameter(names = {"--compact-scheduling-weight"}, description = "Scheduling weight for compaction as defined in "
+ "https://spark.apache.org/docs/latest/job-scheduling.html")
public Integer compactSchedulingWeight = 1;
@Parameter(names = {"--delta-sync-scheduling-minshare"}, description = "Minshare for delta sync as defined in "
+ "https://spark.apache.org/docs/latest/job-scheduling.html")
public Integer deltaSyncSchedulingMinShare = 0;
@Parameter(names = {"--compact-scheduling-minshare"}, description = "Minshare for compaction as defined in "
+ "https://spark.apache.org/docs/latest/job-scheduling.html")
public Integer compactSchedulingMinShare = 0;
/**
* Compaction is enabled for MoR table by default. This flag disables it
*/
@Parameter(names = {"--disable-compaction"},
description = "Compaction is enabled for MoR table by default. This flag disables it ")
public Boolean forceDisableCompaction = false;
/**
* Resume Delta Streamer from this checkpoint.
*/
@Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.")
public String checkpoint = null;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
}
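For readers wiring this up programmatically rather than through the CLI flags above, here is a minimal sketch of populating this Config by hand, mirroring what TestHelpers.getConfig does in the tests added by this commit; the paths and the JsonKafkaSource choice are illustrative assumptions, not values this commit mandates:

HoodieMultiTableDeltaStreamer.Config cfg = new HoodieMultiTableDeltaStreamer.Config();
cfg.basePathPrefix = "/data/hudi"; // hypothetical; each table lands under {prefix}/{database}/{table}
cfg.propsFilePath = "/configs/kafka-source.properties"; // hypothetical common properties file
cfg.configFolder = "/configs"; // hypothetical folder holding per-table config files
cfg.tableType = "COPY_ON_WRITE";
cfg.sourceClassName = "org.apache.hudi.utilities.sources.JsonKafkaSource";
cfg.sourceOrderingField = "timestamp";
cfg.operation = HoodieDeltaStreamer.Operation.UPSERT; // --op
cfg.sourceLimit = 5000; // --source-limit
cfg.continuousMode = false; // --continuous
cfg.enableHiveSync = false; // --enable-hive-sync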
/**
* Resets the target table name and target path using the base-path-prefix.
* @param configuration per-table delta streamer configuration whose targetTableName gets reset
* @param database name of the database the table belongs to
* @param tableName name of the table to be ingested
* @return the target base path constructed for this table
*/
private static String resetTarget(Config configuration, String database, String tableName) {
String basePathPrefix = configuration.basePathPrefix;
basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
String targetBasePath = basePathPrefix + Constants.FILEDELIMITER + database + Constants.FILEDELIMITER + tableName;
configuration.targetTableName = database + Constants.DELIMITER + tableName;
return targetBasePath;
}
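For concreteness, a worked example of the path handling above, assuming a hypothetical --base-path-prefix of "/data/hudi/" (the trailing slash is stripped first):

String targetBasePath = resetTarget(configuration, "uber_db", "dummy_table_uber");
// targetBasePath => "/data/hudi/uber_db/dummy_table_uber"
// configuration.targetTableName => "uber_db.dummy_table_uber"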
/*
Creates an actual HoodieDeltaStreamer object for every table/topic and runs an incremental sync for each.
*/
public void sync() {
for (TableExecutionContext context : tableExecutionContexts) {
try {
new HoodieDeltaStreamer(context.getConfig(), jssc, context.getProperties()).sync();
successTables.add(Helpers.getTableWithDatabase(context));
} catch (Exception e) {
logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
failedTables.add(Helpers.getTableWithDatabase(context));
}
}
logger.info("Ingestion was successful for topics: " + successTables);
if (!failedTables.isEmpty()) {
logger.info("Ingestion failed for topics: " + failedTables);
}
}
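Note that a failure in one table does not abort the loop: the failing table is recorded in failedTables and the remaining tables are still synced. A minimal usage sketch, assuming a JavaSparkContext jssc and a Config cfg populated as shown earlier:

HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jssc);
streamer.sync();
if (!streamer.getFailedTables().isEmpty()) {
// tables listed here were skipped this round; inspect their configs and re-run
logger.warn("Ingestion failed for: " + streamer.getFailedTables());
}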
public static class Constants {
public static final String KAFKA_TOPIC_PROP = "hoodie.deltastreamer.source.kafka.topic";
private static final String SOURCE_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table";
private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl";
private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = "hoodie.deltastreamer.schemaprovider.registry.urlSuffix";
private static final String TABLES_TO_BE_INGESTED_PROP = "hoodie.deltastreamer.ingestion.tablesToBeIngested";
private static final String INGESTION_PREFIX = "hoodie.deltastreamer.ingestion.";
private static final String INGESTION_CONFIG_SUFFIX = ".configFile";
private static final String DEFAULT_CONFIG_FILE_NAME_SUFFIX = "_config.properties";
private static final String TARGET_BASE_PATH_PROP = "hoodie.deltastreamer.ingestion.targetBasePath";
private static final String LOCAL_SPARK_MASTER = "local[2]";
private static final String FILEDELIMITER = "/";
private static final String DELIMITER = ".";
private static final String UNDERSCORE = "_";
private static final String COMMA_SEPARATOR = ",";
}
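A sketch of how these keys compose: the common properties file enumerates the tables to ingest, and one ingestion key per table points at that table's config file. The database, table, and file names below are hypothetical; compare populateCommonProps in the tests added by this commit:

TypedProperties commonProps = new TypedProperties();
// TABLES_TO_BE_INGESTED_PROP: comma-separated {database}.{table} entries
commonProps.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "db1.table1,db2.table2");
// INGESTION_PREFIX + {database}.{table} + INGESTION_CONFIG_SUFFIX: path to the per-table config file
commonProps.setProperty("hoodie.deltastreamer.ingestion.db1.table1.configFile", "/configs/table1_config.properties");
// DEFAULT_CONFIG_FILE_NAME_SUFFIX above suggests {table}_config.properties as the fallback file name when no .configFile key is given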
public Set<String> getSuccessTables() {
return successTables;
}
public Set<String> getFailedTables() {
return failedTables;
}
public List<TableExecutionContext> getTableExecutionContexts() {
return this.tableExecutionContexts;
}
}

View File

@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.common.config.TypedProperties;
import java.util.Objects;
/**
* Wrapper over TableConfig objects.
* Useful for incrementally syncing multiple tables one by one via the HoodieMultiTableDeltaStreamer class.
*/
public class TableExecutionContext {
private TypedProperties properties;
private HoodieDeltaStreamer.Config config;
private String database;
private String tableName;
public HoodieDeltaStreamer.Config getConfig() {
return config;
}
public void setConfig(HoodieDeltaStreamer.Config config) {
this.config = config;
}
public String getDatabase() {
return database;
}
public void setDatabase(String database) {
this.database = database;
}
public String getTableName() {
return tableName;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}
public TypedProperties getProperties() {
return properties;
}
public void setProperties(TypedProperties properties) {
this.properties = properties;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TableExecutionContext that = (TableExecutionContext) o;
return Objects.equals(properties, that.properties) && Objects.equals(database, that.database) && Objects.equals(tableName, that.tableName);
}
@Override
public int hashCode() {
return Objects.hash(properties, database, tableName);
}
}
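Because HoodieMultiTableDeltaStreamer exposes these contexts before syncing, callers can adjust a single table's properties first; a sketch mirroring what the multi-table tests below do (the schema file paths are hypothetical):

TableExecutionContext ctx = streamer.getTableExecutionContexts().get(0);
TypedProperties tableProps = ctx.getProperties();
tableProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", "/schemas/source.avsc");
tableProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", "/schemas/target.avsc");
ctx.setProperties(tableProps);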

View File

@@ -69,6 +69,7 @@ import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF4;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -100,6 +101,10 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private static final Random RANDOM = new Random();
private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
public static final String PROPS_FILENAME_TEST_SOURCE1 = "test-source1.properties";
public static final String PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1 = "test-invalid-hive-sync-source1.properties";
public static final String PROPS_INVALID_FILE = "test-invalid-props.properties";
public static final String PROPS_INVALID_TABLE_CONFIG_FILE = "test-invalid-table-config.properties";
private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
private static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties";
private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
@@ -107,6 +112,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private static final int PARQUET_NUM_RECORDS = 5;
private static final int CSV_NUM_RECORDS = 3;
private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
public static KafkaTestUtils testUtils;
private static int testNum = 1;
@@ -114,9 +120,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
public static void initClass() throws Exception {
UtilitiesTestBase.initClass(true);
PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
testUtils = new KafkaTestUtils();
testUtils.setup();
// prepare the configs.
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/config/base.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs,
dfsBasePath + "/sql-transformer.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
@@ -124,6 +133,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc"); UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc", dfs, dfsBasePath + "/target-flattened.avsc"); UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc", dfs, dfsBasePath + "/target-flattened.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_short_trip_uber.avsc", dfs, dfsBasePath + "/source_short_trip_uber.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_uber.avsc", dfs, dfsBasePath + "/source_uber.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_short_trip_uber.avsc", dfs, dfsBasePath + "/target_short_trip_uber.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_uber.avsc", dfs, dfsBasePath + "/target_uber.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/invalid_hive_sync_uber_config.properties", dfs, dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/uber_config.properties", dfs, dfsBasePath + "/config/uber_config.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/short_trip_uber_config.properties", dfs, dfsBasePath + "/config/short_trip_uber_config.properties");
TypedProperties props = new TypedProperties();
props.setProperty("include", "sql-transformer.properties");
props.setProperty("hoodie.datasource.write.keygenerator.class", TestGenerator.class.getName());
@@ -163,11 +180,54 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);
TypedProperties props1 = new TypedProperties();
populateCommonProps(props1);
UtilitiesTestBase.Helpers.savePropsToDFS(props1, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE1);
TypedProperties properties = new TypedProperties();
populateInvalidTableConfigFilePathProps(properties);
UtilitiesTestBase.Helpers.savePropsToDFS(properties, dfs, dfsBasePath + "/" + PROPS_INVALID_TABLE_CONFIG_FILE);
TypedProperties invalidHiveSyncProps = new TypedProperties();
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
prepareParquetDFSFiles(PARQUET_NUM_RECORDS);
}
private static void populateInvalidTableConfigFilePathProps(TypedProperties props) {
props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd");
props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_uber_config.properties");
}
private static void populateCommonProps(TypedProperties props) {
props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd");
props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/uber_config.properties");
props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile", dfsBasePath + "/config/short_trip_uber_config.properties");
//Kafka source properties
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000));
// Hive Configs
props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), "testdb2");
props.setProperty(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY(), "false");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "datestr");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
MultiPartKeysValueExtractor.class.getName());
}
@AfterClass
-public static void cleanupClass() throws Exception {
+public static void cleanupClass() {
UtilitiesTestBase.cleanupClass();
}
@@ -649,7 +709,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
String path = PARQUET_SOURCE_ROOT + "/1.parquet";
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
Helpers.saveParquetToDFS(Helpers.toGenericRecords(
-dataGenerator.generateInserts("000", numRecords), dataGenerator), new Path(path));
+dataGenerator.generateInserts("000", numRecords)), new Path(path));
}
private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {

View File

@@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.TableExecutionContext;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
private static volatile Logger log = LogManager.getLogger(TestHoodieMultiTableDeltaStreamer.class);
static class TestHelpers {
static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync) {
HoodieMultiTableDeltaStreamer.Config config = new HoodieMultiTableDeltaStreamer.Config();
config.configFolder = configFolder;
config.targetTableName = "dummy_table";
config.basePathPrefix = dfsBasePath + "/multi_table_dataset";
config.propsFilePath = dfsBasePath + "/" + fileName;
config.tableType = "COPY_ON_WRITE";
config.sourceClassName = sourceClassName;
config.sourceOrderingField = "timestamp";
config.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
config.enableHiveSync = enableHiveSync;
return config;
}
}
@Test
public void testInvalidHiveSyncProps() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true);
try {
new HoodieMultiTableDeltaStreamer(cfg, jsc);
fail("Should fail when hive sync table not provided with enableHiveSync flag");
} catch (HoodieException he) {
log.error("Expected error when creating table execution objects", he);
assertTrue(he.getMessage().contains("Hive sync table field not provided!"));
}
}
@Test
public void testInvalidPropsFilePath() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true);
try {
new HoodieMultiTableDeltaStreamer(cfg, jsc);
fail("Should fail when invalid props file is provided");
} catch (IllegalArgumentException iae) {
log.error("Expected error when creating table execution objects", iae);
assertTrue(iae.getMessage().contains("Please provide valid common config file path!"));
}
}
@Test
public void testInvalidTableConfigFilePath() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true);
try {
new HoodieMultiTableDeltaStreamer(cfg, jsc);
fail("Should fail when invalid table config props file path is provided");
} catch (IllegalArgumentException iae) {
log.error("Expected error when creating table execution objects", iae);
assertTrue(iae.getMessage().contains("Please provide valid table config file path!"));
}
}
@Test
public void testCustomConfigProps() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false);
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
TableExecutionContext executionContext = streamer.getTableExecutionContexts().get(1);
assertEquals(streamer.getTableExecutionContexts().size(), 2);
assertEquals(executionContext.getConfig().targetBasePath, dfsBasePath + "/multi_table_dataset/uber_db/dummy_table_uber");
assertEquals(executionContext.getConfig().targetTableName, "uber_db.dummy_table_uber");
assertEquals(executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.KAFKA_TOPIC_PROP), "topic1");
assertEquals(executionContext.getProperties().getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()), "_row_key");
assertEquals(executionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY()), TestHoodieDeltaStreamer.TestGenerator.class.getName());
assertEquals(executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.HIVE_SYNC_TABLE_PROP), "uber_hive_dummy_table");
}
@Test
@Ignore
public void testInvalidIngestionProps() {
try {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true);
new HoodieMultiTableDeltaStreamer(cfg, jsc);
fail("Creation of execution object should fail without kafka topic");
} catch (Exception e) {
log.error("Creation of execution object failed with error: " + e.getMessage(), e);
assertTrue(e.getMessage().contains("Please provide valid table config arguments!"));
}
}
@Test
public void testMultiTableExecution() throws IOException {
//create topics for each table
testUtils.createTopic("topic1", 2);
testUtils.createTopic("topic2", 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.sendMessages("topic1", Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
testUtils.sendMessages("topic2", Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", JsonKafkaSource.class.getName(), false);
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
List<TableExecutionContext> executionContexts = streamer.getTableExecutionContexts();
TypedProperties properties = executionContexts.get(1).getProperties();
properties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc");
properties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc");
executionContexts.get(1).setProperties(properties);
TypedProperties properties1 = executionContexts.get(0).getProperties();
properties1.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_short_trip_uber.avsc");
properties1.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_short_trip_uber.avsc");
executionContexts.get(0).setProperties(properties1);
String targetBasePath1 = executionContexts.get(1).getConfig().targetBasePath;
String targetBasePath2 = executionContexts.get(0).getConfig().targetBasePath;
streamer.sync();
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + "/*/*.parquet", sqlContext);
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext);
//insert updates for already existing records in kafka topics
testUtils.sendMessages("topic1", Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
testUtils.sendMessages("topic2", Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
streamer.sync();
assertEquals(streamer.getSuccessTables().size(), 2);
assertTrue(streamer.getFailedTables().isEmpty());
//assert the record count matches now
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + "/*/*.parquet", sqlContext);
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext);
}
}

View File

@@ -83,6 +83,7 @@ public class UtilitiesTestBase {
protected transient SparkSession sparkSession = null;
protected transient SQLContext sqlContext;
protected static HiveServer2 hiveServer;
protected static HiveTestService hiveTestService;
private static ObjectMapper mapper = new ObjectMapper();
@BeforeClass
@@ -97,20 +98,23 @@ public class UtilitiesTestBase {
dfsBasePath = dfs.getWorkingDirectory().toString();
dfs.mkdirs(new Path(dfsBasePath));
if (startHiveService) {
-HiveTestService hiveService = new HiveTestService(hdfsTestService.getHadoopConf());
-hiveServer = hiveService.start();
+hiveTestService = new HiveTestService(hdfsTestService.getHadoopConf());
+hiveServer = hiveTestService.start();
clearHiveDb();
}
}
@AfterClass
-public static void cleanupClass() throws Exception {
+public static void cleanupClass() {
if (hdfsTestService != null) {
hdfsTestService.stop();
}
if (hiveServer != null) {
hiveServer.stop();
}
if (hiveTestService != null) {
hiveTestService.stop();
}
}
@Before
@@ -263,20 +267,19 @@ public class UtilitiesTestBase {
return props;
}
-public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) {
+public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) {
try {
-Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.AVRO_SCHEMA);
+Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
return (GenericRecord) recordOpt.get();
} catch (IOException e) {
return null;
}
}
-public static List<GenericRecord> toGenericRecords(List<HoodieRecord> hoodieRecords,
-HoodieTestDataGenerator dataGenerator) {
+public static List<GenericRecord> toGenericRecords(List<HoodieRecord> hoodieRecords) {
List<GenericRecord> records = new ArrayList<GenericRecord>();
for (HoodieRecord hoodieRecord : hoodieRecords) {
-records.add(toGenericRecord(hoodieRecord, dataGenerator));
+records.add(toGenericRecord(hoodieRecord));
}
return records;
}

View File

@@ -88,7 +88,7 @@ public abstract class AbstractBaseTestSource extends AvroSource {
HoodieTestDataGenerator dataGenerator = dataGeneratorMap.get(partition);
// generate `sourceLimit` number of upserts each time.
-int numExistingKeys = dataGenerator.getNumExistingKeys();
+int numExistingKeys = dataGenerator.getNumExistingKeys(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
LOG.info("NumExistingKeys=" + numExistingKeys);
int numUpdates = Math.min(numExistingKeys, sourceLimit / 2);
@@ -116,21 +116,22 @@ public abstract class AbstractBaseTestSource extends AvroSource {
LOG.info("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + (numUpdates - 50) + ", NumDeletes=50, maxUniqueRecords=" LOG.info("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + (numUpdates - 50) + ", NumDeletes=50, maxUniqueRecords="
+ maxUniqueKeys); + maxUniqueKeys);
// if we generate update followed by deletes -> some keys in update batch might be picked up for deletes. Hence generating delete batch followed by updates // if we generate update followed by deletes -> some keys in update batch might be picked up for deletes. Hence generating delete batch followed by updates
deleteStream = dataGenerator.generateUniqueDeleteRecordStream(instantTime, 50).map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator)); deleteStream = dataGenerator.generateUniqueDeleteRecordStream(instantTime, 50).map(AbstractBaseTestSource::toGenericRecord);
updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, numUpdates - 50).map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator)); updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, numUpdates - 50, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.map(AbstractBaseTestSource::toGenericRecord);
} else { } else {
LOG.info("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys); LOG.info("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys);
updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, numUpdates) updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, numUpdates, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator)); .map(AbstractBaseTestSource::toGenericRecord);
} }
Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(instantTime, numInserts, false) Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(instantTime, numInserts, false, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator)); .map(AbstractBaseTestSource::toGenericRecord);
return Stream.concat(deleteStream, Stream.concat(updateStream, insertStream)); return Stream.concat(deleteStream, Stream.concat(updateStream, insertStream));
} }
private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) { private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) {
try { try {
Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.AVRO_SCHEMA); Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
return (GenericRecord) recordOpt.get(); return (GenericRecord) recordOpt.get();
} catch (IOException e) { } catch (IOException e) {
return null; return null;

View File

@@ -64,7 +64,7 @@ public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase {
}
@AfterClass
-public static void cleanupClass() throws Exception {
+public static void cleanupClass() {
UtilitiesTestBase.cleanupClass();
}

View File

@@ -42,7 +42,6 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.IOException;
import java.util.HashMap;
import java.util.UUID;
@@ -64,7 +63,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
}
@AfterClass
-public static void cleanupClass() throws Exception {
+public static void cleanupClass() {
UtilitiesTestBase.cleanupClass();
}
@@ -95,7 +94,7 @@
}
@Test
-public void testJsonKafkaSource() throws IOException {
+public void testJsonKafkaSource() {
// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
@@ -143,7 +142,7 @@
}
@Test
-public void testJsonKafkaSourceWithDefaultUpperCap() throws IOException {
+public void testJsonKafkaSourceWithDefaultUpperCap() {
// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
@@ -172,7 +171,7 @@
}
@Test
-public void testJsonKafkaSourceWithConfigurableUpperCap() throws IOException {
+public void testJsonKafkaSourceWithConfigurableUpperCap() {
// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();

View File

@@ -48,6 +48,6 @@ public class TestParquetDFSSource extends AbstractDFSSourceTestBase {
@Override
void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
-Helpers.saveParquetToDFS(Helpers.toGenericRecords(records, dataGenerator), path);
+Helpers.saveParquetToDFS(Helpers.toGenericRecords(records), path);
}
}

View File

@@ -0,0 +1,23 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
include=base.properties
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.partitionpath.field=created_at
hoodie.deltastreamer.source.kafka.topic=test_topic
hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd

View File

@@ -0,0 +1,24 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
include=base.properties
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.partitionpath.field=created_at
hoodie.deltastreamer.source.kafka.topic=topic2
hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table

View File

@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"type" : "record",
"name" : "shortTripRec",
"fields" : [
{
"name" : "timestamp",
"type" : "double"
}, {
"name" : "_row_key",
"type" : "string"
}, {
"name" : "rider",
"type" : "string"
}, {
"name" : "driver",
"type" : "string"
}, {
"name" : "fare",
"type" : "double"
},
{
"name" : "_hoodie_is_deleted",
"type" : "boolean",
"default" : false
} ]
}

View File

@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"type" : "record",
"name" : "tripUberRec",
"fields" : [
{
"name" : "timestamp",
"type" : "double"
}, {
"name" : "_row_key",
"type" : "string"
}, {
"name" : "rider",
"type" : "string"
}, {
"name" : "driver",
"type" : "string"
}, {
"name" : "fare",
"type" : "double"
},
{
"name" : "_hoodie_is_deleted",
"type" : "boolean",
"default" : false
} ]
}

View File

@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"type" : "record",
"name" : "shortTripRec",
"fields" : [
{
"name" : "timestamp",
"type" : "double"
}, {
"name" : "_row_key",
"type" : "string"
}, {
"name" : "rider",
"type" : "string"
}, {
"name" : "driver",
"type" : "string"
}, {
"name" : "fare",
"type" : "double"
},
{
"name" : "_hoodie_is_deleted",
"type" : "boolean",
"default" : false
} ]
}

View File

@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"type" : "record",
"name" : "tripUberRec",
"fields" : [
{
"name" : "timestamp",
"type" : "double"
}, {
"name" : "_row_key",
"type" : "string"
}, {
"name" : "rider",
"type" : "string"
}, {
"name" : "driver",
"type" : "string"
}, {
"name" : "fare",
"type" : "double"
},
{
"name" : "_hoodie_is_deleted",
"type" : "boolean",
"default" : false
} ]
}

View File

@@ -0,0 +1,25 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
include=base.properties
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.partitionpath.field=created_at
hoodie.deltastreamer.source.kafka.topic=topic1
hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
hoodie.datasource.hive_sync.database=uber_hive_db
hoodie.datasource.hive_sync.table=uber_hive_dummy_table