"""AuthVerificationWorkflow + InvitationLifecycleWorkflow.

Two layers of coverage:

1. **Registration contract** — the module-level workflows expose the
   handlers BA's hooks need (``mark_verified`` / ``mark_accepted``) and are
   wired into ``restate_services/main.py``.

2. **Lifecycle branch coverage** — the workflow body is driven through a
   hand-rolled ``WorkflowContext`` fake. Each scenario (verified-fast,
   recheck-first, verified-after-reminder, abandoned for verification;
   accepted-fast, terminal-short-circuit, after-reminder, expired for
   invitation) is forced by parameterizing the ``restate.select`` outcomes
   and the DB recheck return values. Side-effects are intercepted via the
   ``ctx.run`` callback — we assert the *order* of action names, never call
   the real DB / network helpers.

Integration tests against a live Neon branch (the cleanup SQL, idempotent
mark_verified resolve, etc.) are gated separately on TEST_DATABASE_URL the
same way ``test_orphan_team_cleanup.py`` is.
"""

from __future__ import annotations

import os
import sys
from contextlib import contextmanager
from datetime import timedelta
from pathlib import Path
from typing import Any
from unittest.mock import patch

import pytest

# auth_workflows lives in restate_services/. Make it importable.
_ROOT = Path(__file__).resolve().parents[2]
_RESTATE_DIR = _ROOT / "restate_services"
if str(_RESTATE_DIR) not in sys.path:
    sys.path.insert(0, str(_RESTATE_DIR))

# db.py builds a SQLAlchemy engine at import time; needs a parseable DSN.
os.environ.setdefault(
    "NEON_DATABASE_URL",
    "postgresql://user:pass@localhost:5432/testdb",
)

import auth_workflows  # noqa: E402  # ty: ignore[unresolved-import]

# ── Fake WorkflowContext ───────────────────────────────────────────────


class _FakePromise:
    """A durable-promise stand-in. Holds resolved state for ``peek``/``resolve``
    idempotency checks."""

    def __init__(self) -> None:
        self.resolved: bool | None = None

    def value(self) -> tuple[str, _FakePromise]:
        return ("PROMISE_VALUE", self)

    async def peek(self) -> bool | None:
        return self.resolved

    async def resolve(self, v: bool) -> None:
        self.resolved = v


class _FakeWorkflowContext:
    """Drives ``auth_workflows`` main bodies in unit tests.

    Parameterized by:
      * ``verified_db_states`` — consumed left-to-right by
        ``recheck_first``/``recheck_final`` ctx.run calls.
      * ``invitation_db_states`` — consumed by ``recheck_status_1``/
        ``recheck_status_2`` ctx.run calls.

    Records every ``ctx.run`` invocation by name in ``run_calls`` and every
    ``ctx.sleep`` duration in ``sleeps``.
    """

    def __init__(
        self,
        key: str,
        *,
        verified_db_states: list[bool] | None = None,
        invitation_db_states: list[str | None] | None = None,
    ) -> None:
        self._key = key
        self._promise = _FakePromise()
        self.run_calls: list[str] = []
        self.sleeps: list[timedelta] = []
        self._verified_db: list[bool] = list(verified_db_states or [])
        self._invitation_db: list[str | None] = list(invitation_db_states or [])

    def key(self) -> str:
        return self._key

    def promise(self, _name: str, *, type_hint: Any = None) -> _FakePromise:
        return self._promise

    def sleep(self, delta: timedelta) -> tuple[str, timedelta]:
        self.sleeps.append(delta)
        return ("SLEEP", delta)

    async def run(self, name: str, fn: Any) -> Any:
        self.run_calls.append(name)
        if name in ("recheck_first", "recheck_final"):
            return self._verified_db.pop(0)
        if name in ("recheck_status_1", "recheck_status_2"):
            return self._invitation_db.pop(0)
        if name == "send_reminder":
            return None
        if name == "hard_cleanup":
            return {"deleted": 1, "reason": "abandoned"}
        # Fallthrough: actually invoke the lambda (defensive — should not happen).
        result = fn()
        if hasattr(result, "__await__"):
            return await result
        return result


@contextmanager
def _patched_select(branches: list[str]) -> Any:
    """Replace ``restate.select`` with a fake that yields the next outcome
    from ``branches`` per call. Each branch is the winning kwarg name —
    ``"verified"`` / ``"accepted"`` (promise wins) or ``"timeout"`` (sleep wins).
    """
    queue = list(branches)

    async def fake_select(**_kws: Any) -> list[Any]:
        assert queue, "fake_select called more than the branches list allowed"
        return [queue.pop(0), None]

    with patch("auth_workflows.restate.select", side_effect=fake_select):
        yield


