diff --git a/docker/compose/docker-compose_hadoop284_hive233_spark244.yml b/docker/compose/docker-compose_hadoop284_hive233_spark244.yml index 086004f12..b8217fc0d 100644 --- a/docker/compose/docker-compose_hadoop284_hive233_spark244.yml +++ b/docker/compose/docker-compose_hadoop284_hive233_spark244.yml @@ -26,6 +26,8 @@ services: ports: - "50070:50070" - "8020:8020" + # JVM debugging port (will be mapped to a random port on host) + - "5005" env_file: - ./hadoop.env healthcheck: @@ -45,6 +47,8 @@ services: ports: - "50075:50075" - "50010:50010" + # JVM debugging port (will be mapped to a random port on host) + - "5005" links: - "namenode" - "historyserver" @@ -99,6 +103,8 @@ services: SERVICE_PRECONDITION: "namenode:50070 hive-metastore-postgresql:5432" ports: - "9083:9083" + # JVM debugging port (will be mapped to a random port on host) + - "5005" healthcheck: test: ["CMD", "nc", "-z", "hivemetastore", "9083"] interval: 30s @@ -118,6 +124,8 @@ services: SERVICE_PRECONDITION: "hivemetastore:9083" ports: - "10000:10000" + # JVM debugging port (will be mapped to a random port on host) + - "5005" depends_on: - "hivemetastore" links: @@ -136,6 +144,8 @@ services: ports: - "8080:8080" - "7077:7077" + # JVM debugging port (will be mapped to a random port on host) + - "5005" environment: - INIT_DAEMON_STEP=setup_spark links: @@ -154,6 +164,8 @@ services: - sparkmaster ports: - "8081:8081" + # JVM debugging port (will be mapped to a random port on host) + - "5005" environment: - "SPARK_MASTER=spark://sparkmaster:7077" links: @@ -167,7 +179,7 @@ services: hostname: zookeeper container_name: zookeeper ports: - - '2181:2181' + - "2181:2181" environment: - ALLOW_ANONYMOUS_LOGIN=yes @@ -176,7 +188,7 @@ services: hostname: kafkabroker container_name: kafkabroker ports: - - '9092:9092' + - "9092:9092" environment: - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - ALLOW_PLAINTEXT_LISTENER=yes @@ -186,7 +198,9 @@ services: hostname: presto-coordinator-1 image: apachehudi/hudi-hadoop_2.8.4-prestobase_0.271:latest ports: - - '8090:8090' + - "8090:8090" + # JVM debugging port (will be mapped to a random port on host) + - "5005" environment: - PRESTO_JVM_MAX_HEAP=512M - PRESTO_QUERY_MAX_MEMORY=1GB @@ -226,7 +240,9 @@ services: hostname: trino-coordinator-1 image: apachehudi/hudi-hadoop_2.8.4-trinocoordinator_368:latest ports: - - '8091:8091' + - "8091:8091" + # JVM debugging port (will be mapped to a random port on host) + - "5005" links: - "hivemetastore" volumes: @@ -239,7 +255,9 @@ services: image: apachehudi/hudi-hadoop_2.8.4-trinoworker_368:latest depends_on: [ "trino-coordinator-1" ] ports: - - '8092:8092' + - "8092:8092" + # JVM debugging port (will be mapped to a random port on host) + - "5005" links: - "hivemetastore" - "hiveserver" @@ -268,6 +286,8 @@ services: - sparkmaster ports: - '4040:4040' + # JVM debugging port (mapped to 5006 on the host) + - "5006:5005" environment: - "SPARK_MASTER=spark://sparkmaster:7077" links: @@ -286,6 +306,9 @@ services: container_name: adhoc-2 env_file: - ./hadoop.env + ports: + # JVM debugging port (mapped to 5005 on the host) + - "5005:5005" depends_on: - sparkmaster environment: diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java index c33c0f08c..022f600b5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java @@ -20,6 +20,7 @@ package org.apache.hudi.io; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -93,7 +94,8 @@ public class HoodieConcatHandle extends public void write(GenericRecord oldRecord) { String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt); try { - fileWriter.writeAvro(key, oldRecord); + // NOTE: We're enforcing preservation of the record metadata to keep existing semantic + writeToFile(new HoodieKey(key, partitionPath), oldRecord, true); } catch (IOException | RuntimeException e) { String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s", key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true)); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 91a7622bf..41d583668 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -142,7 +142,7 @@ public class HoodieCreateHandle extends fileWriter.writeAvro(record.getRecordKey(), rewriteRecordWithMetadata((GenericRecord) avroRecord.get(), path.getName())); } else { - fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) avroRecord.get()), record); + fileWriter.writeAvroWithMetadata(record.getKey(), rewriteRecord((GenericRecord) avroRecord.get())); } // update the new location of record, so we know where to find it next record.unseal(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 06e752f59..3363571dd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -22,6 +22,7 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; @@ -292,13 +293,7 @@ public class HoodieMergeHandle extends H } try { if (indexedRecord.isPresent() && !isDelete) { - // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema - if (preserveMetadata && useWriterSchemaForCompaction) { // useWriteSchema will be true only in case of compaction. - fileWriter.writeAvro(hoodieRecord.getRecordKey(), - rewriteRecordWithMetadata((GenericRecord) indexedRecord.get(), newFilePath.getName())); - } else { - fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) indexedRecord.get()), hoodieRecord); - } + writeToFile(hoodieRecord.getKey(), (GenericRecord) indexedRecord.get(), preserveMetadata && useWriterSchemaForCompaction); recordsWritten++; } else { recordsDeleted++; @@ -352,14 +347,9 @@ public class HoodieMergeHandle extends H } if (copyOldRecord) { - // this should work as it is, since this is an existing record try { - // rewrite file names - // do not preserve FILENAME_METADATA_FIELD - if (preserveMetadata && useWriterSchemaForCompaction) { - oldRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, newFilePath.getName()); - } - fileWriter.writeAvro(key, oldRecord); + // NOTE: We're enforcing preservation of the record metadata to keep existing semantic + writeToFile(new HoodieKey(key, partitionPath), oldRecord, true); } catch (IOException | RuntimeException e) { String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s", key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true)); @@ -370,6 +360,16 @@ public class HoodieMergeHandle extends H } } + protected void writeToFile(HoodieKey key, GenericRecord avroRecord, boolean shouldPreserveRecordMetadata) throws IOException { + if (shouldPreserveRecordMetadata) { + // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the + // file holding this record even in cases when overall metadata is preserved + fileWriter.writeAvro(key.getRecordKey(), rewriteRecordWithMetadata(avroRecord, newFilePath.getName())); + } else { + fileWriter.writeAvroWithMetadata(key, rewriteRecord(avroRecord)); + } + } + protected void writeIncomingRecords() throws IOException { // write out any pending records (this can happen when inserts are turned into updates) Iterator> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java index 931b08c2f..d6c1d1be4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java @@ -47,7 +47,7 @@ import java.util.Queue; */ public class HoodieSortedMergeHandle extends HoodieMergeHandle { - private Queue newRecordKeysSorted = new PriorityQueue<>(); + private final Queue newRecordKeysSorted = new PriorityQueue<>(); public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Iterator> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java index 9f749566b..1d1dd5c9b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java @@ -21,6 +21,7 @@ package org.apache.hudi.io.storage; import java.util.concurrent.atomic.AtomicLong; import org.apache.avro.generic.GenericRecord; import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.avro.generic.IndexedRecord; @@ -29,7 +30,7 @@ import java.io.IOException; public interface HoodieFileWriter { - void writeAvroWithMetadata(R newRecord, HoodieRecord record) throws IOException; + void writeAvroWithMetadata(HoodieKey key, R newRecord) throws IOException; boolean canWrite(); @@ -37,9 +38,9 @@ public interface HoodieFileWriter { void writeAvro(String key, R oldRecord) throws IOException; - default void prepRecordWithMetadata(R avroRecord, HoodieRecord record, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) { + default void prepRecordWithMetadata(HoodieKey key, R avroRecord, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) { String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement()); - HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName); + HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, key.getRecordKey(), key.getPartitionPath(), fileName); HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId); return; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java index 1642eb2c4..91f79cefa 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java @@ -18,16 +18,6 @@ package org.apache.hudi.io.storage; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.common.bloom.BloomFilter; -import org.apache.hudi.common.engine.TaskContextSupplier; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.fs.HoodieWrapperFileSystem; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; - import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; @@ -40,6 +30,15 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.io.Writable; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import java.io.DataInput; import java.io.DataOutput; @@ -111,13 +110,13 @@ public class HoodieHFileWriter implements HoodieFileWriter, Closeable { @@ -94,10 +95,10 @@ public class HoodieOrcWriter latestBaseFileNames = tableView.getLatestBaseFiles() + .map(BaseFile::getFileName) + .collect(Collectors.toSet()); + + Set metadataFilenameFieldRefs = dataSet.collectAsList().stream() + .map(row -> row.getAs(HoodieRecord.FILENAME_METADATA_FIELD)) + .collect(Collectors.toSet()); + + assertEquals(latestBaseFileNames, metadataFilenameFieldRefs); } }