# Source code for icarus_nmr.driver_usb_bulk_di_4108

"""
DATAQ 4108 driver level code
author: Valentyn Stadnytskyi
June 2018 - June 2018

1.0.0 - designed for usb Bulk protocol.
1.0.1 - dec is added to the communication

"""

from numpy import nan, mean, std, asarray, array, concatenate, delete, round, vstack, hstack, zeros, transpose, split
from serial import Serial

import sys
if sys.version_info[0] == 3:
    if sys.version_info[1] <= 7:
        from time import gmtime, strftime, time, sleep, clock
    else:
        from time import gmtime, strftime, time, sleep
        from time import perf_counter as clock
else:
    from time import gmtime, strftime, time, sleep, clock

import os.path
import struct
from pdb import pm


from logging import info,warn, debug, error
from struct import pack, unpack
from timeit import Timer

import traceback


class Driver(object):
    """Driver-level access to a DATAQ DI-4108 over the USB bulk protocol.

    The object starts unconnected. ``init()`` locates the device with PyUSB
    and resolves the bulk IN/OUT endpoints; after that ``write``/``read``
    exchange raw command and data bytes with the instrument.

    NOTE(review): several test routines at the bottom of the class call
    ``self._waiting()`` / ``self.waiting()``, which are not defined anywhere
    in this class. They will raise ``AttributeError`` unless a subclass or a
    mix-in supplies them - confirm where they are expected to come from.
    """

    def __init__(self):
        """Create an unconnected driver; call ``init()`` to attach to a device."""
        self.available_ports = []
        self.dev = None

    def init(self):
        """Discover the device, set up its endpoints and log completion."""
        self.discover()
        self.use_port()
        info("initialization of the driver is complete")

    def get_information(self):
        """Cache USB descriptor fields in ``self.dev_dict`` / ``self.epi_dict``.

        Does nothing when no device handle has been assigned yet.
        """
        if self.dev is None:
            return
        dev = self.dev
        self.dev_dict = {
            'DEV:address': dev.address,
            'DEV:bDeviceClass': dev.bDeviceClass,
            'DEV:bDescriptorType': dev.bDescriptorType,
            'DEV:bDeviceProtocol': dev.bDeviceProtocol,
            'DEV:bLength': dev.bLength,
            'DEV:bMaxPacketSize0': dev.bMaxPacketSize0,
            'DEV:bNumConfigurations': dev.bNumConfigurations,
            'DEV:manufacturer': dev.manufacturer,
            'DEV:serial_number': dev.serial_number,
            'DEV:speed': dev.speed,
            'DEV:product': dev.product,
        }
        # endpoint IN description
        epi = self.epi
        self.epi_dict = {
            'EPI:bmAttributes': epi.bmAttributes,
            'EPI:wMaxPacketSize': epi.wMaxPacketSize,
            'EPI:bSynchAddress': epi.bSynchAddress,
            'EPI:bInterval': epi.bInterval,
            'EPI:bEndpointAddress': epi.bEndpointAddress,
            'EPI:bDescriptorType': epi.bDescriptorType,
        }

    def _info(self, index):
        """Send ``info <index>`` and return the payload of the reply as bytes.

        The reply is split on the echoed ``b'info <index> '`` prefix; the
        first and last byte of what follows are dropped. Raises ``IndexError``
        when the reply does not contain the echoed prefix.
        """
        prefix = ('info %d ' % index).encode('ascii')
        reply = self.inquire(prefix + b'\r')
        return reply.split(prefix)[1][1:-1]

    def get_hardware_information(self):
        """Query the instrument's identification fields.

        Returns:
            dict with bytes keys ``b'Device name'``, ``b'Firmware version'``,
            ``b'Serial Number'`` and ``b'Sample Rate Divisor'`` mapped to the
            bytes returned by ``info 1``, ``info 2``, ``info 6`` and
            ``info 9`` respectively.
        """
        dic = {}
        dic[b'Device name'] = self._info(1)
        dic[b'Firmware version'] = self._info(2)
        dic[b'Serial Number'] = self._info(6)
        dic[b'Sample Rate Divisor'] = self._info(9)
        return dic

    def use_port(self, port=''):
        """Reset the device and resolve its first OUT and IN endpoints.

        ``port`` is accepted but not used by this implementation.
        Requires ``self.dev`` to have been set by ``discover()``.

        Raises:
            AssertionError: if either endpoint cannot be found.
        """
        import usb.core
        import usb.util

        self.dev.reset()

        # Set the active configuration. With no arguments, the first
        # configuration will be the active one.
        self.dev.set_configuration()

        # Interface 0, alternate setting 0 of the active configuration.
        cfg = self.dev.get_active_configuration()
        intf = cfg[(0, 0)]

        def first_endpoint(direction):
            # First endpoint on the interface with the requested direction.
            return usb.util.find_descriptor(
                intf,
                custom_match=lambda e:
                    usb.util.endpoint_direction(e.bEndpointAddress) == direction)

        self.epo = first_endpoint(usb.util.ENDPOINT_OUT)
        self.epi = first_endpoint(usb.util.ENDPOINT_IN)

        assert self.epo is not None
        assert self.epi is not None

        # Descriptor attributes overridden on the Python objects, exactly as
        # in the original code. NOTE(review): confirm these assignments have
        # the intended effect on the transfers themselves.
        self.epi.wMaxPacketSize = 7200000
        self.epo.wMaxPacketSize = 7200000
        self.epi.bmAttributes = 1
        self.epi.bInterval = 100
        # Default number of bytes requested by read()/readall().
        self.usb_buff = int(self.epi.wMaxPacketSize/100)

    def discover(self):
        """Locate the DI-4108 on the USB bus and store the handle in ``self.dev``.

        The device is looked up by idVendor=0x0683, idProduct=0x4108.

        Returns:
            True when a device was found.

        Raises:
            ValueError: when no matching device is present.
        """
        import usb.core

        self.dev = usb.core.find(idVendor=0x0683, idProduct=0x4108)
        if self.dev is None:
            raise ValueError('Device not found')
        return True

    # ---- Basic communication functions ----

    def read(self, N=0):
        """Read up to ``N`` bytes from the IN endpoint.

        Args:
            N: number of bytes to request; 0 means ``self.usb_buff``.

        Returns:
            The received bytes, or ``b""`` when the read fails (the
            traceback is logged with ``error``). The read uses a timeout
            value of 1000.
        """
        size = int(self.usb_buff) if N == 0 else int(N)
        timeout = 1000
        try:
            data = self.dev.read(self.epi, size, timeout)
        except Exception:
            error(traceback.format_exc())
            return b""
        # One conversion instead of appending a byte at a time.
        return bytes(data)

    def readall(self):
        """Read with the default buffer size (``self.usb_buff``)."""
        return self.read(0)

    def write(self, command):
        """Send ``command`` to the OUT endpoint.

        ``str`` commands are encoded to bytes first so that the rest of the
        class can mix text and bytes commands. Failures are logged, not
        raised.
        """
        if isinstance(command, str):
            command = command.encode('utf-8')
        try:
            self.dev.write(self.epo, command)
        except Exception:
            error(traceback.format_exc())

    def inquire(self, command):
        """Write ``command`` and return the bytes read back afterwards."""
        self.write(command)
        return self.read(self.usb_buff)

    def close_port(self):
        """No-op placeholder; nothing is released here."""
        pass

    # ---- DI-4108 communication functions ----

    def config_digital(self, number=127, echo=False):
        """Send ``endo <number>`` and optionally verify the echoed reply.

        Returns:
            None when ``echo`` is false; otherwise True if the reply equals
            the command that was sent, else False.
        """
        command = 'endo ' + str(number) + '\r'
        self.write(command)
        reply = self.readall()
        info('%r' % reply)
        if not echo:
            return None
        # The reply is bytes, so the expected text must be encoded as well.
        return reply == command.encode('utf-8')

    def set_digital(self, number=127, echo=False):
        """Send ``dout <number>`` and optionally verify the echoed reply.

        The reply is only read when ``echo`` is true.

        Returns:
            None when ``echo`` is false; otherwise True if the reply equals
            the command followed by a NUL byte, else False.
        """
        command = 'dout ' + str(number) + ' \r'
        self.write(command)
        if not echo:
            return None
        reply = self.readall()
        debug('%r' % reply)
        return reply == (command + '\x00').encode('utf-8')

    def set_analog(self, echo=False, filterN=1):
        """Send the filter, scan-list and packet-size commands.

        Args:
            echo: see Returns.
            filterN: value for ``filter * <filterN>``; anything that cannot
                be converted with ``int()`` falls back to 1 (logged).

        Returns:
            True when ``echo`` is true, False otherwise. The replies are
            read in both cases but are not validated.

        NOTE(review): the expected-echo string in the original no longer
        matched the commands sent (it listed ``slist 0 256`` etc., while
        ``slist 0 0`` is what is written), and both outcomes of that
        comparison set the flag to True. The original layout was flattened,
        so the placement of its ``else`` is ambiguous - confirm the intended
        return value.
        """
        try:
            filterN = int(filterN)
        except (TypeError, ValueError):
            error(traceback.format_exc())
            filterN = 1
        # Per the original note: 0 - read the last point; 1 - CIC filter.
        self.write('filter * ' + str(filterN) + ' \r')
        # Scan-list positions 0..8, each written with its own index as the
        # value. Per the original note position 8 is the digital input.
        # NOTE(review): older comments tied codes 256/257/1282.. to the
        # +-5 V / +-0.2 V ranges; confirm the plain indices are intended.
        for position in range(9):
            self.write('slist %d %d \r' % (position, position))
        # Per the original note: packet size 1024 bytes (DI-4x08 protocol, p. 3).
        self.write('ps 6 \r')
        # Drain the echoed commands from the IN endpoint.
        self.readall()
        self.readall()
        return bool(echo)

    def set_acquisition_rate(self, rate=1000, baserate=20000, dec=0, echo=True):
        """Send the ``srate`` and ``dec`` commands.

        Args:
            rate: requested rate; only used to derive ``dec`` when
                ``dec`` is 0 (``dec = int(baserate / rate)``).
            baserate: currently ignored - the value 20000 is always used,
                as in the original implementation.
            dec: value for the ``dec`` command; 0 means derive from ``rate``.
            echo: when true, two replies are read and logged at debug level.

        Returns:
            True when ``echo`` is true, False otherwise; the replies are
            not validated.

        NOTE(review): same ambiguity as in ``set_analog`` - the original
        comparison set the flag to True on both outcomes under this reading.
        """
        baserate = 20000
        if dec == 0:
            dec = int(baserate/rate)
        self.write('srate ' + str(int(60000000/baserate)) + ' \r')
        self.write('dec ' + str(dec) + ' \r')
        sleep(0.2)
        if not echo:
            return False
        first = self.readall()
        debug(first)
        second = self.readall()
        debug(second)
        return True

    def start_scan(self):
        """Send ``start`` and log that acquisition has begun."""
        self.write('start\r')
        info('The configured measurement(s) has(have) started')

    def stop_scan(self):
        """Send ``stop`` and log that acquisition has ended."""
        self.write('stop\r')
        info("measurement ended")

    def full_stop(self):
        """Stop scanning and call ``close_port()``."""
        self.stop_scan()
        self.close_port()

    def flush(self):
        """No-op kept for backward compatibility with the serial driver."""
        pass

    # ---- Advanced functions ----

    def config_channels(self, rate=3000, conf_digital=127, set_digital=127,
                        dec=20, baserate=20000):
        """Run the digital, analog and rate configuration steps with echo on.

        Returns:
            The product of the four step results, i.e. truthy only when
            every step reported success.
        """
        x = self.config_digital(conf_digital, echo=True)
        a = self.set_digital(set_digital, echo=True)
        b = self.set_analog(echo=True)
        c = self.set_acquisition_rate(rate=rate, dec=dec, baserate=baserate,
                                      echo=True)
        return x*a*b*c

    def get_readings(self, points_to_read=1, to_read_analog=8, to_read_digital=1):
        """Read and unpack ``points_to_read`` samples.

        Each sample is unpacked as ``to_read_analog`` native signed 16-bit
        values (struct code ``h``) followed by two unsigned bytes (``BB``).

        Returns:
            ``(res, flag)``: ``res`` is an array with one row per sample, or
            None when the received bytes could not be unpacked (the
            traceback is logged); ``flag`` is False when the raw reply
            contains ``b'stop'``, True otherwise.
        """
        bytes_per_point = int(to_read_analog)*2 + int(to_read_digital)*2
        result = self.read(bytes_per_point*points_to_read)
        flag = b'stop' not in result
        try:
            data = asarray(unpack(('h'*to_read_analog + 'BB')*points_to_read,
                                  result))
        except Exception:
            error(traceback.format_exc())
            return None, flag
        try:
            res = asarray(split(data, points_to_read))
        except Exception:
            error(traceback.format_exc())
            res = None
        return res, flag

    def blink(self):
        """Step through ``led 0`` .. ``led 7`` one second apart, ending on 7."""
        for i in range(8):
            self.write('led ' + str(i) + ' \r')
            sleep(1)
        self.write('led ' + str(7) + ' \r')

    # ---- Test functions ----

    def self_test(self):
        """Query and print the identification fields; results go to ``self.dict``."""
        self.inquire('led 7\r')
        # Wait time after write, before read; noted by the author as
        # apparently unnecessary.
        self.tau = 0.001
        # Device parameters such as serial number and device name.
        self.dict = {}
        self.dict['Device name'] = self._info(1)
        self.dict['Firmware version'] = self._info(2)
        self.dict['Serial Number'] = self._info(6)
        self.dict['Sample Rate Divisor'] = self._info(9)
        for key, value in self.dict.items():
            print('%r, %r' % (key, value))
        print('information request complete: the DI-4108 with SN %r' % self.dict['Serial Number'])
        print('%r' % self.inquire('led 2\r'))

    def test1(self):
        """Scan for about two seconds while printing readings.

        NOTE(review): relies on ``self._waiting()``, not defined in this class.
        """
        self.self_test()
        self.config_channels()
        self.start_scan()
        while self._waiting()[0] < 1:
            sleep(0.001)
        start_t = time()
        while time()-start_t < 2:
            print("%0.5f %r %r" % (time()-start_t, self._waiting()[0], self.get_readings()))
        self.stop_scan()
        print('test 1 is Done. IN buffer has all data')
        print('data = dev.get_readings()')
        print('buffer waiting %r' % self._waiting()[0])

    def test2(self):
        """Scan for six seconds, stop, then print the buffered readings.

        NOTE(review): relies on ``self._waiting()``, not defined in this class.
        """
        self.self_test()
        self.config_channels()
        self.start_scan()
        sleep(6)
        self.stop_scan()
        sleep(1)
        while self._waiting()[0] > 5:
            print('time %r and value %r' % (time(), self.get_readings()))
        print('test 2 is Done')

    def test3(self):
        """Like ``test2`` but also prints the waiting counts.

        NOTE(review): relies on ``self.waiting()``, not defined in this class.
        """
        self.self_test()
        self.config_channels()
        self.start_scan()
        sleep(6)
        self.stop_scan()
        print(self.waiting())
        sleep(1)
        while self.waiting()[0] > 5:
            print(self.waiting()[0])
            print(self.get_readings())
        print('test 3 is Done')

    def echo_test1(self):
        """Toggle the digital outputs for a second while scanning, then dump the buffer.

        NOTE(review): relies on ``self.waiting()`` / ``self._waiting()``,
        not defined in this class.
        """
        self.config_channels()
        self.start_scan()
        while self.waiting()[0] < 1:
            sleep(0.001)
        start_t = time()
        while time() - start_t < 1:
            self.write('dout 0\r')
            self.write('dout 127 \r')
            sleep(0.06)
        self.stop_scan()
        print("%r" % self._waiting()[0])
        data = self.readall()
        print('%r' % data)

    def performance_test1(self):
        """Fill the input buffer, stop, and print hints for timing reads by hand.

        NOTE(review): relies on ``self.waiting()`` / ``self._waiting()``,
        not defined in this class.
        """
        self.self_test()
        self.config_channels()
        self.start_scan()
        while self.waiting()[0] < 10000:
            sleep(0.001)
        self.stop_scan()
        print('test 1 is Done. IN buffer has all data')
        print('data = dev.get_readings(10)')
        print('buffer waiting %r' % self._waiting()[0])
        print('t = Timer(lambda: dev.get_readings(N))')
        print('print t.timeit(number = M)')

    def performance_test2(self):
        """Read 512-sample chunks for ten seconds and print timing statistics.

        Per-chunk records are kept in ``self.lst``.

        NOTE(review): relies on ``self._waiting()``, not defined in this
        class. The original also reassigned ``start_t`` near the end of this
        routine; with the flattened layout it is unclear whether that was
        inside the loop (which would keep it from ever finishing), so the
        unused reassignment was dropped.
        """
        self.self_test()
        self.config_channels()
        self.start_scan()
        start_t = time()
        self.lst = []
        time_st = time()
        while time()-start_t < 10:
            if self._waiting()[0] > 512*16:
                self.get_readings(512)
                self.lst.append([time()-start_t, self._waiting()[0], (time() - time_st)*1000])
                print("%r %0.10f" % (self._waiting()[0], (time() - time_st)*1000))
            sleep(24/1000)  # original note: wait for 12.8 ms
        self.stop_scan()
        if self.lst:
            # Skip the statistics when no chunk was read (avoids dividing by zero).
            print('time between 4 kS = %0.5f' % mean(10.0/len(self.lst)))
            print('Sampling rate: %0.1f' % (512/mean(10.0/len(self.lst))))
        print('test 1 is Done. IN buffer has all data')
        print('data = dev.get_readings(10)')
        print('buffer waiting %r' % self._waiting()[0])
        print('t = Timer(lambda: dev.get_readings(N))')
        print('print t.timeit(number = M)')
if __name__ == "__main__":
    # Manual test harness: builds a Driver, turns on debug logging and
    # prints the names of the test routines that can be run by hand.
    import logging
    from tempfile import gettempdir

    driver = Driver()
    # Alias, presumably so the hints printed below can be typed as-is in an
    # interactive session.
    self = driver

    logging.basicConfig(
        # filename=gettempdir()+'/DI_4108_BULK_LL.log',
        level=logging.DEBUG,
        format="%(asctime)s %(levelname)s: %(message)s",
    )

    print("\n".join((
        'self.self_test()',
        'self.test1()',
        'self.test2()',
        'self.echo_test1()',
        'self.performance_test1()',
        'self.performance_test2()',
    )))