Coverage for kgi / core.py: 71%

198 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-23 08:53 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import configparser 

6import logging 

7import os 

8import pathlib 

9import re 

10import tempfile 

11from urllib.parse import quote_plus 

12 

13import pandas as pd 

14from morph_kgc.args_parser import load_config_from_argument 

15from morph_kgc.mapping.mapping_parser import retrieve_mappings 

16from pyoxigraph import BlankNode, Literal, NamedNode, Quad, RdfFormat, Store 

17 

18from .constants import ( 

19 RML_IRI, 

20 RML_REFERENCE, 

21 RML_SOURCE, 

22 RML_TEMPLATE, 

23 RR_LITERAL, 

24 RR_SUBJECT_MAP, 

25 RR_TERM_TYPE, 

26) 

27from .endpoints import EndpointFactory, RemoteEndpoint, VirtuosoEndpoint 

28from .exceptions import ( 

29 MappingError, 

30 NoDataError, 

31 NonInvertibleError, 

32 UnsupportedMappingError, 

33) 

34from .models import ReconstructedTable 

35from .query import retrieve_data 

36from .schema import DatabaseSchemaRetriever, apply_schema_ordering, apply_schema_types 

37from .templates import RDBTemplate 

38from .utils import insert_columns 

39 

# D2RQ vocabulary: terms used to embed database connection details
# (DSN, credentials) directly inside a mapping file.
D2RQ_DATABASE = NamedNode("http://www.wiwiss.fu-berlin.de/suhl/bizer/D2RQ/0.1#Database")
D2RQ_JDBC_DSN = NamedNode("http://www.wiwiss.fu-berlin.de/suhl/bizer/D2RQ/0.1#jdbcDSN")
D2RQ_USERNAME = NamedNode("http://www.wiwiss.fu-berlin.de/suhl/bizer/D2RQ/0.1#username")
D2RQ_PASSWORD = NamedNode("http://www.wiwiss.fu-berlin.de/suhl/bizer/D2RQ/0.1#password")

# Maps the scheme of a jdbc:<scheme>://... DSN to a SQLAlchemy driver prefix.
# DSNs with schemes not listed here are rejected (see _extract_db_url_from_mapping).
JDBC_DRIVERS: dict[str, str] = {
    "postgresql": "postgresql+psycopg2",
    "mysql": "mysql+pymysql",
}

# R2RML/RML predicates that mark an SQL query as the logical table (both the
# new and legacy RML namespaces), plus the core terms used when scanning the
# parsed mapping store for triples maps and subject maps.
RR_SQL_QUERY = NamedNode("http://www.w3.org/ns/r2rml#sqlQuery")
RML_QUERY_NEW = NamedNode("http://w3id.org/rml/query")
RML_QUERY_LEGACY = NamedNode("http://semweb.mmlab.be/ns/rml#query")
RR_TRIPLES_MAP = NamedNode("http://www.w3.org/ns/r2rml#TriplesMap")
RDF_TYPE = NamedNode("http://www.w3.org/1999/02/22-rdf-syntax-ns#type")

55 

56 

def get_logger() -> logging.Logger:
    """Return the package-wide logger shared by all kgi modules."""
    kgi_logger = logging.getLogger("kgi")
    return kgi_logger

59 

60 

def _parse_mapping_store(mapping_file: str) -> Store:
    """Parse a Turtle mapping file into a fresh in-memory pyoxigraph store."""
    mapping_store = Store()
    mapping_store.load(path=mapping_file, format=RdfFormat.TURTLE)
    return mapping_store

65 

66 

def _literal_value(quad: Quad) -> str:
    """Return the object of *quad* as plain text (lexical form for literals)."""
    term = quad.object
    return term.value if isinstance(term, Literal) else str(term)

72 

73 

