pdf-icon

LLM630 Compute Kit - StackFlow API DepthAnything

This example demonstrates how to acquire and preview DepthAnything-processed image data via StackFlow API by running a script on a PC.

1. Preparation

  1. Before powering on the device, connect the CamModule SC850SL camera to the LLM630 Compute Kit using an FPC cable as shown below:
  2. Refer to the LLM630 Compute Kit UART / ADB / SSH Debugging Guide to configure network access and file transfers, and to obtain the device IP address.

  3. Refer to the LLM630 Compute Kit Software Update Guide and install the following packages:

apt install llm-camera llm-depth-anything # Software Package
Note
The CSI camera uses AI-ISP, which delivers excellent image quality in low light but consumes half of the NPU resources. The default DepthAnything model is not compatible with AI-ISP mode. To run DepthAnything while AI-ISP is active, install the AI-ISP-compatible model using the command below:
apt install llm-model-depth-anything-npu1-ax630c # Model Package

2. Client Script

Download the test client script and ensure your PC is on the same subnet as the LLM630 Compute Kit. On the PC, install opencv-python and tornado using pip (the script imports both, even when web streaming is disabled):

pip install opencv-python tornado
pip install opencv-python tornado -i https://mirrors.aliyun.com/pypi/simple # For Chinese users

Copy and save the following script as llm-depth-anything.py, then run it with the actual device IP address:

python llm-depth-anything.py --host 192.168.20.24
import argparse
import base64
import cv2
import json
import numpy as np
import select
import socket
import sys
import time
import threading
import tornado.ioloop
import tornado.web

latest_frame = [None]

class MJPEGHandler(tornado.web.RequestHandler):
    """Serve the most recent depth frame as a multipart MJPEG stream.

    Frames are read from the module-level ``latest_frame`` slot, which the
    receive loop in ``main`` overwrites whenever a new image is decoded.
    """

    # Pause between pushes, in seconds. Keeps the stream from spinning the
    # CPU and gives the IOLoop time to serve other connections.
    FRAME_INTERVAL = 0.03

    async def get(self):
        """Push JPEG parts to the client until it disconnects."""
        # Local imports: the file's import block does not provide these names.
        import asyncio
        from tornado.iostream import StreamClosedError

        self.set_header('Content-type', 'multipart/x-mixed-replace; boundary=frame')
        while True:
            frame = latest_frame[0]
            if frame is not None:
                ret, jpeg = cv2.imencode('.jpg', frame)
                if ret:
                    self.write(b'--frame\r\n')
                    self.write(b'Content-Type: image/jpeg\r\n\r\n')
                    self.write(jpeg.tobytes())
                    self.write(b'\r\n')
                    try:
                        # Awaiting the flush hands control back to the IOLoop
                        # and surfaces a closed connection as an exception.
                        await self.flush()
                    except StreamClosedError:
                        return
            # A real yield point: a synchronous loop here would block the
            # whole server, including other clients of this handler.
            await asyncio.sleep(self.FRAME_INTERVAL)


def start_webstream():
    """Run the Tornado MJPEG server on port 5000 until the process exits.

    Intended as the target of a background thread. The call blocks inside
    the IOLoop and does not return under normal operation.
    """
    # Local import: asyncio is not part of the file's import block.
    import asyncio

    # Worker threads have no asyncio event loop by default, and some Tornado
    # releases raise "There is no current event loop" instead of creating
    # one. Installing a loop explicitly works regardless of Tornado version.
    asyncio.set_event_loop(asyncio.new_event_loop())

    app = tornado.web.Application([
        (r"/video_feed", MJPEGHandler),
    ])
    app.listen(5000)
    print("Tornado webstream started at http://localhost:5000/video_feed")
    tornado.ioloop.IOLoop.current().start()



def create_tcp_connection(host, port):
    """Open a blocking IPv4 TCP connection to ``(host, port)``.

    Returns:
        The connected socket. The caller is responsible for closing it.

    Raises:
        Whatever ``socket.connect`` raises (e.g. ``OSError`` on refusal,
        ``OverflowError`` for an out-of-range port). The socket is closed
        before the exception propagates.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect((host, port))
    except BaseException:
        # Do not leak the file descriptor when the connection attempt fails
        # or is interrupted (Ctrl-C included).
        sock.close()
        raise
    return sock


def send_json(sock, data):
    """Serialize ``data`` as one newline-terminated JSON line and send it.

    Non-ASCII characters are written as-is and the line is encoded as UTF-8.
    """
    line = json.dumps(data, ensure_ascii=False)
    payload = f"{line}\n".encode('utf-8')
    sock.sendall(payload)


# Raw bytes received from the socket but not yet returned to a caller.
# Kept as bytes so that a multi-byte UTF-8 character split across two recv()
# calls is only decoded once it is complete.
recv_buffer = b""


def receive_response(sock):
    """Return the next newline-terminated message from ``sock`` as text.

    Reads from the socket until the shared buffer contains a full line, then
    returns that line decoded as UTF-8 with surrounding whitespace stripped.
    Data after the newline stays buffered for the next call.

    If the peer closes the connection before a newline arrives, whatever is
    buffered is returned (an empty string when nothing is buffered) and the
    buffer is cleared.
    """
    global recv_buffer
    while b'\n' not in recv_buffer:
        part = sock.recv(4096)
        if not part:
            # Peer closed the connection; fall through with what we have.
            break
        recv_buffer += part
    # partition() yields (whole buffer, b"", b"") when no newline is present,
    # which covers the closed-connection case.
    line, _, recv_buffer = recv_buffer.partition(b'\n')
    return line.decode('utf-8').strip()

def close_connection(sock):
    """Close ``sock`` when it is truthy; do nothing otherwise."""
    if not sock:
        return
    sock.close()


def create_init_data(response_format, deivce, enoutput, frame_height, frame_width, enable_webstream, rtsp):
    """Build the JSON-serializable setup request for the camera unit.

    Args:
        response_format: "yuv" selects "image.yuvraw.base64"; any other
            value selects "image.jpeg.base64".
        deivce: Value sent as the camera "input" field (the parameter name
            is kept as-is because callers pass it by keyword).
        enoutput: Value sent as "enoutput".
        frame_height: Value sent as "frame_height".
        frame_width: Value sent as "frame_width".
        enable_webstream: Value sent as "enable_webstream".
        rtsp: "h265" selects "rtsp.1280x720.h265"; any other value selects
            "rtsp.1280x720.h264".

    Returns:
        The request dictionary, ready for ``json.dumps``.
    """
    if response_format == "yuv":
        image_format = "image.yuvraw.base64"
    else:
        image_format = "image.jpeg.base64"

    if rtsp == "h265":
        rtsp_profile = "rtsp.1280x720.h265"
    else:
        rtsp_profile = "rtsp.1280x720.h264"

    camera_settings = {
        "response_format": image_format,
        "input": deivce,
        "enoutput": enoutput,
        "frame_width": frame_width,
        "frame_height": frame_height,
        "enable_webstream": enable_webstream,
        "rtsp": rtsp_profile,
    }
    return {
        "request_id": "camera_001",
        "work_id": "camera",
        "action": "setup",
        "object": "camera.setup",
        "data": camera_settings,
    }


def parse_setup_response(response_data):
    """Extract the assigned work_id from a setup reply.

    Returns:
        The reply's "work_id" value (``None`` when absent), or ``None`` after
        printing the error when the reply carries a non-zero error code.
    """
    error = response_data.get('error')
    failed = bool(error) and error.get('code') != 0
    if not failed:
        return response_data.get('work_id')

    print(f"Error Code: {error['code']}, Message: {error['message']}")
    return None


def reset(sock):
    """Ask the device to reset and block until the connection drops.

    Sends a "sys" reset request, then sends a ping once per second. The
    function returns when a send fails with a socket error, which is taken
    as the sign that the reset has completed.
    """
    send_json(sock, {
        "request_id": 'reset_000',
        "work_id": "sys",
        "action": "reset"
    })
    ping_request = {
        "request_id": "ping_000",
        "work_id": "sys",
        "action": "ping"
    }
    try:
        while True:
            send_json(sock, ping_request)
            time.sleep(1)
    except OSError:
        # BrokenPipeError and ConnectionResetError are OSError subclasses.
        # Sock disconnection indicates reset is complete.
        return


def setup(sock, init_data):
    """Send a setup request and wait for the reply that matches it.

    Replies whose "request_id" differs from the one in ``init_data`` are
    read and discarded.

    Returns:
        The result of ``parse_setup_response`` for the matching reply: the
        assigned work_id, or ``None`` when the reply reports an error.
    """
    expected_id = init_data['request_id']
    send_json(sock, init_data)

    reply = json.loads(receive_response(sock))
    while reply.get('request_id') != expected_id:
        reply = json.loads(receive_response(sock))
    return parse_setup_response(reply)


def exit_session(sock, deinit_data):
    """Send the exit request ``deinit_data`` and print a confirmation.

    No reply is awaited.
    """
    send_json(sock, data=deinit_data)
    print("Exit")


def parse_inference_response(response_data):
    """Reduce a streamed reply to its "work_id", "object" and "data" fields.

    Returns:
        A dict with exactly those three keys (missing fields map to
        ``None``), or ``None`` after printing the error when the reply
        carries a non-zero error code.
    """
    error = response_data.get('error')
    failed = bool(error) and error.get('code') != 0
    if failed:
        print(f"Error Code: {error['code']}, Message: {error['message']}")
        return None

    return {field: response_data.get(field) for field in ("work_id", "object", "data")}


def main(args):
    """Run a DepthAnything preview session against the device.

    Steps: reset the device, reconnect, set up the camera unit, set up the
    depth_anything unit with the camera as its input, then receive
    base64-encoded JPEG frames and display them (and/or publish them to the
    local web stream) until 'q' is pressed, an error reply arrives, or the
    device closes the connection.

    Args:
        args: Parsed command-line namespace providing host, port, device,
            enoutput, format, imgsz, webstream, rtsp and model.

    Raises:
        ValueError: If ``args.imgsz`` does not hold exactly two values.
    """
    # Validate before opening the socket so that a bad --imgsz cannot leak
    # a connection.
    if len(args.imgsz) != 2:
        raise ValueError(
            f"imgsz expects exactly two integers (height width), got {args.imgsz!r}"
        )
    frame_height, frame_width = args.imgsz

    sock = create_tcp_connection(args.host, args.port)

    try:
        print("Reset...")
        reset(sock)
        close_connection(sock)
        sock = create_tcp_connection(args.host, args.port)

        print("Setup Camera...")
        init_data = create_init_data(
            response_format=args.format,
            enoutput=args.enoutput,
            deivce=args.device,
            frame_height=frame_height,
            frame_width=frame_width,
            enable_webstream=args.webstream,
            rtsp=args.rtsp
        )
        camera_work_id = setup(sock, init_data)
        if camera_work_id is not None:
            print(f"Camera setup with work_id: {camera_work_id}")
        else:
            print("Camera setup failed.")
            return

        print("Setup Depth Anything...")
        depth_anything_init_data = {
            "request_id": "depth_anything_001",
            "work_id": "depth_anything",
            "action": "setup",
            "object": "depth_anything.setup",
            "data": {
                "model": args.model,
                "response_format": "image.jpeg.base64",
                "input":  camera_work_id,
                "enoutput": True,
            }
        }
        depth_anything_work_id = setup(sock, depth_anything_init_data)
        if depth_anything_work_id is not None:
            print(f"Depth Anything setup with work_id: {depth_anything_work_id}")
        else:
            print("Depth Anything setup failed.")
            return

        print("Press 'q' to exit")

        if args.webstream:
            # Daemon thread: the web server must not keep the process alive.
            webstream_thread = threading.Thread(target=start_webstream, daemon=True)
            webstream_thread.start()

        connection_lost = False
        while True:
            response = receive_response(sock)
            if not response:
                # An empty result is either a blank line or a closed
                # connection. Peek one byte without consuming it: recv()
                # returning b"" means the peer closed, and without this
                # check the loop would spin forever on a dead socket.
                if not sock.recv(1, socket.MSG_PEEK):
                    print("Connection closed by the device.")
                    connection_lost = True
                    break
                continue
            response_data = json.loads(response)

            raw_data = parse_inference_response(response_data)
            if raw_data is None:
                break

            work_id = raw_data.get("work_id")
            object_type = raw_data.get("object")
            data = raw_data.get("data")

            if work_id == depth_anything_work_id and object_type == "image.jpeg.base64":
                decoded = base64.b64decode(data)
                jpg_array = np.frombuffer(decoded, dtype=np.uint8)
                depth_anything_bgr_frame = cv2.imdecode(jpg_array, cv2.IMREAD_COLOR)

                if depth_anything_bgr_frame is not None:
                    if args.webstream:
                        latest_frame[0] = depth_anything_bgr_frame.copy()

                    # No preview window when the script targets the local
                    # host.
                    if args.host not in ["localhost", "127.0.0.1"]:
                        cv2.imshow("Depth Anything", depth_anything_bgr_frame)
                        if cv2.waitKey(1) & 0xFF == ord('q'):
                            break

        cv2.destroyAllWindows()

        if connection_lost:
            # Nothing can be sent on a closed connection.
            return

        exit_session(sock, {
            "request_id": "depth_anything_exit",
            "work_id": depth_anything_work_id,
            "action": "exit"
        })
        exit_session(sock, {
            "request_id": "camera_exit",
            "work_id": camera_work_id,
            "action": "exit"
        })
        time.sleep(3) # Allow time for the exit command to be processed
    finally:
        close_connection(sock)


def parse_bool_arg(value):
    """Convert a command-line token into a real boolean.

    ``type=bool`` treats every non-empty string as True, so
    ``--enoutput False`` would switch the option on. This converter accepts
    the usual spellings instead.

    Raises:
        argparse.ArgumentTypeError: If the token is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    normalized = value.strip().lower()
    if normalized in ("1", "true", "t", "yes", "y", "on"):
        return True
    if normalized in ("", "0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="TCP Client to send JSON data.")
    parser.add_argument("--host", type=str, default="localhost", help="Server hostname (default: localhost)")
    parser.add_argument("--port", type=int, default=10001, help="Server port (default: 10001)")
    parser.add_argument("--device", type=str, default="axera_single_sc850sl", help="Camera name, i.e. axera_single_sc850sl or /dev/video0")
    parser.add_argument("--enoutput", type=parse_bool_arg, default=False, help="Whether to output image data")
    parser.add_argument("--format", "--output-format", type=str, default="jpeg", help="Output image data format, i.e. jpeg or yuv")
    # Exactly two values are required; main() unpacks them as (height, width).
    parser.add_argument("--imgsz", "--img", "--img-size", nargs=2, type=int, default=[256, 384], metavar=("H", "W"), help="image (h, w)")
    parser.add_argument("--webstream", action="store_true", help="Enable webstream")
    parser.add_argument("--rtsp", default="h264", help="rtsp output, i.e. h264 or h265")
    parser.add_argument("--model", type=str, default="depth-anything-npu1-ax630c", help="Model name")


    args = parser.parse_args()
    main(args)

Parameter Description

  • host: IP address of the LLM630 Compute Kit

  • port: TCP port (default: 10001)

  • device: Camera name; use 'axera_single_sc850sl' for MIPI CSI, or e.g. '/dev/video0' for USB cameras

  • enoutput: Whether to output image data (default: false)

  • format: Output image format (default: jpeg, options: jpeg or yuv)

  • imgsz: Image size as two integers, height then width (default: 256 384)

  • webstream: Whether to enable web streaming (default: off). If enabled:

    • http://IP:8989/ shows the camera stream
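    • http://localhost:5000/video_feed shows the DepthAnything result (this stream is served by the client script on the PC, so use the PC's address when viewing it from another machine)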
  • rtsp: RTSP stream encoding format (h264 by default, or h265)

  • model: DepthAnything model name to load (default: depth-anything-npu1-ax630c)

3. Start Interaction

The camera view and depth detection result will be shown on the PC screen, as illustrated below. Press the "q" key to exit.

On This Page