本デモでは、PC 上でスクリプトを実行し、StackFlow API 経由で YOLO の検出データを取得し、リアルタイムでプレビュー表示する方法を紹介します。
LLM630 Compute Kit の UART / ADB / SSH 接続とデバッグ手順を参照し、ネットワーク設定とファイル転送方法を学び、デバイスの IP アドレスを取得します。
LLM630 Compute Kit ソフトウェア更新ガイドを参考に、以下のソフトウェアおよびモデルパッケージをインストールしてください。
apt install llm-camera llm-yolo # SoftWare Package
apt install llm-model-yolo11n-npu1 llm-model-yolo11n-pose-npu1 llm-model-yolo11n-hand-pose-npu1 # Model Package
クライアントスクリプトをダウンロードし、PC が LLM630 Compute Kit と同一ネットワークセグメント上にあることを確認してください。Python 環境を用意し、opencv-python
および tornado
を Pip でインストールします。
pip install opencv-python tornado
pip install opencv-python tornado -i https://mirrors.aliyun.com/pypi/simple # For Chinese users
以下のスクリプトをコピー&保存し、実行時にデバイスの IP アドレスを引数に渡します。
python llm-yolo-visual.py --host 192.168.20.24
import argparse
import base64
import cv2
import json
import numpy as np
import select
import socket
import sys
import time
import threading
import tornado.ioloop
import tornado.web
import platform
if platform.system() == "Windows":
import msvcrt
latest_frame = [None]
COCO_KP_PAIRS = [
(0, 2), (2, 4), (0, 1), (1, 3),
(6, 5), (6, 8), (8, 10), (5, 7),
(7, 9), (12, 11), (6, 12), (12, 14),
(14, 16), (5, 11), (11, 13), (13, 15)
]
COCO_COLORS = [
(255,0,0), (0,255,0), (0,0,255), (255,255,0),
(255,0,255), (0,255,255), (128,128,0), (128,0,128)
]
HAND_KP_PAIRS = [
(0, 1), (1, 2), (2, 3), (3, 4),
(0, 5), (5, 6), (6, 7), (7, 8),
(0, 17), (17, 18), (18, 19), (19, 20),
(5, 9), (9, 13), (13, 17),
(9, 10), (10, 11), (11, 12),
(13, 14), (14, 15), (15, 16)
]
HAND_COLORS = [
(255,0,0), (0,255,0), (0,0,255), (255,255,0),
(255,0,255), (0,255,255), (128,128,0), (128,0,128),
(0,128,128), (64,64,255), (255,64,64), (64,255,64)
]
class MJPEGHandler(tornado.web.RequestHandler):
def get(self):
self.set_header('Content-type', 'multipart/x-mixed-replace; boundary=frame')
while True:
if latest_frame[0] is not None:
ret, jpeg = cv2.imencode('.jpg', latest_frame[0])
if ret:
self.write(b'--frame\r\n')
self.write(b'Content-Type: image/jpeg\r\n\r\n')
self.write(jpeg.tobytes())
self.write(b'\r\n')
self.flush()
tornado.ioloop.IOLoop.current().add_callback(lambda: None) # yield to event loop
def start_webstream():
app = tornado.web.Application([
(r"/video_feed", MJPEGHandler),
])
app.listen(5000)
print("Tornado webstream started at http://localhost:5000/video_feed")
tornado.ioloop.IOLoop.current().start()
def create_tcp_connection(host, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
return sock
def send_json(sock, data):
json_data = json.dumps(data, ensure_ascii=False) + '\n'
sock.sendall(json_data.encode('utf-8'))
recv_buffer = ""
def receive_response(sock):
global recv_buffer
while '\n' not in recv_buffer:
part = sock.recv(4096).decode('utf-8')
if not part:
break
recv_buffer += part
if '\n' in recv_buffer:
line, recv_buffer = recv_buffer.split('\n', 1)
return line.strip()
else:
line, recv_buffer = recv_buffer, ""
return line.strip()
def close_connection(sock):
if sock:
sock.close()
def create_init_data(response_format, deivce, enoutput, frame_height, frame_width, enable_webstream, rtsp):
return {
"request_id": "camera_001",
"work_id": "camera",
"action": "setup",
"object": "camera.setup",
"data": {
"response_format": "image.yuvraw.base64" if response_format =="yuv" else "image.jpeg.base64",
"input": deivce,
"enoutput": enoutput,
"frame_width": frame_width,
"frame_height": frame_height,
"enable_webstream": enable_webstream,
"rtsp": "rtsp.1280x720.h265" if rtsp == "h265" else "rtsp.1280x720.h264",
}
}
def parse_setup_response(response_data):
error = response_data.get('error')
if error and error.get('code') != 0:
print(f"Error Code: {error['code']}, Message: {error['message']}")
return None
return response_data.get('work_id')
def reset(sock):
sent_request_id = 'reset_000'
reset_data = {
"request_id": sent_request_id,
"work_id": "sys",
"action": "reset"
}
ping_data = {
"request_id": "ping_000",
"work_id": "sys",
"action": "ping"
}
send_json(sock, reset_data)
while True:
try:
send_json(sock, ping_data)
time.sleep(1)
except (BrokenPipeError, ConnectionResetError, OSError) as e:
return # Sock disconnection indicates reset is complete
def setup(sock, init_data):
sent_request_id = init_data['request_id']
send_json(sock, init_data)
while True:
response = receive_response(sock)
response_data = json.loads(response)
if response_data.get('request_id') == sent_request_id:
return parse_setup_response(response_data)
def exit_session(sock, deinit_data):
send_json(sock, deinit_data)
print("Exit")
def parse_inference_response(response_data):
error = response_data.get('error')
if error and error.get('code') != 0:
print(f"Error Code: {error['code']}, Message: {error['message']}")
return None
return {
"work_id": response_data.get("work_id"),
"object": response_data.get("object"),
"data": response_data.get("data")
}
def parse_yolo_result(data):
results = []
for item in data:
bbox = [float(x) for x in item.get('bbox', [])]
kps = [float(x) for x in item.get('kps', [])]
cls = item.get('class', '')
conf = float(item.get('confidence', 0))
results.append({
'bbox': bbox,
'class': cls,
'confidence': conf,
'kps': kps
})
return results
def draw_keypoints(frame, kps, num_points, colors):
for i in range(num_points):
x, y, s = int(kps[i*3]), int(kps[i*3+1]), kps[i*3+2]
if s > 0.05:
cv2.circle(frame, (x, y), 3, colors[i % len(colors)], -1)
def draw_lines(frame, kps, pairs, colors):
for idx, (i, j) in enumerate(pairs):
xi, yi, si = int(kps[i*3]), int(kps[i*3+1]), kps[i*3+2]
xj, yj, sj = int(kps[j*3]), int(kps[j*3+1]), kps[j*3+2]
if si > 0.05 and sj > 0.05:
cv2.line(frame, (xi, yi), (xj, yj), colors[idx % len(colors)], 2)
def main(args):
sock = create_tcp_connection(args.host, args.port)
frame_height, frame_width = args.imgsz
try:
print("Reset...")
reset(sock)
close_connection(sock)
sock = create_tcp_connection(args.host, args.port)
print("Setup Camera...")
init_data = create_init_data(
response_format = args.format,
enoutput=args.enoutput,
deivce=args.device,
frame_height=frame_height,
frame_width=frame_width,
enable_webstream=args.webstream,
rtsp=args.rtsp
)
camera_work_id = setup(sock, init_data)
if camera_work_id is not None:
print(f"Camera setup with work_id: {camera_work_id}")
else:
print("Camera setup failed.")
return
print("Setup Yolo...")
yolo_init_data = {
"request_id": "yolo_001",
"work_id": "yolo",
"action": "setup",
"object": "yolo.setup",
"data": {
"model": args.model,
"response_format": "yolo.box",
"input": camera_work_id,
"enoutput": True,
}
}
yolo_work_id = setup(sock, yolo_init_data)
if yolo_work_id is not None:
print(f"Yolo setup with work_id: {yolo_work_id}")
else:
print("Yolo setup failed.")
return
yolo_results = []
webstream_thread = None
if args.webstream:
webstream_thread = threading.Thread(target=start_webstream, daemon=True)
webstream_thread.start()
while True:
if platform.system() == "Windows":
if msvcrt.kbhit():
key = msvcrt.getwch()
if key == 'q':
print("Quit by user.")
break
else:
if sys.stdin in select.select([sys.stdin], [], [], 0)[0]:
key = sys.stdin.readline().strip()
if key == 'q':
print("Quit by user.")
break
response = receive_response(sock)
if not response:
continue
response_data = json.loads(response)
Rawdata = parse_inference_response(response_data)
if Rawdata is None:
break
work_id = Rawdata.get("work_id")
object = Rawdata.get("object")
data = Rawdata.get("data")
if work_id == yolo_work_id and object == "yolo.box":
yolo_results = parse_yolo_result(data)
elif work_id == camera_work_id and object in ["image.jpeg.base64", "image.yuyv422.base64"]:
decoded = base64.b64decode(data)
if object == "image.yuyv422.base64" or args.format == "yuv":
yuv_frame = np.frombuffer(decoded, dtype=np.uint8).reshape((frame_height, frame_width, 2))
bgr_frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_YUY2)
else:
jpg_array = np.frombuffer(decoded, dtype=np.uint8)
bgr_frame = cv2.imdecode(jpg_array, cv2.IMREAD_COLOR)
if bgr_frame is not None:
if yolo_results:
for det in yolo_results:
x1, y1, x2, y2 = map(int, det['bbox'])
cls = det['class']
conf = det['confidence']
cv2.rectangle(bgr_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.putText(
bgr_frame, f"{cls} {conf:.2f}", (x1, y1 - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2
)
kps = det.get('kps', [])
if not kps:
continue
if args.model == "yolo11n-pose-npu1" and len(kps) == 17 * 3:
draw_keypoints(bgr_frame, kps, 17, COCO_COLORS)
draw_lines(bgr_frame, kps, COCO_KP_PAIRS, COCO_COLORS)
elif args.model == "yolo11n-hand-pose-npu1" and len(kps) == 21 * 3:
draw_keypoints(bgr_frame, kps, 21, HAND_COLORS)
draw_lines(bgr_frame, kps, HAND_KP_PAIRS, HAND_COLORS)
if args.webstream:
latest_frame[0] = bgr_frame.copy()
if args.host not in ["localhost", "127.0.0.1"]:
cv2.imshow("YOLO Detection", bgr_frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cv2.destroyAllWindows()
exit_session(sock, {
"request_id": "yolo_exit",
"work_id": yolo_work_id,
"action": "exit"
})
exit_session(sock, {
"request_id": "camera_exit",
"work_id": camera_work_id,
"action": "exit"
})
time.sleep(3) # Allow time for the exit command to be processed
finally:
close_connection(sock)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="TCP Client to send JSON data.")
parser.add_argument("--host", type=str, default="localhost", help="Server hostname (default: localhost)")
parser.add_argument("--port", type=int, default=10001, help="Server port (default: 10001)")
parser.add_argument("--device", type=str, default="axera_single_sc850sl", help="Camera name, i.e. axera_single_sc850sl or /dev/video0")
parser.add_argument("--enoutput", type=bool, default=True, help="Whether to output image data")
parser.add_argument("--format", "--output-format", type=str, default="jpeg", help="Output image data format, i.e. jpeg or yuv")
parser.add_argument("--imgsz", "--img", "--img-size", nargs="+", type=int, default=[320, 320], help="image (h, w)")
parser.add_argument("--webstream", action="store_true", help="Enable webstream")
parser.add_argument("--rtsp", default="h264", help="rtsp output, i.e. h264 or h265")
parser.add_argument("--model", type=str, default="yolo11n-npu1", help="Model name, i.e. yolo11n-npu1 or yolo11n-pose-npu1, yolo11n-hand-pose-npu1")
args = parser.parse_args()
main(args)
/dev/video0
などを指定http://IP:8989/
でカメラ画像、http://IP:5000/video_feed
で検出結果を確認可能(IP は実際のものに置き換えてください)PC 画面にカメラ映像と検出結果が表示されます。下図のように表示され、「q」キーで終了できます。