1
0

[HUDI-2439] Remove SparkBoundedInMemoryExecutor (#4860)

This commit is contained in:
Raymond Xu
2022-02-26 05:02:12 -08:00
committed by GitHub
parent 1379300b5b
commit c77b2591d0
14 changed files with 99 additions and 113 deletions

View File

@@ -18,11 +18,6 @@
package org.apache.hudi.table; package org.apache.hudi.table;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -59,6 +54,7 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
@@ -75,10 +71,17 @@ import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.storage.HoodieLayoutFactory; import org.apache.hudi.table.storage.HoodieLayoutFactory;
import org.apache.hudi.table.storage.HoodieStorageLayout; import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
@@ -776,8 +779,8 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
* @param triggeringInstantTimestamp - The instant that is triggering this metadata write * @param triggeringInstantTimestamp - The instant that is triggering this metadata write
* @return instance of {@link HoodieTableMetadataWriter} * @return instance of {@link HoodieTableMetadataWriter}
*/ */
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp, public <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
Option<T> actionMetadata) { Option<R> actionMetadata) {
// Each engine is expected to override this and // Each engine is expected to override this and
// provide the actual metadata writer, if enabled. // provide the actual metadata writer, if enabled.
return Option.empty(); return Option.empty();
@@ -786,4 +789,8 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
public HoodieTableMetadata getMetadataTable() { public HoodieTableMetadata getMetadataTable() {
return this.metadata; return this.metadata;
} }
public Runnable getPreExecuteRunnable() {
return Functions.noop();
}
} }

View File

