diff options
| author | sotech117 <michael_foiani@brown.edu> | 2025-07-31 17:27:24 -0400 |
|---|---|---|
| committer | sotech117 <michael_foiani@brown.edu> | 2025-07-31 17:27:24 -0400 |
| commit | 5bf22fc7e3c392c8bd44315ca2d06d7dca7d084e (patch) | |
| tree | 8dacb0f195df1c0788d36dd0064f6bbaa3143ede /venv/lib/python3.8/site-packages/werkzeug/middleware | |
| parent | b832d364da8c2efe09e3f75828caf73c50d01ce3 (diff) | |
add code for analysis of data
Diffstat (limited to 'venv/lib/python3.8/site-packages/werkzeug/middleware')
7 files changed, 1377 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/werkzeug/middleware/__init__.py b/venv/lib/python3.8/site-packages/werkzeug/middleware/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/venv/lib/python3.8/site-packages/werkzeug/middleware/__init__.py diff --git a/venv/lib/python3.8/site-packages/werkzeug/middleware/dispatcher.py b/venv/lib/python3.8/site-packages/werkzeug/middleware/dispatcher.py new file mode 100644 index 0000000..e11bacc --- /dev/null +++ b/venv/lib/python3.8/site-packages/werkzeug/middleware/dispatcher.py @@ -0,0 +1,81 @@ +""" +Application Dispatcher +====================== + +This middleware creates a single WSGI application that dispatches to +multiple other WSGI applications mounted at different URL paths. + +A common example is writing a Single Page Application, where you have a +backend API and a frontend written in JavaScript that does the routing +in the browser rather than requesting different pages from the server. +The frontend is a single HTML and JS file that should be served for any +path besides "/api". + +This example dispatches to an API app under "/api", an admin app +under "/admin", and an app that serves frontend files for all other +requests:: + + app = DispatcherMiddleware(serve_frontend, { + '/api': api_app, + '/admin': admin_app, + }) + +In production, you might instead handle this at the HTTP server level, +serving files or proxying to application servers based on location. The +API and admin apps would each be deployed with a separate WSGI server, +and the static files would be served directly by the HTTP server. + +.. autoclass:: DispatcherMiddleware + +:copyright: 2007 Pallets +:license: BSD-3-Clause +""" + +from __future__ import annotations + +import typing as t + +if t.TYPE_CHECKING: + from _typeshed.wsgi import StartResponse + from _typeshed.wsgi import WSGIApplication + from _typeshed.wsgi import WSGIEnvironment + + +class DispatcherMiddleware: + """Combine multiple applications as a single WSGI application. + Requests are dispatched to an application based on the path it is + mounted under. + + :param app: The WSGI application to dispatch to if the request + doesn't match a mounted path. + :param mounts: Maps path prefixes to applications for dispatching. + """ + + def __init__( + self, + app: WSGIApplication, + mounts: dict[str, WSGIApplication] | None = None, + ) -> None: + self.app = app + self.mounts = mounts or {} + + def __call__( + self, environ: WSGIEnvironment, start_response: StartResponse + ) -> t.Iterable[bytes]: + script = environ.get("PATH_INFO", "") + path_info = "" + + while "/" in script: + if script in self.mounts: + app = self.mounts[script] + break + + script, last_item = script.rsplit("/", 1) + path_info = f"/{last_item}{path_info}" + else: + app = self.mounts.get(script, self.app) + + original_script_name = environ.get("SCRIPT_NAME", "") + environ["SCRIPT_NAME"] = original_script_name + script + environ["PATH_INFO"] = path_info + return app(environ, start_response) diff --git a/venv/lib/python3.8/site-packages/werkzeug/middleware/http_proxy.py b/venv/lib/python3.8/site-packages/werkzeug/middleware/http_proxy.py new file mode 100644 index 0000000..5e23915 --- /dev/null +++ b/venv/lib/python3.8/site-packages/werkzeug/middleware/http_proxy.py @@ -0,0 +1,236 @@ +""" +Basic HTTP Proxy +================ + +.. autoclass:: ProxyMiddleware + +:copyright: 2007 Pallets +:license: BSD-3-Clause +""" + +from __future__ import annotations + +import typing as t +from http import client +from urllib.parse import quote +from urllib.parse import urlsplit + +from ..datastructures import EnvironHeaders +from ..http import is_hop_by_hop_header +from ..wsgi import get_input_stream + +if t.TYPE_CHECKING: + from _typeshed.wsgi import StartResponse + from _typeshed.wsgi import WSGIApplication + from _typeshed.wsgi import WSGIEnvironment + + +class ProxyMiddleware: + """Proxy requests under a path to an external server, routing other + requests to the app. + + This middleware can only proxy HTTP requests, as HTTP is the only + protocol handled by the WSGI server. Other protocols, such as + WebSocket requests, cannot be proxied at this layer. This should + only be used for development, in production a real proxy server + should be used. + + The middleware takes a dict mapping a path prefix to a dict + describing the host to be proxied to:: + + app = ProxyMiddleware(app, { + "/static/": { + "target": "http://127.0.0.1:5001/", + } + }) + + Each host has the following options: + + ``target``: + The target URL to dispatch to. This is required. + ``remove_prefix``: + Whether to remove the prefix from the URL before dispatching it + to the target. The default is ``False``. + ``host``: + ``"<auto>"`` (default): + The host header is automatically rewritten to the URL of the + target. + ``None``: + The host header is unmodified from the client request. + Any other value: + The host header is overwritten with the value. + ``headers``: + A dictionary of headers to be sent with the request to the + target. The default is ``{}``. + ``ssl_context``: + A :class:`ssl.SSLContext` defining how to verify requests if the + target is HTTPS. The default is ``None``. + + In the example above, everything under ``"/static/"`` is proxied to + the server on port 5001. The host header is rewritten to the target, + and the ``"/static/"`` prefix is removed from the URLs. + + :param app: The WSGI application to wrap. + :param targets: Proxy target configurations. See description above. + :param chunk_size: Size of chunks to read from input stream and + write to target. + :param timeout: Seconds before an operation to a target fails. + + .. versionadded:: 0.14 + """ + + def __init__( + self, + app: WSGIApplication, + targets: t.Mapping[str, dict[str, t.Any]], + chunk_size: int = 2 << 13, + timeout: int = 10, + ) -> None: + def _set_defaults(opts: dict[str, t.Any]) -> dict[str, t.Any]: + opts.setdefault("remove_prefix", False) + opts.setdefault("host", "<auto>") + opts.setdefault("headers", {}) + opts.setdefault("ssl_context", None) + return opts + + self.app = app + self.targets = { + f"/{k.strip('/')}/": _set_defaults(v) for k, v in targets.items() + } + self.chunk_size = chunk_size + self.timeout = timeout + + def proxy_to( + self, opts: dict[str, t.Any], path: str, prefix: str + ) -> WSGIApplication: + target = urlsplit(opts["target"]) + # socket can handle unicode host, but header must be ascii + host = target.hostname.encode("idna").decode("ascii") + + def application( + environ: WSGIEnvironment, start_response: StartResponse + ) -> t.Iterable[bytes]: + headers = list(EnvironHeaders(environ).items()) + headers[:] = [ + (k, v) + for k, v in headers + if not is_hop_by_hop_header(k) + and k.lower() not in ("content-length", "host") + ] + headers.append(("Connection", "close")) + + if opts["host"] == "<auto>": + headers.append(("Host", host)) + elif opts["host"] is None: + headers.append(("Host", environ["HTTP_HOST"])) + else: + headers.append(("Host", opts["host"])) + + headers.extend(opts["headers"].items()) + remote_path = path + + if opts["remove_prefix"]: + remote_path = remote_path[len(prefix) :].lstrip("/") + remote_path = f"{target.path.rstrip('/')}/{remote_path}" + + content_length = environ.get("CONTENT_LENGTH") + chunked = False + + if content_length not in ("", None): + headers.append(("Content-Length", content_length)) # type: ignore + elif content_length is not None: + headers.append(("Transfer-Encoding", "chunked")) + chunked = True + + try: + if target.scheme == "http": + con = client.HTTPConnection( + host, target.port or 80, timeout=self.timeout + ) + elif target.scheme == "https": + con = client.HTTPSConnection( + host, + target.port or 443, + timeout=self.timeout, + context=opts["ssl_context"], + ) + else: + raise RuntimeError( + "Target scheme must be 'http' or 'https', got" + f" {target.scheme!r}." + ) + + con.connect() + # safe = https://url.spec.whatwg.org/#url-path-segment-string + # as well as percent for things that are already quoted + remote_url = quote(remote_path, safe="!$&'()*+,/:;=@%") + querystring = environ["QUERY_STRING"] + + if querystring: + remote_url = f"{remote_url}?{querystring}" + + con.putrequest(environ["REQUEST_METHOD"], remote_url, skip_host=True) + + for k, v in headers: + if k.lower() == "connection": + v = "close" + + con.putheader(k, v) + + con.endheaders() + stream = get_input_stream(environ) + + while True: + data = stream.read(self.chunk_size) + + if not data: + break + + if chunked: + con.send(b"%x\r\n%s\r\n" % (len(data), data)) + else: + con.send(data) + + resp = con.getresponse() + except OSError: + from ..exceptions import BadGateway + + return BadGateway()(environ, start_response) + + start_response( + f"{resp.status} {resp.reason}", + [ + (k.title(), v) + for k, v in resp.getheaders() + if not is_hop_by_hop_header(k) + ], + ) + + def read() -> t.Iterator[bytes]: + while True: + try: + data = resp.read(self.chunk_size) + except OSError: + break + + if not data: + break + + yield data + + return read() + + return application + + def __call__( + self, environ: WSGIEnvironment, start_response: StartResponse + ) -> t.Iterable[bytes]: + path = environ["PATH_INFO"] + app = self.app + + for prefix, opts in self.targets.items(): + if path.startswith(prefix): + app = self.proxy_to(opts, path, prefix) + break + + return app(environ, start_response) diff --git a/venv/lib/python3.8/site-packages/werkzeug/middleware/lint.py b/venv/lib/python3.8/site-packages/werkzeug/middleware/lint.py new file mode 100644 index 0000000..de93b52 --- /dev/null +++ b/venv/lib/python3.8/site-packages/werkzeug/middleware/lint.py @@ -0,0 +1,439 @@ +""" +WSGI Protocol Linter +==================== + +This module provides a middleware that performs sanity checks on the +behavior of the WSGI server and application. It checks that the +:pep:`3333` WSGI spec is properly implemented. It also warns on some +common HTTP errors such as non-empty responses for 304 status codes. + +.. autoclass:: LintMiddleware + +:copyright: 2007 Pallets +:license: BSD-3-Clause +""" + +from __future__ import annotations + +import typing as t +from types import TracebackType +from urllib.parse import urlparse +from warnings import warn + +from ..datastructures import Headers +from ..http import is_entity_header +from ..wsgi import FileWrapper + +if t.TYPE_CHECKING: + from _typeshed.wsgi import StartResponse + from _typeshed.wsgi import WSGIApplication + from _typeshed.wsgi import WSGIEnvironment + + +class WSGIWarning(Warning): + """Warning class for WSGI warnings.""" + + +class HTTPWarning(Warning): + """Warning class for HTTP warnings.""" + + +def check_type(context: str, obj: object, need: type = str) -> None: + if type(obj) is not need: + warn( + f"{context!r} requires {need.__name__!r}, got {type(obj).__name__!r}.", + WSGIWarning, + stacklevel=3, + ) + + +class InputStream: + def __init__(self, stream: t.IO[bytes]) -> None: + self._stream = stream + + def read(self, *args: t.Any) -> bytes: + if len(args) == 0: + warn( + "WSGI does not guarantee an EOF marker on the input stream, thus making" + " calls to 'wsgi.input.read()' unsafe. Conforming servers may never" + " return from this call.", + WSGIWarning, + stacklevel=2, + ) + elif len(args) != 1: + warn( + "Too many parameters passed to 'wsgi.input.read()'.", + WSGIWarning, + stacklevel=2, + ) + return self._stream.read(*args) + + def readline(self, *args: t.Any) -> bytes: + if len(args) == 0: + warn( + "Calls to 'wsgi.input.readline()' without arguments are unsafe. Use" + " 'wsgi.input.read()' instead.", + WSGIWarning, + stacklevel=2, + ) + elif len(args) == 1: + warn( + "'wsgi.input.readline()' was called with a size hint. WSGI does not" + " support this, although it's available on all major servers.", + WSGIWarning, + stacklevel=2, + ) + else: + raise TypeError("Too many arguments passed to 'wsgi.input.readline()'.") + return self._stream.readline(*args) + + def __iter__(self) -> t.Iterator[bytes]: + try: + return iter(self._stream) + except TypeError: + warn("'wsgi.input' is not iterable.", WSGIWarning, stacklevel=2) + return iter(()) + + def close(self) -> None: + warn("The application closed the input stream!", WSGIWarning, stacklevel=2) + self._stream.close() + + +class ErrorStream: + def __init__(self, stream: t.IO[str]) -> None: + self._stream = stream + + def write(self, s: str) -> None: + check_type("wsgi.error.write()", s, str) + self._stream.write(s) + + def flush(self) -> None: + self._stream.flush() + + def writelines(self, seq: t.Iterable[str]) -> None: + for line in seq: + self.write(line) + + def close(self) -> None: + warn("The application closed the error stream!", WSGIWarning, stacklevel=2) + self._stream.close() + + +class GuardedWrite: + def __init__(self, write: t.Callable[[bytes], object], chunks: list[int]) -> None: + self._write = write + self._chunks = chunks + + def __call__(self, s: bytes) -> None: + check_type("write()", s, bytes) + self._write(s) + self._chunks.append(len(s)) + + +class GuardedIterator: + def __init__( + self, + iterator: t.Iterable[bytes], + headers_set: tuple[int, Headers], + chunks: list[int], + ) -> None: + self._iterator = iterator + self._next = iter(iterator).__next__ + self.closed = False + self.headers_set = headers_set + self.chunks = chunks + + def __iter__(self) -> GuardedIterator: + return self + + def __next__(self) -> bytes: + if self.closed: + warn("Iterated over closed 'app_iter'.", WSGIWarning, stacklevel=2) + + rv = self._next() + + if not self.headers_set: + warn( + "The application returned before it started the response.", + WSGIWarning, + stacklevel=2, + ) + + check_type("application iterator items", rv, bytes) + self.chunks.append(len(rv)) + return rv + + def close(self) -> None: + self.closed = True + + if hasattr(self._iterator, "close"): + self._iterator.close() + + if self.headers_set: + status_code, headers = self.headers_set + bytes_sent = sum(self.chunks) + content_length = headers.get("content-length", type=int) + + if status_code == 304: + for key, _value in headers: + key = key.lower() + if key not in ("expires", "content-location") and is_entity_header( + key + ): + warn( + f"Entity header {key!r} found in 304 response.", + HTTPWarning, + stacklevel=2, + ) + if bytes_sent: + warn( + "304 responses must not have a body.", + HTTPWarning, + stacklevel=2, + ) + elif 100 <= status_code < 200 or status_code == 204: + if content_length != 0: + warn( + f"{status_code} responses must have an empty content length.", + HTTPWarning, + stacklevel=2, + ) + if bytes_sent: + warn( + f"{status_code} responses must not have a body.", + HTTPWarning, + stacklevel=2, + ) + elif content_length is not None and content_length != bytes_sent: + warn( + "Content-Length and the number of bytes sent to the" + " client do not match.", + WSGIWarning, + stacklevel=2, + ) + + def __del__(self) -> None: + if not self.closed: + try: + warn( + "Iterator was garbage collected before it was closed.", + WSGIWarning, + stacklevel=2, + ) + except Exception: + pass + + +class LintMiddleware: + """Warns about common errors in the WSGI and HTTP behavior of the + server and wrapped application. Some of the issues it checks are: + + - invalid status codes + - non-bytes sent to the WSGI server + - strings returned from the WSGI application + - non-empty conditional responses + - unquoted etags + - relative URLs in the Location header + - unsafe calls to wsgi.input + - unclosed iterators + + Error information is emitted using the :mod:`warnings` module. + + :param app: The WSGI application to wrap. + + .. code-block:: python + + from werkzeug.middleware.lint import LintMiddleware + app = LintMiddleware(app) + """ + + def __init__(self, app: WSGIApplication) -> None: + self.app = app + + def check_environ(self, environ: WSGIEnvironment) -> None: + if type(environ) is not dict: # noqa: E721 + warn( + "WSGI environment is not a standard Python dict.", + WSGIWarning, + stacklevel=4, + ) + for key in ( + "REQUEST_METHOD", + "SERVER_NAME", + "SERVER_PORT", + "wsgi.version", + "wsgi.input", + "wsgi.errors", + "wsgi.multithread", + "wsgi.multiprocess", + "wsgi.run_once", + ): + if key not in environ: + warn( + f"Required environment key {key!r} not found", + WSGIWarning, + stacklevel=3, + ) + if environ["wsgi.version"] != (1, 0): + warn("Environ is not a WSGI 1.0 environ.", WSGIWarning, stacklevel=3) + + script_name = environ.get("SCRIPT_NAME", "") + path_info = environ.get("PATH_INFO", "") + + if script_name and script_name[0] != "/": + warn( + f"'SCRIPT_NAME' does not start with a slash: {script_name!r}", + WSGIWarning, + stacklevel=3, + ) + + if path_info and path_info[0] != "/": + warn( + f"'PATH_INFO' does not start with a slash: {path_info!r}", + WSGIWarning, + stacklevel=3, + ) + + def check_start_response( + self, + status: str, + headers: list[tuple[str, str]], + exc_info: None | (tuple[type[BaseException], BaseException, TracebackType]), + ) -> tuple[int, Headers]: + check_type("status", status, str) + status_code_str = status.split(None, 1)[0] + + if len(status_code_str) != 3 or not status_code_str.isdecimal(): + warn("Status code must be three digits.", WSGIWarning, stacklevel=3) + + if len(status) < 4 or status[3] != " ": + warn( + f"Invalid value for status {status!r}. Valid status strings are three" + " digits, a space and a status explanation.", + WSGIWarning, + stacklevel=3, + ) + + status_code = int(status_code_str) + + if status_code < 100: + warn("Status code < 100 detected.", WSGIWarning, stacklevel=3) + + if type(headers) is not list: # noqa: E721 + warn("Header list is not a list.", WSGIWarning, stacklevel=3) + + for item in headers: + if type(item) is not tuple or len(item) != 2: + warn("Header items must be 2-item tuples.", WSGIWarning, stacklevel=3) + name, value = item + if type(name) is not str or type(value) is not str: # noqa: E721 + warn( + "Header keys and values must be strings.", WSGIWarning, stacklevel=3 + ) + if name.lower() == "status": + warn( + "The status header is not supported due to" + " conflicts with the CGI spec.", + WSGIWarning, + stacklevel=3, + ) + + if exc_info is not None and not isinstance(exc_info, tuple): + warn("Invalid value for exc_info.", WSGIWarning, stacklevel=3) + + headers_obj = Headers(headers) + self.check_headers(headers_obj) + + return status_code, headers_obj + + def check_headers(self, headers: Headers) -> None: + etag = headers.get("etag") + + if etag is not None: + if etag.startswith(("W/", "w/")): + if etag.startswith("w/"): + warn( + "Weak etag indicator should be upper case.", + HTTPWarning, + stacklevel=4, + ) + + etag = etag[2:] + + if not (etag[:1] == etag[-1:] == '"'): + warn("Unquoted etag emitted.", HTTPWarning, stacklevel=4) + + location = headers.get("location") + + if location is not None: + if not urlparse(location).netloc: + warn( + "Absolute URLs required for location header.", + HTTPWarning, + stacklevel=4, + ) + + def check_iterator(self, app_iter: t.Iterable[bytes]) -> None: + if isinstance(app_iter, str): + warn( + "The application returned a string. The response will send one" + " character at a time to the client, which will kill performance." + " Return a list or iterable instead.", + WSGIWarning, + stacklevel=3, + ) + + def __call__(self, *args: t.Any, **kwargs: t.Any) -> t.Iterable[bytes]: + if len(args) != 2: + warn("A WSGI app takes two arguments.", WSGIWarning, stacklevel=2) + + if kwargs: + warn( + "A WSGI app does not take keyword arguments.", WSGIWarning, stacklevel=2 + ) + + environ: WSGIEnvironment = args[0] + start_response: StartResponse = args[1] + + self.check_environ(environ) + environ["wsgi.input"] = InputStream(environ["wsgi.input"]) + environ["wsgi.errors"] = ErrorStream(environ["wsgi.errors"]) + + # Hook our own file wrapper in so that applications will always + # iterate to the end and we can check the content length. + environ["wsgi.file_wrapper"] = FileWrapper + + headers_set: list[t.Any] = [] + chunks: list[int] = [] + + def checking_start_response( + *args: t.Any, **kwargs: t.Any + ) -> t.Callable[[bytes], None]: + if len(args) not in {2, 3}: + warn( + f"Invalid number of arguments: {len(args)}, expected 2 or 3.", + WSGIWarning, + stacklevel=2, + ) + + if kwargs: + warn( + "'start_response' does not take keyword arguments.", + WSGIWarning, + stacklevel=2, + ) + + status: str = args[0] + headers: list[tuple[str, str]] = args[1] + exc_info: ( + None | (tuple[type[BaseException], BaseException, TracebackType]) + ) = args[2] if len(args) == 3 else None + + headers_set[:] = self.check_start_response(status, headers, exc_info) + return GuardedWrite(start_response(status, headers, exc_info), chunks) + + app_iter = self.app(environ, t.cast("StartResponse", checking_start_response)) + self.check_iterator(app_iter) + return GuardedIterator( + app_iter, t.cast(t.Tuple[int, Headers], headers_set), chunks + ) diff --git a/venv/lib/python3.8/site-packages/werkzeug/middleware/profiler.py b/venv/lib/python3.8/site-packages/werkzeug/middleware/profiler.py new file mode 100644 index 0000000..112b877 --- /dev/null +++ b/venv/lib/python3.8/site-packages/werkzeug/middleware/profiler.py @@ -0,0 +1,155 @@ +""" +Application Profiler +==================== + +This module provides a middleware that profiles each request with the +:mod:`cProfile` module. This can help identify bottlenecks in your code +that may be slowing down your application. + +.. autoclass:: ProfilerMiddleware + +:copyright: 2007 Pallets +:license: BSD-3-Clause +""" + +from __future__ import annotations + +import os.path +import sys +import time +import typing as t +from pstats import Stats + +try: + from cProfile import Profile +except ImportError: + from profile import Profile # type: ignore + +if t.TYPE_CHECKING: + from _typeshed.wsgi import StartResponse + from _typeshed.wsgi import WSGIApplication + from _typeshed.wsgi import WSGIEnvironment + + +class ProfilerMiddleware: + """Wrap a WSGI application and profile the execution of each + request. Responses are buffered so that timings are more exact. + + If ``stream`` is given, :class:`pstats.Stats` are written to it + after each request. If ``profile_dir`` is given, :mod:`cProfile` + data files are saved to that directory, one file per request. + + The filename can be customized by passing ``filename_format``. If + it is a string, it will be formatted using :meth:`str.format` with + the following fields available: + + - ``{method}`` - The request method; GET, POST, etc. + - ``{path}`` - The request path or 'root' should one not exist. + - ``{elapsed}`` - The elapsed time of the request in milliseconds. + - ``{time}`` - The time of the request. + + If it is a callable, it will be called with the WSGI ``environ`` and + be expected to return a filename string. The ``environ`` dictionary + will also have the ``"werkzeug.profiler"`` key populated with a + dictionary containing the following fields (more may be added in the + future): + - ``{elapsed}`` - The elapsed time of the request in milliseconds. + - ``{time}`` - The time of the request. + + :param app: The WSGI application to wrap. + :param stream: Write stats to this stream. Disable with ``None``. + :param sort_by: A tuple of columns to sort stats by. See + :meth:`pstats.Stats.sort_stats`. + :param restrictions: A tuple of restrictions to filter stats by. See + :meth:`pstats.Stats.print_stats`. + :param profile_dir: Save profile data files to this directory. + :param filename_format: Format string for profile data file names, + or a callable returning a name. See explanation above. + + .. code-block:: python + + from werkzeug.middleware.profiler import ProfilerMiddleware + app = ProfilerMiddleware(app) + + .. versionchanged:: 3.0 + Added the ``"werkzeug.profiler"`` key to the ``filename_format(environ)`` + parameter with the ``elapsed`` and ``time`` fields. + + .. versionchanged:: 0.15 + Stats are written even if ``profile_dir`` is given, and can be + disable by passing ``stream=None``. + + .. versionadded:: 0.15 + Added ``filename_format``. + + .. versionadded:: 0.9 + Added ``restrictions`` and ``profile_dir``. + """ + + def __init__( + self, + app: WSGIApplication, + stream: t.IO[str] | None = sys.stdout, + sort_by: t.Iterable[str] = ("time", "calls"), + restrictions: t.Iterable[str | int | float] = (), + profile_dir: str | None = None, + filename_format: str = "{method}.{path}.{elapsed:.0f}ms.{time:.0f}.prof", + ) -> None: + self._app = app + self._stream = stream + self._sort_by = sort_by + self._restrictions = restrictions + self._profile_dir = profile_dir + self._filename_format = filename_format + + def __call__( + self, environ: WSGIEnvironment, start_response: StartResponse + ) -> t.Iterable[bytes]: + response_body: list[bytes] = [] + + def catching_start_response(status, headers, exc_info=None): # type: ignore + start_response(status, headers, exc_info) + return response_body.append + + def runapp() -> None: + app_iter = self._app( + environ, t.cast("StartResponse", catching_start_response) + ) + response_body.extend(app_iter) + + if hasattr(app_iter, "close"): + app_iter.close() + + profile = Profile() + start = time.time() + profile.runcall(runapp) + body = b"".join(response_body) + elapsed = time.time() - start + + if self._profile_dir is not None: + if callable(self._filename_format): + environ["werkzeug.profiler"] = { + "elapsed": elapsed * 1000.0, + "time": time.time(), + } + filename = self._filename_format(environ) + else: + filename = self._filename_format.format( + method=environ["REQUEST_METHOD"], + path=environ["PATH_INFO"].strip("/").replace("/", ".") or "root", + elapsed=elapsed * 1000.0, + time=time.time(), + ) + filename = os.path.join(self._profile_dir, filename) + profile.dump_stats(filename) + + if self._stream is not None: + stats = Stats(profile, stream=self._stream) + stats.sort_stats(*self._sort_by) + print("-" * 80, file=self._stream) + path_info = environ.get("PATH_INFO", "") + print(f"PATH: {path_info!r}", file=self._stream) + stats.print_stats(*self._restrictions) + print(f"{'-' * 80}\n", file=self._stream) + + return [body] diff --git a/venv/lib/python3.8/site-packages/werkzeug/middleware/proxy_fix.py b/venv/lib/python3.8/site-packages/werkzeug/middleware/proxy_fix.py new file mode 100644 index 0000000..cbf4e0b --- /dev/null +++ b/venv/lib/python3.8/site-packages/werkzeug/middleware/proxy_fix.py @@ -0,0 +1,183 @@ +""" +X-Forwarded-For Proxy Fix +========================= + +This module provides a middleware that adjusts the WSGI environ based on +``X-Forwarded-`` headers that proxies in front of an application may +set. + +When an application is running behind a proxy server, WSGI may see the +request as coming from that server rather than the real client. Proxies +set various headers to track where the request actually came from. + +This middleware should only be used if the application is actually +behind such a proxy, and should be configured with the number of proxies +that are chained in front of it. Not all proxies set all the headers. +Since incoming headers can be faked, you must set how many proxies are +setting each header so the middleware knows what to trust. + +.. autoclass:: ProxyFix + +:copyright: 2007 Pallets +:license: BSD-3-Clause +""" + +from __future__ import annotations + +import typing as t + +from ..http import parse_list_header + +if t.TYPE_CHECKING: + from _typeshed.wsgi import StartResponse + from _typeshed.wsgi import WSGIApplication + from _typeshed.wsgi import WSGIEnvironment + + +class ProxyFix: + """Adjust the WSGI environ based on ``X-Forwarded-`` that proxies in + front of the application may set. + + - ``X-Forwarded-For`` sets ``REMOTE_ADDR``. + - ``X-Forwarded-Proto`` sets ``wsgi.url_scheme``. + - ``X-Forwarded-Host`` sets ``HTTP_HOST``, ``SERVER_NAME``, and + ``SERVER_PORT``. + - ``X-Forwarded-Port`` sets ``HTTP_HOST`` and ``SERVER_PORT``. + - ``X-Forwarded-Prefix`` sets ``SCRIPT_NAME``. + + You must tell the middleware how many proxies set each header so it + knows what values to trust. It is a security issue to trust values + that came from the client rather than a proxy. + + The original values of the headers are stored in the WSGI + environ as ``werkzeug.proxy_fix.orig``, a dict. + + :param app: The WSGI application to wrap. + :param x_for: Number of values to trust for ``X-Forwarded-For``. + :param x_proto: Number of values to trust for ``X-Forwarded-Proto``. + :param x_host: Number of values to trust for ``X-Forwarded-Host``. + :param x_port: Number of values to trust for ``X-Forwarded-Port``. + :param x_prefix: Number of values to trust for + ``X-Forwarded-Prefix``. + + .. code-block:: python + + from werkzeug.middleware.proxy_fix import ProxyFix + # App is behind one proxy that sets the -For and -Host headers. + app = ProxyFix(app, x_for=1, x_host=1) + + .. versionchanged:: 1.0 + The ``num_proxies`` argument and attribute; the ``get_remote_addr`` method; and + the environ keys ``orig_remote_addr``, ``orig_wsgi_url_scheme``, and + ``orig_http_host`` were removed. + + .. versionchanged:: 0.15 + All headers support multiple values. Each header is configured with a separate + number of trusted proxies. + + .. versionchanged:: 0.15 + Original WSGI environ values are stored in the ``werkzeug.proxy_fix.orig`` dict. + + .. versionchanged:: 0.15 + Support ``X-Forwarded-Port`` and ``X-Forwarded-Prefix``. + + .. versionchanged:: 0.15 + ``X-Forwarded-Host`` and ``X-Forwarded-Port`` modify + ``SERVER_NAME`` and ``SERVER_PORT``. + """ + + def __init__( + self, + app: WSGIApplication, + x_for: int = 1, + x_proto: int = 1, + x_host: int = 0, + x_port: int = 0, + x_prefix: int = 0, + ) -> None: + self.app = app + self.x_for = x_for + self.x_proto = x_proto + self.x_host = x_host + self.x_port = x_port + self.x_prefix = x_prefix + + def _get_real_value(self, trusted: int, value: str | None) -> str | None: + """Get the real value from a list header based on the configured + number of trusted proxies. + + :param trusted: Number of values to trust in the header. + :param value: Comma separated list header value to parse. + :return: The real value, or ``None`` if there are fewer values + than the number of trusted proxies. + + .. versionchanged:: 1.0 + Renamed from ``_get_trusted_comma``. + + .. versionadded:: 0.15 + """ + if not (trusted and value): + return None + values = parse_list_header(value) + if len(values) >= trusted: + return values[-trusted] + return None + + def __call__( + self, environ: WSGIEnvironment, start_response: StartResponse + ) -> t.Iterable[bytes]: + """Modify the WSGI environ based on the various ``Forwarded`` + headers before calling the wrapped application. Store the + original environ values in ``werkzeug.proxy_fix.orig_{key}``. + """ + environ_get = environ.get + orig_remote_addr = environ_get("REMOTE_ADDR") + orig_wsgi_url_scheme = environ_get("wsgi.url_scheme") + orig_http_host = environ_get("HTTP_HOST") + environ.update( + { + "werkzeug.proxy_fix.orig": { + "REMOTE_ADDR": orig_remote_addr, + "wsgi.url_scheme": orig_wsgi_url_scheme, + "HTTP_HOST": orig_http_host, + "SERVER_NAME": environ_get("SERVER_NAME"), + "SERVER_PORT": environ_get("SERVER_PORT"), + "SCRIPT_NAME": environ_get("SCRIPT_NAME"), + } + } + ) + + x_for = self._get_real_value(self.x_for, environ_get("HTTP_X_FORWARDED_FOR")) + if x_for: + environ["REMOTE_ADDR"] = x_for + + x_proto = self._get_real_value( + self.x_proto, environ_get("HTTP_X_FORWARDED_PROTO") + ) + if x_proto: + environ["wsgi.url_scheme"] = x_proto + + x_host = self._get_real_value(self.x_host, environ_get("HTTP_X_FORWARDED_HOST")) + if x_host: + environ["HTTP_HOST"] = environ["SERVER_NAME"] = x_host + # "]" to check for IPv6 address without port + if ":" in x_host and not x_host.endswith("]"): + environ["SERVER_NAME"], environ["SERVER_PORT"] = x_host.rsplit(":", 1) + + x_port = self._get_real_value(self.x_port, environ_get("HTTP_X_FORWARDED_PORT")) + if x_port: + host = environ.get("HTTP_HOST") + if host: + # "]" to check for IPv6 address without port + if ":" in host and not host.endswith("]"): + host = host.rsplit(":", 1)[0] + environ["HTTP_HOST"] = f"{host}:{x_port}" + environ["SERVER_PORT"] = x_port + + x_prefix = self._get_real_value( + self.x_prefix, environ_get("HTTP_X_FORWARDED_PREFIX") + ) + if x_prefix: + environ["SCRIPT_NAME"] = x_prefix + + return self.app(environ, start_response) diff --git a/venv/lib/python3.8/site-packages/werkzeug/middleware/shared_data.py b/venv/lib/python3.8/site-packages/werkzeug/middleware/shared_data.py new file mode 100644 index 0000000..0f467f2 --- /dev/null +++ b/venv/lib/python3.8/site-packages/werkzeug/middleware/shared_data.py @@ -0,0 +1,283 @@ +""" +Serve Shared Static Files +========================= + +.. autoclass:: SharedDataMiddleware + :members: is_allowed + +:copyright: 2007 Pallets +:license: BSD-3-Clause +""" + +from __future__ import annotations + +import collections.abc as cabc +import importlib.util +import mimetypes +import os +import posixpath +import typing as t +from datetime import datetime +from datetime import timezone +from io import BytesIO +from time import time +from zlib import adler32 + +from ..http import http_date +from ..http import is_resource_modified +from ..security import safe_join +from ..utils import get_content_type +from ..wsgi import get_path_info +from ..wsgi import wrap_file + +_TOpener = t.Callable[[], t.Tuple[t.IO[bytes], datetime, int]] +_TLoader = t.Callable[[t.Optional[str]], t.Tuple[t.Optional[str], t.Optional[_TOpener]]] + +if t.TYPE_CHECKING: + from _typeshed.wsgi import StartResponse + from _typeshed.wsgi import WSGIApplication + from _typeshed.wsgi import WSGIEnvironment + + +class SharedDataMiddleware: + """A WSGI middleware which provides static content for development + environments or simple server setups. Its usage is quite simple:: + + import os + from werkzeug.middleware.shared_data import SharedDataMiddleware + + app = SharedDataMiddleware(app, { + '/shared': os.path.join(os.path.dirname(__file__), 'shared') + }) + + The contents of the folder ``./shared`` will now be available on + ``http://example.com/shared/``. This is pretty useful during development + because a standalone media server is not required. Files can also be + mounted on the root folder and still continue to use the application because + the shared data middleware forwards all unhandled requests to the + application, even if the requests are below one of the shared folders. + + If `pkg_resources` is available you can also tell the middleware to serve + files from package data:: + + app = SharedDataMiddleware(app, { + '/static': ('myapplication', 'static') + }) + + This will then serve the ``static`` folder in the `myapplication` + Python package. + + The optional `disallow` parameter can be a list of :func:`~fnmatch.fnmatch` + rules for files that are not accessible from the web. If `cache` is set to + `False` no caching headers are sent. + + Currently the middleware does not support non-ASCII filenames. If the + encoding on the file system happens to match the encoding of the URI it may + work but this could also be by accident. We strongly suggest using ASCII + only file names for static files. + + The middleware will guess the mimetype using the Python `mimetype` + module. If it's unable to figure out the charset it will fall back + to `fallback_mimetype`. + + :param app: the application to wrap. If you don't want to wrap an + application you can pass it :exc:`NotFound`. + :param exports: a list or dict of exported files and folders. + :param disallow: a list of :func:`~fnmatch.fnmatch` rules. + :param cache: enable or disable caching headers. + :param cache_timeout: the cache timeout in seconds for the headers. + :param fallback_mimetype: The fallback mimetype for unknown files. + + .. versionchanged:: 1.0 + The default ``fallback_mimetype`` is + ``application/octet-stream``. If a filename looks like a text + mimetype, the ``utf-8`` charset is added to it. + + .. versionadded:: 0.6 + Added ``fallback_mimetype``. + + .. versionchanged:: 0.5 + Added ``cache_timeout``. + """ + + def __init__( + self, + app: WSGIApplication, + exports: ( + cabc.Mapping[str, str | tuple[str, str]] + | t.Iterable[tuple[str, str | tuple[str, str]]] + ), + disallow: None = None, + cache: bool = True, + cache_timeout: int = 60 * 60 * 12, + fallback_mimetype: str = "application/octet-stream", + ) -> None: + self.app = app + self.exports: list[tuple[str, _TLoader]] = [] + self.cache = cache + self.cache_timeout = cache_timeout + + if isinstance(exports, cabc.Mapping): + exports = exports.items() + + for key, value in exports: + if isinstance(value, tuple): + loader = self.get_package_loader(*value) + elif isinstance(value, str): + if os.path.isfile(value): + loader = self.get_file_loader(value) + else: + loader = self.get_directory_loader(value) + else: + raise TypeError(f"unknown def {value!r}") + + self.exports.append((key, loader)) + + if disallow is not None: + from fnmatch import fnmatch + + self.is_allowed = lambda x: not fnmatch(x, disallow) + + self.fallback_mimetype = fallback_mimetype + + def is_allowed(self, filename: str) -> bool: + """Subclasses can override this method to disallow the access to + certain files. However by providing `disallow` in the constructor + this method is overwritten. + """ + return True + + def _opener(self, filename: str) -> _TOpener: + return lambda: ( + open(filename, "rb"), + datetime.fromtimestamp(os.path.getmtime(filename), tz=timezone.utc), + int(os.path.getsize(filename)), + ) + + def get_file_loader(self, filename: str) -> _TLoader: + return lambda x: (os.path.basename(filename), self._opener(filename)) + + def get_package_loader(self, package: str, package_path: str) -> _TLoader: + load_time = datetime.now(timezone.utc) + spec = importlib.util.find_spec(package) + reader = spec.loader.get_resource_reader(package) # type: ignore[union-attr] + + def loader( + path: str | None, + ) -> tuple[str | None, _TOpener | None]: + if path is None: + return None, None + + path = safe_join(package_path, path) + + if path is None: + return None, None + + basename = posixpath.basename(path) + + try: + resource = reader.open_resource(path) + except OSError: + return None, None + + if isinstance(resource, BytesIO): + return ( + basename, + lambda: (resource, load_time, len(resource.getvalue())), + ) + + return ( + basename, + lambda: ( + resource, + datetime.fromtimestamp( + os.path.getmtime(resource.name), tz=timezone.utc + ), + os.path.getsize(resource.name), + ), + ) + + return loader + + def get_directory_loader(self, directory: str) -> _TLoader: + def loader( + path: str | None, + ) -> tuple[str | None, _TOpener | None]: + if path is not None: + path = safe_join(directory, path) + + if path is None: + return None, None + else: + path = directory + + if os.path.isfile(path): + return os.path.basename(path), self._opener(path) + + return None, None + + return loader + + def generate_etag(self, mtime: datetime, file_size: int, real_filename: str) -> str: + fn_str = os.fsencode(real_filename) + timestamp = mtime.timestamp() + checksum = adler32(fn_str) & 0xFFFFFFFF + return f"wzsdm-{timestamp}-{file_size}-{checksum}" + + def __call__( + self, environ: WSGIEnvironment, start_response: StartResponse + ) -> t.Iterable[bytes]: + path = get_path_info(environ) + file_loader = None + + for search_path, loader in self.exports: + if search_path == path: + real_filename, file_loader = loader(None) + + if file_loader is not None: + break + + if not search_path.endswith("/"): + search_path += "/" + + if path.startswith(search_path): + real_filename, file_loader = loader(path[len(search_path) :]) + + if file_loader is not None: + break + + if file_loader is None or not self.is_allowed(real_filename): # type: ignore + return self.app(environ, start_response) + + guessed_type = mimetypes.guess_type(real_filename) # type: ignore + mime_type = get_content_type(guessed_type[0] or self.fallback_mimetype, "utf-8") + f, mtime, file_size = file_loader() + + headers = [("Date", http_date())] + + if self.cache: + timeout = self.cache_timeout + etag = self.generate_etag(mtime, file_size, real_filename) # type: ignore + headers += [ + ("Etag", f'"{etag}"'), + ("Cache-Control", f"max-age={timeout}, public"), + ] + + if not is_resource_modified(environ, etag, last_modified=mtime): + f.close() + start_response("304 Not Modified", headers) + return [] + + headers.append(("Expires", http_date(time() + timeout))) + else: + headers.append(("Cache-Control", "public")) + + headers.extend( + ( + ("Content-Type", mime_type), + ("Content-Length", str(file_size)), + ("Last-Modified", http_date(mtime)), + ) + ) + start_response("200 OK", headers) + return wrap_file(environ, f) |
