Coverage for kgi / triples.py: 90%
250 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 08:53 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 08:53 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5"""Triple classes for SPARQL query generation."""
7import json
8import logging
10import pandas as pd
12from .base import Triple
13from .constants import (
14 RML_BLANK_NODE,
15 RML_CONSTANT,
16 RML_DEFAULT_GRAPH,
17 RML_IRI,
18 RML_LITERAL,
19 RML_PARENT_TRIPLES_MAP,
20 RML_REFERENCE,
21 RML_TEMPLATE,
22)
23from .utils import Codex, IdGenerator, Identifier
def extract_from_iri_template(
    template_value: str,
    references_template: str,
    references: list[str],
    rule: pd.Series,
    codex: Codex,
    id_generator: IdGenerator,
    slice_label: str,
) -> str:
    """Build the SPARQL FILTER and BIND lines that pull column values out of a template IRI.

    The first line is a ``FILTER(REGEX(...))`` that uses ``references_template``
    verbatim as the pattern. After that, one reference is handled per
    parenthesised group of the template: the text before the group is used as
    the ``STRAFTER`` delimiter and, when more template text follows the group,
    the text up to the next ``(`` is used as the ``STRBEFORE`` delimiter.

    Used for subject templates and for graph map extraction.

    Args:
        template_value: Key whose variable (from ``codex.get_id``) holds the IRI.
        references_template: Template with one ``(...)`` group per reference.
        references: References to extract, in template order.
        rule: Mapping rule passed to ``Identifier.generate_plain_identifier``.
        codex: Registry that maps identifiers to SPARQL variable names.
        id_generator: Source of the numbers used in helper-variable names.
        slice_label: Label embedded in the names of intermediate slice variables.

    Returns:
        The generated lines joined with newlines.

    Raises:
        IndexError: If a reference is processed while the remaining template
            text contains no ``)``.
    """

    def fresh_slice_var():
        # A new numbered slice variable registered for this template.
        return codex.get_id(
            f"{template_value}_slice_{slice_label}_{id_generator.get_id()}"
        )

    slice_var = codex.get_id(template_value)
    patterns = [f"FILTER(REGEX(STR(?{slice_var}), '{references_template}'))"]
    remaining = references_template

    for reference in references:
        prefix = remaining.partition("(")[0]
        # Deliberately indexes [1]: a template without a closing ")" is an error.
        suffix = remaining.split(")", 1)[1]

        name = str(reference)
        identifier = Identifier.generate_plain_identifier(rule, name) or name
        reference_var, is_bound = codex.get_id_and_is_bound(identifier)

        if suffix:
            # More template text follows: cut the prefix off, then keep what
            # precedes the next literal segment.
            following_prefix = suffix.partition("(")[0]
            next_var = fresh_slice_var()
            patterns.append(
                f"BIND(STRAFTER(STR(?{slice_var}), '{prefix}') as ?{next_var})"
            )
            if is_bound:
                # The reference variable is already bound, so the value goes
                # into a throwaway variable instead.
                target = codex.get_id(f"{identifier}_temp_{id_generator.get_id()}")
            else:
                target = reference_var
            patterns.append(
                f"BIND(STRBEFORE(STR(?{next_var}), '{following_prefix}') AS ?{target})"
            )
            slice_var = next_var
        else:
            # Last group: everything after the prefix is the value.
            target = fresh_slice_var() if is_bound else reference_var
            patterns.append(
                f"BIND(STRAFTER(STR(?{slice_var}), '{prefix}') as ?{target})"
            )

        remaining = suffix

    return "\n".join(patterns)
