code: pass ruff

This commit is contained in:
Mark Joshwel 2024-03-26 18:08:49 +00:00
parent 4871b9d352
commit 1ccd66b166
3 changed files with 175 additions and 280 deletions

View file

@ -33,6 +33,7 @@ from datetime import datetime, timedelta, timezone
from os import getenv from os import getenv
from pathlib import Path from pathlib import Path
from subprocess import run from subprocess import run
from sys import exit as sysexit
# NOTE: change this if surplus has moved # NOTE: change this if surplus has moved
path_surplus = Path(__file__).parent.joinpath("./surplus/surplus.py") path_surplus = Path(__file__).parent.joinpath("./surplus/surplus.py")
@ -42,19 +43,19 @@ build_time = datetime.now(timezone(timedelta(hours=8))) # using SGT
_insert_build_branch = getenv( _insert_build_branch = getenv(
"SURPLUS_BUILD_BRANCH", "SURPLUS_BUILD_BRANCH",
run( run(
"git branch --show-current", "git branch --show-current".split(),
capture_output=True, capture_output=True,
text=True, text=True,
shell=True, check=False,
).stdout.strip("\n"), ).stdout.strip("\n"),
) )
insert_build_branch = _insert_build_branch if _insert_build_branch != "" else "unknown" insert_build_branch = _insert_build_branch if _insert_build_branch != "" else "unknown"
insert_build_commit: str = run( insert_build_commit: str = run(
"git rev-parse HEAD", "git rev-parse HEAD".split(),
capture_output=True, capture_output=True,
text=True, text=True,
shell=True, check=False,
).stdout.strip("\n") ).stdout.strip("\n")
insert_build_datetime: str = repr(build_time).replace("datetime.", "") insert_build_datetime: str = repr(build_time).replace("datetime.", "")
@ -63,7 +64,7 @@ insert_build_datetime: str = repr(build_time).replace("datetime.", "")
targets: list[tuple[str, str]] = [ targets: list[tuple[str, str]] = [
( (
'VERSION_SUFFIX: Final[str] = "-local"', 'VERSION_SUFFIX: Final[str] = "-local"',
'VERSION_SUFFIX: Final[str] = ""', 'VERSION_SUFFIX: Final[str] = "-alpha"',
), ),
( (
'BUILD_BRANCH: Final[str] = "future"', 'BUILD_BRANCH: Final[str] = "future"',
@ -81,18 +82,19 @@ targets: list[tuple[str, str]] = [
def main() -> int: def main() -> int:
assert path_surplus.is_file() and path_surplus.exists(), f"{path_surplus} not found" if not (path_surplus.is_file() and path_surplus.exists()):
raise FileNotFoundError(path_surplus)
source_surplus: str = path_surplus.read_text(encoding="utf-8") source_surplus: str = path_surplus.read_text(encoding="utf-8")
for old, new in targets: for old, new in targets:
print(f"new: {new}\nold: {old}\n") print(f"new: {new}\nold: {old}\n") # noqa: T201
source_surplus = source_surplus.replace(old, new) source_surplus = source_surplus.replace(old, new)
path_surplus.write_text(source_surplus, encoding="utf-8") # path_surplus.write_text(source_surplus, encoding="utf-8")
return 0 return 0
if __name__ == "__main__": if __name__ == "__main__":
exit(main()) sysexit(main())

View file

@ -32,24 +32,13 @@ For more information, please refer to <http://unlicense.org/>
# surplus was and would've been a single-file module, but typing is in the way :( # surplus was and would've been a single-file module, but typing is in the way :(
# https://github.com/python/typing/issues/1333 # https://github.com/python/typing/issues/1333
from .surplus import default_geocoder # deprecated, emulation function from .surplus import ( # noqa: F401, TID252
from .surplus import default_reverser # deprecated, emulation function
from .surplus import (
BUILD_BRANCH, BUILD_BRANCH,
BUILD_COMMIT, BUILD_COMMIT,
BUILD_DATETIME, BUILD_DATETIME,
CONNECTION_MAX_RETRIES, CONNECTION_MAX_RETRIES,
CONNECTION_WAIT_SECONDS, CONNECTION_WAIT_SECONDS,
EMPTY_LATLONG, EMPTY_LATLONG,
SHAREABLE_TEXT_LINE_0_KEYS,
SHAREABLE_TEXT_LINE_1_KEYS,
SHAREABLE_TEXT_LINE_2_KEYS,
SHAREABLE_TEXT_LINE_3_KEYS,
SHAREABLE_TEXT_LINE_4_KEYS,
SHAREABLE_TEXT_LINE_5_KEYS,
SHAREABLE_TEXT_LINE_6_KEYS,
SHAREABLE_TEXT_LOCALITY,
SHAREABLE_TEXT_NAMES,
VERSION, VERSION,
VERSION_SUFFIX, VERSION_SUFFIX,
Behaviour, Behaviour,
@ -68,12 +57,12 @@ from .surplus import (
ResultType, ResultType,
StringQuery, StringQuery,
SurplusDefaultGeocoding, SurplusDefaultGeocoding,
SurplusException, SurplusError,
SurplusGeocoderProtocol, SurplusGeocoderProtocol,
SurplusReverserProtocol, SurplusReverserProtocol,
__version__,
cli, cli,
generate_fingerprinted_user_agent, generate_fingerprinted_user_agent,
handle_args,
parse_query, parse_query,
surplus, surplus,
) )

View file

@ -31,6 +31,7 @@ For more information, please refer to <http://unlicense.org/>
from argparse import ArgumentParser from argparse import ArgumentParser
from collections import OrderedDict from collections import OrderedDict
from collections.abc import Callable, Sequence
from copy import deepcopy from copy import deepcopy
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
@ -41,35 +42,35 @@ from json import loads as json_loads
from json.decoder import JSONDecodeError from json.decoder import JSONDecodeError
from platform import platform from platform import platform
from socket import gethostname from socket import gethostname
from sys import exit as sysexit
from sys import stderr, stdin, stdout from sys import stderr, stdin, stdout
from typing import ( from typing import (
TYPE_CHECKING,
Any, Any,
Callable,
Final, Final,
Generic, Generic,
NamedTuple, NamedTuple,
Protocol, Protocol,
Sequence,
TextIO, TextIO,
TypeAlias, TypeAlias,
TypeVar, TypeVar,
) )
from uuid import getnode from uuid import getnode
from geopy import Location as _geopy_Location # type: ignore
from geopy.extra.rate_limiter import RateLimiter as _geopy_RateLimiter # type: ignore from geopy.extra.rate_limiter import RateLimiter as _geopy_RateLimiter # type: ignore
from geopy.geocoders import Nominatim as _geopy_Nominatim # type: ignore from geopy.geocoders import Nominatim as _geopy_Nominatim # type: ignore
from pluscodes import PlusCode as _PlusCode # type: ignore from pluscodes import PlusCode as _PlusCode # type: ignore
from pluscodes import encode as _PlusCode_encode # type: ignore from pluscodes import encode as _encode # type: ignore
from pluscodes.openlocationcode import recoverNearest as _PlusCode_recoverNearest # type: ignore
from pluscodes.validator import Validator as _PlusCode_Validator # type: ignore from pluscodes.validator import Validator as _PlusCode_Validator # type: ignore
from pluscodes.openlocationcode import ( # type: ignore # isort: skip if TYPE_CHECKING:
recoverNearest as _PlusCode_recoverNearest, from geopy import Location as _geopy_Location # type: ignore
)
# constants # constants
VERSION: Final[tuple[int, int, int]] = (2, 2, 0) __version__ = "2024.0.0-alpha"
VERSION: Final[tuple[int, int, int]] = (2024, 0, 0)
VERSION_SUFFIX: Final[str] = "-local" VERSION_SUFFIX: Final[str] = "-local"
BUILD_BRANCH: Final[str] = "future" BUILD_BRANCH: Final[str] = "future"
BUILD_COMMIT: Final[str] = "latest" BUILD_COMMIT: Final[str] = "latest"
@ -215,15 +216,13 @@ SHAREABLE_TEXT_LINE_6_KEYS.update(
), ),
} }
) )
SHAREABLE_TEXT_LINE_SETTINGS.update( SHAREABLE_TEXT_LINE_SETTINGS.update({"IT": deepcopy(SHAREABLE_TEXT_LINE_SETTINGS)["default"]})
{"IT": deepcopy(SHAREABLE_TEXT_LINE_SETTINGS)["default"]}
)
SHAREABLE_TEXT_LINE_SETTINGS["IT"][5] = (" ", False) SHAREABLE_TEXT_LINE_SETTINGS["IT"][5] = (" ", False)
# special per-country key arrangements for MY/Malaysia # special per-country key arrangements for MY/Malaysia
SHAREABLE_TEXT_LINE_4_KEYS.update( SHAREABLE_TEXT_LINE_4_KEYS.update(
{ {
"MY": tuple(), "MY": (),
}, },
) )
SHAREABLE_TEXT_LINE_5_KEYS.update( SHAREABLE_TEXT_LINE_5_KEYS.update(
@ -234,9 +233,7 @@ SHAREABLE_TEXT_LINE_5_KEYS.update(
), ),
}, },
) )
SHAREABLE_TEXT_LINE_SETTINGS.update( SHAREABLE_TEXT_LINE_SETTINGS.update({"MY": deepcopy(SHAREABLE_TEXT_LINE_SETTINGS)["default"]})
{"MY": deepcopy(SHAREABLE_TEXT_LINE_SETTINGS)["default"]}
)
SHAREABLE_TEXT_LINE_SETTINGS["MY"][4] = (" ", False) SHAREABLE_TEXT_LINE_SETTINGS["MY"][4] = (" ", False)
SHAREABLE_TEXT_LINE_SETTINGS["MY"][5] = (" ", True) SHAREABLE_TEXT_LINE_SETTINGS["MY"][5] = (" ", True)
@ -244,30 +241,23 @@ SHAREABLE_TEXT_LINE_SETTINGS["MY"][5] = (" ", True)
# exceptions # exceptions
class SurplusException(Exception): class SurplusError(Exception):
"""base skeleton exception for handling and typing surplus exception classes""" """base skeleton exception for handling and typing surplus exception classes"""
...
class NoSuitableLocationError(SurplusError): ...
class NoSuitableLocationError(SurplusException): class IncompletePlusCodeError(SurplusError): ...
...
class IncompletePlusCodeError(SurplusException): class PlusCodeNotFoundError(SurplusError): ...
...
class PlusCodeNotFoundError(SurplusException): class LatlongParseError(SurplusError): ...
...
class LatlongParseError(SurplusException): class EmptyQueryError(SurplusError): ...
...
class EmptyQueryError(SurplusException):
...
# data structures # data structures
@ -357,7 +347,7 @@ class Result(NamedTuple, Generic[ResultType]):
"""method that returns True if self.error is not None""" """method that returns True if self.error is not None"""
return self.error is None return self.error is None
def cry(self, string: bool = False) -> str: def cry(self, string: bool = False) -> str: # noqa: FBT001, FBT002
""" """
method that raises self.error if is an instance of BaseException, method that raises self.error if is an instance of BaseException,
returns self.error if is an instance of str, or returns an empty string if returns self.error if is an instance of str, or returns an empty string if
@ -438,8 +428,7 @@ class SurplusGeocoderProtocol(Protocol):
exceptions are handled by the caller exceptions are handled by the caller
""" """
def __call__(self, place: str) -> Latlong: def __call__(self, place: str) -> Latlong: ...
...
class SurplusReverserProtocol(Protocol): class SurplusReverserProtocol(Protocol):
@ -478,8 +467,7 @@ class SurplusReverserProtocol(Protocol):
exceptions are handled by the caller exceptions are handled by the caller
""" """
def __call__(self, latlong: Latlong, level: int = 18) -> dict[str, Any]: def __call__(self, latlong: Latlong, level: int = 18) -> dict[str, Any]: ...
...
class PlusCodeQuery(NamedTuple): class PlusCodeQuery(NamedTuple):
@ -496,7 +484,7 @@ class PlusCodeQuery(NamedTuple):
code: str code: str
def to_lat_long_coord(self, geocoder: SurplusGeocoderProtocol) -> Result[Latlong]: def to_lat_long_coord(self, geocoder: SurplusGeocoderProtocol) -> Result[Latlong]: # noqa: ARG002
""" """
method that returns a latitude-longitude coordinate pair method that returns a latitude-longitude coordinate pair
@ -520,12 +508,11 @@ class PlusCodeQuery(NamedTuple):
return Result[Latlong]( return Result[Latlong](
EMPTY_LATLONG, EMPTY_LATLONG,
error=IncompletePlusCodeError( error=IncompletePlusCodeError(
"PlusCodeQuery.to_lat_long_coord: " "PlusCodeQuery.to_lat_long_coord: " "Plus Code is not full-length (e.g., 6PH58QMF+FX)"
"Plus Code is not full-length (e.g., 6PH58QMF+FX)"
), ),
) )
except Exception as exc: except Exception as exc: # noqa: BLE001
return Result[Latlong](EMPTY_LATLONG, error=exc) return Result[Latlong](EMPTY_LATLONG, error=exc)
return Result[Latlong](Latlong(latitude=latitude, longitude=longitude)) return Result[Latlong](Latlong(latitude=latitude, longitude=longitude))
@ -578,7 +565,7 @@ class LocalCodeQuery(NamedTuple):
return Result[str](recovered_pluscode) return Result[str](recovered_pluscode)
except Exception as exc: except Exception as exc: # noqa: BLE001
return Result[str]("", error=exc) return Result[str]("", error=exc)
def to_lat_long_coord(self, geocoder: SurplusGeocoderProtocol) -> Result[Latlong]: def to_lat_long_coord(self, geocoder: SurplusGeocoderProtocol) -> Result[Latlong]:
@ -623,7 +610,7 @@ class LatlongQuery(NamedTuple):
latlong: Latlong latlong: Latlong
def to_lat_long_coord(self, geocoder: SurplusGeocoderProtocol) -> Result[Latlong]: def to_lat_long_coord(self, geocoder: SurplusGeocoderProtocol) -> Result[Latlong]: # noqa: ARG002
""" """
method that returns a latitude-longitude coordinate pair method that returns a latitude-longitude coordinate pair
@ -639,7 +626,7 @@ class LatlongQuery(NamedTuple):
def __str__(self) -> str: def __str__(self) -> str:
"""method that returns string representation of query""" """method that returns string representation of query"""
return f"{str(self.latlong)}" return f"{self.latlong!s}"
class StringQuery(NamedTuple): class StringQuery(NamedTuple):
@ -671,7 +658,7 @@ class StringQuery(NamedTuple):
try: try:
return Result[Latlong](geocoder(self.query)) return Result[Latlong](geocoder(self.query))
except Exception as exc: except Exception as exc: # noqa: BLE001
return Result[Latlong](EMPTY_LATLONG, error=exc) return Result[Latlong](EMPTY_LATLONG, error=exc)
def __str__(self) -> str: def __str__(self) -> str:
@ -695,22 +682,24 @@ def generate_fingerprinted_user_agent() -> Result[str]:
""" """
version: str = ".".join([str(v) for v in VERSION]) + VERSION_SUFFIX version: str = ".".join([str(v) for v in VERSION]) + VERSION_SUFFIX
try: def _try(func: Callable) -> str:
system_info: str = platform() try:
hostname: str = gethostname() return func()
mac_address: str = ":".join(
[
"{:02x}".format((getnode() >> elements) & 0xFF)
for elements in range(0, 2 * 6, 2)
][::-1]
)
unique_info: str = f"{version}-{system_info}-{hostname}-{mac_address}"
except Exception as exc: except Exception: # noqa: BLE001
return Result[str](f"surplus/{version} (generic-user)", error=exc) return "unknown"
system_info: str = _try(platform)
hostname = _try(gethostname)
mac_address = _try(
lambda: ":".join([f"{(getnode() >> elements) & 0xFF:02x}" for elements in range(0, 2 * 6, 2)][::-1])
)
unique_info = f"{version}-{system_info}-{hostname}-{mac_address}"
if unique_info == "unknown-unknown-unknown-unknown":
return Result[str](f"surplus/{version} (generic-user)")
fingerprint: str = shake_256(unique_info.encode()).hexdigest(5) fingerprint: str = shake_256(unique_info.encode()).hexdigest(5)
return Result[str](f"surplus/{version} ({fingerprint})") return Result[str](f"surplus/{version} ({fingerprint})")
@ -740,17 +729,14 @@ class SurplusDefaultGeocoding:
""" """
user_agent: str = default_fingerprint user_agent: str = default_fingerprint
_ratelimited_raw_geocoder: Callable | None = None _ratelimited_raw_geocoder: Callable = lambda _: None # noqa: E731
_ratelimited_raw_reverser: Callable | None = None _ratelimited_raw_reverser: Callable = lambda _: None # noqa: E731
_first_update: bool = False _first_update: bool = False
def update_geocoding_functions(self) -> None: def update_geocoding_functions(self) -> None:
""" """
re-initialise the geocoding functions with the current user agent, also generate re-initialise the geocoding functions with the current user agent, also generate
a new user agent if not set properly a new user agent if not set properly
recommended to call this before using surplus as by default the geocoding
functions are uninitialised
""" """
if not isinstance(self.user_agent, str): if not isinstance(self.user_agent, str):
@ -785,26 +771,20 @@ class SurplusDefaultGeocoding:
see SurplusGeocoderProtocol for more information on surplus geocoder functions see SurplusGeocoderProtocol for more information on surplus geocoder functions
""" """
if not callable(self._ratelimited_raw_geocoder) or (self._first_update is False): if self._first_update is False:
self.update_geocoding_functions() self.update_geocoding_functions()
# https://github.com/python/mypy/issues/12155
assert callable(self._ratelimited_raw_geocoder)
location: _geopy_Location | None = self._ratelimited_raw_geocoder(place) location: _geopy_Location | None = self._ratelimited_raw_geocoder(place)
if location is None: if location is None:
raise NoSuitableLocationError( msg = f"No suitable location could be geolocated from '{place}'"
f"No suitable location could be geolocated from '{place}'" raise NoSuitableLocationError(msg)
)
bounding_box: tuple[float, float, float, float] | None = location.raw.get( bounding_box: tuple[float, float, float, float] | None = location.raw.get("boundingbox", None)
"boundingbox", None
)
if location.raw.get("boundingbox", None) is not None: if location.raw.get("boundingbox", None) is not None:
_bounding_box = [float(c) for c in location.raw.get("boundingbox", [])] _bounding_box = [float(c) for c in location.raw.get("boundingbox", [])]
if len(_bounding_box) == 4: if len(_bounding_box) == 4: # noqa: PLR2004
bounding_box = ( bounding_box = (
_bounding_box[0], _bounding_box[0],
_bounding_box[1], _bounding_box[1],
@ -830,18 +810,14 @@ class SurplusDefaultGeocoding:
see SurplusReverserProtocol for more information on surplus reverser functions see SurplusReverserProtocol for more information on surplus reverser functions
""" """
if not callable(self._ratelimited_raw_reverser) or (self._first_update is False): if self._first_update is False:
self.update_geocoding_functions() self.update_geocoding_functions()
# https://github.com/python/mypy/issues/12155 location: _geopy_Location | None = self._ratelimited_raw_reverser(str(latlong), zoom=level)
assert callable(self._ratelimited_raw_reverser)
location: _geopy_Location | None = self._ratelimited_raw_reverser(
str(latlong), zoom=level
)
if location is None: if location is None:
raise NoSuitableLocationError(f"could not reverse '{str(latlong)}'") msg = f"could not reverse '{latlong!s}'"
raise NoSuitableLocationError(msg)
location_dict: dict[str, Any] = {} location_dict: dict[str, Any] = {}
@ -855,32 +831,7 @@ class SurplusDefaultGeocoding:
return location_dict return location_dict
default_geocoding: Final[SurplusDefaultGeocoding] = SurplusDefaultGeocoding( default_geocoding: Final[SurplusDefaultGeocoding] = SurplusDefaultGeocoding(default_fingerprint)
default_fingerprint
)
default_geocoding.update_geocoding_functions()
def default_geocoder(place: str) -> Latlong:
"""(deprecated) geocoder for surplus, uses OpenStreetMap Nominatim"""
print(
"warning: default_geocoder is deprecated. "
"this is a emulation function that will use a fingerprinted user agent.",
file=stderr,
)
return default_geocoding.geocoder(place=place)
def default_reverser(latlong: Latlong, level: int = 18) -> dict[str, Any]:
"""
(deprecated) reverser for surplus, uses OpenStreetMap Nominatim
"""
print(
"warning: default_reverser is deprecated. "
"this is a emulation function that will use a fingerprinted user agent.",
file=stderr,
)
return default_geocoding.reverser(latlong=latlong, level=level)
class Behaviour(NamedTuple): class Behaviour(NamedTuple):
@ -960,8 +911,8 @@ def parse_query(behaviour: Behaviour) -> Result[Query]:
original_query = str(behaviour.query) original_query = str(behaviour.query)
split_query = behaviour.query.split(" ") split_query = behaviour.query.split(" ")
for word in split_query: for _word in split_query:
word = word.strip(",").strip() word = _word.strip(",").strip()
if validator.is_valid(word): if validator.is_valid(word):
portion_plus_code = word portion_plus_code = word
@ -986,9 +937,7 @@ def parse_query(behaviour: Behaviour) -> Result[Query]:
if (portion_locality == "") and (not validator.is_full(portion_plus_code)): if (portion_locality == "") and (not validator.is_full(portion_plus_code)):
return Result[Query]( return Result[Query](
LatlongQuery(EMPTY_LATLONG), LatlongQuery(EMPTY_LATLONG),
error=IncompletePlusCodeError( error=IncompletePlusCodeError("_match_plus_code: Plus Code is not full-length (e.g., 6PH58QMF+FX)"),
"_match_plus_code: Plus Code is not full-length (e.g., 6PH58QMF+FX)"
),
) )
if behaviour.debug: if behaviour.debug:
@ -1023,7 +972,7 @@ def parse_query(behaviour: Behaviour) -> Result[Query]:
print(f"debug: parse_query: {behaviour.query=}", file=behaviour.stderr) print(f"debug: parse_query: {behaviour.query=}", file=behaviour.stderr)
# check if empty # check if empty
if (behaviour.query == []) or (behaviour.query == ""): if behaviour.query in ([], ""):
return Result[Query]( return Result[Query](
LatlongQuery(EMPTY_LATLONG), LatlongQuery(EMPTY_LATLONG),
error=EmptyQueryError("empty query string passed"), error=EmptyQueryError("empty query string passed"),
@ -1063,7 +1012,8 @@ def parse_query(behaviour: Behaviour) -> Result[Query]:
try: try:
termux_location_json = json_loads(original_query) termux_location_json = json_loads(original_query)
if not isinstance(termux_location_json, dict): if not isinstance(termux_location_json, dict):
raise ValueError("parsed termux-location json is not a dict") msg = "parsed termux-location json is not a dict"
raise TypeError(msg) # noqa: TRY301
return Result[Query]( return Result[Query](
LatlongQuery( LatlongQuery(
@ -1074,21 +1024,19 @@ def parse_query(behaviour: Behaviour) -> Result[Query]:
) )
) )
except (JSONDecodeError, TypeError) as exc: except (JSONDecodeError, TypeError):
return Result[Query]( return Result[Query](
LatlongQuery(EMPTY_LATLONG), LatlongQuery(EMPTY_LATLONG),
error=ValueError("could not parse termux-location json"), error=ValueError("could not parse termux-location json"),
) )
except KeyError as exc: except KeyError:
return Result[Query]( return Result[Query](
LatlongQuery(EMPTY_LATLONG), LatlongQuery(EMPTY_LATLONG),
error=ValueError( error=ValueError("could not get 'latitude' or 'longitude' keys from termux-location json"),
"could not get 'latitude' or 'longitude' keys from termux-location json"
),
) )
except Exception as exc: except Exception as exc: # noqa: BLE001
return Result[Query]( return Result[Query](
LatlongQuery(EMPTY_LATLONG), LatlongQuery(EMPTY_LATLONG),
error=exc, error=exc,
@ -1107,29 +1055,29 @@ def parse_query(behaviour: Behaviour) -> Result[Query]:
if "," not in single: # no comma, not a latlong coord if "," not in single: # no comma, not a latlong coord
return Result[Query](StringQuery(original_query)) return Result[Query](StringQuery(original_query))
else: # has comma, possibly a latlong coord # has comma, possibly a latlong coord
comma_split_single: list[str] = single.split(",") comma_split_single: list[str] = single.split(",")
if len(comma_split_single) == 2: if len(comma_split_single) == 2: # noqa: PLR2004
try: # try to type cast query try: # try to type cast query
latitude = float(comma_split_single[0].strip(",")) latitude = float(comma_split_single[0].strip(","))
longitude = float(comma_split_single[-1].strip(",")) longitude = float(comma_split_single[-1].strip(","))
except ValueError: # not a latlong coord, fallback except ValueError: # not a latlong coord, fallback
return Result[Query](StringQuery(single)) return Result[Query](StringQuery(single))
else: # are floats, so is a latlong coord else: # are floats, so is a latlong coord
return Result[Query]( return Result[Query](
LatlongQuery( LatlongQuery(
Latlong( Latlong(
latitude=latitude, latitude=latitude,
longitude=longitude, longitude=longitude,
)
) )
) )
)
# not a latlong coord, fallback # not a latlong coord, fallback
return Result[Query](StringQuery(original_query)) return Result[Query](StringQuery(original_query))
case [left_single, right_single]: case [left_single, right_single]:
# possibly a: # possibly a:
@ -1144,9 +1092,7 @@ def parse_query(behaviour: Behaviour) -> Result[Query]:
return Result[Query](StringQuery(original_query)) return Result[Query](StringQuery(original_query))
else: # are floats, so is a latlong coord else: # are floats, so is a latlong coord
return Result[Query]( return Result[Query](LatlongQuery(Latlong(latitude=latitude, longitude=longitude)))
LatlongQuery(Latlong(latitude=latitude, longitude=longitude))
)
case _: case _:
# possibly a: # possibly a:
@ -1193,22 +1139,21 @@ def handle_args() -> Behaviour:
default=False, default=False,
help="prints version information to stderr and exits", help="prints version information to stderr and exits",
) )
parser.add_argument( (
"-c", parser.add_argument(
"--convert-to", "-c",
type=str, "--convert-to",
choices=[str(v.value) for v in ConversionResultTypeEnum], type=str,
help=( choices=[str(v.value) for v in ConversionResultTypeEnum],
"converts query a specific output type, defaults to " help=("converts query a specific output type, defaults to " f"'{Behaviour([]).convert_to_type.value}'"),
f"'{Behaviour([]).convert_to_type.value}'" default=Behaviour([]).convert_to_type.value,
), ),
default=Behaviour([]).convert_to_type.value, )
),
parser.add_argument( parser.add_argument(
"-u", "-u",
"--user-agent", "--user-agent",
type=str, type=str,
help=f"user agent string to use for geocoding service, defaults to fingerprinted user agent string", help="user agent string to use for geocoding service, defaults to fingerprinted user agent string",
default=default_fingerprint, default=default_fingerprint,
) )
parser.add_argument( parser.add_argument(
@ -1224,20 +1169,11 @@ def handle_args() -> Behaviour:
query: str | list[str] = "" query: str | list[str] = ""
# "-" stdin check # "-" stdin check
if args.query == ["-"]: query = "\n".join([line.strip() for line in stdin]) if (args.query == ["-"]) else args.query
stdin_query: list[str] = []
for line in stdin:
stdin_query.append(line.strip())
query = "\n".join(stdin_query)
else:
query = args.query
# setup structures and return # setup structures and return
geocoding = SurplusDefaultGeocoding(args.user_agent) geocoding = SurplusDefaultGeocoding(args.user_agent)
behaviour = Behaviour( return Behaviour(
query=query, query=query,
geocoder=geocoding.geocoder, geocoder=geocoding.geocoder,
reverser=geocoding.reverser, reverser=geocoding.reverser,
@ -1248,13 +1184,12 @@ def handle_args() -> Behaviour:
convert_to_type=ConversionResultTypeEnum(args.convert_to), convert_to_type=ConversionResultTypeEnum(args.convert_to),
using_termux_location=args.using_termux_location, using_termux_location=args.using_termux_location,
) )
return behaviour
def _unique(l: Sequence[str]) -> list[str]: def _unique(container: Sequence[str]) -> list[str]:
"""(internal function) returns a in-order unique list from list""" """(internal function) returns a in-order unique list from list"""
unique: OrderedDict = OrderedDict() unique: OrderedDict = OrderedDict()
for line in l: for line in container:
unique.update({line: None}) unique.update({line: None})
return list(unique.keys()) return list(unique.keys())
@ -1263,7 +1198,7 @@ def _generate_text(
location: dict[str, Any], location: dict[str, Any],
behaviour: Behaviour, behaviour: Behaviour,
mode: TextGenerationEnum = TextGenerationEnum.SHAREABLE_TEXT, mode: TextGenerationEnum = TextGenerationEnum.SHAREABLE_TEXT,
debug: bool = False, debug: bool = False, # noqa: FBT001, FBT002
) -> str: ) -> str:
""" """
(internal function) generate shareable text from location dict (internal function) generate shareable text from location dict
@ -1286,7 +1221,7 @@ def _generate_text(
line_number: int, line_number: int,
line_keys: Sequence[str], line_keys: Sequence[str],
separator: str = ", ", separator: str = ", ",
filter: Callable[[str], list[bool]] = lambda e: [True], filter_func: Callable[[str], list[bool]] = lambda _: [True],
) -> str: ) -> str:
""" """
(internal function) generate a line of shareable text from a list of keys (internal function) generate a line of shareable text from a list of keys
@ -1298,7 +1233,7 @@ def _generate_text(
list of keys to .get() from location dict list of keys to .get() from location dict
separator: str = ", " separator: str = ", "
separator to join elements with separator to join elements with
filter: Callable[[str], list[bool]] = lambda e: True filter_func: Callable[[str], list[bool]] = lambda e: True
function that takes in a string and returns a list of bools, used to function that takes in a string and returns a list of bools, used to
filter elements from line_keys. list will be passed to all(). if all filter elements from line_keys. list will be passed to all(). if all
returns True, then the element is kept. returns True, then the element is kept.
@ -1313,14 +1248,14 @@ def _generate_text(
if detail == "": if detail == "":
continue continue
# filtering: if all(filter(detail)) returns True, # filtering: if all(filter_func(detail)) returns True,
# then the element is kept/added to 'basket' # then the element is kept/added to 'basket'
if filter_status := all(detail_check := filter(detail)) is True: if filter_status := all(detail_check := filter_func(detail)) is True:
if debug: if debug:
print( print(
"debug: _generate_text_line: " "debug: _generate_text_line: "
f"{str(detail_check):<20} -> {str(filter_status):<5} " f"{detail_check!s:<20} -> {filter_status!s:<5} "
f"-------- '{detail}'", f"-------- '{detail}'",
file=behaviour.stderr, file=behaviour.stderr,
) )
@ -1331,7 +1266,7 @@ def _generate_text(
if debug: if debug:
print( print(
"debug: _generate_text_line: " "debug: _generate_text_line: "
f"{str(detail_check):<20} -> {str(filter_status):<5}" f"{detail_check!s:<20} -> {filter_status!s:<5}"
f" filtered '{detail}'", f" filtered '{detail}'",
file=behaviour.stderr, file=behaviour.stderr,
) )
@ -1358,15 +1293,14 @@ def _generate_text(
C: dict content C: dict content
""" """
DEFAULT = "default" country: str = "default"
country: str = DEFAULT
if len(iso3166_2) >= 1: if len(iso3166_2) >= 1:
country = split_iso3166_2[0] country = split_iso3166_2[0]
if country not in line_keys: if country not in line_keys:
return False, line_keys[DEFAULT] return False, line_keys["default"]
else:
return True, line_keys[country] return True, line_keys[country]
# iso3166-2 handling: this allows surplus to have special key arrangements for a # iso3166-2 handling: this allows surplus to have special key arrangements for a
# specific iso3166-2 code for edge cases # specific iso3166-2 code for edge cases
@ -1421,8 +1355,7 @@ def _generate_text(
if n_used_special and debug: if n_used_special and debug:
print( print(
"debug: _generate_text: " "debug: _generate_text: " f"using special key arrangements for '{iso3166_2}' (Singapore)",
f"using special key arrangements for '{iso3166_2}' (Singapore)",
file=behaviour.stderr, file=behaviour.stderr,
) )
@ -1433,18 +1366,14 @@ def _generate_text(
seen_names: list[str] = [ seen_names: list[str] = [
detail detail
for detail in _unique( for detail in _unique([str(location.get(location_key, "")) for location_key in st_names])
[str(location.get(location_key, "")) for location_key in st_names]
)
if detail != "" if detail != ""
] ]
if debug: if debug:
print(f"debug: _generate_text: {seen_names=}", file=behaviour.stderr) print(f"debug: _generate_text: {seen_names=}", file=behaviour.stderr)
general_global_info: list[str] = [ general_global_info: list[str] = [str(location.get(detail, "")) for detail in st_line6_keys]
str(location.get(detail, "")) for detail in st_line6_keys
]
for ( for (
line_number, line_number,
@ -1464,11 +1393,11 @@ def _generate_text(
# filter: everything here should be True if the element is to be kept # filter: everything here should be True if the element is to be kept
if line_filter is False: if line_filter is False:
_filter = lambda e: [True] _filter = lambda _: [True] # noqa: E731
else: else:
_filter = lambda ak: [ _filter = lambda ak: [ # noqa: E731
ak not in general_global_info, ak not in general_global_info,
not any(True if (ak in sn) else False for sn in seen_names), not any(ak in sn for sn in seen_names),
] ]
text.append( text.append(
@ -1476,7 +1405,7 @@ def _generate_text(
line_number=line_number, line_number=line_number,
line_keys=line_keys, line_keys=line_keys,
separator=line_separator, separator=line_separator,
filter=_filter, filter_func=_filter,
) )
) )
@ -1489,9 +1418,8 @@ def _generate_text(
) )
case _: case _:
raise NotImplementedError( msg = f"unknown mode '{mode}' (expected a TextGenerationEnum)"
f"unknown mode '{mode}' (expected a TextGenerationEnum)" raise NotImplementedError(msg)
)
def surplus(query: Query | str, behaviour: Behaviour) -> Result[str]: def surplus(query: Query | str, behaviour: Behaviour) -> Result[str]:
@ -1506,7 +1434,7 @@ def surplus(query: Query | str, behaviour: Behaviour) -> Result[str]:
returns Result[str] returns Result[str]
""" """
if not isinstance(query, (PlusCodeQuery, LocalCodeQuery, LatlongQuery, StringQuery)): if not isinstance(query, PlusCodeQuery | LocalCodeQuery | LatlongQuery | StringQuery):
query_result = parse_query( query_result = parse_query(
behaviour=Behaviour( behaviour=Behaviour(
query=str(query), query=str(query),
@ -1531,9 +1459,7 @@ def surplus(query: Query | str, behaviour: Behaviour) -> Result[str]:
match behaviour.convert_to_type: match behaviour.convert_to_type:
case ConversionResultTypeEnum.SHAREABLE_TEXT: case ConversionResultTypeEnum.SHAREABLE_TEXT:
# get latlong and handle result # get latlong and handle result
latlong_result: Result[Latlong] = query.to_lat_long_coord( latlong_result: Result[Latlong] = query.to_lat_long_coord(geocoder=behaviour.geocoder)
geocoder=behaviour.geocoder
)
if not latlong_result: if not latlong_result:
return Result[str]("", error=latlong_result.error) return Result[str]("", error=latlong_result.error)
@ -1545,7 +1471,7 @@ def surplus(query: Query | str, behaviour: Behaviour) -> Result[str]:
try: try:
location = behaviour.reverser(latlong_result.get()) location = behaviour.reverser(latlong_result.get())
except Exception as exc: except Exception as exc: # noqa: BLE001
return Result[str]("", error=exc) return Result[str]("", error=exc)
if behaviour.debug: if behaviour.debug:
@ -1585,11 +1511,9 @@ def surplus(query: Query | str, behaviour: Behaviour) -> Result[str]:
# perform operation # perform operation
try: try:
pluscode: str = _PlusCode_encode( pluscode: str = _encode(lat=latlong_query.get().latitude, lon=latlong_query.get().longitude)
lat=latlong_query.get().latitude, lon=latlong_query.get().longitude
)
except Exception as exc: except Exception as exc: # noqa: BLE001
return Result[str]("", error=exc) return Result[str]("", error=exc)
return Result[str](pluscode) return Result[str](pluscode)
@ -1612,11 +1536,9 @@ def surplus(query: Query | str, behaviour: Behaviour) -> Result[str]:
# reverse location and handle result # reverse location and handle result
try: try:
location = behaviour.reverser( location = behaviour.reverser(query_latlong, level=LOCALITY_GEOCODER_LEVEL)
query_latlong, level=LOCALITY_GEOCODER_LEVEL
)
except Exception as exc: except Exception as exc: # noqa: BLE001
return Result[str]("", error=exc) return Result[str]("", error=exc)
if behaviour.debug: if behaviour.debug:
@ -1624,13 +1546,14 @@ def surplus(query: Query | str, behaviour: Behaviour) -> Result[str]:
# generate locality portion of local code # generate locality portion of local code
if behaviour.debug: if behaviour.debug:
print( stdout.write(
_generate_text( _generate_text(
location=location, location=location,
behaviour=behaviour, behaviour=behaviour,
mode=TextGenerationEnum.LOCALITY_TEXT, mode=TextGenerationEnum.LOCALITY_TEXT,
debug=behaviour.debug, debug=behaviour.debug,
).strip() ).strip()
+ "\n"
) )
portion_locality: str = _generate_text( portion_locality: str = _generate_text(
@ -1644,25 +1567,31 @@ def surplus(query: Query | str, behaviour: Behaviour) -> Result[str]:
locality_latlong: Latlong = behaviour.geocoder(portion_locality) locality_latlong: Latlong = behaviour.geocoder(portion_locality)
# check now if bounding_box is set and valid # check now if bounding_box is set and valid
assert locality_latlong.bounding_box is not None, ( if getattr(locality_latlong, "bounding_box", None) is None:
"(shortening) geocoder-returned latlong has .bounding_box=None" msg = (
f" - {locality_latlong.bounding_box}" "(shortening) geocoder-returned latlong has .bounding_box=None"
) f" - {locality_latlong.bounding_box}"
)
raise AttributeError(msg) # noqa: TRY301
assert len(locality_latlong.bounding_box) == 4, ( if len(locality_latlong.bounding_box) != 4: # noqa: PLR2004
"(shortening) geocoder-returned latlong has len(.bounding_box) < 4" msg = (
f" - {locality_latlong.bounding_box}" "(shortening) geocoder-returned latlong has len(.bounding_box) != 4"
) f" - {locality_latlong.bounding_box}"
)
raise ValueError(msg) # noqa: TRY301
assert all([type(c) == float for c in locality_latlong.bounding_box]), ( if not all(type(c) == float(c) for c in locality_latlong.bounding_box):
"(shortening) geocoder-returned latlong has non-float in .bounding_box" msg = (
f" - {locality_latlong.bounding_box}" "(shortening) geocoder-returned latlong has non-float in .bounding_box"
) f" - {locality_latlong.bounding_box}"
)
raise TypeError(msg) # noqa: TRY301
except Exception as exc: except Exception as exc: # noqa: BLE001
return Result[str]("", error=exc) return Result[str]("", error=exc)
plus_code = _PlusCode_encode( plus_code = _encode(
lat=query_latlong.latitude, lat=query_latlong.latitude,
lon=query_latlong.longitude, lon=query_latlong.longitude,
) )
@ -1671,47 +1600,27 @@ def surplus(query: Query | str, behaviour: Behaviour) -> Result[str]:
check1 = ( check1 = (
# The center point of the feature is within 0.4 degrees latitude and 0.4 # The center point of the feature is within 0.4 degrees latitude and 0.4
# degrees longitude # degrees longitude
( ((query_latlong.latitude - 0.4) <= locality_latlong.latitude <= (query_latlong.latitude + 0.4)),
(query_latlong.latitude - 0.4) ((query_latlong.longitude - 0.4) <= locality_latlong.longitude <= (query_latlong.longitude + 0.4)),
<= locality_latlong.latitude
<= (query_latlong.latitude + 0.4)
),
(
(query_latlong.longitude - 0.4)
<= locality_latlong.longitude
<= (query_latlong.longitude + 0.4)
),
# The bounding box of the feature is less than 0.8 degrees high and wide. # The bounding box of the feature is less than 0.8 degrees high and wide.
abs(locality_latlong.bounding_box[0] - locality_latlong.bounding_box[1]) abs(locality_latlong.bounding_box[0] - locality_latlong.bounding_box[1]) < 0.8, # noqa: PLR2004
< 0.8, abs(locality_latlong.bounding_box[2] - locality_latlong.bounding_box[3]) < 0.8, # noqa: PLR2004
abs(locality_latlong.bounding_box[2] - locality_latlong.bounding_box[3])
< 0.8,
) )
check2 = ( check2 = (
# The center point of the feature is within 0.4 degrees latitude and 0.4 # The center point of the feature is within 0.4 degrees latitude and 0.4
# degrees longitude" # degrees longitude"
( ((query_latlong.latitude - 8) <= locality_latlong.latitude <= (query_latlong.latitude + 8)),
(query_latlong.latitude - 8) ((query_latlong.longitude - 8) <= locality_latlong.longitude <= (query_latlong.longitude + 8)),
<= locality_latlong.latitude
<= (query_latlong.latitude + 8)
),
(
(query_latlong.longitude - 8)
<= locality_latlong.longitude
<= (query_latlong.longitude + 8)
),
# The bounding box of the feature is less than 0.8 degrees high and wide. # The bounding box of the feature is less than 0.8 degrees high and wide.
abs(locality_latlong.bounding_box[0] - locality_latlong.bounding_box[1]) abs(locality_latlong.bounding_box[0] - locality_latlong.bounding_box[1]) < 16, # noqa: PLR2004
< 16, abs(locality_latlong.bounding_box[2] - locality_latlong.bounding_box[3]) < 16, # noqa: PLR2004
abs(locality_latlong.bounding_box[2] - locality_latlong.bounding_box[3])
< 16,
) )
if check1: if check1:
return Result[str](f"{plus_code[4:]} {portion_locality}") return Result[str](f"{plus_code[4:]} {portion_locality}")
elif check2: if check2:
return Result[str](f"{plus_code[2:]} {portion_locality}") return Result[str](f"{plus_code[2:]} {portion_locality}")
print( print(
@ -1739,9 +1648,7 @@ def surplus(query: Query | str, behaviour: Behaviour) -> Result[str]:
return Result[str](str(latlong_result.get())) return Result[str](str(latlong_result.get()))
case _: case _:
return Result[str]( return Result[str]("", error=f"unknown conversion result type '{behaviour.convert_to_type}'")
"", error=f"unknown conversion result type '{behaviour.convert_to_type}'"
)
# command-line entry # command-line entry
@ -1755,12 +1662,9 @@ def cli() -> int:
# handle arguments and print version header # handle arguments and print version header
print( print(
f"surplus version {'.'.join([str(v) for v in VERSION])}{VERSION_SUFFIX}" f"surplus version {'.'.join([str(v) for v in VERSION])}{VERSION_SUFFIX}"
+ (f", debug mode" if behaviour.debug else "") + (", debug mode" if behaviour.debug else "")
+ ( + (
( (f" ({BUILD_COMMIT[:10]}@{BUILD_BRANCH}, " f'{BUILD_DATETIME.strftime("%a %d %b %Y %H:%M:%S %z")})')
f" ({BUILD_COMMIT[:10]}@{BUILD_BRANCH}, "
f'{BUILD_DATETIME.strftime("%a %d %b %Y %H:%M:%S %z")})'
)
if behaviour.debug or behaviour.version_header if behaviour.debug or behaviour.version_header
else "" else ""
), ),
@ -1768,7 +1672,7 @@ def cli() -> int:
) )
if behaviour.version_header: if behaviour.version_header:
exit(0) sysexit(0)
# parse query and handle result # parse query and handle result
query = parse_query(behaviour=behaviour) query = parse_query(behaviour=behaviour)
@ -1796,4 +1700,4 @@ def cli() -> int:
if __name__ == "__main__": if __name__ == "__main__":
exit(cli()) sysexit(cli())