# ── AuthVerificationWorkflow ───────────────────────────────────────────


@pytest.mark.asyncio
async def test_verify_email_fast_path() -> None:
    """User clicks the link inside the first 48h — return immediately."""
    ctx = _FakeWorkflowContext("user-fast")
    with _patched_select(["verified"]):
        result = await auth_workflows.verify_email(
            ctx, {"email": "fast@example.com", "name": "Fast"}
        )
    assert result == {"status": "verified", "phase": "fast"}
    assert ctx.run_calls == []
    assert ctx.sleeps == [auth_workflows.VERIFY_FIRST_NUDGE]


@pytest.mark.asyncio
async def test_verify_email_recheck_first_path() -> None:
    """Promise was never resolved but the DB shows verified — race recovery."""
    ctx = _FakeWorkflowContext("user-race", verified_db_states=[True])
    with _patched_select(["timeout"]):
        result = await auth_workflows.verify_email(
            ctx, {"email": "race@example.com", "name": "Race"}
        )
    assert result == {"status": "verified", "phase": "recheck_first"}
    assert ctx.run_calls == ["recheck_first"]


@pytest.mark.asyncio
async def test_verify_email_after_reminder_path() -> None:
    """Reminder sent at +48h, user verifies before +7d total."""
    ctx = _FakeWorkflowContext("user-reminded", verified_db_states=[False])
    with _patched_select(["timeout", "verified"]):
        result = await auth_workflows.verify_email(
            ctx, {"email": "remind@example.com", "name": "Reminded"}
        )
    assert result == {"status": "verified", "phase": "after_reminder"}
    assert ctx.run_calls == ["recheck_first", "send_reminder"]
    assert ctx.sleeps == [
        auth_workflows.VERIFY_FIRST_NUDGE,
        auth_workflows.VERIFY_SECOND_NUDGE,
    ]


@pytest.mark.asyncio
async def test_verify_email_abandoned_path() -> None:
    """7 days elapsed, still unverified — hard cleanup fires."""
    ctx = _FakeWorkflowContext("user-abandoned", verified_db_states=[False, False])
    with _patched_select(["timeout", "timeout"]):
        result = await auth_workflows.verify_email(
            ctx, {"email": "gone@example.com", "name": "Gone"}
        )
    assert result["status"] == "abandoned"
    assert ctx.run_calls == [
        "recheck_first",
        "send_reminder",
        "recheck_final",
        "hard_cleanup",
    ]


@pytest.mark.asyncio
async def test_verify_email_recheck_final_path() -> None:
    """Promise unresolved past both nudges, but final DB check finds verified."""
    ctx = _FakeWorkflowContext("user-race-final", verified_db_states=[False, True])
    with _patched_select(["timeout", "timeout"]):
        result = await auth_workflows.verify_email(
            ctx, {"email": "racefinal@example.com", "name": "RaceFinal"}
        )
    assert result == {"status": "verified", "phase": "recheck_final"}
    # No hard_cleanup must fire when the recheck saves us.
    assert "hard_cleanup" not in ctx.run_calls


@pytest.mark.asyncio
async def test_verify_email_rejects_missing_payload() -> None:
    from restate.exceptions import TerminalError

    ctx = _FakeWorkflowContext("user-bad")
    with pytest.raises(TerminalError):
        await auth_workflows.verify_email(ctx, {"email": ""})


@pytest.mark.asyncio
async def test_mark_verified_resolves_when_unresolved() -> None:
    ctx = _FakeWorkflowContext("user-resolve")
    result = await auth_workflows.mark_verified(ctx, {})
    assert result["status"] == "ok"
    assert ctx._promise.resolved is True


@pytest.mark.asyncio
async def test_mark_verified_is_idempotent() -> None:
    ctx = _FakeWorkflowContext("user-twice")
    ctx._promise.resolved = True  # prior resolve
    result = await auth_workflows.mark_verified(ctx, {})
    assert result["status"] == "noop"
    assert result.get("already_resolved") is True


# ── InvitationLifecycleWorkflow ─────────────────────────────────────────


@pytest.mark.asyncio
async def test_invitation_accepted_fast() -> None:
    ctx = _FakeWorkflowContext("inv-fast")
    with _patched_select(["accepted"]):
        result = await auth_workflows.lifecycle(
            ctx,
            {
                "email": "fast@example.com",
                "expiresInSeconds": int(timedelta(hours=48).total_seconds()),
            },
        )
    assert result == {"status": "accepted", "phase": "fast"}


