Source code for app.chat_renderer

"""
app/chat_renderer.py
-----------------------------------------------------------------------------
Unified synchronous HTTP client for the Ollama API.

Provides three interfaces:
  - :meth:`ChatRenderer.render` — fire-and-forget chat call that swallows
    errors and returns ``None`` on any failure (used by the Chat Translation
    page, matching mud_server behaviour).
  - :meth:`ChatRenderer.generate` — same transport but raises on failure
    (used by the Character Description page, where the route handler maps
    each exception to an HTTPException).
  - :meth:`ChatRenderer.list_models` — static helper that queries
    ``/api/tags`` and returns a sorted list of pulled model names.

Both generation methods use Ollama's ``/api/chat`` endpoint with the
OpenAI-compatible messages array (system + user roles), which is what the
production MUD translation layer also uses.  This ensures that any
model-behaviour differences between ``/api/generate`` (flat prompt) and
``/api/chat`` (messages) are visible during lab testing.

Connection pooling
------------------
HTTP connections are reused across requests via a module-level client pool
keyed by ``(host, connect_timeout, read_timeout)``.  This avoids the
overhead of a fresh TCP handshake + TLS negotiation on every call and
prevents cold-start failures when Character B fires immediately after
Character A.  Call :func:`close_all_clients` at shutdown to release
pooled connections cleanly.

Sync rationale
--------------
The lab's route handlers are synchronous (FastAPI runs them in a
thread-pool executor), so a blocking httpx call here does not stall the
async event loop.  Using an async client would require ``asyncio.run()`` or
restructuring the handler, neither of which is worth the complexity for a
single-user tool.

Request structure sent to Ollama
---------------------------------
.. code-block:: json

    {
      "model": "<model-tag>",
      "stream": false,
      "keep_alive": "5m",
      "messages": [
        {"role": "system", "content": "<rendered system prompt>"},
        {"role": "user",   "content": "<ooc message>"}
      ],
      "options": {
        "temperature": <float>,
        "num_predict": <int>,
        "seed": <int>  // only when seed is not None
      }
    }

The ``stream: false`` flag is required to get a single JSON response body
rather than a series of newline-delimited chunks.

The ``keep_alive`` field tells Ollama how long to keep the model loaded in
memory after responding (default ``"5m"``).  This prevents cold-start
latency on back-to-back requests (e.g. Character A then Character B).

Environment variables
---------------------
OLLAMA_HOST – Base URL of the Ollama server (default: http://localhost:11434).
              Read once at import time so the value is consistent for the
              lifetime of the process.
"""

from __future__ import annotations

import logging
import os
import threading

import httpx
from dotenv import load_dotenv

# Runs before the os.getenv() call below so that values supplied through a
# .env file (if python-dotenv finds one) are visible when OLLAMA_HOST is read.
load_dotenv()

# Module-level logger used by the pool helpers and ChatRenderer in this file.
logger = logging.getLogger(__name__)

# Base URL of the Ollama server; evaluated once, at import time, falling back
# to the local default when the OLLAMA_HOST environment variable is unset.
# Strip any trailing slash so we can safely append paths.
# Exported so main.py can pass the default to the template.
OLLAMA_HOST: str = os.getenv("OLLAMA_HOST", "http://localhost:11434").rstrip("/")

# ── Shared client pool ────────────────────────────────────────────────────────
# Keyed by (host, connect_timeout, read_timeout) so that requests to the same
# Ollama instance reuse TCP connections (HTTP Keep-Alive).  Thread-safe via a
# simple lock — contention is negligible for a single-user tool.
#
# Timeout components are stored in the key exactly as configured.  A component
# of ``None`` (no timeout) or ``0`` must not be folded into a default value,
# otherwise it would alias the pool entry of a client that was built with a
# different, finite timeout.

_client_pool: dict[tuple[str, float | None, float | None], httpx.Client] = {}
_pool_lock = threading.Lock()