class QueryTriple(Triple):
    """Build the SPARQL pattern for one mapping rule (subject, predicate, object).

    ``rule`` is one mapping-rule record; the methods read fields such as
    ``subject_map_value``, ``predicate_map_value``, ``object_map_type``,
    ``object_map_value`` and the ``*_references`` lists from it.
    """

    def __init__(self, rule: pd.Series):
        self.rule = rule

    def _plain_identifiers(self, values) -> set[str]:
        """Return the non-None plain identifiers generated for ``values``."""
        return {
            ident
            for value in values
            if (ident := Identifier.generate_plain_identifier(self.rule, str(value)))
            is not None
        }

    @staticmethod
    def _escape_string_literal(value: str) -> str:
        """Escape ``value`` for use inside a double-quoted SPARQL string literal."""
        # Backslash must be handled first so the escapes added below are not doubled.
        return (
            value.replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("\n", "\\n")
            .replace("\r", "\\r")
        )

    @property
    def references(self) -> set[str]:
        """Get all references used in this triple (subject, predicate, object, graph)."""
        return set.union(
            self.subject_references,
            self.predicate_references,
            self.object_references,
            self.graph_references,
        )

    @property
    def template_extracted_references(self) -> set[str]:
        """Get references extracted from templates.

        Subject and predicate references are always included; object and graph
        references only when the respective map type is ``RML_TEMPLATE``.
        """
        refs = set.union(self.subject_references, self.predicate_references)
        if self.rule["object_map_type"] == RML_TEMPLATE:
            refs = refs.union(self.object_references)
        graph_map_type = self.rule.get("graph_map_type")
        if isinstance(graph_map_type, str) and graph_map_type == RML_TEMPLATE:
            refs = refs.union(self.graph_references)
        return refs

    @property
    def plain_references(self) -> set[str]:
        """Get references available directly, without template extraction.

        Object references count when the object map is a reference or a parent
        triples map; graph references count when the graph map is a reference.
        """
        refs: set[str] = set()
        if self.rule["object_map_type"] in (RML_REFERENCE, RML_PARENT_TRIPLES_MAP):
            refs = set(self.object_references)
        graph_map_type = self.rule.get("graph_map_type")
        if isinstance(graph_map_type, str) and graph_map_type == RML_REFERENCE:
            refs = refs.union(self.graph_references)
        return refs

    @property
    def subject_references(self) -> set[str]:
        """Get subject references."""
        return self._plain_identifiers(self.rule["subject_references"])

    @property
    def predicate_references(self) -> set[str]:
        """Get predicate references."""
        return self._plain_identifiers(self.rule["predicate_references"])

    @property
    def object_references(self) -> set[str]:
        """Get object references."""
        return self._plain_identifiers(self.rule["object_references"])

    @property
    def graph_references(self) -> set[str]:
        """Get graph map references (empty when the rule has no list of them)."""
        graph_refs = self.rule.get("graph_references")
        if not isinstance(graph_refs, list):
            return set()
        return self._plain_identifiers(graph_refs)

    def _wrap_in_graph(self, pattern: str) -> str:
        """Wrap ``pattern`` in a ``GRAPH <iri> { ... }`` block when a named graph applies."""
        graph_iri = self._graph_iri()
        if graph_iri is not None:
            return f"GRAPH <{graph_iri}> {{\n{pattern}\n}}"
        return pattern

    def _graph_iri(self) -> str | None:
        """Return the constant graph IRI of the rule, or None.

        None is returned when the graph map is not a constant or when the
        constant is ``RML_DEFAULT_GRAPH``.
        """
        graph_map_type = self.rule.get("graph_map_type")
        if isinstance(graph_map_type, str) and graph_map_type == RML_CONSTANT:
            graph_iri = str(self.rule["graph_map_value"])
            if graph_iri != RML_DEFAULT_GRAPH:
                return graph_iri
        return None

    def generate(
        self, id_generator: IdGenerator, codex: Codex, all_mapping_rules: pd.DataFrame
    ) -> str | None:
        """Generate SPARQL triple pattern, wrapped in GRAPH block if needed.

        Returns None when no pattern can be produced for the rule.
        """
        pattern = self._generate_pattern(id_generator, codex, all_mapping_rules)
        if pattern is None:
            return None
        # Parent-triples-map patterns already carry their GRAPH block inside
        # the OPTIONAL, so they must not be wrapped a second time.
        if str(self.rule["object_map_type"]) == RML_PARENT_TRIPLES_MAP:
            return pattern
        return self._wrap_in_graph(pattern)

    def _generate_pattern(
        self, id_generator: IdGenerator, codex: Codex, all_mapping_rules: pd.DataFrame
    ) -> str | None:
        """Dispatch on ``object_map_type`` and build the unwrapped pattern.

        Returns None for constant blank-node objects, for a parent triples map
        that cannot be found, and for unsupported object map types (the last
        two are also logged as errors).
        """
        subject_reference = codex.get_id(str(self.rule["subject_map_value"]))
        predicate = f"<{self.rule['predicate_map_value']}>"
        object_map_value = str(self.rule["object_map_value"])
        object_map_type = str(self.rule["object_map_type"])
        object_references_template = str(self.rule["object_references_template"])

        if object_map_type == RML_CONSTANT:
            return self._constant_pattern(
                subject_reference, predicate, object_map_value
            )
        if object_map_type == RML_REFERENCE:
            return self._reference_pattern(
                id_generator, codex, subject_reference, predicate, object_map_value
            )
        if object_map_type == RML_TEMPLATE:
            return self._template_pattern(
                id_generator,
                codex,
                subject_reference,
                predicate,
                object_map_value,
                object_references_template,
            )
        if object_map_type == RML_PARENT_TRIPLES_MAP:
            return self._parent_triples_map_pattern(
                id_generator, codex, all_mapping_rules, subject_reference, predicate
            )

        logging.getLogger("kgi").error(
            f"Unsupported object map type: {object_map_type}"
        )
        return None

    def _constant_pattern(
        self, subject_reference, predicate: str, object_map_value: str
    ) -> str | None:
        """Build the triple pattern for a constant object (None for blank nodes)."""
        object_term_type = self.rule["object_termtype"]
        if object_term_type == RML_IRI:
            object_map_value = f"<{object_map_value}>"
        elif object_term_type == RML_BLANK_NODE:
            return None
        elif object_term_type == RML_LITERAL:
            # Escape quotes, backslashes and line breaks so the constant stays
            # a single well-formed string literal in the query.
            object_map_value = f'"{self._escape_string_literal(object_map_value)}"'
        return f"?{subject_reference} {predicate} {object_map_value} ."

    def _reference_pattern(
        self,
        id_generator: IdGenerator,
        codex: Codex,
        subject_reference,
        predicate: str,
        object_map_value: str,
    ) -> str:
        """Build the pattern for an object that is a plain column reference."""
        object_identifier = (
            Identifier.generate_plain_identifier(self.rule, object_map_value)
            or object_map_value
        )
        # Both calls are kept, in this order, so the codex and the id generator
        # advance exactly as before.
        object_reference, _ = codex.get_id_and_is_bound(object_identifier)
        # NOTE(review): the flag that selects the branch below is the one
        # returned for the freshly numbered temp identifier; the flag for the
        # object identifier itself is discarded. Behaviour is left unchanged;
        # confirm which flag was intended.
        temp_object_reference, temp_already_bound = codex.get_id_and_is_bound(
            f"{object_identifier}_temp_{id_generator.get_id()}"
        )

        if not temp_already_bound:
            return f"?{subject_reference} {predicate} ?{object_reference} ."

        return "\n".join(
            [
                f"?{subject_reference} {predicate} ?{temp_object_reference} .",
                f"BIND(?{temp_object_reference} as ?{object_reference})",
                f"FILTER(!BOUND(?{object_reference}) || !BOUND(?{temp_object_reference}) || ?{temp_object_reference} = ?{object_reference})",
            ]
        )

    def _template_pattern(
        self,
        id_generator: IdGenerator,
        codex: Codex,
        subject_reference,
        predicate: str,
        object_map_value: str,
        object_references_template: str,
    ) -> str:
        """Build the pattern for a template object and extract its references.

        One ``STRAFTER`` slice is emitted per object reference; a ``STRBEFORE``
        is added whenever more template text follows the reference's group.
        """
        object_identifier = (
            Identifier.generate_plain_identifier(self.rule, object_map_value)
            or object_map_value
        )
        object_reference, already_bound = codex.get_id_and_is_bound(object_identifier)
        # Terminate the triple pattern like every other branch does, so the
        # output stays valid even when no BIND follows it.
        lines = [f"?{subject_reference} {predicate} ?{object_reference} ."]

        evaluated_template = object_references_template
        current_slice = object_reference

        for obj in self.rule["object_references"]:
            current_pre_string = evaluated_template.split("(", 1)[0]
            current_post_string = evaluated_template.split(")", 1)[1]
            next_pre_string = current_post_string.split("(", 1)[0]
            obj_str = str(obj)
            object_identifier = (
                Identifier.generate_plain_identifier(self.rule, obj_str) or obj_str
            )
            object_reference, already_bound = codex.get_id_and_is_bound(
                object_identifier
            )
            next_slice = codex.get_id(
                f"{object_identifier}_slice_{id_generator.get_id()}"
            )
            # Backslashes are stripped from the template text before it is
            # used as a plain STRAFTER/STRBEFORE delimiter.
            unescaped_current_pre_string = current_pre_string.replace("\\", "")
            unescaped_next_pre_string = next_pre_string.replace("\\", "")

            lines.append(
                f"BIND(STRAFTER(STR(?{current_slice}), '{unescaped_current_pre_string}') as ?{next_slice})"
            )

            if current_post_string == "":
                if not already_bound:
                    lines.append(f"BIND(?{next_slice} as ?{object_reference})")
            else:
                temp_reference = codex.get_id(
                    f"{object_identifier}_temp_{id_generator.get_id()}"
                )
                lines.append(
                    f"BIND(STRBEFORE(STR(?{next_slice}), '{unescaped_next_pre_string}') AS ?{temp_reference})"
                )
                if not already_bound:
                    lines.append(f"BIND(?{temp_reference} as ?{object_reference})")

            evaluated_template = current_post_string
            current_slice = next_slice

        return "\n".join(lines)

    def _parent_triples_map_pattern(
        self,
        id_generator: IdGenerator,
        codex: Codex,
        all_mapping_rules: pd.DataFrame,
        subject_reference,
        predicate: str,
    ) -> str | None:
        """Build the OPTIONAL pattern for an object that refers to a parent triples map.

        The object variable is the parent's subject variable. For each join
        condition, the parent's subject template is sliced up to the joined
        parent reference and the extracted value is bound to the child
        reference. Returns None (after logging) when no rule with the
        referenced ``triples_map_id`` exists.
        """
        parent_triples_map_id = self.rule["object_map_value"]
        parent_rules = all_mapping_rules[
            all_mapping_rules["triples_map_id"] == parent_triples_map_id
        ]
        if parent_rules.empty:
            # Previously this surfaced as an IndexError from .iloc[0].
            logging.getLogger("kgi").error(
                f"Parent triples map not found: {parent_triples_map_id}"
            )
            return None
        object_rule = parent_rules.iloc[0]
        object_map_value = object_rule["subject_map_value"]
        object_reference = codex.get_id(object_map_value)

        graph_iri = self._graph_iri()
        if graph_iri is not None:
            lines = [
                f"OPTIONAL {{ GRAPH <{graph_iri}> {{ ?{subject_reference} {predicate} ?{object_reference} ."
            ]
        else:
            lines = [
                f"OPTIONAL {{ ?{subject_reference} {predicate} ?{object_reference} ."
            ]

        raw_join_value = self.rule["object_join_conditions"]
        if isinstance(raw_join_value, str):
            # Parsed as JSON after swapping single quotes for double quotes —
            # presumably the value is a stringified dict; values containing an
            # apostrophe would not survive this (TODO confirm upstream format).
            join_conditions = json.loads(raw_join_value.replace("'", '"'))
        else:
            join_conditions = {}
        parent_template = object_rule["subject_references_template"]
        parent_references = object_rule["subject_references"]

        for jc in join_conditions.values():
            child_value = jc["child_value"]
            parent_value = jc["parent_value"]
            child_identifier = (
                Identifier.generate_plain_identifier(self.rule, child_value)
                or child_value
            )
            child_ref, child_already_bound = codex.get_id_and_is_bound(
                child_identifier
            )

            evaluated_template = parent_template
            current_slice = object_reference

            for ref in parent_references:
                pre_string = evaluated_template.split("(", 1)[0]
                post_string = evaluated_template.split(")", 1)[1]
                next_slice = codex.get_id(
                    f"{object_map_value}_join_slice_{id_generator.get_id()}"
                )
                lines.append(
                    f"BIND(STRAFTER(STR(?{current_slice}), '{pre_string}') as ?{next_slice})"
                )

                if ref == parent_value:
                    if post_string == "":
                        if not child_already_bound:
                            lines.append(f"BIND(?{next_slice} as ?{child_ref})")
                    else:
                        next_pre = post_string.split("(", 1)[0]
                        temp_ref = codex.get_id(
                            f"{child_identifier}_temp_{id_generator.get_id()}"
                        )
                        lines.append(
                            f"BIND(STRBEFORE(STR(?{next_slice}), '{next_pre}') AS ?{temp_ref})"
                        )
                        if not child_already_bound:
                            lines.append(f"BIND(?{temp_ref} as ?{child_ref})")
                    # The joined reference has been handled; later parent
                    # references are not needed for this join condition.
                    break

                evaluated_template = post_string
                current_slice = next_slice

        # Close the GRAPH block (when present) and the OPTIONAL.
        lines.append("} }" if graph_iri is not None else "}")
        return "\n".join(lines)
class SubjectTriple(QueryTriple):
    """Subject-side triple: emits the patterns that extract references from a subject template."""

    def __init__(self, rule: pd.Series):
        super().__init__(rule)

    @property
    def template_extracted_references(self) -> set[str]:
        """Subject references obtained from a template; empty for column-reference subjects."""
        is_column_reference = self.rule["subject_map_type"] == RML_REFERENCE
        return set() if is_column_reference else self.subject_references

    @property
    def plain_references(self) -> set[str]:
        """Subject references when the subject is a column reference; empty otherwise."""
        is_column_reference = self.rule["subject_map_type"] == RML_REFERENCE
        return self.subject_references if is_column_reference else set()

    def generate(
        self, id_generator: IdGenerator, codex: Codex, all_mapping_rules: pd.DataFrame
    ) -> str | None:  # pyright: ignore[reportUnusedParameter]
        """Generate the SPARQL lines that extract subject references.

        Returns None when every subject reference is already present in the
        codex, when the subject is a column reference, or when the combination
        of map type and term type is unsupported (the last case is logged).
        ``all_mapping_rules`` is not used here.
        """

        def is_unknown(ref) -> bool:
            # True when the identifier for ``ref`` is not yet in the codex.
            return (
                Identifier.generate_plain_identifier(self.rule, str(ref)) or str(ref)
            ) not in codex.codex

        if not any(is_unknown(ref) for ref in self.rule["subject_references"]):
            return None

        map_type = self.rule["subject_map_type"]
        term_type = self.rule["subject_termtype"]

        if map_type == RML_REFERENCE:
            # The subject variable itself carries the column value, so there
            # is nothing to extract.
            return None

        if map_type == RML_TEMPLATE:
            if term_type == RML_IRI:
                return self._generate_iri_template(codex, id_generator)
            if term_type == RML_BLANK_NODE:
                return self._generate_blank_node_template(codex, id_generator)

        logging.getLogger("kgi").error(
            f"Unsupported subject map type: {map_type} or subject term type: {term_type}"
        )
        return None

    def _generate_iri_template(self, codex: Codex, id_generator: IdGenerator):
        """Delegate IRI-template extraction to ``extract_from_iri_template``."""
        rule = self.rule
        return extract_from_iri_template(
            template_value=str(rule["subject_map_value"]),
            references_template=str(rule["subject_references_template"]),
            references=list(rule["subject_references"]),
            rule=rule,
            codex=codex,
            id_generator=id_generator,
            slice_label="subject",
        )

    def _generate_blank_node_template(self, codex: Codex, id_generator: IdGenerator):
        """Build the BIND lines that extract references from a blank-node template.

        Unlike the IRI variant, no FILTER is emitted and backslashes are
        stripped from the template text before it is used as a delimiter.
        """
        template_value = str(self.rule["subject_map_value"])
        remaining = str(self.rule["subject_references_template"])
        slice_var = codex.get_id(template_value)

        patterns = []
        for reference in self.rule["subject_references"]:
            prefix = remaining.partition("(")[0]
            # Text after the first ")", or "" when there is none.
            suffix = remaining.partition(")")[2]

            # Registered on every iteration, even when the last group ends up
            # not using it, so the generated numbering stays the same.
            next_slice_var = codex.get_id(
                f"{template_value}_slice_{id_generator.get_id()}"
            )

            name = str(reference)
            identifier = Identifier.generate_plain_identifier(self.rule, name) or name
            reference_var, is_bound = codex.get_id_and_is_bound(identifier)

            literal_prefix = prefix.replace("\\", "")
            if suffix:
                literal_next_prefix = suffix.partition("(")[0].replace("\\", "")
                temp_var = codex.get_id(f"{identifier}_temp_{id_generator.get_id()}")
                patterns.append(
                    f"BIND(STRAFTER(STR(?{slice_var}), '{literal_prefix}') as ?{next_slice_var})"
                )
                patterns.append(
                    f"BIND(STRBEFORE(STR(?{next_slice_var}), '{literal_next_prefix}') AS ?{temp_var})"
                )
                if not is_bound:
                    patterns.append(f"BIND(?{temp_var} as ?{reference_var})")
                slice_var = next_slice_var
            elif not is_bound:
                # Last group: everything after the prefix is the value.
                patterns.append(
                    f"BIND(STRAFTER(STR(?{slice_var}), '{literal_prefix}') as ?{reference_var})"
                )

            remaining = suffix

        return "\n".join(patterns)