Module Widefield-Imaging-Acquisition.src.controls

Expand source code
import time
import sys
import os

try:
    import nidaqmx
    from nidaqmx.constants import AcquisitionType
    from pylablib.devices import IMAQ
except ModuleNotFoundError:
    pass
import numpy as np
from src.calculations import (
    extend_light_signal,
    shrink_array,
    find_rising_indices,
    reduce_stack,
    get_dictionary,
)
from src.waveforms import digital_square
import warnings
import logging

logging.basicConfig(
    filename="app.log", filemode="a", format="%(name)s - %(levelname)s - %(message)s"
)


warnings.filterwarnings("ignore")

config = get_dictionary(
    os.path.join(os.path.dirname(os.path.dirname(__file__)), "config.json")
)


class Instrument:
    def __init__(self, port, name):
        """A class used to represent a analog or digital instrument controlled by a DAQ

        Args:
            port (str): The associated physical port
            name (str): The name of the instrument
        """
        self.port = port
        self.name = name


class Camera(Instrument):
    def __init__(self, port, name):
        """A class used to represent a physical camera connected to the computer by a framegrabber

        Args:
            port (str): The name of the camera physical trigger port
            name (str): The name of the camera (can be found using NI-MAX)
        """
        super().__init__(port, name)
        self.frames = []
        self.baseline_frames = []
        self.stop_signal = False
        self.frames_read = 0
        self.video_running = False
        try:
            self.set_binning(config["Binning"])
            self.cam = IMAQ.IMAQCamera("img0")
            self.set_window(config["Binning"])
            self.cam.setup_acquisition()
            self.cam.start_acquisition()
        except Exception as err:
            pass

    def initialize(self, daq):
        """Initialize / Reset the camera parameters

        Args:
            daq (DAQ): The associated DAQ instance
        """
        self.daq = daq
        self.daq.stop_signal = False
        self.frames = []
        self.frames_read_list = []
        self.baseline_read_list = []
        self.frames_read = 0

    def set_binning(self, binning):
        """Set the binning of the camera

        Args:
            binning (int): The binning factor
        """
        lines = []
        with open(
            "C:\\Users\\Public\\Documents\\National Instruments\\NI-IMAQ\\Data\\Dalsa 1M60.icd",
            "r",
        ) as read_file:
            for i, line in enumerate(read_file):
                if i == 17:
                    lines.append(
                        f"      MaxImageSize ({str(int(1024/binning))}, {str(int(1024/binning))})"
                    )
                elif i == 2041:
                    lines.append(
                        f"                           Current ({binning}x{binning})"
                    )
                else:
                    lines.append(line.strip("\n"))
            with open(
                "C:\\Users\\Public\\Documents\\National Instruments\\NI-IMAQ\\Data\\Dalsa 1M60.icd",
                "w",
            ) as write_file:
                write_file.write("\n".join(lines))

    def set_window(self, binning):
        """Set the window of the camera to the correct size for the binning factor"""
        self.cam.set_grabber_attribute_value(
            "IMG_ATTR_ACQWINDOW_HEIGHT", int(1024 / binning), kind="auto"
        )
        self.cam.set_grabber_attribute_value(
            "IMG_ATTR_ACQWINDOW_WIDTH", int(1024 / binning), kind="auto"
        )

    def delete_frames(self):
        """Read all frames in the buffer"""
        self.cam.read_multiple_images()

    def loop(self, task):
        """While camera is running, add each acquired frame to a frames list

        Args:
            task (Task): The nidaqmx task used to track if acquisition is finished
        """
        self.task = task
        while task.is_task_done() is False and self.daq.stop_signal is False:
            try:
                self.cam.wait_for_frame(timeout=0.1)
                new_frames = self.cam.read_multiple_images()
                self.frames += new_frames
                self.video_running = True
                if self.adding_frames:
                    self.baseline_data += new_frames
                    self.frames_read_list.append(self.frames_read)
                if self.baseline_completed:
                    self.baseline_frames += new_frames
                    self.baseline_read_list.append(self.frames_read)
                self.frames_read += len(new_frames)
            except Exception as err:
                print("cam err")
                print(err)
                pass
        self.frames += self.cam.read_multiple_images()
        self.video_running = False

    def save(self, directory, extents):
        """Save the acquired frames (reduced if necessary) to a 3D NPY file

        Args:
            directory (str): The location in which to save the NPY file
            extents (tuple): The positions of the corners used to resize the frames
                             Equal to None if original size is kept
        """
        try:
            if extents:
                self.frames = shrink_array(self.frames, extents)
            if self.is_saving:
                while self.is_saving:
                    print("is saving")
                    pass
            print(len(self.frames))
            np.save(
                os.path.join(directory, "data", f"{self.file_index}.npy"), self.frames
            )
        except Exception as err:
            print("cam save err")
            print(err)
            pass


