from __future__ import annotations

import os
from collections.abc import AsyncIterator
from datetime import UTC, datetime
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse

import pytest
import pytest_asyncio
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncConnection, create_async_engine

# Query under test: counts the "member" rows in which user :uid has the
# role 'owner' and no other row for the same organization has the role
# 'owner' with a different "userId". The count is cast to int in SQL.
# NOTE(review): this counts member rows, not distinct organizations; the two
# agree only if a user has at most one owner row per organization - confirm
# against the real schema's uniqueness constraints.
SOLE_OWNER_CHECK = text(
    """
    SELECT COUNT(*)::int AS c
    FROM "member" m
    WHERE m."userId" = :uid
      AND m."role" = 'owner'
      AND NOT EXISTS (
        SELECT 1 FROM "member" m2
        WHERE m2."organizationId" = m."organizationId"
          AND m2."role" = 'owner'
          AND m2."userId" <> m."userId"
      )
    """
)

# Connection URL for the database used by these tests; an unset variable
# yields the empty string.
TEST_DATABASE_URL = os.getenv("TEST_DATABASE_URL", "")

# Module-level marks: every test here is tagged "integration" and is skipped
# when TEST_DATABASE_URL is empty.
pytestmark = [
    pytest.mark.integration,
    pytest.mark.skipif(not TEST_DATABASE_URL, reason="TEST_DATABASE_URL not provided"),
]


def _build_async_db_url(raw: str) -> str:
    if raw.startswith("postgresql+asyncpg://"):
        url = raw
    elif raw.startswith("postgresql://"):
        url = raw.replace("postgresql://", "postgresql+asyncpg://", 1)
    elif raw.startswith("postgres://"):
        url = raw.replace("postgres://", "postgresql+asyncpg://", 1)
    else:
        return raw

    parsed = urlparse(url)
    qs = parse_qs(parsed.query, keep_blank_values=True)
    qs.pop("sslmode", None)
    qs.pop("channel_binding", None)
    return urlunparse(parsed._replace(query=urlencode(qs, doseq=True)))


async def _count_sole_owner_orgs(conn: AsyncConnection, user_id: str) -> int:
    """Execute ``SOLE_OWNER_CHECK`` for *user_id* and return its single value as an int."""
    bind_params = {"uid": user_id}
    outcome = await conn.execute(SOLE_OWNER_CHECK, bind_params)
    # scalar_one() is called on the result of the single-column COUNT query.
    count = outcome.scalar_one()
    return int(count)


async def make_user(conn: AsyncConnection, user_id: str, *, email: str | None = None) -> str:
    """Insert a row into the ``"user"`` table and return *user_id*.

    When *email* is missing or empty, ``<user_id>@example.com`` is used. The
    row is marked as email-verified, and both timestamps are set to the same
    current UTC instant.
    """
    timestamp = datetime.now(UTC)
    address = email if email else f"{user_id}@example.com"
    insert_user = text(
        'INSERT INTO "user" (id, email, "emailVerified", "createdAt", "updatedAt") '
        "VALUES (:id, :email, :verified, :created_at, :updated_at)"
    )
    row = {
        "id": user_id,
        "email": address,
        "verified": True,
        "created_at": timestamp,
        "updated_at": timestamp,
    }
    await conn.execute(insert_user, row)
    return user_id


async def make_org(conn: AsyncConnection, org_id: str, *, name: str | None = None) -> str:
    """Insert a row into the ``organization`` table and return *org_id*.

    The slug is always *org_id*; the name falls back to *org_id* when *name*
    is missing or empty. ``logo`` and ``metadata`` are stored as NULL.
    """
    insert_org = text(
        'INSERT INTO organization (id, name, slug, logo, metadata, "createdAt") '
        "VALUES (:id, :name, :slug, NULL, NULL, :created_at)"
    )
    display_name = name if name else org_id
    row = {
        "id": org_id,
        "name": display_name,
        "slug": org_id,
        "created_at": datetime.now(UTC),
    }
    await conn.execute(insert_org, row)
    return org_id


async def make_member(
    conn: AsyncConnection,
    member_id: str,
    *,
    user_id: str,
    organization_id: str,
    role: str,
) -> str:
    """Insert a row into the ``"member"`` table and return *member_id*.

    The row links *user_id* to *organization_id* with the given *role* and is
    stamped with the current UTC time.
    """
    insert_member = text(
        'INSERT INTO "member" (id, "userId", "organizationId", role, "createdAt") '
        "VALUES (:id, :user_id, :organization_id, :role, :created_at)"
    )
    row = {
        "id": member_id,
        "user_id": user_id,
        "organization_id": organization_id,
        "role": role,
        "created_at": datetime.now(UTC),
    }
    await conn.execute(insert_member, row)
    return member_id


