Ecotech Solutions
Planificación del Proyecto
La planificación del proyecto ecotech_solutions se llevó a cabo utilizando la plataforma SaaS Jira, aplicando el marco de trabajo Scrum para organizar y gestionar el desarrollo del sistema.
El proyecto corresponde a una aplicación de consola, orientada a la gestión de usuarios, empleados, departamentos y proyectos, y fue diseñada siguiendo una arquitectura N capas, lo que permite una clara separación de responsabilidades entre la capa de persistencia, lógica de negocio y presentación.
Durante la planificación, se definió un Product Backlog compuesto por épicas e historias de usuario, organizadas principalmente por capas del sistema. A partir de este backlog, se planificaron sprints enfocados en funcionalidades específicas, comenzando por la capa de persistencia, mientras que las historias asociadas a la capa de presentación en consola quedaron priorizadas en el backlog para sprints futuros.
ÉPICA 2: Capa de Persistencia
Descripción: Implementar el acceso y manejo de datos para empleados, departamentos y proyectos, utilizando un enfoque desacoplado mediante DAO.
Tablero de ecotech-solutions - Sprint 1
from dominio.departamento import Departamento
from persistencia.conexion import Conexion
from typing import List
class DepartamentoDAO:
def __init__(self, conexion: Conexion):
"""
Recibe una instancia de Conexion ya configurada.
No abre conexiones aquí (SRP).
"""
self._conexion = conexion
def agregar(self, depto: Departamento) -> bool:
query = """
INSERT INTO departamento (nombre, descripcion)
VALUES (%s, %s)
"""
params = (depto.nombre, depto.descripcion)
rowcount = self._conexion.ejecutar_query(query, params)
return rowcount == 1
def mostrar(self) -> List[Departamento]:
query = "SELECT id, nombre, descripcion FROM departamento"
rows = self._conexion.ejecutar_query(query)
return [
Departamento(
id=row["id"], nombre=row["nombre"], descripcion=row["descripcion"]
)
for row in rows
]
def buscar_por_codigo(self, codigo: int) -> Departamento | None:
query = "SELECT id, nombre, descripcion FROM departamento WHERE id = %s"
rows = self._conexion.ejecutar_query(query, (codigo,))
if not rows:
return None
row = rows[0]
return Departamento(
id=row["id"], nombre=row["nombre"], descripcion=row["descripcion"]
)
def buscar_por_nombre(self, nombre: str) -> List[Departamento]:
query = "SELECT id, nombre, descripcion FROM departamento WHERE nombre LIKE %s"
rows = self._conexion.ejecutar_query(query, (f"%{nombre}%",))
return [
Departamento(
id=row["id"], nombre=row["nombre"], descripcion=row["descripcion"]
)
for row in rows
]
def modificar(self, depto: Departamento) -> bool:
query = """
UPDATE departamento
SET nombre = %s,
descripcion = %s,
updated_at = CURRENT_TIMESTAMP
WHERE id = %s
"""
params = (depto.nombre, depto.descripcion, depto.id)
rowcount = self._conexion.ejecutar_query(query, params)
return rowcount == 1
def eliminar(self, id_depto: int) -> bool:
query = "DELETE FROM departamento WHERE id = %s"
rowcount = self._conexion.ejecutar_query(query, (id_depto,))
return rowcount == 1
from dominio.empleado import Empleado
from persistencia.conexion import Conexion
from typing import List
class EmpleadoDAO:
def __init__(self, conexion: Conexion):
self._conexion = conexion
def agregar(self, empleado: Empleado) -> bool:
query = """
INSERT INTO empleado (
rut,
nombre,
apellido,
direccion,
telefono,
correo,
fecha_contrato,
salario,
departamento_id,
proyecto_id
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
params = (
empleado.rut,
empleado.nombre,
empleado.apellido,
empleado.direccion,
empleado.telefono,
empleado.correo,
empleado.fecha_contrato,
empleado.salario,
empleado.departamento_id,
empleado.proyecto_id,
)
rowcount = self._conexion.ejecutar_query(query, params)
return rowcount == 1
def mostrar(self) -> List[Empleado]:
query = """
SELECT
id,
rut,
nombre,
apellido,
direccion,
telefono,
correo,
fecha_contrato,
salario,
departamento_id,
proyecto_id
FROM empleado
"""
rows = self._conexion.ejecutar_query(query)
return [
Empleado(
id=row["id"],
rut=row["rut"],
nombre=row["nombre"],
apellido=row["apellido"],
direccion=row["direccion"],
telefono=row["telefono"],
correo=row["correo"],
fecha_contrato=row["fecha_contrato"],
salario=row["salario"],
departamento_id=row["departamento_id"],
proyecto_id=row["proyecto_id"],
)
for row in rows
]
def buscar_por_id(self, id_empleado: int) -> Empleado | None:
query = "SELECT * FROM empleado WHERE id = %s"
rows = self._conexion.ejecutar_query(query, (id_empleado,))
if not rows:
return None
row = rows[0]
return Empleado(
id=row["id"],
rut=row["rut"],
nombre=row["nombre"],
apellido=row["apellido"],
direccion=row["direccion"],
telefono=row["telefono"],
correo=row["correo"],
fecha_contrato=row["fecha_contrato"],
salario=row["salario"],
departamento_id=row["departamento_id"],
proyecto_id=row["proyecto_id"],
)
def buscar_por_rut(self, rut: str) -> Empleado | None:
query = "SELECT * FROM empleado WHERE rut = %s"
rows = self._conexion.ejecutar_query(query, (rut,))
if not rows:
return None
row = rows[0]
return Empleado(
id=row["id"],
rut=row["rut"],
nombre=row["nombre"],
apellido=row["apellido"],
direccion=row["direccion"],
telefono=row["telefono"],
correo=row["correo"],
fecha_contrato=row["fecha_contrato"],
salario=row["salario"],
departamento_id=row["departamento_id"],
proyecto_id=row["proyecto_id"],
)
def buscar_por_nombre(self, nombre: str) -> List[Empleado]:
query = """
SELECT * FROM empleado
WHERE nombre LIKE %s OR apellido LIKE %s
"""
rows = self._conexion.ejecutar_query(query, (f"%{nombre}%", f"%{nombre}%"))
return [
Empleado(
id=row["id"],
rut=row["rut"],
nombre=row["nombre"],
apellido=row["apellido"],
direccion=row["direccion"],
telefono=row["telefono"],
correo=row["correo"],
fecha_contrato=row["fecha_contrato"],
salario=row["salario"],
departamento_id=row["departamento_id"],
proyecto_id=row["proyecto_id"],
)
for row in rows
]
def modificar(self, empleado: Empleado) -> bool:
query = """
UPDATE empleado
SET
rut = %s,
nombre = %s,
apellido = %s,
direccion = %s,
telefono = %s,
correo = %s,
fecha_contrato = %s,
salario = %s,
departamento_id = %s,
proyecto_id = %s,
updated_at = CURRENT_TIMESTAMP
WHERE id = %s
"""
params = (
empleado.rut,
empleado.nombre,
empleado.apellido,
empleado.direccion,
empleado.telefono,
empleado.correo,
empleado.fecha_contrato,
empleado.salario,
empleado.departamento_id,
empleado.proyecto_id,
empleado.id,
)
rowcount = self._conexion.ejecutar_query(query, params)
return rowcount == 1
def eliminar(self, id_empleado: int) -> bool:
query = "DELETE FROM empleado WHERE id = %s"
rowcount = self._conexion.ejecutar_query(query, (id_empleado,))
return rowcount == 1
from dominio.proyecto import Proyecto
from persistencia.conexion import Conexion
from typing import List
class ProyectoDAO:
def __init__(self, conexion: Conexion):
self._conexion = conexion
def agregar(self, proyecto: Proyecto) -> bool:
query = """
INSERT INTO proyecto (
nombre,
descripcion,
departamento_id,
fecha_inicio,
fecha_fin,
estado
)
VALUES (%s, %s, %s, %s, %s, %s)
"""
params = (
proyecto.nombre,
proyecto.descripcion,
proyecto.departamento_id,
proyecto.fecha_inicio,
proyecto.fecha_fin,
proyecto.estado,
)
rowcount = self._conexion.ejecutar_query(query, params)
return rowcount == 1
def mostrar(self) -> List[Proyecto]:
query = """
SELECT
id,
nombre,
descripcion,
departamento_id,
fecha_inicio,
fecha_fin,
estado
FROM proyecto
"""
rows = self._conexion.ejecutar_query(query)
return [
Proyecto(
id=row["id"],
nombre=row["nombre"],
descripcion=row["descripcion"],
departamento_id=row["departamento_id"],
fecha_inicio=row["fecha_inicio"],
fecha_fin=row["fecha_fin"],
estado=row["estado"],
)
for row in rows
]
def buscar_por_id(self, id_proyecto: int) -> Proyecto | None:
query = "SELECT * FROM proyecto WHERE id = %s"
rows = self._conexion.ejecutar_query(query, (id_proyecto,))
if not rows:
return None
row = rows[0]
return Proyecto(
id=row["id"],
nombre=row["nombre"],
descripcion=row["descripcion"],
departamento_id=row["departamento_id"],
fecha_inicio=row["fecha_inicio"],
fecha_fin=row["fecha_fin"],
estado=row["estado"],
)
def buscar_por_nombre(self, nombre: str) -> List[Proyecto]:
query = "SELECT * FROM proyecto WHERE nombre LIKE %s"
rows = self._conexion.ejecutar_query(query, (f"%{nombre}%",))
return [
Proyecto(
id=row["id"],
nombre=row["nombre"],
descripcion=row["descripcion"],
departamento_id=row["departamento_id"],
fecha_inicio=row["fecha_inicio"],
fecha_fin=row["fecha_fin"],
estado=row["estado"],
)
for row in rows
]
def modificar(self, proyecto: Proyecto) -> bool:
query = """
UPDATE proyecto
SET
nombre = %s,
descripcion = %s,
departamento_id = %s,
fecha_inicio = %s,
fecha_fin = %s,
estado = %s,
updated_at = CURRENT_TIMESTAMP
WHERE id = %s
"""
params = (
proyecto.nombre,
proyecto.descripcion,
proyecto.departamento_id,
proyecto.fecha_inicio,
proyecto.fecha_fin,
proyecto.estado,
proyecto.id,
)
rowcount = self._conexion.ejecutar_query(query, params)
return rowcount == 1
def eliminar(self, id_proyecto: int) -> bool:
query = "DELETE FROM proyecto WHERE id = %s"
rowcount = self._conexion.ejecutar_query(query, (id_proyecto,))
return rowcount == 1
from typing import List
from dominio.usuario import Usuario
from dominio.excepciones import UsuarioNoEncontradoError
from persistencia.conexion import Conexion
class UsuarioDAO:
def __init__(self, conexion: Conexion) -> None:
self._conexion = conexion
def crear(self, usuario: Usuario) -> None:
sql_insert = """
INSERT INTO usuarios (username, password_hash)
VALUES (%s, %s)
"""
rowcount = self._conexion.ejecutar_query(
sql_insert,
(usuario.username, usuario.password_hash),
)
if rowcount != 1:
raise Exception("Error al insertar usuario")
sql_id = "SELECT LAST_INSERT_ID() AS id"
rows = self._conexion.ejecutar_query(sql_id)
usuario.id = rows[0]["id"]
def obtener_por_username(self, username: str) -> Usuario:
sql = """
SELECT id, username, password_hash
FROM usuarios
WHERE username = %s
"""
rows = self._conexion.ejecutar_query(sql, (username,))
if not rows:
raise UsuarioNoEncontradoError("Usuario no encontrado.")
return Usuario(**rows[0])
def obtener_por_id(self, user_id: int) -> Usuario:
sql = """
SELECT id, username, password_hash
FROM usuarios
WHERE id = %s
"""
rows = self._conexion.ejecutar_query(sql, (user_id,))
if not rows:
raise UsuarioNoEncontradoError(f"Usuario con ID {user_id} no existe.")
return Usuario(**rows[0])
def actualizar(self, usuario: Usuario) -> None:
sql = """
UPDATE usuarios
SET username = %s,
password_hash = %s,
updated_at = CURRENT_TIMESTAMP
WHERE id = %s
"""
rowcount = self._conexion.ejecutar_query(
sql,
(usuario.username, usuario.password_hash, usuario.id),
)
if rowcount != 1:
raise UsuarioNoEncontradoError(
f"No se pudo actualizar el usuario con ID {usuario.id}"
)
def eliminar(self, user_id: int) -> None:
sql = "DELETE FROM usuarios WHERE id = %s"
rowcount = self._conexion.ejecutar_query(sql, (user_id,))
if rowcount != 1:
raise UsuarioNoEncontradoError(
f"No se pudo eliminar el usuario con ID {user_id}"
)
def listar(self) -> List[Usuario]:
sql = "SELECT id, username, password_hash FROM usuarios"
rows = self._conexion.ejecutar_query(sql)
return [Usuario(**row) for row in rows]
# Configurar persistencia como paquete
import pymysql
from pymysql.connections import Connection
from configuracion.entorno import DB_HOST, DB_NAME, DB_USER, DB_USER_PASSWORD
from configuracion.auditoria import logger_bd
from persistencia.tipos import QueryResult, SQLParams
class Conexion:
def __init__(
self,
host: str = DB_HOST,
database: str = DB_NAME,
user: str = DB_USER,
password: str = DB_USER_PASSWORD,
) -> None:
self._host = host
self._database = database
self._user = user
self._password = password
self._conn: Connection | None = None
def _conectar(self) -> Connection | None:
if self._conn and getattr(self._conn, "open", False):
return self._conn
try:
self._conn = pymysql.connect(
host=self._host,
user=self._user,
password=self._password,
database=self._database,
cursorclass=pymysql.cursors.DictCursor,
)
logger_bd.info(
f"🔌 Conexión a 🐬 MySQL exitosa: {self._user}@{self._host} -> {self._database}"
)
return self._conn
except pymysql.MySQLError as e:
logger_bd.error(f"😩 Error al conectar: {e}")
self._conn = None
return None
def _desconectar(self) -> bool:
if self._conn:
try:
self._conn.close()
logger_bd.info("🔒 Conexión cerrada correctamente")
self._conn = None
return True
except pymysql.MySQLError as e:
print(f"[Conexion] Error al cerrar conexión: {e}")
return False
return False
def ejecutar_query(
self, query: str, params: SQLParams = None
) -> QueryResult | int | None:
if not self._conn:
logger_bd.error(
"❌ No se pudo establecer conexión con la BD antes de ejecutar query."
)
raise RuntimeError(
"No hay conexión activa. Debes abrir una conexión primero."
)
try:
logger_bd.debug(f"[Conexion] Ejecutando query: {query} | params: {params}")
with self._conn.cursor() as cur:
cur.execute(query, params)
self._conn.commit()
if query.strip().lower().startswith("select"):
return cur.fetchall()
return cur.rowcount
except pymysql.MySQLError as e:
logger_bd.error(f"[Conexion] Error en ejecutar_query: {e}")
return None
def abrir(self) -> bool:
"""Abre la conexión si no está abierta. Devuelve True si se conecta correctamente."""
return self._conectar() is not None
"""
Tipos auxiliares para la capa de persistencia.
Estos tipos se utilizan para describir los resultados devueltos por queries SQL
y los parámetros que pueden recibir dichas consultas.
"""
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
RowDict = Dict[str, Any]
RowTuple = Tuple[Any, ...]
QueryResult = List[RowDict]
SQLParams = Optional[Union[Tuple[Any, ...], List[Any], Iterable[Any]]]
Selecciona un archivo para ver su contenido

