Mirrativ Tech Blog

株式会社ミラティブの開発者(バックエンド,iOS,Android,Unity,機械学習,インフラ, etc.)によるブログです

Pub/Sub障害からの学び:JSONメッセージをOpenAPI Validationで守る

こんにちは、バックエンドエンジニアの山倉です。 MirrativではPub/Subサーバーを使ってサーバー - クライアント間のリアルタイム通信を行っており、メッセージ形式はJSONを採用しています。HTTP APIについては従来からOpenAPI Validationをかけていましたが、Pub/Subサーバーで使われるJSONメッセージに対しては十分なバリデーションができていませんでした。 本記事ではJSONメッセージにOpenAPI Validationを適用する仕組みを導入した経緯と実装方法を紹介します。

きっかけは、サーバー - クライアント間のJSONの型不一致による障害でした。この経験から「メッセージをスキーマで守ること」の重要性を実感し、再発防止策としてOpenAPI Validationを導入しました。

1. 要約

  • 新しい機能をリリースした際、Pub/Sub通信に使うJSONメッセージの一部に予期しない型の変化が発生した。その結果、クライアントでメッセージのパースに失敗し、障害が発生した
  • アプリケーションサーバーではJSONメッセージの構造や型を検証する仕組みがなかったため、こうした思わぬ変化があったとしても検知しにくかった
  • この反省から、Pub/Subサーバーにpublishする前にアプリケーションサーバー側でOpenAPI Validationを実施することで、自動テストやQAの段階で型や構造の不一致を検知できるようにした

2. 背景

Mirrativでは、コメントやギフト演出などリアルタイム性が求められるサーバー - クライアント間の通信を、Pub/Subサーバーによって実現しています。

コメントやギフトの演出機能はPub/Subサーバーによって実現されている

例えば「視聴者がコメントを送信すると、配信者の端末にコメントが届き画面に表示される」という機能は具体的には以下のようなステップを踏んでいます

  1. 視聴者がコメントを送信する
  2. クライアントアプリはアプリケーションサーバーのコメントAPIを呼び出す
  3. アプリケーションサーバーはPub/Subサーバーにコメントや視聴者情報をJSON形式でpublishする
  4. 配信者端末のクライアントアプリは3でpublishされたメッセージをsubscribeしており、受信したメッセージを元にコメントを表示する

視聴者がコメントしてから配信者の端末にコメントが表示されるまでの流れ

ここで、JSON形式のPub/Subメッセージについて、これまでスキーマ定義は明示的に行っていませんでした

このような背景があり、以下のような障害を引き起こすに至ってしまいました。

3. 障害の説明

何が起きたのか

新機能をサーバーにリリースした直後、一部の視聴者によるコメントが配信画面で表示されなくなる障害が発生しました。これを受けて公式アカウントでも不具合のお知らせを出す事態となりました1

原因

配信画面では、視聴者のコメントに VIPメンバーランク2に応じたVIPバッジが表示されます。

コメントにVIPメンバーバッジがついている様子(赤く囲った部分)

コメント情報が視聴者アプリからサーバーに送信され、Pub/Subサーバーを経由して配信者アプリに届きます。この際、クライアントアプリにてVIPバッジを出し分けるため、Pub/Subサーバーから送られるJSONメッセージには vip_rankというフィールドが含まれています。

本来このvip_rankint型で送られる想定でした。しかし、今回の施策実装時に一部の処理でPerlの暗黙的な型変換により数値が文字列として評価・格納されてしまう変更が入り、そのままstring型として送られてしまうケースが発生しました。その結果、クライアントは想定外の型を受け取ってパースに失敗し、コメントが画面に表示されなくなる という障害につながりました3

4. 再発防止策

今回の障害は、JSONメッセージのスキーマが明示されず、検証も行われていなかったため、型や構造の不一致が検知しにくかったことが背景にあります。 より正確にいうと、自動テストでPub/Subメッセージを検証する仕組みは入っていましたが、Perlで実装されており、動的型付け言語なため厳密な検証ができていませんでした。 例えば以下のようなテストは通ってしまいます。したがって今回のような型の不一致によって起きる障害をPerl側の自動テストで検知するのは難しいでしょう。

