#!/usr/bin/env python3
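"""Archive files, images, and folders hosted on solidfiles.com.

Reads a JSON list of solidfiles URLs, downloads each item into a per-id
directory under storage_dir (file content plus a meta.json of the page
metadata), and writes a summary of all results to an output JSON file.
"""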
import json
import logging
import os
import requests
import re
from tqdm import tqdm
from urllib.parse import quote_plus
logging.basicConfig(format="[%(asctime)s] %(levelname)s: %(funcName)s(%(lineno)s): %(message)s", level=logging.DEBUG)
L = logging.getLogger(__name__)
# One shared HTTP session so connections and headers are reused across requests.
session = requests.Session()
session.headers.update({'User-Agent': 'Teeworlds Archiver/0.0'})
def _ctqdm(iterable, **kwargs):
    """Yield chunks from iterable, advancing a tqdm progress bar as they pass."""
    with tqdm(**kwargs) as t:
        for chunk in iterable:
            yield chunk
            t.update(len(chunk))
def _dl(url, fn, retries=6):
    """Stream url into fn with a progress bar, retrying on connection errors."""
    try:
        with open(fn, 'wb') as f:
            L.info(f"Downloading {url.rpartition('/')[2]}…")
            resp = session.get(url, stream=True)
            resp.raise_for_status()
            total_size = int(resp.headers.get('Content-Length', 0))
            chunk_size = 128 * 1024
            for chunk in _ctqdm(resp.iter_content(chunk_size=chunk_size),
                                total=total_size, unit='B', unit_scale=True):
                f.write(chunk)
    except requests.exceptions.ConnectionError:
        if retries == 0:
            raise
        # Restart the download from scratch; opening 'wb' truncates the partial file.
        _dl(url, fn, retries - 1)
def _retried_get(url, retries=6):
    """GET url, retrying on connection errors; returns the response."""
    try:
        return session.get(url)
    except requests.exceptions.ConnectionError:
        if retries == 0:
            raise
        return _retried_get(url, retries - 1)
def _meta(data, fn):
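    """Serialize metadata to fn as JSON."""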
with open(fn, 'w') as f:
json.dump(data, f)
def _write_index(fn, files):
    """Write a minimal HTML index page linking to the given file paths."""
    entries = "\n".join(
        f'<li><a href="{quote_plus(path, safe="/.")}">{path.rpartition("/")[2]}</a></li>'
        for path in files)
    with open(fn, 'w') as f:
        f.write('<!DOCTYPE html>\n<html><head><meta charset="utf-8">'
                '<title>Folder</title></head>\n<body><h1>Folder</h1>\n'
                f'<ul>\n{entries}\n</ul>\n</body></html>\n')
def _make_storage_path(storage_dir, id):
    """Create (if necessary) and return the per-id storage directory."""
    path = os.path.join(storage_dir, id)
    os.makedirs(path, exist_ok=True)
    return path
def _meta_storage_path(storage_dir, id):
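    """Return the path of the meta.json inside the per-id directory."""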
path = os.path.join(storage_dir, id)
path_meta = os.path.join(path, 'meta.json')
return path_meta
def _file_storage_path(storage_dir, meta):
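    """Return the on-disk path for the file described by meta, or None if it 404ed."""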
if meta.get('status') == 'not_found':
return None
path = os.path.join(storage_dir, meta['id'])
path_file = os.path.join(path, meta['nodeName'].replace('/', '_'))
return path_file
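# URL shapes recognized below: solidfiles.com/folder/<id>/ for folders,
# solidfiles.com/d/<id>/ for single files, and solidfiles.com/i/<id>.<ext>
# (or the i.solidfiles.com subdomain) for images.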
ID_CHARS = '[A-Za-z0-9_-]+'
re_folder = re.compile(rf'solidfiles\.com/folder/(?P<id>{ID_CHARS})(?:/|$)')
# Per-file links inside a folder listing page; findall() yields one id per entry.
re_folder_entry = re.compile(rf'href="/d/(?P<id>{ID_CHARS})/"')
re_file = re.compile(rf'solidfiles\.com/d/(?P<id>{ID_CHARS})(?:/|$)')
re_image = re.compile(rf'(?:solidfiles\.com/i|i\.solidfiles\.com)/(?P<id>{ID_CHARS})\.')
def download(url, storage_dir):
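    """Dispatch a solidfiles URL to the folder or single-item downloader.

    Returns a sequence of (id, meta, path_file, path_meta) tuples.
    """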
m = re_folder.search(url)
if m:
id = m.group('id')
return download_folder(id, storage_dir)
m = re_file.search(url) or re_image.search(url)
if m:
id = m.group('id')
ret = download_by_id(id, storage_dir)
return (ret,)
raise ValueError(f"Unsupported URL: {url}")
def download_folder(id, storage_dir):
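    """Download every file in a solidfiles folder.

    Returns (id, meta, path_file, path_meta) tuples for the folder's
    generated index.html and for each contained file.
    """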
url = f'http://solidfiles.com/folder/{id}/'
path_meta = _meta_storage_path(storage_dir, id)
if os.path.exists(path_meta):
L.info(f"Folder exists: {url}")
with open(path_meta) as f:
meta = json.load(f)
if meta.get('status') == 'not_found':
return ((id, meta, None, path_meta),)
file_ids = meta['files']
else:
_make_storage_path(storage_dir, id)
resp = _retried_get(url)
if resp.status_code == 404:
L.info(f"404: {url}")
meta = {
'id': id,
'status': 'not_found',
'folder': True,
}
_meta(meta, path_meta)
return ((id, meta, None, path_meta),)
        if resp.status_code != 200:
            raise RuntimeError(f"Unexpected status {resp.status_code} for {url}")
        # The folder page lists its files as links; collect their ids.
        file_ids = re_folder_entry.findall(resp.text)
        # The page also embeds its own metadata as the JSON argument of a
        # JavaScript viewerOptions(...) call; extract and parse that JSON.
viewer_options_line = next(l for l in resp.text.splitlines() if 'viewerOptions' in l)
viewer_options_text = viewer_options_line.partition("viewerOptions', ")[2]
viewer_options_text = viewer_options_text.rpartition(')')[0]
meta = json.loads(viewer_options_text)
meta['id'] = id
meta['folder'] = True
meta['files'] = file_ids
meta['nodeName'] = 'index.html'
    path_file = _file_storage_path(storage_dir, meta)
    ret = [(id, meta, path_file, path_meta)]
    index_infos = []
    for file_id in file_ids:
        file = download_by_id(file_id, storage_dir)
        # Index links are relative to the folder's own directory.
        entry_path = _file_storage_path('..', file[1])
        if entry_path is not None:  # skip entries that 404ed
            index_infos.append(entry_path)
        ret.append(file)
    _write_index(path_file, index_infos)
    _meta(meta, path_meta)
    return ret
def download_by_id(id, storage_dir):
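    """Download a single solidfiles file (or image) by id.

    Returns (id, meta, path_file, path_meta); path_file is None if the
    item no longer exists.
    """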
url = f'http://solidfiles.com/d/{id}/'
path_meta = _meta_storage_path(storage_dir, id)
if os.path.exists(path_meta):
L.info(f"Exists: {url}")
with open(path_meta) as f:
meta = json.load(f)
return id, meta, _file_storage_path(storage_dir, meta), path_meta
_make_storage_path(storage_dir, id)
resp = _retried_get(url)
if resp.status_code == 404:
L.info(f"404: {url}")
meta = {
'id': id,
'status': 'not_found',
}
_meta(meta, path_meta)
return id, meta, None, path_meta
    if resp.status_code != 200:
        raise RuntimeError(f"Unexpected status {resp.status_code} for {url}")
    # The download page embeds file metadata as the JSON argument of a
    # JavaScript viewerOptions(...) call; extract and parse that JSON.
viewer_options_line = next(l for l in resp.text.splitlines() if 'viewerOptions' in l)
viewer_options_text = viewer_options_line.partition("viewerOptions', ")[2]
viewer_options_text = viewer_options_text.rpartition(')')[0]
meta = json.loads(viewer_options_text)
meta['id'] = id
path_file = _file_storage_path(storage_dir, meta)
    dl_url = meta.get('downloadUrl')
    if dl_url:
        _dl(dl_url, path_file)
    else:
        L.info(f"Metadata for {id} did not contain a download URL")
_meta(meta, path_meta)
return id, meta, path_file, path_meta
if __name__ == '__main__':
import sys
if len(sys.argv) != 4:
print(f"Usage: {sys.argv[0]} input_files.json storage_dir info.json")
        sys.exit(1)
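    # input_files.json holds a JSON array of solidfiles URLs, e.g.
    # ["http://solidfiles.com/d/abc123/"] (hypothetical id, for illustration).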
with open(sys.argv[1]) as f:
urls = json.load(f)
results = []
for i, url in enumerate(urls):
infos = download(url, storage_dir=sys.argv[2])
for info in infos:
id, meta, path_file, path_meta = info
results.append(info)
L.info(f"Downloaded {id} ({i+1}/{len(urls)})")
with open(sys.argv[3], 'w') as f:
json.dump(results, f)