# Source code for pumpp.task.base
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''The base class for task transformer objects'''
import numpy as np
from librosa import time_to_frames, times_like
from librosa.sequence import viterbi_binary, viterbi_discriminative
import jams
from ..base import Scope
__all__ = ['BaseTaskTransformer']
def fill_value(dtype):
    '''Get a fill-value for a given dtype

    Parameters
    ----------
    dtype : type, np.dtype, or str
        Any dtype specification accepted by `np.dtype`
        (e.g. `float`, `np.float32`, `np.dtype('int16')`, `'bool'`).

    Returns
    -------
    `np.nan` if `dtype` is real or complex
    0 otherwise
        In both cases the value is returned as a scalar of the
        corresponding numpy scalar type.
    '''
    # Normalize first: dtype instances and dtype strings are not callable,
    # but the scalar type behind them always is.
    scalar_type = np.dtype(dtype).type

    # np.inexact is the common parent of np.floating and np.complexfloating
    if np.issubdtype(scalar_type, np.inexact):
        return scalar_type(np.nan)

    return scalar_type(0)
class BaseTaskTransformer(Scope):
    '''Base class for task transformer objects

    Attributes
    ----------
    name : str
        The name prefix for this transformer object

    namespace : str
        The JAMS namespace for annotations in this task

    sr : number > 0
        The sampling rate for audio

    hop_length : int > 0
        The number of samples between frames
    '''

    def __init__(self, name, namespace, sr, hop_length):
        '''Store the task configuration after validating the namespace.

        Parameters
        ----------
        name : str
            The name prefix, passed on to the parent initializer

        namespace : str
            The JAMS namespace for annotations in this task

        sr : number > 0
            The sampling rate for audio

        hop_length : int > 0
            The number of samples between frames
        '''
        super(BaseTaskTransformer, self).__init__(name)

        # This will trigger an exception if the namespace is not found
        jams.schema.is_dense(namespace)

        self.namespace = namespace
        self.sr = sr
        self.hop_length = hop_length

    def empty(self, duration):
        '''Create an empty jams.Annotation for this task.

        This method should be overridden by derived classes.

        Parameters
        ----------
        duration : int >= 0
            Duration of the annotation.
            The base implementation does not use this value: the returned
            annotation always has ``time=0`` and ``duration=0``.

        Returns
        -------
        ann : jams.Annotation
            An annotation in this task's namespace
        '''
        return jams.Annotation(namespace=self.namespace, time=0, duration=0)

    def transform(self, jam, query=None):
        '''Transform jam object to make data for this task

        Parameters
        ----------
        jam : jams.JAMS
            The jams container object

        query : dict [optional]
            Keyword arguments forwarded to `jam.search` to narrow the
            elements of `jam.annotations` to be considered.

            If not provided (or empty), all annotations are considered.

        Returns
        -------
        data : dict
            A dictionary of transformed annotations.
            All annotations which can be converted to the target namespace
            will be converted.
        '''
        candidates = jam.search(**query) if query else jam.annotations

        # Find annotations that can be coerced to our target namespace
        anns = []
        for ann in candidates:
            try:
                anns.append(jams.nsconvert.convert(ann, self.namespace))
            except jams.NamespaceError:
                # Not convertible to our namespace: deliberately skipped
                continue

        duration = jam.file_metadata.duration

        # If none, make a fake one
        if not anns:
            anns = [self.empty(duration)]

        # Apply transformations
        results = []
        for ann in anns:
            data = self.transform_annotation(ann, duration)

            # If the annotation range is None, it spans the entire track
            if ann.time is None or ann.duration is None:
                valid = [0, duration]
            else:
                valid = [ann.time, ann.time + ann.duration]

            data['_valid'] = time_to_frames(valid, sr=self.sr,
                                            hop_length=self.hop_length)
            results.append(data)

        # Prefix and collect
        return self.merge(results)

    def encode_events(self, duration, events, values, dtype=bool):
        '''Encode labeled events as a time-series matrix.

        Parameters
        ----------
        duration : number
            The duration of the track (in seconds)

        events : ndarray, shape=(n,)
            Time index of the events

        values : ndarray, shape=(n, m)
            Values array.  Must have the same first index as `events`.

        dtype : numpy data type

        Returns
        -------
        target : ndarray, shape=(n_frames, n_values)
        '''
        frames = time_to_frames(events, sr=self.sr,
                                hop_length=self.hop_length)

        n_total = int(time_to_frames(duration, sr=self.sr,
                                     hop_length=self.hop_length))

        # Allocate enough rows to hold events that land past the track end;
        # the surplus is trimmed on return.  Test for emptiness explicitly:
        # a truthiness test would also skip the case where every event
        # falls on frame 0.
        n_alloc = n_total
        if frames.size:
            n_alloc = max(n_total, 1 + int(frames.max()))

        target = np.empty((n_alloc, values.shape[1]), dtype=dtype)
        target.fill(fill_value(dtype))

        values = values.astype(dtype)

        # NOTE(review): for boolean dtypes the in-place addition acts as a
        # logical OR.  For dtypes whose fill value is NaN, the sum stays
        # NaN -- confirm whether any caller relies on non-boolean dtypes.
        for column, event in zip(values, frames):
            target[event] += column

        return target[:n_total]

    def encode_intervals(self, duration, intervals, values, dtype=bool,
                         multi=True, fill=None):
        '''Encode labeled intervals as a time-series matrix.

        Parameters
        ----------
        duration : number
            The duration of the track (in seconds; it is converted to
            frames internally)

        intervals : np.ndarray, shape=(n, 2)
            The list of intervals

        values : np.ndarray, shape=(n, m)
            The (encoded) values corresponding to each interval

        dtype : np.dtype
            The desired output type

        multi : bool
            If `True`, allow multiple labels per interval.

        fill : dtype (optional)
            Optional default fill value for missing data.

            If not provided, the default is inferred from `dtype`.

        Returns
        -------
        target : np.ndarray, shape=(duration * sr / hop_length, m)
            The labeled interval encoding, sampled at the desired frame rate
        '''
        if fill is None:
            fill = fill_value(dtype)

        frames = time_to_frames(intervals, sr=self.sr,
                                hop_length=self.hop_length)

        n_total = int(time_to_frames(duration, sr=self.sr,
                                     hop_length=self.hop_length))

        values = values.astype(dtype)

        # Allocate enough rows for intervals that run past the track end;
        # the surplus is trimmed on return.  Emptiness is tested explicitly
        # so that intervals lying entirely on frame 0 are still counted.
        n_alloc = n_total
        if frames.size:
            n_alloc = max(n_total, 1 + int(frames.max()))

        target = np.empty((n_alloc, values.shape[1]), dtype=dtype)
        target.fill(fill)

        for column, interval in zip(values, frames):
            start, end = interval[0], interval[1]
            if multi:
                # Accumulate overlapping labels (logical OR for booleans)
                target[start:end] += column
            else:
                # Later intervals overwrite earlier ones
                target[start:end] = column

        return target[:n_total]

    def decode_events(self, encoded, transition=None, p_state=None,
                      p_init=None):
        '''Decode labeled events into (time, value) pairs

        Real-valued inputs are thresholded at 0.5.

        Optionally, viterbi decoding can be applied to each event class.

        Parameters
        ----------
        encoded : np.ndarray, shape=(n_frames, m)
            Frame-level annotation encodings as produced by ``encode_events``.

        transition : None or np.ndarray [shape=(2, 2) or (m, 2, 2)]
            Optional transition matrix for each event, used for Viterbi

        p_state : None or np.ndarray [shape=(m,)]
            Optional marginal probability for each event

        p_init : None or np.ndarray [shape=(m,)]
            Optional initial state distribution for each event

        Returns
        -------
        [(time, value)] : iterable of tuples
            where `time` is the event time and `value` is an
            np.ndarray, shape=(m,) of the encoded value at that time

        See Also
        --------
        librosa.sequence.viterbi_binary
        '''
        if np.isrealobj(encoded):
            if transition is None:
                encoded = (encoded >= 0.5)
            else:
                # viterbi_binary works on (m, n_frames); transpose in and out
                encoded = viterbi_binary(encoded.T, transition,
                                         p_state=p_state,
                                         p_init=p_init).T

        times = times_like(encoded,
                           sr=self.sr,
                           hop_length=self.hop_length,
                           axis=0)

        return zip(times, encoded)

    def decode_intervals(self, encoded, duration=None, multi=True,
                         sparse=False, transition=None, p_state=None,
                         p_init=None):
        '''Decode labeled intervals into (start, end, value) triples

        Parameters
        ----------
        encoded : np.ndarray, shape=(n_frames, m)
            Frame-level annotation encodings as produced by
            ``encode_intervals``

        duration : None or float > 0
            The max duration of the annotation (in seconds)
            Must be greater than the length of encoded array.

        multi : bool
            If true, allow multiple labels per input frame.
            If false, take the most likely label per input frame.

        sparse : bool
            If true, values are returned as indices, not one-hot.
            If false, values are returned as one-hot encodings.

            Only applies when `multi=False`.

        transition : None or np.ndarray [shape=(m, m) or (2, 2) or (m, 2, 2)]
            Optional transition matrix for each interval, used for Viterbi
            decoding.  If `multi=True`, then transition should be `(2, 2)` or
            `(m, 2, 2)`-shaped.  If `multi=False`, then transition should be
            `(m, m)`-shaped.

        p_state : None or np.ndarray [shape=(m,)]
            Optional marginal probability for each label.

        p_init : None or np.ndarray [shape=(m,)]
            Optional initial state distribution for each label.

        Returns
        -------
        [(start, end, value)] : iterable of tuples
            where `start` and `end` are the interval boundaries (in seconds)
            and `value` is an np.ndarray of the encoded value for this
            interval: shape=(m,) for one-hot / multi-label output, or a
            single index when `sparse=True`.

            An input with zero frames yields an empty list.

        See Also
        --------
        librosa.sequence.viterbi_binary
        librosa.sequence.viterbi_discriminative
        '''
        # No frames means no intervals
        if encoded.shape[0] == 0:
            return []

        if np.isrealobj(encoded):
            if multi:
                if transition is None:
                    encoded = encoded >= 0.5
                else:
                    encoded = viterbi_binary(encoded.T, transition,
                                             p_init=p_init,
                                             p_state=p_state).T

            elif sparse and encoded.shape[1] > 1:
                # map to argmax if it's densely encoded (logits)
                if transition is None:
                    encoded = np.argmax(encoded, axis=1)[:, np.newaxis]
                else:
                    encoded = viterbi_discriminative(
                        encoded.T, transition,
                        p_init=p_init,
                        p_state=p_state)[:, np.newaxis]

            elif not sparse:
                # single-label, dense output: map to one-hot encoding
                if transition is None:
                    encoded = (encoded == np.max(encoded, axis=1,
                                                 keepdims=True))
                else:
                    states = viterbi_discriminative(encoded.T, transition,
                                                    p_init=p_init,
                                                    p_state=p_state)
                    # Map the decoded state sequence to one-hot encoding
                    encoded = np.zeros(encoded.shape, dtype=bool)
                    encoded[np.arange(len(states)), states] = True

        if duration is None:
            # 1+ is fair here, because encode_intervals already pads
            n_frames = 1 + encoded.shape[0]
        else:
            # int(...) guarantees times_like sees a plain scalar frame count
            n_frames = 1 + int(time_to_frames(duration,
                                              sr=self.sr,
                                              hop_length=self.hop_length))

        # [0, duration] inclusive
        times = times_like(n_frames + 1,
                           sr=self.sr, hop_length=self.hop_length)

        # Find the change-points of the rows
        changed = encoded[1:] != encoded[:-1]
        if sparse:
            idx = np.where(changed)[0]
        else:
            idx = np.where(np.any(changed, axis=-1))[0]

        # The end of the data always closes the last run
        idx = np.unique(np.append(idx, encoded.shape[0]))
        delta = np.diff(np.append(-1, idx))

        # Starting positions can be integrated from changes
        position = np.cumsum(np.append(0, delta))

        return [(times[p], times[p + d], encoded[p])
                for (p, d) in zip(position, delta)]