
[MINOR] Cosmetic changes for flink (#3701)

Danny Chan
2021-09-22 12:18:02 +08:00
committed by GitHub
parent 55df8f61e1
commit e813dae36d
8 changed files with 59 additions and 19 deletions


@@ -67,7 +67,7 @@ import java.util.stream.Collectors;
* <p><h2>The Semantics</h2>
*
* <p>The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator
* starts a new instant on the time line when a checkpoint triggers, the coordinator checkpoints always
* starts a new instant on the timeline when a checkpoint triggers, the coordinator checkpoints always
* start before its operator, so when this function starts a checkpoint, a REQUESTED instant already exists.
*
* <p>The function process thread blocks data buffering after the checkpoint thread finishes flushing the existing data buffer until
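The exactly-once contract described above hinges on buffering data between checkpoints and flushing when a checkpoint triggers. A minimal sketch of that pattern with a plain Flink CheckpointedFunction follows; the class name BufferingSinkFunction and its flush helper are hypothetical stand-ins for illustration, not the actual Hudi write function, which hands the buffered records to the instant started by the operator coordinator instead of printing.

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: buffer records between checkpoints, flush when a checkpoint triggers.
public class BufferingSinkFunction extends RichSinkFunction<String> implements CheckpointedFunction {

  private final List<String> buffer = new ArrayList<>();

  @Override
  public void invoke(String value, Context context) {
    // Records are only buffered here; nothing is committed between checkpoints.
    buffer.add(value);
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) {
    // Checkpoint triggered: flush the buffer so the data is attributed to the
    // instant that was started for this checkpoint.
    flush(context.getCheckpointId());
    buffer.clear();
  }

  @Override
  public void initializeState(FunctionInitializationContext context) {
    // State restore is omitted in this sketch.
  }

  private void flush(long checkpointId) {
    // Placeholder for handing the buffered records to the writer.
    System.out.println("Flushing " + buffer.size() + " records for checkpoint " + checkpointId);
  }
}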


@@ -28,11 +28,8 @@ import java.util.List;
/**
* WriteProfile that always return empty small files.
*
* <p>This write profile is used for cases:
* i). INSERT OVERWRITE and INSERT OVERWRITE TABLE operations,
* the existing small files are ignored because of the 'OVERWRITE' semantics;
* ii). INSERT operation when data file merge is disabled.
*
* <p>This write profile is used for INSERT OVERWRITE and INSERT OVERWRITE TABLE operations,
* the existing small files are ignored because of the 'OVERWRITE' semantics.
*
* <p>Note: assumes the index can always index log files for Flink write.
*/
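The "always return empty small files" behavior amounts to reporting no small-file candidates, so every incoming batch lands in fresh file groups. A tiny hypothetical sketch of that idea; SmallFileProvider and EmptySmallFileProvider are illustrative names only, not Hudi's actual WriteProfile API.

import java.util.Collections;
import java.util.List;

// Illustrative-only interface; the real logic lives in Hudi's write profile hierarchy.
interface SmallFileProvider {
  List<String> smallFiles(String partitionPath);
}

// With 'OVERWRITE' semantics the existing small files are irrelevant, so the
// profile reports none and new data is written into fresh file groups.
class EmptySmallFileProvider implements SmallFileProvider {
  @Override
  public List<String> smallFiles(String partitionPath) {
    return Collections.emptyList();
  }
}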


@@ -41,8 +41,9 @@ import org.apache.flink.table.types.logical.RowType;
import java.util.Properties;
/**
* An Utility which can incrementally consume data from Kafka and apply it to the target table.
* currently, it only supports COW table and insert, upsert operation.
* A utility which can incrementally consume data from Kafka and apply it to the target table.
* It has the similar functionality with SQL data source except that the source is bind to Kafka
* and the format is bind to JSON.
*/
public class HoodieFlinkStreamer {
public static void main(String[] args) throws Exception {
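To picture the "incrementally consume data from Kafka" half of this utility, here is a minimal Flink job that reads JSON strings from a topic. The broker address, group id, and topic name are placeholders, and printing stands in for the real pipeline, which converts the JSON to rows and feeds Hudi's write operators.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaJsonConsumeSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Checkpointing drives the commit cadence of a downstream Hudi writer.
    env.enableCheckpointing(60_000L);

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.setProperty("group.id", "hudi-streamer-demo");      // placeholder group id

    // Placeholder topic; each record is expected to be one JSON row.
    FlinkKafkaConsumer<String> source =
        new FlinkKafkaConsumer<>("input_topic", new SimpleStringSchema(), props);

    env.addSource(source).print();
    env.execute("kafka-json-consume-sketch");
  }
}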


@@ -135,7 +135,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
}
/**
* Setup the config options based on the table definition, for e.g the table name, primary key.
* Sets up the config options based on the table definition, for e.g the table name, primary key.
*
* @param conf The configuration to setup
* @param tableName The table name
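Setting up the config options boils down to copying pieces of the table definition into the Flink Configuration. A small stand-alone sketch; the option key and table name below are illustrative, the real options live in FlinkOptions.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class SetupConfSketch {
  // Illustrative option definition; Hudi declares its own in FlinkOptions.
  private static final ConfigOption<String> TABLE_NAME =
      ConfigOptions.key("hoodie.table.name").stringType().noDefaultValue();

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Infer the option from the table definition, e.g. the table's object name.
    conf.setString(TABLE_NAME, "t_orders");
    System.out.println(conf.getString(TABLE_NAME));
  }
}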


@@ -40,6 +40,7 @@ import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.util.InputFormats;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
@@ -48,7 +49,6 @@ import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -108,9 +108,6 @@ public class HoodieTableSource implements
private static final int NO_LIMIT_CONSTANT = -1;
private static final InputFormat<RowData, ?> EMPTY_INPUT_FORMAT =
new CollectionInputFormat<>(Collections.emptyList(), null);
private final transient org.apache.hadoop.conf.Configuration hadoopConf;
private final transient HoodieTableMetaClient metaClient;
private final long maxCompactionMemoryInBytes;
@@ -340,7 +337,7 @@ public class HoodieTableSource implements
if (inputSplits.size() == 0) {
// When there is no input splits, just return an empty source.
LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead");
return EMPTY_INPUT_FORMAT;
return InputFormats.EMPTY_INPUT_FORMAT;
}
return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
rowDataType, inputSplits, false);
@@ -360,7 +357,7 @@ public class HoodieTableSource implements
if (result.isEmpty()) {
// When there is no input splits, just return an empty source.
LOG.warn("No input splits generate for incremental read, returns empty collection instead");
return new CollectionInputFormat<>(Collections.emptyList(), null);
return InputFormats.EMPTY_INPUT_FORMAT;
}
return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
rowDataType, result.getInputSplits(), false);
@@ -419,7 +416,7 @@ public class HoodieTableSource implements
private InputFormat<RowData, ?> baseFileOnlyInputFormat() {
final Path[] paths = getReadPaths();
if (paths.length == 0) {
return EMPTY_INPUT_FORMAT;
return InputFormats.EMPTY_INPUT_FORMAT;
}
FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
FilePathUtils.toFlinkPaths(paths),


@@ -125,7 +125,7 @@ public class MergeOnReadInputFormat
/**
* Flag saying whether to emit the deletes. In streaming read mode, downstream
* operators need the delete messages to retract the legacy accumulator.
* operators need the DELETE messages to retract the legacy accumulator.
*/
private boolean emitDelete;
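On the wire, emitting the DELETE messages simply means producing rows whose RowKind marks a retraction, which downstream stateful operators apply against their accumulators. A tiny sketch with a made-up one-column schema:

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

public class DeleteRowSketch {
  public static void main(String[] args) {
    // A single-column row carrying a key that downstream operators should retract.
    GenericRowData row = GenericRowData.of(StringData.fromString("key-1"));
    row.setRowKind(RowKind.DELETE);
    System.out.println(row.getRowKind() + " " + row.getString(0));
  }
}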


@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.util;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.table.data.RowData;
import java.util.Collections;
/**
* Utilities for all kinds of {@link org.apache.flink.api.common.io.InputFormat}s.
*/
public class InputFormats {
public static final InputFormat<RowData, ?> EMPTY_INPUT_FORMAT =
new CollectionInputFormat<>(Collections.emptyList(), null);
}


@@ -250,7 +250,7 @@ public class StreamerUtil {
basePath, conf.getString(FlinkOptions.TABLE_NAME));
}
// Do not close the filesystem in order to use the CACHE,
// some of the filesystems release the handles in #close method.
// some filesystems release the handles in #close method.
}
/**
@@ -359,7 +359,7 @@ public class StreamerUtil {
}
/**
* Return the median instant time between the given two instant time.
* Returns the median instant time between the given two instant time.
*/
public static String medianInstantTime(String highVal, String lowVal) {
try {
@@ -399,6 +399,10 @@ public class StreamerUtil {
}
}
/**
* Returns whether the give file is in valid hoodie format.
* For example, filtering out the empty or corrupt files.
*/
public static boolean isValidFile(FileStatus fileStatus) {
final String extension = FSUtils.getFileExtension(fileStatus.getPath().toString());
if (PARQUET.getFileExtension().equals(extension)) {
@@ -416,11 +420,19 @@ public class StreamerUtil {
return fileStatus.getLen() > 0;
}
/**
* Returns whether insert deduplication is allowed with given configuration {@code conf}.
*/
public static boolean allowDuplicateInserts(Configuration conf) {
WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP);
}
/**
* Returns whether there are successful commits on the timeline.
* @param metaClient The meta client
* @return true if there is any successful commit
*/
public static boolean haveSuccessfulCommits(HoodieTableMetaClient metaClient) {
return !metaClient.getCommitsTimeline().filterCompletedInstants().empty();
}
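For the medianInstantTime helper touched above, a self-contained sketch of the idea, assuming the classic second-granularity yyyyMMddHHmmss instant format; the real helper's parsing and rounding may differ, and some releases use millisecond precision.

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class MedianInstantTimeSketch {
  // Assumed instant format; treat this as an illustration only.
  private static final String INSTANT_FORMAT = "yyyyMMddHHmmss";

  public static String medianInstantTime(String highVal, String lowVal) throws ParseException {
    SimpleDateFormat fmt = new SimpleDateFormat(INSTANT_FORMAT);
    long high = fmt.parse(highVal).getTime();
    long low = fmt.parse(lowVal).getTime();
    // Midpoint of the two epoch timestamps, rendered back as an instant time.
    return fmt.format(new Date(low + (high - low) / 2));
  }

  public static void main(String[] args) throws ParseException {
    System.out.println(medianInstantTime("20210922121802", "20210922120000"));
  }
}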