ThinkDSP/thinkdsp.py
2020-02-16 10:59:22 +00:00

1831 lines
44 KiB
Python

"""This file contains code used in "Think DSP",
by Allen B. Downey, available from greenteapress.com
Copyright 2013 Allen B. Downey
License: GNU GPLv3 http://www.gnu.org/licenses/gpl.html
"""
from __future__ import print_function, division
import array
import copy
import math
import numpy as np
import random
import scipy
import scipy.stats
import scipy.fftpack
import struct
import subprocess
import thinkplot
import warnings
from fractions import gcd
from wave import open as open_wave
import matplotlib.pyplot as pyplot
try:
from IPython.display import Audio
except:
warnings.warn(
"Can't import Audio from IPython.display; " "Wave.make_audio() will not work."
)
PI2 = math.pi * 2
def random_seed(x):
"""Initialize the random and np.random generators.
x: int seed
"""
random.seed(x)
np.random.seed(x)
class UnimplementedMethodException(Exception):
"""Exception if someone calls a method that should be overridden."""
class WavFileWriter:
"""Writes wav files."""
def __init__(self, filename="sound.wav", framerate=11025):
"""Opens the file and sets parameters.
filename: string
framerate: samples per second
"""
self.filename = filename
self.framerate = framerate
self.nchannels = 1
self.sampwidth = 2
self.bits = self.sampwidth * 8
self.bound = 2 ** (self.bits - 1) - 1
self.fmt = "h"
self.dtype = np.int16
self.fp = open_wave(self.filename, "w")
self.fp.setnchannels(self.nchannels)
self.fp.setsampwidth(self.sampwidth)
self.fp.setframerate(self.framerate)
def write(self, wave):
"""Writes a wave.
wave: Wave
"""
zs = wave.quantize(self.bound, self.dtype)
self.fp.writeframes(zs.tostring())
def close(self, duration=0):
"""Closes the file.
duration: how many seconds of silence to append
"""
if duration:
self.write(rest(duration))
self.fp.close()
def read_wave(filename="sound.wav"):
"""Reads a wave file.
filename: string
returns: Wave
"""
fp = open_wave(filename, "r")
nchannels = fp.getnchannels()
nframes = fp.getnframes()
sampwidth = fp.getsampwidth()
framerate = fp.getframerate()
z_str = fp.readframes(nframes)
fp.close()
dtype_map = {1: np.int8, 2: np.int16, 3: "special", 4: np.int32}
if sampwidth not in dtype_map:
raise ValueError("sampwidth %d unknown" % sampwidth)
if sampwidth == 3:
xs = np.fromstring(z_str, dtype=np.int8).astype(np.int32)
ys = (xs[2::3] * 256 + xs[1::3]) * 256 + xs[0::3]
else:
ys = np.fromstring(z_str, dtype=dtype_map[sampwidth])
# if it's in stereo, just pull out the first channel
if nchannels == 2:
ys = ys[::2]
# ts = np.arange(len(ys)) / framerate
wave = Wave(ys, framerate=framerate)
wave.normalize()
return wave
def play_wave(filename="sound.wav", player="aplay"):
"""Plays a wave file.
filename: string
player: string name of executable that plays wav files
"""
cmd = "%s %s" % (player, filename)
popen = subprocess.Popen(cmd, shell=True)
popen.communicate()
def find_index(x, xs):
"""Find the index corresponding to a given value in an array."""
n = len(xs)
start = xs[0]
end = xs[-1]
i = round((n - 1) * (x - start) / (end - start))
return int(i)
class _SpectrumParent:
"""Contains code common to Spectrum and DCT.
"""
def __init__(self, hs, fs, framerate, full=False):
"""Initializes a spectrum.
hs: array of amplitudes (real or complex)
fs: array of frequencies
framerate: frames per second
full: boolean to indicate full or real FFT
"""
self.hs = np.asanyarray(hs)
self.fs = np.asanyarray(fs)
self.framerate = framerate
self.full = full
@property
def max_freq(self):
"""Returns the Nyquist frequency for this spectrum."""
return self.framerate / 2
@property
def amps(self):
"""Returns a sequence of amplitudes (read-only property)."""
return np.absolute(self.hs)
@property
def power(self):
"""Returns a sequence of powers (read-only property)."""
return self.amps ** 2
def copy(self):
"""Makes a copy.
Returns: new Spectrum
"""
return copy.deepcopy(self)
def max_diff(self, other):
"""Computes the maximum absolute difference between spectra.
other: Spectrum
returns: float
"""
assert self.framerate == other.framerate
assert len(self) == len(other)
hs = self.hs - other.hs
return np.max(np.abs(hs))
def ratio(self, denom, thresh=1, val=0):
"""The ratio of two spectrums.
denom: Spectrum
thresh: values smaller than this are replaced
val: with this value
returns: new Wave
"""
ratio_spectrum = self.copy()
ratio_spectrum.hs /= denom.hs
ratio_spectrum.hs[denom.amps < thresh] = val
return ratio_spectrum
def invert(self):
"""Inverts this spectrum/filter.
returns: new Wave
"""
inverse = self.copy()
inverse.hs = 1 / inverse.hs
return inverse
@property
def freq_res(self):
return self.framerate / 2 / (len(self.fs) - 1)
def render_full(self, high=None):
"""Extracts amps and fs from a full spectrum.
high: cutoff frequency
returns: fs, amps
"""
hs = np.fft.fftshift(self.hs)
amps = np.abs(hs)
fs = np.fft.fftshift(self.fs)
i = 0 if high is None else find_index(-high, fs)
j = None if high is None else find_index(high, fs) + 1
return fs[i:j], amps[i:j]
def plot(self, high=None, **options):
"""Plots amplitude vs frequency.
Note: if this is a full spectrum, it ignores low and high
high: frequency to cut off at
"""
if self.full:
fs, amps = self.render_full(high)
thinkplot.plot(fs, amps, **options)
else:
i = None if high is None else find_index(high, self.fs)
thinkplot.plot(self.fs[:i], self.amps[:i], **options)
def plot_power(self, high=None, **options):
"""Plots power vs frequency.
high: frequency to cut off at
"""
if self.full:
fs, amps = self.render_full(high)
thinkplot.plot(fs, amps ** 2, **options)
else:
i = None if high is None else find_index(high, self.fs)
thinkplot.plot(self.fs[:i], self.power[:i], **options)
def estimate_slope(self):
"""Runs linear regression on log power vs log frequency.
returns: slope, inter, r2, p, stderr
"""
x = np.log(self.fs[1:])
y = np.log(self.power[1:])
t = scipy.stats.linregress(x, y)
return t
def peaks(self):
"""Finds the highest peaks and their frequencies.
returns: sorted list of (amplitude, frequency) pairs
"""
t = list(zip(self.amps, self.fs))
t.sort(reverse=True)
return t
class Spectrum(_SpectrumParent):
"""Represents the spectrum of a signal."""
def __len__(self):
"""Length of the spectrum."""
return len(self.hs)
def __add__(self, other):
"""Adds two spectrums elementwise.
other: Spectrum
returns: new Spectrum
"""
if other == 0:
return self.copy()
assert all(self.fs == other.fs)
hs = self.hs + other.hs
return Spectrum(hs, self.fs, self.framerate, self.full)
__radd__ = __add__
def __mul__(self, other):
"""Multiplies two spectrums elementwise.
other: Spectrum
returns: new Spectrum
"""
assert all(self.fs == other.fs)
hs = self.hs * other.hs
return Spectrum(hs, self.fs, self.framerate, self.full)
def convolve(self, other):
"""Convolves two Spectrums.
other: Spectrum
returns: Spectrum
"""
assert all(self.fs == other.fs)
if self.full:
hs1 = np.fft.fftshift(self.hs)
hs2 = np.fft.fftshift(other.hs)
hs = np.convolve(hs1, hs2, mode="same")
hs = np.fft.ifftshift(hs)
else:
# not sure this branch would mean very much
hs = np.convolve(self.hs, other.hs, mode="same")
return Spectrum(hs, self.fs, self.framerate, self.full)
@property
def real(self):
"""Returns the real part of the hs (read-only property)."""
return np.real(self.hs)
@property
def imag(self):
"""Returns the imaginary part of the hs (read-only property)."""
return np.imag(self.hs)
@property
def angles(self):
"""Returns a sequence of angles (read-only property)."""
return np.angle(self.hs)
def scale(self, factor):
"""Multiplies all elements by the given factor.
factor: what to multiply the magnitude by (could be complex)
"""
self.hs *= factor
def low_pass(self, cutoff, factor=0):
"""Attenuate frequencies above the cutoff.
cutoff: frequency in Hz
factor: what to multiply the magnitude by
"""
self.hs[abs(self.fs) > cutoff] *= factor
def high_pass(self, cutoff, factor=0):
"""Attenuate frequencies below the cutoff.
cutoff: frequency in Hz
factor: what to multiply the magnitude by
"""
self.hs[abs(self.fs) < cutoff] *= factor
def band_stop(self, low_cutoff, high_cutoff, factor=0):
"""Attenuate frequencies between the cutoffs.
low_cutoff: frequency in Hz
high_cutoff: frequency in Hz
factor: what to multiply the magnitude by
"""
# TODO: test this function
fs = abs(self.fs)
indices = (low_cutoff < fs) & (fs < high_cutoff)
self.hs[indices] *= factor
def pink_filter(self, beta=1):
"""Apply a filter that would make white noise pink.
beta: exponent of the pink noise
"""
denom = self.fs ** (beta / 2.0)
denom[0] = 1
self.hs /= denom
def differentiate(self):
"""Apply the differentiation filter.
returns: new Spectrum
"""
new = self.copy()
new.hs *= PI2 * 1j * new.fs
return new
def integrate(self):
"""Apply the integration filter.
returns: new Spectrum
"""
new = self.copy()
new.hs /= PI2 * 1j * new.fs
return new
def make_integrated_spectrum(self):
"""Makes an integrated spectrum.
"""
cs = np.cumsum(self.power)
cs /= cs[-1]
return IntegratedSpectrum(cs, self.fs)
def make_wave(self):
"""Transforms to the time domain.
returns: Wave
"""
if self.full:
ys = np.fft.ifft(self.hs)
else:
ys = np.fft.irfft(self.hs)
# NOTE: whatever the start time was, we lose it when
# we transform back; we could fix that by saving start
# time in the Spectrum
# ts = self.start + np.arange(len(ys)) / self.framerate
return Wave(ys, framerate=self.framerate)
class IntegratedSpectrum:
"""Represents the integral of a spectrum."""
def __init__(self, cs, fs):
"""Initializes an integrated spectrum:
cs: sequence of cumulative amplitudes
fs: sequence of frequencies
"""
self.cs = np.asanyarray(cs)
self.fs = np.asanyarray(fs)
def plot_power(self, low=0, high=None, expo=False, **options):
"""Plots the integrated spectrum.
low: int index to start at
high: int index to end at
"""
cs = self.cs[low:high]
fs = self.fs[low:high]
if expo:
cs = np.exp(cs)
thinkplot.plot(fs, cs, **options)
def estimate_slope(self, low=1, high=-12000):
"""Runs linear regression on log cumulative power vs log frequency.
returns: slope, inter, r2, p, stderr
"""
# print self.fs[low:high]
# print self.cs[low:high]
x = np.log(self.fs[low:high])
y = np.log(self.cs[low:high])
t = scipy.stats.linregress(x, y)
return t
class Dct(_SpectrumParent):
"""Represents the spectrum of a signal using discrete cosine transform."""
@property
def amps(self):
"""Returns a sequence of amplitudes (read-only property).
Note: for DCTs, amps are positive or negative real.
"""
return self.hs
def __add__(self, other):
"""Adds two DCTs elementwise.
other: DCT
returns: new DCT
"""
if other == 0:
return self
assert self.framerate == other.framerate
hs = self.hs + other.hs
return Dct(hs, self.fs, self.framerate)
__radd__ = __add__
def make_wave(self):
"""Transforms to the time domain.
returns: Wave
"""
N = len(self.hs)
ys = scipy.fftpack.idct(self.hs, type=2) / 2 / N
# NOTE: whatever the start time was, we lose it when
# we transform back
# ts = self.start + np.arange(len(ys)) / self.framerate
return Wave(ys, framerate=self.framerate)
class Spectrogram:
"""Represents the spectrum of a signal."""
def __init__(self, spec_map, seg_length):
"""Initialize the spectrogram.
spec_map: map from float time to Spectrum
seg_length: number of samples in each segment
"""
self.spec_map = spec_map
self.seg_length = seg_length
def any_spectrum(self):
"""Returns an arbitrary spectrum from the spectrogram."""
index = next(iter(self.spec_map))
return self.spec_map[index]
@property
def time_res(self):
"""Time resolution in seconds."""
spectrum = self.any_spectrum()
return float(self.seg_length) / spectrum.framerate
@property
def freq_res(self):
"""Frequency resolution in Hz."""
return self.any_spectrum().freq_res
def times(self):
"""Sorted sequence of times.
returns: sequence of float times in seconds
"""
ts = sorted(iter(self.spec_map))
return ts
def frequencies(self):
"""Sequence of frequencies.
returns: sequence of float freqencies in Hz.
"""
fs = self.any_spectrum().fs
return fs
def plot(self, high=None, **options):
"""Make a pseudocolor plot.
high: highest frequency component to plot
"""
fs = self.frequencies()
i = None if high is None else find_index(high, fs)
fs = fs[:i]
ts = self.times()
# make the array
size = len(fs), len(ts)
array = np.zeros(size, dtype=np.float)
# copy amplitude from each spectrum into a column of the array
for j, t in enumerate(ts):
spectrum = self.spec_map[t]
array[:, j] = spectrum.amps[:i]
thinkplot.pcolor(ts, fs, array, **options)
def make_wave(self):
"""Inverts the spectrogram and returns a Wave.
returns: Wave
"""
res = []
for t, spectrum in sorted(self.spec_map.items()):
wave = spectrum.make_wave()
n = len(wave)
window = 1 / np.hamming(n)
wave.window(window)
i = wave.find_index(t)
start = i - n // 2
end = start + n
res.append((start, end, wave))
starts, ends, waves = zip(*res)
low = min(starts)
high = max(ends)
ys = np.zeros(high - low, np.float)
for start, end, wave in res:
ys[start:end] = wave.ys
# ts = np.arange(len(ys)) / self.framerate
return Wave(ys, framerate=wave.framerate)
class Wave:
"""Represents a discrete-time waveform.
"""
def __init__(self, ys, ts=None, framerate=None):
"""Initializes the wave.
ys: wave array
ts: array of times
framerate: samples per second
"""
self.ys = np.asanyarray(ys)
self.framerate = framerate if framerate is not None else 11025
if ts is None:
self.ts = np.arange(len(ys)) / self.framerate
else:
self.ts = np.asanyarray(ts)
def copy(self):
"""Makes a copy.
Returns: new Wave
"""
return copy.deepcopy(self)
def __len__(self):
return len(self.ys)
@property
def start(self):
return self.ts[0]
@property
def end(self):
return self.ts[-1]
@property
def duration(self):
"""Duration (property).
returns: float duration in seconds
"""
return len(self.ys) / self.framerate
def __add__(self, other):
"""Adds two waves elementwise.
other: Wave
returns: new Wave
"""
if other == 0:
return self
assert self.framerate == other.framerate
# make an array of times that covers both waves
start = min(self.start, other.start)
end = max(self.end, other.end)
n = int(round((end - start) * self.framerate)) + 1
ys = np.zeros(n)
ts = start + np.arange(n) / self.framerate
def add_ys(wave):
i = find_index(wave.start, ts)
# make sure the arrays line up reasonably well
diff = ts[i] - wave.start
dt = 1 / wave.framerate
if (diff / dt) > 0.1:
warnings.warn(
"Can't add these waveforms; their " "time arrays don't line up."
)
j = i + len(wave)
ys[i:j] += wave.ys
add_ys(self)
add_ys(other)
return Wave(ys, ts, self.framerate)
__radd__ = __add__
def __or__(self, other):
"""Concatenates two waves.
other: Wave
returns: new Wave
"""
if self.framerate != other.framerate:
raise ValueError("Wave.__or__: framerates do not agree")
ys = np.concatenate((self.ys, other.ys))
# ts = np.arange(len(ys)) / self.framerate
return Wave(ys, framerate=self.framerate)
def __mul__(self, other):
"""Multiplies two waves elementwise.
Note: this operation ignores the timestamps; the result
has the timestamps of self.
other: Wave
returns: new Wave
"""
# the spectrums have to have the same framerate and duration
assert self.framerate == other.framerate
assert len(self) == len(other)
ys = self.ys * other.ys
return Wave(ys, self.ts, self.framerate)
def max_diff(self, other):
"""Computes the maximum absolute difference between waves.
other: Wave
returns: float
"""
assert self.framerate == other.framerate
assert len(self) == len(other)
ys = self.ys - other.ys
return np.max(np.abs(ys))
def convolve(self, other):
"""Convolves two waves.
Note: this operation ignores the timestamps; the result
has the timestamps of self.
other: Wave or NumPy array
returns: Wave
"""
if isinstance(other, Wave):
assert self.framerate == other.framerate
window = other.ys
else:
window = other
ys = np.convolve(self.ys, window, mode="full")
# ts = np.arange(len(ys)) / self.framerate
return Wave(ys, framerate=self.framerate)
def diff(self):
"""Computes the difference between successive elements.
returns: new Wave
"""
ys = np.diff(self.ys)
ts = self.ts[1:].copy()
return Wave(ys, ts, self.framerate)
def cumsum(self):
"""Computes the cumulative sum of the elements.
returns: new Wave
"""
ys = np.cumsum(self.ys)
ts = self.ts.copy()
return Wave(ys, ts, self.framerate)
def quantize(self, bound, dtype):
"""Maps the waveform to quanta.
bound: maximum amplitude
dtype: numpy data type or string
returns: quantized signal
"""
return quantize(self.ys, bound, dtype)
def apodize(self, denom=20, duration=0.1):
"""Tapers the amplitude at the beginning and end of the signal.
Tapers either the given duration of time or the given
fraction of the total duration, whichever is less.
denom: float fraction of the segment to taper
duration: float duration of the taper in seconds
"""
self.ys = apodize(self.ys, self.framerate, denom, duration)
def hamming(self):
"""Apply a Hamming window to the wave.
"""
self.ys *= np.hamming(len(self.ys))
def window(self, window):
"""Apply a window to the wave.
window: sequence of multipliers, same length as self.ys
"""
self.ys *= window
def scale(self, factor):
"""Multplies the wave by a factor.
factor: scale factor
"""
self.ys *= factor
def shift(self, shift):
"""Shifts the wave left or right in time.
shift: float time shift
"""
# TODO: track down other uses of this function and check them
self.ts += shift
def roll(self, roll):
"""Rolls this wave by the given number of locations.
"""
self.ys = np.roll(self.ys, roll)
def truncate(self, n):
"""Trims this wave to the given length.
n: integer index
"""
self.ys = truncate(self.ys, n)
self.ts = truncate(self.ts, n)
def zero_pad(self, n):
"""Trims this wave to the given length.
n: integer index
"""
self.ys = zero_pad(self.ys, n)
self.ts = self.start + np.arange(n) / self.framerate
def normalize(self, amp=1.0):
"""Normalizes the signal to the given amplitude.
amp: float amplitude
"""
self.ys = normalize(self.ys, amp=amp)
def unbias(self):
"""Unbiases the signal.
"""
self.ys = unbias(self.ys)
def find_index(self, t):
"""Find the index corresponding to a given time."""
n = len(self)
start = self.start
end = self.end
i = round((n - 1) * (t - start) / (end - start))
return int(i)
def segment(self, start=None, duration=None):
"""Extracts a segment.
start: float start time in seconds
duration: float duration in seconds
returns: Wave
"""
if start is None:
start = self.ts[0]
i = 0
else:
i = self.find_index(start)
j = None if duration is None else self.find_index(start + duration)
return self.slice(i, j)
def slice(self, i, j):
"""Makes a slice from a Wave.
i: first slice index
j: second slice index
"""
ys = self.ys[i:j].copy()
ts = self.ts[i:j].copy()
return Wave(ys, ts, self.framerate)
def make_spectrum(self, full=False):
"""Computes the spectrum using FFT.
returns: Spectrum
"""
n = len(self.ys)
d = 1 / self.framerate
if full:
hs = np.fft.fft(self.ys)
fs = np.fft.fftfreq(n, d)
else:
hs = np.fft.rfft(self.ys)
fs = np.fft.rfftfreq(n, d)
return Spectrum(hs, fs, self.framerate, full)
def make_dct(self):
"""Computes the DCT of this wave.
"""
N = len(self.ys)
hs = scipy.fftpack.dct(self.ys, type=2)
fs = (0.5 + np.arange(N)) / 2
return Dct(hs, fs, self.framerate)
def make_spectrogram(self, seg_length, win_flag=True):
"""Computes the spectrogram of the wave.
seg_length: number of samples in each segment
win_flag: boolean, whether to apply hamming window to each segment
returns: Spectrogram
"""
if win_flag:
window = np.hamming(seg_length)
i, j = 0, seg_length
step = int(seg_length // 2)
# map from time to Spectrum
spec_map = {}
while j < len(self.ys):
segment = self.slice(i, j)
if win_flag:
segment.window(window)
# the nominal time for this segment is the midpoint
t = (segment.start + segment.end) / 2
spec_map[t] = segment.make_spectrum()
i += step
j += step
return Spectrogram(spec_map, seg_length)
def get_xfactor(self, options):
try:
xfactor = options["xfactor"]
options.pop("xfactor")
except KeyError:
xfactor = 1
return xfactor
def plot(self, **options):
"""Plots the wave.
"""
xfactor = self.get_xfactor(options)
thinkplot.plot(self.ts * xfactor, self.ys, **options)
def plot_vlines(self, **options):
"""Plots the wave with vertical lines for samples.
"""
xfactor = self.get_xfactor(options)
thinkplot.vlines(self.ts * xfactor, 0, self.ys, **options)
def corr(self, other):
"""Correlation coefficient two waves.
other: Wave
returns: float coefficient of correlation
"""
corr = np.corrcoef(self.ys, other.ys)[0, 1]
return corr
def cov_mat(self, other):
"""Covariance matrix of two waves.
other: Wave
returns: 2x2 covariance matrix
"""
return np.cov(self.ys, other.ys)
def cov(self, other):
"""Covariance of two unbiased waves.
other: Wave
returns: float
"""
total = sum(self.ys * other.ys) / len(self.ys)
return total
def cos_cov(self, k):
"""Covariance with a cosine signal.
freq: freq of the cosine signal in Hz
returns: float covariance
"""
n = len(self.ys)
factor = math.pi * k / n
ys = [math.cos(factor * (i + 0.5)) for i in range(n)]
total = 2 * sum(self.ys * ys)
return total
def cos_transform(self):
"""Discrete cosine transform.
returns: list of frequency, cov pairs
"""
n = len(self.ys)
res = []
for k in range(n):
cov = self.cos_cov(k)
res.append((k, cov))
return res
def write(self, filename="sound.wav"):
"""Write a wave file.
filename: string
"""
print("Writing", filename)
wfile = WavFileWriter(filename, self.framerate)
wfile.write(self)
wfile.close()
def play(self, filename="sound.wav"):
"""Plays a wave file.
filename: string
"""
self.write(filename)
play_wave(filename)
def make_audio(self):
"""Makes an IPython Audio object.
"""
audio = Audio(data=self.ys.real, rate=self.framerate)
return audio
def unbias(ys):
"""Shifts a wave array so it has mean 0.
ys: wave array
returns: wave array
"""
return ys - ys.mean()
def normalize(ys, amp=1.0):
"""Normalizes a wave array so the maximum amplitude is +amp or -amp.
ys: wave array
amp: max amplitude (pos or neg) in result
returns: wave array
"""
high, low = abs(max(ys)), abs(min(ys))
return amp * ys / max(high, low)
def shift_right(ys, shift):
"""Shifts a wave array to the right and zero pads.
ys: wave array
shift: integer shift
returns: wave array
"""
res = np.zeros(len(ys) + shift)
res[shift:] = ys
return res
def shift_left(ys, shift):
"""Shifts a wave array to the left.
ys: wave array
shift: integer shift
returns: wave array
"""
return ys[shift:]
def truncate(ys, n):
"""Trims a wave array to the given length.
ys: wave array
n: integer length
returns: wave array
"""
return ys[:n]
def quantize(ys, bound, dtype):
"""Maps the waveform to quanta.
ys: wave array
bound: maximum amplitude
dtype: numpy data type of the result
returns: quantized signal
"""
if max(ys) > 1 or min(ys) < -1:
warnings.warn("Warning: normalizing before quantizing.")
ys = normalize(ys)
zs = (ys * bound).astype(dtype)
return zs
def apodize(ys, framerate, denom=20, duration=0.1):
"""Tapers the amplitude at the beginning and end of the signal.
Tapers either the given duration of time or the given
fraction of the total duration, whichever is less.
ys: wave array
framerate: int frames per second
denom: float fraction of the segment to taper
duration: float duration of the taper in seconds
returns: wave array
"""
# a fixed fraction of the segment
n = len(ys)
k1 = n // denom
# a fixed duration of time
k2 = int(duration * framerate)
k = min(k1, k2)
w1 = np.linspace(0, 1, k)
w2 = np.ones(n - 2 * k)
w3 = np.linspace(1, 0, k)
window = np.concatenate((w1, w2, w3))
return ys * window
class Signal:
"""Represents a time-varying signal."""
def __add__(self, other):
"""Adds two signals.
other: Signal
returns: Signal
"""
if other == 0:
return self
return SumSignal(self, other)
__radd__ = __add__
@property
def period(self):
"""Period of the signal in seconds (property).
Since this is used primarily for purposes of plotting,
the default behavior is to return a value, 0.1 seconds,
that is reasonable for many signals.
returns: float seconds
"""
return 0.1
def plot(self, framerate=11025):
"""Plots the signal.
The default behavior is to plot three periods.
framerate: samples per second
"""
duration = self.period * 3
wave = self.make_wave(duration, start=0, framerate=framerate)
wave.plot()
def make_wave(self, duration=1, start=0, framerate=11025):
"""Makes a Wave object.
duration: float seconds
start: float seconds
framerate: int frames per second
returns: Wave
"""
n = round(duration * framerate)
ts = start + np.arange(n) / framerate
ys = self.evaluate(ts)
return Wave(ys, ts, framerate=framerate)
def infer_framerate(ts):
"""Given ts, find the framerate.
Assumes that the ts are equally spaced.
ts: sequence of times in seconds
returns: frames per second
"""
# TODO: confirm that this is never used and remove it
dt = ts[1] - ts[0]
framerate = 1.0 / dt
return framerate
class SumSignal(Signal):
"""Represents the sum of signals."""
def __init__(self, *args):
"""Initializes the sum.
args: tuple of signals
"""
self.signals = args
@property
def period(self):
"""Period of the signal in seconds.
Note: this is not correct; it's mostly a placekeeper.
But it is correct for a harmonic sequence where all
component frequencies are multiples of the fundamental.
returns: float seconds
"""
return max(sig.period for sig in self.signals)
def evaluate(self, ts):
"""Evaluates the signal at the given times.
ts: float array of times
returns: float wave array
"""
ts = np.asarray(ts)
return sum(sig.evaluate(ts) for sig in self.signals)
class Sinusoid(Signal):
"""Represents a sinusoidal signal."""
def __init__(self, freq=440, amp=1.0, offset=0, func=np.sin):
"""Initializes a sinusoidal signal.
freq: float frequency in Hz
amp: float amplitude, 1.0 is nominal max
offset: float phase offset in radians
func: function that maps phase to amplitude
"""
self.freq = freq
self.amp = amp
self.offset = offset
self.func = func
@property
def period(self):
"""Period of the signal in seconds.
returns: float seconds
"""
return 1.0 / self.freq
def evaluate(self, ts):
"""Evaluates the signal at the given times.
ts: float array of times
returns: float wave array
"""
ts = np.asarray(ts)
phases = PI2 * self.freq * ts + self.offset
ys = self.amp * self.func(phases)
return ys
def CosSignal(freq=440, amp=1.0, offset=0):
"""Makes a cosine Sinusoid.
freq: float frequency in Hz
amp: float amplitude, 1.0 is nominal max
offset: float phase offset in radians
returns: Sinusoid object
"""
return Sinusoid(freq, amp, offset, func=np.cos)
def SinSignal(freq=440, amp=1.0, offset=0):
"""Makes a sine Sinusoid.
freq: float frequency in Hz
amp: float amplitude, 1.0 is nominal max
offset: float phase offset in radians
returns: Sinusoid object
"""
return Sinusoid(freq, amp, offset, func=np.sin)
def Sinc(freq=440, amp=1.0, offset=0):
"""Makes a Sinc function.
freq: float frequency in Hz
amp: float amplitude, 1.0 is nominal max
offset: float phase offset in radians
returns: Sinusoid object
"""
return Sinusoid(freq, amp, offset, func=np.sinc)
class ComplexSinusoid(Sinusoid):
"""Represents a complex exponential signal."""
def evaluate(self, ts):
"""Evaluates the signal at the given times.
ts: float array of times
returns: float wave array
"""
ts = np.asarray(ts)
phases = PI2 * self.freq * ts + self.offset
ys = self.amp * np.exp(1j * phases)
return ys
class SquareSignal(Sinusoid):
"""Represents a square signal."""
def evaluate(self, ts):
"""Evaluates the signal at the given times.
ts: float array of times
returns: float wave array
"""
ts = np.asarray(ts)
cycles = self.freq * ts + self.offset / PI2
frac, _ = np.modf(cycles)
ys = self.amp * np.sign(unbias(frac))
return ys
class SawtoothSignal(Sinusoid):
"""Represents a sawtooth signal."""
def evaluate(self, ts):
"""Evaluates the signal at the given times.
ts: float array of times
returns: float wave array
"""
ts = np.asarray(ts)
cycles = self.freq * ts + self.offset / PI2
frac, _ = np.modf(cycles)
ys = normalize(unbias(frac), self.amp)
return ys
class ParabolicSignal(Sinusoid):
"""Represents a parabolic signal."""
def evaluate(self, ts):
"""Evaluates the signal at the given times.
ts: float array of times
returns: float wave array
"""
ts = np.asarray(ts)
cycles = self.freq * ts + self.offset / PI2
frac, _ = np.modf(cycles)
ys = (frac - 0.5) ** 2
ys = normalize(unbias(ys), self.amp)
return ys
class CubicSignal(ParabolicSignal):
"""Represents a cubic signal."""
def evaluate(self, ts):
"""Evaluates the signal at the given times.
ts: float array of times
returns: float wave array
"""
ys = ParabolicSignal.evaluate(self, ts)
ys = np.cumsum(ys)
ys = normalize(unbias(ys), self.amp)
return ys
class GlottalSignal(Sinusoid):
"""Represents a periodic signal that resembles a glottal signal."""
def evaluate(self, ts):
"""Evaluates the signal at the given times.
ts: float array of times
returns: float wave array
"""
ts = np.asarray(ts)
cycles = self.freq * ts + self.offset / PI2
frac, _ = np.modf(cycles)
ys = frac ** 2 * (1 - frac)
ys = normalize(unbias(ys), self.amp)
return ys
class TriangleSignal(Sinusoid):
"""Represents a triangle signal."""
def evaluate(self, ts):
"""Evaluates the signal at the given times.
ts: float array of times
returns: float wave array
"""
ts = np.asarray(ts)
cycles = self.freq * ts + self.offset / PI2
frac, _ = np.modf(cycles)
ys = np.abs(frac - 0.5)
ys = normalize(unbias(ys), self.amp)
return ys
class Chirp(Signal):
"""Represents a signal with variable frequency."""
def __init__(self, start=440, end=880, amp=1.0):
"""Initializes a linear chirp.
start: float frequency in Hz
end: float frequency in Hz
amp: float amplitude, 1.0 is nominal max
"""
self.start = start
self.end = end
self.amp = amp
@property
def period(self):
"""Period of the signal in seconds.
returns: float seconds
"""
return ValueError("Non-periodic signal.")
def evaluate(self, ts):
"""Evaluates the signal at the given times.
ts: float array of times
returns: float wave array
"""
freqs = np.linspace(self.start, self.end, len(ts) - 1)
return self._evaluate(ts, freqs)
def _evaluate(self, ts, freqs):
"""Helper function that evaluates the signal.
ts: float array of times
freqs: float array of frequencies during each interval
"""
dts = np.diff(ts)
dps = PI2 * freqs * dts
phases = np.cumsum(dps)
phases = np.insert(phases, 0, 0)
ys = self.amp * np.cos(phases)
return ys
class ExpoChirp(Chirp):
"""Represents a signal with varying frequency."""
def evaluate(self, ts):
"""Evaluates the signal at the given times.
ts: float array of times
returns: float wave array
"""
start, end = np.log10(self.start), np.log10(self.end)
freqs = np.logspace(start, end, len(ts) - 1)
return self._evaluate(ts, freqs)
class SilentSignal(Signal):
"""Represents silence."""
def evaluate(self, ts):
"""Evaluates the signal at the given times.
ts: float array of times
returns: float wave array
"""
return np.zeros(len(ts))
class Impulses(Signal):
"""Represents silence."""
def __init__(self, locations, amps=1):
self.locations = np.asanyarray(locations)
self.amps = amps
def evaluate(self, ts):
"""Evaluates the signal at the given times.
ts: float array of times
returns: float wave array
"""
ys = np.zeros(len(ts))
indices = np.searchsorted(ts, self.locations)
ys[indices] = self.amps
return ys
class _Noise(Signal):
"""Represents a noise signal (abstract parent class)."""
def __init__(self, amp=1.0):
"""Initializes a white noise signal.
amp: float amplitude, 1.0 is nominal max
"""
self.amp = amp
@property
def period(self):
"""Period of the signal in seconds.
returns: float seconds
"""
return ValueError("Non-periodic signal.")
class UncorrelatedUniformNoise(_Noise):
"""Represents uncorrelated uniform noise."""
def evaluate(self, ts):
"""Evaluates the signal at the given times.
ts: float array of times
returns: float wave array
"""
ys = np.random.uniform(-self.amp, self.amp, len(ts))
return ys
class UncorrelatedGaussianNoise(_Noise):
"""Represents uncorrelated gaussian noise."""
def evaluate(self, ts):
"""Evaluates the signal at the given times.
ts: float array of times
returns: float wave array
"""
ys = np.random.normal(0, self.amp, len(ts))
return ys
class BrownianNoise(_Noise):
"""Represents Brownian noise, aka red noise."""
def evaluate(self, ts):
"""Evaluates the signal at the given times.
Computes Brownian noise by taking the cumulative sum of
a uniform random series.
ts: float array of times
returns: float wave array
"""
dys = np.random.uniform(-1, 1, len(ts))
# ys = scipy.integrate.cumtrapz(dys, ts)
ys = np.cumsum(dys)
ys = normalize(unbias(ys), self.amp)
return ys
class PinkNoise(_Noise):
"""Represents Brownian noise, aka red noise."""
def __init__(self, amp=1.0, beta=1.0):
"""Initializes a pink noise signal.
amp: float amplitude, 1.0 is nominal max
"""
self.amp = amp
self.beta = beta
def make_wave(self, duration=1, start=0, framerate=11025):
"""Makes a Wave object.
duration: float seconds
start: float seconds
framerate: int frames per second
returns: Wave
"""
signal = UncorrelatedUniformNoise()
wave = signal.make_wave(duration, start, framerate)
spectrum = wave.make_spectrum()
spectrum.pink_filter(beta=self.beta)
wave2 = spectrum.make_wave()
wave2.unbias()
wave2.normalize(self.amp)
return wave2
def rest(duration):
"""Makes a rest of the given duration.
duration: float seconds
returns: Wave
"""
signal = SilentSignal()
wave = signal.make_wave(duration)
return wave
def make_note(midi_num, duration, sig_cons=CosSignal, framerate=11025):
"""Make a MIDI note with the given duration.
midi_num: int MIDI note number
duration: float seconds
sig_cons: Signal constructor function
framerate: int frames per second
returns: Wave
"""
freq = midi_to_freq(midi_num)
signal = sig_cons(freq)
wave = signal.make_wave(duration, framerate=framerate)
wave.apodize()
return wave
def make_chord(midi_nums, duration, sig_cons=CosSignal, framerate=11025):
"""Make a chord with the given duration.
midi_nums: sequence of int MIDI note numbers
duration: float seconds
sig_cons: Signal constructor function
framerate: int frames per second
returns: Wave
"""
freqs = [midi_to_freq(num) for num in midi_nums]
signal = sum(sig_cons(freq) for freq in freqs)
wave = signal.make_wave(duration, framerate=framerate)
wave.apodize()
return wave
def midi_to_freq(midi_num):
"""Converts MIDI note number to frequency.
midi_num: int MIDI note number
returns: float frequency in Hz
"""
x = (midi_num - 69) / 12.0
freq = 440.0 * 2 ** x
return freq
def sin_wave(freq, duration=1, offset=0):
"""Makes a sine wave with the given parameters.
freq: float cycles per second
duration: float seconds
offset: float radians
returns: Wave
"""
signal = SinSignal(freq, offset=offset)
wave = signal.make_wave(duration)
return wave
def cos_wave(freq, duration=1, offset=0):
"""Makes a cosine wave with the given parameters.
freq: float cycles per second
duration: float seconds
offset: float radians
returns: Wave
"""
signal = CosSignal(freq, offset=offset)
wave = signal.make_wave(duration)
return wave
def mag(a):
"""Computes the magnitude of a numpy array.
a: numpy array
returns: float
"""
return np.sqrt(np.dot(a, a))
def zero_pad(array, n):
"""Extends an array with zeros.
array: numpy array
n: length of result
returns: new NumPy array
"""
res = np.zeros(n)
res[: len(array)] = array
return res
def main():
cos_basis = cos_wave(440)
sin_basis = sin_wave(440)
wave = cos_wave(440, offset=math.pi / 2)
cos_cov = cos_basis.cov(wave)
sin_cov = sin_basis.cov(wave)
print(cos_cov, sin_cov, mag((cos_cov, sin_cov)))
return
wfile = WavFileWriter()
for sig_cons in [
SinSignal,
TriangleSignal,
SawtoothSignal,
GlottalSignal,
ParabolicSignal,
SquareSignal,
]:
print(sig_cons)
sig = sig_cons(440)
wave = sig.make_wave(1)
wave.apodize()
wfile.write(wave)
wfile.close()
return
signal = GlottalSignal(440)
signal.plot()
pyplot.show()
return
wfile = WavFileWriter()
for m in range(60, 0, -1):
wfile.write(make_note(m, 0.25))
wfile.close()
return
wave1 = make_note(69, 1)
wave2 = make_chord([69, 72, 76], 1)
wave = wave1 | wave2
wfile = WavFileWriter()
wfile.write(wave)
wfile.close()
return
sig1 = CosSignal(freq=440)
sig2 = CosSignal(freq=523.25)
sig3 = CosSignal(freq=660)
sig4 = CosSignal(freq=880)
sig5 = CosSignal(freq=987)
sig = sig1 + sig2 + sig3 + sig4
# wave = Wave(sig, duration=0.02)
# wave.plot()
wave = sig.make_wave(duration=1)
# wave.normalize()
wfile = WavFileWriter(wave)
wfile.write()
wfile.close()
if __name__ == "__main__":
main()