#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Digital signal processing functions; pyo table, file, & sample conversions
"""
import os
import sys
import time
import numpy as np
from scipy.signal import butter, lfilter
try:
import pyo64 as pyo
except Exception:
import pyo
class PyoFormatException(Exception):
pass
# --- time-related helper functions --------------------------
# Ensure we have a high-resolution clock; code from PsychoPy (Sol Simpson)
if sys.platform == 'win32':
from ctypes import byref, c_int64, windll
_fcounter = c_int64()
_qpfreq = c_int64()
windll.Kernel32.QueryPerformanceFrequency(byref(_qpfreq))
_qpfreq = float(_qpfreq.value)
_winQPC = windll.Kernel32.QueryPerformanceCounter
def get_time():
"""High-precision replacement for time.time() on Windows.
"""
_winQPC(byref(_fcounter))
return _fcounter.value / _qpfreq
else:
import timeit
get_time = timeit.default_timer
MIN_SLEEP = 0.001 # used in sleep() function
def sleep(sec=0):
"""Use time.sleep with a minimum duration sleep threshold.
"""
time.sleep(max(MIN_SLEEP, sec))
# --- digital signal processing helper functions --------------------------
_butter_cache = {}
def _butter(order, band, rate=44100):
"""Cache-ing version of scipy.signal's butter().
Allows faster band-pass filtering during real-time processing.
"""
global _butter_cache
_h = hash((order, band, rate))
if not _h in _butter_cache:
low, high = band
nyqfreq = float(rate) / 2
lowf = low / nyqfreq
highf = high / nyqfreq
_butter_cache[_h] = butter(order, (lowf, highf), btype='band')
return _butter_cache[_h]
def bandpass_pre_cache(lows=(80, 100, 120),
highs=(1200, 3000, 8000),
bands=((2000, 8000),), # content-filtered speech
rate=44100):
"""Call _butter now to cache some useful (b, a) values.
"""
for low in lows:
for high in highs:
_butter(6, (low, high), rate=rate)
for band in bands:
_butter(6, band, rate=rate)
[docs]def bandpass(data, low=80, high=1200, rate=44100, order=6):
"""Return bandpass filtered `data`.
"""
b, a = _butter(order, (low, high), rate)
return lfilter(b, a, data)
[docs]def rms(data):
"""Basic audio-power measure: root-mean-square of data.
Identical to `std` when the mean is zero; faster to compute just rms.
"""
if data.dtype == np.int16:
md2 = data.astype(float) ** 2 # int16 wrap around --> negative
else:
md2 = data ** 2
return np.sqrt(np.mean(md2))
[docs]def std(data):
"""Like rms, but also subtracts the mean (= slower).
"""
return np.std(data)
[docs]def smooth(data, win=16, tile=True):
"""Running smoothed average, via convolution over `win` window-size.
`tile` with the mean at start and end by default; otherwise replace with 0.
"""
weights = np.ones(win) / win
data_c = np.convolve(data, weights)[win - 1:-(win - 1)]
if tile:
pre = np.tile(data_c[0], win // 2)
post = np.tile(data_c[-1], win // 2)
else:
pre = post = np.zeros(win // 2)
data_pre_c = np.concatenate((pre, data_c))
data_pre_c_post = np.concatenate((data_pre_c, post))
return data_pre_c_post[:len(data)]
[docs]def zero_crossings(data):
"""Return a vector of length n-1 of zero-crossings within vector `data`.
1 if the adjacent values switched sign, or
0 if they stayed the same sign.
"""
zx = np.zeros(len(data))
zx[np.where(data[:-1] * data[1:] < 0)] = 1
return zx
[docs]def tone(freq=440, sec=2, rate=44100, vol=.99):
"""Return a np.array suitable for use as a tone (pure sine wave).
"""
samples = sec * rate
time_steps = np.arange(0., 1., 1. / samples)
scaling = 2 * np.pi * freq * sec
return np.sin(time_steps * scaling) * vol
[docs]def apodize(data, ms=5, rate=44100):
"""Apply a Hanning window (5ms) to reduce a sound's 'click' onset / offset.
"""
hw_size = int(min(rate // (1000 / ms), len(data) // 15))
hanning_window = np.hanning(2 * hw_size + 1)
data[:hw_size] *= hanning_window[:hw_size]
data[-hw_size:] *= hanning_window[-hw_size:]
return data
# --- pyo helper functions ------------------------------------------------
# format codes for _get_pyo_codes():
pyo_formats = {'wav': 0, 'aif': 1, 'aiff': 1, 'au': 2, 'raw': 3,
'sd2': 4, 'flac': 5, 'caf': 6, 'ogg': 7}
pyo_dtype = {'int16': 0, 'int24': 1, 'int32': 2, 'float32': 3,
'float64': 4, 'U-Law': 5, 'A-Law': 6}
def _get_pyo_codes(fmt='', dtype='int16', file_out=''):
"""Convert file and data formats to int codes, e.g., wav int16 -> (0, 0).
"""
if not fmt:
dot_ext = os.path.splitext(file_out)[1]
fmt = dot_ext.lower().strip('.')
if fmt in pyo_formats:
file_fmt = pyo_formats[fmt]
else:
msg = 'format `{0}` not supported'.format(file_out)
raise PyoFormatException(msg)
if fmt in ['sd2', 'flac']:
ok_dfmt = {'int16': 0, 'int24': 1}
else:
ok_dfmt = pyo_dtype
if dtype in ok_dfmt:
data_fmt = pyo_dtype[dtype]
else:
msg = 'data format `{0}` not supported for `{1}`'.format(
dtype, file_out)
raise PyoFormatException(msg)
return file_fmt, data_fmt
[docs]def samples_from_table(table, start=0, stop=-1, rate=44100):
"""Return samples as a np.array read from a pyo table.
A (start, stop) selection in seconds may require a non-default rate.
"""
samples = np.array(table.getTable())
if (start, stop) != (0, -1):
if stop > start:
samples = samples[start * rate:stop * rate]
elif start:
samples = samples[start * rate:]
return samples
[docs]def table_from_samples(samples, start=0, stop=-1, rate=44100):
"""Return a pyo DataTable constructed from samples.
A (start, stop) selection in seconds may require a non-default rate.
"""
if type(samples) == np.ndarray:
samples = samples.tolist()
if type(samples) != list:
raise TypeError('samples should be a list or np.array')
if (start, stop) != (0, -1):
if stop > start:
samples = samples[start * rate:stop * rate]
elif start:
samples = samples[start * rate:]
table = pyo.DataTable(size=len(samples), init=samples)
return table
[docs]def table_from_file(file_in, start=0, stop=-1):
"""Read data from files, any pyo format, returns (rate, pyo SndTable)
"""
table = pyo.SndTable()
try:
table.setSound(file_in, start=start, stop=stop)
except TypeError:
msg = 'bad file `{0}`, or format not supported'.format(file_in)
raise PyoFormatException(msg)
rate = pyo.sndinfo(file_in)[2]
return rate, table
[docs]def samples_from_file(file_in, start=0, stop=-1):
"""Read data from files, returns tuple (rate, np.array(.float64))
"""
if not os.path.isfile(file_in):
raise IOError('no such file `{0}`'.format(file_in))
rate, table = table_from_file(file_in, start=start, stop=stop)
return rate, np.array(table.getTable())
[docs]def samples_to_file(samples, rate, file_out, fmt='', dtype='int16'):
"""Write data to file, using requested format or infer from file .ext.
Only integer `rate` values are supported.
See http://ajaxsoundstudio.com/pyodoc/api/functions/sndfile.html
"""
file_fmt, data_fmt = _get_pyo_codes(fmt, dtype, file_out)
if type(samples) == np.ndarray:
samples = samples.tolist()
if type(samples) != list:
raise TypeError('samples should be a list or np.array')
try:
pyo.savefile(samples, path=file_out, sr=int(rate), channels=1,
fileformat=file_fmt, sampletype=data_fmt)
except Exception:
msg = 'could not save `{0}`; permissions or other issue?'
raise IOError(msg.format(file_out))
[docs]def table_to_file(table, file_out, fmt='', dtype='int16'):
"""Write data to file, using requested format or infer from file .ext.
"""
file_fmt, data_fmt = _get_pyo_codes(fmt, dtype, file_out)
try:
pyo.savefileFromTable(table=table, path=file_out,
fileformat=file_fmt, sampletype=data_fmt)
except Exception:
msg = 'could not save `{0}`; permissions or other issue?'
raise IOError(msg.format(file_out))