Skip to content
Snippets Groups Projects
Commit d690e591 authored by Sara Savanovic Djordjevic's avatar Sara Savanovic Djordjevic
Browse files

fix: merge conflicts with main

parents f7a2bc84 cbf8e428
No related branches found
No related tags found
1 merge request!21Clhp map
Showing
with 464 additions and 14059 deletions
No preview for this file type
This diff is collapsed.
This diff is collapsed.
No preview for this file type
No preview for this file type
No preview for this file type
import unittest
from unittest.mock import patch, mock_open
import json
from sentinelhub import BBox, CRS
from server.Sentinelhub import box_funcitons
# Sample data for mocking the box.json content
mock_box_json = {
"type": "MultiPolygon",
"coordinates": [
[
[
[10.0, 60.0],
[10.0, 61.0],
[11.0, 61.0],
[11.0, 60.0]
]
],
[
[
[12.0, 62.0],
[12.0, 63.0],
[13.0, 63.0],
[13.0, 62.0]
]
]
]
}
class TestBoxFunctions(unittest.TestCase):
@patch('server.Sentinelhub.box_funcitons.open', new_callable=mock_open, read_data=json.dumps(mock_box_json))
@patch('server.Sentinelhub.box_funcitons.os.path.realpath', return_value='/path/to/fake/dir')
def test_get_all_box(self, mock_realpath, mock_open):
boxes = box_funcitons.get_all_box()
expected_boxes = [
[[10.0, 60.0], [10.0, 61.0], [11.0, 61.0], [11.0, 60.0]],
[[12.0, 62.0], [12.0, 63.0], [13.0, 63.0], [13.0, 62.0]]
]
self.assertEqual(boxes, expected_boxes)
def test_create_BBox(self):
box = [[10.0, 60.0], [10.0, 61.0], [11.0, 61.0], [11.0, 60.0]]
bbox = box_funcitons.create_BBox(box)
expected_bbox = BBox((10.0, 60.0, 11.0, 61.0), CRS.WGS84)
self.assertEqual(bbox, expected_bbox)
@patch('server.Sentinelhub.box_funcitons.get_all_box')
def test_get_all_bbox(self, mock_get_all_box):
mock_get_all_box.return_value = [
[[10.0, 60.0], [10.0, 61.0], [11.0, 61.0], [11.0, 60.0]],
[[12.0, 62.0], [12.0, 63.0], [13.0, 63.0], [13.0, 62.0]]
]
bboxes = box_funcitons.get_all_bbox()
expected_bboxes = [
BBox((10.0, 60.0, 11.0, 61.0), CRS.WGS84),
BBox((12.0, 62.0, 13.0, 63.0), CRS.WGS84)
]
self.assertEqual(bboxes, expected_bboxes)
def test_get_distance(self):
box = [[10.0, 60.0], [10.0, 61.0], [11.0, 61.0], [11.0, 60.0]]
point = (10.5, 60.5)
distance = box_funcitons.get_distance(box, point)
self.assertEqual(distance, 0.0)
point_outside = (15.0, 65.0)
distance_outside = box_funcitons.get_distance(box, point_outside)
self.assertAlmostEqual(distance_outside, 5.656, places=2)
@patch('server.Sentinelhub.box_funcitons.get_all_box')
def test_get_closest_bbox_and_id(self, mock_get_all_box):
mock_get_all_box.return_value = [
[[10.0, 60.0], [10.0, 61.0], [11.0, 61.0], [11.0, 60.0]],
[[12.0, 62.0], [12.0, 63.0], [13.0, 63.0], [13.0, 62.0]]
]
bbox, idx = box_funcitons.get_closest_bbox_and_id(10.5, 60.5)
expected_bbox = BBox((10.0, 60.0, 11.0, 61.0), CRS.WGS84)
self.assertEqual(bbox, expected_bbox)
self.assertEqual(idx, 0)
bbox, idx = box_funcitons.get_closest_bbox_and_id(12.5, 62.5)
expected_bbox = BBox((12.0, 62.0, 13.0, 63.0), CRS.WGS84)
self.assertEqual(bbox, expected_bbox)
self.assertEqual(idx, 1)
if __name__ == '__main__':
unittest.main()
\ No newline at end of file
import unittest
import pandas as pd
import datetime as dt
from server.Sentinelhub import getAreaInfo
class MyTestCase(unittest.TestCase):
def test_stats_to_df(self):
stats_data = {
"data": [
{
"interval": {"from": "2023-01-01T00:00:00Z", "to": "2023-01-02T00:00:00Z"},
"outputs": {
"default": {
"bands": {
"B0": {
"stats": {
"sampleCount": 100,
"noDataCount": 0,
"min": 0.0,
"max": 1.0,
"mean": 0.5,
"stDev": 0.1,
"percentiles": {"10.0": 0.1, "90.0": 0.9}
}
}
}
}
}
}
]
}
expected_df = pd.DataFrame({
"interval_from": [dt.date(2023, 1, 1)],
"interval_to": [dt.date(2023, 1, 2)],
"default_B0_sampleCount": [100],
"default_B0_noDataCount": [0],
"default_B0_min": [0.0],
"default_B0_max": [1.0],
"default_B0_mean": [0.5],
"default_B0_stDev": [0.1],
"default_B0_percentiles_10.0": [0.1],
"default_B0_percentiles_90.0": [0.9]
})
result_df = getAreaInfo.stats_to_df(stats_data)
pd.testing.assert_frame_equal(result_df, expected_df)
def test_classify_ice(self):
data = pd.DataFrame({
'default_B0_stDev': [36, 10, 5, 8],
'default_B0_noDataCount': [500, 500, 500, 500]
})
expected_conditions = ['High probability of Cloud', 'No ice', 'Ice', 'No ice']
result = getAreaInfo.classify_ice(data)
self.assertEqual(result['ice_condition'].tolist(), expected_conditions)
def test_get_last_date(self):
data = pd.DataFrame({
'interval_to': ['2023-01-01', '2023-02-01', '2023-03-01']
})
expected_last_date = '2023-03-01'
self.assertEqual(getAreaInfo.get_last_date(data), expected_last_date)
expected_last_date_none = (
dt.datetime.now().year - 1 if dt.datetime.now().month < 8 else dt.datetime.now().year, 8, 1)
self.assertEqual(getAreaInfo.get_last_date(None), dt.datetime(*expected_last_date_none).strftime("%Y-%m-%d"))
def test_get_ice_run_dates(self):
data = pd.DataFrame({
'interval_to': ['2023-01-01', '2023-02-01', '2023-03-01', '2023-04-01'],
'interval_from': ['2023-01-02', '2023-02-02', '2023-03-02', '2023-04-02'],
'ice_condition': ['Ice', 'No ice', 'Ice', 'High probability of Cloud']
})
expected_dates = ['2023-01-17', '2023-02-15', '2023-03-17']
self.assertEqual(getAreaInfo.get_ice_run_dates(data), expected_dates)
if __name__ == '__main__':
unittest.main()
......@@ -128,6 +128,7 @@ def define_gridareas(lat, lng, area_offset, grid_size):
# find the main area's corner positions
main_area = calculate_corners(lat, lng, area_offset)
# offset
offset_lng, offset_lat = area_offset
# find the main area's range size
......@@ -141,9 +142,9 @@ def define_gridareas(lat, lng, area_offset, grid_size):
# find the center coordinates of each area in grid to find the corner areas
for y in (range(grid_size)):
relative_size_lat = y / grid_size
relative_size_lat = y / grid_size # relative y position on grid
for x in (range(grid_size)):
relative_size_lng = x / grid_size
relative_size_lng = x / grid_size # relative x position on grid
lat_pos = main_area[3][0] + relative_size_lat * area_size[0] + dist_to_subcenter[0]
lng_pos = main_area[3][1] + relative_size_lng * area_size[1] + dist_to_subcenter[1]
......@@ -168,6 +169,7 @@ def define_grid_lidardata(max_area, grid_size, points):
# container for an area turned into a grid of areas
grided_area = []
# smallest and biggest corner in area
bottom_left, top_right = max_area
# find subareas size relative to the full size of data
......@@ -175,49 +177,33 @@ def define_grid_lidardata(max_area, grid_size, points):
subarea_offset_lat = (top_right[1]-bottom_left[1])
# find subareas size relative to the full size of data
subarea_hypotemus_lng = subarea_offset_lng / grid_size
subarea_hypotemus_lat = subarea_offset_lat / grid_size
subarea_hypotenuse_lng = subarea_offset_lng / grid_size
subarea_hypotenuse_lat = subarea_offset_lat / grid_size
# find the center coordinates of each area in grid to find the corner areas
# organize all coordinates into their corresponding area
# their own area in grid to find the corner areas
for y in (range(grid_size)):
relative_size_lat = y / grid_size
for x in (range(grid_size)):
relative_size_lng = x / grid_size
bottom_left_range = (subarea_offset_lng * relative_size_lng + bottom_left[0],
bottom_left_sub_corner = (subarea_offset_lng * relative_size_lng + bottom_left[0],
subarea_offset_lat * relative_size_lat + bottom_left[1])
top_right_range = (subarea_offset_lng * relative_size_lng + bottom_left[0] + subarea_hypotemus_lng,
subarea_offset_lat * relative_size_lat + bottom_left[1] + subarea_hypotemus_lat)
area_range = [bottom_left_range, top_right_range]
top_right_sub_corner = (subarea_offset_lng * relative_size_lng + bottom_left[0] + subarea_hypotenuse_lng,
subarea_offset_lat * relative_size_lat + bottom_left[1] + subarea_hypotenuse_lat)
# smallest and highest points in area
area_range = [bottom_left_sub_corner, top_right_sub_corner]
# filter out points that is outside of area range
area = list(filter(lambda point_position: inArea(point_position, area_range), points))
height_in_area = find_height(area)
# find the ice thickness from all the points in area
thickness_in_area = find_height(area)
grided_area.append(height_in_area)
# store all ice thickness in area
grided_area.append(thickness_in_area)
return grided_area
#[1,2,2,3,4,5,6,3,4,6,8,9,5,3,5.7,8,5,3]
#print(define_gridareas(60,10,1500,4))
#print(define_gridareas(3435693.5,6299200.0, 400000,4))
'''
# find real world coordinates to lidar
#
# p1 - area using real world coordinate
# p2 - area using real world coordinate
# l1 - min area limit lidar coordinate(pointcloud)
# l2 - max area limit lidar coordinate(pointcloud)
# max_limit - maximum area limit of a lidar scann in real world coordinates
def position_relative_to_pointcloud(l1, l2, p1, p2, center_lim, max_lim):
center_l = tuple((a+b)/2 for a, b in zip(l1,l2))
return [
((p1[0] - center_lim[0]) * ((l1[0] - l2[0]) / (max_lim[2][0] - max_lim[0][0])) + center_l[0],
(p1[1] - center_lim[1]) * ((l1[1] - l2[1]) / (max_lim[2][1] - max_lim[0][1])) + center_l[1]),
((p2[0] - center_lim[0]) * ((l1[0] - l2[0]) / (max_lim[2][0] - max_lim[0][0])) + center_l[0],
(p2[1] - center_lim[1]) * ((l1[1] - l2[1]) / (max_lim[2][1] - max_lim[0][1])) + center_l[1]),
]
#print(position_relative_to_pointcloud((-20,20), (-10,30), (1,3), (2,2), (2.5,2.5), [(5,5),(0,5),(0,0),(5,0)]))
#print(position_relative_to_pointcloud((-3299999, 4608018), (-3200001, 4687153), (61.47620866851029, 8.961138281887507), (61.95241733702057, 6.8508373935926645), (61, 11), [(61.95241733702057, 15.125349549679886), (60.04758266297943, 15.125349549679886), (60.04758266297943, 6.8746504503201145), (61.95241733702057, 6.8746504503201145)]))
'''
import json
import os
from datetime import datetime
from server.data_processing.process_lidar_data import calculate_area_data, about_laz_file, find_folder_files
from server.data_processing.process_lidar_data import calculate_area_data, about_laz_file
from server.consts import LIDAR_DATA_PATH, LAKE_RELATIONS_PATH
def lidar_data_handler(self, cursor, sensorId, lake_name: str):
# finds the thickness in all area in a lake
status_code, thickness_data = input_new_Lidar_data(cursor, sensorId, lake_name)
self.send_response(status_code)
......@@ -27,10 +28,6 @@ def input_new_Lidar_data(cursor, sensorId, lake_name: str) -> (int, str):
was taken on the map
"""
try:
# print("name=",lake_name)
# laz_root_path = "server\\lidar_data\\" + lake_name
# laz_data_paths = find_folder_files(laz_root_path)
# read json data with path to data for specific water body
file_path = os.path.join(LAKE_RELATIONS_PATH, lake_name + '_lidar_data.json')
......@@ -38,11 +35,6 @@ def input_new_Lidar_data(cursor, sensorId, lake_name: str) -> (int, str):
if not os.path.exists(file_path):
return 404, f"no data for {lake_name} found"
# Just temporary, not gonna use:
# with open(file_path, "w") as file:
# file.write(
# "[{\"MeasurementID\": 1,\"TimeMeasured\": \"2024-04-15 16:23:28.620516\",\"CenterLat\": 60.841532,\"CenterLon\": 10.717878,\"Sensor\": {\"SensorID\": 2,\"SensorType\": \"LiDar\",\"Active\": true},\"Subdivisions\": []},{\"MeasurementID\": 2,\"TimeMeasured\": \"2024-04-15 16:23:28.620516\",\"CenterLat\": 60.841532,\"CenterLon\": 10.717878,\"Sensor\": {\"SensorID\": 2,\"SensorType\": \"LiDar\",\"Active\": true},\"Subdivisions\": []}]") # Writing an empty JSON object
# get the areas from the map's data
with open(file_path) as data:
measurement_data = json.load(data)
......@@ -66,12 +58,10 @@ def input_new_Lidar_data(cursor, sensorId, lake_name: str) -> (int, str):
# path to each measurement data
laz_file_path = os.path.join(LIDAR_DATA_PATH + lake_name + '/measurement_id_' + str(measurement_id) + '.laz')
#about_laz_file(laz_file_path)
# data about the file read from
# about_laz = about_laz_file() cannot do this if data is changed
# scale_factor = max(about_laz[2])
scale_factor = max([0.01])
about_laz = about_laz_file(laz_file_path)
# scale factor of the ice thickness in meter
scale_factor = max(about_laz[2])
# time last updated data
time_now = datetime.now().utcnow().replace(microsecond=0)
......@@ -85,18 +75,19 @@ def input_new_Lidar_data(cursor, sensorId, lake_name: str) -> (int, str):
# Calculate each area's ice thickness based on the area coordinates given to the calculation model
areas_data = calculate_area_data((latitude, longitude), lake_name, laz_file_path)
if (areas_data):
# store lidar data in json format
# calculate data for each zone within the area
for area in areas_data:
# lng and lat relative to map
subId = int(area[0])
map_lat, map_lng = area[1]
heights = area[2]
if (len(heights) != 0 or sum(heights) != 0):
average = sum(heights) / len(heights)
minimum_thickness = min(heights)
subId = int(area[0]) # id of area
map_lat, map_lng = area[1] # lng and lat relative to map
ice_thickness = area[2] # ice thickness in area
# find the average and minimum ice thickness in area
if (len(ice_thickness) != 0 or sum(ice_thickness) != 0):
average = sum(ice_thickness) / len(ice_thickness)
minimum_thickness = min(ice_thickness)
else:
average = 0
minimum_thickness = 0
......@@ -109,7 +100,8 @@ def input_new_Lidar_data(cursor, sensorId, lake_name: str) -> (int, str):
''', (measurement_id, time_now, subId, lake_name, float(minimum_thickness), float(average),
float(0.0), scale_factor))
sub_center = (map_lat, map_lng)
# set up json format
# set up json format of area's data
lidar_read = {
'SubdivID': subId,
'MinThickness': float(minimum_thickness),
......@@ -118,10 +110,13 @@ def input_new_Lidar_data(cursor, sensorId, lake_name: str) -> (int, str):
'CenLongitude': float(sub_center[1]),
'Accuracy': scale_factor,
}
# add area to list of subdivisions
subdiv_json_data.append(lidar_read)
else:
print('No data found')
# container for measurement data
measurement_data = {
'MeasurementID': measurement_id,
'TimeMeasured': str(time_now),
......@@ -135,6 +130,7 @@ def input_new_Lidar_data(cursor, sensorId, lake_name: str) -> (int, str):
'Subdivisions': subdiv_json_data,
}
# add measurement data to list
lidar_json_data.append(measurement_data)
# send the changes to the database
......@@ -148,7 +144,6 @@ def input_new_Lidar_data(cursor, sensorId, lake_name: str) -> (int, str):
if os.path.exists(file_path):
os.remove(file_path)
# convert list of lidar data to json
content = json.dumps(lidar_json_data, indent=4)
# write to file
......@@ -168,6 +163,3 @@ def input_new_Lidar_data(cursor, sensorId, lake_name: str) -> (int, str):
cursor.connection.rollback()
return 500, f"An error occurred: {e} g".encode("utf-8")
# laz_root_path = "server\\lidar_data\\mj\u00f8sa"
# laz_data_paths = find_folder_files(laz_root_path)
# print(laz_data_paths)
......@@ -18,21 +18,7 @@ def about_laz_file(path):
:return: list with info about version, point_count, scale in meters and offset in meters
"""
with laspy.open(path) as fh:
# Print metadata properties
print("File Version:", fh.header.version)
print("Point Count:", fh.header.point_count)
print("Scale Factors:", fh.header.scale)
print("Offset:", fh.header.offset)
las = fh.read()
print(las)
print('Points from data:', len(las.points))
ground_pts = las.classification == 2
bins, counts = np.unique(las.return_number[ground_pts], return_counts=True)
print('Ground Point Return Number distribution:')
for r, c in zip(bins, counts):
print(' {}:{}'.format(r, c))
return [las.header.version, las.header.point_count, las.header.scale, las.header.offset]
def find_folder_files(direcory):
......@@ -52,13 +38,14 @@ def find_folder_files(direcory):
# and it's affiliations (subId and groupId) (soon to be implemented
def calculate_area_data(center, lake_name, path):
"""
calculate area
calculate lakes ice thickness of each of its area
:param center: center point of area to be calculated
:param lake_name: body of water area belongs to
:param path: path to lidar data
:return: the calculated ice thickness in designated area
:return: the calculated ice thickness in designated area's
"""
# check if file exist
if not os.path.exists(path):
print("File not found")
return []
......@@ -66,7 +53,7 @@ def calculate_area_data(center, lake_name, path):
# container for all the heights in area
area_heights = []
# zone coords taken
# area coords taken
taken_coords = []
# read json data with path to data for specific water body
......@@ -81,19 +68,19 @@ def calculate_area_data(center, lake_name, path):
# grid cell offset
# NB: make these a global var
grid_size = 4
#cell_x, cell_y = map_data['tile_width'], map_data['tile_height']
#cell_x, cell_y = (400*grid_size, 400*grid_size)
cell_x, cell_y = (2000, 1000)
cell_x, cell_y, size_in_km = map_data['cell_width'], map_data['cell_height'], map_data['cell_size_in_km']
print("f: ",cell_x," ", cell_y)
cell_x, cell_y = (cell_x * size_in_km, cell_y * size_in_km)
print("s: ",cell_x," ", cell_y)
# convert the offset to meter
#cell_x = 111.320 * cell_x * 1000
#cell_y = (40075 * math.cos(cell_y) * 1000) / 360
cell_x = 111.320 * cell_x
cell_y = (111.320 * math.cos(60)*cell_y)
cell_x = 111.320 * cell_x * 1000
cell_y = (40075 * math.cos(cell_y) * 1000) / 360
#cell_x = 111.320 * cell_x
#cell_y = (111.320 * math.cos(60) * cell_y)
# set the limit of the area compared to local coordinates
area_limit = calculate_corners(center[0], center[1], (cell_x, cell_y))
#area_limit = calculate_corners(sub_center[0], sub_center[1], (cell_x/4, cell_y/4))
# grid data
map_data = map_data['features']
......
No preview for this file type
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment