133 lines
2.9 KiB
Python
133 lines
2.9 KiB
Python
import sys
|
|
import time
|
|
|
|
try:
|
|
import ujson as json # MicroPython
|
|
except ImportError:
|
|
import json
|
|
|
|
from railway_signal import RailwaySignal
|
|
|
|
|
|
def _sleep_ms(ms: int):
|
|
sleep_ms = getattr(time, "sleep_ms", None)
|
|
if sleep_ms is not None:
|
|
sleep_ms(ms)
|
|
else:
|
|
time.sleep(ms / 1000)
|
|
|
|
|
|
def load_config():
    """Load the semaphore configuration from the first config file that opens.

    Two spellings of the file name are tried, in order, relative to the
    current working directory: ``semaphore-config.json`` and then
    ``semaphor-config.json``.

    Returns:
        The object produced by ``json.load`` for the first file that could
        be opened.

    Raises:
        OSError: if none of the candidate files could be opened/read.
        Any non-``OSError`` exception raised while parsing the file is not
        caught here and propagates to the caller.
    """
    # Support both spellings of the name (both variants occur in the project).
    # Each spelling is listed once so a missing file is not retried needlessly.
    for name in ("semaphore-config.json", "semaphor-config.json"):
        try:
            with open(name, "r") as f:
                return json.load(f)
        except OSError:
            # This candidate is missing or unreadable; try the next spelling.
            continue
    raise OSError("Config not found: semaphore-config.json")
|
|
|
|
|
|
def init_semaphores(cfg: dict):
    """Create signal objects and a lamp lookup table from the parsed config.

    Each entry of ``cfg["semaphores"]`` that has both ``id`` and ``pin``
    becomes one ``RailwaySignal`` sized to the number of entries in its
    ``lamps`` list (``bpp`` defaults to 4). Entries missing ``id`` or ``pin``
    are skipped, as are lamp entries without an ``id``.

    Args:
        cfg: parsed configuration; a missing ``semaphores`` key is treated
            as an empty list.

    Returns:
        A tuple ``(semaphores, lamps_by_id)`` where ``semaphores`` maps
        ``int(semaphore id)`` to its ``RailwaySignal`` and ``lamps_by_id``
        maps ``int(lamp id)`` to the value returned by ``signal.add_lamp``.
    """
    signals_by_id = {}
    lamp_index = {}

    for entry in cfg.get("semaphores", []):
        raw_sem_id = entry.get("id")
        raw_pin = entry.get("pin")
        if raw_sem_id is None or raw_pin is None:
            continue

        lamp_entries = entry.get("lamps", [])
        signal = RailwaySignal(
            pin=int(raw_pin),
            num_leds=len(lamp_entries),
            bpp=int(entry.get("bpp", 4)),
            timing=1,
        )
        signals_by_id[int(raw_sem_id)] = signal

        # Mapping: position of the lamp in the config -> pixel index.
        # Lamps without an id are skipped but still occupy their position.
        for position, lamp_entry in enumerate(lamp_entries):
            raw_lamp_id = lamp_entry.get("id")
            if raw_lamp_id is None:
                continue
            color_name = str(lamp_entry.get("color", "OFF"))
            lamp_key = int(raw_lamp_id)
            lamp_index[lamp_key] = signal.add_lamp(lamp_key, position, color_name)

    return signals_by_id, lamp_index
|
|
|
|
|
|
def print_help():
    """Print the supported console commands followed by usage examples."""
    help_lines = (
        "Commands:",
        " <lamp_id> off",
        " <lamp_id> on",
        " <lamp_id> blink [period_ms]",
        "Examples:",
        " 101 on",
        " 102 blink 500",
    )
    for text in help_lines:
        print(text)
|
|
|
|
|
|
def main():
    """Run the interactive lamp-control console on standard input.

    Loads the configuration, builds the signals and the lamp table, prints a
    ``READY`` banner with the known semaphore and lamp ids plus the help
    text, then loops forever reading commands of the form
    ``<lamp_id> <state> [period_ms]`` where ``state`` is ``off``, ``on`` or
    ``blink``. Every command is answered with one ``OK ...`` or ``ERR ...``
    line; ``help``, ``h`` and ``?`` reprint the help text.
    """
    semaphores, lamps_by_id = init_semaphores(load_config())

    print("READY")
    print(f"Semaphores: {sorted(semaphores.keys())}")
    print(f"Lamps: {sorted(lamps_by_id.keys())}")
    print_help()

    while True:
        raw = sys.stdin.readline()
        if not raw:
            # Nothing was read: wait briefly before polling stdin again.
            _sleep_ms(20)
            continue

        command = raw.strip()
        if not command:
            continue

        if command.lower() in ("help", "h", "?"):
            print_help()
            continue

        tokens = command.split()
        if len(tokens) < 2:
            print("ERR ожидаю: <lamp_id> <state> [period_ms]")
            continue

        try:
            lamp_id = int(tokens[0])
        except ValueError:
            print("ERR lamp_id должен быть числом")
            continue

        action = tokens[1].lower()

        # The lamp is looked up before the state is validated, so an unknown
        # lamp id is reported even when the state is also invalid.
        lamp = lamps_by_id.get(lamp_id)
        if lamp is None:
            print(f"ERR lamp_id {lamp_id} не найден")
            continue

        if action not in ("off", "on", "blink"):
            print("ERR state должен быть off/on/blink")
            continue

        if action == "blink":
            period_ms = 500  # default when no period is given
            if len(tokens) >= 3:
                try:
                    period_ms = int(tokens[2])
                except ValueError:
                    print("ERR period_ms должен быть числом")
                    continue
            lamp.blink(period_ms)
            print(f"OK {lamp_id} blink {period_ms}")
        elif action == "on":
            lamp.on()
            print(f"OK {lamp_id} on")
        else:
            lamp.off()
            print(f"OK {lamp_id} off")
|
|
|
|
|
|
# Start the console only when this file is run as a script, not on import.
if __name__ == "__main__":
    main()
|
|
|