Coverage for brodata / gld.py: 72%

264 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-13 12:57 +0000

1import csv 

2import logging 

3import time 

4from functools import partial 

5from io import StringIO 

6 

7import numpy as np 

8import pandas as pd 

9 

10from . import bro, util 

11 

12logger = logging.getLogger(__name__) 

13 

14 

15def get_objects_as_csv( 

16 bro_id, 

17 rapportagetype="volledig", 

18 observatietype=None, 

19 to_file=None, 

20 return_contents=True, 

21 **kwargs, 

22): 

23 """ 

24 Fetch a complete Groundwater Level Dossier (GLD) as a CSV (RFC 4180) file 

25 based on the provided BRO-ID. The data can be filtered by report type and 

26 observation type. 

27 

28 Parameters 

29 ---------- 

30 bro_id : str 

31 The BRO-ID of the Groundwater Level Dossier to fetch. It can also be a full url, 

32 which is used by the gm-services. When using a full url, the parameter 

33 `rapportagetype` needs to reflect the choice in the url, and the parameter 

34 `observatietype` is ignored. 

35 rapportagetype : str, optional 

36 Type of report. The valid values are: 

37 - "volledig" : Full report 

38 - "compact" : Compact report with readable timestamps 

39 - "compact_met_timestamps" : Compact report with Unix epoch timestamps 

40 Default is "volledig". 

41 observatietype : str, optional 

42 Type of observations. The valid values are: 

43 - "regulier_beoordeeld" : Regular measurement with full evaluation 

44 (observatietype = reguliere meting en mate beoordeling = volledig beoordeeld) 

45 - "regulier_voorlopig" : Regular measurement with preliminary evaluation 

46 (observatietype = reguliere meting en mate beoordeling = voorlopig) 

47 - "controle" : Control measurement 

48 (observatietype = controle meting) 

49 - "onbekend" : Unknown evaluation 

50 (observatietype = reguliere meting en mate beoordeling = onbekend) 

51 If None, all observation types will be returned. Default is None. 

52 to_file : str, optional 

53 If provided, the CSV data will be written to the specified file. 

54 If None, the function returns the CSV data as a DataFrame. Default is None. 

55 return_contents : bool, optional 

56 If True, the function returns the parsed CSV data as a DataFrame. If False, 

57 the function returns None after saving the CSV to the specified file (if 

58 `to_file` is provided). Default is True. 

59 **kwargs : additional keyword arguments 

60 Additional arguments passed to `read_gld_csv`. 

61 

62 Returns 

63 ------- 

64 pd.DataFrame or None 

65 If successful, returns a DataFrame containing the parsed CSV data. 

66 If `to_file` is provided, returns None after saving the CSV to the specified file. 

67 If the request fails or returns empty data, returns None. 

68 

69 Notes 

70 ----- 

71 The function sends a GET request to the Groundwater Level Dossier API 

72 and fetches the data in CSV format. The `rapportagetype` and `observatietype` 

73 parameters can be used to filter the data. 

74 """ 

75 if bro_id.startswith("http"): 

76 req = util.get_with_rate_limit(bro_id) 

77 else: 

78 url = f"{GroundwaterLevelDossier._rest_url}/objectsAsCsv/{bro_id}" 

79 params = { 

80 "rapportagetype": rapportagetype, 

81 } 

82 if observatietype is not None: 

83 params["observatietype"] = observatietype 

84 req = util.get_with_rate_limit(url, params=params) 

85 req = _check_request_status(req) 

86 if to_file is not None: 

87 with open(to_file, "w") as f: 

88 f.write(req.text) 

89 if not return_contents: 

90 return 

91 if req.text == "": 

92 return None 

93 else: 

94 df = read_gld_csv( 

95 StringIO(req.text), 

96 bro_id, 

97 rapportagetype=rapportagetype, 

98 observatietype=observatietype, 

99 **kwargs, 

100 ) 

101 return df 

102 

103 

104def _check_request_status(req): 

105 if req.status_code == 429: 

106 msg = "Too many requests. The BRO API has rate limits in place." 

107 logger.warning(msg) 

108 # try 3 times with increasing wait time 

109 wait_times = [1, 2, 4] 

110 for wait_time in wait_times: 

111 logger.warning(f"Waiting for {wait_time} seconds before retrying...") 

112 time.sleep(wait_time) 

113 req = util.get_with_rate_limit(req.url) 

114 if req.status_code <= 200: 

115 break 

116 if req.status_code == 429: 

117 raise Exception(msg + " Please try again later.") 

118 if req.status_code > 200: 

119 json_data = req.json() 

120 if "errors" in json_data: 

121 msg = json_data["errors"][0]["message"] 

122 else: 

123 msg = "{}: {}".format(json_data["title"], json_data["description"]) 

124 raise Exception(msg) 

125 return req 

126 

127 

128def get_series_as_csv( 

129 bro_id, filter_on_status_quality_control=None, asISO8601=False, to_file=None 

130): 

131 """ 

132 Get groundwater level series as a CSV, with timestamps and corresponding measurements. 

133 

134 This function retrieves a table with measurements for different observation types 

135 (regulier_voorlopig, regulier_beoordeeld, controle en onbekend) as columns. It is 

136 intended for applications such as the graphical visualization of groundwater levels. 

137 

138 Parameters 

139 ---------- 

140 bro_id : str 

141 The BRO-ID of the Groundwater Level Dossier. 

142 filter_on_status_quality_control : str or list of str, optional 

143 One or more quality control statuses to filter the measurements by. 

144 Possible values are 'onbeslist', 'goedgekeurd', and 'afgekeurd'. 

145 The default is None. 

146 asISO8601 : bool, optional 

147 If True, timestamps are returned in ISO8601 format; otherwise, in Unix 

148 epoch format. The default is False. 

149 to_file : str, optional 

150 If provided, the CSV data will be written to this file path. The default 

151 is None. 

152 

153 Returns 

154 ------- 

155 pd.DataFrame or None 

156 A DataFrame containing the time series of measurements, with timestamps 

157 as the index. Returns None if no data is available. 

158 """ 

159 url = f"{GroundwaterLevelDossier._rest_url}/seriesAsCsv/{bro_id}" 

160 params = {} 

161 if filter_on_status_quality_control is not None: 

162 if not isinstance(filter_on_status_quality_control, str): 

163 filter_on_status_quality_control = ",".join( 

164 filter_on_status_quality_control 

165 ) 

166 params["filterOnStatusQualityControl"] = filter_on_status_quality_control 

167 if asISO8601: 

168 params["asISO8601"] = "" 

169 req = util.get_with_rate_limit(url, params=params) 

170 req = _check_request_status(req) 

171 if to_file is not None: 

172 with open(to_file, "w") as f: 

173 f.write(req.text) 

174 if req.text == "": 

175 return None 

176 else: 

177 df = pd.read_csv(StringIO(req.text)) 

178 if "Tijdstip" in df.columns: 

179 if asISO8601: 

180 df["Tijdstip"] = pd.to_datetime(df["Tijdstip"]) 

181 else: 

182 df["Tijdstip"] = pd.to_datetime(df["Tijdstip"], unit="ms") 

183 df = df.set_index("Tijdstip") 

184 return df 

185 

186 

187def read_gld_csv(fname, bro_id, rapportagetype, observatietype, **kwargs): 