def _extract_db_url_from_mapping(store: Store) -> str | None:
    """Build a SQLAlchemy-style URL from D2RQ connection triples, if present.

    Returns None when the mapping declares no d2rq:Database, has no jdbc
    DSN, the DSN does not parse, or the JDBC scheme is not supported.
    Credentials are URL-encoded and included only when a username exists.
    """
    db_quads = list(store.quads_for_pattern(None, RDF_TYPE, D2RQ_DATABASE))
    if not db_quads:
        return None
    database = db_quads[0].subject

    dsn_matches = list(store.quads_for_pattern(database, D2RQ_JDBC_DSN, None))
    if not dsn_matches:
        return None
    dsn = _literal_value(dsn_matches[0])

    parsed = re.match(r"jdbc:(\w+)://(.+)", dsn)
    if parsed is None:
        return None
    scheme, location = parsed.groups()

    driver = JDBC_DRIVERS.get(scheme)
    if driver is None:
        get_logger().warning(f"Unsupported JDBC driver type: {scheme}")
        return None

    user_matches = list(store.quads_for_pattern(database, D2RQ_USERNAME, None))
    pass_matches = list(store.quads_for_pattern(database, D2RQ_PASSWORD, None))
    user = _literal_value(user_matches[0]) if user_matches else ""
    pwd = _literal_value(pass_matches[0]) if pass_matches else ""
    auth = f"{quote_plus(user)}:{quote_plus(pwd)}@" if user else ""
    return f"{driver}://{auth}{location}"

97 

98 

def _check_for_sql_queries(store: Store) -> bool:
    """True when any triples map declares an SQL query as its logical table."""
    query_predicates = (RR_SQL_QUERY, RML_QUERY_NEW, RML_QUERY_LEGACY)
    return any(
        any(store.quads_for_pattern(None, pred, None)) for pred in query_predicates
    )

104 

105 

def _check_for_multiple_subject_maps(store: Store) -> bool:
    """True when some rr:TriplesMap declares more than one subjectMap."""
    for tm_quad in store.quads_for_pattern(None, RDF_TYPE, RR_TRIPLES_MAP):
        sm_count = sum(
            1 for _ in store.quads_for_pattern(tm_quad.subject, RR_SUBJECT_MAP, None)
        )
        if sm_count > 1:
            return True
    return False

115 

116 

def _check_for_literal_subjects(store: Store) -> bool:
    """True when any subjectMap declares rr:termType rr:Literal.

    Literal subjects are invalid in R2RML; callers reject such mappings.
    """
    for quad in store.quads_for_pattern(None, RR_SUBJECT_MAP, None):
        subject_map = quad.object
        # Only IRI or blank-node subject maps can carry a termType triple.
        if not isinstance(subject_map, (NamedNode, BlankNode)):
            continue
        literal_terms = store.quads_for_pattern(subject_map, RR_TERM_TYPE, RR_LITERAL)
        if any(literal_terms):
            return True
    return False

125 

126 

def _generate_template(
    source_rules: pd.DataFrame, db_url: str | None = None
) -> RDBTemplate:
    """Create the SQL template builder for one group of mapping rules.

    Only relational-database logical sources are supported; any other
    source type raises ValueError.
    """
    kind = source_rules.iloc[0]["source_type"]
    if kind != "RDB":
        raise ValueError(f"Unsupported source type: {kind}")
    return RDBTemplate(db_url)

136 

137 

def _is_column_only_iri(map_type: object, map_value: object, term_type: object) -> bool:
    """True when a term map derives an IRI directly from one column value.

    Covers a plain reference with IRI term type, and a template with IRI
    term type whose entire value is a single placeholder like "{col}".
    """
    if term_type != RML_IRI:
        return False
    if map_type == RML_REFERENCE:
        return True
    if map_type == RML_TEMPLATE:
        text = str(map_value).strip()
        return text.startswith("{") and text.endswith("}") and text.count("{") == 1
    return False

146 

147 

def _check_for_column_iri_term_maps(mappings: pd.DataFrame) -> bool:
    """True when any rule maps a raw column value to an IRI term.

    Such terms are ambiguous to invert (base-IRI resolution), so callers
    reject the mapping. Predicates are always IRIs, hence the fixed
    RML_IRI term type for the predicate check.
    """
    term_columns = (
        ("subject_map_type", "subject_map_value", "subject_termtype"),
        ("object_map_type", "object_map_value", "object_termtype"),
    )
    for _, rule in mappings.iterrows():
        for type_col, value_col, term_col in term_columns:
            if _is_column_only_iri(rule[type_col], rule[value_col], rule[term_col]):
                return True
        if _is_column_only_iri(
            rule["predicate_map_type"], rule["predicate_map_value"], RML_IRI
        ):
            return True
    return False

