Source code for sigmap.polygeohasher.utils.geohash

import math

import geopandas as gpd
from shapely.geometry import box

from ..logger import logging

logger = logging.getLogger(__name__)

__BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"
_base32_map = {c: i for i, c in enumerate(__BASE32)}


[docs] def lonlat_res_for_length(L: int): """ Calculate longitude and latitude resolution for given geohash length. Parameters ---------- L : int Geohash string length Returns ------- tuple (lon_res, lat_res) in degrees """ total_bits = 5 * L lon_bits = math.ceil(total_bits / 2.0) lat_bits = math.floor(total_bits / 2.0) lon_res = 360.0 / (2 ** lon_bits) lat_res = 180.0 / (2 ** lat_bits) return lon_res, lat_res
[docs] def encode_geohash(lon: float, lat: float, L: int): """ Encode longitude and latitude to geohash string. Parameters ---------- lon : float Longitude in degrees [-180, 180] lat : float Latitude in degrees [-90, 90] L : int Desired geohash string length Returns ------- str Geohash string of length L """ lon_interval = [-180.0, 180.0] lat_interval = [-90.0, 90.0] bits = [] total_bits = 5 * L is_lon = True for _ in range(total_bits): if is_lon: mid = (lon_interval[0] + lon_interval[1]) / 2.0 if lon >= mid: bits.append(1) lon_interval[0] = mid else: bits.append(0) lon_interval[1] = mid else: mid = (lat_interval[0] + lat_interval[1]) / 2.0 if lat >= mid: bits.append(1) lat_interval[0] = mid else: bits.append(0) lat_interval[1] = mid is_lon = not is_lon geohash = [] for i in range(0, total_bits, 5): v = 0 for b in bits[i:i + 5]: v = (v << 1) | b geohash.append(__BASE32[v]) return "".join(geohash)
[docs] def candidate_geohashes_covering_bbox(lon_min, lat_min, lon_max, lat_max, L): """ Generate candidate geohashes that could intersect a bounding box. Parameters ---------- lon_min : float Minimum longitude of bbox lat_min : float Minimum latitude of bbox lon_max : float Maximum longitude of bbox lat_max : float Maximum latitude of bbox L : int Geohash length Returns ------- list List of tuples (geohash, lon, lat) """ lon_res, lat_res = lonlat_res_for_length(L) lon_cells_min = int((lon_min + 180.0) / lon_res) lon_cells_max = int((lon_max + 180.0) / lon_res) + 1 lat_cells_min = int((lat_min + 90.0) / lat_res) lat_cells_max = int((lat_max + 90.0) / lat_res) + 1 geos = [] seen = set() for i in range(lon_cells_min, lon_cells_max + 1): lon = -180.0 + (i + 0.5) * lon_res # Ensure longitude is in valid range if lon < -180 or lon > 180: continue for j in range(lat_cells_min, lat_cells_max + 1): lat = -90.0 + (j + 0.5) * lat_res # Ensure latitude is in valid range if lat < -90 or lat > 90: continue if (lon_min - lon_res <= lon <= lon_max + lon_res and lat_min - lat_res <= lat <= lat_max + lat_res): gh = encode_geohash(lon, lat, L) if gh not in seen: seen.add(gh) geos.append((gh, lon, lat)) return geos
def geohash_to_bbox(gh: str): """ Decode geohash string to bounding box coordinates. Parameters ---------- gh : str Geohash string to decode Returns ------- tuple (lon_min, lat_min, lon_max, lat_max) in degrees """ lon_min, lon_max = -180.0, 180.0 lat_min, lat_max = -90.0, 90.0 is_lon = True bits = [] for c in gh: v = _base32_map[c] # each base32 char -> 5 bits (16,8,4,2,1) for i in (16, 8, 4, 2, 1): bits.append(1 if (v & i) else 0) for b in bits: if is_lon: mid = (lon_min + lon_max) / 2.0 if b: lon_min = mid else: lon_max = mid else: mid = (lat_min + lat_max) / 2.0 if b: lat_min = mid else: lat_max = mid is_lon = not is_lon return lon_min, lat_min, lon_max, lat_max
[docs] def geohash_to_polygon(gh: str): """ Convert geohash to shapely box polygon. Parameters ---------- gh : str Geohash string Returns ------- shapely.geometry.Polygon Box polygon representing the geohash cell """ lon_min, lat_min, lon_max, lat_max = geohash_to_bbox(gh) return box(lon_min, lat_min, lon_max, lat_max)
[docs] def geohashes_to_gdf(geos, crs='EPSG:4326'): """ Convert geohashes to GeoDataFrame. Parameters ---------- geos : list or str List of geohashes or tuples (geohash, lon, lat), or single geohash crs : str, default 'EPSG:4326' Coordinate reference system Returns ------- gpd.GeoDataFrame GeoDataFrame with geohash codes and polygon geometries """ polys = [] codes = [] for item in geos: # Handle both string and tuple inputs if isinstance(item, tuple): gh = item[0] else: gh = item try: p = geohash_to_polygon(gh) except Exception as e: logger.error(f"Failed to convert geohash {gh}: {e}") continue polys.append(p) codes.append(gh) gdf = gpd.GeoDataFrame({'geohash': codes, 'geometry': polys}, crs=crs) return gdf
[docs] def get_geohash_children(parent_geohash: str): """ Generate all child geohashes for a given parent geohash. Each parent geohash has exactly 32 children (one for each base32 character). Each child adds one character of precision. Parameters ---------- parent_geohash : str Parent geohash string Returns ------- list List of 32 child geohash strings """ children = [] for char in __BASE32: children.append(parent_geohash + char) return children
[docs] def geohashes_to_boxes(geohashes): """ Convert geohash(es) to a dictionary mapping geohash strings to box polygons. Parameters ---------- geohashes : str or list of str Single geohash string or list of geohash strings Returns ------- dict : Dictionary mapping geohash string to shapely box polygon {geohash: box_polygon, ...} Examples -------- >>> # Single geohash >>> boxes = geohashes_to_boxes("u4pruyd") >>> print(boxes) {'u4pruyd': <Box polygon>} >>> # Multiple geohashes >>> boxes = geohashes_to_boxes(["u4pru", "u4prv", "u4prw"]) >>> print(len(boxes)) 3 >>> # Access a specific box >>> box_poly = boxes["u4pru"] >>> print(box_poly.bounds) # (lon_min, lat_min, lon_max, lat_max) """ # Handle single string input if isinstance(geohashes, str): geohashes = [geohashes] result = {} for gh in geohashes: try: lon_min, lat_min, lon_max, lat_max = geohash_to_bbox(gh) result[gh] = box(lon_min, lat_min, lon_max, lat_max) except Exception as e: logger.error(f"Failed to convert geohash {gh} to box: {e}") continue return result
[docs] def geohashes_to_multipolygon(geohashes_or_dict, dissolve=True): """ Convert geohashes to a MultiPolygon by unioning all box polygons. Parameters ---------- geohashes_or_dict : str, list of str, or dict - Single geohash string - List of geohash strings - Dictionary from geohashes_to_boxes() {geohash: box_polygon} dissolve : bool, default True If True, dissolve overlapping/adjacent boxes into single polygons If False, keep as separate polygons in MultiPolygon Returns ------- MultiPolygon or Polygon : Union of all geohash boxes Returns Polygon if union results in single polygon, MultiPolygon if multiple disjoint regions Examples -------- >>> # From list of geohashes >>> geohashes = ["u4pru", "u4prv", "u4prw"] >>> multi_poly = geohashes_to_multipolygon(geohashes) >>> print(multi_poly.area) >>> # From dictionary >>> boxes = geohashes_to_boxes(geohashes) >>> multi_poly = geohashes_to_multipolygon(boxes) >>> # Without dissolving (keep separate) >>> multi_poly = geohashes_to_multipolygon(geohashes, dissolve=False) """ from shapely.geometry import MultiPolygon, Polygon from shapely.ops import unary_union # If input is a dictionary, extract the box polygons if isinstance(geohashes_or_dict, dict): boxes = list(geohashes_or_dict.values()) else: # Convert geohashes to boxes boxes_dict = geohashes_to_boxes(geohashes_or_dict) boxes = list(boxes_dict.values()) if not boxes: logger.warning("No valid boxes to convert to MultiPolygon") return MultiPolygon([]) # Empty MultiPolygon if dissolve: # Use unary_union to merge overlapping/adjacent polygons result = unary_union(boxes) # Ensure result is a MultiPolygon or Polygon if isinstance(result, Polygon): return result # Single polygon is fine elif isinstance(result, MultiPolygon): return result else: # Fallback for unexpected geometry types logger.warning(f"Union resulted in unexpected geometry type: {type(result)}") return MultiPolygon([result]) if hasattr(result, '__geo_interface__') else MultiPolygon([]) else: # Keep as separate polygons return MultiPolygon(boxes)
# Convenience functions for common operations def coverage_dict_to_multipolygon(geohash_dict, dissolve=True): """ Convert a coverage dictionary to a MultiPolygon. Useful for converting the output of adaptive_geohash_coverage() or geohash_coverage() directly to a polygon. Parameters ---------- geohash_dict : dict Dictionary with level as keys and list of geohashes as values (output from coverage functions) dissolve : bool, default True Whether to dissolve adjacent polygons Returns ------- MultiPolygon or Polygon : Union of all geohashes in the dict Example ------- >>> geohash_dict, tiles_gdf = adaptive_geohash_coverage(country_geom, 3, 5) >>> coverage_polygon = coverage_dict_to_multipolygon(geohash_dict) """ all_geohashes = [] for level, geohashes in geohash_dict.items(): all_geohashes.extend(geohashes) return geohashes_to_multipolygon(all_geohashes, dissolve=dissolve) def coverage_dict_to_multipolygon_by_level(geohash_dict, dissolve=True): """ Convert a coverage dictionary to separate MultiPolygons per level. Parameters ---------- geohash_dict : dict Dictionary with level as keys and list of geohashes as values dissolve : bool, default True Whether to dissolve adjacent polygons Returns ------- dict : Dictionary mapping level to MultiPolygon/Polygon {level: MultiPolygon, ...} Example ------- >>> geohash_dict, tiles_gdf = adaptive_geohash_coverage(country_geom, 3, 5) >>> level_polygons = coverage_dict_to_multipolygon_by_level(geohash_dict) >>> print(level_polygons[3].area) # Area covered by level 3 tiles """ result = {} for level, geohashes in geohash_dict.items(): result[level] = geohashes_to_multipolygon(geohashes, dissolve=dissolve) return result