Source code for gpp_client.domains.attachment

"""
Module for attachment-related domain functionality.
"""

__all__ = ["AttachmentDomain"]

import logging
from pathlib import Path
from urllib.parse import urlparse

from aiohttp import ClientHandlerType, ClientRequest, ClientResponse

from gpp_client.domains.base import BaseDomain
from gpp_client.exceptions import GPPClientError, GPPResponseError
from gpp_client.generated.enums import AttachmentType
from gpp_client.generated.get_observation_attachments_by_id import (
    GetObservationAttachmentsById,
)
from gpp_client.generated.get_observation_attachments_by_reference import (
    GetObservationAttachmentsByReference,
)
from gpp_client.generated.get_program_attachments_by_id import (
    GetProgramAttachmentsById,
)
from gpp_client.generated.get_program_attachments_by_proposal_reference import (
    GetProgramAttachmentsByProposalReference,
)
from gpp_client.generated.get_program_attachments_by_reference import (
    GetProgramAttachmentsByReference,
)

logger = logging.getLogger(__name__)

DEFAULT_OK: set[int] = {200}
UPLOAD_OK: set[int] = {200, 201}
UPDATE_OK: set[int] = {200, 201}
DELETE_OK: set[int] = {200, 204}


[docs] class AttachmentDomain(BaseDomain): """ Domain class for attachment-related operations. """
[docs] async def upload( self, program_id: str, *, attachment_type: AttachmentType, file_name: str, description: str | None = None, file_path: str | Path | None = None, content: bytes | None = None, ) -> str: """ Upload a new attachment for a program. Parameters ---------- program_id : str The program ID to associate the attachment with. attachment_type : AttachmentType The attachment type. file_name : str The file name to store for the attachment. description : str | None, optional Optional attachment description. file_path : str | Path | None, optional Path to a file whose contents will be uploaded. Mutually exclusive with ``content``. content : bytes | None, optional Raw bytes to upload. Mutually exclusive with ``file_path``. Returns ------- str The created attachment ID. Raises ------ GPPClientError If the upload fails or the response is invalid. GPPValidationError If a validation error occurs. """ logger.debug( "Uploading attachment for program %s (type=%s, file_name=%s)", program_id, attachment_type, file_name, ) body = self.resolve_content(file_path=file_path, content=content) params = _build_upload_params( program_id=program_id, attachment_type=attachment_type, file_name=file_name, description=description, ) session = await self._rest.get_session() url = "/attachment" try: async with session.post(url, params=params, data=body) as response: await self.raise_for_status(response, ok_statuses=UPLOAD_OK) text = await response.text() attachment_id = text.strip() if not attachment_id: raise GPPClientError( "Upload attachment returned an empty attachment id." ) logger.debug("Uploaded attachment id=%s", attachment_id) return attachment_id except GPPResponseError: raise except Exception as exc: self.raise_error(GPPClientError, exc)
[docs] async def delete_by_id(self, attachment_id: str) -> None: """ Delete an attachment by its ID. Parameters ---------- attachment_id : str The ID of the attachment to delete. Raises ------ GPPClientError If the deletion fails. """ logger.debug("Deleting attachment %s", attachment_id) session = await self._rest.get_session() url = f"/attachment/{attachment_id}" try: async with session.delete(url) as response: await self.raise_for_status(response, ok_statuses=DELETE_OK) logger.debug( "Deleted attachment %s", attachment_id, ) except GPPResponseError: raise except Exception as exc: self.raise_error(GPPClientError, exc)
[docs] async def update_by_id( self, attachment_id: str, *, file_name: str, description: str | None = None, file_path: str | Path | None = None, content: bytes | None = None, ) -> None: """ Update an attachment by its ID. Parameters ---------- attachment_id : str The ID of the attachment to update. file_name : str The new file name for the attachment. This is required. description : str | None, optional The new description for the attachment. file_path : str | Path | None, optional The path to the new file content for the attachment. content : bytes | None, optional The new file content as bytes. Raises ------ GPPClientError If the update fails. GPPValidationError If a validation error occurs. """ logger.debug("Updating attachment %s", attachment_id) body = self.resolve_content(file_path=file_path, content=content) # File name is required. params = _build_update_params(file_name=file_name, description=description) session = await self._rest.get_session() url = f"/attachment/{attachment_id}" try: async with session.put(url, params=params, data=body) as response: await self.raise_for_status(response, ok_statuses=UPDATE_OK) logger.debug( "Updated attachment %s", attachment_id, ) except GPPResponseError: raise except Exception as exc: self.raise_error(GPPClientError, exc)
[docs] async def get_download_url_by_id(self, attachment_id: str) -> str: """ Get the download URL for an attachment by its ID. Parameters ---------- attachment_id : str The ID of the attachment. Returns ------- str The download URL for the attachment. """ logger.debug("Getting download URL for attachment %s", attachment_id) session = await self._rest.get_session() url = f"/attachment/url/{attachment_id}" try: async with session.get(url) as response: await self.raise_for_status(response, ok_statuses=DEFAULT_OK) download_url = await response.text() except GPPResponseError: raise except Exception as exc: self.raise_error(GPPClientError, exc) return download_url
[docs] async def download_by_id( self, attachment_id: str, save_to: str | Path | None = None, overwrite: bool = False, chunk_size: int = 1024 * 1024, ) -> Path: """ Download an attachment by its ID. Parameters ---------- attachment_id : str The ID of the attachment. save_to : str | Path | None, optional The directory to save the downloaded attachment. If ``None``, defaults to home directory. overwrite : bool, default=False Whether to overwrite the file if it already exists. chunk_size : int, default=1 MB The chunk size for downloading the file in bytes. Returns ------- Path The path to the downloaded file. """ logger.debug("Downloading attachment %s", attachment_id) session = await self._rest.get_session() download_url = await self.get_download_url_by_id(attachment_id) # Get the filename and resolve the destination directory. filename = _filename_from_presigned_url(download_url) dest_dir = _resolve_download_dir(save_to) logger.debug("Resolved download directory: %s", dest_dir) path = dest_dir / filename # Create the destination directory if it doesn't exist. dest_dir.mkdir(parents=True, exist_ok=True) # Check if the file exists and handle overwrite option. if path.exists(): if not overwrite: raise GPPClientError( f"File {path} already exists and overwrite is set to False." ) logger.debug("File %s exists, overwriting.", path) path.unlink() # Use the presigned URL to download the attachment content. try: async with session.get( download_url, middlewares=(_remove_headers_middleware,), ) as response: await self.raise_for_status(response, ok_statuses=DEFAULT_OK) # Download the file in chunks to avoid loading it all into memory. with path.open("wb") as fh: async for chunk in response.content.iter_chunked(chunk_size): fh.write(chunk) logger.info("Downloaded %s", path) return path except GPPResponseError: raise except Exception as exc: self.raise_error(GPPClientError, exc, include_traceback=True)
[docs] async def get_all_by_observation_id( self, observation_id: str, ) -> GetObservationAttachmentsById: """ Get all attachments for an observation by observation ID. Parameters ---------- observation_id : str The observation ID. Returns ------- GetObservationAttachmentsById The generated GraphQL response model. """ return await self._graphql.get_observation_attachments_by_id( observation_id=observation_id )
[docs] async def get_all_by_observation_reference( self, observation_reference: str, ) -> GetObservationAttachmentsByReference: """ Get all attachments for an observation by observation reference. Parameters ---------- observation_reference : str The observation reference label. Returns ------- GetObservationAttachmentsByReference The generated GraphQL response model. """ return await self._graphql.get_observation_attachments_by_reference( observation_reference=observation_reference )
[docs] async def get_all_by_program_id( self, program_id: str, ) -> GetProgramAttachmentsById: """ Get all attachments for a program by program ID. Parameters ---------- program_id : str The program ID. Returns ------- GetProgramAttachmentsById The generated GraphQL response model. """ return await self._graphql.get_program_attachments_by_id(program_id=program_id)
[docs] async def get_all_by_program_reference( self, program_reference: str, ) -> GetProgramAttachmentsByReference: """ Get all attachments for a program by program reference. Parameters ---------- program_reference : str The program reference label. Returns ------- GetProgramAttachmentsByReference The generated GraphQL response model. """ return await self._graphql.get_program_attachments_by_program_reference( program_reference=program_reference )
[docs] async def get_all_by_proposal_reference( self, proposal_reference: str, ) -> GetProgramAttachmentsByProposalReference: """ Get all attachments for a program by proposal reference. Parameters ---------- proposal_reference : str The proposal reference label. Returns ------- GetProgramAttachmentsByProposalReference The generated GraphQL response model. """ return await self._graphql.get_program_attachments_by_proposal_reference( proposal_reference=proposal_reference )
async def _remove_headers_middleware( req: ClientRequest, handler: ClientHandlerType, ) -> ClientResponse: """ Remove Authorization / Content-Type headers for presigned or external URLs. Needed because some presigned URLs (e.g., AWS S3) reject requests with unexpected headers. Parameters ---------- req : ClientRequest The outgoing request. handler : ClientHandlerType The next handler in the middleware chain. Returns ------- ClientResponse The response from the handler. """ req.headers.pop("Authorization", None) req.headers.pop("Content-Type", None) return await handler(req) def _filename_from_presigned_url(download_url: str) -> str: """ Extract filename from a presigned S3 URL. Parameters ---------- download_url : str The presigned download URL. Returns ------- str The filename extracted from the URL. """ parsed = urlparse(download_url) name = Path(parsed.path).name if not name: raise ValueError("Could not determine filename from presigned URL") return name def _resolve_download_dir(save_to: str | Path | None) -> Path: """ Resolve the download directory. Parameters ---------- save_to : str | Path | None The directory to save the downloaded file to. If ``None``, defaults to home directory. Returns ------- Path The resolved directory path. """ # Default is home directory. if save_to is None: return Path.home() path = Path(save_to).expanduser() if path.exists() and not path.is_dir(): raise ValueError(f"save_to must be a directory, got file: {path}") return path def _build_upload_params( *, program_id: str, attachment_type: AttachmentType, file_name: str, description: str | None, ) -> dict[str, str]: """ Build upload parameters for attachment upload. Parameters ---------- program_id : str The program ID to associate the attachment with. attachment_type : AttachmentType The attachment type. file_name : str The file name to store for the attachment. description : str | None, optional Optional attachment description. Returns ------- dict[str, str] The parameters for the upload request. """ params: dict[str, str] = { "programId": program_id, "fileName": file_name, "attachmentType": attachment_type.value, } if description and description.strip(): params["description"] = description.strip() return params def _build_update_params(*, file_name: str, description: str | None) -> dict[str, str]: """ Build update parameters for attachment update. Parameters ---------- file_name : str The new file name for the attachment. description : str | None The new description for the attachment. Returns ------- dict[str, str] The parameters for the update request. """ params: dict[str, str] = {"fileName": file_name} if description is not None and description.strip() != "": params["description"] = description.strip() return params