diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 357e3f574..c992ae537 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -305,6 +305,11 @@ public class HoodieWriteConfig extends HoodieConfig { + "By default, we use a persistent hashmap based loosely on bitcask, that offers O(1) inserts, lookups. " + "Change this to `ROCKS_DB` to prefer using rocksDB, for handling the spill."); + public static final ConfigProperty DISK_MAP_BITCASK_COMPRESSION_ENABLED = ConfigProperty + .key("hoodie.diskmap.bitcask.compression.enabled") + .defaultValue(true) + .withDocumentation("Turn on compression for BITCASK disk map used by the External Spillable Map"); + public static final ConfigProperty CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP = ConfigProperty .key("hoodie.client.heartbeat.interval_in_ms") .defaultValue(60 * 1000) @@ -582,6 +587,10 @@ public class HoodieWriteConfig extends HoodieConfig { return ExternalSpillableMap.DiskMapType.valueOf(getString(SPILLABLE_DISK_MAP_TYPE).toUpperCase(Locale.ROOT)); } + public boolean isBitCaskDiskMapCompressionEnabled() { + return getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED); + } + public EngineType getEngineType() { return engineType; } @@ -1541,6 +1550,11 @@ public class HoodieWriteConfig extends HoodieConfig { return this; } + public Builder withBitcaskDiskMapCompressionEnabled(boolean bitcaskDiskMapCompressionEnabled) { + writeConfig.setValue(DISK_MAP_BITCASK_COMPRESSION_ENABLED, String.valueOf(bitcaskDiskMapCompressionEnabled)); + return this; + } + public Builder withHeartbeatIntervalInMs(Integer heartbeatIntervalInMs) { writeConfig.setValue(CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP, String.valueOf(heartbeatIntervalInMs)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index a4dc8d44e..e8f1b5142 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -200,7 +200,7 @@ public class HoodieMergeHandle extends H LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge); this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(tableSchema), - config.getSpillableDiskMapType()); + config.getSpillableDiskMapType(), config.isBitCaskDiskMapCompressionEnabled()); } catch (IOException io) { throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java index b525289c9..d9e9701f4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java @@ -29,9 +29,12 @@ import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.RandomAccessFile; import java.io.Serializable; import java.net.InetAddress; @@ -47,6 +50,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.InflaterInputStream; /** * This class provides a disk spillable only map implementation. All of the data is currenly written to one file, @@ -59,27 +65,33 @@ public final class BitCaskDiskMap DISK_COMPRESSION_REF = + ThreadLocal.withInitial(CompressionHandler::new); // Stores the key and corresponding value's latest metadata spilled to disk private final Map valueMetadataMap; + // Enables compression for all values stored in the disk map + private final boolean isCompressionEnabled; // Write only file - private File writeOnlyFile; + private final File writeOnlyFile; // Write only OutputStream to be able to ONLY append to the file - private SizeAwareDataOutputStream writeOnlyFileHandle; + private final SizeAwareDataOutputStream writeOnlyFileHandle; // FileOutputStream for the file handle to be able to force fsync // since FileOutputStream's flush() does not force flush to disk - private FileOutputStream fileOutputStream; + private final FileOutputStream fileOutputStream; // Current position in the file - private AtomicLong filePosition; + private final AtomicLong filePosition; // FilePath to store the spilled data - private String filePath; + private final String filePath; // Thread-safe random access file - private ThreadLocal randomAccessFile = new ThreadLocal<>(); - private Queue openedAccessFiles = new ConcurrentLinkedQueue<>(); + private final ThreadLocal randomAccessFile = new ThreadLocal<>(); + private final Queue openedAccessFiles = new ConcurrentLinkedQueue<>(); private transient Thread shutdownThread = null; - public BitCaskDiskMap(String baseFilePath) throws IOException { + public BitCaskDiskMap(String baseFilePath, boolean isCompressionEnabled) throws IOException { this.valueMetadataMap = new ConcurrentHashMap<>(); + this.isCompressionEnabled = isCompressionEnabled; this.writeOnlyFile = new File(baseFilePath, UUID.randomUUID().toString()); this.filePath = writeOnlyFile.getPath(); initFile(writeOnlyFile); @@ -88,6 +100,10 @@ public final class BitCaskDiskMap iterator() { - return new LazyFileIterable(filePath, valueMetadataMap).iterator(); + return new LazyFileIterable(filePath, valueMetadataMap, isCompressionEnabled).iterator(); } /** @@ -188,13 +204,16 @@ public final class BitCaskDiskMap R get(ValueMetadata entry, RandomAccessFile file) { + public static R get(ValueMetadata entry, RandomAccessFile file, boolean isCompressionEnabled) { try { - return SerializationUtils - .deserialize(SpillableMapUtils.readBytesFromDisk(file, entry.getOffsetOfValue(), entry.getSizeOfValue())); + byte[] bytesFromDisk = SpillableMapUtils.readBytesFromDisk(file, entry.getOffsetOfValue(), entry.getSizeOfValue()); + if (isCompressionEnabled) { + return SerializationUtils.deserialize(DISK_COMPRESSION_REF.get().decompressBytes(bytesFromDisk)); + } + return SerializationUtils.deserialize(bytesFromDisk); } catch (IOException e) { throw new HoodieIOException("Unable to readFromDisk Hoodie Record from disk", e); } @@ -202,7 +221,8 @@ public final class BitCaskDiskMap valueStream() { final BufferedRandomAccessFile file = getRandomAccessFile(); - return valueMetadataMap.values().stream().sorted().sequential().map(valueMetaData -> (R) get(valueMetaData, file)); + return valueMetadataMap.values().stream().sorted().sequential().map(valueMetaData -> (R) get(valueMetaData, file, isCompressionEnabled)); } @Override @@ -399,4 +419,47 @@ public final class BitCaskDiskMap 0) { + decompressBaos.write(decompressIntermediateBuffer, 0, len); + } + return decompressBaos.toByteArray(); + } catch (IOException e) { + throw new HoodieIOException("IOException while decompressing bytes", e); + } + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java index dd5b9c240..8044f8410 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java @@ -72,6 +72,8 @@ public class ExternalSpillableMap valueSizeEstimator; // Type of the disk map private final DiskMapType diskMapType; + // Enables compression of values stored in disc + private final boolean isCompressionEnabled; // current space occupied by this map in-memory private Long currentInMemoryMapSize; // An estimate of the size of each payload written to this map @@ -88,6 +90,11 @@ public class ExternalSpillableMap keySizeEstimator, SizeEstimator valueSizeEstimator, DiskMapType diskMapType) throws IOException { + this(maxInMemorySizeInBytes, baseFilePath, keySizeEstimator, valueSizeEstimator, diskMapType, false); + } + + public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, SizeEstimator keySizeEstimator, + SizeEstimator valueSizeEstimator, DiskMapType diskMapType, boolean isCompressionEnabled) throws IOException { this.inMemoryMap = new HashMap<>(); this.baseFilePath = baseFilePath; this.maxInMemorySizeInBytes = (long) Math.floor(maxInMemorySizeInBytes * sizingFactorForInMemoryMap); @@ -95,6 +102,7 @@ public class ExternalSpillableMap getDiskBasedMap() { @@ -108,7 +116,7 @@ public class ExternalSpillableMap(baseFilePath); + diskBasedMap = new BitCaskDiskMap<>(baseFilePath, isCompressionEnabled); } } catch (IOException e) { throw new HoodieIOException(e.getMessage(), e); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java index 024f55542..33d07d5bc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java @@ -37,12 +37,19 @@ public class LazyFileIterable implements Iterable { private final String filePath; // Stores the key and corresponding value's latest metadata spilled to disk private final Map inMemoryMetadataOfSpilledData; + // Was compressions enabled for the values when inserted into the file/ map + private final boolean isCompressionEnabled; private transient Thread shutdownThread = null; public LazyFileIterable(String filePath, Map map) { + this(filePath, map, false); + } + + public LazyFileIterable(String filePath, Map map, boolean isCompressionEnabled) { this.filePath = filePath; this.inMemoryMetadataOfSpilledData = map; + this.isCompressionEnabled = isCompressionEnabled; } @Override @@ -91,7 +98,7 @@ public class LazyFileIterable implements Iterable { throw new IllegalStateException("next() called on EOF'ed stream. File :" + filePath); } Map.Entry entry = this.metadataIterator.next(); - return BitCaskDiskMap.get(entry.getValue(), readOnlyFileHandle); + return BitCaskDiskMap.get(entry.getValue(), readOnlyFileHandle, isCompressionEnabled); } @Override diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java index 45aaff377..dd3d3fcd7 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java @@ -38,6 +38,8 @@ import org.apache.avro.generic.IndexedRecord; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.io.UncheckedIOException; @@ -66,27 +68,33 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness { initPath(); } - @Test - public void testSimpleInsert() throws IOException, URISyntaxException { - BitCaskDiskMap records = new BitCaskDiskMap<>(basePath); + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testSimpleInsert(boolean isCompressionEnabled) throws IOException, URISyntaxException { + BitCaskDiskMap records = new BitCaskDiskMap<>(basePath, isCompressionEnabled); List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); - ((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); + Map originalRecords = iRecords.stream() + .collect(Collectors.toMap(k -> ((GenericRecord) k).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), v -> v)); + // make sure records have spilled to disk assertTrue(records.sizeOfFileOnDiskInBytes() > 0); Iterator> itr = records.iterator(); - List oRecords = new ArrayList<>(); while (itr.hasNext()) { HoodieRecord rec = itr.next(); - oRecords.add(rec); assert recordKeys.contains(rec.getRecordKey()); + IndexedRecord originalRecord = originalRecords.get(rec.getRecordKey()); + HoodieAvroPayload payload = (HoodieAvroPayload) rec.getData(); + Option value = payload.getInsertValue(HoodieAvroUtils.addMetadataFields(getSimpleSchema())); + assertEquals(originalRecord, value.get()); } } - @Test - public void testSimpleInsertWithoutHoodieMetadata() throws IOException, URISyntaxException { - BitCaskDiskMap records = new BitCaskDiskMap<>(basePath); + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testSimpleInsertWithoutHoodieMetadata(boolean isCompressionEnabled) throws IOException, URISyntaxException { + BitCaskDiskMap records = new BitCaskDiskMap<>(basePath, isCompressionEnabled); List hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000); Set recordKeys = new HashSet<>(); // insert generated records into the map @@ -105,11 +113,12 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness { } } - @Test - public void testSimpleUpsert() throws IOException, URISyntaxException { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testSimpleUpsert(boolean isCompressionEnabled) throws IOException, URISyntaxException { Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); - BitCaskDiskMap records = new BitCaskDiskMap<>(basePath); + BitCaskDiskMap records = new BitCaskDiskMap<>(basePath, isCompressionEnabled); List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); // perform some inserts @@ -187,9 +196,10 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness { assertTrue(payloadSize > 0); } - @Test - public void testPutAll() throws IOException, URISyntaxException { - BitCaskDiskMap records = new BitCaskDiskMap<>(basePath); + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testPutAll(boolean isCompressionEnabled) throws IOException, URISyntaxException { + BitCaskDiskMap records = new BitCaskDiskMap<>(basePath, isCompressionEnabled); List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); Map recordMap = new HashMap<>(); iRecords.forEach(r -> { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java index c82d99932..05131516a 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java @@ -39,7 +39,8 @@ import org.junit.jupiter.api.MethodOrderer.Alphanumeric; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; import java.io.UncheckedIOException; @@ -48,11 +49,13 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.params.provider.Arguments.arguments; /** * Tests external spillable map {@link ExternalSpillableMap}. @@ -69,13 +72,13 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness { } @ParameterizedTest - @EnumSource(ExternalSpillableMap.DiskMapType.class) - public void simpleInsertTest(ExternalSpillableMap.DiskMapType diskMapType) throws IOException, URISyntaxException { + @MethodSource("testArguments") + public void simpleInsertTest(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws IOException, URISyntaxException { Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); ExternalSpillableMap> records = new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), - new HoodieRecordSizeEstimator(schema), diskMapType); // 16B + new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled); // 16B List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); @@ -102,14 +105,13 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness { } @ParameterizedTest - @EnumSource(ExternalSpillableMap.DiskMapType.class) - public void testSimpleUpsert(ExternalSpillableMap.DiskMapType diskMapType) throws IOException, URISyntaxException { - + @MethodSource("testArguments") + public void testSimpleUpsert(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws IOException, URISyntaxException { Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); ExternalSpillableMap> records = new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), - new HoodieRecordSizeEstimator(schema), diskMapType); // 16B + new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled); // 16B List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); @@ -140,15 +142,15 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness { } @ParameterizedTest - @EnumSource(ExternalSpillableMap.DiskMapType.class) - public void testAllMapOperations(ExternalSpillableMap.DiskMapType diskMapType) throws IOException, URISyntaxException { + @MethodSource("testArguments") + public void testAllMapOperations(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws IOException, URISyntaxException { Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); String payloadClazz = HoodieAvroPayload.class.getName(); ExternalSpillableMap> records = new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), - new HoodieRecordSizeEstimator(schema), diskMapType); // 16B + new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled); // 16B List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); // insert a bunch of records so that values spill to disk too @@ -198,13 +200,13 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness { } @ParameterizedTest - @EnumSource(ExternalSpillableMap.DiskMapType.class) - public void simpleTestWithException(ExternalSpillableMap.DiskMapType diskMapType) throws IOException, URISyntaxException { + @MethodSource("testArguments") + public void simpleTestWithException(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws IOException, URISyntaxException { Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); ExternalSpillableMap> records = new ExternalSpillableMap<>(16L, failureOutputPath, new DefaultSizeEstimator(), - new HoodieRecordSizeEstimator(schema), diskMapType); // 16B + new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled); // 16B List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); @@ -218,14 +220,14 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness { } @ParameterizedTest - @EnumSource(ExternalSpillableMap.DiskMapType.class) - public void testDataCorrectnessWithUpsertsToDataInMapAndOnDisk(ExternalSpillableMap.DiskMapType diskMapType) throws IOException, URISyntaxException { + @MethodSource("testArguments") + public void testDataCorrectnessWithUpsertsToDataInMapAndOnDisk(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws IOException, URISyntaxException { Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); ExternalSpillableMap> records = new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), - new HoodieRecordSizeEstimator(schema), diskMapType); // 16B + new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled); // 16B List recordKeys = new ArrayList<>(); // Ensure we spill to disk @@ -271,14 +273,14 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness { } @ParameterizedTest - @EnumSource(ExternalSpillableMap.DiskMapType.class) - public void testDataCorrectnessWithoutHoodieMetadata(ExternalSpillableMap.DiskMapType diskMapType) throws IOException, URISyntaxException { + @MethodSource("testArguments") + public void testDataCorrectnessWithoutHoodieMetadata(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws IOException, URISyntaxException { Schema schema = SchemaTestUtil.getSimpleSchema(); ExternalSpillableMap> records = new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), - new HoodieRecordSizeEstimator(schema), diskMapType); // 16B + new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled); // 16B List recordKeys = new ArrayList<>(); // Ensure we spill to disk @@ -338,4 +340,13 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness { // TODO : come up with a performance eval test for spillableMap @Test public void testLargeInsertUpsert() {} -} \ No newline at end of file + + private static Stream testArguments() { + return Stream.of( + arguments(ExternalSpillableMap.DiskMapType.BITCASK, false), + arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false), + arguments(ExternalSpillableMap.DiskMapType.UNKNOWN, false), + arguments(ExternalSpillableMap.DiskMapType.BITCASK, true) + ); + } +}