Animecli/animecli.py
2025-06-04 18:46:58 +02:00

613 lines
22 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sqlite3
from pathlib import Path
import requests
from bs4 import BeautifulSoup
import yt_dlp
import curses
import re
from urllib.parse import urljoin
import concurrent.futures
import configparser
import os
# Per-user application directory; holds the SQLite database and the INI file.
DB_DIR = Path.home().joinpath(".animecli")
DB_PATH = DB_DIR.joinpath("animes.db")
DOWNLOAD_DIR = Path.home().joinpath("MesAnimes")
CONFIG_PATH = DB_DIR.joinpath("config.ini")

# Fallback settings, kept as strings exactly like values read from the INI file.
DEFAULT_CONFIG = dict(
    vitesse_max="0",
    telechargements_simultanes="1",
    download_dir=str(DOWNLOAD_DIR),
    db_path=str(DB_PATH),
)

# Live settings shared by the whole module; starts empty.
CONFIG = dict()
def load_config():
    """Fill the module-level ``CONFIG`` from ``CONFIG_PATH``.

    Every key of ``DEFAULT_CONFIG`` ends up in ``CONFIG``: the value comes from
    the ``[animecli]`` section of the INI file when present, otherwise from the
    default. ``vitesse_max`` and ``telechargements_simultanes`` are converted
    to ``int``.

    A missing, unreadable or malformed file, or a non-numeric value for one of
    the two integer settings, falls back to the defaults instead of raising.
    """
    parser = configparser.ConfigParser()
    try:
        if CONFIG_PATH.exists():
            parser.read(CONFIG_PATH)
    except (configparser.Error, UnicodeDecodeError):
        # A broken file must not prevent the program from starting; drop
        # whatever was partially parsed and use defaults.
        parser = configparser.ConfigParser()

    section = parser["animecli"] if "animecli" in parser else {}
    for key, default in DEFAULT_CONFIG.items():
        try:
            CONFIG[key] = section.get(key, default)
        except configparser.Error:
            # e.g. a stray '%' in a value trips configparser interpolation.
            CONFIG[key] = default

    for key in ("vitesse_max", "telechargements_simultanes"):
        try:
            CONFIG[key] = int(CONFIG[key])
        except (TypeError, ValueError):
            CONFIG[key] = int(DEFAULT_CONFIG[key])

    # A negative speed limit means "unlimited" just like 0, and a thread pool
    # cannot be created with fewer than one worker.
    CONFIG["vitesse_max"] = max(0, CONFIG["vitesse_max"])
    CONFIG["telechargements_simultanes"] = max(1, CONFIG["telechargements_simultanes"])
def save_config():
    """Write every entry of ``CONFIG`` to ``CONFIG_PATH`` under ``[animecli]``.

    Values are stringified before being handed to configparser, and the
    application directory is created first when it does not exist yet.
    """
    serialised = {}
    for key, value in CONFIG.items():
        serialised[key] = str(value)

    parser = configparser.ConfigParser()
    parser.read_dict({"animecli": serialised})

    DB_DIR.mkdir(parents=True, exist_ok=True)
    with CONFIG_PATH.open("w") as handle:
        parser.write(handle)
def init_db():
    """Open the SQLite database and make sure the ``animes`` table exists.

    The location is ``CONFIG["db_path"]`` when set, otherwise ``DB_PATH``; its
    parent directory is created if needed.

    Returns:
        The open ``sqlite3`` connection.
    """
    location = Path(CONFIG.get("db_path", str(DB_PATH)))
    location.parent.mkdir(parents=True, exist_ok=True)

    connection = sqlite3.connect(location)
    connection.execute(
        """
        CREATE TABLE IF NOT EXISTS animes (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        titre TEXT UNIQUE,
        url TEXT,
        saison INTEGER,
        dernier_episode TEXT
        )
        """
    )
    connection.commit()
    return connection
def add_anime(conn, titre, url, saison):
    """Insert a new anime row with an empty ``dernier_episode``.

    Returns:
        ``(True, message)`` when the row was inserted and committed, or
        ``(False, message)`` when the insert violated a constraint (the
        ``titre`` column is UNIQUE in the schema created by this file).
    """
    row = (titre, url, saison, "")
    try:
        conn.execute(
            "INSERT INTO animes (titre, url, saison, dernier_episode) VALUES (?, ?, ?, ?)", row
        )
    except sqlite3.IntegrityError:
        return False, f"L'anime '{titre}' existe déjà."
    else:
        conn.commit()
        return True, f"Anime '{titre}' ajouté."