class DAQ:
    def __init__(self, name, lights, stimuli, camera, framerate, exposure):
        """A class used to represent a Data Acquisition Device (DAQ)

        Args:
            name (str): The name of the DAQ
            lights (list of Instrument): A list of the lights used in the experiment
            stimuli (list of Instrument): A list of the stimuli used in the experiment
            camera (Camera): The camera used in the experiment
            framerate (int): The acquisition framerate
            exposure (float): The exposure time in seconds
        """
        self.name = name
        self.trigger_activated = False
        self.trigger_port = None
        self.framerate, self.exposure = framerate, exposure
        self.stop_signal = False
        self.lights, self.stimuli, self.camera = lights, stimuli, camera
        self.tasks, self.light_signals, self.stim_signal, self.camera_signal = (
            [],
            [],
            [],
            None,
        )

    def close_all_lights(self, ports):
        self.lights = [
            Instrument(ports["infrared"], "ir"),
            Instrument(ports["red"], "red"),
            Instrument(ports["green"], "green"),
            Instrument(ports["blue"], "blue"),
        ]

        if config["Widefield Computer"]:
            with nidaqmx.Task(new_task_name="lights") as l_task:
                with nidaqmx.Task(new_task_name="a_stimuli") as s_task:
                    for light in self.lights:
                        l_task.do_channels.add_do_chan(f"{self.name}/{light.port}")
                    l_task.do_channels.add_do_chan(f"{self.name}/{self.camera.port}")
                    for stimulus in self.stimuli:
                        if "ao0" in stimulus.port or "ao1" in stimulus.port:
                            s_task.ao_channels.add_ao_voltage_chan(
                                f"{self.name}/{stimulus.port}"
                            )
                        else:
                            l_task.do_channels.add_do_chan(
                                f"{self.name}/{stimulus.port}"
                            )
                    self.sample([s_task, l_task], [False, False])
                    s_task.write([[0, 0], [0, 0]])
                    l_task.write(
                        [
                            [False, False],
                            [False, False],
                            [False, False],
                            [False, False],
                            [False, False],
                            [False, False],
                        ]
                    )
                    self.start([s_task, l_task])

        else:
            time.sleep(2)
            self.stop_signal = True
            pass

        self.lights = []

    def launch(self, name, time_values, stim_values):
        """Generate stimulation, light and camera signal and write them to the DAQ

        Args:
            name (str): The name of the experiment
            time_values (array): A array containing the time values
            stim_values (array): A array containing the stimulation values
        """
        self.reset_daq()
        self.experiment_name = name
        self.time_values = time_values
        self.stim_values = stim_values
        self.generate_stim_wave()
        if len(self.lights) > 0:
            self.generate_light_wave()
            self.generate_camera_wave()
            if config["Extend Signal"]:
                self.extend_light_wave()
        self.write_waveforms()

    def set_trigger(self, port):
        """Set the trigger port and activate it"""
        self.trigger_activated = True
        self.trigger_port = port

    def remove_trigger(self):
        """Deactivate the trigger"""
        self.trigger_activated = False

    def generate_stim_wave(self):
        """Create a stack of the stimulation signals and set the last values to zero"""
        self.stim_signal = np.stack((self.stim_values[:-1]))
        self.d_stim_signal = self.stim_values[-1]
        self.stim_signal[0][-1] = 0
        self.stim_signal[1][-1] = 0
        self.d_stim_signal[-1] = False

    def generate_light_wave(self):
        """Generate a light signal for each light used and set the last value to zero"""
        self.light_signals = []
        for potential_light_index in range(4):
            if potential_light_index < len(self.lights):
                signal = digital_square(
                    self.time_values,
                    self.framerate / len(self.lights),
                    self.framerate * self.exposure / len(self.lights),
                    int(potential_light_index * 3000 / (self.framerate)),
                )
                signal[-1] = False
                self.light_signals.append(signal)
        if len(self.light_signals) > 1:
            self.stacked_lights = np.stack((self.light_signals))
        else:
            self.stacked_lights = self.light_signals

    def generate_camera_wave(self):
        """Generate camera signal using the light signals and add it to the list of all signals"""
        try:
            self.camera_signal = np.max(np.vstack((self.stacked_lights)), axis=0)
        except ValueError:
            self.camera_signal = np.zeros(len(self.stim_signal[0]))
        self.all_signals = np.stack(self.light_signals + [self.camera_signal])
        self.allz_signals = np.stack(
            self.light_signals + [self.camera_signal] + [self.d_stim_signal]
        )

    def extend_light_wave(self):
        """Extend the light signal to be wider than the camera signal"""
        self.stacked_lights = extend_light_signal(
            self.stacked_lights, self.camera_signal
        )

    def write_waveforms(self):
        """Write lights, stimuli and camera signal to the DAQ"""

        if config["Widefield Computer"]:
            with nidaqmx.Task(new_task_name="lights") as l_task:
                self.control_task = l_task
                with nidaqmx.Task(new_task_name="a_stimuli") as s_task:
                    null_lights = [[False, False]]
                    self.tasks = [l_task, s_task]
                    for light in self.lights:
                        l_task.do_channels.add_do_chan(f"{self.name}/{light.port}")
                        null_lights.append([False, False])
                    if len(self.lights) > 0:
                        l_task.do_channels.add_do_chan(
                            f"{self.name}/{self.camera.port}"
                        )
                    for stimulus in self.stimuli:
                        if "ao0" in stimulus.port or "ao1" in stimulus.port:
                            s_task.ao_channels.add_ao_voltage_chan(
                                f"{self.name}/{stimulus.port}"
                            )
                        else:
                            l_task.do_channels.add_do_chan(
                                f"{self.name}/{stimulus.port}"
                            )
                            null_lights.append([False, False])
                    self.camera.initialize(self)
                    self.sample([s_task, l_task], self.stim_signal[0])
                    if len(self.lights) > 0:
                        self.write(
                            [s_task, l_task], [self.stim_signal, self.allz_signals]
                        )
                        self.camera.delete_frames()
                        if self.trigger_activated:
                            with nidaqmx.Task(new_task_name="trigger") as t_task:
                                t_task.di_channels.add_di_chan(
                                    (f"{self.name}/{self.trigger_port}")
                                )
                                while True:
                                    time.sleep(0.001)
                                    if t_task.read():
                                        break
                        self.start_time = time.time()
                        self.start([s_task, l_task])
                        self.camera.loop(l_task)
                        self.stop([s_task, l_task])
                        s_task.write([[0, 0], [0, 0]])
                        l_task.write(null_lights)
                        self.start([s_task, l_task])
                    else:
                        self.write(
                            [s_task, l_task], [self.stim_signal, self.d_stim_signal]
                        )
                        self.camera.delete_frames()
                        if self.trigger_activated:
                            with nidaqmx.Task(new_task_name="trigger") as t_task:
                                t_task.di_channels.add_di_chan(
                                    (f"{self.name}/{self.trigger_port}")
                                )
                                while True:
                                    time.sleep(0.001)
                                    if t_task.read():
                                        break
                        self.start_time = time.time()
                        self.start([s_task, l_task])
                        while (
                            s_task.is_task_done() is False and self.stop_signal is False
                        ):
                            time.sleep(0.01)
                            pass
                        self.stop([s_task, l_task])
                        s_task.write([[0, 0], [0, 0]])
                        l_task.write([False, False])
                        self.start([s_task, l_task])

        else:
            self.start_time = time.time()
            time.sleep(len(self.stim_signal[0]) / 3000)
            self.stop_signal = True
            pass

    def return_lights(self):
        """Return the lights used in the experiment
        
        Returns:
            list: List of lights used in the experiment"""
        lights = []
        for light in self.lights:
            lights.append(light.name)
        return lights

    def save(self, directory):
        """Save the light and stimulation data for each frame as a NPY file

        Args:
            directory (str): The directory in which to save the NPY file
        """
        try:
            indices = find_rising_indices(self.all_signals[-1])
            reduced_stack = reduce_stack(self.all_signals, indices)
            np.save(f"{directory}/{self.experiment_name}-light_signal", reduced_stack)
        except Exception as err:
            print(err)
        np.save(f"{directory}/{self.experiment_name}-stim_signal", self.stim_signal)

    def reset_daq(self):
        """Reset the DAQ parameters"""
        self.light_signals, self.stim_signal, self.camera_signal, self.time_values = (
            [],
            [],
            None,
            None,
        )

    def start(self, tasks):
        """Start each nidaqmx task in a list

        Args:
            tasks (list): A list of nidaqmx tasks
        """
        for task in tasks:
            task.start()

    def wait(self, tasks):
        """Wait for completion of nidaqmx tasks in a list

        Args:
            tasks (list): A list of nidaqmx tasks
        """
        for task in tasks:
            task.wait_until_done(timeout=1.5 * len(self.time_values) / 3000)

    def sample(self, tasks, signal):
        """Set the sampling rate for a list of nidaqmx tasks

        Args:
            tasks (list): A list of nidaqmx tasks
        """
        for task in tasks:
            task.timing.cfg_samp_clk_timing(
                3000,
                sample_mode=AcquisitionType.FINITE,
                samps_per_chan=len(signal),
            )

    def write(self, tasks, content):
        """Write a list of arrays to a list of nidaqmx tasks

        Args:
            tasks (list): A list of nidaqmx tasks
            content (list): A list of arrays to write
        """
        for i, task in enumerate(tasks):
            task.write(content[i])

    def stop(self, tasks):
        """Stop each nidaqmx task in a list

        Args:
            tasks (list): A list of nidaqmx tasks
        """
        for task in tasks:
            task.stop()

