Coverage for kgi / triples.py: 90%

250 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-23 08:53 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5"""Triple classes for SPARQL query generation.""" 

6 

7import json 

8import logging 

9 

10import pandas as pd 

11 

12from .base import Triple 

13from .constants import ( 

14 RML_BLANK_NODE, 

15 RML_CONSTANT, 

16 RML_DEFAULT_GRAPH, 

17 RML_IRI, 

18 RML_LITERAL, 

19 RML_PARENT_TRIPLES_MAP, 

20 RML_REFERENCE, 

21 RML_TEMPLATE, 

22) 

23from .utils import Codex, IdGenerator, Identifier 

24 

25 

def extract_from_iri_template(
    template_value: str,
    references_template: str,
    references: list[str],
    rule: pd.Series,
    codex: Codex,
    id_generator: IdGenerator,
    slice_label: str,
) -> str:
    """Build the SPARQL FILTER and BIND lines that pull column values out of a template IRI.

    Used for subject templates (see ``SubjectTriple``) and, per the original
    docstring, for graph map extraction as well.

    The template is consumed left to right: for every reference the text in
    front of the next ``(`` is the literal prefix to skip with ``STRAFTER`` and
    the text following the next ``)`` is what remains to be processed.

    Args:
        template_value: Template whose codex identifier names the variable
            holding the full IRI.
        references_template: Pattern form of the template; it is embedded
            verbatim in the ``REGEX`` filter and split on ``(`` / ``)``.
        references: References of the template, in template order.
        rule: Mapping rule handed to ``Identifier.generate_plain_identifier``.
        codex: Registry translating identifiers into SPARQL variable names.
        id_generator: Supplier of fresh ids for intermediate variables.
        slice_label: Tag embedded in the identifiers of intermediate slices.

    Returns:
        The generated lines joined with newlines; the FILTER line comes first.

    Raises:
        IndexError: If the remaining template holds no ``)`` while references
            are still left to process.
    """
    template_var = codex.get_id(template_value)
    patterns = [f"FILTER(REGEX(STR(?{template_var}), '{references_template}'))"]

    remaining = references_template
    slice_var = template_var

    for reference in references:
        prefix = remaining.partition("(")[0]
        suffix = remaining.split(")", 1)[1]
        column = str(reference)
        identifier = Identifier.generate_plain_identifier(rule, column) or column
        column_var, is_bound = codex.get_id_and_is_bound(identifier)

        if suffix:
            # More template text follows: cut off the prefix first, then keep
            # only what precedes the literal text in front of the next group.
            delimiter = suffix.partition("(")[0]
            fresh_id = id_generator.get_id()
            tail_var = codex.get_id(f"{template_value}_slice_{slice_label}_{fresh_id}")
            patterns.append(
                f"BIND(STRAFTER(STR(?{slice_var}), '{prefix}') as ?{tail_var})"
            )
            if is_bound:
                # The reference already has a variable: bind a throwaway one.
                value_var = codex.get_id(
                    f"{identifier}_temp_{id_generator.get_id()}"
                )
            else:
                value_var = column_var
            patterns.append(
                f"BIND(STRBEFORE(STR(?{tail_var}), '{delimiter}') AS ?{value_var})"
            )
            slice_var = tail_var
        else:
            # The reference closes the template: everything after the prefix
            # is its value.
            if is_bound:
                value_var = codex.get_id(
                    f"{template_value}_slice_{slice_label}_{id_generator.get_id()}"
                )
            else:
                value_var = column_var
            patterns.append(
                f"BIND(STRAFTER(STR(?{slice_var}), '{prefix}') as ?{value_var})"
            )

        remaining = suffix

    return "\n".join(patterns)

92 

93 

class QueryTriple(Triple):
    """Represents a query triple with subject, predicate, and object.

    The triple is described by a single mapping rule (``self.rule``) and is
    rendered as a SPARQL pattern by :meth:`generate`.
    """

    def __init__(self, rule: pd.Series):
        self.rule = rule

    def _plain_identifiers(self, values) -> set[str]:
        """Return the plain identifiers of ``values``, skipping those that resolve to ``None``."""
        return {
            ident
            for value in values
            if (ident := Identifier.generate_plain_identifier(self.rule, str(value)))
            is not None
        }

    @property
    def references(self) -> set[str]:
        """Get all references used in this triple."""
        return set.union(
            self.subject_references,
            self.predicate_references,
            self.object_references,
            self.graph_references,
        )

    @property
    def template_extracted_references(self) -> set[str]:
        """Get references extracted from URI templates (subject, predicate, object, graph template)."""
        refs = set.union(self.subject_references, self.predicate_references)
        if self.rule["object_map_type"] == RML_TEMPLATE:
            refs = refs.union(self.object_references)
        graph_map_type = self.rule.get("graph_map_type")
        if isinstance(graph_map_type, str) and graph_map_type == RML_TEMPLATE:
            refs = refs.union(self.graph_references)
        return refs

    @property
    def plain_references(self) -> set[str]:
        """Get references available directly from object literals."""
        refs: set[str] = set()
        if self.rule["object_map_type"] in (RML_REFERENCE, RML_PARENT_TRIPLES_MAP):
            refs = set(self.object_references)
        graph_map_type = self.rule.get("graph_map_type")
        if isinstance(graph_map_type, str) and graph_map_type == RML_REFERENCE:
            refs = refs.union(self.graph_references)
        return refs

    @property
    def subject_references(self) -> set[str]:
        """Get subject references."""
        return self._plain_identifiers(self.rule["subject_references"])

    @property
    def predicate_references(self) -> set[str]:
        """Get predicate references."""
        return self._plain_identifiers(self.rule["predicate_references"])

    @property
    def object_references(self) -> set[str]:
        """Get object references."""
        return self._plain_identifiers(self.rule["object_references"])

    @property
    def graph_references(self) -> set[str]:
        """Get graph map references (empty when the rule stores no list of them)."""
        graph_refs = self.rule.get("graph_references")
        if not isinstance(graph_refs, list):
            return set()
        return self._plain_identifiers(graph_refs)

    def _wrap_in_graph(self, pattern: str) -> str:
        """Wrap ``pattern`` in a GRAPH block when the rule names a constant, non-default graph."""
        graph_iri = self._graph_iri()
        if graph_iri is not None:
            return f"GRAPH <{graph_iri}> {{\n{pattern}\n}}"
        return pattern

    def _graph_iri(self) -> str | None:
        """Return the constant graph IRI of the rule, or ``None`` for no or default graph."""
        graph_map_type = self.rule.get("graph_map_type")
        if isinstance(graph_map_type, str) and graph_map_type == RML_CONSTANT:
            graph_iri = str(self.rule["graph_map_value"])
            if graph_iri != RML_DEFAULT_GRAPH:
                return graph_iri
        return None

    def generate(
        self, id_generator: IdGenerator, codex: Codex, all_mapping_rules: pd.DataFrame
    ) -> str | None:
        """Generate SPARQL triple pattern, wrapped in GRAPH block if needed.

        Returns ``None`` when no pattern can be produced for the rule.
        """
        pattern = self._generate_pattern(id_generator, codex, all_mapping_rules)
        if pattern is None:
            return None
        # Parent-triples-map patterns already place the GRAPH clause inside
        # their OPTIONAL block, so they must not be wrapped a second time.
        if str(self.rule["object_map_type"]) == RML_PARENT_TRIPLES_MAP:
            return pattern
        return self._wrap_in_graph(pattern)

    def _generate_pattern(
        self, id_generator: IdGenerator, codex: Codex, all_mapping_rules: pd.DataFrame
    ) -> str | None:
        """Dispatch pattern generation on the object map type of the rule.

        Returns ``None`` (after logging an error) for unsupported object map
        types, and for constant blank-node objects.
        """
        subject_reference = codex.get_id(str(self.rule["subject_map_value"]))
        predicate = f"<{self.rule['predicate_map_value']}>"
        object_map_value = str(self.rule["object_map_value"])
        object_map_type = str(self.rule["object_map_type"])
        object_references_template = str(self.rule["object_references_template"])

        if object_map_type == RML_CONSTANT:
            return self._constant_object_pattern(
                subject_reference, predicate, object_map_value
            )
        if object_map_type == RML_REFERENCE:
            return self._reference_object_pattern(
                subject_reference, predicate, object_map_value, id_generator, codex
            )
        if object_map_type == RML_TEMPLATE:
            return self._template_object_pattern(
                subject_reference,
                predicate,
                object_map_value,
                object_references_template,
                id_generator,
                codex,
            )
        if object_map_type == RML_PARENT_TRIPLES_MAP:
            return self._parent_triples_map_pattern(
                subject_reference, id_generator, codex, all_mapping_rules
            )

        logging.getLogger("kgi").error(
            f"Unsupported object map type: {object_map_type}"
        )
        return None

    def _constant_object_pattern(
        self, subject_reference: str, predicate: str, object_map_value: str
    ) -> str | None:
        """Render a triple with a constant object; blank-node constants yield ``None``."""
        object_term_type = self.rule["object_termtype"]
        if object_term_type == RML_IRI:
            object_map_value = f"<{object_map_value}>"
        elif object_term_type == RML_BLANK_NODE:
            return None
        elif object_term_type == RML_LITERAL:
            object_map_value = f'"{object_map_value}"'
        return f"?{subject_reference} {predicate} {object_map_value} ."

    def _reference_object_pattern(
        self,
        subject_reference: str,
        predicate: str,
        object_map_value: str,
        id_generator: IdGenerator,
        codex: Codex,
    ) -> str:
        """Render a triple whose object is a plain column reference."""
        object_identifier = (
            Identifier.generate_plain_identifier(self.rule, object_map_value)
            or object_map_value
        )
        object_reference, _ = codex.get_id_and_is_bound(object_identifier)

        # NOTE(review): the flag tested below belongs to the freshly generated
        # temp identifier, not to the object reference looked up above (the
        # original code overwrote the first flag with this one). Whether the
        # "already bound" branch can ever run therefore depends on the codex
        # already knowing a just-generated identifier. Behaviour is preserved
        # as-is; confirm the intent before changing it.
        temp_object_reference, already_bound = codex.get_id_and_is_bound(
            f"{object_identifier}_temp_{id_generator.get_id()}"
        )

        lines = []
        if already_bound:
            lines.append(f"?{subject_reference} {predicate} ?{temp_object_reference} .")
            lines.append(f"BIND(?{temp_object_reference} as ?{object_reference})")
            lines.append(
                f"FILTER(!BOUND(?{object_reference}) || !BOUND(?{temp_object_reference}) || ?{temp_object_reference} = ?{object_reference})"
            )
        else:
            lines.append(f"?{subject_reference} {predicate} ?{object_reference} .")
        return "\n".join(lines)

    def _template_object_pattern(
        self,
        subject_reference: str,
        predicate: str,
        object_map_value: str,
        object_references_template: str,
        id_generator: IdGenerator,
        codex: Codex,
    ) -> str:
        """Render a triple with a template object plus the BINDs extracting its references.

        Raises ``IndexError`` if the remaining template holds no ``)`` while
        object references are still left to process.
        """
        template_identifier = (
            Identifier.generate_plain_identifier(self.rule, object_map_value)
            or object_map_value
        )
        template_reference, _ = codex.get_id_and_is_bound(template_identifier)

        # NOTE(review): unlike the other branches this triple is emitted
        # without a trailing " ."; kept unchanged to preserve generated queries.
        lines = [f"?{subject_reference} {predicate} ?{template_reference}"]

        evaluated_template = object_references_template
        current_slice = template_reference

        for obj in self.rule["object_references"]:
            # Literal text before the next group, and whatever follows it.
            current_pre_string = evaluated_template.split("(", 1)[0]
            current_post_string = evaluated_template.split(")", 1)[1]
            next_pre_string = current_post_string.split("(", 1)[0]
            obj_str = str(obj)
            object_identifier = (
                Identifier.generate_plain_identifier(self.rule, obj_str) or obj_str
            )
            object_reference, already_bound = codex.get_id_and_is_bound(
                object_identifier
            )
            next_slice = codex.get_id(
                f"{object_identifier}_slice_{id_generator.get_id()}"
            )
            # Backslashes are dropped before the strings are used as plain
            # STRAFTER / STRBEFORE arguments.
            unescaped_current_pre_string = current_pre_string.replace("\\", "")
            unescaped_next_pre_string = next_pre_string.replace("\\", "")

            lines.append(
                f"BIND(STRAFTER(STR(?{current_slice}), '{unescaped_current_pre_string}') as ?{next_slice})"
            )

            if current_post_string == "":
                # Last group of the template: the slice is the value itself.
                if not already_bound:
                    lines.append(f"BIND(?{next_slice} as ?{object_reference})")
            else:
                temp_reference = codex.get_id(
                    f"{object_identifier}_temp_{id_generator.get_id()}"
                )
                lines.append(
                    f"BIND(STRBEFORE(STR(?{next_slice}), '{unescaped_next_pre_string}') AS ?{temp_reference})"
                )
                if not already_bound:
                    lines.append(f"BIND(?{temp_reference} as ?{object_reference})")

            evaluated_template = current_post_string
            current_slice = next_slice

        return "\n".join(lines)

    @staticmethod
    def _parse_join_conditions(raw_join_value: object) -> dict:
        """Decode the serialized join conditions of a rule.

        Non-string values yield an empty dict. Strings are tried as strict
        JSON first, then as a Python literal (so values containing apostrophes
        or double quotes survive), and finally with the legacy
        single-to-double quote substitution.

        Raises ``json.JSONDecodeError`` when no strategy can decode the string.
        """
        if not isinstance(raw_join_value, str):
            return {}

        import ast

        try:
            return json.loads(raw_join_value)
        except json.JSONDecodeError:
            pass

        try:
            parsed = ast.literal_eval(raw_join_value)
        except (ValueError, SyntaxError, TypeError):
            parsed = None
        if isinstance(parsed, dict):
            return parsed

        # Legacy path: handles single-quoted text that is not a valid Python
        # literal (e.g. JSON keywords such as true/null).
        return json.loads(raw_join_value.replace("'", '"'))

    def _parent_triples_map_pattern(
        self,
        subject_reference: str,
        id_generator: IdGenerator,
        codex: Codex,
        all_mapping_rules: pd.DataFrame,
    ) -> str:
        """Render an OPTIONAL block joining this triple to its parent triples map.

        The object variable is the subject variable of the parent rule; for
        every join condition the parent subject template is walked until the
        parent reference is found, and its value is bound to the child
        reference.

        Raises ``IndexError`` if no rule in ``all_mapping_rules`` carries the
        referenced ``triples_map_id``.
        """
        object_parent_triples_map_id = self.rule["object_map_value"]
        object_rule = all_mapping_rules[
            all_mapping_rules["triples_map_id"] == object_parent_triples_map_id
        ].iloc[0]
        object_map_value = object_rule["subject_map_value"]
        object_reference = codex.get_id(object_map_value)
        predicate = f"<{self.rule['predicate_map_value']}>"

        graph_iri = self._graph_iri()
        if graph_iri is not None:
            lines = [
                f"OPTIONAL {{ GRAPH <{graph_iri}> {{ ?{subject_reference} {predicate} ?{object_reference} ."
            ]
        else:
            lines = [
                f"OPTIONAL {{ ?{subject_reference} {predicate} ?{object_reference} ."
            ]

        join_conditions = self._parse_join_conditions(
            self.rule["object_join_conditions"]
        )
        parent_template = object_rule["subject_references_template"]
        parent_references = object_rule["subject_references"]

        for jc in join_conditions.values():
            child_value = jc["child_value"]
            parent_value = jc["parent_value"]
            child_identifier = (
                Identifier.generate_plain_identifier(self.rule, child_value)
                or child_value
            )
            child_ref, child_already_bound = codex.get_id_and_is_bound(
                child_identifier
            )

            evaluated_template = parent_template
            current_slice = object_reference

            for ref in parent_references:
                pre_string = evaluated_template.split("(", 1)[0]
                post_string = evaluated_template.split(")", 1)[1]
                next_slice = codex.get_id(
                    f"{object_map_value}_join_slice_{id_generator.get_id()}"
                )
                lines.append(
                    f"BIND(STRAFTER(STR(?{current_slice}), '{pre_string}') as ?{next_slice})"
                )

                if ref == parent_value:
                    if post_string == "":
                        if not child_already_bound:
                            lines.append(f"BIND(?{next_slice} as ?{child_ref})")
                    else:
                        next_pre = post_string.split("(", 1)[0]
                        temp_ref = codex.get_id(
                            f"{child_identifier}_temp_{id_generator.get_id()}"
                        )
                        lines.append(
                            f"BIND(STRBEFORE(STR(?{next_slice}), '{next_pre}') AS ?{temp_ref})"
                        )
                        if not child_already_bound:
                            lines.append(f"BIND(?{temp_ref} as ?{child_ref})")
                    # The parent reference of this condition has been handled.
                    break

                evaluated_template = post_string
                current_slice = next_slice

        # Close the OPTIONAL block (and the GRAPH block nested in it, if any).
        if graph_iri is not None:
            lines.append("} }")
        else:
            lines.append("}")
        return "\n".join(lines)

382 

383 

class SubjectTriple(QueryTriple):
    """Query triple that recovers source values from the subject of a rule."""

    def __init__(self, rule: pd.Series):
        super().__init__(rule)

    @property
    def template_extracted_references(self) -> set[str]:
        """Subject references obtained from a template; empty for column-reference subjects."""
        is_column_reference = self.rule["subject_map_type"] == RML_REFERENCE
        return set() if is_column_reference else self.subject_references

    @property
    def plain_references(self) -> set[str]:
        """Subject references of column-reference subjects; empty for any other subject map type."""
        is_column_reference = self.rule["subject_map_type"] == RML_REFERENCE
        return self.subject_references if is_column_reference else set()

    def generate(
        self, id_generator: IdGenerator, codex: Codex, all_mapping_rules: pd.DataFrame
    ) -> str | None:  # pyright: ignore[reportUnusedParameter]
        """Build the SPARQL lines extracting the subject references.

        Returns ``None`` when every subject reference is already known to the
        codex, when the subject is a plain column reference, or (after logging
        an error) when the map type / term type combination is unsupported.
        """
        for ref in self.rule["subject_references"]:
            column = str(ref)
            identifier = (
                Identifier.generate_plain_identifier(self.rule, column) or column
            )
            if identifier not in codex.codex:
                # At least one reference still has to be extracted.
                break
        else:
            return None

        map_type = self.rule["subject_map_type"]
        term_type = self.rule["subject_termtype"]

        # Column-reference subjects: the subject variable already binds
        # to the IRI which IS the column value. No extraction needed.
        if map_type == RML_REFERENCE:
            return None

        if map_type == RML_TEMPLATE and term_type == RML_IRI:
            return self._generate_iri_template(codex, id_generator)
        if map_type == RML_TEMPLATE and term_type == RML_BLANK_NODE:
            return self._generate_blank_node_template(codex, id_generator)

        logging.getLogger("kgi").error(
            f"Unsupported subject map type: {map_type} or subject term type: {term_type}"
        )
        return None

    def _generate_iri_template(self, codex: Codex, id_generator: IdGenerator):
        """Delegate IRI template extraction to ``extract_from_iri_template``."""
        rule = self.rule
        return extract_from_iri_template(
            template_value=str(rule["subject_map_value"]),
            references_template=str(rule["subject_references_template"]),
            references=list(rule["subject_references"]),
            rule=rule,
            codex=codex,
            id_generator=id_generator,
            slice_label="subject",
        )

    def _generate_blank_node_template(self, codex: Codex, id_generator: IdGenerator):
        """Build the BIND lines extracting references from a blank-node template.

        Backslashes are removed from the literal template fragments before
        they are used as STRAFTER / STRBEFORE arguments. A template that runs
        out of ``)`` is treated as having nothing left after the reference.
        """
        template_value = str(self.rule["subject_map_value"])
        remaining = str(self.rule["subject_references_template"])
        slice_var = codex.get_id(template_value)

        patterns = []

        for reference in self.rule["subject_references"]:
            prefix = remaining.partition("(")[0]
            suffix = remaining.partition(")")[2]

            # A slice variable is reserved for every reference, even when the
            # terminal branch below ends up not using it.
            tail_var = codex.get_id(f"{template_value}_slice_{id_generator.get_id()}")

            column = str(reference)
            identifier = (
                Identifier.generate_plain_identifier(self.rule, column) or column
            )
            column_var, is_bound = codex.get_id_and_is_bound(identifier)

            plain_prefix = prefix.replace("\\", "")
            if suffix:
                plain_delimiter = suffix.partition("(")[0].replace("\\", "")
                temp_var = codex.get_id(f"{identifier}_temp_{id_generator.get_id()}")

                patterns.append(
                    f"BIND(STRAFTER(STR(?{slice_var}), '{plain_prefix}') as ?{tail_var})"
                )
                patterns.append(
                    f"BIND(STRBEFORE(STR(?{tail_var}), '{plain_delimiter}') AS ?{temp_var})"
                )
                if not is_bound:
                    patterns.append(f"BIND(?{temp_var} as ?{column_var})")
                slice_var = tail_var
            elif not is_bound:
                # Last fragment: everything after the prefix is the value.
                patterns.append(
                    f"BIND(STRAFTER(STR(?{slice_var}), '{plain_prefix}') as ?{column_var})"
                )

            remaining = suffix

        return "\n".join(patterns)