はじめに
セーフィー株式会社 で画像認識AIの開発エンジニアをしている今野です。
今回は、最新の物体検出アルゴリズムであるYOLOv8を活用して、特定エリアを通過する車両を自動的にカウントするシステムの実装方法をご紹介します。このシステムは、交通量調査や駐車場の利用状況分析など、様々な場面で応用可能な技術です。
本記事では、YOLOv8による物体検出から、検出結果の後処理、そして実際の車両カウントまでの一連のプロセスを、具体的なコード例を交えて解説していきます。AIを活用した実用的なソリューションの構築に興味のある方々にとって、有益な情報となれば幸いです。
やりたいこと
本プロジェクトでは、以下の機能を実現することを目指します:
- カメラ映像を使った通行量カウント:道路を通過する車両を検出し、カウントします。
- 複数種類の対象物の識別:自動車、トラック、バス、自転車など、異なる種類の通行物を区別してカウントします。
- 特定エリアでのカウント:映像内の特定の範囲(例:交差点や横断歩道)を通過する対象物だけをカウントします。
実施手順
YOLOv8を使用した通行量カウントシステムの実装は、以下の手順で進めていきます:
- 環境構築
- YOLOv8による物体検出と追跡
- 検出結果の後処理
- 車両カウントの実装
- 結果の出力
これらの手順を通じて、YOLOv8を使用した基本的な通行量カウントシステムを構築していきます。各ステップの詳細は、以降のセクションで具体的に解説していきます。
環境構築
まず、必要なライブラリをインストールします。以下のコマンドを実行してください。
pip install ultralytics opencv-python numpy matplotlib shapely
YOLOv8 による物体検出と追跡
環境構築後、コマンドラインから検出・追尾を実行します
yolo task=detect mode=track model=yolov8x source=/*対象の動画*/ save_txt save_conf save=True project=/*保存先ディレクトリ*/ classes=2,3,4,6,8
上記のコマンドを実行すると、指定した動画ファイルに対して物体検出と追跡が行われ、結果が指定したディレクトリに保存されます。
コマンドの各引数の説明
- task: 実行するタスクを指定します。ここでは物体検出と追跡を行うために
track
を指定しています。detect
を指定すると、物体検出のみが実行されます。 - model: 使用するYOLOv8モデルを指定します。
yolov8x
は、YOLOv8の大きいサイズのモデルを指します。必要に応じて、yolov8s
やyolov8l
など他のサイズのモデルを指定することも可能です。 - source: 推論を行うソースを指定します。
- save_txt: このオプションを指定すると、検出および追跡結果がテキストファイルとして保存されます。各フレームごとにオブジェクトのクラスID、バウンディングボックスの座標などが記録されます。
- save_conf: このオプションを指定すると、各検出結果の信頼度スコアもテキストファイルに保存されます。これにより、どの程度の信頼度で物体が検出されたかを確認できます。
- save: Trueにすることで動画ファイルが指定したディレクトリに保存されます。この後の車両カウントに動画ファイルは不要なので、saveはFalseにして処理を高速化することも可能です。
- project: 結果を保存するルートディレクトリを指定します。ここで指定されたディレクトリの中に、結果が保存されます。指定がない場合、デフォルトで
runs
ディレクトリに保存されます。 - lasses: このオプションは、検出するクラスを特定のIDに絞るために使用します。指定したクラスIDに対応する物体のみが検出されます。今回使用するモデルはCOCO datasetのカテゴリidに準拠しているため、”2,3,4,6,8”を指定すると自転車、自動車、オートバイ、バス、トラックのみが検出されるようになります。
保存されるファイル
動画ファイル: 検出結果が入力動画に対して重畳されたビデオファイルが保存されます。
project
オプションで指定したディレクトリの中に、検出結果が保存されたフォルダが作成され、その中に動画ファイルが保存されます。テキストファイル : 検出された各フレームのオブジェクト情報が保存されたテキストファイルが生成されます。各フレームに対応するテキストファイルが保存され、クラスIDやバウンディングボックスの座標などの情報が含まれています。
出力テキストファイル例:
7 0.467559 0.775986 0.196506 0.249537 0.94994 1 7 0.302576 0.186449 0.136675 0.144221 0.91664 2 7 0.70413 0.417058 0.118785 0.119517 0.897981 3 7 0.717557 0.0861716 0.0540201 0.106011 0.838547 4 2 0.0168792 0.141835 0.0337098 0.0552144 0.810139 5 2 0.0770008 0.229309 0.068876 0.0676976 0.728882 6 2 0.532953 0.29052 0.0817046 0.088725 0.653665 7 2 0.194599 0.215173 0.0581096 0.0713775 0.633438 8 2 0.322347 0.272015 0.0641675 0.0660537 0.620266 9
検出結果の後処理
次に、YOLOv8で得られたトラッキング結果を車両カウントで使いやすい形へ変換する処理を行います
import argparse import glob import os import re import cv2 import matplotlib.pyplot as plt import numpy as np import pandas as pd from tqdm import tqdm from ultralytics.utils import yaml_load from ultralytics.utils.checks import check_yaml CLASSES = yaml_load(check_yaml("coco128.yaml"))["names"] def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser() parser.add_argument("--input_video", type=str, required=True, help="Path to the input video file") parser.add_argument("--input_dir", type=str, required=True, help="Directory containing the input label files") parser.add_argument("--output_dir", type=str, default="out", help="Directory to save the output files") parser.add_argument("--output_labels", type=str, default="output_video_results.csv", help="Name of the output CSV file") parser.add_argument("--vid_stride", type=int, default=1, help="Video stride for processing") args = parser.parse_args() return args def get_video_resolution(video_path: str) -> tuple[int, int]: """指定された動画の解像度を取得する Args: video_path (str): 動画のファイルパス Returns: tuple[int, int]: 解像度(width, height) """ cap = cv2.VideoCapture(video_path) try: width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) except Exception as e: print(f"An error occurred: {e}") finally: cap.release() return width, height def get_frame_num_from_label_file(label_file: str) -> int: """001.txtのような入力から0埋めなしの数字を取得する Args: label_file (str): 先頭に番号が付いたファイル名 Returns: int: 番号 """ match = re.search(r"(\d+)\.txt", label_file) assert match is not None return int(match.group(1)) def merge_labels(input_dir: str, input_video_path: str, output_dir: str, output_labels: str) -> None: """txtファイルを結合し、csvファイルを出力する Args: input_dir (str): 入力txtファイルが格納されているディレクトリ input_video_path (str): 入力動画のファイルパス output_dir (str): 出力ディレクトリ output_labels (str): 出力csvファイル名 """ # labelsファイルの一覧を取得 label_files = glob.glob(os.path.join(input_dir, "*.txt")) # ファイル名のフレーム番号部分(0埋めなしの数字)で昇順ソート label_files.sort(key=lambda x: get_frame_num_from_label_file(x)) # 入力動画から解像度情報を取得 width, height = get_video_resolution(input_video_path) # txtファイルを読み込み、csvファイルを出力 df_list = [] column_names = ["class_id", "center_x", "center_y", "width", "height", "confidence", "tracking_id"] for label_file in tqdm(label_files, desc="Loading labels"): # ファイル名からフレーム番号を取得 frame_num = get_frame_num_from_label_file(label_file) # ファイルを読み込み df = pd.read_csv(label_file, sep=" ", header=None) # カラム名を設定 df.columns = column_names # 正規化座標をピクセル座標に変換 df["center_x"] = (df["center_x"] * width).astype(int) df["center_y"] = (df["center_y"] * height).astype(int) df["width"] = (df["width"] * width).astype(int) df["height"] = (df["height"] * height).astype(int) # フレーム番号を追加 df["frame_num"] = frame_num # クラスラベルを追加 df["class_label"] = df["class_id"].apply(lambda x: CLASSES[x]) # カラムを並べ替え df = df[ ["frame_num", "class_label", "class_id", "center_x", "center_y", "width", "height", "confidence", "tracking_id"] ] df_list.append(df) # データフレームを出力 concat_df = pd.concat(df_list) concat_df.to_csv(os.path.join(output_dir, output_labels), index=False) def aggregate_labels(output_dir: str, output_labels: str) -> None: """ラベルを集約する Args: output_dir (str): 出力ディレクトリ output_labels (str): 出力csvファイル名 """ # 結合したラベルファイルを読み込み concat_df = pd.read_csv(os.path.join(output_dir, output_labels)) # tracking_idごとにclass_idの出現頻度をカウント class_id_counts: dict[int, dict[int, int]] = {} for _, row in concat_df.iterrows(): if row["tracking_id"] not in class_id_counts: class_id_counts[row["tracking_id"]] = {} if row["class_id"] not in class_id_counts[row["tracking_id"]]: class_id_counts[row["tracking_id"]][row["class_id"]] = 0 class_id_counts[row["tracking_id"]][row["class_id"]] += 1 # tracking_idごとにclass_idの多数派を算出 class_id_map: dict[int, int] = {} for track_id, class_id_count in class_id_counts.items(): class_id_map[track_id] = max(class_id_count, key=class_id_count.__getitem__) # 多数派のクラスで上書き for track_id, class_id in class_id_map.items(): concat_df.loc[concat_df["tracking_id"] == track_id, "class_id"] = class_id concat_df.loc[concat_df["tracking_id"] == track_id, "class_label"] = CLASSES[class_id] concat_df.to_csv(os.path.join(output_dir, output_labels), index=False) def main(): args = parse_args() # ディレクトリの存在確認 if not os.path.exists(args.input_dir): print("input_dir not found") return if not os.path.exists(args.output_dir): os.makedirs(args.output_dir, exist_ok=True) # ファイルの存在確認 if not os.path.exists(args.input_video): print("input_video not found") return # ラベルファイルを結合 merge_labels( input_dir=args.input_dir, input_video_path=args.input_video, output_dir=args.output_dir, output_labels=args.output_labels, ) # ラベル集約 aggregate_labels(args.output_dir, args.output_labels) if __name__ == "__main__": main()
このスクリプトは、YOLOv8が生成したラベルファイルを読み込み、各トラッキングIDごとに検出されたクラス情報を集約して、最も頻繁に出現したクラスをそのトラッキングIDに関連付けます。これにより、車両の種類を安定して識別し、車両カウントに適したデータ形式に変換します。
スクリプトの主な機能
- YOLOv8の推論結果から生成されたラベルファイルを一つのCSVファイルに結合
- 正規化された座標(0~1の範囲)をピクセル単位の座標に変換
- 各トラッキングIDに対して、最も頻繁に出現したクラスをそのトラッキングIDの最終クラスとして指定する
- すべての情報を一つのCSVファイルにまとめ、車両カウントに適した形式で保存する
上記のスクリプトをPythonファイルとして保存し、コマンドラインから実行します。
出力CSVファイル例:
車両カウントの実施
次に車両カウントを実施します
import pandas as pd from shapely.geometry import Point, Polygon import argparse def load_data(csv_file: str) -> pd.DataFrame: """CSVファイルを読み込み、データフレームを返す Args: csv_file (str): CSVファイルのパス Returns: pd.DataFrame: 読み込んだデータを格納したPandasデータフレーム """ return pd.read_csv(csv_file) def define_area(points: list[tuple[float, float]]) -> Polygon: """エリアを構成する座標リストを受け取り、Polygonオブジェクトを返す Args: points (list of tuples): エリアを定義する座標のリスト [(x1, y1), (x2, y2), ...] Returns: Polygon: ShapelyのPolygonオブジェクト """ return Polygon(points) def count_objects_in_area(df: pd.DataFrame, area_polygon: Polygon) -> dict: """データフレームとエリアのポリゴンを受け取り、エリア内のオブジェクトをクラスごとにカウントする Args: df (pd.DataFrame): オブジェクトのデータを含むデータフレーム area_polygon (Polygon): 対象エリアを定義するPolygonオブジェクト Returns: dict: 各クラスごとのオブジェクト数をtracking_idで集約した辞書 """ counts = {} for _, row in df.iterrows(): class_label = row['class_label'] tracking_id = row['tracking_id'] center_x = row['center_x'] center_y = row['center_y'] # オブジェクトの中心点がエリア内にあるかを確認 point = Point(center_x, center_y) if area_polygon.contains(point): if class_label not in counts: counts[class_label] = set() counts[class_label].add(tracking_id) return counts def print_counts(counts: dict) -> None: """カウント結果を出力する Args: counts (dict): 各クラスのオブジェクト数を保持する辞書 """ for class_label, tracking_ids in counts.items(): print(f"{class_label}: {len(tracking_ids)} objects") def parse_args() -> tuple[str, list[tuple[float, float]]]: """コマンドライン引数を解析する Returns: tuple: CSVファイルのパス (str) とエリアの座標リスト (list of tuples) を含むタプル """ parser = argparse.ArgumentParser(description="Count objects in a specified area from a CSV file.") parser.add_argument("csv_file", type=str, help="Path to the CSV file containing the object data.") parser.add_argument("area_points", type=float, nargs='+', help="List of coordinates defining the area (x1 y1 x2 y2 ...).") args = parser.parse_args() # エリアの座標をペアに分割してリストに変換 if len(args.area_points) % 2 != 0: raise ValueError("The number of coordinates for area_points must be even.") area_points = [(args.area_points[i], args.area_points[i+1]) for i in range(0, len(args.area_points), 2)] return args.csv_file, area_points def main() -> None: # コマンドライン引数を解析 csv_file, area_points = parse_args() df = load_data(csv_file) area_polygon = define_area(area_points) # オブジェクトのカウントを実行 counts = count_objects_in_area(df, area_polygon) # 結果を出力 print_counts(counts) if __name__ == "__main__": main()
このスクリプトでは指定したエリアを通過するオブジェクトを各クラスごとにカウントし表示します
スクリプトの使用方法
コマンドラインでスクリプトを実行し、CSVファイルのパスとエリアの座標を指定します。
CSVファイルは先ほど検出・追跡結果を変換し作成したファイルを指定し、エリアの座標はエリアを構成する3点以上の多角形の各頂点のx座標とy座標のペアで指定します。
座標の確認方法は動画から切り出した画像を使用してペイント等で取得することができますが、こちらのような外部のサイトを使用して取得することも可能です
次の例では、data.csv
というファイルを使用し、エリアを構成する4つの座標 (400, 400), (600, 400), (400, 600), (600, 600)
を指定しています。
python count_object.py output_video_results.csv 400 400 600 400 400 600 600 600
スクリプトを実行することで、指定したエリア内にある各クラスのオブジェクト数を確認することができます。
car: 23 objects truck: 6 objects
課題
クラス誤検知
現在のシステムでは、ミニバンを正確に分類できず、しばしばバスやトラックとして誤認識する問題が発生しています。この原因として、YOLOv8で使用しているモデルがCOCO Datasetを基に学習されていることが挙げられます。COCO Datasetは主に海外の画像を使用しており、海外ではミニバンの普及率が低いため、モデルがミニバンを十分に学習していない可能性があります。
この問題を解決するためには、国内のデータを収集し、それを用いてモデルを追加学習(ファインチューニング)することが考えられます。これにより、ミニバンの認識精度を向上させ、誤認識のリスクを減らすことが期待できます。
まとめ
今回の記事では、YOLOv8を用いた通行量カウントシステムの構築手順を詳細に解説しました。このシステムは、YOLOv8による高精度な物体検出機能を活用し、道路や駐車場といった特定エリア内を通過する車両を自動的にカウントするものです。
本実装は、基本的な車両カウントシステムですが、機能を拡張することでさまざまな用途に応用可能です。以下に、いくつかの実装アイデアを紹介します
- 多様なオブジェクト検出
- 機能:車両以外のオブジェクトも検出可能
- 特徴:COCOデータセットの80種類のクラスに対応
- 実装:クラス指定の変更で簡単に実現
- 応用例:歩行者、自転車、野生動物の調査
- 複数エリア間の移動追跡
- 機能:特定エリア間の移動オブジェクトをカウント
- 特徴:複数ポリゴンエリアでオブジェクトの軌跡を追跡
- 実装:エリア定義と軌跡追跡ロジックの追加
- 応用例:交差点の右左折車両計測、店舗の入退店客数把握
- リアルタイムカウント
- 機能:ライブ映像からのリアルタイム解析
- 特徴:即時的なデータ取得と分析が可能
- 実装:入力ソースを録画動画からライブカメラ映像へ変更
- 応用例:交通量モニタリング、イベント会場の人流分析
最後に
セーフィーではエンジニアを積極的に募集しています。気になる方はこちらをご覧ください
https://safie.co.jp/teams/engineering/
カジュアル面談から受け付けておりますので、気軽に応募いただければと思います
最後までお読みいただき、ありがとうございました。