"""
Manager for interacting with observation resources.
"""
__all__ = ["ObservationManager"]
import logging
from pathlib import Path
from typing import Any, Optional
from gpp_client.api.custom_fields import (
AirMassRangeFields,
CalculatedObservationWorkflowFields,
CloneObservationResultFields,
ConstraintSetFields,
CoordinatesFields,
CreateObservationResultFields,
DeclinationFields,
ElevationRangeFields,
ExposureTimeModeFields,
Flamingos2LongSlitAcquisitionFields,
Flamingos2LongSlitFields,
GmosNorthImagingFields,
GmosNorthImagingFilterFields,
GmosNorthLongSlitFields,
GmosSouthImagingFields,
GmosSouthImagingFilterFields,
GmosSouthLongSlitFields,
HourAngleRangeFields,
NonsiderealFields,
ObservationFields,
ObservationReferenceFields,
ObservationSelectResultFields,
ObservingModeFields,
OffsetFields,
OffsetPFields,
OffsetQFields,
ProgramFields,
ProperMotionDeclinationFields,
ProperMotionFields,
ProperMotionRAFields,
RightAscensionFields,
ScienceRequirementsFields,
SiderealFields,
SignalToNoiseExposureTimeModeFields,
TargetEnvironmentFields,
TargetFields,
TelluricTypeFields,
TimeAndCountExposureTimeModeFields,
TimeSpanFields,
TimingWindowEndAfterFields,
TimingWindowEndAtFields,
TimingWindowFields,
TimingWindowRepeatFields,
UpdateObservationsResultFields,
WavelengthFields,
)
from gpp_client.api.custom_mutations import Mutation
from gpp_client.api.custom_queries import Query
from gpp_client.api.enums import Existence
from gpp_client.api.input_types import (
CloneObservationInput,
CreateObservationInput,
ObservationPropertiesInput,
UpdateObservationsInput,
WhereObservation,
WhereObservationReference,
WhereOrderObservationId,
WhereString,
)
from gpp_client.managers.base import BaseManager
logger = logging.getLogger(__name__)
[docs]
class ObservationManager(BaseManager):
"""
Manager for interacting with observation resources.
"""
[docs]
async def clone(
self,
*,
observation_id: Optional[str] = None,
observation_reference: Optional[str] = None,
properties: Optional[ObservationPropertiesInput] = None,
from_json: Optional[str | Path | dict[str, Any]] = None,
) -> dict[str, Any]:
"""
Clone an existing observation to create a new one.
Parameters
----------
observation_id : str, optional
Unique internal ID of the observation to clone.
observation_reference : str, optional
Human-readable reference label (e.g., "G-2025A-1234-Q-0001") of the observation to clone.
properties : ObservationPropertiesInput, optional
Properties to override in the cloned observation. This or ``from_json`` may be supplied.
from_json : str | Path | dict[str, Any], optional
JSON representation of the properties. May be a path-like object (``str`` or ``Path``) to a JSON file, or a ``dict`` already containing the JSON data.
Returns
-------
dict[str, Any]
A dictionary containing details of the original and new cloned observations.
Raises
------
GPPValidationError
If a validation error occurs.
GPPClientError
If an unexpected error occurs unpacking the response.
Notes
-----
Exactly one of `observation_id` or `observation_reference` must be provided to
identify the observation to clone. Additionally, either `properties` or
`from_json` may be supplied to specify overrides for the cloned observation.
"""
logger.debug(
"Cloning observation from observation with ID: %s or reference: %s",
observation_id,
observation_reference,
)
self.validate_single_identifier(
observation_id=observation_id, observation_reference=observation_reference
)
properties = self.load_properties(
properties=properties, from_json=from_json, cls=ObservationPropertiesInput
)
input_data = CloneObservationInput(
observation_id=observation_id,
observation_reference=observation_reference,
set_=properties,
)
fields = Mutation.clone_observation(input=input_data).fields(
CloneObservationResultFields.new_observation().fields(*self._fields()),
)
operation_name = "cloneObservation"
result = await self.client.mutation(fields, operation_name=operation_name)
return self.get_result(result, operation_name)
[docs]
async def create(
self,
*,
properties: Optional[ObservationPropertiesInput] = None,
from_json: Optional[str | Path | dict[str, Any]] = None,
program_id: Optional[str] = None,
proposal_reference: Optional[str] = None,
program_reference: Optional[str] = None,
) -> dict[str, Any]:
"""
Create a new observation under a specified program.
Parameters
----------
properties : ObservationPropertiesInput, optional
Observation definition to use in creation. This or ``from_json`` must be
supplied.
from_json : str | Path | dict[str, Any], optional
JSON representation of the properties. May be a path-like object
(``str`` or ``Path``) to a JSON file, or a ``dict`` already containing the
JSON data.
program_id : str, optional
Direct program identifier. Must be provided if `proposal_reference` and
`program_reference` are omitted.
proposal_reference : str, optional
Proposal label alternative to `program_id`.
program_reference : str, optional
Program label alternative to `program_id`.
Returns
-------
dict[str, Any]
The created observation and its metadata.
Raises
------
GPPValidationError
If a validation error occurs.
GPPClientError
If an unexpected error occurs unpacking the response.
Notes
-----
Exactly one of ``properties`` or ``from_json`` must be supplied. Supplying
both or neither raises ``GPPValidationError``.
"""
logger.debug(
"Creating a new observation under a program ID: %s, proposal reference: %s, or program reference: %s",
program_id,
proposal_reference,
program_reference,
)
self.validate_single_identifier(
program_id=program_id,
proposal_reference=proposal_reference,
program_reference=program_reference,
)
properties = self.load_properties(
properties=properties, from_json=from_json, cls=ObservationPropertiesInput
)
input_data = CreateObservationInput(
program_id=program_id,
proposal_reference=proposal_reference,
program_reference=program_reference,
set_=properties,
)
fields = Mutation.create_observation(input=input_data).fields(
CreateObservationResultFields.observation().fields(*self._fields()),
)
operation_name = "createObservation"
result = await self.client.mutation(fields, operation_name=operation_name)
return self.get_result(result, operation_name)
[docs]
async def update_all(
self,
*,
properties: Optional[ObservationPropertiesInput] = None,
from_json: Optional[str | Path | dict[str, Any]] = None,
where: Optional[WhereObservation] = None,
limit: Optional[int] = None,
include_deleted: bool = False,
) -> dict[str, Any]:
"""
Update one or more observations with new properties.
Parameters
----------
properties : ObservationPropertiesInput, optional
Fields to update. This or ``from_json`` must be supplied.
from_json : str | Path | dict[str, Any], optional
JSON representation of the properties. May be a path-like object
(``str`` or ``Path``) to a JSON file, or a ``dict`` already containing the
JSON data.
where : WhereObservation, optional
Filter expression to limit which observations are updated.
limit : int, optional
Maximum number of observations to update.
include_deleted : bool, default=False
Whether to include soft-deleted observations.
Returns
-------
dict[str, Any]
The update result and updated records.
Raises
------
GPPValidationError
If a validation error occurs.
GPPClientError
If an unexpected error occurs unpacking the response.
Notes
-----
Exactly one of ``properties`` or ``from_json`` must be supplied. Supplying
both or neither raises ``GPPValidationError``.
"""
logger.debug("Updating observation(s)")
properties = self.load_properties(
properties=properties, from_json=from_json, cls=ObservationPropertiesInput
)
input_data = UpdateObservationsInput(
set_=properties,
where=where,
limit=limit,
include_deleted=include_deleted,
)
fields = Mutation.update_observations(input=input_data).fields(
UpdateObservationsResultFields.has_more,
UpdateObservationsResultFields.observations().fields(
*self._fields(include_deleted=include_deleted)
),
)
operation_name = "updateObservations"
result = await self.client.mutation(fields, operation_name=operation_name)
return self.get_result(result, operation_name)
@staticmethod
def _build_where_for_identifier(
*,
observation_id: str | None,
observation_reference: str | None,
) -> WhereObservation:
"""Build the ``WhereObservation`` filter for exactly one identifier."""
if observation_id is not None:
return WhereObservation(id=WhereOrderObservationId(eq=observation_id))
# At this point, observation_reference must be not ``None`` due to validation.
return WhereObservation(
reference=WhereObservationReference(
label=WhereString(eq=observation_reference)
)
)
[docs]
async def update_by_id(
self,
*,
observation_id: Optional[str] = None,
observation_reference: Optional[str] = None,
properties: Optional[ObservationPropertiesInput] = None,
from_json: Optional[str | Path | dict[str, Any]] = None,
include_deleted: bool = False,
) -> dict[str, Any]:
"""
Update a single observation by ID or reference.
Parameters
----------
observation_id : str, optional
Unique internal ID of the observation.
observation_reference : str, optional
Human-readable reference label (e.g., "G-2025A-1234-Q-0001").
properties : ObservationPropertiesInput, optional
New values to apply to the observation. This or ``from_json`` must be
supplied.
from_json : str | Path | dict[str, Any], optional
JSON representation of the properties. May be a path-like object
(``str`` or ``Path``) to a JSON file, or a ``dict`` already containing the
JSON data.
include_deleted : bool, default=False
Whether to include soft-deleted observations in the match.
Returns
-------
dict[str, Any]
The updated observation.
Raises
------
GPPValidationError
If a validation error occurs.
GPPClientError
If an unexpected error occurs unpacking the response.
Notes
-----
Exactly one of ``properties`` or ``from_json`` must be supplied. Supplying
both or neither raises ``GPPValidationError``.
"""
logger.debug(
"Updating observation by ID: %s or reference: %s",
observation_id,
observation_reference,
)
self.validate_single_identifier(
observation_id=observation_id,
observation_reference=observation_reference,
)
where = self._build_where_for_identifier(
observation_id=observation_id, observation_reference=observation_reference
)
result = await self.update_all(
where=where,
limit=1,
properties=properties,
include_deleted=include_deleted,
from_json=from_json,
)
return self.get_single_result(result, "observations")
[docs]
async def get_by_id(
self,
*,
observation_id: Optional[str] = None,
observation_reference: Optional[str] = None,
include_deleted: bool = False,
) -> dict[str, Any]:
"""
Fetch a single observation by ID or reference.
This method retrieves a single observation using either the internal ID or
the reference label. Exactly one of `observation_id` or `observation_reference`
must be provided.
Parameters
----------
observation_id : str, optional
The unique internal identifier of the observation.
observation_reference : str, optional
The human-readable reference label (e.g., "G-2024B-1234-Q-0001").
include_deleted : bool, default=False
Whether to include soft-deleted observations in the query.
Returns
-------
dict[str, Any]
The retrieved observation.
Raises
------
GPPValidationError
If a validation error occurs.
GPPClientError
If an unexpected error occurs unpacking the response.
"""
logger.debug(
"Fetching observation by ID: %s or reference: %s",
observation_id,
observation_reference,
)
self.validate_single_identifier(
observation_id=observation_id, observation_reference=observation_reference
)
fields = Query.observation(
observation_id=observation_id, observation_reference=observation_reference
).fields(*self._fields(include_deleted=include_deleted))
operation_name = "observation"
result = await self.client.query(fields, operation_name=operation_name)
return self.get_result(result, operation_name)
[docs]
async def get_all(
self,
*,
include_deleted: bool = False,
where: WhereObservation | None = None,
offset: int | None = None,
limit: int | None = None,
) -> dict[str, Any]:
"""
Retrieve all observations with optional filters.
Parameters
----------
include_deleted : bool, default=False
Whether to include soft-deleted observations.
where : WhereObservation, optional
Filter criteria.
offset : int, optional
Cursor offset (by ID).
limit : int, optional
Maximum number of observations.
Returns
-------
dict[str, Any]
A dictionary with the results.
Raises
------
GPPClientError
If an unexpected error occurs unpacking the response.
"""
logger.debug("Retrieving observation(s)")
fields = Query.observations(
include_deleted=include_deleted, where=where, offset=offset, limit=limit
).fields(
ObservationSelectResultFields.has_more,
ObservationSelectResultFields.matches().fields(
*self._fields(include_deleted=include_deleted)
),
)
operation_name = "observations"
result = await self.client.query(fields, operation_name=operation_name)
return self.get_result(result, operation_name)
[docs]
async def restore_by_id(
self,
*,
observation_id: Optional[str] = None,
observation_reference: Optional[str] = None,
) -> dict[str, Any]:
"""
Restore a soft-deleted observation using ID or reference.
Parameters
----------
observation_id : str, optional
Unique internal ID of the observation to restore.
observation_reference : str, optional
Human-readable reference label (e.g., "G-2025A-1234-Q-0001").
Returns
-------
dict[str, Any]
The restored observation with `existence` set to PRESENT.
Raises
------
GPPValidationError
If a validation error occurs.
GPPClientError
If an unexpected error occurs unpacking the response.
"""
logger.debug(
"Restoring observation with ID: %s or reference: %s",
observation_id,
observation_reference,
)
properties = ObservationPropertiesInput(existence=Existence.PRESENT)
return await self.update_by_id(
observation_id=observation_id,
observation_reference=observation_reference,
properties=properties,
include_deleted=True,
)
[docs]
async def delete_by_id(
self,
*,
observation_id: Optional[str] = None,
observation_reference: Optional[str] = None,
) -> dict[str, Any]:
"""
Soft-delete an observation using ID or reference.
Parameters
----------
observation_id : str, optional
Unique internal ID of the observation to delete.
observation_reference : str, optional
Human-readable reference label (e.g., "G-2025A-1234-Q-0001").
Returns
-------
dict[str, Any]
The deleted observation with `existence` set to DELETED.
Raises
------
GPPValidationError
If a validation error occurs.
GPPClientError
If an unexpected error occurs unpacking the response.
"""
logger.debug(
"Deleting observation with ID: %s or reference: %s",
observation_id,
observation_reference,
)
properties = ObservationPropertiesInput(existence=Existence.DELETED)
return await self.update_by_id(
observation_id=observation_id,
observation_reference=observation_reference,
properties=properties,
include_deleted=False,
)
@staticmethod
def _fields(include_deleted: bool = False) -> tuple:
"""
Return the GraphQL fields to retrieve for observations.
Parameters
----------
include_deleted : bool, default=False
Whether to include deleted fields in nested lookups.
Returns
-------
tuple
Field selections for observation queries.
"""
return (
ObservationFields.id,
ObservationFields.existence,
ObservationFields.reference().fields(ObservationReferenceFields.label),
ObservationFields.calibration_role,
ObservationFields.instrument,
ObservationFields.observer_notes,
ObservationFields.title,
ObservationFields.subtitle,
ObservationFields.program().fields(
ProgramFields.id,
ProgramFields.name,
ProgramFields.existence,
),
ObservationFields.science_requirements().fields(
ScienceRequirementsFields.mode
),
ObservationFields.science_band,
ObservationFields.workflow().fields(
CalculatedObservationWorkflowFields.state
),
ObservationFields.observing_mode().fields(
ObservingModeFields.instrument,
ObservingModeFields.mode,
ObservingModeFields.gmos_north_long_slit().fields(
GmosNorthLongSlitFields.grating,
GmosNorthLongSlitFields.filter_,
GmosNorthLongSlitFields.fpu,
GmosNorthLongSlitFields.central_wavelength().fields(
WavelengthFields.nanometers
),
GmosNorthLongSlitFields.spatial_offsets().fields(
OffsetQFields.arcseconds
),
),
ObservingModeFields.gmos_south_long_slit().fields(
GmosSouthLongSlitFields.grating,
GmosSouthLongSlitFields.filter_,
GmosSouthLongSlitFields.fpu,
GmosSouthLongSlitFields.central_wavelength().fields(
WavelengthFields.nanometers
),
GmosSouthLongSlitFields.spatial_offsets().fields(
OffsetQFields.arcseconds
),
),
ObservingModeFields.gmos_north_imaging().fields(
GmosNorthImagingFields.filters().fields(
GmosNorthImagingFilterFields.filter_
)
),
ObservingModeFields.gmos_south_imaging().fields(
GmosSouthImagingFields.filters().fields(
GmosSouthImagingFilterFields.filter_
)
),
ObservingModeFields.flamingos_2_long_slit().fields(
Flamingos2LongSlitFields.decker,
Flamingos2LongSlitFields.default_decker,
Flamingos2LongSlitFields.default_offsets().fields(
OffsetFields.q().fields(
OffsetQFields.arcseconds,
),
OffsetFields.p().fields(
OffsetPFields.arcseconds,
),
),
Flamingos2LongSlitFields.disperser,
Flamingos2LongSlitFields.filter_,
Flamingos2LongSlitFields.fpu,
Flamingos2LongSlitFields.telluric_type().fields(
TelluricTypeFields.tag, TelluricTypeFields.star_types
),
Flamingos2LongSlitFields.exposure_time_mode().fields(
ExposureTimeModeFields.signal_to_noise().fields(
SignalToNoiseExposureTimeModeFields.value,
SignalToNoiseExposureTimeModeFields.at().fields(
WavelengthFields.nanometers
),
),
ExposureTimeModeFields.time_and_count().fields(
TimeAndCountExposureTimeModeFields.time().fields(
TimeSpanFields.seconds
),
TimeAndCountExposureTimeModeFields.count,
TimeAndCountExposureTimeModeFields.at().fields(
WavelengthFields.nanometers
),
),
),
Flamingos2LongSlitFields.explicit_read_mode,
Flamingos2LongSlitFields.explicit_reads,
Flamingos2LongSlitFields.explicit_decker,
Flamingos2LongSlitFields.readout_mode,
Flamingos2LongSlitFields.default_readout_mode,
Flamingos2LongSlitFields.explicit_read_mode,
Flamingos2LongSlitFields.offsets().fields(
OffsetFields.q().fields(
OffsetQFields.arcseconds,
),
OffsetFields.p().fields(
OffsetPFields.arcseconds,
),
),
Flamingos2LongSlitFields.acquisition().fields(
Flamingos2LongSlitAcquisitionFields.exposure_time_mode().fields(
ExposureTimeModeFields.signal_to_noise().fields(
SignalToNoiseExposureTimeModeFields.value,
SignalToNoiseExposureTimeModeFields.at().fields(
WavelengthFields.nanometers
),
),
ExposureTimeModeFields.time_and_count().fields(
TimeAndCountExposureTimeModeFields.time().fields(
TimeSpanFields.seconds,
),
TimeAndCountExposureTimeModeFields.count,
TimeAndCountExposureTimeModeFields.at().fields(
WavelengthFields.nanometers
),
),
)
),
Flamingos2LongSlitFields.initial_disperser,
Flamingos2LongSlitFields.initial_filter,
Flamingos2LongSlitFields.initial_fpu,
),
),
ObservationFields.constraint_set().fields(
ConstraintSetFields.image_quality,
ConstraintSetFields.cloud_extinction,
ConstraintSetFields.sky_background,
ConstraintSetFields.water_vapor,
ConstraintSetFields.elevation_range().fields(
ElevationRangeFields.air_mass().fields(
AirMassRangeFields.min,
AirMassRangeFields.max,
),
ElevationRangeFields.hour_angle().fields(
HourAngleRangeFields.min_hours,
HourAngleRangeFields.max_hours,
),
),
),
ObservationFields.timing_windows().fields(
TimingWindowFields.inclusion,
TimingWindowFields.start_utc,
TimingWindowFields.end.on(
"TimingWindowEndAt", TimingWindowEndAtFields.at_utc
),
TimingWindowFields.end.on(
"TimingWindowEndAfter",
TimingWindowEndAfterFields.after().fields(TimeSpanFields.seconds),
TimingWindowEndAfterFields.repeat().fields(
TimingWindowRepeatFields.period().fields(
TimeSpanFields.seconds
),
TimingWindowRepeatFields.times,
),
),
),
ObservationFields.target_environment().fields(
TargetEnvironmentFields.asterism(include_deleted).fields(
TargetFields.sidereal().fields(
SiderealFields.ra().fields(RightAscensionFields.hms),
SiderealFields.dec().fields(DeclinationFields.dms),
SiderealFields.proper_motion().fields(
ProperMotionFields.ra().fields(
ProperMotionRAFields.milliarcseconds_per_year
),
ProperMotionFields.dec().fields(
ProperMotionDeclinationFields.milliarcseconds_per_year
),
),
SiderealFields.epoch,
),
TargetFields.nonsidereal().fields(
NonsiderealFields.des,
NonsiderealFields.key_type,
NonsiderealFields.key,
),
TargetFields.name,
),
TargetEnvironmentFields.explicit_base().fields(
CoordinatesFields.ra().fields(RightAscensionFields.hms),
CoordinatesFields.dec().fields(DeclinationFields.dms),
),
),
)