1
0

Implement custom payload/merge-hook abstractions for application-specific merge logic

This commit is contained in:
Nishith Agarwal
2017-09-26 11:16:35 -07:00
committed by vinoth chandar
parent c7d63a7622
commit abe964bebd
16 changed files with 176 additions and 94 deletions

View File

@@ -17,10 +17,12 @@
package com.uber.hoodie.config;
import com.google.common.base.Preconditions;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieCleaningPolicy;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.io.compact.strategy.CompactionStrategy;
import com.uber.hoodie.io.compact.strategy.LogFileSizeBasedCompactionStrategy;
import javax.annotation.concurrent.Immutable;
import java.io.File;
import java.io.FileReader;
@@ -93,6 +95,10 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
// 200GB of target IO per compaction
public static final String DEFAULT_COMPACTION_STRATEGY = LogFileSizeBasedCompactionStrategy.class.getName();
// used to merge records written to log file
public static final String DEFAULT_PAYLOAD_CLASS = HoodieAvroPayload.class.getName();
public static final String PAYLOAD_CLASS = "hoodie.compaction.payload.class";
private HoodieCompactionConfig(Properties props) {
super(props);
}
@@ -187,6 +193,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
return this;
}
/**
 * Sets the payload class used to merge records written to the log file during compaction.
 *
 * @param payloadClassName fully qualified class name of a HoodieRecordPayload implementation
 * @return this builder, for call chaining
 */
public Builder withPayloadClass(String payloadClassName) {
props.setProperty(PAYLOAD_CLASS, payloadClassName);
return this;
}
public Builder withTargetIOPerCompactionInMB(long targetIOPerCompactionInMB) {
props.setProperty(TARGET_IO_PER_COMPACTION_IN_MB_PROP, String.valueOf(targetIOPerCompactionInMB));
return this;
@@ -222,6 +233,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
CLEANER_PARALLELISM, DEFAULT_CLEANER_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(COMPACTION_STRATEGY_PROP),
COMPACTION_STRATEGY_PROP, DEFAULT_COMPACTION_STRATEGY);
setDefaultOnCondition(props, !props.containsKey(PAYLOAD_CLASS),
PAYLOAD_CLASS, DEFAULT_PAYLOAD_CLASS);
setDefaultOnCondition(props, !props.containsKey(TARGET_IO_PER_COMPACTION_IN_MB_PROP),
TARGET_IO_PER_COMPACTION_IN_MB_PROP, DEFAULT_TARGET_IO_PER_COMPACTION_IN_MB);

View File

@@ -23,6 +23,7 @@ import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.CompactionWriteStat;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
@@ -155,14 +156,15 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
HoodieTimeline.DELTA_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp();
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, operation.getDeltaFilePaths(), readerSchema, maxInstantTime);
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, metaClient.getBasePath(),
operation.getDeltaFilePaths(), readerSchema, maxInstantTime);
if (!scanner.iterator().hasNext()) {
return Lists.newArrayList();
}
// Compacting is very similar to applying updates to existing file
HoodieCopyOnWriteTable<HoodieAvroPayload> table =
new HoodieCopyOnWriteTable<>(config, metaClient);
HoodieCopyOnWriteTable table =
new HoodieCopyOnWriteTable(config, metaClient);
Iterator<List<WriteStatus>> result = table
.handleUpdate(commitTime, operation.getFileId(), scanner.iterator());
Iterable<List<WriteStatus>> resultIterable = () -> result;

View File

