Module swarm.serialhandler

Expand source code
import asyncio
from logging import Logger
from asyncio.locks import Lock
import serial


class SerialHandler:
    def __init__(self, port: str, logger: Logger):
        self.logger = logger
        self.ser = serial.Serial(port, 115200, timeout=5)
        # self.ser.set_buffer_size(rx_size=1024, tx_size=1024)
        self.lock = Lock()
        self.message_queue = asyncio.Queue()

    def try_reboot(self):
        self.ser.write(b"startCLI\r\n")
        while self.ser.in_waiting <= 0:
            pass

        self.logger.info("Message from the ftSwarm")
        while True:
            message = self.ser.read_until(serial.LF)
            # noinspection PyBroadException
            try:
                self.logger.info("- " + message.decode("UTF-8").removesuffix("\n".removesuffix("\r")))
                if "@@@ ftSwarmOS CLI started" in message.decode("UTF-8").removesuffix("\n").removesuffix("\r"):
                    break
            except:
                pass

        self.ser.read_all()

    async def send_and_wait(self, cmd: str, wait_for_return=True):
        async with self.lock:
            return await self._send_and_wait(cmd, wait_for_return)

    async def _send_and_wait(self, cmd: str, wait_for_return):
        if not self.ser.is_open:
            raise serial.SerialException("Serial port is not open")

        self.logger.debug("Swarm <- " + cmd)
        self.ser.write(cmd.encode("UTF-8") + b"\r\n")

        if not wait_for_return:
            return

        while True:
            message = await self._get_message(queue=False)  # Queueing is only for building the cli backlog
            if message is None:
                await asyncio.sleep(0.01)
                continue

            if message.startswith("R: "):
                return message[3:]
            else:
                self.message_queue.put_nowait(message)

    async def get_message(self) -> str | None:
        async with self.lock:
            return await self._get_message()

    async def _get_message(self, queue=True) -> str | None:
        if not self.ser.is_open:
            raise serial.SerialException("Serial port is not open")

        if self.message_queue.qsize() > 0 and queue:
            return self.message_queue.get_nowait()  # Prioritize queue

        if self.ser.in_waiting <= 0:
            return

        # Get message
        rest = self.ser.read_until(serial.LF)
        message = rest.removesuffix(b"\n").removesuffix(b"\r").decode("UTF-8")
        self.logger.debug("Swarm -> " + message)

        return message

    def close(self):
        self.ser.close()

Classes

class SerialHandler (port: str, logger: logging.Logger)
Expand source code
class SerialHandler:
    def __init__(self, port: str, logger: Logger):
        self.logger = logger
        self.ser = serial.Serial(port, 115200, timeout=5)
        # self.ser.set_buffer_size(rx_size=1024, tx_size=1024)
        self.lock = Lock()
        self.message_queue = asyncio.Queue()

    def try_reboot(self):
        self.ser.write(b"startCLI\r\n")
        while self.ser.in_waiting <= 0:
            pass

        self.logger.info("Message from the ftSwarm")
        while True:
            message = self.ser.read_until(serial.LF)
            # noinspection PyBroadException
            try:
                self.logger.info("- " + message.decode("UTF-8").removesuffix("\n".removesuffix("\r")))
                if "@@@ ftSwarmOS CLI started" in message.decode("UTF-8").removesuffix("\n").removesuffix("\r"):
                    break
            except:
                pass

        self.ser.read_all()

    async def send_and_wait(self, cmd: str, wait_for_return=True):
        async with self.lock:
            return await self._send_and_wait(cmd, wait_for_return)

    async def _send_and_wait(self, cmd: str, wait_for_return):
        if not self.ser.is_open:
            raise serial.SerialException("Serial port is not open")

        self.logger.debug("Swarm <- " + cmd)
        self.ser.write(cmd.encode("UTF-8") + b"\r\n")

        if not wait_for_return:
            return

        while True:
            message = await self._get_message(queue=False)  # Queueing is only for building the cli backlog
            if message is None:
                await asyncio.sleep(0.01)
                continue

            if message.startswith("R: "):
                return message[3:]
            else:
                self.message_queue.put_nowait(message)

    async def get_message(self) -> str | None:
        async with self.lock:
            return await self._get_message()

    async def _get_message(self, queue=True) -> str | None:
        if not self.ser.is_open:
            raise serial.SerialException("Serial port is not open")

        if self.message_queue.qsize() > 0 and queue:
            return self.message_queue.get_nowait()  # Prioritize queue

        if self.ser.in_waiting <= 0:
            return

        # Get message
        rest = self.ser.read_until(serial.LF)
        message = rest.removesuffix(b"\n").removesuffix(b"\r").decode("UTF-8")
        self.logger.debug("Swarm -> " + message)

        return message

    def close(self):
        self.ser.close()

Methods

def close(self)
Expand source code
def close(self):
    self.ser.close()
async def get_message(self) ‑> str | None
Expand source code
async def get_message(self) -> str | None:
    async with self.lock:
        return await self._get_message()
async def send_and_wait(self, cmd: str, wait_for_return=True)
Expand source code
async def send_and_wait(self, cmd: str, wait_for_return=True):
    async with self.lock:
        return await self._send_and_wait(cmd, wait_for_return)
def try_reboot(self)
Expand source code
def try_reboot(self):
    self.ser.write(b"startCLI\r\n")
    while self.ser.in_waiting <= 0:
        pass

    self.logger.info("Message from the ftSwarm")
    while True:
        message = self.ser.read_until(serial.LF)
        # noinspection PyBroadException
        try:
            self.logger.info("- " + message.decode("UTF-8").removesuffix("\n".removesuffix("\r")))
            if "@@@ ftSwarmOS CLI started" in message.decode("UTF-8").removesuffix("\n").removesuffix("\r"):
                break
        except:
            pass

    self.ser.read_all()