Coverage for brodata / guf.py: 68%
240 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-20 14:37 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-20 14:37 +0000
1import logging
2from functools import partial
4import pandas as pd
6from . import bro
8logger = logging.getLogger(__name__)
class GroundwaterUtilisationFacility(bro.FileOrUrl):
    """Class to represent a Groundwater Utilisation Facility (GUF) from the BRO.

    Childless elements of the main GUF object are stored as plain attributes
    named after their XML tag. Nested, repeatable elements are gathered as
    lists of dictionaries while parsing and converted to DataFrames once the
    whole document has been read. The table attributes below only exist when
    the document contains the corresponding elements.

    Attributes
    ----------
    broId : str
        The BRO identifier of the GroundwaterUtilisationFacility object.
    objectHistory : pd.DataFrame
        DataFrame with the history of changes to the GUF object.
    licence : dict
        Dictionary with information about the groundwater usage licence.
    licensedQuantity : pd.DataFrame
        One row per ``licensedQuantity`` element of the licence.
    designInstallation : pd.DataFrame
        One row per ``designInstallation`` element of the licence.
    designLoop : pd.DataFrame
        One row per ``designLoop`` element of the design installations.
    designWell : pd.DataFrame
        One row per ``designWell`` element of the design installations.
    realisedInstallation : pd.DataFrame
        One row per ``realisedInstallation`` element of the GUF object.
    realisedLoop : pd.DataFrame
        One row per ``realisedLoop`` element of the realised installations.
    realisedWell : pd.DataFrame
        One row per ``realisedWell`` element of the realised installations.
    """

    _rest_url = "https://publiek.broservices.nl/gu/guf/v1"
    _xmlns = "http://www.broservices.nl/xsd/dsguf/1.0"
    _char = "GUF_C"
    _namespace = {
        "brocom": "http://www.broservices.nl/xsd/brocommon/3.0",
        "gml": "http://www.opengis.net/gml/3.2",
        "gufcommon": "http://www.broservices.nl/xsd/gufcommon/1.0",
        "xmlns": _xmlns,
    }

    def _rows_for(self, name):
        """Return the list stored on attribute ``name``, creating it if absent."""
        if not hasattr(self, name):
            setattr(self, name, [])
        return getattr(self, name)

    def _unwrap_single_child(self, node, tag):
        """Return ``node[0]`` when ``_check_single_child_with_tag`` accepts it.

        Otherwise ``node`` itself is returned unchanged, so callers can always
        parse the result.
        """
        if self._check_single_child_with_tag(node, tag):
            return node[0]
        return node

    def _read_contents(self, tree):
        """Populate this object from a parsed GUF document.

        Parameters
        ----------
        tree : element (tree)
            Parsed XML that is handed to ``_get_main_object`` to locate the
            ``GUF_PO`` or ``GUF_PPO`` element.
        """
        ns = self._namespace
        guf = self._get_main_object(tree, ["GUF_PO", "GUF_PPO"], ns)
        for name, value in guf.attrib.items():
            # Attribute names look like "{namespace}local"; keep the local
            # part. Indexing with -1 also accepts names without a namespace,
            # which previously raised an IndexError.
            setattr(self, name.split("}", 1)[-1], value)

        for child in guf:
            key = self._get_tag(child)
            if len(child) == 0:
                # Leaf element: store its text under the tag name.
                setattr(self, key, child.text)
            elif key == "standardizedLocation":
                self._read_standardized_location(child)
            elif key == "registrationHistory":
                self._read_children_of_children(child)
            elif key == "validityPeriod":
                self._read_validity_period(child)
            elif key == "lifespan":
                self._read_lifespan(child)
            elif key == "objectHistory":
                events = []
                for event in child:
                    record = {}
                    for item in event:
                        tag = self._get_tag(item)
                        if tag == "date":
                            record[tag] = self._read_date(item)
                        else:
                            record[tag] = item.text
                    events.append(record)
                self.objectHistory = pd.DataFrame(events)
            elif key == "licence":
                for grandchild in child:
                    tag = self._get_tag(grandchild)
                    if tag != "LicenceGroundwaterUsage":
                        self._warn_unknown_tag(tag)
                        continue
                    if hasattr(self, "licence"):
                        self._raise_assumed_single("licence")
                    self.licence = self._read_licence_groundwater_usage(grandchild)
            elif key == "realisedInstallation":
                rows = self._rows_for(key)
                installation = self._unwrap_single_child(child, "RealisedInstallation")
                rows.append(self._read_realised_installation(installation))
            else:
                self._warn_unknown_tag(key)

        # The nested readers collect plain lists of dicts; turn every list
        # that was actually created into a DataFrame.
        for name in (
            "designLoop",
            "designWell",
            "realisedLoop",
            "realisedWell",
            "realisedInstallation",
            "licensedQuantity",
            "designInstallation",
        ):
            if hasattr(self, name):
                setattr(self, name, pd.DataFrame(getattr(self, name)))

    def _read_licence_groundwater_usage(self, node):
        """Read a ``LicenceGroundwaterUsage`` element.

        Design installations and licensed quantities found inside the licence
        are appended to ``self.designInstallation`` and
        ``self.licensedQuantity`` instead of being returned.

        Parameters
        ----------
        node : element
            The ``LicenceGroundwaterUsage`` element.

        Returns
        -------
        dict
            The remaining licence properties, keyed by tag name.
        """
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in ["identificationLicence", "legalType"]:
                d[key] = child.text
            elif key == "usageTypeFacility":
                self._read_children_of_children(child, d)
            elif key == "lifespan":
                self._read_lifespan(child, d)
            elif key == "designInstallation":
                rows = self._rows_for(key)
                installation = self._unwrap_single_child(child, "DesignInstallation")
                rows.append(self._read_design_installation(installation))
            elif key == "licensedQuantity":
                rows = self._rows_for(key)
                quantity = {}
                self._read_children_of_children(child, d=quantity)
                rows.append(quantity)
            else:
                self._warn_unknown_tag(key)
        return d

    def _read_design_installation(self, node):
        """Read a design installation into a dictionary.

        Nested design loops and wells are appended to ``self.designLoop`` and
        ``self.designWell``.

        Parameters
        ----------
        node : element
            The design installation element.

        Returns
        -------
        dict
            Properties of the design installation, keyed by tag name.
        """
        to_int = ["designInstallationId"]
        to_float = [
            "energyCold",
            "energyWarm",
            "maximumInfiltrationTemperatureWarm",
            "power",
        ]
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in ["designInstallationId", "installationFunction"]:
                d[key] = self._parse_text(child, key, to_int=to_int)
            elif key == "geometry":
                d[key] = self._read_geometry(child)
            elif key in ["energyCharacteristics", "lifespan"]:
                # Flatten the children of these containers into the record.
                for grandchild in child:
                    tag = self._get_tag(grandchild)
                    d[tag] = self._parse_text(grandchild, tag, to_float=to_float)
            elif key == "designLoop":
                rows = self._rows_for(key)
                loop = self._unwrap_single_child(child, "DesignLoop")
                rows.append(self._read_design_loop(loop))
            elif key == "designWell":
                rows = self._rows_for(key)
                well = self._unwrap_single_child(child, "DesignWell")
                rows.append(self._read_design_well(well))
            else:
                self._warn_unknown_tag(key)
        return d

    def _read_design_loop(self, node):
        """Read a design loop element and return its properties as a dict."""
        to_int = ["designLoopId"]
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in ["designLoopId", "loopType"]:
                d[key] = self._parse_text(child, key, to_int=to_int)
            elif key == "geometry":
                d[key] = self._read_geometry(child)
            elif key == "lifespan":
                self._read_lifespan(child, d)
            else:
                self._warn_unknown_tag(key)
        return d

    def _read_design_well(self, node):
        """Read a design well element and return its properties as a dict.

        The properties of a nested ``designScreen`` are stored in the same
        dictionary as the well's own properties.
        """
        to_int = ["designWellId"]
        to_float = ["height", "maximumWellDepth", "maximumWellCapacity"]
        screen_to_float = ["designScreenTop", "designScreenBottom"]
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in [
                "designWellId",
                "wellFunction",
                "height",
                "maximumWellDepth",
                "maximumWellCapacity",
                "relativeTemperature",
            ]:
                d[key] = self._parse_text(child, key, to_int=to_int, to_float=to_float)
            elif key == "geometry":
                d[key] = self._read_geometry(child)
            elif key == "designScreen":
                for grandchild in child:
                    tag = self._get_tag(grandchild)
                    if tag in ["screenType", "designScreenTop", "designScreenBottom"]:
                        # Parse the screen property itself; the original code
                        # passed the enclosing designScreen element here.
                        d[tag] = self._parse_text(
                            grandchild, tag, to_float=screen_to_float
                        )
            elif key == "lifespan":
                self._read_lifespan(child, d)
            else:
                self._warn_unknown_tag(key)
        return d

    def _read_realised_installation(self, node):
        """Read a realised installation into a dictionary.

        Nested realised loops and wells are appended to ``self.realisedLoop``
        and ``self.realisedWell``.

        Parameters
        ----------
        node : element
            The realised installation element.

        Returns
        -------
        dict
            Properties of the realised installation, keyed by tag name.
        """
        to_int = ["realisedInstallationId"]
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in ["realisedInstallationId", "installationFunction"]:
                d[key] = self._parse_text(child, key, to_int=to_int)
            elif key == "geometry":
                d[key] = self._read_geometry(child)
            # Equality, not ``in``: ``key in "validityPeriod"`` is a substring
            # test and would also accept tags such as "Period" or "life".
            elif key == "validityPeriod":
                self._read_validity_period(child, d=d)
            elif key == "lifespan":
                self._read_lifespan(child, d=d)
            elif key == "realisedLoop":
                rows = self._rows_for(key)
                loop = self._unwrap_single_child(child, "RealisedLoop")
                rows.append(self._read_realised_loop(loop))
            elif key == "realisedWell":
                rows = self._rows_for(key)
                well = self._unwrap_single_child(child, "RealisedWell")
                rows.append(self._read_realised_well(well))
            else:
                self._warn_unknown_tag(key)
        return d

    def _read_realised_loop(self, node):
        """Read a realised loop element and return its properties as a dict."""
        to_int = ["realisedLoopId"]
        to_float = ["loopDepth"]
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in ["realisedLoopId", "loopType", "loopDepth"]:
                d[key] = self._parse_text(child, key, to_float=to_float, to_int=to_int)
            elif key == "geometry":
                d[key] = self._read_geometry(child)
            elif key == "lifespan":
                self._read_lifespan(child, d)
            else:
                self._warn_unknown_tag(key)
        return d

    def _read_realised_well(self, node):
        """Read a realised well element and return its properties as a dict.

        The properties of a nested ``realisedScreen`` are stored in the same
        dictionary as the well's own properties; a second
        ``realisedScreenId`` for the same well is reported through
        ``_raise_assumed_single``.
        """
        # The original listed "realisedLoopId" here, so the well id was never
        # converted to an integer.
        to_int = ["realisedWellId"]
        to_float = ["height", "wellDepth"]
        screen_to_int = ["realisedScreenId"]
        screen_to_float = ["topScreenDepth", "length"]
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in [
                "realisedWellId",
                "wellFunction",
                "height",
                "wellDepth",
                "relativeTemperature",
            ]:
                d[key] = self._parse_text(child, key, to_float=to_float, to_int=to_int)
            elif key == "geometry":
                d[key] = self._read_geometry(child)
            elif key == "validityPeriod":
                self._read_validity_period(child, d=d)
            elif key == "lifespan":
                self._read_lifespan(child, d)
            elif key == "realisedScreen":
                for grandchild in child:
                    tag = self._get_tag(grandchild)
                    # NOTE(review): "relativeTemperature" was commented out of
                    # this list in the original code, so a screen-level value
                    # is not read - confirm that this is intentional.
                    if tag in [
                        "realisedScreenId",
                        "screenType",
                        "topScreenDepth",
                        "length",
                    ]:
                        if tag == "realisedScreenId" and tag in d:
                            self._raise_assumed_single("realisedScreenId")
                        # Parse the screen property itself (not the enclosing
                        # realisedScreen element) and apply the int conversion
                        # that was prepared but never passed on before.
                        d[tag] = self._parse_text(
                            grandchild,
                            tag,
                            to_int=screen_to_int,
                            to_float=screen_to_float,
                        )
                    elif tag == "validityPeriod":
                        # Read the screen's own validityPeriod element, in
                        # line with the lifespan branch below.
                        self._read_validity_period(grandchild, d=d)
                    elif tag == "lifespan":
                        self._read_lifespan(grandchild, d)
            else:
                self._warn_unknown_tag(key)
        return d

    def _read_geometry(self, node):
        """Read a geometry element, looking through the GUF-specific wrapper.

        When ``node`` holds a ``gufcommon:PointOrCurveOrSurface`` child, that
        child is handed to the parent class reader; otherwise ``node`` itself
        is passed on.

        Parameters
        ----------
        node : element
            Geometry element; it must have exactly one child.

        Returns
        -------
        object
            Whatever the parent class ``_read_geometry`` returns.
        """
        assert len(node) == 1, "expected exactly one child in a geometry element"
        ns = {
            "gml": "http://www.opengis.net/gml/3.2",
            "gufcommon": "http://www.broservices.nl/xsd/gufcommon/1.0",
        }
        wrapper = node.find("gufcommon:PointOrCurveOrSurface", ns)
        if wrapper is not None:
            node = wrapper
        return super()._read_geometry(node)
cl = GroundwaterUtilisationFacility


def _bind_to_guf(func):
    """Return ``func`` with ``cl`` pre-filled as first argument, keeping its docstring."""
    bound = partial(func, cl)
    # partial objects do not take over the docstring of the wrapped function.
    bound.__doc__ = func.__doc__
    return bound


# Module-level convenience functions: the generic ``bro`` helpers bound to the
# GroundwaterUtilisationFacility class.
get_bro_ids_of_bronhouder = _bind_to_guf(bro._get_bro_ids_of_bronhouder)
get_data_for_bro_ids = _bind_to_guf(bro._get_data_for_bro_ids)
get_characteristics = _bind_to_guf(bro._get_characteristics)
get_data_in_extent = _bind_to_guf(bro._get_data_in_extent)