HUDI-644 kafka connect checkpoint provider (#1453)

This commit is contained in:
YanJia-Gary-Li
2020-04-03 18:57:34 -07:00
committed by GitHub
parent deb95ad996
commit 575d87cf7d
3 changed files with 277 additions and 0 deletions

InitialCheckPointProvider.java

@@ -0,0 +1,31 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hudi.utilities.checkpointing;

import org.apache.hudi.exception.HoodieException;

/**
 * Provides the initial checkpoint for delta streamer.
 */
public interface InitialCheckPointProvider {
  /**
   * Get the checkpoint string in a format recognizable by delta streamer.
   */
  String getCheckpoint() throws HoodieException;
}
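
For context, an implementation only has to produce a single checkpoint string. A minimal sketch of an alternative provider follows, assuming the same package; the class name and the fixed checkpoint value are hypothetical, purely for illustration:

package org.apache.hudi.utilities.checkpointing;

import org.apache.hudi.exception.HoodieException;

/**
 * Hypothetical example: a provider that hands delta streamer a fixed,
 * user-supplied checkpoint string instead of deriving one.
 */
public class StaticCheckPointProvider implements InitialCheckPointProvider {
  private final String checkpoint;

  public StaticCheckPointProvider(final String checkpoint) {
    this.checkpoint = checkpoint;
  }

  @Override
  public String getCheckpoint() throws HoodieException {
    // Nothing to compute; return the configured value, e.g. "topic1,0:300,1:200"
    return checkpoint;
  }
}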

KafkaConnectHdfsProvider.java

@@ -0,0 +1,152 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hudi.utilities.checkpointing;

import org.apache.hudi.exception.HoodieException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Generates the initial checkpoint from a Kafka-Connect-HDFS managed dataset.
 * Documentation: https://docs.confluent.io/current/connect/kafka-connect-hdfs/index.html
 */
public class KafkaConnectHdfsProvider implements InitialCheckPointProvider {
  private static final String FILENAME_SEPARATOR = "[\\+\\.]";

  private final Path path;
  private final FileSystem fs;

  public KafkaConnectHdfsProvider(final Path basePath, final FileSystem fileSystem) {
    this.path = basePath;
    this.fs = fileSystem;
  }
  /**
   * PathFilter for Kafka-Connect-HDFS output.
   * Directory format: /partition1=xxx/partition2=xxx
   * File format: topic+partition+lowerOffset+upperOffset.file
   */
  public static class KafkaConnectPathFilter implements PathFilter {
    private static final Pattern DIRECTORY_PATTERN = Pattern.compile(".*=.*");
    // The extension separator dot is escaped so only a literal "." matches
    private static final Pattern PATTERN =
        Pattern.compile("[a-zA-Z0-9\\._\\-]+\\+\\d+\\+\\d+\\+\\d+(\\.\\w+)?");

    @Override
    public boolean accept(final Path path) {
      final String filename = path.getName();
      final Matcher matcher = PATTERN.matcher(filename);
      return matcher.matches();
    }

    public boolean acceptDir(final Path path) {
      final String dirName = path.getName();
      final Matcher matcher = DIRECTORY_PATTERN.matcher(dirName);
      return matcher.matches();
    }
  }
  /**
   * Convert a map from partition to max offset into a checkpoint string,
   * e.g. {@code topic1,0:300,1:200}.
   *
   * @param topic Topic name
   * @param checkpoint Map with partition as key and max offset as value
   * @return Checkpoint string
   */
  private static String buildCheckpointStr(final String topic,
                                           final HashMap<Integer, Integer> checkpoint) {
    final StringBuilder checkpointStr = new StringBuilder();
    checkpointStr.append(topic);
    // Partitions are validated by the caller to be contiguous from 0, so iterate by index
    for (int i = 0; i < checkpoint.size(); ++i) {
      checkpointStr.append(",").append(i).append(":").append(checkpoint.get(i));
    }
    return checkpointStr.toString();
  }
  /**
   * List file statuses recursively.
   *
   * @param curPath Current path
   * @param filter PathFilter
   * @return All file statuses that match the Kafka Connect naming convention
   * @throws IOException if listing the directory fails
   */
  private ArrayList<FileStatus> listAllFileStatus(Path curPath,
                                                  KafkaConnectPathFilter filter) throws IOException {
    ArrayList<FileStatus> allFileStatus = new ArrayList<>();
    FileStatus[] fileStatus = this.fs.listStatus(curPath);
    for (FileStatus status : fileStatus) {
      if (status.isDirectory() && filter.acceptDir(status.getPath())) {
        allFileStatus.addAll(listAllFileStatus(status.getPath(), filter));
      } else if (filter.accept(status.getPath())) {
        allFileStatus.add(status);
      }
    }
    return allFileStatus;
  }
  @Override
  public String getCheckpoint() throws HoodieException {
    final KafkaConnectPathFilter filter = new KafkaConnectPathFilter();
    ArrayList<FileStatus> fileStatus;
    try {
      fileStatus = listAllFileStatus(this.path, filter);
    } catch (IOException e) {
      throw new HoodieException(e.toString());
    }
    if (fileStatus.isEmpty()) {
      throw new HoodieException("No valid Kafka Connect HDFS file found under: " + this.path.getName());
    }
    final String topic = fileStatus.get(0).getPath().getName().split(FILENAME_SEPARATOR)[0];
    int maxPartition = -1;
    final HashMap<Integer, Integer> checkpointMap = new HashMap<>();
    for (final FileStatus status : fileStatus) {
      // Filename format: topic+partition+lowerOffset+upperOffset.file
      final String filename = status.getPath().getName();
      final String[] groups = filename.split(FILENAME_SEPARATOR);
      final int partition = Integer.parseInt(groups[1]);
      final int offsetUpper = Integer.parseInt(groups[3]);
      maxPartition = Math.max(maxPartition, partition);
      // Keep the max upper offset seen for each partition
      if (checkpointMap.containsKey(partition)) {
        checkpointMap.put(partition, Math.max(checkpointMap.get(partition), offsetUpper));
      } else {
        checkpointMap.put(partition, offsetUpper);
      }
    }
    // Partition ids are expected to be contiguous starting from 0
    if (checkpointMap.size() != maxPartition + 1) {
      throw new HoodieException("Missing partition from the file scan. "
          + "Max partition found (starting from 0): " + maxPartition
          + "; total partitions found in " + this.path.getName()
          + ": " + checkpointMap.size()
          + "; total partitions expected: " + (maxPartition + 1));
    }
    return buildCheckpointStr(topic, checkpointMap);
  }
}
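
A minimal usage sketch of the provider above; the HDFS URI and topic directory here are placeholders, not part of this change:

import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
import org.apache.hudi.utilities.checkpointing.KafkaConnectHdfsProvider;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class KafkaConnectHdfsProviderExample {
  public static void main(final String[] args) throws Exception {
    // Placeholder path to a Kafka-Connect-HDFS managed topic directory
    final Path topicPath = new Path("hdfs://namenode:8020/topics/topic1");
    final FileSystem fs = topicPath.getFileSystem(new Configuration());
    final InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(topicPath, fs);
    // Prints e.g. "topic1,0:300,1:200" for the layout exercised in the tests below
    System.out.println(provider.getCheckpoint());
  }
}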

TestKafkaConnectHdfsProvider.java

@@ -0,0 +1,94 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hudi.utilities.checkpointing;

import org.apache.hudi.common.HoodieCommonTestHarness;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.exception.HoodieException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.junit.Before;
import org.junit.Test;

import java.io.File;

import static org.junit.Assert.assertEquals;

public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness {
  private FileSystem fs = null;
  private String topicPath = null;

  @Before
  public void init() {
    // Prepare directories
    initPath();
    final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
    fs = FSUtils.getFs(basePath, hadoopConf);
  }
  @Test
  public void testValidKafkaConnectPath() throws Exception {
    // The standard layout (time-based partitioning) of files managed by Kafka Connect is:
    // topic/year=xxx/month=xxx/day=xxx/topic+partition+lowerOffset+upperOffset.file
    topicPath = basePath + "/topic1";
    new File(topicPath).mkdirs();
    // Create regular Kafka Connect HDFS dirs
    new File(topicPath + "/year=2016/month=05/day=01/").mkdirs();
    new File(topicPath + "/year=2016/month=05/day=02/").mkdirs();
    // Kafka Connect tmp folder
    new File(topicPath + "/TMP").mkdirs();
    // Tmp file that is still being written
    new File(topicPath + "/TMP/" + "topic1+0+301+400.parquet").createNewFile();
    // Regular parquet files
    new File(topicPath + "/year=2016/month=05/day=01/"
        + "topic1+0+100+200.parquet").createNewFile();
    new File(topicPath + "/year=2016/month=05/day=01/"
        + "topic1+1+100+200.parquet").createNewFile();
    new File(topicPath + "/year=2016/month=05/day=02/"
        + "topic1+0+201+300.parquet").createNewFile();
    // Noise parquet files that should be filtered out
    new File(topicPath + "/year=2016/month=05/day=01/"
        + "random_snappy_1.parquet").createNewFile();
    new File(topicPath + "/year=2016/month=05/day=02/"
        + "random_snappy_2.parquet").createNewFile();
    InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(new Path(topicPath), fs);
    assertEquals("topic1,0:300,1:200", provider.getCheckpoint());
  }
  @Test(expected = HoodieException.class)
  public void testMissingPartition() throws Exception {
    topicPath = basePath + "/topic2";
    new File(topicPath).mkdirs();
    // Create regular Kafka Connect HDFS dirs
    new File(topicPath + "/year=2016/month=05/day=01/").mkdirs();
    new File(topicPath + "/year=2016/month=05/day=02/").mkdirs();
    // Parquet files for partitions 0 and 2; partition 1 is missing
    new File(topicPath + "/year=2016/month=05/day=01/"
        + "topic1+0+100+200.parquet").createNewFile();
    new File(topicPath + "/year=2016/month=05/day=01/"
        + "topic1+2+100+200.parquet").createNewFile();
    new File(topicPath + "/year=2016/month=05/day=02/"
        + "topic1+0+201+300.parquet").createNewFile();
    InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(new Path(topicPath), fs);
    provider.getCheckpoint();
  }
}