Mirrativ Tech Blog

株式会社ミラティブの開発者(バックエンド,iOS,Android,Unity,機械学習,インフラ, etc.)によるブログです

Neural Audio Codec を用いた大規模配信文字起こしシステムの構築

こんにちは ハタ です。
最近Mirrativ上に構築した配信の文字起こしシステムを紹介したいなと思います

音声からの文字起こしは、各社SaaSでAPI提供されているものがあると思いますが、今回紹介するものはセルフホスト型(自前のGPUマシンを使う)になります
構築していく上で色々試行錯誤したのでそれが紹介できればなと思っています

どんなものを作ったか

今回作ったものは Mirrativで配信されるすべての音声を対象に文字起こしを行う システムになります
Mirrativでは リアルタイムに処理するサーバサイド通知ぼかし などリアルタイム処理が多いのですが、今回のシステムは ニアリアルタイム での文字起こしを行っています

なぜニアリアルタイムとしているかですが、配信文字起こしシステムの概要を絵にしてみたのでご確認ください

いつくかの箇所で Disk が出てきていたり、バッファリングしている箇所などがあるのですが、これらがニアリアルタイムとしている箇所です
Live Recorder / Archiver / Transcriber それぞれである程度Delayを許容する仕組みとなっていて、状況に応じて Disk にオフロードされるようにしています

前提知識: 配信基盤

まず最初に紹介しなければならないのは、 Mirrativ の配信基盤です
かなり以前の記事 になりますが 次のような Origin/Edge 構成と紹介しました

今回紹介する中では主役ではない配信基盤ですが、Mirrativ上で配信される全てのストリームはこの配信基盤を使っています。
この配信基盤はオンプレミスな環境にあり、Origin/Edge 構成のサーバおよびストリームは全てインターネットを経由しない内部のネットワーク上に流れています
ライブ配信においてインターネット上にトラフィックを流すとどうしてもコストがかかってしまうのですが、内部ネットワーク上で行うためコストを抑えれる構成になっています

また、Mirrativの配信基盤ではVideoストリームとAudioストリームは別々のストリームとして扱えるようになっています

Audioストリームだけ分離できているため、文字起こしに必要な処理を最小限に実装できるのは特徴的かもしれません
また Push方式で配信しているため、受信機さえ用意すれば音声データを取り出すことができます
(配信基盤の内部構成にもし興味あれば こちらのスライド を参照ください)

配信文字起こしシステムでは下記のように配信基盤のAudioストリームのみを取り出しながら音声から文字起こししています

前提知識: Unix Domain Socket

次に、事前に紹介しておかなければならないのは、 Mirrativ のミドルウェアはほとんどが Go で作られていて、多くのライブラリも Go です
ただ、今回紹介する中で VAD や NAC, STT を処理するライブラリのほとんどは Python を使用しています

Go と Python の連携では HTTP API を使ったり gRPC を使ったものだったりが候補に上がるのですが
Go で利用している値をなるべく低コストで Python へ渡したい
値のやりとりだけになるため、なるべくシンプルにしたい
という思いから Unix Domain Socket(UDS) を使った IPC 通信を採用しました

Pythonが サーバ側となり、Goから値を受け取って個別の処理をするようにしています

例えば torch を使ったものだと

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--uds", "-u", type=str, required=True, default="/tmp/name.sock")
args = parser.parse_args()

import torch
torch.set_num_threads(1)
model = torch.jit.load("/path/to/model", map_location=torch.device("cuda"))
model.eval()

import socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(args.uds)
sock.listen(-1)

import io
from struct import pack,unpack

while True:
    conn, addr = sock.accept()
    head = conn.recv(4)
    size = unpack('>I', head) # BE uint32
    remain = size[0]
    buf = io.BytesIO()
    while 0 < remain:
        d = conn.recv(size[0])
        if len(d) < 1:
            break
        buf.write(d)
        remain -= len(d)
    buf.seek(0)

    try:
        res = process(buf)
        out = res.encode('ascii')

        wsize = pack('>I', len(out))
        conn.sendall(wsize)
        conn.sendall(out)
    finally:
        conn.close()

のようにしておき、事前に Model の読み込みなどを行ってから、Socketを受け取ります

この Python のサーバは Go から別プロセスとして起動しています (python のpathは venv などでセットアップ済みの python を向けてます)

type PyWorker struct {
    codePath string   // e.g.) worker.py
    args     []string
    env      []string
    sockPath string
}

