Coverage for kgi / utils.py: 85%
188 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 08:53 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 08:53 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5"""Utility functions and classes."""
7import json
8import logging
9import re
10from decimal import Decimal
11from datetime import datetime
12from urllib.parse import ParseResult, unquote, urlparse
14import pandas as pd
16from .constants import REF_TEMPLATE_REGEX
class IdGenerator:
    """Hand out consecutive integer IDs, starting at 1."""

    def __init__(self):
        # Value of the most recently issued ID; 0 means none issued yet.
        self.counter = 0

    def get_id(self):
        """Advance the counter by one and return the new value."""
        issued = self.counter + 1
        self.counter = issued
        return issued

    def reset(self):
        """Rewind the counter so that the next issued ID is 1 again."""
        self.counter = 0
class Validator:
    """Validation utilities."""

    @staticmethod
    def url(x) -> bool:
        """Tell whether *x* parses as a URL with both a scheme and a netloc.

        Any exception raised while parsing (for example for a non-string
        argument or a malformed IPv6 host) is treated as "not a URL".
        """
        try:
            parsed: ParseResult = urlparse(x)
        except Exception:
            # Best-effort check: unparsable input is simply not a valid URL.
            return False
        return all((parsed.scheme, parsed.netloc))
47class Identifier:
48 """Identifier generation utilities."""
50 @staticmethod
51 def generate_plain_identifier(rule: pd.Series, value: str) -> str | None:
52 source_type = str(rule["source_type"])
54 if source_type in ("CSV", "RDB"):
55 return value
56 else:
57 logging.getLogger("kgi").error(f"Unsupported source type: {source_type}")
58 return None
class Codex:
    """Manages ID mapping for variables.

    Each key (a template, a reference name or a derived helper key) is mapped
    once to a descriptive variable name; asking again for the same key returns
    the same name.
    """

    def __init__(self):
        # key -> variable name already issued for that key
        self.codex: dict[str, str] = {}
        # Not read or written by this class itself; kept for external users.
        self.subjects: set[str] = set()
        self.idGenerator = IdGenerator()
        # base name -> highest number issued for it. Every issued name is also
        # registered here so that it can never be issued a second time.
        self.variable_counters: dict[str, int] = {}

    def _extract_base_from_url(self, url: str) -> str:
        """Extract meaningful base name from a URL or template."""
        # http://example.com/Student/{ID}/{Name} → [..., 'Student', '{ID}', '{Name}']
        parts = url.rstrip("/").split("/")
        # After rstrip("/") the last segment is only empty for an empty or
        # all-slash input, in which case there is no other segment to use.
        base = parts[-1] or "resource"
        # {"Name"} → Name
        base = base.split("#")[-1].strip('{}"')
        # Template URL: Name → Name_uri (preserves Name for the SELECT variable)
        if "{" in url:
            base = f"{base}_uri"
        return base

    def _generate_descriptive_id(self, key: str) -> str:
        """Generate a descriptive, not-yet-issued variable name from a key.

        The key can be:
        - An RML template like "http://example.com/{Name}"
        - A column/reference name like "Name"
        - A temporary variable like "Name_temp_1" (created in triples.py for intermediate values)
        - A slice variable like "http://example.com/{Name}_slice_subject_2" (for template parsing)
        - A plain variable like "Name_plain_3" (for non-encoded values)

        The first request for a base name returns the base name itself; later
        requests return "<base>_2", "<base>_3", ... Numbers whose resulting
        name has already been issued (for instance because a column is
        literally called "Name_2") are skipped, so two different keys never
        share a variable.
        """
        # (separator searched in the key, label appended to the base name)
        suffixes = (
            ("_temp_", "_temp"),  # Temporary variables for intermediate values
            ("_slice_", "_slice"),  # Slice variables for substring operations
            ("_plain_", "_plain"),  # Plain variables for non-encoded values
        )

        suffix_to_add = ""
        for separator, suffix_label in suffixes:
            if separator in key:
                base_name = key.split(separator)[0]
                suffix_to_add = suffix_label
                break
        else:
            # No special suffix found
            base_name = key

        if "http://" in base_name or "https://" in base_name:
            base_name = self._extract_base_from_url(base_name)

        base_name = self._sanitize_variable_name(base_name)

        # Defensive only: the sanitizer already rules out empty and
        # digit-leading names.
        if not base_name or base_name.isdigit():
            base_name = "var"

        if suffix_to_add:
            base_name = f"{base_name}{suffix_to_add}"

        counters = self.variable_counters
        if base_name not in counters:
            counters[base_name] = 1
            return base_name

        # The bare base name is taken: find the next free numbered variant.
        number = counters[base_name]
        while True:
            number += 1
            candidate = f"{base_name}_{number}"
            if candidate not in counters:
                break
        counters[base_name] = number
        # Reserve the numbered name too, so a later key whose own base name
        # equals it gets a fresh variable instead of this one.
        counters[candidate] = 1
        return candidate

    def _sanitize_variable_name(self, name: str) -> str:
        """Sanitize a string to be a valid SPARQL variable name."""
        # Keep only ASCII alphanumeric characters and underscores
        sanitized = re.sub(r"[^a-zA-Z0-9_]", "_", name)
        # Remove leading/trailing underscores
        sanitized = sanitized.strip("_")
        # Ensure it doesn't start with a number
        if sanitized and sanitized[0].isdigit():
            sanitized = "v_" + sanitized
        return sanitized if sanitized else "var"

    def get_id(self, key: str) -> str:
        """Get or create an ID for a key."""
        if key not in self.codex:
            self.codex[key] = self._generate_descriptive_id(key)
        return self.codex[key]

    def get_id_and_is_bound(self, key: str) -> tuple[str, bool]:
        """Get ID and check if key was already bound before this call."""
        is_bound = key in self.codex
        return self.get_id(key), is_bound