Classes

class Camera (port, name)

A class used to represent a physical camera connected to the computer by a framegrabber

Args

port : str
The name of the camera physical trigger port
name : str
The name of the camera (can be found using NI-MAX)
Expand source code
class Camera(Instrument):
    def __init__(self, port, name):
        """A class used to represent a physical camera connected to the computer by a framegrabber

        Args:
            port (str): The name of the camera physical trigger port
            name (str): The name of the camera (can be found using NI-MAX)
        """
        super().__init__(port, name)
        self.frames = []
        self.baseline_frames = []
        self.stop_signal = False
        self.frames_read = 0
        self.video_running = False
        try:
            self.set_binning(config["Binning"])
            self.cam = IMAQ.IMAQCamera("img0")
            self.set_window(config["Binning"])
            self.cam.setup_acquisition()
            self.cam.start_acquisition()
        except Exception as err:
            pass

    def initialize(self, daq):
        """Initialize / Reset the camera parameters

        Args:
            daq (DAQ): The associated DAQ instance
        """
        self.daq = daq
        self.daq.stop_signal = False
        self.frames = []
        self.frames_read_list = []
        self.baseline_read_list = []
        self.frames_read = 0

    def set_binning(self, binning):
        """Set the binning of the camera

        Args:
            binning (int): The binning factor
        """
        lines = []
        with open(
            "C:\\Users\\Public\\Documents\\National Instruments\\NI-IMAQ\\Data\\Dalsa 1M60.icd",
            "r",
        ) as read_file:
            for i, line in enumerate(read_file):
                if i == 17:
                    lines.append(
                        f"      MaxImageSize ({str(int(1024/binning))}, {str(int(1024/binning))})"
                    )
                elif i == 2041:
                    lines.append(
                        f"                           Current ({binning}x{binning})"
                    )
                else:
                    lines.append(line.strip("\n"))
            with open(
                "C:\\Users\\Public\\Documents\\National Instruments\\NI-IMAQ\\Data\\Dalsa 1M60.icd",
                "w",
            ) as write_file:
                write_file.write("\n".join(lines))

    def set_window(self, binning):
        """Set the window of the camera to the correct size for the binning factor"""
        self.cam.set_grabber_attribute_value(
            "IMG_ATTR_ACQWINDOW_HEIGHT", int(1024 / binning), kind="auto"
        )
        self.cam.set_grabber_attribute_value(
            "IMG_ATTR_ACQWINDOW_WIDTH", int(1024 / binning), kind="auto"
        )

    def delete_frames(self):
        """Read all frames in the buffer"""
        self.cam.read_multiple_images()

    def loop(self, task):
        """While camera is running, add each acquired frame to a frames list

        Args:
            task (Task): The nidaqmx task used to track if acquisition is finished
        """
        self.task = task
        while task.is_task_done() is False and self.daq.stop_signal is False:
            try:
                self.cam.wait_for_frame(timeout=0.1)
                new_frames = self.cam.read_multiple_images()
                self.frames += new_frames
                self.video_running = True
                if self.adding_frames:
                    self.baseline_data += new_frames
                    self.frames_read_list.append(self.frames_read)
                if self.baseline_completed:
                    self.baseline_frames += new_frames
                    self.baseline_read_list.append(self.frames_read)
                self.frames_read += len(new_frames)
            except Exception as err:
                print("cam err")
                print(err)
                pass
        self.frames += self.cam.read_multiple_images()
        self.video_running = False

    def save(self, directory, extents):
        """Save the acquired frames (reduced if necessary) to a 3D NPY file

        Args:
            directory (str): The location in which to save the NPY file
            extents (tuple): The positions of the corners used to resize the frames
                             Equal to None if original size is kept
        """
        try:
            if extents:
                self.frames = shrink_array(self.frames, extents)
            if self.is_saving:
                while self.is_saving:
                    print("is saving")
                    pass
            print(len(self.frames))
            np.save(
                os.path.join(directory, "data", f"{self.file_index}.npy"), self.frames
            )
        except Exception as err:
            print("cam save err")
            print(err)
            pass

