# crud.py
"""
Module to interact with the database
"""
from datetime import date, datetime, time, timedelta
from fastapi import HTTPException
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
from sqlalchemy import Time, Date, cast
from uuid import uuid4
import secrets
import pytz
from db import models, schemas
# Define CRUD operation to collect the statistics
def get_waiting_time(place: str, db: Session):
    """Return the latest waiting-time estimate for ``place``.

    Today's opening slots (Europe/Paris time) are scanned in opening order.
    A slot whose opening instant falls inside an exceptional closure is
    skipped. For the first remaining slot that is not over yet:

    * if it has not started, the result only carries ``next_timetable``
      (the opening time formatted as ``HHhMM``);
    * if it is running, the result has ``status=True`` and the most recent
      waiting time recorded since the slot opened, rounded to minutes
      (``None`` when nothing has been recorded yet).

    When no slot qualifies, a default ``schemas.WaitingTime()`` is returned.
    """
    now = datetime.now(tz=pytz.timezone("Europe/Paris"))
    today, weekday, clock = now.date(), now.weekday(), now.time()

    slots = (
        db.query(models.OpeningHours.open_time, models.OpeningHours.close_time)
        .filter(models.OpeningHours.place == place,
                models.OpeningHours.day == weekday)
        .order_by(models.OpeningHours.open_time)
        .all()
    )

    for slot in slots:
        opening = datetime.combine(today, slot.open_time)
        closed = (
            db.query(models.Closure)
            .filter(models.Closure.place == place,
                    models.Closure.beginning_date <= opening,
                    models.Closure.end_date >= opening)
            .order_by(models.Closure.beginning_date)
            .first()
        )
        if closed:
            continue
        if clock < slot.open_time:
            # The slot is still to come: only advertise its opening time.
            return schemas.WaitingTime(
                next_timetable=slot.open_time.strftime('%Hh%M'))
        if clock > slot.close_time:
            continue

        # The slot is running: use the newest record taken since it opened.
        latest = (
            db.query(models.Records.waiting_time)
            .filter(models.Records.place == place)
            .filter(models.Records.date >= opening)
            .order_by(models.Records.date.desc())
            .first()
        )
        minutes = None
        if latest:
            minutes = round(latest.waiting_time.total_seconds() / 60)
        return schemas.WaitingTime(status=True, waiting_time=minutes)

    return schemas.WaitingTime()
# Define some utility functions
def shift_time(t: time, delta: timedelta):
    """Return the wall-clock time ``t`` shifted by ``delta``.

    The result wraps around midnight in both directions, so
    ``23:55 + 10 min`` gives ``00:05`` and ``00:02 - 5 min`` gives ``23:57``.
    The returned time is naive (any ``tzinfo`` on ``t`` is dropped).
    """
    one_day = timedelta(days=1)
    since_midnight = timedelta(hours=t.hour, minutes=t.minute,
                               seconds=t.second, microseconds=t.microsecond)
    # Reduce ``delta`` modulo one day first so extreme deltas cannot overflow,
    # then wrap the sum; Python's modulo keeps the result in [0, one_day).
    wrapped = (since_midnight + delta % one_day) % one_day
    return (datetime.min + wrapped).time()
def add_slot(slots_list, start_time, end_time, function):
    """Append the graph point for one time interval to ``slots_list``.

    ``function(start_time, end_time)`` is called to obtain the waiting time
    (in minutes) for the interval. When it returns ``None`` (no data), nothing
    is appended. Otherwise a ``schemas.RecordRead`` is appended whose ``name``
    is ``start_time`` expressed in minutes since midnight.
    """
    waiting_time = function(start_time, end_time)
    # Compare against None explicitly: a 0-minute wait is a valid data point
    # and must not be dropped from the graph.
    if waiting_time is not None:
        name = 60 * start_time.hour + start_time.minute
        slots_list.append(schemas.RecordRead(name=name, time=waiting_time))
