1
0

[HUDI-819] Fix a bug with MergeOnReadLazyInsertIterable.

The variable declared here[1] shadows the protected `statuses` variable. As a result, although Hudi writes the data, the write status is not included in the completed section. This can cause duplicates to be written (#1540).
[1] https://github.com/apache/incubator-hudi/blob/master/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java#L53
This commit is contained in:
satishkotha
2020-04-27 12:50:39 -07:00
committed by GitHub
parent 19ca0b5629
commit 6de9f5d9e5
10 changed files with 138 additions and 99 deletions

View File

@@ -50,7 +50,7 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload>
@Override
public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> sortedRecordItr) {
return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, instantTime, hoodieTable,
return new LazyInsertIterable<>(sortedRecordItr, config, instantTime, hoodieTable,
fileIDPrefixes.get(partition), hoodieTable.getSparkTaskContextSupplier());
}
}

View File

@@ -28,7 +28,8 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.HoodieTable;
@@ -43,26 +44,34 @@ import java.util.function.Function;
/**
* Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new files.
*/
public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
public class LazyInsertIterable<T extends HoodieRecordPayload>
extends LazyIterableIterator<HoodieRecord<T>, List<WriteStatus>> {
protected final HoodieWriteConfig hoodieConfig;
protected final String instantTime;
protected final HoodieTable<T> hoodieTable;
protected final String idPrefix;
protected int numFilesWritten;
protected SparkTaskContextSupplier sparkTaskContextSupplier;
protected WriteHandleFactory<T> writeHandleFactory;
public CopyOnWriteLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
SparkTaskContextSupplier sparkTaskContextSupplier) {
public LazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
SparkTaskContextSupplier sparkTaskContextSupplier) {
this(sortedRecordItr, config, instantTime, hoodieTable, idPrefix, sparkTaskContextSupplier,
new CreateHandleFactory<>());
}
public LazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
SparkTaskContextSupplier sparkTaskContextSupplier,
WriteHandleFactory<T> writeHandleFactory) {
super(sortedRecordItr);
this.hoodieConfig = config;
this.instantTime = instantTime;
this.hoodieTable = hoodieTable;
this.idPrefix = idPrefix;
this.numFilesWritten = 0;
this.sparkTaskContextSupplier = sparkTaskContextSupplier;
this.writeHandleFactory = writeHandleFactory;
}
// Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread.
@@ -118,10 +127,6 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
@Override
protected void end() {}
protected String getNextFileId(String idPfx) {
return String.format("%s-%d", idPfx, numFilesWritten++);
}
protected CopyOnWriteInsertHandler getInsertHandler() {
return new CopyOnWriteInsertHandler();
}
@@ -140,8 +145,8 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
final HoodieRecord insertPayload = payload.record;
// lazily initialize the handle, for the first time
if (handle == null) {
handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
getNextFileId(idPrefix), sparkTaskContextSupplier);
handle = writeHandleFactory.create(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
idPrefix, sparkTaskContextSupplier);
}
if (handle.canWrite(payload.record)) {
@@ -151,8 +156,8 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
// handle is full.
statuses.add(handle.close());
// Need to handle the rejected payload & open new handle
handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
getNextFileId(idPrefix), sparkTaskContextSupplier);
handle = writeHandleFactory.create(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
idPrefix, sparkTaskContextSupplier);
handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
}
}

View File

@@ -1,74 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.execution;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.HoodieAppendHandle;
import org.apache.hudi.table.HoodieTable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new log files.
*/
// NOTE(review): This class is removed by this commit ([HUDI-819]); its behavior moves to
// LazyInsertIterable + AppendHandleFactory. The comments below document the bug being fixed.
public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extends CopyOnWriteLazyInsertIterable<T> {
public MergeOnReadLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
String instantTime, HoodieTable<T> hoodieTable, String idPfx, SparkTaskContextSupplier sparkTaskContextSupplier) {
super(sortedRecordItr, config, instantTime, hoodieTable, idPfx, sparkTaskContextSupplier);
}
@Override
protected CopyOnWriteInsertHandler getInsertHandler() {
return new MergeOnReadInsertHandler();
}
// Handler variant that appends inserts to log files via HoodieAppendHandle instead of
// creating new base files.
protected class MergeOnReadInsertHandler extends CopyOnWriteInsertHandler {
@Override
protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
final HoodieRecord insertPayload = payload.record;
// BUG ([HUDI-819]): this local declaration shadows the protected `statuses` collection of the
// parent handler, so statuses added below are collected into this throwaway list and never
// reach the completed output — data is written but its WriteStatus is lost, which can lead
// to duplicates (#1540).
List<WriteStatus> statuses = new ArrayList<>();
// lazily initialize the handle, for the first time
if (handle == null) {
handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable,
insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier);
}
if (handle.canWrite(insertPayload)) {
// write the payload, if the handle has capacity
handle.write(insertPayload, payload.insertValue, payload.exception);
} else {
// handle is full.
handle.close();
// NOTE(review): this add goes to the shadowing local list above, not the parent's field.
statuses.add(handle.getWriteStatus());
// Need to handle the rejected payload & open new handle
handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable,
insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier);
handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
}
}
}
}

View File

@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
/**
 * Factory that creates {@link HoodieAppendHandle}s, used to write inserts into log files
 * (merge-on-read tables that can index log files). File ids are generated sequentially
 * from the supplied prefix via {@link WriteHandleFactory#getNextFileId(String)}.
 */