def delete_anime(conn, anime_id):
    """Delete the row of ``animes`` whose ``id`` is ``anime_id`` and commit."""
    params = (anime_id,)
    conn.execute("DELETE FROM animes WHERE id = ?", params)
    conn.commit()
def get_all_animes(conn):
    """Return every row of ``animes`` as a list of tuples.

    Each tuple is ``(id, titre, url, saison, dernier_episode)``.
    """
    query = "SELECT id, titre, url, saison, dernier_episode FROM animes"
    rows = conn.execute(query)
    return list(rows)
def extract_master_m3u8_url(page_url, timeout=15):
    """Fetch ``page_url`` and return an ``.m3u8`` URL found in its text.

    The first URL containing ``master.m3u8`` wins; otherwise the first
    ``.m3u8`` URL found is returned.

    Args:
        page_url: Address of the page to scan.
        timeout: Seconds passed to ``requests.get``. Without a timeout a
            stalled server would block the caller indefinitely.

    Returns:
        The selected URL, or ``None`` when none is found or the request fails.
    """
    try:
        resp = requests.get(page_url, timeout=timeout)
        resp.raise_for_status()
        urls = re.findall(r'https?://[^\s\'"]+\.m3u8[^\s\'"]*', resp.text)
    except Exception:
        # Deliberately best-effort: callers fall back to the original URL
        # whenever this returns None, so no failure may escape from here.
        return None

    for url in urls:
        if "master.m3u8" in url:
            return url
    return urls[0] if urls else None
def get_source_info(url):
    """Probe ``url`` with yt_dlp (no download) and report its best resolution.

    ``.html`` URLs are first passed through ``extract_master_m3u8_url``; when
    that yields a playlist URL, it replaces ``url``.

    Returns:
        ``{'qualite': '<height>p', 'url': <probed url>}``, where ``qualite`` is
        ``'?p'`` when no format reports a height, or ``None`` when the probe
        fails or returns no formats.
    """
    if url.endswith('.html'):
        m3u8_url = extract_master_m3u8_url(url)
        if m3u8_url:
            url = m3u8_url
    ydl_opts = {
        'quiet': True,
        'skip_download': True,
        'downloader_args': {'hls': ['--hls-use-mpegts']}
    }
    try:
        # Constructing YoutubeDL is inside the try as well, so any probe
        # failure is reported as "no info" rather than raised to the caller.
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
        formats = info.get('formats', [])
    except Exception:
        return None
    if not formats:
        return None

    # A format's 'height' may be missing or None (the original code already
    # guarded the comparison with `or 0`); treat both as "unknown" so the
    # label never reads "Nonep".
    best_height = max((f.get('height') or 0) for f in formats)
    qualite = f"{best_height}p" if best_height else "?p"
    return {
        'qualite': qualite,
        'url': url
    }
