Ticket #14: zfec-1.4.22-streaming-new.patch

File zfec-1.4.22-streaming-new.patch, 7.7 KB (added by davidsarah, 10 years ago)

Make streaming from stdin / to stdout work - patch against 1.4.22

  • zfec-1.4.22/zfec/cmdline_zfec.py

    diff -Naur orig/zfec-1.4.22/zfec/cmdline_zfec.py streaming/zfec-1.4.22/zfec/cmdline_zfec.py
    old new  
    5757        if args.requiredshares == args.totalshares:
    5858            print "warning: silly parameters: requiredshares == totalshares, which means that all shares will be required in order to reconstruct the file.  You could use \"split\" for the same effect.  But proceeding to do it anyway..."
    5959
    60     args.inputfile.seek(0, 2)
    61     fsize = args.inputfile.tell()
    62     args.inputfile.seek(0, 0)
     60    if args.inputfile.name == '<stdin>':
     61      fsize = None
     62    else:
     63      args.inputfile.seek(0, 2)
     64      fsize = args.inputfile.tell()
     65      args.inputfile.seek(0, 0)
    6366    return filefec.encode_to_files(args.inputfile, fsize, args.output_dir, args.prefix, args.requiredshares, args.totalshares, args.suffix, args.force, args.verbose)
    6467
    6568# zfec -- fast forward error correction library with Python interface
  • zfec-1.4.22/zfec/cmdline_zunfec.py

    diff -Naur orig/zfec-1.4.22/zfec/cmdline_zunfec.py streaming/zfec-1.4.22/zfec/cmdline_zunfec.py
    old new  
    3030        print "At least two sharefiles are required."
    3131        return 1
    3232
    33     if args.force:
     33    if args.outputfile == '-':
     34        outf = sys.stdout
     35    elif args.force:
    3436        outf = open(args.outputfile, 'wb')
    3537    else:
    3638        try:
     
    4951    for fn in args.sharefiles:
    5052        sharefs.append(open(fn, 'rb'))
    5153    try:
    52         ret = filefec.decode_from_files(outf, sharefs, args.verbose)
     54        ret = filefec.decode_from_files(outf, sharefs, args.verbose, sys.stdout==outf)
    5355    except filefec.InsufficientShareFilesError, e:
    5456        print str(e)
    5557        return 3
  • zfec-1.4.22/zfec/filefec.py

    diff -Naur orig/zfec-1.4.22/zfec/filefec.py streaming/zfec-1.4.22/zfec/filefec.py
    old new  
    22from pyutil import fileutil
    33from pyutil.mathutil import pad_size, log_ceil
    44
    5 import array, os, struct
     5import array, os, struct, sys
    66
    77CHUNKSIZE = 4096
    88
     
    164164    Encode inf, writing the shares to specially named, newly created files.
    165165
    166166    @param fsize: calling read() on inf must yield fsize bytes of data and
    167         then raise an EOFError
     167        then raise an EOFError.  If None, the file is being read as a stream
     168        and the size is not known in advance, so the header must be updated
     169        after the reading is complete.
    168170    @param dirname: the name of the directory into which the sharefiles will
    169171        be written
    170172    """
    171173    mlen = len(str(m))
    172174    format = FORMAT_FORMAT % (mlen, mlen,)
    173175
    174     padbytes = pad_size(fsize, k)
     176    if fsize is None:
     177        # Streaming case, file size is unknown.
     178        # We'll recompute the pad at the end and write it
     179        padbytes = pad_size(0, k)
     180    else:
     181        padbytes = pad_size(fsize, k)
    175182
    176183    fns = []
    177184    fs = []
     
    197204            oldsumlen = sumlen[0]
    198205            sumlen[0] += length
    199206            if verbose:
    200                 if int((float(oldsumlen) / fsize) * 10) != int((float(sumlen[0]) / fsize) * 10):
    201                     print str(int((float(sumlen[0]) / fsize) * 10) * 10) + "% ...",
     207                if fsize is None:
     208                    # 30MB takes a short while to write on a normal drive.
     209                    interval = MILLION_BYTES * 30
     210                    if int((float(oldsumlen) / interval)) != int((float(sumlen[0]) / interval)):
     211                        print str(int((float(sumlen[0]) / MILLION_BYTES))) + "MB ...",
     212                else:
     213                    if int((float(oldsumlen) / fsize) * 10) != int((float(sumlen[0]) / fsize) * 10):
     214                        print str(int((float(sumlen[0]) / fsize) * 10) * 10) + "% ...",
     215                sys.stdout.flush()
    202216
    203             if sumlen[0] > fsize:
     217            if fsize is not None and sumlen[0] > fsize:
    204218                raise IOError("Wrong file size -- possibly the size of the file changed during encoding.  Original size: %d, observed size at least: %s" % (fsize, sumlen[0],))
    205             for i in range(len(blocks)):
    206                 data = blocks[i]
    207                 fs[i].write(data)
    208                 length -= len(data)
     219            if length != 0:
     220                for i in range(len(blocks)):
     221                    data = blocks[i]
     222                    fs[i].write(data)
     223                    length -= len(data)
     224            # EOF signal: if fsize is None (streaming), recompute the pad and rewrite the headers
     225            elif fsize is None:
     226                padbytes = pad_size(oldsumlen, k)
     227                for shnum in range(len(fs)):
     228                    hdr = _build_header(m, k, padbytes, shnum)
     229                    fs[shnum].seek(0)
     230                    fs[shnum].write(hdr)
    209231
    210232        encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096)
    211233    except EnvironmentError, le:
     
    232254# Thanks.
    233255MILLION_BYTES=10**6
    234256
    235 def decode_from_files(outf, infiles, verbose=False):
     257def decode_from_files(outf, infiles, verbose=False, is_stream=False):
    236258    """
    237259    Decode from the first k files in infiles, writing the results to outf.
    238260    """
     
    277299            outf.write(resultdata)
    278300            byteswritten += len(resultdata)
    279301            if verbose:
    280                 if ((byteswritten - len(resultdata)) / (10*MILLION_BYTES)) != (byteswritten / (10*MILLION_BYTES)):
    281                     print str(byteswritten / MILLION_BYTES) + " MB ...",
     302                if ((byteswritten - len(resultdata)) / (30*MILLION_BYTES)) != (byteswritten / (30*MILLION_BYTES)):
     303                    message = str(byteswritten / MILLION_BYTES) + "MB ..."
     304                    outpipe = is_stream and sys.stderr or sys.stdout
     305                    print >> outpipe, message,
     306                    outpipe.flush()
    282307        else:
    283308            # Then this was a short read, so we've reached the end of the sharefiles.
    284309            resultdata = dec.decode(chunks, shnums, padlen)
    285310            outf.write(resultdata)
    286             return # Done.
     311            break # Done
    287312    if verbose:
    288         print
    289         print "Done!"
     313        outpipe = is_stream and sys.stderr or sys.stdout
     314        print >> outpipe
     315        print >> outpipe, "Done!"
    290316
    291317def encode_file(inf, cb, k, m, chunksize=4096):
    292318    """
     
    493519        res = enc.encode(indata)
    494520        cb(res, len(indata))
    495521        indata = inf.read(readsize)
     522    # Final callback to update pad length in header for streaming case.
     523    cb([''] * m, 0)
    496524
    497525# zfec -- fast forward error correction library with Python interface
    498526#
  • zfec-1.4.22/zfec/test/test_zfec.py

    diff -Naur orig/zfec-1.4.22/zfec/test/test_zfec.py streaming/zfec-1.4.22/zfec/test/test_zfec.py
    old new  
    246246                        assert (rm, rk, rpad, rsh,) == (m, k, pad, sh,), h
    247247
    248248    def _help_test_filefec(self, teststr, k, m, numshs=None):
     249        self._do_help_test_filefec(teststr, k, m, numshs=numshs, withsize=False)
     250        self._do_help_test_filefec(teststr, k, m, numshs=numshs, withsize=True)
     251
     252    def _do_help_test_filefec(self, teststr, k, m, numshs=None, withsize=True):
    249253        if numshs == None:
    250254            numshs = m
    251255
     
    253257        PREFIX = "test"
    254258        SUFFIX = ".fec"
    255259
    256         fsize = len(teststr)
     260        if withsize:
     261          fsize = len(teststr)
     262        else:
     263          fsize = None
    257264
    258265        tempdir = fileutil.NamedTemporaryDirectory(cleanup=True)
    259266        try: