from datetime import datetime, timedelta

from flask import session
from flask_login import login_user

from . import db
from backend.models import User, LoginIP
from backend.alldebrid import check_alldebrid_status, send_ntfy
from backend.security import verify_password

# Number of failed attempts from one IP at which the IP gets blocked.
MAX_ATTEMPTS = 5
# Duration of the block applied once MAX_ATTEMPTS is reached.
BLOCK_TIME = timedelta(minutes=15)


def authenticate_user(username: str, password: str, ip: str):
    """Authenticate a user while throttling failed attempts per IP address.

    A ``LoginIP`` row is looked up (and created on first sight) for ``ip``.
    While that row's ``blocked_until`` lies in the future, the attempt is
    rejected without checking the credentials.

    On success the IP counter is reset, the user is logged in through
    ``login_user``, ``session['user']`` and ``session['alldebrid_active']``
    are populated, and a notification is sent when AllDebrid is reported as
    not active.

    On failure the IP counter is incremented and, once it reaches
    ``MAX_ATTEMPTS``, the IP is blocked for ``BLOCK_TIME``.

    Args:
        username: Login name submitted by the client.
        password: Plain-text password submitted by the client.
        ip: Client IP address used as the throttling key.

    Returns:
        A ``(user, error)`` tuple: ``(user, None)`` on success, or
        ``(None, message)`` with a user-facing message on failure.
    """
    ip_record = LoginIP.query.filter_by(ip=ip).first()
    if not ip_record:
        ip_record = LoginIP(ip=ip)
        db.session.add(ip_record)
        db.session.commit()

    # Single timestamp so the block check and the remaining-time message
    # agree. Kept naive-UTC to stay comparable with the stored values.
    now = datetime.utcnow()

    # IP currently blocked: reject before touching the credentials.
    if ip_record.blocked_until and now < ip_record.blocked_until:
        remaining = int((ip_record.blocked_until - now).total_seconds() // 60) + 1
        return None, f"Trop de tentatives depuis votre IP. Réessayez dans {remaining} min."

    user = User.query.filter_by(username=username).first()

    if user and verify_password(password, user.password):
        # Successful login: reset the IP throttling state.
        ip_record.count = 0
        ip_record.blocked_until = None
        db.session.commit()

        login_user(user)
        session['user'] = user.username

        # AllDebrid status check; the result is exposed to the session.
        premium = check_alldebrid_status()
        session['alldebrid_active'] = premium
        # Notify only when AllDebrid is NOT active (the original condition
        # was inverted and fired the "non premium" alert for active accounts).
        if not premium:
            send_ntfy("AllDebrid non premium", "Tentative avortée sur ygg-service !")

        return user, None

    # Failed login: record the attempt. `or 0` guards against a NULL counter.
    # NOTE(review): the counter is not reset when a block expires, so the
    # first failure after expiry re-blocks immediately — confirm intent.
    ip_record.count = (ip_record.count or 0) + 1
    ip_record.last_attempt = now

    if ip_record.count >= MAX_ATTEMPTS:
        ip_record.blocked_until = now + BLOCK_TIME
        # total_seconds() stays correct even if BLOCK_TIME grows past a day,
        # unlike the `.seconds` attribute.
        block_minutes = int(BLOCK_TIME.total_seconds() // 60)
        msg = f"Trop de tentatives. Blocage pour {block_minutes} minutes."
    else:
        msg = f"Identifiants invalides ({ip_record.count}/{MAX_ATTEMPTS})"

    db.session.commit()
    return None, msg