Skip to content
Snippets Groups Projects
Commit de28297e authored by Pierre Minssen's avatar Pierre Minssen
Browse files

creation d'une fake_cam et renomage des variable ip/port

parent a3015e7a
Branches
No related tags found
No related merge requests found
...@@ -10,26 +10,34 @@ UDP_PORT = 52381 ...@@ -10,26 +10,34 @@ UDP_PORT = 52381
class Camera(command_lib.Command, inq_command_lib.Inquiry): class Camera(command_lib.Command, inq_command_lib.Inquiry):
def __init__(self, UDP_IP = "192.168.0.100", UDP_PORT = 52381, UDP_IP_OUT = "192.168.0.57", VV= "09", WW= "09", seq_num: int = 0,debug=False, virtualcam=False): def __init__(self, UDP_IP_SEND ="192.168.0.100", UDP_PORT_SEND = 52381, UDP_IP_RECEIVE ="192.168.0.57", VV="09", WW="09", seq_num: hex = '00000000', debug=False, virtualcam=True):
""" """
Création d'une caméra avec Création d'une caméra avec
:param UDP_IP: IP de la caméra (192.168.0.100 par défaut) :param UDP_IP_SEND: IP de la caméra (192.168.0.100 par défaut)
:param UDP_PORT: Port de la caméra (52381 par défaut) :param UDP_PORT_SEND: Port de la caméra (52381 par défaut)
:param UDP_PORT: Ip du pc (192.168.0.57 par défaut) :param UDP_IP_RECEIVE: Ip du pc (192.168.0.57 par défaut)
:param VV: Vitesse de déplacement horizontale (entre 01 et 18) en hex :param VV: Vitesse de déplacement horizontale (entre 01 et 18) en hex
:param WW: Vitesse de déplacement veritcale (entre 01 et 17) en hex :param WW: Vitesse de déplacement veritcale (entre 01 et 17) en hex
:param seq_num: sequence number de départ :param seq_num: sequence number de départ
""" """
self.seq_num = seq_num # erreur du capteur UWB (p pour précision...) WTF ce commentaire ??? self.seq_num = seq_num
self.VV = VV self.VV = VV
self.WW = WW self.WW = WW
self.camera_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP self.camera_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
self.CAMERA_IP = UDP_IP self.CAMERA_IP_SEND = UDP_IP_SEND
self.CAMERA_PORT = UDP_PORT self.CAMERA_PORT = UDP_PORT_SEND
self.CAMERA_IP_OUT = UDP_IP_OUT self.CAMERA_IP_RECEIVE = UDP_IP_RECEIVE
if virtualcam:
self.CAMERA_IP_SEND = "127.0.0.1"
self.CAMERA_PORT = 5006
self.CAMERA_IP_RECEIVE = "127.0.0.2"
self.listener = listener.Listener(self.CAMERA_IP_RECEIVE, 5007)
else:
self.listener = listener.Listener(self.CAMERA_IP_RECEIVE, self.CAMERA_PORT)
self.debug = debug self.debug = debug
self.virtualcam = virtualcam self.virtualcam = virtualcam
self.listener = listener.Listener(self.CAMERA_IP_OUT, self.CAMERA_PORT)
self.listener.start() self.listener.start()
def send(self, message): def send(self, message):
...@@ -37,70 +45,43 @@ class Camera(command_lib.Command, inq_command_lib.Inquiry): ...@@ -37,70 +45,43 @@ class Camera(command_lib.Command, inq_command_lib.Inquiry):
Envoie un message à la cam (supporte les ' ') Envoie un message à la cam (supporte les ' ')
:param message: message str['hex'] :param message: message str['hex']
""" """
if(not(self.virtualcam)): #Si la cam n'est pas virtuel, on envoie le paquet #if(not(self.virtualcam)): #Si la cam n'est pas virtuel, on envoie le paquet
self.camera_sock.sendto(bytes.fromhex(message.replace(' ', '')), (self.CAMERA_IP, self.CAMERA_PORT)) self.camera_sock.sendto(bytes.fromhex(message.replace(' ', '')), (self.CAMERA_IP_SEND, self.CAMERA_PORT))
self.seq_num += 1 # ballec de l'hexa pour l'instant self.seq_num += 1 # ballec de l'hexa pour l'instant
if(self.debug or self.virtualcam): #Si la cam est virtuelle ou on debug, on envoie if(self.debug or self.virtualcam): #Si la cam est virtuelle ou on debug, on envoie
print("Commande n°{} envoyée : {} \n".format(self.seq_num,message)) print("Commande n°{} envoyée : {} \n".format(self.seq_num,message))
def send_payload(self, payloadtype,payload): def send_payload(self, payloadtype, payload, command_type="default"):
message=payloadtype +' ' + self.payload2header(payload) + payload message = payloadtype + ' ' + self.payload2header(payload) + ' ' + payload
if(not(self.virtualcam)): #Si la cam n'est pas virtuel, on envoie le paquet #if(not(self.virtualcam)): #Si la cam n'est pas virtuel, on envoie le paquet
self.listener.message[self.seq_num] = [command_type]
self.camera_sock.sendto(bytes.fromhex(message.replace(' ', '')), (self.CAMERA_IP, self.CAMERA_PORT)) self.camera_sock.sendto(bytes.fromhex(message.replace(' ', '')), (self.CAMERA_IP_SEND, self.CAMERA_PORT))
self.seq_num += 1 # ballec de l'hexa pour l'instant
if(self.debug or self.virtualcam): #Si la cam est virtuelle ou on debug, on envoie if(self.debug or self.virtualcam): #Si la cam est virtuelle ou on debug, on envoie
print("Commande n°{} envoyée : {} \n".format(self.seq_num,message)) print("Commande n°{} envoyée : {} \n".format(self.seq_num,message))
self.increment_seq_number()
def payload2header(self, payload): def payload2header(self, payload):
""" """
take the payload, calculate the payload length and add the sequence number take the payload, calculate the payload length and add the sequence number
:param payload: :param payload:
:return: header without prefix = payloadlength + seq_num :return: header without prefix = payload_length + seq_num
""" """
length = hex(len(payload.replace(' ', ''))) length = hex(len(payload.replace(' ', ''))//2)
payloadlength = length[2:] payload_length = length[2:]
if len(length) == 1: diff = 4 - len(payload_length)
payloadlength = '0' + payloadlength payload_length = '0'*diff + payload_length
header = payload_length
header = payloadlength header = header + ' ' + self.seq_num
header = header + ('0' * (8 - len(str(self.seq_num))) + str(self.seq_num))
return header return header
# cam's respond def increment_seq_number(self):
@staticmethod seq_num = hex(int(self.seq_num, 16)+1)[2:]
def decompose_reception(reception: str): diff = 8 - len(seq_num)
""" self.seq_num = '0' * diff + seq_num
decomposes the hex received messaged if it's not a error respond
:param reception: str hex eg '01110003000085029051ff'
:return: list of str hex [payload_type, payload_length, sequence_number, payload] eg ['0111', '0003', '00008502', '9051ff']
"""
payload_type = reception[0:4]
payload_length = reception[4:8]
sequence_number = reception[8:16]
payload = reception[16:]
return [payload_type, payload_length, sequence_number, payload]
@staticmethod
def payload2meaning(payload: str, command_type="default"):
"""
Reveal the meaning of the visca respond
:param payload: payload recieved in str hex eg '9051ff'
:param command_type: type of command, the respond's meaning depends on the inquiry/command
:return: the meaning in str
"""
meaning = "Not known"
if command_type in inq_resp.inq_list:
# if payload is not in the dictionary meaning = None
meaning = inq_resp.inq_list.get(command_type).get(payload)
else:
meaning = default_resp.liste_respond.get(payload)
if meaning is None:
meaning = "Respond not understood"
return meaning
......
...@@ -20,12 +20,13 @@ class Command: ...@@ -20,12 +20,13 @@ class Command:
def send_command(self, payload: hex): def send_command(self, payload: hex):
payloadtype = "01 00" payloadtype = "01 00"
self.send_payload(payloadtype, payload) self.send_payload(payloadtype, payload, "command")
#CAM_POWER : Power ON/OFF #CAM_POWER : Power ON/OFF
def power_on(self): def power_on(self):
payload = '81 01 04 00 02 FF' payload = '81 01 04 00 02 FF'
self.send_command(payload) self.send_command(payload)
def power_off(self): def power_off(self):
payload = '81 01 04 00 02 FF' payload = '81 01 04 00 02 FF'
self.send_command(payload) self.send_command(payload)
......
import socket
import viscaoveriplib.commands_library as command_lib
import viscaoveriplib.inquiry_commands_library as inq_command_lib
import viscaoveriplib.fake_listener as listener
import viscaoveriplib.inquiry_responds_library as inq_resp
import viscaoveriplib.responds_messages as default_resp
class FakeCamera(command_lib.Command, inq_command_lib.Inquiry):
def __init__(self, UDP_IP_SEND ="127.0.0.2", UDP_PORT_SEND = 5007, UDP_IP_RECEIVE ="127.0.0.1", seq_num: int = 0, debug=False, virtualcam=False):
"""
Simulation of a camera for testing purpuses
in one python console run camera.py with the virtual cam True -> cam = Camera()
in another one run fake_camera.py -> fake = FakeCamera()
then test command on cam eg cam.ir_receive()
the respond message is choosen in fake_listener.py
:param UDP_IP_SEND: IP de la caméra (192.168.0.100 par défaut)
:param UDP_PORT_SEND: Port de la caméra (52381 par défaut)
:param UDP_PORT: Ip du pc (192.168.0.57 par défaut)
:param seq_num: sequence number de départ
"""
self.seq_num = seq_num
self.camera_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
self.CAMERA_IP_SEND = UDP_IP_SEND
self.CAMERA_PORT = UDP_PORT_SEND
self.CAMERA_IP_RECEIVE = UDP_IP_RECEIVE
self.debug = debug
self.virtualcam = virtualcam
self.listener = listener.FakeListener(self, self.CAMERA_IP_RECEIVE, 5006)
#self.listener = listener.FakeListener(self.CAMERA_IP_RECEIVE, self.CAMERA_PORT)
self.listener.start()
def send(self, message):
"""
Envoie un message à la cam (supporte les ' ')
:param message: message str['hex']
"""
if not self.virtualcam: # Si la cam n'est pas virtuel, on envoie le paquet
self.camera_sock.sendto(bytes.fromhex(message.replace(' ', '')), (self.CAMERA_IP_SEND, self.CAMERA_PORT))
self.seq_num += 1 # ballec de l'hexa pour l'instant
if self.debug or self.virtualcam: #Si la cam est virtuelle ou on debug, on envoie
print("Commande n°{} envoyée : {} \n".format(self.seq_num,message))
def send_payload(self, payloadtype, payload):
message=payloadtype + ' ' + self.payload2header(payload) + payload
if not self.virtualcam: # Si la cam n'est pas virtuel, on envoie le paquet
# self.camera_sock.sendto(bytes.fromhex(message.replace(' ', '')), (self.CAMERA_IP_SEND, self.CAMERA_PORT))
self.camera_sock.sendto(bytes.fromhex(message), (self.CAMERA_IP_SEND, self.CAMERA_PORT))
self.seq_num += 1 # ballec de l'hexa pour l'instant
if self.debug or self.virtualcam: #Si la cam est virtuelle ou on debug, on envoie
print("Commande n°{} envoyée : {} \n".format(self.seq_num,message))
def payload2header(self, payload):
"""
take the payload, calculate the payload length and add the sequence number
:param payload:
:return: header without prefix = payloadlength + seq_num
"""
length = hex(len(payload.replace(' ', '')))
payloadlength = length[2:]
if len(payloadlength) == 1:
payloadlength = '0' + payloadlength
header = payloadlength
header = header + ('0' * (8 - len(str(self.seq_num))) + str(self.seq_num))
return header
# cam's respond
@staticmethod
def decompose_reception(reception: str):
"""
decomposes the hex received messaged
:param reception: str hex eg '01110003000085029051ff'
:return: list of str hex [payload_type, payload_length, sequence_number, payload] eg ['0111', '0003', '00008502', '9051ff']
"""
payload_type = reception[0:4]
payload_length = reception[4:8]
sequence_number = reception[8:16]
payload = reception[16:]
return [payload_type, payload_length, sequence_number, payload]
import socket
from threading import Thread
import random
import time
class FakeListener(Thread):
"""Thread chargé simplement d'afficher un mot dans la console."""
def __init__(self, camera, IP_OUT = "127.0.0.2", PORT = 5006):
Thread.__init__(self)
self.daemon = True
self.message = []
self.IP_OUT = IP_OUT
self.PORT = PORT
self.camera = camera
self.liste_respond = ['9041ff', '9051ff', "006002ff", "906003ff", "906104ff", "906105ff", "906141ff"]
def run(self):
sock_recp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_recp.bind((self.IP_OUT, self.PORT))
while True:
data, addr = sock_recp.recvfrom(1024) # buffer size is 1024 bytes
self.message.append(data.hex())
seq_num = data.hex()[8:16]
print('0111 ' + '0003 ' + seq_num + ' ' + self.liste_respond[0])
self.camera.send('0111' + '0003' + seq_num + self.liste_respond[0])
print("fake : " + self.message[-1])
time.sleep(random.random())
self.camera.send('0111' + '0003' + seq_num + self.liste_respond[1])
import camera_main
class Inquiry: class Inquiry:
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
Dictionary of responds depending on the inquiry command Dictionary of responds depending on the inquiry command
""" """
respond = {'y05002FF': "On", 'y05003FF': "Off", "y05000FF": "auto/standard"} basic_respond = {'y05002FF': "On", 'y05003FF': "Off", "y05000FF": "auto/standard"}
wb_mode_respond = {"y05000FF": "auto", "y05001FF": "In Door", "y05002FF": "Out Door", wb_mode_respond = {"y05000FF": "auto", "y05001FF": "In Door", "y05002FF": "Out Door",
"y05003FF": "One Push WB", "y05004FF": "ATW", "y05005FF": "Manual"} "y05003FF": "One Push WB", "y05004FF": "ATW", "y05005FF": "Manual"}
...@@ -27,4 +27,4 @@ ir_condition = {"y05000FF": "ir remote commander stable reception enable", ...@@ -27,4 +27,4 @@ ir_condition = {"y05000FF": "ir remote commander stable reception enable",
"y05001FF": "ir remote commander reception unstable environment", "y05001FF": "ir remote commander reception unstable environment",
"y05002FF": "power ON by ir remote commander (cannot be judged)"} "y05002FF": "power ON by ir remote commander (cannot be judged)"}
inq_list = {"respond" : respond, "wb_mode_respond" : wb_mode_respond, "picture_mode_respond": picture_mode_respond, "video_system_respond" : video_system_respond, "color_system_respond" : color_system_respond, "ir_condition" : ir_condition} inq_list = {"basic_respond" : basic_respond, "wb_mode_respond" : wb_mode_respond, "picture_mode_respond": picture_mode_respond, "video_system_respond" : video_system_respond, "color_system_respond" : color_system_respond, "ir_condition" : ir_condition}
\ No newline at end of file \ No newline at end of file
import socket import socket
from threading import Thread from threading import Thread
import viscaoveriplib.inquiry_responds_library as inq_resp
from viscaoveriplib.responds_messages import liste_respond as default_respond
from viscaoveriplib.responds_library import respond_list
class Listener(Thread): class Listener(Thread):
"""Thread chargé simplement d'afficher un mot dans la console."""
def __init__(self, IP_OUT = "192.168.0.57", PORT = 52381): def __init__(self, IP_OUT = "192.168.0.57", PORT = 52381):
Thread.__init__(self) Thread.__init__(self)
self.message = {} self.daemon = True
self.IP_OUT = IP_OUT self.IP_OUT = IP_OUT
self.PORT = PORT self.PORT = PORT
self.message = {}
def run(self): def run(self):
sock_recp = socket.socket(socket.AF_INET, # Internet sock_recp = socket.socket(socket.AF_INET, # Internet
...@@ -19,9 +20,28 @@ class Listener(Thread): ...@@ -19,9 +20,28 @@ class Listener(Thread):
sock_recp.bind((self.IP_OUT, self.PORT)) sock_recp.bind((self.IP_OUT, self.PORT))
while True: while True:
data, addr = sock_recp.recvfrom(1024) # buffer size is 1024 bytes data, addr = sock_recp.recvfrom(1024) # buffer size is 1024 bytes
self.message.append(data.hex()) [payload_type, payload_length, sequence_number, payload] = self.decompose_reception(data.hex())
type_command = self.message[sequence_number][0]
if type_command in respond_list:
if payload in respond_list[type_command]:
print("Respond : " + respond_list[type_command][payload])
# cam's respond
@staticmethod
def decompose_reception(reception: str):
"""
decomposes the hex received messaged
:param reception: str hex eg '01110003000085029051ff'
:return: list of str hex [payload_type, payload_length, sequence_number, payload] eg ['0111', '0003', '00008502', '9051ff']
"""
payload_type = reception[0:4]
payload_length = reception[4:8]
sequence_number = reception[8:16]
payload = reception[16:]
return [payload_type, payload_length, sequence_number, payload]
liste_respond = {'9041ff': "Acknowledge", '9051ff': "Completion", "006002ff": "S1ntax Error",
"906003ff": "Command Buffer Full", "906104ff": "Command Canceled", "906105ff": "No Socket",
"906141ff": "Command Not Executable"}
"""
liste_respond = {'904yff': "Acknowledge", '905yff': "Completion", "006002ff": "Syntax Error", liste_respond = {'904yff': "Acknowledge", '905yff': "Completion", "006002ff": "Syntax Error",
"906003ff": "Command Buffer Full", "906y04ff": "Command Canceled", "906y05ff": "No Socket", "906003ff": "Command Buffer Full", "906y04ff": "Command Canceled", "906y05ff": "No Socket",
"906y41ff": "Command Not Executable"} "906y41ff": "Command Not Executable"}
"""
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment