mirror of https://github.com/celery/kombu.git
chore: Annotate semaphore.py
Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>
This commit is contained in:
parent
4342af8077
commit
4b67ad1692
|
@ -1,9 +1,21 @@
|
|||
"""Semaphores and concurrency primitives."""
|
||||
|
||||
import sys
|
||||
from collections import deque
|
||||
from typing import TYPE_CHECKING, Callable, Deque, Optional, Tuple, Type
|
||||
|
||||
if sys.version_info < (3, 10):
|
||||
from typing_extensions import ParamSpec
|
||||
else:
|
||||
from typing import ParamSpec
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from types import TracebackType
|
||||
|
||||
|
||||
__all__ = ('DummyLock', 'LaxBoundedSemaphore')
|
||||
|
||||
P = ParamSpec("P")
|
||||
|
||||
|
||||
class LaxBoundedSemaphore:
|
||||
"""Asynchronous Bounded Semaphore.
|
||||
|
@ -28,13 +40,18 @@ class LaxBoundedSemaphore:
|
|||
HELLO 3
|
||||
"""
|
||||
|
||||
def __init__(self, value):
|
||||
def __init__(self, value: int) -> None:
|
||||
self.initial_value = self.value = value
|
||||
self._waiting = deque()
|
||||
self._waiting: Deque[Tuple] = deque()
|
||||
self._add_waiter = self._waiting.append
|
||||
self._pop_waiter = self._waiting.popleft
|
||||
|
||||
def acquire(self, callback, *partial_args, **partial_kwargs):
|
||||
def acquire(
|
||||
self,
|
||||
callback: Callable[P, None],
|
||||
*partial_args: P.args,
|
||||
**partial_kwargs: P.kwargs
|
||||
) -> bool:
|
||||
"""Acquire semaphore.
|
||||
|
||||
This will immediately apply ``callback`` if
|
||||
|
@ -54,7 +71,7 @@ class LaxBoundedSemaphore:
|
|||
callback(*partial_args, **partial_kwargs)
|
||||
return True
|
||||
|
||||
def release(self):
|
||||
def release(self) -> None:
|
||||
"""Release semaphore.
|
||||
|
||||
Note:
|
||||
|
@ -68,23 +85,24 @@ class LaxBoundedSemaphore:
|
|||
else:
|
||||
waiter(*args, **kwargs)
|
||||
|
||||
def grow(self, n=1):
|
||||
def grow(self, n: int = 1) -> None:
|
||||
"""Change the size of the semaphore to accept more users."""
|
||||
self.initial_value += n
|
||||
self.value += n
|
||||
[self.release() for _ in range(n)]
|
||||
for _ in range(n):
|
||||
self.release()
|
||||
|
||||
def shrink(self, n=1):
|
||||
def shrink(self, n: int = 1) -> None:
|
||||
"""Change the size of the semaphore to accept less users."""
|
||||
self.initial_value = max(self.initial_value - n, 0)
|
||||
self.value = max(self.value - n, 0)
|
||||
|
||||
def clear(self):
|
||||
def clear(self) -> None:
|
||||
"""Reset the semaphore, which also wipes out any waiting callbacks."""
|
||||
self._waiting.clear()
|
||||
self.value = self.initial_value
|
||||
|
||||
def __repr__(self):
|
||||
def __repr__(self) -> str:
|
||||
return '<{} at {:#x} value:{} waiting:{}>'.format(
|
||||
self.__class__.__name__, id(self), self.value, len(self._waiting),
|
||||
)
|
||||
|
@ -93,8 +111,13 @@ class LaxBoundedSemaphore:
|
|||
class DummyLock:
|
||||
"""Pretending to be a lock."""
|
||||
|
||||
def __enter__(self):
|
||||
def __enter__(self) -> 'DummyLock':
|
||||
return self
|
||||
|
||||
def __exit__(self, *exc_info):
|
||||
def __exit__(
|
||||
self,
|
||||
exc_type: Optional[Type[BaseException]],
|
||||
exc_val: Optional[BaseException],
|
||||
exc_tb: Optional['TracebackType']
|
||||
) -> None:
|
||||
pass
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
importlib-metadata>=0.18; python_version<"3.8"
|
||||
cached_property; python_version<"3.8"
|
||||
typing_extensions; python_version<"3.10"
|
||||
amqp>=5.0.9,<6.0.0
|
||||
vine
|
||||
|
|
Loading…
Reference in New Issue