func (w *PyWorker) Run(ctx context.Context) error {
    pythonPath, err := exec.LookPath(filepath.Join("path", "to", "bin", "python"))
    if err != nil {
        return err
    }

    cmdArgs := make([]string, 0, 3)
    cmdArgs = append(cmdArgs, w.codePath)
    cmdArgs = append(cmdArgs,  "--uds", w.sockPath)
    cmdArgs = append(cmdArgs, w.args...)

    cmd := exec.Command(pythonPath, cmdArgs...)
    cmd.Env = append(os.Environ(), w.env...)
    if err := cmd.Start(); err != nil {
        return err
    }
    <-ctx.Done()
    cmd.Process.Signal(syscall.SIGINT)
    if _, err := cmd.Process.Wait(); err != nil {
        return err
    }
    return nil
}

PyWorker とあるように、複数個の Worker プロセスとして起動させてます

Python との通信は Length-Prefix 形式 としているので、クライアントとなる Go の実装は次のようにしています

type PyClient struct {
    sockPath string
}

func (c *PyClient) Call(data []byte) ([]byte, error) {
    conn, err := net.DialUnix("unix", nil, &net.UnixAddr{Net: "unix", Name: c.sockPath})
    if err != nil {
        return nil, err
    }
    defer conn.Close()

    writeSize := uint32(len(data))
    reqSizeBuf := make([]byte, 4)
    binary.BigEndian.PutUint32(reqSizeBuf, writeSize)

    if err := c.writeAll(conn, reqSizeBuf, 4); err != nil {
        return nil, err
    }
    if err := c.writeAll(conn, data, writeSize); err != nil {
        return nil, err
    }

    resSizeBuf, err := c.readAll(conn, 4)
    if err != nil {
        return nil, err
    }

    payloadSize := binary.BigEndian.Uint32(resSizeBuf)
    return c.readAll(conn, payloadSize)
}

func (c *PyClient) writeAll(w io.Writer, data []byte, size uint32) error {
    max := int64(size)
    total := int64(0)
    for {
        n, err := w.Write(data)
        if err != nil {
            return err
        }

        total += int64(n)
        if total == max {
            break
        }
    }
    return nil
}

func (c *PyClient) readAll(r io.Reader, size uint32) ([]byte, error) {
    max := int64(size)
    lim := io.LimitReader(r, max)

    out := bytes.NewBuffer(make([]byte, 0, size))

    buf := make([]byte, 16*1024)
    total := int64(0)
    for {
        n, err := lim.Read(buf)
        if err != nil {
            if errors.Is(err, io.EOF) {
                break
            }
            return nil, err
        }
        out.Write(buf[:n])
        total += int64(n)
        if total == max {
            break
        }
    }
    return out.Bytes(), nil
}

(Length-Prefix形式は、何と言う呼び名がいいのか分からなかったためそう呼称しました、 4byteのヘッダとペイロードとなる組み合わせのパケットです)

実際に扱うデータとしては Python でも処理がしやすい JSON 形式で送り、Go と Python の連携が行いやすくしています

Live Recorder

ここからは実際の処理の紹介になります
Live Recorder と呼んでいる箇所では、リアルタイムに受信した音声パケットをバッファリングしつつ、 5秒から10秒程度の chunk (塊)にまとめて保存しています

5秒から10秒程度 としているのは、後述する NAC で圧縮する際に大きすぎると性能が落ちやすく、小さすぎるとオーバヘッドが出やすいため最小5秒程度として性能に合わせてチューニングしています
また chunk には metadata も付与していて 収録時間やサンプリングレート等の後段の処理で必要になるものを記録しています
実際のコードとは少し違いますが、バッファリングとchunk作成は次のようなコードです

const (
    threshold time.Duration = 5 * time.Second
)

type DataLayout struct {
    Metadata Meta
    Data     []byte
}

type Meta struct {
    Dur time.Duration
    // ...他にもいろいろ
}

type Recorder struct {
    pktDur time.Duration
    buf    *bytes.Buffer
    bufDur time.Duration
    seq    uint16
}

func (r *Recorder) Recv(pkt []byte) {
    if threshold < r.bufDur {
        r.flush()
    }
    r.buf.Write(pkt)
    r.bufDur += r.pktDur
}

func (r *Recorder) flush() {
    f, _ := os.Create(fmt.Sprintf("chunk.%d.tmp", r.seq))
    gob.NewEncoder(f).Encode(DataLayout{
        Metadata: Meta{r.bufDur},
        Data:     r.buf.Bytes(),
    })
    f.Sync()
    f.Close()
    os.Rename(f.Name(), fmt.Sprintf("/path/to/dir/chunk.%d", r.seq))

    r.buf.Reset()
    r.bufDur = 0
    r.seq += 1
}

