Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
V
ViaResto-website
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Aymeric Chaumont
ViaResto-website
Merge requests
!29
Time dependence
Code
Review changes
Check out branch
Download
Patches
Plain diff
Merged
Time dependence
time-dependence
into
collab_front
Overview
0
Commits
17
Pipelines
6
Changes
10
Merged
Aymeric Chaumont
requested to merge
time-dependence
into
collab_front
2 years ago
Overview
0
Commits
17
Pipelines
6
Changes
10
Expand
0
0
Merge request reports
Compare
collab_front
version 3
6dc5dcc2
2 years ago
version 2
ab5dd302
2 years ago
version 1
ab5dd302
2 years ago
collab_front (base)
and
latest version
latest version
620c514f
17 commits,
2 years ago
version 3
6dc5dcc2
16 commits,
2 years ago
version 2
ab5dd302
15 commits,
2 years ago
version 1
ab5dd302
15 commits,
2 years ago
10 files
+
474
−
282
Inline
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
Files
10
Search (e.g. *.vue) (Ctrl+P)
backend/db/crud.py
+
277
−
130
Options
"""
Module to interact with the database
"""
from
datetime
import
date
,
datetime
,
time
,
timedelta
from
sqlalchemy.orm
import
Session
from
sqlalchemy.sql
import
func
import
pytz
from
db
import
models
,
schemas
# Define CRUD operation to collect the statistics
def
get_waiting_time
(
place
:
str
,
db
:
Session
):
"""
Get the last estimated waiting time for the given place
"""
db_record
=
db
.
query
(
models
.
Records
).
filter
(
models
.
Records
.
place
==
place
).
order_by
(
models
.
Records
.
date
.
desc
()).
first
()
if
db_record
.
waiting_time
is
not
None
:
return
db_record
.
waiting_time
else
:
raise
Exception
def
get_stats
(
place
:
str
,
weekday
:
int
,
min_time_hour
:
int
,
min_time_mn
:
int
,
max_time_hour
:
int
,
max_time_mn
:
int
,
interval
:
timedelta
,
db
:
Session
):
"""
Get the average waiting time for each interval between two time steps
"""
def
shift_time
(
t
:
time
,
delta
:
timedelta
):
return
(
datetime
.
combine
(
date
(
1
,
1
,
1
),
t
)
+
delta
).
time
()
def
avg_time_query
(
start_time
,
end_time
):
records
=
db
.
query
(
(
func
.
round
(
func
.
avg
(
3600
*
func
.
extract
(
'
HOUR
'
,
models
.
Records
.
waiting_time
)
+
60
*
func
.
extract
(
'
MINUTE
'
,
models
.
Records
.
waiting_time
)
+
func
.
extract
(
'
SECOND
'
,
models
.
Records
.
waiting_time
))
))
/
60
).
filter
(
models
.
Records
.
place
==
place
,
func
.
weekday
(
models
.
Records
.
date
)
==
weekday
,
(
func
.
extract
(
'
HOUR
'
,
models
.
Records
.
date
)
>
start_time
.
hour
)
|
((
func
.
extract
(
'
HOUR
'
,
models
.
Records
.
date
)
==
start_time
.
hour
)
&
(
func
.
extract
(
'
MINUTE
'
,
models
.
Records
.
date
)
>=
start_time
.
minute
)),
(
func
.
extract
(
'
HOUR
'
,
models
.
Records
.
date
)
<
end_time
.
hour
)
|
((
func
.
extract
(
'
HOUR
'
,
models
.
Records
.
date
)
==
end_time
.
hour
)
&
(
func
.
extract
(
'
MINUTE
'
,
models
.
Records
.
date
)
<
end_time
.
minute
)),
).
one
()
if
records
[
0
]:
return
int
(
records
[
0
])
return
None
def
add_slot
(
slots_list
,
start_time
,
end_time
):
average_waiting_time
=
avg_time_query
(
start_time
,
end_time
)
if
average_waiting_time
:
name
=
f
'
{
start_time
.
hour
:
02
}
h
{
start_time
.
minute
:
02
}
'
slots_list
.
append
({
'
name
'
:
name
,
'
time
'
:
average_waiting_time
})
min_time
,
max_time
=
time
(
min_time_hour
,
min_time_mn
),
time
(
max_time_hour
,
max_time_mn
)
stats
=
[]
start_time
,
end_time
=
min_time
,
shift_time
(
min_time
,
interval
)
while
start_time
<
max_time
:
add_slot
(
stats
,
start_time
,
end_time
)
start_time
,
end_time
=
end_time
,
shift_time
(
end_time
,
interval
)
return
stats
# Define CRUD operation for the comments
def
get_comments
(
place
:
str
,
page
:
int
,
db
:
Session
):
"""
Get the 10 last comments for the given place
"""
if
page
==
0
:
comments
=
db
.
query
(
models
.
Comments
).
order_by
(
models
.
Comments
.
published_at
.
desc
(),
models
.
Comments
.
id
.
desc
()).
all
()
else
:
comments
=
db
.
query
(
models
.
Comments
).
filter
(
models
.
Comments
.
place
==
place
).
order_by
(
models
.
Comments
.
published_at
.
desc
(),
models
.
Comments
.
id
.
desc
()).
slice
(
(
page
-
1
)
*
10
,
page
*
10
).
all
()
return
comments
def
create_comment
(
place
:
str
,
new_comments
:
schemas
.
CommentBase
,
db
:
Session
):
"""
Add a new comment to the database
"""
date
=
datetime
.
now
(
tz
=
pytz
.
timezone
(
"
Europe/Paris
"
))
db_comment
=
models
.
Comments
(
**
new_comments
.
dict
(),
published_at
=
date
,
place
=
place
)
db
.
add
(
db_comment
)
db
.
commit
()
db
.
refresh
(
db_comment
)
return
db_comment
def
delete_comment
(
id
:
int
,
db
:
Session
):
"""
Delete the comment with the matching id
"""
if
id
==
0
:
db
.
query
(
models
.
Comments
).
delete
()
else
:
db
.
query
(
models
.
Comments
).
filter
(
models
.
Comments
.
id
==
id
).
delete
()
db
.
commit
()
# Define CRUD operation for the news
def
get_news
(
place
:
str
,
db
:
Session
):
"""
Get the news for the given place
"""
news
=
db
.
query
(
models
.
News
).
filter
(
models
.
News
.
place
==
place
).
order_by
(
models
.
News
.
published_at
.
desc
()).
all
()
return
news
def
create_news
(
new_news
:
schemas
.
NewsBase
,
db
:
Session
):
"""
Add a news to the database
"""
date
=
datetime
.
now
(
tz
=
pytz
.
timezone
(
"
Europe/Paris
"
))
db_news
=
models
.
News
(
**
new_news
.
dict
(),
published_at
=
date
)
db
.
add
(
db_news
)
db
.
commit
()
db
.
refresh
(
db_news
)
return
db_news
def
delete_news
(
id
:
int
,
db
:
Session
):
"""
Delete the news with the matching id
"""
if
id
==
0
:
db
.
query
(
models
.
News
).
delete
()
else
:
db
.
query
(
models
.
News
).
filter
(
models
.
News
.
id
==
id
).
delete
()
db
.
commit
()
"""
Module to interact with the database
"""
from
datetime
import
date
,
datetime
,
time
,
timedelta
from
sqlalchemy.orm
import
Session
from
sqlalchemy.sql
import
func
import
pytz
from
db
import
models
,
schemas
# Define CRUD operation to collect the statistics
def
get_waiting_time
(
place
:
str
,
db
:
Session
):
"""
Get the last estimated waiting time for the given place
"""
current_date
=
datetime
.
now
(
tz
=
pytz
.
timezone
(
"
Europe/Paris
"
))
weekday
,
current_time
=
current_date
.
weekday
(),
current_date
.
time
()
first_timeslot
=
get_timeslot
(
place
,
weekday
,
True
,
db
)
if
first_timeslot
and
current_time
<
first_timeslot
[
0
]:
return
first_timeslot
[
0
].
hour
,
first_timeslot
[
0
].
minute
elif
first_timeslot
and
current_time
<=
first_timeslot
[
1
]:
waiting_time
=
db
.
query
(
models
.
Records
.
waiting_time
).
filter
(
models
.
Records
.
place
==
place
).
order_by
(
models
.
Records
.
date
.
desc
()
).
first
()
waiting_time_minutes
=
round
(
waiting_time
[
0
].
total_seconds
()
/
60
)
return
waiting_time_minutes
,
None
second_timeslot
=
get_timeslot
(
place
,
weekday
,
False
,
db
)
if
second_timeslot
and
current_time
<
second_timeslot
[
0
]:
return
second_timeslot
[
0
].
hour
,
second_timeslot
[
0
].
minute
elif
second_timeslot
and
current_time
<=
second_timeslot
[
1
]:
waiting_time
=
db
.
query
(
models
.
Records
.
waiting_time
).
filter
(
models
.
Records
.
place
==
place
).
order_by
(
models
.
Records
.
date
.
desc
()
).
first
()
waiting_time_minutes
=
round
(
waiting_time
[
0
].
total_seconds
()
/
60
)
return
waiting_time_minutes
,
None
return
None
,
None
def
get_avg_graph_points
(
place
:
str
,
weekday
:
int
,
min_time
:
time
,
max_time
:
time
,
interval
:
timedelta
,
db
:
Session
):
"""
Get the average waiting time for each interval between two time steps
"""
def
shift_time
(
t
:
time
,
delta
:
timedelta
):
return
(
datetime
.
combine
(
date
(
1
,
1
,
1
),
t
)
+
delta
).
time
()
def
avg_time_query
(
start_time
,
end_time
):
records
=
db
.
query
(
(
func
.
round
(
func
.
avg
(
3600
*
func
.
extract
(
'
HOUR
'
,
models
.
Records
.
waiting_time
)
+
60
*
func
.
extract
(
'
MINUTE
'
,
models
.
Records
.
waiting_time
)
+
func
.
extract
(
'
SECOND
'
,
models
.
Records
.
waiting_time
))
))
/
60
).
filter
(
models
.
Records
.
place
==
place
,
func
.
weekday
(
models
.
Records
.
date
)
==
weekday
,
(
func
.
extract
(
'
HOUR
'
,
models
.
Records
.
date
)
>
start_time
.
hour
)
|
((
func
.
extract
(
'
HOUR
'
,
models
.
Records
.
date
)
==
start_time
.
hour
)
&
(
func
.
extract
(
'
MINUTE
'
,
models
.
Records
.
date
)
>=
start_time
.
minute
)),
(
func
.
extract
(
'
HOUR
'
,
models
.
Records
.
date
)
<
end_time
.
hour
)
|
((
func
.
extract
(
'
HOUR
'
,
models
.
Records
.
date
)
==
end_time
.
hour
)
&
(
func
.
extract
(
'
MINUTE
'
,
models
.
Records
.
date
)
<
end_time
.
minute
)),
).
one
()
if
records
[
0
]:
return
int
(
records
[
0
])
return
None
def
add_slot
(
slots_list
,
start_time
,
end_time
):
average_waiting_time
=
avg_time_query
(
start_time
,
end_time
)
if
average_waiting_time
:
name
=
f
'
{
start_time
.
hour
:
02
}
h
{
start_time
.
minute
:
02
}
'
slots_list
.
append
({
'
name
'
:
name
,
'
time
'
:
average_waiting_time
})
stats
=
[]
start_time
,
end_time
=
min_time
,
shift_time
(
min_time
,
interval
)
while
start_time
<
max_time
:
add_slot
(
stats
,
start_time
,
end_time
)
start_time
,
end_time
=
end_time
,
shift_time
(
end_time
,
interval
)
return
stats
def
get_avg_graph
(
place
:
str
,
db
:
Session
):
"""
Get the average waiting time for each interval between two time steps,
for the current or next available timeslot
"""
current_date
=
datetime
.
now
(
tz
=
pytz
.
timezone
(
"
Europe/Paris
"
))
weekday
,
current_time
=
current_date
.
weekday
(),
current_date
.
time
()
first_timeslot
=
get_timeslot
(
place
,
weekday
,
True
,
db
)
if
first_timeslot
and
current_time
<=
first_timeslot
[
1
]:
return
get_avg_graph_points
(
place
,
weekday
,
first_timeslot
[
0
],
first_timeslot
[
1
],
timedelta
(
minutes
=
5
),
db
)
second_timeslot
=
get_timeslot
(
place
,
weekday
,
False
,
db
)
if
second_timeslot
and
current_time
<=
second_timeslot
[
1
]:
return
get_avg_graph_points
(
place
,
weekday
,
second_timeslot
[
0
],
second_timeslot
[
1
],
timedelta
(
minutes
=
5
),
db
)
return
None
def
get_current_graph_points
(
place
:
str
,
current_date
:
date
,
min_time
:
time
,
max_time
:
time
,
interval
:
timedelta
,
db
:
Session
):
"""
Get the waiting time for each interval between two time steps for the current timeslot
"""
def
shift_time
(
t
:
time
,
delta
:
timedelta
):
return
(
datetime
.
combine
(
date
(
1
,
1
,
1
),
t
)
+
delta
).
time
()
def
avg_time_query
(
start_time
,
end_time
):
records
=
db
.
query
(
(
func
.
round
(
func
.
avg
(
3600
*
func
.
extract
(
'
HOUR
'
,
models
.
Records
.
waiting_time
)
+
60
*
func
.
extract
(
'
MINUTE
'
,
models
.
Records
.
waiting_time
)
+
func
.
extract
(
'
SECOND
'
,
models
.
Records
.
waiting_time
))
))
/
60
).
filter
(
models
.
Records
.
place
==
place
,
func
.
extract
(
'
YEAR
'
,
models
.
Records
.
date
)
==
current_date
.
year
,
func
.
extract
(
'
MONTH
'
,
models
.
Records
.
date
)
==
current_date
.
month
,
func
.
extract
(
'
DAY
'
,
models
.
Records
.
date
)
==
current_date
.
day
,
(
func
.
extract
(
'
HOUR
'
,
models
.
Records
.
date
)
>
start_time
.
hour
)
|
((
func
.
extract
(
'
HOUR
'
,
models
.
Records
.
date
)
==
start_time
.
hour
)
&
(
func
.
extract
(
'
MINUTE
'
,
models
.
Records
.
date
)
>=
start_time
.
minute
)),
(
func
.
extract
(
'
HOUR
'
,
models
.
Records
.
date
)
<
end_time
.
hour
)
|
((
func
.
extract
(
'
HOUR
'
,
models
.
Records
.
date
)
==
end_time
.
hour
)
&
(
func
.
extract
(
'
MINUTE
'
,
models
.
Records
.
date
)
<
end_time
.
minute
)),
).
one
()
if
records
[
0
]:
return
int
(
records
[
0
])
return
None
def
add_slot
(
slots_list
,
start_time
,
end_time
):
average_waiting_time
=
avg_time_query
(
start_time
,
end_time
)
if
average_waiting_time
:
name
=
f
'
{
start_time
.
hour
:
02
}
h
{
start_time
.
minute
:
02
}
'
slots_list
.
append
({
'
name
'
:
name
,
'
time
'
:
average_waiting_time
})
stats
=
[]
start_time
,
end_time
=
min_time
,
shift_time
(
min_time
,
interval
)
while
start_time
<
max_time
:
add_slot
(
stats
,
start_time
,
end_time
)
start_time
,
end_time
=
end_time
,
shift_time
(
end_time
,
interval
)
return
stats
def
get_current_graph
(
place
:
str
,
db
:
Session
):
"""
Get the waiting_time_graph for the current timeslot
"""
current_date
=
datetime
.
now
(
tz
=
pytz
.
timezone
(
"
Europe/Paris
"
))
weekday
,
day
,
current_time
=
current_date
.
weekday
(),
current_date
.
date
(),
current_date
.
time
()
first_timeslot
=
get_timeslot
(
place
,
weekday
,
True
,
db
)
if
first_timeslot
and
current_time
<=
first_timeslot
[
0
]:
return
None
elif
first_timeslot
and
current_time
<=
first_timeslot
[
1
]:
return
get_current_graph_points
(
place
,
day
,
first_timeslot
[
0
],
current_time
,
timedelta
(
minutes
=
5
),
db
)
second_timeslot
=
get_timeslot
(
place
,
weekday
,
False
,
db
)
if
second_timeslot
and
current_time
<=
second_timeslot
[
0
]:
return
None
elif
second_timeslot
and
current_time
<=
second_timeslot
[
1
]:
return
get_current_graph_points
(
place
,
day
,
second_timeslot
[
0
],
current_time
,
timedelta
(
minutes
=
5
),
db
)
return
None
# Define CRUD operation for the comments
def
get_comments
(
place
:
str
,
page
:
int
,
db
:
Session
):
"""
Get the 10 last comments for the given place
"""
if
page
==
0
:
comments
=
db
.
query
(
models
.
Comments
).
order_by
(
models
.
Comments
.
published_at
.
desc
(),
models
.
Comments
.
id
.
desc
()).
all
()
else
:
comments
=
db
.
query
(
models
.
Comments
).
filter
(
models
.
Comments
.
place
==
place
).
order_by
(
models
.
Comments
.
published_at
.
desc
(),
models
.
Comments
.
id
.
desc
()).
slice
(
(
page
-
1
)
*
10
,
page
*
10
).
all
()
return
comments
def
create_comment
(
place
:
str
,
new_comments
:
schemas
.
CommentBase
,
db
:
Session
):
"""
Add a new comment to the database
"""
date
=
datetime
.
now
(
tz
=
pytz
.
timezone
(
"
Europe/Paris
"
))
db_comment
=
models
.
Comments
(
**
new_comments
.
dict
(),
published_at
=
date
,
place
=
place
)
db
.
add
(
db_comment
)
db
.
commit
()
db
.
refresh
(
db_comment
)
return
db_comment
def
delete_comment
(
id
:
int
,
db
:
Session
):
"""
Delete the comment with the matching id
"""
if
id
==
0
:
db
.
query
(
models
.
Comments
).
delete
()
else
:
db
.
query
(
models
.
Comments
).
filter
(
models
.
Comments
.
id
==
id
).
delete
()
db
.
commit
()
# Define CRUD operation for the news
def
get_news
(
place
:
str
,
db
:
Session
):
"""
Get the news for the given place
"""
news
=
db
.
query
(
models
.
News
).
filter
(
models
.
News
.
place
==
place
).
order_by
(
models
.
News
.
published_at
.
desc
()).
all
()
return
news
def
create_news
(
new_news
:
schemas
.
NewsBase
,
db
:
Session
):
"""
Add a news to the database
"""
date
=
datetime
.
now
(
tz
=
pytz
.
timezone
(
"
Europe/Paris
"
))
db_news
=
models
.
News
(
**
new_news
.
dict
(),
published_at
=
date
)
db
.
add
(
db_news
)
db
.
commit
()
db
.
refresh
(
db_news
)
return
db_news
def
delete_news
(
id
:
int
,
db
:
Session
):
"""
Delete the news with the matching id
"""
if
id
==
0
:
db
.
query
(
models
.
News
).
delete
()
else
:
db
.
query
(
models
.
News
).
filter
(
models
.
News
.
id
==
id
).
delete
()
db
.
commit
()
# Define CRUD operation for the opening hours
def
get_opening_hours
(
place
:
str
,
db
:
Session
):
"""
Get the opening hours for the given place
"""
opening_hours
=
db
.
query
(
models
.
OpeningHours
.
day
,
models
.
OpeningHours
.
timeslot
,
models
.
OpeningHours
.
open_time
,
models
.
OpeningHours
.
close_time
,
).
filter
(
models
.
OpeningHours
.
place
==
place
).
order_by
(
models
.
OpeningHours
.
day
,
models
.
OpeningHours
.
timeslot
.
desc
()
).
all
()
return
opening_hours
def
get_timeslot
(
place
:
str
,
day
:
int
,
timeslot
:
bool
,
db
:
Session
):
"""
Get the opening hours for the given place and timeslot
"""
opening_hours
=
db
.
query
(
models
.
OpeningHours
.
open_time
,
models
.
OpeningHours
.
close_time
,
).
filter
(
models
.
OpeningHours
.
place
==
place
,
models
.
OpeningHours
.
day
==
day
,
models
.
OpeningHours
.
timeslot
==
timeslot
).
first
()
return
opening_hours
def
create_opening_hours
(
new_opening_hours
:
schemas
.
OpeningHoursBase
,
db
:
Session
):
"""
Add opening hours to the database
"""
db_opening_hours
=
models
.
OpeningHours
(
**
new_opening_hours
.
dict
())
db
.
add
(
db_opening_hours
)
db
.
commit
()
db
.
refresh
(
db_opening_hours
)
return
db_opening_hours
def
delete_opening_hours
(
id
:
int
,
db
:
Session
):
"""
Delete the opening hours with the matching id
"""
if
id
==
0
:
db
.
query
(
models
.
OpeningHours
).
delete
()
else
:
db
.
query
(
models
.
OpeningHours
).
filter
(
models
.
OpeningHours
.
id
==
id
).
delete
()
db
.
commit
()
Loading