[HUDI-3855] Fixing FILENAME_METADATA_FIELD not being correctly updated in HoodieMergeHandle (#5296)
Fixing FILENAME_METADATA_FIELD not being correctly updated in HoodieMergeHandle, in cases when old-record is carried over from existing file as is. - Revisited HoodieFileWriter API to accept HoodieKey instead of HoodieRecord - Fixed FILENAME_METADATA_FIELD not being overridden in cases when simply old record is carried over - Exposing standard JVM's debugger ports in Docker setup
This commit is contained in:
@@ -26,6 +26,8 @@ services:
|
|||||||
ports:
|
ports:
|
||||||
- "50070:50070"
|
- "50070:50070"
|
||||||
- "8020:8020"
|
- "8020:8020"
|
||||||
|
# JVM debugging port (will be mapped to a random port on host)
|
||||||
|
- "5005"
|
||||||
env_file:
|
env_file:
|
||||||
- ./hadoop.env
|
- ./hadoop.env
|
||||||
healthcheck:
|
healthcheck:
|
||||||
@@ -45,6 +47,8 @@ services:
|
|||||||
ports:
|
ports:
|
||||||
- "50075:50075"
|
- "50075:50075"
|
||||||
- "50010:50010"
|
- "50010:50010"
|
||||||
|
# JVM debugging port (will be mapped to a random port on host)
|
||||||
|
- "5005"
|
||||||
links:
|
links:
|
||||||
- "namenode"
|
- "namenode"
|
||||||
- "historyserver"
|
- "historyserver"
|
||||||
@@ -99,6 +103,8 @@ services:
|
|||||||
SERVICE_PRECONDITION: "namenode:50070 hive-metastore-postgresql:5432"
|
SERVICE_PRECONDITION: "namenode:50070 hive-metastore-postgresql:5432"
|
||||||
ports:
|
ports:
|
||||||
- "9083:9083"
|
- "9083:9083"
|
||||||
|
# JVM debugging port (will be mapped to a random port on host)
|
||||||
|
- "5005"
|
||||||
healthcheck:
|
healthcheck:
|
||||||
test: ["CMD", "nc", "-z", "hivemetastore", "9083"]
|
test: ["CMD", "nc", "-z", "hivemetastore", "9083"]
|
||||||
interval: 30s
|
interval: 30s
|
||||||
@@ -118,6 +124,8 @@ services:
|
|||||||
SERVICE_PRECONDITION: "hivemetastore:9083"
|
SERVICE_PRECONDITION: "hivemetastore:9083"
|
||||||
ports:
|
ports:
|
||||||
- "10000:10000"
|
- "10000:10000"
|
||||||
|
# JVM debugging port (will be mapped to a random port on host)
|
||||||
|
- "5005"
|
||||||
depends_on:
|
depends_on:
|
||||||
- "hivemetastore"
|
- "hivemetastore"
|
||||||
links:
|
links:
|
||||||
@@ -136,6 +144,8 @@ services:
|
|||||||
ports:
|
ports:
|
||||||
- "8080:8080"
|
- "8080:8080"
|
||||||
- "7077:7077"
|
- "7077:7077"
|
||||||
|
# JVM debugging port (will be mapped to a random port on host)
|
||||||
|
- "5005"
|
||||||
environment:
|
environment:
|
||||||
- INIT_DAEMON_STEP=setup_spark
|
- INIT_DAEMON_STEP=setup_spark
|
||||||
links:
|
links:
|
||||||
@@ -154,6 +164,8 @@ services:
|
|||||||
- sparkmaster
|
- sparkmaster
|
||||||
ports:
|
ports:
|
||||||
- "8081:8081"
|
- "8081:8081"
|
||||||
|
# JVM debugging port (will be mapped to a random port on host)
|
||||||
|
- "5005"
|
||||||
environment:
|
environment:
|
||||||
- "SPARK_MASTER=spark://sparkmaster:7077"
|
- "SPARK_MASTER=spark://sparkmaster:7077"
|
||||||
links:
|
links:
|
||||||
@@ -167,7 +179,7 @@ services:
|
|||||||
hostname: zookeeper
|
hostname: zookeeper
|
||||||
container_name: zookeeper
|
container_name: zookeeper
|
||||||
ports:
|
ports:
|
||||||
- '2181:2181'
|
- "2181:2181"
|
||||||
environment:
|
environment:
|
||||||
- ALLOW_ANONYMOUS_LOGIN=yes
|
- ALLOW_ANONYMOUS_LOGIN=yes
|
||||||
|
|
||||||
@@ -176,7 +188,7 @@ services:
|
|||||||
hostname: kafkabroker
|
hostname: kafkabroker
|
||||||
container_name: kafkabroker
|
container_name: kafkabroker
|
||||||
ports:
|
ports:
|
||||||
- '9092:9092'
|
- "9092:9092"
|
||||||
environment:
|
environment:
|
||||||
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
|
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
|
||||||
- ALLOW_PLAINTEXT_LISTENER=yes
|
- ALLOW_PLAINTEXT_LISTENER=yes
|
||||||
@@ -186,7 +198,9 @@ services:
|
|||||||
hostname: presto-coordinator-1
|
hostname: presto-coordinator-1
|
||||||
image: apachehudi/hudi-hadoop_2.8.4-prestobase_0.271:latest
|
image: apachehudi/hudi-hadoop_2.8.4-prestobase_0.271:latest
|
||||||
ports:
|
ports:
|
||||||
- '8090:8090'
|
- "8090:8090"
|
||||||
|
# JVM debugging port (will be mapped to a random port on host)
|
||||||
|
- "5005"
|
||||||
environment:
|
environment:
|
||||||
- PRESTO_JVM_MAX_HEAP=512M
|
- PRESTO_JVM_MAX_HEAP=512M
|
||||||
- PRESTO_QUERY_MAX_MEMORY=1GB
|
- PRESTO_QUERY_MAX_MEMORY=1GB
|
||||||
@@ -226,7 +240,9 @@ services:
|
|||||||
hostname: trino-coordinator-1
|
hostname: trino-coordinator-1
|
||||||
image: apachehudi/hudi-hadoop_2.8.4-trinocoordinator_368:latest
|
image: apachehudi/hudi-hadoop_2.8.4-trinocoordinator_368:latest
|
||||||
ports:
|
ports:
|
||||||
- '8091:8091'
|
- "8091:8091"
|
||||||
|
# JVM debugging port (will be mapped to a random port on host)
|
||||||
|
- "5005"
|
||||||
links:
|
links:
|
||||||
- "hivemetastore"
|
- "hivemetastore"
|
||||||
volumes:
|
volumes:
|
||||||
@@ -239,7 +255,9 @@ services:
|
|||||||
image: apachehudi/hudi-hadoop_2.8.4-trinoworker_368:latest
|
image: apachehudi/hudi-hadoop_2.8.4-trinoworker_368:latest
|
||||||
depends_on: [ "trino-coordinator-1" ]
|
depends_on: [ "trino-coordinator-1" ]
|
||||||
ports:
|
ports:
|
||||||
- '8092:8092'
|
- "8092:8092"
|
||||||
|
# JVM debugging port (will be mapped to a random port on host)
|
||||||
|
- "5005"
|
||||||
links:
|
links:
|
||||||
- "hivemetastore"
|
- "hivemetastore"
|
||||||
- "hiveserver"
|
- "hiveserver"
|
||||||
@@ -268,6 +286,8 @@ services:
|
|||||||
- sparkmaster
|
- sparkmaster
|
||||||
ports:
|
ports:
|
||||||
- '4040:4040'
|
- '4040:4040'
|
||||||
|
# JVM debugging port (mapped to 5006 on the host)
|
||||||
|
- "5006:5005"
|
||||||
environment:
|
environment:
|
||||||
- "SPARK_MASTER=spark://sparkmaster:7077"
|
- "SPARK_MASTER=spark://sparkmaster:7077"
|
||||||
links:
|
links:
|
||||||
@@ -286,6 +306,9 @@ services:
|
|||||||
container_name: adhoc-2
|
container_name: adhoc-2
|
||||||
env_file:
|
env_file:
|
||||||
- ./hadoop.env
|
- ./hadoop.env
|
||||||
|
ports:
|
||||||
|
# JVM debugging port (mapped to 5005 on the host)
|
||||||
|
- "5005:5005"
|
||||||
depends_on:
|
depends_on:
|
||||||
- sparkmaster
|
- sparkmaster
|
||||||
environment:
|
environment:
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ package org.apache.hudi.io;
|
|||||||
|
|
||||||
import org.apache.hudi.common.engine.TaskContextSupplier;
|
import org.apache.hudi.common.engine.TaskContextSupplier;
|
||||||
import org.apache.hudi.common.model.HoodieBaseFile;
|
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.model.HoodieRecordLocation;
|
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
@@ -93,7 +94,8 @@ public class HoodieConcatHandle<T extends HoodieRecordPayload, I, K, O> extends
|
|||||||
public void write(GenericRecord oldRecord) {
|
public void write(GenericRecord oldRecord) {
|
||||||
String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
|
String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
|
||||||
try {
|
try {
|
||||||
fileWriter.writeAvro(key, oldRecord);
|
// NOTE: We're enforcing preservation of the record metadata to keep existing semantic
|
||||||
|
writeToFile(new HoodieKey(key, partitionPath), oldRecord, true);
|
||||||
} catch (IOException | RuntimeException e) {
|
} catch (IOException | RuntimeException e) {
|
||||||
String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s",
|
String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s",
|
||||||
key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
|
key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
|
||||||
|
|||||||
@@ -142,7 +142,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
|
|||||||
fileWriter.writeAvro(record.getRecordKey(),
|
fileWriter.writeAvro(record.getRecordKey(),
|
||||||
rewriteRecordWithMetadata((GenericRecord) avroRecord.get(), path.getName()));
|
rewriteRecordWithMetadata((GenericRecord) avroRecord.get(), path.getName()));
|
||||||
} else {
|
} else {
|
||||||
fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) avroRecord.get()), record);
|
fileWriter.writeAvroWithMetadata(record.getKey(), rewriteRecord((GenericRecord) avroRecord.get()));
|
||||||
}
|
}
|
||||||
// update the new location of record, so we know where to find it next
|
// update the new location of record, so we know where to find it next
|
||||||
record.unseal();
|
record.unseal();
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ import org.apache.hudi.client.WriteStatus;
|
|||||||
import org.apache.hudi.common.engine.TaskContextSupplier;
|
import org.apache.hudi.common.engine.TaskContextSupplier;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.model.HoodieBaseFile;
|
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.common.model.HoodieOperation;
|
import org.apache.hudi.common.model.HoodieOperation;
|
||||||
import org.apache.hudi.common.model.HoodiePartitionMetadata;
|
import org.apache.hudi.common.model.HoodiePartitionMetadata;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
@@ -292,13 +293,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
|
|||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
if (indexedRecord.isPresent() && !isDelete) {
|
if (indexedRecord.isPresent() && !isDelete) {
|
||||||
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
|
writeToFile(hoodieRecord.getKey(), (GenericRecord) indexedRecord.get(), preserveMetadata && useWriterSchemaForCompaction);
|
||||||
if (preserveMetadata && useWriterSchemaForCompaction) { // useWriteSchema will be true only in case of compaction.
|
|
||||||
fileWriter.writeAvro(hoodieRecord.getRecordKey(),
|
|
||||||
rewriteRecordWithMetadata((GenericRecord) indexedRecord.get(), newFilePath.getName()));
|
|
||||||
} else {
|
|
||||||
fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) indexedRecord.get()), hoodieRecord);
|
|
||||||
}
|
|
||||||
recordsWritten++;
|
recordsWritten++;
|
||||||
} else {
|
} else {
|
||||||
recordsDeleted++;
|
recordsDeleted++;
|
||||||
@@ -352,14 +347,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (copyOldRecord) {
|
if (copyOldRecord) {
|
||||||
// this should work as it is, since this is an existing record
|
|
||||||
try {
|
try {
|
||||||
// rewrite file names
|
// NOTE: We're enforcing preservation of the record metadata to keep existing semantic
|
||||||
// do not preserve FILENAME_METADATA_FIELD
|
writeToFile(new HoodieKey(key, partitionPath), oldRecord, true);
|
||||||
if (preserveMetadata && useWriterSchemaForCompaction) {
|
|
||||||
oldRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, newFilePath.getName());
|
|
||||||
}
|
|
||||||
fileWriter.writeAvro(key, oldRecord);
|
|
||||||
} catch (IOException | RuntimeException e) {
|
} catch (IOException | RuntimeException e) {
|
||||||
String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
|
String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
|
||||||
key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
|
key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
|
||||||
@@ -370,6 +360,16 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void writeToFile(HoodieKey key, GenericRecord avroRecord, boolean shouldPreserveRecordMetadata) throws IOException {
|
||||||
|
if (shouldPreserveRecordMetadata) {
|
||||||
|
// NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
|
||||||
|
// file holding this record even in cases when overall metadata is preserved
|
||||||
|
fileWriter.writeAvro(key.getRecordKey(), rewriteRecordWithMetadata(avroRecord, newFilePath.getName()));
|
||||||
|
} else {
|
||||||
|
fileWriter.writeAvroWithMetadata(key, rewriteRecord(avroRecord));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected void writeIncomingRecords() throws IOException {
|
protected void writeIncomingRecords() throws IOException {
|
||||||
// write out any pending records (this can happen when inserts are turned into updates)
|
// write out any pending records (this can happen when inserts are turned into updates)
|
||||||
Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
|
Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ import java.util.Queue;
|
|||||||
*/
|
*/
|
||||||
public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
|
public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
|
||||||
|
|
||||||
private Queue<String> newRecordKeysSorted = new PriorityQueue<>();
|
private final Queue<String> newRecordKeysSorted = new PriorityQueue<>();
|
||||||
|
|
||||||
public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
|
public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
|
||||||
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier,
|
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier,
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ package org.apache.hudi.io.storage;
|
|||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
|
|
||||||
import org.apache.avro.generic.IndexedRecord;
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
@@ -29,7 +30,7 @@ import java.io.IOException;
|
|||||||
|
|
||||||
public interface HoodieFileWriter<R extends IndexedRecord> {
|
public interface HoodieFileWriter<R extends IndexedRecord> {
|
||||||
|
|
||||||
void writeAvroWithMetadata(R newRecord, HoodieRecord record) throws IOException;
|
void writeAvroWithMetadata(HoodieKey key, R newRecord) throws IOException;
|
||||||
|
|
||||||
boolean canWrite();
|
boolean canWrite();
|
||||||
|
|
||||||
@@ -37,9 +38,9 @@ public interface HoodieFileWriter<R extends IndexedRecord> {
|
|||||||
|
|
||||||
void writeAvro(String key, R oldRecord) throws IOException;
|
void writeAvro(String key, R oldRecord) throws IOException;
|
||||||
|
|
||||||
default void prepRecordWithMetadata(R avroRecord, HoodieRecord record, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
|
default void prepRecordWithMetadata(HoodieKey key, R avroRecord, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
|
||||||
String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement());
|
String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement());
|
||||||
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
|
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, key.getRecordKey(), key.getPartitionPath(), fileName);
|
||||||
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
|
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,16 +18,6 @@
|
|||||||
|
|
||||||
package org.apache.hudi.io.storage;
|
package org.apache.hudi.io.storage;
|
||||||
|
|
||||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
|
||||||
import org.apache.hudi.common.bloom.BloomFilter;
|
|
||||||
import org.apache.hudi.common.engine.TaskContextSupplier;
|
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
|
||||||
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
|
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
|
||||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
|
||||||
import org.apache.hudi.common.util.Option;
|
|
||||||
import org.apache.hudi.common.util.StringUtils;
|
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.apache.avro.generic.IndexedRecord;
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
@@ -40,6 +30,15 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
|
|||||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||||
|
import org.apache.hudi.common.bloom.BloomFilter;
|
||||||
|
import org.apache.hudi.common.engine.TaskContextSupplier;
|
||||||
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
|
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
|
|
||||||
import java.io.DataInput;
|
import java.io.DataInput;
|
||||||
import java.io.DataOutput;
|
import java.io.DataOutput;
|
||||||
@@ -111,13 +110,13 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
|
public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
|
||||||
if (populateMetaFields) {
|
if (populateMetaFields) {
|
||||||
prepRecordWithMetadata(avroRecord, record, instantTime,
|
prepRecordWithMetadata(key, avroRecord, instantTime,
|
||||||
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
|
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
|
||||||
writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
|
writeAvro(key.getRecordKey(), avroRecord);
|
||||||
} else {
|
} else {
|
||||||
writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
|
writeAvro(key.getRecordKey(), avroRecord);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -18,34 +18,35 @@
|
|||||||
|
|
||||||
package org.apache.hudi.io.storage;
|
package org.apache.hudi.io.storage;
|
||||||
|
|
||||||
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
|
import org.apache.avro.Schema;
|
||||||
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hudi.common.bloom.BloomFilter;
|
||||||
|
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
|
||||||
|
import org.apache.hudi.common.engine.TaskContextSupplier;
|
||||||
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
|
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
|
import org.apache.hudi.common.util.AvroOrcUtils;
|
||||||
|
import org.apache.orc.OrcFile;
|
||||||
|
import org.apache.orc.TypeDescription;
|
||||||
|
import org.apache.orc.Writer;
|
||||||
|
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
|
||||||
|
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import org.apache.avro.Schema;
|
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
|
||||||
import org.apache.avro.generic.IndexedRecord;
|
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
|
||||||
import org.apache.hadoop.fs.Path;
|
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
|
||||||
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
|
|
||||||
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
|
|
||||||
import org.apache.hudi.common.bloom.BloomFilter;
|
|
||||||
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
|
|
||||||
import org.apache.orc.OrcFile;
|
|
||||||
import org.apache.orc.TypeDescription;
|
|
||||||
import org.apache.orc.Writer;
|
|
||||||
import org.apache.hudi.common.engine.TaskContextSupplier;
|
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
|
||||||
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
|
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
|
||||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
|
||||||
import org.apache.hudi.common.util.AvroOrcUtils;
|
|
||||||
|
|
||||||
public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
|
public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
|
||||||
implements HoodieFileWriter<R>, Closeable {
|
implements HoodieFileWriter<R>, Closeable {
|
||||||
@@ -94,10 +95,10 @@ public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRec
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
|
public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
|
||||||
prepRecordWithMetadata(avroRecord, record, instantTime,
|
prepRecordWithMetadata(key, avroRecord, instantTime,
|
||||||
taskContextSupplier.getPartitionIdSupplier().get(), RECORD_INDEX, file.getName());
|
taskContextSupplier.getPartitionIdSupplier().get(), RECORD_INDEX, file.getName());
|
||||||
writeAvro(record.getRecordKey(), avroRecord);
|
writeAvro(key.getRecordKey(), avroRecord);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -18,16 +18,15 @@
|
|||||||
|
|
||||||
package org.apache.hudi.io.storage;
|
package org.apache.hudi.io.storage;
|
||||||
|
|
||||||
|
import org.apache.avro.Schema;
|
||||||
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hudi.avro.HoodieAvroWriteSupport;
|
import org.apache.hudi.avro.HoodieAvroWriteSupport;
|
||||||
import org.apache.hudi.common.engine.TaskContextSupplier;
|
import org.apache.hudi.common.engine.TaskContextSupplier;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
|
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
|
||||||
import org.apache.avro.generic.IndexedRecord;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.parquet.hadoop.ParquetFileWriter;
|
import org.apache.parquet.hadoop.ParquetFileWriter;
|
||||||
import org.apache.parquet.hadoop.ParquetWriter;
|
import org.apache.parquet.hadoop.ParquetWriter;
|
||||||
|
|
||||||
@@ -84,12 +83,12 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
|
public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
|
||||||
if (populateMetaFields) {
|
if (populateMetaFields) {
|
||||||
prepRecordWithMetadata(avroRecord, record, instantTime,
|
prepRecordWithMetadata(key, avroRecord, instantTime,
|
||||||
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
|
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
|
||||||
super.write(avroRecord);
|
super.write(avroRecord);
|
||||||
writeSupport.add(record.getRecordKey());
|
writeSupport.add(key.getRecordKey());
|
||||||
} else {
|
} else {
|
||||||
super.write(avroRecord);
|
super.write(avroRecord);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -154,8 +154,9 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
|
|||||||
record.put("time", Integer.toString(RANDOM.nextInt()));
|
record.put("time", Integer.toString(RANDOM.nextInt()));
|
||||||
record.put("number", i);
|
record.put("number", i);
|
||||||
if (testAvroWithMeta) {
|
if (testAvroWithMeta) {
|
||||||
writer.writeAvroWithMetadata(record, new HoodieAvroRecord(new HoodieKey((String) record.get("_row_key"),
|
// payload does not matter. GenericRecord passed in is what matters
|
||||||
Integer.toString((Integer) record.get("number"))), new EmptyHoodieRecordPayload())); // payload does not matter. GenericRecord passed in is what matters
|
writer.writeAvroWithMetadata(new HoodieAvroRecord(new HoodieKey((String) record.get("_row_key"),
|
||||||
|
Integer.toString((Integer) record.get("number"))), new EmptyHoodieRecordPayload()).getKey(), record);
|
||||||
// only HoodieKey will be looked up from the 2nd arg(HoodieRecord).
|
// only HoodieKey will be looked up from the 2nd arg(HoodieRecord).
|
||||||
} else {
|
} else {
|
||||||
writer.writeAvro(key, record);
|
writer.writeAvro(key, record);
|
||||||
|
|||||||
@@ -21,13 +21,15 @@ package org.apache.hudi.io;
|
|||||||
import org.apache.hudi.client.SparkRDDWriteClient;
|
import org.apache.hudi.client.SparkRDDWriteClient;
|
||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
import org.apache.hudi.common.config.HoodieCommonConfig;
|
import org.apache.hudi.common.config.HoodieCommonConfig;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.model.BaseFile;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
|
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
||||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||||
|
import org.apache.hudi.common.testutils.HoodieTestTable;
|
||||||
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
|
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
|
||||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||||
import org.apache.hudi.config.HoodieIndexConfig;
|
import org.apache.hudi.config.HoodieIndexConfig;
|
||||||
@@ -36,8 +38,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
|
|||||||
import org.apache.hudi.index.HoodieIndex;
|
import org.apache.hudi.index.HoodieIndex;
|
||||||
import org.apache.hudi.testutils.HoodieClientTestHarness;
|
import org.apache.hudi.testutils.HoodieClientTestHarness;
|
||||||
import org.apache.hudi.testutils.HoodieClientTestUtils;
|
import org.apache.hudi.testutils.HoodieClientTestUtils;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Row;
|
import org.apache.spark.sql.Row;
|
||||||
@@ -51,6 +51,8 @@ import java.nio.file.Paths;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
|
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
|
||||||
@@ -94,7 +96,6 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
|
|||||||
.withProperties(properties)
|
.withProperties(properties)
|
||||||
.build();
|
.build();
|
||||||
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
|
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
|
||||||
FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Write 1 (only inserts) This will do a bulk insert of 44 records of which there are 2 records repeated 21 times
|
* Write 1 (only inserts) This will do a bulk insert of 44 records of which there are 2 records repeated 21 times
|
||||||
@@ -202,6 +203,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
|
|||||||
// Check the entire dataset has 47 records still
|
// Check the entire dataset has 47 records still
|
||||||
dataSet = getRecords();
|
dataSet = getRecords();
|
||||||
assertEquals(47, dataSet.count(), "Must contain 47 records");
|
assertEquals(47, dataSet.count(), "Must contain 47 records");
|
||||||
|
|
||||||
Row[] rows = (Row[]) dataSet.collect();
|
Row[] rows = (Row[]) dataSet.collect();
|
||||||
int record1Count = 0;
|
int record1Count = 0;
|
||||||
int record2Count = 0;
|
int record2Count = 0;
|
||||||
@@ -228,6 +230,22 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
|
|||||||
// Assert that id2 record count which has been updated to rider-004 and driver-004 is 21, which is the total
|
// Assert that id2 record count which has been updated to rider-004 and driver-004 is 21, which is the total
|
||||||
// number of records with row_key id2
|
// number of records with row_key id2
|
||||||
assertEquals(21, record2Count);
|
assertEquals(21, record2Count);
|
||||||
|
|
||||||
|
// Validate that all the records only reference the _latest_ base files as part of the
|
||||||
|
// FILENAME_METADATA_FIELD payload (entailing that corresponding metadata is in-sync with
|
||||||
|
// the state of the table
|
||||||
|
HoodieTableFileSystemView tableView =
|
||||||
|
getHoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), HoodieTestTable.of(metaClient).listAllBaseFiles());
|
||||||
|
|
||||||
|
Set<String> latestBaseFileNames = tableView.getLatestBaseFiles()
|
||||||
|
.map(BaseFile::getFileName)
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
|
Set<Object> metadataFilenameFieldRefs = dataSet.collectAsList().stream()
|
||||||
|
.map(row -> row.getAs(HoodieRecord.FILENAME_METADATA_FIELD))
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
|
assertEquals(latestBaseFileNames, metadataFilenameFieldRefs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user