Coverage for kgi/comparison.py: 93%

176 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-23 08:53 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import io 

6import re 

7from typing import Union 

8 

9import pandas as pd 

10from pyoxigraph import BlankNode, Literal, NamedNode, RdfFormat, Store, Triple 

11 

12RdfSubject = Union[NamedNode, BlankNode, Triple] 

13RdfTerm = Union[NamedNode, BlankNode, Literal, Triple] 

14 

15TEMPLATE_COLUMN_REGEX = re.compile(r'\{\\?"?\'?([^"\'{}\\]+)\\?"?\'?\}') 

16 

17RR_COLUMN = NamedNode("http://www.w3.org/ns/r2rml#column") 

18RR_TEMPLATE = NamedNode("http://www.w3.org/ns/r2rml#template") 

19RR_CHILD = NamedNode("http://www.w3.org/ns/r2rml#child") 

20RR_PARENT = NamedNode("http://www.w3.org/ns/r2rml#parent") 

21RR_TABLE_NAME = NamedNode("http://www.w3.org/ns/r2rml#tableName") 

22RR_LOGICAL_TABLE = NamedNode("http://www.w3.org/ns/r2rml#logicalTable") 

23RR_SUBJECT_MAP = NamedNode("http://www.w3.org/ns/r2rml#subjectMap") 

24RML_OLD_REFERENCE = NamedNode("http://semweb.mmlab.be/ns/rml#reference") 

25RML_OLD_LOGICAL_SOURCE = NamedNode("http://semweb.mmlab.be/ns/rml#logicalSource") 

26 

27 

def _term_value(term: RdfTerm) -> str:
    """Return the lexical value of an RDF term.

    Args:
        term: A NamedNode, BlankNode, or Literal. Quoted Triple terms
            (RDF-star) carry no single lexical value and are rejected.

    Returns:
        The term's string value.

    Raises:
        TypeError: If *term* is a Triple (has no ``.value``).
    """
    # Explicit raise instead of assert: asserts are stripped under
    # ``python -O``, which would turn this guard into an AttributeError
    # at the ``.value`` access below.
    if not isinstance(term, (NamedNode, BlankNode, Literal)):
        raise TypeError(f"term has no lexical value: {term!r}")
    return term.value

31 

32 

def parse_mapping(mapping_content: str) -> Store:
    """Parse a Turtle mapping document into a fresh pyoxigraph Store."""
    graph = Store()
    encoded = io.BytesIO(mapping_content.encode("utf-8"))
    graph.load(input=encoded, format=RdfFormat.TURTLE)
    return graph

37 

38 

def extract_columns_from_mapping(mapping_content: str) -> set[str]:
    """Collect every source-column name referenced by an R2RML/RML mapping.

    Column names are gathered from ``rr:column``, ``rml:reference``,
    ``rr:child`` and ``rr:parent`` literals (surrounding double quotes
    stripped), plus any ``{COLUMN}`` placeholders found inside
    ``rr:template`` strings.

    Args:
        mapping_content: The mapping document as Turtle text.

    Returns:
        The set of all referenced column names.
    """
    store = parse_mapping(mapping_content)
    columns: set[str] = set()
    # These predicates all carry a bare column name as their object;
    # one loop replaces four previously duplicated blocks.
    direct_predicates = (RR_COLUMN, RML_OLD_REFERENCE, RR_CHILD, RR_PARENT)
    for predicate in direct_predicates:
        for quad in store.quads_for_pattern(None, predicate, None):
            columns.add(_term_value(quad.object).strip('"'))
    # Templates embed column references as {NAME} placeholders.
    for quad in store.quads_for_pattern(None, RR_TEMPLATE, None):
        columns.update(TEMPLATE_COLUMN_REGEX.findall(_term_value(quad.object)))
    return columns

54 

55 

def check_mapping_column_coverage(
    mapping_content: str, source_content: dict[str, dict[str, list[str]]],
) -> list[str]:
    """Report source-table columns never referenced by the mapping.

    Args:
        mapping_content: The mapping document as Turtle text.
        source_content: Per-table metadata; each value carries a
            ``'columns'`` list.

    Returns:
        One human-readable message per table that has unmapped columns.
    """
    covered = extract_columns_from_mapping(mapping_content)
    issues: list[str] = []
    for name, table in source_content.items():
        uncovered = set(table['columns']) - covered
        if not uncovered:
            continue
        listing = ", ".join(sorted(uncovered))
        issues.append(f"Table '{name}' has unmapped columns: {listing}")
    return issues

68 

69 

def get_mapped_table_names(mapping_store: Store) -> set[str]:
    """Return every rr:tableName value in the mapping, quotes stripped."""
    return {
        _term_value(quad.object).strip('"')
        for quad in mapping_store.quads_for_pattern(None, RR_TABLE_NAME, None)
    }

75 

76 

def _first_object(store: Store, subject: RdfSubject, predicate: NamedNode) -> RdfTerm | None:
    """Return the object of the first matching triple, or None if absent."""
    quad = next(store.quads_for_pattern(subject, predicate, None), None)
    return None if quad is None else quad.object

81 

82 

def _subjects_of(store: Store, predicate: NamedNode, obj: RdfTerm) -> list[RdfSubject]:
    """Collect the subjects of every triple matching (?, predicate, obj)."""
    subjects: list[RdfSubject] = []
    for quad in store.quads_for_pattern(None, predicate, obj):
        subjects.append(quad.subject)
    return subjects

85 

86 

def find_subject_map_for_table(mapping_store: Store, table_name: str) -> RdfSubject | None:
    """Locate the rr:subjectMap node of the triples map targeting *table_name*.

    Handles both R2RML (``rr:logicalTable``) and legacy RML
    (``rml:logicalSource``) vocabularies.

    Returns:
        The subject-map node (NamedNode or BlankNode), or None if no
        matching triples map with a subject map exists.
    """
    for quad in mapping_store.quads_for_pattern(None, RR_TABLE_NAME, None):
        if _term_value(quad.object).strip('"') != table_name:
            continue
        # quad.subject is the logical table/source node; walk back up to
        # the triples map that references it, trying R2RML then old RML.
        candidates = _subjects_of(mapping_store, RR_LOGICAL_TABLE, quad.subject)
        if not candidates:
            candidates = _subjects_of(mapping_store, RML_OLD_LOGICAL_SOURCE, quad.subject)
        if not candidates:
            continue
        subject_map = _first_object(mapping_store, candidates[0], RR_SUBJECT_MAP)
        # isinstance(None, ...) is False, so a missing subject map simply
        # falls through to the next candidate quad.
        if isinstance(subject_map, (NamedNode, BlankNode)):
            return subject_map
    return None

101 

102 

def check_null_in_subject_template(
    mapping_store: Store, source_df: pd.DataFrame, table_name: str,
) -> tuple[str | None, bool]:
    """Detect NULLs in the columns used by a table's subject template.

    A row with a NULL in any subject-template column cannot produce a
    subject IRI and is therefore dropped during RDF materialization.

    Returns:
        ``(message, True)`` describing the first offending column, or
        ``(None, False)`` when the table is unaffected or has no
        subject template.
    """
    subject_map = find_subject_map_for_table(mapping_store, table_name)
    if subject_map is None:
        return None, False
    template = next(mapping_store.quads_for_pattern(subject_map, RR_TEMPLATE, None), None)
    if template is None:
        return None, False
    for column in TEMPLATE_COLUMN_REGEX.findall(_term_value(template.object)):
        if column not in source_df.columns:
            continue
        nulls = source_df[column].isna()
        if not bool(nulls.any()):
            continue
        count = int(nulls.sum())
        message = (
            f"{table_name} (NON-INVERTIBLE: NULL values in subject template column "
            f"'{column}' cause {count} row(s) to be excluded from RDF)"
        )
        return message, True
    return None, False

122 

123 

def detect_non_invertible(
    mapping_store: Store, source_df: pd.DataFrame, table_name: str,
) -> tuple[str | None, bool]:
    """Run the known non-invertibility checks against a single table."""
    message, flagged = check_null_in_subject_template(mapping_store, source_df, table_name)
    return (message, True) if flagged else (None, False)

131 

132 

133def analyze_duplicate_loss( 

134 source_df: pd.DataFrame, dest_df: pd.DataFrame, table_name: str, 

135) -> tuple[str | None, bool]: 

136 source_unique = source_df.drop_duplicates() 

137 dest_unique = dest_df.drop_duplicates() 

138 

139 if source_unique.equals(dest_unique) and len(source_df) > len(dest_df): 

140 duplicate_rows = [] 

141 for _, row in source_unique.iterrows(): 

142 source_count = len(source_df[source_df.eq(row).all(axis=1)]) 

143 dest_count = len(dest_df[dest_df.eq(row).all(axis=1)]) 

144 if source_count > dest_count: 

145 duplicate_rows.append((source_count, dest_count, dict(row))) 

146 

147 if duplicate_rows: 

148 duplicate_info = "; ".join([ 

149 f"Row {row} appears {src_cnt} times in source but {dst_cnt} times in destination" 

150 for src_cnt, dst_cnt, row in duplicate_rows 

151 ]) 

152 message = ( 

153 f"{table_name} (NON-INVERTIBLE: Duplicate rows lost during inversion - {duplicate_info}. " 

154 "Consider adding unique identifiers to your mapping template to preserve row distinctness)" 

155 ) 

156 return message, True 

157 

158 return None, False 

159 

160 

