[HUDI-3643] Fix hive count exception when the table is empty and the path depth is less than 3 (#5051)
This commit is contained in:
@@ -71,7 +71,6 @@ public class HoodieHiveUtils {
|
|||||||
public static final String DEFAULT_SCAN_MODE = SNAPSHOT_SCAN_MODE;
|
public static final String DEFAULT_SCAN_MODE = SNAPSHOT_SCAN_MODE;
|
||||||
public static final int DEFAULT_MAX_COMMITS = 1;
|
public static final int DEFAULT_MAX_COMMITS = 1;
|
||||||
public static final int MAX_COMMIT_ALL = -1;
|
public static final int MAX_COMMIT_ALL = -1;
|
||||||
public static final int DEFAULT_LEVELS_TO_BASEPATH = 3;
|
|
||||||
public static final Pattern HOODIE_CONSUME_MODE_PATTERN_STRING = Pattern.compile("hoodie\\.(.*)\\.consume\\.mode");
|
public static final Pattern HOODIE_CONSUME_MODE_PATTERN_STRING = Pattern.compile("hoodie\\.(.*)\\.consume\\.mode");
|
||||||
public static final String GLOBALLY_CONSISTENT_READ_TIMESTAMP = "last_replication_timestamp";
|
public static final String GLOBALLY_CONSISTENT_READ_TIMESTAMP = "last_replication_timestamp";
|
||||||
|
|
||||||
|
|||||||
@@ -46,6 +46,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView;
|
|||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.common.util.StringUtils;
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
|
import org.apache.hudi.exception.TableNotFoundException;
|
||||||
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
|
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
|
||||||
import org.apache.hudi.hadoop.HoodieHFileInputFormat;
|
import org.apache.hudi.hadoop.HoodieHFileInputFormat;
|
||||||
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
|
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
|
||||||
@@ -68,6 +69,7 @@ import java.util.stream.Collectors;
|
|||||||
|
|
||||||
import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
|
import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
|
||||||
import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
|
import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
|
||||||
|
import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
|
||||||
|
|
||||||
public class HoodieInputFormatUtils {
|
public class HoodieInputFormatUtils {
|
||||||
|
|
||||||
@@ -324,14 +326,24 @@ public class HoodieInputFormatUtils {
|
|||||||
* Extract HoodieTableMetaClient from a partition path (not base path)
|
* Extract HoodieTableMetaClient from a partition path (not base path)
|
||||||
*/
|
*/
|
||||||
public static HoodieTableMetaClient getTableMetaClientForBasePathUnchecked(Configuration conf, Path partitionPath) throws IOException {
|
public static HoodieTableMetaClient getTableMetaClientForBasePathUnchecked(Configuration conf, Path partitionPath) throws IOException {
|
||||||
|
Path baseDir = partitionPath;
|
||||||
FileSystem fs = partitionPath.getFileSystem(conf);
|
FileSystem fs = partitionPath.getFileSystem(conf);
|
||||||
int levels = HoodieHiveUtils.DEFAULT_LEVELS_TO_BASEPATH;
|
|
||||||
if (HoodiePartitionMetadata.hasPartitionMetadata(fs, partitionPath)) {
|
if (HoodiePartitionMetadata.hasPartitionMetadata(fs, partitionPath)) {
|
||||||
HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, partitionPath);
|
HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, partitionPath);
|
||||||
metadata.readFromFS();
|
metadata.readFromFS();
|
||||||
levels = metadata.getPartitionDepth();
|
int levels = metadata.getPartitionDepth();
|
||||||
|
baseDir = HoodieHiveUtils.getNthParent(partitionPath, levels);
|
||||||
|
} else {
|
||||||
|
for (int i = 0; i < partitionPath.depth(); i++) {
|
||||||
|
if (fs.exists(new Path(baseDir, METAFOLDER_NAME))) {
|
||||||
|
break;
|
||||||
|
} else if (i == partitionPath.depth() - 1) {
|
||||||
|
throw new TableNotFoundException(partitionPath.toString());
|
||||||
|
} else {
|
||||||
|
baseDir = baseDir.getParent();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Path baseDir = HoodieHiveUtils.getNthParent(partitionPath, levels);
|
|
||||||
LOG.info("Reading hoodie metadata from path " + baseDir.toString());
|
LOG.info("Reading hoodie metadata from path " + baseDir.toString());
|
||||||
return HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(baseDir.toString()).build();
|
return HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(baseDir.toString()).build();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -162,6 +162,21 @@ public class TestHoodieHFileInputFormat {
|
|||||||
assertEquals(10, inputSplits.length);
|
assertEquals(10, inputSplits.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInputFormatLoadWithEmptyTable() throws IOException {
|
||||||
|
// initial hoodie table
|
||||||
|
String bathPathStr = "/tmp/test_empty_table";
|
||||||
|
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), bathPathStr, HoodieTableType.COPY_ON_WRITE,
|
||||||
|
baseFileFormat);
|
||||||
|
// Add the paths
|
||||||
|
FileInputFormat.setInputPaths(jobConf, bathPathStr);
|
||||||
|
|
||||||
|
FileStatus[] files = inputFormat.listStatus(jobConf);
|
||||||
|
assertEquals(0, files.length);
|
||||||
|
InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 0);
|
||||||
|
assertEquals(0, inputSplits.length);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInputFormatUpdates() throws IOException {
|
public void testInputFormatUpdates() throws IOException {
|
||||||
// initial commit
|
// initial commit
|
||||||
|
|||||||
@@ -167,6 +167,21 @@ public class TestHoodieParquetInputFormat {
|
|||||||
assertEquals(10, files.length);
|
assertEquals(10, files.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInputFormatLoadWithEmptyTable() throws IOException {
|
||||||
|
// initial hoodie table
|
||||||
|
String bathPathStr = "/tmp/test_empty_table";
|
||||||
|
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), bathPathStr, HoodieTableType.COPY_ON_WRITE,
|
||||||
|
baseFileFormat);
|
||||||
|
// Add the paths
|
||||||
|
FileInputFormat.setInputPaths(jobConf, bathPathStr);
|
||||||
|
|
||||||
|
FileStatus[] files = inputFormat.listStatus(jobConf);
|
||||||
|
assertEquals(0, files.length);
|
||||||
|
InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 0);
|
||||||
|
assertEquals(0, inputSplits.length);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInputFormatUpdates() throws IOException {
|
public void testInputFormatUpdates() throws IOException {
|
||||||
// initial commit
|
// initial commit
|
||||||
|
|||||||
@@ -56,6 +56,12 @@ public class TestInputPathHandler {
|
|||||||
// non Hoodie table
|
// non Hoodie table
|
||||||
public static final String TRIPS_STATS_TEST_NAME = "trips_stats";
|
public static final String TRIPS_STATS_TEST_NAME = "trips_stats";
|
||||||
|
|
||||||
|
// empty snapshot table
|
||||||
|
public static final String EMPTY_SNAPSHOT_TEST_NAME = "empty_snapshot";
|
||||||
|
|
||||||
|
// empty incremental table
|
||||||
|
public static final String EMPTY_INCREMENTAL_TEST_NAME = "empty_incremental";
|
||||||
|
|
||||||
@TempDir
|
@TempDir
|
||||||
static java.nio.file.Path parentPath;
|
static java.nio.file.Path parentPath;
|
||||||
|
|
||||||
@@ -67,6 +73,8 @@ public class TestInputPathHandler {
|
|||||||
private static String basePathTable2 = null;
|
private static String basePathTable2 = null;
|
||||||
private static String basePathTable3 = null;
|
private static String basePathTable3 = null;
|
||||||
private static String basePathTable4 = null; // non hoodie Path
|
private static String basePathTable4 = null; // non hoodie Path
|
||||||
|
private static String basePathTable5 = null;
|
||||||
|
private static String basePathTable6 = null;
|
||||||
private static List<String> incrementalTables;
|
private static List<String> incrementalTables;
|
||||||
private static List<Path> incrementalPaths;
|
private static List<Path> incrementalPaths;
|
||||||
private static List<Path> snapshotPaths;
|
private static List<Path> snapshotPaths;
|
||||||
@@ -110,6 +118,9 @@ public class TestInputPathHandler {
|
|||||||
basePathTable2 = parentPath.resolve(MODEL_TRIPS_TEST_NAME).toAbsolutePath().toString();
|
basePathTable2 = parentPath.resolve(MODEL_TRIPS_TEST_NAME).toAbsolutePath().toString();
|
||||||
basePathTable3 = parentPath.resolve(ETL_TRIPS_TEST_NAME).toAbsolutePath().toString();
|
basePathTable3 = parentPath.resolve(ETL_TRIPS_TEST_NAME).toAbsolutePath().toString();
|
||||||
basePathTable4 = parentPath.resolve(TRIPS_STATS_TEST_NAME).toAbsolutePath().toString();
|
basePathTable4 = parentPath.resolve(TRIPS_STATS_TEST_NAME).toAbsolutePath().toString();
|
||||||
|
String tempPath = "/tmp/";
|
||||||
|
basePathTable5 = tempPath + EMPTY_SNAPSHOT_TEST_NAME;
|
||||||
|
basePathTable6 = tempPath + EMPTY_INCREMENTAL_TEST_NAME;
|
||||||
|
|
||||||
dfs.mkdirs(new Path(basePathTable1));
|
dfs.mkdirs(new Path(basePathTable1));
|
||||||
initTableType(dfs.getConf(), basePathTable1, RAW_TRIPS_TEST_NAME, HoodieTableType.MERGE_ON_READ);
|
initTableType(dfs.getConf(), basePathTable1, RAW_TRIPS_TEST_NAME, HoodieTableType.MERGE_ON_READ);
|
||||||
@@ -126,6 +137,12 @@ public class TestInputPathHandler {
|
|||||||
dfs.mkdirs(new Path(basePathTable4));
|
dfs.mkdirs(new Path(basePathTable4));
|
||||||
nonHoodiePaths.addAll(generatePartitions(dfs, basePathTable4));
|
nonHoodiePaths.addAll(generatePartitions(dfs, basePathTable4));
|
||||||
|
|
||||||
|
initTableType(dfs.getConf(), basePathTable5, EMPTY_SNAPSHOT_TEST_NAME, HoodieTableType.COPY_ON_WRITE);
|
||||||
|
snapshotPaths.add(new Path(basePathTable5));
|
||||||
|
|
||||||
|
initTableType(dfs.getConf(), basePathTable6, EMPTY_INCREMENTAL_TEST_NAME, HoodieTableType.MERGE_ON_READ);
|
||||||
|
incrementalPaths.add(new Path(basePathTable6));
|
||||||
|
|
||||||
inputPaths.addAll(incrementalPaths);
|
inputPaths.addAll(incrementalPaths);
|
||||||
inputPaths.addAll(snapshotPaths);
|
inputPaths.addAll(snapshotPaths);
|
||||||
inputPaths.addAll(nonHoodiePaths);
|
inputPaths.addAll(nonHoodiePaths);
|
||||||
@@ -133,6 +150,7 @@ public class TestInputPathHandler {
|
|||||||
incrementalTables = new ArrayList<>();
|
incrementalTables = new ArrayList<>();
|
||||||
incrementalTables.add(RAW_TRIPS_TEST_NAME);
|
incrementalTables.add(RAW_TRIPS_TEST_NAME);
|
||||||
incrementalTables.add(MODEL_TRIPS_TEST_NAME);
|
incrementalTables.add(MODEL_TRIPS_TEST_NAME);
|
||||||
|
incrementalTables.add(EMPTY_INCREMENTAL_TEST_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath,
|
static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath,
|
||||||
|
|||||||
Reference in New Issue
Block a user