Initial commit: MVP tidtakingssystem

- Backend: FastAPI, EXIF-parser, EasyOCR, SQLite
- Frontend: React admin (startliste, passeringer, gjennomgang, resultater)
- Docker: docker-compose med depot/processed/data-volumer

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-20 15:01:33 +01:00
commit 330ba7a93d
35 changed files with 5038 additions and 0 deletions
+20
View File
@@ -0,0 +1,20 @@
FROM python:3.12-slim
WORKDIR /app
# Systemavhengigheter for OpenCV og EasyOCR
RUN apt-get update && apt-get install -y --no-install-recommends \
libglib2.0-0 \
libgl1 \
libgomp1 \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Kataloger monteres som volumer fra docker-compose
RUN mkdir -p /depot /processed /data
CMD ["uvicorn", "api:app", "--host", "0.0.0.0", "--port", "8000"]
+177
View File
@@ -0,0 +1,177 @@
"""
FastAPI REST API for timing-systemet.
"""
import asyncio
import logging
from contextlib import asynccontextmanager
from pathlib import Path
import aiosqlite
from fastapi import Depends, FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import Optional
from ingest import watch_depot, process_existing
from passage_log import delete_passage, get_passages, log_passage, resolve_passage
from profile_db import (
clear_startlist,
delete_athlete,
get_db,
import_startlist_csv,
list_athletes,
upsert_athlete,
)
from results import get_results
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
# Behandle bilder som lå i depot fra før oppstart
await process_existing()
# Start fil-overvåkning i bakgrunnen
asyncio.create_task(watch_depot())
yield
app = FastAPI(title="Timing API", version="0.1.0", lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# --- DB-avhengighet ---
async def get_connection():
db = await get_db()
try:
yield db
finally:
await db.close()
# =====================
# Startliste-endepunkter
# =====================
@app.get("/api/athletes")
async def list_athletes_endpoint(db=Depends(get_connection)):
return await list_athletes(db)
@app.post("/api/athletes/import", summary="Last opp startliste som CSV")
async def import_csv(
file: UploadFile = File(...),
db=Depends(get_connection),
):
if not file.filename.endswith(".csv"):
raise HTTPException(400, "Kun CSV-filer støttes")
content = (await file.read()).decode("utf-8-sig") # utf-8-sig håndterer BOM
result = await import_startlist_csv(db, content)
return result
@app.delete("/api/athletes/all")
async def clear_all_athletes(db=Depends(get_connection)):
await clear_startlist(db)
return {"ok": True}
@app.delete("/api/athletes/{profile_id}")
async def remove_athlete(profile_id: str, db=Depends(get_connection)):
ok = await delete_athlete(db, profile_id)
if not ok:
raise HTTPException(404, "Utøver ikke funnet")
return {"ok": True}
# =====================
# Passeringer
# =====================
@app.get("/api/passages")
async def list_passages(
station: Optional[str] = None,
needs_review: Optional[bool] = None,
profile_id: Optional[str] = None,
db=Depends(get_connection),
):
return await get_passages(db, profile_id=profile_id, station=station, needs_review=needs_review)
@app.get("/api/passages/review", summary="Hent passeringer som trenger manuell gjennomgang")
async def review_queue(db=Depends(get_connection)):
return await get_passages(db, needs_review=True)
class ResolveRequest(BaseModel):
profile_id: Optional[str] = None
bib_number: Optional[str] = None
review_note: Optional[str] = None
@app.post("/api/passages/{passage_id}/resolve")
async def resolve(passage_id: str, body: ResolveRequest, db=Depends(get_connection)):
ok = await resolve_passage(
db,
passage_id,
profile_id=body.profile_id,
bib_number=body.bib_number,
review_note=body.review_note,
)
if not ok:
raise HTTPException(404, "Passering ikke funnet")
return {"ok": True}
@app.delete("/api/passages/{passage_id}")
async def remove_passage(passage_id: str, db=Depends(get_connection)):
ok = await delete_passage(db, passage_id)
if not ok:
raise HTTPException(404, "Passering ikke funnet")
return {"ok": True}
# =====================
# Resultater
# =====================
@app.get("/api/results")
async def get_results_endpoint(db=Depends(get_connection)):
return await get_results(db)
# =====================
# Bilder
# =====================
@app.get("/api/images/{path:path}")
async def serve_image(path: str):
"""Lever behandlede bilder til frontend."""
full_path = Path("/processed") / path
if not full_path.exists() or not full_path.is_file():
raise HTTPException(404, "Bilde ikke funnet")
# Sjekk at stien er innenfor /processed (path traversal)
try:
full_path.resolve().relative_to(Path("/processed").resolve())
except ValueError:
raise HTTPException(403, "Forbudt")
return FileResponse(full_path)
# =====================
# Helse
# =====================
@app.get("/api/health")
async def health():
return {"status": "ok"}
+156
View File
@@ -0,0 +1,156 @@
"""
Parse og normaliser EXIF-metadata fra bilder.
Returnerer et strukturert dict eller kaster ExifError ved manglende/ugyldig data.
"""
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import piexif
from PIL import Image
class ExifError(ValueError):
pass
@dataclass
class ImageMetadata:
timestamp_utc: datetime
gps_lat: float
gps_lon: float
gps_alt: Optional[float]
station: Optional[str] # fra ImageDescription eller CameraLabel
def _rational_to_float(rational) -> float:
"""Konverter EXIF rational (teller, nevner) til float."""
if isinstance(rational, tuple):
num, den = rational
if den == 0:
return 0.0
return num / den
return float(rational)
def _parse_gps_coord(dms_rationals, ref: str) -> float:
"""Konverter GPS DMS rationals + referanse til desimalgrader."""
deg = _rational_to_float(dms_rationals[0])
min_ = _rational_to_float(dms_rationals[1])
sec = _rational_to_float(dms_rationals[2])
value = deg + min_ / 60.0 + sec / 3600.0
if ref in ("S", "W"):
value = -value
return value
def _parse_timestamp(exif_dict: dict) -> datetime:
"""
Hent tidsstempel fra DateTimeOriginal + SubSecTimeOriginal.
Returnerer datetime i UTC. Kaster ExifError hvis manglende.
"""
ifd0 = exif_dict.get("Exif", {})
raw_dt = ifd0.get(piexif.ExifIFD.DateTimeOriginal)
if not raw_dt:
raise ExifError("Mangler DateTimeOriginal i EXIF")
if isinstance(raw_dt, bytes):
raw_dt = raw_dt.decode("ascii", errors="replace")
# Format: "2024:06:15 10:23:45"
try:
dt = datetime.strptime(raw_dt.strip(), "%Y:%m:%d %H:%M:%S")
except ValueError as e:
raise ExifError(f"Ugyldig DateTimeOriginal-format: {raw_dt!r}") from e
# Sub-sekunder
raw_sub = ifd0.get(piexif.ExifIFD.SubSecTimeOriginal)
microseconds = 0
if raw_sub:
if isinstance(raw_sub, bytes):
raw_sub = raw_sub.decode("ascii", errors="replace")
sub_str = raw_sub.strip()
if sub_str.isdigit():
# Pad/truncate til 6 siffer (mikrosekunder)
sub_str = (sub_str + "000000")[:6]
microseconds = int(sub_str)
dt = dt.replace(microsecond=microseconds, tzinfo=timezone.utc)
return dt
def _parse_gps(exif_dict: dict) -> tuple[float, float, Optional[float]]:
"""
Hent GPS-koordinater. Kaster ExifError hvis lat/lon mangler.
"""
gps = exif_dict.get("GPS", {})
lat_dms = gps.get(piexif.GPSIFD.GPSLatitude)
lat_ref = gps.get(piexif.GPSIFD.GPSLatitudeRef)
lon_dms = gps.get(piexif.GPSIFD.GPSLongitude)
lon_ref = gps.get(piexif.GPSIFD.GPSLongitudeRef)
if not (lat_dms and lat_ref and lon_dms and lon_ref):
raise ExifError("Mangler GPS-koordinater (lat/lon) i EXIF")
if isinstance(lat_ref, bytes):
lat_ref = lat_ref.decode("ascii")
if isinstance(lon_ref, bytes):
lon_ref = lon_ref.decode("ascii")
lat = _parse_gps_coord(lat_dms, lat_ref)
lon = _parse_gps_coord(lon_dms, lon_ref)
alt = None
alt_raw = gps.get(piexif.GPSIFD.GPSAltitude)
if alt_raw:
alt = _rational_to_float(alt_raw)
alt_ref = gps.get(piexif.GPSIFD.GPSAltitudeRef, 0)
if alt_ref == 1:
alt = -alt
return lat, lon, alt
def _parse_station(exif_dict: dict) -> Optional[str]:
"""Hent stasjonsnavn fra ImageDescription."""
ifd0 = exif_dict.get("0th", {})
desc = ifd0.get(piexif.ImageIFD.ImageDescription)
if desc:
if isinstance(desc, bytes):
desc = desc.decode("utf-8", errors="replace")
return desc.strip() or None
return None
def parse_image(path: Path) -> ImageMetadata:
"""
Les EXIF fra bildefil og returner ImageMetadata.
Kaster ExifError hvis påkrevde felt mangler.
"""
try:
img = Image.open(path)
raw_exif = img.info.get("exif")
if not raw_exif:
raise ExifError("Bildet har ingen EXIF-data")
exif_dict = piexif.load(raw_exif)
except ExifError:
raise
except Exception as e:
raise ExifError(f"Kunne ikke lese EXIF: {e}") from e
timestamp = _parse_timestamp(exif_dict)
lat, lon, alt = _parse_gps(exif_dict)
station = _parse_station(exif_dict)
return ImageMetadata(
timestamp_utc=timestamp,
gps_lat=lat,
gps_lon=lon,
gps_alt=alt,
station=station,
)
+170
View File
@@ -0,0 +1,170 @@
"""
Bildehåndtering:
- Overvåk depot/-katalogen for nye bilder
- Valider EXIF
- Kjør OCR
- Flytt til processed/ med unikt filnavn
- Logg passering til DB
Kan kjøres som egen prosess (python ingest.py) eller importeres av API.
"""
import asyncio
import logging
import shutil
import uuid
from pathlib import Path
import aiosqlite
from watchdog.events import FileSystemEventHandler, FileCreatedEvent
from watchdog.observers import Observer
from exif_parser import ExifError, parse_image
from ocr import read_bib
from passage_log import log_passage
from profile_db import get_athlete_by_bib, init_db
logger = logging.getLogger(__name__)
DEPOT_DIR = Path("/depot")
PROCESSED_DIR = Path("/processed")
REJECTED_DIR = DEPOT_DIR / "rejected"
DB_PATH = "/data/timing.db"
# Konfidens-terskel for automatisk logging
MIN_AUTO_CONFIDENCE = 0.75
VALID_SUFFIXES = {".jpg", ".jpeg", ".png"}
def _destination_path(source: Path, timestamp) -> Path:
"""
Bygg destinasjonssti: processed/<år>/<måned>/<uuid>_<originalfilnavn>
"""
year = timestamp.strftime("%Y")
month = timestamp.strftime("%m")
unique_name = f"{uuid.uuid4().hex}_{source.name}"
dest = PROCESSED_DIR / year / month / unique_name
dest.parent.mkdir(parents=True, exist_ok=True)
return dest
async def process_image(path: Path) -> None:
"""
Behandle ett bilde: valider EXIF, kjør OCR, flytt fil, logg passering.
"""
if path.suffix.lower() not in VALID_SUFFIXES:
logger.debug("Ignorerer ikke-bilde: %s", path)
return
logger.info("Behandler: %s", path.name)
# --- EXIF-validering ---
try:
meta = parse_image(path)
except ExifError as e:
logger.warning("Ugyldig EXIF i %s: %s — avviser", path.name, e)
REJECTED_DIR.mkdir(parents=True, exist_ok=True)
shutil.move(str(path), str(REJECTED_DIR / path.name))
return
# --- OCR ---
ocr = read_bib(path)
logger.debug("OCR: digits=%s conf=%.2f", ocr.digits, ocr.confidence)
# --- Flytt til processed/ ---
dest = _destination_path(path, meta.timestamp_utc)
shutil.move(str(path), str(dest))
logger.info("Flyttet til: %s", dest)
# --- Bestem konfidens og review-flagg ---
confidence = ocr.confidence
needs_review = False
review_note = None
id_method = "bib_ocr"
if ocr.digits is None or confidence < MIN_AUTO_CONFIDENCE:
needs_review = True
review_note = "number_unreadable" if ocr.digits is None else "low_confidence"
id_method = "bib_ocr_uncertain"
# --- Koble mot profil-DB ---
profile_id = None
bib_number = ocr.digits
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
await init_db(db)
if bib_number and not needs_review:
athlete = await get_athlete_by_bib(db, bib_number)
if athlete:
profile_id = athlete["profile_id"]
else:
logger.debug("Ukjent startnummer: %s", bib_number)
await log_passage(
db,
profile_id=profile_id,
bib_number=bib_number,
station=meta.station or "unknown",
timestamp_utc=meta.timestamp_utc,
gps_lat=meta.gps_lat,
gps_lon=meta.gps_lon,
gps_alt=meta.gps_alt,
confidence=confidence,
id_method=id_method,
source_image=str(dest),
needs_review=needs_review,
review_note=review_note,
)
logger.info(
"Passering logget: bib=%s station=%s needs_review=%s",
bib_number, meta.station, needs_review,
)
async def process_existing() -> None:
"""Behandle bilder som allerede ligger i depot/ ved oppstart."""
for path in sorted(DEPOT_DIR.glob("*")):
if path.is_file() and path.suffix.lower() in VALID_SUFFIXES:
await process_image(path)
class DepotHandler(FileSystemEventHandler):
"""Watchdog-handler: kaller process_image ved nye filer."""
def __init__(self, loop: asyncio.AbstractEventLoop):
self._loop = loop
def on_created(self, event: FileCreatedEvent):
if not event.is_directory:
path = Path(event.src_path)
asyncio.run_coroutine_threadsafe(process_image(path), self._loop)
async def watch_depot() -> None:
"""Start filsystem-overvåkning av depot/."""
loop = asyncio.get_running_loop()
handler = DepotHandler(loop)
observer = Observer()
observer.schedule(handler, str(DEPOT_DIR), recursive=False)
observer.start()
logger.info("Overvåker depot: %s", DEPOT_DIR)
try:
while True:
await asyncio.sleep(1)
finally:
observer.stop()
observer.join()
async def main() -> None:
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
await process_existing()
await watch_depot()
if __name__ == "__main__":
asyncio.run(main())
+108
View File
@@ -0,0 +1,108 @@
"""
Startnummer-deteksjon og OCR.
Bruker EasyOCR for å lese sifre fra bib-nummerlapp.
Returnerer en OcrResult med tall, konfidens og partial-flagg.
"""
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
import cv2
import numpy as np
# EasyOCR lastes lazy for å unngå lang oppstartstid
_reader = None
def _get_reader():
global _reader
if _reader is None:
import easyocr
_reader = easyocr.Reader(["en"], gpu=False, verbose=False)
return _reader
@dataclass
class OcrResult:
digits: Optional[str] # Gjenkjente sifre, f.eks. "42", None hvis ingen
confidence: float # 0.01.0
partial: bool # True hvis nummeret trolig er delvis skjult
raw_texts: list[str] = field(default_factory=list) # Alle OCR-treff for debug
def _preprocess(image_path: Path) -> np.ndarray:
"""Forbered bilde for OCR: gråskala, kontrast, skarphet."""
img = cv2.imread(str(image_path))
if img is None:
raise ValueError(f"Kunne ikke lese bilde: {image_path}")
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# CLAHE for bedre kontrast i varierende lys
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
enhanced = clahe.apply(gray)
# Skaler opp hvis bildet er lite (OCR er bedre på større tekst)
h, w = enhanced.shape
if max(h, w) < 800:
scale = 800 / max(h, w)
enhanced = cv2.resize(
enhanced, (int(w * scale), int(h * scale)),
interpolation=cv2.INTER_CUBIC,
)
return enhanced
def _extract_bib_number(texts: list[tuple]) -> tuple[Optional[str], float, bool]:
"""
Finn beste siffersekvens blant OCR-treff.
Returnerer (sifre, konfidens, partial).
"""
candidates = []
for (_, text, conf) in texts:
# Behold kun sifre
digits = re.sub(r"[^0-9]", "", text)
if digits:
candidates.append((digits, float(conf)))
if not candidates:
return None, 0.0, False
# Velg kandidat med høyest konfidens
best_digits, best_conf = max(candidates, key=lambda x: x[1])
# Heuristikk: 12 sifre kan tyde på delvis synlig nummer
partial = len(best_digits) < 2
return best_digits, best_conf, partial
def read_bib(image_path: Path) -> OcrResult:
"""
Les startnummer fra bildet.
Returnerer OcrResult. Aldri exception — fallback til konfidens 0 ved feil.
"""
try:
processed = _preprocess(image_path)
reader = _get_reader()
results = reader.readtext(processed, detail=1, paragraph=False)
raw_texts = [text for (_, text, _) in results]
digits, confidence, partial = _extract_bib_number(results)
return OcrResult(
digits=digits,
confidence=confidence,
partial=partial,
raw_texts=raw_texts,
)
except Exception as e:
return OcrResult(
digits=None,
confidence=0.0,
partial=False,
raw_texts=[f"ERROR: {e}"],
)
+119
View File
@@ -0,0 +1,119 @@
"""
Skriv og query passeringslogg i SQLite.
"""
import uuid
from datetime import datetime
from typing import Optional
import aiosqlite
async def log_passage(
db: aiosqlite.Connection,
*,
profile_id: Optional[str],
bib_number: Optional[str],
station: str,
timestamp_utc: datetime,
gps_lat: float,
gps_lon: float,
gps_alt: Optional[float],
confidence: float,
id_method: str,
source_image: str,
needs_review: bool = False,
review_note: Optional[str] = None,
) -> str:
"""Logg én passering. Returnerer passage_id."""
passage_id = str(uuid.uuid4())
await db.execute(
"""
INSERT INTO passages (
passage_id, profile_id, bib_number, station,
timestamp_utc, gps_lat, gps_lon, gps_alt,
confidence, id_method, source_image,
needs_review, review_note
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
passage_id,
profile_id,
bib_number,
station,
timestamp_utc.isoformat(),
gps_lat,
gps_lon,
gps_alt,
confidence,
id_method,
source_image,
int(needs_review),
review_note,
),
)
await db.commit()
return passage_id
async def get_passages(
db: aiosqlite.Connection,
profile_id: Optional[str] = None,
station: Optional[str] = None,
needs_review: Optional[bool] = None,
) -> list[dict]:
"""Hent passeringer med valgfrie filtre."""
clauses = []
params = []
if profile_id is not None:
clauses.append("p.profile_id = ?")
params.append(profile_id)
if station is not None:
clauses.append("p.station = ?")
params.append(station)
if needs_review is not None:
clauses.append("p.needs_review = ?")
params.append(int(needs_review))
where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
query = f"""
SELECT p.*, a.name, a.club
FROM passages p
LEFT JOIN athletes a ON a.profile_id = p.profile_id
{where}
ORDER BY p.timestamp_utc
"""
async with db.execute(query, params) as cur:
rows = await cur.fetchall()
return [dict(r) for r in rows]
async def resolve_passage(
db: aiosqlite.Connection,
passage_id: str,
profile_id: Optional[str],
bib_number: Optional[str],
review_note: Optional[str] = None,
) -> bool:
"""Manuell oppdatering av en passering etter gjennomgang."""
cur = await db.execute(
"""
UPDATE passages
SET profile_id = ?, bib_number = ?, needs_review = 0,
review_note = ?, id_method = 'manual'
WHERE passage_id = ?
""",
(profile_id, bib_number, review_note, passage_id),
)
await db.commit()
return cur.rowcount > 0
async def delete_passage(db: aiosqlite.Connection, passage_id: str) -> bool:
cur = await db.execute(
"DELETE FROM passages WHERE passage_id = ?", (passage_id,)
)
await db.commit()
return cur.rowcount > 0
+151
View File
@@ -0,0 +1,151 @@
"""
CRUD for utøverprofiler og startliste.
Bruker aiosqlite for async tilgang fra FastAPI.
"""
import csv
import io
import uuid
from typing import Optional
import aiosqlite
DB_PATH = "/data/timing.db"
async def init_db(db: aiosqlite.Connection) -> None:
await db.executescript("""
CREATE TABLE IF NOT EXISTS athletes (
profile_id TEXT PRIMARY KEY,
bib_number TEXT UNIQUE,
name TEXT,
club TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS passages (
passage_id TEXT PRIMARY KEY,
profile_id TEXT REFERENCES athletes(profile_id),
bib_number TEXT,
station TEXT NOT NULL,
timestamp_utc TEXT NOT NULL,
gps_lat REAL,
gps_lon REAL,
gps_alt REAL,
confidence REAL,
id_method TEXT,
source_image TEXT,
needs_review INTEGER NOT NULL DEFAULT 0,
review_note TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_passages_profile ON passages(profile_id);
CREATE INDEX IF NOT EXISTS idx_passages_station ON passages(station);
CREATE INDEX IF NOT EXISTS idx_passages_needs_review ON passages(needs_review);
""")
await db.commit()
async def get_db() -> aiosqlite.Connection:
db = await aiosqlite.connect(DB_PATH)
db.row_factory = aiosqlite.Row
await init_db(db)
return db
# --- Utøverprofiler ---
async def upsert_athlete(
db: aiosqlite.Connection,
bib_number: str,
name: str,
club: Optional[str] = None,
) -> str:
"""Opprett eller oppdater utøver basert på startnummer. Returnerer profile_id."""
async with db.execute(
"SELECT profile_id FROM athletes WHERE bib_number = ?", (bib_number,)
) as cur:
row = await cur.fetchone()
if row:
profile_id = row["profile_id"]
await db.execute(
"UPDATE athletes SET name = ?, club = ? WHERE profile_id = ?",
(name, club, profile_id),
)
else:
profile_id = str(uuid.uuid4())
await db.execute(
"INSERT INTO athletes (profile_id, bib_number, name, club) VALUES (?, ?, ?, ?)",
(profile_id, bib_number, name, club),
)
await db.commit()
return profile_id
async def import_startlist_csv(db: aiosqlite.Connection, csv_content: str) -> dict:
"""
Importer startliste fra CSV-streng.
Forventet kolonner: bib_number, name (og valgfritt: club)
Returnerer {'imported': N, 'errors': [...]}
"""
reader = csv.DictReader(io.StringIO(csv_content))
imported = 0
errors = []
# Normaliser kolonnenavn (fjern whitespace, lowercase)
fieldnames = [f.strip().lower() for f in (reader.fieldnames or [])]
if "bib_number" not in fieldnames and "bib" not in fieldnames:
return {"imported": 0, "errors": ["CSV mangler 'bib_number'-kolonne"]}
if "name" not in fieldnames:
return {"imported": 0, "errors": ["CSV mangler 'name'-kolonne"]}
bib_col = "bib_number" if "bib_number" in fieldnames else "bib"
for i, row in enumerate(reader, start=2):
# Normaliser nøkler
row = {k.strip().lower(): v.strip() for k, v in row.items()}
bib = row.get(bib_col, "").strip()
name = row.get("name", "").strip()
club = row.get("club", "").strip() or None
if not bib or not name:
errors.append(f"Rad {i}: mangler bib eller navn")
continue
try:
await upsert_athlete(db, bib, name, club)
imported += 1
except Exception as e:
errors.append(f"Rad {i}: {e}")
return {"imported": imported, "errors": errors}
async def get_athlete_by_bib(db: aiosqlite.Connection, bib: str) -> Optional[dict]:
async with db.execute(
"SELECT * FROM athletes WHERE bib_number = ?", (bib,)
) as cur:
row = await cur.fetchone()
return dict(row) if row else None
async def list_athletes(db: aiosqlite.Connection) -> list[dict]:
async with db.execute(
"SELECT * FROM athletes ORDER BY CAST(bib_number AS INTEGER), bib_number"
) as cur:
rows = await cur.fetchall()
return [dict(r) for r in rows]
async def delete_athlete(db: aiosqlite.Connection, profile_id: str) -> bool:
cur = await db.execute(
"DELETE FROM athletes WHERE profile_id = ?", (profile_id,)
)
await db.commit()
return cur.rowcount > 0
async def clear_startlist(db: aiosqlite.Connection) -> None:
await db.execute("DELETE FROM athletes")
await db.commit()
+9
View File
@@ -0,0 +1,9 @@
fastapi==0.115.6
uvicorn[standard]==0.34.0
pillow==11.1.0
piexif==1.1.3
easyocr==1.7.2
opencv-python-headless==4.11.0.86
python-multipart==0.0.20
watchdog==6.0.0
aiosqlite==0.21.0
+125
View File
@@ -0,0 +1,125 @@
"""
Beregn split-tider og totalresultat fra passeringsloggen.
"""
from datetime import datetime
from typing import Optional
import aiosqlite
async def get_results(db: aiosqlite.Connection) -> list[dict]:
"""
Hent totalresultat for alle utøvere som har passert start og mål.
Returnerer sortert liste med split-tider.
"""
# Hent alle bekreftede passeringer gruppert per utøver
async with db.execute("""
SELECT p.profile_id, p.bib_number, a.name, a.club,
p.station, p.timestamp_utc
FROM passages p
LEFT JOIN athletes a ON a.profile_id = p.profile_id
WHERE p.needs_review = 0 AND p.profile_id IS NOT NULL
ORDER BY p.profile_id, p.timestamp_utc
""") as cur:
rows = await cur.fetchall()
# Grupper per utøver
athletes: dict[str, dict] = {}
for row in rows:
pid = row["profile_id"]
if pid not in athletes:
athletes[pid] = {
"profile_id": pid,
"bib_number": row["bib_number"],
"name": row["name"],
"club": row["club"],
"passages": [],
}
athletes[pid]["passages"].append({
"station": row["station"],
"timestamp_utc": row["timestamp_utc"],
})
results = []
for pid, data in athletes.items():
passages = data["passages"]
if not passages:
continue
# Finn start og mål
start_p = next((p for p in passages if p["station"] == "start"), None)
finish_p = next((p for p in passages if p["station"] == "finish"), None)
start_time = _parse_ts(start_p["timestamp_utc"]) if start_p else None
finish_time = _parse_ts(finish_p["timestamp_utc"]) if finish_p else None
total_seconds = None
if start_time and finish_time:
total_seconds = (finish_time - start_time).total_seconds()
# Split-tider mellom alle stasjoner
splits = []
prev_ts = start_time
prev_station = "start"
for p in passages:
if p["station"] == "start":
continue
ts = _parse_ts(p["timestamp_utc"])
split_s = (ts - prev_ts).total_seconds() if prev_ts else None
splits.append({
"from": prev_station,
"to": p["station"],
"split_seconds": split_s,
"split_formatted": _fmt_seconds(split_s),
"timestamp_utc": p["timestamp_utc"],
})
prev_ts = ts
prev_station = p["station"]
results.append({
"profile_id": pid,
"bib_number": data["bib_number"],
"name": data["name"] or f"Ukjent ({data['bib_number']})",
"club": data["club"],
"start_time": start_p["timestamp_utc"] if start_p else None,
"finish_time": finish_p["timestamp_utc"] if finish_p else None,
"total_seconds": total_seconds,
"total_formatted": _fmt_seconds(total_seconds),
"splits": splits,
"dnf": finish_time is None and start_time is not None,
"dns": start_time is None,
})
# Sorter: fullført (etter tid), DNF, DNS
results.sort(key=_sort_key)
return results
def _parse_ts(ts_str: str) -> Optional[datetime]:
if not ts_str:
return None
try:
return datetime.fromisoformat(ts_str)
except ValueError:
return None
def _fmt_seconds(seconds: Optional[float]) -> Optional[str]:
if seconds is None:
return None
seconds = int(seconds)
h = seconds // 3600
m = (seconds % 3600) // 60
s = seconds % 60
if h:
return f"{h}:{m:02d}:{s:02d}"
return f"{m}:{s:02d}"
def _sort_key(r: dict):
if r["dns"]:
return (3, 0)
if r["dnf"]:
return (2, 0)
return (1, r["total_seconds"] or float("inf"))