Skip to content
Snippets Groups Projects
Commit bc4cf800 authored by Leah Tacke genannt Unterberg's avatar Leah Tacke genannt Unterberg
Browse files

caved and "backported" sqlalchemy-utils's create_view to support schemas

parent eecb58b3
No related branches found
No related tags found
No related merge requests found
...@@ -10,7 +10,6 @@ import sqlalchemy.sql.schema ...@@ -10,7 +10,6 @@ import sqlalchemy.sql.schema
from pydantic import AnyUrl, ConfigDict from pydantic import AnyUrl, ConfigDict
from sqlalchemy import func from sqlalchemy import func
from sqlalchemy.pool import StaticPool from sqlalchemy.pool import StaticPool
from sqlalchemy_utils.view import create_view
from mitm_tooling.definition import RelationName from mitm_tooling.definition import RelationName
from mitm_tooling.definition.definition_tools import ColGroupMaps from mitm_tooling.definition.definition_tools import ColGroupMaps
...@@ -19,6 +18,7 @@ from .common import * ...@@ -19,6 +18,7 @@ from .common import *
from .intermediate_representation import Header, MITMData from .intermediate_representation import Header, MITMData
from .sql.common import * from .sql.common import *
from ..utilities.io_utils import FilePath from ..utilities.io_utils import FilePath
from ..utilities.backports.sqlchemy_sql_views import create_view
if TYPE_CHECKING: if TYPE_CHECKING:
pass pass
...@@ -303,7 +303,7 @@ def mk_sql_rep_schema(header: Header, ...@@ -303,7 +303,7 @@ def mk_sql_rep_schema(header: Header,
if view_generators is not None: if view_generators is not None:
for generator in view_generators: for generator in view_generators:
for name, queryable in generator(header.mitm, concept_tables, type_tables): for name, queryable in generator(header.mitm, concept_tables, type_tables):
views[name] = create_view(name, queryable, meta) # TODO make `create_view` schema-aware and add `schema=schema_name` views[name] = create_view(name, queryable, meta, schema=schema_name)
return SQLRepresentationSchema(meta=meta, concept_tables=concept_tables, type_tables=type_tables, views=views) return SQLRepresentationSchema(meta=meta, concept_tables=concept_tables, type_tables=type_tables, views=views)
......
import sqlalchemy as sa
from sqlalchemy.ext import compiler
from sqlalchemy.schema import DDLElement, PrimaryKeyConstraint
from sqlalchemy.sql.expression import ClauseElement, Executable
from sqlalchemy_utils.functions import get_columns
class CreateView(DDLElement):
def __init__(self, name, selectable, materialized=False, replace=False, schema=None):
if materialized and replace:
raise ValueError("Cannot use CREATE OR REPLACE with materialized views")
self.name = name
self.selectable = selectable
self.materialized = materialized
self.replace = replace
self.schema = schema
@compiler.compiles(CreateView)
def compile_create_materialized_view(element, compiler, **kw):
return 'CREATE {}{}VIEW {}{} AS {}'.format(
'OR REPLACE ' if element.replace else '',
'MATERIALIZED ' if element.materialized else '',
(compiler.dialect.identifier_preparer.quote(element.schema) + '.') if element.schema else '',
compiler.dialect.identifier_preparer.quote(element.name),
compiler.sql_compiler.process(element.selectable, literal_binds=True),
)
class DropView(DDLElement):
def __init__(self, name, materialized=False, cascade=True, schema=None):
self.name = name
self.materialized = materialized
self.cascade = cascade
self.schema = schema
@compiler.compiles(DropView)
def compile_drop_materialized_view(element, compiler, **kw):
compiler.dialect.identifier_preparer.quote(element.schema)
return 'DROP {}VIEW IF EXISTS {}{} {}'.format(
'MATERIALIZED ' if element.materialized else '',
(compiler.dialect.identifier_preparer.quote(element.schema) + '.') if element.schema else '',
compiler.dialect.identifier_preparer.quote(element.name),
'CASCADE' if element.cascade else ''
)
def create_table_from_selectable(
name,
selectable,
indexes=None,
metadata=None,
aliases=None,
**kwargs
):
if indexes is None:
indexes = []
if metadata is None:
metadata = sa.MetaData()
if aliases is None:
aliases = {}
args = [
sa.Column(
c.name,
c.type,
key=aliases.get(c.name, c.name),
primary_key=c.primary_key
)
for c in get_columns(selectable)
] + indexes
table = sa.Table(name, metadata, *args, **kwargs)
if not any([c.primary_key for c in get_columns(selectable)]):
table.append_constraint(
PrimaryKeyConstraint(*[c.name for c in get_columns(selectable)])
)
return table
def create_materialized_view(
name,
selectable,
metadata,
indexes=None,
aliases=None,
schema=None
):
""" Create a view on a given metadata
:param name: The name of the view to create.
:param selectable: An SQLAlchemy selectable e.g. a select() statement.
:param metadata:
An SQLAlchemy Metadata instance that stores the features of the
database being described.
:param indexes: An optional list of SQLAlchemy Index instances.
:param aliases:
An optional dictionary containing with keys as column names and values
as column aliases.
Same as for ``create_view`` except that a ``CREATE MATERIALIZED VIEW``
statement is emitted instead of a ``CREATE VIEW``.
"""
table = create_table_from_selectable(
name=name,
selectable=selectable,
indexes=indexes,
metadata=None,
aliases=aliases,
schema=schema
)
sa.event.listen(
metadata,
'after_create',
CreateView(name, selectable, materialized=True, schema=schema)
)
@sa.event.listens_for(metadata, 'after_create')
def create_indexes(target, connection, **kw):
for idx in table.indexes:
idx.create(connection)
sa.event.listen(
metadata,
'before_drop',
DropView(name, materialized=True, schema=schema)
)
return table
def create_view(
name,
selectable,
metadata,
cascade_on_drop=True,
replace=False,
schema=None
):
""" Create a view on a given metadata
:param name: The name of the view to create.
:param selectable: An SQLAlchemy selectable e.g. a select() statement.
:param metadata:
An SQLAlchemy Metadata instance that stores the features of the
database being described.
:param cascade_on_drop: If ``True`` the view will be dropped with
``CASCADE``, deleting all dependent objects as well.
:param replace: If ``True`` the view will be created with ``OR REPLACE``,
replacing an existing view with the same name.
The process for creating a view is similar to the standard way that a
table is constructed, except that a selectable is provided instead of
a set of columns. The view is created once a ``CREATE`` statement is
executed against the supplied metadata (e.g. ``metadata.create_all(..)``),
and dropped when a ``DROP`` is executed against the metadata.
To create a view that performs basic filtering on a table. ::
metadata = MetaData()
users = Table('users', metadata,
Column('id', Integer, primary_key=True),
Column('name', String),
Column('fullname', String),
Column('premium_user', Boolean, default=False),
)
premium_members = select(users).where(users.c.premium_user == True)
# sqlalchemy 1.3:
# premium_members = select([users]).where(users.c.premium_user == True)
create_view('premium_users', premium_members, metadata)
metadata.create_all(engine) # View is created at this point
"""
table = create_table_from_selectable(
name=name,
selectable=selectable,
metadata=None,
schema=schema
)
sa.event.listen(
metadata,
'after_create',
CreateView(name, selectable, replace=replace, schema=schema),
)
@sa.event.listens_for(metadata, 'after_create')
def create_indexes(target, connection, **kw):
for idx in table.indexes:
idx.create(connection)
sa.event.listen(
metadata,
'before_drop',
DropView(name, cascade=cascade_on_drop, schema=schema)
)
return table
class RefreshMaterializedView(Executable, ClauseElement):
inherit_cache = True
def __init__(self, name, concurrently, schema=None):
self.name = name
self.concurrently = concurrently
self.schema = schema
@compiler.compiles(RefreshMaterializedView)
def compile_refresh_materialized_view(element, compiler):
return 'REFRESH MATERIALIZED VIEW {concurrently}{schema}{name}'.format(
concurrently='CONCURRENTLY ' if element.concurrently else '',
schema=(compiler.dialect.identifier_preparer.quote(element.schema) + '.') if element.schema else '',
name=compiler.dialect.identifier_preparer.quote(element.name),
)
def refresh_materialized_view(session, name, concurrently=False):
""" Refreshes an already existing materialized view
:param session: An SQLAlchemy Session instance.
:param name: The name of the materialized view to refresh.
:param concurrently:
Optional flag that causes the ``CONCURRENTLY`` parameter
to be specified when the materialized view is refreshed.
"""
# Since session.execute() bypasses autoflush, we must manually flush in
# order to include newly-created/modified objects in the refresh.
session.flush()
session.execute(RefreshMaterializedView(name, concurrently))
[tool.poetry] [tool.poetry]
name = "mitm-tooling" name = "mitm-tooling"
version = "0.4.3" version = "0.4.4"
description = "" description = ""
authors = ["Leah Tacke genannt Unterberg <leah.tgu@pads.rwth-aachen.de>"] authors = ["Leah Tacke genannt Unterberg <leah.tgu@pads.rwth-aachen.de>"]
readme = "README.md" readme = "README.md"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment