import sys from switch import Switch import select from machine import Pin from time import sleep_ms, ticks_diff, ticks_ms from ir_pair import IRRxTxPollPair from double_ir import DoubleIr from railway_signal import RailwaySignal, Lamp SEMINSU_V1: dict[int, IRRxTxPollPair] = dict() SEMINSU_V2: dict[int, DoubleIr] = dict() SWITCHES: dict[int, Switch] = dict() LAMPS: dict[int, Lamp] = dict() _SEMAPHORES: dict[int, RailwaySignal] = dict() LED = Pin("LED", Pin.OUT) # "LED" — специальное имя для встроенного индикатора def load_switches(): with open("config.json", "r") as file: import json config = json.load(file) for sw_cfg in config.get("switches", []): sw = Switch( id=sw_cfg["id"], pin=sw_cfg["pin"], angle_minus=sw_cfg["angle_minus"], angle_plus=sw_cfg["angle_plus"], ) SWITCHES[sw.id] = sw def load_seminsus_v1(): with open("config.json", "r") as file: import json config = json.load(file) if "seminsus_v1" not in config: return for sem_cfg in config["seminsus_v1"]: seminsu = IRRxTxPollPair( rx_pin=sem_cfg["pin_rx"], tx_pin=sem_cfg["pin_tx"], poll_period_ms=50, tx_on_ms=3, blinks_per_poll=10, blink_off_ms=2, freq_hz=38_000, duty_percent=33, min_edges=10, count_rising=False, count_falling=True, ) SEMINSU_V1[sem_cfg["id"]] = seminsu def load_seminsus_v2(): with open("config.json", "r") as file: import json config = json.load(file) if "irs" not in config: return for sem_cfg in config["irs"]: pin = Pin(sem_cfg["pin"], Pin.IN) seminsu = DoubleIr(sem_cfg["id"], pin) SEMINSU_V2[sem_cfg["id"]] = seminsu def load_semaphores(): with open("config.json", "r") as file: import json config = json.load(file) for sem in config.get("semaphores", []): sem_id = sem.get("id") pin = sem.get("pin") bpp = sem.get("bpp", 4) lamps = sem.get("lamps", []) if sem_id is None or pin is None: continue signal = RailwaySignal(pin=int(pin), num_leds=len(lamps), bpp=int(bpp), timing=1) _SEMAPHORES[int(sem_id)] = signal for pxl_ind, lamp_cfg in enumerate(lamps): lamp_id = lamp_cfg.get("id") color = lamp_cfg.get("color", "OFF") if lamp_id is None: continue lamp = signal.add_lamp(int(lamp_id), pxl_ind, str(color)) LAMPS[int(lamp_id)] = lamp def resolve_command(command: str): parts = command.split() if len(parts) == 4 and parts[0] == "SWITCH" and parts[2] == "TURN": try: sw_id = int(parts[1]) except ValueError: return "ERROR Invalid ID" direction = parts[3] if direction not in ("1", "0"): return "ERROR Invalid direction" if sw_id not in SWITCHES: return f"ERROR Switch {sw_id} not found" # Выполняем действие if direction == "0": SWITCHES[sw_id].set_plus() else: SWITCHES[sw_id].set_minus() # РОВНО ОДНА СТРОКА — подтверждение успеха return f"EVENT SWITCH {sw_id} {direction}" elif parts[0] == "GET" and parts[1] == "ALL": evts = [] for id, sw in SWITCHES.items(): evts.append(f"EVENT SWITCH {id} {sw.pos}") for id, seminsu in SEMINSU_V1.items(): evts.append(f"EVENT IK_MODULE {id} {seminsu.last_state}") for id, seminsu in SEMINSU_V2.items(): evts.append(f"EVENT IK_MODULE {id} {seminsu.last_state}") for id, lamp in LAMPS.items(): evts.append(f"EVENT LAMP {id} {lamp.mode}") return "\n".join(evts) elif len(parts) == 4 and parts[0] == "LAMP" and parts[2] == "SET": try: lamp_id = int(parts[1]) except ValueError: return "ERROR Invalid lamp ID" if parts[3].isdigit(): mode = int(parts[3]) else: return "ERROR Invalid mode" if lamp_id not in LAMPS: return f"ERROR Lamp {lamp_id} not found" lamp = LAMPS[lamp_id] lamp.set_mode(mode) return f"EVENT LAMP {lamp_id} {mode}" _seminsu_ids = [] _seminsu_idx = 0 _active_seminsu_id = None def _next_seminsu_id(): global _seminsu_idx if not _seminsu_ids: return None sid = _seminsu_ids[_seminsu_idx] _seminsu_idx = (_seminsu_idx + 1) % len(_seminsu_ids) return sid def poll_seminsus_step(): """Неблокирующий шаг опроса IK_MODULE (seminsus_v1 + seminsu_v2). Идём round-robin по объединению ID из SEMINSU_V1 и SEMINSU_V2. ID не пересекаются, поэтому тип выбирается по наличию в словаре. Для v1 сохраняется текущая неблокирующая логика: start_poll()/update() и печать события только при изменении состояния. Для v2 вызывается check() (печать события внутри check() при изменении). """ global _active_seminsu_id if not SEMINSU_V1 and not SEMINSU_V2: return # Если есть активный v1-цикл — продолжаем его до завершения. if _active_seminsu_id is not None: seminsu = SEMINSU_V1.get(_active_seminsu_id) if seminsu is None: _active_seminsu_id = None return done = seminsu.update() if not done: return # Цикл завершён — печатаем при изменении состояния. if seminsu.prev_state is not None and seminsu.last_state is not None: if seminsu.prev_state != seminsu.last_state: # state: 1 = перекрыт, 0 = не перекрыт print(f"EVENT IK_MODULE {_active_seminsu_id} {seminsu.last_state}") _active_seminsu_id = None return # Берём следующий ID из объединённого round-robin. sid = _next_seminsu_id() if sid is None: return if sid in SEMINSU_V2: SEMINSU_V2[sid].check() return # Иначе — v1. seminsu = SEMINSU_V1.get(sid) if seminsu is None: return _active_seminsu_id = sid seminsu.start_poll() done = seminsu.update() if not done: return # Быстрый случай: цикл успел завершиться в этом же шаге. if seminsu.prev_state is not None and seminsu.last_state is not None: if seminsu.prev_state != seminsu.last_state: print(f"EVENT IK_MODULE {sid} {seminsu.last_state}") _active_seminsu_id = None def work(): poll = select.poll() poll.register(sys.stdin, select.POLLIN) # Мигание встроенного светодиода: полный период 1 секунда. # toggle() каждые 500 мс => 0.5с ON + 0.5с OFF. LED.value(0) last_led_toggle_ms = ticks_ms() # Готовим список id IK_MODULE для round-robin (v1 + v2). global _seminsu_ids, _seminsu_idx, _active_seminsu_id _seminsu_ids = sorted(list(SEMINSU_V1.keys()) + list(SEMINSU_V2.keys())) _seminsu_idx = 0 _active_seminsu_id = None while True: now_ms = ticks_ms() if ticks_diff(now_ms, last_led_toggle_ms) >= 500: LED.toggle() last_led_toggle_ms = now_ms # 1) Обработка stdin НЕ блокирует цикл seminsu. events = poll.poll(0) for fd, event in events: if event & select.POLLIN: try: line = sys.stdin.readline().strip() if not line: continue result = resolve_command(line) if result: print(result) except Exception as e: print(f"ERROR {e}") # 2) Один неблокирующий шаг опроса seminsu. poll_seminsus_step() # Небольшая пауза, чтобы не крутить CPU на 100%. sleep_ms(1) if __name__ == "__main__": load_switches() load_seminsus_v1() load_seminsus_v2() load_semaphores() work()