Coverage for brodata / util.py: 62%

203 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-13 12:57 +0000

1import logging 

2import os 

3import threading 

4import time 

5from collections import deque 

6from urllib.parse import urlparse 

7from zipfile import ZipFile, ZIP_DEFLATED, ZIP_STORED 

8 

9import numpy as np 

10import requests 

11 

12logger = logging.getLogger(__name__) 

13 

try:
    from tqdm import tqdm
except ImportError:

    def tqdm(iterable=None, **kwargs):
        """Stand-in for `tqdm.tqdm` that is used when tqdm is not installed.

        Accepts the same call signature but shows no progress bar: the iterable is
        handed back untouched, and an empty list is returned when no iterable is
        given. All keyword arguments are ignored.
        """
        if iterable is None:
            return []
        return iterable

20 

21 

22class _SlidingWindowRateLimiter: 

23 def __init__(self, max_requests, period_seconds=1.0): 

24 self.max_requests = max_requests 

25 self.period_seconds = period_seconds 

26 self._timestamps = deque() 

27 self._lock = threading.Lock() 

28 

29 def wait_for_slot(self): 

30 while True: 

31 with self._lock: 

32 now = time.monotonic() 

33 window_start = now - self.period_seconds 

34 while self._timestamps and self._timestamps[0] <= window_start: 

35 self._timestamps.popleft() 

36 

37 if len(self._timestamps) < self.max_requests: 

38 self._timestamps.append(now) 

39 return 

40 

41 wait_seconds = self.period_seconds - (now - self._timestamps[0]) 

42 

43 if wait_seconds > 0: 

44 time.sleep(wait_seconds) 

45 

46 

# Host of the public BRO REST services; URLs on other hosts are not throttled.
_BRO_HOST = "publiek.broservices.nl"

# Limiter for BRO endpoints in general: 5 requests per 1-second window.
_BRO_RATE_LIMITER = _SlidingWindowRateLimiter(max_requests=5, period_seconds=1.0)

# Limiter for GLD endpoints (paths containing "/gm/gld/"): 3 requests per
# 1-second window.
_GLD_RATE_LIMITER = _SlidingWindowRateLimiter(max_requests=3, period_seconds=1.0)

50 

51 

52def _get_rate_limiter_for_url(url): 

53 """Return the matching BRO rate limiter for a URL. 

54 

55 Official limits reference: 

56 https://basisregistratieondergrond.nl/actueel/nieuws/nieuws/2024/december/opvraaglimieten-publieke-rest-services/ 

57 """ 

58 try: 

59 parsed = urlparse(url) 

60 except Exception: 

61 return None 

62 

63 if parsed.netloc.lower() != _BRO_HOST: 

64 return None 

65 

66 path = parsed.path.lower() 

67 if "/gm/gld/" in path: 

68 return _GLD_RATE_LIMITER 

69 return _BRO_RATE_LIMITER 

70 

71 

def wait_for_rate_limit(url):
    """Block until a request to `url` is allowed by the BRO rate limits.

    URLs for which no rate limiter applies return immediately.

    Official limits reference:
    https://basisregistratieondergrond.nl/actueel/nieuws/nieuws/2024/december/opvraaglimieten-publieke-rest-services/

    Parameters
    ----------
    url : str
        The URL that is about to be requested.
    """
    rate_limiter = _get_rate_limiter_for_url(url)
    if rate_limiter is None:
        # no limiter applies to this URL: nothing to wait for
        return
    rate_limiter.wait_for_slot()

81 

82 

def request_with_rate_limit(method, url, **kwargs):
    """Send an HTTP request, first waiting for a BRO rate-limit slot if one applies.

    Official limits reference:
    https://basisregistratieondergrond.nl/actueel/nieuws/nieuws/2024/december/opvraaglimieten-publieke-rest-services/

    Parameters
    ----------
    method : str
        HTTP method, passed on to `requests.request`.
    url : str
        The URL to request.
    **kwargs
        Additional keyword arguments, passed on to `requests.request`.

    Returns
    -------
    The return value of `requests.request`.
    """
    # claim a slot before the request is sent
    wait_for_rate_limit(url)
    response = requests.request(method, url, **kwargs)
    return response

91 

92 

def get_with_rate_limit(url, **kwargs):
    """Send a GET request through `request_with_rate_limit`.

    Official limits reference:
    https://basisregistratieondergrond.nl/actueel/nieuws/nieuws/2024/december/opvraaglimieten-publieke-rest-services/

    Parameters
    ----------
    url : str
        The URL to request.
    **kwargs
        Additional keyword arguments, passed on to `request_with_rate_limit`.

    Returns
    -------
    The return value of `request_with_rate_limit`.
    """
    response = request_with_rate_limit("GET", url, **kwargs)
    return response

