Skip to content
Snippets Groups Projects

Time dependence

Merged Aymeric Chaumont requested to merge time-dependence into collab_front

Files

+ 277
130
@@ -13,14 +13,38 @@ from db import models, schemas
def get_waiting_time(place: str, db: Session):
""" Get the last estimated waiting time for the given place """
db_record = db.query(models.Records).filter(models.Records.place == place).order_by(models.Records.date.desc()).first()
if db_record.waiting_time is not None:
return db_record.waiting_time
else:
raise Exception
current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
weekday, current_time = current_date.weekday(), current_date.time()
first_timeslot = get_timeslot(place, weekday, True, db)
if first_timeslot and current_time < first_timeslot[0]:
return first_timeslot[0].hour, first_timeslot[0].minute
elif first_timeslot and current_time <= first_timeslot[1]:
waiting_time = db.query(
models.Records.waiting_time
).filter(
models.Records.place == place
).order_by(
models.Records.date.desc()
).first()
waiting_time_minutes = round(waiting_time[0].total_seconds() / 60)
return waiting_time_minutes, None
second_timeslot = get_timeslot(place, weekday, False, db)
if second_timeslot and current_time < second_timeslot[0]:
return second_timeslot[0].hour, second_timeslot[0].minute
elif second_timeslot and current_time <= second_timeslot[1]:
waiting_time = db.query(
models.Records.waiting_time
).filter(
models.Records.place == place
).order_by(
models.Records.date.desc()
).first()
waiting_time_minutes = round(waiting_time[0].total_seconds() / 60)
return waiting_time_minutes, None
return None, None
def get_stats(place: str, weekday: int, min_time_hour: int, min_time_mn: int, max_time_hour: int, max_time_mn: int, interval: timedelta, db: Session):
def get_avg_graph_points(place: str, weekday: int, min_time: time, max_time: time, interval: timedelta, db: Session):
""" Get the average waiting time for each interval between two time steps """
def shift_time(t: time, delta: timedelta):
@@ -54,7 +78,6 @@ def get_stats(place: str, weekday: int, min_time_hour: int, min_time_mn: int, ma
name = f'{start_time.hour:02}h{start_time.minute:02}'
slots_list.append({'name': name, 'time': average_waiting_time})
min_time, max_time = time(min_time_hour, min_time_mn), time(max_time_hour, max_time_mn)
stats = []
start_time, end_time = min_time, shift_time(min_time, interval)
while start_time < max_time:
@@ -64,6 +87,82 @@ def get_stats(place: str, weekday: int, min_time_hour: int, min_time_mn: int, ma
return stats
def get_avg_graph(place: str, db: Session):
""" Get the average waiting time for each interval between two time steps,
for the current or next available timeslot"""
current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
weekday, current_time = current_date.weekday(), current_date.time()
first_timeslot = get_timeslot(place, weekday, True, db)
if first_timeslot and current_time <= first_timeslot[1]:
return get_avg_graph_points(place, weekday, first_timeslot[0], first_timeslot[1], timedelta(minutes=5), db)
second_timeslot = get_timeslot(place, weekday, False, db)
if second_timeslot and current_time <= second_timeslot[1]:
return get_avg_graph_points(place, weekday, second_timeslot[0], second_timeslot[1], timedelta(minutes=5), db)
return None
def get_current_graph_points(place: str, current_date: date, min_time: time, max_time: time, interval: timedelta, db: Session):
""" Get the waiting time for each interval between two time steps for the current timeslot """
def shift_time(t: time, delta: timedelta):
return (datetime.combine(date(1, 1, 1), t) + delta).time()
def avg_time_query(start_time, end_time):
records = db.query(
(func.round(
func.avg(
3600 * func.extract('HOUR', models.Records.waiting_time) +
60 * func.extract('MINUTE', models.Records.waiting_time) +
func.extract('SECOND', models.Records.waiting_time))
)) / 60
).filter(
models.Records.place == place,
func.extract('YEAR', models.Records.date) == current_date.year,
func.extract('MONTH', models.Records.date) == current_date.month,
func.extract('DAY', models.Records.date) == current_date.day,
(func.extract('HOUR', models.Records.date) > start_time.hour) |
((func.extract('HOUR', models.Records.date) == start_time.hour) &
(func.extract('MINUTE', models.Records.date) >= start_time.minute)),
(func.extract('HOUR', models.Records.date) < end_time.hour) |
((func.extract('HOUR', models.Records.date) == end_time.hour) &
(func.extract('MINUTE', models.Records.date) < end_time.minute)),
).one()
if records[0]:
return int(records[0])
return None
def add_slot(slots_list, start_time, end_time):
average_waiting_time = avg_time_query(start_time, end_time)
if average_waiting_time:
name = f'{start_time.hour:02}h{start_time.minute:02}'
slots_list.append({'name': name, 'time': average_waiting_time})
stats = []
start_time, end_time = min_time, shift_time(min_time, interval)
while start_time < max_time:
add_slot(stats, start_time, end_time)
start_time, end_time = end_time, shift_time(end_time, interval)
return stats
def get_current_graph(place: str, db: Session):
""" Get the waiting_time_graph for the current timeslot"""
current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
weekday, day, current_time = current_date.weekday(), current_date.date(), current_date.time()
first_timeslot = get_timeslot(place, weekday, True, db)
if first_timeslot and current_time <= first_timeslot[0]:
return None
elif first_timeslot and current_time <= first_timeslot[1]:
return get_current_graph_points(place, day, first_timeslot[0], current_time, timedelta(minutes=5), db)
second_timeslot = get_timeslot(place, weekday, False, db)
if second_timeslot and current_time <= second_timeslot[0]:
return None
elif second_timeslot and current_time <= second_timeslot[1]:
return get_current_graph_points(place, day, second_timeslot[0], current_time, timedelta(minutes=5), db)
return None
# Define CRUD operation for the comments
def get_comments(place: str, page: int, db: Session):
@@ -128,3 +227,51 @@ def delete_news(id: int, db: Session):
else:
db.query(models.News).filter(models.News.id == id).delete()
db.commit()
# Define CRUD operation for the opening hours
def get_opening_hours(place: str, db: Session):
""" Get the opening hours for the given place """
opening_hours = db.query(
models.OpeningHours.day,
models.OpeningHours.timeslot,
models.OpeningHours.open_time,
models.OpeningHours.close_time,
).filter(
models.OpeningHours.place == place
).order_by(
models.OpeningHours.day, models.OpeningHours.timeslot.desc()
).all()
return opening_hours
def get_timeslot(place: str, day: int, timeslot: bool, db: Session):
""" Get the opening hours for the given place and timeslot"""
opening_hours = db.query(
models.OpeningHours.open_time,
models.OpeningHours.close_time,
).filter(
models.OpeningHours.place == place,
models.OpeningHours.day == day,
models.OpeningHours.timeslot == timeslot
).first()
return opening_hours
def create_opening_hours(new_opening_hours: schemas.OpeningHoursBase, db: Session):
""" Add opening hours to the database """
db_opening_hours = models.OpeningHours(**new_opening_hours.dict())
db.add(db_opening_hours)
db.commit()
db.refresh(db_opening_hours)
return db_opening_hours
def delete_opening_hours(id: int, db: Session):
""" Delete the opening hours with the matching id """
if id == 0:
db.query(models.OpeningHours).delete()
else:
db.query(models.OpeningHours).filter(models.OpeningHours.id == id).delete()
db.commit()
Loading