Coverage for brodata / util.py: 62%
203 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-13 12:57 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-13 12:57 +0000
1import logging
2import os
3import threading
4import time
5from collections import deque
6from urllib.parse import urlparse
7from zipfile import ZipFile, ZIP_DEFLATED, ZIP_STORED
9import numpy as np
10import requests
12logger = logging.getLogger(__name__)
try:
    from tqdm import tqdm
except ImportError:

    def tqdm(iterable=None, **kwargs):
        """Stand-in for ``tqdm.tqdm`` used when tqdm is not installed.

        Accepts (and ignores) any keyword arguments and hands back the iterable
        unchanged, or an empty list when no iterable is given.
        """
        if iterable is None:
            return []
        return iterable
22class _SlidingWindowRateLimiter:
23 def __init__(self, max_requests, period_seconds=1.0):
24 self.max_requests = max_requests
25 self.period_seconds = period_seconds
26 self._timestamps = deque()
27 self._lock = threading.Lock()
29 def wait_for_slot(self):
30 while True:
31 with self._lock:
32 now = time.monotonic()
33 window_start = now - self.period_seconds
34 while self._timestamps and self._timestamps[0] <= window_start:
35 self._timestamps.popleft()
37 if len(self._timestamps) < self.max_requests:
38 self._timestamps.append(now)
39 return
41 wait_seconds = self.period_seconds - (now - self._timestamps[0])
43 if wait_seconds > 0:
44 time.sleep(wait_seconds)
# Host name of the public BRO REST services; only URLs on this host are rate limited.
_BRO_HOST = "publiek.broservices.nl"
# Default limiter for BRO endpoints: 5 requests per period (period defaults to 1 s).
_BRO_RATE_LIMITER = _SlidingWindowRateLimiter(max_requests=5)
# Limiter for GLD endpoints (paths containing "/gm/gld/"): 3 requests per period.
_GLD_RATE_LIMITER = _SlidingWindowRateLimiter(max_requests=3)
def _get_rate_limiter_for_url(url):
    """Return the matching BRO rate limiter for a URL.

    Official limits reference:
    https://basisregistratieondergrond.nl/actueel/nieuws/nieuws/2024/december/opvraaglimieten-publieke-rest-services/

    Parameters
    ----------
    url : str
        The URL that is about to be requested.

    Returns
    -------
    _SlidingWindowRateLimiter or None
        The GLD limiter when the path contains "/gm/gld/", the general BRO limiter
        for any other path on the BRO host, and None when the URL cannot be parsed
        or points to another host.
    """
    try:
        parsed = urlparse(url)
        # `hostname` (unlike `netloc`) excludes userinfo and port and is already
        # lower-cased, so "publiek.broservices.nl:443" is recognised as well.
        host = parsed.hostname
    except Exception:
        # best effort: an unparsable URL is simply not rate limited
        return None

    if host is None or host != _BRO_HOST:
        return None

    path = parsed.path.lower()
    if "/gm/gld/" in path:
        return _GLD_RATE_LIMITER
    return _BRO_RATE_LIMITER
def wait_for_rate_limit(url):
    """Block until a request to `url` is allowed by the BRO rate limits.

    URLs for which no rate limiter applies return immediately.

    Official limits reference:
    https://basisregistratieondergrond.nl/actueel/nieuws/nieuws/2024/december/opvraaglimieten-publieke-rest-services/
    """
    rate_limiter = _get_rate_limiter_for_url(url)
    if rate_limiter is None:
        # no limit applies to this URL
        return
    rate_limiter.wait_for_slot()
def request_with_rate_limit(method, url, **kwargs):
    """Send an HTTP request, first waiting for a BRO rate-limit slot if one applies.

    All extra keyword arguments are forwarded to `requests.request`, whose return
    value is returned unchanged.

    Official limits reference:
    https://basisregistratieondergrond.nl/actueel/nieuws/nieuws/2024/december/opvraaglimieten-publieke-rest-services/
    """
    # throttle first, then perform the actual request
    wait_for_rate_limit(url)
    response = requests.request(method, url, **kwargs)
    return response
def get_with_rate_limit(url, **kwargs):
    """Send a rate-limited GET request to `url`.

    Thin wrapper around `request_with_rate_limit`; keyword arguments are passed on.

    Official limits reference:
    https://basisregistratieondergrond.nl/actueel/nieuws/nieuws/2024/december/opvraaglimieten-publieke-rest-services/
    """
    response = request_with_rate_limit(method="GET", url=url, **kwargs)
    return response
