Skip to content
Snippets Groups Projects
Select Git revision
  • 63fa4023d8f484bcc1763f817d93081b6e75e971
  • main default
2 results

index.js

Blame
  • crud.py 20.08 KiB
    """
    Module to interact with the database
    """
    from datetime import date, datetime, time, timedelta
    from fastapi import HTTPException
    from sqlalchemy.orm import Session
    from sqlalchemy.sql import func
    from sqlalchemy import Time, Date, cast
    from uuid import uuid4
    import secrets
    import pytz
    
    from db import models, schemas
    
    
    # Define CRUD operation to collect the statistics
    
    def get_waiting_time(place: str, db: Session):
        """ Get the last estimated waiting time for the given place """
        current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
        date, weekday, current_time = current_date.date(), current_date.weekday(), current_date.time()
        opening_hours = db.query(
            models.OpeningHours.open_time,
            models.OpeningHours.close_time).filter(
            models.OpeningHours.place == place,
            models.OpeningHours.day == weekday).order_by(
            models.OpeningHours.open_time).all()
        for time_slot in opening_hours:
            closure = db.query(
                models.Closure).filter(
                models.Closure.place == place,
                models.Closure.beginning_date <= datetime.combine(date, time_slot.open_time),
                models.Closure.end_date >= datetime.combine(date, time_slot.open_time)).order_by(
                models.Closure.beginning_date).first()
            if not closure:
                if current_time < time_slot.open_time:
                    return schemas.WaitingTime(next_timetable=time_slot.open_time.strftime('%Hh%M'))
                elif current_time <= time_slot.close_time:
                    limit = datetime.combine(date, time_slot.open_time)
                    last_record = db.query(
                        models.Records.waiting_time).filter(
                        models.Records.place == place).filter(
                        models.Records.date >= limit).order_by(
                        models.Records.date.desc()).first()
                    waiting_time = None
                    if last_record:
                        waiting_time = round(
                            last_record.waiting_time.total_seconds() / 60)
                    return schemas.WaitingTime(status=True, waiting_time=waiting_time)
        return schemas.WaitingTime()
    
    
    # Define some utils function
    def shift_time(t: time, delta: timedelta):
        return (datetime.combine(date(1, 1, 1), t) + delta).time()
    
    
    def add_slot(slots_list, start_time, end_time, function):
        waiting_time = function(start_time, end_time)
        if waiting_time:
            name = 60 * start_time.hour + start_time.minute
            slots_list.append(schemas.RecordRead(name=name, time=waiting_time))
    
    
    def get_avg_graph_points(place: str, weekday: int, min_time: time,
                             max_time: time, interval: timedelta, db: Session):
        """ Get the average waiting time for each interval between two time steps """
    
        def avg_time_query(start_time, end_time):
            records = db.query(
                func.round(
                    func.avg(
                        60 * func.extract('HOUR', models.Records.waiting_time) +
                        func.extract('MINUTE', models.Records.waiting_time))
                )
            ).filter(
                models.Records.place == place,
                func.weekday(models.Records.date) == weekday,
                cast(models.Records.date, Time) >= start_time,
                cast(models.Records.date, Time) <= end_time,
            ).first()
            if records[0] or records[0] == 0:
                return int(records[0])
            return None
    
        stats = []
        start_time, end_time = min_time, shift_time(min_time, interval)
        while start_time < max_time:
            add_slot(stats, start_time, end_time, avg_time_query)
            start_time, end_time = end_time, shift_time(end_time, interval)
    
        return stats
    
    
    def get_avg_graph(place: str, db: Session):
        """ Get the average waiting time for each interval between two time steps,
        for the current or next available timeslot"""
        current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
        weekday, current_time = current_date.weekday(), current_date.time()
        opening_hours = db.query(
            models.OpeningHours.open_time,
            models.OpeningHours.close_time).filter(
            models.OpeningHours.place == place,
            models.OpeningHours.day == weekday).order_by(
            models.OpeningHours.open_time).all()
        closure = db.query(
            models.Closure).filter(
            models.Closure.place == place,
            models.Closure.beginning_date <= current_date,
            models.Closure.end_date >= current_date).first()
        if not closure:
            for time_slot in opening_hours:
                if current_time <= time_slot.close_time:
                    return get_avg_graph_points(place, weekday, time_slot.open_time, time_slot.close_time, timedelta(minutes=5), db)
        return []
    
    
    def get_current_graph_points(place: str, current_date: date,
                                 min_time: time, max_time: time, interval: timedelta, db: Session):
        """ Get the waiting time for each interval between two time steps for the current timeslot """
    
        def current_time_query(start_time, end_time):
            records = db.query(
                func.round(
                    func.avg(
                        60 * func.extract('HOUR', models.Records.waiting_time) +
                        func.extract('MINUTE', models.Records.waiting_time))
                )
            ).filter(
                models.Records.place == place,
                cast(models.Records.date, Date) == current_date,
                cast(models.Records.date, Time) >= start_time,
                cast(models.Records.date, Time) <= end_time
            ).first()
            if records[0] or records[0] == 0:
                return int(records[0])
            return None
    
        stats = []
        start_time, end_time = min_time, shift_time(min_time, interval)
        while start_time < max_time:
            add_slot(stats, start_time, end_time, current_time_query)
            start_time, end_time = end_time, shift_time(end_time, interval)
    
        return stats
    
    
    def get_current_graph(place: str, db: Session):
        """ Get the waiting_time_graph for the current timeslot"""
        current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
        weekday, day, current_time = current_date.weekday(), current_date.date(), current_date.time()
        opening_hours = db.query(
            models.OpeningHours.open_time,
            models.OpeningHours.close_time).filter(
            models.OpeningHours.place == place,
            models.OpeningHours.day == weekday).all()
        closure = db.query(
            models.Closure).filter(
            models.Closure.place == place,
            models.Closure.beginning_date <= current_date,
            models.Closure.end_date >= current_date).first()
        if not closure:
            for time_slot in opening_hours:
                if time_slot.open_time <= current_time <= time_slot.close_time:
                    points = get_current_graph_points(
                        place, day, time_slot.open_time, current_time, timedelta(minutes=5), db)
                    start_time = 60 * time_slot.open_time.hour + time_slot.open_time.minute
                    end_time = 60 * time_slot.close_time.hour + time_slot.close_time.minute
                    return schemas.Graph(data=points, start=start_time, end=end_time)
        return schemas.Graph(data=[])
    
    
    # Define CRUD operation for the comments
    
    def get_comments(place: str, page: int, db: Session):
        """ Get the 20 last comments for the given place """
        if page == 0:
            comments = db.query(
                models.Comments).order_by(
                models.Comments.published_at.desc(),
                models.Comments.id.desc()).all()
        else:
            comments = db.query(
                models.Comments,
                models.Users.username).join(
                models.Users).filter(
                models.Comments.place == place).order_by(
                models.Comments.published_at.desc(),
                models.Comments.id.desc()).slice(
                (page - 1) * 20, page * 20).all()
        comments_list = [schemas.Comment(**comment.__dict__, username=username) for comment, username in comments]
        comments_list.reverse()
        return comments_list
    
    
    def create_comment(user: schemas.User, place: str, new_comments: schemas.CommentBase, db: Session):
        """ Add a new comment to the database """
        date = datetime.now(tz=pytz.timezone("Europe/Paris"))
        db_comment = models.Comments(**new_comments.dict(), published_at=date, place=place, user_id=user.id)
        db.add(db_comment)
        db.commit()
        db.refresh(db_comment)
        return schemas.Comment(**db_comment.__dict__, username=user.username)
    
    
    def delete_comment(id: int, db: Session):
        """ Delete the comment with the matching id """
        if id == 0:
            db.query(models.Comments).delete()
        else:
            db.query(models.Comments).filter(models.Comments.id == id).delete()
        db.commit()
    
    
    # Define CRUD operation for the news
    
    def get_news(place: str, admin: bool, db: Session):
        """ Get the news for the given place """
        current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
        news = db.query(
            models.News).filter(
            models.News.place == place,
            models.News.end_date >= current_date).order_by(
            models.News.published_at.desc()).all()
        if admin:
            return news
    
        opening_hours = db.query(
            models.OpeningHours.open_time,
            models.OpeningHours.close_time,
            models.OpeningHours.day).filter(
            models.OpeningHours.place == place,
            models.OpeningHours.day >= current_date.weekday()).order_by(
            models.OpeningHours.day,
            models.OpeningHours.open_time).all()
        if not opening_hours:
            opening_hours = db.query(
                models.OpeningHours.open_time,
                models.OpeningHours.close_time,
                models.OpeningHours.day).filter(
                models.OpeningHours.place == place).order_by(
                models.OpeningHours.day,
                models.OpeningHours.open_time).all()
        next_time_slot = None
        for open_time, close_time, day in opening_hours:
            next_date = current_date + timedelta(days=day - current_date.weekday())
            if day < current_date.weekday():
                next_date = next_date + timedelta(days=7)
                next_time_slot = datetime.combine(next_date.date(), open_time)
                break
            if current_date < pytz.timezone("Europe/Paris").localize(datetime.combine(next_date.date(), close_time)):
                next_time_slot = datetime.combine(next_date.date(), open_time)
                break
        if next_time_slot:
            closure = db.query(
                models.Closure).filter(
                models.Closure.place == place,
                models.Closure.beginning_date <= next_time_slot,
                models.Closure.end_date > next_time_slot).first()
            if closure:
                closure_news = schemas.News(
                    title="Fermeture exceptionnelle",
                    content=f"{place} est exceptionnellement hors service jusqu'au {closure.end_date.strftime('%d/%m/%y à %Hh%M')}",
                    end_date=closure.end_date,
                    place=place,
                    published_at=current_date)
                news.append(closure_news)
    
        return news
    
    
    def create_news(new_news: schemas.NewsBase, db: Session):
        """ Add a news to the database """
        date = datetime.now(tz=pytz.timezone("Europe/Paris"))
        db_news = models.News(**new_news.dict(), published_at=date)
        db.add(db_news)
        db.commit()
        db.refresh(db_news)
        return db_news
    
    
    def delete_news(id: int, db: Session):
        """ Delete the news with the matching id """
        if id == 0:
            db.query(models.News).delete()
        else:
            db.query(models.News).filter(models.News.id == id).delete()
        db.commit()
    
    
    # Define CRUD operation for the opening hours
    
    def get_opening_hours(place: str, db: Session):
        """ Get the opening hours for the given place """
        opening_hours = db.query(
            models.OpeningHours
        ).filter(
            models.OpeningHours.place == place
        ).order_by(
            models.OpeningHours.day, models.OpeningHours.open_time
        ).all()
        return opening_hours
    
    
    def create_opening_hours(
            new_opening_hours: schemas.OpeningHoursBase, db: Session):
        """ Add opening hours to the database """
        db_opening_hours = models.OpeningHours(**new_opening_hours.dict())
        db.add(db_opening_hours)
        db.commit()
        db.refresh(db_opening_hours)
        return db_opening_hours
    
    
    def delete_opening_hours(id: int, db: Session):
        """ Delete the opening hours with the matching id """
        if id == 0:
            db.query(models.OpeningHours).delete()
        else:
            db.query(
                models.OpeningHours).filter(
                models.OpeningHours.id == id).delete()
        db.commit()
    
    
    # Restaurants information
    
    def get_restaurants(db: Session):
        weekday = datetime.now(tz=pytz.timezone("Europe/Paris")).weekday()
        places = db.query(models.OpeningHours.place).distinct()
        restaurants = []
    
        for place in places:
            opening_hours = db.query(
                models.OpeningHours).filter(
                models.OpeningHours.place == place.place,
                models.OpeningHours.day == weekday).order_by(
                models.OpeningHours.open_time).all()
            opening_hours_formated = [
                f"{row.open_time.strftime('%Hh%M')}-{row.close_time.strftime('%Hh%M')}" for row in opening_hours]
            timetable = "/".join(opening_hours_formated)
            infos = get_waiting_time(place.place, db)
            restaurants.append(schemas.Restaurant(
                **infos.dict(), name=place.place, timetable=timetable))
    
        return restaurants
    
    
    # Define CRUD operation for the authentication
    
    def init_user(db: Session):
        """ Add a news to the database """
        cookie = uuid4()
        state = secrets.token_urlsafe(30)
        expiration_date = datetime.now(tz=pytz.timezone(
            "Europe/Paris")) + timedelta(minutes=10)
        db_user = models.Users(state=state, cookie=cookie,
                               expiration_date=expiration_date)
        db.add(db_user)
        db.commit()
        db.refresh(db_user)
        return db_user
    
    
    def get_user(cookie: str, db: Session):
        """ Get user infos """
        try:
            user = db.query(models.Users).filter(
                models.Users.cookie == cookie).one()
        except BaseException:
            raise HTTPException(status_code=401, detail="Invalid cookie")
    
        if pytz.timezone("Europe/Paris").localize(user.expiration_date) < datetime.now(tz=pytz.timezone("Europe/Paris")):
            user.cookie = None
            db.add(user)
            db.commit()
            raise HTTPException(status_code=401, detail="Expired cookie")
    
        return user
    
    
    def delete_state(user: schemas.User, db: Session):
        """ Delete the state of a user """
        user.state = None
        db.add(user)
        db.commit()
    
    
    def update_user(user: schemas.User, user_info: dict, db: Session):
        full_name = f"{user_info['firstName']} {user_info['lastName']}"
        expiration_date = datetime.now(
            tz=pytz.timezone("Europe/Paris")) + timedelta(days=3)
        existing_user = db.query(models.Users).filter(
            models.Users.username == full_name).first()
        if existing_user:
            existing_user.cookie = user.cookie
            existing_user.expiration_date = expiration_date
            existing_user.admin = "admin eatfast" in user_info["roles"]
            db.delete(user)
            db.add(existing_user)
            db.commit()
            db.refresh(existing_user)
            return existing_user
        else:
            user.username = full_name
            user.expiration_date = expiration_date
            user.admin = "admin eatfast" in user_info["roles"]
            db.add(user)
            db.commit()
            db.refresh(user)
            return user
    
    
    def end_session(cookie: str, db: Session):
        user = db.query(models.Users).filter(models.Users.cookie == cookie).one()
        user.expiration_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
        user.cookie = None
        db.add(user)
        db.commit()
        return
    
    
    def delete_user(cookie: str, db: Session):
        db.query(models.Users).filter(models.Users.cookie == cookie).delete()
        db.commit()
        return
    
    
    # Define CRUD operations for data collection
    
    def get_records(place: str, db: Session):
        records = db.query(models.Records).filter(
            models.Records.place == place).order_by(models.Records.date.desc()).all()
        return records
    
    
    def create_record(record: schemas.RecordBase, db: Session):
        db_record = models.Records(**record.dict())
        db.add(db_record)
        db.commit()
        db.refresh(db_record)
        return db_record
    
    
    def delete_record(id: int, db: Session):
        if id == 0:
            db.query(models.Records).delete()
        else:
            db.query(models.Records).filter(models.Records.id == id).delete()
        db.commit()
        return
    
    
    def get_collaborative_records(place: str, db: Session):
        records = db.query(models.CollaborativeRecords).filter(
            models.CollaborativeRecords.place == place).order_by(models.CollaborativeRecords.date.desc()).all()
        return [schemas.CollaborativeRecords(**record.__dict__) for record in records]
    
    
    def create_collaborative_record(user: schemas.User, place: str, db: Session):
        current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
        date, weekday, current_time = current_date.date(
        ), current_date.weekday(), current_date.time()
    
        try:
            time_slot = db.query(
                models.OpeningHours).filter(
                models.OpeningHours.place == place,
                models.OpeningHours.day == weekday,
                models.OpeningHours.open_time <= current_time,
                models.OpeningHours.close_time >= current_time).one()
        except BaseException:
            raise HTTPException(status_code=404, detail="No restaurant opened")
    
        last_record = db.query(models.CollaborativeRecords).filter(
            models.CollaborativeRecords.user_id == user.id).order_by(models.CollaborativeRecords.date.desc()).first()
        if not last_record or last_record.date <= datetime.combine(date, time_slot.open_time):
            db_record = models.CollaborativeRecords(
                user_id=user.id, place=place, date=current_date)
            db.add(db_record)
            db.commit()
            db.refresh(db_record)
            return db_record
    
        raise HTTPException(status_code=406, detail="Client already registered")
    
    
    def update_collaborative_record(user: schemas.User, db: Session):
        current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
        date, weekday, current_time = current_date.date(
        ), current_date.weekday(), current_date.time()
        last_record = db.query(models.CollaborativeRecords).filter(
            models.CollaborativeRecords.user_id == user.id).order_by(models.CollaborativeRecords.date.desc()).first()
    
        try:
            time_slot = db.query(
                models.OpeningHours).filter(
                models.OpeningHours.place == last_record.place,
                models.OpeningHours.day == weekday,
                models.OpeningHours.open_time <= current_time,
                models.OpeningHours.close_time >= current_time).one()
        except BaseException:
            raise HTTPException(status_code=404, detail="No restaurant opened")
    
        if last_record.date >= datetime.combine(date, time_slot.open_time) and not last_record.waiting_time:
            last_record.waiting_time = current_date - \
                pytz.timezone("Europe/Paris").localize(last_record.date)
            print(last_record.waiting_time)
            db.add(last_record)
            db.commit()
            db.refresh(last_record)
            return schemas.CollaborativeRecords(**last_record.__dict__)
    
        raise HTTPException(status_code=406, detail="Client already registered")
    
    
    def delete_collaborative_record(id: int, db: Session):
        if id == 0:
            db.query(models.CollaborativeRecords).delete()
        else:
            db.query(models.CollaborativeRecords).filter(
                models.CollaborativeRecords.id == id).delete()
        db.commit()
        return
    
    
    # Define CRUD operation for exceptional closure
    
    def get_closure(place: str, db: Session):
        current_date = datetime.now(tz=pytz.timezone("Europe/Paris"))
        closures = db.query(
            models.Closure).filter(
            models.Closure.place == place,
            models.Closure.end_date >= current_date).order_by(
            models.Closure.beginning_date).all()
        return [schemas.Closure(**closure.__dict__) for closure in closures]
    
    
    def create_closure(closure: schemas.Closure, db: Session):
        db_closure = models.Closure(**closure.dict())
        db.add(db_closure)
        db.commit()
        db.refresh(db_closure)
        return schemas.Closure(**db_closure.__dict__)
    
    
    def delete_closure(id: int, db: Session):
        if id == 0:
            db.query(models.Closure).delete()
        else:
            db.query(models.Closure).filter(models.Closure.id == id).delete()
        db.commit()
        return