Animecli/animecli.py
2025-06-04 02:02:45 +02:00

459 lines
17 KiB
Python

import sqlite3
from pathlib import Path
import requests
from bs4 import BeautifulSoup
import yt_dlp
import curses
import re
from urllib.parse import urljoin
import concurrent.futures
DB_DIR = Path.home() / ".animecli"
DB_PATH = DB_DIR / "animes.db"
DOWNLOAD_DIR = Path.home() / "MesAnimes"
CONFIG = {
"vitesse_max": 0, # 0 = illimité, sinon en Ko/s
"telechargements_simultanes": 1 # Nombre de téléchargements simultanés
}
def init_db():
DB_DIR.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(
"""
CREATE TABLE IF NOT EXISTS animes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
titre TEXT UNIQUE,
url TEXT,
saison INTEGER,
dernier_episode TEXT
)
"""
)
conn.commit()
return conn
def add_anime(conn, titre, url, saison):
c = conn.cursor()
try:
c.execute(
"INSERT INTO animes (titre, url, saison, dernier_episode) VALUES (?, ?, ?, ?)", (titre, url, saison, "")
)
conn.commit()
return True, f"Anime '{titre}' ajouté."
except sqlite3.IntegrityError:
return False, f"L'anime '{titre}' existe déjà."
def delete_anime(conn, anime_id):
c = conn.cursor()
c.execute("DELETE FROM animes WHERE id = ?", (anime_id,))
conn.commit()
def get_all_animes(conn):
c = conn.cursor()
return c.execute("SELECT id, titre, url, saison, dernier_episode FROM animes").fetchall()
def get_source_info(url):
ydl_opts = {
'quiet': True,
'skip_download': True,
'downloader_args': {'hls': ['--hls-use-mpegts']}
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
try:
info = ydl.extract_info(url, download=False)
formats = info.get('formats', [])
if not formats:
return None
best = max(formats, key=lambda f: f.get('height', 0) or 0)
taille = best.get('filesize') or best.get('filesize_approx')
taille_mo = f"{taille // (1024*1024)} Mo" if taille else "?"
qualite = f"{best.get('height', '?')}p"
return {
'qualite': qualite,
'taille': taille_mo,
'title': info.get('title', ''),
'url': url
}
except Exception:
return None
def choisir_source(stdscr, sources_infos):
sel = 0
while True:
stdscr.clear()
stdscr.addstr(0, 0, "Choisissez la source pour cet épisode :")
for idx, info in enumerate(sources_infos):
line = f"{idx+1}. {info['url']} ({info['qualite']}, {info['taille']})"
if idx == sel:
stdscr.attron(curses.color_pair(1))
stdscr.addstr(2 + idx, 2, line)
stdscr.attroff(curses.color_pair(1))
else:
stdscr.addstr(2 + idx, 2, line)
stdscr.refresh()
k = stdscr.getch()
if k == curses.KEY_UP and sel > 0:
sel -= 1
elif k == curses.KEY_DOWN and sel < len(sources_infos) - 1:
sel += 1
elif k in [curses.KEY_ENTER, 10, 13]:
return sel # retourne l'index choisi
def extract_episode_sources(url_page):
resp = requests.get(url_page)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
episode_sources = []
script_tag = soup.find('script', src=re.compile(r"episodes\.js"))
if script_tag and "anime-sama.fr" in url_page:
js_src = script_tag.get('src')
js_url = urljoin(url_page, js_src)
js_resp = requests.get(js_url)
js_resp.raise_for_status()
all_eps = re.findall(r"var\s+eps\d+\s*=\s*\[([^\]]+)\]", js_resp.text)
for eps_block in all_eps:
urls = re.findall(r"'(https?://[^']+)'", eps_block)
if not episode_sources:
for _ in urls:
episode_sources.append([])
for idx, url in enumerate(urls):
if idx < len(episode_sources):
episode_sources[idx].append(url)
else:
links = [a.get("href") for a in soup.select("a.episode-link") if a.get("href")]
for url in links:
episode_sources.append([url])
return [(sources, None) for sources in episode_sources]
def format_dernier(dernier_url, saison):
m = re.search(r'[Ee](\d{1,3})', dernier_url or "")
ep = m.group(1) if m else "?"
return f"Saison {saison}, épisode {ep}, {dernier_url or 'aucun'}"
def telecharger_episode(url, saison_folder, filename, qualite):
ydl_opts = {
"outtmpl": str(saison_folder / filename),
"format": f"bestvideo[height<={qualite.rstrip('p')}]+bestaudio/best",
"downloader": "ffmpeg",
"downloader_args": {"hls": ["--hls-use-mpegts"]}
}
if CONFIG["vitesse_max"] > 0:
ydl_opts["ratelimit"] = CONFIG["vitesse_max"] * 1024
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
return True, url
except Exception as e:
print(f"Erreur download {url}: {e}")
return False, url
def handle_multi_download(stdscr, conn):
animes = get_all_animes(conn)
if not animes:
stdscr.clear()
stdscr.addstr(0, 0, "Aucun anime disponible.")
stdscr.addstr(2, 0, "Appuyez sur une touche pour revenir au menu.")
stdscr.getch()
return
sel = 0
while True:
stdscr.clear()
stdscr.addstr(0, 0, "Sélectionnez un anime :")
for idx, (_, titre, _, saison, dernier) in enumerate(animes):
line = f"{titre} (Saison {saison}, dernier: {format_dernier(dernier, saison)})"
if idx == sel:
stdscr.attron(curses.color_pair(1))
stdscr.addstr(2 + idx, 2, line)
stdscr.attroff(curses.color_pair(1))
else:
stdscr.addstr(2 + idx, 2, line)
stdscr.refresh()
k = stdscr.getch()
if k == curses.KEY_UP and sel > 0:
sel -= 1
elif k == curses.KEY_DOWN and sel < len(animes) - 1:
sel += 1
elif k in [curses.KEY_ENTER, 10, 13]:
break
anime_id, titre, url_page, saison, dernier = animes[sel]
try:
episode_data = extract_episode_sources(url_page)
except Exception as e:
stdscr.clear()
stdscr.addstr(0, 0, f"Erreur récupération des sources: {e}")
stdscr.getch()
return
if not episode_data:
stdscr.clear()
stdscr.addstr(0, 0, "Aucun épisode trouvé.")
stdscr.getch()
return
selected = [False] * len(episode_data)
cursor = 0
while True:
stdscr.clear()
stdscr.addstr(0, 0, "Sélectionnez les épisodes à télécharger (Espace pour cocher, Entrée pour valider) :")
for idx, (sources, _) in enumerate(episode_data):
mark = "[X]" if selected[idx] else "[ ]"
line = f"{mark} Épisode {idx+1}"
if idx == cursor:
stdscr.attron(curses.color_pair(1))
stdscr.addstr(2 + idx, 2, line)
stdscr.attroff(curses.color_pair(1))
else:
stdscr.addstr(2 + idx, 2, line)
stdscr.refresh()
k = stdscr.getch()
if k == curses.KEY_UP and cursor > 0:
cursor -= 1
elif k == curses.KEY_DOWN and cursor < len(episode_data) - 1:
cursor += 1
elif k == ord(' '):
selected[cursor] = not selected[cursor]
elif k in [curses.KEY_ENTER, 10, 13]:
break
to_download = [i for i, sel_ in enumerate(selected) if sel_]
if not to_download:
stdscr.clear()
stdscr.addstr(0, 0, "Aucun épisode sélectionné.")
stdscr.getch()
return
qualite = "1080p"
base_folder = DOWNLOAD_DIR / titre
download_queue = []
for idx in to_download:
sources, _ = episode_data[idx]
infos = []
for src in sources:
info = get_source_info(src)
if info:
infos.append(info)
if not infos:
continue
sel_src = choisir_source(stdscr, infos) if len(infos) > 1 else 0
chosen_url = infos[sel_src]['url']
saison_str = f"S{int(saison):02d}"
ep_str = f"E{idx+1:02d}"
saison_folder = base_folder / f"Saison {int(saison):02d}"
saison_folder.mkdir(parents=True, exist_ok=True)
filename = f"{titre.replace(' ', '_')} - {saison_str}{ep_str}.%(ext)s"
download_queue.append((chosen_url, saison_folder, filename, qualite))
curses.endwin()
with concurrent.futures.ThreadPoolExecutor(max_workers=CONFIG.get("telechargements_simultanes", 2)) as executor:
futures = [
executor.submit(telecharger_episode, url, folder, fname, qualite)
for url, folder, fname, qualite in download_queue
]
for future in concurrent.futures.as_completed(futures):
success, url = future.result()
if success:
print(f"Téléchargement réussi: {url}")
else:
print(f"Echec téléchargement: {url}")
def curses_menu(stdscr, conn):
curses.curs_set(0)
menu = [
"Ajouter un anime",
"Lister les animes",
"Télécharger plusieurs épisodes",
"Supprimer un anime",
"Configuration",
"Quitter"
]
cursor_y = 0
while True:
stdscr.clear()
stdscr.addstr(0, 2, "animecli - Gestion d'animes", curses.A_BOLD)
for idx, item in enumerate(menu):
x = 4
y = 2 + idx
if y - 2 == cursor_y:
stdscr.attron(curses.color_pair(1))
stdscr.addstr(y, x, item)
stdscr.attroff(curses.color_pair(1))
else:
stdscr.addstr(y, x, item)
stdscr.refresh()
k = stdscr.getch()
if k == curses.KEY_UP and cursor_y > 0:
cursor_y -= 1
elif k == curses.KEY_DOWN and cursor_y < len(menu) - 1:
cursor_y += 1
elif k in [curses.KEY_ENTER, 10, 13]:
if menu[cursor_y] == "Ajouter un anime":
handle_add(stdscr, conn)
elif menu[cursor_y] == "Lister les animes":
handle_list(stdscr, conn)
elif menu[cursor_y] == "Télécharger plusieurs épisodes":
handle_multi_download(stdscr, conn)
elif menu[cursor_y] == "Supprimer un anime":
handle_delete(stdscr, conn)
elif menu[cursor_y] == "Configuration":
handle_config(stdscr)
elif menu[cursor_y] == "Quitter":
break
def get_input(stdscr, prompt):
curses.curs_set(1)
stdscr.clear()
stdscr.addstr(0, 0, prompt)
stdscr.refresh()
win = curses.newwin(1, 60, 2, 0)
curses.echo()
value = win.getstr().decode('utf-8')
curses.noecho()
curses.curs_set(0)
return value.strip()
def popup_menu(stdscr, title, options, default_sel=0):
popup_height = len(options) + 4
popup_width = max(len(title), max(len(opt) for opt in options) + 4, 20) + 4
popup = curses.newwin(popup_height, popup_width,
(curses.LINES - popup_height) // 2,
(curses.COLS - popup_width) // 2)
popup.keypad(1)
popup.box()
sel = default_sel
while True:
popup.clear()
popup.box()
popup.addstr(1, 2, title)
for idx, option in enumerate(options):
if idx == sel:
popup.attron(curses.color_pair(1))
popup.addstr(idx + 3, 2, option)
popup.attroff(curses.color_pair(1))
else:
popup.addstr(idx + 3, 2, option)
popup.refresh()
k = popup.getch()
if k == curses.KEY_UP and sel > 0:
sel -= 1
elif k == curses.KEY_DOWN and sel < len(options) - 1:
sel += 1
elif k in [curses.KEY_ENTER, 10, 13]:
return sel
elif k in [27, ord('q')]:
return None
def handle_config(stdscr):
options = [
f"Vitesse max: {CONFIG['vitesse_max']} KB/s (0 = illimité)",
f"Téléchargements simultanés: {CONFIG['telechargements_simultanes']}"
]
sel = 0
while True:
stdscr.clear()
stdscr.addstr(0, 0, "Configuration du téléchargement:")
for idx, option in enumerate(options):
if idx == sel:
stdscr.attron(curses.color_pair(1))
stdscr.addstr(idx + 2, 2, option)
stdscr.attroff(curses.color_pair(1))
else:
stdscr.addstr(idx + 2, 2, option)
stdscr.addstr(len(options) + 3, 0, "Entrée pour modifier, 'q' pour quitter")
stdscr.refresh()
k = stdscr.getch()
if k == curses.KEY_UP and sel > 0:
sel -= 1
elif k == curses.KEY_DOWN and sel < len(options) - 1:
sel += 1
elif k in [curses.KEY_ENTER, 10, 13]:
if sel == 0:
speed_options = ["0 (Illimité)", "512", "1024", "2048", "4096"]
speed_idx = popup_menu(stdscr, "Limite de vitesse (KB/s):", speed_options)
if speed_idx is not None:
CONFIG["vitesse_max"] = int(speed_options[speed_idx].split()[0])
options[0] = f"Vitesse max: {CONFIG['vitesse_max']} KB/s (0 = illimité)"
elif sel == 1:
sim_options = ["1", "2", "3", "4"]
sim_idx = popup_menu(stdscr, "Téléchargements simultanés :", sim_options)
if sim_idx is not None:
CONFIG["telechargements_simultanes"] = int(sim_options[sim_idx])
options[1] = f"Téléchargements simultanés: {CONFIG['telechargements_simultanes']}"
elif k in [ord('q'), ord('Q')]:
break
def handle_add(stdscr, conn):
titre = get_input(stdscr, "Titre de l'anime: ")
url = get_input(stdscr, "URL de la page d'épisodes: ")
saison = get_input(stdscr, "Numéro de saison (ex: 1): ")
try:
saison = int(saison)
except Exception:
saison = 1
success, msg = add_anime(conn, titre, url, saison)
stdscr.clear()
stdscr.addstr(0, 0, msg)
stdscr.addstr(2, 0, "Appuyez sur une touche pour revenir au menu.")
stdscr.getch()
def handle_list(stdscr, conn):
stdscr.clear()
animes = get_all_animes(conn)
if not animes:
stdscr.addstr(0, 0, "Aucun anime dans la liste.")
else:
stdscr.addstr(0, 0, "Animes suivis:")
for idx, (_, titre, _, saison, dernier) in enumerate(animes, start=1):
stdscr.addstr(idx, 2, f"- {titre} ({format_dernier(dernier, saison)})")
stdscr.addstr(len(animes) + 2, 0, "Appuyez sur une touche pour revenir au menu.")
stdscr.getch()
def handle_delete(stdscr, conn):
animes = get_all_animes(conn)
if not animes:
stdscr.clear()
stdscr.addstr(0, 0, "Aucun anime à supprimer.")
stdscr.addstr(2, 0, "Appuyez sur une touche pour revenir au menu.")
stdscr.getch()
return
sel = 0
while True:
stdscr.clear()
stdscr.addstr(0, 0, "Sélectionnez un anime à supprimer :")
for idx, (_, titre, _, saison, dernier) in enumerate(animes):
line = f"{titre} (Saison {saison}, dernier: {format_dernier(dernier, saison)})"
if idx == sel:
stdscr.attron(curses.color_pair(1))
stdscr.addstr(2 + idx, 2, line)
stdscr.attroff(curses.color_pair(1))
else:
stdscr.addstr(2 + idx, 2, line)
stdscr.refresh()
k = stdscr.getch()
if k == curses.KEY_UP and sel > 0:
sel -= 1
elif k == curses.KEY_DOWN and sel < len(animes) - 1:
sel += 1
elif k in [curses.KEY_ENTER, 10, 13]:
break
anime_id = animes[sel][0]
delete_anime(conn, anime_id)
stdscr.clear()
stdscr.addstr(0, 0, "Anime supprimé.")
stdscr.addstr(2, 0, "Appuyez sur une touche pour revenir au menu.")
stdscr.getch()
def main():
conn = init_db()
curses.wrapper(setup_and_run, conn)
def setup_and_run(stdscr, conn):
curses.start_color()
curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_CYAN)
curses_menu(stdscr, conn)
if __name__ == "__main__":
main()