Skip to content
Snippets Groups Projects
Select Git revision
  • d20e9c482b25269015e023cf946bd893834aeb10
  • master default
  • solution
3 results

README.md

Blame
  • crud.py 13.79 KiB
    """
    Module to interact with the database
    """
    from datetime import date, datetime, time, timedelta
    from sqlalchemy.orm import Session
    from sqlalchemy.sql import func
    import pytz
    
    from db import models, schemas
    
    
    # CRUD operations used to collect the statistics
    
    def get_waiting_time(place: str, db: Session):
        """ Get the last estimated waiting time for the given place.

        The two timeslots of the current weekday (Europe/Paris time) are
        examined in order, first then second:

        * before a timeslot opens, ``next_timetable`` is set to its opening
          time formatted as ``"<hour>h<minute>"`` and ``status`` stays ``False``;
        * while a timeslot is running (closing time included), ``status`` is
          ``True`` and ``waiting_time`` is the most recent record of the place
          rounded to minutes, or ``None`` when the place has no record;
        * otherwise every field keeps its default value.

        :param place: name of the place to look up
        :param db: database session used for the queries
        :return: dict with the keys ``status``, ``waiting_time`` and ``next_timetable``
        """
        current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
        weekday, current_time = current_date.weekday(), current_date.time()
        data = {"status": False, "waiting_time": None, "next_timetable": None}

        # The second timeslot is only fetched when the first one is missing or already over.
        for is_first_slot in (True, False):
            timeslot = get_timeslot(place, weekday, is_first_slot, db)
            if not timeslot:
                continue
            open_time, close_time = timeslot[0], timeslot[1]

            if current_time < open_time:
                data["next_timetable"] = "{:d}h{:02d}".format(open_time.hour, open_time.minute)
                return data

            if current_time <= close_time:
                last_record = db.query(
                    models.Records.waiting_time
                ).filter(
                    models.Records.place == place
                ).order_by(
                    models.Records.date.desc()
                ).first()
                data["status"] = True
                # A place without any record keeps waiting_time at None
                # (previously this path read an unassigned local and crashed).
                if last_record:
                    data["waiting_time"] = round(last_record.waiting_time.total_seconds() / 60)
                return data

        return data
    
    
    def get_avg_graph_points(place: str, weekday: int, min_time: time, max_time: time, interval: timedelta, db: Session):
        """ Get the average waiting time for each interval between two time steps.

        The range starting at ``min_time`` is cut into consecutive slots of
        length ``interval``; a slot is produced as long as its start is strictly
        before ``max_time``. For each slot, the records of ``place`` whose date
        matches ``weekday`` (through the SQL ``weekday`` function) and whose
        hour/minute fall in ``[slot start, slot end)`` are averaged.

        :param place: name of the place
        :param weekday: weekday number compared with ``weekday(Records.date)``
        :param min_time: start of the first slot
        :param max_time: no slot starts at or after this time
        :param interval: length of a slot, must be strictly positive
        :param db: database session used for the queries
        :return: list of ``{'name': <slot start in minutes since midnight>,
                 'time': <average waiting time in whole minutes>}``; slots with
                 no data or an average truncated to 0 are left out
        :raises ValueError: if ``interval`` is not strictly positive
        """
        if interval <= timedelta(0):
            # A zero or negative step would never reach max_time.
            raise ValueError("interval must be a strictly positive timedelta")

        def avg_time_query(start_time, end_time):
            # Average of the waiting times expressed in seconds, rounded, then converted to minutes.
            records = db.query(
                (func.round(
                    func.avg(
                        3600 * func.extract('HOUR', models.Records.waiting_time) +
                        60 * func.extract('MINUTE', models.Records.waiting_time) +
                        func.extract('SECOND', models.Records.waiting_time))
                )) / 60
            ).filter(
                models.Records.place == place,
                func.weekday(models.Records.date) == weekday,
                # record time >= start_time (minute precision)
                (func.extract('HOUR', models.Records.date) > start_time.hour) |
                ((func.extract('HOUR', models.Records.date) == start_time.hour) &
                 (func.extract('MINUTE', models.Records.date) >= start_time.minute)),
                # record time < end_time (minute precision)
                (func.extract('HOUR', models.Records.date) < end_time.hour) |
                ((func.extract('HOUR', models.Records.date) == end_time.hour) &
                 (func.extract('MINUTE', models.Records.date) < end_time.minute)),
            ).one()
            if records[0]:
                return int(records[0])
            return None

        def add_slot(slots_list, start_time, end_time):
            average_waiting_time = avg_time_query(start_time, end_time)
            if average_waiting_time:
                name = 60 * start_time.hour + start_time.minute
                slots_list.append({'name': name, 'time': average_waiting_time})

        stats = []
        # Iterate on datetimes rather than times: a plain time wraps around at
        # midnight, which made the loop endless when max_time was close to 24:00.
        anchor_day = date(1, 1, 1)
        slot_start = datetime.combine(anchor_day, min_time)
        limit = datetime.combine(anchor_day, max_time)
        while slot_start < limit:
            slot_end = slot_start + interval
            add_slot(stats, slot_start.time(), slot_end.time())
            slot_start = slot_end

        return stats
    
    
    def get_avg_graph(place: str, db: Session):
        """ Build the average waiting time graph of the current or next available timeslot.

        The first timeslot of today (Europe/Paris time) that is not over yet is
        selected, the second one being looked up only when the first is missing
        or finished. Returns ``(points, open_time, close_time)`` for that
        timeslot, computed with 5-minute steps, or ``([], None, None)`` when no
        timeslot is left today.
        """
        now = datetime.now(tz=pytz.timezone("Europe/Paris"))
        today, clock = now.weekday(), now.time()

        for is_first in (True, False):
            slot = get_timeslot(place, today, is_first, db)
            if not slot or clock > slot[1]:
                # No such timeslot, or it is already closed: try the next one.
                continue
            points = get_avg_graph_points(place, today, slot[0], slot[1], timedelta(minutes=5), db)
            return points, slot[0], slot[1]

        return [], None, None
    
    
    def get_current_graph_points(place: str, current_date: date, min_time: time, max_time: time, interval: timedelta, db: Session):
        """ Get the waiting time for each interval between two time steps for the current timeslot.

        The range starting at ``min_time`` is cut into consecutive slots of
        length ``interval``; a slot is produced as long as its start is strictly
        before ``max_time``. For each slot, the records of ``place`` dated on
        ``current_date`` (same year, month and day) whose hour/minute fall in
        ``[slot start, slot end)`` are averaged.

        :param place: name of the place
        :param current_date: day whose records are taken into account
        :param min_time: start of the first slot
        :param max_time: no slot starts at or after this time
        :param interval: length of a slot, must be strictly positive
        :param db: database session used for the queries
        :return: list of ``{'name': <slot start in minutes since midnight>,
                 'time': <average waiting time in whole minutes>}``; slots with
                 no data or an average truncated to 0 are left out
        :raises ValueError: if ``interval`` is not strictly positive
        """
        if interval <= timedelta(0):
            # A zero or negative step would never reach max_time.
            raise ValueError("interval must be a strictly positive timedelta")

        def avg_time_query(start_time, end_time):
            # Average of the waiting times expressed in seconds, rounded, then converted to minutes.
            records = db.query(
                (func.round(
                    func.avg(
                        3600 * func.extract('HOUR', models.Records.waiting_time) +
                        60 * func.extract('MINUTE', models.Records.waiting_time) +
                        func.extract('SECOND', models.Records.waiting_time))
                )) / 60
            ).filter(
                models.Records.place == place,
                func.extract('YEAR', models.Records.date) == current_date.year,
                func.extract('MONTH', models.Records.date) == current_date.month,
                func.extract('DAY', models.Records.date) == current_date.day,
                # record time >= start_time (minute precision)
                (func.extract('HOUR', models.Records.date) > start_time.hour) |
                ((func.extract('HOUR', models.Records.date) == start_time.hour) &
                 (func.extract('MINUTE', models.Records.date) >= start_time.minute)),
                # record time < end_time (minute precision)
                (func.extract('HOUR', models.Records.date) < end_time.hour) |
                ((func.extract('HOUR', models.Records.date) == end_time.hour) &
                 (func.extract('MINUTE', models.Records.date) < end_time.minute)),
            ).one()
            if records[0]:
                return int(records[0])
            return None

        def add_slot(slots_list, start_time, end_time):
            average_waiting_time = avg_time_query(start_time, end_time)
            if average_waiting_time:
                name = 60 * start_time.hour + start_time.minute
                slots_list.append({'name': name, 'time': average_waiting_time})

        stats = []
        # Iterate on datetimes rather than times: a plain time wraps around at
        # midnight, which made the loop endless when max_time was close to 24:00.
        anchor_day = date(1, 1, 1)
        slot_start = datetime.combine(anchor_day, min_time)
        limit = datetime.combine(anchor_day, max_time)
        while slot_start < limit:
            slot_end = slot_start + interval
            add_slot(stats, slot_start.time(), slot_end.time())
            slot_start = slot_end

        return stats
    
    
    def get_current_graph(place: str, db: Session):
        """ Get the waiting time graph for the current timeslot.

        The two timeslots of today (Europe/Paris time) are examined in order,
        the second one being fetched only when the first is missing or over:

        * at or before the opening time of a timeslot, nothing is returned yet;
        * while a timeslot is running (closing time included), the points from
          its opening time up to now are computed with 5-minute steps.

        :param place: name of the place
        :param db: database session used for the queries
        :return: ``(points, start, end)`` where ``start`` and ``end`` are the
                 opening and closing times in minutes since midnight, or
                 ``([], None, None)`` when no timeslot is currently running
        """
        current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
        weekday, day, current_time = current_date.weekday(), current_date.date(), current_date.time()

        for is_first_slot in (True, False):
            timeslot = get_timeslot(place, weekday, is_first_slot, db)
            if not timeslot:
                continue
            open_time, close_time = timeslot[0], timeslot[1]

            if current_time <= open_time:
                return [], None, None

            if current_time <= close_time:
                # Both timeslots return the bare list of points; the second one
                # used to wrap it in a tuple together with the raw open/close times.
                points = get_current_graph_points(place, day, open_time, current_time, timedelta(minutes=5), db)
                start_time = 60 * open_time.hour + open_time.minute
                end_time = 60 * close_time.hour + close_time.minute
                return points, start_time, end_time

        return [], None, None
    
    
    # CRUD operations for the comments
    
    def get_comments(place: str, page: int, db: Session):
        """ Get one page of 10 comments for the given place, most recent first.

        Comments are sorted by publication date, then id, both descending.
        ``page`` is 1-based; page ``0`` returns every comment in one list.
        """
        newest_first = (models.Comments.published_at.desc(), models.Comments.id.desc())

        if page == 0:
            # NOTE(review): this branch does not filter on ``place`` - confirm it is intended.
            return db.query(models.Comments).order_by(*newest_first).all()

        page_size = 10
        first_row = (page - 1) * page_size
        place_comments = db.query(models.Comments).filter(models.Comments.place == place)
        return place_comments.order_by(*newest_first).slice(first_row, first_row + page_size).all()
    
    
    def create_comment(place: str, new_comments: schemas.CommentBase, db: Session):
        """ Store a new comment for the given place and return the saved row.

        The publication date is set to the current Europe/Paris time.
        """
        published_at = datetime.now(tz=pytz.timezone("Europe/Paris"))
        fields = new_comments.dict()
        stored_comment = models.Comments(**fields, published_at=published_at, place=place)
        db.add(stored_comment)
        db.commit()
        # Reload the row so that database-generated values are available to the caller.
        db.refresh(stored_comment)
        return stored_comment
    
    
    def delete_comment(id: int, db: Session):
        """ Delete the comment with the matching id, or every comment when id is 0 """
        targets = db.query(models.Comments)
        if id != 0:
            targets = targets.filter(models.Comments.id == id)
        targets.delete()
        db.commit()
    
    
    # CRUD operations for the news
    
    def get_news(place: str, db: Session):
        """ Get every news of the given place, most recently published first """
        place_news = db.query(models.News).filter(models.News.place == place)
        return place_news.order_by(models.News.published_at.desc()).all()
    
    
    def create_news(new_news: schemas.NewsBase, db: Session):
        """ Store a news and return the saved row.

        The publication date is set to the current Europe/Paris time.
        """
        published_at = datetime.now(tz=pytz.timezone("Europe/Paris"))
        fields = new_news.dict()
        stored_news = models.News(**fields, published_at=published_at)
        db.add(stored_news)
        db.commit()
        # Reload the row so that database-generated values are available to the caller.
        db.refresh(stored_news)
        return stored_news
    
    
    def delete_news(id: int, db: Session):
        """ Delete the news with the matching id, or every news when id is 0 """
        targets = db.query(models.News)
        if id != 0:
            targets = targets.filter(models.News.id == id)
        targets.delete()
        db.commit()
    
    
    # CRUD operations for the opening hours
    
    def get_opening_hours(place: str, db: Session):
        """ Get all the opening hours of the given place.

        Rows are sorted by day, then by timeslot in descending order.
        """
        place_hours = db.query(models.OpeningHours).filter(models.OpeningHours.place == place)
        ordered_hours = place_hours.order_by(models.OpeningHours.day, models.OpeningHours.timeslot.desc())
        return ordered_hours.all()
    
    
    def get_timeslot(place: str, day: int, timeslot: bool, db: Session):
        """ Get the (open_time, close_time) row of the given place, day and timeslot.

        Returns the first matching row, or None when there is no match.
        """
        hours = db.query(models.OpeningHours.open_time, models.OpeningHours.close_time)
        matching = hours.filter(
            models.OpeningHours.place == place,
            models.OpeningHours.day == day,
            models.OpeningHours.timeslot == timeslot,
        )
        return matching.first()
    
    
    def create_opening_hours(new_opening_hours: schemas.OpeningHoursBase, db: Session):
        """ Store new opening hours and return the saved row """
        fields = new_opening_hours.dict()
        stored_hours = models.OpeningHours(**fields)
        db.add(stored_hours)
        db.commit()
        # Reload the row so that database-generated values are available to the caller.
        db.refresh(stored_hours)
        return stored_hours
    
    
    def delete_opening_hours(id: int, db: Session):
        """ Delete the opening hours with the matching id, or all of them when id is 0 """
        targets = db.query(models.OpeningHours)
        if id != 0:
            targets = targets.filter(models.OpeningHours.id == id)
        targets.delete()
        db.commit()
    
    
    # Restaurants information
    
    def get_restaurants(db: Session):
        """ Get the status, timetable and waiting time of every restaurant.

        Restaurants are the distinct places found in the opening hours table.
        For each of them, a dict is built with:

        * ``name``: the place name;
        * ``status``: True when the current Europe/Paris time is inside one of
          today's timeslots (opening time included, closing time excluded);
        * ``timetable``: today's timeslots formatted as ``"HHhMM-HHhMM"`` and
          joined with ``" / "``, or ``"-"`` when there is none;
        * ``waiting_time``: the most recent record of the place rounded to
          minutes when the restaurant is open and has a record, else None.

        :param db: database session used for the queries
        :return: list of the dicts described above
        """

        def format_slot(slot):
            # slot is an (open_time, close_time) row
            return (f"{slot[0].hour:02}h{slot[0].minute:02}-"
                    f"{slot[1].hour:02}h{slot[1].minute:02}")

        current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
        weekday, current_time = current_date.weekday(), current_date.time()
        restaurant_names = [r.place for r in db.query(models.OpeningHours.place).distinct()]
        restaurants = []

        for name in restaurant_names:
            first_timeslot = get_timeslot(name, weekday, True, db)
            second_timeslot = get_timeslot(name, weekday, False, db)
            todays_slots = [slot for slot in (first_timeslot, second_timeslot) if slot]

            is_open = any(slot[0] <= current_time < slot[1] for slot in todays_slots)

            # Always filled under the "timetable" key: the placeholder used to be
            # stored under "timeslot", and a lone second timeslot was not displayed.
            timetable = " / ".join(format_slot(slot) for slot in todays_slots) or "-"

            waiting_time = None
            if is_open:
                last_record = db.query(
                    models.Records
                ).filter(
                    models.Records.place == name
                ).order_by(
                    models.Records.date.desc()
                ).first()
                if last_record:
                    waiting_time = round(last_record.waiting_time.total_seconds() / 60)

            restaurants.append({
                "name": name,
                "status": is_open,
                "timetable": timetable,
                "waiting_time": waiting_time,
            })

        return restaurants