From 5ae313defd435d182c50bf465b16ea34b8e7517f Mon Sep 17 00:00:00 2001 From: Artem Kashaev Date: Thu, 22 Jan 2026 10:30:31 +0500 Subject: [PATCH] Add configuration for semaphore signals and implement railway signal control --- config.json | 66 ++++++--- double_ir.py | 14 ++ ir_test.py | 83 +++-------- led_panel_blink.py | 2 +- main.py | 113 +++++++++++---- railway_signal.py | 323 ++++++++++++++++++++++++++++++++++++++++++ semaphore-config.json | 16 +++ test_lighter_main.py | 132 +++++++++++++++++ 8 files changed, 638 insertions(+), 111 deletions(-) create mode 100644 double_ir.py create mode 100644 railway_signal.py create mode 100644 semaphore-config.json create mode 100644 test_lighter_main.py diff --git a/config.json b/config.json index 7fdd93d..29eed62 100644 --- a/config.json +++ b/config.json @@ -3,38 +3,66 @@ { "id": 1283, "pin": 12, - "angle_minus": 70, - "angle_plus": 125 + "angle_minus": 115, + "angle_plus": 65 }, - { + { "id": 1272, "pin": 13, - "angle_minus": 65, - "angle_plus": 125 + "angle_minus": 115, + "angle_plus": 65 }, - { + { "id": 1444, "pin": 2, - "angle_minus": 125, + "angle_minus": 115, "angle_plus": 65 }, - { + { "id": 1274, "pin": 3, - "angle_minus": 125, + "angle_minus": 115, "angle_plus": 65 - } - ], - "seminsus": [ - { - "id": 6000, - "pin_rx": 16, - "pin_tx": 6 }, { - "id": 6001, - "pin_rx": 17, - "pin_tx": 7 + "id": 1457, + "pin": 4, + "angle_minus": 65, + "angle_plus": 115 + } + ], + "irs": [ + { + "id": 7000, + "pin": 16 + }, + { + "id": 7001, + "pin": 17 + }, + { + "id": 7002, + "pin": 18 + }, + { + "id": 7003, + "pin": 19 + }, + { + "id": 7004, + "pin": 20 + }, + { + "id": 7005, + "pin": 21 + }, + { + "id": 7006, + "pin": 22 + }, + { + "id": 7007, + "pin": 26 } ] } \ No newline at end of file diff --git a/double_ir.py b/double_ir.py new file mode 100644 index 0000000..ea47097 --- /dev/null +++ b/double_ir.py @@ -0,0 +1,14 @@ +from machine import Pin + + +class DoubleIr: + def __init__(self, id: int, pin: Pin): + self.id = id + self.pin = pin + self.last_state = pin.value() + + def check(self): + val = self.pin.value() + if val != self.last_state: + self.last_state = val + print(f"EVENT IK_MODULE {self.id} {val}") \ No newline at end of file diff --git a/ir_test.py b/ir_test.py index 19a88e8..7286480 100644 --- a/ir_test.py +++ b/ir_test.py @@ -1,71 +1,22 @@ from machine import Pin -from time import ticks_ms, ticks_diff +from time import sleep_ms -from ir_pair import IRRxTxPollPair +# pin1 = Pin(16, Pin.IN) +pin2 = Pin(17, Pin.IN) +class Beam: + def __init__(self, id: int, pin: Pin): + self.id = id + self.pin = pin + self.prev_val = pin.value() -# --- Настройки пинов --- -# RX_PIN: выход ИК-приёмника (обычно TTL сигнал с модуля VS1838/TSOP*) -# TX_PIN: пин ИК-светодиода/ключа (на него подаётся 38кГц PWM во время окна опроса) -RX_PIN = 16 -TX_PIN = 6 + def check(self): + val = self.pin.value() + if val != self.prev_val: + self.prev_val = val + print(f"EVENT IK_MODULE {self.id} {val}") - -# --- Тайминги опроса --- -POLL_PERIOD_MS = 500 # период опроса (как часто проверяем) -TX_ON_MS = 50 # сколько держим 38кГц включённым в каждом цикле -STATUS_PERIOD_MS = 1000 - - -# --- Условия теста --- -# N влияет сразу на: -# - сколько раз "мигнуть" ИК-передатчиком за 1 цикл опроса -# - сколько фронтов (edges) нужно увидеть, чтобы считать луч НЕ перекрытым -N = 5 - - -LED = Pin("LED", Pin.OUT) - - -def main(): - pair = IRRxTxPollPair( - rx_pin=RX_PIN, - tx_pin=TX_PIN, - poll_period_ms=POLL_PERIOD_MS, - tx_on_ms=TX_ON_MS, - blinks_per_poll=N, - blink_off_ms=5, - freq_hz=38_000, - duty_percent=33, - min_edges=N, - # Считаем только FALLING: обычно на каждом включении carrier RX даёт 1 спад. - # Тогда 5 миганий ~= 5 edges (а при подсчёте обоих фронтов было бы ~10). - count_rising=False, - count_falling=True, - ) - - last_status = ticks_ms() - - print("IR poll test started") - print("RX pin:", RX_PIN, "TX pin:", TX_PIN) - print("poll_period_ms:", POLL_PERIOD_MS, "tx_on_ms:", TX_ON_MS) - print("N:", N, "(blinks_per_poll and min_edges)") - - try: - while True: - pair.poll_once() # результат и счетчики сохраняются внутри pair - LED.toggle() - - now = ticks_ms() - if ticks_diff(now, last_status) >= STATUS_PERIOD_MS: - seen = bool(pair.last_seen) - print( - ("BEAM NOT BLOCKED" if seen else "BEAM BLOCKED") - + " | edges=%d" % pair.last_edges - ) - last_status = now - finally: - pair.deinit() - - -main() +beam = Beam(1, pin2) +while True: + beam.check() + sleep_ms(50) \ No newline at end of file diff --git a/led_panel_blink.py b/led_panel_blink.py index 5d14120..af9ad3b 100644 --- a/led_panel_blink.py +++ b/led_panel_blink.py @@ -5,7 +5,7 @@ import random # Настройки: 8 LED, подключено к GP6 num_leds = 8 -pin = 6 # GP6 +pin = 0 # GP6 np = neopixel.NeoPixel(Pin(pin), num_leds) diff --git a/main.py b/main.py index 55b6b56..a87370c 100644 --- a/main.py +++ b/main.py @@ -2,35 +2,40 @@ import sys from switch import Switch import select from machine import Pin -from time import sleep_ms +from time import sleep_ms, ticks_diff, ticks_ms from ir_pair import IRRxTxPollPair +from double_ir import DoubleIr -SEMINSU = dict() +SEMINSU_V1 = dict() +SEMINSU_V2 = dict() SWITCHES = dict() LED = Pin("LED", Pin.OUT) # "LED" — специальное имя для встроенного индикатора - def load_switches(): with open("config.json", "r") as file: import json + config = json.load(file) for sw_cfg in config["switches"]: sw = Switch( id=sw_cfg["id"], pin=sw_cfg["pin"], angle_minus=sw_cfg["angle_minus"], - angle_plus=sw_cfg["angle_plus"] + angle_plus=sw_cfg["angle_plus"], ) SWITCHES[sw.id] = sw -def load_seminsus(): +def load_seminsus_v1(): with open("config.json", "r") as file: import json + config = json.load(file) - for sem_cfg in config["seminsus"]: + if "seminsus_v1" not in config: + return + for sem_cfg in config["seminsus_v1"]: seminsu = IRRxTxPollPair( rx_pin=sem_cfg["pin_rx"], tx_pin=sem_cfg["pin_tx"], @@ -44,8 +49,20 @@ def load_seminsus(): count_rising=False, count_falling=True, ) - SEMINSU[sem_cfg["id"]] = seminsu - + SEMINSU_V1[sem_cfg["id"]] = seminsu + + +def load_seminsus_v2(): + with open("config.json", "r") as file: + import json + + config = json.load(file) + if "irs" not in config: + return + for sem_cfg in config["irs"]: + pin = Pin(sem_cfg["pin"], Pin.IN) + seminsu = DoubleIr(sem_cfg["id"], pin) + SEMINSU_V2[sem_cfg["id"]] = seminsu def resolve_command(command: str): @@ -75,7 +92,9 @@ def resolve_command(command: str): evts = [] for id, sw in SWITCHES.items(): evts.append(f"EVENT SWITCH {id} {sw.pos}") - for id, seminsu in SEMINSU.items(): + for id, seminsu in SEMINSU_V1.items(): + evts.append(f"EVENT IK_MODULE {id} {seminsu.last_state}") + for id, seminsu in SEMINSU_V2.items(): evts.append(f"EVENT IK_MODULE {id} {seminsu.last_state}") return "\n".join(evts) @@ -95,31 +114,65 @@ def _next_seminsu_id(): def poll_seminsus_step(): - """Неблокирующий шаг опроса seminsu. + """Неблокирующий шаг опроса IK_MODULE (seminsus_v1 + seminsu_v2). - В каждый момент времени опрашивается только ОДНА пара. + Идём round-robin по объединению ID из SEMINSU_V1 и SEMINSU_V2. + ID не пересекаются, поэтому тип выбирается по наличию в словаре. + + Для v1 сохраняется текущая неблокирующая логика: start_poll()/update() + и печать события только при изменении состояния. + + Для v2 вызывается check() (печать события внутри check() при изменении). """ global _active_seminsu_id - if not SEMINSU: + if not SEMINSU_V1 and not SEMINSU_V2: return - if _active_seminsu_id is None: - _active_seminsu_id = _next_seminsu_id() - if _active_seminsu_id is None: + # Если есть активный v1-цикл — продолжаем его до завершения. + if _active_seminsu_id is not None: + seminsu = SEMINSU_V1.get(_active_seminsu_id) + if seminsu is None: + _active_seminsu_id = None return - SEMINSU[_active_seminsu_id].start_poll() - seminsu = SEMINSU[_active_seminsu_id] + done = seminsu.update() + if not done: + return + + # Цикл завершён — печатаем при изменении состояния. + if seminsu.prev_state is not None and seminsu.last_state is not None: + if seminsu.prev_state != seminsu.last_state: + # state: 1 = перекрыт, 0 = не перекрыт + print(f"EVENT IK_MODULE {_active_seminsu_id} {seminsu.last_state}") + + _active_seminsu_id = None + return + + # Берём следующий ID из объединённого round-robin. + sid = _next_seminsu_id() + if sid is None: + return + + if sid in SEMINSU_V2: + SEMINSU_V2[sid].check() + return + + # Иначе — v1. + seminsu = SEMINSU_V1.get(sid) + if seminsu is None: + return + _active_seminsu_id = sid + seminsu.start_poll() + done = seminsu.update() if not done: return - # Цикл завершён — печатаем при изменении состояния. + # Быстрый случай: цикл успел завершиться в этом же шаге. if seminsu.prev_state is not None and seminsu.last_state is not None: if seminsu.prev_state != seminsu.last_state: - # state: 1 = перекрыт, 0 = не перекрыт - print(f"EVENT IK_MODULE {_active_seminsu_id} {seminsu.last_state}") + print(f"EVENT IK_MODULE {sid} {seminsu.last_state}") _active_seminsu_id = None @@ -128,14 +181,22 @@ def work(): poll = select.poll() poll.register(sys.stdin, select.POLLIN) - # Готовим список id seminsu для round-robin. + # Мигание встроенного светодиода: полный период 1 секунда. + # toggle() каждые 500 мс => 0.5с ON + 0.5с OFF. + LED.value(0) + last_led_toggle_ms = ticks_ms() + + # Готовим список id IK_MODULE для round-robin (v1 + v2). global _seminsu_ids, _seminsu_idx, _active_seminsu_id - _seminsu_ids = list(SEMINSU.keys()) + _seminsu_ids = sorted(list(SEMINSU_V1.keys()) + list(SEMINSU_V2.keys())) _seminsu_idx = 0 _active_seminsu_id = None while True: - LED.toggle() + now_ms = ticks_ms() + if ticks_diff(now_ms, last_led_toggle_ms) >= 500: + LED.toggle() + last_led_toggle_ms = now_ms # 1) Обработка stdin НЕ блокирует цикл seminsu. events = poll.poll(0) @@ -159,7 +220,9 @@ def work(): # Небольшая пауза, чтобы не крутить CPU на 100%. sleep_ms(1) + if __name__ == "__main__": load_switches() - load_seminsus() - work() \ No newline at end of file + load_seminsus_v1() + load_seminsus_v2() + work() diff --git a/railway_signal.py b/railway_signal.py new file mode 100644 index 0000000..47bade4 --- /dev/null +++ b/railway_signal.py @@ -0,0 +1,323 @@ +import machine +import neopixel +import time + +try: + import micropython +except ImportError: # CPython / non-MicroPython + micropython = None # type: ignore + +LAMP_COLORS = { + # Имена цветов -> RGBA (для RGBW ленты) + "OFF": (0, 0, 0, 0), + "RED": (100, 0, 0, 0), + "YELLOW": (100, 45, 0, 0), + "GREEN": (0, 100, 0, 0), + "BLUE": (0, 0, 100, 0), + "WHITE": (0, 0, 0, 100), + "MOON_WHITE": (0, 0, 0, 100), +} + + +class Lamp: + """Одна лампа светофора (один пиксель). + + 3 режима: + - off: выключено + - on: постоянно горит + - blink: мигает (переключение ON/OFF с периодом period_ms) + """ + + MODE_OFF = "off" + MODE_ON = "on" + MODE_BLINK = "blink" + + def __init__(self, signal, lamp_id: int, pixel_index: int, rgba): + self._signal = signal + self.id = lamp_id + self.pixel = pixel_index + self.rgba = rgba + + self.mode = Lamp.MODE_OFF + self._blink_period_ms = 500 + self._blink_on = False + self._next_toggle_ms = 0 + + def off(self): + self._signal._set_lamp_mode(self, Lamp.MODE_OFF) + + def on(self): + self._signal._set_lamp_mode(self, Lamp.MODE_ON) + + def blink(self, period_ms: int = 500): + self._signal._set_lamp_mode(self, Lamp.MODE_BLINK, period_ms=period_ms) + + +class RailwaySignal: + """Контейнер ламп светофора. + + Хранит NeoPixel и набор ламп (каждая лампа — один пиксель). + Мигающие лампы обслуживаются общим Timer. + """ + + def __init__( + self, + pin: int, + num_leds: int, + *, + bpp: int = 4, + timing: int = 1, + colors: dict | None = None, + timer_id: int = -1, + timer_tick_ms: int = 50, + brightness: float = 0.5, + ): + self.pin = pin + self.num_leds = num_leds + self.bpp = bpp + self.timing = timing + + self.colors = colors if colors is not None else LAMP_COLORS + + self.brightness = float(brightness) + if self.brightness < 0.0: + self.brightness = 0.0 + if self.brightness > 1.0: + self.brightness = 1.0 + + self._timer_id = timer_id + self._timer_tick_ms = int(timer_tick_ms) + self._timer = None + self._timer_running = False + self._scheduled = False + + self._lamps_by_id = {} + + self._np = neopixel.NeoPixel( + machine.Pin(pin), + num_leds, + bpp=bpp, + timing=timing, + ) + + self.clear() + + def set_brightness(self, brightness: float): + """Устанавливает глобальный коэффициент яркости (0.0..1.0).""" + + self.brightness = float(brightness) + if self.brightness < 0.0: + self.brightness = 0.0 + if self.brightness > 1.0: + self.brightness = 1.0 + + # Перерисовываем текущее состояние ламп + for lamp in self._lamps_by_id.values(): + self._apply_lamp(lamp) + self.show() + + def _scale(self, value: int) -> int: + value = int(value) + if value <= 0: + return 0 + scaled = int(value * self.brightness) + if scaled < 0: + return 0 + if scaled > 255: + return 255 + return scaled + + def _normalize_rgba(self, rgba: tuple | list): + """Приводит цвет к формату (r,g,b,w) с учётом self.bpp. + + - для bpp=4: всегда (r,g,b,w) + - для bpp=3: W-канал маппится в RGB только если RGB=0 (белый) + """ + + if len(rgba) == 3: + r, g, b = rgba + w = 0 + else: + r, g, b, w = rgba + + r = int(r) + g = int(g) + b = int(b) + w = int(w) + + if self.bpp == 3: + # На RGB-ленте нет W-канала. Самый ожидаемый кейс — "WHITE"/"MOON_WHITE", + # которые заданы как (0,0,0,w). Маппим их в (w,w,w). + if r == 0 and g == 0 and b == 0 and w > 0: + r = w + g = w + b = w + w = 0 + + return (r, g, b, w) + + def add_lamp(self, lamp_id: int, pixel_index: int, color: str | tuple | list): + if pixel_index < 0 or pixel_index >= self.num_leds: + raise ValueError("pixel_index out of range") + + rgba = None + if isinstance(color, str): + rgba = self.colors.get(color.upper()) + elif isinstance(color, (tuple, list)): + rgba = color + + if rgba is None: + raise ValueError("Unknown color") + + rgba = self._normalize_rgba(rgba) + + lamp = Lamp(self, lamp_id, pixel_index, rgba) + self._lamps_by_id[lamp_id] = lamp + lamp.off() + return lamp + + def lamp(self, lamp_id: int): + return self._lamps_by_id.get(lamp_id) + + def _stop_timer(self): + if self._timer is None or not self._timer_running: + return + try: + self._timer.deinit() + except Exception: + pass + self._timer_running = False + + def _ensure_timer(self): + if self._timer_running: + return + + try: + self._timer = machine.Timer(self._timer_id) + except Exception: + # Some ports require Timer(0/1/2...), some allow -1. + self._timer = machine.Timer(0) + + try: + self._timer.init(period=self._timer_tick_ms, mode=machine.Timer.PERIODIC, callback=self._timer_cb) + self._timer_running = True + except Exception: + self._timer_running = False + + def _now_ms(self) -> int: + ticks_ms = getattr(time, "ticks_ms", None) + if ticks_ms is not None: + return int(ticks_ms()) + return int(time.time() * 1000) + + def _ticks_diff(self, a: int, b: int) -> int: + ticks_diff = getattr(time, "ticks_diff", None) + if ticks_diff is not None: + return int(ticks_diff(a, b)) + return int(a - b) + + def show(self): + self._np.write() + + def clear(self): + self._stop_timer() + self._scheduled = False + if self.bpp == 3: + self._np.fill((0, 0, 0)) + else: + self._np.fill((0, 0, 0, 0)) + self.show() + + def set_pixel_rgba(self, index: int, r: int, g: int, b: int, w: int = 0): + if self.bpp == 3: + self._np[index] = (self._scale(r), self._scale(g), self._scale(b)) + else: + self._np[index] = ( + self._scale(r), + self._scale(g), + self._scale(b), + self._scale(w), + ) + + def fill_rgba(self, r: int, g: int, b: int, w: int = 0): + if self.bpp == 3: + self._np.fill((self._scale(r), self._scale(g), self._scale(b))) + else: + self._np.fill((self._scale(r), self._scale(g), self._scale(b), self._scale(w))) + + def _apply_lamp(self, lamp: Lamp): + if lamp.mode == Lamp.MODE_ON: + r, g, b, w = lamp.rgba + self.set_pixel_rgba(lamp.pixel, r, g, b, w) + elif lamp.mode == Lamp.MODE_BLINK: + if lamp._blink_on: + r, g, b, w = lamp.rgba + self.set_pixel_rgba(lamp.pixel, r, g, b, w) + else: + self.set_pixel_rgba(lamp.pixel, 0, 0, 0, 0) + else: + self.set_pixel_rgba(lamp.pixel, 0, 0, 0, 0) + + def _any_blinking(self) -> bool: + for lamp in self._lamps_by_id.values(): + if lamp.mode == Lamp.MODE_BLINK: + return True + return False + + def _set_lamp_mode(self, lamp: Lamp, mode: str, *, period_ms: int = 500): + if mode == Lamp.MODE_BLINK: + lamp.mode = Lamp.MODE_BLINK + lamp._blink_period_ms = int(period_ms) + lamp._blink_on = False # стартуем с OFF как раньше + lamp._next_toggle_ms = self._now_ms() + lamp._blink_period_ms + elif mode == Lamp.MODE_ON: + lamp.mode = Lamp.MODE_ON + lamp._blink_on = False + else: + lamp.mode = Lamp.MODE_OFF + lamp._blink_on = False + + self._apply_lamp(lamp) + self.show() + + if self._any_blinking(): + self._ensure_timer() + else: + self._stop_timer() + + def _timer_cb(self, _t): + # IRQ context: keep it tiny. + if not self._any_blinking(): + return + + if micropython is not None: + if self._scheduled: + return + try: + self._scheduled = True + micropython.schedule(self._scheduled_tick, 0) + except Exception: + self._scheduled = False + + def _scheduled_tick(self, _arg): + self._scheduled = False + + now_ms = self._now_ms() + dirty = False + + for lamp in self._lamps_by_id.values(): + if lamp.mode != Lamp.MODE_BLINK: + continue + if self._ticks_diff(now_ms, lamp._next_toggle_ms) < 0: + continue + + lamp._blink_on = not lamp._blink_on + lamp._next_toggle_ms = now_ms + lamp._blink_period_ms + self._apply_lamp(lamp) + dirty = True + + if dirty: + self.show() + + if not self._any_blinking(): + self._stop_timer() diff --git a/semaphore-config.json b/semaphore-config.json new file mode 100644 index 0000000..b296a25 --- /dev/null +++ b/semaphore-config.json @@ -0,0 +1,16 @@ +{ + "semaphores": [ + { + "id": 1, + "pin": 15, + "bpp": 4, + "lamps": [ + {"color": "RED", "id": 101}, + {"color": "YELLOW", "id": 102}, + {"color": "GREEN", "id": 103}, + {"color": "MOON_WHITE", "id": 104}, + {"color": "BLUE", "id": 105} + ] + } + ] +} \ No newline at end of file diff --git a/test_lighter_main.py b/test_lighter_main.py new file mode 100644 index 0000000..8eee988 --- /dev/null +++ b/test_lighter_main.py @@ -0,0 +1,132 @@ +import sys +import time + +try: + import ujson as json # MicroPython +except ImportError: + import json + +from railway_signal import RailwaySignal + + +def _sleep_ms(ms: int): + sleep_ms = getattr(time, "sleep_ms", None) + if sleep_ms is not None: + sleep_ms(ms) + else: + time.sleep(ms / 1000) + + +def load_config(): + # Поддержка обоих имён (в проекте встречаются оба варианта) + for name in ("semaphore-config.json", "semaphor-config.json", "semaphor-config.json"): + try: + with open(name, "r") as f: + return json.load(f) + except OSError: + continue + raise OSError("Config not found: semaphore-config.json") + + +def init_semaphores(cfg: dict): + semaphores = {} + lamps_by_id = {} + + for sem in cfg.get("semaphores", []): + sem_id = sem.get("id") + pin = sem.get("pin") + bpp = sem.get("bpp", 4) + lamps = sem.get("lamps", []) + + if sem_id is None or pin is None: + continue + + signal = RailwaySignal(pin=int(pin), num_leds=len(lamps), bpp=int(bpp), timing=1) + semaphores[int(sem_id)] = signal + + # Маппинг: порядок ламп в конфиге -> индекс пикселя + for pixel_index, lamp_cfg in enumerate(lamps): + lamp_id = lamp_cfg.get("id") + color = lamp_cfg.get("color", "OFF") + if lamp_id is None: + continue + + lamp = signal.add_lamp(int(lamp_id), pixel_index, str(color)) + lamps_by_id[int(lamp_id)] = lamp + + return semaphores, lamps_by_id + + +def print_help(): + print("Commands:") + print(" off") + print(" on") + print(" blink [period_ms]") + print("Examples:") + print(" 101 on") + print(" 102 blink 500") + + +def main(): + cfg = load_config() + semaphores, lamps_by_id = init_semaphores(cfg) + + print("READY") + print(f"Semaphores: {sorted(semaphores.keys())}") + print(f"Lamps: {sorted(lamps_by_id.keys())}") + print_help() + + while True: + line = sys.stdin.readline() + if not line: + _sleep_ms(20) + continue + + line = line.strip() + if not line: + continue + + if line.lower() in ("help", "h", "?"): + print_help() + continue + + parts = line.split() + if len(parts) < 2: + print("ERR ожидаю: [period_ms]") + continue + + try: + lamp_id = int(parts[0]) + except ValueError: + print("ERR lamp_id должен быть числом") + continue + + state = parts[1].lower() + lamp = lamps_by_id.get(lamp_id) + if lamp is None: + print(f"ERR lamp_id {lamp_id} не найден") + continue + + if state == "off": + lamp.off() + print(f"OK {lamp_id} off") + elif state == "on": + lamp.on() + print(f"OK {lamp_id} on") + elif state == "blink": + period_ms = 500 + if len(parts) >= 3: + try: + period_ms = int(parts[2]) + except ValueError: + print("ERR period_ms должен быть числом") + continue + lamp.blink(period_ms) + print(f"OK {lamp_id} blink {period_ms}") + else: + print("ERR state должен быть off/on/blink") + + +if __name__ == "__main__": + main() +