Files
ygg-service/backend/routes.py
2025-11-11 20:00:08 +01:00

274 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Standard library
from datetime import datetime, timedelta
from io import BytesIO
import time
# Third-party libraries
import requests
from flask import render_template, request, redirect, url_for, session, flash, jsonify, make_response
from flask_login import login_user, logout_user, login_required, current_user
from werkzeug.security import check_password_hash
# Project imports
from . import db
from backend.models import User, LoginIP
from backend.utils import format_size, calculate_age
from backend.alldebrid import check_alldebrid_status, send_ntfy
MAX_ATTEMPTS = 5
BLOCK_TIME = timedelta(minutes=15)
MAX_SIZE_BYTES = 5*1024**3
NB_PAGES = 2
YGG_PASSKEY = "xj1MgNuyzFKCjOtnawGBC2egDOciUg04"
ALLDEBRID_KEY = "mtrQI4h583rHe2ZpvpbC"
def init_app(app):
@app.route('/')
def home():
if 'user' in session:
return redirect(url_for('dashboard'))
return redirect(url_for('login'))
@app.route('/login', methods=['GET', 'POST'])
def login():
ip = request.remote_addr or "unknown"
ip_record = LoginIP.query.filter_by(ip=ip).first()
if not ip_record:
ip_record = LoginIP(ip=ip)
db.session.add(ip_record)
db.session.commit()
if ip_record.blocked_until and datetime.utcnow() < ip_record.blocked_until:
remaining = int((ip_record.blocked_until - datetime.utcnow()).total_seconds() // 60) + 1
flash(f"Trop de tentatives depuis votre IP. Réessayez dans {remaining} min.")
return render_template("login.html")
if request.method == "POST":
username = request.form.get("username")
password = request.form.get("password")
user = User.query.filter_by(username=username).first()
if user and check_password_hash(user.password, password):
ip_record.count = 0
ip_record.blocked_until = None
db.session.commit()
login_user(user)
session['user'] = user.username
# --- Vérification AllDebrid ---
print("Vérification en cours")
premium = check_alldebrid_status()
session['alldebrid_active'] = premium
if not premium: # notifier seulement si le compte nest plus premium
print("Envoi notif")
send_ntfy("AllDebrid non premium", "Tentative avortée sur ygg-service !")
return redirect(url_for("dashboard"))
else:
ip_record.count += 1
ip_record.last_attempt = datetime.utcnow()
if ip_record.count >= MAX_ATTEMPTS:
ip_record.blocked_until = datetime.utcnow() + BLOCK_TIME
msg = f"Trop de tentatives. Blocage pour {BLOCK_TIME.seconds // 60} minutes."
else:
msg = f"Identifiants invalides ({ip_record.count}/{MAX_ATTEMPTS})"
db.session.commit()
flash(msg)
return render_template("login.html")
@app.route('/dashboard')
@login_required
def dashboard():
return render_template('dashboard.html', user=current_user.username)
@app.route('/logout')
@login_required
def logout():
logout_user() # Déconnecte Flask-Login
session.clear() # Efface toutes les clés de session
resp = make_response(redirect(url_for('login')))
resp.set_cookie('session', '', expires=0) # supprime le cookie de session
return resp
@app.route('/search')
@login_required
def search():
query = request.args.get('query')
category_id = request.args.get('category_id')
# Préparer l'URL
url = "https://yggapi.eu/torrents"
# Préparer les paramètres
params = {
"page": 1,
"q": query,
"order_by": "uploaded_at",
"per_page": 100
}
# Ajouter la catégorie seulement si elle est renseignée et non vide
if category_id:
params["category_id"] = category_id
results = []
# Appeler l'API
try:
for page in range(1, NB_PAGES):
params['page'] = page
print("Appel API page", page, "avec params:", params)
response = requests.get(url, params=params, timeout=5)
response.raise_for_status()
data = response.json() # ici c'est une liste directement
print(f"Nombre de torrents reçus page {page}:", len(data))
results.extend(data)
except Exception as e:
print("Erreur API Yggtorrent:", e)
results = []
filtered_results = []
for torrent in results:
if torrent['size'] <= MAX_SIZE_BYTES:
torrent['size'] = format_size(torrent['size'])
days, human = calculate_age(torrent['uploaded_at'])
torrent['age_days'] = days # to show
torrent['age_human'] = human # to sort
filtered_results.append(torrent)
return render_template(
'search_results.html',
query=query,
results=filtered_results,
user=current_user.username
)
@app.route("/torrent/<torrent_id>")
@login_required
def torrent_details(torrent_id):
# --- 1er bloc : récupération du torrent ---
try:
url = f"https://yggapi.eu/torrent/{torrent_id}/download"
params = {
"passkey": YGG_PASSKEY,
"tracker_domain": "tracker.p2p-world.net"
}
response = requests.get(url, params=params, timeout=10, allow_redirects=True)
response.raise_for_status()
torrent_file_content = response.content
torrent_filename = f"{torrent_id}.torrent"
# Request error
except requests.exceptions.RequestException as e:
print("Erreur lors de la récupération du torrent depuis Yggtorrent:", e)
return "Erreur récupération torrent", 500
# --- 2e bloc : upload à Alldebrid ---
try:
files = {'files[]': (torrent_filename, BytesIO(torrent_file_content))}
headers = {"Authorization": f"Bearer {ALLDEBRID_KEY}"}
upload_request = requests.post(
"https://api.alldebrid.com/v4/magnet/upload/file",
headers=headers,
files=files,
timeout=20
)
upload_request.raise_for_status()
first_answer = upload_request.json()
debrid_id = first_answer["data"]["files"][0]["id"]
# API error
if first_answer["status"] != "success":
print("API error 1 : upload Alldebrid:", first_answer)
return "API error 1 : upload Alldebrid", 500
# Request error
except requests.exceptions.RequestException as e:
print("Request error 1 : upload Alldebrid:", e)
return "Request error 1 : upload Alldebrid:", 500
print("Alldebrid id:",debrid_id)
time.sleep(2)
# --- 3e bloc : second appel Alldebrid ---
try:
headers = {"Authorization": f"Bearer {ALLDEBRID_KEY}"}
alldebrid_link_request = requests.get(
"https://api.alldebrid.com/v4/magnet/files",
headers=headers,
params={"id[]": debrid_id},
timeout=10
)
alldebrid_link_request.raise_for_status()
second_answer = alldebrid_link_request.json()
# API error
if second_answer.get("status") != "success":
print("API error 2 : get Alldebrid link:", second_answer)
return "API error 2 : get Alldebrid link", 500
alldebrid_links = []
for magnet in second_answer.get("data", {}).get("magnets", []):
for file in magnet.get("files", []):
# Cas où il y a une liste "e"
if "e" in file:
for entry in file["e"]:
if "l" in entry:
alldebrid_links.append(entry["l"])
# Cas où le lien est directement dans "l"
elif "l" in file:
alldebrid_links.append(file["l"])
# Request error
except requests.exceptions.RequestException as e:
print("Request error 2 : get Alldebrid link:", e)
return "Request error 2 : get Alldebrid link", 500
print("Alldebrid links:",alldebrid_links)
# --- 4e bloc : troisieme appel Alldebrid ---
direct_links = []
try:
headers = {"Authorization": f"Bearer {ALLDEBRID_KEY}"}
for link in alldebrid_links:
direct_link_request = requests.get(
"https://api.alldebrid.com/v4/link/unlock",
headers=headers,
params={"link": link},
timeout=10
)
direct_link_request.raise_for_status()
third_anwser = direct_link_request.json()
# API error
if third_anwser.get("status") != "success":
print("API error 3 : get direct link:", third_anwser)
return "API error 3 : get direct link", 500
# Récupérer les liens directs
direct_links.append({
"name": third_anwser["data"]["filename"],
"size": third_anwser["data"]["filesize"],
"link": third_anwser["data"]["link"]
})
# Request error
except requests.exceptions.RequestException as e:
print("API error 3 : get direct link:", e)
return "API error 3 : get direct link", 500
print("Direct links:",direct_links)
# 3. Retour du lien direct au client
return jsonify(direct_links)