@@ -91,7 +91,7 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHe
ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>(); ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>(); ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator), wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
Option.of(new UpdateHandler(mergeHandle)), record -> { Option.of(new UpdateHandler(mergeHandle)), record -> {
if (!externalSchemaTransformation) { if (!externalSchemaTransformation) {
return record; return record;

View File

@@ -91,7 +91,7 @@ public class JavaMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHel
ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>(); ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>(); ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator), wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
Option.of(new UpdateHandler(mergeHandle)), record -> { Option.of(new UpdateHandler(mergeHandle)), record -> {
if (!externalSchemaTransformation) { if (!externalSchemaTransformation) {
return record; return record;

View File

@@ -1,56 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.execution;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import java.util.Iterator;
import java.util.function.Function;
public class SparkBoundedInMemoryExecutor<I, O, E> extends BoundedInMemoryExecutor<I, O, E> {
// Need to set current spark thread's TaskContext into newly launched thread so that new thread can access
// TaskContext properties.
final TaskContext sparkThreadTaskContext;
public SparkBoundedInMemoryExecutor(final HoodieWriteConfig hoodieConfig, final Iterator<I> inputItr,
BoundedInMemoryQueueConsumer<O, E> consumer, Function<I, O> bufferedIteratorTransform) {
this(hoodieConfig, new IteratorBasedQueueProducer<>(inputItr), consumer, bufferedIteratorTransform);
}
public SparkBoundedInMemoryExecutor(final HoodieWriteConfig hoodieConfig, BoundedInMemoryQueueProducer<I> producer,
BoundedInMemoryQueueConsumer<O, E> consumer, Function<I, O> bufferedIteratorTransform) {
super(hoodieConfig.getWriteBufferLimitBytes(), producer, Option.of(consumer), bufferedIteratorTransform);
this.sparkThreadTaskContext = TaskContext.get();
}
@Override
public void preExecute() {
// Passing parent thread's TaskContext to newly launched thread for it to access original TaskContext properties.
TaskContext$.MODULE$.setTaskContext(sparkThreadTaskContext);
}
}

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.execution; package org.apache.hudi.execution;
import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -30,6 +29,8 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.WriteHandleFactory; import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@@ -84,8 +85,8 @@ public class SparkLazyInsertIterable<T extends HoodieRecordPayload> extends Hood
schema = HoodieAvroUtils.addMetadataFields(schema); schema = HoodieAvroUtils.addMetadataFields(schema);
} }
bufferedIteratorExecutor = bufferedIteratorExecutor =
new SparkBoundedInMemoryExecutor<>(hoodieConfig, inputItr, getInsertHandler(), new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, getInsertHandler(),
getTransformFunction(schema, hoodieConfig)); getTransformFunction(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());
final List<WriteStatus> result = bufferedIteratorExecutor.execute(); final List<WriteStatus> result = bufferedIteratorExecutor.execute();
assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining(); assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
return result; return result;

View File

@@ -18,8 +18,6 @@
package org.apache.hudi.table; package org.apache.hudi.table;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodieData;
@@ -39,6 +37,11 @@ import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.fs.Path;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import java.io.IOException; import java.io.IOException;
@@ -111,8 +114,8 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
* @return instance of {@link HoodieTableMetadataWriter} * @return instance of {@link HoodieTableMetadataWriter}
*/ */
@Override @Override
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp, public <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
Option<T> actionMetadata) { Option<R> actionMetadata) {
if (config.isMetadataTableEnabled()) { if (config.isMetadataTableEnabled()) {
// Create the metadata table writer. First time after the upgrade this creation might trigger // Create the metadata table writer. First time after the upgrade this creation might trigger
// metadata table bootstrapping. Bootstrapping process could fail and checking the table // metadata table bootstrapping. Bootstrapping process could fail and checking the table
@@ -132,4 +135,10 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
return Option.empty(); return Option.empty();
} }
@Override
public Runnable getPreExecuteRunnable() {
final TaskContext taskContext = TaskContext.get();
return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
}
} }

View File

@@ -29,7 +29,6 @@ import org.apache.hudi.common.util.OrcReaderIterator;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieBootstrapHandle; import org.apache.hudi.io.HoodieBootstrapHandle;
import org.apache.hudi.keygen.KeyGeneratorInterface; import org.apache.hudi.keygen.KeyGeneratorInterface;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
@@ -68,7 +67,7 @@ class OrcBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
Reader orcReader = OrcFile.createReader(sourceFilePath, OrcFile.readerOptions(table.getHadoopConf())); Reader orcReader = OrcFile.createReader(sourceFilePath, OrcFile.readerOptions(table.getHadoopConf()));
TypeDescription orcSchema = orcReader.getSchema(); TypeDescription orcSchema = orcReader.getSchema();
try (RecordReader reader = orcReader.rows(new Reader.Options(table.getHadoopConf()).schema(orcSchema))) { try (RecordReader reader = orcReader.rows(new Reader.Options(table.getHadoopConf()).schema(orcSchema))) {
wrapper = new SparkBoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config, wrapper = new BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config.getWriteBufferLimitBytes(),
new OrcReaderIterator(reader, avroSchema, orcSchema), new BootstrapRecordConsumer(bootstrapHandle), inp -> { new OrcReaderIterator(reader, avroSchema, orcSchema), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
String recKey = keyGenerator.getKey(inp).getRecordKey(); String recKey = keyGenerator.getKey(inp).getRecordKey();
GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA); GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
@@ -76,7 +75,7 @@ class OrcBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
BootstrapRecordPayload payload = new BootstrapRecordPayload(gr); BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload); HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload);
return rec; return rec;
}); }, table.getPreExecuteRunnable());
wrapper.execute(); wrapper.execute();
} catch (Exception e) { } catch (Exception e) {
throw new HoodieException(e); throw new HoodieException(e);

View File

@@ -28,7 +28,6 @@ import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieBootstrapHandle; import org.apache.hudi.io.HoodieBootstrapHandle;
import org.apache.hudi.keygen.KeyGeneratorInterface; import org.apache.hudi.keygen.KeyGeneratorInterface;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
@@ -72,7 +71,7 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
try { try {
ParquetReader<IndexedRecord> reader = ParquetReader<IndexedRecord> reader =
AvroParquetReader.<IndexedRecord>builder(sourceFilePath).withConf(table.getHadoopConf()).build(); AvroParquetReader.<IndexedRecord>builder(sourceFilePath).withConf(table.getHadoopConf()).build();
wrapper = new SparkBoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config, wrapper = new BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config.getWriteBufferLimitBytes(),
new ParquetReaderIterator(reader), new BootstrapRecordConsumer(bootstrapHandle), inp -> { new ParquetReaderIterator(reader), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
String recKey = keyGenerator.getKey(inp).getRecordKey(); String recKey = keyGenerator.getKey(inp).getRecordKey();
GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA); GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
@@ -80,7 +79,7 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
BootstrapRecordPayload payload = new BootstrapRecordPayload(gr); BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload); HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload);
return rec; return rec;
}); }, table.getPreExecuteRunnable());
wrapper.execute(); wrapper.execute();
} catch (Exception e) { } catch (Exception e) {
throw new HoodieException(e); throw new HoodieException(e);

View File

@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -90,13 +89,13 @@ public class SparkMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHe
ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>(); ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>(); ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
wrapper = new SparkBoundedInMemoryExecutor(table.getConfig(), readerIterator, wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), readerIterator,
new UpdateHandler(mergeHandle), record -> { new UpdateHandler(mergeHandle), record -> {
if (!externalSchemaTransformation) { if (!externalSchemaTransformation) {
return record; return record;
} }
return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record); return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
}); }, table.getPreExecuteRunnable());
wrapper.execute(); wrapper.execute();
} catch (Exception e) { } catch (Exception e) {
throw new HoodieException(e); throw new HoodieException(e);

View File

@@ -22,12 +22,15 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer; import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.avro.generic.IndexedRecord; import org.apache.avro.generic.IndexedRecord;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -44,7 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class TestSparkBoundedInMemoryExecutor extends HoodieClientTestHarness { public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness {
private final String instantTime = HoodieActiveTimeline.createNewInstantTime(); private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
@@ -58,6 +61,11 @@ public class TestSparkBoundedInMemoryExecutor extends HoodieClientTestHarness {
cleanupResources(); cleanupResources();
} }
private Runnable getPreExecuteRunnable() {
final TaskContext taskContext = TaskContext.get();
return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
}
@Test @Test
public void testExecutor() { public void testExecutor() {
@@ -85,10 +93,10 @@ public class TestSparkBoundedInMemoryExecutor extends HoodieClientTestHarness {
} }
}; };
SparkBoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null; BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
try { try {
executor = new SparkBoundedInMemoryExecutor(hoodieWriteConfig, hoodieRecords.iterator(), consumer, executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
int result = executor.execute(); int result = executor.execute();
// It should buffer and write 100 records // It should buffer and write 100 records
assertEquals(100, result); assertEquals(100, result);
@@ -131,11 +139,11 @@ public class TestSparkBoundedInMemoryExecutor extends HoodieClientTestHarness {
} }
}; };
SparkBoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null; BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
try { try {
executor = new SparkBoundedInMemoryExecutor(hoodieWriteConfig, hoodieRecords.iterator(), consumer, executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
SparkBoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> finalExecutor = executor; BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> finalExecutor = executor;
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();

View File

@@ -25,6 +25,11 @@ import java.io.Serializable;
*/ */
public interface Functions { public interface Functions {
static Runnable noop() {
return () -> {
};
}
/** /**
* A function which has not any parameter. * A function which has not any parameter.
*/ */

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.common.util.queue; package org.apache.hudi.common.util.queue;
import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SizeEstimator; import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
@@ -26,7 +27,8 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import java.util.Arrays; import java.util.Collections;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@@ -54,29 +56,35 @@ public class BoundedInMemoryExecutor<I, O, E> {
private final List<BoundedInMemoryQueueProducer<I>> producers; private final List<BoundedInMemoryQueueProducer<I>> producers;
// Consumer // Consumer
private final Option<BoundedInMemoryQueueConsumer<O, E>> consumer; private final Option<BoundedInMemoryQueueConsumer<O, E>> consumer;
// pre-execute function to implement environment specific behavior before executors (producers/consumer) run
private final Runnable preExecuteRunnable;
public BoundedInMemoryExecutor(final long bufferLimitInBytes, final Iterator<I> inputItr,
BoundedInMemoryQueueConsumer<O, E> consumer, Function<I, O> transformFunction, Runnable preExecuteRunnable) {
this(bufferLimitInBytes, new IteratorBasedQueueProducer<>(inputItr), Option.of(consumer), transformFunction, preExecuteRunnable);
}
public BoundedInMemoryExecutor(final long bufferLimitInBytes, BoundedInMemoryQueueProducer<I> producer, public BoundedInMemoryExecutor(final long bufferLimitInBytes, BoundedInMemoryQueueProducer<I> producer,
Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction) { Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction) {
this(bufferLimitInBytes, Arrays.asList(producer), consumer, transformFunction, new DefaultSizeEstimator<>()); this(bufferLimitInBytes, producer, consumer, transformFunction, Functions.noop());
}
public BoundedInMemoryExecutor(final long bufferLimitInBytes, BoundedInMemoryQueueProducer<I> producer,
Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction, Runnable preExecuteRunnable) {
this(bufferLimitInBytes, Collections.singletonList(producer), consumer, transformFunction, new DefaultSizeEstimator<>(), preExecuteRunnable);
} }
public BoundedInMemoryExecutor(final long bufferLimitInBytes, List<BoundedInMemoryQueueProducer<I>> producers, public BoundedInMemoryExecutor(final long bufferLimitInBytes, List<BoundedInMemoryQueueProducer<I>> producers,
Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction, Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction,
final SizeEstimator<O> sizeEstimator) { final SizeEstimator<O> sizeEstimator, Runnable preExecuteRunnable) {
this.producers = producers; this.producers = producers;
this.consumer = consumer; this.consumer = consumer;
this.preExecuteRunnable = preExecuteRunnable;
// Ensure single thread for each producer thread and one for consumer // Ensure single thread for each producer thread and one for consumer
this.executorService = Executors.newFixedThreadPool(producers.size() + 1); this.executorService = Executors.newFixedThreadPool(producers.size() + 1);
this.queue = new BoundedInMemoryQueue<>(bufferLimitInBytes, transformFunction, sizeEstimator); this.queue = new BoundedInMemoryQueue<>(bufferLimitInBytes, transformFunction, sizeEstimator);
} }
/**
* Callback to implement environment specific behavior before executors (producers/consumer) run.
*/
public void preExecute() {
// Do Nothing in general context
}
/** /**
* Start all Producers. * Start all Producers.
*/ */
@@ -88,7 +96,7 @@ public class BoundedInMemoryExecutor<I, O, E> {
producers.stream().map(producer -> { producers.stream().map(producer -> {
return completionService.submit(() -> { return completionService.submit(() -> {
try { try {
preExecute(); preExecuteRunnable.run();
producer.produce(queue); producer.produce(queue);
} catch (Throwable e) { } catch (Throwable e) {
LOG.error("error producing records", e); LOG.error("error producing records", e);
@@ -116,7 +124,7 @@ public class BoundedInMemoryExecutor<I, O, E> {
return consumer.map(consumer -> { return consumer.map(consumer -> {
return executorService.submit(() -> { return executorService.submit(() -> {
LOG.info("starting consumer thread"); LOG.info("starting consumer thread");
preExecute(); preExecuteRunnable.run();
try { try {
E result = consumer.consume(queue); E result = consumer.consume(queue);
LOG.info("Queue Consumption is done; notifying producer threads"); LOG.info("Queue Consumption is done; notifying producer threads");

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner; import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer; import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
@@ -50,6 +51,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.function.Function;
/** /**
* Utilities for format. * Utilities for format.
@@ -193,8 +195,9 @@ public class FormatUtils {
HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(new JobConf(hadoopConf)), HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(new JobConf(hadoopConf)),
getParallelProducers(), getParallelProducers(),
Option.empty(), Option.empty(),
x -> x, Function.identity(),
new DefaultSizeEstimator<>()); new DefaultSizeEstimator<>(),
Functions.noop());
// Consumer of this record reader // Consumer of this record reader
this.iterator = this.executor.getQueue().iterator(); this.iterator = this.executor.getQueue().iterator();
this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, hadoopConf, this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, hadoopConf,

View File

@@ -18,18 +18,10 @@
package org.apache.hudi.hadoop.realtime; package org.apache.hudi.hadoop.realtime;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner; import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer; import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
@@ -40,6 +32,18 @@ import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
implements RecordReader<NullWritable, ArrayWritable> { implements RecordReader<NullWritable, ArrayWritable> {
@@ -74,7 +78,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
this.parquetRecordsIterator = new RecordReaderValueIterator<>(this.parquetReader); this.parquetRecordsIterator = new RecordReaderValueIterator<>(this.parquetReader);
this.executor = new BoundedInMemoryExecutor<>( this.executor = new BoundedInMemoryExecutor<>(
HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf), getParallelProducers(), HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf), getParallelProducers(),
Option.empty(), x -> x, new DefaultSizeEstimator<>()); Option.empty(), Function.identity(), new DefaultSizeEstimator<>(), Functions.noop());
// Consumer of this record reader // Consumer of this record reader
this.iterator = this.executor.getQueue().iterator(); this.iterator = this.executor.getQueue().iterator();
this.logRecordScanner = HoodieUnMergedLogRecordScanner.newBuilder() this.logRecordScanner = HoodieUnMergedLogRecordScanner.newBuilder()