188 """ 

189 Read and process a Groundwater Level Dossier (GLD) CSV file. 

190 

191 This function reads a CSV file containing groundwater level observations, 

192 processes the data according to the specified report type (`rapportagetype`), 

193 and returns a DataFrame of the observations. The file is assumed to contain 

194 at least three columns: time, value, and qualifier. The 'time' column is parsed 

195 as datetime, and additional processing is applied to the data. 

196 

197 Parameters 

198 ---------- 

199 fname : str 

200 The path to the CSV file containing the groundwater level observations. 

201 bro_id : str 

202 The BRO-ID of the Groundwater Level Dossier being processed. 

203 rapportagetype : str 

204 The report type. Can be one of: 

205 - 'volledig': as complete as possible (not supported yet) 

206 - 'compact': simple format with time and value. 

207 - 'compact_met_timestamps': format with timestamps for each observation. 

208 **kwargs : additional keyword arguments 

209 Additional arguments passed to the `process_observations` function. 

210 

211 Returns 

212 ------- 

213 pd.DataFrame 

214 A DataFrame containing the processed observations with the following columns: 

215 - time: The observation time. 

216 - value: The observed groundwater level. 

217 - qualifier: The quality code of the observation. 

218 - censored_reason: Reason for censoring, if applicable. 

219 - censoring_limitvalue: Limit value for censoring, if applicable. 

220 - interpolation_type: The interpolation method used, if applicable. 

221 

222 Notes 

223 ----- 

224 The time column is parsed as a datetime index. If the report type is 

225 'compact_met_timestamps', the time values are converted from Unix epoch time 

226 (milliseconds) to a datetime format. 

227 """ 

228 names = [ 

229 "time", 

230 "value", 

231 "qualifier", 

232 "censored_reason", 

233 "censoring_limitvalue", 

234 "interpolation_type", 

235 ] 

236 if rapportagetype == "compact": 

237 parse_dates = ["time"] 

238 else: 

239 parse_dates = None 

240 if observatietype is None or rapportagetype == "volledig": 

241 # the csv contains multiple observation types, seperated by a header with 

242 # observation-type and status. 

243 if isinstance(fname, StringIO): 

244 lines = fname.readlines() 

245 else: 

246 with open(fname, "r") as f: 

247 lines = f.readlines() 

248 

249 # look for header lines 

250 headers = [] 

251 if rapportagetype == "volledig": 

252 # the line with metdata is proceeded by a line starting with "observatie ID" 

253 for i, line in enumerate(lines): 

254 if line.startswith('"observatie ID",'): 

255 headers.append(i + 1) 

256 header_length = 3 

257 else: 

258 # the line with metdata is proceeded by an empty line 

259 # but directly after the header, there can also be empty lines, that we skip 

260 data_lines = False 

261 for i, line in enumerate(lines): 

262 only_commas = all(c == "," for c in line.rstrip("\r\n")) 

263 last_line_was_header = len(headers) > 0 and headers[-1] == i - 1 

264 

265 if only_commas: 

266 if last_line_was_header: 

267 data_lines = True 

268 else: 

269 data_lines = False 

270 else: 

271 if not data_lines: 

272 headers.append(i) 

273 header_length = 2 

274 

275 dfs = [] 

276 for i, header in enumerate(headers): 

277 line = lines[header] 

278 # split string by comma, but ignore commas between quotes 

279 reader = csv.reader(StringIO(line)) 

280 parts = next(reader) 

281 observation_type = parts[3] 

282 status = parts[4] 

283 

284 if i < len(headers) - 1: 

285 current_lines = lines[header + header_length : headers[i + 1] - 1] 

286 else: 

287 current_lines = lines[header + header_length :] 

288 df = pd.read_csv( 

289 StringIO("".join(current_lines)), 

290 names=names, 

291 index_col="time", 

292 parse_dates=parse_dates, 

293 usecols=[0, 1, 2], 

294 ) 

295 # remove empty indices 

296 mask = df.index.isna() & df.isna().all(axis=1) 

297 if mask.any(): 

298 df = df[~mask] 

299 df["status"] = status 

300 df["observation_type"] = observation_type 

301 dfs.append(df) 

302 if len(dfs) > 0: 

303 df = pd.concat(dfs) 

304 else: 

305 df = _get_empty_observation_df() 

306 else: 

