Newer
Older
import asyncio
from bluesky import RunEngine, RunEngineInterrupted
from IPython.core.magic import Magics, magics_class, line_magic
from datetime import datetime
import bluesky.plans as bp
import bluesky.plan_stubs as bps
try:
# cytools is a drop-in replacement for toolz, implemented in Cython
from cytoolz import partition
except ImportError:
from toolz import partition
from bluesky.magics import _ct_callback, get_labeled_devices
from bluesky.magics import MetaclassForClassProperties
from beamlinetools.utils.pbar_bessy import ProgressBarManager
from beamlinetools.magics.standard_magics_utils import color_generator
@magics_class
class BlueskyMagicsBessy(Magics, metaclass=MetaclassForClassProperties):
"""
label_axis_dict = {
# "dcm": ["dcm.monoz.user_readback",
# ]
}
get_ipython().register_magics(BlueskyMagicsBessy(RE, get_ipython(), database_name ="db", exclude_labels_from_wa =['detectors'], label_axis_dict=label_axis_dict))
"""
def __init__(self, RE, shell, label_axis_dict={}, exclude_labels_from_wa=['detectors'], database_name='db', **kwargs) -> None:
"""
Initializes a BlueskyMagicsBessy instance.
Args:
RE: A RunEngine instance.
shell: IPython shell instance.
label_axis_dict: A dictionary mapping labels to lists of axis names.
database_name: Name of the database.
**kwargs: Additional keyword arguments.
super().__init__( **kwargs)
self.RE = RE
self.shell = shell
self.decimals = 4
self.pbar_manager = ProgressBarManager()
self.color_gen = color_generator()
self.label_axis_dict = label_axis_dict
self.database_name = database_name
"""
Ensure that the RunEngine is in the 'idle' state.
If the RunEngine state is not 'idle', it aborts the current run.
Returns:
None
"""
if self.RE.state != 'idle':
print('The RunEngine invoked by magics cannot be resumed.')
print('Aborting...')
self.RE.abort()
def _get_device_from_ns(self, device_name):
"""
Get a device object from the user namespace.
Args:
device_name (str): The name of the device.
Returns:
Any: The device object.
Raises:
ValueError: If the device is not found in the user namespace.
"""
if "_" in device_name:
device_name = device_name.replace("_",".")
try:
dev = eval(device_name, self.shell.user_ns)
if dev is not None:
return dev
else:
raise ValueError(f"Device {device_name} is {dev }")
except KeyError:
raise ValueError(f"Device {device_name} not found in user namespace.")
def find_motor_detector(self):
"""
Find the motor and detector from the most recent run.
Returns:
tuple: A tuple containing the motor and detector objects.
"""
db = eval(self.database_name, self.shell.user_ns)
run = db[-1]
detector = run.metadata['start']['detectors']
motor = run.metadata['start']['motors'][0]
return motor, detector
def get_motor_position(self, motor):
"""
Get the position of a motor.
Args:
motor (str): The name of the motor.
Returns:
float: The position of the motor.
Raises:
NotImplementedError: If the position cannot be obtained using both primary and alternative methods.
"""
motor = self._get_device_from_ns(motor)
try:
motor_pos = motor.readback.get()
except Exception as e:
pass
#print(f'Exception attempting primary method: {e}')
try:
# If the primary method fails, try an alternative method
motor_pos = motor.user_readback.get()
except Exception as e:
# Handle failure of the alternative method
print(f'Exception attempting alternative method: {e}')
raise NotImplementedError('Unable to find the motor position using both primary and alternative methods')
return motor_pos
def plot_motors_current_pos(self):
"""
Plot the current positions of motors.
Returns:
tuple: A tuple containing the motor and its current position.
"""
bec = self.shell.user_ns.get('bec')
live_plots = bec._live_plots
live_plot_key = list(live_plots.keys())[0]
live_plot_dets = live_plots[live_plot_key].keys()
motor, detectors = self.find_motor_detector()
x_motor = self.get_motor_position(motor)
color = next(self.color_gen)
if len(self.plotted_lines)>0:
for l in self.plotted_lines:
l.remove()
self.plotted_lines=[]
for det in live_plot_dets:
live_plot = live_plots[live_plot_key][det]
line = live_plot.ax.axvline(x=x_motor,ymin=-1e30,ymax=+1e30,
return motor, x_motor
def find_motor_and_positon(self,position):
"""
Find the motor and its position.
Args:
position (str): The position identifier ('min', 'max', 'pic', 'cen', 'com').
Returns:
tuple: A tuple containing the motor name and its position.
Raises:
None
"""
db = eval(self.database_name, self.shell.user_ns)
run = db[-1]
detector = run.metadata['start']['detectors'][0]
motor = run.metadata['start']['motors'][0]
bec = self.shell.user_ns.get('bec')
peak_dict = bec.peaks
mot_pos = peak_dict[position][detector]
if "_" in motor:
motor = motor.replace("_",".")
if position not in ['cen','com']:
mot_pos = mot_pos[0]
return motor, mot_pos
def move_to_pos(self, motor, motor_position):
"""
Move the motor to the specified position.
Args:
motor (str): The name of the motor to move.
motor_position (float): The position to move the motor to.
Returns:
None
Raises:
None
"""
plan = bps.mv(motor,motor_position)
self.RE.waiting_hook = self.pbar_manager
try:
self.RE(plan)
except RunEngineInterrupted:
...
self.RE.waiting_hook = None
self._ensure_idle()
@line_magic
def where(self, line):
"""
Print the current position of the motor, and plot it.
Args:
line (str): Not used.
Returns:
None
"""
motor, x_motor = self.plot_motors_current_pos()
print(f'The motor: {motor} is at the position {x_motor}')
@line_magic
def cen(self, line):
"""
Move the motor to the center position, and plot it..
Args:
line (str): Not used.
Returns:
None
"""
motor_and_axis, mot_pos = self.find_motor_and_positon('cen')
mot = self._get_device_from_ns(motor_and_axis)
self.move_to_pos(mot, mot_pos)
self.plot_motors_current_pos()
print('Moved motor', motor_and_axis, 'to position', mot_pos)
@line_magic
def pic(self, line):
"""
Move the motor to the max position, and plot it.
Args:
line (str): Not used.
Returns:
None
"""
motor_and_axis, mot_pos = self.find_motor_and_positon('max')
mot = self._get_device_from_ns(motor_and_axis)
self.move_to_pos(mot, mot_pos)
self.plot_motors_current_pos()
print('Moved motor', motor_and_axis, 'to position', mot_pos)
@line_magic
def com(self, line):
"""
Move the motor to the center of mass position, and plot it.
Args:
line (str): Not used.
Returns:
None
"""
motor_and_axis, mot_pos = self.find_motor_and_positon('com')
mot = self._get_device_from_ns(motor_and_axis)
self.move_to_pos(mot, mot_pos)
self.plot_motors_current_pos()
print('Moved motor', motor_and_axis, 'to position', mot_pos)
@line_magic
def minimum(self, line):
"""
Move the motor to the minimum position, and plot it.
Args:
line (str): Not used.
Returns:
None
"""
motor_and_axis, mot_pos = self.find_motor_and_positon('min')
mot = self._get_device_from_ns(motor_and_axis)
self.move_to_pos(mot, mot_pos)
self.plot_motors_current_pos()
print('Moved motor', motor_and_axis, 'to position', mot_pos)
@line_magic
def mov(self, line):
"""
Move the motor(s) to the specified position(s).
Args:
line (str): The line containing motor and position pairs.
Returns:
None
"""
if len(line.split()) % 2 != 0:
raise TypeError("Wrong parameters. Expected: "
"%mov motor position (or several pairs like that)")
args = []
for motor, pos in partition(2, line.split()):
args.append(eval(motor, self.shell.user_ns))
args.append(eval(pos, self.shell.user_ns))
plan = bps.mv(*args)
self.pbar_manager.user_ns=self.shell.user_ns
self.RE.waiting_hook = self.pbar_manager
try:
self.RE(plan)
except RunEngineInterrupted:
...
self.RE.waiting_hook = None
self._ensure_idle()
return None
@line_magic
def movr(self, line):
if len(line.split()) % 2 != 0:
raise TypeError("Wrong parameters. Expected: "
"%mov motor position (or several pairs like that)")
args = []
for motor, pos in partition(2, line.split()):
args.append(eval(motor, self.shell.user_ns))
args.append(eval(pos, self.shell.user_ns))
plan = bps.mvr(*args)
self.pbar_manager.user_ns=self.shell.user_ns
self.RE.waiting_hook = self.pbar_manager
try:
self.RE(plan)
except RunEngineInterrupted:
...
self.RE.waiting_hook = None
self._ensure_idle()
return None
FMT_PREC = 6
@line_magic
def ct(self, line):
"""
Count data from detectors.
Args:
line (str, optional): A space-separated list of detector labels.
Returns:
None
"""
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
# If the deprecated BlueskyMagics.detectors list is non-empty, it has
# been configured by the user, and we must revert to the old behavior.
if type(self).detectors:
if line.strip():
dets = eval(line, self.shell.user_ns)
else:
dets = type(self).detectors
else:
# new behaviour
devices_dict = get_labeled_devices(user_ns=self.shell.user_ns)
if line.strip():
if '[' in line or ']' in line:
raise ValueError("It looks like you entered a list like "
"`%ct [motors, detectors]` "
"Magics work a bit differently than "
"normal Python. Enter "
"*space-separated* labels like "
"`%ct motors detectors`.")
# User has provided a white list of labels like
# %ct label1 label2
labels = line.strip().split()
else:
labels = ['detectors']
dets = []
for label in labels:
dets.extend(obj for _, obj in devices_dict.get(label, []))
plan = bp.count(dets)
print("[This data will not be saved. "
"Use the RunEngine to collect data.]")
# Get the current date and time
current_datetime = datetime.now()
# Format the date and time in European format
formatted_datetime = current_datetime.strftime("%A, %d/%m/%Y %H:%M:%S")
# Print the formatted dyate and time
print(f"Date and Time: {formatted_datetime}")
try:
RE = RunEngine({}, loop=asyncio.new_event_loop())
RE(plan, _ct_callback)
except RunEngineInterrupted:
...
self._ensure_idle()
return None
FMT_PREC = 6
def get_axis_custom(self,axis):
"""
Get the value of a custom axis.
Args:
axis (str): The name of the axis.
Returns:
Any: The value of the axis.
Raises:
None
"""
command = f"asyncio.run({axis}.read())"
# this is the command to use once we update the container,
# Peter added it, should give directlz the value
# command = f"asyncio.run({axis}.get_value())"
axis_dict = eval(command, self.shell.user_ns)
keys = list(axis_dict.keys())
axis_value = axis_dict[keys[0]]['value']
except Exception as e:
# print(f'exception {e}')
axis_value = 'Disconnected'
return axis_value
def get_axis(self, device):
try:
if isinstance(device[1].get(), (int, float, complex, str)):
#print(device[0], device[1].get())
axis_name = device[0]
axis_value = device[1].get()
else:
#print(device[1].name , device[1].readback.get())
axis_name = device[1].name
axis_value = device[1].readback.get()
except:
axis_value = 'Disconnected'
return axis_name, axis_value
"""
List positioner info. 'wa' stands for 'where all'.
Args:
line (str): A space-separated list of positioner labels.
Returns:
None
"""
devices_dict = get_labeled_devices(user_ns=self.shell.user_ns)
if line.strip():
if '[' in line or ']' in line:
raise ValueError("It looks like you entered a list like "
"`%wa [motors, detectors]` "
"Magics work a bit differently than "
"normal Python. Enter "
"*space-separated* labels like "
"`%wa motors detectors`.")
# User has provided a white list of labels like
# %wa label1 label2
labels = line.strip().split()
additional_labels = [lab for lab in self.label_axis_dict.keys() if lab not in labels]
labels.extend(additional_labels)
for label in labels:
headers = ['Positioner', 'Value']
lines = []
lines.append(LINE_FMT.format(*headers))
try:
devices_dict[label]
except KeyError:
if label in self.exclude_labels:
continue
elif label in self.label_axis_dict.keys(): # manual patch for pgm, labels do not show up
# device = devices_dict[label]
# device_name = device[0].name
# device_name = device_name.partition('.')[0]
device_name = self.label_axis_dict[label][0].split(".")[0]
axis = self.label_axis_dict[label]
for ax in axis:
axis_value = self.get_axis_custom(ax)
if ".user_readback" in ax:
ax_to_print = ax.replace(".user_readback", "")
if ".readback" in ax:
ax_to_print = ax.replace(".readback","")
continue
print()
print(label)
for device in devices_dict[label]:
device_name, device_value = self.get_axis(device)
device_name = device_name.replace("_", ".")
lines.append(LINE_FMT.format(device_name , device_value))
print('\n'.join(lines))