use strict;
use warnings;
use utf8;
use Test::Deep;
use Test::More;

subtest 'compare different types' => sub {
    cmp_deeply +{ foo => 1 }, +{ foo => '1' };
};

done_testing;

Mirrativでは現在サーバーシステム全体をPerlからGoへ移行している最中であり、そこでまず考えられる防止策として、コメント機能の実装についても同様に、動的型付け言語であるPerlから強い静的型付き言語であるGoに移植して自動テストすることを検討しました。しかし、これにはそれなりのコストがかかってしまうことが予想されます。 そこで、JSONメッセージのスキーマを定義し、Pub/Subの前段でOpenAPI Validationをかけることで、早い段階で不整合を捕捉できるようにしました。これには他にも以下の利点があります。

  • JSONメッセージのスキーマを明示することでドキュメンテーションとしての役割も果たせる
  • 将来的にはiOS / Androidなどのクライアント側のコードの自動生成にも活用可能
  • サーバー側・クライアント側のどちらの言語やフレームワークが変わったとしても検証や活用可能

アプリケーションサーバーからPub/Subサーバーへpublishする際、すべてのJSONメッセージが一度集約される処理がGoで実装されています。そこで、その処理の中でバリデーションを行いました。

JSONメッセージのスキーマを定める規格として、HTTP APIサーバーでの運用実績やチーム知見・周辺ツールの再利用性からOpenAPIを採用することにしました。

以降では、JSONメッセージ単体にOpenAPI Validationをかける実装をコードベースで詳述します。

実装方法

ディレクトリ構成

既存のHTTP API スキーマと同様にmulti-document形式を採用しました。エントリドキュメントはindex.yml4、各メッセージごとのスキーマはcomponents/schemas/配下に配置します。

.                               # スキーマを配置するトップディレクトリ(リポジトリ内の任意の場所)
├── components
│   └── schemas                 # JSONメッセージの種別ごとのスキーマ
│       └── Comment.yml
└── index.yml                   # エントリドキュメント(OpenAPIライブラリが一番初めに読み込むファイル)

スキーマ

pathsを空にし、components.schemasだけでJSON単体を検証します。

# index.yml

openapi: "3.0.0"
info:
  version: "1.0"
  title: "JSONメッセージ単体を検証する"
paths: {} # JSON単体を検証するため、pathsは使わない
components:
  schemas:
    Comment:
      $ref: "./components/schemas/Comment.yml"
# Comment.yml

type: object
description: コメントメッセージのオブジェクト
properties:
  type:
    type: integer
    enum: [1] 
  comment:
    description: liveComment
    type: string
  vip_rank:
    type: integer
required:
  - type
  - comment

バリデーションの実装(Go)

  • ライブラリは運用実績のある github.com/getkin/kin-openapi/openapi3 を使用します。
  • メッセージの種類(コメント、ギフト、...)を表すentity.PubSubMessageTypeを見て適用スキーマを選択します。
  • 実行速度の低下を考慮し、本番環境ではバリデーションを行いません。

以下が実装です5

type interactor struct {
    schemaByPubSubMessageType map[entity.PubSubMessageType]*openapi3.SchemaRef
}

func New(ctx context.Context) interactor {
    var schemaByPubSubMessageType map[entity.PubSubMessageType]*openapi3.SchemaRef
    if !IsProduction() {
        var err error
        schemaByPubSubMessageType, err = loadSchemas(ctx, filepath.Join("path", "to", "index.yml"))
        if err != nil {
            panic(err)
        }
    }
    return &interactor{
        schemaByPubSubMessageType,
    }
}

// JSONメッセージが一度集約される場所。Pub/Subサーバーに送る前に検証する
func (i *interactor) Publish(ctx context.Context, data string) error {
    if i.schemaByPubSubMessageType != nil {
        if err := i.validateJSON(data); err != nil {
            return err
        }
    }
    // 検証が通ったメッセージのみ Pub/Subサーバーへ
    // ...
    return nil
}

