Coverage for brodata / sfr.py: 66%
279 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-20 14:37 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-20 14:37 +0000
1import logging
2from functools import partial
3import pandas as pd
5from . import bro
7logger = logging.getLogger(__name__)
class SoilFaceResearch(bro.FileOrUrl):
    """Class to represent a Soil Face Research (SFR) from the BRO.

    The configuration of the xml-file can be found at
    https://www.bro-productomgeving.nl/__attachments/1607470159/DO_ResponseSFR_O_DP.xml?inst-v=a371ba4a-5fb1-479c-99cb-44e36b9c21f9
    (link found at https://www.bro-productomgeving.nl/bpo/latest/uitgifte-voorbeeldberichten-sfr)

    Parsed values are stored as attributes of the instance, named after the
    xml-tags they were read from. The repeating elements ``litterLayer``,
    ``soilLayer``, ``disturbedInterval`` and ``investigatedInterval`` are
    collected as lists of dictionaries while parsing, and converted to a
    ``pandas.DataFrame`` at the end of ``_read_contents``.
    """

    _rest_url = "https://publiek.broservices.nl/sr/sfr/v2"
    _xmlns = "http://www.broservices.nl/xsd/dssfr/2.0"
    _char = "SFR_C"

    def _append_to_attribute(self, name, value):
        """Append ``value`` to the list-attribute ``name``, creating it if needed."""
        if hasattr(self, name):
            getattr(self, name).append(value)
        else:
            setattr(self, name, [value])

    def _iter_wrapped(self, node, tag):
        """Yield the children of ``node`` whose tag is ``tag``.

        Children with any other tag are reported via ``_warn_unknown_tag`` and
        skipped. The generator is lazy, so warnings are emitted in document
        order, interleaved with the processing of the yielded children.
        """
        for child in node:
            key = self._get_tag(child)
            if key == tag:
                yield child
            else:
                self._warn_unknown_tag(key)

    def _read_contents(self, tree):
        """Read the ``SFR_O`` object in ``tree`` and store it on this instance.

        Parameters
        ----------
        tree : xml-tree
            The parsed xml-document; handed to ``_get_main_object`` to locate
            the ``SFR_O`` element.
        """
        ns = {
            "brocom": "http://www.broservices.nl/xsd/brocommon/3.0",
            "gml": "http://www.opengis.net/gml/3.2",
            "sfrcom": "http://www.broservices.nl/xsd/sfrcommon/2.0",
            "xmlns": self._xmlns,
        }
        sfr = self._get_main_object(tree, "SFR_O", ns)
        for name, value in sfr.attrib.items():
            # Drop the "{namespace}" prefix. Taking the last part (instead of
            # index 1) also accepts attributes that carry no namespace.
            setattr(self, name.split("}", 1)[-1], value)

        # Elements whose content may be wrapped in a single child element:
        # tag of the element -> (tag of the wrapper, reader of the content).
        wrapped = {
            "siteCharacteristic": (
                "SiteCharacteristic",
                self._read_children_of_children,
            ),
            "soilUncovering": (
                "SoilUncovering",
                self._read_children_of_children,
            ),
            "soilFaceDescription": (
                "SoilFaceDescription",
                self._read_soil_face_description,
            ),
            "soilFaceSampleAnalysis": (
                "SoilFaceSampleAnalysis",
                self._read_soil_face_sample_analysis,
            ),
        }

        for child in sfr:
            key = self._get_tag(child)
            if len(child) == 0:
                # An element without children: store its text as is.
                setattr(self, key, child.text)
            elif key == "deliveredLocation":
                self._read_delivered_location(child)
            elif key == "deliveredVerticalPosition":
                self._read_children_of_children(child, to_float=["offset"])
            elif key == "standardizedLocation":
                self._read_standardized_location(child)
            elif key in ("researchReportDate", "fieldworkDate"):
                setattr(self, key, self._read_date(child))
            elif key in ("registrationHistory", "reportHistory"):
                self._read_children_of_children(child)
            elif key == "researchOperator":
                setattr(self, key, self._read_operator(child))
            elif key in wrapped:
                wrapper_tag, reader = wrapped[key]
                # Step into the wrapper element when there is one.
                if self._check_single_child_with_tag(child, wrapper_tag):
                    child = child[0]
                reader(child)
            else:
                self._warn_unknown_tag(key)

        # Turn the collected lists of dictionaries into tables.
        for name in (
            "litterLayer",
            "soilLayer",
            "disturbedInterval",
            "investigatedInterval",
        ):
            if hasattr(self, name):
                setattr(self, name, pd.DataFrame(getattr(self, name)))

    def _read_soil_face_sample_analysis(self, node):
        """Read a soil face sample analysis and store it on this instance.

        Every ``InvestigatedInterval`` that is found is appended, as a
        dictionary, to the list ``self.investigatedInterval``.
        """
        for child in node:
            key = self._get_tag(child)
            if key == "analysisReportDate":
                setattr(self, key, self._read_date(child))
            elif key == "analysisType":
                setattr(self, key, child.text)
            elif key == "analysisOperator":
                setattr(self, key, self._read_operator(child))
            elif key == "investigatedInterval":
                for interval in self._iter_wrapped(child, "InvestigatedInterval"):
                    self._append_to_attribute(
                        "investigatedInterval",
                        self._read_investigated_interval(interval),
                    )
            else:
                self._warn_unknown_tag(key)

    def _read_investigated_interval(self, node):
        """Read one investigated interval.

        Returns
        -------
        dict
            The depths and characteristics of the interval, together with the
            results of the determinations that were performed on it.
        """
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in ("beginDepth", "endDepth"):
                d[key] = float(child.text)
            elif key == "characteristicModelled":
                d[key] = child.text
            elif key == "pHDetermination":
                for item in self._iter_wrapped(child, "PHDetermination"):
                    self._read_ph_determination(item, d)
            elif key == "particleSizeDistributionDetermination":
                for item in self._iter_wrapped(
                    child, "ParticleSizeDistributionDetermination"
                ):
                    self._read_particle_size_distribution_determination(item, d)
            elif key == "shrinkageDetermination":
                for item in self._iter_wrapped(child, "ShrinkageDetermination"):
                    self._read_shrinkage_determination(item, d=d)
            else:
                self._warn_unknown_tag(key)
        return d

    def _read_shrinkage_determination(self, node, d):
        """Read a shrinkage determination into the dictionary ``d``.

        The ``DataArray`` inside the ``shrinkage`` element is parsed by
        ``_read_data_array`` and stored as ``d["shrinkage"]``.
        """
        for child in node:
            key = self._get_tag(child)
            if key in ("determinationProcedure", "determinationMethod", "disturbed"):
                d[key] = child.text
            elif key == "shrinkage":
                # Walk the children of the shrinkage element itself. Looping
                # over ``node`` again here would never reach the DataArray.
                for data_array in self._iter_wrapped(child, "DataArray"):
                    d["shrinkage"] = self._read_data_array(data_array)
            else:
                self._warn_unknown_tag(key)

    def _read_particle_size_distribution_determination(self, node, d):
        """Read a particle size distribution determination into ``d``.

        The non-standardised fractions are gathered in a list and stored as a
        ``pandas.DataFrame`` in ``d["nonStandardisedFraction"]``.
        """
        for child in node:
            key = self._get_tag(child)
            if key in (
                "determinationProcedure",
                "determinationMethod",
                "particleSizeDistributionStandardised",
                "fractionDistribution",
                "performanceIrregularity",
                "dispersionMethod",
            ):
                d[key] = child.text
            elif key == "nonStandardisedFraction":
                fraction = {}
                for grandchild in child:
                    tag = self._get_tag(grandchild)
                    if tag in ("lowerBoundary", "upperBoundary", "proportion"):
                        fraction[tag] = float(grandchild.text)
                    else:
                        self._warn_unknown_tag(tag)
                d.setdefault("nonStandardisedFraction", []).append(fraction)
            else:
                self._warn_unknown_tag(key)
        if "nonStandardisedFraction" in d:
            d["nonStandardisedFraction"] = pd.DataFrame(d["nonStandardisedFraction"])

    def _read_ph_determination(self, node, d):
        """Read a pH determination into the dictionary ``d``.

        The procedure and method are stored with a ``pH`` prefix
        (``pHDeterminationProcedure``, ``pHDeterminationMethod``), as the
        other determinations of an interval use the same tag names.
        """
        for child in node:
            key = self._get_tag(child)
            if key in ("determinationProcedure", "determinationMethod"):
                # capitalize key but keep rest as is, and prepend pH
                d[f"pH{key[0].upper()}{key[1:]}"] = child.text
            elif key == "pH":
                d[key] = float(child.text)
            else:
                self._warn_unknown_tag(key)

    def _read_soil_face_description(self, node):
        """Read the soil face description and store it on this instance."""
        for child in node:
            key = self._get_tag(child)
            if key == "descriptionReportDate":
                setattr(self, key, self._read_date(child))
            elif key in (
                "descriptionProcedure",
                "describedWidth",
                "artificiallyHumidified",
                "fractionDistributionDetermined",
                "lowerBoundarySandFraction",
            ):
                setattr(self, key, child.text)
            elif key == "descriptionOperator":
                setattr(self, key, self._read_operator(child))
            elif key == "soilProfile":
                for profile in self._iter_wrapped(child, "SoilProfile"):
                    self._read_soil_profile(profile)
            elif key == "soilClassification":
                for item in self._iter_wrapped(child, "SoilClassification"):
                    # NOTE: the attribute carries the (capitalised) tag of the
                    # wrapper element; the name is kept so that existing users
                    # of this attribute are not broken.
                    self.SoilClassification = self._read_soil_classification(item)
            else:
                self._warn_unknown_tag(key)

    def _read_soil_classification(self, node):
        """Read a soil classification.

        Returns
        -------
        dict
            The classification; ``specialFeatureBottom`` (when present) is a
            list with one dictionary for each occurrence of that element.
        """
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in (
                "codeGroup",
                "classificationCode",
                "specialFeatureTop",
                "soilClass",
                "textureClass",
                "peatClass",
                "subsoilPeat",
                "lowerBoundaryPeat",
                "subsoilDuinVagueSoil",
                "textureProfile",
                "carbonateProfile",
                "reworkingClass",
                "groundwaterTableClass",
                "anomalousGroundwaterRegime",
                "specialFeatureSite",
            ):
                d[key] = child.text
            elif key == "specialFeatureBottom":
                feature = {}
                for grandchild in child:
                    tag = self._get_tag(grandchild)
                    if tag == "specialFeature":
                        feature[tag] = grandchild.text
                    elif tag == "beginDepth":
                        feature[tag] = float(grandchild.text)
                    else:
                        self._warn_unknown_tag(tag)
                d.setdefault("specialFeatureBottom", []).append(feature)
            else:
                self._warn_unknown_tag(key)
        return d

    def _read_soil_profile(self, node):
        """Read a soil profile and store it on this instance.

        Litter layers, soil layers and disturbed intervals are appended to
        the lists ``litterLayer``, ``soilLayer`` and ``disturbedInterval``.
        A compacted interval is stored as a single dictionary.
        """
        for child in node:
            key = self._get_tag(child)
            if key in (
                "descriptionQuality",
                "rootableDepthReached",
                "meanHighestGroundwaterLevelReached",
                "horizonRepetition",
                "upperBoundaryShape",
                "sequenceDisturbed",
                "compactionPresent",
            ):
                setattr(self, key, child.text)
            elif key in (
                "rootableDepth",
                "meanHighestGroundwaterLevel",
                "meanLowestGroundwaterLevel",
            ):
                setattr(self, key, float(child.text))
            elif key == "localPhenomenon":
                self._append_to_attribute(key, child.text)
            elif key == "litterLayer":
                for layer in self._iter_wrapped(child, "LitterLayer"):
                    self._read_litter_layer(layer)
            elif key == "soilLayer":
                for layer in self._iter_wrapped(child, "SoilLayer"):
                    self._read_soil_layer(layer)
            elif key == "disturbedInterval":
                interval = {}
                self._read_children_of_children(child, d=interval)
                self._append_to_attribute("disturbedInterval", interval)
            elif key == "compactedInterval":
                interval = {}
                self._read_children_of_children(child, d=interval)
                self.compactedInterval = interval
            else:
                self._warn_unknown_tag(key)

    def _read_litter_layer(self, node):
        """Read one litter layer and append it to ``self.litterLayer``."""
        layer = {}
        for child in node:
            key = self._get_tag(child)
            if key in ("upperBoundary", "lowerBoundary"):
                layer[key] = float(child.text)
            elif key in (
                "upperBoundaryDetermination",
                "lowerBoundaryDetermination",
                "lowerBoundaryShape",
                "layerDiscontinuous",
                "horizonCode",
                "litterType",
                "estimatedOrganicMatterContent",
            ):
                layer[key] = child.text
            else:
                self._warn_unknown_tag(key)
        self._append_to_attribute("litterLayer", layer)

    def _read_soil_layer(self, node):
        """Read one soil layer and append it to ``self.soilLayer``.

        The layer components of the layer (if any) are stored as a
        ``pandas.DataFrame`` in ``layer["layerComponent"]``.
        """
        layer = {}
        for child in node:
            key = self._get_tag(child)
            if key in (
                "upperBoundary",
                "lowerBoundary",
                "particleDensity",
                "fieldCapacity",
                "wiltingPoint",
                "saturationWaterContent",
                "hydraulicConductivity",
                "bulkDensity",
                "stoneContent",
                "clayFraction",
                "siltFraction",
                "sandFraction",
            ):
                layer[key] = float(child.text)
            elif key in (
                "upperBoundaryDetermination",
                "lowerBoundaryDetermination",
                "lowerBoundaryShape",
                "layerDiscontinuous",
                "anthropogenic",
                "mixed",
                "inverted",
                "rooted",
                "rootsEvenlyDistributed",
                "rootAbundanceClass",
                "slant",
            ):
                layer[key] = child.text
            elif key == "soilLife":
                # This element can occur more than once: collect all values.
                layer.setdefault(key, []).append(child.text)
            elif key == "homogeneousMaterial":
                for grandchild in child:
                    tag = self._get_tag(grandchild)
                    if tag in (
                        "specialMaterial",
                        "horizonCode",
                        "rockType",
                        "depositionalCharacteristic",
                    ):
                        layer[tag] = grandchild.text
                    elif tag == "estimatedSaturatedPermeability":
                        layer[tag] = float(grandchild.text)
                    elif tag == "soil":
                        self._read_soil(grandchild, layer)
                    elif tag == "rock":
                        self._read_rock(grandchild, layer)
                    else:
                        self._warn_unknown_tag(tag)
            elif key == "layerComponent":
                # Hand over the layerComponent element itself (not the whole
                # soil layer); the component is added to this layer's dict.
                self._read_layer_component(child, d=layer)
            else:
                self._warn_unknown_tag(key)

        if "layerComponent" in layer:
            layer["layerComponent"] = pd.DataFrame(layer["layerComponent"])
        self._append_to_attribute("soilLayer", layer)

    def _read_layer_component(self, node, d=None):
        """Read one layer component and append it to ``d["layerComponent"]``.

        Parameters
        ----------
        node : xml-element
            The ``layerComponent`` element; its content is read by
            ``_read_children_of_children``.
        d : dict, optional
            Dictionary (of the soil layer) to which the component is added.
            A new dictionary is created when ``d`` is None.

        Returns
        -------
        dict
            The dictionary ``d`` with the component appended to the list
            ``d["layerComponent"]``.
        """
        component = {}
        self._read_children_of_children(node, d=component)
        if d is None:
            d = {}
        d.setdefault("layerComponent", []).append(component)
        return d
cl = SoilFaceResearch

# Module-level convenience functions: the generic request helpers of ``bro``
# with the class of this module bound as their first argument.
get_bro_ids_of_bronhouder = partial(bro._get_bro_ids_of_bronhouder, cl)
get_data_for_bro_ids = partial(bro._get_data_for_bro_ids, cl)
get_characteristics = partial(bro._get_characteristics, cl)
get_data_in_extent = partial(bro._get_data_in_extent, cl)

# A partial object does not take over the docstring of the function it wraps,
# so copy it explicitly.
for _bound in (
    get_bro_ids_of_bronhouder,
    get_data_for_bro_ids,
    get_characteristics,
    get_data_in_extent,
):
    _bound.__doc__ = _bound.func.__doc__
del _bound