Coverage for kgi / query.py: 81%

144 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-23 08:53 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5"""SPARQL query generation and execution.""" 

6 

7import json 

8import logging 

9 

10import pandas as pd 

11 

12from .base import Endpoint 

13from .constants import ( 

14 RML_BLANK_NODE, 

15 RML_CONSTANT, 

16 RML_PARENT_TRIPLES_MAP, 

17 RML_REFERENCE, 

18 RML_TEMPLATE, 

19) 

20from .triples import QueryTriple, SubjectTriple, extract_from_iri_template 

21from .utils import Codex, IdGenerator, Identifier, sparql_to_python_type, url_decode 

22 

23 

class Query:
    """Represents a SPARQL query for data inversion.

    A ``Query`` collects ``QueryTriple`` objects, renders them into a single
    ``SELECT`` query (:meth:`generate`), and maps result columns back to the
    original reference names (:meth:`decode_dataframe`).
    """

    # Order in which object triples are rendered, keyed by object_map_type.
    # Object triples with any other object_map_type are not emitted.
    _OBJECT_MAP_TYPE_ORDER = (
        RML_CONSTANT,
        RML_TEMPLATE,
        RML_REFERENCE,
        RML_PARENT_TRIPLES_MAP,
    )

    def __init__(self, triples: list[QueryTriple] | None = None):
        """Create a query over ``triples`` (an empty list when ``None``)."""
        self.triples: list[QueryTriple] = triples or []
        # Both are passed to every triple's generate() call, so all triples
        # draw variable names from the same state.
        self.id_generator = IdGenerator()
        self.codex = Codex()
        # Last query string produced by generate(); None until then.
        self.generated_query: str | None = None

    @staticmethod
    def _stable_unique(refs: set[str]) -> list[str]:
        """Return ``refs`` as a list in a run-independent order.

        Iterating a ``set`` of strings depends on hash randomization, which
        would make the SELECT clause and the result column order change from
        one interpreter run to the next.
        """
        # key=str keeps the sort total even if a non-str reference slips in.
        return sorted(refs, key=str)

    @property
    def references(self) -> list[str]:
        """Get all references used in the query, deduplicated and sorted."""
        return self._stable_unique(
            {ref for triple in self.triples for ref in triple.references}
        )

    @property
    def template_references(self) -> list[str]:
        """Get references extracted from URI/blank node templates (sorted)."""
        return self._stable_unique(
            {
                ref
                for triple in self.triples
                for ref in triple.template_extracted_references
            }
        )

    @property
    def literal_references(self) -> list[str]:
        """Get references available from object literals (sorted)."""
        return self._stable_unique(
            {ref for triple in self.triples for ref in triple.plain_references}
        )

    @property
    def template_only_references(self) -> list[str]:
        """Get references only available from template extraction.

        When a reference is available from both a template and a literal,
        the literal value is preferred (no URL decoding needed).
        """
        literal = set(self.literal_references)
        return [ref for ref in self.template_references if ref not in literal]

    def _ordered_triples(self) -> list[QueryTriple]:
        """Return the triples in the order their SPARQL is generated.

        Object triples come first, grouped by object map type (constant,
        template, reference, parent triples map). SubjectTriples come last so
        their BINDs are skipped when the reference is already available from
        a literal.
        """
        object_triples = [t for t in self.triples if not isinstance(t, SubjectTriple)]
        subject_triples = [t for t in self.triples if isinstance(t, SubjectTriple)]

        ordered: list[QueryTriple] = []
        for map_type in self._OBJECT_MAP_TYPE_ORDER:
            ordered.extend(
                t for t in object_triples if t.rule["object_map_type"] == map_type
            )
        ordered.extend(subject_triples)
        return ordered

    def generate(self, all_mapping_rules: pd.DataFrame) -> str | None:
        """Generate the SPARQL ``SELECT`` query string.

        Args:
            all_mapping_rules: Full mapping-rule table, forwarded to each
                triple's ``generate`` call.

        Returns:
            The query text (also stored in ``self.generated_query``), or
            ``None`` when no triple contributes any reference.
        """
        all_references = self.references

        if not all_references:
            logging.getLogger("kgi").warning("No references found, no query generated")
            return None

        triple_strings = []
        for triple in self._ordered_triples():
            triple_string = triple.generate(
                self.id_generator, self.codex, all_mapping_rules
            )
            if triple_string is not None:
                triple_strings.append(triple_string)

        # Only wrap the patterns in GRAPH when the graph map provides column
        # values that no subject/predicate/object map provides.
        graph_info = self._get_exclusive_graph_info()
        graph_binds = ""
        graph_var: str | None = None
        if graph_info:
            graph_var = self.codex.get_id(str(graph_info["graph_map_value"]))
            graph_binds = self._generate_graph_binds(graph_info, graph_var)

        all_vars = [f"?{self.codex.get_id(ref)}" for ref in all_references]
        select_part = "SELECT " + " ".join(all_vars) + " WHERE {"

        if graph_var is not None:
            body = (
                f"GRAPH ?{graph_var} {{\n"
                + "\n".join(triple_strings)
                + "\n}\n"
                + graph_binds
            )
        else:
            body = "\n".join(triple_strings)

        generated_query = select_part + body + "}"
        # Every backslash in the final text is doubled. NOTE(review): presumably
        # so regex/escape sequences emitted by the triples survive SPARQL
        # string-literal unescaping — confirm.
        self.generated_query = generated_query.replace("\\", "\\\\")
        return self.generated_query

    def _get_exclusive_graph_info(self) -> dict[str, object] | None:
        """Return graph map info if there are column references exclusive to the graph map.

        The info (graph map type, value, references and references template)
        is copied from the first triple that has graph references, and an
        ``"exclusive_references"`` entry is added holding the graph references
        that no subject, predicate or object map of any triple uses.

        Returns:
            That dict, or ``None`` when no triple has graph references or all
            graph references are also available elsewhere.
        """
        all_graph_refs: set[str] = set()
        all_other_refs: set[str] = set()
        graph_info: dict[str, object] | None = None

        for triple in self.triples:
            all_other_refs.update(triple.subject_references)
            all_other_refs.update(triple.predicate_references)
            all_other_refs.update(triple.object_references)

            g_refs = triple.graph_references
            if g_refs:
                all_graph_refs.update(g_refs)
                if graph_info is None:
                    graph_info = {
                        "graph_map_type": triple.rule.get("graph_map_type"),
                        "graph_map_value": triple.rule.get("graph_map_value"),
                        "graph_references": triple.rule.get("graph_references", []),
                        "graph_references_template": triple.rule.get(
                            "graph_references_template"
                        ),
                    }

        exclusive = all_graph_refs - all_other_refs
        if not exclusive or graph_info is None:
            return None

        graph_info["exclusive_references"] = exclusive
        return graph_info

    def _generate_graph_binds(
        self, graph_info: dict[str, object], graph_var: str
    ) -> str:
        """Generate SPARQL BINDs for extracting column values from graph IRIs.

        Returns a ``BIND(STR(?graph) AS ?ref)`` line for a reference-valued
        graph map, the output of ``extract_from_iri_template`` for a
        template-valued one, and an empty string for any other graph map type.
        """
        graph_map_type = str(graph_info["graph_map_type"])
        # NOTE(review): this uses the first triple's rule, which is not
        # necessarily the triple that supplied ``graph_info``; confirm the
        # identifier-relevant parts of the rules are shared by all triples.
        rule = self.triples[0].rule

        if graph_map_type == RML_REFERENCE:
            # Only the first element of the exclusive-reference set is used;
            # presumably a reference-valued graph map has a single reference.
            ref = list(graph_info["exclusive_references"])  # type: ignore[arg-type]
            ref_id = Identifier.generate_plain_identifier(rule, str(ref[0])) or str(
                ref[0]
            )
            ref_var = self.codex.get_id(ref_id)
            return f"BIND(STR(?{graph_var}) AS ?{ref_var})\n"

        if graph_map_type == RML_TEMPLATE:
            return (
                extract_from_iri_template(
                    template_value=str(graph_info["graph_map_value"]),
                    references_template=str(graph_info["graph_references_template"]),
                    references=list(graph_info["graph_references"]),  # type: ignore[arg-type]
                    rule=rule,
                    codex=self.codex,
                    id_generator=self.id_generator,
                    slice_label="graph",
                )
                + "\n"
            )

        return ""

    @staticmethod
    def _url_decode_cell(value: object) -> object:
        """URL-decode one result cell, passing missing values through unchanged."""
        # Unbound SPARQL variables arrive as None (or NaN after pandas
        # coerces a numeric column); there is nothing to decode in them.
        if value is None or (isinstance(value, float) and pd.isna(value)):
            return value
        return url_decode(value)

    def decode_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """Decode a query results DataFrame.

        Returns a deep copy of ``df`` in which each codex variable column is
        renamed back to its reference name. Columns whose reference is only
        available through template extraction are URL-decoded first; missing
        cells are left as they are.
        """
        df = df.copy(deep=True)

        template_only = set(self.template_only_references)
        for reference in self.references:
            column_reference = self.codex.get_id(reference)
            if reference in template_only:
                df[column_reference] = df[column_reference].apply(
                    self._url_decode_cell
                )
            df.rename(columns={column_reference: reference}, inplace=True)

        return df

    def execute_on_endpoint(
        self, endpoint: Endpoint, all_mapping_rules: pd.DataFrame
    ) -> pd.DataFrame:
        """Generate the query, execute it on a SPARQL endpoint, and decode it.

        Raises:
            ValueError: If no query could be generated (no references found).
        """
        self.generated_query = self.generate(all_mapping_rules)
        # Explicit check rather than ``assert``: asserts are stripped under
        # ``python -O`` and ``None`` would then be sent to the endpoint.
        if self.generated_query is None:
            raise ValueError("No SPARQL query could be generated: no references found")
        json_result = endpoint.query(self.generated_query)
        df = _json_sparql_to_dataframe(json_result)
        return self.decode_dataframe(df)

220 

221 

222def _json_sparql_to_dataframe(json_result: str) -> pd.DataFrame: 

223 result_data = json.loads(json_result) 

224 columns = result_data["head"]["vars"] 

225 data = [] 

226 for binding in result_data["results"]["bindings"]: 

227 row = {} 

228 for col in columns: 

229 if col in binding: 

230 value = binding[col]["value"] 

231 datatype = binding[col].get("datatype") 

232 row[col] = sparql_to_python_type(value, datatype) 

233 else: 

234 row[col] = None 

235 data.append(row) 

236 return pd.DataFrame(data, columns=columns) 

237 

238 

def retrieve_data(
    mapping_rules: pd.DataFrame,
    source_rules: pd.DataFrame,
    endpoint: Endpoint,
    decode_columns: bool = False,
) -> tuple[pd.DataFrame | None, str | None]:
    """Retrieve data from SPARQL endpoint using mapping rules.

    Builds one ``QueryTriple`` per row of ``source_rules``, skipping rows
    whose object map is a blank node. It also adds one ``SubjectTriple`` per
    distinct ``subject_map_value``, with NaN kept as its own group. It then
    generates the query against ``mapping_rules`` and runs it on
    ``endpoint``.

    Returns:
        ``(None, None)`` when no query could be generated.
        ``(empty DataFrame, query)`` when the endpoint answers with blank
        text. Otherwise ``(results, query)``; the result columns are renamed
        to reference names only when ``decode_columns`` is true.

    Raises:
        Any exception raised while querying, parsing or decoding is logged
        as a warning and re-raised.
    """
    logger = logging.getLogger("kgi")

    triples: list[QueryTriple] = []
    for _, rule in source_rules.iterrows():
        if rule["object_map_type"] not in (RML_BLANK_NODE,):
            triples.append(QueryTriple(rule))

    # One SubjectTriple per subject map, built from the group's first rule.
    for _, subject_rules in source_rules.groupby("subject_map_value", dropna=False):
        triples.append(SubjectTriple(subject_rules.iloc[0]))

    query = Query(triples)
    generated_query = query.generate(mapping_rules)
    if generated_query is None:
        logger.warning("No query generated (no references found)")
        return None, None

    try:
        raw_result = endpoint.query(generated_query)
        if not raw_result.strip():
            return pd.DataFrame(), generated_query

        frame = _json_sparql_to_dataframe(raw_result)
        result_frame = query.decode_dataframe(frame) if decode_columns else frame
        return result_frame, generated_query
    except Exception as exc:
        logger.warning(f"Error while querying endpoint: {exc}")
        raise