1
0

HUDI-92 : Making deltastreamer with DistributedTestSource also run locally

- Separating out the test data generators per partition
 - Minor logging improvements on IOHandle performance
This commit is contained in:
Vinoth Chandar
2019-07-19 04:53:28 -07:00
committed by Balaji Varadarajan
parent 68464c7d02
commit e20b77be3b
7 changed files with 63 additions and 45 deletions

1
.gitignore vendored
View File

@@ -6,6 +6,7 @@ target/
.DS_Store .DS_Store
*.class *.class
.java-version
# Package Files # # Package Files #
*.jar *.jar

View File

@@ -27,6 +27,7 @@ import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats; import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats;
import com.uber.hoodie.common.table.TableFileSystemView.RealtimeView; import com.uber.hoodie.common.table.TableFileSystemView.RealtimeView;
import com.uber.hoodie.common.table.log.HoodieLogFormat; import com.uber.hoodie.common.table.log.HoodieLogFormat;
@@ -247,17 +248,23 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
if (writer != null) { if (writer != null) {
writer.close(); writer.close();
} }
writeStatus.getStat().setFileId(this.fileId);
writeStatus.getStat().setNumWrites(recordsWritten); HoodieWriteStat stat = writeStatus.getStat();
writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten); stat.setFileId(this.fileId);
writeStatus.getStat().setNumInserts(insertRecordsWritten); stat.setNumWrites(recordsWritten);
writeStatus.getStat().setNumDeletes(recordsDeleted); stat.setNumUpdateWrites(updatedRecordsWritten);
writeStatus.getStat().setTotalWriteBytes(estimatedNumberOfBytesWritten); stat.setNumInserts(insertRecordsWritten);
writeStatus.getStat().setFileSizeInBytes(sizeInBytes); stat.setNumDeletes(recordsDeleted);
writeStatus.getStat().setTotalWriteErrors(writeStatus.getTotalErrorRecords()); stat.setTotalWriteBytes(estimatedNumberOfBytesWritten);
stat.setFileSizeInBytes(sizeInBytes);
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
RuntimeStats runtimeStats = new RuntimeStats(); RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalUpsertTime(timer.endTimer()); runtimeStats.setTotalUpsertTime(timer.endTimer());
writeStatus.getStat().setRuntimeStats(runtimeStats); stat.setRuntimeStats(runtimeStats);
logger.info(String.format("AppendHandle for partitionPath %s fileID %s, took %d ms.",
stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalUpsertTime()));
return writeStatus; return writeStatus;
} catch (IOException e) { } catch (IOException e) {
throw new HoodieUpsertException("Failed to close UpdateHandle", e); throw new HoodieUpsertException("Failed to close UpdateHandle", e);

View File

@@ -72,7 +72,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
throw new HoodieInsertException( throw new HoodieInsertException(
"Failed to initialize HoodieStorageWriter for path " + path, e); "Failed to initialize HoodieStorageWriter for path " + path, e);
} }
logger.info("New InsertHandle for partition :" + partitionPath + " with fileId " + fileId); logger.info("New CreateHandle for partition :" + partitionPath + " with fileId " + fileId);
} }
/** /**
@@ -172,6 +172,9 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
stat.setRuntimeStats(runtimeStats); stat.setRuntimeStats(runtimeStats);
writeStatus.setStat(stat); writeStatus.setStat(stat);
logger.info(String.format("CreateHandle for partitionPath %s fileID %s, took %d ms.",
stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalCreateTime()));
return writeStatus; return writeStatus;
} catch (IOException e) { } catch (IOException e) {
throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e); throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e);

View File

@@ -327,16 +327,22 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
} }
long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath); long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
writeStatus.getStat().setTotalWriteBytes(fileSizeInBytes); HoodieWriteStat stat = writeStatus.getStat();
writeStatus.getStat().setFileSizeInBytes(fileSizeInBytes);
writeStatus.getStat().setNumWrites(recordsWritten); stat.setTotalWriteBytes(fileSizeInBytes);
writeStatus.getStat().setNumDeletes(recordsDeleted); stat.setFileSizeInBytes(fileSizeInBytes);
writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten); stat.setNumWrites(recordsWritten);
writeStatus.getStat().setNumInserts(insertRecordsWritten); stat.setNumDeletes(recordsDeleted);
writeStatus.getStat().setTotalWriteErrors(writeStatus.getTotalErrorRecords()); stat.setNumUpdateWrites(updatedRecordsWritten);
stat.setNumInserts(insertRecordsWritten);
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
RuntimeStats runtimeStats = new RuntimeStats(); RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalUpsertTime(timer.endTimer()); runtimeStats.setTotalUpsertTime(timer.endTimer());
writeStatus.getStat().setRuntimeStats(runtimeStats); stat.setRuntimeStats(runtimeStats);
logger.info(String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.",
stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalUpsertTime()));
return writeStatus; return writeStatus;
} catch (IOException e) { } catch (IOException e) {
throw new HoodieUpsertException("Failed to close UpdateHandle", e); throw new HoodieUpsertException("Failed to close UpdateHandle", e);

View File

@@ -9,9 +9,8 @@ import com.uber.hoodie.utilities.schema.SchemaProvider;
import com.uber.hoodie.utilities.sources.config.TestSourceConfig; import com.uber.hoodie.utilities.sources.config.TestSourceConfig;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
@@ -21,34 +20,35 @@ import org.apache.spark.sql.SparkSession;
public abstract class AbstractBaseTestSource extends AvroSource { public abstract class AbstractBaseTestSource extends AvroSource {
static final int DEFAULT_PARTITION_NUM = 0;
// Static instance, helps with reuse across a test. // Static instance, helps with reuse across a test.
protected static transient HoodieTestDataGenerator dataGenerator; protected static transient Map<Integer, HoodieTestDataGenerator> dataGeneratorMap = new HashMap<>();
public static void initDataGen() { public static void initDataGen() {
dataGenerator = new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS); dataGeneratorMap.putIfAbsent(DEFAULT_PARTITION_NUM,
new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS));
} }
public static void initDataGen(TypedProperties props) { public static void initDataGen(TypedProperties props, int partition) {
try { try {
boolean useRocksForTestDataGenKeys = props.getBoolean(TestSourceConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS, boolean useRocksForTestDataGenKeys = props.getBoolean(TestSourceConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS,
TestSourceConfig.DEFAULT_USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS); TestSourceConfig.DEFAULT_USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS);
String baseStoreDir = props.getString(TestSourceConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS, null); String baseStoreDir = props.getString(TestSourceConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS,
if (null == baseStoreDir) { File.createTempFile("test_data_gen", ".keys").getParent()) + "/" + partition;
baseStoreDir = File.createTempFile("test_data_gen", ".keys").getParent();
}
log.info("useRocksForTestDataGenKeys=" + useRocksForTestDataGenKeys + ", BaseStoreDir=" + baseStoreDir); log.info("useRocksForTestDataGenKeys=" + useRocksForTestDataGenKeys + ", BaseStoreDir=" + baseStoreDir);
dataGenerator = new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, dataGeneratorMap.put(partition, new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS,
useRocksForTestDataGenKeys ? new RocksDBBasedMap<>(baseStoreDir) : new HashMap<>()); useRocksForTestDataGenKeys ? new RocksDBBasedMap<>(baseStoreDir) : new HashMap<>()));
} catch (IOException e) { } catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e); throw new HoodieIOException(e.getMessage(), e);
} }
} }
public static void resetDataGen() { public static void resetDataGen() {
if (null != dataGenerator) { for (HoodieTestDataGenerator dataGenerator : dataGeneratorMap.values()) {
dataGenerator.close(); dataGenerator.close();
} }
dataGenerator = null; dataGeneratorMap.clear();
} }
protected AbstractBaseTestSource(TypedProperties props, protected AbstractBaseTestSource(TypedProperties props,
@@ -57,10 +57,13 @@ public abstract class AbstractBaseTestSource extends AvroSource {
super(props, sparkContext, sparkSession, schemaProvider); super(props, sparkContext, sparkSession, schemaProvider);
} }
protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String commitTime) { protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String commitTime,
int partition) {
int maxUniqueKeys = props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, int maxUniqueKeys = props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP,
TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS); TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS);
HoodieTestDataGenerator dataGenerator = dataGeneratorMap.get(partition);
// generate `sourceLimit` number of upserts each time. // generate `sourceLimit` number of upserts each time.
int numExistingKeys = dataGenerator.getNumExistingKeys(); int numExistingKeys = dataGenerator.getNumExistingKeys();
log.info("NumExistingKeys=" + numExistingKeys); log.info("NumExistingKeys=" + numExistingKeys);
@@ -84,15 +87,14 @@ public abstract class AbstractBaseTestSource extends AvroSource {
log.info("Before DataGen. Memory Usage=" + memoryUsage1 + ", Total Memory=" + Runtime.getRuntime().totalMemory() log.info("Before DataGen. Memory Usage=" + memoryUsage1 + ", Total Memory=" + Runtime.getRuntime().totalMemory()
+ ", Free Memory=" + Runtime.getRuntime().freeMemory()); + ", Free Memory=" + Runtime.getRuntime().freeMemory());
List<GenericRecord> records = new ArrayList<>();
Stream<GenericRecord> updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates) Stream<GenericRecord> updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates)
.map(AbstractBaseTestSource::toGenericRecord); .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts) Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts)
.map(AbstractBaseTestSource::toGenericRecord); .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
return Stream.concat(updateStream, insertStream); return Stream.concat(updateStream, insertStream);
} }
private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) { private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) {
try { try {
Optional<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema); Optional<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema);
return (GenericRecord) recordOpt.get(); return (GenericRecord) recordOpt.get();

View File

@@ -66,14 +66,14 @@ public class DistributedTestDataSource extends AbstractBaseTestSource {
newProps.setProperty(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, maxUniqueRecordsPerPartition); newProps.setProperty(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, maxUniqueRecordsPerPartition);
int perPartitionSourceLimit = Math.max(1, (int) (sourceLimit / numTestSourcePartitions)); int perPartitionSourceLimit = Math.max(1, (int) (sourceLimit / numTestSourcePartitions));
JavaRDD<GenericRecord> avroRDD = sparkContext.parallelize(IntStream.range(0, numTestSourcePartitions).boxed() JavaRDD<GenericRecord> avroRDD = sparkContext.parallelize(IntStream.range(0, numTestSourcePartitions).boxed()
.collect(Collectors.toList()), numTestSourcePartitions).mapPartitions(idx -> { .collect(Collectors.toList()), numTestSourcePartitions).mapPartitionsWithIndex((p, idx) -> {
log.info("Initializing source with newProps=" + newProps); log.info("Initializing source with newProps=" + newProps);
if (null == dataGenerator) { if (!dataGeneratorMap.containsKey(p)) {
initDataGen(newProps); initDataGen(newProps, p);
} }
Iterator<GenericRecord> itr = fetchNextBatch(newProps, perPartitionSourceLimit, commitTime).iterator(); Iterator<GenericRecord> itr = fetchNextBatch(newProps, perPartitionSourceLimit, commitTime, p).iterator();
return itr; return itr;
}); }, true);
return new InputBatch<>(Optional.of(avroRDD), commitTime); return new InputBatch<>(Optional.of(avroRDD), commitTime);
} }
} }

View File

@@ -40,9 +40,7 @@ public class TestDataSource extends AbstractBaseTestSource {
public TestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, public TestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) { SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider); super(props, sparkContext, sparkSession, schemaProvider);
if (null == dataGenerator) { initDataGen();
initDataGen(props);
}
} }
@Override @Override
@@ -58,7 +56,8 @@ public class TestDataSource extends AbstractBaseTestSource {
return new InputBatch<>(Optional.empty(), commitTime); return new InputBatch<>(Optional.empty(), commitTime);
} }
List<GenericRecord> records = fetchNextBatch(props, (int)sourceLimit, commitTime).collect(Collectors.toList()); List<GenericRecord> records = fetchNextBatch(props, (int)sourceLimit, commitTime, DEFAULT_PARTITION_NUM)
.collect(Collectors.toList());
JavaRDD<GenericRecord> avroRDD = sparkContext.<GenericRecord>parallelize(records, 4); JavaRDD<GenericRecord> avroRDD = sparkContext.<GenericRecord>parallelize(records, 4);
return new InputBatch<>(Optional.of(avroRDD), commitTime); return new InputBatch<>(Optional.of(avroRDD), commitTime);
} }