'''WAVdyno.py (2014-04-01) Written by Brett Riggs (mrriggs) Functions for recording inertia dyno data from sound card and Arduino.''' import time import math import csv import datetime import pyaudio import serial from multiprocessing.pool import ThreadPool #Record dyno run from sound card def RecWav(PA,iD,WAVframerate,howLong): '''RecWav() reads input from the soundcard and returns a tuple containing the list of sample values for channel 1, the list of sample values for channel 2, and the sample rate''' CHUNK = int(WAVframerate * howLong) stream = PA.open(format = pyaudio.paInt16, channels = 2, rate = WAVframerate, input = True, frames_per_buffer = CHUNK, input_device_index = iD) signals = stream.read(CHUNK) print "Done recording" stream.close() PA.terminate() #store data in lists WAVsignal1 = [] WAVsignal2 = [] i = 0 while i < len(signals)-1: v = ord(signals[i]) + 256 * ord(signals[i+1]) if v > 32767: # One's complement correction v = v - 65535 WAVsignal1.append(v) v = ord(signals[i+2]) + 256 * ord(signals[i+3]) if v > 32767: # One's complement correction v = v - 65535 WAVsignal2.append(v) i = i + 4 #print "saved to list ",str(len(WAVsignal1))," of ",str(WAVframerate*howLong) print "Saving file" #return(WAVsignal1,WAVsignal2,WAVframerate) return(FindTrig(WAVsignal1,WAVframerate), FindTrig(WAVsignal2,WAVframerate), WAVframerate) #Find trigger events def FindTrig(WAVsignal,WAVframerate): '''FindTrig() takes the raw input list from RecWav() and returns a list of only the sample intervals between trigger signals.''' TRIGGER = 28000 HYSTER = 10 Events = [] #Start at baseline i = 0 while i <= len(WAVsignal)-HYSTER: if abs(sum(WAVsignal[i:i+HYSTER])) < (HYSTER*TRIGGER): break i = i + 1 #Find if rising or falling while i <= len(WAVsignal)-HYSTER: if abs(sum(WAVsignal[i:i+HYSTER])) >= (HYSTER*TRIGGER): if WAVsignal[i] < 0: RISING = True else: RISING = False break i = i + 1 #Find trigger events RESET = False beforeCross = 0 while i <= len(WAVsignal)-HYSTER: #Find approaching trigger if (not RESET and not RISING and sum(WAVsignal[i:i+HYSTER]) >= (HYSTER*TRIGGER)) \ or (not RESET and RISING and sum(WAVsignal[i:i+HYSTER]) <= (-1*HYSTER*TRIGGER)): RESET = True beforeCross = 0 #Find beginning of zero cross if (RESET and not RISING and sum(WAVsignal[i:i+HYSTER]) >= (HYSTER*TRIGGER)) \ or (RESET and RISING and sum(WAVsignal[i:i+HYSTER]) <= (-1*HYSTER*TRIGGER)): beforeCross = i + HYSTER - 1 #Find end of zero cross if (RESET and RISING and sum(WAVsignal[i:i+HYSTER]) >= (HYSTER*TRIGGER)) \ or (RESET and not RISING and sum(WAVsignal[i:i+HYSTER]) <= (-1*HYSTER*TRIGGER)): RESET = False Events.append((i+beforeCross)/2) #(float(i)/WAVframerate) i = i + 1 return Events #Record sensor data from Arduino def RecSens(ser,howLong): '''RecSens() reads input from the Arduino and returns a list containing the list of sample time and sample values for each channel''' holdT = time.clock() ARDsignal = [] while time.clock() - holdT < howLong: ser.write("x") holdHex = ser.read(size=18) ARDsignal.append((time.clock()-holdT, int(holdHex[0:3],16), int(holdHex[3:6],16), int(holdHex[6:9],16), int(holdHex[9:12],16), int(holdHex[12:15],16), int(holdHex[15:18],16))) return ARDsignal #Generate .csv file from lists def MakeCsv(WAVsignal1,WAVsignal2,WAVframerate,ARDsignal,CSVfilename): '''MakeCsv() saves the lists from RecWav() in a .csv format that is compatible with the Dyno Calculator.xls spreadsheet.''' d = open(CSVfilename,'w') c = csv.writer(d, quoting=csv.QUOTE_ALL, lineterminator='\n') #, lineterminator='\r\n') c.writerow(["Start Date:",datetime.date.today().strftime("%m/%d/%Y")]) c.writerow(["Start Time:",datetime.datetime.now().strftime("%H:%M:%S")]) c.writerow(["Sample Interval:",str((1.0/WAVframerate)*1000000)+" us"]) c.writerow([]) c.writerow([]) c.writerow([]) c.writerow(["Dyno Sample","","","Motor Sample","","","","","","","","Time","Sense1","Sense2","Sense3","Sense4","Sense5","Sense6"]) if len(WAVsignal1) > len(WAVsignal2): samNum = len(WAVsignal1) else: samNum = len(WAVsignal2) if len(ARDsignal) > samNum: samNum = len(ARDsignal) for i in range(samNum): try: hold1 = WAVsignal1[i] except: hold1 = "" try: hold2 = WAVsignal2[i] except: hold2 = "" try: hold3 = ARDsignal[i] except: hold3 = ("","","","","","","") c.writerow([hold1,"","",hold2,"","","","","","","", hold3[0],hold3[1],hold3[2],hold3[3], hold3[4],hold3[5],hold3[6]]) d.close #test pyAudio def testPA(PA,iD): #find highest sample rate try: PA.is_format_supported(input_format=pyaudio.paInt16, input_channels=2,rate=22050,input_device=iD) WAVframerate = 22050 except: return 0 try: PA.is_format_supported(input_format=pyaudio.paInt16, input_channels=2,rate=44100,input_device=iD) WAVframerate = 44100 except: pass try: PA.is_format_supported(input_format=pyaudio.paInt16, input_channels=2,rate=48000,input_device=iD) WAVframerate = 48000 except: pass try: PA.is_format_supported(input_format=pyaudio.paInt16, input_channels=2,rate=96000,input_device=iD) WAVframerate = 96000 except: pass try: PA.is_format_supported(input_format=pyaudio.paInt16, input_channels=2,rate=192000,input_device=iD) WAVframerate = 192000 except: pass return WAVframerate #main def cmdDyno(CSVfilename="",howLong=10,runNum=1): '''Record dyno run and save to CSV file''' #connect to serial port try: ser = serial.Serial(2,timeout=2) boArd = True except: try: ser = serial.Serial(3,timeout=2) board = True except: boArd = False #connect to sound card PA = pyaudio.PyAudio() inputDevice = 1 WAVframerate = testPA(PA,inputDevice) if WAVframerate == 0: print "Could not read sound card" quit() #get/correct file name if CSVfilename == "" and runNum == 1: CSVfilename = raw_input("Enter 'Save-As' file name >>> ") if len(CSVfilename) > 4: if CSVfilename[-4] == ".": CSVfilename = CSVfilename[:-4] if CSVfilename == "": HOLDfilename = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S") + ".csv" elif runNum == 1: HOLDfilename = CSVfilename + ".csv" else: HOLDfilename = CSVfilename + str(runNum) + ".csv" #count down q = raw_input("Press Enter key to start:") print "5" time.sleep(1) print "4" time.sleep(1) print "3" time.sleep(1) print "2" time.sleep(1) print "1" time.sleep(1) print "GO!" pool = ThreadPool(processes=2) #start recording on both devices SC = pool.apply_async(RecWav,(PA,inputDevice,WAVframerate,howLong)) if boArd: AR = pool.apply_async(RecSens,(ser,howLong)) #read both devices when finished recording raw = SC.get() if boArd: sens = AR.get() else: sens = [] #close devices if boArd: ser.close() #generate report if raw == (): print "Could not read sound card" else: MakeCsv(raw[0],raw[1],raw[2],sens,HOLDfilename) q = raw_input("Do you want to run again? (y/n)>>>") if q[0].upper() == "Y": cmdDyno(CSVfilename,howLong,runNum+1) if __name__ == "__main__": import sys if len(sys.argv) == 1: cmdDyno() elif len(sys.argv) == 2: cmdDyno(str(sys.argv[1])) else: cmdDyno(str(sys.argv[1]),str(sys.argv[2]))