Creación de servicio para validar accesos en go

Este articulo es una continuación del articulo anterior «Arquitectura reactiva: Una solución enfocada en los datos«, por lo tanto podemos ir así al hueso del asunto, ¿cómo poder implementar esa solución enfocada en datos?, ¿qué es lo necesario para lograr tener el dominio sobre los datos? Primeramente debemos analizar el problema que enfrenta la empresa donde estamos, lo normal es que exista un artefacto de software que esté solucionando ya ese problema y sino, es quizás el primer paso que debemos dar. Crear una aplicación que solucione un problema.

Supongamos que existe una empresa con una necesidad muy simple: Cada vez que ciertos trabajadores con su carnet pasan por las puertas dentro de un edificio deslizando el identificador por un sensor el sistema enviará los datos del trabajador a un servicio web y esperará de vuelta una validación de si existe el permiso para entrar o no a cierto lugar. Para este caso común de negocio puede existir un servicio REST creado con Golang, esperando poder validar las peticiones. Aparte debe existir una base de datos (supongamos una base de datos relacional como mysql) donde esté guardada la información del usuario, también la información del sensor  una tabla (que es la que nos interesa) donde están los acceso de cada usuario a ciertas entradas.

Diagrama #1

 

Veamos un poco del código de ese servicio validador para entenderlo quizás mejor, pero en general lo que hace es recibir el ID del carnet y del sensor principalmente (omitiendo temas de autenticación que no competen al artículo) con el cual busca en la base de datos y valida que el usuario tenga acceso.


type AccessInformation struct {
            Id string `json:»id»`
            FirstName  string `json:»first_name»`
            LastName string `json:»last_name»`
            Rol string `json:»rol»`
            SensorId string `json:»sensor_id»`
            SensorPlace string `json:»sensor_place»`
            AccessTime time.Time `json:»access_time»`
}

func NewAccessInformation() *AccessInformation {
            return &AccessInformation{}
}

Estructura que recibirá el servicio


 


func (c *AccessUseCase) ValidateAccess(access *entity.AccessInformation) error  {
hasAccess, err := c.accessRepository.HasAccess(access.Id,access.SensorId)
        if err!=nil{
                 return err
        }

        if !hasAccess {

               return errors.New(«access denied»)
         }

        return nil
}

Validación de acceso con el repositorio


 


func (c *AccessUseCase) ValidateAccess(access *entity.AccessInformation) error  {
hasAccess, err := c.accessRepository.HasAccess(access.Id,access.SensorId)
        if err!=nil{
                return err
        }

       if !hasAccess {

               return errors.New(«access denied»)
        }

        return nil
}

func (c *AccessMysqlRepository) HasAccess(idPersonalInformation string, idSensor string) (bool, error) {
          var (
                     hasAccess bool
            )
           err := c.mysqlDb.db.QueryRow(«SELECT id_personal,id_sensor,has_access «+
«FROM access_personal «+
«WHERE id_personal = ? AND id_sensor = ?»
,
                     idPersonalInformation, idSensor).
                     Scan(&idPersonalInformation, &idSensor, &hasAccess)

            if err != nil {
                       fmt.Println(«…error get row»)
             }
            return hasAccess, nil
}

Validación de acceso base de datos mysql


 

 

 

Entonces este podría ser un servicio común con todo lo necesario para hacer funcionar este proceso correctamente. Pero alguien en la empresa se ha percatado que hay información útil en este proceso para la toma de decisiones. Y esa información es en qué lugar del edificio está cada trabajador y el tiempo que está en cada lugar históricamente (supongamos que esta información en realidad es útil de alguna manera). Entonces se ha decidido guardar en otra base de datos, quizás no relacional, el nombre del trabajador, su rol, cuál fue la última entrada por la cual pasó y la hora, aparte de un registro de cuánto tiempo ha estado en cada espacio.

En sí se trataría de que cada vez que el trabajador pasa por una entrada, luego de validar que este tiene acceso, se enviará un mensaje a otro artefacto con información como el id, nombre y rol del trabajador, un identificador de la entrada (ej. Sala de reuniones) y la fecha de acceso.

Diagrama #2

 

