Skip to content
Snippets Groups Projects
Select Git revision
  • 38ecad5c7212f7a5cffc1d0d4ab9bace1b89fa05
  • master default
  • clement
  • fix_requirements
  • new_signup
  • interface_admin
  • hamza
  • dev
  • test
  • melissa
  • context_sheet
  • sorties_new
  • Seon82-patch-2
  • export_bdd
  • refactor/participation-user-link
15 results

test_signals.py

Blame
  • video-capture.py 1.15 KiB
    import cv2
    from datetime import datetime, timedelta
    import numpy as np
    import keras
    from utils.preprocessing import fix_singular_shape, norm_by_imagenet
    from db import models, database, schemas
    
    from db.database import SessionLocal
    
    # Capture script: reads frames from an RTSP camera, runs the density model
    # on one frame out of every `frame_gap`, and stores the estimated crowd
    # count and waiting time as a `models.Records` row.
    db = SessionLocal()
    model = keras.models.load_model('assets')

    # NOTE(review): the stream URL embeds a username and password directly in
    # source — consider loading it from the environment or a config file.
    cap = cv2.VideoCapture("rtsp://viarezocam:superponey@10.148.38.9/stream1")
    count = 0  # number of frames successfully read so far
    frame_gap = 450  # only every 450th frame is fed to the model

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # A failed read means the stream ended or dropped. The capture
                # can still report isOpened() == True, so without this break
                # the loop would spin forever without ever getting a frame.
                break
            if count % frame_gap == 0:
                current_time = datetime.now()
                treated_img = fix_singular_shape(frame, 16)
                # Normalise a one-image batch, drop singleton axes, then
                # re-add the leading batch axis before prediction.
                input_image = np.expand_dims(
                    np.squeeze(norm_by_imagenet(np.array([treated_img]))),
                    axis=0,
                )
                pred_map = np.squeeze(model.predict(input_image))
                # np.sum returns a NumPy scalar; convert it to a built-in
                # float so that timedelta (which rejects non-float NumPy
                # scalars such as float32) and the DB layer get a plain number.
                count_prediction = float(np.sum(pred_map))
                # Waiting-time estimate: 120 s base plus 30 s per predicted unit.
                waiting_time = timedelta(seconds=120 + count_prediction * 30)
                record = {
                    "place": "local",
                    "date": current_time,
                    "density": count_prediction,
                    "waiting_time": waiting_time,
                }
                db_record = models.Records(**record)
                db.add(db_record)
                db.commit()
                db.refresh(db_record)
            count += 1
    finally:
        # Always free the camera and the DB session, including when the model
        # or the database raises mid-loop.
        cap.release()
        cv2.destroyAllWindows()
        # Presumably a SQLAlchemy session (SessionLocal) — confirm close() is
        # the right teardown for this project's database module.
        db.close()