diff --git a/hoodie-client/pom.xml b/hoodie-client/pom.xml
index 7712a2dc5..9b2a28318 100644
--- a/hoodie-client/pom.xml
+++ b/hoodie-client/pom.xml
@@ -177,6 +177,17 @@
    <version>1.10.19</version>
    <scope>test</scope>
  </dependency>
+ <dependency>
+   <groupId>com.uber.hoodie</groupId>
+   <artifactId>hoodie-hadoop-mr</artifactId>
+   <version>0.4.0-SNAPSHOT</version>
+   <scope>test</scope>
+ </dependency>
+ <dependency>
+   <groupId>org.apache.hive</groupId>
+   <artifactId>hive-exec</artifactId>
+   <scope>test</scope>
+ </dependency>
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
index be7c7966a..914ff6f48 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
@@ -19,32 +19,36 @@ package com.uber.hoodie.io;
import com.clearspring.analytics.util.Lists;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieDeltaWriteStat;
+import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
-import com.uber.hoodie.common.model.HoodieLogFile;
+import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieAppendException;
import com.uber.hoodie.exception.HoodieUpsertException;
import com.uber.hoodie.table.HoodieTable;
-import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
/**
* IO Operation to append data onto an existing file.
@@ -151,11 +155,22 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOHandle<T>
}
public void doAppend() {
-    List<IndexedRecord> recordItr =
-        records.stream().map(this::getIndexedRecord).filter(Optional::isPresent)
-            .map(Optional::get).collect(Collectors.toList());
+
+    List<IndexedRecord> recordList = new ArrayList<>();
+    List<String> keysToDelete = new ArrayList<>();
+    records.stream().forEach(record -> {
+      Optional<IndexedRecord> indexedRecord = getIndexedRecord(record);
+      if (indexedRecord.isPresent()) {
+        recordList.add(indexedRecord.get());
+      } else {
+        keysToDelete.add(record.getRecordKey());
+      }
+    });
try {
- writer = writer.appendBlock(new HoodieAvroDataBlock(recordItr, schema));
+ writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, schema));
+      if (keysToDelete.size() > 0) {
+        writer = writer.appendBlock(new HoodieDeleteBlock(keysToDelete.stream().toArray(String[]::new)));
+      }
} catch (Exception e) {
throw new HoodieAppendException(
"Failed while appeding records to " + currentLogFile.getPath(), e);
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java
index adf5c3a7e..63ac22f3d 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java
@@ -16,7 +16,9 @@
package com.uber.hoodie;
+import com.uber.hoodie.common.HoodieMergeOnReadTestUtils;
import com.uber.hoodie.common.HoodieTestDataGenerator;
+import com.uber.hoodie.common.minicluster.HdfsTestService;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
@@ -28,33 +30,55 @@ import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.compact.HoodieCompactor;
import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
import com.uber.hoodie.table.HoodieTable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static com.uber.hoodie.common.HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat;
+import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -66,6 +90,37 @@ public class TestMergeOnReadTable {
private HoodieCompactor compactor;
private FileSystem fs;
+  // NOTE: Be careful when using DFS (FileSystem.class) vs LocalFs (RawLocalFileSystem.class).
+  // The implementations and guarantees of many APIs differ; for example, check rename(src, dst).
+ private static MiniDFSCluster dfsCluster;
+ private static DistributedFileSystem dfs;
+ private static HdfsTestService hdfsTestService;
+
+ @AfterClass
+ public static void cleanUp() throws Exception {
+ if (hdfsTestService != null) {
+ hdfsTestService.stop();
+      dfsCluster.shutdown();
+ }
+ FSUtils.setFs(null);
+ FileSystem.closeAll();
+ HoodieTestUtils.resetFS();
+ }
+
+ @BeforeClass
+ public static void setUpDFS() throws IOException {
+
+ FileSystem.closeAll();
+ if (hdfsTestService == null) {
+ hdfsTestService = new HdfsTestService();
+ dfsCluster = hdfsTestService.start(true);
+ // Create a temp folder as the base path
+ dfs = dfsCluster.getFileSystem();
+ }
+ FSUtils.setFs(dfs);
+ HoodieTestUtils.resetFS();
+ }
+
@Before
public void init() throws IOException {
this.fs = FSUtils.getFs();
@@ -73,13 +128,16 @@ public class TestMergeOnReadTable {
// Initialize a local spark env
SparkConf sparkConf =
new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .setAppName("TestHoodieCompactor").setMaster("local[4]");
+ .setAppName("TestHoodieCompactor").setMaster("local[1]");
jsc = new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
+ jsc.hadoopConfiguration().addResource(FSUtils.getFs().getConf());
// Create a temp folder as the base path
TemporaryFolder folder = new TemporaryFolder();
folder.create();
basePath = folder.getRoot().getAbsolutePath();
+ dfs.mkdirs(new Path(basePath));
+ FSUtils.setFs(dfs);
HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ);
compactor = new HoodieRealtimeTableCompactor();
@@ -98,7 +156,6 @@ public class TestMergeOnReadTable {
}
}
-
@Test
public void testSimpleInsertAndUpdate() throws Exception {
HoodieWriteConfig cfg = getConfig();
@@ -182,13 +239,90 @@ public class TestMergeOnReadTable {
assertEquals("Must contain 200 records", 200, readClient.readSince("000").count());
}
+ @Test
+ public void testSimpleInsertAndDelete() throws Exception {
+ HoodieWriteConfig cfg = getConfig();
+ HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
+
+ /**
+ * Write 1 (only inserts, written as parquet file)
+ */
+ String newCommitTime = "001";
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+    List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
+ assertNoWriteErrors(statuses);
+
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath());
+ HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg);
+
+    Optional<HoodieInstant> deltaCommit =
+        metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
+ assertTrue(deltaCommit.isPresent());
+ assertEquals("Delta commit should be 001", "001", deltaCommit.get().getTimestamp());
+
+    Optional<HoodieInstant> commit =
+        metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
+ assertFalse(commit.isPresent());
+
+ FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+ TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient,
+ hoodieTable.getCompletedCompactionCommitTimeline(), allFiles);
+    Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
+ assertTrue(!dataFilesToRead.findAny().isPresent());
+
+ roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
+ dataFilesToRead = roView.getLatestDataFiles();
+ assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
+ dataFilesToRead.findAny().isPresent());
+
+ /**
+ * Write 2 (only inserts, written to .log file)
+ */
+ newCommitTime = "002";
+ records = dataGen.generateInserts(newCommitTime, 20);
+ writeRecords = jsc.parallelize(records, 1);
+ statuses = client.upsert(writeRecords, newCommitTime).collect();
+ assertNoWriteErrors(statuses);
+
+    /**
+     * Write 3 (only deletes, written to .log file)
+     */
+ newCommitTime = "004";
+    List<HoodieRecord> fewRecordsForDelete = dataGen.generateDeletesFromExistingRecords(records);
+
+ statuses = client.upsert(jsc.parallelize(fewRecordsForDelete, 1), newCommitTime).collect();
+ // Verify there are no errors
+ assertNoWriteErrors(statuses);
+
+ metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath());
+ deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
+ assertTrue(deltaCommit.isPresent());
+ assertEquals("Latest Delta commit should be 004", "004", deltaCommit.get().getTimestamp());
+
+ commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
+ assertFalse(commit.isPresent());
+
+ allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath());
+ roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
+ dataFilesToRead = roView.getLatestDataFiles();
+ assertTrue(dataFilesToRead.findAny().isPresent());
+
+    List<String> dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
+    List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles);
+    // Wrote 40 records and deleted 20 of them, so 40 - 20 = 20 should remain
+    assertEquals("Must contain 20 records", 20, recordsRead.size());
+ }
+
private HoodieWriteConfig getConfig() {
return getConfigBuilder().build();
}
private HoodieWriteConfig.Builder getConfigBuilder() {
return HoodieWriteConfig.newBuilder().withPath(basePath)
- .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
+ .withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
.withInlineCompaction(false).build())
@@ -204,7 +338,4 @@ public class TestMergeOnReadTable {
assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
}
}
-
-
-
}
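
For reference, the core of the new delete path exercised by testSimpleInsertAndDelete: generateDeletesFromExistingRecords is assumed to re-wrap the previously written keys in a payload that resolves to no Avro value, so the follow-up upsert reaches HoodieAppendHandle as deletes and lands in the log as a HoodieDeleteBlock under a new delta commit.

    // Condensed sketch of the delete step (same identifiers as the test above).
    List<HoodieRecord> fewRecordsForDelete = dataGen.generateDeletesFromExistingRecords(records);
    List<WriteStatus> statuses =
        client.upsert(jsc.parallelize(fewRecordsForDelete, 1), "004").collect();
    assertNoWriteErrors(statuses);
    // Deletes show up as the latest delta commit; no commit (compaction) instant exists yet.
    assertEquals("004", metaClient.getActiveTimeline().getDeltaCommitTimeline()
        .lastInstant().get().getTimestamp());
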
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieMergeOnReadTestUtils.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieMergeOnReadTestUtils.java
new file mode 100644
index 000000000..4b2424eb7
--- /dev/null
+++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieMergeOnReadTestUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common;
+
+import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.HoodieAvroUtils;
+import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+
+/**
+ * Utility methods to aid in testing MergeOnRead (workaround for HoodieReadClient for MOR)
+ */
+public class HoodieMergeOnReadTestUtils {
+
+  public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths) throws IOException {
+ JobConf jobConf = new JobConf();
+ Schema schema = HoodieAvroUtils.addMetadataFields(Schema.parse(TRIP_EXAMPLE_SCHEMA));
+ HoodieRealtimeInputFormat inputFormat = new HoodieRealtimeInputFormat();
+ setPropsForInputFormat(inputFormat, jobConf, schema);
+ return inputPaths.stream().map(path -> {
+ setInputPath(jobConf, path);
+      List<GenericRecord> records = new ArrayList<>();
+ try {
+        List<InputSplit> splits = Arrays.asList(inputFormat.getSplits(jobConf, 1));
+ RecordReader recordReader = inputFormat.getRecordReader(splits.get(0), jobConf, null);
+ Void key = (Void) recordReader.createKey();
+ ArrayWritable writable = (ArrayWritable) recordReader.createValue();
+ while (recordReader.next(key, writable)) {
+ GenericRecordBuilder newRecord = new GenericRecordBuilder(schema);
+ // writable returns an array with [field1, field2, _hoodie_commit_time, _hoodie_commit_seqno]
+ Writable[] values = writable.get();
+ schema.getFields().forEach(field -> {
+ newRecord.set(field, values[2]);
+ });
+ records.add(newRecord.build());
+ }
+ } catch (IOException ie) {
+ ie.printStackTrace();
+ }
+ return records;
+ }).reduce((a, b) -> {
+ a.addAll(b);
+ return a;
+ }).get();
+ }
+
+ private static void setPropsForInputFormat(HoodieRealtimeInputFormat inputFormat, JobConf jobConf, Schema schema) {
+    List<Schema.Field> fields = schema.getFields();
+    String names = fields.stream().map(f -> f.name()).collect(Collectors.joining(","));
+    String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
+ Configuration conf = FSUtils.getFs().getConf();
+ jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
+ jobConf.set("partition_columns", "datestr");
+ conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
+ conf.set("partition_columns", "datestr");
+ inputFormat.setConf(conf);
+ jobConf.addResource(conf);
+ }
+
+  private static void setInputPath(JobConf jobConf, String inputPath) {
+    jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath);
+    jobConf.set("map.input.dir", inputPath);
+ }
+}
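
Two things worth calling out about this helper: setPropsForInputFormat has to populate both the column names and the column positions that ColumnProjectionUtils expects, because the realtime record reader only materializes projected columns; and getRecordsUsingInputFormat reads only splits.get(0) per input path, which is fine here because the tests write with parallelism 1. Typical usage from a test, sketched with the identifiers TestMergeOnReadTable uses:

    // Read back whatever the realtime input format produces for the latest data files.
    List<String> dataFiles = roView.getLatestDataFiles()
        .map(HoodieDataFile::getPath).collect(Collectors.toList());
    List<GenericRecord> rows = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles);
    assertEquals("Must contain 20 records", 20, rows.size());
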
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java
index cdb6baa55..933de8c53 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java
@@ -40,6 +40,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static com.uber.hoodie.common.util.FSUtils.getFs;
+
/**
* Utility functions involving with parquet.
*/
@@ -52,6 +54,7 @@ public class ParquetUtils {
*/
public static Set readRowKeysFromParquet(Path filePath) {
Configuration conf = new Configuration();
+ conf.addResource(getFs().getConf());
Schema readSchema = HoodieAvroUtils.getRecordKeySchema();
AvroReadSupport.setAvroReadSchema(conf, readSchema);
AvroReadSupport.setRequestedProjection(conf, readSchema);
@@ -97,7 +100,7 @@ public class ParquetUtils {
ParquetMetadata footer;
try {
// TODO(vc): Should we use the parallel reading version here?
- footer = ParquetFileReader.readFooter(conf, parquetFilePath);
+ footer = ParquetFileReader.readFooter(getFs().getConf(), parquetFilePath);
} catch (IOException e) {
throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath,
e);
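
Both ParquetUtils tweaks have the same purpose: the Configuration handed to the parquet reader must carry the filesystem the tests install via FSUtils.setFs (the MiniDFSCluster in TestMergeOnReadTable); otherwise reads resolve against the default local filesystem. A hedged usage sketch, with a hypothetical path:

    // With the FS conf merged in, row keys can be read from a parquet file on MiniDFS.
    Set<String> rowKeys = ParquetUtils.readRowKeysFromParquet(
        new Path("/hoodie-test-base-path/2016/03/15/some-file_1_001.parquet")); // hypothetical
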
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
index 8771a7bb5..640f68029 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
@@ -64,6 +64,10 @@ public class HoodieTestUtils {
public static final String RAW_TRIPS_TEST_NAME = "raw_trips";
public static final int DEFAULT_TASK_PARTITIONID = 1;
+ public static void resetFS() {
+ HoodieTestUtils.fs = FSUtils.getFs();
+ }
+
public static HoodieTableMetaClient init(String basePath) throws IOException {
return initTableType(basePath, HoodieTableType.COPY_ON_WRITE);
}
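
resetFS exists because HoodieTestUtils caches a FileSystem in a static field; once a test swaps the active filesystem through FSUtils.setFs, that cache has to be rebuilt. The intended lifecycle, pieced together from the TestMergeOnReadTable hooks above, is roughly:

    // Before the class: bring up MiniDFS and point everything at it.
    FileSystem.closeAll();
    hdfsTestService = new HdfsTestService();
    dfsCluster = hdfsTestService.start(true);
    FSUtils.setFs(dfsCluster.getFileSystem()); // route FSUtils.getFs() to MiniDFS
    HoodieTestUtils.resetFS();                 // rebuild the cached FileSystem against it

    // ... tests run against HDFS ...

    // After the class: tear down and restore default filesystem resolution.
    hdfsTestService.stop();
    dfsCluster.shutdown();
    FSUtils.setFs(null);
    FileSystem.closeAll();
    HoodieTestUtils.resetFS();
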