Entrada
Preview Image

Ecotech Solutions


Ecotech Solutions

Planificación del Proyecto

La planificación del proyecto ecotech_solutions se llevó a cabo utilizando la plataforma SaaS Jira, aplicando el marco de trabajo Scrum para organizar y gestionar el desarrollo del sistema.

El proyecto corresponde a una aplicación de consola, orientada a la gestión de usuarios, empleados, departamentos y proyectos, y fue diseñada siguiendo una arquitectura N capas, lo que permite una clara separación de responsabilidades entre la capa de persistencia, lógica de negocio y presentación.

Durante la planificación, se definió un Product Backlog compuesto por épicas e historias de usuario, organizadas principalmente por capas del sistema. A partir de este backlog, se planificaron sprints enfocados en funcionalidades específicas, comenzando por la capa de persistencia, mientras que las historias asociadas a la capa de presentación en consola quedaron priorizadas en el backlog para sprints futuros.

ÉPICA 2: Capa de Persistencia

Descripción: Implementar el acceso y manejo de datos para empleados, departamentos y proyectos, utilizando un enfoque desacoplado mediante DAO.

Epica 2 Tablero de ecotech-solutions - Sprint 1

    Sin archivo x
              
            
              from dominio.departamento import Departamento
    from persistencia.conexion import Conexion
    from typing import List
    
    
    class DepartamentoDAO:
    
        def __init__(self, conexion: Conexion):
            """
            Recibe una instancia de Conexion ya configurada.
            No abre conexiones aquí (SRP).
            """
            self._conexion = conexion
    
        def agregar(self, depto: Departamento) -> bool:
            query = """
                INSERT INTO departamento (nombre, descripcion)
                VALUES (%s, %s)
            """
            params = (depto.nombre, depto.descripcion)
            rowcount = self._conexion.ejecutar_query(query, params)
            return rowcount == 1
    
        def mostrar(self) -> List[Departamento]:
            query = "SELECT id, nombre, descripcion FROM departamento"
            rows = self._conexion.ejecutar_query(query)
    
            return [
                Departamento(
                    id=row["id"], nombre=row["nombre"], descripcion=row["descripcion"]
                )
                for row in rows
            ]
    
        def buscar_por_codigo(self, codigo: int) -> Departamento | None:
            query = "SELECT id, nombre, descripcion FROM departamento WHERE id = %s"
            rows = self._conexion.ejecutar_query(query, (codigo,))
    
            if not rows:
                return None
    
            row = rows[0]
            return Departamento(
                id=row["id"], nombre=row["nombre"], descripcion=row["descripcion"]
            )
    
        def buscar_por_nombre(self, nombre: str) -> List[Departamento]:
            query = "SELECT id, nombre, descripcion FROM departamento WHERE nombre LIKE %s"
            rows = self._conexion.ejecutar_query(query, (f"%{nombre}%",))
    
            return [
                Departamento(
                    id=row["id"], nombre=row["nombre"], descripcion=row["descripcion"]
                )
                for row in rows
            ]
    
        def modificar(self, depto: Departamento) -> bool:
            query = """
                UPDATE departamento
                SET nombre = %s,
                    descripcion = %s,
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = %s
            """
            params = (depto.nombre, depto.descripcion, depto.id)
            rowcount = self._conexion.ejecutar_query(query, params)
            return rowcount == 1
    
        def eliminar(self, id_depto: int) -> bool:
            query = "DELETE FROM departamento WHERE id = %s"
            rowcount = self._conexion.ejecutar_query(query, (id_depto,))
            return rowcount == 1
    
            
          
              
            
              from dominio.empleado import Empleado
    from persistencia.conexion import Conexion
    from typing import List
    
    class EmpleadoDAO:
    
        def __init__(self, conexion: Conexion):
            self._conexion = conexion
    
        def agregar(self, empleado: Empleado) -> bool:
            query = """
                INSERT INTO empleado (
                    rut,
                    nombre,
                    apellido,
                    direccion,
                    telefono,
                    correo,
                    fecha_contrato,
                    salario,
                    departamento_id,
                    proyecto_id
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            params = (
                empleado.rut,
                empleado.nombre,
                empleado.apellido,
                empleado.direccion,
                empleado.telefono,
                empleado.correo,
                empleado.fecha_contrato,
                empleado.salario,
                empleado.departamento_id,
                empleado.proyecto_id,
            )
            rowcount = self._conexion.ejecutar_query(query, params)
            return rowcount == 1
    
        def mostrar(self) -> List[Empleado]:
            query = """
                SELECT
                    id,
                    rut,
                    nombre,
                    apellido,
                    direccion,
                    telefono,
                    correo,
                    fecha_contrato,
                    salario,
                    departamento_id,
                    proyecto_id
                FROM empleado
            """
            rows = self._conexion.ejecutar_query(query)
    
            return [
                Empleado(
                    id=row["id"],
                    rut=row["rut"],
                    nombre=row["nombre"],
                    apellido=row["apellido"],
                    direccion=row["direccion"],
                    telefono=row["telefono"],
                    correo=row["correo"],
                    fecha_contrato=row["fecha_contrato"],
                    salario=row["salario"],
                    departamento_id=row["departamento_id"],
                    proyecto_id=row["proyecto_id"],
                )
                for row in rows
            ]
    
        def buscar_por_id(self, id_empleado: int) -> Empleado | None:
            query = "SELECT * FROM empleado WHERE id = %s"
            rows = self._conexion.ejecutar_query(query, (id_empleado,))
    
            if not rows:
                return None
    
            row = rows[0]
            return Empleado(
                id=row["id"],
                rut=row["rut"],
                nombre=row["nombre"],
                apellido=row["apellido"],
                direccion=row["direccion"],
                telefono=row["telefono"],
                correo=row["correo"],
                fecha_contrato=row["fecha_contrato"],
                salario=row["salario"],
                departamento_id=row["departamento_id"],
                proyecto_id=row["proyecto_id"],
            )
    
        def buscar_por_rut(self, rut: str) -> Empleado | None:
            query = "SELECT * FROM empleado WHERE rut = %s"
            rows = self._conexion.ejecutar_query(query, (rut,))
    
            if not rows:
                return None
    
            row = rows[0]
            return Empleado(
                id=row["id"],
                rut=row["rut"],
                nombre=row["nombre"],
                apellido=row["apellido"],
                direccion=row["direccion"],
                telefono=row["telefono"],
                correo=row["correo"],
                fecha_contrato=row["fecha_contrato"],
                salario=row["salario"],
                departamento_id=row["departamento_id"],
                proyecto_id=row["proyecto_id"],
            )
    
        def buscar_por_nombre(self, nombre: str) -> List[Empleado]:
            query = """
                SELECT * FROM empleado
                WHERE nombre LIKE %s OR apellido LIKE %s
            """
            rows = self._conexion.ejecutar_query(query, (f"%{nombre}%", f"%{nombre}%"))
    
            return [
                Empleado(
                    id=row["id"],
                    rut=row["rut"],
                    nombre=row["nombre"],
                    apellido=row["apellido"],
                    direccion=row["direccion"],
                    telefono=row["telefono"],
                    correo=row["correo"],
                    fecha_contrato=row["fecha_contrato"],
                    salario=row["salario"],
                    departamento_id=row["departamento_id"],
                    proyecto_id=row["proyecto_id"],
                )
                for row in rows
            ]
    
        def modificar(self, empleado: Empleado) -> bool:
            query = """
                UPDATE empleado
                SET
                    rut = %s,
                    nombre = %s,
                    apellido = %s,
                    direccion = %s,
                    telefono = %s,
                    correo = %s,
                    fecha_contrato = %s,
                    salario = %s,
                    departamento_id = %s,
                    proyecto_id = %s,
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = %s
            """
            params = (
                empleado.rut,
                empleado.nombre,
                empleado.apellido,
                empleado.direccion,
                empleado.telefono,
                empleado.correo,
                empleado.fecha_contrato,
                empleado.salario,
                empleado.departamento_id,
                empleado.proyecto_id,
                empleado.id,
            )
            rowcount = self._conexion.ejecutar_query(query, params)
            return rowcount == 1
    
        def eliminar(self, id_empleado: int) -> bool:
            query = "DELETE FROM empleado WHERE id = %s"
            rowcount = self._conexion.ejecutar_query(query, (id_empleado,))
            return rowcount == 1
    
            
          
              
            
              from dominio.proyecto import Proyecto
    from persistencia.conexion import Conexion
    from typing import List
    
    class ProyectoDAO:
    
        def __init__(self, conexion: Conexion):
            self._conexion = conexion
    
        def agregar(self, proyecto: Proyecto) -> bool:
            query = """
                INSERT INTO proyecto (
                    nombre,
                    descripcion,
                    departamento_id,
                    fecha_inicio,
                    fecha_fin,
                    estado
                )
                VALUES (%s, %s, %s, %s, %s, %s)
            """
            params = (
                proyecto.nombre,
                proyecto.descripcion,
                proyecto.departamento_id,
                proyecto.fecha_inicio,
                proyecto.fecha_fin,
                proyecto.estado,
            )
            rowcount = self._conexion.ejecutar_query(query, params)
            return rowcount == 1
    
        def mostrar(self) -> List[Proyecto]:
            query = """
                SELECT
                    id,
                    nombre,
                    descripcion,
                    departamento_id,
                    fecha_inicio,
                    fecha_fin,
                    estado
                FROM proyecto
            """
            rows = self._conexion.ejecutar_query(query)
    
            return [
                Proyecto(
                    id=row["id"],
                    nombre=row["nombre"],
                    descripcion=row["descripcion"],
                    departamento_id=row["departamento_id"],
                    fecha_inicio=row["fecha_inicio"],
                    fecha_fin=row["fecha_fin"],
                    estado=row["estado"],
                )
                for row in rows
            ]
    
        def buscar_por_id(self, id_proyecto: int) -> Proyecto | None:
            query = "SELECT * FROM proyecto WHERE id = %s"
            rows = self._conexion.ejecutar_query(query, (id_proyecto,))
    
            if not rows:
                return None
    
            row = rows[0]
            return Proyecto(
                id=row["id"],
                nombre=row["nombre"],
                descripcion=row["descripcion"],
                departamento_id=row["departamento_id"],
                fecha_inicio=row["fecha_inicio"],
                fecha_fin=row["fecha_fin"],
                estado=row["estado"],
            )
    
        def buscar_por_nombre(self, nombre: str) -> List[Proyecto]:
            query = "SELECT * FROM proyecto WHERE nombre LIKE %s"
            rows = self._conexion.ejecutar_query(query, (f"%{nombre}%",))
    
            return [
                Proyecto(
                    id=row["id"],
                    nombre=row["nombre"],
                    descripcion=row["descripcion"],
                    departamento_id=row["departamento_id"],
                    fecha_inicio=row["fecha_inicio"],
                    fecha_fin=row["fecha_fin"],
                    estado=row["estado"],
                )
                for row in rows
            ]
    
        def modificar(self, proyecto: Proyecto) -> bool:
            query = """
                UPDATE proyecto
                SET
                    nombre = %s,
                    descripcion = %s,
                    departamento_id = %s,
                    fecha_inicio = %s,
                    fecha_fin = %s,
                    estado = %s,
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = %s
            """
            params = (
                proyecto.nombre,
                proyecto.descripcion,
                proyecto.departamento_id,
                proyecto.fecha_inicio,
                proyecto.fecha_fin,
                proyecto.estado,
                proyecto.id,
            )
            rowcount = self._conexion.ejecutar_query(query, params)
            return rowcount == 1
    
        def eliminar(self, id_proyecto: int) -> bool:
            query = "DELETE FROM proyecto WHERE id = %s"
            rowcount = self._conexion.ejecutar_query(query, (id_proyecto,))
            return rowcount == 1
    
            
          
              
            
              from typing import List
    from dominio.usuario import Usuario
    from dominio.excepciones import UsuarioNoEncontradoError
    from persistencia.conexion import Conexion
    
    
    class UsuarioDAO:
    
        def __init__(self, conexion: Conexion) -> None:
            self._conexion = conexion
    
        def crear(self, usuario: Usuario) -> None:
            sql_insert = """
                INSERT INTO usuarios (username, password_hash)
                VALUES (%s, %s)
            """
    
            rowcount = self._conexion.ejecutar_query(
                sql_insert,
                (usuario.username, usuario.password_hash),
            )
    
            if rowcount != 1:
                raise Exception("Error al insertar usuario")
    
            sql_id = "SELECT LAST_INSERT_ID() AS id"
            rows = self._conexion.ejecutar_query(sql_id)
    
            usuario.id = rows[0]["id"]
    
        def obtener_por_username(self, username: str) -> Usuario:
            sql = """
                SELECT id, username, password_hash
                FROM usuarios
                WHERE username = %s
            """
            rows = self._conexion.ejecutar_query(sql, (username,))
    
            if not rows:
                raise UsuarioNoEncontradoError("Usuario no encontrado.")
    
            return Usuario(**rows[0])
    
        def obtener_por_id(self, user_id: int) -> Usuario:
            sql = """
                SELECT id, username, password_hash
                FROM usuarios
                WHERE id = %s
            """
            rows = self._conexion.ejecutar_query(sql, (user_id,))
    
            if not rows:
                raise UsuarioNoEncontradoError(f"Usuario con ID {user_id} no existe.")
    
            return Usuario(**rows[0])
    
        def actualizar(self, usuario: Usuario) -> None:
            sql = """
                UPDATE usuarios
                SET username = %s,
                    password_hash = %s,
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = %s
            """
            rowcount = self._conexion.ejecutar_query(
                sql,
                (usuario.username, usuario.password_hash, usuario.id),
            )
    
            if rowcount != 1:
                raise UsuarioNoEncontradoError(
                    f"No se pudo actualizar el usuario con ID {usuario.id}"
                )
    
        def eliminar(self, user_id: int) -> None:
            sql = "DELETE FROM usuarios WHERE id = %s"
            rowcount = self._conexion.ejecutar_query(sql, (user_id,))
    
            if rowcount != 1:
                raise UsuarioNoEncontradoError(
                    f"No se pudo eliminar el usuario con ID {user_id}"
                )
    
        def listar(self) -> List[Usuario]:
            sql = "SELECT id, username, password_hash FROM usuarios"
            rows = self._conexion.ejecutar_query(sql)
            return [Usuario(**row) for row in rows]
    
            
          
              
            
              # Configurar persistencia como paquete
    
            
          
              
            
              import pymysql
    from pymysql.connections import Connection
    from configuracion.entorno import DB_HOST, DB_NAME, DB_USER, DB_USER_PASSWORD
    from configuracion.auditoria import logger_bd
    from persistencia.tipos import QueryResult, SQLParams
    
    
    class Conexion:
    
        def __init__(
            self,
            host: str = DB_HOST,
            database: str = DB_NAME,
            user: str = DB_USER,
            password: str = DB_USER_PASSWORD,
        ) -> None:
            self._host = host
            self._database = database
            self._user = user
            self._password = password
            self._conn: Connection | None = None
    
        def _conectar(self) -> Connection | None:
    
            if self._conn and getattr(self._conn, "open", False):
                return self._conn
    
            try:
                self._conn = pymysql.connect(
                    host=self._host,
                    user=self._user,
                    password=self._password,
                    database=self._database,
                    cursorclass=pymysql.cursors.DictCursor,
                )
                logger_bd.info(
                    f"🔌 Conexión a 🐬 MySQL exitosa: {self._user}@{self._host} -> {self._database}"
                )
                return self._conn
    
            except pymysql.MySQLError as e:
                logger_bd.error(f"😩 Error al conectar: {e}")
                self._conn = None
                return None
    
        def _desconectar(self) -> bool:
            if self._conn:
                try:
                    self._conn.close()
                    logger_bd.info("🔒 Conexión cerrada correctamente")
                    self._conn = None
                    return True
                except pymysql.MySQLError as e:
                    print(f"[Conexion] Error al cerrar conexión: {e}")
                    return False
            return False
    
        def ejecutar_query(
            self, query: str, params: SQLParams = None
        ) -> QueryResult | int | None:
            if not self._conn:
                logger_bd.error(
                    "❌ No se pudo establecer conexión con la BD antes de ejecutar query."
                )
                raise RuntimeError(
                    "No hay conexión activa. Debes abrir una conexión primero."
                )
    
            try:
                logger_bd.debug(f"[Conexion] Ejecutando query: {query} | params: {params}")
    
                with self._conn.cursor() as cur:
                    cur.execute(query, params)
                    self._conn.commit()
    
                    if query.strip().lower().startswith("select"):
                        return cur.fetchall()
    
                    return cur.rowcount
    
            except pymysql.MySQLError as e:
                logger_bd.error(f"[Conexion] Error en ejecutar_query: {e}")
                return None
    
        def abrir(self) -> bool:
            """Abre la conexión si no está abierta. Devuelve True si se conecta correctamente."""
            return self._conectar() is not None
    
            
          
              
            
              """
    Tipos auxiliares para la capa de persistencia.
    
    Estos tipos se utilizan para describir los resultados devueltos por queries SQL
    y los parámetros que pueden recibir dichas consultas.
    """
    from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
    
    RowDict = Dict[str, Any]
    
    RowTuple = Tuple[Any, ...]
    
    QueryResult = List[RowDict]
    
    SQLParams = Optional[Union[Tuple[Any, ...], List[Any], Iterable[Any]]]
    
            
          

    Selecciona un archivo para ver su contenido

    Backlog antes de la epica 4 Estado del Backlog antes de comenzar el próximo Sprint

    Esta entrada está licenciada bajo CC BY 4.0 por el autor.