From a086d255c89d12eb42cad8c5ae0e000f3b83bbe6 Mon Sep 17 00:00:00 2001 From: Samrat Date: Tue, 20 Jul 2021 07:19:43 +0530 Subject: [PATCH] [HUDI-1860] Add INSERT_OVERWRITE and INSERT_OVERWRITE_TABLE support to DeltaStreamer (#3184) --- .../client/AbstractHoodieWriteClient.java | 5 +-- .../apache/hudi/common/util/CommitUtils.java | 4 ++ .../testsuite/HoodieDeltaStreamerWrapper.java | 10 +++++ .../testsuite/HoodieTestSuiteWriter.java | 20 ++++++++++ .../dag/nodes/InsertOverwriteNode.java | 40 +++++++++++++++++++ .../dag/nodes/InsertOverwriteTableNode.java | 40 +++++++++++++++++++ .../utilities/deltastreamer/DeltaSync.java | 19 +++++++-- .../functional/TestHoodieDeltaStreamer.java | 34 ++++++++++++++++ 8 files changed, 166 insertions(+), 6 deletions(-) create mode 100644 hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertOverwriteNode.java create mode 100644 hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertOverwriteTableNode.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index c2f90ca38..f0a45c5bc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -673,8 +673,7 @@ public abstract class AbstractHoodieWriteClient insertOverwrite() throws + Exception { + return upsert(WriteOperationType.INSERT_OVERWRITE); + } + + public JavaRDD insertOverwriteTable() throws + Exception { + return upsert(WriteOperationType.INSERT_OVERWRITE_TABLE); + } + public void scheduleCompact() throws Exception { // Since we don't support scheduleCompact() operation in delta-streamer, assume upsert without any data that will // trigger scheduling compaction diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java index 7c1643226..fee76ac82 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java @@ -163,6 +163,26 @@ public class HoodieTestSuiteWriter implements Serializable { } } + public JavaRDD insertOverwrite(Option instantTime) throws Exception { + if(cfg.useDeltaStreamer){ + return deltaStreamerWrapper.insertOverwrite(); + } else { + Pair>> nextBatch = fetchSource(); + lastCheckpoint = Option.of(nextBatch.getValue().getLeft()); + return writeClient.insertOverwrite(nextBatch.getRight().getRight(), instantTime.get()).getWriteStatuses(); + } + } + + public JavaRDD insertOverwriteTable(Option instantTime) throws Exception { + if(cfg.useDeltaStreamer){ + return deltaStreamerWrapper.insertOverwriteTable(); + } else { + Pair>> nextBatch = fetchSource(); + lastCheckpoint = Option.of(nextBatch.getValue().getLeft()); + return writeClient.insertOverwriteTable(nextBatch.getRight().getRight(), instantTime.get()).getWriteStatuses(); + } + } + public JavaRDD bulkInsert(Option instantTime) throws Exception { if (cfg.useDeltaStreamer) { return deltaStreamerWrapper.bulkInsert(); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertOverwriteNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertOverwriteNode.java new file mode 100644 index 000000000..bcd01ff77 --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertOverwriteNode.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.integ.testsuite.dag.nodes; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter; +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; +import org.apache.spark.api.java.JavaRDD; + +public class InsertOverwriteNode extends InsertNode { + + public InsertOverwriteNode(Config config) { + super(config); + } + + @Override + protected JavaRDD ingest(HoodieTestSuiteWriter hoodieTestSuiteWriter, + Option commitTime) + throws Exception { + log.info("Execute insert overwrite node {}", this.getName()); + return hoodieTestSuiteWriter.insertOverwrite(commitTime); + } +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertOverwriteTableNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertOverwriteTableNode.java new file mode 100644 index 000000000..508b5b4e4 --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertOverwriteTableNode.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.integ.testsuite.dag.nodes; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter; +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; +import org.apache.spark.api.java.JavaRDD; + +public class InsertOverwriteTableNode extends InsertNode { + + public InsertOverwriteTableNode(Config config) { + super(config); + } + + @Override + protected JavaRDD ingest(HoodieTestSuiteWriter hoodieTestSuiteWriter, + Option commitTime) + throws Exception { + log.info("Execute insert overwrite table node {}", this.getName()); + return hoodieTestSuiteWriter.insertOverwriteTable(commitTime); + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 9d445dc04..7d2feb005 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -32,10 +32,13 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; @@ -82,6 +85,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.function.Function; import java.util.HashMap; import java.util.HashSet; @@ -460,6 +464,12 @@ public class DeltaSync implements Serializable { case BULK_INSERT: writeStatusRDD = writeClient.bulkInsert(records, instantTime); break; + case INSERT_OVERWRITE: + writeStatusRDD = writeClient.insertOverwrite(records, instantTime).getWriteStatuses(); + break; + case INSERT_OVERWRITE_TABLE: + writeStatusRDD = writeClient.insertOverwriteTable(records, instantTime).getWriteStatuses(); + break; default: throw new HoodieDeltaStreamerException("Unknown operation : " + cfg.operation); } @@ -480,8 +490,8 @@ public class DeltaSync implements Serializable { LOG.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total=" + totalErrorRecords + "/" + totalRecords); } - - boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata)); + String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType)); + boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, Collections.emptyMap()); if (success) { LOG.info("Commit " + instantTime + " successful!"); this.formatAdapter.getSource().onCommit(checkpointStr); @@ -530,7 +540,10 @@ public class DeltaSync implements Serializable { RuntimeException lastException = null; while (retryNum <= maxRetries) { try { - return writeClient.startCommit(); + String instantTime = HoodieActiveTimeline.createNewInstantTime(); + String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType)); + writeClient.startCommitWithTime(instantTime, commitActionType); + return instantTime; } catch (IllegalArgumentException ie) { lastException = ie; LOG.error("Got error trying to start a new commit. Retrying after sleeping for a sec", ie); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index db0ab197f..80f471e31 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -1723,6 +1723,40 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { } } + @Test + public void testInsertOverwrite() throws Exception { + testDeltaStreamerWithSpecifiedOperation(dfsBasePath + "/insert_overwrite", WriteOperationType.INSERT_OVERWRITE); + } + + @Test + public void testInsertOverwriteTable() throws Exception { + testDeltaStreamerWithSpecifiedOperation(dfsBasePath + "/insert_overwrite_table", WriteOperationType.INSERT_OVERWRITE_TABLE); + } + + void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOperationType operationType) throws Exception { + // Initial insert + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT); + new HoodieDeltaStreamer(cfg, jsc).sync(); + TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext); + TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext); + TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1); + + // setting the operationType + cfg.operation = operationType; + // No new data => no commits. + cfg.sourceLimit = 0; + new HoodieDeltaStreamer(cfg, jsc).sync(); + TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext); + TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext); + TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1); + + cfg.sourceLimit = 1000; + new HoodieDeltaStreamer(cfg, jsc).sync(); + TestHelpers.assertRecordCount(1950, tableBasePath + "/*/*.parquet", sqlContext); + TestHelpers.assertDistanceCount(1950, tableBasePath + "/*/*.parquet", sqlContext); + TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2); + } + /** * UDF to calculate Haversine distance. */