Ancestors

Methods

def delete_frames(self)

Read all frames in the buffer

Expand source code
def delete_frames(self):
    """Read all frames in the buffer"""
    self.cam.read_multiple_images()
def initialize(self, daq)

Initialize / Reset the camera parameters

Args

daq : DAQ
The associated DAQ instance
Expand source code
def initialize(self, daq):
    """Initialize / Reset the camera parameters

    Args:
        daq (DAQ): The associated DAQ instance
    """
    self.daq = daq
    self.daq.stop_signal = False
    self.frames = []
    self.frames_read_list = []
    self.baseline_read_list = []
    self.frames_read = 0
def loop(self, task)

While camera is running, add each acquired frame to a frames list

Args

task : Task
The nidaqmx task used to track if acquisition is finished
Expand source code
def loop(self, task):
    """While camera is running, add each acquired frame to a frames list

    Args:
        task (Task): The nidaqmx task used to track if acquisition is finished
    """
    self.task = task
    while task.is_task_done() is False and self.daq.stop_signal is False:
        try:
            self.cam.wait_for_frame(timeout=0.1)
            new_frames = self.cam.read_multiple_images()
            self.frames += new_frames
            self.video_running = True
            if self.adding_frames:
                self.baseline_data += new_frames
                self.frames_read_list.append(self.frames_read)
            if self.baseline_completed:
                self.baseline_frames += new_frames
                self.baseline_read_list.append(self.frames_read)
            self.frames_read += len(new_frames)
        except Exception as err:
            print("cam err")
            print(err)
            pass
    self.frames += self.cam.read_multiple_images()
    self.video_running = False
def save(self, directory, extents)

Save the acquired frames (reduced if necessary) to a 3D NPY file

Args

directory : str
The location in which to save the NPY file
extents : tuple
The positions of the corners used to resize the frames Equal to None if original size is kept
Expand source code
def save(self, directory, extents):
    """Save the acquired frames (reduced if necessary) to a 3D NPY file

    Args:
        directory (str): The location in which to save the NPY file
        extents (tuple): The positions of the corners used to resize the frames
                         Equal to None if original size is kept
    """
    try:
        if extents:
            self.frames = shrink_array(self.frames, extents)
        if self.is_saving:
            while self.is_saving:
                print("is saving")
                pass
        print(len(self.frames))
        np.save(
            os.path.join(directory, "data", f"{self.file_index}.npy"), self.frames
        )
    except Exception as err:
        print("cam save err")
        print(err)
        pass
def set_binning(self, binning)

Set the binning of the camera

Args

binning : int
The binning factor
Expand source code
def set_binning(self, binning):
    """Set the binning of the camera

    Args:
        binning (int): The binning factor
    """
    lines = []
    with open(
        "C:\\Users\\Public\\Documents\\National Instruments\\NI-IMAQ\\Data\\Dalsa 1M60.icd",
        "r",
    ) as read_file:
        for i, line in enumerate(read_file):
            if i == 17:
                lines.append(
                    f"      MaxImageSize ({str(int(1024/binning))}, {str(int(1024/binning))})"
                )
            elif i == 2041:
                lines.append(
                    f"                           Current ({binning}x{binning})"
                )
            else:
                lines.append(line.strip("\n"))
        with open(
            "C:\\Users\\Public\\Documents\\National Instruments\\NI-IMAQ\\Data\\Dalsa 1M60.icd",
            "w",
        ) as write_file:
            write_file.write("\n".join(lines))
