Skip to content
Snippets Groups Projects
standard_magics.py 17.1 KiB
Newer Older
Simone Vadilonga's avatar
Simone Vadilonga committed
import asyncio
from bluesky import RunEngine, RunEngineInterrupted
from IPython.core.magic import Magics, magics_class, line_magic
from datetime import datetime
import bluesky.plans as bp
import bluesky.plan_stubs as bps

try:
    # cytools is a drop-in replacement for toolz, implemented in Cython
    from cytoolz import partition
except ImportError:
    from toolz import partition


Simone Vadilonga's avatar
Simone Vadilonga committed
from bluesky.magics import _ct_callback, get_labeled_devices
from bluesky.magics import MetaclassForClassProperties
Simone Vadilonga's avatar
Simone Vadilonga committed

Simone Vadilonga's avatar
Simone Vadilonga committed
from beamlinetools.utils.pbar_bessy import ProgressBarManager
from beamlinetools.magics.standard_magics_utils import color_generator
Simone Vadilonga's avatar
Simone Vadilonga committed


@magics_class
class BlueskyMagicsBessy(Magics, metaclass=MetaclassForClassProperties):
    """
Simone Vadilonga's avatar
Simone Vadilonga committed
    IPython magics for bluesky at BESSYII.
Simone Vadilonga's avatar
Simone Vadilonga committed

Simone Vadilonga's avatar
Simone Vadilonga committed
    Usage:
Simone Vadilonga's avatar
Simone Vadilonga committed

Simone Vadilonga's avatar
Simone Vadilonga committed
    label_axis_dict = {
                        # "dcm": ["dcm.monoz.user_readback", 
                        #        ]
                        }
    get_ipython().register_magics(BlueskyMagicsBessy(RE, get_ipython(), database_name ="db", exclude_labels_from_wa =['detectors'], label_axis_dict=label_axis_dict))
Simone Vadilonga's avatar
Simone Vadilonga committed

Simone Vadilonga's avatar
Simone Vadilonga committed
    """
    def __init__(self, RE, shell, label_axis_dict={}, exclude_labels_from_wa=['detectors'], database_name='db', **kwargs) -> None:
        """
        Initializes a BlueskyMagicsBessy instance.
Simone Vadilonga's avatar
Simone Vadilonga committed

Simone Vadilonga's avatar
Simone Vadilonga committed
        Args:
            RE: A RunEngine instance.
            shell: IPython shell instance.
            label_axis_dict: A dictionary mapping labels to lists of axis names.
            database_name: Name of the database.
            **kwargs: Additional keyword arguments.
Simone Vadilonga's avatar
Simone Vadilonga committed

Simone Vadilonga's avatar
Simone Vadilonga committed
        Returns:
            None
        """
Simone Vadilonga's avatar
Simone Vadilonga committed
        super().__init__( **kwargs)
        self.RE = RE
        self.shell = shell
Simone Vadilonga's avatar
Simone Vadilonga committed
        self.exclude_labels = exclude_labels_from_wa
Simone Vadilonga's avatar
Simone Vadilonga committed
        self.decimals = 4
        self.pbar_manager = ProgressBarManager()
        self.color_gen = color_generator()
        self.label_axis_dict = label_axis_dict
        self.database_name = database_name
Simone Vadilonga's avatar
Simone Vadilonga committed
        self.plotted_lines = []

Simone Vadilonga's avatar
Simone Vadilonga committed
    
    def _ensure_idle(self):
Simone Vadilonga's avatar
Simone Vadilonga committed
        """
        Ensure that the RunEngine is in the 'idle' state.

        If the RunEngine state is not 'idle', it aborts the current run.

        Returns:
            None
        """