def choisir_source_globale(stdscr, episode_data):
    """Let the user pick which source (site) to use for every episode.

    Sources are grouped by position: group ``i`` collects the ``i``-th source
    URL of every episode that has one. The first URL of each group is probed
    in the background with ``get_source_info`` so its quality can be shown
    next to the domain name.

    Args:
        stdscr: curses window used for drawing and keyboard input.
        episode_data: Non-empty list of ``(sources, _)`` tuples as produced by
            ``extract_episode_sources``; the first episode must have at least
            one source (the caller in this file checks this).

    Returns:
        ``(index, urls)``: the chosen group index and that group's URL list.
    """
    # Group the sources by index (site).
    sources_by_site = []
    for i in range(len(episode_data[0][0])):  # number of sources per episode
        urls = [ep[0][i] for ep in episode_data if len(ep[0]) > i]
        if urls:
            sources_by_site.append(urls)

    # The first URL of each site stands in for the whole site's quality.
    preview_urls = [urls[0] for urls in sources_by_site]
    infos = [None] * len(preview_urls)
    finished = set()
    sel = 0

    with concurrent.futures.ThreadPoolExecutor() as executor:
        pending = {
            executor.submit(get_source_info, url): i
            for i, url in enumerate(preview_urls)
        }
        # Poll the keyboard instead of blocking on it: with a blocking getch()
        # the "chargement..." labels were only refreshed after a key press.
        stdscr.timeout(100)
        try:
            while True:
                for future in [f for f in pending if f.done()]:
                    idx = pending.pop(future)
                    try:
                        infos[idx] = future.result()
                    except Exception:
                        # A failed probe only means the quality is unknown.
                        infos[idx] = None
                    finished.add(idx)

                # erase() rather than clear(): the screen is redrawn several
                # times per second and clear() forces a full repaint.
                stdscr.erase()
                stdscr.addstr(0, 0, "Choisissez la source à utiliser pour tous les épisodes :")
                for idx, url in enumerate(preview_urls):
                    info = infos[idx]
                    domain = re.sub(r'^https?://(www\.)?', '', url).split('/')[0]
                    if info:
                        line = f"{idx+1}. {domain} ({info['qualite']})"
                    elif idx in finished:
                        line = f"{idx+1}. {domain} (qualité inconnue)"
                    else:
                        line = f"{idx+1}. {domain} (chargement...)"
                    if idx == sel:
                        stdscr.attron(curses.color_pair(1))
                        safe_addstr(stdscr, 2 + idx, 2, line)
                        stdscr.attroff(curses.color_pair(1))
                    else:
                        safe_addstr(stdscr, 2 + idx, 2, line)
                stdscr.refresh()

                k = stdscr.getch()  # returns -1 when the 100 ms timeout expires
                if k == curses.KEY_UP and sel > 0:
                    sel -= 1
                elif k == curses.KEY_DOWN and sel < len(preview_urls) - 1:
                    sel += 1
                elif k in (curses.KEY_ENTER, 10, 13):
                    return sel, sources_by_site[sel]
        finally:
            # Restore blocking input for the rest of the UI, and drop probes
            # that have not started so leaving the executor is not delayed.
            stdscr.timeout(-1)
            for future in pending:
                future.cancel()
def extract_episode_sources(url_page, timeout=15):
    """Collect, for each episode listed on ``url_page``, its source URLs.

    Two page layouts are handled:

    * ``anime-sama.fr`` pages referencing an ``episodes.js`` script: every
      ``var epsN = [...]`` array in that script is one source, and its k-th
      URL belongs to episode k. The episode count is fixed by the first
      array that yields URLs; extra entries in later arrays are ignored.
    * Any other page: each ``a.episode-link`` anchor is one episode with a
      single source. Relative links are resolved against ``url_page``.

    Args:
        url_page: Address of the episode listing page.
        timeout: Seconds passed to each ``requests.get`` call so a stalled
            server cannot block the UI indefinitely.

    Returns:
        A list of ``(sources, None)`` tuples, one per episode, in page order.

    Raises:
        requests.RequestException: on network failure or an HTTP error status.
    """
    resp = requests.get(url_page, timeout=timeout)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")
    episode_sources = []
    script_tag = soup.find('script', src=re.compile(r"episodes\.js"))
    if script_tag and "anime-sama.fr" in url_page:
        js_url = urljoin(url_page, script_tag.get('src'))
        js_resp = requests.get(js_url, timeout=timeout)
        js_resp.raise_for_status()
        all_eps = re.findall(r"var\s+eps\d+\s*=\s*\[([^\]]+)\]", js_resp.text)
        for eps_block in all_eps:
            urls = re.findall(r"'(https?://[^']+)'", eps_block)
            if not episode_sources:
                episode_sources = [[] for _ in urls]
            # zip() stops at the shorter side, so URLs beyond the known
            # episode count are dropped.
            for sources, url in zip(episode_sources, urls):
                sources.append(url)
    else:
        for anchor in soup.select("a.episode-link"):
            href = anchor.get("href")
            if href:
                # urljoin leaves absolute URLs untouched and makes relative
                # ones usable by the downloader.
                episode_sources.append([urljoin(url_page, href)])
    return [(sources, None) for sources in episode_sources]
