-
Sara Savanovic Djordjevic authoredSara Savanovic Djordjevic authored
update_measurements.py 12.04 KiB
import os
import json
import random
from datetime import datetime
from server.consts import LAKE_RELATIONS_PATH, STATS_OUTPUT_PATH
def update_measurements_handler(self, lake_name: str):
status_code, measurement_data = update_measurements(lake_name)
self.send_response(status_code)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(measurement_data)
def update_measurements(lake_name: str) -> (int, str):
"""
Retrieves LiDar data for a given lake, and adds weather data to each subdivision.
Parameters:
lake_name (str): The name of the requested lake
Returns:
(int, str): A HTTP status code and the updated data
"""
try:
# Return immediately if an invalid lake name was provided
if not os.path.exists(LAKE_RELATIONS_PATH + lake_name + "_div.json"):
print("The system lake does not exist")
return 404, f"{lake_name} does not exists in the system"
# Define file path to lidar data file
lidar_data_path = os.path.join(LAKE_RELATIONS_PATH, lake_name + '_lidar_data.json')
# Lists to store processed data
sub_div_ids = []
measurements = []
# Some lakes may not have any recent lidar data, so must check if the file exists
if os.path.exists(lidar_data_path):
# Read the newest lidar data from JSON file
with open(lidar_data_path, 'r') as file:
lidar_data = json.load(file)
all_ice_stats = []
if os.path.exists(STATS_OUTPUT_PATH + lake_name + "_sub_div.json"):
# Tro to read ice stats from NVE model for current lake
with open(STATS_OUTPUT_PATH + lake_name + "_sub_div.json", 'r') as file:
all_ice_stats = json.load(file)
# Iterate over all fetched rows
for measurement in lidar_data:
processed_subdivs = []
# Create new measurement object with embedded sensor object
new_measurement = {
'MeasurementID': measurement['MeasurementID'],
'TimeMeasured': str(datetime.now()),
'CenterLat': measurement['CenterLat'],
'CenterLon': measurement['CenterLon'],
'Sensor': {
'SensorID': measurement['Sensor']['SensorID'],
'SensorType': measurement['Sensor']['SensorType'],
'Active': measurement['Sensor']['Active'],
},
'Subdivisions': None,
}
for sub_division in measurement['Subdivisions']:
subdiv_id = sub_division['SubdivID']
# Extract center coordinates and round to 4 decimals
center_lat = round(sub_division['CenLatitude'], 4)
center_lng = round(sub_division['CenLongitude'], 4)
avg_thickness = sub_division['AvgThickness']
# Initialise list for the current ice stats
ice_stats = []
print("Fails here?")
# Ice statistics were retrieved successfully
if len(all_ice_stats) >= subdiv_id is not None or all_ice_stats[subdiv_id] != "Null":
ice_stats = all_ice_stats[subdiv_id]
accuracy = 3
print("Fails here, later?")
# Increase accuracy by 1 if the LiDar data and NVE data have a minimal discrepancy
if abs(avg_thickness - all_ice_stats[subdiv_id][3]['Black ice (m)']) < 1.0:
accuracy = 4
else: # Failed to retrieve ice statistics, initialise empty ice stats object
ice_stats = {
"Date": "NA",
"Slush ice (m)": 0,
"Black ice (m)": 0,
"Total ice (m)": 0,
"Snow depth (m)": 0.0,
"Total snow (m)": 0.0,
"Cloud cover": 0.0,
"Temperature (c)": 0.0
}
accuracy = 2
# Create new subdivision object
sub_division = {
'SubdivID': subdiv_id,
'GroupID': 0,
'MinThickness': avg_thickness,
'AvgThickness': sub_division['AvgThickness'],
'CenLatitude': center_lat,
'CenLongitude': center_lng,
'Accuracy': accuracy,
'Color': calculateColor(avg_thickness),
'IceStats': ice_stats,
}
sub_div_ids.append(subdiv_id)
# Append processed subdivision data
processed_subdivs.append(sub_division)
# Append processed measurement and subdivisions
new_measurement['Subdivisions'] = processed_subdivs
measurements.append(new_measurement)
# Populate remaining non-processed subdivisions and create "invalid" or "proxy" measurement to store them
remaining_sub_divs = fill_remaining_subdivisions(lake_name, sub_div_ids, all_ice_stats)
proxy = {
'MeasurementID': -1,
'TimeMeasured': str(datetime.now()),
'CenterLat': None,
'CenterLon': None,
'Sensor': None,
'Subdivisions': remaining_sub_divs
}
measurements.append(proxy)
# Write the newest measurements to file
with open(LAKE_RELATIONS_PATH + lake_name.lower() + '_measurements.json', 'w') as f:
json.dump(measurements, f, indent=4)
# Convert list of dictionaries to JSON
response_data = json.dumps(measurements, indent=4)
# Return data
return 200, response_data
except Exception as e:
print(f"Error in updating measurements: {e}")
return 500, f"Error in updating measurements: {e}".encode('utf-8')
def fill_remaining_subdivisions(lake_name: str, processed_ids: list, all_ice_stats):
"""
Returns a list of subdivision dictionaries for subdivisions without measurements.
Parameters:
lake_name (str): The name of the requested file/lake
processed_ids (list): List of ids (int) of all subdivisions that have already been processed
Returns:
sub_divisions (list): A list of subdivision dictionaries
"""
try:
# Read the lake relation for the requested lake
with open(LAKE_RELATIONS_PATH + lake_name + '_div.json', 'r') as file:
relation = json.load(file)
sub_divisions = []
# Loop through each feature and extract all subdivisions
for sub_div in relation['features']:
sub_div_id = int(sub_div['properties']['sub_div_id'])
# Only get subdivisions that are not in the list already
if sub_div_id not in processed_ids:
# Extract center coordinates and round to 4 decimals
center_lat = round(sub_div['properties']['sub_div_center'][0], 4)
center_lng = round(sub_div['properties']['sub_div_center'][1], 4)
# Fetch weather data for each subdivision from the NVE model
ice_stats = []
if len(all_ice_stats) >= sub_div_id and all_ice_stats[sub_div_id] != "Null":
ice_stats = all_ice_stats[sub_div_id]
total_ice_thickness = ice_stats[0]['Black ice (m)']
accuracy = 1
else: # Initialise empty ice stats
ice_stats = {
"Date": "NA",
"Slush ice (m)": 0,
"Black ice (m)": 0,
"Total ice (m)": 0,
"Snow depth (m)": 0.0,
"Total snow (m)": 0.0,
"Cloud cover": 0.0,
"Temperature (c)": 0.0
}
total_ice_thickness = 0
accuracy = 0
# Create new subdivision object
sub_division = {
'SubdivID': sub_div_id,
'GroupID': None,
'MinThickness': total_ice_thickness,
'AvgThickness': total_ice_thickness,
'CenLatitude': center_lat,
'CenLongitude': center_lng,
'Accuracy': accuracy,
'Color': calculateColor(total_ice_thickness),
'IceStats': ice_stats,
}
sub_divisions.append(sub_division)
return sub_divisions
except FileNotFoundError as e:
print("Failed to find relation file: ", e)
except Exception as e:
print("Failed to add remaining subdivisions: ", e)
def calculateColor(thickness: float):
if 0 < thickness <= 4:
return 1 # Red
elif 4 < thickness <= 8:
return 2 # Orange
elif 8 < thickness <= 10:
return 3 # Green
elif thickness > 10:
return 4 # Blue
else:
return 0 # Grey
def add_test_data(self, lake_name: str):
"""
Adds random test data to lake_name_lidar_data.json. This function is purly for testing, not production.
The function overwrites the lidar data for the selected lake.
Parameters:
self (BaseHTTPRequestHandler): A instance of a BaseHTTPRequestHandler
lake_name (str): The name of the file/lake for the test data
"""
try:
test_data = []
sub_div_id = 0
for measurement_id in range(5):
measurement = {
"MeasurementID": measurement_id,
"TimeMeasured": datetime.now().isoformat(),
"CenterLat": round(random.uniform(60, 61), 4),
"CenterLon": round(random.uniform(10, 11), 4),
"Sensor": {
"SensorID": random.randint(1, 10),
"SensorType": "LiDar",
"Active": True
},
"Subdivisions": []
}
# Create 10 subdivisions for each measurement, with randomized coordinates and thicknesses
for subdiv_id in range(30):
subdivision = {
"SubdivID": sub_div_id,
"MinThickness": round(random.uniform(4, 20), 1),
"AvgThickness": round(random.uniform(2, 15), 1),
"CenLatitude": random.uniform(60, 61),
"CenLongitude": random.uniform(10, 11),
"Accuracy": 0
}
measurement["Subdivisions"].append(subdivision)
sub_div_id += 1
test_data.append(measurement)
# Overwrite the lidar data file
with open(LAKE_RELATIONS_PATH + lake_name + '_lidar_data.json', 'w') as f:
json.dump(test_data, f, indent=4)
# Convert list of dictionaries to JSON
response_data = json.dumps(test_data, indent=4)
# Set headers
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
# Write processed data to response object
self.wfile.write(response_data.encode('utf-8'))
except FileNotFoundError as e:
print("Failed to find relation file: ", e)
# Set headers
self.send_response(500)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write("File not found")
except Exception as e:
print("Failed to add remaining subdivisions: ", e)
# Set headers
self.send_response(500)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(f"Error in adding test data: {e}".encode('utf-8'))