Source code for gpp_client.rest.client

"""
REST API client for non-GraphQL requests.
"""

__all__ = ["RESTClient"]

import asyncio
import gzip
import logging
import ssl

import aiohttp
import certifi

logger = logging.getLogger(__name__)


[docs] class RESTClient: """ REST API client to non-GraphQL requests that help with the function of managers and coordinators. Parameters ---------- base_url : str Base URL of the REST API. Derived from GPPClient base_url. gpp_token : str GPP token to authenticate against the REST API. Same as GPPClient. timeout : float Timeout for REST API requests in seconds. """ _DEFAULT_TIMEOUT = 30.0 # Seconds. def __init__( self, base_url: str, gpp_token: str, timeout: float = _DEFAULT_TIMEOUT ) -> None: self.base_url = base_url self.gpp_token = gpp_token self._timeout = timeout self._session: aiohttp.ClientSession | None = None self._lock = asyncio.Lock() def _resolve_headers(self) -> dict[str, str]: """ Resolve the headers for the REST API requests. Returns ------- dict[str, str] Headers dictionary. """ headers = { "Content-Type": "text/plain", "Authorization": f"Bearer {self.gpp_token}", } return headers def _create_session(self) -> aiohttp.ClientSession: """ Create a new aiohttp client session. Returns ------- aiohttp.ClientSession Configured aiohttp client session. """ ssl_context = ssl.create_default_context(cafile=certifi.where()) connector = aiohttp.TCPConnector(ssl=ssl_context) return aiohttp.ClientSession( base_url=self.base_url, timeout=aiohttp.ClientTimeout(total=self._timeout), connector=connector, headers=self._resolve_headers(), )
[docs] async def get_session(self) -> aiohttp.ClientSession: """ Get or create an ``aiohttp.ClientSession`` for making REST API requests. Returns ------- aiohttp.ClientSession The aiohttp ClientSession instance. """ async with self._lock: if self._session is None or self._session.closed: self._session = self._create_session() return self._session
[docs] async def close(self) -> None: """ Close the session if it exists and is not already closed. """ if self._session and not self._session.closed: await self._session.close()
async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close() async def _get_atom_digests( self, observation_ids: list[str], accept_gzip: bool = True ) -> str: """ Request atom digests for the given observation IDs. """ headers = {} if accept_gzip: headers["Accept-Encoding"] = "gzip" # Prepare body - one observation ID per line. body = "\n".join(observation_ids) session = await self.get_session() async with session.post( "/scheduler/atoms", data=body, headers=headers ) as response: # Handle different response codes if response.status == 400: error_text = await response.text() raise ValueError(f"Invalid observation IDs: {error_text}") elif response.status == 403: raise aiohttp.ClientResponseError( request_info=response.request_info, history=response.history, status=response.status, message="Access forbidden - check authentication and permissions", ) elif response.status != 200: response.raise_for_status() # Handle gzipped response content_encoding = response.headers.get("Content-Encoding", "").lower() if content_encoding == "gzip": try: content = await response.read() return gzip.decompress(content).decode("utf-8") except gzip.BadGzipFile: # Server claimed gzip but sent plain text. return await response.text() else: return await response.text()