Skip to content
Snippets Groups Projects

add staging deployment to CI

Merged Aymeric Chaumont requested to merge staging-ci into main

Files

+ 144
47
@@ -4,6 +4,8 @@ Module to interact with the database
from datetime import date, datetime, time, timedelta
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
from uuid import uuid4
import secrets
import pytz
from db import models, schemas
@@ -15,36 +17,50 @@ def get_waiting_time(place: str, db: Session):
""" Get the last estimated waiting time for the given place """
current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
weekday, current_time = current_date.weekday(), current_date.time()
data = {"status": False, "waiting_time": None, "next_timetable": None}
first_timeslot = get_timeslot(place, weekday, True, db)
if first_timeslot and current_time < first_timeslot[0]:
return first_timeslot[0].hour, first_timeslot[0].minute
data["next_timetable"] = "{:d}h{:02d}".format(
first_timeslot[0].hour, first_timeslot[0].minute)
return data
elif first_timeslot and current_time <= first_timeslot[1]:
waiting_time = db.query(
last_record = db.query(
models.Records.waiting_time
).filter(
models.Records.place == place
).order_by(
models.Records.date.desc()
).first()
waiting_time_minutes = round(waiting_time[0].total_seconds() / 60)
return waiting_time_minutes, None
if last_record:
waiting_time = last_record.waiting_time
waiting_time = round(waiting_time.total_seconds() / 60)
data["status"] = True
data["waiting_time"] = waiting_time
return data
second_timeslot = get_timeslot(place, weekday, False, db)
if second_timeslot and current_time < second_timeslot[0]:
return second_timeslot[0].hour, second_timeslot[0].minute
data["next_timetable"] = "{:d}h{:02d}".format(
second_timeslot[0].hour, second_timeslot[0].minute)
return data
elif second_timeslot and current_time <= second_timeslot[1]:
waiting_time = db.query(
last_record = db.query(
models.Records.waiting_time
).filter(
models.Records.place == place
).order_by(
models.Records.date.desc()
).first()
waiting_time_minutes = round(waiting_time[0].total_seconds() / 60)
return waiting_time_minutes, None
return None, None
if last_record:
waiting_time = last_record.waiting_time
waiting_time = round(waiting_time.total_seconds() / 60)
data["status"] = True
data["waiting_time"] = waiting_time
return data
return data
def get_avg_graph_points(place: str, weekday: int, min_time: time, max_time: time, interval: timedelta, db: Session):
def get_avg_graph_points(place: str, weekday: int, min_time: time,
max_time: time, interval: timedelta, db: Session):
""" Get the average waiting time for each interval between two time steps """
def shift_time(t: time, delta: timedelta):
@@ -75,7 +91,7 @@ def get_avg_graph_points(place: str, weekday: int, min_time: time, max_time: tim
def add_slot(slots_list, start_time, end_time):
average_waiting_time = avg_time_query(start_time, end_time)
if average_waiting_time:
name = f'{start_time.hour:02}h{start_time.minute:02}'
name = 60 * start_time.hour + start_time.minute
slots_list.append({'name': name, 'time': average_waiting_time})
stats = []
@@ -94,14 +110,16 @@ def get_avg_graph(place: str, db: Session):
weekday, current_time = current_date.weekday(), current_date.time()
first_timeslot = get_timeslot(place, weekday, True, db)
if first_timeslot and current_time <= first_timeslot[1]:
return get_avg_graph_points(place, weekday, first_timeslot[0], first_timeslot[1], timedelta(minutes=5), db)
return get_avg_graph_points(
place, weekday, first_timeslot[0], first_timeslot[1], timedelta(minutes=5), db)
second_timeslot = get_timeslot(place, weekday, False, db)
if second_timeslot and current_time <= second_timeslot[1]:
return get_avg_graph_points(place, weekday, second_timeslot[0], second_timeslot[1], timedelta(minutes=5), db)
return None
return []
def get_current_graph_points(place: str, current_date: date, min_time: time, max_time: time, interval: timedelta, db: Session):
def get_current_graph_points(place: str, current_date: date,
min_time: time, max_time: time, interval: timedelta, db: Session):
""" Get the waiting time for each interval between two time steps for the current timeslot """
def shift_time(t: time, delta: timedelta):
@@ -134,7 +152,7 @@ def get_current_graph_points(place: str, current_date: date, min_time: time, max
def add_slot(slots_list, start_time, end_time):
average_waiting_time = avg_time_query(start_time, end_time)
if average_waiting_time:
name = f'{start_time.hour:02}h{start_time.minute:02}'
name = 60 * start_time.hour + start_time.minute
slots_list.append({'name': name, 'time': average_waiting_time})
stats = []
@@ -149,48 +167,60 @@ def get_current_graph_points(place: str, current_date: date, min_time: time, max
def get_current_graph(place: str, db: Session):
""" Get the waiting_time_graph for the current timeslot"""
current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
weekday, day, current_time = current_date.weekday(), current_date.date(), current_date.time()
weekday, day, current_time = current_date.weekday(
), current_date.date(), current_date.time()
first_timeslot = get_timeslot(place, weekday, True, db)
if first_timeslot and current_time <= first_timeslot[0]:
return None
return [], None, None
elif first_timeslot and current_time <= first_timeslot[1]:
return get_current_graph_points(place, day, first_timeslot[0], current_time, timedelta(minutes=5), db)
points = get_current_graph_points(place, day, first_timeslot[0], current_time, timedelta(minutes=5), db)
start_time = 60 * first_timeslot[0].hour + first_timeslot[0].minute
end_time = 60 * first_timeslot[1].hour + first_timeslot[1].minute
return points, start_time, end_time
second_timeslot = get_timeslot(place, weekday, False, db)
if second_timeslot and current_time <= second_timeslot[0]:
return None
return [], None, None
elif second_timeslot and current_time <= second_timeslot[1]:
return get_current_graph_points(place, day, second_timeslot[0], current_time, timedelta(minutes=5), db)
return None
points = get_current_graph_points(place, day, second_timeslot[0], current_time, timedelta(minutes=5), db)
start_time = 60 * second_timeslot[0].hour + second_timeslot[0].minute
end_time = 60 * second_timeslot[1].hour + second_timeslot[1].minute
return points, start_time, end_time
return [], None, None
# Define CRUD operation for the comments
def get_comments(place: str, page: int, db: Session):
""" Get the 10 last comments for the given place """
""" Get the 20 last comments for the given place """
if page == 0:
comments = db.query(models.Comments).order_by(models.Comments.published_at.desc(), models.Comments.id.desc()).all()
comments = db.query(
models.Comments).order_by(
models.Comments.published_at.desc(),
models.Comments.id.desc()).all()
else:
comments = db.query(
models.Comments).filter(
models.Comments,
models.Users.username).join(
models.Users).filter(
models.Comments.place == place).order_by(
models.Comments.published_at.desc(),
models.Comments.id.desc()).slice(
(page -
1) *
10,
page *
10).all()
return comments
models.Comments.published_at.desc(),
models.Comments.id.desc()).slice(
(page -
1) *
20,
page *
20).all()
return list(schemas.Comment(**comment.__dict__, username=username) for comment, username in comments)
def create_comment(place: str, new_comments: schemas.CommentBase, db: Session):
def create_comment(user: schemas.User, place: str, new_comments: schemas.CommentBase, db: Session):
""" Add a new comment to the database """
date = datetime.now(tz=pytz.timezone("Europe/Paris"))
db_comment = models.Comments(**new_comments.dict(), published_at=date, place=place)
db_comment = models.Comments(**new_comments.dict(), published_at=date, place=place, user_id=user.id)
db.add(db_comment)
db.commit()
db.refresh(db_comment)
return db_comment
return schemas.Comment(**db_comment.__dict__, username=user.username)
def delete_comment(id: int, db: Session):
@@ -206,7 +236,10 @@ def delete_comment(id: int, db: Session):
def get_news(place: str, db: Session):
""" Get the news for the given place """
news = db.query(models.News).filter(models.News.place == place).order_by(models.News.published_at.desc()).all()
news = db.query(
models.News).filter(
models.News.place == place).order_by(
models.News.published_at.desc()).all()
return news
@@ -234,10 +267,7 @@ def delete_news(id: int, db: Session):
def get_opening_hours(place: str, db: Session):
""" Get the opening hours for the given place """
opening_hours = db.query(
models.OpeningHours.day,
models.OpeningHours.timeslot,
models.OpeningHours.open_time,
models.OpeningHours.close_time,
models.OpeningHours
).filter(
models.OpeningHours.place == place
).order_by(
@@ -259,7 +289,8 @@ def get_timeslot(place: str, day: int, timeslot: bool, db: Session):
return opening_hours
def create_opening_hours(new_opening_hours: schemas.OpeningHoursBase, db: Session):
def create_opening_hours(
new_opening_hours: schemas.OpeningHoursBase, db: Session):
""" Add opening hours to the database """
db_opening_hours = models.OpeningHours(**new_opening_hours.dict())
db.add(db_opening_hours)
@@ -273,7 +304,9 @@ def delete_opening_hours(id: int, db: Session):
if id == 0:
db.query(models.OpeningHours).delete()
else:
db.query(models.OpeningHours).filter(models.OpeningHours.id == id).delete()
db.query(
models.OpeningHours).filter(
models.OpeningHours.id == id).delete()
db.commit()
@@ -282,7 +315,9 @@ def delete_opening_hours(id: int, db: Session):
def get_restaurants(db: Session):
current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
weekday, current_time = current_date.weekday(), current_date.time()
restaurant_names = [r.place for r in db.query(models.OpeningHours.place).distinct()]
restaurant_names = [
r.place for r in db.query(
models.OpeningHours.place).distinct()]
restaurants = []
for name in restaurant_names:
@@ -311,18 +346,80 @@ def get_restaurants(db: Session):
restaurant["timeslot"] = "-"
if restaurant["status"]:
waiting_time = db.query(
models.Records.waiting_time
last_record = db.query(
models.Records
).filter(
models.Records.place == name
).order_by(
models.Records.date.desc()
).first()
waiting_time_minutes = round(waiting_time[0].total_seconds() / 60)
restaurant["waiting_time"] = waiting_time_minutes
if last_record:
waiting_time = last_record.waiting_time
restaurant["waiting_time"] = round(
waiting_time.total_seconds() / 60)
else:
restaurant["waiting_time"] = None
else:
restaurant["waiting_time"] = None
restaurants.append(restaurant)
return restaurants
# Define CRUD operation for the authentication
def init_user(db: Session):
""" Add a news to the database """
cookie = uuid4()
state = secrets.token_urlsafe(30)
expiration_date = datetime.now(tz=pytz.timezone("Europe/Paris")) + timedelta(minutes=10)
db_user = models.Users(state=state, cookie=cookie, expiration_date=expiration_date)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def get_user(cookie: str, db: Session):
""" Get user infos """
user = db.query(models.Users).filter(models.Users.cookie == cookie).one()
if pytz.timezone("Europe/Paris").localize(user.expiration_date) < datetime.now(tz=pytz.timezone("Europe/Paris")):
return
return user
def delete_state(user: schemas.User, db: Session):
""" Delete the state of a user """
user.state = None
db.add(user)
db.commit()
def update_user(user: schemas.User, user_info: dict, db: Session):
full_name = f"{user_info['firstName']} {user_info['lastName']}"
expiration_date = datetime.now(tz=pytz.timezone("Europe/Paris")) + timedelta(days=3)
existing_user = db.query(models.Users).filter(models.Users.username == full_name).first()
if existing_user:
existing_user.cookie = user.cookie
existing_user.expiration_date = expiration_date
db.delete(user)
db.add(existing_user)
db.commit()
db.refresh(existing_user)
return existing_user
else:
user.username = full_name
user.expiration_date = expiration_date
db.add(user)
db.commit()
db.refresh(user)
return user
def end_session(cookie: str, db: Session):
user = db.query(models.Users).filter(models.Users.cookie == cookie).one()
user.expiration_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
db.add(user)
db.commit()
return
Loading