Simone Vadilonga's avatar
Simone Vadilonga committed
        if self.RE.state != 'idle':
            print('The RunEngine invoked by magics cannot be resumed.')
            print('Aborting...')
            self.RE.abort()
    
    def _get_device_from_ns(self, device_name):
        """
Simone Vadilonga's avatar
Simone Vadilonga committed
        Get a device object from the user namespace.

        Args:
            device_name (str): The name of the device.

        Returns:
            Any: The device object.
        
        Raises:
            ValueError: If the device is not found in the user namespace.
Simone Vadilonga's avatar
Simone Vadilonga committed
        """
        if "_" in device_name:
            device_name = device_name.replace("_",".")
        try:
            dev = eval(device_name, self.shell.user_ns)
            if dev is not None:
                return dev
            else:
                raise ValueError(f"Device {device_name} is {dev }")

        except KeyError:
            raise ValueError(f"Device {device_name} not found in user namespace.")

    def find_motor_detector(self):
Simone Vadilonga's avatar
Simone Vadilonga committed
        """
        Find the motor and detector from the most recent run.

        Returns:
            tuple: A tuple containing the motor and detector objects.
        """
Simone Vadilonga's avatar
Simone Vadilonga committed
        db        = eval(self.database_name, self.shell.user_ns)
        run       = db[-1]
        detector  = run.metadata['start']['detectors']
        motor     = run.metadata['start']['motors'][0]
        return motor, detector
    
    def get_motor_position(self, motor):
Simone Vadilonga's avatar
Simone Vadilonga committed
        """
        Get the position of a motor.

        Args:
            motor (str): The name of the motor.

        Returns:
            float: The position of the motor.

        Raises:
            NotImplementedError: If the position cannot be obtained using both primary and alternative methods.
        """
Simone Vadilonga's avatar
Simone Vadilonga committed
        motor = self._get_device_from_ns(motor)
        try:
            motor_pos = motor.readback.get()
        except Exception as e:
            pass
            #print(f'Exception attempting primary method: {e}')
            try:
                # If the primary method fails, try an alternative method
                motor_pos = motor.user_readback.get()
            except Exception as e:
                # Handle failure of the alternative method
                print(f'Exception attempting alternative method: {e}')
                raise NotImplementedError('Unable to find the motor position using both primary and alternative methods')
        return motor_pos

    def plot_motors_current_pos(self):
Simone Vadilonga's avatar
Simone Vadilonga committed
        """
        Plot the current positions of motors.

        Returns:
            tuple: A tuple containing the motor and its current position.
        """
Simone Vadilonga's avatar
Simone Vadilonga committed
        bec        = self.shell.user_ns.get('bec')
        live_plots = bec._live_plots
        live_plot_key = list(live_plots.keys())[0]
        live_plot_dets = live_plots[live_plot_key].keys()
        motor, detectors = self.find_motor_detector()
        x_motor = self.get_motor_position(motor)
        color = next(self.color_gen)
Simone Vadilonga's avatar
Simone Vadilonga committed
        if len(self.plotted_lines)>0:
            for l in self.plotted_lines:
                l.remove()
        self.plotted_lines=[]
Simone Vadilonga's avatar
Simone Vadilonga committed
        for det in live_plot_dets:
            live_plot = live_plots[live_plot_key][det]
Simone Vadilonga's avatar
Simone Vadilonga committed
            line = live_plot.ax.axvline(x=x_motor,ymin=-1e30,ymax=+1e30, 
Simone Vadilonga's avatar
Simone Vadilonga committed
                                 color=color, linestyle='dashed')
Simone Vadilonga's avatar
Simone Vadilonga committed
            self.plotted_lines.append(line)

Simone Vadilonga's avatar
Simone Vadilonga committed
        return motor, x_motor
    
    def find_motor_and_positon(self,position):
Simone Vadilonga's avatar
Simone Vadilonga committed
        """
        Find the motor and its position.

        Args:
            position (str): The position identifier ('min', 'max', 'pic', 'cen', 'com').

        Returns:
            tuple: A tuple containing the motor name and its position.

        Raises:
            None
        """