def set_window(self, binning)

Set the window of the camera to the correct size for the binning factor

Expand source code
def set_window(self, binning):
    """Set the window of the camera to the correct size for the binning factor"""
    self.cam.set_grabber_attribute_value(
        "IMG_ATTR_ACQWINDOW_HEIGHT", int(1024 / binning), kind="auto"
    )
    self.cam.set_grabber_attribute_value(
        "IMG_ATTR_ACQWINDOW_WIDTH", int(1024 / binning), kind="auto"
    )
class DAQ (name, lights, stimuli, camera, framerate, exposure)

A class used to represent a Data Acquisition Device (DAQ)

Args

name : str
The name of the DAQ
lights : list of Instrument
A list of the lights used in the experiment
stimuli : list of Instrument
A list of the stimuli used in the experiment
camera : Camera
The camera used in the experiment
framerate : int
The acquisition framerate
exposure : float
The exposure time in seconds
Expand source code
class DAQ:
    def __init__(self, name, lights, stimuli, camera, framerate, exposure):
        """A class used to represent a Data Acquisition Device (DAQ)

        Args:
            name (str): The name of the DAQ
            lights (list of Instrument): A list of the lights used in the experiment
            stimuli (list of Instrument): A list of the stimuli used in the experiment
            camera (Camera): The camera used in the experiment
            framerate (int): The acquisition framerate
            exposure (float): The exposure time in seconds
        """
        self.name = name
        self.trigger_activated = False
        self.trigger_port = None
        self.framerate, self.exposure = framerate, exposure
        self.stop_signal = False
        self.lights, self.stimuli, self.camera = lights, stimuli, camera
        self.tasks, self.light_signals, self.stim_signal, self.camera_signal = (
            [],
            [],
            [],
            None,
        )

    def close_all_lights(self, ports):
        self.lights = [
            Instrument(ports["infrared"], "ir"),
            Instrument(ports["red"], "red"),
            Instrument(ports["green"], "green"),
            Instrument(ports["blue"], "blue"),
        ]

        if config["Widefield Computer"]:
            with nidaqmx.Task(new_task_name="lights") as l_task:
                with nidaqmx.Task(new_task_name="a_stimuli") as s_task:
                    for light in self.lights:
                        l_task.do_channels.add_do_chan(f"{self.name}/{light.port}")
                    l_task.do_channels.add_do_chan(f"{self.name}/{self.camera.port}")
                    for stimulus in self.stimuli:
                        if "ao0" in stimulus.port or "ao1" in stimulus.port:
                            s_task.ao_channels.add_ao_voltage_chan(
                                f"{self.name}/{stimulus.port}"
                            )
                        else:
                            l_task.do_channels.add_do_chan(
                                f"{self.name}/{stimulus.port}"
                            )
                    self.sample([s_task, l_task], [False, False])
                    s_task.write([[0, 0], [0, 0]])
                    l_task.write(
                        [
                            [False, False],
                            [False, False],
                            [False, False],
                            [False, False],
                            [False, False],
                            [False, False],
                        ]
                    )
                    self.start([s_task, l_task])

        else:
            time.sleep(2)
            self.stop_signal = True
            pass

        self.lights = []

    def launch(self, name, time_values, stim_values):
        """Generate stimulation, light and camera signal and write them to the DAQ

        Args:
            name (str): The name of the experiment
            time_values (array): A array containing the time values
            stim_values (array): A array containing the stimulation values
        """
        self.reset_daq()
        self.experiment_name = name
        self.time_values = time_values
        self.stim_values = stim_values
        self.generate_stim_wave()
        if len(self.lights) > 0:
            self.generate_light_wave()
            self.generate_camera_wave()
            if config["Extend Signal"]:
                self.extend_light_wave()
        self.write_waveforms()

    def set_trigger(self, port):
        """Set the trigger port and activate it"""
        self.trigger_activated = True
        self.trigger_port = port

    def remove_trigger(self):
        """Deactivate the trigger"""
        self.trigger_activated = False

    def generate_stim_wave(self):
        """Create a stack of the stimulation signals and set the last values to zero"""
        self.stim_signal = np.stack((self.stim_values[:-1]))
        self.d_stim_signal = self.stim_values[-1]
        self.stim_signal[0][-1] = 0
        self.stim_signal[1][-1] = 0
        self.d_stim_signal[-1] = False

    def generate_light_wave(self):
        """Generate a light signal for each light used and set the last value to zero"""
        self.light_signals = []
        for potential_light_index in range(4):
            if potential_light_index < len(self.lights):
                signal = digital_square(
                    self.time_values,
                    self.framerate / len(self.lights),
                    self.framerate * self.exposure / len(self.lights),
                    int(potential_light_index * 3000 / (self.framerate)),
                )
                signal[-1] = False
                self.light_signals.append(signal)
        if len(self.light_signals) > 1:
            self.stacked_lights = np.stack((self.light_signals))
        else:
            self.stacked_lights = self.light_signals

    def generate_camera_wave(self):
        """Generate camera signal using the light signals and add it to the list of all signals"""
        try:
            self.camera_signal = np.max(np.vstack((self.stacked_lights)), axis=0)
        except ValueError:
            self.camera_signal = np.zeros(len(self.stim_signal[0]))
        self.all_signals = np.stack(self.light_signals + [self.camera_signal])
        self.allz_signals = np.stack(
            self.light_signals + [self.camera_signal] + [self.d_stim_signal]
        )

    def extend_light_wave(self):
        """Extend the light signal to be wider than the camera signal"""
        self.stacked_lights = extend_light_signal(
            self.stacked_lights, self.camera_signal
        )

    def write_waveforms(self):
        """Write lights, stimuli and camera signal to the DAQ"""

        if config["Widefield Computer"]:
            with nidaqmx.Task(new_task_name="lights") as l_task:
                self.control_task = l_task
                with nidaqmx.Task(new_task_name="a_stimuli") as s_task:
                    null_lights = [[False, False]]
                    self.tasks = [l_task, s_task]
                    for light in self.lights:
                        l_task.do_channels.add_do_chan(f"{self.name}/{light.port}")
                        null_lights.append([False, False])
                    if len(self.lights) > 0:
                        l_task.do_channels.add_do_chan(
                            f"{self.name}/{self.camera.port}"
                        )
                    for stimulus in self.stimuli:
                        if "ao0" in stimulus.port or "ao1" in stimulus.port:
                            s_task.ao_channels.add_ao_voltage_chan(
                                f"{self.name}/{stimulus.port}"
                            )
                        else:
                            l_task.do_channels.add_do_chan(
                                f"{self.name}/{stimulus.port}"
                            )
                            null_lights.append([False, False])
                    self.camera.initialize(self)
                    self.sample([s_task, l_task], self.stim_signal[0])
                    if len(self.lights) > 0:
                        self.write(
                            [s_task, l_task], [self.stim_signal, self.allz_signals]
                        )
                        self.camera.delete_frames()
                        if self.trigger_activated:
                            with nidaqmx.Task(new_task_name="trigger") as t_task:
                                t_task.di_channels.add_di_chan(
                                    (f"{self.name}/{self.trigger_port}")
                                )
                                while True:
                                    time.sleep(0.001)
                                    if t_task.read():
                                        break
                        self.start_time = time.time()
                        self.start([s_task, l_task])
                        self.camera.loop(l_task)
                        self.stop([s_task, l_task])
                        s_task.write([[0, 0], [0, 0]])
                        l_task.write(null_lights)
                        self.start([s_task, l_task])
                    else:
                        self.write(
                            [s_task, l_task], [self.stim_signal, self.d_stim_signal]
                        )
                        self.camera.delete_frames()
                        if self.trigger_activated:
                            with nidaqmx.Task(new_task_name="trigger") as t_task:
                                t_task.di_channels.add_di_chan(
                                    (f"{self.name}/{self.trigger_port}")
                                )
                                while True:
                                    time.sleep(0.001)
                                    if t_task.read():
                                        break
                        self.start_time = time.time()
                        self.start([s_task, l_task])
                        while (
                            s_task.is_task_done() is False and self.stop_signal is False
                        ):
                            time.sleep(0.01)
                            pass
                        self.stop([s_task, l_task])
                        s_task.write([[0, 0], [0, 0]])
                        l_task.write([False, False])
                        self.start([s_task, l_task])

        else:
            self.start_time = time.time()
            time.sleep(len(self.stim_signal[0]) / 3000)
            self.stop_signal = True
            pass

    def return_lights(self):
        """Return the lights used in the experiment
        
        Returns:
            list: List of lights used in the experiment"""
        lights = []
        for light in self.lights:
            lights.append(light.name)
        return lights

    def save(self, directory):
        """Save the light and stimulation data for each frame as a NPY file

        Args:
            directory (str): The directory in which to save the NPY file
        """
        try:
            indices = find_rising_indices(self.all_signals[-1])
            reduced_stack = reduce_stack(self.all_signals, indices)
            np.save(f"{directory}/{self.experiment_name}-light_signal", reduced_stack)
        except Exception as err:
            print(err)
        np.save(f"{directory}/{self.experiment_name}-stim_signal", self.stim_signal)

    def reset_daq(self):
        """Reset the DAQ parameters"""
        self.light_signals, self.stim_signal, self.camera_signal, self.time_values = (
            [],
            [],
            None,
            None,
        )

    def start(self, tasks):
        """Start each nidaqmx task in a list

        Args:
            tasks (list): A list of nidaqmx tasks
        """
        for task in tasks:
            task.start()

    def wait(self, tasks):
        """Wait for completion of nidaqmx tasks in a list

        Args:
            tasks (list): A list of nidaqmx tasks
        """
        for task in tasks:
            task.wait_until_done(timeout=1.5 * len(self.time_values) / 3000)

    def sample(self, tasks, signal):
        """Set the sampling rate for a list of nidaqmx tasks

        Args:
            tasks (list): A list of nidaqmx tasks
        """
        for task in tasks:
            task.timing.cfg_samp_clk_timing(
                3000,
                sample_mode=AcquisitionType.FINITE,
                samps_per_chan=len(signal),
            )

    def write(self, tasks, content):
        """Write a list of arrays to a list of nidaqmx tasks

        Args:
            tasks (list): A list of nidaqmx tasks
            content (list): A list of arrays to write
        """
        for i, task in enumerate(tasks):
            task.write(content[i])

    def stop(self, tasks):
        """Stop each nidaqmx task in a list

        Args:
            tasks (list): A list of nidaqmx tasks
        """
        for task in tasks:
            task.stop()

