From 616c9a68c3168d927f08117014949df2f1a859ec Mon Sep 17 00:00:00 2001
From: Nishith Agarwal
Date: Fri, 7 Jul 2017 12:50:01 -0700
Subject: [PATCH] Enabled deletes in merge_on_read

---
 hoodie-client/pom.xml                              |  11 ++
 .../uber/hoodie/io/HoodieAppendHandle.java         |  27 +++-
 .../com/uber/hoodie/TestMergeOnReadTable.java      | 143 +++++++++++++++++-
 .../common/HoodieMergeOnReadTestUtils.java         |  98 ++++++++++++
 .../uber/hoodie/common/util/ParquetUtils.java      |   5 +-
 .../hoodie/common/model/HoodieTestUtils.java       |   4 +
 6 files changed, 275 insertions(+), 13 deletions(-)
 create mode 100644 hoodie-client/src/test/java/com/uber/hoodie/common/HoodieMergeOnReadTestUtils.java

diff --git a/hoodie-client/pom.xml b/hoodie-client/pom.xml
index 7712a2dc5..9b2a28318 100644
--- a/hoodie-client/pom.xml
+++ b/hoodie-client/pom.xml
@@ -177,6 +177,17 @@
       <version>1.10.19</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.uber.hoodie</groupId>
+      <artifactId>hoodie-hadoop-mr</artifactId>
+      <version>0.4.0-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <scope>test</scope>
+    </dependency>
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
index be7c7966a..914ff6f48 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
@@ -19,32 +19,36 @@ package com.uber.hoodie.io;
 import com.clearspring.analytics.util.Lists;
 import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.model.HoodieDeltaWriteStat;
+import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordLocation;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.common.table.log.HoodieLogFormat;
 import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
 import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
-import com.uber.hoodie.common.model.HoodieLogFile;
+import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.HoodieAvroUtils;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.HoodieAppendException;
 import com.uber.hoodie.exception.HoodieUpsertException;
 import com.uber.hoodie.table.HoodieTable;
-import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.TaskContext;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * IO Operation to append data onto an existing file.
@@ -151,11 +155,22 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
   }
 
   public void doAppend() {
-    List<IndexedRecord> recordItr =
-        records.stream().map(this::getIndexedRecord).filter(Optional::isPresent)
-            .map(Optional::get).collect(Collectors.toList());
+
+    List<IndexedRecord> recordList = new ArrayList<>();
+    List<String> keysToDelete = new ArrayList<>();
+    records.stream().forEach(record -> {
+      Optional<IndexedRecord> indexedRecord = getIndexedRecord(record);
+      if (indexedRecord.isPresent()) {
+        recordList.add(indexedRecord.get());
+      } else {
+        keysToDelete.add(record.getRecordKey());
+      }
+    });
     try {
-      writer = writer.appendBlock(new HoodieAvroDataBlock(recordItr, schema));
+      writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, schema));
+      if (keysToDelete.size() > 0) {
+        writer = writer.appendBlock(new HoodieDeleteBlock(keysToDelete.stream().toArray(String[]::new)));
+      }
     } catch (Exception e) {
       throw new HoodieAppendException(
           "Failed while appending records to " + currentLogFile.getPath(), e);
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java
index adf5c3a7e..63ac22f3d 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java
@@ -16,7 +16,9 @@
 package com.uber.hoodie;
 
+import com.uber.hoodie.common.HoodieMergeOnReadTestUtils;
 import com.uber.hoodie.common.HoodieTestDataGenerator;
+import com.uber.hoodie.common.minicluster.HdfsTestService;
 import com.uber.hoodie.common.model.HoodieDataFile;
 import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodieRecord;
@@ -28,33 +30,55 @@ import com.uber.hoodie.common.table.TableFileSystemView;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
 import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.HoodieAvroUtils;
 import com.uber.hoodie.config.HoodieCompactionConfig;
 import com.uber.hoodie.config.HoodieIndexConfig;
 import com.uber.hoodie.config.HoodieStorageConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
 import com.uber.hoodie.index.HoodieIndex;
 import com.uber.hoodie.io.compact.HoodieCompactor;
 import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
 import com.uber.hoodie.table.HoodieTable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static com.uber.hoodie.common.HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat;
+import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -66,6 +90,37 @@ public class TestMergeOnReadTable {
   private HoodieCompactor compactor;
   private FileSystem fs;
 
+  //NOTE : Be careful in using DFS (FileSystem.class) vs LocalFs (RawLocalFileSystem.class)
+  //The implementation and guarantees of many APIs differ, for example check rename(src, dst)
+  private static MiniDFSCluster dfsCluster;
+  private static DistributedFileSystem dfs;
+  private static HdfsTestService hdfsTestService;
+
+  @AfterClass
+  public static void cleanUp() throws Exception {
+    if (hdfsTestService != null) {
+      hdfsTestService.stop();
+      dfsCluster.shutdown();
+    }
+    FSUtils.setFs(null);
+    FileSystem.closeAll();
+    HoodieTestUtils.resetFS();
+  }
+
+  @BeforeClass
+  public static void setUpDFS() throws IOException {
+
+    FileSystem.closeAll();
+    if (hdfsTestService == null) {
+      hdfsTestService = new HdfsTestService();
+      dfsCluster = hdfsTestService.start(true);
+      // Create a temp folder as the base path
+      dfs = dfsCluster.getFileSystem();
+    }
+    FSUtils.setFs(dfs);
+    HoodieTestUtils.resetFS();
+  }
+
   @Before
   public void init() throws IOException {
     this.fs = FSUtils.getFs();
@@ -73,13 +128,16 @@ public class TestMergeOnReadTable {
 
     // Initialize a local spark env
     SparkConf sparkConf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-        .setAppName("TestHoodieCompactor").setMaster("local[4]");
+        .setAppName("TestHoodieCompactor").setMaster("local[1]");
     jsc = new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
+    jsc.hadoopConfiguration().addResource(FSUtils.getFs().getConf());
 
     // Create a temp folder as the base path
     TemporaryFolder folder = new TemporaryFolder();
     folder.create();
     basePath = folder.getRoot().getAbsolutePath();
+    dfs.mkdirs(new Path(basePath));
+    FSUtils.setFs(dfs);
     HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ);
 
     compactor = new HoodieRealtimeTableCompactor();
@@ -98,7 +156,6 @@ public class TestMergeOnReadTable {
     }
   }
 
-
   @Test
   public void testSimpleInsertAndUpdate() throws Exception {
     HoodieWriteConfig cfg = getConfig();
@@ -182,13 +239,90 @@
     assertEquals("Must contain 200 records", 200, readClient.readSince("000").count());
   }
 
+  @Test
+  public void testSimpleInsertAndDelete() throws Exception {
+    HoodieWriteConfig cfg = getConfig();
+    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
+
+    /**
+     * Write 1 (only inserts, written as parquet file)
+     */
+    String newCommitTime = "001";
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+    List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
+    assertNoWriteErrors(statuses);
+
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath());
+    HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg);
+
+    Optional<HoodieInstant> deltaCommit =
+        metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
+    assertTrue(deltaCommit.isPresent());
assertEquals("Delta commit should be 001", "001", deltaCommit.get().getTimestamp()); + + Optional commit = + metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + assertFalse(commit.isPresent()); + + FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); + TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient, + hoodieTable.getCompletedCompactionCommitTimeline(), allFiles); + Stream dataFilesToRead = roView.getLatestDataFiles(); + assertTrue(!dataFilesToRead.findAny().isPresent()); + + roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles); + dataFilesToRead = roView.getLatestDataFiles(); + assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit", + dataFilesToRead.findAny().isPresent()); + + /** + * Write 2 (only inserts, written to .log file) + */ + newCommitTime = "002"; + records = dataGen.generateInserts(newCommitTime, 20); + writeRecords = jsc.parallelize(records, 1); + statuses = client.upsert(writeRecords, newCommitTime).collect(); + assertNoWriteErrors(statuses); + + /** + * Write 2 (only deletes, written to .log file) + */ + newCommitTime = "004"; + List fewRecordsForDelete = dataGen.generateDeletesFromExistingRecords(records); + + statuses = client.upsert(jsc.parallelize(fewRecordsForDelete, 1), newCommitTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + + metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath()); + deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant(); + assertTrue(deltaCommit.isPresent()); + assertEquals("Latest Delta commit should be 004", "004", deltaCommit.get().getTimestamp()); + + commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + assertFalse(commit.isPresent()); + + allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath()); + roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles); + dataFilesToRead = roView.getLatestDataFiles(); + assertTrue(dataFilesToRead.findAny().isPresent()); + + List dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList()); + List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles); + //Wrote 40 records and deleted 20 records, so remaining 40-20 = 20 + assertEquals("Must contain 20 records", 20, recordsRead.size()); + } + private HoodieWriteConfig getConfig() { return getConfigBuilder().build(); } private HoodieWriteConfig.Builder getConfigBuilder() { return HoodieWriteConfig.newBuilder().withPath(basePath) - .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + .withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withCompactionConfig( HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024) .withInlineCompaction(false).build()) @@ -204,7 +338,4 @@ public class TestMergeOnReadTable { assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors()); } } - - - } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieMergeOnReadTestUtils.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieMergeOnReadTestUtils.java new file mode 100644 index 000000000..4b2424eb7 --- /dev/null +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieMergeOnReadTestUtils.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common;
+
+import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.HoodieAvroUtils;
+import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+
+/**
+ * Utility methods to aid in testing MergeOnRead (workaround for HoodieReadClient for MOR)
+ */
+public class HoodieMergeOnReadTestUtils {
+
+  public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths) throws IOException {
+    JobConf jobConf = new JobConf();
+    Schema schema = HoodieAvroUtils.addMetadataFields(Schema.parse(TRIP_EXAMPLE_SCHEMA));
+    HoodieRealtimeInputFormat inputFormat = new HoodieRealtimeInputFormat();
+    setPropsForInputFormat(inputFormat, jobConf, schema);
+    return inputPaths.stream().map(path -> {
+      setInputPath(jobConf, path);
+      List<GenericRecord> records = new ArrayList<>();
+      try {
+        List<InputSplit> splits = Arrays.asList(inputFormat.getSplits(jobConf, 1));
+        RecordReader recordReader = inputFormat.getRecordReader(splits.get(0), jobConf, null);
+        Void key = (Void) recordReader.createKey();
+        ArrayWritable writable = (ArrayWritable) recordReader.createValue();
+        while (recordReader.next(key, writable)) {
+          GenericRecordBuilder newRecord = new GenericRecordBuilder(schema);
+          // writable returns an array with [field1, field2, _hoodie_commit_time, _hoodie_commit_seqno]
+          Writable[] values = writable.get();
+          schema.getFields().forEach(field -> {
+            newRecord.set(field, values[2]);
+          });
+          records.add(newRecord.build());
+        }
+      } catch (IOException ie) {
+        ie.printStackTrace();
+      }
+      return records;
+    }).reduce((a, b) -> {
+      a.addAll(b);
+      return a;
+    }).get();
+  }
+
+  private static void setPropsForInputFormat(HoodieRealtimeInputFormat inputFormat, JobConf jobConf, Schema schema) {
+    List<Schema.Field> fields = schema.getFields();
+    String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
+    String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
+    Configuration conf = FSUtils.getFs().getConf();
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
+    jobConf.set("partition_columns", "datestr");
+    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
+    conf.set("partition_columns", "datestr");
+    inputFormat.setConf(conf);
+    jobConf.addResource(conf);
+  }
+
+  private static void setInputPath(JobConf jobConf, String inputPath) {
+    jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath);
+    jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath);
+    jobConf.set("map.input.dir", inputPath);
+  }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java
index cdb6baa55..933de8c53 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java
@@ -40,6 +40,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static com.uber.hoodie.common.util.FSUtils.getFs;
+
 /**
  * Utility functions involving parquet.
  */
@@ -52,6 +54,7 @@ public class ParquetUtils {
    */
   public static Set<String> readRowKeysFromParquet(Path filePath) {
     Configuration conf = new Configuration();
+    conf.addResource(getFs().getConf());
     Schema readSchema = HoodieAvroUtils.getRecordKeySchema();
     AvroReadSupport.setAvroReadSchema(conf, readSchema);
     AvroReadSupport.setRequestedProjection(conf, readSchema);
@@ -97,7 +100,7 @@ public class ParquetUtils {
     ParquetMetadata footer;
     try {
       // TODO(vc): Should we use the parallel reading version here?
-      footer = ParquetFileReader.readFooter(conf, parquetFilePath);
+      footer = ParquetFileReader.readFooter(getFs().getConf(), parquetFilePath);
     } catch (IOException e) {
       throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath, e);
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
index 8771a7bb5..640f68029 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
@@ -64,6 +64,10 @@ public class HoodieTestUtils {
   public static final String RAW_TRIPS_TEST_NAME = "raw_trips";
   public static final int DEFAULT_TASK_PARTITIONID = 1;
 
+  public static void resetFS() {
+    HoodieTestUtils.fs = FSUtils.getFs();
+  }
+
   public static HoodieTableMetaClient init(String basePath) throws IOException {
     return initTableType(basePath, HoodieTableType.COPY_ON_WRITE);
   }
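Usage sketch (illustrative only, reusing the objects defined in the new testSimpleInsertAndDelete
above): from the write client's point of view a delete on a MERGE_ON_READ table is just another
upsert. What marks a record as a delete is that getIndexedRecord(record) in HoodieAppendHandle
comes back empty for it, so doAppend() collects its key and flushes a HoodieDeleteBlock into the
same log file as the HoodieAvroDataBlock.

    // "client", "dataGen", "jsc" and "records" are the test's own objects;
    // "004" is the commit time the test uses for the delete write.
    List<HoodieRecord> toDelete = dataGen.generateDeletesFromExistingRecords(records);
    List<WriteStatus> statuses = client.upsert(jsc.parallelize(toDelete, 1), "004").collect();
    assertNoWriteErrors(statuses);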