Coverage for brodata / gld.py: 72%
265 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-20 14:37 +0000
1import csv
2import logging
3import time
4from functools import partial
5from io import StringIO
7import numpy as np
8import pandas as pd
9import requests
11from . import bro
13logger = logging.getLogger(__name__)
def get_objects_as_csv(
    bro_id,
    rapportagetype="volledig",
    observatietype=None,
    to_file=None,
    return_contents=True,
    **kwargs,
):
    """
    Download a Groundwater Level Dossier (GLD) as CSV (RFC 4180) for a BRO-ID.

    The dossier is requested from the BRO REST API and optionally filtered by
    report type and observation type before being parsed into a DataFrame.

    Parameters
    ----------
    bro_id : str
        BRO-ID of the Groundwater Level Dossier. A full url is also accepted
        (as used by the gm-services); in that case `rapportagetype` must match
        the choice encoded in the url and `observatietype` is ignored.
    rapportagetype : str, optional
        Report type. One of:
        - "volledig" : full report (default)
        - "compact" : compact report with readable timestamps
        - "compact_met_timestamps" : compact report with Unix epoch timestamps
    observatietype : str, optional
        Observation type. One of:
        - "regulier_beoordeeld" : regular measurement, fully evaluated
        - "regulier_voorlopig" : regular measurement, preliminary evaluation
        - "controle" : control measurement
        - "onbekend" : regular measurement, unknown evaluation
        When None (the default) all observation types are returned.
    to_file : str, optional
        When given, the raw CSV text is written to this path. Default None.
    return_contents : bool, optional
        When False, return None after the optional write to `to_file` instead
        of parsing the CSV. Default True.
    **kwargs : additional keyword arguments
        Passed on to `read_gld_csv`.

    Returns
    -------
    pd.DataFrame or None
        Parsed CSV data, or None when `return_contents` is False or the
        response body is empty.
    """
    if bro_id.startswith("http"):
        # a full url was supplied (gm-services): query it as-is
        response = requests.get(bro_id)
    else:
        query = {
            "rapportagetype": rapportagetype,
        }
        if observatietype is not None:
            query["observatietype"] = observatietype
        response = requests.get(
            f"{GroundwaterLevelDossier._rest_url}/objectsAsCsv/{bro_id}",
            params=query,
        )
    response = _check_request_status(response)
    if to_file is not None:
        with open(to_file, "w") as f:
            f.write(response.text)
    if not return_contents:
        return
    if response.text == "":
        return None
    return read_gld_csv(
        StringIO(response.text),
        bro_id,
        rapportagetype=rapportagetype,
        observatietype=observatietype,
        **kwargs,
    )
105def _check_request_status(req):
106 if req.status_code == 429:
107 msg = "Too many requests. The BRO API has rate limits in place."
108 logger.warning(msg)
109 # try 3 times with increasing wait time
110 wait_times = [1, 2, 4]
111 for wait_time in wait_times:
112 logger.warning(f"Waiting for {wait_time} seconds before retrying...")
113 time.sleep(wait_time)
114 req = requests.get(req.url)
115 if req.status_code <= 200:
116 break
117 if req.status_code == 429:
118 raise Exception(msg + " Please try again later.")
119 if req.status_code > 200:
120 json_data = req.json()
121 if "errors" in json_data:
122 msg = json_data["errors"][0]["message"]
123 else:
124 msg = "{}: {}".format(json_data["title"], json_data["description"])
125 raise Exception(msg)
126 return req
def get_series_as_csv(
    bro_id, filter_on_status_quality_control=None, asISO8601=False, to_file=None
):
    """
    Retrieve a groundwater level series as CSV: timestamps plus measurements.

    The API returns a table with one column per observation type
    (regulier_voorlopig, regulier_beoordeeld, controle en onbekend), intended
    for applications such as graphical visualization of groundwater levels.

    Parameters
    ----------
    bro_id : str
        BRO-ID of the Groundwater Level Dossier.
    filter_on_status_quality_control : str or list of str, optional
        One or more quality control statuses ('onbeslist', 'goedgekeurd',
        'afgekeurd') to filter the measurements by. Default None.
    asISO8601 : bool, optional
        When True, timestamps are returned in ISO8601 format instead of Unix
        epoch milliseconds. Default False.
    to_file : str, optional
        When given, the raw CSV text is written to this path. Default None.

    Returns
    -------
    pd.DataFrame or None
        Measurement series indexed by timestamp, or None when the response
        body is empty.
    """
    params = {}
    if filter_on_status_quality_control is not None:
        statuses = filter_on_status_quality_control
        if not isinstance(statuses, str):
            # the API expects a single comma-separated string
            statuses = ",".join(statuses)
        params["filterOnStatusQualityControl"] = statuses
    if asISO8601:
        # the mere presence of this query parameter enables ISO8601 output
        params["asISO8601"] = ""
    response = requests.get(
        f"{GroundwaterLevelDossier._rest_url}/seriesAsCsv/{bro_id}", params=params
    )
    response = _check_request_status(response)
    if to_file is not None:
        with open(to_file, "w") as f:
            f.write(response.text)
    if response.text == "":
        return None
    series = pd.read_csv(StringIO(response.text))
    if "Tijdstip" in series.columns:
        if asISO8601:
            series["Tijdstip"] = pd.to_datetime(series["Tijdstip"])
        else:
            series["Tijdstip"] = pd.to_datetime(series["Tijdstip"], unit="ms")
        series = series.set_index("Tijdstip")
    return series