Con ese mensaje otro artefacto se encargará de buscar si ya existe el trabajador registrado. Si no existe lo creará, ingresando su información, entre la que estará el nombre del lugar donde está exactamente, agregando además el lugar a un registro con su fecha de entrada. Si el trabajador ya existe, este artefacto lo único que hará es actualizar el lugar donde está y la hora de acceso, pero además buscará el último registro para agregarle fecha de salida, además de agregar el tiempo que estuvo allí (la diferencia entre la entrada y la salida) agregando también el registro del nuevo lugar con su fecha de entrada.

 

Diagrama #3

Entre estos dos artefactos o entre estas dos partes de la arquitectura debe haber un canal de comunicación y lo que es en sí mismo el corazón de cualquier arquitectura reactiva. Uno muy conocido, aparte de usado para estas soluciones, es kafka, un sistema de mensajes asíncronos. Esta será la manera de comunicarse de estos dos artefactos. Finalmente toda la arquitectura podría representarse así:

 

Diagrama #4

 

Creación de artefacto LugarTrabajador en Go

El artefacto que se encargará de guardar los datos tampoco será algo muy complejo, en general recibirá el mensaje de kafka (que más adelante se explicará cómo se implementa) y entonces se hace una consulta a esa base de datos nosql (mongoDb), donde se agregará o se actualizará los datos del trabajador y el lugar donde está. Para el tema de las fechas también, si ya existe el trabajador, se obtendrá la fecha de entrada del último lugar y se calculará la diferencia con la nueva fecha de entrada. Así suponiendo que no hay espacios muertos en el edificio se podrá saber cuánto tiempo ha estado en cada lugar.


func (c AccessRecordUseCase) CreateOrUpdateAccessRecord(record *entity.AccessRecord) {
            document, err := c.accessRepository.GetAccessRecordDocument(record.Id)
            if err != nil {
                       document = entity.NewAccessRecordDocument(record)
                       err = c.accessRepository.SaveAccessRecordDocument(document)
                      if err != nil {
                                  fmt.Println(«unable to save access record»)
                       }
             }else{
                        UpdateAccessRecord(record, document)
                        err = c.accessRepository.UpdateAccessRecordDocument(document)
                      if err != nil {
                                  fmt.Println(«unable to update access record»)
                       }
             }

            fmt.Printf(«document %s saved\n», document.Id)
}

Creación o actualización de registros en general


 


func UpdateAccessRecord(record *entity.AccessRecord, document *entity.AccessRecordDocument)  {
          document.CurrentPlace = record.Place
          document.LastAccessTime = record.AccessTime

          lastIndex := len(document.Places)-1
         document.Places[lastIndex].OutDate = record.AccessTime
         document.Places[lastIndex].DurationInSecond =            document.Places[lastIndex].OutDate.Sub(document.Places[lastIndex].InDate).Seconds()

         document.Places = append(document.Places, entity.NewAccessPlace(record) )
}

Lógica de actualización de registro


 


func (c *AccessMongoDbRepository)GetAccessRecordDocument(idDocument string) (*entity.AccessRecordDocument, error) {
           filter := bson.D{{«id», idDocument}}
           var result *entity.AccessRecordDocument

           err := c.collection.FindOne(context.TODO(), filter).Decode(&result)
           if err != nil {
                      fmt.Println(err)
                     return nil, err
            }

           fmt.Printf(«Found a single document: %+v\n», result)

          return result, nil
}


func (c *AccessMongoDbRepository)SaveAccessRecordDocument(record *entity.AccessRecordDocument) error {
            insertResult, err := c.collection.InsertOne(context.TODO(),record)
           if err != nil {
                      fmt.Println(err)
                     return err
            }

           fmt.Println(«Saved a single document: «, insertResult.InsertedID)

          return nil
}

func (c *AccessMongoDbRepository)UpdateAccessRecordDocument(record *entity.AccessRecordDocument) error {
            filter := bson.M{«id»: record.Id}
            insertResult, err := c.collection.ReplaceOne(context.TODO(), filter,record)
           if err != nil {
                      fmt.Println(err)
                     return err
            }

           fmt.Println(«Saved a single document: «, insertResult.UpsertedID)

           return nil
}

Código de procedimientos sobre Mongodb