307 df = pd.read_csv( 

308 fname, 

309 names=names, 

310 index_col="time", 

311 parse_dates=parse_dates, 

312 usecols=[0, 1, 2], 

313 ) 

314 if observatietype == "regulier_beoordeeld": 

315 df["status"] = "volledigBeoordeeld" 

316 df["observation_type"] = "reguliereMeting" 

317 elif observatietype == "regulier_voorlopig": 

318 df["status"] = "voorlopig" 

319 df["observation_type"] = "reguliereMeting" 

320 elif observatietype == "controle": 

321 df["status"] = np.nan 

322 df["observation_type"] = "controleMeting" 

323 elif observatietype == "onbekend": 

324 df["status"] = "onbekend" 

325 df["observation_type"] = "reguliereMeting" 

326 if rapportagetype == "compact_met_timestamps": 

327 df.index = pd.to_datetime(df.index, unit="ms") 

328 # remove empty indices 

329 mask = df.index.isna() & df.isna().all(axis=1) 

330 if mask.any(): 

331 df = df[~mask] 

332 df = process_observations(df, bro_id, **kwargs) 

333 return df 

334 

335 

336def get_observations_summary(bro_id): 

337 """ 

338 Fetch a summary of a Groundwater Level Dossier (GLD) in JSON format based on 

339 the provided BRO-ID. The summary includes details about the groundwater level 

340 observations, such as observation ID, start and end dates. 

341 

342 Parameters 

343 ---------- 

344 bro_id : str 

345 The BRO-ID of the Groundwater Level Dossier to fetch the summary for. 

346 

347 Raises 

348 ------ 

349 Exception 

350 If the request to the API fails or the status code is greater than 200, 

351 an exception will be raised with the error message returned by the API. 

352 

353 Returns 

354 ------- 

355 pd.DataFrame 

356 A DataFrame containing the summary of the groundwater level observations. 

357 The DataFrame will be indexed by the `observationId` and include 

358 `startDate` and `endDate` columns, converted to `datetime` format. 

359 

360 Notes 

361 ----- 

362 The function sends a GET request to the REST API and processes the returned 

363 JSON data into a DataFrame. If the response contains valid `startDate` or 

364 `endDate` fields, they will be converted to `datetime` format using the 

365 `pd.to_datetime` function. 

366 """ 

367 url = GroundwaterLevelDossier._rest_url 

368 url = "{}/objects/{}/observationsSummary".format(url, bro_id) 

369 req = util.get_with_rate_limit(url) 

370 req = _check_request_status(req) 

371 df = pd.DataFrame(req.json()) 

372 if "observationId" in df.columns: 

373 df = df.set_index("observationId") 

374 if "startDate" in df.columns: 

375 df["startDate"] = pd.to_datetime(df["startDate"], dayfirst=True) 

376 if "endDate" in df.columns: 

377 df["endDate"] = pd.to_datetime(df["endDate"], dayfirst=True) 

378 return df 

379 

380 

381class GroundwaterLevelDossier(bro.FileOrUrl): 

382 """ 

383 Class to represent a Groundwater Level Dossier (GLD) from the BRO. 

384 

385 Attributes 

386 ---------- 

387 observation : pd.DataFrame 

388 DataFrame containing groundwater level observations with time and value 

389 columns. The data is processed and filtered based on the provided arguments. 

390 

391 tubeNumber : int 

392 The tube number associated with the observation. 

393 

394 groundwaterMonitoringWell : str 

395 The BRO-ID of the groundwater monitoring well. 

396 """ 

397 

398 _rest_url = "https://publiek.broservices.nl/gm/gld/v1" 

399 

400 def _read_contents(self, tree, status=None, observation_type=None, **kwargs): 

