Skip to content
Snippets Groups Projects
Commit 878f7151 authored by Simone Vadilonga's avatar Simone Vadilonga
Browse files

new magics

parent 405e4c8b
No related branches found
No related tags found
No related merge requests found
......@@ -23,12 +23,12 @@ label_axis_dict = {
# "r.ps.det2.readback",
]
}
get_ipython().register_magics(BlueskyMagicsBessy(RE, get_ipython(), database_name ="db", label_axis_dict=label_axis_dict))
exclude_labels_from_wa=['detectors']
get_ipython().register_magics(BlueskyMagicsBessy(RE, get_ipython(), database_name ="db", exclude_labels_from_wa=exclude_labels_from_wa,label_axis_dict=label_axis_dict))
simplify = Simplify(get_ipython())
simplify.autogenerate_magics('/opt/bluesky/beamlinetools/beamlinetools/BEAMLINE_CONFIG/plans.py')
run_plan = simplify.execute_magic
# This magic does not work, we use bec.peakinfo at the moment
# get_ipython().register_magics(PeakInfoMagic)
get_ipython().register_magics(PeakInfoMagic)
# usage: peakinfo
\ No newline at end of file
......@@ -5,7 +5,7 @@ from .base import BlueskyMagicsBase
@magics_class
class PeakInfoMagic(BlueskyMagicsBase):
"""
Magic class for diplayin peak information
Magic class for diplaying peak information
"""
@property
......@@ -13,7 +13,7 @@ class PeakInfoMagic(BlueskyMagicsBase):
"""
last run metadata
"""
return self.shell.user_ns['db'][-1]['primary'].metadata
return self.shell.user_ns['db'][-1].metadata
@property
def bec(self):
......@@ -27,7 +27,6 @@ class PeakInfoMagic(BlueskyMagicsBase):
"""
show last scan peak information
"""
if line is None or len(line)==0:
# no paramters given : list all recorded detectors
detectors = self.metadata['start']['detectors']
......@@ -39,7 +38,7 @@ class PeakInfoMagic(BlueskyMagicsBase):
# first translate orginal detector names
# to the hinted ones
names = []
hints = self.metadata['descriptors'][0]['hints']
hints = self.metadata['start']['hints']
for detector_name in line.split():
try:
if detector_name in hints:
......
import asyncio
import warnings
# from bluesky.utils import ProgressBarManager
from beamlinetools.utils.pbar_bessy import ProgressBarManager
from bluesky import RunEngine, RunEngineInterrupted
from IPython.core.magic import Magics, magics_class, line_magic
from traitlets import MetaHasTraits
import collections
from datetime import datetime
import bluesky.plans as bp
import bluesky.plan_stubs as bps
try:
# cytools is a drop-in replacement for toolz, implemented in Cython
from cytoolz import partition
except ImportError:
from toolz import partition
from .standard_magics_utils import color_generator, find_parent, _ct_callback
from .standard_magics_utils import is_parent, get_labeled_devices, get_axis
from .standard_magics_utils import MetaclassForClassProperties
from bluesky.magics import _ct_callback, get_labeled_devices
from bluesky.magics import MetaclassForClassProperties
from beamlinetools.utils.pbar_bessy import ProgressBarManager
from beamlinetools.magics.standard_magics_utils import color_generator
@magics_class
class BlueskyMagicsBessy(Magics, metaclass=MetaclassForClassProperties):
"""
IPython magics for bluesky.
IPython magics for bluesky at BESSYII.
To install:
Usage:
>>> ip = get_ipython()
>>> ip.register_magics(BlueskyMagics)
label_axis_dict = {
# "dcm": ["dcm.monoz.user_readback",
# ]
}
get_ipython().register_magics(BlueskyMagicsBessy(RE, get_ipython(), database_name ="db", exclude_labels_from_wa =['detectors'], label_axis_dict=label_axis_dict))
Optionally configure default detectors and positioners by setting
the class attributes:
* ``BlueskyMagics.detectors``
* ``BlueskyMagics.positioners``
"""
def __init__(self, RE, shell, label_axis_dict={}, exclude_labels_from_wa=['detectors'], database_name='db', **kwargs) -> None:
"""
Initializes a BlueskyMagicsBessy instance.
For more advanced configuration, access the magic's RunEngine instance and
ProgressBarManager instance:
Args:
RE: A RunEngine instance.
shell: IPython shell instance.
label_axis_dict: A dictionary mapping labels to lists of axis names.
database_name: Name of the database.
**kwargs: Additional keyword arguments.
* ``BlueskyMagics.RE``
* ``BlueskyMagics.pbar_manager``
"""
def __init__(self, RE, shell, label_axis_dict={}, database_name='db', **kwargs) -> None:
Returns:
None
"""
super().__init__( **kwargs)
self.RE = RE
self.shell = shell
self.exclude_labels = exclude_labels_from_wa
self.decimals = 4
self.pbar_manager = ProgressBarManager()
self.color_gen = color_generator()
self.label_axis_dict = label_axis_dict
self.database_name = database_name
self.plotted_lines = []
def _ensure_idle(self):
"""
Ensure that the RunEngine is in the 'idle' state.
If the RunEngine state is not 'idle', it aborts the current run.
Returns:
None
"""
if self.RE.state != 'idle':
print('The RunEngine invoked by magics cannot be resumed.')
print('Aborting...')
......@@ -65,7 +75,16 @@ class BlueskyMagicsBessy(Magics, metaclass=MetaclassForClassProperties):
def _get_device_from_ns(self, device_name):
"""
Safely get a device object from the user namespace.
Get a device object from the user namespace.
Args:
device_name (str): The name of the device.
Returns:
Any: The device object.
Raises:
ValueError: If the device is not found in the user namespace.
"""
if "_" in device_name:
device_name = device_name.replace("_",".")
......@@ -80,6 +99,12 @@ class BlueskyMagicsBessy(Magics, metaclass=MetaclassForClassProperties):
raise ValueError(f"Device {device_name} not found in user namespace.")
def find_motor_detector(self):
"""
Find the motor and detector from the most recent run.
Returns:
tuple: A tuple containing the motor and detector objects.
"""
db = eval(self.database_name, self.shell.user_ns)
run = db[-1]
detector = run.metadata['start']['detectors']
......@@ -87,6 +112,18 @@ class BlueskyMagicsBessy(Magics, metaclass=MetaclassForClassProperties):
return motor, detector
def get_motor_position(self, motor):
"""
Get the position of a motor.
Args:
motor (str): The name of the motor.
Returns:
float: The position of the motor.
Raises:
NotImplementedError: If the position cannot be obtained using both primary and alternative methods.
"""
motor = self._get_device_from_ns(motor)
try:
motor_pos = motor.readback.get()
......@@ -103,6 +140,12 @@ class BlueskyMagicsBessy(Magics, metaclass=MetaclassForClassProperties):
return motor_pos
def plot_motors_current_pos(self):
"""
Plot the current positions of motors.
Returns:
tuple: A tuple containing the motor and its current position.
"""
bec = self.shell.user_ns.get('bec')
live_plots = bec._live_plots
live_plot_key = list(live_plots.keys())[0]
......@@ -110,29 +153,38 @@ class BlueskyMagicsBessy(Magics, metaclass=MetaclassForClassProperties):
motor, detectors = self.find_motor_detector()
x_motor = self.get_motor_position(motor)
color = next(self.color_gen)
if len(self.plotted_lines)>0:
for l in self.plotted_lines:
l.remove()
self.plotted_lines=[]
for det in live_plot_dets:
live_plot = live_plots[live_plot_key][det]
live_plot.ax.axvline(x=x_motor,ymin=-1e30,ymax=+1e30,
line = live_plot.ax.axvline(x=x_motor,ymin=-1e30,ymax=+1e30,
color=color, linestyle='dashed')
self.plotted_lines.append(line)
return motor, x_motor
def find_motor_and_positon(self,position):
"position should be min, max, pic, cen, com"
"""
Find the motor and its position.
Args:
position (str): The position identifier ('min', 'max', 'pic', 'cen', 'com').
Returns:
tuple: A tuple containing the motor name and its position.
Raises:
None
"""
db = eval(self.database_name, self.shell.user_ns)
run = db[-1]
detector = run.metadata['start']['detectors'][0]
motor = run.metadata['start']['motors'][0]
# parent_finder = run.metadata['start']['plan_args']['args'][0]
# parent = find_parent(parent_finder)
# motor_axis = motor.replace(parent+'_', '.')
bec = self.shell.user_ns.get('bec')
peak_dict = bec.peaks
mot_pos = peak_dict[position][detector]
# if motor == motor_axis:
# motor_and_axis = motor
# else:
# motor_and_axis = 'parent'+motor_axis
if "_" in motor:
motor = motor.replace("_",".")
if position not in ['cen','com']:
......@@ -140,6 +192,19 @@ class BlueskyMagicsBessy(Magics, metaclass=MetaclassForClassProperties):
return motor, mot_pos
def move_to_pos(self, motor, motor_position):
"""
Move the motor to the specified position.
Args:
motor (str): The name of the motor to move.
motor_position (float): The position to move the motor to.
Returns:
None
Raises:
None
"""
plan = bps.mv(motor,motor_position)
self.RE.waiting_hook = self.pbar_manager
try:
......@@ -151,11 +216,29 @@ class BlueskyMagicsBessy(Magics, metaclass=MetaclassForClassProperties):
@line_magic
def where(self, line):
"""
Print the current position of the motor, and plot it.
Args:
line (str): Not used.
Returns:
None
"""
motor, x_motor = self.plot_motors_current_pos()
print(f'The motor: {motor} is at the position {x_motor}')
@line_magic
def cen(self, line):
"""
Move the motor to the center position, and plot it..
Args:
line (str): Not used.
Returns:
None
"""
motor_and_axis, mot_pos = self.find_motor_and_positon('cen')
mot = self._get_device_from_ns(motor_and_axis)
self.move_to_pos(mot, mot_pos)
......@@ -165,6 +248,15 @@ class BlueskyMagicsBessy(Magics, metaclass=MetaclassForClassProperties):
@line_magic
def pic(self, line):
"""
Move the motor to the max position, and plot it.
Args:
line (str): Not used.
Returns:
None
"""
motor_and_axis, mot_pos = self.find_motor_and_positon('max')
mot = self._get_device_from_ns(motor_and_axis)
self.move_to_pos(mot, mot_pos)
......@@ -173,6 +265,15 @@ class BlueskyMagicsBessy(Magics, metaclass=MetaclassForClassProperties):
@line_magic
def com(self, line):
"""
Move the motor to the center of mass position, and plot it.
Args:
line (str): Not used.
Returns:
None
"""
motor_and_axis, mot_pos = self.find_motor_and_positon('com')
mot = self._get_device_from_ns(motor_and_axis)
self.move_to_pos(mot, mot_pos)
......@@ -181,6 +282,15 @@ class BlueskyMagicsBessy(Magics, metaclass=MetaclassForClassProperties):
@line_magic
def minimum(self, line):
"""
Move the motor to the minimum position, and plot it.
Args:
line (str): Not used.
Returns:
None
"""
motor_and_axis, mot_pos = self.find_motor_and_positon('min')
mot = self._get_device_from_ns(motor_and_axis)
self.move_to_pos(mot, mot_pos)
......@@ -189,6 +299,15 @@ class BlueskyMagicsBessy(Magics, metaclass=MetaclassForClassProperties):
@line_magic
def mov(self, line):
"""
Move the motor(s) to the specified position(s).
Args:
line (str): The line containing motor and position pairs.
Returns:
None
"""
if len(line.split()) % 2 != 0:
raise TypeError("Wrong parameters. Expected: "
"%mov motor position (or several pairs like that)")
......@@ -207,11 +326,40 @@ class BlueskyMagicsBessy(Magics, metaclass=MetaclassForClassProperties):
self._ensure_idle()
return None
@line_magic
def movr(self, line):
if len(line.split()) % 2 != 0:
raise TypeError("Wrong parameters. Expected: "
"%mov motor position (or several pairs like that)")
args = []
for motor, pos in partition(2, line.split()):
args.append(eval(motor, self.shell.user_ns))
args.append(eval(pos, self.shell.user_ns))
plan = bps.mvr(*args)
self.pbar_manager.user_ns=self.shell.user_ns
self.RE.waiting_hook = self.pbar_manager
try:
self.RE(plan)
except RunEngineInterrupted:
...
self.RE.waiting_hook = None
self._ensure_idle()
return None
FMT_PREC = 6
@line_magic
def ct(self, line):
"""
Count data from detectors.
Args:
line (str, optional): A space-separated list of detector labels.
Returns:
None
"""
# If the deprecated BlueskyMagics.detectors list is non-empty, it has
# been configured by the user, and we must revert to the old behavior.
if type(self).detectors:
......@@ -257,6 +405,18 @@ class BlueskyMagicsBessy(Magics, metaclass=MetaclassForClassProperties):
FMT_PREC = 6
def get_axis_custom(self,axis):
"""
Get the value of a custom axis.
Args:
axis (str): The name of the axis.
Returns:
Any: The value of the axis.
Raises:
None
"""
try:
axis_value = eval(axis + '.get()', self.shell.user_ns)
except Exception as e:
......@@ -264,77 +424,88 @@ class BlueskyMagicsBessy(Magics, metaclass=MetaclassForClassProperties):
axis_value = 'Disconnected'
return axis_value
def get_axis(self, device):
try:
if isinstance(device[1].get(), (int, float, complex, str)):
#print(device[0], device[1].get())
axis_name = device[0]
axis_value = device[1].get()
else:
#print(device[1].name , device[1].readback.get())
axis_name = device[1].name
axis_value = device[1].readback.get()
except:
axis_value = 'Disconnected'
return axis_name, axis_value
@line_magic
def wa(self, line):
"List positioner info. 'wa' stands for 'where all'."
# If the deprecated BlueskyMagics.positioners list is non-empty, it has
# been configured by the user, and we must revert to the old behavior.
if type(self).positioners:
if line.strip():
positioners = eval(line, self.shell.user_ns)
else:
positioners = type(self).positioners
if len(positioners) > 0:
_print_positioners(positioners, precision=self.FMT_PREC)
"""
List positioner info. 'wa' stands for 'where all'.
Args:
line (str): A space-separated list of positioner labels.
Returns:
None
"""
devices_dict = get_labeled_devices(user_ns=self.shell.user_ns)
if line.strip():
if '[' in line or ']' in line:
raise ValueError("It looks like you entered a list like "
"`%wa [motors, detectors]` "
"Magics work a bit differently than "
"normal Python. Enter "
"*space-separated* labels like "
"`%wa motors detectors`.")
# User has provided a white list of labels like
# %wa label1 label2
labels = line.strip().split()
else:
# new behaviour
devices_dict = get_labeled_devices(user_ns=self.shell.user_ns)
if line.strip():
if '[' in line or ']' in line:
raise ValueError("It looks like you entered a list like "
"`%wa [motors, detectors]` "
"Magics work a bit differently than "
"normal Python. Enter "
"*space-separated* labels like "
"`%wa motors detectors`.")
# User has provided a white list of labels like
# %wa label1 label2
labels = line.strip().split()
else:
# Show all labels.
labels = list(devices_dict.keys())
additional_labels = [lab for lab in self.label_axis_dict.keys() if lab not in labels]
labels.extend(additional_labels)
for label in labels:
headers = ['Positioner', 'Value']
LINE_FMT = '{: <30} {: <11} '
lines = []
lines.append(LINE_FMT.format(*headers))
# Show all labels.
labels = list(devices_dict.keys())
additional_labels = [lab for lab in self.label_axis_dict.keys() if lab not in labels]
labels.extend(additional_labels)
for label in labels:
headers = ['Positioner', 'Value']
LINE_FMT = '{: <30} {: <11} '
lines = []
lines.append(LINE_FMT.format(*headers))
try:
devices_dict[label]
except KeyError:
try:
devices_dict[label]
self.label_axis_dict[label]
except KeyError:
try:
self.label_axis_dict[label]
except KeyError:
print('<no matches for this label>')
continue
if label == 'detectors' or label == 'motors' or label == 'keithley':
continue
elif label in self.label_axis_dict.keys(): # manual patch for pgm, labels do not show up
print()
print(label)
# device = devices_dict[label]
# device_name = device[0].name
# device_name = device_name.partition('.')[0]
device_name = self.label_axis_dict[label][0].split(".")[0]
axis = self.label_axis_dict[label]
for ax in axis:
axis_value = self.get_axis_custom(ax)
if ".user_readback" in ax:
ax_to_print = ax.replace(".user_readback", "")
if ".readback" in ax:
ax_to_print = ax.replace(".readback","")
lines.append(LINE_FMT.format(ax , axis_value))
print('\n'.join(lines))
print('<no matches for this label>')
continue
if label in self.exclude_labels:
continue
elif label in self.label_axis_dict.keys(): # manual patch for pgm, labels do not show up
print()
print(label)
for device in devices_dict[label]:
device_name, device_value = get_axis(device)
device_name = device_name.replace("_", ".")
lines.append(LINE_FMT.format(device_name , device_value))
# device = devices_dict[label]
# device_name = device[0].name
# device_name = device_name.partition('.')[0]
device_name = self.label_axis_dict[label][0].split(".")[0]
axis = self.label_axis_dict[label]
for ax in axis:
axis_value = self.get_axis_custom(ax)
if ".user_readback" in ax:
ax_to_print = ax.replace(".user_readback", "")
if ".readback" in ax:
ax_to_print = ax.replace(".readback","")
lines.append(LINE_FMT.format(ax , axis_value))
print('\n'.join(lines))
continue
print()
print(label)
for device in devices_dict[label]:
device_name, device_value = self.get_axis(device)
device_name = device_name.replace("_", ".")
lines.append(LINE_FMT.format(device_name , device_value))
print('\n'.join(lines))
......
# These are experimental IPython magics, providing quick shortcuts for simple
# tasks. None of these save any data.
# To use, run this in an IPython shell:
# ip = get_ipython()
# ip.register_magics(BlueskyMagicsSimo)
import asyncio
import warnings
# from bluesky.utils import ProgressBarManager
from beamlinetools.utils.pbar_bessy import ProgressBarManager
from bluesky import RunEngine, RunEngineInterrupted
from IPython.core.magic import Magics, magics_class, line_magic
from traitlets import MetaHasTraits
import numpy as np
import collections
from datetime import datetime
from operator import attrgetter
import bluesky.plans as bp
import bluesky.plan_stubs as bps
from IPython import get_ipython
user_ns = get_ipython().user_ns
try:
# cytools is a drop-in replacement for toolz, implemented in Cython
from cytoolz import partition
except ImportError:
from toolz import partition
# This is temporarily here to allow for warnings to be printed
# we changed positioners to a property but since we never instantiate
# the class we need to add this
class MetaclassForClassProperties(MetaHasTraits, type):
@property
def positioners(self):
if self._positioners:
warnings.warn("BlueskyMagics.positioners is deprecated. "
"Please use the newer labels feature.")
return self._positioners
@positioners.setter
def positioners(self, val):
warnings.warn("BlueskyMagics.positioners is deprecated. "
"Please use the newer labels feature.")
self._positioners = val
@property
def detectors(self):
if self._detectors:
warnings.warn("BlueskyMagics.detectors is deprecated. "
"Please use the newer labels feature.")
return self._detectors
@detectors.setter
def detectors(self, val):
warnings.warn("BlueskyMagics.detectors is deprecated. "
"Please use the newer labels feature.")
self._detectors = val
_positioners = []
_detectors = []
def get_axis(device):
try:
if isinstance(device[1].get(), (int, float, complex, str)):
#print(device[0], device[1].get())
axis_name = device[0]
axis_value = device[1].get()
else:
#print(device[1].name , device[1].readback.get())
axis_name = device[1].name
axis_value = device[1].readback.get()
except:
axis_value = 'Disconnected'
return axis_name, axis_value
def get_labeled_devices(user_ns=None, maxdepth=8):
''' Returns dict of labels mapped to devices with that label
......@@ -160,24 +86,6 @@ def is_parent(dev):
return (not isinstance(dev, type) and getattr(dev, 'component_names', []))
def _ct_callback(name, doc):
if name != 'event':
return
for k, v in doc['data'].items():
print('{: <30} {}'.format(k, v))
def find_parent(string):
index = string.find("parent='") + 8
counter=0
for i in string[index:]:
if counter == 0:
parent=i
counter += 1
elif i == "'":
return parent
else:
parent += i
def color_generator():
while True:
yield "black"
......
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment