Animecli/animecli.py

463 lines
17 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sqlite3
from pathlib import Path
import requests
from bs4 import BeautifulSoup
import yt_dlp
import curses
import re
from urllib.parse import urljoin
DB_DIR = Path.home() / ".animecli"
DB_PATH = DB_DIR / "animes.db"
DOWNLOAD_DIR = Path.home() / "MesAnimes"
CONFIG = {
"vitesse_max": 0, # 0 = illimité, sinon en Ko/s
}
def init_db():
DB_DIR.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(
"""
CREATE TABLE IF NOT EXISTS animes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
titre TEXT UNIQUE,
url TEXT,
saison INTEGER,
dernier_episode TEXT
)
"""
)
conn.commit()
return conn
def add_anime(conn, titre, url, saison):
c = conn.cursor()
try:
c.execute(
"INSERT INTO animes (titre, url, saison, dernier_episode) VALUES (?, ?, ?, ?)", (titre, url, saison, "")
)
conn.commit()
return True, f"Anime '{titre}' ajouté."
except sqlite3.IntegrityError:
return False, f"L'anime '{titre}' existe déjà."
def delete_anime(conn, anime_id):
c = conn.cursor()
c.execute("DELETE FROM animes WHERE id = ?", (anime_id,))
conn.commit()
def get_all_animes(conn):
c = conn.cursor()
return c.execute("SELECT id, titre, url, saison, dernier_episode FROM animes").fetchall()
def get_source_info(url):
ydl_opts = {
'quiet': True,
'skip_download': True,
'downloader_args': {'hls': ['--hls-use-mpegts']}
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
try:
info = ydl.extract_info(url, download=False)
formats = info.get('formats', [])
if not formats:
return None
best = max(formats, key=lambda f: f.get('height', 0) or 0)
taille = best.get('filesize') or best.get('filesize_approx')
taille_mo = f"{taille // (1024*1024)} Mo" if taille else "?"
qualite = f"{best.get('height', '?')}p"
return {
'qualite': qualite,
'taille': taille_mo,
'title': info.get('title', ''),
'url': url
}
except Exception:
return None
def choisir_source(stdscr, sources_infos):
sel = 0
while True:
stdscr.clear()
stdscr.addstr(0, 0, "Choisissez la source pour cet épisode :")
for idx, info in enumerate(sources_infos):
line = f"{idx+1}. {info['url']} ({info['qualite']}, {info['taille']})"
if idx == sel:
stdscr.attron(curses.color_pair(1))
stdscr.addstr(2 + idx, 2, line)
stdscr.attroff(curses.color_pair(1))
else:
stdscr.addstr(2 + idx, 2, line)
stdscr.refresh()
k = stdscr.getch()
if k == curses.KEY_UP and sel > 0:
sel -= 1
elif k == curses.KEY_DOWN and sel < len(sources_infos) - 1:
sel += 1
elif k in [curses.KEY_ENTER, 10, 13]:
return sel # retourne l'index choisi
def extract_episode_sources(url_page):
resp = requests.get(url_page)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
episode_sources = []
script_tag = soup.find('script', src=re.compile(r"episodes\.js"))
if script_tag and "anime-sama.fr" in url_page:
js_src = script_tag.get('src')
js_url = urljoin(url_page, js_src)
js_resp = requests.get(js_url)
js_resp.raise_for_status()
all_eps = re.findall(r"var\s+eps\d+\s*=\s*\[([^\]]+)\]", js_resp.text)
for eps_block in all_eps:
urls = re.findall(r"'(https?://[^']+)'", eps_block)
if not episode_sources:
for _ in urls:
episode_sources.append([])
for idx, url in enumerate(urls):
if idx < len(episode_sources):
episode_sources[idx].append(url)
else:
links = [a.get("href") for a in soup.select("a.episode-link") if a.get("href")]
for url in links:
episode_sources.append([url])
return [(sources, None) for sources in episode_sources]
def format_dernier(dernier_url, saison):
m = re.search(r'[Ee](\d{1,3})', dernier_url or "")
ep = m.group(1) if m else "?"
return f"Saison {saison}, épisode {ep}, {dernier_url or 'aucun'}"
def download_anime(conn, stdscr, anime_id=None):
c = conn.cursor()
qualite = "1080p"
if anime_id:
rows = c.execute(
"SELECT id, titre, url, saison, dernier_episode FROM animes WHERE id = ?", (anime_id,)
).fetchall()
if not rows:
return False, "Anime introuvable."
else:
rows = c.execute(
"SELECT id, titre, url, saison, dernier_episode FROM animes"
).fetchall()
messages = []
for anime_id, titre_anime, url_page, saison, dernier in rows:
messages.append(f"--- Mise à jour '{titre_anime}' ---")
try:
episode_data = extract_episode_sources(url_page)
except Exception as e:
messages.append(f"Erreur récupération des sources: {e}")
continue
if not episode_data:
messages.append("Aucun lien d'épisode trouvé.")
continue
nouveaux = []
for sources, _ in episode_data:
if not sources:
continue
if dernier and dernier in sources:
nouveaux = []
else:
nouveaux.append(sources)
if not nouveaux:
messages.append("Pas de nouvel épisode.")
continue
messages.append(f"Nouveaux épisodes à télécharger ({len(nouveaux)}):")
base_folder = DOWNLOAD_DIR / titre_anime
# Préparer les infos de sources pour tous les épisodes
all_sources_infos = []
for sources in nouveaux:
infos = []
for src in sources:
info = get_source_info(src)
if info:
infos.append(info)
all_sources_infos.append(infos)
# Choix de la source pour tous les épisodes
chosen_idx = None
apply_all = False
for idx, sources_infos in enumerate(all_sources_infos):
if not sources_infos:
messages.append("Aucune info trouvée pour les sources")
continue
if not apply_all:
sel = choisir_source(stdscr, sources_infos)
chosen_idx = sel
# Proposer d'appliquer à tous
stdscr.clear()
stdscr.addstr(0, 0, "Utiliser ce choix pour tous les épisodes suivants ? (o/n)")
stdscr.refresh()
k = stdscr.getch()
if k in [ord('o'), ord('O')]:
apply_all = True
else:
sel = chosen_idx
chosen_url = sources_infos[sel]['url']
saison_str = f"S{int(saison):02d}"
ep_str = f"E{idx+1:02d}"
saison_folder = base_folder / f"Saison {int(saison):02d}"
saison_folder.mkdir(parents=True, exist_ok=True)
filename = f"{titre_anime.replace(' ', '_')} - {saison_str}{ep_str}.%(ext)s"
ydl_opts = {
"outtmpl": str(saison_folder / filename),
"format": f"bestvideo[height<={qualite.rstrip('p')}]+bestaudio/best",
"downloader_args": {"hls": ["--hls-use-mpegts"]}
}
if CONFIG["vitesse_max"] > 0:
ydl_opts['ratelimit'] = CONFIG["vitesse_max"] * 1024
curses.endwin()
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
print(f"Téléchargement: {chosen_url}")
ydl.download([chosen_url])
dernier = chosen_url
except Exception as e:
print(f"Erreur download {chosen_url}: {e}")
stdscr.refresh()
c.execute(
"UPDATE animes SET dernier_episode = ? WHERE id = ?", (dernier, anime_id)
)
conn.commit()
messages.append(f"Mise à jour terminée '{titre_anime}'.")
return True, messages
def curses_menu(stdscr, conn):
curses.curs_set(0)
cursor_y = 2
menu = [
"Ajouter un anime",
"Lister les animes",
"Télécharger épisodes",
"Supprimer un anime",
"Configuration",
"Quitter"]
while True:
stdscr.clear()
stdscr.addstr(0, 2, "animecli - Gestion d'animes (Terminal GUI)", curses.A_BOLD)
for idx, item in enumerate(menu):
x = 4
y = 2 + idx
if y == cursor_y:
stdscr.attron(curses.color_pair(1))
stdscr.addstr(y, x, item)
stdscr.attroff(curses.color_pair(1))
else:
stdscr.addstr(y, x, item)
stdscr.refresh()
k = stdscr.getch()
if k == curses.KEY_UP and cursor_y > 2:
cursor_y -= 1
elif k == curses.KEY_DOWN and cursor_y < 2 + len(menu) - 1:
cursor_y += 1
elif k in [curses.KEY_ENTER, 10, 13]:
choice = cursor_y - 2
if menu[choice] == "Ajouter un anime":
handle_add(stdscr, conn)
elif menu[choice] == "Lister les animes":
handle_list(stdscr, conn)
elif menu[choice] == "Télécharger épisodes":
handle_download(stdscr, conn)
elif menu[choice] == "Supprimer un anime":
handle_delete(stdscr, conn)
elif menu[choice] == "Configuration":
handle_config(stdscr)
elif menu[choice] == "Quitter":
break
def get_input(stdscr, prompt):
curses.curs_set(1)
stdscr.clear()
stdscr.addstr(0, 0, prompt)
stdscr.refresh()
win = curses.newwin(1, 60, 2, 0)
curses.echo()
value = win.getstr().decode('utf-8')
curses.noecho()
curses.curs_set(0)
return value.strip()
def popup_menu(stdscr, title, options, default_sel=0):
popup_height = len(options) + 4
popup_width = max(len(title), max(len(opt) for opt in options) + 4, 20) + 4
popup = curses.newwin(popup_height, popup_width,
(curses.LINES - popup_height) // 2,
(curses.COLS - popup_width) // 2)
popup.keypad(1)
popup.box()
sel = default_sel
while True:
popup.clear()
popup.box()
popup.addstr(1, 2, title)
for idx, option in enumerate(options):
if idx == sel:
popup.attron(curses.color_pair(1))
popup.addstr(idx + 3, 2, option)
popup.attroff(curses.color_pair(1))
else:
popup.addstr(idx + 3, 2, option)
popup.refresh()
k = popup.getch()
if k == curses.KEY_UP and sel > 0:
sel -= 1
elif k == curses.KEY_DOWN and sel < len(options) - 1:
sel += 1
elif k in [curses.KEY_ENTER, 10, 13]:
return sel
elif k in [27, ord('q')]:
return None
def handle_config(stdscr):
options = [
f"Vitesse max: {CONFIG['vitesse_max']} KB/s (0 = illimité)",
f"Téléchargements simultanés: {CONFIG['telechargements_simultanes']}"
]
sel = 0
while True:
stdscr.clear()
stdscr.addstr(0, 0, "Configuration du téléchargement:")
for idx, option in enumerate(options):
if idx == sel:
stdscr.attron(curses.color_pair(1))
stdscr.addstr(idx + 2, 2, option)
stdscr.attroff(curses.color_pair(1))
else:
stdscr.addstr(idx + 2, 2, option)
stdscr.addstr(len(options) + 3, 0, "Entrée pour modifier, 'q' pour quitter")
stdscr.refresh()
k = stdscr.getch()
if k == curses.KEY_UP and sel > 0:
sel -= 1
elif k == curses.KEY_DOWN and sel < len(options) - 1:
sel += 1
elif k in [curses.KEY_ENTER, 10, 13]:
if sel == 0:
speed_options = ["0 (Illimité)", "512", "1024", "2048", "4096"]
speed_idx = popup_menu(stdscr, "Limite de vitesse (KB/s):", speed_options)
if speed_idx is not None:
CONFIG["vitesse_max"] = int(speed_options[speed_idx].split()[0])
options[0] = f"Vitesse max: {CONFIG['vitesse_max']} KB/s (0 = illimité)"
elif k in [ord('q'), ord('Q')]:
break
def handle_add(stdscr, conn):
titre = get_input(stdscr, "Titre de l'anime: ")
url = get_input(stdscr, "URL de la page d'épisodes: ")
saison = get_input(stdscr, "Numéro de saison (ex: 1): ")
try:
saison = int(saison)
except Exception:
saison = 1
success, msg = add_anime(conn, titre, url, saison)
stdscr.clear()
stdscr.addstr(0, 0, msg)
stdscr.addstr(2, 0, "Appuyez sur une touche pour revenir au menu.")
stdscr.getch()
def handle_list(stdscr, conn):
stdscr.clear()
animes = get_all_animes(conn)
if not animes:
stdscr.addstr(0, 0, "Aucun anime dans la liste.")
else:
stdscr.addstr(0, 0, "Animes suivis:")
for idx, (_, titre, _, saison, dernier) in enumerate(animes, start=1):
stdscr.addstr(idx, 2, f"- {titre} ({format_dernier(dernier, saison)})")
stdscr.addstr(len(animes) + 2, 0, "Appuyez sur une touche pour revenir au menu.")
stdscr.getch()
def handle_delete(stdscr, conn):
animes = get_all_animes(conn)
if not animes:
stdscr.clear()
stdscr.addstr(0, 0, "Aucun anime à supprimer.")
stdscr.addstr(2, 0, "Appuyez sur une touche pour revenir au menu.")
stdscr.getch()
return
sel = 0
while True:
stdscr.clear()
stdscr.addstr(0, 0, "Sélectionnez un anime à supprimer :")
for idx, (_, titre, _, saison, dernier) in enumerate(animes):
line = f"{titre} (Saison {saison}, dernier: {format_dernier(dernier, saison)})"
if idx == sel:
stdscr.attron(curses.color_pair(1))
stdscr.addstr(2 + idx, 2, line)
stdscr.attroff(curses.color_pair(1))
else:
stdscr.addstr(2 + idx, 2, line)
stdscr.refresh()
k = stdscr.getch()
if k == curses.KEY_UP and sel > 0:
sel -= 1
elif k == curses.KEY_DOWN and sel < len(animes) - 1:
sel += 1
elif k in [curses.KEY_ENTER, 10, 13]:
break
anime_id = animes[sel][0]
delete_anime(conn, anime_id)
stdscr.clear()
stdscr.addstr(0, 0, "Anime supprimé.")
stdscr.addstr(2, 0, "Appuyez sur une touche pour revenir au menu.")
stdscr.getch()
def handle_download(stdscr, conn):
animes = get_all_animes(conn)
options = ["Tous les animes"] + [titre for (_, titre, _, _, _) in animes]
sel = 0
while True:
stdscr.clear()
stdscr.addstr(0, 0, "Sélectionnez un anime pour mise à jour:")
for idx, title in enumerate(options):
if idx == sel:
stdscr.attron(curses.color_pair(1))
stdscr.addstr(2 + idx, 2, title)
stdscr.attroff(curses.color_pair(1))
else:
stdscr.addstr(2 + idx, 2, title)
stdscr.refresh()
k = stdscr.getch()
if k == curses.KEY_UP and sel > 0:
sel -= 1
elif k == curses.KEY_DOWN and sel < len(options) - 1:
sel += 1
elif k in [curses.KEY_ENTER, 10, 13]:
break
if sel == 0:
anime_id = None
else:
anime_id = animes[sel - 1][0]
stdscr.clear()
stdscr.addstr(0, 0, "Lancement du téléchargement...")
stdscr.refresh()
success, messages = download_anime(conn, stdscr, anime_id)
stdscr.clear()
if not success:
stdscr.addstr(0, 0, messages)
else:
for idx, line in enumerate(messages, start=0):
if idx >= curses.LINES - 2:
stdscr.addstr(curses.LINES - 1, 0, "--Plus de lignes--")
break
stdscr.addstr(idx, 0, line)
stdscr.addstr(curses.LINES - 1, 0, "Appuyez sur une touche pour revenir au menu.")
stdscr.getch()
def main():
conn = init_db()
curses.wrapper(setup_and_run, conn)
def setup_and_run(stdscr, conn):
curses.start_color()
curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_CYAN)
curses_menu(stdscr, conn)
if __name__ == "__main__":
main()