Coverage for kgi / utils.py: 85%

188 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-23 08:53 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5"""Utility functions and classes.""" 

6 

7import json 

8import logging 

9import re 

10from decimal import Decimal 

11from datetime import datetime 

12from urllib.parse import ParseResult, unquote, urlparse 

13 

14import pandas as pd 

15 

16from .constants import REF_TEMPLATE_REGEX 

17 

18 

class IdGenerator:
    """Hand out consecutive integer IDs, starting at 1."""

    def __init__(self) -> None:
        # Last ID handed out; 0 means nothing has been issued yet.
        self.counter = 0

    def get_id(self) -> int:
        """Advance the counter by one and return its new value."""
        self.counter = self.counter + 1
        return self.counter

    def reset(self) -> None:
        """Rewind the counter so that the next issued ID is 1 again."""
        self.counter = 0

31 

32 

class Validator:
    """Validation helpers."""

    @staticmethod
    def url(x) -> bool:
        """Return True when ``x`` parses to a URL with both a scheme and a network location.

        Any exception raised while parsing (for example on input that is not
        string-like) is reported as ``False`` instead of propagating.
        """
        try:
            parsed: ParseResult = urlparse(x)
            return bool(parsed.scheme and parsed.netloc)
        except Exception:
            return False

44 

45 

46 

47class Identifier: 

48 """Identifier generation utilities.""" 

49 

50 @staticmethod 

51 def generate_plain_identifier(rule: pd.Series, value: str) -> str | None: 

52 source_type = str(rule["source_type"]) 

53 

54 if source_type in ("CSV", "RDB"): 

55 return value 

56 else: 

57 logging.getLogger("kgi").error(f"Unsupported source type: {source_type}") 

58 return None 

59 

60 

class Codex:
    """Manages ID mapping for variables.

    Each distinct key (template, column/reference name, or derived variable
    key) is mapped once to a descriptive, sanitized variable name; repeated
    lookups of the same key return the same name.
    """

    # (separator searched for in the key, label appended to the variable name)
    _SUFFIXES = (
        ("_temp_", "_temp"),  # Temporary variables for intermediate values
        ("_slice_", "_slice"),  # Slice variables for substring operations
        ("_plain_", "_plain"),  # Plain variables for non-encoded values
    )

    def __init__(self):
        # key -> variable name already assigned to it
        self.codex: dict[str, str] = {}
        self.subjects: set[str] = set()
        self.idGenerator = IdGenerator()
        # Registry of every issued name. For a base name the value is the
        # highest counter used so far; names issued with a counter suffix are
        # registered too so they can never be handed out a second time.
        self.variable_counters: dict[str, int] = {}

    def _extract_base_from_url(self, url: str) -> str:
        """Extract meaningful base name from a URL or template."""
        # http://example.com/Student/{ID}/{Name} → [..., 'Student', '{ID}', '{Name}']
        segments = url.rstrip("/").split("/")
        # After stripping trailing slashes the last segment is empty only when
        # the whole input was empty or consisted of slashes.
        base = segments[-1] or "resource"
        # {"Name"} → Name (also keeps only the part after the last '#')
        base = base.split("#")[-1].strip('{}"')
        # Template URL: Name → Name_uri (preserves Name for the SELECT variable)
        if "{" in url:
            base = f"{base}_uri"
        return base

    def _generate_descriptive_id(self, key: str) -> str:
        """Generate a descriptive, not-yet-issued variable name from a key.

        The key can be:
        - An RML template like "http://example.com/{Name}"
        - A column/reference name like "Name"
        - A temporary variable like "Name_temp_1" (created in triples.py for intermediate values)
        - A slice variable like "http://example.com/{Name}_slice_subject_2" (for template parsing)
        - A plain variable like "Name_plain_3" (for non-encoded values)

        The first key producing a given base name gets the bare name; later
        ones get ``<base>_<n>`` with n starting at 2. Counters whose resulting
        name was already issued (for example because a column is literally
        called "Name_2") are skipped, so two keys never share a name.
        """
        # Check for special suffixes and extract base name
        base_name = key
        suffix_to_add = ""
        for separator, suffix_label in self._SUFFIXES:
            if separator in key:
                base_name = key.split(separator)[0]
                suffix_to_add = suffix_label
                break

        if "http://" in base_name or "https://" in base_name:
            base_name = self._extract_base_from_url(base_name)

        base_name = self._sanitize_variable_name(base_name)

        # Defensive fallback; the sanitizer already avoids empty/numeric names.
        if not base_name or base_name.isdigit():
            base_name = "var"

        if suffix_to_add:
            base_name = f"{base_name}{suffix_to_add}"

        if base_name not in self.variable_counters:
            self.variable_counters[base_name] = 1
            return base_name

        # Base name already taken: advance its counter until the suffixed
        # candidate is free, then register the candidate as issued.
        while True:
            self.variable_counters[base_name] += 1
            candidate = f"{base_name}_{self.variable_counters[base_name]}"
            if candidate not in self.variable_counters:
                self.variable_counters[candidate] = 1
                return candidate

    def _sanitize_variable_name(self, name: str) -> str:
        """Sanitize a string to be a valid SPARQL variable name."""
        # Keep only ASCII alphanumeric characters and underscores
        sanitized = re.sub(r"[^a-zA-Z0-9_]", "_", name)
        # Remove leading/trailing underscores
        sanitized = sanitized.strip("_")
        # Ensure it doesn't start with a number
        if sanitized and sanitized[0].isdigit():
            sanitized = "v_" + sanitized
        return sanitized if sanitized else "var"

    def get_id(self, key: str) -> str:
        """Get or create an ID for a key."""
        if key not in self.codex:
            self.codex[key] = self._generate_descriptive_id(key)
        return self.codex[key]

    def get_id_and_is_bound(self, key: str) -> tuple[str, bool]:
        """Get ID and check if key was already bound.

        The flag reflects whether the key was known before this call; the key
        is registered by the call either way.
        """
        is_bound = key in self.codex
        return self.get_id(key), is_bound

151 

152 

def sparql_to_python_type(value, datatype):
    """Convert a SPARQL literal value to a Python object based on its XSD datatype.

    Supported datatypes: xsd:integer (int), xsd:decimal (Decimal),
    xsd:float / xsd:double (float), xsd:boolean (bool), xsd:dateTime
    (datetime) and xsd:date (date). Values of any other datatype are returned
    unchanged.

    If the conversion fails, a warning is logged on the "kgi" logger and the
    original value is returned instead of raising.
    """
    datatype = str(datatype)
    xsd = "http://www.w3.org/2001/XMLSchema#"
    try:
        if datatype == xsd + "integer":
            return int(value)
        if datatype == xsd + "decimal":
            return Decimal(value)
        if datatype in (xsd + "float", xsd + "double"):
            return float(value)
        if datatype == xsd + "boolean":
            # xsd:boolean allows "true"/"false" as well as "1"/"0"; going
            # through str() also copes with values that are not strings.
            return str(value).strip().lower() in ("true", "1")
        if datatype == xsd + "dateTime":
            return datetime.fromisoformat(value)
        if datatype == xsd + "date":
            return datetime.strptime(value, "%Y-%m-%d").date()
        return value
    except (ValueError, TypeError, ArithmeticError) as e:
        # ArithmeticError covers decimal.InvalidOperation (malformed decimals)
        # and OverflowError, neither of which is a ValueError or TypeError.
        logging.getLogger("kgi").warning(
            f"Type conversion failed for value '{value}' to datatype '{datatype}': {e}. Returning original value."
        )
        return value

178 

179 

def url_decode(url):
    """Percent-decode ``url`` when it is a string; return anything else untouched.

    Should decoding raise for any reason, the input is returned as-is.
    """
    if not isinstance(url, str):
        return url
    try:
        return unquote(url)
    except Exception:
        return url

