From 7bd2fb5b7c4ca832b2ca818b2f20633e5d259179 Mon Sep 17 00:00:00 2001 From: Lucas Date: Tue, 11 Nov 2025 20:14:58 +0100 Subject: [PATCH] Add argon2 password hash --- backend/auth.py | 50 +++++++++++++++++++++++++++++++++++++++++++ backend/models.py | 3 +++ backend/routes.py | 52 ++++++++------------------------------------- backend/security.py | 19 +++++++++++++++++ create_user.py | 7 ++++-- requirements.txt | 3 +++ 6 files changed, 89 insertions(+), 45 deletions(-) create mode 100644 backend/auth.py create mode 100644 backend/security.py diff --git a/backend/auth.py b/backend/auth.py new file mode 100644 index 0000000..38865d3 --- /dev/null +++ b/backend/auth.py @@ -0,0 +1,50 @@ +from datetime import datetime, timedelta +from flask import session +from flask_login import login_user +from . import db +from backend.models import User, LoginIP +from backend.alldebrid import check_alldebrid_status, send_ntfy +from backend.security import verify_password + +MAX_ATTEMPTS = 5 +BLOCK_TIME = timedelta(minutes=15) + +def authenticate_user(username: str, password: str, ip: str): + ip_record = LoginIP.query.filter_by(ip=ip).first() + if not ip_record: + ip_record = LoginIP(ip=ip) + db.session.add(ip_record) + db.session.commit() + + # IP bloquée + if ip_record.blocked_until and datetime.utcnow() < ip_record.blocked_until: + remaining = int((ip_record.blocked_until - datetime.utcnow()).total_seconds() // 60) + 1 + return None, f"Trop de tentatives depuis votre IP. Réessayez dans {remaining} min." + + user = User.query.filter_by(username=username).first() + if user and verify_password(password, user.password): + # Reset IP + ip_record.count = 0 + ip_record.blocked_until = None + db.session.commit() + + login_user(user) + session['user'] = user.username + + # Vérification AllDebrid + premium = check_alldebrid_status() + session['alldebrid_active'] = premium + if premium: + send_ntfy("AllDebrid non premium", "Tentative avortée sur ygg-service !") + + return user, None + else: + ip_record.count += 1 + ip_record.last_attempt = datetime.utcnow() + if ip_record.count >= MAX_ATTEMPTS: + ip_record.blocked_until = datetime.utcnow() + BLOCK_TIME + msg = f"Trop de tentatives. Blocage pour {BLOCK_TIME.seconds // 60} minutes." + else: + msg = f"Identifiants invalides ({ip_record.count}/{MAX_ATTEMPTS})" + db.session.commit() + return None, msg \ No newline at end of file diff --git a/backend/models.py b/backend/models.py index c9792bd..bf8d42a 100644 --- a/backend/models.py +++ b/backend/models.py @@ -13,3 +13,6 @@ class LoginIP(db.Model): count = db.Column(db.Integer, default=0, nullable=False) blocked_until = db.Column(db.DateTime, nullable=True) last_attempt = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + failed_attempts = db.Column(db.Integer, default=0, nullable=False) + locked_until = db.Column(db.DateTime, nullable=True) + last_failed = db.Column(db.DateTime, nullable=True) diff --git a/backend/routes.py b/backend/routes.py index 54962e4..33685f9 100644 --- a/backend/routes.py +++ b/backend/routes.py @@ -1,19 +1,18 @@ # Standard library -from datetime import datetime, timedelta +from datetime import timedelta from io import BytesIO import time # Third-party libraries import requests from flask import render_template, request, redirect, url_for, session, flash, jsonify, make_response -from flask_login import login_user, logout_user, login_required, current_user -from werkzeug.security import check_password_hash +from flask_login import logout_user, login_required, current_user +from flask import request, render_template, redirect, url_for, flash + # Project imports -from . import db -from backend.models import User, LoginIP from backend.utils import format_size, calculate_age -from backend.alldebrid import check_alldebrid_status, send_ntfy +from backend.auth import authenticate_user MAX_ATTEMPTS = 5 BLOCK_TIME = timedelta(minutes=15) @@ -34,48 +33,15 @@ def init_app(app): @app.route('/login', methods=['GET', 'POST']) def login(): - ip = request.remote_addr or "unknown" - ip_record = LoginIP.query.filter_by(ip=ip).first() - if not ip_record: - ip_record = LoginIP(ip=ip) - db.session.add(ip_record) - db.session.commit() - - if ip_record.blocked_until and datetime.utcnow() < ip_record.blocked_until: - remaining = int((ip_record.blocked_until - datetime.utcnow()).total_seconds() // 60) + 1 - flash(f"Trop de tentatives depuis votre IP. Réessayez dans {remaining} min.") - return render_template("login.html") - if request.method == "POST": username = request.form.get("username") password = request.form.get("password") - user = User.query.filter_by(username=username).first() + ip = request.remote_addr or "unknown" - if user and check_password_hash(user.password, password): - ip_record.count = 0 - ip_record.blocked_until = None - db.session.commit() - login_user(user) - session['user'] = user.username - - # --- Vérification AllDebrid --- - print("Vérification en cours") - premium = check_alldebrid_status() - session['alldebrid_active'] = premium - if not premium: # notifier seulement si le compte n’est plus premium - print("Envoi notif") - send_ntfy("AllDebrid non premium", "Tentative avortée sur ygg-service !") - - return redirect(url_for("dashboard")) + user, msg = authenticate_user(username, password, ip) + if user: + return redirect(url_for('dashboard')) else: - ip_record.count += 1 - ip_record.last_attempt = datetime.utcnow() - if ip_record.count >= MAX_ATTEMPTS: - ip_record.blocked_until = datetime.utcnow() + BLOCK_TIME - msg = f"Trop de tentatives. Blocage pour {BLOCK_TIME.seconds // 60} minutes." - else: - msg = f"Identifiants invalides ({ip_record.count}/{MAX_ATTEMPTS})" - db.session.commit() flash(msg) return render_template("login.html") diff --git a/backend/security.py b/backend/security.py new file mode 100644 index 0000000..92d404f --- /dev/null +++ b/backend/security.py @@ -0,0 +1,19 @@ +from passlib.context import CryptContext + +# Configuration sensible : tu peux ajuster time_cost, memory_cost, parallelism +pwd_context = CryptContext( + schemes=["argon2"], + deprecated="auto", + argon2__time_cost=3, + argon2__memory_cost=64 * 1024, # 64 MB + argon2__parallelism=2 +) + +def hash_password(plain: str) -> str: + return pwd_context.hash(plain) + +def verify_password(plain: str, hashed: str) -> bool: + return pwd_context.verify(plain, hashed) + +def needs_rehash(hashed: str) -> bool: + return pwd_context.needs_update(hashed) \ No newline at end of file diff --git a/create_user.py b/create_user.py index 103a187..8eb0ac6 100755 --- a/create_user.py +++ b/create_user.py @@ -1,16 +1,19 @@ #!/usr/bin/env python3 from backend import create_app, db from backend.models import User -from werkzeug.security import generate_password_hash +from backend.security import hash_password app = create_app() + with app.app_context(): username = input("Nom d'utilisateur : ") password = input("Mot de passe : ") + if User.query.filter_by(username=username).first(): print(f"Utilisateur '{username}' existe déjà !") else: - user = User(username=username, password=generate_password_hash(password)) + hashed_pw = hash_password(password) # utilisation de passlib + user = User(username=username, password=hashed_pw) db.session.add(user) db.session.commit() print(f"Utilisateur '{username}' créé avec succès !") diff --git a/requirements.txt b/requirements.txt index 87bebef..885e9ab 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,6 @@ Flask-Login Flask-SQLAlchemy requests feedparser +Flask-Limiter +passlib[argon2] +argon2-cffi \ No newline at end of file