audio_handling.py

import av
import numpy as np
from pydub import AudioSegment


class AudioFrameHandler:
    """Play or pass custom audio based on some event."""

    def __init__(self, sound_file_path: str = ""):
        # Custom clip to play when the event fires (expects a WAV file).
        self.custom_audio = AudioSegment.from_file(file=sound_file_path, format="wav")
        self.custom_audio_len = len(self.custom_audio)

        # Duration (ms) and sample shape of one incoming frame; refined in prepare_audio().
        self.ms_per_audio_segment: int = 20
        self.audio_segment_shape: tuple

        self.play_state_tracker: dict = {"curr_segment": -1}  # Currently playing segment.
        self.audio_segments_created: bool = False
        self.audio_segments: list = []
    def prepare_audio(self, frame: av.AudioFrame):
        """Match the custom clip to the incoming frame's format and slice it into frame-sized segments."""
        raw_samples = frame.to_ndarray()
        sound = AudioSegment(
            data=raw_samples.tobytes(),
            sample_width=frame.format.bytes,
            frame_rate=frame.sample_rate,
            channels=len(frame.layout.channels),
        )

        self.ms_per_audio_segment = len(sound)
        self.audio_segment_shape = raw_samples.shape

        # Convert the custom clip to the stream's channel count, frame rate, and sample width.
        self.custom_audio = self.custom_audio.set_channels(sound.channels)
        self.custom_audio = self.custom_audio.set_frame_rate(sound.frame_rate)
        self.custom_audio = self.custom_audio.set_sample_width(sound.sample_width)

        # Slice the clip into segments of one frame's duration, dropping the trailing remainder.
        self.audio_segments = [
            self.custom_audio[i : i + self.ms_per_audio_segment]
            for i in range(
                0,
                self.custom_audio_len - self.custom_audio_len % self.ms_per_audio_segment,
                self.ms_per_audio_segment,
            )
        ]

        self.total_segments = len(self.audio_segments) - 1  # -1 because we start from 0.
        self.audio_segments_created = True
    def process(self, frame: av.AudioFrame, play_sound: bool = False):
        """
        Take the current input audio frame and, depending on the play_sound flag,
        either start sending the custom audio or dampen the frame to emulate silence.
        Useful, for example, for playing a notification when some event occurs.
        """
        if not self.audio_segments_created:
            self.prepare_audio(frame)

        raw_samples = frame.to_ndarray()
        _curr_segment = self.play_state_tracker["curr_segment"]

        if play_sound:
            # Advance to the next segment of the custom clip, wrapping around at the end.
            if _curr_segment < self.total_segments:
                _curr_segment += 1
            else:
                _curr_segment = 0
            sound = self.audio_segments[_curr_segment]
        else:
            # If the clip is mid-playback, let it finish; otherwise output (near-)silence.
            if -1 < _curr_segment < self.total_segments:
                _curr_segment += 1
                sound = self.audio_segments[_curr_segment]
            else:
                _curr_segment = -1
                sound = AudioSegment(
                    data=raw_samples.tobytes(),
                    sample_width=frame.format.bytes,
                    frame_rate=frame.sample_rate,
                    channels=len(frame.layout.channels),
                )
                sound = sound.apply_gain(-100)  # Heavy attenuation: effectively silence.

        self.play_state_tracker["curr_segment"] = _curr_segment

        # Rebuild an interleaved sample array with the same shape as the input frame.
        channel_sounds = sound.split_to_mono()
        channel_samples = [s.get_array_of_samples() for s in channel_sounds]
        new_samples = np.array(channel_samples).T
        new_samples = new_samples.reshape(self.audio_segment_shape)

        new_frame = av.AudioFrame.from_ndarray(new_samples, layout=frame.layout.name)
        new_frame.sample_rate = frame.sample_rate
        return new_frame