import os import json from math import cos, sqrt, fabs import random import geopandas as gpd from matplotlib import pyplot as plt from shapely.ops import linemerge, unary_union, polygonize from shapely.geometry import Polygon, LineString, MultiLineString from server.consts import LAKE_RELATIONS_PATH ''' # 0 starting coordinate (x,y) # 1 1 deg lat = 111.32km 1 deg lng = 40075km * cos( lat ) / 360 # 2 Formulas for calculating a distance in kilometers to a distance in latitude and longitude lat = distance_in_km/111.32km lng = (distance_in_km × 360)/(40075km × cos(lat)) ''' # Read a json file with relation data and send to response object def cut_map(self, cursor, lake_name: str, cell_size_in_km: float = 0.5): """ Cuts a map into a grid based on a selected cell size Parameters: self (BaseHTTPRequestHandler): A instance of a BaseHTTPRequestHandler cursor (cursor): An Sqlite3 cursor object that points to the database lake_name (str): The name of the lake to be cut cell_size_in_km (float): The selected cell size in kilometers """ try: # Read relation from GeoJson file and extract all geometry of type Polygon geo_data = gpd.read_file(LAKE_RELATIONS_PATH + lake_name + ".geojson") polygon_data = geo_data[geo_data['geometry'].geom_type == 'Polygon'] polygons = [Polygon(polygon.exterior) for polygon in polygon_data['geometry']] if len(polygons) <= 1: raise Exception("Failed to convert JSON object to Shapely Polygons") # List to store all the map tiles divided_map = [] # Convert the cell size to degrees cell_size = cell_size_in_km * 10 cell_width = cell_size / 111.32 # A slightly more complicated formula is required to calculate the height. This ensures that # the height in km is equal to the width in km regardless of the latitude. cell_height = (cell_size * 360) / (40075 * cos(cell_width)) # Process all polygons for polygon in polygons: # Generate a grid based on the calculated cell size lines = create_grid(polygon, cell_width, cell_height) lines.append(polygon.boundary) # Merge the grid lines into a single grid object lines = unary_union(lines) lines = linemerge(lines) lines = list(polygonize(lines)) # Divide the polygon into tiles based on the generated grid divided_map.extend(combine_grid_with_poly(polygon, lines)) # List to store new GeoJSON feature objects features = [] # Create subdivisions for each map tile sub_div_id = 0 for tile in divided_map: # Calculate tile center based on bounds, and round down to two decimals min_x, min_y, max_x, max_y = tile.bounds center = round(max_y - (max_y - min_y), 6), round(max_x - (max_x - min_x), 6) rounded_coordinates = [] if isinstance(tile, Polygon): for coords in tile.exterior.coords: rounded_coords = (round(coords[0], 4), round(coords[1], 4)) rounded_coordinates.append(rounded_coords) rounded_tile = Polygon(rounded_coordinates) # Create new feature object tile_feature = { 'type': 'Feature', 'properties': { 'sub_div_id': str(sub_div_id), 'sub_div_center': center }, 'geometry': rounded_tile.__geo_interface__ } # Append new feature oject to list, and increment sub_div_id for next iteration features.append(tile_feature) sub_div_id += 1 # Create new GeoJSON object containing all the new feature objects feature_collection = { 'type': 'FeatureCollection', 'cell_count': sub_div_id, # Add the last subdivision ID as number of tiles 'cell_width': cell_width, 'cell_height': cell_height, 'cell_size_in_km': cell_size_in_km, 'features': features } # Add lake name to database cursor.execute(''' SELECT Name FROM BodyOfWater WHERE Name = ?; ''', (lake_name,)) existing_lake = cursor.fetchone() # If lake_name doesn't exist, insert it into the database if existing_lake is None: cursor.execute(''' INSERT INTO BodyOfWater(Name) VALUES (?); ''', (lake_name,)) # Plot the newly created map and save it to a new file plot_map(divided_map) write_json_to_file(lake_name, feature_collection) # Return the map to the response object self.send_response(200) self.send_header("Content-type", "application/json") self.end_headers() self.wfile.write(json.dumps(feature_collection).encode('utf-8')) except Exception as e: print(f"Error in adding new map: {e}") self.send_response(500) self.send_header("Content-type", "application/json") self.end_headers() def create_grid(poly: Polygon, cell_width: float, cell_height: float): """ Returns a list of vertical and horizontal LineStrings that create a grid. Parameters: poly (Polygon): A Shapely Polygon representing a map or part of a map cell_width (float): The width of the grid cells in degrees cell_height (float): The height of the grid cells in degrees Returns: grid_lines (list): List of LineString objects defining the grid """ # Retrieve bounds of the entire polygon bounds = poly.bounds min_x, min_y, max_x, max_y = bounds # List to store all created lines grid_lines = [] # Create new horizontal lines while within bounds y = min_y while y <= max_y: line = LineString([(min_x, y), (max_x, y)]) grid_lines.append(line) y += cell_height # Create new vertical lines while within bounds x = min_x while x <= max_x: line = LineString([(x, min_y), (x, max_y)]) grid_lines.append(line) x += cell_width return grid_lines def combine_grid_with_poly(polygon, grid): """ Returns a list of polygons that together make up map tiles. Parameters: polygon (Polygon): A polygon representing a map or part of a map grid (list): List of LineString objects defining the grid Returns: intersecting_tiles (list): List of Polygons """ # List to contain all the tiles intersecting_tiles = [] for line in grid: if line.intersects(polygon): intersection = line.intersection(polygon) # Check if intersection is a MultiLineString if isinstance(intersection, MultiLineString): # Extend the intersecting tiles with the polygonized results intersecting_tiles.extend(list(polygonize(intersection))) else: intersecting_tiles.append(intersection) return intersecting_tiles def write_json_to_file(lake_name: str, map_data: dict): """ Writes a divided map to a JSON file and updates all_lake_names.json Parameters: lake_name (str): Name of the lake and file to write to map_data (dict): List of map polygons converted to a JSON dictionary """ # Create and write divided map to new file print("Writing to file...") if not os.path.exists(LAKE_RELATIONS_PATH): raise Exception("Directory from path does not exist") with open(LAKE_RELATIONS_PATH + '/' + lake_name + '_div.json', 'w') as f: json.dump(map_data, f) # Read all_system_lakes.json with open(LAKE_RELATIONS_PATH + 'all_lake_names.json', 'r') as file: data = json.load(file) # Check if the lake name exists in the list if lake_name not in data: data.append(lake_name) # Only append to list if it does not already exist # Update all_lake_names.json with new lake name with open(LAKE_RELATIONS_PATH + 'all_lake_names.json', 'w') as file: json.dump(data, file, ensure_ascii=False, indent=2) # Plotting the map can take a considerable amount of time, especially when creating maps with many # subdivisions. Removing calls to plot_map will speed up the process, but it is recommended to plot the map # after each division to ensure that the map was divided as intended. def plot_map(divided_map): """ Plots a divided map using matplotlib. Parameters: divided_map (list): List of Shapely Polygons """ print("Plotting... This may take some time...") # Convert Polygon objects to GeoDataFrames tiles = [gpd.GeoDataFrame(geometry=[tile]) for tile in divided_map] # Configure plot settings fig, ax = plt.subplots() ax.set_aspect(1.5) # Plot each tile for tile in tiles: # Give each tile a random color to clearly visualize the grid random_color = "#{:06x}".format(random.randint(0, 0xFFFFFF)) gpd.GeoSeries(tile.geometry).plot(ax=ax, facecolor=random_color, edgecolor='none') # Display plot plt.show()