Skip to content
330 changes: 148 additions & 182 deletions src/murfey/client/analyser.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import threading
from importlib.metadata import entry_points
from pathlib import Path
from typing import Type
from typing import OrderedDict, Type

from murfey.client.context import Context
from murfey.client.destinations import find_longest_data_directory
Expand Down Expand Up @@ -56,23 +56,14 @@ def __init__(
):
super().__init__()
self._basepath = basepath_local.absolute()
self._token = token
self._environment = environment
self._limited = limited
self._experiment_type = ""
self._acquisition_software = ""
self._extension: str = ""
self._unseen_xml: list = []
self._context: Context | None = None
self._batch_store: dict = {}
self._environment = environment
self._force_mdoc_metadata = force_mdoc_metadata
self._token = token
self._serialem = serialem
self.parameters_model: (
Type[ProcessingParametersSPA] | Type[ProcessingParametersTomo] | None
) = None

self.queue: queue.Queue = queue.Queue()
self.thread = threading.Thread(name="Analyser", target=self._analyse)
self.thread = threading.Thread(name="Analyser", target=self._analyse_in_thread)
self._stopping = False
self._halt_thread = False
self._murfey_config = (
Expand All @@ -85,6 +76,17 @@ def __init__(
else {}
)

# SPA & Tomo-specific attributes
self._extension: str = ""
self._unseen_xml: list = []
self._batch_store: dict = {}
self._force_mdoc_metadata = force_mdoc_metadata
self._mdoc_for_reading: Path | None = None
self._serialem = serialem
self.parameters_model: (
Type[ProcessingParametersSPA] | Type[ProcessingParametersTomo] | None
) = None

def __repr__(self) -> str:
return f"<Analyser ({self._basepath})>"

Expand Down Expand Up @@ -334,9 +336,8 @@ def post_transfer(self, transferred_file: Path):
f"An exception was encountered post transfer: {e}", exc_info=True
)

