Coverage for brodata / sfr.py: 66%

279 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-20 14:37 +0000

1import logging 

2from functools import partial 

3import pandas as pd 

4 

5from . import bro 

6 

7logger = logging.getLogger(__name__) 

8 

9 

10class SoilFaceResearch(bro.FileOrUrl): 

11 """Class to represent a Soil Face Research (SFR) from the BRO. 

12 

13 The configuration of the xml-file can be found at 

14 https://www.bro-productomgeving.nl/__attachments/1607470159/DO_ResponseSFR_O_DP.xml?inst-v=a371ba4a-5fb1-479c-99cb-44e36b9c21f9 

15 (link found at https://www.bro-productomgeving.nl/bpo/latest/uitgifte-voorbeeldberichten-sfr) 

16 """ 

17 

18 _rest_url = "https://publiek.broservices.nl/sr/sfr/v2" 

19 _xmlns = "http://www.broservices.nl/xsd/dssfr/2.0" 

20 _char = "SFR_C" 

21 

22 def _read_contents(self, tree): 

23 ns = { 

24 "brocom": "http://www.broservices.nl/xsd/brocommon/3.0", 

25 "gml": "http://www.opengis.net/gml/3.2", 

26 "sfrcom": "http://www.broservices.nl/xsd/sfrcommon/2.0", 

27 "xmlns": self._xmlns, 

28 } 

29 sfr = self._get_main_object(tree, "SFR_O", ns) 

30 for key in sfr.attrib: 

31 setattr(self, key.split("}", 1)[1], sfr.attrib[key]) 

32 for child in sfr: 

33 key = self._get_tag(child) 

34 if len(child) == 0: 

35 setattr(self, key, child.text) 

36 elif key == "deliveredLocation": 

37 self._read_delivered_location(child) 

38 elif key == "deliveredVerticalPosition": 

39 to_float = ["offset"] 

40 self._read_children_of_children(child, to_float=to_float) 

41 elif key == "standardizedLocation": 

42 self._read_standardized_location(child) 

43 elif key in ["researchReportDate", "fieldworkDate"]: 

44 setattr(self, key, self._read_date(child)) 

45 elif key in ["registrationHistory", "reportHistory"]: 

46 self._read_children_of_children(child) 

47 elif key in ["researchOperator"]: 

48 setattr(self, key, self._read_operator(child)) 

49 elif key == "siteCharacteristic": 

50 if self._check_single_child_with_tag(child, "SiteCharacteristic"): 

51 child = child[0] 

52 self._read_children_of_children(child) 

53 elif key == "soilUncovering": 

54 if self._check_single_child_with_tag(child, "SoilUncovering"): 

55 child = child[0] 

56 self._read_children_of_children(child) 

57 elif key == "soilFaceDescription": 

58 if self._check_single_child_with_tag(child, "SoilFaceDescription"): 

59 child = child[0] 

60 self._read_soil_face_description(child) 

61 elif key == "soilFaceSampleAnalysis": 

62 if self._check_single_child_with_tag(child, "SoilFaceSampleAnalysis"): 

63 child = child[0] 

64 self._read_soil_face_sample_analysis(child) 

65 else: 

66 self._warn_unknown_tag(key) 

67 

68 if hasattr(self, "litterLayer"): 

69 self.litterLayer = pd.DataFrame(self.litterLayer) 

70 if hasattr(self, "soilLayer"): 

71 self.soilLayer = pd.DataFrame(self.soilLayer) 

72 if hasattr(self, "disturbedInterval"): 

73 self.disturbedInterval = pd.DataFrame(self.disturbedInterval) 

74 if hasattr(self, "investigatedInterval"): 

75 self.investigatedInterval = pd.DataFrame(self.investigatedInterval) 

76 

77 def _read_soil_face_sample_analysis(self, node): 

78 for child in node: 

79 key = self._get_tag(child) 

80 if key == "analysisReportDate": 

81 setattr(self, key, self._read_date(child)) 

82 elif key in ["analysisType"]: 

83 setattr(self, key, child.text) 

84 elif key in ["analysisOperator"]: 

85 setattr(self, key, self._read_operator(child)) 

86 elif key == "investigatedInterval": 

87 for grandchild in child: 

