from xmlrpc.client import Boolean
import pandas as pd
import sqlalchemy
from sqlalchemy import MetaData, Table, inspect
from sqlalchemy.sql import text
from dblinea.db_postgresql import DBPostgresql
[docs]class DBBase:
_database = None
_engine = None
_debug = False
# TODO: OS dados de coneção com o banco devem vir de outro lugar!
_available_databases = dict(
{
"gavo": {
"ENGINE": "postgresql_psycopg2",
"HOST": "desdb4.linea.gov.br",
"PORT": "5432",
"USER": "untrustedprod",
"PASSWORD": "untrusted",
"DATABASE": "prod_gavo",
}
}
)
def __init__(
self,
database="gavo",
dbhost=None,
dbname=None,
dbuser=None,
dbpass=None,
dbport=None,
dbengine="postgresql_psycopg2",
debug: Boolean = False,
):
self._debug = debug
# Se todas as variaveis de configuração forem None
# Vai criar a conexão usando um dos _available_databases.
# Default "gavo" ou o valor informado pelo usuario em database
if all(x is None for x in [dbhost, dbname, dbuser, dbpass, dbport]):
self.__set_database(database)
else:
# Se ao menos umas dessas variaveis
# [dbhost, dbname, dbuser, dbpass, dbport]
# For diferente de None, vai tentar criar uma conexão
# usando os dados que o usuario passou.
# Util para:
# - Acessar outros bancos de dados que o usuario tenha acesso
# - Conectar ao banco usando suas credenciais
# - Para executar os Unit tests fora do ambiente.
db_settings = {
"ENGINE": dbengine,
"HOST": dbhost,
"PORT": dbport,
"USER": dbuser,
"PASSWORD": dbpass,
"DATABASE": dbname,
}
self._database = DBPostgresql(db_settings)
def _setdebug(self, debug: Boolean):
self._debug = debug
def __set_database(self, database):
"""Instancia a classe de Banco de dados.
Este Metodo é utilizado quando é passado o
parametro database na instancia da Classe.
verifica se o database está na lista _available_databases
Args:
database (str): Nome do database como está no
atributo _available_databases.
Raises:
ValueError: Database not available.
"""
if database not in self._available_databases:
raise ValueError("Database not available.")
db_settings = self._available_databases[database]
if db_settings["ENGINE"] == "postgresql_psycopg2":
self._database = DBPostgresql(db_settings)
# if db["ENGINE"] == "sqlite3":
# return DBSqlite(db)
# if db_settings["ENGINE"] == "oracle":
# return DBOracle(db_settings)
[docs] def available_databases(self):
"""Lista os bancos de dados disponiveis
Returns:
Lista de databases pre configurados na bibloteca.
cada elemento da lista representa um db.
exemplo:
[{'config_name': 'gavo', 'dbname': '<database_name>',
'host': '<database_host.linea.gov.br>', 'engine': 'postgresql_psycopg2'}]
"""
dbs = []
for config_name in self._available_databases:
dbs.append(
dict(
{
"config_name": config_name,
"dbname": self._available_databases[config_name]["DATABASE"],
"host": self._available_databases[config_name]["HOST"],
"engine": self._available_databases[config_name]["ENGINE"],
}
)
)
return dbs
[docs] def get_engine(self):
"""Retorna uma sqlalchemy.Engine
Cria ou retorna uma engine para o database solicitado na Instancia da DBBase.
Mais informações sobre Engine em
https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Engine
Returns:
sqlalchemy.engine.Engine:
"""
if self._engine is None:
self._engine = self._database.get_engine()
return self._engine
[docs] def sa_table(self, tablename: str, schema: str = None) -> sqlalchemy.schema.Table:
"""Retona uma instancia de sqlalchemy.schema.Table que representa uma tabela no database.
https://docs.sqlalchemy.org/en/14/core/metadata.html#sqlalchemy.schema.Table
Args:
tablename (str): Nome da tabela sem o schema.
schema (str, optional): Schema onde a tabela se encontra. Defaults to None.
Returns:
sqlalchemy.schema.Table: instancia de Table representando a tabela solicitada.
"""
tbl = Table(tablename, MetaData(schema=schema), schema=schema)
return tbl
[docs] def execute(self, stm):
"""Executa a query usando con.execute,
recomendada para query de Delete, Update ou outras
querys que não precisem de iteração com o resultado.
Args:
stm (statement): Query a ser executada, pode ser escrita em SqlAlchemy
ou string no caso de string ela sera convertida para TextClause.
Returns:
CursorResult: [description]
"""
self._debug_query(stm)
if isinstance(stm, str):
stm = text(stm)
with self.get_engine().connect() as con:
return con.execute(stm)
[docs] def fetchall(self, stm):
"""Executa a query e retorna todos os resultados em uma lista.
Args:
stm (statement): Query a ser executada, pode ser escrita em SqlAlchemy
ou string no caso de string ela sera convertida para TextClause.
Returns:
list: Lista com os resultado no formato original do SqlAlchemy LegacyRow.
"""
# Convert Raw sql to Sql Alchemy TextClause
stm = self.raw_sql_to_stm(stm)
self._debug_query(stm)
with self.get_engine().connect() as con:
queryset = con.execute(stm).fetchall()
return queryset
[docs] def fetchall_dict(self, stm):
"""Executa a query e retorna todos os resultados em uma lista de Dicts.
exemplo:[{'col': 'value', ..., 'colN':'valueN'}]
Args:
stm (statement): Query a ser executada, pode ser escrita em SqlAlchemy
ou string no caso de string ela sera convertida para TextClause.
Returns:
list: Resultado da query em uma lista de dicionários.
"""
# Convert Raw sql to Sql Alchemy TextClause
stm = self.raw_sql_to_stm(stm)
self._debug_query(stm)
with self.get_engine().connect() as con:
queryset = con.execute(stm)
rows = []
for row in queryset:
rows.append(self.to_dict(row))
return rows
[docs] def fetchall_df(self, stm):
"""Executa a query usando Pandas e retorna um Dataframe com o resultado.
Args:
stm (statement): Query a ser executada, pode ser escrita em SqlAlchemy
ou string no caso de string ela sera convertida para TextClause.
Returns:
Pandas.Dataframe: Dataframe com o resultado da query.
"""
self._debug_query(stm)
df = pd.read_sql(stm, con=self.get_engine())
return df
[docs] def fetchone(self, stm):
"""Executa a query retorna a primeira linha do resultado
Args:
stm (statement): Query a ser executada, pode ser escrita em SqlAlchemy
ou string no caso de string ela sera convertida para TextClause.
Returns:
sqlalchemy.engine.row.LegacyRow: Primeira linha do resultado da query.
"""
self._debug_query(stm)
# Convert Raw sql to Sql Alchemy TextClause
stm = self.raw_sql_to_stm(stm)
with self.get_engine().connect() as con:
queryset = con.execute(stm).fetchone()
return queryset
[docs] def fetchone_dict(self, stm):
"""Executa a query retorna a primeira linha do resultado convertida em Dict
Args:
stm (statement): Query a ser executada, pode ser escrita em SqlAlchemy
ou string no caso de string ela sera convertida para TextClause.
Returns:
dict: Primeira linha do resultado da query.
"""
self._debug_query(stm)
# Convert Raw sql to Sql Alchemy TextClause
stm = self.raw_sql_to_stm(stm)
with self.get_engine().connect() as con:
queryset = con.execute(stm).fetchone()
if queryset is not None:
return self.to_dict(queryset)
else:
return None
[docs] def fetch_scalar(self, stm):
"""Retorna o valor da primeira coluna na primeira linha do resultado da query
util para querys de count por exemplo, ou quando se quer apenas um unico valor.
Args:
stm (statement): Query a ser executada, pode ser escrita em SqlAlchemy
ou string no caso de string ela sera convertida para TextClause.
Returns:
any: Valor da primeira coluna na primeira linha.
"""
self._debug_query(stm)
# Convert Raw sql to Sql Alchemy TextClause
stm = self.raw_sql_to_stm(stm)
with self.get_engine().connect() as con:
return con.execute(stm).scalar()
[docs] def to_dict(self, row):
"""Converte uma linha de resultado do SQLAlchemy queryset para Dict
Args:
row (sqlalchemy.engine.row.LegacyRow): Row retornada pelo execute.
Returns:
dict : Row convertida para Dict {colname: value, colname2: value2 ...}
"""
return row._asdict()
[docs] def raw_sql_to_stm(self, stm):
"""Converte uma string raw sql para SqlAlchemy TextClause
Args:
stm (str): Query SQL em string. ex: Select * from tablename...
Returns:
TextClause: TextClause representando uma string SQL.
"""
if isinstance(stm, str):
return text(stm)
return stm
[docs] def get_table_columns(self, tablename, schema=None):
"""Retorna os nomes das colunas de uma tabela.
Args:
tablename (string): Nome da tabela sem schema.
schema (string): Nome do schema ou None quando nao houver.
Returns:
columns (list): Colunas disponiveis na tabela
"""
insp = inspect(self.get_engine())
return [value["name"] for value in insp.get_columns(tablename, schema)]
[docs] def describe_table(self, tablename, schema=None):
"""Retorna uma lista de dicionarios com nome e
tipo das colunas de uma tabela.
exemplo: [{"name": "", "type": ""}]
Args:
tablename (str): Nome da tabela sem schema.
schema (str, optional): Nome do schema. Defaults to None.
Returns:
list: Lista de colunas com seu tipo
"""
cols = []
insp = inspect(self.get_engine())
for c in insp.get_columns(tablename, schema):
cols.append(dict({"name": c["name"], "type": c["type"]}))
return cols
[docs] def stm_to_str(self, stm, with_parameters=True):
sql = str(
stm.compile(
dialect=self._database.get_dialect(),
compile_kwargs={"literal_binds": with_parameters},
)
)
# Remove new lines
sql = sql.replace("\n", " ").replace("\r", "")
return sql
def _debug_query(self, stm):
if not isinstance(stm, str):
stm = self.stm_to_str(stm)
if self._debug:
print(stm)