diff --git a/src/google/adk/agents/config_agent_utils.py b/src/google/adk/agents/config_agent_utils.py index 2c1c9bd9c8..5f5884ae74 100644 --- a/src/google/adk/agents/config_agent_utils.py +++ b/src/google/adk/agents/config_agent_utils.py @@ -30,6 +30,21 @@ from .common_configs import AgentRefConfig from .common_configs import CodeConfig +# Allowlist for safe module prefixes that can be imported from YAML config +_SAFE_MODULE_PREFIXES = frozenset({"google.adk."}) + + +def _is_safe_module_import(name: str) -> bool: + """Check if a module import is from a safe/allowed namespace. + + Args: + name: The fully qualified module name to check. + + Returns: + True if the module is in a safe namespace, False otherwise. + """ + return any(name.startswith(prefix) for prefix in _SAFE_MODULE_PREFIXES) + @experimental(FeatureName.AGENT_CONFIG) def from_config(config_path: str) -> BaseAgent: @@ -105,10 +120,34 @@ def _load_config_from_path(config_path: str) -> AgentConfig: @experimental(FeatureName.AGENT_CONFIG) def resolve_fully_qualified_name(name: str) -> Any: + """Resolve a fully qualified name to a Python object. + + Args: + name: The fully qualified name (e.g., 'google.adk.agents.LlmAgent'). + + Returns: + The resolved Python object. + + Raises: + ValueError: If the name is not in a safe namespace or cannot be resolved. + """ try: module_path, obj_name = name.rsplit(".", 1) + + # Security check: only allow imports from safe namespaces + if not _is_safe_module_import(module_path): + raise ValueError( + f"Module reference '{name}' is outside the allowed namespace. " + "Only google.adk.* references are permitted in YAML config." + ) + module = importlib.import_module(module_path) return getattr(module, obj_name) + except ValueError as e: + # Re-raise ValueError from security check without wrapping + if "outside the allowed namespace" in str(e): + raise e + raise ValueError(f"Invalid fully qualified name: {name}") from e except Exception as e: raise ValueError(f"Invalid fully qualified name: {name}") from e @@ -153,12 +192,20 @@ def _resolve_agent_code_reference(code: str) -> Any: The resolved agent instance. Raises: - ValueError: If the agent reference cannot be resolved. + ValueError: If the agent reference cannot be resolved or is outside allowed namespace. """ if "." not in code: raise ValueError(f"Invalid code reference: {code}") module_path, obj_name = code.rsplit(".", 1) + + # Security check: only allow imports from safe namespaces + if not _is_safe_module_import(module_path): + raise ValueError( + f"Code reference '{code}' is outside the allowed namespace. " + "Only google.adk.* references are permitted in YAML config." + ) + module = importlib.import_module(module_path) obj = getattr(module, obj_name) @@ -182,12 +229,20 @@ def resolve_code_reference(code_config: CodeConfig) -> Any: The resolved Python object. Raises: - ValueError: If the code reference cannot be resolved. + ValueError: If the code reference cannot be resolved or is outside allowed namespace. """ if not code_config or not code_config.name: raise ValueError("Invalid CodeConfig.") module_path, obj_name = code_config.name.rsplit(".", 1) + + # Security check: only allow imports from safe namespaces + if not _is_safe_module_import(module_path): + raise ValueError( + f"Code reference '{code_config.name}' is outside the allowed namespace. " + "Only google.adk.* references are permitted in YAML config." + ) + module = importlib.import_module(module_path) obj = getattr(module, obj_name) diff --git a/src/google/adk/cli/adk_web_server.py b/src/google/adk/cli/adk_web_server.py index 927cd7ad03..dc17ebaff1 100644 --- a/src/google/adk/cli/adk_web_server.py +++ b/src/google/adk/cli/adk_web_server.py @@ -51,6 +51,7 @@ from opentelemetry.sdk.trace import TracerProvider from pydantic import Field from pydantic import ValidationError +from pydantic import field_validator from starlette.types import Lifespan from typing_extensions import deprecated from typing_extensions import override @@ -366,6 +367,29 @@ class RunAgentRequest(common.BaseModel): # for resume long-running functions invocation_id: Optional[str] = None + @field_validator("app_name") + @classmethod + def validate_app_name(cls, v: str) -> str: + """Validate app_name to prevent path traversal attacks. + + Args: + v: The app_name value to validate. + + Returns: + The validated app_name. + + Raises: + ValueError: If the app_name contains path traversal characters. + """ + if not v: + raise ValueError("app_name cannot be empty") + # Check for path traversal attempts + if ".." in v or "/" in v or "\\" in v: + raise ValueError( + f"Invalid app_name: {v!r}. Path traversal characters are not allowed." + ) + return v + class CreateSessionRequest(common.BaseModel): session_id: Optional[str] = Field( diff --git a/src/google/adk/cli/utils/agent_loader.py b/src/google/adk/cli/utils/agent_loader.py index a7bbcbc2a6..c895826368 100644 --- a/src/google/adk/cli/utils/agent_loader.py +++ b/src/google/adk/cli/utils/agent_loader.py @@ -167,6 +167,8 @@ def _load_from_submodule( def _load_from_yaml_config( self, agent_name: str, agents_dir: str ) -> Optional[BaseAgent]: + # Validate agent_name doesn't escape agents_dir + self._validate_agent_path(agents_dir, agent_name) # Load from the config file at agents_dir/{agent_name}/root_agent.yaml config_path = os.path.join(agents_dir, agent_name, "root_agent.yaml") try: @@ -188,6 +190,31 @@ def _load_from_yaml_config( ) + e.args[1:] raise e + def _validate_agent_path(self, agents_dir: str, agent_name: str) -> None: + """Validate that the agent path resolves within agents_dir. + + Args: + agents_dir: The base directory for agents. + agent_name: The agent name/path to validate. + + Raises: + ValueError: If the resolved path would escape agents_dir. + """ + # Normalize paths to absolute, resolved paths + base_path = Path(agents_dir).resolve() + # Handle both forward and backward slashes by using Path + agent_path = base_path / agent_name + resolved_path = agent_path.resolve() + + # Check if the resolved path is still within the base directory + try: + resolved_path.relative_to(base_path) + except ValueError as e: + raise ValueError( + f"Agent '{agent_name}' resolves outside agents_dir. " + "Path traversal is not permitted." + ) from e + _VALID_AGENT_NAME_RE = re.compile(r"^[a-zA-Z0-9_]+$") def _validate_agent_name(self, agent_name: str) -> None: