Coverage for brodata / gld.py: 72%
264 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-13 12:57 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-13 12:57 +0000
1import csv
2import logging
3import time
4from functools import partial
5from io import StringIO
7import numpy as np
8import pandas as pd
10from . import bro, util
# Module-level logger, named after this module; used for the warnings emitted below.
logger = logging.getLogger(__name__)
def get_objects_as_csv(
    bro_id,
    rapportagetype="volledig",
    observatietype=None,
    to_file=None,
    return_contents=True,
    **kwargs,
):
    """
    Download a Groundwater Level Dossier (GLD) as CSV (RFC 4180) and parse it.

    The dossier is requested from the `objectsAsCsv` endpoint of the GLD REST
    service, optionally stored on disk, and then parsed by `read_gld_csv`.

    Parameters
    ----------
    bro_id : str
        The BRO-ID of the Groundwater Level Dossier to fetch. It can also be a full
        url (anything starting with "http"), which is used by the gm-services. A
        full url is requested as-is, without extra query parameters, so
        `rapportagetype` needs to reflect the choice made in the url.
    rapportagetype : str, optional
        Type of report. The valid values are:
        - "volledig" : Full report
        - "compact" : Compact report with readable timestamps
        - "compact_met_timestamps" : Compact report with Unix epoch timestamps
        Default is "volledig".
    observatietype : str, optional
        Type of observations. The valid values are:
        - "regulier_beoordeeld" : Regular measurement with full evaluation
          (observatietype = reguliere meting en mate beoordeling = volledig beoordeeld)
        - "regulier_voorlopig" : Regular measurement with preliminary evaluation
          (observatietype = reguliere meting en mate beoordeling = voorlopig)
        - "controle" : Control measurement
          (observatietype = controle meting)
        - "onbekend" : Unknown evaluation
          (observatietype = reguliere meting en mate beoordeling = onbekend)
        If None, the parameter is not sent with the request. Default is None.
    to_file : str, optional
        If provided, the raw CSV text of the response is written to this file.
        Default is None.
    return_contents : bool, optional
        If False and `to_file` is provided, the function returns None directly
        after writing the file, without parsing the CSV. Default is True.
    **kwargs : additional keyword arguments
        Additional arguments passed to `read_gld_csv`.

    Returns
    -------
    pd.DataFrame or None
        The parsed observations. None is returned when the response body is empty,
        or when `to_file` is given together with `return_contents=False`.

    Notes
    -----
    The response is passed through `_check_request_status` before it is used.
    """
    if bro_id.startswith("http"):
        # a complete url is requested unchanged
        response = util.get_with_rate_limit(bro_id)
    else:
        query = {"rapportagetype": rapportagetype}
        if observatietype is not None:
            query["observatietype"] = observatietype
        endpoint = f"{GroundwaterLevelDossier._rest_url}/objectsAsCsv/{bro_id}"
        response = util.get_with_rate_limit(endpoint, params=query)
    response = _check_request_status(response)

    csv_text = response.text
    if to_file is not None:
        with open(to_file, "w") as f:
            f.write(csv_text)
        # NOTE(review): the early return is assumed to apply only when a file was
        # written; confirm against the original indentation.
        if not return_contents:
            return

    if csv_text == "":
        return None
    return read_gld_csv(
        StringIO(csv_text),
        bro_id,
        rapportagetype=rapportagetype,
        observatietype=observatietype,
        **kwargs,
    )
def _error_message_from_response(req):
    """Build an error message for a failed response, even if its body is not JSON."""
    try:
        json_data = req.json()
    except ValueError:
        # the body is not valid JSON (e.g. an HTML error page from a proxy)
        json_data = None
    try:
        if "errors" in json_data:
            return json_data["errors"][0]["message"]
        return "{}: {}".format(json_data["title"], json_data["description"])
    except (KeyError, IndexError, TypeError):
        # no body, or a body without the expected fields: report the HTTP status
        return f"Request to {req.url} failed with status code {req.status_code}"


def _check_request_status(req):
    """
    Validate a response, retrying when the rate limit was hit.

    When the status code is 429 the same url is requested again, up to three
    times, after waiting 1, 2 and 4 seconds. Any response that still has a status
    code above 200 afterwards results in an exception.

    Parameters
    ----------
    req : response object
        The response to check. Its `status_code`, `url` and `json()` are used.

    Returns
    -------
    response object
        The original response, or the response of the last retry.

    Raises
    ------
    Exception
        If the status code is still 429 after all retries, or if the status code
        is greater than 200. The message is taken from the JSON body when it has
        the expected fields, otherwise it contains the url and the status code.
    """
    if req.status_code == 429:
        msg = "Too many requests. The BRO API has rate limits in place."
        logger.warning(msg)
        # try 3 times with increasing wait time
        for wait_time in (1, 2, 4):
            logger.warning(f"Waiting for {wait_time} seconds before retrying...")
            time.sleep(wait_time)
            # req.url already contains the query string of the original request
            req = util.get_with_rate_limit(req.url)
            if req.status_code <= 200:
                break
        if req.status_code == 429:
            raise Exception(msg + " Please try again later.")
    if req.status_code > 200:
        raise Exception(_error_message_from_response(req))
    return req
def get_series_as_csv(
    bro_id, filter_on_status_quality_control=None, asISO8601=False, to_file=None
):
    """
    Download the measurement series of a Groundwater Level Dossier as a table.

    The `seriesAsCsv` endpoint returns one row per timestamp, with the different
    observation types (regulier_voorlopig, regulier_beoordeeld, controle en
    onbekend) as columns. It is intended for applications such as the graphical
    visualization of groundwater levels.

    Parameters
    ----------
    bro_id : str
        The BRO-ID of the Groundwater Level Dossier.
    filter_on_status_quality_control : str or list of str, optional
        One or more quality control statuses to filter the measurements by.
        Possible values are 'onbeslist', 'goedgekeurd', and 'afgekeurd'. Multiple
        values are sent as one comma-separated string. The default is None.
    asISO8601 : bool, optional
        If True, timestamps are requested in ISO8601 format; otherwise they are
        interpreted as Unix epoch milliseconds. The default is False.
    to_file : str, optional
        If provided, the raw CSV text is written to this file path. The default
        is None.

    Returns
    -------
    pd.DataFrame or None
        The series, indexed by the parsed "Tijdstip" column when that column is
        present. Returns None if the response body is empty.
    """
    endpoint = f"{GroundwaterLevelDossier._rest_url}/seriesAsCsv/{bro_id}"

    query = {}
    statuses = filter_on_status_quality_control
    if statuses is not None:
        if not isinstance(statuses, str):
            statuses = ",".join(statuses)
        query["filterOnStatusQualityControl"] = statuses
    if asISO8601:
        # the parameter is sent with an empty value
        query["asISO8601"] = ""

    response = _check_request_status(util.get_with_rate_limit(endpoint, params=query))
    csv_text = response.text
    if to_file is not None:
        with open(to_file, "w") as f:
            f.write(csv_text)

    if csv_text == "":
        return None

    series = pd.read_csv(StringIO(csv_text))
    if "Tijdstip" not in series.columns:
        return series

    if asISO8601:
        series["Tijdstip"] = pd.to_datetime(series["Tijdstip"])
    else:
        series["Tijdstip"] = pd.to_datetime(series["Tijdstip"], unit="ms")
    return series.set_index("Tijdstip")
def read_gld_csv(fname, bro_id, rapportagetype, observatietype, **kwargs):
    """
    Read and process a Groundwater Level Dossier (GLD) CSV file.

    Depending on the arguments the CSV is read in one of two ways:

    - If `observatietype` is None or `rapportagetype` is "volledig", the file is
      treated as a sequence of sections, one per observation. Each section has a
      metadata line from which the observation type (4th field) and status (5th
      field) are taken, followed by the measurements.
    - Otherwise the file is read as a single table, and the `status` and
      `observation_type` columns are filled in based on `observatietype`.

    Only the first three columns (time, value and qualifier) are read. The result
    is passed to `process_observations`.

    Parameters
    ----------
    fname : str or StringIO
        The path to the CSV file containing the groundwater level observations, or
        an in-memory text buffer with its contents.
    bro_id : str
        The BRO-ID of the Groundwater Level Dossier being processed. It is passed
        on to `process_observations`.
    rapportagetype : str
        The report type. Can be one of:
        - 'volledig': sections are located via the line starting with
          "observatie ID".
        - 'compact': simple format; the time column is parsed as dates.
        - 'compact_met_timestamps': the time column holds Unix epoch times in
          milliseconds, which are converted to datetimes.
    observatietype : str or None
        The observation type that was requested. One of "regulier_beoordeeld",
        "regulier_voorlopig", "controle" or "onbekend", or None when the file
        contains multiple observation types.
    **kwargs : additional keyword arguments
        Additional arguments passed to the `process_observations` function.

    Returns
    -------
    pd.DataFrame
        A DataFrame indexed by time, with the columns:
        - value: The observed groundwater level.
        - qualifier: The quality code of the observation.
        - status: The status of the observation.
        - observation_type: The type of the observation.
        NOTE(review): `names` also lists censored_reason, censoring_limitvalue and
        interpolation_type, but `usecols=[0, 1, 2]` restricts reading to the first
        three columns; confirm that these are intentionally left out.

    Notes
    -----
    If the report type is 'compact_met_timestamps', the time values are converted
    from Unix epoch time (milliseconds) to a datetime format.
    """
    names = [
        "time",
        "value",
        "qualifier",
        "censored_reason",
        "censoring_limitvalue",
        "interpolation_type",
    ]
    if rapportagetype == "compact":
        parse_dates = ["time"]
    else:
        parse_dates = None
    if observatietype is None or rapportagetype == "volledig":
        # the csv contains multiple observation types, separated by a header with
        # observation-type and status.
        if isinstance(fname, StringIO):
            lines = fname.readlines()
        else:
            with open(fname, "r") as f:
                lines = f.readlines()

        # look for header lines (indices into `lines` of the metadata lines)
        headers = []
        if rapportagetype == "volledig":
            # the line with metadata is preceded by a line starting with "observatie ID"
            for i, line in enumerate(lines):
                if line.startswith('"observatie ID",'):
                    headers.append(i + 1)
            # number of lines between the metadata line and the first measurement
            header_length = 3
        else:
            # the line with metadata is preceded by an empty line
            # but directly after the header, there can also be empty lines, that we skip
            data_lines = False
            for i, line in enumerate(lines):
                # an "empty" line consists of nothing but commas
                only_commas = all(c == "," for c in line.rstrip("\r\n"))
                last_line_was_header = len(headers) > 0 and headers[-1] == i - 1

                if only_commas:
                    if last_line_was_header:
                        # the empty line directly after a header starts the data
                        data_lines = True
                    else:
                        # any other empty line ends the current data section
                        data_lines = False
                else:
                    if not data_lines:
                        # a non-empty line outside a data section is a metadata line
                        headers.append(i)
            # number of lines between the metadata line and the first measurement
            header_length = 2

        dfs = []
        for i, header in enumerate(headers):
            line = lines[header]
            # split string by comma, but ignore commas between quotes
            reader = csv.reader(StringIO(line))
            parts = next(reader)
            observation_type = parts[3]
            status = parts[4]

            if i < len(headers) - 1:
                # the section runs up to the line before the next metadata line
                current_lines = lines[header + header_length : headers[i + 1] - 1]
            else:
                # the last section runs to the end of the file
                current_lines = lines[header + header_length :]
            df = pd.read_csv(
                StringIO("".join(current_lines)),
                names=names,
                index_col="time",
                parse_dates=parse_dates,
                usecols=[0, 1, 2],
            )
            # remove empty indices
            mask = df.index.isna() & df.isna().all(axis=1)
            if mask.any():
                df = df[~mask]
            df["status"] = status
            df["observation_type"] = observation_type
            dfs.append(df)
        if len(dfs) > 0:
            df = pd.concat(dfs)
        else:
            df = _get_empty_observation_df()
    else:
        df = pd.read_csv(
            fname,
            names=names,
            index_col="time",
            parse_dates=parse_dates,
            usecols=[0, 1, 2],
        )
        # the file holds a single observation type: derive status and
        # observation_type from the requested observatietype
        if observatietype == "regulier_beoordeeld":
            df["status"] = "volledigBeoordeeld"
            df["observation_type"] = "reguliereMeting"
        elif observatietype == "regulier_voorlopig":
            df["status"] = "voorlopig"
            df["observation_type"] = "reguliereMeting"
        elif observatietype == "controle":
            df["status"] = np.nan
            df["observation_type"] = "controleMeting"
        elif observatietype == "onbekend":
            df["status"] = "onbekend"
            df["observation_type"] = "reguliereMeting"
    if rapportagetype == "compact_met_timestamps":
        df.index = pd.to_datetime(df.index, unit="ms")
    # remove empty indices
    mask = df.index.isna() & df.isna().all(axis=1)
    if mask.any():
        df = df[~mask]
    df = process_observations(df, bro_id, **kwargs)
    return df
def get_observations_summary(bro_id):
    """
    Fetch the observations summary of a Groundwater Level Dossier (GLD).

    The summary is requested in JSON format from the `observationsSummary`
    endpoint and turned into a DataFrame with one row per observation.

    Parameters
    ----------
    bro_id : str
        The BRO-ID of the Groundwater Level Dossier to fetch the summary for.

    Raises
    ------
    Exception
        Raised by `_check_request_status` when the status code of the response is
        greater than 200.

    Returns
    -------
    pd.DataFrame
        The summary of the groundwater level observations. When present, the
        `observationId` column is used as index, and the `startDate` and `endDate`
        columns are converted with `pd.to_datetime` using `dayfirst=True`.
    """
    endpoint = "{}/objects/{}/observationsSummary".format(
        GroundwaterLevelDossier._rest_url, bro_id
    )
    response = _check_request_status(util.get_with_rate_limit(endpoint))

    summary = pd.DataFrame(response.json())
    if "observationId" in summary.columns:
        summary = summary.set_index("observationId")
    for date_column in ("startDate", "endDate"):
        if date_column in summary.columns:
            summary[date_column] = pd.to_datetime(summary[date_column], dayfirst=True)
    return summary
class GroundwaterLevelDossier(bro.FileOrUrl):
    """
    Class to represent a Groundwater Level Dossier (GLD) from the BRO.

    Attributes
    ----------
    observation : pd.DataFrame
        DataFrame containing groundwater level observations with time and value
        columns. The data is processed and filtered based on the provided arguments.

    tubeNumber : int
        The tube number associated with the observation.

    groundwaterMonitoringWell : str
        The BRO-ID of the groundwater monitoring well.
    """

    # base url of the GLD REST service, used by the module-level request functions
    _rest_url = "https://publiek.broservices.nl/gm/gld/v1"

    def _read_contents(self, tree, status=None, observation_type=None, **kwargs):
        """
        Parse data to populate the Groundwater Level Dossier attributes.

        This method reads and processes the XML contents, extracting relevant
        groundwater monitoring information such as the groundwater monitoring well,
        tube number, and observations. It also processes the observations into a
        DataFrame, which is filtered and transformed based on the provided arguments.

        Parameters
        ----------
        tree : xml.etree.ElementTree
            The XML tree to parse and extract data from.
        status : str, optional
            If provided, only observations whose status equals this value are
            kept. The status of an observation is the part after the last ":" of
            the xlink:href attribute of its waterml:status element, or None when
            that element is absent. Default is None.
        observation_type : str, optional
            If provided, only observations whose observation type (the text of
            the om:value element in the observation metadata) equals this value
            are kept. Default is None.
        **kwargs : keyword arguments
            Additional parameters passed to the `process_observations` function to
            filter and transform the observations.

        Raises
        ------
        Exception
            If more than one or no GLD element is found in the XML tree
            (presumably raised by `_get_main_object`; confirm in the base class).
        AssertionError
            If the named value in the observation metadata does not describe the
            observation type.

        Notes
        -----
        The method expects the XML structure to adhere to the specified namespaces
        and element tags. It processes observation values, timestamps, and qualifiers
        into a pandas DataFrame.

        The observation data is stored in the `observation` attribute and can be
        accessed as a DataFrame.
        """
        ns = {
            "xmlns": "http://www.broservices.nl/xsd/dsgld/1.0",
            "gldcommon": "http://www.broservices.nl/xsd/gldcommon/1.0",
            "waterml": "http://www.opengis.net/waterml/2.0",
            "swe": "http://www.opengis.net/swe/2.0",
            "om": "http://www.opengis.net/om/2.0",
            "xlink": "http://www.w3.org/1999/xlink",
        }
        gld = self._get_main_object(tree, "GLD_O", ns)
        for key in gld.attrib:
            # strip the "{namespace}" prefix from the attribute name
            setattr(self, key.split("}", 1)[1], gld.attrib[key])
        for child in gld:
            key = self._get_tag(child)
            if len(child) == 0:
                # element without children: store its text directly
                setattr(self, key, child.text)
            elif key == "monitoringPoint":
                well = child.find("gldcommon:GroundwaterMonitoringTube", ns)
                gmw_id = well.find("gldcommon:broId", ns).text
                setattr(self, "groundwaterMonitoringWell", gmw_id)
                tube_nr = int(well.find("gldcommon:tubeNumber", ns).text)
                setattr(self, "tubeNumber", tube_nr)
            elif key in ["registrationHistory"]:
                self._read_children_of_children(child)
            elif key == "groundwaterMonitoringNet":
                for grandchild in child:
                    key2 = grandchild.tag.split("}", 1)[1]
                    if key2 == "GroundwaterMonitoringNet":
                        # store the text of the first child of the net element
                        setattr(self, key, grandchild[0].text)
                    else:
                        logger.warning(f"Unknown key: {key2}")
            elif key == "observation":
                # get observation_metadata
                om_observation = child.find("om:OM_Observation", ns)
                if om_observation is None:
                    continue
                metadata = om_observation.find("om:metadata", ns)
                observation_metadata = metadata.find("waterml:ObservationMetadata", ns)

                # get status
                water_ml_status = observation_metadata.find("waterml:status", ns)
                if water_ml_status is None:
                    status_value = None
                else:
                    # keep the part after the last ":" of the href
                    status_value = water_ml_status.attrib[
                        f"{{{ns['xlink']}}}href"
                    ].rsplit(":", 1)[-1]
                if status is not None and status != status_value:
                    # skip observations that do not match the requested status
                    continue

                # get observation_type
                parameter = observation_metadata.find("waterml:parameter", ns)
                named_value = parameter.find("om:NamedValue", ns)
                name = named_value.find("om:name", ns)
                assert (
                    name.attrib[f"{{{ns['xlink']}}}href"]
                    == "urn:bro:gld:ObservationMetadata:observationType"
                )
                value = named_value.find("om:value", ns)
                observation_type_value = value.text
                if (
                    observation_type is not None
                    and observation_type != observation_type_value
                ):
                    # skip observations that do not match the requested type
                    continue

                times = []
                values = []
                qualifiers = []
                for measurement in child.findall(".//waterml:MeasurementTVP", ns):
                    times.append(measurement.find("waterml:time", ns).text)
                    value = measurement.find("waterml:value", ns).text
                    if value is None:
                        # a measurement without a value is stored as NaN
                        values.append(np.nan)
                    else:
                        values.append(float(value))
                    metadata = measurement.find("waterml:metadata", ns)
                    TVPMM = metadata.find("waterml:TVPMeasurementMetadata", ns)
                    qualifier = TVPMM.find("waterml:qualifier", ns)
                    value = qualifier.find("swe:Category", ns).find("swe:value", ns)
                    qualifiers.append(value.text)
                observation = pd.DataFrame(
                    {
                        "time": times,
                        "value": values,
                        "qualifier": qualifiers,
                        "status": status_value,
                        "observation_type": observation_type_value,
                    }
                ).set_index("time")

                # key is "observation" here: start the list at the first observation
                if not hasattr(self, key):
                    self.observation = []
                self.observation.append(observation)
            else:
                self._warn_unknown_tag(key)
        if hasattr(self, "observation"):
            # combine the DataFrames of the individual observations
            self.observation = pd.concat(self.observation)
            self.observation = process_observations(
                self.observation, self.broId, **kwargs
            )
        else:
            # no observation was kept: store an empty DataFrame with the same columns
            self.observation = _get_empty_observation_df()
