fix: cache object stores and bucket regions to reduce DNS query volume #3802
andygrove wants to merge 5 commits into apache:main
Conversation
When reading Parquet files from S3, each call to `initRecordBatchReader` creates a new `SessionContext`, `RuntimeEnv`, and S3 `ObjectStore` client. Each `ObjectStore` instance creates a new `reqwest` HTTP client with its own connection pool, requiring fresh DNS resolution for every file opened. In TPC-DS-scale workloads with thousands of files, this generates excessive DNS queries that can overwhelm resolvers (e.g., Route53 Resolver limits in EKS environments), causing `UnknownHostException` errors and intermittent S3 connectivity failures.

This commit introduces two caches:

1. A global `ObjectStore` cache keyed by `(URL prefix, config hash)` in `prepare_object_store_with_configs()`. Subsequent reads from the same bucket with the same configuration reuse the existing `ObjectStore`, enabling HTTP connection pooling and eliminating redundant DNS lookups.
2. A region resolution cache in `resolve_bucket_region()`. When no region is explicitly configured, each file read triggers a `HeadBucket` API call (with its own `reqwest` client) to determine the bucket region. The cache ensures this call happens only once per bucket.
Seems similar to the challenges I'm seeing in iceberg-rust: apache/iceberg-rust#2177, apache/iceberg-rust#2172.
`Path::parse` re-encodes percent-encoded characters, causing double-encoding (e.g. `%27` → `%2527`) when reusing cached object stores. Use `Path::from_url_path`, which correctly handles already-encoded URL paths, consistent with other call sites in the codebase.
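The double-encoding failure mode can be illustrated with a toy percent-encoder. This is a minimal sketch of the general mechanism, not the `object_store` crate's implementation; `naive_encode` is a hypothetical helper:

```rust
/// Toy percent-encoder: encodes '%' and '\'' the way a naive path parser
/// would if it treated an already-encoded string as raw input.
/// (Illustrative only; not the object_store crate's implementation.)
fn naive_encode(raw: &str) -> String {
    raw.chars()
        .map(|c| match c {
            '%' => "%25".to_string(),
            '\'' => "%27".to_string(),
            other => other.to_string(),
        })
        .collect()
}

fn main() {
    // First encode: the apostrophe becomes %27 -- correct.
    let once = naive_encode("it's.parquet");
    assert_eq!(once, "it%27s.parquet");

    // Re-encoding the already-encoded path mangles it: %27 -> %2527,
    // because the '%' of the existing escape is itself escaped.
    let twice = naive_encode(&once);
    assert_eq!(twice, "it%2527s.parquet");
}
```

A URL-path-aware parser must instead recognize existing `%XX` escapes and leave them intact, which is why the fix switches to `Path::from_url_path`.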
How would we verify that the number of DNS requests has gone down?
Thanks for investigating this @andygrove! I am afraid of global singletons. Please justify that life cycle and add documentation why it's not an issue, particularly in expired credentials scenarios.
```rust
/// excessive DNS queries that can overwhelm DNS resolvers (e.g., Route53 Resolver limits
/// in EKS environments).
fn object_store_cache() -> &'static ObjectStoreCache {
    static CACHE: OnceLock<ObjectStoreCache> = OnceLock::new();
    // ...
```
Anything static (global singleton) should be documented why static is the reasonable life cycle. In particular, I wonder about the unbounded size of a static cache, invalidation scenarios (what if a job runs long enough and needs new credentials passed into the object_store?), and why there was no other location with a reasonable life cycle to own this cache.
Added docs, but I have not yet confirmed that the part about credential provider interaction is actually correct, so moved to draft for now.
…ches

Address review comment asking for justification of global singleton caches. Add comments explaining:

- Why process lifetime is the right scope (JNI creates a new `RuntimeEnv` per file, leaving no executor-scoped Rust object to own the cache)
- Why unbounded size is acceptable (bounded by distinct bucket+config combinations, which is small in practice)
- Credential invalidation behaviour (dynamic providers refresh transparently; static-credential config changes produce a new hash and a new cache entry)
- Why the region cache needs no invalidation (S3 bucket regions are immutable after bucket creation)
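The lazily initialized process-wide cache pattern under discussion can be sketched with std only. `FakeStore`, `store_cache`, and `get_or_create` are illustrative stand-ins, not Comet's real types; the actual cache holds `Arc<dyn ObjectStore>` values:

```rust
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};

// Hypothetical stand-in for the real ObjectStore client.
type FakeStore = String;

/// Process-wide cache, lazily initialized on first use.
/// Keyed by (URL prefix, config hash), mirroring the PR's design.
fn store_cache() -> &'static Mutex<HashMap<(String, u64), FakeStore>> {
    static CACHE: OnceLock<Mutex<HashMap<(String, u64), FakeStore>>> = OnceLock::new();
    CACHE.get_or_init(|| Mutex::new(HashMap::new()))
}

/// Return the cached store for this key, constructing it only on first access.
fn get_or_create(url_prefix: &str, config_hash: u64) -> FakeStore {
    let mut cache = store_cache().lock().unwrap();
    cache
        .entry((url_prefix.to_string(), config_hash))
        .or_insert_with(|| format!("store for {url_prefix}"))
        .clone()
}

fn main() {
    let a = get_or_create("s3://bucket-a", 42);
    let b = get_or_create("s3://bucket-a", 42); // cache hit: reuses the entry
    let _c = get_or_create("s3://bucket-a", 99); // changed config hash: new entry
    assert_eq!(a, b);
    assert_eq!(store_cache().lock().unwrap().len(), 2);
}
```

Because the key includes the config hash, a change to static credentials produces a new entry rather than reusing a stale client, which is the invalidation behaviour described in the commit message.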
I tested with TPC-H @ 1TB in AWS and this does appear to have resolved the issue.
In the standard Spark-on-Kubernetes deployment model each executor pod is dedicated to a single Spark application, so the static cache lifetime is effectively bounded by the application lifetime.
## Which issue does this PR close?
Part of #3799.
## Rationale for this change

When reading Parquet files from S3, each call to `initRecordBatchReader` creates a new `SessionContext`, `RuntimeEnv`, and S3 `ObjectStore` client. Each `ObjectStore` instance creates a new `reqwest` HTTP client with its own connection pool, requiring fresh DNS resolution for every file opened.

In TPC-DS-scale workloads with thousands of Parquet files, this generates excessive DNS queries that can overwhelm DNS resolvers (e.g., Route53 Resolver limits in EKS environments), causing `UnknownHostException` errors, intermittent S3 connectivity failures, and job failures under high concurrency.

Native Spark does not have this problem because Hadoop's `S3AFileSystem` maintains singleton instances per filesystem URI (via the `FileSystem.get()` cache) and benefits from JVM-level DNS caching. Comet's native Rust layer bypasses all of this.

Additionally, when no region is explicitly configured, each file read triggers a `HeadBucket` API call with a throwaway `reqwest::Client` to auto-detect the bucket region, doubling the DNS query volume.

## What changes are included in this PR?
Two caches are introduced in the native Rust layer:

1. **ObjectStore cache** (`parquet_support.rs`): A global cache keyed by `(URL prefix, config hash)` in `prepare_object_store_with_configs()`. Subsequent reads from the same bucket with the same configuration reuse the existing `ObjectStore` instance, enabling HTTP connection pooling via `reqwest` and eliminating redundant DNS lookups.
2. **Region resolution cache** (`s3.rs`): A per-bucket cache in `resolve_bucket_region()` that ensures the `HeadBucket` API call (used to auto-detect the bucket region when it is not explicitly configured) happens only once per bucket per JVM lifetime.

Together these changes reduce DNS query volume from O(num_files) to O(num_distinct_buckets) per executor.
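A key detail in the design above is that the cache key must be deterministic for an identical configuration. A minimal sketch, assuming a sorted map of config entries (`config_hash` and `cache_key` are hypothetical names, not Comet's actual functions):

```rust
use std::collections::BTreeMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash the object store configuration. A BTreeMap keeps iteration order
/// stable, so the same configuration always hashes to the same value
/// within a process (which is all an in-process cache key needs).
fn config_hash(configs: &BTreeMap<String, String>) -> u64 {
    let mut hasher = DefaultHasher::new();
    for (k, v) in configs {
        k.hash(&mut hasher);
        v.hash(&mut hasher);
    }
    hasher.finish()
}

/// Illustrative cache key: URL prefix plus config hash.
fn cache_key(url_prefix: &str, configs: &BTreeMap<String, String>) -> (String, u64) {
    (url_prefix.to_string(), config_hash(configs))
}

fn main() {
    let mut base = BTreeMap::new();
    base.insert("fs.s3a.endpoint.region".to_string(), "us-east-1".to_string());

    let mut changed = base.clone();
    changed.insert("fs.s3a.access.key".to_string(), "AKIA-example".to_string());

    // Same bucket + same config -> same key -> cache hit.
    assert_eq!(cache_key("s3://bucket", &base), cache_key("s3://bucket", &base));
    // Changed credentials -> different hash -> a fresh ObjectStore entry.
    assert_ne!(cache_key("s3://bucket", &base), cache_key("s3://bucket", &changed));
}
```

This is why, as noted in the review discussion, static-credential changes do not reuse a stale client: the new configuration hashes to a new key.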
**Workaround for users on current versions**: Set `fs.s3a.endpoint.region` explicitly to avoid the `HeadBucket` auto-detection entirely.

## How are these changes tested?
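For example, pinning the region in the Spark configuration (illustrative invocation; the region value and other flags depend on your deployment):

```shell
# Setting the bucket region up front skips the per-file HeadBucket probe
spark-submit \
  --conf spark.hadoop.fs.s3a.endpoint.region=us-east-1 \
  ...
```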
- Existing tests in `parquet_support.rs` continue to pass (they test local filesystem paths, which exercise the same `prepare_object_store_with_configs` code path).
- The cached stores are the same `ObjectStore` instances that would have been created without caching, so existing S3 integration tests cover correctness.