Coverage for kgi / query.py: 81%
144 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 08:53 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 08:53 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5"""SPARQL query generation and execution."""
7import json
8import logging
10import pandas as pd
12from .base import Endpoint
13from .constants import (
14 RML_BLANK_NODE,
15 RML_CONSTANT,
16 RML_PARENT_TRIPLES_MAP,
17 RML_REFERENCE,
18 RML_TEMPLATE,
19)
20from .triples import QueryTriple, SubjectTriple, extract_from_iri_template
21from .utils import Codex, IdGenerator, Identifier, sparql_to_python_type, url_decode
class Query:
    """Represents a SPARQL query for data inversion.

    The query is assembled from a list of ``QueryTriple`` objects
    (``SubjectTriple`` instances are emitted last, see :meth:`generate`).
    Result columns come back under the variable names issued by
    ``self.codex`` and are mapped back to reference names by
    :meth:`decode_dataframe`.
    """

    def __init__(self, triples: list[QueryTriple] | None = None):
        self.triples: list[QueryTriple] = triples or []
        # Both helpers are handed to every ``triple.generate`` call, so all
        # triples of this query draw identifiers and variable names from the
        # same pools.
        self.id_generator = IdGenerator()
        self.codex = Codex()
        # Text of the last query built by ``generate``; None before the first
        # call and after a call that produced no query.
        self.generated_query = None

    @staticmethod
    def _unique(items) -> list[str]:
        """Return *items* without duplicates, keeping first-seen order."""
        return list(dict.fromkeys(items))

    @property
    def references(self) -> list[str]:
        """Get all references used in the query.

        Duplicates across triples are dropped in first-seen order (instead of
        going through a ``set``), so this property adds no hash-order
        nondeterminism to the SELECT clause that is built from it.
        """
        return self._unique(
            ref for triple in self.triples for ref in triple.references
        )

    @property
    def template_references(self) -> list[str]:
        """Get references extracted from URI/blank node templates (deduplicated, first-seen order)."""
        return self._unique(
            ref
            for triple in self.triples
            for ref in triple.template_extracted_references
        )

    @property
    def literal_references(self) -> list[str]:
        """Get references available from object literals (deduplicated, first-seen order)."""
        return self._unique(
            ref for triple in self.triples for ref in triple.plain_references
        )

    @property
    def template_only_references(self) -> list[str]:
        """Get references only available from template extraction.

        When a reference is available from both a template and a literal,
        the literal value is preferred (no URL decoding needed).
        """
        literal = set(self.literal_references)
        return [ref for ref in self.template_references if ref not in literal]

    def generate(self, all_mapping_rules: pd.DataFrame) -> str | None:
        """Generate the SPARQL query string.

        Args:
            all_mapping_rules: Full mapping-rule table, passed through to each
                triple's ``generate`` call.

        Returns:
            The query text, also stored in ``self.generated_query``. Returns
            ``None`` (logging a warning and resetting
            ``self.generated_query``) when no triple exposes any reference.
        """
        all_references = self.references

        if not all_references:
            logging.getLogger("kgi").warning("No references found, no query generated")
            # Do not leave the query from an earlier call behind.
            self.generated_query = None
            return None

        # Separate SubjectTriples from ObjectTriples.
        # SubjectTriples must be processed last so their BINDs are skipped
        # when the reference is already available from a literal.
        object_triples = [t for t in self.triples if not isinstance(t, SubjectTriple)]
        subject_triples = [t for t in self.triples if isinstance(t, SubjectTriple)]

        def of_type(map_type):
            return [t for t in object_triples if t.rule["object_map_type"] == map_type]

        # Emission order: constants, templates, references, parent-map joins,
        # then subjects. NOTE(review): object triples whose object_map_type is
        # none of these four are not emitted at all.
        ordered_groups = [
            of_type(RML_CONSTANT),
            of_type(RML_TEMPLATE),
            of_type(RML_REFERENCE),
            of_type(RML_PARENT_TRIPLES_MAP),
            subject_triples,
        ]

        triple_strings = []
        for group in ordered_groups:
            for triple in group:
                triple_string = triple.generate(
                    self.id_generator, self.codex, all_mapping_rules
                )
                if triple_string is not None:
                    triple_strings.append(triple_string)

        graph_info = self._get_exclusive_graph_info()
        graph_binds = ""
        graph_var = None
        if graph_info:
            graph_var = self.codex.get_id(str(graph_info["graph_map_value"]))
            graph_binds = self._generate_graph_binds(graph_info, graph_var)

        select_vars = " ".join(f"?{self.codex.get_id(ref)}" for ref in all_references)
        select_part = "SELECT " + select_vars + " WHERE {"

        body = "\n".join(triple_strings)
        if graph_var is not None:
            # Wrap the patterns in a GRAPH clause; the BINDs that recover the
            # graph-only references from the graph IRI follow it.
            body = f"GRAPH ?{graph_var} {{\n{body}\n}}\n{graph_binds}"

        # NOTE(review): every backslash in the final text is doubled,
        # presumably to survive one level of unescaping downstream - confirm.
        self.generated_query = (select_part + body + "}").replace("\\", "\\\\")
        return self.generated_query

    def _get_exclusive_graph_info(self) -> dict[str, object] | None:
        """Return graph map info if there are column references exclusive to the graph map.

        The graph-map fields are taken from the first triple that has graph
        references. The returned dict additionally carries
        ``"exclusive_references"``: graph references that appear in no
        subject, predicate or object map of any triple. Returns ``None`` when
        that set is empty.
        """
        all_graph_refs: set[str] = set()
        all_other_refs: set[str] = set()
        graph_info: dict[str, object] | None = None

        for triple in self.triples:
            all_other_refs.update(triple.subject_references)
            all_other_refs.update(triple.predicate_references)
            all_other_refs.update(triple.object_references)

            g_refs = triple.graph_references
            if not g_refs:
                continue
            all_graph_refs.update(g_refs)
            if graph_info is None:
                graph_info = {
                    "graph_map_type": triple.rule.get("graph_map_type"),
                    "graph_map_value": triple.rule.get("graph_map_value"),
                    "graph_references": triple.rule.get("graph_references", []),
                    "graph_references_template": triple.rule.get(
                        "graph_references_template"
                    ),
                }

        exclusive = all_graph_refs - all_other_refs
        if not exclusive or graph_info is None:
            return None

        graph_info["exclusive_references"] = exclusive
        return graph_info

    def _generate_graph_binds(
        self, graph_info: dict[str, object], graph_var: str
    ) -> str:
        """Generate SPARQL BINDs for extracting column values from graph IRIs.

        Reference-valued graph maps bind ``STR(?graph)`` to one exclusive
        reference's variable; template-valued graph maps delegate to
        ``extract_from_iri_template``. Other graph map types yield ``""``.
        """
        graph_map_type = str(graph_info["graph_map_type"])
        # NOTE(review): the rule of the first triple is used, not necessarily
        # the triple that owns the graph map - confirm this is intended.
        rule = self.triples[0].rule

        if graph_map_type == RML_REFERENCE:
            first_ref = str(next(iter(graph_info["exclusive_references"])))  # type: ignore[call-overload]
            ref_id = Identifier.generate_plain_identifier(rule, first_ref) or first_ref
            ref_var = self.codex.get_id(ref_id)
            return f"BIND(STR(?{graph_var}) AS ?{ref_var})\n"

        if graph_map_type == RML_TEMPLATE:
            return (
                extract_from_iri_template(
                    template_value=str(graph_info["graph_map_value"]),
                    references_template=str(graph_info["graph_references_template"]),
                    references=list(graph_info["graph_references"]),  # type: ignore[arg-type]
                    rule=rule,
                    codex=self.codex,
                    id_generator=self.id_generator,
                    slice_label="graph",
                )
                + "\n"
            )

        return ""

    def decode_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """Decode query results DataFrame.

        Returns a deep copy of *df* in which:

        * columns of template-only references are URL-decoded, with missing
          values (None/NaN, e.g. unbound variables) passed through untouched;
        * every codex variable column is renamed back to its reference name.

        Columns absent from *df* (e.g. an empty result) are skipped instead of
        raising ``KeyError``.
        """
        df = df.copy(deep=True)
        template_only = set(self.template_only_references)
        renames: dict[str, str] = {}
        for reference in self.references:
            column = self.codex.get_id(reference)
            renames[column] = reference
            if reference in template_only and column in df.columns:
                # Only values that came exclusively from an IRI template need
                # decoding (see ``template_only_references``).
                df[column] = df[column].map(url_decode, na_action="ignore")
        # A single rename, so a reference name can never collide with a codex
        # column that has not been renamed yet.
        return df.rename(columns=renames)

    def execute_on_endpoint(
        self, endpoint: Endpoint, all_mapping_rules: pd.DataFrame
    ) -> pd.DataFrame:
        """Execute query on a SPARQL endpoint and return the decoded results.

        Raises:
            ValueError: if no query can be generated (no references found).
        """
        query = self.generate(all_mapping_rules)
        if query is None:
            # An explicit error instead of ``assert``: asserts are stripped
            # under ``python -O``, which would send ``None`` to the endpoint.
            raise ValueError("No query generated (no references found)")
        json_result = endpoint.query(query)
        df = _json_sparql_to_dataframe(json_result)
        return self.decode_dataframe(df)