def _as_index_timestamp(value, index):
    """Convert a time bound to a Timestamp that can be compared with `index`."""
    timestamp = pd.Timestamp(value)
    if index.tz is not None and timestamp.tzinfo is None:
        # a naive bound cannot be compared with a tz-aware index; interpret it in
        # the time zone of the index
        timestamp = timestamp.tz_localize(index.tz)
    return timestamp


def process_observations(
    df,
    bro_id="gld",
    to_wintertime=True,
    qualifier=None,
    tmin=None,
    tmax=None,
    sort=True,
    drop_duplicates=True,
):
    """
    Process groundwater level observations.

    This function processes a DataFrame containing groundwater level observations,
    applying the following operations, in this order:
    - Conversion of the index to datetimes, in Dutch winter time or CET/CEST.
    - Filtering observations based on the qualifier (optional).
    - Filtering observations on a time window (optional).
    - Sorting the observations (optional).
    - Dropping duplicate observations (optional).

    The input DataFrame is not modified; a processed copy is returned.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame containing the groundwater level observations, with a time
        index and columns such as "value", "qualifier", etc.
    bro_id : str
        The BRO-ID of the Groundwater Level Dossier being processed. Only used for
        logging-purposes. The default is "gld".
    to_wintertime : bool, optional
        If True, the observation times are converted to Dutch winter time: the
        times are converted to UTC, the time zone information is removed and one
        hour is added. If to_wintertime is False, observation times are converted
        to the time zone "CET" and stay time zone aware. Default is True.
    qualifier : str or list of str, optional
        If provided, the observations are filtered based on their "qualifier"
        column. Only rows with the specified qualifier(s) will be kept.
    tmin : str or datetime, optional
        The minimum time (inclusive) for filtering observations. A bound without
        time zone is interpreted in the time zone of the processed index.
        Defaults to None.
    tmax : str or datetime, optional
        The maximum time (inclusive) for filtering observations. A bound without
        time zone is interpreted in the time zone of the processed index.
        Defaults to None.
    sort : bool, optional
        If True, the DataFrame will be sorted, see `sort_observations`. Default is
        True.
    drop_duplicates : bool, optional
        If True, any duplicate observation times will be dropped, keeping only
        the first occurrence. Default is True.

    Returns
    -------
    pd.DataFrame
        A DataFrame containing the processed observations, with duplicate rows
        (if any) removed, the time index sorted, and filtered by qualifier if
        applicable.

    """
    # work on a copy: assigning a new index to the caller's DataFrame would shift
    # its times again every time it is processed
    df = df.copy()
    df.index = pd.to_datetime(df.index, utc=True)
    if to_wintertime:
        # remove time zone information by transforming to dutch winter time
        df.index = df.index.tz_localize(None) + pd.Timedelta(1, unit="h")
    else:
        df.index = df.index.tz_convert("CET")

    if qualifier is not None:
        if isinstance(qualifier, str):
            df = df[df["qualifier"] == qualifier]
        else:
            df = df[df["qualifier"].isin(qualifier)]

    # Use boolean masks instead of label slicing: the index is not sorted yet at
    # this point, and slicing a non-monotonic DatetimeIndex with a bound that is
    # not an existing label raises a KeyError.
    if tmin is not None:
        df = df[df.index >= _as_index_timestamp(tmin, df.index)]

    if tmax is not None:
        df = df[df.index <= _as_index_timestamp(tmax, df.index)]

    if sort:
        df = sort_observations(df)

    if drop_duplicates:
        df = drop_duplicate_observations(df, bro_id=bro_id, sort=sort)

    return df
