diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
index 1ccf4eccb..d47dd1d52 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
@@ -17,10 +17,12 @@ package com.uber.hoodie.config;
 
 import com.google.common.base.Preconditions;
+import com.uber.hoodie.common.model.HoodieAvroPayload;
 import com.uber.hoodie.common.model.HoodieCleaningPolicy;
-
+import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.io.compact.strategy.CompactionStrategy;
 import com.uber.hoodie.io.compact.strategy.LogFileSizeBasedCompactionStrategy;
+
 import javax.annotation.concurrent.Immutable;
 import java.io.File;
 import java.io.FileReader;
@@ -93,6 +95,10 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   // 200GB of target IO per compaction
   public static final String DEFAULT_COMPACTION_STRATEGY = LogFileSizeBasedCompactionStrategy.class.getName();
 
+  // used to merge records written to log file
+  public static final String DEFAULT_PAYLOAD_CLASS = HoodieAvroPayload.class.getName();
+  public static final String PAYLOAD_CLASS = "hoodie.compaction.payload.class";
+
   private HoodieCompactionConfig(Properties props) {
     super(props);
   }
@@ -187,6 +193,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withPayloadClass(String payloadClassName) {
+      props.setProperty(PAYLOAD_CLASS, payloadClassName);
+      return this;
+    }
+
     public Builder withTargetIOPerCompactionInMB(long targetIOPerCompactionInMB) {
       props.setProperty(TARGET_IO_PER_COMPACTION_IN_MB_PROP, String.valueOf(targetIOPerCompactionInMB));
       return this;
@@ -222,6 +233,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
           CLEANER_PARALLELISM, DEFAULT_CLEANER_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(COMPACTION_STRATEGY_PROP),
           COMPACTION_STRATEGY_PROP, DEFAULT_COMPACTION_STRATEGY);
+      setDefaultOnCondition(props, !props.containsKey(PAYLOAD_CLASS),
+          PAYLOAD_CLASS, DEFAULT_PAYLOAD_CLASS);
       setDefaultOnCondition(props, !props.containsKey(TARGET_IO_PER_COMPACTION_IN_MB_PROP),
           TARGET_IO_PER_COMPACTION_IN_MB_PROP, DEFAULT_TARGET_IO_PER_COMPACTION_IN_MB);
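The two new properties give compaction a pluggable record-merge implementation, falling back to HoodieAvroPayload when unset. A minimal sketch of wiring the option in through the builders — the builder methods are the ones used elsewhere in this patch; the payload class name is a hypothetical placeholder:

```java
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieWriteConfig;

public class CompactionPayloadConfigExample {
  static HoodieWriteConfig build(String basePath, String schemaStr) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(schemaStr)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withPayloadClass("com.example.MyMergePayload") // hypothetical payload implementation
            .build())
        .build();
  }
}
```

If the caller never sets the property, setDefaults() above fills in DEFAULT_PAYLOAD_CLASS.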
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
index b1b874120..be039570d 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
@@ -23,6 +23,7 @@ import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.model.CompactionWriteStat;
 import com.uber.hoodie.common.model.HoodieAvroPayload;
 import com.uber.hoodie.common.model.HoodieCompactionMetadata;
+import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
@@ -155,14 +156,15 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
             HoodieTimeline.DELTA_COMMIT_ACTION))
         .filterCompletedInstants().lastInstant().get().getTimestamp();
 
-    HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, operation.getDeltaFilePaths(),
-        readerSchema, maxInstantTime);
+    HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, metaClient.getBasePath(),
+        operation.getDeltaFilePaths(), readerSchema, maxInstantTime);
     if (!scanner.iterator().hasNext()) {
       return Lists.newArrayList();
     }
 
     // Compacting is very similar to applying updates to existing file
-    HoodieCopyOnWriteTable<HoodieAvroPayload> table =
-        new HoodieCopyOnWriteTable<>(config, metaClient);
+    HoodieCopyOnWriteTable<HoodieRecordPayload> table =
+        new HoodieCopyOnWriteTable(config, metaClient);
     Iterator<List<WriteStatus>> result = table
         .handleUpdate(commitTime, operation.getFileId(), scanner.iterator());
     Iterable<List<WriteStatus>> resultIterable = () -> result;
diff --git a/hoodie-client/src/test/java/HoodieClientExample.java b/hoodie-client/src/test/java/HoodieClientExample.java
index cffe5605d..26f097a93 100644
--- a/hoodie-client/src/test/java/HoodieClientExample.java
+++ b/hoodie-client/src/test/java/HoodieClientExample.java
@@ -19,6 +19,7 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.uber.hoodie.HoodieWriteClient;
 import com.uber.hoodie.common.HoodieTestDataGenerator;
+import com.uber.hoodie.common.model.HoodieAvroPayload;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.table.HoodieTableConfig;
@@ -84,7 +85,7 @@ public class HoodieClientExample {
     Path path = new Path(tablePath);
     FileSystem fs = FSUtils.getFs();
     if (!fs.exists(path)) {
-      HoodieTableMetaClient.initTableType(fs, tablePath, HoodieTableType.valueOf(tableType), tableName);
+      HoodieTableMetaClient.initTableType(fs, tablePath, HoodieTableType.valueOf(tableType), tableName, HoodieAvroPayload.class.getName());
     }
 
     // Create the write client to write some records in
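For context, the payload class is what initTableType now records per table. A rough sketch of a custom implementation, assuming the HoodieRecordPayload contract implied by this patch — an Optional<GenericRecord> constructor for reflective instantiation, preCombine for log-side merging, getInsertValue as used on the read path — plus a combineAndGetUpdateValue method whose exact signature is not shown in this diff:

```java
import com.uber.hoodie.common.model.HoodieRecordPayload;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import java.util.Optional;

// Sketch: prefer whichever side has the larger "ts" field ("ts" is an assumed schema field).
public class LatestTsPayload implements HoodieRecordPayload<LatestTsPayload> {

  private final Optional<GenericRecord> record;

  // Constructor shape mirrors how the scanner instantiates payloads reflectively:
  // loadPayload(fqn, new Object[]{Optional.of(rec)}, Optional.class)
  public LatestTsPayload(Optional<GenericRecord> record) {
    this.record = record;
  }

  @Override
  public LatestTsPayload preCombine(LatestTsPayload another) {
    long mine = record.map(r -> (Long) r.get("ts")).orElse(Long.MIN_VALUE);
    long theirs = another.record.map(r -> (Long) r.get("ts")).orElse(Long.MIN_VALUE);
    return mine >= theirs ? this : another;
  }

  @Override
  public Optional<IndexedRecord> combineAndGetUpdateValue(IndexedRecord current, Schema schema) {
    // Assumed contract: favor this payload's record over the stored value.
    return getInsertValue(schema);
  }

  @Override
  public Optional<IndexedRecord> getInsertValue(Schema schema) {
    return record.map(r -> (IndexedRecord) r);
  }
}
```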
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java
index 7d64bf093..48cb75ca3 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java
@@ -16,8 +16,10 @@
 
 package com.uber.hoodie.common.table;
 
+import com.uber.hoodie.common.model.HoodieAvroPayload;
 import com.uber.hoodie.common.model.HoodieFileFormat;
 import com.uber.hoodie.common.model.HoodieTableType;
+import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.exception.HoodieIOException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -49,10 +51,12 @@ public class HoodieTableConfig implements Serializable {
       "hoodie.table.ro.file.format";
   public static final String HOODIE_RT_FILE_FORMAT_PROP_NAME =
       "hoodie.table.rt.file.format";
+  public static final String HOODIE_PAYLOAD_CLASS_PROP_NAME = "hoodie.compaction.payload.class";
 
   public static final HoodieTableType DEFAULT_TABLE_TYPE = HoodieTableType.COPY_ON_WRITE;
   public static final HoodieFileFormat DEFAULT_RO_FILE_FORMAT = HoodieFileFormat.PARQUET;
   public static final HoodieFileFormat DEFAULT_RT_FILE_FORMAT = HoodieFileFormat.HOODIE_LOG;
+  public static final String DEFAULT_PAYLOAD_CLASS = HoodieAvroPayload.class.getName();
   private Properties props;
 
   public HoodieTableConfig(FileSystem fs, String metaPath) {
@@ -98,6 +102,10 @@ public class HoodieTableConfig implements Serializable {
       if (!properties.containsKey(HOODIE_TABLE_TYPE_PROP_NAME)) {
         properties.setProperty(HOODIE_TABLE_TYPE_PROP_NAME, DEFAULT_TABLE_TYPE.name());
       }
+      if (properties.getProperty(HOODIE_TABLE_TYPE_PROP_NAME).equals(HoodieTableType.MERGE_ON_READ.name())
+          && !properties.containsKey(HOODIE_PAYLOAD_CLASS_PROP_NAME)) {
+        properties.setProperty(HOODIE_PAYLOAD_CLASS_PROP_NAME, DEFAULT_PAYLOAD_CLASS);
+      }
       properties
           .store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
     } finally {
@@ -118,6 +126,15 @@ public class HoodieTableConfig implements Serializable {
     return DEFAULT_TABLE_TYPE;
   }
 
+  /**
+   * Read the payload class for HoodieRecords from the table properties
+   *
+   * @return the fully qualified payload class name
+   */
+  public String getPayloadClass() {
+    return props.getProperty(HOODIE_PAYLOAD_CLASS_PROP_NAME, DEFAULT_PAYLOAD_CLASS);
+  }
+
   /**
    * Read the table name
    *
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
index b0387d870..b1dd0296c 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
@@ -182,10 +182,13 @@ public class HoodieTableMetaClient implements Serializable {
    * @return
    * @throws IOException
    */
-  public static HoodieTableMetaClient initTableType(FileSystem fs, String basePath, HoodieTableType tableType, String tableName) throws IOException {
+  public static HoodieTableMetaClient initTableType(FileSystem fs, String basePath, HoodieTableType tableType, String tableName, String payloadClassName) throws IOException {
     Properties properties = new Properties();
     properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName);
     properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name());
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      properties.setProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, payloadClassName);
+    }
     return HoodieTableMetaClient.initializePathAsHoodieDataset(fs, basePath, properties);
   }
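A short sketch of the round trip these two changes enable: initTableType persists the payload class into hoodie.properties for MERGE_ON_READ tables, and getPayloadClass() reads it back, falling back to the default. The path and table name below are illustrative:

```java
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.FSUtils;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;

public class PayloadClassRoundTrip {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FSUtils.getFs();
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.initTableType(fs,
        "/tmp/hoodie/sample-table", HoodieTableType.MERGE_ON_READ, "sample_table",
        HoodieAvroPayload.class.getName());
    // prints com.uber.hoodie.common.model.HoodieAvroPayload
    System.out.println(metaClient.getTableConfig().getPayloadClass());
  }
}
```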
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java
index 1d26ec64c..2060db1a1 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java
@@ -17,15 +17,17 @@ package com.uber.hoodie.common.table.log;
 
 import com.google.common.collect.Maps;
-import com.uber.hoodie.common.model.HoodieAvroPayload;
 import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieRecord;
+import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
 import com.uber.hoodie.common.table.log.block.HoodieCommandBlock;
 import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
 import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
+import com.uber.hoodie.common.util.ReflectionUtils;
 import com.uber.hoodie.exception.HoodieIOException;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -55,13 +57,12 @@ import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.LogMetadataType;
  * list of records which will be used as a lookup table when merging the base columnar file
  * with the redo log file.
- *
- * TODO(FIX) - Does not apply application specific merge logic - defaults to HoodieAvroPayload
  */
-public class HoodieCompactedLogRecordScanner implements Iterable<HoodieRecord<HoodieAvroPayload>> {
+public class HoodieCompactedLogRecordScanner implements Iterable<HoodieRecord<? extends HoodieRecordPayload>> {
 
   private final static Logger log = LogManager.getLogger(HoodieCompactedLogRecordScanner.class);
 
   // Final list of compacted/merged records to iterate
-  private final Collection<HoodieRecord<HoodieAvroPayload>> logRecords;
+  private final Collection<HoodieRecord<? extends HoodieRecordPayload>> logRecords;
   // Reader schema for the records
   private final Schema readerSchema;
   // Total log files read - for metrics
@@ -72,16 +73,22 @@ public class HoodieCompactedLogRecordScanner implements Iterable<HoodieRecord<? extends HoodieRecordPayload>> {
   private final String latestInstantTime;
+  private final HoodieTableMetaClient hoodieTableMetaClient;
+  // FQN of the payload class used to merge records from the log
+  private final String payloadClassFQN;
+  // Store only the last log blocks (needed to implement rollback)
+  private Deque<HoodieLogBlock> lastBlocks = new ArrayDeque<>();
 
-  public HoodieCompactedLogRecordScanner(FileSystem fs, List<String> logFilePaths,
+  public HoodieCompactedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths,
       Schema readerSchema, String latestInstantTime) {
     this.readerSchema = readerSchema;
     this.latestInstantTime = latestInstantTime;
+    this.hoodieTableMetaClient = new HoodieTableMetaClient(fs, basePath);
+    // load class from the payload fully qualified class name
+    this.payloadClassFQN = this.hoodieTableMetaClient.getTableConfig().getPayloadClass();
 
-    // Store only the last log blocks (needed to implement rollback)
-    Deque<HoodieLogBlock> lastBlocks = new ArrayDeque<>();
     // Store merged records for all versions for this log file
-    Map<String, HoodieRecord<HoodieAvroPayload>> records = Maps.newHashMap();
+    Map<String, HoodieRecord<? extends HoodieRecordPayload>> records = Maps.newHashMap();
     // iterate over the paths
     Iterator<String> logFilePathsItr = logFilePaths.iterator();
     while(logFilePathsItr.hasNext()) {
@@ -132,7 +139,7 @@ public class HoodieCompactedLogRecordScanner implements Iterable<HoodieRecord<? extends HoodieRecordPayload>> {
-  private Map<String, HoodieRecord<HoodieAvroPayload>> loadRecordsFromBlock(HoodieAvroDataBlock dataBlock) {
-    Map<String, HoodieRecord<HoodieAvroPayload>> recordsFromLastBlock = Maps.newHashMap();
+  private Map<String, HoodieRecord<? extends HoodieRecordPayload>> loadRecordsFromBlock(HoodieAvroDataBlock dataBlock) {
+    Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordsFromLastBlock = Maps.newHashMap();
     List<IndexedRecord> recs = dataBlock.getRecords();
     totalLogRecords.addAndGet(recs.size());
     recs.forEach(rec -> {
       String key = ((GenericRecord) rec).get(HoodieRecord.RECORD_KEY_METADATA_FIELD)
-          .toString();
-      String partitionPath =
-          ((GenericRecord) rec).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
           .toString();
-      HoodieRecord<HoodieAvroPayload> hoodieRecord = new HoodieRecord<>(
-          new HoodieKey(key, partitionPath),
-          new HoodieAvroPayload(Optional.of(((GenericRecord) rec))));
+      String partitionPath =
+          ((GenericRecord) rec).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
+              .toString();
+      HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieRecord<>(
+          new HoodieKey(key, partitionPath),
+          ReflectionUtils.loadPayload(this.payloadClassFQN, new Object[]{Optional.of(rec)}, Optional.class));
       if (recordsFromLastBlock.containsKey(key)) {
         // Merge and store the merged record
-        HoodieAvroPayload combinedValue = recordsFromLastBlock.get(key).getData()
-            .preCombine(hoodieRecord.getData());
+        HoodieRecordPayload combinedValue = recordsFromLastBlock.get(key).getData()
+            .preCombine(hoodieRecord.getData());
         recordsFromLastBlock
-            .put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()),
+            .put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()),
                 combinedValue));
       } else {
         // Put the record as is
@@ -207,8 +215,8 @@ public class HoodieCompactedLogRecordScanner implements Iterable<HoodieRecord<? extends HoodieRecordPayload>> {
-  private void merge(Map<String, HoodieRecord<HoodieAvroPayload>> records,
-      Deque<HoodieLogBlock> lastBlocks) {
+  private void merge(Map<String, HoodieRecord<? extends HoodieRecordPayload>> records,
+      Deque<HoodieLogBlock> lastBlocks) {
     while (!lastBlocks.isEmpty()) {
       HoodieLogBlock lastBlock = lastBlocks.pop();
       switch (lastBlock.getBlockType()) {
@@ -230,15 +238,15 @@ public class HoodieCompactedLogRecordScanner implements Iterable<HoodieRecord<? extends HoodieRecordPayload>> {
-  private void merge(Map<String, HoodieRecord<HoodieAvroPayload>> records,
-      Map<String, HoodieRecord<HoodieAvroPayload>> recordsFromLastBlock) {
+  private void merge(Map<String, HoodieRecord<? extends HoodieRecordPayload>> records,
+      Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordsFromLastBlock) {
     recordsFromLastBlock.forEach((key, hoodieRecord) -> {
       if (records.containsKey(key)) {
         // Merge and store the merged record
-        HoodieAvroPayload combinedValue = records.get(key).getData()
-            .preCombine(hoodieRecord.getData());
+        HoodieRecordPayload combinedValue = records.get(key).getData()
+            .preCombine(hoodieRecord.getData());
         records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()),
-            combinedValue));
+            combinedValue));
       } else {
         // Put the record as is
         records.put(key, hoodieRecord);
@@ -247,7 +255,7 @@ public class HoodieCompactedLogRecordScanner implements Iterable<HoodieRecord<? extends HoodieRecordPayload>> {
-  public Iterator<HoodieRecord<HoodieAvroPayload>> iterator() {
+  public Iterator<HoodieRecord<? extends HoodieRecordPayload>> iterator() {
     return logRecords.iterator();
   }
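With the payload resolved from table config, the merge rule for duplicate keys is now whatever the payload's preCombine decides; the hard-coded HoodieAvroPayload behavior is gone. A minimal sketch of that collision path, as a hypothetical helper mirroring loadRecordsFromBlock/merge above:

```java
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;

public class MergeSketch {
  // The payload class, not the scanner, decides how two versions of a key combine.
  static HoodieRecord<HoodieRecordPayload> combine(HoodieKey key,
      HoodieRecordPayload existing, HoodieRecordPayload incoming) {
    HoodieRecordPayload combined = existing.preCombine(incoming); // payload-defined rule
    return new HoodieRecord<>(key, combined);
  }
}
```

Since the scanner resolves the class through table metadata, readers and the compactor agree on merge semantics without threading the class through every call site.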
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ReflectionUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ReflectionUtils.java
index 422e22d40..c1a8e9062 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ReflectionUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ReflectionUtils.java
@@ -19,43 +19,55 @@ package com.uber.hoodie.common.util;
 
 import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.exception.HoodieException;
+import org.apache.avro.generic.GenericRecord;
+
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 public class ReflectionUtils {
 
-  private static Map<String, Class<?>> clazzCache = new HashMap<>();
+  private static Map<String, Class<?>> clazzCache = new HashMap<>();
 
-  public static <T extends HoodieRecordPayload> T loadPayload(String recordPayloadClass) throws IOException {
-    try {
-      if(clazzCache.get(recordPayloadClass) == null) {
-        Class<?> clazz = Class.forName(recordPayloadClass);
-        clazzCache.put(recordPayloadClass, clazz);
-      }
-      return (T) clazzCache.get(recordPayloadClass).newInstance();
-    } catch (ClassNotFoundException e) {
-      throw new IOException("Could not load payload class " + recordPayloadClass, e);
-    } catch (InstantiationException e) {
-      throw new IOException("Could not load payload class " + recordPayloadClass, e);
-    } catch (IllegalAccessException e) {
-      throw new IOException("Could not load payload class " + recordPayloadClass, e);
-    }
+  public static <T> T loadClass(String fqcn) {
+    try {
+      if(clazzCache.get(fqcn) == null) {
+        Class<?> clazz = Class.forName(fqcn);
+        clazzCache.put(fqcn, clazz);
+      }
+      return (T) clazzCache.get(fqcn).newInstance();
+    } catch (ClassNotFoundException e) {
+      throw new HoodieException("Could not load class " + fqcn, e);
+    } catch (InstantiationException e) {
+      throw new HoodieException("Could not load class " + fqcn, e);
+    } catch (IllegalAccessException e) {
+      throw new HoodieException("Could not load class " + fqcn, e);
     }
   }
 
-  public static <T> T loadClass(String fqcn) {
-    try {
-      if(clazzCache.get(fqcn) == null) {
-        Class<?> clazz = Class.forName(fqcn);
-        clazzCache.put(fqcn, clazz);
-      }
-      return (T) clazzCache.get(fqcn).newInstance();
-    } catch (ClassNotFoundException e) {
-      throw new HoodieException("Could not load class " + fqcn, e);
-    } catch (InstantiationException e) {
-      throw new HoodieException("Could not load class " + fqcn, e);
-    } catch (IllegalAccessException e) {
-      throw new HoodieException("Could not load class " + fqcn, e);
-    }
+  /**
+   * Instantiate a given class with a generic record payload
+   *
+   * @param recordPayloadClass
+   * @param payloadArgs
+   * @param <T>
+   * @return
+   */
+  public static <T extends HoodieRecordPayload> T loadPayload(String recordPayloadClass,
+      Object[] payloadArgs,
+      Class<?>... constructorArgTypes) {
+    try {
+      if(clazzCache.get(recordPayloadClass) == null) {
+        Class<?> clazz = Class.forName(recordPayloadClass);
+        clazzCache.put(recordPayloadClass, clazz);
+      }
+      return (T) clazzCache.get(recordPayloadClass).getConstructor(constructorArgTypes).newInstance(payloadArgs);
+    } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+      throw new HoodieException("Unable to instantiate payload class ", e);
+    } catch (ClassNotFoundException e) {
+      throw new HoodieException("Unable to instantiate payload class ", e);
     }
+  }
 }
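Usage of the two reworked helpers, for contrast; class names are taken from this patch. One review note: the class cache is a plain HashMap, so concurrent first-time loads are not synchronized.

```java
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.io.compact.strategy.CompactionStrategy;
import org.apache.avro.generic.GenericRecord;
import java.util.Optional;

public class ReflectionUtilsUsage {
  static CompactionStrategy strategy(String fqcn) {
    return ReflectionUtils.loadClass(fqcn); // no-arg constructor path
  }

  static HoodieAvroPayload payload(GenericRecord rec) {
    return ReflectionUtils.loadPayload( // constructor-matching path
        HoodieAvroPayload.class.getName(),
        new Object[]{Optional.of(rec)}, Optional.class);
  }
}
```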
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
index c41d129fa..b316691ce 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
@@ -88,6 +88,7 @@ public class HoodieTestUtils {
     Properties properties = new Properties();
     properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME);
     properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name());
+    properties.setProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, HoodieAvroPayload.class.getName());
     return HoodieTableMetaClient.initializePathAsHoodieDataset(fs, basePath, properties);
   }
 
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
index 57530064c..607cdaea5 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
@@ -20,6 +20,9 @@ import com.google.common.collect.Maps;
 import com.uber.hoodie.common.minicluster.MiniClusterUtil;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieRecord;
+import com.uber.hoodie.common.model.HoodieTableType;
+import com.uber.hoodie.common.model.HoodieTestUtils;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.log.HoodieLogFormat.Reader;
 import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
 import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
@@ -66,6 +69,7 @@ public class HoodieLogFormatTest {
 
   private FileSystem fs;
   private Path partitionPath;
+  private String basePath;
 
   @BeforeClass
   public static void setUpClass() throws IOException, InterruptedException {
@@ -76,6 +80,7 @@ public class HoodieLogFormatTest {
   @AfterClass
   public static void tearDownClass() {
     MiniClusterUtil.shutdown();
+    HoodieTestUtils.resetFS();
   }
 
   @Before
@@ -85,6 +90,9 @@ public class HoodieLogFormatTest {
     folder.create();
     assertTrue(fs.mkdirs(new Path(folder.getRoot().getPath())));
     this.partitionPath = new Path(folder.getRoot().getPath());
+    this.basePath = folder.getRoot().getParent();
+    HoodieTestUtils.fs = fs;
+    HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ);
   }
 
   @After
@@ -430,7 +438,8 @@
         .map(s -> s.getPath().toString())
         .collect(Collectors.toList());
 
-    HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100");
+    HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles,
+        schema, "100");
     assertEquals("", 200, scanner.getTotalLogRecords());
     Set<String> readKeys = new HashSet<>(200);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -484,9 +493,9 @@
         .map(s -> s.getPath().toString())
         .collect(Collectors.toList());
 
-    HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100");
-    assertEquals("We only read 200 records, since 200 of them are valid", 200,
-        scanner.getTotalLogRecords());
+    HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles,
+        schema, "100");
+    assertEquals("We only read 200 records, but only 200 of them are valid", 200, scanner.getTotalLogRecords());
     Set<String> readKeys = new HashSet<>(200);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
     assertEquals("Stream collect should return all 200 records", 200, readKeys.size());
@@ -552,7 +561,8 @@
         .map(s -> s.getPath().toString())
         .collect(Collectors.toList());
 
-    HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100");
+    HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles,
+        schema, "100");
     assertEquals("We would read 200 records", 200, scanner.getTotalLogRecords());
 
     Set<String> readKeys = new HashSet<>(200);
@@ -607,7 +617,8 @@
         .map(s -> s.getPath().toString())
         .collect(Collectors.toList());
 
-    HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100");
+    HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles,
+        schema, "100");
     assertEquals("We still would read 200 records", 200, scanner.getTotalLogRecords());
 
     final List<String> readKeys = new ArrayList<>(200);
@@ -625,7 +636,7 @@
     writer = writer.appendBlock(commandBlock);
 
     readKeys.clear();
-    scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100");
+    scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, "100");
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
     assertEquals("Stream collect should return all 200 records after rollback of delete", 200, readKeys.size());
   }
@@ -684,7 +695,8 @@
         .map(s -> s.getPath().toString())
         .collect(Collectors.toList());
 
-    HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100");
+    HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles,
+        schema, "100");
     assertEquals("We would read 100 records", 100, scanner.getTotalLogRecords());
 
@@ -734,7 +746,8 @@
         .map(s -> s.getPath().toString())
         .collect(Collectors.toList());
 
-    HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100");
+    HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath,
+        allLogFiles, schema, "100");
     assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords());
   }
@@ -766,7 +779,8 @@
         .map(s -> s.getPath().toString())
         .collect(Collectors.toList());
 
-    HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100");
+    HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath,
+        allLogFiles, schema, "100");
     assertEquals("We still would read 100 records", 100, scanner.getTotalLogRecords());
 
     final List<String> readKeys = new ArrayList<>(100);
diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java
index a352f48a5..0ce79fc60 100644
--- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java
+++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java
@@ -35,15 +35,17 @@ public class HoodieRealtimeFileSplit extends FileSplit {
 
   private String maxCommitTime;
 
+  private String basePath;
+
   public HoodieRealtimeFileSplit() {
     super();
   }
 
-  public HoodieRealtimeFileSplit(FileSplit baseSplit, List<String> deltaLogFiles, String maxCommitTime) throws IOException {
+  public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogFiles, String maxCommitTime) throws IOException {
     super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(), baseSplit.getLocations());
     this.deltaFilePaths = deltaLogFiles;
     this.maxCommitTime = maxCommitTime;
+    this.basePath = basePath;
   }
 
   public List<String> getDeltaFilePaths() {
@@ -54,6 +56,10 @@ public class HoodieRealtimeFileSplit extends FileSplit {
     return maxCommitTime;
   }
 
+  public String getBasePath() {
+    return basePath;
+  }
+
   private static void writeString(String str, DataOutput out) throws IOException {
     byte[] pathBytes = str.getBytes(StandardCharsets.UTF_8);
     out.writeInt(pathBytes.length);
diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
index fa2e30af5..f4849109e 100644
--- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
+++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
@@ -127,7 +127,7 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Configurable {
                     HoodieTimeline.DELTA_COMMIT_ACTION))
                 .filterCompletedInstants().lastInstant().get().getTimestamp();
             rtSplits.add(
-                new HoodieRealtimeFileSplit(split, logFilePaths, maxCommitTime));
+                new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime));
           } catch (IOException e) {
             throw new HoodieIOException("Error creating hoodie real time split ", e);
           }
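The split changes exist so the payload class can be discovered on the query side: the input format stamps each realtime split with the table base path, and the record reader (next diff) uses it to reach hoodie.properties. A hypothetical helper showing the wiring:

```java
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hadoop.mapred.FileSplit;
import java.io.IOException;
import java.util.List;

public class RealtimeSplitFactory {
  static HoodieRealtimeFileSplit makeSplit(FileSplit baseSplit, HoodieTableMetaClient metaClient,
      List<String> logFilePaths, String maxCommitTime) throws IOException {
    // base path rides along so the record reader can resolve the payload class lazily
    return new HoodieRealtimeFileSplit(baseSplit, metaClient.getBasePath(), logFilePaths, maxCommitTime);
  }
}
```

Worth checking in review: only the writeString helper is visible in this hunk, so presumably write()/readFields() are also extended to carry basePath; a split field that is not serialized would arrive null on the task side.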
diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
index 591fb56aa..08a23d7ba 100644
--- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
+++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
@@ -20,8 +20,10 @@ package com.uber.hoodie.hadoop.realtime;
 
 import com.uber.hoodie.common.model.HoodieAvroPayload;
 import com.uber.hoodie.common.model.HoodieRecord;
+import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.ReflectionUtils;
 import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.exception.HoodieIOException;
 import org.apache.avro.Schema;
@@ -122,14 +124,12 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWritable> {
       // ... > the commit we are trying to read (if using readCommit() API)
-      for (HoodieRecord<HoodieAvroPayload> hoodieRecord : compactedLogRecordScanner) {
+      for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : compactedLogRecordScanner) {
         GenericRecord rec = (GenericRecord) hoodieRecord.getData().getInsertValue(readerSchema)
             .get();
         String key = hoodieRecord.getRecordKey();
diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
index a93189e3a..73595f6eb 100644
--- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
+++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
@@ -113,7 +113,7 @@ public class HoodieRealtimeRecordReaderTest {
     //create a split with baseFile (parquet file written earlier) and new log file(s)
     String logFilePath = writer.getLogFile().getPath().toString();
     HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(new FileSplit(new Path(partitionDir
-        + "/fileid0_1_" + commitTime + ".parquet"),0,1,jobConf), Arrays.asList(logFilePath), newCommitTime);
+        + "/fileid0_1_" + commitTime + ".parquet"),0,1,jobConf), basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime);
 
     //create a RecordReader to be used by HoodieRealtimeRecordReader
     RecordReader<Void, ArrayWritable> reader =
@@ -167,7 +167,7 @@ public class HoodieRealtimeRecordReaderTest {
     //create a split with baseFile (parquet file written earlier) and new log file(s)
     String logFilePath = writer.getLogFile().getPath().toString();
     HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(new FileSplit(new Path(partitionDir
-        + "/fileid0_1_" + commitTime + ".parquet"),0,1,jobConf), Arrays.asList(logFilePath), newCommitTime);
+        + "/fileid0_1_" + commitTime + ".parquet"),0,1,jobConf), basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime);
 
     //create a RecordReader to be used by HoodieRealtimeRecordReader
     RecordReader<Void, ArrayWritable> reader =
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java
index c69b96840..bb7d6e3cc 100644
--- a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java
@@ -24,6 +24,7 @@ import com.uber.hoodie.common.BloomFilter;
 import com.uber.hoodie.common.minicluster.HdfsTestService;
 import com.uber.hoodie.common.minicluster.ZookeeperTestService;
 import com.uber.hoodie.common.model.CompactionWriteStat;
+import com.uber.hoodie.common.model.HoodieAvroPayload;
 import com.uber.hoodie.common.model.HoodieCommitMetadata;
 import com.uber.hoodie.common.model.HoodieCompactionMetadata;
 import com.uber.hoodie.common.model.HoodieDataFile;
@@ -120,7 +121,7 @@ public class TestUtil {
     fileSystem.delete(new Path(hiveSyncConfig.basePath), true);
     HoodieTableMetaClient
         .initTableType(fileSystem, hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE,
-            hiveSyncConfig.tableName);
+            hiveSyncConfig.tableName, HoodieAvroPayload.class.getName());
     HoodieHiveClient client =
         new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), fileSystem);
 
@@ -156,7 +157,7 @@ public class TestUtil {
     FileUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
     HoodieTableMetaClient
         .initTableType(fileSystem, hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE,
-            hiveSyncConfig.tableName);
+            hiveSyncConfig.tableName, HoodieAvroPayload.class.getName());
     boolean result = fileSystem.mkdirs(path);
     checkResult(result);
     DateTime dateTime = DateTime.now();
@@ -171,7 +172,7 @@ public class TestUtil {
     FileUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
     HoodieTableMetaClient
         .initTableType(fileSystem, hiveSyncConfig.basePath, HoodieTableType.MERGE_ON_READ,
-            hiveSyncConfig.tableName);
+            hiveSyncConfig.tableName, HoodieAvroPayload.class.getName());
     boolean result = fileSystem.mkdirs(path);
     checkResult(result);
 
diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
index 2f68c14b9..1af5c199e 100644
--- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
@@ -21,6 +21,7 @@ package com.uber.hoodie;
 import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.config.HoodieCompactionConfig;
 import com.uber.hoodie.config.HoodieIndexConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.HoodieException;
@@ -114,6 +115,8 @@ public class DataSourceUtils {
         HoodieIndexConfig.newBuilder()
             .withIndexType(HoodieIndex.IndexType.BLOOM)
             .build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY())).build())
         // override above with Hoodie configs specified as options.
         .withProps(parameters)
         .build();
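From the Spark datasource path, the payload class now flows in as a write option. An illustrative caller — PAYLOAD_CLASS_OPT_KEY is referenced in the hunk above, while the datasource format name and SaveMode are assumptions:

```java
import com.uber.hoodie.DataSourceWriteOptions;
import com.uber.hoodie.OverwriteWithLatestAvroPayload;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class DataSourcePayloadExample {
  static void write(Dataset<Row> df, String basePath) {
    df.write()
        .format("com.uber.hoodie") // assumed datasource name for this project
        .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY(),
            OverwriteWithLatestAvroPayload.class.getName())
        .mode(SaveMode.Append)
        .save(basePath);
  }
}
```

Note that parameters.get(...) in DataSourceUtils presumably relies on the caller pre-populating a default for this key; otherwise withPayloadClass would receive null.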
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
index 7f45fbea3..02fc0d7ce 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -24,7 +24,9 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.uber.hoodie.DataSourceUtils;
 import com.uber.hoodie.HoodieWriteClient;
+import com.uber.hoodie.KeyGenerator;
 import com.uber.hoodie.OverwriteWithLatestAvroPayload;
+import com.uber.hoodie.SimpleKeyGenerator;
 import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.model.HoodieCommitMetadata;
 import com.uber.hoodie.common.model.HoodieRecord;
@@ -33,20 +35,18 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.config.HoodieCompactionConfig;
 import com.uber.hoodie.config.HoodieIndexConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.index.HoodieIndex;
 import com.uber.hoodie.utilities.HiveIncrementalPuller;
 import com.uber.hoodie.utilities.UtilHelpers;
-import com.uber.hoodie.SimpleKeyGenerator;
-import com.uber.hoodie.utilities.schema.FilebasedSchemaProvider;
-import com.uber.hoodie.utilities.sources.DFSSource;
-import com.uber.hoodie.KeyGenerator;
-import com.uber.hoodie.utilities.schema.SchemaProvider;
-import com.uber.hoodie.utilities.sources.Source;
 import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException;
+import com.uber.hoodie.utilities.schema.FilebasedSchemaProvider;
+import com.uber.hoodie.utilities.schema.SchemaProvider;
+import com.uber.hoodie.utilities.sources.DFSSource;
+import com.uber.hoodie.utilities.sources.Source;
 import com.uber.hoodie.utilities.sources.SourceDataFormat;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.configuration.PropertiesConfiguration;
@@ -58,6 +58,7 @@ import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import scala.collection.JavaConversions;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -67,8 +68,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 
-import scala.collection.JavaConversions;
-
 /**
  * An Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target dataset.
  * Does not maintain any state, queries at runtime to see how far behind the target dataset is from
@@ -252,6 +251,8 @@ public class HoodieDeltaStreamer implements Serializable {
         .combineInput(true, true)
         .withPath(cfg.targetBasePath)
         .withAutoCommit(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withPayloadClass(OverwriteWithLatestAvroPayload.class.getName()).build())
         .withSchema(schemaProvider.getTargetSchema().toString())
         .forTable(cfg.targetTableName)
         .withIndexConfig(