import av
import numpy as np
from pydub import AudioSegment


class AudioFrameHandler:
    """Play a custom audio clip or pass/dampen the incoming audio based on some event."""

    def __init__(self, sound_file_path: str = ""):
        self.custom_audio = AudioSegment.from_file(file=sound_file_path, format="wav")
        self.custom_audio_len = len(self.custom_audio)

        self.ms_per_audio_segment: int = 20
        self.audio_segment_shape: tuple

        self.play_state_tracker: dict = {"curr_segment": -1}  # Index of the currently playing segment
        self.audio_segments_created: bool = False
        self.audio_segments: list = []

    def prepare_audio(self, frame: av.AudioFrame):
        # Use the first incoming frame to learn the stream's audio format.
        raw_samples = frame.to_ndarray()
        sound = AudioSegment(
            data=raw_samples.tobytes(),
            sample_width=frame.format.bytes,
            frame_rate=frame.sample_rate,
            channels=len(frame.layout.channels),
        )

        self.ms_per_audio_segment = len(sound)
        self.audio_segment_shape = raw_samples.shape

        # Match the custom audio to the incoming stream's channel count,
        # sample rate, and sample width.
        self.custom_audio = self.custom_audio.set_channels(sound.channels)
        self.custom_audio = self.custom_audio.set_frame_rate(sound.frame_rate)
        self.custom_audio = self.custom_audio.set_sample_width(sound.sample_width)

        # Slice the custom audio into chunks lasting as long as one input frame.
        self.audio_segments = [
            self.custom_audio[i : i + self.ms_per_audio_segment]
            for i in range(
                0,
                self.custom_audio_len - self.custom_audio_len % self.ms_per_audio_segment,
                self.ms_per_audio_segment,
            )
        ]

        self.total_segments = len(self.audio_segments) - 1  # -1 because we start from 0.

        self.audio_segments_created = True

    def process(self, frame: av.AudioFrame, play_sound: bool = False):
        """
        Takes the current input audio frame and, based on the play_sound flag,
        either starts sending out the custom audio segments or dampens the frame's
        waveform to emulate silence, e.g. for playing a notification when some event occurs.
        """
        if not self.audio_segments_created:
            self.prepare_audio(frame)

        raw_samples = frame.to_ndarray()
        _curr_segment = self.play_state_tracker["curr_segment"]

        if play_sound:
            # Advance to the next segment of the custom audio, wrapping back to
            # the first segment once the clip has finished.
            if _curr_segment < self.total_segments:
                _curr_segment += 1
            else:
                _curr_segment = 0
            sound = self.audio_segments[_curr_segment]
        else:
            # If the clip is mid-playback, let it finish; otherwise pass the
            # input frame through with its gain heavily reduced (near silence).
            if -1 < _curr_segment < self.total_segments:
                _curr_segment += 1
                sound = self.audio_segments[_curr_segment]
            else:
                _curr_segment = -1
                sound = AudioSegment(
                    data=raw_samples.tobytes(),
                    sample_width=frame.format.bytes,
                    frame_rate=frame.sample_rate,
                    channels=len(frame.layout.channels),
                )
                sound = sound.apply_gain(-100)

        self.play_state_tracker["curr_segment"] = _curr_segment

        channel_sounds = sound.split_to_mono()
        channel_samples = [s.get_array_of_samples() for s in channel_sounds]

        new_samples = np.array(channel_samples).T
        new_samples = new_samples.reshape(self.audio_segment_shape)

        new_frame = av.AudioFrame.from_ndarray(new_samples, layout=frame.layout.name)
        new_frame.sample_rate = frame.sample_rate

        return new_frame
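

# --- Usage sketch (illustrative, not part of the original handler) ----------
# A minimal example of how the handler might be driven, assuming a placeholder
# WAV asset ("alert.wav") and a synthetic 20 ms packed s16 stereo frame in
# place of the frames a real WebRTC audio callback would receive.
if __name__ == "__main__":
    SAMPLE_RATE = 48000
    SAMPLES_PER_FRAME = 960  # 20 ms at 48 kHz

    handler = AudioFrameHandler(sound_file_path="alert.wav")  # hypothetical asset path

    # Packed (non-planar) s16 stereo expects an ndarray of shape (1, samples * channels).
    silent = np.zeros((1, SAMPLES_PER_FRAME * 2), dtype=np.int16)
    frame = av.AudioFrame.from_ndarray(silent, format="s16", layout="stereo")
    frame.sample_rate = SAMPLE_RATE

    # With play_sound=True the handler emits successive 20 ms slices of the
    # custom clip; with play_sound=False it finishes the clip (if mid-play)
    # and then passes the heavily damped input through.
    out_frame = handler.process(frame, play_sound=True)
    print(out_frame.samples, out_frame.sample_rate)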