Module pyminflux.reader
Readers of MINFLUX data.
Sub-modules
pyminflux.reader.metadata
pyminflux.reader.util
Classes
class MSRReader (filename: pathlib.Path | str)
Reads data and metadata information from .MSR (OBF format) files.

For documentation, see: https://imspectordocs.readthedocs.io/en/latest/fileformat.html#the-obf-file-format
Note: binary data is stored in little-endian order.
Constructor.
Parameters
filename: Union[Path, str] - Full path to the file name to open.
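A minimal usage sketch based on the methods documented below; the file name "experiment.msr" is a hypothetical placeholder:

    from pathlib import Path
    from pyminflux.reader import MSRReader

    reader = MSRReader(Path("experiment.msr"))  # hypothetical file name
    if reader.scan():
        print(f"Found {reader.num_stacks} stacks.")
        for stack_metadata in reader:  # iterates over the per-stack metadata objects
            print(stack_metadata.stack_name)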
Instance variables
prop num_stacks

Return the number of stacks contained in the file.
Methods
def export_ome_xml_metadata(self, file_name: pathlib.Path | str)

Export the OME-XML metadata to file.

Parameters

file_name: Union[str, Path] - Output file name.
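A short sketch of exporting the OME-XML metadata after a successful scan; both file names are arbitrary placeholders:

    reader = MSRReader("experiment.msr")  # hypothetical file name
    if reader.scan():
        reader.export_ome_xml_metadata("out/metadata.xml")  # missing parent folders are created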
def export_tag_dictionary(self, stack_index: int, file_name: pathlib.Path | str)

Export the tag dictionary to file.

Parameters

stack_index: int - Index of the stack for which to export the tag dictionary.
file_name: Union[str, Path] - Output file name.
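A sketch of exporting the tag dictionary of the first stack; the base file name "tags.txt" is an arbitrary choice, and one file per tag entry is written with the tag key appended to the stem:

    reader = MSRReader("experiment.msr")  # hypothetical file name
    if reader.scan():
        # One file per tag is written, named tags_<key>.xml, .json or .txt
        reader.export_tag_dictionary(stack_index=0, file_name="tags.txt")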
def get_data(self, stack_index: int) -> numpy.ndarray | None

Read the data for the requested stack: only 2D images are returned.

Parameters

stack_index: int - Index of the stack for which to read the data.

Returns

frame: Union[np.ndarray, None] - Data as a 2D NumPy array. None if it could not be read or if it was not a 2D image.
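Since get_data() returns None for unsupported stacks, the result should be checked before use, as in this sketch (file name is hypothetical):

    reader = MSRReader("experiment.msr")  # hypothetical file name
    if reader.scan():
        frame = reader.get_data(stack_index=0)
        if frame is not None:  # None for chunked, non-2D, or unsupported stacks
            print(frame.shape, frame.dtype)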
def get_data_offsets(self, stack_index: int, scaled: bool = True) -> list | None

Returns the (scaled) data offsets for the requested stack.

Parameters

stack_index: int - Index of the stack for which to read the data.
scaled: bool - If scaled is True, the offsets will be scaled by the corresponding scale factors as reported by MSRReader.get_data_units().

Returns

offsets: Union[list, None] - Offsets for 2D images, None otherwise.
def get_data_physical_sizes(self, stack_index: int, scaled: bool = True) -> list | None

Returns the (scaled) physical sizes of the data for the requested stack.

Parameters

stack_index: int - Index of the stack for which to read the data.
scaled: bool - If scaled is True, the physical sizes will be scaled by the corresponding scale factors as reported by MSRReader.get_data_units().

Returns

sizes: Union[list, None] - Physical sizes for 2D images, None otherwise.
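As an illustration (assuming a previously scanned reader and using stack index 0 as an arbitrary choice), physical sizes and offsets can be retrieved scaled or as stored:

    sizes = reader.get_data_physical_sizes(stack_index=0)            # scaled by the unit scale factors
    offsets = reader.get_data_offsets(stack_index=0, scaled=False)   # raw values as stored in the file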
def get_data_pixel_sizes(self, stack_index: int, scaled: bool = True) -> list | None

Returns the (scaled) pixel sizes of the data for the requested stack.

Parameters

stack_index: int - Index of the stack for which to read the data.
scaled: bool - If scaled is True, the pixel sizes will be scaled by the corresponding scale factors as reported by MSRReader.get_data_units().

Returns

pixel_sizes: Union[list, None] - Pixel sizes for 2D images, None otherwise.
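A small sketch (assuming a previously scanned reader): with the default scaling, spatial axes are expressed in the SI units reported by get_data_units(), so a conversion is needed for nanometer display:

    pixel_sizes = reader.get_data_pixel_sizes(stack_index=0)
    if pixel_sizes is not None:
        # Spatial axes are reported in meters (SI); convert to nanometers for display
        pixel_sizes_nm = [p * 1e9 for p in pixel_sizes]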
def get_data_units(self, stack_index: int) -> tuple[list, list] | None

Returns the data units and scale factors per dimension for the requested stack.

Units are one of:

"m": meters
"kg": kilograms
"s": seconds
"A": Amperes
"K": Kelvin
"mol": moles
"cd": candela
"r": radians
"sr": steradians

Parameters

stack_index: int - Index of the stack for which to read the data.

Returns

unit: Union[tuple[list, list], None] - List of units and list of scale factors, or None if no file was opened.
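For example (assuming a previously scanned reader), the per-dimension units and scale factors can be inspected as follows:

    units, factors = reader.get_data_units(stack_index=0)
    for unit, factor in zip(units, factors):
        print(f"unit = {unit!r}, scale factor = {factor}")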
def get_image_info_dict(self)

Return a hierarchical dictionary of images from all stacks.
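A sketch of iterating the hierarchical dictionary (assuming a previously scanned reader); each entry carries a metadata summary string and a list of per-detector records:

    images = reader.get_image_info_dict()
    for key, image in images.items():
        print(key, image["metadata"])
        for det in image["detectors"]:
            print("   ", det["detector"], "stack index =", det["index"])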
def get_image_info_list(self)

Return a list of images from all stacks.
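For a quick overview (assuming a previously scanned reader), each returned entry includes a pre-built summary string:

    for info in reader.get_image_info_list():
        print(info["as_string"])  # detector, stack name, size, pixel size and stack index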
def get_ome_xml_metadata(self) -> str | None

Return the OME XML metadata.

Returns

ome_xml_metadata: Union[str, None] - OME XML metadata as formatted string. If no file was loaded, returns None.
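Since the metadata is returned as an XML string, it can be parsed with the standard library if further inspection is needed; a minimal sketch assuming a previously scanned reader:

    import xml.etree.ElementTree as ET

    ome_xml = reader.get_ome_xml_metadata()
    if ome_xml is not None:
        root = ET.fromstring(ome_xml)  # standard-library parsing of the returned string
        print(root.tag)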
def get_tag_dictionary(self, stack_index: int) -> dict | None

Return the tag dictionary for the requested stack.

Parameters

stack_index: int - Index of the stack for which to return the tag dictionary.

Returns

tag_dictionary: Union[dict, None] - Dictionary. If no file was loaded, returns None.
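A short sketch of listing the available tags of the first stack (assuming a previously scanned reader); the set of keys varies by acquisition:

    tags = reader.get_tag_dictionary(stack_index=0)
    if tags is not None:
        print(list(tags.keys()))  # available tags vary by acquisition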
def scan(self) -> bool

Scan the metadata of the file.

Returns

success: bool - True if the file was scanned successfully, False otherwise.
class MinFluxReader (filename: pathlib.Path | str,
                     valid: bool = True,
                     z_scaling_factor: float = 1.0,
                     is_tracking: bool = False,
                     pool_dcr: bool = False,
                     dwell_time: float = 1.0)

Reader of MINFLUX data in .pmx, .npy or .mat formats and Imspector m2205 files; .pmx versions 1.0 through 2.0 are supported.

Constructor.

Parameters

filename: Union[Path, str] - Full path to the .pmx, .npy or .mat file to read.
valid: bool (optional, default = True) - Whether to load only valid localizations.
z_scaling_factor: float (optional, default = 1.0) - Refractive index mismatch correction factor to apply to the z coordinates.
is_tracking: bool (optional, default = False) - Whether the dataset comes from a tracking experiment; otherwise, it is considered a localization experiment.
pool_dcr: bool (optional, default = False) - Whether to pool DCR values weighted by the relative ECO of all relocalized iterations.
dwell_time: float (optional, default = 1.0) - Dwell time in milliseconds.
However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place. """ # Update the flag self._pool_dcr = pool_dcr # Re-process the file? if process or self._processed_dataframe is not None: self._processed_dataframe = self._process() @classmethod def processed_properties(cls) -> list: """Returns the properties read from the file that correspond to the processed dataframe column names.""" return [ "tid", "tim", "x", "y", "z", "efo", "cfr", "eco", "dcr", "dwell", "fluo", "fbg", ] @classmethod def raw_properties(cls) -> list: """Returns the properties read from the file and dynamic that correspond to the raw dataframe column names.""" return [ "tid", "aid", "vld", "tim", "x", "y", "z", "efo", "cfr", "eco", "dcr", "fbg", ] def _load(self) -> bool: """Load the file.""" if not self._filename.is_file(): print(f"File {self._filename} does not exist.") return False # Reset stored TID offsets self._tid_offsets = [] # Call the specialized _load_*() function if self._filename.name.lower().endswith(".npy"): try: data_array = np.load(str(self._filename), allow_pickle=False) if "fluo" in data_array.dtype.names: self._full_raw_data_array = data_array else: self._full_raw_data_array = _migrate_npy_array(data_array) except ( OSError, UnpicklingError, ValueError, EOFError, FileNotFoundError, TypeError, Exception, ) as e: print(f"Could not open {self._filename}: {e}") return False elif self._filename.name.lower().endswith(".mat"): try: self._full_raw_data_array = _convert_from_mat(self._filename) except Exception as e: print(f"Could not open {self._filename}: {e}") return False elif self._filename.name.lower().endswith(".pmx"): try: # Read filtered dataframe self._full_raw_data_array = PMXReader.get_array(self._filename) self._tid_offsets = PMXReader.get_tid_offsets(self._filename) if self._full_raw_data_array is None: print(f"Could not open {self._filename}.") return False except Exception as e: print(f"Could not open {self._filename}: {e}") return False else: print(f"Unexpected file {self._filename}.") return False # Store a logical array with the valid entries self._valid_entries = self._full_raw_data_array["vld"] # Cache whether the data is 2D or 3D and whether is aggregated # The cases are different for localization vs. tracking experiments # num_locs = self._full_raw_data_array["itr"].shape[1] self._is_3d = ( float(np.nanmean(self._full_raw_data_array["itr"][:, -1]["loc"][:, -1])) != 0.0 ) # Set all relevant indices self._set_all_indices() # Return success return True def _process(self) -> Union[None, pd.DataFrame]: """Returns processed dataframe for valid (or invalid) entries. Returns ------- df: pd.DataFrame Processed data as DataFrame. """ # Do we have a data array to work on? 
if self.tot_num_entries == 0: return None if self._valid: indices = self._valid_entries else: indices = np.logical_not(self._valid_entries) # Extract the valid iterations itr = self._full_raw_data_array["itr"][indices] # Extract the valid identifiers tid = self._full_raw_data_array["tid"][indices] # Extract the valid time points tim = self._full_raw_data_array["tim"][indices] # Extract the fluorophore IDs fluo = self._full_raw_data_array["fluo"][indices] if np.all(fluo) == 0: fluo = np.ones(fluo.shape, dtype=fluo.dtype) # The following extraction pattern will change whether the # acquisition is normal or aggregated if self.is_aggregated: # Extract the locations loc = itr["loc"].squeeze() * self._unit_scaling_factor loc[:, 2] = loc[:, 2] * self._z_scaling_factor # Extract EFO efo = itr["efo"] # Extract CFR cfr = itr["cfr"] # Extract ECO eco = itr["eco"] # Extract DCR dcr = itr["dcr"] # Extract the background bfg = itr["bfg"] # Dwell dwell = np.around((eco / (efo / 1000)) / self._dwell_time, decimals=0) else: # Extract the locations loc = itr[:, self._loc_index]["loc"] * self._unit_scaling_factor loc[:, 2] = loc[:, 2] * self._z_scaling_factor # Extract EFO efo = itr[:, self._efo_index]["efo"] # Extract CFR cfr = itr[:, self._cfr_index]["cfr"] # Extract ECO eco = itr[:, self._eco_index]["eco"] # Extract the background fbg = itr[:, self._loc_index]["fbg"] # Pool DCR values? if self._pool_dcr and np.sum(self._relocalizations) > 1: # Calculate ECO contributions eco_all = itr[:, self._relocalizations]["eco"] eco_sum = eco_all.sum(axis=1) eco_all_norm = eco_all / eco_sum.reshape(-1, 1) # Extract DCR values and weigh them by the relative ECO contributions dcr = itr[:, self._relocalizations]["dcr"] dcr = dcr * eco_all_norm dcr = dcr.sum(axis=1) else: # Extract DCR dcr = itr[:, self._dcr_index]["dcr"] # Calculate dwell dwell = np.around((eco / (efo / 1000)) / self._dwell_time, decimals=0) # Create a Pandas dataframe for the results df = pd.DataFrame( index=pd.RangeIndex(start=0, stop=len(tid)), columns=MinFluxReader.processed_properties(), ) # Store the extracted valid hits into the dataframe df["tid"] = tid df["x"] = loc[:, 0] df["y"] = loc[:, 1] df["z"] = loc[:, 2] df["tim"] = tim df["efo"] = efo df["cfr"] = cfr df["eco"] = eco df["dcr"] = dcr df["dwell"] = dwell df["fbg"] = fbg df["fluo"] = fluo # Remove rows with NaNs in the loc matrix df = df.dropna(subset=["x"]) # Check if the selected indices correspond to the last valid iteration self._is_last_valid = bool( self._cfr_index == self._last_valid_cfr and self._efo_index == self._last_valid ) return df def _set_all_indices(self): """Set indices of properties to be read.""" if self.tot_num_entries == 0: return False # Number of iterations self._reps = self._full_raw_data_array["itr"].shape[1] # Is this an aggregated acquisition? 
if self._reps == 1: self._is_aggregated = True else: self._is_aggregated = False # Query the data to find the last valid iteration # for all measurements last_valid = find_last_valid_iteration(self._full_raw_data_array) # Set the extracted indices self._efo_index = last_valid["efo_index"] self._cfr_index = last_valid["cfr_index"] self._dcr_index = last_valid["dcr_index"] self._eco_index = last_valid["eco_index"] self._loc_index = last_valid["loc_index"] self._valid_cfr = last_valid["valid_cfr"] self._relocalizations = last_valid["reloc"] # Keep track of the last valid iteration self._last_valid = len(self._valid_cfr) - 1 self._last_valid_cfr = last_valid["cfr_index"] def __repr__(self) -> str: """String representation of the object.""" if self.num_valid_entries == 0: return "No file loaded." str_valid = ( "all valid" if self.num_invalid_entries == 0 else f"{self.num_valid_entries} valid and {self.num_invalid_entries} non valid" ) str_acq = "3D" if self.is_3d else "2D" aggr_str = "aggregated" if self.is_aggregated else "normal" return ( f"File: {self._filename.name}: " f"{str_acq} {aggr_str} acquisition with {self.tot_num_entries} entries ({str_valid})." ) def __str__(self) -> str: """Human-friendly representation of the object.""" return self.__repr__()Constructor.
Parameters
filename:Union[Path, str]- Full path to the .pmx, .npy or .mat file to read.
valid:bool (optional, default= True)- Whether to load only valid localizations.
z_scaling_factor:float (optional, default= 1.0)- Refractive index mismatch correction factor to apply to the z coordinates.
is_tracking:bool (optional, default= False)- Whether the dataset comes from a tracking experiment; otherwise, it is considered as a localization experiment.
pool_dcr:bool (optional, default= False)- Whether to pool DCR values weighted by the relative ECO of all relocalized iterations.
dwell_time:float (optional, default= 1.0)- Dwell time in milliseconds.
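A minimal usage sketch (the file name and the z scaling factor below are placeholders, not values from any real dataset):

from pathlib import Path
from pyminflux.reader import MinFluxReader

# Hypothetical export from a MINFLUX acquisition
filename = Path("experiment.npy")

# Load only valid localizations and apply an illustrative refractive index
# mismatch correction of 0.7 to the z coordinates
reader = MinFluxReader(filename, valid=True, z_scaling_factor=0.7)

# The processed dataframe is built lazily on first access; its columns are
# the ones returned by MinFluxReader.processed_properties()
df = reader.processed_dataframe
print(reader)  # summary: 2D/3D, normal/aggregated, valid vs. invalid entries
print(df[["tid", "x", "y", "z"]].head())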
Subclasses
- pyminflux.reader._reader_v2.MinFluxReaderV2
Static methods
def processed_properties() ‑> list-
Returns the properties read from the file that correspond to the processed dataframe column names.
def raw_properties() ‑> list-
Returns the properties, read from the file or dynamically generated, that correspond to the raw dataframe column names.
Instance variables
prop dwell_time : float-
Expand source code
@property def dwell_time(self) -> float: """Returns the dwell time.""" return self._dwell_timeReturns the dwell time.
prop filename : pathlib.Path | None-
Expand source code
@property def filename(self) -> Union[Path, None]: """Return the filename if set.""" if self._filename is None: return None return Path(self._filename)Return the filename if set.
prop is_3d : bool-
Expand source code
@property def is_3d(self) -> bool: """Returns True if the acquisition is 3D, False otherwise.""" return self._is_3dReturns True if the acquisition is 3D, False otherwise.
prop is_aggregated : bool-
Expand source code
@property def is_aggregated(self) -> bool: """Returns True if the acquisition is aggregated, False otherwise.""" return self._is_aggregatedReturns True if the acquisition is aggregated, False otherwise.
prop is_last_valid : bool | None-
Expand source code
@property def is_last_valid(self) -> Union[bool, None]: """Return True if the selected iteration is the "last valid", False otherwise. If the dataframe has not been processed yet, `is_last_valid` will be None.""" if self._processed_dataframe is None: return None return self._is_last_validReturn True if the selected iteration is the "last valid", False otherwise. If the dataframe has not been processed yet,
is_last_valid will be None.
prop is_pool_dcr : bool-
Expand source code
@property def is_pool_dcr(self) -> bool: """Returns True if the DCR values are pooled over all relocalized iterations (to use all photons).""" return self._pool_dcrReturns True if the DCR values are pooled over all relocalized iterations (to use all photons).
prop is_tracking : bool-
Expand source code
@property def is_tracking(self) -> bool: """Returns True for a tracking acquisition, False otherwise.""" return self._is_trackingReturns True for a tracking acquisition, False otherwise.
prop num_invalid_entries : int-
Expand source code
@property def num_invalid_entries(self) -> int: """Number of invalid entries.""" if self._valid_entries is None: return 0 return int(np.logical_not(self._valid_entries).sum())Number of invalid entries.
prop num_valid_entries : int-
Expand source code
@property def num_valid_entries(self) -> int: """Number of valid entries.""" if self._valid_entries is None: return 0 return int(self._valid_entries.sum())Number of valid entries.
prop processed_dataframe : pandas.core.frame.DataFrame | None-
Expand source code
@property def processed_dataframe(self) -> Union[None, pd.DataFrame]: """Return the processed data as a dataframe (some properties only).""" if self._processed_dataframe is not None: return self._processed_dataframe self._processed_dataframe = self._process() return self._processed_dataframeReturn the processed data as a dataframe (some properties only).
prop relocalizations : list-
Expand source code
@property def relocalizations(self) -> list: """Return the iterations with relocalizations. Returns ------- reloc: boolean array with True for the iteration indices that are relocalized. """ if self.tot_num_entries == 0: return [] return self._relocalizationsReturn the iterations with relocalizations.
Returns
reloc: boolean array with True for the iteration indices that are relocalized.
prop tid_offsets : list-
Expand source code
@property def tid_offsets(self) -> list: """Return list of (first_iid, tid_offset) pairs applied when combining datasets.""" return list(self._tid_offsets)Return list of (first_iid, tid_offset) pairs applied when combining datasets.
prop tot_num_entries : int-
Expand source code
@property def tot_num_entries(self) -> int: """Total number of entries.""" return self.num_valid_entries + self.num_invalid_entriesTotal number of entries.
prop valid_cfr : list-
Expand source code
@property def valid_cfr(self) -> list: """Return the iterations with valid CFR measurements. Returns ------- cfr: boolean array with True for the iteration indices that have a valid measurement. """ if self.tot_num_entries == 0: return [] return self._valid_cfrReturn the iterations with valid CFR measurements.
Returns
cfr: boolean array with True for the iteration indices that have a valid measurement.
prop valid_raw_data_array : numpy.ndarray | None-
Expand source code
@property def valid_raw_data_array(self) -> Union[None, np.ndarray]: """Return the valid subset of the raw data array.""" if self.tot_num_entries == 0: return None return self._full_raw_data_array[self._valid_entries].copy()Return the valid subset of the raw data array.
prop version : int-
Expand source code
@property def version(self) -> int: return 1
prop z_scaling_factor : float-
Expand source code
@property def z_scaling_factor(self) -> float: """Returns the scaling factor for the z coordinates.""" return self._z_scaling_factorReturns the scaling factor for the z coordinates.
Methods
def set_dwell_time(self, dwell_time: float, process: bool = True)-
Expand source code
def set_dwell_time(self, dwell_time: float, process: bool = True): """ Sets the dwell time. Parameters ---------- dwell_time: float Dwell time. process: bool (Optional, default = True) By default, when setting the dwell time, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place. """ # Update the flag self._dwell_time = dwell_time # Re-process the file? if process or self._processed_dataframe is not None: self._processed_dataframe = self._process()Sets the dwell time.
Parameters
dwell_time:float- Dwell time.
process:bool (Optional, default= True)- By default, when setting the dwell time, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
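For orientation, the dwell column of the processed dataframe is derived from ECO, EFO and this dwell time as in the source above, dwell = round((eco / (efo / 1000)) / dwell_time); a small numeric sketch with made-up values:

import numpy as np

eco = np.array([480, 960])           # photon counts (illustrative)
efo = np.array([24000.0, 24000.0])   # emission frequencies in Hz (illustrative)
dwell_time = 1.0                     # dwell time in milliseconds

# eco / (efo / 1000) is the emission duration in milliseconds; dividing by
# the dwell time expresses it as a number of dwell windows
dwell = np.around((eco / (efo / 1000)) / dwell_time, decimals=0)
print(dwell)  # [20. 40.]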
def set_indices(self, index, cfr_index, process: bool = True)-
Expand source code
def set_indices(self, index, cfr_index, process: bool = True): """Set the parameter indices. We distinguish between the index of all parameters that are always measured and are accessed from the same iteration, and the cfr index, that is not always measured. Parameters ---------- index: int Global iteration index for all parameters but cfr cfr_index: int Iteration index for cfr process: bool (Optional, default = True) By default, when setting the indices, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place. """ # The cfr index is not allowed to be smaller than the global iteration index if index < cfr_index: raise ValueError( "The value of index must be greater than or equal to cfr_index." ) # Make sure there is loaded data if self.tot_num_entries == 0: raise ValueError("No data loaded.") if self._reps == -1: raise ValueError("No data loaded.") if len(self._valid_cfr) == 0: raise ValueError("No data loaded.") # Check that the arguments are compatible with the loaded data if index < 0 or index > self._reps - 1: raise ValueError( f"The value of index must be between 0 and {self._reps - 1}." ) if cfr_index < 0 or cfr_index > len(self._valid_cfr) - 1: raise ValueError( f"The value of index must be between 0 and {len(self._valid_cfr) - 1}." ) # Now set the general values self._efo_index = index self._dcr_index = index self._eco_index = index self._loc_index = index # Set the cfr index self._cfr_index = cfr_index # Constant indices self._tid_index: int = 0 self._tim_index: int = 0 self._vld_index: int = 0 # Re-process the file? If the processed dataframe already exists, # the processing will take place anyway. if process or self._processed_dataframe is not None: self._processed_dataframe = self._process()Set the parameter indices.
We distinguish between the index of all parameters that are always measured and are accessed from the same iteration, and the cfr index, that is not always measured.
Parameters
index:int- Global iteration index for all parameters but cfr
cfr_index:int- Iteration index for cfr
process:bool (Optional, default= True)- By default, when setting the indices, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
def set_pool_dcr(self, pool_dcr: bool, process: bool = True)-
Expand source code
def set_pool_dcr(self, pool_dcr: bool, process: bool = True): """ Sets whether the DCR values should be pooled (and weighted by ECO). Parameters ---------- pool_dcr: bool Whether the DCR values should be pooled (and weighted by ECO). process: bool (Optional, default = True) By default, when setting the DCR binning flag, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place. """ # Update the flag self._pool_dcr = pool_dcr # Re-process the file? if process or self._processed_dataframe is not None: self._processed_dataframe = self._process()Sets whether the DCR values should be pooled (and weighted by ECO).
Parameters
pool_dcr:bool- Whether the DCR values should be pooled (and weighted by ECO).
process:bool (Optional, default= True)- By default, when setting the DCR binning flag, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
def set_tracking(self, is_tracking: bool, process: bool = True)-
Expand source code
def set_tracking(self, is_tracking: bool, process: bool = True): """Sets whether the acquisition is tracking or localization. Parameters ---------- is_tracking: bool Set to True for a tracking acquisition, False for a localization acquisition. process: bool (Optional, default = True) By default, when setting the tracking flag, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place. """ # Update the flag self._is_tracking = is_tracking # Re-process the file? if process or self._processed_dataframe is not None: self._processed_dataframe = self._process()Sets whether the acquisition is tracking or localization.
Parameters
is_tracking:bool- Set to True for a tracking acquisition, False for a localization acquisition.
process:bool (Optional, default= True)- By default, when setting the tracking flag, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
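When several of these setters are called in a row before the processed dataframe has been built, the rescan can be deferred to the last call; a sketch of that pattern, assuming reader is a freshly constructed MinFluxReader:

# process=False only defers the rescan while the processed dataframe does not
# exist yet; once it has been created, every setter reprocesses immediately.
reader.set_tracking(True, process=False)
reader.set_pool_dcr(True, process=False)
reader.set_dwell_time(2.0, process=True)  # rebuild the dataframe once, here

df = reader.processed_dataframe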
class MinFluxReaderFactory-
Expand source code
class MinFluxReaderFactory: __docs__ = "Factory for MinFluxReader version 1 or 2." @staticmethod def get_reader(filename: Union[Path, str]) -> (MinFluxReader, str): """Returns the appropriate reader class for the passed filename. Usage ----- reader_class = MinFluxReaderFactory.get_reader(filename) # One of MinFluxReader or MinFluxReaderV2 reader = reader_class(filename, valid, z_scaling_factor, is_tracking, pool_dcr, dwell_time) Parameters ---------- filename: Union[Path, str] Full path to the `.pmx`, `.npy`, `.mat`, or '.json' file to read. Returns ------- reader: MinFluxReader class Either version 1 or version 2 MinFluxReader. Version 2 MinFluxReader supports Imspector >=24.10. """ # Check if the file exists filename = Path(filename) if not filename.exists(): return None, f"{filename} does not exist." # If filename is a folder, we check for a valid Zarr file if filename.is_dir(): if zarr.load(str(filename)) is not None: return MinFluxReaderV2, "" else: return None, f"{filename} is not a valid Zarr file." # Determine file type file_ext = filename.suffix.lower() # Check the file if file_ext == ".npy": reader_version = get_reader_version_for_npy_file(filename) elif file_ext == ".mat": reader_version = get_reader_version_for_mat_file(filename) elif file_ext == ".json": reader_version = 2 elif file_ext == ".pmx": reader_version = get_reader_version_for_pmx_file(filename) else: return None, f"{filename} is not supported." # Return the requested reader if reader_version == 1: return MinFluxReader, "" elif reader_version == 2: return MinFluxReaderV2, "" elif reader_version == -1: # In case parsing the files failed, the returned reader_version would be 1. return None, f"Error processing file {filename}." else: # Unexpected version number return None, f"MinFluxReader version {reader_version} is not supported."Static methods
def get_reader(filename: pathlib.Path | str) ‑> (MinFluxReader, str)-
Expand source code
@staticmethod def get_reader(filename: Union[Path, str]) -> (MinFluxReader, str): """Returns the appropriate reader class for the passed filename. Usage ----- reader_class = MinFluxReaderFactory.get_reader(filename) # One of MinFluxReader or MinFluxReaderV2 reader = reader_class(filename, valid, z_scaling_factor, is_tracking, pool_dcr, dwell_time) Parameters ---------- filename: Union[Path, str] Full path to the `.pmx`, `.npy`, `.mat`, or '.json' file to read. Returns ------- reader: MinFluxReader class Either version 1 or version 2 MinFluxReader. Version 2 MinFluxReader supports Imspector >=24.10. """ # Check if the file exists filename = Path(filename) if not filename.exists(): return None, f"{filename} does not exist." # If filename is a folder, we check for a valid Zarr file if filename.is_dir(): if zarr.load(str(filename)) is not None: return MinFluxReaderV2, "" else: return None, f"{filename} is not a valid Zarr file." # Determine file type file_ext = filename.suffix.lower() # Check the file if file_ext == ".npy": reader_version = get_reader_version_for_npy_file(filename) elif file_ext == ".mat": reader_version = get_reader_version_for_mat_file(filename) elif file_ext == ".json": reader_version = 2 elif file_ext == ".pmx": reader_version = get_reader_version_for_pmx_file(filename) else: return None, f"{filename} is not supported." # Return the requested reader if reader_version == 1: return MinFluxReader, "" elif reader_version == 2: return MinFluxReaderV2, "" elif reader_version == -1: # In case parsing the files failed, the returned reader_version would be 1. return None, f"Error processing file {filename}." else: # Unexpected version number return None, f"MinFluxReader version {reader_version} is not supported."Returns the appropriate reader class for the passed filename.
Usage
reader_class, message = MinFluxReaderFactory.get_reader(filename)  # One of MinFluxReader or MinFluxReaderV2, plus an error message
reader = reader_class(filename, valid, z_scaling_factor, is_tracking, pool_dcr, dwell_time)
Parameters
filename:Union[Path, str]- Full path to the .pmx, .npy, .mat, or .json file to read.
Returns
reader:MinFluxReader class- Either version 1 or version 2 MinFluxReader. Version 2 MinFluxReader supports Imspector >=24.10.
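A usage sketch of the factory (the file name is a placeholder; both reader classes are assumed to be importable from pyminflux.reader):

from pathlib import Path
from pyminflux.reader import MinFluxReaderFactory

filename = Path("experiment.npy")  # placeholder path

# get_reader() returns the reader class (or None on failure) and a message
reader_class, message = MinFluxReaderFactory.get_reader(filename)
if reader_class is None:
    raise IOError(message)

# Instantiate the selected reader; the keyword arguments below are accepted
# by both MinFluxReader and MinFluxReaderV2
reader = reader_class(
    filename,
    z_scaling_factor=1.0,
    is_tracking=False,
    pool_dcr=False,
    dwell_time=1.0,
)
df = reader.processed_dataframe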
class MinFluxReaderV2 (filename: pathlib.Path | str,
z_scaling_factor: float = 1.0,
is_tracking: bool = False,
pool_dcr: bool = False,
dwell_time: float = 1.0)-
Expand source code
class MinFluxReaderV2(MinFluxReader): """Reader of MINFLUX data in `.npy`, `.mat` and `.json` Imspector m2410 files, and `.pmx` version 0.6.0 and newer.""" def __init__( self, filename: Union[Path, str], z_scaling_factor: float = 1.0, is_tracking: bool = False, pool_dcr: bool = False, dwell_time: float = 1.0, ): """Constructor. Parameters ---------- filename: Union[Path, str] Full path to the `.pmx`, `.npy` or `.mat` file to read z_scaling_factor: float (optional, default = 1.0) Refractive index mismatch correction factor to apply to the z coordinates. is_tracking: bool (optional, default = False) Whether the dataset comes from a tracking experiment; otherwise, it is considered as a localization experiment. pool_dcr: bool (optional, default = False) Whether to pool DCR values weighted by the relative ECO of all relocalized iterations. dwell_time: float (optional, default 1.0) Dwell time in milliseconds. """ # Version 2 does not use the _full_raw_data_array property, but uses the full dataframe instead self._full_raw_dataframe = None # Store beamline monitoring data is present self._mbm = None # Call the base constructor super().__init__( filename=filename, valid=True, # Pass valid=True to the base class z_scaling_factor=z_scaling_factor, is_tracking=is_tracking, pool_dcr=pool_dcr, dwell_time=dwell_time, ) # Delete the _full_raw_data_array property from version 1 (version 2 does NOT # store the raw data from Imspector, but the processed dataframes that is # derived from it). del self._full_raw_data_array @property def version(self) -> int: return 2 @property def mbm_data(self): """Return the loaded beamline monitoring data.""" return self._mbm_data @property def valid_full_raw_dataframe(self) -> Union[None, np.ndarray]: """Return the raw data.""" if self.tot_num_entries == 0: return None return self._full_raw_dataframe[self._valid_entries].copy() @classmethod def processed_properties(cls) -> list: """Returns the properties read from the file that correspond to the processed dataframe column names.""" return [ "tid", "tim", "x", "y", "z", "efo", "cfr", "eco", "dcr", "dwell", "fluo", "fbg", "iid", # Custom: iteration ID ] def _load(self) -> bool: """Load the file.""" if not self._filename.exists(): print(f"File {self._filename} does not exist.") return False # Reset stored TID offsets self._tid_offsets = [] raw_dataframe = pd.DataFrame( columns=[ "vld", "fnl", "bot", "eot", "sta", "tim", "tid", "gri", "thi", "sqi", "itr", "x", "y", "z", "lncx", "lncy", "lncz", "eco", "ecc", "efo", "efc", "cfr", "dcr", "fbg", "fluo", # Custom: fluorophore ID "iid", # Custom: iteration ID ] ) # Do we have a Zarr file? 
if self._filename.is_dir(): # Create phony file_ext ".zarr" for the following logic file_ext = ".zarr" else: # Determine file type file_ext = self._filename.suffix.lower() # Call the specialized _load_*() function try: if file_ext == ".zarr": # Load and convert to NumPy raw_dataframe = self._load_zarr(raw_dataframe) elif file_ext == ".npy": raw_dataframe = self._load_numpy(raw_dataframe) elif file_ext == ".mat": raw_dataframe = self._load_mat(raw_dataframe) elif file_ext == ".json": raw_dataframe = self._load_json(raw_dataframe) elif file_ext == ".pmx": raw_dataframe = self._load_pmx(raw_dataframe) self._tid_offsets = PMXReader.get_tid_offsets(self._filename) else: print(f"Unexpected file {self._filename}.") return False except Exception as e: print(f"{e}") return False # Finalize the initialization for all imported file formats if file_ext in [".zarr", ".npy", ".mat", ".json"]: # Initialize the fluo field raw_dataframe.loc[:, "fluo"] = 1 # **Important**: apply data types **after** creating the dataframe to make sure that # data coming from binary types (.npy and .mat) and data coming from text types (.json) # generate identical dataframes. data_full_df_dtype = { "vld": "?", "fnl": "?", "bot": "?", "eot": "?", "sta": "u1", "tim": "<f8", "tid": "<u4", "gri": "<u4", "thi": "u1", "sqi": "u1", "itr": "<i4", "x": "<f8", "y": "<f8", "z": "<f8", "lncx": "<f8", "lncy": "<f8", "lncz": "<f8", "eco": "<u4", "ecc": "<u4", "efo": "<f4", "efc": "<f4", "fbg": "<f4", "cfr": "<f2", "dcr": "<f2", "fluo": "u1", "iid": "<u4", } # Apply the iteration ID. A new iid is started if: # 1. Previous row had fnl=True OR # 2. Current row has a different tid than the previous row # Calculate tid differences tid_values = raw_dataframe["tid"].values tid_diff = np.zeros(len(tid_values), dtype=bool) tid_diff[1:] = ( tid_values[1:] != tid_values[:-1] ) # Compare current with previous, skip first row # Calculate previous fnl flags prev_fnl = np.zeros(len(raw_dataframe), dtype=bool) prev_fnl[1:] = raw_dataframe["fnl"].values[:-1] # Previous row's fnl # Combine conditions - either previous row had fnl=True or current row has new tid new_iid = np.logical_or(prev_fnl, tid_diff) # Calculate iid by cumulative sum of these indicators, adding 1 for the first group raw_dataframe.loc[:, "iid"] = new_iid.cumsum() + 1 # Apply the correct datatypes to the columns raw_dataframe = raw_dataframe.astype(data_full_df_dtype) # Assign the new dataframe self._full_raw_dataframe = raw_dataframe # Store a logical array with the valid entries self._valid_entries = self._full_raw_dataframe["vld"] if not np.all(self._get_valid_subset()): print("All entries at this stage must be valid!") return False # Cache whether the data is 2D or 3D and whether is aggregated z_values = self._full_raw_dataframe[self._valid_entries]["z"].to_numpy() self._is_3d = np.abs(z_values).max() > 1e-11 # Set all relevant indices self._set_all_indices() # In case of a Zarr file, try loading beamline monitoring data if file_ext == ".zarr": self._load_mbm() # Return success return True def _load_mbm(self): """Load beamline monitoring data if present.""" # Make sure that self._filename points to the root of the Zarr file self._filename = find_zarr_root(self._filename) # Initialize dictionary mbm_data = {"mbm": {}} # Read grd/mbm mbm_points = zarr.load(str(self._filename / "grd" / "mbm" / "points")) mbm = zarr.load(str(self._filename / "mbm")) grd_mbm = zarr.load(str(self._filename / "grd" / "mbm")) if mbm_points is None or mbm is None or grd_mbm is None: print(f"No beamline 
monitoring data found in {self._filename}.") self._mbm_data = mbm_data return mbm_gri = grd_mbm.grp.points.attrs["points_by_gri"] mbm_neighbourhood = mbm.grp.attrs["neighbourhood"] # Get list of used beads from mbm attributes # https://wiki.abberior.rocks/MINFLUX_Files_and_Data#MBM_Information used_beads = mbm.grp.attrs.get("used", []) num_beads = 0 num_used_beads = 0 for key in mbm_gri: bead_name = mbm_gri[key]["name"] pts = mbm_points[mbm_points["gri"] == int(key)] bead_data = {"bead_name": bead_name, "gri": key, "used": 0, "points": pts} if bead_name in used_beads: bead_data["used"] = 1 num_used_beads += 1 mbm_data["mbm"][bead_name] = bead_data num_beads += 1 # Add mbm_neighbourhood information mbm_data["mbm_neighborhood"] = mbm_neighbourhood # Store the loaded information self._mbm_data = mbm_data print( f"Read {num_beads} " f"{'beads' if num_beads != 1 else 'bead'} ({num_used_beads} " f"used)." ) def _load_zarr(self, df: pd.DataFrame): """Load the Zarr file and update the dataframe.""" # Make sure that self._filename points to the root of the Zarr file self._filename = find_zarr_root(self._filename) # Path to "mfx" filename = self._filename / "mfx" if not filename.is_dir(): print("Could not open the Zarr file.") return None # Load array npy_array = np.array(zarr.load(str(filename))) if npy_array is None: print("Could not open the Zarr file.") return None # Drop all invalid entries npy_array = npy_array[npy_array["vld"]] # Fill the dataframe for name in npy_array.dtype.names: if name == "dcr": # In version 2, the dcr is 2D: dcr[:, 0] corresponds to the dcr of # version 1, while dcr[:, 1] is just 1.0 - dcr[:, 0]. We drop the # second dimension. df["dcr"] = npy_array["dcr"][:, 0] continue # Special cases if name == "loc": df["x"] = npy_array["loc"][:, 0] df["y"] = npy_array["loc"][:, 1] df["z"] = npy_array["loc"][:, 2] continue if name == "lnc": df["lncx"] = npy_array["lnc"][:, 0] df["lncy"] = npy_array["lnc"][:, 1] df["lncz"] = npy_array["lnc"][:, 2] continue # Single arrays df[name] = npy_array[name] # Incomplete traces are kept in the Zarr file; we drop them before # building the clean dataframe. thresh = int(np.max(df["itr"]) + 1) df = df[df.groupby("tid")["tid"].transform("size") >= thresh] return df def _load_numpy(self, df: pd.DataFrame): """Load the NumPy file and update the dataframe.""" try: # Load array npy_array = np.load(str(self._filename), allow_pickle=False) except ( OSError, UnpicklingError, ValueError, EOFError, FileNotFoundError, TypeError, Exception, ) as e: raise Exception(f"Could not open {self._filename}: {e}") # Drop all invalid entries npy_array = npy_array[npy_array["vld"]] # Fill the dataframe for name in npy_array.dtype.names: if name == "dcr": # In version 2, the dcr is 2D: dcr[:, 0] corresponds to the dcr of # version 1, while dcr[:, 1] is just 1.0 - dcr[:, 0]. We drop the # second dimension. 
df["dcr"] = npy_array["dcr"][:, 0] continue # Special cases if name == "loc": df["x"] = npy_array["loc"][:, 0] df["y"] = npy_array["loc"][:, 1] df["z"] = npy_array["loc"][:, 2] continue if name == "lnc": df["lncx"] = npy_array["lnc"][:, 0] df["lncy"] = npy_array["lnc"][:, 1] df["lncz"] = npy_array["lnc"][:, 2] continue # Single arrays df[name] = npy_array[name] return df def _load_mat(self, df: pd.DataFrame): """Load the MAT file and update the dataframe.""" # Load .mat file try: mat_array = loadmat(str(self._filename)) except (FileNotFoundError, ValueError) as e: raise Exception(f"Could not open {self._filename}: {e}") # Fill the dataframe for key in mat_array.keys(): if key in ["__header__", "__version__", "__globals__"]: continue if key == "dcr": # In version 2, the dcr is 2D: dcr[:, 0] corresponds to the dcr of # version 1, while dcr[:, 1] is just 1.0 - dcr[:, 0]. We drop the # second dimension. df["dcr"] = mat_array["dcr"][:, 0] continue # Special cases if key == "loc": df["x"] = mat_array["loc"][:, 0] df["y"] = mat_array["loc"][:, 1] df["z"] = mat_array["loc"][:, 2] continue if key == "lnc": df["lncx"] = mat_array["lnc"][:, 0] df["lncy"] = mat_array["lnc"][:, 1] df["lncz"] = mat_array["lnc"][:, 2] continue # Single arrays df[key] = mat_array[key].ravel() # Only keep valid entries df = df[df["vld"] == 1] return df def _load_json(self, df: pd.DataFrame): """Load the JSON file and update the dataframe.""" # Load array try: with open(str(self._filename), "r", encoding="utf-8") as f: json_array = json.load(f) except ( FileNotFoundError, UnicodeDecodeError, JSONDecodeError, Exception, ) as e: raise Exception(f"Could not open {self._filename}: {e}") # Create a dictionary of empty lists and keys matching the loaded ones dict_keys = list(json_array[0].keys()) + ["x", "y", "z", "lncx", "lncy", "lncz"] d = {key: [] for key in dict_keys} del d["loc"] del d["lnc"] for entry in json_array: for key in entry: if key == "dcr": # In version 2, the dcr is 2D: dcr[:, 0] corresponds to the dcr of # version 1, while dcr[:, 1] is just 1.0 - dcr[:, 0]. We drop the # second dimension. d["dcr"].append(entry["dcr"][0]) continue # Special cases if key == "loc": d["x"].append(entry["loc"][0]) d["y"].append(entry["loc"][1]) d["z"].append(entry["loc"][2]) continue if key == "lnc": d["lncx"].append(entry["lnc"][0]) d["lncy"].append(entry["lnc"][1]) d["lncz"].append(entry["lnc"][2]) continue d[key].append(entry[key]) # Fill dataframe for key in d: df[key] = d[key] # Only keep valid entries df = df[df["vld"]] return df def _load_pmx(self, df: pd.DataFrame): """Load the PMX file and update the dataframe.""" # Read filtered dataframe df = PMXReader.get_dataframe(self._filename) return df def _set_all_indices(self): """Set indices of properties to be read.""" if self.num_valid_entries == 0: return False # Number of iterations self._reps = int(np.max(self._full_raw_dataframe["itr"]) + 1) # Is this an aggregated acquisition? 
self._is_aggregated = self._reps == 1 # Query the data to find the last valid iteration # for all measurements try: last_valid = find_last_valid_iteration_v2( self._full_raw_dataframe, num_iterations=self._reps ) except ValueError as e: print(f"[ERROR] {e}") return False # Set the extracted indices self._efo_index = last_valid["efo_index"] self._cfr_index = last_valid["cfr_index"] self._dcr_index = last_valid["dcr_index"] self._eco_index = last_valid["eco_index"] self._loc_index = last_valid["loc_index"] self._valid_cfr = last_valid["valid_cfr"] self._relocalizations = last_valid["reloc"] # Keep track of the last valid iteration self._last_valid = len(self._valid_cfr) - 1 self._last_valid_cfr = last_valid["cfr_index"] def _extend_array_with_prepend(self, arr: np.array, n: int): """ Extends the input sorted NumPy array by prepending `n` consecutive values before each element. Elements where the gap from the previous kept element is <= `n` are discarded. Parameters ---------- arr: np.ndarray Sorted 1D NumPy array of integers. n: int Number of consecutive values to prepend before each element. Returns ------- ext_arr: np.ndarray Extended array with new values prepended. """ # Compute differences between consecutive elements diffs = np.diff(arr) # The first element is always kept keep_mask = np.concatenate(([True], diffs > n)) # Select elements to keep kept_elements = arr[keep_mask] # Generate new values for each kept element # For each element x in kept_elements, generate x-n, x-(n-1), ..., x-1 prepend_offsets = np.arange(n, 0, -1) new_values = ( kept_elements[:, np.newaxis] - prepend_offsets ) # Shape: (num_kept, n) # Flatten the new_values array new_values = new_values.flatten() # Combine new values with the kept elements combined = np.concatenate((new_values, kept_elements)) # Remove any potential duplicates and ensure the array is sorted extended_array = np.unique(combined) return extended_array def _get_valid_subset(self): """Returns the valid subset of the full dataframe from which to extract the requested iteration data.""" # MinFluxReaderV2 only works with valid entries val_indices = self._valid_entries # Valid data_valid_df = self._full_raw_dataframe.loc[val_indices] # Here we have to use different logic for tracking vs. localization # acquisitions. Tracking (and potentially other custom sequences) # only have one cfr value per trace id. condition = (data_valid_df["itr"].eq(self._cfr_index)).groupby( data_valid_df["tid"] ).sum() == 1 one_cfr_per_tid = np.all(condition) if one_cfr_per_tid: # Traces that only have one cfr in the first localization (and then not # measured anymore) are a special case of incomplete iterations. In this # case, we do not want to drop them: to make them valid, we copy the cfr # value from the first iterations to all subsequent localizations. 
indices = data_valid_df.index[ (data_valid_df["itr"] == self._cfr_index) | (data_valid_df["itr"] == self._loc_index) ].to_numpy() else: # For localizations, we preserve only all those iterations that have a full set # from the cfr iteration to the localized iteration: but for those we make sure # to have all relocalizations to support dcr pooling indices = data_valid_df.index[ data_valid_df["itr"] == self._loc_index ].to_numpy() if self._pool_dcr: start_index = ( self._loc_index - np.sum(self.relocalizations[: self._loc_index + 1]) + 1 ) else: start_index = self._cfr_index num_rows = self._loc_index - start_index if num_rows > 0: indices = self._extend_array_with_prepend(indices, num_rows) # @TODO DEBUG: remove when properly tested # assert np.all(np.unique(data_valid_df.iloc[indices]["itr"]) == np.arange(self._cfr_index, self._loc_index + 1)) return indices def _process(self) -> Union[None, pd.DataFrame]: """Returns processed dataframe for valid (or invalid) entries. Returns ------- df: pd.DataFrame Processed data as DataFrame. """ # Do we have a data array to work on? if self.tot_num_entries == 0: return None # Get valid subset valid_subset = self._get_valid_subset() data_valid_df = self._full_raw_dataframe.loc[valid_subset] # Extract the iteration IDs iid = data_valid_df["iid"].to_numpy() # Extract the valid iterations itr = data_valid_df["itr"].to_numpy() # Extract the valid identifiers tid = data_valid_df["tid"].to_numpy() # Extract the valid time points tim = data_valid_df["tim"].to_numpy() # Extract the fluorophore IDs fluo = data_valid_df["fluo"].to_numpy() if np.all(fluo) == 0: fluo = np.ones(fluo.shape, dtype=fluo.dtype) # The following extraction pattern will change whether the # acquisition is normal or aggregated if self.is_aggregated: # Extract the locations x = data_valid_df["x"].to_numpy() y = data_valid_df["y"].to_numpy() z = data_valid_df["z"].to_numpy() z *= self._z_scaling_factor # Extract EFO efo = data_valid_df["efo"].to_numpy() # Extract CFR cfr = data_valid_df["cfr"].to_numpy() # Extract ECO eco = data_valid_df["eco"].to_numpy() # Extract DCR dcr = data_valid_df["dcr"].to_numpy() # Extract the background fbg = data_valid_df["fbg"].to_numpy() # Dwell dwell = np.around((eco / (efo / 1000)) / self._dwell_time, decimals=0) else: # In contrast to version 1 of the reader and of the Imspector file formats, we now extract # by value and not by index! # Extract the iteration IDs iid = iid[itr == self._loc_index] # Trace IDs tid = tid[itr == self._loc_index] # Extract the valid time points tim = tim[itr == self._loc_index] # Extract the locations loc = ( data_valid_df[["x", "y", "z"]][itr == self._loc_index] * self._unit_scaling_factor ) loc["z"] *= self._z_scaling_factor x = loc["x"].to_numpy() y = loc["y"].to_numpy() z = loc["z"].to_numpy() # Extract EFO efo = data_valid_df["efo"][itr == self._efo_index].to_numpy() # Extract CFR (conditional to the presence of the last loc) cfr = data_valid_df["cfr"][itr == self._cfr_index].to_numpy() if len(cfr) < len(tid): # This is the (tracking) case where the cfr value is # stored from a non-relocalized iteration. Moreover, # this cfr is only measured for the first, complete, # sequence of the trace. _, counts = np.unique(tid, return_counts=True) cfr = np.repeat(cfr, counts) # Extract ECO eco = data_valid_df["eco"][itr == self._eco_index].to_numpy() # Extract the background fbg = data_valid_df["fbg"][itr == self._loc_index].to_numpy() # Fluorophore fluo = data_valid_df["fluo"][itr == self._loc_index].to_numpy() # Pool DCR values? 
num_relocs = int(np.sum(self._relocalizations[: self._loc_index + 1])) if self._pool_dcr and num_relocs > 1: # Calculate ECO contributions eco_all = data_valid_df["eco"].to_numpy().reshape(-1, num_relocs) eco_sum = eco_all.sum(axis=1) eco_all_norm = eco_all / eco_sum.reshape(-1, 1) # Extract DCR values and weigh them by the relative ECO contributions dcr = data_valid_df["dcr"].to_numpy().reshape(-1, num_relocs) dcr = dcr * eco_all_norm dcr = dcr.sum(axis=1) else: # Extract DCR dcr = data_valid_df["dcr"][itr == self._dcr_index].to_numpy() # Calculate dwell dwell = np.around((eco / (efo / 1000)) / self._dwell_time, decimals=0) # Create a Pandas dataframe for the results (make sure to use properties # from the V2 reader df = pd.DataFrame( index=pd.RangeIndex(start=0, stop=len(tid)), columns=self.processed_properties(), ) # Store the extracted valid hits into the dataframe df["tid"] = tid df["x"] = x df["y"] = y df["z"] = z df["tim"] = tim df["efo"] = efo df["cfr"] = cfr df["eco"] = eco df["dcr"] = dcr df["dwell"] = dwell df["fbg"] = fbg df["fluo"] = fluo df["iid"] = iid # Check if the selected indices correspond to the last valid iteration self._is_last_valid = bool( self._cfr_index == self._last_valid_cfr and self._efo_index == self._last_valid ) return dfReader of MINFLUX data in
.npy, .mat and .json Imspector m2410 files, and .pmx version 0.6.0 and newer.
Constructor.
Parameters
filename:Union[Path, str]- Full path to the .pmx, .npy or .mat file to read.
z_scaling_factor:float (optional, default= 1.0)- Refractive index mismatch correction factor to apply to the z coordinates.
is_tracking:bool (optional, default= False)- Whether the dataset comes from a tracking experiment; otherwise, it is considered as a localization experiment.
pool_dcr:bool (optional, default= False)- Whether to pool DCR values weighted by the relative ECO of all relocalized iterations.
dwell_time:float (optional, default= 1.0)- Dwell time in milliseconds.
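A minimal sketch for the version-2 reader, assuming a Zarr folder exported by Imspector >= 24.10 (the folder name is a placeholder):

from pathlib import Path
from pyminflux.reader import MinFluxReaderV2

zarr_folder = Path("experiment_minflux.zarr")  # placeholder Zarr folder

reader = MinFluxReaderV2(zarr_folder, is_tracking=False, dwell_time=1.0)
df = reader.processed_dataframe

# Version 2 adds an "iid" (iteration ID) column to the processed dataframe
print(df[["tid", "iid", "x", "y", "z"]].head())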
Ancestors
- pyminflux.reader._reader.MinFluxReader
Static methods
def processed_properties() ‑> list-
Returns the properties read from the file that correspond to the processed dataframe column names.
Instance variables
prop mbm_data-
Expand source code
@property def mbm_data(self): """Return the loaded beamline monitoring data.""" return self._mbm_dataReturn the loaded beamline monitoring data.
prop valid_full_raw_dataframe : numpy.ndarray | None-
Expand source code
@property def valid_full_raw_dataframe(self) -> Union[None, np.ndarray]: """Return the valid subset of the raw dataframe.""" if self.tot_num_entries == 0: return None return self._full_raw_dataframe[self._valid_entries].copy()Return the valid subset of the raw dataframe.
prop version : int-
Expand source code
@property def version(self) -> int: return 2
class PMXReader-
Expand source code
class PMXReader:
    """Reader of (processed) MINFLUX from native `.pmx` format."""

    @staticmethod
    def get_metadata(filename) -> Union[PMXMetadata, None]:
        """Reads metadata information from `.pmx` files."""

        # Open the file
        with h5py.File(filename, "r") as f:

            # Read the file_version attribute
            file_version = f.attrs["file_version"]

            # First, check that the version is known
            if file_version not in ["1.0", "2.0", "3.0"]:
                return None

            # Convert version string to number
            version_int = version_str_to_int(file_version)

            # Initialize parameters (for versions above version 1.0)
            tr_len_thresholds = None
            time_thresholds = None
            dwell_time = 1.0
            is_tracking = False
            pool_dcr = False
            scale_bar_size = 500

            # Version 1.0 parameters
            if version_int > 0:
                try:
                    z_scaling_factor = float(f["parameters/z_scaling_factor"][()])
                except KeyError:
                    return None
                try:
                    min_trace_length = int(f["parameters/min_trace_length"][()])
                except KeyError:
                    return None
                try:
                    efo_thresholds = tuple(f["parameters/applied_efo_thresholds"][:])
                except KeyError as e:
                    efo_thresholds = None
                try:
                    cfr_thresholds = tuple(f["parameters/applied_cfr_thresholds"][:])
                except KeyError as e:
                    cfr_thresholds = None
                try:
                    num_fluorophores = int(f["parameters/num_fluorophores"][()])
                except KeyError:
                    return None

            # Version 2.0 parameters
            if version_int > 10000:
                # Parameters are present in the file, and we can read them
                try:
                    # This setting can be missing
                    tr_len_thresholds = tuple(
                        f["parameters/applied_tr_len_thresholds"][:]
                    )
                except KeyError as e:
                    tr_len_thresholds = None
                try:
                    dwell_time = float(f["parameters/dwell_time"][()])
                except KeyError as e:
                    return None
                try:
                    # This setting can be missing
                    time_thresholds = tuple(f["parameters/applied_time_thresholds"][:])
                except KeyError as e:
                    time_thresholds = None

                # HDF5 does not have a native boolean type, so we save as int8 and convert it
                # back to boolean on read.
                try:
                    is_tracking = bool(f["parameters/is_tracking"][()])
                except KeyError as e:
                    return None

                try:
                    pool_dcr = bool(f["parameters/pool_dcr"][()])
                except KeyError as e:
                    # This is an addendum to version 2.0, and we allow it to be missing.
                    # It will fall back to False.
                    pool_dcr = False

                try:
                    scale_bar_size = float(f["parameters/scale_bar_size"][()])
                except KeyError as e:
                    return None

            # Version 3.0 parameters
            # No new parameters

            # Store and return
            metadata = PMXMetadata(
                pool_dcr=pool_dcr,
                cfr_thresholds=cfr_thresholds,
                dwell_time=dwell_time,
                efo_thresholds=efo_thresholds,
                is_tracking=is_tracking,
                min_trace_length=min_trace_length,
                num_fluorophores=num_fluorophores,
                scale_bar_size=scale_bar_size,
                time_thresholds=time_thresholds,
                tr_len_thresholds=tr_len_thresholds,
                z_scaling_factor=z_scaling_factor,
            )

            return metadata

    @staticmethod
    def get_fluorophore_names(filename: Union[Path, str]) -> dict:
        """Read fluorophore names from `.pmx` files.

        Parameters
        ----------
        filename: Union[Path, str]
            Full path to the `.pmx` file to scan.

        Returns
        -------
        fluorophore_names: dict
            Dictionary mapping fluo_id (int) to name (str). Returns empty dict
            if fluorophore names are not present (backwards compatibility).
        """
        try:
            with h5py.File(filename, "r") as f:
                # Try to read fluorophore names from parameters group
                if "parameters" in f and "fluorophore_names" in f["parameters"].attrs:
                    import json

                    names_json = f["parameters"].attrs["fluorophore_names"]

                    # Parse JSON and convert keys back to integers
                    names_dict = json.loads(names_json)
                    return {int(k): v for k, v in names_dict.items()}
        except Exception:
            pass

        # Return empty dict for backwards compatibility (will use default string representation)
        return {}

    @staticmethod
    def get_tid_offsets(filename: Union[Path, str]) -> list:
        """Read TID offset mapping from `.pmx` files.

        Returns
        -------
        tid_offsets: list
            List of (first_iid, tid_offset) tuples. Returns empty list if not present.
        """
        try:
            with h5py.File(filename, "r") as f:
                if "parameters" in f and "tid_offsets" in f["parameters"].attrs:
                    import json

                    offsets_json = f["parameters"].attrs["tid_offsets"]
                    if isinstance(offsets_json, bytes):
                        offsets_json = offsets_json.decode("utf-8")
                    offsets = json.loads(offsets_json)
                    tid_offsets = []
                    for entry in offsets:
                        if isinstance(entry, dict):
                            first_iid = int(entry.get("first_iid", 0))
                            tid_offset = int(entry.get("tid_offset", 0))
                        else:
                            # Fallback for list/tuple format
                            if len(entry) < 2:
                                continue
                            first_iid = int(entry[0])
                            tid_offset = int(entry[1])
                        tid_offsets.append((first_iid, tid_offset))
                    return tid_offsets
        except Exception:
            pass

        return []

    @staticmethod
    def get_dataframe(filename: Union[Path, str]):
        """Return the full dataframe.

        Parameters
        ----------
        filename: Union[Path, str]
            Full path to the `.pmx` file to scan.
        """
        with h5py.File(filename, "r") as f:

            # Read the file_version attribute
            file_version = f.attrs["file_version"]
            if file_version != "3.0":
                return None

            # Read the reader_version attribute: it must be 2
            reader_version = f.attrs["reader_version"]
            if reader_version != 2:
                raise ValueError("`reader_version` must be 2.")

            #
            # Read raw dataset
            #
            dataset = f["/raw/df"]

            # Read the NumPy data
            data_array = dataset[:]

            # Read column names
            column_names = dataset.attrs["column_names"]

            # Read column data types
            column_types = dataset.attrs["column_types"]

            # Read the index
            index_data = f["/raw/df_index"][:]

            # Create DataFrame with specified columns
            df = pd.DataFrame(data_array, index=index_data, columns=column_names)

            # Apply column data types
            for col, dtype in zip(column_names, column_types):
                df[col] = df[col].astype(dtype)

            return df

    @staticmethod
    def get_filtered_dataframe(filename: Union[Path, str]):
        """Reads the Pandas DataFrame from `.pmx` files versions 1.0, 2.0, and 3.0.

        Parameters
        ----------
        filename: Union[Path, str]
            Full path to the `.pmx` file to scan.
        """
        with h5py.File(filename, "r") as f:

            # Read the file_version attribute
            file_version = f.attrs["file_version"]
            if file_version not in ["1.0", "2.0", "3.0"]:
                return None

            # Read dataset
            dataset = f["/paraview/dataframe"]

            # Read the NumPy data
            data_array = dataset[:]

            # Read column names
            column_names = dataset.attrs["column_names"]

            # Read column data types
            column_types = dataset.attrs["column_types"]

            # Read the index
            index_data = f["/paraview/dataframe_index"][:]

            # Create DataFrame with specified columns
            df = pd.DataFrame(data_array, index=index_data, columns=column_names)

            # Apply column data types
            for col, dtype in zip(column_names, column_types):
                df[col] = df[col].astype(dtype)

            return df

    @staticmethod
    def get_array(filename: Union[Path, str]):
        """Returns the raw Numpy array (filtered).

        This applies to:
            - pmx files version 1.0, 2.0
            - pmx files version 3.0 with reader version 1

        pmx files version 3.0 with reader version 2 only store the (filtered) raw dataframe.

        Parameters
        ----------
        filename: Union[Path, str]
            Full path to the `.pmx` file to scan.
        """

        # Open the file and read the data
        with h5py.File(filename, "r") as f:

            # Read the file_version attribute
            file_version = f.attrs["file_version"]
            if file_version not in ["1.0", "2.0", "3.0"]:
                return None

            if file_version == "3.0":
                reader_version = f.attrs["reader_version"]
                if reader_version == 1:
                    data_array = f["raw/npy"][:]
                else:
                    return None
            else:
                # We only read the raw NumPy array
                data_array = f["raw/npy"][:]

            return data_array

Reader of (processed) MINFLUX from native .pmx format.
Static methods
def get_array(filename: pathlib.Path | str)-
Returns the raw Numpy array (filtered). This applies to:
- pmx files version 1.0, 2.0
- pmx files version 3.0 with reader version 1
pmx files version 3.0 with reader version 2 only store the (filtered) raw dataframe.
Parameters
filename: Union[Path, str] - Full path to the .pmx file to scan.
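A minimal usage sketch; the file name "experiment.pmx" is a placeholder, and the call assumes a file of version 1.0, 2.0, or 3.0 with reader version 1:

from pyminflux.reader import PMXReader

arr = PMXReader.get_array("experiment.pmx")  # placeholder path
if arr is None:
    print("This .pmx file does not store the raw NumPy array.")
else:
    print(arr.dtype, arr.shape)  # the filtered raw array stored under raw/npy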
def get_dataframe(filename: pathlib.Path | str)-
Return the full dataframe.
Parameters
filename: Union[Path, str] - Full path to the .pmx file to scan.
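A minimal usage sketch; the path is a placeholder and the call assumes a version 3.0 file written with reader_version 2 (other versions return None):

from pyminflux.reader import PMXReader

df = PMXReader.get_dataframe("experiment.pmx")  # placeholder path
if df is None:
    print("Not a version 3.0 .pmx file.")
else:
    print(df.columns.tolist())
    print(len(df), "localizations")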
def get_filtered_dataframe(filename: pathlib.Path | str)-
Reads the Pandas DataFrame from .pmx files versions 1.0, 2.0, and 3.0.
Parameters
filename: Union[Path, str] - Full path to the .pmx file to scan.
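A minimal usage sketch; the path is a placeholder, and the call works for file versions 1.0, 2.0, and 3.0:

from pyminflux.reader import PMXReader

df = PMXReader.get_filtered_dataframe("experiment.pmx")  # placeholder path
if df is not None:
    print(df.head())
    print(df.dtypes)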
def get_fluorophore_names(filename: pathlib.Path | str) -> dict-
Read fluorophore names from .pmx files.
Parameters
filename: Union[Path, str] - Full path to the .pmx file to scan.
Returns
fluorophore_names: dict - Dictionary mapping fluo_id (int) to name (str). Returns empty dict if fluorophore names are not present (backwards compatibility).
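A minimal usage sketch; the path is a placeholder, and older files without stored names simply yield an empty dict:

from pyminflux.reader import PMXReader

names = PMXReader.get_fluorophore_names("experiment.pmx")  # placeholder path
for fluo_id, name in names.items():
    print(f"Fluorophore {fluo_id}: {name}")
if not names:
    print("No fluorophore names stored (older .pmx file).")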
def get_metadata(filename) -> pyminflux.reader.metadata._metadata.PMXMetadata | None-
Reads metadata information from .pmx files.
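A minimal usage sketch; the path is a placeholder, and it assumes PMXMetadata exposes its constructor arguments (z_scaling_factor, min_trace_length, is_tracking, …) as attributes. None indicates an unknown file version or missing required parameters:

from pyminflux.reader import PMXReader

metadata = PMXReader.get_metadata("experiment.pmx")  # placeholder path
if metadata is None:
    print("Unknown file version or incomplete parameters.")
else:
    print("z scaling factor:", metadata.z_scaling_factor)
    print("min trace length:", metadata.min_trace_length)
    print("tracking dataset:", metadata.is_tracking)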
def get_tid_offsets(filename: pathlib.Path | str) -> list-
Read TID offset mapping from .pmx files.
Returns
tid_offsets: list - List of (first_iid, tid_offset) tuples. Returns empty list if not present.
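A minimal usage sketch; the path is a placeholder, and files that do not store the mapping return an empty list:

from pyminflux.reader import PMXReader

offsets = PMXReader.get_tid_offsets("experiment.pmx")  # placeholder path
for first_iid, tid_offset in offsets:
    print(f"From iid {first_iid} onward, TIDs are offset by {tid_offset}")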