Введение

Часто разработчики запрашивают интеграцию устройств SOLT и предоставление API, но при этом избегают работы с последовательным портом из-за отсутствия опыта. Однако, на самом деле, работа с COM-портом довольно проста и интуитивно понятна. В некоторых случаях реализация собственного чтения данных в программе оказывается намного удобнее и эффективнее, чем настройка и поддержание работы промежуточного сервера с API.

Давайте разберемся, как это сделать – это займет всего 5-10 минут!

Кнопка или пульт вызова это радио передающее устройство , в качестве приемника - модема используется устройство SOLT SR5-MPRT. Модем SR5-MPRT - это устройство с интерфейсом RS232 выпускаемое в двух версиях со встроенным USB конвертером и интерфейсом подключения USB A и без - с интерфейсом подключения DB9. По своей сути это идентичные устройства с одинаковыми протоколами.

Каждое устройство (пульт , кнопка SOLT) имеет уникальный код, запрограммированный на заводе, который позволяет однозначно идентифицировать источник сигнала. Этот код называется remote_id и передается в пакете данных (Пример 41 31 42 32 43 33 ("A1B2C3") )

Помимо уникального идентификатора устройства, каждый пульт содержит одну или несколько кнопок, нажатие которых передает соответствующий код. Коды кнопок обозначают различные действия:

Кнопки вызова

Hex

Кнопки отмены вызова

Hex

1

0x14

2

0x24

3

0x28

4

0x34

5

0x38

6

0x44

7

0x48

8

0x54

9

0x58

10

0x64

11

0x68

12

0x74

13

0x78

14

0x84

15 - Отмена

0xE4

16 - Полная отмена всех вызовов

0xF4

Например пульт SB9-3XBK имеет 2 кнопки вызова 1 = 14 и 3 = 28 и одну кнопку Отмены 15 = E4

Пульт SB7-1PBK имеет только одну кнопку вызова 1 = 14

Описание протокола SOLT

Передача данных в протоколе SOLT осуществляется пакетами фиксированной длины 19 байт. Формат пакета следующий:

Байт

Назначение

Описание

1

Символ начала (STX)

Определяет начало пакета

2-3

Код нажатой кнопки

Два ASCII-символа, идентифицирующие кнопку

4

Разделитель -

Фиксированный разделитель

5-10

ID пульта

Уникальный идентификатор пульта (ASCII)

11

Разделитель -

Фиксированный разделитель

12-18

Пользовательские данные

ASCII-строка с идентификатором пользователя

19

Символ конца (ETX)

Завершение пакета

Пример пакета в шестнадцатеричном представлении:

02 31 34 2D 41 31 42 32 43 33 2D 31 32 33 34 35 36 37 03