func (i *interactor) validateJSON(messageJSON string) error {
    const validationNeededForMessageTypeAfter = ... // 既存で存在するメッセージで最大の種別
    messageType := entity.PubSubMessageType(gjson.Get(messageJSON, "type").Uint())
    schema, ok := i.schemaByPubSubMessageType[messageType]
    if !ok {
        // 既存分はスキーマが定義されていなければバリデーションをスキップし、新規分はスキーマを定義するように強制する
        if messageType > validationNeededForMessageTypeAfter {
            return errors.Errorf("JSONメッセージのスキーマを定義してください。message type: %d", messageType)
        }
        return nil
    }

    var data any
    if err := json.Unmarshal([]byte(messageJSON), &data); err != nil {
        return errors.WithStack(err)
    }

    return errors.WithStack(schema.Value.VisitJSON(data))
}

// メッセージ種別ごとのスキーマを読み込む
func loadSchemas(ctx context.Context, docPath string) (map[entity.PubSubMessageType]*openapi3.SchemaRef, error) {
    schemaByPubSubMessageType := make(map[entity.PubSubMessageType]*openapi3.SchemaRef)

    loader := openapi3.NewLoader()
    loader.IsExternalRefsAllowed = true
    doc, err := loader.LoadFromFile(docPath)
    if err != nil {
        return nil, errors.WithStack(err)
    }
    if err := doc.Validate(ctx); err != nil {
        return nil, errors.WithStack(err)
    }

    for _, schemaRef := range doc.Components.Schemas {
        tProperty, ok := schemaRef.Value.Properties["type"]
        if !ok {
            return nil, errors.New("スキーマの全てのcomponentsについて、propertiesにはtypeが含まれる必要があります")
        }

        if len(tProperty.Value.Enum) != 1 {
            return nil, errors.New("スキーマの全てのcomponentsについて、propertiesのtypeフィールドはenumで定義された定数を1つだけ持つ必要があります")
        }

        tFloat, ok := tProperty.Value.Enum[0].(float64)
        if !ok {
            return nil, errors.New("スキーマの全てのcomponentsについて、propertiesのtフィールドは数値のenumである必要があります")
        }

        schemaByPubSubMessageType[entity.PubSubMessageType(tFloat)] = schemaRef
    }

    return schemaByPubSubMessageType, nil
}

カバレッジの考え方

現状多数のentity.PubSubMessageTypeが存在しており、その全種を一気にスキーマ化するのは困難です。なのでまずは障害が起きた配信コメントのJSONメッセージからスキーマを定め、それから徐々に他種類のJSONメッセージについてもスキーマ化する方針を取りました。JSONメッセージに変更が入る場合や、新たな種類のメッセージはスキーマを定めることで、徐々にカバレッジを上げていきます。

We are hiring!

ミラティブではエンジニアを絶賛募集中です。日本全国どこからでもフルリモート勤務が可能です。ご興味のある方はお気軽にご連絡ください!

speakerdeck.com

mirrativ.notion.site

www.mirrativ.co.jp


  1. ミラティブ公式アカウントの障害お詫びのポスト
  2. VIPメンバーランクとは:対象の金額プランのコイン購入をすることで、ユーザーは「VIPメンバー」になることができます。VIPメンバーの継続期間によってVIPメンバーランクが定まり、ランクに応じて様々な特典が受けられるようになります
  3. なお、問題が発生したのは配信者がiOS端末を利用していた場合で、 Android端末の場合は発生しませんでした。これは受け取ったJSONを構造体にマッピングをする際、型の差異を許容するかしないかのポリシーの違いがあったためです
  4. OpenAPI仕様では "It is RECOMMENDED that the entry document of an OAD be named: openapi.json or openapi.yaml." と記載されているため、index.yml よりも openapi.yaml の方が推奨されます
  5. 説明のために改変、簡易化している部分があります(例. uint64をuint32にキャストする部分には専用の関数を使い、オーバーフローした場合はpanicするようになっている など)