diff --git a/Cargo.toml b/Cargo.toml index 2d35ac750..cf758ef3b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,6 +104,10 @@ opentelemetry-proto = { git = "https://github.com/open-telemetry/opentelemetry-r "metrics", "trace", ] } +opentelemetry = "0.29" +opentelemetry_sdk = { version = "0.29", features = ["rt-tokio"] } +opentelemetry-otlp = { version = "0.29", features = ["http-json", "grpc-tonic"] } +tracing-opentelemetry = "0.30" prometheus = { version = "0.13.4", default-features = false, features = ["process"] } prometheus-parse = "0.2.5" tracing = "0.1" diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index c7641eaac..36669fbfd 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -43,7 +43,7 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Instant; use tokio::task::JoinSet; -use tracing::{error, warn}; +use tracing::{error, instrument, warn}; use crate::event::{DEFAULT_TIMESTAMP_KEY, commit_schema}; use crate::metrics::{QUERY_EXECUTE_TIME, increment_query_calls_by_date}; @@ -115,6 +115,7 @@ Ok((Some(records), Some(fields))) } +#[instrument(name = "POST /query", skip_all, fields(http.request.method = "POST", http.route = "/query"))] pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> { let mut session_state = QUERY_SESSION.get_ctx().state(); let time_range = @@ -179,6 +180,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> { +#[instrument(name = "handle_non_streaming_query", skip_all, fields(db.collection.name))] async fn handle_non_streaming_query( query: LogicalQuery, table_name: Vec<String>, @@ -238,6 +241,7 @@ async fn handle_non_streaming_query( tenant_id: &Option<String>, ) -> Result<HttpResponse, QueryError> { let first_table_name = table_name[0].clone(); + tracing::Span::current().record("db.collection.name", &first_table_name.as_str()); let (records, fields) = execute(query, query_request.streaming, tenant_id).await?; let records = match records { Either::Left(rbs) => rbs, @@ -283,6 +287,7 @@ async fn handle_non_streaming_query( /// /// # Returns /// - `HttpResponse` streaming the query results as NDJSON, optionally prefixed with 
the fields array. +#[instrument(name = "handle_streaming_query", skip_all, fields(db.collection.name))] async fn handle_streaming_query( query: LogicalQuery, table_name: Vec<String>, @@ -291,6 +296,7 @@ tenant_id: &Option<String>, ) -> Result<HttpResponse, QueryError> { let first_table_name = table_name[0].clone(); + tracing::Span::current().record("db.collection.name", &first_table_name.as_str()); let (records_stream, fields) = execute(query, query_request.streaming, tenant_id).await?; let records_stream = match records_stream { Either::Left(_) => { @@ -516,6 +522,7 @@ pub async fn update_schema_when_distributed( /// Create streams for querier if they do not exist /// get list of streams from memory and storage /// create streams for memory from storage if they do not exist +#[instrument(name = "create_streams_for_distributed", skip_all)] pub async fn create_streams_for_distributed( streams: Vec<String>, tenant_id: &Option<String>, @@ -526,17 +533,21 @@ let mut join_set = JoinSet::new(); for stream_name in streams { let id = tenant_id.to_owned(); - join_set.spawn(async move { - let result = PARSEABLE - .create_stream_and_schema_from_storage(&stream_name, &id) - .await; - - if let Err(e) = &result { - warn!("Failed to create stream '{}': {}", stream_name, e); - } - - (stream_name, result) - }); + let span = tracing::Span::current(); + join_set.spawn(tracing::Instrument::instrument( + async move { + let result = PARSEABLE + .create_stream_and_schema_from_storage(&stream_name, &id) + .await; + + if let Err(e) = &result { + warn!("Failed to create stream '{}': {}", stream_name, e); + } + + (stream_name, result) + }, + span, + )); } while let Some(result) = join_set.join_next().await { @@ -579,6 +590,7 @@ impl FromRequest for Query { } } +#[instrument(name = "into_query", skip_all, fields(db.query.text = %query.query))] pub async fn into_query( query: &Query, session_state: &SessionState, diff --git a/src/lib.rs b/src/lib.rs index 
5c3704d5c..0f24264a9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -49,6 +49,7 @@ mod static_schema; mod stats; pub mod storage; pub mod sync; +pub mod telemetry; pub mod tenants; pub mod users; pub mod utils; diff --git a/src/main.rs b/src/main.rs index 42cba34f5..1f2f95508 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,9 +19,10 @@ use std::process::exit; */ #[cfg(feature = "kafka")] use parseable::connectors; +use opentelemetry_sdk::trace::SdkTracerProvider; use parseable::{ IngestServer, ParseableServer, QueryServer, Server, banner, metrics, option::Mode, - parseable::PARSEABLE, rbac, storage, + parseable::PARSEABLE, rbac, storage, telemetry, }; use tokio::signal::ctrl_c; use tokio::sync::oneshot; @@ -33,7 +34,7 @@ use tracing_subscriber::{EnvFilter, Registry, fmt}; #[actix_web::main] async fn main() -> anyhow::Result<()> { - init_logger(); + let otel_provider = init_logger(); // Install the rustls crypto provider before any TLS operations. // This is required for rustls 0.23+ which needs an explicit crypto provider. // If the installation fails, log a warning but continue execution. 
@@ -95,10 +96,17 @@ async fn main() -> anyhow::Result<()> { parseable_server.await?; } + // Flush any buffered OTel spans before exit + if let Some(provider) = otel_provider { + if let Err(e) = provider.shutdown() { + warn!("Failed to shut down OTel tracer provider: {:?}", e); + } + } + Ok(()) } -pub fn init_logger() { +pub fn init_logger() -> Option<SdkTracerProvider> { let filter_layer = EnvFilter::try_from_default_env().unwrap_or_else(|_| { let default_level = if cfg!(debug_assertions) { Level::DEBUG @@ -116,10 +124,23 @@ pub fn init_logger() { .with_target(true) .compact(); - Registry::default() - .with(filter_layer) - .with(fmt_layer) - .init(); + let otel_provider = telemetry::init_otel_tracer(); + + if let Some(ref provider) = otel_provider { + let otel_layer = telemetry::build_otel_layer(provider); + Registry::default() + .with(filter_layer) + .with(fmt_layer) + .with(otel_layer) + .init(); + } else { + Registry::default() + .with(filter_layer) + .with(fmt_layer) + .init(); + } + + otel_provider } #[cfg(windows)] diff --git a/src/query/mod.rs b/src/query/mod.rs index e88ef802f..b7398bcd6 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -55,6 +55,7 @@ use std::sync::{Arc, RwLock}; use std::task::{Context, Poll}; use sysinfo::System; use tokio::runtime::Runtime; +use tracing::instrument; use self::error::ExecuteError; use self::stream_schema_provider::GlobalSchemaProvider; @@ -133,6 +134,7 @@ impl InMemorySessionContext { /// This function executes a query on the dedicated runtime, ensuring that the query is not isolated to a single thread/CPU /// at a time and has access to the entire thread pool, enabling better concurrent processing, and thus quicker results. +#[instrument(name = "query.execute", skip_all, fields(query.streaming = %is_streaming))] pub async fn execute( query: Query, is_streaming: bool, @@ -165,8 +167,33 @@ ExecuteError, > { let id = tenant_id.clone(); + + // QUERY_RUNTIME is a separate Runtime::new() (different OS thread pool). 
+ // tracing::Span does NOT propagate OTel context across OS threads. + // Use W3C TraceContext propagation to preserve the trace ID. + let mut carrier = std::collections::HashMap::new(); + { + use tracing_opentelemetry::OpenTelemetrySpanExt; + let cx = tracing::Span::current().context(); + opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut carrier); + }); + } + QUERY_RUNTIME - .spawn(async move { query.execute(is_streaming, &id).await }) + .spawn(async move { + // Extract the propagated context on the QUERY_RUNTIME thread + let parent_cx = opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.extract(&carrier) + }); + let span = tracing::info_span!("query.execute_runtime"); + { + use tracing_opentelemetry::OpenTelemetrySpanExt; + span.set_parent(parent_cx); + } + let _guard = span.enter(); + query.execute(is_streaming, &id).await + }) .await .expect("The Join should have been successful") } @@ -272,6 +299,7 @@ impl Query { /// this function returns the result of the query /// if streaming is true, it returns a stream /// if streaming is false, it returns a vector of record batches + #[instrument(name = "Query.execute_datafusion", skip_all, fields(query.streaming = %is_streaming))] pub async fn execute( &self, is_streaming: bool, @@ -526,6 +554,7 @@ impl CountsRequest { /// This function is supposed to read maninfest files for the given stream, /// get the sum of `num_rows` between the `startTime` and `endTime`, /// divide that by number of bins and return in a manner acceptable for the console + #[instrument(name = "get_bin_density", skip_all, fields(db.collection.name = %self.stream))] pub async fn get_bin_density( &self, tenant_id: &Option<String>, @@ -731,6 +760,7 @@ pub fn resolve_stream_names(sql: &str) -> Result<Vec<String>, anyhow::Error> { Ok(tables) } +#[instrument(name = "get_manifest_list", skip_all, fields(db.collection.name = %stream_name))] pub async fn get_manifest_list( stream_name: &str, 
time_range: &TimeRange, diff --git a/src/telemetry.rs b/src/telemetry.rs new file mode 100644 index 000000000..ebc6fb10f --- /dev/null +++ b/src/telemetry.rs @@ -0,0 +1,82 @@ +/* + * Parseable Server (C) 2022 - 2025 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + */ + +use opentelemetry::global; +use opentelemetry::trace::TracerProvider; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::propagation::TraceContextPropagator; +use opentelemetry_sdk::trace::{BatchSpanProcessor, SdkTracerProvider}; +use opentelemetry_sdk::Resource; +use tracing_opentelemetry::OpenTelemetryLayer; + +/// Initializes the OpenTelemetry tracer provider if `OTEL_EXPORTER_OTLP_ENDPOINT` is set. +/// +/// Returns `Some(SdkTracerProvider)` when tracing is configured, `None` otherwise. +/// The caller must call `provider.shutdown()` before process exit to flush buffered spans. +pub fn init_otel_tracer() -> Option<SdkTracerProvider> { + let endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").ok()?; + if endpoint.is_empty() { + return None; + } + + // Register the W3C TraceContext propagator globally. + // This is REQUIRED unconditionally for cross-runtime context propagation + // (e.g., QUERY_RUNTIME uses a separate OS thread pool). 
+ global::set_text_map_propagator(TraceContextPropagator::new()); + + let protocol = std::env::var("OTEL_EXPORTER_OTLP_PROTOCOL").unwrap_or_default(); + + let exporter = match protocol.as_str() { + "grpc" => opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_endpoint(&endpoint) + .build() + .expect("Failed to build gRPC OTLP span exporter"), + _ => opentelemetry_otlp::SpanExporter::builder() + .with_http() + .with_endpoint(&endpoint) + .build() + .expect("Failed to build HTTP/JSON OTLP span exporter"), + }; + + let processor = BatchSpanProcessor::builder(exporter).build(); + + let resource = Resource::builder() + .with_service_name("parseable") + .build(); + + let provider = SdkTracerProvider::builder() + .with_span_processor(processor) + .with_resource(resource) + .build(); + + Some(provider) +} + +/// Builds a `tracing_opentelemetry::OpenTelemetryLayer` from the given provider. +/// +/// Compose this layer into the `tracing_subscriber::Registry` alongside other layers. +pub fn build_otel_layer<S>( + provider: &SdkTracerProvider, +) -> OpenTelemetryLayer<S, opentelemetry_sdk::trace::Tracer> +where + S: tracing::Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>, +{ + let tracer = provider.tracer("parseable"); + tracing_opentelemetry::layer().with_tracer(tracer) +}