I also want to report a funny bug in AFProxy:
thread.dameon = True
According to the official documentation, it should be:
thread.daemon = True
But if I change it to thread.daemon = True, AFProxy simply stops working, so I decided to disable the line by commenting it out. Do we really need the line "thread.daemon = True"?
(May. 28, 2015 09:53 AM)whenever Wrote: I am not sure if I get your point, but self.headers is available as req.headers in URLFilter.py, and you can operate it freely as you want.
Thank you, but the headers variable I want to read and change is the "headers" in headers=headers from version 0.4. I want to change it from, for example, "URLFilter.py"; I can already change self.headers through req.headers.
(May. 28, 2015 09:53 AM)whenever Wrote: I think you can create the file in advance, and in each thread write the data to specified offset via f.seek(offset, from_what). Also you need to take care of Semaphore acquire() and release(). They are all documented in the manual.
Thank you. Here is what I have so far; I hope it contributes a little if you want to add new features to AFProxy. I have also included a way to do bandwidth throttling (speed limiting):
Split a file into parts, download the parts in parallel, and join them:
Code:
import threading
import urllib.request, urllib.error, urllib.parse
import sys
# Upper bound on the number of download threads; main() caps its `thread`
# argument at this value.
max_thread = 10
# Re-entrant lock shared by all Downloader threads. It is held around console
# output and around the seek/write pair on the shared output file object.
lock = threading.RLock()
class Downloader(threading.Thread):
    """Worker thread that downloads one byte range of a URL into a shared file.

    The worker asks the server for ``bytes=start_size-end_size`` through the
    HTTP ``Range`` header and writes the received data into ``fobj`` starting
    at offset ``start_size``. The module-level ``lock`` is held around every
    seek/write pair (and around console output) so that workers sharing the
    same file object never interleave their writes.

    Args:
        url: Address of the resource to download.
        start_size: Offset of the first byte to fetch (inclusive).
        end_size: Offset of the last byte to fetch (inclusive).
        fobj: Writable, seekable binary file object shared by all workers.
        buffer: Maximum number of bytes read from the network per chunk.
    """

    def __init__(self, url, start_size, end_size, fobj, buffer):
        super().__init__()
        self.url = url
        self.buffer = buffer
        self.start_size = start_size
        self.end_size = end_size
        self.fobj = fobj

    def run(self):
        """Thread entry point: announce the worker, then download its range."""
        with lock:
            print(('starting: %s' % self.name))
        self._download()

    def _download(self):
        """Fetch the assigned byte range and write it at the matching offset.

        Raises:
            OSError: If a range starting past offset 0 was requested but the
                server did not answer with ``206 Partial Content``; writing
                that body at ``start_size`` would corrupt the output file.
        """
        req = urllib.request.Request(self.url)
        # Ask the server for this worker's slice only.
        req.add_header('Range', 'bytes=%s-%s' % (self.start_size, self.end_size))

        # Position in the output file where the next chunk belongs.
        offset = self.start_size
        # Never write more than the assigned slice, even if the server sends
        # more than was asked for.
        remaining = self.end_size - self.start_size + 1

        # The context manager closes the connection even when a read fails.
        with urllib.request.urlopen(req) as response:
            if self.start_size and getattr(response, 'status', None) != 206:
                raise OSError(
                    'server ignored Range request for %s (status %s)'
                    % (self.url, getattr(response, 'status', None)))

            while remaining > 0:
                block = response.read(min(self.buffer, remaining))
                if not block:
                    # The server has no more data for this range.
                    break
                # Seek and write must happen atomically with respect to the
                # other workers because they all share one file object.
                with lock:
                    sys.stdout.write('%s saveing block...' % self.name)
                    self.fobj.seek(offset)
                    self.fobj.write(block)
                    sys.stdout.write('done.\n')
                offset += len(block)
                remaining -= len(block)

        with lock:
            print(('%s done.' % self.name))
def _byte_ranges(size, parts):
    """Split ``size`` bytes into ``parts`` contiguous inclusive (start, end) ranges."""
    chunk, extra = divmod(size, parts)
    ranges = []
    for index in range(parts):
        first = index * chunk
        last = first + chunk - 1
        if index == parts - 1:
            # The final range absorbs the remainder and ends at the last
            # valid byte offset (size - 1), since HTTP ranges are inclusive.
            last += extra
        ranges.append((first, last))
    return ranges


def main(url, thread=3, save_file='', buffer=1024):
    """Download ``url`` into ``save_file`` using several range-request threads.

    The resource size is taken from the ``Content-Length`` header, split into
    one contiguous byte range per worker, and each range is fetched by a
    ``Downloader`` thread that writes into the shared output file.

    Args:
        url: Address of the resource to download.
        thread: Requested number of worker threads. The value is clamped to
            the range 1..``max_thread`` and never exceeds the size in bytes.
        save_file: Path of the output file (opened in binary write mode).
        buffer: Maximum number of bytes each worker reads per chunk.

    Raises:
        TypeError: If the server sends no ``Content-Length`` header.
    """
    # Only the headers are needed here; close the probe connection at once.
    with urllib.request.urlopen(url) as probe:
        size = int(probe.getheader('Content-Length'))
    print(size)

    # At least one worker, at most max_thread, and never more workers than
    # bytes (which would produce empty or inverted ranges).
    thread = max(1, min(thread, max_thread, size))

    # The file stays open until every worker has finished, and is closed even
    # if starting or joining a worker raises.
    with open(save_file, 'wb') as fobj:
        ranges = _byte_ranges(size, thread) if size > 0 else []
        workers = [
            Downloader(url, first, last, fobj, buffer)
            for first, last in ranges
        ]
        for worker in workers:
            worker.start()
        # Wait for ALL workers; joining only the last one would close the
        # file while the others are still writing to it.
        for worker in workers:
            worker.join()

    print('Download completed!')
# Demo entry point: when run as a script, download a sample user script to
# 'a.user.js', requesting 4 worker threads and 16 KiB read chunks.
if __name__ == '__main__':
    url = 'http://userscripts-mirror.org/scripts/source/57662.user.js'
    main(url=url, thread=4, save_file='a.user.js', buffer=16*1024)
Limit download speed:
Code:
"""Rate limiters with shared token bucket."""
import os
import sys
import threading
import time
import urllib.request, urllib.parse, urllib.error
import urllib.parse
class TokenBucket(object):
    """A thread-safe implementation of the token bucket algorithm.

    source: http://code.activestate.com/recipes/511490/

    The bucket starts full, refills continuously at ``fill_rate`` tokens per
    second, and never holds more than ``capacity`` tokens.

    Example (values are approximate because refilling depends on the clock):
    with ``bucket = TokenBucket(80, 0.5)``, ``bucket.consume(10)`` returns 0
    and leaves about 70 tokens; a following ``bucket.consume(90)`` takes
    nothing and returns roughly 40.0, the seconds needed to refill the
    missing 20 tokens.
    """

    def __init__(self, tokens, fill_rate):
        """tokens is the total tokens in the bucket. fill_rate is the
        rate in tokens/second that the bucket will be refilled."""
        self.capacity = float(tokens)
        self._tokens = float(tokens)
        self.fill_rate = float(fill_rate)
        self.timestamp = time.time()
        # Re-entrant because consume() reads the `tokens` property, which
        # takes the same lock again.
        self.lock = threading.RLock()

    def consume(self, tokens):
        """Try to take ``tokens`` tokens from the bucket.

        Returns 0 if there were sufficient tokens (and removes exactly the
        requested amount), otherwise the expected time in seconds until
        enough tokens become available (and removes nothing).
        """
        with self.lock:
            available = self.tokens
            if tokens <= available:
                self._tokens -= tokens
                return 0
            return (tokens - available) / self.fill_rate

    @property
    def tokens(self):
        """Current number of tokens, after crediting the refill since the last read."""
        with self.lock:
            now = time.time()
            if self._tokens < self.capacity:
                # Ignore a wall clock that stepped backwards.
                elapsed = max(0.0, now - self.timestamp)
                self._tokens = min(
                    self.capacity, self._tokens + self.fill_rate * elapsed)
            # Always advance the timestamp, so time spent with a full bucket
            # is not credited as refill after the next withdrawal.
            self.timestamp = now
            return self._tokens
class RateLimit(object):
    """Rate limit a url fetch.

    source: http://mail.python.org/pipermail/python-list/2008-January/472859.html
    (but mostly rewritten)

    Instances are callables taking ``(block_count, block_size, total_size)``;
    elsewhere in this file one is passed as the progress hook of
    ``urllib.request.urlretrieve``. Each call blocks until the shared bucket
    grants tokens for one block, then prints a progress line.

    Args:
        bucket: Object with a ``consume(amount)`` method returning 0 on
            success or the number of seconds to wait otherwise. If it also
            exposes ``capacity``, requests are capped at that value.
        filename: Label shown at the start of each progress line.
    """

    def __init__(self, bucket, filename):
        self.bucket = bucket
        self.last_update = 0
        self.last_downloaded_kb = 0
        self.filename = filename
        self.avg_rate = None

    def __call__(self, block_count, block_size, total_size):
        """Throttle the transfer and report progress.

        Args:
            block_count: Number of blocks transferred so far.
            block_size: Size of one block in bytes.
            total_size: Total size in bytes; values <= 0 are treated as
                "unknown" and reported as 0.0% of 0.0 KiB.
        """
        total_known = total_size > 0
        total_kb = total_size / 1024. if total_known else 0.

        downloaded_kb = (block_count * block_size) / 1024.
        if total_known:
            # The final block is usually partial, so count * size overshoots.
            downloaded_kb = min(downloaded_kb, total_kb)
        just_downloaded = downloaded_kb - self.last_downloaded_kb
        self.last_downloaded_kb = downloaded_kb

        predicted_size = block_size / 1024.
        # A request larger than the bucket can ever hold would never be
        # granted, and the loop below would sleep forever.
        capacity = getattr(self.bucket, 'capacity', None)
        if capacity is not None:
            predicted_size = min(predicted_size, capacity)

        wait_time = self.bucket.consume(predicted_size)
        while wait_time > 0:
            time.sleep(wait_time)
            wait_time = self.bucket.consume(predicted_size)

        now = time.time()
        # The first call only records the start time; there is no interval
        # to compute a rate from yet.
        if self.last_update != 0:
            delta = now - self.last_update
            if delta > 0:
                rate = just_downloaded / delta
                if self.avg_rate is not None:
                    # Exponential moving average to smooth the display.
                    rate = 0.9 * self.avg_rate + 0.1 * rate
                self.avg_rate = rate
            else:
                rate = self.avg_rate or 0.
            percent = 100. * downloaded_kb / total_kb if total_known else 0.
            print(("%20s: %4.1f%%, %5.1f KiB/s, %.1f/%.1f KiB" % (
                self.filename, percent,
                rate, downloaded_kb, total_kb,
            )))
        self.last_update = now
def main():
    """Download every configured URL concurrently under one shared rate limit."""
    rate_limit = float(20)
    urls = {"http://userscripts-mirror.org/scripts/source/57662.user.js"}

    # One bucket is shared by all downloads, so the limit applies to their sum.
    shared_bucket = TokenBucket(10 * rate_limit, rate_limit)
    print(("rate limit = %.1f" % (rate_limit,)))

    workers = []
    for address in urls:
        # Use the last component of the URL path as the local file name.
        url_path = urllib.parse.urlparse(address, 'http')[2]
        target_name = os.path.basename(url_path)
        print(('Downloading "%s" to "%s"...' % (address, target_name)))

        progress_hook = RateLimit(shared_bucket, target_name)
        worker = threading.Thread(
            target=urllib.request.urlretrieve,
            args=(address, target_name, progress_hook),
        )
        worker.start()
        workers.append(worker)

    # Block until every download thread has finished.
    for worker in workers:
        worker.join()

    print('All downloads finished')
# Run the rate-limited download demo only when executed as a script.
if __name__ == "__main__":
    main()