Coverage for kgi / schema.py: 77%

119 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-23 08:53 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5"""Schema retrieval and management for knowledge graph inversion.""" 

6 

7import logging 

8from dataclasses import dataclass 

9from typing import Any, Optional 

10 

11import pandas as pd 

12import sqlalchemy 

13from sqlalchemy import inspect 

14 

15 

@dataclass
class ColumnInfo:
    """Metadata describing a single column of a database table.

    Attributes:
        name: Column name.
        data_type: String form of the column's SQL type.
        python_type: Python type that values of this column are converted to.
        nullable: Whether the column accepts NULL; defaults to True.
        ordinal_position: Sort key used to restore database column order.
            DatabaseSchemaRetriever assigns 1-based positions; the default
            is 0.
    """

    name: str
    data_type: str
    python_type: type
    nullable: bool = True
    ordinal_position: int = 0

25 

26 

@dataclass
class TableSchema:
    """Schema of one table: its name, its columns and its primary key.

    Attributes:
        table_name: Name of the table.
        columns: Column metadata, in no guaranteed order.
        primary_keys: Names of the primary-key columns.
    """

    table_name: str
    columns: list[ColumnInfo]
    primary_keys: list[str]

    @property
    def column_names_ordered(self) -> list[str]:
        """Return the column names sorted by ``ordinal_position``."""
        by_position = list(self.columns)
        # list.sort is stable, so columns sharing a position keep their
        # relative order from ``self.columns``.
        by_position.sort(key=lambda column: column.ordinal_position)
        return [column.name for column in by_position]

    def get_column_info(self, column_name: str) -> Optional[ColumnInfo]:
        """Return the first column named ``column_name``, or None if absent."""
        for candidate in self.columns:
            if candidate.name == column_name:
                return candidate
        return None

45 

46 

class DatabaseSchemaRetriever:
    """Reads table schemas from the original database via SQLAlchemy.

    The engine is created the first time ``engine`` is accessed and is
    released by ``dispose``.
    """

    def __init__(self, db_url: str):
        """Remember the database URL; no engine is created yet."""
        self.db_url = db_url
        self.logger = logging.getLogger("kgi")
        self._engine = None

    @property
    def engine(self):
        """Return the cached engine, creating it from ``db_url`` on first use."""
        if self._engine is not None:
            return self._engine
        self._engine = sqlalchemy.create_engine(self.db_url)
        return self._engine

    def get_table_schema(self, table_name: str) -> Optional[TableSchema]:
        """Reflect ``table_name`` and describe it as a TableSchema.

        Args:
            table_name: Name of the table to inspect.

        Returns:
            The table's schema, with columns numbered from 1 in the order
            the inspector reports them. None is returned when the table
            does not exist (a warning is logged) or when any exception is
            raised during reflection (an error is logged).
        """
        try:
            reflector = inspect(self.engine)

            if not reflector.has_table(table_name):
                self.logger.warning(f"Table {table_name} not found in database")
                return None

            reflected_columns = reflector.get_columns(table_name)
            pk_constraint = reflector.get_pk_constraint(table_name)
            pk_names = pk_constraint["constrained_columns"]

            described = [
                ColumnInfo(
                    name=entry["name"],
                    data_type=str(entry["type"]),
                    python_type=self._sql_to_python_type(entry["type"]),
                    nullable=entry.get("nullable", True),
                    ordinal_position=position,
                )
                for position, entry in enumerate(reflected_columns, start=1)
            ]

            return TableSchema(
                table_name=table_name,
                columns=described,
                primary_keys=pk_names or [],
            )

        except Exception as err:
            self.logger.error(f"Error retrieving schema for table {table_name}: {err}")
            return None

    def _sql_to_python_type(self, sql_type) -> type:
        """Map a SQLAlchemy type instance to the Python type used for values.

        Rows are checked top to bottom and the first match wins; anything
        unmatched is treated as ``str``.
        """
        type_table = (
            (
                (sqlalchemy.Integer, sqlalchemy.BigInteger, sqlalchemy.SmallInteger),
                int,
            ),
            ((sqlalchemy.Float, sqlalchemy.Numeric, sqlalchemy.DECIMAL), float),
            ((sqlalchemy.Boolean,), bool),
            ((sqlalchemy.Date,), pd.Timestamp),
            ((sqlalchemy.DateTime, sqlalchemy.TIMESTAMP), pd.Timestamp),
        )
        for sql_classes, python_type in type_table:
            if isinstance(sql_type, sql_classes):
                return python_type
        return str

    def dispose(self):
        """Dispose of the engine, if one was created, and forget it."""
        if not self._engine:
            return
        self._engine.dispose()
        self._engine = None

