Od włożenia karty SD do działającego programu, który wykrywa dowolny czujnik 1-Wire, pozwala go nazwać, monitoruje kilka naraz i zapisuje wszystko do pliku. Tłumaczone tak, jakbyś nigdy wcześniej nie widział terminala. Bo może i nie widziałeś.
Zanim wydasz oszczędności życia na sprzęt, którego nie użyjesz — przeczytaj obie kolumny.
Używamy Raspberry Pi Imager (wersja 2.0.8). Sam pobierze system i nagra kartę. Nie, nie musisz go „instalować ręcznie z terminala" — to nie 2009 rok.
Karta do Pi, zasilanie podłączone, odczekaj minutę czy dwie. Z laptopa połącz się przez SSH
(zamień pi i raspberrypi na swoje):
ssh pi@raspberrypi.localZaktualizuj system. To nudne, ale ważne, jak mycie zębów:
sudo apt update && sudo apt upgrade -yDS18B20 (i każdy inny czujnik 1-Wire) gada z Pi przez tryb 1-Wire. Domyślnie wyłączony.
sudo raspi-configW menu: 3 Interface Options → I7 1-Wire → Yes. Potem Finish i restart:
sudo reboot40 pinów w dwóch rzędach. Pin 1 jest w rogu (często z kwadratowym lutem). Nieparzyste w jednym rzędzie, parzyste w drugim. Bez czarów.
Trzy przewody. W wersji wodoodpornej zwykle: czerwony = zasilanie, czarny = masa, żółty (lub niebieski) = dane.
| Przewód | Kolor | Pin Raspberry Pi |
|---|---|---|
| VCC | R czerwony | pin 1 — 3V3 |
| DATA | Ż żółty | pin 7 — GPIO 4 |
| GND | C czarny | pin 9 — GND |
| Rezystor 4,7 kΩ | + | między DATA a 3V3 |
ls /sys/bus/w1/devices/Powinien pojawić się folder 28-... (po jednym na każdy podłączony czujnik). Odczyt surowy:
cat /sys/bus/w1/devices/28-*/w1_slaveNa końcu zobaczysz t=23437 = 23,437 °C.
DHT11 nie jest 1-Wire (mimo że ma jedną linię danych — mylące, wiem). Ma własną bibliotekę i siedzi na osobnym pinie: GPIO 17 (pin 11).
| Pin DHT11 | Kolor | Pin Raspberry Pi |
|---|---|---|
| + (VCC) | R czerwony | pin 2 — 5V |
| OUT | N niebieski | pin 11 — GPIO 17 |
| − (GND) | C czarny | pin 6 — GND |
Zanim odpalimy „prawdziwy" program, sprawdźmy czujniki gołymi poleceniami. Jak słuchanie silnika przed jazdą.
watch -n 2 'cat /sys/bus/w1/devices/28-*/w1_slave | grep -o "t=[0-9]*" | cut -d= -f2 | awk "{print \$1/1000 \" C\"}"'Wyjście: Ctrl+C.
python3 -c "import board, adafruit_dht, time; d=adafruit_dht.DHT11(board.D17);
while True:
try: print(d.temperature,'C', d.humidity,'%')
except RuntimeError: pass
time.sleep(2)"try/except. Nie psuje się — po prostu jest DHT11.
To dla studentów, więc tłumaczę jak małemu dziecku: każda linijka osobno, z wyjaśnieniem co robi i po co. Wklejaj po kolei, nie hurtem.
Python masz już w systemie. Dokładamy menedżer paczek (pip) i moduł „wirtualnych środowisk" (venv):
sudo apt install python3-pip python3-venv -yTworzymy folder i wchodzimy do niego. ~ oznacza Twój katalog domowy:
mkdir ~/czujniki cd ~/czujniki
To „osobne pudełko" na biblioteki, żeby nie zaśmiecać systemu. Flaga
--system-site-packages pozwala mu widzieć systemowe sterowniki GPIO:
python3 -m venv .venv --system-site-packagesPo tym poleceniu na początku linii pojawi się (.venv). To znaczy: jesteś w pudełku.
source .venv/bin/activatetextual rysuje kolorowy interfejs w terminalu, adafruit-circuitpython-dht obsługuje DHT11:
pip install textual adafruit-circuitpython-dhtNa nowym Raspberry Pi OS (Debian 13) DHT potrzebuje warstwy lgpio. Zainstaluj systemowo:
sudo apt install python3-lgpio -ycd ~/czujniki && source .venv/bin/activate.
Bez (.venv) na początku linii program powie, że nie widzi bibliotek — i będzie miał rację.
textual. DHT11 to jedyny rozpieszczony gość, który wymaga dodatków. Możesz pominąć kroki 6,
jeśli odpuszczasz wilgotność.
Ten program robi to, czego nie zrobi jednolinijkowiec: wykrywa wszystkie
czujniki 1-Wire, pozwala je dodać przyciskiem, nazwać
(i nadać nazwę kolumnie w pliku), monitoruje wiele naraz na jednym wykresie
i zapisuje do CSV. Ma też tryb --sim — udaje 3 czujniki, gdy nie masz sprzętu pod ręką.
cd ~/czujniki && source .venv/bin/activatenano czujniki.pypython3 czujniki.py --simpython3 czujniki.pyczujniki.py#!/usr/bin/env python3
"""
czujniki.py — monitor wielu czujnikow 1-Wire (np. DS18B20) + DHT11
Co potrafi:
- Wykrywa WSZYSTKIE czujniki na magistrali 1-Wire (/sys/bus/w1/devices/*).
Nie tylko DS18B20 (28-*), ale kazdy czujnik 1-Wire ktory wystawia "w1_slave".
- Przycisk "Dodaj czujnik" -> okienko ze swiezo przeskanowana lista znalezionych
urzadzen. Dla kazdego mozesz ustawic: wlasna nazwe + nazwe kolumny w pliku CSV.
- Mozesz dodac wiele czujnikow, kazdy ma swoja linie na wykresie i kolumne w pliku.
- Opcjonalnie DHT11 (temperatura + wilgotnosc) na wybranym pinie GPIO.
- Wykres ASCII na zywo, log, zapis do CSV w katalogu ./wyniki/
Wzorowany na 2dta.py (AGHOS Thermoanalytical System).
Wymagania:
pip install textual adafruit-circuitpython-dht
Uruchomienie:
python3 czujniki.py # prawdziwe czujniki
python3 czujniki.py --sim # symulator (bez sprzetu, do testow UI)
"""
import sys
import time
import glob
import math
import itertools
from datetime import datetime
from pathlib import Path
from textual import on
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Container, Horizontal, Vertical, VerticalScroll
from textual.reactive import reactive
from textual.screen import Screen, ModalScreen
from textual.widgets import Button, Input, Label, Log, Static, Checkbox
from textual.widget import Widget
# --- proba importu bibliotek sprzetowych DHT (jesli brak -> DHT niedostepny) ---
try:
import board
import adafruit_dht
DHT_AVAILABLE = True
except Exception:
DHT_AVAILABLE = False
RESULTS_DIR = Path("wyniki")
SYSTEM_HEADER = "# Monitor czujnikow 1-Wire + DHT11"
W1_BASE = "/sys/bus/w1/devices"
# Kolory/markery przydzielane kolejnym czujnikom 1-Wire na wykresie
W1_MARKERS = [
"[bold cyan]*[/]",
"[bold yellow]*[/]",
"[bold magenta]*[/]",
"[bold red]*[/]",
"[bold white]*[/]",
"[bold blue]*[/]",
]
# ══════════════════════════════════════════════════════════════════════════
# WYKRYWANIE I ODCZYT CZUJNIKOW 1-WIRE
# ══════════════════════════════════════════════════════════════════════════
def scan_w1_devices() -> list[str]:
"""
Zwraca liste ID wszystkich urzadzen 1-Wire widocznych dla kernela.
Pomija wpis 'w1_bus_master*'. Dziala dla DS18B20 (28-*) i innych
rodzin (10-*, 22-*, 3b-* itd.) — kazde co ma plik w1_slave.
"""
out = []
base = Path(W1_BASE)
if not base.exists():
return out
for p in sorted(base.iterdir()):
if p.name.startswith("w1_bus_master"):
continue
if (p / "w1_slave").exists():
out.append(p.name)
return out
def read_w1_temp(device_id: str) -> float | None:
"""
Odczyt temperatury z czujnika 1-Wire o danym ID.
Plik w1_slave konczy sie 'YES' (CRC ok) i zawiera 't=23456' (=23.456 C).
Dziala dla wiekszosci termometrow 1-Wire uzywajacych tego formatu.
"""
f = Path(W1_BASE) / device_id / "w1_slave"
try:
lines = f.read_text().splitlines()
if not lines or not lines[0].strip().endswith("YES"):
return None
idx = lines[1].find("t=")
if idx == -1:
return None
return float(lines[1][idx + 2:]) / 1000.0
except Exception:
return None
class DHT11Reader:
"""Odczyt temperatury i wilgotnosci z DHT11. Pojedyncze bledy sa normalne."""
def __init__(self, pin_name="D17"):
self._pin_name = pin_name
self._dev = None
self._last = (None, None)
def open(self):
if not DHT_AVAILABLE:
raise RuntimeError("Brak biblioteki adafruit_dht / board")
pin = getattr(board, self._pin_name)
self._dev = adafruit_dht.DHT11(pin)
def read(self):
if not self._dev:
return None, None
try:
t = self._dev.temperature
h = self._dev.humidity
if t is not None and h is not None:
self._last = (float(t), float(h))
except RuntimeError:
pass
except Exception:
pass
return self._last
def close(self):
try:
if self._dev:
self._dev.exit()
except Exception:
pass
self._dev = None
class SimulatedW1:
"""Symuluje kilka 'wirtualnych' czujnikow 1-Wire do testow bez sprzetu."""
FAKE_IDS = ["28-sim00000001", "28-sim00000002", "10-sim000000aa"]
def __init__(self):
self._start = time.monotonic()
def scan(self):
return list(self.FAKE_IDS)
def read(self, device_id):
t = time.monotonic() - self._start
# kazdy fejkowy czujnik ma lekko inny przebieg
seed = sum(ord(c) for c in device_id) % 10
return round(20.0 + seed + 3.0 * math.sin((t + seed) / 18), 3)
class SimulatedDHT:
def __init__(self):
self._start = time.monotonic()
def read(self):
t = time.monotonic() - self._start
return round(21.0 + 2.0 * math.sin(t / 25), 1), round(45.0 + 10.0 * math.sin(t / 30), 1)
# ══════════════════════════════════════════════════════════════════════════
# WYKRES ASCII (wiele linii temperatury + wilgotnosc DHT)
# ══════════════════════════════════════════════════════════════════════════
class AsciiChart(Widget):
DEFAULT_CSS = """
AsciiChart {
height: 1fr; width: 100%;
border: solid $accent; padding: 0 1; background: $surface;
}
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._temp_series: dict[str, list[float]] = {} # nazwa -> wartosci
self._temp_marker: dict[str, str] = {} # nazwa -> markup
self._h_dht: list[float] = []
self.full_view = False
def register_series(self, name: str, marker: str):
self._temp_series.setdefault(name, [])
self._temp_marker[name] = marker
def push_temp(self, name: str, value):
if value is not None and name in self._temp_series:
self._temp_series[name].append(value)
def push_humidity(self, value):
if value is not None:
self._h_dht.append(value)
def commit(self):
self.refresh()
def toggle_view(self):
self.full_view = not self.full_view
self.refresh()
def render(self) -> str:
all_temp = list(itertools.chain.from_iterable(self._temp_series.values()))
hum_vals = self._h_dht
if not all_temp and not hum_vals:
return "\n" * 3 + " [dim]Oczekiwanie na dane...[/dim]"
lo = min(all_temp) if all_temp else 0.0
hi = max(all_temp) if all_temp else 1.0
span = hi - lo if hi != lo else 1.0
lo_h, hi_h = 0.0, 100.0
span_h = hi_h - lo_h
h = max(4, self.size.height - 4)
w = max(10, self.size.width - 11 - 9)
grid = [[None] * w for _ in range(h)]
def plot(data, markup, lo_v, span_v):
if not data:
return
series = data if self.full_view else data[-w:]
total = max(len(series), 1)
for i, val in enumerate(series):
col = int(i * w / total) if self.full_view else i
if col >= w:
continue
y = h - 1 - int((val - lo_v) / span_v * (h - 1))
grid[max(0, min(h - 1, y))][col] = markup
for name, data in self._temp_series.items():
plot(data, self._temp_marker.get(name, "[bold cyan]*[/]"), lo, span)
plot(self._h_dht, "[bold #08ff08]*[/]", lo_h, span_h)
lines = []
for ri, row in enumerate(grid):
t_label = hi - (ri / max(h - 1, 1)) * span
h_label = hi_h - (ri / max(h - 1, 1)) * span_h
cells = "".join(c if c is not None else " " for c in row)
lines.append(f"{t_label:6.1f}\u00b0\u2502{cells}\u2502[#08ff08]{h_label:5.0f}%[/#08ff08]")
lines.append(" \u2514" + "\u2500" * w + "\u2518")
mode = "[bold green]CALOSC[/bold green]" if self.full_view else "[dim]ostatnie pkt (x=calosc)[/dim]"
legend = " ".join(
f"{self._temp_marker.get(n,'*')}={n}" for n in self._temp_series
)
if self._h_dht:
legend += " [bold #08ff08]*[/]=wilgotnosc"
lines.append(f" {mode} {legend}")
return "\n".join(lines)
class LiveValue(Static):
DEFAULT_CSS = """
LiveValue {
border: solid $accent; padding: 0 1; height: 100%; width: 1fr;
content-align: center middle; text-align: center;
}
LiveValue.error { border: solid red; color: red; }
LiveValue.ok { color: $text; }
"""
def set_value(self, label, value, unit=""):
if value is None:
self.add_class("error"); self.remove_class("ok")
self.update(f"[dim]{label}[/dim]\n[red]BRAK[/red]")
else:
self.remove_class("error"); self.add_class("ok")
self.update(f"[dim]{label}[/dim]\n[bold white]{value}[/bold white] {unit}")
# ══════════════════════════════════════════════════════════════════════════
# MODEL: konfiguracja jednego czujnika 1-Wire
# ══════════════════════════════════════════════════════════════════════════
class SensorEntry:
def __init__(self, device_id, display_name, column_name):
self.device_id = device_id # np. 28-3c01...
self.display_name = display_name # np. "Piec"
self.column_name = column_name # np. "T_piec[C]"
# ══════════════════════════════════════════════════════════════════════════
# OKIENKO: DODAJ CZUJNIK (skan + przypisanie nazwy i kolumny)
# ══════════════════════════════════════════════════════════════════════════
class AddSensorModal(ModalScreen):
"""
Modal: skanuje 1-Wire, pokazuje znalezione ID, pozwala dla kazdego
zaznaczyc checkbox i wpisac nazwe + nazwe kolumny. Zwraca liste SensorEntry.
"""
CSS = """
AddSensorModal { align: center middle; }
#add-box {
width: 84; height: auto; max-height: 90%;
border: double $accent; padding: 1 2; background: $surface;
}
#scroll { height: auto; max-height: 20; }
.row {
height: auto; layout: horizontal; margin-bottom: 1;
border: solid #304060; padding: 0 1;
}
.row Checkbox { width: 30; }
.row Input { width: 1fr; margin: 0 1; }
#add-btns { height: 3; margin-top: 1; layout: horizontal; }
.hint { color: #8899aa; }
"""
def __init__(self, already_added: set[str], **kwargs):
super().__init__(**kwargs)
self.already_added = already_added
self.found: list[str] = []
self._fresh: list[str] = []
def compose(self) -> ComposeResult:
with Container(id="add-box"):
yield Label("[bold]\U0001F50D Dodaj czujnik 1-Wire[/bold]\n")
yield Label("Skanowanie magistrali 1-Wire...", id="scan-info", classes="hint")
with VerticalScroll(id="scroll"):
pass # wiersze dodamy w on_mount
with Horizontal(id="add-btns"):
yield Button("Skanuj ponownie", variant="primary", id="btn-rescan")
yield Button("Dodaj zaznaczone", variant="success", id="btn-confirm")
yield Button("Anuluj", variant="error", id="btn-cancel")
def on_mount(self):
self._do_scan()
def _do_scan(self):
if self.app.sim_mode:
self.found = self.app.sim_w1.scan()
else:
self.found = scan_w1_devices()
scroll = self.query_one("#scroll", VerticalScroll)
for child in list(scroll.children):
child.remove()
info = self.query_one("#scan-info", Label)
self._fresh = [d for d in self.found if d not in self.already_added]
if not self.found:
info.update("[red]Nie znaleziono zadnych czujnikow 1-Wire.[/red] "
"Sprawdz 1-Wire w raspi-config i polaczenia.")
return
if not self._fresh:
info.update("[yellow]Wszystkie znalezione czujniki sa juz dodane.[/yellow]")
return
info.update(f"[green]Znaleziono {len(self._fresh)} nowy(ch) czujnik(ow).[/green] "
"Zaznacz, nadaj nazwe i nazwe kolumny:")
for idx, dev in enumerate(self._fresh):
row = Horizontal(classes="row")
scroll.mount(row)
family = dev.split("-")[0]
row.mount(Checkbox(f"{dev} (rodz. {family})", id=f"chk-{idx}", value=True))
row.mount(Input(value=f"czujnik_{idx+1}", placeholder="nazwa", id=f"name-{idx}"))
row.mount(Input(value=f"T_{idx+1}[C]", placeholder="nazwa kolumny", id=f"col-{idx}"))
@on(Button.Pressed, "#btn-rescan")
def rescan(self):
self._do_scan()
@on(Button.Pressed, "#btn-cancel")
def cancel(self):
self.dismiss([])
@on(Button.Pressed, "#btn-confirm")
def confirm(self):
result: list[SensorEntry] = []
for idx, dev in enumerate(self._fresh):
try:
chk = self.query_one(f"#chk-{idx}", Checkbox)
except Exception:
continue
if not chk.value:
continue
name = self.query_one(f"#name-{idx}", Input).value.strip() or dev
col = self.query_one(f"#col-{idx}", Input).value.strip() or f"{name}[C]"
result.append(SensorEntry(dev, name, col))
self.dismiss(result)
# ══════════════════════════════════════════════════════════════════════════
# EKRAN STARTOWY
# ══════════════════════════════════════════════════════════════════════════
class SetupScreen(Screen):
CSS = """
SetupScreen { align: center middle; }
#setup-box {
width: 86; height: auto; max-height: 92%;
border: double $accent; padding: 1 2; background: $surface;
}
Input { margin-bottom: 1; }
#sensor-list {
height: auto; max-height: 12; border: solid #304060;
padding: 0 1; margin-bottom: 1;
}
.srow { height: auto; }
#row-buttons { height: 3; }
#btn-row { height: 3; margin-top: 1; }
.hint { color: #8899aa; }
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.sensors: list[SensorEntry] = []
def compose(self) -> ComposeResult:
with Container(id="setup-box"):
yield Label("[bold]\U0001F321 Monitor czujnikow 1-Wire + DHT11[/bold]\n")
yield Label("Nazwa pliku z wynikami (bez rozszerzenia):")
yield Input(placeholder="np. pomiar_001", id="inp-filename")
yield Label("Interwal zapisu do pliku (sekundy):")
yield Input(value="5", id="inp-interval")
yield Label("\n[bold cyan]Czujniki 1-Wire[/bold cyan]")
yield Label("Brak dodanych czujnikow.", id="sensor-list-info", classes="hint")
with VerticalScroll(id="sensor-list"):
pass
with Horizontal(id="row-buttons"):
yield Button("\u2795 Dodaj czujnik", variant="primary", id="btn-add-sensor")
yield Button("\U0001F5D1 Wyczysc liste", variant="warning", id="btn-clear")
yield Label("\n[bold cyan]DHT11 (opcjonalnie)[/bold cyan]")
yield Checkbox("Uzyj czujnika DHT11 (temp + wilgotnosc)", id="chk-dht", value=False)
yield Label("Pin danych DHT11 (np. D17 = GPIO17, pin 11):")
yield Input(value="D17", id="inp-dht-pin")
with Horizontal(id="btn-row"):
yield Button("Anuluj", variant="error", id="btn-cancel")
yield Button("Start \u25B6", variant="success", id="btn-start")
def _refresh_sensor_list(self):
lst = self.query_one("#sensor-list", VerticalScroll)
for child in list(lst.children):
child.remove()
info = self.query_one("#sensor-list-info", Label)
if not self.sensors:
info.update("Brak dodanych czujnikow.")
return
info.update(f"[green]Dodano {len(self.sensors)} czujnik(ow):[/green]")
for s in self.sensors:
lst.mount(Static(
f" [cyan]{s.display_name}[/cyan] "
f"[dim]{s.device_id}[/dim] \u2192 kolumna: [yellow]{s.column_name}[/yellow]",
classes="srow"
))
@on(Button.Pressed, "#btn-add-sensor")
def add_sensor(self):
already = {s.device_id for s in self.sensors}
def _got(result):
if result:
have = {s.device_id for s in self.sensors}
for entry in result:
if entry.device_id not in have:
self.sensors.append(entry)
self._refresh_sensor_list()
self.app.push_screen(AddSensorModal(already), _got)
@on(Button.Pressed, "#btn-clear")
def clear_sensors(self):
self.sensors = []
self._refresh_sensor_list()
@on(Button.Pressed, "#btn-cancel")
def cancel(self):
self.app.exit()
@on(Button.Pressed, "#btn-start")
def start(self):
filename = self.query_one("#inp-filename", Input).value.strip()
interval = self.query_one("#inp-interval", Input).value.strip()
use_dht = self.query_one("#chk-dht", Checkbox).value
dht_pin = self.query_one("#inp-dht-pin", Input).value.strip() or "D17"
if not filename:
self.notify("Podaj nazwe pliku!", severity="error"); return
if not interval.isdigit() or int(interval) < 1:
self.notify("Interwal musi byc >= 1 s", severity="error"); return
if not self.sensors and not use_dht:
self.notify("Dodaj przynajmniej jeden czujnik (1-Wire lub DHT11)!",
severity="error"); return
cfg = dict(
filename=filename + ".csv",
interval=int(interval),
sensors=self.sensors,
use_dht=use_dht,
dht_pin=dht_pin,
)
self.app.push_screen(MeasurementScreen(cfg))
# ══════════════════════════════════════════════════════════════════════════
# EKRAN POMIARU
# ══════════════════════════════════════════════════════════════════════════
MEASUREMENT_CSS = """
MeasurementScreen { layout: vertical; height: 100%; }
#header-bar {
height: 2; width: 100%; background: $accent; color: $text;
content-align: center middle; text-style: bold;
}
#mid { height: 1fr; width: 100%; layout: horizontal; }
#left-panel { width: 2fr; height: 100%; layout: vertical; }
#right-panel {
width: 1fr; height: 100%; layout: vertical;
border-left: solid $accent; padding: 0 1;
}
#log-label { height: 2; width: 100%; }
#log-area { height: 1fr; width: 100%; border: solid $accent; }
#bottom-row { height: 6; width: 100%; layout: horizontal; }
#btn-stop {
height: 100%; width: 16; background: darkred; color: white;
border: solid red; text-style: bold; content-align: center middle;
}
#btn-stop:hover { background: red; }
#status-bar {
height: 2; width: 100%; background: $surface;
border-top: solid $accent; content-align: center middle;
}
"""
class MeasurementScreen(Screen):
BINDINGS = [
Binding("q", "quit_measure", "Zakoncz"),
Binding("x", "toggle_chart_view", "Widok wykresu [x]"),
]
CSS = MEASUREMENT_CSS
elapsed = reactive(0)
def __init__(self, config, **kwargs):
super().__init__(**kwargs)
self.config = config
self._running = False
self._dht = None
self._sim_dht = None
self._save_path = None
self._point_count = 0
self._markers = {}
for i, s in enumerate(config["sensors"]):
self._markers[s.display_name] = W1_MARKERS[i % len(W1_MARKERS)]
def compose(self) -> ComposeResult:
c = self.config
n = len(c["sensors"]) + (1 if c["use_dht"] else 0)
title = f"\U0001F321 Pomiar | {c['filename']} | {n} czujnik(ow) | zapis co {c['interval']}s"
yield Static(title, id="header-bar")
with Horizontal(id="mid"):
with Vertical(id="left-panel"):
yield AsciiChart(id="chart")
with Vertical(id="right-panel"):
yield Static("[bold]\U0001F4CB Log pomiarow[/bold]", id="log-label")
yield Log(id="log-area", auto_scroll=True)
with Horizontal(id="bottom-row"):
for i, s in enumerate(c["sensors"][:3]):
yield LiveValue(id=f"lv-w1-{i}", classes="ok")
if c["use_dht"]:
yield LiveValue(id="lv-dht-t", classes="ok")
yield LiveValue(id="lv-dht-h", classes="ok")
yield LiveValue(id="lv-elapsed", classes="ok")
yield Button("\u25A0 STOP", id="btn-stop")
yield Static("", id="status-bar")
def on_mount(self):
chart = self.query_one("#chart", AsciiChart)
for s in self.config["sensors"]:
chart.register_series(s.display_name, self._markers[s.display_name])
if self.config["use_dht"]:
if self.app.sim_mode:
self._sim_dht = SimulatedDHT()
else:
try:
self._dht = DHT11Reader(self.config["dht_pin"])
self._dht.open()
except Exception as e:
self.notify(f"DHT11: {e}", severity="warning", timeout=6)
self._init_file()
self._running = True
self.set_interval(self.config["interval"], self._measure)
self.set_interval(1, self._tick)
self.set_interval(1, self._refresh_ui)
def _init_file(self):
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
self._save_path = RESULTS_DIR / self.config["filename"]
if self._save_path.exists():
ts = datetime.now().strftime("%H%M%S")
self._save_path = RESULTS_DIR / f"{self._save_path.stem}_{ts}.csv"
cols = ["Data", "Godzina"]
cols += [s.column_name for s in self.config["sensors"]]
if self.config["use_dht"]:
cols += ["DHT11_temp[C]", "DHT11_wilgotnosc[%]"]
with open(self._save_path, "w") as f:
f.write(SYSTEM_HEADER + "\n")
f.write(f"# Plik: {self._save_path.stem}\n")
f.write(f"# Interwal: {self.config['interval']} s\n")
for s in self.config["sensors"]:
f.write(f"# Czujnik: {s.display_name} = {s.device_id} -> {s.column_name}\n")
f.write("# " + ",".join(cols) + "\n")
def _read_w1(self, device_id):
if self.app.sim_mode:
return self.app.sim_w1.read(device_id)
return read_w1_temp(device_id)
def _read_dht(self):
if self._sim_dht:
return self._sim_dht.read()
if self._dht:
return self._dht.read()
return None, None
async def _refresh_ui(self):
if not self._running:
return
chart = self.query_one("#chart", AsciiChart)
for i, s in enumerate(self.config["sensors"]):
val = self._read_w1(s.device_id)
chart.push_temp(s.display_name, val)
if i < 3:
self.query_one(f"#lv-w1-{i}", LiveValue).set_value(
s.display_name, f"{val:.2f}" if val is not None else None, "\u00b0C"
)
if self.config["use_dht"]:
t, h = self._read_dht()
chart.push_humidity(h)
self.query_one("#lv-dht-t", LiveValue).set_value("DHT temp", f"{t:.1f}" if t is not None else None, "\u00b0C")
self.query_one("#lv-dht-h", LiveValue).set_value("DHT wilg.", f"{h:.1f}" if h is not None else None, "%")
self.query_one("#lv-elapsed", LiveValue).set_value("Czas", self._fmt_elapsed(), "")
chart.commit()
async def _measure(self):
if not self._running:
return
now = datetime.now()
row = [now.strftime("%Y-%m-%d"), now.strftime("%H:%M:%S")]
log_bits = []
for s in self.config["sensors"]:
val = self._read_w1(s.device_id)
row.append(f"{val:.3f}" if val is not None else "")
log_bits.append(f"{s.display_name}:{('%.2f' % val) if val is not None else '--'}")
if self.config["use_dht"]:
t, h = self._read_dht()
row.append(f"{t:.1f}" if t is not None else "")
row.append(f"{h:.1f}" if h is not None else "")
log_bits.append(f"DHT:{('%.1f' % t) if t is not None else '--'}/{('%.1f' % h) if h is not None else '--'}%")
with open(self._save_path, "a") as f:
f.write(",".join(row) + "\n")
self._point_count += 1
self.query_one("#log-area", Log).write_line(
f"[{now:%H:%M:%S}] " + " ".join(log_bits)
)
self.query_one("#status-bar", Static).update(
f"Plik: {self._save_path.name} \u2502 Punktow: {self._point_count} \u2502 Czas: {self._fmt_elapsed()}"
)
def action_toggle_chart_view(self):
self.query_one("#chart", AsciiChart).toggle_view()
@on(Button.Pressed, "#btn-stop")
def on_stop_button(self):
self._stop()
def action_quit_measure(self):
self._stop()
def _stop(self):
if not self._running:
return
self._running = False
if self._dht:
self._dht.close()
self.app.push_screen(SummaryScreen(self._save_path, self._point_count))
def _tick(self):
self.elapsed += 1
def _fmt_elapsed(self):
e = self.elapsed
return f"{e//3600:02d}:{(e%3600)//60:02d}:{e%60:02d}"
class SummaryScreen(Screen):
CSS = """
SummaryScreen { align: center middle; }
#sum-box {
width: 70; height: auto; border: double green;
padding: 1 2; background: $surface;
}
"""
def __init__(self, save_path, point_count, **kwargs):
super().__init__(**kwargs)
self.save_path = save_path
self.point_count = point_count
def compose(self) -> ComposeResult:
with Container(id="sum-box"):
yield Label("[bold green]\u2713 Pomiar zakonczony[/bold green]\n")
yield Label(f"Plik: [bold]{self.save_path}[/bold]")
yield Label(f"Punktow: [bold]{self.point_count}[/bold]")
yield Label("")
yield Button("Nowy pomiar", variant="primary", id="btn-new")
yield Button("Wyjscie", variant="error", id="btn-exit")
@on(Button.Pressed, "#btn-new")
def new_measurement(self):
self.app.pop_screen()
@on(Button.Pressed, "#btn-exit")
def exit_app(self):
self.app.exit()
# ══════════════════════════════════════════════════════════════════════════
# APLIKACJA
# ══════════════════════════════════════════════════════════════════════════
class SensorApp(App):
CSS = """
App { background: #0a0f1a; }
Button { margin: 0 1; }
Input { background: #1a2740; border: solid #304060; color: #a0c4ff; }
Input:focus { border: solid #0984e3; }
Label { color: #8899aa; }
Checkbox { color: #a0c4ff; }
"""
TITLE = "Monitor czujnikow 1-Wire + DHT11"
def __init__(self, sim_mode=False, **kwargs):
super().__init__(**kwargs)
self.sim_mode = sim_mode
self.sim_w1 = SimulatedW1() if sim_mode else None
def on_mount(self):
self.push_screen(SetupScreen())
if __name__ == "__main__":
sim = "--sim" in sys.argv
SensorApp(sim_mode=sim).run()
T_piec[C]). Kliknij Dodaj zaznaczone.D17).~/czujniki/wyniki/ — otworzysz je w Excelu albo Arkuszach Google.
--sim generuje 3 wirtualne czujniki i fejkowy DHT, żebyś przeklikał cały
interfejs w pociągu, bez Raspberry Pi w plecaku. Idealny do nauki obsługi, zanim dotkniesz kabli.
| Objaw | Najczęstsza przyczyna i ratunek |
|---|---|
| Skan nic nie znajduje | 1-Wire niewłączone (raspi-config) lub brak restartu. Sprawdź też rezystor 4,7 kΩ i czy DATA siedzi na GPIO 4. |
| „board not found" / brak modułu | Nie włączone środowisko. Wpisz source .venv/bin/activate. Brak (.venv) = brak bibliotek. |
| DHT11 ciągle „BRAK" | Sprawdź pin (GPIO 17), zasilanie 5V i python3-lgpio. Pojedyncze błędy są normalne — program próbuje dalej. |
| Program mówi o czujniku, którego nie ma | Czujnik „zniknął" z magistrali (luźny styk). Kliknij Skanuj ponownie w okienku dodawania. |
| Interfejs się rozjeżdża | Powiększ okno terminala. TUI lubi przestrzeń, jak kot. |
| Dwa czujniki ten sam wykres, nie wiadomo który | To cel — każdy ma inny kolor markera, opisany w legendzie pod wykresem. |