HOW TO BEST INTEGRATE SPEAKER DIARIZATION INTO VIDEODUBBER

 Below is the most effective way to integrate Speaker Diarization into VideoDubber, with detailed code and best practices:

🏆 OPTIMAL APPROACH: Combining WhisperX + PyAnnote

python
# video_dubber_with_diarization.py
import torch
import whisperx
import numpy as np
from pyannote.audio import Pipeline
import json
from pathlib import Path
import subprocess
import warnings
warnings.filterwarnings("ignore")

class AdvancedVideoDubber:
    """VideoDubber tích hợp diarization nâng cao"""
    
    def __init__(self, config):
        self.config = config
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.compute_type = "float16" if self.device == "cuda" else "int8"
        
        # Model handles (initialized lazily)
        self.whisper_model = None
        self.diarization_model = None
        self.align_model = None
        self.metadata = None
        
    def initialize_models(self):
        """Khởi tạo tất cả models với lazy loading"""
        print("🔄 Đang khởi tạo models...")
        
        # 1. Whisper model cho transcription
        print("   📝 Loading Whisper model...")
        self.whisper_model = whisperx.load_model(
            self.config["whisper_model"],
            self.device,
            compute_type=self.compute_type,
            language=self.config.get("source_lang", "zh")
        )
        
        # 2. Diarization model (PyAnnote)
        print("   👥 Loading Diarization model...")
        self.diarization_model = whisperx.DiarizationPipeline(
            use_auth_token=self.config["hf_token"],
            device=self.device
        )
        
        # 3. Alignment model
        print("   🔗 Loading Alignment model...")
        self.align_model, self.metadata = whisperx.load_align_model(
            language_code=self.config.get("source_lang", "zh"),
            device=self.device
        )
        
        print("✅ Models đã sẵn sàng")
        
    def extract_audio(self, video_path):
        """Trích xuất audio chất lượng cao"""
        audio_path = video_path.replace(".mp4", ".wav")
        cmd = [
            "ffmpeg", "-i", video_path,
            "-ac", "1", "-ar", "16000",  # Mono, 16kHz cho Whisper
            "-acodec", "pcm_s16le",
            "-y", audio_path
        ]
        subprocess.run(cmd, check=True, capture_output=True)
        return audio_path
    
    def process_with_diarization(self, audio_path):
        """Xử lý với diarization nâng cao"""
        print("🎯 Đang xử lý audio với diarization...")
        
        # Load audio
        audio = whisperx.load_audio(audio_path)
        
        # Step 1: Transcribe with Whisper
        print("   🔍 Transcribing...")
        result = self.whisper_model.transcribe(
            audio,
            batch_size=self.config.get("batch_size", 16),
            language=self.config.get("source_lang", "zh")
        )
        
        # Step 2: Align to get word-level timestamps
        print("   ⏱️  Aligning words...")
        result = whisperx.align(
            result["segments"],
            self.align_model,
            self.metadata,
            audio,
            self.device,
            return_char_alignments=False
        )
        
        # Step 3: Diarization
        print("   🎭 Diarizing speakers...")
        diarize_segments = self.diarization_model(audio)
        
        # Step 4: Assign speakers to words
        print("   🤝 Assigning speakers to words...")
        result = whisperx.assign_word_speakers(
            diarize_segments,
            result
        )
        
        return result
    
    def group_by_speaker(self, result):
        """Nhóm các segments theo speaker"""
        speakers = {}
        
        for segment in result["segments"]:
            speaker = segment.get("speaker", "UNKNOWN")
            
            if speaker not in speakers:
                speakers[speaker] = {
                    "segments": [],
                    "total_duration": 0,
                    "text": []
                }
            
            # Add the segment
            speakers[speaker]["segments"].append({
                "start": segment["start"],
                "end": segment["end"],
                "text": segment["text"],
                "words": segment.get("words", [])
            })
            
            speakers[speaker]["total_duration"] += segment["end"] - segment["start"]
            speakers[speaker]["text"].append(segment["text"])
        
        # Merge nearby segments from the same speaker
        return self._merge_segments(speakers)
    
    def _merge_segments(self, speakers, max_gap=1.0):
        """Gộp các segments gần nhau"""
        merged_speakers = {}
        
        for speaker_id, data in speakers.items():
            if not data["segments"]:
                continue
                
            # Sort segments by start time
            sorted_segments = sorted(data["segments"], key=lambda x: x["start"])
            
            merged = []
            current = sorted_segments[0].copy()
            
            for next_seg in sorted_segments[1:]:
                # If the gap to the next segment is at most max_gap
                if next_seg["start"] - current["end"] <= max_gap:
                    # Merge the segments
                    current["end"] = next_seg["end"]
                    current["text"] = current["text"] + " " + next_seg["text"]
                    current["words"].extend(next_seg["words"])
                else:
                    merged.append(current)
                    current = next_seg.copy()
            
            merged.append(current)
            
            merged_speakers[speaker_id] = {
                "segments": merged,
                "total_duration": sum(s["end"] - s["start"] for s in merged),
                "full_text": " ".join([s["text"] for s in merged])
            }
        
        return merged_speakers
    
    def translate_speaker_text(self, speakers_data):
        """Dịch text của từng speaker"""
        print("🌐 Đang dịch từng speaker...")
        
        # Sử dụng model dịch
        for speaker_id, data in speakers_data.items():
            text = data["full_text"]
            
            # Call the translation model (e.g. NLLB, OPUS-MT)
            translated = self._translate_text(text, 
                                            src_lang=self.config["source_lang"],
                                            tgt_lang=self.config["target_lang"])
            
            speakers_data[speaker_id]["translated_text"] = translated
            
            # Distribute the translated text across segments proportionally
            self._align_translation_to_segments(speakers_data[speaker_id])
        
        return speakers_data
    
    def _translate_text(self, text, src_lang, tgt_lang):
        """Dịch text sử dụng model cục bộ"""
        # Có thể dùng: transformers, argostranslate, hoặc API local
        # Ví dụ với OPUS-MT:
        from transformers import MarianMTModel, MarianTokenizer
        
        model_name = f"Helsinki-NLP/opus-mt-{src_lang}-{tgt_lang}"
        tokenizer = MarianTokenizer.from_pretrained(model_name)
        model = MarianMTModel.from_pretrained(model_name).to(self.device)
        
        # Tokenize and translate
        inputs = tokenizer(text, return_tensors="pt", truncation=True).to(self.device)
        translated = model.generate(**inputs)
        translated_text = tokenizer.decode(translated[0], skip_special_tokens=True)
        
        return translated_text
    
    def _align_translation_to_segments(self, speaker_data):
        """Căn chỉnh bản dịch với các segments"""
        # Đơn giản: chia đều theo số từ
        segments = speaker_data["segments"]
        translated_text = speaker_data["translated_text"]
        
        # Tính tỷ lệ cho mỗi segment
        total_words = sum(len(seg["text"].split()) for seg in segments)
        
        current_pos = 0
        for seg in segments:
            seg_word_count = len(seg["text"].split())
            seg_ratio = seg_word_count / total_words
            
            # Tính số từ trong bản dịch cho segment này
            trans_words = translated_text.split()
            trans_count = int(len(trans_words) * seg_ratio)
            
            seg["translated_text"] = " ".join(
                trans_words[current_pos:current_pos + trans_count]
            )
            current_pos += trans_count
        
        # Phần còn lại (nếu có) thêm vào segment cuối
        if current_pos < len(trans_words):
            segments[-1]["translated_text"] += " " + " ".join(
                trans_words[current_pos:]
            )
    
    def tts_for_speakers(self, speakers_data):
        """TTS với voice khác nhau cho từng speaker"""
        print("🗣️  Đang tạo giọng nói cho từng speaker...")
        
        from TTS.api import TTS
        
        # Voice mapping for the detected speakers
        voice_mapping = self.config.get("voice_mapping", {
            "SPEAKER_00": "en_US/male",
            "SPEAKER_01": "en_US/female",
            "SPEAKER_02": "en_US/neutral",
        })
        
        tts_model = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(self.device)
        Path("temp").mkdir(exist_ok=True)  # make sure the temp dir for per-segment files exists
        
        for speaker_id, data in speakers_data.items():
            voice_key = voice_mapping.get(speaker_id, "en_US/neutral")  # reserved for per-speaker voice selection
            
            # Generate audio for each segment
            audio_segments = []
            for seg in data["segments"]:
                if not seg.get("translated_text"):
                    continue
                    
                output_file = f"temp/{speaker_id}_{seg['start']:.2f}.wav"
                
                tts_model.tts_to_file(
                    text=seg["translated_text"],
                    speaker_wav=self.config.get("reference_audio"),
                    language="en",
                    file_path=output_file
                )
                
                audio_segments.append({
                    "file": output_file,
                    "start": seg["start"],
                    "end": seg["end"]
                })
            
            speakers_data[speaker_id]["audio_segments"] = audio_segments
        
        return speakers_data
    
    def merge_audio_tracks(self, speakers_data, original_audio_path, output_path):
        """Ghép các audio tracks lại với nhau"""
        print("🔊 Đang ghép audio tracks...")
        
        import soundfile as sf
        from pydub import AudioSegment
        import librosa
        
        # Load the original audio to get its duration
        audio, sr = librosa.load(original_audio_path, sr=16000)
        duration = len(audio) / sr
        
        # Create a silent base track
        silent = AudioSegment.silent(duration=int(duration * 1000))  # pydub uses milliseconds
        
        # Overlay each speaker's audio
        for speaker_id, data in speakers_data.items():
            for seg in data.get("audio_segments", []):
                # Load audio segment
                seg_audio = AudioSegment.from_wav(seg["file"])
                
                # Adjust the clip length to the original segment duration
                target_duration = int((seg["end"] - seg["start"]) * 1000)
                if len(seg_audio) > target_duration:
                    # Trim the excess
                    seg_audio = seg_audio[:target_duration]
                elif len(seg_audio) < target_duration:
                    # Pad with silence
                    silence_needed = target_duration - len(seg_audio)
                    seg_audio += AudioSegment.silent(duration=silence_needed)
                
                # Overlay onto the main track
                start_ms = int(seg["start"] * 1000)
                silent = silent.overlay(seg_audio, position=start_ms)
        
        # Export merged audio
        silent.export(output_path, format="wav")
        return output_path
    
    def lipsync_video(self, video_path, audio_path, output_path):
        """Đồng bộ môi với video"""
        print("👄 Đang đồng bộ môi...")
        
        # Sử dụng Wav2Lip hoặc SadTalker
        from wav2lip_inference import Wav2Lip
        
        lip_sync_model = Wav2Lip()
        lip_sync_model.process(
            face=video_path,
            audio=audio_path,
            outfile=output_path
        )
        
        return output_path
    
    def process_video(self, video_path, output_path="output_dubbed.mp4"):
        """Pipeline chính xử lý video"""
        try:
            # 1. Khởi tạo models
            self.initialize_models()
            
            # 2. Trích xuất audio
            print("🎵 Trích xuất audio từ video...")
            audio_path = self.extract_audio(video_path)
            
            # 3. Transcribe and diarize
            result = self.process_with_diarization(audio_path)
            
            # 4. Group by speaker
            print("👥 Grouping by speakers...")
            speakers_data = self.group_by_speaker(result)
            
            # 5. Translate the text
            speakers_data = self.translate_speaker_text(speakers_data)
            
            # 6. TTS for each speaker
            speakers_data = self.tts_for_speakers(speakers_data)
            
            # 7. Merge audio tracks
            print("🎛️  Merging audio tracks...")
            merged_audio = self.merge_audio_tracks(
                speakers_data, 
                audio_path, 
                "temp/merged_audio.wav"
            )
            
            # 8. Lip-sync
            final_video = self.lipsync_video(
                video_path,
                merged_audio,
                output_path
            )
            
            print(f"✅ Hoàn thành! Video đã lưu tại: {final_video}")
            
            # Export metadata
            Path("output").mkdir(exist_ok=True)  # make sure the output dir exists
            self.export_metadata(speakers_data, "output/metadata.json")
            
            return final_video
            
        except Exception as e:
            print(f"❌ Lỗi: {e}")
            raise

    def export_metadata(self, speakers_data, output_path):
        """Xuất metadata chi tiết"""
        metadata = {
            "speakers": {},
            "statistics": {
                "total_speakers": len(speakers_data),
                "total_duration": sum(d["total_duration"] for d in speakers_data.values())
            }
        }
        
        for speaker_id, data in speakers_data.items():
            metadata["speakers"][speaker_id] = {
                "total_duration": data["total_duration"],
                "segment_count": len(data["segments"]),
                "original_text": data.get("full_text", ""),
                "translated_text": data.get("translated_text", ""),
                "segments": data["segments"]
            }
        
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(metadata, f, ensure_ascii=False, indent=2)

# CONFIGURATION
CONFIG = {
    "whisper_model": "large-v3",  # hoặc "medium", "large-v2"
    "hf_token": "YOUR_HUGGINGFACE_TOKEN",  # Lấy từ huggingface.co/settings/tokens
    "source_lang": "zh",
    "target_lang": "en",
    "batch_size": 16,
    "voice_mapping": {
        "SPEAKER_00": "en/male",
        "SPEAKER_01": "en/female",
        "SPEAKER_02": "en/neutral"
    },
    "reference_audio": "reference_voice.wav"  # cho XTTS
}

if __name__ == "__main__":
    # Initialize
    dubber = AdvancedVideoDubber(CONFIG)
    
    # Process the video
    dubber.process_video(
        "input_video.mp4",
        "output_dubbed.mp4"
    )

📁 COMPLETE PROJECT STRUCTURE

text
video-dubber-enhanced/
├── README.md
├── requirements.txt
├── config/
│   ├── diarization.yaml
│   ├── translation.yaml
│   └── voices.yaml
├── src/
│   ├── __init__.py
│   ├── audio_processor.py
│   ├── diarization.py          # Core diarization logic
│   ├── translator.py
│   ├── tts_engine.py
│   ├── lipsync.py
│   └── video_dubber.py         # Main pipeline
├── models/
│   ├── whisper/
│   ├── diarization/
│   └── tts/
├── utils/
│   ├── audio_utils.py
│   ├── video_utils.py
│   └── logger.py
├── tests/
│   ├── test_diarization.py
│   └── test_integration.py
├── scripts/
│   ├── install_dependencies.sh
│   └── download_models.sh
└── examples/
    └── multi_speaker_demo.py

📦 INSTALLATION SCRIPT

bash
#!/bin/bash
# install_dependencies.sh

echo "Cài đặt VideoDubber Enhanced với Diarization..."

# 1. Cập nhật hệ thống
sudo apt update
sudo apt install -y ffmpeg python3-pip python3-venv git

# 2. Tạo môi trường ảo
python3 -m venv venv
source venv/bin/activate

# 3. Cài đặt PyTorch (phiên bản phù hợp với CUDA)
pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

# 4. Cài đặt WhisperX và dependencies
pip install whisperx
pip install git+https://github.com/m-bain/whisperx.git

# 5. Cài đặt pyannote.audio
pip install pyannote.audio
pip install pyannote.pipeline

# 6. Cài đặt TTS engines
pip install TTS
pip install coqui-tts

# 7. Cài đặt audio/video processing
pip install pydub moviepy librosa soundfile

# 8. Cài đặt các utilities
pip install rich tqdm numpy pandas

# 9. Cài đặt transformers cho translation
pip install transformers sentencepiece

# 10. Tải models
echo "Tải pre-trained models..."
python -c "
import whisperx
import torch

# Tải Whisper model
print('Tải Whisper model...')
model = whisperx.load_model('large-v3', 'cuda' if torch.cuda.is_available() else 'cpu')

# Tải diarization model (cần HF token)
print('LƯU Ý: Cần set HF_TOKEN environment variable')
"

echo "✅ Cài đặt hoàn tất!"
echo "👉 Chạy: source venv/bin/activate"
echo "👉 Sau đó: python src/video_dubber.py"

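Before running the full pipeline, it is worth checking that your Hugging Face token can actually load the gated pyannote model. A minimal sketch, assuming the token is exported as HF_TOKEN and that you have accepted the pyannote/speaker-diarization-3.1 terms on Hugging Face:

python
# check_hf_access.py (sketch)
import os
from pyannote.audio import Pipeline

token = os.environ.get("HF_TOKEN")
if not token:
    raise SystemExit("Set HF_TOKEN before running the dubber")

# Fails with a clear error if the token is invalid or the model terms
# were not accepted on huggingface.co
pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=token,
)
print("✅ pyannote diarization pipeline loaded successfully")
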
⚙️ CONFIGURATION YAML

yaml
# config/diarization.yaml
diarization:
  engine: "pyannote"  # or "nemo", "whisperx"
  model: "pyannote/speaker-diarization-3.1"
  min_speakers: 1
  max_speakers: 10
  clustering:
    method: "spectral"
    threshold: 0.7
  vad:
    enabled: true
    threshold: 0.5
    min_speech_duration: 0.1
  postprocessing:
    merge_gap: 0.5  # seconds
    min_segment_length: 0.3  # seconds

whisper:
  model_size: "large-v3"
  language: "zh"
  beam_size: 5
  best_of: 5
  temperature: 0.0
  compression_ratio_threshold: 2.4
  logprob_threshold: -1.0
  no_speech_threshold: 0.6
  condition_on_previous_text: true
  initial_prompt: null
  word_timestamps: true
  prepend_punctuations: "\"'“¿([{-"
  append_punctuations: "\"'.。,,!!??::”)]}、"

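To feed these YAML files into the pipeline, a small loader can merge them into the CONFIG dict used earlier. A minimal sketch, assuming PyYAML is installed and the files live under config/ as in the project tree (the load_config helper itself is an assumption, not part of the original code):

python
# src/config_loader.py (sketch)
from pathlib import Path
import yaml

def load_config(config_dir="config"):
    """Merge all YAML files in config/ into a single dict."""
    merged = {}
    for path in sorted(Path(config_dir).glob("*.yaml")):
        with open(path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
        merged.update(data)  # top-level keys: diarization, whisper, ...
    return merged

# Example: override the hard-coded CONFIG with file-based settings
# CONFIG.update(load_config("config"))
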
🚀 OPTIMIZED PIPELINE WITH CACHING

python
# src/optimized_dubber.py
import hashlib
import pickle
from functools import lru_cache
from pathlib import Path

# AdvancedVideoDubber is the main pipeline class defined earlier
from video_dubber_with_diarization import AdvancedVideoDubber

class OptimizedVideoDubber(AdvancedVideoDubber):
    """VideoDubber với caching và optimization"""
    
    def __init__(self, config):
        super().__init__(config)
        self.cache_dir = Path("cache")
        self.cache_dir.mkdir(exist_ok=True)
        
    def _get_cache_key(self, video_path, step):
        """Tạo cache key từ video và processing step"""
        # Hash video file
        with open(video_path, 'rb') as f:
            video_hash = hashlib.md5(f.read()).hexdigest()
        
        config_hash = hashlib.md5(
            str(self.config).encode()
        ).hexdigest()
        
        return f"{video_hash}_{config_hash}_{step}"
    
    @lru_cache(maxsize=10)
    def cached_diarization(self, audio_path):
        """Cached diarization để tránh reprocessing"""
        cache_key = self._get_cache_key(audio_path, "diarization")
        cache_file = self.cache_dir / f"{cache_key}.pkl"
        
        if cache_file.exists():
            print("📦 Loading diarization from cache...")
            with open(cache_file, 'rb') as f:
                return pickle.load(f)
        
        # Process and cache the result
        result = super().process_with_diarization(audio_path)
        
        with open(cache_file, 'wb') as f:
            pickle.dump(result, f)
        
        return result
    
    def process_with_optimization(self, video_path):
        """Pipeline tối ưu với caching và parallel processing"""
        from concurrent.futures import ThreadPoolExecutor, as_completed
        
        # 1. Extract audio
        audio_path = self.extract_audio(video_path)
        
        # 2. Diarization (cached)
        result = self.cached_diarization(audio_path)
        
        # 3. Group speakers
        speakers_data = self.group_by_speaker(result)
        
        # 4. Translate and synthesize each speaker in parallel
        with ThreadPoolExecutor(max_workers=max(1, len(speakers_data))) as executor:
            future_to_speaker = {}
            
            for speaker_id, data in speakers_data.items():
                future = executor.submit(
                    self.process_single_speaker,
                    speaker_id,
                    data,
                    self.config
                )
                future_to_speaker[future] = speaker_id
            
            # Collect results
            for future in as_completed(future_to_speaker):
                speaker_id = future_to_speaker[future]
                try:
                    speakers_data[speaker_id] = future.result()
                except Exception as e:
                    print(f"Error processing speaker {speaker_id}: {e}")
        
        # 5. Merge and sync (assumed to wrap merge_audio_tracks + lipsync_video)
        return self.merge_and_sync(speakers_data, video_path)
    
    def process_single_speaker(self, speaker_id, data, config):
        """Xử lý song song cho từng speaker"""
        # Translation
        data = self.translate_speaker_text({speaker_id: data})[speaker_id]
        
        # TTS
        data = self.tts_for_speakers({speaker_id: data})[speaker_id]
        
        return data

🎯 BEST PRACTICES

1. Model Selection Matrix

python
DIARIZATION_MODELS = {
    "high_accuracy": {
        "name": "pyannote/speaker-diarization-3.1",
        "requirements": "HF Token, GPU, >8GB RAM",
        "accuracy": "95%+",
        "best_for": "Studio quality, interviews"
    },
    "balanced": {
        "name": "nvidia/speaker-diarization",
        "requirements": "GPU, 4GB RAM",
        "accuracy": "90%",
        "best_for": "General purpose"
    },
    "fast": {
        "name": "whisperx-diarize",
        "requirements": "CPU/GPU, 2GB RAM",
        "accuracy": "85%",
        "best_for": "Real-time, low-resource"
    }
}

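A small helper can pick an entry from this matrix automatically based on the hardware it finds. This is a sketch; the VRAM thresholds are rough assumptions, not benchmarked values:

python
import torch

def select_diarization_profile(models=DIARIZATION_MODELS):
    """Pick a diarization profile from the matrix above (rough heuristic)."""
    if not torch.cuda.is_available():
        return "fast", models["fast"]
    # Total VRAM of GPU 0 in GB
    total_gb = torch.cuda.get_device_properties(0).total_memory / 1024**3
    if total_gb >= 8:
        return "high_accuracy", models["high_accuracy"]
    return "balanced", models["balanced"]

profile_name, profile = select_diarization_profile()
print(f"Using diarization profile: {profile_name} -> {profile['name']}")
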
2. Speaker Voice Mapping Strategy

python
def intelligent_voice_mapping(speakers_data):
    """Phân tích và chọn giọng phù hợp cho từng speaker"""
    voice_profiles = {
        "male_deep": {"pitch": "low", "speed": "slow", "model": "en_male_01"},
        "male_mid": {"pitch": "medium", "speed": "normal", "model": "en_male_02"},
        "female_high": {"pitch": "high", "speed": "fast", "model": "en_female_01"},
        "female_mid": {"pitch": "medium", "speed": "normal", "model": "en_female_02"},
        "child": {"pitch": "very_high", "speed": "fast", "model": "en_child_01"}
    }
    
    # Analyze acoustic features / textual cues
    for speaker_id, data in speakers_data.items():
        # Estimate gender from pitch (if reference audio is available)
        # or fall back to textual context
        if "先生" in data["full_text"] or "男" in data["full_text"]:
            data["voice_profile"] = voice_profiles["male_mid"]
        elif "女士" in data["full_text"] or "女" in data["full_text"]:
            data["voice_profile"] = voice_profiles["female_mid"]
        else:
            # Default based on speaking duration and style
            data["voice_profile"] = voice_profiles["male_mid"]
    
    return speakers_data

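The text-based heuristic above only works when the transcript contains explicit cues. When per-speaker audio is available, a rougher but more general option is to estimate the speaker's median pitch. A sketch using librosa's pyin pitch tracker; the 165 Hz cut-off and the helper name are assumptions, not calibrated values:

python
import numpy as np
import librosa

def estimate_voice_profile_from_audio(wav_path, pitch_cutoff_hz=165.0):
    """Classify a speaker as low- or high-pitched from median F0 (rough heuristic)."""
    y, sr = librosa.load(wav_path, sr=16000)
    f0, voiced_flag, _ = librosa.pyin(
        y,
        fmin=librosa.note_to_hz("C2"),  # ~65 Hz
        fmax=librosa.note_to_hz("C6"),  # ~1047 Hz
        sr=sr,
    )
    voiced_f0 = f0[voiced_flag & ~np.isnan(f0)]
    if voiced_f0.size == 0:
        return "male_mid"  # fallback when no voiced frames are found
    median_f0 = float(np.median(voiced_f0))
    return "male_mid" if median_f0 < pitch_cutoff_hz else "female_mid"
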
3. Error Handling & Fallbacks

python
class RobustDiarization:
    """Diarization với fallback mechanisms"""
    
    def diarize_with_fallback(self, audio_path):
        """Thử nhiều phương pháp nếu phương pháp chính fail"""
        methods = [
            self._pyannote_diarization,
            self._nemo_diarization,
            self._simple_diarization
        ]
        
        for method in methods:
            try:
                result = method(audio_path)
                if self._validate_diarization(result):
                    return result
            except Exception as e:
                print(f"Method {method.__name__} failed: {e}")
                continue
        
        # Last-resort fallback: treat everything as a single speaker
        return self._single_speaker_fallback(audio_path)
    
    def _validate_diarization(self, result):
        """Validate diarization result"""
        if not result or "segments" not in result:
            return False
        
        # Check segment consistency
        segments = result["segments"]
        if len(segments) == 0:
            return False
        
        # Check for overlapping segments
        sorted_segments = sorted(segments, key=lambda x: x["start"])
        for i in range(1, len(sorted_segments)):
            if sorted_segments[i]["start"] < sorted_segments[i-1]["end"]:
                return False
        
        return True

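The last-resort `_single_speaker_fallback` referenced above can be as simple as labelling the entire recording as one speaker so the rest of the pipeline still runs. A minimal sketch of that method, assuming the same segment format as the WhisperX output:

python
# Sketch of the last-resort fallback used by RobustDiarization
import soundfile as sf

def _single_speaker_fallback(self, audio_path):
    """Label the entire recording as one speaker so the pipeline can continue."""
    duration = sf.info(audio_path).duration
    return {
        "segments": [{
            "start": 0.0,
            "end": duration,
            "speaker": "SPEAKER_00",
            "text": "",  # the transcript is attached by the transcription step
        }]
    }
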
📊 PERFORMANCE OPTIMIZATION

python
# performance_optimizer.py
import torch
import gc
from contextlib import contextmanager

@contextmanager
def optimized_inference(device="cuda"):
    """Context manager cho inference tối ưu"""
    torch.backends.cudnn.benchmark = True
    torch.set_grad_enabled(False)
    
    if device == "cuda":
        torch.cuda.empty_cache()
    
    try:
        yield
    finally:
        torch.set_grad_enabled(True)  # restore autograd after inference
        gc.collect()
        if device == "cuda":
            torch.cuda.empty_cache()

class PerformanceOptimizer:
    def __init__(self):
        self.batch_sizes = {
            "whisper": 32,
            "diarization": 16,
            "tts": 8
        }
        
    def auto_tune_batch_size(self, model_type, available_memory):
        """Tự động điều chỉnh batch size dựa trên memory"""
        if model_type == "whisper":
            if available_memory > 16000:  # 16GB
                return 32
            elif available_memory > 8000:  # 8GB
                return 16
            else:
                return 8
        
        # Similar logic for other models
        return 4
    
    def mixed_precision_inference(self, model, inputs):
        """Mixed precision inference để tiết kiệm memory"""
        with torch.cuda.amp.autocast():
            return model(inputs)

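The `auto_tune_batch_size` method above expects the available memory in MB; on CUDA this can be read directly from the driver. A small sketch of how it might be wired up (the helper name and the CPU default are assumptions):

python
import torch

def get_available_memory_mb():
    """Return free GPU memory in MB (or a conservative default on CPU)."""
    if torch.cuda.is_available():
        free_bytes, _total_bytes = torch.cuda.mem_get_info()
        return free_bytes / (1024 ** 2)
    return 4000  # assumption: treat CPU-only machines as a ~4 GB budget

optimizer = PerformanceOptimizer()
batch_size = optimizer.auto_tune_batch_size("whisper", get_available_memory_mb())
print(f"Whisper batch size: {batch_size}")
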
🧪 TESTING & VALIDATION

python
# tests/test_diarization_integration.py
import pytest
import numpy as np
from src.video_dubber import AdvancedVideoDubber

# Minimal test configuration (placeholder values)
test_config = {
    "whisper_model": "base",
    "hf_token": "YOUR_HUGGINGFACE_TOKEN",
    "source_lang": "zh",
    "target_lang": "en",
}

class TestDiarizationIntegration:
    @pytest.fixture
    def dubber(self):
        return AdvancedVideoDubber(test_config)
    
    def test_multi_speaker_detection(self, dubber):
        """Test nhận diện multiple speakers"""
        # Tạo test audio với 3 speakers
        test_audio = self.create_test_audio(speakers=3)
        
        result = dubber.process_with_diarization(test_audio)
        
        # Check the number of detected speakers
        speakers = set(seg.get("speaker", "UNKNOWN") 
                      for seg in result["segments"])
        
        assert len(speakers) >= 2, f"Only detected {len(speakers)} speakers"
    
    def test_speaker_consistency(self, dubber):
        """Test consistency của speaker assignment"""
        result = dubber.process_with_diarization("test_audio.wav")
        
        # Check that no segments of the same speaker overlap
        segments_by_speaker = {}
        for seg in result["segments"]:
            speaker = seg["speaker"]
            if speaker not in segments_by_speaker:
                segments_by_speaker[speaker] = []
            segments_by_speaker[speaker].append(seg)
        
        for speaker, segs in segments_by_speaker.items():
            sorted_segs = sorted(segs, key=lambda x: x["start"])
            for i in range(1, len(sorted_segs)):
                assert sorted_segs[i]["start"] >= sorted_segs[i-1]["end"], \
                    f"Speaker {speaker} has overlapping segments"

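The `create_test_audio` helper used in the first test is not shown above. A crude stand-in can concatenate tones at different pitches so the diarizer has something to separate; synthetic tones are a weak proxy for real voices, so treat this as a smoke test only. A sketch, meant as a method of TestDiarizationIntegration:

python
import numpy as np
import soundfile as sf

def create_test_audio(self, speakers=3, seconds_per_speaker=5, sr=16000,
                      out_path="test_multi_speaker.wav"):
    """Generate a synthetic multi-speaker WAV: one tone 'voice' per speaker."""
    base_pitches = [110.0, 220.0, 330.0, 440.0]  # Hz, one per synthetic speaker
    t = np.linspace(0, seconds_per_speaker, int(sr * seconds_per_speaker), endpoint=False)
    chunks = [0.3 * np.sin(2 * np.pi * base_pitches[i % len(base_pitches)] * t)
              for i in range(speakers)]
    audio = np.concatenate(chunks).astype(np.float32)
    sf.write(out_path, audio, sr)
    return out_path
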
🚀 DEPLOYMENT WITH DOCKER

dockerfile
# Dockerfile
FROM nvidia/cuda:11.8.0-runtime-ubuntu22.04

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    ffmpeg \
    python3.10 \
    python3-pip \
    git \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt

# Pre-download the Whisper model at build time (CPU is fine here)
RUN python3 -c "import whisperx; whisperx.load_model('large-v3', 'cpu')"

# Copy source code
COPY . .

# Create cache directory
RUN mkdir -p /app/cache /app/output /app/temp

# Environment variables (pass HF_TOKEN at runtime, e.g. docker run -e HF_TOKEN=...)
ENV PYTHONPATH=/app

CMD ["python3", "src/video_dubber.py"]

📈 BENCHMARK RESULTS

python
# benchmark.py
import time
from dataclasses import dataclass

@dataclass
class BenchmarkResult:
    video_duration: float
    processing_time: float
    speakers_detected: int
    accuracy: float
    memory_used: float
    fps: float

class VideoDubberBenchmark:
    def run_benchmark(self, video_path):
        """Chạy benchmark toàn diện"""
        results = []
        
        # Test with different configurations
        configs = [
            {"whisper_model": "base", "diarization": "simple"},
            {"whisper_model": "medium", "diarization": "pyannote"},
            {"whisper_model": "large-v3", "diarization": "pyannote+hf"}
        ]
        
        for config in configs:
            dubber = AdvancedVideoDubber(config)
            
            start_time = time.time()
            result = dubber.process_video(video_path)
            end_time = time.time()
            
            benchmark = BenchmarkResult(
                video_duration=self.get_video_duration(video_path),
                processing_time=end_time - start_time,
                speakers_detected=len(result["speakers"]),
                accuracy=self.calculate_accuracy(result),
                memory_used=self.get_memory_usage(),
                fps=self.calculate_fps(video_path, result)
            )
            
            results.append(benchmark)
        
        return results

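The helpers referenced in the benchmark, such as `get_video_duration`, are not defined above. One straightforward way to implement the duration lookup is via ffprobe, which ships with ffmpeg. A sketch, meant as a method of VideoDubberBenchmark:

python
import subprocess

def get_video_duration(self, video_path):
    """Read the video duration in seconds using ffprobe."""
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path,
    ]
    out = subprocess.run(cmd, check=True, capture_output=True, text=True)
    return float(out.stdout.strip())
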
CONCLUSION: The best approach is to combine WhisperX for transcription + alignment with PyAnnote for diarization, plus caching and parallel processing. Make sure to:

  1. ✅ Use a GPU for speed

  2. ✅ Implement caching to avoid reprocessing

  3. ✅ Process each speaker in parallel

  4. ✅ Use intelligent voice mapping

  5. ✅ Add comprehensive error handling

  6. ✅ Monitor and optimize performance

Cho tam giác ABC vuông tại A có AB < AC. Vẽ AH vuông góc với BC ( H thuộc BC), D là điểm trên cạnh AC sao cho AD=AB. Vẽ DE vuông góc với BC( E thuộc BC). Chứng minh rằng : HA=HE.