import io from tempfile import SpooledTemporaryFile from typing import BinaryIO import pytest from starlette.datastructures import ( URL, CommaSeparatedStrings, FormData, Headers, MultiDict, MutableHeaders, QueryParams, UploadFile, ) def test_url() -> None: u = URL("https://example.org:123/path/to/somewhere?abc=123#anchor") assert u.scheme == "https" assert u.hostname == "example.org" assert u.port == 123 assert u.netloc == "example.org:123" assert u.username is None assert u.password is None assert u.path == "/path/to/somewhere" assert u.query == "abc=123" assert u.fragment == "anchor" new = u.replace(scheme="http") assert new == "http://example.org:123/path/to/somewhere?abc=123#anchor" assert new.scheme == "http" new = u.replace(port=None) assert new == "https://example.org/path/to/somewhere?abc=123#anchor" assert new.port is None new = u.replace(hostname="example.com") assert new == "https://example.com:123/path/to/somewhere?abc=123#anchor" assert new.hostname == "example.com" ipv6_url = URL("https://[fe::2]:12345") new = ipv6_url.replace(port=8080) assert new == "https://[fe::2]:8080" new = ipv6_url.replace(username="username", password="password") assert new == "https://username:password@[fe::2]:12345" assert new.netloc == "username:password@[fe::2]:12345" ipv6_url = URL("https://[fe::2]") new = ipv6_url.replace(port=123) assert new == "https://[fe::2]:123" url = URL("http://u:p@host/") assert url.replace(hostname="bar") == URL("http://u:p@bar/") url = URL("http://u:p@host:80") assert url.replace(port=88) == URL("http://u:p@host:88") def test_url_query_params() -> None: u = URL("https://example.org/path/?page=3") assert u.query == "page=3" u = u.include_query_params(page=4) assert str(u) == "https://example.org/path/?page=4" u = u.include_query_params(search="testing") assert str(u) == "https://example.org/path/?page=4&search=testing" u = u.replace_query_params(order="name") assert str(u) == "https://example.org/path/?order=name" u = u.remove_query_params("order") assert str(u) == "https://example.org/path/" def test_hidden_password() -> None: u = URL("https://example.org/path/to/somewhere") assert repr(u) == "URL('https://example.org/path/to/somewhere')" u = URL("https://username@example.org/path/to/somewhere") assert repr(u) == "URL('https://username@example.org/path/to/somewhere')" u = URL("https://username:password@example.org/path/to/somewhere") assert repr(u) == "URL('https://username:********@example.org/path/to/somewhere')" def test_csv() -> None: csv = CommaSeparatedStrings('"localhost", "127.0.0.1", 0.0.0.0') assert list(csv) == ["localhost", "127.0.0.1", "0.0.0.0"] assert repr(csv) == "CommaSeparatedStrings(['localhost', '127.0.0.1', '0.0.0.0'])" assert str(csv) == "'localhost', '127.0.0.1', '0.0.0.0'" assert csv[0] == "localhost" assert len(csv) == 3 csv = CommaSeparatedStrings("'localhost', '127.0.0.1', 0.0.0.0") assert list(csv) == ["localhost", "127.0.0.1", "0.0.0.0"] assert repr(csv) == "CommaSeparatedStrings(['localhost', '127.0.0.1', '0.0.0.0'])" assert str(csv) == "'localhost', '127.0.0.1', '0.0.0.0'" csv = CommaSeparatedStrings("localhost, 127.0.0.1, 0.0.0.0") assert list(csv) == ["localhost", "127.0.0.1", "0.0.0.0"] assert repr(csv) == "CommaSeparatedStrings(['localhost', '127.0.0.1', '0.0.0.0'])" assert str(csv) == "'localhost', '127.0.0.1', '0.0.0.0'" csv = CommaSeparatedStrings(["localhost", "127.0.0.1", "0.0.0.0"]) assert list(csv) == ["localhost", "127.0.0.1", "0.0.0.0"] assert repr(csv) == "CommaSeparatedStrings(['localhost', '127.0.0.1', '0.0.0.0'])" assert str(csv) == "'localhost', '127.0.0.1', '0.0.0.0'" def test_url_from_scope() -> None: u = URL( scope={"path": "/path/to/somewhere", "query_string": b"abc=123", "headers": []} ) assert u == "/path/to/somewhere?abc=123" assert repr(u) == "URL('/path/to/somewhere?abc=123')" u = URL( scope={ "scheme": "https", "server": ("example.org", 123), "path": "/path/to/somewhere", "query_string": b"abc=123", "headers": [], } ) assert u == "https://example.org:123/path/to/somewhere?abc=123" assert repr(u) == "URL('https://example.org:123/path/to/somewhere?abc=123')" u = URL( scope={ "scheme": "https", "server": ("example.org", 443), "path": "/path/to/somewhere", "query_string": b"abc=123", "headers": [], } ) assert u == "https://example.org/path/to/somewhere?abc=123" assert repr(u) == "URL('https://example.org/path/to/somewhere?abc=123')" def test_headers() -> None: h = Headers(raw=[(b"a", b"123"), (b"a", b"456"), (b"b", b"789")]) assert "a" in h assert "A" in h assert "b" in h assert "B" in h assert "c" not in h assert h["a"] == "123" assert h.get("a") == "123" assert h.get("nope", default=None) is None assert h.getlist("a") == ["123", "456"] assert h.keys() == ["a", "a", "b"] assert h.values() == ["123", "456", "789"] assert h.items() == [("a", "123"), ("a", "456"), ("b", "789")] assert list(h) == ["a", "a", "b"] assert dict(h) == {"a": "123", "b": "789"} assert repr(h) == "Headers(raw=[(b'a', b'123'), (b'a', b'456'), (b'b', b'789')])" assert h == Headers(raw=[(b"a", b"123"), (b"b", b"789"), (b"a", b"456")]) assert h != [(b"a", b"123"), (b"A", b"456"), (b"b", b"789")] h = Headers({"a": "123", "b": "789"}) assert h["A"] == "123" assert h["B"] == "789" assert h.raw == [(b"a", b"123"), (b"b", b"789")] assert repr(h) == "Headers({'a': '123', 'b': '789'})" def test_mutable_headers() -> None: h = MutableHeaders() assert dict(h) == {} h["a"] = "1" assert dict(h) == {"a": "1"} h["a"] = "2" assert dict(h) == {"a": "2"} h.setdefault("a", "3") assert dict(h) == {"a": "2"} h.setdefault("b", "4") assert dict(h) == {"a": "2", "b": "4"} del h["a"] assert dict(h) == {"b": "4"} assert h.raw == [(b"b", b"4")] def test_mutable_headers_merge() -> None: h = MutableHeaders() h = h | MutableHeaders({"a": "1"}) assert isinstance(h, MutableHeaders) assert dict(h) == {"a": "1"} assert h.items() == [("a", "1")] assert h.raw == [(b"a", b"1")] def test_mutable_headers_merge_dict() -> None: h = MutableHeaders() h = h | {"a": "1"} assert isinstance(h, MutableHeaders) assert dict(h) == {"a": "1"} assert h.items() == [("a", "1")] assert h.raw == [(b"a", b"1")] def test_mutable_headers_update() -> None: h = MutableHeaders() h |= MutableHeaders({"a": "1"}) assert isinstance(h, MutableHeaders) assert dict(h) == {"a": "1"} assert h.items() == [("a", "1")] assert h.raw == [(b"a", b"1")] def test_mutable_headers_update_dict() -> None: h = MutableHeaders() h |= {"a": "1"} assert isinstance(h, MutableHeaders) assert dict(h) == {"a": "1"} assert h.items() == [("a", "1")] assert h.raw == [(b"a", b"1")] def test_mutable_headers_merge_not_mapping() -> None: h = MutableHeaders() with pytest.raises(TypeError): h |= {"not_mapping"} # type: ignore[arg-type] with pytest.raises(TypeError): h | {"not_mapping"} # type: ignore[operator] def test_headers_mutablecopy() -> None: h = Headers(raw=[(b"a", b"123"), (b"a", b"456"), (b"b", b"789")]) c = h.mutablecopy() assert c.items() == [("a", "123"), ("a", "456"), ("b", "789")] c["a"] = "abc" assert c.items() == [("a", "abc"), ("b", "789")] def test_mutable_headers_from_scope() -> None: # "headers" in scope must not necessarily be a list h = MutableHeaders(scope={"headers": ((b"a", b"1"),)}) assert dict(h) == {"a": "1"} h.update({"b": "2"}) assert dict(h) == {"a": "1", "b": "2"} assert list(h.items()) == [("a", "1"), ("b", "2")] assert list(h.raw) == [(b"a", b"1"), (b"b", b"2")] def test_url_blank_params() -> None: q = QueryParams("a=123&abc&def&b=456") assert "a" in q assert "abc" in q assert "def" in q assert "b" in q val = q.get("abc") assert val is not None assert len(val) == 0 assert len(q["a"]) == 3 assert list(q.keys()) == ["a", "abc", "def", "b"] def test_queryparams() -> None: q = QueryParams("a=123&a=456&b=789") assert "a" in q assert "A" not in q assert "c" not in q assert q["a"] == "456" assert q.get("a") == "456" assert q.get("nope", default=None) is None assert q.getlist("a") == ["123", "456"] assert list(q.keys()) == ["a", "b"] assert list(q.values()) == ["456", "789"] assert list(q.items()) == [("a", "456"), ("b", "789")] assert len(q) == 2 assert list(q) == ["a", "b"] assert dict(q) == {"a": "456", "b": "789"} assert str(q) == "a=123&a=456&b=789" assert repr(q) == "QueryParams('a=123&a=456&b=789')" assert QueryParams({"a": "123", "b": "456"}) == QueryParams( [("a", "123"), ("b", "456")] ) assert QueryParams({"a": "123", "b": "456"}) == QueryParams("a=123&b=456") assert QueryParams({"a": "123", "b": "456"}) == QueryParams( {"b": "456", "a": "123"} ) assert QueryParams() == QueryParams({}) assert QueryParams([("a", "123"), ("a", "456")]) == QueryParams("a=123&a=456") assert QueryParams({"a": "123", "b": "456"}) != "invalid" q = QueryParams([("a", "123"), ("a", "456")]) assert QueryParams(q) == q @pytest.mark.anyio async def test_upload_file_file_input() -> None: """Test passing file/stream into the UploadFile constructor""" stream = io.BytesIO(b"data") file = UploadFile(filename="file", file=stream, size=4) assert await file.read() == b"data" assert file.size == 4 await file.write(b" and more data!") assert await file.read() == b"" assert file.size == 19 await file.seek(0) assert await file.read() == b"data and more data!" @pytest.mark.anyio async def test_upload_file_without_size() -> None: """Test passing file/stream into the UploadFile constructor without size""" stream = io.BytesIO(b"data") file = UploadFile(filename="file", file=stream) assert await file.read() == b"data" assert file.size is None await file.write(b" and more data!") assert await file.read() == b"" assert file.size is None await file.seek(0) assert await file.read() == b"data and more data!" @pytest.mark.anyio @pytest.mark.parametrize("max_size", [1, 1024], ids=["rolled", "unrolled"]) async def test_uploadfile_rolling(max_size: int) -> None: """Test that we can r/w to a SpooledTemporaryFile managed by UploadFile before and after it rolls to disk """ stream: BinaryIO = SpooledTemporaryFile( # type: ignore[assignment] max_size=max_size ) file = UploadFile(filename="file", file=stream, size=0) assert await file.read() == b"" assert file.size == 0 await file.write(b"data") assert await file.read() == b"" assert file.size == 4 await file.seek(0) assert await file.read() == b"data" await file.write(b" more") assert await file.read() == b"" assert file.size == 9 await file.seek(0) assert await file.read() == b"data more" assert file.size == 9 await file.close() def test_formdata() -> None: stream = io.BytesIO(b"data") upload = UploadFile(filename="file", file=stream, size=4) form = FormData([("a", "123"), ("a", "456"), ("b", upload)]) assert "a" in form assert "A" not in form assert "c" not in form assert form["a"] == "456" assert form.get("a") == "456" assert form.get("nope", default=None) is None assert form.getlist("a") == ["123", "456"] assert list(form.keys()) == ["a", "b"] assert list(form.values()) == ["456", upload] assert list(form.items()) == [("a", "456"), ("b", upload)] assert len(form) == 2 assert list(form) == ["a", "b"] assert dict(form) == {"a": "456", "b": upload} assert ( repr(form) == "FormData([('a', '123'), ('a', '456'), ('b', " + repr(upload) + ")])" ) assert FormData(form) == form assert FormData({"a": "123", "b": "789"}) == FormData([("a", "123"), ("b", "789")]) assert FormData({"a": "123", "b": "789"}) != {"a": "123", "b": "789"} @pytest.mark.anyio async def test_upload_file_repr() -> None: stream = io.BytesIO(b"data") file = UploadFile(filename="file", file=stream, size=4) assert repr(file) == "UploadFile(filename='file', size=4, headers=Headers({}))" @pytest.mark.anyio async def test_upload_file_repr_headers() -> None: stream = io.BytesIO(b"data") file = UploadFile(filename="file", file=stream, headers=Headers({"foo": "bar"})) assert ( repr(file) == "UploadFile(filename='file', size=None, headers=Headers({'foo': 'bar'}))" ) def test_multidict() -> None: q = MultiDict([("a", "123"), ("a", "456"), ("b", "789")]) assert "a" in q assert "A" not in q assert "c" not in q assert q["a"] == "456" assert q.get("a") == "456" assert q.get("nope", default=None) is None assert q.getlist("a") == ["123", "456"] assert list(q.keys()) == ["a", "b"] assert list(q.values()) == ["456", "789"] assert list(q.items()) == [("a", "456"), ("b", "789")] assert len(q) == 2 assert list(q) == ["a", "b"] assert dict(q) == {"a": "456", "b": "789"} assert str(q) == "MultiDict([('a', '123'), ('a', '456'), ('b', '789')])" assert repr(q) == "MultiDict([('a', '123'), ('a', '456'), ('b', '789')])" assert MultiDict({"a": "123", "b": "456"}) == MultiDict( [("a", "123"), ("b", "456")] ) assert MultiDict({"a": "123", "b": "456"}) == MultiDict({"b": "456", "a": "123"}) assert MultiDict() == MultiDict({}) assert MultiDict({"a": "123", "b": "456"}) != "invalid" q = MultiDict([("a", "123"), ("a", "456")]) assert MultiDict(q) == q q = MultiDict([("a", "123"), ("a", "456")]) q["a"] = "789" assert q["a"] == "789" assert q.get("a") == "789" assert q.getlist("a") == ["789"] q = MultiDict([("a", "123"), ("a", "456")]) del q["a"] assert q.get("a") is None assert repr(q) == "MultiDict([])" q = MultiDict([("a", "123"), ("a", "456"), ("b", "789")]) assert q.pop("a") == "456" assert q.get("a", None) is None assert repr(q) == "MultiDict([('b', '789')])" q = MultiDict([("a", "123"), ("a", "456"), ("b", "789")]) item = q.popitem() assert q.get(item[0]) is None q = MultiDict([("a", "123"), ("a", "456"), ("b", "789")]) assert q.poplist("a") == ["123", "456"] assert q.get("a") is None assert repr(q) == "MultiDict([('b', '789')])" q = MultiDict([("a", "123"), ("a", "456"), ("b", "789")]) q.clear() assert q.get("a") is None assert repr(q) == "MultiDict([])" q = MultiDict([("a", "123")]) q.setlist("a", ["456", "789"]) assert q.getlist("a") == ["456", "789"] q.setlist("b", []) assert "b" not in q q = MultiDict([("a", "123")]) assert q.setdefault("a", "456") == "123" assert q.getlist("a") == ["123"] assert q.setdefault("b", "456") == "456" assert q.getlist("b") == ["456"] assert repr(q) == "MultiDict([('a', '123'), ('b', '456')])" q = MultiDict([("a", "123")]) q.append("a", "456") assert q.getlist("a") == ["123", "456"] assert repr(q) == "MultiDict([('a', '123'), ('a', '456')])" q = MultiDict([("a", "123"), ("b", "456")]) q.update({"a": "789"}) assert q.getlist("a") == ["789"] assert q == MultiDict([("a", "789"), ("b", "456")]) q = MultiDict([("a", "123"), ("b", "456")]) q.update(q) assert repr(q) == "MultiDict([('a', '123'), ('b', '456')])" q = MultiDict([("a", "123"), ("a", "456")]) q.update([("a", "123")]) assert q.getlist("a") == ["123"] q.update([("a", "456")], a="789", b="123") assert q == MultiDict([("a", "456"), ("a", "789"), ("b", "123")])