import socket import re import numpy as np preamble_fields = {'BYT_NR': int, # data width for waveform 'BIT_NR': int, # number of bits per waveform point 'ENCDG' : str, # encoding of waveform (binary/ascii) 'BN_FMT': str, # binary format of waveform 'BYT_OR': str, # ordering of waveform data bytes (LSB/MSB) 'NR_PT' : int, # record length of record waveform 'WFID' : str, # description string of waveform 'PT_FMT': str, # format of reverence waveform (Y/ENV) 'XINCR' : float, 'PT_OFF': int, 'XZERO' : float, 'XUNIT' : str, 'YMULT' : float, 'YZERO' : float, 'YOFF' : float, 'YUNIT' : str, 'NR_FR' : int } def convert_waveform(waveform, preamble): """Converts a waveform returned by the scope into voltage values.""" return preamble['YZERO'] + (waveform - preamble['YOFF'])*preamble['YMULT'] def build_time_array(preamble): return preamble['XZERO'] + (np.arange(preamble['NR_PT']) - preamble['PT_OFF'])*preamble['XINCR'] class TekScope(object): def __init__(self, host, port): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((host,port)) def send(self, msg): self.sock.sendall(msg) def recv(self): buffer = '' while True: buffer += self.sock.recv(1024) if buffer.endswith('\n'): break return buffer def query(self, msg): """Sends a query to the oscilloscope and returns the response.""" if not msg.endswith('\n'): msg += '\n' self.send(msg) return self.recv() def get_preamble(self): """Returns a dictionary containing information about the waveform format.""" header = self.query('header?\n') # turn header on so that we know preamble field names self.send('header 1\n') preamble = {} for s in self.query('wfmpre?\n').strip()[8:].split(';'): key, value = s.split(' ',1) preamble[key] = preamble_fields[key](value) self.send(header) return preamble def get_active_channels(self): """Returns a list of the active (displayed) channel numbers.""" header = self.query('header?\n') self.send('header 1\n') channels = [] for s in self.query('select?').strip()[8:].split(';'): m = re.match('CH(\d) (\d)', s) if m is not None: ch, state = map(int,m.groups()) if state != 0: channels.append(ch) self.send(header) return channels def get_waveform(self, channel): header = self.query('header?\n') self.send('header 0\n') # not sure what pt_fmt env is, so we'll always transmit # in pt_fmt y format self.send('wfmpre:pt_fmt y\n') encoding = self.query('data:encdg?')[:3] if encoding == 'RIB': dtype = '>i' elif encoding == 'RPB': dtype = '>u' elif encoding == 'SRI': dtype = '' where x is # the number of y characters, and y is the number of bytes in the # waveform. # for example: '#41000' at the beginning of a curve? response means # that there are 1000 bytes in the waveform x = self.sock.recv(2) assert x.startswith('#') y = int(self.sock.recv(int(x[1]))) waveform = np.fromstring(self.sock.recv(y),dtype) # messages end with a newline assert self.sock.recv(1024) == '\n' self.send(header) return waveform