Coverage for brodata / util.py: 53%

154 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-20 14:37 +0000

1import logging 

2import os 

3from zipfile import ZipFile, ZIP_DEFLATED, ZIP_STORED 

4 

5import numpy as np 

6 

7logger = logging.getLogger(__name__) 

8 

# tqdm is an optional dependency: use the real progress bar when it is installed,
# otherwise fall back to a no-op stand-in with a compatible call signature.
try:
    from tqdm import tqdm
except ImportError:

    def tqdm(iterable=None, **kwargs):
        """
        Stand-in for `tqdm.tqdm` that shows no progress bar.

        Parameters
        ----------
        iterable : iterable, optional
            The object to iterate over. It is returned unchanged.
        **kwargs
            Accepted for interface compatibility and ignored.

        Returns
        -------
        iterable
            The given iterable, or an empty list when no iterable was supplied.
        """
        if iterable is None:
            return []
        return iterable

15 

16 

def _bro_class_for(file, name, pathname):
    """
    Select the parser class for an xml-file in a BRO folder of the archive.

    The classes are imported inside the function, as the original implementation
    did (presumably to avoid import cycles; the sibling modules are not visible
    from here).

    Parameters
    ----------
    file : str
        Full member name inside the ZIP archive (only used in log messages).
    name : str
        Base name of the file without extension. Its first three characters are
        the abbreviation of the bro-object (e.g. GMW, GLD, BHR).
    pathname : str
        Directory of the file inside the ZIP archive.

    Returns
    -------
    class or None
        The class to instantiate, or None (after logging a warning) when the
        file-type is not recognised.
    """
    if name.startswith("BHR"):
        # BHR-files exist in several flavours, which are told apart by folder
        if pathname == "BRO_GeotechnischBooronderzoek":
            from .bhr import GeotechnicalBoreholeResearch as cl
        elif pathname == "BRO_GeologischBooronderzoek":
            from .bhr import GeologicalBoreholeResearch as cl
        elif pathname == "BRO_BodemkundigBooronderzoek":
            # the folder name needs the "BRO_" prefix: this code is only reached
            # for folders starting with "BRO", so a name without it never matches
            from .bhr import PedologicalBoreholeResearch as cl
        else:
            logger.warning(f"Unknown BHR-type: {pathname}")
            return None
    elif name.startswith("CPT"):
        from .cpt import ConePenetrationTest as cl
    elif name.startswith("EPC"):
        from .epc import ExplorationProductionConstruction as cl
    elif name.startswith("FRD"):
        from .frd import FormationResistanceDossier as cl
    elif name.startswith("GAR"):
        from .gar import GroundwaterAnalysisReport as cl
    elif name.startswith("GLD"):
        from .gld import GroundwaterLevelDossier as cl
    elif name.startswith("GMN"):
        from .gmn import GroundwaterMonitoringNetwork as cl
    elif name.startswith("GMW"):
        from .gmw import GroundwaterMonitoringWell as cl
    elif name.startswith("GPD"):
        from .gpd import GroundwaterProductionDossier as cl
    elif name.startswith("GUF"):
        from .guf import GroundwaterUtilisationFacility as cl
    elif name.startswith("SAD"):
        from .sad import SiteAssessmentData as cl
    elif name.startswith("SFR"):
        from .sfr import SoilFaceResearch as cl
    else:
        logger.warning(f"Unknown file-type: {file}")
        return None
    return cl


def _dino_class_for(pathname):
    """
    Select the parser class and required file extension for a DINO folder.

    Parameters
    ----------
    pathname : str
        Directory of the file inside the ZIP archive.

    Returns
    -------
    tuple or None
        A tuple `(cl, ext)`, where `cl` is the class to instantiate (None for
        folders with .tif images, which are opened with PIL) and `ext` is the
        only file extension that is read from this folder. None is returned
        when the folder is not supported.
    """
    if pathname == "DINO_GeologischBooronderzoekBoormonsterprofiel":
        from .dino import GeologischBooronderzoek as cl

        return cl, ".csv"
    if pathname == "DINO_GeotechnischSondeeronderzoek":
        return None, ".tif"
    if pathname == "DINO_Grondwatersamenstelling":
        from .dino import Grondwatersamenstelling as cl

        return cl, ".csv"
    if pathname == "DINO_Grondwaterstanden":
        from .dino import Grondwaterstand as cl

        return cl, ".csv"
    if pathname in (
        "DINO_VerticaalElektrischSondeeronderzoek",
        "DINO_GeoElectrischOnderzoek",
    ):
        from .dino import VerticaalElektrischSondeeronderzoek as cl

        return cl, ".csv"
    # this includes DINO_GeologischBooronderzoekKorrelgrootteAnalyse and
    # DINO_GeologischBooronderzoekChemischeAnalyse, which are not supported yet
    return None


