#!/usr/bin/env python3
"""
Simple IOC based on caproto library.
It has
"""
from caproto.server import pvproperty, PVGroup, ioc_arg_parser, run
import caproto
from textwrap import dedent
from pdb import pm
from numpy import random, array, zeros, ndarray, nan, isnan
from time import time,sleep
from pickle import dumps, loads
from circular_buffer_numpy import circular_buffer
[docs]class Device(object):
def __init__(self, driver):
"""
initialization of the instance and creation of all other instances and variables
"""
from circular_buffer_numpy.circular_buffer import CircularBuffer
self.buffer = CircularBuffer(shape = (1000,4))
self.dt = 1
self.running = False
self.header = ['time','cpu','memory','battery']
self.io_push_queue = None
self.io_put_queue = None
self.driver = driver
self.threads = {}
[docs] def run_once(self):
"""
the single execution of a code that later is looped in while running=true loop
"""
#
arr = self.driver.read()
self.buffer.append(arr)
#push to IO for publishing
io_dict = {}
io_dict['TIME'] = arr[0,0]
io_dict['CPU'] = arr[0,1]
io_dict['MEMORY'] = arr[0,2]
io_dict['BATTERY'] = arr[0,3]
io_dict['LIST'] = [arr[0,0],arr[0,1],arr[0,2],arr[0,3]]
self.io_push(io_dict)
[docs] def run(self):
"""
while running = True loop that executes run_once() in a loop on a timer.
"""
from time import sleep, time
self.running = True
while self.running:
t1 = time()
self.run_once()
t2 = time()
dt = t2-t1
sleep(self.dt-dt)
[docs] def start(self):
"""
start the while running=True loop(function run()) in a separate thread.
"""
from ubcs_auxiliary.multithreading import new_thread
self.threads['running'] = new_thread(self.run)
[docs] def stop(self):
"""
stop the while running=True loop(function run()) in a separate thread.
"""
self.running = False
[docs] def set_dt(self,value):
"""
wrapper to set dT
"""
self.dt = value
[docs] def get_dt(self):
"""
wrapper to get dT
"""
return self.dt
[docs] def io_push(self,io_dict = None):
"""
wrapper to push the updates into CA server for further publishing on the network.
"""
if self.io_push_queue is not None:
self.io_push_queue.put(io_dict)
if __name__ == '__main__':
from caproto_sandbox.simple_daq.driver import Driver
driver = Driver()
device = Device(driver = driver)
device.start()