From ff8313caf1dfe0b13fa9f160489a74600b0e8756 Mon Sep 17 00:00:00 2001 From: Shen Hong Date: Sun, 3 Jan 2021 20:38:45 +0800 Subject: [PATCH] [HUDI-1423] Support delete in hudi-java-client (#2353) --- .../hudi/client/HoodieJavaWriteClient.java | 6 +- .../table/HoodieJavaCopyOnWriteTable.java | 3 +- .../JavaDeleteCommitActionExecutor.java | 46 +++++++ .../table/action/commit/JavaDeleteHelper.java | 125 ++++++++++++++++++ .../java/HoodieJavaWriteClientExample.java | 10 ++ 5 files changed, 188 insertions(+), 2 deletions(-) create mode 100644 hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteCommitActionExecutor.java create mode 100644 hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java index 71a85deff..2b5e60760 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java @@ -167,7 +167,11 @@ public class HoodieJavaWriteClient extends @Override public List delete(List keys, String instantTime) { - throw new HoodieNotSupportedException("Delete is not supported in HoodieJavaClient"); + HoodieTable>, List, List> table = + getTableAndInitCtx(WriteOperationType.DELETE, instantTime); + setOperationType(WriteOperationType.DELETE); + HoodieWriteMetadata> result = table.delete(context,instantTime, keys); + return postWrite(result, instantTime, table); } @Override diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index ddc995ab1..9a22f75f1 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -38,6 +38,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata; import org.apache.hudi.table.action.clean.JavaCleanActionExecutor; +import org.apache.hudi.table.action.commit.JavaDeleteCommitActionExecutor; import org.apache.hudi.table.action.commit.JavaInsertCommitActionExecutor; import org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor; import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor; @@ -81,7 +82,7 @@ public class HoodieJavaCopyOnWriteTable extends H public HoodieWriteMetadata> delete(HoodieEngineContext context, String instantTime, List keys) { - throw new HoodieNotSupportedException("Delete is not supported yet"); + return new JavaDeleteCommitActionExecutor<>(context, config, this, instantTime, keys).execute(); } @Override diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteCommitActionExecutor.java new file mode 100644 index 000000000..ca61af4fe --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteCommitActionExecutor.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import java.util.List; + +public class JavaDeleteCommitActionExecutor> extends BaseJavaCommitActionExecutor { + private final List keys; + + public JavaDeleteCommitActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, HoodieTable table, + String instantTime, List keys) { + super(context, config, table, instantTime, WriteOperationType.DELETE); + this.keys = keys; + } + + @Override + public HoodieWriteMetadata> execute() { + return JavaDeleteHelper.newInstance().execute(instantTime, keys, context, config, table, this); + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java new file mode 100644 index 000000000..0de1111f1 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.model.EmptyHoodieRecordPayload; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieUpsertException; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.WorkloadProfile; +import org.apache.hudi.table.WorkloadStat; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import java.time.Duration; +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.Collectors; + +@SuppressWarnings("checkstyle:LineLength") +public class JavaDeleteHelper extends + AbstractDeleteHelper>, List, List, R> { + + private JavaDeleteHelper() { + } + + private static class DeleteHelperHolder { + private static final JavaDeleteHelper JAVA_DELETE_HELPER = new JavaDeleteHelper(); + } + + public static JavaDeleteHelper newInstance() { + return DeleteHelperHolder.JAVA_DELETE_HELPER; + } + + @Override + public List deduplicateKeys(List keys, + HoodieTable>, List, List> table, + int parallelism) { + boolean isIndexingGlobal = table.getIndex().isGlobal(); + if (isIndexingGlobal) { + HashSet recordKeys = keys.stream().map(HoodieKey::getRecordKey).collect(Collectors.toCollection(HashSet::new)); + List deduplicatedKeys = new LinkedList<>(); + keys.forEach(x -> { + if (recordKeys.contains(x.getRecordKey())) { + deduplicatedKeys.add(x); + } + }); + return deduplicatedKeys; + } else { + HashSet set = new HashSet<>(keys); + keys.clear(); + keys.addAll(set); + return keys; + } + } + + @Override + public HoodieWriteMetadata> execute(String instantTime, + List keys, + HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable>, List, List> table, + BaseCommitActionExecutor>, List, List, R> deleteExecutor) { + try { + HoodieWriteMetadata> result = null; + List dedupedKeys = keys; + final int parallelism = config.getDeleteShuffleParallelism(); + if (config.shouldCombineBeforeDelete()) { + // De-dupe/merge if needed + dedupedKeys = deduplicateKeys(keys, table, parallelism); + } + + List> dedupedRecords = + dedupedKeys.stream().map(key -> new HoodieRecord<>(key, new EmptyHoodieRecordPayload())).collect(Collectors.toList()); + Instant beginTag = Instant.now(); + // perform index look up to get existing location of records + List> taggedRecords = + table.getIndex().tagLocation(dedupedRecords, context, table); + Duration tagLocationDuration = Duration.between(beginTag, Instant.now()); + + // filter out non existent keys/records + List> taggedValidRecords = taggedRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).collect(Collectors.toList()); + if (!taggedValidRecords.isEmpty()) { + result = deleteExecutor.execute(taggedValidRecords); + result.setIndexLookupDuration(tagLocationDuration); + } else { + // if entire set of keys are non existent + deleteExecutor.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime); + result = new HoodieWriteMetadata<>(); + result.setWriteStatuses(Collections.EMPTY_LIST); + deleteExecutor.commitOnAutoCommit(result); + } + return result; + } catch (Throwable e) { + if (e instanceof HoodieUpsertException) { + throw (HoodieUpsertException) e; + } + throw new HoodieUpsertException("Failed to delete for commit time " + instantTime, e); + } + } + +} diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java b/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java index 31fccfa77..6cb1ea9d2 100644 --- a/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java +++ b/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java @@ -23,6 +23,7 @@ import org.apache.hudi.client.HoodieJavaWriteClient; import org.apache.hudi.client.common.HoodieJavaEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -104,6 +105,15 @@ public class HoodieJavaWriteClientExample { recordsSoFar.stream().map(r -> new HoodieRecord(r)).collect(Collectors.toList()); client.upsert(writeRecords, newCommitTime); + // Delete + newCommitTime = client.startCommit(); + LOG.info("Starting commit " + newCommitTime); + // just delete half of the records + int numToDelete = recordsSoFar.size() / 2; + List toBeDeleted = + recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList()); + client.delete(toBeDeleted, newCommitTime); + client.close(); } }