"""
Mud-server proxy routes for the Axis Descriptor Lab.
This router owns the ``/api/mud/*`` endpoints that proxy authentication,
session, and world-selection requests to the optional Pipe-Works mud server.
The logic here is intentionally thin: validate the local mode, delegate to
the shared mud client, and translate connection/session failures into stable
HTTP responses for the frontend.
"""
from __future__ import annotations
import logging
import re
from typing import Any
from pathlib import Path
import httpx
from fastapi import APIRouter, HTTPException
from fastapi.responses import JSONResponse
from pipeworks_ipc import compute_payload_hash
from app.config import WORLD_ROOT
from app.mud_server_client import (
MudServerConnectionError,
MudServerFeatureUnavailableError,
MudServerSessionExpiredError,
get_mud_mode_config,
get_mud_client,
set_mud_mode,
)
from app.relabel_policy import resolve_axis_label
from app.schema import (
AxisPayload,
MudCompileImagePromptRequest,
MudImagePolicyBundleResponse,
MudLoginRequest,
MudLoginResponse,
MudModeRequest,
MudModeResponse,
MudPipelineBootstrapResponse,
MudPipelineGenerateConditionAxisRequest,
MudPipelinePolicySource,
MudPipelinePolicySourceReference,
MudPipelineResolveRequest,
MudPipelineResolveResponse,
MudPipelineRuntimeOptions,
MudPipelineSelectedBlocks,
MudSelectWorldRequest,
MudSessionResponse,
)
logger = logging.getLogger(__name__)
router = APIRouter(tags=["mud"])
_LAB_ALLOWED_ROLES = frozenset({"admin", "superuser"})
def _is_lab_authorized_role(role: str | None) -> bool:
"""Return True when the mud-server role is allowed to use the lab UI."""
return role in _LAB_ALLOWED_ROLES
def _pipeline_error(status_code: int, *, detail: str, code: str, stage: str) -> JSONResponse:
"""Return a stable pipeline error payload.
Pipeline-build frontend code needs stable machine-readable error fields so
stage status updates are deterministic and independent from free-text
messages. This helper keeps that contract shape consistent across endpoints.
"""
return JSONResponse(
status_code=status_code,
content={
"detail": detail,
"code": code,
"stage": stage,
},
)
def _normalize_condition_axis_payload(payload: dict[str, Any]) -> dict[str, Any]:
"""Normalize canonical mud-server axis payload shape for lab validation.
Mud-server canonical condition-axis responses now return numeric scalar
values for each axis (``axis_name -> float``). Lab internals still use
``AxisPayload`` with ``AxisValue`` objects (``{label, score}``) because
downstream stage-5/6 requests require labels.
This helper accepts both legacy object axes and canonical numeric axes,
deriving labels from the mirrored relabel policy table when needed.
Args:
payload: Raw upstream mud-server response payload.
Returns:
Payload with normalized ``axes`` entries in ``{label, score}`` shape.
Raises:
ValueError: When axis payload shape or scores are invalid.
"""
axes_payload = payload.get("axes")
if not isinstance(axes_payload, dict):
raise ValueError("Field 'axes' must be an object.")
normalized_axes: dict[str, dict[str, Any]] = {}
for axis_name, axis_value in axes_payload.items():
axis_token = str(axis_name).strip()
if not axis_token:
raise ValueError("Axis name must be a non-empty string.")
if isinstance(axis_value, (int, float)):
score = float(axis_value)
if score < 0.0 or score > 1.0:
raise ValueError(f"Axis '{axis_token}' score must be in [0, 1].")
normalized_axes[axis_token] = {
"label": resolve_axis_label(axis_token, score, axis_token),
"score": score,
}
continue
if isinstance(axis_value, dict):
score_value = axis_value.get("score")
if not isinstance(score_value, (int, float)):
raise ValueError(f"Axis '{axis_token}' object value must include numeric 'score'.")
score = float(score_value)
if score < 0.0 or score > 1.0:
raise ValueError(f"Axis '{axis_token}' score must be in [0, 1].")
label_value = axis_value.get("label")
if isinstance(label_value, str) and label_value.strip():
label = label_value.strip()
else:
label = resolve_axis_label(axis_token, score, axis_token)
normalized_axes[axis_token] = {
"label": label,
"score": score,
}
continue
raise ValueError(f"Axis '{axis_token}' must be numeric or an object with numeric 'score'.")
return {
**payload,
"axes": normalized_axes,
}
def _extract_http_error_detail(exc: httpx.HTTPStatusError) -> str:
"""Return the most useful upstream HTTP error detail string."""
response = exc.response
try:
payload = response.json()
except ValueError:
payload = None
if isinstance(payload, dict):
raw_detail = payload.get("detail")
if isinstance(raw_detail, str) and raw_detail.strip():
return raw_detail.strip()
if raw_detail is not None:
return str(raw_detail)
text = response.text.strip()
if text:
return text[:500]
return f"Upstream mud server returned HTTP {response.status_code}."
def _extract_http_error_code_stage(exc: httpx.HTTPStatusError) -> tuple[str | None, str | None]:
"""Extract structured upstream ``code``/``stage`` fields when available."""
try:
payload = exc.response.json()
except ValueError:
return None, None
if not isinstance(payload, dict):
return None, None
code = payload.get("code")
stage = payload.get("stage")
normalized_code = code.strip() if isinstance(code, str) and code.strip() else None
normalized_stage = stage.strip() if isinstance(stage, str) and stage.strip() else None
return normalized_code, normalized_stage
def _as_string_list(value: object) -> list[str]:
"""Normalize a candidate runtime-options value to a string list."""
if not isinstance(value, list):
return []
normalized: list[str] = []
seen: set[str] = set()
for item in value:
text = str(item).strip()
if text and text not in seen:
normalized.append(text)
seen.add(text)
return normalized
def _nested_dict(parent: dict[str, Any], key: str) -> dict[str, Any]:
"""Return nested dict value by key or an empty dict."""
value = parent.get(key)
return value if isinstance(value, dict) else {}
def _parse_inline_token_list(raw: str) -> list[str]:
"""Parse a ``[a, b, c]`` token list to normalized unique strings."""
text = raw.strip()
if text.startswith("[") and text.endswith("]"):
text = text[1:-1]
tokens = [part.strip() for part in text.split(",")]
normalized: list[str] = []
seen: set[str] = set()
for token in tokens:
value = token.strip().strip("'\"")
if value and value not in seen:
normalized.append(value)
seen.add(value)
return normalized
def _extract_species_from_local_policy_registry(
world_id: str, world_root: Path = WORLD_ROOT
) -> list[str]:
"""Fallback species resolver from mirrored local policy files.
Resolution path:
1. ``<world>/policies/manifest.yaml`` -> ``image.registries.species``
2. species registry entries -> ``compatible_species`` values
"""
world_policies_root = world_root / world_id / "policies"
manifest_path = world_policies_root / "manifest.yaml"
if not manifest_path.is_file():
return []
try:
manifest_text = manifest_path.read_text(encoding="utf-8")
except OSError:
return []
registry_rel_path: str | None = None
for line in manifest_text.splitlines():
match = re.match(r"^\s*species:\s*(\S.*)$", line)
if not match:
continue
candidate = match.group(1).strip().strip("'\"")
if candidate.endswith(".yaml") or candidate.endswith(".yml"):
registry_rel_path = candidate
break
if not registry_rel_path:
return []
registry_path = world_root / world_id / registry_rel_path
if not registry_path.is_file():
return []
try:
registry_text = registry_path.read_text(encoding="utf-8")
except OSError:
return []
species: list[str] = []
seen: set[str] = set()
for line in registry_text.splitlines():
match = re.match(r"^\s*compatible_species:\s*(\[.*\])\s*$", line)
if not match:
continue
for value in _parse_inline_token_list(match.group(1)):
if value and value not in seen:
species.append(value)
seen.add(value)
return species
def _extract_runtime_options(
world_id: str,
world_config: dict[str, Any],
) -> MudPipelineRuntimeOptions:
"""Extract runtime option sets from world config with policy-registry fallback."""
image_generation = _nested_dict(world_config, "image_generation")
runtime_options = _nested_dict(image_generation, "runtime_options")
if not runtime_options:
runtime_options = _nested_dict(world_config, "runtime_options")
species = _as_string_list(runtime_options.get("species"))
if not species:
species = _extract_species_from_local_policy_registry(world_id)
gender = _as_string_list(runtime_options.get("gender"))
world_context_tags = _as_string_list(runtime_options.get("world_context_tags"))
occupation_tags = _as_string_list(
runtime_options.get("occupation_tags") or runtime_options.get("occupation_signals")
)
return MudPipelineRuntimeOptions(
species=species,
gender=gender or ["male", "female"],
world_context_tags=world_context_tags,
occupation_tags=occupation_tags,
)
[docs]
@router.get("/mode", response_model=MudModeResponse, summary="Get runtime chat mode")
def mud_mode() -> MudModeResponse:
"""Return the active runtime chat mode and available mode options."""
return MudModeResponse.model_validate(get_mud_mode_config())
[docs]
@router.post("/mode", response_model=MudModeResponse, summary="Set runtime chat mode")
def mud_set_mode(req: MudModeRequest) -> MudModeResponse:
"""Switch the active chat translation mode without restarting the app."""
try:
return MudModeResponse.model_validate(set_mud_mode(req.mode_key, server_url=req.server_url))
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
[docs]
@router.post("/login", response_model=MudLoginResponse, summary="Proxy login to mud server")
def mud_login(req: MudLoginRequest) -> MudLoginResponse:
"""Proxy login to the mud server and store the session in memory."""
client = get_mud_client()
if client is None:
return MudLoginResponse(
authenticated=False,
message="Offline mode active — switch to a server mode to connect.",
)
try:
data = client.login(req.username, req.password)
except MudServerConnectionError:
return MudLoginResponse(authenticated=False, message="Cannot connect to mud server.")
except httpx.HTTPStatusError as exc:
return MudLoginResponse(
authenticated=False,
message=f"Login failed: {exc.response.status_code}",
)
role = data.get("role")
if data.get("success") and not _is_lab_authorized_role(role):
client.logout()
return MudLoginResponse(
authenticated=False,
role=role,
message="This mud server account is not authorised for the Axis Lab. Admin or superuser access is required.",
)
return MudLoginResponse(
authenticated=data.get("success", False),
role=role,
message=data.get("message"),
)
[docs]
@router.post("/logout", summary="Clear mud server session")
def mud_logout() -> dict:
"""Clear the mud server session from memory."""
client = get_mud_client()
if client is not None:
client.logout()
return {"success": True}
[docs]
@router.get("/session", response_model=MudSessionResponse, summary="Auth status")
def mud_session() -> MudSessionResponse:
"""Return current mud server auth status and translation mode."""
mode = get_mud_mode_config()
client = get_mud_client()
if client is None:
return MudSessionResponse(
authenticated=False,
selected_world_id=None,
mode_key=mode["mode_key"],
translation_mode=mode["translation_mode"],
active_server_url=mode["active_server_url"],
)
status = client.session_status()
return MudSessionResponse(
authenticated=status["authenticated"],
role=status.get("role"),
selected_world_id=status.get("selected_world_id"),
mode_key=mode["mode_key"],
translation_mode=mode["translation_mode"],
active_server_url=mode["active_server_url"],
)
[docs]
@router.get("/worlds", summary="List mud server worlds")
def mud_worlds() -> dict:
"""Proxy to ``GET /api/lab/worlds`` on the mud server."""
client = get_mud_client()
if client is None:
raise HTTPException(status_code=503, detail="Standalone mode — no mud server configured.")
try:
worlds = client.list_worlds()
for world in worlds:
logger.debug("mud_worlds: %r", world)
return {"worlds": worlds}
except MudServerSessionExpiredError:
raise HTTPException(
status_code=401,
detail="Mud server session expired. Please log in again.",
)
except httpx.HTTPStatusError as exc:
if exc.response.status_code == 403:
raise HTTPException(
status_code=403,
detail="This mud server account is not authorised for the Axis Lab.",
) from exc
raise
except MudServerConnectionError:
raise HTTPException(status_code=502, detail="Cannot connect to mud server.")
[docs]
@router.get("/world-config/{world_id}", summary="Get world config")
def mud_world_config(world_id: str) -> dict:
"""Proxy to ``GET /api/lab/world-config/{world_id}`` on the mud server."""
client = get_mud_client()
if client is None:
raise HTTPException(status_code=503, detail="Standalone mode — no mud server configured.")
try:
return client.world_config(world_id)
except MudServerSessionExpiredError:
raise HTTPException(
status_code=401,
detail="Mud server session expired. Please log in again.",
)
except httpx.HTTPStatusError as exc:
raise HTTPException(
status_code=exc.response.status_code,
detail=_extract_http_error_detail(exc),
) from exc
except MudServerConnectionError:
raise HTTPException(status_code=502, detail="Cannot connect to mud server.")
[docs]
@router.get("/world-prompts/{world_id}", summary="Get world prompt templates")
def mud_world_prompts(world_id: str) -> dict:
"""Proxy to ``GET /api/lab/world-prompts/{world_id}`` on the mud server."""
client = get_mud_client()
if client is None:
raise HTTPException(status_code=503, detail="Standalone mode — no mud server configured.")
try:
return client.world_prompts(world_id)
except MudServerSessionExpiredError:
raise HTTPException(
status_code=401,
detail="Mud server session expired. Please log in again.",
)
except httpx.HTTPStatusError as exc:
raise HTTPException(
status_code=exc.response.status_code,
detail=_extract_http_error_detail(exc),
) from exc
except MudServerConnectionError:
raise HTTPException(status_code=502, detail="Cannot connect to mud server.")
[docs]
@router.get(
"/world-image-policy-bundle/{world_id}",
response_model=MudImagePolicyBundleResponse,
summary="Get world image policy bundle",
)
def mud_world_image_policy_bundle(world_id: str) -> MudImagePolicyBundleResponse:
"""Proxy to ``GET /api/lab/world-image-policy-bundle/{world_id}`` on the mud server."""
client = get_mud_client()
if client is None:
raise HTTPException(status_code=503, detail="Standalone mode — no mud server configured.")
try:
payload = client.world_image_policy_bundle(world_id)
return MudImagePolicyBundleResponse.model_validate(payload)
except MudServerSessionExpiredError:
raise HTTPException(
status_code=401,
detail="Mud server session expired. Please log in again.",
)
except MudServerConnectionError:
raise HTTPException(status_code=502, detail="Cannot connect to mud server.")
[docs]
@router.get(
"/pipeline-build/bootstrap/{world_id}",
response_model=MudPipelineBootstrapResponse,
summary="Aggregate canonical pipeline bootstrap metadata for one world",
)
def mud_pipeline_build_bootstrap(world_id: str) -> MudPipelineBootstrapResponse | JSONResponse:
"""Return stage-1/2 metadata in one payload for Pipeline Build.
This endpoint intentionally aggregates multiple mud-server calls so the
frontend can avoid orchestration race conditions across session/world/policy
lookups.
"""
client = get_mud_client()
if client is None:
return _pipeline_error(
503,
detail="Standalone mode — no mud server configured.",
code="PIPELINE_MODE_UNAVAILABLE",
stage="session_world",
)
try:
worlds = client.list_worlds()
world_row = next(
(row for row in worlds if str(row.get("world_id") or "") == world_id),
None,
)
if world_row is None:
return _pipeline_error(
404,
detail=f"World {world_id!r} is not available for the active session.",
code="PIPELINE_WORLD_NOT_FOUND",
stage="session_world",
)
world_config = client.world_config(world_id)
bundle_payload = client.world_image_policy_bundle(world_id)
policy_bundle = MudImagePolicyBundleResponse.model_validate(bundle_payload)
runtime_options = _extract_runtime_options(world_id, world_config)
world_summary = {
"world_row": world_row,
"world_config": world_config,
}
return MudPipelineBootstrapResponse(
world_id=world_id,
world_summary=world_summary,
policy_bundle=policy_bundle,
policy_source=MudPipelinePolicySource(
source_kind="mud_server_canonical",
source_label="Mud server canonical",
source_path=None,
reference=MudPipelinePolicySourceReference(
world_id=world_id,
policy_bundle_id=policy_bundle.policy_bundle_id,
policy_bundle_version=policy_bundle.policy_bundle_version,
policy_hash=policy_bundle.policy_hash,
),
),
runtime_options=runtime_options,
required_fields=list(policy_bundle.required_runtime_inputs),
)
except MudServerSessionExpiredError:
return _pipeline_error(
401,
detail="Mud server session expired. Please log in again.",
code="PIPELINE_AUTH_REQUIRED",
stage="session_world",
)
except MudServerConnectionError:
return _pipeline_error(
502,
detail="Cannot connect to mud server.",
code="PIPELINE_UPSTREAM_UNAVAILABLE",
stage="session_world",
)
except httpx.HTTPStatusError as exc:
status_code = exc.response.status_code
detail = _extract_http_error_detail(exc)
stage = "policy_bundle" if "policy" in detail.lower() else "session_world"
code = (
"PIPELINE_POLICY_UNAVAILABLE"
if stage == "policy_bundle"
else "PIPELINE_WORLD_LOAD_FAILED"
)
return _pipeline_error(status_code, detail=detail, code=code, stage=stage)
except ValueError as exc:
return _pipeline_error(
502,
detail=f"Invalid upstream payload for pipeline bootstrap: {exc}",
code="PIPELINE_UPSTREAM_INVALID",
stage="policy_bundle",
)
[docs]
@router.post(
"/pipeline-build/resolve-image-selection",
response_model=MudPipelineResolveResponse,
summary="Resolve selected policy components for stages 5-7 without prompt output",
)
def mud_pipeline_build_resolve_image_selection(
req: MudPipelineResolveRequest,
) -> MudPipelineResolveResponse | JSONResponse:
"""Return canonical selection metadata and deterministic pre-compile hashes.
The route delegates selection to canonical mud-server compile logic and then
strips final prompt text, returning only stage-5/6/7 metadata for the UI.
"""
client = get_mud_client()
if client is None:
return _pipeline_error(
503,
detail="Standalone mode — no mud server configured.",
code="PIPELINE_MODE_UNAVAILABLE",
stage="session_world",
)
stage = "policy_bundle"
try:
bundle_payload = client.world_image_policy_bundle(req.world_id)
policy_bundle = MudImagePolicyBundleResponse.model_validate(bundle_payload)
stage = "compile_output"
axes_payload = {
axis_name: axis_value.model_dump() for axis_name, axis_value in req.axes.items()
}
compile_payload = client.compile_image_prompt(
world_id=req.world_id,
species=req.species,
gender=req.gender,
axes=axes_payload,
world_context=req.world_context,
occupation_signals=req.occupation_signals,
)
selected_species_block = compile_payload.get("selected_species_block_id")
selected_clothing_slots = compile_payload.get("selected_clothing_slot_ids")
if not isinstance(selected_clothing_slots, dict):
selected_clothing_slots = {}
composition_order = compile_payload.get("composition_order")
if not isinstance(composition_order, list):
composition_order = list(policy_bundle.composition_order)
policy_hash = str(compile_payload.get("policy_hash") or policy_bundle.policy_hash)
axis_hash_raw = compile_payload.get("axis_hash")
if not isinstance(axis_hash_raw, str) or not axis_hash_raw.strip():
return _pipeline_error(
502,
detail="Upstream compile response missing axis_hash.",
code="PIPELINE_UPSTREAM_INVALID",
stage="compile_output",
)
axis_hash = axis_hash_raw.strip()
compiler_input_hash = compute_payload_hash(
{
"world_id": req.world_id,
"identity": {"species": req.species, "gender": req.gender},
"axes": axes_payload,
"world_context": list(req.world_context),
"occupation_signals": list(req.occupation_signals),
"composition_order": composition_order,
"policy_hash": policy_hash,
"selected_blocks": {
"species_canon_block": selected_species_block,
"clothing_block": selected_clothing_slots,
},
"descriptor_layer": compile_payload.get("selected_descriptor_layer_id"),
"tone_profile": compile_payload.get("selected_tone_profile_id"),
}
)
return MudPipelineResolveResponse(
selected_blocks=MudPipelineSelectedBlocks(
species_canon_block=str(selected_species_block) if selected_species_block else None,
clothing_block={key: value for key, value in selected_clothing_slots.items()},
),
descriptor_layer=(
str(compile_payload.get("selected_descriptor_layer_id"))
if compile_payload.get("selected_descriptor_layer_id") is not None
else None
),
tone_profile=(
str(compile_payload.get("selected_tone_profile_id"))
if compile_payload.get("selected_tone_profile_id") is not None
else None
),
composition_order=[str(part) for part in composition_order],
policy_hash=policy_hash,
axis_hash=axis_hash,
compiler_input_hash=compiler_input_hash,
)
except MudServerSessionExpiredError:
return _pipeline_error(
401,
detail="Mud server session expired. Please log in again.",
code="PIPELINE_AUTH_REQUIRED",
stage="session_world",
)
except MudServerConnectionError:
return _pipeline_error(
502,
detail="Cannot connect to mud server.",
code="PIPELINE_UPSTREAM_UNAVAILABLE",
stage=stage,
)
except httpx.HTTPStatusError as exc:
return _pipeline_error(
exc.response.status_code,
detail=_extract_http_error_detail(exc),
code="PIPELINE_UPSTREAM_HTTP_ERROR",
stage=stage,
)
except ValueError as exc:
return _pipeline_error(
502,
detail=f"Invalid upstream payload for pipeline resolve: {exc}",
code="PIPELINE_UPSTREAM_INVALID",
stage=stage,
)
[docs]
@router.post(
"/pipeline-build/generate-condition-axis",
response_model=AxisPayload,
summary="Generate canonical condition-axis payload for Pipeline Build stage 4",
)
def mud_pipeline_build_generate_condition_axis(
req: MudPipelineGenerateConditionAxisRequest,
) -> AxisPayload | JSONResponse:
"""Generate canonical condition-axis payload from mud-server policy rules.
The endpoint keeps Stage 4 canonical in production mode by delegating axis
generation to mud-server APIs rather than local preset/manual editing.
"""
client = get_mud_client()
if client is None:
return _pipeline_error(
503,
detail="Standalone mode — no mud server configured.",
code="PIPELINE_MODE_UNAVAILABLE",
stage="session_world",
)
try:
payload = client.generate_condition_axis_payload(
world_id=req.world_id,
seed=req.seed,
species=req.inputs.entity.species,
gender=req.inputs.entity.identity.gender,
)
normalized_payload = _normalize_condition_axis_payload(payload)
return AxisPayload.model_validate(normalized_payload)
except MudServerSessionExpiredError:
return _pipeline_error(
401,
detail="Mud server session expired. Please log in again.",
code="PIPELINE_AUTH_REQUIRED",
stage="session_world",
)
except MudServerConnectionError:
return _pipeline_error(
502,
detail="Cannot connect to mud server.",
code="PIPELINE_UPSTREAM_UNAVAILABLE",
stage="axis_input",
)
except MudServerFeatureUnavailableError as exc:
return _pipeline_error(
501,
detail=(
"Active mud server does not expose canonical condition-axis generation "
f"for Pipeline Build Stage 4: {exc}"
),
code="PIPELINE_UPSTREAM_UNSUPPORTED",
stage="axis_input",
)
except httpx.HTTPStatusError as exc:
upstream_code, upstream_stage = _extract_http_error_code_stage(exc)
if isinstance(upstream_code, str) and upstream_code.startswith("CONDITION_AXIS_"):
return _pipeline_error(
exc.response.status_code,
detail=_extract_http_error_detail(exc),
code=upstream_code,
stage=upstream_stage or "axis_input",
)
return _pipeline_error(
exc.response.status_code,
detail=_extract_http_error_detail(exc),
code="PIPELINE_UPSTREAM_HTTP_ERROR",
stage="axis_input",
)
except ValueError as exc:
return _pipeline_error(
502,
detail=f"Invalid upstream payload for axis generation: {exc}",
code="PIPELINE_UPSTREAM_INVALID",
stage="axis_input",
)
[docs]
@router.post("/compile-image-prompt", summary="Compile canonical image prompt via mud server")
def mud_compile_image_prompt(req: MudCompileImagePromptRequest) -> dict:
"""Proxy ``POST /api/lab/compile-image-prompt`` using the active mud session."""
client = get_mud_client()
if client is None:
raise HTTPException(status_code=503, detail="Standalone mode — no mud server configured.")
try:
axes_payload = {
axis_name: axis_value.model_dump() for axis_name, axis_value in req.axes.items()
}
return client.compile_image_prompt(
world_id=req.world_id,
species=req.species,
gender=req.gender,
axes=axes_payload,
world_context=req.world_context,
occupation_signals=req.occupation_signals,
model_id=req.model_id,
aspect_ratio=req.aspect_ratio,
seed=req.seed,
)
except MudServerSessionExpiredError:
raise HTTPException(
status_code=401,
detail="Mud server session expired. Please log in again.",
)
except httpx.HTTPStatusError as exc:
raise HTTPException(
status_code=exc.response.status_code, detail=_extract_http_error_detail(exc)
) from exc
except MudServerConnectionError:
raise HTTPException(status_code=502, detail="Cannot connect to mud server.")
[docs]
@router.post("/select-world", summary="Select world for translation")
def mud_select_world(req: MudSelectWorldRequest) -> dict:
"""Store the selected world_id in the MudServerClient's memory."""
client = get_mud_client()
if client is None:
raise HTTPException(status_code=503, detail="Standalone mode — no mud server configured.")
client.select_world(req.world_id)
return {"success": True, "world_id": req.world_id}