From 08be6a2455b6f883968e545106c9b06e187b33e3 Mon Sep 17 00:00:00 2001 From: Lucas Date: Tue, 11 Nov 2025 21:26:34 +0100 Subject: [PATCH] Refactor --- backend/__init__.py | 64 +++++++++----- backend/alldebrid.py | 40 --------- backend/api.py | 144 ++++++++++++++++++++++++++++++++ backend/auth.py | 48 +++++++---- backend/routes.py | 193 ++++++------------------------------------- 5 files changed, 241 insertions(+), 248 deletions(-) delete mode 100644 backend/alldebrid.py create mode 100644 backend/api.py diff --git a/backend/__init__.py b/backend/__init__.py index a6ada08..708c4a5 100644 --- a/backend/__init__.py +++ b/backend/__init__.py @@ -1,50 +1,72 @@ + from flask import Flask from flask_sqlalchemy import SQLAlchemy from flask_login import LoginManager +from dotenv import load_dotenv +from pathlib import Path import os db = SQLAlchemy() login_manager = LoginManager() def create_app(): - # Frontend path - base_dir = os.path.abspath(os.path.dirname(__file__)) - template_dir = os.path.join(base_dir, '..', 'frontend', 'templates') - static_dir = os.path.join(base_dir, '..', 'frontend', 'static') + # --- Chemins --- + base_dir = Path(__file__).resolve().parent + template_dir = base_dir.parent / 'frontend' / 'templates' + static_dir = base_dir.parent / 'frontend' / 'static' + instance_path = base_dir.parent / 'instance' + instance_path.mkdir(parents=True, exist_ok=True) - # DB path - instance_path = os.path.join(base_dir, '..', 'instance') - os.makedirs(instance_path, exist_ok=True) + # --- Chargement des variables d'environnement --- + env_path = base_dir.parent / '.env' + load_dotenv(env_path) + # --- Vérification de la clé secrète --- + secret = os.getenv('FLASK_SECRET') + if not secret: + raise RuntimeError("FLASK_SECRET must be set in environment") + + # --- Création de l'application --- app = Flask( __name__, - template_folder=template_dir, - static_folder=static_dir, - instance_path=os.path.abspath(instance_path) + template_folder=str(template_dir), + static_folder=str(static_dir), + instance_path=str(instance_path) ) - app.config['SECRET_KEY'] = 'IddAhxI%$G%2X3joI04i' - app.config['SQLALCHEMY_DATABASE_URI'] = f"sqlite:///{os.path.join(app.instance_path, 'users.db')}" - app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False + # --- Configuration --- + app.config.update( + SECRET_KEY=secret, + SQLALCHEMY_DATABASE_URI=f"sqlite:///{instance_path / 'users.db'}", + SQLALCHEMY_TRACK_MODIFICATIONS=False, + SESSION_COOKIE_HTTPONLY=True, + SESSION_COOKIE_SAMESITE='Lax', + SESSION_COOKIE_SECURE=os.getenv("FLASK_ENV") == "production" + ) - # Init extensions + # --- Initialisation des extensions --- db.init_app(app) login_manager.init_app(app) login_manager.login_view = "login" + login_manager.session_protection = "strong" + login_manager.login_message = "Veuillez vous connecter pour accéder à cette page." + login_manager.login_message_category = "warning" - # Import models for user_loader + from backend import routes from backend.models import User + # Loader pour Flask-Login @login_manager.user_loader def load_user(user_id): return User.query.get(int(user_id)) + + # --- Chargement différé des modèles et routes --- + with app.app_context(): - # Import routes and give app - from backend import routes + # Création de la DB si nécessaire + db.create_all() + + # Initialisation des routes routes.init_app(app) - # Create DB if needed - with app.app_context(): - db.create_all() - return app diff --git a/backend/alldebrid.py b/backend/alldebrid.py deleted file mode 100644 index d639e18..0000000 --- a/backend/alldebrid.py +++ /dev/null @@ -1,40 +0,0 @@ -import requests -import os - -ALLDEBRID_API_KEY = os.getenv("ALLDEBRID_API_KEY", "mtrQI4h583rHe2ZpvpbC") -NTFY_TOPIC_URL = os.getenv("NTFY_TOPIC_URL", "https://ntfy.lucasroyer.fr/alldebrid") -NTFY_TOKEN = os.getenv("NTFY_TOKEN", "tk_qqi1ayj2a0etxafgicl0h7ww71ofb") # Ton token pour topic protégé - -def check_alldebrid_status(): - # Retourne True si premium actif, False sinon - try: - r = requests.get( - "https://api.alldebrid.com/v4/user", - params={"agent": "ygg-service", "apikey": ALLDEBRID_API_KEY}, - timeout=5 - ) - data = r.json() - return data.get("data", {}).get("user", {}).get("isPremium", False) - except Exception as e: - print("Erreur AllDebrid:", e) - return False - -def send_ntfy(title, message): - # Envoie une notification sur le topic ntfy, avec token si nécessaire - headers = {"Title": title} - if NTFY_TOKEN: - headers["Authorization"] = f"Bearer {NTFY_TOKEN}" - - try: - r = requests.post( - NTFY_TOPIC_URL, - data=message.encode("utf-8"), - headers=headers, - timeout=5 - ) - if r.status_code not in (200, 201): - print(f"❌ Échec notification ({r.status_code}): {r.text}") - else: - print(f"✅ Notification envoyée : {title}") - except Exception as e: - print("Erreur ntfy:", e) \ No newline at end of file diff --git a/backend/api.py b/backend/api.py new file mode 100644 index 0000000..1a88d48 --- /dev/null +++ b/backend/api.py @@ -0,0 +1,144 @@ +from io import BytesIO +import requests +import os +from backend.utils import format_size, calculate_age + +YGG_PASSKEY = os.getenv("YGG_PASSKEY") +ALLDEBRID_API_KEY = os.getenv("ALLDEBRID_API_KEY") +NTFY_TOPIC_URL = os.getenv("NTFY_TOPIC_URL") +NTFY_TOKEN = os.getenv("NTFY_TOKEN") + +MAX_SIZE_BYTES = 5*1024**3 +NB_PAGES = 2 + +def check_alldebrid_status(): + # Retourne True si premium actif, False sinon + try: + r = requests.get( + "https://api.alldebrid.com/v4/user", + params={"agent": "ygg-service", "apikey": ALLDEBRID_API_KEY}, + timeout=5 + ) + data = r.json() + return data.get("data", {}).get("user", {}).get("isPremium", False) + except Exception as e: + print("Erreur AllDebrid:", e) + return False + +def send_ntfy(title, message): + # Envoie une notification sur le topic ntfy, avec token si nécessaire + headers = {"Title": title} + if NTFY_TOKEN: + headers["Authorization"] = f"Bearer {NTFY_TOKEN}" + + try: + r = requests.post( + NTFY_TOPIC_URL, + data=message.encode("utf-8"), + headers=headers, + timeout=5 + ) + if r.status_code not in (200, 201): + print(f"❌ Échec notification ({r.status_code}): {r.text}") + else: + print(f"✅ Notification envoyée : {title}") + except Exception as e: + print("Erreur ntfy:", e) + +def search_torrents(query: str, category_id: str | None = None) -> list[dict]: + """Recherche des torrents sur l'API Yggtorrent, retourne la liste brute.""" + url = "https://yggapi.eu/torrents" + params = { + "page": 1, + "q": query, + "order_by": "uploaded_at", + "per_page": 100 + } + if category_id: + params["category_id"] = category_id + + results = [] + try: + for page in range(1, NB_PAGES + 1): + params['page'] = page + r = requests.get(url, params=params, timeout=5) + r.raise_for_status() + data = r.json() + results.extend(data) + except requests.exceptions.RequestException as e: + print("Erreur API Yggtorrent:", e) + results = [] + + return results + +def filter_and_format_torrents(torrents: list[dict]) -> list[dict]: + """Filtre les torrents trop gros et formate la taille et l'âge.""" + filtered = [] + for t in torrents: + if t['size'] <= MAX_SIZE_BYTES: + t['size'] = format_size(t['size']) + days, human = calculate_age(t['uploaded_at']) + t['age_days'] = days + t['age_human'] = human + filtered.append(t) + return filtered + +def download_torrent_ygg(torrent_id: str) -> tuple[bytes, str]: + # Télécharge le fichier .torrent depuis Yggtorrent + url = f"https://yggapi.eu/torrent/{torrent_id}/download" + params = { + "passkey": YGG_PASSKEY, + "tracker_domain": "tracker.p2p-world.net" + } + r = requests.get(url, params=params, timeout=10, allow_redirects=True) + r.raise_for_status() + filename = f"{torrent_id}.torrent" + return r.content, filename + +def upload_to_alldebrid(torrent_content: bytes, filename: str) -> str: + # Upload du torrent sur Alldebrid, retourne l'ID + headers = {"Authorization": f"Bearer {ALLDEBRID_API_KEY}"} + files = {'files[]': (filename, BytesIO(torrent_content))} + r = requests.post("https://api.alldebrid.com/v4/magnet/upload/file", headers=headers, files=files, timeout=20) + r.raise_for_status() + data = r.json() + if data.get("status") != "success": + raise RuntimeError(f"Upload Alldebrid échoué : {data}") + return data["data"]["files"][0]["id"] + +def get_alldebrid_links(debrid_id: str) -> list[dict]: + # Récupère les liens directs Alldebrid à partir d'un ID + headers = {"Authorization": f"Bearer {ALLDEBRID_API_KEY}"} + + # Récupération des fichiers du magnet + r = requests.get("https://api.alldebrid.com/v4/magnet/files", headers=headers, params={"id[]": debrid_id}, timeout=10) + r.raise_for_status() + data = r.json() + if data.get("status") != "success": + raise RuntimeError(f"Récupération liens Alldebrid échouée : {data}") + + alldebrid_links = [] + for magnet in data.get("data", {}).get("magnets", []): + for file in magnet.get("files", []): + if "e" in file: + for entry in file["e"]: + if "l" in entry: + alldebrid_links.append(entry["l"]) + elif "l" in file: + alldebrid_links.append(file["l"]) + + # Débloquer les liens + direct_links = [] + for link in alldebrid_links: + r2 = requests.get("https://api.alldebrid.com/v4/link/unlock", headers=headers, params={"link": link}, timeout=10) + r2.raise_for_status() + info = r2.json() + if info.get("status") != "success": + raise RuntimeError(f"Déblocage lien Alldebrid échoué : {info}") + direct_links.append({ + "name": info["data"]["filename"], + "size": info["data"]["filesize"], + "link": info["data"]["link"] + }) + + return direct_links \ No newline at end of file diff --git a/backend/auth.py b/backend/auth.py index 38865d3..a5d0046 100644 --- a/backend/auth.py +++ b/backend/auth.py @@ -1,15 +1,20 @@ from datetime import datetime, timedelta from flask import session from flask_login import login_user -from . import db -from backend.models import User, LoginIP -from backend.alldebrid import check_alldebrid_status, send_ntfy -from backend.security import verify_password + +from backend.api import check_alldebrid_status, send_ntfy +from backend.security import verify_password, needs_rehash, hash_password MAX_ATTEMPTS = 5 BLOCK_TIME = timedelta(minutes=15) def authenticate_user(username: str, password: str, ip: str): + from . import db + from backend.models import User, LoginIP + + now = datetime.utcnow() + + # --- Gestion IP --- ip_record = LoginIP.query.filter_by(ip=ip).first() if not ip_record: ip_record = LoginIP(ip=ip) @@ -17,34 +22,41 @@ def authenticate_user(username: str, password: str, ip: str): db.session.commit() # IP bloquée - if ip_record.blocked_until and datetime.utcnow() < ip_record.blocked_until: - remaining = int((ip_record.blocked_until - datetime.utcnow()).total_seconds() // 60) + 1 + if ip_record.blocked_until and now < ip_record.blocked_until: + remaining = int((ip_record.blocked_until - now).total_seconds() // 60) + 1 return None, f"Trop de tentatives depuis votre IP. Réessayez dans {remaining} min." + # --- Récupération utilisateur --- user = User.query.filter_by(username=username).first() if user and verify_password(password, user.password): - # Reset IP + # Upgrade du hash si nécessaire + if needs_rehash(user.password): + user.password = hash_password(password) + + # Reset compteur IP ip_record.count = 0 ip_record.blocked_until = None + ip_record.last_attempt = now db.session.commit() login_user(user) - session['user'] = user.username # Vérification AllDebrid premium = check_alldebrid_status() session['alldebrid_active'] = premium - if premium: + if not premium: send_ntfy("AllDebrid non premium", "Tentative avortée sur ygg-service !") return user, None + + # --- Échec de connexion --- + ip_record.count += 1 + ip_record.last_attempt = now + if ip_record.count >= MAX_ATTEMPTS: + ip_record.blocked_until = now + BLOCK_TIME + msg = f"Trop de tentatives depuis votre IP. Blocage pour {BLOCK_TIME.seconds // 60} minutes." else: - ip_record.count += 1 - ip_record.last_attempt = datetime.utcnow() - if ip_record.count >= MAX_ATTEMPTS: - ip_record.blocked_until = datetime.utcnow() + BLOCK_TIME - msg = f"Trop de tentatives. Blocage pour {BLOCK_TIME.seconds // 60} minutes." - else: - msg = f"Identifiants invalides ({ip_record.count}/{MAX_ATTEMPTS})" - db.session.commit() - return None, msg \ No newline at end of file + msg = f"Identifiants invalides ({ip_record.count}/{MAX_ATTEMPTS})" + + db.session.commit() + return None, msg diff --git a/backend/routes.py b/backend/routes.py index 33685f9..ec2351f 100644 --- a/backend/routes.py +++ b/backend/routes.py @@ -1,27 +1,17 @@ # Standard library -from datetime import timedelta -from io import BytesIO -import time +import time, os # Third-party libraries import requests from flask import render_template, request, redirect, url_for, session, flash, jsonify, make_response from flask_login import logout_user, login_required, current_user -from flask import request, render_template, redirect, url_for, flash - # Project imports -from backend.utils import format_size, calculate_age from backend.auth import authenticate_user +from backend.api import search_torrents, filter_and_format_torrents, download_torrent_ygg, upload_to_alldebrid, get_alldebrid_links -MAX_ATTEMPTS = 5 -BLOCK_TIME = timedelta(minutes=15) - -MAX_SIZE_BYTES = 5*1024**3 -NB_PAGES = 2 - -YGG_PASSKEY = "xj1MgNuyzFKCjOtnawGBC2egDOciUg04" -ALLDEBRID_KEY = "mtrQI4h583rHe2ZpvpbC" +YGG_PASSKEY = os.getenv("YGG_PASSKEY") +ALLDEBRID_KEY = os.getenv("ALLDEBRID_API_KEY") def init_app(app): @@ -66,46 +56,11 @@ def init_app(app): query = request.args.get('query') category_id = request.args.get('category_id') - # Préparer l'URL - url = "https://yggapi.eu/torrents" + # 1. Recherche brute + torrents = search_torrents(query, category_id) - # Préparer les paramètres - params = { - "page": 1, - "q": query, - "order_by": "uploaded_at", - "per_page": 100 - } - - # Ajouter la catégorie seulement si elle est renseignée et non vide - if category_id: - params["category_id"] = category_id - - results = [] - - # Appeler l'API - try: - for page in range(1, NB_PAGES): - params['page'] = page - print("Appel API page", page, "avec params:", params) - response = requests.get(url, params=params, timeout=5) - response.raise_for_status() - data = response.json() # ici c'est une liste directement - print(f"Nombre de torrents reçus page {page}:", len(data)) - results.extend(data) - except Exception as e: - print("Erreur API Yggtorrent:", e) - results = [] - - filtered_results = [] - - for torrent in results: - if torrent['size'] <= MAX_SIZE_BYTES: - torrent['size'] = format_size(torrent['size']) - days, human = calculate_age(torrent['uploaded_at']) - torrent['age_days'] = days # to show - torrent['age_human'] = human # to sort - filtered_results.append(torrent) + # 2. Filtrage et formatage + filtered_results = filter_and_format_torrents(torrents) return render_template( 'search_results.html', @@ -117,124 +72,24 @@ def init_app(app): @app.route("/torrent/") @login_required def torrent_details(torrent_id): - # --- 1er bloc : récupération du torrent --- try: - url = f"https://yggapi.eu/torrent/{torrent_id}/download" - params = { - "passkey": YGG_PASSKEY, - "tracker_domain": "tracker.p2p-world.net" - } - response = requests.get(url, params=params, timeout=10, allow_redirects=True) - response.raise_for_status() - torrent_file_content = response.content - torrent_filename = f"{torrent_id}.torrent" + # 1. Téléchargement depuis Ygg + torrent_content, filename = download_torrent_ygg(torrent_id) + + # 2. Upload sur Alldebrid + debrid_id = upload_to_alldebrid(torrent_content, filename) + + # Pause courte si nécessaire (simule un délai API) + time.sleep(2) + + # 3. Récupération des liens directs + direct_links = get_alldebrid_links(debrid_id) - # Request error except requests.exceptions.RequestException as e: - print("Erreur lors de la récupération du torrent depuis Yggtorrent:", e) - return "Erreur récupération torrent", 500 + print("Erreur HTTP:", e) + return "Erreur lors de la communication avec l'API", 500 + except RuntimeError as e: + print(e) + return str(e), 500 - # --- 2e bloc : upload à Alldebrid --- - try: - files = {'files[]': (torrent_filename, BytesIO(torrent_file_content))} - headers = {"Authorization": f"Bearer {ALLDEBRID_KEY}"} - - upload_request = requests.post( - "https://api.alldebrid.com/v4/magnet/upload/file", - headers=headers, - files=files, - timeout=20 - ) - upload_request.raise_for_status() - first_answer = upload_request.json() - - debrid_id = first_answer["data"]["files"][0]["id"] - - # API error - if first_answer["status"] != "success": - print("API error 1 : upload Alldebrid:", first_answer) - return "API error 1 : upload Alldebrid", 500 - - # Request error - except requests.exceptions.RequestException as e: - print("Request error 1 : upload Alldebrid:", e) - return "Request error 1 : upload Alldebrid:", 500 - - print("Alldebrid id:",debrid_id) - - time.sleep(2) - - # --- 3e bloc : second appel Alldebrid --- - try: - headers = {"Authorization": f"Bearer {ALLDEBRID_KEY}"} - - alldebrid_link_request = requests.get( - "https://api.alldebrid.com/v4/magnet/files", - headers=headers, - params={"id[]": debrid_id}, - timeout=10 - ) - alldebrid_link_request.raise_for_status() - second_answer = alldebrid_link_request.json() - - # API error - if second_answer.get("status") != "success": - print("API error 2 : get Alldebrid link:", second_answer) - return "API error 2 : get Alldebrid link", 500 - - alldebrid_links = [] - - for magnet in second_answer.get("data", {}).get("magnets", []): - for file in magnet.get("files", []): - # Cas où il y a une liste "e" - if "e" in file: - for entry in file["e"]: - if "l" in entry: - alldebrid_links.append(entry["l"]) - # Cas où le lien est directement dans "l" - elif "l" in file: - alldebrid_links.append(file["l"]) - - # Request error - except requests.exceptions.RequestException as e: - print("Request error 2 : get Alldebrid link:", e) - return "Request error 2 : get Alldebrid link", 500 - - print("Alldebrid links:",alldebrid_links) - - # --- 4e bloc : troisieme appel Alldebrid --- - direct_links = [] - try: - headers = {"Authorization": f"Bearer {ALLDEBRID_KEY}"} - - for link in alldebrid_links: - direct_link_request = requests.get( - "https://api.alldebrid.com/v4/link/unlock", - headers=headers, - params={"link": link}, - timeout=10 - ) - direct_link_request.raise_for_status() - third_anwser = direct_link_request.json() - - # API error - if third_anwser.get("status") != "success": - print("API error 3 : get direct link:", third_anwser) - return "API error 3 : get direct link", 500 - - # Récupérer les liens directs - direct_links.append({ - "name": third_anwser["data"]["filename"], - "size": third_anwser["data"]["filesize"], - "link": third_anwser["data"]["link"] - }) - - # Request error - except requests.exceptions.RequestException as e: - print("API error 3 : get direct link:", e) - return "API error 3 : get direct link", 500 - - print("Direct links:",direct_links) - - # 3. Retour du lien direct au client return jsonify(direct_links) \ No newline at end of file