1
0

Revert "[HUDI-2087] Support Append only in Flink stream (#3174)" (#3251)

This reverts commit 371526789d.
This commit is contained in:
vinoth chandar
2021-07-09 11:20:09 -07:00
committed by GitHub
parent 371526789d
commit b4562e86e4
14 changed files with 33 additions and 246 deletions

View File

@@ -186,12 +186,6 @@ public class FlinkOptions {
.defaultValue(TABLE_TYPE_COPY_ON_WRITE)
.withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
public static final ConfigOption<Boolean> APPEND_ONLY_ENABLE = ConfigOptions
.key("append_only.enable")
.booleanType()
.defaultValue(false)
.withDescription("Whether to write data to new baseFile without index, only support in COW, default false");
public static final ConfigOption<String> OPERATION = ConfigOptions
.key("write.operation")
.stringType()

View File

@@ -28,13 +28,11 @@ import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.bootstrap.IndexRecord;
import org.apache.hudi.sink.utils.PayloadCreation;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
@@ -57,9 +55,9 @@ import java.util.Objects;
* it then assigns the bucket with ID using the {@link BucketAssigner}.
*
* <p>All the records are tagged with HoodieRecordLocation, instead of real instant time,
* INSERT record uses "INSERT" and UPSERT record uses "UPDATE" as instant time. There is no need to keep
* INSERT record uses "I" and UPSERT record uses "U" as instant time. There is no need to keep
* the "real" instant time for each record, the bucket ID (partition path & fileID) actually decides
* where the record should write to. The "INSERT" and "UPDATE" tags are only used for downstream to decide whether
* where the record should write to. The "I" and "U" tags are only used for downstream to decide whether
* the data bucket is an INSERT or an UPSERT, we should factor the tags out when the underneath writer
* supports specifying the bucket type explicitly.
*
@@ -108,18 +106,11 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
*/
private final boolean globalIndex;
private final boolean appendOnly;
public BucketAssignFunction(Configuration conf) {
this.conf = conf;
this.isChangingRecords = WriteOperationType.isChangingRecords(
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
this.globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
this.appendOnly = conf.getBoolean(FlinkOptions.APPEND_ONLY_ENABLE);
if (appendOnly) {
ValidationUtils.checkArgument(conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.COPY_ON_WRITE.name()),
"APPEND_ONLY mode only support in COPY_ON_WRITE table");
}
}
@Override
@@ -179,33 +170,25 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
final String partitionPath = hoodieKey.getPartitionPath();
final HoodieRecordLocation location;
if (appendOnly) {
location = getNewRecordLocation(partitionPath);
this.context.setCurrentKey(recordKey);
record.setCurrentLocation(location);
out.collect((O) record);
return;
}
// Only changing records need looking up the index for the location,
// append only records are always recognized as INSERT.
HoodieRecordGlobalLocation oldLoc = indexState.value();
if (isChangingRecords && oldLoc != null) {
// Set up the instant time as "UPDATE" to mark the bucket as an update bucket.
// Set up the instant time as "U" to mark the bucket as an update bucket.
if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
if (globalIndex) {
// if partition path changes, emit a delete record for old partition path,
// then update the index state using location with new partition path.
HoodieRecord<?> deleteRecord = new HoodieRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()),
payloadCreation.createDeletePayload((BaseAvroPayload) record.getData()));
deleteRecord.setCurrentLocation(oldLoc.toLocal(BucketType.UPDATE.name()));
deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
deleteRecord.seal();
out.collect((O) deleteRecord);
}
location = getNewRecordLocation(partitionPath);
updateIndexState(partitionPath, location);
} else {
location = oldLoc.toLocal(BucketType.UPDATE.name());
location = oldLoc.toLocal("U");
this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
}
} else {
@@ -220,26 +203,17 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
}
private HoodieRecordLocation getNewRecordLocation(String partitionPath) {
BucketInfo bucketInfo;
if (appendOnly) {
bucketInfo = this.bucketAssigner.addAppendOnly(partitionPath);
} else {
bucketInfo = this.bucketAssigner.addInsert(partitionPath);
}
final BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath);
final HoodieRecordLocation location;
switch (bucketInfo.getBucketType()) {
case INSERT:
// This is an insert bucket, use HoodieRecordLocation instant time as "INSERT".
// This is an insert bucket, use HoodieRecordLocation instant time as "I".
// Downstream operators can then check the instant time to know whether
// a record belongs to an insert bucket.
location = new HoodieRecordLocation(BucketType.INSERT.name(), bucketInfo.getFileIdPrefix());
location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix());
break;
case UPDATE:
location = new HoodieRecordLocation(BucketType.UPDATE.name(), bucketInfo.getFileIdPrefix());
break;
case APPEND_ONLY:
location = new HoodieRecordLocation(BucketType.APPEND_ONLY.name(), bucketInfo.getFileIdPrefix());
location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix());
break;
default:
throw new AssertionError();

View File

@@ -140,14 +140,6 @@ public class BucketAssigner implements AutoCloseable {
}
// if we have anything more, create new insert buckets, like normal
return getOrCreateNewFileBucket(partitionPath, BucketType.INSERT);
}
public BucketInfo addAppendOnly(String partitionPath) {
return getOrCreateNewFileBucket(partitionPath, BucketType.APPEND_ONLY);
}
private BucketInfo getOrCreateNewFileBucket(String partitionPath, BucketType bucketType) {
if (newFileAssignStates.containsKey(partitionPath)) {
NewFileAssignState newFileAssignState = newFileAssignStates.get(partitionPath);
if (newFileAssignState.canAssign()) {
@@ -156,7 +148,7 @@ public class BucketAssigner implements AutoCloseable {
final String key = StreamerUtil.generateBucketKey(partitionPath, newFileAssignState.fileId);
return bucketInfoMap.get(key);
}
BucketInfo bucketInfo = new BucketInfo(bucketType, FSUtils.createNewFileIdPfx(), partitionPath);
BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath);
final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix());
bucketInfoMap.put(key, bucketInfo);
newFileAssignStates.put(partitionPath, new NewFileAssignState(bucketInfo.getFileIdPrefix(), writeProfile.getRecordsPerBucket()));

View File

@@ -69,9 +69,6 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ.", required = true)
public String tableType;
@Parameter(names = {"--append-only"}, description = "Write data to new parquet in every checkpoint. Only support in COPY_ON_WRITE table.", required = true)
public Boolean appendOnly = false;
@Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for "
+ "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
+ "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
@@ -293,13 +290,7 @@ public class FlinkStreamerConfig extends Configuration {
conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
// copy_on_write works same as COPY_ON_WRITE
conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, config.appendOnly);
if (config.appendOnly) {
// append only should use insert operation
conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
} else {
conf.setString(FlinkOptions.OPERATION, config.operation.value());
}
conf.setString(FlinkOptions.OPERATION, config.operation.value());
conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes);

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.table;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
@@ -145,11 +144,6 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
TableSchema schema) {
// table name
conf.setString(FlinkOptions.TABLE_NAME.key(), tableName);
// append only
if (conf.getBoolean(FlinkOptions.APPEND_ONLY_ENABLE)) {
// append only should use insert operation
conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
}
// hoodie key about options
setupHoodieKeyOptions(conf, table);
// cleaning options