Add configuration for semaphore signals and implement railway signal control

This commit is contained in:
Artem Kashaev
2026-01-22 10:30:31 +05:00
parent 7a455bd6ec
commit 5ae313defd
8 changed files with 638 additions and 111 deletions
+44 -16
View File
@@ -3,38 +3,66 @@
{ {
"id": 1283, "id": 1283,
"pin": 12, "pin": 12,
"angle_minus": 70, "angle_minus": 115,
"angle_plus": 125 "angle_plus": 65
}, },
{ {
"id": 1272, "id": 1272,
"pin": 13, "pin": 13,
"angle_minus": 65, "angle_minus": 115,
"angle_plus": 125 "angle_plus": 65
}, },
{ {
"id": 1444, "id": 1444,
"pin": 2, "pin": 2,
"angle_minus": 125, "angle_minus": 115,
"angle_plus": 65 "angle_plus": 65
}, },
{ {
"id": 1274, "id": 1274,
"pin": 3, "pin": 3,
"angle_minus": 125, "angle_minus": 115,
"angle_plus": 65 "angle_plus": 65
}
],
"seminsus": [
{
"id": 6000,
"pin_rx": 16,
"pin_tx": 6
}, },
{ {
"id": 6001, "id": 1457,
"pin_rx": 17, "pin": 4,
"pin_tx": 7 "angle_minus": 65,
"angle_plus": 115
}
],
"irs": [
{
"id": 7000,
"pin": 16
},
{
"id": 7001,
"pin": 17
},
{
"id": 7002,
"pin": 18
},
{
"id": 7003,
"pin": 19
},
{
"id": 7004,
"pin": 20
},
{
"id": 7005,
"pin": 21
},
{
"id": 7006,
"pin": 22
},
{
"id": 7007,
"pin": 26
} }
] ]
} }
+14
View File
@@ -0,0 +1,14 @@
from machine import Pin
class DoubleIr:
def __init__(self, id: int, pin: Pin):
self.id = id
self.pin = pin
self.last_state = pin.value()
def check(self):
val = self.pin.value()
if val != self.last_state:
self.last_state = val
print(f"EVENT IK_MODULE {self.id} {val}")
+17 -66
View File
@@ -1,71 +1,22 @@
from machine import Pin from machine import Pin
from time import ticks_ms, ticks_diff from time import sleep_ms
from ir_pair import IRRxTxPollPair # pin1 = Pin(16, Pin.IN)
pin2 = Pin(17, Pin.IN)
class Beam:
def __init__(self, id: int, pin: Pin):
self.id = id
self.pin = pin
self.prev_val = pin.value()
# --- Настройки пинов --- def check(self):
# RX_PIN: выход ИК-приёмника (обычно TTL сигнал с модуля VS1838/TSOP*) val = self.pin.value()
# TX_PIN: пин ИК-светодиода/ключа (на него подаётся 38кГц PWM во время окна опроса) if val != self.prev_val:
RX_PIN = 16 self.prev_val = val
TX_PIN = 6 print(f"EVENT IK_MODULE {self.id} {val}")
beam = Beam(1, pin2)
# --- Тайминги опроса --- while True:
POLL_PERIOD_MS = 500 # период опроса (как часто проверяем) beam.check()
TX_ON_MS = 50 # сколько держим 38кГц включённым в каждом цикле sleep_ms(50)
STATUS_PERIOD_MS = 1000
# --- Условия теста ---
# N влияет сразу на:
# - сколько раз "мигнуть" ИК-передатчиком за 1 цикл опроса
# - сколько фронтов (edges) нужно увидеть, чтобы считать луч НЕ перекрытым
N = 5
LED = Pin("LED", Pin.OUT)
def main():
pair = IRRxTxPollPair(
rx_pin=RX_PIN,
tx_pin=TX_PIN,
poll_period_ms=POLL_PERIOD_MS,
tx_on_ms=TX_ON_MS,
blinks_per_poll=N,
blink_off_ms=5,
freq_hz=38_000,
duty_percent=33,
min_edges=N,
# Считаем только FALLING: обычно на каждом включении carrier RX даёт 1 спад.
# Тогда 5 миганий ~= 5 edges (а при подсчёте обоих фронтов было бы ~10).
count_rising=False,
count_falling=True,
)
last_status = ticks_ms()
print("IR poll test started")
print("RX pin:", RX_PIN, "TX pin:", TX_PIN)
print("poll_period_ms:", POLL_PERIOD_MS, "tx_on_ms:", TX_ON_MS)
print("N:", N, "(blinks_per_poll and min_edges)")
try:
while True:
pair.poll_once() # результат и счетчики сохраняются внутри pair
LED.toggle()
now = ticks_ms()
if ticks_diff(now, last_status) >= STATUS_PERIOD_MS:
seen = bool(pair.last_seen)
print(
("BEAM NOT BLOCKED" if seen else "BEAM BLOCKED")
+ " | edges=%d" % pair.last_edges
)
last_status = now
finally:
pair.deinit()
main()
+1 -1
View File
@@ -5,7 +5,7 @@ import random
# Настройки: 8 LED, подключено к GP6 # Настройки: 8 LED, подключено к GP6
num_leds = 8 num_leds = 8
pin = 6 # GP6 pin = 0 # GP6
np = neopixel.NeoPixel(Pin(pin), num_leds) np = neopixel.NeoPixel(Pin(pin), num_leds)
+82 -19
View File
@@ -2,35 +2,40 @@ import sys
from switch import Switch from switch import Switch
import select import select
from machine import Pin from machine import Pin
from time import sleep_ms from time import sleep_ms, ticks_diff, ticks_ms
from ir_pair import IRRxTxPollPair from ir_pair import IRRxTxPollPair
from double_ir import DoubleIr
SEMINSU = dict() SEMINSU_V1 = dict()
SEMINSU_V2 = dict()
SWITCHES = dict() SWITCHES = dict()
LED = Pin("LED", Pin.OUT) # "LED" — специальное имя для встроенного индикатора LED = Pin("LED", Pin.OUT) # "LED" — специальное имя для встроенного индикатора
def load_switches(): def load_switches():
with open("config.json", "r") as file: with open("config.json", "r") as file:
import json import json
config = json.load(file) config = json.load(file)
for sw_cfg in config["switches"]: for sw_cfg in config["switches"]:
sw = Switch( sw = Switch(
id=sw_cfg["id"], id=sw_cfg["id"],
pin=sw_cfg["pin"], pin=sw_cfg["pin"],
angle_minus=sw_cfg["angle_minus"], angle_minus=sw_cfg["angle_minus"],
angle_plus=sw_cfg["angle_plus"] angle_plus=sw_cfg["angle_plus"],
) )
SWITCHES[sw.id] = sw SWITCHES[sw.id] = sw
def load_seminsus(): def load_seminsus_v1():
with open("config.json", "r") as file: with open("config.json", "r") as file:
import json import json
config = json.load(file) config = json.load(file)
for sem_cfg in config["seminsus"]: if "seminsus_v1" not in config:
return
for sem_cfg in config["seminsus_v1"]:
seminsu = IRRxTxPollPair( seminsu = IRRxTxPollPair(
rx_pin=sem_cfg["pin_rx"], rx_pin=sem_cfg["pin_rx"],
tx_pin=sem_cfg["pin_tx"], tx_pin=sem_cfg["pin_tx"],
@@ -44,9 +49,21 @@ def load_seminsus():
count_rising=False, count_rising=False,
count_falling=True, count_falling=True,
) )
SEMINSU[sem_cfg["id"]] = seminsu SEMINSU_V1[sem_cfg["id"]] = seminsu
def load_seminsus_v2():
with open("config.json", "r") as file:
import json
config = json.load(file)
if "irs" not in config:
return
for sem_cfg in config["irs"]:
pin = Pin(sem_cfg["pin"], Pin.IN)
seminsu = DoubleIr(sem_cfg["id"], pin)
SEMINSU_V2[sem_cfg["id"]] = seminsu
def resolve_command(command: str): def resolve_command(command: str):
parts = command.split() parts = command.split()
@@ -75,7 +92,9 @@ def resolve_command(command: str):
evts = [] evts = []
for id, sw in SWITCHES.items(): for id, sw in SWITCHES.items():
evts.append(f"EVENT SWITCH {id} {sw.pos}") evts.append(f"EVENT SWITCH {id} {sw.pos}")
for id, seminsu in SEMINSU.items(): for id, seminsu in SEMINSU_V1.items():
evts.append(f"EVENT IK_MODULE {id} {seminsu.last_state}")
for id, seminsu in SEMINSU_V2.items():
evts.append(f"EVENT IK_MODULE {id} {seminsu.last_state}") evts.append(f"EVENT IK_MODULE {id} {seminsu.last_state}")
return "\n".join(evts) return "\n".join(evts)
@@ -95,22 +114,28 @@ def _next_seminsu_id():
def poll_seminsus_step(): def poll_seminsus_step():
"""Неблокирующий шаг опроса seminsu. """Неблокирующий шаг опроса IK_MODULE (seminsus_v1 + seminsu_v2).
В каждый момент времени опрашивается только ОДНА пара. Идём round-robin по объединению ID из SEMINSU_V1 и SEMINSU_V2.
ID не пересекаются, поэтому тип выбирается по наличию в словаре.
Для v1 сохраняется текущая неблокирующая логика: start_poll()/update()
и печать события только при изменении состояния.
Для v2 вызывается check() (печать события внутри check() при изменении).
""" """
global _active_seminsu_id global _active_seminsu_id
if not SEMINSU: if not SEMINSU_V1 and not SEMINSU_V2:
return return
if _active_seminsu_id is None: # Если есть активный v1-цикл — продолжаем его до завершения.
_active_seminsu_id = _next_seminsu_id() if _active_seminsu_id is not None:
if _active_seminsu_id is None: seminsu = SEMINSU_V1.get(_active_seminsu_id)
if seminsu is None:
_active_seminsu_id = None
return return
SEMINSU[_active_seminsu_id].start_poll()
seminsu = SEMINSU[_active_seminsu_id]
done = seminsu.update() done = seminsu.update()
if not done: if not done:
return return
@@ -122,20 +147,56 @@ def poll_seminsus_step():
print(f"EVENT IK_MODULE {_active_seminsu_id} {seminsu.last_state}") print(f"EVENT IK_MODULE {_active_seminsu_id} {seminsu.last_state}")
_active_seminsu_id = None _active_seminsu_id = None
return
# Берём следующий ID из объединённого round-robin.
sid = _next_seminsu_id()
if sid is None:
return
if sid in SEMINSU_V2:
SEMINSU_V2[sid].check()
return
# Иначе — v1.
seminsu = SEMINSU_V1.get(sid)
if seminsu is None:
return
_active_seminsu_id = sid
seminsu.start_poll()
done = seminsu.update()
if not done:
return
# Быстрый случай: цикл успел завершиться в этом же шаге.
if seminsu.prev_state is not None and seminsu.last_state is not None:
if seminsu.prev_state != seminsu.last_state:
print(f"EVENT IK_MODULE {sid} {seminsu.last_state}")
_active_seminsu_id = None
def work(): def work():
poll = select.poll() poll = select.poll()
poll.register(sys.stdin, select.POLLIN) poll.register(sys.stdin, select.POLLIN)
# Готовим список id seminsu для round-robin. # Мигание встроенного светодиода: полный период 1 секунда.
# toggle() каждые 500 мс => 0.5с ON + 0.5с OFF.
LED.value(0)
last_led_toggle_ms = ticks_ms()
# Готовим список id IK_MODULE для round-robin (v1 + v2).
global _seminsu_ids, _seminsu_idx, _active_seminsu_id global _seminsu_ids, _seminsu_idx, _active_seminsu_id
_seminsu_ids = list(SEMINSU.keys()) _seminsu_ids = sorted(list(SEMINSU_V1.keys()) + list(SEMINSU_V2.keys()))
_seminsu_idx = 0 _seminsu_idx = 0
_active_seminsu_id = None _active_seminsu_id = None
while True: while True:
now_ms = ticks_ms()
if ticks_diff(now_ms, last_led_toggle_ms) >= 500:
LED.toggle() LED.toggle()
last_led_toggle_ms = now_ms
# 1) Обработка stdin НЕ блокирует цикл seminsu. # 1) Обработка stdin НЕ блокирует цикл seminsu.
events = poll.poll(0) events = poll.poll(0)
@@ -159,7 +220,9 @@ def work():
# Небольшая пауза, чтобы не крутить CPU на 100%. # Небольшая пауза, чтобы не крутить CPU на 100%.
sleep_ms(1) sleep_ms(1)
if __name__ == "__main__": if __name__ == "__main__":
load_switches() load_switches()
load_seminsus() load_seminsus_v1()
load_seminsus_v2()
work() work()
+323
View File
@@ -0,0 +1,323 @@
import machine
import neopixel
import time
try:
import micropython
except ImportError: # CPython / non-MicroPython
micropython = None # type: ignore
LAMP_COLORS = {
# Имена цветов -> RGBA (для RGBW ленты)
"OFF": (0, 0, 0, 0),
"RED": (100, 0, 0, 0),
"YELLOW": (100, 45, 0, 0),
"GREEN": (0, 100, 0, 0),
"BLUE": (0, 0, 100, 0),
"WHITE": (0, 0, 0, 100),
"MOON_WHITE": (0, 0, 0, 100),
}
class Lamp:
"""Одна лампа светофора (один пиксель).
3 режима:
- off: выключено
- on: постоянно горит
- blink: мигает (переключение ON/OFF с периодом period_ms)
"""
MODE_OFF = "off"
MODE_ON = "on"
MODE_BLINK = "blink"
def __init__(self, signal, lamp_id: int, pixel_index: int, rgba):
self._signal = signal
self.id = lamp_id
self.pixel = pixel_index
self.rgba = rgba
self.mode = Lamp.MODE_OFF
self._blink_period_ms = 500
self._blink_on = False
self._next_toggle_ms = 0
def off(self):
self._signal._set_lamp_mode(self, Lamp.MODE_OFF)
def on(self):
self._signal._set_lamp_mode(self, Lamp.MODE_ON)
def blink(self, period_ms: int = 500):
self._signal._set_lamp_mode(self, Lamp.MODE_BLINK, period_ms=period_ms)
class RailwaySignal:
"""Контейнер ламп светофора.
Хранит NeoPixel и набор ламп (каждая лампа — один пиксель).
Мигающие лампы обслуживаются общим Timer.
"""
def __init__(
self,
pin: int,
num_leds: int,
*,
bpp: int = 4,
timing: int = 1,
colors: dict | None = None,
timer_id: int = -1,
timer_tick_ms: int = 50,
brightness: float = 0.5,
):
self.pin = pin
self.num_leds = num_leds
self.bpp = bpp
self.timing = timing
self.colors = colors if colors is not None else LAMP_COLORS
self.brightness = float(brightness)
if self.brightness < 0.0:
self.brightness = 0.0
if self.brightness > 1.0:
self.brightness = 1.0
self._timer_id = timer_id
self._timer_tick_ms = int(timer_tick_ms)
self._timer = None
self._timer_running = False
self._scheduled = False
self._lamps_by_id = {}
self._np = neopixel.NeoPixel(
machine.Pin(pin),
num_leds,
bpp=bpp,
timing=timing,
)
self.clear()
def set_brightness(self, brightness: float):
"""Устанавливает глобальный коэффициент яркости (0.0..1.0)."""
self.brightness = float(brightness)
if self.brightness < 0.0:
self.brightness = 0.0
if self.brightness > 1.0:
self.brightness = 1.0
# Перерисовываем текущее состояние ламп
for lamp in self._lamps_by_id.values():
self._apply_lamp(lamp)
self.show()
def _scale(self, value: int) -> int:
value = int(value)
if value <= 0:
return 0
scaled = int(value * self.brightness)
if scaled < 0:
return 0
if scaled > 255:
return 255
return scaled
def _normalize_rgba(self, rgba: tuple | list):
"""Приводит цвет к формату (r,g,b,w) с учётом self.bpp.
- для bpp=4: всегда (r,g,b,w)
- для bpp=3: W-канал маппится в RGB только если RGB=0 (белый)
"""
if len(rgba) == 3:
r, g, b = rgba
w = 0
else:
r, g, b, w = rgba
r = int(r)
g = int(g)
b = int(b)
w = int(w)
if self.bpp == 3:
# На RGB-ленте нет W-канала. Самый ожидаемый кейс — "WHITE"/"MOON_WHITE",
# которые заданы как (0,0,0,w). Маппим их в (w,w,w).
if r == 0 and g == 0 and b == 0 and w > 0:
r = w
g = w
b = w
w = 0
return (r, g, b, w)
def add_lamp(self, lamp_id: int, pixel_index: int, color: str | tuple | list):
if pixel_index < 0 or pixel_index >= self.num_leds:
raise ValueError("pixel_index out of range")
rgba = None
if isinstance(color, str):
rgba = self.colors.get(color.upper())
elif isinstance(color, (tuple, list)):
rgba = color
if rgba is None:
raise ValueError("Unknown color")
rgba = self._normalize_rgba(rgba)
lamp = Lamp(self, lamp_id, pixel_index, rgba)
self._lamps_by_id[lamp_id] = lamp
lamp.off()
return lamp
def lamp(self, lamp_id: int):
return self._lamps_by_id.get(lamp_id)
def _stop_timer(self):
if self._timer is None or not self._timer_running:
return
try:
self._timer.deinit()
except Exception:
pass
self._timer_running = False
def _ensure_timer(self):
if self._timer_running:
return
try:
self._timer = machine.Timer(self._timer_id)
except Exception:
# Some ports require Timer(0/1/2...), some allow -1.
self._timer = machine.Timer(0)
try:
self._timer.init(period=self._timer_tick_ms, mode=machine.Timer.PERIODIC, callback=self._timer_cb)
self._timer_running = True
except Exception:
self._timer_running = False
def _now_ms(self) -> int:
ticks_ms = getattr(time, "ticks_ms", None)
if ticks_ms is not None:
return int(ticks_ms())
return int(time.time() * 1000)
def _ticks_diff(self, a: int, b: int) -> int:
ticks_diff = getattr(time, "ticks_diff", None)
if ticks_diff is not None:
return int(ticks_diff(a, b))
return int(a - b)
def show(self):
self._np.write()
def clear(self):
self._stop_timer()
self._scheduled = False
if self.bpp == 3:
self._np.fill((0, 0, 0))
else:
self._np.fill((0, 0, 0, 0))
self.show()
def set_pixel_rgba(self, index: int, r: int, g: int, b: int, w: int = 0):
if self.bpp == 3:
self._np[index] = (self._scale(r), self._scale(g), self._scale(b))
else:
self._np[index] = (
self._scale(r),
self._scale(g),
self._scale(b),
self._scale(w),
)
def fill_rgba(self, r: int, g: int, b: int, w: int = 0):
if self.bpp == 3:
self._np.fill((self._scale(r), self._scale(g), self._scale(b)))
else:
self._np.fill((self._scale(r), self._scale(g), self._scale(b), self._scale(w)))
def _apply_lamp(self, lamp: Lamp):
if lamp.mode == Lamp.MODE_ON:
r, g, b, w = lamp.rgba
self.set_pixel_rgba(lamp.pixel, r, g, b, w)
elif lamp.mode == Lamp.MODE_BLINK:
if lamp._blink_on:
r, g, b, w = lamp.rgba
self.set_pixel_rgba(lamp.pixel, r, g, b, w)
else:
self.set_pixel_rgba(lamp.pixel, 0, 0, 0, 0)
else:
self.set_pixel_rgba(lamp.pixel, 0, 0, 0, 0)
def _any_blinking(self) -> bool:
for lamp in self._lamps_by_id.values():
if lamp.mode == Lamp.MODE_BLINK:
return True
return False
def _set_lamp_mode(self, lamp: Lamp, mode: str, *, period_ms: int = 500):
if mode == Lamp.MODE_BLINK:
lamp.mode = Lamp.MODE_BLINK
lamp._blink_period_ms = int(period_ms)
lamp._blink_on = False # стартуем с OFF как раньше
lamp._next_toggle_ms = self._now_ms() + lamp._blink_period_ms
elif mode == Lamp.MODE_ON:
lamp.mode = Lamp.MODE_ON
lamp._blink_on = False
else:
lamp.mode = Lamp.MODE_OFF
lamp._blink_on = False
self._apply_lamp(lamp)
self.show()
if self._any_blinking():
self._ensure_timer()
else:
self._stop_timer()
def _timer_cb(self, _t):
# IRQ context: keep it tiny.
if not self._any_blinking():
return
if micropython is not None:
if self._scheduled:
return
try:
self._scheduled = True
micropython.schedule(self._scheduled_tick, 0)
except Exception:
self._scheduled = False
def _scheduled_tick(self, _arg):
self._scheduled = False
now_ms = self._now_ms()
dirty = False
for lamp in self._lamps_by_id.values():
if lamp.mode != Lamp.MODE_BLINK:
continue
if self._ticks_diff(now_ms, lamp._next_toggle_ms) < 0:
continue
lamp._blink_on = not lamp._blink_on
lamp._next_toggle_ms = now_ms + lamp._blink_period_ms
self._apply_lamp(lamp)
dirty = True
if dirty:
self.show()
if not self._any_blinking():
self._stop_timer()
+16
View File
@@ -0,0 +1,16 @@
{
"semaphores": [
{
"id": 1,
"pin": 15,
"bpp": 4,
"lamps": [
{"color": "RED", "id": 101},
{"color": "YELLOW", "id": 102},
{"color": "GREEN", "id": 103},
{"color": "MOON_WHITE", "id": 104},
{"color": "BLUE", "id": 105}
]
}
]
}
+132
View File
@@ -0,0 +1,132 @@
import sys
import time
try:
import ujson as json # MicroPython
except ImportError:
import json
from railway_signal import RailwaySignal
def _sleep_ms(ms: int):
sleep_ms = getattr(time, "sleep_ms", None)
if sleep_ms is not None:
sleep_ms(ms)
else:
time.sleep(ms / 1000)
def load_config():
# Поддержка обоих имён (в проекте встречаются оба варианта)
for name in ("semaphore-config.json", "semaphor-config.json", "semaphor-config.json"):
try:
with open(name, "r") as f:
return json.load(f)
except OSError:
continue
raise OSError("Config not found: semaphore-config.json")
def init_semaphores(cfg: dict):
semaphores = {}
lamps_by_id = {}
for sem in cfg.get("semaphores", []):
sem_id = sem.get("id")
pin = sem.get("pin")
bpp = sem.get("bpp", 4)
lamps = sem.get("lamps", [])
if sem_id is None or pin is None:
continue
signal = RailwaySignal(pin=int(pin), num_leds=len(lamps), bpp=int(bpp), timing=1)
semaphores[int(sem_id)] = signal
# Маппинг: порядок ламп в конфиге -> индекс пикселя
for pixel_index, lamp_cfg in enumerate(lamps):
lamp_id = lamp_cfg.get("id")
color = lamp_cfg.get("color", "OFF")
if lamp_id is None:
continue
lamp = signal.add_lamp(int(lamp_id), pixel_index, str(color))
lamps_by_id[int(lamp_id)] = lamp
return semaphores, lamps_by_id
def print_help():
print("Commands:")
print(" <lamp_id> off")
print(" <lamp_id> on")
print(" <lamp_id> blink [period_ms]")
print("Examples:")
print(" 101 on")
print(" 102 blink 500")
def main():
cfg = load_config()
semaphores, lamps_by_id = init_semaphores(cfg)
print("READY")
print(f"Semaphores: {sorted(semaphores.keys())}")
print(f"Lamps: {sorted(lamps_by_id.keys())}")
print_help()
while True:
line = sys.stdin.readline()
if not line:
_sleep_ms(20)
continue
line = line.strip()
if not line:
continue
if line.lower() in ("help", "h", "?"):
print_help()
continue
parts = line.split()
if len(parts) < 2:
print("ERR ожидаю: <lamp_id> <state> [period_ms]")
continue
try:
lamp_id = int(parts[0])
except ValueError:
print("ERR lamp_id должен быть числом")
continue
state = parts[1].lower()
lamp = lamps_by_id.get(lamp_id)
if lamp is None:
print(f"ERR lamp_id {lamp_id} не найден")
continue
if state == "off":
lamp.off()
print(f"OK {lamp_id} off")
elif state == "on":
lamp.on()
print(f"OK {lamp_id} on")
elif state == "blink":
period_ms = 500
if len(parts) >= 3:
try:
period_ms = int(parts[2])
except ValueError:
print("ERR period_ms должен быть числом")
continue
lamp.blink(period_ms)
print(f"OK {lamp_id} blink {period_ms}")
else:
print("ERR state должен быть off/on/blink")
if __name__ == "__main__":
main()