1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
| import httpx import threading from urllib.request import getproxies from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED from rich.progress import BarColumn, DownloadColumn, Progress, TextColumn, TimeRemainingColumn, TransferSpeedColumn from rich.console import Console from typing import Union, Tuple, Dict import os import re import hashlib import shutil
def parse_filename_from_url(url: str) -> Union[str, None]:
    """Extract a file name (with extension) from the last path segment of *url*.

    The query string (``?...``) and fragment (``#...``) are removed before
    matching, so dots or slashes inside them cannot leak into the result.

    Args:
        url: The URL to inspect.

    Returns:
        The trailing ``name.ext`` segment, or ``None`` when the URL does not
        end in a segment that contains a dot followed by word characters.
    """
    # Cut at the first '?' or '#'. Matching against the full URL lets the
    # greedy segment pattern absorb a query such as "?v=1.2".
    without_query = re.split(r'[?#]', url, maxsplit=1)[0]
    match = re.search(r'/([^/]+\.\w+)$', without_query)
    if match is None:
        return None
    return match.group(1)
def md5_encrypt(text: str) -> str:
    """Return the hexadecimal MD5 digest of *text* encoded as UTF-8."""
    encoded = text.encode('utf-8')
    return hashlib.md5(encoded).hexdigest()
class Downloader:
    """Download a single URL, using parallel HTTP range requests when possible.

    ``download()`` issues a HEAD request first. If the server reports
    ``Accept-Ranges: bytes`` and the file is larger than 10 MiB, the file is
    fetched as byte ranges by a thread pool; otherwise it is streamed in a
    single request. Files larger than 100 MiB are staged as per-range part
    files on disk and then concatenated; smaller ranged downloads are held in
    memory and written out once.
    """

    # Files at or below this size are always fetched with a single request.
    _RANGE_MIN_SIZE = 10 * 1024 * 1024
    # Ranged downloads above this size go through part files, not memory.
    _DISK_MIN_SIZE = 100 * 1024 * 1024

    def __init__(
            self,
            url: str,
            n_workers: int = 8,
            dest_dir: str = "./",
            filename: Union[str, None] = None,
            headers: Union[Dict, None] = None,
            proxy: Union[str, None] = None,
            **other_http_params
    ):
        """Configure a download.

        Args:
            url: The URL to fetch.
            n_workers: Number of byte ranges (and threads) for ranged downloads.
            dest_dir: Directory the output file is written into.
            filename: Output file name. When ``None`` it is parsed from the
                URL, falling back to ``<md5 of url>.bin``.
            headers: Request headers. When ``None`` a browser-like
                ``User-Agent`` header is used.
            proxy: Proxy URL. When ``None`` the ``"http"`` entry returned by
                ``urllib.request.getproxies()`` is used (may itself be ``None``).
            **other_http_params: Extra keyword arguments forwarded to every
                ``httpx`` call.
        """
        self.url = url
        self.n_workers = n_workers
        self.dest_dir = dest_dir

        if filename is None:
            filename = parse_filename_from_url(self.url)
        if filename is None:
            filename = md5_encrypt(self.url) + ".bin"
        self.filename = filename

        if headers is None:
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
                              "Chrome/102.0.0.0 Safari/537.36",
            }
        self.headers = headers

        if proxy is None:
            proxy = getproxies().get("http")
        self.proxy = proxy

        self.other_http_params = other_http_params

        # Guards progress updates issued from the worker threads.
        self.lock = threading.Lock()
        self.console = Console()
        self.progress = Progress(
            TextColumn("[bold blue]Downloading[/bold blue]", justify="right"),
            BarColumn(bar_width=None),
            "[progress.percentage]{task.percentage:>3.1f}%",
            "•",
            DownloadColumn(),
            "•",
            TransferSpeedColumn(),
            "•",
            TimeRemainingColumn(),
        )
        # Progress task id; assigned by download() before any chunk is fetched.
        self.task = None

    @staticmethod
    def _split_ranges(total_size: int, n_parts: int) -> list:
        """Split ``total_size`` bytes into contiguous inclusive ``(start, end)`` ranges.

        The ranges cover exactly bytes ``0 .. total_size - 1`` with no overlap.
        ``n_parts`` is clamped to ``1 .. total_size`` so every range holds at
        least one byte. Returns an empty list when ``total_size`` is not positive.
        """
        if total_size <= 0:
            return []
        n_parts = max(1, min(n_parts, total_size))
        base, extra = divmod(total_size, n_parts)
        ranges = []
        start = 0
        for index in range(n_parts):
            # The first `extra` ranges take one additional byte each.
            length = base + 1 if index < extra else base
            ranges.append((start, start + length - 1))
            start += length
        return ranges

    def _drain(self, response, sink) -> None:
        """Feed every body chunk of *response* to *sink* and advance the progress bar."""
        for data in response.iter_bytes():
            sink(data)
            with self.lock:
                self.progress.update(self.task, advance=len(data))

    def download_chunk(self, headers: Dict[str, str], path: Union[None, str] = None) -> Union[None, bytes]:
        """Fetch ``self.url`` with *headers* and store or return the body.

        Args:
            headers: Request headers (including any ``Range`` header).
            path: Destination file. When ``None`` the body is returned instead.

        Returns:
            ``None`` when *path* is given, otherwise the downloaded bytes.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        with httpx.stream(
                "GET",
                self.url,
                headers=headers,
                proxy=self.proxy,
                follow_redirects=True,
                timeout=30,
                **self.other_http_params
        ) as r:
            # Fail loudly instead of saving an error page as file content.
            r.raise_for_status()
            if path is None:
                pieces = []
                self._drain(r, pieces.append)
                # Join once; repeated `bytes +=` copies the buffer every time.
                return b"".join(pieces)
            with open(path, "wb") as dest_file:
                self._drain(r, dest_file.write)
        return None

    def _download_parts_to_disk(self, splits, dest_path: str) -> None:
        """Fetch each range into its own part file, then concatenate into *dest_path*."""
        part_paths = [f"{dest_path}_{i}" for i in range(len(splits))]
        try:
            with ThreadPoolExecutor(max_workers=len(splits)) as pool:
                futures = [
                    pool.submit(self.download_chunk, self.add_range(split), part_path)
                    for split, part_path in zip(splits, part_paths)
                ]
            # result() re-raises worker exceptions, so a failed range aborts
            # the merge instead of producing a truncated file.
            for future in futures:
                future.result()
            with open(dest_path, "wb") as concatenated_file:
                for part_path in part_paths:
                    with open(part_path, "rb") as part_file:
                        shutil.copyfileobj(part_file, concatenated_file)
        finally:
            # Remove part files on success and on failure alike.
            for part_path in part_paths:
                try:
                    os.remove(part_path)
                except FileNotFoundError:
                    pass

    def _download_parts_in_memory(self, splits, dest_path: str) -> None:
        """Fetch each range into memory, then write them in order to *dest_path*."""
        with ThreadPoolExecutor(max_workers=len(splits)) as pool:
            futures = [
                pool.submit(self.download_chunk, self.add_range(split), None)
                for split in splits
            ]
        # Collect everything first so a failed range leaves no partial file.
        chunks = [future.result() for future in futures]
        with open(dest_path, "wb") as f:
            f.writelines(chunks)

    def download(self):
        """Download ``self.url`` into ``dest_dir/filename``, showing a progress bar.

        Raises:
            KeyError: If the HEAD response has no ``Content-Length`` header.
            httpx.HTTPStatusError: If a GET request returns an error status.
        """
        response = httpx.head(
            self.url,
            headers=self.headers,
            proxy=self.proxy,
            follow_redirects=True,
            timeout=30,
            **self.other_http_params
        )
        total_size = int(response.headers['Content-Length'])
        dest_path = os.path.join(self.dest_dir, self.filename)
        accepts_ranges = response.headers.get("Accept-Ranges", "").lower() == "bytes"

        with self.progress:
            self.task = self.progress.add_task("download", total=total_size, filename=self.filename)

            if not accepts_ranges or total_size <= self._RANGE_MIN_SIZE:
                self.download_chunk(self.headers, dest_path)
                return

            splits = self._split_ranges(total_size, self.n_workers)
            if total_size > self._DISK_MIN_SIZE:
                self._download_parts_to_disk(splits, dest_path)
            else:
                self._download_parts_in_memory(splits, dest_path)

    def add_range(self, split: Tuple[int, int]) -> Dict[str, str]:
        """Return a copy of the default headers with a ``Range`` header for *split*.

        Args:
            split: Inclusive ``(first_byte, last_byte)`` offsets.
        """
        headers = self.headers.copy()
        headers["Range"] = f"bytes={split[0]}-{split[1]}"
        return headers
|