comparison mercurial/bundle2.py @ 35041:e71890f27767

bundle2: move processpart stream maintenance into part iterator The processpart function also did some stream maintenance, so let's move it to the part iterator as well, as part of moving all part iteration logic into the class. There is one place processpart is called outside of the normal loop, so we manually handle the seek there. The now-empty try/finally will be removed in a later patch, for ease of review. Differential Revision: https://phab.mercurial-scm.org/D706
author Durham Goode <durham@fb.com>
date Thu, 14 Sep 2017 10:20:05 -0700
parents ab379eed2e31
children 07e4170f02f3
comparison
equal deleted inserted replaced
35040:2844c4bd5a39 35041:e71890f27767
352 self.repo = repo 352 self.repo = repo
353 self.op = op 353 self.op = op
354 self.unbundler = unbundler 354 self.unbundler = unbundler
355 self.iterator = None 355 self.iterator = None
356 self.count = 0 356 self.count = 0
357 self.current = None
357 358
358 def __enter__(self): 359 def __enter__(self):
359 def func(): 360 def func():
360 itr = enumerate(self.unbundler.iterparts()) 361 itr = enumerate(self.unbundler.iterparts())
361 for count, p in itr: 362 for count, p in itr:
362 self.count = count 363 self.count = count
364 self.current = p
363 yield p 365 yield p
366 p.seek(0, 2)
367 self.current = None
364 self.iterator = func() 368 self.iterator = func()
365 return self.iterator 369 return self.iterator
366 370
367 def __exit__(self, type, exc, tb): 371 def __exit__(self, type, exc, tb):
368 if not self.iterator: 372 if not self.iterator:
369 return 373 return
370 374
371 if exc: 375 if exc:
376 # If exiting or interrupted, do not attempt to seek the stream in
377 # the finally block below. This makes abort faster.
378 if (self.current and
379 not isinstance(exc, (SystemExit, KeyboardInterrupt))):
380 # consume the part content to not corrupt the stream.
381 self.current.seek(0, 2)
382
372 # Any exceptions seeking to the end of the bundle at this point are 383 # Any exceptions seeking to the end of the bundle at this point are
373 # almost certainly related to the underlying stream being bad. 384 # almost certainly related to the underlying stream being bad.
374 # And, chances are that the exception we're handling is related to 385 # And, chances are that the exception we're handling is related to
375 # getting in that bad state. So, we swallow the seeking error and 386 # getting in that bad state. So, we swallow the seeking error and
376 # re-raise the original error. 387 # re-raise the original error.
453 """process a single part from a bundle 464 """process a single part from a bundle
454 465
455 The part is guaranteed to have been fully consumed when the function exits 466 The part is guaranteed to have been fully consumed when the function exits
456 (even if an exception is raised).""" 467 (even if an exception is raised)."""
457 status = 'unknown' # used by debug output 468 status = 'unknown' # used by debug output
458 hardabort = False
459 try: 469 try:
460 try: 470 try:
461 handler = parthandlermapping.get(part.type) 471 handler = parthandlermapping.get(part.type)
462 if handler is None: 472 if handler is None:
463 status = 'unsupported-type' 473 status = 'unsupported-type'
509 if output: 519 if output:
510 outpart = op.reply.newpart('output', data=output, 520 outpart = op.reply.newpart('output', data=output,
511 mandatory=False) 521 mandatory=False)
512 outpart.addparam( 522 outpart.addparam(
513 'in-reply-to', pycompat.bytestr(part.id), mandatory=False) 523 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
514 # If exiting or interrupted, do not attempt to seek the stream in the
515 # finally block below. This makes abort faster.
516 except (SystemExit, KeyboardInterrupt):
517 hardabort = True
518 raise
519 finally: 524 finally:
520 # consume the part content to not corrupt the stream. 525 pass
521 if not hardabort:
522 part.seek(0, 2)
523 526
524 527
525 def decodecaps(blob): 528 def decodecaps(blob):
526 """decode a bundle2 caps bytes blob into a dictionary 529 """decode a bundle2 caps bytes blob into a dictionary
527 530
1145 if headerblock is None: 1148 if headerblock is None:
1146 indebug(self.ui, 'no part found during interruption.') 1149 indebug(self.ui, 'no part found during interruption.')
1147 return 1150 return
1148 part = unbundlepart(self.ui, headerblock, self._fp) 1151 part = unbundlepart(self.ui, headerblock, self._fp)
1149 op = interruptoperation(self.ui) 1152 op = interruptoperation(self.ui)
1150 _processpart(op, part) 1153 hardabort = False
1154 try:
1155 _processpart(op, part)
1156 except (SystemExit, KeyboardInterrupt):
1157 hardabort = True
1158 raise
1159 finally:
1160 if not hardabort:
1161 part.seek(0, 2)
1151 self.ui.debug('bundle2-input-stream-interrupt:' 1162 self.ui.debug('bundle2-input-stream-interrupt:'
1152 ' closing out of band context\n') 1163 ' closing out of band context\n')
1153 1164
1154 class interruptoperation(object): 1165 class interruptoperation(object):
1155 """A limited operation to be use by part handler during interruption 1166 """A limited operation to be use by part handler during interruption