Coverage for brodata / sad.py: 90%

264 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-20 14:37 +0000

1import pandas as pd 

2import geopandas as gpd 

3from functools import partial 

4from . import bro 

5 

6 

class SiteAssessmentData(bro.FileOrUrl):
    """Class to represent a Site Assessment Data (SAD) from the BRO.

    Parsing starts in ``_read_contents``. Leaf elements of the main ``SAD_O``
    object are stored as attributes on the instance, ``report`` becomes a
    dictionary and ``measurementPoint`` / ``mixedSampleAnalysis`` are collected
    into tables (one row per parsed element).
    """

    _rest_url = "https://publiek.broservices.nl/sq/sad/v1"
    _xmlns = "http://www.broservices.nl/xsd/dssad-internal/1.1"
    _char = "SAD_C"

    def _read_contents(self, tree):
        """Read the ``SAD_O`` object from ``tree`` and store it on the instance.

        Parameters
        ----------
        tree
            Parsed XML document, passed on to ``_get_main_object``.
        """
        ns = {
            "brocom": "http://www.broservices.nl/xsd/brocommon/3.0",
            "gml": "http://www.opengis.net/gml/3.2",
            "sadcommon": "http://www.broservices.nl/xsd/sadcommon-internal/1.1",
            "xmlns": self._xmlns,
        }
        sad = self._get_main_object(tree, "SAD_O", ns)

        for key in sad.attrib:
            # Drop the "{namespace}" prefix; [-1] also accepts attributes that
            # carry no namespace at all (these used to raise an IndexError).
            setattr(self, key.split("}", 1)[-1], sad.attrib[key])

        for child in sad:
            key = self._get_tag(child)
            # len() is used on purpose: it counts the sub-elements of a node.
            if len(child) == 0:
                setattr(self, key, child.text)
            elif key == "geometry":
                setattr(self, key, self._read_geometry(child))
            elif key == "registrationHistory":
                self._read_children_of_children(child)
            elif key == "standardizedLocation":
                self._read_standardized_location(child)
            elif key == "report":
                if hasattr(self, key):
                    self._raise_assumed_single(key)
                self.report = {}
                self._read_children_of_children(child, d=self.report)
            elif key == "measurementPoint":
                if not hasattr(self, key):
                    self.measurementPoint = []
                for grandchild in child:
                    key2 = self._get_tag(grandchild)
                    if key2 == "MeasurementPoint":
                        mp = self._read_measurement_point(grandchild)
                        self.measurementPoint.append(mp)
                    else:
                        # was ``self.warn_unknown_tag``; every other call site
                        # in this class uses the underscore-prefixed method
                        self._warn_unknown_tag(key2)
            elif key == "mixedSampleAnalysis":
                if not hasattr(self, key):
                    self.mixedSampleAnalysis = []
                for grandchild in child:
                    key2 = self._get_tag(grandchild)
                    if key2 == "MixedSampleAnalysis":
                        msa = self._read_mixed_sample_analysis(grandchild)
                        self.mixedSampleAnalysis.append(msa)
                    else:
                        self._warn_unknown_tag(key2)
            else:
                self._warn_unknown_tag(key)

        # Turn the collected lists of dictionaries into tables.
        if hasattr(self, "measurementPoint"):
            self.measurementPoint = pd.DataFrame(self.measurementPoint)
            if "deliveredLocation" in self.measurementPoint.columns:
                self.measurementPoint = gpd.GeoDataFrame(
                    self.measurementPoint, geometry="deliveredLocation"
                )
            if "name" in self.measurementPoint.columns:
                self.measurementPoint.set_index("name", inplace=True)
        if hasattr(self, "mixedSampleAnalysis"):
            self.mixedSampleAnalysis = pd.DataFrame(self.mixedSampleAnalysis)

    def _read_mixed_sample_analysis(self, node):
        """Parse a ``MixedSampleAnalysis`` element.

        Returns
        -------
        dict
            Parsed values. When present, ``"analysis"`` and ``"soilSampling"``
            are DataFrames with one row per parsed child element.
        """
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in ("identification", "name", "beginDepth", "endDepth"):
                d[key] = self._parse_text(child, key)
            elif key == "analysis":
                for grandchild in child:
                    key2 = self._get_tag(grandchild)
                    if key2 == "Analysis":
                        if key not in d:
                            d[key] = []
                        d[key].append(self._read_analysis(grandchild))
                    else:
                        # report the unknown grandchild tag, not its parent
                        self._warn_unknown_tag(key2)
            elif key == "soilSampling":
                if key not in d:
                    d[key] = []
                if len(child) == 0:
                    # Element without sub-elements: keep its attributes, with
                    # the "{namespace}" prefix removed from the names.
                    ss = {}
                    for attrib in child.attrib:
                        ss[attrib.split("}", 1)[-1]] = child.attrib[attrib]
                    d[key].append(ss)
                else:
                    for grandchild in child:
                        key2 = self._get_tag(grandchild)
                        if key2 == "SoilSampling":
                            d[key].append(self._read_soil_sampling(grandchild))
                        else:
                            self._warn_unknown_tag(key2)
            else:
                self._warn_unknown_tag(key)
        if "analysis" in d:
            d["analysis"] = pd.DataFrame(d["analysis"])
        if "soilSampling" in d:
            d["soilSampling"] = pd.DataFrame(d["soilSampling"])
        return d

    def _read_measurement_point(self, node):
        """Parse a ``MeasurementPoint`` element.

        Returns
        -------
        dict
            Parsed values. When present, ``"soilSampling"`` and ``"filter"``
            are DataFrames with one row per parsed child element.
        """
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in ("identification", "name", "date", "finalDepth", "type"):
                d[key] = self._parse_text(child, key, to_float=["finalDepth"])
            elif key == "deliveredLocation":
                d[key] = self._read_geometry(child)
            elif key == "deliveredVerticalPosition":
                self._read_delivered_vertical_position(child, d=d)
            elif key == "boreholeSampleDescription":
                # NOTE(review): in this and the two branches below the read
                # call is assumed to run whether or not the wrapper element was
                # unwrapped -- confirm against the original indentation.
                if self._check_single_child_with_tag(
                    child, "BoreholeSampleDescription"
                ):
                    child = child[0]
                self._read_borehole_sample_description(child, d)
            elif key == "soilSampling":
                if key not in d:
                    d[key] = []
                if self._check_single_child_with_tag(child, "SoilSampling"):
                    child = child[0]
                d[key].append(self._read_soil_sampling(child))
            elif key == "filter":
                if key not in d:
                    d[key] = []
                if self._check_single_child_with_tag(child, "Filter"):
                    child = child[0]
                d[key].append(self._read_filter(child))
            else:
                self._warn_unknown_tag(key)

        if "soilSampling" in d:
            d["soilSampling"] = pd.DataFrame(d["soilSampling"])

        if "filter" in d:
            d["filter"] = pd.DataFrame(d["filter"])

        return d

    def _read_filter(self, node):
        """Parse a ``Filter`` element.

        Returns
        -------
        dict
            Parsed values. When the filter has groundwater samplings,
            ``"groundwaterSampling"`` is a DataFrame with one row per sampling
            and ``"groundwaterSampleAnalysis"`` is a DataFrame with one row per
            individual analysis result of all samplings (possibly empty).
        """
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in ("identification", "name", "upperBoundary", "lowerBoundary"):
                d[key] = self._parse_text(
                    child, key, to_float=["upperBoundary", "lowerBoundary"]
                )
            elif key == "deliveredVerticalPosition":
                self._read_delivered_vertical_position(child, d=d)
            elif key == "groundwaterSampling":
                for grandchild in child:
                    if key not in d:
                        d[key] = []
                    key2 = self._get_tag(grandchild)
                    if key2 == "GroundwaterSampling":
                        gs = self._read_groundwater_sampling(grandchild)
                        d[key].append(gs)
                    else:
                        self._warn_unknown_tag(key2)
            else:
                self._warn_unknown_tag(key)

        if "groundwaterSampling" in d:
            samplings = pd.DataFrame(d["groundwaterSampling"])
            d["groundwaterSampleAnalysis"] = (
                self._flatten_groundwater_sample_analysis(samplings)
            )
            # remove analysis results from groundwaterSampling to avoid duplication
            if "groundwaterSampleAnalysis" in samplings.columns:
                samplings = samplings.drop(columns=["groundwaterSampleAnalysis"])
            d["groundwaterSampling"] = samplings
        return d

    def _flatten_groundwater_sample_analysis(self, samplings):
        """Flatten the nested analysis tables of all samplings of one filter.

        Parameters
        ----------
        samplings : pd.DataFrame
            One row per groundwater sampling. A row may hold a DataFrame in
            the ``groundwaterSampleAnalysis`` column, whose rows in turn may
            hold a DataFrame in their ``analysis`` column.

        Returns
        -------
        pd.DataFrame
            One row per analysis result, extended with columns that refer back
            to the sampling and the sample analysis it belongs to. Empty when
            there are no analysis results.
        """
        analyses = []
        for _, samp in samplings.iterrows():
            if "groundwaterSampleAnalysis" not in samp:
                continue
            gsa_df = samp["groundwaterSampleAnalysis"]
            # cells of samplings without analyses do not hold a DataFrame
            if not isinstance(gsa_df, pd.DataFrame):
                continue
            samp_ident = samp.get("identification", None)
            samp_name = samp.get("name", None)
            samp_date = samp.get("date", None)
            for _, gsa in gsa_df.iterrows():
                if "analysis" not in gsa:
                    continue
                if not isinstance(gsa["analysis"], pd.DataFrame):
                    continue
                gsa_ident = gsa.get("identification", None)
                gsa_name = gsa.get("name", None)
                for _, row in gsa["analysis"].iterrows():
                    rowd = dict(row)
                    # keep reference to the sampling and the sample analysis id
                    rowd["groundwaterSampling_identification"] = samp_ident
                    rowd["groundwaterSampling_name"] = samp_name
                    rowd["groundwaterSampling_date"] = samp_date
                    rowd["groundwaterSampleAnalysis_identification"] = gsa_ident
                    rowd["groundwaterSampleAnalysis_name"] = gsa_name
                    analyses.append(rowd)
        if len(analyses) > 0:
            return pd.DataFrame(analyses)
        return pd.DataFrame()

    def _read_groundwater_sampling(self, node):
        """Parse a ``GroundwaterSampling`` element.

        Returns
        -------
        dict
            Parsed values. When present, ``"groundwaterSampleAnalysis"`` is a
            DataFrame with one row per parsed sample analysis.
        """
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in ("identification", "name", "date"):
                d[key] = self._parse_text(child, key)
            elif key == "groundwaterSampleAnalysis":
                for grandchild in child:
                    if key not in d:
                        d[key] = []
                    key2 = self._get_tag(grandchild)
                    if key2 == "GroundwaterSampleAnalysis":
                        gsa = self._read_groundwater_sample_analysis(grandchild)
                        d[key].append(gsa)
                    else:
                        self._warn_unknown_tag(key2)
            else:
                self._warn_unknown_tag(key)
        if "groundwaterSampleAnalysis" in d:
            d["groundwaterSampleAnalysis"] = pd.DataFrame(
                d["groundwaterSampleAnalysis"]
            )
        return d

    def _read_groundwater_sample_analysis(self, node):
        """Parse a ``GroundwaterSampleAnalysis`` element.

        Returns
        -------
        dict
            Parsed values. When present, ``"analysis"`` is a DataFrame with one
            row per parsed ``Analysis`` element.
        """
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in ("identification", "name"):
                d[key] = self._parse_text(child, key)
            elif key == "analysis":
                for grandchild in child:
                    key2 = self._get_tag(grandchild)
                    if key2 == "Analysis":
                        if key not in d:
                            d[key] = []
                        d[key].append(self._read_analysis(grandchild))
                    else:
                        self._warn_unknown_tag(key2)
            else:
                self._warn_unknown_tag(key)
        if "analysis" in d:
            d["analysis"] = pd.DataFrame(d["analysis"])
        return d

    def _read_borehole_sample_description(self, node, d):
        """Parse a ``BoreholeSampleDescription`` element into ``d`` (in place).

        Parameters
        ----------
        node
            The element to read.
        d : dict
            Dictionary that receives ``"DescriptiveBoreholeLog"`` and
            ``"descriptionProcedure"``.
        """
        for child in node:
            key = self._get_tag(child)
            if key == "descriptiveBoreholeLog":
                for grandchild in child:
                    key2 = self._get_tag(grandchild)
                    if key2 == "DescriptiveBoreholeLog":
                        if key2 in d:
                            # was ``self.raise_assumed_single``; the method is
                            # called with a leading underscore elsewhere
                            self._raise_assumed_single(key2)
                        d[key2] = self._read_descriptive_borehole_log(grandchild)
                    else:
                        self._warn_unknown_tag(key2)
            elif key == "descriptionProcedure":
                d[key] = child.text
            else:
                self._warn_unknown_tag(key)

    def _read_soil_sampling(self, node):
        """Parse a ``SoilSampling`` element.

        Returns
        -------
        dict
            Parsed values, including whatever ``_read_soil_sample_analysis``
            adds for nested ``SoilSampleAnalysis`` elements.
        """
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in ("identification", "name", "beginDepth", "endDepth", "date"):
                d[key] = self._parse_text(child, key)
            elif key == "soilSampleAnalysis":
                for grandchild in child:
                    key2 = self._get_tag(grandchild)
                    if key2 == "SoilSampleAnalysis":
                        self._read_soil_sample_analysis(grandchild, d)
                    else:
                        self._warn_unknown_tag(key2)
            else:
                self._warn_unknown_tag(key)
        # The result was previously never returned, so callers appended None.
        return d

    def _read_soil_sample_analysis(self, node, d):
        """Parse a ``SoilSampleAnalysis`` element into ``d`` (in place).

        ``identification`` and ``name`` are written to ``d`` directly. The
        ``Analysis`` elements are stored as a DataFrame under ``"analysis"``;
        when ``d`` already holds such a DataFrame (an earlier call for the
        same ``d``), the new rows are appended to it.
        """
        analyses = []
        for child in node:
            key = self._get_tag(child)
            if key in ("identification", "name"):
                d[key] = self._parse_text(child, key)
            elif key == "analysis":
                for grandchild in child:
                    key2 = self._get_tag(grandchild)
                    if key2 == "Analysis":
                        analyses.append(self._read_analysis(grandchild))
                    else:
                        self._warn_unknown_tag(key2)
            else:
                self._warn_unknown_tag(key)
        if analyses:
            table = pd.DataFrame(analyses)
            previous = d.get("analysis")
            if isinstance(previous, pd.DataFrame):
                # A second SoilSampleAnalysis for the same sampling used to
                # fail because the stored value was no longer a list.
                table = pd.concat([previous, table], ignore_index=True)
            d["analysis"] = table

    def _read_analysis(self, node):
        """Parse an ``Analysis`` element.

        Returns
        -------
        dict
            One entry per known child tag; ``analysisMeasurementValue`` is
            requested as float from ``_parse_text``.
        """
        known = (
            "identification",
            "quantity",
            "parameter",
            "analysisMeasurementValue",
            "condition",
            "limitSymbol",
        )
        to_float = ["analysisMeasurementValue"]
        d = {}
        for child in node:
            key = self._get_tag(child)
            if key in known:
                d[key] = self._parse_text(child, key, to_float=to_float)
            else:
                self._warn_unknown_tag(key)
        return d

329 

330 

cl = SiteAssessmentData


def _bind_to_cl(func):
    """Return ``partial(func, cl)`` that carries the docstring of ``func``."""
    bound = partial(func, cl)
    bound.__doc__ = func.__doc__
    return bound


# Module-level convenience functions: the generic request functions from
# ``bro`` with this module's class filled in as first argument.
get_bro_ids_of_bronhouder = _bind_to_cl(bro._get_bro_ids_of_bronhouder)
get_data_for_bro_ids = _bind_to_cl(bro._get_data_for_bro_ids)
get_characteristics = _bind_to_cl(bro._get_characteristics)
get_data_in_extent = _bind_to_cl(bro._get_data_in_extent)

341 

342get_data_in_extent = partial(bro._get_data_in_extent, cl) 

343get_data_in_extent.__doc__ = bro._get_data_in_extent.__doc__