Coverage for kgi / schema.py: 77%
119 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 08:53 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 08:53 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5"""Schema retrieval and management for knowledge graph inversion."""
7import logging
8from dataclasses import dataclass
9from typing import Any, Optional
11import pandas as pd
12import sqlalchemy
13from sqlalchemy import inspect
@dataclass
class ColumnInfo:
    """Metadata for one column as reported by the database schema."""

    # Column name.
    name: str
    # Textual form of the database column type.
    data_type: str
    # Python type that values of this column are converted to.
    python_type: type
    # Whether the column accepts NULL; defaults to True.
    nullable: bool = True
    # Position of the column within its table; defaults to 0.
    ordinal_position: int = 0
@dataclass
class TableSchema:
    """Schema of a single table: its name, its columns and its primary key."""

    # Name of the table this schema describes.
    table_name: str
    # Column metadata; not necessarily stored in ordinal order.
    columns: list[ColumnInfo]
    # Names of the primary-key columns.
    primary_keys: list[str]

    @property
    def column_names_ordered(self) -> list[str]:
        """Return the column names sorted by ascending ordinal position."""
        by_position = sorted(
            self.columns, key=lambda column: column.ordinal_position
        )
        return [column.name for column in by_position]

    def get_column_info(self, column_name: str) -> Optional[ColumnInfo]:
        """Return the first column named ``column_name``, or ``None`` if absent."""
        for candidate in self.columns:
            if candidate.name == column_name:
                return candidate
        return None
class DatabaseSchemaRetriever:
    """Reads table schema information from the original database via SQLAlchemy."""

    def __init__(self, db_url: str):
        """Remember the database URL; the engine is only created on first use."""
        self._engine = None
        self.logger = logging.getLogger("kgi")
        self.db_url = db_url

    @property
    def engine(self):
        """Return the cached SQLAlchemy engine, creating it on first access."""
        if self._engine is not None:
            return self._engine
        self._engine = sqlalchemy.create_engine(self.db_url)
        return self._engine

    def get_table_schema(self, table_name: str) -> Optional[TableSchema]:
        """Build the schema description of ``table_name``.

        Args:
            table_name: Name of the table to inspect.

        Returns:
            A ``TableSchema`` whose columns carry 1-based ordinal positions in
            the order the inspector reports them. ``None`` is returned when the
            table does not exist (a warning is logged) or when inspection
            raises any exception (an error is logged).
        """
        try:
            inspector = inspect(self.engine)

            if not inspector.has_table(table_name):
                self.logger.warning(f"Table {table_name} not found in database")
                return None

            reflected_columns = inspector.get_columns(table_name)
            pk_constraint = inspector.get_pk_constraint(table_name)
            pk_columns = pk_constraint["constrained_columns"]

            columns = [
                ColumnInfo(
                    name=entry["name"],
                    data_type=str(entry["type"]),
                    python_type=self._sql_to_python_type(entry["type"]),
                    nullable=entry.get("nullable", True),
                    ordinal_position=position,
                )
                for position, entry in enumerate(reflected_columns, start=1)
            ]

            return TableSchema(
                table_name=table_name,
                columns=columns,
                primary_keys=pk_columns or [],
            )

        except Exception as e:
            # Best effort: any failure is reported and surfaced as "no schema".
            self.logger.error(f"Error retrieving schema for table {table_name}: {e}")
            return None

    def _sql_to_python_type(self, sql_type) -> type:
        """Map a SQLAlchemy column type instance to the Python type used for it.

        Integer types map to ``int``, float/numeric/decimal types to ``float``,
        booleans to ``bool``, date and datetime types to ``pd.Timestamp``;
        anything else maps to ``str``.
        """
        # Families are checked in this order; the first match wins.
        type_families = (
            (
                (sqlalchemy.Integer, sqlalchemy.BigInteger, sqlalchemy.SmallInteger),
                int,
            ),
            ((sqlalchemy.Float, sqlalchemy.Numeric, sqlalchemy.DECIMAL), float),
            ((sqlalchemy.Boolean,), bool),
            ((sqlalchemy.Date,), pd.Timestamp),
            ((sqlalchemy.DateTime, sqlalchemy.TIMESTAMP), pd.Timestamp),
        )
        for sql_classes, python_type in type_families:
            if isinstance(sql_type, sql_classes):
                return python_type
        return str

    def dispose(self):
        """Dispose of the engine, if one was created, and forget it."""
        if not self._engine:
            return
        self._engine.dispose()
        self._engine = None