@pytest_asyncio.fixture
async def auth_db() -> AsyncIterator[AsyncConnection]:
    """Yield a connection with temporary ``"user"``, ``organization`` and ``"member"`` tables.

    A fresh engine is created for each test from ``TEST_DATABASE_URL``. The
    tables are created with ``CREATE TEMP TABLE`` on the yielded connection,
    so tests must run their statements on that same connection.

    The engine is disposed in a ``finally`` clause so its connection pool is
    released even when connecting, table creation, or connection teardown
    raises.
    """
    # Order matters: "member" has foreign keys to both "user" and organization.
    ddl_statements = (
        'CREATE TEMP TABLE "user" ('
        "id text PRIMARY KEY, "
        "email text NOT NULL, "
        '"emailVerified" boolean NOT NULL, '
        '"createdAt" timestamptz NOT NULL, '
        '"updatedAt" timestamptz NOT NULL)',
        "CREATE TEMP TABLE organization ("
        "id text PRIMARY KEY, "
        "name text NOT NULL, "
        "slug text NOT NULL, "
        "logo text NULL, "
        "metadata jsonb NULL, "
        '"createdAt" timestamptz NOT NULL)',
        'CREATE TEMP TABLE "member" ('
        "id text PRIMARY KEY, "
        '"userId" text NOT NULL REFERENCES "user" (id), '
        '"organizationId" text NOT NULL REFERENCES organization (id), '
        "role text NOT NULL, "
        '"createdAt" timestamptz NOT NULL)',
    )

    engine = create_async_engine(_build_async_db_url(TEST_DATABASE_URL))
    try:
        async with engine.connect() as conn:
            for ddl in ddl_statements:
                await conn.execute(text(ddl))
            yield conn
    finally:
        # Always release the pool, including on setup or teardown failure.
        await engine.dispose()


@pytest.mark.asyncio
async def test_counts_single_sole_owner_organization(auth_db: AsyncConnection) -> None:
    """A user who is the only owner of one organization yields a count of 1."""
    owner_id = await make_user(auth_db, "user-1")
    org_id = await make_org(auth_db, "org-1")
    await make_member(auth_db, "member-1", user_id=owner_id, organization_id=org_id, role="owner")

    sole_owner_count = await _count_sole_owner_orgs(auth_db, owner_id)
    assert sole_owner_count == 1


@pytest.mark.asyncio
async def test_counts_multiple_sole_owner_organizations(auth_db: AsyncConnection) -> None:
    """A user who is the only owner of two organizations yields a count of 2."""
    owner_id = await make_user(auth_db, "user-1")

    # Create both organizations first, then one owner membership in each.
    for org_id in ("org-1", "org-2"):
        await make_org(auth_db, org_id)
    for member_id, org_id in (("member-1", "org-1"), ("member-2", "org-2")):
        await make_member(
            auth_db,
            member_id,
            user_id=owner_id,
            organization_id=org_id,
            role="owner",
        )

    sole_owner_count = await _count_sole_owner_orgs(auth_db, owner_id)
    assert sole_owner_count == 2


@pytest.mark.asyncio
async def test_excludes_co_owned_organizations(auth_db: AsyncConnection) -> None:
    """An organization with a second owner is not counted for the first owner."""
    for user_id in ("user-1", "user-2"):
        await make_user(auth_db, user_id)
    org_id = await make_org(auth_db, "org-1")

    # Both users own the same organization.
    for member_id, user_id in (("member-1", "user-1"), ("member-2", "user-2")):
        await make_member(
            auth_db,
            member_id,
            user_id=user_id,
            organization_id=org_id,
            role="owner",
        )

    sole_owner_count = await _count_sole_owner_orgs(auth_db, "user-1")
    assert sole_owner_count == 0


@pytest.mark.asyncio
async def test_ignores_non_owner_memberships(auth_db: AsyncConnection) -> None:
    """A membership whose role is not 'owner' does not contribute to the count."""
    member_user_id = await make_user(auth_db, "user-1")
    org_id = await make_org(auth_db, "org-1")
    await make_member(
        auth_db,
        "member-1",
        user_id=member_user_id,
        organization_id=org_id,
        role="admin",
    )

    sole_owner_count = await _count_sole_owner_orgs(auth_db, member_user_id)
    assert sole_owner_count == 0