401 """ 

402 Parse data to populate the Groundwater Level Dossier attributes. 

403 

404 This method reads and processes the XML contents, extracting relevant 

405 groundwater monitoring information such as the groundwater monitoring well, 

406 tube number, and observations. It also processes the observations into a 

407 DataFrame, which is filtered and transformed based on the provided arguments. 

408 

409 Parameters 

410 ---------- 

411 tree : xml.etree.ElementTree 

412 The XML tree to parse and extract data from. 

413 

414 **kwargs : keyword arguments 

415 Additional parameters passed to the `process_observations` function to 

416 filter and transform the observations. 

417 

418 Raises 

419 ------ 

420 Exception 

421 If more than one or no GLD element is found in the XML tree. 

422 

423 Notes 

424 ----- 

425 The method expects the XML structure to adhere to the specified namespaces 

426 and element tags. It processes observation values, timestamps, and qualifiers 

427 into a pandas DataFrame. 

428 

429 The observation data is stored in the `observation` attribute and can be 

430 accessed as a DataFrame. 

431 """ 

432 ns = { 

433 "xmlns": "http://www.broservices.nl/xsd/dsgld/1.0", 

434 "gldcommon": "http://www.broservices.nl/xsd/gldcommon/1.0", 

435 "waterml": "http://www.opengis.net/waterml/2.0", 

436 "swe": "http://www.opengis.net/swe/2.0", 

437 "om": "http://www.opengis.net/om/2.0", 

438 "xlink": "http://www.w3.org/1999/xlink", 

439 } 

440 gld = self._get_main_object(tree, "GLD_O", ns) 

441 for key in gld.attrib: 

442 setattr(self, key.split("}", 1)[1], gld.attrib[key]) 

443 for child in gld: 

444 key = self._get_tag(child) 

445 if len(child) == 0: 

446 setattr(self, key, child.text) 

447 elif key == "monitoringPoint": 

448 well = child.find("gldcommon:GroundwaterMonitoringTube", ns) 

449 gmw_id = well.find("gldcommon:broId", ns).text 

450 setattr(self, "groundwaterMonitoringWell", gmw_id) 

451 tube_nr = int(well.find("gldcommon:tubeNumber", ns).text) 

452 setattr(self, "tubeNumber", tube_nr) 

453 elif key in ["registrationHistory"]: 

454 self._read_children_of_children(child) 

455 elif key == "groundwaterMonitoringNet": 

456 for grandchild in child: 

457 key2 = grandchild.tag.split("}", 1)[1] 

458 if key2 == "GroundwaterMonitoringNet": 

459 setattr(self, key, grandchild[0].text) 

460 else: 

461 logger.warning(f"Unknown key: {key2}") 

462 elif key == "observation": 

463 # get observation_metadata 

464 om_observation = child.find("om:OM_Observation", ns) 

465 if om_observation is None: 

466 continue 

467 metadata = om_observation.find("om:metadata", ns) 

468 observation_metadata = metadata.find("waterml:ObservationMetadata", ns) 

469 

470 # get status 

471 water_ml_status = observation_metadata.find("waterml:status", ns) 

472 if water_ml_status is None: 

473 status_value = None 

474 else: 

475 status_value = water_ml_status.attrib[ 

476 f"{{{ns['xlink']}}}href" 

477 ].rsplit(":", 1)[-1] 

478 if status is not None and status != status_value: 

479 continue 

480 

481 # get observation_type 

482 parameter = observation_metadata.find("waterml:parameter", ns) 

483 named_value = parameter.find("om:NamedValue", ns) 

484 name = named_value.find("om:name", ns) 

485 assert ( 

486 name.attrib[f"{{{ns['xlink']}}}href"] 

487 == "urn:bro:gld:ObservationMetadata:observationType" 

488 ) 

489 value = named_value.find("om:value", ns) 

490 observation_type_value = value.text 

491 if ( 

492 observation_type is not None 

493 and observation_type != observation_type_value 

494 ): 

495 continue 

496 

497 times = [] 

498 values = [] 

499 qualifiers = [] 

500 for measurement in child.findall(".//waterml:MeasurementTVP", ns): 

501 times.append(measurement.find("waterml:time", ns).text) 