def compare_databases(
    source_content: dict[str, dict[str, list[str]]],
    dest_content: dict[str, dict[str, list[str]]],
    mapping_content: str | None = None,
) -> tuple[bool, str, str | None]:
    """Compare source and destination database dumps table by table.

    Each dump maps table name -> {'columns': [...], 'data': [...]}.
    When *mapping_content* (Turtle R2RML/RML) is supplied, mismatches are
    additionally classified as known non-invertibility effects (unmapped
    tables, duplicate-row loss, NULLs in subject templates, unmapped
    columns).

    Returns:
        A ``(matches, message, reason)`` tuple; *reason* is
        ``"non_invertible"`` when the mismatch is attributed to a
        non-invertible mapping, otherwise ``None``.
    """
    # Trivial cases: both empty is a match; exactly one empty is not.
    if not source_content and not dest_content:
        return True, "Both databases are empty - comparison successful", None
    if not source_content or not dest_content:
        return False, "One database is empty while the other is not", None

    mapping_graph = parse_mapping(mapping_content) if mapping_content else None

    source_tables = set(source_content.keys())
    dest_tables = set(dest_content.keys())
    # NOTE(review): tables present only in dest are ignored below —
    # presumably intentional (inversion cannot invent tables); confirm.
    missing_from_dest = source_tables - dest_tables

    mismatched_tables = []
    has_invertibility_issues = False

    if missing_from_dest:
        if mapping_graph:
            mapped_tables = get_mapped_table_names(mapping_graph)
            unmapped_tables = {t for t in missing_from_dest if t not in mapped_tables}
            # Only excuse the missing tables if EVERY one of them is
            # absent from the mapping; a mapped-but-missing table is a
            # hard failure.
            if unmapped_tables == missing_from_dest:
                unmapped_str = ", ".join(sorted(unmapped_tables))
                mismatched_tables.append(f"NON-INVERTIBLE: Unmapped tables: {unmapped_str}")
                has_invertibility_issues = True
            else:
                return False, "Tables in source and destination databases do not match", None
        else:
            return False, "Tables in source and destination databases do not match", None

    common_tables = source_tables & dest_tables
    for table_name in common_tables:
        source_table = source_content[table_name]
        dest_table = dest_content[table_name]

        # Column sets must agree before row data is worth comparing.
        if set(source_table['columns']) != set(dest_table['columns']):
            mismatched_tables.append(f"{table_name} (columns mismatch)")
            continue

        source_df = pd.DataFrame(source_table['data'], columns=pd.Index(source_table['columns']))
        dest_df = pd.DataFrame(dest_table['data'], columns=pd.Index(dest_table['columns']))

        if source_df.empty and dest_df.empty:
            continue

        # Normalize both frames so equals() compares content only:
        # drop all-NULL rows, order columns alphabetically, then sort
        # rows by every column and renumber the index.
        source_df = source_df.dropna(how='all')
        dest_df = dest_df.dropna(how='all')
        source_df = source_df.reindex(sorted(source_df.columns), axis=1)
        dest_df = dest_df.reindex(sorted(dest_df.columns), axis=1)
        source_df = source_df.reset_index(drop=True)
        dest_df = dest_df.reset_index(drop=True)
        source_df = source_df.sort_values(by=source_df.columns.tolist()).reset_index(drop=True)
        dest_df = dest_df.sort_values(by=dest_df.columns.tolist()).reset_index(drop=True)

        if not source_df.equals(dest_df):
            resolved = False
            # First explanation attempt: duplicate rows collapsed by the
            # RDF round trip (only possible when source has more rows).
            if len(source_df) > len(dest_df):
                duplicate_analysis, is_dup_issue = analyze_duplicate_loss(source_df, dest_df, table_name)
                if duplicate_analysis:
                    mismatched_tables.append(duplicate_analysis)
                    if is_dup_issue:
                        has_invertibility_issues = True
                    resolved = True

            # Second attempt: mapping-level causes (e.g. NULLs in the
            # subject template excluding rows).
            if not resolved and mapping_graph:
                issue_msg, is_issue = detect_non_invertible(mapping_graph, source_df, table_name)
                if is_issue:
                    mismatched_tables.append(issue_msg)
                    has_invertibility_issues = True
                    resolved = True

            # No known cause: report a plain data mismatch.
            if not resolved:
                mismatched_tables.append(f"{table_name} (data mismatch)")

    if mismatched_tables:
        message = f"Mismatched tables: {', '.join(mismatched_tables)}"

        # If nothing above was attributed to the mapping, check whether
        # unmapped columns could explain the differences.
        if mapping_content and not has_invertibility_issues:
            invertibility_issues = check_mapping_column_coverage(mapping_content, source_content)
            if invertibility_issues:
                invertibility_message = "; ".join(invertibility_issues)
                message += f" (NON-INVERTIBLE: {invertibility_message})"
                has_invertibility_issues = True

        if has_invertibility_issues:
            return False, message, "non_invertible"
        return False, message, None

    return True, "All tables in source and destination databases are identical", None