88 key = self._get_tag(grandchild) 

89 if key == "InvestigatedInterval": 

90 d = self._read_investigated_interval(grandchild) 

91 if hasattr(self, "investigatedInterval"): 

92 self.investigatedInterval.append(d) 

93 else: 

94 self.investigatedInterval = [d] 

95 else: 

96 self._warn_unknown_tag(key) 

97 elif key in []: 

98 setattr(self, key, float(child.text)) 

99 else: 

100 self._warn_unknown_tag(key) 

101 

102 def _read_investigated_interval(self, node): 

103 d = {} 

104 for child in node: 

105 key = self._get_tag(child) 

106 if key in ["beginDepth", "endDepth"]: 

107 d[key] = float(child.text) 

108 elif key in ["characteristicModelled"]: 

109 d[key] = child.text 

110 elif key == "pHDetermination": 

111 for grandchild in child: 

112 key = self._get_tag(grandchild) 

113 if key == "PHDetermination": 

114 self._read_ph_determination(grandchild, d) 

115 else: 

116 self._warn_unknown_tag(key) 

117 elif key == "particleSizeDistributionDetermination": 

118 for grandchild in child: 

119 key = self._get_tag(grandchild) 

120 if key == "ParticleSizeDistributionDetermination": 

121 self._read_particle_size_distribution_determination( 

122 grandchild, d 

123 ) 

124 else: 

125 self._warn_unknown_tag(key) 

126 elif key == "shrinkageDetermination": 

127 for grandchild in child: 

128 key = self._get_tag(grandchild) 

129 if key == "ShrinkageDetermination": 

130 self._read_shrinkage_determination(grandchild, d=d) 

131 else: 

132 self._warn_unknown_tag(key) 

133 else: 

134 self._warn_unknown_tag(key) 

135 return d 

136 

137 def _read_shrinkage_determination(self, node, d): 

138 for child in node: 

139 key = self._get_tag(child) 

140 if key in [ 

141 "determinationProcedure", 

142 "determinationMethod", 

143 "disturbed", 

144 ]: 

145 d[key] = child.text 

146 elif key == "shrinkage": 

147 for child in node: 

148 key = self._get_tag(child) 

149 if key == "DataArray": 

150 d["shrinkage"] = self._read_data_array(child) 

151 else: 

152 self._warn_unknown_tag(key) 

153 else: 

154 self._warn_unknown_tag(key) 

155 

156 def _read_particle_size_distribution_determination(self, node, d): 

157 for child in node: 

158 key = self._get_tag(child) 

159 if key in [ 

160 "determinationProcedure", 

161 "determinationMethod", 

162 "particleSizeDistributionStandardised", 

163 "fractionDistribution", 

164 "performanceIrregularity", 

165 "dispersionMethod", 

166 ]: 

167 d[key] = child.text 

168 elif key == "nonStandardisedFraction": 

169 d2 = {} 

170 for grandchild in child: 

171 key = self._get_tag(grandchild) 

172 if key in [ 

173 "lowerBoundary", 

174 "upperBoundary", 

175 "proportion", 

176 ]: 

177 d2[key] = float(grandchild.text) 

178 else: 

179 self._warn_unknown_tag(key) 

180 if "nonStandardisedFraction" in d: 

181 d["nonStandardisedFraction"].append(d2) 

182 else: 

183 d["nonStandardisedFraction"] = [d2] 

184 else: 

185 self._warn_unknown_tag(key) 

186 if "nonStandardisedFraction" in d: 

187 d["nonStandardisedFraction"] = pd.DataFrame(d["nonStandardisedFraction"]) 

188 

189 def _read_ph_determination(self, node, d): 

190 for child in node: 

191 key = self._get_tag(child) 

192 if key in ["determinationProcedure", "determinationMethod"]: 

193 # capitalize key but keep rest as is, and prepend pH 

194 key = f"pH{key[0].upper()}{key[1:]}" 

195 d[key] = child.text 

196 elif key == "pH": 

197 d[key] = float(child.text) 

198 else: 

199 self._warn_unknown_tag(key) 

200 

201 def _read_soil_face_description(self, node): 

202 for child in node: 

203 key = self._get_tag(child) 

