import pyaudio
import numpy
from logging import getLogger
logger = getLogger('pyscab.'+__name__)
[docs]class CallbackParams(object):
"""
class for referencing parameters of callback function of portaudio.
Attributes
----------
data : list of pyscab.ReadAudioChunk
containing audio data to play.
data_finished : list of bool
True if all audio data in attribute data which has corresponding index were played.
"""
def __init__(self):
self.init()
[docs] def init(self):
"""
initialize all parameters hold by CallbackParams instance.
"""
self.data = list()
self.data_finished = list()
self.format = None
self.time = None
self.n_ch = None
callback_params = CallbackParams()
[docs]def callback(in_data, frame_count, time_info, status, p=callback_params):
data = numpy.zeros((frame_count, p.n_ch), dtype=p.format)
for idx, is_finished in enumerate(p.data_finished):
#for idx_audio, audio in enumerate(p.data):
#for audio in p.data:
if is_finished is False:
chunk = p.data[idx].read_chunk()
for idx_ch, ch in enumerate(p.data[idx].get_ch()):
data[:, ch-1] += chunk[:, idx_ch]
#else:
# p.data_finished[idx] = True
# if all audio chunks were loaded, delete the instance
#
# deleted this in ver. 0.0.6, since doing this will take time and makes problem with presenting short audio with fast SOA.
#del p.data[idx_audio]
p.time = time_info
return (numpy.ravel(data, order='C'), pyaudio.paContinue)
[docs]def get_available_devices():
"""
print available devices connected to the computer.
"""
pya = pyaudio.PyAudio()
hi = HardwareInformation(pya)
devices = hi.devices
for device in devices:
print("name : '" + str(devices[device]['name']) + "', maxOutputChannels : " + str(devices[device]['maxOutputChannels']))
class AudioInterface(object):
"""
Class for handling Audio Interface device.
Attributes
----------
device_name : str
name of the device to be opened.
format : str {'INT16', 'UINT8'}
format to be used for device.
frame_rate : int
frame rate of device.
frames_per_butter : int
frames per buffer
pya : instance of pyaudio.PyAudio()
instance of PyAudio device.
device_idx : int
device index of opened device.
num_cannels : int
number of channels to be opened.
n_ReadAudioChunk_obj : int
total number of ReadAudioChunk instance.
See Also
--------
Class ReadAudioChunk : used for reading audio data by chunk.
"""
def __init__(self,
device_name=None,
n_ch = 2,
format="INT16",
frame_rate=44100,
frames_per_buffer=512,):
"""
set audio device to be used and its settings
Parameters
----------
device_name : str, default=None
device name to be opened.
If it's None, 'default' will be used for Linux and 'ASIO4ALL' for Windows.
n_ch : int, default=2
number of channels to be opened
format : {'INT16', 'UINT8'}, default='INT16'
audio data fortmat to be played
frame_rate : int, default=44100
frame rate of audio device.
frames_per_butter : int, default=512
frames per butter of audio device. Which means chunk size of audio data read in callback function will be set to this value.
"""
if device_name is None:
import platform
if platform.system() == "Linux":
self.device_name = "default"
elif platform.system() == "Windows":
self.device_name = "ASIO4ALL v2"
else:
raise ValueError("Unknown Operating System")
else:
self.device_name = device_name
self.format = format
if format.upper() == "INT16":
self.format_pyaudio = pyaudio.paInt16
self.format_numpy = numpy.dtype("int16")
elif format.upper() == "UINT8":
self.format_pyaudio = pyaudio.paUInt8
self.format_numpy = numpy.dtype("uint8")
else:
raise ValueError("Unknown Format : " + self.format + ". It can only take INT16 or UINT8.")
self.frame_rate = frame_rate
self.frames_per_buffer = frames_per_buffer
self.pya = pya = pyaudio.PyAudio()
hardware_information = HardwareInformation(pya)
available_devices = hardware_information.devices
logger.debug("Audio Device '%s' was selected.", self.device_name)
device = available_devices[self.device_name]
self.device_idx = device['index']
self.num_channels = n_ch
self.n_ReadAudioChunk_obj = 0
def open(self):
"""
open audio device.
"""
callback_params.init()
self.n_ReadAudioChunk_obj = 0
callback_params.n_ch = self.num_channels
callback_params.format = self.format_numpy
self.stream = self.pya.open(format=self.format_pyaudio,
channels=self.num_channels,
frames_per_buffer=self.frames_per_buffer,
output_device_index=self.device_idx,
rate=self.frame_rate,
output=True,
stream_callback=callback)
def play(self, data, ch):
"""
play audio data from specified channel.
Parameters
----------
data : numpy.ndarray
audio data which have a shape of (number of samples, number of channels)
ch : list of int
channel number of audio device to be played (start from 1)
Examples
--------
In this case, audio_data will be played from 1st channel of audio device.
>>> play(audio_data, [1])
"""
callback_params.data.append(ReadAudioChunk(data, self.frames_per_buffer, ch, format = self.format_numpy, idx_obj = self.n_ReadAudioChunk_obj))
callback_params.data_finished.append(False)
self.n_ReadAudioChunk_obj += 1 # should be add 1 after executing appending ReadAudioChunk obj.
def get_time_info(self):
"""
get time information from portaudio (PaStreamCallbackTimeInfo)
Returns
-------
time_info : dict
"""
return callback_params.time
def close(self):
"""
stop audio stream and close device.
"""
self.stream.stop_stream()
self.stream.close()
def terminate(self):
"""
terminate portaudio.
"""
self.pya.terminate()
[docs]class ReadAudioChunk(object):
"""
reading audio data by chunk.
Attributes
----------
data : numpy.ndarray
audio data.
n_ch_data : int
number of channels of audio data
n_frames : int
number of frames of audio data
chunk_sixe : int
chunk size.
ch : list of int
channel number of device which audio data will be played.
format : numpy.dtype
audio data format
finished : Bool
If True, all audio data have already been read. If False, there's remained data.
idx_obj : int
id of instance.
"""
def __init__(self, data, chunk_size, ch, volume=1.0, format = numpy.dtype("int16"), idx_obj = None):
"""
load data with specified settings
Parameteres
----------
data : numpy.ndarray
audio data.
chunk_sixe : int
chunk size.
ch : list of int
channel number of device which audio data will be played.
format : numpy.dtype, default=numpy.dtype('int16')
audio data format
idx_obj : int, default=None
id of instance.
"""
self.data = data.astype(format)
self.n_ch_data = data.shape[1] # n_ch of data
self.n_frames = data.shape[0]
self.current_idx = 0
self.chunk_size = chunk_size
self.ch = ch # ch num to be stimuli presented
self.format = format
#self.remained_frames = self.n_frames
self.finished = False
self.idx_obj = idx_obj # will be change to id
# if mono audio will be played from multiple channel,
# concatenate it to multiple channel matrix
# in order to make computational load in portaudio callback function less
while self.n_ch_data < len(self.ch):
self.data = numpy.hstack((self.data,self.data))
self.n_ch_data = self.data.shape[1]
[docs] def read_chunk(self):
"""
read audio data by chunk.
Returns
-------
chunk_data : numpy.ndarray
chunk data which have a shape of (chunk size, number of channels)
"""
start = self.current_idx*self.chunk_size
end = (self.current_idx*self.chunk_size)+self.chunk_size
if end >= (self.n_frames-1):
data = numpy.zeros((self.chunk_size, self.n_ch_data), dtype=self.format)
data[0:(self.n_frames-start-1),:] = self.data[start:-1,:]
#=====================
#This is old method.
#
#deleted in ver 0.0.6 since it is slow.
#=====================
# padding zero to last block
#data = self.data[start:-1,:]
#n_padding = self.chunk_size - data.shape[0]
#data = numpy.vstack((data, numpy.zeros((n_padding, self.n_ch_data), dtype=self.format)))
#data = numpy.vstack((data, numpy.zeros((n_padding, self.n_ch_data))))
#self.finished = True
#=====================
callback_params.data_finished[self.idx_obj] = True
else:
data = self.data[start:end,:]
#self.remained_frames -= self.chunk_size
self.current_idx += 1
return data
[docs] def is_finished(self):
"""
check if it's finished
Returns
-------
is_finished : bool
True if all data already been read, False if there's remained data.
"""
return self.finished
[docs] def get_ch(self):
"""
get channel number to play the audio data.
Returns
-------
channel : list of int
channel number which audio data will be played.
"""
return self.ch