222def _json_sparql_to_dataframe(json_result: str) -> pd.DataFrame:
223 result_data = json.loads(json_result)
224 columns = result_data["head"]["vars"]
225 data = []
226 for binding in result_data["results"]["bindings"]:
227 row = {}
228 for col in columns:
229 if col in binding:
230 value = binding[col]["value"]
231 datatype = binding[col].get("datatype")
232 row[col] = sparql_to_python_type(value, datatype)
233 else:
234 row[col] = None
235 data.append(row)
236 return pd.DataFrame(data, columns=columns)
def retrieve_data(
    mapping_rules: pd.DataFrame,
    source_rules: pd.DataFrame,
    endpoint: Endpoint,
    decode_columns: bool = False,
) -> tuple[pd.DataFrame | None, str | None]:
    """Retrieve data from SPARQL endpoint using mapping rules.

    One ``QueryTriple`` is built per source rule (rules whose object map is a
    blank node are skipped), plus one ``SubjectTriple`` per distinct
    ``subject_map_value`` (NaN included as its own group).

    Returns:
        ``(dataframe, query_text)``. ``(None, None)`` when no query could be
        generated; an empty DataFrame when the endpoint returned a blank body.
        Columns are renamed/decoded only when ``decode_columns`` is true.

    Raises:
        Whatever querying, parsing or decoding raises; the error is logged as
        a warning first and then re-raised.
    """
    logger = logging.getLogger("kgi")

    triples: list[QueryTriple] = []
    for _, rule in source_rules.iterrows():
        if rule["object_map_type"] != RML_BLANK_NODE:
            triples.append(QueryTriple(rule))

    for _, subject_rules in source_rules.groupby("subject_map_value", dropna=False):
        triples.append(SubjectTriple(subject_rules.iloc[0]))

    query = Query(triples)
    generated_query = query.generate(mapping_rules)
    if generated_query is None:
        logger.warning("No query generated (no references found)")
        return None, None

    try:
        raw_result = endpoint.query(generated_query)
        if raw_result.strip():
            df = _json_sparql_to_dataframe(raw_result)
            if decode_columns:
                df = query.decode_dataframe(df)
        else:
            df = pd.DataFrame()
    except Exception as exc:
        logger.warning(f"Error while querying endpoint: {exc}")
        raise

    return df, generated_query