def format_dernier(dernier_url, saison):
    """Build the "last episode" summary shown in the anime lists.

    The episode number is looked up in ``dernier_url`` in this order:

    1. the last ``S<digits>E<digits>`` tag, which is the form this program
       stores (``"<titre> - S01E05.mp4"``) — checked first so that a title
       such as ``"Gate7"`` is not mistaken for episode 7;
    2. the first ``e``/``E`` followed by one to three digits;
    3. the last run of one to three digits in the string.

    Args:
        dernier_url: Stored file name or URL; empty, ``None`` or ``"?"`` means
            nothing has been downloaded yet.
        saison: Season number, inserted as-is in the result.

    Returns:
        ``"Saison <saison>, épisode <n>, <dernier_url>"``, with ``?`` as the
        episode when none can be found, or
        ``"Saison <saison>, épisode ?, aucun"`` when there is no last episode.
    """
    if not dernier_url or dernier_url == "?":
        return f"Saison {saison}, épisode ?, aucun"

    tagged = re.findall(r'[Ss]\d+[Ee](\d+)', dernier_url)
    if tagged:
        ep = tagged[-1]
    else:
        m = (
            re.search(r'[Ee](\d{1,3})', dernier_url)
            or re.search(r'(\d{1,3})(?!.*\d)', dernier_url)
        )
        ep = m.group(1) if m else "?"
    return f"Saison {saison}, épisode {ep}, {dernier_url}"
def telecharger_episode(url, saison_folder, filename, qualite):
    """Download one episode with yt_dlp into ``saison_folder / filename``.

    ``.html`` URLs are first passed through ``extract_master_m3u8_url``; when
    that yields a playlist URL, it replaces ``url``. ``qualite`` (for example
    ``"1080p"``) caps the video height, and ``CONFIG["vitesse_max"]`` (KB/s)
    sets a rate limit when greater than zero.

    Returns:
        ``(True, url)`` on success or ``(False, url)`` after printing the
        error, where ``url`` is the address actually handed to yt_dlp.
    """
    if url.endswith('.html'):
        url = extract_master_m3u8_url(url) or url

    max_height = qualite.rstrip('p')
    # NOTE(review): "downloader"/"downloader_args" look like CLI option names;
    # confirm that yt_dlp.YoutubeDL honours these keys as written.
    ydl_opts = {
        "outtmpl": str(saison_folder / filename),
        "format": f"bestvideo[height<={max_height}]+bestaudio/best",
        "downloader": "ffmpeg",
        "downloader_args": {"hls": ["--hls-use-mpegts"]}
    }

    limit_kb = CONFIG["vitesse_max"]
    if limit_kb > 0:
        ydl_opts["ratelimit"] = limit_kb * 1024

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
    except Exception as e:
        print(f"Erreur download {url}: {e}")
        return False, url
    return True, url
def _draw_row(stdscr, y, text, highlighted):
    """Draw one list row at column 2, in colour pair 1 when it is the current row."""
    if highlighted:
        stdscr.attron(curses.color_pair(1))
    safe_addstr(stdscr, y, 2, text)
    if highlighted:
        stdscr.attroff(curses.color_pair(1))


def _pick_anime(stdscr, animes):
    """Let the user choose one row of ``animes`` and return its index."""
    sel = 0
    while True:
        stdscr.clear()
        stdscr.addstr(0, 0, "Sélectionnez un anime :")
        for idx, (_, titre, _, saison, dernier) in enumerate(animes):
            line = f"{titre} (Saison {saison}, dernier: {format_dernier(dernier, saison)})"
            _draw_row(stdscr, 2 + idx, line, idx == sel)
        stdscr.refresh()
        k = stdscr.getch()
        if k == curses.KEY_UP and sel > 0:
            sel -= 1
        elif k == curses.KEY_DOWN and sel < len(animes) - 1:
            sel += 1
        elif k in (curses.KEY_ENTER, 10, 13):
            return sel


def _find_downloaded_episodes(saison_folder, file_prefix):
    """Return the episode numbers having a ``<file_prefix>E<n>.mp4`` file in the folder."""
    present = set()
    if not saison_folder.is_dir():
        return present
    # A regex on the exact name replaces the former glob + two-digit search:
    # it copes with glob metacharacters in titles and with episodes >= 100.
    pattern = re.compile(re.escape(file_prefix) + r"E(\d+)\.mp4")
    for entry in saison_folder.iterdir():
        m = pattern.fullmatch(entry.name)
        if m:
            present.add(int(m.group(1)))
    return present