def read_zipfile(fname, pathnames=None, use_bro_abbreviation=False, override_ext=None):
    """
    Read and parse files from a ZIP archive downloaded from BROloket.

    Parameters
    ----------
    fname : str
        Path to the ZIP file to read.
    pathnames : list of str or str, optional
        Folder name(s) within the ZIP archive to process. Each name is compared
        to the full directory of a file inside the archive. A single string is
        treated as one folder name. If None, all non-root directories are
        processed.
    use_bro_abbreviation: bool, optional
        If True, use the abbreviation of bro-objects (e.g. GMW, GLD, BHR) to store the
        data in the root of the returned dictionary. If False, use the first level of
        the folder structure in the zip-file to store the returned objects (e.g.
        BRO_Grondwatermonitoring, BRO_GeologischBooronderzoek). The default is False.
        This setting only affects files in folders starting with "BRO"; files in
        folders starting with "DINO" are always stored under their folder name.
    override_ext : str, optional
        Removed argument from `read_zipfile`. Must be None.

    Returns
    -------
    dict
        Nested dictionary where the first-level keys are data-categories, and the
        second-level keys are file base names (bro-id or nitg-nr).
        The values are either parsed objects (from corresponding classes) or file
        objects (e.g., PIL.Image for .tif files).

    Raises
    ------
    Exception
        If `override_ext` is not None.

    Notes
    -----
    - For .tif files, PIL.Image objects are returned.
    - For other supported types, the corresponding class is instantiated with the file
      and the ZipFile object.
    - Files in the root of the archive, files with an unexpected extension and
      files in unsupported folders are skipped (with a log message).
    """
    if override_ext is not None:
        raise Exception("The parameter `override_ext` is removed from `read_zipfile`")

    if isinstance(pathnames, str):
        # a bare string would otherwise be searched for substrings by `in`
        pathnames = [pathnames]

    data = {}
    with ZipFile(fname) as zf:
        for file in zf.namelist():
            name, ext = os.path.splitext(os.path.basename(file))
            if name == "":
                # this is a directory
                continue
            pathname = os.path.dirname(file)
            if pathname == "":
                # skip file in the root path (usually the file 'locatie_levering.kml')
                continue
            if pathnames is not None and pathname not in pathnames:
                continue

            if pathname.startswith("BRO"):
                if ext != ".xml":
                    logger.info(f"Skipping file: {file}")
                    continue
                cl = _bro_class_for(file, name, pathname)
                if cl is None:
                    # unknown type: never fall through with a class that is
                    # undefined or left over from a previous file
                    continue
                if use_bro_abbreviation:
                    key = name[:3]
                else:
                    key = os.path.normpath(pathname).split(os.sep)[0]
            elif pathname.startswith("DINO"):
                supported = _dino_class_for(pathname)
                if supported is None:
                    logger.warning(f"Folder {pathname} not supported yet")
                    continue
                cl, required_ext = supported
                if ext != required_ext:
                    logger.info(f"Skipping file: {file}")
                    continue
                key = pathname
            else:
                # neither a BRO- nor a DINO-folder: there is no class to read it
                logger.warning(f"Folder {pathname} not supported yet")
                continue

            logger.info(f"Reading {file} from {fname}")
            if ext == ".tif":
                from PIL import Image

                obj = Image.open(zf.open(file))
            else:
                obj = cl(file, zipfile=zf)
            data.setdefault(key, {})[name] = obj
    return data

165 

166 

167def _get_to_file(fname, zipfile, to_path, _files): 

168 to_file = None 

169 if zipfile is not None or to_path is not None: 

170 to_file = fname 

171 if zipfile is None: 

172 to_file = os.path.join(to_path, to_file) 

173 if _files is not None: 

174 _files.append(to_file) 

175 return to_file 

176 

177 

178def _save_data_to_zip(to_zip, files, remove_path_again, to_path): 

179 try: 

180 import zlib 

181 

182 compression = ZIP_DEFLATED 

183 except ImportError: 

184 logger.warning("Could not import zlib, saving zipfile without compression") 

185 compression = ZIP_STORED 

186 with ZipFile(to_zip, "w", compression=compression) as zf: 

187 for file in files: 

188 zf.write(file, os.path.split(file)[1]) 

189 if remove_path_again: 

190 # remove individual files again 

191 for file in files: 

192 os.remove(file) 

193 os.removedirs(to_path) 

194 

195 

196def _format_repr(self, props): 

197 # format these properties into a string 

198 props_str = "" 

199 for key in props: 

200 value = props[key] 

201 props_str = f"{props_str}{key}={value.__repr__()}, " 

202 if len(props_str) > 1: 

203 props_str = props_str[:-2] 

204 # generate name 

205 name = f"{self.__class__.__name__}({props_str})" 

206 return name 

207 

208 

209def _get_tag(node): 

210 return node.tag.split("}", 1)[1] 

211 

212 

213def _warn_unknown_tag(tag, parent=None, class_name=None, bro_id=None): 

214 msg = f"Tag {tag} " 

215 if parent is not None: 

216 msg = f"{msg} of parent {parent} " 

217 msg = f"{msg} not supported" 

218 if class_name is not None: 

219 msg = f"{msg} in {class_name}" 

220 if bro_id is not None: 

221 if class_name is None: 

222 msg = f"{msg} in" 

223 msg = f"{msg} {bro_id}" 

224 

225 logger.warning(msg)