Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
3c2dd39
feat(sql): migrate to DataFusion-based streaming SQL planner
luoluoyuyu Mar 15, 2026
18c76c8
update
luoluoyuyu Mar 16, 2026
a42f5a3
update
luoluoyuyu Mar 16, 2026
29b19d9
update
luoluoyuyu Mar 17, 2026
67b65a9
update
luoluoyuyu Mar 18, 2026
1821c0f
update
luoluoyuyu Mar 18, 2026
3c94267
update
luoluoyuyu Mar 21, 2026
94879da
update
luoluoyuyu Mar 21, 2026
d647ea1
update
luoluoyuyu Mar 21, 2026
13e1341
update
luoluoyuyu Mar 21, 2026
c830cbb
update
luoluoyuyu Mar 21, 2026
e768a48
update
luoluoyuyu Mar 22, 2026
cdb6ddb
update
luoluoyuyu Mar 22, 2026
27bd75c
update
luoluoyuyu Mar 22, 2026
b4149bc
update
luoluoyuyu Mar 23, 2026
3b86ea0
update
luoluoyuyu Mar 24, 2026
f54301f
update
luoluoyuyu Mar 24, 2026
ee03dc8
update
luoluoyuyu Mar 24, 2026
5dc090c
update
luoluoyuyu Mar 25, 2026
97b978e
update
luoluoyuyu Mar 26, 2026
7842995
update
luoluoyuyu Mar 28, 2026
157e13d
update
luoluoyuyu Mar 28, 2026
de79169
update
luoluoyuyu Mar 29, 2026
58a9e5c
update
luoluoyuyu Mar 29, 2026
0321c50
update
luoluoyuyu Mar 29, 2026
9b41175
update
luoluoyuyu Mar 29, 2026
5b596f2
update
luoluoyuyu Mar 29, 2026
18a19f1
update
luoluoyuyu Mar 29, 2026
561da59
update
luoluoyuyu Mar 29, 2026
728c750
update
luoluoyuyu Mar 29, 2026
b64e4ce
update
luoluoyuyu Mar 30, 2026
53655b4
update
luoluoyuyu Mar 30, 2026
9bed7e7
update
luoluoyuyu Mar 30, 2026
174ebaa
update
luoluoyuyu Mar 30, 2026
87f7722
update
luoluoyuyu Mar 31, 2026
249bd5d
update
luoluoyuyu Mar 31, 2026
52610ec
update
luoluoyuyu Apr 1, 2026
d1bf1c7
update
luoluoyuyu Apr 1, 2026
871a1cf
update
luoluoyuyu Apr 1, 2026
ff5ec46
update
luoluoyuyu Apr 1, 2026
d4387f9
update
luoluoyuyu Apr 1, 2026
c842c0b
update
luoluoyuyu Apr 1, 2026
29d7a4b
update
luoluoyuyu Apr 1, 2026
7b1f959
update
luoluoyuyu Apr 1, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,168 changes: 2,058 additions & 110 deletions Cargo.lock

Large diffs are not rendered by default.

46 changes: 39 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,57 @@ tonic = { version = "0.12", features = ["default"] }
async-trait = "0.1"
num_cpus = "1.0"
protocol = { path = "./protocol" }
prost = "0.13"
rdkafka = { version = "0.38", features = ["cmake-build", "ssl", "gssapi"] }
crossbeam-channel = "0.5"
pest = "2.7"
pest_derive = "2.7"
clap = { version = "4.5", features = ["derive"] }
wasmtime = { version = "41.0.3", features = ["component-model", "async"] }
base64 = "0.22"
wasmtime-wasi = "41.0.3"
rocksdb = { version = "0.21", features = ["multi-threaded-cf", "lz4"] }
bincode = "1.3"
bincode = { version = "2", features = ["serde"] }
chrono = "0.4"
tokio-stream = "0.1.18"
lru = "0.12"
parking_lot = "0.12"
arrow-array = "52"
arrow-ipc = "52"
arrow-schema = "52"
arrow = { version = "55", default-features = false }
arrow-array = "55"
arrow-ipc = "55"
arrow-schema = { version = "55", features = ["serde"] }
futures = "0.3"
serde_json_path = "0.7"
xxhash-rust = { version = "0.8", features = ["xxh3"] }
proctitle = "0.1"
unicase = "2.7"
petgraph = "0.7"
rand = { version = "0.8", features = ["small_rng"] }
itertools = "0.14"
strum = { version = "0.26", features = ["derive"] }

typify = { git = 'https://github.com/ArroyoSystems/typify.git', branch = 'arroyo' }
parquet = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '55.2.0/parquet'}
arrow-json = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '55.2.0/json'}
datafusion = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
datafusion-common = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
datafusion-execution = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
datafusion-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
datafusion-functions = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}
datafusion-functions-window = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '48.0.1/arroyo'}

sqlparser = { git = "https://github.com/FunctionStream/sqlparser-rs", branch = "0.6.0/function-sql-parser" }

ahash = "0.8"
governor = "0.8.0"
mini-moka = "0.10"
sha2 = "0.10"
hex = "0.4"

[features]
default = ["incremental-cache", "python"]
incremental-cache = ["wasmtime/incremental-cache"]
python = []

[patch."https://github.com/ArroyoSystems/sqlparser-rs"]
sqlparser = { git = "https://github.com/FunctionStream/sqlparser-rs", branch = "0.6.0/function-sql-parser" }
21 changes: 12 additions & 9 deletions README-zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

[中文](README-zh.md) | [English](README.md)

**Function Stream** 是一个基于 Rust 构建的高性能、事件驱动的流处理框架。它提供了一个模块化的运行时,用于编排编译为 **WebAssembly (WASM)** 的 Serverless 风格处理函数,支持使用 **Go、Python 和 Rust** 编写函数。
**Function Stream** 是一个基于 Rust 构建的高性能、事件驱动的流处理框架。它提供了一个模块化的运行时,用于编排编译为 **WebAssembly (WASM)** 的 Serverless 风格处理函数,支持使用 **Go、Python 和 Rust** 编写函数。同时内置 **Streaming SQL** 引擎,可通过纯声明式 SQL 构建实时数据管道 — 包括时间窗口聚合、多流关联和持续 ETL。

## 目录

Expand All @@ -46,6 +46,7 @@

## 核心特性

- **Streaming SQL 引擎**:使用纯 SQL 构建实时管道 — 注册数据源(`CREATE TABLE`)、启动持续计算(`CREATE STREAMING TABLE ... AS SELECT`)、管理生命周期(`SHOW` / `DROP`)。支持滚动窗口、滑动窗口、窗口关联等丰富语义。
- **事件驱动的 WASM 运行时**:以接近原生的性能和沙箱隔离的方式执行多语言函数(Go、Python、Rust)。
- **持久化状态管理**:内置支持基于 RocksDB 的状态存储,用于有状态流处理。
- **SQL 驱动的 CLI**:使用类 SQL 命令进行作业管理和流检测的交互式 REPL。
Expand Down Expand Up @@ -200,14 +201,16 @@ function-stream-<version>/

## 文档

| 文档 | 描述 |
|------------------------------------------------------|---------------|
| [服务端配置与运维指南](docs/server-configuration-zh.md) | 服务端配置与运维操作 |
| [Function 任务配置规范](docs/function-configuration-zh.md) | 任务定义规范 |
| [SQL CLI 交互式管理指南](docs/sql-cli-guide-zh.md) | 交互式管理指南 |
| [Function 管理与开发指南](docs/function-development-zh.md) | 管理与开发指南 |
| [Go SDK 开发与交互指南](docs/Go-SDK/go-sdk-guide-zh.md) | Go SDK 指南 |
| [Python SDK 开发与交互指南](docs/Python-SDK/python-sdk-guide-zh.md) | Python SDK 指南 |
| 文档 | 描述 |
|------------------------------------------------------------------------|--------------------------|
| [Streaming SQL 使用指南](docs/streaming-sql-guide-zh.md) | 声明式 SQL 实时流处理指南 |
| [连接器、格式与类型参考](docs/connectors-and-formats-zh.md) | 支持的 Source/Sink、格式与数据类型 |
| [服务端配置与运维指南](docs/server-configuration-zh.md) | 服务端配置与运维操作 |
| [Function 任务配置规范](docs/function-configuration-zh.md) | 任务定义规范 |
| [SQL CLI 交互式管理指南](docs/sql-cli-guide-zh.md) | 交互式管理指南 |
| [Function 管理与开发指南](docs/function-development-zh.md) | 管理与开发指南 |
| [Go SDK 开发与交互指南](docs/Go-SDK/go-sdk-guide-zh.md) | Go SDK 指南 |
| [Python SDK 开发与交互指南](docs/Python-SDK/python-sdk-guide-zh.md) | Python SDK 指南 |

## 配置

Expand Down
21 changes: 12 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

[中文](README-zh.md) | [English](README.md)

**Function Stream** is a high-performance, event-driven stream processing framework built in Rust. It provides a modular runtime to orchestrate serverless-style processing functions compiled to **WebAssembly (WASM)**, supporting functions written in **Go, Python, and Rust**.
**Function Stream** is a high-performance, event-driven stream processing framework built in Rust. It provides a modular runtime to orchestrate serverless-style processing functions compiled to **WebAssembly (WASM)**, supporting functions written in **Go, Python, and Rust**. It also features a **Streaming SQL** engine that lets you build real-time data pipelines — including time-windowed aggregations, multi-stream joins, and continuous ETL — using pure declarative SQL.

## Table of Contents

Expand All @@ -46,6 +46,7 @@

## Key Features

* **Streaming SQL Engine**: Build real-time pipelines with pure SQL — register sources (`CREATE TABLE`), launch continuous computations (`CREATE STREAMING TABLE ... AS SELECT`), and manage lifecycle (`SHOW` / `DROP`). Supports tumbling windows, hopping windows, window joins, and more.
* **Event-Driven WASM Runtime**: Executes polyglot functions (Go, Python, Rust) with near-native performance and sandboxed isolation.
* **Durable State Management**: Built-in support for RocksDB-backed state stores for stateful stream processing.
* **SQL-Powered CLI**: Interactive REPL for job management and stream inspection using SQL-like commands.
Expand Down Expand Up @@ -199,14 +200,16 @@ We provide a robust shell script to manage the server process, capable of handli

## Documentation

| Document | Description |
|----------------------------------------------------------|-----------------------------------|
| [Server Configuration](docs/server-configuration.md) | Server Configuration & Operations |
| [Function Configuration](docs/function-configuration.md) | Task Definition Specification |
| [SQL CLI Guide](docs/sql-cli-guide.md) | Interactive Management Guide |
| [Function Development](docs/function-development.md) | Management & Development Guide |
| [Go SDK Guide](docs/Go-SDK/go-sdk-guide.md) | Go SDK Guide |
| [Python SDK Guide](docs/Python-SDK/python-sdk-guide.md) | Python SDK Guide |
| Document | Description |
|----------------------------------------------------------------|-------------------------------------------------|
| [Streaming SQL Guide](docs/streaming-sql-guide.md) | Declarative SQL for Real-Time Stream Processing |
| [Connectors, Formats & Types](docs/connectors-and-formats.md) | Supported Sources, Sinks, Formats & Data Types |
| [Server Configuration](docs/server-configuration.md) | Server Configuration & Operations |
| [Function Configuration](docs/function-configuration.md) | Task Definition Specification |
| [SQL CLI Guide](docs/sql-cli-guide.md) | Interactive Management Guide |
| [Function Development](docs/function-development.md) | Management & Development Guide |
| [Go SDK Guide](docs/Go-SDK/go-sdk-guide.md) | Go SDK Guide |
| [Python SDK Guide](docs/Python-SDK/python-sdk-guide.md) | Python SDK Guide |

## Configuration

Expand Down
2 changes: 0 additions & 2 deletions cli/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ arrow-array = "52"
arrow-ipc = "52"
arrow-schema = "52"
comfy-table = "7"
function-stream = { path = "../../" }
protocol = { path = "../../protocol" }
clap = { version = "4.5", features = ["derive"] }
thiserror = "2"
tokio = { version = "1.0", features = ["full", "signal"] }
tonic = { version = "0.12", features = ["default"] }
rustyline = { version = "14.0", features = ["with-dirs"] }
Expand Down
52 changes: 44 additions & 8 deletions cli/cli/src/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,62 @@ use comfy_table::{Attribute, Cell, Color, ContentArrangement, Table, TableCompon
use protocol::cli::{function_stream_service_client::FunctionStreamServiceClient, SqlRequest};
use rustyline::error::ReadlineError;
use rustyline::{Config, DefaultEditor, EditMode};
use std::fmt;
use std::io::{self, Cursor, Write};
use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::Request;

#[derive(Debug, thiserror::Error)]
/// CLI errors.
///
/// **Important:** [`tonic::Status`] must not be formatted with `{}` — its [`fmt::Display`] dumps
/// `details` / `metadata` (e.g. HTTP headers). Only [`tonic::Status::message`] is stored in
/// [`ReplError::Rpc`].
#[derive(Debug)]
pub enum ReplError {
#[error("RPC error: {0}")]
Rpc(Box<tonic::Status>),
#[error("Connection failed: {0}")]
Rpc(String),
Connection(String),
#[error("Internal error: {0}")]
Internal(String),
#[error("IO error: {0}")]
Io(#[from] io::Error),
Io(io::Error),
}

impl fmt::Display for ReplError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ReplError::Rpc(s) => f.write_str(s),
ReplError::Connection(s) => f.write_str(s),
ReplError::Internal(s) => write!(f, "Internal error: {s}"),
ReplError::Io(e) => write!(f, "IO error: {e}"),
}
}
}

impl std::error::Error for ReplError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
ReplError::Io(e) => Some(e),
_ => None,
}
}
}

impl From<io::Error> for ReplError {
fn from(e: io::Error) -> Self {
ReplError::Io(e)
}
}

impl From<tonic::Status> for ReplError {
fn from(s: tonic::Status) -> Self {
ReplError::Rpc(Box::new(s))
let msg = s.message();
if msg.is_empty() {
ReplError::Rpc(format!(
"gRPC {} (server returned no message)",
s.code()
))
} else {
ReplError::Rpc(msg.to_string())
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,10 @@ task_storage:

# Maximum total data size for the base level, in bytes (optional)
max_bytes_for_level_base: 268435456

# Stream table catalog (SQL: CREATE TABLE connector sources, SHOW TABLES, SHOW CREATE TABLE).
# When persist is true (default), metadata is stored in RocksDB at db_path (default: data/stream_catalog)
# and reloaded after process restart. Set persist: false only for tests/ephemeral nodes.
stream_catalog:
persist: true
# db_path: data/stream_catalog
Loading
Loading