Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
338 changes: 168 additions & 170 deletions src/murfey/client/analyser.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,14 @@ def __init__(
):
super().__init__()
self._basepath = basepath_local.absolute()
self._token = token
self._environment = environment
self._limited = limited
self._experiment_type = ""
self._acquisition_software = ""
self._extension: str = ""
self._unseen_xml: list = []
self._context: Context | None = None
self._batch_store: dict = {}
self._environment = environment
self._force_mdoc_metadata = force_mdoc_metadata
self._token = token
self._serialem = serialem
self.parameters_model: (
Type[ProcessingParametersSPA] | Type[ProcessingParametersTomo] | None
) = None

self.queue: queue.Queue = queue.Queue()
self.thread = threading.Thread(name="Analyser", target=self._analyse)
self.thread = threading.Thread(name="Analyser", target=self._analyse_in_thread)
self._stopping = False
self._halt_thread = False
self._murfey_config = (
Expand All @@ -85,6 +76,17 @@ def __init__(
else {}
)

# SPA & Tomo-specific attributes
self._extension: str = ""
self._unseen_xml: list = []
self._batch_store: dict = {}
self._force_mdoc_metadata = force_mdoc_metadata
self._mdoc_for_reading: Path | None = None
self._serialem = serialem
self.parameters_model: (
Type[ProcessingParametersSPA] | Type[ProcessingParametersTomo] | None
) = None

def __repr__(self) -> str:
    """Return a short debug representation that embeds the analyser's base path."""
    base_path = self._basepath
    return f"<Analyser ({base_path})>"

Expand Down Expand Up @@ -334,9 +336,8 @@ def post_transfer(self, transferred_file: Path):
f"An exception was encountered post transfer: {e}", exc_info=True
)

