pdf-icon

StackFlow AI プラットフォーム

LLM630 Compute Kit - StackFlow API DepthAnything

この例では、PC 上でスクリプトを実行し、StackFlow API を通じて DepthAnything で変換された画像データを取得し、プレビューウィンドウでリアルタイムに表示する方法を紹介します。

1. 準備

  1. デバイスに電源を入れる前に、以下の図のように FPC ケーブルを使って CamModule SC850SL カメラを LLM630 Compute Kit に接続します。
  1. LLM630 Compute Kit UART / ADB / SSH 接続ガイドを参考にし、ネットワークやファイル転送の設定を行い、デバイスの IP アドレスを取得します。

  2. LLM630 Compute Kit ソフトウェアアップデートガイドを参照して、以下のパッケージをインストールしてください。

apt install llm-camera llm-depth-anything # SoftWare Package
注意
CSI カメラは AI-ISP を使用しており、暗所でも優れた画質を提供しますが、NPU の半分を AI-ISP に使用します。デフォルトの DepthAnything モデルは AI-ISP モードと互換性がありません。以下のコマンドで AI-ISP 対応モデルをインストールしてください。
apt install llm-model-depth-anything-npu1-ax630c # Model Package

2. クライアントプログラム

テスト用クライアントスクリプトをダウンロードし、PC と LLM630 Compute Kit が同じネットワーク内にあることを確認します。PC 上で以下のように pip を使用して opencv-python をインストールしてください。

pip install opencv-python
pip install opencv-python -i https://mirrors.aliyun.com/pypi/simple # For Chinese users

以下のスクリプトをコピーして保存し、実際のデバイス IP を指定して実行します:

python llm-depth-anything.py --host 192.168.20.24
import argparse
import base64
import cv2
import json
import numpy as np
import select
import socket
import sys
import time
import threading
import tornado.ioloop
import tornado.web

latest_frame = [None]

class MJPEGHandler(tornado.web.RequestHandler):
    def get(self):
        self.set_header('Content-type', 'multipart/x-mixed-replace; boundary=frame')
        while True:
            if latest_frame[0] is not None:
                ret, jpeg = cv2.imencode('.jpg', latest_frame[0])
                if ret:
                    self.write(b'--frame\r\n')
                    self.write(b'Content-Type: image/jpeg\r\n\r\n')
                    self.write(jpeg.tobytes())
                    self.write(b'\r\n')
                    self.flush()
            tornado.ioloop.IOLoop.current().add_callback(lambda: None)  # yield to event loop


def start_webstream():
    app = tornado.web.Application([
        (r"/video_feed", MJPEGHandler),
    ])
    app.listen(5000)
    print("Tornado webstream started at http://localhost:5000/video_feed")
    tornado.ioloop.IOLoop.current().start()



def create_tcp_connection(host, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((host, port))
    return sock


def send_json(sock, data):
    json_data = json.dumps(data, ensure_ascii=False) + '\n'
    sock.sendall(json_data.encode('utf-8'))


recv_buffer = ""

def receive_response(sock):
    global recv_buffer
    while '\n' not in recv_buffer:
        part = sock.recv(4096).decode('utf-8')
        if not part:
            break
        recv_buffer += part
    if '\n' in recv_buffer:
        line, recv_buffer = recv_buffer.split('\n', 1)
        return line.strip()
    else:
        line, recv_buffer = recv_buffer, ""
        return line.strip()

def close_connection(sock):
    if sock:
        sock.close()


def create_init_data(response_format, deivce, enoutput, frame_height, frame_width, enable_webstream, rtsp):
    return {
        "request_id": "camera_001",
        "work_id": "camera",
        "action": "setup",
        "object": "camera.setup",
        "data": {
            "response_format": "image.yuvraw.base64" if response_format =="yuv" else "image.jpeg.base64",
            "input": deivce,
            "enoutput": enoutput,
            "frame_width": frame_width,
            "frame_height": frame_height,
            "enable_webstream": enable_webstream,
            "rtsp": "rtsp.1280x720.h265" if rtsp == "h265" else "rtsp.1280x720.h264",
        }
    }


def parse_setup_response(response_data):
    error = response_data.get('error')
    if error and error.get('code') != 0:
        print(f"Error Code: {error['code']}, Message: {error['message']}")
        return None

    return response_data.get('work_id')


def reset(sock):
    sent_request_id = 'reset_000'
    reset_data = {
        "request_id": sent_request_id,
        "work_id": "sys",
        "action": "reset"
    }
    ping_data = {
        "request_id": "ping_000",
        "work_id": "sys",
        "action": "ping"
    }
    send_json(sock, reset_data)
    while True:
        try:
            send_json(sock, ping_data)
            time.sleep(1)
        except (BrokenPipeError, ConnectionResetError, OSError) as e:
            return # Sock disconnection indicates reset is complete


def setup(sock, init_data):
    sent_request_id = init_data['request_id']
    send_json(sock, init_data)
    while True:
        response = receive_response(sock)
        response_data = json.loads(response)
        if response_data.get('request_id') == sent_request_id:
            return parse_setup_response(response_data)


def exit_session(sock, deinit_data):
    send_json(sock, deinit_data)
    print("Exit")


def parse_inference_response(response_data):
    error = response_data.get('error')
    if error and error.get('code') != 0:
        print(f"Error Code: {error['code']}, Message: {error['message']}")
        return None

    return {
        "work_id": response_data.get("work_id"),
        "object": response_data.get("object"),
        "data": response_data.get("data")
    }


def main(args):
    sock = create_tcp_connection(args.host, args.port)

    frame_height, frame_width = args.imgsz

    try:
        print("Reset...")
        reset(sock)
        close_connection(sock)
        sock = create_tcp_connection(args.host, args.port)

        print("Setup Camera...")
        init_data = create_init_data(
            response_format = args.format,
            enoutput=args.enoutput,
            deivce=args.device,
            frame_height=frame_height,
            frame_width=frame_width,
            enable_webstream=args.webstream,
            rtsp=args.rtsp
        )
        camera_work_id = setup(sock, init_data)
        if camera_work_id is not None:
            print(f"Camera setup with work_id: {camera_work_id}")
        else:
            print("Camera setup failed.")
            return

        print("Setup Depth Anything...")
        depth_anything_init_data = {
            "request_id": "depth_anything_001",
            "work_id": "depth_anything",
            "action": "setup",
            "object": "depth_anything.setup",
            "data": {
                "model": args.model,
                "response_format": "image.jpeg.base64",
                "input":  camera_work_id,
                "enoutput": True,
            }
        }
        depth_anything_work_id = setup(sock, depth_anything_init_data)
        if depth_anything_work_id is not None:
            print(f"Depth Anything setup with work_id: {depth_anything_work_id}")
        else:
            print("Depth Anything setup failed.")
            return

        print("Press 'q' to exit")
        depth_anything_bgr_frame = None

        webstream_thread = None
        if args.webstream:
            webstream_thread = threading.Thread(target=start_webstream, daemon=True)
            webstream_thread.start()

        while True:
            response = receive_response(sock)
            if not response:
                continue
            response_data = json.loads(response)

            Rawdata = parse_inference_response(response_data)
            if Rawdata is None:
                break

            work_id = Rawdata.get("work_id")
            object = Rawdata.get("object")
            data = Rawdata.get("data")

            if work_id == depth_anything_work_id and object == "image.jpeg.base64":
                decoded = base64.b64decode(data)
                jpg_array = np.frombuffer(decoded, dtype=np.uint8)
                depth_anything_bgr_frame = cv2.imdecode(jpg_array, cv2.IMREAD_COLOR)

                if depth_anything_bgr_frame is not None:
                    if args.webstream:
                        latest_frame[0] = depth_anything_bgr_frame.copy()

                    if args.host not in ["localhost", "127.0.0.1"]:
                        cv2.imshow("Depth Anything", depth_anything_bgr_frame)
                        if cv2.waitKey(1) & 0xFF == ord('q'):
                            break

        cv2.destroyAllWindows()

        exit_session(sock, {
            "request_id": "depth_anything_exit",
            "work_id": depth_anything_work_id,
            "action": "exit"
        })
        exit_session(sock, {
            "request_id": "camera_exit",
            "work_id": camera_work_id,
            "action": "exit"
        })
        time.sleep(3) # Allow time for the exit command to be processed
    finally:
        close_connection(sock)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="TCP Client to send JSON data.")
    parser.add_argument("--host", type=str, default="localhost", help="Server hostname (default: localhost)")
    parser.add_argument("--port", type=int, default=10001, help="Server port (default: 10001)")
    parser.add_argument("--device", type=str, default="axera_single_sc850sl", help="Camera name, i.e. axera_single_sc850sl or /dev/video0")
    parser.add_argument("--enoutput", type=bool, default=False, help="Whether to output image data")
    parser.add_argument("--format", "--output-format", type=str, default="jpeg", help="Output image data format, i.e. jpeg or yuv")
    parser.add_argument("--imgsz", "--img", "--img-size", nargs="+", type=int, default=[256, 384], help="image (h, w)")
    parser.add_argument("--webstream", action="store_true", help="Enable webstream")
    parser.add_argument("--rtsp", default="h264", help="rtsp output, i.e. h264 or h265")
    parser.add_argument("--model", type=str, default="depth-anything-npu1-ax630c", help="Model name")


    args = parser.parse_args()
    main(args)

パラメータ説明

  • host:LLM630 Compute Kit の IP アドレス

  • port:TCP ポート(デフォルト:10001)

  • device:カメラ名(MIPI CSI の場合 'axera_single_sc850sl'、USB カメラは '/dev/video0' など)

  • enoutput:画像データを出力するか(デフォルト:無効)

  • format:出力画像のフォーマット(デフォルト:yuvjpeg も選択可能)

  • imgsz:画像サイズ(デフォルト:320×320)

  • webstream:Web ストリームを有効にするか(デフォルト:無効)

    • 有効時、http://IP:8989/ でカメラ映像、http://IP:5000/video_feed で深度画像を表示可能
  • rtsp:RTSP 映像のエンコード方式(デフォルトは h264h265 も選択可能)

  • model:使用する DepthAnything モデル名(デフォルト:depth-anything-npu1-ax630c

3. インタラクション開始

以下のように、PC 画面にカメラ映像と深度検出結果が表示されます。キーボードの “q” キーを押すと終了します。

On This Page