# __future__ imports must precede all other imports
from __future__ import with_statement

import argparse
import json
import logging
import os
import time
from datetime import datetime, timezone

import databroker
import h5py
import numpy as np
import pandas as pd
import yaml
from apstools.callbacks.callback_base import FileWriterCallbackBase
from apstools.callbacks.nexus_writer import NXWriter
from bluesky.callbacks.zmq import Proxy, RemoteDispatcher

from .resolve import *
logger = logging.getLogger("emiltools")
NEXUS_FILE_EXTENSION = "nxs" # use this file extension for the output
NEXUS_RELEASE = "v2020.1" # NeXus release to which this file is written
class CSVCallback(FileWriterCallbackBase):
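    """
    Write the scalar data of every document stream of a run to per-stream CSV
    files, plus the start-document metadata to a JSON file, and hand each
    written file to the configured receivers for remote export.

    EXAMPLE subscribe a RE (a sketch; receiver objects are assumed to provide
    ``collectMetaData(doc)`` and ``sent(fname)``)

        csv_writer = CSVCallback(file_path=os.getcwd())
        RE.subscribe(csv_writer.receiver)
    """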
    def __init__(self, _receivers=None, file_path=None):
        super().__init__()
        # avoid a mutable default argument; fall back to an empty receiver list
        self.receivers = _receivers if _receivers is not None else []
        self.file_path = file_path
    def start(self, doc):
        """
        Extend ``start`` from the parent class (callback_base.py) to keep the
        start document and pass it to the receivers.
        """
super().start(doc)
self.start_doc = doc
for receiver in self.receivers:
receiver.collectMetaData(doc)
def event(self, doc):
"""
a single "row" of data
"""
if not self.scanning:
return
# uid = doc["uid"]
descriptor_uid = doc["descriptor"]
# seq_num = doc["seq_num"]
# gather the data by streams
descriptor = self.acquisitions.get(descriptor_uid)
if descriptor is not None:
for k, v in doc["data"].items():
data = descriptor["data"].get(k)
if data is None:
                    logger.warning(
                        f"entry key {k} not found in"
                        f" descriptor of {descriptor['stream']}"
                    )
                else:
                    data["data"].append(v)
                    # take the event time rather than the time the individual item was returned
                    data["time"].append(doc["time"])
def make_file_name(self):
"""
generate a file name to be used
"""
date_string = datetime.now().strftime('%Y_%m_%d')
fname = f"{self.scan_id:05d}"
fname += f"_{self.uid[:7]}"
        path = os.path.abspath(self.file_path or os.getcwd())
        path = os.path.join(path, date_string, 'csv')
os.makedirs(path, exist_ok=True)
return os.path.join(path, fname)
    def writer(self):
        """
        write the collected data: one CSV file per stream, plus a JSON file
        with the start-document metadata
        """
        for stream_name, stream_data in sorted(self.streams.items()):
            export_d = {}
            for data_key, data in self.acquisitions[stream_data[0]]['data'].items():
                # only export scalar (zero-dimensional), internally stored data to csv
                if data['shape'] == [] and not data['external']:
                    export_d[data_key] = data['data']
                    export_d['time'] = data['time']
            export_df = pd.DataFrame(data=export_d)
            if 'time' in export_df:
                # move the time column to the front
                export_df = export_df[['time'] + [col for col in export_df.columns if col != 'time']]
            # save the dataframe of this stream (like biologic, baseline, primary) as a csv file
            fname = (self.file_name or self.make_file_name()) + "_" + str(stream_name) + ".csv"
            try:
                export_df.to_csv(fname)
                # then export each file to the requested remote locations
                for receiver in self.receivers:
                    receiver.sent(fname)
            except Exception as e:
                logger.error(f"CSV Export: error exporting {fname}: {e}")
        # write the metadata to a json file and export that as well
        fname = (self.file_name or self.make_file_name()) + "_meta.json"
        try:
            # serialize inside the try block: start documents can contain
            # values that are not JSON serializable
            json_object = json.dumps(self.start_doc, indent=4)
            with open(fname, "w") as outfile:
                outfile.write(json_object)
            # then export each file to the requested remote locations
            for receiver in self.receivers:
                receiver.sent(fname)
        except Exception as e:
            logger.error(f"CSV Export: error exporting {fname}: {e}")
def clear(self):
super().clear()
self.start_doc = {}
class NXWriterBESSY(NXWriter):
"""
BESSYII X-Ray Source Specific class for writing HDF5/NeXus file (using APS NXWriter class).
Conforms to NXArchiver Standard.
Methods of NXWriter are overwritten to make things specific to BESSY
One scan is written to one HDF5/NeXus file at location specified by self.file_path with name
given by self.make_file_name
EXAMPLE write a run from a catalog
nxwriter = NXWriter()
nxwriter.file_path = os.getcwd() # This is also the default
nxwriter.instrument_name = 'SISSY1'
run = db[-1]
nxwriter.export_run(run)
EXAMPLE subscribe a RE
RE.subscribe(nxwriter.receiver)
METHODS
.. autosummary::
~export_run
~make_file_name
~write_entry
~write_instrument
~write_sample
~write_source
~write_user
"""
# convention: methods written in alphabetical order
    def __init__(self, _receivers=None, file_path=None):
        super().__init__()
        # avoid a mutable default argument; fall back to an empty receiver list
        self.receivers = _receivers if _receivers is not None else []
        self.file_path = file_path
    def export_run(self, run):
"""
export a single run from a catalog to a single file
"""
if isinstance(run, databroker.core.BlueskyRun):
h = run.catalog_object.v1[run.name] # header
for key,doc in h.db.get_documents(h):
self.receiver(key, doc)
        else:
            raise ValueError(f'{run} is not an instance of databroker.core.BlueskyRun')
def make_file_name(self):
"""
generate a file name to be used
"""
date_string = datetime.now().strftime('%Y_%m_%d')
fname = f"{self.scan_id:05d}"
fname += f"_{self.uid[:7]}.{self.file_extension}"
        path = os.path.abspath(self.file_path or os.getcwd())
        path = os.path.join(path, date_string, 'nx')
os.makedirs(path, exist_ok=True)
return os.path.join(path, fname)
    def start(self, doc):
        """
        Extend ``start`` from the parent class (callback_base.py) to pass the
        start document to the receivers.
        """
super().start(doc)
for receiver in self.receivers:
receiver.collectMetaData(doc)
def writer(self):
"""
write collected data to HDF5/NeXus data file
"""
fname = self.file_name or self.make_file_name()
with h5py.File(fname, "w") as self.root:
self.write_root(fname)
self.root = None
logger.info(f"wrote NeXus file: {fname}") # lgtm [py/clear-text-logging-sensitive-data]
self.output_nexus_file = fname
for receiver in self.receivers:
receiver.sent(fname)
def write_entry(self):
"""
group: /entry/data:NXentry
"""
nxentry = self.create_NX_group(
self.root, self.root.attrs["default"] + ":NXentry"
)
nxentry.create_dataset(
"start_time",
data=datetime.fromtimestamp(
self.start_time
).isoformat(),
)
nxentry.create_dataset(
"end_time",
data=datetime.fromtimestamp(
self.stop_time
).isoformat(),
)
ds = nxentry.create_dataset(
"duration", data=self.stop_time - self.start_time
)
ds.attrs["units"] = "s"
nxentry.create_dataset("program_name", data="bluesky")
self.write_instrument(nxentry) # also writes streams and metadata
try:
nxdata = self.write_data(nxentry)
nxentry.attrs["default"] = nxdata.name.split("/")[-1]
except KeyError as exc:
logger.warning(exc)
self.write_sample(nxentry)
self.write_user(nxentry)
# apply links
#h5_addr = "/entry/instrument/source/cycle"
#if h5_addr in self.root:
# nxentry["run_cycle"] = self.root[h5_addr]
#else:
# logger.warning("No data for /entry/run_cycle")
nxentry["title"] = self.get_sample_title()
nxentry["plan_name"] = self.root[
"/entry/instrument/bluesky/metadata/plan_name"
]
nxentry["entry_identifier"] = self.root[
"/entry/instrument/bluesky/uid"
]
return nxentry
def write_metadata(self, parent):
"""
group: /entry/instrument/bluesky/metadata:NXnote
metadata from the bluesky start document
"""
group = self.create_NX_group(parent, "metadata:NXnote")
ds = group.create_dataset("run_start_uid", data=self.uid)
ds.attrs["long_name"] = "bluesky run uid"
ds.attrs["target"] = ds.name
for k, v in self.metadata.items():
is_yaml = False
if isinstance(v, (dict, tuple, list)):
# fallback technique: save complicated structures as YAML text
v = yaml.dump(v)
is_yaml = True
if isinstance(v, str):
v = self.h5string(v)
                if v is None:
                    v = self.h5string('None')
ds = group.create_dataset(k, data=v)
ds.attrs["target"] = ds.name
if is_yaml:
ds.attrs["text_format"] = "yaml"
return group
def write_instrument(self, parent):
"""
group: /entry/instrument:NXinstrument
"""
nxinstrument = self.create_NX_group(
parent, "instrument:NXinstrument"
)
bluesky_group = self.create_NX_group(
nxinstrument, "bluesky:NXnote"
)
md_group = self.write_metadata(bluesky_group)
self.write_streams(bluesky_group)
bluesky_group["uid"] = md_group["run_start_uid"]
bluesky_group["plan_name"] = md_group["plan_name"]
try:
self.assign_signal_type()
except KeyError as exc:
logger.warning(exc)
self.write_slits(nxinstrument)
try:
self.write_detector(nxinstrument)
except KeyError as exc:
logger.warning(exc)
#self.write_monochromator(nxinstrument)
try:
self.write_positioner(nxinstrument)
except KeyError as exc:
logger.warning(exc)
self.write_source(nxinstrument)
return nxinstrument
def write_sample(self, parent):
"""
group: /entry/sample:NXsample
"""
sample = None
try:
sample = self.metadata['sample']
except KeyError as exc:
logger.warning("no %s defined -- not creating sample group", str(exc))
return
nxsample = self.create_NX_group(parent, "sample:NXsample")
if isinstance(sample, dict):
for k, v in sample.items():
nxsample[k] = v
elif isinstance(sample, str):
nxsample.create_dataset("name", data=sample)
return nxsample
def write_source(self, parent):
"""
group: /entry/instrument/source:NXsource
"""
nxsource = self.create_NX_group(parent, "source:NXsource")
        nxsource.create_dataset("name", data="BESSY II")
nxsource.create_dataset("type", data="Synchrotron X-Ray Source")
nxsource.create_dataset("probe", data="x-ray")
return nxsource
def write_stream_external(
self, parent, d, subgroup, stream_name, k, v
):
"""
Must be called in a proxy since areaDetector file writer in stream mode
does not close and write the file until after the plan has finished
"""
# TODO: rabbit-hole alert! simplify
# lots of variations possible
# count number of unique resources (expect only 1)
resource_id_list = []
for datum_id in d:
resource_id = self.externals[datum_id]["resource"]
if resource_id not in resource_id_list:
resource_id_list.append(resource_id)
if len(resource_id_list) != 1:
raise ValueError(
f"{len(resource_id_list)}"
f" unique resource UIDs: {resource_id_list}"
)
fname = self.getResourceFile(resource_id)
logger.info("reading %s from EPICS AD data file: %s", k, fname)
        num_retries = 10
        attempt_no = 1
        success = False
        time.sleep(1)
        while attempt_no <= num_retries and not success:
            try:
                with h5py.File(fname, "r") as hdf_image_file_root:
                    h5_obj = hdf_image_file_root["/entry/data/data"]
                    ds = subgroup.create_dataset(
                        "value",
                        data=h5_obj[()],
                        compression="lzf",
                        # compression="gzip",
                        # compression_opts=9,
                        shuffle=True,
                        fletcher32=True,
                    )
                    ds.attrs["target"] = ds.name
                    ds.attrs["source_file"] = fname
                    ds.attrs["source_address"] = h5_obj.name
                    ds.attrs["resource_id"] = resource_id
                    ds.attrs["units"] = ""
                logger.info(f"successfully wrote {k} from {fname}")
                success = True
                subgroup.attrs["signal"] = "value"
            except BlockingIOError as error:
                if attempt_no < num_retries:
                    logger.warning(
                        f"could not open cam image file on attempt {attempt_no}, waiting"
                    )
                    time.sleep(3)
                    # count the attempt (the original decremented here, which
                    # made the loop retry forever)
                    attempt_no += 1
                else:
                    logger.warning(
                        f"failed to open cam image file after {attempt_no} attempts,"
                        " likely because other callbacks took ages"
                    )
                    raise error
def write_user(self, parent):
"""
group: /entry/contact:NXuser
"""
keymap = dict(
name='user_name',
facility_user_id='user_profile'
)
try:
links = {k: self.metadata[v] for k, v in keymap.items()}
except KeyError as exc:
logger.warning("no %s defined -- not creating user group", str(exc))
return
nxuser = self.create_NX_group(parent, "user:NXuser")
nxuser.create_dataset("role", data="contact")
for k, v in links.items():
nxuser[k] = v
return nxuser
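# Hypothetical entry point (a sketch, not part of the original module): run the
# writers in their own process, subscribed to documents republished by a
# bluesky 0MQ proxy. Host/port values below are assumptions, not project
# defaults; the imported ``Proxy`` would run in yet another process.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Subscribe the BESSY file writers to a bluesky 0MQ proxy"
    )
    parser.add_argument("--host", default="localhost")  # assumed default
    parser.add_argument("--port", type=int, default=5578)  # assumed default
    parser.add_argument("--file-path", default=os.getcwd())
    args = parser.parse_args()

    dispatcher = RemoteDispatcher(f"{args.host}:{args.port}")
    nxwriter = NXWriterBESSY(file_path=args.file_path)
    csv_writer = CSVCallback(file_path=args.file_path)
    dispatcher.subscribe(nxwriter.receiver)
    dispatcher.subscribe(csv_writer.receiver)
    dispatcher.start()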