everything works, needs makeup though
This commit is contained in:
parent
5130450355
commit
464683e9ce
14 changed files with 377 additions and 0 deletions
2
src/torrent_downloader/__init__.py
Normal file
2
src/torrent_downloader/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
def hello() -> str:
|
||||
return "Hello from torrent-downloader!"
|
3
src/torrent_downloader/__main__.py
Normal file
3
src/torrent_downloader/__main__.py
Normal file
|
@ -0,0 +1,3 @@
|
|||
from .client import main
|
||||
|
||||
main()
|
48
src/torrent_downloader/app.py
Normal file
48
src/torrent_downloader/app.py
Normal file
|
@ -0,0 +1,48 @@
|
|||
import urllib.parse
|
||||
import transmission_rpc
|
||||
from fastapi import Depends, FastAPI, Request
|
||||
from fastapi.responses import HTMLResponse
|
||||
from fastapi.templating import Jinja2Templates
|
||||
|
||||
from torrent_downloader.client import TorrentDownloader
|
||||
|
||||
app = FastAPI()
|
||||
templates = Jinja2Templates(directory="templates")
|
||||
|
||||
|
||||
def get_downloader() -> TorrentDownloader:
|
||||
return TorrentDownloader()
|
||||
|
||||
|
||||
def get_active_torrents(downloader=get_downloader()) -> list[transmission_rpc.Torrent]:
|
||||
return [t for t in downloader.get_active_torrents() if t.format_eta() != "not available"]
|
||||
|
||||
|
||||
@app.get("/", response_class=HTMLResponse)
|
||||
async def index(request: Request):
|
||||
return templates.TemplateResponse(request=request, name="index.html")
|
||||
|
||||
|
||||
@app.get("/list", response_class=HTMLResponse)
|
||||
async def list_torrents(request: Request, query: str, downloader=Depends(get_downloader)):
|
||||
torrents = downloader.search(query)
|
||||
return templates.TemplateResponse(
|
||||
request=request, name="torrents.html", context={"torrents": torrents}
|
||||
)
|
||||
|
||||
|
||||
@app.get("/download", response_class=HTMLResponse)
|
||||
async def download(request: Request, downloader=Depends(get_downloader)):
|
||||
magnet = urllib.parse.unquote(str(request.query_params))
|
||||
downloader.download_magnet(magnet)
|
||||
active = downloader.get_active_torrents()
|
||||
return templates.TemplateResponse(
|
||||
request=request, name="active_torrents.html", context={"torrents": active}
|
||||
)
|
||||
|
||||
|
||||
@app.get("/active", response_class=HTMLResponse)
|
||||
async def active(request: Request, active=Depends(get_active_torrents)):
|
||||
return templates.TemplateResponse(
|
||||
request=request, name="active_torrents.html", context={"torrents": active}
|
||||
)
|
71
src/torrent_downloader/client.py
Normal file
71
src/torrent_downloader/client.py
Normal file
|
@ -0,0 +1,71 @@
|
|||
from pydantic import SecretStr
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
from tpblite import CATEGORIES, ORDERS, TPB
|
||||
from tpblite.models.torrents import Torrent as TpbTorrent, Torrents
|
||||
import transmission_rpc
|
||||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
model_config = SettingsConfigDict(env_prefix="TD_", case_sensitive=False)
|
||||
|
||||
tpb_site: str = "https://tpb.party"
|
||||
transmission_host: str = "192.168.0.29"
|
||||
transmission_port: int = 9091
|
||||
transmission_user: str = "transmission"
|
||||
transmission_pass: SecretStr = SecretStr("1304c2ea35e4e3f685691998c67ab58e20cf5d29CLC0a8Wn")
|
||||
|
||||
|
||||
class TorrentDownloader:
|
||||
def __init__(self, config: Settings = Settings()):
|
||||
self.tpb = TPB(config.tpb_site)
|
||||
self.transmission = transmission_rpc.Client(
|
||||
host=config.transmission_host,
|
||||
port=config.transmission_port,
|
||||
username=config.transmission_user,
|
||||
password=config.transmission_pass.get_secret_value(),
|
||||
)
|
||||
|
||||
def search(self, term: str, category: int = CATEGORIES.ALL) -> Torrents:
|
||||
return self.tpb.search(
|
||||
term,
|
||||
page=1,
|
||||
order=ORDERS.SEEDERS.DES,
|
||||
category=category,
|
||||
)
|
||||
|
||||
def download(self, torrent: TpbTorrent):
|
||||
if torrent is not None and torrent.magnetlink:
|
||||
self.transmission.add_torrent(torrent.magnetlink)
|
||||
|
||||
def download_magnet(self, magnetlink: str):
|
||||
self.transmission.add_torrent(magnetlink)
|
||||
|
||||
def get_active_torrents(self) -> list[transmission_rpc.Torrent]:
|
||||
active, _ = self.transmission.get_recently_active_torrents()
|
||||
return active
|
||||
|
||||
|
||||
def main():
|
||||
config = Settings()
|
||||
td = TorrentDownloader(config)
|
||||
|
||||
initial = input("Search torrent: ")
|
||||
search_result: Torrents = td.search(initial, CATEGORIES.ALL)
|
||||
|
||||
for idx, item in enumerate(search_result):
|
||||
print(f"{idx+1}. {item}")
|
||||
|
||||
try:
|
||||
choice = int(input("Choose your destiny: "))
|
||||
if 30 > int(choice) > 0:
|
||||
torrent = search_result[choice - 1]
|
||||
else:
|
||||
print("Wrong destiny :( getting best torrent by hand")
|
||||
torrent = search_result.getBestTorrent(min_seeds=10)
|
||||
except Exception as _:
|
||||
torrent = search_result.getBestTorrent(min_seeds=10)
|
||||
|
||||
if torrent is not None:
|
||||
td.download(torrent)
|
||||
else:
|
||||
print("Cannot find good enough torrent :D")
|
4
src/torrent_downloader/server.py
Normal file
4
src/torrent_downloader/server.py
Normal file
|
@ -0,0 +1,4 @@
|
|||
import uvicorn
|
||||
|
||||
if __name__ == "__main__":
|
||||
uvicorn.run("torrent_downloader.app:app", host="0.0.0.0", port=8000, reload=True)
|
Loading…
Add table
Add a link
Reference in a new issue