hypercorn.utils module

exception hypercorn.utils.FrameTooLargeError

Bases: Exception

exception hypercorn.utils.LifespanFailureError(stage: str, message: str)

Bases: Exception

exception hypercorn.utils.LifespanTimeoutError(stage: str)

Bases: Exception

exception hypercorn.utils.MustReloadError

Bases: Exception

exception hypercorn.utils.NoAppError

Bases: Exception

exception hypercorn.utils.ShutdownError

Bases: Exception

exception hypercorn.utils.UnexpectedMessageError(state: enum.Enum, message_type: str)

Bases: Exception

class hypercorn.utils.WorkerState(terminated: bool = False)

Bases: object

terminated: bool = False
hypercorn.utils.build_and_validate_headers(headers: Iterable[Tuple[bytes, bytes]]) -> List[Tuple[bytes, bytes]]
async hypercorn.utils.check_multiprocess_shutdown_event(shutdown_event: multiprocessing.synchronize.Event, sleep: Callable[[float], Awaitable[Any]]) -> None
hypercorn.utils.filter_pseudo_headers(headers: List[Tuple[bytes, bytes]]) -> List[Tuple[bytes, bytes]]
async hypercorn.utils.invoke_asgi(app: Union[Type[hypercorn.typing.ASGI2Protocol], Callable[[Union[hypercorn.typing.HTTPScope, hypercorn.typing.WebsocketScope, hypercorn.typing.LifespanScope], Callable[[], Awaitable[Union[hypercorn.typing.HTTPRequestEvent, hypercorn.typing.HTTPDisconnectEvent, hypercorn.typing.WebsocketConnectEvent, hypercorn.typing.WebsocketReceiveEvent, hypercorn.typing.WebsocketDisconnectEvent, hypercorn.typing.LifespanStartupEvent, hypercorn.typing.LifespanShutdownEvent]]], Callable[[Union[hypercorn.typing.HTTPResponseStartEvent, hypercorn.typing.HTTPResponseBodyEvent, hypercorn.typing.HTTPServerPushEvent, hypercorn.typing.HTTPDisconnectEvent, hypercorn.typing.WebsocketAcceptEvent, hypercorn.typing.WebsocketSendEvent, hypercorn.typing.WebsocketResponseStartEvent, hypercorn.typing.WebsocketResponseBodyEvent, hypercorn.typing.WebsocketCloseEvent, hypercorn.typing.LifespanStartupCompleteEvent, hypercorn.typing.LifespanStartupFailedEvent, hypercorn.typing.LifespanShutdownCompleteEvent, hypercorn.typing.LifespanShutdownFailedEvent]], Awaitable[None]]], Awaitable[None]]], scope: Union[hypercorn.typing.HTTPScope, hypercorn.typing.WebsocketScope, hypercorn.typing.LifespanScope], receive: Callable[[], Awaitable[Union[hypercorn.typing.HTTPRequestEvent, hypercorn.typing.HTTPDisconnectEvent, hypercorn.typing.WebsocketConnectEvent, hypercorn.typing.WebsocketReceiveEvent, hypercorn.typing.WebsocketDisconnectEvent, hypercorn.typing.LifespanStartupEvent, hypercorn.typing.LifespanShutdownEvent]]], send: Callable[[Union[hypercorn.typing.HTTPResponseStartEvent, hypercorn.typing.HTTPResponseBodyEvent, hypercorn.typing.HTTPServerPushEvent, hypercorn.typing.HTTPDisconnectEvent, hypercorn.typing.WebsocketAcceptEvent, hypercorn.typing.WebsocketSendEvent, hypercorn.typing.WebsocketResponseStartEvent, hypercorn.typing.WebsocketResponseBodyEvent, hypercorn.typing.WebsocketCloseEvent, hypercorn.typing.LifespanStartupCompleteEvent, hypercorn.typing.LifespanStartupFailedEvent, hypercorn.typing.LifespanShutdownCompleteEvent, hypercorn.typing.LifespanShutdownFailedEvent]], Awaitable[None]]) -> None
hypercorn.utils.load_application(path: str) -> Union[Type[hypercorn.typing.ASGI2Protocol], Callable[[Union[hypercorn.typing.HTTPScope, hypercorn.typing.WebsocketScope, hypercorn.typing.LifespanScope], Callable[[], Awaitable[Union[hypercorn.typing.HTTPRequestEvent, hypercorn.typing.HTTPDisconnectEvent, hypercorn.typing.WebsocketConnectEvent, hypercorn.typing.WebsocketReceiveEvent, hypercorn.typing.WebsocketDisconnectEvent, hypercorn.typing.LifespanStartupEvent, hypercorn.typing.LifespanShutdownEvent]]], Callable[[Union[hypercorn.typing.HTTPResponseStartEvent, hypercorn.typing.HTTPResponseBodyEvent, hypercorn.typing.HTTPServerPushEvent, hypercorn.typing.HTTPDisconnectEvent, hypercorn.typing.WebsocketAcceptEvent, hypercorn.typing.WebsocketSendEvent, hypercorn.typing.WebsocketResponseStartEvent, hypercorn.typing.WebsocketResponseBodyEvent, hypercorn.typing.WebsocketCloseEvent, hypercorn.typing.LifespanStartupCompleteEvent, hypercorn.typing.LifespanStartupFailedEvent, hypercorn.typing.LifespanShutdownCompleteEvent, hypercorn.typing.LifespanShutdownFailedEvent]], Awaitable[None]]], Awaitable[None]]]
async hypercorn.utils.observe_changes(sleep: Callable[[float], Awaitable[Any]]) -> None
hypercorn.utils.parse_socket_addr(family: int, address: tuple) -> Optional[Tuple[str, int]]
async hypercorn.utils.raise_shutdown(shutdown_event: Callable[..., Awaitable[None]]) -> None
hypercorn.utils.repr_socket_addr(family: int, address: tuple) -> str
hypercorn.utils.restart() -> None
hypercorn.utils.suppress_body(method: str, status_code: int) -> bool
hypercorn.utils.valid_server_name(config: Config, request: Request) -> bool
hypercorn.utils.write_pid_file(pid_path: str) -> None