Mercurial > hg > mercurial-source
comparison mercurial/bundle2.py @ 35041:e71890f27767
bundle2: move processpart stream maintenance into part iterator
The _processpart function also did some stream maintenance, so let's move it to
the part iterator as well, as part of moving all part iteration logic into the
class.
There is one place where _processpart is called outside of the normal loop (the
stream-interrupt handler), so we manually handle the seek there.
The now-empty try/finally will be removed in a later patch, for ease of review.
Differential Revision: https://phab.mercurial-scm.org/D706
author | Durham Goode <durham@fb.com> |
---|---|
date | Thu, 14 Sep 2017 10:20:05 -0700 |
parents | ab379eed2e31 |
children | 07e4170f02f3 |
comparison
equal
deleted
inserted
replaced
35040:2844c4bd5a39 | 35041:e71890f27767 |
---|---|
352 self.repo = repo | 352 self.repo = repo |
353 self.op = op | 353 self.op = op |
354 self.unbundler = unbundler | 354 self.unbundler = unbundler |
355 self.iterator = None | 355 self.iterator = None |
356 self.count = 0 | 356 self.count = 0 |
357 self.current = None | |
357 | 358 |
358 def __enter__(self): | 359 def __enter__(self): |
359 def func(): | 360 def func(): |
360 itr = enumerate(self.unbundler.iterparts()) | 361 itr = enumerate(self.unbundler.iterparts()) |
361 for count, p in itr: | 362 for count, p in itr: |
362 self.count = count | 363 self.count = count |
364 self.current = p | |
363 yield p | 365 yield p |
366 p.seek(0, 2) | |
367 self.current = None | |
364 self.iterator = func() | 368 self.iterator = func() |
365 return self.iterator | 369 return self.iterator |
366 | 370 |
367 def __exit__(self, type, exc, tb): | 371 def __exit__(self, type, exc, tb): |
368 if not self.iterator: | 372 if not self.iterator: |
369 return | 373 return |
370 | 374 |
371 if exc: | 375 if exc: |
376 # If exiting or interrupted, do not attempt to seek the stream in | |
377 # the finally block below. This makes abort faster. | |
378 if (self.current and | |
379 not isinstance(exc, (SystemExit, KeyboardInterrupt))): | |
380 # consume the part content to not corrupt the stream. | |
381 self.current.seek(0, 2) | |
382 | |
372 # Any exceptions seeking to the end of the bundle at this point are | 383 # Any exceptions seeking to the end of the bundle at this point are |
373 # almost certainly related to the underlying stream being bad. | 384 # almost certainly related to the underlying stream being bad. |
374 # And, chances are that the exception we're handling is related to | 385 # And, chances are that the exception we're handling is related to |
375 # getting in that bad state. So, we swallow the seeking error and | 386 # getting in that bad state. So, we swallow the seeking error and |
376 # re-raise the original error. | 387 # re-raise the original error. |
453 """process a single part from a bundle | 464 """process a single part from a bundle |
454 | 465 |
455 The part is guaranteed to have been fully consumed when the function exits | 466 The part is guaranteed to have been fully consumed when the function exits |
456 (even if an exception is raised).""" | 467 (even if an exception is raised).""" |
457 status = 'unknown' # used by debug output | 468 status = 'unknown' # used by debug output |
458 hardabort = False | |
459 try: | 469 try: |
460 try: | 470 try: |
461 handler = parthandlermapping.get(part.type) | 471 handler = parthandlermapping.get(part.type) |
462 if handler is None: | 472 if handler is None: |
463 status = 'unsupported-type' | 473 status = 'unsupported-type' |
509 if output: | 519 if output: |
510 outpart = op.reply.newpart('output', data=output, | 520 outpart = op.reply.newpart('output', data=output, |
511 mandatory=False) | 521 mandatory=False) |
512 outpart.addparam( | 522 outpart.addparam( |
513 'in-reply-to', pycompat.bytestr(part.id), mandatory=False) | 523 'in-reply-to', pycompat.bytestr(part.id), mandatory=False) |
514 # If exiting or interrupted, do not attempt to seek the stream in the | |
515 # finally block below. This makes abort faster. | |
516 except (SystemExit, KeyboardInterrupt): | |
517 hardabort = True | |
518 raise | |
519 finally: | 524 finally: |
520 # consume the part content to not corrupt the stream. | 525 pass |
521 if not hardabort: | |
522 part.seek(0, 2) | |
523 | 526 |
524 | 527 |
525 def decodecaps(blob): | 528 def decodecaps(blob): |
526 """decode a bundle2 caps bytes blob into a dictionary | 529 """decode a bundle2 caps bytes blob into a dictionary |
527 | 530 |
1145 if headerblock is None: | 1148 if headerblock is None: |
1146 indebug(self.ui, 'no part found during interruption.') | 1149 indebug(self.ui, 'no part found during interruption.') |
1147 return | 1150 return |
1148 part = unbundlepart(self.ui, headerblock, self._fp) | 1151 part = unbundlepart(self.ui, headerblock, self._fp) |
1149 op = interruptoperation(self.ui) | 1152 op = interruptoperation(self.ui) |
1150 _processpart(op, part) | 1153 hardabort = False |
1154 try: | |
1155 _processpart(op, part) | |
1156 except (SystemExit, KeyboardInterrupt): | |
1157 hardabort = True | |
1158 raise | |
1159 finally: | |
1160 if not hardabort: | |
1161 part.seek(0, 2) | |
1151 self.ui.debug('bundle2-input-stream-interrupt:' | 1162 self.ui.debug('bundle2-input-stream-interrupt:' |
1152 ' closing out of band context\n') | 1163 ' closing out of band context\n') |
1153 | 1164 |
1154 class interruptoperation(object): | 1165 class interruptoperation(object): |
1155 """A limited operation to be use by part handler during interruption | 1166 """A limited operation to be use by part handler during interruption |