Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ opentelemetry-proto = { git = "https://github.com/open-telemetry/opentelemetry-r
"metrics",
"trace",
] }
opentelemetry = "0.29"
opentelemetry_sdk = { version = "0.29", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.29", features = ["http-json", "grpc-tonic"] }
tracing-opentelemetry = "0.30"
prometheus = { version = "0.13.4", default-features = false, features = ["process"] }
prometheus-parse = "0.2.5"
tracing = "0.1"
Expand Down
36 changes: 24 additions & 12 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use tokio::task::JoinSet;
use tracing::{error, warn};
use tracing::{error, instrument, warn};

use crate::event::{DEFAULT_TIMESTAMP_KEY, commit_schema};
use crate::metrics::{QUERY_EXECUTE_TIME, increment_query_calls_by_date};
Expand Down Expand Up @@ -115,6 +115,7 @@ pub async fn get_records_and_fields(
Ok((Some(records), Some(fields)))
}

#[instrument(name = "POST /query", skip_all, fields(http.request.method = "POST", http.route = "/query"))]
pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
let mut session_state = QUERY_SESSION.get_ctx().state();
let time_range =
Expand Down Expand Up @@ -179,6 +180,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
///
/// # Returns
/// - `HttpResponse` with the count result as JSON, including fields if requested.
#[instrument(name = "handle_count_query", skip_all, fields(db.collection.name = %table_name))]
async fn handle_count_query(
query_request: &Query,
table_name: &str,
Expand Down Expand Up @@ -230,6 +232,7 @@ async fn handle_count_query(
///
/// # Returns
/// - `HttpResponse` with the full query result as a JSON object.
#[instrument(name = "handle_non_streaming_query", skip_all, fields(db.collection.name))]
async fn handle_non_streaming_query(
query: LogicalQuery,
table_name: Vec<String>,
Expand All @@ -238,6 +241,7 @@ async fn handle_non_streaming_query(
tenant_id: &Option<String>,
) -> Result<HttpResponse, QueryError> {
let first_table_name = table_name[0].clone();
tracing::Span::current().record("db.collection.name", &first_table_name.as_str());
let (records, fields) = execute(query, query_request.streaming, tenant_id).await?;
let records = match records {
Either::Left(rbs) => rbs,
Expand Down Expand Up @@ -283,6 +287,7 @@ async fn handle_non_streaming_query(
///
/// # Returns
/// - `HttpResponse` streaming the query results as NDJSON, optionally prefixed with the fields array.
#[instrument(name = "handle_streaming_query", skip_all, fields(db.collection.name))]
async fn handle_streaming_query(
query: LogicalQuery,
table_name: Vec<String>,
Expand All @@ -291,6 +296,7 @@ async fn handle_streaming_query(
tenant_id: &Option<String>,
) -> Result<HttpResponse, QueryError> {
let first_table_name = table_name[0].clone();
tracing::Span::current().record("db.collection.name", &first_table_name.as_str());
let (records_stream, fields) = execute(query, query_request.streaming, tenant_id).await?;
let records_stream = match records_stream {
Either::Left(_) => {
Expand Down Expand Up @@ -516,6 +522,7 @@ pub async fn update_schema_when_distributed(
/// Create streams for querier if they do not exist
/// get list of streams from memory and storage
/// create streams for memory from storage if they do not exist
#[instrument(name = "create_streams_for_distributed", skip_all)]
pub async fn create_streams_for_distributed(
streams: Vec<String>,
tenant_id: &Option<String>,
Expand All @@ -526,17 +533,21 @@ pub async fn create_streams_for_distributed(
let mut join_set = JoinSet::new();
for stream_name in streams {
let id = tenant_id.to_owned();
join_set.spawn(async move {
let result = PARSEABLE
.create_stream_and_schema_from_storage(&stream_name, &id)
.await;

if let Err(e) = &result {
warn!("Failed to create stream '{}': {}", stream_name, e);
}

(stream_name, result)
});
let span = tracing::Span::current();
join_set.spawn(tracing::Instrument::instrument(
async move {
let result = PARSEABLE
.create_stream_and_schema_from_storage(&stream_name, &id)
.await;

if let Err(e) = &result {
warn!("Failed to create stream '{}': {}", stream_name, e);
}

(stream_name, result)
},
span,
));
}

while let Some(result) = join_set.join_next().await {
Expand Down Expand Up @@ -579,6 +590,7 @@ impl FromRequest for Query {
}
}

#[instrument(name = "into_query", skip_all, fields(db.query.text = %query.query))]
pub async fn into_query(
query: &Query,
session_state: &SessionState,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ mod static_schema;
mod stats;
pub mod storage;
pub mod sync;
pub mod telemetry;
pub mod tenants;
pub mod users;
pub mod utils;
Expand Down
35 changes: 28 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ use std::process::exit;
*/
#[cfg(feature = "kafka")]
use parseable::connectors;
use opentelemetry_sdk::trace::SdkTracerProvider;
use parseable::{
IngestServer, ParseableServer, QueryServer, Server, banner, metrics, option::Mode,
parseable::PARSEABLE, rbac, storage,
parseable::PARSEABLE, rbac, storage, telemetry,
};
use tokio::signal::ctrl_c;
use tokio::sync::oneshot;
Expand All @@ -33,7 +34,7 @@ use tracing_subscriber::{EnvFilter, Registry, fmt};

#[actix_web::main]
async fn main() -> anyhow::Result<()> {
init_logger();
let otel_provider = init_logger();
// Install the rustls crypto provider before any TLS operations.
// This is required for rustls 0.23+ which needs an explicit crypto provider.
// If the installation fails, log a warning but continue execution.
Expand Down Expand Up @@ -95,10 +96,17 @@ async fn main() -> anyhow::Result<()> {
parseable_server.await?;
}

// Flush any buffered OTel spans before exit
if let Some(provider) = otel_provider {
if let Err(e) = provider.shutdown() {
warn!("Failed to shut down OTel tracer provider: {:?}", e);
}
}

Ok(())
}

pub fn init_logger() {
pub fn init_logger() -> Option<SdkTracerProvider> {
let filter_layer = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
let default_level = if cfg!(debug_assertions) {
Level::DEBUG
Expand All @@ -116,10 +124,23 @@ pub fn init_logger() {
.with_target(true)
.compact();

Registry::default()
.with(filter_layer)
.with(fmt_layer)
.init();
let otel_provider = telemetry::init_otel_tracer();

if let Some(ref provider) = otel_provider {
let otel_layer = telemetry::build_otel_layer(provider);
Registry::default()
.with(filter_layer)
.with(fmt_layer)
.with(otel_layer)
.init();
} else {
Registry::default()
.with(filter_layer)
.with(fmt_layer)
.init();
}

otel_provider
}

#[cfg(windows)]
Expand Down
32 changes: 31 additions & 1 deletion src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use sysinfo::System;
use tokio::runtime::Runtime;
use tracing::instrument;

use self::error::ExecuteError;
use self::stream_schema_provider::GlobalSchemaProvider;
Expand Down Expand Up @@ -133,6 +134,7 @@ impl InMemorySessionContext {

/// This function executes a query on the dedicated runtime, ensuring that the query is not isolated to a single thread/CPU
/// at a time and has access to the entire thread pool, enabling better concurrent processing, and thus quicker results.
#[instrument(name = "query.execute", skip_all, fields(query.streaming = %is_streaming))]
pub async fn execute(
query: Query,
is_streaming: bool,
Expand Down Expand Up @@ -165,8 +167,33 @@ pub async fn execute(
ExecuteError,
> {
let id = tenant_id.clone();

// QUERY_RUNTIME is a separate Runtime::new() (different OS thread pool).
// tracing::Span does NOT propagate OTel context across OS threads.
// Use W3C TraceContext propagation to preserve the trace ID.
let mut carrier = std::collections::HashMap::new();
{
use tracing_opentelemetry::OpenTelemetrySpanExt;
let cx = tracing::Span::current().context();
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(&cx, &mut carrier);
});
}

QUERY_RUNTIME
.spawn(async move { query.execute(is_streaming, &id).await })
.spawn(async move {
// Extract the propagated context on the QUERY_RUNTIME thread
let parent_cx = opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.extract(&carrier)
});
let span = tracing::info_span!("query.execute_runtime");
{
use tracing_opentelemetry::OpenTelemetrySpanExt;
span.set_parent(parent_cx);
}
let _guard = span.enter();
query.execute(is_streaming, &id).await
})
.await
.expect("The Join should have been successful")
}
Expand Down Expand Up @@ -272,6 +299,7 @@ impl Query {
/// this function returns the result of the query
/// if streaming is true, it returns a stream
/// if streaming is false, it returns a vector of record batches
#[instrument(name = "Query.execute_datafusion", skip_all, fields(query.streaming = %is_streaming))]
pub async fn execute(
&self,
is_streaming: bool,
Expand Down Expand Up @@ -526,6 +554,7 @@ impl CountsRequest {
/// This function is supposed to read manifest files for the given stream,
/// get the sum of `num_rows` between the `startTime` and `endTime`,
/// divide that by number of bins and return in a manner acceptable for the console
#[instrument(name = "get_bin_density", skip_all, fields(db.collection.name = %self.stream))]
pub async fn get_bin_density(
&self,
tenant_id: &Option<String>,
Expand Down Expand Up @@ -731,6 +760,7 @@ pub fn resolve_stream_names(sql: &str) -> Result<Vec<String>, anyhow::Error> {
Ok(tables)
}

#[instrument(name = "get_manifest_list", skip_all, fields(db.collection.name = %stream_name))]
pub async fn get_manifest_list(
stream_name: &str,
time_range: &TimeRange,
Expand Down
82 changes: 82 additions & 0 deletions src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Parseable Server (C) 2022 - 2025 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use opentelemetry::global;
use opentelemetry::trace::TracerProvider;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::{BatchSpanProcessor, SdkTracerProvider};
use opentelemetry_sdk::Resource;
use tracing_opentelemetry::OpenTelemetryLayer;

/// Initializes the OpenTelemetry tracer provider if `OTEL_EXPORTER_OTLP_ENDPOINT` is set.
///
/// Returns `Some(SdkTracerProvider)` when tracing is configured and the exporter
/// could be built, `None` otherwise (endpoint unset/empty, or exporter build failure).
/// The caller must call `provider.shutdown()` before process exit to flush buffered spans.
pub fn init_otel_tracer() -> Option<SdkTracerProvider> {
    let endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").ok()?;
    if endpoint.is_empty() {
        return None;
    }

    // Register the W3C TraceContext propagator globally.
    // This is REQUIRED unconditionally for cross-runtime context propagation
    // (e.g., QUERY_RUNTIME uses a separate OS thread pool).
    global::set_text_map_propagator(TraceContextPropagator::new());

    // OTEL_EXPORTER_OTLP_PROTOCOL selects the wire protocol; anything other
    // than "grpc" falls back to the HTTP/JSON exporter.
    let protocol = std::env::var("OTEL_EXPORTER_OTLP_PROTOCOL").unwrap_or_default();

    // Build the exporter WITHOUT panicking: a malformed endpoint or exporter
    // configuration is a recoverable condition. Log to stderr (the tracing
    // subscriber is not yet initialized when this runs inside init_logger)
    // and disable OTel export instead of aborting the whole server.
    let exporter_result = match protocol.as_str() {
        "grpc" => opentelemetry_otlp::SpanExporter::builder()
            .with_tonic()
            .with_endpoint(&endpoint)
            .build(),
        _ => opentelemetry_otlp::SpanExporter::builder()
            .with_http()
            .with_endpoint(&endpoint)
            .build(),
    };

    let exporter = match exporter_result {
        Ok(exporter) => exporter,
        Err(e) => {
            eprintln!(
                "Failed to build OTLP span exporter for endpoint '{endpoint}' \
                 (protocol: '{protocol}'): {e}; OpenTelemetry tracing disabled"
            );
            return None;
        }
    };

    // Batch processor buffers spans and exports them asynchronously,
    // amortizing the per-span export cost.
    let processor = BatchSpanProcessor::builder(exporter).build();

    // Resource attaches the service identity to every exported span.
    let resource = Resource::builder()
        .with_service_name("parseable")
        .build();

    let provider = SdkTracerProvider::builder()
        .with_span_processor(processor)
        .with_resource(resource)
        .build();

    Some(provider)
}

/// Builds a `tracing_opentelemetry::OpenTelemetryLayer` backed by the given provider.
///
/// Compose this layer into the `tracing_subscriber::Registry` alongside other layers.
pub fn build_otel_layer<S>(
    provider: &SdkTracerProvider,
) -> OpenTelemetryLayer<S, opentelemetry_sdk::trace::SdkTracer>
where
    S: tracing::Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
{
    // Obtain a named tracer from the provider and hand it to the tracing bridge.
    tracing_opentelemetry::layer().with_tracer(provider.tracer("parseable"))
}
Loading