Skip to content
Snippets Groups Projects
Select Git revision
  • 0aaec851d6a44a2c772ad802d1806996b6930f35
  • master default
2 results

Preprocessing_1$Map.class

Blame
  • crud.py 2.96 KiB
    """
    Module to interact with the database
    """
    from datetime import date, datetime, time, timedelta
    from numpy import average
    from sqlalchemy.orm import Session
    from sqlalchemy.sql import func
    
    from db import models, schemas
    
    
    def get_records(place: str, db: Session):
        """ Get all the records for the given place """
        records = db.query(models.Records).filter(models.Records.place == place).order_by(models.Records.date.desc()).all()
        return records
    
    
    def create_record(new_record: schemas.RecordBase, db: Session):
        """ Add a new record to the database """
        db_record = models.Records(**new_record.dict())
        db.add(db_record)
        db.commit()
        db.refresh(db_record)
        return db_record
    
    
    def get_waiting_time(place: str, db: Session):
        """ Get the last estimated waiting time for the given place """
        db_record = db.query(models.Records).filter(models.Records.place == place).order_by(models.Records.date.desc()).first()
        return db_record.waiting_time
    
    
    def get_stats(place: str, weekday: int, min_time_hour: int, min_time_mn: int, max_time_hour: int, max_time_mn: int, interval: timedelta, db: Session):
        """ Get the average waiting time for each interval between two time steps """
    
        def shift_time(t: time, delta: timedelta):
            return (datetime.combine(date(1, 1, 1), t) + delta).time()
    
        def avg_time_query(start_time, end_time):
            records = db.query(
                (func.round(
                    func.avg(
                        3600 * func.extract('HOUR', models.Records.waiting_time) +
                        60 * func.extract('MINUTE', models.Records.waiting_time) +
                        func.extract('SECOND', models.Records.waiting_time))
                )) / 60
            ).filter(
                models.Records.place == place,
                func.weekday(models.Records.date) == weekday,
                    (func.extract('HOUR', models.Records.date) > start_time.hour) |
                ((func.extract('HOUR', models.Records.date) == start_time.hour) &
                 (func.extract('MINUTE', models.Records.date) >= start_time.minute)),
                (func.extract('HOUR', models.Records.date) < end_time.hour) |
                ((func.extract('HOUR', models.Records.date) == end_time.hour) &
                 (func.extract('MINUTE', models.Records.date) < end_time.minute)),
            ).one()
            if records[0]:
                return int(records[0])
            return None
    
        def add_slot(slots_list, start_time, end_time):
            average_waiting_time = avg_time_query(start_time, end_time)
            if average_waiting_time:
                name = f'{start_time.hour:02}h{start_time.minute:02}'
                slots_list.append({'name': name, 'time': average_waiting_time})
    
        min_time, max_time = time(min_time_hour, min_time_mn), time(max_time_hour, max_time_mn)
        stats = []
        start_time, end_time = min_time, shift_time(min_time, interval)
        while start_time < max_time:
            add_slot(stats, start_time, end_time)
            start_time, end_time = end_time, shift_time(end_time, interval)
    
        return stats