def _get_client(host: str, timeout: httpx.Timeout) -> httpx.Client:
    """Return a shared ``httpx.Client`` for the given host and timeout.

    Creates a new client on first access; subsequent calls with the same
    ``(host, timeout.connect, timeout.read)`` combination return the cached
    instance.

    Args:
        host: Ollama server base URL.  Only used as part of the pool key; the
            client itself is not bound to a base URL.
        timeout: Timeout configuration applied to a newly created client.
            Its ``connect`` and ``read`` components form the rest of the key.

    Returns:
        The pooled client for this key.  The lookup and the creation happen
        under ``_pool_lock`` so two threads never create duplicate clients
        for the same key.
    """
    # Use the raw timeout values: substituting defaults for None/0 would make
    # distinct configurations share (and misuse) one client.
    key = (host, timeout.connect, timeout.read)
    with _pool_lock:
        client = _client_pool.get(key)
        if client is None:
            client = httpx.Client(timeout=timeout)
            _client_pool[key] = client
        return client


def close_all_clients() -> None:
    """Close all pooled HTTP clients and clear the pool.

    Call this during application shutdown to release TCP connections cleanly.

    Closing is best-effort: a client that fails to close is logged at debug
    level and skipped so the remaining clients are still closed.  The pool is
    always emptied, even if the loop is interrupted, so a closed client can
    never be handed out again by a later pool lookup.
    """
    with _pool_lock:
        try:
            for key, client in _client_pool.items():
                try:
                    client.close()
                except Exception:
                    # Best-effort shutdown cleanup: record the failure for
                    # debugging but keep closing the other clients.
                    logger.debug(
                        "ChatRenderer: failed to close pooled client for %s",
                        key[0],
                        exc_info=True,
                    )
        finally:
            _client_pool.clear()
[docs] class ChatRenderer: """Synchronous Ollama client that calls the ``/api/chat`` endpoint. Requests reuse a shared ``httpx.Client`` from a module-level pool keyed by ``(host, connect_timeout, read_timeout)``. This enables HTTP Keep-Alive across calls and avoids cold-start latency when multiple requests target the same Ollama instance in quick succession. Args: host: Ollama server base URL, e.g. ``'http://localhost:11434'``. A trailing slash is stripped automatically. ``/api/chat`` is appended internally. model: Ollama model tag, e.g. ``'gemma2:2b'``. Must match a model that has been pulled in Ollama. timeout_seconds: HTTP *read* timeout in seconds. Applies to waiting for the model to finish generating. Defaults to 120 s to accommodate slow hardware or large models. The *connect* timeout is always 10 s. temperature: Sampling temperature forwarded to Ollama's ``options.temperature``. 0.0 is deterministic (greedy decoding); higher values add randomness. seed: Optional integer forwarded to Ollama's ``options.seed``. When provided, Ollama uses this as the random seed for token sampling, which makes the output reproducible for the same input. When ``None``, the ``seed`` key is omitted from the options object and Ollama chooses its own seed. max_tokens: ``num_predict`` ceiling for the generation. Ollama stops after this many tokens even if the model would continue. keep_alive: Duration string telling Ollama how long to keep the model loaded in memory after responding (e.g. ``"5m"``, ``"1h"``, ``"0"`` to unload immediately). Defaults to ``"5m"`` to prevent cold-start latency on back-to-back requests. """
[docs] def __init__( self, *, host: str, model: str, timeout_seconds: float = 120.0, temperature: float = 0.7, seed: int | None = None, max_tokens: int = 128, keep_alive: str = "5m", ) -> None: self._host = host.rstrip("/") self._endpoint = f"{self._host}/api/chat" self._model = model # httpx.Timeout(default, connect=...) sets read/write/pool to `default` # while overriding just the connect timeout. self._timeout = httpx.Timeout(timeout_seconds, connect=10.0) self._temperature = temperature self._seed = seed self._max_tokens = max_tokens self._keep_alive = keep_alive
def _build_payload(self, system_prompt: str, user_message: str) -> dict: options: dict = { "temperature": self._temperature, "num_predict": self._max_tokens, } # Only include seed in options when explicitly provided — Ollama's # default behaviour (random seed) is preserved when seed is None. if self._seed is not None: options["seed"] = self._seed return { "model": self._model, "stream": False, # single JSON response, not a stream of chunks "keep_alive": self._keep_alive, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_message}, ], "options": options, }
[docs] def render(self, system_prompt: str, user_message: str) -> str | None: """POST to Ollama /api/chat and return the raw response content. Builds the request payload, sends it to ``self._endpoint``, and extracts the model's response from ``data["message"]["content"]``. The ``system_prompt`` and ``user_message`` are sent as separate entries in the ``messages`` array using the ``"system"`` and ``"user"`` roles respectively. This matches the format used by the production MUD translation layer. No content-level validation is performed here; that is handled downstream by :class:`~app.output_validator.OutputValidator`. Args: system_prompt: Fully-rendered system prompt text. All ``{{placeholder}}`` variables should already have been substituted before this call. user_message: The OOC message (user turn). Sent verbatim as the ``"user"`` role message. Returns: The stripped ``message.content`` string on success, or ``None`` on any of the following failure conditions: - **TimeoutException**: Ollama took longer than ``timeout_seconds`` to respond. - **ConnectError**: Ollama is not reachable at the configured endpoint (wrong host, not running, firewall). - **Any other exception**: Unexpected HTTP or JSON parsing error. All failure paths log a warning/error via the module logger. ``None`` return causes the endpoint to report ``"fallback.api_error"`` in the translation result. 
""" payload = self._build_payload(system_prompt, user_message) client = _get_client(self._host, self._timeout) try: response = client.post(self._endpoint, json=payload) response.raise_for_status() data = response.json() # Ollama /api/chat response shape: # {"model": ..., "message": {"role": "assistant", "content": "..."}, ...} content = data.get("message", {}).get("content", "").strip() return content or None except httpx.TimeoutException: logger.warning( "ChatRenderer: request timed out (endpoint=%s, read_timeout=%.0fs)", self._endpoint, self._timeout.read, ) return None except httpx.ConnectError: logger.warning("ChatRenderer: cannot connect to Ollama at %s", self._endpoint) return None except Exception as exc: logger.error("ChatRenderer: request failed: %s", exc) return None
[docs] def generate(self, system_prompt: str, user_message: str) -> tuple[str, dict]: """POST to /api/chat; return (text, usage). Raises on any failure. Same payload structure as :meth:`render`, but exceptions propagate to the caller instead of being caught. This matches the contract expected by the ``/api/generate`` route handler, which maps each exception type to an HTTPException. Args: system_prompt: Fully-rendered system prompt text. user_message: The user turn (axis JSON string for description generation, or OOC message for chat). Returns: tuple[str, dict]: - ``str`` — Stripped ``message.content``. - ``dict`` — ``{"prompt_eval_count": int|None, "eval_count": int|None}`` Raises: httpx.HTTPStatusError: Non-2xx response from Ollama. httpx.TimeoutException: Request timed out. ValueError: Response is missing the ``"message"`` key. """ payload = self._build_payload(system_prompt, user_message) client = _get_client(self._host, self._timeout) response = client.post(self._endpoint, json=payload) response.raise_for_status() data = response.json() if "message" not in data: raise ValueError( f"Ollama /api/chat response for model '{self._model}' is missing " f"the 'message' key. Got keys: {list(data.keys())}" ) text = data["message"].get("content", "").strip() usage = { "prompt_eval_count": data.get("prompt_eval_count"), "eval_count": data.get("eval_count"), } return text, usage
[docs] @staticmethod def list_models(host: str | None = None) -> list[str]: """Sorted model names from /api/tags. Returns [] on any error. Args: host: Optional Ollama server base URL. When ``None``, the module-level :data:`OLLAMA_HOST` constant is used. Returns: Sorted list of model name strings, e.g. ``["gemma2:2b", "llama3.2:1b"]``. Returns an empty list if Ollama is unreachable or returns an error, allowing the frontend to degrade gracefully. """ base = host.rstrip("/") if host else OLLAMA_HOST url = f"{base}/api/tags" timeout = httpx.Timeout(connect=3.0, read=5.0, write=3.0, pool=3.0) try: with httpx.Client(timeout=timeout) as client: response = client.get(url) response.raise_for_status() data = response.json() names = [m["name"] for m in data.get("models", []) if "name" in m] return sorted(names) except Exception as exc: logger.warning("Failed to list Ollama models: %s: %s", type(exc).__name__, exc) return []