def _pick_episodes(stdscr, episode_count, present):
    """Run the scrollable episode checklist and return one boolean per episode."""
    # Episodes already on disk start unchecked.
    selected = [num not in present for num in range(1, episode_count + 1)]
    cursor = 0  # 0 is the "select all" row, k is episode k
    scroll_offset = 0
    total_lines = episode_count + 1
    while True:
        stdscr.clear()
        max_y, _ = stdscr.getmaxyx()
        visible_lines = max_y - 3
        # Keep the cursor row inside the visible window.
        if cursor < scroll_offset:
            scroll_offset = cursor
        elif cursor >= scroll_offset + visible_lines:
            scroll_offset = cursor - visible_lines + 1
        stdscr.addstr(0, 0, "Sélectionnez les épisodes à télécharger (Espace pour cocher, Entrée pour valider) :")
        y = 2
        if scroll_offset == 0:
            mark = "[X]" if selected and all(selected) else "[ ]"
            _draw_row(stdscr, y, f"{mark} Tout sélectionner", cursor == 0)
            y += 1
        for idx, checked in enumerate(selected):
            line_idx = idx + 1
            if line_idx < scroll_offset:
                continue
            if y - 2 >= visible_lines:
                break
            mark = "[X]" if checked else "[ ]"
            extra = " (Déjà téléchargé)" if (idx + 1) in present else ""
            _draw_row(stdscr, y, f"{mark} Épisode {idx+1}{extra}", cursor == line_idx)
            y += 1
        stdscr.refresh()
        k = stdscr.getch()
        if k == curses.KEY_UP and cursor > 0:
            cursor -= 1
        elif k == curses.KEY_DOWN and cursor < total_lines - 1:
            cursor += 1
        elif k == ord(' '):
            if cursor == 0:
                new_val = not all(selected)
                selected = [new_val] * len(selected)
            else:
                selected[cursor - 1] = not selected[cursor - 1]
        elif k in (curses.KEY_ENTER, 10, 13):
            return selected


def handle_multi_download(stdscr, conn):
    """Interactive flow downloading several episodes of one tracked anime.

    Steps: choose an anime, fetch its episode sources, choose the source to
    use for all episodes, tick the episodes to fetch (those already on disk
    start unticked), download them in a thread pool sized by
    ``CONFIG["telechargements_simultanes"]``, then store the highest episode
    downloaded in this run as ``dernier_episode`` in the database.

    Files are written to
    ``<download_dir>/<titre>/Saison NN/<titre_with_underscores> - SNNEMM.<ext>``;
    a leading ``~`` in ``download_dir`` is expanded to the home directory.

    Args:
        stdscr: curses window used for every screen of the flow.
        conn: open sqlite3 connection holding the ``animes`` table.
    """
    animes = get_all_animes(conn)
    if not animes:
        stdscr.clear()
        stdscr.addstr(0, 0, "Aucun anime disponible.")
        stdscr.addstr(2, 0, "Appuyez sur une touche pour revenir au menu.")
        stdscr.getch()
        return

    anime_id, titre, url_page, saison, _dernier = animes[_pick_anime(stdscr, animes)]
    try:
        episode_data = extract_episode_sources(url_page)
    except Exception as e:
        stdscr.clear()
        # The exception text can be wider than the terminal; truncate it
        # rather than risk a curses error.
        safe_addstr(stdscr, 0, 0, f"Erreur récupération des sources: {e}")
        stdscr.getch()
        return
    if not episode_data or not episode_data[0][0]:
        stdscr.clear()
        stdscr.addstr(0, 0, "Aucune source trouvée.")
        stdscr.getch()
        return

    # Choice of the global source (site).
    _sel_src, urls_par_source = choisir_source_globale(stdscr, episode_data)

    # Detect the episodes already present on disk.
    saison_num = int(saison)
    file_prefix = f"{titre.replace(' ', '_')} - S{saison_num:02d}"
    saison_folder = (
        Path(CONFIG["download_dir"]).expanduser() / titre / f"Saison {saison_num:02d}"
    )
    present = _find_downloaded_episodes(saison_folder, file_prefix)

    selected = _pick_episodes(stdscr, len(episode_data), present)
    to_download = [i for i, checked in enumerate(selected) if checked]
    if not to_download:
        stdscr.clear()
        stdscr.addstr(0, 0, "Aucun épisode sélectionné.")
        stdscr.getch()
        return

    qualite = "1080p"
    # Episodes for which the chosen source has no URL are skipped.
    download_queue = [
        (urls_par_source[idx], saison_folder, f"{file_prefix}E{idx + 1:02d}.%(ext)s", qualite, idx + 1)
        for idx in to_download
        if idx < len(urls_par_source)
    ]
    if download_queue:
        saison_folder.mkdir(parents=True, exist_ok=True)

    # Leave curses mode so yt_dlp and the status lines can print normally.
    curses.endwin()
    episodes_dl = []
    # ThreadPoolExecutor rejects max_workers < 1.
    workers = max(1, int(CONFIG.get("telechargements_simultanes", 2)))
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [
            executor.submit(telecharger_episode, src_url, folder, fname, quality)
            for src_url, folder, fname, quality, _epnum in download_queue
        ]
        for (_src, _folder, _fname, _quality, epnum), future in zip(download_queue, futures):
            success, final_url = future.result()
            if success:
                print(f"Téléchargement réussi: {final_url}")
                episodes_dl.append(epnum)
            else:
                print(f"Echec téléchargement: {final_url}")

    # Record the highest episode number downloaded in this run.
    if episodes_dl:
        last_file = f"{file_prefix}E{max(episodes_dl):02d}.mp4"
        conn.execute("UPDATE animes SET dernier_episode = ? WHERE id = ?", (last_file, anime_id))
        conn.commit()
