Source code for caproto_sandbox.simple_daq.server

#!/usr/bin/env python3
"""
Simple IOC based on caproto library.
It has
"""
from caproto.server import pvproperty, PVGroup, ioc_arg_parser, run
from textwrap import dedent
from pdb import pm

from numpy import random, array, zeros, ndarray, nan, isnan
from time import time, sleep, ctime



[docs]class Server(PVGroup): """ An IOC with three uncoupled read/writable PVs Scalar PVs ---------- CPU MEMORY BATTERY Vectors PVs ----------- """ CPU = pvproperty(value=0.0, read_only = True, dtype = float, precision = 1, units = '%') MEMORY = pvproperty(value=0.0, read_only = True, dtype = float, precision = 1, units = 'GB') BATTERY = pvproperty(value=0.0, read_only = True, dtype = float, precision = 1, units = '%') TIME = pvproperty(value='time unknown', read_only = True, dtype = str) dt = pvproperty(value=1.0, precision = 3, units = 's') @CPU.startup async def CPU(self, instance, async_lib): """ This method will be called when the server starts up and first initialization of PV with name CPU. """ self.io_pull_queue = async_lib.ThreadsafeQueue() self.io_push_queue = async_lib.ThreadsafeQueue() self.device.io_push_queue = self.io_push_queue self.device.io_pull_queue = self.io_pull_queue # Loop and grab items from the response queue one at a time while True: io_dict = await self.io_push_queue.async_get() # Propagate the keypress to the EPICS PV, triggering any monitors # along the way for key in list(io_dict.keys()): if key == 'TIME': await self.TIME.write(ctime(io_dict[key])) elif key == 'CPU': await self.CPU.write(io_dict[key]) elif key == 'MEMORY': await self.MEMORY.write(io_dict[key]/1024/1024/1024) elif key == 'BATTERY': await self.BATTERY.write(io_dict[key]) elif key == 'dt': await self.dt.write(io_dict[key]) @dt.putter async def dt(self, instance, value): """ this function is called everytime the value of dt is changed in the server's database, whether it is changed externally or internally. """ print('Received update for the {}, sending new value {}'.format('dt',value)) self.device.dt = value return value
def run_server(name = 'simple_daq'): from caproto_sandbox.simple_daq.driver import Driver from caproto_sandbox.simple_daq.device import Device from caproto_sandbox.simple_daq.server import Server import sys print(sys.argv) sys.argv.append('--list-pvs') driver = Driver() device = Device(driver = driver) device.start() ioc_options, run_options = ioc_arg_parser( default_prefix=f'{name}:', desc=dedent(Server.__doc__)) server = Server(**ioc_options) # pass the device instance into the server instance for bidirectional communication server.device = device run(server.pvdb, **run_options) if __name__ == '__main__': from caproto_sandbox.simple_daq.driver import Driver from caproto_sandbox.simple_daq.device import Device import sys driver = Driver() device = Device(driver = driver) device.start() ioc_options, run_options = ioc_arg_parser( default_prefix='simple_daq:', desc=dedent(Server.__doc__)) server = Server(**ioc_options) # pass the device instance into the server instance for bidirectional communication server.device = device print(sys.argv) run(server.pvdb, **run_options)