Select Git revision
crud.py 2.96 KiB
"""
Module to interact with the database
"""
from datetime import date, datetime, time, timedelta
from numpy import average
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
from db import models, schemas
def get_records(place: str, db: Session):
""" Get all the records for the given place """
records = db.query(models.Records).filter(models.Records.place == place).order_by(models.Records.date.desc()).all()
return records
def create_record(new_record: schemas.RecordBase, db: Session):
""" Add a new record to the database """
db_record = models.Records(**new_record.dict())
db.add(db_record)
db.commit()
db.refresh(db_record)
return db_record
def get_waiting_time(place: str, db: Session):
""" Get the last estimated waiting time for the given place """
db_record = db.query(models.Records).filter(models.Records.place == place).order_by(models.Records.date.desc()).first()
return db_record.waiting_time
def get_stats(place: str, weekday: int, min_time_hour: int, min_time_mn: int, max_time_hour: int, max_time_mn: int, interval: timedelta, db: Session):
""" Get the average waiting time for each interval between two time steps """
def shift_time(t: time, delta: timedelta):
return (datetime.combine(date(1, 1, 1), t) + delta).time()
def avg_time_query(start_time, end_time):
records = db.query(
(func.round(
func.avg(
3600 * func.extract('HOUR', models.Records.waiting_time) +
60 * func.extract('MINUTE', models.Records.waiting_time) +
func.extract('SECOND', models.Records.waiting_time))
)) / 60
).filter(
models.Records.place == place,
func.weekday(models.Records.date) == weekday,
(func.extract('HOUR', models.Records.date) > start_time.hour) |
((func.extract('HOUR', models.Records.date) == start_time.hour) &
(func.extract('MINUTE', models.Records.date) >= start_time.minute)),
(func.extract('HOUR', models.Records.date) < end_time.hour) |
((func.extract('HOUR', models.Records.date) == end_time.hour) &
(func.extract('MINUTE', models.Records.date) < end_time.minute)),
).one()
if records[0]:
return int(records[0])
return None
def add_slot(slots_list, start_time, end_time):
average_waiting_time = avg_time_query(start_time, end_time)
if average_waiting_time:
name = f'{start_time.hour:02}h{start_time.minute:02}'
slots_list.append({'name': name, 'time': average_waiting_time})
min_time, max_time = time(min_time_hour, min_time_mn), time(max_time_hour, max_time_mn)
stats = []
start_time, end_time = min_time, shift_time(min_time, interval)
while start_time < max_time:
add_slot(stats, start_time, end_time)
start_time, end_time = end_time, shift_time(end_time, interval)
return stats