pdf-icon

LLM630 Compute Kit - StackFlow API Yolo11n 可視化デモ

本デモでは、PC 上でスクリプトを実行し、StackFlow API 経由で YOLO の検出データを取得し、リアルタイムでプレビュー表示する方法を紹介します。

1. 準備作業

  1. 以下の図に従って、電源を入れる前に CamModule SC850SL カメラと LLM630 Compute Kit を FPC ケーブルで接続します。
  2. LLM630 Compute Kit の UART / ADB / SSH 接続とデバッグ手順を参照し、ネットワーク設定とファイル転送方法を学び、デバイスの IP アドレスを取得します。

  3. LLM630 Compute Kit ソフトウェア更新ガイドを参考に、以下のソフトウェアおよびモデルパッケージをインストールしてください。

apt install llm-camera llm-yolo # SoftWare Package
注意
CSI カメラは AI-ISP を使用しており、暗所で非常に優れた画質を実現しますが、NPU の半分を消費します。デフォルトの YOLO モデルは AI-ISP 有効時には使用できません。以下のコマンドで AI-ISP 対応 YOLO モデルをインストールしてください。
apt install llm-model-yolo11n-npu1 llm-model-yolo11n-pose-npu1 llm-model-yolo11n-hand-pose-npu1 # Model Package

2. クライアントプログラム

クライアントスクリプトをダウンロードし、PC が LLM630 Compute Kit と同一ネットワークセグメント上にあることを確認してください。Python 環境を用意し、opencv-python および tornado を Pip でインストールします。

pip install opencv-python tornado
pip install opencv-python tornado -i https://mirrors.aliyun.com/pypi/simple # For Chinese users

以下のスクリプトをコピー&保存し、実行時にデバイスの IP アドレスを引数に渡します。

python llm-yolo-visual.py --host 192.168.20.24
import argparse
import asyncio
import base64
import json
import platform
import select
import socket
import sys
import threading
import time

import cv2
import numpy as np
import tornado.ioloop
import tornado.iostream
import tornado.web

if platform.system() == "Windows":
    import msvcrt


latest_frame = [None]

COCO_KP_PAIRS = [
    (0, 2), (2, 4), (0, 1), (1, 3),
    (6, 5), (6, 8), (8, 10), (5, 7),
    (7, 9), (12, 11), (6, 12), (12, 14),
    (14, 16), (5, 11), (11, 13), (13, 15)
]
COCO_COLORS = [
    (255,0,0), (0,255,0), (0,0,255), (255,255,0),
    (255,0,255), (0,255,255), (128,128,0), (128,0,128)
]

HAND_KP_PAIRS = [
    (0, 1), (1, 2), (2, 3), (3, 4),
    (0, 5), (5, 6), (6, 7), (7, 8),
    (0, 17), (17, 18), (18, 19), (19, 20),
    (5, 9), (9, 13), (13, 17),
    (9, 10), (10, 11), (11, 12),
    (13, 14), (14, 15), (15, 16)
]
HAND_COLORS = [
    (255,0,0), (0,255,0), (0,0,255), (255,255,0),
    (255,0,255), (0,255,255), (128,128,0), (128,0,128),
    (0,128,128), (64,64,255), (255,64,64), (64,255,64)
]

class MJPEGHandler(tornado.web.RequestHandler):
    def get(self):
        self.set_header('Content-type', 'multipart/x-mixed-replace; boundary=frame')
        while True:
            if latest_frame[0] is not None:
                ret, jpeg = cv2.imencode('.jpg', latest_frame[0])
                if ret:
                    self.write(b'--frame\r\n')
                    self.write(b'Content-Type: image/jpeg\r\n\r\n')
                    self.write(jpeg.tobytes())
                    self.write(b'\r\n')
                    self.flush()
            tornado.ioloop.IOLoop.current().add_callback(lambda: None)  # yield to event loop


def start_webstream():
    app = tornado.web.Application([
        (r"/video_feed", MJPEGHandler),
    ])
    app.listen(5000)
    print("Tornado webstream started at http://localhost:5000/video_feed")
    tornado.ioloop.IOLoop.current().start()


