Files
ygg-service/backend/auth.py
2025-11-11 21:26:34 +01:00

63 lines
2.0 KiB
Python

from datetime import datetime, timedelta
from flask import session
from flask_login import login_user
from backend.api import check_alldebrid_status, send_ntfy
from backend.security import verify_password, needs_rehash, hash_password
# Number of failed logins from one IP at which that IP gets blocked.
MAX_ATTEMPTS: int = 5

# How long an IP stays blocked once it reaches MAX_ATTEMPTS failures.
BLOCK_TIME: timedelta = timedelta(minutes=15)
def authenticate_user(username: str, password: str, ip: str):
    """Authenticate a user while rate-limiting failed attempts per IP.

    The function looks up (or creates) the ``LoginIP`` row for *ip*, refuses
    the attempt while that row is blocked, and otherwise checks the
    credentials against the ``User`` table.

    On success the stored hash is upgraded when ``needs_rehash`` says so, the
    IP counter is reset, the user is logged in through ``login_user``, and the
    result of ``check_alldebrid_status()`` is stored in
    ``session['alldebrid_active']`` (a notification is sent through
    ``send_ntfy`` when that result is falsy).

    On failure the IP counter is incremented and the IP is blocked for
    ``BLOCK_TIME`` once the counter reaches ``MAX_ATTEMPTS``.

    Args:
        username: Login name to look up.
        password: Clear-text password to verify.
        ip: Address of the client, used as the rate-limiting key.

    Returns:
        A ``(user, error)`` tuple: ``(user, None)`` on success, or
        ``(None, message)`` with a user-facing message on failure or block.
    """
    # Imported lazily, as in the original code (presumably to avoid a
    # circular import with the package initialisation -- TODO confirm).
    from datetime import timezone

    from . import db
    from backend.models import User, LoginIP

    # Naive UTC timestamp: same value `datetime.utcnow()` produced, without
    # relying on that deprecated call. It stays naive so that comparisons
    # with the stored `blocked_until` values behave exactly as before.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    # --- IP bookkeeping ---
    ip_record = LoginIP.query.filter_by(ip=ip).first()
    if not ip_record:
        ip_record = LoginIP(ip=ip)
        db.session.add(ip_record)
        db.session.commit()

    # IP currently blocked: report the remaining time, rounded up to a minute.
    if ip_record.blocked_until and now < ip_record.blocked_until:
        remaining = int((ip_record.blocked_until - now).total_seconds() // 60) + 1
        return None, f"Trop de tentatives depuis votre IP. Réessayez dans {remaining} min."

    # --- User lookup and credential check ---
    user = User.query.filter_by(username=username).first()
    if user and verify_password(password, user.password):
        # Upgrade the stored hash when required.
        if needs_rehash(user.password):
            user.password = hash_password(password)

        # Reset the per-IP failure state.
        ip_record.count = 0
        ip_record.blocked_until = None
        ip_record.last_attempt = now
        db.session.commit()

        login_user(user)

        # AllDebrid status check; the result is cached in the session.
        premium = check_alldebrid_status()
        session['alldebrid_active'] = premium
        if not premium:
            send_ntfy("AllDebrid non premium", "Tentative avortée sur ygg-service !")
        return user, None

    # --- Failed login ---
    # NOTE(review): the counter is not reset when a block expires, so the
    # first failure after a block re-blocks the IP immediately -- confirm
    # this is the intended policy.
    # `or 0` guards against a counter that was never initialised (None).
    ip_record.count = (ip_record.count or 0) + 1
    ip_record.last_attempt = now
    if ip_record.count >= MAX_ATTEMPTS:
        ip_record.blocked_until = now + BLOCK_TIME
        # total_seconds() covers the whole duration; `.seconds` would drop
        # the days component for blocks of 24 hours or more.
        block_minutes = int(BLOCK_TIME.total_seconds() // 60)
        msg = f"Trop de tentatives depuis votre IP. Blocage pour {block_minutes} minutes."
    else:
        msg = f"Identifiants invalides ({ip_record.count}/{MAX_ATTEMPTS})"
    db.session.commit()
    return None, msg