def get_avg_graph_points(place: str, weekday: int, min_time: time,
                         max_time: time, interval: timedelta, db: Session):
    """Get the average waiting time for each interval between two time steps.

    The range ``[min_time, max_time)`` is cut into consecutive steps of
    ``interval``. For each step, the recorded waiting times of ``place`` on
    the given ``weekday`` whose time of day lies in the step are averaged
    (in minutes, rounded). Steps with no record are omitted.

    Returns a list of ``schemas.RecordRead`` built by ``add_slot``.

    Raises:
        ValueError: if ``interval`` is not a positive duration (the range
            could never be exhausted).
    """
    if interval <= timedelta(0):
        raise ValueError("interval must be a positive duration")

    def avg_time_query(start_time, end_time):
        records = db.query(
            func.round(
                func.avg(
                    60 * func.extract('HOUR', models.Records.waiting_time) +
                    func.extract('MINUTE', models.Records.waiting_time))
            )
        ).filter(
            models.Records.place == place,
            func.weekday(models.Records.date) == weekday,
            cast(models.Records.date, Time) >= start_time,
            cast(models.Records.date, Time) <= end_time,
        ).first()
        average = records[0]
        # AVG yields NULL when no row matches; 0 is a legitimate average.
        return None if average is None else int(average)

    stats = []
    # Iterate on datetimes rather than bare times: a step that crosses
    # midnight would otherwise wrap back to 00:00 and the loop would never
    # reach ``max_time``.
    anchor = date(1, 1, 1)
    cursor = datetime.combine(anchor, min_time)
    stop = datetime.combine(anchor, max_time)
    while cursor < stop:
        following = cursor + interval
        add_slot(stats, cursor.time(), following.time(), avg_time_query)
        cursor = following
    return stats
def get_avg_graph(place: str, db: Session):
    """Return the average-waiting-time graph for the current or next slot.

    Uses today's opening hours (Europe/Paris time). If ``place`` is under an
    exceptional closure right now, or no slot of today is still open or
    upcoming, an empty list is returned. Otherwise the averages are computed
    in 5-minute steps over the first slot that has not closed yet.
    """
    now = datetime.now(tz=pytz.timezone("Europe/Paris"))
    weekday, clock = now.weekday(), now.time()

    slots = (
        db.query(models.OpeningHours.open_time, models.OpeningHours.close_time)
        .filter(models.OpeningHours.place == place,
                models.OpeningHours.day == weekday)
        .order_by(models.OpeningHours.open_time)
        .all()
    )
    closed = (
        db.query(models.Closure)
        .filter(models.Closure.place == place,
                models.Closure.beginning_date <= now,
                models.Closure.end_date >= now)
        .first()
    )
    if closed:
        return []

    # First slot (in opening order) that has not ended yet.
    target = next((slot for slot in slots if clock <= slot.close_time), None)
    if target is None:
        return []
    return get_avg_graph_points(
        place, weekday, target.open_time, target.close_time,
        timedelta(minutes=5), db)
def get_current_graph_points(place: str, current_date: date,
                             min_time: time, max_time: time, interval: timedelta, db: Session):
    """Get the waiting time for each interval of the current timeslot.

    The range ``[min_time, max_time)`` is cut into consecutive steps of
    ``interval``. For each step, the waiting times recorded for ``place`` on
    ``current_date`` whose time of day lies in the step are averaged
    (in minutes, rounded). Steps with no record are omitted.

    Returns a list of ``schemas.RecordRead`` built by ``add_slot``.

    Raises:
        ValueError: if ``interval`` is not a positive duration (the range
            could never be exhausted).
    """
    if interval <= timedelta(0):
        raise ValueError("interval must be a positive duration")

    def current_time_query(start_time, end_time):
        records = db.query(
            func.round(
                func.avg(
                    60 * func.extract('HOUR', models.Records.waiting_time) +
                    func.extract('MINUTE', models.Records.waiting_time))
            )
        ).filter(
            models.Records.place == place,
            cast(models.Records.date, Date) == current_date,
            cast(models.Records.date, Time) >= start_time,
            cast(models.Records.date, Time) <= end_time
        ).first()
        average = records[0]
        # AVG yields NULL when no row matches; 0 is a legitimate average.
        return None if average is None else int(average)

    stats = []
    # Iterate on datetimes rather than bare times: a step that crosses
    # midnight would otherwise wrap back to 00:00 and the loop would never
    # reach ``max_time``.
    anchor = date(1, 1, 1)
    cursor = datetime.combine(anchor, min_time)
    stop = datetime.combine(anchor, max_time)
    while cursor < stop:
        following = cursor + interval
        add_slot(stats, cursor.time(), following.time(), current_time_query)
        cursor = following
    return stats
