"""
Module for managing observation workflow states in the GPP client.
"""
__all__ = ["WorkflowStateDomain"]
import asyncio
import logging
from typing import Any
from gpp_client.domains.base import BaseDomain
from gpp_client.exceptions import GPPClientError, GPPRetryableError, GPPValidationError
from gpp_client.generated.enums import CalculationState, ObservationWorkflowState
from gpp_client.generated.get_observation_workflow_state_by_id import (
GetObservationWorkflowStateById,
)
from gpp_client.generated.get_observation_workflow_state_by_reference import (
GetObservationWorkflowStateByReference,
)
logger = logging.getLogger(__name__)
[docs]
class WorkflowStateDomain(BaseDomain):
"""
Domain for managing observation workflow states.
"""
[docs]
async def get_by_id(
self,
observation_id: str,
) -> GetObservationWorkflowStateById:
"""
Get workflow state details for an observation by ID.
Parameters
----------
observation_id : str
The observation ID.
Returns
-------
GetObservationWorkflowStateById
The generated GraphQL response model.
"""
return await self._graphql.get_observation_workflow_state_by_id(
observation_id=observation_id
)
[docs]
async def get_by_reference(
self,
observation_reference: str,
) -> GetObservationWorkflowStateByReference:
"""
Get workflow state details for an observation by reference.
Parameters
----------
observation_reference : str
The observation reference label.
Returns
-------
GetObservationWorkflowStateByReference
The generated GraphQL response model.
"""
return await self._graphql.get_observation_workflow_state_by_reference(
observation_reference=observation_reference
)
[docs]
async def update_by_id(
self,
observation_id: str,
*,
workflow_state: ObservationWorkflowState,
) -> dict[str, Any]:
"""
Update the workflow state of an observation by its ID, or return the current
workflow if already set.
This function will:
- Fetch the current observation and its workflow.
- If the calculation state is not ``READY``, raise an error to retry later.
- If the desired state is already set, return the workflow as-is.
- Otherwise, validate the requested workflow state against
``validTransitions``.
- If valid, submit the mutation to update the workflow state.
Parameters
----------
observation_id : str
The observation ID.
workflow_state : ObservationWorkflowState
The desired workflow state to transition to.
Returns
-------
dict[str, Any]
The returned workflow state for the observation.
Raises
------
GPPClientError
If there are general client-side errors.
GPPValidationError
If the requested workflow state transition is invalid.
GPPRetryableError
If the observation calculation is not ``READY``.
"""
logger.debug(
"Updating workflow state for observation ID %s to %s",
observation_id,
workflow_state.value,
)
result = await self.get_by_id(observation_id=observation_id)
workflow = result["workflow"]
# If calculation is not 'READY', raise an error to retry later.
try:
_check_ready(workflow)
except RuntimeError as exc:
self.raise_error(GPPRetryableError, exc)
# If the desired state is already set, return as-is.
if _check_already_set(workflow, workflow_state):
# Return the same shape as other return paths.
logger.debug(
"Workflow state for observation ID %s is already %s; no update needed.",
observation_id,
workflow_state.value,
)
return workflow["value"]
# Validate the requested workflow state against 'validTransitions'.
try:
_check_valid_transition(workflow, workflow_state)
except ValueError as exc:
self.raise_error(GPPValidationError, exc)
return await self._graphql.set_observation_workflow_state(
observation_id=observation_id,
state=workflow_state,
)
[docs]
async def update_by_id_with_retry(
self,
observation_id: str,
*,
workflow_state: ObservationWorkflowState,
max_attempts: int = 10,
initial_delay: float = 0.0,
retry_delay: float = 1.0,
) -> dict[str, Any]:
"""
Update the workflow state of an observation by its ID, retrying if the
observation is not ready.
This function wraps ``update_by_id`` with retry logic to handle cases where
the observation calculation is not yet in the ``READY`` state.
Parameters
----------
observation_id : str
The observation ID.
workflow_state : ObservationWorkflowState
The desired workflow state to transition to.
max_attempts : int, default=10
Maximum number of retry attempts.
initial_delay : float, default=0.0
Initial delay in seconds before first attempt.
retry_delay : float, default=1.0
Delay in seconds between retry attempts.
Returns
-------
dict[str, Any]
The returned workflow state for the observation.
Raises
------
GPPClientError
If the maximum number of retry attempts is exceeded without success.
GPPValidationError
If the requested workflow state transition is invalid.
"""
logger.debug(
"Updating workflow state for observation ID %s to %s with up to %d retries",
observation_id,
workflow_state.value,
max_attempts,
)
logger.debug("Initial delay before first attempt: %.1f seconds", initial_delay)
await asyncio.sleep(initial_delay)
for attempt in range(1, max_attempts + 1):
try:
logger.debug(
"Attempt %d/%d: Updating workflow state for observation ID %s to %s",
attempt,
max_attempts,
observation_id,
workflow_state.value,
)
result = await self.update_by_id(
observation_id=observation_id,
workflow_state=workflow_state,
)
return result
except GPPRetryableError:
# This is the only retryable case: calculation state not READY.
if attempt < max_attempts:
await asyncio.sleep(retry_delay)
except (GPPValidationError, GPPClientError) as exc:
self.raise_error(type(exc), exc)
exc = GPPClientError("Failed to set workflow state after multiple retries.")
self.raise_error(type(exc), exc)
def _check_ready(workflow: dict[str, Any]) -> None:
"""
Raise an error if the observation calculation is not in the ``READY`` state.
Parameters
----------
workflow : dict[str, Any]
The workflow data structure returned by ``get_by_id()``.
Raises
------
RuntimeError
If the calculation state is not ``READY``.
"""
if workflow["state"] != CalculationState.READY.value:
raise RuntimeError(
"Observation calculation is not READY (current state: "
f"{workflow['state']}). Please retry after background processing "
"is complete."
)
def _check_already_set(
workflow: dict[str, Any],
workflow_state: ObservationWorkflowState,
) -> bool:
"""
Check if the workflow is already set to the desired state.
Parameters
----------
workflow : dict[str, Any]
The workflow data structure returned by ``get_by_id()``.
workflow_state : ObservationWorkflowState
The desired workflow state.
Returns
-------
bool
``True`` if the current workflow state matches the desired state,
otherwise ``False``.
"""
return workflow["value"]["state"] == workflow_state.value
def _check_valid_transition(
workflow: dict[str, Any],
workflow_state: ObservationWorkflowState,
) -> None:
"""
Validate that the desired workflow state is allowed as a transition.
Parameters
----------
workflow : dict[str, Any]
The workflow data structure returned by ``get_by_id()``.
workflow_state : ObservationWorkflowState
The desired workflow state to transition to.
Raises
------
ValueError
If the requested transition is not allowed based on
``validTransitions``.
"""
valid_transitions = workflow["value"].get("validTransitions", [])
if workflow_state.value not in valid_transitions:
valid_str = ", ".join(valid_transitions) or "None"
raise ValueError(
f"Cannot transition to '{workflow_state.value}'. "
f"Valid transitions are: {valid_str}."
)