Source code for circular_buffer_numpy.circular_buffer

#!/bin/env python
# -*- coding: utf-8 -*-
"""Circular buffer Server
    by Valentyn Stadnytskyi
    created: Nov 4, 2017
"""
import logging
from logging import debug, info, warn, error
import warnings
logging.getLogger(__name__).addHandler(logging.NullHandler())

[docs]class CircularBuffer(object): """ :ivar pointer: initial value: -1 :ivar g_pointer: initial value: -1 :ivar packet_length: initial value 1 """ pointer = -1 # running current pointer value g_pointer = -1 # running current global_pointer value def __init__(self, shape=(100, 2), dtype='float64', packet_length=1): from numpy import nan, zeros, empty """ initializes the class. creates an empty numpy array with given size and give dtype. the shape follows numpy definition where the first index corresponds to x or col and second is y or row. populates the array with nan if dtype is float or zeros if else. creates attributes: __info__, name, size, dtype, type, packet_length, pointer g_pointer packet_pointer g_packet_pointer """ self.__info__ = "Server RingBuffer" self.name = 'circular buffer server' self.type = 'server' self.packet_length = packet_length self.buffer = empty(shape, dtype=dtype) if self.length%self.packet_length != 0: warnings.warn('The number of packets that can fit into this buffer is not integer. The all functions related to manipulation with packets are not going to work properly.', DeprecationWarning, stacklevel=2)
[docs] def append(self, data): """ appends data to the existing circular buffer. Parameters ---------- data :: (numpy array) data to append Returns ------- Examples -------- >>> buffer = circular_buffer_numpy.circular_buffer.CircularBuffer(shape = (10,4) >>> from numpy.random import random >>> rand_arr = random(size=(6,4)) >>> buffer.append(rand_arr) >>> buffer.pointer 5 """ if len(data.shape) == len(self.shape)-1: data = data.reshape((1,data.shape[0])) for i in range(data.shape[0]): if self.pointer == self.shape[0]-1: self.pointer = -1 self.buffer[self.pointer+1] = data[i] self.pointer += 1 self.g_pointer += 1
[docs] def reset(self, clear=False): """ resets pointers to -1 (empty buffer), the full reset can be force via parameter clears The parameter clear can be used to Parameters ---------- clear : (boolean) force clearing the buffer Returns ------- Examples -------- >>> circual_buffer.CircularBuffer.reset() """ from numpy import nan if clear: if 'float' in self.type: self.buffer = self.buffer * nan else: self.buffer = self.buffer*0 self.pointer = -1 self.g_pointer = -1 debug('{},{}'.format(self.pointer, self.g_pointer))
[docs] def change_length(self, length): """ changes length of the buffer Parameters ---------- length : integer new length of the buffer Returns ------- Examples -------- >>> buffer = circual_buffer.CircularBuffer(shape=(10,4)) >>> buffer.shape (10,4) >>> buffer.change_length(12) >>> buffer.shape (12,4) """ from numpy import zeros, nan, copy old_buffer = self.get_all() new_length = [length] + list(data_shape) self.buffer = zeros(shape=new_length, dtype=self.dtype) * nan self.append(old_buffer)
[docs] def get_all(self): """ return entire circular buffer server in ordered way, where last value is the last collected. Parameters ---------- None Returns ------- array : array_like array of data points in the historic order Examples -------- >>> data = circual_buffer.CircularBuffer.get_all() """ return self.get_last_N(N=self.shape[0])
[docs] def get_data(self): """ return all valid circular buffer entries in ordered way, where last value is the last collected. Parameters ---------- None Returns ------- array : array_like (numpy array) Examples -------- >>> data = circual_buffer.CircularBuffer.get_data() """ if self.g_pointer + 1 < self.length: return self.get_last_N(self.g_pointer+1) else: return self.get_all()
[docs] def get_last_N(self, N): """ returns last N entries from the known self.pointer(circular buffer pointer) Parameters ---------- N : integer number of points to return Returns ------- array (numpy array) Examples -------- >>> data = circual_buffer.CircularBuffer.get_last_N(10) """ from numpy import concatenate P = self.pointer if N-1 <= P: result = self.buffer[P+1-N:P+1] else: result = concatenate((self.buffer[-(N-P-1):], self.buffer[:P+1]), axis=0) return result
[docs] def get_last_value(self): """ returns last entry from the known. Same as self.buffer[self.pointer] Parameters ---------- None Returns ------- array : array_like Examples -------- >>> data = circual_buffer.CircularBuffer.get_last_value() """ return self.get_last_N(N=1)
[docs] def get_value(self, linear_pointer = None, circular_pointer = None): """ returns last entry from the known. Same as self.buffer[self.pointer] Parameters ---------- None Returns ------- array : array_like Examples -------- >>> data = circual_buffer.CircularBuffer.get_last_value() """ if (linear_pointer is None) and (circular_pointer is None): raise Exception('linear_pointer or circular_pointer has to be supplied. None were supplied.') if (linear_pointer is not None) and (circular_pointer is not None): raise Exception('linear_pointer or circular_pointer has to be supplied, not both.') if (linear_pointer is not None) and (circular_pointer is None): pointer = linear_pointer - self.length*(linear_pointer//self.length) if (linear_pointer is None) and (circular_pointer is not None): if not circular_pointer>self.length: pointer = circular_pointer else: raise Exception('circular_pointer exceeds the length of the buffer') return self.buffer[pointer]
[docs] def get_i_j(self, i, j): """ returns buffer between indices i and j (including index i) if j < i, it assumes that buffer wrapped around and will give information accordingly. NOTE: the user needs to pay attention to the order at which indices are passed NOTE: index i cannot be -1 otherwise it will return empty array Parameters ---------- i : integer start index in the buffer j : integer end index in the buffer Returns ------- array : array_like Examples -------- >>> data = circual_buffer.CircularBuffer.get_i_j(i=2,j=5) """ if j > i: res = self.buffer[i:j] else: length = self.shape[0] - i + j res = self.get_N(N=length, M=j-1) return res
[docs] def get_N(self, N=0, M=0): """ return N points before index M in the circular buffer Parameters ---------- N : integer number of points to return M : integer index of the pointer Returns ------- array : array_like Examples -------- >>> data = circual_buffer.CircularBuffer.get_N(N=2, M=5) """ from numpy import concatenate P = M if N-1 <= P: result = self.buffer[P+1-N:P+1] else: result = concatenate((self.buffer[-(N-P-1):], self.buffer[:P+1]), axis=0) return result
[docs] def get_N_global(self, N=0, M=0): """ return N points before global index M in the circular buffer. Parameters ---------- N : integer number of points to return M : integer global index of the pointer Returns ------- array : array_like Examples -------- >>> data = circual_buffer.CircularBuffer.get_N(N=2, M=5) """ from numpy import concatenate P = M while M >= self.shape[0]: M = M - self.shape[0] P = M if N-1 <= P: result = self.buffer[P+1-N:P+1] else: result = concatenate((self.buffer[-(N-P-1):], self.buffer[:P+1]), axis=0) return result
[docs] def get_packet_linear_i_j(self,i, j = None, copy = False): """ return packets between i and j linear packet pointers. If i==j, function returns i's packet only. if j is None, function returns only i's packet """ if j is None: j = i N_of_packets = int(self.length/self.packet_length) while (i) > N_of_packets-1: i -= N_of_packets while (j) > N_of_packets-1: j -= N_of_packets return self.get_packet_circular_i_j(i=i,j=j, copy = copy)
[docs] def get_packet_circular_i_j(self,i, j = None, copy = False): """ return packets between i and j circular packet pointers. If i==j, function returns i's packet only. if j is None, function returns only i's packet """ from numpy import copy as cp if j is None: j = i pack_len = self.packet_length if copy: data = cp(self.get_i_j(i = i*pack_len,j = (j+1)*pack_len)) else: data = self.get_i_j(i = i*pack_len,j = (j+1)*pack_len) return data
@property def linear_packet_pointer(self): """ returns global packet pointer calculated from global pointer and packet size. The packet pointer can be a float number. It serves as an indaction something was not appended in packets. """ return int(((self.g_pointer+1)/self.packet_length)-1) g_packet_pointer = linear_packet_pointer @property def circular_packet_pointer(self): """ returns packet pointer calculated from local pointer and packet size. The packet pointer can be a float number. It serves as an indaction something was not appended in packets. """ return int(((self.pointer+1)/self.packet_length)-1) @property def packet_pointer(self): """ returns packet pointer calculated from local pointer and packet size. The packet pointer can be a float number. It serves as an indaction something was not appended in packets. """ warnings.warn('The packet pointer will be replaced with circular packet pointer and global packet pointer will be replaced with linear packet pointer', DeprecationWarning, stacklevel=2) return self.circular_packet_pointer @property def g_packet_pointer(self): """ returns packet pointer calculated from local pointer and packet size. The packet pointer can be a float number. It serves as an indaction something was not appended in packets. """ warnings.warn('The packet pointer will be replaced with circular packet pointer and global packet pointer will be replaced with linear packet pointer', DeprecationWarning, stacklevel=2) return self.linear_packet_pointer @property def size(self): """ Return ------ size : integer property objects that returns the size of the circular buffer. """ debug('returing size') return self.buffer.size @property def shape(self): """ shape : tuple property objects that returns the shape of the circular buffer. """ return self.buffer.shape
[docs] def get_length(self): """ integer: returns the length of the circular buffer along fast """ return self.buffer.shape[0]
length = property(get_length) @property def data_shape(self): """ tuple: property objects that returns the shape of one data point (or N dimensional array) of the circular buffer """ return self.buffer.shape[1:] @property def dtype(self): """ dtype: property objects that returns the shape of one data point (or N dimensional array) of the circular buffer """ return self.buffer.dtype @property def linear_pointer(self): """ 'linear pointer' refers to a global pointer that has been counting from the beginning of times. """ return self.g_pointer @property def circular_pointer(self): """ 'circular pointer' refers to an actual pointer in the circular buffer numpy array and is an alias of 'pointer' """ return self.pointer
if __name__ == "__main__": # for testing purposes from pdb import pm # for debugging from time import time import logging from tempfile import gettempdir import traceback #https://docs.python.org/3/library/logging.html#logrecord-attributes from tempfile import gettempdir logging.basicConfig(filename=gettempdir()+'/circular_buffer_LL.log', level=logging.DEBUG, format="%(asctime)-15s|PID:%(process)-6s|%(levelname)-8s|%(name)s| module:%(module)s-%(funcName)s|message:%(message)s") print("Circular buffer library") print("two classes: server and client") print("server = server() \nclient = client()") print("---------------------------") print("Server functions") print("server.append(data)") print("server.get_all()") print("server.get_N(N = integer)") print("---------------------------") from circular_buffer_numpy.circular_buffer import CircularBuffer from numpy import random buffer = CircularBuffer(shape=(100, 2, 4), packet_length = 5) buffer.reset(); j = 0 for i in range(101): data = random.randint(1024, size=(5, 2, 4))*0 + i buffer.append(data) print(data.mean() == i) j = i%5 print('c normal',buffer.circular_pointer,(i%20+1)*5-1) print('l normal',buffer.linear_pointer,(i+1)*5-1) print('c packet',buffer.circular_packet_pointer,i%20) print('l packet',buffer.linear_packet_pointer, i) print('01234567', buffer.get_packet_circular_i_j(i%20,i%20).mean()==i) print('01234567',buffer.get_packet_linear_i_j(i,i).mean(),i) if i > 20: print('qqqqqqqq',buffer.get_packet_linear_i_j(i-10,i-8).mean(),i-9) print('--------')