153def sparql_to_python_type(value, datatype):
154 """Convert SPARQL datatype to Python type."""
155 datatype = str(datatype)
156 try:
157 if datatype == "http://www.w3.org/2001/XMLSchema#integer":
158 return int(value)
159 elif datatype == "http://www.w3.org/2001/XMLSchema#decimal":
160 return Decimal(value)
161 elif datatype == "http://www.w3.org/2001/XMLSchema#float":
162 return float(value)
163 elif datatype == "http://www.w3.org/2001/XMLSchema#double":
164 return float(value)
165 elif datatype == "http://www.w3.org/2001/XMLSchema#boolean":
166 return value.lower() == "true"
167 elif datatype == "http://www.w3.org/2001/XMLSchema#dateTime":
168 return datetime.fromisoformat(value)
169 elif datatype == "http://www.w3.org/2001/XMLSchema#date":
170 return datetime.strptime(value, "%Y-%m-%d").date()
171 else:
172 return value
173 except (ValueError, TypeError) as e:
174 logging.getLogger("kgi").warning(
175 f"Type conversion failed for value '{value}' to datatype '{datatype}': {e}. Returning original value."
176 )
177 return value
def url_decode(url):
    """Percent-decode *url* when it is a string; return anything else as is.

    Should decoding raise, the input is handed back unchanged.
    """
    if not isinstance(url, str):
        return url
    try:
        decoded = unquote(url)
    except Exception:
        return url
    return decoded
