Skip to content
Snippets Groups Projects
Commit 81fda1e1 authored by Simone Vadilonga's avatar Simone Vadilonga
Browse files

new mca implementation

parent 878f7151
No related branches found
No related tags found
No related merge requests found
from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
from ophyd import Component as Cpt
from ophyd.mca import EpicsMCA, ROI
from ophyd import PseudoPositioner, PseudoSingle
from ophyd.pseudopos import pseudo_position_argument, real_position_argument
import numpy as np
from ophyd.mca import EpicsMCARecord, ROI, EpicsDXPLowLevel, EpicsDXPBaseSystem#, EpicsDXPMultiElementSystem
from ophyd.areadetector import EpicsSignal as EpicsSignal
from ophyd import EpicsSignal, EpicsSignalRO
from ophyd import status, DeviceStatus, Signal
from ophyd.status import SubscriptionStatus, MoveStatus, AndStatus
class EpicsDXPMultiElementSystem(EpicsDXPBaseSystem):
# Preset info
preset_events = Cpt(EpicsSignal, "PresetEvents")
preset_live_time = Cpt(EpicsSignal, "PresetLive")
preset_real_time = Cpt(EpicsSignal, "PresetReal")
preset_mode = Cpt(EpicsSignal, "PresetMode", string=True)
preset_triggers = Cpt(EpicsSignal, "PresetTriggers")
# Acquisition
erase_all = Cpt(EpicsSignal, "EraseAll")
erase_start = Cpt(EpicsSignal, "EraseStart", trigger_value=1)
start_all = Cpt(EpicsSignal, "StartAll")
stop_all = Cpt(EpicsSignal, "StopAll")
# Status
set_acquire_busy = Cpt(EpicsSignal, "SetAcquireBusy")
acquire_busy = Cpt(EpicsSignal, "AcquireBusy")
status_all = Cpt(EpicsSignal, "StatusAll")
status_all_once = Cpt(EpicsSignal, "StatusAllOnce")
acquiring = Cpt(EpicsSignal, "Acquiring")
done_value = 0
# Reading
read_baseline_histograms = Cpt(EpicsSignal, "ReadBaselineHistograms")
read_all = Cpt(EpicsSignal, "ReadAll")
read_all_once = Cpt(EpicsSignal, "ReadAllOnce")
# As a debugging note, if snl_connected is not '1', your IOC is
# misconfigured:
snl_connected = Cpt(EpicsSignal, "SNL_Connected")
# Copying to individual elements
copy_adcp_ercent_rule = Cpt(EpicsSignal, "CopyADCPercentRule")
copy_baseline_cut_enable = Cpt(EpicsSignal, "CopyBaselineCutEnable")
copy_baseline_cut_percent = Cpt(EpicsSignal, "CopyBaselineCutPercent")
copy_baseline_filter_length = Cpt(EpicsSignal, "CopyBaselineFilterLength")
copy_baseline_threshold = Cpt(EpicsSignal, "CopyBaselineThreshold")
copy_decay_time = Cpt(EpicsSignal, "CopyDecayTime")
copy_detector_polarity = Cpt(EpicsSignal, "CopyDetectorPolarity")
copy_energy_threshold = Cpt(EpicsSignal, "CopyEnergyThreshold")
copy_gap_time = Cpt(EpicsSignal, "CopyGapTime")
copy_max_energy = Cpt(EpicsSignal, "CopyMaxEnergy")
copy_max_width = Cpt(EpicsSignal, "CopyMaxWidth")
copy_peaking_time = Cpt(EpicsSignal, "CopyPeakingTime")
copy_preamp_gain = Cpt(EpicsSignal, "CopyPreampGain")
copy_roic_hannel = Cpt(EpicsSignal, "CopyROIChannel")
copy_roie_nergy = Cpt(EpicsSignal, "CopyROIEnergy")
copy_roi_sca = Cpt(EpicsSignal, "CopyROI_SCA")
copy_reset_delay = Cpt(EpicsSignal, "CopyResetDelay")
copy_trigger_gap_time = Cpt(EpicsSignal, "CopyTriggerGapTime")
copy_trigger_peaking_time = Cpt(EpicsSignal, "CopyTriggerPeakingTime")
copy_trigger_threshold = Cpt(EpicsSignal, "CopyTriggerThreshold")
# do_* executes the process:
do_read_all = Cpt(EpicsSignal, "DoReadAll")
do_read_baseline_histograms = Cpt(EpicsSignal, "DoReadBaselineHistograms")
do_read_traces = Cpt(EpicsSignal, "DoReadTraces")
do_status_all = Cpt(EpicsSignal, "DoStatusAll")
# Time
dead_time = Cpt(EpicsSignal, "DeadTime")
elapsed_live = Cpt(EpicsSignal, "ElapsedLive")
elapsed_real = Cpt(EpicsSignal, "ElapsedReal")
idead_time = Cpt(EpicsSignal, "IDeadTime")
# low-level
read_low_level_params = Cpt(EpicsSignal, "ReadLLParams")
# Traces
read_traces = Cpt(EpicsSignal, "ReadTraces")
trace_modes = Cpt(EpicsSignal, "TraceModes", string=True)
trace_times = Cpt(EpicsSignal, "TraceTimes")
class EpicsDXP(Device):
"""All high-level DXP parameters for each channel"""
preset_mode = Cpt(EpicsSignal, "PresetMode", string=True)
# live_time_output = Cpt(EpicsSignal, "LiveTimeOutput", string=True)
# elapsed_live_time = Cpt(EpicsSignal, "ElapsedLive")
# elapsed_real_time = Cpt(EpicsSignal, "ElapsedReal")
# elapsed_trigger_live_time = Cpt(EpicsSignal, "ElapsedTriggerLiveTime")
# Trigger Filter PVs
trigger_peaking_time = Cpt(EpicsSignal, "TriggerPeakingTime")
trigger_threshold = Cpt(EpicsSignal, "TriggerThreshold")
trigger_gap_time = Cpt(EpicsSignal, "TriggerGapTime")
# trigger_output = Cpt(EpicsSignal, "TriggerOutput", string=True)
max_width = Cpt(EpicsSignal, "MaxWidth")
from collections import OrderedDict
from ophyd.device import (Device, Component as Cpt, DynamicDeviceComponent as DDC,
Kind)
from ophyd import FormattedComponent as FCpt
class ROI(Device):
# 'name' is not an allowed attribute
label = FCpt(EpicsSignal, '{self.prefix}.R{self._ch}NM', lazy=True, kind='config')
count = FCpt(EpicsSignal, '{self.prefix}.R{self._ch}', lazy=True, kind='normal')
net_count = FCpt(EpicsSignalRO, '{self.prefix}.R{self._ch}N', lazy=True, kind='config')
preset_count = FCpt(EpicsSignal, '{self.prefix}.R{self._ch}P', lazy=True, kind='config')
is_preset = FCpt(EpicsSignal, '{self.prefix}.R{self._ch}IP', lazy=True, kind='config')
bkgnd_chans = FCpt(EpicsSignal, '{self.prefix}.R{self._ch}BG', lazy=True, kind='config')
# hi_chan = FCpt(EpicsSignal, '{self.prefix}.R{self._ch}HI', lazy=True, kind='config')
# lo_chan = FCpt(EpicsSignal, '{self.prefix}.R{self._ch}LO', lazy=True, kind='config')
# hi_en = FCpt(EpicsSignal, '{self.prefix}:R{self._ch}HIENERGY', lazy=True, kind='config')
# lo_en = FCpt(EpicsSignal, '{self.prefix}:R{self._ch}LOENERGY', lazy =True, kind='config')
def __init__(self, prefix,ch, *, read_attrs=None, configuration_attrs=None,
name=None, parent=None, **kwargs):
super().__init__(prefix, read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
name=name, parent=parent, **kwargs)
self._ch = ch
self.hide(verbose=False)
def display(self, verbose=True):
# Energy Filter PVs
peaking_time = Cpt(EpicsSignal, "PeakingTime")
energy_threshold = Cpt(EpicsSignal, "EnergyThreshold")
gap_time = Cpt(EpicsSignal, "GapTime")
# Baseline PVs
baseline_cut_percent = Cpt(EpicsSignal, "BaselineCutPercent")
baseline_cut_enable = Cpt(EpicsSignal, "BaselineCutEnable")
baseline_filter_length = Cpt(EpicsSignal, "BaselineFilterLength")
baseline_threshold = Cpt(EpicsSignal, "BaselineThreshold")
baseline_energy_array = Cpt(EpicsSignal, "BaselineEnergyArray")
baseline_histogram = Cpt(EpicsSignal, "BaselineHistogram")
baseline_threshold = Cpt(EpicsSignal, "BaselineThreshold")
# Misc PVs
self.count.kind="hinted"
if verbose:
dev_name = self.name
dev_name = dev_name.replace("_", ".")
print(f"{dev_name} will be plotted")
preamp_gain = Cpt(EpicsSignal, "PreampGain")
detector_polarity = Cpt(EpicsSignal, "DetectorPolarity")
reset_delay = Cpt(EpicsSignal, "ResetDelay")
decay_time = Cpt(EpicsSignal, "DecayTime")
max_energy = Cpt(EpicsSignal, "MaxEnergy")
adc_percent_rule = Cpt(EpicsSignal, "ADCPercentRule")
max_width = Cpt(EpicsSignal, "MaxWidth")
# read-only diagnostics
triggers = Cpt(EpicsSignalRO, "Triggers", lazy=True)
events = Cpt(EpicsSignalRO, "Events", lazy=True)
overflows = Cpt(EpicsSignalRO, "Overflows", lazy=True)
underflows = Cpt(EpicsSignalRO, "Underflows", lazy=True)
input_count_rate = Cpt(EpicsSignalRO, "InputCountRate", lazy=True)
output_count_rate = Cpt(EpicsSignalRO, "OutputCountRate", lazy=True)
mca_bin_width = Cpt(EpicsSignalRO, "MCABinWidth_RBV")
calibration_energy = Cpt(EpicsSignalRO, "CalibrationEnergy_RBV")
current_pixel = Cpt(EpicsSignal, "CurrentPixel")
dynamic_range = Cpt(EpicsSignalRO, "DynamicRange_RBV")
def hide(self, verbose=True):
# Preset options
preset_events = Cpt(EpicsSignal, "PresetEvents")
# preset_mode = Cpt(EpicsSignal, "PresetMode", string=True)
# preset_triggers = Cpt(EpicsSignal, "PresetTriggers")
# Trace options
trace_data = Cpt(EpicsSignal, "TraceData")
trace_mode = Cpt(EpicsSignal, "TraceMode", string=True)
trace_time_array = Cpt(EpicsSignal, "TraceTimeArray")
trace_time = Cpt(EpicsSignal, "TraceTime")
self.count.kind="normal"
if verbose:
dev_name = self.name
dev_name = dev_name.replace("_", ".")
print(f"{dev_name} will not be plotted")
class xMapDXP(EpicsDXP):
pass
class mca(EpicsMCARecord, ROI):
pass
class xMap(EpicsDXPMultiElementSystem):
"""DXP xMap with 1 channel example"""
class Channel(Device):
# ROIS
roi0 =Cpt(ROI, '', ch=0, kind='normal')
roi1 =Cpt(ROI, '', ch=1, kind='normal')
roi2 =Cpt(ROI, '', ch=2, kind='normal')
roi3 =Cpt(ROI, '', ch=3, kind='normal')
roi4 =Cpt(ROI, '', ch=4, kind='normal')
roi5 =Cpt(ROI, '', ch=5, kind='normal')
roi6 =Cpt(ROI, '', ch=6, kind='normal')
roi7 =Cpt(ROI, '', ch=7, kind='normal')
#calibration
offset = Cpt(EpicsSignalRO, '.CALO',kind='config')
slope = Cpt(EpicsSignalRO, '.CALS',kind='config')
quadratic = Cpt(EpicsSignalRO, '.CALQ',kind='config')
egu = Cpt(EpicsSignalRO, '.EGU',kind='config')
two_theta = Cpt(EpicsSignalRO, '.TTH',kind='config')
class MyEpicsMCA(Device):
# Aquisition
start_all = Cpt(EpicsSignal, "StartAll")
stop_all = Cpt(EpicsSignal, "StopAll")
erase_start = Cpt(EpicsSignal, "EraseStart")
erase = Cpt(EpicsSignal, "EraseAll")
acquiring = Cpt(EpicsSignal, "Acquiring")
done_value = 0
# Preset info
preset_events = Cpt(EpicsSignal, "PresetEvents")
preset_live_time = Cpt(EpicsSignal, "PresetLive")
preset_real_time = Cpt(EpicsSignal, "PresetReal")
preset_mode = Cpt(EpicsSignal, "PresetMode", string=True)
preset_triggers = Cpt(EpicsSignal, "PresetTriggers")
# channels
ch1 = Cpt(Channel, "mca1")
ch2 = Cpt(Channel, "mca2")
ch3 = Cpt(Channel, "mca3")
dxp1 = Cpt(xMapDXP, "dxp1:")
mca1 = Cpt(mca, "mca1")
dxp2 = Cpt(xMapDXP, "dxp2:")
mca2 = Cpt(mca, "mca2")
dxp3 = Cpt(xMapDXP, "dxp3:")
mca3 = Cpt(mca, "mca3")
aquiring_time = 4
def stage(self):
self.old_aquiring_time=self.preset_real_time.get()
self.preset_real_time.set(self.aquiring_time)
def unstage(self):
self.preset_real_time.set(1)
self.preset_real_time.set(self.old_aquiring_time)
def trigger(self):
callback_signal = self.acquiring
#variable used as an event flag
acquisition_status = False
def acquisition_started(status):
nonlocal acquisition_status #Define as nonlocal as we want to modify it
acquisition_status = True
def check_value(*, old_value, value, **kwargs):
#Return True when the acquisition is complete, False otherwise.
if not acquisition_status: #But only report done if acquisition was already started
return False
return (value == self.done_value)
# create the status with SubscriptionStatus that add's a callback to check_value.
sta_cnt = SubscriptionStatus(callback_signal, check_value, run=False)
# Start the acquisition
sta_acq = self.erase_start.set(1)
......@@ -224,4 +130,4 @@ class xMap(EpicsDXPMultiElementSystem):
return stat
\ No newline at end of file
from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
from ophyd import Component as Cpt
from ophyd import PseudoPositioner, PseudoSingle
from ophyd.pseudopos import pseudo_position_argument, real_position_argument
import numpy as np
from ophyd.mca import EpicsMCARecord, ROI, EpicsDXPLowLevel, EpicsDXPBaseSystem#, EpicsDXPMultiElementSystem
from ophyd.areadetector import EpicsSignal as EpicsSignal
from ophyd.status import SubscriptionStatus, MoveStatus, AndStatus
class EpicsDXPMultiElementSystem(EpicsDXPBaseSystem):
# Preset info
preset_events = Cpt(EpicsSignal, "PresetEvents")
preset_live_time = Cpt(EpicsSignal, "PresetLive")
preset_real_time = Cpt(EpicsSignal, "PresetReal")
preset_mode = Cpt(EpicsSignal, "PresetMode", string=True)
preset_triggers = Cpt(EpicsSignal, "PresetTriggers")
# Acquisition
erase_all = Cpt(EpicsSignal, "EraseAll")
erase_start = Cpt(EpicsSignal, "EraseStart", trigger_value=1)
start_all = Cpt(EpicsSignal, "StartAll")
stop_all = Cpt(EpicsSignal, "StopAll")
# Status
set_acquire_busy = Cpt(EpicsSignal, "SetAcquireBusy")
acquire_busy = Cpt(EpicsSignal, "AcquireBusy")
status_all = Cpt(EpicsSignal, "StatusAll")
status_all_once = Cpt(EpicsSignal, "StatusAllOnce")
acquiring = Cpt(EpicsSignal, "Acquiring")
done_value = 0
# Reading
read_baseline_histograms = Cpt(EpicsSignal, "ReadBaselineHistograms")
read_all = Cpt(EpicsSignal, "ReadAll")
read_all_once = Cpt(EpicsSignal, "ReadAllOnce")
# As a debugging note, if snl_connected is not '1', your IOC is
# misconfigured:
snl_connected = Cpt(EpicsSignal, "SNL_Connected")
# Copying to individual elements
copy_adcp_ercent_rule = Cpt(EpicsSignal, "CopyADCPercentRule")
copy_baseline_cut_enable = Cpt(EpicsSignal, "CopyBaselineCutEnable")
copy_baseline_cut_percent = Cpt(EpicsSignal, "CopyBaselineCutPercent")
copy_baseline_filter_length = Cpt(EpicsSignal, "CopyBaselineFilterLength")
copy_baseline_threshold = Cpt(EpicsSignal, "CopyBaselineThreshold")
copy_decay_time = Cpt(EpicsSignal, "CopyDecayTime")
copy_detector_polarity = Cpt(EpicsSignal, "CopyDetectorPolarity")
copy_energy_threshold = Cpt(EpicsSignal, "CopyEnergyThreshold")
copy_gap_time = Cpt(EpicsSignal, "CopyGapTime")
copy_max_energy = Cpt(EpicsSignal, "CopyMaxEnergy")
copy_max_width = Cpt(EpicsSignal, "CopyMaxWidth")
copy_peaking_time = Cpt(EpicsSignal, "CopyPeakingTime")
copy_preamp_gain = Cpt(EpicsSignal, "CopyPreampGain")
copy_roic_hannel = Cpt(EpicsSignal, "CopyROIChannel")
copy_roie_nergy = Cpt(EpicsSignal, "CopyROIEnergy")
copy_roi_sca = Cpt(EpicsSignal, "CopyROI_SCA")
copy_reset_delay = Cpt(EpicsSignal, "CopyResetDelay")
copy_trigger_gap_time = Cpt(EpicsSignal, "CopyTriggerGapTime")
copy_trigger_peaking_time = Cpt(EpicsSignal, "CopyTriggerPeakingTime")
copy_trigger_threshold = Cpt(EpicsSignal, "CopyTriggerThreshold")
# do_* executes the process:
do_read_all = Cpt(EpicsSignal, "DoReadAll")
do_read_baseline_histograms = Cpt(EpicsSignal, "DoReadBaselineHistograms")
do_read_traces = Cpt(EpicsSignal, "DoReadTraces")
do_status_all = Cpt(EpicsSignal, "DoStatusAll")
# Time
dead_time = Cpt(EpicsSignal, "DeadTime")
elapsed_live = Cpt(EpicsSignal, "ElapsedLive")
elapsed_real = Cpt(EpicsSignal, "ElapsedReal")
idead_time = Cpt(EpicsSignal, "IDeadTime")
# low-level
read_low_level_params = Cpt(EpicsSignal, "ReadLLParams")
# Traces
read_traces = Cpt(EpicsSignal, "ReadTraces")
trace_modes = Cpt(EpicsSignal, "TraceModes", string=True)
trace_times = Cpt(EpicsSignal, "TraceTimes")
class EpicsDXP(Device):
"""All high-level DXP parameters for each channel"""
preset_mode = Cpt(EpicsSignal, "PresetMode", string=True)
# live_time_output = Cpt(EpicsSignal, "LiveTimeOutput", string=True)
# elapsed_live_time = Cpt(EpicsSignal, "ElapsedLive")
# elapsed_real_time = Cpt(EpicsSignal, "ElapsedReal")
# elapsed_trigger_live_time = Cpt(EpicsSignal, "ElapsedTriggerLiveTime")
# Trigger Filter PVs
trigger_peaking_time = Cpt(EpicsSignal, "TriggerPeakingTime")
trigger_threshold = Cpt(EpicsSignal, "TriggerThreshold")
trigger_gap_time = Cpt(EpicsSignal, "TriggerGapTime")
# trigger_output = Cpt(EpicsSignal, "TriggerOutput", string=True)
max_width = Cpt(EpicsSignal, "MaxWidth")
# Energy Filter PVs
peaking_time = Cpt(EpicsSignal, "PeakingTime")
energy_threshold = Cpt(EpicsSignal, "EnergyThreshold")
gap_time = Cpt(EpicsSignal, "GapTime")
# Baseline PVs
baseline_cut_percent = Cpt(EpicsSignal, "BaselineCutPercent")
baseline_cut_enable = Cpt(EpicsSignal, "BaselineCutEnable")
baseline_filter_length = Cpt(EpicsSignal, "BaselineFilterLength")
baseline_threshold = Cpt(EpicsSignal, "BaselineThreshold")
baseline_energy_array = Cpt(EpicsSignal, "BaselineEnergyArray")
baseline_histogram = Cpt(EpicsSignal, "BaselineHistogram")
baseline_threshold = Cpt(EpicsSignal, "BaselineThreshold")
# Misc PVs
preamp_gain = Cpt(EpicsSignal, "PreampGain")
detector_polarity = Cpt(EpicsSignal, "DetectorPolarity")
reset_delay = Cpt(EpicsSignal, "ResetDelay")
decay_time = Cpt(EpicsSignal, "DecayTime")
max_energy = Cpt(EpicsSignal, "MaxEnergy")
adc_percent_rule = Cpt(EpicsSignal, "ADCPercentRule")
max_width = Cpt(EpicsSignal, "MaxWidth")
# read-only diagnostics
triggers = Cpt(EpicsSignalRO, "Triggers", lazy=True)
events = Cpt(EpicsSignalRO, "Events", lazy=True)
overflows = Cpt(EpicsSignalRO, "Overflows", lazy=True)
underflows = Cpt(EpicsSignalRO, "Underflows", lazy=True)
input_count_rate = Cpt(EpicsSignalRO, "InputCountRate", lazy=True)
output_count_rate = Cpt(EpicsSignalRO, "OutputCountRate", lazy=True)
mca_bin_width = Cpt(EpicsSignalRO, "MCABinWidth_RBV")
calibration_energy = Cpt(EpicsSignalRO, "CalibrationEnergy_RBV")
current_pixel = Cpt(EpicsSignal, "CurrentPixel")
dynamic_range = Cpt(EpicsSignalRO, "DynamicRange_RBV")
# Preset options
preset_events = Cpt(EpicsSignal, "PresetEvents")
# preset_mode = Cpt(EpicsSignal, "PresetMode", string=True)
# preset_triggers = Cpt(EpicsSignal, "PresetTriggers")
# Trace options
trace_data = Cpt(EpicsSignal, "TraceData")
trace_mode = Cpt(EpicsSignal, "TraceMode", string=True)
trace_time_array = Cpt(EpicsSignal, "TraceTimeArray")
trace_time = Cpt(EpicsSignal, "TraceTime")
class xMapDXP(EpicsDXP):
pass
class mca(EpicsMCARecord, ROI):
pass
class xMap(EpicsDXPMultiElementSystem):
"""DXP xMap with 1 channel example"""
start_all = Cpt(EpicsSignal, "StartAll")
stop_all = Cpt(EpicsSignal, "StopAll")
erase_start = Cpt(EpicsSignal, "EraseStart")
erase = Cpt(EpicsSignal, "EraseAll")
acquiring = Cpt(EpicsSignal, "Acquiring")
done_value = 0
dxp1 = Cpt(xMapDXP, "dxp1:")
mca1 = Cpt(mca, "mca1")
dxp2 = Cpt(xMapDXP, "dxp2:")
mca2 = Cpt(mca, "mca2")
dxp3 = Cpt(xMapDXP, "dxp3:")
mca3 = Cpt(mca, "mca3")
aquiring_time = 4
def stage(self):
self.preset_real_time.set(self.aquiring_time)
def unstage(self):
self.preset_real_time.set(1)
def trigger(self):
self.old_aquiring_timegnal = self.acquiring
#variable used as an event flag
acquisition_status = False
def acquisition_started(status):
nonlocal acquisition_status #Define as nonlocal as we want to modify it
acquisition_status = True
def check_value(*, old_value, value, **kwargs):
#Return True when the acquisition is complete, False otherwise.
if not acquisition_status: #But only report done if acquisition was already started
return False
return (value == self.done_value)
# create the status with SubscriptionStatus that add's a callback to check_value.
sta_cnt = SubscriptionStatus(callback_signal, check_value, run=False)
# Start the acquisition
sta_acq = self.erase_start.set(1)
sta_acq.add_callback(acquisition_started)
stat = AndStatus(sta_cnt, sta_acq)
return stat
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment