Get progress back from a shutil file copy running in a thread

I have an application that copies a file from src to dst:

    import shutil
    from threading import Thread

    # Keep the Thread object itself: start() returns None.
    t = Thread(target=shutil.copy, args=[src, dst])
    t.start()

I want the application to query the progress of the copy every 5 seconds without blocking the application itself. Is this possible?

My intention is to show this progress in a QtGui.QLabel to give the user feedback on the file copy.

Can this be achieved when using a threaded shutil file copy?

+8
python multithreading
4 answers

shutil.copy() does not offer any options for tracking progress, no. At most, you could monitor the size of the destination file (using os.* functions on the target filename).
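A minimal sketch of that size-monitoring idea follows. The helper name copy_fraction is made up for illustration, and it assumes dst is the final file path rather than a directory. The reported size may lag behind the real copy because of buffering, so treat the result as an estimate.

```python
import os


def copy_fraction(src, dst):
    """Return roughly how much of src has reached dst, as a float in [0.0, 1.0]."""
    total = os.path.getsize(src)
    try:
        done = os.path.getsize(dst)
    except OSError:
        # The copy has not created the destination file yet.
        return 0.0
    if total == 0:
        # An empty source is complete as soon as the destination exists.
        return 1.0
    # Clamp in case the destination is momentarily larger than expected.
    return min(done / total, 1.0)
```

The main thread could call this from a timer every few seconds while shutil.copy() runs in the background thread.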

An alternative would be to implement your own copy function. The implementation is really quite simple; shutil.copy() is basically a shutil.copyfile() plus a shutil.copymode() call; shutil.copyfile() in turn delegates the real work to shutil.copyfileobj() (see the Python source code).

Implementing your own shutil.copyfileobj() to include progress should be trivial; add support for a callback function that informs your program each time another block has been copied:

    def copyfileobj(fsrc, fdst, callback, length=16*1024):
        copied = 0
        while True:
            buf = fsrc.read(length)
            if not buf:
                break
            fdst.write(buf)
            copied += len(buf)
            callback(copied)

and compare the copied size with the file size.
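As a rough illustration of that comparison, the sketch below repeats the copy loop and wraps it in a hypothetical helper, copy_with_percent, that turns the byte count into a percentage. The helper name and the on_percent parameter are assumptions made for this example.

```python
import os


def copyfileobj(fsrc, fdst, callback, length=16 * 1024):
    """Copy in blocks and report the cumulative byte count after each block."""
    copied = 0
    while True:
        buf = fsrc.read(length)
        if not buf:
            break
        fdst.write(buf)
        copied += len(buf)
        callback(copied)


def copy_with_percent(src, dst, on_percent):
    """Copy src to dst, calling on_percent(float) after every block."""
    total = os.path.getsize(src)

    def report(copied):
        # The guard only matters if the file grows from empty during the copy;
        # an empty file never triggers the callback at all.
        on_percent(100.0 * copied / total if total else 100.0)

    with open(src, "rb") as fsrc, open(dst, "wb") as fdst:
        copyfileobj(fsrc, fdst, report)
```

Note that an empty source file produces no callbacks, so the caller may want to set the display to 100% itself once the function returns.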

+12

No, it can't be done this way, because shutil.copy has no means of reporting progress.

But you can write your own copy function (or even fork the code from shutil; notice that it is one of the modules that includes a link to the source at the top of its documentation, which means it is meant to be as useful for sample code as for using as-is). Your function can, for example, take a progress callback function as an extra argument and call it after each buffer (or every N buffers, or every N bytes, or every N seconds). Something like:

    def copy(src, dst, progress):
        # ...
        for something:
            progress(bytes_so_far, bytes_total)
        # ...
        progress(bytes_total, bytes_total)
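To make the "every N seconds" variant concrete, here is a minimal sketch that works on already opened file objects. The function name copy_throttled and the min_interval parameter are invented for this example; the caller is assumed to know the total size in advance.

```python
import time


def copy_throttled(fsrc, fdst, total, progress, min_interval=0.5, length=16 * 1024):
    """Copy fsrc to fdst, calling progress(copied, total) at most once per min_interval seconds."""
    copied = 0
    last_report = time.monotonic()
    while True:
        buf = fsrc.read(length)
        if not buf:
            break
        fdst.write(buf)
        copied += len(buf)
        now = time.monotonic()
        if now - last_report >= min_interval:
            progress(copied, total)
            last_report = now
    # Always send one final report so the caller sees the finished state.
    progress(copied, total)
```

Throttling keeps a fast copy from flooding the GUI with updates, while the final call ensures the display does not get stuck just short of the end.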

Now, this callback will still be called in the background thread, not in the main thread. With most GUI frameworks, that means it cannot directly touch any GUI widgets. But most GUI frameworks have a way to post a message to the main thread's event loop from a background thread, so just make the callback do that. With Qt, you do this with signals and slots, exactly as you would within the main thread; there are many great tutorials out there if you don't know how.
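The hand-off itself can be sketched without any GUI toolkit. The example below is a toolkit-neutral stand-in, not the Qt mechanism: the background thread pushes updates onto a queue.Queue, and the main thread drains it whenever it likes, for instance from a timer. The names copy_worker and drain_latest are hypothetical. In a real Qt application, emitting a signal from the worker would take the place of the queue.

```python
import queue


def copy_worker(fsrc, fdst, total, updates, length=16 * 1024):
    """Runs in the background thread; it never touches GUI objects."""
    copied = 0
    while True:
        buf = fsrc.read(length)
        if not buf:
            break
        fdst.write(buf)
        copied += len(buf)
        # Hand the numbers to the main thread instead of updating a widget here.
        updates.put((copied, total))


def drain_latest(updates):
    """Called from the main thread; returns the newest (copied, total) pair, or None."""
    latest = None
    while True:
        try:
            latest = updates.get_nowait()
        except queue.Empty:
            return latest
```

Because drain_latest never blocks, calling it from a periodic timer in the main thread keeps the interface responsive; the label is only updated when it returns something other than None.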

Alternatively, you can do it the way you suggested: have the main thread signal the background thread (for example, by posting to a queue.Queue or triggering an Event or Condition) and have the copy function check for that signal every time through the loop and respond. But that seems both more complicated and less responsive.
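For comparison, a minimal sketch of that request-and-respond variant might look like the following. It assumes a threading.Event for the request and a queue.Queue for the reply; the function name copy_answering_requests is invented for this example.

```python
import queue
import threading


def copy_answering_requests(fsrc, fdst, total, request, replies, length=16 * 1024):
    """Copy fsrc to fdst; reply with (copied, total) only when the main thread asks."""
    copied = 0
    while True:
        buf = fsrc.read(length)
        if not buf:
            break
        fdst.write(buf)
        copied += len(buf)
        if request.is_set():
            # Acknowledge the request, then answer it on the reply queue.
            request.clear()
            replies.put((copied, total))
```

The main thread would call request.set() every 5 seconds and read replies with get_nowait() or a short timeout. A request made after the last block has been written is never answered, which is one reason this approach is more awkward than a plain callback.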

One more thing: Qt has its own threading library, and you may want to use it instead of Python's native one, because you can attach a slot directly to a QThread object and make that your callback. I'm not sure, but Qt might even have its own file-copy-with-progress methods somewhere; they try to wrap up anything that might differ between platforms and is vaguely related to GUIs.

0

In addition to Martijn Pieters' excellent answer: if (like me, I'm an idiot) you need to figure out how to pass the actual callback into the copyfileobj() function, you can do it like this:

    def myscopefunction():
        ### Inside wherever you want to call the copyfileobj() function, you can
        ### make a nested function like so:
        def progress(bytescopied):
            updateui(bytescopied)  # update your progress bar or whatever

        # and then call it like this
        copyfileobj(source, destination, progress)
        ...
0

I combined Martijn Pieters' answer with some progress bar code from this answer, with modifications to make it work in PyCharm from this answer, which gives me the following. The copy_with_progress function was my goal.

    import os
    import shutil
    import stat


    def progress_percentage(perc, width=None):
        # This will only work for Python 3.3+ due to use of
        # os.get_terminal_size, the print function, etc.

        FULL_BLOCK = '█'
        # this is a gradient of incompleteness
        INCOMPLETE_BLOCK_GRAD = ['░', '▒', '▓']

        assert(isinstance(perc, float))
        assert(0. <= perc <= 100.)
        # if width unset use full terminal
        if width is None:
            width = os.get_terminal_size().columns
        # progress bar is block_widget separator perc_widget : ####### 30%
        max_perc_widget = '[100.00%]'  # 100% is max
        separator = ' '
        blocks_widget_width = width - len(separator) - len(max_perc_widget)
        assert(blocks_widget_width >= 10)  # not very meaningful if not
        perc_per_block = 100.0/blocks_widget_width
        # epsilon is the sensitivity of rendering a gradient block
        epsilon = 1e-6
        # number of blocks that should be represented as complete
        full_blocks = int((perc + epsilon)/perc_per_block)
        # the rest are "incomplete"
        empty_blocks = blocks_widget_width - full_blocks

        # build blocks widget
        blocks_widget = ([FULL_BLOCK] * full_blocks)
        blocks_widget.extend([INCOMPLETE_BLOCK_GRAD[0]] * empty_blocks)
        # marginal case - remainder due to how granular our blocks are
        remainder = perc - full_blocks*perc_per_block
        # epsilon needed for rounding errors (check would be != 0.)
        # based on remainder modify first empty block shading
        if remainder > epsilon:
            grad_index = int((len(INCOMPLETE_BLOCK_GRAD) * remainder)/perc_per_block)
            blocks_widget[full_blocks] = INCOMPLETE_BLOCK_GRAD[grad_index]

        # build perc widget
        str_perc = '%.2f' % perc
        # -3 because the brackets and the percentage sign are not included
        perc_widget = '[%s%%]' % str_perc.ljust(len(max_perc_widget) - 3)

        # form progressbar and return it as a string
        progress_bar = '%s%s%s' % (''.join(blocks_widget), separator, perc_widget)
        return progress_bar


    def copy_progress(copied, total):
        print('\r' + progress_percentage(100*copied/total, width=30), end='')


    def copyfile(src, dst, *, follow_symlinks=True):
        """Copy data from src to dst.

        If follow_symlinks is not set and src is a symbolic link, a new
        symlink will be created instead of copying the file it points to.

        """
        # Note: _samefile is a private helper of shutil.
        if shutil._samefile(src, dst):
            raise shutil.SameFileError("{!r} and {!r} are the same file".format(src, dst))

        for fn in [src, dst]:
            try:
                st = os.stat(fn)
            except OSError:
                # File most likely does not exist
                pass
            else:
                # XXX What about other special files? (sockets, devices...)
                if stat.S_ISFIFO(st.st_mode):
                    raise shutil.SpecialFileError("`%s` is a named pipe" % fn)

        if not follow_symlinks and os.path.islink(src):
            os.symlink(os.readlink(src), dst)
        else:
            size = os.stat(src).st_size
            with open(src, 'rb') as fsrc:
                with open(dst, 'wb') as fdst:
                    copyfileobj(fsrc, fdst, callback=copy_progress, total=size)
        return dst


    def copyfileobj(fsrc, fdst, callback, total, length=16*1024):
        copied = 0
        while True:
            buf = fsrc.read(length)
            if not buf:
                break
            fdst.write(buf)
            copied += len(buf)
            callback(copied, total=total)


    def copy_with_progress(src, dst, *, follow_symlinks=True):
        if os.path.isdir(dst):
            dst = os.path.join(dst, os.path.basename(src))
        copyfile(src, dst, follow_symlinks=follow_symlinks)
        shutil.copymode(src, dst)
        return dst
0