def post_with_rate_limit(url, **kwargs):
    """Send a rate-limited POST request to `url`.

    Thin wrapper around `request_with_rate_limit`; keyword arguments are passed on.

    Official limits reference:
    https://basisregistratieondergrond.nl/actueel/nieuws/nieuws/2024/december/opvraaglimieten-publieke-rest-services/
    """
    response = request_with_rate_limit(method="POST", url=url, **kwargs)
    return response
def read_zipfile(fname, pathnames=None, use_bro_abbreviation=False, override_ext=None):
    """
    Read and parse files from a ZIP archive downloaded from BROloket.

    Parameters
    ----------
    fname : str
        Path to the ZIP file to read.
    pathnames : list of str or str, optional
        Folder name(s) within the ZIP archive to process. A single string is treated
        as one folder name. If None, all non-root folders are processed.
    use_bro_abbreviation: bool, optional
        If True, use the abbreviation of bro-objects (e.g. GMW, GLD, BHR) to store the
        data in the root of the returned dictionary. If False, use the first level of
        the folder structure in the zip-file to store the returned objects (e.g.
        BRO_Grondwatermonitoring, BRO_GeologischBooronderzoek). The default is False.
    override_ext : str, optional
        Removed argument from `read_zipfile`. Anything other than None raises.

    Returns
    -------
    dict
        Nested dictionary where the first-level keys are data-categories, and the
        second-level keys are file base names (bro-id or nitg-nr).
        The values are either parsed objects (from corresponding classes) or file
        objects (e.g., PIL.Image for .tif files).

    Raises
    ------
    Exception
        If `override_ext` is not None.

    Notes
    -----
    - For .tif files, PIL.Image objects are returned.
    - For other supported types, the corresponding class is instantiated with the file
      and the ZipFile object.
    - Files in the root of the archive, files with an unexpected extension and files
      in folders or of types that are not recognised are skipped.
    """
    if override_ext is not None:
        raise (Exception("The parameter `override_ext` is removed from `read_zipfile`"))

    if isinstance(pathnames, str):
        # a bare string would otherwise be matched by substring instead of by name
        pathnames = [pathnames]

    data = {}
    with ZipFile(fname) as zf:
        for file in zf.namelist():
            name, ext = os.path.splitext(os.path.basename(file))
            if name == "":
                # this is a directory
                continue
            pathname = os.path.dirname(file)
            if pathname == "":
                # skip file in the root path (usually the file 'locatie_levering.kml')
                continue
            if pathnames is not None and pathname not in pathnames:
                continue

            if pathname.startswith("BRO"):
                if ext != ".xml":
                    logger.info(f"Skipping file: {file}")
                    continue
                if use_bro_abbreviation:
                    key = name[:3]
                else:
                    key = os.path.normpath(pathname).split(os.sep)[0]
                if name.startswith("BHR"):
                    if pathname == "BRO_GeotechnischBooronderzoek":
                        from .bhr import GeotechnicalBoreholeResearch as cl
                    elif pathname == "BRO_GeologischBooronderzoek":
                        from .bhr import GeologicalBoreholeResearch as cl
                    elif pathname == "BRO_BodemkundigBooronderzoek":
                        # NOTE(review): the folder name previously lacked the "BRO_"
                        # prefix, which made this branch unreachable here; the
                        # prefixed name follows the sibling folders - confirm.
                        from .bhr import PedologicalBoreholeResearch as cl
                    else:
                        logger.warning(f"Unknown BHR-type: {pathname}")
                        # without a reader class there is nothing to parse
                        continue
                elif name.startswith("CPT"):
                    from .cpt import ConePenetrationTest as cl
                elif name.startswith("EPC"):
                    from .epc import ExplorationProductionConstruction as cl
                elif name.startswith("FRD"):
                    from .frd import FormationResistanceDossier as cl
                elif name.startswith("GAR"):
                    from .gar import GroundwaterAnalysisReport as cl
                elif name.startswith("GLD"):
                    from .gld import GroundwaterLevelDossier as cl
                elif name.startswith("GMN"):
                    from .gmn import GroundwaterMonitoringNetwork as cl
                elif name.startswith("GMW"):
                    from .gmw import GroundwaterMonitoringWell as cl
                elif name.startswith("GPD"):
                    from .gpd import GroundwaterProductionDossier as cl
                elif name.startswith("GUF"):
                    from .guf import GroundwaterUtilisationFacility as cl
                elif name.startswith("SAD"):
                    from .sad import SiteAssessmentData as cl
                elif name.startswith("SFR"):
                    from .sfr import SoilFaceResearch as cl
                else:
                    logger.warning(f"Unknown file-type: {file}")
                    continue

            elif pathname.startswith("DINO"):
                key = pathname
                if pathname == "DINO_GeologischBooronderzoekBoormonsterprofiel":
                    from .dino import GeologischBooronderzoek as cl

                    if ext != ".csv":
                        logger.info(f"Skipping file: {file}")
                        continue
                elif pathname == "DINO_GeotechnischSondeeronderzoek":
                    # .tif images are opened with PIL below, no reader class needed
                    cl = None
                    if ext != ".tif":
                        logger.info(f"Skipping file: {file}")
                        continue
                elif pathname == "DINO_GeologischBooronderzoekKorrelgrootteAnalyse":
                    logger.warning(f"Folder {pathname} not supported yet")
                    continue
                elif pathname == "DINO_GeologischBooronderzoekChemischeAnalyse":
                    logger.warning(f"Folder {pathname} not supported yet")
                    continue
                elif pathname == "DINO_Grondwatersamenstelling":
                    from .dino import Grondwatersamenstelling as cl

                    if ext != ".csv":
                        logger.info(f"Skipping file: {file}")
                        continue
                elif pathname == "DINO_Grondwaterstanden":
                    from .dino import Grondwaterstand as cl

                    if ext != ".csv":
                        logger.info(f"Skipping file: {file}")
                        continue
                elif pathname in [
                    "DINO_VerticaalElektrischSondeeronderzoek",
                    "DINO_GeoElectrischOnderzoek",
                ]:
                    from .dino import VerticaalElektrischSondeeronderzoek as cl

                    if ext != ".csv":
                        logger.info(f"Skipping file: {file}")
                        continue
                else:
                    logger.warning(f"Folder {pathname} not supported yet")
                    continue

            else:
                # neither a BRO nor a DINO folder: no key/reader can be determined
                logger.warning(f"Folder {pathname} not supported yet")
                continue

            if key not in data:
                data[key] = {}
            logger.info(f"Reading {file} from {fname}")
            if ext == ".tif":
                from PIL import Image

                data[key][name] = Image.open(zf.open(file))
            else:
                data[key][name] = cl(file, zipfile=zf)
    return data
