From 524193eb4bd45eb21a64c68e927600a52b9ade1d Mon Sep 17 00:00:00 2001 From: Pratyaksh Sharma Date: Wed, 7 Oct 2020 09:04:03 +0530 Subject: [PATCH] [HUDI-603]: DeltaStreamer can now fetch schema before every run in continuous mode (#1566) Co-authored-by: Balaji Varadarajan --- .../hudi/async/AsyncCompactService.java | 4 + .../apache/hudi/client/AbstractCompactor.java | 5 + .../hudi/client/AbstractHoodieClient.java | 17 ++-- .../EmbeddedTimelineServerHelper.java | 72 ++++++++++++++ .../hudi/client/HoodieSparkCompactor.java | 5 +- .../utilities/deltastreamer/DeltaSync.java | 97 ++++++++++++++++--- .../deltastreamer/HoodieDeltaStreamer.java | 35 ++++--- .../schema/SchemaRegistryProvider.java | 30 +++--- .../hudi/utilities/schema/SchemaSet.java | 44 +++++++++ 9 files changed, 252 insertions(+), 57 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaSet.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java index d7faf5411..47f883284 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java @@ -161,4 +161,8 @@ public abstract class AsyncCompactService extends HoodieAsyncService { protected boolean shouldStopCompactor() { return false; } + + public synchronized void updateWriteClient(AbstractHoodieWriteClient writeClient) { + this.compactor.updateWriteClient(writeClient); + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractCompactor.java index 30bc569d7..c80b34a3e 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractCompactor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractCompactor.java @@ -38,4 +38,9 @@ public abstract class AbstractCompactor } public abstract void compact(HoodieInstant instant) throws IOException; + + public void updateWriteClient(AbstractHoodieWriteClient writeClient) { + this.compactionClient = writeClient; + } + } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java index 7a59ebcfb..e50228132 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java @@ -18,9 +18,7 @@ package org.apache.hudi.client; -import org.apache.hadoop.conf.Configuration; - -import org.apache.hudi.client.common.EngineProperty; +import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.client.common.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; @@ -29,6 +27,7 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -100,14 +99,8 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl if (config.isEmbeddedTimelineServerEnabled()) { if (!timelineServer.isPresent()) { // Run Embedded Timeline Server - LOG.info("Starting Timeline service !!"); - Option hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST); - timelineServer = Option.of(new 
EmbeddedTimelineService(context, hostAddr.orElse(null), - config.getEmbeddedTimelineServerPort(), config.getClientSpecifiedViewStorageConfig())); try { - timelineServer.get().startServer(); - // Allow executor to find this newly instantiated timeline service - config.setViewStorageConfig(timelineServer.get().getRemoteFileSystemViewConfig()); + timelineServer = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(context, config); } catch (IOException e) { LOG.warn("Unable to start timeline service. Proceeding as if embedded server is disabled", e); stopEmbeddedServerView(false); @@ -129,4 +122,8 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl config.getConsistencyGuardConfig(), Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))); } + + public Option getTimelineServer() { + return timelineServer; + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java new file mode 100644 index 000000000..1d5984794 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.embedded; + +import org.apache.hudi.client.common.EngineProperty; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; + +/** + * Helper class to instantiate embedded timeline service. + */ +public class EmbeddedTimelineServerHelper { + + private static final Logger LOG = LogManager.getLogger(EmbeddedTimelineServerHelper.class); + + /** + * Instantiate Embedded Timeline Server. + * @param context Hoodie Engine Context + * @param config Hoodie Write Config + * @return TimelineServer if configured to run + * @throws IOException + */ + public static Option createEmbeddedTimelineService( + HoodieEngineContext context, HoodieWriteConfig config) throws IOException { + Option timelineServer = Option.empty(); + if (config.isEmbeddedTimelineServerEnabled()) { + // Run Embedded Timeline Server + LOG.info("Starting Timeline service !!"); + Option hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST); + timelineServer = Option.of(new EmbeddedTimelineService(context, hostAddr.orElse(null), + config.getEmbeddedTimelineServerPort(), config.getClientSpecifiedViewStorageConfig())); + timelineServer.get().startServer(); + updateWriteConfigWithTimelineServer(timelineServer.get(), config); + } + return timelineServer; + } + + /** + * Adjusts hoodie write config with timeline server settings. 
+ * @param timelineServer Embedded Timeline Server + * @param config Hoodie Write Config + */ + public static void updateWriteConfigWithTimelineServer(EmbeddedTimelineService timelineServer, + HoodieWriteConfig config) { + // Allow executor to find this newly instantiated timeline service + if (config.isEmbeddedTimelineServerEnabled()) { + config.setViewStorageConfig(timelineServer.getRemoteFileSystemViewConfig()); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java index 68e6da777..b81570de9 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java @@ -41,7 +41,8 @@ public class HoodieSparkCompactor extends Abstrac @Override public void compact(HoodieInstant instant) throws IOException { LOG.info("Compactor executing compaction " + instant); - JavaRDD res = compactionClient.compact(instant.getTimestamp()); + SparkRDDWriteClient writeClient = (SparkRDDWriteClient)compactionClient; + JavaRDD res = writeClient.compact(instant.getTimestamp()); long numWriteErrors = res.collect().stream().filter(WriteStatus::hasErrors).count(); if (numWriteErrors != 0) { // We treat even a single error in compaction as fatal @@ -50,6 +51,6 @@ public class HoodieSparkCompactor extends Abstrac "Compaction for instant (" + instant + ") failed with write errors. 
Errors :" + numWriteErrors); } // Commit compaction - compactionClient.commitCompaction(instant.getTimestamp(), res, Option.empty()); + writeClient.commitCompaction(instant.getTimestamp(), res, Option.empty()); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 36f121344..5a1756cc2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -24,6 +24,8 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper; +import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -51,6 +53,7 @@ import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback; import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig; import org.apache.hudi.utilities.schema.DelegatingSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.schema.SchemaSet; import org.apache.hudi.utilities.sources.InputBatch; import org.apache.hudi.utilities.transform.Transformer; @@ -166,6 +169,17 @@ public class DeltaSync implements Serializable { */ private transient Option commitTimelineOpt; + /** + * Tracks whether new schema is being seen and creates client accordingly. + */ + private final SchemaSet processedSchema; + + /** + * DeltaSync will explicitly manage embedded timeline server so that they can be reused across Write Client + * instantiations. 
+ */ + private transient Option embeddedTimelineService = Option.empty(); + /** * Write Client. */ @@ -184,6 +198,7 @@ public class DeltaSync implements Serializable { this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient; this.props = props; this.userProvidedSchemaProvider = schemaProvider; + this.processedSchema = new SchemaSet(); refreshTimeline(); // Register User Provided schema first @@ -244,6 +259,18 @@ public class DeltaSync implements Serializable { this.schemaProvider = srcRecordsWithCkpt.getKey(); // Setup HoodieWriteClient and compaction now that we decided on schema setupWriteClient(); + } else { + Schema newSourceSchema = srcRecordsWithCkpt.getKey().getSourceSchema(); + Schema newTargetSchema = srcRecordsWithCkpt.getKey().getTargetSchema(); + if (!(processedSchema.isSchemaPresent(newSourceSchema)) + || !(processedSchema.isSchemaPresent(newTargetSchema))) { + LOG.info("Seeing new schema. Source :" + newSourceSchema.toString(true) + + ", Target :" + newTargetSchema.toString(true)); + // We need to recreate write client with new schema and register them. + reInitWriteClient(newSourceSchema, newTargetSchema); + processedSchema.addSchema(newSourceSchema); + processedSchema.addSchema(newTargetSchema); + } } result = writeToSink(srcRecordsWithCkpt.getRight().getRight(), @@ -541,22 +568,49 @@ public class DeltaSync implements Serializable { * SchemaProvider creation is a precursor to HoodieWriteClient and AsyncCompactor creation. This method takes care of * this constraint. 
*/ - private void setupWriteClient() { - LOG.info("Setting up Hoodie Write Client"); - if ((null != schemaProvider) && (null == writeClient)) { - registerAvroSchemas(schemaProvider); - HoodieWriteConfig hoodieCfg = getHoodieClientConfig(schemaProvider); - writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), hoodieCfg, true); - onInitializingHoodieWriteClient.apply(writeClient); + public void setupWriteClient() throws IOException { + if ((null != schemaProvider)) { + Schema sourceSchema = schemaProvider.getSourceSchema(); + Schema targetSchema = schemaProvider.getTargetSchema(); + reInitWriteClient(sourceSchema, targetSchema); } } + private void reInitWriteClient(Schema sourceSchema, Schema targetSchema) throws IOException { + LOG.info("Setting up new Hoodie Write Client"); + registerAvroSchemas(sourceSchema, targetSchema); + HoodieWriteConfig hoodieCfg = getHoodieClientConfig(targetSchema); + if (hoodieCfg.isEmbeddedTimelineServerEnabled()) { + if (!embeddedTimelineService.isPresent()) { + embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(new HoodieSparkEngineContext(jssc), hoodieCfg); + } else { + EmbeddedTimelineServerHelper.updateWriteConfigWithTimelineServer(embeddedTimelineService.get(), hoodieCfg); + } + } + + if (null != writeClient) { + // Close Write client. + writeClient.close(); + } + writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), hoodieCfg, true, embeddedTimelineService); + onInitializingHoodieWriteClient.apply(writeClient); + } + /** * Helper to construct Write Client config. * * @param schemaProvider Schema Provider */ private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) { + return getHoodieClientConfig(schemaProvider != null ? schemaProvider.getTargetSchema() : null); + } + + /** + * Helper to construct Write Client config. 
+ * + * @param schema Schema + */ + private HoodieWriteConfig getHoodieClientConfig(Schema schema) { final boolean combineBeforeUpsert = true; final boolean autoCommit = false; HoodieWriteConfig.Builder builder = @@ -567,8 +621,8 @@ public class DeltaSync implements Serializable { .forTable(cfg.targetTableName) .withAutoCommit(autoCommit).withProps(props); - if (null != schemaProvider && null != schemaProvider.getTargetSchema()) { - builder = builder.withSchema(schemaProvider.getTargetSchema().toString()); + if (null != schema) { + builder = builder.withSchema(schema.toString()); } HoodieWriteConfig config = builder.build(); @@ -596,12 +650,24 @@ public class DeltaSync implements Serializable { * @param schemaProvider Schema Provider */ private void registerAvroSchemas(SchemaProvider schemaProvider) { - // register the schemas, so that shuffle does not serialize the full schemas if (null != schemaProvider) { + registerAvroSchemas(schemaProvider.getSourceSchema(), schemaProvider.getTargetSchema()); + } + } + + /** + * Register Avro Schemas. 
+ * + * @param sourceSchema Source Schema + * @param targetSchema Target Schema + */ + private void registerAvroSchemas(Schema sourceSchema, Schema targetSchema) { + // register the schemas, so that shuffle does not serialize the full schemas + if (null != sourceSchema) { List schemas = new ArrayList<>(); - schemas.add(schemaProvider.getSourceSchema()); - if (schemaProvider.getTargetSchema() != null) { - schemas.add(schemaProvider.getTargetSchema()); + schemas.add(sourceSchema); + if (targetSchema != null) { + schemas.add(targetSchema); } LOG.info("Registering Schema :" + schemas); @@ -617,6 +683,11 @@ public class DeltaSync implements Serializable { writeClient.close(); writeClient = null; } + + LOG.info("Shutting down embedded timeline server"); + if (embeddedTimelineService.isPresent()) { + embeddedTimelineService.get().stop(); + } } public FileSystem getFs() { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 5e6e655a2..f86a94214 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -624,21 +624,26 @@ public class HoodieDeltaStreamer implements Serializable { */ protected Boolean onInitializingWriteClient(SparkRDDWriteClient writeClient) { if (cfg.isAsyncCompactionEnabled()) { - asyncCompactService = new SparkAsyncCompactService(new HoodieSparkEngineContext(jssc), writeClient); - // Enqueue existing pending compactions first - HoodieTableMetaClient meta = - new HoodieTableMetaClient(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, true); - List pending = CompactionUtils.getPendingCompactionInstantTimes(meta); - pending.forEach(hoodieInstant -> asyncCompactService.enqueuePendingCompaction(hoodieInstant)); - 
asyncCompactService.start((error) -> { - // Shutdown DeltaSync - shutdown(false); - return true; - }); - try { - asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions); - } catch (InterruptedException ie) { - throw new HoodieException(ie); + if (null != asyncCompactService) { + // Update the write client used by Async Compactor. + asyncCompactService.updateWriteClient(writeClient); + } else { + asyncCompactService = new SparkAsyncCompactService(new HoodieSparkEngineContext(jssc), writeClient); + // Enqueue existing pending compactions first + HoodieTableMetaClient meta = + new HoodieTableMetaClient(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, true); + List pending = CompactionUtils.getPendingCompactionInstantTimes(meta); + pending.forEach(hoodieInstant -> asyncCompactService.enqueuePendingCompaction(hoodieInstant)); + asyncCompactService.start((error) -> { + // Shutdown DeltaSync + shutdown(false); + return true; + }); + try { + asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions); + } catch (InterruptedException ie) { + throw new HoodieException(ie); + } } } return true; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java index 636668a5c..47c4c2f81 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java @@ -48,9 +48,6 @@ public class SchemaRegistryProvider extends SchemaProvider { "hoodie.deltastreamer.schemaprovider.registry.targetUrl"; } - private final Schema schema; - private final Schema targetSchema; - private static String fetchSchemaFromRegistry(String registryUrl) throws IOException { URL registry = new URL(registryUrl); ObjectMapper mapper = new ObjectMapper(); @@ -61,18 +58,6 @@ public class 
SchemaRegistryProvider extends SchemaProvider { public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) { super(props, jssc); DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP)); - String registryUrl = props.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP); - String targetRegistryUrl = props.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl); - try { - this.schema = getSchema(registryUrl); - if (!targetRegistryUrl.equals(registryUrl)) { - this.targetSchema = getSchema(targetRegistryUrl); - } else { - this.targetSchema = schema; - } - } catch (IOException ioe) { - throw new HoodieIOException("Error reading schema from registry :" + registryUrl, ioe); - } } private static Schema getSchema(String registryUrl) throws IOException { @@ -81,11 +66,22 @@ public class SchemaRegistryProvider extends SchemaProvider { @Override public Schema getSourceSchema() { - return schema; + String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP); + try { + return getSchema(registryUrl); + } catch (IOException ioe) { + throw new HoodieIOException("Error reading source schema from registry :" + registryUrl, ioe); + } } @Override public Schema getTargetSchema() { - return targetSchema; + String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP); + String targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl); + try { + return getSchema(targetRegistryUrl); + } catch (IOException ioe) { + throw new HoodieIOException("Error reading target schema from registry :" + targetRegistryUrl, ioe); + } } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaSet.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaSet.java new file mode 100644 index 000000000..f06bb0e6c --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaSet.java @@ -0,0 +1,44 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.schema; + +import java.io.Serializable; +import java.util.HashSet; +import org.apache.avro.Schema; +import org.apache.avro.SchemaNormalization; + +import java.util.Set; + +/** + * Tracks already processed schemas. + */ +public class SchemaSet implements Serializable { + + private final Set processedSchema = new HashSet<>(); + + public boolean isSchemaPresent(Schema schema) { + long schemaKey = SchemaNormalization.parsingFingerprint64(schema); + return processedSchema.contains(schemaKey); + } + + public void addSchema(Schema schema) { + long schemaKey = SchemaNormalization.parsingFingerprint64(schema); + processedSchema.add(schemaKey); + } +}