diff --git a/hoodie-cli/src/main/scala/com/uber/hoodie/cli/DedupeSparkJob.scala b/hoodie-cli/src/main/scala/com/uber/hoodie/cli/DedupeSparkJob.scala index c86d13ae7..8ae72284c 100644 --- a/hoodie-cli/src/main/scala/com/uber/hoodie/cli/DedupeSparkJob.scala +++ b/hoodie-cli/src/main/scala/com/uber/hoodie/cli/DedupeSparkJob.scala @@ -20,7 +20,7 @@ import java.util.stream.Collectors import com.uber.hoodie.common.model.{HoodieDataFile, HoodieRecord} import com.uber.hoodie.common.table.HoodieTableMetaClient -import com.uber.hoodie.common.table.view.ReadOptimizedTableView +import com.uber.hoodie.common.table.view.HoodieTableFileSystemView import com.uber.hoodie.common.util.FSUtils import com.uber.hoodie.exception.HoodieException import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} @@ -75,7 +75,7 @@ class DedupeSparkJob (basePath: String, val dedupeTblName = s"${tmpTableName}_dupeKeys" val metadata = new HoodieTableMetaClient(fs, basePath) - val fsView = new ReadOptimizedTableView(fs, metadata) + val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants()) val allFiles = fs.listStatus(new org.apache.hadoop.fs.Path(s"${basePath}/${duplicatedPartitionPath}")) val latestFiles:java.util.List[HoodieDataFile] = fsView.getLatestVersions(allFiles).collect(Collectors.toList[HoodieDataFile]()) @@ -126,7 +126,7 @@ class DedupeSparkJob (basePath: String, def fixDuplicates(dryRun: Boolean = true) = { val metadata = new HoodieTableMetaClient(fs, basePath) - val fsView = new ReadOptimizedTableView(fs, metadata) + val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants()) val allFiles = fs.listStatus(new Path(s"${basePath}/${duplicatedPartitionPath}")) val latestFiles:java.util.List[HoodieDataFile] = fsView.getLatestVersions(allFiles).collect(Collectors.toList[HoodieDataFile]()) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java 
b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java index 19ab4427e..d248dad62 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java @@ -26,12 +26,12 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.timeline.HoodieInstant; -import com.uber.hoodie.common.table.view.ReadOptimizedTableView; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.index.HoodieBloomIndex; +import com.uber.hoodie.table.HoodieTable; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; @@ -79,7 +79,7 @@ public class HoodieReadClient implements Serializable { */ private transient final HoodieBloomIndex index; private final HoodieTimeline commitTimeline; - private HoodieTableMetaClient metaClient; + private HoodieTable hoodieTable; private transient Optional sqlContextOpt; @@ -89,9 +89,11 @@ public class HoodieReadClient implements Serializable { public HoodieReadClient(JavaSparkContext jsc, String basePath) { this.jsc = jsc; this.fs = FSUtils.getFs(); - this.metaClient = new HoodieTableMetaClient(fs, basePath, true); - this.commitTimeline = - metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + // Create a Hoodie table which encapsulated the commits and files visible + this.hoodieTable = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(fs, basePath, true), null); + this.commitTimeline = hoodieTable.getCompletedCompactionCommitTimeline(); + this.index = new HoodieBloomIndex(HoodieWriteConfig.newBuilder().withPath(basePath).build(), jsc); this.sqlContextOpt = Optional.absent(); @@ -134,7 +136,7 @@ public class HoodieReadClient 
implements Serializable { assertSqlContext(); JavaPairRDD> keyToFileRDD = - index.fetchRecordLocation(hoodieKeys, metaClient); + index.fetchRecordLocation(hoodieKeys, hoodieTable); List paths = keyToFileRDD .filter(new Function>, Boolean>() { @Override @@ -184,14 +186,14 @@ public class HoodieReadClient implements Serializable { public Dataset read(String... paths) { assertSqlContext(); List filteredPaths = new ArrayList<>(); - TableFileSystemView fileSystemView = new ReadOptimizedTableView(fs, metaClient); + TableFileSystemView fileSystemView = hoodieTable.getFileSystemView(); try { for (String path : paths) { - if (!path.contains(metaClient.getBasePath())) { + if (!path.contains(hoodieTable.getMetaClient().getBasePath())) { throw new HoodieException("Path " + path + " does not seem to be a part of a Hoodie dataset at base path " - + metaClient.getBasePath()); + + hoodieTable.getMetaClient().getBasePath()); } List latestFiles = fileSystemView.getLatestVersions(fs.globStatus(new Path(path))).collect( @@ -243,8 +245,9 @@ public class HoodieReadClient implements Serializable { */ public Dataset readCommit(String commitTime) { assertSqlContext(); + String actionType = hoodieTable.getCompactedCommitActionType(); HoodieInstant commitInstant = - new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime); + new HoodieInstant(false, actionType, commitTime); if (!commitTimeline.containsInstant(commitInstant)) { new HoodieException("No commit exists at " + commitTime); } @@ -261,6 +264,7 @@ public class HoodieReadClient implements Serializable { } } + /** * Checks if the given [Keys] exists in the hoodie table and returns [Key, * Optional[FullFilePath]] If the optional FullFilePath value is not present, then the key is @@ -269,7 +273,7 @@ public class HoodieReadClient implements Serializable { */ public JavaPairRDD> checkExists( JavaRDD hoodieKeys) { - return index.fetchRecordLocation(hoodieKeys, metaClient); + return index.fetchRecordLocation(hoodieKeys, 
hoodieTable); } /** @@ -280,7 +284,7 @@ public class HoodieReadClient implements Serializable { * @return A subset of hoodieRecords RDD, with existing records filtered out. */ public JavaRDD filterExists(JavaRDD hoodieRecords) { - JavaRDD recordsWithLocation = index.tagLocation(hoodieRecords, metaClient); + JavaRDD recordsWithLocation = index.tagLocation(hoodieRecords, hoodieTable); return recordsWithLocation.filter(new Function() { @Override public Boolean call(HoodieRecord v1) throws Exception { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 029ec6c3e..570292b0c 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -128,9 +128,11 @@ public class HoodieWriteClient implements Seriali * @return A subset of hoodieRecords RDD, with existing records filtered out. */ public JavaRDD> filterExists(JavaRDD> hoodieRecords) { - HoodieTableMetaClient metaClient = - new HoodieTableMetaClient(fs, config.getBasePath(), true); - JavaRDD> recordsWithLocation = index.tagLocation(hoodieRecords, metaClient); + // Create a Hoodie table which encapsulated the commits and files visible + HoodieTable table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); + + JavaRDD> recordsWithLocation = index.tagLocation(hoodieRecords, table); return recordsWithLocation.filter(new Function, Boolean>() { @Override public Boolean call(HoodieRecord v1) throws Exception { @@ -144,11 +146,9 @@ public class HoodieWriteClient implements Seriali */ public JavaRDD upsert(JavaRDD> records, final String commitTime) { writeContext = metrics.getCommitCtx(); - HoodieTableMetaClient metaClient = - new HoodieTableMetaClient(fs, config.getBasePath(), true); - - final HoodieTable table = - HoodieTable.getHoodieTable(metaClient, commitTime, config); + // Create a 
Hoodie table which encapsulated the commits and files visible + HoodieTable table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); try { // De-dupe/merge if needed @@ -157,15 +157,14 @@ public class HoodieWriteClient implements Seriali config.getUpsertShuffleParallelism()); // perform index loop up to get existing location of records - JavaRDD> taggedRecords = index.tagLocation(dedupedRecords, metaClient); + JavaRDD> taggedRecords = index.tagLocation(dedupedRecords, table); // Cache the tagged records, so we don't end up computing both taggedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER()); - WorkloadProfile profile = null; if (table.isWorkloadProfileNeeded()) { - profile = new WorkloadProfile(taggedRecords); + profile = new WorkloadProfile<>(taggedRecords); logger.info("Workload profile :" + profile); } @@ -196,7 +195,8 @@ public class HoodieWriteClient implements Seriali @Override public Iterator> call(Integer partition, Iterator> recordItr) throws Exception { - return table.handleUpsertPartition(partition, recordItr, upsertPartitioner); + return table.handleUpsertPartition(commitTime, partition, recordItr, + upsertPartitioner); } }, true).flatMap(new FlatMapFunction, WriteStatus>() { @Override @@ -207,7 +207,7 @@ public class HoodieWriteClient implements Seriali }); // Update the index back. 
- JavaRDD resultRDD = index.updateLocation(upsertStatusRDD, metaClient); + JavaRDD resultRDD = index.updateLocation(upsertStatusRDD, table); resultRDD = resultRDD.persist(config.getWriteStatusStorageLevel()); commitOnAutoCommit(commitTime, resultRDD); return resultRDD; @@ -250,8 +250,9 @@ public class HoodieWriteClient implements Seriali */ public JavaRDD insert(JavaRDD> records, final String commitTime) { writeContext = metrics.getCommitCtx(); - HoodieTableMetaClient metaClient = - new HoodieTableMetaClient(fs, config.getBasePath(), true); + // Create a Hoodie table which encapsulated the commits and files visible + HoodieTable table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); try { // De-dupe/merge if needed @@ -272,7 +273,7 @@ public class HoodieWriteClient implements Seriali } }, true, config.getInsertShuffleParallelism()); JavaRDD writeStatusRDD = sortedRecords - .mapPartitionsWithIndex(new InsertMapFunction(commitTime, config, metaClient), + .mapPartitionsWithIndex(new InsertMapFunction(commitTime, config, table), true).flatMap(new FlatMapFunction, WriteStatus>() { @Override public Iterable call(List writeStatuses) @@ -281,7 +282,7 @@ public class HoodieWriteClient implements Seriali } }); // Update the index back - JavaRDD statuses = index.updateLocation(writeStatusRDD, metaClient); + JavaRDD statuses = index.updateLocation(writeStatusRDD, table); // Trigger the insert and collect statuses statuses = statuses.persist(config.getWriteStatusStorageLevel()); commitOnAutoCommit(commitTime, statuses); @@ -299,9 +300,11 @@ public class HoodieWriteClient implements Seriali */ public boolean commit(String commitTime, JavaRDD writeStatuses) { logger.info("Comitting " + commitTime); - HoodieTableMetaClient metaClient = - new HoodieTableMetaClient(fs, config.getBasePath(), true); - HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + // Create a Hoodie table which encapsulated the commits and 
files visible + HoodieTable table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); + + HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); List> stats = writeStatuses.mapToPair(new PairFunction() { @@ -318,8 +321,9 @@ public class HoodieWriteClient implements Seriali } try { + String actionType = table.getCommitActionType(); activeTimeline.saveAsComplete( - new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime), + new HoodieInstant(true, actionType, commitTime), Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); // Save was a success // We cannot have unbounded commit files. Archive commits if we have to archive @@ -333,7 +337,7 @@ public class HoodieWriteClient implements Seriali metadata); writeContext = null; } - logger.info("Status of the commit " + commitTime); + logger.info("Committed " + commitTime); } catch (IOException e) { throw new HoodieCommitException( "Failed to commit " + config.getBasePath() + " at time " + commitTime, e); @@ -355,9 +359,10 @@ public class HoodieWriteClient implements Seriali */ public boolean rollback(final String commitTime) throws HoodieRollbackException { final Timer.Context context = metrics.getRollbackCtx(); - HoodieTableMetaClient metaClient = - new HoodieTableMetaClient(fs, config.getBasePath(), true); - HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + // Create a Hoodie table which encapsulated the commits and files visible + HoodieTable table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); + HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); HoodieTimeline inflightTimeline = activeTimeline.getCommitTimeline().filterInflights(); HoodieTimeline commitTimeline = activeTimeline.getCommitTimeline().filterCompletedInstants(); @@ -394,7 +399,8 @@ public class HoodieWriteClient implements Seriali // 3. 
Delete the new generated parquet files logger.info("Clean out all parquet files generated at time: " + commitTime); final Accumulator numFilesDeletedAccu = jsc.accumulator(0); - jsc.parallelize(FSUtils.getAllPartitionPaths(fs, metaClient.getBasePath())) + jsc.parallelize( + FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath())) .foreach(new VoidFunction() { @Override public void call(String partitionPath) throws Exception { @@ -450,10 +456,11 @@ public class HoodieWriteClient implements Seriali try { logger.info("Cleaner started"); final Timer.Context context = metrics.getCleanCtx(); - HoodieTableMetaClient metaClient = - new HoodieTableMetaClient(fs, config.getBasePath(), true); + // Create a Hoodie table which encapsulated the commits and files visible + HoodieTable table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); - List partitionsToClean = FSUtils.getAllPartitionPaths(fs, metaClient.getBasePath()); + List partitionsToClean = FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath()); // shuffle to distribute cleaning work across partitions evenly Collections.shuffle(partitionsToClean); logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config.getCleanerPolicy()); @@ -468,7 +475,7 @@ public class HoodieWriteClient implements Seriali @Override public Integer call(String partitionPathToClean) throws Exception { FileSystem fs = FSUtils.getFs(); - HoodieCleaner cleaner = new HoodieCleaner(metaClient, config, fs); + HoodieCleaner cleaner = new HoodieCleaner(table, config); return cleaner.clean(partitionPathToClean); } }).reduce(new Function2() { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index c439f6d7e..d996273de 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ 
b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -36,7 +36,7 @@ import java.util.Properties; public class HoodieWriteConfig extends DefaultHoodieConfig { private static final String BASE_PATH_PROP = "hoodie.base.path"; private static final String AVRO_SCHEMA = "hoodie.avro.schema"; - private static final String TABLE_NAME = "hoodie.table.name"; + public static final String TABLE_NAME = "hoodie.table.name"; private static final String DEFAULT_PARALLELISM = "200"; private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism"; private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/exception/HoodieAppendException.java b/hoodie-client/src/main/java/com/uber/hoodie/exception/HoodieAppendException.java new file mode 100644 index 000000000..0ba0eb50c --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/exception/HoodieAppendException.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.exception; + +/** + *

+ * Exception thrown for any higher level errors when HoodieClient is doing a delta commit + *

+ */ +public class HoodieAppendException extends HoodieException { + public HoodieAppendException(String msg, Throwable e) { + super(msg, e); + } + + public HoodieAppendException(String msg) { + super(msg); + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/BulkInsertMapFunction.java b/hoodie-client/src/main/java/com/uber/hoodie/func/BulkInsertMapFunction.java index da6b526f1..74478450d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/func/BulkInsertMapFunction.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/func/BulkInsertMapFunction.java @@ -21,6 +21,7 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.table.HoodieTable; import org.apache.spark.api.java.function.Function2; import java.util.Iterator; @@ -35,18 +36,18 @@ public class InsertMapFunction private String commitTime; private HoodieWriteConfig config; - private HoodieTableMetaClient metaClient; + private HoodieTable hoodieTable; public InsertMapFunction(String commitTime, HoodieWriteConfig config, - HoodieTableMetaClient metaClient) { + HoodieTable hoodieTable) { this.commitTime = commitTime; this.config = config; - this.metaClient = metaClient; + this.hoodieTable = hoodieTable; } @Override public Iterator> call(Integer partition, Iterator> sortedRecordItr) throws Exception { - return new LazyInsertIterable<>(sortedRecordItr, config, commitTime, metaClient); + return new LazyInsertIterable<>(sortedRecordItr, config, commitTime, hoodieTable); } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/LazyInsertIterable.java b/hoodie-client/src/main/java/com/uber/hoodie/func/LazyInsertIterable.java index ab369da7e..aa1b27891 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/func/LazyInsertIterable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/func/LazyInsertIterable.java @@ -24,6 +24,7 
@@ import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.io.HoodieIOHandle; import com.uber.hoodie.io.HoodieInsertHandle; +import com.uber.hoodie.table.HoodieTable; import org.apache.spark.TaskContext; import java.util.ArrayList; @@ -40,17 +41,17 @@ public class LazyInsertIterable extends LazyItera private final HoodieWriteConfig hoodieConfig; private final String commitTime; - private final HoodieTableMetaClient metaClient; + private final HoodieTable hoodieTable; private Set partitionsCleaned; private HoodieInsertHandle handle; public LazyInsertIterable(Iterator> sortedRecordItr, HoodieWriteConfig config, - String commitTime, HoodieTableMetaClient metaClient) { + String commitTime, HoodieTable hoodieTable) { super(sortedRecordItr); this.partitionsCleaned = new HashSet<>(); this.hoodieConfig = config; this.commitTime = commitTime; - this.metaClient = metaClient; + this.hoodieTable = hoodieTable; } @Override protected void start() { @@ -78,7 +79,7 @@ public class LazyInsertIterable extends LazyItera // lazily initialize the handle, for the first time if (handle == null) { handle = - new HoodieInsertHandle(hoodieConfig, commitTime, metaClient, + new HoodieInsertHandle(hoodieConfig, commitTime, hoodieTable, record.getPartitionPath()); } @@ -90,7 +91,7 @@ public class LazyInsertIterable extends LazyItera statuses.add(handle.close()); // Need to handle the rejected record & open new handle handle = - new HoodieInsertHandle(hoodieConfig, commitTime, metaClient, + new HoodieInsertHandle(hoodieConfig, commitTime, hoodieTable, record.getPartitionPath()); handle.write(record); // we should be able to write 1 record. 
break; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HBaseIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/HBaseIndex.java index fcfff43d5..314039b3a 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/HBaseIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/HBaseIndex.java @@ -30,6 +30,7 @@ import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.exception.HoodieDependentSystemUnavailableException; import com.uber.hoodie.exception.HoodieIndexException; +import com.uber.hoodie.table.HoodieTable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; @@ -67,7 +68,7 @@ public class HBaseIndex extends HoodieIndex { @Override public JavaPairRDD> fetchRecordLocation( - JavaRDD hoodieKeys, HoodieTableMetaClient metaClient) { + JavaRDD hoodieKeys, HoodieTable hoodieTable) { throw new UnsupportedOperationException("HBase index does not implement check exist yet"); } @@ -93,10 +94,10 @@ public class HBaseIndex extends HoodieIndex { class LocationTagFunction implements Function2>, Iterator>> { - private final HoodieTableMetaClient metaClient; + private final HoodieTable hoodieTable; - LocationTagFunction(HoodieTableMetaClient metaClient) { - this.metaClient = metaClient; + LocationTagFunction(HoodieTable hoodieTable) { + this.hoodieTable = hoodieTable; } @Override @@ -129,9 +130,7 @@ public class HBaseIndex extends HoodieIndex { String fileId = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN)); - HoodieTimeline commitTimeline = - metaClient.getActiveTimeline().getCommitTimeline() - .filterCompletedInstants(); + HoodieTimeline commitTimeline = hoodieTable.getCompletedCommitTimeline(); // if the last commit ts for this row is less than the system commit ts if (!commitTimeline.empty() && commitTimeline.containsInstant( new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, commitTs))) { @@ -160,9 +159,8 @@ public class HBaseIndex extends HoodieIndex { } @Override - public JavaRDD> tagLocation(JavaRDD> recordRDD, - HoodieTableMetaClient metaClient) { - return recordRDD.mapPartitionsWithIndex(this.new LocationTagFunction(metaClient), true); + public JavaRDD> tagLocation(JavaRDD> recordRDD, HoodieTable hoodieTable) { + return recordRDD.mapPartitionsWithIndex(this.new LocationTagFunction(hoodieTable), true); } class UpdateLocationTask implements Function2, Iterator> { @@ -223,7 +221,7 @@ public class HBaseIndex extends HoodieIndex { @Override public JavaRDD updateLocation(JavaRDD writeStatusRDD, - HoodieTableMetaClient metaClient) { + HoodieTable hoodieTable) { return writeStatusRDD.mapPartitionsWithIndex(new UpdateLocationTask(), true); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java index 3be461746..a36ab999d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java @@ -18,21 +18,16 @@ package com.uber.hoodie.index; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; - -import com.uber.hoodie.common.model.HoodieDataFile; -import com.uber.hoodie.common.table.HoodieTableMetaClient; -import com.uber.hoodie.common.table.TableFileSystemView; -import com.uber.hoodie.common.table.timeline.HoodieInstant; -import com.uber.hoodie.common.table.view.ReadOptimizedTableView; -import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; +import 
com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.FSUtils; - -import org.apache.hadoop.fs.FileSystem; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.table.HoodieTable; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -43,9 +38,12 @@ import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; - import scala.Tuple2; -import java.util.*; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.stream.Collectors; /** @@ -67,7 +65,7 @@ public class HoodieBloomIndex extends HoodieIndex } @Override - public JavaRDD> tagLocation(JavaRDD> recordRDD, final HoodieTableMetaClient metaClient) { + public JavaRDD> tagLocation(JavaRDD> recordRDD, final HoodieTable hoodieTable) { // Step 1: Extract out thinner JavaPairRDD of (partitionPath, recordKey) JavaPairRDD partitionRecordKeyPairRDD = recordRDD @@ -80,7 +78,7 @@ public class HoodieBloomIndex extends HoodieIndex // Lookup indexes for all the partition/recordkey pair JavaPairRDD rowKeyFilenamePairRDD = - lookupIndex(partitionRecordKeyPairRDD, metaClient); + lookupIndex(partitionRecordKeyPairRDD, hoodieTable); // Cache the result, for subsequent stages. 
rowKeyFilenamePairRDD.cache(); @@ -94,7 +92,7 @@ public class HoodieBloomIndex extends HoodieIndex } public JavaPairRDD> fetchRecordLocation( - JavaRDD hoodieKeys, final HoodieTableMetaClient metaClient) { + JavaRDD hoodieKeys, final HoodieTable hoodieTable) { JavaPairRDD partitionRecordKeyPairRDD = hoodieKeys.mapToPair(new PairFunction() { @Override @@ -105,7 +103,7 @@ public class HoodieBloomIndex extends HoodieIndex // Lookup indexes for all the partition/recordkey pair JavaPairRDD rowKeyFilenamePairRDD = - lookupIndex(partitionRecordKeyPairRDD, metaClient); + lookupIndex(partitionRecordKeyPairRDD, hoodieTable); JavaPairRDD rowKeyHoodieKeyPairRDD = hoodieKeys.mapToPair(new PairFunction() { @@ -125,9 +123,9 @@ public class HoodieBloomIndex extends HoodieIndex if (keyPathTuple._2._2.isPresent()) { String fileName = keyPathTuple._2._2.get(); String partitionPath = keyPathTuple._2._1.getPartitionPath(); - recordLocationPath = Optional - .of(new Path(new Path(metaClient.getBasePath(), partitionPath), fileName) - .toUri().getPath()); + recordLocationPath = Optional.of(new Path( + new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath), + fileName).toUri().getPath()); } else { recordLocationPath = Optional.absent(); } @@ -141,18 +139,18 @@ public class HoodieBloomIndex extends HoodieIndex * record keys already present and drop the record keys if not present * * @param partitionRecordKeyPairRDD - * @param metaClient + * @param hoodieTable * @return */ private JavaPairRDD lookupIndex( - JavaPairRDD partitionRecordKeyPairRDD, final HoodieTableMetaClient metaClient) { + JavaPairRDD partitionRecordKeyPairRDD, final HoodieTable hoodieTable) { // Obtain records per partition, in the incoming records Map recordsPerPartition = partitionRecordKeyPairRDD.countByKey(); List affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet()); // Step 2: Load all involved files as pairs JavaPairRDD partitionFilePairRDD = - 
loadInvolvedFiles(affectedPartitionPathList, metaClient); + loadInvolvedFiles(affectedPartitionPathList, hoodieTable); Map filesPerPartition = partitionFilePairRDD.countByKey(); // Compute total subpartitions, to split partitions into. @@ -212,19 +210,17 @@ public class HoodieBloomIndex extends HoodieIndex */ @VisibleForTesting JavaPairRDD loadInvolvedFiles(List partitions, - final HoodieTableMetaClient metaClient) { + final HoodieTable hoodieTable) { return jsc.parallelize(partitions, Math.max(partitions.size(), 1)) .flatMapToPair(new PairFlatMapFunction() { @Override public Iterable> call(String partitionPath) { - FileSystem fs = FSUtils.getFs(); - TableFileSystemView view = new ReadOptimizedTableView(fs, metaClient); java.util.Optional latestCommitTime = - metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant(); + hoodieTable.getCommitTimeline().filterCompletedInstants().lastInstant(); List> list = new ArrayList<>(); if (latestCommitTime.isPresent()) { List filteredFiles = - view.getLatestVersionInPartition(partitionPath, + hoodieTable.getFileSystemView().getLatestVersionInPartition(partitionPath, latestCommitTime.get().getTimestamp()).collect(Collectors.toList()); for (HoodieDataFile file : filteredFiles) { list.add(new Tuple2<>(partitionPath, file.getFileName())); @@ -235,6 +231,7 @@ public class HoodieBloomIndex extends HoodieIndex }); } + @Override public boolean rollbackCommit(String commitTime) { // Nope, don't need to do anything. 
@@ -424,7 +421,7 @@ public class HoodieBloomIndex extends HoodieIndex } @Override - public JavaRDD updateLocation(JavaRDD writeStatusRDD, HoodieTableMetaClient metaClient) { + public JavaRDD updateLocation(JavaRDD writeStatusRDD, HoodieTable hoodieTable) { return writeStatusRDD; } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java index 7cc0a3404..dcae31ff7 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java @@ -26,6 +26,7 @@ import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.exception.HoodieIndexException; +import com.uber.hoodie.table.HoodieTable; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -63,15 +64,14 @@ public abstract class HoodieIndex implements Seri * @return */ public abstract JavaPairRDD> fetchRecordLocation( - JavaRDD hoodieKeys, final HoodieTableMetaClient metaClient); + JavaRDD hoodieKeys, final HoodieTable metaClient); /** * Looks up the index and tags each incoming record with a location of a file that contains the * row (if it is actually present) */ public abstract JavaRDD> tagLocation(JavaRDD> recordRDD, - HoodieTableMetaClient metaClient) throws - HoodieIndexException; + HoodieTable hoodieTable) throws HoodieIndexException; /** * Extracts the location of written records, and updates the index. 
@@ -79,7 +79,7 @@ public abstract class HoodieIndex implements Seri * TODO(vc): We may need to propagate the record as well in a WriteStatus class */ public abstract JavaRDD updateLocation(JavaRDD writeStatusRDD, - HoodieTableMetaClient metaClient) throws HoodieIndexException; + HoodieTable hoodieTable) throws HoodieIndexException; /** * Rollback the efffects of the commit made at commitTime. diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java index b3b13d25b..a546b4bc1 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java @@ -25,6 +25,7 @@ import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.table.HoodieTable; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -55,7 +56,7 @@ public class InMemoryHashIndex extends HoodieInde @Override public JavaPairRDD> fetchRecordLocation( - JavaRDD hoodieKeys, final HoodieTableMetaClient metaClient) { + JavaRDD hoodieKeys, final HoodieTable hoodieTable) { throw new UnsupportedOperationException("InMemory index does not implement check exist yet"); } @@ -81,13 +82,13 @@ public class InMemoryHashIndex extends HoodieInde @Override public JavaRDD> tagLocation(JavaRDD> recordRDD, - HoodieTableMetaClient metaClient) { + HoodieTable hoodieTable) { return recordRDD.mapPartitionsWithIndex(this.new LocationTagFunction(), true); } @Override public JavaRDD updateLocation(JavaRDD writeStatusRDD, - HoodieTableMetaClient metaClient) { + HoodieTable hoodieTable) { return writeStatusRDD.map(new Function() { @Override public WriteStatus call(WriteStatus writeStatus) { diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java new file mode 100644 index 000000000..272c98e1e --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -0,0 +1,163 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.io; + +import com.clearspring.analytics.util.Lists; +import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.model.HoodieDeltaWriteStat; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordLocation; +import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.table.log.HoodieLogAppendConfig; +import com.uber.hoodie.common.table.log.HoodieLogFile; +import com.uber.hoodie.common.table.log.avro.RollingAvroLogAppender; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.HoodieAvroUtils; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.exception.HoodieAppendException; +import com.uber.hoodie.exception.HoodieUpsertException; +import com.uber.hoodie.table.HoodieTable; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import 
org.apache.spark.TaskContext; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +public class HoodieAppendHandle extends HoodieIOHandle { + private static Logger logger = LogManager.getLogger(HoodieUpdateHandle.class); + private static AtomicLong recordIndex = new AtomicLong(1); + + private final WriteStatus writeStatus; + private final String fileId; + private String partitionPath; + private RollingAvroLogAppender logAppender; + private List> records; + private long recordsWritten = 0; + private HoodieLogFile currentLogFile; + + public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, + HoodieTable hoodieTable, String fileId, Iterator> recordItr) { + super(config, commitTime, hoodieTable); + WriteStatus writeStatus = new WriteStatus(); + writeStatus.setStat(new HoodieDeltaWriteStat()); + this.writeStatus = writeStatus; + this.fileId = fileId; + init(recordItr); + } + + private void init(Iterator> recordItr) { + List> records = Lists.newArrayList(); + recordItr.forEachRemaining(record -> { + records.add(record); + // extract some information from the first record + if (partitionPath == null) { + partitionPath = record.getPartitionPath(); + String latestValidFilePath = + fileSystemView.getLatestDataFilesForFileId(record.getPartitionPath(), fileId) + .findFirst().get().getFileName(); + String baseCommitTime = FSUtils.getCommitTime(latestValidFilePath); + writeStatus.getStat().setPrevCommit(baseCommitTime); + writeStatus.setFileId(fileId); + writeStatus.setPartitionPath(record.getPartitionPath()); + writeStatus.getStat().setFileId(fileId); + + try { + HoodieLogAppendConfig logConfig = HoodieLogAppendConfig.newBuilder() + .onPartitionPath( + new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath)) + .withFileId(fileId).withBaseCommitTime(baseCommitTime).withSchema(schema) + 
.withFs(fs).withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); + this.logAppender = new RollingAvroLogAppender(logConfig); + this.currentLogFile = logAppender.getConfig().getLogFile(); + ((HoodieDeltaWriteStat) writeStatus.getStat()) + .setLogVersion(currentLogFile.getLogVersion()); + ((HoodieDeltaWriteStat) writeStatus.getStat()) + .setLogOffset(logAppender.getCurrentSize()); + } catch (Exception e) { + logger.error("Error in update task at commit " + commitTime, e); + writeStatus.setGlobalError(e); + throw new HoodieUpsertException( + "Failed to initialize HoodieUpdateHandle for FileId: " + fileId + + " on commit " + commitTime + " on HDFS path " + hoodieTable + .getMetaClient().getBasePath()); + } + writeStatus.getStat().setFullPath(currentLogFile.getPath().toString()); + } + // update the new location of the record, so we know where to find it next + record.setNewLocation(new HoodieRecordLocation(commitTime, fileId)); + }); + this.records = records; + } + + private Optional getIndexedRecord(HoodieRecord hoodieRecord) { + try { + IndexedRecord avroRecord = hoodieRecord.getData().getInsertValue(schema); + String seqId = HoodieRecord.generateSequenceId(commitTime, TaskContext.getPartitionId(), + recordIndex.getAndIncrement()); + HoodieAvroUtils + .addHoodieKeyToRecord((GenericRecord) avroRecord, hoodieRecord.getRecordKey(), + hoodieRecord.getPartitionPath(), fileId); + HoodieAvroUtils + .addCommitMetadataToRecord((GenericRecord) avroRecord, commitTime, seqId); + hoodieRecord.deflate(); + writeStatus.markSuccess(hoodieRecord); + recordsWritten++; + return Optional.of(avroRecord); + } catch (Exception e) { + logger.error("Error writing record " + hoodieRecord, e); + writeStatus.markFailure(hoodieRecord, e); + } + return Optional.empty(); + } + + public void doAppend() { + Iterator recordItr = + records.stream().map(this::getIndexedRecord).filter(Optional::isPresent) + .map(Optional::get).iterator(); + try { + logAppender.append(recordItr); + } catch 
(Exception e) { + throw new HoodieAppendException( + "Failed while appeding records to " + currentLogFile.getPath(), e); + } + } + + public void close() { + try { + if (logAppender != null) { + logAppender.close(); + } + writeStatus.getStat().setNumWrites(recordsWritten); + writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size()); + } catch (IOException e) { + throw new HoodieUpsertException("Failed to close UpdateHandle", e); + } + } + + public WriteStatus getWriteStatus() { + return writeStatus; + } + + +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleaner.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleaner.java index 14bd732bb..07d1780b5 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleaner.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleaner.java @@ -17,14 +17,12 @@ package com.uber.hoodie.io; import com.uber.hoodie.common.model.HoodieDataFile; -import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.timeline.HoodieInstant; -import com.uber.hoodie.common.table.view.ReadOptimizedTableView; -import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.common.util.FSUtils; - +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.table.HoodieTable; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -58,18 +56,16 @@ public class HoodieCleaner { private final TableFileSystemView fileSystemView; private final HoodieTimeline commitTimeline; - private HoodieTableMetaClient metaClient; + private HoodieTable hoodieTable; private HoodieWriteConfig config; private FileSystem fs; - public HoodieCleaner(HoodieTableMetaClient metaClient, HoodieWriteConfig config, - FileSystem fs) { - this.metaClient = metaClient; - this.fileSystemView = 
new ReadOptimizedTableView(fs, metaClient); - this.commitTimeline = - metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + public HoodieCleaner(HoodieTable hoodieTable, HoodieWriteConfig config) { + this.hoodieTable = hoodieTable; + this.fileSystemView = hoodieTable.getCompactedFileSystemView(); + this.commitTimeline = hoodieTable.getCompletedCommitTimeline(); this.config = config; - this.fs = fs; + this.fs = hoodieTable.getFs(); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java index 307f56023..bed5293e6 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java @@ -16,15 +16,14 @@ package com.uber.hoodie.io; -import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; -import com.uber.hoodie.common.table.view.ReadOptimizedTableView; -import com.uber.hoodie.config.HoodieWriteConfig; -import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; +import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieIOException; +import com.uber.hoodie.table.HoodieTable; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -39,20 +38,19 @@ public abstract class HoodieIOHandle { protected final String commitTime; protected final HoodieWriteConfig config; protected final FileSystem fs; - protected final HoodieTableMetaClient metaClient; - protected final HoodieTimeline hoodieTimeline; - protected final TableFileSystemView fileSystemView; + protected final HoodieTable hoodieTable; + protected HoodieTimeline 
hoodieTimeline; + protected TableFileSystemView fileSystemView; protected final Schema schema; public HoodieIOHandle(HoodieWriteConfig config, String commitTime, - HoodieTableMetaClient metaClient) { + HoodieTable hoodieTable) { this.commitTime = commitTime; this.config = config; this.fs = FSUtils.getFs(); - this.metaClient = metaClient; - this.hoodieTimeline = - metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); - this.fileSystemView = new ReadOptimizedTableView(fs, metaClient); + this.hoodieTable = hoodieTable; + this.hoodieTimeline = hoodieTable.getCompletedCommitTimeline(); + this.fileSystemView = hoodieTable.getFileSystemView(); this.schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java index e1a787dad..6b799f833 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java @@ -16,17 +16,17 @@ package com.uber.hoodie.io; -import com.uber.hoodie.common.table.HoodieTableMetaClient; -import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieInsertException; import com.uber.hoodie.io.storage.HoodieStorageWriter; import com.uber.hoodie.io.storage.HoodieStorageWriterFactory; +import com.uber.hoodie.table.HoodieTable; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; @@ -45,8 +45,8 @@ public class HoodieInsertHandle 
extends HoodieIOH private int recordsWritten = 0; public HoodieInsertHandle(HoodieWriteConfig config, String commitTime, - HoodieTableMetaClient metadata, String partitionPath) { - super(config, commitTime, metadata); + HoodieTable hoodieTable, String partitionPath) { + super(config, commitTime, hoodieTable); this.status = new WriteStatus(); status.setFileId(UUID.randomUUID().toString()); status.setPartitionPath(partitionPath); @@ -54,7 +54,7 @@ public class HoodieInsertHandle extends HoodieIOH this.path = makeNewPath(partitionPath, TaskContext.getPartitionId(), status.getFileId()); try { this.storageWriter = - HoodieStorageWriterFactory.getStorageWriter(commitTime, path, metadata, config, schema); + HoodieStorageWriterFactory.getStorageWriter(commitTime, path, hoodieTable, config, schema); } catch (IOException e) { throw new HoodieInsertException( "Failed to initialize HoodieStorageWriter for path " + path, e); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java index 0c7fdfe2a..01933e73d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java @@ -16,7 +16,6 @@ package com.uber.hoodie.io; -import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieRecord; @@ -27,6 +26,7 @@ import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.exception.HoodieUpsertException; import com.uber.hoodie.io.storage.HoodieStorageWriter; import com.uber.hoodie.io.storage.HoodieStorageWriterFactory; +import com.uber.hoodie.table.HoodieTable; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; @@ -38,11 +38,12 @@ import java.io.IOException; import java.util.HashMap; import 
java.util.Iterator; -@SuppressWarnings("Duplicates") public class HoodieUpdateHandle extends HoodieIOHandle { +@SuppressWarnings("Duplicates") +public class HoodieUpdateHandle extends HoodieIOHandle { private static Logger logger = LogManager.getLogger(HoodieUpdateHandle.class); - private final WriteStatus writeStatus; - private final HashMap> keyToNewRecords; + private WriteStatus writeStatus; + private HashMap> keyToNewRecords; private HoodieStorageWriter storageWriter; private Path newFilePath; private Path oldFilePath; @@ -52,22 +53,23 @@ import java.util.Iterator; public HoodieUpdateHandle(HoodieWriteConfig config, String commitTime, - HoodieTableMetaClient metaClient, + HoodieTable hoodieTable, Iterator> recordItr, String fileId) { - super(config, commitTime, metaClient); - WriteStatus writeStatus = new WriteStatus(); - writeStatus.setStat(new HoodieWriteStat()); - this.writeStatus = writeStatus; - this.fileId = fileId; - this.keyToNewRecords = new HashMap<>(); - init(recordItr); + super(config, commitTime, hoodieTable); + init(fileId, recordItr); } /** * Load the new incoming records in a map, and extract the old file path. 
*/ - private void init(Iterator> newRecordsItr) { + private void init(String fileId, Iterator> newRecordsItr) { + WriteStatus writeStatus = new WriteStatus(); + writeStatus.setStat(new HoodieWriteStat()); + this.writeStatus = writeStatus; + this.fileId = fileId; + this.keyToNewRecords = new HashMap<>(); + try { // Load the new records in a map while (newRecordsItr.hasNext()) { @@ -104,14 +106,14 @@ import java.util.Iterator; } // Create the writer for writing the new version file storageWriter = HoodieStorageWriterFactory - .getStorageWriter(commitTime, newFilePath, metaClient, config, schema); + .getStorageWriter(commitTime, newFilePath, hoodieTable, config, schema); } catch (Exception e) { logger.error("Error in update task at commit " + commitTime, e); writeStatus.setGlobalError(e); throw new HoodieUpsertException( "Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit " - + commitTime + " on HDFS path " + metaClient.getBasePath()); + + commitTime + " on path " + hoodieTable.getMetaClient().getBasePath(), e); } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java index bb6e54bdf..eb5db75ff 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java @@ -16,18 +16,15 @@ package com.uber.hoodie.io.compact; -import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; -import com.uber.hoodie.common.table.view.RealtimeTableView; import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.table.HoodieTable; import org.apache.spark.api.java.JavaSparkContext; import java.io.Serializable; -import java.nio.charset.StandardCharsets; import 
java.util.Date; -import java.util.Optional; /** * A HoodieCompactor runs compaction on a hoodie table @@ -38,14 +35,13 @@ public interface HoodieCompactor extends Serializable { * @throws Exception */ HoodieCompactionMetadata compact(JavaSparkContext jsc, final HoodieWriteConfig config, - HoodieTableMetaClient metaClient, RealtimeTableView fsView, - CompactionFilter compactionFilter) throws Exception; + HoodieTable hoodieTable, CompactionFilter compactionFilter) throws Exception; // Helper methods - default String startCompactionCommit(HoodieTableMetaClient metaClient) { + default String startCompactionCommit(HoodieTable hoodieTable) { String commitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()); - HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + HoodieActiveTimeline activeTimeline = hoodieTable.getActiveTimeline(); activeTimeline .createInflight(new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, commitTime)); return commitTime; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index 8cf0c1a72..da787c13a 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -24,13 +24,14 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; -import com.uber.hoodie.common.table.view.RealtimeTableView; import com.uber.hoodie.common.util.AvroUtils; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieCommitException; import 
com.uber.hoodie.table.HoodieCopyOnWriteTable; +import com.uber.hoodie.table.HoodieMergeOnReadTable; +import com.uber.hoodie.table.HoodieTable; import org.apache.avro.Schema; import org.apache.commons.collections.IteratorUtils; import org.apache.hadoop.fs.FileSystem; @@ -61,11 +62,12 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { @Override public HoodieCompactionMetadata compact(JavaSparkContext jsc, HoodieWriteConfig config, - HoodieTableMetaClient metaClient, RealtimeTableView fsView, + HoodieTable hoodieTable, CompactionFilter compactionFilter) throws Exception { // TODO - rollback any compactions in flight - String compactionCommit = startCompactionCommit(metaClient); + HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); + String compactionCommit = startCompactionCommit(hoodieTable); log.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommit); List partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath()); @@ -74,9 +76,9 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { List operations = jsc.parallelize(partitionPaths, partitionPaths.size()) .flatMap((FlatMapFunction) partitionPath -> { - FileSystem fileSystem = FSUtils.getFs(); - return fsView.groupLatestDataFileWithLogFiles(fileSystem, partitionPath) - .entrySet().stream() + return hoodieTable.getFileSystemView() + .groupLatestDataFileWithLogFiles(partitionPath).entrySet() + .stream() .map(s -> new CompactionOperation(s.getKey(), partitionPath, s.getValue())) .collect(Collectors.toList()); }).collect(); @@ -145,9 +147,15 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { List> readDeltaFilesInMemory = AvroUtils.loadFromFiles(fs, operation.getDeltaFilePaths(), schema); + if(readDeltaFilesInMemory.isEmpty()) { + return IteratorUtils.emptyIterator(); + } + + // Compacting is very similar to applying updates to existing file HoodieCopyOnWriteTable table = - new 
HoodieCopyOnWriteTable<>(commitTime, config, metaClient); - return table.handleUpdate(operation.getFileId(), readDeltaFilesInMemory.iterator()); + new HoodieCopyOnWriteTable<>(config, metaClient); + return table + .handleUpdate(commitTime, operation.getFileId(), readDeltaFilesInMemory.iterator()); } public boolean commitCompaction(String commitTime, HoodieTableMetaClient metaClient, diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieStorageWriterFactory.java b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieStorageWriterFactory.java index 3fedde19f..b9084dc61 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieStorageWriterFactory.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieStorageWriterFactory.java @@ -22,6 +22,7 @@ import com.uber.hoodie.avro.HoodieAvroWriteSupport; import com.uber.hoodie.common.BloomFilter; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.table.HoodieTable; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; @@ -32,7 +33,7 @@ import java.io.IOException; public class HoodieStorageWriterFactory { public static HoodieStorageWriter getStorageWriter( - String commitTime, Path path, HoodieTableMetaClient metaClient, HoodieWriteConfig config, Schema schema) + String commitTime, Path path, HoodieTable hoodieTable, HoodieWriteConfig config, Schema schema) throws IOException { //TODO - based on the metadata choose the implementation of HoodieStorageWriter // Currently only parquet is supported diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index d055d896e..b4e10ff8e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ 
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -19,9 +19,7 @@ package com.uber.hoodie.table; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; -import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.timeline.HoodieInstant; -import com.uber.hoodie.common.table.view.ReadOptimizedTableView; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieCommitMetadata; @@ -70,6 +68,11 @@ import scala.Tuple2; * */ public class HoodieCopyOnWriteTable extends HoodieTable { + public HoodieCopyOnWriteTable(HoodieWriteConfig config, HoodieTableMetaClient metaClient) { + super(config, metaClient); + } + + // seed for random number generator. No particular significance, just makes testing deterministic private static final long RANDOM_NUMBER_SEED = 356374L; @@ -137,10 +140,6 @@ public class HoodieCopyOnWriteTable extends Hoodi } - public HoodieCopyOnWriteTable(String commitTime, HoodieWriteConfig config, HoodieTableMetaClient metaClient) { - super(commitTime, config, metaClient); - } - /** * Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD partition) */ @@ -291,13 +290,11 @@ public class HoodieCopyOnWriteTable extends Hoodi FileSystem fs = FSUtils.getFs(); List smallFileLocations = new ArrayList<>(); - HoodieTimeline commitTimeline = - metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); - TableFileSystemView fileSystemView = new ReadOptimizedTableView(fs, metaClient); + HoodieTimeline commitTimeline = getCompletedCommitTimeline(); if (!commitTimeline.empty()) { // if we have some commits HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); - List allFiles = fileSystemView + List allFiles = getFileSystemView() .getLatestVersionInPartition(partitionPath, 
latestCommitTime.getTimestamp()) .collect(Collectors.toList()); @@ -399,11 +396,10 @@ public class HoodieCopyOnWriteTable extends Hoodi - public Iterator> handleUpdate(String fileLoc, Iterator> recordItr) + public Iterator> handleUpdate(String commitTime, String fileLoc, Iterator> recordItr) throws IOException { // these are updates - HoodieUpdateHandle upsertHandle = - new HoodieUpdateHandle<>(config, commitTime, metaClient, recordItr, fileLoc); + HoodieUpdateHandle upsertHandle = getUpdateHandle(commitTime, fileLoc, recordItr); if (upsertHandle.getOldFilePath() == null) { throw new HoodieUpsertException("Error in finding the old file path at commit " + commitTime +" at fileLoc: " + fileLoc); @@ -437,23 +433,27 @@ public class HoodieCopyOnWriteTable extends Hoodi return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator(); } - public Iterator> handleInsert(Iterator> recordItr) throws Exception { - return new LazyInsertIterable<>(recordItr, config, commitTime, metaClient); + protected HoodieUpdateHandle getUpdateHandle(String commitTime, String fileLoc, Iterator> recordItr) { + return new HoodieUpdateHandle<>(config, commitTime, this, recordItr, fileLoc); + } + + public Iterator> handleInsert(String commitTime, Iterator> recordItr) throws Exception { + return new LazyInsertIterable<>(recordItr, config, commitTime, this); } + @SuppressWarnings("unchecked") @Override - public Iterator> handleUpsertPartition(Integer partition, - Iterator recordItr, - Partitioner partitioner) { + public Iterator> handleUpsertPartition(String commitTime, Integer partition, + Iterator recordItr, Partitioner partitioner) { UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner; BucketInfo binfo = upsertPartitioner.getBucketInfo(partition); BucketType btype = binfo.bucketType; try { if (btype.equals(BucketType.INSERT)) { - return handleInsert(recordItr); + return handleInsert(commitTime, recordItr); } else if 
(btype.equals(BucketType.UPDATE)) { - return handleUpdate(binfo.fileLoc, recordItr); + return handleUpdate(commitTime, binfo.fileLoc, recordItr); } else { throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java new file mode 100644 index 000000000..4fbe608c7 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.table; + +import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.io.HoodieAppendHandle; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +/** + * Implementation of a more real-time read-optimized Hoodie Table where + * + * INSERTS - Same as HoodieCopyOnWriteTable - Produce new files, block aligned to desired size (or) + * Merge with the smallest existing file, to expand it + * + * UPDATES - Appends the changes to a rolling log file maintained per file Id. + * Compaction merges the log file into the base file. + * + */ +public class HoodieMergeOnReadTable extends HoodieCopyOnWriteTable { + private static Logger logger = LogManager.getLogger(HoodieMergeOnReadTable.class); + + public HoodieMergeOnReadTable(HoodieWriteConfig config, + HoodieTableMetaClient metaClient) { + super(config, metaClient); + } + + @Override + public Iterator> handleUpdate(String commitTime, String fileId, + Iterator> recordItr) throws IOException { + logger.info("Merging updates for commit " + commitTime + " for file " + fileId); + HoodieAppendHandle appendHandle = + new HoodieAppendHandle<>(config, commitTime, this, fileId, recordItr); + appendHandle.doAppend(); + appendHandle.close(); + return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())) + .iterator(); + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index 557882357..e7ab29adf 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ 
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -16,14 +16,20 @@ package com.uber.hoodie.table; +import com.google.common.collect.Sets; import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.TableFileSystemView; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; +import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; -import com.uber.hoodie.common.model.HoodieTableType; +import com.uber.hoodie.exception.HoodieCommitException; import com.uber.hoodie.exception.HoodieException; +import org.apache.hadoop.fs.FileSystem; import org.apache.spark.Partitioner; import java.io.Serializable; @@ -34,16 +40,10 @@ import java.util.List; * Abstract implementation of a HoodieTable */ public abstract class HoodieTable implements Serializable { - - protected final String commitTime; - protected final HoodieWriteConfig config; - protected final HoodieTableMetaClient metaClient; - protected HoodieTable(String commitTime, HoodieWriteConfig config, - HoodieTableMetaClient metaClient) { - this.commitTime = commitTime; + protected HoodieTable(HoodieWriteConfig config, HoodieTableMetaClient metaClient) { this.config = config; this.metaClient = metaClient; } @@ -73,6 +73,124 @@ public abstract class HoodieTable implements Seri */ public abstract boolean isWorkloadProfileNeeded(); + public HoodieWriteConfig getConfig() { + return config; + } + + public HoodieTableMetaClient getMetaClient() { + return metaClient; + } + + public FileSystem getFs() { + return metaClient.getFs(); + } + + /** + * Get the view of the file system for this table + * + * @return + */ + public TableFileSystemView getFileSystemView() { + return new HoodieTableFileSystemView(metaClient, 
getCompletedCommitTimeline()); + } + + /** + * Get the view of the file system for this table + * + * @return + */ + public TableFileSystemView getCompactedFileSystemView() { + return new HoodieTableFileSystemView(metaClient, getCompletedCompactionCommitTimeline()); + } + + /** + * Get only the completed (no-inflights) commit timeline + * @return + */ + public HoodieTimeline getCompletedCommitTimeline() { + return getCommitTimeline().filterCompletedInstants(); + } + + public HoodieActiveTimeline getActiveTimeline() { + return metaClient.getActiveTimeline(); + } + + /** + * Get the commit timeline visible for this table + * + * @return + */ + public HoodieTimeline getCommitTimeline() { + switch (metaClient.getTableType()) { + case COPY_ON_WRITE: + return getActiveTimeline().getCommitTimeline(); + case MERGE_ON_READ: + // We need to include the parquet files written out in delta commits + return getActiveTimeline().getTimelineOfActions( + Sets.newHashSet(HoodieActiveTimeline.COMPACTION_ACTION, + HoodieActiveTimeline.DELTA_COMMIT_ACTION)); + default: + throw new HoodieException("Unsupported table type :"+ metaClient.getTableType()); + } + } + + /** + * Get only the completed (no-inflights) compaction commit timeline + * @return + */ + public HoodieTimeline getCompletedCompactionCommitTimeline() { + return getCompactionCommitTimeline().filterCompletedInstants(); + } + + + /** + * Get the compacted commit timeline visible for this table + * + * @return + */ + public HoodieTimeline getCompactionCommitTimeline() { + switch (metaClient.getTableType()) { + case COPY_ON_WRITE: + return getActiveTimeline().getCommitTimeline(); + case MERGE_ON_READ: + // We need to include the parquet files written out in delta commits in tagging + return getActiveTimeline().getTimelineOfActions( + Sets.newHashSet(HoodieActiveTimeline.COMPACTION_ACTION)); + default: + throw new HoodieException("Unsupported table type :"+ metaClient.getTableType()); + } + } + + /** + * Gets the commit action 
type + * @return + */ + public String getCommitActionType() { + switch (metaClient.getTableType()) { + case COPY_ON_WRITE: + return HoodieActiveTimeline.COMMIT_ACTION; + case MERGE_ON_READ: + return HoodieActiveTimeline.DELTA_COMMIT_ACTION; + } + throw new HoodieCommitException( + "Could not commit on unknown storage type " + metaClient.getTableType()); + } + + /** + * Gets the action type for a compaction commit + * @return + */ + public String getCompactedCommitActionType() { + switch (metaClient.getTableType()) { + case COPY_ON_WRITE: + return HoodieTimeline.COMMIT_ACTION; + case MERGE_ON_READ: + return HoodieTimeline.COMPACTION_ACTION; + } + throw new HoodieException("Unsupported table type :"+ metaClient.getTableType()); + } + + /** * Perform the ultimate IO for a given upserted (RDD) partition @@ -81,28 +199,19 @@ public abstract class HoodieTable implements Seri * @param recordIterator * @param partitioner */ - public abstract Iterator> handleUpsertPartition(Integer partition, - Iterator> recordIterator, Partitioner partitioner); - - /** - * Perform the ultimate IO for a given inserted (RDD) partition - * - * @param partition - * @param recordIterator - * @param partitioner - */ - public abstract Iterator> handleInsertPartition(Integer partition, - Iterator> recordIterator, - Partitioner partitioner); + public abstract Iterator> handleUpsertPartition(String commitTime, + Integer partition, Iterator> recordIterator, Partitioner partitioner); - public static HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, - String commitTime, - HoodieWriteConfig config) { - if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) { - return new HoodieCopyOnWriteTable(commitTime, config, metaClient); - } else { - throw new HoodieException("Unsupported table type :"+ metaClient.getTableType()); + public static HoodieTable getHoodieTable( + HoodieTableMetaClient metaClient, HoodieWriteConfig config) { + switch (metaClient.getTableType()) { + case 
COPY_ON_WRITE: + return new HoodieCopyOnWriteTable<>(config, metaClient); + case MERGE_ON_READ: + return new HoodieMergeOnReadTable<>(config, metaClient); + default: + throw new HoodieException("Unsupported table type :" + metaClient.getTableType()); } } } diff --git a/hoodie-client/src/test/java/HoodieClientExample.java b/hoodie-client/src/test/java/HoodieClientExample.java index be7d3ad87..54aa10938 100644 --- a/hoodie-client/src/test/java/HoodieClientExample.java +++ b/hoodie-client/src/test/java/HoodieClientExample.java @@ -15,6 +15,8 @@ */ import com.uber.hoodie.HoodieWriteClient; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.common.HoodieTestDataGenerator; import com.uber.hoodie.common.model.HoodieRecord; @@ -28,6 +30,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.util.List; +import java.util.Properties; /** * Driver program that uses the Hoodie client with synthetic workload, and performs basic @@ -55,6 +58,11 @@ public class HoodieClientExample { .forTable("sample-table").withIndexConfig( HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) .build(); + Properties properties = new Properties(); + properties.put(HoodieWriteConfig.TABLE_NAME, "sample-table"); + HoodieTableMetaClient + .initializePathAsHoodieDataset(FSUtils.getFs(), "file:///tmp/hoodie/sample-table", + properties); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); /** diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java index 37e5a4697..ed14a2f1c 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java @@ -30,7 +30,6 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient; 
import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.timeline.HoodieInstant; -import com.uber.hoodie.common.table.view.ReadOptimizedTableView; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.ParquetUtils; import com.uber.hoodie.config.HoodieWriteConfig; @@ -41,6 +40,7 @@ import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.io.HoodieCleaner; +import com.uber.hoodie.table.HoodieTable; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -203,7 +203,10 @@ public class TestHoodieClient implements Serializable { assertEquals("Latest commit should be 001",readClient.latestCommit(), newCommitTime); assertEquals("Must contain 200 records", readClient.readCommit(newCommitTime).count(), records.size()); // Should have 100 records in table (check using Index), all in locations marked at commit - List taggedRecords = index.tagLocation(jsc.parallelize(records, 1), new HoodieTableMetaClient(fs, basePath)).collect(); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); + + List taggedRecords = index.tagLocation(jsc.parallelize(records, 1), table).collect(); checkTaggedRecords(taggedRecords, "001"); /** @@ -228,8 +231,11 @@ public class TestHoodieClient implements Serializable { assertEquals("Expecting two commits.", readClient.listCommitsSince("000").size(), 2); assertEquals("Latest commit should be 004",readClient.latestCommit(), newCommitTime); + metaClient = new HoodieTableMetaClient(fs, basePath); + table = HoodieTable.getHoodieTable(metaClient, getConfig()); + // Index should be able to locate all updates in correct locations. 
- taggedRecords = index.tagLocation(jsc.parallelize(dedupedRecords, 1), new HoodieTableMetaClient(fs, basePath)).collect(); + taggedRecords = index.tagLocation(jsc.parallelize(dedupedRecords, 1), table).collect(); checkTaggedRecords(taggedRecords, "004"); // Check the entire dataset has 100 records still @@ -276,7 +282,9 @@ public class TestHoodieClient implements Serializable { assertEquals("Expecting a single commit.", new HoodieReadClient(jsc, basePath).listCommitsSince("000").size(), 1); // Should have 100 records in table (check using Index), all in locations marked at commit - List taggedRecords = index.tagLocation(jsc.parallelize(records, 1), new HoodieTableMetaClient(fs, basePath)).collect(); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); + List taggedRecords = index.tagLocation(jsc.parallelize(records, 1), table).collect(); checkTaggedRecords(taggedRecords, newCommitTime); // Keep doing some writes and clean inline. Make sure we have expected number of files remaining. 
@@ -291,10 +299,10 @@ public class TestHoodieClient implements Serializable { assertNoWriteErrors(statuses); HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath); - HoodieTimeline timeline = - metadata.getActiveTimeline().getCommitTimeline(); + table = HoodieTable.getHoodieTable(metadata, getConfig()); + HoodieTimeline timeline = table.getCommitTimeline(); - TableFileSystemView fsView = new ReadOptimizedTableView(fs, metadata); + TableFileSystemView fsView = table.getFileSystemView(); // Need to ensure the following for (String partitionPath : dataGen.getPartitionPaths()) { // compute all the versions of all files, from time 0 @@ -358,7 +366,10 @@ public class TestHoodieClient implements Serializable { // verify that there is a commit assertEquals("Expecting a single commit.", new HoodieReadClient(jsc, basePath).listCommitsSince("000").size(), 1); // Should have 100 records in table (check using Index), all in locations marked at commit - List taggedRecords = index.tagLocation(jsc.parallelize(records, 1), new HoodieTableMetaClient(fs, basePath)).collect(); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); + + List taggedRecords = index.tagLocation(jsc.parallelize(records, 1), table).collect(); checkTaggedRecords(taggedRecords, newCommitTime); // Keep doing some writes and clean inline. Make sure we have expected number of files remaining. 
@@ -372,7 +383,8 @@ public class TestHoodieClient implements Serializable { assertNoWriteErrors(statuses); HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath); - HoodieTimeline activeTimeline = metadata.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieTable table1 = HoodieTable.getHoodieTable(metadata, cfg); + HoodieTimeline activeTimeline = table1.getCompletedCommitTimeline(); Optional earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits - 1); Set acceptableCommits = @@ -384,7 +396,7 @@ public class TestHoodieClient implements Serializable { acceptableCommits.add(earliestRetainedCommit.get()); } - TableFileSystemView fsView = new ReadOptimizedTableView(fs, metadata); + TableFileSystemView fsView = table1.getFileSystemView(); // Need to ensure the following for (String partitionPath : dataGen.getPartitionPaths()) { List> fileVersions = fsView.getEveryVersionInPartition(partitionPath).collect(Collectors.toList()); @@ -637,7 +649,8 @@ public class TestHoodieClient implements Serializable { assertEquals("2 files needs to be committed.", 2, statuses.size()); HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath); - TableFileSystemView fileSystemView = new ReadOptimizedTableView(fs, metadata); + HoodieTable table = HoodieTable.getHoodieTable(metadata, config); + TableFileSystemView fileSystemView = table.getFileSystemView(); List files = fileSystemView.getLatestVersionInPartition(TEST_PARTITION_PATH, commitTime3).collect( Collectors.toList()); int numTotalInsertsInCommit3 = 0; diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java new file mode 100644 index 000000000..89ab04cc9 --- /dev/null +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java @@ -0,0 +1,208 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. 
(hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie; + +import com.uber.hoodie.common.HoodieTestDataGenerator; +import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieTableType; +import com.uber.hoodie.common.model.HoodieTestUtils; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.TableFileSystemView; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.config.HoodieCompactionConfig; +import com.uber.hoodie.config.HoodieIndexConfig; +import com.uber.hoodie.config.HoodieStorageConfig; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.index.HoodieIndex; +import com.uber.hoodie.io.compact.CompactionFilter; +import com.uber.hoodie.io.compact.HoodieCompactor; +import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; +import com.uber.hoodie.table.HoodieTable; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; +import org.junit.After; +import org.junit.Before; +import 
org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Stream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestMergeOnReadTable { + private transient JavaSparkContext jsc = null; + private transient SQLContext sqlContext; + private String basePath = null; + private HoodieCompactor compactor; + private FileSystem fs; + + @Before + public void init() throws IOException { + this.fs = FSUtils.getFs(); + + // Initialize a local spark env + SparkConf sparkConf = + new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .setAppName("TestHoodieCompactor").setMaster("local[4]"); + jsc = new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf)); + + // Create a temp folder as the base path + TemporaryFolder folder = new TemporaryFolder(); + folder.create(); + basePath = folder.getRoot().getAbsolutePath(); + HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ); + + compactor = new HoodieRealtimeTableCompactor(); + + //SQLContext stuff + sqlContext = new SQLContext(jsc); + } + + @After + public void clean() { + if (basePath != null) { + new File(basePath).delete(); + } + if (jsc != null) { + jsc.stop(); + } + } + + + @Test + public void testSimpleInsertAndUpdate() throws Exception { + HoodieWriteConfig cfg = getConfig(); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + + /** + * Write 1 (only inserts) + */ + String newCommitTime = "001"; + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + List records = dataGen.generateInserts(newCommitTime, 200); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + List statuses = client.upsert(writeRecords, newCommitTime).collect(); + 
assertNoWriteErrors(statuses); + + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath()); + HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg); + + Optional deltaCommit = + metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); + assertTrue(deltaCommit.isPresent()); + assertEquals("Delta commit should be 001", "001", deltaCommit.get().getTimestamp()); + + Optional commit = + metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + assertFalse(commit.isPresent()); + + TableFileSystemView fsView = hoodieTable.getCompactedFileSystemView(); + FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); + Stream dataFilesToRead = fsView.getLatestVersions(allFiles); + assertTrue(!dataFilesToRead.findAny().isPresent()); + + fsView = hoodieTable.getFileSystemView(); + dataFilesToRead = fsView.getLatestVersions(allFiles); + assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit", + dataFilesToRead.findAny().isPresent()); + + /** + * Write 2 (updates) + */ + newCommitTime = "004"; + records = dataGen.generateUpdates(newCommitTime, 100); + Map recordsMap = new HashMap<>(); + for (HoodieRecord rec : records) { + if (!recordsMap.containsKey(rec.getKey())) { + recordsMap.put(rec.getKey(), rec); + } + } + + + statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath()); + deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant(); + assertTrue(deltaCommit.isPresent()); + assertEquals("Latest Delta commit should be 004", "004", deltaCommit.get().getTimestamp()); + + commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + assertFalse(commit.isPresent()); + + + HoodieCompactor compactor = new HoodieRealtimeTableCompactor(); + HoodieTable 
table = HoodieTable.getHoodieTable(metaClient, getConfig()); + + compactor.compact(jsc, getConfig(), table, CompactionFilter.allowAll()); + + metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath()); + allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath()); + dataFilesToRead = fsView.getLatestVersions(allFiles); + assertTrue(dataFilesToRead.findAny().isPresent()); + + // verify that there is a commit + HoodieReadClient readClient = new HoodieReadClient(jsc, basePath, sqlContext); + assertEquals("Expecting a single commit.", 1, readClient.listCommitsSince("000").size()); + String latestCompactionCommitTime = readClient.latestCommit(); + assertTrue(metaClient.getActiveTimeline() + .compareTimestamps("000", latestCompactionCommitTime, HoodieTimeline.LESSER)); + assertEquals("Must contain 200 records", 200, readClient.readSince("000").count()); + } + + private HoodieWriteConfig getConfig() { + return getConfigBuilder().build(); + } + + private HoodieWriteConfig.Builder getConfigBuilder() { + return HoodieWriteConfig.newBuilder().withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + .withCompactionConfig( + HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) + + .forTable("test-trip-table").withIndexConfig( + HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()); + } + + private void assertNoWriteErrors(List statuses) { + // Verify there are no errors + for (WriteStatus status : statuses) { + assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors()); + } + } + + + +} diff --git a/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java b/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java index 093dd085d..955865e1f 100644 --- 
a/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java @@ -57,7 +57,7 @@ public class TestUpdateMapFunction { // Create a bunch of records with a old version of schema HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.txt"); HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable("100", config, metadata); + HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, metadata); String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; @@ -78,7 +78,7 @@ public class TestUpdateMapFunction { records.add( new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3)); - Iterator> insertResult = table.handleInsert(records.iterator()); + Iterator> insertResult = table.handleInsert("100", records.iterator()); Path commitFile = new Path(config.getBasePath() + "/.hoodie/" + HoodieTimeline.makeCommitFileName("100")); FSUtils.getFs().create(commitFile); @@ -91,7 +91,7 @@ public class TestUpdateMapFunction { System.out.println(fileId); - table = new HoodieCopyOnWriteTable("101", config, metadata); + table = new HoodieCopyOnWriteTable(config, metadata); // New content with values for the newly added field recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12,\"added_field\":1}"; @@ -104,7 +104,7 @@ public class TestUpdateMapFunction { records.add(record1); try { - table.handleUpdate(fileId, records.iterator()); + table.handleUpdate("101", fileId, records.iterator()); } catch (ClassCastException e) { fail( "UpdateFunction could not read records written with exampleSchema.txt using the exampleEvolvedSchema.txt"); diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieBloomIndex.java index faa866ec6..76bf3aa19 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieBloomIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieBloomIndex.java @@ -30,6 +30,7 @@ import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; +import com.uber.hoodie.table.HoodieTable; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.commons.io.IOUtils; @@ -127,7 +128,8 @@ public class TestHoodieBloomIndex { new File(basePath + "/2015/03/12/4_0_20150312101010.parquet").createNewFile(); List partitions = Arrays.asList("2016/01/21", "2016/04/01", "2015/03/12"); HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath); - JavaPairRDD rdd = index.loadInvolvedFiles(partitions, metadata); + HoodieTable table = HoodieTable.getHoodieTable(metadata, config); + JavaPairRDD rdd = index.loadInvolvedFiles(partitions, table); // Still 0, as no valid commit assertEquals(rdd.count(), 0); @@ -136,7 +138,7 @@ public class TestHoodieBloomIndex { new File(basePath + "/.hoodie/20160401010101.commit").createNewFile(); new File(basePath + "/.hoodie/20150312101010.commit").createNewFile(); metadata = new HoodieTableMetaClient(fs, basePath); - rdd = index.loadInvolvedFiles(partitions, metadata); + rdd = index.loadInvolvedFiles(partitions, table); final List> filesList = rdd.collect(); assertEquals(filesList.size(), 4); @@ -214,12 +216,13 @@ public class TestHoodieBloomIndex { // Also create the metadata and config HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath); HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); + HoodieTable table = HoodieTable.getHoodieTable(metadata, config); // Let's tag 
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config, jsc); try { - bloomIndex.tagLocation(recordRDD, metadata); + bloomIndex.tagLocation(recordRDD, table); } catch (IllegalArgumentException e) { fail("EmptyRDD should not result in IllegalArgumentException: Positive number of slices required"); } @@ -250,10 +253,11 @@ public class TestHoodieBloomIndex { // Also create the metadata and config HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath); HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); + HoodieTable table = HoodieTable.getHoodieTable(metadata, config); // Let's tag HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config, jsc); - JavaRDD taggedRecordRDD = bloomIndex.tagLocation(recordRDD, metadata); + JavaRDD taggedRecordRDD = bloomIndex.tagLocation(recordRDD, table); // Should not find any files for (HoodieRecord record : taggedRecordRDD.collect()) { @@ -267,7 +271,9 @@ public class TestHoodieBloomIndex { // We do the tag again metadata = new HoodieTableMetaClient(fs, basePath); - taggedRecordRDD = bloomIndex.tagLocation(recordRDD, metadata); + table = HoodieTable.getHoodieTable(metadata, config); + + taggedRecordRDD = bloomIndex.tagLocation(recordRDD, table); // Check results for (HoodieRecord record : taggedRecordRDD.collect()) { @@ -311,10 +317,11 @@ public class TestHoodieBloomIndex { // Also create the metadata and config HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath); HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); + HoodieTable table = HoodieTable.getHoodieTable(metadata, config); // Let's tag HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config, jsc); - JavaPairRDD> taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, metadata); + JavaPairRDD> taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, table); // Should not find any files for (Tuple2> record : taggedRecordRDD.collect()) { @@ -328,7 +335,8 @@ public 
class TestHoodieBloomIndex { // We do the tag again metadata = new HoodieTableMetaClient(fs, basePath); - taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, metadata); + table = HoodieTable.getHoodieTable(metadata, config); + taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, table); // Check results for (Tuple2> record : taggedRecordRDD.collect()) { @@ -377,8 +385,10 @@ public class TestHoodieBloomIndex { JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2)); HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath); HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); + HoodieTable table = HoodieTable.getHoodieTable(metadata, config); + HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config, jsc); - JavaRDD taggedRecordRDD = bloomIndex.tagLocation(recordRDD, metadata); + JavaRDD taggedRecordRDD = bloomIndex.tagLocation(recordRDD, table); // Check results for (HoodieRecord record : taggedRecordRDD.collect()) { diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCleaner.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCleaner.java index b7d453ba4..8ee4c8863 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCleaner.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCleaner.java @@ -22,6 +22,7 @@ import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieCompactionConfig; +import com.uber.hoodie.table.HoodieTable; import org.junit.Before; import org.junit.Test; import java.io.IOException; @@ -56,7 +57,9 @@ public class TestHoodieCleaner { String file1P1C0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[1], "000"); HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - HoodieCleaner cleaner = new HoodieCleaner(metadata, config, FSUtils.getFs()); + HoodieTable table = 
HoodieTable.getHoodieTable(metadata, config); + + HoodieCleaner cleaner = new HoodieCleaner(table, config); assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[0])); assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[1])); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0)); @@ -70,7 +73,9 @@ public class TestHoodieCleaner { HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "001", file1P0C0); // update HoodieTestUtils.createDataFile(basePath, partitionPaths[1], "001", file1P1C0); // update metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - cleaner = new HoodieCleaner(metadata, config, FSUtils.getFs()); + table = HoodieTable.getHoodieTable(metadata, config); + + cleaner = new HoodieCleaner(table, config); assertEquals("Must clean 1 file" , 1, cleaner.clean(partitionPaths[0])); assertEquals("Must clean 1 file" , 1, cleaner.clean(partitionPaths[1])); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file2P0C1)); @@ -85,7 +90,9 @@ public class TestHoodieCleaner { HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "002", file2P0C1); // update String file3P0C2 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "002"); metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - cleaner = new HoodieCleaner(metadata, config, FSUtils.getFs()); + table = HoodieTable.getHoodieTable(metadata, config); + + cleaner = new HoodieCleaner(table, config); assertEquals("Must clean two files" , 2, cleaner.clean(partitionPaths[0])); assertFalse(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file1P0C0)); assertFalse(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file2P0C1)); @@ -113,7 +120,9 @@ public class TestHoodieCleaner { String file1P1C0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[1], "000"); HoodieTableMetaClient metadata = new 
HoodieTableMetaClient(FSUtils.getFs(), basePath); - HoodieCleaner cleaner = new HoodieCleaner(metadata, config, FSUtils.getFs()); + HoodieTable table = HoodieTable.getHoodieTable(metadata, config); + + HoodieCleaner cleaner = new HoodieCleaner(table, config); assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[0])); assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[1])); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0)); @@ -127,7 +136,9 @@ public class TestHoodieCleaner { HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "001", file1P0C0); // update HoodieTestUtils.createDataFile(basePath, partitionPaths[1], "001", file1P1C0); // update metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - cleaner = new HoodieCleaner(metadata, config, FSUtils.getFs()); + table = HoodieTable.getHoodieTable(metadata, config); + + cleaner = new HoodieCleaner(table, config); assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[0])); assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[1])); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file2P0C1)); @@ -142,7 +153,9 @@ public class TestHoodieCleaner { HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "002", file2P0C1); // update String file3P0C2 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "002"); metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - cleaner = new HoodieCleaner(metadata, config, FSUtils.getFs()); + table = HoodieTable.getHoodieTable(metadata, config); + + cleaner = new HoodieCleaner(table, config); assertEquals( "Must not clean any file. 
We have to keep 1 version before the latest commit time to keep", 0, cleaner.clean(partitionPaths[0])); @@ -156,7 +169,9 @@ public class TestHoodieCleaner { HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "003", file2P0C1); // update String file4P0C3 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "003"); metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - cleaner = new HoodieCleaner(metadata, config, FSUtils.getFs()); + table = HoodieTable.getHoodieTable(metadata, config); + + cleaner = new HoodieCleaner(table, config); assertEquals( "Must not clean one old file", 1, cleaner.clean(partitionPaths[0])); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java index b0583c6d5..11b9f71c5 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java @@ -28,8 +28,6 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.log.HoodieLogFile; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; -import com.uber.hoodie.common.table.timeline.HoodieInstant; -import com.uber.hoodie.common.table.view.RealtimeTableView; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieCompactionConfig; import com.uber.hoodie.config.HoodieIndexConfig; @@ -41,6 +39,7 @@ import com.uber.hoodie.io.compact.CompactionFilter; import com.uber.hoodie.io.compact.HoodieCompactionMetadata; import com.uber.hoodie.io.compact.HoodieCompactor; import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; +import com.uber.hoodie.table.HoodieTable; import org.apache.hadoop.fs.FileSystem; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -112,24 +111,27 @@ public class TestHoodieCompactor { 
HoodieTestUtils.initTableType(basePath, HoodieTableType.COPY_ON_WRITE); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - RealtimeTableView fsView = new RealtimeTableView(FSUtils.getFs(), metaClient); - compactor.compact(jsc, getConfig(), metaClient, fsView, CompactionFilter.allowAll()); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); + + compactor.compact(jsc, getConfig(), table, CompactionFilter.allowAll()); } @Test public void testCompactionEmpty() throws Exception { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - RealtimeTableView fsView = new RealtimeTableView(FSUtils.getFs(), metaClient); HoodieWriteConfig config = getConfig(); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config); HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + String newCommitTime = writeClient.startCommit(); List records = dataGen.generateInserts(newCommitTime, 100); JavaRDD recordsRDD = jsc.parallelize(records, 1); writeClient.insert(recordsRDD, newCommitTime).collect(); HoodieCompactionMetadata result = - compactor.compact(jsc, getConfig(), metaClient, fsView, CompactionFilter.allowAll()); - assertTrue("If there is nothing to compact, result wull be null", result == null); + compactor.compact(jsc, getConfig(), table, CompactionFilter.allowAll()); + assertTrue("If there is nothing to compact, result will be empty", + result.getFileIdAndFullPaths().isEmpty()); } @Test @@ -145,11 +147,13 @@ public class TestHoodieCompactor { // Update all the 100 records HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config); + newCommitTime = "101"; List updatedRecords = dataGen.generateUpdates(newCommitTime, records); JavaRDD updatedRecordsRDD = jsc.parallelize(updatedRecords, 1); HoodieIndex index = new HoodieBloomIndex<>(config, jsc); - updatedRecords = 
index.tagLocation(updatedRecordsRDD, metaClient).collect(); + updatedRecords = index.tagLocation(updatedRecordsRDD, table).collect(); // Write them to corresponding avro logfiles HoodieTestUtils @@ -158,10 +162,10 @@ public class TestHoodieCompactor { // Verify that all data file has one log file metaClient = new HoodieTableMetaClient(fs, basePath); - RealtimeTableView fsView = new RealtimeTableView(fs, metaClient); + table = HoodieTable.getHoodieTable(metaClient, config); for (String partitionPath : dataGen.getPartitionPaths()) { Map> groupedLogFiles = - fsView.groupLatestDataFileWithLogFiles(fs, partitionPath); + table.getFileSystemView().groupLatestDataFileWithLogFiles(partitionPath); for (List logFiles : groupedLogFiles.values()) { assertEquals("There should be 1 log file written for every data file", 1, logFiles.size()); @@ -170,13 +174,14 @@ public class TestHoodieCompactor { // Do a compaction metaClient = new HoodieTableMetaClient(fs, basePath); - fsView = new RealtimeTableView(fs, metaClient); + table = HoodieTable.getHoodieTable(metaClient, config); + HoodieCompactionMetadata result = - compactor.compact(jsc, getConfig(), metaClient, fsView, CompactionFilter.allowAll()); + compactor.compact(jsc, getConfig(), table, CompactionFilter.allowAll()); // Verify that recently written compacted data file has no log file metaClient = new HoodieTableMetaClient(fs, basePath); - fsView = new RealtimeTableView(fs, metaClient); + table = HoodieTable.getHoodieTable(metaClient, config); HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); assertTrue("Compaction commit should be > than last insert", timeline @@ -185,7 +190,7 @@ public class TestHoodieCompactor { for (String partitionPath : dataGen.getPartitionPaths()) { Map> groupedLogFiles = - fsView.groupLatestDataFileWithLogFiles(fs, partitionPath); + table.getFileSystemView().groupLatestDataFileWithLogFiles(partitionPath); for (List logFiles : groupedLogFiles.values()) { assertTrue( "After compaction there 
should be no log files visiable on a Realtime view", diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java index 9201fc061..f8ab264e3 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java @@ -89,8 +89,10 @@ public class TestCopyOnWriteTable { String commitTime = HoodieTestUtils.makeNewCommitTime(); HoodieWriteConfig config = makeHoodieClientConfig(); - HoodieInsertHandle io = new HoodieInsertHandle(config, commitTime, - new HoodieTableMetaClient(FSUtils.getFs(), basePath), partitionPath); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(FSUtils.getFs(), basePath); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config); + + HoodieInsertHandle io = new HoodieInsertHandle(config, commitTime, table, partitionPath); Path newPath = io.makeNewPath(record.getPartitionPath(), unitNumber, fileName); assertTrue(newPath.toString().equals(this.basePath + "/" + partitionPath + "/" + FSUtils .makeDataFileName(commitTime, unitNumber, fileName))); @@ -113,8 +115,9 @@ public class TestCopyOnWriteTable { HoodieWriteConfig config = makeHoodieClientConfig(); String firstCommitTime = HoodieTestUtils.makeNewCommitTime(); HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); + String partitionPath = "/2016/01/31"; - HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(firstCommitTime, config, metadata); + HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, metadata); // Get some records belong to the same partition (2016/01/31) String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; @@ -131,7 +134,7 @@ public class TestCopyOnWriteTable { records.add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), 
rowChange3.getPartitionPath()), rowChange3)); // Insert new records - HoodieClientTestUtils.collectStatuses(table.handleInsert(records.iterator())); + HoodieClientTestUtils.collectStatuses(table.handleInsert(firstCommitTime, records.iterator())); // We should have a parquet file generated (TODO: better control # files after we revise AvroParquetIO) File parquetFile = null; for (File file : new File(this.basePath + partitionPath).listFiles()) { @@ -175,8 +178,8 @@ public class TestCopyOnWriteTable { Thread.sleep(1000); String newCommitTime = HoodieTestUtils.makeNewCommitTime(); metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - table = new HoodieCopyOnWriteTable(newCommitTime, config, metadata); - Iterator> iter = table.handleUpdate(updatedRecord1.getCurrentLocation().getFileId(), updatedRecords.iterator()); + table = new HoodieCopyOnWriteTable(config, metadata); + Iterator> iter = table.handleUpdate(newCommitTime, updatedRecord1.getCurrentLocation().getFileId(), updatedRecords.iterator()); // Check the updated file File updatedParquetFile = null; @@ -242,7 +245,7 @@ public class TestCopyOnWriteTable { String commitTime = HoodieTestUtils.makeNewCommitTime(); FileSystem fs = FSUtils.getFs(); HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath); - HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(commitTime, config, metadata); + HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, metadata); // Write a few records, and get atleast one file // 10 records for partition 1, 1 record for partition 2. 
@@ -250,7 +253,7 @@ public class TestCopyOnWriteTable { records.addAll(newHoodieRecords(1, "2016-02-01T03:16:41.415Z")); // Simulate crash after first file - List statuses = HoodieClientTestUtils.collectStatuses(table.handleInsert(records.iterator())); + List statuses = HoodieClientTestUtils.collectStatuses(table.handleInsert(commitTime, records.iterator())); WriteStatus status = statuses.get(0); Path partialFile = new Path(String.format("%s/%s/%s", basePath, @@ -263,7 +266,7 @@ public class TestCopyOnWriteTable { records = newHoodieRecords(10, "2016-01-31T03:16:41.415Z"); records.addAll(newHoodieRecords(1, "2016-02-01T03:16:41.415Z")); - statuses = HoodieClientTestUtils.collectStatuses(table.handleInsert(records.iterator())); + statuses = HoodieClientTestUtils.collectStatuses(table.handleInsert(commitTime, records.iterator())); status = statuses.get(0); Path retriedFIle = new Path(String.format("%s/%s/%s", @@ -280,7 +283,7 @@ public class TestCopyOnWriteTable { HoodieWriteConfig config = makeHoodieClientConfig(); String commitTime = HoodieTestUtils.makeNewCommitTime(); HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(commitTime, config, metadata); + HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, metadata); // Case 1: // 10 records for partition 1, 1 record for partition 2. @@ -288,7 +291,7 @@ public class TestCopyOnWriteTable { records.addAll(newHoodieRecords(1, "2016-02-01T03:16:41.415Z")); // Insert new records - List returnedStatuses = HoodieClientTestUtils.collectStatuses(table.handleInsert(records.iterator())); + List returnedStatuses = HoodieClientTestUtils.collectStatuses(table.handleInsert(commitTime, records.iterator())); // TODO: check the actual files and make sure 11 records, total were written. 
@@ -307,7 +310,7 @@ public class TestCopyOnWriteTable { records.addAll(newHoodieRecords(1, "2016-02-02T03:16:41.415Z")); // Insert new records - returnedStatuses = HoodieClientTestUtils.collectStatuses(table.handleInsert(records.iterator())); + returnedStatuses = HoodieClientTestUtils.collectStatuses(table.handleInsert(commitTime, records.iterator())); assertEquals(3, returnedStatuses.size()); assertEquals("2016/01/31", returnedStatuses.get(0).getPartitionPath()); @@ -327,7 +330,7 @@ public class TestCopyOnWriteTable { .parquetPageSize(64 * 1024).build()).build(); String commitTime = HoodieTestUtils.makeNewCommitTime(); HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(commitTime, config, metadata); + HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, metadata); List records = new ArrayList<>(); // Approx 1150 records are written for block size of 64KB @@ -339,7 +342,7 @@ public class TestCopyOnWriteTable { } // Insert new records - HoodieClientTestUtils.collectStatuses(table.handleInsert(records.iterator())); + HoodieClientTestUtils.collectStatuses(table.handleInsert(commitTime, records.iterator())); // Check the updated file int counts = 0; @@ -371,7 +374,7 @@ public class TestCopyOnWriteTable { HoodieClientTestUtils.fakeDataFile(basePath, TEST_PARTITION_PATH, "001", "file1", fileSize); HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable("001", config, metadata); + HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, metadata); HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{TEST_PARTITION_PATH}); List insertRecords = dataGenerator.generateInserts("001", numInserts); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieDeltaWriteStat.java 
b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieDeltaWriteStat.java new file mode 100644 index 000000000..2f3ee88bd --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieDeltaWriteStat.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.model; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +/** + * Statistics about a single Hoodie delta log operation. 
+ */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class HoodieDeltaWriteStat extends HoodieWriteStat { + + private int logVersion; + private long logOffset; + + public void setLogVersion(int logVersion) { + this.logVersion = logVersion; + } + + public int getLogVersion() { + return logVersion; + } + + public void setLogOffset(long logOffset) { + this.logOffset = logOffset; + } + + public long getLogOffset() { + return logOffset; + } +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java index 761acece8..67f36121b 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java @@ -39,16 +39,20 @@ import java.util.stream.Stream; */ public interface HoodieTimeline extends Serializable { String COMMIT_ACTION = "commit"; + String DELTA_COMMIT_ACTION = "deltacommit"; String CLEAN_ACTION = "clean"; String SAVEPOINT_ACTION = "savepoint"; String COMPACTION_ACTION = "compaction"; String INFLIGHT_EXTENSION = ".inflight"; + String COMMIT_EXTENSION = "." + COMMIT_ACTION; + String DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION; String CLEAN_EXTENSION = "." + CLEAN_ACTION; String SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION; String COMPACTION_EXTENSION = "." + COMPACTION_ACTION; //this is to preserve backwards compatibility on commit in-flight filenames String INFLIGHT_COMMIT_EXTENSION = INFLIGHT_EXTENSION; + String INFLIGHT_DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION + INFLIGHT_EXTENSION; String INFLIGHT_CLEAN_EXTENSION = "." + CLEAN_ACTION + INFLIGHT_EXTENSION; String INFLIGHT_SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION + INFLIGHT_EXTENSION; String INFLIGHT_COMPACTION_EXTENSION = "." 
+ COMPACTION_ACTION + INFLIGHT_EXTENSION; @@ -203,6 +207,14 @@ public interface HoodieTimeline extends Serializable { return commitTime + HoodieTimeline.COMPACTION_EXTENSION; } + static String makeInflightDeltaFileName(String commitTime) { + return commitTime + HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION; + } + + static String makeDeltaFileName(String commitTime) { + return commitTime + HoodieTimeline.DELTA_COMMIT_EXTENSION; + } + static String getCommitFromCommitFile(String commitFileName) { return commitFileName.split("\\.")[0]; } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java index fb3004e4a..7814c3c8e 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java @@ -18,9 +18,13 @@ package com.uber.hoodie.common.table; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.table.log.HoodieLogFile; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.stream.Stream; /** @@ -29,6 +33,7 @@ import java.util.stream.Stream; *

* ReadOptimizedView - Lets queries run only on organized columnar data files at the expense of latency * WriteOptimizedView - Lets queries run on columnar data as well as delta files (sequential) at the expense of query execution time + * * @since 0.3.0 */ public interface TableFileSystemView { @@ -90,4 +95,14 @@ public interface TableFileSystemView { * @return */ Stream getLatestVersions(FileStatus[] fileStatuses); + + /** + * Group data files with corresponding delta files + * @param fs + * @param partitionPath + * @return + * @throws IOException + */ + Map> groupLatestDataFileWithLogFiles(String partitionPath) throws IOException; + } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogAppender.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogAppender.java index afa6a3908..6ad55b74f 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogAppender.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogAppender.java @@ -20,6 +20,7 @@ import com.uber.hoodie.common.table.log.avro.AvroLogAppender; import com.uber.hoodie.common.table.log.avro.RollingAvroLogAppender; import java.io.IOException; +import java.util.Iterator; import java.util.List; /** @@ -36,7 +37,7 @@ public interface HoodieLogAppender { * @param records * @throws IOException */ - void append(List records) throws IOException, InterruptedException; + void append(Iterator records) throws IOException, InterruptedException; /** * Syncs the log manually if auto-flush is not set in HoodieLogAppendConfig. 
If auto-flush is set diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/AvroLogAppender.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/AvroLogAppender.java index ef29ac8ef..b46f1e39e 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/AvroLogAppender.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/AvroLogAppender.java @@ -37,6 +37,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; +import java.util.Iterator; import java.util.List; /** @@ -99,8 +100,8 @@ public class AvroLogAppender implements HoodieLogAppender { } } - public void append(List records) throws IOException { - records.forEach(r -> { + public void append(Iterator records) throws IOException { + records.forEachRemaining(r -> { try { writer.append(r); } catch (IOException e) { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/RollingAvroLogAppender.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/RollingAvroLogAppender.java index ce82e8bba..07587939d 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/RollingAvroLogAppender.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/RollingAvroLogAppender.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.io.IOException; +import java.util.Iterator; import java.util.List; /** @@ -64,8 +65,8 @@ public class RollingAvroLogAppender implements HoodieLogAppender return logWriter.getCurrentSize(); } - public void append(List records) throws IOException, InterruptedException { - LOG.info("Appending " + records.size() + " records to " + config.getLogFile()); + public void append(Iterator records) throws IOException, InterruptedException { + LOG.info("Appending records to " + config.getLogFile()); rollOverIfNeeded(); 
Preconditions.checkArgument(logWriter != null); logWriter.append(records); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java index d1654974a..1ffed5930 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java @@ -16,6 +16,7 @@ package com.uber.hoodie.common.table.timeline; +import com.google.common.collect.Sets; import com.google.common.io.Closeables; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; @@ -85,8 +86,9 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { public HoodieActiveTimeline(FileSystem fs, String metaPath) { this(fs, metaPath, - new String[] {COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, SAVEPOINT_EXTENSION, COMPACTION_EXTENSION, - INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION, COMPACTION_EXTENSION}); + new String[] {COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION, + INFLIGHT_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION, COMPACTION_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION, + CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION, COMPACTION_EXTENSION}); } /** @@ -113,7 +115,16 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { * @return */ public HoodieTimeline getCommitTimeline() { - return new HoodieDefaultTimeline(filterInstantsByAction(COMMIT_ACTION), + return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, COMPACTION_ACTION)); + } + + /** + * Get only the delta commits (inflight and completed) in the active timeline + * + * @return + */ + public HoodieTimeline getDeltaCommitTimeline() { + return new HoodieDefaultTimeline(filterInstantsByAction(DELTA_COMMIT_ACTION), (Function> & Serializable) this::getInstantDetails); } @@ -138,6 +149,7 @@ public class 
HoodieActiveTimeline extends HoodieDefaultTimeline { (Function> & Serializable) this::getInstantDetails); } + /** * Get only the cleaner action (inflight and completed) in the active timeline * diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java index 1679f0b13..24a666194 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java @@ -97,6 +97,10 @@ public class HoodieInstant implements Serializable { return isInflight ? HoodieTimeline.makeInflightCompactionFileName(timestamp) : HoodieTimeline.makeCompactionFileName(timestamp); + } else if (HoodieTimeline.DELTA_COMMIT_ACTION.equals(action)) { + return isInflight ? + HoodieTimeline.makeInflightDeltaFileName(timestamp) : + HoodieTimeline.makeDeltaFileName(timestamp); } throw new IllegalArgumentException("Cannot get file name for unknown action " + action); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java similarity index 79% rename from hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java rename to hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java index 2c44044de..2473ade75 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java @@ -16,12 +16,16 @@ package com.uber.hoodie.common.table.view; +import com.google.common.collect.Maps; import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.model.HoodieTableType; import 
com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.log.HoodieLogFile; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieIOException; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.FileStatus; @@ -47,19 +51,18 @@ import java.util.stream.Stream; * listDataFilesInPartition which includes files to be included in the view * * @see TableFileSystemView - * @see ReadOptimizedTableView * @since 0.3.0 */ -public abstract class AbstractTableFileSystemView implements TableFileSystemView, Serializable { +public class HoodieTableFileSystemView implements TableFileSystemView, Serializable { protected HoodieTableMetaClient metaClient; protected transient FileSystem fs; // This is the commits that will be visible for all views extending this view protected HoodieTimeline visibleActiveCommitTimeline; - public AbstractTableFileSystemView(FileSystem fs, HoodieTableMetaClient metaClient, + public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveCommitTimeline) { this.metaClient = metaClient; - this.fs = fs; + this.fs = metaClient.getFs(); this.visibleActiveCommitTimeline = visibleActiveCommitTimeline; } @@ -183,6 +186,37 @@ public abstract class AbstractTableFileSystemView implements TableFileSystemView } } + public Map> groupLatestDataFileWithLogFiles( + String partitionPath) throws IOException { + if (metaClient.getTableType() != HoodieTableType.MERGE_ON_READ) { + throw new HoodieException("Unsupported table type :" + metaClient.getTableType()); + } + + // All the files in the partition + FileStatus[] files = fs.listStatus(new Path(metaClient.getBasePath(), partitionPath)); + // All the log files filtered from the above list, sorted by 
version numbers + List allLogFiles = Arrays.stream(files).filter(s -> s.getPath().getName() + .contains(metaClient.getTableConfig().getRTFileFormat().getFileExtension())) + .map(HoodieLogFile::new).collect(Collectors.collectingAndThen(Collectors.toList(), + l -> l.stream().sorted(HoodieLogFile.getLogVersionComparator()) + .collect(Collectors.toList()))); + + // Filter the delta files by the commit time of the latest base fine and collect as a list + Optional lastTimestamp = metaClient.getActiveTimeline().lastInstant(); + if (!lastTimestamp.isPresent()) { + return Maps.newHashMap(); + } + + return getLatestVersionInPartition(partitionPath, lastTimestamp.get().getTimestamp()).map( + hoodieDataFile -> Pair.of(hoodieDataFile, allLogFiles.stream().filter( + s -> s.getFileId().equals(hoodieDataFile.getFileId()) && s.getBaseCommitTime() + .equals(hoodieDataFile.getCommitTime())).collect(Collectors.toList()))).collect( + Collectors.toMap( + (Function>, HoodieDataFile>) Pair::getKey, + (Function>, List>) Pair::getRight)); + } + + protected Stream> getFilesByFileId(FileStatus[] files, String maxCommitTime) throws IOException { return groupFilesByFileId(files, maxCommitTime).values().stream(); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/ReadOptimizedTableView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/ReadOptimizedTableView.java deleted file mode 100644 index 3aeaa4914..000000000 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/ReadOptimizedTableView.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.uber.hoodie.common.table.view; - -import com.uber.hoodie.common.table.HoodieTableMetaClient; -import com.uber.hoodie.exception.HoodieIOException; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import java.io.IOException; - -/** - * ReadOptimized view which includes only the ROStorageformat files - */ -public class ReadOptimizedTableView extends AbstractTableFileSystemView { - public ReadOptimizedTableView(FileSystem fs, HoodieTableMetaClient metaClient) { - // Get the active timeline and filter only completed commits - super(fs, metaClient, - metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants()); - } - -} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RealtimeTableView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RealtimeTableView.java deleted file mode 100644 index c5b1feb4a..000000000 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RealtimeTableView.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.uber.hoodie.common.table.view; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.uber.hoodie.common.model.HoodieDataFile; -import com.uber.hoodie.common.model.HoodieTableType; -import com.uber.hoodie.common.table.HoodieTableMetaClient; -import com.uber.hoodie.common.table.log.HoodieLogFile; -import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; -import com.uber.hoodie.common.table.timeline.HoodieInstant; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; - -/** - * Realtime Table View which includes both ROStorageformat files and RTStorageFormat files - */ -public class RealtimeTableView extends AbstractTableFileSystemView { - public RealtimeTableView(FileSystem fs, HoodieTableMetaClient metaClient) { - // For realtime table view, visibleActiveCommitTimeline is a merged timeline of all commits and compactions - super(fs, metaClient, metaClient.getActiveTimeline().getTimelineOfActions( - Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION, - HoodieActiveTimeline.COMPACTION_ACTION)).filterCompletedInstants()); - Preconditions.checkArgument(metaClient.getTableType() == HoodieTableType.MERGE_ON_READ, - "Realtime view can 
only be constructed on Hoodie tables with MERGE_ON_READ storage type"); - } - - public Map> groupLatestDataFileWithLogFiles(FileSystem fs, - String partitionPath) throws IOException { - // All the files in the partition - FileStatus[] files = fs.listStatus(new Path(metaClient.getBasePath(), partitionPath)); - // All the log files filtered from the above list, sorted by version numbers - List allLogFiles = Arrays.stream(files).filter(s -> s.getPath().getName() - .contains(metaClient.getTableConfig().getRTFileFormat().getFileExtension())) - .map(HoodieLogFile::new).collect(Collectors.collectingAndThen(Collectors.toList(), - l -> l.stream().sorted(HoodieLogFile.getLogVersionComparator()) - .collect(Collectors.toList()))); - - // Filter the delta files by the commit time of the latest base fine and collect as a list - Optional lastTimestamp = metaClient.getActiveTimeline().lastInstant(); - if(!lastTimestamp.isPresent()) { - return Maps.newHashMap(); - } - - return getLatestVersionInPartition(partitionPath, lastTimestamp.get().getTimestamp()).map( - hoodieDataFile -> Pair.of(hoodieDataFile, allLogFiles.stream().filter( - s -> s.getFileId().equals(hoodieDataFile.getFileId()) && s.getBaseCommitTime() - .equals(hoodieDataFile.getCommitTime())).collect(Collectors.toList()))).collect( - Collectors.toMap( - (Function>, HoodieDataFile>) Pair::getKey, - (Function>, List>) Pair::getRight)); - } - -} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/exception/HoodieNotSupportedException.java b/hoodie-common/src/main/java/com/uber/hoodie/exception/HoodieNotSupportedException.java new file mode 100644 index 000000000..2305df3ab --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/exception/HoodieNotSupportedException.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.exception; + +public class HoodieNotSupportedException extends HoodieException { + public HoodieNotSupportedException(String errorMsg) { + super(errorMsg); + } +} diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java index d6251273f..54864cf76 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java @@ -20,6 +20,7 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.uber.hoodie.common.table.HoodieTableConfig; import com.uber.hoodie.common.table.HoodieTableMetaClient; @@ -32,12 +33,16 @@ import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.common.util.SchemaTestUtil; +import com.uber.hoodie.exception.HoodieException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import 
org.apache.jute.Index; import org.junit.rules.TemporaryFolder; @@ -58,6 +63,7 @@ import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -192,15 +198,28 @@ public class HoodieTestUtils { r.getRecordKey(), r.getPartitionPath(), ""); - return val; + return (IndexedRecord) val; } catch (IOException e) { return null; } - }).collect(Collectors.toList())); + }).collect(Collectors.toList()).iterator()); log.close(); } catch (Exception e) { fail(e.toString()); } }); } + + public static FileStatus[] listAllDataFilesInPath(FileSystem fs, String basePath) + throws IOException { + RemoteIterator itr = fs.listFiles(new Path(basePath), true); + List returns = Lists.newArrayList(); + while(itr.hasNext()) { + LocatedFileStatus status = itr.next(); + if(status.getPath().getName().contains(".parquet")) { + returns.add(status); + } + } + return returns.toArray(new FileStatus[returns.size()]); + } } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/avro/AvroLogAppenderTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/avro/AvroLogAppenderTest.java index cd79e9775..236281918 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/avro/AvroLogAppenderTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/avro/AvroLogAppenderTest.java @@ -85,7 +85,7 @@ public class AvroLogAppenderTest { .withBaseCommitTime("100") .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build(); RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig); - logAppender.append(SchemaTestUtil.generateTestRecords(0, 100)); + logAppender.append(SchemaTestUtil.generateTestRecords(0, 100).iterator()); long size1 = logAppender.getCurrentSize(); assertTrue("", size1 > 0); assertEquals("", size1, 
fs.getFileStatus(logConfig.getLogFile().getPath()).getLen()); @@ -93,7 +93,7 @@ public class AvroLogAppenderTest { // Close and Open again and append 100 more records logAppender = new RollingAvroLogAppender(logConfig); - logAppender.append(SchemaTestUtil.generateTestRecords(100, 100)); + logAppender.append(SchemaTestUtil.generateTestRecords(100, 100).iterator()); long size2 = logAppender.getCurrentSize(); assertTrue("", size2 > size1); assertEquals("", size2, fs.getFileStatus(logConfig.getLogFile().getPath()).getLen()); @@ -101,7 +101,7 @@ public class AvroLogAppenderTest { // Close and Open again and append 100 more records logAppender = new RollingAvroLogAppender(logConfig); - logAppender.append(SchemaTestUtil.generateTestRecords(200, 100)); + logAppender.append(SchemaTestUtil.generateTestRecords(200, 100).iterator()); long size3 = logAppender.getCurrentSize(); assertTrue("", size3 > size2); assertEquals("", size3, fs.getFileStatus(logConfig.getLogFile().getPath()).getLen()); @@ -123,13 +123,13 @@ public class AvroLogAppenderTest { .withBaseCommitTime("100") .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build(); RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig); - logAppender.append(SchemaTestUtil.generateTestRecords(0, 100)); + logAppender.append(SchemaTestUtil.generateTestRecords(0, 100).iterator()); // do not close this log appender // logAppender.close(); // Try opening again and append 100 more records logAppender = new RollingAvroLogAppender(logConfig); - logAppender.append(SchemaTestUtil.generateTestRecords(100, 100)); + logAppender.append(SchemaTestUtil.generateTestRecords(100, 100).iterator()); assertEquals("", logAppender.getCurrentSize(), fs.getFileStatus(logConfig.getLogFile().getPath()).getLen()); logAppender.close(); @@ -144,7 +144,7 @@ public class AvroLogAppenderTest { .withBaseCommitTime("100") .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build(); RollingAvroLogAppender logAppender = new 
RollingAvroLogAppender(logConfig); - logAppender.append(SchemaTestUtil.generateTestRecords(0, 100)); + logAppender.append(SchemaTestUtil.generateTestRecords(0, 100).iterator()); logAppender.close(); // Append some arbit byte[] to thee end of the log (mimics a partially written commit) @@ -157,7 +157,7 @@ public class AvroLogAppenderTest { outputStream.close(); logAppender = new RollingAvroLogAppender(logConfig); - logAppender.append(SchemaTestUtil.generateTestRecords(100, 100)); + logAppender.append(SchemaTestUtil.generateTestRecords(100, 100).iterator()); logAppender.close(); } @@ -175,7 +175,7 @@ public class AvroLogAppenderTest { long size1 = logAppender.getCurrentSize(); List inputRecords = SchemaTestUtil.generateTestRecords(0, 100); - logAppender.append(inputRecords); + logAppender.append(inputRecords.iterator()); logAppender.close(); AvroLogReader logReader = @@ -195,21 +195,21 @@ public class AvroLogAppenderTest { .withBaseCommitTime("100") .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build(); RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig); - logAppender.append(SchemaTestUtil.generateTestRecords(0, 100)); + logAppender.append(SchemaTestUtil.generateTestRecords(0, 100).iterator()); long size1 = logAppender.getCurrentSize(); logAppender.close(); // Close and Open again and append 100 more records logAppender = new RollingAvroLogAppender(logConfig); List secondBatchInput = SchemaTestUtil.generateTestRecords(100, 100); - logAppender.append(secondBatchInput); + logAppender.append(secondBatchInput.iterator()); long size2 = logAppender.getCurrentSize(); logAppender.close(); // Close and Open again and append 100 more records logAppender = new RollingAvroLogAppender(logConfig); List lastBatchInput = SchemaTestUtil.generateTestRecords(200, 100); - logAppender.append(lastBatchInput); + logAppender.append(lastBatchInput.iterator()); long size3 = logAppender.getCurrentSize(); logAppender.close(); @@ -242,7 +242,7 @@ public class 
AvroLogAppenderTest { .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build(); RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig); long size1 = logAppender.getCurrentSize(); - logAppender.append(SchemaTestUtil.generateTestRecords(0, 100)); + logAppender.append(SchemaTestUtil.generateTestRecords(0, 100).iterator()); logAppender.close(); // Append some arbit byte[] to thee end of the log (mimics a partially written commit) @@ -256,7 +256,7 @@ public class AvroLogAppenderTest { logAppender = new RollingAvroLogAppender(logConfig); long size2 = logAppender.getCurrentSize(); - logAppender.append(SchemaTestUtil.generateTestRecords(100, 100)); + logAppender.append(SchemaTestUtil.generateTestRecords(100, 100).iterator()); logAppender.close(); AvroLogReader logReader = @@ -285,7 +285,7 @@ public class AvroLogAppenderTest { RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig); long size1 = logAppender.getCurrentSize(); List input1 = SchemaTestUtil.generateTestRecords(0, 100); - logAppender.append(input1); + logAppender.append(input1.iterator()); logAppender.close(); // Need to rebuild config to set the latest version as path @@ -296,7 +296,7 @@ public class AvroLogAppenderTest { logAppender = new RollingAvroLogAppender(logConfig); long size2 = logAppender.getCurrentSize(); List input2 = SchemaTestUtil.generateTestRecords(100, 100); - logAppender.append(input2); + logAppender.append(input2.iterator()); logAppender.close(); logConfig = HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath) diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/ReadOptimizedTableViewTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/ReadOptimizedTableViewTest.java index f1ec707a2..e912f2b86 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/ReadOptimizedTableViewTest.java +++ 
b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/ReadOptimizedTableViewTest.java @@ -54,12 +54,14 @@ public class ReadOptimizedTableViewTest { folder.create(); this.basePath = folder.getRoot().getAbsolutePath(); metaClient = HoodieTestUtils.init(basePath); - fsView = new ReadOptimizedTableView(HoodieTestUtils.fs, metaClient); + fsView = new HoodieTableFileSystemView(metaClient, + metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants()); } private void refreshFsView() { metaClient = new HoodieTableMetaClient(HoodieTestUtils.fs, basePath, true); - fsView = new ReadOptimizedTableView(HoodieTestUtils.fs, metaClient); + fsView = new HoodieTableFileSystemView(metaClient, + metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants()); } @Test diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java index e8b50d0fd..0bc33e215 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java @@ -22,8 +22,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.timeline.HoodieInstant; -import com.uber.hoodie.common.table.view.ReadOptimizedTableView; -import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.exception.InvalidDatasetException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -95,8 +94,9 @@ public class HoodieInputFormat extends MapredParquetInputFormat LOG.info("Hoodie Metadata initialized with completed commit Ts as :" + metadata); String tableName = metadata.getTableConfig().getTableName(); String mode = 
HoodieHiveUtil.readMode(Job.getInstance(job), tableName); - TableFileSystemView fsView = new ReadOptimizedTableView(FSUtils.getFs(), metadata); HoodieTimeline timeline = metadata.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + TableFileSystemView fsView = new HoodieTableFileSystemView(metadata, timeline); + if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) { // this is of the form commitTs_partition_sequenceNumber String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName); diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java index 464458fd7..7cfb8392e 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java @@ -25,8 +25,9 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.timeline.HoodieInstant; -import com.uber.hoodie.common.table.view.ReadOptimizedTableView; +import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.table.HoodieTable; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -65,7 +66,8 @@ public class HoodieSnapshotCopier implements Serializable { public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDir) throws IOException { FileSystem fs = FSUtils.getFs(); final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs, baseDir); - final TableFileSystemView fsView = new ReadOptimizedTableView(fs, tableMetadata); + final TableFileSystemView fsView = new HoodieTableFileSystemView(tableMetadata, + 
tableMetadata.getActiveTimeline().getCommitTimeline().filterCompletedInstants()); // Get the latest commit final Optional latestCommit = tableMetadata.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();