[HUDI-3432] Fixing restore with metadata enabled (#4849)

* Fixing restore with metadata enabled

* Fixing test failures
Sivabalan Narayanan
2022-02-21 07:55:30 -05:00
committed by GitHub
parent 76b6ad6491
commit 17cb5cb433
3 changed files with 20 additions and 10 deletions


@@ -577,6 +577,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
TimelineLayoutVersion.CURR_VERSION).build();
client = getHoodieWriteClient(newConfig);
client.savepoint("004", "user1","comment1");
client.restoreToInstant("004");
assertFalse(metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().isPresent());
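The assertion added here captures the behavior this patch fixes: after restoring to a savepointed instant, the rollback instants produced along the way should no longer linger on the table's active timeline. A minimal sketch of that flow in Scala, assuming client and metaClient are set up as in the surrounding test harness (HoodieClientTestBase) and "004" is a completed commit:

// Sketch only; client is a SparkRDDWriteClient and metaClient a
// HoodieTableMetaClient for the same table, as in the test harness.
client.savepoint("004", "user1", "comment1")  // pin instant 004 so restore can target it
client.restoreToInstant("004")                // restore the table (and its metadata table) to 004
// After the fix, restore cleans up its own rollback instants:
assert(!metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().isPresent)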


@@ -88,7 +88,8 @@
}
]
}
- ]
+ ],
+ "default" : null
},
{
"doc": "Metadata Index of column statistics for all data files in the user table",
@@ -163,7 +164,8 @@
}
]
}
- ]
+ ],
+ "default" : null
}
]
}
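Both hunks make the same one-line fix: the metadata-index fields are ["null", ...] unions, and a union field is only truly optional if the schema also declares "default": null. Without the default, Avro's schema resolution cannot fill the field when decoding records written before it existed, and GenericRecordBuilder refuses to build a record that leaves it unset. A hedged illustration in Scala with a stand-in schema (not the real HoodieMetadataRecord definition):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecordBuilder

// Stand-in schema: the second field mirrors the shape of the change above,
// a ["null", ...] union carrying "default": null.
val schema = new Schema.Parser().parse(
  """{"type": "record", "name": "Example", "fields": [
    |  {"name": "key", "type": "string"},
    |  {"name": "columnStats",
    |   "type": ["null", {"type": "map", "values": "string"}],
    |   "default": null}
    |]}""".stripMargin)

// With the default, the builder may leave the field unset; it comes out null.
val rec = new GenericRecordBuilder(schema).set("key", "k1").build()
assert(rec.get("columnStats") == null)
// Without "default": null, build() would instead throw AvroRuntimeException:
// "Field columnStats ... not set and has no default value".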


@@ -20,9 +20,8 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieDataSourceHelper._
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
@@ -30,7 +29,6 @@ import org.apache.hudi.config.HoodiePayloadConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.InternalRow
@@ -40,7 +38,6 @@ import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskCont
import java.io.Closeable
import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Try
@@ -135,6 +132,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
private var recordToLoad: InternalRow = _
override def hasNext: Boolean = {
if (logRecordsKeyIterator.hasNext) {
val curAvrokey = logRecordsKeyIterator.next()
@@ -192,7 +190,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
} else {
if (logRecordsKeyIterator.hasNext) {
val curAvrokey = logRecordsKeyIterator.next()
- val curAvroRecord =logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
+ val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
if (!curAvroRecord.isPresent) {
// delete record found, skipping
this.hasNext
@@ -318,14 +316,19 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
historyAvroRecord, tableAvroSchema, payloadProps)
}
}
}
}
private object HoodieMergeOnReadRDD {
val CONFIG_INSTANTIATION_LOCK = new Object()
def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = {
val fs = FSUtils.getFs(split.tablePath, config)
- HoodieMergedLogRecordScanner.newBuilder()
+ val partitionPath: String = if (split.logPaths.isEmpty || split.logPaths.get.asJava.isEmpty) {
+   null
+ } else {
+   new Path(split.logPaths.get.asJava.get(0)).getParent.getName
+ }
+ val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(split.tablePath)
.withLogFilePaths(split.logPaths.get.asJava)
@@ -343,6 +346,9 @@ private object HoodieMergeOnReadRDD {
.withSpillableMapBasePath(
config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
- .build()
+ if (partitionPath != null) {
+   logRecordScannerBuilder.withPartition(partitionPath)
+ }
+ logRecordScannerBuilder.build()
}
}
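This is the core change: scanLog previously built the merged log record scanner with no partition information; it now derives the partition from the parent directory of the first log file and passes it via withPartition, but only when the split actually has log files. As a design note, the null guard can also be expressed with Scala's Option. A sketch only, assuming split.logPaths is an Option[List[String]] as the diff suggests, and eliding the rest of the builder configuration shown above:

// Option-based shape of the same logic (sketch, not the committed code).
val partitionPathOpt: Option[String] =
  split.logPaths.flatMap(_.headOption).map(p => new Path(p).getParent.getName)

val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
  .withFileSystem(fs)
  .withBasePath(split.tablePath)
  // ... remaining configuration as in the hunks above ...

// Apply the optional step only when a log file (and hence a partition) exists.
partitionPathOpt.foreach(logRecordScannerBuilder.withPartition)
logRecordScannerBuilder.build()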