Configurar la conexión con kafka de los artefactos

Por último no se explicará en este artículo como desplegar un cluster de kafka ni cómo crear los tópicos, sin embargo ya existen muy buenas soluciones documentadas para hacerlo. Ya teniendo un cluster y el tópico creado, que en este caso se llamará: access-record, se puede enviar y recibir mensajes con go utilizando librerías como sarama, con lo cual se hará más fácil la conexión con kafka.

En el caso del producer, que sería nuestro artefacto de validación de acceso la conexión se haría de esta manera:


func NewAccessRecordKafkaProducer() *AccessRecordKafkaProducer {
           config := sarama.NewConfig()
           config.Producer.Return.Successes = true
           config.Producer.RequiredAcks = sarama.WaitForAll
           config.Producer.Retry.Max = 5
          // NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
          conn, err := sarama.NewSyncProducer(strings.Split(BrokersUrl,«,»), config)
          if err != nil {
                    return nil
           }

           return &AccessRecordKafkaProducer{producer: conn}
}

func (c AccessRecordKafkaProducer) Close()  {
            err := c.producer.Close()
           if err != nil {
                     return
            }
}

func (c *AccessRecordKafkaProducer)PushMessage(message []byte) error {
            msg := &sarama.ProducerMessage{
                        Topic: AccessRecordTopics,
                        Value: sarama.StringEncoder(message),
             }
             partition, offset, err := c.producer.SendMessage(msg)
            if err != nil {
                      return err
             }
             fmt.Printf(«Message is stored in topic(%s)/partition(%d)/offset(%d)\n», AccessRecordTopics, partition, offset)
            return nil
}

Código del kafka producer para el envío de mensajes


 


func (c *AccessUseCase) ValidateAccess(access *entity.AccessInformation) error  {
            hasAccess, err := c.accessRepository.HasAccess(access.Id,access.SensorId)
           if err!=nil{
                    return err
            }

          if hasAccess {
                      accessRecord := entity.NewAccessRecord(access)
                      accessRecordByte, err := json.Marshal(accessRecord)
                    if err != nil{
                                fmt.Println(«… Error to send kafka message to marshall»)
                     }

                    err = c.accessRecordProducer.PushMessage(accessRecordByte)
                   if err != nil {
                              fmt.Println(«… Error to send kafka message»)
                    }
          }else{
                   return errors.New(«access denied»)
          }

         return nil
}

Implementación del envío de mensaje al validar


Para el caso del consumer, que sería nuestro LugarTrabajador, la configuración para recibir mensaje por kafka sería de la siguiente manera:


func NewAccessRecordKafkaConsumer(usecase *application.AccessRecordUseCase) *AccessRecordKafkaConsumer {
           config := sarama.NewConfig()
           config.Consumer.Return.Errors = true
          // NewConsumer creates a new consumer using the given broker addresses and configuration
           conn, err := sarama.NewConsumer(strings.Split(BrokersUrl,«,»), config)
          if err != nil {
                    return nil
           }

          return &AccessRecordKafkaConsumer{worker: conn,usecase: usecase}
}

func (c *AccessRecordKafkaConsumer)Run() {
// calling ConsumePartition. It will open one connection per broker
// and share it for all partitions that live on it.

            consumer, err := c.worker.ConsumePartition(AccessRecordTopics, 0,sarama.OffsetOldest)
            if err != nil {
                      panic(err)
             }
            fmt.Println(«Consumer started «)
            sigchan := make(chan os.Signal, 1)
            signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
            // Count how many message processed
            msgCount := 0

            // Get signal for finish
            doneCh := make(chan struct{})
            go func() {
                        for {
                                  select {
                                  case err := <-consumer.Errors():
                                              fmt.Println(err)
                                  case msg := <-consumer.Messages():
                                              CreateOrUpdate(msg,c.usecase)
                                              msgCount++
                                              fmt.Printf(«Received message Count %d: | Topic(%s) | Message(%s) \n», msgCount, msg.Topic, string(msg.Value))
                                   case <-sigchan:
                                                fmt.Println(«Interrupt is detected»)
                                                doneCh <- struct{}{}
                                      }
                           }
              }()

              <-doneCh
              fmt.Println(«Processed», msgCount, «messages»)

             if err := c.worker.Close(); err != nil {
                      panic(err)
              }
}