def sort_observations(df):
    """
    Sort observations in a DataFrame by multiple criteria. Applies a multi-level sort
    to the input DataFrame, prioritizing the following criteria in order:
    1. By the DataFrame's DatetimeIndex in ascending order
    2. By status (if present): volledigBeoordeeld before voorlopig before onbekend
    3. By observation_type (if present): reguliereMeting before controleMeting

    Values of status or observation_type that are not listed above are placed
    after the listed ones.

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame with optional 'observation_type' and 'status' columns,
        and a DatetimeIndex.

    Returns
    -------
    pandas.DataFrame
        Sorted DataFrame with the same structure as input.
    """
    # The multi-level order is built by sorting on the least important criterion
    # first. This only works when every following sort keeps the existing order of
    # equal keys, so a stable algorithm is required (the pandas default,
    # "quicksort", is not stable).
    stable = "mergesort"

    if "observation_type" in df.columns:
        # make sure measurements with observation_type set to reguliereMeting are first
        sort_dict = {"reguliereMeting": 0, "controleMeting": 1}
        df = df.sort_values(
            "observation_type", key=lambda x: x.map(sort_dict), kind=stable
        )

    if "status" in df.columns:
        # make sure measurements with status set to volledigBeoordeeld are first
        sort_dict = {"volledigBeoordeeld": 0, "voorlopig": 1, "onbekend": 2}
        df = df.sort_values("status", key=lambda x: x.map(sort_dict), kind=stable)

    # sort based on DatetimeIndex
    df = df.sort_index(kind=stable)

    return df