def curses_menu(stdscr, conn):
    """Run the main menu loop until the user validates "Quitter".

    Up/Down move the highlighted entry; Enter runs the matching handler with
    ``stdscr`` (and ``conn`` where the handler takes it).
    """
    curses.curs_set(0)
    # Label -> action; None marks the entry that leaves the loop.
    entries = [
        ("Ajouter un anime", lambda: handle_add(stdscr, conn)),
        ("Lister les animes", lambda: handle_list(stdscr, conn)),
        ("Télécharger plusieurs épisodes", lambda: handle_multi_download(stdscr, conn)),
        ("Supprimer un anime", lambda: handle_delete(stdscr, conn)),
        ("Configuration", lambda: handle_config(stdscr)),
        ("Quitter", None),
    ]
    current = 0
    while True:
        stdscr.clear()
        stdscr.addstr(0, 2, "animecli - Gestion d'animes", curses.A_BOLD)
        for row, (label, _action) in enumerate(entries):
            attr = curses.color_pair(1) if row == current else curses.A_NORMAL
            stdscr.addstr(2 + row, 4, label, attr)
        stdscr.refresh()

        key = stdscr.getch()
        if key == curses.KEY_UP:
            if current > 0:
                current -= 1
        elif key == curses.KEY_DOWN:
            if current < len(entries) - 1:
                current += 1
        elif key in (curses.KEY_ENTER, 10, 13):
            action = entries[current][1]
            if action is None:
                break
            action()
def get_input(stdscr, prompt):
    """Show ``prompt`` on a cleared screen and read one line of text.

    The input area starts on row 2 and covers the rest of the terminal instead
    of a fixed 1x60 window: creating the fixed window failed on terminals
    narrower than 60 columns, and input wider than the window (long URLs)
    could not be typed. NOTE(review): the length cap is the observed/expected
    behaviour of curses echo at the window's last cell — confirm on the
    target platform.

    Echo and cursor visibility are restored even if reading fails.

    Returns:
        The typed text, stripped of surrounding whitespace. Bytes that are
        not valid UTF-8 are replaced rather than raising.
    """
    stdscr.clear()
    stdscr.addstr(0, 0, prompt)
    stdscr.refresh()
    max_y, max_x = stdscr.getmaxyx()
    win = curses.newwin(max(1, max_y - 2), max_x, 2, 0)
    curses.curs_set(1)
    curses.echo()
    try:
        raw = win.getstr()
    finally:
        curses.noecho()
        curses.curs_set(0)
    return raw.decode('utf-8', errors='replace').strip()
