Skip to content
Snippets Groups Projects
Select Git revision
  • f219e3dad3c91b02a89f771f2e0c2d4d936dd85c
  • master default
2 results

app.js

Blame
  • video_capture.py 3.16 KiB
    import cv2
    from datetime import datetime, timedelta
    import numpy as np
    import keras
    from utils.preprocessing import fix_singular_shape, norm_by_imagenet
    from db import models
    from dotenv import load_dotenv
    import os
    
    from db.database import SessionLocal
    
    
    def handle_cameras():
        model = keras.models.load_model('assets', compile=False)
        db = SessionLocal()
        load_dotenv()
        camera_number = int(os.getenv('CAM_NUMBER'))
        cameras = []
        for i in range(camera_number):
            camera = {}
            camera["place"] = os.getenv(f'CAM_{i}_PLACE')
            camera["IP"] = os.getenv(f'CAM_{i}_IP')
            camera["user"] = os.getenv(f'CAM_{i}_USER')
            camera["password"] = os.getenv(f'CAM_{i}_PASSWORD')
            camera["stream"] = os.getenv(f'CAM_{i}_STREAM')
            camera["a_factor"] = int(os.getenv(f'CAM_{i}_A_FACTOR'))
            camera["b_factor"] = int(os.getenv(f'CAM_{i}_B_FACTOR'))
            camera["framegap"] = int(os.getenv(f'CAM_{i}_FRAMEGAP'))
            camera["count"] = 0
            camera["cap"] = cv2.VideoCapture(
                f'rtsp://{camera["user"]}:{camera["password"]}@{camera["IP"]}/{camera["stream"]}')
            mask_length = int(os.getenv(f'CAM_{i}_POINTS_NB'))
            mask_points = []
            for j in range(mask_length):
                point = os.getenv(f'CAM_{i}_POINT_{j}')
                mask_points.append(list(map(int, point.split(','))))
            mask = np.zeros((720, 1280, 3), dtype=np.float32)
            cv2.fillPoly(mask, [np.array(mask_points)], (255, 255, 255))
            camera["mask"] = mask
        cameras.append(camera)
    
        while True:
            for camera in cameras:
                if camera['cap'].isOpened():
                    ret, frame = camera['cap'].read()
                    if ret and camera['count'] % camera['framegap'] == 0:
                        current_time = datetime.now()
                        masked_img = cv2.bitwise_and(
                            frame.astype(np.float32), camera["mask"])
                        treated_img = fix_singular_shape(masked_img, 16)
                        input_image = np.expand_dims(
                            np.squeeze(
                                norm_by_imagenet(
                                    np.array(
                                        [treated_img]))),
                            axis=0)
                        pred_map = np.squeeze(model.predict(input_image))
                        count_prediction = np.sum(pred_map)
                        print(count_prediction)
                        waiting_time = timedelta(
                            seconds=camera['b_factor'] +
                            int(count_prediction) *
                            camera['a_factor'])
                        db_record = models.Records(
                            place=camera['place'],
                            date=current_time,
                            density=int(count_prediction),
                            waiting_time=waiting_time)
                        db.add(db_record)
                        db.commit()
                        db.refresh(db_record)
                    camera['count'] += 1
                else:
                    camera["cap"] = cv2.VideoCapture(
                        f"rtsp://{camera['user']}:{camera['password']}@{camera['IP']}/{camera['stream']}")
                    print("tentative de reconnexion")