diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala index 3802bb46a..fbfc1d8ec 100644 --- a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala +++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala @@ -28,7 +28,7 @@ import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory} import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord} import org.apache.hudi.common.util.BaseFileUtils import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig} -import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieParquetWriter} +import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieAvroParquetWriter} import org.apache.parquet.avro.AvroSchemaConverter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.spark.sql.{DataFrame, SQLContext} @@ -50,8 +50,7 @@ object SparkHelpers { // Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'. parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader) - val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier(), - true) + val writer = new HoodieAvroParquetWriter[IndexedRecord](destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true) for (rec <- sourceRecords) { val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString if (!keysToSkip.contains(key)) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java similarity index 51% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java index 095cacc14..6f7940d04 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java @@ -18,22 +18,15 @@ package org.apache.hudi.io.storage; -import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.common.engine.TaskContextSupplier; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.fs.HoodieWrapperFileSystem; import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.parquet.hadoop.ParquetFileWriter; -import org.apache.parquet.hadoop.ParquetWriter; import javax.annotation.concurrent.NotThreadSafe; import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; /** * HoodieParquetWriter extends the ParquetWriter to help limit the size of underlying file. Provides a way to check if @@ -42,45 +35,24 @@ import java.util.concurrent.atomic.AtomicLong; * ATTENTION: HoodieParquetWriter is not thread safe and developer should take care of the order of write and close */ @NotThreadSafe -public class HoodieParquetWriter - extends ParquetWriter implements HoodieFileWriter { +public class HoodieAvroParquetWriter + extends HoodieBaseParquetWriter + implements HoodieFileWriter { - private static AtomicLong recordIndex = new AtomicLong(1); - - private final Path file; - private final HoodieWrapperFileSystem fs; - private final long maxFileSize; - private final HoodieAvroWriteSupport writeSupport; + private final String fileName; private final String instantTime; private final TaskContextSupplier taskContextSupplier; private final boolean populateMetaFields; + private final HoodieAvroWriteSupport writeSupport; - public HoodieParquetWriter(String instantTime, - Path file, - HoodieAvroParquetConfig parquetConfig, - Schema schema, - TaskContextSupplier taskContextSupplier, - boolean populateMetaFields) throws IOException { - super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()), - ParquetFileWriter.Mode.CREATE, - parquetConfig.getWriteSupport(), - parquetConfig.getCompressionCodecName(), - parquetConfig.getBlockSize(), - parquetConfig.getPageSize(), - parquetConfig.getPageSize(), - parquetConfig.dictionaryEnabled(), - DEFAULT_IS_VALIDATING_ENABLED, - DEFAULT_WRITER_VERSION, - FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf())); - this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()); - this.fs = - (HoodieWrapperFileSystem) this.file.getFileSystem(FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf())); - // We cannot accurately measure the snappy compressed output file size. We are choosing a - // conservative 10% - // TODO - compute this compression ratio dynamically by looking at the bytes written to the - // stream and the actual file size reported by HDFS - this.maxFileSize = parquetConfig.getMaxFileSize() - + Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio()); + @SuppressWarnings({"unchecked", "rawtypes"}) + public HoodieAvroParquetWriter(Path file, + HoodieAvroParquetConfig parquetConfig, + String instantTime, + TaskContextSupplier taskContextSupplier, + boolean populateMetaFields) throws IOException { + super(file, (HoodieBaseParquetConfig) parquetConfig); + this.fileName = file.getName(); this.writeSupport = parquetConfig.getWriteSupport(); this.instantTime = instantTime; this.taskContextSupplier = taskContextSupplier; @@ -91,7 +63,7 @@ public class HoodieParquetWriter target type of the object being written into Parquet files (for ex, + * {@code IndexedRecord}, {@code InternalRow}) + */ +public abstract class HoodieBaseParquetWriter extends ParquetWriter { + + private static final int WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK = 1000; + + private final AtomicLong writtenRecordCount = new AtomicLong(0); + private final long maxFileSize; + private long lastCachedDataSize = -1; + + public HoodieBaseParquetWriter(Path file, + HoodieBaseParquetConfig> parquetConfig) throws IOException { + super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()), + ParquetFileWriter.Mode.CREATE, + parquetConfig.getWriteSupport(), + parquetConfig.getCompressionCodecName(), + parquetConfig.getBlockSize(), + parquetConfig.getPageSize(), + parquetConfig.getPageSize(), + parquetConfig.dictionaryEnabled(), + DEFAULT_IS_VALIDATING_ENABLED, + DEFAULT_WRITER_VERSION, + FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf())); + + // We cannot accurately measure the snappy compressed output file size. We are choosing a + // conservative 10% + // TODO - compute this compression ratio dynamically by looking at the bytes written to the + // stream and the actual file size reported by HDFS + this.maxFileSize = parquetConfig.getMaxFileSize() + + Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio()); + } + + public boolean canWrite() { + // TODO we can actually do evaluation more accurately: + // if we cache last data size check, since we account for how many records + // were written we can accurately project avg record size, and therefore + // estimate how many more records we can write before cut off + if (lastCachedDataSize == -1 || getWrittenRecordCount() % WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK == 0) { + lastCachedDataSize = getDataSize(); + } + return lastCachedDataSize < maxFileSize; + } + + @Override + public void write(R object) throws IOException { + super.write(object); + writtenRecordCount.incrementAndGet(); + } + + protected long getWrittenRecordCount() { + return writtenRecordCount.get(); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java index 1d1dd5c9b..cce59d3b6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java @@ -18,14 +18,12 @@ package org.apache.hudi.io.storage; -import java.util.concurrent.atomic.AtomicLong; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.avro.generic.IndexedRecord; - import java.io.IOException; public interface HoodieFileWriter { @@ -38,8 +36,8 @@ public interface HoodieFileWriter { void writeAvro(String key, R oldRecord) throws IOException; - default void prepRecordWithMetadata(HoodieKey key, R avroRecord, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) { - String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement()); + default void prepRecordWithMetadata(HoodieKey key, R avroRecord, String instantTime, Integer partitionId, long recordIndex, String fileName) { + String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex); HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, key.getRecordKey(), key.getPartitionPath(), fileName); HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId); return; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java index 7d0c307db..ffdff2573 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java @@ -81,7 +81,7 @@ public class HoodieFileWriterFactory { config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(), conf, config.getParquetCompressionRatio(), config.parquetDictionaryEnabled()); - return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema, taskContextSupplier, populateMetaFields); + return new HoodieAvroParquetWriter<>(path, parquetConfig, instantTime, taskContextSupplier, populateMetaFields); } static HoodieFileWriter newHFileFileWriter( diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java index 91f79cefa..f065608b2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java @@ -113,7 +113,7 @@ public class HoodieHFileWriter( + new Path(Paths.get(basePath, partition, fileName).toString()), config, currentInstantTime, + contextSupplier, populateMetaFields)) { int seqId = 1; for (HoodieRecord record : records) { GenericRecord avroRecord = (GenericRecord) ((HoodieRecordPayload) record.getData()).getInsertValue(schema).get(); diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java index 518414d61..7b0c4dbdf 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java @@ -379,7 +379,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase List records = new ArrayList<>(); // Approx 1150 records are written for block size of 64KB - for (int i = 0; i < 2000; i++) { + for (int i = 0; i < 2050; i++) { String recordStr = "{\"_row_key\":\"" + UUID.randomUUID().toString() + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":" + i + "}"; RawTripTestPayload rowChange = new RawTripTestPayload(recordStr); @@ -402,7 +402,8 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase counts++; } } - assertEquals(5, counts, "If the number of records are more than 1150, then there should be a new file"); + // we check canWrite only once every 1000 records. and so 2 files with 1000 records and 3rd file with 50 records. + assertEquals(3, counts, "If the number of records are more than 1150, then there should be a new file"); } @Test diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java index 7e64d8387..5a0a60ea0 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java @@ -19,11 +19,7 @@ package org.apache.hudi.io.storage.row; import org.apache.hadoop.fs.Path; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.fs.HoodieWrapperFileSystem; - -import org.apache.parquet.hadoop.ParquetFileWriter; -import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.hudi.io.storage.HoodieBaseParquetWriter; import org.apache.spark.sql.catalyst.InternalRow; import java.io.IOException; @@ -31,32 +27,16 @@ import java.io.IOException; /** * Parquet's impl of {@link HoodieInternalRowFileWriter} to write {@link InternalRow}s. */ -public class HoodieInternalRowParquetWriter extends ParquetWriter +public class HoodieInternalRowParquetWriter extends HoodieBaseParquetWriter implements HoodieInternalRowFileWriter { - private final Path file; - private final HoodieWrapperFileSystem fs; - private final long maxFileSize; private final HoodieRowParquetWriteSupport writeSupport; public HoodieInternalRowParquetWriter(Path file, HoodieRowParquetConfig parquetConfig) throws IOException { - super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()), - ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(), parquetConfig.getCompressionCodecName(), - parquetConfig.getBlockSize(), parquetConfig.getPageSize(), parquetConfig.getPageSize(), - DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED, - DEFAULT_WRITER_VERSION, FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf())); - this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()); - this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(FSUtils.registerFileSystem(file, - parquetConfig.getHadoopConf())); - this.maxFileSize = parquetConfig.getMaxFileSize() - + Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio()); - this.writeSupport = parquetConfig.getWriteSupport(); - } + super(file, parquetConfig); - @Override - public boolean canWrite() { - return getDataSize() < maxFileSize; + this.writeSupport = parquetConfig.getWriteSupport(); } @Override @@ -69,9 +49,4 @@ public class HoodieInternalRowParquetWriter extends ParquetWriter public void writeRow(InternalRow row) throws IOException { super.write(row); } - - @Override - public void close() throws IOException { - super.close(); - } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java index b7f34ab2b..66016305d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java @@ -49,7 +49,7 @@ public class TestHoodieFileWriterFactory extends HoodieClientTestBase { SparkTaskContextSupplier supplier = new SparkTaskContextSupplier(); HoodieFileWriter parquetWriter = HoodieFileWriterFactory.getFileWriter(instantTime, parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier); - assertTrue(parquetWriter instanceof HoodieParquetWriter); + assertTrue(parquetWriter instanceof HoodieAvroParquetWriter); // hfile format. final Path hfilePath = new Path(basePath + "/partition/path/f1_1-0-1_000.hfile"); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java index 8114daa30..9574d35a6 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java @@ -419,7 +419,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { List records = new ArrayList<>(); // Approx 1150 records are written for block size of 64KB - for (int i = 0; i < 2000; i++) { + for (int i = 0; i < 2050; i++) { String recordStr = "{\"_row_key\":\"" + UUID.randomUUID().toString() + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":" + i + "}"; RawTripTestPayload rowChange = new RawTripTestPayload(recordStr); @@ -441,7 +441,8 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { counts++; } } - assertEquals(5, counts, "If the number of records are more than 1150, then there should be a new file"); + // we check canWrite only once every 1000 records. and so 2 files with 1000 records and 3rd file with 50 records. + assertEquals(3, counts, "If the number of records are more than 1150, then there should be a new file"); } @Test