@pytest.mark.asyncio
async def test_invitation_clamps_to_max_wait() -> None:
    """A misconfigured huge ``expiresInSeconds`` MUST be clamped."""
    ctx = _FakeWorkflowContext("inv-clamp")
    with _patched_select(["accepted"]):
        await auth_workflows.lifecycle(
            ctx,
            {
                "email": "huge@example.com",
                "expiresInSeconds": int(timedelta(days=365).total_seconds()),
            },
        )
    # First sleep is half-expiry; total can never exceed INVITE_MAX_WAIT.
    assert ctx.sleeps[0] <= auth_workflows.INVITE_MAX_WAIT


@pytest.mark.asyncio
async def test_invitation_terminal_status_short_circuits() -> None:
    """Cancellation between phase 1 timeout and reminder dispatch short-circuits."""
    ctx = _FakeWorkflowContext("inv-cancelled", invitation_db_states=["cancelled"])
    with _patched_select(["timeout"]):
        result = await auth_workflows.lifecycle(ctx, {"email": "x@y.z", "expiresInSeconds": 3600})
    assert result["status"] == "terminal"
    assert result["last_status"] == "cancelled"
    # No reminder was sent because we short-circuited.
    assert "send_reminder" not in ctx.run_calls


@pytest.mark.asyncio
async def test_invitation_expired_path() -> None:
    ctx = _FakeWorkflowContext("inv-expired", invitation_db_states=["pending", "pending"])
    with _patched_select(["timeout", "timeout"]):
        result = await auth_workflows.lifecycle(
            ctx,
            {
                "email": "exp@example.com",
                "expiresInSeconds": 3600,
                "acceptUrl": "https://app.dineo/accept/abc",
            },
        )
    assert result["status"] == "expired"
    assert ctx.run_calls == [
        "recheck_status_1",
        "send_reminder",
        "recheck_status_2",
    ]


@pytest.mark.asyncio
async def test_invitation_accepted_after_reminder() -> None:
    ctx = _FakeWorkflowContext("inv-after", invitation_db_states=["pending"])
    with _patched_select(["timeout", "accepted"]):
        result = await auth_workflows.lifecycle(
            ctx,
            {
                "email": "after@example.com",
                "expiresInSeconds": 3600,
                "acceptUrl": "https://app.dineo/accept/abc",
            },
        )
    assert result == {"status": "accepted", "phase": "after_reminder"}


@pytest.mark.asyncio
async def test_mark_accepted_resolves_when_unresolved() -> None:
    ctx = _FakeWorkflowContext("inv-resolve")
    result = await auth_workflows.mark_invitation_accepted(ctx, {})
    assert result["status"] == "ok"
    assert ctx._promise.resolved is True


@pytest.mark.asyncio
async def test_mark_accepted_is_idempotent() -> None:
    ctx = _FakeWorkflowContext("inv-twice")
    ctx._promise.resolved = True
    result = await auth_workflows.mark_invitation_accepted(ctx, {})
    assert result["status"] == "noop"


# ── Registration / contract ─────────────────────────────────────────────


def test_workflows_expose_required_handlers() -> None:
    av = auth_workflows.auth_verification_workflow
    inv = auth_workflows.invitation_lifecycle_workflow
    # Each workflow exposes exactly two handlers: the main + the resolver.
    assert "verify_email" in av.handlers
    assert "mark_verified" in av.handlers
    assert "lifecycle" in inv.handlers
    assert "mark_accepted" in inv.handlers


def test_workflows_registered_in_restate_main() -> None:
    """The main.py services list MUST include both workflow handles."""
    main_text = (_RESTATE_DIR / "main.py").read_text(encoding="utf-8")
    assert "auth_verification_workflow," in main_text
    assert "invitation_lifecycle_workflow," in main_text
    # And the import line is present.
    assert (
        "from auth_workflows import auth_verification_workflow, invitation_lifecycle_workflow"
    ) in main_text


def test_timeouts_have_hard_upper_bounds() -> None:
    """Tunables MUST live as named constants — no scattered magic numbers."""
    assert timedelta(hours=48) == auth_workflows.VERIFY_FIRST_NUDGE
    assert timedelta(days=5) == auth_workflows.VERIFY_SECOND_NUDGE
    assert timedelta(days=7) == auth_workflows.VERIFY_ABANDON_AFTER
    assert timedelta(hours=48) == auth_workflows.INVITE_DEFAULT_EXPIRY
    assert timedelta(days=7) == auth_workflows.INVITE_MAX_WAIT


def test_promise_names_match_ba_resolver_expectations() -> None:
    """The promise names BA's hooks resolve MUST match the constants."""
    assert auth_workflows.PROMISE_VERIFIED == "verified"
    assert auth_workflows.PROMISE_INVITE_ACCEPTED == "accepted"