def _get_to_file(fname, zipfile, to_path, _files):
    """Determine the target file name for `fname`, or None when nothing is stored.

    Parameters
    ----------
    fname : str
        Base file name.
    zipfile : object or None
        When not None, `fname` is returned unchanged.
    to_path : str or None
        Directory that is joined in front of `fname` when `zipfile` is None.
    _files : list or None
        When not None, the resulting path is appended to this list.

    Returns
    -------
    str or None
        None when both `zipfile` and `to_path` are None, otherwise the target name.
    """
    to_file = None
    if zipfile is not None or to_path is not None:
        to_file = fname
        if zipfile is None:
            to_file = os.path.join(to_path, to_file)
            # NOTE(review): nesting reconstructed from a flattened listing; the
            # append is assumed to apply only to files written below `to_path`
            # - confirm against the original source.
            if _files is not None:
                _files.append(to_file)
    return to_file
272def _save_data_to_zip(to_zip, files, remove_path_again, to_path):
273 try:
274 import zlib
276 compression = ZIP_DEFLATED
277 except ImportError:
278 logger.warning("Could not import zlib, saving zipfile without compression")
279 compression = ZIP_STORED
280 with ZipFile(to_zip, "w", compression=compression) as zf:
281 for file in files:
282 zf.write(file, os.path.split(file)[1])
283 if remove_path_again:
284 # remove individual files again
285 for file in files:
286 os.remove(file)
287 os.removedirs(to_path)
290def _format_repr(self, props):
291 # format these properties into a string
292 props_str = ""
293 for key in props:
294 value = props[key]
295 props_str = f"{props_str}{key}={value.__repr__()}, "
296 if len(props_str) > 1:
297 props_str = props_str[:-2]
298 # generate name
299 name = f"{self.__class__.__name__}({props_str})"
300 return name
303def _get_tag(node):
304 return node.tag.split("}", 1)[1]
def _warn_unknown_tag(tag, parent=None, class_name=None, bro_id=None):
    """Log a warning that an XML tag is not supported.

    Parameters
    ----------
    tag : str
        The unsupported tag.
    parent : str, optional
        Tag of the parent node; added to the message when given.
    class_name : str, optional
        Name of the class that encountered the tag; added when given.
    bro_id : str, optional
        Identifier of the object being read; added when given.
    """
    # collect the message parts and join them with single spaces, so that omitted
    # optional parts do not leave doubled spaces in the log message
    parts = [f"Tag {tag}"]
    if parent is not None:
        parts.append(f"of parent {parent}")
    parts.append("not supported")
    if class_name is not None:
        parts.append(f"in {class_name}")
    if bro_id is not None:
        if class_name is None:
            parts.append("in")
        parts.append(f"{bro_id}")

    logger.warning(" ".join(parts))