pdf-icon

StackFlow AI Platform

Module LLM Applications

CV Vision Application

Vision Language Model (VLM)

Large Language Model (LLM)

Voice Assistant

AI Pyramid - Voice Assistant

AI Pyramid comes factory-preinstalled with a voice assistant demo implemented using the StackFlow framework. It supports single-turn conversations that can be awakened by saying 'Hℹ M Five' or by pressing a button.
The service is configured to start on boot via the /etc/rc.local file, and the startup script is located at /usr/local/m5stack/bin/AI_Pyramid_Demo.py.

Below is the source code of AI_Pyramid_Demo.py.

  • The default ASR model is sense-voice-small-10s-ax650, which supports automatic multilingual recognition including Chinese, English, Japanese, Cantonese, and Korean.
  • The default LLM model is qwen2.5-0.5B-Int4-ax650.
  • The default TTS model is melotts-en-default-ax650, which can be replaced with melotts-zh-cn-ax650, melotts-ja-jp-ax650, or melotts-es-es-ax650 to switch to Chinese, Japanese, Spanish, and other languages.

For more available models, see the Model List

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2026 M5Stack Technology CO LTD
#
# SPDX-License-Identifier: MIT
import argparse
import json
import socket
import uuid
from typing import Any, Dict, Optional

class TcpJsonClient:
    def __init__(self, host="127.0.0.1", port=10001, timeout=60.0):
        self.host = host
        self.port = port
        self.timeout = timeout
        self.sock: Optional[socket.socket] = None
        self._rx_buf = b""
    def connect(self):
        self.sock = socket.create_connection((self.host, self.port), timeout=self.timeout)
        self.sock.settimeout(None)
    def close(self):
        if self.sock:
            try:
                self.sock.close()
            finally:
                self.sock = None
    def send_json(self, payload: Dict[str, Any]):
        assert self.sock is not None
        data = (json.dumps(payload, ensure_ascii=False) + "\n").encode("utf-8")
        self.sock.sendall(data)
    def recv_json(self) -> Dict[str, Any]:
        assert self.sock is not None
        while True:
            if b"\n" in self._rx_buf:
                line, self._rx_buf = self._rx_buf.split(b"\n", 1)
                line = line.strip()
                if not line:
                    continue
                return json.loads(line.decode("utf-8", errors="replace"))
            chunk = self.sock.recv(4096)
            if not chunk:
                raise ConnectionError("Server closed connection")
            self._rx_buf += chunk
    def recv_json_stream(self):
        while True:
            yield self.recv_json()

def new_request_id() -> str:
    return str(uuid.uuid4())

def assert_ok(resp: Dict[str, Any]):
    err = resp.get("error") or {}
    code = err.get("code", 0)
    msg = err.get("message", "")
    if code != 0:
        raise RuntimeError(f"Server error code={code}, message={msg}")

def make_setup_kws(kws_text="HI PYRAMID") -> Dict[str, Any]:
    return {
        "request_id": new_request_id(),
        "work_id": "kws",
        "action": "setup",
        "object": "kws.setup",
        "data": {
            "model": "kws-ax650",
            "response_format": "kws.bool",
            "input": ["buttons_thread", "sys.pcm"],
            "enoutput": True,
            "kws": kws_text,
            "enwake_audio": True,
        },
    }

def make_setup_asr(kws_work_id: str) -> Dict[str, Any]:
    return {
        "request_id": new_request_id(),
        "work_id": "asr",
        "action": "setup",
        "object": "asr.setup",
        "data": {
            "model": "sense-voice-small-10s-ax650",
            "response_format": "asr.utf-8.stream",
            "input": [kws_work_id, "sys.pcm"],
            "enoutput": True,
        },
    }

def make_setup_llm(kws_work_id: str, asr_work_id: str) -> Dict[str, Any]:
    return {
        "request_id": new_request_id(),
        "work_id": "llm",
        "action": "setup",
        "object": "llm.setup",
        "data": {
            "model": "qwen2.5-0.5B-Int4-ax650",
            "response_format": "llm.utf-8.stream",
            "input": [kws_work_id, asr_work_id],
            "enoutput": True,
            "max_token_len": 256,
            "prompt": "You are a knowledgeable assistant capable of answering various questions and providing information. Respond exclusively in English and do not use any other language.",
        },
    }

def make_setup_melotts(kws_work_id: str, llm_work_id: str) -> Dict[str, Any]:
    return {
        "request_id": new_request_id(),
        "work_id": "melotts",
        "action": "setup",
        "object": "melotts.setup",
        "data": {
            "model": "melotts-en-default-ax650",
            "response_format": "sys.pcm",
            "input": [kws_work_id, llm_work_id],
            "enoutput": False,
        },
    }

def make_exit(work_id: str) -> Dict[str, Any]:
    return {"request_id": new_request_id(), "work_id": work_id, "action": "exit"}

def make_trigger(work_id: str) -> Dict[str, Any]:
    return {"request_id": new_request_id(), "work_id": work_id, "action": "trigger"}

class App:
    def __init__(self, host: str, port: int, kws_text: str):
        self.client = TcpJsonClient(host, port)
        self.kws_text = kws_text
        self.workids: Dict[str, Optional[str]] = {"kws": None, "asr": None, "llm": None, "melotts": None}
        self.created = []
        self._asr_text = ""
        self._llm_text = ""
    def _add_created(self, wid: Optional[str]):
        if wid and wid not in self.created:
            self.created.append(wid)
    def setup_pipeline(self):
        self.client.send_json(make_setup_kws(self.kws_text))
        resp = self.client.recv_json()
        assert_ok(resp)
        self.workids["kws"] = resp.get("work_id")
        self._add_created(self.workids["kws"])
        self.client.send_json(make_setup_asr(self.workids["kws"]))
        resp = self.client.recv_json()
        assert_ok(resp)
        self.workids["asr"] = resp.get("work_id")
        self._add_created(self.workids["asr"])
        self.client.send_json(make_setup_llm(self.workids["kws"], self.workids["asr"]))
        resp = self.client.recv_json()
        assert_ok(resp)
        self.workids["llm"] = resp.get("work_id")
        self._add_created(self.workids["llm"])
        self.client.send_json(make_setup_melotts(self.workids["kws"], self.workids["llm"]))
        resp = self.client.recv_json()
        assert_ok(resp)
        self.workids["melotts"] = resp.get("work_id")
        self._add_created(self.workids["melotts"])
    def handle_message(self, msg: Dict[str, Any]):
        assert_ok(msg)
        obj = msg.get("object")
        data = msg.get("data")
        if obj == "kws.bool" and data is True:
            self._asr_text = ""
            self._llm_text = ""
            print("\n[kws.trigger]")
            return
        if obj == "asr.utf-8.stream" and isinstance(data, dict):
            delta = data.get("delta", "")
            if delta:
                self._asr_text += delta
                print(delta, end="", flush=True)
            if data.get("finish") is True:
                print()
            return
        if obj == "llm.utf-8.stream" and isinstance(data, dict):
            delta = data.get("delta", "")
            if delta:
                self._llm_text += delta
                print(delta, end="", flush=True)
            if data.get("finish") is True:
                print()
            return
    def run(self):
        self.client.connect()
        try:
            self.setup_pipeline()
            for msg in self.client.recv_json_stream():
                self.handle_message(msg)
        finally:
            for wid in reversed(self.created):
                try:
                    self.client.send_json(make_exit(wid))
                except Exception:
                    pass
            self.client.close()

def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--host", default="127.0.0.1")
    ap.add_argument("--port", type=int, default=10001)
    ap.add_argument("--kws", default="HI PYRAMID")
    args = ap.parse_args()
    App(args.host, args.port, args.kws).run()

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        pass

Parameter Description

  • host: The IP address of the AI Pyramid
  • port: TCP communication port, default is 10001
  • kws: Wake-up keyword. Note that when using the kws-ax650 model, the default wake word is 'HI M FIVE' and cannot be changed. Only llm-model-sherpa-onnx-kws-zipformer-gigaspeech-3.3m-2024-01-01 supports custom wake words.
    See Keyword Spotting for details.
On This Page