def read_gld_csv(fname, bro_id, rapportagetype, observatietype, **kwargs):
    """
    Read and process a Groundwater Level Dossier (GLD) CSV file.

    This function reads a CSV file containing groundwater level observations,
    processes the data according to the specified report type (`rapportagetype`),
    and returns a DataFrame of the observations. The file is assumed to contain
    at least three columns: time, value, and qualifier. The 'time' column is parsed
    as datetime, and additional processing is applied to the data.

    Parameters
    ----------
    fname : str or StringIO
        The path to (or in-memory buffer of) the CSV file containing the
        groundwater level observations.
    bro_id : str
        The BRO-ID of the Groundwater Level Dossier being processed.
    rapportagetype : str
        The report type. Can be one of:
        - 'volledig': as complete as possible (not supported yet)
        - 'compact': simple format with time and value.
        - 'compact_met_timestamps': format with timestamps for each observation.
    observatietype : str or None
        The observation type the CSV was requested with. When None (or when
        `rapportagetype` is 'volledig') the file is assumed to contain multiple
        observation-type sections separated by header lines; otherwise a single
        section is read and its status/observation_type columns are derived
        from this value.
    **kwargs : additional keyword arguments
        Additional arguments passed to the `process_observations` function.

    Returns
    -------
    pd.DataFrame
        A DataFrame containing the processed observations with the following columns:
        - time: The observation time.
        - value: The observed groundwater level.
        - qualifier: The quality code of the observation.
        - status: Evaluation status derived from the section header or
          `observatietype`.
        - observation_type: Observation type derived from the section header or
          `observatietype`.

    Notes
    -----
    The time column is parsed as a datetime index. If the report type is
    'compact_met_timestamps', the time values are converted from Unix epoch time
    (milliseconds) to a datetime format.
    """
    # column names for the measurement rows; only the first three are read
    names = [
        "time",
        "value",
        "qualifier",
        "censored_reason",
        "censoring_limitvalue",
        "interpolation_type",
    ]
    if rapportagetype == "compact":
        parse_dates = ["time"]
    else:
        # 'compact_met_timestamps' carries epoch ms (converted below);
        # 'volledig' times are parsed later as well
        parse_dates = None
    if observatietype is None or rapportagetype == "volledig":
        # the csv contains multiple observation types, separated by a header with
        # observation-type and status.
        if isinstance(fname, StringIO):
            lines = fname.readlines()
        else:
            with open(fname, "r") as f:
                lines = f.readlines()

        # look for header lines
        headers = []
        if rapportagetype == "volledig":
            # the line with metadata is preceded by a line starting with "observatie ID"
            for i, line in enumerate(lines):
                if line.startswith('"observatie ID",'):
                    headers.append(i + 1)
            header_length = 3
        else:
            # the line with metadata is preceded by an empty line
            # but directly after the header, there can also be empty lines, that we skip
            data_lines = False
            for i, line in enumerate(lines):
                # NOTE: all() over an empty string is True, so a fully blank
                # line also counts as "only commas"
                only_commas = all(c == "," for c in line.rstrip("\r\n"))
                last_line_was_header = len(headers) > 0 and headers[-1] == i - 1

                if only_commas:
                    if last_line_was_header:
                        data_lines = True
                    else:
                        data_lines = False
                else:
                    if not data_lines:
                        headers.append(i)
            header_length = 2

        dfs = []
        for i, header in enumerate(headers):
            line = lines[header]
            # split string by comma, but ignore commas between quotes
            reader = csv.reader(StringIO(line))
            parts = next(reader)
            # assumes columns 3 and 4 of the header hold the observation type
            # and evaluation status — TODO confirm against the API's CSV layout
            observation_type = parts[3]
            status = parts[4]

            if i < len(headers) - 1:
                # section runs up to (but not including) the line before the
                # next header (that line is the separator)
                current_lines = lines[header + header_length : headers[i + 1] - 1]
            else:
                current_lines = lines[header + header_length :]
            df = pd.read_csv(
                StringIO("".join(current_lines)),
                names=names,
                index_col="time",
                parse_dates=parse_dates,
                usecols=[0, 1, 2],
            )
            # remove empty indices
            mask = df.index.isna() & df.isna().all(axis=1)
            if mask.any():
                df = df[~mask]
            df["status"] = status
            df["observation_type"] = observation_type
            dfs.append(df)
        if len(dfs) > 0:
            df = pd.concat(dfs)
        else:
            df = _get_empty_observation_df()
    else:
        # single observation type: read the whole file as one table
        df = pd.read_csv(
            fname,
            names=names,
            index_col="time",
            parse_dates=parse_dates,
            usecols=[0, 1, 2],
        )
        # map the requested observatietype onto the BRO status/type vocabulary
        if observatietype == "regulier_beoordeeld":
            df["status"] = "volledigBeoordeeld"
            df["observation_type"] = "reguliereMeting"
        elif observatietype == "regulier_voorlopig":
            df["status"] = "voorlopig"
            df["observation_type"] = "reguliereMeting"
        elif observatietype == "controle":
            # control measurements have no evaluation status
            df["status"] = np.nan
            df["observation_type"] = "controleMeting"
        elif observatietype == "onbekend":
            df["status"] = "onbekend"
            df["observation_type"] = "reguliereMeting"
    if rapportagetype == "compact_met_timestamps":
        # times are Unix epoch milliseconds
        df.index = pd.to_datetime(df.index, unit="ms")
    # remove empty indices
    mask = df.index.isna() & df.isna().all(axis=1)
    if mask.any():
        df = df[~mask]
    df = process_observations(df, bro_id, **kwargs)
    return df