Simone Vadilonga's avatar
Simone Vadilonga committed
        db        = eval(self.database_name, self.shell.user_ns)
        run       = db[-1]
        detector  = run.metadata['start']['detectors'][0]
        motor     = run.metadata['start']['motors'][0]
        bec        = self.shell.user_ns.get('bec')
        peak_dict = bec.peaks
        mot_pos   = peak_dict[position][detector]
        if "_" in motor:
            motor = motor.replace("_",".")
        if position not in ['cen','com']:
            mot_pos = mot_pos[0]
        return motor, mot_pos
    
    def move_to_pos(self, motor, motor_position):
Simone Vadilonga's avatar
Simone Vadilonga committed
        """
        Move the motor to the specified position.

        Args:
            motor (str): The name of the motor to move.
            motor_position (float): The position to move the motor to.

        Returns:
            None

        Raises:
            None
        """
Simone Vadilonga's avatar
Simone Vadilonga committed
        plan = bps.mv(motor,motor_position)
        self.RE.waiting_hook = self.pbar_manager
        try:
            self.RE(plan)
        except RunEngineInterrupted:
            ...
        self.RE.waiting_hook = None
        self._ensure_idle()

    @line_magic
    def where(self, line):
Simone Vadilonga's avatar
Simone Vadilonga committed
        """
        Print the current position of the motor, and plot it.

        Args:
            line (str): Not used.

        Returns:
            None
        """
Simone Vadilonga's avatar
Simone Vadilonga committed
        motor, x_motor = self.plot_motors_current_pos()
        print(f'The motor: {motor} is at the position {x_motor}')

    @line_magic
    def cen(self, line):
Simone Vadilonga's avatar
Simone Vadilonga committed
        """
        Move the motor to the center position, and plot it..

        Args:
            line (str): Not used.

        Returns:
            None
        """
Simone Vadilonga's avatar
Simone Vadilonga committed
        motor_and_axis, mot_pos = self.find_motor_and_positon('cen')
        mot = self._get_device_from_ns(motor_and_axis)
        self.move_to_pos(mot, mot_pos)
        self.plot_motors_current_pos()
        print('Moved motor', motor_and_axis, 'to position', mot_pos)


    @line_magic
    def pic(self, line):
Simone Vadilonga's avatar
Simone Vadilonga committed
        """
        Move the motor to the max position, and plot it.

        Args:
            line (str): Not used.

        Returns:
            None
        """
Simone Vadilonga's avatar
Simone Vadilonga committed
        motor_and_axis, mot_pos = self.find_motor_and_positon('max')
        mot = self._get_device_from_ns(motor_and_axis)
        self.move_to_pos(mot, mot_pos)
        self.plot_motors_current_pos()
        print('Moved motor', motor_and_axis, 'to position', mot_pos)

    @line_magic
    def com(self, line):
Simone Vadilonga's avatar
Simone Vadilonga committed
        """
        Move the motor to the center of mass position, and plot it.

        Args:
            line (str): Not used.

        Returns:
            None
        """
Simone Vadilonga's avatar
Simone Vadilonga committed
        motor_and_axis, mot_pos = self.find_motor_and_positon('com')
        mot = self._get_device_from_ns(motor_and_axis)
        self.move_to_pos(mot, mot_pos)
        self.plot_motors_current_pos()
        print('Moved motor', motor_and_axis, 'to position', mot_pos)

    @line_magic
    def minimum(self, line):
Simone Vadilonga's avatar
Simone Vadilonga committed
        """
        Move the motor to the minimum position, and plot it.

        Args:
            line (str): Not used.

        Returns:
            None
        """
Simone Vadilonga's avatar
Simone Vadilonga committed
        motor_and_axis, mot_pos = self.find_motor_and_positon('min')
        mot = self._get_device_from_ns(motor_and_axis)
        self.move_to_pos(mot, mot_pos)
        self.plot_motors_current_pos()
        print('Moved motor', motor_and_axis, 'to position', mot_pos)

    @line_magic
    def mov(self, line):
Simone Vadilonga's avatar
Simone Vadilonga committed
        """
        Move the motor(s) to the specified position(s).

        Args:
            line (str): The line containing motor and position pairs.

        Returns:
            None
        """
