1
0

[HUDI-1612] Fix write test flakiness in StreamWriteITCase (#2567)

* [HUDI-1612] Fix write test flakiness in StreamWriteITCase
This commit is contained in:
lamber-ken
2021-02-11 23:37:19 +08:00
committed by GitHub
parent 26da4f5462
commit ff0e3f5669
2 changed files with 63 additions and 9 deletions

View File

@@ -54,6 +54,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.File; import java.io.File;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -65,13 +66,13 @@ import java.util.concurrent.TimeUnit;
*/ */
public class StreamWriteITCase extends TestLogger { public class StreamWriteITCase extends TestLogger {
private static final Map<String, String> EXPECTED = new HashMap<>(); private static final Map<String, List<String>> EXPECTED = new HashMap<>();
static { static {
EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]"); EXPECTED.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1"));
EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]"); EXPECTED.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2"));
EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]"); EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3"));
EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]"); EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4"));
} }
@TempDir @TempDir
@@ -85,6 +86,7 @@ public class StreamWriteITCase extends TestLogger {
execEnv.setParallelism(4); execEnv.setParallelism(4);
// set up checkpoint interval // set up checkpoint interval
execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE); execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Read from file source // Read from file source
RowType rowType = RowType rowType =
@@ -137,7 +139,7 @@ public class StreamWriteITCase extends TestLogger {
} }
} }
TestData.checkWrittenData(tempFile, EXPECTED); TestData.checkWrittenFullData(tempFile, EXPECTED);
} }
@Test @Test
@@ -215,6 +217,6 @@ public class StreamWriteITCase extends TestLogger {
} }
} }
TestData.checkWrittenData(tempFile, EXPECTED); TestData.checkWrittenFullData(tempFile, EXPECTED);
} }
} }

View File

@@ -18,7 +18,13 @@
package org.apache.hudi.operator.utils; package org.apache.hudi.operator.utils;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
@@ -49,6 +55,7 @@ import static junit.framework.TestCase.assertEquals;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** Data set for testing, also some utilities to check the results. */ /** Data set for testing, also some utilities to check the results. */
public class TestData { public class TestData {
@@ -105,7 +112,7 @@ public class TestData {
* *
* <p>Note: Replace it with the Flink reader when it is supported. * <p>Note: Replace it with the Flink reader when it is supported.
* *
* @param baseFile The file base to check, should be a directly * @param baseFile The file base to check, should be a directory
* @param expected The expected results mapping, the key should be the partition path * @param expected The expected results mapping, the key should be the partition path
*/ */
public static void checkWrittenData(File baseFile, Map<String, String> expected) throws IOException { public static void checkWrittenData(File baseFile, Map<String, String> expected) throws IOException {
@@ -117,7 +124,7 @@ public class TestData {
* *
* <p>Note: Replace it with the Flink reader when it is supported. * <p>Note: Replace it with the Flink reader when it is supported.
* *
* @param baseFile The file base to check, should be a directly * @param baseFile The file base to check, should be a directory
* @param expected The expected results mapping, the key should be the partition path * @param expected The expected results mapping, the key should be the partition path
* @param partitions The expected partition number * @param partitions The expected partition number
*/ */
@@ -149,6 +156,51 @@ public class TestData {
} }
} }
/**
 * Checks the source data are written as expected.
 *
 * <p>For every partition in {@code expected}, reads all records from the latest
 * base file of each file group in that partition, passes each record through
 * {@code filterOutVariables}, and asserts that the resulting records match the
 * expected records for the partition. The comparison ignores ordering but is
 * sensitive to duplicates.
 *
 * <p>Note: Replace it with the Flink reader when it is supported.
 *
 * @param basePath The file base to check, should be a directory
 * @param expected The expected results mapping, the key should be the partition path
 *                 and the value the records expected under that partition
 * @throws IOException declared for callers; read failures on individual base files
 *                     surface as a {@code RuntimeException} wrapping the original cause
 */
public static void checkWrittenFullData(
    File basePath,
    Map<String, List<String>> expected) throws IOException {

  // 1. init flink table
  // NOTE(review): HoodieTestUtils.init is assumed to yield a meta client for the
  // table already written under basePath -- confirm it does not reset table state.
  HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.getAbsolutePath());
  HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.getAbsolutePath()).build();
  FlinkTaskContextSupplier supplier = new FlinkTaskContextSupplier(null);
  HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(supplier);
  HoodieFlinkTable table = HoodieFlinkTable.create(config, context, metaClient);

  // 2. check each partition data
  expected.forEach((partition, partitionDataSet) -> {

    List<String> readBuffer = new ArrayList<>();

    table.getFileSystemView().getAllFileGroups(partition)
        .forEach(v -> v.getLatestDataFile().ifPresent(baseFile -> {
          String path = baseFile.getPath();
          // try-with-resources: the reader holds an open file handle and must be
          // closed even when reading fails part-way through.
          try (ParquetReader<GenericRecord> reader =
                   AvroParquetReader.<GenericRecord>builder(new Path(path)).build()) {
            GenericRecord nextRecord = reader.read();
            while (nextRecord != null) {
              readBuffer.add(filterOutVariables(nextRecord));
              nextRecord = reader.read();
            }
          } catch (IOException e) {
            // The enclosing lambda cannot throw checked exceptions; keep the cause.
            throw new RuntimeException("Failed to read base file: " + path, e);
          }
        }));

    // Compare sorted copies so the check is order-insensitive yet still catches a
    // duplicated record standing in for a missing one (size + containsAll would not).
    List<String> sortedExpected = new ArrayList<>(partitionDataSet);
    List<String> sortedActual = new ArrayList<>(readBuffer);
    sortedExpected.sort(null);
    sortedActual.sort(null);

    assertTrue(
        sortedExpected.equals(sortedActual),
        "Unexpected data in partition '" + partition + "': expected "
            + sortedExpected + " but read " + sortedActual);
  });
}
/** /**
* Filter out the variables like file name. * Filter out the variables like file name.
*/ */