def _analyse(self):
def _analyse_in_thread(self):
logger.info("Analyser thread started")
mdoc_for_reading = None
while not self._halt_thread:
transferred_file = self.queue.get()
transferred_file = (
Expand All @@ -347,185 +348,150 @@ def _analyse(self):
if not transferred_file:
self._halt_thread = True
continue
if self._limited:
if (
"Metadata" in transferred_file.parts
or transferred_file.name == "EpuSession.dm"
and not self._context
):
if not (context := _get_context("SPAMetadataContext")):
continue
self._context = context.load()(
"epu",
self._basepath,
self._murfey_config,
self._token,
)
elif (
"Batch" in transferred_file.parts
or "SearchMaps" in transferred_file.parts
or transferred_file.name == "Session.dm"
and not self._context
):
if not (context := _get_context("TomographyMetadataContext")):
continue
self._context = context.load()(
"tomo",
self._basepath,
self._murfey_config,
self._token,
)
self._analyse(transferred_file)
self.queue.task_done()
logger.debug("Analyser thread has stopped analysing incoming files")
self.notify(final=True)

def _analyse(self, transferred_file: Path):
if self._limited:
if (
"Metadata" in transferred_file.parts
or transferred_file.name == "EpuSession.dm"
and not self._context
):
if not (context := _get_context("SPAMetadataContext")):
return
self._context = context.load()(
"epu",
self._basepath,
self._murfey_config,
self._token,
)
elif (
"Batch" in transferred_file.parts
or "SearchMaps" in transferred_file.parts
or transferred_file.name == "Session.dm"
and not self._context
):
if not (context := _get_context("TomographyMetadataContext")):
return
self._context = context.load()(
"tomo",
self._basepath,
self._murfey_config,
self._token,
)
self.post_transfer(transferred_file)
else:
# Logic that doesn't require context determination
if not self._serialem and (
self._force_mdoc_metadata and transferred_file.suffix == ".mdoc"
):
self._mdoc_for_reading = transferred_file

# Try and determine context, and notify once when context is found
if self._context is None:
# Exit early if the file can't be used to determine the context
if not self._find_context(transferred_file):
logger.debug(f"Couldn't find context for {str(transferred_file)!r}")
return
else:
logger.info(f"Context found successfully using {transferred_file}")

# Trigger processing or metadata parsing according to the context
# Go through the straightforward ones first
if "CLEMContext" in str(self._context):
logger.debug(f"File {transferred_file.name!r} is part of CLEM workflow")
self.post_transfer(transferred_file)
elif "FIBContext" in str(self._context):
logger.debug(
f"File {transferred_file.name!r} is part of the FIB workflow"
)
self.post_transfer(transferred_file)
else:
dc_metadata = {}
elif "SXTContext" in str(self._context):
logger.debug(f"File {transferred_file.name!r} is an SXT file")
self.post_transfer(transferred_file)
elif "AtlasContext" in str(self._context):
logger.debug(f"File {transferred_file.name!r} is part of the atlas")
self.post_transfer(transferred_file)

# Handle files with tomography and SPA context differently
elif (
any(
context in str(self._context)
for context in (
"SPAContext",
"SPAMetadataContext",
"TomographyContext",
"TomographyMetadataContext",
)
)
and self._context is not None
):
context = str(self._context).split(" ")[0].split(".")[-1]

dc_metadata: OrderedDict | None = None
if not self._serialem and (
self._force_mdoc_metadata
and transferred_file.suffix == ".mdoc"
or mdoc_for_reading
or self._mdoc_for_reading
):
if self._context:
try:
dc_metadata = self._context.gather_metadata(
mdoc_for_reading or transferred_file,
environment=self._environment,
)
except KeyError as e:
logger.error(
f"Metadata gathering failed with a key error for key: {e.args[0]}"
)
raise e
if not dc_metadata:
mdoc_for_reading = None
elif transferred_file.suffix == ".mdoc":
mdoc_for_reading = transferred_file
if not self._context:
if not self._find_extension(transferred_file):
logger.debug(f"No extension found for {transferred_file}")
continue
if not self._find_context(transferred_file):
logger.debug(
f"Couldn't find context for {str(transferred_file)!r}"
try:
dc_metadata = self._context.gather_metadata(
self._mdoc_for_reading or transferred_file,
environment=self._environment,
)
self.queue.task_done()
continue
elif self._extension:
logger.info(
f"Context found successfully for {transferred_file}"
except KeyError as e:
logger.error(
f"Metadata gathering failed with a key error for key: "
f"{e.args[0]}"
)
try:
self._context.post_first_transfer(
transferred_file,
environment=self._environment,
)
except Exception as e:
logger.error(f"Exception encountered: {e}")
if "AtlasContext" not in str(self._context):
if not dc_metadata:
try:
dc_metadata = self._context.gather_metadata(
self._xml_file(transferred_file),
environment=self._environment,
)
except NotImplementedError:
dc_metadata = {}
except KeyError as e:
logger.error(
f"Metadata gathering failed with a key error for key: {e.args[0]}"
)
raise e
except ValueError as e:
logger.error(
f"Metadata gathering failed with a value error: {e}"
)
if not dc_metadata or not self._force_mdoc_metadata:
self._unseen_xml.append(transferred_file)
else:
self._unseen_xml = []
if dc_metadata.get("file_extension"):
self._extension = dc_metadata["file_extension"]
else:
dc_metadata["file_extension"] = self._extension
dc_metadata["acquisition_software"] = (
self._context._acquisition_software
)
self.notify(dc_metadata)

# Contexts that can be immediately posted without additional work
elif "CLEMContext" in str(self._context):
logger.debug(
f"File {transferred_file.name!r} is part of CLEM workflow"
)
self.post_transfer(transferred_file)
elif "FIBContext" in str(self._context):
logger.debug(
f"File {transferred_file.name!r} is part of the FIB workflow"
)
self.post_transfer(transferred_file)
elif "SXTContext" in str(self._context):
logger.debug(f"File {transferred_file.name!r} is an SXT file")
self.post_transfer(transferred_file)
elif "AtlasContext" in str(self._context):
logger.debug(f"File {transferred_file.name!r} is part of the atlas")
self.post_transfer(transferred_file)

# Handle files with tomography and SPA context differently
elif not self._extension or self._unseen_xml:
raise e
# Set the mdoc field to None if no metadata was found
if not dc_metadata:
self._mdoc_for_reading = None

if not self._extension or self._unseen_xml:
# Early return if no extension was found
if not self._find_extension(transferred_file):
logger.error(f"No extension found for {transferred_file}")
continue
if self._extension:
logger.warning(f"No extension found for {transferred_file}")
return
else:
logger.info(
f"Extension found successfully for {transferred_file}"
)
try:
self._context.post_first_transfer(
transferred_file,
environment=self._environment,
)
except Exception as e:
logger.error(f"Exception encountered: {e}")
if not dc_metadata:
try:
dc_metadata = self._context.gather_metadata(
mdoc_for_reading
or self._xml_file(transferred_file),
environment=self._environment,
)
except KeyError as e:
logger.error(
f"Metadata gathering failed with a key error for key: {e.args[0]}"
)
raise e
if not dc_metadata or not self._force_mdoc_metadata:
mdoc_for_reading = None
self._unseen_xml.append(transferred_file)
if dc_metadata:
self._unseen_xml = []
if dc_metadata.get("file_extension"):
self._extension = dc_metadata["file_extension"]
else:
dc_metadata["file_extension"] = self._extension
dc_metadata["acquisition_software"] = (
self._context._acquisition_software
)
self.notify(dc_metadata)
elif any(
context in str(self._context)
for context in (
"SPAContext",
"SPAMetadataContext",
"TomographyContext",
"TomographyMetadataContext",
)
):
context = str(self._context).split(" ")[0].split(".")[-1]
logger.debug(
f"Transferring file {str(transferred_file)} with context {context!r}"

logger.debug(
f"Transferring file {str(transferred_file)} with context {context!r}"
)
self.post_transfer(transferred_file)

if not dc_metadata and transferred_file.suffix != ".mdoc":
try:
dc_metadata = self._context.gather_metadata(
self._mdoc_for_reading or self._xml_file(transferred_file),
environment=self._environment,
)
except KeyError as e:
logger.error(
f"Metadata gathering failed with a key error for key: {e.args[0]}"
)
raise e
if not dc_metadata or not self._force_mdoc_metadata:
self._mdoc_for_reading = None
self._unseen_xml.append(transferred_file)
if dc_metadata:
self._unseen_xml = []
if dc_metadata.get("file_extension"):
self._extension = dc_metadata["file_extension"]
else:
dc_metadata["file_extension"] = self._extension
dc_metadata["acquisition_software"] = (
self._context._acquisition_software
)
self.post_transfer(transferred_file)
self.queue.task_done()
logger.debug("Analyer thread has stopped analysing incoming files")
self.notify(final=True)
self.notify(dc_metadata)
return

def _xml_file(self, data_file: Path) -> Path:
if not self._environment:
Expand Down
Loading
Loading