aboutsummaryrefslogtreecommitdiff
path: root/venv/lib/python3.8/site-packages/werkzeug/middleware
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2025-07-31 17:27:24 -0400
committersotech117 <michael_foiani@brown.edu>2025-07-31 17:27:24 -0400
commit5bf22fc7e3c392c8bd44315ca2d06d7dca7d084e (patch)
tree8dacb0f195df1c0788d36dd0064f6bbaa3143ede /venv/lib/python3.8/site-packages/werkzeug/middleware
parentb832d364da8c2efe09e3f75828caf73c50d01ce3 (diff)
add code for analysis of data
Diffstat (limited to 'venv/lib/python3.8/site-packages/werkzeug/middleware')
-rw-r--r--venv/lib/python3.8/site-packages/werkzeug/middleware/__init__.py0
-rw-r--r--venv/lib/python3.8/site-packages/werkzeug/middleware/dispatcher.py81
-rw-r--r--venv/lib/python3.8/site-packages/werkzeug/middleware/http_proxy.py236
-rw-r--r--venv/lib/python3.8/site-packages/werkzeug/middleware/lint.py439
-rw-r--r--venv/lib/python3.8/site-packages/werkzeug/middleware/profiler.py155
-rw-r--r--venv/lib/python3.8/site-packages/werkzeug/middleware/proxy_fix.py183
-rw-r--r--venv/lib/python3.8/site-packages/werkzeug/middleware/shared_data.py283
7 files changed, 1377 insertions, 0 deletions
diff --git a/venv/lib/python3.8/site-packages/werkzeug/middleware/__init__.py b/venv/lib/python3.8/site-packages/werkzeug/middleware/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/werkzeug/middleware/__init__.py
diff --git a/venv/lib/python3.8/site-packages/werkzeug/middleware/dispatcher.py b/venv/lib/python3.8/site-packages/werkzeug/middleware/dispatcher.py
new file mode 100644
index 0000000..e11bacc
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/werkzeug/middleware/dispatcher.py
@@ -0,0 +1,81 @@
+"""
+Application Dispatcher
+======================
+
+This middleware creates a single WSGI application that dispatches to
+multiple other WSGI applications mounted at different URL paths.
+
+A common example is writing a Single Page Application, where you have a
+backend API and a frontend written in JavaScript that does the routing
+in the browser rather than requesting different pages from the server.
+The frontend is a single HTML and JS file that should be served for any
+path besides "/api".
+
+This example dispatches to an API app under "/api", an admin app
+under "/admin", and an app that serves frontend files for all other
+requests::
+
+ app = DispatcherMiddleware(serve_frontend, {
+ '/api': api_app,
+ '/admin': admin_app,
+ })
+
+In production, you might instead handle this at the HTTP server level,
+serving files or proxying to application servers based on location. The
+API and admin apps would each be deployed with a separate WSGI server,
+and the static files would be served directly by the HTTP server.
+
+.. autoclass:: DispatcherMiddleware
+
+:copyright: 2007 Pallets
+:license: BSD-3-Clause
+"""
+
+from __future__ import annotations
+
+import typing as t
+
+if t.TYPE_CHECKING:
+ from _typeshed.wsgi import StartResponse
+ from _typeshed.wsgi import WSGIApplication
+ from _typeshed.wsgi import WSGIEnvironment
+
+
+class DispatcherMiddleware:
+ """Combine multiple applications as a single WSGI application.
+ Requests are dispatched to an application based on the path it is
+ mounted under.
+
+ :param app: The WSGI application to dispatch to if the request
+ doesn't match a mounted path.
+ :param mounts: Maps path prefixes to applications for dispatching.
+ """
+
+ def __init__(
+ self,
+ app: WSGIApplication,
+ mounts: dict[str, WSGIApplication] | None = None,
+ ) -> None:
+ self.app = app
+ self.mounts = mounts or {}
+
+ def __call__(
+ self, environ: WSGIEnvironment, start_response: StartResponse
+ ) -> t.Iterable[bytes]:
+ script = environ.get("PATH_INFO", "")
+ path_info = ""
+
+ while "/" in script:
+ if script in self.mounts:
+ app = self.mounts[script]
+ break
+
+ script, last_item = script.rsplit("/", 1)
+ path_info = f"/{last_item}{path_info}"
+ else:
+ app = self.mounts.get(script, self.app)
+
+ original_script_name = environ.get("SCRIPT_NAME", "")
+ environ["SCRIPT_NAME"] = original_script_name + script
+ environ["PATH_INFO"] = path_info
+ return app(environ, start_response)
diff --git a/venv/lib/python3.8/site-packages/werkzeug/middleware/http_proxy.py b/venv/lib/python3.8/site-packages/werkzeug/middleware/http_proxy.py
new file mode 100644
index 0000000..5e23915
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/werkzeug/middleware/http_proxy.py
@@ -0,0 +1,236 @@
+"""
+Basic HTTP Proxy
+================
+
+.. autoclass:: ProxyMiddleware
+
+:copyright: 2007 Pallets
+:license: BSD-3-Clause
+"""
+
+from __future__ import annotations
+
+import typing as t
+from http import client
+from urllib.parse import quote
+from urllib.parse import urlsplit
+
+from ..datastructures import EnvironHeaders
+from ..http import is_hop_by_hop_header
+from ..wsgi import get_input_stream
+
+if t.TYPE_CHECKING:
+ from _typeshed.wsgi import StartResponse
+ from _typeshed.wsgi import WSGIApplication
+ from _typeshed.wsgi import WSGIEnvironment
+
+
+class ProxyMiddleware:
+ """Proxy requests under a path to an external server, routing other
+ requests to the app.
+
+ This middleware can only proxy HTTP requests, as HTTP is the only
+ protocol handled by the WSGI server. Other protocols, such as
+ WebSocket requests, cannot be proxied at this layer. This should
+ only be used for development, in production a real proxy server
+ should be used.
+
+ The middleware takes a dict mapping a path prefix to a dict
+ describing the host to be proxied to::
+
+ app = ProxyMiddleware(app, {
+ "/static/": {
+ "target": "http://127.0.0.1:5001/",
+ }
+ })
+
+ Each host has the following options:
+
+ ``target``:
+ The target URL to dispatch to. This is required.
+ ``remove_prefix``:
+ Whether to remove the prefix from the URL before dispatching it
+ to the target. The default is ``False``.
+ ``host``:
+ ``"<auto>"`` (default):
+ The host header is automatically rewritten to the URL of the
+ target.
+ ``None``:
+ The host header is unmodified from the client request.
+ Any other value:
+ The host header is overwritten with the value.
+ ``headers``:
+ A dictionary of headers to be sent with the request to the
+ target. The default is ``{}``.
+ ``ssl_context``:
+ A :class:`ssl.SSLContext` defining how to verify requests if the
+ target is HTTPS. The default is ``None``.
+
+ In the example above, everything under ``"/static/"`` is proxied to
+ the server on port 5001. The host header is rewritten to the target,
+ and the ``"/static/"`` prefix is removed from the URLs.
+
+ :param app: The WSGI application to wrap.
+ :param targets: Proxy target configurations. See description above.
+ :param chunk_size: Size of chunks to read from input stream and
+ write to target.
+ :param timeout: Seconds before an operation to a target fails.
+
+ .. versionadded:: 0.14
+ """
+
+ def __init__(
+ self,
+ app: WSGIApplication,
+ targets: t.Mapping[str, dict[str, t.Any]],
+ chunk_size: int = 2 << 13,
+ timeout: int = 10,
+ ) -> None:
+ def _set_defaults(opts: dict[str, t.Any]) -> dict[str, t.Any]:
+ opts.setdefault("remove_prefix", False)
+ opts.setdefault("host", "<auto>")
+ opts.setdefault("headers", {})
+ opts.setdefault("ssl_context", None)
+ return opts
+
+ self.app = app
+ self.targets = {
+ f"/{k.strip('/')}/": _set_defaults(v) for k, v in targets.items()
+ }
+ self.chunk_size = chunk_size
+ self.timeout = timeout
+
+ def proxy_to(
+ self, opts: dict[str, t.Any], path: str, prefix: str
+ ) -> WSGIApplication:
+ target = urlsplit(opts["target"])
+ # socket can handle unicode host, but header must be ascii
+ host = target.hostname.encode("idna").decode("ascii")
+
+ def application(
+ environ: WSGIEnvironment, start_response: StartResponse
+ ) -> t.Iterable[bytes]:
+ headers = list(EnvironHeaders(environ).items())
+ headers[:] = [
+ (k, v)
+ for k, v in headers
+ if not is_hop_by_hop_header(k)
+ and k.lower() not in ("content-length", "host")
+ ]
+ headers.append(("Connection", "close"))
+
+ if opts["host"] == "<auto>":
+ headers.append(("Host", host))
+ elif opts["host"] is None:
+ headers.append(("Host", environ["HTTP_HOST"]))
+ else:
+ headers.append(("Host", opts["host"]))
+
+ headers.extend(opts["headers"].items())
+ remote_path = path
+
+ if opts["remove_prefix"]:
+ remote_path = remote_path[len(prefix) :].lstrip("/")
+ remote_path = f"{target.path.rstrip('/')}/{remote_path}"
+
+ content_length = environ.get("CONTENT_LENGTH")
+ chunked = False
+
+ if content_length not in ("", None):
+ headers.append(("Content-Length", content_length)) # type: ignore
+ elif content_length is not None:
+ headers.append(("Transfer-Encoding", "chunked"))
+ chunked = True
+
+ try:
+ if target.scheme == "http":
+ con = client.HTTPConnection(
+ host, target.port or 80, timeout=self.timeout
+ )
+ elif target.scheme == "https":
+ con = client.HTTPSConnection(
+ host,
+ target.port or 443,
+ timeout=self.timeout,
+ context=opts["ssl_context"],
+ )
+ else:
+ raise RuntimeError(
+ "Target scheme must be 'http' or 'https', got"
+ f" {target.scheme!r}."
+ )
+
+ con.connect()
+ # safe = https://url.spec.whatwg.org/#url-path-segment-string
+ # as well as percent for things that are already quoted
+ remote_url = quote(remote_path, safe="!$&'()*+,/:;=@%")
+ querystring = environ["QUERY_STRING"]
+
+ if querystring:
+ remote_url = f"{remote_url}?{querystring}"
+
+ con.putrequest(environ["REQUEST_METHOD"], remote_url, skip_host=True)
+
+ for k, v in headers:
+ if k.lower() == "connection":
+ v = "close"
+
+ con.putheader(k, v)
+
+ con.endheaders()
+ stream = get_input_stream(environ)
+
+ while True:
+ data = stream.read(self.chunk_size)
+
+ if not data:
+ break
+
+ if chunked:
+ con.send(b"%x\r\n%s\r\n" % (len(data), data))
+ else:
+ con.send(data)
+
+ resp = con.getresponse()
+ except OSError:
+ from ..exceptions import BadGateway
+
+ return BadGateway()(environ, start_response)
+
+ start_response(
+ f"{resp.status} {resp.reason}",
+ [
+ (k.title(), v)
+ for k, v in resp.getheaders()
+ if not is_hop_by_hop_header(k)
+ ],
+ )
+
+ def read() -> t.Iterator[bytes]:
+ while True:
+ try:
+ data = resp.read(self.chunk_size)
+ except OSError:
+ break
+
+ if not data:
+ break
+
+ yield data
+
+ return read()
+
+ return application
+
+ def __call__(
+ self, environ: WSGIEnvironment, start_response: StartResponse
+ ) -> t.Iterable[bytes]:
+ path = environ["PATH_INFO"]
+ app = self.app
+
+ for prefix, opts in self.targets.items():
+ if path.startswith(prefix):
+ app = self.proxy_to(opts, path, prefix)
+ break
+
+ return app(environ, start_response)
diff --git a/venv/lib/python3.8/site-packages/werkzeug/middleware/lint.py b/venv/lib/python3.8/site-packages/werkzeug/middleware/lint.py
new file mode 100644
index 0000000..de93b52
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/werkzeug/middleware/lint.py
@@ -0,0 +1,439 @@
+"""
+WSGI Protocol Linter
+====================
+
+This module provides a middleware that performs sanity checks on the
+behavior of the WSGI server and application. It checks that the
+:pep:`3333` WSGI spec is properly implemented. It also warns on some
+common HTTP errors such as non-empty responses for 304 status codes.
+
+.. autoclass:: LintMiddleware
+
+:copyright: 2007 Pallets
+:license: BSD-3-Clause
+"""
+
+from __future__ import annotations
+
+import typing as t
+from types import TracebackType
+from urllib.parse import urlparse
+from warnings import warn
+
+from ..datastructures import Headers
+from ..http import is_entity_header
+from ..wsgi import FileWrapper
+
+if t.TYPE_CHECKING:
+ from _typeshed.wsgi import StartResponse
+ from _typeshed.wsgi import WSGIApplication
+ from _typeshed.wsgi import WSGIEnvironment
+
+
+class WSGIWarning(Warning):
+ """Warning class for WSGI warnings."""
+
+
+class HTTPWarning(Warning):
+ """Warning class for HTTP warnings."""
+
+
+def check_type(context: str, obj: object, need: type = str) -> None:
+ if type(obj) is not need:
+ warn(
+ f"{context!r} requires {need.__name__!r}, got {type(obj).__name__!r}.",
+ WSGIWarning,
+ stacklevel=3,
+ )
+
+
+class InputStream:
+ def __init__(self, stream: t.IO[bytes]) -> None:
+ self._stream = stream
+
+ def read(self, *args: t.Any) -> bytes:
+ if len(args) == 0:
+ warn(
+ "WSGI does not guarantee an EOF marker on the input stream, thus making"
+ " calls to 'wsgi.input.read()' unsafe. Conforming servers may never"
+ " return from this call.",
+ WSGIWarning,
+ stacklevel=2,
+ )
+ elif len(args) != 1:
+ warn(
+ "Too many parameters passed to 'wsgi.input.read()'.",
+ WSGIWarning,
+ stacklevel=2,
+ )
+ return self._stream.read(*args)
+
+ def readline(self, *args: t.Any) -> bytes:
+ if len(args) == 0:
+ warn(
+ "Calls to 'wsgi.input.readline()' without arguments are unsafe. Use"
+ " 'wsgi.input.read()' instead.",
+ WSGIWarning,
+ stacklevel=2,
+ )
+ elif len(args) == 1:
+ warn(
+ "'wsgi.input.readline()' was called with a size hint. WSGI does not"
+ " support this, although it's available on all major servers.",
+ WSGIWarning,
+ stacklevel=2,
+ )
+ else:
+ raise TypeError("Too many arguments passed to 'wsgi.input.readline()'.")
+ return self._stream.readline(*args)
+
+ def __iter__(self) -> t.Iterator[bytes]:
+ try:
+ return iter(self._stream)
+ except TypeError:
+ warn("'wsgi.input' is not iterable.", WSGIWarning, stacklevel=2)
+ return iter(())
+
+ def close(self) -> None:
+ warn("The application closed the input stream!", WSGIWarning, stacklevel=2)
+ self._stream.close()
+
+
+class ErrorStream:
+ def __init__(self, stream: t.IO[str]) -> None:
+ self._stream = stream
+
+ def write(self, s: str) -> None:
+ check_type("wsgi.error.write()", s, str)
+ self._stream.write(s)
+
+ def flush(self) -> None:
+ self._stream.flush()
+
+ def writelines(self, seq: t.Iterable[str]) -> None:
+ for line in seq:
+ self.write(line)
+
+ def close(self) -> None:
+ warn("The application closed the error stream!", WSGIWarning, stacklevel=2)
+ self._stream.close()
+
+
+class GuardedWrite:
+ def __init__(self, write: t.Callable[[bytes], object], chunks: list[int]) -> None:
+ self._write = write
+ self._chunks = chunks
+
+ def __call__(self, s: bytes) -> None:
+ check_type("write()", s, bytes)
+ self._write(s)
+ self._chunks.append(len(s))
+
+
+class GuardedIterator:
+ def __init__(
+ self,
+ iterator: t.Iterable[bytes],
+ headers_set: tuple[int, Headers],
+ chunks: list[int],
+ ) -> None:
+ self._iterator = iterator
+ self._next = iter(iterator).__next__
+ self.closed = False
+ self.headers_set = headers_set
+ self.chunks = chunks
+
+ def __iter__(self) -> GuardedIterator:
+ return self
+
+ def __next__(self) -> bytes:
+ if self.closed:
+ warn("Iterated over closed 'app_iter'.", WSGIWarning, stacklevel=2)
+
+ rv = self._next()
+
+ if not self.headers_set:
+ warn(
+ "The application returned before it started the response.",
+ WSGIWarning,
+ stacklevel=2,
+ )
+
+ check_type("application iterator items", rv, bytes)
+ self.chunks.append(len(rv))
+ return rv
+
+ def close(self) -> None:
+ self.closed = True
+
+ if hasattr(self._iterator, "close"):
+ self._iterator.close()
+
+ if self.headers_set:
+ status_code, headers = self.headers_set
+ bytes_sent = sum(self.chunks)
+ content_length = headers.get("content-length", type=int)
+
+ if status_code == 304:
+ for key, _value in headers:
+ key = key.lower()
+ if key not in ("expires", "content-location") and is_entity_header(
+ key
+ ):
+ warn(
+ f"Entity header {key!r} found in 304 response.",
+ HTTPWarning,
+ stacklevel=2,
+ )
+ if bytes_sent:
+ warn(
+ "304 responses must not have a body.",
+ HTTPWarning,
+ stacklevel=2,
+ )
+ elif 100 <= status_code < 200 or status_code == 204:
+ if content_length != 0:
+ warn(
+ f"{status_code} responses must have an empty content length.",
+ HTTPWarning,
+ stacklevel=2,
+ )
+ if bytes_sent:
+ warn(
+ f"{status_code} responses must not have a body.",
+ HTTPWarning,
+ stacklevel=2,
+ )
+ elif content_length is not None and content_length != bytes_sent:
+ warn(
+ "Content-Length and the number of bytes sent to the"
+ " client do not match.",
+ WSGIWarning,
+ stacklevel=2,
+ )
+
+ def __del__(self) -> None:
+ if not self.closed:
+ try:
+ warn(
+ "Iterator was garbage collected before it was closed.",
+ WSGIWarning,
+ stacklevel=2,
+ )
+ except Exception:
+ pass
+
+
+class LintMiddleware:
+ """Warns about common errors in the WSGI and HTTP behavior of the
+ server and wrapped application. Some of the issues it checks are:
+
+ - invalid status codes
+ - non-bytes sent to the WSGI server
+ - strings returned from the WSGI application
+ - non-empty conditional responses
+ - unquoted etags
+ - relative URLs in the Location header
+ - unsafe calls to wsgi.input
+ - unclosed iterators
+
+ Error information is emitted using the :mod:`warnings` module.
+
+ :param app: The WSGI application to wrap.
+
+ .. code-block:: python
+
+ from werkzeug.middleware.lint import LintMiddleware
+ app = LintMiddleware(app)
+ """
+
+ def __init__(self, app: WSGIApplication) -> None:
+ self.app = app
+
+ def check_environ(self, environ: WSGIEnvironment) -> None:
+ if type(environ) is not dict: # noqa: E721
+ warn(
+ "WSGI environment is not a standard Python dict.",
+ WSGIWarning,
+ stacklevel=4,
+ )
+ for key in (
+ "REQUEST_METHOD",
+ "SERVER_NAME",
+ "SERVER_PORT",
+ "wsgi.version",
+ "wsgi.input",
+ "wsgi.errors",
+ "wsgi.multithread",
+ "wsgi.multiprocess",
+ "wsgi.run_once",
+ ):
+ if key not in environ:
+ warn(
+ f"Required environment key {key!r} not found",
+ WSGIWarning,
+ stacklevel=3,
+ )
+ if environ["wsgi.version"] != (1, 0):
+ warn("Environ is not a WSGI 1.0 environ.", WSGIWarning, stacklevel=3)
+
+ script_name = environ.get("SCRIPT_NAME", "")
+ path_info = environ.get("PATH_INFO", "")
+
+ if script_name and script_name[0] != "/":
+ warn(
+ f"'SCRIPT_NAME' does not start with a slash: {script_name!r}",
+ WSGIWarning,
+ stacklevel=3,
+ )
+
+ if path_info and path_info[0] != "/":
+ warn(
+ f"'PATH_INFO' does not start with a slash: {path_info!r}",
+ WSGIWarning,
+ stacklevel=3,
+ )
+
+ def check_start_response(
+ self,
+ status: str,
+ headers: list[tuple[str, str]],
+ exc_info: None | (tuple[type[BaseException], BaseException, TracebackType]),
+ ) -> tuple[int, Headers]:
+ check_type("status", status, str)
+ status_code_str = status.split(None, 1)[0]
+
+ if len(status_code_str) != 3 or not status_code_str.isdecimal():
+ warn("Status code must be three digits.", WSGIWarning, stacklevel=3)
+
+ if len(status) < 4 or status[3] != " ":
+ warn(
+ f"Invalid value for status {status!r}. Valid status strings are three"
+ " digits, a space and a status explanation.",
+ WSGIWarning,
+ stacklevel=3,
+ )
+
+ status_code = int(status_code_str)
+
+ if status_code < 100:
+ warn("Status code < 100 detected.", WSGIWarning, stacklevel=3)
+
+ if type(headers) is not list: # noqa: E721
+ warn("Header list is not a list.", WSGIWarning, stacklevel=3)
+
+ for item in headers:
+ if type(item) is not tuple or len(item) != 2:
+ warn("Header items must be 2-item tuples.", WSGIWarning, stacklevel=3)
+ name, value = item
+ if type(name) is not str or type(value) is not str: # noqa: E721
+ warn(
+ "Header keys and values must be strings.", WSGIWarning, stacklevel=3
+ )
+ if name.lower() == "status":
+ warn(
+ "The status header is not supported due to"
+ " conflicts with the CGI spec.",
+ WSGIWarning,
+ stacklevel=3,
+ )
+
+ if exc_info is not None and not isinstance(exc_info, tuple):
+ warn("Invalid value for exc_info.", WSGIWarning, stacklevel=3)
+
+ headers_obj = Headers(headers)
+ self.check_headers(headers_obj)
+
+ return status_code, headers_obj
+
+ def check_headers(self, headers: Headers) -> None:
+ etag = headers.get("etag")
+
+ if etag is not None:
+ if etag.startswith(("W/", "w/")):
+ if etag.startswith("w/"):
+ warn(
+ "Weak etag indicator should be upper case.",
+ HTTPWarning,
+ stacklevel=4,
+ )
+
+ etag = etag[2:]
+
+ if not (etag[:1] == etag[-1:] == '"'):
+ warn("Unquoted etag emitted.", HTTPWarning, stacklevel=4)
+
+ location = headers.get("location")
+
+ if location is not None:
+ if not urlparse(location).netloc:
+ warn(
+ "Absolute URLs required for location header.",
+ HTTPWarning,
+ stacklevel=4,
+ )
+
+ def check_iterator(self, app_iter: t.Iterable[bytes]) -> None:
+ if isinstance(app_iter, str):
+ warn(
+ "The application returned a string. The response will send one"
+ " character at a time to the client, which will kill performance."
+ " Return a list or iterable instead.",
+ WSGIWarning,
+ stacklevel=3,
+ )
+
+ def __call__(self, *args: t.Any, **kwargs: t.Any) -> t.Iterable[bytes]:
+ if len(args) != 2:
+ warn("A WSGI app takes two arguments.", WSGIWarning, stacklevel=2)
+
+ if kwargs:
+ warn(
+ "A WSGI app does not take keyword arguments.", WSGIWarning, stacklevel=2
+ )
+
+ environ: WSGIEnvironment = args[0]
+ start_response: StartResponse = args[1]
+
+ self.check_environ(environ)
+ environ["wsgi.input"] = InputStream(environ["wsgi.input"])
+ environ["wsgi.errors"] = ErrorStream(environ["wsgi.errors"])
+
+ # Hook our own file wrapper in so that applications will always
+ # iterate to the end and we can check the content length.
+ environ["wsgi.file_wrapper"] = FileWrapper
+
+ headers_set: list[t.Any] = []
+ chunks: list[int] = []
+
+ def checking_start_response(
+ *args: t.Any, **kwargs: t.Any
+ ) -> t.Callable[[bytes], None]:
+ if len(args) not in {2, 3}:
+ warn(
+ f"Invalid number of arguments: {len(args)}, expected 2 or 3.",
+ WSGIWarning,
+ stacklevel=2,
+ )
+
+ if kwargs:
+ warn(
+ "'start_response' does not take keyword arguments.",
+ WSGIWarning,
+ stacklevel=2,
+ )
+
+ status: str = args[0]
+ headers: list[tuple[str, str]] = args[1]
+ exc_info: (
+ None | (tuple[type[BaseException], BaseException, TracebackType])
+ ) = args[2] if len(args) == 3 else None
+
+ headers_set[:] = self.check_start_response(status, headers, exc_info)
+ return GuardedWrite(start_response(status, headers, exc_info), chunks)
+
+ app_iter = self.app(environ, t.cast("StartResponse", checking_start_response))
+ self.check_iterator(app_iter)
+ return GuardedIterator(
+ app_iter, t.cast(t.Tuple[int, Headers], headers_set), chunks
+ )
diff --git a/venv/lib/python3.8/site-packages/werkzeug/middleware/profiler.py b/venv/lib/python3.8/site-packages/werkzeug/middleware/profiler.py
new file mode 100644
index 0000000..112b877
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/werkzeug/middleware/profiler.py
@@ -0,0 +1,155 @@
+"""
+Application Profiler
+====================
+
+This module provides a middleware that profiles each request with the
+:mod:`cProfile` module. This can help identify bottlenecks in your code
+that may be slowing down your application.
+
+.. autoclass:: ProfilerMiddleware
+
+:copyright: 2007 Pallets
+:license: BSD-3-Clause
+"""
+
+from __future__ import annotations
+
+import os.path
+import sys
+import time
+import typing as t
+from pstats import Stats
+
+try:
+ from cProfile import Profile
+except ImportError:
+ from profile import Profile # type: ignore
+
+if t.TYPE_CHECKING:
+ from _typeshed.wsgi import StartResponse
+ from _typeshed.wsgi import WSGIApplication
+ from _typeshed.wsgi import WSGIEnvironment
+
+
+class ProfilerMiddleware:
+ """Wrap a WSGI application and profile the execution of each
+ request. Responses are buffered so that timings are more exact.
+
+ If ``stream`` is given, :class:`pstats.Stats` are written to it
+ after each request. If ``profile_dir`` is given, :mod:`cProfile`
+ data files are saved to that directory, one file per request.
+
+ The filename can be customized by passing ``filename_format``. If
+ it is a string, it will be formatted using :meth:`str.format` with
+ the following fields available:
+
+ - ``{method}`` - The request method; GET, POST, etc.
+ - ``{path}`` - The request path or 'root' should one not exist.
+ - ``{elapsed}`` - The elapsed time of the request in milliseconds.
+ - ``{time}`` - The time of the request.
+
+ If it is a callable, it will be called with the WSGI ``environ`` and
+ be expected to return a filename string. The ``environ`` dictionary
+ will also have the ``"werkzeug.profiler"`` key populated with a
+ dictionary containing the following fields (more may be added in the
+ future):
+ - ``{elapsed}`` - The elapsed time of the request in milliseconds.
+ - ``{time}`` - The time of the request.
+
+ :param app: The WSGI application to wrap.
+ :param stream: Write stats to this stream. Disable with ``None``.
+ :param sort_by: A tuple of columns to sort stats by. See
+ :meth:`pstats.Stats.sort_stats`.
+ :param restrictions: A tuple of restrictions to filter stats by. See
+ :meth:`pstats.Stats.print_stats`.
+ :param profile_dir: Save profile data files to this directory.
+ :param filename_format: Format string for profile data file names,
+ or a callable returning a name. See explanation above.
+
+ .. code-block:: python
+
+ from werkzeug.middleware.profiler import ProfilerMiddleware
+ app = ProfilerMiddleware(app)
+
+ .. versionchanged:: 3.0
+ Added the ``"werkzeug.profiler"`` key to the ``filename_format(environ)``
+ parameter with the ``elapsed`` and ``time`` fields.
+
+ .. versionchanged:: 0.15
+ Stats are written even if ``profile_dir`` is given, and can be
+ disable by passing ``stream=None``.
+
+ .. versionadded:: 0.15
+ Added ``filename_format``.
+
+ .. versionadded:: 0.9
+ Added ``restrictions`` and ``profile_dir``.
+ """
+
+ def __init__(
+ self,
+ app: WSGIApplication,
+ stream: t.IO[str] | None = sys.stdout,
+ sort_by: t.Iterable[str] = ("time", "calls"),
+ restrictions: t.Iterable[str | int | float] = (),
+ profile_dir: str | None = None,
+ filename_format: str = "{method}.{path}.{elapsed:.0f}ms.{time:.0f}.prof",
+ ) -> None:
+ self._app = app
+ self._stream = stream
+ self._sort_by = sort_by
+ self._restrictions = restrictions
+ self._profile_dir = profile_dir
+ self._filename_format = filename_format
+
+ def __call__(
+ self, environ: WSGIEnvironment, start_response: StartResponse
+ ) -> t.Iterable[bytes]:
+ response_body: list[bytes] = []
+
+ def catching_start_response(status, headers, exc_info=None): # type: ignore
+ start_response(status, headers, exc_info)
+ return response_body.append
+
+ def runapp() -> None:
+ app_iter = self._app(
+ environ, t.cast("StartResponse", catching_start_response)
+ )
+ response_body.extend(app_iter)
+
+ if hasattr(app_iter, "close"):
+ app_iter.close()
+
+ profile = Profile()
+ start = time.time()
+ profile.runcall(runapp)
+ body = b"".join(response_body)
+ elapsed = time.time() - start
+
+ if self._profile_dir is not None:
+ if callable(self._filename_format):
+ environ["werkzeug.profiler"] = {
+ "elapsed": elapsed * 1000.0,
+ "time": time.time(),
+ }
+ filename = self._filename_format(environ)
+ else:
+ filename = self._filename_format.format(
+ method=environ["REQUEST_METHOD"],
+ path=environ["PATH_INFO"].strip("/").replace("/", ".") or "root",
+ elapsed=elapsed * 1000.0,
+ time=time.time(),
+ )
+ filename = os.path.join(self._profile_dir, filename)
+ profile.dump_stats(filename)
+
+ if self._stream is not None:
+ stats = Stats(profile, stream=self._stream)
+ stats.sort_stats(*self._sort_by)
+ print("-" * 80, file=self._stream)
+ path_info = environ.get("PATH_INFO", "")
+ print(f"PATH: {path_info!r}", file=self._stream)
+ stats.print_stats(*self._restrictions)
+ print(f"{'-' * 80}\n", file=self._stream)
+
+ return [body]
diff --git a/venv/lib/python3.8/site-packages/werkzeug/middleware/proxy_fix.py b/venv/lib/python3.8/site-packages/werkzeug/middleware/proxy_fix.py
new file mode 100644
index 0000000..cbf4e0b
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/werkzeug/middleware/proxy_fix.py
@@ -0,0 +1,183 @@
+"""
+X-Forwarded-For Proxy Fix
+=========================
+
+This module provides a middleware that adjusts the WSGI environ based on
+``X-Forwarded-`` headers that proxies in front of an application may
+set.
+
+When an application is running behind a proxy server, WSGI may see the
+request as coming from that server rather than the real client. Proxies
+set various headers to track where the request actually came from.
+
+This middleware should only be used if the application is actually
+behind such a proxy, and should be configured with the number of proxies
+that are chained in front of it. Not all proxies set all the headers.
+Since incoming headers can be faked, you must set how many proxies are
+setting each header so the middleware knows what to trust.
+
+.. autoclass:: ProxyFix
+
+:copyright: 2007 Pallets
+:license: BSD-3-Clause
+"""
+
+from __future__ import annotations
+
+import typing as t
+
+from ..http import parse_list_header
+
+if t.TYPE_CHECKING:
+ from _typeshed.wsgi import StartResponse
+ from _typeshed.wsgi import WSGIApplication
+ from _typeshed.wsgi import WSGIEnvironment
+
+
+class ProxyFix:
+ """Adjust the WSGI environ based on ``X-Forwarded-`` that proxies in
+ front of the application may set.
+
+ - ``X-Forwarded-For`` sets ``REMOTE_ADDR``.
+ - ``X-Forwarded-Proto`` sets ``wsgi.url_scheme``.
+ - ``X-Forwarded-Host`` sets ``HTTP_HOST``, ``SERVER_NAME``, and
+ ``SERVER_PORT``.
+ - ``X-Forwarded-Port`` sets ``HTTP_HOST`` and ``SERVER_PORT``.
+ - ``X-Forwarded-Prefix`` sets ``SCRIPT_NAME``.
+
+ You must tell the middleware how many proxies set each header so it
+ knows what values to trust. It is a security issue to trust values
+ that came from the client rather than a proxy.
+
+ The original values of the headers are stored in the WSGI
+ environ as ``werkzeug.proxy_fix.orig``, a dict.
+
+ :param app: The WSGI application to wrap.
+ :param x_for: Number of values to trust for ``X-Forwarded-For``.
+ :param x_proto: Number of values to trust for ``X-Forwarded-Proto``.
+ :param x_host: Number of values to trust for ``X-Forwarded-Host``.
+ :param x_port: Number of values to trust for ``X-Forwarded-Port``.
+ :param x_prefix: Number of values to trust for
+ ``X-Forwarded-Prefix``.
+
+ .. code-block:: python
+
+ from werkzeug.middleware.proxy_fix import ProxyFix
+ # App is behind one proxy that sets the -For and -Host headers.
+ app = ProxyFix(app, x_for=1, x_host=1)
+
+ .. versionchanged:: 1.0
+ The ``num_proxies`` argument and attribute; the ``get_remote_addr`` method; and
+ the environ keys ``orig_remote_addr``, ``orig_wsgi_url_scheme``, and
+ ``orig_http_host`` were removed.
+
+ .. versionchanged:: 0.15
+ All headers support multiple values. Each header is configured with a separate
+ number of trusted proxies.
+
+ .. versionchanged:: 0.15
+ Original WSGI environ values are stored in the ``werkzeug.proxy_fix.orig`` dict.
+
+ .. versionchanged:: 0.15
+ Support ``X-Forwarded-Port`` and ``X-Forwarded-Prefix``.
+
+ .. versionchanged:: 0.15
+ ``X-Forwarded-Host`` and ``X-Forwarded-Port`` modify
+ ``SERVER_NAME`` and ``SERVER_PORT``.
+ """
+
+ def __init__(
+ self,
+ app: WSGIApplication,
+ x_for: int = 1,
+ x_proto: int = 1,
+ x_host: int = 0,
+ x_port: int = 0,
+ x_prefix: int = 0,
+ ) -> None:
+ self.app = app
+ self.x_for = x_for
+ self.x_proto = x_proto
+ self.x_host = x_host
+ self.x_port = x_port
+ self.x_prefix = x_prefix
+
+ def _get_real_value(self, trusted: int, value: str | None) -> str | None:
+ """Get the real value from a list header based on the configured
+ number of trusted proxies.
+
+ :param trusted: Number of values to trust in the header.
+ :param value: Comma separated list header value to parse.
+ :return: The real value, or ``None`` if there are fewer values
+ than the number of trusted proxies.
+
+ .. versionchanged:: 1.0
+ Renamed from ``_get_trusted_comma``.
+
+ .. versionadded:: 0.15
+ """
+ if not (trusted and value):
+ return None
+ values = parse_list_header(value)
+ if len(values) >= trusted:
+ return values[-trusted]
+ return None
+
+ def __call__(
+ self, environ: WSGIEnvironment, start_response: StartResponse
+ ) -> t.Iterable[bytes]:
+ """Modify the WSGI environ based on the various ``Forwarded``
+ headers before calling the wrapped application. Store the
+ original environ values in ``werkzeug.proxy_fix.orig_{key}``.
+ """
+ environ_get = environ.get
+ orig_remote_addr = environ_get("REMOTE_ADDR")
+ orig_wsgi_url_scheme = environ_get("wsgi.url_scheme")
+ orig_http_host = environ_get("HTTP_HOST")
+ environ.update(
+ {
+ "werkzeug.proxy_fix.orig": {
+ "REMOTE_ADDR": orig_remote_addr,
+ "wsgi.url_scheme": orig_wsgi_url_scheme,
+ "HTTP_HOST": orig_http_host,
+ "SERVER_NAME": environ_get("SERVER_NAME"),
+ "SERVER_PORT": environ_get("SERVER_PORT"),
+ "SCRIPT_NAME": environ_get("SCRIPT_NAME"),
+ }
+ }
+ )
+
+ x_for = self._get_real_value(self.x_for, environ_get("HTTP_X_FORWARDED_FOR"))
+ if x_for:
+ environ["REMOTE_ADDR"] = x_for
+
+ x_proto = self._get_real_value(
+ self.x_proto, environ_get("HTTP_X_FORWARDED_PROTO")
+ )
+ if x_proto:
+ environ["wsgi.url_scheme"] = x_proto
+
+ x_host = self._get_real_value(self.x_host, environ_get("HTTP_X_FORWARDED_HOST"))
+ if x_host:
+ environ["HTTP_HOST"] = environ["SERVER_NAME"] = x_host
+ # "]" to check for IPv6 address without port
+ if ":" in x_host and not x_host.endswith("]"):
+ environ["SERVER_NAME"], environ["SERVER_PORT"] = x_host.rsplit(":", 1)
+
+ x_port = self._get_real_value(self.x_port, environ_get("HTTP_X_FORWARDED_PORT"))
+ if x_port:
+ host = environ.get("HTTP_HOST")
+ if host:
+ # "]" to check for IPv6 address without port
+ if ":" in host and not host.endswith("]"):
+ host = host.rsplit(":", 1)[0]
+ environ["HTTP_HOST"] = f"{host}:{x_port}"
+ environ["SERVER_PORT"] = x_port
+
+ x_prefix = self._get_real_value(
+ self.x_prefix, environ_get("HTTP_X_FORWARDED_PREFIX")
+ )
+ if x_prefix:
+ environ["SCRIPT_NAME"] = x_prefix
+
+ return self.app(environ, start_response)
diff --git a/venv/lib/python3.8/site-packages/werkzeug/middleware/shared_data.py b/venv/lib/python3.8/site-packages/werkzeug/middleware/shared_data.py
new file mode 100644
index 0000000..0f467f2
--- /dev/null
+++ b/venv/lib/python3.8/site-packages/werkzeug/middleware/shared_data.py
@@ -0,0 +1,283 @@
+"""
+Serve Shared Static Files
+=========================
+
+.. autoclass:: SharedDataMiddleware
+ :members: is_allowed
+
+:copyright: 2007 Pallets
+:license: BSD-3-Clause
+"""
+
+from __future__ import annotations
+
+import collections.abc as cabc
+import importlib.util
+import mimetypes
+import os
+import posixpath
+import typing as t
+from datetime import datetime
+from datetime import timezone
+from io import BytesIO
+from time import time
+from zlib import adler32
+
+from ..http import http_date
+from ..http import is_resource_modified
+from ..security import safe_join
+from ..utils import get_content_type
+from ..wsgi import get_path_info
+from ..wsgi import wrap_file
+
+_TOpener = t.Callable[[], t.Tuple[t.IO[bytes], datetime, int]]
+_TLoader = t.Callable[[t.Optional[str]], t.Tuple[t.Optional[str], t.Optional[_TOpener]]]
+
+if t.TYPE_CHECKING:
+ from _typeshed.wsgi import StartResponse
+ from _typeshed.wsgi import WSGIApplication
+ from _typeshed.wsgi import WSGIEnvironment
+
+
+class SharedDataMiddleware:
+ """A WSGI middleware which provides static content for development
+ environments or simple server setups. Its usage is quite simple::
+
+ import os
+ from werkzeug.middleware.shared_data import SharedDataMiddleware
+
+ app = SharedDataMiddleware(app, {
+ '/shared': os.path.join(os.path.dirname(__file__), 'shared')
+ })
+
+ The contents of the folder ``./shared`` will now be available on
+ ``http://example.com/shared/``. This is pretty useful during development
+ because a standalone media server is not required. Files can also be
+ mounted on the root folder and still continue to use the application because
+ the shared data middleware forwards all unhandled requests to the
+ application, even if the requests are below one of the shared folders.
+
+ If `pkg_resources` is available you can also tell the middleware to serve
+ files from package data::
+
+ app = SharedDataMiddleware(app, {
+ '/static': ('myapplication', 'static')
+ })
+
+ This will then serve the ``static`` folder in the `myapplication`
+ Python package.
+
+ The optional `disallow` parameter can be a list of :func:`~fnmatch.fnmatch`
+ rules for files that are not accessible from the web. If `cache` is set to
+ `False` no caching headers are sent.
+
+ Currently the middleware does not support non-ASCII filenames. If the
+ encoding on the file system happens to match the encoding of the URI it may
+ work but this could also be by accident. We strongly suggest using ASCII
+ only file names for static files.
+
+ The middleware will guess the mimetype using the Python `mimetype`
+ module. If it's unable to figure out the charset it will fall back
+ to `fallback_mimetype`.
+
+ :param app: the application to wrap. If you don't want to wrap an
+ application you can pass it :exc:`NotFound`.
+ :param exports: a list or dict of exported files and folders.
+ :param disallow: a list of :func:`~fnmatch.fnmatch` rules.
+ :param cache: enable or disable caching headers.
+ :param cache_timeout: the cache timeout in seconds for the headers.
+ :param fallback_mimetype: The fallback mimetype for unknown files.
+
+ .. versionchanged:: 1.0
+ The default ``fallback_mimetype`` is
+ ``application/octet-stream``. If a filename looks like a text
+ mimetype, the ``utf-8`` charset is added to it.
+
+ .. versionadded:: 0.6
+ Added ``fallback_mimetype``.
+
+ .. versionchanged:: 0.5
+ Added ``cache_timeout``.
+ """
+
+ def __init__(
+ self,
+ app: WSGIApplication,
+ exports: (
+ cabc.Mapping[str, str | tuple[str, str]]
+ | t.Iterable[tuple[str, str | tuple[str, str]]]
+ ),
+ disallow: None = None,
+ cache: bool = True,
+ cache_timeout: int = 60 * 60 * 12,
+ fallback_mimetype: str = "application/octet-stream",
+ ) -> None:
+ self.app = app
+ self.exports: list[tuple[str, _TLoader]] = []
+ self.cache = cache
+ self.cache_timeout = cache_timeout
+
+ if isinstance(exports, cabc.Mapping):
+ exports = exports.items()
+
+ for key, value in exports:
+ if isinstance(value, tuple):
+ loader = self.get_package_loader(*value)
+ elif isinstance(value, str):
+ if os.path.isfile(value):
+ loader = self.get_file_loader(value)
+ else:
+ loader = self.get_directory_loader(value)
+ else:
+ raise TypeError(f"unknown def {value!r}")
+
+ self.exports.append((key, loader))
+
+ if disallow is not None:
+ from fnmatch import fnmatch
+
+ self.is_allowed = lambda x: not fnmatch(x, disallow)
+
+ self.fallback_mimetype = fallback_mimetype
+
+ def is_allowed(self, filename: str) -> bool:
+ """Subclasses can override this method to disallow the access to
+ certain files. However by providing `disallow` in the constructor
+ this method is overwritten.
+ """
+ return True
+
+ def _opener(self, filename: str) -> _TOpener:
+ return lambda: (
+ open(filename, "rb"),
+ datetime.fromtimestamp(os.path.getmtime(filename), tz=timezone.utc),
+ int(os.path.getsize(filename)),
+ )
+
+ def get_file_loader(self, filename: str) -> _TLoader:
+ return lambda x: (os.path.basename(filename), self._opener(filename))
+
+ def get_package_loader(self, package: str, package_path: str) -> _TLoader:
+ load_time = datetime.now(timezone.utc)
+ spec = importlib.util.find_spec(package)
+ reader = spec.loader.get_resource_reader(package) # type: ignore[union-attr]
+
+ def loader(
+ path: str | None,
+ ) -> tuple[str | None, _TOpener | None]:
+ if path is None:
+ return None, None
+
+ path = safe_join(package_path, path)
+
+ if path is None:
+ return None, None
+
+ basename = posixpath.basename(path)
+
+ try:
+ resource = reader.open_resource(path)
+ except OSError:
+ return None, None
+
+ if isinstance(resource, BytesIO):
+ return (
+ basename,
+ lambda: (resource, load_time, len(resource.getvalue())),
+ )
+
+ return (
+ basename,
+ lambda: (
+ resource,
+ datetime.fromtimestamp(
+ os.path.getmtime(resource.name), tz=timezone.utc
+ ),
+ os.path.getsize(resource.name),
+ ),
+ )
+
+ return loader
+
+ def get_directory_loader(self, directory: str) -> _TLoader:
+ def loader(
+ path: str | None,
+ ) -> tuple[str | None, _TOpener | None]:
+ if path is not None:
+ path = safe_join(directory, path)
+
+ if path is None:
+ return None, None
+ else:
+ path = directory
+
+ if os.path.isfile(path):
+ return os.path.basename(path), self._opener(path)
+
+ return None, None
+
+ return loader
+
+ def generate_etag(self, mtime: datetime, file_size: int, real_filename: str) -> str:
+ fn_str = os.fsencode(real_filename)
+ timestamp = mtime.timestamp()
+ checksum = adler32(fn_str) & 0xFFFFFFFF
+ return f"wzsdm-{timestamp}-{file_size}-{checksum}"
+
+ def __call__(
+ self, environ: WSGIEnvironment, start_response: StartResponse
+ ) -> t.Iterable[bytes]:
+ path = get_path_info(environ)
+ file_loader = None
+
+ for search_path, loader in self.exports:
+ if search_path == path:
+ real_filename, file_loader = loader(None)
+
+ if file_loader is not None:
+ break
+
+ if not search_path.endswith("/"):
+ search_path += "/"
+
+ if path.startswith(search_path):
+ real_filename, file_loader = loader(path[len(search_path) :])
+
+ if file_loader is not None:
+ break
+
+ if file_loader is None or not self.is_allowed(real_filename): # type: ignore
+ return self.app(environ, start_response)
+
+ guessed_type = mimetypes.guess_type(real_filename) # type: ignore
+ mime_type = get_content_type(guessed_type[0] or self.fallback_mimetype, "utf-8")
+ f, mtime, file_size = file_loader()
+
+ headers = [("Date", http_date())]
+
+ if self.cache:
+ timeout = self.cache_timeout
+ etag = self.generate_etag(mtime, file_size, real_filename) # type: ignore
+ headers += [
+ ("Etag", f'"{etag}"'),
+ ("Cache-Control", f"max-age={timeout}, public"),
+ ]
+
+ if not is_resource_modified(environ, etag, last_modified=mtime):
+ f.close()
+ start_response("304 Not Modified", headers)
+ return []
+
+ headers.append(("Expires", http_date(time() + timeout)))
+ else:
+ headers.append(("Cache-Control", "public"))
+
+ headers.extend(
+ (
+ ("Content-Type", mime_type),
+ ("Content-Length", str(file_size)),
+ ("Last-Modified", http_date(mtime)),
+ )
+ )
+ start_response("200 OK", headers)
+ return wrap_file(environ, f)