Coverage for brodata / sad.py: 90%
264 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-20 14:37 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-20 14:37 +0000
1import pandas as pd
2import geopandas as gpd
3from functools import partial
4from . import bro
7class SiteAssessmentData(bro.FileOrUrl):
8 """Class to represent a Site Assessment Data (SAD) from the BRO."""
10 _rest_url = "https://publiek.broservices.nl/sq/sad/v1"
11 _xmlns = "http://www.broservices.nl/xsd/dssad-internal/1.1"
12 _char = "SAD_C"
14 def _read_contents(self, tree):
15 ns = {
16 "brocom": "http://www.broservices.nl/xsd/brocommon/3.0",
17 "gml": "http://www.opengis.net/gml/3.2",
18 "sadcommon": "http://www.broservices.nl/xsd/sadcommon-internal/1.1",
19 "xmlns": self._xmlns,
20 }
21 sad = self._get_main_object(tree, "SAD_O", ns)
23 for key in sad.attrib:
24 setattr(self, key.split("}", 1)[1], sad.attrib[key])
25 for child in sad:
26 key = self._get_tag(child)
27 if len(child) == 0:
28 setattr(self, key, child.text)
29 elif key == "geometry":
30 setattr(self, key, self._read_geometry(child))
31 elif key in ["registrationHistory"]:
32 self._read_children_of_children(child)
33 elif key == "standardizedLocation":
34 self._read_standardized_location(child)
35 elif key == "report":
36 if hasattr(self, key):
37 self._raise_assumed_single(key)
38 self.report = {}
39 self._read_children_of_children(child, d=self.report)
40 elif key == "measurementPoint":
41 if not hasattr(self, key):
42 self.measurementPoint = []
43 for grandchild in child:
44 key = self._get_tag(grandchild)
45 if key == "MeasurementPoint":
46 mp = self._read_measurement_point(grandchild)
47 self.measurementPoint.append(mp)
48 else:
49 self.warn_unknown_tag(key)
50 elif key == "mixedSampleAnalysis":
51 if not hasattr(self, key):
52 self.mixedSampleAnalysis = []
53 for grandchild in child:
54 key = self._get_tag(grandchild)
55 if key == "MixedSampleAnalysis":
56 msa = self._read_mixed_sample_analysis(grandchild)
57 self.mixedSampleAnalysis.append(msa)
58 else:
59 self.warn_unknown_tag(key)
60 else:
61 self._warn_unknown_tag(key)
63 if hasattr(self, "measurementPoint"):
64 self.measurementPoint = pd.DataFrame(self.measurementPoint)
65 if "deliveredLocation" in self.measurementPoint.columns:
66 self.measurementPoint = gpd.GeoDataFrame(
67 self.measurementPoint, geometry="deliveredLocation"
68 )
69 if "name" in self.measurementPoint.columns:
70 self.measurementPoint.set_index("name", inplace=True)
71 if hasattr(self, "mixedSampleAnalysis"):
72 self.mixedSampleAnalysis = pd.DataFrame(self.mixedSampleAnalysis)
74 def _read_mixed_sample_analysis(self, node):
75 d = {}
76 for child in node:
77 key = self._get_tag(child)
78 if key in ["identification", "name", "beginDepth", "endDepth"]:
79 d[key] = self._parse_text(child, key)
80 elif key == "analysis":
81 for grandchild in child:
82 key2 = self._get_tag(grandchild)
83 if key2 == "Analysis":
84 if key not in d:
85 d[key] = []
86 d[key].append(self._read_analysis(grandchild))
87 else:
88 self._warn_unknown_tag(key)
89 elif key == "soilSampling":
90 if key not in d:
91 d[key] = []
92 if len(child) == 0:
93 ss = {}
94 for attrib in child.attrib:
95 key2 = attrib.split("}", 1)[1]
96 ss[key2] = child.attrib[attrib]
97 d[key].append(ss)
98 else:
99 for grandchild in child:
100 key2 = self._get_tag(grandchild)
101 if key2 == "SoilSampling":
102 d[key].append(self._read_soil_sampling(grandchild))
103 else:
104 self._warn_unknown_tag(key)
106 else:
107 self._warn_unknown_tag(key)
108 if "analysis" in d:
109 d["analysis"] = pd.DataFrame(d["analysis"])
110 if "soilSampling" in d:
111 d["soilSampling"] = pd.DataFrame(d["soilSampling"])
112 return d
114 def _read_measurement_point(self, node):
115 d = {}
116 for child in node:
117 key = self._get_tag(child)
118 if key in ["identification", "name", "date", "finalDepth", "type"]:
119 d[key] = self._parse_text(child, key, to_float=["finalDepth"])
120 elif key == "deliveredLocation":
121 d[key] = self._read_geometry(child)
122 elif key == "deliveredVerticalPosition":
123 self._read_delivered_vertical_position(child, d=d)
124 elif key == "boreholeSampleDescription":
125 if self._check_single_child_with_tag(
126 child, "BoreholeSampleDescription"
127 ):
128 child = child[0]
129 self._read_borehole_sample_description(child, d)
130 elif key == "soilSampling":
131 if key not in d:
132 d[key] = []
133 if self._check_single_child_with_tag(child, "SoilSampling"):
134 child = child[0]
135 d[key].append(self._read_soil_sampling(child))
136 elif key == "filter":
137 if key not in d:
138 d[key] = []
139 if self._check_single_child_with_tag(child, "Filter"):
140 child = child[0]
141 d[key].append(self._read_filter(child))
142 else:
143 self._warn_unknown_tag(key)
145 if "soilSampling" in d:
146 d["soilSampling"] = pd.DataFrame(d["soilSampling"])
148 if "filter" in d:
149 d["filter"] = pd.DataFrame(d["filter"])
151 return d
153 def _read_filter(self, node):
154 d = {}
155 for child in node:
156 key = self._get_tag(child)
157 if key in ["identification", "name", "upperBoundary", "lowerBoundary"]:
158 d[key] = self._parse_text(
159 child, key, to_float=["upperBoundary", "lowerBoundary"]
160 )
161 elif key == "deliveredVerticalPosition":
162 self._read_delivered_vertical_position(child, d=d)
163 elif key == "groundwaterSampling":
164 for grandchild in child:
165 if key not in d:
166 d[key] = []
167 key2 = self._get_tag(grandchild)
168 if key2 == "GroundwaterSampling":
169 gs = self._read_groundwater_sampling(grandchild)
170 d[key].append(gs)
171 else:
172 self._warn_unknown_tag(key2)
173 else:
174 self._warn_unknown_tag(key)
175 if "groundwaterSampling" in d:
176 d["groundwaterSampling"] = pd.DataFrame(d["groundwaterSampling"])
177 # Flatten groundwaterSampleAnalysis from each sampling into a per-filter DataFrame
178 analyses = []
179 for _, samp in d["groundwaterSampling"].iterrows():
180 samp_ident = samp.get("identification", None)
181 samp_name = samp.get("name", None)
182 samp_date = samp.get("date", None)
183 if "groundwaterSampleAnalysis" in samp and isinstance(
184 samp["groundwaterSampleAnalysis"], pd.DataFrame
185 ):
186 gsa_df = samp["groundwaterSampleAnalysis"]
187 for _, gsa in gsa_df.iterrows():
188 gsa_ident = gsa.get("identification", None)
189 gsa_name = gsa.get("name", None)
190 if "analysis" in gsa and isinstance(
191 gsa["analysis"], pd.DataFrame
192 ):
193 for _, row in gsa["analysis"].iterrows():
194 rowd = dict(row)
195 # keep reference to the sampling and the sample analysis id
196 rowd["groundwaterSampling_identification"] = samp_ident
197 rowd["groundwaterSampling_name"] = samp_name
198 rowd["groundwaterSampling_date"] = samp_date
199 rowd["groundwaterSampleAnalysis_identification"] = (
200 gsa_ident
201 )
202 rowd["groundwaterSampleAnalysis_name"] = gsa_name
203 analyses.append(rowd)
204 if len(analyses) > 0:
205 d["groundwaterSampleAnalysis"] = pd.DataFrame(analyses)
206 else:
207 d["groundwaterSampleAnalysis"] = pd.DataFrame()
208 # remove analysis results from groundwaterSampling to avoid duplication
209 if "groundwaterSampleAnalysis" in d["groundwaterSampling"].columns:
210 d["groundwaterSampling"] = d["groundwaterSampling"].drop(
211 columns=["groundwaterSampleAnalysis"]
212 )
213 return d
215 def _read_groundwater_sampling(self, node):
216 d = {}
217 for child in node:
218 key = self._get_tag(child)
219 if key in ["identification", "name", "date"]:
220 d[key] = self._parse_text(child, key)
221 elif key == "groundwaterSampleAnalysis":
222 for grandchild in child:
223 if key not in d:
224 d[key] = []
225 key2 = self._get_tag(grandchild)
226 if key2 == "GroundwaterSampleAnalysis":
227 gsa = self._read_groundwater_sample_analysis(grandchild)
228 d[key].append(gsa)
229 else:
230 self._warn_unknown_tag(key2)
231 else:
232 self._warn_unknown_tag(key)
233 if "groundwaterSampleAnalysis" in d:
234 d["groundwaterSampleAnalysis"] = pd.DataFrame(
235 d["groundwaterSampleAnalysis"]
236 )
237 return d
239 def _read_groundwater_sample_analysis(self, node):
240 d = {}
241 for child in node:
242 key = self._get_tag(child)
243 if key in ["identification", "name"]:
244 d[key] = self._parse_text(child, key)
245 elif key == "analysis":
246 for grandchild in child:
247 key2 = self._get_tag(grandchild)
248 if key2 == "Analysis":
249 if key not in d:
250 d[key] = []
251 d[key].append(self._read_analysis(grandchild))
252 else:
253 self._warn_unknown_tag(key2)
254 else:
255 self._warn_unknown_tag(key)
256 if "analysis" in d:
257 d["analysis"] = pd.DataFrame(d["analysis"])
258 return d
260 def _read_borehole_sample_description(self, node, d):
261 for child in node:
262 key = self._get_tag(child)
263 if key == "descriptiveBoreholeLog":
264 for grandchild in child:
265 key = self._get_tag(grandchild)
266 if key == "DescriptiveBoreholeLog":
267 if key in d:
268 self.raise_assumed_single(key)
269 d[key] = self._read_descriptive_borehole_log(grandchild)
270 else:
271 self._warn_unknown_tag(key)
272 elif key == "descriptionProcedure":
273 d[key] = child.text
274 else:
275 self._warn_unknown_tag(key)
277 def _read_soil_sampling(self, node):
278 d = {}
279 for child in node:
280 key = self._get_tag(child)
281 if key in ["identification", "name", "beginDepth", "endDepth", "date"]:
282 d[key] = self._parse_text(child, key)
283 elif key == "soilSampleAnalysis":
284 for grandchild in child:
285 key = self._get_tag(grandchild)
286 if key == "SoilSampleAnalysis":
287 self._read_soil_sample_analysis(grandchild, d)
288 else:
289 self._warn_unknown_tag(key)
290 else:
291 self._warn_unknown_tag(key)
293 def _read_soil_sample_analysis(self, node, d):
294 for child in node:
295 key = self._get_tag(child)
296 if key in ["identification", "name"]:
297 d[key] = self._parse_text(child, key)
298 elif key == "analysis":
299 for grandchild in child:
300 key = self._get_tag(grandchild)
301 if key == "Analysis":
302 if "analysis" not in d:
303 d["analysis"] = []
304 d["analysis"].append(self._read_analysis(grandchild))
305 else:
306 self._warn_unknown_tag(key)
307 else:
308 self._warn_unknown_tag(key)
309 if "analysis" in d:
310 d["analysis"] = pd.DataFrame(d["analysis"])
312 def _read_analysis(self, node):
313 d = {}
314 for child in node:
315 key = self._get_tag(child)
316 if key in [
317 "identification",
318 "quantity",
319 "parameter",
320 "analysisMeasurementValue",
321 "condition",
322 "limitSymbol",
323 ]:
324 to_float = ["analysisMeasurementValue"]
325 d[key] = self._parse_text(child, key, to_float=to_float)
326 else:
327 self._warn_unknown_tag(key)
328 return d
331cl = SiteAssessmentData
333get_bro_ids_of_bronhouder = partial(bro._get_bro_ids_of_bronhouder, cl)
334get_bro_ids_of_bronhouder.__doc__ = bro._get_bro_ids_of_bronhouder.__doc__
336get_data_for_bro_ids = partial(bro._get_data_for_bro_ids, cl)
337get_data_for_bro_ids.__doc__ = bro._get_data_for_bro_ids.__doc__
339get_characteristics = partial(bro._get_characteristics, cl)
340get_characteristics.__doc__ = bro._get_characteristics.__doc__
342get_data_in_extent = partial(bro._get_data_in_extent, cl)
343get_data_in_extent.__doc__ = bro._get_data_in_extent.__doc__