public class AppendHandleFactory<T extends HoodieRecordPayload> extends WriteHandleFactory<T> {

  /**
   * Creates a new append handle for one file group.
   *
   * @param hoodieConfig             write configuration
   * @param commitTime               instant time of the commit being written
   * @param hoodieTable              table being written to
   * @param partitionPath            partition the incoming records belong to
   * @param fileIdPrefix             prefix used to derive the next sequential file id
   * @param sparkTaskContextSupplier supplier for spark task context information
   * @return a typed append handle for the next file id
   */
  @Override
  public HoodieAppendHandle<T> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
                                      final HoodieTable<T> hoodieTable, final String partitionPath,
                                      final String fileIdPrefix, final SparkTaskContextSupplier sparkTaskContextSupplier) {
    // Use the diamond operator so the handle is parameterized with <T>; the original raw-type
    // instantiation compiled with an unchecked warning and erased the generic return type.
    return new HoodieAppendHandle<>(hoodieConfig, commitTime, hoodieTable, partitionPath,
        getNextFileId(fileIdPrefix), sparkTaskContextSupplier);
  }
}

View File

@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
/**
 * Factory that creates {@link HoodieCreateHandle}s, used to write inserts into new base files
 * (the default insert path). File ids are generated sequentially from the supplied prefix via
 * {@link WriteHandleFactory#getNextFileId(String)}.
 */
public class CreateHandleFactory<T extends HoodieRecordPayload> extends WriteHandleFactory<T> {

  /**
   * Creates a new create handle for one file group.
   *
   * @param hoodieConfig             write configuration
   * @param commitTime               instant time of the commit being written
   * @param hoodieTable              table being written to
   * @param partitionPath            partition the incoming records belong to
   * @param fileIdPrefix             prefix used to derive the next sequential file id
   * @param sparkTaskContextSupplier supplier for spark task context information
   * @return a typed create handle for the next file id
   */
  @Override
  public HoodieWriteHandle<T> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
                                     final HoodieTable<T> hoodieTable, final String partitionPath,
                                     final String fileIdPrefix, final SparkTaskContextSupplier sparkTaskContextSupplier) {
    // Use the diamond operator so the handle is parameterized with <T>; the original raw-type
    // instantiation compiled with an unchecked warning. The last parameter is also marked
    // final for consistency with AppendHandleFactory.
    return new HoodieCreateHandle<>(hoodieConfig, commitTime, hoodieTable, partitionPath,
        getNextFileId(fileIdPrefix), sparkTaskContextSupplier);
  }
}

View File

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
/**
 * Base factory for {@link HoodieWriteHandle}s. Each factory instance keeps its own counter so
 * that successive handles it creates receive unique, sequential file ids derived from a common
 * prefix (e.g. "prefix-0", "prefix-1", ...).
 */
public abstract class WriteHandleFactory<T extends HoodieRecordPayload> {

  // Number of file ids handed out so far by this factory instance.
  private int numFilesWritten = 0;

  /**
   * Creates a concrete write handle for the given partition and file id prefix.
   */
  public abstract HoodieWriteHandle<T> create(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
      String partitionPath, String fileIdPrefix, SparkTaskContextSupplier sparkTaskContextSupplier);

  /**
   * Returns the next sequential file id for the given prefix and advances the counter.
   */
  protected String getNextFileId(String idPfx) {
    final String fileId = idPfx + "-" + numFilesWritten;
    numFilesWritten++;
    return fileId;
  }
}

View File

@@ -29,7 +29,7 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable;
import org.apache.hudi.execution.LazyInsertIterable;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.table.HoodieTable;
@@ -132,7 +132,7 @@ public abstract class CommitActionExecutor<T extends HoodieRecordPayload<T>>
LOG.info("Empty partition");
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
return new LazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
sparkTaskContextSupplier);
}

View File

@@ -24,8 +24,9 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.MergeOnReadLazyInsertIterable;
import org.apache.hudi.execution.LazyInsertIterable;
import org.apache.hudi.io.HoodieAppendHandle;
import org.apache.hudi.io.AppendHandleFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
@@ -84,8 +85,8 @@ public abstract class DeltaCommitActionExecutor<T extends HoodieRecordPayload<T>
throws Exception {
// If canIndexLogFiles, write inserts to log files else write inserts to parquet files
if (table.getIndex().canIndexLogFiles()) {
return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
sparkTaskContextSupplier);
return new LazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
sparkTaskContextSupplier, new AppendHandleFactory<>());
} else {
return super.handleInsert(idPfx, recordItr);
}

View File

@@ -25,7 +25,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.HoodieInsertValueGenResult;
import org.apache.hudi.execution.LazyInsertIterable.HoodieInsertValueGenResult;
import org.apache.avro.generic.IndexedRecord;
import org.junit.After;
@@ -37,7 +37,7 @@ import java.util.List;
import scala.Tuple2;
import static org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.getTransformFunction;
import static org.apache.hudi.execution.LazyInsertIterable.getTransformFunction;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

View File

@@ -31,7 +31,7 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.HoodieInsertValueGenResult;
import org.apache.hudi.execution.LazyInsertIterable.HoodieInsertValueGenResult;
import org.apache.avro.generic.IndexedRecord;
import org.junit.After;
@@ -53,7 +53,7 @@ import java.util.stream.IntStream;
import scala.Tuple2;
import static org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.getTransformFunction;
import static org.apache.hudi.execution.LazyInsertIterable.getTransformFunction;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;