func CreateOrUpdate(msg *sarama.ConsumerMessage, usecase *application.AccessRecordUseCase){
           record := &entity.AccessRecord{}
           err := record.DecodeByte(msg.Value)
          if err != nil {
                     fmt.Println(«unable to decode message and create or update document»)
          }else{
                    usecase.CreateOrUpdateAccessRecord(record)
          }
}

Código del kafka consumer para la lectura de mensajes


Los datos: la puerta a mejores soluciones

Por lo tanto podríamos tener así, una arquitectura reactiva, una que haciendo uso de los datos de procesos ya digitalizados, pueda ir actualizando registros que estén en otro lugar de manera reactiva. Estos registros pueden ser usados por otra aplicación, en nuestro caso de ejemplo podría ser un visualizador para que algún jefe (sin muy buena cultura laboral) esté espiando lo que hacen sus trabajadores en cada momento, con gráficos incluidos y todo. 

También la solución podría servir para que algún científico de datos pueda analizar  qué roles tienden a estar en ciertos lugares del edificio con más frecuencia, lo que ayudaría a crear una mejor distribución de las oficinas. Pero podemos ir más allá y crear un modelo de machine learning que aprenda sobre las preferencias de los trabajadores por ciertos lugares del edificio y así poder asignar la mejor oficina para los nuevos ingresos según su rol. Hasta podíamos ingresar a la ecuación otros datos relacionados al trabajador, quizás para saber como ciertas características del mismo influyen en el tiempo que tarda en cada lugar del edificio. 

Lo importante de una solución de esta magnitud es que esté enfocada en datos y es cuando la empresa logra controlar estos datos que tiene un mundo de posibilidades por delante, es en ese momento donde podrá mejorar su producto de manera exponencial. Los datos terminarán siendo la puerta, pero también el camino a las mejores soluciones.

¿Te animas a implementarlo?  💪

Ver código de ejemplo

 

El verdadero valor de la digitalización

Nada como llegar a casa… quizás en un taxi que pediste por una app y llegar pidiendo comida por alguna aplicación de delivery; acostarte y seleccionar tu programa favorito en un servicio de streaming; agregar un producto nuevo a tu lista de compra (la cual puedes revisar en tu teléfono o en la computadora), para al día siguiente pedir esa lista por medio de la aplicación de tu supermercado de preferencia. Todas estas cosas que podrían ser el día a día de muchos tienen algo en común y es que no siempre fueron tan fáciles de llevar a cabo, cada uno de estos procesos han pasado por una digitalización o han sufrido, como se escucha mucho ahora en el ámbito empresarial, una transformación digital.

Transformación digital: La tecnología en todos
los aspectos de la vida humana

Y es que ese es el objetivo de lo que se conoce como transformación digital, utilizar la tecnología para facilitar procesos que sin la misma serían mucho más engorrosos, complicados o en algunos casos hasta imposibles. Es que en un mundo donde cada vez es más normal el uso de dispositivos digitales de todo tipo (como bombillas que se conectan a internet), es natural que las empresas de cualquier tipo empiecen a reconocer el valor de la digitalización, el valor de convertir sus procesos análogos o manuales en procesos que pueden ser llevado a cabo por dispositivos digitales.

Pero es que esto no solo facilita la vida a los clientes de estas empresas, lo cual de por sí ya trae un gran valor para las misma; sino que también trae consigo un nuevo actor al escenario, del que antes no se hablaba mucho hasta que empresas como google o facebook demostraron lo valioso que podría llegar a ser y sí, nos referimos a los datos, a la información. Lo que se conoce ahora como big data, esas grandes fuentes de información que muchas empresas poseen y otras desean tener, pero que solo se logra tener en ambientes y procesos verdaderamente digitalizados.

 

Empresas que se empoderan con datos… o mueren

Esto ha traído un verdadero cambio de paradigma en muchas empresas, ha traído a colación casi que una nueva regla de supervivencia: empoderarse con datos… o morir. Actualmente, en un mundo donde casi todas las ideas se han descubierto (o eso creemos), es cada vez más difícil sobresalir como empresa solo por tener una idea brillante y se hace cada vez más necesario obtener datos que luego puedan ser usados de manera brillante.

