#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Cavro Centris Syringe pump device
author: Valentyn Stadnytskyi
Created: May 28 2019
Last modified: May 28 2019
"""
__version__ = '0.0.0'
from syringe_pump.driver import Driver
import traceback
from pdb import pm
from numpy import nan, mean, std, nanstd, asfarray, asarray, hstack, array, concatenate, delete, round, vstack, hstack, zeros, transpose, split, unique, nonzero, take, savetxt, min, max
from time import time, sleep, clock
import sys
import struct
from pdb import pm
from time import gmtime, strftime, time
from logging import debug,info,warning,error
from struct import pack, unpack
from timeit import Timer, timeit
[docs]class Device(object):
def __init__(self):
#Thread.__init__(self)
self.running = False
#self.daemon = False # OK for main thread to exit even if instance is still running
self.description = ''
#circular buffers dictionary contains information about all circular buffers and their type (Server, Client or Queue)
self.circular_buffers = {}
self.command_queue = []
self.position = 0.0
self.velocity = 0.0
self.speed = 0.0
self.flow_speed_high_limit = 5.0
self.dpos = 0.002
self.low_level_limit_alarm = 5.0
self.low_level_limit_warning = 10.0
self.running = 0
self.scan_period = 0.001
self.default_scan_period = 1.0
self.io_put_queue = None
self.io_get_queue = None
# ############################################################################
# Basic IOC operations
# ############################################################################
[docs] def first_time_setup(self):
"""default factory setting or first time setup"""
raise NotImplementedError
[docs] def init(self, pump_id = None, speed = None, backlash = None, orientation = None, volume = None):
"""
initialize the device level code
- initializes the driver
- sets default valve position to 'o'
- sets initial position to 0.0
- sets initial speed to input speed value
Parameters
----------
pump_id: integer
pump_id
speed: float
initial speed of the syringe pump, default is 25
backlash: float
the backlash of the syringe pump. The default value is 100
orientation: string
the orientation of the syringe pump valve: Y or Z
volume: float
the volume of the installed syringe
Returns
-------
Examples
--------
>>> device.init(pump_id = 1, speed = 25, backlash = 100,orientation = 'Y', volume = 250)
"""
from threading import RLock
self.lock = RLock()
from circular_buffer_numpy.circular_buffer import CircularBuffer
self.buffers = {}
self.buffers['position'] = CircularBuffer(shape = (1*3600*2,2), dtype = 'float64')
from syringe_pump.driver import Driver
self.pump_id = pump_id
self.driver = Driver()
self.name = 'NIH_syringe_pump_'+ str(pump_id)
self.prefix = 'NIH:SYRINGE' + str(pump_id)
if pump_id is not None:
self.driver.init(pump_id, speed = speed, backlash = backlash, orientation = orientation, volume = volume)
self.speed = speed
self.cmd_position = 0.0
self.set_valve(b'o')
self.set_speed(speed)
[docs] def abort(self):
"""
orderly abort of the current operation
Parameters
----------
Returns
-------
Examples
--------
>>> device.abort()
"""
debug('device.abort')
reply = self.driver.abort()
self.process_driver_reply(reply)
t1 = time()
flag = True
while self.get_busy():
sleep(0.1)
if time() - t1 > 10.0:
flag = False
break
[docs] def close(self):
"""
orderly close of the serial port and shutdown
Parameters
----------
Returns
-------
Examples
--------
>>> device.close()
"""
self.stop()
self.abort()
#self.cleanup()
self.driver.close()
[docs] def kill(self):
"""
orderly close of the serial port and shutdown of the device and deletion of the instance
Parameters
----------
Returns
-------
Examples
--------
>>> device.kill()
"""
self.close()
del self
[docs] def help(self):
"""returns help information in a string format.
Parameters
----------
Returns
-------
Examples
--------
>>> device.run_once()
"""
raise NotImplementedError
[docs] def snapshot(self):
"""returns snapshot of current PVs and their values in a dictionary format
Parameters
----------
Returns
-------
Examples
--------
>>> device.run_once()
"""
raise NotImplementedError
[docs] def start(self):
"""
starts run() in a separate thread
Parameters
----------
Returns
-------
Examples
--------
>>> device.run_once()
"""
from ubcs_auxiliary.threading import new_thread
new_thread(self.run)
[docs] def stop(self):
"""
stop run() in the separate thread
Parameters
----------
Returns
-------
Examples
--------
>>> device.run_once()
"""
self.running = False
self.iowrite(".RUNNING",value = self.running)
[docs] def run_once(self):
"""
run once
Parameters
----------
Returns
-------
Examples
--------
>>> device.run_once()
"""
debug('run_once')
self.position = self.get_position()
if not self.busy:
self.scan_period = self.default_scan_period
self.iowrite(pv_name = "DMOV",value = self.isdonemoving())
self.iowrite(pv_name = "RBV",value = self.position)
def run(self):
""""""
self.running = True
self.iowrite("RUNNING",value = self.running)
while self.running:
t = time()
self.run_once()
while time() - t < self.scan_period:
sleep(0.1)
self.running = False
self.iowrite("RUNNING",value = self.running)
def get_busy(self):
reply = self.driver.busy()
value = self.process_driver_reply(reply)
if value is not None:
return bool(int(self.process_driver_reply(reply)))
else:
return None
[docs] def monitor(self,PV_name,value,char_value):
"""Process PV change requests"""
info("monitor: %s = %r" % (PV_name,value))
if PV_name == self.prefix + "VAL":
self.set_cmd_position(value)
if PV_name == self.prefix + "VELO":
self.set_speed_on_the_fly(value)
if PV_name == self.prefix + "VALVE":
self.set_valve(value)
[docs] def command_monitor(self,PV_name,value,char_value):
"""Process PV change requests"""
from pickle import loads
info("command_monitor: %s = %r" % (PV_name,value))
cmd_dict = {'abort':[],
'fill':[],
'empty':[],
'prime':['N'],
'flow':['position','speed']
}
if PV_name == self.prefix+"CMD":
cmd_in = loads(value)
try:
if cmd_in.keys()[0] == 'abort':
self.abort()
if cmd_in.keys()[0] == 'fill':
self.fill()
if cmd_in.keys()[0] == 'empty':
self.empty()
if cmd_in.keys()[0] == 'prime':
self.prime(N = cmd_in['prime']['N'])
if cmd_in.keys()[0] == 'flow':
position = cmd_in['flow']['position']
speed = cmd_in['flow']['speed']
self.flow(position = position, speed = speed)
except:
error(traceback.format_exc())
# input-output(io) controller wrappers
[docs] def iowrite(self, pv_name = None, value = None):
"""
put dictionary of key:value pairs to IO.
Parameters
----------
pv_name: (string)
string name of the PV
value: (string,list,integer,float,array)
the new value to be submitted to io for processing.
Returns
-------
Examples
--------
>>> device.ioput(pv_name = '.running',value = True)
"""
if self.io_put_queue is not None:
debug(f"iowrite got request {pv_name},{value}")
self.io_put_queue.put({pv_name: value})
else:
debug(f"no IO is linked to the device. Couldn't process {pv_name}")
[docs] def ioread(self, pv_name = None, value = None):
"""
put dictionary of key:value pairs to IO.
Parameters
----------
pv_dict: (dictionary)
dictionary of PVs and new PV values
Returns
-------
Examples
--------
>>> device.ioput(pv_name = '.running',value = False)
"""
raise NotImplementedError
[docs] def ioexecute(self, pv_name = None, value = None, **kwargs):
"""
executes command arrived from IO
Parameters
----------
pv_name: (string)
string name of the PV
value: (string,list,integer,float,array)
the new value to be submitted to io for processing.
Returns
-------
Examples
--------
>>> device.ioexecute(pv_name = '.running',value = False)
"""
print(f"ioexecute received: {pv_name},{value}")
if pv_name == 'VAL':
print(value,type(value))
if type(value) == float:
self.move_abs(value)
else:
warning(f'the input value {pv_name} for PV {value} is not float')
if pv_name == 'VELO':
print(value,type(value))
if type(value) == float:
self.set_speed_on_the_fly(value)
else:
warning(f'the input value {pv_name} for PV {value} is not float')
if pv_name == 'VALVE':
print(f'{value},{type(value)}')
value = value.lower()
if value == 'o' or value == 'i' or value == 'b':
self.set_valve(value)
else:
warning(f'the input value {pv_name} for PV {value} is not float')
if pv_name == 'CMD':
print(f'{value},{type(value)}')
####################################################################################################
### device specific functions
####################################################################################################
def home(self):
with self.lock:
self.set_status('homing...')
self.driver.home()
self.iowrite(".cmd_HOME",value = 0)
self.iowrite(".VELO",value = self.speed)
self.iowrite(".VAL",value = self.cmd_position)
self.set_status('homing complete')
[docs] def get_position(self):
"""
request position of the pump via driver
Parameters
----------
Returns
-------
value: float
current pump position
Examples
--------
>>> device.get_position()
0.0
"""
from numpy import nan, zeros
reply = self.driver._get_position()
value = self.process_driver_reply(reply)
if value is None:
value = nan
self.position = round(float(value),3)
arr = zeros((1,2))
arr[0,0] = time()
arr[0,1] = self.position
self.buffers['position'].append(arr)
return self.position
def set_position(self, value):
self.set_cmd_position(value)
[docs] def get_cmd_position(self):
"""
get commanded position of the pump
Parameters
----------
Returns
-------
value: float
last known commanded position
Examples
--------
>>> device.get_cmd_position()
0.0
"""
return self.cmd_position
[docs] def set_cmd_position(self,value):
"""
set commanded position of the pump
Parameters
----------
value: float
last known commanded position
Returns
-------
Examples
--------
>>> device.set_cmd_position(value = 25.0)
"""
reply = self.driver._set_position(value)
self.cmd_position = value
self.scan_period = 0.001
[docs] def get_speed(self):
"""
read speed from the pump
Parameters
----------
Returns
-------
speed: float
returns pump speed
Examples
--------
>>> device.get_speed()
25.0
"""
reply = self.driver.get_speed()
self.speed = self.process_driver_reply(reply)
return self.speed
[docs] def set_speed_on_the_fly(self,value):
"""
sets speed on the fly. speed values above 68.8 will be ignored.
Parameters
----------
speed: float
pump speed
Returns
-------
Examples
--------
>>> device.set_speed_on_the_fly(25.0)
"""
reply = self.driver._set_speed_on_the_fly(value)
debug(f'reply: {reply}')
temp = self.process_driver_reply(reply)
debug(f'set_speed_on_the_fly: {reply}, {temp}')
self.speed = value
debug(f'set_speed_on_the_fly: {self.speed}')
[docs] def set_speed(self,value):
"""
sets speed of the pump. no limits on the input value
Parameters
----------
speed: float
pump speed
Returns
-------
Examples
--------
>>> device.set_speed(25.0)
"""
reply = self.driver.set_speed(value)
temp = self.process_driver_reply(reply)
self.speed = value
def set_status(self, value = ''):
value = str(value)
self.iowrite('.STATUS', value)
self.status = value
[docs] def isdonemoving(self):
"""
return flag if motion is complete. It will compare the cmd_position and actual position
Parameters
----------
Returns
-------
flag: boolean
boolean if motor is moving or not
Examples
--------
>>> device.isdonemoving()
True
"""
flag = abs(self.cmd_position - self.position) < self.dpos
return flag
@property
def moving(self):
"""
an alias for busy
"""
response = self.busy
return response
def move_abs(self,position = 0, speed = None):
self.driver.abort()
if speed is None:
speed = self.speed
self.scan_period = 0.001
self.cmd_position = position
response = self.driver.move_abs(position = position, speed = speed)
return response
[docs] def get_valve(self):
"""
reads valve orientation from the pump
Parameters
----------
Returns
-------
valve: string
valve orientation as a string" 'i','o','b'
Examples
--------
>>> device.get_valve()
'i'
"""
reply = self.driver.valve
value = self.process_driver_reply(reply)
self.valve = value
return value
[docs] def set_valve(self,value):
"""
writes valve orientation into the pump
Parameters
----------
valve: string
valve orientation as a string" 'i','o','b'
Returns
-------
Examples
--------
>>> device.get_valve('i')
"""
self.driver.set_valve(value)
self.valve = value
[docs] def process_driver_reply(self,reply):
"""
waits for the syringe to finish the previous task. The while loop checks every dt for the status of the syringe pump.
Parameters
----------
dt: float
period in seconds at which the ump status is checked.
Returns
-------
Examples
--------
>>> reply = {'value': '', 'error_code': '`', 'busy': False, 'error': 'No Error'}
>>> device.process_driver_reply(reply)
"""
from time import time
debug('process_driver_reply')
if reply is not None:
self.busy = reply['busy']
self.error_code = reply['error_code']
self.error = reply['error']
self.last_reply_process = time()
value = reply['value']
if False:
self.iowrite("MOVN",value = self.moving)
self.iowrite("ERROR",value = self.error)
self.iowrite("ERROR_CODE",value = self.error_code)
elif False:
self.iowrite("ERROR",value = 'Communication Error')
self.iowrite("ERROR_CODE",value = '!')
else:
logging.warning('warning in process_driver_reply: reply = {}'.format(reply))
value = None
return value
[docs] def get_alarm(self):
"""
returns integer if alarm conditions are met
"""
if self.position <= self.low_level_limit_alarm:
string = 'current position {} below low level limit {}'.format(self.position, self.low_level_limit_alarm)
else:
string = ''
return string
[docs] def get_warning(self):
"""
returns integer if alarm conditions are met
"""
if self.position <= self.low_level_limit_warning:
flag = 'current position {} below low level limit {}'.format(self.position, self.low_level_limit_warning)
else:
flag = ''
return flag
#Compound actions
[docs] def wait(self, dt = 0.34):
"""
waits for the syringe to finish the previous task. The while loop checks every dt for the status of the syringe pump.
Parameters
----------
dt: float
period in seconds at which the ump status is checked.
Returns
-------
Examples
--------
>>> device.wait(dt = 0.34)
"""
busy = self.get_busy()
while self.busy:
sleep(dt)
busy = self.get_busy()
[docs] def prime(self, N = 5):
"""
performs safe compound "prime" command which empties and fills the syringe N times The final state of the valve is 'out'. The final state of the syringe is full.
Parameters
----------
N: ingteger
number of times to empty and fill. Default = 5 which is good enough.
Returns
-------
Examples
--------
>>> device.empty()
"""
start_speed = self.speed
self.set_speed(68.0)
self.set_valve('i')
self.wait()
for i in range(N):
self.set_cmd_position(0.0)
self.wait()
self.set_cmd_position(250.0)
self.wait()
self.set_speed(start_speed)
self.set_valve('o')
busy = self.get_busy()
[docs] def empty(self):
"""
performs safe compound "empty" command which empties the syringe with fluid back into the reservour. The final state of the valve is 'out'.
Parameters
----------
Returns
-------
Examples
--------
>>> device.empty()
"""
start_speed = self.speed
self.set_speed(100.0)
self.set_valve('i')
self.wait()
self.set_cmd_position(0.0)
self.wait()
self.set_speed(start_speed)
self.set_valve('o')
[docs] def fill(self, volume = None):
"""
performs safe compound "fill" command which refills the syringe with fluid from the reservour. The final state of the valve is 'out'.
Parameters
----------
volume :: float
volume in uL to fill up to.
Returns
-------
Examples
--------
>>> device.fill(volume = 100.0)
"""
start_speed = self.speed
self.set_speed(100.0)
self.set_valve('i')
self.wait()
self.set_cmd_position(0.0)
self.wait()
if volume is None:
volume = 250.0
self.set_cmd_position(volume)
self.wait()
self.set_speed(start_speed)
self.set_valve('o')
[docs] def flow(self,position = 0, speed = 0.1):
"""
performs safe compound "flow" command which initits flow towards posotion with the given speed. Makes sure to set the valve orientation to 'o' output.
Parameters
----------
Returns
-------
Examples
--------
>>> device.flow(position = 25, speed = 0.1)
"""
self.abort()
if self.valve is not 'o':
self.set_valve('o')
if speed <= self.flow_speed_high_limit:
reply = self.move_abs(position = position, speed = speed)
self.process_driver_reply(reply)
else:
warning('the flow command received speed {} large than flow_speed_high_limit {}'.format(speed,self.flow_speed_high_limit))
[docs] def create_low_pressure(self,N = 1):
"""
performs safe compound "create_low_pressure" command which creates low pressure in syringe pump #2.
Parameters
----------
Returns
-------
Examples
--------
>>> device.create_low_pressure(N = 1)
"""
from time import sleep
self.abort()
for i in range(N):
self.set_valve('i')
sleep(0.3)
while self.busy:
sleep(0.1)
self.move_abs(250,65)
sleep(1)
while self.busy:
sleep(0.1)
self.set_valve('o')
sleep(0.3)
while self.busy:
sleep(0.1)
self.move_abs(0,65)
sleep(1)
while self.busy:
sleep(0.1)
[docs] def parse_cmd_string(self,string):
"""
"""
raise NotImplementedError
[docs] def execute_cmd(self,command):
"""
executes command from CMD(command) PV.
Parameters
----------
command: string
Returns
-------
Examples
--------
>>> device.execute_cmd(command = 'exec:flow(position = 25,speed = 0.1)')
"""
raise NotImplementedError
if __name__ == "__main__": #for testing
from tempfile import gettempdir
import logging
logging.basicConfig(filename=gettempdir()+'/syringe_pump_DL.log',
level=logging.DEBUG, format="%(asctime)s %(levelname)s: %(message)s")
pump = Device()
print("pump.init(1,0.1,100,'Y',250)")
print("pump.start()")