[client] Support log scanner scan to arrow record batch #2995
Draft
luoyuxia wants to merge 1 commit into apache:main from
Pull request overview
This PR adds an Arrow-native scan path so Fluss clients can poll log data as Arrow VectorSchemaRoot batches (instead of iterating row-by-row), including support for schema evolution (missing columns filled with nulls) and projection reordering.
Changes:
- Introduces `ArrowLogScanner` / `ArrowScanRecords` and the `ArrowBatchData` container API for polling Arrow record batches from the client.
- Adds unshaded-Arrow batch loading + client-side projection utilities (`UnshadedArrowReadUtils`, `UnshadedFlussVectorLoader`, unshaded compression codecs/factory).
- Refactors existing log scanner/collector code to share implementation via new abstract base classes and adds tests covering Arrow-batch loading and schema evolution.
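
The overview mentions two client-side behaviors: projected columns may be reordered, and columns missing from an older batch are filled with nulls. The following is a minimal, dependency-free sketch of that idea only. It models a batch as a map from column name to a list of values; `ProjectionSketch`, `project`, and the column names are hypothetical and are not the PR's `UnshadedArrowReadUtils` API, which works on Arrow vectors.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of client-side projection over a columnar batch (assumption, not the PR's code). */
public class ProjectionSketch {

    /**
     * Returns the requested columns in the requested order. A column the batch
     * does not contain (for example, one added to the table after the batch was
     * written) is materialized as an all-null column of length rowCount.
     */
    public static List<List<Object>> project(
            Map<String, List<Object>> batch, int rowCount, List<String> requested) {
        List<List<Object>> result = new ArrayList<>();
        for (String name : requested) {
            List<Object> column = batch.get(name);
            if (column == null) {
                // Schema evolution case: fill the missing column with nulls.
                column = new ArrayList<>(Collections.<Object>nCopies(rowCount, null));
            }
            result.add(column);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Object>> batch = new LinkedHashMap<>();
        batch.put("id", Arrays.<Object>asList(1, 2, 3));
        batch.put("name", Arrays.<Object>asList("a", "b", "c"));

        // Reordered projection that also asks for a column the batch predates.
        List<List<Object>> out =
                project(batch, 3, Arrays.asList("name", "added_later", "id"));
        System.out.println(out); // prints [[a, b, c], [null, null, null], [1, 2, 3]]
    }
}
```

In this model existing columns are shared with the input rather than copied; how the actual implementation handles buffer ownership is not described in the overview.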
Reviewed changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java | Adds loadArrowBatch(...) API and Arrow-specific methods to ReadContext. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java | Adds selected row type + Arrow column projection calculation; makes Arrow resources lazily created. |
| fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java | Implements loadArrowBatch(...) for Arrow log batches, including optional client-side projection. |
| fluss-common/src/main/java/org/apache/fluss/record/ArrowBatchData.java | New public container holding VectorSchemaRoot + log metadata + change types; closeable. |
| fluss-common/src/main/java/org/apache/fluss/utils/UnshadedArrowReadUtils.java | New utilities to deserialize unshaded Arrow batches and project/reorder vectors. |
| fluss-common/src/main/java/org/apache/fluss/record/UnshadedFlussVectorLoader.java | New unshaded variant of FlussVectorLoader for scanner/read path. |
| fluss-common/src/main/java/org/apache/fluss/compression/Unshaded*ArrowCompressionCodec.java | New unshaded Arrow compression codecs for LZ4/Zstd. |
| fluss-common/src/main/java/org/apache/fluss/compression/UnshadedArrowCompressionFactory.java | Factory providing unshaded compression codecs for read path. |
| fluss-common/src/main/java/org/apache/fluss/row/ProjectedRow.java | Exposes index mapping via new accessor. |
| fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java | Delegates loadArrowBatch() for file-backed batches. |
| fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java | Adds schema-evolution test for missing projected columns in Arrow batches. |
| fluss-common/src/test/java/org/apache/fluss/record/FileLogInputStreamTest.java | Adds test to load Arrow batch from file log input stream. |
| fluss-common/pom.xml | Adds unshaded Arrow dependencies (currently provided). |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java | Adds createArrowLogScanner() API. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java | Implements createArrowLogScanner() with log-format/limit validation. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowLogScanner*.java | New Arrow scanner interfaces + implementation. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowScanRecords.java | New container for scanned Arrow batches per bucket. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java | Refactors to share logic via AbstractLogScanner. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/AbstractLogScanner.java | New shared scanner implementation (poll/subscribe/unsubscribe/wakeup/close). |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java | Adds Arrow fetch collection path. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ArrowLogFetchCollector.java | New fetch collector producing ArrowScanRecords. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/AbstractLogFetchCollector.java | New shared fetch-collection implementation. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java | Refactors to extend AbstractLogFetchCollector. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/CompletedFetch.java | Adds fetchArrowBatches(...) path that loads batches via loadArrowBatch(...). |
| fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogScannerITCase.java | Adds IT coverage for Arrow scanning, schema evolution, and projection reorder. |
| fluss-client/pom.xml | Adds unshaded Arrow dependencies (currently provided). |
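
Several rows above describe the same refactor: a shared abstract base (`AbstractLogScanner`, `AbstractLogFetchCollector`) with a row-based and an Arrow-batch subclass. As a rough illustration of that shape, here is a template-method sketch in plain Java. Every name in it (`CollectorSketch`, `AbstractCollector`, `convert`, `wrap`) is hypothetical, raw batches are modeled as `int[]`, and the real classes deal with buckets, offsets, and errors that are omitted here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Template-method sketch of a shared fetch collector (hypothetical names). */
public class CollectorSketch {

    /** Shared loop; T is the per-item type, R the result handed to the caller. */
    public abstract static class AbstractCollector<T, R> {
        /** Walks every fetched raw batch and delegates conversion to the subclass. */
        public final R collect(List<int[]> fetched) {
            List<T> items = new ArrayList<>();
            for (int[] rawBatch : fetched) {
                items.addAll(convert(rawBatch));
            }
            return wrap(items);
        }

        protected abstract List<T> convert(int[] rawBatch);

        protected abstract R wrap(List<T> items);
    }

    /** Row path: explodes each raw batch into individual records. */
    public static final class RowCollector extends AbstractCollector<Integer, List<Integer>> {
        @Override
        protected List<Integer> convert(int[] rawBatch) {
            List<Integer> rows = new ArrayList<>();
            for (int value : rawBatch) {
                rows.add(value);
            }
            return rows;
        }

        @Override
        protected List<Integer> wrap(List<Integer> items) {
            return items;
        }
    }

    /** Batch path: keeps each raw batch whole, one item per batch. */
    public static final class BatchCollector extends AbstractCollector<int[], int[][]> {
        @Override
        protected List<int[]> convert(int[] rawBatch) {
            List<int[]> single = new ArrayList<>();
            single.add(rawBatch);
            return single;
        }

        @Override
        protected int[][] wrap(List<int[]> items) {
            return items.toArray(new int[0][]);
        }
    }

    public static void main(String[] args) {
        List<int[]> fetched = Arrays.asList(new int[] {1, 2}, new int[] {3});
        System.out.println(new RowCollector().collect(fetched));          // prints [1, 2, 3]
        System.out.println(new BatchCollector().collect(fetched).length); // prints 2
    }
}
```

The point of the two type parameters in this sketch is that the iteration logic lives once in the base class while each path chooses both its item granularity and its result container.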
Pull request overview
Copilot reviewed 24 out of 24 changed files in this pull request and generated 2 comments.
Purpose
Linked issue: close #2965
Support `LogScanner` to scan log data as Arrow record batches (`ArrowBatchData`) instead of row-by-row `ScanRecord`, enabling consumers like the tiering service to process columnar Arrow batches directly.
Brief change log
- `LogScannerImpl.pollRecordBatch(Duration)` returning `ArrowScanRecords` for ARROW-format append-only log tables.
- `ArrowBatchData` to hold scanned Arrow `VectorSchemaRoot` with log metadata and memory ownership.
- `LogRecordBatch.loadArrowBatch(ReadContext)` to load Arrow batch directly from batch memory.
- `ArrowRecordBatchContext` / `UnshadedArrowBatchAccess` to bridge shaded/unshaded Arrow types with batch-scoped child allocators.
- `UnshadedArrowReadUtils` for the unshaded read path with schema evolution support.
- `AbstractLogFetchCollector<T, R>` to share fetch-collection logic between row-based and Arrow batch paths.
- `CompletedFetch.nextFetchedBatch()` / `finishFetchedBatches()` and generalize `LogScannerImpl.doPoll()` for code reuse.
Tests
- `LogScannerITCase.testPollArrowBatchesWithSchemaEvolution`
- `FileLogInputStreamTest.testLoadArrowBatchFromFileLogInputStream`
API and Format
`@Internal` API: `LogScannerImpl.pollRecordBatch(Duration)`, `ArrowBatchData`, `ArrowScanRecords`.
Documentation
N/A
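
The change log says `ArrowBatchData` carries memory ownership, and the review summary describes it as closeable. A consumer would therefore be expected to release each batch after use. The sketch below shows that pattern with try-with-resources on a stand-in class; `OwnershipSketch`, `Batch`, and their methods are hypothetical and do not reflect the real `ArrowBatchData` signature, and whether `ArrowScanRecords` also releases batches on the caller's behalf is not stated in the PR text.

```java
/** Sketch of the close-after-use contract for a memory-owning batch (hypothetical names). */
public class OwnershipSketch {

    /** Stand-in for a batch backed by off-heap memory; only tracks whether it was released. */
    public static final class Batch implements AutoCloseable {
        private final long baseOffset;
        private final int rowCount;
        private boolean closed;

        public Batch(long baseOffset, int rowCount) {
            this.baseOffset = baseOffset;
            this.rowCount = rowCount;
        }

        public long baseOffset() {
            ensureOpen();
            return baseOffset;
        }

        public int rowCount() {
            ensureOpen();
            return rowCount;
        }

        public boolean isClosed() {
            return closed;
        }

        private void ensureOpen() {
            if (closed) {
                throw new IllegalStateException("batch already released");
            }
        }

        @Override
        public void close() {
            // A real implementation would release buffers and its allocator here.
            closed = true;
        }
    }

    public static void main(String[] args) {
        Batch batch = new Batch(100L, 3);
        // try-with-resources releases the batch even if processing throws.
        try (Batch b = batch) {
            System.out.println(b.baseOffset() + " " + b.rowCount()); // prints 100 3
        }
        System.out.println(batch.isClosed()); // prints true
    }
}
```

Failing fast on access after `close()` is a design choice of this sketch; it makes use-after-release bugs visible instead of reading freed memory.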