Coverage for kgi/comparison.py: 93% (176 statements)

# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# SPDX-License-Identifier: ISC

import io
import re
from typing import Union

import pandas as pd
from pyoxigraph import BlankNode, Literal, NamedNode, RdfFormat, Store, Triple

RdfSubject = Union[NamedNode, BlankNode, Triple]
RdfTerm = Union[NamedNode, BlankNode, Literal, Triple]

TEMPLATE_COLUMN_REGEX = re.compile(r'\{\\?"?\'?([^"\'{}\\]+)\\?"?\'?\}')
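
# Illustrative example (not part of the original module): the regex extracts the
# column references embedded in an R2RML template string, e.g.
#   TEMPLATE_COLUMN_REGEX.findall("http://example.org/person/{id}/{name}")
#   # -> ["id", "name"]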

RR_COLUMN = NamedNode("http://www.w3.org/ns/r2rml#column")
RR_TEMPLATE = NamedNode("http://www.w3.org/ns/r2rml#template")
RR_CHILD = NamedNode("http://www.w3.org/ns/r2rml#child")
RR_PARENT = NamedNode("http://www.w3.org/ns/r2rml#parent")
RR_TABLE_NAME = NamedNode("http://www.w3.org/ns/r2rml#tableName")
RR_LOGICAL_TABLE = NamedNode("http://www.w3.org/ns/r2rml#logicalTable")
RR_SUBJECT_MAP = NamedNode("http://www.w3.org/ns/r2rml#subjectMap")
RML_OLD_REFERENCE = NamedNode("http://semweb.mmlab.be/ns/rml#reference")
RML_OLD_LOGICAL_SOURCE = NamedNode("http://semweb.mmlab.be/ns/rml#logicalSource")


def _term_value(term: RdfTerm) -> str:
    assert isinstance(term, (NamedNode, BlankNode, Literal))
    return term.value


def parse_mapping(mapping_content: str) -> Store:
    """Parse a Turtle mapping document into a pyoxigraph Store."""
    store = Store()
    store.load(input=io.BytesIO(mapping_content.encode("utf-8")), format=RdfFormat.TURTLE)
    return store


def extract_columns_from_mapping(mapping_content: str) -> set[str]:
    """Collect every column name referenced by the mapping.

    Columns can appear as rr:column or rml:reference values, inside
    rr:template strings, or as rr:child / rr:parent join conditions.
    """
    store = parse_mapping(mapping_content)
    columns: set[str] = set()
    for quad in store.quads_for_pattern(None, RR_COLUMN, None):
        columns.add(_term_value(quad.object).strip('"'))
    for quad in store.quads_for_pattern(None, RML_OLD_REFERENCE, None):
        columns.add(_term_value(quad.object).strip('"'))
    for quad in store.quads_for_pattern(None, RR_TEMPLATE, None):
        column_refs = TEMPLATE_COLUMN_REGEX.findall(_term_value(quad.object))
        columns.update(column_refs)
    for quad in store.quads_for_pattern(None, RR_CHILD, None):
        columns.add(_term_value(quad.object).strip('"'))
    for quad in store.quads_for_pattern(None, RR_PARENT, None):
        columns.add(_term_value(quad.object).strip('"'))
    return columns
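
# Illustrative sketch (the Turtle snippet below is an assumption, not part of
# the original module): a mapping that mixes rr:template and rr:column yields
# the union of the referenced columns.
#
#     _EXAMPLE_MAPPING = """
#     @prefix rr: <http://www.w3.org/ns/r2rml#> .
#     <http://example.org/map/Person>
#         rr:logicalTable [ rr:tableName "person" ] ;
#         rr:subjectMap [ rr:template "http://example.org/person/{id}" ] ;
#         rr:predicateObjectMap [ rr:objectMap [ rr:column "name" ] ] .
#     """
#     extract_columns_from_mapping(_EXAMPLE_MAPPING)  # -> {"id", "name"}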


def check_mapping_column_coverage(
    mapping_content: str, source_content: dict[str, dict[str, list[str]]],
) -> list[str]:
    """Report source-table columns that no mapping rule references."""
    mapped_columns = extract_columns_from_mapping(mapping_content)
    invertibility_issues = []
    for table_name, table_data in source_content.items():
        table_columns = set(table_data['columns'])
        missing_columns = table_columns - mapped_columns
        if missing_columns:
            missing_str = ", ".join(sorted(missing_columns))
            invertibility_issues.append(f"Table '{table_name}' has unmapped columns: {missing_str}")
    return invertibility_issues
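
# Illustrative sketch (hypothetical input): with the example mapping sketched
# above, which only references "id" and "name", a source table that also has
# an "age" column is flagged as not fully covered.
#
#     check_mapping_column_coverage(
#         _EXAMPLE_MAPPING,
#         {"person": {"columns": ["id", "name", "age"], "data": []}},
#     )
#     # -> ["Table 'person' has unmapped columns: age"]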


def get_mapped_table_names(mapping_store: Store) -> set[str]:
    """Return all rr:tableName values declared in the mapping."""
    tables: set[str] = set()
    for quad in mapping_store.quads_for_pattern(None, RR_TABLE_NAME, None):
        tables.add(_term_value(quad.object).strip('"'))
    return tables


def _first_object(store: Store, subject: RdfSubject, predicate: NamedNode) -> RdfTerm | None:
    for quad in store.quads_for_pattern(subject, predicate, None):
        return quad.object
    return None


def _subjects_of(store: Store, predicate: NamedNode, obj: RdfTerm) -> list[RdfSubject]:
    return [quad.subject for quad in store.quads_for_pattern(None, predicate, obj)]


def find_subject_map_for_table(mapping_store: Store, table_name: str) -> RdfSubject | None:
    """Return the rr:subjectMap node of the triples map that maps ``table_name``, if any."""
    for quad in mapping_store.quads_for_pattern(None, RR_TABLE_NAME, None):
        if _term_value(quad.object).strip('"') != table_name:
            continue
        logical_table = quad.subject
        triples_maps = _subjects_of(mapping_store, RR_LOGICAL_TABLE, logical_table)
        if not triples_maps:
            triples_maps = _subjects_of(mapping_store, RML_OLD_LOGICAL_SOURCE, logical_table)
        if not triples_maps:
            continue
        result = _first_object(mapping_store, triples_maps[0], RR_SUBJECT_MAP)
        if result is not None and isinstance(result, (NamedNode, BlankNode)):
            return result
    return None


def check_null_in_subject_template(
    mapping_store: Store, source_df: pd.DataFrame, table_name: str,
) -> tuple[str | None, bool]:
    """Flag tables whose subject-template columns contain NULL values.

    Rows with a NULL in a subject-template column cannot produce a subject
    IRI and are therefore excluded from the generated RDF.
    """
    subject_map = find_subject_map_for_table(mapping_store, table_name)
    if subject_map is None:
        return None, False
    template_quad = next(mapping_store.quads_for_pattern(subject_map, RR_TEMPLATE, None), None)
    if template_quad is None:
        return None, False
    column_refs = TEMPLATE_COLUMN_REGEX.findall(_term_value(template_quad.object))
    for col in column_refs:
        if col in source_df.columns and bool(source_df[col].isna().any()):
            null_count = int(source_df[col].isna().sum())
            return (
                f"{table_name} (NON-INVERTIBLE: NULL values in subject template column "
                f"'{col}' cause {null_count} row(s) to be excluded from RDF)",
                True,
            )
    return None, False
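
# Illustrative sketch (hypothetical data): with a subject template such as
# "http://example.org/person/{id}", a source frame like
#
#     pd.DataFrame({"id": [1, None], "name": ["Ada", "Bob"]})
#
# has one NULL in the template column, so the table is reported as
# NON-INVERTIBLE with 1 excluded row.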


def detect_non_invertible(
    mapping_store: Store, source_df: pd.DataFrame, table_name: str,
) -> tuple[str | None, bool]:
    """Run the known non-invertibility checks (currently NULLs in the subject template) for one table."""
    null_msg, is_null = check_null_in_subject_template(mapping_store, source_df, table_name)
    if is_null:
        return null_msg, True
    return None, False


def analyze_duplicate_loss(
    source_df: pd.DataFrame, dest_df: pd.DataFrame, table_name: str,
) -> tuple[str | None, bool]:
    """Explain row-count differences caused by duplicate source rows lost during inversion."""
    source_unique = source_df.drop_duplicates()
    dest_unique = dest_df.drop_duplicates()

    if source_unique.equals(dest_unique) and len(source_df) > len(dest_df):
        duplicate_rows = []
        for _, row in source_unique.iterrows():
            source_count = len(source_df[source_df.eq(row).all(axis=1)])
            dest_count = len(dest_df[dest_df.eq(row).all(axis=1)])
            if source_count > dest_count:
                duplicate_rows.append((source_count, dest_count, dict(row)))

        if duplicate_rows:
            duplicate_info = "; ".join([
                f"Row {row} appears {src_cnt} times in source but {dst_cnt} times in destination"
                for src_cnt, dst_cnt, row in duplicate_rows
            ])
            message = (
                f"{table_name} (NON-INVERTIBLE: Duplicate rows lost during inversion - {duplicate_info}. "
                "Consider adding unique identifiers to your mapping template to preserve row distinctness)"
            )
            return message, True

    return None, False
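
# Illustrative note (hypothetical scenario): when the two frames agree on their
# distinct rows but the source holds a row more often than the destination,
# the missing occurrences are reported as duplicate loss rather than as a
# generic data mismatch.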


def compare_databases(
    source_content: dict[str, dict[str, list[str]]],
    dest_content: dict[str, dict[str, list[str]]],
    mapping_content: str | None = None,
) -> tuple[bool, str, str | None]:
    """Compare two table snapshots and classify any differences.

    Returns a ``(success, message, reason)`` tuple; ``reason`` is
    ``"non_invertible"`` when the differences are explained by the mapping
    (unmapped tables or columns, NULLs in subject templates, duplicate rows).
    """
    if not source_content and not dest_content:
        return True, "Both databases are empty - comparison successful", None
    if not source_content or not dest_content:
        return False, "One database is empty while the other is not", None

    mapping_graph = parse_mapping(mapping_content) if mapping_content else None

    source_tables = set(source_content.keys())
    dest_tables = set(dest_content.keys())
    missing_from_dest = source_tables - dest_tables

    mismatched_tables = []
    has_invertibility_issues = False

    if missing_from_dest:
        if mapping_graph:
            mapped_tables = get_mapped_table_names(mapping_graph)
            unmapped_tables = {t for t in missing_from_dest if t not in mapped_tables}
            if unmapped_tables == missing_from_dest:
                unmapped_str = ", ".join(sorted(unmapped_tables))
                mismatched_tables.append(f"NON-INVERTIBLE: Unmapped tables: {unmapped_str}")
                has_invertibility_issues = True
            else:
                return False, "Tables in source and destination databases do not match", None
        else:
            return False, "Tables in source and destination databases do not match", None

    common_tables = source_tables & dest_tables
    for table_name in common_tables:
        source_table = source_content[table_name]
        dest_table = dest_content[table_name]

        if set(source_table['columns']) != set(dest_table['columns']):
            mismatched_tables.append(f"{table_name} (columns mismatch)")
            continue

        source_df = pd.DataFrame(source_table['data'], columns=pd.Index(source_table['columns']))
        dest_df = pd.DataFrame(dest_table['data'], columns=pd.Index(dest_table['columns']))

        if source_df.empty and dest_df.empty:
            continue

        source_df = source_df.dropna(how='all')
        dest_df = dest_df.dropna(how='all')
        source_df = source_df.reindex(sorted(source_df.columns), axis=1)
        dest_df = dest_df.reindex(sorted(dest_df.columns), axis=1)
        source_df = source_df.reset_index(drop=True)
        dest_df = dest_df.reset_index(drop=True)
        source_df = source_df.sort_values(by=source_df.columns.tolist()).reset_index(drop=True)
        dest_df = dest_df.sort_values(by=dest_df.columns.tolist()).reset_index(drop=True)

        if not source_df.equals(dest_df):
            resolved = False
            if len(source_df) > len(dest_df):
                duplicate_analysis, is_dup_issue = analyze_duplicate_loss(source_df, dest_df, table_name)
                if duplicate_analysis:
                    mismatched_tables.append(duplicate_analysis)
                    if is_dup_issue:
                        has_invertibility_issues = True
                        resolved = True

            if not resolved and mapping_graph:
                issue_msg, is_issue = detect_non_invertible(mapping_graph, source_df, table_name)
                if is_issue:
                    mismatched_tables.append(issue_msg)
                    has_invertibility_issues = True
                    resolved = True

            if not resolved:
                mismatched_tables.append(f"{table_name} (data mismatch)")

    if mismatched_tables:
        message = f"Mismatched tables: {', '.join(mismatched_tables)}"

        if mapping_content and not has_invertibility_issues:
            invertibility_issues = check_mapping_column_coverage(mapping_content, source_content)
            if invertibility_issues:
                invertibility_message = "; ".join(invertibility_issues)
                message += f" (NON-INVERTIBLE: {invertibility_message})"
                has_invertibility_issues = True

        if has_invertibility_issues:
            return False, message, "non_invertible"
        return False, message, None

    return True, "All tables in source and destination databases are identical", None
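
# Illustrative usage (hypothetical data, not part of the original module):
# two snapshots of the same single table compare as identical regardless of
# row order.
#
#     _src = {"person": {"columns": ["id", "name"], "data": [[1, "Ada"], [2, "Bob"]]}}
#     _dst = {"person": {"columns": ["id", "name"], "data": [[2, "Bob"], [1, "Ada"]]}}
#     compare_databases(_src, _dst)
#     # -> (True, "All tables in source and destination databases are identical", None)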