Module pyminflux.reader.util

Functions

def find_last_valid_iteration(data_array: numpy.ndarray)
Expand source code
def find_last_valid_iteration(data_array: np.ndarray):
    """Find last valid iteration across all relevant parameters.

    Parameters
    ----------

    data_array: np.ndarray
        MINFLUX NumPy array. The fields read here are `itr` (one column per
        iteration, with the sub-fields `cfr` and `loc`) and `tid`.

    Returns
    -------

    last_valid: dict
        Dictionary with the keys `efo_index`, `cfr_index`, `dcr_index`,
        `eco_index`, `loc_index` (index of the last valid iteration for each
        parameter), `valid_cfr` (list of bool, one per iteration) and `reloc`
        (one boolean per iteration; a list when no trace has more than one
        localization, a boolean NumPy array otherwise).
    """

    # Number of iterations
    num_iterations = data_array["itr"].shape[1]

    # Do we have aggregated measurements?
    if num_iterations == 1:
        # For clarity, let's set the indices to 0
        return {
            "efo_index": 0,
            "cfr_index": 0,
            "dcr_index": 0,
            "eco_index": 0,
            "loc_index": 0,
            "valid_cfr": [True],
            "reloc": [False],
        }

    # All parameters but cfr are taken from the last iteration; cfr falls back
    # to the last iteration only if no iteration shows any variation.
    last_iteration = num_iterations - 1
    last_valid = {
        "efo_index": last_iteration,
        "cfr_index": last_iteration,
        "dcr_index": last_iteration,
        "eco_index": last_iteration,
        "loc_index": last_iteration,
        "valid_cfr": [],
    }

    # Set cfr index: an iteration is considered valid if its cfr values are
    # not constant across localizations
    last_valid["valid_cfr"] = (np.std(data_array["itr"]["cfr"], axis=0) > 0.0).tolist()
    valid_indices = np.flatnonzero(last_valid["valid_cfr"])
    if len(valid_indices) > 0:
        last_valid["cfr_index"] = int(valid_indices[-1])

    # Set relocalized iterations: here we can use a simple trick to understand
    # which iterations re-localize. If the second localization of the first TID
    # with more than one is NaN, the iteration does not relocalize, otherwise
    # it does.
    tids = np.asarray(data_array["tid"])
    reloc_index = None
    if tids.size > 1:
        # np.unique returns the TIDs sorted by value, so use the index of the
        # first occurrence to pick the repeated TID that appears first in the data.
        u_tids, first_seen, counts = np.unique(
            tids, return_index=True, return_counts=True
        )
        repeated = counts > 1
        if np.any(repeated):
            first_repeated_tid = u_tids[repeated][np.argmin(first_seen[repeated])]
            # Index of the second localization of that TID
            reloc_index = np.flatnonzero(tids == first_repeated_tid)[1]

    if reloc_index is None:
        # The whole dataset does not contain any trace with more than one localization
        last_valid["reloc"] = [False] * num_iterations
    else:
        last_valid["reloc"] = np.logical_not(
            np.isnan(data_array["itr"]["loc"][reloc_index, :, 0])
        )

    return last_valid

Find last valid iteration across all relevant parameters.

Parameters

data_array : np.ndarray
MINFLUX NumPy array.
def find_last_valid_iteration_v2(data_full_df: pandas.core.frame.DataFrame, num_iterations: int | None = None)
Expand source code
def find_last_valid_iteration_v2(
    data_full_df: pd.DataFrame, num_iterations: Optional[int] = None
):
    """Find last valid iteration across all relevant parameters.

    Parameters
    ----------

    data_full_df: pd.DataFrame
        Full processed DataFrame. The columns read here are `itr`, `bot`
        and `cfr`.

    num_iterations: int
        Maximum number of iterations per localization. Omit to scan it from the data.

    Returns
    -------

    last_valid: dict
        Dictionary with the keys `efo_index`, `cfr_index`, `dcr_index`,
        `eco_index`, `loc_index`, `valid_cfr` and `reloc`.

    Raises
    ------

    ValueError
        If no pair of consecutive trace starts is at least `num_iterations`
        rows apart.
    """

    # Number of iterations: scan it from the data if not given
    if num_iterations is None:
        num_iterations = int(np.max(data_full_df["itr"]) + 1)
    num_iterations = int(num_iterations)

    # Aggregated measurements: every index is 0
    if num_iterations == 1:
        return {
            "efo_index": 0,
            "cfr_index": 0,
            "dcr_index": 0,
            "eco_index": 0,
            "loc_index": 0,
            "valid_cfr": [True],
            "reloc": [False],
        }

    final_index = num_iterations - 1

    # Rows where a trace begins: all localizations are valid at this point
    starts = np.flatnonzero(data_full_df["bot"].to_numpy())
    only_one_trace = len(starts) == 1

    # cfr values of the first `num_iterations` rows of each trace
    # (one row per iteration, one column per trace)
    iteration_offsets = np.arange(num_iterations).reshape(-1, 1)
    cfr_by_iteration = data_full_df["cfr"].to_numpy()[starts + iteration_offsets]

    if only_one_trace:
        # Pathological case: with one trace only, there is no variation across
        # traces to look at, so the values of that trace are tested directly.
        cfr_is_valid = (cfr_by_iteration.ravel() > 0.0).tolist()
    else:
        cfr_is_valid = (np.std(cfr_by_iteration, axis=1) > 0.0).tolist()

    valid_positions = np.where(cfr_is_valid)[0]
    cfr_index = valid_positions[-1] if len(valid_positions) > 0 else final_index

    # Pathological case again: append a fake next trace start (the end of the
    # data) so that the distance computation below works.
    if only_one_trace:
        starts = np.append(starts, [len(data_full_df.index)]).ravel()

    # Traces whose distance to the next trace start covers a full iteration sequence
    (complete,) = np.where(np.diff(starts) >= num_iterations)
    if len(complete) == 0:
        raise ValueError("No complete iterations found!")
    first_complete = complete[0]

    # For robustness, use more than one trace to extract the repeating iterations
    span = min(10, len(complete))
    itr_values = data_full_df["itr"].to_numpy()
    window = itr_values[starts[first_complete] : starts[first_complete + span]]

    # Drop the leading full sequence, then remove all remaining full sequences:
    # what is left are the relocalized iteration indices
    leftovers = remove_subarray_occurrences(
        window[num_iterations:], np.arange(num_iterations)
    )
    reloc_mask = np.array([False] * num_iterations)
    reloc_mask[np.unique(leftovers)] = True

    return {
        "efo_index": final_index,
        "cfr_index": cfr_index,
        "dcr_index": final_index,
        "eco_index": final_index,
        "loc_index": final_index,
        "valid_cfr": cfr_is_valid,
        "reloc": reloc_mask.tolist(),
    }

Find last valid iteration across all relevant parameters.

Parameters

data_full_df : pd.DataFrame
Full processed DataFrame.
num_iterations : int
Maximum number of iterations per localization. Omit to scan it from the data.
def get_reader_version_for_mat_file(file_path)
Expand source code
def get_reader_version_for_mat_file(file_path):
    """Return version of MinFluxReader required to open this Imspector .mat file without loading it.

    Parameters
    ----------

    file_path: Union[Path, str]
        Full path to the `.mat` file to scan.

    Returns
    -------

    reader_version: int
        Version of MinFluxReader needed to open this Imspector *.mat file.
        If the file cannot be parsed, return -1.
    """

    # Returns a list of tuples: (variable name, shape, data class)
    try:
        mat_metadata = whosmat(file_path)
    except (ValueError, NotImplementedError) as e:
        # NotImplementedError: MATLAB v7.3 (HDF5-based) files are not readable here
        print(f"Error parsing file {file_path}: {e}")
        return -1

    # These variables, or an `itr` variable stored as a plain int32 array,
    # require version 2 of the reader.
    v2_only_variables = ("fnl", "bot", "eot")
    for name, _shape, data_class in mat_metadata:
        # The data class is the THIRD element of the tuple (the second is the shape)
        if name in v2_only_variables or (name == "itr" and data_class == "int32"):
            return 2

    return 1

Return version of MinFluxReader required to open this Imspector .mat file without loading it.

Parameters

file_path : Union[Path, str]
Full path to the .mat file to scan.

Returns

reader_version : int
Version of MinFluxReader needed to open this Imspector *.mat file. If the file is invalid or in case of error, -1 is returned.
def get_reader_version_for_npy_file(file_path)
Expand source code
def get_reader_version_for_npy_file(file_path):
    """Return version of MinFluxReader required to open this Imspector .npy file without loading it.

    Parameters
    ----------

    file_path: Union[Path, str]
        Full path to the `.npy` file to scan.

    Returns
    -------

    reader_version: int
        Version of MinFluxReader needed to open this Imspector *.npy file.
        If the file is not a valid .npy file or its header cannot be parsed,
        return -1.
    """
    with open(file_path, "rb") as f:
        # Read the magic string
        magic_string = f.read(6)

        if magic_string != b"\x93NUMPY":
            print(f"{file_path} is not a valid .npy file.")
            return -1

        # The two bytes after the magic string are the (major, minor) format version
        format_version = f.read(2)
        major_version = format_version[0] if format_version else 1

        # Manual header parsing, since `np.lib.format.read_array_header_{1|2}_0` seems to find
        # issues with the header structure (at least in some files).
        # Format 1.x stores the header length in 2 bytes, later formats in 4 bytes.
        length_size = 2 if major_version < 2 else 4
        header_length = int.from_bytes(f.read(length_size), byteorder="little")
        header_bytes = f.read(header_length)

    try:
        # Format 3.0 allows UTF-8 in the header; ASCII is a subset of it
        header_str = header_bytes.decode("ascii" if major_version < 3 else "utf-8")

        # Drop newlines and padding spaces
        header_str = header_str.replace("\n", "").replace(" ", "")

        # Use ast.literal_eval to safely parse
        header_dict = ast.literal_eval(header_str)

    except (
        ValueError,
        TypeError,
        SyntaxError,
        MemoryError,
        RecursionError,
    ) as parse_error:
        print(f"Manual parsing failed: {parse_error}")
        return -1

    # A valid header is a dictionary with a `descr` entry
    if not isinstance(header_dict, dict) or "descr" not in header_dict:
        print(f"{file_path} does not contain a valid .npy header.")
        return -1

    descr = header_dict["descr"]
    if not isinstance(descr, list):
        # Plain (non-structured) array: no field can mark it as version 2
        return 1

    # Process the field descriptions: each one is (name, type[, shape])
    v2_only_fields = ("fnl", "bot", "eot")
    for field in descr:
        name = field[0]
        field_type = field[1] if len(field) > 1 else None
        if name in v2_only_fields:
            return 2
        # `itr` stored as a plain int32 (either byte order) instead of a nested structure
        if name == "itr" and field_type in ("<i4", ">i4"):
            return 2

    return 1

Return version of MinFluxReader required to open this Imspector .npy file without loading it.

Parameters

file_path : Union[Path, str]
Full path to the .npy file to scan.

Returns

reader_version : int
Version of MinFluxReader needed to open this Imspector *.npy file. If the file is invalid or in case of error, -1 is returned.
def get_reader_version_for_pmx_file(file_path)
Expand source code
def get_reader_version_for_pmx_file(file_path):
    """Return version of MinFluxReader required to open this .pmx file without loading it.

    Parameters
    ----------

    file_path: Union[Path, str]
        Full path to the `.pmx` file to scan.

    Returns
    -------

    reader_version: int
        Version of MinFluxReader needed to open this *.pmx file.
        If the `file_version` attribute is missing or not one of "1.0", "2.0"
        or "3.0", or if the `reader_version` attribute of a "3.0" file is
        missing or cannot be converted to int, return -1.
    """
    # Open the file and read the attributes
    with h5py.File(file_path, "r") as f:

        # Read the file_version attribute: a file without it is not a valid .pmx file
        try:
            file_version = f.attrs["file_version"]
        except KeyError:
            return -1

        # File versions 1.0 and 2.0 always map to version 1 of the reader
        if file_version in ("1.0", "2.0"):
            return 1

        # File version 3.0 stores the reader version explicitly
        if file_version == "3.0":
            try:
                return int(f.attrs["reader_version"])
            except (KeyError, ValueError, TypeError):
                return -1

        # Unknown file version
        return -1

Return version of MinFluxReader required to open this .pmx file without loading it.

Parameters

file_path : Union[Path, str]
Full path to the .pmx file to scan.

Returns

reader_version : int
Version of MinFluxReader needed to open this *.pmx file. If the file is invalid or in case of error, -1 is returned.
def version_str_to_int(version_string: str) ‑> int
Expand source code
def version_str_to_int(version_string: str) -> int:
    """Convert version string in the form MAJOR.MINOR(.PATCH) to int for comparisons.

    The result is MAJOR * 10000 + MINOR * 100 + PATCH, with PATCH taken as 0
    when omitted. A ValueError is raised if the string does not have the
    expected form.
    """

    parsed = re.match(r"^(\d+)\.(\d+)(?:\.(\d+))?$", version_string)
    if parsed is None:
        raise ValueError(f"Invalid version string format: {version_string}")

    # The optional PATCH group is None when absent
    major, minor, patch = (int(part) if part else 0 for part in parsed.groups())
    return major * 10000 + minor * 100 + patch

Convert version string in the form MAJOR.MINOR(.PATCH) to int for comparisons.