[HUDI-1529] Add block size to the FileStatus objects returned from metadata table to avoid too many file splits (#2451)
This commit is contained in:
@@ -801,6 +801,14 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
|
|||||||
// File sizes should be valid
|
// File sizes should be valid
|
||||||
Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getLen() > 0));
|
Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getLen() > 0));
|
||||||
|
|
||||||
|
// Block sizes should be valid
|
||||||
|
Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getBlockSize() > 0));
|
||||||
|
List<Long> fsBlockSizes = Arrays.stream(fsStatuses).map(FileStatus::getBlockSize).collect(Collectors.toList());
|
||||||
|
Collections.sort(fsBlockSizes);
|
||||||
|
List<Long> metadataBlockSizes = Arrays.stream(metaStatuses).map(FileStatus::getBlockSize).collect(Collectors.toList());
|
||||||
|
Collections.sort(metadataBlockSizes);
|
||||||
|
assertEquals(fsBlockSizes, metadataBlockSizes);
|
||||||
|
|
||||||
if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) {
|
if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) {
|
||||||
LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray()));
|
LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray()));
|
||||||
LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray()));
|
LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray()));
|
||||||
|
|||||||
@@ -202,7 +202,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
|
|||||||
throw new HoodieMetadataException("Metadata record for partition " + partitionName + " is inconsistent: "
|
throw new HoodieMetadataException("Metadata record for partition " + partitionName + " is inconsistent: "
|
||||||
+ hoodieRecord.get().getData());
|
+ hoodieRecord.get().getData());
|
||||||
}
|
}
|
||||||
statuses = hoodieRecord.get().getData().getFileStatuses(partitionPath);
|
statuses = hoodieRecord.get().getData().getFileStatuses(hadoopConf.get(), partitionPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (validateLookups) {
|
if (validateLookups) {
|
||||||
|
|||||||
@@ -29,7 +29,9 @@ import org.apache.hudi.exception.HoodieMetadataException;
|
|||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.apache.avro.generic.IndexedRecord;
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@@ -177,10 +179,12 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
|
|||||||
/**
|
/**
|
||||||
* Returns the files added as part of this record.
|
* Returns the files added as part of this record.
|
||||||
*/
|
*/
|
||||||
public FileStatus[] getFileStatuses(Path partitionPath) {
|
public FileStatus[] getFileStatuses(Configuration hadoopConf, Path partitionPath) throws IOException {
|
||||||
|
FileSystem fs = partitionPath.getFileSystem(hadoopConf);
|
||||||
|
long blockSize = fs.getDefaultBlockSize(partitionPath);
|
||||||
return filterFileInfoEntries(false)
|
return filterFileInfoEntries(false)
|
||||||
.map(e -> new FileStatus(e.getValue().getSize(), false, 0, 0, 0, 0, null, null, null,
|
.map(e -> new FileStatus(e.getValue().getSize(), false, 0, blockSize, 0, 0,
|
||||||
new Path(partitionPath, e.getKey())))
|
null, null, null, new Path(partitionPath, e.getKey())))
|
||||||
.toArray(FileStatus[]::new);
|
.toArray(FileStatus[]::new);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user