157 

158 

def _check_for_constant_only_mappings(mappings: pd.DataFrame) -> bool:
    """True when no term map references a column — nothing can be inverted.

    A single reference- or template-typed subject, predicate, or object map
    is enough to make the mapping data-driven. Any failure while inspecting
    the rules is logged and treated as "not constant-only" (best effort).
    """
    data_driven = (RML_REFERENCE, RML_TEMPLATE)
    try:
        for _, rule in mappings.iterrows():
            map_types = (
                rule.get("subject_map_type"),
                rule.get("predicate_map_type"),
                rule.get("object_map_type"),
            )
            if any(map_type in data_driven for map_type in map_types):
                return False
        return True
    except Exception as e:
        get_logger().warning(f"Could not check for constant-only mappings: {e}")
        return False

177 

178 

179def _build_morph_config( 

180 mapping: str | pathlib.Path, 

181 rdf_graph: str, 

182 source_db_url: str | None = None, 

183) -> str: 

184 config = configparser.ConfigParser() 

185 config["CONFIGURATION"] = { 

186 "output_file": str(rdf_graph), 

187 "output_format": "N-QUADS", 

188 "logging_level": "ERROR", 

189 } 

190 data_source: dict[str, str] = {"mappings": str(mapping)} 

191 if source_db_url: 

192 data_source["db_url"] = source_db_url 

193 config["DataSource1"] = data_source 

194 

195 tmp = tempfile.NamedTemporaryFile( 

196 mode="w", suffix=".ini", delete=False, prefix="kgi_" 

197 ) 

198 config.write(tmp) 

199 tmp.close() 

200 return tmp.name 

201 

202 

def reconstruct(
    mapping: str | pathlib.Path,
    rdf_graph: str | pathlib.Path,
    source_db_url: str | None = None,
    dest_db_url: str | None = None,
    sparql_endpoint: str | None = None,
    use_virtuoso: bool = False,
    virtuoso_container: str = "virtuoso-kgi",
) -> dict[str, ReconstructedTable]:
    """Invert an RML/R2RML mapping: rebuild source tables from its RDF output.

    Materializes *mapping* into *rdf_graph* via morph-kgc, then queries the
    produced RDF to reconstruct one table per logical source.

    Args:
        mapping: Path to the RML/R2RML mapping file (Turtle).
        rdf_graph: Path where the materialized graph is written/read.
        source_db_url: SQLAlchemy URL of the source database; when omitted,
            an attempt is made to extract it from D2RQ triples in the mapping.
        dest_db_url: Optional URL used for SQL template generation instead of
            the source URL.
        sparql_endpoint: Query this remote endpoint instead of the local file.
        use_virtuoso: Treat *sparql_endpoint* as a Virtuoso instance and load
            the RDF file into the named container.
        virtuoso_container: Container name for the Virtuoso endpoint.

    Returns:
        Mapping of logical-source (table) name to ReconstructedTable.

    Raises:
        MappingError: literal subjects, multiple subject maps, or rules that
            morph-kgc cannot parse.
        UnsupportedMappingError: SQL query used as a logical table.
        NonInvertibleError: constant-only mappings, column-based IRI terms,
            or invalid RDF output.
        NoDataError: no RDF input produced or nothing reconstructed.
    """
    logger = get_logger()
    mapping_path = str(mapping)
    rdf_graph_str = str(rdf_graph)

    mapping_store = _parse_mapping_store(mapping_path)

    # Fail fast on mapping constructs that are invalid or not invertible
    # before spending time on materialization.
    if _check_for_literal_subjects(mapping_store):
        raise MappingError("rr:termType rr:Literal on subjectMap is not valid")

    if _check_for_sql_queries(mapping_store):
        raise UnsupportedMappingError("SQL query as logical table is not supported")

    if _check_for_multiple_subject_maps(mapping_store):
        raise MappingError("TriplesMap contains multiple subjectMaps")

    # Fall back to D2RQ connection triples embedded in the mapping.
    if not source_db_url:
        extracted_url = _extract_db_url_from_mapping(mapping_store)
        if extracted_url:
            source_db_url = extracted_url
            logger.info(f"Extracted source database URL from mapping: {extracted_url}")

    config_file = _build_morph_config(mapping, rdf_graph_str, source_db_url)
    # Declared before the try so the finally can dispose any retrievers that
    # were created; previously dispose() ran only on the success path, leaking
    # database connections when reconstruction failed partway through.
    schema_retrievers: dict[str, DatabaseSchemaRetriever] = {}
    try:
        config = load_config_from_argument(config_file)

        try:
            mappings, _, _ = retrieve_mappings(config)
        except ValueError as e:
            raise MappingError(f"Invalid mapping: {e}") from e
        except KeyError as e:
            # morph-kgc raises a bare KeyError("'object_map'") for rules with
            # no object information; translate it into a clearer domain error.
            if str(e) == "'object_map'":
                raise MappingError(
                    "Mapping with missing object_map information"
                ) from e
            raise MappingError(f"Mapping error: {e}") from e

        if _check_for_constant_only_mappings(mappings):
            raise NonInvertibleError(
                "Mappings contain only constants (no column references) - original data cannot be recovered"
            )

        if _check_for_column_iri_term_maps(mappings):
            raise NonInvertibleError(
                "Term map uses rr:column with IRI term type - base IRI resolution makes inversion ambiguous"
            )

        # Select the SPARQL endpoint: explicit remote (optionally Virtuoso,
        # which needs the RDF file loaded into its container) or a local
        # endpoint built from the materialized graph file.
        try:
            if sparql_endpoint:
                if use_virtuoso:
                    endpoint = VirtuosoEndpoint(
                        sparql_endpoint,
                        rdf_file_to_load=rdf_graph_str,
                        container_name=virtuoso_container,
                    )
                else:
                    endpoint = RemoteEndpoint(
                        sparql_endpoint, rdf_file_to_load=rdf_graph_str
                    )
            else:
                endpoint = EndpointFactory.create_from_url(rdf_graph_str)
        except (FileNotFoundError, OSError) as e:
            raise NoDataError(
                "No RDF input file found, likely due to mapping errors"
            ) from e
        except ValueError as e:
            raise NonInvertibleError(
                f"Output RDF contains invalid data: {e}"
            ) from e

        insert_columns(mappings)

        if source_db_url:
            schema_retrievers["DataSource1"] = DatabaseSchemaRetriever(source_db_url)

        # RML in-memory/file sources cannot be reconstructed into tables.
        mappings = mappings[mappings["logical_source_type"] != RML_SOURCE]

        results: dict[str, ReconstructedTable] = {}
        for table_name, source_rules in mappings.groupby("logical_source_value"):
            source_section = source_rules.iloc[0].get(
                "source_section", "DataSource1"
            )
            # Prefer the destination DB for template generation when given.
            template_db_url = dest_db_url if dest_db_url else source_db_url
            template = _generate_template(source_rules, template_db_url)

            source_data, sparql_query = retrieve_data(
                mappings, source_rules, endpoint, decode_columns=True
            )

            if source_data is None:
                # Record an empty placeholder so callers still see the table.
                results[table_name] = ReconstructedTable(
                    sql="", sparql_query="", data=pd.DataFrame()
                )
                logger.warning(f"No data generated for {table_name}")
                continue

            # Re-apply the live database schema (column types and order) so
            # the reconstructed table matches the original layout.
            if source_section in schema_retrievers:
                schema_retriever = schema_retrievers[source_section]
                table_schema = schema_retriever.get_table_schema(table_name)
                if table_schema:
                    source_data = apply_schema_types(source_data, table_schema)
                    source_data = apply_schema_ordering(source_data, table_schema)

            filled_source = template.fill_data(source_data, table_name)
            results[table_name] = ReconstructedTable(
                sql=filled_source,
                sparql_query=sparql_query or "",
                data=source_data,
            )

        if not results:
            raise NoDataError("No data was generated during reconstruction")

        return results
    finally:
        # Always release DB connections and the temporary morph-kgc config,
        # on both success and failure paths.
        for retriever in schema_retrievers.values():
            retriever.dispose()
        os.unlink(config_file)