from datetime import datetime, timedelta

from flask import session
from flask_login import login_user

from backend.api import check_alldebrid_status, send_ntfy
from backend.security import verify_password, needs_rehash, hash_password

# Number of failed logins from a single IP before that IP is blocked.
MAX_ATTEMPTS = 5
# How long an IP stays blocked once MAX_ATTEMPTS is reached.
BLOCK_TIME = timedelta(minutes=15)


def authenticate_user(username: str, password: str, ip: str):
    """Authenticate a user while rate-limiting failed attempts per IP.

    A ``LoginIP`` row is looked up (or created) for ``ip``. While that row
    carries a ``blocked_until`` in the future, the attempt is rejected
    without checking credentials. On success the IP counter is reset, the
    password hash is upgraded if ``needs_rehash`` says so, the user is
    logged in through Flask-Login and the AllDebrid status is stored in the
    Flask session under ``'alldebrid_active'``. On failure the IP counter is
    incremented and the IP is blocked for ``BLOCK_TIME`` once it reaches
    ``MAX_ATTEMPTS``.

    Args:
        username: Login name to look up.
        password: Plain-text password supplied by the client.
        ip: Client IP address used as the rate-limiting key.

    Returns:
        A ``(user, error)`` tuple: ``(user, None)`` on success, or
        ``(None, message)`` with a user-facing message on failure.
    """
    # Imported lazily inside the function, as in the original code
    # (presumably to avoid a circular import at module load -- confirm).
    from . import db
    from backend.models import User, LoginIP

    # NOTE(review): naive UTC timestamp; kept as-is because it is compared
    # with ``blocked_until`` values read back from the database, which are
    # presumably naive as well -- confirm before switching to aware datetimes.
    now = datetime.utcnow()

    # --- IP bookkeeping ---
    ip_record = LoginIP.query.filter_by(ip=ip).first()
    if not ip_record:
        ip_record = LoginIP(ip=ip)
        db.session.add(ip_record)
        db.session.commit()

    if ip_record.blocked_until:
        if now < ip_record.blocked_until:
            # Still blocked: report the remaining time, rounded up to a minute.
            remaining = int((ip_record.blocked_until - now).total_seconds() // 60) + 1
            return None, f"Trop de tentatives depuis votre IP. Réessayez dans {remaining} min."
        # The block has expired: start a fresh attempt window. Without this
        # the counter stays at or above MAX_ATTEMPTS and a single further
        # failure would re-block the IP immediately.
        ip_record.count = 0
        ip_record.blocked_until = None

    # --- User lookup and password check ---
    user = User.query.filter_by(username=username).first()

    if user and verify_password(password, user.password):
        # Upgrade the stored hash when the hashing scheme asks for it.
        if needs_rehash(user.password):
            user.password = hash_password(password)

        # Reset the IP counter after a successful credential check.
        ip_record.count = 0
        ip_record.blocked_until = None
        ip_record.last_attempt = now
        db.session.commit()

        # login_user() returns False (and logs nobody in) when the user is
        # not active, so that case must not be reported as a success.
        if not login_user(user):
            return None, "Compte désactivé."

        # AllDebrid status check; the result is kept in the session and a
        # notification is sent when the account is not premium.
        premium = check_alldebrid_status()
        session['alldebrid_active'] = premium
        if not premium:
            send_ntfy("AllDebrid non premium", "Tentative avortée sur ygg-service !")

        return user, None

    # --- Failed login ---
    # ``or 0`` guards against a NULL counter on rows created without a default.
    ip_record.count = (ip_record.count or 0) + 1
    ip_record.last_attempt = now

    if ip_record.count >= MAX_ATTEMPTS:
        ip_record.blocked_until = now + BLOCK_TIME
        # total_seconds() rather than .seconds, which drops whole days.
        block_minutes = int(BLOCK_TIME.total_seconds() // 60)
        msg = f"Trop de tentatives depuis votre IP. Blocage pour {block_minutes} minutes."
    else:
        msg = f"Identifiants invalides ({ip_record.count}/{MAX_ATTEMPTS})"

    db.session.commit()
    return None, msg