188def insert_columns(df: pd.DataFrame, pure=False) -> pd.DataFrame:
189 """Insert reference columns into mapping rules DataFrame."""
190 if pure:
191 df = df.copy(deep=True)
193 def _col_pos(name: str) -> int:
194 loc = df.columns.get_loc(name)
195 assert isinstance(loc, int)
196 return loc
198 def _empty_lists() -> pd.Series: # type: ignore[type-arg]
199 return pd.Series([[] for _ in range(df.shape[0])])
201 def _none_col() -> pd.Series: # type: ignore[type-arg]
202 return pd.Series([None] * df.shape[0], dtype="object")
204 # Add columns at specific positions
205 df.insert(_col_pos("subject_map_value") + 1, "subject_references", _empty_lists())
206 df.insert(_col_pos("subject_map_value") + 1, "subject_references_template", _none_col())
207 df.insert(_col_pos("subject_references") + 1, "subject_reference_count", 0)
208 df.insert(
209 _col_pos("predicate_map_value") + 1, "predicate_references", _empty_lists()
210 )
211 df.insert(
212 _col_pos("predicate_map_value") + 1, "predicate_references_template", _none_col()
213 )
214 df.insert(_col_pos("predicate_references") + 1, "predicate_reference_count", 0)
215 df.insert(_col_pos("object_map_value") + 1, "object_references", _empty_lists())
216 df.insert(_col_pos("object_map_value") + 1, "object_references_template", _none_col())
217 df.insert(_col_pos("object_references") + 1, "object_reference_count", 0)
218 df.insert(_col_pos("graph_map_value") + 1, "graph_references", _empty_lists())
219 df.insert(_col_pos("graph_map_value") + 1, "graph_references_template", _none_col())
220 df.insert(_col_pos("graph_references") + 1, "graph_reference_count", 0)
222 # Process each mapping rule to extract references
223 for index in df.index:
224 # Subject references
225 match df.at[index, "subject_map_type"]:
226 case "http://w3id.org/rml/constant":
227 df.at[index, "subject_references"] = []
228 df.at[index, "subject_reference_count"] = 0
229 case "http://w3id.org/rml/reference":
230 df.at[index, "subject_references"] = [df.at[index, "subject_map_value"]]
231 df.at[index, "subject_reference_count"] = 1
232 case "http://w3id.org/rml/template":
233 references_list = re.findall(
234 REF_TEMPLATE_REGEX, df.at[index, "subject_map_value"]
235 )
236 df.at[index, "subject_references"] = references_list
237 df.at[index, "subject_reference_count"] = len(references_list)
238 df.at[index, "subject_references_template"] = re.sub(
239 REF_TEMPLATE_REGEX,
240 r"([^/]*)",
241 df.at[index, "subject_map_value"],
242 )
244 # Predicate references
245 match df.at[index, "predicate_map_type"]:
246 case "http://w3id.org/rml/constant":
247 df.at[index, "predicate_references"] = []
248 df.at[index, "predicate_reference_count"] = 0
249 case "http://w3id.org/rml/reference":
250 df.at[index, "predicate_references"] = [
251 df.at[index, "predicate_map_value"]
252 ]
253 df.at[index, "predicate_reference_count"] = 1
254 case "http://w3id.org/rml/template":
255 references_list = re.findall(
256 REF_TEMPLATE_REGEX, df.at[index, "predicate_map_value"]
257 )
258 df.at[index, "predicate_references"] = references_list
259 df.at[index, "predicate_reference_count"] = len(references_list)
260 df.at[index, "predicate_references_template"] = re.sub(
261 REF_TEMPLATE_REGEX,
262 r"([^/]*)",
263 df.at[index, "predicate_map_value"],
264 )
266 # Object references
267 match df.at[index, "object_map_type"]:
268 case "http://w3id.org/rml/constant":
269 df.at[index, "object_references"] = []
270 df.at[index, "object_reference_count"] = 0
271 case "http://w3id.org/rml/reference":
272 df.at[index, "object_references"] = [df.at[index, "object_map_value"]]
273 df.at[index, "object_reference_count"] = 1
274 case "http://w3id.org/rml/template":
275 references_list = re.findall(
276 REF_TEMPLATE_REGEX, df.at[index, "object_map_value"]
277 )
278 df.at[index, "object_references"] = references_list
279 df.at[index, "object_reference_count"] = len(references_list)
280 df.at[index, "object_references_template"] = re.sub(
281 REF_TEMPLATE_REGEX, r"([^/]*)", df.at[index, "object_map_value"]
282 )
283 case "http://w3id.org/rml/parentTriplesMap":
284 join_conditions = df.at[index, "object_join_conditions"]
285 if pd.notna(join_conditions):
286 df.at[index, "object_references"] = [
287 list(
288 json.loads(
289 join_conditions.replace("'", '"')
290 ).values()
291 )[0]["child_value"]
292 ]
293 df.at[index, "object_reference_count"] = 1
294 else:
295 df.at[index, "object_references"] = []
296 df.at[index, "object_reference_count"] = 0
298 # Graph references
299 graph_map_type = df.at[index, "graph_map_type"]
300 if pd.notna(graph_map_type):
301 match graph_map_type:
302 case "http://w3id.org/rml/constant":
303 df.at[index, "graph_references"] = []
304 df.at[index, "graph_reference_count"] = 0
305 case "http://w3id.org/rml/reference":
306 df.at[index, "graph_references"] = [df.at[index, "graph_map_value"]]
307 df.at[index, "graph_reference_count"] = 1
308 case "http://w3id.org/rml/template":
309 references_list = re.findall(
310 REF_TEMPLATE_REGEX, df.at[index, "graph_map_value"]
311 )
312 df.at[index, "graph_references"] = references_list
313 df.at[index, "graph_reference_count"] = len(references_list)
314 df.at[index, "graph_references_template"] = re.sub(
315 REF_TEMPLATE_REGEX, r"([^/]*)", df.at[index, "graph_map_value"]
316 )
318 return df