def _analyse(self):
def _analyse_in_thread(self):
logger.info("Analyser thread started")
mdoc_for_reading = None
while not self._halt_thread:
transferred_file = self.queue.get()
transferred_file = (
Expand All @@ -347,185 +348,182 @@ def _analyse(self):
if not transferred_file:
self._halt_thread = True
continue
if self._limited:
if (
"Metadata" in transferred_file.parts
or transferred_file.name == "EpuSession.dm"
and not self._context
):
if not (context := _get_context("SPAMetadataContext")):
continue
self._context = context.load()(
"epu",
self._basepath,
self._murfey_config,
self._token,
)
elif (
"Batch" in transferred_file.parts
or "SearchMaps" in transferred_file.parts
or transferred_file.name == "Session.dm"
and not self._context
):
if not (context := _get_context("TomographyMetadataContext")):
continue
self._context = context.load()(
"tomo",
self._basepath,
self._murfey_config,
self._token,
)
self.post_transfer(transferred_file)
else:
dc_metadata = {}
if not self._serialem and (
self._force_mdoc_metadata
and transferred_file.suffix == ".mdoc"
or mdoc_for_reading
):
if self._context:
try:
dc_metadata = self._context.gather_metadata(
mdoc_for_reading or transferred_file,
environment=self._environment,
)
except KeyError as e:
logger.error(
f"Metadata gathering failed with a key error for key: {e.args[0]}"
)
raise e
if not dc_metadata:
mdoc_for_reading = None
elif transferred_file.suffix == ".mdoc":
mdoc_for_reading = transferred_file
if not self._context:
if not self._find_extension(transferred_file):
logger.debug(f"No extension found for {transferred_file}")
continue
if not self._find_context(transferred_file):
logger.debug(
f"Couldn't find context for {str(transferred_file)!r}"
self._analyse(transferred_file)
self.queue.task_done()
logger.debug("Analyer thread has stopped analysing incoming files")
self.notify(final=True)

def _analyse(self, transferred_file: Path):
# Process a single transferred file. When self._limited is set, a metadata context
# is chosen from the file's path/name and the file is handed to post_transfer();
# otherwise data-collection metadata is gathered and the file is handled according
# to the current self._context.
# NOTE(review): this span is rendered diff text with indentation stripped. Lines from
# the previous loop-based implementation (those using `continue`,
# `self.queue.task_done()` and the local `mdoc_for_reading`) appear interleaved with
# the new method body -- confirm the nesting against the real file before relying on it.
if self._limited:
# NOTE(review): `and` binds tighter than `or`, so `not self._context` only qualifies
# the "EpuSession.dm" name test; a path containing "Metadata" takes this branch even
# when a context is already set -- confirm that is intended.
if (
"Metadata" in transferred_file.parts
or transferred_file.name == "EpuSession.dm"
and not self._context
):
if not (context := _get_context("SPAMetadataContext")):
return
self._context = context.load()(
"epu",
self._basepath,
self._murfey_config,
self._token,
)
# NOTE(review): same precedence caveat as above -- `not self._context` only applies
# to the "Session.dm" name test, not to the "Batch"/"SearchMaps" path tests.
elif (
"Batch" in transferred_file.parts
or "SearchMaps" in transferred_file.parts
or transferred_file.name == "Session.dm"
and not self._context
):
if not (context := _get_context("TomographyMetadataContext")):
return
self._context = context.load()(
"tomo",
self._basepath,
self._murfey_config,
self._token,
)
self.post_transfer(transferred_file)
else:
dc_metadata = {}
if not self._serialem and (
self._force_mdoc_metadata
and transferred_file.suffix == ".mdoc"
or self._mdoc_for_reading
):
if self._context:
try:
dc_metadata = self._context.gather_metadata(
self._mdoc_for_reading or transferred_file,
environment=self._environment,
)
self.queue.task_done()
continue
elif self._extension:
logger.info(
f"Context found successfully for {transferred_file}"
except KeyError as e:
logger.error(
f"Metadata gathering failed with a key error for key: {e.args[0]}"
)
try:
raise e
if not dc_metadata:
self._mdoc_for_reading = None
elif transferred_file.suffix == ".mdoc":
self._mdoc_for_reading = transferred_file
if not self._context:
if not self._find_extension(transferred_file):
logger.debug(f"No extension found for {transferred_file}")
return
if not self._find_context(transferred_file):
logger.debug(f"Couldn't find context for {str(transferred_file)!r}")
return
elif self._extension:
logger.info(f"Context found successfully for {transferred_file}")
try:
if self._context is not None:
self._context.post_first_transfer(
transferred_file,
environment=self._environment,
)
except Exception as e:
logger.error(f"Exception encountered: {e}")
# The context kind is detected here (and in the elif chain further down) by substring
# match on str(self._context), not by an isinstance check.
if "AtlasContext" not in str(self._context):
if not dc_metadata:
try:
except Exception as e:
logger.error(f"Exception encountered: {e}")
if "AtlasContext" not in str(self._context):
if not dc_metadata:
try:
if self._context is not None:
dc_metadata = self._context.gather_metadata(
self._xml_file(transferred_file),
environment=self._environment,
)
except NotImplementedError:
dc_metadata = {}
except KeyError as e:
logger.error(
f"Metadata gathering failed with a key error for key: {e.args[0]}"
)
raise e
except ValueError as e:
logger.error(
f"Metadata gathering failed with a value error: {e}"
)
if not dc_metadata or not self._force_mdoc_metadata:
self._unseen_xml.append(transferred_file)
else:
self._unseen_xml = []
if dc_metadata.get("file_extension"):
self._extension = dc_metadata["file_extension"]
else:
dc_metadata["file_extension"] = self._extension
dc_metadata["acquisition_software"] = (
self._context._acquisition_software
)
self.notify(dc_metadata)

# Contexts that can be immediately posted without additional work
elif "CLEMContext" in str(self._context):
logger.debug(
f"File {transferred_file.name!r} is part of CLEM workflow"
)
self.post_transfer(transferred_file)
elif "FIBContext" in str(self._context):
logger.debug(
f"File {transferred_file.name!r} is part of the FIB workflow"
)
self.post_transfer(transferred_file)
elif "SXTContext" in str(self._context):
logger.debug(f"File {transferred_file.name!r} is an SXT file")
self.post_transfer(transferred_file)
elif "AtlasContext" in str(self._context):
logger.debug(f"File {transferred_file.name!r} is part of the atlas")
self.post_transfer(transferred_file)

# Handle files with tomography and SPA context differently
elif not self._extension or self._unseen_xml:
if not self._find_extension(transferred_file):
logger.error(f"No extension found for {transferred_file}")
continue
if self._extension:
logger.info(
f"Extension found successfully for {transferred_file}"
)
try:
self._context.post_first_transfer(
transferred_file,
environment=self._environment,
)
except Exception as e:
logger.error(f"Exception encountered: {e}")
if not dc_metadata:
try:
dc_metadata = self._context.gather_metadata(
mdoc_for_reading
or self._xml_file(transferred_file),
environment=self._environment,
)
except NotImplementedError:
dc_metadata = {}
except KeyError as e:
logger.error(
f"Metadata gathering failed with a key error for key: {e.args[0]}"
)
raise e
except ValueError as e:
logger.error(
f"Metadata gathering failed with a value error: {e}"
)
if not dc_metadata or not self._force_mdoc_metadata:
mdoc_for_reading = None
self._unseen_xml.append(transferred_file)
if dc_metadata:
else:
self._unseen_xml = []
if dc_metadata.get("file_extension"):
self._extension = dc_metadata["file_extension"]
else:
dc_metadata["file_extension"] = self._extension
dc_metadata["acquisition_software"] = (
self._context._acquisition_software
if self._context is not None:
dc_metadata["acquisition_software"] = (
self._context._acquisition_software
)
self.notify(dc_metadata)

# Contexts that can be immediately posted without additional work
elif "CLEMContext" in str(self._context):
logger.debug(f"File {transferred_file.name!r} is part of CLEM workflow")
self.post_transfer(transferred_file)
elif "FIBContext" in str(self._context):
logger.debug(
f"File {transferred_file.name!r} is part of the FIB workflow"
)
self.post_transfer(transferred_file)
elif "SXTContext" in str(self._context):
logger.debug(f"File {transferred_file.name!r} is an SXT file")
self.post_transfer(transferred_file)
elif "AtlasContext" in str(self._context):
logger.debug(f"File {transferred_file.name!r} is part of the atlas")
self.post_transfer(transferred_file)

# Handle files with tomography and SPA context differently
elif not self._extension or self._unseen_xml:
if not self._find_extension(transferred_file):
logger.error(f"No extension found for {transferred_file}")
return
if self._extension:
logger.info(f"Extension found successfully for {transferred_file}")
# NOTE(review): no `self._context is not None` guard is visible on the
# post_first_transfer / gather_metadata calls in this branch, unlike the guarded
# calls earlier in the method -- confirm self._context cannot be None here.
try:
self._context.post_first_transfer(
transferred_file,
environment=self._environment,
)
except Exception as e:
logger.error(f"Exception encountered: {e}")
if not dc_metadata:
try:
dc_metadata = self._context.gather_metadata(
self._mdoc_for_reading
or self._xml_file(transferred_file),
environment=self._environment,
)
self.notify(dc_metadata)
elif any(
context in str(self._context)
for context in (
"SPAContext",
"SPAMetadataContext",
"TomographyContext",
"TomographyMetadataContext",
)
):
context = str(self._context).split(" ")[0].split(".")[-1]
logger.debug(
f"Transferring file {str(transferred_file)} with context {context!r}"
)
self.post_transfer(transferred_file)
self.queue.task_done()
logger.debug("Analyer thread has stopped analysing incoming files")
self.notify(final=True)
except KeyError as e:
logger.error(
f"Metadata gathering failed with a key error for key: {e.args[0]}"
)
raise e
if not dc_metadata or not self._force_mdoc_metadata:
self._mdoc_for_reading = None
self._unseen_xml.append(transferred_file)
if dc_metadata:
self._unseen_xml = []
if dc_metadata.get("file_extension"):
self._extension = dc_metadata["file_extension"]
else:
dc_metadata["file_extension"] = self._extension
dc_metadata["acquisition_software"] = (
self._context._acquisition_software
)
self.notify(dc_metadata)
elif any(
context in str(self._context)
for context in (
"SPAContext",
"SPAMetadataContext",
"TomographyContext",
"TomographyMetadataContext",
)
):
context = str(self._context).split(" ")[0].split(".")[-1]
logger.debug(
f"Transferring file {str(transferred_file)} with context {context!r}"
)
self.post_transfer(transferred_file)

def _xml_file(self, data_file: Path) -> Path:
if not self._environment:
Expand Down
Loading