Module gzipstream
[hide private]
[frames] | [no frames]

Source Code for Module gzipstream

  1  """GzipStream & GzipStreamXL Classes 
  2   
  3  GzipStream (Python v1.5.2 - v2.2.*): 
  4      A streaming gzip handler. 
  5      gzipstream.GzipStream extends the functionality of the gzip.GzipFile class 
  6      to allow the processing of streaming data. 
  7      This is done by buffering the stream as it passes through (a seekable 
  8      object is needed). 
  9   
 10  GzipStreamXL (Python v1.5.2/v2.1.* --- ie. not v2.2.*): 
 11      A streaming gzip handler for very large files. 
 12   
 13  _StreamBuf: 
 14      Allow seeks on socket-like objects -- support GzipStream class. 
 15      Enables non-seekable file-like objects some flexibility as regards to 
 16      seeking. It does this via a buffer, a StringIO object. Note, because 
 17      it is assumed that a socket stream is being manipulated, once the buffer 
 18      "window" has passed over a data segment, seeking prior to that is not 
 19      allowed. 
 20   
 21  XXX: Eventually, I wish to merge this with the gzip.GzipFile somehow and 
 22       submit to the python folks. 
 23   
 24  Author: Todd Warner <taw@redhat.com> 
 25  Copyright (c) 2002-2010, Red Hat, Inc. 
 26  Released under Python license and GPLv2 license 
 27  """ 
 28  # $Id: gzipstream.py,v 1.53 2004/07/08 17:49:26 taw Exp $ 
 29   
 30  #WARNING: gzipstream will wrap a file-object. The responsibility of properly 
 31  #WARNING: destroying/closing that file-object resides outside of these 
 32  #WARNING: classes. 
 33  #WARNING: 
#WARNING: Also, due to python 1.5.2/2.1.* garbage collection issues, 
 35  #WARNING: responsibility of properly handling flushing IO and other expected 
 36  #WARNING: behavior of a properly collected object *also* resides with the 
#WARNING: instantiating entity. I.e., you need to explicitly close your 
 38  #WARNING: GzipStream object!!! 
 39   
 40   
 41  import sys 
 42  import gzip 
 43  from gzip import zlib 
 44  from types import IntType, LongType 
 45  import struct 
 46  import string 
 47  try: 
 48      # Is this *still* needed? cStringIO supposedly works on all platforms 
 49      from cStringIO import StringIO 
 50  except ImportError: 
 51      from StringIO import StringIO 
 52   
 53   
 54  _DEBUG_YN = 0 
 55  if _DEBUG_YN: 
 56      import time 
 57      try: 
 58          import thread 
 59      except: 
 60          pass 
 61   
 62   
def __getSysVersion():
    """Classify the running interpreter for version-specific code paths.

    Returns 1 for Python versions 1.5.* and 2.1.*; returns 2 for
    Python versions 2.2+.*.
    """
    # sys.version starts with e.g. "2.1.3 (#1, ..."; take the version
    # token and pull out the minor number.  string.split is used (rather
    # than string methods) to stay compatible with Python 1.5.2.
    versionToken = string.split(sys.version)[0]
    minorNumber = int(string.split(versionToken, '.')[1])
    if minorNumber >= 2:
        return 2
    return 1
# Computed once at import time; consulted throughout the module.
_SYS_VERSION = __getSysVersion()
class GzipStream(gzip.GzipFile):
    """Handle streaming gzipped data.

    GzipStream extends the functionality of the gzip.GzipFile class.
    gzip.GzipFile generally needs a seekable object. This doesn't allow for
    streaming gzipped data to be processed easily (e.g. can't seek a socket).
    Using the _StreamBuf class enables streaming gzipped data to be processed
    by buffering that data as it passes through.

    For Python versions 1.5.2 & 2.1.*:
        Normal data version.
        Normally sized data stream version == faster.
        For very large data streams (2.5GB-ish), use GzipStreamXL.
    """
    VERSION = _SYS_VERSION  # so garbage collector doesn't nuke it too early
                            # with older (v1.5.2-v2.1.*) python.

    def __init__(self, stream=None, mode=None, compresslevel=9):
        """Wrap a file-like object for streaming (de)compression.

        stream: open file-like object (defaults to sys.stdout).
        mode: 'rb'/'wb'; derived from the stream when not given.
        compresslevel: zlib compression level, 1 (fastest) - 9 (best).
        """
        if stream is None:
            stream = sys.stdout

        mode = self._initModeLogic(stream, mode)

        # self.stream becomes a _StreamBuf object
        if not isinstance(stream, _StreamBuf):
            self.stream = _StreamBuf(stream, mode)
        else:
            self.stream = stream
        self._gzip = gzip  # hang onto for destructive reasons
        self._gzip.GzipFile.__init__(self, '', mode, compresslevel, self.stream)

    def _initModeLogic(self, stream, mode):
        """Attempt to determine the mode; raises ValueError when the
        requested mode conflicts with what the stream supports."""
        _mode = None
        _modes = ''
        if hasattr(stream, 'mode'):
            _mode = stream.mode
            _modes = _mode
        # Attributes lie, by the way, so sometimes we have to punt.
        if not _mode and hasattr(stream, 'read'):
            _modes = _modes + 'r'
        if not _mode and hasattr(stream, 'write'):
            _modes = _modes + 'w'
        # NOTE: Async objects needs a mode set or defaults to 'rb'

        if not _mode and not mode:
            # punt
            if 'r' in _modes:
                mode = _mode = 'rb'
            elif 'w' in _modes:
                mode = _mode = 'wb'
        elif not mode:
            mode = _mode

        if mode[0] not in _modes:
            raise ValueError('Mode %s not supported' % mode)
        return mode

    def _read(self, size=1024):
        """Overloaded gzip.GzipFile._read --- one line changed.

        Instead of seek(0,2) to see if we are at the end of the file,
        just do a seek(pos+1): if tell() is unchanged we are at the end
        of the file. Raises EOFError when the stream is exhausted.
        """
        if self.stream is None:
            raise EOFError("Reached EOF")

        if self._new_member:
            # If the _new_member flag is set, we have to
            #
            # First, check if we're at the end of the file;
            # if so, it's time to stop; no more members to read.
            pos = self.stream.tell()   # Save current position
            self.stream.seek(pos+1)    # Seek further... if at end, won't
                                       # seek any further.
            if pos == self.stream.tell():
                self.stream.close()
                self.stream = None
                # BUGFIX: this was "return EOFError, ..." which handed a
                # tuple back to gzip.GzipFile.read() instead of signaling
                # EOF; the caller detects end-of-data only via EOFError.
                raise EOFError("Reached EOF")
            else:
                self.stream.seek(pos)  # Return to original position

            self._init_read()
            self._read_gzip_header()
            self.decompress = zlib.decompressobj(-zlib.MAX_WBITS)
            self._new_member = 0

        # Read a chunk of data from the file
        buf = self.stream.read(size)

        # If the EOF has been reached, flush the decompression object
        # and mark this object as finished.

        if buf == "":
            uncompress = self.decompress.flush()
            self._read_eof()
            self.stream.close()
            self.stream = None
            self._add_read_data(uncompress)
            raise EOFError('Reached EOF')

        uncompress = self.decompress.decompress(buf)
        self._add_read_data(uncompress)

        if self.decompress.unused_data != "":
            # Ending case: we've come to the end of a member in the file,
            # so seek back to the start of the unused data, finish up
            # this member, and read a new gzip header.
            # (The number of bytes to seek back is the length of the unused
            # data, minus 8 because _read_eof() will rewind a further 8 bytes)
            self.stream.seek(-len(self.decompress.unused_data)+8, 1)

            # Check the CRC and file size, and set the flag so we read
            # a new member on the next call
            self._read_eof()
            self._new_member = 1

    def seek(self, offset):
        """Random access is impossible on a one-pass stream."""
        raise IOError('Random access not allowed in gzip streams')

    def __repr__(self):
        ret = ''
        if self.stream._closedYN:
            ret = "<closed gzipstream.GzipStream instance, mode '%s' at %s>" % \
                  (self.stream.mode, id(self))
        else:
            ret = "<open gzipstream.GzipStream instance, mode '%s' at %s>" % \
                  (self.stream.mode, id(self))
        return ret

    ### These methods are generally only important for Python v2.2.* ###
    def _read_eof(self):
        # overloaded to accommodate LongType: long sizes need the slower
        # struct-based 32-bit read.
        if type(self.size) == LongType:
            self._gzip.read32 = self._read32XL
        self._gzip.GzipFile._read_eof(self)

    def close(self):
        """Flush and close; safe to call more than once."""
        if self.stream and self.stream._closedYN:
            # remove this block for python v2.2.*
            return
        # overloaded to accommodate LongType
        if hasattr(self, 'size'):
            if type(self.size) == LongType:
                self._gzip.write32 = self._gzip.write32u
        else:
            # write32u is the "safest" route if punting.
            self._gzip.write32 = self._gzip.write32u
        self._gzip.GzipFile.close(self)
        if self.stream:
            self.stream.close()

    def _read32XL(self, input):
        """Allow for very large files/streams to be processed.
        Slows things down, but...

        Used by Python v2.2.*.
        Also used by Python v1.5.2/v2.1.* in inheriting class GzipStreamXL.
        """
        return struct.unpack("<L", input.read(4))[0]
#
# Python v1.5.2/v2.1.* version only class
#
if _SYS_VERSION == 1:
    class GzipStreamXL(GzipStream):
        """Handle streaming gzipped data -- large data version.

        Very large sized data stream version -- slooower.
        For normally sized data streams (< 2.5GB-ish), use GzipStream.
        """
        def __init__(self, stream=None, mode=None, compresslevel=9):
            """Same interface as GzipStream; patches the gzip module's
            32-bit read/write helpers so member sizes that overflow a
            plain int on old pythons are handled via longs."""
            gzip.read32 = self._read32XL
            gzip.write32 = gzip.write32u
            GzipStream.__init__(self, stream, mode, compresslevel)

        def _init_write(self, filename):
            """Make size a long in order to support very large files.
            """
            GzipStream._init_write(self, filename)
            self.size = 0L

        def _init_read(self):
            """Make size a long in order to support very large files.
            """
            GzipStream._init_read(self)
            self.size = 0L
class _StreamBuf:
    """Stream buffer for file-like objects.

    Allow seeks on socket-like objects.
    Enables non-seekable file-like objects some flexibility as regards to
    seeking. It does this via a buffer, a StringIO object. Note, because
    it is assumed that a socket stream is being manipulated, once the buffer
    "window" has passed over a data segment, seeking prior to that is not
    allowed.
    XXX: probably reinventing the wheel.
    """
    __MIN_READ_SIZE = 1024 * 2                   # Default = 2K
    __MAX_BUFIO_SIZE = __MIN_READ_SIZE * 10      # Default = 20K
    __ABS_MAX_BUFIO_SIZE = __MAX_BUFIO_SIZE * 2  # Default = 40K

    ### Python versions 1.5.2 & 2.1.* only:
    __INT_CHECK_SIZE = sys.maxint - __ABS_MAX_BUFIO_SIZE - 2

    VERSION = _SYS_VERSION  # so garbage collector doesn't nuke it too early
                            # with older (v1.5.2-v2.1.*) python.

    def __init__(self, stream=None, mode=None):
        """Constructor.
        stream: an open file-like object.
        mode: 'rb' or 'wb'; sniffed from the stream when not given.
        Raises IOError(22) for an unusable mode, AttributeError when the
        stream lacks the needed read/write method.
        """
        self.fo = stream
        self._readableYN = 0
        self._writableYN = 0

        if self.fo is None:
            self.fo = StringIO()
            mode = 'wb'
            self._readableYN = 1
            self._writableYN = 1

        # If mode not declared, try to figure it out.
        if mode is None:
            try:
                mode = self.fo.mode
            except AttributeError:
                # Narrowed from a bare except: only a missing .mode
                # attribute is expected here.
                pass

        # Can only read or write, not both and really the 'b' is meaningless.
        if not mode or (type(mode) == type("") \
                and (mode[0] not in 'rw' or (len(mode) > 1 and mode[1] != 'b'))):
            raise IOError(22, "Invalid argument: mode=%s" % repr(mode))

        if mode[0] == 'r':
            self._readableYN = 1
        else:
            self._writableYN = 1

        # Better be an open file-like object.
        if self._readableYN:
            self.fo.read   # Throw AttributeError if not readable.
        if self._writableYN:
            self.fo.write  # Throw AttributeError if not writable.

        self._closedYN = 0
        self._currFoPos = 0  # Assume at beginning of stream.
        self._bufIO = StringIO()
        self._lenBufIO = 0
        self.mode = mode
        # Threaded debug loop:
        self.__mutexOnYN = 0
        if _DEBUG_YN and globals().has_key('thread'):
            thread.start_new(self.__debugThread, ())

    def __del__(self):
        "Destructor"
        # Python v1.5.2/v2.1.* tries to run this but close doesn't always
        # still exist. For a pure Python v2.2.*, remove the try: except:.
        try:
            self.close()
        except:
            pass

    def isatty(self):
        if self._closedYN:
            raise ValueError("I/O operation on closed _StreamBuf object")
        return 0

    def _read(self, size):
        """A buffered read --- refactored.
        Pulls from the StringIO window first, then tops up from the
        wrapped stream in __MIN_READ_SIZE chunks.
        """
        if self._closedYN:
            raise ValueError("I/O operation on closed _StreamBuf object")
        if not self._readableYN:
            raise IOError(9, "Can't read from a write only object")
        tell = self._bufIO.tell()
        bufIO = self._bufIO.read(size)
        lbufIO = len(bufIO)
        bufFo = ''
        lbufFo = 0
        if lbufIO < size:
            # We read to end of buffer; read from file and tag onto buffer.
            buf = self.fo.read(_StreamBuf.__MIN_READ_SIZE)
            bufFo = buf
            lbufFo = len(bufFo)
            while buf and lbufFo + lbufIO < size:
                buf = self.fo.read(_StreamBuf.__MIN_READ_SIZE)
                bufFo = '%s%s' % (bufFo, buf)
                lbufFo = len(bufFo)
            self._bufIO.write(bufFo)
            self.__mutexOnYN = 1
            self._lenBufIO = self._lenBufIO + lbufFo
            self.__mutexOnYN = 0
            if lbufIO + lbufFo < size:  # covers case that size > filelength.
                size = lbufIO + lbufFo
            self._bufIO.seek(tell + size)
        if _StreamBuf.VERSION == 1:
            self._currFoPos = self.__checkInt(self._currFoPos)
        self._currFoPos = self._currFoPos + size
        bufFo = bufFo[:size-lbufIO]
        self._refactorBufIO()
        return '%s%s' % (bufIO, bufFo)

    def read(self, size=None):
        """A buffered read. size=None means read to EOF.
        """
        if size and size < 0:
            raise IOError(22, "Invalid argument")
        if not self._readableYN:
            raise IOError(9, "Can't read from a write only object")
        fetchSize = _StreamBuf.__MAX_BUFIO_SIZE
        if size:
            fetchSize = min(fetchSize, size)
        buf = self._read(fetchSize)
        bufOut = buf
        accumSize = len(buf)
        while buf:
            if size and accumSize >= size:
                break
            buf = self._read(fetchSize)
            bufOut = '%s%s' % (bufOut, buf)
            if _StreamBuf.VERSION == 1:
                accumSize = self.__checkInt(accumSize)
            accumSize = accumSize + len(buf)
        return bufOut

    def readline(self):
        """Return one line of text: a string ending in a '\n' or EOF.
        """
        if self._closedYN:
            raise ValueError("I/O operation on closed _StreamBuf object")
        if not self._readableYN:
            raise IOError(9, "Can't read from a write only object")
        line = ''
        buf = self.read(_StreamBuf.__MIN_READ_SIZE)
        while buf:
            i = string.find(buf, '\n')
            if i >= 0:
                i = i + 1
                # NOTE(review): rewinds _bufIO without adjusting
                # _currFoPos, so tell() may overstate the position after
                # readline() -- confirm against upstream before relying
                # on tell() here.
                self._bufIO.seek(-(len(buf)-i), 1)
                buf = buf[:i]
                line = '%s%s' % (line, buf)
                break
            line = '%s%s' % (line, buf)
            buf = self.read(_StreamBuf.__MIN_READ_SIZE)
        return line

    def readlines(self):
        """Read entire file into memory! And return a list of lines of text.
        """
        if self._closedYN:
            raise ValueError("I/O operation on closed _StreamBuf object")
        if not self._readableYN:
            raise IOError(9, "Can't read from a write only object")
        lines = []
        line = self.readline()
        while line:
            lines.append(line)
            line = self.readline()
        return lines

    def _refactorBufIO(self, writeFlushYN=0):
        """Keep the buffer window within __{MAX,ABS_MAX}_BUF_SIZE before
        the current self._bufIO.tell() position. Returns the data tossed
        out of the window (which write()/flush() forward to the real
        stream).
        """
        self.__mutexOnYN = 1
        tell = self._bufIO.tell()
        tossed = ''
        if writeFlushYN:
            tossed = self._bufIO.getvalue()[:tell]
            self._lenBufIO = self._lenBufIO - len(tossed)
            tell = tell - len(tossed)
            # NOTE(review): 's' is sliced with the already-rebased 'tell'
            # (now 0), so the flushed prefix remains in the new buffer;
            # looks suspicious but preserved as-is -- confirm against
            # upstream before changing.
            s = self._bufIO.getvalue()[tell:]
            self._bufIO = StringIO()
            self._bufIO.write(s)
            self._bufIO.seek(tell)
        elif tell >= _StreamBuf.__ABS_MAX_BUFIO_SIZE:
            tossed = self._bufIO.getvalue()[:_StreamBuf.__MAX_BUFIO_SIZE]
            self._lenBufIO = self._lenBufIO - _StreamBuf.__MAX_BUFIO_SIZE
            tell = tell - _StreamBuf.__MAX_BUFIO_SIZE
            s = self._bufIO.getvalue()[_StreamBuf.__MAX_BUFIO_SIZE:]
            self._bufIO = StringIO()
            self._bufIO.write(s)
            self._bufIO.seek(tell)
        self.__mutexOnYN = 0
        return tossed

    def _dumpValues(self):
        """Debug code.
        """
        err = sys.stderr.write
        err('self._lenBufIO: %s/%s\n' % (self._lenBufIO,
                                         len(self._bufIO.getvalue())))
        err('self._currFoPos: %s\n' % self._currFoPos)
        err('self._readableYN: %s\n' % self._readableYN)
        err('self._writableYN: %s\n' % self._writableYN)
        err('self._closedYN: %s\n' % self._closedYN)

    def write(self, s):
        """Write string to stream.
        """
        if self._closedYN:
            raise ValueError("I/O operation on closed _StreamBuf object")
        if not self._writableYN:
            raise IOError(9, "Can't write to a read only object")
        self._bufIO.write(s)
        if _StreamBuf.VERSION == 1:
            self._currFoPos = self.__checkInt(self._currFoPos)
        self._currFoPos = self._currFoPos + len(s)
        self.__mutexOnYN = 1
        self._lenBufIO = self._lenBufIO + len(s)
        self.__mutexOnYN = 0
        self.fo.write(self._refactorBufIO())

    def writelines(self, l):
        """Given list, concatenate and write.
        """
        if self._closedYN:
            raise ValueError("I/O operation on closed _StreamBuf object")
        if not self._writableYN:
            raise IOError(9, "Can't write to a read only object")
        for s in l:
            self.write(s)

    def seek(self, offset, where=0):
        """A limited seek method. See class __doc__ for more details.
        """
        if self._closedYN:
            raise ValueError("I/O operation on closed _StreamBuf object")

        tell = self._bufIO.tell()
        beginBuf = self._currFoPos - tell
        endBuf = self._lenBufIO + beginBuf - 1

        # Offset from beginning?
        if not where:
            pass
        # Offset from current position?
        elif where == 1:
            if _StreamBuf.VERSION == 1:
                offset = self.__checkInt(offset)
            offset = self._currFoPos + offset
        # Offset from end?
        elif where == 2:
            if self._readableYN:
                # BUGFIX: the original test "offset < __ABS_MAX_BUFIO_SIZE"
                # is true for every negative offset, so all seek-from-end
                # calls raised. The intent (per the message) is to reject
                # only offsets reaching back beyond the retained window.
                if offset < 0 and -offset > _StreamBuf.__ABS_MAX_BUFIO_SIZE:
                    raise IOError(22, "Invalid argument; can't determine %s "
                                  "position due to unknown stream length" % offset)
                # Could be ugly if, for example, a socket stream "never ends" ;)
                while self.read(_StreamBuf.__MAX_BUFIO_SIZE):
                    pass
                self._currFoPos = self._currFoPos + offset
                self._bufIO.seek(offset, 2)
                return
            elif self._writableYN:
                offset = endBuf + offset
        else:
            raise IOError(22, "Invalid argument")
        if self._writableYN and offset > endBuf:
            offset = endBuf
        #
        # Offset reflects "from beginning of file" now.
        #
        if offset < 0:
            raise IOError(22, "Invalid argument")
        delta = offset - self._currFoPos
        # Before beginning of buffer -- can't do it sensibly -- data gone.
        if offset < beginBuf:
            raise IOError(22, "Invalid argument; attempted seek before "
                          "beginning of buffer")
        # After end of buffer.
        elif offset > endBuf:
            if self._readableYN:
                while delta:
                    x = min(_StreamBuf.__MAX_BUFIO_SIZE, delta)
                    self.read(x)
                    delta = delta - x
        # Within the buffer.
        else:
            self._bufIO.seek(tell + delta, 0)
            if _StreamBuf.VERSION == 1:
                self._currFoPos = self.__checkInt(self._currFoPos)
            self._currFoPos = self._currFoPos + self._bufIO.tell() - tell

    def tell(self):
        """Return current position in the file-like object.
        """
        return self._currFoPos

    def close(self):
        """Flush the buffer.
        NOTE: fileobject is NOT closed, just flushed. Mapping as closely as
        possible to GzipFile.
        """
        self.flush()
        self._closedYN = 1

    def flush(self):
        """Flush the buffer.
        """
        if self._closedYN:
            raise ValueError("I/O operation on closed _StreamBuf object")
        if self._readableYN:
            pass
        if self._writableYN:
            self.fo.write(self._refactorBufIO(1))
            if _StreamBuf.VERSION == 1:
                # may seem a bit redundant, but want to easily cut this
                # stuff out someday.
                try:
                    self.fo.flush()
                except AttributeError:
                    pass
                return
            self.fo.flush()

    def __repr__(self):
        ret = ''
        if self._closedYN:
            ret = "<closed gzipstream._StreamBuf instance, mode '%s' at %s>" % \
                  (self.mode, id(self))
        else:
            ret = "<open gzipstream._StreamBuf instance, mode '%s' at %s>" % \
                  (self.mode, id(self))
        return ret

    # __private__

    def __checkInt(self, i):
        """Might be faster just to declare them longs.
        Python versions 1.5.2 & 2.1.* ONLY!
        """
        if i > _StreamBuf.__INT_CHECK_SIZE and type(i) == IntType:
            i = long(i)
        return i

    def __debugThread(self):
        """XXX: Only used for debugging. Runs a thread that watches some
        tell-tale warning flags that something bad is happening.
        """
        while not self._closedYN and not self.__mutexOnYN:
            if self._lenBufIO != len(self._bufIO.getvalue()):
                sys.stderr.write('XXX: ERROR! _lenBufIO != len(...): %s != %s\n'
                                 % (self._lenBufIO, len(self._bufIO.getvalue())))
                sys.stderr.write('XXX: %s\n' % repr(self))
            if self._lenBufIO > _StreamBuf.__ABS_MAX_BUFIO_SIZE*2:
                sys.stderr.write('XXX: ERROR! StringIO buffer WAY to big: %s\n'
                                 % self._lenBufIO)
                sys.stderr.write('XXX: %s\n' % repr(self))
            time.sleep(1)
629 630 #------------------------------------------------------------------------------- 631