diff --git a/src/murfey/client/analyser.py b/src/murfey/client/analyser.py index 88ee0721c..dd15ec53c 100644 --- a/src/murfey/client/analyser.py +++ b/src/murfey/client/analyser.py @@ -56,23 +56,14 @@ def __init__( ): super().__init__() self._basepath = basepath_local.absolute() + self._token = token + self._environment = environment self._limited = limited self._experiment_type = "" self._acquisition_software = "" - self._extension: str = "" - self._unseen_xml: list = [] self._context: Context | None = None - self._batch_store: dict = {} - self._environment = environment - self._force_mdoc_metadata = force_mdoc_metadata - self._token = token - self._serialem = serialem - self.parameters_model: ( - Type[ProcessingParametersSPA] | Type[ProcessingParametersTomo] | None - ) = None - self.queue: queue.Queue = queue.Queue() - self.thread = threading.Thread(name="Analyser", target=self._analyse) + self.thread = threading.Thread(name="Analyser", target=self._analyse_in_thread) self._stopping = False self._halt_thread = False self._murfey_config = ( @@ -85,6 +76,17 @@ def __init__( else {} ) + # SPA & Tomo-specific attributes + self._extension: str = "" + self._unseen_xml: list = [] + self._batch_store: dict = {} + self._force_mdoc_metadata = force_mdoc_metadata + self._mdoc_for_reading: Path | None = None + self._serialem = serialem + self.parameters_model: ( + Type[ProcessingParametersSPA] | Type[ProcessingParametersTomo] | None + ) = None + def __repr__(self) -> str: return f"" @@ -334,9 +336,8 @@ def post_transfer(self, transferred_file: Path): f"An exception was encountered post transfer: {e}", exc_info=True ) - def _analyse(self): + def _analyse_in_thread(self): logger.info("Analyser thread started") - mdoc_for_reading = None while not self._halt_thread: transferred_file = self.queue.get() transferred_file = ( @@ -347,185 +348,182 @@ def _analyse(self): if not transferred_file: self._halt_thread = True continue - if self._limited: - if ( - "Metadata" in transferred_file.parts - or transferred_file.name == "EpuSession.dm" - and not self._context - ): - if not (context := _get_context("SPAMetadataContext")): - continue - self._context = context.load()( - "epu", - self._basepath, - self._murfey_config, - self._token, - ) - elif ( - "Batch" in transferred_file.parts - or "SearchMaps" in transferred_file.parts - or transferred_file.name == "Session.dm" - and not self._context - ): - if not (context := _get_context("TomographyMetadataContext")): - continue - self._context = context.load()( - "tomo", - self._basepath, - self._murfey_config, - self._token, - ) - self.post_transfer(transferred_file) - else: - dc_metadata = {} - if not self._serialem and ( - self._force_mdoc_metadata - and transferred_file.suffix == ".mdoc" - or mdoc_for_reading - ): - if self._context: - try: - dc_metadata = self._context.gather_metadata( - mdoc_for_reading or transferred_file, - environment=self._environment, - ) - except KeyError as e: - logger.error( - f"Metadata gathering failed with a key error for key: {e.args[0]}" - ) - raise e - if not dc_metadata: - mdoc_for_reading = None - elif transferred_file.suffix == ".mdoc": - mdoc_for_reading = transferred_file - if not self._context: - if not self._find_extension(transferred_file): - logger.debug(f"No extension found for {transferred_file}") - continue - if not self._find_context(transferred_file): - logger.debug( - f"Couldn't find context for {str(transferred_file)!r}" + self._analyse(transferred_file) + self.queue.task_done() + logger.debug("Analyer thread has stopped analysing incoming files") + self.notify(final=True) + + def _analyse(self, transferred_file: Path): + if self._limited: + if ( + "Metadata" in transferred_file.parts + or transferred_file.name == "EpuSession.dm" + and not self._context + ): + if not (context := _get_context("SPAMetadataContext")): + return + self._context = context.load()( + "epu", + self._basepath, + self._murfey_config, + self._token, + ) + elif ( + "Batch" in transferred_file.parts + or "SearchMaps" in transferred_file.parts + or transferred_file.name == "Session.dm" + and not self._context + ): + if not (context := _get_context("TomographyMetadataContext")): + return + self._context = context.load()( + "tomo", + self._basepath, + self._murfey_config, + self._token, + ) + self.post_transfer(transferred_file) + else: + dc_metadata = {} + if not self._serialem and ( + self._force_mdoc_metadata + and transferred_file.suffix == ".mdoc" + or self._mdoc_for_reading + ): + if self._context: + try: + dc_metadata = self._context.gather_metadata( + self._mdoc_for_reading or transferred_file, + environment=self._environment, ) - self.queue.task_done() - continue - elif self._extension: - logger.info( - f"Context found successfully for {transferred_file}" + except KeyError as e: + logger.error( + f"Metadata gathering failed with a key error for key: {e.args[0]}" ) - try: + raise e + if not dc_metadata: + self._mdoc_for_reading = None + elif transferred_file.suffix == ".mdoc": + self._mdoc_for_reading = transferred_file + if not self._context: + if not self._find_extension(transferred_file): + logger.debug(f"No extension found for {transferred_file}") + return + if not self._find_context(transferred_file): + logger.debug(f"Couldn't find context for {str(transferred_file)!r}") + return + elif self._extension: + logger.info(f"Context found successfully for {transferred_file}") + try: + if self._context is not None: self._context.post_first_transfer( transferred_file, environment=self._environment, ) - except Exception as e: - logger.error(f"Exception encountered: {e}") - if "AtlasContext" not in str(self._context): - if not dc_metadata: - try: + except Exception as e: + logger.error(f"Exception encountered: {e}") + if "AtlasContext" not in str(self._context): + if not dc_metadata: + try: + if self._context is not None: dc_metadata = self._context.gather_metadata( self._xml_file(transferred_file), environment=self._environment, ) - except NotImplementedError: - dc_metadata = {} - except KeyError as e: - logger.error( - f"Metadata gathering failed with a key error for key: {e.args[0]}" - ) - raise e - except ValueError as e: - logger.error( - f"Metadata gathering failed with a value error: {e}" - ) - if not dc_metadata or not self._force_mdoc_metadata: - self._unseen_xml.append(transferred_file) - else: - self._unseen_xml = [] - if dc_metadata.get("file_extension"): - self._extension = dc_metadata["file_extension"] - else: - dc_metadata["file_extension"] = self._extension - dc_metadata["acquisition_software"] = ( - self._context._acquisition_software - ) - self.notify(dc_metadata) - - # Contexts that can be immediately posted without additional work - elif "CLEMContext" in str(self._context): - logger.debug( - f"File {transferred_file.name!r} is part of CLEM workflow" - ) - self.post_transfer(transferred_file) - elif "FIBContext" in str(self._context): - logger.debug( - f"File {transferred_file.name!r} is part of the FIB workflow" - ) - self.post_transfer(transferred_file) - elif "SXTContext" in str(self._context): - logger.debug(f"File {transferred_file.name!r} is an SXT file") - self.post_transfer(transferred_file) - elif "AtlasContext" in str(self._context): - logger.debug(f"File {transferred_file.name!r} is part of the atlas") - self.post_transfer(transferred_file) - - # Handle files with tomography and SPA context differently - elif not self._extension or self._unseen_xml: - if not self._find_extension(transferred_file): - logger.error(f"No extension found for {transferred_file}") - continue - if self._extension: - logger.info( - f"Extension found successfully for {transferred_file}" - ) - try: - self._context.post_first_transfer( - transferred_file, - environment=self._environment, - ) - except Exception as e: - logger.error(f"Exception encountered: {e}") - if not dc_metadata: - try: - dc_metadata = self._context.gather_metadata( - mdoc_for_reading - or self._xml_file(transferred_file), - environment=self._environment, - ) + except NotImplementedError: + dc_metadata = {} except KeyError as e: logger.error( f"Metadata gathering failed with a key error for key: {e.args[0]}" ) raise e + except ValueError as e: + logger.error( + f"Metadata gathering failed with a value error: {e}" + ) if not dc_metadata or not self._force_mdoc_metadata: - mdoc_for_reading = None self._unseen_xml.append(transferred_file) - if dc_metadata: + else: self._unseen_xml = [] if dc_metadata.get("file_extension"): self._extension = dc_metadata["file_extension"] else: dc_metadata["file_extension"] = self._extension - dc_metadata["acquisition_software"] = ( - self._context._acquisition_software + if self._context is not None: + dc_metadata["acquisition_software"] = ( + self._context._acquisition_software + ) + self.notify(dc_metadata) + + # Contexts that can be immediately posted without additional work + elif "CLEMContext" in str(self._context): + logger.debug(f"File {transferred_file.name!r} is part of CLEM workflow") + self.post_transfer(transferred_file) + elif "FIBContext" in str(self._context): + logger.debug( + f"File {transferred_file.name!r} is part of the FIB workflow" + ) + self.post_transfer(transferred_file) + elif "SXTContext" in str(self._context): + logger.debug(f"File {transferred_file.name!r} is an SXT file") + self.post_transfer(transferred_file) + elif "AtlasContext" in str(self._context): + logger.debug(f"File {transferred_file.name!r} is part of the atlas") + self.post_transfer(transferred_file) + + # Handle files with tomography and SPA context differently + elif not self._extension or self._unseen_xml: + if not self._find_extension(transferred_file): + logger.error(f"No extension found for {transferred_file}") + return + if self._extension: + logger.info(f"Extension found successfully for {transferred_file}") + try: + self._context.post_first_transfer( + transferred_file, + environment=self._environment, + ) + except Exception as e: + logger.error(f"Exception encountered: {e}") + if not dc_metadata: + try: + dc_metadata = self._context.gather_metadata( + self._mdoc_for_reading + or self._xml_file(transferred_file), + environment=self._environment, ) - self.notify(dc_metadata) - elif any( - context in str(self._context) - for context in ( - "SPAContext", - "SPAMetadataContext", - "TomographyContext", - "TomographyMetadataContext", - ) - ): - context = str(self._context).split(" ")[0].split(".")[-1] - logger.debug( - f"Transferring file {str(transferred_file)} with context {context!r}" - ) - self.post_transfer(transferred_file) - self.queue.task_done() - logger.debug("Analyer thread has stopped analysing incoming files") - self.notify(final=True) + except KeyError as e: + logger.error( + f"Metadata gathering failed with a key error for key: {e.args[0]}" + ) + raise e + if not dc_metadata or not self._force_mdoc_metadata: + self._mdoc_for_reading = None + self._unseen_xml.append(transferred_file) + if dc_metadata: + self._unseen_xml = [] + if dc_metadata.get("file_extension"): + self._extension = dc_metadata["file_extension"] + else: + dc_metadata["file_extension"] = self._extension + dc_metadata["acquisition_software"] = ( + self._context._acquisition_software + ) + self.notify(dc_metadata) + elif any( + context in str(self._context) + for context in ( + "SPAContext", + "SPAMetadataContext", + "TomographyContext", + "TomographyMetadataContext", + ) + ): + context = str(self._context).split(" ")[0].split(".")[-1] + logger.debug( + f"Transferring file {str(transferred_file)} with context {context!r}" + ) + self.post_transfer(transferred_file) def _xml_file(self, data_file: Path) -> Path: if not self._environment: