import numpy
import time
import os
import wave
from logging import getLogger
logger = getLogger('pyscab.'+__name__)
[docs]class DataHandler(object):
"""
Handling wav files and pcm data.
Parameters
----------
frame_rate : int, default=44100
frame rate of data.
verbose : bool, default=False
verbosity while data loading.
Attributes
----------
pcm_data : list
loaded audio data.
n_ch : list of int
number of channels of loaded audio data.
n_frames : list of int
number of frames of loaded audio data.
paths : list of str
path of loaded audio file.
id : list of int
id of loaded audio data.
volume : list of float
volume of loaded audio data.
frame_rate : int
frame rate of data
sample width : list of int
sample width in byte of loaded audio data.
dtype : numpy.dtype
data format type.
"""
def __init__(self, frame_rate = 44100, verbose=False):
self.pcm_data = list()
self.n_ch = list()
self.n_frames = list()
self.paths = list()
self.id = list()
self.volume = list()
self.frame_rate = frame_rate
self.sample_width = list()
self.verbose = verbose
self.dtype = numpy.dtype("int16")
[docs] def load(self, id, path, volume=1.0):
"""
Load wav file by path.
Parameters
----------
id : int
id for this audio data.
path : str
file path of audio file to be loaded.
volume : float, default=1.0
volume for this audio data.
Raises
------
ValueError
Raises ValueError if existing (already been used) id is passed to id.
ValueError
Raises ValueError if format of loaded wav file is not Int16 or UInt8.
Examples
--------
You just need to specify the id for data, and file path to be loaded.
>>> import pyscab
>>> dh = pyscab.DataHandler()
>>> dh.load(1, path = "/home/USER/Music/something.wav", volume = 0.5)
"""
if self._is_exist_id(id) is False:
self.id.append(id)
else:
raise ValueError("Passed id " + str(id) + " is dumplicated.")
f_name = os.path.basename(path)
wf = wave.open(path, 'rb')
n_ch = wf.getnchannels()
# it should be set in __init__()
sw = wf.getsampwidth()
if sw == 1:
self.dtype = dtype = numpy.dtype("uint8")
elif sw == 2:
self.dtype = dtype = numpy.dtype("int16")
else:
raise ValueError("file must be Int16 or UInt8 PCM wave format")
nf = wf.getnframes()
if self.verbose:
start = time.time()
print("start loading : " + f_name)
logger.debug("start loading : %s", path)
data = numpy.zeros((nf, n_ch)).astype(dtype)
data_wav = wf.readframes(nf)
data_wav = numpy.frombuffer(data_wav, dtype=dtype)
data_wav = data_wav * volume
data_wav = data_wav.astype(dtype)
for ch in range(n_ch):
data[:, ch] = data_wav[ch::n_ch]
if self.verbose:
print("finish loading : " + str(f_name))
end = time.time()
print("elapsed time : " + str(end-start))
self.paths.append(path)
self.pcm_data.append(data)
self.n_ch.append(n_ch)
self.sample_width.append(sw)
self.n_frames.append(nf)
self.volume.append(volume)
[docs] def apply_window(self, id, t_raise_fall):
"""
Applying window function to data specified by id.
Parameters
----------
id : int
data id to be applied filter function.
t_raise_fall : float
time duration for raise and fall.
Notes
-----
If r_raise_fall is set to 0.05. Both raising and falling time will be set to 0.05.
Currently, only linear function can be applied. Other function will be implemented in the future.
"""
n_frames = self.get_nframes_by_id(id)
fs = self.frame_rate
raise_ = numpy.arange(0, int(t_raise_fall*fs))
fall_ = numpy.arange(int(t_raise_fall*fs), 0, -1)
raise_ = raise_ / max(raise_)
fall_ = fall_ / max(fall_)
flat = numpy.ones((n_frames - numpy.size(raise_) - numpy.size(raise_)))
win_filter = numpy.concatenate([raise_, flat, fall_])
idx = self._id2idx(id)
n_ch = self.n_ch[idx]
win = numpy.zeros((numpy.size(win_filter), n_ch))
for m in range(n_ch):
win[:, m] = win_filter
self.pcm_data[idx] = self.pcm_data[idx]*win
self.pcm_data[idx] = self.pcm_data[idx].astype(self.dtype)
[docs] def add_pcm(self, id, data):
"""
Add the pcm audio data (matrix) to DataHandler.
Parameters
----------
id : int
id for this audio data.
data : numpy.ndarray
audio data. shoule have shape of (number of samples, number of channels)
Notes
-----
File path will be automatically set to "PCM"
Raises
------
ValueError
Raises ValueError if existing (already been used) id is passed to id.
"""
if self._is_exist_id(id) is False:
self.id.append(id)
else:
raise ValueError("Passed id " + str(id) + " is dumplicated.")
if data.ndim == 1:
data = numpy.atleast_2d(data)
data = data.transpose()
self.paths.append("PCM")
self.pcm_data.append(data)
self.n_frames.append(data.shape[0])
self.n_ch.append(data.shape[1])
[docs] def get_nframes_by_id(self, id):
"""
get number of frames of the audio data specified by id.
Parameters
----------
id : int
id of audio data.
Returns
-------
number_of_frames : int
number of frames of the audio data specified by id.
"""
return numpy.shape(self.get_data_by_id(id))[0]
[docs] def get_length_by_id(self, id):
"""
get length (in seconds) of the audio data specified by id.
Parameters
----------
id : int
id of audio data.
Returns
-------
length : int
length (in seconds) of the audio data specified by id.
"""
# return type in second.
return self.get_nframes_by_id(id)/self.frame_rate
[docs] def get_data_by_id(self, id):
"""
get data of the audio data specified by id.
Parameters
----------
id : int
id of audio data.
Returns
-------
data : numpy.ndarray
audio data array specified by id.
"""
idx = self._id2idx(id)
return self.pcm_data[idx]
[docs] def get_n_ch_by_id(self, id):
"""
get number of channels of the audio data specified by id.
Parameters
----------
id : int
id of audio data.
Returns
-------
number_of_channels : int
number of channels of audio data specified by id.
"""
idx = self._id2idx(id)
return self.n_ch[idx]
[docs] def get_path_by_id(self,id):
"""
get file path of the audio data specified by id.
Parameters
----------
id : int
id of audio data.
Returns
-------
path : str
file path of audio data specified by id.
"""
idx = self._id2idx(id)
return self.paths[idx]
def _id2idx(self, id):
idx = numpy.where(numpy.array(self.id) == id)
if len(idx[0]) == 0:
return None
#raise ValueError("id does not exist.")
else:
return int(idx[0])
def _is_exist_id(self, id):
if self._id2idx(id) == None:
return False
else:
return True