# Select Git revision
# video_capture.py 3.16 KiB
import cv2
from datetime import datetime, timedelta
import numpy as np
import keras
from utils.preprocessing import fix_singular_shape, norm_by_imagenet
from db import models
from dotenv import load_dotenv
import os
from db.database import SessionLocal
def _env_int(name):
    """Return environment variable *name* parsed as an integer.

    Raises:
        KeyError: if the variable is not set.
        ValueError: if its value is not a valid integer literal.
    """
    raw = os.getenv(name)
    if raw is None:
        raise KeyError(f"missing environment variable {name}")
    try:
        return int(raw)
    except ValueError as err:
        raise ValueError(
            f"environment variable {name} must be an integer, got {raw!r}"
        ) from err


def _rtsp_url(camera):
    """Build the RTSP URL of *camera* from its user, password, IP and stream."""
    return (
        f'rtsp://{camera["user"]}:{camera["password"]}'
        f'@{camera["IP"]}/{camera["stream"]}'
    )


def _build_mask(index, shape=(720, 1280, 3)):
    """Return a uint8 mask: 255 inside camera *index*'s polygon, 0 elsewhere.

    ``CAM_{index}_POINTS_NB`` gives the number of vertices and each
    ``CAM_{index}_POINT_{j}`` holds one vertex as comma-separated integers.
    """
    points = []
    for j in range(_env_int(f'CAM_{index}_POINTS_NB')):
        name = f'CAM_{index}_POINT_{j}'
        raw = os.getenv(name)
        if raw is None:
            raise KeyError(f"missing environment variable {name}")
        points.append([int(value) for value in raw.split(',')])
    # uint8 so that `frame & 255` keeps the pixel value unchanged; with a
    # float mask OpenCV would AND the raw IEEE-754 bit patterns instead.
    mask = np.zeros(shape, dtype=np.uint8)
    # fillPoly expects 32-bit integer points; numpy's default integer dtype
    # is platform dependent, so make it explicit.
    cv2.fillPoly(mask, [np.array(points, dtype=np.int32)], (255, 255, 255))
    return mask


def _load_camera(index):
    """Read the ``CAM_{index}_*`` settings and open the camera's stream.

    Returns a dict with the keys ``place``, ``IP``, ``user``, ``password``,
    ``stream``, ``a_factor``, ``b_factor``, ``framegap``, ``count``,
    ``mask`` and ``cap``.

    Raises:
        KeyError: if a required integer or polygon variable is missing.
        ValueError: if an integer variable is malformed or framegap is 0.
    """
    camera = {
        "place": os.getenv(f'CAM_{index}_PLACE'),
        "IP": os.getenv(f'CAM_{index}_IP'),
        "user": os.getenv(f'CAM_{index}_USER'),
        "password": os.getenv(f'CAM_{index}_PASSWORD'),
        "stream": os.getenv(f'CAM_{index}_STREAM'),
        "a_factor": _env_int(f'CAM_{index}_A_FACTOR'),
        "b_factor": _env_int(f'CAM_{index}_B_FACTOR'),
        "framegap": _env_int(f'CAM_{index}_FRAMEGAP'),
        "count": 0,
        "mask": _build_mask(index),
    }
    if camera["framegap"] == 0:
        # Fail at startup instead of with ZeroDivisionError on the first frame.
        raise ValueError(f"CAM_{index}_FRAMEGAP must be non-zero")
    # Open the stream last so a configuration error cannot leak a capture.
    camera["cap"] = cv2.VideoCapture(_rtsp_url(camera))
    return camera


def _record_frame(camera, frame, model, db):
    """Run the model on *frame* and store the resulting record in *db*."""
    current_time = datetime.now()
    # Mask while the frame is still integer-typed, then convert to float32.
    # NOTE(review): assumes `frame` is uint8 with the same shape as the mask
    # (720x1280x3 by default) - confirm against the camera configuration.
    masked_img = cv2.bitwise_and(frame, camera["mask"]).astype(np.float32)
    treated_img = fix_singular_shape(masked_img, 16)
    input_image = np.expand_dims(
        np.squeeze(norm_by_imagenet(np.array([treated_img]))),
        axis=0)
    pred_map = np.squeeze(model.predict(input_image))
    count_prediction = np.sum(pred_map)
    print(count_prediction)
    density = int(count_prediction)
    # Waiting time is a linear function of the count: b + count * a seconds.
    waiting_time = timedelta(
        seconds=camera['b_factor'] + density * camera['a_factor'])
    db_record = models.Records(
        place=camera['place'],
        date=current_time,
        density=density,
        waiting_time=waiting_time)
    db.add(db_record)
    db.commit()
    db.refresh(db_record)


def _poll_camera(camera, model, db):
    """Read one frame from *camera*, or try to reconnect if it is closed."""
    cap = camera['cap']
    if not cap.isOpened():
        # Free the dead capture before replacing it; otherwise every
        # reconnection attempt leaks the previous handle.
        cap.release()
        camera["cap"] = cv2.VideoCapture(_rtsp_url(camera))
        print("tentative de reconnexion")
        return
    ret, frame = cap.read()
    # Only every `framegap`-th read attempt is analysed.
    if ret and camera['count'] % camera['framegap'] == 0:
        _record_frame(camera, frame, model, db)
    camera['count'] += 1


def handle_cameras():
    """Poll every configured camera forever and store density records.

    Loads the Keras model from ``assets``, reads ``CAM_NUMBER`` and the
    per-camera ``CAM_{i}_*`` settings from the environment (after
    ``load_dotenv()``), then loops over the cameras indefinitely. For each
    analysed frame a ``models.Records`` row is committed.

    This function does not return normally. When it exits through an
    exception (including ``KeyboardInterrupt``), all video captures are
    released and the database session is closed before the exception
    propagates.

    Raises:
        KeyError: if a required environment variable is missing.
        ValueError: if an integer environment variable is malformed.
    """
    model = keras.models.load_model('assets', compile=False)
    db = SessionLocal()
    cameras = []
    try:
        load_dotenv()
        for index in range(_env_int('CAM_NUMBER')):
            cameras.append(_load_camera(index))
        while True:
            for camera in cameras:
                _poll_camera(camera, model, db)
    finally:
        for camera in cameras:
            camera['cap'].release()
        db.close()