Source code for gpp_client.managers.attachment

"""
Manager for interacting with attachment resources.
"""

__all__ = ["AttachmentManager"]

import logging
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

from aiohttp import ClientHandlerType, ClientRequest, ClientResponse

from gpp_client.api.custom_fields import (
    AttachmentFields,
    ObservationFields,
    ProgramFields,
)
from gpp_client.api.custom_queries import Query
from gpp_client.api.enums import AttachmentType
from gpp_client.exceptions import GPPClientError, GPPResponseError
from gpp_client.managers.base import BaseManager

logger = logging.getLogger(__name__)


[docs] class AttachmentManager(BaseManager): """ Manager for interacting with attachment resources. """ _DEFAULT_OK: set[int] = {200} _UPLOAD_OK: set[int] = {200, 201} _UPDATE_OK: set[int] = {200, 201} _DELETE_OK: set[int] = {200, 204} @staticmethod def _build_upload_params( *, program_id: str, attachment_type: AttachmentType, file_name: str, description: str | None, ) -> dict[str, str]: """ Build upload parameters for attachment upload. Parameters ---------- program_id : str The program ID to associate the attachment with. attachment_type : AttachmentType The attachment type. file_name : str The file name to store for the attachment. description : str | None, optional Optional attachment description. Returns ------- dict[str, str] The parameters for the upload request. """ params: dict[str, str] = { "programId": program_id, "fileName": file_name, "attachmentType": attachment_type.value, } if description and description.strip(): params["description"] = description.strip() return params @staticmethod def _build_update_params( *, file_name: str, description: str | None ) -> dict[str, str]: """ Build update parameters for attachment update. Parameters ---------- file_name : str The new file name for the attachment. description : str | None The new description for the attachment. Returns ------- dict[str, str] The parameters for the update request. """ params: dict[str, str] = {"fileName": file_name} if description is not None and description.strip() != "": params["description"] = description.strip() return params
[docs] async def upload( self, program_id: str, *, attachment_type: AttachmentType, file_name: str, description: str | None = None, file_path: str | Path | None = None, content: bytes | None = None, ) -> str: """ Upload a new attachment for a program. Parameters ---------- program_id : str The program ID to associate the attachment with. attachment_type : AttachmentType The attachment type. file_name : str The file name to store for the attachment. description : str | None, optional Optional attachment description. file_path : str | Path | None, optional Path to a file whose contents will be uploaded. Mutually exclusive with ``content``. content : bytes | None, optional Raw bytes to upload. Mutually exclusive with ``file_path``. Returns ------- str The created attachment ID. Raises ------ GPPClientError If the upload fails or the response is invalid. GPPValidationError If a validation error occurs. """ logger.debug( "Uploading attachment for program %s (type=%s, file_name=%s)", program_id, attachment_type, file_name, ) body = self.resolve_content(file_path=file_path, content=content) params = self._build_upload_params( program_id=program_id, attachment_type=attachment_type, file_name=file_name, description=description, ) session = await self.rest_client.get_session() url = "/attachment" try: async with session.post(url, params=params, data=body) as response: await self.raise_for_status(response, ok_statuses=self._UPLOAD_OK) text = await response.text() attachment_id = text.strip() if not attachment_id: raise GPPClientError( "Upload attachment returned an empty attachment id." ) logger.debug("Uploaded attachment id=%s", attachment_id) return attachment_id except GPPResponseError: raise except Exception as exc: self.raise_error(GPPClientError, exc)
[docs] async def delete_by_id(self, attachment_id: str) -> None: """ Delete an attachment by its ID. Parameters ---------- attachment_id : str The ID of the attachment to delete. Raises ------ GPPClientError If the deletion fails. """ logger.debug("Deleting attachment %s", attachment_id) session = await self.rest_client.get_session() url = f"/attachment/{attachment_id}" try: async with session.delete(url) as response: await self.raise_for_status(response, ok_statuses=self._DELETE_OK) logger.debug( "Deleted attachment %s", attachment_id, ) except GPPResponseError: raise except Exception as exc: self.raise_error(GPPClientError, exc)
[docs] async def update_by_id( self, attachment_id: str, *, file_name: str, description: str | None = None, file_path: str | Path | None = None, content: bytes | None = None, ) -> None: """ Update an attachment by its ID. Parameters ---------- attachment_id : str The ID of the attachment to update. file_name : str The new file name for the attachment. This is required. description : str | None, optional The new description for the attachment. file_path : str | Path | None, optional The path to the new file content for the attachment. content : bytes | None, optional The new file content as bytes. Raises ------ GPPClientError If the update fails. GPPValidationError If a validation error occurs. """ logger.debug("Updating attachment %s", attachment_id) body = self.resolve_content(file_path=file_path, content=content) # File name is required. params = self._build_update_params(file_name=file_name, description=description) session = await self.rest_client.get_session() url = f"/attachment/{attachment_id}" try: async with session.put(url, params=params, data=body) as response: await self.raise_for_status(response, ok_statuses=self._UPDATE_OK) logger.debug( "Updated attachment %s", attachment_id, ) except GPPResponseError: raise except Exception as exc: self.raise_error(GPPClientError, exc)
[docs] async def get_download_url_by_id(self, attachment_id: str) -> str: """ Get the download URL for an attachment by its ID. Parameters ---------- attachment_id : str The ID of the attachment. Returns ------- str The download URL for the attachment. """ logger.debug("Getting download URL for attachment %s", attachment_id) session = await self.rest_client.get_session() url = f"/attachment/url/{attachment_id}" try: async with session.get(url) as response: await self.raise_for_status(response, ok_statuses=self._DEFAULT_OK) download_url = await response.text() except GPPResponseError: raise except Exception as exc: self.raise_error(GPPClientError, exc) return download_url
[docs] async def download_by_id( self, attachment_id: str, save_to: str | Path | None = None, overwrite: bool = False, chunk_size: int = 1024 * 1024, ) -> Path: """ Download an attachment by its ID. Parameters ---------- attachment_id : str The ID of the attachment. save_to : str | Path | None, optional The directory to save the downloaded attachment. If ``None``, defaults to home directory. overwrite : bool, optional Whether to overwrite the file if it already exists. Default is ``False``. chunk_size : int, optional The chunk size for downloading the file in bytes. Default is 1 MB. Returns ------- Path The path to the downloaded file. """ logger.debug("Downloading attachment %s", attachment_id) session = await self.rest_client.get_session() download_url = await self.get_download_url_by_id(attachment_id) # Get the filename and resolve the destination directory. filename = filename_from_presigned_url(download_url) dest_dir = resolve_download_dir(save_to) logger.debug("Resolved download directory: %s", dest_dir) path = dest_dir / filename # Create the destination directory if it doesn't exist. dest_dir.mkdir(parents=True, exist_ok=True) # Check if the file exists and handle overwrite option. if path.exists(): if not overwrite: raise GPPClientError( f"File {path} already exists and overwrite is set to False." ) logger.debug("File %s exists, overwriting.", path) path.unlink() # Use the presigned URL to download the attachment content. try: async with session.get( download_url, middlewares=(remove_headers_middleware,), ) as response: await self.raise_for_status(response, ok_statuses=self._DEFAULT_OK) # Download the file in chunks to avoid loading it all into memory. with path.open("wb") as fh: async for chunk in response.content.iter_chunked(chunk_size): fh.write(chunk) logger.info("Downloaded %s", path) return path except GPPResponseError: raise except Exception as exc: self.raise_error(GPPClientError, exc, include_traceback=True)
[docs] async def get_all_by_observation( self, *, observation_reference: str | None, observation_id: str | None ) -> dict[str, Any]: """ Get all attachments associated with a given observation. Parameters ---------- observation_reference : str | None The observation reference. observation_id : str | None The observation ID. Returns ------- dict[str, Any] A dictionary containing attachment information. """ self.validate_single_identifier( observation_id=observation_id, observation_reference=observation_reference ) fields = Query.observation( observation_id=observation_id, observation_reference=observation_reference ).fields( ObservationFields.attachments().fields(*self._fields()), ) operation_name = "observation" result = await self.client.query(fields, operation_name=operation_name) return self.get_result(result, operation_name)
[docs] async def get_all_by_program( self, *, program_id: str | None, proposal_reference: str | None, program_reference: str | None, ) -> dict[str, Any]: """ Get all attachments associated with a given program. Parameters ---------- program_id : str | None The program ID. proposal_reference : str | None The proposal reference. program_reference : str | None The program reference. Returns ------- dict[str, Any] A dictionary containing attachment information. """ self.validate_single_identifier( program_id=program_id, program_reference=program_reference, proposal_reference=proposal_reference, ) fields = Query.program( program_id=program_id, program_reference=program_reference, proposal_reference=proposal_reference, ).fields( ProgramFields.attachments().fields(*self._fields()), ) operation_name = "program" result = await self.client.query(fields, operation_name=operation_name) return self.get_result(result, operation_name)
@staticmethod def _fields() -> tuple: """ Get the fields to retrieve for attachments. Returns ------- tuple A tuple of attachment field names. """ return ( AttachmentFields.id, AttachmentFields.file_name, AttachmentFields.attachment_type, AttachmentFields.file_size, AttachmentFields.checked, AttachmentFields.description, AttachmentFields.updated_at, )
async def remove_headers_middleware( req: ClientRequest, handler: ClientHandlerType, ) -> ClientResponse: """ Remove Authorization / Content-Type headers for presigned or external URLs. Needed because some presigned URLs (e.g., AWS S3) reject requests with unexpected headers. Parameters ---------- req : ClientRequest The outgoing request. handler : ClientHandlerType The next handler in the middleware chain. Returns ------- ClientResponse The response from the handler. """ req.headers.pop("Authorization", None) req.headers.pop("Content-Type", None) return await handler(req) def filename_from_presigned_url(download_url: str) -> str: """ Extract filename from a presigned S3 URL. Parameters ---------- download_url : str The presigned download URL. Returns ------- str The filename extracted from the URL. """ parsed = urlparse(download_url) name = Path(parsed.path).name if not name: raise ValueError("Could not determine filename from presigned URL") return name def resolve_download_dir(save_to: str | Path | None) -> Path: """ Resolve the download directory. Parameters ---------- save_to : str | Path | None The directory to save the downloaded file to. If ``None``, defaults to home directory. Returns ------- Path The resolved directory path. """ # Default is home directory. if save_to is None: return Path.home() path = Path(save_to).expanduser() if path.exists() and not path.is_dir(): raise ValueError(f"save_to must be a directory, got file: {path}") return path