def get_current_graph(place: str, db: Session):
    """Return the waiting-time graph of the slot that is open right now.

    When ``place`` is not under an exceptional closure and the current
    Europe/Paris time lies inside one of today's opening slots, the graph
    holds the 5-minute points from the slot opening up to now, together with
    the slot bounds expressed in minutes since midnight. Otherwise a graph
    with an empty ``data`` list is returned.
    """
    now = datetime.now(tz=pytz.timezone("Europe/Paris"))
    weekday, today, clock = now.weekday(), now.date(), now.time()

    slots = (
        db.query(models.OpeningHours.open_time, models.OpeningHours.close_time)
        .filter(models.OpeningHours.place == place,
                models.OpeningHours.day == weekday)
        .all()
    )
    closed = (
        db.query(models.Closure)
        .filter(models.Closure.place == place,
                models.Closure.beginning_date <= now,
                models.Closure.end_date >= now)
        .first()
    )
    if closed:
        return schemas.Graph(data=[])

    for slot in slots:
        opens, closes = slot.open_time, slot.close_time
        if not (opens <= clock <= closes):
            continue
        points = get_current_graph_points(
            place, today, opens, clock, timedelta(minutes=5), db)
        return schemas.Graph(
            data=points,
            start=60 * opens.hour + opens.minute,
            end=60 * closes.hour + closes.minute)

    return schemas.Graph(data=[])
# Define CRUD operation for the comments
def get_comments(place: str, page: int, db: Session):
    """Get the comments posted for the given place, with their author's name.

    Comments are selected newest first. ``page == 0`` selects every comment
    of the place; any other value selects the 20 comments of that 1-based
    page. The selected comments are returned oldest first, as a list of
    ``schemas.Comment``.
    """
    # Both modes need the (comment, username) pairs and the place filter;
    # only the pagination differs.
    query = db.query(
        models.Comments,
        models.Users.username).join(
        models.Users).filter(
        models.Comments.place == place).order_by(
        models.Comments.published_at.desc(),
        models.Comments.id.desc())
    if page != 0:
        query = query.slice((page - 1) * 20, page * 20)
    comments_list = [
        schemas.Comment(**comment.__dict__, username=username)
        for comment, username in query.all()
    ]
    comments_list.reverse()
    return comments_list
def create_comment(user: schemas.User, place: str, new_comments: schemas.CommentBase, db: Session):
    """Store a new comment written by ``user`` about ``place``.

    The comment is timestamped with the current Europe/Paris time, committed,
    and returned as a ``schemas.Comment`` carrying the author's username.
    """
    published_at = datetime.now(tz=pytz.timezone("Europe/Paris"))
    db_comment = models.Comments(
        **new_comments.dict(),
        published_at=published_at,
        place=place,
        user_id=user.id,
    )
    db.add(db_comment)
    db.commit()
    db.refresh(db_comment)  # reload the attributes expired by the commit
    return schemas.Comment(**db_comment.__dict__, username=user.username)
def delete_comment(id: int, db: Session):
    """Delete the comment whose id is ``id``; ``id == 0`` deletes them all."""
    target = db.query(models.Comments)
    if id != 0:
        target = target.filter(models.Comments.id == id)
    target.delete()
    db.commit()
# Define CRUD operation for the news
def get_news(place: str, admin: bool, db: Session):
    """Get the news for the given place.

    Returns the news of ``place`` that have not expired, newest first.
    For admins this list is returned as is. For other users, the next
    opening slot of the place is looked up; if an exceptional closure covers
    the opening instant of that slot, a synthetic "Fermeture exceptionnelle"
    news item is appended to the list.
    """
    paris = pytz.timezone("Europe/Paris")
    current_date = datetime.now(tz=paris)
    news = db.query(
        models.News).filter(
        models.News.place == place,
        models.News.end_date >= current_date).order_by(
        models.News.published_at.desc()).all()
    if admin:
        return news

    opening_hours = db.query(
        models.OpeningHours.open_time,
        models.OpeningHours.close_time,
        models.OpeningHours.day).filter(
        models.OpeningHours.place == place).all()

    # For every weekly slot, compute its next occurrence that is not over
    # yet. A slot of today that has already closed recurs next week; this
    # also covers the evening of the last opening day of the week, where the
    # next slot is on an earlier weekday.
    today, weekday = current_date.date(), current_date.weekday()
    upcoming = []
    for open_time, close_time, day in opening_hours:
        slot_date = today + timedelta(days=(day - weekday) % 7)
        closing = paris.localize(datetime.combine(slot_date, close_time))
        if closing <= current_date:
            slot_date += timedelta(days=7)
        upcoming.append(datetime.combine(slot_date, open_time))
    next_time_slot = min(upcoming, default=None)

    if next_time_slot:
        closure = db.query(
            models.Closure).filter(
            models.Closure.place == place,
            models.Closure.beginning_date <= next_time_slot,
            models.Closure.end_date > next_time_slot).first()
        if closure:
            closure_news = schemas.News(
                title="Fermeture exceptionnelle",
                content=f"{place} est exceptionnellement hors service jusqu'au {closure.end_date.strftime('%d/%m/%y à %Hh%M')}",
                end_date=closure.end_date,
                place=place,
                published_at=current_date)
            news.append(closure_news)
    return news
