# NOTE(review): stray "Newer"/"Older" page-navigation text from a web export was removed here.
import json
import random
from datetime import datetime
from random import randint
# get_all_markers queries all measurement and subdivision data, converts the data to JSON,
# and writes the JSON to the response object
def _build_test_measurements():
    """Return temporary, randomly generated measurements used as test data.

    Produces measurements with IDs 3..9, each holding 30 subdivisions whose
    IDs count upwards from 17. Thickness values are random, so the output
    differs between calls.
    """
    test_measurements = []
    subdiv_id = 17

    for measurement_id in range(3, 10):
        sub_divisions = []

        for _ in range(30):
            min_thickness = random.uniform(0, 10)
            # The average is built on top of the minimum so it is never smaller
            avg_thickness = random.uniform(0, 15) + min_thickness

            sub_divisions.append({
                'SubdivID': subdiv_id,
                'GroupID': 1,
                'MinThickness': min_thickness,
                'AvgThickness': avg_thickness,
                'CenLatitude': 7.0,
                'CenLongitude': 8.0,
                'Accuracy': 1.0,
                'Color': calculateColor(avg_thickness),
                'IceStats': (),
            })
            subdiv_id += 1

        test_measurements.append({
            'MeasurementID': measurement_id,
            'TimeMeasured': str(datetime.now()),
            'CenterLat': 10.0,
            'CenterLon': 8.0,
            'Sensor': {
                'SensorID': 1,
                'SensorType': "test data",
                'Active': True,
            },
            'Subdivisions': sub_divisions,
        })

    return test_measurements


def get_all_markers(self, cursor, waterBodyName):
    """Fetch all measurements with their subdivisions and write them as JSON.

    Rows are grouped by measurement ID so that every measurement appears once
    with a list of its subdivisions. Temporary test measurements are appended
    to the result. If the query yields no rows, or anything fails along the
    way, an empty JSON list is written instead.

    Args:
        self: Request handler providing ``send_response``, ``send_header``,
            ``end_headers`` and ``wfile``.
        cursor: Database cursor providing ``execute`` and ``fetchall``.
        waterBodyName: Name of the body of water.
            NOTE(review): not used by the query below - confirm whether the
            query is meant to filter on it.
    """
    try:
        sql_query = '''
            SELECT m.MeasurementID, m.SensorID, m.TimeMeasured, m.CenterLat, m.CenterLon,
                   s.SensorType, s.Active,
                   b.Name,
                   d.SubDivisionID, d.GroupID, d.MinimumThickness,
                   d.AverageThickness, d.CenterLatitude, d.CenterLongitude,
                   d.Accuracy
            FROM Measurement m
            INNER JOIN Sensor s ON m.SensorID = s.SensorID
            INNER JOIN BodyOfWater b ON m.WaterBodyName = b.Name
            LEFT JOIN SubDivision d ON m.MeasurementID = d.MeasurementID
        '''
        cursor.execute(sql_query)
        rows = cursor.fetchall()

        # Container for all fetched measurement objects, keyed by measurement ID
        measurement_data = {}

        # TODO: load the ice stats file and attach the matching stat to each
        # subdivision (match on subdivision ID, row[8]); close the file after use.

        # Iterate over all fetched rows; column indices follow the SELECT order
        for row in rows:
            measurement_id = row[0]

            sub_division = {
                'SubdivID': row[8],
                'GroupID': row[9],
                'MinThickness': row[10],
                'AvgThickness': row[11],
                'CenLatitude': row[12],
                'CenLongitude': row[13],
                'Accuracy': row[14],  # Selected by the query; mirrors the test data shape
                'Color': calculateColor(row[11]),  # NB color calculated based on average thickness, should be minimum
                'IceStats': (),  # NB temp empty, add ice stats later
            }

            measurement = measurement_data.get(measurement_id)
            if measurement is None:
                # Create a new entry for measurement_id if it does not already exist
                measurement_data[measurement_id] = {
                    'MeasurementID': measurement_id,
                    'TimeMeasured': row[2],
                    'CenterLat': row[3],
                    'CenterLon': row[4],
                    'Sensor': {  # Each measurement only has one related sensor
                        'SensorID': row[1],  # Selected by the query; mirrors the test data shape
                        'SensorType': row[5],
                        'Active': bool(row[6]),
                    },
                    'Subdivisions': [sub_division],  # Array of sub_division objects
                }
            elif sub_division not in measurement['Subdivisions']:
                # Add the subdivision to the existing measurement only once
                measurement['Subdivisions'].append(sub_division)

        # Convert dictionary values to list of measurements and add the test data
        data = list(measurement_data.values()) + _build_test_measurements()

        if not rows or not data:
            # Fall back to an empty list when the query returned nothing
            print("No data which meets the condition found")
            marker_data = '[]'
        else:
            # Convert list of dictionaries to JSON
            marker_data = json.dumps(data, indent=4)
    except Exception as e:
        # Request boundary: report the failure and still answer with valid JSON
        print(f"Error in querying database: {e}")
        marker_data = '[]'

    # Set headers
    # NOTE(review): status 200 is sent even when no data was found or an error
    # occurred - confirm whether an error status is wanted in those cases.
    self.send_response(200)
    self.send_header("Content-type", "application/json")
    self.end_headers()

    # Write marker data to response object
    self.wfile.write(marker_data.encode('utf-8'))
def calculateColor(thickness: float):  # NB not final colors nor ranges
    """Return the color value for a given ice thickness.

    Args:
        thickness: Thickness value to classify. ``None`` is accepted and
            treated like any other value outside the defined ranges.

    Returns:
        An integer color value: one color per thickness range, and gray for
        values that are missing, zero, negative or otherwise outside every range.
    """
    gray = 0xFF939393

    # A NULL thickness from the database arrives as None, which cannot be
    # compared with numbers; treat it as "no data".
    if thickness is None:
        return gray

    if 0 < thickness <= 4:
        # NOTE(review): the value for this range was missing; placeholder - confirm
        return 0xFFff0000  # Red
    elif 4 < thickness <= 6:
        # NOTE(review): the value for this range was missing; placeholder - confirm
        return 0xFFff6a00  # Orange
    elif 6 < thickness <= 8:
        return 0xFFb1ff00  # Green
    elif thickness > 8:
        return 0xFF00d6ff  # Blue
    else:
        return gray  # Gray