@@ -19,6 +19,7 @@ import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTableConfig;
@@ -84,7 +85,7 @@ public class HoodieClientExample {
Path path = new Path(tablePath);
FileSystem fs = FSUtils.getFs();
if (!fs.exists(path)) {
HoodieTableMetaClient.initTableType(fs, tablePath, HoodieTableType.valueOf(tableType), tableName);
HoodieTableMetaClient.initTableType(fs, tablePath, HoodieTableType.valueOf(tableType), tableName, HoodieAvroPayload.class.getName());
}
// Create the write client to write some records in

View File

@@ -16,8 +16,10 @@
package com.uber.hoodie.common.table;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieFileFormat;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -49,10 +51,12 @@ public class HoodieTableConfig implements Serializable {
"hoodie.table.ro.file.format";
public static final String HOODIE_RT_FILE_FORMAT_PROP_NAME =
"hoodie.table.rt.file.format";
public static final String HOODIE_PAYLOAD_CLASS_PROP_NAME = "hoodie.compaction.payload.class";
public static final HoodieTableType DEFAULT_TABLE_TYPE = HoodieTableType.COPY_ON_WRITE;
public static final HoodieFileFormat DEFAULT_RO_FILE_FORMAT = HoodieFileFormat.PARQUET;
public static final HoodieFileFormat DEFAULT_RT_FILE_FORMAT = HoodieFileFormat.HOODIE_LOG;
public static final String DEFAULT_PAYLOAD_CLASS = HoodieAvroPayload.class.getName();
private Properties props;
public HoodieTableConfig(FileSystem fs, String metaPath) {
@@ -98,6 +102,10 @@ public class HoodieTableConfig implements Serializable {
if (!properties.containsKey(HOODIE_TABLE_TYPE_PROP_NAME)) {
properties.setProperty(HOODIE_TABLE_TYPE_PROP_NAME, DEFAULT_TABLE_TYPE.name());
}
if (properties.getProperty(HOODIE_TABLE_TYPE_PROP_NAME) == HoodieTableType.MERGE_ON_READ.name()
&& !properties.containsKey(HOODIE_PAYLOAD_CLASS_PROP_NAME)) {
properties.setProperty(HOODIE_PAYLOAD_CLASS_PROP_NAME, DEFAULT_PAYLOAD_CLASS);
}
properties
.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
} finally {
@@ -118,6 +126,15 @@ public class HoodieTableConfig implements Serializable {
return DEFAULT_TABLE_TYPE;
}
/**
 * Read the payload class for HoodieRecords from the table properties
 *
 * @return the configured payload class name, or DEFAULT_PAYLOAD_CLASS (HoodieAvroPayload)
 *         when the property is absent
 */
public String getPayloadClass() {
return props.getProperty(HOODIE_PAYLOAD_CLASS_PROP_NAME, DEFAULT_PAYLOAD_CLASS);
}
/**
* Read the table name
*

View File

@@ -182,10 +182,13 @@ public class HoodieTableMetaClient implements Serializable {
* @return
* @throws IOException
*/
public static HoodieTableMetaClient initTableType(FileSystem fs, String basePath, HoodieTableType tableType, String tableName) throws IOException {
public static HoodieTableMetaClient initTableType(FileSystem fs, String basePath, HoodieTableType tableType, String tableName, String payloadClassName) throws IOException {
Properties properties = new Properties();
properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName);
properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name());
if(tableType == HoodieTableType.MERGE_ON_READ) {
properties.setProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, payloadClassName);
}
return HoodieTableMetaClient.initializePathAsHoodieDataset(fs, basePath, properties);
}

View File

@@ -17,15 +17,17 @@
package com.uber.hoodie.common.table.log;
import com.google.common.collect.Maps;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieCommandBlock;
import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -55,13 +57,12 @@ import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.LogMetadataT
* list of records which will be used as a lookup table when merging the base columnar file
* with the redo log file.
*
* TODO(FIX) - Does not apply application specific merge logic - defaults to HoodieAvroPayload
*/
public class HoodieCompactedLogRecordScanner implements Iterable<HoodieRecord<HoodieAvroPayload>> {
public class HoodieCompactedLogRecordScanner implements Iterable<HoodieRecord<? extends HoodieRecordPayload>> {
private final static Logger log = LogManager.getLogger(HoodieCompactedLogRecordScanner.class);
// Final list of compacted/merged records to iterate
private final Collection<HoodieRecord<HoodieAvroPayload>> logRecords;
private final Collection<HoodieRecord<? extends HoodieRecordPayload>> logRecords;
// Reader schema for the records
private final Schema readerSchema;
// Total log files read - for metrics
@@ -72,16 +73,22 @@ public class HoodieCompactedLogRecordScanner implements Iterable<HoodieRecord<Ho
private long totalRecordsToUpdate;
// Latest valid instant time
private String latestInstantTime;
private HoodieTableMetaClient hoodieTableMetaClient;
// Merge strategy to use when combining records from log
private String payloadClassFQN;
// Store only the last log blocks (needed to implement rollback)
Deque<HoodieLogBlock> lastBlocks = new ArrayDeque<>();
public HoodieCompactedLogRecordScanner(FileSystem fs, List<String> logFilePaths,
public HoodieCompactedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema, String latestInstantTime) {
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
this.hoodieTableMetaClient = new HoodieTableMetaClient(fs, basePath);
// load class from the payload fully qualified class name
this.payloadClassFQN = this.hoodieTableMetaClient.getTableConfig().getPayloadClass();
// Store only the last log blocks (needed to implement rollback)
Deque<HoodieLogBlock> lastBlocks = new ArrayDeque<>();
// Store merged records for all versions for this log file
Map<String, HoodieRecord<HoodieAvroPayload>> records = Maps.newHashMap();
Map<String, HoodieRecord<? extends HoodieRecordPayload>> records = Maps.newHashMap();
// iterate over the paths
Iterator<String> logFilePathsItr = logFilePaths.iterator();
while(logFilePathsItr.hasNext()) {
@@ -132,7 +139,7 @@ public class HoodieCompactedLogRecordScanner implements Iterable<HoodieRecord<Ho
// the rollback operation itself
HoodieLogBlock lastBlock = lastBlocks.peek();
if (lastBlock != null && lastBlock.getBlockType() != CORRUPT_BLOCK &&
targetInstantForCommandBlock.contentEquals(lastBlock.getLogMetadata().get(INSTANT_TIME))) {
targetInstantForCommandBlock.contentEquals(lastBlock.getLogMetadata().get(INSTANT_TIME))) {
log.info("Rolling back the last log block read in " + logFile.getPath());
lastBlocks.pop();
} else if(lastBlock != null && lastBlock.getBlockType() == CORRUPT_BLOCK) {
@@ -169,29 +176,30 @@ public class HoodieCompactedLogRecordScanner implements Iterable<HoodieRecord<Ho
/**
* Iterate over the GenericRecord in the block, read the hoodie key and partition path
* and merge with the HoodieAvroPayload if the same key was found before
* and merge with the application specific payload if the same key was found before
* Sufficient to just merge the log records since the base data is merged on previous compaction
*
* @param dataBlock
*/
private Map<String, HoodieRecord<HoodieAvroPayload>> loadRecordsFromBlock(HoodieAvroDataBlock dataBlock) {
Map<String, HoodieRecord<HoodieAvroPayload>> recordsFromLastBlock = Maps.newHashMap();
private Map<String, HoodieRecord<? extends HoodieRecordPayload>> loadRecordsFromBlock(HoodieAvroDataBlock dataBlock) {
Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordsFromLastBlock = Maps.newHashMap();
List<IndexedRecord> recs = dataBlock.getRecords();
totalLogRecords.addAndGet(recs.size());
recs.forEach(rec -> {
String key = ((GenericRecord) rec).get(HoodieRecord.RECORD_KEY_METADATA_FIELD)
.toString();
String partitionPath =
((GenericRecord) rec).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
.toString();
HoodieRecord<HoodieAvroPayload> hoodieRecord = new HoodieRecord<>(
new HoodieKey(key, partitionPath),
new HoodieAvroPayload(Optional.of(((GenericRecord) rec))));
String partitionPath =
((GenericRecord) rec).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
.toString();
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieRecord<>(
new HoodieKey(key, partitionPath),
ReflectionUtils.loadPayload(this.payloadClassFQN, new Object[]{Optional.of(rec)}, Optional.class));
if (recordsFromLastBlock.containsKey(key)) {
// Merge and store the merged record
HoodieAvroPayload combinedValue = recordsFromLastBlock.get(key).getData()
.preCombine(hoodieRecord.getData());
HoodieRecordPayload combinedValue = recordsFromLastBlock.get(key).getData()
.preCombine(hoodieRecord.getData());
recordsFromLastBlock
.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()),
.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()),
combinedValue));
} else {
// Put the record as is
@@ -207,8 +215,8 @@ public class HoodieCompactedLogRecordScanner implements Iterable<HoodieRecord<Ho
* @param records
* @param lastBlocks
*/
private void merge(Map<String, HoodieRecord<HoodieAvroPayload>> records,
Deque<HoodieLogBlock> lastBlocks) {
private void merge(Map<String, HoodieRecord<? extends HoodieRecordPayload>> records,
Deque<HoodieLogBlock> lastBlocks) {
while (!lastBlocks.isEmpty()) {
HoodieLogBlock lastBlock = lastBlocks.pop();
switch (lastBlock.getBlockType()) {
@@ -230,15 +238,15 @@ public class HoodieCompactedLogRecordScanner implements Iterable<HoodieRecord<Ho
* @param records
* @param recordsFromLastBlock
*/
private void merge(Map<String, HoodieRecord<HoodieAvroPayload>> records,
Map<String, HoodieRecord<HoodieAvroPayload>> recordsFromLastBlock) {
private void merge(Map<String, HoodieRecord<? extends HoodieRecordPayload>> records,
Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordsFromLastBlock) {
recordsFromLastBlock.forEach((key, hoodieRecord) -> {
if (records.containsKey(key)) {
// Merge and store the merged record
HoodieAvroPayload combinedValue = records.get(key).getData()
.preCombine(hoodieRecord.getData());
HoodieRecordPayload combinedValue = records.get(key).getData()
.preCombine(hoodieRecord.getData());
records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()),
combinedValue));
combinedValue));
} else {
// Put the record as is
records.put(key, hoodieRecord);
@@ -247,7 +255,7 @@ public class HoodieCompactedLogRecordScanner implements Iterable<HoodieRecord<Ho
}
@Override
public Iterator<HoodieRecord<HoodieAvroPayload>> iterator() {
public Iterator<HoodieRecord<? extends HoodieRecordPayload>> iterator() {
return logRecords.iterator();
}

View File

@@ -19,43 +19,55 @@ package com.uber.hoodie.common.util;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.exception.HoodieException;
import org.apache.avro.generic.GenericRecord;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
public class ReflectionUtils {
private static Map<String, Class<?>> clazzCache = new HashMap<>();
private static Map<String, Class<?>> clazzCache = new HashMap<>();
public static <T extends HoodieRecordPayload> T loadPayload(String recordPayloadClass) throws IOException {
try {
if(clazzCache.get(recordPayloadClass) == null) {
Class<?> clazz = Class.<HoodieRecordPayload>forName(recordPayloadClass);
clazzCache.put(recordPayloadClass, clazz);
}
return (T) clazzCache.get(recordPayloadClass).newInstance();
} catch (ClassNotFoundException e) {
throw new IOException("Could not load payload class " + recordPayloadClass, e);
} catch (InstantiationException e) {
throw new IOException("Could not load payload class " + recordPayloadClass, e);
} catch (IllegalAccessException e) {
throw new IOException("Could not load payload class " + recordPayloadClass, e);
}
/**
 * Load the given class by name and instantiate it via its no-arg constructor.
 * Resolved Class objects are cached to avoid repeated Class.forName lookups.
 *
 * @param fqcn fully qualified class name to load
 * @param <T>  expected type of the created instance (unchecked cast)
 * @return a new instance of the named class
 * @throws HoodieException if the class cannot be found or instantiated
 */
public static <T> T loadClass(String fqcn) {
  try {
    // Cache the Class lookup; Class.forName is comparatively expensive.
    if (clazzCache.get(fqcn) == null) {
      Class<?> clazz = Class.forName(fqcn);
      clazzCache.put(fqcn, clazz);
    }
    return (T) clazzCache.get(fqcn).newInstance();
  } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
    // All failure modes are reported identically, so fold them into one multi-catch
    // (the original repeated the same handler three times).
    throw new HoodieException("Could not load class " + fqcn, e);
  }
}
public static <T> T loadClass(String fqcn) {
try {
if(clazzCache.get(fqcn) == null) {
Class<?> clazz = Class.<HoodieRecordPayload>forName(fqcn);
clazzCache.put(fqcn, clazz);
}
return (T) clazzCache.get(fqcn).newInstance();
} catch (ClassNotFoundException e) {
throw new HoodieException("Could not load class " + fqcn, e);
} catch (InstantiationException e) {
throw new HoodieException("Could not load class " + fqcn, e);
} catch (IllegalAccessException e) {
throw new HoodieException("Could not load class " + fqcn, e);
}
/**
* Instantiate a given class with a generic record payload
*
* @param recordPayloadClass
* @param payloadArgs
* @param <T>
* @return
*/
/**
 * Instantiate a given payload class with a generic record payload, using the constructor
 * matching the supplied argument types. Resolved Class objects are cached.
 *
 * @param recordPayloadClass  fully qualified name of the HoodieRecordPayload implementation
 * @param payloadArgs         arguments to pass to the constructor
 * @param constructorArgTypes parameter types used to select the constructor
 * @param <T>                 expected payload type (unchecked cast)
 * @return a new payload instance
 * @throws HoodieException if the class cannot be found or instantiated
 */
public static <T extends HoodieRecordPayload> T loadPayload(String recordPayloadClass,
    Object[] payloadArgs,
    Class<?>... constructorArgTypes) {
  try {
    if (clazzCache.get(recordPayloadClass) == null) {
      Class<?> clazz = Class.forName(recordPayloadClass);
      clazzCache.put(recordPayloadClass, clazz);
    }
    return (T) clazzCache.get(recordPayloadClass).getConstructor(constructorArgTypes)
        .newInstance(payloadArgs);
  } catch (ClassNotFoundException | InstantiationException | IllegalAccessException
      | InvocationTargetException | NoSuchMethodException e) {
    // Fold ClassNotFoundException into the existing multi-catch (it had an identical,
    // duplicated handler) and include the failing class name in the message.
    throw new HoodieException("Unable to instantiate payload class " + recordPayloadClass, e);
  }
}
}

View File

@@ -88,6 +88,7 @@ public class HoodieTestUtils {
Properties properties = new Properties();
properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME);
properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name());
properties.setProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, HoodieAvroPayload.class.getName());
return HoodieTableMetaClient.initializePathAsHoodieDataset(fs, basePath, properties);
}

View File

@@ -20,6 +20,9 @@ import com.google.common.collect.Maps;
import com.uber.hoodie.common.minicluster.MiniClusterUtil;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.log.HoodieLogFormat.Reader;
import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
@@ -66,6 +69,7 @@ public class HoodieLogFormatTest {
private FileSystem fs;
private Path partitionPath;
private String basePath;
@BeforeClass
public static void setUpClass() throws IOException, InterruptedException {
@@ -76,6 +80,7 @@ public class HoodieLogFormatTest {
@AfterClass
public static void tearDownClass() {
MiniClusterUtil.shutdown();
HoodieTestUtils.resetFS();
}
@Before
@@ -85,6 +90,9 @@ public class HoodieLogFormatTest {
folder.create();
assertTrue(fs.mkdirs(new Path(folder.getRoot().getPath())));
this.partitionPath = new Path(folder.getRoot().getPath());
this.basePath = folder.getRoot().getParent();
HoodieTestUtils.fs = fs;
HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ);
}
@After
@@ -430,7 +438,8 @@ public class HoodieLogFormatTest {
.map(s -> s.getPath().toString())
.collect(Collectors.toList());
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100");
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles,
schema, "100");
assertEquals("", 200, scanner.getTotalLogRecords());
Set<String> readKeys = new HashSet<>(200);
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -484,9 +493,9 @@ public class HoodieLogFormatTest {
.map(s -> s.getPath().toString())
.collect(Collectors.toList());
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100");
assertEquals("We only read 200 records, since 200 of them are valid", 200,
scanner.getTotalLogRecords());
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles,
schema, "100");
assertEquals("We only read 200 records, but only 200 of them are valid", 200, scanner.getTotalLogRecords());
Set<String> readKeys = new HashSet<>(200);
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
assertEquals("Stream collect should return all 200 records", 200, readKeys.size());
@@ -552,7 +561,8 @@ public class HoodieLogFormatTest {
.map(s -> s.getPath().toString())
.collect(Collectors.toList());
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100");
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles,
schema, "100");
assertEquals("We would read 200 records", 200,
scanner.getTotalLogRecords());
Set<String> readKeys = new HashSet<>(200);
@@ -607,7 +617,8 @@ public class HoodieLogFormatTest {
.map(s -> s.getPath().toString())
.collect(Collectors.toList());
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100");
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles,
schema, "100");
assertEquals("We still would read 200 records", 200,
scanner.getTotalLogRecords());
final List<String> readKeys = new ArrayList<>(200);
@@ -625,7 +636,7 @@ public class HoodieLogFormatTest {
writer = writer.appendBlock(commandBlock);
readKeys.clear();
scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100");
scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, "100");
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
assertEquals("Stream collect should return all 200 records after rollback of delete", 200, readKeys.size());
}
@@ -684,7 +695,8 @@ public class HoodieLogFormatTest {
.map(s -> s.getPath().toString())
.collect(Collectors.toList());
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100");
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles,
schema, "100");
assertEquals("We would read 100 records", 100,
scanner.getTotalLogRecords());
@@ -734,7 +746,8 @@ public class HoodieLogFormatTest {
.map(s -> s.getPath().toString())
.collect(Collectors.toList());
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100");
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath,
allLogFiles, schema, "100");
assertEquals("We would read 0 records", 0,
scanner.getTotalLogRecords());
}
@@ -766,7 +779,8 @@ public class HoodieLogFormatTest {
.map(s -> s.getPath().toString())
.collect(Collectors.toList());
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100");
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath,
allLogFiles, schema, "100");
assertEquals("We still would read 100 records", 100,
scanner.getTotalLogRecords());
final List<String> readKeys = new ArrayList<>(100);

View File

@@ -35,15 +35,17 @@ public class HoodieRealtimeFileSplit extends FileSplit {
private String maxCommitTime;
private String basePath;
public HoodieRealtimeFileSplit() {
super();
}
public HoodieRealtimeFileSplit(FileSplit baseSplit, List<String> deltaLogFiles, String maxCommitTime) throws IOException {
/**
 * Wraps a base parquet FileSplit together with the dataset base path and the delta log
 * files that must be merged on read, up to the given commit time.
 *
 * @param baseSplit     the underlying parquet file split (path/start/length/locations copied)
 * @param basePath      base path of the hoodie dataset this split belongs to
 * @param deltaLogFiles paths of the delta log files to merge with the base file
 * @param maxCommitTime latest commit time up to which log records are read
 * @throws IOException if the base split's locations cannot be read
 */
public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogFiles, String maxCommitTime) throws IOException {
super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(), baseSplit.getLocations());
this.deltaFilePaths = deltaLogFiles;
this.maxCommitTime = maxCommitTime;
this.basePath = basePath;
}
public List<String> getDeltaFilePaths() {
@@ -54,6 +56,10 @@ public class HoodieRealtimeFileSplit extends FileSplit {
return maxCommitTime;
}
/**
 * @return the base path of the hoodie dataset this split belongs to
 */
public String getBasePath() {
return basePath;
}
private static void writeString(String str, DataOutput out) throws IOException {
byte[] pathBytes = str.getBytes(StandardCharsets.UTF_8);
out.writeInt(pathBytes.length);

View File

@@ -127,7 +127,7 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf
HoodieTimeline.DELTA_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp();
rtSplits.add(
new HoodieRealtimeFileSplit(split, logFilePaths, maxCommitTime));
new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime));
} catch (IOException e) {
throw new HoodieIOException("Error creating hoodie real time split ", e);
}

View File

@@ -20,8 +20,10 @@ package com.uber.hoodie.hadoop.realtime;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import org.apache.avro.Schema;
@@ -122,14 +124,12 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWrita
LOG.info(
String.format("About to read compacted logs %s for base split %s, projecting cols %s",
split.getDeltaFilePaths(), split.getPath(), projectionFields));
HoodieCompactedLogRecordScanner compactedLogRecordScanner =
new HoodieCompactedLogRecordScanner(FSUtils.getFs(), split.getDeltaFilePaths(),
new HoodieCompactedLogRecordScanner(FSUtils.getFs(), split.getBasePath(), split.getDeltaFilePaths(),
readerSchema, split.getMaxCommitTime());
// NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit
// but can return records for completed commits > the commit we are trying to read (if using readCommit() API)
for (HoodieRecord<HoodieAvroPayload> hoodieRecord : compactedLogRecordScanner) {
for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : compactedLogRecordScanner) {
GenericRecord rec = (GenericRecord) hoodieRecord.getData().getInsertValue(readerSchema)
.get();
String key = hoodieRecord.getRecordKey();

View File

@@ -113,7 +113,7 @@ public class HoodieRealtimeRecordReaderTest {
//create a split with baseFile (parquet file written earlier) and new log file(s)
String logFilePath = writer.getLogFile().getPath().toString();
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(new FileSplit(new Path(partitionDir
+ "/fileid0_1_" + commitTime + ".parquet"),0,1,jobConf), Arrays.asList(logFilePath), newCommitTime);
+ "/fileid0_1_" + commitTime + ".parquet"),0,1,jobConf), basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime);
//create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<Void, ArrayWritable> reader =
@@ -167,7 +167,7 @@ public class HoodieRealtimeRecordReaderTest {
//create a split with baseFile (parquet file written earlier) and new log file(s)
String logFilePath = writer.getLogFile().getPath().toString();
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(new FileSplit(new Path(partitionDir
+ "/fileid0_1_" + commitTime + ".parquet"),0,1,jobConf), Arrays.asList(logFilePath), newCommitTime);
+ "/fileid0_1_" + commitTime + ".parquet"),0,1,jobConf), basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime);
//create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<Void, ArrayWritable> reader =

View File

@@ -24,6 +24,7 @@ import com.uber.hoodie.common.BloomFilter;
import com.uber.hoodie.common.minicluster.HdfsTestService;
import com.uber.hoodie.common.minicluster.ZookeeperTestService;
import com.uber.hoodie.common.model.CompactionWriteStat;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.model.HoodieDataFile;
@@ -120,7 +121,7 @@ public class TestUtil {
fileSystem.delete(new Path(hiveSyncConfig.basePath), true);
HoodieTableMetaClient
.initTableType(fileSystem, hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE,
hiveSyncConfig.tableName);
hiveSyncConfig.tableName, HoodieAvroPayload.class.getName());
HoodieHiveClient client = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(),
fileSystem);
@@ -156,7 +157,7 @@ public class TestUtil {
FileUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
HoodieTableMetaClient
.initTableType(fileSystem, hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE,
hiveSyncConfig.tableName);
hiveSyncConfig.tableName, HoodieAvroPayload.class.getName());
boolean result = fileSystem.mkdirs(path);
checkResult(result);
DateTime dateTime = DateTime.now();
@@ -171,7 +172,7 @@ public class TestUtil {
FileUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
HoodieTableMetaClient
.initTableType(fileSystem, hiveSyncConfig.basePath, HoodieTableType.MERGE_ON_READ,
hiveSyncConfig.tableName);
hiveSyncConfig.tableName, HoodieAvroPayload.class.getName());
boolean result = fileSystem.mkdirs(path);
checkResult(result);

View File

@@ -21,6 +21,7 @@ package com.uber.hoodie;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieException;
@@ -114,6 +115,8 @@ public class DataSourceUtils {
HoodieIndexConfig.newBuilder()
.withIndexType(HoodieIndex.IndexType.BLOOM)
.build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY())).build())
// override above with Hoodie configs specified as options.
.withProps(parameters)
.build();

View File

@@ -24,7 +24,9 @@ import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.uber.hoodie.DataSourceUtils;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.KeyGenerator;
import com.uber.hoodie.OverwriteWithLatestAvroPayload;
import com.uber.hoodie.SimpleKeyGenerator;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieRecord;
@@ -33,20 +35,18 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.utilities.HiveIncrementalPuller;
import com.uber.hoodie.utilities.UtilHelpers;
import com.uber.hoodie.SimpleKeyGenerator;
import com.uber.hoodie.utilities.schema.FilebasedSchemaProvider;
import com.uber.hoodie.utilities.sources.DFSSource;
import com.uber.hoodie.KeyGenerator;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import com.uber.hoodie.utilities.sources.Source;
import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException;
import com.uber.hoodie.utilities.schema.FilebasedSchemaProvider;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import com.uber.hoodie.utilities.sources.DFSSource;
import com.uber.hoodie.utilities.sources.Source;
import com.uber.hoodie.utilities.sources.SourceDataFormat;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.configuration.PropertiesConfiguration;
@@ -58,6 +58,7 @@ import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.collection.JavaConversions;
import java.io.IOException;
import java.io.Serializable;
@@ -67,8 +68,6 @@ import java.util.List;
import java.util.Optional;
import java.util.Properties;
import scala.collection.JavaConversions;
/**
* An Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target dataset.
* Does not maintain any state, queries at runtime to see how far behind the target dataset is from
@@ -252,6 +251,8 @@ public class HoodieDeltaStreamer implements Serializable {
.combineInput(true, true)
.withPath(cfg.targetBasePath)
.withAutoCommit(false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withPayloadClass(OverwriteWithLatestAvroPayload.class.getName()).build())
.withSchema(schemaProvider.getTargetSchema().toString())
.forTable(cfg.targetTableName)
.withIndexConfig(