Initial commit
This commit is contained in:
50
backend/__init__.py
Normal file
50
backend/__init__.py
Normal file
@@ -0,0 +1,50 @@
|
||||
from flask import Flask
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
from flask_login import LoginManager
|
||||
import os
|
||||
|
||||
db = SQLAlchemy()
|
||||
login_manager = LoginManager()
|
||||
|
||||
def create_app():
|
||||
# Frontend path
|
||||
base_dir = os.path.abspath(os.path.dirname(__file__))
|
||||
template_dir = os.path.join(base_dir, '..', 'frontend', 'templates')
|
||||
static_dir = os.path.join(base_dir, '..', 'frontend', 'static')
|
||||
|
||||
# DB path
|
||||
instance_path = os.path.join(base_dir, '..', 'instance')
|
||||
os.makedirs(instance_path, exist_ok=True)
|
||||
|
||||
app = Flask(
|
||||
__name__,
|
||||
template_folder=template_dir,
|
||||
static_folder=static_dir,
|
||||
instance_path=os.path.abspath(instance_path)
|
||||
)
|
||||
|
||||
app.config['SECRET_KEY'] = 'IddAhxI%$G%2X3joI04i'
|
||||
app.config['SQLALCHEMY_DATABASE_URI'] = f"sqlite:///{os.path.join(app.instance_path, 'users.db')}"
|
||||
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
|
||||
|
||||
# Init extensions
|
||||
db.init_app(app)
|
||||
login_manager.init_app(app)
|
||||
login_manager.login_view = "login"
|
||||
|
||||
# Import models for user_loader
|
||||
from backend.models import User
|
||||
|
||||
@login_manager.user_loader
|
||||
def load_user(user_id):
|
||||
return User.query.get(int(user_id))
|
||||
|
||||
# Import routes and give app
|
||||
from backend import routes
|
||||
routes.init_app(app)
|
||||
|
||||
# Create DB if needed
|
||||
with app.app_context():
|
||||
db.create_all()
|
||||
|
||||
return app
|
||||
15
backend/models.py
Normal file
15
backend/models.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from . import db
|
||||
from flask_login import UserMixin
|
||||
from datetime import datetime
|
||||
|
||||
class User(UserMixin, db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
username = db.Column(db.String(80), unique=True, nullable=False)
|
||||
password = db.Column(db.String(200), nullable=False)
|
||||
|
||||
class LoginIP(db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
ip = db.Column(db.String(45), unique=True, nullable=False)
|
||||
count = db.Column(db.Integer, default=0, nullable=False)
|
||||
blocked_until = db.Column(db.DateTime, nullable=True)
|
||||
last_attempt = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
|
||||
246
backend/routes.py
Normal file
246
backend/routes.py
Normal file
@@ -0,0 +1,246 @@
|
||||
# Standard library
|
||||
from datetime import datetime, timedelta
|
||||
from io import BytesIO
|
||||
import time
|
||||
|
||||
# Third-party libraries
|
||||
import requests
|
||||
from flask import render_template, request, redirect, url_for, session, flash, jsonify
|
||||
from flask_login import login_user, logout_user, login_required, current_user
|
||||
from werkzeug.security import check_password_hash
|
||||
|
||||
# Project imports
|
||||
from . import db
|
||||
from backend.models import User, LoginIP
|
||||
from backend.utils import format_size, calculate_age
|
||||
|
||||
MAX_ATTEMPTS = 5
|
||||
BLOCK_TIME = timedelta(minutes=15)
|
||||
|
||||
MAX_SIZE_BYTES = 5*1024**3
|
||||
YGG_PASSKEY = "xj1MgNuyzFKCjOtnawGBC2egDOciUg04"
|
||||
ALLDEBRID_KEY = "mtrQI4h583rHe2ZpvpbC"
|
||||
|
||||
def init_app(app):
|
||||
|
||||
@app.route('/')
|
||||
def home():
|
||||
if 'user' in session:
|
||||
return redirect(url_for('dashboard'))
|
||||
return redirect(url_for('login'))
|
||||
|
||||
@app.route('/login', methods=['GET', 'POST'])
|
||||
def login():
|
||||
ip = request.remote_addr or "unknown"
|
||||
ip_record = LoginIP.query.filter_by(ip=ip).first()
|
||||
if not ip_record:
|
||||
ip_record = LoginIP(ip=ip)
|
||||
db.session.add(ip_record)
|
||||
db.session.commit()
|
||||
|
||||
if ip_record.blocked_until and datetime.utcnow() < ip_record.blocked_until:
|
||||
remaining = int((ip_record.blocked_until - datetime.utcnow()).total_seconds() // 60) + 1
|
||||
flash(f"Trop de tentatives depuis votre IP. Réessayez dans {remaining} min.")
|
||||
return render_template("login.html")
|
||||
|
||||
if request.method == "POST":
|
||||
username = request.form.get("username")
|
||||
password = request.form.get("password")
|
||||
user = User.query.filter_by(username=username).first()
|
||||
|
||||
if user and check_password_hash(user.password, password):
|
||||
ip_record.count = 0
|
||||
ip_record.blocked_until = None
|
||||
db.session.commit()
|
||||
login_user(user)
|
||||
session['user'] = user.username
|
||||
return redirect(url_for("dashboard"))
|
||||
else:
|
||||
ip_record.count += 1
|
||||
ip_record.last_attempt = datetime.utcnow()
|
||||
if ip_record.count >= MAX_ATTEMPTS:
|
||||
ip_record.blocked_until = datetime.utcnow() + BLOCK_TIME
|
||||
msg = f"Trop de tentatives. Blocage pour {BLOCK_TIME.seconds // 60} minutes."
|
||||
else:
|
||||
msg = f"Identifiants invalides ({ip_record.count}/{MAX_ATTEMPTS})"
|
||||
db.session.commit()
|
||||
flash(msg)
|
||||
|
||||
return render_template("login.html")
|
||||
|
||||
@app.route('/dashboard')
|
||||
@login_required
|
||||
def dashboard():
|
||||
return render_template('dashboard.html', user=current_user.username)
|
||||
|
||||
@app.route('/logout')
|
||||
@login_required
|
||||
def logout():
|
||||
logout_user()
|
||||
return redirect(url_for('login'))
|
||||
|
||||
@app.route('/search')
|
||||
@login_required
|
||||
def search():
|
||||
query = request.args.get('query')
|
||||
category_id = request.args.get('category_id')
|
||||
|
||||
# Préparer l'URL
|
||||
url = "https://yggapi.eu/torrents"
|
||||
params = {
|
||||
"page": 1,
|
||||
"q": query,
|
||||
"category_id" : category_id,
|
||||
"order_by": "uploaded_at",
|
||||
"per_page": 100
|
||||
}
|
||||
# Appeler l'API
|
||||
try:
|
||||
response = requests.get(url, params=params, timeout=5)
|
||||
response.raise_for_status() # déclenche une exception si erreur HTTP
|
||||
results = response.json()
|
||||
except Exception as e:
|
||||
print("Erreur API Yggtorrent:", e)
|
||||
results = []
|
||||
|
||||
filtered_results = []
|
||||
|
||||
for torrent in results:
|
||||
if torrent['size'] <= MAX_SIZE_BYTES:
|
||||
torrent['size'] = format_size(torrent['size'])
|
||||
days, human = calculate_age(torrent['uploaded_at'])
|
||||
torrent['age_days'] = days # to show
|
||||
torrent['age_human'] = human # to sort
|
||||
filtered_results.append(torrent)
|
||||
|
||||
return render_template(
|
||||
'search_results.html',
|
||||
query=query,
|
||||
results=filtered_results,
|
||||
user=current_user.username
|
||||
)
|
||||
|
||||
@app.route("/torrent/<torrent_id>")
|
||||
@login_required
|
||||
def torrent_details(torrent_id):
|
||||
# --- 1er bloc : récupération du torrent ---
|
||||
try:
|
||||
url = f"https://yggapi.eu/torrent/{torrent_id}/download"
|
||||
params = {
|
||||
"passkey": YGG_PASSKEY,
|
||||
"tracker_domain": "tracker.p2p-world.net"
|
||||
}
|
||||
response = requests.get(url, params=params, timeout=10, allow_redirects=True)
|
||||
response.raise_for_status()
|
||||
torrent_file_content = response.content
|
||||
torrent_filename = f"{torrent_id}.torrent"
|
||||
|
||||
# Request error
|
||||
except requests.exceptions.RequestException as e:
|
||||
print("Erreur lors de la récupération du torrent depuis Yggtorrent:", e)
|
||||
return "Erreur récupération torrent", 500
|
||||
|
||||
# --- 2e bloc : upload à Alldebrid ---
|
||||
try:
|
||||
files = {'files[]': (torrent_filename, BytesIO(torrent_file_content))}
|
||||
headers = {"Authorization": f"Bearer {ALLDEBRID_KEY}"}
|
||||
|
||||
upload_request = requests.post(
|
||||
"https://api.alldebrid.com/v4/magnet/upload/file",
|
||||
headers=headers,
|
||||
files=files,
|
||||
timeout=20
|
||||
)
|
||||
upload_request.raise_for_status()
|
||||
first_answer = upload_request.json()
|
||||
|
||||
debrid_id = first_answer["data"]["files"][0]["id"]
|
||||
|
||||
# API error
|
||||
if first_answer["status"] != "success":
|
||||
print("API error 1 : upload Alldebrid:", first_answer)
|
||||
return "API error 1 : upload Alldebrid", 500
|
||||
|
||||
# Request error
|
||||
except requests.exceptions.RequestException as e:
|
||||
print("Request error 1 : upload Alldebrid:", e)
|
||||
return "Request error 1 : upload Alldebrid:", 500
|
||||
|
||||
print("Alldebrid id:",debrid_id)
|
||||
|
||||
time.sleep(2)
|
||||
|
||||
# --- 3e bloc : second appel Alldebrid ---
|
||||
try:
|
||||
headers = {"Authorization": f"Bearer {ALLDEBRID_KEY}"}
|
||||
|
||||
alldebrid_link_request = requests.get(
|
||||
"https://api.alldebrid.com/v4/magnet/files",
|
||||
headers=headers,
|
||||
params={"id[]": debrid_id},
|
||||
timeout=10
|
||||
)
|
||||
alldebrid_link_request.raise_for_status()
|
||||
second_answer = alldebrid_link_request.json()
|
||||
|
||||
# API error
|
||||
if second_answer.get("status") != "success":
|
||||
print("API error 2 : get Alldebrid link:", second_answer)
|
||||
return "API error 2 : get Alldebrid link", 500
|
||||
|
||||
alldebrid_links = []
|
||||
|
||||
for magnet in second_answer.get("data", {}).get("magnets", []):
|
||||
for file in magnet.get("files", []):
|
||||
# Cas où il y a une liste "e"
|
||||
if "e" in file:
|
||||
for entry in file["e"]:
|
||||
if "l" in entry:
|
||||
alldebrid_links.append(entry["l"])
|
||||
# Cas où le lien est directement dans "l"
|
||||
elif "l" in file:
|
||||
alldebrid_links.append(file["l"])
|
||||
|
||||
# Request error
|
||||
except requests.exceptions.RequestException as e:
|
||||
print("Request error 2 : get Alldebrid link:", e)
|
||||
return "Request error 2 : get Alldebrid link", 500
|
||||
|
||||
print("Alldebrid links:",alldebrid_links)
|
||||
|
||||
# --- 4e bloc : troisieme appel Alldebrid ---
|
||||
direct_links = []
|
||||
try:
|
||||
headers = {"Authorization": f"Bearer {ALLDEBRID_KEY}"}
|
||||
|
||||
for link in alldebrid_links:
|
||||
direct_link_request = requests.get(
|
||||
"https://api.alldebrid.com/v4/link/unlock",
|
||||
headers=headers,
|
||||
params={"link": link},
|
||||
timeout=10
|
||||
)
|
||||
direct_link_request.raise_for_status()
|
||||
third_anwser = direct_link_request.json()
|
||||
|
||||
# API error
|
||||
if third_anwser.get("status") != "success":
|
||||
print("API error 3 : get direct link:", third_anwser)
|
||||
return "API error 3 : get direct link", 500
|
||||
|
||||
# Récupérer les liens directs
|
||||
direct_links.append({
|
||||
"name": third_anwser["data"]["filename"],
|
||||
"size": third_anwser["data"]["filesize"],
|
||||
"link": third_anwser["data"]["link"]
|
||||
})
|
||||
|
||||
# Request error
|
||||
except requests.exceptions.RequestException as e:
|
||||
print("API error 3 : get direct link:", e)
|
||||
return "API error 3 : get direct link", 500
|
||||
|
||||
print("Direct links:",direct_links)
|
||||
|
||||
# 3. Retour du lien direct au client
|
||||
return jsonify(direct_links)
|
||||
26
backend/utils.py
Normal file
26
backend/utils.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from datetime import datetime, timezone
|
||||
|
||||
def format_size(bytes_size):
|
||||
"""Convertit une taille en octets en format lisible (Ko, Mo, Go, ...)."""
|
||||
for unit in ['o', 'Ko', 'Mo', 'Go', 'To']:
|
||||
if bytes_size < 1024:
|
||||
return f"{bytes_size:.2f} {unit}"
|
||||
bytes_size /= 1024
|
||||
return f"{bytes_size:.2f} Po"
|
||||
|
||||
def calculate_age(uploaded_at_iso):
|
||||
uploaded_date = datetime.fromisoformat(uploaded_at_iso.rstrip('Z')).replace(tzinfo=timezone.utc)
|
||||
now = datetime.now(timezone.utc)
|
||||
delta_days = (now.date() - uploaded_date.date()).days
|
||||
|
||||
# human
|
||||
if delta_days < 30:
|
||||
age_human = f"{delta_days} j"
|
||||
elif delta_days < 365:
|
||||
months = delta_days // 30
|
||||
age_human = f"{months} mois"
|
||||
else:
|
||||
years = delta_days // 365
|
||||
age_human = f"{years} ans"
|
||||
|
||||
return delta_days, age_human
|
||||
Reference in New Issue
Block a user