100 

101 

def post_with_rate_limit(url, **kwargs):
    """Send a POST request through `request_with_rate_limit`.

    Official limits reference:
    https://basisregistratieondergrond.nl/actueel/nieuws/nieuws/2024/december/opvraaglimieten-publieke-rest-services/

    Parameters
    ----------
    url : str
        The URL to request.
    **kwargs
        Additional keyword arguments, passed on to `request_with_rate_limit`.

    Returns
    -------
    The return value of `request_with_rate_limit`.
    """
    response = request_with_rate_limit("POST", url, **kwargs)
    return response

109 

110 

def _bro_class_for_file(name, pathname, file):
    """Return the class that parses a BRO xml-file, or None (with a warning) if unknown."""
    if name.startswith("BHR"):
        # all borehole-research objects share the BHR prefix, the folder tells them apart
        if pathname == "BRO_GeotechnischBooronderzoek":
            from .bhr import GeotechnicalBoreholeResearch as cl
        elif pathname == "BRO_GeologischBooronderzoek":
            from .bhr import GeologicalBoreholeResearch as cl
        elif pathname in ("BRO_BodemkundigBooronderzoek", "BodemkundigBooronderzoek"):
            # NOTE(review): the original only compared against the name without the
            # "BRO_" prefix, which can never match a folder that starts with "BRO".
            # The prefixed name follows the pattern of the two folders above; confirm
            # against an actual BROloket download.
            from .bhr import PedologicalBoreholeResearch as cl
        else:
            logger.warning(f"Unknown BHR-type: {pathname}")
            return None
    elif name.startswith("CPT"):
        from .cpt import ConePenetrationTest as cl
    elif name.startswith("EPC"):
        from .epc import ExplorationProductionConstruction as cl
    elif name.startswith("FRD"):
        from .frd import FormationResistanceDossier as cl
    elif name.startswith("GAR"):
        from .gar import GroundwaterAnalysisReport as cl
    elif name.startswith("GLD"):
        from .gld import GroundwaterLevelDossier as cl
    elif name.startswith("GMN"):
        from .gmn import GroundwaterMonitoringNetwork as cl
    elif name.startswith("GMW"):
        from .gmw import GroundwaterMonitoringWell as cl
    elif name.startswith("GPD"):
        from .gpd import GroundwaterProductionDossier as cl
    elif name.startswith("GUF"):
        from .guf import GroundwaterUtilisationFacility as cl
    elif name.startswith("SAD"):
        from .sad import SiteAssessmentData as cl
    elif name.startswith("SFR"):
        from .sfr import SoilFaceResearch as cl
    else:
        logger.warning(f"Unknown file-type: {file}")
        return None
    return cl


def _dino_reader_for_folder(pathname):
    """Return (class, file extension) for a supported DINO folder, otherwise None."""
    if pathname == "DINO_GeologischBooronderzoekBoormonsterprofiel":
        from .dino import GeologischBooronderzoek as cl

        return cl, ".csv"
    if pathname == "DINO_GeotechnischSondeeronderzoek":
        # tif-images are opened with PIL by the caller, so no class is needed
        return None, ".tif"
    if pathname == "DINO_Grondwatersamenstelling":
        from .dino import Grondwatersamenstelling as cl

        return cl, ".csv"
    if pathname == "DINO_Grondwaterstanden":
        from .dino import Grondwaterstand as cl

        return cl, ".csv"
    if pathname in (
        "DINO_VerticaalElektrischSondeeronderzoek",
        "DINO_GeoElectrischOnderzoek",
    ):
        from .dino import VerticaalElektrischSondeeronderzoek as cl

        return cl, ".csv"
    # this includes DINO_GeologischBooronderzoekKorrelgrootteAnalyse and
    # DINO_GeologischBooronderzoekChemischeAnalyse, which are not supported yet
    return None


