"""AWS cross-account IAM role :class:`AccountIntegrationProvider`.

Federated-first onboarding flow (AGENTS rule 26 + Tavily research §1):

1. ``bootstrap_artifacts`` — generate the IAM trust policy JSON the
   customer must attach to a fresh cross-account role on their side,
   embedding AlphaSwarm's account id as ``Principal.AWS`` and an HMAC-derived
   ``sts:ExternalId``. Emits a one-click CloudFormation StackSet URL
   and a copy/pasteable ``aws iam create-role`` snippet.
2. ``validate_identity`` — assume the role via ``sts:AssumeRole`` then
   call ``sts:GetCallerIdentity`` to confirm the trust chain works.
3. ``validate_permissions`` — ``iam:SimulatePrincipalPolicy`` against
   the assumed role to preview the permissions the customer granted.
4. ``enumerate_resources`` — list accounts (single-account most of the
   time) + regions so the wizard can populate dropdowns downstream.
5. ``connect`` — persist the ``{role_arn, external_id, region}`` tuple
   under ``CredentialKey('cloud_aws', f'org:{org_id}')`` via the
   IntegrationCredentialStore.

Long-lived AK/SK paths are intentionally NOT supported in this cut
(plan §"Auth methods (federated-first only in this cut)").

Two auth methods are supported:

- ``iam_role_external_id`` (default) — cross-account ``sts:AssumeRole``
  with HMAC-derived ``ExternalId``. The original federated-first path
  used for customer onboarding.
- ``sso_permission_set`` — AWS Identity Center / SSO. The wizard
  drives the OIDC device flow against ``cloud_aws_sso_start_url`` and
  exchanges the resulting access token for short-lived STS creds via
  ``sso.get_role_credentials`` against the chosen ``account_id`` +
  ``permission_set_name``. Used by AlphaSwarm staff for hosted-platform
  ops (terraform apply/destroy, workload management).

The two branches share the ``validate_*`` / ``enumerate`` / ``connect``
plumbing — only the ``boto3.Session`` factory differs.
"""
from __future__ import annotations

import logging
from typing import Any

from alphaswarm_admin.providers.base import (
    AccountIntegrationProvider,
    BootstrapArtifacts,
    EnumerationResult,
    IdentityProbe,
    IntegrationHealth,
    IntegrationKind,
    IntegrationProviderError,
    IntegrationRecord,
    IntegrationStatus,
    PermissionPreview,
    now,
)
from alphaswarm_admin.providers.cloud_common import (
    derive_external_id,
    require_sdk,
    row_to_record,
)
from alphaswarm_admin.services.integration_store import (
    IntegrationCredentialStore,
    get_integration_store,
)
from alphaswarm_admin.settings import AdminSettings, get_settings

logger = logging.getLogger(__name__)


# Minimum permission set the wizard expects on the customer-side role.
# Used by ``validate_permissions`` to flag missing privileges; keep
# this list tight so we surface a useful red badge in the UI without
# requiring the customer to over-grant.
_AWS_BASELINE_PERMISSIONS: tuple[str, ...] = (
    "sts:GetCallerIdentity",
    "iam:GetRole",
    "ec2:DescribeRegions",
)


