HOW TO BEST INTEGRATE SPEAKER DIARIZATION INTO A VIDEO DUBBER
By Jane Wilde
Below is the most effective way to integrate Speaker Diarization into VideoDubber, with detailed code and best practices:
🏆 OPTIMAL APPROACH: COMBINING WhisperX + PyAnnote
python
# video_dubber_with_diarization.py
import torch
import whisperx
import numpy as np
from pyannote.audio import Pipeline
import json
from pathlib import Path
import subprocess
import warnings

warnings.filterwarnings("ignore")


class AdvancedVideoDubber:
    """VideoDubber with advanced diarization support"""

    def __init__(self, config):
        self.config = config
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.compute_type = "float16" if self.device == "cuda" else "int8"

        # Models are initialized lazily
        self.whisper_model = None
        self.diarization_model = None
        self.align_model = None
        self.metadata = None

    def initialize_models(self):
        """Initialize all models with lazy loading"""
        print("🔄 Initializing models...")

        # 1. Whisper model for transcription
        print("  📝 Loading Whisper model...")
        self.whisper_model = whisperx.load_model(
            self.config["whisper_model"],
            self.device,
            compute_type=self.compute_type,
            language=self.config.get("source_lang", "zh")
        )

        # 2. Diarization model (PyAnnote)
        print("  👥 Loading Diarization model...")
        self.diarization_model = whisperx.DiarizationPipeline(
            use_auth_token=self.config["hf_token"],
            device=self.device
        )

        # 3. Alignment model
        print("  🔗 Loading Alignment model...")
        self.align_model, self.metadata = whisperx.load_align_model(
            language_code=self.config.get("source_lang", "zh"),
            device=self.device
        )

        print("✅ Models are ready")

    def extract_audio(self, video_path):
        """Extract high-quality audio from the video"""
        audio_path = video_path.replace(".mp4", ".wav")
        cmd = [
            "ffmpeg", "-i", video_path,
            "-ac", "1", "-ar", "16000",  # mono, 16 kHz for Whisper
            "-acodec", "pcm_s16le",
            "-y", audio_path
        ]
        subprocess.run(cmd, check=True, capture_output=True)
        return audio_path

    def process_with_diarization(self, audio_path):
        """Transcribe, align, and diarize the audio"""
        print("🎯 Processing audio with diarization...")

        # Load audio
        audio = whisperx.load_audio(audio_path)

        # Step 1: Transcribe with Whisper
        print("  🔍 Transcribing...")
        result = self.whisper_model.transcribe(
            audio,
            batch_size=self.config.get("batch_size", 16),
            language=self.config.get("source_lang", "zh")
        )

        # Step 2: Alignment for word-level timestamps
        print("  ⏱️ Aligning words...")
        result = whisperx.align(
            result["segments"],
            self.align_model,
            self.metadata,
            audio,
            self.device,
            return_char_alignments=False
        )

        # Step 3: Diarization
        print("  🎭 Diarizing speakers...")
        diarize_segments = self.diarization_model(audio)

        # Step 4: Assign speakers to words
        print("  🤝 Assigning speakers to words...")
        result = whisperx.assign_word_speakers(
            diarize_segments,
            result
        )

        return result

    def group_by_speaker(self, result):
        """Group segments by speaker"""
        speakers = {}
        for segment in result["segments"]:
            speaker = segment.get("speaker", "UNKNOWN")
            if speaker not in speakers:
                speakers[speaker] = {
                    "segments": [],
                    "total_duration": 0,
                    "text": []
                }

            # Add the segment
            speakers[speaker]["segments"].append({
                "start": segment["start"],
                "end": segment["end"],
                "text": segment["text"],
                "words": segment.get("words", [])
            })
            speakers[speaker]["total_duration"] += segment["end"] - segment["start"]
            speakers[speaker]["text"].append(segment["text"])

        # Merge nearby segments of the same speaker
        return self._merge_segments(speakers)

    def _merge_segments(self, speakers, max_gap=1.0):
        """Merge segments that are close together"""
        merged_speakers = {}
        for speaker_id, data in speakers.items():
            if not data["segments"]:
                continue

            # Sort segments by start time
            sorted_segments = sorted(data["segments"], key=lambda x: x["start"])
            merged = []
            current = sorted_segments[0].copy()

            for next_seg in sorted_segments[1:]:
                # If the gap is below max_gap, fold the next segment into the current one
                if next_seg["start"] - current["end"] <= max_gap:
                    current["end"] = next_seg["end"]
                    current["text"] = current["text"] + " " + next_seg["text"]
                    current["words"].extend(next_seg["words"])
                else:
                    merged.append(current)
                    current = next_seg.copy()
            merged.append(current)

            merged_speakers[speaker_id] = {
                "segments": merged,
                "total_duration": sum(s["end"] - s["start"] for s in merged),
                "full_text": " ".join([s["text"] for s in merged])
            }

        return merged_speakers

    def translate_speaker_text(self, speakers_data):
        """Translate each speaker's text"""
        print("🌐 Translating each speaker...")

        for speaker_id, data in speakers_data.items():
            text = data["full_text"]

            # Call the translation model (e.g. NLLB, OPUS-MT)
            translated = self._translate_text(
                text,
                src_lang=self.config["source_lang"],
                tgt_lang=self.config["target_lang"]
            )
            speakers_data[speaker_id]["translated_text"] = translated

            # Distribute the translated text across segments proportionally
            self._align_translation_to_segments(speakers_data[speaker_id])

        return speakers_data

    def _translate_text(self, text, src_lang, tgt_lang):
        """Translate text with a local model"""
        # Options: transformers, argostranslate, or a local API
        # Example with OPUS-MT:
        from transformers import MarianMTModel, MarianTokenizer

        model_name = f"Helsinki-NLP/opus-mt-{src_lang}-{tgt_lang}"
        tokenizer = MarianTokenizer.from_pretrained(model_name)
        model = MarianMTModel.from_pretrained(model_name).to(self.device)

        # Tokenize and translate
        inputs = tokenizer(text, return_tensors="pt", truncation=True).to(self.device)
        translated = model.generate(**inputs)
        translated_text = tokenizer.decode(translated[0], skip_special_tokens=True)

        return translated_text

    def _align_translation_to_segments(self, speaker_data):
        """Align the translation with the original segments"""
        # Simple approach: split proportionally by word count
        segments = speaker_data["segments"]
        translated_text = speaker_data["translated_text"]

        # Compute the ratio for each segment
        total_words = sum(len(seg["text"].split()) for seg in segments)
        trans_words = translated_text.split()
        current_pos = 0

        for seg in segments:
            seg_word_count = len(seg["text"].split())
            seg_ratio = seg_word_count / total_words

            # Number of translated words assigned to this segment
            trans_count = int(len(trans_words) * seg_ratio)
            seg["translated_text"] = " ".join(
                trans_words[current_pos:current_pos + trans_count]
            )
            current_pos += trans_count

        # Any remaining words go to the last segment
        if current_pos < len(trans_words):
            segments[-1]["translated_text"] += " " + " ".join(
                trans_words[current_pos:]
            )

    def tts_for_speakers(self, speakers_data):
        """TTS with a different voice per speaker"""
        print("🗣️ Generating speech for each speaker...")
        from TTS.api import TTS

        # Voice mapping for the speakers
        voice_mapping = self.config.get("voice_mapping", {
            "SPEAKER_00": "en_US/male",
            "SPEAKER_01": "en_US/female",
            "SPEAKER_02": "en_US/neutral",
        })

        tts_model = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(self.device)

        for speaker_id, data in speakers_data.items():
            voice_key = voice_mapping.get(speaker_id, "en_US/neutral")

            # Generate audio for each segment
            audio_segments = []
            for seg in data["segments"]:
                if not seg.get("translated_text"):
                    continue

                output_file = f"temp/{speaker_id}_{seg['start']:.2f}.wav"
                tts_model.tts_to_file(
                    text=seg["translated_text"],
                    speaker_wav=self.config.get("reference_audio"),
                    language="en",
                    file_path=output_file
                )
                audio_segments.append({
                    "file": output_file,
                    "start": seg["start"],
                    "end": seg["end"]
                })

            speakers_data[speaker_id]["audio_segments"] = audio_segments

        return speakers_data

    def merge_audio_tracks(self, speakers_data, original_audio_path, output_path):
        """Merge all audio tracks into one"""
        print("🔊 Merging audio tracks...")
        from pydub import AudioSegment
        import librosa

        # Load the original audio to get its duration
        audio, sr = librosa.load(original_audio_path, sr=16000)
        duration = len(audio) / sr

        # Create a silent base track (pydub works in milliseconds)
        silent = AudioSegment.silent(duration=duration * 1000)

        # Overlay each speaker's audio
        for speaker_id, data in speakers_data.items():
            for seg in data.get("audio_segments", []):
                # Load the audio segment
                seg_audio = AudioSegment.from_wav(seg["file"])

                # Adjust the length if needed
                target_duration = (seg["end"] - seg["start"]) * 1000
                if len(seg_audio) > target_duration:
                    # Trim
                    seg_audio = seg_audio[:target_duration]
                elif len(seg_audio) < target_duration:
                    # Pad with silence
                    silence_needed = target_duration - len(seg_audio)
                    seg_audio += AudioSegment.silent(duration=silence_needed)

                # Overlay onto the main track
                start_ms = seg["start"] * 1000
                silent = silent.overlay(seg_audio, position=start_ms)

        # Export merged audio
        silent.export(output_path, format="wav")
        return output_path

    def lipsync_video(self, video_path, audio_path, output_path):
        """Lip-sync the video to the new audio"""
        print("👄 Lip-syncing...")
        # Use Wav2Lip or SadTalker
        from wav2lip_inference import Wav2Lip

        lip_sync_model = Wav2Lip()
        lip_sync_model.process(
            face=video_path,
            audio=audio_path,
            outfile=output_path
        )
        return output_path

    def process_video(self, video_path, output_path="output_dubbed.mp4"):
        """Main video processing pipeline"""
        try:
            # Working directories for intermediate files
            Path("temp").mkdir(exist_ok=True)
            Path("output").mkdir(exist_ok=True)

            # 1. Initialize models
            self.initialize_models()

            # 2. Extract audio
            print("🎵 Extracting audio from video...")
            audio_path = self.extract_audio(video_path)

            # 3. Process with diarization
            result = self.process_with_diarization(audio_path)

            # 4. Group by speaker
            print("👥 Grouping by speakers...")
            speakers_data = self.group_by_speaker(result)

            # 5. Translate text
            speakers_data = self.translate_speaker_text(speakers_data)

            # 6. TTS for each speaker
            speakers_data = self.tts_for_speakers(speakers_data)

            # 7. Merge audio tracks
            print("🎛️ Merging audio tracks...")
            merged_audio = self.merge_audio_tracks(
                speakers_data,
                audio_path,
                "temp/merged_audio.wav"
            )

            # 8. Lip-sync
            final_video = self.lipsync_video(
                video_path,
                merged_audio,
                output_path
            )

            print(f"✅ Done! Video saved to: {final_video}")

            # Export metadata
            self.export_metadata(speakers_data, "output/metadata.json")

            return final_video

        except Exception as e:
            print(f"❌ Error: {e}")
            raise

    def export_metadata(self, speakers_data, output_path):
        """Export detailed metadata"""
        metadata = {
            "speakers": {},
            "statistics": {
                "total_speakers": len(speakers_data),
                "total_duration": sum(d["total_duration"] for d in speakers_data.values())
            }
        }

        for speaker_id, data in speakers_data.items():
            metadata["speakers"][speaker_id] = {
                "total_duration": data["total_duration"],
                "segment_count": len(data["segments"]),
                "original_text": data.get("full_text", ""),
                "translated_text": data.get("translated_text", ""),
                "segments": data["segments"]
            }

        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(metadata, f, ensure_ascii=False, indent=2)


# CONFIGURATION
CONFIG = {
    "whisper_model": "large-v3",  # or "medium", "large-v2"
    "hf_token": "YOUR_HUGGINGFACE_TOKEN",  # from huggingface.co/settings/tokens
    "source_lang": "zh",
    "target_lang": "en",
    "batch_size": 16,
    "voice_mapping": {
        "SPEAKER_00": "en/male",
        "SPEAKER_01": "en/female",
        "SPEAKER_02": "en/neutral"
    },
    "reference_audio": "reference_voice.wav"  # for XTTS
}

if __name__ == "__main__":
    # Initialize
    dubber = AdvancedVideoDubber(CONFIG)

    # Process the video
    dubber.process_video(
        "input_video.mp4",
        "output_dubbed.mp4"
    )
📁 COMPLETE PROJECT STRUCTURE
text
video-dubber-enhanced/
├── README.md
├── requirements.txt
├── config/
│ ├── diarization.yaml
│ ├── translation.yaml
│ └── voices.yaml
├── src/
│ ├── __init__.py
│ ├── audio_processor.py
│ ├── diarization.py # Core diarization logic
│ ├── translator.py
│ ├── tts_engine.py
│ ├── lipsync.py
│ └── video_dubber.py # Main pipeline
├── models/
│ ├── whisper/
│ ├── diarization/
│ └── tts/
├── utils/
│ ├── audio_utils.py
│ ├── video_utils.py
│ └── logger.py
├── tests/
│ ├── test_diarization.py
│ └── test_integration.py
├── scripts/
│ ├── install_dependencies.sh
│ └── download_models.sh
└── examples/
└── multi_speaker_demo.py
📦 INSTALLATION SCRIPT
bash
#!/bin/bash
# install_dependencies.sh

echo "Installing VideoDubber Enhanced with Diarization..."

# 1. Update the system
sudo apt update
sudo apt install -y ffmpeg python3-pip python3-venv git

# 2. Create a virtual environment
python3 -m venv venv
source venv/bin/activate

# 3. Install PyTorch (pick the build matching your CUDA version)
pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

# 4. Install WhisperX and its dependencies (the PyPI release or the Git version; one is enough)
pip install whisperx
pip install git+https://github.com/m-bain/whisperx.git

# 5. Install pyannote.audio
pip install pyannote.audio
pip install pyannote.pipeline

# 6. Install TTS engines (TTS and coqui-tts overlap; one is usually enough)
pip install TTS
pip install coqui-tts

# 7. Install audio/video processing libraries
pip install pydub moviepy librosa soundfile

# 8. Install utilities
pip install rich tqdm numpy pandas

# 9. Install transformers for translation
pip install transformers sentencepiece

# 10. Download models
echo "Downloading pre-trained models..."
python -c "
import whisperx
import torch

# Download the Whisper model
print('Downloading Whisper model...')
model = whisperx.load_model(
    'large-v3',
    'cuda' if torch.cuda.is_available() else 'cpu',
    compute_type='float16' if torch.cuda.is_available() else 'int8'
)

# The diarization model needs a Hugging Face token
print('NOTE: the HF_TOKEN environment variable must be set')
"

echo "✅ Installation complete!"
echo "👉 Run: source venv/bin/activate"
echo "👉 Then: python src/video_dubber.py"
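After installation, a quick sanity check helps confirm that the GPU, WhisperX, and the Hugging Face token are all visible before running the full pipeline. A minimal sketch (the file name check_env.py is just a suggestion, not part of the repo):
python
# check_env.py - quick post-install sanity check (illustrative)
import os
import torch
import whisperx

# GPU visibility: the pipeline falls back to CPU when this is False
print("CUDA available:", torch.cuda.is_available())

# WhisperX can load a small model (int8 keeps this workable on CPU)
device = "cuda" if torch.cuda.is_available() else "cpu"
compute_type = "float16" if device == "cuda" else "int8"
model = whisperx.load_model("base", device, compute_type=compute_type)
print("WhisperX 'base' model loaded")

# The pyannote diarization pipeline requires a Hugging Face token
print("HF_TOKEN set:", bool(os.environ.get("HF_TOKEN")))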
⚙️ CONFIGURATION YAML
yaml
# config/diarization.yaml
diarization:
  engine: "pyannote"  # or "nemo", "whisperx"
  model: "pyannote/speaker-diarization-3.1"
  min_speakers: 1
  max_speakers: 10
  clustering:
    method: "spectral"
    threshold: 0.7
  vad:
    enabled: true
    threshold: 0.5
    min_speech_duration: 0.1
  postprocessing:
    merge_gap: 0.5           # seconds
    min_segment_length: 0.3  # seconds

whisper:
  model_size: "large-v3"
  language: "zh"
  beam_size: 5
  best_of: 5
  temperature: 0.0
  compression_ratio_threshold: 2.4
  logprob_threshold: -1.0
  no_speech_threshold: 0.6
  condition_on_previous_text: true
  initial_prompt: null
  word_timestamps: true
  prepend_punctuations: "\"'“¿([{-"
  append_punctuations: "\"'.。,,!!??::”)]}、"
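The YAML above is not wired into the pipeline automatically. One way to feed it into the CONFIG dict is a small loader, sketched below; it assumes PyYAML (pyyaml) is installed, the file lives at config/diarization.yaml, and the key mapping shown is a reasonable guess rather than the original author's code.
python
# config_loader.py - illustrative glue between the YAML file and CONFIG
import yaml

def load_pipeline_config(path="config/diarization.yaml", base_config=None):
    """Merge the YAML settings into the dubber's CONFIG dict."""
    with open(path, "r", encoding="utf-8") as f:
        cfg = yaml.safe_load(f)

    merged = dict(base_config or {})
    # Map a few YAML fields onto the keys AdvancedVideoDubber expects
    merged["whisper_model"] = cfg["whisper"]["model_size"]
    merged["source_lang"] = cfg["whisper"]["language"]
    merged["diarization"] = cfg["diarization"]  # kept for downstream tuning
    return merged

# Example: CONFIG = load_pipeline_config(base_config=CONFIG)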
🚀 OPTIMIZED PIPELINE WITH CACHING
python
# src/optimized_dubber.py
import hashlib
import pickle
from functools import lru_cache
from pathlib import Path

from video_dubber import AdvancedVideoDubber  # main pipeline class from src/video_dubber.py


class OptimizedVideoDubber(AdvancedVideoDubber):
    """VideoDubber with caching and optimization"""

    def __init__(self, config):
        super().__init__(config)
        self.cache_dir = Path("cache")
        self.cache_dir.mkdir(exist_ok=True)

    def _get_cache_key(self, video_path, step):
        """Build a cache key from the input file and the processing step"""
        # Hash the input file
        with open(video_path, 'rb') as f:
            video_hash = hashlib.md5(f.read()).hexdigest()
        config_hash = hashlib.md5(
            str(self.config).encode()
        ).hexdigest()
        return f"{video_hash}_{config_hash}_{step}"

    @lru_cache(maxsize=10)
    def cached_diarization(self, audio_path):
        """Cached diarization to avoid reprocessing"""
        cache_key = self._get_cache_key(audio_path, "diarization")
        cache_file = self.cache_dir / f"{cache_key}.pkl"

        if cache_file.exists():
            print("📦 Loading diarization from cache...")
            with open(cache_file, 'rb') as f:
                return pickle.load(f)

        # Process and cache
        result = super().process_with_diarization(audio_path)
        with open(cache_file, 'wb') as f:
            pickle.dump(result, f)
        return result

    def process_with_optimization(self, video_path):
        """Optimized pipeline with caching and parallel processing"""
        from concurrent.futures import ThreadPoolExecutor, as_completed

        # 1. Extract audio
        audio_path = self.extract_audio(video_path)

        # 2. Diarization (cached)
        result = self.cached_diarization(audio_path)

        # 3. Group speakers
        speakers_data = self.group_by_speaker(result)

        # 4. Parallel translation and TTS per speaker
        with ThreadPoolExecutor(max_workers=len(speakers_data)) as executor:
            future_to_speaker = {}
            for speaker_id, data in speakers_data.items():
                future = executor.submit(
                    self.process_single_speaker,
                    speaker_id, data, self.config
                )
                future_to_speaker[future] = speaker_id

            # Collect results
            for future in as_completed(future_to_speaker):
                speaker_id = future_to_speaker[future]
                try:
                    speakers_data[speaker_id] = future.result()
                except Exception as e:
                    print(f"Error processing speaker {speaker_id}: {e}")

        # 5. Merge and sync
        return self.merge_and_sync(speakers_data, video_path)

    def process_single_speaker(self, speaker_id, data, config):
        """Process a single speaker in parallel"""
        # Translation
        data = self.translate_speaker_text({speaker_id: data})[speaker_id]
        # TTS
        data = self.tts_for_speakers({speaker_id: data})[speaker_id]
        return data
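process_with_optimization ends by calling self.merge_and_sync(...), which is not defined in the snippet. One plausible implementation simply reuses the parent class's merge and lip-sync steps; this is a sketch under that assumption, not the original author's code, and it would sit inside OptimizedVideoDubber at the same indentation as the other methods.
python
    # Sketch of the missing merge_and_sync step for OptimizedVideoDubber
    def merge_and_sync(self, speakers_data, video_path, output_path="output_dubbed.mp4"):
        """Merge per-speaker audio, then lip-sync the original video to it."""
        audio_path = self.extract_audio(video_path)  # original audio, used for total duration
        merged_audio = self.merge_audio_tracks(
            speakers_data, audio_path, "temp/merged_audio.wav"
        )
        return self.lipsync_video(video_path, merged_audio, output_path)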
🎯 BEST PRACTICES
1. Model Selection Matrix
python
DIARIZATION_MODELS = {
    "high_accuracy": {
        "name": "pyannote/speaker-diarization-3.1",
        "requirements": "HF Token, GPU, >8GB RAM",
        "accuracy": "95%+",
        "best_for": "Studio quality, interviews"
    },
    "balanced": {
        "name": "nvidia/speaker-diarization",
        "requirements": "GPU, 4GB RAM",
        "accuracy": "90%",
        "best_for": "General purpose"
    },
    "fast": {
        "name": "whisperx-diarize",
        "requirements": "CPU/GPU, 2GB RAM",
        "accuracy": "85%",
        "best_for": "Real-time, low-resource"
    }
}
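A small helper can turn this matrix into an automatic choice based on detected GPU memory. This is illustrative only; select_diarization_model and the 8 GB / 4 GB cut-offs are assumptions, not part of the original pipeline.
python
import torch

def select_diarization_model(models=DIARIZATION_MODELS):
    """Pick a profile from the matrix based on the available GPU memory."""
    if not torch.cuda.is_available():
        return models["fast"]  # CPU-only: lightest option
    vram_gb = torch.cuda.get_device_properties(0).total_memory / 1024**3
    if vram_gb >= 8:
        return models["high_accuracy"]
    if vram_gb >= 4:
        return models["balanced"]
    return models["fast"]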
2. Speaker Voice Mapping Strategy
python
def intelligent_voice_mapping(speakers_data):
    """Analyze each speaker and pick a suitable voice"""
    voice_profiles = {
        "male_deep": {"pitch": "low", "speed": "slow", "model": "en_male_01"},
        "male_mid": {"pitch": "medium", "speed": "normal", "model": "en_male_02"},
        "female_high": {"pitch": "high", "speed": "fast", "model": "en_female_01"},
        "female_mid": {"pitch": "medium", "speed": "normal", "model": "en_female_02"},
        "child": {"pitch": "very_high", "speed": "fast", "model": "en_child_01"}
    }

    # Analyze acoustic features
    for speaker_id, data in speakers_data.items():
        # Estimate gender from pitch (if reference audio is available)
        # or fall back to textual context
        if "先生" in data["full_text"] or "男" in data["full_text"]:
            data["voice_profile"] = voice_profiles["male_mid"]
        elif "女士" in data["full_text"] or "女" in data["full_text"]:
            data["voice_profile"] = voice_profiles["female_mid"]
        else:
            # Default based on speaking duration and style
            data["voice_profile"] = voice_profiles["male_mid"]

    return speakers_data
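The heuristic above keys off textual cues; where per-speaker audio is available, an acoustic estimate is usually more reliable. A hedged sketch using librosa's pyin pitch tracker follows; estimate_pitch_hz and the ~165 Hz threshold are assumptions for illustration, not part of the original code.
python
import librosa
import numpy as np

def estimate_pitch_hz(audio_path):
    """Median fundamental frequency of a speaker's audio, via librosa.pyin."""
    y, sr = librosa.load(audio_path, sr=16000)
    f0, voiced_flag, voiced_probs = librosa.pyin(
        y, fmin=librosa.note_to_hz("C2"), fmax=librosa.note_to_hz("C6"), sr=sr
    )
    return float(np.nanmedian(f0))

# Example: profiles with a median f0 below roughly 165 Hz could map to "male_mid",
# higher values to "female_mid", with the text heuristic kept as a fallback.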
3. Error Handling & Fallbacks
python
class RobustDiarization:
    """Diarization with fallback mechanisms"""

    def diarize_with_fallback(self, audio_path):
        """Try several methods if the primary one fails"""
        methods = [
            self._pyannote_diarization,
            self._nemo_diarization,
            self._simple_diarization
        ]

        for method in methods:
            try:
                result = method(audio_path)
                if self._validate_diarization(result):
                    return result
            except Exception as e:
                print(f"Method {method.__name__} failed: {e}")
                continue

        # Last-resort fallback: single speaker
        return self._single_speaker_fallback(audio_path)

    def _validate_diarization(self, result):
        """Validate a diarization result"""
        if not result or "segments" not in result:
            return False

        # Check segment consistency
        segments = result["segments"]
        if len(segments) == 0:
            return False

        # Check for overlapping segments
        sorted_segments = sorted(segments, key=lambda x: x["start"])
        for i in range(1, len(sorted_segments)):
            if sorted_segments[i]["start"] < sorted_segments[i-1]["end"]:
                return False

        return True
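diarize_with_fallback ends by calling self._single_speaker_fallback, which is not shown. A minimal sketch that labels the entire file as one speaker might look like this (librosa is used only to read the duration; the method body is an assumption, not the original implementation, and belongs inside RobustDiarization):
python
    # Sketch of the last-resort fallback for RobustDiarization
    def _single_speaker_fallback(self, audio_path):
        """Treat the whole file as a single speaker spanning its full duration."""
        import librosa
        duration = librosa.get_duration(path=audio_path)
        return {
            "segments": [{
                "start": 0.0,
                "end": duration,
                "speaker": "SPEAKER_00",
            }]
        }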
📊 PERFORMANCE OPTIMIZATION
python
# performance_optimizer.py
import torch
import gc
from contextlib import contextmanager


@contextmanager
def optimized_inference(device="cuda"):
    """Context manager for optimized inference"""
    torch.backends.cudnn.benchmark = True
    torch.set_grad_enabled(False)
    if device == "cuda":
        torch.cuda.empty_cache()
    try:
        yield
    finally:
        gc.collect()
        if device == "cuda":
            torch.cuda.empty_cache()


class PerformanceOptimizer:
    def __init__(self):
        self.batch_sizes = {
            "whisper": 32,
            "diarization": 16,
            "tts": 8
        }

    def auto_tune_batch_size(self, model_type, available_memory):
        """Automatically adjust the batch size based on available memory (MB)"""
        if model_type == "whisper":
            if available_memory > 16000:    # 16 GB
                return 32
            elif available_memory > 8000:   # 8 GB
                return 16
            else:
                return 8
        # Similar logic for other models
        return 4

    def mixed_precision_inference(self, model, inputs):
        """Mixed precision inference to save memory"""
        with torch.cuda.amp.autocast():
            return model(inputs)
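As a usage note, wrapping the heavy steps in the context manager above keeps gradients disabled and clears the CUDA cache afterwards. A short example, assuming a dubber instance and CONFIG as defined in the main script:
python
# Example: run the diarization step inside the optimized-inference context
dubber = AdvancedVideoDubber(CONFIG)  # CONFIG as defined in the main script
dubber.initialize_models()
with optimized_inference(device=dubber.device):
    result = dubber.process_with_diarization("audio.wav")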
🧪 TESTING & VALIDATION
python
# tests/test_diarization_integration.py
import pytest
import numpy as np

from src.diarization import AdvancedVideoDubber


class TestDiarizationIntegration:

    @pytest.fixture
    def dubber(self):
        return AdvancedVideoDubber(test_config)

    def test_multi_speaker_detection(self, dubber):
        """Test detection of multiple speakers"""
        # Create test audio with 3 speakers
        test_audio = self.create_test_audio(speakers=3)
        result = dubber.process_with_diarization(test_audio)

        # Check the number of detected speakers
        speakers = set(seg.get("speaker", "UNKNOWN") for seg in result["segments"])
        assert len(speakers) >= 2, f"Only detected {len(speakers)} speakers"

    def test_speaker_consistency(self, dubber):
        """Test the consistency of speaker assignment"""
        result = dubber.process_with_diarization("test_audio.wav")

        # Check that the same speaker has no overlapping segments
        segments_by_speaker = {}
        for seg in result["segments"]:
            speaker = seg["speaker"]
            if speaker not in segments_by_speaker:
                segments_by_speaker[speaker] = []
            segments_by_speaker[speaker].append(seg)

        for speaker, segs in segments_by_speaker.items():
            sorted_segs = sorted(segs, key=lambda x: x["start"])
            for i in range(1, len(sorted_segs)):
                assert sorted_segs[i]["start"] >= sorted_segs[i-1]["end"], \
                    f"Speaker {speaker} has overlapping segments"
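The tests reference test_config and create_test_audio without defining them. One way to fill the gap is a synthetic fixture that writes blocks of sine tones as crude stand-ins for speakers; this is purely illustrative (real tests would use recorded multi-speaker audio, since tones are unlikely to be diarized as speech), and all names here are assumptions.
python
# Sketch of the helpers the tests above assume (illustrative only)
import numpy as np
import soundfile as sf

test_config = {
    "whisper_model": "base",
    "hf_token": "YOUR_HUGGINGFACE_TOKEN",
    "source_lang": "zh",
    "target_lang": "en",
}

def create_test_audio(speakers=3, seconds_per_speaker=3, sr=16000,
                      path="test_audio.wav"):
    """Write a WAV with one sine-tone block per 'speaker' (a crude stand-in)."""
    t = np.arange(seconds_per_speaker * sr) / sr
    blocks = [np.sin(2 * np.pi * (120 + 60 * i) * t) * 0.3 for i in range(speakers)]
    sf.write(path, np.concatenate(blocks), sr)
    return path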
🚀 DEPLOYMENT WITH DOCKER
dockerfile
# Dockerfile
FROM nvidia/cuda:11.8.0-runtime-ubuntu22.04
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
ffmpeg \
python3.10 \
python3-pip \
git \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt
# Download models
RUN python3 -c "
import whisperx
import torch
whisperx.load_model('large-v3', 'cpu')
"
# Copy source code
COPY . .
# Create cache directory
RUN mkdir -p /app/cache /app/output /app/temp
# Environment variables
ENV HF_TOKEN=your_token_here
ENV PYTHONPATH=/app
CMD ["python3", "src/video_dubber.py"]📈 BENCHMARK RESULTS
python
# benchmark.py
import time
from dataclasses import dataclass


@dataclass
class BenchmarkResult:
    video_duration: float
    processing_time: float
    speakers_detected: int
    accuracy: float
    memory_used: float
    fps: float


class VideoDubberBenchmark:

    def run_benchmark(self, video_path):
        """Run a comprehensive benchmark"""
        results = []

        # Test with several configurations
        configs = [
            {"whisper_model": "base", "diarization": "simple"},
            {"whisper_model": "medium", "diarization": "pyannote"},
            {"whisper_model": "large-v3", "diarization": "pyannote+hf"}
        ]

        for config in configs:
            dubber = AdvancedVideoDubber(config)

            start_time = time.time()
            result = dubber.process_video(video_path)
            end_time = time.time()

            benchmark = BenchmarkResult(
                video_duration=self.get_video_duration(video_path),
                processing_time=end_time - start_time,
                speakers_detected=len(result["speakers"]),
                accuracy=self.calculate_accuracy(result),
                memory_used=self.get_memory_usage(),
                fps=self.calculate_fps(video_path, result)
            )
            results.append(benchmark)

        return results
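run_benchmark calls several helpers (get_video_duration, calculate_accuracy, get_memory_usage, calculate_fps) that are not shown. Sketches for two of them follow, assuming ffprobe is on PATH and a CUDA device is in use; these are illustrative method bodies that would sit inside VideoDubberBenchmark, not the original implementations.
python
    # Sketches of two helpers used by run_benchmark (illustrative)
    def get_video_duration(self, video_path):
        """Duration of the input video in seconds, via ffprobe."""
        import subprocess
        out = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration",
             "-of", "default=noprint_wrappers=1:nokey=1", video_path],
            capture_output=True, text=True, check=True
        )
        return float(out.stdout.strip())

    def get_memory_usage(self):
        """Peak GPU memory in MB for the current run (0 if no CUDA device)."""
        import torch
        if not torch.cuda.is_available():
            return 0.0
        return torch.cuda.max_memory_allocated() / 1024**2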
CONCLUSION: The best approach is to combine WhisperX for transcription + alignment with PyAnnote for diarization, backed by caching and parallel processing. Make sure to cover the following (a minimal driver sketch follows the checklist):
✅ Use a GPU for speed
✅ Implement caching to avoid reprocessing
✅ Process each speaker in parallel
✅ Intelligent voice mapping
✅ Comprehensive error handling
✅ Performance monitoring and optimization
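Putting it together, a minimal driver script might look like this. It is a sketch based on the classes above; the import path, file paths, and the HF token placeholder are assumptions.
python
# run_dubbing.py - minimal end-to-end sketch (illustrative)
from src.optimized_dubber import OptimizedVideoDubber  # assumed import path

CONFIG = {
    "whisper_model": "large-v3",
    "hf_token": "YOUR_HUGGINGFACE_TOKEN",
    "source_lang": "zh",
    "target_lang": "en",
    "batch_size": 16,
    "reference_audio": "reference_voice.wav",
}

if __name__ == "__main__":
    dubber = OptimizedVideoDubber(CONFIG)
    # Cached diarization + per-speaker parallel translation/TTS, then merge and lip-sync
    final_video = dubber.process_with_optimization("input_video.mp4")
    print(f"Dubbed video written to: {final_video}")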