KAKEHASHI Tech Blog

カケハシのEngineer Teamによるブログです。

SQLAlchemy でスロークエリの呼び出し元を追跡する

こんにちは、AI在庫管理のバックエンドを担当している沖(@takuoki)です。

今回は、スロークエリの呼び出し元の追跡が難しいという問題について、実際に取り組んだ解決策をご紹介します。比較的シンプルなアプローチでしたが、開発体験の大幅な改善につながったので、同様の課題をお持ちの方の参考になれば嬉しいです。

課題:スロークエリの呼び出し元がわからない

システムを運用していると、次のような状況に遭遇することがあります。

  • スロークエリの発生件数が増加し、調査および対応が必要になる
  • 発行回数の多いスロークエリがどれかは確認できている
  • しかし、そのクエリがコードのどこで呼び出されたかを確認するのが難しい

特に開発期間が長くなりシステムの複雑さが増してくると、飛躍的に呼び出し元の調査が難しくなります。AI在庫管理でも、似たようなクエリが複数箇所から呼び出されることもあり、どの処理がボトルネックになっているかの特定に時間を要していました。

この問題を解決するために、クエリ実行時にコメントを付与し、呼び出し元を特定可能にするアプローチを採用しました。SQLAlchemy には、クエリ実行前に処理を挿入できるイベントリスナーの仕組みが備わっているため、これを活用することにしました。

今回は SQLAlchemy(Python)+ MySQL という環境での実例をご紹介しますが、他の環境でも同様のアプローチが適用できると考えています。

SQLAlchemy のイベントリスナーを用いた実装方法

SQLAlchemy のイベントリスナーを使用することで、データベース操作の各段階にイベントフックのような処理を挿入できます。例えば、データを更新した際に呼び出される after_update のイベントリスナーを追加することで、変更されたフィールドの情報を監査ログとして残すといったことが可能です。

今回は、すべての SQL 実行前に呼び出される before_cursor_execute のイベントリスナーを追加して、すべてのSQLに自動的にコメントを付与するアプローチを採用しました。この方法であれば、既存コードの修正なしですべてのクエリに一律でコメントが付与され、運用負荷を低く抑えることができます。

まずは API 名などの呼び出し元情報を管理するため、スレッドローカル変数を使用します。ミドルウェアなどの共通処理で set_caller_name を呼び出し、API 名を設定しておきます。今回のアプリケーションの実行環境は Lambda のため簡易な実装としていますが、他の環境の場合は格納場所に注意が必要なことがあるかもしれません。

threading_local = threading.local()

def set_caller_name(name: str) -> None:
    threading_local.name = name

def get_caller_name() -> str:
    return getattr(threading_local, "name", "unknown")

続いて、作成した DB エンジンに対してイベントリスナーを追加します。before_cursor_execute の中では、get_caller_name で呼び出し元の情報を取得し、コメントとして付与されるようにします。

def add_event_listener_for_sql_comment(engine: Engine, get_caller_name: Callable[[], str]) -> None:
    @event.listens_for(engine, "before_cursor_execute", retval=True)
    def before_cursor_execute(
        conn: Connection,
        cursor: Any,
        statement: str,
        parameters: Any,
        context: Optional[ExecutionContext],
        executemany: bool,
    ) -> tuple[str, Any]:
        caller_name = get_caller_name()
        comment = f"[caller] {caller_name}"
        statement = f"/* {comment} */ {statement}"
        return statement, parameters

以上の実装により、スロークエリのログには次のような形式でコメントが付与されるようになります。

/* [caller] api: getUser */ SELECT * FROM users WHERE id = 1;

バルクインサートで発生した問題

実装後、ログ出力も正常に行われ、パフォーマンス上も特に問題なく動作しているように見えました。しかし、この実装のリリース以降、バッチ処理の実行時間が延び、INSERT 文の実行回数も大幅に増える事象が発生しました。調査を行ったところ、これまでバルクインサートされていた処理が、単発の INSERT 文に置き換わっていることが判明しました。

原因を調査した結果、使用していたドライバ pymysql の下記の実装に起因することがわかりました。

RE_INSERT_VALUES = re.compile(
    r"\s*((?:INSERT|REPLACE)\b.+\bVALUES?\s*)"
    + r"(\(\s*(?:%s|%\(.+\)s)\s*(?:,\s*(?:%s|%\(.+\)s)\s*)*\))"
    + r"(\s*(?:ON DUPLICATE.*)?);?\s*\Z",
    re.IGNORECASE | re.DOTALL,
)

class Cursor:
    def executemany(self, query, args):
        if not args:
            return

        m = RE_INSERT_VALUES.match(query)
        if m:
            # バルク実行の処理
            ...
        else:
            # 単発実行のループ処理
            ...

(参照:pymysql/cursors.py

pymysql は正規表現 RE_INSERT_VALUES を使って INSERT 文を判定し、バルクインサートの最適化を行います。しかし、コメントを SQL 文の先頭に付与すると、この正規表現にマッチしなくなり、結果としてバルクインサートが単発の INSERT 文として実行されてしまいます。

この問題を解決するため、コメントの挿入位置を SQL 文の先頭ではなく最初の単語(SELECT, INSERT 等)の直後になるように実装を修正した結果、バルクインサート処理が正常に動作するようになりました。

def add_event_listener_for_sql_comment(engine: Engine, get_caller_name: Callable[[], str]) -> None:
    @event.listens_for(engine, "before_cursor_execute", retval=True)
    def before_cursor_execute(
        conn: Connection,
        cursor: Any,
        statement: str,
        parameters: Any,
        context: Optional[ExecutionContext],
        executemany: bool,
    ) -> tuple[str, Any]:
        caller_name = get_caller_name()
        comment = f"[caller] {caller_name}"
        statement = _add_comment_to_statement(statement, comment)
        return statement, parameters


def _add_comment_to_statement(statement: str, comment: str) -> str:
    if not comment:
        return statement

    escaped_comment = comment.replace("/", "\\/")

    if not statement:
        return f"/* {escaped_comment} */"

    # 最初の非空白文字の位置を探す
    first_non_space = 0
    while first_non_space < len(statement) and statement[first_non_space].isspace():
        first_non_space += 1

    # 最初の空白の位置を探す
    first_space = first_non_space
    while first_space < len(statement) and not statement[first_space].isspace():
        first_space += 1

    if first_space >= len(statement):
        # キーワードのみの場合
        return f"{statement} /* {escaped_comment} */"

    # キーワードと残りの部分がある場合
    remaining = statement[first_space:]
    return f"{statement[:first_space]} /* {escaped_comment} */{remaining}"

実際に発行される SQL は下記のようになります。

SELECT /* [caller] api: getUser */ * FROM users WHERE id = 1;

おわりに

今回の実装により、CloudWatch データベースインサイトや Datadog のダッシュボードにおいて、スロークエリの呼び出し元を特定した状態で確認できるようになりました。実際に普段見ているダッシュボードにも、呼び出し元ごとのスロークエリ発生件数の推移を確認できるようになっています。

これまで呼び出し元の特定に多くの時間を要していたスロークエリ対応において、問題箇所を素早く特定できるようになったため、結果として多くのパフォーマンス改善につなげることができました。

今回は SQLAlchemy(Python)と MySQL での実例をご紹介しましたが、同様の機能を持つライブラリであれば同じアプローチを適用できると考えています。シンプルなアプローチではありますが、このような小さな改善の積み重ねにより、開発体験の向上に寄与できたことを嬉しく思います。