[HUDI-2105] Compaction Failed For MergeInto MOR Table (#3190)
@@ -39,6 +39,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
@@ -94,6 +95,18 @@ public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
       return jsc.emptyRDD();
     }
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+    TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
+
+    // Here we first use the table schema as the reader schema to read the
+    // log files. That is because in the case of MergeInto, config.getSchema()
+    // may not be the same as the table schema.
+    try {
+      Schema readerSchema = schemaUtil.getTableAvroSchema(false);
+      config.setSchema(readerSchema.toString());
+    } catch (Exception e) {
+      // If there is no commit in the table yet, just ignore the exception.
+    }
+
     // Compacting is very similar to applying updates to existing file
     HoodieSparkCopyOnWriteTable table = new HoodieSparkCopyOnWriteTable(config, context, metaClient);
     List<CompactionOperation> operations = compactionPlan.getOperations().stream()
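
The comment block above is the heart of the fix: delta log blocks were written with the table schema, and binary Avro carries no per-field tags, so decoding them with a drifted MergeInto write schema misreads the payload instead of failing fast. A minimal, self-contained sketch of that failure mode using plain Avro (the toy Row schemas are illustrative, not Hudi's):

    import java.io.ByteArrayOutputStream;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.Decoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.Encoder;
    import org.apache.avro.io.EncoderFactory;

    public class SchemaMismatchSketch {
      public static void main(String[] args) throws Exception {
        // The schema the log block was written with (stand-in for the table schema).
        Schema tableSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}");
        // Same fields, different order: the kind of drift a MergeInto write schema can have.
        Schema mergeSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
                + "{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"int\"}]}");

        GenericRecord rec = new GenericData.Record(tableSchema);
        rec.put("id", 1);
        rec.put("name", "a1");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(tableSchema).write(rec, encoder);
        encoder.flush();

        // Decode with the drifted schema as if it were the writer schema: the bytes
        // of the int are read as a string length and vice versa, yielding garbage.
        Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord garbled = new GenericDatumReader<GenericRecord>(mergeSchema).read(null, decoder);
        System.out.println(garbled); // not {"id": 1, "name": "a1"}
      }
    }
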
@@ -108,7 +121,6 @@ public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
   private List<WriteStatus> compact(HoodieSparkCopyOnWriteTable hoodieCopyOnWriteTable, HoodieTableMetaClient metaClient,
                                     HoodieWriteConfig config, CompactionOperation operation, String instantTime) throws IOException {
     FileSystem fs = metaClient.getFs();
-
     Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
     LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames()
         + " for commit " + instantTime);
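
The hunk above also shows why resetting config.setSchema(...) earlier is sufficient: compact() rebuilds its reader schema from config.getSchema() and then widens it with Hudi's metadata columns, so whatever schema sits in the config is what every log block gets decoded with. A small sketch of that derivation, assuming only the Avro parser and Hudi's HoodieAvroUtils from org.apache.hudi.avro (the toy schema string is illustrative):

    import org.apache.avro.Schema;
    import org.apache.hudi.avro.HoodieAvroUtils;

    public class ReaderSchemaSketch {
      public static void main(String[] args) {
        // Stand-in for config.getSchema(): whatever schema string the write config holds.
        String configSchema = "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"ts\",\"type\":\"long\"}]}";
        // The same two steps as compact(): parse the configured schema, then add
        // Hudi's metadata fields (_hoodie_commit_time, _hoodie_record_key, ...).
        Schema readerSchema =
            HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(configSchema));
        readerSchema.getFields().forEach(f -> System.out.println(f.name()));
      }
    }
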

@@ -575,7 +575,6 @@ class TestMergeIntoTable extends TestHoodieSqlBase {
     checkAnswer(s"select id, name, price from $tableName")(
       Seq(1, "a1", 10.0)
     )
-
     spark.sql(
       s"""
          | merge into $tableName
@@ -593,4 +592,54 @@ class TestMergeIntoTable extends TestHoodieSqlBase {
       }
     }
   }
+
+  test("Test MergeInto For MOR With Compaction On") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | options (
+           |  primaryKey = 'id',
+           |  type = 'mor',
+           |  preCombineField = 'ts',
+           |  hoodie.compact.inline = 'true'
+           | )
+       """.stripMargin)
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
+      spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
+      checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+        Seq(1, "a1", 10.0, 1000),
+        Seq(2, "a2", 10.0, 1000),
+        Seq(3, "a3", 10.0, 1000),
+        Seq(4, "a4", 10.0, 1000)
+      )
+
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           |  select 4 as id, 'a4' as name, 11 as price, 1000 as ts
+           |) s0
+           |on h0.id = s0.id
+           |when matched then update set *
+           |""".stripMargin)
+
+      // 5 commits will trigger the inline compaction.
+      checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+        Seq(1, "a1", 10.0, 1000),
+        Seq(2, "a2", 10.0, 1000),
+        Seq(3, "a3", 10.0, 1000),
+        Seq(4, "a4", 11.0, 1000)
+      )
+    }
+  }
 }
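
A note on the "5 commits will trigger the inline compaction" comment: with hoodie.compact.inline = 'true', inline compaction runs once the number of delta commits reaches hoodie.compact.inline.max.delta.commits, which defaults to 5; hence the four inserts plus one merge in the test. A hedged sketch of pinning that trigger explicitly through Hudi's write-config builder (method names as recalled from the Java client API of this era; the base path is illustrative):

    import org.apache.hudi.config.HoodieCompactionConfig;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class InlineCompactionConfigSketch {
      public static void main(String[] args) {
        // Equivalent knobs to the SQL options in the test: inline compaction on,
        // triggered after 5 delta commits (the documented default).
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_mor_table")   // illustrative base path
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .withInlineCompaction(true)
                .withMaxNumDeltaCommitsBeforeCompaction(5)
                .build())
            .build();
        System.out.println(writeConfig.getProps());
      }
    }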