func New(frameLen, sampleRate int) *Recorder {
    // frameLen e.g.) 1024
    // sampleRate e.g.) 44100
    pktDur := float32(frameLen) / float32(sampleRate)
    return &Recorder{
        time.Duration(pktDur*1000.0) * time.Millisecond,
        bytes.NewBuffer(nil),
        0,
        0,
    }
}

非常にシンプルな例になっていますが、大枠は同じです
また os.Rename を使っているのは、 inotify を使っているため作成したチャンクを即座に検知できるように不要なイベントを減らすためです

大量のストリームを受信することになるため Recorder のバッファリングには注意が必要ですが、それ以上に注意するのが inotify を使っているため fs.inotify.max_user_watchesfs.inotify.max_user_instances の設定値には注意しなければいけません

fs.inotify.max_user_watches のデフォルト値は比較的大きく設定されていると思いますが fs.inotify.max_user_instances のデフォルト値は低くなっている場合があります
上記例では ディレクトリ単位 に inotify で監視しているのですが、大量にインスタンスを作成しないように工夫するか fs.inotify.max_user_instances の数値を大きくする必要があります

Archiver

Archiver と呼んでいる箇所では、Recorder によって作成された chunk それぞれに対してフィルタリングを行っていき、NACで圧縮しています

DS Filter

DS は Detect Silence つまり無音チャンクかどうかを検知しています
VAD 処理でも同様の処理ができますが、VADよりも軽量でシンプルな処理のため前処理として作成しています
また NAC ではモノラル音源 かつ float32 で処理するため、変換処理もここで行っています

const (
    pcm16f    float32 = 1.0 / 32768.0
    threshold float32 = float32(-70.0)
)

func DSFilter(pcm []int16, frameLen, channels int) ([]float32, bool) {
    mono := monaural(pcm, frameLen, channels)
    wav := int16ToFloat32(mono)
    db := decibel(wav)
    isSilence := db < threshold
    return wav, isSilence
}

func int16ToFloat32(pcm []int16) []float32 {
    normalized := make([]float32, len(pcm))
    for i, d := range pcm {
        normalized[i] = float32(d) * pcm16f // -1.0 to 1.0
    }
    return normalized
}

func decibel(pcm []float32) float32 {
    r := rms(pcm)
    rmse := float32(math.Log10(r))
    return 20 * rmse
}

func rms(pcm []float32) float64 {
    values := float32(0.0)
    for _, d := range pcm {
        values += (d * d) // math.Pow(d,2)
    }
    return math.Sqrt(float64(values / float32(len(pcm))))
}

この処理によって 無音ではない = 音があるchunk だけを VAD で処理できるようになりました
実際の配信をいくつかためしながら、数秒程度の無音となる状態は意外と発生しているようなので、無音を取り除き VADする負荷も削減しています

VAD Filter

Voice Activity Detector (VAD) は Silero VAD を利用しています
発話区間を検出して、声が入っている区間以外を取り除くことで文字起こしに必要なデータだけになるようにフィルタリングしています
軽量でCPUでも十分に高速であるため、次のようにして CPU 処理だけにしています

import torch
torch.set_num_threads(1)

model, utils = torch.hub.load(
    repo_or_dir='snakers4/silero-vad',
    model='silero_vad',
    force_reload=False,
)   
(get_speech_timestamps, _, _, _, collect_chunks) = utils

import torchaudio
import json

import io
import json
def process(buf: io.BytesIO) -> bytes:
    data = json.load(buf)
    pcm = data["pcm"]
    samplerate = data["sr"]
    pcm = torch.tensor(pcm, dtype=torch.float32).to(device="cpu")
    pcm = pcm.reshape(1, -1)
    pcm = pcm.mean(0, keepdim=True)

    # to 16kHz
    wav = torchaudio.transforms.Resample(orig_freq=samplerate, new_freq=16000)(pcm)
    wav = wav.squeeze(0)
    speeches = get_speech_timestamps(wav, model, threshold=0.4, sampling_rate=16000)
    wav = collect_chunks(speeches, wav)

    # to original
    wav = torchaudio.transforms.Resample(orig_freq=16000, new_freq=samplerate)(wav)
    return json.dumps(wav.to(device="cpu").tolist())

この VAD でフィルタリングしたものと元音声の波形を並べてみました

少しこの画像だと分かりづらいかもしれませんが、赤枠で囲っている範囲が除去されています
内容にもよると思いますが、おおよそ10%から20%前後の音声データを除去することが出来ているようです

NAC / Compress

