From 6de9f5d9e5cb1f82d7c32d04b114e7d4a181619b Mon Sep 17 00:00:00 2001 From: satishkotha Date: Mon, 27 Apr 2020 12:50:39 -0700 Subject: [PATCH] [HUDI-819] Fix a bug with MergeOnReadLazyInsertIterable. The variable declared here[1] masks the protected statuses variable. So although Hudi writes the data, it will not include the WriteStatus in the completed section. This can cause duplicates to be written (#1540) [1] https://github.com/apache/incubator-hudi/blob/master/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java#L53 --- .../hudi/execution/BulkInsertMapFunction.java | 2 +- ...tIterable.java => LazyInsertIterable.java} | 35 +++++---- .../MergeOnReadLazyInsertIterable.java | 74 ------------------- .../apache/hudi/io/AppendHandleFactory.java | 36 +++++++++ .../apache/hudi/io/CreateHandleFactory.java | 36 +++++++++ .../apache/hudi/io/WriteHandleFactory.java | 35 +++++++++ .../action/commit/CommitActionExecutor.java | 4 +- .../DeltaCommitActionExecutor.java | 7 +- .../TestBoundedInMemoryExecutor.java | 4 +- .../execution/TestBoundedInMemoryQueue.java | 4 +- 10 files changed, 138 insertions(+), 99 deletions(-) rename hudi-client/src/main/java/org/apache/hudi/execution/{CopyOnWriteLazyInsertIterable.java => LazyInsertIterable.java} (81%) delete mode 100644 hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java create mode 100644 hudi-client/src/main/java/org/apache/hudi/io/AppendHandleFactory.java create mode 100644 hudi-client/src/main/java/org/apache/hudi/io/CreateHandleFactory.java create mode 100644 hudi-client/src/main/java/org/apache/hudi/io/WriteHandleFactory.java diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java b/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java index 5d4391ca4..67c1d7569 100644 --- a/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java +++ 
b/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java @@ -50,7 +50,7 @@ public class BulkInsertMapFunction @Override public Iterator> call(Integer partition, Iterator> sortedRecordItr) { - return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, instantTime, hoodieTable, + return new LazyInsertIterable<>(sortedRecordItr, config, instantTime, hoodieTable, fileIDPrefixes.get(partition), hoodieTable.getSparkTaskContextSupplier()); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java b/hudi-client/src/main/java/org/apache/hudi/execution/LazyInsertIterable.java similarity index 81% rename from hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java rename to hudi-client/src/main/java/org/apache/hudi/execution/LazyInsertIterable.java index 8f98496fc..fe0d5c47c 100644 --- a/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java +++ b/hudi-client/src/main/java/org/apache/hudi/execution/LazyInsertIterable.java @@ -28,7 +28,8 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.io.HoodieCreateHandle; +import org.apache.hudi.io.CreateHandleFactory; +import org.apache.hudi.io.WriteHandleFactory; import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; @@ -43,26 +44,34 @@ import java.util.function.Function; /** * Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new files. 
*/ -public class CopyOnWriteLazyInsertIterable +public class LazyInsertIterable extends LazyIterableIterator, List> { protected final HoodieWriteConfig hoodieConfig; protected final String instantTime; protected final HoodieTable hoodieTable; protected final String idPrefix; - protected int numFilesWritten; protected SparkTaskContextSupplier sparkTaskContextSupplier; + protected WriteHandleFactory writeHandleFactory; - public CopyOnWriteLazyInsertIterable(Iterator> sortedRecordItr, HoodieWriteConfig config, - String instantTime, HoodieTable hoodieTable, String idPrefix, - SparkTaskContextSupplier sparkTaskContextSupplier) { + public LazyInsertIterable(Iterator> sortedRecordItr, HoodieWriteConfig config, + String instantTime, HoodieTable hoodieTable, String idPrefix, + SparkTaskContextSupplier sparkTaskContextSupplier) { + this(sortedRecordItr, config, instantTime, hoodieTable, idPrefix, sparkTaskContextSupplier, + new CreateHandleFactory<>()); + } + + public LazyInsertIterable(Iterator> sortedRecordItr, HoodieWriteConfig config, + String instantTime, HoodieTable hoodieTable, String idPrefix, + SparkTaskContextSupplier sparkTaskContextSupplier, + WriteHandleFactory writeHandleFactory) { super(sortedRecordItr); this.hoodieConfig = config; this.instantTime = instantTime; this.hoodieTable = hoodieTable; this.idPrefix = idPrefix; - this.numFilesWritten = 0; this.sparkTaskContextSupplier = sparkTaskContextSupplier; + this.writeHandleFactory = writeHandleFactory; } // Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread. 
@@ -118,10 +127,6 @@ public class CopyOnWriteLazyInsertIterable @Override protected void end() {} - protected String getNextFileId(String idPfx) { - return String.format("%s-%d", idPfx, numFilesWritten++); - } - protected CopyOnWriteInsertHandler getInsertHandler() { return new CopyOnWriteInsertHandler(); } @@ -140,8 +145,8 @@ public class CopyOnWriteLazyInsertIterable final HoodieRecord insertPayload = payload.record; // lazily initialize the handle, for the first time if (handle == null) { - handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(), - getNextFileId(idPrefix), sparkTaskContextSupplier); + handle = writeHandleFactory.create(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(), + idPrefix, sparkTaskContextSupplier); } if (handle.canWrite(payload.record)) { @@ -151,8 +156,8 @@ public class CopyOnWriteLazyInsertIterable // handle is full. statuses.add(handle.close()); // Need to handle the rejected payload & open new handle - handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(), - getNextFileId(idPrefix), sparkTaskContextSupplier); + handle = writeHandleFactory.create(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(), + idPrefix, sparkTaskContextSupplier); handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload. } } diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java b/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java deleted file mode 100644 index 02a9eada8..000000000 --- a/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.execution; - -import org.apache.hudi.client.SparkTaskContextSupplier; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.io.HoodieAppendHandle; -import org.apache.hudi.table.HoodieTable; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -/** - * Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new log files. 
- */ -public class MergeOnReadLazyInsertIterable extends CopyOnWriteLazyInsertIterable { - - public MergeOnReadLazyInsertIterable(Iterator> sortedRecordItr, HoodieWriteConfig config, - String instantTime, HoodieTable hoodieTable, String idPfx, SparkTaskContextSupplier sparkTaskContextSupplier) { - super(sortedRecordItr, config, instantTime, hoodieTable, idPfx, sparkTaskContextSupplier); - } - - @Override - protected CopyOnWriteInsertHandler getInsertHandler() { - return new MergeOnReadInsertHandler(); - } - - protected class MergeOnReadInsertHandler extends CopyOnWriteInsertHandler { - - @Override - protected void consumeOneRecord(HoodieInsertValueGenResult payload) { - final HoodieRecord insertPayload = payload.record; - List statuses = new ArrayList<>(); - // lazily initialize the handle, for the first time - if (handle == null) { - handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable, - insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier); - } - if (handle.canWrite(insertPayload)) { - // write the payload, if the handle has capacity - handle.write(insertPayload, payload.insertValue, payload.exception); - } else { - // handle is full. - handle.close(); - statuses.add(handle.getWriteStatus()); - // Need to handle the rejected payload & open new handle - handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable, - insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier); - handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload. 
- } - } - } - -} diff --git a/hudi-client/src/main/java/org/apache/hudi/io/AppendHandleFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/AppendHandleFactory.java new file mode 100644 index 000000000..4a5554bd9 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/io/AppendHandleFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io; + +import org.apache.hudi.client.SparkTaskContextSupplier; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +public class AppendHandleFactory extends WriteHandleFactory { + + @Override + public HoodieAppendHandle create(final HoodieWriteConfig hoodieConfig, final String commitTime, + final HoodieTable hoodieTable, final String partitionPath, + final String fileIdPrefix, final SparkTaskContextSupplier sparkTaskContextSupplier) { + + return new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable, partitionPath, + getNextFileId(fileIdPrefix), sparkTaskContextSupplier); + } +} \ No newline at end of file diff --git a/hudi-client/src/main/java/org/apache/hudi/io/CreateHandleFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/CreateHandleFactory.java new file mode 100644 index 000000000..68d8b4d58 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/io/CreateHandleFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io; + +import org.apache.hudi.client.SparkTaskContextSupplier; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +public class CreateHandleFactory extends WriteHandleFactory { + + @Override + public HoodieWriteHandle create(final HoodieWriteConfig hoodieConfig, final String commitTime, + final HoodieTable hoodieTable, final String partitionPath, + final String fileIdPrefix, SparkTaskContextSupplier sparkTaskContextSupplier) { + + return new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, partitionPath, + getNextFileId(fileIdPrefix), sparkTaskContextSupplier); + } +} \ No newline at end of file diff --git a/hudi-client/src/main/java/org/apache/hudi/io/WriteHandleFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/WriteHandleFactory.java new file mode 100644 index 000000000..7039b71e5 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/io/WriteHandleFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io; + +import org.apache.hudi.client.SparkTaskContextSupplier; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +public abstract class WriteHandleFactory { + private int numFilesWritten = 0; + + public abstract HoodieWriteHandle create(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable, + String partitionPath, String fileIdPrefix, SparkTaskContextSupplier sparkTaskContextSupplier); + + protected String getNextFileId(String idPfx) { + return String.format("%s-%d", idPfx, numFilesWritten++); + } +} \ No newline at end of file diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java index b95883789..5208c1258 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java @@ -29,7 +29,7 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieUpsertException; -import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable; +import org.apache.hudi.execution.LazyInsertIterable; import org.apache.hudi.execution.SparkBoundedInMemoryExecutor; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.table.HoodieTable; @@ -132,7 +132,7 @@ public abstract class CommitActionExecutor> LOG.info("Empty partition"); return Collections.singletonList((List) Collections.EMPTY_LIST).iterator(); } - return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable)table, idPfx, + return new LazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable)table, idPfx, 
sparkTaskContextSupplier); } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java index 775580ef1..be3806e46 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java @@ -24,8 +24,9 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieUpsertException; -import org.apache.hudi.execution.MergeOnReadLazyInsertIterable; +import org.apache.hudi.execution.LazyInsertIterable; import org.apache.hudi.io.HoodieAppendHandle; +import org.apache.hudi.io.AppendHandleFactory; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.WorkloadProfile; @@ -84,8 +85,8 @@ public abstract class DeltaCommitActionExecutor throws Exception { // If canIndexLogFiles, write inserts to log files else write inserts to parquet files if (table.getIndex().canIndexLogFiles()) { - return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable)table, idPfx, - sparkTaskContextSupplier); + return new LazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable)table, idPfx, + sparkTaskContextSupplier, new AppendHandleFactory<>()); } else { return super.handleInsert(idPfx, recordItr); } diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java index 58f898ca8..1db248fd7 100644 --- a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java +++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java @@ -25,7 +25,7 @@ import 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.HoodieInsertValueGenResult; +import org.apache.hudi.execution.LazyInsertIterable.HoodieInsertValueGenResult; import org.apache.avro.generic.IndexedRecord; import org.junit.After; @@ -37,7 +37,7 @@ import java.util.List; import scala.Tuple2; -import static org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.getTransformFunction; +import static org.apache.hudi.execution.LazyInsertIterable.getTransformFunction; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java index 72b3eff28..859cf4e36 100644 --- a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java +++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java @@ -31,7 +31,7 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer; import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.HoodieInsertValueGenResult; +import org.apache.hudi.execution.LazyInsertIterable.HoodieInsertValueGenResult; import org.apache.avro.generic.IndexedRecord; import org.junit.After; @@ -53,7 +53,7 @@ import java.util.stream.IntStream; import scala.Tuple2; -import static org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.getTransformFunction; +import static org.apache.hudi.execution.LazyInsertIterable.getTransformFunction; import static org.mockito.Mockito.mock; import static 
org.mockito.Mockito.when;