"""animecli: curses front-end to track animes in SQLite and download episodes.

The tracked animes live in a small SQLite database under ``~/.animecli``.
Episode pages are scraped with requests/BeautifulSoup and the actual media is
fetched with yt-dlp into ``~/MesAnimes``.
"""

import concurrent.futures
import curses
import re
import sqlite3
from pathlib import Path
from urllib.parse import urljoin

import requests
import yt_dlp
from bs4 import BeautifulSoup

DB_DIR = Path.home() / ".animecli"
DB_PATH = DB_DIR / "animes.db"
DOWNLOAD_DIR = Path.home() / "MesAnimes"

CONFIG = {
    "vitesse_max": 0,  # 0 = unlimited, otherwise in KB/s
    "telechargements_simultanes": 1,  # number of simultaneous downloads
}

# Seconds to wait for an HTTP response before giving up instead of hanging.
_HTTP_TIMEOUT = 15
# Key codes accepted as "validate" and "cancel" in every list screen.
_ENTER_KEYS = (curses.KEY_ENTER, 10, 13)
_CANCEL_KEYS = (27, ord("q"), ord("Q"))


# ---------------------------------------------------------------------------
# Database
# ---------------------------------------------------------------------------

def init_db():
    """Create the database directory and the ``animes`` table if needed.

    Returns:
        An open ``sqlite3.Connection``; the caller is responsible for closing it.

    Raises:
        sqlite3.Error: if the table cannot be created (the connection is
            closed before the error propagates).
    """
    DB_DIR.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS animes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                titre TEXT UNIQUE,
                url TEXT,
                saison INTEGER,
                dernier_episode TEXT
            )
            """
        )
        conn.commit()
    except sqlite3.Error:
        conn.close()
        raise
    return conn


def add_anime(conn, titre, url, saison):
    """Insert a new anime row.

    Returns:
        ``(True, message)`` on success, ``(False, message)`` when the title
        already exists (``titre`` is UNIQUE).
    """
    # NOTE(review): dernier_episode is initialised to "" here and nothing in
    # this file ever updates it afterwards.
    try:
        conn.execute(
            "INSERT INTO animes (titre, url, saison, dernier_episode) VALUES (?, ?, ?, ?)",
            (titre, url, saison, ""),
        )
        conn.commit()
    except sqlite3.IntegrityError:
        return False, f"L'anime '{titre}' existe déjà."
    return True, f"Anime '{titre}' ajouté."


def delete_anime(conn, anime_id):
    """Delete the anime whose primary key is ``anime_id`` (no-op if absent)."""
    conn.execute("DELETE FROM animes WHERE id = ?", (anime_id,))
    conn.commit()


def get_all_animes(conn):
    """Return every anime as ``(id, titre, url, saison, dernier_episode)`` tuples."""
    return conn.execute(
        "SELECT id, titre, url, saison, dernier_episode FROM animes"
    ).fetchall()


# ---------------------------------------------------------------------------
# Low-level curses helpers
# ---------------------------------------------------------------------------

def safe_addstr(stdscr, y, x, text, attr=curses.A_NORMAL):
    """Write ``text`` on a single row, truncated so it never leaves the window.

    Nothing is drawn when the position is outside the window or when no
    column is available to the right of ``x``.

    Args:
        stdscr: any curses window.
        y, x: target row and column.
        text: string to draw.
        attr: optional curses attribute applied to the text.
    """
    max_y, max_x = stdscr.getmaxyx()
    largeur = max_x - x - 1
    # A non-positive width used to become a negative slice bound, which kept
    # most of the text and made addstr raise.
    if not 0 <= y < max_y or x < 0 or largeur <= 0:
        return
    try:
        stdscr.addstr(y, x, text[:largeur], attr)
    except curses.error:
        # Best-effort drawing: double-width characters can still overflow the
        # row after truncating by character count.
        pass


def _ecrire_libre(stdscr, y, x, texte):
    """Write ``texte`` letting curses wrap it; ignore overflow past the window."""
    try:
        stdscr.addstr(y, x, texte)
    except curses.error:
        # Whatever fitted has been drawn; the remainder is simply not shown.
        pass


def _surbrillance():
    """Return the attribute used for the selected row (reverse video without colors)."""
    return curses.color_pair(1) if curses.has_colors() else curses.A_REVERSE


def _regler_curseur(visibilite):
    """Set cursor visibility, ignoring terminals that do not support it."""
    try:
        curses.curs_set(visibilite)
    except curses.error:
        pass


def _afficher_message(stdscr, message, suite=None):
    """Clear the screen, show ``message`` (and optional ``suite``), wait for a key."""
    stdscr.clear()
    _ecrire_libre(stdscr, 0, 0, message)
    if suite:
        _, max_x = stdscr.getmaxyx()
        # Rows taken by the (possibly wrapped) message, rounded up.
        lignes_message = -(-len(message) // max(1, max_x))
        _ecrire_libre(stdscr, max(2, lignes_message + 1), 0, suite)
    stdscr.getch()


def _dessiner_liste(stdscr, entete, lignes, curseur, haut):
    """Draw ``entete`` and the visible slice of ``lignes``; return the scroll offset.

    The list starts on row 2. ``haut`` is the index of the first visible
    entry; it is adjusted so that ``curseur`` is always on screen.
    """
    max_y, _ = stdscr.getmaxyx()
    visibles = max(1, max_y - 2)
    if curseur < haut:
        haut = curseur
    elif curseur >= haut + visibles:
        haut = curseur - visibles + 1

    stdscr.clear()
    _ecrire_libre(stdscr, 0, 0, entete)
    surbrillance = _surbrillance()
    for rang, ligne in enumerate(lignes[haut:haut + visibles]):
        attr = surbrillance if haut + rang == curseur else curses.A_NORMAL
        safe_addstr(stdscr, 2 + rang, 2, ligne, attr)
    stdscr.refresh()
    return haut


def _choisir_ligne(stdscr, entete, lignes, annulable=False):
    """Let the user pick one entry of ``lignes`` with the arrow keys.

    Returns:
        The selected index, or ``None`` when ``annulable`` is true and the
        user pressed Esc or 'q'.
    """
    sel = 0
    haut = 0
    while True:
        haut = _dessiner_liste(stdscr, entete, lignes, sel, haut)
        k = stdscr.getch()
        if k == curses.KEY_UP and sel > 0:
            sel -= 1
        elif k == curses.KEY_DOWN and sel < len(lignes) - 1:
            sel += 1
        elif k in _ENTER_KEYS:
            return sel
        elif annulable and k in _CANCEL_KEYS:
            return None


def _choisir_episodes(stdscr, nb_episodes):
    """Show a checkbox list of episodes and return the checked indices (0-based)."""
    coches = [False] * nb_episodes
    curseur = 0
    haut = 0
    entete = (
        "Sélectionnez les épisodes à télécharger "
        "(Espace pour cocher, Entrée pour valider) :"
    )
    while True:
        lignes = [
            f"{'[X]' if coche else '[ ]'} Épisode {idx + 1}"
            for idx, coche in enumerate(coches)
        ]
        haut = _dessiner_liste(stdscr, entete, lignes, curseur, haut)
        k = stdscr.getch()
        if k == curses.KEY_UP and curseur > 0:
            curseur -= 1
        elif k == curses.KEY_DOWN and curseur < nb_episodes - 1:
            curseur += 1
        elif k == ord(' '):
            coches[curseur] = not coches[curseur]
        elif k in _ENTER_KEYS:
            return [idx for idx, coche in enumerate(coches) if coche]


def _lignes_animes(animes):
    """Build the one-line description shown for each anime in selection lists."""
    return [
        f"{titre} (Saison {saison}, dernier: {format_dernier(dernier, saison)})"
        for _, titre, _, saison, dernier in animes
    ]


# ---------------------------------------------------------------------------
# Sources and downloads
# ---------------------------------------------------------------------------

def get_source_info(url):
    """Probe ``url`` with yt-dlp without downloading anything.

    Returns:
        A dict with keys ``qualite`` (e.g. ``"1080p"`` or ``"?p"``),
        ``taille`` (e.g. ``"350 Mo"`` or ``"?"``), ``title`` and ``url``,
        or ``None`` when the source cannot be extracted or has no formats.
    """
    ydl_opts = {
        'quiet': True,
        'skip_download': True,
        'downloader_args': {'hls': ['--hls-use-mpegts']},
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
    except Exception:
        # Deliberate best-effort: an unusable source is reported as None so
        # the caller can just skip it.
        return None

    formats = (info or {}).get('formats') or []
    if not formats:
        return None

    # 'height' may be present but None, hence "or 0" rather than a get() default.
    best = max(formats, key=lambda f: f.get('height') or 0)
    taille = best.get('filesize') or best.get('filesize_approx')
    # The approximate size can be a float; int() keeps the label as "12 Mo".
    taille_mo = f"{int(taille) // (1024 * 1024)} Mo" if taille else "?"
    return {
        'qualite': f"{best.get('height') or '?'}p",
        'taille': taille_mo,
        'title': info.get('title', ''),
        'url': url,
    }


def choisir_source(stdscr, sources_infos):
    """Ask the user which source to use for an episode.

    Args:
        sources_infos: list of dicts as returned by ``get_source_info``.

    Returns:
        The index of the chosen entry in ``sources_infos``.
    """
    lignes = [
        f"{idx + 1}. {info['url']} ({info['qualite']}, {info['taille']})"
        for idx, info in enumerate(sources_infos)
    ]
    return _choisir_ligne(stdscr, "Choisissez la source pour cet épisode :", lignes)


def extract_episode_sources(url_page):
    """Scrape the candidate source URLs of every episode listed on ``url_page``.

    For anime-sama.fr pages that reference an ``episodes.js`` script, each
    ``var epsN = [...]`` array holds one URL per episode; the arrays are merged
    so that episode ``i`` gets the i-th URL of every array. Other pages fall
    back to the ``a.episode-link`` anchors, one source per episode.

    Returns:
        A list of ``(sources, None)`` tuples, one per episode, where
        ``sources`` is a list of URLs.

    Raises:
        requests.RequestException: on network errors, timeouts or HTTP
            error statuses.
    """
    resp = requests.get(url_page, timeout=_HTTP_TIMEOUT)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")

    episode_sources = []
    script_tag = soup.find('script', src=re.compile(r"episodes\.js"))
    if script_tag and "anime-sama.fr" in url_page:
        js_url = urljoin(url_page, script_tag.get('src'))
        js_resp = requests.get(js_url, timeout=_HTTP_TIMEOUT)
        js_resp.raise_for_status()
        blocs = re.findall(r"var\s+eps\d+\s*=\s*\[([^\]]+)\]", js_resp.text)
        for bloc in blocs:
            urls = re.findall(r"'(https?://[^']+)'", bloc)
            # Grow the list when this array covers more episodes than the
            # previous ones, so those episodes are not silently dropped.
            while len(episode_sources) < len(urls):
                episode_sources.append([])
            for idx, url in enumerate(urls):
                episode_sources[idx].append(url)
    else:
        episode_sources = [
            [a.get("href")]
            for a in soup.select("a.episode-link")
            if a.get("href")
        ]
    return [(sources, None) for sources in episode_sources]


def format_dernier(dernier_url, saison):
    """Describe the last episode, reading its number from an ``E<n>`` marker in the URL."""
    m = re.search(r'[Ee](\d{1,3})', dernier_url or "")
    ep = m.group(1) if m else "?"
    return f"Saison {saison}, épisode {ep}, {dernier_url or 'aucun'}"


def telecharger_episode(url, saison_folder, filename, qualite):
    """Download one episode with yt-dlp.

    Args:
        url: source URL handed to yt-dlp.
        saison_folder: destination directory, taken literally.
        filename: yt-dlp output template for the file name (may contain
            fields such as ``%(ext)s``; literal percent signs must be ``%%``).
        qualite: maximum height such as ``"1080p"``; anything non-numeric
            falls back to yt-dlp's ``best`` selector.

    Returns:
        ``(success, url)``.
    """
    hauteur = str(qualite).rstrip('p')
    if hauteur.isdigit():
        selecteur = f"bestvideo[height<={hauteur}]+bestaudio/best"
    else:
        selecteur = "best"

    # The folder is a literal path: escape '%' so yt-dlp does not read it as a
    # template field.
    dossier_tmpl = str(saison_folder).replace("%", "%%")
    ydl_opts = {
        "outtmpl": str(Path(dossier_tmpl) / filename),
        "format": selecteur,
        # NOTE(review): yt-dlp's embedding API documents 'external_downloader'
        # and 'external_downloader_args'; confirm these two keys have the
        # intended effect.
        "downloader": "ffmpeg",
        "downloader_args": {"hls": ["--hls-use-mpegts"]},
    }
    if CONFIG["vitesse_max"] > 0:
        ydl_opts["ratelimit"] = CONFIG["vitesse_max"] * 1024  # KB/s -> bytes/s

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
    except Exception as e:
        # One failed episode must not abort the other downloads.
        print(f"Erreur download {url}: {e}")
        return False, url
    return True, url


def _nom_sur(titre):
    """Return ``titre`` with path separators replaced so it is a single path component."""
    nettoye = re.sub(r"[\\/]", "_", titre).strip()
    return nettoye if nettoye not in ("", ".", "..") else "anime"


def _construire_file(stdscr, titre, saison, episode_data, to_download, qualite):
    """Probe sources for the chosen episodes and build the download jobs.

    Returns:
        ``(jobs, ignores)`` where ``jobs`` is a list of
        ``(url, folder, filename_template, qualite)`` tuples and ``ignores``
        lists the 1-based numbers of episodes without any usable source.
    """
    saison_num = int(saison)
    nom = _nom_sur(titre)
    saison_folder = DOWNLOAD_DIR / nom / f"Saison {saison_num:02d}"
    # '%' is escaped because the file name is a yt-dlp output template.
    prefixe = nom.replace(' ', '_').replace('%', '%%')

    jobs = []
    ignores = []
    for idx in to_download:
        # Probing is slow (network); tell the user what is going on.
        stdscr.clear()
        safe_addstr(stdscr, 0, 0, f"Analyse des sources de l'épisode {idx + 1}...")
        stdscr.refresh()

        sources, _ = episode_data[idx]
        infos = [info for info in map(get_source_info, sources) if info]
        if not infos:
            ignores.append(idx + 1)
            continue
        sel_src = choisir_source(stdscr, infos) if len(infos) > 1 else 0
        saison_folder.mkdir(parents=True, exist_ok=True)
        filename = f"{prefixe} - S{saison_num:02d}E{idx + 1:02d}.%(ext)s"
        jobs.append((infos[sel_src]['url'], saison_folder, filename, qualite))
    return jobs, ignores


def handle_multi_download(stdscr, conn):
    """Interactive flow: pick an anime, pick episodes, pick sources, download.

    Curses mode is left for the duration of the downloads so that yt-dlp's
    output is readable; the menu redraw restores it afterwards.
    """
    animes = get_all_animes(conn)
    if not animes:
        _afficher_message(
            stdscr,
            "Aucun anime disponible.",
            "Appuyez sur une touche pour revenir au menu.",
        )
        return

    sel = _choisir_ligne(
        stdscr,
        "Sélectionnez un anime ('q' pour annuler) :",
        _lignes_animes(animes),
        annulable=True,
    )
    if sel is None:
        return
    _anime_id, titre, url_page, saison, _dernier = animes[sel]

    try:
        episode_data = extract_episode_sources(url_page)
    except Exception as e:
        # UI boundary: any scraping failure is shown instead of crashing.
        _afficher_message(stdscr, f"Erreur récupération des sources: {e}")
        return
    if not episode_data:
        _afficher_message(stdscr, "Aucun épisode trouvé.")
        return

    to_download = _choisir_episodes(stdscr, len(episode_data))
    if not to_download:
        _afficher_message(stdscr, "Aucun épisode sélectionné.")
        return

    download_queue, ignores = _construire_file(
        stdscr, titre, saison, episode_data, to_download, "1080p"
    )
    if not download_queue:
        _afficher_message(
            stdscr, "Aucune source exploitable pour les épisodes sélectionnés."
        )
        return

    curses.endwin()
    for numero in ignores:
        print(f"Aucune source exploitable pour l'épisode {numero}")

    nb_workers = max(1, int(CONFIG.get("telechargements_simultanes", 2)))
    with concurrent.futures.ThreadPoolExecutor(max_workers=nb_workers) as executor:
        futures = [
            executor.submit(telecharger_episode, *job) for job in download_queue
        ]
        for future in concurrent.futures.as_completed(futures):
            success, url = future.result()
            if success:
                print(f"Téléchargement réussi: {url}")
            else:
                print(f"Echec téléchargement: {url}")

    # Without this pause the menu redraw wipes the summary immediately.
    try:
        input("Appuyez sur Entrée pour revenir au menu.")
    except EOFError:
        pass


# ---------------------------------------------------------------------------
# Menus and forms
# ---------------------------------------------------------------------------

def curses_menu(stdscr, conn):
    """Run the main menu loop until the user chooses "Quitter"."""
    _regler_curseur(0)
    # Label -> action; None marks the entry that leaves the loop.
    entrees = [
        ("Ajouter un anime", lambda: handle_add(stdscr, conn)),
        ("Lister les animes", lambda: handle_list(stdscr, conn)),
        ("Télécharger plusieurs épisodes", lambda: handle_multi_download(stdscr, conn)),
        ("Supprimer un anime", lambda: handle_delete(stdscr, conn)),
        ("Configuration", lambda: handle_config(stdscr)),
        ("Quitter", None),
    ]
    cursor_y = 0
    while True:
        stdscr.clear()
        safe_addstr(stdscr, 0, 2, "animecli - Gestion d'animes", curses.A_BOLD)
        surbrillance = _surbrillance()
        for idx, (libelle, _) in enumerate(entrees):
            attr = surbrillance if idx == cursor_y else curses.A_NORMAL
            safe_addstr(stdscr, 2 + idx, 4, libelle, attr)
        stdscr.refresh()

        k = stdscr.getch()
        if k == curses.KEY_UP and cursor_y > 0:
            cursor_y -= 1
        elif k == curses.KEY_DOWN and cursor_y < len(entrees) - 1:
            cursor_y += 1
        elif k in _ENTER_KEYS:
            action = entrees[cursor_y][1]
            if action is None:
                break
            action()


def get_input(stdscr, prompt):
    """Show ``prompt`` and read one line of text from the user.

    The input area spans the whole screen below the prompt, so long values
    such as URLs can wrap instead of being cut off at a fixed width.

    Returns:
        The entered text, stripped of surrounding whitespace.
    """
    _regler_curseur(1)
    stdscr.clear()
    _ecrire_libre(stdscr, 0, 0, prompt)
    stdscr.refresh()

    max_y, max_x = stdscr.getmaxyx()
    win = curses.newwin(max(1, max_y - 2), max_x, 2, 0)
    curses.echo()
    try:
        brut = win.getstr()
    finally:
        # Always restore the terminal state, even if reading fails.
        curses.noecho()
        _regler_curseur(0)
    return brut.decode('utf-8', errors='replace').strip()


def popup_menu(stdscr, title, options, default_sel=0):
    """Show a centered popup and let the user pick one of ``options``.

    Returns:
        The selected index, or ``None`` if the user pressed Esc/'q' or
        ``options`` is empty.
    """
    if not options:
        return None

    max_y, max_x = stdscr.getmaxyx()
    # Clamp to the screen so newwin cannot fail on small terminals.
    popup_height = min(len(options) + 4, max_y)
    popup_width = min(
        max(len(title), max(len(opt) for opt in options) + 4, 20) + 4, max_x
    )
    popup = curses.newwin(
        popup_height,
        popup_width,
        max(0, (max_y - popup_height) // 2),
        max(0, (max_x - popup_width) // 2),
    )
    popup.keypad(1)

    sel = min(max(default_sel, 0), len(options) - 1)
    while True:
        popup.clear()
        popup.box()
        safe_addstr(popup, 1, 2, title)
        surbrillance = _surbrillance()
        for idx, option in enumerate(options):
            attr = surbrillance if idx == sel else curses.A_NORMAL
            safe_addstr(popup, idx + 3, 2, option, attr)
        popup.refresh()

        k = popup.getch()
        if k == curses.KEY_UP and sel > 0:
            sel -= 1
        elif k == curses.KEY_DOWN and sel < len(options) - 1:
            sel += 1
        elif k in _ENTER_KEYS:
            return sel
        elif k in (27, ord('q')):
            return None


def handle_config(stdscr):
    """Let the user edit the download settings stored in ``CONFIG``."""
    sel = 0
    while True:
        # Rebuilt on every pass so the labels always reflect CONFIG.
        options = [
            f"Vitesse max: {CONFIG['vitesse_max']} KB/s (0 = illimité)",
            f"Téléchargements simultanés: {CONFIG['telechargements_simultanes']}",
        ]
        stdscr.clear()
        safe_addstr(stdscr, 0, 0, "Configuration du téléchargement:")
        surbrillance = _surbrillance()
        for idx, option in enumerate(options):
            attr = surbrillance if idx == sel else curses.A_NORMAL
            safe_addstr(stdscr, idx + 2, 2, option, attr)
        safe_addstr(
            stdscr, len(options) + 3, 0, "Entrée pour modifier, 'q' pour quitter"
        )
        stdscr.refresh()

        k = stdscr.getch()
        if k == curses.KEY_UP and sel > 0:
            sel -= 1
        elif k == curses.KEY_DOWN and sel < len(options) - 1:
            sel += 1
        elif k in _ENTER_KEYS:
            if sel == 0:
                speed_options = ["0 (Illimité)", "512", "1024", "2048", "4096"]
                speed_idx = popup_menu(
                    stdscr, "Limite de vitesse (KB/s):", speed_options
                )
                if speed_idx is not None:
                    CONFIG["vitesse_max"] = int(speed_options[speed_idx].split()[0])
            elif sel == 1:
                sim_options = ["1", "2", "3", "4"]
                sim_idx = popup_menu(
                    stdscr, "Téléchargements simultanés :", sim_options
                )
                if sim_idx is not None:
                    CONFIG["telechargements_simultanes"] = int(sim_options[sim_idx])
        elif k in (ord('q'), ord('Q')):
            break


def handle_add(stdscr, conn):
    """Prompt for title, URL and season, then store the anime.

    A non-numeric season defaults to 1. An empty title or URL is rejected
    instead of being stored.
    """
    titre = get_input(stdscr, "Titre de l'anime: ")
    url = get_input(stdscr, "URL de la page d'épisodes: ")
    saison_txt = get_input(stdscr, "Numéro de saison (ex: 1): ")
    try:
        saison = int(saison_txt)
    except ValueError:
        saison = 1

    if not titre or not url:
        msg = "Titre et URL obligatoires : anime non ajouté."
    else:
        _success, msg = add_anime(conn, titre, url, saison)
    _afficher_message(stdscr, msg, "Appuyez sur une touche pour revenir au menu.")


def handle_list(stdscr, conn):
    """Display every tracked anime and wait for a key press."""
    stdscr.clear()
    animes = get_all_animes(conn)
    if not animes:
        safe_addstr(stdscr, 0, 0, "Aucun anime dans la liste.")
    else:
        safe_addstr(stdscr, 0, 0, "Animes suivis:")
        for idx, (_, titre, _, saison, dernier) in enumerate(animes, start=1):
            safe_addstr(
                stdscr, idx, 2, f"- {titre} ({format_dernier(dernier, saison)})"
            )

    # Keep the footer on screen even when the list is taller than the window.
    max_y, _ = stdscr.getmaxyx()
    ligne_pied = min(len(animes) + 2, max_y - 1)
    try:
        stdscr.move(ligne_pied, 0)
        stdscr.clrtoeol()
    except curses.error:
        pass
    safe_addstr(stdscr, ligne_pied, 0, "Appuyez sur une touche pour revenir au menu.")
    stdscr.getch()


def handle_delete(stdscr, conn):
    """Let the user pick an anime and delete it; Esc or 'q' cancels."""
    animes = get_all_animes(conn)
    if not animes:
        _afficher_message(
            stdscr,
            "Aucun anime à supprimer.",
            "Appuyez sur une touche pour revenir au menu.",
        )
        return

    sel = _choisir_ligne(
        stdscr,
        "Sélectionnez un anime à supprimer ('q' pour annuler) :",
        _lignes_animes(animes),
        annulable=True,
    )
    if sel is None:
        return

    delete_anime(conn, animes[sel][0])
    _afficher_message(
        stdscr, "Anime supprimé.", "Appuyez sur une touche pour revenir au menu."
    )


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

def setup_and_run(stdscr, conn):
    """Initialise the highlight color pair (when available) and run the menu."""
    if curses.has_colors():
        curses.start_color()
        curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_CYAN)
    curses_menu(stdscr, conn)


def main():
    """Open the database, run the curses UI and always close the connection."""
    conn = init_db()
    try:
        curses.wrapper(setup_and_run, conn)
    finally:
        conn.close()


if __name__ == "__main__":
    main()