V1 after team test

This commit is contained in:
lucas
2024-08-02 00:08:12 +02:00
parent 496a68d631
commit 05139b0b43
8 changed files with 269 additions and 74 deletions

View File

@@ -7,14 +7,17 @@ import keyboard
import json
import os
from waitress import serve
from pygame import mixer
import time
import requests # Ajouté pour envoyer des requêtes HTTP
app = Flask(__name__)
DEBUG = False
INGAME = True
# Variables globales
fastest = None
port = 5000 # Définir le port utilisé par vos boutons
lock = threading.Lock()
order = [] # Liste pour suivre l'ordre des boutons
button_ips = {} # Dictionnaire pour enregistrer les adresses IP des boutons
@@ -25,109 +28,204 @@ if os.path.exists(json_file):
with open(json_file, 'r') as f:
button_ips = json.load(f)
def init_mixer():
if not mixer.get_init():
mixer.init()
if DEBUG:
print("Mixer initialized.")
def play_sound(file):
mixer.music.load(file)
mixer.music.play()
def notify_button(ip, endpoint):
""" Fonction pour envoyer une requête à un bouton à son adresse IP. """
url = f"http://{ip}:{port}{endpoint}" # Utiliser la variable `port` définie en haut
try:
response = requests.get(url)
if DEBUG:
print(f"Requête envoyée à {ip}{endpoint}: {response.status_code}")
except Exception as e:
print(f"Erreur lors de l'envoi de la requête à {ip}: {e}")
@app.route('/execute')
def execute_script():
global fastest
client_ip = request.remote_addr # Obtenir l'adresse IP du client
# Vérifier si l'adresse IP est enregistrée
button_id = None
for button, ip in button_ips.items():
if ip == client_ip:
button_id = button
break
if INGAME:
client_ip = request.remote_addr # Obtenir l'adresse IP du client
if button_id is None: # Si l'IP n'est pas trouvée
error_message = {"message": "Erreur : cette adresse IP n'est pas enregistrée."}
print(f"{error_message['message']} - IP: {client_ip}")
return jsonify(error_message), 400 # Retourne un code d'erreur 400
# Vérifier si l'adresse IP est enregistrée
button_id = None
for button, ip in button_ips.items():
if ip == client_ip:
button_id = button
break
send_vlc_command('pl_pause', True) # Mettre VLC en pause
# if button_id is None: # Si l'IP n'est pas trouvée
# error_message = {"message": "Erreur : cette adresse IP n'est pas enregistrée."}
# print(f"{error_message['message']} - IP: {client_ip}")
# return jsonify(error_message), 400 # Retourne un code d'erreur 400
# Utiliser un verrou pour gérer les accès concurrents
with lock:
if button_id not in order:
order.append(button_id) # Ajouter le boîtier à la liste d'ordre
print(f"Boîtiers en ordre: {order}")
if fastest is None:
fastest = button_id # Le premier à arriver devient le gagnant
response_message = {"message": "fastest"}
# print(f"Fastest is {button_id}")
# Utiliser un verrou pour gérer les accès concurrents
with lock:
if button_id not in order:
send_vlc_command('pl_pause', True) # Mettre VLC en pause
play_sound('coin.mp3') # Son de canard au premier buzz
order.append(button_id) # Ajouter le boîtier à la liste d'ordre
print(f"Boîtiers en ordre: {order}")
# Notification au bouton premier dans l'ordre
if order[0] == button_id:
notify_button(button_ips[button_id], '/on') # Envoyer /on
# Notifier tous les autres boutons enregistrés avec /off
for other_button_id, ip in button_ips.items():
if other_button_id != button_id:
notify_button(ip, '/off') # Envoyer /off
response_message = {"message": "added"} # Réponse pour l'ajout à l'ordre
else:
response_message = {"message": "second"}
# print(f"{button_id} is second")
else:
response_message = {"message": "duplicate"} # Requête répétée, rejetée
response_message = {"message": "duplicate"} # Requête répétée, rejetée
return jsonify(response_message), 200
return jsonify(response_message), 200
else:
return jsonify({"message": "not allowed"}), 200
def good_answer():
global INGAME
while True:
# Attendre que la touche 'r' soit pressée pour vider l'ordre
# Attendre que la touche 'o' soit pressée pour vider l'ordre
keyboard.wait('o')
with lock:
fastest = None # Réinitialiser le gagnant
if order:
INGAME = False
print(f"--- Boîtier {order[0]} gagne ! ---")
order.clear() # Enlever le premier boîtier de l'ordre
#seek_to_time('')
send_vlc_command('pl_play')
play_sound('victory.mp3')
# Envoyer /off à tous les boutons
for button, ip in button_ips.items():
notify_button(ip, '/off') # Envoi de /off aux boutons
order.clear() # Vider l'ordre
time.sleep(1)
send_vlc_command('pl_play') # Reprendre la musique
def incomplet_answer():
global fastest
while True:
# Attendre que la touche 'r' soit pressée pour vider l'ordre
# Attendre que la touche 'x' soit pressée pour enlever le premier boîtier
keyboard.wait('x')
with lock:
fastest = None # Réinitialiser le gagnant
if order:
print(f"Boîtier {order[0]} : Réponse incomplète")
# Vérifier si le premier boîtier est retiré
first_button = order[0]
print(f"Boîtier {first_button} : Réponse incomplète")
order.pop(0) # Enlever le premier boîtier de l'ordre
print(f"Boîtiers en ordre: {order}")
if not order:
send_vlc_command('pl_play')
def no_answer():
global fastest
while True:
# Attendre que la touche 'r' soit pressée pour vider l'ordre
keyboard.wait('p')
with lock:
fastest = None # Réinitialiser le gagnant
if order:
print(f"Boîtier {order[0]} : Pas de réponse")
order.pop(0) # Enlever le premier boîtier de l'ordre
print(f"Boîtiers en ordre: {order}")
# Si le bouton retiré était le premier, lui envoyer /off
notify_button(button_ips[first_button], '/off')
# Si le second boîtier devient le premier, envoyer /on
if order:
second_button = order[0]
notify_button(button_ips[second_button], '/on')
if not order:
send_vlc_command('pl_play')
# Envoyer /blink à tous les boutons
send_vlc_command('pl_play') # Reprendre la musique
for button, ip in button_ips.items():
notify_button(ip, '/blink')
def next_song():
global fastest
global INGAME
while True:
# Attendre que la touche 'r' soit pressée pour vider l'ordre
keyboard.wait('n')
print("Are you sure to next ?")
# Attendre que la touche 'n' soit pressée pour demander le prochain son
keyboard.wait('n')
send_vlc_command('pl_pause')
for button, ip in button_ips.items():
notify_button(ip, '/off')
time.sleep(1)
play_sound('bell.mp3')
print("Music - Next in 2 seconds")
time.sleep(2)
send_vlc_command('pl_next') # Passer à la chanson suivante
INGAME = True
with lock:
fastest = None # Réinitialiser le gagnant
if order:
order.pop(0) # Enlever le premier boîtier de l'ordre
print('Next song')
send_vlc_command('pl_next')
# Si order est vide, envoyer /blink à tous les boutons
if not order:
for button, ip in button_ips.items():
notify_button(ip, '/blink') # Envoyer /blink à tous les boutons
send_vlc_command('pl_play') # Reprendre la musique
def play():
while True:
# Attendre que la touche 'space' soit pressée pour mettre VLC en pause
keyboard.wait('m')
print("Music - Play")
send_vlc_command('pl_play')
def pause():
while True:
# Attendre que la touche 'r' soit pressée pour vider l'ordre
keyboard.wait('space')
# Attendre que la touche 'space' soit pressée pour mettre VLC en pause
keyboard.wait('p')
print("Music - Pause")
send_vlc_command('pl_pause', True)
def light_on():
while True:
keyboard.wait('r')
print("Buzzer light - On")
for button, ip in button_ips.items():
notify_button(ip, '/on')
def light_blink():
while True:
keyboard.wait('t')
print("Buzzer light - Blink")
for button, ip in button_ips.items():
notify_button(ip, '/blink')
def light_off():
while True:
keyboard.wait('y')
print("Buzzer light - Off")
for button, ip in button_ips.items():
notify_button(ip, '/off')
def buzzer_on():
global INGAME
while True:
keyboard.wait('f')
INGAME = True
print("Buzzer - Enable")
for button, ip in button_ips.items():
notify_button(ip, '/blink')
def buzzer_off():
global INGAME
while True:
keyboard.wait('h')
INGAME = False
print("Buzzer - Disable")
for button, ip in button_ips.items():
notify_button(ip, '/off')
if __name__ == '__main__':
print("Serveur démarré")
init_mixer()
threading.Thread(target=good_answer, daemon=True).start()
threading.Thread(target=incomplet_answer, daemon=True).start()
threading.Thread(target=no_answer, daemon=True).start()
threading.Thread(target=next_song, daemon=True).start()
threading.Thread(target=play, daemon=True).start()
threading.Thread(target=pause, daemon=True).start()
threading.Thread(target=light_on, daemon=True).start()
threading.Thread(target=light_off, daemon=True).start()
threading.Thread(target=light_blink, daemon=True).start()
threading.Thread(target=buzzer_on, daemon=True).start()
threading.Thread(target=buzzer_off, daemon=True).start()
INGAME = True
print("Buzzer - Enable")
for button, ip in button_ips.items():
notify_button(ip, '/blink')
serve(app, host='0.0.0.0', port=5000)
serve(app, host='0.0.0.0', port=5000)