From d610252d6b54dcbb1897bb8881d59c3838a46c18 Mon Sep 17 00:00:00 2001 From: Pratyaksh Sharma Date: Wed, 8 Apr 2020 04:40:26 +0530 Subject: [PATCH] [HUDI-288]: Add support for ingesting multiple kafka streams in a single DeltaStreamer deployment (#1150) * [HUDI-288]: Add support for ingesting multiple kafka streams in a single DeltaStreamer deployment --- .../hudi/table/HoodieCommitArchiveLog.java | 10 +- .../org/apache/hudi/client/TestMultiFS.java | 4 +- .../hudi/common/HoodieTestDataGenerator.java | 151 ++++++- .../apache/hudi/hive/TestHiveSyncTool.java | 6 + .../java/org/apache/hudi/hive/TestUtil.java | 9 +- .../hudi/hive/util/HiveTestService.java | 25 ++ .../apache/hudi/integ/ITTestHoodieDemo.java | 8 +- .../org/apache/hudi/DataSourceOptions.scala | 1 + .../utilities/deltastreamer/DeltaSync.java | 9 +- .../deltastreamer/HoodieDeltaStreamer.java | 32 +- .../HoodieMultiTableDeltaStreamer.java | 393 ++++++++++++++++++ .../deltastreamer/TableExecutionContext.java | 85 ++++ .../utilities/TestHoodieDeltaStreamer.java | 64 ++- .../TestHoodieMultiTableDeltaStreamer.java | 166 ++++++++ .../hudi/utilities/UtilitiesTestBase.java | 19 +- .../sources/AbstractBaseTestSource.java | 19 +- .../sources/AbstractDFSSourceTestBase.java | 2 +- .../utilities/sources/TestKafkaSource.java | 9 +- .../sources/TestParquetDFSSource.java | 2 +- .../invalid_hive_sync_uber_config.properties | 23 + .../short_trip_uber_config.properties | 24 ++ .../source_short_trip_uber.avsc | 44 ++ .../delta-streamer-config/source_uber.avsc | 44 ++ .../target_short_trip_uber.avsc | 44 ++ .../delta-streamer-config/target_uber.avsc | 44 ++ .../uber_config.properties | 25 ++ 26 files changed, 1184 insertions(+), 78 deletions(-) create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/TableExecutionContext.java create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMultiTableDeltaStreamer.java create mode 100644 hudi-utilities/src/test/resources/delta-streamer-config/invalid_hive_sync_uber_config.properties create mode 100644 hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties create mode 100644 hudi-utilities/src/test/resources/delta-streamer-config/source_short_trip_uber.avsc create mode 100644 hudi-utilities/src/test/resources/delta-streamer-config/source_uber.avsc create mode 100644 hudi-utilities/src/test/resources/delta-streamer-config/target_short_trip_uber.avsc create mode 100644 hudi-utilities/src/test/resources/delta-streamer-config/target_uber.avsc create mode 100644 hudi-utilities/src/test/resources/delta-streamer-config/uber_config.properties diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java index 635e96b2b..73dd7999b 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java @@ -292,7 +292,8 @@ public class HoodieCommitArchiveLog { archivedMetaWrapper.setActionType(ActionType.clean.name()); break; } - case HoodieTimeline.COMMIT_ACTION: { + case HoodieTimeline.COMMIT_ACTION: + case HoodieTimeline.DELTA_COMMIT_ACTION: { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class); archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata)); @@ -311,13 +312,6 @@ public class HoodieCommitArchiveLog { archivedMetaWrapper.setActionType(ActionType.savepoint.name()); break; } - case HoodieTimeline.DELTA_COMMIT_ACTION: { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata - .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class); - archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata)); - archivedMetaWrapper.setActionType(ActionType.commit.name()); - break; - } case HoodieTimeline.COMPACTION_ACTION: { HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, hoodieInstant.getTimestamp()); archivedMetaWrapper.setHoodieCompactionPlan(plan); diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java index 0e606e4bd..c6ec5232a 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java @@ -68,7 +68,7 @@ public class TestMultiFS extends HoodieClientTestHarness { cleanupTestDataGenerator(); } - private HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig config) throws Exception { + private HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig config) { return new HoodieWriteClient(jsc, config); } @@ -89,7 +89,7 @@ public class TestMultiFS extends HoodieClientTestHarness { HoodieWriteConfig localConfig = getHoodieWriteConfig(tablePath); try (HoodieWriteClient hdfsWriteClient = getHoodieWriteClient(cfg); - HoodieWriteClient localWriteClient = getHoodieWriteClient(localConfig);) { + HoodieWriteClient localWriteClient = getHoodieWriteClient(localConfig)) { // Write generated data to hdfs (only inserts) String readCommitTime = hdfsWriteClient.startCommit(); diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java index 087587ab0..8e9036a1f 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java @@ -41,6 +41,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.io.IOException; import java.io.Serializable; @@ -69,6 +71,7 @@ public class HoodieTestDataGenerator { // based on examination of sample file, the schema produces the following per record size public static final int SIZE_PER_RECORD = 50 * 1024; + private static Logger logger = LogManager.getLogger(HoodieTestDataGenerator.class); public static final String DEFAULT_FIRST_PARTITION_PATH = "2016/03/15"; public static final String DEFAULT_SECOND_PARTITION_PATH = "2015/03/16"; public static final String DEFAULT_THIRD_PARTITION_PATH = "2015/03/17"; @@ -89,12 +92,18 @@ public class HoodieTestDataGenerator { public static final String TIP_NESTED_SCHEMA = "{\"name\": \"tip_history\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"record\", \"name\": \"tip_history\", \"fields\": [" + "{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},"; public static final String MAP_TYPE_SCHEMA = "{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},"; - public static final String TRIP_EXAMPLE_SCHEMA = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX; public static final String TRIP_FLATTENED_SCHEMA = TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX; + public static final String TRIP_SCHEMA = "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[" + + "{\"name\":\"timestamp\",\"type\":\"double\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"}," + + "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}"; + public static final String SHORT_TRIP_SCHEMA = "{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":[" + + "{\"name\":\"timestamp\",\"type\":\"double\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"}," + + "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}"; + public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString(); public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double," + "map,struct,array>,boolean"; @@ -102,13 +111,17 @@ public class HoodieTestDataGenerator { public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS = HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA); + public static final Schema AVRO_SHORT_TRIP_SCHEMA = new Schema.Parser().parse(SHORT_TRIP_SCHEMA); + public static final Schema AVRO_TRIP_SCHEMA = new Schema.Parser().parse(TRIP_SCHEMA); public static final Schema FLATTENED_AVRO_SCHEMA = new Schema.Parser().parse(TRIP_FLATTENED_SCHEMA); private static final Random RAND = new Random(46474747); - private final Map existingKeys; + //Maintains all the existing keys schema wise + private final Map> existingKeysBySchema; private final String[] partitionPaths; - private int numExistingKeys; + //maintains the count of existing keys schema wise + private Map numKeysBySchema; public HoodieTestDataGenerator(String[] partitionPaths) { this(partitionPaths, new HashMap<>()); @@ -120,7 +133,9 @@ public class HoodieTestDataGenerator { public HoodieTestDataGenerator(String[] partitionPaths, Map keyPartitionMap) { this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length); - this.existingKeys = keyPartitionMap; + this.existingKeysBySchema = new HashMap<>(); + existingKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap); + numKeysBySchema = new HashMap<>(); } public static void writePartitionMetadata(FileSystem fs, String[] partitionPaths, String basePath) { @@ -129,6 +144,18 @@ public class HoodieTestDataGenerator { } } + public TestRawTripPayload generateRandomValueAsPerSchema(String schemaStr, HoodieKey key, String commitTime, boolean isFlattened) throws IOException { + if (TRIP_EXAMPLE_SCHEMA.equals(schemaStr)) { + return generateRandomValue(key, commitTime, isFlattened); + } else if (TRIP_SCHEMA.equals(schemaStr)) { + return generatePayloadForTripSchema(key, commitTime); + } else if (SHORT_TRIP_SCHEMA.equals(schemaStr)) { + return generatePayloadForShortTripSchema(key, commitTime); + } + + return null; + } + /** * Generates a new avro record of the above nested schema format, * retaining the key if optionally provided. @@ -160,6 +187,19 @@ public class HoodieTestDataGenerator { return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA); } + /** + * Generates a new avro record with TRIP_SCHEMA, retaining the key if optionally provided. + */ + public TestRawTripPayload generatePayloadForTripSchema(HoodieKey key, String commitTime) throws IOException { + GenericRecord rec = generateRecordForTripSchema(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0); + return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_SCHEMA); + } + + public TestRawTripPayload generatePayloadForShortTripSchema(HoodieKey key, String commitTime) throws IOException { + GenericRecord rec = generateRecordForShortTripSchema(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0); + return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), SHORT_TRIP_SCHEMA); + } + /** * Generates a new avro record of the above schema format for a delete. */ @@ -223,6 +263,31 @@ public class HoodieTestDataGenerator { return rec; } + /* + Generate random record using TRIP_SCHEMA + */ + public GenericRecord generateRecordForTripSchema(String rowKey, String riderName, String driverName, double timestamp) { + GenericRecord rec = new GenericData.Record(AVRO_TRIP_SCHEMA); + rec.put("_row_key", rowKey); + rec.put("timestamp", timestamp); + rec.put("rider", riderName); + rec.put("driver", driverName); + rec.put("fare", RAND.nextDouble() * 100); + rec.put("_hoodie_is_deleted", false); + return rec; + } + + public GenericRecord generateRecordForShortTripSchema(String rowKey, String riderName, String driverName, double timestamp) { + GenericRecord rec = new GenericData.Record(AVRO_SHORT_TRIP_SCHEMA); + rec.put("_row_key", rowKey); + rec.put("timestamp", timestamp); + rec.put("rider", riderName); + rec.put("driver", driverName); + rec.put("fare", RAND.nextDouble() * 100); + rec.put("_hoodie_is_deleted", false); + return rec; + } + public static void createCommitFile(String basePath, String instantTime, Configuration configuration) { Arrays.asList(HoodieTimeline.makeCommitFileName(instantTime), HoodieTimeline.makeInflightCommitFileName(instantTime), HoodieTimeline.makeRequestedCommitFileName(instantTime)) @@ -283,6 +348,10 @@ public class HoodieTestDataGenerator { } } + public List generateInsertsAsPerSchema(String commitTime, Integer n, String schemaStr) { + return generateInsertsStream(commitTime, n, schemaStr).collect(Collectors.toList()); + } + /** * Generates new inserts with nested schema, uniformly across the partition paths above. * It also updates the list of existing keys. @@ -301,15 +370,22 @@ public class HoodieTestDataGenerator { * @return List of {@link HoodieRecord}s */ public List generateInserts(String instantTime, Integer n, boolean isFlattened) { - return generateInsertsStream(instantTime, n, isFlattened).collect(Collectors.toList()); + return generateInsertsStream(instantTime, n, isFlattened, TRIP_EXAMPLE_SCHEMA).collect(Collectors.toList()); + } + + /** + * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys. + */ + public Stream generateInsertsStream(String commitTime, Integer n, String schemaStr) { + return generateInsertsStream(commitTime, n, false, schemaStr); } /** * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys. */ public Stream generateInsertsStream( - String instantTime, Integer n, boolean isFlattened) { - int currSize = getNumExistingKeys(); + String instantTime, Integer n, boolean isFlattened, String schemaStr) { + int currSize = getNumExistingKeys(schemaStr); return IntStream.range(0, n).boxed().map(i -> { String partitionPath = partitionPaths[RAND.nextInt(partitionPaths.length)]; @@ -317,16 +393,36 @@ public class HoodieTestDataGenerator { KeyPartition kp = new KeyPartition(); kp.key = key; kp.partitionPath = partitionPath; - existingKeys.put(currSize + i, kp); - numExistingKeys++; + populateKeysBySchema(schemaStr, currSize + i, kp); + incrementNumExistingKeysBySchema(schemaStr); try { - return new HoodieRecord(key, generateRandomValue(key, instantTime, isFlattened)); + return new HoodieRecord(key, generateRandomValueAsPerSchema(schemaStr, key, instantTime, isFlattened)); } catch (IOException e) { throw new HoodieIOException(e.getMessage(), e); } }); } + /* + Takes care of populating keys schema wise + */ + private void populateKeysBySchema(String schemaStr, int i, KeyPartition kp) { + if (existingKeysBySchema.containsKey(schemaStr)) { + existingKeysBySchema.get(schemaStr).put(i, kp); + } else { + existingKeysBySchema.put(schemaStr, new HashMap<>()); + existingKeysBySchema.get(schemaStr).put(i, kp); + } + } + + private void incrementNumExistingKeysBySchema(String schemaStr) { + if (numKeysBySchema.containsKey(schemaStr)) { + numKeysBySchema.put(schemaStr, numKeysBySchema.get(schemaStr) + 1); + } else { + numKeysBySchema.put(schemaStr, 1); + } + } + public List generateSameKeyInserts(String instantTime, List origin) throws IOException { List copy = new ArrayList<>(); for (HoodieRecord r : origin) { @@ -339,7 +435,7 @@ public class HoodieTestDataGenerator { public List generateInsertsWithHoodieAvroPayload(String instantTime, int limit) { List inserts = new ArrayList<>(); - int currSize = getNumExistingKeys(); + int currSize = getNumExistingKeys(TRIP_EXAMPLE_SCHEMA); for (int i = 0; i < limit; i++) { String partitionPath = partitionPaths[RAND.nextInt(partitionPaths.length)]; HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath); @@ -349,8 +445,8 @@ public class HoodieTestDataGenerator { KeyPartition kp = new KeyPartition(); kp.key = key; kp.partitionPath = partitionPath; - existingKeys.put(currSize + i, kp); - numExistingKeys++; + populateKeysBySchema(TRIP_EXAMPLE_SCHEMA, currSize + i, kp); + incrementNumExistingKeysBySchema(TRIP_EXAMPLE_SCHEMA); } return inserts; } @@ -431,6 +527,8 @@ public class HoodieTestDataGenerator { public List generateUpdates(String instantTime, Integer n) throws IOException { List updates = new ArrayList<>(); for (int i = 0; i < n; i++) { + Map existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA); + Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA); KeyPartition kp = existingKeys.get(RAND.nextInt(numExistingKeys - 1)); HoodieRecord record = generateUpdateRecord(kp.key, instantTime); updates.add(record); @@ -438,6 +536,10 @@ public class HoodieTestDataGenerator { return updates; } + public List generateUpdatesAsPerSchema(String commitTime, Integer n, String schemaStr) { + return generateUniqueUpdatesStream(commitTime, n, schemaStr).collect(Collectors.toList()); + } + /** * Generates deduped updates of keys previously inserted, randomly distributed across the keys above. * @@ -446,7 +548,7 @@ public class HoodieTestDataGenerator { * @return list of hoodie record updates */ public List generateUniqueUpdates(String instantTime, Integer n) { - return generateUniqueUpdatesStream(instantTime, n).collect(Collectors.toList()); + return generateUniqueUpdatesStream(instantTime, n, TRIP_EXAMPLE_SCHEMA).collect(Collectors.toList()); } /** @@ -466,8 +568,10 @@ public class HoodieTestDataGenerator { * @param n Number of unique records * @return stream of hoodie record updates */ - public Stream generateUniqueUpdatesStream(String instantTime, Integer n) { + public Stream generateUniqueUpdatesStream(String instantTime, Integer n, String schemaStr) { final Set used = new HashSet<>(); + int numExistingKeys = numKeysBySchema.getOrDefault(schemaStr, 0); + Map existingKeys = existingKeysBySchema.get(schemaStr); if (n > numExistingKeys) { throw new IllegalArgumentException("Requested unique updates is greater than number of available keys"); } @@ -480,9 +584,10 @@ public class HoodieTestDataGenerator { index = (index + 1) % numExistingKeys; kp = existingKeys.get(index); } + logger.debug("key getting updated: " + kp.key.getRecordKey()); used.add(kp); try { - return new HoodieRecord(kp.key, generateRandomValue(kp.key, instantTime)); + return new HoodieRecord(kp.key, generateRandomValueAsPerSchema(schemaStr, kp.key, instantTime, false)); } catch (IOException e) { throw new HoodieIOException(e.getMessage(), e); } @@ -497,6 +602,8 @@ public class HoodieTestDataGenerator { */ public Stream generateUniqueDeleteStream(Integer n) { final Set used = new HashSet<>(); + Map existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA); + Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA); if (n > numExistingKeys) { throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys"); } @@ -514,6 +621,7 @@ public class HoodieTestDataGenerator { used.add(kp); result.add(kp.key); } + numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, numExistingKeys); return result.stream(); } @@ -526,6 +634,8 @@ public class HoodieTestDataGenerator { */ public Stream generateUniqueDeleteRecordStream(String instantTime, Integer n) { final Set used = new HashSet<>(); + Map existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA); + Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA); if (n > numExistingKeys) { throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys"); } @@ -536,7 +646,7 @@ public class HoodieTestDataGenerator { while (!existingKeys.containsKey(index)) { index = (index + 1) % numExistingKeys; } - // swap chosen index with last index and remove last entry. + // swap chosen index with last index and remove last entry. KeyPartition kp = existingKeys.remove(index); existingKeys.put(index, existingKeys.get(numExistingKeys - 1)); existingKeys.remove(numExistingKeys - 1); @@ -548,6 +658,7 @@ public class HoodieTestDataGenerator { throw new HoodieIOException(e.getMessage(), e); } } + numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, numExistingKeys); return result.stream(); } @@ -555,8 +666,8 @@ public class HoodieTestDataGenerator { return partitionPaths; } - public int getNumExistingKeys() { - return numExistingKeys; + public int getNumExistingKeys(String schemaStr) { + return numKeysBySchema.getOrDefault(schemaStr, 0); } public static class KeyPartition implements Serializable { @@ -566,6 +677,6 @@ public class HoodieTestDataGenerator { } public void close() { - existingKeys.clear(); + existingKeysBySchema.clear(); } } diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 449c7f343..f39431495 100644 --- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -31,6 +31,7 @@ import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; import org.joda.time.DateTime; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -70,6 +71,11 @@ public class TestHiveSyncTool { TestUtil.clear(); } + @AfterClass + public static void cleanUpClass() { + TestUtil.shutdown(); + } + /** * Testing converting array types to Hive field declaration strings. According to the Parquet-113 spec: * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java index 3c0f55155..cee13301c 100644 --- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java +++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java @@ -83,6 +83,7 @@ public class TestUtil { private static MiniDFSCluster dfsCluster; private static ZooKeeperServer zkServer; private static HiveServer2 hiveServer; + private static HiveTestService hiveTestService; private static Configuration configuration; static HiveSyncConfig hiveSyncConfig; private static DateTimeFormatter dtfOut; @@ -100,8 +101,8 @@ public class TestUtil { zkServer = zkService.start(); } if (hiveServer == null) { - HiveTestService hiveService = new HiveTestService(configuration); - hiveServer = hiveService.start(); + hiveTestService = new HiveTestService(configuration); + hiveServer = hiveTestService.start(); } fileSystem = FileSystem.get(configuration); @@ -139,11 +140,13 @@ public class TestUtil { return hiveServer.getHiveConf(); } - @SuppressWarnings("unused") public static void shutdown() { if (hiveServer != null) { hiveServer.stop(); } + if (hiveTestService != null) { + hiveTestService.stop(); + } if (dfsCluster != null) { dfsCluster.shutdown(); } diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/HiveTestService.java b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/HiveTestService.java index d2808d6a8..c1c355ea5 100644 --- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/HiveTestService.java +++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/HiveTestService.java @@ -121,6 +121,20 @@ public class HiveTestService { return hiveServer; } + public void stop() { + resetSystemProperties(); + if (tServer != null) { + tServer.stop(); + } + if (hiveServer != null) { + hiveServer.stop(); + } + LOG.info("Hive Minicluster service shut down."); + tServer = null; + hiveServer = null; + hadoopConf = null; + } + private HiveConf configureHive(Configuration conf, String localHiveLocation) throws IOException { conf.set("hive.metastore.local", "false"); conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://" + bindIP + ":" + metastorePort); @@ -183,6 +197,17 @@ public class HiveTestService { } } + private void resetSystemProperties() { + for (Map.Entry entry : sysProps.entrySet()) { + if (entry.getValue() != null) { + System.setProperty(entry.getKey(), entry.getValue()); + } else { + System.getProperties().remove(entry.getKey()); + } + } + sysProps.clear(); + } + private static String getHiveLocation(String baseLocation) { return baseLocation + Path.SEPARATOR + "hive"; } diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java index 5e6bc338e..01eecd0b9 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java @@ -121,14 +121,14 @@ public class ITTestHoodieDemo extends ITTestBase { + " --table-type COPY_ON_WRITE " + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts " + " --target-base-path " + COW_BASE_PATH + " --target-table " + COW_TABLE_NAME - + " --props /var/demo/config/dfs-source.properties " + + " --props /var/demo/config/dfs-source.properties" + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider " + String.format(HIVE_SYNC_CMD_FMT, "dt", COW_TABLE_NAME), ("spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE + " --table-type MERGE_ON_READ " + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts " + " --target-base-path " + MOR_BASE_PATH + " --target-table " + MOR_TABLE_NAME - + " --props /var/demo/config/dfs-source.properties " + + " --props /var/demo/config/dfs-source.properties" + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider " + " --disable-compaction " + String.format(HIVE_SYNC_CMD_FMT, "dt", MOR_TABLE_NAME))); @@ -173,14 +173,14 @@ public class ITTestHoodieDemo extends ITTestBase { + " --table-type COPY_ON_WRITE " + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts " + " --target-base-path " + COW_BASE_PATH + " --target-table " + COW_TABLE_NAME - + " --props /var/demo/config/dfs-source.properties " + + " --props /var/demo/config/dfs-source.properties" + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider " + String.format(HIVE_SYNC_CMD_FMT, "dt", COW_TABLE_NAME)), ("spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE + " --table-type MERGE_ON_READ " + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts " + " --target-base-path " + MOR_BASE_PATH + " --target-table " + MOR_TABLE_NAME - + " --props /var/demo/config/dfs-source.properties " + + " --props /var/demo/config/dfs-source.properties" + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider " + " --disable-compaction " + String.format(HIVE_SYNC_CMD_FMT, "dt", MOR_TABLE_NAME))); executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds); diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 10978e407..9d7d6cc7d 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -262,6 +262,7 @@ object DataSourceWriteOptions { val HIVE_URL_OPT_KEY = "hoodie.datasource.hive_sync.jdbcurl" val HIVE_PARTITION_FIELDS_OPT_KEY = "hoodie.datasource.hive_sync.partition_fields" val HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY = "hoodie.datasource.hive_sync.partition_extractor_class" + val HIVE_ASSUME_DATE_PARTITION_OPT_KEY = "hoodie.datasource.hive_sync.assume_date_partitioning" val HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY = "hoodie.datasource.hive_sync.use_pre_apache_input_format" // DEFAULT FOR HIVE SPECIFIC CONFIGS diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 5cc33ee75..c964c919e 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -26,7 +26,6 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -153,20 +152,14 @@ public class DeltaSync implements Serializable { */ private transient HoodieWriteClient writeClient; - /** - * Table Type. - */ - private final HoodieTableType tableType; - public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, - HoodieTableType tableType, TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf, + TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf, Function onInitializingHoodieWriteClient) throws IOException { this.cfg = cfg; this.jssc = jssc; this.sparkSession = sparkSession; this.fs = fs; - this.tableType = tableType; this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient; this.props = props; this.schemaProvider = schemaProvider; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index bc4c85dcc..836847815 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -93,6 +93,17 @@ public class HoodieDeltaStreamer implements Serializable { getDefaultHiveConf(jssc.hadoopConfiguration())); } + public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, TypedProperties props) throws IOException { + this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()), + getDefaultHiveConf(jssc.hadoopConfiguration()), props); + } + + public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf, + TypedProperties properties) throws IOException { + this.cfg = cfg; + this.deltaSyncService = new DeltaSyncService(cfg, jssc, fs, hiveConf, properties); + } + public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf) throws IOException { this.cfg = cfg; this.deltaSyncService = new DeltaSyncService(cfg, jssc, fs, hiveConf); @@ -142,7 +153,7 @@ public class HoodieDeltaStreamer implements Serializable { UPSERT, INSERT, BULK_INSERT } - private static class OperationConvertor implements IStringConverter { + protected static class OperationConvertor implements IStringConverter { @Override public Operation convert(String value) throws ParameterException { @@ -150,7 +161,7 @@ public class HoodieDeltaStreamer implements Serializable { } } - private static class TransformersConverter implements IStringConverter> { + protected static class TransformersConverter implements IStringConverter> { @Override public List convert(String value) throws ParameterException { @@ -169,6 +180,7 @@ public class HoodieDeltaStreamer implements Serializable { required = true) public String targetBasePath; + // TODO: How to obtain hive configs to register? @Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true) public String targetTableName; @@ -359,8 +371,8 @@ public class HoodieDeltaStreamer implements Serializable { */ private transient DeltaSync deltaSync; - public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf) - throws IOException { + public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf, + TypedProperties properties) throws IOException { this.cfg = cfg; this.jssc = jssc; this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate(); @@ -376,7 +388,7 @@ public class HoodieDeltaStreamer implements Serializable { tableType = HoodieTableType.valueOf(cfg.tableType); } - this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); + this.props = properties != null ? properties : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); LOG.info("Creating delta streamer with configs : " + props.toString()); this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc); @@ -384,8 +396,14 @@ public class HoodieDeltaStreamer implements Serializable { cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation; } - deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, tableType, props, jssc, fs, hiveConf, - this::onInitializingWriteClient); + deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, hiveConf, + this::onInitializingWriteClient); + + } + + public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf) + throws IOException { + this(cfg, jssc, fs, hiveConf, null); } public DeltaSync getDeltaSync() { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java new file mode 100644 index 000000000..74455f28c --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.deltastreamer; + +import com.beust.jcommander.Parameter; +import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.utilities.UtilHelpers; +import org.apache.hudi.utilities.schema.SchemaRegistryProvider; + +import com.beust.jcommander.JCommander; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.utilities.sources.JsonDFSSource; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Wrapper over HoodieDeltaStreamer.java class. + * Helps with ingesting incremental data into hoodie datasets for multiple tables. + * Currently supports only COPY_ON_WRITE storage type. + */ +public class HoodieMultiTableDeltaStreamer { + + private static Logger logger = LogManager.getLogger(HoodieMultiTableDeltaStreamer.class); + + private List tableExecutionContexts; + private transient JavaSparkContext jssc; + private Set successTables; + private Set failedTables; + + public HoodieMultiTableDeltaStreamer(Config config, JavaSparkContext jssc) throws IOException { + this.tableExecutionContexts = new ArrayList<>(); + this.successTables = new HashSet<>(); + this.failedTables = new HashSet<>(); + this.jssc = jssc; + String commonPropsFile = config.propsFilePath; + String configFolder = config.configFolder; + FileSystem fs = FSUtils.getFs(commonPropsFile, jssc.hadoopConfiguration()); + configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ? configFolder.substring(0, configFolder.length() - 1) : configFolder; + checkIfPropsFileAndConfigFolderExist(commonPropsFile, configFolder, fs); + TypedProperties properties = UtilHelpers.readConfig(fs, new Path(commonPropsFile), new ArrayList<>()).getConfig(); + //get the tables to be ingested and their corresponding config files from this properties instance + populateTableExecutionContextList(properties, configFolder, fs, config); + } + + private void checkIfPropsFileAndConfigFolderExist(String commonPropsFile, String configFolder, FileSystem fs) throws IOException { + if (!fs.exists(new Path(commonPropsFile))) { + throw new IllegalArgumentException("Please provide valid common config file path!"); + } + + if (!fs.exists(new Path(configFolder))) { + fs.mkdirs(new Path(configFolder)); + } + } + + private void checkIfTableConfigFileExists(String configFolder, FileSystem fs, String configFilePath) throws IOException { + if (!fs.exists(new Path(configFilePath)) || !fs.isFile(new Path(configFilePath))) { + throw new IllegalArgumentException("Please provide valid table config file path!"); + } + + Path path = new Path(configFilePath); + Path filePathInConfigFolder = new Path(configFolder, path.getName()); + if (!fs.exists(filePathInConfigFolder)) { + FileUtil.copy(fs, path, fs, filePathInConfigFolder, false, fs.getConf()); + } + } + + //commonProps are passed as parameter which contain table to config file mapping + private void populateTableExecutionContextList(TypedProperties properties, String configFolder, FileSystem fs, Config config) throws IOException { + List tablesToBeIngested = getTablesToBeIngested(properties); + logger.info("tables to be ingested via MultiTableDeltaStreamer : " + tablesToBeIngested); + TableExecutionContext executionContext; + for (String table : tablesToBeIngested) { + String[] tableWithDatabase = table.split("\\."); + String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : "default"; + String currentTable = tableWithDatabase.length > 1 ? tableWithDatabase[1] : table; + String configProp = Constants.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX; + String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable)); + checkIfTableConfigFileExists(configFolder, fs, configFilePath); + TypedProperties tableProperties = UtilHelpers.readConfig(fs, new Path(configFilePath), new ArrayList<>()).getConfig(); + properties.forEach((k,v) -> { + tableProperties.setProperty(k.toString(), v.toString()); + }); + final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); + //copy all the values from config to cfg + String targetBasePath = resetTarget(config, database, currentTable); + Helpers.deepCopyConfigs(config, cfg); + String overriddenTargetBasePath = tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, ""); + cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) ? targetBasePath : overriddenTargetBasePath; + if (cfg.enableHiveSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), ""))) { + throw new HoodieException("Hive sync table field not provided!"); + } + populateSchemaProviderProps(cfg, tableProperties); + executionContext = new TableExecutionContext(); + executionContext.setProperties(tableProperties); + executionContext.setConfig(cfg); + executionContext.setDatabase(database); + executionContext.setTableName(currentTable); + this.tableExecutionContexts.add(executionContext); + } + } + + private List getTablesToBeIngested(TypedProperties properties) { + String combinedTablesString = properties.getString(Constants.TABLES_TO_BE_INGESTED_PROP); + if (combinedTablesString == null) { + return new ArrayList<>(); + } + String[] tablesArray = combinedTablesString.split(Constants.COMMA_SEPARATOR); + return Arrays.asList(tablesArray); + } + + private void populateSchemaProviderProps(HoodieDeltaStreamer.Config cfg, TypedProperties typedProperties) { + if (cfg.schemaProviderClassName.equals(SchemaRegistryProvider.class.getName())) { + String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP); + String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP); + typedProperties.setProperty(Constants.SOURCE_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + schemaRegistrySuffix); + typedProperties.setProperty(Constants.TARGET_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + schemaRegistrySuffix); + } + } + + public static class Helpers { + + static String getDefaultConfigFilePath(String configFolder, String database, String currentTable) { + return configFolder + Constants.FILEDELIMITER + database + Constants.UNDERSCORE + currentTable + Constants.DEFAULT_CONFIG_FILE_NAME_SUFFIX; + } + + static String getTableWithDatabase(TableExecutionContext context) { + return context.getDatabase() + Constants.DELIMITER + context.getTableName(); + } + + static void deepCopyConfigs(Config globalConfig, HoodieDeltaStreamer.Config tableConfig) { + tableConfig.enableHiveSync = globalConfig.enableHiveSync; + tableConfig.schemaProviderClassName = globalConfig.schemaProviderClassName; + tableConfig.sourceOrderingField = globalConfig.sourceOrderingField; + tableConfig.sourceClassName = globalConfig.sourceClassName; + tableConfig.tableType = globalConfig.tableType; + tableConfig.targetTableName = globalConfig.targetTableName; + tableConfig.operation = globalConfig.operation; + tableConfig.sourceLimit = globalConfig.sourceLimit; + tableConfig.checkpoint = globalConfig.checkpoint; + tableConfig.continuousMode = globalConfig.continuousMode; + tableConfig.filterDupes = globalConfig.filterDupes; + tableConfig.payloadClassName = globalConfig.payloadClassName; + tableConfig.forceDisableCompaction = globalConfig.forceDisableCompaction; + tableConfig.maxPendingCompactions = globalConfig.maxPendingCompactions; + tableConfig.minSyncIntervalSeconds = globalConfig.minSyncIntervalSeconds; + tableConfig.transformerClassNames = globalConfig.transformerClassNames; + tableConfig.commitOnErrors = globalConfig.commitOnErrors; + tableConfig.compactSchedulingMinShare = globalConfig.compactSchedulingMinShare; + tableConfig.compactSchedulingWeight = globalConfig.compactSchedulingWeight; + tableConfig.deltaSyncSchedulingMinShare = globalConfig.deltaSyncSchedulingMinShare; + tableConfig.deltaSyncSchedulingWeight = globalConfig.deltaSyncSchedulingWeight; + tableConfig.sparkMaster = globalConfig.sparkMaster; + } + } + + public static void main(String[] args) throws IOException { + final Config config = new Config(); + JCommander cmd = new JCommander(config, null, args); + if (config.help || args.length == 0) { + cmd.usage(); + System.exit(1); + } + JavaSparkContext jssc = UtilHelpers.buildSparkContext("multi-table-delta-streamer", Constants.LOCAL_SPARK_MASTER); + try { + new HoodieMultiTableDeltaStreamer(config, jssc).sync(); + } finally { + jssc.stop(); + } + } + + public static class Config implements Serializable { + + @Parameter(names = {"--base-path-prefix"}, + description = "base path prefix for multi table support via HoodieMultiTableDeltaStreamer class") + public String basePathPrefix; + + @Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true) + public String targetTableName; + + @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ", required = true) + public String tableType; + + @Parameter(names = {"--config-folder"}, description = "Path to folder which contains all the properties file", required = true) + public String configFolder; + + @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for " + + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are " + + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer" + + "to individual classes, for supported properties.") + public String propsFilePath = + "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties"; + + @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file " + + "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter") + public List configs = new ArrayList<>(); + + @Parameter(names = {"--source-class"}, + description = "Subclass of org.apache.hudi.utilities.sources to read data. " + + "Built-in options: org.apache.hudi.utilities.sources.{JsonDFSSource (default), AvroDFSSource, " + + "JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}") + public String sourceClassName = JsonDFSSource.class.getName(); + + @Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how" + + " to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record") + public String sourceOrderingField = "ts"; + + @Parameter(names = {"--payload-class"}, description = "subclass of HoodieRecordPayload, that works off " + + "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value") + public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName(); + + @Parameter(names = {"--schemaprovider-class"}, description = "subclass of org.apache.hudi.utilities.schema" + + ".SchemaProvider to attach schemas to input & target table data, built in options: " + + "org.apache.hudi.utilities.schema.FilebasedSchemaProvider." + + "Source (See org.apache.hudi.utilities.sources.Source) implementation can implement their own SchemaProvider." + + " For Sources that return Dataset, the schema is obtained implicitly. " + + "However, this CLI option allows overriding the schemaprovider returned by Source.") + public String schemaProviderClassName = null; + + @Parameter(names = {"--transformer-class"}, + description = "A subclass or a list of subclasses of org.apache.hudi.utilities.transform.Transformer" + + ". Allows transforming raw source Dataset to a target Dataset (conforming to target schema) before " + + "writing. Default : Not set. E:g - org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which " + + "allows a SQL query templated to be passed as a transformation function). " + + "Pass a comma-separated list of subclass names to chain the transformations.", + converter = HoodieDeltaStreamer.TransformersConverter.class) + public List transformerClassNames = null; + + @Parameter(names = {"--source-limit"}, description = "Maximum amount of data to read from source. " + + "Default: No limit For e.g: DFS-Source => max bytes to read, Kafka-Source => max events to read") + public long sourceLimit = Long.MAX_VALUE; + + @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input " + + "is purely new data/inserts to gain speed)", converter = HoodieDeltaStreamer.OperationConvertor.class) + public HoodieDeltaStreamer.Operation operation = HoodieDeltaStreamer.Operation.UPSERT; + + @Parameter(names = {"--filter-dupes"}, + description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert") + public Boolean filterDupes = false; + + @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive") + public Boolean enableHiveSync = false; + + @Parameter(names = {"--max-pending-compactions"}, + description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless" + + "outstanding compactions is less than this number") + public Integer maxPendingCompactions = 5; + + @Parameter(names = {"--continuous"}, description = "Delta Streamer runs in continuous mode running" + + " source-fetch -> Transform -> Hudi Write in loop") + public Boolean continuousMode = false; + + @Parameter(names = {"--min-sync-interval-seconds"}, + description = "the min sync interval of each sync in continuous mode") + public Integer minSyncIntervalSeconds = 0; + + @Parameter(names = {"--spark-master"}, description = "spark master to use.") + public String sparkMaster = "local[2]"; + + @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written") + public Boolean commitOnErrors = false; + + @Parameter(names = {"--delta-sync-scheduling-weight"}, + description = "Scheduling weight for delta sync as defined in " + + "https://spark.apache.org/docs/latest/job-scheduling.html") + public Integer deltaSyncSchedulingWeight = 1; + + @Parameter(names = {"--compact-scheduling-weight"}, description = "Scheduling weight for compaction as defined in " + + "https://spark.apache.org/docs/latest/job-scheduling.html") + public Integer compactSchedulingWeight = 1; + + @Parameter(names = {"--delta-sync-scheduling-minshare"}, description = "Minshare for delta sync as defined in " + + "https://spark.apache.org/docs/latest/job-scheduling.html") + public Integer deltaSyncSchedulingMinShare = 0; + + @Parameter(names = {"--compact-scheduling-minshare"}, description = "Minshare for compaction as defined in " + + "https://spark.apache.org/docs/latest/job-scheduling.html") + public Integer compactSchedulingMinShare = 0; + + /** + * Compaction is enabled for MoR table by default. This flag disables it + */ + @Parameter(names = {"--disable-compaction"}, + description = "Compaction is enabled for MoR table by default. This flag disables it ") + public Boolean forceDisableCompaction = false; + + /** + * Resume Delta Streamer from this checkpoint. + */ + @Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.") + public String checkpoint = null; + + @Parameter(names = {"--help", "-h"}, help = true) + public Boolean help = false; + } + + /** + * Resets target table name and target path using base-path-prefix. + * @param configuration + * @param database + * @param tableName + * @return + */ + private static String resetTarget(Config configuration, String database, String tableName) { + String basePathPrefix = configuration.basePathPrefix; + basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix; + String targetBasePath = basePathPrefix + Constants.FILEDELIMITER + database + Constants.FILEDELIMITER + tableName; + configuration.targetTableName = database + Constants.DELIMITER + tableName; + return targetBasePath; + } + + /* + Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync + */ + public void sync() { + for (TableExecutionContext context : tableExecutionContexts) { + try { + new HoodieDeltaStreamer(context.getConfig(), jssc, context.getProperties()).sync(); + successTables.add(Helpers.getTableWithDatabase(context)); + } catch (Exception e) { + logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e); + failedTables.add(Helpers.getTableWithDatabase(context)); + } + } + + logger.info("Ingestion was successful for topics: " + successTables); + if (!failedTables.isEmpty()) { + logger.info("Ingestion failed for topics: " + failedTables); + } + } + + public static class Constants { + public static final String KAFKA_TOPIC_PROP = "hoodie.deltastreamer.source.kafka.topic"; + private static final String SOURCE_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url"; + private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.targetUrl"; + public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table"; + private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl"; + private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = "hoodie.deltastreamer.schemaprovider.registry.urlSuffix"; + private static final String TABLES_TO_BE_INGESTED_PROP = "hoodie.deltastreamer.ingestion.tablesToBeIngested"; + private static final String INGESTION_PREFIX = "hoodie.deltastreamer.ingestion."; + private static final String INGESTION_CONFIG_SUFFIX = ".configFile"; + private static final String DEFAULT_CONFIG_FILE_NAME_SUFFIX = "_config.properties"; + private static final String TARGET_BASE_PATH_PROP = "hoodie.deltastreamer.ingestion.targetBasePath"; + private static final String LOCAL_SPARK_MASTER = "local[2]"; + private static final String FILEDELIMITER = "/"; + private static final String DELIMITER = "."; + private static final String UNDERSCORE = "_"; + private static final String COMMA_SEPARATOR = ","; + } + + public Set getSuccessTables() { + return successTables; + } + + public Set getFailedTables() { + return failedTables; + } + + public List getTableExecutionContexts() { + return this.tableExecutionContexts; + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/TableExecutionContext.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/TableExecutionContext.java new file mode 100644 index 000000000..12c00d4b9 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/TableExecutionContext.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.deltastreamer; + +import org.apache.hudi.common.config.TypedProperties; + +import java.util.Objects; + +/** + * Wrapper over TableConfig objects. + * Useful for incrementally syncing multiple tables one by one via HoodieMultiTableDeltaStreamer.java class. + */ +public class TableExecutionContext { + + private TypedProperties properties; + private HoodieDeltaStreamer.Config config; + private String database; + private String tableName; + + public HoodieDeltaStreamer.Config getConfig() { + return config; + } + + public void setConfig(HoodieDeltaStreamer.Config config) { + this.config = config; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public TypedProperties getProperties() { + return properties; + } + + public void setProperties(TypedProperties properties) { + this.properties = properties; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TableExecutionContext that = (TableExecutionContext) o; + return Objects.equals(properties, that.properties) && Objects.equals(database, that.database) && Objects.equals(tableName, that.tableName); + } + + @Override + public int hashCode() { + return Objects.hash(properties, database, tableName); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java index b7323d466..7edf5345c 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java @@ -69,6 +69,7 @@ import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.api.java.UDF4; import org.apache.spark.sql.functions; import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.streaming.kafka010.KafkaTestUtils; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -100,6 +101,10 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { private static final Random RANDOM = new Random(); private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties"; + public static final String PROPS_FILENAME_TEST_SOURCE1 = "test-source1.properties"; + public static final String PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1 = "test-invalid-hive-sync-source1.properties"; + public static final String PROPS_INVALID_FILE = "test-invalid-props.properties"; + public static final String PROPS_INVALID_TABLE_CONFIG_FILE = "test-invalid-table-config.properties"; private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties"; private static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties"; private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties"; @@ -107,6 +112,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { private static final int PARQUET_NUM_RECORDS = 5; private static final int CSV_NUM_RECORDS = 3; private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class); + public static KafkaTestUtils testUtils; private static int testNum = 1; @@ -114,9 +120,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { public static void initClass() throws Exception { UtilitiesTestBase.initClass(true); PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles"; + testUtils = new KafkaTestUtils(); + testUtils.setup(); // prepare the configs. UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/config/base.properties"); UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs, dfsBasePath + "/sql-transformer.properties"); UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc"); @@ -124,6 +133,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc"); UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc", dfs, dfsBasePath + "/target-flattened.avsc"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_short_trip_uber.avsc", dfs, dfsBasePath + "/source_short_trip_uber.avsc"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_uber.avsc", dfs, dfsBasePath + "/source_uber.avsc"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_short_trip_uber.avsc", dfs, dfsBasePath + "/target_short_trip_uber.avsc"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_uber.avsc", dfs, dfsBasePath + "/target_uber.avsc"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/invalid_hive_sync_uber_config.properties", dfs, dfsBasePath + "/config/invalid_hive_sync_uber_config.properties"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/uber_config.properties", dfs, dfsBasePath + "/config/uber_config.properties"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/short_trip_uber_config.properties", dfs, dfsBasePath + "/config/short_trip_uber_config.properties"); + TypedProperties props = new TypedProperties(); props.setProperty("include", "sql-transformer.properties"); props.setProperty("hoodie.datasource.write.keygenerator.class", TestGenerator.class.getName()); @@ -163,11 +180,54 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc"); UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID); + TypedProperties props1 = new TypedProperties(); + populateCommonProps(props1); + UtilitiesTestBase.Helpers.savePropsToDFS(props1, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE1); + + TypedProperties properties = new TypedProperties(); + populateInvalidTableConfigFilePathProps(properties); + UtilitiesTestBase.Helpers.savePropsToDFS(properties, dfs, dfsBasePath + "/" + PROPS_INVALID_TABLE_CONFIG_FILE); + + TypedProperties invalidHiveSyncProps = new TypedProperties(); + invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber"); + invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_hive_sync_uber_config.properties"); + UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1); + prepareParquetDFSFiles(PARQUET_NUM_RECORDS); } + private static void populateInvalidTableConfigFilePathProps(TypedProperties props) { + props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName()); + props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd"); + props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber"); + props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_uber_config.properties"); + } + + private static void populateCommonProps(TypedProperties props) { + props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName()); + props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd"); + props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber"); + props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/uber_config.properties"); + props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile", dfsBasePath + "/config/short_trip_uber_config.properties"); + + //Kafka source properties + props.setProperty("bootstrap.servers", testUtils.brokerAddress()); + props.setProperty("auto.offset.reset", "earliest"); + props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000)); + + // Hive Configs + props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), "jdbc:hive2://127.0.0.1:9999/"); + props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), "testdb2"); + props.setProperty(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY(), "false"); + props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "datestr"); + props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(), + MultiPartKeysValueExtractor.class.getName()); + } + @AfterClass - public static void cleanupClass() throws Exception { + public static void cleanupClass() { UtilitiesTestBase.cleanupClass(); } @@ -649,7 +709,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { String path = PARQUET_SOURCE_ROOT + "/1.parquet"; HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); Helpers.saveParquetToDFS(Helpers.toGenericRecords( - dataGenerator.generateInserts("000", numRecords), dataGenerator), new Path(path)); + dataGenerator.generateInserts("000", numRecords)), new Path(path)); } private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMultiTableDeltaStreamer.java new file mode 100644 index 000000000..db0e7775f --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMultiTableDeltaStreamer.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities; + +import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.common.HoodieTestDataGenerator; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer; +import org.apache.hudi.utilities.deltastreamer.TableExecutionContext; +import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; +import org.apache.hudi.utilities.sources.JsonKafkaSource; +import org.apache.hudi.utilities.sources.TestDataSource; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer { + + private static volatile Logger log = LogManager.getLogger(TestHoodieMultiTableDeltaStreamer.class); + + static class TestHelpers { + + static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync) { + HoodieMultiTableDeltaStreamer.Config config = new HoodieMultiTableDeltaStreamer.Config(); + config.configFolder = configFolder; + config.targetTableName = "dummy_table"; + config.basePathPrefix = dfsBasePath + "/multi_table_dataset"; + config.propsFilePath = dfsBasePath + "/" + fileName; + config.tableType = "COPY_ON_WRITE"; + config.sourceClassName = sourceClassName; + config.sourceOrderingField = "timestamp"; + config.schemaProviderClassName = FilebasedSchemaProvider.class.getName(); + config.enableHiveSync = enableHiveSync; + return config; + } + } + + @Test + public void testInvalidHiveSyncProps() throws IOException { + HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1,dfsBasePath + "/config", TestDataSource.class.getName(), true); + try { + new HoodieMultiTableDeltaStreamer(cfg, jsc); + fail("Should fail when hive sync table not provided with enableHiveSync flag"); + } catch (HoodieException he) { + log.error("Expected error when creating table execution objects", he); + assertTrue(he.getMessage().contains("Hive sync table field not provided!")); + } + } + + @Test + public void testInvalidPropsFilePath() throws IOException { + HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_FILE,dfsBasePath + "/config", TestDataSource.class.getName(), true); + try { + new HoodieMultiTableDeltaStreamer(cfg, jsc); + fail("Should fail when invalid props file is provided"); + } catch (IllegalArgumentException iae) { + log.error("Expected error when creating table execution objects", iae); + assertTrue(iae.getMessage().contains("Please provide valid common config file path!")); + } + } + + @Test + public void testInvalidTableConfigFilePath() throws IOException { + HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE,dfsBasePath + "/config", TestDataSource.class.getName(), true); + try { + new HoodieMultiTableDeltaStreamer(cfg, jsc); + fail("Should fail when invalid table config props file path is provided"); + } catch (IllegalArgumentException iae) { + log.error("Expected error when creating table execution objects", iae); + assertTrue(iae.getMessage().contains("Please provide valid table config file path!")); + } + } + + @Test + public void testCustomConfigProps() throws IOException { + HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1,dfsBasePath + "/config", TestDataSource.class.getName(), false); + HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc); + TableExecutionContext executionContext = streamer.getTableExecutionContexts().get(1); + assertEquals(streamer.getTableExecutionContexts().size(), 2); + assertEquals(executionContext.getConfig().targetBasePath, dfsBasePath + "/multi_table_dataset/uber_db/dummy_table_uber"); + assertEquals(executionContext.getConfig().targetTableName, "uber_db.dummy_table_uber"); + assertEquals(executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.KAFKA_TOPIC_PROP), "topic1"); + assertEquals(executionContext.getProperties().getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()), "_row_key"); + assertEquals(executionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY()), TestHoodieDeltaStreamer.TestGenerator.class.getName()); + assertEquals(executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.HIVE_SYNC_TABLE_PROP), "uber_hive_dummy_table"); + } + + @Test + @Ignore + public void testInvalidIngestionProps() { + try { + HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1,dfsBasePath + "/config", TestDataSource.class.getName(), true); + new HoodieMultiTableDeltaStreamer(cfg, jsc); + fail("Creation of execution object should fail without kafka topic"); + } catch (Exception e) { + log.error("Creation of execution object failed with error: " + e.getMessage(), e); + assertTrue(e.getMessage().contains("Please provide valid table config arguments!")); + } + } + + @Test //0 corresponds to fg + public void testMultiTableExecution() throws IOException { + //create topics for each table + testUtils.createTopic("topic1", 2); + testUtils.createTopic("topic2", 2); + + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + testUtils.sendMessages("topic1", Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA))); + testUtils.sendMessages("topic2", Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA))); + + HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1,dfsBasePath + "/config", JsonKafkaSource.class.getName(), false); + HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc); + List executionContexts = streamer.getTableExecutionContexts(); + TypedProperties properties = executionContexts.get(1).getProperties(); + properties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc"); + properties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc"); + executionContexts.get(1).setProperties(properties); + TypedProperties properties1 = executionContexts.get(0).getProperties(); + properties1.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_short_trip_uber.avsc"); + properties1.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_short_trip_uber.avsc"); + executionContexts.get(0).setProperties(properties1); + String targetBasePath1 = executionContexts.get(1).getConfig().targetBasePath; + String targetBasePath2 = executionContexts.get(0).getConfig().targetBasePath; + streamer.sync(); + + TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + "/*/*.parquet", sqlContext); + TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext); + + //insert updates for already existing records in kafka topics + testUtils.sendMessages("topic1", Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA))); + testUtils.sendMessages("topic2", Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA))); + streamer.sync(); + assertEquals(streamer.getSuccessTables().size(), 2); + assertTrue(streamer.getFailedTables().isEmpty()); + + //assert the record count matches now + TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + "/*/*.parquet", sqlContext); + TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java index 7f149571e..e82d66e05 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java @@ -83,6 +83,7 @@ public class UtilitiesTestBase { protected transient SparkSession sparkSession = null; protected transient SQLContext sqlContext; protected static HiveServer2 hiveServer; + protected static HiveTestService hiveTestService; private static ObjectMapper mapper = new ObjectMapper(); @BeforeClass @@ -97,20 +98,23 @@ public class UtilitiesTestBase { dfsBasePath = dfs.getWorkingDirectory().toString(); dfs.mkdirs(new Path(dfsBasePath)); if (startHiveService) { - HiveTestService hiveService = new HiveTestService(hdfsTestService.getHadoopConf()); - hiveServer = hiveService.start(); + hiveTestService = new HiveTestService(hdfsTestService.getHadoopConf()); + hiveServer = hiveTestService.start(); clearHiveDb(); } } @AfterClass - public static void cleanupClass() throws Exception { + public static void cleanupClass() { if (hdfsTestService != null) { hdfsTestService.stop(); } if (hiveServer != null) { hiveServer.stop(); } + if (hiveTestService != null) { + hiveTestService.stop(); + } } @Before @@ -263,20 +267,19 @@ public class UtilitiesTestBase { return props; } - public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) { + public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) { try { - Option recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.AVRO_SCHEMA); + Option recordOpt = hoodieRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA); return (GenericRecord) recordOpt.get(); } catch (IOException e) { return null; } } - public static List toGenericRecords(List hoodieRecords, - HoodieTestDataGenerator dataGenerator) { + public static List toGenericRecords(List hoodieRecords) { List records = new ArrayList(); for (HoodieRecord hoodieRecord : hoodieRecords) { - records.add(toGenericRecord(hoodieRecord, dataGenerator)); + records.add(toGenericRecord(hoodieRecord)); } return records; } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java index 262a14f3e..9e28833b3 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java @@ -88,7 +88,7 @@ public abstract class AbstractBaseTestSource extends AvroSource { HoodieTestDataGenerator dataGenerator = dataGeneratorMap.get(partition); // generate `sourceLimit` number of upserts each time. - int numExistingKeys = dataGenerator.getNumExistingKeys(); + int numExistingKeys = dataGenerator.getNumExistingKeys(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); LOG.info("NumExistingKeys=" + numExistingKeys); int numUpdates = Math.min(numExistingKeys, sourceLimit / 2); @@ -116,21 +116,22 @@ public abstract class AbstractBaseTestSource extends AvroSource { LOG.info("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + (numUpdates - 50) + ", NumDeletes=50, maxUniqueRecords=" + maxUniqueKeys); // if we generate update followed by deletes -> some keys in update batch might be picked up for deletes. Hence generating delete batch followed by updates - deleteStream = dataGenerator.generateUniqueDeleteRecordStream(instantTime, 50).map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator)); - updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, numUpdates - 50).map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator)); + deleteStream = dataGenerator.generateUniqueDeleteRecordStream(instantTime, 50).map(AbstractBaseTestSource::toGenericRecord); + updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, numUpdates - 50, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .map(AbstractBaseTestSource::toGenericRecord); } else { LOG.info("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys); - updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, numUpdates) - .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator)); + updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, numUpdates, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .map(AbstractBaseTestSource::toGenericRecord); } - Stream insertStream = dataGenerator.generateInsertsStream(instantTime, numInserts, false) - .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator)); + Stream insertStream = dataGenerator.generateInsertsStream(instantTime, numInserts, false, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .map(AbstractBaseTestSource::toGenericRecord); return Stream.concat(deleteStream, Stream.concat(updateStream, insertStream)); } - private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) { + private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) { try { - Option recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.AVRO_SCHEMA); + Option recordOpt = hoodieRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA); return (GenericRecord) recordOpt.get(); } catch (IOException e) { return null; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java index b4a023e4c..c0f0a3d3f 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java @@ -64,7 +64,7 @@ public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase { } @AfterClass - public static void cleanupClass() throws Exception { + public static void cleanupClass() { UtilitiesTestBase.cleanupClass(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java index e592c74aa..c5225573c 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java @@ -42,7 +42,6 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import java.io.IOException; import java.util.HashMap; import java.util.UUID; @@ -64,7 +63,7 @@ public class TestKafkaSource extends UtilitiesTestBase { } @AfterClass - public static void cleanupClass() throws Exception { + public static void cleanupClass() { UtilitiesTestBase.cleanupClass(); } @@ -95,7 +94,7 @@ public class TestKafkaSource extends UtilitiesTestBase { } @Test - public void testJsonKafkaSource() throws IOException { + public void testJsonKafkaSource() { // topic setup. testUtils.createTopic(TEST_TOPIC_NAME, 2); @@ -143,7 +142,7 @@ public class TestKafkaSource extends UtilitiesTestBase { } @Test - public void testJsonKafkaSourceWithDefaultUpperCap() throws IOException { + public void testJsonKafkaSourceWithDefaultUpperCap() { // topic setup. testUtils.createTopic(TEST_TOPIC_NAME, 2); HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); @@ -172,7 +171,7 @@ public class TestKafkaSource extends UtilitiesTestBase { } @Test - public void testJsonKafkaSourceWithConfigurableUpperCap() throws IOException { + public void testJsonKafkaSourceWithConfigurableUpperCap() { // topic setup. testUtils.createTopic(TEST_TOPIC_NAME, 2); HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java index a1a7697db..a2b357407 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java @@ -48,6 +48,6 @@ public class TestParquetDFSSource extends AbstractDFSSourceTestBase { @Override void writeNewDataToFile(List records, Path path) throws IOException { - Helpers.saveParquetToDFS(Helpers.toGenericRecords(records, dataGenerator), path); + Helpers.saveParquetToDFS(Helpers.toGenericRecords(records), path); } } diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/invalid_hive_sync_uber_config.properties b/hudi-utilities/src/test/resources/delta-streamer-config/invalid_hive_sync_uber_config.properties new file mode 100644 index 000000000..5c569c5d0 --- /dev/null +++ b/hudi-utilities/src/test/resources/delta-streamer-config/invalid_hive_sync_uber_config.properties @@ -0,0 +1,23 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +include=base.properties +hoodie.datasource.write.recordkey.field=_row_key +hoodie.datasource.write.partitionpath.field=created_at +hoodie.deltastreamer.source.kafka.topic=test_topic +hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP +hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd \ No newline at end of file diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties b/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties new file mode 100644 index 000000000..52d39baee --- /dev/null +++ b/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties @@ -0,0 +1,24 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +include=base.properties +hoodie.datasource.write.recordkey.field=_row_key +hoodie.datasource.write.partitionpath.field=created_at +hoodie.deltastreamer.source.kafka.topic=topic2 +hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP +hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S +hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table \ No newline at end of file diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source_short_trip_uber.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source_short_trip_uber.avsc new file mode 100644 index 000000000..8a589bdf7 --- /dev/null +++ b/hudi-utilities/src/test/resources/delta-streamer-config/source_short_trip_uber.avsc @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "type" : "record", + "name" : "shortTripRec", + "fields" : [ + { + "name" : "timestamp", + "type" : "double" + }, { + "name" : "_row_key", + "type" : "string" + }, { + "name" : "rider", + "type" : "string" + }, { + "name" : "driver", + "type" : "string" + }, { + "name" : "fare", + "type" : "double" + }, + { + "name" : "_hoodie_is_deleted", + "type" : "boolean", + "default" : false + } ] +} + diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source_uber.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source_uber.avsc new file mode 100644 index 000000000..324862ee9 --- /dev/null +++ b/hudi-utilities/src/test/resources/delta-streamer-config/source_uber.avsc @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "type" : "record", + "name" : "tripUberRec", + "fields" : [ + { + "name" : "timestamp", + "type" : "double" + }, { + "name" : "_row_key", + "type" : "string" + }, { + "name" : "rider", + "type" : "string" + }, { + "name" : "driver", + "type" : "string" + }, { + "name" : "fare", + "type" : "double" + }, + { + "name" : "_hoodie_is_deleted", + "type" : "boolean", + "default" : false + } ] +} + diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/target_short_trip_uber.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/target_short_trip_uber.avsc new file mode 100644 index 000000000..8a589bdf7 --- /dev/null +++ b/hudi-utilities/src/test/resources/delta-streamer-config/target_short_trip_uber.avsc @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "type" : "record", + "name" : "shortTripRec", + "fields" : [ + { + "name" : "timestamp", + "type" : "double" + }, { + "name" : "_row_key", + "type" : "string" + }, { + "name" : "rider", + "type" : "string" + }, { + "name" : "driver", + "type" : "string" + }, { + "name" : "fare", + "type" : "double" + }, + { + "name" : "_hoodie_is_deleted", + "type" : "boolean", + "default" : false + } ] +} + diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/target_uber.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/target_uber.avsc new file mode 100644 index 000000000..324862ee9 --- /dev/null +++ b/hudi-utilities/src/test/resources/delta-streamer-config/target_uber.avsc @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "type" : "record", + "name" : "tripUberRec", + "fields" : [ + { + "name" : "timestamp", + "type" : "double" + }, { + "name" : "_row_key", + "type" : "string" + }, { + "name" : "rider", + "type" : "string" + }, { + "name" : "driver", + "type" : "string" + }, { + "name" : "fare", + "type" : "double" + }, + { + "name" : "_hoodie_is_deleted", + "type" : "boolean", + "default" : false + } ] +} + diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/uber_config.properties b/hudi-utilities/src/test/resources/delta-streamer-config/uber_config.properties new file mode 100644 index 000000000..3d3501fec --- /dev/null +++ b/hudi-utilities/src/test/resources/delta-streamer-config/uber_config.properties @@ -0,0 +1,25 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +include=base.properties +hoodie.datasource.write.recordkey.field=_row_key +hoodie.datasource.write.partitionpath.field=created_at +hoodie.deltastreamer.source.kafka.topic=topic1 +hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP +hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S +hoodie.datasource.hive_sync.database=uber_hive_db +hoodie.datasource.hive_sync.table=uber_hive_dummy_table \ No newline at end of file