dirty: sql stuff

This commit is contained in:
Rodney, Tiara 2025-12-07 12:54:32 +01:00 committed by Tiara Rodney
parent cc4b567181
commit 156af70b6e
No known key found for this signature in database
GPG key ID: 5CD8EC1D46106723
3 changed files with 147 additions and 0 deletions

View file

@ -0,0 +1,147 @@
from dataclasses import dataclass, fields, Field
import datetime
from typing import Optional, get_origin, get_args, Union, List
def escape_value(value: str) -> str:
"""Escapes a string value for safe SQL insertion.
:param value: Raw string value
:return: Escaped SQL-safe string
"""
return "'" + str(value).replace("'", "''") + "'"
def extract_fields_from_expr(expr: Expr) -> List[str]:
"""Recursively extracts field names used in an expression tree.
:param expr: Expression object
:return: List of field names referenced in the expression
"""
if isinstance(expr, (Eq, Gt, Lt, Ne, Like)):
return [expr.field]
elif isinstance(expr, (And, Or)):
fields = []
for sub in expr.conditions:
fields.extend(extract_fields_from_expr(sub))
return fields
return []
@dataclass(freeze=True)
class GIDModelProps:
"""
"""
table_prefix: Optional[str] = None,
pk_name: Optional[str] = None
class GIDModelParent:
"""
"""
table_name: str
foreign_key_name: str
@dataclass
class GIDModel:
"""Base class for SQL-aware dataclasses with support for schema generation,
insertion, and query generation.
"""
@classmethod
def create_table_sql(
cls: "GIDModel",
props: GIDModelProps,
parent: Optional[GIDModelParent] = None,
tables: Optional[[str, GIDModel]] = None,
) -> str:
"""Generates SQL CREATE TABLE statements for the model and its nested
components.
:param table_prefix: Optional prefix for table names
:param root_table: Name of the root table for foreign key references
:return: SQL CREATE TABLE statement(s)
"""
tables = tables or []
table_name = f"{{props.table_prefix} or ''}{cls.__name__.lower()}"
if table_name in tables.keys():
if not cls == tables[table_name]:
raise Exception('table with name '{table_name}' already incorporated with different schema')
else:
return None
columns = ["{props.pk_name} TEXT PRIMARY KEY"] if props.pk_name else []
nested_sql = []
foreign_keys = []
if parent and parent.table_name != table_name:
foreign_keys.append(
f"FOREIGN KEY (gid) REFERENCES {parent.table_name}({parent.foreign_key_name}) "
f"ON DELETE CASCADE ON UPDATE CASCADE"
)
for f in fields(cls):
if f.name == props.pk_name: continue # already handled
py_type = f.type
if get_origin(py_type) is Union and get_args(py_type)[1] is type(None):
py_type = get_args(py_type)[0]
sql_type = SQL_TYPECAST.get(py_type)
metadata = f.metadata.get("gid", None)
if issubclass(py_type, List):
py_type = get_args(py_type)[0]
pk_name = None
parent_fk_name = f.name
else:
pk_name = 'id'
parent_fk_name = f.name
if issubclass(py_type, GIDModel):
if props.pk_name:
raise Exception('gidless model with nesting not possible')
nested_table = f"{props.table_prefix}{py_type.__name__.lower()}"
# determine whether to pass parent or self
# if no primary key, pass the parent if it exists
if not props.pk_name:
f_parent=parent or None
# if primary, pass self
else:
f_parent=GIDModelParent(
table_name=table,
fk_name=f.name
)
nested_sql.append(
py_type.create_table_sql(
props=f_props or GIDModelProps(
table_name=f.name
pk_name=f_parent.fk_name
)
parent=f_parent
)
)
if sql_type:
column_def = f"{f.name} {sql_type}"
if metadata:
column_def += f" {metadata}"
columns.append(column_def)
else:
raise Exception(f"Unable to typecast field '{f.name}' with type '{py_type}' to SQL")
all_defs = columns + foreign_keys
main_sql = f"CREATE TABLE {table} (\n {',\n '.join(all_defs)}\n);"
return "\n".join([main_sql] + nested_sql)