204 if key == "descriptionReportDate": 

205 setattr(self, key, self._read_date(child)) 

206 elif key in [ 

207 "descriptionProcedure", 

208 "describedWidth", 

209 "artificiallyHumidified", 

210 "fractionDistributionDetermined", 

211 "lowerBoundarySandFraction", 

212 ]: 

213 setattr(self, key, child.text) 

214 elif key == "descriptionOperator": 

215 setattr(self, key, self._read_operator(child)) 

216 elif key == "soilProfile": 

217 for grandchild in child: 

218 key = self._get_tag(grandchild) 

219 if key == "SoilProfile": 

220 self._read_soil_profile(grandchild) 

221 else: 

222 self._warn_unknown_tag(key) 

223 elif key == "soilClassification": 

224 for grandchild in child: 

225 key = self._get_tag(grandchild) 

226 if key == "SoilClassification": 

227 setattr(self, key, self._read_soil_classification(grandchild)) 

228 else: 

229 self._warn_unknown_tag(key) 

230 else: 

231 self._warn_unknown_tag(key) 

232 

233 def _read_soil_classification(self, node): 

234 d = {} 

235 for child in node: 

236 key = self._get_tag(child) 

237 if key in [ 

238 "codeGroup", 

239 "classificationCode", 

240 "specialFeatureTop", 

241 "soilClass", 

242 "textureClass", 

243 "peatClass", 

244 "subsoilPeat", 

245 "lowerBoundaryPeat", 

246 "subsoilDuinVagueSoil", 

247 "textureProfile", 

248 "carbonateProfile", 

249 "reworkingClass", 

250 "groundwaterTableClass", 

251 "anomalousGroundwaterRegime", 

252 "specialFeatureSite", 

253 ]: 

254 d[key] = child.text 

255 elif key == "specialFeatureBottom": 

256 sfb = {} 

257 for grandchild in child: 

258 key = self._get_tag(grandchild) 

259 if key == "specialFeature": 

260 sfb[key] = grandchild.text 

261 elif key == "beginDepth": 

262 sfb[key] = float(grandchild.text) 

263 else: 

264 self._warn_unknown_tag(key) 

265 if "specialFeatureBottom" in d: 

266 d["specialFeatureBottom"].append(sfb) 

267 else: 

268 d["specialFeatureBottom"] = [sfb] 

269 else: 

270 self._warn_unknown_tag(key) 

271 return d 

272 

273 def _read_soil_profile(self, node): 

274 for child in node: 

275 key = self._get_tag(child) 

276 if key in [ 

277 "descriptionQuality", 

278 "rootableDepthReached", 

279 "meanHighestGroundwaterLevelReached", 

280 "horizonRepetition", 

281 "upperBoundaryShape", 

282 "sequenceDisturbed", 

283 "compactionPresent", 

284 ]: 

285 setattr(self, key, child.text) 

286 elif key in [ 

287 "rootableDepth", 

288 "meanHighestGroundwaterLevel", 

289 "meanLowestGroundwaterLevel", 

290 ]: 

291 setattr(self, key, float(child.text)) 

292 elif key in ["localPhenomenon"]: 

293 if hasattr(self, key): 

294 getattr(self, key).append(child.text) 

295 else: 

296 setattr(self, key, [child.text]) 

297 elif key == "litterLayer": 

298 for grandchild in child: 

299 key = self._get_tag(grandchild) 

300 if key == "LitterLayer": 

301 self._read_litter_layer(grandchild) 

302 else: 

303 self._warn_unknown_tag(key) 

304 elif key == "soilLayer": 

305 for grandchild in child: 

306 key = self._get_tag(grandchild) 

307 if key == "SoilLayer": 

308 self._read_soil_layer(grandchild) 

309 else: 

310 self._warn_unknown_tag(key) 

311 elif key == "disturbedInterval": 

312 d = {} 

313 self._read_children_of_children(child, d=d) 

314 if hasattr(self, "disturbedInterval"): 

315 self.disturbedInterval.append(d) 

316 else: 

317 self.disturbedInterval = [d] 

318 elif key == "compactedInterval": 

319 d = {} 

320 self._read_children_of_children(child, d=d) 

321 self.compactedInterval = d 

322 else: 