DS や VAD を使って、声だけの wav データを作り出すことができました
直接 wav のまま Disk に保存しても良いのですが、データは小さいほうが嬉しいです

Neural Audio Codec (NAC) は、ニューラルネットワークを活用したコーデックで、低いビットレートでも音の劣化を抑えつつ保存することができる最近見かけるようになった形式です
今回は Metaの開発した EnCodec を利用していて、Demo を聞いてもらえるとわかりやすいのですが、6kbpsや3kbpsなどビットレートが低い場合でも比較的聞きやすい状態かなと思います

今回、モノラルのデータを扱うようにしていることもあり 24kHz モデル の 6kbps を使っています*1
実際にはいくつかのビットレートでも試していて、文字起こしの際の誤認識も少なかったのでこの形に落ち着いていますが物によってはもっと低いものでもいけるかも知れません

from pathlib import Path
from encodec import EncodecModel
from encodec.compress import compress

model = EncodecModel.encodec_model_24khz(pretrained=True, repository=Path(args.model)).to(device=args.device)
model.set_target_bandwidth(args.bandwidth)

import torchaudio

import io
import json
def process(buf: io.BytesIO) -> bytes:
    data = json.load(buf)
    pcm = data["pcm"]
    samplerate = data["sr"]

    pcm = torch.tensor(pcm, dtype=torch.float32).to(device=args.device)
    pcm = pcm.reshape(1, -1)
    pcm = pcm.mean(0, keepdim=True)

    wav = torchaudio.transforms.Resample(samplerate, model.sample_rate).to(device=args.device)(pcm)
    compressed = compress(model, wav, use_lm=False)
    return compressed

compressed の中身は ECDC 形式 バイナリでパッキングされています

NAC での圧縮は GPUを使うようにしているのですが、CPUでも処理可能です
GPUを使っている理由としては処理速度の差になります
今回構築した文字起こしシステムでは、全ての配信をターゲットにしているため、とにかく大量のデータを取り込む必要があり、処理速度が速いGPUを使っています
時間をかけても良い場面などではCPUを選択したり、CPU/GPUのハイブリッドな構成もあり得るかと思います

また圧縮率ですが、DS + VAD処理が入っているためあまり参考になりませんが

圧縮前 圧縮後
80KB 4.4KB
40KB 1.4KB
40KB 4.6KB

となっていて、低いビットレートでの圧縮率の高さが見えます

Transcriber

最後に文字起こしを行う箇所になります

NAC / Decompress

ECDC chunk がある程度集まったら Decompress しています
これも Encodec のライブラリにあるものをそのまま使えます

from pathlib import Path
from encodec import EncodecModel
from encodec.compress import decompress

import io
import json
def process(buf: io.BytesIO) -> bytes:
    wav, sample_rate = decompress(buf.getvalue(), args.device)

    wav = wav.mean(0, keepdim=True)
    wav = torchaudio.transforms.Resample(sample_rate, 16000).to(device=args.device)(wav)
    return json.dumps(wav[0].to(device="cpu").tolist())

Decompress の処理自体はそこまで複雑ではありません
ただし、出来上がった音声データ (wav) は、オンメモリではなく ファイルとしてディスク上に保存するようにしています
これは、 UDS で送信する際に大きなデータを送ることを回避していて、仮に送ってしまうと、送信前のGoで作成したwavと受信したPythonのwav を2つ同時にメモリ上に保持することになり、大量の配信を処理する場面ではメモリ消費量が気になってしまいます
そこでGoで作成したwavはDiskに保存することでメモリの肥大化を防ぐようにしています

import "github.com/go-audio/wav"

const (
    pcm16Max float32 = 32767.0 // 2^15
)

func DecompressToFile(client *PyClient, files []string) (string, error) {
    out, err := os.CreateTemp(os.TempDir(), "*.wav")
    if err != nil {
        return "", err
    }
    defer out.Close()

    we := wav.NewEncoder(out, 16000, 16, 1, 1)
    for _, p := range files {
        ecdc, err := os.ReadFile(p)
        if err !=! nil {
            return "", err
        }
        data, err := client.Call(ecdc)
        if err != nil {
            return "", err
        }
        res := make([]float32, 0, 16*1024)
        if err := json.Unmarshal(data, &res); err != nil {
            return "", err
        }
        for _, d := range float32ToInt16(res) {
            if err := we.WriteFrame(d); err != nil {
                return "", err
            }
        }
    }
    if err := we.Close(); err != nil {
        return "", err
    }
    if err := out.Sync(); err != nil {
        return "", err
    }
    return out.Name(), nil
}

func float32ToInt16(pcm []float32) []int16 {
    normalized := make([]int16, len(pcm))
    for i, d := range pcm {
        normalized[i] = int16(d * pcm16Max) // -32768 to 32767
    }
    return normalized
}

(16kHz にしているのは後段の STT で使うサンプリングレートのため、Decompress時に行っています)

Speach To Text

Speach To Text (STT) では、 faster-whisper を利用しています
言わずと知れた実装なので紹介は省くのですが、採用した理由は GPU 利用時のメモリサイズの小ささです

openai/whisperのmedium modelではVRAMを5GB 程度消費しますが、 faster-whisperではlarge-v2 modelでも4.7GB 程度となっています
なるべく精度が高いモデルを大量に動かしたい場面では、faster-whisperの高速性や消費メモリの小ささは選択肢となるなと思います

実装はとてもシンプルで、事前に VAD 処理済みのデータが入るため直接 Transcribe を実行します

from faster_whisper import WhisperModel
model = WhisperModel(args.model, device=args.device, compute_type=args.compute, cpu_threads=args.threads)

import io
import json
def process(buf: io.BytesIO) -> bytes:
    data = json.load(buf)
    path = data["wav_path"]
    segments, info = model.transcribe(
        path,
        beam_size=5,
        best_of=2,
        language='ja',
        without_timestamps=False,
        word_timestamps=True,
        temperature=0.0,
        repetition_penalty=1.1,
    )

    out = []
    for segment in segments:
        out.append("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
    
    return json.dumps(out)

ここでの処理は CPU/GPU のハイブリッド構成としています
CPU処理自体は、音声ストリームの受信やVAD処理にしか使っていないため比較的空きがあります
使用するメモリも大半をDiskに逃すようにしているため、非常に小さくなっています
そこでマシンリソースの効率化や少しでも多く文字起こし処理を行うため、CPUとGPUどちらも実行するようにしています
(ただ、GPUの方が効率がいいのでCPUはオプションとしての実行にしています)

さて、こうして取り出された文字起こしできたテキストデータは、別のストレージに保存して終了です

コンテナイメージ

さて、ここまで紹介してきた配信文字起こしの実装ですが、実際にリリースを行う際には Dockerfile などからイメージを作成していくことになります

おそらく一番最初にハマるのが docker buildx build などで GPU を利用できないことかなと思います
色々やり方があるとは思うのですが、割り切って nvidia-container-runtime を default-runtime に向けるというのが、いつも使うコマンドと同じになるのでおすすめです

やり方はシンプルで /etc/docker/daemon.json で default-runtime を nvidia にして

{
  "runtimes": {
    "nvidia": {
      "path": "/usr/bin/nvidia-container-runtime",
      "runtimeArgs": []
    },
  },
  "default-runtime": "nvidia"
}

とする形です

また、 cudnn が入ったベースイメージは 公式のもの をそのまま使うのが便利です

// Dockerfile
FROM nvidia/cuda:11.8.0-cudnn8-runtime-ubuntu22.04

WORKDIR /app
Add . .
# ...残りはセットアップ

まとめ

Mirrativで配信されるすべての音声を対象に文字起こしを行う 配信文字起こしシステムの実装を紹介してきました

全ての音声データを処理するために

  • Mirrativの配信基盤を使って、オンプレミスな内部ネットワークでコストを抑えつシステムを構築した
  • Go と Python の連携は UDS の IPC 通信にして効率化を図っている
  • inotify を用いて バッチ処理よりもリアルタイムに近い、ニアリアルタイム処理にしている
  • 文字起こしに必要なデータだけになるように 無音chunkの検知や発話区間で刈り取りを行なって最小限のデータにした
  • NAC によって低ビットレートでも比較的劣化の少ないデータ形式で圧縮できた
  • VRAMの消費サイズが小さい faster-whisper を使い GPU を効率的に使うようにした
  • CPU/GPU はハイブリッドで利用している

今回、 個人的には久しぶりとなる GPU マシンを利用したシステムを作ってみました
一番最初は python 側に処理を寄せて、音声周りは pytorch で書いていたりしたのですが、CPU/GPUをどこでどう実行するか、メモリをどう管理するか等のチューニングのしやすさなどから今の構成となりました
まだ気になる箇所は他にもあるので順次アップデートしていってます

We are hiring!

効率的にマシンリソースを活用して、大規模配信文字起こしシステムを作ってみたいと思った方は ぜひカジュアル面談等も行なっているのでご連絡ください!

mirrativ.co.jp

mirrativ.notion.site

speakerdeck.com

*1:なお 48kHzモデルもあります こちらは2ch使えますがその分速度も遅くなるようです