def _coerce_to_int(value: Any) -> int:
    """Convert ``value`` to ``int`` without a lossy round-trip through ``float``."""
    try:
        # Direct conversion keeps integers above 2**53 exact.
        return int(value)
    except ValueError:
        # Not an integer literal (e.g. "3.0" or "1e3"): parse as float, truncate.
        return int(float(value))


def infer_type_from_value_with_schema(
    value: Any, column_info: Optional["ColumnInfo"] = None
) -> Any:
    """
    Enhanced type inference using schema information when available.

    Args:
        value: The value to convert
        column_info: Optional column schema information; when omitted the
            value goes through basic type inference instead

    Returns:
        ``None`` when a schema is given and ``value`` is ``None``; otherwise
        the value converted to ``column_info.python_type`` (``int``, ``float``,
        ``bool``, ``pd.Timestamp``, or ``str`` for any other type). If the
        conversion fails, a warning is logged and the result of basic type
        inference is returned instead.
    """
    if column_info is None:
        return _infer_type_from_value(value)

    # Every schema-guided conversion maps a missing value to None.
    if value is None:
        return None

    target = column_info.python_type
    try:
        # Use schema information to guide conversion
        if target is int:
            return _coerce_to_int(value)
        if target is float:
            return float(value)
        if target is bool:
            if isinstance(value, str):
                return value.lower() in ("true", "t", "1", "yes", "y")
            return bool(value)
        if target == pd.Timestamp:
            return pd.to_datetime(value)
        return str(value)

    # OverflowError covers int(float("inf")), which would otherwise escape
    # the fallback below and crash the caller.
    except (ValueError, TypeError, OverflowError) as e:
        logging.getLogger("kgi.schema").warning(
            f"Failed to convert value {value} to {column_info.python_type.__name__}: {e}"
        )
        # Fallback to basic type inference
        return _infer_type_from_value(value)
def _infer_type_from_value(value: Any) -> Any:
    """Basic type inference from value.

    The value is stringified, stripped, and tried in turn as an integer, a
    float, a boolean literal ("true"/"false"/"t"/"f", case-insensitive) and a
    datetime (via ``pd.to_datetime``).

    Args:
        value: The value to interpret

    Returns:
        ``None`` for ``None`` or a pandas-missing value; otherwise the first
        successful interpretation, or the stripped string when none applies.
        An empty or whitespace-only string yields ``""``.
    """
    if value is None or pd.isna(value):
        return None

    text = str(value).strip()

    # pd.to_datetime("") yields NaT, so an empty string must never reach the
    # datetime attempt; it stays a string.
    if not text:
        return text

    # Try integer (anything containing a dot is left to the float attempt)
    if "." not in text:
        try:
            return int(text)
        except (ValueError, TypeError):
            pass

    # Try float
    try:
        return float(text)
    except (ValueError, TypeError):
        pass

    # Try boolean. "1" and "0" never get this far: the integer attempt above
    # has already returned them as ints.
    lowered = text.lower()
    if lowered in ("true", "t"):
        return True
    if lowered in ("false", "f"):
        return False

    # Try datetime
    try:
        return pd.to_datetime(text)
    except (ValueError, TypeError):
        pass

    # Default to string
    return text
def apply_schema_ordering(df: pd.DataFrame, schema: TableSchema) -> pd.DataFrame:
    """Reorder the DataFrame's columns to follow the schema's column order.

    Columns listed by the schema come first, in the schema's ordinal order.
    Columns the schema does not list are appended afterwards in their existing
    relative order. An empty DataFrame is returned unchanged.
    """
    if df.empty:
        return df

    present = df.columns
    schema_first = [name for name in schema.column_names_ordered if name in present]

    # Whatever the schema did not place keeps its current relative order.
    placed = set(schema_first)
    leftovers = [name for name in present if name not in placed]

    return pd.DataFrame(df[schema_first + leftovers])
def apply_schema_types(df: pd.DataFrame, schema: TableSchema) -> pd.DataFrame:
    """Convert the values of every schema-known column to the schema's type.

    The conversion runs on a copy, so the input DataFrame is not modified.
    Columns the schema has no entry for are left as they are. An empty
    DataFrame is returned unchanged (and not copied).
    """
    if df.empty:
        return df

    converted = df.copy()

    for col_name in converted.columns:
        column_info = schema.get_column_info(col_name)
        if not column_info:
            continue
        # Bind the current column's info explicitly for this apply call.
        converted[col_name] = converted[col_name].apply(
            lambda cell, info=column_info: infer_type_from_value_with_schema(
                cell, info
            )
        )

    return converted