def drop_duplicate_observations(df, bro_id="gld", keep="first", sort=True):
    """
    Drop rows of a DataFrame that share an index value with another row.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame to process.
    bro_id : str, optional
        Identifier for the dataset, used in the warning message. Default is "gld".
    keep : {'first', 'last', False}, optional
        Passed to `Index.duplicated` to determine which rows are marked as
        duplicate:
        - 'first' : Mark duplicates as True except for the first occurrence.
        - 'last' : Mark duplicates as True except for the last occurrence.
        - False : Mark all duplicates as True.
        Default is 'first'.
    sort : bool, optional
        Only used for the warning message: if True, the message mentions that the
        values were sorted for importance. Default is True.

    Returns
    -------
    pd.DataFrame
        The input DataFrame itself when its index has no duplicates, otherwise a
        DataFrame without the rows marked as duplicate.

    Warnings
    --------
    Logs a warning message if duplicates are found, stating the number of
    duplicates and the total number of rows before removal.
    """
    if not df.index.has_duplicates:
        return df

    is_duplicate = df.index.duplicated(keep=keep)
    summary = "{} contains {} duplicates (of {}). Keeping only first values".format(
        bro_id, is_duplicate.sum(), len(df.index)
    )
    suffix = " (sorted for importance)" if sort else ""
    logger.warning(f"{summary}{suffix}.")
    return df[~is_duplicate]
def _get_empty_observation_df():
    """Return an empty observation DataFrame, indexed by "time"."""
    empty = pd.DataFrame(
        columns=["time", "value", "qualifier", "status", "observation_type"]
    )
    return empty.set_index("time")
# Short alias for the class of this module, bound into the functions below.
cl = GroundwaterLevelDossier

# Module-level functions: the generic helpers from `bro` with this module's class
# pre-filled as first argument. A functools.partial object does not take over the
# docstring of the wrapped function, so it is copied explicitly.
get_bro_ids_of_bronhouder = partial(bro._get_bro_ids_of_bronhouder, cl)
get_bro_ids_of_bronhouder.__doc__ = bro._get_bro_ids_of_bronhouder.__doc__

get_data_for_bro_ids = partial(bro._get_data_for_bro_ids, cl)
get_data_for_bro_ids.__doc__ = bro._get_data_for_bro_ids.__doc__