502 value = measurement.find("waterml:value", ns).text 

503 if value is None: 

504 values.append(np.nan) 

505 else: 

506 values.append(float(value)) 

507 metadata = measurement.find("waterml:metadata", ns) 

508 TVPMM = metadata.find("waterml:TVPMeasurementMetadata", ns) 

509 qualifier = TVPMM.find("waterml:qualifier", ns) 

510 value = qualifier.find("swe:Category", ns).find("swe:value", ns) 

511 qualifiers.append(value.text) 

512 observation = pd.DataFrame( 

513 { 

514 "time": times, 

515 "value": values, 

516 "qualifier": qualifiers, 

517 "status": status_value, 

518 "observation_type": observation_type_value, 

519 } 

520 ).set_index("time") 

521 

522 if not hasattr(self, key): 

523 self.observation = [] 

524 self.observation.append(observation) 

525 else: 

526 self._warn_unknown_tag(key) 

527 if hasattr(self, "observation"): 

528 self.observation = pd.concat(self.observation) 

529 self.observation = process_observations( 

530 self.observation, self.broId, **kwargs 

531 ) 

532 else: 

533 self.observation = _get_empty_observation_df() 

534 

535 

536def process_observations( 

537 df, 

538 bro_id="gld", 

539 to_wintertime=True, 

540 qualifier=None, 

541 tmin=None, 

542 tmax=None, 

543 sort=True, 

544 drop_duplicates=True, 

545): 

546 """ 

547 Process groundwater level observations. 

548 

549 This function processes a DataFrame containing groundwater level observations, 

550 applying the following operations based on the provided parameters: 

551 - Conversion to Dutch winter time (optional). 

552 - Filtering observations based on the qualifier. 

553 - Dropping duplicate observations (optional). 

554 - Sorting the observations by time (optional). 

555 

556 Parameters 

557 ---------- 

558 df : pd.DataFrame 

559 DataFrame containing the groundwater level observations, with a time 

560 index and columns such as "value", "qualifier", etc. 

561 bro_id : str 

562 The BRO-ID of the Groundwater Level Dossier being processed. Only used for 

563 logging-purposes. The default is "gld". 

564 to_wintertime : bool, optional 

565 If True, the observation times are converted to Dutch winter time by 

566 removing any time zone information and adding one hour. If to_wintertime is 

567 False, observation times are kept in CET/CEST. Default is True. 

568 qualifier : str or list of str, optional 

569 If provided, the observations are filtered based on their "qualifier" 

570 column. Only rows with the specified qualifier(s) will be kept. 

571 tmin : str or datetime, optional 

572 The minimum time for filtering observations. Defaults to None. 

573 tmax : str or datetime, optional 

574 The maximum time for filtering observations. Defaults to None. 

575 sort : bool, optional 

576 If True, the DataFrame will be sorted, see `sort_observations`. Default is 

577 True. 

578 drop_duplicates : bool, optional 

579 If True, any duplicate observation times will be dropped, keeping only 

580 the first occurrence. Default is True. 

581 

582 Returns 

583 ------- 

584 pd.DataFrame 

585 A DataFrame containing the processed observations, with duplicate rows 

586 (if any) removed, the time index sorted, and filtered by qualifier if 

587 applicable. 

588 

589 """ 

590 df.index = pd.to_datetime(df.index, utc=True) 

591 if to_wintertime: 

592 # remove time zone information by transforming to dutch winter time 

593 df.index = df.index.tz_localize(None) + pd.Timedelta(1, unit="h") 

594 else: 

595 df.index = df.index.tz_convert("CET") 

596 

597 if qualifier is not None: 

598 if isinstance(qualifier, str): 

599 df = df[df["qualifier"] == qualifier] 

600 else: 

601 df = df[df["qualifier"].isin(qualifier)] 

602 

603 if tmin is not None: 

604 df = df.loc[pd.Timestamp(tmin) :] 

605 

606 if tmax is not None: 

607 df = df.loc[: pd.Timestamp(tmax)] 

608 

609 if sort: 

610 df = sort_observations(df) 

611 

612 if drop_duplicates: 

613 df = drop_duplicate_observations(df, bro_id=bro_id, sort=sort) 

614 

615 return df 

616 

617 

618def sort_observations(df): 

619 """ 

620 Sort observations in a DataFrame by multiple criteria. Applies a multi-level sort 

621 to the input DataFrame, prioritizing the following criteria in order: 

622 1. By the DataFrame's DatetimeIndex in ascending order 

623 2. By status (if present): volledigBeoordeeld before voorlopig before onbekend 

624 3. By observation_type (if present): reguliereMeting before controleMeting 

625 

626 Parameters 

627 ---------- 

628 df : pandas.DataFrame 

629 DataFrame with optional 'observation_type' and 'status' columns, 

630 and a DatetimeIndex. 

631 

632 Returns 

633 ------- 

634 pandas.DataFrame 

635 Sorted DataFrame with the same structure as input. 

636 """ 

637 if "observation_type" in df.columns: 

638 # make sure measurements with observation_type set to reguliereMeting are first 

639 sort_dict = {"reguliereMeting": 0, "controleMeting": 1} 

640 df = df.sort_values("observation_type", key=lambda x: x.map(sort_dict)) 

641 

642 if "status" in df.columns: 

643 # make sure measurements with status set to volledigBeoordeeld are first 

644 sort_dict = {"volledigBeoordeeld": 0, "voorlopig": 1, "onbekend": 2} 

645 df = df.sort_values("status", key=lambda x: x.map(sort_dict)) 

646 

647 # sort based on DatetimeIndex 

648 df = df.sort_index() 

649 

650 return df 

651 

652 

653def drop_duplicate_observations(df, bro_id="gld", keep="first", sort=True): 

654 """ 

655 Remove duplicate observations from a DataFrame based on its index. 

656 

657 Parameters 

658 ---------- 

659 df : pd.DataFrame 

660 The DataFrame to process. 

661 bro_id : str, optional 

662 Identifier for the dataset, used in warning messages. Default is "gld". 

663 keep : {'first', 'last', False}, optional 

664 Which duplicates to mark: 

665 - 'first' : Mark duplicates as True except for the first occurrence. 

666 - 'last' : Mark duplicates as True except for the last occurrence. 

667 - False : Mark all duplicates as True. 

668 Default is 'first'. 

669 

670 Returns 

671 ------- 

672 pd.DataFrame 

673 DataFrame with duplicate index values removed, keeping only the rows 

674 specified by the `keep` parameter. 

675 

676 Warnings 

677 -------- 

678 Logs a warning message if duplicates are found, indicating the number and 

679 total count of duplicates before removal. 

680 """ 

681 if df.index.has_duplicates: 

682 duplicates = df.index.duplicated(keep=keep) 

683 message = "{} contains {} duplicates (of {}). Keeping only first values" 

684 message = message.format(bro_id, duplicates.sum(), len(df.index)) 

685 if sort: 

686 message = f"{message} (sorted for importance)" 

687 message = f"{message}." 

688 logger.warning(message) 

689 df = df[~duplicates] 

690 return df 

691 

692 

693def _get_empty_observation_df(): 

694 columns = ["time", "value", "qualifier", "status", "observation_type"] 

695 return pd.DataFrame(columns=columns).set_index("time") 

696 

697 

698cl = GroundwaterLevelDossier 

699 

700get_bro_ids_of_bronhouder = partial(bro._get_bro_ids_of_bronhouder, cl) 

701get_bro_ids_of_bronhouder.__doc__ = bro._get_bro_ids_of_bronhouder.__doc__ 

702 

703get_data_for_bro_ids = partial(bro._get_data_for_bro_ids, cl) 

704get_data_for_bro_ids.__doc__ = bro._get_data_for_bro_ids.__doc__