1
0

[HUDI-714]Add javadoc and comments to hudi write method link (#1409)

* [HUDI-714] Add javadoc and comments to hudi write method link
This commit is contained in:
Mathieu
2020-05-16 20:36:51 +08:00
committed by GitHub
parent 148b2458f6
commit 25a0080b2f
9 changed files with 64 additions and 26 deletions

View File

@@ -225,6 +225,12 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
return index; return index;
} }
/**
* Get HoodieTable and init {@link Timer.Context}.
*
* @param operationType write operation type
* @return HoodieTable
*/
protected HoodieTable getTableAndInitCtx(WriteOperationType operationType) { protected HoodieTable getTableAndInitCtx(WriteOperationType operationType) {
HoodieTableMetaClient metaClient = createMetaClient(true); HoodieTableMetaClient metaClient = createMetaClient(true);
if (operationType == WriteOperationType.DELETE) { if (operationType == WriteOperationType.DELETE) {

View File

@@ -49,7 +49,9 @@ public class WorkloadProfile<T extends HoodieRecordPayload> implements Serializa
*/ */
private final HashMap<String, WorkloadStat> partitionPathStatMap; private final HashMap<String, WorkloadStat> partitionPathStatMap;
/**
* Global workloadStat.
*/
private final WorkloadStat globalStat; private final WorkloadStat globalStat;
public WorkloadProfile(JavaRDD<HoodieRecord<T>> taggedRecords) { public WorkloadProfile(JavaRDD<HoodieRecord<T>> taggedRecords) {
@@ -59,13 +61,18 @@ public class WorkloadProfile<T extends HoodieRecordPayload> implements Serializa
buildProfile(); buildProfile();
} }
/**
* Method help to build WorkloadProfile.
*/
private void buildProfile() { private void buildProfile() {
// group the records by partitionPath + currentLocation combination, count the number of
// records in each partition
Map<Tuple2<String, Option<HoodieRecordLocation>>, Long> partitionLocationCounts = taggedRecords Map<Tuple2<String, Option<HoodieRecordLocation>>, Long> partitionLocationCounts = taggedRecords
.mapToPair(record -> new Tuple2<>( .mapToPair(record -> new Tuple2<>(
new Tuple2<>(record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record)) new Tuple2<>(record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record))
.countByKey(); .countByKey();
// count the number of both inserts and updates in each partition, update the counts to workLoadStats
for (Map.Entry<Tuple2<String, Option<HoodieRecordLocation>>, Long> e : partitionLocationCounts.entrySet()) { for (Map.Entry<Tuple2<String, Option<HoodieRecordLocation>>, Long> e : partitionLocationCounts.entrySet()) {
String partitionPath = e.getKey()._1(); String partitionPath = e.getKey()._1();
Long count = e.getValue(); Long count = e.getValue();

View File

@@ -29,7 +29,6 @@ import java.util.Objects;
*/ */
public class HoodieKey implements Serializable { public class HoodieKey implements Serializable {
private final String recordKey; private final String recordKey;
private final String partitionPath; private final String partitionPath;

View File

@@ -136,7 +136,7 @@ public class HoodieTableMetaClient implements Serializable {
} }
/** /**
* For serailizing and de-serializing. * For serializing and de-serializing.
* *
* @deprecated * @deprecated
*/ */
@@ -149,7 +149,7 @@ public class HoodieTableMetaClient implements Serializable {
} }
/** /**
* This method is only used when this object is deserialized in a spark executor. * This method is only used when this object is de-serialized in a spark executor.
* *
* @deprecated * @deprecated
*/ */

View File

@@ -168,8 +168,8 @@ public class ITTestHoodieDemo extends ITTestBase {
private void testSparkSQLAfterFirstBatch() throws Exception { private void testSparkSQLAfterFirstBatch() throws Exception {
Pair<String, String> stdOutErrPair = executeSparkSQLCommand(SPARKSQL_BATCH1_COMMANDS, true); Pair<String, String> stdOutErrPair = executeSparkSQLCommand(SPARKSQL_BATCH1_COMMANDS, true);
assertStdOutContains(stdOutErrPair, "|default |stock_ticks_cow |false |\n" assertStdOutContains(stdOutErrPair, "|default |stock_ticks_cow |false |\n"
+ "|default |stock_ticks_mor_ro |false |\n" + + "|default |stock_ticks_mor_ro |false |\n"
"|default |stock_ticks_mor_rt |false |"); + "|default |stock_ticks_mor_rt |false |");
assertStdOutContains(stdOutErrPair, assertStdOutContains(stdOutErrPair,
"+------+-------------------+\n|GOOG |2018-08-31 10:29:00|\n+------+-------------------+", 3); "+------+-------------------+\n|GOOG |2018-08-31 10:29:00|\n+------+-------------------+", 3);
assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 |", 3); assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 |", 3);

View File

@@ -241,6 +241,13 @@ public class DataSourceUtils {
return new HoodieRecord<>(hKey, payload); return new HoodieRecord<>(hKey, payload);
} }
/**
* Drop records already present in the dataset.
*
* @param jssc JavaSparkContext
* @param incomingHoodieRecords HoodieRecords to deduplicate
* @param writeConfig HoodieWriteConfig
*/
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords, public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
HoodieWriteConfig writeConfig) { HoodieWriteConfig writeConfig) {

View File

@@ -41,7 +41,6 @@ import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException; import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider; import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider; import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
@@ -184,6 +183,8 @@ public class DeltaSync implements Serializable {
/** /**
* Refresh Timeline. * Refresh Timeline.
*
* @throws IOException in case of any IOException
*/ */
private void refreshTimeline() throws IOException { private void refreshTimeline() throws IOException {
if (fs.exists(new Path(cfg.targetBasePath))) { if (fs.exists(new Path(cfg.targetBasePath))) {
@@ -239,6 +240,11 @@ public class DeltaSync implements Serializable {
/** /**
* Read from Upstream Source and apply transformation if needed. * Read from Upstream Source and apply transformation if needed.
*
* @param commitTimelineOpt Timeline with completed commits
* @return Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> Input data read from upstream source, consists
* of schemaProvider, checkpointStr and hoodieRecord
* @throws Exception in case of any Exception
*/ */
private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource( private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(
Option<HoodieTimeline> commitTimelineOpt) throws Exception { Option<HoodieTimeline> commitTimelineOpt) throws Exception {
@@ -355,18 +361,23 @@ public class DeltaSync implements Serializable {
boolean isEmpty = records.isEmpty(); boolean isEmpty = records.isEmpty();
// try to start a new commit
String instantTime = startCommit(); String instantTime = startCommit();
LOG.info("Starting commit : " + instantTime); LOG.info("Starting commit : " + instantTime);
JavaRDD<WriteStatus> writeStatusRDD; JavaRDD<WriteStatus> writeStatusRDD;
if (cfg.operation == Operation.INSERT) { switch (cfg.operation) {
writeStatusRDD = writeClient.insert(records, instantTime); case INSERT:
} else if (cfg.operation == Operation.UPSERT) { writeStatusRDD = writeClient.insert(records, instantTime);
writeStatusRDD = writeClient.upsert(records, instantTime); break;
} else if (cfg.operation == Operation.BULK_INSERT) { case UPSERT:
writeStatusRDD = writeClient.bulkInsert(records, instantTime); writeStatusRDD = writeClient.upsert(records, instantTime);
} else { break;
throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation); case BULK_INSERT:
writeStatusRDD = writeClient.bulkInsert(records, instantTime);
break;
default:
throw new HoodieDeltaStreamerException("Unknown operation : " + cfg.operation);
} }
long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue(); long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
@@ -425,6 +436,13 @@ public class DeltaSync implements Serializable {
return scheduledCompactionInstant; return scheduledCompactionInstant;
} }
/**
* Try to start a new commit.
* <p>
* Exception will be thrown if it failed in 2 tries.
*
* @return Instant time of the commit
*/
private String startCommit() { private String startCommit() {
final int maxRetries = 2; final int maxRetries = 2;
int retryNum = 1; int retryNum = 1;

View File

@@ -70,7 +70,7 @@ import java.util.stream.IntStream;
* An Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target * An Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target
* table. Does not maintain any state, queries at runtime to see how far behind the target table is from the source * table. Does not maintain any state, queries at runtime to see how far behind the target table is from the source
* table. This can be overriden to force sync from a timestamp. * table. This can be overriden to force sync from a timestamp.
* * <p>
* In continuous mode, DeltaStreamer runs in loop-mode going through the below operations (a) pull-from-source (b) * In continuous mode, DeltaStreamer runs in loop-mode going through the below operations (a) pull-from-source (b)
* write-to-sink (c) Schedule Compactions if needed (d) Conditionally Sync to Hive each cycle. For MOR table with * write-to-sink (c) Schedule Compactions if needed (d) Conditionally Sync to Hive each cycle. For MOR table with
* continuous mode enabled, a separate compactor thread is allocated to execute compactions * continuous mode enabled, a separate compactor thread is allocated to execute compactions
@@ -154,7 +154,7 @@ public class HoodieDeltaStreamer implements Serializable {
UPSERT, INSERT, BULK_INSERT UPSERT, INSERT, BULK_INSERT
} }
protected static class OperationConvertor implements IStringConverter<Operation> { protected static class OperationConverter implements IStringConverter<Operation> {
@Override @Override
public Operation convert(String value) throws ParameterException { public Operation convert(String value) throws ParameterException {
@@ -223,7 +223,7 @@ public class HoodieDeltaStreamer implements Serializable {
public long sourceLimit = Long.MAX_VALUE; public long sourceLimit = Long.MAX_VALUE;
@Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input " @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
+ "is purely new data/inserts to gain speed)", converter = OperationConvertor.class) + "is purely new data/inserts to gain speed)", converter = OperationConverter.class)
public Operation operation = Operation.UPSERT; public Operation operation = Operation.UPSERT;
@Parameter(names = {"--filter-dupes"}, @Parameter(names = {"--filter-dupes"},

View File

@@ -112,7 +112,7 @@ public class HoodieMultiTableDeltaStreamer {
String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable)); String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
checkIfTableConfigFileExists(configFolder, fs, configFilePath); checkIfTableConfigFileExists(configFolder, fs, configFilePath);
TypedProperties tableProperties = UtilHelpers.readConfig(fs, new Path(configFilePath), new ArrayList<>()).getConfig(); TypedProperties tableProperties = UtilHelpers.readConfig(fs, new Path(configFilePath), new ArrayList<>()).getConfig();
properties.forEach((k,v) -> { properties.forEach((k, v) -> {
tableProperties.setProperty(k.toString(), v.toString()); tableProperties.setProperty(k.toString(), v.toString());
}); });
final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
@@ -155,7 +155,7 @@ public class HoodieMultiTableDeltaStreamer {
public static class Helpers { public static class Helpers {
static String getDefaultConfigFilePath(String configFolder, String database, String currentTable) { static String getDefaultConfigFilePath(String configFolder, String database, String currentTable) {
return configFolder + Constants.FILEDELIMITER + database + Constants.UNDERSCORE + currentTable + Constants.DEFAULT_CONFIG_FILE_NAME_SUFFIX; return configFolder + Constants.FILE_DELIMITER + database + Constants.UNDERSCORE + currentTable + Constants.DEFAULT_CONFIG_FILE_NAME_SUFFIX;
} }
static String getTableWithDatabase(TableExecutionContext context) { static String getTableWithDatabase(TableExecutionContext context) {
@@ -264,7 +264,7 @@ public class HoodieMultiTableDeltaStreamer {
public long sourceLimit = Long.MAX_VALUE; public long sourceLimit = Long.MAX_VALUE;
@Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input " @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
+ "is purely new data/inserts to gain speed)", converter = HoodieDeltaStreamer.OperationConvertor.class) + "is purely new data/inserts to gain speed)", converter = HoodieDeltaStreamer.OperationConverter.class)
public HoodieDeltaStreamer.Operation operation = HoodieDeltaStreamer.Operation.UPSERT; public HoodieDeltaStreamer.Operation operation = HoodieDeltaStreamer.Operation.UPSERT;
@Parameter(names = {"--filter-dupes"}, @Parameter(names = {"--filter-dupes"},
@@ -329,6 +329,7 @@ public class HoodieMultiTableDeltaStreamer {
/** /**
* Resets target table name and target path using base-path-prefix. * Resets target table name and target path using base-path-prefix.
*
* @param configuration * @param configuration
* @param database * @param database
* @param tableName * @param tableName
@@ -337,13 +338,13 @@ public class HoodieMultiTableDeltaStreamer {
private static String resetTarget(Config configuration, String database, String tableName) { private static String resetTarget(Config configuration, String database, String tableName) {
String basePathPrefix = configuration.basePathPrefix; String basePathPrefix = configuration.basePathPrefix;
basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix; basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
String targetBasePath = basePathPrefix + Constants.FILEDELIMITER + database + Constants.FILEDELIMITER + tableName; String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
configuration.targetTableName = database + Constants.DELIMITER + tableName; configuration.targetTableName = database + Constants.DELIMITER + tableName;
return targetBasePath; return targetBasePath;
} }
/* /**
Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
*/ */
public void sync() { public void sync() {
for (TableExecutionContext context : tableExecutionContexts) { for (TableExecutionContext context : tableExecutionContexts) {
@@ -375,7 +376,7 @@ public class HoodieMultiTableDeltaStreamer {
private static final String DEFAULT_CONFIG_FILE_NAME_SUFFIX = "_config.properties"; private static final String DEFAULT_CONFIG_FILE_NAME_SUFFIX = "_config.properties";
private static final String TARGET_BASE_PATH_PROP = "hoodie.deltastreamer.ingestion.targetBasePath"; private static final String TARGET_BASE_PATH_PROP = "hoodie.deltastreamer.ingestion.targetBasePath";
private static final String LOCAL_SPARK_MASTER = "local[2]"; private static final String LOCAL_SPARK_MASTER = "local[2]";
private static final String FILEDELIMITER = "/"; private static final String FILE_DELIMITER = "/";
private static final String DELIMITER = "."; private static final String DELIMITER = ".";
private static final String UNDERSCORE = "_"; private static final String UNDERSCORE = "_";
private static final String COMMA_SEPARATOR = ","; private static final String COMMA_SEPARATOR = ",";