语音分离模型:mossfomer2

  • 计算短时能量
def compute_short_time_energy(audio: np.ndarray, frame_size: int, hop_size: int) -> np.ndarray:
    """计算音频信号的短时能量
    + 将音频分为若干帧,每一帧长度为 frame_size, 帧与帧之间以 hop_size 的步长滑动。
    + 每一帧的能量定义为该帧所有采样点的平方和。

    Args:
        audio (array): 1D音频序列
        frame_size (int): 帧长: 多少秒内的采样点数
        hop_size (int): 帧移: 多少秒内的采样点数

    Returns:
        array: 能量数组,每个元素对应一帧的能量
    """

    num_frames = int(np.ceil((len(audio) - frame_size) / hop_size)) + 1
    energy = np.zeros(num_frames)
    for i in range(num_frames):
        start = i * hop_size
        end = min(start + frame_size, len(audio))
        frame = audio[start:end]
        energy[i] = np.sum(frame ** 2)
    return energy
  • 主声源提取
def segment_audio(audio, sr, energy, threshold):
    """利用短时能量定位音频中的主声源区域

    Args:
        audio (array): 1D音频序列
        sr (int): 采样率
        energy (array): 每帧的短时能量数组
        threshold (float): 能量阈值,低于该值认为振幅较低

    Returns:
        list: 列表,每个元组包含 (开始时间, 结束时间, 主声源片段, 短时能量值)
    """

    segments = []
    acc_energy = []
    in_segment = False
    start_frame = None
    last_active_frame = None

    # 遍历每一帧能量,检测连续区域
    for i, e in enumerate(energy):
        if e >= threshold:
            # 当前帧能量达到阈值,若尚未进入段则记录起始帧
            if not in_segment:
                in_segment = True
                start_frame = i
            last_active_frame = i  # 更新最后一次达到阈值的帧
            acc_energy.append(e)
        else:
            # 当前帧能量低于阈值且之前处于主声源区域,完成一次分段
            if in_segment:
                start_sample = start_frame * hop_size
                # 以最后一次能量达到阈值的帧末尾作为结束位置
                end_sample = min(last_active_frame * hop_size + frame_size, len(audio))
                segments.append((start_sample / sr, end_sample / sr, audio[start_sample:end_sample], np.mean(acc_energy)))
                in_segment = False
                start_frame = None
                last_active_frame = None

    # 若音频结束时仍在主声源区域,则补全该段
    if in_segment:
        start_sample = start_frame * hop_size
        end_sample = len(audio)
        segments.append((start_sample / sr, end_sample / sr, audio[start_sample:end_sample]))

    return segments

def save_segments(segments, sr, output_dir="segments"):
    """保存检测到的主声源片段为独立的音频文件,并合并保存所有片段

    Args:
        segments (list): 分段结果列表,每个元组为 (开始时间, 结束时间, 主声源片段)
        sr (int): 采样率
        output_dir (str, optional): 保存的目录(默认保存到 "segments" 文件夹)
    """

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    merged_audio_list = [seg[-2] for seg in segments]

    # 合并所有片段并保存为一个文件
    if merged_audio_list:
        merged_audio = np.concatenate(merged_audio_list)
        merged_file = os.path.join(output_dir, "merged_segments.wav")
        sf.write(merged_file, merged_audio, sr)
        print(f"已保存合并文件:{merged_file}")
 # audio_file = "/home/wangguisen/projects/voice_separation/output/单轨音频/普通单轨音频_5_spk0.wav"
 audio_file = "/home/wangguisen/projects/voice_separation/2-.wav"
 audio, sr = sf.read(audio_file)

 # 参数设置:例如帧长20ms,帧移10ms
 frame_size = int(0.02 * sr)  # 20ms 对应的采样点数
 hop_size = int(0.01 * sr)    # 10ms 对应的采样点数

 print("[DEBUG] frame_size: ", frame_size)
 print("[DEBUG] hop_size: ", hop_size)

 # 设置能量阈值(根据实际情况调整),低于此值认为振幅较低
 threshold = 0.001

 # 计算短时能量
 energy = compute_short_time_energy(audio, frame_size, hop_size)

 # 利用短时能量分段定位主声源
 segments = segment_audio(audio, sr, energy, threshold)

 # 输出检测到的主声源区间
 print("检测到的主声源区间:")
 for seg in segments:
     start, end, _, e = seg
     print(f"开始时间: {start:.3f} s, 结束时间: {end:.3f} s, 短时能量值: {e: .3f}")

 # 保存每段主声源到独立文件,并合并保存所有片段
 save_segments(segments, sr, output_dir="segments")
Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