diff --git a/docs/en/setup/Plugins.md b/docs/en/setup/Plugins.md index c430fb85..d300ae68 100644 --- a/docs/en/setup/Plugins.md +++ b/docs/en/setup/Plugins.md @@ -47,6 +47,7 @@ or a limitation of SkyWalking auto-instrumentation (welcome to contribute!) | [requests](https://requests.readthedocs.io/en/master/) | Python >=3.7 - ['2.26', '2.25']; | `sw_requests` | | [sanic](https://sanic.readthedocs.io/en/latest) | Python >=3.10 - NOT SUPPORTED YET; Python >=3.7 - ['20.12']; | `sw_sanic` | | [sanic](https://sanic.readthedocs.io/en/latest) | Python >=3.14 - ['24.12.*']; Python >=3.10 - ['23.12.*', '24.12.*']; | `sw_sanic_v2` | +| [threading](https://docs.python.org/3/library/threading.html) | Python >=3.7 - ['*']; | `sw_threading` | | [tornado](https://www.tornadoweb.org) | Python >=3.14 - ['6.4']; Python >=3.10 - ['6.0', '6.1']; | `sw_tornado` | | [urllib3](https://urllib3.readthedocs.io/en/latest/) | Python >=3.12 - NOT SUPPORTED YET; Python >=3.10 - ['1.26', '1.25']; | `sw_urllib3` | | [urllib3](https://urllib3.readthedocs.io/en/latest/) | Python >=3.12 - ['2.3', '2.0']; | `sw_urllib3_v2` | @@ -66,6 +67,7 @@ support both Neo4j 5 and 4.4 DBMS. For legacy Sanic <=21.3, see sw_sanic. Note: Sanic's touchup system recompiles handle_request at startup, so we use signal listeners instead of monkey-patching handle_request. +- Automatically propagates trace context across threads. - urllib3 1.x plugin. For urllib3 2.x, see sw_urllib3_v2. - urllib3 2.x plugin. For urllib3 1.x, see sw_urllib3. - The websocket instrumentation only traces client side connection handshake, diff --git a/skywalking/decorators.py b/skywalking/decorators.py index d9fb2116..61dbe11f 100644 --- a/skywalking/decorators.py +++ b/skywalking/decorators.py @@ -73,14 +73,17 @@ def runnable( tags: List[Tag] = None, ): def decorator(func): - snapshot = get_context().capture() + _op = op or f'Thread/{func.__name__}' @wraps(func) def wrapper(*args, **kwargs): - _op = op or f'Thread/{func.__name__}' + import threading + snapshot = getattr(threading.current_thread(), '_sw_snapshot', None) + context = get_context() with context.new_local_span(op=_op) as span: - context.continued(snapshot) + if snapshot is not None: + context.continued(snapshot) span.layer = layer span.component = component if tags: @@ -88,6 +91,7 @@ def wrapper(*args, **kwargs): span.tag(tag) func(*args, **kwargs) + wrapper._sw_runnable = True return wrapper return decorator diff --git a/skywalking/plugins/sw_threading.py b/skywalking/plugins/sw_threading.py new file mode 100644 index 00000000..fb995a33 --- /dev/null +++ b/skywalking/plugins/sw_threading.py @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import threading + +from skywalking.trace.context import get_context + +link_vector = ['https://docs.python.org/3/library/threading.html'] +support_matrix = { + 'threading': { + '>=3.7': ['*'] + } +} +note = """Automatically propagates trace context across threads.""" + + +def install(): + _original_init = threading.Thread.__init__ + _original_run = threading.Thread.run + + def _sw_init(self, *args, **kwargs): + _original_init(self, *args, **kwargs) + self._sw_snapshot = get_context().capture() + + def _sw_run(self): + snapshot = getattr(self, '_sw_snapshot', None) + target = getattr(self, '_target', None) + + # If target is @runnable-wrapped, let @runnable handle the span and continued + if snapshot is not None and not getattr(target, '_sw_runnable', False): + context = get_context() + op = f'Thread/{target.__name__}' if target and hasattr(target, '__name__') else f'Thread/{self.name}' + with context.new_local_span(op=op): + context.continued(snapshot) + _original_run(self) + else: + _original_run(self) + + threading.Thread.__init__ = _sw_init + threading.Thread.run = _sw_run diff --git a/tests/plugin/web/sw_threading/__init__.py b/tests/plugin/web/sw_threading/__init__.py new file mode 100644 index 00000000..b1312a09 --- /dev/null +++ b/tests/plugin/web/sw_threading/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/plugin/web/sw_threading/docker-compose.yml b/tests/plugin/web/sw_threading/docker-compose.yml new file mode 100644 index 00000000..b443714b --- /dev/null +++ b/tests/plugin/web/sw_threading/docker-compose.yml @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version: '2.1' + +services: + collector: + extends: + service: collector + file: ../../docker-compose.base.yml + + provider: + extends: + service: agent + file: ../../docker-compose.base.yml + ports: + - 9091:9091 + volumes: + - .:/app + command: ['bash', '-c', 'pip install flask itsdangerous==2.0.1 "Werkzeug<3" && pip install -r /app/requirements.txt && sw-python run python3 /app/services/provider.py'] + depends_on: + collector: + condition: service_healthy + healthcheck: + test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9091"] + interval: 5s + timeout: 60s + retries: 120 + environment: + SW_AGENT_NAME: provider + SW_AGENT_LOGGING_LEVEL: DEBUG + + consumer: + extends: + service: agent + file: ../../docker-compose.base.yml + ports: + - 9090:9090 + volumes: + - .:/app + command: ['bash', '-c', 'pip install flask itsdangerous==2.0.1 "Werkzeug<3" && pip install -r /app/requirements.txt && sw-python run python3 /app/services/consumer.py'] + depends_on: + collector: + condition: service_healthy + provider: + condition: service_healthy + environment: + SW_AGENT_NAME: consumer + SW_AGENT_LOGGING_LEVEL: DEBUG +networks: + beyond: diff --git a/tests/plugin/web/sw_threading/expected.data.yml b/tests/plugin/web/sw_threading/expected.data.yml new file mode 100644 index 00000000..8d950d3f --- /dev/null +++ b/tests/plugin/web/sw_threading/expected.data.yml @@ -0,0 +1,113 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +segmentItems: + - serviceName: provider + segmentSize: 1 + segments: + - segmentId: not null + spans: + - operationName: /users + parentSpanId: -1 + spanId: 0 + spanLayer: Http + tags: + - key: http.method + value: POST + - key: http.url + value: http://provider:9091/users + - key: http.status_code + value: '200' + refs: + - parentEndpoint: /users + networkAddress: 'provider:9091' + refType: CrossProcess + parentSpanId: 1 + parentTraceSegmentId: not null + parentServiceInstance: not null + parentService: consumer + traceId: not null + startTime: gt 0 + endTime: gt 0 + componentId: 7001 + spanType: Entry + peer: not null + skipAnalysis: false + - serviceName: consumer + segmentSize: 2 + segments: + - segmentId: not null + spans: + - operationName: /users + parentSpanId: 0 + spanId: 1 + spanLayer: Http + startTime: gt 0 + endTime: gt 0 + componentId: 7002 + isError: false + spanType: Exit + peer: provider:9091 + skipAnalysis: false + tags: + - key: http.method + value: POST + - key: http.url + value: 'http://provider:9091/users' + - key: http.status_code + value: '200' + - operationName: Thread/post + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Unknown + startTime: gt 0 + endTime: gt 0 + componentId: 0 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + refs: + - parentEndpoint: /users + networkAddress: '' + refType: CrossThread + parentSpanId: 0 + parentTraceSegmentId: not null + parentServiceInstance: not null + parentService: consumer + traceId: not null + - segmentId: not null + spans: + - operationName: /users + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Http + tags: + - key: http.method + value: GET + - key: http.url + value: http://0.0.0.0:9090/users + - key: http.status_code + value: '200' + startTime: gt 0 + endTime: gt 0 + componentId: 7001 + spanType: Entry + peer: not null + skipAnalysis: false diff --git a/tests/plugin/web/sw_threading/services/__init__.py b/tests/plugin/web/sw_threading/services/__init__.py new file mode 100644 index 00000000..b1312a09 --- /dev/null +++ b/tests/plugin/web/sw_threading/services/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/plugin/web/sw_threading/services/consumer.py b/tests/plugin/web/sw_threading/services/consumer.py new file mode 100644 index 00000000..bb799052 --- /dev/null +++ b/tests/plugin/web/sw_threading/services/consumer.py @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import requests + + +def post(): + requests.post('http://provider:9091/users', timeout=5) + + +if __name__ == '__main__': + from flask import Flask, jsonify + + app = Flask(__name__) + + @app.route('/users', methods=['POST', 'GET']) + def application(): + from threading import Thread + t = Thread(target=post) + t.start() + t.join() + + return jsonify({'status': 'ok'}) + + PORT = 9090 + app.run(host='0.0.0.0', port=PORT, debug=True) diff --git a/tests/plugin/web/sw_threading/services/provider.py b/tests/plugin/web/sw_threading/services/provider.py new file mode 100644 index 00000000..79cc50d6 --- /dev/null +++ b/tests/plugin/web/sw_threading/services/provider.py @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import time + + +if __name__ == '__main__': + from flask import Flask, jsonify + + app = Flask(__name__) + + @app.route('/users', methods=['POST', 'GET']) + def application(): + time.sleep(0.5) + return jsonify({'status': 'ok'}) + + PORT = 9091 + app.run(host='0.0.0.0', port=PORT, debug=True) diff --git a/tests/plugin/web/sw_threading/test_threading.py b/tests/plugin/web/sw_threading/test_threading.py new file mode 100644 index 00000000..5e4a23dd --- /dev/null +++ b/tests/plugin/web/sw_threading/test_threading.py @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from typing import Callable + +import pytest +import requests + +from tests.plugin.base import TestPluginBase + + +@pytest.fixture +def prepare(): + # type: () -> Callable + return lambda *_: requests.get('http://0.0.0.0:9090/users', timeout=5) + + +class TestPlugin(TestPluginBase): + def test_plugin(self, docker_compose): + self.validate()