def popup_menu(stdscr, title, options, default_sel=0):
    """Show a centred, boxed list of ``options`` and return the chosen index.

    Up/Down move the highlight, Enter validates. Escape or ``q`` cancels.

    Returns:
        The index of the validated option, or ``None`` when cancelled.
    """
    widest_option = max(len(opt) for opt in options)
    height = len(options) + 4
    width = max(len(title), widest_option + 4, 20) + 4
    top = (curses.LINES - height) // 2
    left = (curses.COLS - width) // 2

    popup = curses.newwin(height, width, top, left)
    popup.keypad(1)
    popup.box()

    current = default_sel
    while True:
        popup.clear()
        popup.box()
        popup.addstr(1, 2, title)
        for row, option in enumerate(options):
            attr = curses.color_pair(1) if row == current else curses.A_NORMAL
            popup.addstr(row + 3, 2, option, attr)
        popup.refresh()

        key = popup.getch()
        if key == curses.KEY_UP:
            if current > 0:
                current -= 1
        elif key == curses.KEY_DOWN:
            if current < len(options) - 1:
                current += 1
        elif key in (curses.KEY_ENTER, 10, 13):
            return current
        elif key in (27, ord('q')):
            return None
def handle_config(stdscr):
    """Settings screen: view and edit the four entries of ``CONFIG``.

    Enter edits the highlighted entry (popup list for the two numeric
    settings, free text for the two paths) and then writes the configuration
    with ``save_config``; ``q``/``Q`` leaves the screen.
    """
    def describe():
        # Labels are rebuilt from CONFIG on every redraw so they always show
        # the current values.
        return [
            f"Vitesse max: {CONFIG['vitesse_max']} KB/s (0 = illimité)",
            f"Téléchargements simultanés: {CONFIG['telechargements_simultanes']}",
            f"Dossier de téléchargement: {CONFIG['download_dir']}",
            f"Emplacement de la base de données: {CONFIG['db_path']}",
        ]

    current = 0
    while True:
        labels = describe()
        stdscr.clear()
        stdscr.addstr(0, 0, "Configuration du téléchargement:")
        for row, label in enumerate(labels):
            attr = curses.color_pair(1) if row == current else curses.A_NORMAL
            stdscr.addstr(row + 2, 2, label, attr)
        stdscr.addstr(len(labels) + 3, 0, "Entrée pour modifier, 'q' pour quitter")
        stdscr.refresh()

        key = stdscr.getch()
        if key == curses.KEY_UP:
            if current > 0:
                current -= 1
        elif key == curses.KEY_DOWN:
            if current < len(labels) - 1:
                current += 1
        elif key in (curses.KEY_ENTER, 10, 13):
            if current == 0:
                speed_options = ["0 (Illimité)", "512", "1024", "2048", "4096"]
                choice = popup_menu(stdscr, "Limite de vitesse (KB/s):", speed_options)
                if choice is not None:
                    # Only the leading number matters ("0 (Illimité)" -> 0).
                    CONFIG["vitesse_max"] = int(speed_options[choice].split()[0])
            elif current == 1:
                sim_options = ["1", "2", "3", "4"]
                choice = popup_menu(stdscr, "Téléchargements simultanés :", sim_options)
                if choice is not None:
                    CONFIG["telechargements_simultanes"] = int(sim_options[choice])
            elif current == 2:
                new_dir = get_input(stdscr, "Nouveau dossier de téléchargement: ")
                if new_dir:
                    CONFIG["download_dir"] = new_dir
            elif current == 3:
                new_db = get_input(stdscr, "Nouvel emplacement de la base de données: ")
                if new_db:
                    CONFIG["db_path"] = new_db
            save_config()
        elif key in (ord('q'), ord('Q')):
            break
def handle_add(stdscr, conn):
    """Prompt for a title, a page URL and a season, then store the anime.

    A season that cannot be converted to ``int`` becomes 1. The message
    returned by ``add_anime`` is shown until a key is pressed.
    """
    titre = get_input(stdscr, "Titre de l'anime: ")
    url = get_input(stdscr, "URL de la page d'épisodes: ")
    saison_text = get_input(stdscr, "Numéro de saison (ex: 1): ")
    try:
        saison = int(saison_text)
    except Exception:
        saison = 1

    _ok, message = add_anime(conn, titre, url, saison)

    stdscr.clear()
    for row, text in ((0, message), (2, "Appuyez sur une touche pour revenir au menu.")):
        stdscr.addstr(row, 0, text)
    stdscr.getch()
