[HUDI-3709] Fixing ParquetWriter impls not respecting Parquet Max File Size limit (#5129)
This commit is contained in:
@@ -37,8 +37,6 @@ public interface HoodieFileWriter<R extends IndexedRecord> {
|
|||||||
|
|
||||||
void writeAvro(String key, R oldRecord) throws IOException;
|
void writeAvro(String key, R oldRecord) throws IOException;
|
||||||
|
|
||||||
long getBytesWritten();
|
|
||||||
|
|
||||||
default void prepRecordWithMetadata(R avroRecord, HoodieRecord record, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
|
default void prepRecordWithMetadata(R avroRecord, HoodieRecord record, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
|
||||||
String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement());
|
String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement());
|
||||||
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
|
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
|
||||||
|
|||||||
@@ -187,9 +187,4 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
|
|||||||
writer.close();
|
writer.close();
|
||||||
writer = null;
|
writer = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getBytesWritten() {
|
|
||||||
return fs.getBytesWritten(file);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -165,9 +165,4 @@ public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRec
|
|||||||
|
|
||||||
writer.close();
|
writer.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getBytesWritten() {
|
|
||||||
return fs.getBytesWritten(file);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -97,7 +97,7 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean canWrite() {
|
public boolean canWrite() {
|
||||||
return fs.getBytesWritten(file) < maxFileSize;
|
return getDataSize() < maxFileSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -107,9 +107,4 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
|
|||||||
writeSupport.add(key);
|
writeSupport.add(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getBytesWritten() {
|
|
||||||
return fs.getBytesWritten(file);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -402,7 +402,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase
|
|||||||
counts++;
|
counts++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assertEquals(3, counts, "If the number of records are more than 1150, then there should be a new file");
|
assertEquals(5, counts, "If the number of records are more than 1150, then there should be a new file");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@@ -56,7 +56,7 @@ public class HoodieInternalRowParquetWriter extends ParquetWriter<InternalRow>
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean canWrite() {
|
public boolean canWrite() {
|
||||||
return fs.getBytesWritten(file) < maxFileSize;
|
return getDataSize() < maxFileSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -437,7 +437,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
|
|||||||
counts++;
|
counts++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assertEquals(3, counts, "If the number of records are more than 1150, then there should be a new file");
|
assertEquals(5, counts, "If the number of records are more than 1150, then there should be a new file");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
Reference in New Issue
Block a user