Где:

  • 02 (STX) — начало пакета,
  • 31 34 ("14") — код кнопки вызова,
  • 2D ("-") — первый разделитель,
  • 41 31 42 32 43 33 ("A1B2C3") — ID пульта,
  • 2D ("-") — второй разделитель,
  • 31 32 33 34 35 36 37 ("1234567") — пользовательские данные (они указываются при регистрации кнопки в модеме SR5-MPRT
  • 03 (ETX) — символ конца пакета.

Пример программы на Python

Полный код приведен в конце статьи.

Установка необходимых библиотек

Перед запуском кода необходимо установить библиотеки для работы с COM-портом. Используйте следующую команду для установки:

pip install serial-asyncio

Эти библиотеки обеспечивают работу с последовательным портом как в асинхронном режиме.

Хранение переменных и настройки порта

В начале программы задаются константы для работы с последовательным портом:

# Настройки COM-порта
SERIAL_PORT =
"COM3"  # Укажите свой порт, например, "/dev/ttyUSB0" для Linux
BAUDRATE =
9600  # Скорость передачи данных
PACKET_LENGTH =
19  # Ожидаемая длина пакета

Эти параметры определяют порт, скорость передачи данных и ожидаемую длину пакета в байтах.

Класс SerialReader

Для обработки входящих данных используется класс SerialReader, который реализует методы:

  • data_received(self, data): принимает входящие данные, накапливает их в буфере и передает на разбор.
  • process_packet(self, data): разбирает полученный пакет.

class SerialReader(asyncio.Protocol):
   
def __init__(self):
       self.buffer = bytearray()

   
def data_received(self, data):
       
"""Обрабатываем входящие данные"""
       self.buffer.extend(data)
       
while len(self.buffer) >= PACKET_LENGTH:
           packet = self.buffer[:PACKET_LENGTH]
           self.buffer = self.buffer[PACKET_LENGTH:]
           self.process_packet(packet)  
# Передача данных на разбор

   
def process_packet(self, data):
       
"""Разбор пакета данных"""
       
if len(data) != PACKET_LENGTH:
           print(
"Неверная длина пакета")
           
return
       
       
# Извлекаем отдельные части пакета
       start_symbol = chr(data[
0])  # Первый байт - символ начала
       button_number =
''.join(chr(byte) for byte in data[1:3]).strip()  # 2-3 байты - номер кнопки
       separator1 = chr(data[
3])  # 4-й байт - разделитель
       remote_id =
''.join(chr(byte) for byte in data[4:10]).strip()  # 5-10 байты - ID пульта
       separator2 = chr(data[
10])  # 11-й байт - второй разделитель
       user_info =
''.join(chr(byte) for byte in data[11:18]).strip()  # 12-18 байты - пользовательские данные
       end_symbol = chr(data[
18])  # 19-й байт - символ конца
       
       parsed_data = (
           
f'Символ начала: {start_symbol}, Номер кнопки: {button_number}, '
           
f'ID пульта: {remote_id}, Пользовательские данные: {user_info}, '
           
f'Разделители: {separator1}, {separator2}, Символ конца: {end_symbol}'
       )
       print(parsed_data)

Теперь, когда разобраны основные настройки и класс обработки данных, можно переходить к процессу открытия порта и приема данных.

Открытие COM-порта и прием данных

В данном примере используется асинхронный метод работы с COM-портом с помощью библиотеки serial_asyncio. Такой подход позволяет:

  • Обрабатывать данные в реальном времени без блокировки основного потока.
  • Работать с несколькими подключениями или выполнять другие задачи параллельно.
  • Избегать зависания программы при ожидании данных.

Однако, если задача программы требует более простого или последовательного подхода, можно использовать обычный (синхронный) метод с библиотекой pyserial. Например, при необходимости обработки небольшого количества данных без фоновых процессов.

При запуске программы открывается последовательный порт с заданными параметрами:

async def main():
   print(
f'Открытие порта {SERIAL_PORT} со скоростью {BAUDRATE}')
   loop = asyncio.get_running_loop()
   transport, protocol =
await serial_asyncio.create_serial_connection(
       loop, SerialReader, SERIAL_PORT, BAUDRATE
   )
   
try:
       
await asyncio.Event().wait()  # Бесконечное ожидание
   
except asyncio.CancelledError:
       
pass
   
finally:
       transport.close()
       print(
"Порт закрыт")

Разбор пакета

При получении данных выполняется разбор пакета:

def process_packet(self, data):
   
if len(data) != PACKET_LENGTH:
       print(
"Неверная длина пакета")
       
return
   
   
# Извлекаем отдельные части пакета
   start_symbol = chr(data[
0])  # Первый байт - символ начала
   button_number =
''.join(chr(byte) for byte in data[1:3]).strip()  # 2-3 байты - номер кнопки
   separator1 = chr(data[
3])  # 4-й байт - разделитель
   remote_id =
''.join(chr(byte) for byte in data[4:10]).strip()  # 5-10 байты - ID пульта
   separator2 = chr(data[
10])  # 11-й байт - второй разделитель
   user_info =
''.join(chr(byte) for byte in data[11:18]).strip()  # 12-18 байты - пользовательские данные
   end_symbol = chr(data[
18])  # 19-й байт - символ конца
   
   parsed_data = (
       
f'Символ начала: {start_symbol}, Номер кнопки: {button_number}, '
       
f'ID пульта: {remote_id}, Пользовательские данные: {user_info}, '
       
f'Разделители: {separator1}, {separator2}, Символ конца: {end_symbol}'
   )
   print(parsed_data)

Полный код примера

import asyncio
import serial_asyncio

# Настройки COM-порта
SERIAL_PORT =
"COM3"  # Укажите свой порт, например, "/dev/ttyUSB0" для Linux
BAUDRATE =
9600  # Скорость передачи данных
PACKET_LENGTH =
19  # Ожидаемая длина пакета

class SerialReader(asyncio.Protocol):
   
def __init__(self):
       self.buffer = bytearray()

   
def data_received(self, data):
       
"""Обрабатываем входящие данные"""
       self.buffer.extend(data)
       
while len(self.buffer) >= PACKET_LENGTH:
           packet = self.buffer[:PACKET_LENGTH]
           self.buffer = self.buffer[PACKET_LENGTH:]
           self.process_packet(packet)  
# Теперь передаем данные в метод

   
def process_packet(self, data):
       
"""Разбор пакета данных"""
       
if len(data) != PACKET_LENGTH:
           print(
"Неверная длина пакета")
           
return
       
       
# Извлекаем отдельные части пакета
       start_symbol = chr(data[
0])  # Первый байт - символ начала
       button_number =
''.join(chr(byte) for byte in data[1:3]).strip()  # 2-3 байты - номер кнопки
       separator1 = chr(data[
3])  # 4-й байт - разделитель
       remote_id =
''.join(chr(byte) for byte in data[4:10]).strip()  # 5-10 байты - ID пульта
       separator2 = chr(data[
10])  # 11-й байт - второй разделитель
       user_info =
''.join(chr(byte) for byte in data[11:18]).strip()  # 12-18 байты - пользовательские данные
       end_symbol = chr(data[
18])  # 19-й байт - символ конца
       
       parsed_data = (
           
f'Символ начала: {start_symbol}, Номер кнопки: {button_number}, '
           
f'ID пульта: {remote_id}, Пользовательские данные: {user_info}, '
           
f'Разделители: {separator1}, {separator2}, Символ конца: {end_symbol}'
       )
       print(parsed_data)

async def main():
   
"""Запуск асинхронного чтения из COM-порта"""
   print(
f'Открытие порта {SERIAL_PORT} со скоростью {BAUDRATE}')
   loop = asyncio.get_running_loop()
   transport, protocol =
await serial_asyncio.create_serial_connection(
       loop, SerialReader, SERIAL_PORT, BAUDRATE
   )
   
try:
       
await asyncio.Event().wait()  # Бесконечное ожидание
   
except asyncio.CancelledError:
       
pass
   
finally:
       transport.close()
       print(
"Порт закрыт")

if __name__ == "__main__":
   asyncio.run(main())

Мы используем cookie-файлы. Cookie помогают нам обеспечивать корректную работу сайта, проводить ретаргетинг, а также собирать статистику и отзывы для улучшения сервиса.
Принять все Подробнее Выбрать
Обратите внимание: ограничение использования cookie может повлиять на работу отдельных функций сайта. Мы рекомендуем отключать их только опытным пользователям.