def create_news(new_news: schemas.NewsBase, db: Session):
    """Store a news item, stamped with the current Europe/Paris time.

    Returns the persisted ``models.News`` row after it has been refreshed.
    """
    published_at = datetime.now(tz=pytz.timezone("Europe/Paris"))
    payload = new_news.dict()
    db_news = models.News(**payload, published_at=published_at)
    db.add(db_news)
    db.commit()
    db.refresh(db_news)
    return db_news
def delete_news(id: int, db: Session):
    """Delete the news whose id is ``id``; ``id == 0`` deletes them all."""
    target = db.query(models.News)
    if id != 0:
        target = target.filter(models.News.id == id)
    target.delete()
    db.commit()
# Define CRUD operation for the opening hours
def get_opening_hours(place: str, db: Session):
    """Return every opening slot of ``place``, sorted by day then open time."""
    slots = db.query(models.OpeningHours)
    slots = slots.filter(models.OpeningHours.place == place)
    slots = slots.order_by(
        models.OpeningHours.day, models.OpeningHours.open_time)
    return slots.all()
def create_opening_hours(
        new_opening_hours: schemas.OpeningHoursBase, db: Session):
    """Store a new opening slot and return the refreshed database row."""
    fields = new_opening_hours.dict()
    row = models.OpeningHours(**fields)
    db.add(row)
    db.commit()
    db.refresh(row)
    return row
def delete_opening_hours(id: int, db: Session):
    """Delete the opening slot whose id is ``id``; ``id == 0`` deletes all."""
    target = db.query(models.OpeningHours)
    if id != 0:
        target = target.filter(models.OpeningHours.id == id)
    target.delete()
    db.commit()
# Restaurants information
def get_restaurants(db: Session):
    """List every known place with today's timetable and waiting-time info.

    A place is known when it has at least one opening-hours row. For each
    one, today's slots (Europe/Paris weekday) are rendered as
    ``HHhMM-HHhMM`` and joined with ``/``, and merged with the fields
    returned by ``get_waiting_time`` into a ``schemas.Restaurant``.
    """
    weekday = datetime.now(tz=pytz.timezone("Europe/Paris")).weekday()
    restaurants = []
    for row in db.query(models.OpeningHours.place).distinct():
        name = row.place
        todays_slots = (
            db.query(models.OpeningHours)
            .filter(models.OpeningHours.place == name,
                    models.OpeningHours.day == weekday)
            .order_by(models.OpeningHours.open_time)
            .all()
        )
        timetable = "/".join(
            f"{slot.open_time.strftime('%Hh%M')}-{slot.close_time.strftime('%Hh%M')}"
            for slot in todays_slots
        )
        infos = get_waiting_time(name, db)
        restaurants.append(
            schemas.Restaurant(**infos.dict(), name=name, timetable=timetable))
    return restaurants
# Define CRUD operation for the authentication
def init_user(db: Session):
    """Create a provisional user row for a new authentication attempt.

    The row gets a fresh UUID4 cookie, a random URL-safe ``state`` token and
    an expiration date 10 minutes from now (Europe/Paris time). The refreshed
    database row is returned.
    """
    now = datetime.now(tz=pytz.timezone("Europe/Paris"))
    db_user = models.Users(
        state=secrets.token_urlsafe(30),
        cookie=uuid4(),
        expiration_date=now + timedelta(minutes=10),
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user
def get_user(cookie: str, db: Session):
    """Get the user owning the given session cookie.

    Raises:
        HTTPException: 401 "Invalid cookie" when the lookup fails (no user,
            or several users, match the cookie); 401 "Expired cookie" when
            the session has expired, in which case the stored cookie is
            cleared first.
    """
    paris = pytz.timezone("Europe/Paris")
    try:
        user = db.query(models.Users).filter(
            models.Users.cookie == cookie).one()
    except Exception as err:
        # ``Exception`` rather than ``BaseException``: KeyboardInterrupt and
        # SystemExit must propagate instead of becoming a 401 response.
        raise HTTPException(status_code=401, detail="Invalid cookie") from err
    # The stored expiration date is localized as Europe/Paris wall-clock time
    # before being compared with the aware current time.
    if paris.localize(user.expiration_date) < datetime.now(tz=paris):
        user.cookie = None
        db.add(user)
        db.commit()
        raise HTTPException(status_code=401, detail="Expired cookie")
    return user
def delete_state(user: schemas.User, db: Session) -> None:
    """Clear the ``state`` value stored on ``user`` and commit the change."""
    user.state = None

    db.add(user)
    db.commit()
def update_user(user: schemas.User, user_info: dict, db: Session):
    """Attach the identity in ``user_info`` to the session held by ``user``.

    The account name is "<firstName> <lastName>". If an account with that
    name already exists, it takes over the cookie of ``user`` and ``user``
    (the provisional row) is deleted; otherwise ``user`` itself receives the
    name. In both cases the session is extended to 3 days from now
    (Europe/Paris time) and the admin flag is set from the presence of
    "admin eatfast" in ``user_info["roles"]``. The refreshed account row is
    returned.
    """
    full_name = "{} {}".format(user_info['firstName'], user_info['lastName'])
    expiration_date = datetime.now(
        tz=pytz.timezone("Europe/Paris")) + timedelta(days=3)
    known = db.query(models.Users).filter(
        models.Users.username == full_name).first()

    account = known if known else user
    if known:
        account.cookie = user.cookie
    else:
        account.username = full_name
    account.expiration_date = expiration_date
    account.admin = "admin eatfast" in user_info["roles"]

    if known:
        # The provisional row is superseded by the existing account.
        db.delete(user)
    db.add(account)
    db.commit()
    db.refresh(account)
    return account
def end_session(cookie: str, db: Session):
    """Close the session identified by ``cookie``.

    The matching user (exactly one is expected, as ``.one()`` is used) gets
    its expiration date set to the current Europe/Paris time and its cookie
    cleared.
    """
    session_owner = (
        db.query(models.Users)
        .filter(models.Users.cookie == cookie)
        .one()
    )
    session_owner.expiration_date = datetime.now(
        tz=pytz.timezone("Europe/Paris"))
    session_owner.cookie = None
    db.add(session_owner)
    db.commit()
def delete_user(cookie: str, db: Session):
    """Delete the user row(s) whose cookie equals ``cookie``."""
    matching = db.query(models.Users).filter(models.Users.cookie == cookie)
    matching.delete()
    db.commit()
# Define CRUD operations for data collection
def get_records(place: str, db: Session):
    """Return every record of ``place``, most recent first."""
    query = db.query(models.Records)
    query = query.filter(models.Records.place == place)
    query = query.order_by(models.Records.date.desc())
    return query.all()
def create_record(record: schemas.RecordBase, db: Session):
    """Store a waiting-time record and return the refreshed database row."""
    fields = record.dict()
    row = models.Records(**fields)
    db.add(row)
    db.commit()
    db.refresh(row)
    return row
def delete_record(id: int, db: Session):
    """Delete the record whose id is ``id``; ``id == 0`` deletes them all."""
    target = db.query(models.Records)
    if id != 0:
        target = target.filter(models.Records.id == id)
    target.delete()
    db.commit()
def get_collaborative_records(place: str, db: Session):
    """Return the collaborative records of ``place``, most recent first.

    Each row is converted to a ``schemas.CollaborativeRecords``.
    """
    rows = (
        db.query(models.CollaborativeRecords)
        .filter(models.CollaborativeRecords.place == place)
        .order_by(models.CollaborativeRecords.date.desc())
        .all()
    )
    converted = []
    for row in rows:
        converted.append(schemas.CollaborativeRecords(**row.__dict__))
    return converted
def create_collaborative_record(user: schemas.User, place: str, db: Session):
    """Register that ``user`` has just joined the queue of ``place``.

    A record dated now (Europe/Paris time) is created, provided the user's
    most recent record (for any place) was not made after the opening of the
    slot currently running at ``place``.

    Returns the refreshed ``models.CollaborativeRecords`` row.

    Raises:
        HTTPException: 404 when the lookup of the current opening slot fails
            (no slot, or several slots, contain the current time); 406 when
            the user already registered during this slot.
    """
    current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
    today, weekday, current_time = (
        current_date.date(), current_date.weekday(), current_date.time())
    try:
        time_slot = db.query(
            models.OpeningHours).filter(
            models.OpeningHours.place == place,
            models.OpeningHours.day == weekday,
            models.OpeningHours.open_time <= current_time,
            models.OpeningHours.close_time >= current_time).one()
    except Exception as err:
        # ``Exception`` rather than ``BaseException``: KeyboardInterrupt and
        # SystemExit must propagate instead of becoming a 404 response.
        raise HTTPException(
            status_code=404, detail="No restaurant opened") from err

    last_record = db.query(models.CollaborativeRecords).filter(
        models.CollaborativeRecords.user_id == user.id).order_by(
        models.CollaborativeRecords.date.desc()).first()
    slot_opening = datetime.combine(today, time_slot.open_time)
    if last_record and last_record.date > slot_opening:
        raise HTTPException(
            status_code=406, detail="Client already registered")

    db_record = models.CollaborativeRecords(
        user_id=user.id, place=place, date=current_date)
    db.add(db_record)
    db.commit()
    db.refresh(db_record)
    return db_record
def update_collaborative_record(user: schemas.User, db: Session):
    """Close the pending collaborative record of ``user``.

    The user's most recent record receives its waiting time (now minus the
    record date, Europe/Paris time), provided it was created during the
    opening slot currently running at the record's place and has no waiting
    time yet.

    Returns the updated record as a ``schemas.CollaborativeRecords``.

    Raises:
        HTTPException: 404 when the user has no record or the lookup of the
            current opening slot fails; 406 when the last record does not
            belong to the current slot or already has a waiting time.
    """
    paris = pytz.timezone("Europe/Paris")
    current_date = datetime.now(tz=paris)
    today, weekday, current_time = (
        current_date.date(), current_date.weekday(), current_date.time())
    last_record = db.query(models.CollaborativeRecords).filter(
        models.CollaborativeRecords.user_id == user.id).order_by(
        models.CollaborativeRecords.date.desc()).first()
    if last_record is None:
        # Same response as before for a user without any record; previously
        # reached only through an AttributeError swallowed by the handler.
        raise HTTPException(status_code=404, detail="No restaurant opened")
    try:
        time_slot = db.query(
            models.OpeningHours).filter(
            models.OpeningHours.place == last_record.place,
            models.OpeningHours.day == weekday,
            models.OpeningHours.open_time <= current_time,
            models.OpeningHours.close_time >= current_time).one()
    except Exception as err:
        # ``Exception`` rather than ``BaseException``: KeyboardInterrupt and
        # SystemExit must propagate instead of becoming a 404 response.
        raise HTTPException(
            status_code=404, detail="No restaurant opened") from err

    slot_opening = datetime.combine(today, time_slot.open_time)
    if last_record.date >= slot_opening and not last_record.waiting_time:
        # The stored date is localized as Europe/Paris wall-clock time before
        # being subtracted from the aware current time.
        last_record.waiting_time = current_date - \
            paris.localize(last_record.date)
        db.add(last_record)
        db.commit()
        db.refresh(last_record)
        return schemas.CollaborativeRecords(**last_record.__dict__)
    raise HTTPException(status_code=406, detail="Client already registered")
def delete_collaborative_record(id: int, db: Session):
    """Delete the collaborative record ``id``; ``id == 0`` deletes them all."""
    target = db.query(models.CollaborativeRecords)
    if id != 0:
        target = target.filter(models.CollaborativeRecords.id == id)
    target.delete()
    db.commit()
# Define CRUD operation for exceptional closure
def get_closure(place: str, db: Session):
    """Return the closures of ``place`` that have not ended yet.

    Closures whose end date is at or after the current Europe/Paris time are
    returned as ``schemas.Closure`` objects, earliest beginning first.
    """
    now = datetime.now(tz=pytz.timezone("Europe/Paris"))
    rows = (
        db.query(models.Closure)
        .filter(models.Closure.place == place,
                models.Closure.end_date >= now)
        .order_by(models.Closure.beginning_date)
        .all()
    )
    result = []
    for row in rows:
        result.append(schemas.Closure(**row.__dict__))
    return result
def create_closure(closure: schemas.Closure, db: Session):
    """Store an exceptional closure and return it as a ``schemas.Closure``."""
    fields = closure.dict()
    row = models.Closure(**fields)
    db.add(row)
    db.commit()
    db.refresh(row)  # reload the attributes expired by the commit
    return schemas.Closure(**row.__dict__)
def delete_closure(id: int, db: Session):
    """Delete the closure whose id is ``id``; ``id == 0`` deletes them all."""
    target = db.query(models.Closure)
    if id != 0:
        target = target.filter(models.Closure.id == id)
    target.delete()
    db.commit()