Enabled deletes in merge_on_read
This commit is contained in:
committed by
vinoth chandar
parent
cf1dde0323
commit
616c9a68c3
@@ -16,7 +16,9 @@
|
||||
|
||||
package com.uber.hoodie;
|
||||
|
||||
import com.uber.hoodie.common.HoodieMergeOnReadTestUtils;
|
||||
import com.uber.hoodie.common.HoodieTestDataGenerator;
|
||||
import com.uber.hoodie.common.minicluster.HdfsTestService;
|
||||
import com.uber.hoodie.common.model.HoodieDataFile;
|
||||
import com.uber.hoodie.common.model.HoodieKey;
|
||||
import com.uber.hoodie.common.model.HoodieRecord;
|
||||
@@ -28,33 +30,55 @@ import com.uber.hoodie.common.table.TableFileSystemView;
|
||||
import com.uber.hoodie.common.table.timeline.HoodieInstant;
|
||||
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import com.uber.hoodie.common.util.HoodieAvroUtils;
|
||||
import com.uber.hoodie.config.HoodieCompactionConfig;
|
||||
import com.uber.hoodie.config.HoodieIndexConfig;
|
||||
import com.uber.hoodie.config.HoodieStorageConfig;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
|
||||
import com.uber.hoodie.index.HoodieIndex;
|
||||
import com.uber.hoodie.io.compact.HoodieCompactor;
|
||||
import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
|
||||
import com.uber.hoodie.table.HoodieTable;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.generic.GenericRecordBuilder;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
|
||||
import org.apache.hadoop.io.ArrayWritable;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.mapred.InputSplit;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.RecordReader;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static com.uber.hoodie.common.HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat;
|
||||
import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
@@ -66,6 +90,37 @@ public class TestMergeOnReadTable {
|
||||
private HoodieCompactor compactor;
|
||||
private FileSystem fs;
|
||||
|
||||
//NOTE : Be careful in using DFS (FileSystem.class) vs LocalFs(RawLocalFileSystem.class)
|
||||
//The implementation and gurantees of many API's differ, for example check rename(src,dst)
|
||||
private static MiniDFSCluster dfsCluster;
|
||||
private static DistributedFileSystem dfs;
|
||||
private static HdfsTestService hdfsTestService;
|
||||
|
||||
@AfterClass
|
||||
public static void cleanUp() throws Exception {
|
||||
if (hdfsTestService != null) {
|
||||
hdfsTestService.stop();
|
||||
dfsCluster.shutdown();;
|
||||
}
|
||||
FSUtils.setFs(null);
|
||||
FileSystem.closeAll();
|
||||
HoodieTestUtils.resetFS();
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpDFS() throws IOException {
|
||||
|
||||
FileSystem.closeAll();
|
||||
if (hdfsTestService == null) {
|
||||
hdfsTestService = new HdfsTestService();
|
||||
dfsCluster = hdfsTestService.start(true);
|
||||
// Create a temp folder as the base path
|
||||
dfs = dfsCluster.getFileSystem();
|
||||
}
|
||||
FSUtils.setFs(dfs);
|
||||
HoodieTestUtils.resetFS();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void init() throws IOException {
|
||||
this.fs = FSUtils.getFs();
|
||||
@@ -73,13 +128,16 @@ public class TestMergeOnReadTable {
|
||||
// Initialize a local spark env
|
||||
SparkConf sparkConf =
|
||||
new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
||||
.setAppName("TestHoodieCompactor").setMaster("local[4]");
|
||||
.setAppName("TestHoodieCompactor").setMaster("local[1]");
|
||||
jsc = new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
|
||||
jsc.hadoopConfiguration().addResource(FSUtils.getFs().getConf());
|
||||
|
||||
// Create a temp folder as the base path
|
||||
TemporaryFolder folder = new TemporaryFolder();
|
||||
folder.create();
|
||||
basePath = folder.getRoot().getAbsolutePath();
|
||||
dfs.mkdirs(new Path(basePath));
|
||||
FSUtils.setFs(dfs);
|
||||
HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ);
|
||||
|
||||
compactor = new HoodieRealtimeTableCompactor();
|
||||
@@ -98,7 +156,6 @@ public class TestMergeOnReadTable {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSimpleInsertAndUpdate() throws Exception {
|
||||
HoodieWriteConfig cfg = getConfig();
|
||||
@@ -182,13 +239,90 @@ public class TestMergeOnReadTable {
|
||||
assertEquals("Must contain 200 records", 200, readClient.readSince("000").count());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleInsertAndDelete() throws Exception {
|
||||
HoodieWriteConfig cfg = getConfig();
|
||||
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
|
||||
|
||||
/**
|
||||
* Write 1 (only inserts, written as parquet file)
|
||||
*/
|
||||
String newCommitTime = "001";
|
||||
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
|
||||
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
|
||||
|
||||
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
|
||||
assertNoWriteErrors(statuses);
|
||||
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath());
|
||||
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg);
|
||||
|
||||
Optional<HoodieInstant> deltaCommit =
|
||||
metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
|
||||
assertTrue(deltaCommit.isPresent());
|
||||
assertEquals("Delta commit should be 001", "001", deltaCommit.get().getTimestamp());
|
||||
|
||||
Optional<HoodieInstant> commit =
|
||||
metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
|
||||
assertFalse(commit.isPresent());
|
||||
|
||||
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
|
||||
TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient,
|
||||
hoodieTable.getCompletedCompactionCommitTimeline(), allFiles);
|
||||
Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
|
||||
assertTrue(!dataFilesToRead.findAny().isPresent());
|
||||
|
||||
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
|
||||
dataFilesToRead = roView.getLatestDataFiles();
|
||||
assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
|
||||
dataFilesToRead.findAny().isPresent());
|
||||
|
||||
/**
|
||||
* Write 2 (only inserts, written to .log file)
|
||||
*/
|
||||
newCommitTime = "002";
|
||||
records = dataGen.generateInserts(newCommitTime, 20);
|
||||
writeRecords = jsc.parallelize(records, 1);
|
||||
statuses = client.upsert(writeRecords, newCommitTime).collect();
|
||||
assertNoWriteErrors(statuses);
|
||||
|
||||
/**
|
||||
* Write 2 (only deletes, written to .log file)
|
||||
*/
|
||||
newCommitTime = "004";
|
||||
List<HoodieRecord> fewRecordsForDelete = dataGen.generateDeletesFromExistingRecords(records);
|
||||
|
||||
statuses = client.upsert(jsc.parallelize(fewRecordsForDelete, 1), newCommitTime).collect();
|
||||
// Verify there are no errors
|
||||
assertNoWriteErrors(statuses);
|
||||
|
||||
metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath());
|
||||
deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
|
||||
assertTrue(deltaCommit.isPresent());
|
||||
assertEquals("Latest Delta commit should be 004", "004", deltaCommit.get().getTimestamp());
|
||||
|
||||
commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
|
||||
assertFalse(commit.isPresent());
|
||||
|
||||
allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath());
|
||||
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
|
||||
dataFilesToRead = roView.getLatestDataFiles();
|
||||
assertTrue(dataFilesToRead.findAny().isPresent());
|
||||
|
||||
List<String> dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
|
||||
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles);
|
||||
//Wrote 40 records and deleted 20 records, so remaining 40-20 = 20
|
||||
assertEquals("Must contain 20 records", 20, recordsRead.size());
|
||||
}
|
||||
|
||||
private HoodieWriteConfig getConfig() {
|
||||
return getConfigBuilder().build();
|
||||
}
|
||||
|
||||
private HoodieWriteConfig.Builder getConfigBuilder() {
|
||||
return HoodieWriteConfig.newBuilder().withPath(basePath)
|
||||
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
|
||||
.withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
|
||||
.withCompactionConfig(
|
||||
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
|
||||
.withInlineCompaction(false).build())
|
||||
@@ -204,7 +338,4 @@ public class TestMergeOnReadTable {
|
||||
assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,98 @@
|
||||
/*
|
||||
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.uber.hoodie.common;
|
||||
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import com.uber.hoodie.common.util.HoodieAvroUtils;
|
||||
import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.generic.GenericRecordBuilder;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
|
||||
import org.apache.hadoop.io.ArrayWritable;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.mapred.InputSplit;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.RecordReader;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
|
||||
|
||||
/**
|
||||
* Utility methods to aid in testing MergeOnRead (workaround for HoodieReadClient for MOR)
|
||||
*/
|
||||
public class HoodieMergeOnReadTestUtils {
|
||||
|
||||
public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths) throws IOException {
|
||||
JobConf jobConf = new JobConf();
|
||||
Schema schema = HoodieAvroUtils.addMetadataFields(Schema.parse(TRIP_EXAMPLE_SCHEMA));
|
||||
HoodieRealtimeInputFormat inputFormat = new HoodieRealtimeInputFormat();
|
||||
setPropsForInputFormat(inputFormat, jobConf, schema);
|
||||
return inputPaths.stream().map(path -> {
|
||||
setInputPath(jobConf, path);
|
||||
List<GenericRecord> records = new ArrayList<>();
|
||||
try {
|
||||
List<InputSplit> splits = Arrays.asList(inputFormat.getSplits(jobConf, 1));
|
||||
RecordReader recordReader = inputFormat.getRecordReader(splits.get(0), jobConf, null);
|
||||
Void key = (Void) recordReader.createKey();
|
||||
ArrayWritable writable = (ArrayWritable) recordReader.createValue();
|
||||
while (recordReader.next(key, writable)) {
|
||||
GenericRecordBuilder newRecord = new GenericRecordBuilder(schema);
|
||||
// writable returns an array with [field1, field2, _hoodie_commit_time, _hoodie_commit_seqno]
|
||||
Writable[] values = writable.get();
|
||||
schema.getFields().forEach(field -> {
|
||||
newRecord.set(field, values[2]);
|
||||
});
|
||||
records.add(newRecord.build());
|
||||
}
|
||||
} catch (IOException ie) {
|
||||
ie.printStackTrace();
|
||||
}
|
||||
return records;
|
||||
}).reduce((a, b) -> {
|
||||
a.addAll(b);
|
||||
return a;
|
||||
}).get();
|
||||
}
|
||||
|
||||
private static void setPropsForInputFormat(HoodieRealtimeInputFormat inputFormat, JobConf jobConf, Schema schema) {
|
||||
List<Schema.Field> fields = schema.getFields();
|
||||
String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
|
||||
String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
|
||||
Configuration conf = FSUtils.getFs().getConf();
|
||||
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
|
||||
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
|
||||
jobConf.set("partition_columns", "datestr");
|
||||
conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
|
||||
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
|
||||
conf.set("partition_columns", "datestr");
|
||||
inputFormat.setConf(conf);
|
||||
jobConf.addResource(conf);
|
||||
}
|
||||
|
||||
private static void setInputPath(JobConf jobConf, String inputPath) {
|
||||
jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath);
|
||||
jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath);
|
||||
jobConf.set("map.input.dir", inputPath);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user