Coverage for brodata / util.py: 53%
154 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-20 14:37 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-20 14:37 +0000
1import logging
2import os
3from zipfile import ZipFile, ZIP_DEFLATED, ZIP_STORED
5import numpy as np
7logger = logging.getLogger(__name__)
# tqdm is an optional dependency: use the real progress bar when it is installed,
# otherwise fall back to a stand-in that simply hands the iterable back.
try:
    from tqdm import tqdm
except ImportError:

    def tqdm(iterable=None, **kwargs):
        """
        Minimal replacement for `tqdm.tqdm` that shows no progress bar.

        Parameters
        ----------
        iterable : iterable, optional
            The object to iterate over. It is returned unchanged.
        **kwargs
            Accepted for interface compatibility and ignored.

        Returns
        -------
        iterable
            The given iterable, or an empty list when no iterable was given.
        """
        if iterable is None:
            return []
        return iterable
def read_zipfile(fname, pathnames=None, use_bro_abbreviation=False, override_ext=None):
    """
    Read and parse files from a ZIP archive downloaded from BROloket.

    Parameters
    ----------
    fname : str
        Path to the ZIP file to read.
    pathnames : list of str or str, optional
        Folder name(s) within the ZIP archive to process. A single string is treated
        as a list with one folder name. If None, all non-root folders are processed.
    use_bro_abbreviation: bool, optional
        If True, use the abbreviation of bro-objects (e.g. GMW, GLD, BHR) to store the
        data in the root of the returned dictionary. If False, use the first level of
        the folder structure in the zip-file to store the returned objects (e.g.
        BRO_Grondwatermonitoring, BRO_GeologischBooronderzoek). The default is False.
    override_ext : str, optional
        Removed argument from `read_zipfile`. Must be left at None.

    Returns
    -------
    dict
        Nested dictionary where the first-level keys are data-categories, and the
        second-level keys are file base names (bro-id or nitg-nr).
        The values are either parsed objects (from corresponding classes) or file
        objects (e.g., PIL.Image for .tif files).

    Raises
    ------
    Exception
        If `override_ext` is not None.

    Notes
    -----
    - For .tif files, PIL.Image objects are returned.
    - For other supported types, the corresponding class is instantiated with the file
      and the ZipFile object.
    - Files in the root of the archive, files with an unexpected extension and files
      in folders or of types that are not recognised are skipped (the latter with a
      logged warning).
    """
    if override_ext is not None:
        raise Exception("The parameter `override_ext` is removed from `read_zipfile`")
    if isinstance(pathnames, str):
        # a bare string would otherwise be matched by substring instead of by name
        pathnames = [pathnames]

    data = {}
    with ZipFile(fname) as zf:
        for file in zf.namelist():
            name, ext = os.path.splitext(os.path.basename(file))
            if name == "":
                # this is a directory
                continue
            pathname = os.path.dirname(file)
            if pathname == "":
                # skip file in the root path (usually the file 'locatie_levering.kml')
                continue
            if pathnames is not None and pathname not in pathnames:
                continue

            if pathname.startswith("BRO"):
                if ext != ".xml":
                    logger.info(f"Skipping file: {file}")
                    continue
                if use_bro_abbreviation:
                    key = name[:3]
                else:
                    key = os.path.normpath(pathname).split(os.sep)[0]
                # the reader classes are imported lazily, only when they are needed
                if name.startswith("BHR"):
                    if pathname == "BRO_GeotechnischBooronderzoek":
                        from .bhr import GeotechnicalBoreholeResearch as cl
                    elif pathname == "BRO_GeologischBooronderzoek":
                        from .bhr import GeologicalBoreholeResearch as cl
                    elif pathname == "BRO_BodemkundigBooronderzoek":
                        # NOTE(review): folder name assumed to carry the same "BRO_"
                        # prefix as its siblings (the unprefixed name could never
                        # match inside this branch) - confirm against a download.
                        from .bhr import PedologicalBoreholeResearch as cl
                    else:
                        # without a reader class there is nothing to instantiate
                        logger.warning(f"Unknown BHR-type: {pathname}")
                        continue
                elif name.startswith("CPT"):
                    from .cpt import ConePenetrationTest as cl
                elif name.startswith("EPC"):
                    from .epc import ExplorationProductionConstruction as cl
                elif name.startswith("FRD"):
                    from .frd import FormationResistanceDossier as cl
                elif name.startswith("GAR"):
                    from .gar import GroundwaterAnalysisReport as cl
                elif name.startswith("GLD"):
                    from .gld import GroundwaterLevelDossier as cl
                elif name.startswith("GMN"):
                    from .gmn import GroundwaterMonitoringNetwork as cl
                elif name.startswith("GMW"):
                    from .gmw import GroundwaterMonitoringWell as cl
                elif name.startswith("GPD"):
                    from .gpd import GroundwaterProductionDossier as cl
                elif name.startswith("GUF"):
                    from .guf import GroundwaterUtilisationFacility as cl
                elif name.startswith("SAD"):
                    from .sad import SiteAssessmentData as cl
                elif name.startswith("SFR"):
                    from .sfr import SoilFaceResearch as cl
                else:
                    logger.warning(f"Unknown file-type: {file}")
                    continue

            elif pathname.startswith("DINO"):
                key = pathname
                if pathname == "DINO_GeologischBooronderzoekBoormonsterprofiel":
                    from .dino import GeologischBooronderzoek as cl

                    expected_ext = ".csv"
                elif pathname == "DINO_GeotechnischSondeeronderzoek":
                    # these are images, which are opened with PIL instead of a class
                    cl = None
                    expected_ext = ".tif"
                elif pathname == "DINO_Grondwatersamenstelling":
                    from .dino import Grondwatersamenstelling as cl

                    expected_ext = ".csv"
                elif pathname == "DINO_Grondwaterstanden":
                    from .dino import Grondwaterstand as cl

                    expected_ext = ".csv"
                elif pathname in [
                    "DINO_VerticaalElektrischSondeeronderzoek",
                    "DINO_GeoElectrischOnderzoek",
                ]:
                    from .dino import VerticaalElektrischSondeeronderzoek as cl

                    expected_ext = ".csv"
                else:
                    # this includes DINO_GeologischBooronderzoekKorrelgrootteAnalyse
                    # and DINO_GeologischBooronderzoekChemischeAnalyse
                    logger.warning(f"Folder {pathname} not supported yet")
                    continue
                if ext != expected_ext:
                    logger.info(f"Skipping file: {file}")
                    continue

            else:
                # neither a BRO- nor a DINO-folder: there is no key or reader class
                logger.warning(f"Folder {pathname} not supported yet")
                continue

            if key not in data:
                data[key] = {}
            logger.info(f"Reading {file} from {fname}")
            if ext == ".tif":
                from PIL import Image

                data[key][name] = Image.open(zf.open(file))
            else:
                data[key][name] = cl(file, zipfile=zf)
    return data
167def _get_to_file(fname, zipfile, to_path, _files):
168 to_file = None
169 if zipfile is not None or to_path is not None:
170 to_file = fname
171 if zipfile is None:
172 to_file = os.path.join(to_path, to_file)
173 if _files is not None:
174 _files.append(to_file)
175 return to_file
178def _save_data_to_zip(to_zip, files, remove_path_again, to_path):
179 try:
180 import zlib
182 compression = ZIP_DEFLATED
183 except ImportError:
184 logger.warning("Could not import zlib, saving zipfile without compression")
185 compression = ZIP_STORED
186 with ZipFile(to_zip, "w", compression=compression) as zf:
187 for file in files:
188 zf.write(file, os.path.split(file)[1])
189 if remove_path_again:
190 # remove individual files again
191 for file in files:
192 os.remove(file)
193 os.removedirs(to_path)
196def _format_repr(self, props):
197 # format these properties into a string
198 props_str = ""
199 for key in props:
200 value = props[key]
201 props_str = f"{props_str}{key}={value.__repr__()}, "
202 if len(props_str) > 1:
203 props_str = props_str[:-2]
204 # generate name
205 name = f"{self.__class__.__name__}({props_str})"
206 return name
209def _get_tag(node):
210 return node.tag.split("}", 1)[1]
def _warn_unknown_tag(tag, parent=None, class_name=None, bro_id=None):
    """
    Log a warning about an XML tag that is not supported.

    Parameters
    ----------
    tag : str
        The tag that is not supported.
    parent : str, optional
        The parent of the tag, added to the message when given.
    class_name : str, optional
        Name of the class in which the tag was met, added to the message when given.
    bro_id : str, optional
        Identifier that is appended to the message when given.
    """
    # The fragments are concatenated as they are, so the doubled spaces around the
    # optional parts of the message are part of the emitted text.
    fragments = [f"Tag {tag} "]
    if parent is not None:
        fragments.append(f" of parent {parent} ")
    fragments.append(" not supported")
    if class_name is not None:
        fragments.append(f" in {class_name}")
    elif bro_id is not None:
        # no class name to attach the identifier to, so add the preposition here
        fragments.append(" in")
    if bro_id is not None:
        fragments.append(f" {bro_id}")

    logger.warning("".join(fragments))