1
0

[MINOR] clean up and add comments to flink client (#2261)

This commit is contained in:
Gary Li
2020-11-19 15:27:52 +08:00
committed by GitHub
parent d7af8caa45
commit c8d5ea2752
21 changed files with 102 additions and 71 deletions

View File

@@ -18,14 +18,6 @@
package org.apache.hudi.table.action.commit; package org.apache.hudi.table.action.commit;
import java.io.ByteArrayOutputStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.utils.MergingIterator; import org.apache.hudi.client.utils.MergingIterator;
import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieBaseFile;
@@ -37,15 +29,23 @@ import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
/** /**
* Helper to read records from previous version of parquet and run Merge. * Helper to read records from previous version of base file and run Merge.
*/ */
public abstract class AbstractMergeHelper<T extends HoodieRecordPayload, I, K, O> { public abstract class AbstractMergeHelper<T extends HoodieRecordPayload, I, K, O> {

View File

@@ -96,6 +96,11 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
return recordsWithLocation.stream().filter(v1 -> !v1.isCurrentLocationKnown()).collect(Collectors.toList()); return recordsWithLocation.stream().filter(v1 -> !v1.isCurrentLocationKnown()).collect(Collectors.toList());
} }
@Override
public void bootstrap(Option<Map<String, String>> extraMetadata) {
throw new HoodieNotSupportedException("Bootstrap operation is not supported yet");
}
@Override @Override
public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) { public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table = HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =

View File

@@ -18,21 +18,23 @@
package org.apache.hudi.client.common; package org.apache.hudi.client.common;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.function.SerializableConsumer; import org.apache.hudi.client.common.function.SerializableConsumer;
import org.apache.hudi.client.common.function.SerializableFunction; import org.apache.hudi.client.common.function.SerializableFunction;
import org.apache.hudi.client.common.function.SerializablePairFunction; import org.apache.hudi.client.common.function.SerializablePairFunction;
import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import scala.Tuple2;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.hadoop.conf.Configuration;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import scala.Tuple2;
import static org.apache.hudi.client.common.function.FunctionWrapper.throwingFlatMapWrapper; import static org.apache.hudi.client.common.function.FunctionWrapper.throwingFlatMapWrapper;
import static org.apache.hudi.client.common.function.FunctionWrapper.throwingForeachWrapper; import static org.apache.hudi.client.common.function.FunctionWrapper.throwingForeachWrapper;
import static org.apache.hudi.client.common.function.FunctionWrapper.throwingMapToPairWrapper; import static org.apache.hudi.client.common.function.FunctionWrapper.throwingMapToPairWrapper;

View File

@@ -19,7 +19,6 @@
package org.apache.hudi.index; package org.apache.hudi.index;
import org.apache.hudi.ApiMaturityLevel; import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieEngineContext; import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.client.common.HoodieFlinkEngineContext;
@@ -31,6 +30,7 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.state.FlinkInMemoryStateIndex; import org.apache.hudi.index.state.FlinkInMemoryStateIndex;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import java.util.List; import java.util.List;

View File

@@ -46,6 +46,14 @@ import org.apache.hudi.table.action.rollback.FlinkCopyOnWriteRollbackActionExecu
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
/**
* Implementation of a very heavily read-optimized Hoodie Table where all data is stored in base files, with
* zero read amplification.
* <p>
* INSERTS - Produce new files, block aligned to desired size (or) Merge with the smallest existing file, to expand it
* <p>
* UPDATES - Produce a new version of the file, just replacing the updated records with new values
*/
public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieFlinkTable<T> { public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieFlinkTable<T> {
protected HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) { protected HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.table.action.commit; package org.apache.hudi.table.action.commit;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieEngineContext; import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieBaseFile;
@@ -47,6 +46,7 @@ import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat; import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;

View File

@@ -94,7 +94,7 @@ public class FlinkDeleteHelper<R> extends
List<HoodieRecord<EmptyHoodieRecordPayload>> dedupedRecords = List<HoodieRecord<EmptyHoodieRecordPayload>> dedupedRecords =
dedupedKeys.stream().map(key -> new HoodieRecord<>(key, new EmptyHoodieRecordPayload())).collect(Collectors.toList()); dedupedKeys.stream().map(key -> new HoodieRecord<>(key, new EmptyHoodieRecordPayload())).collect(Collectors.toList());
Instant beginTag = Instant.now(); Instant beginTag = Instant.now();
// perform index loop up to get existing location of records // perform index look up to get existing location of records
List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords = List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords =
table.getIndex().tagLocation(dedupedRecords, context, table); table.getIndex().tagLocation(dedupedRecords, context, table);
Duration tagLocationDuration = Duration.between(beginTag, Instant.now()); Duration tagLocationDuration = Duration.between(beginTag, Instant.now());

View File

@@ -49,6 +49,9 @@ import java.util.stream.Collectors;
import scala.Tuple2; import scala.Tuple2;
/**
* Packs incoming records to be upserted, into buckets.
*/
public class UpsertPartitioner<T extends HoodieRecordPayload<T>> implements Partitioner { public class UpsertPartitioner<T extends HoodieRecordPayload<T>> implements Partitioner {
private static final Logger LOG = LogManager.getLogger(UpsertPartitioner.class); private static final Logger LOG = LogManager.getLogger(UpsertPartitioner.class);

View File

@@ -30,11 +30,12 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.MarkerFiles;
import scala.Tuple2;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import scala.Tuple2;
@SuppressWarnings("checkstyle:LineLength") @SuppressWarnings("checkstyle:LineLength")
public class FlinkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> extends AbstractMarkerBasedRollbackStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> { public class FlinkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> extends AbstractMarkerBasedRollbackStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
public FlinkMarkerBasedRollbackStrategy(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) { public FlinkMarkerBasedRollbackStrategy(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {

View File

@@ -50,6 +50,9 @@ import java.util.stream.Collectors;
import scala.Tuple2; import scala.Tuple2;
/**
* Performs Rollback of Hoodie Tables.
*/
public class ListingBasedRollbackHelper implements Serializable { public class ListingBasedRollbackHelper implements Serializable {
private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class); private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);

View File

@@ -18,6 +18,17 @@
package org.apache.hudi; package org.apache.hudi;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.operator.InstantGenerateOperator;
import org.apache.hudi.operator.KeyedWriteProcessFunction;
import org.apache.hudi.operator.KeyedWriteProcessOperator;
import org.apache.hudi.sink.CommitSink;
import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
import org.apache.hudi.util.StreamerUtil;
import com.beust.jcommander.IStringConverter; import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander; import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameter;
@@ -31,16 +42,6 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.operator.InstantGenerateOperator;
import org.apache.hudi.operator.KeyedWriteProcessFunction;
import org.apache.hudi.operator.KeyedWriteProcessOperator;
import org.apache.hudi.sink.CommitSink;
import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
import org.apache.hudi.util.StreamerUtil;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@@ -51,7 +52,7 @@ import java.util.Properties;
* A utility which can incrementally consume data from Kafka and apply it to the target table. * A utility which can incrementally consume data from Kafka and apply it to the target table.
* currently, it only supports COW tables and insert/upsert operations. * currently, it only supports COW tables and insert/upsert operations.
*/ */
public class HudiFlinkStreamer { public class HoodieFlinkStreamer {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

View File

@@ -18,17 +18,7 @@
package org.apache.hudi.operator; package org.apache.hudi.operator;
import org.apache.flink.api.common.state.ListState; import org.apache.hudi.HoodieFlinkStreamer;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.HudiFlinkStreamer;
import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.client.common.HoodieFlinkEngineContext;
@@ -42,6 +32,17 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -63,7 +64,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class); private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
public static final String NAME = "InstantGenerateOperator"; public static final String NAME = "InstantGenerateOperator";
private HudiFlinkStreamer.Config cfg; private HoodieFlinkStreamer.Config cfg;
private HoodieFlinkWriteClient writeClient; private HoodieFlinkWriteClient writeClient;
private SerializableConfiguration serializableHadoopConf; private SerializableConfiguration serializableHadoopConf;
private transient FileSystem fs; private transient FileSystem fs;
@@ -87,7 +88,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
public void open() throws Exception { public void open() throws Exception {
super.open(); super.open();
// get configs from runtimeContext // get configs from runtimeContext
cfg = (HudiFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
// retry times // retry times
retryTimes = Integer.valueOf(cfg.blockRetryTime); retryTimes = Integer.valueOf(cfg.blockRetryTime);

View File

@@ -18,7 +18,7 @@
package org.apache.hudi.operator; package org.apache.hudi.operator;
import org.apache.hudi.HudiFlinkStreamer; import org.apache.hudi.HoodieFlinkStreamer;
import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
@@ -77,7 +77,7 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
/** /**
* Job conf. * Job conf.
*/ */
private HudiFlinkStreamer.Config cfg; private HoodieFlinkStreamer.Config cfg;
/** /**
* Write Client. * Write Client.
@@ -90,7 +90,7 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
cfg = (HudiFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
HoodieFlinkEngineContext context = HoodieFlinkEngineContext context =
new HoodieFlinkEngineContext(new SerializableConfiguration(new org.apache.hadoop.conf.Configuration()), new FlinkTaskContextSupplier(getRuntimeContext())); new HoodieFlinkEngineContext(new SerializableConfiguration(new org.apache.hadoop.conf.Configuration()), new FlinkTaskContextSupplier(getRuntimeContext()));

View File

@@ -18,13 +18,14 @@
package org.apache.hudi.operator; package org.apache.hudi.operator;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;

View File

@@ -18,14 +18,15 @@
package org.apache.hudi.schema; package org.apache.hudi.schema;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;

View File

@@ -18,9 +18,10 @@
package org.apache.hudi.schema; package org.apache.hudi.schema;
import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.config.TypedProperties;
import org.apache.avro.Schema;
import java.io.Serializable; import java.io.Serializable;
/** /**

View File

@@ -18,10 +18,6 @@
package org.apache.hudi.sink; package org.apache.hudi.sink;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hudi.HudiFlinkStreamer;
import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
@@ -29,8 +25,13 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieFlinkStreamerException; import org.apache.hudi.exception.HoodieFlinkStreamerException;
import org.apache.hudi.HoodieFlinkStreamer;
import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -50,7 +51,7 @@ public class CommitSink extends RichSinkFunction<Tuple3<String, List<WriteStatus
/** /**
* Job conf. * Job conf.
*/ */
private HudiFlinkStreamer.Config cfg; private HoodieFlinkStreamer.Config cfg;
/** /**
* Write client. * Write client.
@@ -71,7 +72,7 @@ public class CommitSink extends RichSinkFunction<Tuple3<String, List<WriteStatus
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
super.open(parameters); super.open(parameters);
// Get configs from runtimeContext // Get configs from runtimeContext
cfg = (HudiFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism(); writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();
@@ -81,7 +82,7 @@ public class CommitSink extends RichSinkFunction<Tuple3<String, List<WriteStatus
@Override @Override
public void invoke(Tuple3<String, List<WriteStatus>, Integer> writeStatues, Context context) { public void invoke(Tuple3<String, List<WriteStatus>, Integer> writeStatues, Context context) {
LOG.info("Receive records, instantTime = [{}], subtaskId = [{}], records size = [{}]", writeStatues.f0, writeStatues.f2, writeStatues.f1.size()); LOG.info("Receive records, instantTime = [{}], subtaskId = [{}], WriteStatus size = [{}]", writeStatues.f0, writeStatues.f2, writeStatues.f1.size());
try { try {
if (bufferedWriteStatus.containsKey(writeStatues.f0)) { if (bufferedWriteStatus.containsKey(writeStatues.f0)) {
bufferedWriteStatus.get(writeStatues.f0).add(writeStatues.f1); bufferedWriteStatus.get(writeStatues.f0).add(writeStatues.f1);

View File

@@ -18,9 +18,7 @@
package org.apache.hudi.source; package org.apache.hudi.source;
import org.apache.avro.generic.GenericRecord; import org.apache.hudi.HoodieFlinkStreamer;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.hudi.HudiFlinkStreamer;
import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
@@ -30,6 +28,9 @@ import org.apache.hudi.schema.FilebasedSchemaProvider;
import org.apache.hudi.util.AvroConvertor; import org.apache.hudi.util.AvroConvertor;
import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -42,12 +43,12 @@ public class JsonStringToHoodieRecordMapFunction implements MapFunction<String,
private static Logger LOG = LoggerFactory.getLogger(JsonStringToHoodieRecordMapFunction.class); private static Logger LOG = LoggerFactory.getLogger(JsonStringToHoodieRecordMapFunction.class);
private final HudiFlinkStreamer.Config cfg; private final HoodieFlinkStreamer.Config cfg;
private TypedProperties props; private TypedProperties props;
private KeyGenerator keyGenerator; private KeyGenerator keyGenerator;
private AvroConvertor avroConvertor; private AvroConvertor avroConvertor;
public JsonStringToHoodieRecordMapFunction(HudiFlinkStreamer.Config cfg) { public JsonStringToHoodieRecordMapFunction(HoodieFlinkStreamer.Config cfg) {
this.cfg = cfg; this.cfg = cfg;
init(); init();
} }

View File

@@ -18,10 +18,11 @@
package org.apache.hudi.util; package org.apache.hudi.util;
import org.apache.hudi.avro.MercifulJsonConverter;
import com.twitter.bijection.Injection; import com.twitter.bijection.Injection;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.MercifulJsonConverter;
import java.io.Serializable; import java.io.Serializable;

View File

@@ -18,11 +18,7 @@
package org.apache.hudi.util; package org.apache.hudi.util;
import org.apache.avro.generic.GenericRecord; import org.apache.hudi.HoodieFlinkStreamer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.HudiFlinkStreamer;
import org.apache.hudi.common.config.DFSPropertiesConfiguration; import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
@@ -37,6 +33,11 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator; import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.schema.FilebasedSchemaProvider; import org.apache.hudi.schema.FilebasedSchemaProvider;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -51,14 +52,14 @@ public class StreamerUtil {
private static Logger LOG = LoggerFactory.getLogger(StreamerUtil.class); private static Logger LOG = LoggerFactory.getLogger(StreamerUtil.class);
public static Properties getKafkaProps(HudiFlinkStreamer.Config cfg) { public static Properties getKafkaProps(HoodieFlinkStreamer.Config cfg) {
Properties result = new Properties(); Properties result = new Properties();
result.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cfg.kafkaBootstrapServers); result.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cfg.kafkaBootstrapServers);
result.put(ConsumerConfig.GROUP_ID_CONFIG, cfg.kafkaGroupId); result.put(ConsumerConfig.GROUP_ID_CONFIG, cfg.kafkaGroupId);
return result; return result;
} }
public static TypedProperties getProps(HudiFlinkStreamer.Config cfg) { public static TypedProperties getProps(HoodieFlinkStreamer.Config cfg) {
return readConfig( return readConfig(
FSUtils.getFs(cfg.propsFilePath, getHadoopConf()), FSUtils.getFs(cfg.propsFilePath, getHadoopConf()),
new Path(cfg.propsFilePath), cfg.configs).getConfig(); new Path(cfg.propsFilePath), cfg.configs).getConfig();
@@ -130,7 +131,7 @@ public class StreamerUtil {
} }
} }
public static HoodieWriteConfig getHoodieClientConfig(HudiFlinkStreamer.Config cfg) { public static HoodieWriteConfig getHoodieClientConfig(HoodieFlinkStreamer.Config cfg) {
FileSystem fs = FSUtils.getFs(cfg.targetBasePath, getHadoopConf()); FileSystem fs = FSUtils.getFs(cfg.targetBasePath, getHadoopConf());
HoodieWriteConfig.Builder builder = HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true) HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true)