From 17cb5cb43377746763722a8f96741e7d521a702d Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Mon, 21 Feb 2022 07:55:30 -0500 Subject: [PATCH] [HUDI-3432] Fixing restore with metadata enabled (#4849) * Fixing restore with metadata enabled * Fixing test failures --- .../TestHoodieClientOnCopyOnWriteStorage.java | 2 ++ hudi-common/src/main/avro/HoodieMetadata.avsc | 6 +++-- .../apache/hudi/HoodieMergeOnReadRDD.scala | 22 ++++++++++++------- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index a1d7569a1..6ab33b422 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -577,6 +577,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { TimelineLayoutVersion.CURR_VERSION).build(); client = getHoodieWriteClient(newConfig); + client.savepoint("004", "user1","comment1"); + client.restoreToInstant("004"); assertFalse(metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().isPresent()); diff --git a/hudi-common/src/main/avro/HoodieMetadata.avsc b/hudi-common/src/main/avro/HoodieMetadata.avsc index 6b18f0333..4037dd0f1 100644 --- a/hudi-common/src/main/avro/HoodieMetadata.avsc +++ b/hudi-common/src/main/avro/HoodieMetadata.avsc @@ -88,7 +88,8 @@ } ] } - ] + ], + "default" : null }, { "doc": "Metadata Index of column statistics for all data files in the user table", @@ -163,7 +164,8 @@ } ] } - ] + ], + "default" : null } ] } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 4c3d30bd4..8903ee6a4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -20,9 +20,8 @@ package org.apache.hudi import org.apache.avro.Schema import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} - import org.apache.hadoop.conf.Configuration - +import org.apache.hadoop.fs.Path import org.apache.hudi.HoodieDataSourceHelper._ import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner @@ -30,7 +29,6 @@ import org.apache.hudi.config.HoodiePayloadConfig import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.config.HoodieRealtimeConfig import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS - import org.apache.spark.rdd.RDD import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer} import org.apache.spark.sql.catalyst.InternalRow @@ -40,7 +38,6 @@ import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskCont import java.io.Closeable import java.util.Properties - import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Try @@ -135,6 +132,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala private var recordToLoad: InternalRow = _ + override def hasNext: Boolean = { if (logRecordsKeyIterator.hasNext) { val curAvrokey = logRecordsKeyIterator.next() @@ -192,7 +190,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, } else { if (logRecordsKeyIterator.hasNext) { val curAvrokey = logRecordsKeyIterator.next() - val curAvroRecord =logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps) + val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps) if (!curAvroRecord.isPresent) { // delete record found, skipping this.hasNext @@ -318,14 +316,19 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, historyAvroRecord, tableAvroSchema, payloadProps) } } - } +} private object HoodieMergeOnReadRDD { val CONFIG_INSTANTIATION_LOCK = new Object() def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = { val fs = FSUtils.getFs(split.tablePath, config) - HoodieMergedLogRecordScanner.newBuilder() + val partitionPath: String = if (split.logPaths.isEmpty || split.logPaths.get.asJava.isEmpty) { + null + } else { + new Path(split.logPaths.get.asJava.get(0)).getParent.getName + } + val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder() .withFileSystem(fs) .withBasePath(split.tablePath) .withLogFilePaths(split.logPaths.get.asJava) @@ -343,6 +346,9 @@ private object HoodieMergeOnReadRDD { .withSpillableMapBasePath( config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)) - .build() + if (partitionPath != null) { + logRecordScannerBuilder.withPartition(partitionPath) + } + logRecordScannerBuilder.build() } }