
[HUDI-3213] Making commit preserve metadata to true for compaction (#4811)

* Making commit preserve metadata to true

* Fixing integ tests

* Fixing preserve commit metadata for metadata table

* fixed bootstrap tests

* temp diff

* Fixing merge handle

* renaming fallback record

* fixing build issue

* Fixing test failures
Sivabalan Narayanan
2022-03-07 07:32:05 -05:00
committed by GitHub
parent 6f57bbfac4
commit 3539578ccb
15 changed files with 88 additions and 48 deletions

View File

@@ -250,7 +250,7 @@ public class HoodieCompactionConfig extends HoodieConfig {
   public static final ConfigProperty<Boolean> PRESERVE_COMMIT_METADATA = ConfigProperty
       .key("hoodie.compaction.preserve.commit.metadata")
-      .defaultValue(false)
+      .defaultValue(true)
       .sinceVersion("0.11.0")
       .withDocumentation("When rewriting data, preserves existing hoodie_commit_time");

View File

@@ -1072,8 +1072,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public boolean populateMetaFields() {
-    return Boolean.parseBoolean(getStringOrDefault(HoodieTableConfig.POPULATE_META_FIELDS,
-        HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()));
+    return getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS);
   }
 
   /**
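The rewrite leans on HoodieConfig's typed lookup instead of parsing the string default by hand. A sketch of the semantics being relied on, not the actual HoodieConfig implementation:

    // Assumed behavior of the typed accessor: use the configured value
    // when present, else fall back to the ConfigProperty's typed default.
    public boolean getBooleanOrDefault(ConfigProperty<Boolean> property) {
      String raw = getString(property.key()); // hypothetical string-backed lookup
      return raw != null ? Boolean.parseBoolean(raw) : property.defaultValue();
    }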

View File

@@ -61,6 +61,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD_POS;
+
 @SuppressWarnings("Duplicates")
 /**
  * Handle to merge incoming records to those in storage.
@@ -262,7 +264,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
         isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
       }
     }
-    return writeRecord(hoodieRecord, indexedRecord, isDelete);
+    return writeRecord(hoodieRecord, indexedRecord, isDelete, oldRecord);
   }
 
   protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
@@ -272,16 +274,16 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
       return;
     }
-    if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
+    if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()), null)) {
       insertRecordsWritten++;
     }
   }
 
   protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
-    return writeRecord(hoodieRecord, indexedRecord, false);
+    return writeRecord(hoodieRecord, indexedRecord, false, null);
   }
 
-  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete) {
+  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete, GenericRecord oldRecord) {
     Option recordMetadata = hoodieRecord.getData().getMetadata();
     if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
       HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
@@ -292,8 +294,10 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     try {
       if (indexedRecord.isPresent() && !isDelete) {
         // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
-        IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get());
-        if (preserveMetadata) {
+        IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get(), preserveMetadata, oldRecord);
+        if (preserveMetadata && useWriterSchema) { // useWriterSchema will be true only in case of compaction.
+          // do not preserve FILENAME_METADATA_FIELD
+          recordWithMetadataInSchema.put(FILENAME_METADATA_FIELD_POS, newFilePath.getName());
           fileWriter.writeAvro(hoodieRecord.getRecordKey(), recordWithMetadataInSchema);
         } else {
           fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
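Taken together, the changes above mean a compaction rewrite carries every meta field over from the record already on storage except the file name, which must point at the base file being produced. A condensed sketch of that write path, using the names from this diff:

    // Copy meta fields over from the old record, then repoint the
    // record at the new base file before writing it out.
    IndexedRecord rewritten = rewriteRecord((GenericRecord) indexedRecord.get(), true /* preserveMetadata */, oldRecord);
    rewritten.put(FILENAME_METADATA_FIELD_POS, newFilePath.getName());
    fileWriter.writeAvro(hoodieRecord.getRecordKey(), rewritten);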

View File

@@ -227,6 +227,10 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
     return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
   }
 
+  protected GenericRecord rewriteRecord(GenericRecord record, boolean copyOverMetaFields, GenericRecord fallbackRecord) {
+    return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields, copyOverMetaFields, fallbackRecord);
+  }
+
   public abstract List<WriteStatus> close();
 
   public List<WriteStatus> writeStatuses() {

View File

@@ -176,7 +176,7 @@ public class HoodieAvroUtils {
   /**
    * Adds the Hoodie metadata fields to the given schema.
    *
-   * @param schema The schema
+   * @param schema             The schema
    * @param withOperationField Whether to include the '_hoodie_operation' field
    */
   public static Schema addMetadataFields(Schema schema, boolean withOperationField) {
@@ -276,7 +276,7 @@ public class HoodieAvroUtils {
     List<Schema.Field> toBeAddedFields = new ArrayList<>();
     Schema recordSchema = Schema.createRecord("HoodieRecordKey", "", "", false);
 
-    for (Schema.Field schemaField: fileSchema.getFields()) {
+    for (Schema.Field schemaField : fileSchema.getFields()) {
       if (fields.contains(schemaField.name())) {
         toBeAddedFields.add(new Schema.Field(schemaField.name(), schemaField.schema(), schemaField.doc(), schemaField.defaultVal()));
       }
@@ -303,7 +303,7 @@ public class HoodieAvroUtils {
    * engines have varying constraints regarding treating the case-sensitivity of fields, its best to let caller
    * determine that.
    *
-   * @param schema Passed in schema
+   * @param schema        Passed in schema
    * @param newFieldNames Null Field names to be added
    */
   public static Schema appendNullSchemaFields(Schema schema, List<String> newFieldNames) {
@@ -382,10 +382,34 @@ public class HoodieAvroUtils {
     return newRecord;
   }
 
+  public static GenericRecord rewriteRecord(GenericRecord genericRecord, Schema newSchema, boolean copyOverMetaFields, GenericRecord fallbackRecord) {
+    GenericRecord newRecord = new GenericData.Record(newSchema);
+    boolean isSpecificRecord = genericRecord instanceof SpecificRecordBase;
+    for (Schema.Field f : newSchema.getFields()) {
+      if (!(isSpecificRecord && isMetadataField(f.name()))) {
+        copyOldValueOrSetDefault(genericRecord, newRecord, f);
+      }
+      if (isMetadataField(f.name()) && copyOverMetaFields) {
+        // if meta field exists in primary generic record, copy over.
+        if (genericRecord.getSchema().getField(f.name()) != null) {
+          copyOldValueOrSetDefault(genericRecord, newRecord, f);
+        } else if (fallbackRecord != null && fallbackRecord.getSchema().getField(f.name()) != null) {
+          // if not, try to copy from the fallback record.
+          copyOldValueOrSetDefault(fallbackRecord, newRecord, f);
+        }
+      }
+    }
+    if (!GenericData.get().validate(newSchema, newRecord)) {
+      throw new SchemaCompatibilityException(
+          "Unable to validate the rewritten record " + genericRecord + " against schema " + newSchema);
+    }
+    return newRecord;
+  }
+
   /**
    * Converts list of {@link GenericRecord} provided into the {@link GenericRecord} adhering to the
    * provided {@code newSchema}.
-   *
+   * <p>
    * To better understand conversion rules please check {@link #rewriteRecord(GenericRecord, Schema)}
    */
   public static List<GenericRecord> rewriteRecords(List<GenericRecord> records, Schema newSchema) {
@@ -491,9 +515,8 @@ public class HoodieAvroUtils {
    * Returns the string value of the given record {@code rec} and field {@code fieldName}.
    * The field and value both could be missing.
    *
-   * @param rec The record
+   * @param rec       The record
    * @param fieldName The field name
-   *
    * @return the string form of the field
    *         or empty if the schema does not contain the field name or the value is null
    */
@@ -507,7 +530,7 @@ public class HoodieAvroUtils {
    * This method converts values for fields with certain Avro/Parquet data types that require special handling.
    *
    * @param fieldSchema avro field schema
-   * @param fieldValue avro field value
+   * @param fieldValue  avro field value
    * @return field value either converted (for certain data types) or as it is.
    */
   public static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object fieldValue, boolean consistentLogicalTimestampEnabled) {
@@ -527,15 +550,15 @@ public class HoodieAvroUtils {
   /**
    * This method converts values for fields with certain Avro Logical data types that require special handling.
-   *
+   * <p>
    * Logical Date Type is converted to actual Date value instead of Epoch Integer which is how it is
    * represented/stored in parquet.
-   *
+   * <p>
    * Decimal Data Type is converted to actual decimal value instead of bytes/fixed which is how it is
    * represented/stored in parquet.
    *
    * @param fieldSchema avro field schema
-   * @param fieldValue avro field value
+   * @param fieldValue  avro field value
    * @return field value either converted (for certain data types) or as it is.
    */
   private static Object convertValueForAvroLogicalTypes(Schema fieldSchema, Object fieldValue, boolean consistentLogicalTimestampEnabled) {
@@ -569,6 +592,7 @@ public class HoodieAvroUtils {
   /**
    * Sanitizes Name according to Avro rule for names.
    * Removes characters other than the ones mentioned in https://avro.apache.org/docs/current/spec.html#names .
+   *
    * @param name input name
    * @return sanitized name
    */
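A usage sketch for the rewriteRecord overload introduced above; incoming and onDisk are hypothetical records, and writeSchemaWithMetaFields stands for a schema produced by addMetadataFields:

    // 'incoming' carries only data fields; 'onDisk' was read from the
    // existing base file and has its meta fields populated.
    GenericRecord rewritten = HoodieAvroUtils.rewriteRecord(
        incoming, writeSchemaWithMetaFields,
        true,    // copyOverMetaFields
        onDisk); // fallback source for meta fields missing on 'incoming'
    // Meta fields such as _hoodie_commit_time are copied from 'onDisk',
    // while all data fields come from 'incoming'.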

View File

@@ -42,6 +42,8 @@ public abstract class HoodieRecord<T> implements Serializable {
   public static final String OPERATION_METADATA_FIELD = "_hoodie_operation";
   public static final String HOODIE_IS_DELETED = "_hoodie_is_deleted";
 
+  public static int FILENAME_METADATA_FIELD_POS = 4;
+
   public static final List<String> HOODIE_META_COLUMNS =
       CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
           RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD);
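The value 4 follows directly from the order of HOODIE_META_COLUMNS above, since the meta fields occupy the first positions of a schema produced by addMetadataFields:

    // Meta field positions implied by HOODIE_META_COLUMNS:
    //   0: _hoodie_commit_time
    //   1: _hoodie_commit_seqno
    //   2: _hoodie_record_key
    //   3: _hoodie_partition_path
    //   4: _hoodie_file_name   <- FILENAME_METADATA_FIELD_POS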

View File

@@ -172,9 +172,9 @@ public class HoodieTableConfig extends HoodieConfig {
       .noDefaultValue()
       .withDocumentation("Base path of the dataset that needs to be bootstrapped as a Hudi table");
 
-  public static final ConfigProperty<String> POPULATE_META_FIELDS = ConfigProperty
+  public static final ConfigProperty<Boolean> POPULATE_META_FIELDS = ConfigProperty
       .key("hoodie.populate.meta.fields")
-      .defaultValue("true")
+      .defaultValue(true)
       .withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated "
           + "and incremental queries will not be functional. This is only meant to be used for append only/immutable data for batch processing");

View File

@@ -395,7 +395,8 @@ object HoodieSparkSqlWriter {
     val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
     val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
     val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
-    val populateMetaFields = parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()).toBoolean
+    val populateMetaFields = java.lang.Boolean.parseBoolean((parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
+      String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))))
     val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
 
     HoodieTableMetaClient.withPropertyBuilder()
@@ -447,8 +448,8 @@ object HoodieSparkSqlWriter {
                              instantTime: String,
                              partitionColumns: String): (Boolean, common.util.Option[String]) = {
     val sparkContext = sqlContext.sparkContext
-    val populateMetaFields = parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
-      HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()).toBoolean
+    val populateMetaFields = java.lang.Boolean.parseBoolean((parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
+      String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))))
     val dropPartitionColumns =
       parameters.getOrElse(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key(), DataSourceWriteOptions.DROP_PARTITION_COLUMNS.defaultValue()).toBoolean
     // register classes & schemas

View File

@@ -364,6 +364,7 @@ public class HoodieJavaStreamingApp {
         .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1")
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "true")
         .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
+        .option(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA.key(), "false")
         .option(HoodieWriteConfig.TBL_NAME.key(), tableName).option("checkpointLocation", checkpointLocation)
         .outputMode(OutputMode.Append());

View File

@@ -256,7 +256,7 @@ public class TestBootstrap extends HoodieClientTestBase {
     SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
     client.bootstrap(Option.empty());
     checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
-        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants);
+        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);
 
     // Rollback Bootstrap
     HoodieActiveTimeline.deleteInstantFile(metaClient.getFs(), metaClient.getMetaPath(), new HoodieInstant(State.COMPLETED,
@@ -284,7 +284,7 @@ public class TestBootstrap extends HoodieClientTestBase {
     }
     checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
-        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants);
+        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);
 
     // Upsert case
     long updateTimestamp = Instant.now().toEpochMilli();
@@ -296,7 +296,7 @@ public class TestBootstrap extends HoodieClientTestBase {
     String newInstantTs = client.startCommit();
     client.upsert(updateBatch, newInstantTs);
     checkBootstrapResults(totalRecords, schema, newInstantTs, false, numInstantsAfterBootstrap + 1,
-        updateTimestamp, deltaCommit ? timestamp : updateTimestamp, deltaCommit);
+        updateTimestamp, deltaCommit ? timestamp : updateTimestamp, deltaCommit, true);
 
     if (deltaCommit) {
       Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
@@ -304,7 +304,7 @@ public class TestBootstrap extends HoodieClientTestBase {
       client.compact(compactionInstant.get());
       checkBootstrapResults(totalRecords, schema, compactionInstant.get(), checkNumRawFiles,
           numInstantsAfterBootstrap + 2, 2, updateTimestamp, updateTimestamp, !deltaCommit,
-          Arrays.asList(compactionInstant.get()));
+          Arrays.asList(compactionInstant.get()), !config.isPreserveHoodieCommitMetadataForCompaction());
     }
   }
@@ -334,14 +334,14 @@ public class TestBootstrap extends HoodieClientTestBase {
   }
 
   private void checkBootstrapResults(int totalRecords, Schema schema, String maxInstant, boolean checkNumRawFiles,
-      int expNumInstants, long expTimestamp, long expROTimestamp, boolean isDeltaCommit) throws Exception {
+      int expNumInstants, long expTimestamp, long expROTimestamp, boolean isDeltaCommit, boolean validateRecordsForCommitTime) throws Exception {
     checkBootstrapResults(totalRecords, schema, maxInstant, checkNumRawFiles, expNumInstants, expNumInstants,
-        expTimestamp, expROTimestamp, isDeltaCommit, Arrays.asList(maxInstant));
+        expTimestamp, expROTimestamp, isDeltaCommit, Arrays.asList(maxInstant), validateRecordsForCommitTime);
   }
 
   private void checkBootstrapResults(int totalRecords, Schema schema, String instant, boolean checkNumRawFiles,
       int expNumInstants, int numVersions, long expTimestamp, long expROTimestamp, boolean isDeltaCommit,
-      List<String> instantsWithValidRecords) throws Exception {
+      List<String> instantsWithValidRecords, boolean validateRecordsForCommitTime) throws Exception {
     metaClient.reloadActiveTimeline();
     assertEquals(expNumInstants, metaClient.getCommitsTimeline().filterCompletedInstants().countInstants());
     assertEquals(instant, metaClient.getActiveTimeline()
@@ -361,8 +361,10 @@ public class TestBootstrap extends HoodieClientTestBase {
     if (!isDeltaCommit) {
       String predicate = String.join(", ",
           instantsWithValidRecords.stream().map(p -> "\"" + p + "\"").collect(Collectors.toList()));
-      assertEquals(totalRecords, sqlContext.sql("select * from bootstrapped where _hoodie_commit_time IN "
-          + "(" + predicate + ")").count());
+      if (validateRecordsForCommitTime) {
+        assertEquals(totalRecords, sqlContext.sql("select * from bootstrapped where _hoodie_commit_time IN "
+            + "(" + predicate + ")").count());
+      }
       Dataset<Row> missingOriginal = sqlContext.sql("select a._row_key from original a where a._row_key not "
           + "in (select _hoodie_record_key from bootstrapped)");
       assertEquals(0, missingOriginal.count());

View File

@@ -248,7 +248,7 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
     SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
     client.bootstrap(Option.empty());
     checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
-        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants);
+        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);
 
     // Rollback Bootstrap
     if (deltaCommit) {
@@ -278,7 +278,7 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
     }
     checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
-        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants);
+        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);
 
     // Upsert case
     long updateTimestamp = Instant.now().toEpochMilli();
@@ -290,7 +290,7 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
     String newInstantTs = client.startCommit();
     client.upsert(updateBatch, newInstantTs);
     checkBootstrapResults(totalRecords, schema, newInstantTs, false, numInstantsAfterBootstrap + 1,
-        updateTimestamp, deltaCommit ? timestamp : updateTimestamp, deltaCommit);
+        updateTimestamp, deltaCommit ? timestamp : updateTimestamp, deltaCommit, true);
 
     if (deltaCommit) {
       Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
@@ -298,7 +298,7 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
       client.compact(compactionInstant.get());
       checkBootstrapResults(totalRecords, schema, compactionInstant.get(), checkNumRawFiles,
           numInstantsAfterBootstrap + 2, 2, updateTimestamp, updateTimestamp, !deltaCommit,
-          Arrays.asList(compactionInstant.get()));
+          Arrays.asList(compactionInstant.get()), !config.isPreserveHoodieCommitMetadataForCompaction());
     }
   }
@@ -328,14 +328,14 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
   }
 
   private void checkBootstrapResults(int totalRecords, Schema schema, String maxInstant, boolean checkNumRawFiles,
-      int expNumInstants, long expTimestamp, long expROTimestamp, boolean isDeltaCommit) throws Exception {
+      int expNumInstants, long expTimestamp, long expROTimestamp, boolean isDeltaCommit, boolean validateRecordsForCommitTime) throws Exception {
     checkBootstrapResults(totalRecords, schema, maxInstant, checkNumRawFiles, expNumInstants, expNumInstants,
-        expTimestamp, expROTimestamp, isDeltaCommit, Arrays.asList(maxInstant));
+        expTimestamp, expROTimestamp, isDeltaCommit, Arrays.asList(maxInstant), validateRecordsForCommitTime);
   }
 
   private void checkBootstrapResults(int totalRecords, Schema schema, String instant, boolean checkNumRawFiles,
       int expNumInstants, int numVersions, long expTimestamp, long expROTimestamp, boolean isDeltaCommit,
-      List<String> instantsWithValidRecords) throws Exception {
+      List<String> instantsWithValidRecords, boolean validateCommitRecords) throws Exception {
     metaClient.reloadActiveTimeline();
     assertEquals(expNumInstants, metaClient.getCommitsTimeline().filterCompletedInstants().countInstants());
     assertEquals(instant, metaClient.getActiveTimeline()
@@ -355,8 +355,10 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
     if (!isDeltaCommit) {
       String predicate = String.join(", ",
           instantsWithValidRecords.stream().map(p -> "\"" + p + "\"").collect(Collectors.toList()));
-      assertEquals(totalRecords, sqlContext.sql("select * from bootstrapped where _hoodie_commit_time IN "
-          + "(" + predicate + ")").count());
+      if (validateCommitRecords) {
+        assertEquals(totalRecords, sqlContext.sql("select * from bootstrapped where _hoodie_commit_time IN "
+            + "(" + predicate + ")").count());
+      }
       Dataset<Row> missingOriginal = sqlContext.sql("select a._row_key from original a where a._row_key not "
           + "in (select _hoodie_record_key from bootstrapped)");
       assertEquals(0, missingOriginal.count());

View File

@@ -272,8 +272,9 @@ class TestMORDataSource extends HoodieClientTestBase {
       .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commit5Time)
       .option(DataSourceReadOptions.END_INSTANTTIME.key, commit6Time)
       .load(basePath)
-    // compaction updated 150 rows + inserted 2 new row
-    assertEquals(152, hudiIncDF6.count())
+    // even though compaction updated 150 rows, since preserve commit metadata is true, they won't be part of the incremental query.
+    // inserted 2 new rows
+    assertEquals(2, hudiIncDF6.count())
   }
 
   @Test

View File

@@ -66,7 +66,7 @@ public class DefaultSource extends BaseDefaultSource implements DataSourceV2,
String path = options.get("path").get(); String path = options.get("path").get();
String tblName = options.get(HoodieWriteConfig.TBL_NAME.key()).get(); String tblName = options.get(HoodieWriteConfig.TBL_NAME.key()).get();
boolean populateMetaFields = options.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(), boolean populateMetaFields = options.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(),
Boolean.parseBoolean(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())); HoodieTableConfig.POPULATE_META_FIELDS.defaultValue());
Map<String, String> properties = options.asMap(); Map<String, String> properties = options.asMap();
// Auto set the value of "hoodie.parquet.writelegacyformat.enabled" // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
mayBeOverwriteParquetWriteLegacyFormatProp(properties, schema); mayBeOverwriteParquetWriteLegacyFormatProp(properties, schema);

View File

@@ -53,7 +53,7 @@ public class DefaultSource extends BaseDefaultSource implements TableProvider {
String path = properties.get("path"); String path = properties.get("path");
String tblName = properties.get(HoodieWriteConfig.TBL_NAME.key()); String tblName = properties.get(HoodieWriteConfig.TBL_NAME.key());
boolean populateMetaFields = Boolean.parseBoolean(properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(), boolean populateMetaFields = Boolean.parseBoolean(properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(),
HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())); Boolean.toString(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())));
boolean arePartitionRecordsSorted = Boolean.parseBoolean(properties.getOrDefault(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED, boolean arePartitionRecordsSorted = Boolean.parseBoolean(properties.getOrDefault(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED,
Boolean.toString(HoodieInternalConfig.DEFAULT_BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED))); Boolean.toString(HoodieInternalConfig.DEFAULT_BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED)));
// Create a new map as the properties is an unmodifiableMap on Spark 3.2.0 // Create a new map as the properties is an unmodifiableMap on Spark 3.2.0

View File

@@ -276,7 +276,7 @@ public class DeltaSync implements Serializable {
         .setPartitionFields(partitionColumns)
         .setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
         .setPopulateMetaFields(props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(),
-            Boolean.parseBoolean(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())))
+            HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))
         .setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
             SimpleKeyGenerator.class.getName()))
         .setPreCombineField(cfg.sourceOrderingField)
@@ -370,7 +370,7 @@ public class DeltaSync implements Serializable {
         .setPartitionFields(partitionColumns)
         .setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
         .setPopulateMetaFields(props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(),
-            Boolean.parseBoolean(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())))
+            HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))
         .setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
             SimpleKeyGenerator.class.getName()))
         .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);