323 self._warn_unknown_tag(key) 

324 

325 def _read_litter_layer(self, node): 

326 layer = {} 

327 for child in node: 

328 key = self._get_tag(child) 

329 if key in [ 

330 "upperBoundary", 

331 "lowerBoundary", 

332 ]: 

333 layer[key] = float(child.text) 

334 elif key in [ 

335 "upperBoundaryDetermination", 

336 "lowerBoundaryDetermination", 

337 "lowerBoundaryShape", 

338 "layerDiscontinuous", 

339 "horizonCode", 

340 "litterType", 

341 "estimatedOrganicMatterContent", 

342 ]: 

343 layer[key] = child.text 

344 else: 

345 self._warn_unknown_tag(key) 

346 if hasattr(self, "litterLayer"): 

347 self.litterLayer.append(layer) 

348 else: 

349 self.litterLayer = [layer] 

350 

351 def _read_soil_layer(self, node): 

352 layer = {} 

353 for child in node: 

354 key = self._get_tag(child) 

355 if key in [ 

356 "upperBoundary", 

357 "lowerBoundary", 

358 "particleDensity", 

359 "fieldCapacity", 

360 "wiltingPoint", 

361 "saturationWaterContent", 

362 "hydraulicConductivity", 

363 "bulkDensity", 

364 "stoneContent", 

365 "clayFraction", 

366 "siltFraction", 

367 "sandFraction", 

368 ]: 

369 layer[key] = float(child.text) 

370 elif key in [ 

371 "upperBoundaryDetermination", 

372 "lowerBoundaryDetermination", 

373 "lowerBoundaryShape", 

374 "layerDiscontinuous", 

375 "anthropogenic", 

376 "mixed", 

377 "inverted", 

378 "rooted", 

379 "rootsEvenlyDistributed", 

380 "rootAbundanceClass", 

381 "slant", 

382 ]: 

383 layer[key] = child.text 

384 elif key == "soilLife": 

385 if key in layer: 

386 layer[key].append(child.text) 

387 else: 

388 layer[key] = [child.text] 

389 elif key == "homogeneousMaterial": 

390 for grandchild in child: 

391 key = self._get_tag(grandchild) 

392 if key in [ 

393 "specialMaterial", 

394 "horizonCode", 

395 "rockType", 

396 "depositionalCharacteristic", 

397 ]: 

398 layer[key] = grandchild.text 

399 elif key in ["estimatedSaturatedPermeability"]: 

400 layer[key] = float(grandchild.text) 

401 elif key == "soil": 

402 self._read_soil(grandchild, layer) 

403 elif key == "rock": 

404 self._read_rock(grandchild, layer) 

405 else: 

406 self._warn_unknown_tag(key) 

407 elif key == "layerComponent": 

408 self._read_layer_component(node, layer, layer=layer) 

409 else: 

410 self._warn_unknown_tag(key) 

411 

412 if "layerComponent" in layer: 

413 layer["layerComponent"] = pd.DataFrame(layer["layerComponent"]) 

414 if hasattr(self, "soilLayer"): 

415 self.soilLayer.append(layer) 

416 else: 

417 self.soilLayer = [layer] 

418 

419 def _read_layer_component(self, node, d=None): 

420 component = {} 

421 self._read_children_of_children(node, d=component) 

422 if "layerComponents" not in d: 

423 d["layerComponents"] = [] 

424 d["layerComponents"].append(component) 

425 

426 

427cl = SoilFaceResearch 

428 

429get_bro_ids_of_bronhouder = partial(bro._get_bro_ids_of_bronhouder, cl) 

430get_bro_ids_of_bronhouder.__doc__ = bro._get_bro_ids_of_bronhouder.__doc__ 

431 

432get_data_for_bro_ids = partial(bro._get_data_for_bro_ids, cl) 

433get_data_for_bro_ids.__doc__ = bro._get_data_for_bro_ids.__doc__ 

434 

435get_characteristics = partial(bro._get_characteristics, cl) 

436get_characteristics.__doc__ = bro._get_characteristics.__doc__ 

437 

438get_data_in_extent = partial(bro._get_data_in_extent, cl) 

439get_data_in_extent.__doc__ = bro._get_data_in_extent.__doc__