Skip to content
Snippets Groups Projects

Time dependence

Merged Aymeric Chaumont requested to merge time-dependence into collab_front
Files
10
+ 277
130
"""
Module to interact with the database
"""
from datetime import date, datetime, time, timedelta
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
import pytz
from db import models, schemas
# Define CRUD operation to collect the statistics
def get_waiting_time(place: str, db: Session):
""" Get the last estimated waiting time for the given place """
db_record = db.query(models.Records).filter(models.Records.place == place).order_by(models.Records.date.desc()).first()
if db_record.waiting_time is not None:
return db_record.waiting_time
else:
raise Exception
def get_stats(place: str, weekday: int, min_time_hour: int, min_time_mn: int, max_time_hour: int, max_time_mn: int, interval: timedelta, db: Session):
""" Get the average waiting time for each interval between two time steps """
def shift_time(t: time, delta: timedelta):
return (datetime.combine(date(1, 1, 1), t) + delta).time()
def avg_time_query(start_time, end_time):
records = db.query(
(func.round(
func.avg(
3600 * func.extract('HOUR', models.Records.waiting_time) +
60 * func.extract('MINUTE', models.Records.waiting_time) +
func.extract('SECOND', models.Records.waiting_time))
)) / 60
).filter(
models.Records.place == place,
func.weekday(models.Records.date) == weekday,
(func.extract('HOUR', models.Records.date) > start_time.hour) |
((func.extract('HOUR', models.Records.date) == start_time.hour) &
(func.extract('MINUTE', models.Records.date) >= start_time.minute)),
(func.extract('HOUR', models.Records.date) < end_time.hour) |
((func.extract('HOUR', models.Records.date) == end_time.hour) &
(func.extract('MINUTE', models.Records.date) < end_time.minute)),
).one()
if records[0]:
return int(records[0])
return None
def add_slot(slots_list, start_time, end_time):
average_waiting_time = avg_time_query(start_time, end_time)
if average_waiting_time:
name = f'{start_time.hour:02}h{start_time.minute:02}'
slots_list.append({'name': name, 'time': average_waiting_time})
min_time, max_time = time(min_time_hour, min_time_mn), time(max_time_hour, max_time_mn)
stats = []
start_time, end_time = min_time, shift_time(min_time, interval)
while start_time < max_time:
add_slot(stats, start_time, end_time)
start_time, end_time = end_time, shift_time(end_time, interval)
return stats
# Define CRUD operation for the comments
def get_comments(place: str, page: int, db: Session):
""" Get the 10 last comments for the given place """
if page == 0:
comments = db.query(models.Comments).order_by(models.Comments.published_at.desc(), models.Comments.id.desc()).all()
else:
comments = db.query(
models.Comments).filter(
models.Comments.place == place).order_by(
models.Comments.published_at.desc(),
models.Comments.id.desc()).slice(
(page -
1) *
10,
page *
10).all()
return comments
def create_comment(place: str, new_comments: schemas.CommentBase, db: Session):
""" Add a new comment to the database """
date = datetime.now(tz=pytz.timezone("Europe/Paris"))
db_comment = models.Comments(**new_comments.dict(), published_at=date, place=place)
db.add(db_comment)
db.commit()
db.refresh(db_comment)
return db_comment
def delete_comment(id: int, db: Session):
""" Delete the comment with the matching id """
if id == 0:
db.query(models.Comments).delete()
else:
db.query(models.Comments).filter(models.Comments.id == id).delete()
db.commit()
# Define CRUD operation for the news
def get_news(place: str, db: Session):
""" Get the news for the given place """
news = db.query(models.News).filter(models.News.place == place).order_by(models.News.published_at.desc()).all()
return news
def create_news(new_news: schemas.NewsBase, db: Session):
""" Add a news to the database """
date = datetime.now(tz=pytz.timezone("Europe/Paris"))
db_news = models.News(**new_news.dict(), published_at=date)
db.add(db_news)
db.commit()
db.refresh(db_news)
return db_news
def delete_news(id: int, db: Session):
""" Delete the news with the matching id """
if id == 0:
db.query(models.News).delete()
else:
db.query(models.News).filter(models.News.id == id).delete()
db.commit()
"""
Module to interact with the database
"""
from datetime import date, datetime, time, timedelta
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
import pytz
from db import models, schemas
# Define CRUD operation to collect the statistics
def get_waiting_time(place: str, db: Session):
""" Get the last estimated waiting time for the given place """
current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
weekday, current_time = current_date.weekday(), current_date.time()
first_timeslot = get_timeslot(place, weekday, True, db)
if first_timeslot and current_time < first_timeslot[0]:
return first_timeslot[0].hour, first_timeslot[0].minute
elif first_timeslot and current_time <= first_timeslot[1]:
waiting_time = db.query(
models.Records.waiting_time
).filter(
models.Records.place == place
).order_by(
models.Records.date.desc()
).first()
waiting_time_minutes = round(waiting_time[0].total_seconds() / 60)
return waiting_time_minutes, None
second_timeslot = get_timeslot(place, weekday, False, db)
if second_timeslot and current_time < second_timeslot[0]:
return second_timeslot[0].hour, second_timeslot[0].minute
elif second_timeslot and current_time <= second_timeslot[1]:
waiting_time = db.query(
models.Records.waiting_time
).filter(
models.Records.place == place
).order_by(
models.Records.date.desc()
).first()
waiting_time_minutes = round(waiting_time[0].total_seconds() / 60)
return waiting_time_minutes, None
return None, None
def get_avg_graph_points(place: str, weekday: int, min_time: time, max_time: time, interval: timedelta, db: Session):
""" Get the average waiting time for each interval between two time steps """
def shift_time(t: time, delta: timedelta):
return (datetime.combine(date(1, 1, 1), t) + delta).time()
def avg_time_query(start_time, end_time):
records = db.query(
(func.round(
func.avg(
3600 * func.extract('HOUR', models.Records.waiting_time) +
60 * func.extract('MINUTE', models.Records.waiting_time) +
func.extract('SECOND', models.Records.waiting_time))
)) / 60
).filter(
models.Records.place == place,
func.weekday(models.Records.date) == weekday,
(func.extract('HOUR', models.Records.date) > start_time.hour) |
((func.extract('HOUR', models.Records.date) == start_time.hour) &
(func.extract('MINUTE', models.Records.date) >= start_time.minute)),
(func.extract('HOUR', models.Records.date) < end_time.hour) |
((func.extract('HOUR', models.Records.date) == end_time.hour) &
(func.extract('MINUTE', models.Records.date) < end_time.minute)),
).one()
if records[0]:
return int(records[0])
return None
def add_slot(slots_list, start_time, end_time):
average_waiting_time = avg_time_query(start_time, end_time)
if average_waiting_time:
name = f'{start_time.hour:02}h{start_time.minute:02}'
slots_list.append({'name': name, 'time': average_waiting_time})
stats = []
start_time, end_time = min_time, shift_time(min_time, interval)
while start_time < max_time:
add_slot(stats, start_time, end_time)
start_time, end_time = end_time, shift_time(end_time, interval)
return stats
def get_avg_graph(place: str, db: Session):
""" Get the average waiting time for each interval between two time steps,
for the current or next available timeslot"""
current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
weekday, current_time = current_date.weekday(), current_date.time()
first_timeslot = get_timeslot(place, weekday, True, db)
if first_timeslot and current_time <= first_timeslot[1]:
return get_avg_graph_points(place, weekday, first_timeslot[0], first_timeslot[1], timedelta(minutes=5), db)
second_timeslot = get_timeslot(place, weekday, False, db)
if second_timeslot and current_time <= second_timeslot[1]:
return get_avg_graph_points(place, weekday, second_timeslot[0], second_timeslot[1], timedelta(minutes=5), db)
return None
def get_current_graph_points(place: str, current_date: date, min_time: time, max_time: time, interval: timedelta, db: Session):
""" Get the waiting time for each interval between two time steps for the current timeslot """
def shift_time(t: time, delta: timedelta):
return (datetime.combine(date(1, 1, 1), t) + delta).time()
def avg_time_query(start_time, end_time):
records = db.query(
(func.round(
func.avg(
3600 * func.extract('HOUR', models.Records.waiting_time) +
60 * func.extract('MINUTE', models.Records.waiting_time) +
func.extract('SECOND', models.Records.waiting_time))
)) / 60
).filter(
models.Records.place == place,
func.extract('YEAR', models.Records.date) == current_date.year,
func.extract('MONTH', models.Records.date) == current_date.month,
func.extract('DAY', models.Records.date) == current_date.day,
(func.extract('HOUR', models.Records.date) > start_time.hour) |
((func.extract('HOUR', models.Records.date) == start_time.hour) &
(func.extract('MINUTE', models.Records.date) >= start_time.minute)),
(func.extract('HOUR', models.Records.date) < end_time.hour) |
((func.extract('HOUR', models.Records.date) == end_time.hour) &
(func.extract('MINUTE', models.Records.date) < end_time.minute)),
).one()
if records[0]:
return int(records[0])
return None
def add_slot(slots_list, start_time, end_time):
average_waiting_time = avg_time_query(start_time, end_time)
if average_waiting_time:
name = f'{start_time.hour:02}h{start_time.minute:02}'
slots_list.append({'name': name, 'time': average_waiting_time})
stats = []
start_time, end_time = min_time, shift_time(min_time, interval)
while start_time < max_time:
add_slot(stats, start_time, end_time)
start_time, end_time = end_time, shift_time(end_time, interval)
return stats
def get_current_graph(place: str, db: Session):
""" Get the waiting_time_graph for the current timeslot"""
current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
weekday, day, current_time = current_date.weekday(), current_date.date(), current_date.time()
first_timeslot = get_timeslot(place, weekday, True, db)
if first_timeslot and current_time <= first_timeslot[0]:
return None
elif first_timeslot and current_time <= first_timeslot[1]:
return get_current_graph_points(place, day, first_timeslot[0], current_time, timedelta(minutes=5), db)
second_timeslot = get_timeslot(place, weekday, False, db)
if second_timeslot and current_time <= second_timeslot[0]:
return None
elif second_timeslot and current_time <= second_timeslot[1]:
return get_current_graph_points(place, day, second_timeslot[0], current_time, timedelta(minutes=5), db)
return None
# Define CRUD operation for the comments
def get_comments(place: str, page: int, db: Session):
""" Get the 10 last comments for the given place """
if page == 0:
comments = db.query(models.Comments).order_by(models.Comments.published_at.desc(), models.Comments.id.desc()).all()
else:
comments = db.query(
models.Comments).filter(
models.Comments.place == place).order_by(
models.Comments.published_at.desc(),
models.Comments.id.desc()).slice(
(page -
1) *
10,
page *
10).all()
return comments
def create_comment(place: str, new_comments: schemas.CommentBase, db: Session):
""" Add a new comment to the database """
date = datetime.now(tz=pytz.timezone("Europe/Paris"))
db_comment = models.Comments(**new_comments.dict(), published_at=date, place=place)
db.add(db_comment)
db.commit()
db.refresh(db_comment)
return db_comment
def delete_comment(id: int, db: Session):
""" Delete the comment with the matching id """
if id == 0:
db.query(models.Comments).delete()
else:
db.query(models.Comments).filter(models.Comments.id == id).delete()
db.commit()
# Define CRUD operation for the news
def get_news(place: str, db: Session):
""" Get the news for the given place """
news = db.query(models.News).filter(models.News.place == place).order_by(models.News.published_at.desc()).all()
return news
def create_news(new_news: schemas.NewsBase, db: Session):
""" Add a news to the database """
date = datetime.now(tz=pytz.timezone("Europe/Paris"))
db_news = models.News(**new_news.dict(), published_at=date)
db.add(db_news)
db.commit()
db.refresh(db_news)
return db_news
def delete_news(id: int, db: Session):
""" Delete the news with the matching id """
if id == 0:
db.query(models.News).delete()
else:
db.query(models.News).filter(models.News.id == id).delete()
db.commit()
# Define CRUD operation for the opening hours
def get_opening_hours(place: str, db: Session):
""" Get the opening hours for the given place """
opening_hours = db.query(
models.OpeningHours.day,
models.OpeningHours.timeslot,
models.OpeningHours.open_time,
models.OpeningHours.close_time,
).filter(
models.OpeningHours.place == place
).order_by(
models.OpeningHours.day, models.OpeningHours.timeslot.desc()
).all()
return opening_hours
def get_timeslot(place: str, day: int, timeslot: bool, db: Session):
""" Get the opening hours for the given place and timeslot"""
opening_hours = db.query(
models.OpeningHours.open_time,
models.OpeningHours.close_time,
).filter(
models.OpeningHours.place == place,
models.OpeningHours.day == day,
models.OpeningHours.timeslot == timeslot
).first()
return opening_hours
def create_opening_hours(new_opening_hours: schemas.OpeningHoursBase, db: Session):
""" Add opening hours to the database """
db_opening_hours = models.OpeningHours(**new_opening_hours.dict())
db.add(db_opening_hours)
db.commit()
db.refresh(db_opening_hours)
return db_opening_hours
def delete_opening_hours(id: int, db: Session):
""" Delete the opening hours with the matching id """
if id == 0:
db.query(models.OpeningHours).delete()
else:
db.query(models.OpeningHours).filter(models.OpeningHours.id == id).delete()
db.commit()
Loading