def create_tcp_connection(host, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((host, port))
    return sock


def send_json(sock, data):
    json_data = json.dumps(data, ensure_ascii=False) + '\n'
    sock.sendall(json_data.encode('utf-8'))


recv_buffer = ""

def receive_response(sock):
    global recv_buffer
    while '\n' not in recv_buffer:
        part = sock.recv(4096).decode('utf-8')
        if not part:
            break
        recv_buffer += part
    if '\n' in recv_buffer:
        line, recv_buffer = recv_buffer.split('\n', 1)
        return line.strip()
    else:
        line, recv_buffer = recv_buffer, ""
        return line.strip()

def close_connection(sock):
    if sock:
        sock.close()


def create_init_data(response_format, deivce, enoutput, frame_height, frame_width, enable_webstream, rtsp):
    return {
        "request_id": "camera_001",
        "work_id": "camera",
        "action": "setup",
        "object": "camera.setup",
        "data": {
            "response_format": "image.yuvraw.base64" if response_format =="yuv" else "image.jpeg.base64",
            "input": deivce,
            "enoutput": enoutput,
            "frame_width": frame_width,
            "frame_height": frame_height,
            "enable_webstream": enable_webstream,
            "rtsp": "rtsp.1280x720.h265" if rtsp == "h265" else "rtsp.1280x720.h264",
        }
    }


def parse_setup_response(response_data):
    error = response_data.get('error')
    if error and error.get('code') != 0:
        print(f"Error Code: {error['code']}, Message: {error['message']}")
        return None

    return response_data.get('work_id')


def reset(sock):
    sent_request_id = 'reset_000'
    reset_data = {
        "request_id": sent_request_id,
        "work_id": "sys",
        "action": "reset"
    }
    ping_data = {
        "request_id": "ping_000",
        "work_id": "sys",
        "action": "ping"
    }
    send_json(sock, reset_data)
    while True:
        try:
            send_json(sock, ping_data)
            time.sleep(1)
        except (BrokenPipeError, ConnectionResetError, OSError) as e:
            return # Sock disconnection indicates reset is complete


def setup(sock, init_data):
    sent_request_id = init_data['request_id']
    send_json(sock, init_data)
    while True:
        response = receive_response(sock)
        response_data = json.loads(response)
        if response_data.get('request_id') == sent_request_id:
            return parse_setup_response(response_data)


def exit_session(sock, deinit_data):
    send_json(sock, deinit_data)
    print("Exit")


def parse_inference_response(response_data):
    error = response_data.get('error')
    if error and error.get('code') != 0:
        print(f"Error Code: {error['code']}, Message: {error['message']}")
        return None

    return {
        "work_id": response_data.get("work_id"),
        "object": response_data.get("object"),
        "data": response_data.get("data")
    }


def parse_yolo_result(data):
    results = []
    for item in data:
        bbox = [float(x) for x in item.get('bbox', [])]
        kps = [float(x) for x in item.get('kps', [])]
        cls = item.get('class', '')
        conf = float(item.get('confidence', 0))
        results.append({
            'bbox': bbox,
            'class': cls,
            'confidence': conf,
            'kps': kps
        })
    return results


def draw_keypoints(frame, kps, num_points, colors):
    for i in range(num_points):
        x, y, s = int(kps[i*3]), int(kps[i*3+1]), kps[i*3+2]
        if s > 0.05:
            cv2.circle(frame, (x, y), 3, colors[i % len(colors)], -1)


def draw_lines(frame, kps, pairs, colors):
    for idx, (i, j) in enumerate(pairs):
        xi, yi, si = int(kps[i*3]), int(kps[i*3+1]), kps[i*3+2]
        xj, yj, sj = int(kps[j*3]), int(kps[j*3+1]), kps[j*3+2]
        if si > 0.05 and sj > 0.05:
            cv2.line(frame, (xi, yi), (xj, yj), colors[idx % len(colors)], 2)


def main(args):
    sock = create_tcp_connection(args.host, args.port)

    frame_height, frame_width = args.imgsz

    try:
        print("Reset...")
        reset(sock)
        close_connection(sock)
        sock = create_tcp_connection(args.host, args.port)

        print("Setup Camera...")
        init_data = create_init_data(
            response_format = args.format,
            enoutput=args.enoutput,
            deivce=args.device,
            frame_height=frame_height,
            frame_width=frame_width,
            enable_webstream=args.webstream,
            rtsp=args.rtsp
        )
        camera_work_id = setup(sock, init_data)
        if camera_work_id is not None:
            print(f"Camera setup with work_id: {camera_work_id}")
        else:
            print("Camera setup failed.")
            return

        print("Setup Yolo...")
        yolo_init_data = {
            "request_id": "yolo_001",
            "work_id": "yolo",
            "action": "setup",
            "object": "yolo.setup",
            "data": {
                "model": args.model,
                "response_format": "yolo.box",
                "input":  camera_work_id,
                "enoutput": True,
            }
        }
        yolo_work_id = setup(sock, yolo_init_data)
        if yolo_work_id is not None:
            print(f"Yolo setup with work_id: {yolo_work_id}")
        else:
            print("Yolo setup failed.")
            return

        yolo_results = []

        webstream_thread = None
        if args.webstream:
            webstream_thread = threading.Thread(target=start_webstream, daemon=True)
            webstream_thread.start()

        while True:
            if platform.system() == "Windows":
                if msvcrt.kbhit():
                    key = msvcrt.getwch()
                    if key == 'q':
                        print("Quit by user.")
                        break
            else:
                if sys.stdin in select.select([sys.stdin], [], [], 0)[0]:
                    key = sys.stdin.readline().strip()
                    if key == 'q':
                        print("Quit by user.")
                        break

            response = receive_response(sock)
            if not response:
                continue
            response_data = json.loads(response)

            Rawdata = parse_inference_response(response_data)
            if Rawdata is None:
                break

            work_id = Rawdata.get("work_id")
            object = Rawdata.get("object")
            data = Rawdata.get("data")

            if work_id == yolo_work_id and object == "yolo.box":
                yolo_results = parse_yolo_result(data)

            elif work_id == camera_work_id and object in ["image.jpeg.base64", "image.yuyv422.base64"]:
                decoded = base64.b64decode(data)
                if object == "image.yuyv422.base64" or args.format == "yuv":
                    yuv_frame = np.frombuffer(decoded, dtype=np.uint8).reshape((frame_height, frame_width, 2))
                    bgr_frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_YUY2)
                else:
                    jpg_array = np.frombuffer(decoded, dtype=np.uint8)
                    bgr_frame = cv2.imdecode(jpg_array, cv2.IMREAD_COLOR)

                if bgr_frame is not None:
                    if yolo_results:
                        for det in yolo_results:
                            x1, y1, x2, y2 = map(int, det['bbox'])
                            cls = det['class']
                            conf = det['confidence']
                            cv2.rectangle(bgr_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                            cv2.putText(
                                bgr_frame, f"{cls} {conf:.2f}", (x1, y1 - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2
                            )
                            kps = det.get('kps', [])
                            if not kps:
                                continue
                            if args.model == "yolo11n-pose-npu1" and len(kps) == 17 * 3:
                                draw_keypoints(bgr_frame, kps, 17, COCO_COLORS)
                                draw_lines(bgr_frame, kps, COCO_KP_PAIRS, COCO_COLORS)
                            elif args.model == "yolo11n-hand-pose-npu1" and len(kps) == 21 * 3:
                                draw_keypoints(bgr_frame, kps, 21, HAND_COLORS)
                                draw_lines(bgr_frame, kps, HAND_KP_PAIRS, HAND_COLORS)

                    if args.webstream:
                        latest_frame[0] = bgr_frame.copy()

                    if args.host not in ["localhost", "127.0.0.1"]:
                        cv2.imshow("YOLO Detection", bgr_frame)
                        if cv2.waitKey(1) & 0xFF == ord('q'):
                            break

        cv2.destroyAllWindows()

        exit_session(sock, {
            "request_id": "yolo_exit",
            "work_id": yolo_work_id,
            "action": "exit"
        })
        exit_session(sock, {
            "request_id": "camera_exit",
            "work_id": camera_work_id,
            "action": "exit"
        })
        time.sleep(3) # Allow time for the exit command to be processed
    finally:
        close_connection(sock)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="TCP Client to send JSON data.")
    parser.add_argument("--host", type=str, default="localhost", help="Server hostname (default: localhost)")
    parser.add_argument("--port", type=int, default=10001, help="Server port (default: 10001)")
    parser.add_argument("--device", type=str, default="axera_single_sc850sl", help="Camera name, i.e. axera_single_sc850sl or /dev/video0")
    parser.add_argument("--enoutput", type=bool, default=True, help="Whether to output image data")
    parser.add_argument("--format", "--output-format", type=str, default="jpeg", help="Output image data format, i.e. jpeg or yuv")
    parser.add_argument("--imgsz", "--img", "--img-size", nargs="+", type=int, default=[320, 320], help="image (h, w)")
    parser.add_argument("--webstream", action="store_true", help="Enable webstream")
    parser.add_argument("--rtsp", default="h264", help="rtsp output, i.e. h264 or h265")
    parser.add_argument("--model", type=str, default="yolo11n-npu1", help="Model name, i.e. yolo11n-npu1 or yolo11n-pose-npu1, yolo11n-hand-pose-npu1")


    args = parser.parse_args()
    main(args)

パラメータの説明

  • host:LLM630 Compute Kit の IP アドレス
  • port:TCP ポート(デフォルト:10001)
  • device:カメラ名。MIPI CSI カメラなら 'axera_single_sc850sl'。USB カメラなら /dev/video0 などを指定
  • enoutput:画像出力を有効にするか(デフォルト:有効)
  • format:出力画像フォーマット。デフォルトは 'jpeg'、'yuv' も選択可
  • imgsz:画像サイズ。デフォルトは 320×320
  • webstream:Web ストリームの有効化。ON にすると、http://IP:8989/ でカメラ画像、http://IP:5000/video_feed で検出結果を確認可能(IP は実際のものに置き換えてください)
  • rtsp:RTSP 映像のエンコード方式。デフォルトは 'h264'、'h265' も可
  • model:YOLO モデル名。デフォルトは 'yolo11n-npu1'、'yolo11n-pose-npu1' や 'yolo11n-hand-pose-npu1' に変更可能

3. 操作開始

PC 画面にカメラ映像と検出結果が表示されます。下図のように表示され、「q」キーで終了できます。

On This Page