Several common ways to download files with Python requests
1. Basic download:

import requests

def download_file(url, save_path):
    # response.content reads the whole body into memory,
    # so there is no need for stream=True here
    response = requests.get(url)
    if response.status_code == 200:
        with open(save_path, 'wb') as f:
            f.write(response.content)
        return True
    return False

# Usage example
url = "https://example.com/file.pdf"
download_file(url, "file.pdf")
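A common variant is to let requests raise an exception on HTTP errors instead of checking the status code by hand. A minimal sketch (the function name download_file_strict is illustrative, not part of the original code):

import requests

def download_file_strict(url, save_path):
    response = requests.get(url)
    # Raises requests.HTTPError for 4xx/5xx responses
    response.raise_for_status()
    with open(save_path, 'wb') as f:
        f.write(response.content)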
2. Chunked download for large files:

import requests
from tqdm import tqdm

def download_large_file(url, save_path):
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        file_size = int(response.headers.get('content-length', 0))
        # Progress bar counts bytes; advance it by the size of each chunk,
        # not by the number of chunks
        progress = tqdm(total=file_size, unit='B', unit_scale=True)
        with open(save_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
                progress.update(len(chunk))
        progress.close()
        return True
    return False
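It is called the same way as the basic version; the URL below is a placeholder:

# Usage example (placeholder URL)
download_large_file("https://example.com/large_file.zip", "large_file.zip")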
3. Download with resume support:

import requests
import os

def resume_download(url, save_path):
    # Size of any partially downloaded file already on disk
    initial_pos = os.path.getsize(save_path) if os.path.exists(save_path) else 0
    # Ask the server for the remaining bytes only
    headers = {'Range': f'bytes={initial_pos}-'}
    response = requests.get(url, stream=True, headers=headers)
    # 206 Partial Content means the server honored the Range header;
    # a plain 200 means it sent the whole file, so overwrite instead of appending
    mode = 'ab' if initial_pos > 0 and response.status_code == 206 else 'wb'
    with open(save_path, mode) as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)
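Because the function appends to whatever is already on disk, it can simply be called again after a dropped connection. A sketch of a retry loop (the URL and retry count are illustrative):

import time

url = "https://example.com/big_video.mp4"  # placeholder URL
for attempt in range(3):
    try:
        resume_download(url, "big_video.mp4")
        break  # finished without a network error
    except requests.exceptions.RequestException:
        time.sleep(2)  # wait, then resume from the current file offset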
4. Download with timeout and retries:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def download_with_retry(url, save_path, max_retries=3, timeout=30):
    session = requests.Session()
    # Retry transient server errors with exponential backoff
    retries = Retry(total=max_retries,
                    backoff_factor=1,
                    status_forcelist=[500, 502, 503, 504])
    session.mount('http://', HTTPAdapter(max_retries=retries))
    session.mount('https://', HTTPAdapter(max_retries=retries))
    try:
        response = session.get(url, stream=True, timeout=timeout)
        with open(save_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
        return True
    except Exception as e:
        print(f"Download failed: {str(e)}")
        return False
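A usage sketch with placeholder arguments; backoff_factor=1 tells urllib3 to sleep with exponentially growing delays between the retries it performs for the listed status codes:

# Usage example (placeholder URL)
download_with_retry("https://example.com/file.pdf", "file.pdf",
                    max_retries=5, timeout=60)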
5. A complete downloader class:

import requests
from tqdm import tqdm
from pathlib import Path
import hashlib

class FileDownloader:
    def __init__(self, chunk_size=8192):
        self.chunk_size = chunk_size
        self.session = requests.Session()

    def get_file_size(self, url):
        # Follow redirects so content-length describes the final resource
        response = self.session.head(url, allow_redirects=True)
        return int(response.headers.get('content-length', 0))

    def get_file_hash(self, file_path):
        # SHA-256 of the file, read in small blocks to bound memory use
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()

    def download(self, url, save_path, verify_hash=None):
        save_path = Path(save_path)
        # Create the target directory if it does not exist
        save_path.parent.mkdir(parents=True, exist_ok=True)
        # Total size for the progress bar (0 when the server omits it)
        file_size = self.get_file_size(url)
        progress = tqdm(total=file_size,
                        unit='B',
                        unit_scale=True,
                        desc=save_path.name)
        try:
            response = self.session.get(url, stream=True)
            with save_path.open('wb') as f:
                for chunk in response.iter_content(chunk_size=self.chunk_size):
                    if chunk:
                        f.write(chunk)
                        progress.update(len(chunk))
            progress.close()
            # Verify integrity against the expected SHA-256 digest
            if verify_hash:
                downloaded_hash = self.get_file_hash(save_path)
                if downloaded_hash != verify_hash:
                    raise ValueError("File hash verification failed")
            return True
        except Exception as e:
            progress.close()
            print(f"Download failed: {str(e)}")
            # Remove the partial file so a retry starts clean
            if save_path.exists():
                save_path.unlink()
            return False

    def download_multiple(self, url_list, save_dir):
        results = []
        for url in url_list:
            # Derive the file name from the last URL path segment
            filename = url.split('/')[-1]
            save_path = Path(save_dir) / filename
            success = self.download(url, save_path)
            results.append({
                'url': url,
                'success': success,
                'save_path': str(save_path)
            })
        return results
# Usage example
downloader = FileDownloader()

# Single-file download
url = "https://example.com/file.pdf"
downloader.download(url, "downloads/file.pdf")

# Multi-file download
urls = [
    "https://example.com/file1.pdf",
    "https://example.com/file2.pdf"
]
results = downloader.download_multiple(urls, "downloads")
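The verify_hash parameter plugs into get_file_hash above. A sketch of an integrity-checked download; the digest shown is the SHA-256 of an empty file, standing in for a real published checksum:

# Download with SHA-256 integrity check (placeholder digest and URL)
expected = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
downloader.download("https://example.com/installer.zip",
                    "downloads/installer.zip",
                    verify_hash=expected)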
Author: microhex