def get_observations_summary(bro_id):
    """
    Fetch a JSON summary of a Groundwater Level Dossier (GLD) for a BRO-ID.

    The summary describes the groundwater level observations in the dossier,
    such as observation ID and start and end dates.

    Parameters
    ----------
    bro_id : str
        BRO-ID of the Groundwater Level Dossier to summarize.

    Returns
    -------
    pd.DataFrame
        Summary of the observations, indexed by `observationId` when that
        column is present; `startDate` and `endDate` are converted to datetime.

    Raises
    ------
    Exception
        When the request to the API fails (status code greater than 200), with
        the error message returned by the API.
    """
    url = "{}/objects/{}/observationsSummary".format(
        GroundwaterLevelDossier._rest_url, bro_id
    )
    response = _check_request_status(requests.get(url))
    summary = pd.DataFrame(response.json())
    if "observationId" in summary.columns:
        summary = summary.set_index("observationId")
    # dates are formatted day-first by the API
    for column in ("startDate", "endDate"):
        if column in summary.columns:
            summary[column] = pd.to_datetime(summary[column], dayfirst=True)
    return summary
class GroundwaterLevelDossier(bro.FileOrUrl):
    """
    Class to represent a Groundwater Level Dossier (GLD) from the BRO.

    Attributes
    ----------
    observation : pd.DataFrame
        DataFrame containing groundwater level observations with time and value
        columns. The data is processed and filtered based on the provided arguments.
    tubeNumber : int
        The tube number associated with the observation.
    groundwaterMonitoringWell : str
        The BRO-ID of the groundwater monitoring well.
    """

    # base url of the public BRO REST service for groundwater level dossiers
    _rest_url = "https://publiek.broservices.nl/gm/gld/v1"

    def _read_contents(self, tree, status=None, observation_type=None, **kwargs):
        """
        Parse data to populate the Groundwater Level Dossier attributes.

        This method reads and processes the XML contents, extracting relevant
        groundwater monitoring information such as the groundwater monitoring well,
        tube number, and observations. It also processes the observations into a
        DataFrame, which is filtered and transformed based on the provided arguments.

        Parameters
        ----------
        tree : xml.etree.ElementTree
            The XML tree to parse and extract data from.
        status : str, optional
            When given, only observations whose metadata status matches this
            value are kept. Default is None (keep all).
        observation_type : str, optional
            When given, only observations of this type are kept. Default is
            None (keep all).
        **kwargs : keyword arguments
            Additional parameters passed to the `process_observations` function to
            filter and transform the observations.

        Raises
        ------
        Exception
            If more than one or no GLD element is found in the XML tree.

        Notes
        -----
        The method expects the XML structure to adhere to the specified namespaces
        and element tags. It processes observation values, timestamps, and qualifiers
        into a pandas DataFrame.

        The observation data is stored in the `observation` attribute and can be
        accessed as a DataFrame.
        """
        # XML namespaces used throughout the dossier document
        ns = {
            "xmlns": "http://www.broservices.nl/xsd/dsgld/1.0",
            "gldcommon": "http://www.broservices.nl/xsd/gldcommon/1.0",
            "waterml": "http://www.opengis.net/waterml/2.0",
            "swe": "http://www.opengis.net/swe/2.0",
            "om": "http://www.opengis.net/om/2.0",
            "xlink": "http://www.w3.org/1999/xlink",
        }
        gld = self._get_main_object(tree, "GLD_O", ns)
        # copy the element's attributes, stripping the namespace prefix
        for key in gld.attrib:
            setattr(self, key.split("}", 1)[1], gld.attrib[key])
        for child in gld:
            key = self._get_tag(child)
            if len(child) == 0:
                # leaf element: store its text directly under the tag name
                setattr(self, key, child.text)
            elif key == "monitoringPoint":
                well = child.find("gldcommon:GroundwaterMonitoringTube", ns)
                gmw_id = well.find("gldcommon:broId", ns).text
                setattr(self, "groundwaterMonitoringWell", gmw_id)
                tube_nr = int(well.find("gldcommon:tubeNumber", ns).text)
                setattr(self, "tubeNumber", tube_nr)
            elif key in ["registrationHistory"]:
                self._read_children_of_children(child)
            elif key == "groundwaterMonitoringNet":
                for grandchild in child:
                    key2 = grandchild.tag.split("}", 1)[1]
                    if key2 == "GroundwaterMonitoringNet":
                        # NOTE(review): only the first grandchild's text is
                        # stored; repeated nets overwrite earlier ones
                        setattr(self, key, grandchild[0].text)
                    else:
                        logger.warning(f"Unknown key: {key2}")
            elif key == "observation":
                # get observation_metadata
                om_observation = child.find("om:OM_Observation", ns)
                if om_observation is None:
                    continue
                metadata = om_observation.find("om:metadata", ns)
                observation_metadata = metadata.find("waterml:ObservationMetadata", ns)

                # get status: the last part of the xlink:href urn (e.g.
                # "...:status:voorlopig" -> "voorlopig")
                water_ml_status = observation_metadata.find("waterml:status", ns)
                if water_ml_status is None:
                    status_value = None
                else:
                    status_value = water_ml_status.attrib[
                        f"{{{ns['xlink']}}}href"
                    ].rsplit(":", 1)[-1]
                if status is not None and status != status_value:
                    continue

                # get observation_type
                parameter = observation_metadata.find("waterml:parameter", ns)
                named_value = parameter.find("om:NamedValue", ns)
                name = named_value.find("om:name", ns)
                # sanity check: the named value must be the observation type
                assert (
                    name.attrib[f"{{{ns['xlink']}}}href"]
                    == "urn:bro:gld:ObservationMetadata:observationType"
                )
                value = named_value.find("om:value", ns)
                observation_type_value = value.text
                if (
                    observation_type is not None
                    and observation_type != observation_type_value
                ):
                    continue

                # collect the time/value/qualifier triplets of this observation
                times = []
                values = []
                qualifiers = []
                for measurement in child.findall(".//waterml:MeasurementTVP", ns):
                    times.append(measurement.find("waterml:time", ns).text)
                    value = measurement.find("waterml:value", ns).text
                    if value is None:
                        # missing measurement value becomes NaN
                        values.append(np.nan)
                    else:
                        values.append(float(value))
                    metadata = measurement.find("waterml:metadata", ns)
                    TVPMM = metadata.find("waterml:TVPMeasurementMetadata", ns)
                    qualifier = TVPMM.find("waterml:qualifier", ns)
                    value = qualifier.find("swe:Category", ns).find("swe:value", ns)
                    qualifiers.append(value.text)
                observation = pd.DataFrame(
                    {
                        "time": times,
                        "value": values,
                        "qualifier": qualifiers,
                        "status": status_value,
                        "observation_type": observation_type_value,
                    }
                ).set_index("time")

                # accumulate per-observation frames; concatenated below
                if not hasattr(self, key):
                    self.observation = []
                self.observation.append(observation)
            else:
                self._warn_unknown_tag(key)
        if hasattr(self, "observation"):
            self.observation = pd.concat(self.observation)
            self.observation = process_observations(
                self.observation, self.broId, **kwargs
            )
        else:
            # no observation elements found (or all filtered out)
            self.observation = _get_empty_observation_df()
537def process_observations(
538 df,
539 bro_id="gld",
540 to_wintertime=True,
541 qualifier=None,
542 tmin=None,
543 tmax=None,
544 sort=True,
545 drop_duplicates=True,
546):
547 """
548 Process groundwater level observations.
550 This function processes a DataFrame containing groundwater level observations,
551 applying the following operations based on the provided parameters:
552 - Conversion to Dutch winter time (optional).
553 - Filtering observations based on the qualifier.
554 - Dropping duplicate observations (optional).
555 - Sorting the observations by time (optional).
557 Parameters
558 ----------
559 df : pd.DataFrame
560 DataFrame containing the groundwater level observations, with a time
561 index and columns such as "value", "qualifier", etc.
562 bro_id : str
563 The BRO-ID of the Groundwater Level Dossier being processed. Only used for
564 logging-purposes. The default is "gld".
565 to_wintertime : bool, optional
566 If True, the observation times are converted to Dutch winter time by
567 removing any time zone information and adding one hour. If to_wintertime is
568 False, observation times are kept in CET/CEST. Default is True.
569 qualifier : str or list of str, optional
570 If provided, the observations are filtered based on their "qualifier"
571 column. Only rows with the specified qualifier(s) will be kept.
572 tmin : str or datetime, optional
573 The minimum time for filtering observations. Defaults to None.
574 tmax : str or datetime, optional
575 The maximum time for filtering observations. Defaults to None.
576 sort : bool, optional
577 If True, the DataFrame will be sorted, see `sort_observations`. Default is
578 True.
579 drop_duplicates : bool, optional
580 If True, any duplicate observation times will be dropped, keeping only
581 the first occurrence. Default is True.
583 Returns
584 -------
585 pd.DataFrame
586 A DataFrame containing the processed observations, with duplicate rows
587 (if any) removed, the time index sorted, and filtered by qualifier if
588 applicable.
590 """
591 df.index = pd.to_datetime(df.index, utc=True)
592 if to_wintertime:
593 # remove time zone information by transforming to dutch winter time
594 df.index = df.index.tz_localize(None) + pd.Timedelta(1, unit="h")
595 else:
596 df.index = df.index.tz_convert("CET")
598 if qualifier is not None:
599 if isinstance(qualifier, str):
600 df = df[df["qualifier"] == qualifier]
601 else:
602 df = df[df["qualifier"].isin(qualifier)]
604 if tmin is not None:
605 df = df.loc[pd.Timestamp(tmin) :]
607 if tmax is not None:
608 df = df.loc[: pd.Timestamp(tmax)]
610 if sort:
611 df = sort_observations(df)
613 if drop_duplicates:
614 df = drop_duplicate_observations(df, bro_id=bro_id, sort=sort)
616 return df
def sort_observations(df):
    """
    Sort observations in a DataFrame by multiple criteria. Applies a multi-level sort
    to the input DataFrame, prioritizing the following criteria in order:
    1. By the DataFrame's DatetimeIndex in ascending order
    2. By status (if present): volledigBeoordeeld before voorlopig before onbekend
    3. By observation_type (if present): reguliereMeting before controleMeting

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame with optional 'observation_type' and 'status' columns,
        and a DatetimeIndex.

    Returns
    -------
    pandas.DataFrame
        Sorted DataFrame with the same structure as input.

    Notes
    -----
    The layered priority relies on each subsequent sort preserving the order
    of ties, so all sorts use a stable algorithm (mergesort). The pandas
    default (quicksort) is not stable and could scramble the intended
    status/observation_type precedence for equal keys.
    """
    if "observation_type" in df.columns:
        # make sure measurements with observation_type set to reguliereMeting are first
        sort_dict = {"reguliereMeting": 0, "controleMeting": 1}
        df = df.sort_values(
            "observation_type", key=lambda x: x.map(sort_dict), kind="mergesort"
        )

    if "status" in df.columns:
        # make sure measurements with status set to volledigBeoordeeld are first
        sort_dict = {"volledigBeoordeeld": 0, "voorlopig": 1, "onbekend": 2}
        df = df.sort_values(
            "status", key=lambda x: x.map(sort_dict), kind="mergesort"
        )

    # sort based on DatetimeIndex; stable so status/type order survives ties
    df = df.sort_index(kind="mergesort")

    return df
