From 8b5df9814448906ef62396fb9578292d6ebcb9d8 Mon Sep 17 00:00:00 2001 From: Wu Sheng Date: Sun, 12 Apr 2026 20:47:04 +0800 Subject: [PATCH 1/2] feat: add sw_threading plugin for automatic cross-thread context propagation Previously, @runnable captured the snapshot at decoration time, which only worked when the decorator was applied inline during an active request. When used at module level (the natural Python pattern), the snapshot was None because no trace context existed at import time. The new sw_threading plugin patches Thread.__init__ to capture the snapshot on the parent thread, and Thread.run to automatically create a local span with CrossThread ref on the child thread. This makes cross-thread trace propagation transparent without any decorator. @runnable is updated to read the snapshot from the Thread object at execution time, so both module-level and inline usage now work correctly. When sw_threading detects the target is @runnable-wrapped, it defers to @runnable for span creation. Closes apache/skywalking#11605 Co-Authored-By: Claude Opus 4.6 (1M context) --- skywalking/decorators.py | 10 +- skywalking/plugins/sw_threading.py | 54 +++++++++ tests/plugin/web/sw_threading/__init__.py | 16 +++ .../web/sw_threading/docker-compose.yml | 65 ++++++++++ .../plugin/web/sw_threading/expected.data.yml | 113 ++++++++++++++++++ .../web/sw_threading/services/__init__.py | 16 +++ .../web/sw_threading/services/consumer.py | 40 +++++++ .../web/sw_threading/services/provider.py | 32 +++++ .../plugin/web/sw_threading/test_threading.py | 33 +++++ 9 files changed, 376 insertions(+), 3 deletions(-) create mode 100644 skywalking/plugins/sw_threading.py create mode 100644 tests/plugin/web/sw_threading/__init__.py create mode 100644 tests/plugin/web/sw_threading/docker-compose.yml create mode 100644 tests/plugin/web/sw_threading/expected.data.yml create mode 100644 tests/plugin/web/sw_threading/services/__init__.py create mode 100644 tests/plugin/web/sw_threading/services/consumer.py create mode 100644 tests/plugin/web/sw_threading/services/provider.py create mode 100644 tests/plugin/web/sw_threading/test_threading.py diff --git a/skywalking/decorators.py b/skywalking/decorators.py index d9fb2116..61dbe11f 100644 --- a/skywalking/decorators.py +++ b/skywalking/decorators.py @@ -73,14 +73,17 @@ def runnable( tags: List[Tag] = None, ): def decorator(func): - snapshot = get_context().capture() + _op = op or f'Thread/{func.__name__}' @wraps(func) def wrapper(*args, **kwargs): - _op = op or f'Thread/{func.__name__}' + import threading + snapshot = getattr(threading.current_thread(), '_sw_snapshot', None) + context = get_context() with context.new_local_span(op=_op) as span: - context.continued(snapshot) + if snapshot is not None: + context.continued(snapshot) span.layer = layer span.component = component if tags: @@ -88,6 +91,7 @@ def wrapper(*args, **kwargs): span.tag(tag) func(*args, **kwargs) + wrapper._sw_runnable = True return wrapper return decorator diff --git a/skywalking/plugins/sw_threading.py b/skywalking/plugins/sw_threading.py new file mode 100644 index 00000000..fb995a33 --- /dev/null +++ b/skywalking/plugins/sw_threading.py @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import threading + +from skywalking.trace.context import get_context + +link_vector = ['https://docs.python.org/3/library/threading.html'] +support_matrix = { + 'threading': { + '>=3.7': ['*'] + } +} +note = """Automatically propagates trace context across threads.""" + + +def install(): + _original_init = threading.Thread.__init__ + _original_run = threading.Thread.run + + def _sw_init(self, *args, **kwargs): + _original_init(self, *args, **kwargs) + self._sw_snapshot = get_context().capture() + + def _sw_run(self): + snapshot = getattr(self, '_sw_snapshot', None) + target = getattr(self, '_target', None) + + # If target is @runnable-wrapped, let @runnable handle the span and continued + if snapshot is not None and not getattr(target, '_sw_runnable', False): + context = get_context() + op = f'Thread/{target.__name__}' if target and hasattr(target, '__name__') else f'Thread/{self.name}' + with context.new_local_span(op=op): + context.continued(snapshot) + _original_run(self) + else: + _original_run(self) + + threading.Thread.__init__ = _sw_init + threading.Thread.run = _sw_run diff --git a/tests/plugin/web/sw_threading/__init__.py b/tests/plugin/web/sw_threading/__init__.py new file mode 100644 index 00000000..b1312a09 --- /dev/null +++ b/tests/plugin/web/sw_threading/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/plugin/web/sw_threading/docker-compose.yml b/tests/plugin/web/sw_threading/docker-compose.yml new file mode 100644 index 00000000..b443714b --- /dev/null +++ b/tests/plugin/web/sw_threading/docker-compose.yml @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version: '2.1' + +services: + collector: + extends: + service: collector + file: ../../docker-compose.base.yml + + provider: + extends: + service: agent + file: ../../docker-compose.base.yml + ports: + - 9091:9091 + volumes: + - .:/app + command: ['bash', '-c', 'pip install flask itsdangerous==2.0.1 "Werkzeug<3" && pip install -r /app/requirements.txt && sw-python run python3 /app/services/provider.py'] + depends_on: + collector: + condition: service_healthy + healthcheck: + test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9091"] + interval: 5s + timeout: 60s + retries: 120 + environment: + SW_AGENT_NAME: provider + SW_AGENT_LOGGING_LEVEL: DEBUG + + consumer: + extends: + service: agent + file: ../../docker-compose.base.yml + ports: + - 9090:9090 + volumes: + - .:/app + command: ['bash', '-c', 'pip install flask itsdangerous==2.0.1 "Werkzeug<3" && pip install -r /app/requirements.txt && sw-python run python3 /app/services/consumer.py'] + depends_on: + collector: + condition: service_healthy + provider: + condition: service_healthy + environment: + SW_AGENT_NAME: consumer + SW_AGENT_LOGGING_LEVEL: DEBUG +networks: + beyond: diff --git a/tests/plugin/web/sw_threading/expected.data.yml b/tests/plugin/web/sw_threading/expected.data.yml new file mode 100644 index 00000000..8d950d3f --- /dev/null +++ b/tests/plugin/web/sw_threading/expected.data.yml @@ -0,0 +1,113 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +segmentItems: + - serviceName: provider + segmentSize: 1 + segments: + - segmentId: not null + spans: + - operationName: /users + parentSpanId: -1 + spanId: 0 + spanLayer: Http + tags: + - key: http.method + value: POST + - key: http.url + value: http://provider:9091/users + - key: http.status_code + value: '200' + refs: + - parentEndpoint: /users + networkAddress: 'provider:9091' + refType: CrossProcess + parentSpanId: 1 + parentTraceSegmentId: not null + parentServiceInstance: not null + parentService: consumer + traceId: not null + startTime: gt 0 + endTime: gt 0 + componentId: 7001 + spanType: Entry + peer: not null + skipAnalysis: false + - serviceName: consumer + segmentSize: 2 + segments: + - segmentId: not null + spans: + - operationName: /users + parentSpanId: 0 + spanId: 1 + spanLayer: Http + startTime: gt 0 + endTime: gt 0 + componentId: 7002 + isError: false + spanType: Exit + peer: provider:9091 + skipAnalysis: false + tags: + - key: http.method + value: POST + - key: http.url + value: 'http://provider:9091/users' + - key: http.status_code + value: '200' + - operationName: Thread/post + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Unknown + startTime: gt 0 + endTime: gt 0 + componentId: 0 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + refs: + - parentEndpoint: /users + networkAddress: '' + refType: CrossThread + parentSpanId: 0 + parentTraceSegmentId: not null + parentServiceInstance: not null + parentService: consumer + traceId: not null + - segmentId: not null + spans: + - operationName: /users + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Http + tags: + - key: http.method + value: GET + - key: http.url + value: http://0.0.0.0:9090/users + - key: http.status_code + value: '200' + startTime: gt 0 + endTime: gt 0 + componentId: 7001 + spanType: Entry + peer: not null + skipAnalysis: false diff --git a/tests/plugin/web/sw_threading/services/__init__.py b/tests/plugin/web/sw_threading/services/__init__.py new file mode 100644 index 00000000..b1312a09 --- /dev/null +++ b/tests/plugin/web/sw_threading/services/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/plugin/web/sw_threading/services/consumer.py b/tests/plugin/web/sw_threading/services/consumer.py new file mode 100644 index 00000000..bb799052 --- /dev/null +++ b/tests/plugin/web/sw_threading/services/consumer.py @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import requests + + +def post(): + requests.post('http://provider:9091/users', timeout=5) + + +if __name__ == '__main__': + from flask import Flask, jsonify + + app = Flask(__name__) + + @app.route('/users', methods=['POST', 'GET']) + def application(): + from threading import Thread + t = Thread(target=post) + t.start() + t.join() + + return jsonify({'status': 'ok'}) + + PORT = 9090 + app.run(host='0.0.0.0', port=PORT, debug=True) diff --git a/tests/plugin/web/sw_threading/services/provider.py b/tests/plugin/web/sw_threading/services/provider.py new file mode 100644 index 00000000..79cc50d6 --- /dev/null +++ b/tests/plugin/web/sw_threading/services/provider.py @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import time + + +if __name__ == '__main__': + from flask import Flask, jsonify + + app = Flask(__name__) + + @app.route('/users', methods=['POST', 'GET']) + def application(): + time.sleep(0.5) + return jsonify({'status': 'ok'}) + + PORT = 9091 + app.run(host='0.0.0.0', port=PORT, debug=True) diff --git a/tests/plugin/web/sw_threading/test_threading.py b/tests/plugin/web/sw_threading/test_threading.py new file mode 100644 index 00000000..5e4a23dd --- /dev/null +++ b/tests/plugin/web/sw_threading/test_threading.py @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from typing import Callable + +import pytest +import requests + +from tests.plugin.base import TestPluginBase + + +@pytest.fixture +def prepare(): + # type: () -> Callable + return lambda *_: requests.get('http://0.0.0.0:9090/users', timeout=5) + + +class TestPlugin(TestPluginBase): + def test_plugin(self, docker_compose): + self.validate() From 6b84d9e9562f9614357e7b288552b850244f6143 Mon Sep 17 00:00:00 2001 From: Wu Sheng Date: Sun, 12 Apr 2026 20:57:30 +0800 Subject: [PATCH 2/2] docs: regenerate Plugins.md with sw_threading entry Co-Authored-By: Claude Opus 4.6 (1M context) --- docs/en/setup/Plugins.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/en/setup/Plugins.md b/docs/en/setup/Plugins.md index c430fb85..d300ae68 100644 --- a/docs/en/setup/Plugins.md +++ b/docs/en/setup/Plugins.md @@ -47,6 +47,7 @@ or a limitation of SkyWalking auto-instrumentation (welcome to contribute!) | [requests](https://requests.readthedocs.io/en/master/) | Python >=3.7 - ['2.26', '2.25']; | `sw_requests` | | [sanic](https://sanic.readthedocs.io/en/latest) | Python >=3.10 - NOT SUPPORTED YET; Python >=3.7 - ['20.12']; | `sw_sanic` | | [sanic](https://sanic.readthedocs.io/en/latest) | Python >=3.14 - ['24.12.*']; Python >=3.10 - ['23.12.*', '24.12.*']; | `sw_sanic_v2` | +| [threading](https://docs.python.org/3/library/threading.html) | Python >=3.7 - ['*']; | `sw_threading` | | [tornado](https://www.tornadoweb.org) | Python >=3.14 - ['6.4']; Python >=3.10 - ['6.0', '6.1']; | `sw_tornado` | | [urllib3](https://urllib3.readthedocs.io/en/latest/) | Python >=3.12 - NOT SUPPORTED YET; Python >=3.10 - ['1.26', '1.25']; | `sw_urllib3` | | [urllib3](https://urllib3.readthedocs.io/en/latest/) | Python >=3.12 - ['2.3', '2.0']; | `sw_urllib3_v2` | @@ -66,6 +67,7 @@ support both Neo4j 5 and 4.4 DBMS. For legacy Sanic <=21.3, see sw_sanic. Note: Sanic's touchup system recompiles handle_request at startup, so we use signal listeners instead of monkey-patching handle_request. +- Automatically propagates trace context across threads. - urllib3 1.x plugin. For urllib3 2.x, see sw_urllib3_v2. - urllib3 2.x plugin. For urllib3 1.x, see sw_urllib3. - The websocket instrumentation only traces client side connection handshake,