Simone Vadilonga's avatar
Simone Vadilonga committed
        if len(line.split()) % 2 != 0:
            raise TypeError("Wrong parameters. Expected: "
                            "%mov motor position (or several pairs like that)")
        args = []
        for motor, pos in partition(2, line.split()):
            args.append(eval(motor, self.shell.user_ns))
            args.append(eval(pos, self.shell.user_ns))
        plan = bps.mv(*args)
        self.pbar_manager.user_ns=self.shell.user_ns
        self.RE.waiting_hook = self.pbar_manager
        try:
            self.RE(plan)
        except RunEngineInterrupted:
            ...
        self.RE.waiting_hook = None
        self._ensure_idle()
        return None

Simone Vadilonga's avatar
Simone Vadilonga committed
    @line_magic
    def movr(self, line):
        if len(line.split()) % 2 != 0:
            raise TypeError("Wrong parameters. Expected: "
                            "%mov motor position (or several pairs like that)")
        args = []
        for motor, pos in partition(2, line.split()):
            args.append(eval(motor, self.shell.user_ns))
            args.append(eval(pos, self.shell.user_ns))
        plan = bps.mvr(*args)
        self.pbar_manager.user_ns=self.shell.user_ns
        self.RE.waiting_hook = self.pbar_manager
        try:
            self.RE(plan)
        except RunEngineInterrupted:
            ...
        self.RE.waiting_hook = None
        self._ensure_idle()
        return None

Simone Vadilonga's avatar
Simone Vadilonga committed

    FMT_PREC = 6

    @line_magic
    def ct(self, line):
Simone Vadilonga's avatar
Simone Vadilonga committed
        """
        Count data from detectors.

        Args:
            line (str, optional): A space-separated list of detector labels.

        Returns:
            None
        """
Simone Vadilonga's avatar
Simone Vadilonga committed
        # If the deprecated BlueskyMagics.detectors list is non-empty, it has
        # been configured by the user, and we must revert to the old behavior.
        if type(self).detectors:
            if line.strip():
                dets = eval(line, self.shell.user_ns)
            else:
                dets = type(self).detectors
        else:
            # new behaviour
            devices_dict = get_labeled_devices(user_ns=self.shell.user_ns)
            if line.strip():
                if '[' in line or ']' in line:
                    raise ValueError("It looks like you entered a list like "
                                     "`%ct [motors, detectors]` "
                                     "Magics work a bit differently than "
                                     "normal Python. Enter "
                                     "*space-separated* labels like "
                                     "`%ct motors detectors`.")
                # User has provided a white list of labels like
                # %ct label1 label2
                labels = line.strip().split()
            else:
                labels = ['detectors']
            dets = []
            for label in labels:
                dets.extend(obj for _, obj in devices_dict.get(label, []))
        plan = bp.count(dets)
        print("[This data will not be saved. "
              "Use the RunEngine to collect data.]")
        # Get the current date and time
        current_datetime = datetime.now()
        # Format the date and time in European format
        formatted_datetime = current_datetime.strftime("%A, %d/%m/%Y %H:%M:%S")
        # Print the formatted dyate and time
        print(f"Date and Time: {formatted_datetime}")
        try:
            RE = RunEngine({}, loop=asyncio.new_event_loop())
            RE(plan, _ct_callback)
        except RunEngineInterrupted:
            ...
        self._ensure_idle()
        return None

    FMT_PREC = 6
    def get_axis_custom(self,axis):
Simone Vadilonga's avatar
Simone Vadilonga committed
        """
        Get the value of a custom axis.

        Args:
            axis (str): The name of the axis.

        Returns:
            Any: The value of the axis.

        Raises:
            None
        """
Simone Vadilonga's avatar
Simone Vadilonga committed
        try:
            command = f"asyncio.run({axis}.read())"
            # this is the command to use once we update the container, 
            # Peter added it, should give directlz the value
            # command = f"asyncio.run({axis}.get_value())"
            axis_dict   = eval(command, self.shell.user_ns)
            keys = list(axis_dict.keys())
            axis_value = axis_dict[keys[0]]['value']
