Files
Artem Kashaev 30feef708c Update configuration and enhance railway signal functionality
- Adjust angle parameters for switches in config.json
- Implement semaphore loading in main.py
- Refactor Lamp modes to use integers in railway_signal.py
- Add set_mode method for Lamp class to control lamp states
2026-01-27 15:17:56 +05:00

279 lines
8.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sys
from switch import Switch
import select
from machine import Pin
from time import sleep_ms, ticks_diff, ticks_ms
from ir_pair import IRRxTxPollPair
from double_ir import DoubleIr
from railway_signal import RailwaySignal, Lamp
SEMINSU_V1: dict[int, IRRxTxPollPair] = dict()
SEMINSU_V2: dict[int, DoubleIr] = dict()
SWITCHES: dict[int, Switch] = dict()
LAMPS: dict[int, Lamp] = dict()
_SEMAPHORES: dict[int, RailwaySignal] = dict()
LED = Pin("LED", Pin.OUT) # "LED" — специальное имя для встроенного индикатора
def load_switches():
with open("config.json", "r") as file:
import json
config = json.load(file)
for sw_cfg in config.get("switches", []):
sw = Switch(
id=sw_cfg["id"],
pin=sw_cfg["pin"],
angle_minus=sw_cfg["angle_minus"],
angle_plus=sw_cfg["angle_plus"],
)
SWITCHES[sw.id] = sw
def load_seminsus_v1():
with open("config.json", "r") as file:
import json
config = json.load(file)
if "seminsus_v1" not in config:
return
for sem_cfg in config["seminsus_v1"]:
seminsu = IRRxTxPollPair(
rx_pin=sem_cfg["pin_rx"],
tx_pin=sem_cfg["pin_tx"],
poll_period_ms=50,
tx_on_ms=3,
blinks_per_poll=10,
blink_off_ms=2,
freq_hz=38_000,
duty_percent=33,
min_edges=10,
count_rising=False,
count_falling=True,
)
SEMINSU_V1[sem_cfg["id"]] = seminsu
def load_seminsus_v2():
with open("config.json", "r") as file:
import json
config = json.load(file)
if "irs" not in config:
return
for sem_cfg in config["irs"]:
pin = Pin(sem_cfg["pin"], Pin.IN)
seminsu = DoubleIr(sem_cfg["id"], pin)
SEMINSU_V2[sem_cfg["id"]] = seminsu
def load_semaphores():
with open("config.json", "r") as file:
import json
config = json.load(file)
for sem in config.get("semaphores", []):
sem_id = sem.get("id")
pin = sem.get("pin")
bpp = sem.get("bpp", 4)
lamps = sem.get("lamps", [])
if sem_id is None or pin is None:
continue
signal = RailwaySignal(pin=int(pin), num_leds=len(lamps), bpp=int(bpp), timing=1)
_SEMAPHORES[int(sem_id)] = signal
for pxl_ind, lamp_cfg in enumerate(lamps):
lamp_id = lamp_cfg.get("id")
color = lamp_cfg.get("color", "OFF")
if lamp_id is None:
continue
lamp = signal.add_lamp(int(lamp_id), pxl_ind, str(color))
LAMPS[int(lamp_id)] = lamp
def resolve_command(command: str):
parts = command.split()
if len(parts) == 4 and parts[0] == "SWITCH" and parts[2] == "TURN":
try:
sw_id = int(parts[1])
except ValueError:
return "ERROR Invalid ID"
direction = parts[3]
if direction not in ("1", "0"):
return "ERROR Invalid direction"
if sw_id not in SWITCHES:
return f"ERROR Switch {sw_id} not found"
# Выполняем действие
if direction == "0":
SWITCHES[sw_id].set_plus()
else:
SWITCHES[sw_id].set_minus()
# РОВНО ОДНА СТРОКА — подтверждение успеха
return f"EVENT SWITCH {sw_id} {direction}"
elif parts[0] == "GET" and parts[1] == "ALL":
evts = []
for id, sw in SWITCHES.items():
evts.append(f"EVENT SWITCH {id} {sw.pos}")
for id, seminsu in SEMINSU_V1.items():
evts.append(f"EVENT IK_MODULE {id} {seminsu.last_state}")
for id, seminsu in SEMINSU_V2.items():
evts.append(f"EVENT IK_MODULE {id} {seminsu.last_state}")
for id, lamp in LAMPS.items():
evts.append(f"EVENT LAMP {id} {lamp.mode}")
return "\n".join(evts)
elif len(parts) == 4 and parts[0] == "LAMP" and parts[2] == "SET":
try:
lamp_id = int(parts[1])
except ValueError:
return "ERROR Invalid lamp ID"
if parts[3].isdigit():
mode = int(parts[3])
else:
return "ERROR Invalid mode"
if lamp_id not in LAMPS:
return f"ERROR Lamp {lamp_id} not found"
lamp = LAMPS[lamp_id]
lamp.set_mode(mode)
return f"EVENT LAMP {lamp_id} {mode}"
_seminsu_ids = []
_seminsu_idx = 0
_active_seminsu_id = None
def _next_seminsu_id():
global _seminsu_idx
if not _seminsu_ids:
return None
sid = _seminsu_ids[_seminsu_idx]
_seminsu_idx = (_seminsu_idx + 1) % len(_seminsu_ids)
return sid
def poll_seminsus_step():
"""Неблокирующий шаг опроса IK_MODULE (seminsus_v1 + seminsu_v2).
Идём round-robin по объединению ID из SEMINSU_V1 и SEMINSU_V2.
ID не пересекаются, поэтому тип выбирается по наличию в словаре.
Для v1 сохраняется текущая неблокирующая логика: start_poll()/update()
и печать события только при изменении состояния.
Для v2 вызывается check() (печать события внутри check() при изменении).
"""
global _active_seminsu_id
if not SEMINSU_V1 and not SEMINSU_V2:
return
# Если есть активный v1-цикл — продолжаем его до завершения.
if _active_seminsu_id is not None:
seminsu = SEMINSU_V1.get(_active_seminsu_id)
if seminsu is None:
_active_seminsu_id = None
return
done = seminsu.update()
if not done:
return
# Цикл завершён — печатаем при изменении состояния.
if seminsu.prev_state is not None and seminsu.last_state is not None:
if seminsu.prev_state != seminsu.last_state:
# state: 1 = перекрыт, 0 = не перекрыт
print(f"EVENT IK_MODULE {_active_seminsu_id} {seminsu.last_state}")
_active_seminsu_id = None
return
# Берём следующий ID из объединённого round-robin.
sid = _next_seminsu_id()
if sid is None:
return
if sid in SEMINSU_V2:
SEMINSU_V2[sid].check()
return
# Иначе — v1.
seminsu = SEMINSU_V1.get(sid)
if seminsu is None:
return
_active_seminsu_id = sid
seminsu.start_poll()
done = seminsu.update()
if not done:
return
# Быстрый случай: цикл успел завершиться в этом же шаге.
if seminsu.prev_state is not None and seminsu.last_state is not None:
if seminsu.prev_state != seminsu.last_state:
print(f"EVENT IK_MODULE {sid} {seminsu.last_state}")
_active_seminsu_id = None
def work():
poll = select.poll()
poll.register(sys.stdin, select.POLLIN)
# Мигание встроенного светодиода: полный период 1 секунда.
# toggle() каждые 500 мс => 0.5с ON + 0.5с OFF.
LED.value(0)
last_led_toggle_ms = ticks_ms()
# Готовим список id IK_MODULE для round-robin (v1 + v2).
global _seminsu_ids, _seminsu_idx, _active_seminsu_id
_seminsu_ids = sorted(list(SEMINSU_V1.keys()) + list(SEMINSU_V2.keys()))
_seminsu_idx = 0
_active_seminsu_id = None
while True:
now_ms = ticks_ms()
if ticks_diff(now_ms, last_led_toggle_ms) >= 500:
LED.toggle()
last_led_toggle_ms = now_ms
# 1) Обработка stdin НЕ блокирует цикл seminsu.
events = poll.poll(0)
for fd, event in events:
if event & select.POLLIN:
try:
line = sys.stdin.readline().strip()
if not line:
continue
result = resolve_command(line)
if result:
print(result)
except Exception as e:
print(f"ERROR {e}")
# 2) Один неблокирующий шаг опроса seminsu.
poll_seminsus_step()
# Небольшая пауза, чтобы не крутить CPU на 100%.
sleep_ms(1)
if __name__ == "__main__":
load_switches()
load_seminsus_v1()
load_seminsus_v2()
load_semaphores()
work()