"""
DATAQ 4108 Device Level code
fully python3 compatible.
The main purpose of this module is to provide useful interface between DI-4108 and a server system level code that does all numbercrunching. This modules only job is to attend DI-4108 and insure that all data is collected and stored in a circular buffer.
"""
import traceback
import psutil, os
from numpy import nan, mean, std, nanstd, asfarray, asarray, hstack, array, concatenate, delete, round, vstack, hstack, zeros, transpose, split, unique, nonzero, take, savetxt, min, max
import sys
if sys.version_info[0] == 3:
if sys.version_info[1] <= 7:
from time import gmtime, strftime, time, sleep, clock
else:
from time import gmtime, strftime, time, sleep
from time import perf_counter as clock
else:
from time import gmtime, strftime, time, sleep, clock
from pdb import pm
from logging import debug,info,warning,error
from timeit import Timer, timeit
from circular_buffer_numpy.queue import Queue
from ubcs_auxiliary.saved_property import DataBase, SavedProperty
from ubcs_auxiliary.threading import new_thread
from ubcs_auxiliary.advsleep import precision_sleep #home-made module for accurate sleep
[docs]class Device():
db = DataBase(root = 'TEMP', name = 'DI4108_DL')
#: serial number, five character long string
pr_serial_number = SavedProperty(db,'Serial Number', '00000').init()
pr_AI_channels = SavedProperty(db,'Number of AI',8).init()
pr_DI_channels = SavedProperty(db,'Number of DI',1).init()
pr_rate = SavedProperty(db,'rate(Hz)', 4000).init()
pr_baserate = SavedProperty(db,'baserate(Hz)', 20000).init()
pr_dec = SavedProperty(db,'dec', 5).init()
pr_DOI_config = SavedProperty(db,'DIO config', '1111111').init()
pr_DOI_set = SavedProperty(db,'DIO set', '1111111').init()
pr_packet_size = SavedProperty(db,'packet size', 64).init()
pr_packet_buffer_size = SavedProperty(db,'pr_packet_buffer_size', 1000).init()
pr_os_buffer = SavedProperty(db,'os_buffer', 12800).init()
pressure_sensor_offset = SavedProperty(db,'pressure_sensor_offset',
[69.5595, 65.562, 68.881375, 84.2195, 86.96075, 17.248, 17.322, 0]).init() # value read at atmoshperic pressure
pg_period = SavedProperty(db,'pg_period', 5.0).init()
pg_pre_pulse_width = SavedProperty(db,'pg_pre_pulse_width', 0.1).init()
pg_depre_pulse_width = SavedProperty(db,'pg_depre_pulse_width', 0.1).init()
pg_delay_width = SavedProperty(db,'pg_delay_width', 0.5).init()
trigger_mode = 0
DIO_default = 126
def __init__(self):
self.name = 'DI4108_DL'
self.device_info = {}
self.device_info['empty'] = 'empty'
self.task_name_dict = {}
self.task_name_dict['empty'] = 'empty'
self.pr_rate = (self.pr_baserate)/self.pr_dec
self.pr_buffer_size = (int(self.pr_packet_size*self.pr_rate),10)
self.queue = Queue(shape = self.pr_buffer_size, dtype = 'int16')
self.OverflowFlag = False
self.io_push_queue = None
self.io_put_queue = None
self.curr_dio = 127
self.user_set_dio = self.curr_dio
self.run_once_counter = 0
self.threads = {}
[docs] def first_time_setup(self):
"""
set control variables to factory settings
"""
self.pr_AI_channels = 8
self.pr_DI_channels = 1
self.pr_packet_size = 128
self.pr_baserate = 20000
self.pr_dec = 5
self.pr_rate = self.pr_baserate*1.0/self.pr_dec
self.pr_DOI_config = '1111111' #all outputs
self.pr_DOI_set = '1111111' #all high
self.pressure_sensor_offset = [69.5595, 65.562, 68.881375, 84.2195, 86.96075, 17.248, 17.322, 0]
[docs] def bind_driver(self, driver = None):
"""
bind driver to device instance
"""
self.driver = driver
def _set_priority(self):
import traceback
import platform #https://stackoverflow.com/questions/110362/how-can-i-find-the-current-os-in-python
p = psutil.Process(os.getpid()) #source: https://psutil.readthedocs.io/en/release-2.2.1/
# psutil.ABOVE_NORMAL_PRIORITY_CLASS
# psutil.BELOW_NORMAL_PRIORITY_CLASS
# psutil.HIGH_PRIORITY_CLASS
# psutil.IDLE_PRIORITY_CLASS
# psutil.NORMAL_PRIORITY_CLASS
# psutil.REALTIME_PRIORITY_CLASS
try:
if platform.system() == 'Windows':
p.nice(psutil.REALTIME_PRIORITY_CLASS)
elif platform.system() == 'Linux': #linux FIXIT
p.nice(-20) # nice runs from -20 to +12, where -20 the most not nice code(highest priority)
except:
error(trace.format_exc())
self.proccess = p
[docs] def init(self):
"""
initialize the DL program
"""
if self.driver is not None:
self.driver.init()
self.device_info = {**self.driver.get_hardware_information(),**self.device_info}
self.stop()
self.config_DL(baserate = self.pr_baserate,dec = self.pr_dec, DOI_config = self.pr_DOI_config, DOI_set = self.pr_DOI_set)
else:
warning('The driver object in the device _init() is {}'.format(driver))
[docs] def start(self):
"""
Create a new thread and submits self.run for execution
Parameters
----------
Returns
-------
Examples
--------
>>> device.start()
"""
from ubcs_auxiliary.threading import new_thread
self.threads['running'] = new_thread(self.run)
[docs] def stop(self):
"""
Orderly stop of the device code.
- stops data acquisiion
- erases all data from USB buffers
- set digital IO to all high
- closes usb port connection
Parameters
----------
Returns
-------
Examples
--------
>>> device.stop()
"""
self.running = False
self.driver.stop_scan()
self.driver.flush()
self.set_DIO('1111111') # turns all valves off
self.driver.close_port()
[docs] def kill(self):
"""
Orderly self-destruction. kills all threads and deletes the instance.
Parameters
----------
Returns
-------
Examples
--------
>>> device.kill()
"""
self.stop()
del self
[docs] def config_DL(self,baserate = None,dec = None, DOI_config = None, DOI_set = None):
"""
configure analog and digital channels, and rate via baserate and dec.
Parameters
----------
value :: string
Returns
-------
Examples
--------
>>> device.config_DL(baserate = 20000, dec = 5, DOI_config = '1111111',DOI_set = '1111111')
"""
debug('DL: config DL start')
self.driver.config_channels(baserate = baserate,
dec = dec,
conf_digital = int(DOI_config,2),
set_digital = int(DOI_set,2))
debug('DL: config DL end')
[docs] def config_DIO(self, value = '00000000'):
"""
configure direction of digital IOs. It takes a string with 8 characters, '0' or '1' where
'0' -
'1' -
Parameters
----------
value :: string
Returns
-------
Examples
--------
>>> device.config_DIO('00000000')
"""
if self.running:
driver.config_digital(int(string,2))
reply = None
else:
reply = driver.config_digital(int(string,2), echo = True)
return reply
[docs] def set_DIO(self, value = '00000000'):
"""
set digital input/output state
Parameters
----------
value :: string
Returns
-------
Examples
--------
>>> device.DIO = 127
"""
if isinstance(value,str):
value = int(value,2)
self.driver.set_digital(value)
self.io_push({'dio':value})
info('set DIO inside device daq library')
[docs] def set_outside_DIO(self, value = '00000000'):
"""
sets digital input/output state
Parameters
----------
value :: string
Returns
-------
Examples
--------
>>> device.DIO = 127
"""
if isinstance(value,str):
value = int(value,2)
self.driver.set_digital(value)
info('set DIO inside device daq library')
[docs] def get_DIO(self):
"""
get digital state from data queue
"""
return int(self.queue.peek_last_N(3)[-1,9])
DIO = property(get_DIO,set_DIO)
### Advance function
[docs] def run(self):
"""
This is a main fuction that will execute run_once function on the while running loop. Usually it gets submitted to a separate thread.
This function collects data and puts it in the Ring Buffer.
It is run in a separate thread(See main priogram)
Parameters
----------
Returns
-------
Examples
--------
>>> device.run()
"""
from time import strftime, localtime, time
info('Measurement thread has started')
self.driver.readall()
self.running = True
self.driver.start_scan()
while self.running:
self.run_once()
info('data acquisition thread has stopped')
[docs] def run_once(self):
"""
the repeated block executed in while running loop in the main run() thread
Parameters
----------
Returns
-------
Examples
--------
>>> device.run_once()
"""
self.run_once_counter +=1
from numpy import array
data, flag = (self.driver.get_readings(points_to_read = self.pr_packet_size))
self.data = data
debug(f'data from one reading cycle of the driver {data.shape}')
debug(f'the length of pressure sensor is offset{len(self.pressure_sensor_offset)}')
pressure_sensor_offset = array(self.pressure_sensor_offset)
if flag:
for i in range(8):
try:
data[:,i] = data[:,i] - pressure_sensor_offset[i]
except:
info('run_once')
error(traceback.format_exc())
data[:,8] = self.run_once_counter
self.queue.enqueue(data)
else:
info('buffer overflow')
self.running = False
#####Auxiliary functions
[docs] def parse_binary(self,value = 0):
"""
takes an integer and converts it to 8 bit representation as an array.
If float number is passed, it will be converted to int.
"""
from numpy import arange,ndarray,nan
value = int(value)
binary = format(value, '#010b')
arr = arange(7)
for i in range(7):
arr[i] = binary[9-i]
return arr
[docs] def unparse_binary(self,arr = array([1, 1, 1, 1, 1, 1, 1])):
"""
takes an integer and converts it to 8 bit representation as an array.
If float number is passed, it will be converted to int.
"""
from numpy import arange,ndarray,nan
integer = 0
for i in range(len(arr)):
integer += arr[i]*2**(i)
return integer
[docs] def set_pressure_sensor_offset(self,dt = 3):
"""
set pressure sensor offset
"""
# wait for dt seconds
# calculated offset and standard deviation
# assisgn first 8 mean values(analog values) to
# self.pressure_sensor_offset
from time import sleep
sleep(dt)
freq = int(self.pr_rate)
offsets = self.queue.peek_last_N(dt*freq).mean(axis = 0)
stds = self.queue.peek_last_N(dt*freq).std(axis = 0)
info('Pressure sensors offsets are %r and errors are %r ' %(offsets,stds))
self.pressure_sensor_offset = offsets[:8]
### Input-Output controller section
[docs] def io_push(self,io_dict = None):
"""
a wrapper that takes care of write command to the io module
Parameters
----------
io_dict :: dictionary
a string name of the variable
Returns
-------
Examples
--------
>>> self.io_push()
"""
if self.io_push_queue is not None:
self.io_push_queue.put(io_dict)
[docs] def io_pull(self, io_dict):
"""
a wrapper that takes care of 'read' command in the io module
Parameters
----------
io_dict :: dictionary
a key-value pairs
Returns
-------
Examples
--------
>>> self.io_pull(io_dict = {'key':'value'})
"""
if self.io_pull_queue is not None:
for key, value in io_dict.items:
info(f'received update to {key} to change to {value}')
if __name__ == "__main__": #for testing
import sys
flag = False
if sys.argv[1] == 'mock':
flag = True
from tempfile import gettempdir
import logging
logging.basicConfig(filename=gettempdir()+'/icarus_device.log',
level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
from icarus_nmr.device_daq import Device
device = Device()
if flag:
from icarus_nmr.driver_mock import Driver
else:
from icarus_nmr.driver_usb_bulk_di_4108 import Driver
driver = Driver()
device.bind_driver(driver)
device.init()
self = device