def read_zipfile(fname, pathnames=None, use_bro_abbreviation=False, override_ext=None):
    """
    Read and parse files from a ZIP archive downloaded from BROloket.

    Parameters
    ----------
    fname : str
        Path to the ZIP file to read.
    pathnames : list of str or str, optional
        Folder name(s) within the ZIP archive to process. A single string is treated
        as one folder name. If None, all non-root folders are processed.
    use_bro_abbreviation: bool, optional
        If True, use the abbreviation of bro-objects (e.g. GMW, GLD, BHR) to store the
        data in the root of the returned dictionary. If False, use the first level of
        the folder structure in the zip-file to store the returned objects (e.g.
        BRO_Grondwatermonitoring, BRO_GeologischBooronderzoek). The default is False.
    override_ext : str, optional
        Removed argument from `read_zipfile`

    Returns
    -------
    dict
        Nested dictionary where the first-level keys are data-categories, and the
        second-level keys are file base names (bro-id or nitg-nr).
        The values are either parsed objects (from corresponding classes) or file
        objects (e.g., PIL.Image for .tif files).

    Raises
    ------
    Exception
        If `override_ext` is not None.

    Notes
    -----
    - For .tif files, PIL.Image objects are returned.
    - For other supported types, the corresponding class is instantiated with the file
      and the ZipFile object.
    - Files in the root of the archive, files with an unexpected extension and files
      in unknown folders are skipped. Unknown folders and file-types are reported
      with a warning.
    """
    if override_ext is not None:
        raise Exception("The parameter `override_ext` is removed from `read_zipfile`")

    if isinstance(pathnames, str):
        # a single folder name: wrap it, so the filter below does not match substrings
        pathnames = [pathnames]

    data = {}
    with ZipFile(fname) as zf:
        for file in zf.namelist():
            name, ext = os.path.splitext(os.path.basename(file))
            if name == "":
                # this is a directory
                continue
            pathname = os.path.dirname(file)
            if pathname == "":
                # skip file in the root path (usually the file 'locatie_levering.kml')
                continue
            if pathnames is not None and pathname not in pathnames:
                continue

            if pathname.startswith("BRO"):
                if ext != ".xml":
                    logger.info(f"Skipping file: {file}")
                    continue
                if use_bro_abbreviation:
                    key = name[:3]
                else:
                    key = os.path.normpath(pathname).split(os.sep)[0]
                cl = _bro_class_for_file(name, pathname, file)
                if cl is None:
                    # unknown type (already reported): skip, instead of continuing
                    # with an undefined class or one left over from a previous file
                    continue
            elif pathname.startswith("DINO"):
                key = pathname
                reader = _dino_reader_for_folder(pathname)
                if reader is None:
                    logger.warning(f"Folder {pathname} not supported yet")
                    continue
                cl, expected_ext = reader
                if ext != expected_ext:
                    logger.info(f"Skipping file: {file}")
                    continue
            else:
                # neither a BRO- nor a DINO-folder: there is no reader to select
                logger.warning(f"Folder {pathname} not supported yet")
                continue

            if key not in data:
                data[key] = {}
            logger.info(f"Reading {file} from {fname}")
            if ext == ".tif":
                from PIL import Image

                data[key][name] = Image.open(zf.open(file))
            else:
                data[key][name] = cl(file, zipfile=zf)
    return data

259 

260 

261def _get_to_file(fname, zipfile, to_path, _files): 

262 to_file = None 

263 if zipfile is not None or to_path is not None: 

264 to_file = fname 

265 if zipfile is None: 

266 to_file = os.path.join(to_path, to_file) 

267 if _files is not None: 

268 _files.append(to_file) 

269 return to_file 

270 

271 

272def _save_data_to_zip(to_zip, files, remove_path_again, to_path): 

273 try: 

274 import zlib 

275 

276 compression = ZIP_DEFLATED 

277 except ImportError: 

278 logger.warning("Could not import zlib, saving zipfile without compression") 

279 compression = ZIP_STORED 

280 with ZipFile(to_zip, "w", compression=compression) as zf: 

281 for file in files: 

282 zf.write(file, os.path.split(file)[1]) 

283 if remove_path_again: 

284 # remove individual files again 

285 for file in files: 

286 os.remove(file) 

287 os.removedirs(to_path) 

288 

289 

290def _format_repr(self, props): 

291 # format these properties into a string 

292 props_str = "" 

293 for key in props: 

294 value = props[key] 

295 props_str = f"{props_str}{key}={value.__repr__()}, " 

296 if len(props_str) > 1: 

297 props_str = props_str[:-2] 

298 # generate name 

299 name = f"{self.__class__.__name__}({props_str})" 

300 return name 

301 

302 

303def _get_tag(node): 

304 return node.tag.split("}", 1)[1] 

305 

306 

307def _warn_unknown_tag(tag, parent=None, class_name=None, bro_id=None): 

308 msg = f"Tag {tag} " 

309 if parent is not None: 

310 msg = f"{msg} of parent {parent} " 

311 msg = f"{msg} not supported" 

312 if class_name is not None: 

313 msg = f"{msg} in {class_name}" 

314 if bro_id is not None: 

315 if class_name is None: 

316 msg = f"{msg} in" 

317 msg = f"{msg} {bro_id}" 

318 

319 logger.warning(msg)