diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 58e01a5c7..af13171c1 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -225,6 +225,12 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
     return index;
   }
 
+  /**
+   * Get HoodieTable and init {@link Timer.Context}.
+   *
+   * @param operationType write operation type
+   * @return HoodieTable
+   */
   protected HoodieTable getTableAndInitCtx(WriteOperationType operationType) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
     if (operationType == WriteOperationType.DELETE) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/WorkloadProfile.java b/hudi-client/src/main/java/org/apache/hudi/table/WorkloadProfile.java
index 41ca21271..8179c90a3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/WorkloadProfile.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/WorkloadProfile.java
@@ -49,7 +49,9 @@ public class WorkloadProfile<T extends HoodieRecordPayload> implements Serializa
    */
   private final HashMap<String, WorkloadStat> partitionPathStatMap;
 
-
+  /**
+   * Global workloadStat.
+   */
   private final WorkloadStat globalStat;
 
   public WorkloadProfile(JavaRDD<HoodieRecord<T>> taggedRecords) {
@@ -59,13 +61,18 @@
     buildProfile();
   }
 
+  /**
+   * Method help to build WorkloadProfile.
+   */
   private void buildProfile() {
-
+    // group the records by partitionPath + currentLocation combination, count the number of
+    // records in each partition
     Map<Tuple2<String, Option<HoodieRecordLocation>>, Long> partitionLocationCounts = taggedRecords
         .mapToPair(record -> new Tuple2<>(
             new Tuple2<>(record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record))
         .countByKey();
 
+    // count the number of both inserts and updates in each partition, update the counts to workLoadStats
     for (Map.Entry<Tuple2<String, Option<HoodieRecordLocation>>, Long> e : partitionLocationCounts.entrySet()) {
       String partitionPath = e.getKey()._1();
       Long count = e.getValue();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
index d03fb1b47..c40bdc45c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
@@ -29,7 +29,6 @@ import java.util.Objects;
  */
 public class HoodieKey implements Serializable {
 
-
   private final String recordKey;
   private final String partitionPath;
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 03e91c207..3218ddf8b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -136,7 +136,7 @@ public class HoodieTableMetaClient implements Serializable {
   }
 
   /**
-   * For serailizing and de-serializing.
+   * For serializing and de-serializing.
    *
    * @deprecated
    */
@@ -149,7 +149,7 @@ public class HoodieTableMetaClient implements Serializable {
   }
 
   /**
-   * This method is only used when this object is deserialized in a spark executor.
+   * This method is only used when this object is de-serialized in a spark executor.
    *
    * @deprecated
    */
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
index 4657092b7..744f4f891 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
@@ -168,8 +168,8 @@
   private void testSparkSQLAfterFirstBatch() throws Exception {
     Pair<String, String> stdOutErrPair = executeSparkSQLCommand(SPARKSQL_BATCH1_COMMANDS, true);
     assertStdOutContains(stdOutErrPair, "|default |stock_ticks_cow |false |\n"
-        + "|default |stock_ticks_mor_ro |false |\n" +
-        "|default |stock_ticks_mor_rt |false |");
+        + "|default |stock_ticks_mor_ro |false |\n"
+        + "|default |stock_ticks_mor_rt |false |");
     assertStdOutContains(stdOutErrPair,
         "+------+-------------------+\n|GOOG |2018-08-31 10:29:00|\n+------+-------------------+", 3);
     assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 |", 3);
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index 9983c191c..e45bd65e7 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -241,6 +241,13 @@
     return new HoodieRecord<>(hKey, payload);
   }
 
+  /**
+   * Drop records already present in the dataset.
+   *
+   * @param jssc JavaSparkContext
+   * @param incomingHoodieRecords HoodieRecords to deduplicate
+   * @param writeConfig HoodieWriteConfig
+   */
   @SuppressWarnings("unchecked")
   public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
       HoodieWriteConfig writeConfig) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index fd051ed09..e36495dac 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -41,7 +41,6 @@ import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.utilities.UtilHelpers;
-import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
 import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
 import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
 import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
@@ -184,6 +183,8 @@ public class DeltaSync implements Serializable {
 
   /**
    * Refresh Timeline.
+   *
+   * @throws IOException in case of any IOException
    */
   private void refreshTimeline() throws IOException {
     if (fs.exists(new Path(cfg.targetBasePath))) {
@@ -239,6 +240,11 @@
 
   /**
    * Read from Upstream Source and apply transformation if needed.
+   *
+   * @param commitTimelineOpt Timeline with completed commits
+   * @return Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> Input data read from upstream source, consists
+   *         of schemaProvider, checkpointStr and hoodieRecord
+   * @throws Exception in case of any Exception
    */
   private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(
       Option<HoodieTimeline> commitTimelineOpt) throws Exception {
@@ -355,18 +361,23 @@
 
     boolean isEmpty = records.isEmpty();
 
+    // try to start a new commit
     String instantTime = startCommit();
     LOG.info("Starting commit : " + instantTime);
 
     JavaRDD<WriteStatus> writeStatusRDD;
-    if (cfg.operation == Operation.INSERT) {
-      writeStatusRDD = writeClient.insert(records, instantTime);
-    } else if (cfg.operation == Operation.UPSERT) {
-      writeStatusRDD = writeClient.upsert(records, instantTime);
-    } else if (cfg.operation == Operation.BULK_INSERT) {
-      writeStatusRDD = writeClient.bulkInsert(records, instantTime);
-    } else {
-      throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation);
+    switch (cfg.operation) {
+      case INSERT:
+        writeStatusRDD = writeClient.insert(records, instantTime);
+        break;
+      case UPSERT:
+        writeStatusRDD = writeClient.upsert(records, instantTime);
+        break;
+      case BULK_INSERT:
+        writeStatusRDD = writeClient.bulkInsert(records, instantTime);
+        break;
+      default:
+        throw new HoodieDeltaStreamerException("Unknown operation : " + cfg.operation);
     }
 
     long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
@@ -425,6 +436,13 @@
     return scheduledCompactionInstant;
   }
 
+  /**
+   * Try to start a new commit.
+   * <p>
+   * Exception will be thrown if it failed in 2 tries.
+   *
+   * @return Instant time of the commit
+   */
   private String startCommit() {
     final int maxRetries = 2;
     int retryNum = 1;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 99bf69691..a569c4f82 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -70,7 +70,7 @@ import java.util.stream.IntStream;
  * An Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target
  * table. Does not maintain any state, queries at runtime to see how far behind the target table is from the source
  * table. This can be overriden to force sync from a timestamp.
- *
+ * <p>
  * In continuous mode, DeltaStreamer runs in loop-mode going through the below operations (a) pull-from-source (b)
  * write-to-sink (c) Schedule Compactions if needed (d) Conditionally Sync to Hive each cycle. For MOR table with
  * continuous mode enabled, a separate compactor thread is allocated to execute compactions
@@ -154,7 +154,7 @@ public class HoodieDeltaStreamer implements Serializable {
     UPSERT, INSERT, BULK_INSERT
   }
 
-  protected static class OperationConvertor implements IStringConverter<Operation> {
+  protected static class OperationConverter implements IStringConverter<Operation> {
 
     @Override
     public Operation convert(String value) throws ParameterException {
@@ -223,7 +223,7 @@
     public long sourceLimit = Long.MAX_VALUE;
 
     @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
-        + "is purely new data/inserts to gain speed)", converter = OperationConvertor.class)
+        + "is purely new data/inserts to gain speed)", converter = OperationConverter.class)
     public Operation operation = Operation.UPSERT;
 
     @Parameter(names = {"--filter-dupes"},
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index e43bca3b2..7684d1b9d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -112,7 +112,7 @@
       String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
       checkIfTableConfigFileExists(configFolder, fs, configFilePath);
       TypedProperties tableProperties = UtilHelpers.readConfig(fs, new Path(configFilePath), new ArrayList<>()).getConfig();
-      properties.forEach((k,v) -> {
+      properties.forEach((k, v) -> {
         tableProperties.setProperty(k.toString(), v.toString());
       });
       final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
@@ -155,7 +155,7 @@
   public static class Helpers {
 
     static String getDefaultConfigFilePath(String configFolder, String database, String currentTable) {
-      return configFolder + Constants.FILEDELIMITER + database + Constants.UNDERSCORE + currentTable + Constants.DEFAULT_CONFIG_FILE_NAME_SUFFIX;
+      return configFolder + Constants.FILE_DELIMITER + database + Constants.UNDERSCORE + currentTable + Constants.DEFAULT_CONFIG_FILE_NAME_SUFFIX;
     }
 
     static String getTableWithDatabase(TableExecutionContext context) {
@@ -264,7 +264,7 @@
     public long sourceLimit = Long.MAX_VALUE;
 
     @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
-        + "is purely new data/inserts to gain speed)", converter = HoodieDeltaStreamer.OperationConvertor.class)
+        + "is purely new data/inserts to gain speed)", converter = HoodieDeltaStreamer.OperationConverter.class)
     public HoodieDeltaStreamer.Operation operation = HoodieDeltaStreamer.Operation.UPSERT;
 
     @Parameter(names = {"--filter-dupes"},
@@ -329,6 +329,7 @@
 
   /**
    * Resets target table name and target path using base-path-prefix.
+   *
    * @param configuration
    * @param database
    * @param tableName
@@ -337,13 +338,13 @@
   private static String resetTarget(Config configuration, String database, String tableName) {
     String basePathPrefix = configuration.basePathPrefix;
     basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' ? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
-    String targetBasePath = basePathPrefix + Constants.FILEDELIMITER + database + Constants.FILEDELIMITER + tableName;
+    String targetBasePath = basePathPrefix + Constants.FILE_DELIMITER + database + Constants.FILE_DELIMITER + tableName;
     configuration.targetTableName = database + Constants.DELIMITER + tableName;
     return targetBasePath;
   }
 
-  /*
-  Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync
+  /**
+   * Creates actual HoodieDeltaStreamer objects for every table/topic and does incremental sync.
    */
   public void sync() {
     for (TableExecutionContext context : tableExecutionContexts) {
@@ -375,7 +376,7 @@
     private static final String DEFAULT_CONFIG_FILE_NAME_SUFFIX = "_config.properties";
     private static final String TARGET_BASE_PATH_PROP = "hoodie.deltastreamer.ingestion.targetBasePath";
     private static final String LOCAL_SPARK_MASTER = "local[2]";
-    private static final String FILEDELIMITER = "/";
+    private static final String FILE_DELIMITER = "/";
     private static final String DELIMITER = ".";
     private static final String UNDERSCORE = "_";
     private static final String COMMA_SEPARATOR = ",";
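Side note, not part of the patch above: the only behavior-adjacent changes are the OperationConvertor -> OperationConverter rename and the switch over the Operation enum that replaces the if/else chain in DeltaSync. The sketch below mirrors that pattern in isolation; OperationDispatchSketch, toOperation and dispatch are hypothetical stand-ins rather than Hudi APIs, and the case-insensitive parsing is only an assumption about what the JCommander converter does.

// Standalone, compilable sketch of the enum-parse-then-switch-dispatch pattern.
public class OperationDispatchSketch {

  enum Operation {
    UPSERT, INSERT, BULK_INSERT
  }

  // Roughly what a JCommander IStringConverter<Operation> does for "--op".
  static Operation toOperation(String value) {
    return Operation.valueOf(value.toUpperCase());
  }

  // Each case maps to exactly one write path; unknown values fail fast in the
  // default branch, like the `default:` case added in the patch.
  static String dispatch(Operation operation) {
    switch (operation) {
      case INSERT:
        return "writeClient.insert";
      case UPSERT:
        return "writeClient.upsert";
      case BULK_INSERT:
        return "writeClient.bulkInsert";
      default:
        throw new IllegalArgumentException("Unknown operation : " + operation);
    }
  }

  public static void main(String[] args) {
    System.out.println(dispatch(toOperation("upsert")));      // writeClient.upsert
    System.out.println(dispatch(toOperation("BULK_INSERT"))); // writeClient.bulkInsert
  }
}

Dispatching through a switch keeps each write path in its own labeled branch and funnels every unrecognized value into a single failure point, which is the readability gain the patch is after.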