Es por eso que muchas empresa ofrecen sus servicios “gratis”, como es el caso de la mayoría de las redes sociales y es que sus clientes no somos los usuarios que usamos sus aplicaciones, sus clientes son las empresas a quienes les ofrecen servicios que puedan aprovechar al máximo nuestros datos. Creando un ecosistema de negocio donde la empresa con mejores datos tendrá más clientes, en otras palabras tendrán más oportunidades de sobrevivir.

Netflix logró su transformación digital en etapas tempranas y aunque Blockbuster siendo en su momento el rey de las películas caseras intentó remontarse a la ola, era muy tarde.

Por lo tanto se hace imprescindible para muchas empresas tradicionales no solo poder crear mejores soluciones para sus clientes, digitalizando las mismas por ejemplo, sino también buscar las formas de poder sacarle provecho a los datos que generan sus propios procesos, para así seguir mejorando la experiencia de sus productos. Es el verdadero empoderamiento de los datos el que puede lograr esta meta, la de crear productos que impacten verdaderamente al cliente y por lo tanto generen una rentabilidad sustentable.

 

Antes de que las máquinas aprendan, ¿de dónde lo aprenden?

Todo este asunto de la transformación digital ha terminado en directivos de empresas que apenas comenzando a implementar soluciones digitales a sus procesos quieren empezar a automatizar sus soluciones de manera inteligente. Crear soluciones como las de grandes empresas, que se adapten a las situaciones o conozcan a sus clientes de manera automática. Comienzan a hablar de inteligencia artificial o el aprendizaje automatizado… pero antes de empezar a lograr siquiera eso olvidan lo más importante: Los datos.

Empresas como google han logrado tener soluciones increíbles no solo por tener buenos ingenieros, sino por las cantidades ingentes de datos que han logrado recolectar con el tiempo y con lo cual han logrado entrenar a sus modelos de aprendizaje automatizado. Por lo tanto antes de pensar en aplicar machine learning, inteligencia artificial o soluciones que aprendan solas que es lo mejor para el usuario, primero toca saber qué es lo que el usuario considera mejor.

 

Una solución enfocada en los datos: Arquitecturas reactivas

Y es cuando se le da la verdadera importancia al corazón de la digitalización cuando las empresas empiezan a enfocar sus esfuerzos en crear sistemas centrados en el flujo de los datos. Entre ellos uno de los más conocidos son las arquitecturas reactivas. Según el manifiesto reactivo, un sistema reactivo tiene algunas características principales:

  • Ser responsivo: Un sistema que se adapta a las diferentes situaciones, detectando rápidamente sus problemas y resolviéndolos. Pudiendo responder de manera eficiente a cada usuario, fomentando una mayor interacción con este.
  • Ser resiliente: Poder manejar las fallas para seguir respondiendo, un sistema reactivo debe ser tolerante a fallos evitando el no estar disponible. Y esta resiliencia se logra con replicación, contención, insolación y delegación dentro de sus flujos.
  • Ser elástico: En un sistema reactivo el manejo de los recursos es de suma importancia, permitiéndose recibir grandes flujos de entrada, haciendo uso eficiente de los recursos dependiendo de la situación.
  • Ser dirigido por mensajes: Un sistema reactivo se basa en el envío de mensajes asincrónicos, tanto para enviar información como para manejar los errores dentro de sí. Lo cual asegura un acoplamiento holgado entre las partes, aislamiento correcto y transparencia entre componentes.

 

Sistemas reactivos: Responsivos, resilientes, elásticos y dirigidos por mensajes

Por lo tanto se puede entender como arquitectura reactiva a una forma de construir soluciones informáticas con estas características, las cuales tienen como objetivo principal lograr un flujo de datos rápido, seguro, escalable y dirigidos por datos. Para conseguir esto se usan ciertas tecnologías de comunicación y persistencia que en conjunto con aplicativos que pueden ser reactivos en sí mismo (haciendo uso de lenguajes reactivos) crean flujos completamente dirigidos por los datos.

Continuará… ✌️