From 6f540f429b9124c89d59b71cc6dea2c36c48ac13 Mon Sep 17 00:00:00 2001 From: Neelesh Salian Date: Tue, 14 Apr 2026 21:40:14 -0700 Subject: [PATCH] Fix thread-safety in Variant lazy caches and add comments Co-authored-by: Steve Loughran --- .../org/apache/parquet/variant/Variant.java | 188 +++++++++++------- .../parquet/variant/VariantBuilder.java | 20 +- .../parquet/variant/VariantConverters.java | 4 +- 3 files changed, 140 insertions(+), 72 deletions(-) diff --git a/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java b/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java index 0ccb8359ba..bcdf9711a1 100644 --- a/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java +++ b/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java @@ -36,6 +36,26 @@ public final class Variant { */ final ByteBuffer metadata; + /** + * Pre-computed metadata dictionary size + */ + private final int dictSize; + + /** + * Lazy cache for metadata dictionary strings. + */ + private volatile String[] metadataCache; + + /** + * Lazy cache for the parsed object header. + */ + private volatile VariantUtil.ObjectInfo cachedObjectInfo; + + /** + * Lazy cache for the parsed array header. + */ + private volatile VariantUtil.ArrayInfo cachedArrayInfo; + /** * The threshold to switch from linear search to binary search when looking up a field by key in * an object. This is a performance optimization to avoid the overhead of binary search for a @@ -67,6 +87,26 @@ public Variant(ByteBuffer value, ByteBuffer metadata) { "Unsupported variant metadata version: %d", metadata.get(metadata.position()) & VariantUtil.VERSION_MASK)); } + + // Pre-compute dictionary size for lazy metadata cache allocation. + int pos = this.metadata.position(); + int metaOffsetSize = ((this.metadata.get(pos) >> 6) & 0x3) + 1; + if (this.metadata.remaining() > 1) { + this.dictSize = VariantUtil.readUnsigned(this.metadata, pos + 1, metaOffsetSize); + } else { + this.dictSize = 0; + } + this.metadataCache = null; + } + + /** + * Package-private constructor that shares pre-parsed metadata state from a parent Variant. + */ + Variant(ByteBuffer value, ByteBuffer metadata, String[] metadataCache, int dictSize) { + this.value = value.asReadOnlyBuffer(); + this.metadata = metadata.asReadOnlyBuffer(); + this.metadataCache = metadataCache; + this.dictSize = dictSize; } public ByteBuffer getValueBuffer() { @@ -194,7 +234,7 @@ public Type getType() { * @throws IllegalArgumentException if `getType()` does not return `Type.OBJECT` */ public int numObjectElements() { - return VariantUtil.getObjectInfo(value).numElements; + return objectInfo().numElements; } /** @@ -206,22 +246,18 @@ public int numObjectElements() { * @throws IllegalArgumentException if `getType()` does not return `Type.OBJECT` */ public Variant getFieldByKey(String key) { - VariantUtil.ObjectInfo info = VariantUtil.getObjectInfo(value); - // Use linear search for a short list. Switch to binary search when the length reaches - // `BINARY_SEARCH_THRESHOLD`. + VariantUtil.ObjectInfo info = objectInfo(); + int idStart = value.position() + info.idStartOffset; + int offsetStart = value.position() + info.offsetStartOffset; + int dataStart = value.position() + info.dataStartOffset; + if (info.numElements < BINARY_SEARCH_THRESHOLD) { for (int i = 0; i < info.numElements; ++i) { - ObjectField field = getFieldAtIndex( - i, - value, - metadata, - info.idSize, - info.offsetSize, - value.position() + info.idStartOffset, - value.position() + info.offsetStartOffset, - value.position() + info.dataStartOffset); - if (field.key.equals(key)) { - return field.value; + int id = VariantUtil.readUnsigned(value, idStart + info.idSize * i, info.idSize); + String fieldKey = getMetadataKeyCached(id); + if (fieldKey.equals(key)) { + int offset = VariantUtil.readUnsigned(value, offsetStart + info.offsetSize * i, info.offsetSize); + return childVariant(VariantUtil.slice(value, dataStart + offset)); } } } else { @@ -232,22 +268,16 @@ public Variant getFieldByKey(String key) { // performance optimization, because it can properly handle the case where `low + high` // overflows int. int mid = (low + high) >>> 1; - ObjectField field = getFieldAtIndex( - mid, - value, - metadata, - info.idSize, - info.offsetSize, - value.position() + info.idStartOffset, - value.position() + info.offsetStartOffset, - value.position() + info.dataStartOffset); - int cmp = field.key.compareTo(key); + int midId = VariantUtil.readUnsigned(value, idStart + info.idSize * mid, info.idSize); + String midKey = getMetadataKeyCached(midId); + int cmp = midKey.compareTo(key); if (cmp < 0) { low = mid + 1; } else if (cmp > 0) { high = mid - 1; } else { - return field.value; + int offset = VariantUtil.readUnsigned(value, offsetStart + info.offsetSize * mid, info.offsetSize); + return childVariant(VariantUtil.slice(value, dataStart + offset)); } } } @@ -275,35 +305,14 @@ public ObjectField(String key, Variant value) { * @throws IllegalArgumentException if `getType()` does not return `Type.OBJECT` */ public ObjectField getFieldAtIndex(int idx) { - VariantUtil.ObjectInfo info = VariantUtil.getObjectInfo(value); - // Use linear search for a short list. Switch to binary search when the length reaches - // `BINARY_SEARCH_THRESHOLD`. - ObjectField field = getFieldAtIndex( - idx, - value, - metadata, - info.idSize, - info.offsetSize, - value.position() + info.idStartOffset, - value.position() + info.offsetStartOffset, - value.position() + info.dataStartOffset); - return field; - } - - static ObjectField getFieldAtIndex( - int index, - ByteBuffer value, - ByteBuffer metadata, - int idSize, - int offsetSize, - int idStart, - int offsetStart, - int dataStart) { - // idStart, offsetStart, and dataStart are absolute positions in the `value` buffer. - int id = VariantUtil.readUnsigned(value, idStart + idSize * index, idSize); - int offset = VariantUtil.readUnsigned(value, offsetStart + offsetSize * index, offsetSize); - String key = VariantUtil.getMetadataKey(metadata, id); - Variant v = new Variant(VariantUtil.slice(value, dataStart + offset), metadata); + VariantUtil.ObjectInfo info = objectInfo(); + int idStart = value.position() + info.idStartOffset; + int offsetStart = value.position() + info.offsetStartOffset; + int dataStart = value.position() + info.dataStartOffset; + int id = VariantUtil.readUnsigned(value, idStart + info.idSize * idx, info.idSize); + int offset = VariantUtil.readUnsigned(value, offsetStart + info.offsetSize * idx, info.offsetSize); + String key = getMetadataKeyCached(id); + Variant v = childVariant(VariantUtil.slice(value, dataStart + offset)); return new ObjectField(key, v); } @@ -312,7 +321,7 @@ static ObjectField getFieldAtIndex( * @throws IllegalArgumentException if `getType()` does not return `Type.ARRAY` */ public int numArrayElements() { - return VariantUtil.getArrayInfo(value).numElements; + return arrayInfo().numElements; } /** @@ -324,23 +333,64 @@ public int numArrayElements() { * @throws IllegalArgumentException if `getType()` does not return `Type.ARRAY` */ public Variant getElementAtIndex(int index) { - VariantUtil.ArrayInfo info = VariantUtil.getArrayInfo(value); + VariantUtil.ArrayInfo info = arrayInfo(); if (index < 0 || index >= info.numElements) { return null; } - return getElementAtIndex( - index, - value, - metadata, - info.offsetSize, - value.position() + info.offsetStartOffset, - value.position() + info.dataStartOffset); + int offsetStart = value.position() + info.offsetStartOffset; + int dataStart = value.position() + info.dataStartOffset; + int offset = VariantUtil.readUnsigned(value, offsetStart + info.offsetSize * index, info.offsetSize); + return childVariant(VariantUtil.slice(value, dataStart + offset)); + } + + /** + * Creates a child Variant that shares this instance's metadata cache. + */ + private Variant childVariant(ByteBuffer childValue) { + return new Variant(childValue, metadata, metadataCache, dictSize); } - private static Variant getElementAtIndex( - int index, ByteBuffer value, ByteBuffer metadata, int offsetSize, int offsetStart, int dataStart) { - // offsetStart and dataStart are absolute positions in the `value` buffer. - int offset = VariantUtil.readUnsigned(value, offsetStart + offsetSize * index, offsetSize); - return new Variant(VariantUtil.slice(value, dataStart + offset), metadata); + /** + * Returns the metadata dictionary string for the given ID, caching the result. + */ + String getMetadataKeyCached(int id) { + // Fall back to uncached lookup for out-of-range IDs + if (id < 0 || id >= dictSize) { + return VariantUtil.getMetadataKey(metadata, id); + } + // Demand-create shared dictionary cache + String[] cache = metadataCache; + if (cache == null) { + cache = new String[dictSize]; + metadataCache = cache; + } + if (cache[id] == null) { + cache[id] = VariantUtil.getMetadataKey(metadata, id); + } + return cache[id]; + } + + /** + * Returns the cached object header, parsing it on first access. + */ + private VariantUtil.ObjectInfo objectInfo() { + VariantUtil.ObjectInfo info = cachedObjectInfo; + if (info == null) { + info = VariantUtil.getObjectInfo(value); + cachedObjectInfo = info; + } + return info; + } + + /** + * Returns the cached array header, parsing it on first access. + */ + private VariantUtil.ArrayInfo arrayInfo() { + VariantUtil.ArrayInfo info = cachedArrayInfo; + if (info == null) { + info = VariantUtil.getArrayInfo(value); + cachedArrayInfo = info; + } + return info; } } diff --git a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java index 4b8eb6d8c3..213f69729f 100644 --- a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java +++ b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Set; +import org.apache.parquet.io.api.Binary; /** * Builder for creating Variant value and metadata. @@ -109,7 +110,14 @@ public void appendEncodedValue(ByteBuffer value) { */ public void appendString(String str) { onAppend(); - byte[] data = str.getBytes(StandardCharsets.UTF_8); + writeUTF8bytes(str.getBytes(StandardCharsets.UTF_8)); + } + + /** + * Write bytes as a UTF8 string. + * @param data data to write; this is not modified. + */ + private void writeUTF8bytes(final byte[] data) { boolean longStr = data.length > VariantUtil.MAX_SHORT_STR_SIZE; checkCapacity((longStr ? 1 + VariantUtil.U32_SIZE : 1) + data.length); if (longStr) { @@ -125,6 +133,16 @@ public void appendString(String str) { writePos += data.length; } + /** + * Given a Binary, append it to the variant as a string. + * Avoids intermediate String creation when unmarshalling from shredded string columns. + * @param binary source data. + */ + void appendAsString(Binary binary) { + onAppend(); + writeUTF8bytes(binary.getBytesUnsafe()); + } + /** * Appends a null value to the Variant builder. */ diff --git a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantConverters.java b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantConverters.java index 6d0986c2b2..bda088c55d 100644 --- a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantConverters.java +++ b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantConverters.java @@ -233,6 +233,7 @@ static class PartiallyShreddedFieldsConverter extends GroupConverter { PartiallyShreddedFieldsConverter(GroupType fieldsType, ParentConverter parent) { this.converters = new Converter[fieldsType.getFieldCount()]; this.parent = parent; + ParentConverter newParent = converter -> converter.accept(objectBuilder); for (int index = 0; index < fieldsType.getFieldCount(); index += 1) { Type field = fieldsType.getType(index); @@ -240,7 +241,6 @@ static class PartiallyShreddedFieldsConverter extends GroupConverter { String name = field.getName(); shreddedFieldNames.add(name); - ParentConverter newParent = converter -> converter.accept(objectBuilder); converters[index] = new FieldValueConverter(name, field.asGroupType(), newParent); } } @@ -501,7 +501,7 @@ static class VariantStringConverter extends ShreddedScalarConverter { @Override public void addBinary(Binary value) { - parent.build(builder -> builder.appendString(value.toStringUsingUTF8())); + parent.build(builder -> builder.appendAsString(value)); } }