Simone Vadilonga's avatar
Simone Vadilonga committed
        except Exception as e:
            # print(f'exception {e}')
            axis_value = 'Disconnected'
        return axis_value
    
Simone Vadilonga's avatar
Simone Vadilonga committed
    def get_axis(self, device):
        try:
            if isinstance(device[1].get(), (int, float, complex, str)):
                #print(device[0], device[1].get())
                axis_name  = device[0]
                axis_value = device[1].get()
            else:
                #print(device[1].name , device[1].readback.get())
                axis_name  = device[1].name
                axis_value = device[1].readback.get()
        except:
            axis_value = 'Disconnected'
        return axis_name, axis_value
    
Simone Vadilonga's avatar
Simone Vadilonga committed
    @line_magic
    def wa(self, line):
Simone Vadilonga's avatar
Simone Vadilonga committed
        """
        List positioner info. 'wa' stands for 'where all'.

        Args:
            line (str): A space-separated list of positioner labels.

        Returns:
            None
        """
        devices_dict = get_labeled_devices(user_ns=self.shell.user_ns)
        if line.strip():
            if '[' in line or ']' in line:
                raise ValueError("It looks like you entered a list like "
                                    "`%wa [motors, detectors]` "
                                    "Magics work a bit differently than "
                                    "normal Python. Enter "
                                    "*space-separated* labels like "
                                    "`%wa motors detectors`.")
            # User has provided a white list of labels like
            # %wa label1 label2
            labels = line.strip().split()
Simone Vadilonga's avatar
Simone Vadilonga committed
        else:
Simone Vadilonga's avatar
Simone Vadilonga committed
            # Show all labels.
            labels = list(devices_dict.keys())
            additional_labels = [lab for lab in self.label_axis_dict.keys() if lab not in labels]
            labels.extend(additional_labels)
        
        
Simone Vadilonga's avatar
Simone Vadilonga committed
        for label in labels:
            headers = ['Positioner', 'Value']
            LINE_FMT = '{: <40} {: <11} '
Simone Vadilonga's avatar
Simone Vadilonga committed
            lines = []
            lines.append(LINE_FMT.format(*headers))
            try:
                devices_dict[label]
            except KeyError:
Simone Vadilonga's avatar
Simone Vadilonga committed
                try:
Simone Vadilonga's avatar
Simone Vadilonga committed
                    self.label_axis_dict[label]
Simone Vadilonga's avatar
Simone Vadilonga committed
                except KeyError:
Simone Vadilonga's avatar
Simone Vadilonga committed
                    print('<no matches for this label>')
Simone Vadilonga's avatar
Simone Vadilonga committed
                    continue
Simone Vadilonga's avatar
Simone Vadilonga committed
            if label in self.exclude_labels:
                continue
            elif label in self.label_axis_dict.keys():   # manual patch for pgm, labels do not show up                    
Simone Vadilonga's avatar
Simone Vadilonga committed
                print()
                print(label)
Simone Vadilonga's avatar
Simone Vadilonga committed
                # device         = devices_dict[label]
                # device_name    = device[0].name
                # device_name    = device_name.partition('.')[0]
                device_name = self.label_axis_dict[label][0].split(".")[0]
                axis  = self.label_axis_dict[label]
                for ax in axis:
                    axis_value = self.get_axis_custom(ax)
                    if ".user_readback" in ax:
                        ax_to_print = ax.replace(".user_readback", "")
                    if ".readback" in ax:
                        ax_to_print = ax.replace(".readback","")

Marcel Bajdel's avatar
Marcel Bajdel committed
                    lines.append(LINE_FMT.format(ax_to_print , axis_value))
Simone Vadilonga's avatar
Simone Vadilonga committed
                print('\n'.join(lines))
Simone Vadilonga's avatar
Simone Vadilonga committed
                continue
            print()
            print(label)
            for device in devices_dict[label]:
                device_name, device_value = self.get_axis(device)
                device_name = device_name.replace("_", ".")
                lines.append(LINE_FMT.format(device_name , device_value))
            print('\n'.join(lines))