def handle_list(stdscr, conn):
    """Display every tracked anime with its last-episode summary.

    Rows are drawn with ``safe_addstr`` and clipped to the terminal height, so
    a list taller than the screen no longer raises ``curses.error``; the
    "press a key" prompt stays visible on the last usable row in that case.
    """
    stdscr.clear()
    animes = get_all_animes(conn)
    max_y, _ = stdscr.getmaxyx()
    # Normally one blank line below the list; clamped to the bottom row when
    # the list does not fit.
    prompt_row = min(len(animes) + 2, max_y - 1)
    if not animes:
        safe_addstr(stdscr, 0, 0, "Aucun anime dans la liste.")
    else:
        safe_addstr(stdscr, 0, 0, "Animes suivis:")
        for row, (_, titre, _, saison, dernier) in enumerate(animes, start=1):
            if row >= prompt_row:
                break  # no room left above the prompt
            safe_addstr(stdscr, row, 2, f"- {titre} ({format_dernier(dernier, saison)})")
    safe_addstr(stdscr, prompt_row, 0, "Appuyez sur une touche pour revenir au menu.")
    stdscr.getch()
def handle_delete(stdscr, conn):
    """Let the user pick a tracked anime and delete it from the database.

    Up/Down move the highlight and Enter deletes the highlighted anime
    immediately. When nothing is tracked, only a notice is shown.
    """
    def wait_for_key(message):
        # Shared closing screen: one message, the prompt, then any key.
        stdscr.clear()
        stdscr.addstr(0, 0, message)
        stdscr.addstr(2, 0, "Appuyez sur une touche pour revenir au menu.")
        stdscr.getch()

    animes = get_all_animes(conn)
    if not animes:
        wait_for_key("Aucun anime à supprimer.")
        return

    current = 0
    last = len(animes) - 1
    choosing = True
    while choosing:
        stdscr.clear()
        stdscr.addstr(0, 0, "Sélectionnez un anime à supprimer :")
        for row, anime in enumerate(animes):
            titre, saison, dernier = anime[1], anime[3], anime[4]
            line = f"{titre} (Saison {saison}, dernier: {format_dernier(dernier, saison)})"
            highlighted = row == current
            if highlighted:
                stdscr.attron(curses.color_pair(1))
            safe_addstr(stdscr, 2 + row, 2, line)
            if highlighted:
                stdscr.attroff(curses.color_pair(1))
        stdscr.refresh()

        key = stdscr.getch()
        if key == curses.KEY_UP:
            if current > 0:
                current -= 1
        elif key == curses.KEY_DOWN:
            if current < last:
                current += 1
        elif key in (curses.KEY_ENTER, 10, 13):
            choosing = False

    delete_anime(conn, animes[current][0])
    wait_for_key("Anime supprimé.")
def safe_addstr(stdscr, y, x, text):
    """Write ``text`` at ``(y, x)``, truncated so it stays inside the window.

    The text is cut to leave the last column free. Nothing is drawn when the
    position is outside the window or when no column is available to the
    right of ``x``. Previously a start column at or beyond the right edge gave
    a zero or negative slice bound and ``addstr`` was still called off-screen.
    """
    max_y, max_x = stdscr.getmaxyx()
    width = max_x - x - 1
    if not (0 <= y < max_y) or x < 0 or width <= 0:
        return
    stdscr.addstr(y, x, text[:width])
def main():
    """Load the configuration, open the database and run the curses UI.

    The database connection is closed when the UI exits, including when it
    exits through an exception.
    """
    load_config()
    conn = init_db()
    try:
        curses.wrapper(setup_and_run, conn)
    finally:
        conn.close()
def setup_and_run(stdscr, conn):
    """Initialise the highlight colour pair, then hand over to the main menu."""
    highlight_pair = 1  # pair number used by every menu for the selected row
    foreground, background = curses.COLOR_BLACK, curses.COLOR_CYAN
    curses.start_color()
    curses.init_pair(highlight_pair, foreground, background)
    curses_menu(stdscr, conn)
# Start the program only when this file is executed as a script.
if __name__ == "__main__":
    main()