diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index d845b9075..e3e0eb421 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -25,12 +25,14 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieNotSupportedException; @@ -46,6 +48,7 @@ import com.codahale.metrics.Timer; import org.apache.hadoop.conf.Configuration; import java.io.IOException; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -218,4 +221,35 @@ public class HoodieFlinkWriteClient extends return unCompletedTimeline.getInstants().filter(x -> x.getAction().equals(commitType)).map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); } + + public String getInflightAndRequestedInstant(String tableType) { + final String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType)); + HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); + HoodieTimeline unCompletedTimeline = table.getMetaClient().getCommitsTimeline().filterInflightsAndRequested(); + return unCompletedTimeline.getInstants() + .filter(x -> x.getAction().equals(commitType)) + .map(HoodieInstant::getTimestamp) + .collect(Collectors.toList()).stream() + .max(Comparator.naturalOrder()) + .orElse(null); + } + + public String getLastCompletedInstant(String tableType) { + final String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType)); + HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); + HoodieTimeline completedTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants(); + return completedTimeline.getInstants() + .filter(x -> x.getAction().equals(commitType)) + .map(HoodieInstant::getTimestamp) + .collect(Collectors.toList()).stream() + .max(Comparator.naturalOrder()) + .orElse(null); + } + + public void deletePendingInstant(String tableType, String instant) { + HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); + String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType)); + table.getMetaClient().getActiveTimeline() + .deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant)); + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java index 
4354ea3f9..44eafd57f 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java @@ -54,9 +54,7 @@ public class FlinkInMemoryStateIndex extends Flin if (context.getRuntimeContext() != null) { MapStateDescriptor indexStateDesc = new MapStateDescriptor<>("indexState", TypeInformation.of(HoodieKey.class), TypeInformation.of(HoodieRecordLocation.class)); - if (context.getRuntimeContext() != null) { - mapState = context.getRuntimeContext().getMapState(indexStateDesc); - } + mapState = context.getRuntimeContext().getMapState(indexStateDesc); } } diff --git a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc index a972bfd52..f1c9fd5b3 100644 --- a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc +++ b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc @@ -57,10 +57,10 @@ /* overlaps with 'commitsRollback' field. Adding this to track action type for all the instants being rolled back. */ { "name": "instantsRollback", - "default": null, + "default": [], "type": { "type": "array", - "default": null, + "default": [], "items": "HoodieInstantInfo" } } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index ec19fe354..3b989db2a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -217,7 +217,7 @@ public class HoodieAvroUtils { private static Schema initRecordKeySchema() { Schema.Field recordKeyField = - new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance()); + new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE); Schema recordKeySchema = Schema.createRecord("HoodieRecordKey", "", "", false); recordKeySchema.setFields(Collections.singletonList(recordKeyField)); return recordKeySchema; @@ -263,9 +263,9 @@ public class HoodieAvroUtils { */ public static Schema appendNullSchemaFields(Schema schema, List newFieldNames) { List newFields = schema.getFields().stream() - .map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultValue())).collect(Collectors.toList()); + .map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultVal())).collect(Collectors.toList()); for (String newField : newFieldNames) { - newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "", NullNode.getInstance())); + newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE)); } Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError()); newSchema.setFields(newFields); @@ -329,7 +329,8 @@ public class HoodieAvroUtils { private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field f) { // cache the result of oldRecord.get() to save CPU expensive hash lookup - Object fieldValue = oldRecord.get(f.name()); + Schema oldSchema = oldRecord.getSchema(); + Object fieldValue = oldSchema.getField(f.name()) == null ? 
null : oldRecord.get(f.name()); if (fieldValue == null) { if (f.defaultVal() instanceof JsonProperties.Null) { newRecord.put(f.name(), null); @@ -381,7 +382,7 @@ public class HoodieAvroUtils { throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! " + "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet())); } else { - projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue())); + projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java index e1e61244b..dd1853d83 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java @@ -79,7 +79,14 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload * @returns {@code true} if record represents a delete record. {@code false} otherwise. */ protected boolean isDeleteRecord(GenericRecord genericRecord) { - Object deleteMarker = genericRecord.get("_hoodie_is_deleted"); + final String isDeleteKey = "_hoodie_is_deleted"; + // Modify to be compatible with new version Avro. + // The new version Avro throws for GenericRecord.get if the field name + // does not exist in the schema. + if (genericRecord.getSchema().getField(isDeleteKey) == null) { + return false; + } + Object deleteMarker = genericRecord.get(isDeleteKey); return (deleteMarker instanceof Boolean && (boolean) deleteMarker); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java index 17e93feca..8017bc3d7 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java @@ -98,8 +98,8 @@ public class HoodieTestDataGenerator { + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"; public static final String FARE_FLATTENED_SCHEMA = "{\"name\": \"fare\", \"type\": \"double\"}," + "{\"name\": \"currency\", \"type\": \"string\"},"; - public static final String TIP_NESTED_SCHEMA = "{\"name\": \"tip_history\", \"default\": null, \"type\": {\"type\": " - + "\"array\", \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [" + public static final String TIP_NESTED_SCHEMA = "{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": " + + "\"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [" + "{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},"; public static final String MAP_TYPE_SCHEMA = "{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},"; public static final String EXTRA_TYPE_SCHEMA = "{\"name\": \"distance_in_meters\", \"type\": \"int\"}," diff --git a/hudi-flink/pom.xml b/hudi-flink/pom.xml index 3d100c772..2a0f39556 100644 --- a/hudi-flink/pom.xml +++ b/hudi-flink/pom.xml @@ -123,28 +123,77 @@ kafka-clients ${kafka.version} + + org.apache.flink + flink-hadoop-compatibility_${scala.binary.version} + 
${flink.version} + + + org.apache.flink + flink-avro + ${flink.version} + provided + + + org.apache.flink + flink-json + ${flink.version} + provided + + + org.apache.flink + flink-table-common + ${flink.version} + provided + + + org.apache.flink + flink-table-runtime-blink_${scala.binary.version} + ${flink.version} + provided + org.apache.hadoop hadoop-common compile + + + org.slf4j + slf4j-log4j12 + + org.apache.hadoop hadoop-hdfs compile + + + org.slf4j + slf4j-log4j12 + + org.apache.hadoop hadoop-auth compile + + + org.slf4j + slf4j-log4j12 + + org.apache.avro avro + + 1.10.0 compile @@ -160,6 +209,12 @@ org.apache.hadoop hadoop-mapreduce-client-core compile + + + org.slf4j + slf4j-log4j12 + + @@ -173,7 +228,9 @@ 0.9.7 - + + + org.junit.jupiter junit-jupiter-api @@ -194,28 +251,7 @@ junit-jupiter-params test - - org.mockito - mockito-junit-jupiter - test - - - org.junit.platform - junit-platform-runner - test - - - org.junit.platform - junit-platform-suite-api - test - - - org.junit.platform - junit-platform-commons - test - - - + org.apache.hudi hudi-common @@ -240,14 +276,7 @@ test-jar test - - org.apache.hudi - hudi-hadoop-mr - ${project.version} - test - - - + org.apache.flink flink-test-utils_${scala.binary.version} @@ -259,15 +288,21 @@ flink-runtime_${scala.binary.version} ${flink.version} test - tests + test-jar org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} test - tests + test-jar + + + org.apache.flink + flink-table-runtime-blink_${scala.binary.version} + ${flink.version} + test + test-jar - diff --git a/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java deleted file mode 100644 index b65b43c0d..000000000 --- a/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi; - -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; -import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.operator.InstantGenerateOperator; -import org.apache.hudi.operator.KeyedWriteProcessFunction; -import org.apache.hudi.operator.KeyedWriteProcessOperator; -import org.apache.hudi.sink.CommitSink; -import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction; -import org.apache.hudi.util.StreamerUtil; - -import com.beust.jcommander.IStringConverter; -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; -import com.beust.jcommander.ParameterException; -import org.apache.flink.api.common.serialization.SimpleStringSchema; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; - -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - -/** - * An Utility which can incrementally consume data from Kafka and apply it to the target table. - * currently, it only support COW table and insert, upsert operation. - */ -public class HoodieFlinkStreamer { - public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - final Config cfg = new Config(); - JCommander cmd = new JCommander(cfg, null, args); - if (cfg.help || args.length == 0) { - cmd.usage(); - System.exit(1); - } - env.enableCheckpointing(cfg.checkpointInterval); - env.getConfig().setGlobalJobParameters(cfg); - // We use checkpoint to trigger write operation, including instant generating and committing, - // There can only be one checkpoint at one time. 
- env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); - env.disableOperatorChaining(); - - if (cfg.flinkCheckPointPath != null) { - env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath)); - } - - TypedProperties props = StreamerUtil.getProps(cfg); - - // add kafka config - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cfg.kafkaBootstrapServers); - props.put(ConsumerConfig.GROUP_ID_CONFIG, cfg.kafkaGroupId); - - // add data source config - props.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, cfg.payloadClassName); - props.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, cfg.sourceOrderingField); - - // Read from kafka source - DataStream inputRecords = - env.addSource(new FlinkKafkaConsumer<>(cfg.kafkaTopic, new SimpleStringSchema(), props)) - .filter(Objects::nonNull) - .map(new JsonStringToHoodieRecordMapFunction(props)) - .name("kafka_to_hudi_record") - .uid("kafka_to_hudi_record_uid"); - - // InstantGenerateOperator helps to emit globally unique instantTime - inputRecords.transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator()) - .name("instant_generator") - .uid("instant_generator_id") - - // Keyby partition path, to avoid multiple subtasks writing to a partition at the same time - .keyBy(HoodieRecord::getPartitionPath) - - // write operator, where the write operation really happens - .transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint, Integer>>() { - }), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction())) - .name("write_process") - .uid("write_process_uid") - .setParallelism(env.getParallelism()) - - // Commit can only be executed once, so make it one parallelism - .addSink(new CommitSink()) - .name("commit_sink") - .uid("commit_sink_uid") - .setParallelism(1); - - env.execute(cfg.targetTableName); - } - - public static class Config extends Configuration { - @Parameter(names = {"--kafka-topic"}, description = "kafka topic", required = true) - public String kafkaTopic; - - @Parameter(names = {"--kafka-group-id"}, description = "kafka consumer group id", required = true) - public String kafkaGroupId; - - @Parameter(names = {"--kafka-bootstrap-servers"}, description = "kafka bootstrap.servers", required = true) - public String kafkaBootstrapServers; - - @Parameter(names = {"--flink-checkpoint-path"}, description = "flink checkpoint path") - public String flinkCheckPointPath; - - @Parameter(names = {"--flink-block-retry-times"}, description = "Times to retry when latest instant has not completed") - public String blockRetryTime = "10"; - - @Parameter(names = {"--flink-block-retry-interval"}, description = "Seconds between two tries when latest instant has not completed") - public String blockRetryInterval = "1"; - - @Parameter(names = {"--target-base-path"}, - description = "base path for the target hoodie table. " - + "(Will be created if did not exist first time around. If exists, expected to be a hoodie table)", - required = true) - public String targetBasePath; - - @Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true) - public String targetTableName; - - @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ", required = true) - public String tableType; - - @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for " - + "hoodie client, schema provider, key generator and data source. 
For hoodie client props, sane defaults are " - + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer" - + "to individual classes, for supported properties.") - public String propsFilePath = - "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties"; - - @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file " - + "(using the CLI parameter \"--props\") can also be passed command line using this parameter.") - public List configs = new ArrayList<>(); - - @Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how" - + " to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record") - public String sourceOrderingField = "ts"; - - @Parameter(names = {"--payload-class"}, description = "subclass of HoodieRecordPayload, that works off " - + "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value") - public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName(); - - @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input " - + "is purely new data/inserts to gain speed)", converter = OperationConverter.class) - public WriteOperationType operation = WriteOperationType.UPSERT; - - @Parameter(names = {"--filter-dupes"}, - description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert") - public Boolean filterDupes = false; - - @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written") - public Boolean commitOnErrors = false; - - /** - * Flink checkpoint interval. - */ - @Parameter(names = {"--checkpoint-interval"}, description = "Flink checkpoint interval.") - public Long checkpointInterval = 1000 * 5L; - - @Parameter(names = {"--help", "-h"}, help = true) - public Boolean help = false; - } - - private static class OperationConverter implements IStringConverter { - - @Override - public WriteOperationType convert(String value) throws ParameterException { - return WriteOperationType.valueOf(value); - } - } -} diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java new file mode 100644 index 000000000..655fd1aef --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.operator; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.streamer.FlinkStreamerConfig; +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.keygen.SimpleAvroKeyGenerator; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Hoodie Flink config options. + * + *
+ * <p>
It has the options for Hoodie table read and write. It also defines some utilities. + */ +public class FlinkOptions { + private FlinkOptions() { + } + + // ------------------------------------------------------------------------ + // Base Options + // ------------------------------------------------------------------------ + public static final ConfigOption PATH = ConfigOptions + .key("path") + .stringType() + .noDefaultValue() + .withDescription("Base path for the target hoodie table." + + "\nThe path would be created if it does not exist,\n" + + "otherwise a Hoodie table expects to be initialized successfully"); + + // ------------------------------------------------------------------------ + // Read Options + // ------------------------------------------------------------------------ + public static final ConfigOption READ_SCHEMA_FILE_PATH = ConfigOptions + .key("read.schema.file.path") + .stringType() + .noDefaultValue() + .withDescription("Avro schema file path, the parsed schema is used for deserializing"); + + // ------------------------------------------------------------------------ + // Write Options + // ------------------------------------------------------------------------ + public static final ConfigOption TABLE_NAME = ConfigOptions + .key(HoodieWriteConfig.TABLE_NAME) + .stringType() + .noDefaultValue() + .withDescription("Table name to register to Hive metastore"); + + public static final ConfigOption TABLE_TYPE = ConfigOptions + .key("write.table.type") + .stringType() + .defaultValue(HoodieTableType.COPY_ON_WRITE.name()) + .withDescription("Type of table to write, COPY_ON_WRITE (or) MERGE_ON_READ"); + + public static final ConfigOption OPERATION = ConfigOptions + .key("write.operation") + .stringType() + .defaultValue("upsert") + .withDescription("The write operation, that this write should do"); + + public static final ConfigOption PRECOMBINE_FIELD = ConfigOptions + .key("write.precombine.field") + .stringType() + .defaultValue("ts") + .withDescription("Field used in preCombining before actual write. When two records have the same\n" + + "key value, we will pick the one with the largest value for the precombine field,\n" + + "determined by Object.compareTo(..)"); + + public static final ConfigOption PAYLOAD_CLASS = ConfigOptions + .key("write.payload.class") + .stringType() + .defaultValue(OverwriteWithLatestAvroPayload.class.getName()) + .withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n" + + "This will render any value set for the option in-effective"); + + /** + * Flag to indicate whether to drop duplicates upon insert. + * By default insert will accept duplicates, to gain extra performance. 
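+ *
+ * <p>For example (illustrative snippet, not part of this patch; {@code conf} is any Flink {@code Configuration}):
+ * <pre>{@code
+ * conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true); // de-duplicate on insert
+ * }</pre>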
+ */ + public static final ConfigOption INSERT_DROP_DUPS = ConfigOptions + .key("write.insert.drop.duplicates") + .booleanType() + .defaultValue(false) + .withDescription("Flag to indicate whether to drop duplicates upon insert.\n" + + "By default insert will accept duplicates, to gain extra performance"); + + public static final ConfigOption RETRY_TIMES = ConfigOptions + .key("write.retry.times") + .intType() + .defaultValue(3) + .withDescription("Flag to indicate how many times streaming job should retry for a failed checkpoint batch.\n" + + "By default 3"); + + public static final ConfigOption RETRY_INTERVAL_MS = ConfigOptions + .key("write.retry.interval.ms") + .longType() + .defaultValue(2000L) + .withDescription("Flag to indicate how long (by millisecond) before a retry should issued for failed checkpoint batch.\n" + + "By default 2000 and it will be doubled by every retry"); + + public static final ConfigOption IGNORE_FAILED = ConfigOptions + .key("write.ignore.failed") + .booleanType() + .defaultValue(true) + .withDescription("Flag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch.\n" + + "By default true (in favor of streaming progressing over data integrity)"); + + public static final ConfigOption RECORD_KEY_FIELD = ConfigOptions + .key(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY) + .stringType() + .defaultValue("uuid") + .withDescription("Record key field. Value to be used as the `recordKey` component of `HoodieKey`.\n" + + "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using " + + "the dot notation eg: `a.b.c`"); + + public static final ConfigOption PARTITION_PATH_FIELD = ConfigOptions + .key(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY) + .stringType() + .defaultValue("partition-path") + .withDescription("Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`.\n" + + "Actual value obtained by invoking .toString()"); + + public static final ConfigOption KEYGEN_CLASS = ConfigOptions + .key(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP) + .stringType() + .defaultValue(SimpleAvroKeyGenerator.class.getName()) + .withDescription("Key generator class, that implements will extract the key out of incoming record"); + + public static final ConfigOption WRITE_TASK_PARALLELISM = ConfigOptions + .key("write.task.parallelism") + .intType() + .defaultValue(4) + .withDescription("Parallelism of tasks that do actual write, default is 4"); + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + // Remember to update the set when adding new options. + public static final List> OPTIONAL_OPTIONS = Arrays.asList( + TABLE_TYPE, OPERATION, PRECOMBINE_FIELD, PAYLOAD_CLASS, INSERT_DROP_DUPS, RETRY_TIMES, + RETRY_INTERVAL_MS, IGNORE_FAILED, RECORD_KEY_FIELD, PARTITION_PATH_FIELD, KEYGEN_CLASS + ); + + // Prefix for Hoodie specific properties. + private static final String PROPERTIES_PREFIX = "properties."; + + /** + * Transforms a {@code HoodieFlinkStreamer.Config} into {@code Configuration}. + * The latter is more suitable for the table APIs. It reads all the properties + * in the properties file (set by `--props` option) and cmd line options + * (set by `--hoodie-conf` option). 
+ */ + @SuppressWarnings("unchecked, rawtypes") + public static org.apache.flink.configuration.Configuration fromStreamerConfig(FlinkStreamerConfig config) { + Map propsMap = new HashMap((Map) StreamerUtil.getProps(config)); + org.apache.flink.configuration.Configuration conf = fromMap(propsMap); + + conf.setString(FlinkOptions.PATH, config.targetBasePath); + conf.setString(READ_SCHEMA_FILE_PATH, config.readSchemaFilePath); + conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName); + // copy_on_write works same as COPY_ON_WRITE + conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase()); + conf.setString(FlinkOptions.OPERATION, config.operation.value()); + conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField); + conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName); + conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes); + conf.setInteger(FlinkOptions.RETRY_TIMES, Integer.parseInt(config.instantRetryTimes)); + conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, Long.parseLong(config.instantRetryInterval)); + conf.setBoolean(FlinkOptions.IGNORE_FAILED, config.commitOnErrors); + conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField); + conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField); + conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass); + conf.setInteger(FlinkOptions.WRITE_TASK_PARALLELISM, config.writeTaskNum); + + return conf; + } + + /** + * Collects the config options that start with 'properties.' into a 'key'='value' list. + */ + public static Map getHoodieProperties(Map options) { + final Map hoodieProperties = new HashMap<>(); + + if (hasPropertyOptions(options)) { + options.keySet().stream() + .filter(key -> key.startsWith(PROPERTIES_PREFIX)) + .forEach(key -> { + final String value = options.get(key); + final String subKey = key.substring((PROPERTIES_PREFIX).length()); + hoodieProperties.put(subKey, value); + }); + } + return hoodieProperties; + } + + /** + * Collects all the config options, the 'properties.' prefix would be removed if the option key starts with it. + */ + public static Configuration flatOptions(Configuration conf) { + final Map propsMap = new HashMap<>(); + + conf.toMap().forEach((key, value) -> { + final String subKey = key.startsWith(PROPERTIES_PREFIX) + ? key.substring((PROPERTIES_PREFIX).length()) + : key; + propsMap.put(subKey, value); + }); + return fromMap(propsMap); + } + + private static boolean hasPropertyOptions(Map options) { + return options.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX)); + } + + /** Creates a new configuration that is initialized with the options of the given map. 
*/ + private static Configuration fromMap(Map map) { + final Configuration configuration = new Configuration(); + map.forEach(configuration::setString); + return configuration; + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java index 103aef514..5c9930d60 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java @@ -18,7 +18,7 @@ package org.apache.hudi.operator; -import org.apache.hudi.HoodieFlinkStreamer; +import org.apache.hudi.streamer.FlinkStreamerConfig; import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.common.HoodieFlinkEngineContext; @@ -66,7 +66,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator

+ * <p><h2>Work Flow</h2>
+ *
+ * <p>The function first buffers the data as a batch of {@link HoodieRecord}s,
+ * and flushes (writes) the batch when a Flink checkpoint starts. After a batch has been written successfully,
+ * the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write.
+ *
+ *
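+ * <p>A minimal wiring sketch (illustrative only, not part of this patch; {@code rowType},
+ * {@code conf}, {@code partitionKeySelector} and {@code numWriteTasks} are assumed to be
+ * supplied by the pipeline builder):
+ * <pre>{@code
+ * StreamWriteOperatorFactory<RowData> factory =
+ *     new StreamWriteOperatorFactory<>(rowType, conf, numWriteTasks);
+ * dataStream
+ *     .keyBy(partitionKeySelector) // shuffle by partition path, as required below
+ *     .transform("hoodie_stream_write", TypeInformation.of(Object.class), factory)
+ *     .setParallelism(numWriteTasks);
+ * }</pre>
+ *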

+ * <p><h2>Exactly-once Semantics</h2>
+ *
+ * <p>The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator
+ * starts a new instant on the timeline when a checkpoint triggers; the coordinator checkpoints always
+ * start before those of its operator, so when this function starts a checkpoint, a REQUESTED instant already exists.
+ * The function's processing thread then blocks data buffering while the checkpoint thread flushes the existing data buffer.
+ * Once the buffer is written successfully, the processing thread unblocks and starts buffering again for the next checkpoint round.
+ * Because any checkpoint failure triggers a rollback of the write, exactly-once semantics are guaranteed.
+ *
+ *
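+ * <p>Put together, a rough per-checkpoint sequence (as implemented below): the coordinator's
+ * {@code checkpointCoordinator()} starts instant I, this function's {@code snapshotState()}
+ * flushes the buffer to I and sends a {@code BatchWriteSuccessEvent}, and the coordinator's
+ * {@code checkpointComplete()} finally commits I.
+ *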

+ * <p><h2>Fault Tolerance</h2>
+ *
+ * <p>The operator coordinator checks the validity of the last instant when it starts a new one. The operator rolls back
+ * the written data and throws if any error occurs, so any checkpoint or task failure triggers a failover.
+ * The operator coordinator retries several times when committing the write status.
+ *
+ *
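+ * <p>The retry behavior can be tuned through the write options, e.g. (illustrative snippet;
+ * {@code conf} is the job's Flink {@code Configuration}):
+ * <pre>{@code
+ * conf.setInteger(FlinkOptions.RETRY_TIMES, 3);        // commit attempts per checkpoint
+ * conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, 2000L); // pause between attempts
+ * }</pre>
+ *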
+ * <p>
Note: The function task requires the input stream be partitioned by the partition fields to avoid different write tasks + * write to the same file group that conflict. The general case for partition path is a datetime field, + * so the sink task is very possible to have IO bottleneck, the more flexible solution is to shuffle the + * data by the file group IDs. + * + * @param Type of the input record + * @see StreamWriteOperatorCoordinator + */ +public class StreamWriteFunction extends KeyedProcessFunction implements CheckpointedFunction { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class); + + /** + * Write buffer for a checkpoint. + */ + private transient List buffer; + + /** + * The buffer lock to control data buffering/flushing. + */ + private transient ReentrantLock bufferLock; + + /** + * The condition to decide whether to add new records into the buffer. + */ + private transient Condition addToBufferCondition; + + /** + * Flag saying whether there is an on-going checkpoint. + */ + private volatile boolean onCheckpointing = false; + + /** + * Config options. + */ + private final Configuration config; + + /** + * Id of current subtask. + */ + private int taskID; + + /** + * Write Client. + */ + private transient HoodieFlinkWriteClient writeClient; + + private transient BiFunction, String, List> writeFunction; + + /** + * HoodieKey generator. + */ + private transient KeyGenerator keyGenerator; + + /** + * Row type of the input. + */ + private final RowType rowType; + + /** + * Avro schema of the input. + */ + private final Schema avroSchema; + + private transient RowDataToAvroConverters.RowDataToAvroConverter converter; + + /** + * The REQUESTED instant we write the data. + */ + private volatile String currentInstant; + + /** + * Gateway to send operator events to the operator coordinator. + */ + private transient OperatorEventGateway eventGateway; + + /** + * Constructs a StreamingSinkFunction. + * + * @param rowType The input row type + * @param config The config options + */ + public StreamWriteFunction(RowType rowType, Configuration config) { + this.rowType = rowType; + this.avroSchema = StreamerUtil.getSourceSchema(config); + this.config = config; + } + + @Override + public void open(Configuration parameters) throws IOException { + this.taskID = getRuntimeContext().getIndexOfThisSubtask(); + this.keyGenerator = StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config)); + this.converter = RowDataToAvroConverters.createConverter(this.rowType); + initBuffer(); + initWriteClient(); + initWriteFunction(); + } + + @Override + public void initializeState(FunctionInitializationContext context) { + // no operation + } + + @Override + public void snapshotState(FunctionSnapshotContext functionSnapshotContext) { + bufferLock.lock(); + try { + // Based on the fact that the coordinator starts the checkpoint first, + // it would check the validity. 
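+      // (The coordinator checkpoints before this operator does, so a REQUESTED
+      // instant is expected to already exist here; see the class-level javadoc.)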
+ this.onCheckpointing = true; + this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE)); + Preconditions.checkNotNull(this.currentInstant, + "No inflight instant when flushing data"); + // wait for the buffer data flush out and request a new instant + flushBuffer(); + // signal the task thread to start buffering + addToBufferCondition.signal(); + } finally { + this.onCheckpointing = false; + bufferLock.unlock(); + } + } + + @Override + public void processElement(I value, KeyedProcessFunction.Context ctx, Collector out) throws Exception { + bufferLock.lock(); + try { + if (onCheckpointing) { + addToBufferCondition.await(); + } + this.buffer.add(toHoodieRecord(value)); + } finally { + bufferLock.unlock(); + } + } + + @Override + public void close() { + if (this.writeClient != null) { + this.writeClient.close(); + } + } + + // ------------------------------------------------------------------------- + // Getter/Setter + // ------------------------------------------------------------------------- + + @VisibleForTesting + @SuppressWarnings("rawtypes") + public List getBuffer() { + return buffer; + } + + @VisibleForTesting + @SuppressWarnings("rawtypes") + public HoodieFlinkWriteClient getWriteClient() { + return writeClient; + } + + public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) { + this.eventGateway = operatorEventGateway; + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + private void initBuffer() { + this.buffer = new ArrayList<>(); + this.bufferLock = new ReentrantLock(); + this.addToBufferCondition = this.bufferLock.newCondition(); + } + + private void initWriteClient() { + HoodieFlinkEngineContext context = + new HoodieFlinkEngineContext( + new SerializableConfiguration(StreamerUtil.getHadoopConf()), + new FlinkTaskContextSupplier(getRuntimeContext())); + + writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(this.config)); + } + + private void initWriteFunction() { + final String writeOperation = this.config.get(FlinkOptions.OPERATION); + switch (WriteOperationType.fromValue(writeOperation)) { + case INSERT: + this.writeFunction = (records, instantTime) -> this.writeClient.insert(records, instantTime); + break; + case UPSERT: + this.writeFunction = (records, instantTime) -> this.writeClient.upsert(records, instantTime); + break; + default: + throw new RuntimeException("Unsupported write operation : " + writeOperation); + } + } + + /** + * Converts the give record to a {@link HoodieRecord}. + * + * @param record The input record + * @return HoodieRecord based on the configuration + * @throws IOException if error occurs + */ + @SuppressWarnings("rawtypes") + private HoodieRecord toHoodieRecord(I record) throws IOException { + boolean shouldCombine = this.config.getBoolean(FlinkOptions.INSERT_DROP_DUPS) + || WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT; + GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record); + final String payloadClazz = this.config.getString(FlinkOptions.PAYLOAD_CLASS); + Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, + this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false); + HoodieRecordPayload payload = shouldCombine + ? 
StreamerUtil.createPayload(payloadClazz, gr, orderingVal) + : StreamerUtil.createPayload(payloadClazz, gr); + return new HoodieRecord<>(keyGenerator.getKey(gr), payload); + } + + private void flushBuffer() { + final List writeStatus; + if (buffer.size() > 0) { + writeStatus = writeFunction.apply(buffer, currentInstant); + buffer.clear(); + } else { + LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant); + writeStatus = Collections.emptyList(); + } + this.eventGateway.sendEventToCoordinator(new BatchWriteSuccessEvent(this.taskID, currentInstant, writeStatus)); + this.currentInstant = ""; + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java new file mode 100644 index 000000000..3f4d940ee --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.table.types.logical.RowType; + +/** + * Operator for {@link StreamSink}. + * + * @param The input type + */ +public class StreamWriteOperator + extends KeyedProcessOperator + implements OperatorEventHandler { + private final StreamWriteFunction sinkFunction; + + public StreamWriteOperator(RowType rowType, Configuration conf) { + super(new StreamWriteFunction<>(rowType, conf)); + this.sinkFunction = (StreamWriteFunction) getUserFunction(); + } + + @Override + public void handleOperatorEvent(OperatorEvent operatorEvent) { + // do nothing + } + + void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) { + sinkFunction.setOperatorEventGateway(operatorEventGateway); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java new file mode 100644 index 000000000..524c6015e --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator; + +import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.operator.event.BatchWriteSuccessEvent; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * {@link OperatorCoordinator} for {@link StreamWriteFunction}. + * + *

+ * <p>This coordinator starts a new instant when a new checkpoint starts. It commits the instant when all
+ * the operator tasks have written their data buffers successfully for a checkpoint round.
+ *
+ *
+ * <p>
If there is no data for a round of checkpointing, it rolls back the metadata. + * + * @see StreamWriteFunction for the work flow and semantics + */ +public class StreamWriteOperatorCoordinator + implements OperatorCoordinator { + private static final Logger LOG = LoggerFactory.getLogger(StreamWriteOperatorCoordinator.class); + + /** + * Config options. + */ + private final Configuration conf; + + /** + * Write client. + */ + private transient HoodieFlinkWriteClient writeClient; + + private long inFlightCheckpoint = -1; + + /** + * Current REQUESTED instant, for validation. + */ + private String inFlightInstant = ""; + + /** + * Event buffer for one round of checkpointing. When all the elements are non-null and have the same + * write instant, then the instant succeed and we can commit it. + */ + private transient BatchWriteSuccessEvent[] eventBuffer; + + /** + * Task number of the operator. + */ + private final int parallelism; + + /** + * Constructs a StreamingSinkOperatorCoordinator. + * + * @param conf The config options + * @param parallelism The operator task number + */ + public StreamWriteOperatorCoordinator( + Configuration conf, + int parallelism) { + this.conf = conf; + this.parallelism = parallelism; + } + + @Override + public void start() throws Exception { + // initialize event buffer + reset(); + // writeClient + initWriteClient(); + // init table, create it if not exists. + initTable(); + } + + @Override + public void close() { + if (writeClient != null) { + writeClient.close(); + } + this.eventBuffer = null; + } + + @Override + public void checkpointCoordinator(long checkpointId, CompletableFuture result) { + try { + final String errMsg = "A new checkpoint starts while the last checkpoint buffer" + + " data has not finish writing, roll back the last write and throw"; + checkAndForceCommit(errMsg); + this.inFlightInstant = this.writeClient.startCommit(); + this.inFlightCheckpoint = checkpointId; + LOG.info("Create instant [{}], at checkpoint [{}]", this.inFlightInstant, checkpointId); + result.complete(writeCheckpointBytes()); + } catch (Throwable throwable) { + // when a checkpoint fails, throws directly. + result.completeExceptionally( + new CompletionException( + String.format("Failed to checkpoint Instant %s for source %s", + this.inFlightInstant, this.getClass().getSimpleName()), throwable)); + } + } + + @Override + public void checkpointComplete(long checkpointId) { + // start to commit the instant. 
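+    // Retries are bounded; if the event buffer is still incomplete when they expire,
+    // the force check at the start of the next checkpoint handles it.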
+ checkAndCommitWithRetry(); + } + + public void notifyCheckpointAborted(long checkpointId) { + Preconditions.checkState(inFlightCheckpoint == checkpointId, + "The aborted checkpoint should always be the last checkpoint"); + checkAndForceCommit("The last checkpoint was aborted, roll back the last write and throw"); + } + + @Override + public void resetToCheckpoint(@Nullable byte[] checkpointData) throws Exception { + if (checkpointData != null) { + // restore when any checkpoint completed + deserializeCheckpointAndRestore(checkpointData); + } + } + + @Override + public void handleEventFromOperator(int i, OperatorEvent operatorEvent) { + // no event to handle + Preconditions.checkState(operatorEvent instanceof BatchWriteSuccessEvent, + "The coordinator can only handle BatchWriteSuccessEvent"); + BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent; + Preconditions.checkState(event.getInstantTime().equals(this.inFlightInstant), + String.format("Receive an unexpected event for instant %s from task %d", + event.getInstantTime(), event.getTaskID())); + this.eventBuffer[event.getTaskID()] = event; + } + + @Override + public void subtaskFailed(int i, @Nullable Throwable throwable) { + // no operation + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + @SuppressWarnings("rawtypes") + private void initWriteClient() { + writeClient = new HoodieFlinkWriteClient( + new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)), + StreamerUtil.getHoodieClientConfig(this.conf), + true); + } + + private void initTable() throws IOException { + final String basePath = this.conf.getString(FlinkOptions.PATH); + final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(); + // Hadoop FileSystem + try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) { + if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) { + HoodieTableMetaClient.initTableType( + hadoopConf, + basePath, + HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)), + this.conf.getString(FlinkOptions.TABLE_NAME), + "archived", + this.conf.getString(FlinkOptions.PAYLOAD_CLASS), + 1); + LOG.info("Table initialized"); + } else { + LOG.info("Table [{}/{}] already exists, no need to initialize the table", + basePath, this.conf.getString(FlinkOptions.TABLE_NAME)); + } + } + } + + static byte[] readBytes(DataInputStream in, int size) throws IOException { + byte[] bytes = new byte[size]; + in.readFully(bytes); + return bytes; + } + + /** + * Serialize the coordinator state. The current implementation may not be super efficient, + * but it should not matter that much because most of the state should be rather small. + * Large states themselves may already be a problem regardless of how the serialization + * is implemented. + * + * @return A byte array containing the serialized state of the source coordinator. + * @throws IOException When something goes wrong in serialization. 
+ */ + private byte[] writeCheckpointBytes() throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputViewStreamWrapper(baos)) { + + out.writeLong(this.inFlightCheckpoint); + byte[] serializedInstant = this.inFlightInstant.getBytes(); + out.writeInt(serializedInstant.length); + out.write(serializedInstant); + out.flush(); + return baos.toByteArray(); + } + } + + /** + * Restore the state of this source coordinator from the state bytes. + * + * @param bytes The checkpoint bytes that was returned from {@link #writeCheckpointBytes()} + * @throws Exception When the deserialization failed. + */ + private void deserializeCheckpointAndRestore(byte[] bytes) throws Exception { + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + DataInputStream in = new DataInputViewStreamWrapper(bais)) { + long checkpointID = in.readLong(); + int serializedInstantSize = in.readInt(); + byte[] serializedInstant = readBytes(in, serializedInstantSize); + this.inFlightCheckpoint = checkpointID; + this.inFlightInstant = new String(serializedInstant); + } + } + + private void reset() { + this.inFlightInstant = ""; + this.eventBuffer = new BatchWriteSuccessEvent[this.parallelism]; + } + + private void checkAndForceCommit(String errMsg) { + if (!checkReady()) { + // forced but still has inflight instant + String inflightInstant = writeClient.getInflightAndRequestedInstant(this.conf.getString(FlinkOptions.TABLE_TYPE)); + if (inflightInstant != null) { + assert inflightInstant.equals(this.inFlightInstant); + writeClient.rollback(this.inFlightInstant); + throw new HoodieException(errMsg); + } + if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) { + // The last checkpoint finished successfully. + return; + } + } + doCommit(); + } + + private void checkAndCommitWithRetry() { + int retryTimes = this.conf.getInteger(FlinkOptions.RETRY_TIMES); + if (retryTimes < 0) { + retryTimes = 1; + } + long retryIntervalMillis = this.conf.getLong(FlinkOptions.RETRY_INTERVAL_MS); + int tryTimes = 0; + while (tryTimes++ < retryTimes) { + try { + if (!checkReady()) { + // Do not throw if the try times expires but the event buffer are still not ready, + // because we have a force check when next checkpoint starts. + sleepFor(retryIntervalMillis); + continue; + } + doCommit(); + return; + } catch (Throwable throwable) { + String cause = throwable.getCause() == null ? "" : throwable.getCause().toString(); + LOG.warn("Try to commit the instant {} failed, with times {} and cause {}", this.inFlightInstant, tryTimes, cause); + if (tryTimes == retryTimes) { + throw new HoodieException(throwable); + } + sleepFor(retryIntervalMillis); + } + } + } + + /** + * Sleep {@code intervalMillis} milliseconds in current thread. + */ + private void sleepFor(long intervalMillis) { + try { + TimeUnit.MILLISECONDS.sleep(intervalMillis); + } catch (InterruptedException e) { + LOG.error("Thread interrupted while waiting to retry the instant commits"); + throw new HoodieException(e); + } + } + + /** Checks the buffer is ready to commit. */ + private boolean checkReady() { + return Arrays.stream(eventBuffer).allMatch(event -> + event != null && event.getInstantTime().equals(this.inFlightInstant)); + } + + /** Performs the actual commit action. 
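+   * If every write task reported success for the current instant, the instant is committed;
+   * otherwise it is rolled back and an exception is thrown (unless {@code write.ignore.failed} is enabled).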
*/ + private void doCommit() { + List writeResults = Arrays.stream(eventBuffer) + .map(BatchWriteSuccessEvent::getWriteStatuses) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + if (writeResults.size() == 0) { + // No data has written, clear the metadata file + this.writeClient.deletePendingInstant(this.conf.getString(FlinkOptions.TABLE_TYPE), this.inFlightInstant); + reset(); + return; + } + + // commit or rollback + long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L); + long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L); + boolean hasErrors = totalErrorRecords > 0; + + if (!hasErrors || this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) { + HashMap checkpointCommitMetadata = new HashMap<>(); + if (hasErrors) { + LOG.warn("Some records failed to merge but forcing commit since commitOnErrors set to true. Errors/Total=" + + totalErrorRecords + "/" + totalRecords); + } + + boolean success = writeClient.commit(this.inFlightInstant, writeResults, Option.of(checkpointCommitMetadata)); + if (success) { + reset(); + LOG.info("Commit instant [{}] success!", this.inFlightInstant); + } else { + throw new HoodieException(String.format("Commit instant [%s] failed!", this.inFlightInstant)); + } + } else { + LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords); + LOG.error("The first 100 error messages"); + writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> { + LOG.error("Global error for partition path {} and fileID {}: {}", + ws.getGlobalError(), ws.getPartitionPath(), ws.getFileId()); + if (ws.getErrors().size() > 0) { + ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " and value " + value)); + } + }); + // Rolls back instant + writeClient.rollback(this.inFlightInstant); + throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", this.inFlightInstant)); + } + } + + @VisibleForTesting + public BatchWriteSuccessEvent[] getEventBuffer() { + return eventBuffer; + } + + @VisibleForTesting + public String getInFlightInstant() { + return inFlightInstant; + } + + /** + * Provider for {@link StreamWriteOperatorCoordinator}. + */ + public static class Provider implements OperatorCoordinator.Provider { + private final OperatorID operatorId; + private final Configuration conf; + private final int numTasks; + + public Provider(OperatorID operatorId, Configuration conf, int numTasks) { + this.operatorId = operatorId; + this.conf = conf; + this.numTasks = numTasks; + } + + public OperatorID getOperatorId() { + return this.operatorId; + } + + public OperatorCoordinator create(Context context) { + return new StreamWriteOperatorCoordinator(this.conf, this.numTasks); + } + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java new file mode 100644 index 000000000..f5faa54ea --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher; +import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.table.types.logical.RowType; + +/** + * Factory class for {@link StreamWriteOperator}. + */ +public class StreamWriteOperatorFactory + extends SimpleUdfStreamOperatorFactory + implements CoordinatedOperatorFactory, OneInputStreamOperatorFactory { + private static final long serialVersionUID = 1L; + + private final StreamWriteOperator operator; + private final Configuration conf; + private final int numTasks; + + public StreamWriteOperatorFactory( + RowType rowType, + Configuration conf, + int numTasks) { + super(new StreamWriteOperator<>(rowType, conf)); + this.operator = (StreamWriteOperator) getOperator(); + this.conf = conf; + this.numTasks = numTasks; + } + + @Override + @SuppressWarnings("unchecked") + public > T createStreamOperator(StreamOperatorParameters parameters) { + final OperatorID operatorID = parameters.getStreamConfig().getOperatorID(); + final OperatorEventDispatcher eventDispatcher = parameters.getOperatorEventDispatcher(); + + this.operator.setOperatorEventGateway(eventDispatcher.getOperatorEventGateway(operatorID)); + this.operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); + this.operator.setProcessingTimeService(this.processingTimeService); + eventDispatcher.registerEventHandler(operatorID, operator); + return (T) operator; + } + + @Override + public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) { + return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf, this.numTasks); + } + + @Override + public void setProcessingTimeService(ProcessingTimeService processingTimeService) { + super.setProcessingTimeService(processingTimeService); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java b/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java new file mode 100644 index 000000000..db5432e1e --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator.event; + +import org.apache.flink.runtime.operators.coordination.OperatorEvent; + +import org.apache.hudi.client.WriteStatus; + +import java.util.List; + +/** + * An operator event to mark successful checkpoint batch write. + */ +public class BatchWriteSuccessEvent implements OperatorEvent { + private static final long serialVersionUID = 1L; + + private final List writeStatuses; + private final int taskID; + private final String instantTime; + + public BatchWriteSuccessEvent( + int taskID, + String instantTime, + List writeStatuses) { + this.taskID = taskID; + this.instantTime = instantTime; + this.writeStatuses = writeStatuses; + } + + public List getWriteStatuses() { + return writeStatuses; + } + + public int getTaskID() { + return taskID; + } + + public String getInstantTime() { + return instantTime; + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java b/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java index 82699d978..aed6e17de 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java +++ b/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java @@ -21,9 +21,11 @@ package org.apache.hudi.schema; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.operator.FlinkOptions; import org.apache.hudi.util.StreamerUtil; import org.apache.avro.Schema; +import org.apache.flink.configuration.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -43,16 +45,13 @@ public class FilebasedSchemaProvider extends SchemaProvider { private static final String TARGET_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.file"; } - private final FileSystem fs; - private final Schema sourceSchema; private Schema targetSchema; public FilebasedSchemaProvider(TypedProperties props) { - super(props); StreamerUtil.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_FILE_PROP)); - this.fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), StreamerUtil.getHadoopConf()); + FileSystem fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), StreamerUtil.getHadoopConf()); try { this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(props.getString(Config.SOURCE_SCHEMA_FILE_PROP)))); if (props.containsKey(Config.TARGET_SCHEMA_FILE_PROP)) { @@ -64,6 +63,16 @@ public class FilebasedSchemaProvider extends SchemaProvider { } } + public FilebasedSchemaProvider(Configuration conf) { + final String readSchemaPath = conf.getString(FlinkOptions.READ_SCHEMA_FILE_PATH); + final FileSystem fs = FSUtils.getFs(readSchemaPath, StreamerUtil.getHadoopConf()); + 
+    try {
+      this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(readSchemaPath)));
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error reading schema", ioe);
+    }
+  }
+
   @Override
   public Schema getSourceSchema() {
     return sourceSchema;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java b/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
index 74b406782..5def413b5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
@@ -18,8 +18,6 @@
 package org.apache.hudi.schema;
 
-import org.apache.hudi.common.config.TypedProperties;
-
 import org.apache.avro.Schema;
 
 import java.io.Serializable;
@@ -29,12 +27,6 @@ import java.io.Serializable;
  */
 public abstract class SchemaProvider implements Serializable {
 
-  protected TypedProperties config;
-
-  protected SchemaProvider(TypedProperties props) {
-    this.config = props;
-  }
-
   public abstract Schema getSourceSchema();
 
   public Schema getTargetSchema() {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java
index 4ca793076..3a12842e4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java
@@ -25,7 +25,7 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieFlinkStreamerException;
-import org.apache.hudi.HoodieFlinkStreamer;
+import org.apache.hudi.streamer.FlinkStreamerConfig;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -51,7 +51,7 @@ public class CommitSink extends RichSinkFunction configs = new ArrayList<>();
+
+  @Parameter(names = {"--record-key-field"}, description = "Record key field. Value to be used as the `recordKey` component of `HoodieKey`.\n"
+      + "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using "
+      + "the dot notation eg: `a.b.c`. By default `uuid`.")
+  public String recordKeyField = "uuid";
+
+  @Parameter(names = {"--partition-path-field"}, description = "Partition path field. Value to be used as \n"
+      + "the `partitionPath` component of `HoodieKey`. Actual value obtained by invoking .toString(). By default `partitionpath`.")
+  public String partitionPathField = "partitionpath";
+
+  @Parameter(names = {"--keygen-class"}, description = "Key generator class that implements the logic to extract the key out of the incoming record.\n"
+      + "By default `SimpleAvroKeyGenerator`.")
+  public String keygenClass = SimpleAvroKeyGenerator.class.getName();
+
+  @Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how"
+      + " to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record.")
+  public String sourceOrderingField = "ts";
+
+  @Parameter(names = {"--payload-class"}, description = "Subclass of HoodieRecordPayload, that works off "
+      + "a GenericRecord.
Implement your own, if you want to do something other than overwriting existing value.") + public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName(); + + @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input " + + "is purely new data/inserts to gain speed).", converter = OperationConverter.class) + public WriteOperationType operation = WriteOperationType.UPSERT; + + @Parameter(names = {"--filter-dupes"}, + description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert.") + public Boolean filterDupes = false; + + @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written.") + public Boolean commitOnErrors = false; + + /** + * Flink checkpoint interval. + */ + @Parameter(names = {"--checkpoint-interval"}, description = "Flink checkpoint interval.") + public Long checkpointInterval = 1000 * 5L; + + @Parameter(names = {"--help", "-h"}, help = true) + public Boolean help = false; + + @Parameter(names = {"--write-task-num"}, description = "Parallelism of tasks that do actual write, default is 4.") + public Integer writeTaskNum = 4; +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java new file mode 100644 index 000000000..f6d75d3ea --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hudi.streamer;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.InstantGenerateOperator;
+import org.apache.hudi.operator.KeyedWriteProcessFunction;
+import org.apache.hudi.operator.KeyedWriteProcessOperator;
+import org.apache.hudi.sink.CommitSink;
+import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
+import org.apache.hudi.util.StreamerUtil;
+
+import com.beust.jcommander.JCommander;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A utility which can incrementally consume data from Kafka and apply it to the target table.
+ * Currently, it only supports COW tables and the INSERT and UPSERT operations.
+ */
+public class HoodieFlinkStreamer {
+  public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    final FlinkStreamerConfig cfg = new FlinkStreamerConfig();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+    env.enableCheckpointing(cfg.checkpointInterval);
+    env.getConfig().setGlobalJobParameters(cfg);
+    // We use checkpoints to trigger the write operations, including instant generation and committing;
+    // there can only be one checkpoint at a time.
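+    // Capping the checkpoint concurrency at 1 guarantees that a new instant is
+    // generated only after the previous one has been committed.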
+    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    if (cfg.flinkCheckPointPath != null) {
+      env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
+    }
+
+    TypedProperties props = StreamerUtil.appendKafkaProps(cfg);
+
+    // add data source config
+    props.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, cfg.payloadClassName);
+    props.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, cfg.sourceOrderingField);
+
+    // Read from kafka source
+    DataStream<HoodieRecord> inputRecords =
+        env.addSource(new FlinkKafkaConsumer<>(cfg.kafkaTopic, new SimpleStringSchema(), props))
+            .filter(Objects::nonNull)
+            .map(new JsonStringToHoodieRecordMapFunction(props))
+            .name("kafka_to_hudi_record")
+            .uid("kafka_to_hudi_record_uid");
+
+    inputRecords.transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator())
+        .name("instant_generator")
+        .uid("instant_generator_id")
+
+        // Key by partition path, to avoid multiple subtasks writing to a partition at the same time
+        .keyBy(HoodieRecord::getPartitionPath)
+
+        // write operator, where the write operation really happens
+        .transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint<Tuple3<String, List<WriteStatus>, Integer>>() {
+        }), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction()))
+        .name("write_process")
+        .uid("write_process_uid")
+        .setParallelism(env.getParallelism())
+
+        // Commit can only be executed once, so run the sink with parallelism 1
+        .addSink(new CommitSink())
+        .name("commit_sink")
+        .uid("commit_sink_uid")
+        .setParallelism(1);
+
+    env.execute(cfg.targetTableName);
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
new file mode 100644
index 000000000..a8f92459a
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.streamer;
+
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.operator.StreamWriteOperatorFactory;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+
+import com.beust.jcommander.JCommander;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.formats.json.TimestampFormat;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Properties;
+
+/**
+ * A utility which can incrementally consume data from Kafka and apply it to the target table.
+ * Currently, it only supports COW tables and the INSERT and UPSERT operations.
+ */
+public class HoodieFlinkStreamerV2 {
+  public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    final FlinkStreamerConfig cfg = new FlinkStreamerConfig();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+    env.enableCheckpointing(cfg.checkpointInterval);
+    env.getConfig().setGlobalJobParameters(cfg);
+    // We use checkpoints to trigger the write operations, including instant generation and committing;
+    // there can only be one checkpoint at a time.
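+    // As in HoodieFlinkStreamer, a single inflight instant is tracked per
+    // checkpoint, hence the concurrency cap of 1 below.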
+    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    if (cfg.flinkCheckPointPath != null) {
+      env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
+    }
+
+    Properties kafkaProps = StreamerUtil.appendKafkaProps(cfg);
+
+    // Read from kafka source
+    RowType rowType =
+        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
+            .getLogicalType();
+    Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
+    int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
+    StreamWriteOperatorFactory<RowData> operatorFactory =
+        new StreamWriteOperatorFactory<>(rowType, conf, numWriteTask);
+
+    int partitionFieldIndex = rowType.getFieldIndex(conf.getString(FlinkOptions.PARTITION_PATH_FIELD));
+    LogicalType partitionFieldType = rowType.getTypeAt(partitionFieldIndex);
+    final RowData.FieldGetter partitionFieldGetter =
+        RowData.createFieldGetter(partitionFieldType, partitionFieldIndex);
+
+    DataStream dataStream = env.addSource(new FlinkKafkaConsumer<>(
+        cfg.kafkaTopic,
+        new JsonRowDataDeserializationSchema(
+            rowType,
+            new RowDataTypeInfo(rowType),
+            false,
+            true,
+            TimestampFormat.ISO_8601
+        ), kafkaProps))
+        .name("kafka_source")
+        .uid("uid_kafka_source")
+        // Key by partition path, to avoid multiple subtasks writing to a partition at the same time
+        .keyBy(partitionFieldGetter::getFieldOrNull)
+        .transform("hoodie_stream_write", null, operatorFactory)
+        .uid("uid_hoodie_stream_write")
+        .setParallelism(numWriteTask);
+
+    env.addOperator(dataStream.getTransformation());
+
+    env.execute(cfg.targetTableName);
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/OperationConverter.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/OperationConverter.java
new file mode 100644
index 000000000..9baaf0a80
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/OperationConverter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.streamer;
+
+import org.apache.hudi.common.model.WriteOperationType;
+
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.ParameterException;
+
+/**
+ * Converter that converts a string into enum WriteOperationType.
+ */ +public class OperationConverter implements IStringConverter { + @Override + public WriteOperationType convert(String value) throws ParameterException { + return WriteOperationType.valueOf(value); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java new file mode 100644 index 000000000..21664f3d7 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.AtomicDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.TypeInformationRawType; + +import java.util.List; + +/** + * Converts an Avro schema into Flink's type information. It uses {@link org.apache.flink.api.java.typeutils.RowTypeInfo} for + * representing objects and converts Avro types into types that are compatible with Flink's Table & + * SQL API. + * + *

Note: Changes in this class need to be kept in sync with the corresponding runtime classes + * {@link org.apache.flink.formats.avro.AvroRowDeserializationSchema} and {@link org.apache.flink.formats.avro.AvroRowSerializationSchema}. + * + *
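+ * For example (illustrative): an Avro record with a required {@code long} field {@code id}
+ * and a nullable {@code string} field {@code name} converts to
+ * {@code ROW<id BIGINT NOT NULL, name STRING> NOT NULL}.
+ *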

NOTE: referenced from Flink release 1.12.0; should be removed once the Flink version is upgraded to that release.
+ */
+public class AvroSchemaConverter {
+
+  /**
+   * Converts an Avro schema {@code schema} into a nested row structure with deterministic field order and
+   * data types that are compatible with Flink's Table & SQL API.
+   *
+   * @param schema Avro schema definition
+   * @return data type matching the schema
+   */
+  public static DataType convertToDataType(Schema schema) {
+    switch (schema.getType()) {
+      case RECORD:
+        final List<Schema.Field> schemaFields = schema.getFields();
+
+        final DataTypes.Field[] fields = new DataTypes.Field[schemaFields.size()];
+        for (int i = 0; i < schemaFields.size(); i++) {
+          final Schema.Field field = schemaFields.get(i);
+          fields[i] = DataTypes.FIELD(field.name(), convertToDataType(field.schema()));
+        }
+        return DataTypes.ROW(fields).notNull();
+      case ENUM:
+        return DataTypes.STRING().notNull();
+      case ARRAY:
+        return DataTypes.ARRAY(convertToDataType(schema.getElementType())).notNull();
+      case MAP:
+        return DataTypes.MAP(
+            DataTypes.STRING().notNull(),
+            convertToDataType(schema.getValueType()))
+            .notNull();
+      case UNION:
+        final Schema actualSchema;
+        final boolean nullable;
+        if (schema.getTypes().size() == 2
+            && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
+          actualSchema = schema.getTypes().get(1);
+          nullable = true;
+        } else if (schema.getTypes().size() == 2
+            && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
+          actualSchema = schema.getTypes().get(0);
+          nullable = true;
+        } else if (schema.getTypes().size() == 1) {
+          actualSchema = schema.getTypes().get(0);
+          nullable = false;
+        } else {
+          // use Kryo for serialization
+          return new AtomicDataType(
+              new TypeInformationRawType<>(false, Types.GENERIC(Object.class)));
+        }
+        DataType converted = convertToDataType(actualSchema);
+        return nullable ?
converted.nullable() : converted; + case FIXED: + // logical decimal type + if (schema.getLogicalType() instanceof LogicalTypes.Decimal) { + final LogicalTypes.Decimal decimalType = + (LogicalTypes.Decimal) schema.getLogicalType(); + return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale()) + .notNull(); + } + // convert fixed size binary data to primitive byte arrays + return DataTypes.VARBINARY(schema.getFixedSize()).notNull(); + case STRING: + // convert Avro's Utf8/CharSequence to String + return DataTypes.STRING().notNull(); + case BYTES: + // logical decimal type + if (schema.getLogicalType() instanceof LogicalTypes.Decimal) { + final LogicalTypes.Decimal decimalType = + (LogicalTypes.Decimal) schema.getLogicalType(); + return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale()) + .notNull(); + } + return DataTypes.BYTES().notNull(); + case INT: + // logical date and time type + final org.apache.avro.LogicalType logicalType = schema.getLogicalType(); + if (logicalType == LogicalTypes.date()) { + return DataTypes.DATE().notNull(); + } else if (logicalType == LogicalTypes.timeMillis()) { + return DataTypes.TIME(3).notNull(); + } + return DataTypes.INT().notNull(); + case LONG: + // logical timestamp type + if (schema.getLogicalType() == LogicalTypes.timestampMillis()) { + return DataTypes.TIMESTAMP(3).notNull(); + } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) { + return DataTypes.TIMESTAMP(6).notNull(); + } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) { + return DataTypes.TIME(3).notNull(); + } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) { + return DataTypes.TIME(6).notNull(); + } + return DataTypes.BIGINT().notNull(); + case FLOAT: + return DataTypes.FLOAT().notNull(); + case DOUBLE: + return DataTypes.DOUBLE().notNull(); + case BOOLEAN: + return DataTypes.BOOLEAN().notNull(); + case NULL: + return DataTypes.NULL(); + default: + throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'."); + } + } +} + diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java b/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java new file mode 100644 index 000000000..d5ff66388 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.extractValueTypeToAvroMap; + +/** + * Tool class used to convert from {@link RowData} to Avro {@link GenericRecord}. + * + *

NOTE: referenced from Flink release 1.12.0; should be removed once the Flink version is upgraded to that release.
+ */
+@Internal
+public class RowDataToAvroConverters {
+
+  // --------------------------------------------------------------------------------
+  // Runtime Converters
+  // --------------------------------------------------------------------------------
+
+  /**
+   * Runtime converter that converts objects of Flink Table & SQL internal data structures to
+   * corresponding Avro data structures.
+   */
+  @FunctionalInterface
+  public interface RowDataToAvroConverter extends Serializable {
+    Object convert(Schema schema, Object object);
+  }
+
+  // --------------------------------------------------------------------------------
+  // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
+  // necessary because the maven shade plugin cannot relocate classes in
+  // SerializedLambdas (MSHADE-260). On the other hand we want to relocate Avro for
+  // sql-client uber jars.
+  // --------------------------------------------------------------------------------
+
+  /**
+   * Creates a runtime converter according to the given logical type that converts objects of
+   * Flink Table & SQL internal data structures to corresponding Avro data structures.
+   */
+  public static RowDataToAvroConverter createConverter(LogicalType type) {
+    final RowDataToAvroConverter converter;
+    switch (type.getTypeRoot()) {
+      case NULL:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return null;
+              }
+            };
+        break;
+      case TINYINT:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return ((Byte) object).intValue();
+              }
+            };
+        break;
+      case SMALLINT:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return ((Short) object).intValue();
+              }
+            };
+        break;
+      case BOOLEAN: // boolean
+      case INTEGER: // int
+      case INTERVAL_YEAR_MONTH: // long
+      case BIGINT: // long
+      case INTERVAL_DAY_TIME: // long
+      case FLOAT: // float
+      case DOUBLE: // double
+      case TIME_WITHOUT_TIME_ZONE: // int
+      case DATE: // int
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return object;
+              }
+            };
+        break;
+      case CHAR:
+      case VARCHAR:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return new Utf8(object.toString());
+              }
+            };
+        break;
+      case BINARY:
+      case VARBINARY:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return ByteBuffer.wrap((byte[]) object);
+              }
+            };
+        break;
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return ((TimestampData) object).toInstant().toEpochMilli();
+              }
+            };
+        break;
+      case DECIMAL:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return
ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes()); + } + }; + break; + case ARRAY: + converter = createArrayConverter((ArrayType) type); + break; + case ROW: + converter = createRowConverter((RowType) type); + break; + case MAP: + case MULTISET: + converter = createMapConverter(type); + break; + case RAW: + default: + throw new UnsupportedOperationException("Unsupported type: " + type); + } + + // wrap into nullable converter + return new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + if (object == null) { + return null; + } + + // get actual schema if it is a nullable schema + Schema actualSchema; + if (schema.getType() == Schema.Type.UNION) { + List types = schema.getTypes(); + int size = types.size(); + if (size == 2 && types.get(1).getType() == Schema.Type.NULL) { + actualSchema = types.get(0); + } else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) { + actualSchema = types.get(1); + } else { + throw new IllegalArgumentException( + "The Avro schema is not a nullable type: " + schema.toString()); + } + } else { + actualSchema = schema; + } + return converter.convert(actualSchema, object); + } + }; + } + + private static RowDataToAvroConverter createRowConverter(RowType rowType) { + final RowDataToAvroConverter[] fieldConverters = + rowType.getChildren().stream() + .map(RowDataToAvroConverters::createConverter) + .toArray(RowDataToAvroConverter[]::new); + final LogicalType[] fieldTypes = + rowType.getFields().stream() + .map(RowType.RowField::getType) + .toArray(LogicalType[]::new); + final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.length]; + for (int i = 0; i < fieldTypes.length; i++) { + fieldGetters[i] = RowData.createFieldGetter(fieldTypes[i], i); + } + final int length = rowType.getFieldCount(); + + return new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + final RowData row = (RowData) object; + final List fields = schema.getFields(); + final GenericRecord record = new GenericData.Record(schema); + for (int i = 0; i < length; ++i) { + final Schema.Field schemaField = fields.get(i); + Object avroObject = + fieldConverters[i].convert( + schemaField.schema(), fieldGetters[i].getFieldOrNull(row)); + record.put(i, avroObject); + } + return record; + } + }; + } + + private static RowDataToAvroConverter createArrayConverter(ArrayType arrayType) { + LogicalType elementType = arrayType.getElementType(); + final ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType); + final RowDataToAvroConverter elementConverter = createConverter(arrayType.getElementType()); + + return new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + final Schema elementSchema = schema.getElementType(); + ArrayData arrayData = (ArrayData) object; + List list = new ArrayList<>(); + for (int i = 0; i < arrayData.size(); ++i) { + list.add( + elementConverter.convert( + elementSchema, elementGetter.getElementOrNull(arrayData, i))); + } + return list; + } + }; + } + + private static RowDataToAvroConverter createMapConverter(LogicalType type) { + LogicalType valueType = extractValueTypeToAvroMap(type); + final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType); + final RowDataToAvroConverter valueConverter = 
createConverter(valueType); + + return new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + final Schema valueSchema = schema.getValueType(); + final MapData mapData = (MapData) object; + final ArrayData keyArray = mapData.keyArray(); + final ArrayData valueArray = mapData.valueArray(); + final Map map = new HashMap<>(mapData.size()); + for (int i = 0; i < mapData.size(); ++i) { + final String key = keyArray.getString(i).toString(); + final Object value = + valueConverter.convert( + valueSchema, valueGetter.getElementOrNull(valueArray, i)); + map.put(key, value); + } + return map; + } + }; + } +} + diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 71de651b2..9460ee89e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -18,47 +18,71 @@ package org.apache.hudi.util; -import org.apache.hudi.HoodieFlinkStreamer; +import org.apache.hudi.keygen.SimpleAvroKeyGenerator; +import org.apache.hudi.streamer.FlinkStreamerConfig; import org.apache.hudi.common.config.DFSPropertiesConfiguration; import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.config.HoodieCompactionConfig; -import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.keygen.KeyGenerator; -import org.apache.hudi.keygen.SimpleAvroKeyGenerator; +import org.apache.hudi.operator.FlinkOptions; import org.apache.hudi.schema.FilebasedSchemaProvider; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; +import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Preconditions; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; +import java.io.File; import java.io.IOException; import java.io.StringReader; import java.util.List; +import java.util.Properties; +/** + * Utilities for Flink stream read and write. 
+ */ public class StreamerUtil { - private static Logger LOG = LoggerFactory.getLogger(StreamerUtil.class); + private static final Logger LOG = LoggerFactory.getLogger(StreamerUtil.class); - public static TypedProperties getProps(HoodieFlinkStreamer.Config cfg) { + public static TypedProperties appendKafkaProps(FlinkStreamerConfig config) { + TypedProperties properties = getProps(config); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.kafkaGroupId); + return properties; + } + + public static TypedProperties getProps(FlinkStreamerConfig cfg) { return readConfig( FSUtils.getFs(cfg.propsFilePath, getHadoopConf()), new Path(cfg.propsFilePath), cfg.configs).getConfig(); } + public static Schema getSourceSchema(FlinkStreamerConfig cfg) { + return new FilebasedSchemaProvider(FlinkOptions.fromStreamerConfig(cfg)).getSourceSchema(); + } + + public static Schema getSourceSchema(org.apache.flink.configuration.Configuration conf) { + return new FilebasedSchemaProvider(conf).getSourceSchema(); + } /** - * Read conig from files. + * Read config from properties file (`--props` option) and cmd line (`--hoodie-conf` option). */ public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List overriddenProps) { DFSPropertiesConfiguration conf; @@ -81,16 +105,50 @@ public class StreamerUtil { return conf; } - public static Configuration getHadoopConf() { - return new Configuration(); + public static org.apache.hadoop.conf.Configuration getHadoopConf() { + // create hadoop configuration with hadoop conf directory configured. + org.apache.hadoop.conf.Configuration hadoopConf = null; + for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(new Configuration())) { + hadoopConf = getHadoopConfiguration(possibleHadoopConfPath); + if (hadoopConf != null) { + break; + } + } + if (hadoopConf == null) { + hadoopConf = new org.apache.hadoop.conf.Configuration(); + } + return hadoopConf; } - public static void checkRequiredProperties(TypedProperties props, List checkPropNames) { - checkPropNames.forEach(prop -> { - if (!props.containsKey(prop)) { - throw new HoodieNotSupportedException("Required property " + prop + " is missing"); + /** + * Returns a new Hadoop Configuration object using the path to the hadoop conf configured. + * + * @param hadoopConfDir Hadoop conf directory path. + * @return A Hadoop configuration instance. + */ + private static org.apache.hadoop.conf.Configuration getHadoopConfiguration(String hadoopConfDir) { + if (new File(hadoopConfDir).exists()) { + org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration(); + File coreSite = new File(hadoopConfDir, "core-site.xml"); + if (coreSite.exists()) { + hadoopConfiguration.addResource(new Path(coreSite.getAbsolutePath())); } - }); + File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml"); + if (hdfsSite.exists()) { + hadoopConfiguration.addResource(new Path(hdfsSite.getAbsolutePath())); + } + File yarnSite = new File(hadoopConfDir, "yarn-site.xml"); + if (yarnSite.exists()) { + hadoopConfiguration.addResource(new Path(yarnSite.getAbsolutePath())); + } + // Add mapred-site.xml. We need to read configurations like compression codec. 
+ File mapredSite = new File(hadoopConfDir, "mapred-site.xml"); + if (mapredSite.exists()) { + hadoopConfiguration.addResource(new Path(mapredSite.getAbsolutePath())); + } + return hadoopConfiguration; + } + return null; } /** @@ -109,6 +167,21 @@ public class StreamerUtil { } } + /** + * Create a key generator class via reflection, passing in any configs needed. + *

+ * If the key generator class name is configured through the Flink configuration, use the
+ * corresponding key generator class; otherwise, use the default key generator class
+ * specified in {@link FlinkOptions}.
+ */
+  public static KeyGenerator createKeyGenerator(Configuration conf) throws IOException {
+    String keyGeneratorClass = conf.getString(FlinkOptions.KEYGEN_CLASS);
+    try {
+      return (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, flinkConf2TypedProperties(conf));
+    } catch (Throwable e) {
+      throw new IOException("Could not load key generator class " + keyGeneratorClass, e);
+    }
+  }
+
   /**
    * Create a payload class via reflection, passing in an ordering/precombine value.
    */
@@ -122,21 +195,59 @@
   }
 
-  public static HoodieWriteConfig getHoodieClientConfig(HoodieFlinkStreamer.Config cfg) {
-    FileSystem fs = FSUtils.getFs(cfg.targetBasePath, getHadoopConf());
-    HoodieWriteConfig.Builder builder =
-        HoodieWriteConfig.newBuilder().withEngineType(EngineType.FLINK).withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true)
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
-            .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField)
-                .build())
-            .forTable(cfg.targetTableName)
-            .withAutoCommit(false)
-            .withProps(readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
-                .getConfig());
-
-    builder = builder.withSchema(new FilebasedSchemaProvider(getProps(cfg)).getTargetSchema().toString());
-    HoodieWriteConfig config = builder.build();
-    return config;
+  /**
+   * Create a payload class via reflection, without passing in an ordering/precombine value.
+   */
+  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record)
+      throws IOException {
+    try {
+      return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
+          new Class[] {Option.class}, Option.of(record));
+    } catch (Throwable e) {
+      throw new IOException("Could not create payload for class: " + payloadClass, e);
+    }
+  }
 
+  public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
+    HoodieWriteConfig.Builder builder =
+        HoodieWriteConfig.newBuilder()
+            .withEngineType(EngineType.FLINK)
+            .withPath(conf.getString(FlinkOptions.PATH))
+            .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
+            .withCompactionConfig(
+                HoodieCompactionConfig.newBuilder()
+                    .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
+                    .build())
+            .forTable(conf.getString(FlinkOptions.TABLE_NAME))
+            .withAutoCommit(false)
+            .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
+
+    builder = builder.withSchema(getSourceSchema(conf).toString());
+    return builder.build();
+  }
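+
+  // Note: any option not explicitly set on the Flink configuration is backfilled
+  // with its default from FlinkOptions.OPTIONAL_OPTIONS (see flinkConf2TypedProperties
+  // below), so the resulting write config is always fully populated.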
+
+  /**
+   * Converts the given {@link Configuration} to {@link TypedProperties}.
+   * The default values are also set up.
+   *
+   * @param conf The flink configuration
+   * @return a TypedProperties instance
+   */
+  public static TypedProperties flinkConf2TypedProperties(Configuration conf) {
+    Properties properties = new Properties();
+    // put all the set up options
+    conf.addAllToProperties(properties);
+    // put all the default options
+    for (ConfigOption<?> option : FlinkOptions.OPTIONAL_OPTIONS) {
+      if (!conf.contains(option)) {
+        properties.put(option.key(), option.defaultValue());
+      }
+    }
+    return new TypedProperties(properties);
+  }
+
+  public static void checkRequiredProperties(TypedProperties props, List<String> checkPropNames) {
+    checkPropNames.forEach(prop ->
+        Preconditions.checkState(props.containsKey(prop), "Required property " + prop + " is missing"));
+  }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java
new file mode 100644
index 000000000..c2d7a65f2
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
+import org.apache.hudi.operator.utils.StreamWriteFunctionWrapper;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.operator.utils.TestData;
+
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.table.data.RowData;
+import org.hamcrest.MatcherAssert;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.operator.utils.TestData.checkWrittenData;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link StreamWriteFunction}.
+ */
+public class StreamWriteFunctionTest {
+
+  private static final Map<String, String> EXPECTED = new HashMap<>();
+
+  static {
+    EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]");
+    EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]");
+    EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3]");
+    EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
+  }
+
+  private static final Map<String, String> EXPECTED2 = new HashMap<>();
+
+  static {
+    EXPECTED2.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
+    EXPECTED2.put("par2", "[id3,par2,id3,Julian,54,3,par2, id4,par2,id4,Fabian,32,4,par2]");
+    EXPECTED2.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3, "
+        + "id9,par3,id9,Jane,19,6,par3]");
+    EXPECTED2.put("par4", "[id10,par4,id10,Ella,38,7,par4, id11,par4,id11,Phoebe,52,8,par4, "
+        + "id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
+  }
+
+  private StreamWriteFunctionWrapper<RowData> funcWrapper;
+
+  @TempDir
+  File tempFile;
+
+  @BeforeEach
+  public void before() throws Exception {
+    this.funcWrapper = new StreamWriteFunctionWrapper<>(
+        tempFile.getAbsolutePath(),
+        TestConfigurations.SERIALIZER);
+  }
+
+  @AfterEach
+  public void after() throws Exception {
+    funcWrapper.close();
+  }
+
+  @Test
+  public void testCheckpoint() throws Exception {
+    // open the function and ingest data
+    funcWrapper.openFunction();
+    for (RowData rowData : TestData.DATA_SET_ONE) {
+      funcWrapper.invoke(rowData);
+    }
+
+    // no checkpoint yet, so the coordinator has not accepted any events
+    assertTrue(
+        funcWrapper.getEventBuffer().length == 1
+            && funcWrapper.getEventBuffer()[0] == null, "The coordinator event buffer is expected to be empty");
+
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+
+    String instant = funcWrapper.getWriteClient()
+        .getInflightAndRequestedInstant("COPY_ON_WRITE");
+
+    final OperatorEvent nextEvent = funcWrapper.getNextEvent();
+    MatcherAssert.assertThat("The operator is expected to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+    List<WriteStatus> writeStatuses = ((BatchWriteSuccessEvent) nextEvent).getWriteStatuses();
+    assertNotNull(writeStatuses);
+    MatcherAssert.assertThat(writeStatuses.size(), is(4)); // write 4 partition files
+    assertThat(writeStatuses.stream()
+            .map(WriteStatus::getPartitionPath).sorted(Comparator.naturalOrder())
+            .collect(Collectors.joining(",")),
+        is("par1,par2,par3,par4"));
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
+    funcWrapper.checkpointComplete(1);
+    // the coordinator checkpoint commits the inflight instant.
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+
+    // checkpoint for the next round: there is no data input, so after the checkpoint
+    // there should be no REQUESTED instant
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(2);
+
+    String instant2 = funcWrapper.getWriteClient()
+        .getInflightAndRequestedInstant("COPY_ON_WRITE");
+    assertNotEquals(instant, instant2);
+
+    final OperatorEvent nextEvent2 = funcWrapper.getNextEvent();
+    assertThat("The operator is expected to send an event", nextEvent2, instanceOf(BatchWriteSuccessEvent.class));
+    List<WriteStatus> writeStatuses2 = ((BatchWriteSuccessEvent) nextEvent2).getWriteStatuses();
+    assertNotNull(writeStatuses2);
+    assertThat(writeStatuses2.size(), is(0)); // write empty statuses
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent2);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    funcWrapper.checkpointComplete(2);
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, null);
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+  }
+
+  @Test
+  public void testCheckpointFails() throws Exception {
+    // open the function and ingest data
+    funcWrapper.openFunction();
+    // no data is written and the checkpoint fails,
+    // then we should revert the started instant
+
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+
+    String instant = funcWrapper.getWriteClient()
+        .getInflightAndRequestedInstant("COPY_ON_WRITE");
+    assertNotNull(instant);
+
+    final OperatorEvent nextEvent = funcWrapper.getNextEvent();
+    assertThat("The operator is expected to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+    List<WriteStatus> writeStatuses = ((BatchWriteSuccessEvent) nextEvent).getWriteStatuses();
+    assertNotNull(writeStatuses);
+    assertThat(writeStatuses.size(), is(0)); // no data written
+
+    // fail the checkpoint
+    assertThrows(HoodieException.class,
+        () -> funcWrapper.checkpointFails(1),
+        "The last checkpoint was aborted, roll back the last write and throw");
+
+    // the instant metadata should be cleared
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, null);
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null);
+
+    for (RowData rowData : TestData.DATA_SET_ONE) {
+      funcWrapper.invoke(rowData);
+    }
+
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(2);
+    // Do not send the write event and fail the checkpoint
+    assertThrows(HoodieException.class,
+        () -> funcWrapper.checkpointFails(2),
+        "The last checkpoint was aborted, roll back the last write and throw");
+  }
+
+  @Test
+  public void testInsert() throws Exception {
+    // open the function and ingest data
+    funcWrapper.openFunction();
+    for (RowData rowData : TestData.DATA_SET_ONE) {
+      funcWrapper.invoke(rowData);
+    }
+
+    assertEmptyDataFiles();
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+
+    String instant = funcWrapper.getWriteClient()
+        .getInflightAndRequestedInstant("COPY_ON_WRITE");
+
+    final OperatorEvent nextEvent = funcWrapper.getNextEvent();
+    assertThat("The operator is expected to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+    checkWrittenData(tempFile, EXPECTED);
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
+    funcWrapper.checkpointComplete(1);
+    // the coordinator checkpoint commits the inflight instant.
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+    checkWrittenData(tempFile, EXPECTED);
+  }
+
+  @Test
+  public void testUpsert() throws Exception {
+    // open the function and ingest data
+    funcWrapper.openFunction();
+    for (RowData rowData : TestData.DATA_SET_ONE) {
+      funcWrapper.invoke(rowData);
+    }
+
+    assertEmptyDataFiles();
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+
+    OperatorEvent nextEvent = funcWrapper.getNextEvent();
+    assertThat("The operator is expected to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    funcWrapper.checkpointComplete(1);
+
+    // upsert another data buffer
+    for (RowData rowData : TestData.DATA_SET_TWO) {
+      funcWrapper.invoke(rowData);
+    }
+    // the data is not flushed yet
+    checkWrittenData(tempFile, EXPECTED);
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(2);
+
+    String instant = funcWrapper.getWriteClient()
+        .getInflightAndRequestedInstant("COPY_ON_WRITE");
+
+    nextEvent = funcWrapper.getNextEvent();
+    assertThat("The operator is expected to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+    checkWrittenData(tempFile, EXPECTED2);
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
+    funcWrapper.checkpointComplete(2);
+    // the coordinator checkpoint commits the inflight instant.
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+    checkWrittenData(tempFile, EXPECTED2);
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  @SuppressWarnings("rawtypes")
+  private void checkInstantState(
+      HoodieFlinkWriteClient writeClient,
+      HoodieInstant.State state,
+      String instantStr) {
+    final String instant;
+    switch (state) {
+      case REQUESTED:
+        instant = writeClient.getInflightAndRequestedInstant("COPY_ON_WRITE");
+        break;
+      case COMPLETED:
+        instant = writeClient.getLastCompletedInstant("COPY_ON_WRITE");
+        break;
+      default:
+        throw new AssertionError("Unexpected state");
+    }
+    assertThat(instant, is(instantStr));
+  }
+
+  /**
+   * Asserts that the data files are empty.
+   */
+  private void assertEmptyDataFiles() {
+    File[] dataFiles = tempFile.listFiles(file -> !file.getName().startsWith("."));
+    assertNotNull(dataFiles);
+    assertThat(dataFiles.length, is(0));
+  }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
new file mode 100644
index 000000000..56f946b96
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator; + +import org.apache.hudi.operator.utils.TestConfigurations; +import org.apache.hudi.operator.utils.TestData; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.io.FilePathFilter; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.io.TextInputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; +import org.apache.flink.formats.json.TimestampFormat; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.FileProcessingMode; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.TestLogger; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +/** + * Integration test for Flink Hoodie stream sink. 
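+ * + * <p>The test reads the bounded file test_source.data in PROCESS_CONTINUOUSLY mode so that + * checkpoints keep firing and committing instants, lets the pipeline run for a while, then + * cancels the job and verifies the written Parquet files.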
+ */ +public class StreamWriteITCase extends TestLogger { + + private static final Map<String, String> EXPECTED = new HashMap<>(); + + static { + EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]"); + EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]"); + EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]"); + EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]"); + } + + @TempDir + File tempFile; + + @Test + public void testWriteToHoodie() throws Exception { + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + execEnv.getConfig().disableObjectReuse(); + execEnv.setParallelism(4); + // trigger a checkpoint every 2 seconds + execEnv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); + + // Read from file source + RowType rowType = + (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)) + .getLogicalType(); + StreamWriteOperatorFactory<RowData> operatorFactory = + new StreamWriteOperatorFactory<>(rowType, conf, 4); + + int partitionFieldIndex = rowType.getFieldIndex(conf.getString(FlinkOptions.PARTITION_PATH_FIELD)); + final RowData.FieldGetter partitionFieldGetter = + RowData.createFieldGetter(rowType.getTypeAt(partitionFieldIndex), partitionFieldIndex); + + JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( + rowType, + new RowDataTypeInfo(rowType), + false, + true, + TimestampFormat.ISO_8601 + ); + String sourcePath = Objects.requireNonNull(Thread.currentThread() + .getContextClassLoader().getResource("test_source.data")).toString(); + + TextInputFormat format = new TextInputFormat(new Path(sourcePath)); + format.setFilesFilter(FilePathFilter.createDefaultFilter()); + TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO; + format.setCharsetName("UTF-8"); + + DataStream<RowData> dataStream = execEnv + // use PROCESS_CONTINUOUSLY mode to trigger checkpoint + .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo) + .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) + // Key by partition path, to avoid multiple subtasks writing to a partition at the same time + .keyBy(partitionFieldGetter::getFieldOrNull) + .transform("hoodie_stream_write", null, operatorFactory) + .uid("uid_hoodie_stream_write") + .setParallelism(4); + execEnv.addOperator(dataStream.getTransformation()); + + JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME))); + if (client.getJobStatus().get() != JobStatus.FAILED) { + try { + TimeUnit.SECONDS.sleep(10); + client.cancel(); + } catch (Throwable throwable) { + // ignored + } + } + + TestData.checkWrittenData(tempFile, EXPECTED); + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java new file mode 100644 index 000000000..c533b48e5 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.operator.event.BatchWriteSuccessEvent; +import org.apache.hudi.operator.utils.TestConfigurations; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test cases for {@link StreamWriteOperatorCoordinator}. + */ +public class StreamWriteOperatorCoordinatorTest { + private StreamWriteOperatorCoordinator coordinator; + + @TempDir + File tempFile; + + @BeforeEach + public void before() throws Exception { + coordinator = new StreamWriteOperatorCoordinator( + TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), 2); + coordinator.start(); + } + + @AfterEach + public void after() { + coordinator.close(); + } + + @Test + public void testTableInitialized() throws IOException { + final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(); + String basePath = tempFile.getAbsolutePath(); + try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) { + assertTrue(fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))); + } + } + + @Test + public void testCheckpointAndRestore() throws Exception { + CompletableFuture<byte[]> future = new CompletableFuture<>(); + coordinator.checkpointCoordinator(1, future); + coordinator.resetToCheckpoint(future.get()); + } + + @Test + public void testReceiveInvalidEvent() { + CompletableFuture<byte[]> future = new CompletableFuture<>(); + coordinator.checkpointCoordinator(1, future); + OperatorEvent event = new BatchWriteSuccessEvent(0, "abc", Collections.emptyList()); + assertThrows(IllegalStateException.class, + () -> coordinator.handleEventFromOperator(0, event), + "Receive an unexpected event for instant abc from task 0"); + } + + @Test + public void testCheckpointInvalid() { + final CompletableFuture<byte[]> future = new CompletableFuture<>(); + coordinator.checkpointCoordinator(1, future); + String inflightInstant = coordinator.getInFlightInstant(); + OperatorEvent event = new BatchWriteSuccessEvent(0, inflightInstant, Collections.emptyList()); + coordinator.handleEventFromOperator(0, event); + final CompletableFuture<byte[]> future2 = new CompletableFuture<>(); + coordinator.checkpointCoordinator(2, future2); + assertTrue(future2.isCompletedExceptionally()); + } +} diff --git
a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockFunctionInitializationContext.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockFunctionInitializationContext.java new file mode 100644 index 000000000..40e58fe1f --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockFunctionInitializationContext.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator.utils; + +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.runtime.state.FunctionInitializationContext; + +/** + * A {@link FunctionInitializationContext} for testing purpose. + */ +public class MockFunctionInitializationContext implements FunctionInitializationContext { + + private final MockOperatorStateStore operatorStateStore; + + public MockFunctionInitializationContext() { + operatorStateStore = new MockOperatorStateStore(); + } + + @Override + public boolean isRestored() { + throw new UnsupportedOperationException(); + } + + @Override + public MockOperatorStateStore getOperatorStateStore() { + return operatorStateStore; + } + + @Override + public KeyedStateStore getKeyedStateStore() { + return operatorStateStore; + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockMapState.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockMapState.java new file mode 100644 index 000000000..96972986b --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockMapState.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator.utils; + +import org.apache.flink.api.common.state.MapState; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * Mock map state for testing. 
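+ * + * <p>Backed by a plain in-memory HashMap; it only mimics the MapState contract and + * involves no state backend, serialization or checkpointing.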
+ * + * @param <K> Type of state key + * @param <V> Type of state value + */ +public class MockMapState<K, V> implements MapState<K, V> { + private final Map<K, V> backingMap = new HashMap<>(); + + @Override + public V get(K uk) { + return backingMap.get(uk); + } + + @Override + public void put(K uk, V uv) { + backingMap.put(uk, uv); + } + + @Override + public void putAll(Map<K, V> map) { + backingMap.putAll(map); + } + + @Override + public void remove(K uk) { + backingMap.remove(uk); + } + + @Override + public boolean contains(K uk) { + return backingMap.containsKey(uk); + } + + @Override + public Iterable<Map.Entry<K, V>> entries() { + return backingMap.entrySet(); + } + + @Override + public Iterable<K> keys() { + return backingMap.keySet(); + } + + @Override + public Iterable<V> values() { + return backingMap.values(); + } + + @Override + public Iterator<Map.Entry<K, V>> iterator() { + return backingMap.entrySet().iterator(); + } + + @Override + public boolean isEmpty() { + return backingMap.isEmpty(); + } + + @Override + public void clear() { + backingMap.clear(); + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockOperatorStateStore.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockOperatorStateStore.java new file mode 100644 index 000000000..016ad5bec --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockOperatorStateStore.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator.utils; + +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.BroadcastState; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * An {@link OperatorStateStore} for testing purpose.
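+ * + * <p>Snapshots the registered list states into historyStateMap on checkpointBegin, marks a + * snapshot as durable on checkpointSuccess, and can restore the last durable snapshot via + * rollBackToLastSuccessCheckpoint, which is how the tests emulate checkpoint rollback.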
+ */ +@SuppressWarnings("rawtypes") +public class MockOperatorStateStore implements KeyedStateStore, OperatorStateStore { + + private final Map<Long, Map<String, TestUtils.MockListState>> historyStateMap; + + private Map<String, TestUtils.MockListState> currentStateMap; + private Map<String, TestUtils.MockListState> lastSuccessStateMap; + + private MapState mapState; + + public MockOperatorStateStore() { + this.historyStateMap = new HashMap<>(); + + this.currentStateMap = new HashMap<>(); + this.lastSuccessStateMap = new HashMap<>(); + + this.mapState = new MockMapState<>(); + } + + @Override + public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception { + return null; + } + + @Override + public <T> ValueState<T> getState(ValueStateDescriptor<T> valueStateDescriptor) { + return null; + } + + @Override + @SuppressWarnings("unchecked") + public <T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor) { + String name = stateDescriptor.getName(); + currentStateMap.putIfAbsent(name, new TestUtils.MockListState()); + return currentStateMap.get(name); + } + + @Override + public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> reducingStateDescriptor) { + return null; + } + + @Override + public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> aggregatingStateDescriptor) { + return null; + } + + @Override + public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> foldingStateDescriptor) { + return null; + } + + @Override + @SuppressWarnings("unchecked") + public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> mapStateDescriptor) { + return this.mapState; + } + + @Override + public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public Set<String> getRegisteredStateNames() { + throw new UnsupportedOperationException(); + } + + @Override + public Set<String> getRegisteredBroadcastStateNames() { + throw new UnsupportedOperationException(); + } + + public void checkpointBegin(long checkpointId) { + Map<String, TestUtils.MockListState> copiedStates = Collections.unmodifiableMap(copyStates(currentStateMap)); + historyStateMap.put(checkpointId, copiedStates); + } + + public void checkpointSuccess(long checkpointId) { + lastSuccessStateMap = historyStateMap.get(checkpointId); + } + + public void rollBackToLastSuccessCheckpoint() { + this.currentStateMap = copyStates(lastSuccessStateMap); + } + + @SuppressWarnings("unchecked") + private Map<String, TestUtils.MockListState> copyStates(Map<String, TestUtils.MockListState> stateMap) { + Map<String, TestUtils.MockListState> copiedStates = new HashMap<>(); + for (Map.Entry<String, TestUtils.MockListState> entry : stateMap.entrySet()) { + TestUtils.MockListState copiedState = new TestUtils.MockListState(); + copiedState.addAll(entry.getValue().getBackingList()); + copiedStates.put(entry.getKey(), copiedState); + } + return copiedStates; + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockStreamingRuntimeContext.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockStreamingRuntimeContext.java new file mode 100644 index 000000000..1db98df86 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockStreamingRuntimeContext.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator.utils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; + +import java.util.HashMap; + +/** + * Mock {@link StreamingRuntimeContext} to use in tests. + * + *
<p>
NOTE: Adapted from Apache Flink, the MockStreamOperator is modified to support MapState. + */ +public class MockStreamingRuntimeContext extends StreamingRuntimeContext { + + private final boolean isCheckpointingEnabled; + + private final int numParallelSubtasks; + private final int subtaskIndex; + + public MockStreamingRuntimeContext( + boolean isCheckpointingEnabled, + int numParallelSubtasks, + int subtaskIndex) { + + this(isCheckpointingEnabled, numParallelSubtasks, subtaskIndex, new MockEnvironmentBuilder() + .setTaskName("mockTask") + .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE) + .build()); + } + + public MockStreamingRuntimeContext( + boolean isCheckpointingEnabled, + int numParallelSubtasks, + int subtaskIndex, + MockEnvironment environment) { + + super(new MockStreamOperator(), environment, new HashMap<>()); + + this.isCheckpointingEnabled = isCheckpointingEnabled; + this.numParallelSubtasks = numParallelSubtasks; + this.subtaskIndex = subtaskIndex; + } + + @Override + public MetricGroup getMetricGroup() { + return new UnregisteredMetricsGroup(); + } + + @Override + public boolean isCheckpointingEnabled() { + return isCheckpointingEnabled; + } + + @Override + public int getIndexOfThisSubtask() { + return subtaskIndex; + } + + @Override + public int getNumberOfParallelSubtasks() { + return numParallelSubtasks; + } + + private static class MockStreamOperator extends AbstractStreamOperator { + private static final long serialVersionUID = -1153976702711944427L; + + private transient TestProcessingTimeService testProcessingTimeService; + + private transient MockOperatorStateStore mockOperatorStateStore; + + @Override + public ExecutionConfig getExecutionConfig() { + return new ExecutionConfig(); + } + + @Override + public OperatorID getOperatorID() { + return new OperatorID(); + } + + @Override + public ProcessingTimeService getProcessingTimeService() { + if (testProcessingTimeService == null) { + testProcessingTimeService = new TestProcessingTimeService(); + } + return testProcessingTimeService; + } + + @Override + public KeyedStateStore getKeyedStateStore() { + if (mockOperatorStateStore == null) { + mockOperatorStateStore = new MockOperatorStateStore(); + } + return mockOperatorStateStore; + } + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java new file mode 100644 index 000000000..1b02791e7 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.operator.utils; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext; +import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway; + +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.operator.StreamWriteFunction; +import org.apache.hudi.operator.StreamWriteOperatorCoordinator; +import org.apache.hudi.operator.event.BatchWriteSuccessEvent; + +import java.util.concurrent.CompletableFuture; + +/** + * A wrapper class to manipulate the {@link StreamWriteFunction} instance for testing. + * + * @param <I> Input type + */ +public class StreamWriteFunctionWrapper<I> { + private final TypeSerializer<I> serializer; + private final Configuration conf; + + private final IOManager ioManager; + private final StreamingRuntimeContext runtimeContext; + private final MockOperatorEventGateway gateway; + private final StreamWriteOperatorCoordinator coordinator; + private final MockFunctionInitializationContext functionInitializationContext; + + private StreamWriteFunction<Object, I, Object> function; + + public StreamWriteFunctionWrapper(String tablePath, TypeSerializer<I> serializer) throws Exception { + this.serializer = serializer; + this.ioManager = new IOManagerAsync(); + MockEnvironment environment = new MockEnvironmentBuilder() + .setTaskName("mockTask") + .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE) + .setIOManager(ioManager) + .build(); + this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment); + this.gateway = new MockOperatorEventGateway(); + this.conf = TestConfigurations.getDefaultConf(tablePath); + // one function + this.coordinator = new StreamWriteOperatorCoordinator(conf, 1); + this.coordinator.start(); + this.functionInitializationContext = new MockFunctionInitializationContext(); + } + + public void openFunction() throws Exception { + function = new StreamWriteFunction<>(TestConfigurations.ROW_TYPE, this.conf); + function.setRuntimeContext(runtimeContext); + function.setOperatorEventGateway(gateway); + function.open(this.conf); + } + + public void invoke(I record) throws Exception { + function.processElement(record, null, null); + } + + public BatchWriteSuccessEvent[] getEventBuffer() { + return this.coordinator.getEventBuffer(); + } + + public OperatorEvent getNextEvent() { + return this.gateway.getNextEvent(); + } + + @SuppressWarnings("rawtypes") + public HoodieFlinkWriteClient getWriteClient() { + return this.function.getWriteClient(); + } + + public void checkpointFunction(long checkpointId) throws Exception { + // checkpoint the coordinator first + this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>()); + function.snapshotState(new MockFunctionSnapshotContext(checkpointId)); + functionInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId); + } + + public void checkpointComplete(long checkpointId) {
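+ // mimic the runtime's checkpoint-complete notification: first mark the operator + // state snapshot as durable, then let the coordinator commit the Hudi instant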
functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId); + coordinator.checkpointComplete(checkpointId); + } + + public void checkpointFails(long checkpointId) { + coordinator.notifyCheckpointAborted(checkpointId); + } + + public void close() throws Exception { + coordinator.close(); + ioManager.close(); + } + + public StreamWriteOperatorCoordinator getCoordinator() { + return coordinator; + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java new file mode 100644 index 000000000..7513feddc --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator.utils; + +import org.apache.hudi.operator.FlinkOptions; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Objects; + +/** + * Configurations for the test. + */ +public class TestConfigurations { + private TestConfigurations() { + } + + public static final RowType ROW_TYPE = (RowType) DataTypes.ROW( + DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)), // record key + DataTypes.FIELD("name", DataTypes.VARCHAR(10)), + DataTypes.FIELD("age", DataTypes.INT()), + DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field + DataTypes.FIELD("partition", DataTypes.VARCHAR(10))) + .notNull() + .getLogicalType(); + + public static final RowDataSerializer SERIALIZER = new RowDataSerializer(new ExecutionConfig(), ROW_TYPE); + + public static Configuration getDefaultConf(String tablePath) { + Configuration conf = new Configuration(); + conf.setString(FlinkOptions.PATH, tablePath); + conf.setString(FlinkOptions.READ_SCHEMA_FILE_PATH, + Objects.requireNonNull(Thread.currentThread() + .getContextClassLoader().getResource("test_read_schema.avsc")).toString()); + conf.setString(FlinkOptions.TABLE_NAME, "TestHoodieTable"); + conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition"); + return conf; + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java new file mode 100644 index 000000000..7c2c31451 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator.utils; + +import org.apache.hudi.common.fs.FSUtils; + +import org.apache.avro.generic.GenericRecord; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.data.writer.BinaryRowWriter; +import org.apache.flink.table.data.writer.BinaryWriter; +import org.apache.flink.table.runtime.types.InternalSerializers; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.Strings; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.hadoop.ParquetReader; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +import static junit.framework.TestCase.assertEquals; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +/** Data set for testing, also some utilities to check the results. 
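+ * + * <p>DATA_SET_ONE is the initial insert batch; DATA_SET_TWO updates part of those keys and + * adds new ones, so replaying the two in order exercises the upsert path.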
*/ +public class TestData { + public static List<RowData> DATA_SET_ONE = Arrays.asList( + binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(1), StringData.fromString("par1")), + binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33, + TimestampData.fromEpochMillis(2), StringData.fromString("par1")), + binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53, + TimestampData.fromEpochMillis(3), StringData.fromString("par2")), + binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31, + TimestampData.fromEpochMillis(4), StringData.fromString("par2")), + binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, + TimestampData.fromEpochMillis(5), StringData.fromString("par3")), + binaryRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20, + TimestampData.fromEpochMillis(6), StringData.fromString("par3")), + binaryRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44, + TimestampData.fromEpochMillis(7), StringData.fromString("par4")), + binaryRow(StringData.fromString("id8"), StringData.fromString("Han"), 56, + TimestampData.fromEpochMillis(8), StringData.fromString("par4")) + ); + + public static List<RowData> DATA_SET_TWO = Arrays.asList( + // advance the age by 1 + binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24, + TimestampData.fromEpochMillis(1), StringData.fromString("par1")), + binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34, + TimestampData.fromEpochMillis(2), StringData.fromString("par1")), + binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54, + TimestampData.fromEpochMillis(3), StringData.fromString("par2")), + binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32, + TimestampData.fromEpochMillis(4), StringData.fromString("par2")), + // same as before + binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, + TimestampData.fromEpochMillis(5), StringData.fromString("par3")), + // new data + binaryRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19, + TimestampData.fromEpochMillis(6), StringData.fromString("par3")), + binaryRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38, + TimestampData.fromEpochMillis(7), StringData.fromString("par4")), + binaryRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52, + TimestampData.fromEpochMillis(8), StringData.fromString("par4")) + ); + + /** + * Checks that the source data TestData.DATA_SET_ONE is written as expected. + * + *
<p>
Note: Replace it with the Flink reader when it is supported. + * + * @param baseFile The file base to check, should be a directory + * @param expected The expected results mapping, the key should be the partition path + */ + public static void checkWrittenData(File baseFile, Map<String, String> expected) throws IOException { + assert baseFile.isDirectory(); + FileFilter filter = file -> !file.getName().startsWith("."); + File[] partitionDirs = baseFile.listFiles(filter); + assertNotNull(partitionDirs); + assertThat(partitionDirs.length, is(4)); + for (File partitionDir : partitionDirs) { + File[] dataFiles = partitionDir.listFiles(file -> file.getName().endsWith(".parquet")); + assertNotNull(dataFiles); + File latestDataFile = Arrays.stream(dataFiles) + .max(Comparator.comparing(f -> FSUtils.getCommitTime(f.getName()))) + .orElse(dataFiles[0]); + ParquetReader<GenericRecord> reader = AvroParquetReader + .<GenericRecord>builder(new Path(latestDataFile.getAbsolutePath())).build(); + List<String> readBuffer = new ArrayList<>(); + GenericRecord nextRecord = reader.read(); + while (nextRecord != null) { + readBuffer.add(filterOutVariables(nextRecord)); + nextRecord = reader.read(); + } + readBuffer.sort(Comparator.naturalOrder()); + assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName()))); + } + } + + /** + * Filters out the variable fields, like the file name. + */ + private static String filterOutVariables(GenericRecord genericRecord) { + List<String> fields = new ArrayList<>(); + fields.add(genericRecord.get("_hoodie_record_key").toString()); + fields.add(genericRecord.get("_hoodie_partition_path").toString()); + fields.add(genericRecord.get("uuid").toString()); + fields.add(genericRecord.get("name").toString()); + fields.add(genericRecord.get("age").toString()); + fields.add(genericRecord.get("ts").toString()); + fields.add(genericRecord.get("partition").toString()); + return Strings.join(fields, ","); + } + + private static BinaryRowData binaryRow(Object...
fields) { + LogicalType[] types = TestConfigurations.ROW_TYPE.getFields().stream().map(RowType.RowField::getType) + .toArray(LogicalType[]::new); + assertEquals( + "Field count inconsistent with type information", + fields.length, + types.length); + BinaryRowData row = new BinaryRowData(fields.length); + BinaryRowWriter writer = new BinaryRowWriter(row); + writer.reset(); + for (int i = 0; i < fields.length; i++) { + Object field = fields[i]; + if (field == null) { + writer.setNullAt(i); + } else { + BinaryWriter.write(writer, i, field, types[i], InternalSerializers.create(types[i])); + } + } + writer.complete(); + return row; + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestJsonStringToHoodieRecordMapFunction.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestJsonStringToHoodieRecordMapFunction.java index 98066e96a..a24f1531d 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/TestJsonStringToHoodieRecordMapFunction.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestJsonStringToHoodieRecordMapFunction.java @@ -70,7 +70,7 @@ public class TestJsonStringToHoodieRecordMapFunction extends HoodieFlinkClientTe props.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, OverwriteWithLatestAvroPayload.class.getName()); props.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, "timestamp"); props.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key"); - props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionPath"); + props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "current_date"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); diff --git a/hudi-flink/src/test/resources/test_read_schema.avsc b/hudi-flink/src/test/resources/test_read_schema.avsc new file mode 100644 index 000000000..0cbb4e3d2 --- /dev/null +++ b/hudi-flink/src/test/resources/test_read_schema.avsc @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +{ + "type" : "record", + "name" : "record", + "fields" : [ { + "name" : "uuid", + "type" : [ "null", "string" ], + "default" : null + }, { + "name" : "name", + "type" : [ "null", "string" ], + "default" : null + }, { + "name" : "age", + "type" : [ "null", "int" ], + "default" : null + }, { + "name" : "ts", + "type" : [ "null", { + "type" : "long", + "logicalType" : "timestamp-millis" + } ], + "default" : null + }, { + "name" : "partition", + "type" : [ "null", "string" ], + "default" : null + } ] +} diff --git a/hudi-flink/src/test/resources/test_source.data b/hudi-flink/src/test/resources/test_source.data new file mode 100644 index 000000000..2f628e29c --- /dev/null +++ b/hudi-flink/src/test/resources/test_source.data @@ -0,0 +1,8 @@ +{"uuid": "id1", "name": "Danny", "age": 23, "ts": "1970-01-01T00:00:01", "partition": "par1"} +{"uuid": "id2", "name": "Stephen", "age": 33, "ts": "1970-01-01T00:00:02", "partition": "par1"} +{"uuid": "id3", "name": "Julian", "age": 53, "ts": "1970-01-01T00:00:03", "partition": "par2"} +{"uuid": "id4", "name": "Fabian", "age": 31, "ts": "1970-01-01T00:00:04", "partition": "par2"} +{"uuid": "id5", "name": "Sophia", "age": 18, "ts": "1970-01-01T00:00:05", "partition": "par3"} +{"uuid": "id6", "name": "Emma", "age": 20, "ts": "1970-01-01T00:00:06", "partition": "par3"} +{"uuid": "id7", "name": "Bob", "age": 44, "ts": "1970-01-01T00:00:07", "partition": "par4"} +{"uuid": "id8", "name": "Han", "age": 56, "ts": "1970-01-01T00:00:08", "partition": "par4"}
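For reference, a minimal sketch (not part of the patch; the class name DecodeSourceLineSketch is ours, and the row type literal is copied from TestConfigurations above) of how one line of test_source.data is decoded into a RowData, mirroring the map() stage of StreamWriteITCase:

import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.types.logical.RowType;

import java.nio.charset.StandardCharsets;

public class DecodeSourceLineSketch {
  public static void main(String[] args) throws Exception {
    // the same five-field schema as TestConfigurations.ROW_TYPE
    RowType rowType = (RowType) DataTypes.ROW(
        DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),
        DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
        DataTypes.FIELD("age", DataTypes.INT()),
        DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
        DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
        .notNull()
        .getLogicalType();
    // same constructor arguments as in StreamWriteITCase#testWriteToHoodie
    JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
        rowType, new RowDataTypeInfo(rowType), false, true, TimestampFormat.ISO_8601);
    String line = "{\"uuid\": \"id1\", \"name\": \"Danny\", \"age\": 23, "
        + "\"ts\": \"1970-01-01T00:00:01\", \"partition\": \"par1\"}";
    RowData row = deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8));
    // prints: id1, 23
    System.out.println(row.getString(0) + ", " + row.getInt(2));
  }
}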