Coverage for kgi/core.py: 71%
198 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 08:53 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import configparser
6import logging
7import os
8import pathlib
9import re
10import tempfile
11from urllib.parse import quote_plus
13import pandas as pd
14from morph_kgc.args_parser import load_config_from_argument
15from morph_kgc.mapping.mapping_parser import retrieve_mappings
16from pyoxigraph import BlankNode, Literal, NamedNode, Quad, RdfFormat, Store
18from .constants import (
19 RML_IRI,
20 RML_REFERENCE,
21 RML_SOURCE,
22 RML_TEMPLATE,
23 RR_LITERAL,
24 RR_SUBJECT_MAP,
25 RR_TERM_TYPE,
26)
27from .endpoints import EndpointFactory, RemoteEndpoint, VirtuosoEndpoint
28from .exceptions import (
29 MappingError,
30 NoDataError,
31 NonInvertibleError,
32 UnsupportedMappingError,
33)
34from .models import ReconstructedTable
35from .query import retrieve_data
36from .schema import DatabaseSchemaRetriever, apply_schema_ordering, apply_schema_types
37from .templates import RDBTemplate
38from .utils import insert_columns
40D2RQ_DATABASE = NamedNode("http://www.wiwiss.fu-berlin.de/suhl/bizer/D2RQ/0.1#Database")
41D2RQ_JDBC_DSN = NamedNode("http://www.wiwiss.fu-berlin.de/suhl/bizer/D2RQ/0.1#jdbcDSN")
42D2RQ_USERNAME = NamedNode("http://www.wiwiss.fu-berlin.de/suhl/bizer/D2RQ/0.1#username")
43D2RQ_PASSWORD = NamedNode("http://www.wiwiss.fu-berlin.de/suhl/bizer/D2RQ/0.1#password")
45JDBC_DRIVERS: dict[str, str] = {
46 "postgresql": "postgresql+psycopg2",
47 "mysql": "mysql+pymysql",
48}
50RR_SQL_QUERY = NamedNode("http://www.w3.org/ns/r2rml#sqlQuery")
51RML_QUERY_NEW = NamedNode("http://w3id.org/rml/query")
52RML_QUERY_LEGACY = NamedNode("http://semweb.mmlab.be/ns/rml#query")
53RR_TRIPLES_MAP = NamedNode("http://www.w3.org/ns/r2rml#TriplesMap")
54RDF_TYPE = NamedNode("http://www.w3.org/1999/02/22-rdf-syntax-ns#type")
57def get_logger() -> logging.Logger:
58 return logging.getLogger("kgi")
61def _parse_mapping_store(mapping_file: str) -> Store:
62 store = Store()
63 store.load(path=mapping_file, format=RdfFormat.TURTLE)
64 return store
67def _literal_value(quad: Quad) -> str:
68 obj = quad.object
69 if isinstance(obj, Literal):
70 return obj.value
71 return str(obj)
74def _extract_db_url_from_mapping(store: Store) -> str | None:
75 databases = list(store.quads_for_pattern(None, RDF_TYPE, D2RQ_DATABASE))
76 if not databases:
77 return None
78 db_node = databases[0].subject
79 dsn_quads = list(store.quads_for_pattern(db_node, D2RQ_JDBC_DSN, None))
80 if not dsn_quads:
81 return None
82 jdbc_dsn = _literal_value(dsn_quads[0])
83 match = re.match(r"jdbc:(\w+)://(.+)", jdbc_dsn)
84 if not match:
85 return None
86 db_type, host_and_db = match.group(1), match.group(2)
87 driver = JDBC_DRIVERS.get(db_type)
88 if not driver:
89 get_logger().warning(f"Unsupported JDBC driver type: {db_type}")
90 return None
91 user_quads = list(store.quads_for_pattern(db_node, D2RQ_USERNAME, None))
92 pass_quads = list(store.quads_for_pattern(db_node, D2RQ_PASSWORD, None))
93 username = _literal_value(user_quads[0]) if user_quads else ""
94 password = _literal_value(pass_quads[0]) if pass_quads else ""
95 credentials = f"{quote_plus(username)}:{quote_plus(password)}@" if username else ""
96 return f"{driver}://{credentials}{host_and_db}"
99def _check_for_sql_queries(store: Store) -> bool:
100 for predicate in (RR_SQL_QUERY, RML_QUERY_NEW, RML_QUERY_LEGACY):
101 if any(store.quads_for_pattern(None, predicate, None)):
102 return True
103 return False
106def _check_for_multiple_subject_maps(store: Store) -> bool:
107 for quad in store.quads_for_pattern(None, RDF_TYPE, RR_TRIPLES_MAP):
108 triples_map = quad.subject
109 subject_maps = list(
110 store.quads_for_pattern(triples_map, RR_SUBJECT_MAP, None)
111 )
112 if len(subject_maps) > 1:
113 return True
114 return False
117def _check_for_literal_subjects(store: Store) -> bool:
118 for sm_quad in store.quads_for_pattern(None, RR_SUBJECT_MAP, None):
119 subject_map_node = sm_quad.object
120 if not isinstance(subject_map_node, (NamedNode, BlankNode)):
121 continue
122 if any(store.quads_for_pattern(subject_map_node, RR_TERM_TYPE, RR_LITERAL)):
123 return True
124 return False
127def _generate_template(
128 source_rules: pd.DataFrame, db_url: str | None = None
129) -> RDBTemplate:
130 source_type = source_rules.iloc[0]["source_type"]
132 if source_type == "RDB":
133 return RDBTemplate(db_url)
134 else:
135 raise ValueError(f"Unsupported source type: {source_type}")
138def _is_column_only_iri(map_type: object, map_value: object, term_type: object) -> bool:
139 if map_type == RML_REFERENCE and term_type == RML_IRI:
140 return True
141 if map_type == RML_TEMPLATE and term_type == RML_IRI:
142 stripped = str(map_value).strip()
143 if stripped.startswith("{") and stripped.endswith("}") and stripped.count("{") == 1:
144 return True
145 return False
148def _check_for_column_iri_term_maps(mappings: pd.DataFrame) -> bool:
149 for _, rule in mappings.iterrows():
150 if _is_column_only_iri(rule["subject_map_type"], rule["subject_map_value"], rule["subject_termtype"]):
151 return True
152 if _is_column_only_iri(rule["object_map_type"], rule["object_map_value"], rule["object_termtype"]):
153 return True
154 if _is_column_only_iri(rule["predicate_map_type"], rule["predicate_map_value"], RML_IRI):
155 return True
156 return False
159def _check_for_constant_only_mappings(mappings: pd.DataFrame) -> bool:
160 try:
161 for _, rule in mappings.iterrows():
162 subject_map_type = rule.get("subject_map_type")
163 predicate_map_type = rule.get("predicate_map_type")
164 object_map_type = rule.get("object_map_type")
166 if (
167 subject_map_type in [RML_REFERENCE, RML_TEMPLATE]
168 or predicate_map_type in [RML_REFERENCE, RML_TEMPLATE]
169 or object_map_type in [RML_REFERENCE, RML_TEMPLATE]
170 ):
171 return False
173 return True
174 except Exception as e:
175 get_logger().warning(f"Could not check for constant-only mappings: {e}")
176 return False
179def _build_morph_config(
180 mapping: str | pathlib.Path,
181 rdf_graph: str,
182 source_db_url: str | None = None,
183) -> str:
184 config = configparser.ConfigParser()
185 config["CONFIGURATION"] = {
186 "output_file": str(rdf_graph),
187 "output_format": "N-QUADS",
188 "logging_level": "ERROR",
189 }
190 data_source: dict[str, str] = {"mappings": str(mapping)}
191 if source_db_url:
192 data_source["db_url"] = source_db_url
193 config["DataSource1"] = data_source
195 tmp = tempfile.NamedTemporaryFile(
196 mode="w", suffix=".ini", delete=False, prefix="kgi_"
197 )
198 config.write(tmp)
199 tmp.close()
200 return tmp.name
203def reconstruct(
204 mapping: str | pathlib.Path,
205 rdf_graph: str | pathlib.Path,
206 source_db_url: str | None = None,
207 dest_db_url: str | None = None,
208 sparql_endpoint: str | None = None,
209 use_virtuoso: bool = False,
210 virtuoso_container: str = "virtuoso-kgi",
211) -> dict[str, ReconstructedTable]:
212 logger = get_logger()
213 mapping_path = str(mapping)
214 rdf_graph_str = str(rdf_graph)
216 mapping_store = _parse_mapping_store(mapping_path)
218 if _check_for_literal_subjects(mapping_store):
219 raise MappingError("rr:termType rr:Literal on subjectMap is not valid")
221 if _check_for_sql_queries(mapping_store):
222 raise UnsupportedMappingError("SQL query as logical table is not supported")
224 if _check_for_multiple_subject_maps(mapping_store):
225 raise MappingError("TriplesMap contains multiple subjectMaps")
227 if not source_db_url:
228 extracted_url = _extract_db_url_from_mapping(mapping_store)
229 if extracted_url:
230 source_db_url = extracted_url
231 logger.info(f"Extracted source database URL from mapping: {extracted_url}")
233 config_file = _build_morph_config(mapping, rdf_graph_str, source_db_url)
234 try:
235 config = load_config_from_argument(config_file)
237 try:
238 mappings, _, _ = retrieve_mappings(config)
239 except ValueError as e:
240 raise MappingError(f"Invalid mapping: {e}") from e
241 except KeyError as e:
242 if str(e) == "'object_map'":
243 raise MappingError(
244 "Mapping with missing object_map information"
245 ) from e
246 raise MappingError(f"Mapping error: {e}") from e
248 if _check_for_constant_only_mappings(mappings):
249 raise NonInvertibleError(
250 "Mappings contain only constants (no column references) - original data cannot be recovered"
251 )
253 if _check_for_column_iri_term_maps(mappings):
254 raise NonInvertibleError(
255 "Term map uses rr:column with IRI term type - base IRI resolution makes inversion ambiguous"
256 )
258 try:
259 if sparql_endpoint:
260 if use_virtuoso:
261 endpoint = VirtuosoEndpoint(
262 sparql_endpoint,
263 rdf_file_to_load=rdf_graph_str,
264 container_name=virtuoso_container,
265 )
266 else:
267 endpoint = RemoteEndpoint(
268 sparql_endpoint, rdf_file_to_load=rdf_graph_str
269 )
270 else:
271 endpoint = EndpointFactory.create_from_url(rdf_graph_str)
272 except (FileNotFoundError, OSError) as e:
273 raise NoDataError(
274 "No RDF input file found, likely due to mapping errors"
275 ) from e
276 except ValueError as e:
277 raise NonInvertibleError(
278 f"Output RDF contains invalid data: {e}"
279 ) from e
281 insert_columns(mappings)
283 schema_retrievers: dict[str, DatabaseSchemaRetriever] = {}
284 if source_db_url:
285 schema_retrievers["DataSource1"] = DatabaseSchemaRetriever(source_db_url)
287 mappings = mappings[mappings["logical_source_type"] != RML_SOURCE]
289 results: dict[str, ReconstructedTable] = {}
290 for table_name, source_rules in mappings.groupby("logical_source_value"):
291 source_section = source_rules.iloc[0].get(
292 "source_section", "DataSource1"
293 )
294 template_db_url = dest_db_url if dest_db_url else source_db_url
295 template = _generate_template(source_rules, template_db_url)
297 source_data, sparql_query = retrieve_data(
298 mappings, source_rules, endpoint, decode_columns=True
299 )
301 if source_data is None:
302 results[table_name] = ReconstructedTable(
303 sql="", sparql_query="", data=pd.DataFrame()
304 )
305 logger.warning(f"No data generated for {table_name}")
306 continue
308 if source_section in schema_retrievers:
309 schema_retriever = schema_retrievers[source_section]
310 table_schema = schema_retriever.get_table_schema(table_name)
311 if table_schema:
312 source_data = apply_schema_types(source_data, table_schema)
313 source_data = apply_schema_ordering(source_data, table_schema)
315 filled_source = template.fill_data(source_data, table_name)
316 results[table_name] = ReconstructedTable(
317 sql=filled_source,
318 sparql_query=sparql_query or "",
319 data=source_data,
320 )
322 for retriever in schema_retrievers.values():
323 retriever.dispose()
325 if not results:
326 raise NoDataError("No data was generated during reconstruction")
328 return results
329 finally:
330 os.unlink(config_file)