Source code for arsenal.download

import os
from arsenal.fsutils import mkdir, secure_filename
from arsenal.robust import timelimit, retry
from arsenal.misc import ignore_error
from urllib.request import Request, build_opener


[docs]def urlread(url): req_headers = { 'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US)' ' AppleWebKit/525.13 (KHTML, like Gecko)' ' Chrome/0.A.B.C Safari/525.13', 'Referer': 'http://python.org' } request = Request(url, headers=req_headers) response = build_opener().open(request) code = response.code headers = response.headers contents = response.read() return code, headers, contents
[docs]def download(url, usecache=True, cached=None, cachedir='cache~/', cachedonly=False, **opts): """ Download (or cache) ``url`` to file. On success: return file name of stored contents. Upon failure: return None. Will retry ``tries`` times with ``pause`` seconds between each attempt to download. Download will timeout after ``timeout`` seconds. If ``cachedonly`` is enabled, this function will not download anything. It will simply return the cached filename if it exists. """ if not cached: if cachedir: mkdir(cachedir) cached = os.path.join(cachedir, secure_filename(url)) else: assert not usecache, 'must specify cachedir' # only return something for cached files if cachedonly and not os.path.exists(cached): return if usecache and os.path.exists(cached): return cached # use wget for ftp files if url.startswith('ftp'): return wget(url, cached) if url.startswith('http'): return robust_download(url, cached, **opts)
[docs]def wget(url, filename): """ Wraps call to wget to download ``url`` to ``filename``. """ retcode = os.system("wget '%s' -O '%s'" % (url, filename)) if retcode != 0: if os.path.exists(filename): os.remove(filename) return return filename
[docs]def robust_download(url, filename, tries=3, pause=0.1, timeout=30, verbose=True): """ Attempts ``tries`` times to download and write contents ``url`` to ``filename``. Will timeout after ``timeout`` seconds. returns ``None`` upon failure and ``filename`` on success. """ if verbose: print('trying to download %s to file://%s' % (url, filename)) @retry(tries=tries, pause=pause) @timelimit(timeout) def _download(): with open(filename, 'wb') as f: [code, _, contents] = urlread(url) assert code == 200 f.write(contents) return filename result = None with ignore_error(): result =_download() # delete file on failure if not result: if verbose: print(' failed to download') if os.path.exists(filename): if verbose: print(' deleting file') os.remove(filename) return else: if verbose: print(' download successful') return filename
if __name__ == '__main__': download('http://timvieira.github.com', cachedir='/tmp/')