def drop_duplicate_observations(df, bro_id="gld", keep="first", sort=True):
    """
    Remove duplicate observations from a DataFrame based on its index.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame to process.
    bro_id : str, optional
        Identifier for the dataset, used in warning messages. Default is "gld".
    keep : {'first', 'last', False}, optional
        Which duplicates to mark:
        - 'first' : Mark duplicates as True except for the first occurrence.
        - 'last' : Mark duplicates as True except for the last occurrence.
        - False : Mark all duplicates as True.
        Default is 'first'.
    sort : bool, optional
        Whether the DataFrame was sorted for importance beforehand; only
        affects the wording of the warning message. Default is True.

    Returns
    -------
    pd.DataFrame
        DataFrame with duplicate index values removed, keeping only the rows
        specified by the `keep` parameter.

    Warnings
    --------
    Logs a warning message if duplicates are found, indicating the number and
    total count of duplicates before removal.
    """
    if df.index.has_duplicates:
        duplicates = df.index.duplicated(keep=keep)
        # report what is actually kept instead of hard-coding "first"
        kept = keep if isinstance(keep, str) else "no duplicate"
        message = "{} contains {} duplicates (of {}). Keeping only {} values"
        message = message.format(bro_id, duplicates.sum(), len(df.index), kept)
        if sort:
            message = f"{message} (sorted for importance)"
        message = f"{message}."
        logger.warning(message)
        df = df[~duplicates]
    return df
694def _get_empty_observation_df():
695 columns = ["time", "value", "qualifier", "status", "observation_type"]
696 return pd.DataFrame(columns=columns).set_index("time")
# short alias expected by the generic helpers in the bro module
cl = GroundwaterLevelDossier

# bind this module's dossier class to the shared bro helpers so they act as
# GLD-specific functions; the docstrings are copied from the generic helpers
get_bro_ids_of_bronhouder = partial(bro._get_bro_ids_of_bronhouder, cl)
get_bro_ids_of_bronhouder.__doc__ = bro._get_bro_ids_of_bronhouder.__doc__

get_data_for_bro_ids = partial(bro._get_data_for_bro_ids, cl)
get_data_for_bro_ids.__doc__ = bro._get_data_for_bro_ids.__doc__