186 

187 

def insert_columns(df: pd.DataFrame, pure=False) -> pd.DataFrame:
    """Insert reference columns into mapping rules DataFrame.

    For each of the subject, predicate, object and graph term maps, three
    columns are inserted right after ``<term>_map_value``, in this order:

    - ``<term>_references_template``: for template maps, the map value with
      every match of ``REF_TEMPLATE_REGEX`` replaced by ``([^/]*)``;
      ``None`` otherwise.
    - ``<term>_references``: list of references used by the term map.
    - ``<term>_reference_count``: length of that list.

    Args:
        df: Mapping rules. Must contain ``<term>_map_type`` and
            ``<term>_map_value`` for all four terms; ``object_join_conditions``
            is read for rows whose object map is a parent triples map.
        pure: When true, work on a deep copy and leave ``df`` untouched.
            Otherwise ``df`` itself is modified in place.

    Returns:
        The DataFrame carrying the new columns (``df`` itself unless ``pure``).
    """
    if pure:
        df = df.copy(deep=True)

    rml_constant = "http://w3id.org/rml/constant"
    rml_reference = "http://w3id.org/rml/reference"
    rml_template = "http://w3id.org/rml/template"
    rml_parent_triples_map = "http://w3id.org/rml/parentTriplesMap"
    terms = ("subject", "predicate", "object", "graph")
    row_count = df.shape[0]

    def _col_pos(name: str) -> int:
        loc = df.columns.get_loc(name)
        assert isinstance(loc, int)
        return loc

    # The default columns are built on df.index: DataFrame.insert aligns a
    # Series by label, so a fresh RangeIndex would leave NaN in every row of a
    # frame that does not use the default 0..n-1 index.
    def _empty_lists() -> pd.Series:  # type: ignore[type-arg]
        return pd.Series(
            [[] for _ in range(row_count)], index=df.index, dtype="object"
        )

    def _none_col() -> pd.Series:  # type: ignore[type-arg]
        return pd.Series([None] * row_count, index=df.index, dtype="object")

    # Add columns at specific positions. The template column is inserted at
    # the same position as the references column and therefore ends up
    # between <term>_map_value and <term>_references.
    for term in terms:
        value_col = f"{term}_map_value"
        references_col = f"{term}_references"
        df.insert(_col_pos(value_col) + 1, references_col, _empty_lists())
        df.insert(_col_pos(value_col) + 1, f"{term}_references_template", _none_col())
        df.insert(_col_pos(references_col) + 1, f"{term}_reference_count", 0)

    # Process each mapping rule to extract references
    for index in df.index:
        for term in terms:
            map_type = df.at[index, f"{term}_map_type"]
            if pd.isna(map_type):
                # No term map of this kind on the row: keep the defaults.
                continue

            template = None
            if map_type == rml_constant:
                references = []
            elif map_type == rml_reference:
                references = [df.at[index, f"{term}_map_value"]]
            elif map_type == rml_template:
                map_value = df.at[index, f"{term}_map_value"]
                references = re.findall(REF_TEMPLATE_REGEX, map_value)
                template = re.sub(REF_TEMPLATE_REGEX, r"([^/]*)", map_value)
            elif term == "object" and map_type == rml_parent_triples_map:
                join_conditions = df.at[index, "object_join_conditions"]
                if pd.notna(join_conditions):
                    # The join conditions are stored as a single-quoted
                    # dict-like string; quotes are swapped so it parses as
                    # JSON. Only the first condition's child value is used.
                    parsed = json.loads(join_conditions.replace("'", '"'))
                    references = [list(parsed.values())[0]["child_value"]]
                else:
                    references = []
            else:
                # Unhandled map type: keep the defaults.
                continue

            df.at[index, f"{term}_references"] = references
            df.at[index, f"{term}_reference_count"] = len(references)
            if template is not None:
                df.at[index, f"{term}_references_template"] = template

    return df