Coverage for brodata / guf.py: 68%

240 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-20 14:37 +0000

1import logging 

2from functools import partial 

3 

4import pandas as pd 

5 

6from . import bro 

7 

8logger = logging.getLogger(__name__) 

9 

10 

class GroundwaterUtilisationFacility(bro.FileOrUrl):
    """Class to represent a Groundwater Utilisation Facility (GUF) from the BRO.

    The XML of a GUF object is parsed by ``_read_contents``. Elements without
    children are stored as attributes named after their XML tag. Repeated nested
    elements are collected as lists of dicts while parsing and converted to
    DataFrames once the whole object has been read. The table attributes below
    only exist when the corresponding elements occur in the XML.

    Attributes
    ----------
    broId : str
        The BRO identifier of the GroundwaterUtilisationFacility object.
    objectHistory : pd.DataFrame
        DataFrame with the history of changes to the GUF object.
    licence : dict
        Dictionary with information about the groundwater usage licence.
    licensedQuantity : pd.DataFrame
        One row per ``licensedQuantity`` element of the licence.
    designInstallation, designLoop, designWell : pd.DataFrame
        One row per design installation, design loop and design well.
    realisedInstallation, realisedLoop, realisedWell : pd.DataFrame
        One row per realised installation, realised loop and realised well.
    """

    _rest_url = "https://publiek.broservices.nl/gu/guf/v1"
    _xmlns = "http://www.broservices.nl/xsd/dsguf/1.0"
    _char = "GUF_C"
    _namespace = {
        "brocom": "http://www.broservices.nl/xsd/brocommon/3.0",
        "gml": "http://www.opengis.net/gml/3.2",
        "gufcommon": "http://www.broservices.nl/xsd/gufcommon/1.0",
        "xmlns": _xmlns,
    }

    def _collect_guf_record(self, attribute, node, wrapper_tag, reader):
        """Parse ``node`` with ``reader`` and append the result to a list on self.

        Parameters
        ----------
        attribute : str
            Name of the list attribute on ``self``; created when missing.
        node : element
            XML element holding the record.
        wrapper_tag : str
            Tag passed to ``_check_single_child_with_tag``. When that check
            succeeds the first child of ``node`` is parsed instead of ``node``.
        reader : callable
            Called with the selected element; its return value is appended.
        """
        if not hasattr(self, attribute):
            setattr(self, attribute, [])
        # NOTE(review): the indentation of the original was lost; the record is
        # read and appended regardless of the outcome of this check (presumably
        # the check only detects an extra wrapper element) - confirm.
        if self._check_single_child_with_tag(node, wrapper_tag):
            node = node[0]
        getattr(self, attribute).append(reader(node))

    def _read_contents(self, tree):
        """Parse the XML of a GUF object and store its contents on ``self``.

        Parameters
        ----------
        tree : element
            Parsed XML document; handed to ``_get_main_object`` to locate the
            ``GUF_PO`` or ``GUF_PPO`` element.
        """
        ns = self._namespace
        guf = self._get_main_object(tree, ["GUF_PO", "GUF_PPO"], ns)
        # Attribute names are expected in the form "{namespace}name"; only the
        # part after the closing brace is used as attribute name on self.
        for name, value in guf.attrib.items():
            setattr(self, name.split("}", 1)[1], value)

        for child in guf:
            key = self._get_tag(child)
            if len(child) == 0:
                # Element without children: store its text under the tag name.
                setattr(self, key, child.text)
            elif key == "standardizedLocation":
                self._read_standardized_location(child)
            elif key == "registrationHistory":
                self._read_children_of_children(child)
            elif key == "validityPeriod":
                self._read_validity_period(child)
            elif key == "lifespan":
                self._read_lifespan(child)
            elif key == "objectHistory":
                events = []
                for event in child:
                    record = {}
                    for item in event:
                        item_key = self._get_tag(item)
                        if item_key == "date":
                            record[item_key] = self._read_date(item)
                        else:
                            record[item_key] = item.text
                    events.append(record)
                self.objectHistory = pd.DataFrame(events)
            elif key == "licence":
                for grandchild in child:
                    licence_key = self._get_tag(grandchild)
                    if licence_key == "LicenceGroundwaterUsage":
                        # Only a single licence per GUF object is supported.
                        if hasattr(self, "licence"):
                            self._raise_assumed_single("licence")
                        self.licence = self._read_licence_groundwater_usage(
                            grandchild
                        )
                    else:
                        self._warn_unknown_tag(licence_key)
            elif key == "realisedInstallation":
                self._collect_guf_record(
                    key,
                    child,
                    "RealisedInstallation",
                    self._read_realised_installation,
                )
            else:
                self._warn_unknown_tag(key)

        # The nested readers collect lists of dicts; turn them into tables once
        # all elements have been read.
        table_attributes = (
            "designLoop",
            "designWell",
            "realisedLoop",
            "realisedWell",
            "realisedInstallation",
            "licensedQuantity",
            "designInstallation",
        )
        for name in table_attributes:
            if hasattr(self, name):
                setattr(self, name, pd.DataFrame(getattr(self, name)))

    def _read_licence_groundwater_usage(self, node):
        """Read a ``LicenceGroundwaterUsage`` element.

        Design installations and licensed quantities found in the licence are
        appended to ``self.designInstallation`` and ``self.licensedQuantity``;
        they are not part of the returned dictionary.

        Parameters
        ----------
        node : element
            The ``LicenceGroundwaterUsage`` element.

        Returns
        -------
        dict
            The remaining properties of the licence.
        """
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in ["identificationLicence", "legalType"]:
                d[key] = child.text
            elif key == "usageTypeFacility":
                self._read_children_of_children(child, d)
            elif key == "lifespan":
                self._read_lifespan(child, d)
            elif key == "designInstallation":
                self._collect_guf_record(
                    key, child, "DesignInstallation", self._read_design_installation
                )
            elif key == "licensedQuantity":
                if not hasattr(self, key):
                    self.licensedQuantity = []
                quantity = {}
                self._read_children_of_children(child, d=quantity)
                self.licensedQuantity.append(quantity)
            else:
                self._warn_unknown_tag(key)
        return d

    def _read_design_installation(self, node):
        """Read a design installation into a dictionary.

        Design loops and design wells of the installation are appended to
        ``self.designLoop`` and ``self.designWell``.

        Parameters
        ----------
        node : element
            The design installation element.

        Returns
        -------
        dict
            Properties of the design installation.
        """
        to_int = ["designInstallationId"]
        to_float = [
            "energyCold",
            "energyWarm",
            "maximumInfiltrationTemperatureWarm",
            "power",
        ]
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in ["designInstallationId", "installationFunction"]:
                d[key] = self._parse_text(child, key, to_int=to_int)
            elif key == "geometry":
                d[key] = self._read_geometry(child)
            elif key in ["energyCharacteristics", "lifespan"]:
                # Both elements are flattened: every grandchild is stored under
                # its own tag name.
                for grandchild in child:
                    sub_key = self._get_tag(grandchild)
                    d[sub_key] = self._parse_text(
                        grandchild, sub_key, to_float=to_float
                    )
            elif key == "designLoop":
                self._collect_guf_record(
                    key, child, "DesignLoop", self._read_design_loop
                )
            elif key == "designWell":
                self._collect_guf_record(
                    key, child, "DesignWell", self._read_design_well
                )
            else:
                self._warn_unknown_tag(key)
        return d

    def _read_design_loop(self, node):
        """Read a design loop into a dictionary.

        Parameters
        ----------
        node : element
            The design loop element.

        Returns
        -------
        dict
            Properties of the design loop.
        """
        to_int = ["designLoopId"]
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in ["designLoopId", "loopType"]:
                d[key] = self._parse_text(child, key, to_int=to_int)
            elif key == "geometry":
                d[key] = self._read_geometry(child)
            elif key == "lifespan":
                self._read_lifespan(child, d)
            else:
                self._warn_unknown_tag(key)
        return d

    def _read_design_well(self, node):
        """Read a design well, including its design screen, into a dictionary.

        Parameters
        ----------
        node : element
            The design well element.

        Returns
        -------
        dict
            Properties of the design well and of its design screen.
        """
        well_keys = [
            "designWellId",
            "wellFunction",
            "height",
            "maximumWellDepth",
            "maximumWellCapacity",
            "relativeTemperature",
        ]
        well_to_int = ["designWellId"]
        well_to_float = ["height", "maximumWellDepth", "maximumWellCapacity"]
        screen_keys = ["screenType", "designScreenTop", "designScreenBottom"]
        screen_to_float = ["designScreenTop", "designScreenBottom"]

        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in well_keys:
                d[key] = self._parse_text(
                    child, key, to_int=well_to_int, to_float=well_to_float
                )
            elif key == "geometry":
                d[key] = self._read_geometry(child)
            elif key == "designScreen":
                for grandchild in child:
                    screen_key = self._get_tag(grandchild)
                    if screen_key in screen_keys:
                        # Parse the screen property itself, not the enclosing
                        # designScreen element.
                        d[screen_key] = self._parse_text(
                            grandchild, screen_key, to_float=screen_to_float
                        )
                    # Other screen tags are ignored without a warning, as before.
            elif key == "lifespan":
                self._read_lifespan(child, d)
            else:
                self._warn_unknown_tag(key)
        return d

    def _read_realised_installation(self, node):
        """Read a realised installation into a dictionary.

        Realised loops and realised wells of the installation are appended to
        ``self.realisedLoop`` and ``self.realisedWell``.

        Parameters
        ----------
        node : element
            The realised installation element.

        Returns
        -------
        dict
            Properties of the realised installation.
        """
        to_int = ["realisedInstallationId"]
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in ["realisedInstallationId", "installationFunction"]:
                d[key] = self._parse_text(child, key, to_int=to_int)
            elif key == "geometry":
                d[key] = self._read_geometry(child)
            # Compare with ``==``: ``key in "validityPeriod"`` is a substring
            # test and would also accept tags such as "Period".
            elif key == "validityPeriod":
                self._read_validity_period(child, d=d)
            elif key == "lifespan":
                self._read_lifespan(child, d=d)
            elif key == "realisedLoop":
                self._collect_guf_record(
                    key, child, "RealisedLoop", self._read_realised_loop
                )
            elif key == "realisedWell":
                self._collect_guf_record(
                    key, child, "RealisedWell", self._read_realised_well
                )
            else:
                self._warn_unknown_tag(key)
        return d

    def _read_realised_loop(self, node):
        """Read a realised loop into a dictionary.

        Parameters
        ----------
        node : element
            The realised loop element.

        Returns
        -------
        dict
            Properties of the realised loop.
        """
        to_int = ["realisedLoopId"]
        to_float = ["loopDepth"]
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in ["realisedLoopId", "loopType", "loopDepth"]:
                d[key] = self._parse_text(child, key, to_float=to_float, to_int=to_int)
            elif key == "geometry":
                d[key] = self._read_geometry(child)
            elif key == "lifespan":
                self._read_lifespan(child, d)
            else:
                self._warn_unknown_tag(key)
        return d

    def _read_realised_well(self, node):
        """Read a realised well, including its realised screen, into a dictionary.

        Only a single realised screen per well is supported: a second
        ``realisedScreenId`` is reported through ``_raise_assumed_single``.

        Parameters
        ----------
        node : element
            The realised well element.

        Returns
        -------
        dict
            Properties of the realised well and of its realised screen.
        """
        well_keys = [
            "realisedWellId",
            "wellFunction",
            "height",
            "wellDepth",
            "relativeTemperature",
        ]
        well_to_int = ["realisedWellId"]
        well_to_float = ["height", "wellDepth"]
        screen_keys = ["realisedScreenId", "screenType", "topScreenDepth", "length"]
        screen_to_int = ["realisedScreenId"]
        screen_to_float = ["topScreenDepth", "length"]

        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in well_keys:
                d[key] = self._parse_text(
                    child, key, to_float=well_to_float, to_int=well_to_int
                )
            elif key == "geometry":
                d[key] = self._read_geometry(child)
            elif key == "validityPeriod":
                self._read_validity_period(child, d=d)
            elif key == "lifespan":
                self._read_lifespan(child, d)
            elif key == "realisedScreen":
                for grandchild in child:
                    screen_key = self._get_tag(grandchild)
                    if screen_key in screen_keys:
                        if screen_key == "realisedScreenId" and screen_key in d:
                            self._raise_assumed_single("realisedScreenId")
                        # Parse the screen property itself, not the enclosing
                        # realisedScreen element.
                        d[screen_key] = self._parse_text(
                            grandchild,
                            screen_key,
                            to_float=screen_to_float,
                            to_int=screen_to_int,
                        )
                    elif screen_key == "validityPeriod":
                        self._read_validity_period(grandchild, d=d)
                    elif screen_key == "lifespan":
                        self._read_lifespan(grandchild, d)
                    # Other screen tags are ignored without a warning, as before.
            # NOTE(review): the indentation of the original was lost; this else
            # is assumed to close the chain over the children of the well, like
            # in the other readers - confirm.
            else:
                self._warn_unknown_tag(key)
        return d

    def _read_geometry(self, node):
        """Read a geometry element.

        When ``node`` contains a ``gufcommon:PointOrCurveOrSurface`` element,
        that element is handed to the parent implementation instead of ``node``.

        Parameters
        ----------
        node : element
            The geometry element; it must have exactly one child.

        Returns
        -------
        object
            Whatever the ``_read_geometry`` method of the parent class returns.

        Raises
        ------
        AssertionError
            When ``node`` does not have exactly one child.
        """
        assert len(node) == 1
        ns = {
            "gml": "http://www.opengis.net/gml/3.2",
            "gufcommon": "http://www.broservices.nl/xsd/gufcommon/1.0",
        }
        point_or_curve_or_surface = node.find("gufcommon:PointOrCurveOrSurface", ns)
        if point_or_curve_or_surface is not None:
            node = point_or_curve_or_surface
        return super()._read_geometry(node)

307 

308 

cl = GroundwaterUtilisationFacility


def _bind_to_class(func):
    """Return ``func`` with ``cl`` filled in as first argument.

    The docstring of ``func`` is copied onto the resulting partial object.
    """
    bound = partial(func, cl)
    bound.__doc__ = func.__doc__
    return bound


# Module-level convenience functions for GUF objects.
get_bro_ids_of_bronhouder = _bind_to_class(bro._get_bro_ids_of_bronhouder)
get_data_for_bro_ids = _bind_to_class(bro._get_data_for_bro_ids)
get_characteristics = _bind_to_class(bro._get_characteristics)
get_data_in_extent = _bind_to_class(bro._get_data_in_extent)