Methods

def close_all_lights(self, ports)
Expand source code
def close_all_lights(self, ports):
    self.lights = [
        Instrument(ports["infrared"], "ir"),
        Instrument(ports["red"], "red"),
        Instrument(ports["green"], "green"),
        Instrument(ports["blue"], "blue"),
    ]

    if config["Widefield Computer"]:
        with nidaqmx.Task(new_task_name="lights") as l_task:
            with nidaqmx.Task(new_task_name="a_stimuli") as s_task:
                for light in self.lights:
                    l_task.do_channels.add_do_chan(f"{self.name}/{light.port}")
                l_task.do_channels.add_do_chan(f"{self.name}/{self.camera.port}")
                for stimulus in self.stimuli:
                    if "ao0" in stimulus.port or "ao1" in stimulus.port:
                        s_task.ao_channels.add_ao_voltage_chan(
                            f"{self.name}/{stimulus.port}"
                        )
                    else:
                        l_task.do_channels.add_do_chan(
                            f"{self.name}/{stimulus.port}"
                        )
                self.sample([s_task, l_task], [False, False])
                s_task.write([[0, 0], [0, 0]])
                l_task.write(
                    [
                        [False, False],
                        [False, False],
                        [False, False],
                        [False, False],
                        [False, False],
                        [False, False],
                    ]
                )
                self.start([s_task, l_task])

    else:
        time.sleep(2)
        self.stop_signal = True
        pass

    self.lights = []
