diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java
new file mode 100644
index 000000000..741b05c45
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.checkpointing;
+
+import org.apache.hudi.exception.HoodieException;
+
+/**
+ * Provides the initial checkpoint for Delta Streamer.
+ */
+public interface InitialCheckPointProvider {
+  /**
+   * Get the checkpoint string in a format recognizable by Delta Streamer.
+   */
+  String getCheckpoint() throws HoodieException;
+}
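Since the contract is a single method, other checkpoint sources can be plugged in the same way. A minimal sketch of an alternative implementation; the class name and constructor below are hypothetical, shown only to illustrate the contract:

    package org.apache.hudi.utilities.checkpointing;

    import org.apache.hudi.exception.HoodieException;

    /**
     * Hypothetical provider that hands Delta Streamer a fixed,
     * externally supplied checkpoint string.
     */
    public class FixedCheckPointProvider implements InitialCheckPointProvider {
      private final String checkpoint;

      public FixedCheckPointProvider(final String checkpoint) {
        this.checkpoint = checkpoint;
      }

      @Override
      public String getCheckpoint() throws HoodieException {
        return this.checkpoint;
      }
    }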
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/KafkaConnectHdfsProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/KafkaConnectHdfsProvider.java
new file mode 100644
index 000000000..f464f686e
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/KafkaConnectHdfsProvider.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.checkpointing;
+
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Generate the initial checkpoint from a Kafka-Connect-HDFS managed data set.
+ * Documentation: https://docs.confluent.io/current/connect/kafka-connect-hdfs/index.html
+ */
+public class KafkaConnectHdfsProvider implements InitialCheckPointProvider {
+  private final Path path;
+  private final FileSystem fs;
+
+  // Filenames are tokenized on '+' and '.',
+  // e.g. topic1+0+100+200.parquet -> [topic1, 0, 100, 200, parquet]
+  private static final String FILENAME_SEPARATOR = "[\\+\\.]";
+
+  public KafkaConnectHdfsProvider(final Path basePath, final FileSystem fileSystem) {
+    this.path = basePath;
+    this.fs = fileSystem;
+  }
+
+  /**
+   * PathFilter for Kafka-Connect-HDFS output.
+   * Directory format: /partition1=xxx/partition2=xxx
+   * File format: topic+partition+lowerOffset+upperOffset.file
+   */
+  public static class KafkaConnectPathFilter implements PathFilter {
+    private static final Pattern DIRECTORY_PATTERN = Pattern.compile(".*=.*");
+    private static final Pattern PATTERN =
+        Pattern.compile("[a-zA-Z0-9\\._\\-]+\\+\\d+\\+\\d+\\+\\d+(\\.\\w+)?");
+
+    @Override
+    public boolean accept(final Path path) {
+      final String filename = path.getName();
+      final Matcher matcher = PATTERN.matcher(filename);
+      return matcher.matches();
+    }
+
+    // Partition directories look like key=value; anything else (e.g. TMP) is skipped
+    public boolean acceptDir(final Path path) {
+      final String dirName = path.getName();
+      final Matcher matcher = DIRECTORY_PATTERN.matcher(dirName);
+      return matcher.matches();
+    }
+  }
+
+  /**
+   * Convert a map holding the max offset of each partition into a checkpoint string.
+   *
+   * @param topic Topic name
+   * @param checkpoint Map with partition as key and max offset as value
+   * @return Checkpoint string
+   */
+  private static String buildCheckpointStr(final String topic,
+                                           final HashMap<Integer, Integer> checkpoint) {
+    final StringBuilder checkpointStr = new StringBuilder();
+    checkpointStr.append(topic);
+    // Partitions are validated to be contiguous (0..n-1) before this is called,
+    // so indexing by the loop counter is safe here.
+    for (int i = 0; i < checkpoint.size(); ++i) {
+      checkpointStr.append(",").append(i).append(":").append(checkpoint.get(i));
+    }
+    return checkpointStr.toString();
+  }
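+
+  // For illustration: with topic "topic1" and a max-offset map of {0 -> 300, 1 -> 200},
+  // buildCheckpointStr returns "topic1,0:300,1:200", i.e. the topic name followed by
+  // comma-separated partition:maxOffset pairs (this exact string is asserted in the
+  // test below).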
+
+  /**
+   * List file status recursively.
+   *
+   * @param curPath Current path
+   * @param filter PathFilter
+   * @return All file statuses that match the Kafka Connect naming convention
+   * @throws IOException if listing the file system fails
+   */
+  private ArrayList<FileStatus> listAllFileStatus(Path curPath,
+                                                  KafkaConnectPathFilter filter) throws IOException {
+    ArrayList<FileStatus> allFileStatus = new ArrayList<>();
+    FileStatus[] fileStatus = this.fs.listStatus(curPath);
+    for (FileStatus status : fileStatus) {
+      if (status.isDirectory() && filter.acceptDir(status.getPath())) {
+        allFileStatus.addAll(listAllFileStatus(status.getPath(), filter));
+      } else {
+        if (filter.accept(status.getPath())) {
+          allFileStatus.add(status);
+        }
+      }
+    }
+    return allFileStatus;
+  }
+
+  @Override
+  public String getCheckpoint() throws HoodieException {
+    final KafkaConnectPathFilter filter = new KafkaConnectPathFilter();
+    ArrayList<FileStatus> fileStatus;
+    try {
+      fileStatus = listAllFileStatus(this.path, filter);
+    } catch (IOException e) {
+      throw new HoodieException(e.getMessage(), e);
+    }
+    if (fileStatus.isEmpty()) {
+      throw new HoodieException("No valid Kafka Connect HDFS file found under: " + this.path.getName());
+    }
+    final String topic = fileStatus.get(0).getPath().getName().split(FILENAME_SEPARATOR)[0];
+    int maxPartition = -1;
+    final HashMap<Integer, Integer> checkpointMap = new HashMap<>();
+    for (final FileStatus status : fileStatus) {
+      // Filename format: topic+partition+lowerOffset+upperOffset.extension
+      final String filename = status.getPath().getName();
+      final String[] groups = filename.split(FILENAME_SEPARATOR);
+      final int partition = Integer.parseInt(groups[1]);
+      final int offsetUpper = Integer.parseInt(groups[3]);
+      maxPartition = Math.max(maxPartition, partition);
+      if (checkpointMap.containsKey(partition)) {
+        checkpointMap.put(partition, Math.max(checkpointMap.get(partition), offsetUpper));
+      } else {
+        checkpointMap.put(partition, offsetUpper);
+      }
+    }
+    if (checkpointMap.size() != maxPartition + 1) {
+      throw new HoodieException("Missing partition(s) in the file scan of " + this.path.getName()
+          + ": max partition id found (counting from 0) is " + maxPartition
+          + ", so " + (maxPartition + 1) + " partitions were expected, but only "
+          + checkpointMap.size() + " were found");
+    }
+    return buildCheckpointStr(topic, checkpointMap);
+  }
+}
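A minimal usage sketch; the HDFS URI below is hypothetical, and any FileSystem/Path pair pointing at the topic directory written by the connector works the same way:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
    import org.apache.hudi.utilities.checkpointing.KafkaConnectHdfsProvider;

    public class KafkaConnectCheckpointExample {
      public static void main(String[] args) throws Exception {
        // Hypothetical topic directory produced by the Kafka Connect HDFS sink
        final Path topicPath = new Path("hdfs://namenode:8020/topics/topic1");
        final FileSystem fs = topicPath.getFileSystem(new Configuration());
        final InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(topicPath, fs);
        // Prints e.g. "topic1,0:300,1:200"; hand this to Delta Streamer as its initial checkpoint
        System.out.println(provider.getCheckpoint());
      }
    }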
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java
new file mode 100644
index 000000000..fed8e46e0
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.checkpointing;
+
+import org.apache.hudi.common.HoodieCommonTestHarness;
+import org.apache.hudi.common.model.HoodieTestUtils;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness {
+  private FileSystem fs = null;
+  private String topicPath = null;
+
+  @Before
+  public void init() {
+    // Prepare directories
+    initPath();
+    final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
+    fs = FSUtils.getFs(basePath, hadoopConf);
+  }
+
+  @Test
+  public void testValidKafkaConnectPath() throws Exception {
+    // A standard (time-based partitioning) layout of files managed by Kafka Connect is:
+    // topic/year=xxx/month=xxx/day=xxx/topic+partition+lowerOffset+upperOffset.file
+    topicPath = basePath + "/topic1";
+    new File(topicPath).mkdirs();
+    // create regular kafka connect hdfs dirs
+    new File(topicPath + "/year=2016/month=05/day=01/").mkdirs();
+    new File(topicPath + "/year=2016/month=05/day=02/").mkdirs();
+    // kafka connect tmp folder
+    new File(topicPath + "/TMP").mkdirs();
+    // tmp file that is still being written; must not affect the checkpoint
+    new File(topicPath + "/TMP/" + "topic1+0+301+400.parquet").createNewFile();
+    // regular parquet files
+    new File(topicPath + "/year=2016/month=05/day=01/"
+        + "topic1+0+100+200.parquet").createNewFile();
+    new File(topicPath + "/year=2016/month=05/day=01/"
+        + "topic1+1+100+200.parquet").createNewFile();
+    new File(topicPath + "/year=2016/month=05/day=02/"
+        + "topic1+0+201+300.parquet").createNewFile();
+    // noise parquet files that should be ignored by the path filter
+    new File(topicPath + "/year=2016/month=05/day=01/"
+        + "random_snappy_1.parquet").createNewFile();
+    new File(topicPath + "/year=2016/month=05/day=02/"
+        + "random_snappy_2.parquet").createNewFile();
+    InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(new Path(topicPath), fs);
+    assertEquals("topic1,0:300,1:200", provider.getCheckpoint());
+  }
+
+  @Test(expected = HoodieException.class)
+  public void testMissingPartition() throws Exception {
+    topicPath = basePath + "/topic2";
+    new File(topicPath).mkdirs();
+    // create regular kafka connect hdfs dirs
+    new File(topicPath + "/year=2016/month=05/day=01/").mkdirs();
+    new File(topicPath + "/year=2016/month=05/day=02/").mkdirs();
+    // parquet files for partitions 0 and 2 only; partition 1 is missing
+    new File(topicPath + "/year=2016/month=05/day=01/"
+        + "topic1+0+100+200.parquet").createNewFile();
+    new File(topicPath + "/year=2016/month=05/day=01/"
+        + "topic1+2+100+200.parquet").createNewFile();
+    new File(topicPath + "/year=2016/month=05/day=02/"
+        + "topic1+0+201+300.parquet").createNewFile();
+    InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(new Path(topicPath), fs);
+    provider.getCheckpoint();
+  }
+}
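For reference, what these tests pin down: in testValidKafkaConnectPath the scan sees partition 0 with upper offsets 200 and 300 (max 300) and partition 1 with upper offset 200, while the TMP file and the random_snappy noise files are filtered out, hence the expected checkpoint "topic1,0:300,1:200". In testMissingPartition only partitions 0 and 2 appear, so maxPartition is 2 but the map holds two entries instead of three, which triggers the missing-partition HoodieException. The resulting checkpoint string is what Delta Streamer would be seeded with when taking over an existing Kafka-Connect-HDFS data set.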