pdf-icon

LLM630 Compute Kit - StackFlow API Yolo11n Visual Demo

This demo shows how to run a script on your PC to retrieve YOLO detection data via the StackFlow API and open a preview window for real-time visualization.

1. Preparation

  1. Before powering on the device, connect the CamModule SC850SL camera to the LLM630 Compute Kit using the FPC cable as shown below.
  2. Refer to the LLM630 Compute Kit UART / ADB / SSH Debugging Guide to learn how to configure the network and file transfers for the LLM630 Compute Kit, and obtain its IP address.

  3. Follow the LLM630 Compute Kit Software Update Tutorial to install the following model and software packages.

apt install llm-camera llm-yolo # Software Package
Note
The CSI camera uses AI-ISP, which delivers excellent low-light image quality but consumes half of the NPU resources. The default YOLO model cannot be used with AI-ISP enabled. To support AI-ISP, install the following YOLO models:
apt install llm-model-yolo11n-npu1 llm-model-yolo11n-pose-npu1 llm-model-yolo11n-hand-pose-npu1 # Model Package

2. Client Script

Download the client test script. Ensure your PC is on the same subnet as the LLM630 Compute Kit. Make sure Python is installed, then use pip to install the dependencies opencv-python and tornado.

pip install opencv-python tornado
pip install opencv-python tornado -i https://mirrors.aliyun.com/pypi/simple # For Chinese users

Copy the script below, save it as llm-yolo-visual.py, and run it, passing the actual IP address of the device:

python llm-yolo-visual.py --host 192.168.20.24
import argparse
import asyncio
import base64
import json
import platform
import select
import socket
import sys
import threading
import time

import cv2
import numpy as np
import tornado.ioloop
import tornado.iostream
import tornado.web

if platform.system() == "Windows":
    import msvcrt


# Most recent annotated frame, shared between main() (which stores a copy of
# each drawn frame when --webstream is set) and MJPEGHandler (which encodes it
# as JPEG). It is a one-element list so it can be updated in place without a
# ``global`` statement.
latest_frame = [None]

# Pairs of keypoint indices that draw_lines() joins with a line segment for
# the "yolo11n-pose-npu1" model (17 keypoints per detection). The order
# matters: the position of a pair in this list selects its line colour.
COCO_KP_PAIRS = [
    (0, 2), (2, 4), (0, 1), (1, 3),
    (6, 5), (6, 8), (8, 10), (5, 7),
    (7, 9), (12, 11), (6, 12), (12, 14),
    (14, 16), (5, 11), (11, 13), (13, 15)
]
# Colour palette cycled through (index modulo length) for body keypoints and
# lines. NOTE(review): presumably BGR order, since the frames drawn on are
# BGR images - confirm.
COCO_COLORS = [
    (255,0,0), (0,255,0), (0,0,255), (255,255,0),
    (255,0,255), (0,255,255), (128,128,0), (128,0,128)
]

# Pairs of keypoint indices that draw_lines() joins with a line segment for
# the "yolo11n-hand-pose-npu1" model (21 keypoints per detection). The order
# matters: the position of a pair in this list selects its line colour.
HAND_KP_PAIRS = [
    (0, 1), (1, 2), (2, 3), (3, 4),
    (0, 5), (5, 6), (6, 7), (7, 8),
    (0, 17), (17, 18), (18, 19), (19, 20),
    (5, 9), (9, 13), (13, 17),
    (9, 10), (10, 11), (11, 12),
    (13, 14), (14, 15), (15, 16)
]
# Colour palette cycled through (index modulo length) for hand keypoints and
# lines. NOTE(review): presumably BGR order - confirm.
HAND_COLORS = [
    (255,0,0), (0,255,0), (0,0,255), (255,255,0),
    (255,0,255), (0,255,255), (128,128,0), (128,0,128),
    (0,128,128), (64,64,255), (255,64,64), (64,255,64)
]

class MJPEGHandler(tornado.web.RequestHandler):
    """Stream the frames stored in ``latest_frame`` as multipart MJPEG.

    Each new frame is JPEG-encoded and written as one part of a
    ``multipart/x-mixed-replace`` response. The handler runs until the
    client disconnects.
    """

    # Seconds to wait between checks for a new frame (about 30 checks/s).
    POLL_INTERVAL = 1 / 30

    async def get(self):
        """Handle GET by pushing frames until the client closes the stream."""
        self.set_header('Content-type', 'multipart/x-mixed-replace; boundary=frame')
        last_sent = None
        try:
            while True:
                frame = latest_frame[0]
                # Only encode frames that have not been sent yet; main()
                # stores a fresh array object for every new frame, so an
                # identity check is enough to detect a change.
                if frame is not None and frame is not last_sent:
                    ret, jpeg = cv2.imencode('.jpg', frame)
                    if ret:
                        self.write(b'--frame\r\n')
                        self.write(b'Content-Type: image/jpeg\r\n\r\n')
                        self.write(jpeg.tobytes())
                        self.write(b'\r\n')
                        # Awaiting the flush applies back-pressure and raises
                        # StreamClosedError once the client has gone away.
                        await self.flush()
                    last_sent = frame
                # Hand control back to the IOLoop so other requests are
                # served and this loop does not spin at 100% CPU.
                await asyncio.sleep(self.POLL_INTERVAL)
        except tornado.iostream.StreamClosedError:
            # The client disconnected; end the stream quietly.
            pass


def start_webstream(port=5000):
    """Run a Tornado server that exposes the MJPEG stream at ``/video_feed``.

    This call blocks for as long as the IOLoop runs, so it is meant to be
    used as the target of a (daemon) thread.

    Args:
        port: TCP port to listen on (default 5000).
    """
    # Threads other than the main thread have no asyncio event loop by
    # default. Create one explicitly so that IOLoop.current() works on every
    # supported Tornado / Python combination.
    if threading.current_thread() is not threading.main_thread():
        asyncio.set_event_loop(asyncio.new_event_loop())

    app = tornado.web.Application([
        (r"/video_feed", MJPEGHandler),
    ])
    app.listen(port)
    print(f"Tornado webstream started at http://localhost:{port}/video_feed")
    tornado.ioloop.IOLoop.current().start()


def create_tcp_connection(host, port):
    """Open a blocking TCP connection to ``host``:``port``.

    ``socket.create_connection`` closes the underlying socket itself when
    the connection attempt fails, so no file descriptor is leaked on error.
    It also tries every address the host name resolves to.

    Args:
        host: Server host name or IP address.
        port: Server TCP port.

    Returns:
        The connected socket object (no timeout set).

    Raises:
        OSError: If the connection cannot be established.
    """
    return socket.create_connection((host, port))


def send_json(sock, data):
    """Serialize ``data`` as one newline-terminated JSON line and send it.

    Non-ASCII characters are kept as-is (not escaped) and the line is
    encoded as UTF-8 before being passed to ``sock.sendall``.
    """
    line = json.dumps(data, ensure_ascii=False)
    payload = (line + '\n').encode('utf-8')
    sock.sendall(payload)


# Bytes received from the socket that have not yet been returned as a line.
# Kept as raw bytes so a multibyte UTF-8 character split across two recv()
# calls is only decoded once it is complete.
recv_buffer = b""

def receive_response(sock):
    """Read one newline-terminated message from ``sock``.

    Data is accumulated in the module-level ``recv_buffer`` until a newline
    is seen; anything after the newline is kept for the next call.

    Args:
        sock: Object with a ``recv(bufsize)`` method returning bytes.

    Returns:
        The next line, decoded as UTF-8 and stripped of surrounding
        whitespace. If the peer closes the connection while a partial line
        is buffered, that partial line is returned.

    Raises:
        ConnectionError: If the peer closed the connection and no data is
            left to return. This lets callers tell "connection closed"
            apart from an empty line.
    """
    global recv_buffer
    while b'\n' not in recv_buffer:
        part = sock.recv(4096)
        if not part:
            if not recv_buffer:
                raise ConnectionError("Connection closed by peer")
            # Flush whatever is left as the final (unterminated) line.
            line, recv_buffer = recv_buffer, b""
            return line.decode('utf-8').strip()
        recv_buffer += part
    line, _, recv_buffer = recv_buffer.partition(b'\n')
    return line.decode('utf-8').strip()

def close_connection(sock):
    """Close ``sock`` unless it is falsy (for example ``None``)."""
    if not sock:
        return
    sock.close()


def create_init_data(response_format, deivce, enoutput, frame_height, frame_width, enable_webstream, rtsp):
    """Build the JSON-serializable setup request for the camera unit.

    Args:
        response_format: "yuv" selects "image.yuvraw.base64"; any other
            value selects "image.jpeg.base64".
        deivce: Camera input name, sent as the "input" field.
        enoutput: Sent unchanged as the "enoutput" field.
        frame_height: Sent as "frame_height".
        frame_width: Sent as "frame_width".
        enable_webstream: Sent as "enable_webstream".
        rtsp: "h265" selects "rtsp.1280x720.h265"; any other value selects
            "rtsp.1280x720.h264".

    Returns:
        A dict with the request envelope and a nested "data" dict.
    """
    if response_format == "yuv":
        image_format = "image.yuvraw.base64"
    else:
        image_format = "image.jpeg.base64"

    if rtsp == "h265":
        rtsp_profile = "rtsp.1280x720.h265"
    else:
        rtsp_profile = "rtsp.1280x720.h264"

    # Key order is kept stable because it determines the serialized JSON.
    camera_config = {
        "response_format": image_format,
        "input": deivce,
        "enoutput": enoutput,
        "frame_width": frame_width,
        "frame_height": frame_height,
        "enable_webstream": enable_webstream,
        "rtsp": rtsp_profile,
    }
    request = {
        "request_id": "camera_001",
        "work_id": "camera",
        "action": "setup",
        "object": "camera.setup",
        "data": camera_config,
    }
    return request


def parse_setup_response(response_data):
    """Return the ``work_id`` of a setup reply, or ``None`` on error.

    A reply counts as failed when it has a non-empty "error" entry whose
    "code" is not 0; the code and message are then printed.
    """
    err = response_data.get('error')
    succeeded = not err or err.get('code') == 0
    if succeeded:
        return response_data.get('work_id')

    print(f"Error Code: {err['code']}, Message: {err['message']}")
    return None


def reset(sock):
    """Send a "sys" reset request, then ping once per second until the
    socket fails.

    A send error is taken as the sign that the reset has completed, at
    which point the function returns. There is no upper bound on the wait.
    """
    send_json(sock, {
        "request_id": 'reset_000',
        "work_id": "sys",
        "action": "reset"
    })

    ping_data = {
        "request_id": "ping_000",
        "work_id": "sys",
        "action": "ping"
    }
    while True:
        try:
            send_json(sock, ping_data)
            time.sleep(1)
        except OSError:
            # BrokenPipeError and ConnectionResetError are OSError
            # subclasses; a dropped socket indicates the reset is complete.
            return


def setup(sock, init_data):
    """Send a setup request and wait for the reply carrying the same
    ``request_id``.

    Replies with a different ``request_id`` are discarded. Returns whatever
    ``parse_setup_response`` returns for the matching reply.
    """
    expected_id = init_data['request_id']
    send_json(sock, init_data)

    reply = json.loads(receive_response(sock))
    while reply.get('request_id') != expected_id:
        reply = json.loads(receive_response(sock))
    return parse_setup_response(reply)


def exit_session(sock, deinit_data):
    """Send the exit request ``deinit_data`` over ``sock`` and print "Exit".

    No reply is read; the request is sent via ``send_json`` and the function
    returns immediately.
    """
    send_json(sock, deinit_data)
    print("Exit")


def parse_inference_response(response_data):
    """Extract ``work_id``, ``object`` and ``data`` from a streamed message.

    Returns ``None`` (after printing the code and message) when the message
    has a non-empty "error" entry whose "code" is not 0. Otherwise returns a
    dict with the three fields, each ``None`` if absent.
    """
    err = response_data.get('error')
    if not err or err.get('code') == 0:
        return {
            field: response_data.get(field)
            for field in ("work_id", "object", "data")
        }

    print(f"Error Code: {err['code']}, Message: {err['message']}")
    return None


def parse_yolo_result(data):
    """Normalize a list of raw YOLO detections.

    Args:
        data: Iterable of dicts with optional "bbox", "kps", "class" and
            "confidence" entries, or ``None``. Numeric values may be given
            as strings.

    Returns:
        A list of dicts with keys "bbox" (list of float), "class",
        "confidence" (float) and "kps" (list of float). Missing entries
        default to ``[]``, ``''``, ``0.0`` and ``[]`` respectively.
        ``None`` input yields an empty list, so a message without a "data"
        field does not crash the caller.

    Raises:
        ValueError: If a numeric entry cannot be converted to float.
    """
    if data is None:
        return []

    def _normalize(item):
        # Conversions run in a fixed order: bbox, kps, then confidence.
        bbox = [float(x) for x in item.get('bbox', [])]
        kps = [float(x) for x in item.get('kps', [])]
        return {
            'bbox': bbox,
            'class': item.get('class', ''),
            'confidence': float(item.get('confidence', 0)),
            'kps': kps,
        }

    return [_normalize(item) for item in data]


def draw_keypoints(frame, kps, num_points, colors):
    """Draw a filled circle on ``frame`` for each sufficiently confident
    keypoint.

    ``kps`` is read as consecutive (x, y, score) triples. A keypoint is
    drawn only when its score is greater than 0.05, and its colour is taken
    from ``colors`` cyclically by keypoint index.
    """
    for point_idx in range(num_points):
        offset = point_idx * 3
        px = int(kps[offset])
        py = int(kps[offset + 1])
        score = kps[offset + 2]
        if not (score > 0.05):
            continue
        color = colors[point_idx % len(colors)]
        cv2.circle(frame, (px, py), 3, color, -1)


def draw_lines(frame, kps, pairs, colors):
    """Draw a line on ``frame`` between each pair of confident keypoints.

    ``kps`` is read as consecutive (x, y, score) triples and ``pairs`` holds
    (i, j) keypoint indices. A segment is drawn only when both end points
    have a score greater than 0.05; its colour is taken from ``colors``
    cyclically by the pair's position in ``pairs``.
    """
    def _point(index):
        base = index * 3
        return int(kps[base]), int(kps[base + 1]), kps[base + 2]

    for pair_idx, (start, end) in enumerate(pairs):
        x_start, y_start, score_start = _point(start)
        x_end, y_end, score_end = _point(end)
        if not (score_start > 0.05 and score_end > 0.05):
            continue
        color = colors[pair_idx % len(colors)]
        cv2.line(frame, (x_start, y_start), (x_end, y_end), color, 2)


def _quit_requested():
    """Return True if the user typed 'q' on the console (never blocks)."""
    if platform.system() == "Windows":
        return bool(msvcrt.kbhit()) and msvcrt.getwch() == 'q'
    if sys.stdin in select.select([sys.stdin], [], [], 0)[0]:
        return sys.stdin.readline().strip() == 'q'
    return False


def _decode_frame(payload, object_type, image_format, frame_height, frame_width):
    """Decode a base64 camera payload into a BGR image.

    Returns None when the payload is empty or cannot be decoded into a frame
    of the expected size.
    """
    if not payload:
        return None
    decoded = base64.b64decode(payload)
    raw = np.frombuffer(decoded, dtype=np.uint8)
    if object_type == "image.yuyv422.base64" or image_format == "yuv":
        # YUYV 4:2:2 uses two bytes per pixel; skip frames of the wrong size
        # instead of crashing in reshape().
        if raw.size != frame_height * frame_width * 2:
            return None
        yuv_frame = raw.reshape((frame_height, frame_width, 2))
        return cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_YUY2)
    return cv2.imdecode(raw, cv2.IMREAD_COLOR)


def _draw_detections(frame, detections, model):
    """Draw boxes, labels and (for pose models) keypoints onto ``frame``."""
    for det in detections:
        bbox = det['bbox']
        if len(bbox) != 4:
            # A box needs exactly x1, y1, x2, y2; ignore malformed entries.
            continue
        x1, y1, x2, y2 = map(int, bbox)
        cls = det['class']
        conf = det['confidence']
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(
            frame, f"{cls} {conf:.2f}", (x1, y1 - 10),
            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2
        )
        kps = det.get('kps', [])
        if not kps:
            continue
        if model == "yolo11n-pose-npu1" and len(kps) == 17 * 3:
            draw_keypoints(frame, kps, 17, COCO_COLORS)
            draw_lines(frame, kps, COCO_KP_PAIRS, COCO_COLORS)
        elif model == "yolo11n-hand-pose-npu1" and len(kps) == 21 * 3:
            draw_keypoints(frame, kps, 21, HAND_COLORS)
            draw_lines(frame, kps, HAND_KP_PAIRS, HAND_COLORS)


def main(args):
    """Run the YOLO visual demo against a StackFlow server.

    Resets the server, sets up the camera and YOLO units, then reads the
    streamed messages: detection results are remembered and drawn onto each
    subsequent camera frame, which is shown in a preview window and/or
    published to the local webstream. The loop ends when the user presses
    'q', when the server reports an error, or when the connection is lost.

    Args:
        args: Parsed command-line namespace providing ``host``, ``port``,
            ``imgsz``, ``format``, ``enoutput``, ``device``, ``webstream``,
            ``rtsp`` and ``model``.

    Raises:
        ValueError: If ``args.imgsz`` does not hold one or two values.
    """
    # Validate the resolution before opening a connection so that a bad
    # --imgsz value cannot leak a socket. A single value means a square frame.
    if len(args.imgsz) == 1:
        frame_height = frame_width = args.imgsz[0]
    elif len(args.imgsz) == 2:
        frame_height, frame_width = args.imgsz
    else:
        raise ValueError("--imgsz expects one value (square) or two values (height width)")

    sock = create_tcp_connection(args.host, args.port)

    try:
        print("Reset...")
        reset(sock)
        # The reset drops the connection, so reconnect before continuing.
        close_connection(sock)
        sock = create_tcp_connection(args.host, args.port)

        print("Setup Camera...")
        init_data = create_init_data(
            response_format=args.format,
            enoutput=args.enoutput,
            deivce=args.device,
            frame_height=frame_height,
            frame_width=frame_width,
            enable_webstream=args.webstream,
            rtsp=args.rtsp
        )
        camera_work_id = setup(sock, init_data)
        if camera_work_id is not None:
            print(f"Camera setup with work_id: {camera_work_id}")
        else:
            print("Camera setup failed.")
            return

        print("Setup Yolo...")
        yolo_init_data = {
            "request_id": "yolo_001",
            "work_id": "yolo",
            "action": "setup",
            "object": "yolo.setup",
            "data": {
                "model": args.model,
                "response_format": "yolo.box",
                "input":  camera_work_id,
                "enoutput": True,
            }
        }
        yolo_work_id = setup(sock, yolo_init_data)
        if yolo_work_id is not None:
            print(f"Yolo setup with work_id: {yolo_work_id}")
        else:
            print("Yolo setup failed.")
            return

        # Latest detections; drawn on every camera frame until replaced.
        yolo_results = []

        if args.webstream:
            webstream_thread = threading.Thread(target=start_webstream, daemon=True)
            webstream_thread.start()

        connected = True
        while True:
            if _quit_requested():
                print("Quit by user.")
                break

            try:
                response = receive_response(sock)
            except ConnectionError as exc:
                print(f"Connection lost: {exc}")
                connected = False
                break
            if not response:
                continue
            response_data = json.loads(response)

            parsed = parse_inference_response(response_data)
            if parsed is None:
                break

            work_id = parsed.get("work_id")
            object_type = parsed.get("object")
            data = parsed.get("data")

            if work_id == yolo_work_id and object_type == "yolo.box":
                yolo_results = parse_yolo_result(data or [])

            elif work_id == camera_work_id and object_type in ("image.jpeg.base64", "image.yuyv422.base64"):
                bgr_frame = _decode_frame(data, object_type, args.format, frame_height, frame_width)
                if bgr_frame is None:
                    continue

                _draw_detections(bgr_frame, yolo_results, args.model)

                if args.webstream:
                    latest_frame[0] = bgr_frame.copy()

                # No preview window when the script targets the local host.
                if args.host not in ["localhost", "127.0.0.1"]:
                    cv2.imshow("YOLO Detection", bgr_frame)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break

        cv2.destroyAllWindows()

        # Exit requests can only be delivered over a live connection.
        if connected:
            exit_session(sock, {
                "request_id": "yolo_exit",
                "work_id": yolo_work_id,
                "action": "exit"
            })
            exit_session(sock, {
                "request_id": "camera_exit",
                "work_id": camera_work_id,
                "action": "exit"
            })
            time.sleep(3) # Allow time for the exit command to be processed
    finally:
        close_connection(sock)


def parse_bool_flag(value):
    """Convert a command-line string such as "true" or "False" to a bool.

    ``argparse``'s ``type=bool`` treats every non-empty string (including
    "False") as True, so boolean options need an explicit parser.

    Raises:
        argparse.ArgumentTypeError: If the text is not a recognized boolean.
    """
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in ("1", "true", "t", "yes", "y", "on"):
        return True
    if text in ("0", "false", "f", "no", "n", "off", ""):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="TCP Client to send JSON data.")
    parser.add_argument("--host", type=str, default="localhost", help="Server hostname (default: localhost)")
    parser.add_argument("--port", type=int, default=10001, help="Server port (default: 10001)")
    parser.add_argument("--device", type=str, default="axera_single_sc850sl", help="Camera name, i.e. axera_single_sc850sl or /dev/video0")
    parser.add_argument("--enoutput", type=parse_bool_flag, default=True, help="Whether to output image data (true/false, default: true)")
    parser.add_argument("--format", "--output-format", type=str, default="jpeg", help="Output image data format, i.e. jpeg or yuv")
    parser.add_argument("--imgsz", "--img", "--img-size", nargs="+", type=int, default=[320, 320], help="image (h, w)")
    parser.add_argument("--webstream", action="store_true", help="Enable webstream")
    parser.add_argument("--rtsp", default="h264", help="rtsp output, i.e. h264 or h265")
    parser.add_argument("--model", type=str, default="yolo11n-npu1", help="Model name, i.e. yolo11n-npu1 or yolo11n-pose-npu1, yolo11n-hand-pose-npu1")


    args = parser.parse_args()
    main(args)

Parameter Explanation

  • host: IP address of the LLM630 Compute Kit
  • port: TCP port (default: 10001)
  • device: Camera device name. Use 'axera_single_sc850sl' for MIPI CSI cameras. For USB cameras, specify like /dev/video0.
  • enoutput: Whether to output image data (default: true)
  • format: Output format, default is 'jpeg', can be 'yuv'
  • imgsz: Image resolution, default is 320×320
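  • webstream: Enable browser streaming. When enabled, access http://IP:8989/ for the raw camera stream (replace IP with the actual device IP) and http://HOST:5000/video_feed for the YOLO results, where HOST is the machine running this script (use localhost when viewing on the same PC).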
  • rtsp: RTSP stream codec. Default: 'h264', optional: 'h265'
  • model: YOLO model to load. Default: yolo11n-npu1, options: yolo11n-pose-npu1, yolo11n-hand-pose-npu1

3. Start Interaction

The PC will display the camera feed and detection results as shown below. Press the “q” key to exit.

On This Page