diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java index b1c5531a2..49cc25b89 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java @@ -67,7 +67,6 @@ import java.util.stream.Collectors; import scala.Tuple2; import scala.Tuple3; -import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath; import static org.apache.hudi.common.util.ValidationUtils.checkArgument; /** @@ -221,7 +220,6 @@ public class HoodieLogFileCommand implements CommandMarker { .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue()) .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()) .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()) - .withPartition(getRelativePartitionPath(new Path(client.getBasePath()), new Path(logFilePaths.get(0)).getParent())) .build(); for (HoodieRecord hoodieRecord : scanner) { Option record = hoodieRecord.getData().getInsertValue(readerSchema); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java index 621061ae7..ee7fbda11 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java @@ -65,7 +65,6 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath; import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -204,7 +203,6 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness { // get expected result of 10 records. List logFilePaths = Arrays.stream(fs.globStatus(new Path(partitionPath + "/*"))) .map(status -> status.getPath().toString()).collect(Collectors.toList()); - assertTrue(logFilePaths.size() > 0); HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() .withFileSystem(fs) .withBasePath(tablePath) @@ -223,7 +221,6 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness { .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue()) .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()) .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()) - .withPartition(getRelativePartitionPath(new Path(tablePath), new Path(logFilePaths.get(0)).getParent())) .build(); Iterator> records = scanner.iterator(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index ed1873644..e3d8554d0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.HoodieRecordSizeEstimator; import org.apache.hudi.common.util.HoodieTimer; @@ -37,6 +38,8 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; import org.apache.hudi.internal.schema.InternalSchema; + +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -45,6 +48,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath; import static org.apache.hudi.common.util.ValidationUtils.checkState; /** @@ -310,6 +314,9 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader @Override public HoodieMergedLogRecordScanner build() { + if (this.partitionName == null && CollectionUtils.nonEmpty(this.logFilePaths)) { + this.partitionName = getRelativePartitionPath(new Path(basePath), new Path(this.logFilePaths.get(0)).getParent()); + } return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader, bufferSize, spillableMapBasePath, instantRange, diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index 536fec609..4fa53bb41 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -89,7 +89,6 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath; import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -575,13 +574,12 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { writer.close(); FileCreateUtils.createDeltaCommit(basePath, "100", fs); // scan all log blocks (across multiple log files) - List logFilePaths = logFiles.stream() - .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()); - assertTrue(logFilePaths.size() > 0); HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() .withFileSystem(fs) .withBasePath(basePath) - .withLogFilePaths(logFilePaths) + .withLogFilePaths( + logFiles.stream() + .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList())) .withReaderSchema(schema) .withLatestInstantTime("100") .withMaxMemorySizeInBytes(10240L) @@ -591,7 +589,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withSpillableMapBasePath(BASE_OUTPUT_PATH) .withDiskMapType(diskMapType) .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) - .withPartition(getRelativePartitionPath(new Path(basePath), new Path(logFilePaths.get(0)).getParent())) .build(); List scannedRecords = new ArrayList<>(); @@ -806,7 +803,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withSpillableMapBasePath(BASE_OUTPUT_PATH) .withDiskMapType(diskMapType) .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) - .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent())) .build(); assertEquals(200, scanner.getTotalLogRecords()); Set readKeys = new HashSet<>(200); @@ -885,7 +881,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withSpillableMapBasePath(BASE_OUTPUT_PATH) .withDiskMapType(diskMapType) .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) - .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent())) .build(); assertEquals(200, scanner.getTotalLogRecords(), "We read 200 records from 2 write batches"); Set readKeys = new HashSet<>(200); @@ -973,7 +968,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withSpillableMapBasePath(BASE_OUTPUT_PATH) .withDiskMapType(diskMapType) .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) - .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent())) .build(); assertEquals(200, scanner.getTotalLogRecords(), "We would read 200 records"); Set readKeys = new HashSet<>(200); @@ -1052,7 +1046,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withSpillableMapBasePath(BASE_OUTPUT_PATH) .withDiskMapType(diskMapType) .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) - .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent())) .build(); assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records"); @@ -1099,7 +1092,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withSpillableMapBasePath(BASE_OUTPUT_PATH) .withDiskMapType(diskMapType) .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) - .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent())) .build(); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete"); @@ -1195,7 +1187,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withSpillableMapBasePath(BASE_OUTPUT_PATH) .withDiskMapType(diskMapType) .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) - .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent())) .build(); assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records"); @@ -1299,7 +1290,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withSpillableMapBasePath(BASE_OUTPUT_PATH) .withDiskMapType(diskMapType) .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) - .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent())) .build(); assertEquals(0, scanner.getTotalLogRecords(), "We would have scanned 0 records because of rollback"); @@ -1368,7 +1358,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withSpillableMapBasePath(BASE_OUTPUT_PATH) .withDiskMapType(diskMapType) .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) - .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent())) .build(); assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records"); FileCreateUtils.deleteDeltaCommit(basePath, "100", fs); @@ -1420,7 +1409,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withSpillableMapBasePath(BASE_OUTPUT_PATH) .withDiskMapType(diskMapType) .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) - .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent())) .build(); assertEquals(100, scanner.getTotalLogRecords(), "We still would read 100 records"); final List readKeys = new ArrayList<>(100); @@ -1491,7 +1479,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withSpillableMapBasePath(BASE_OUTPUT_PATH) .withDiskMapType(diskMapType) .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) - .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent())) .build(); assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records"); } @@ -1598,7 +1585,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withSpillableMapBasePath(BASE_OUTPUT_PATH) .withDiskMapType(diskMapType) .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) - .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent())) .build(); assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records"); FileCreateUtils.deleteDeltaCommit(basePath, "100", fs); @@ -1673,7 +1659,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withSpillableMapBasePath(BASE_OUTPUT_PATH) .withDiskMapType(diskMapType) .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) - .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent())) .build(); assertEquals(Math.max(numRecordsInLog1, numRecordsInLog2), scanner.getNumMergedRecordsInLog(), diff --git a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java index 97a682c3a..67691a3ec 100644 --- a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java +++ b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java @@ -57,8 +57,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import static junit.framework.TestCase.assertEquals; -import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath; -import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -348,7 +346,7 @@ public class TestQuickstartData { List logPaths, Schema readSchema, String instant) { - HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder() + return HoodieMergedLogRecordScanner.newBuilder() .withFileSystem(fs) .withBasePath(basePath) .withLogFilePaths(logPaths) @@ -360,12 +358,8 @@ public class TestQuickstartData { .withMaxMemorySizeInBytes(1024 * 1024L) .withSpillableMapBasePath("/tmp/") .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()) - .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()); - if (!isNullOrEmpty(logPaths)) { - logRecordScannerBuilder - .withPartition(getRelativePartitionPath(new Path(basePath), new Path(logPaths.get(0)).getParent())); - } - return logRecordScannerBuilder.build(); + .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()) + .build(); } /** diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index f01993edc..fce9b75f7 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.format; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; @@ -42,7 +43,6 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.types.RowKind; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import java.util.ArrayList; import java.util.Arrays; @@ -52,10 +52,6 @@ import java.util.Locale; import java.util.Map; import java.util.function.Function; -import static org.apache.hudi.common.fs.FSUtils.getFs; -import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath; -import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty; - /** * Utilities for format. */ @@ -128,13 +124,11 @@ public class FormatUtils { Schema logSchema, Configuration config, boolean withOperationField) { - String basePath = split.getTablePath(); - List logPaths = split.getLogPaths().get(); - FileSystem fs = getFs(basePath, config); - HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder() + FileSystem fs = FSUtils.getFs(split.getTablePath(), config); + return HoodieMergedLogRecordScanner.newBuilder() .withFileSystem(fs) - .withBasePath(basePath) - .withLogFilePaths(logPaths) + .withBasePath(split.getTablePath()) + .withLogFilePaths(split.getLogPaths().get()) .withReaderSchema(logSchema) .withLatestInstantTime(split.getLatestCommit()) .withReadBlocksLazily( @@ -150,12 +144,8 @@ public class FormatUtils { config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)) .withInstantRange(split.getInstantRange()) - .withOperationField(withOperationField); - if (!isNullOrEmpty(logPaths)) { - logRecordScannerBuilder - .withPartition(getRelativePartitionPath(new Path(basePath), new Path(logPaths.get(0)).getParent())); - } - return logRecordScannerBuilder.build(); + .withOperationField(withOperationField) + .build(); } private static HoodieUnMergedLogRecordScanner unMergedLogScanner( @@ -163,7 +153,7 @@ public class FormatUtils { Schema logSchema, Configuration config, HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) { - FileSystem fs = getFs(split.getTablePath(), config); + FileSystem fs = FSUtils.getFs(split.getTablePath(), config); return HoodieUnMergedLogRecordScanner.newBuilder() .withFileSystem(fs) .withBasePath(split.getTablePath()) @@ -244,8 +234,8 @@ public class FormatUtils { HoodieWriteConfig writeConfig, Configuration hadoopConf) { String basePath = writeConfig.getBasePath(); - HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder() - .withFileSystem(getFs(basePath, hadoopConf)) + return HoodieMergedLogRecordScanner.newBuilder() + .withFileSystem(FSUtils.getFs(basePath, hadoopConf)) .withBasePath(basePath) .withLogFilePaths(logPaths) .withReaderSchema(logSchema) @@ -256,12 +246,8 @@ public class FormatUtils { .withMaxMemorySizeInBytes(writeConfig.getMaxMemoryPerPartitionMerge()) .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath()) .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType()) - .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled()); - if (!isNullOrEmpty(logPaths)) { - logRecordScannerBuilder - .withPartition(getRelativePartitionPath(new Path(basePath), new Path(logPaths.get(0)).getParent())); - } - return logRecordScannerBuilder.build(); + .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled()) + .build(); } private static Boolean string2Boolean(String s) { diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index f2439b447..c1e924056 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -67,8 +67,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import static junit.framework.TestCase.assertEquals; -import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath; -import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -663,7 +661,7 @@ public class TestData { List logPaths, Schema readSchema, String instant) { - HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder() + return HoodieMergedLogRecordScanner.newBuilder() .withFileSystem(fs) .withBasePath(basePath) .withLogFilePaths(logPaths) @@ -675,12 +673,8 @@ public class TestData { .withMaxMemorySizeInBytes(1024 * 1024L) .withSpillableMapBasePath("/tmp/") .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()) - .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()); - if (!isNullOrEmpty(logPaths)) { - logRecordScannerBuilder - .withPartition(getRelativePartitionPath(new Path(basePath), new Path(logPaths.get(0)).getParent())); - } - return logRecordScannerBuilder.build(); + .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()) + .build(); } /** diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index 9618f5f7c..b917f004b 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -18,6 +18,12 @@ package org.apache.hudi.hadoop.realtime; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.fs.FSUtils; @@ -29,27 +35,15 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; - -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Set; -import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath; -import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty; - class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader implements RecordReader { @@ -83,11 +77,10 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader // NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit // but can return records for completed commits > the commit we are trying to read (if using // readCommit() API) - List logPaths = split.getDeltaLogPaths(); - HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder() + return HoodieMergedLogRecordScanner.newBuilder() .withFileSystem(FSUtils.getFs(split.getPath().toString(), jobConf)) .withBasePath(split.getBasePath()) - .withLogFilePaths(logPaths) + .withLogFilePaths(split.getDeltaLogPaths()) .withReaderSchema(usesCustomPayload ? getWriterSchema() : getReaderSchema()) .withLatestInstantTime(split.getMaxCommitTime()) .withMaxMemorySizeInBytes(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf)) @@ -97,12 +90,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader .withSpillableMapBasePath(jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)) .withDiskMapType(jobConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())) .withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), - HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())); - if (!isNullOrEmpty(logPaths)) { - logRecordScannerBuilder - .withPartition(getRelativePartitionPath(new Path(split.getBasePath()), new Path(logPaths.get(0)).getParent())); - } - return logRecordScannerBuilder.build(); + HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())) + .build(); } private Option buildGenericRecordwithCustomPayload(HoodieRecord record) throws IOException {