Another Filtering Proxy
May. 29, 2015, 04:07 AM (This post was last modified: May. 29, 2015 04:13 AM by cattleyavns.)
Post: #19
RE: Another Filtering Proxy
I also want to report a funny bug in AFProxy:
thread.dameon = True

According to the official documentation this should be:
thread.daemon = True

But if I actually fix it to thread.daemon = True, AFProxy simply stops working, so I removed it by commenting it out. Do we really need this "thread.daemon = True" line at all?
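(A side note on what the flag does, with a minimal standalone sketch that is not AFProxy's code: daemon = True tells the interpreter not to wait for that thread at exit, so if the main thread finishes first, daemon workers are killed abruptly. That could explain the proxy "stopping" once the spelling is fixed and the flag takes effect, unless something joins or blocks the main thread.)

```python
import threading
import time

finished = []

def worker():
    # stands in for a long-running proxy worker loop
    time.sleep(0.1)
    finished.append(True)

t = threading.Thread(target=worker)
t.daemon = True   # the interpreter will NOT wait for daemon threads on exit
t.start()
# Without this join(), the main thread could exit first and the daemon
# worker would be killed mid-sleep, never reaching the append().
t.join()
print(finished)   # [True]
```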

(May. 28, 2015 09:53 AM)whenever Wrote:  I am not sure if I get your point, but self.headers is available as req.headers in URLFilter.py, and you can operate it freely as you want.

Thank you, but the headers variable I want to read and change is the "headers" in headers=headers from version 0.4. I would like to modify it from, for example, "URLFilter.py"; I can already change self.headers via req.headers.

(May. 28, 2015 09:53 AM)whenever Wrote:  I think you can create the file in advance, and in each thread write the data to specified offset via f.seek(offset, from_what). Also you need to take care of Semaphore acquire() and release(). They are all documented in the manual.

Thank you, here is what I have so far. I hope it contributes a little if you want to add this feature to AFProxy; it also shows a way to do bandwidth throttling (a speed limit):

Split the file into parts, download them in parallel, and join them:
Code:
import threading
import urllib.request, urllib.error, urllib.parse
import sys

max_thread = 10
# Initialize lock
lock = threading.RLock()

class Downloader(threading.Thread):
    def __init__(self, url, start_size, end_size, fobj, buffer):
        self.url = url
        self.buffer = buffer
        self.start_size = start_size
        self.end_size = end_size
        self.fobj = fobj
        threading.Thread.__init__(self)

    def run(self):
        """
vest only
        """
        with lock:
            print('starting: %s' % self.getName())
        self._download()

    def _download(self):
        """Download this thread's byte range and write it at its offset."""
        req = urllib.request.Request(self.url)
# ask the server for only this thread's slice via an HTTP Range header
        req.add_header('Range', 'bytes=%s-%s' % (self.start_size, self.end_size))
        f = urllib.request.urlopen(req)
# each thread starts writing at its own offset in the shared file
        offset = self.start_size
        while 1:
            block = f.read(self.buffer)
# exit the current thread after data acquisition is completed
            if not block:
                with lock:
                    print('%s done.' % self.getName())
                break
# lock while seeking and writing, since all threads share one file object
# (the with-statement is equivalent to lock.acquire()/lock.release())
            with lock:
                sys.stdout.write('%s saving block...' % self.getName())
# Set the file object offset address
                self.fobj.seek(offset)
# write the block at this thread's position
                self.fobj.write(block)
                offset = offset + len(block)
                sys.stdout.write('done.\n')


def main(url, thread=3, save_file='', buffer=1024):
# The maximum number of threads can not exceed max_thread
    thread = thread if thread <= max_thread else max_thread
# get file size
    req = urllib.request.urlopen(url)
    size = int(req.getheader('Content-Length'))
    print('File size: %d bytes' % size)
# open the output file
    fobj = open(save_file, 'wb')
# compute each thread's share of the Range from the thread count
    avg_size, pad_size = divmod(size, thread)
    plist = []
    for i in range(thread):
        start_size = i*avg_size
        end_size = start_size + avg_size - 1
        if i == thread - 1:
# the last thread also takes the remainder; Range is inclusive, so no extra +1
            end_size = end_size + pad_size
        t = Downloader(url, start_size, end_size, fobj, buffer)
        plist.append(t)

# start moving bricks
    for t in plist:
        t.start()

# wait for all threads to finish (joining inside the start loop above
# would serialize the downloads)
    for t in plist:
        t.join()

# all downloads done; remember to close the file object
    fobj.close()
    print('Download completed!')

if __name__ == '__main__':
    url = 'http://userscripts-mirror.org/scripts/source/57662.user.js'
    main(url=url, thread=4, save_file='a.user.js', buffer=16*1024)
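Since an HTTP Range is inclusive on both ends, the last byte index of the last part must come out to size - 1. A quick way to sanity-check the split arithmetic in isolation (split_ranges is just a hypothetical helper extracting the same divmod logic, not part of the script above):

```python
def split_ranges(size, threads):
    """Split [0, size) into `threads` inclusive byte ranges."""
    avg, pad = divmod(size, threads)
    ranges = []
    for i in range(threads):
        start = i * avg
        end = start + avg - 1
        if i == threads - 1:
            end += pad          # last thread absorbs the remainder
        ranges.append((start, end))
    return ranges

print(split_ranges(10, 4))  # [(0, 1), (2, 3), (4, 5), (6, 9)]
```

The ranges cover every byte exactly once and the last end is size - 1, which is what the Range header needs.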

Limit download speed:
Code:
"""Rate limiters with shared token bucket."""

import os
import threading
import time
import urllib.parse
import urllib.request

class TokenBucket(object):
    """An implementation of the token bucket algorithm.
    source: http://code.activestate.com/recipes/511490/

    >>> bucket = TokenBucket(80, 0.5)
    >>> bucket.consume(10)
    0
    >>> bucket.consume(90) > 0
    True
    """
    def __init__(self, tokens, fill_rate):
        """tokens is the total tokens in the bucket. fill_rate is the
        rate in tokens/second that the bucket will be refilled."""
        self.capacity = float(tokens)
        self._tokens = float(tokens)
        self.fill_rate = float(fill_rate)
        self.timestamp = time.time()
        self.lock = threading.RLock()

    def consume(self, tokens):
        """Consume tokens from the bucket. Returns 0 if there were
        sufficient tokens, otherwise the expected time until enough
        tokens become available."""
        with self.lock:
            expected_time = (tokens - self.tokens) / self.fill_rate
            if expected_time <= 0:
                self._tokens -= tokens
        return max(0, expected_time)

    @property
    def tokens(self):
        with self.lock:
            if self._tokens < self.capacity:
                now = time.time()
                delta = self.fill_rate * (now - self.timestamp)
                self._tokens = min(self.capacity, self._tokens + delta)
                self.timestamp = now
            return self._tokens

class RateLimit(object):
    """Rate limit a url fetch.
    source: http://mail.python.org/pipermail/python-list/2008-January/472859.html
    (but mostly rewritten)
    """
    def __init__(self, bucket, filename):
        self.bucket = bucket
        self.last_update = 0
        self.last_downloaded_kb = 0

        self.filename = filename
        self.avg_rate = None

    def __call__(self, block_count, block_size, total_size):
        total_kb = total_size / 1024.

        downloaded_kb = (block_count * block_size) / 1024.
        just_downloaded = downloaded_kb - self.last_downloaded_kb
        self.last_downloaded_kb = downloaded_kb

        predicted_size = block_size/1024.

        wait_time = self.bucket.consume(predicted_size)
        while wait_time > 0:
            time.sleep(wait_time)
            wait_time = self.bucket.consume(predicted_size)

        now = time.time()
        delta = now - self.last_update
        if self.last_update != 0:
            if delta > 0:
                rate = just_downloaded / delta
                if self.avg_rate is not None:
                    rate = 0.9 * self.avg_rate + 0.1 * rate
                self.avg_rate = rate
            else:
                rate = self.avg_rate or 0.
            print("%20s: %4.1f%%, %5.1f KiB/s, %.1f/%.1f KiB" % (
                    self.filename, 100. * downloaded_kb / total_kb,
                    rate, downloaded_kb, total_kb,
                ))
        self.last_update = now


def main():
    """Fetch the contents of urls"""

    rate_limit = 20.0  # tokens (KiB) per second
    urls = {"http://userscripts-mirror.org/scripts/source/57662.user.js"}
    bucket = TokenBucket(10*rate_limit, rate_limit)

    print("rate limit = %.1f" % (rate_limit,))

    threads = []
    for url in urls:
        path = urllib.parse.urlparse(url,'http')[2]
        filename = os.path.basename(path)
        print('Downloading "%s" to "%s"...' % (url, filename))
        rate_limiter = RateLimit(bucket, filename)
        t = threading.Thread(
            target=urllib.request.urlretrieve,
            args=(url, filename, rate_limiter))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

    print('All downloads finished')

if __name__ == "__main__":
    main()
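If you want to watch the bucket drain and refill without any network traffic, here is a self-contained, single-threaded sketch of the same token-bucket idea (MiniBucket is my own name for it; the lock is dropped for brevity):

```python
import time

class MiniBucket:
    """Single-threaded version of the token bucket above."""
    def __init__(self, tokens, fill_rate):
        self.capacity = float(tokens)
        self._tokens = float(tokens)
        self.fill_rate = float(fill_rate)
        self.timestamp = time.time()

    @property
    def tokens(self):
        # refill proportionally to the time elapsed since the last check
        now = time.time()
        self._tokens = min(self.capacity,
                           self._tokens + self.fill_rate * (now - self.timestamp))
        self.timestamp = now
        return self._tokens

    def consume(self, n):
        """Deduct n tokens and return 0, or return the expected wait time."""
        wait = (n - self.tokens) / self.fill_rate
        if wait <= 0:
            self._tokens -= n
            return 0
        return wait

bucket = MiniBucket(80, 0.5)
print(bucket.consume(10))      # enough tokens available: prints 0
print(bucket.consume(90) > 0)  # exceeds what is left: prints True
```

The caller sleeps for whatever consume() returns, which is exactly what the RateLimit reporthook does with the shared bucket.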