Select Git revision
This project manages its dependencies using Yarn.
Learn more
crud.py 13.79 KiB
"""
Module to interact with the database
"""
from datetime import date, datetime, time, timedelta
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
import pytz
from db import models, schemas
# Define CRUD operation to collect the statistics
def get_waiting_time(place: str, db: Session):
""" Get the last estimated waiting time for the given place """
current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
weekday, current_time = current_date.weekday(), current_date.time()
data = {"status": False, "waiting_time": None, "next_timetable": None}
first_timeslot = get_timeslot(place, weekday, True, db)
if first_timeslot and current_time < first_timeslot[0]:
data["next_timetable"] = "{:d}h{:02d}".format(first_timeslot[0].hour, first_timeslot[0].minute)
return data
elif first_timeslot and current_time <= first_timeslot[1]:
last_record = db.query(
models.Records.waiting_time
).filter(
models.Records.place == place
).order_by(
models.Records.date.desc()
).first()
if last_record:
waiting_time = last_record.waiting_time
waiting_time = round(waiting_time.total_seconds() / 60)
data["status"] = True
data["waiting_time"] = waiting_time
return data
second_timeslot = get_timeslot(place, weekday, False, db)
if second_timeslot and current_time < second_timeslot[0]:
data["next_timetable"] = "{:d}h{:02d}".format(second_timeslot[0].hour, second_timeslot[0].minute)
return data
elif second_timeslot and current_time <= second_timeslot[1]:
last_record = db.query(
models.Records.waiting_time
).filter(
models.Records.place == place
).order_by(
models.Records.date.desc()
).first()
if last_record:
waiting_time = last_record.waiting_time
waiting_time = round(waiting_time.total_seconds() / 60)
data["status"] = True
data["waiting_time"] = waiting_time
return data
return data
def get_avg_graph_points(place: str, weekday: int, min_time: time, max_time: time, interval: timedelta, db: Session):
""" Get the average waiting time for each interval between two time steps """
def shift_time(t: time, delta: timedelta):
return (datetime.combine(date(1, 1, 1), t) + delta).time()
def avg_time_query(start_time, end_time):
records = db.query(
(func.round(
func.avg(
3600 * func.extract('HOUR', models.Records.waiting_time) +
60 * func.extract('MINUTE', models.Records.waiting_time) +
func.extract('SECOND', models.Records.waiting_time))
)) / 60
).filter(
models.Records.place == place,
func.weekday(models.Records.date) == weekday,
(func.extract('HOUR', models.Records.date) > start_time.hour) |
((func.extract('HOUR', models.Records.date) == start_time.hour) &
(func.extract('MINUTE', models.Records.date) >= start_time.minute)),
(func.extract('HOUR', models.Records.date) < end_time.hour) |
((func.extract('HOUR', models.Records.date) == end_time.hour) &
(func.extract('MINUTE', models.Records.date) < end_time.minute)),
).one()
if records[0]:
return int(records[0])
return None
def add_slot(slots_list, start_time, end_time):
average_waiting_time = avg_time_query(start_time, end_time)
if average_waiting_time:
name = 60 * start_time.hour + start_time.minute
slots_list.append({'name': name, 'time': average_waiting_time})
stats = []
start_time, end_time = min_time, shift_time(min_time, interval)
while start_time < max_time:
add_slot(stats, start_time, end_time)
start_time, end_time = end_time, shift_time(end_time, interval)
return stats
def get_avg_graph(place: str, db: Session):
""" Get the average waiting time for each interval between two time steps,
for the current or next available timeslot"""
current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
weekday, current_time = current_date.weekday(), current_date.time()
first_timeslot = get_timeslot(place, weekday, True, db)
if first_timeslot and current_time <= first_timeslot[1]:
return get_avg_graph_points(place, weekday, first_timeslot[0], first_timeslot[1], timedelta(minutes=5), db), first_timeslot[0], first_timeslot[1]
second_timeslot = get_timeslot(place, weekday, False, db)
if second_timeslot and current_time <= second_timeslot[1]:
return get_avg_graph_points(place, weekday, second_timeslot[0], second_timeslot[1], timedelta(minutes=5), db), second_timeslot[0], second_timeslot[1]
return [], None, None
def get_current_graph_points(place: str, current_date: date, min_time: time, max_time: time, interval: timedelta, db: Session):
""" Get the waiting time for each interval between two time steps for the current timeslot """
def shift_time(t: time, delta: timedelta):
return (datetime.combine(date(1, 1, 1), t) + delta).time()
def avg_time_query(start_time, end_time):
records = db.query(
(func.round(
func.avg(
3600 * func.extract('HOUR', models.Records.waiting_time) +
60 * func.extract('MINUTE', models.Records.waiting_time) +
func.extract('SECOND', models.Records.waiting_time))
)) / 60
).filter(
models.Records.place == place,
func.extract('YEAR', models.Records.date) == current_date.year,
func.extract('MONTH', models.Records.date) == current_date.month,
func.extract('DAY', models.Records.date) == current_date.day,
(func.extract('HOUR', models.Records.date) > start_time.hour) |
((func.extract('HOUR', models.Records.date) == start_time.hour) &
(func.extract('MINUTE', models.Records.date) >= start_time.minute)),
(func.extract('HOUR', models.Records.date) < end_time.hour) |
((func.extract('HOUR', models.Records.date) == end_time.hour) &
(func.extract('MINUTE', models.Records.date) < end_time.minute)),
).one()
if records[0]:
return int(records[0])
return None
def add_slot(slots_list, start_time, end_time):
average_waiting_time = avg_time_query(start_time, end_time)
if average_waiting_time:
name = 60 * start_time.hour + start_time.minute
slots_list.append({'name': name, 'time': average_waiting_time})
stats = []
start_time, end_time = min_time, shift_time(min_time, interval)
while start_time < max_time:
add_slot(stats, start_time, end_time)
start_time, end_time = end_time, shift_time(end_time, interval)
return stats
def get_current_graph(place: str, db: Session):
""" Get the waiting_time_graph for the current timeslot"""
current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
weekday, day, current_time = current_date.weekday(), current_date.date(), current_date.time()
first_timeslot = get_timeslot(place, weekday, True, db)
if first_timeslot and current_time <= first_timeslot[0]:
return [], None, None
elif first_timeslot and current_time <= first_timeslot[1]:
points = get_current_graph_points(place, day, first_timeslot[0], current_time, timedelta(minutes=5), db)
start_time = 60 * first_timeslot[0].hour + first_timeslot[0].minute
end_time = 60 * first_timeslot[1].hour + first_timeslot[1].minute
return points, start_time, end_time
second_timeslot = get_timeslot(place, weekday, False, db)
if second_timeslot and current_time <= second_timeslot[0]:
return [], None, None
elif second_timeslot and current_time <= second_timeslot[1]:
points = get_current_graph_points(place, day, second_timeslot[0], current_time, timedelta(minutes=5), db), second_timeslot[0], second_timeslot[1]
start_time = 60 * second_timeslot[0].hour + second_timeslot[0].minute
end_time = 60 * second_timeslot[1].hour + second_timeslot[1].minute
return points, start_time, end_time
return [], None, None
# Define CRUD operation for the comments
def get_comments(place: str, page: int, db: Session):
""" Get the 10 last comments for the given place """
if page == 0:
comments = db.query(models.Comments).order_by(models.Comments.published_at.desc(), models.Comments.id.desc()).all()
else:
comments = db.query(
models.Comments).filter(
models.Comments.place == place).order_by(
models.Comments.published_at.desc(),
models.Comments.id.desc()).slice(
(page -
1) *
10,
page *
10).all()
return comments
def create_comment(place: str, new_comments: schemas.CommentBase, db: Session):
""" Add a new comment to the database """
date = datetime.now(tz=pytz.timezone("Europe/Paris"))
db_comment = models.Comments(**new_comments.dict(), published_at=date, place=place)
db.add(db_comment)
db.commit()
db.refresh(db_comment)
return db_comment
def delete_comment(id: int, db: Session):
""" Delete the comment with the matching id """
if id == 0:
db.query(models.Comments).delete()
else:
db.query(models.Comments).filter(models.Comments.id == id).delete()
db.commit()
# Define CRUD operation for the news
def get_news(place: str, db: Session):
""" Get the news for the given place """
news = db.query(models.News).filter(models.News.place == place).order_by(models.News.published_at.desc()).all()
return news
def create_news(new_news: schemas.NewsBase, db: Session):
""" Add a news to the database """
date = datetime.now(tz=pytz.timezone("Europe/Paris"))
db_news = models.News(**new_news.dict(), published_at=date)
db.add(db_news)
db.commit()
db.refresh(db_news)
return db_news
def delete_news(id: int, db: Session):
""" Delete the news with the matching id """
if id == 0:
db.query(models.News).delete()
else:
db.query(models.News).filter(models.News.id == id).delete()
db.commit()
# Define CRUD operation for the opening hours
def get_opening_hours(place: str, db: Session):
""" Get the opening hours for the given place """
opening_hours = db.query(
models.OpeningHours
).filter(
models.OpeningHours.place == place
).order_by(
models.OpeningHours.day, models.OpeningHours.timeslot.desc()
).all()
return opening_hours
def get_timeslot(place: str, day: int, timeslot: bool, db: Session):
""" Get the opening hours for the given place and timeslot"""
opening_hours = db.query(
models.OpeningHours.open_time,
models.OpeningHours.close_time,
).filter(
models.OpeningHours.place == place,
models.OpeningHours.day == day,
models.OpeningHours.timeslot == timeslot
).first()
return opening_hours
def create_opening_hours(new_opening_hours: schemas.OpeningHoursBase, db: Session):
""" Add opening hours to the database """
db_opening_hours = models.OpeningHours(**new_opening_hours.dict())
db.add(db_opening_hours)
db.commit()
db.refresh(db_opening_hours)
return db_opening_hours
def delete_opening_hours(id: int, db: Session):
""" Delete the opening hours with the matching id """
if id == 0:
db.query(models.OpeningHours).delete()
else:
db.query(models.OpeningHours).filter(models.OpeningHours.id == id).delete()
db.commit()
# Restaurants information
def get_restaurants(db: Session):
current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
weekday, current_time = current_date.weekday(), current_date.time()
restaurant_names = [r.place for r in db.query(models.OpeningHours.place).distinct()]
restaurants = []
for name in restaurant_names:
restaurant = {}
restaurant["name"] = name
first_timeslot = get_timeslot(name, weekday, True, db)
second_timeslot = get_timeslot(name, weekday, False, db)
if (first_timeslot and first_timeslot[0] <= current_time < first_timeslot[1]) or (
second_timeslot and second_timeslot[0] <= current_time < second_timeslot[1]):
restaurant["status"] = True
else:
restaurant["status"] = False
if first_timeslot and second_timeslot:
restaurant["timetable"] = (
f"{first_timeslot[0].hour:{'0'}{2}}h{first_timeslot[0].minute:{'0'}{2}}-"
f"{first_timeslot[1].hour:{'0'}{2}}h{first_timeslot[1].minute:{'0'}{2}} / "
f"{second_timeslot[0].hour:{'0'}{2}}h{second_timeslot[0].minute:{'0'}{2}}-"
f"{second_timeslot[1].hour:{'0'}{2}}h{second_timeslot[1].minute:{'0'}{2}}")
elif first_timeslot:
restaurant["timetable"] = (
f"{first_timeslot[0].hour:{'0'}{2}}h{first_timeslot[0].minute:{'0'}{2}}-"
f"{first_timeslot[1].hour:{'0'}{2}}h{first_timeslot[1].minute:{'0'}{2}}")
else:
restaurant["timeslot"] = "-"
if restaurant["status"]:
last_record = db.query(
models.Records
).filter(
models.Records.place == name
).order_by(
models.Records.date.desc()
).first()
if last_record:
waiting_time = last_record.waiting_time
restaurant["waiting_time"] = round(waiting_time.total_seconds() / 60)
else:
restaurant["waiting_time"] = None
else:
restaurant["waiting_time"] = None
restaurants.append(restaurant)
return restaurants