def extend_light_wave(self)

Extend the light signal to be wider than the camera signal

Expand source code
def extend_light_wave(self):
    """Extend the light signal to be wider than the camera signal"""
    self.stacked_lights = extend_light_signal(
        self.stacked_lights, self.camera_signal
    )
def generate_camera_wave(self)

Generate camera signal using the light signals and add it to the list of all signals

Expand source code
def generate_camera_wave(self):
    """Generate camera signal using the light signals and add it to the list of all signals"""
    try:
        self.camera_signal = np.max(np.vstack((self.stacked_lights)), axis=0)
    except ValueError:
        self.camera_signal = np.zeros(len(self.stim_signal[0]))
    self.all_signals = np.stack(self.light_signals + [self.camera_signal])
    self.allz_signals = np.stack(
        self.light_signals + [self.camera_signal] + [self.d_stim_signal]
    )
def generate_light_wave(self)

Generate a light signal for each light used and set the last value to zero

Expand source code
def generate_light_wave(self):
    """Generate a light signal for each light used and set the last value to zero"""
    self.light_signals = []
    for potential_light_index in range(4):
        if potential_light_index < len(self.lights):
            signal = digital_square(
                self.time_values,
                self.framerate / len(self.lights),
                self.framerate * self.exposure / len(self.lights),
                int(potential_light_index * 3000 / (self.framerate)),
            )
            signal[-1] = False
            self.light_signals.append(signal)
    if len(self.light_signals) > 1:
        self.stacked_lights = np.stack((self.light_signals))
    else:
        self.stacked_lights = self.light_signals
def generate_stim_wave(self)

Create a stack of the stimulation signals and set the last values to zero

Expand source code
def generate_stim_wave(self):
    """Create a stack of the stimulation signals and set the last values to zero"""
    self.stim_signal = np.stack((self.stim_values[:-1]))
    self.d_stim_signal = self.stim_values[-1]
    self.stim_signal[0][-1] = 0
    self.stim_signal[1][-1] = 0
    self.d_stim_signal[-1] = False
def launch(self, name, time_values, stim_values)

Generate stimulation, light and camera signal and write them to the DAQ

Args

name : str
The name of the experiment
time_values : array
A array containing the time values
stim_values : array
A array containing the stimulation values
Expand source code
def launch(self, name, time_values, stim_values):
    """Generate stimulation, light and camera signal and write them to the DAQ

    Args:
        name (str): The name of the experiment
        time_values (array): A array containing the time values
        stim_values (array): A array containing the stimulation values
    """
    self.reset_daq()
    self.experiment_name = name
    self.time_values = time_values
    self.stim_values = stim_values
    self.generate_stim_wave()
    if len(self.lights) > 0:
        self.generate_light_wave()
        self.generate_camera_wave()
        if config["Extend Signal"]:
            self.extend_light_wave()
    self.write_waveforms()
def remove_trigger(self)

Deactivate the trigger

Expand source code
def remove_trigger(self):
    """Deactivate the trigger"""
    self.trigger_activated = False
def reset_daq(self)

Reset the DAQ parameters

Expand source code
def reset_daq(self):
    """Reset the DAQ parameters"""
    self.light_signals, self.stim_signal, self.camera_signal, self.time_values = (
        [],
        [],
        None,
        None,
    )
def return_lights(self)

Return the lights used in the experiment

Returns

list
List of lights used in the experiment
Expand source code
def return_lights(self):
    """Return the lights used in the experiment
    
    Returns:
        list: List of lights used in the experiment"""
    lights = []
    for light in self.lights:
        lights.append(light.name)
    return lights
def sample(self, tasks, signal)

Set the sampling rate for a list of nidaqmx tasks

Args

tasks : list
A list of nidaqmx tasks
Expand source code
def sample(self, tasks, signal):
    """Set the sampling rate for a list of nidaqmx tasks

    Args:
        tasks (list): A list of nidaqmx tasks
    """
    for task in tasks:
        task.timing.cfg_samp_clk_timing(
            3000,
            sample_mode=AcquisitionType.FINITE,
            samps_per_chan=len(signal),
        )
def save(self, directory)

Save the light and stimulation data for each frame as a NPY file

Args

directory : str
The directory in which to save the NPY file
Expand source code
def save(self, directory):
    """Save the light and stimulation data for each frame as a NPY file

    Args:
        directory (str): The directory in which to save the NPY file
    """
    try:
        indices = find_rising_indices(self.all_signals[-1])
        reduced_stack = reduce_stack(self.all_signals, indices)
        np.save(f"{directory}/{self.experiment_name}-light_signal", reduced_stack)
    except Exception as err:
        print(err)
    np.save(f"{directory}/{self.experiment_name}-stim_signal", self.stim_signal)
def set_trigger(self, port)

Set the trigger port and activate it

Expand source code
def set_trigger(self, port):
    """Set the trigger port and activate it"""
    self.trigger_activated = True
    self.trigger_port = port
def start(self, tasks)

Start each nidaqmx task in a list

Args

tasks : list
A list of nidaqmx tasks
Expand source code
def start(self, tasks):
    """Start each nidaqmx task in a list

    Args:
        tasks (list): A list of nidaqmx tasks
    """
    for task in tasks:
        task.start()
def stop(self, tasks)

Stop each nidaqmx task in a list

Args

tasks : list
A list of nidaqmx tasks
Expand source code
def stop(self, tasks):
    """Stop each nidaqmx task in a list

    Args:
        tasks (list): A list of nidaqmx tasks
    """
    for task in tasks:
        task.stop()
def wait(self, tasks)

Wait for completion of nidaqmx tasks in a list

Args

tasks : list
A list of nidaqmx tasks
Expand source code
def wait(self, tasks):
    """Wait for completion of nidaqmx tasks in a list

    Args:
        tasks (list): A list of nidaqmx tasks
    """
    for task in tasks:
        task.wait_until_done(timeout=1.5 * len(self.time_values) / 3000)
def write(self, tasks, content)

Write a list of arrays to a list of nidaqmx tasks

Args

tasks : list
A list of nidaqmx tasks
content : list
A list of arrays to write
Expand source code
def write(self, tasks, content):
    """Write a list of arrays to a list of nidaqmx tasks

    Args:
        tasks (list): A list of nidaqmx tasks
        content (list): A list of arrays to write
    """
    for i, task in enumerate(tasks):
        task.write(content[i])
def write_waveforms(self)

Write lights, stimuli and camera signal to the DAQ

Expand source code
def write_waveforms(self):
    """Write lights, stimuli and camera signal to the DAQ"""

    if config["Widefield Computer"]:
        with nidaqmx.Task(new_task_name="lights") as l_task:
            self.control_task = l_task
            with nidaqmx.Task(new_task_name="a_stimuli") as s_task:
                null_lights = [[False, False]]
                self.tasks = [l_task, s_task]
                for light in self.lights:
                    l_task.do_channels.add_do_chan(f"{self.name}/{light.port}")
                    null_lights.append([False, False])
                if len(self.lights) > 0:
                    l_task.do_channels.add_do_chan(
                        f"{self.name}/{self.camera.port}"
                    )
                for stimulus in self.stimuli:
                    if "ao0" in stimulus.port or "ao1" in stimulus.port:
                        s_task.ao_channels.add_ao_voltage_chan(
                            f"{self.name}/{stimulus.port}"
                        )
                    else:
                        l_task.do_channels.add_do_chan(
                            f"{self.name}/{stimulus.port}"
                        )
                        null_lights.append([False, False])
                self.camera.initialize(self)
                self.sample([s_task, l_task], self.stim_signal[0])
                if len(self.lights) > 0:
                    self.write(
                        [s_task, l_task], [self.stim_signal, self.allz_signals]
                    )
                    self.camera.delete_frames()
                    if self.trigger_activated:
                        with nidaqmx.Task(new_task_name="trigger") as t_task:
                            t_task.di_channels.add_di_chan(
                                (f"{self.name}/{self.trigger_port}")
                            )
                            while True:
                                time.sleep(0.001)
                                if t_task.read():
                                    break
                    self.start_time = time.time()
                    self.start([s_task, l_task])
                    self.camera.loop(l_task)
                    self.stop([s_task, l_task])
                    s_task.write([[0, 0], [0, 0]])
                    l_task.write(null_lights)
                    self.start([s_task, l_task])
                else:
                    self.write(
                        [s_task, l_task], [self.stim_signal, self.d_stim_signal]
                    )
                    self.camera.delete_frames()
                    if self.trigger_activated:
                        with nidaqmx.Task(new_task_name="trigger") as t_task:
                            t_task.di_channels.add_di_chan(
                                (f"{self.name}/{self.trigger_port}")
                            )
                            while True:
                                time.sleep(0.001)
                                if t_task.read():
                                    break
                    self.start_time = time.time()
                    self.start([s_task, l_task])
                    while (
                        s_task.is_task_done() is False and self.stop_signal is False
                    ):
                        time.sleep(0.01)
                        pass
                    self.stop([s_task, l_task])
                    s_task.write([[0, 0], [0, 0]])
                    l_task.write([False, False])
                    self.start([s_task, l_task])

    else:
        self.start_time = time.time()
        time.sleep(len(self.stim_signal[0]) / 3000)
        self.stop_signal = True
        pass
class Instrument (port, name)

A class used to represent a analog or digital instrument controlled by a DAQ

Args

port : str
The associated physical port
name : str
The name of the instrument
Expand source code
class Instrument:
    def __init__(self, port, name):
        """A class used to represent a analog or digital instrument controlled by a DAQ

        Args:
            port (str): The associated physical port
            name (str): The name of the instrument
        """
        self.port = port
        self.name = name

Subclasses