Trying to split a file download buffer into separate streams

I am trying to download a file buffer across 5 threads, but the result comes out corrupted.

from numpy import arange import requests from threading import Thread import urllib2 url = 'http://pymotw.com/2/urllib/index.html' sizeInBytes = r = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers['content-length'] splitBy = 5 splits = arange(splitBy + 1) * (float(sizeInBytes)/splitBy) dataLst = [] def bufferSplit(url, idx, splits): req = urllib2.Request(url, headers={'Range': 'bytes=%d-%d' % (splits[idx], splits[idx+1])}) print {'bytes=%d-%d' % (splits[idx], splits[idx+1])} dataLst.append(urllib2.urlopen(req).read()) for idx in range(splitBy): dlth = Thread(target=bufferSplit, args=(url, idx, splits)) dlth.start() print dataLst with open('page.html', 'w') as fh: fh.write(''.join(dataLst)) 

Update: I kept working on this and made some progress; however, when I download a jpg, it comes out damaged.

 from numpy import arange import os import requests import threading import urllib2 # url ='http://s1.fans.ge/mp3/201109/08/John_Legend_So_High_Remix(fans_ge).mp3' url = "http://www.nasa.gov/images/content/607800main_kepler1200_1600-1200.jpg" # url = 'http://pymotw.com/2/urllib/index.html' sizeInBytes = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers.get('content-length', None) splitBy = 5 dataLst = [] class ThreadedFetch(threading.Thread): """ docstring for ThreadedFetch """ def __init__(self, url, fileName, splitBy=5): super(ThreadedFetch, self).__init__() self.__url = url self.__spl = splitBy self.__dataLst = [] self.__fileName = fileName def run(self): if not sizeInBytes: print "Size cannot be determined." return splits = arange(self.__spl + 1) * (float(sizeInBytes)/self.__spl) for idx in range(self.__spl): req = urllib2.Request(self.__url, headers={'Range': 'bytes=%d-%d' % (splits[idx], splits[idx+1])}) self.__dataLst.append(urllib2.urlopen(req).read()) def getFileData(self): return ''.join(self.__dataLst) fileName = url.split('/')[-1] dl = ThreadedFetch(url, fileName) dl.start() dl.join() content = dl.getFileData() if content: with open(fileName, 'w') as fh: fh.write(content) print "Finished Writing file %s" % fileName 

The following shows how the image is after loading.

corrupted image

+1
source share
2 answers

Here is another version of the project. Differences:

  • the download code is a small function

  • each thread downloads one chunk and then stores it in a global, thread-safe dictionary

  • all threads are started before any is join()ed — so they all run at the same time

  • when done, the data is collected in the correct order and then written to disk

  • extra print to check everything is correct

  • the size of the output file is calculated, for additional comparison

Source

 import os, requests import threading import urllib2 import time URL = "http://www.nasa.gov/images/content/607800main_kepler1200_1600-1200.jpg" def buildRange(value, numsplits): lst = [] for i in range(numsplits): if i == 0: lst.append('%s-%s' % (i, int(round(1 + i * value/(numsplits*1.0) + value/(numsplits*1.0)-1, 0)))) else: lst.append('%s-%s' % (int(round(1 + i * value/(numsplits*1.0),0)), int(round(1 + i * value/(numsplits*1.0) + value/(numsplits*1.0)-1, 0)))) return lst def main(url=None, splitBy=3): start_time = time.time() if not url: print "Please Enter some url to begin download." return fileName = url.split('/')[-1] sizeInBytes = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers.get('content-length', None) print "%s bytes to download." % sizeInBytes if not sizeInBytes: print "Size cannot be determined." return dataDict = {} # split total num bytes into ranges ranges = buildRange(int(sizeInBytes), splitBy) def downloadChunk(idx, irange): req = urllib2.Request(url) req.headers['Range'] = 'bytes={}'.format(irange) dataDict[idx] = urllib2.urlopen(req).read() # create one downloading thread per chunk downloaders = [ threading.Thread( target=downloadChunk, args=(idx, irange), ) for idx,irange in enumerate(ranges) ] # start threads, let run in parallel, wait for all to finish for th in downloaders: th.start() for th in downloaders: th.join() print 'done: got {} chunks, total {} bytes'.format( len(dataDict), sum( ( len(chunk) for chunk in dataDict.values() ) ) ) print "--- %s seconds ---" % str(time.time() - start_time) if os.path.exists(fileName): os.remove(fileName) # reassemble file in correct order with open(fileName, 'w') as fh: for _idx,chunk in sorted(dataDict.iteritems()): fh.write(chunk) print "Finished Writing file %s" % fileName print 'file size {} bytes'.format(os.path.getsize(fileName)) if __name__ == '__main__': main(URL) 

Output

 102331 bytes to download. done: got 3 chunks, total 102331 bytes --- 0.380599021912 seconds --- Finished Writing file 607800main_kepler1200_1600-1200.jpg file size 102331 bytes 
+3
source

This is how I finally got it to work; if anyone has suggestions for possible improvements, I would be very glad to hear them.

 import os import requests import threading import urllib2 import time url = "http://www.nasa.gov/images/content/607800main_kepler1200_1600-1200.jpg" def buildRange(value, numsplits): lst = [] for i in range(numsplits): if i == 0: lst.append('%s-%s' % (i, int(round(1 + i * value/(numsplits*1.0) + value/(numsplits*1.0)-1, 0)))) else: lst.append('%s-%s' % (int(round(1 + i * value/(numsplits*1.0),0)), int(round(1 + i * value/(numsplits*1.0) + value/(numsplits*1.0)-1, 0)))) return lst class SplitBufferThreads(threading.Thread): """ Splits the buffer to ny number of threads thereby, concurrently downloading through ny number of threads. """ def __init__(self, url, byteRange): super(SplitBufferThreads, self).__init__() self.__url = url self.__byteRange = byteRange self.req = None def run(self): self.req = urllib2.Request(self.__url, headers={'Range': 'bytes=%s' % self.__byteRange}) def getFileData(self): return urllib2.urlopen(self.req).read() def main(url=None, splitBy=3): start_time = time.time() if not url: print "Please Enter some url to begin download." return fileName = url.split('/')[-1] sizeInBytes = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers.get('content-length', None) print "%s bytes to download." % sizeInBytes if not sizeInBytes: print "Size cannot be determined." return dataLst = [] for idx in range(splitBy): byteRange = buildRange(int(sizeInBytes), splitBy)[idx] bufTh = SplitBufferThreads(url, byteRange) bufTh.start() bufTh.join() dataLst.append(bufTh.getFileData()) content = ''.join(dataLst) if dataLst: if os.path.exists(fileName): os.remove(fileName) print "--- %s seconds ---" % str(time.time() - start_time) with open(fileName, 'w') as fh: fh.write(content) print "Finished Writing file %s" % fileName if __name__ == '__main__': main(url) 

This is the first bare-bones version I got working; I found that if I set the bufTh thread's daemon flag to False, the process takes longer to complete.

+2
source

All Articles