121 

122 

def infer_type_from_value_with_schema(
    value: Any, column_info: Optional[ColumnInfo] = None
) -> Any:
    """
    Convert a value using the column's schema type when one is available.

    Without ``column_info`` the value is handed to ``_infer_type_from_value``.
    With it, the value is converted to ``column_info.python_type``. If that
    conversion fails, a warning is logged and ``_infer_type_from_value`` is
    used as a fallback.

    Args:
        value: The value to convert
        column_info: Optional column schema information

    Returns:
        Converted value with appropriate type, or None when the value is
        missing (None, NaN, NaT or pd.NA)
    """
    if column_info is None:
        return _infer_type_from_value(value)

    # pandas represents NULL cells as NaN/NaT/pd.NA. Without this guard they
    # became the string "nan" for str columns, True for bool columns, and a
    # logged conversion failure for int columns. is_scalar keeps pd.isna from
    # returning an array for list-like values.
    if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
        return None

    target_type = column_info.python_type

    try:
        if target_type is int:
            try:
                # Exact conversion first: routing every value through float()
                # rounds integers above 2**53.
                return int(value)
            except (ValueError, TypeError):
                # Inputs such as "3.0" or "1e3" that int() rejects.
                return int(float(value))
        if target_type is float:
            return float(value)
        if target_type is bool:
            if isinstance(value, str):
                return value.lower() in ("true", "t", "1", "yes", "y")
            return bool(value)
        if target_type is pd.Timestamp:
            return pd.to_datetime(value)
        return str(value)

    # OverflowError covers int(float("inf")), which previously escaped.
    except (ValueError, TypeError, OverflowError) as e:
        logging.getLogger("kgi.schema").warning(
            "Failed to convert value %s to %s: %s",
            value,
            column_info.python_type.__name__,
            e,
        )
        # Fallback to basic type inference
        return _infer_type_from_value(value)

160 

161 

162def _infer_type_from_value(value: Any) -> Any: 

163 """Basic type inference from value.""" 

164 if value is None or pd.isna(value): 

165 return None 

166 

167 str_value = str(value).strip() 

168 

169 # Try integer 

170 try: 

171 if "." not in str_value: 

172 return int(str_value) 

173 except (ValueError, TypeError): 

174 pass 

175 

176 # Try float 

177 try: 

178 return float(str_value) 

179 except (ValueError, TypeError): 

180 pass 

181 

182 # Try boolean 

183 if str_value.lower() in ("true", "false", "t", "f", "1", "0"): 

184 return str_value.lower() in ("true", "t", "1") 

185 

186 # Try datetime 

187 try: 

188 return pd.to_datetime(str_value) 

189 except (ValueError, TypeError): 

190 pass 

191 

192 # Default to string 

193 return str_value 

194 

195 

def apply_schema_ordering(df: pd.DataFrame, schema: TableSchema) -> pd.DataFrame:
    """Reorder the DataFrame's columns to follow the schema's column order.

    Columns known to the schema come first, in ``schema.column_names_ordered``
    order. Columns the schema does not mention follow, in their existing
    order.

    Args:
        df: DataFrame whose columns should be reordered.
        schema: Table schema supplying the column order.

    Returns:
        A DataFrame with reordered columns, or ``df`` itself when it has no
        columns at all.
    """
    # Only a frame without columns has nothing to order. ``df.empty`` is also
    # True for a zero-row frame that still has columns, which previously left
    # empty tables in their incoming column order.
    if len(df.columns) == 0:
        return df

    ordered_columns = [col for col in schema.column_names_ordered if col in df.columns]

    # Add any remaining columns that weren't in the schema. A set keeps this
    # membership test constant-time.
    already_placed = set(ordered_columns)
    remaining_columns = [col for col in df.columns if col not in already_placed]

    final_order = ordered_columns + remaining_columns

    return pd.DataFrame(df[final_order])

209 

210 

def apply_schema_types(df: pd.DataFrame, schema: TableSchema) -> pd.DataFrame:
    """Convert each schema-known column to the type its schema declares.

    An empty DataFrame is returned as-is. Otherwise a copy is made, and every
    column for which ``schema.get_column_info`` finds an entry has its values
    passed through ``infer_type_from_value_with_schema``. Columns unknown to
    the schema are left untouched.
    """
    if df.empty:
        return df

    typed = df.copy()

    for label in typed.columns:
        info = schema.get_column_info(label)
        if not info:
            continue
        # ``args`` passes the column's schema entry as the second positional
        # argument on every call.
        typed[label] = typed[label].apply(
            infer_type_from_value_with_schema, args=(info,)
        )

    return typed