class CloudAwsProvider(AccountIntegrationProvider):
    """Federated AWS cross-account role onboarding provider."""

    kind = IntegrationKind.CLOUD_AWS

    def __init__(
        self,
        *,
        store: IntegrationCredentialStore | None = None,
        settings: AdminSettings | None = None,
        sts_factory: Any = None,
        iam_factory: Any = None,
        ec2_factory: Any = None,
        sso_session_factory: Any = None,
    ) -> None:
        self._store = store or get_integration_store()
        self._settings = settings or get_settings()
        self._sts_factory = sts_factory  # tests inject (role_arn, external_id, region) -> client
        self._iam_factory = iam_factory  # tests inject (role_arn) -> client
        self._ec2_factory = ec2_factory  # tests inject (role_arn) -> client
        # tests inject (access_token, account_id, role_name, region) -> {sts, iam, ec2}
        self._sso_session_factory = sso_session_factory

    # ------------------------------------------------------------------
    # Bootstrap — generate the trust policy and copy/paste snippets
    # ------------------------------------------------------------------

    async def bootstrap_artifacts(
        self,
        org_id: str,
        body: dict[str, Any],
    ) -> BootstrapArtifacts:
        auth_method = str(body.get("auth_method") or "iam_role_external_id").strip()
        if auth_method == "sso_permission_set":
            return self._sso_bootstrap_artifacts(org_id, body)
        partner_account_id = self._settings.cloud_aws_partner_account_id.strip()
        if not partner_account_id:
            raise IntegrationProviderError(
                "ALPHASWARM_ADMIN_AWS_PARTNER_ACCOUNT_ID is not configured; the "
                "AWS onboarding wizard cannot emit a trust policy without it",
                code="provider_misconfigured",
                status_code=503,
            )
        external_id = derive_external_id(
            org_id=org_id,
            cloud_kind=self.kind,
            secret=self._settings.cloud_aws_external_id_secret or None,
        )
        role_name = (
            self._settings.cloud_aws_role_name_pattern.format(org_id=org_id)
            if "{org_id}" in self._settings.cloud_aws_role_name_pattern
            else self._settings.cloud_aws_role_name_pattern
        )
        managed_policy_arns = list(body.get("managed_policy_arns") or [])

        trust_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"AWS": f"arn:aws:iam::{partner_account_id}:root"},
                    "Action": "sts:AssumeRole",
                    "Condition": {
                        "StringEquals": {"sts:ExternalId": external_id},
                    },
                },
            ],
        }
        # CloudFormation one-click — the customer pastes the trust
        # policy inline so we don't need to host a public template.
        cf_url = (
            "https://console.aws.amazon.com/cloudformation/home"
            "?#/stacks/create/review"
            f"&stackName=alphaswarm-broker-{org_id}"
            f"&param_PartnerAccountId={partner_account_id}"
            f"&param_ExternalId={external_id}"
            f"&param_RoleName={role_name}"
        )
        cli_command = (
            f'aws iam create-role --role-name {role_name} '
            f"--assume-role-policy-document '{_compact_json(trust_policy)}'"
        )
        return BootstrapArtifacts(
            cloud_kind=self.kind,
            auth_method="iam_role_external_id",
            external_id=external_id,
            templates={"trust_policy.json": _compact_json(trust_policy)},
            cli_commands={"create_role": cli_command},
            iac_links={"cloudformation_stackset": cf_url},
            metadata={
                "partner_account_id": partner_account_id,
                "suggested_role_name": role_name,
                "baseline_permissions": list(_AWS_BASELINE_PERMISSIONS),
                "managed_policy_arns": managed_policy_arns,
            },
            next_step=(
                "Apply the trust policy on the customer's AWS account, "
                "note the resulting Role ARN, then return to step 3 to "
                "validate the identity."
            ),
        )

    # ------------------------------------------------------------------
    # Identity probe — assume + GetCallerIdentity
    # ------------------------------------------------------------------

    async def validate_identity(
        self,
        org_id: str,
        body: dict[str, Any],
    ) -> IdentityProbe:
        try:
            session, region = self._session_for_auth_method(org_id, body)
        except IntegrationProviderError as exc:
            return IdentityProbe(ok=False, error=str(exc))
        sts = session["sts"]
        try:
            identity = sts.get_caller_identity()
        except Exception as exc:  # noqa: BLE001 — boto raises Boto3Error subclasses
            return IdentityProbe(
                ok=False,
                error=f"GetCallerIdentity failed: {exc}",
            )
        return IdentityProbe(
            ok=True,
            identity={
                "caller_arn": str(identity.get("Arn") or ""),
                "account": str(identity.get("Account") or ""),
                "user_id": str(identity.get("UserId") or ""),
                "region": region,
            },
        )

    # ------------------------------------------------------------------
    # Permission preview — SimulatePrincipalPolicy
    # ------------------------------------------------------------------

    async def validate_permissions(
        self,
        org_id: str,
        body: dict[str, Any],
    ) -> PermissionPreview:
        actions: list[str] = list(body.get("actions") or _AWS_BASELINE_PERMISSIONS)
        try:
            session, _region = self._session_for_auth_method(org_id, body)
        except IntegrationProviderError as exc:
            return PermissionPreview(error=str(exc))
        iam = session["iam"]
        # For the SSO branch the assumed-role principal is implicit on
        # the session itself; use the GetCallerIdentity ARN as the
        # ``PolicySourceArn`` so SimulatePrincipalPolicy works on both
        # auth methods. For the legacy branch the operator-supplied
        # ``role_arn`` is identical to the caller arn after assumption.
        sts = session["sts"]
        try:
            caller = sts.get_caller_identity()
            policy_source_arn = str(
                body.get("role_arn") or caller.get("Arn") or ""
            ).strip()
        except Exception as exc:  # noqa: BLE001
            return PermissionPreview(error=f"GetCallerIdentity failed: {exc}")
        if not policy_source_arn:
            return PermissionPreview(error="caller arn unresolved")
        try:
            response = iam.simulate_principal_policy(
                PolicySourceArn=policy_source_arn,
                ActionNames=actions,
            )
        except Exception as exc:  # noqa: BLE001
            return PermissionPreview(error=f"SimulatePrincipalPolicy failed: {exc}")
        allowed: list[str] = []
        denied: list[str] = []
        for entry in response.get("EvaluationResults") or []:
            decision = str(entry.get("EvalDecision") or "")
            action = str(entry.get("EvalActionName") or "")
            if decision == "allowed":
                allowed.append(action)
            else:
                denied.append(action)
        baseline = set(_AWS_BASELINE_PERMISSIONS)
        missing = tuple(sorted(baseline.difference(allowed)))
        return PermissionPreview(
            allowed=tuple(allowed),
            denied=tuple(denied),
            missing_required=missing,
            metadata={
                "policy_source_arn": policy_source_arn,
                "tested_actions": list(actions),
            },
        )

    # ------------------------------------------------------------------
    # Enumerate — list regions (and the single calling account)
    # ------------------------------------------------------------------

    async def enumerate_resources(
        self,
        org_id: str,
        body: dict[str, Any],
    ) -> EnumerationResult:
        auth_method = str(body.get("auth_method") or "iam_role_external_id").strip()
        try:
            session, region = self._session_for_auth_method(org_id, body)
        except IntegrationProviderError as exc:
            return EnumerationResult(error=str(exc))
        ec2 = session["ec2"]
        sts = session["sts"]
        try:
            identity = sts.get_caller_identity()
            account_id = str(identity.get("Account") or "")
        except Exception as exc:  # noqa: BLE001
            return EnumerationResult(error=f"GetCallerIdentity failed: {exc}")
        try:
            regions_response = ec2.describe_regions(AllRegions=False)
            regions = [
                {
                    "name": str(r.get("RegionName") or ""),
                    "endpoint": str(r.get("Endpoint") or ""),
                }
                for r in regions_response.get("Regions") or []
                if r.get("RegionName")
            ]
        except Exception as exc:  # noqa: BLE001
            return EnumerationResult(
                error=f"DescribeRegions failed: {exc}",
                resources={"accounts": [{"id": account_id}]},
            )
        if auth_method == "sso_permission_set":
            account_block = {
                "id": account_id,
                "permission_set_name": str(body.get("permission_set_name") or ""),
            }
        else:
            account_block = {
                "id": account_id,
                "role_arn": str(body.get("role_arn") or ""),
            }
        return EnumerationResult(
            resources={
                "accounts": [account_block],
                "regions": regions,
            },
        )

    # ------------------------------------------------------------------
    # Connect — persist
    # ------------------------------------------------------------------

    async def connect(
        self,
        org_id: str,
        body: dict[str, Any],
        *,
        bearer: str | None = None,
    ) -> IntegrationRecord:
        auth_method = str(body.get("auth_method") or "iam_role_external_id").strip()
        # Re-validate before persisting so we never store a record
        # that cannot be assumed / minted.
        probe = await self.validate_identity(org_id, body)
        if not probe.ok:
            raise IntegrationProviderError(
                probe.error or "identity probe failed",
                code="unauthorized",
                status_code=401,
            )
        account_id = str(probe.identity.get("account") or "")
        if auth_method == "sso_permission_set":
            permission_set_name = str(body.get("permission_set_name") or "").strip()
            region = str(body.get("region") or "").strip() or None
            sso_region = str(
                body.get("sso_region") or self._settings.cloud_aws_sso_region
            ).strip() or None
            start_url = str(
                body.get("sso_start_url")
                or body.get("start_url")
                or self._settings.cloud_aws_sso_start_url
            ).strip()
            namespace = str(
                body.get("namespace")
                or f"sso:{account_id}:{permission_set_name}"
            )
            row = self._store.put(
                org_id=org_id,
                kind=self.kind.value,
                namespace=namespace,
                # Token slot stores the source-identity ARN — there is
                # no long-lived secret. Short-lived STS creds live in
                # AwsSsoCredentialStore (resolver priority 4) keyed by
                # ``aws_sso:<account_id>:<role_name>:<operator_sub>``.
                token=str(probe.identity.get("caller_arn") or ""),
                metadata={
                    "auth_method": "sso_permission_set",
                    "account_id": account_id,
                    "permission_set_name": permission_set_name,
                    "region": region or "",
                    "sso_start_url": start_url,
                    "sso_region": sso_region or "",
                    "caller_arn": str(probe.identity.get("caller_arn") or ""),
                    "identity_center_instance_arn": (
                        self._settings.cloud_aws_identity_center_instance_arn or ""
                    ),
                },
            )
            return row_to_record(
                row, self.kind, status_override=IntegrationStatus.HEALTHY
            )
        # Legacy iam_role_external_id branch
        role_arn, external_id, region = self._read_assume_inputs(org_id, body)
        namespace = str(body.get("namespace") or account_id or role_arn)
        # NOTE: the role_arn doubles as the resolver "token" — there is
        # no long-lived secret to store; the IntegrationCredentialStore
        # still encrypts it so the file at rest carries no plaintext
        # arn either.
        row = self._store.put(
            org_id=org_id,
            kind=self.kind.value,
            namespace=namespace,
            token=role_arn,
            metadata={
                "auth_method": "iam_role_external_id",
                "role_arn": role_arn,
                "external_id": external_id,
                "region": region or "",
                "account_id": account_id,
                "caller_arn": str(probe.identity.get("caller_arn") or ""),
            },
        )
        return row_to_record(row, self.kind, status_override=IntegrationStatus.HEALTHY)

    async def health(self, record: IntegrationRecord) -> IntegrationHealth:
        auth_method = str(
            record.metadata.get("auth_method") or "iam_role_external_id"
        )
        if auth_method == "sso_permission_set":
            # SSO health probes require a fresh access token from the
            # operator's session; without one we report DEGRADED so the
            # UI prompts the operator to re-sign-in. The watchdog beat
            # task in alphaswarm/tasks/token_refresh_tasks.py refreshes
            # in-flight sessions; the operator's manual login restores
            # the access token.
            return IntegrationHealth(
                status=IntegrationStatus.DEGRADED,
                checked_at=now(),
                error=(
                    "SSO health requires an active operator session; "
                    "open the wizard and re-validate to refresh"
                ),
                metadata={
                    "auth_method": auth_method,
                    "account_id": str(record.metadata.get("account_id") or ""),
                    "permission_set_name": str(
                        record.metadata.get("permission_set_name") or ""
                    ),
                },
            )
        role_arn = str(record.metadata.get("role_arn") or "")
        external_id = str(record.metadata.get("external_id") or "")
        region = str(record.metadata.get("region") or "") or None
        if not role_arn or not external_id:
            return IntegrationHealth(
                status=IntegrationStatus.DISCONNECTED,
                checked_at=now(),
                error="role_arn / external_id not on persisted record",
            )
        probe = await self.validate_identity(
            record.org_id,
            {"role_arn": role_arn, "external_id": external_id, "region": region},
        )
        if not probe.ok:
            self._store.update_health(
                org_id=record.org_id,
                kind=self.kind.value,
                status=IntegrationStatus.DEGRADED.value,
                error=probe.error,
            )
            return IntegrationHealth(
                status=IntegrationStatus.DEGRADED,
                checked_at=now(),
                error=probe.error,
            )
        self._store.update_health(
            org_id=record.org_id,
            kind=self.kind.value,
            status=IntegrationStatus.HEALTHY.value,
        )
        return IntegrationHealth(
            status=IntegrationStatus.HEALTHY,
            checked_at=now(),
            metadata={"caller_arn": str(probe.identity.get("caller_arn") or "")},
        )

    async def disconnect(
        self,
        record: IntegrationRecord,
        *,
        bearer: str | None = None,
    ) -> None:
        self._store.delete(org_id=record.org_id, kind=self.kind.value)

    async def list_for_org(self, org_id: str) -> list[IntegrationRecord]:
        rows = [
            row
            for row in self._store.list_for_org(org_id)
            if row.kind == self.kind.value
        ]
        return [row_to_record(row, self.kind) for row in rows]

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------

    def _read_assume_inputs(
        self,
        org_id: str,
        body: dict[str, Any],
    ) -> tuple[str, str, str | None]:
        role_arn = str(body.get("role_arn") or "").strip()
        if not role_arn:
            raise IntegrationProviderError(
                "role_arn required",
                code="invalid_payload",
                status_code=422,
            )
        external_id = str(body.get("external_id") or "").strip()
        if not external_id:
            external_id = derive_external_id(
                org_id=org_id,
                cloud_kind=self.kind,
                secret=self._settings.cloud_aws_external_id_secret or None,
            )
        region = str(body.get("region") or "").strip() or None
        return role_arn, external_id, region

    def _read_sso_inputs(
        self,
        body: dict[str, Any],
    ) -> tuple[str, str, str, str, str | None]:
        """Validate + return the SSO ``(access_token, account_id,
        role_name, sso_region, region)`` tuple.

        ``access_token`` is the SSO access token from the device flow;
        ``role_name`` is the AWS Identity Center permission set name;
        ``sso_region`` is the Identity Center home region; ``region``
        is the optional workload region for STS / EC2 calls.
        """
        access_token = str(body.get("access_token") or "").strip()
        account_id = str(body.get("account_id") or "").strip()
        role_name = str(
            body.get("role_name") or body.get("permission_set_name") or ""
        ).strip()
        sso_region = (
            str(body.get("sso_region") or "").strip()
            or self._settings.cloud_aws_sso_region.strip()
            or "us-east-1"
        )
        region = str(body.get("region") or "").strip() or None
        if not access_token:
            raise IntegrationProviderError(
                "access_token required for sso_permission_set auth",
                code="invalid_payload",
                status_code=422,
            )
        if not account_id:
            raise IntegrationProviderError(
                "account_id required for sso_permission_set auth",
                code="invalid_payload",
                status_code=422,
            )
        if not role_name:
            raise IntegrationProviderError(
                "permission_set_name (role_name) required for sso_permission_set auth",
                code="invalid_payload",
                status_code=422,
            )
        return access_token, account_id, role_name, sso_region, region

    def _session_for_auth_method(
        self,
        org_id: str,
        body: dict[str, Any],
    ) -> tuple[dict[str, Any], str | None]:
        """Dispatch ``body`` onto the correct boto3-session factory.

        Returns ``({"sts": ..., "iam": ..., "ec2": ...}, region)`` so
        callers can re-use a single client bag regardless of auth
        method. Centralising the dispatch keeps ``validate_identity``
        / ``validate_permissions`` / ``enumerate_resources`` /
        ``connect`` auth-method-agnostic.
        """
        auth_method = str(body.get("auth_method") or "iam_role_external_id").strip()
        if auth_method == "sso_permission_set":
            access_token, account_id, role_name, sso_region, region = (
                self._read_sso_inputs(body)
            )
            session = self._sso_session(
                access_token=access_token,
                account_id=account_id,
                role_name=role_name,
                sso_region=sso_region,
                region=region,
            )
            return session, region
        role_arn, external_id, region = self._read_assume_inputs(org_id, body)
        return (
            self._assume_role(
                role_arn=role_arn,
                external_id=external_id,
                region=region,
            ),
            region,
        )

    def _sso_bootstrap_artifacts(
        self,
        org_id: str,
        body: dict[str, Any],
    ) -> BootstrapArtifacts:
        """Bootstrap branch for the ``sso_permission_set`` auth method.

        No trust policy is emitted — Identity Center handles the
        permission-set + account-assignment plumbing on the AWS side
        via the Phase 2 ``aws_iam_identity_center`` Terraform module.
        We surface the start URL + Identity Center instance ARN so
        the wizard can drive the operator to the device-flow login.
        """
        start_url = str(
            body.get("sso_start_url")
            or self._settings.cloud_aws_sso_start_url
        ).strip()
        if not start_url:
            raise IntegrationProviderError(
                "ALPHASWARM_AWS_SSO_START_URL is not configured; the "
                "AWS SSO onboarding branch cannot bootstrap without it",
                code="provider_misconfigured",
                status_code=503,
            )
        sso_region = (
            str(body.get("sso_region") or "").strip()
            or self._settings.cloud_aws_sso_region.strip()
            or "us-east-1"
        )
        identity_center_arn = (
            self._settings.cloud_aws_identity_center_instance_arn.strip()
        )
        default_perm = self._settings.cloud_aws_sso_default_permission_set.strip()
        return BootstrapArtifacts(
            cloud_kind=self.kind,
            auth_method="sso_permission_set",
            external_id="",  # not applicable for the SSO branch
            templates={},
            cli_commands={
                "alphaswarm_cli_login": (
                    f"alphaswarm-cli aws sso-login --start-url {start_url}"
                ),
            },
            iac_links={
                "identity_center_console": (
                    "https://console.aws.amazon.com/singlesignon/home"
                    f"?region={sso_region}#/instances"
                ),
            },
            metadata={
                "sso_start_url": start_url,
                "sso_region": sso_region,
                "identity_center_instance_arn": identity_center_arn,
                "default_permission_set": default_perm,
                "next_step_kind": "device_flow",
            },
            next_step=(
                "Run the OIDC device flow against the start URL, pick "
                "an account + permission set, then return to validate."
            ),
        )

    def _sso_session(
        self,
        *,
        access_token: str,
        account_id: str,
        role_name: str,
        sso_region: str,
        region: str | None,
    ) -> dict[str, Any]:
        """Return ``{sts, iam, ec2}`` clients backed by SSO STS creds.

        Calls ``sso.get_role_credentials`` with the operator's access
        token to mint short-lived STS creds, then wraps them in a
        boto3 Session matching the shape :func:`_assume_role` returns.
        Tests inject ``sso_session_factory(access_token, account_id,
        role_name, region)`` to bypass the real boto3 calls.
        """
        if self._sso_session_factory is not None:
            return self._sso_session_factory(
                access_token, account_id, role_name, region
            )
        boto3 = require_sdk(
            self.kind,
            extras_name="cloud-aws",
            import_target="boto3",
        )
        try:
            sso = boto3.client("sso", region_name=sso_region)
            page = sso.get_role_credentials(
                accessToken=access_token,
                accountId=account_id,
                roleName=role_name,
            )
        except Exception as exc:  # noqa: BLE001
            message = str(exc)
            code = (
                "unauthorized"
                if "Unauthorized" in message or "AccessDenied" in message
                else "upstream_unreachable"
            )
            status_code = 401 if code == "unauthorized" else 502
            raise IntegrationProviderError(
                f"sso.get_role_credentials failed: {message}",
                code=code,
                status_code=status_code,
            ) from exc
        creds = page.get("roleCredentials") or {}
        access_key = str(creds.get("accessKeyId") or "")
        secret_key = str(creds.get("secretAccessKey") or "")
        session_token = str(creds.get("sessionToken") or "")
        if not access_key or not secret_key or not session_token:
            raise IntegrationProviderError(
                "sso.get_role_credentials returned no credentials",
                code="invalid_response",
                status_code=502,
            )
        scoped_session = boto3.Session(
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            aws_session_token=session_token,
            region_name=region,
        )
        return {
            "sts": scoped_session.client("sts"),
            "iam": scoped_session.client("iam"),
            "ec2": scoped_session.client("ec2", region_name=region or "us-east-1"),
        }

    def _assume_role(
        self,
        *,
        role_arn: str,
        external_id: str,
        region: str | None,
    ) -> dict[str, Any]:
        """Return ``{"sts": ..., "iam": ..., "ec2": ...}`` clients.

        When the test factories are provided they are called directly
        — the real ``sts:AssumeRole`` is only hit at runtime.
        """
        if self._sts_factory is not None:
            return {
                "sts": self._sts_factory(role_arn, external_id, region),
                "iam": (self._iam_factory or self._sts_factory)(
                    role_arn, external_id, region
                ),
                "ec2": (self._ec2_factory or self._sts_factory)(
                    role_arn, external_id, region
                ),
            }
        boto3 = require_sdk(
            self.kind,
            extras_name="cloud-aws",
            import_target="boto3",
        )
        try:
            session = boto3.Session(region_name=region) if region else boto3.Session()
            sts_root = session.client("sts")
            assumed = sts_root.assume_role(
                RoleArn=role_arn,
                RoleSessionName="alphaswarm-admin",
                ExternalId=external_id,
            )
        except Exception as exc:  # noqa: BLE001
            message = str(exc)
            code = "unauthorized" if "AccessDenied" in message else "upstream_unreachable"
            status_code = 401 if code == "unauthorized" else 502
            raise IntegrationProviderError(
                f"AssumeRole failed: {message}",
                code=code,
                status_code=status_code,
            ) from exc
        creds = assumed.get("Credentials") or {}
        access_key = str(creds.get("AccessKeyId") or "")
        secret_key = str(creds.get("SecretAccessKey") or "")
        session_token = str(creds.get("SessionToken") or "")
        if not access_key or not secret_key or not session_token:
            raise IntegrationProviderError(
                "AssumeRole returned no credentials",
                code="invalid_response",
                status_code=502,
            )
        scoped_session = boto3.Session(
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            aws_session_token=session_token,
            region_name=region,
        )
        return {
            "sts": scoped_session.client("sts"),
            "iam": scoped_session.client("iam"),
            "ec2": scoped_session.client("ec2", region_name=region or "us-east-1"),
        }


def _compact_json(payload: dict[str, Any]) -> str:
    """JSON serialiser that emits stable spacing for trust-policy templates."""
    import json

    return json.dumps(payload, separators=(", ", ": "), sort_keys=True)


__all__ = ["CloudAwsProvider"]
