Saltar al contenido principal

Message Broadcaster: Comunicación Segura Entre Nodos

En el corazón palpitante de KodeChain, donde los nodos deben comunicarse con precisión quirúrgica, emerge el Message Broadcaster como el sistema nervioso central. Esta implementación en consensus/message_broadcaster.go transforma el caos potencial de la comunicación distribuida en un flujo ordenado y seguro de mensajes.

La Arquitectura de Comunicación

MessageBroadcasterImpl: El Núcleo de Comunicación

type MessageBroadcasterImpl struct {
nodeID string
validators map[string]bool
nodeAddresses map[string]string // Mapeo de nodeID a dirección HTTP
handlers map[string]MessageHandler
mutex sync.RWMutex
isRunning bool
messageChan chan interface{}
httpClient *http.Client
}

Esta estructura mantiene un registro completo de todos los validadores, sus direcciones de red y los manejadores de mensajes especializados.

Inicialización y Ciclo de Vida

func NewMessageBroadcaster(nodeID string) *MessageBroadcasterImpl {
return &MessageBroadcasterImpl{
nodeID: nodeID,
validators: make(map[string]bool),
nodeAddresses: make(map[string]string),
handlers: make(map[string]MessageHandler),
messageChan: make(chan any, 1000), // Buffer de 1000 mensajes
httpClient: &http.Client{
Timeout: 10 * time.Second,
},
}
}

El broadcaster se inicializa con un buffer generoso para manejar picos de mensajes sin perder información crítica.

Gestión de Validadores y Direcciones

Registro Dinámico de Nodos

func (mb *MessageBroadcasterImpl) RegisterNodeAddress(nodeID string, address string) error {
mb.mutex.Lock()
defer mb.mutex.Unlock()

mb.nodeAddresses[nodeID] = address
utils.LogInfo("MessageBroadcaster: Dirección registrada para nodo %s: %s", nodeID, address)
return nil
}

Este método permite que los nodos se registren dinámicamente en la red, facilitando la auto-descubrimiento y la escalabilidad.

Gestión de Validadores

func (mb *MessageBroadcasterImpl) AddValidator(validatorID string) error {
mb.mutex.Lock()
defer mb.mutex.Unlock()

mb.validators[validatorID] = true
utils.LogInfo("MessageBroadcaster: Validador %s agregado", validatorID)
return nil
}

Los validadores se agregan y remueven dinámicamente según el estado del staking contract, manteniendo la consistencia del conjunto de validadores.

Broadcasting Seguro

Envío Masivo de Mensajes

func (mb *MessageBroadcasterImpl) Broadcast(message interface{}) error {
mb.mutex.RLock()
defer mb.mutex.RUnlock()

if !mb.isRunning {
return fmt.Errorf("broadcaster no está ejecutándose")
}

// Enviar mensaje a todos los validadores
for validatorID := range mb.validators {
if validatorID != mb.nodeID { // No enviar a sí mismo
if err := mb.SendToNode(validatorID, message); err != nil {
utils.LogWarning("MessageBroadcaster: Error enviando a %s: %v", validatorID, err)
}
}
}

return nil
}

El broadcasting es inteligente: envía mensajes a todos los validadores excepto al propio nodo, evitando bucles y optimizando el ancho de banda.

Envío Punto a Punto

func (mb *MessageBroadcasterImpl) SendToNode(nodeID string, message interface{}) error {
// Verificar que el nodo sea un validador
if !mb.validators[nodeID] {
return fmt.Errorf("nodo %s no es un validador", nodeID)
}

// Obtener dirección del nodo
nodeAddress, hasAddress := mb.nodeAddresses[nodeID]
if !hasAddress {
return fmt.Errorf("no se tiene dirección para nodo %s", nodeID)
}

// Enviar mensaje por HTTP
go mb.sendHTTPMessage(nodeAddress, message)

return nil
}

Cada envío se valida contra la lista de validadores activos y utiliza direcciones registradas.

Comunicación HTTP Segura

Envío de Mensajes HTTP

func (mb *MessageBroadcasterImpl) sendHTTPMessage(nodeAddress string, message interface{}) {
// Serializar mensaje a JSON
messageData, err := json.Marshal(message)
if err != nil {
utils.LogError("MessageBroadcaster: Error serializando mensaje: %v", err)
return
}

// Crear request HTTP
url := fmt.Sprintf("%s/api/events/broadcast", nodeAddress)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(messageData))
if err != nil {
utils.LogError("MessageBroadcaster: Error creando request: %v", err)
return
}

req.Header.Set("Content-Type", "application/json")

// Enviar request
resp, err := mb.httpClient.Do(req)
if err != nil {
utils.LogWarning("MessageBroadcaster: Error enviando mensaje a %s: %v", nodeAddress, err)
return
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusOK {
utils.LogDebug("MessageBroadcaster: Mensaje enviado exitosamente a %s", nodeAddress)
} else {
utils.LogWarning("MessageBroadcaster: Respuesta no exitosa de %s: %d", nodeAddress, resp.StatusCode)
}
}

Los mensajes se envían por HTTP POST al endpoint /api/events/broadcast, con manejo robusto de errores y timeouts.

Sistema de Handlers

Registro de Manejadores

func (mb *MessageBroadcasterImpl) RegisterHandler(messageType string, handler MessageHandler) error {
mb.mutex.Lock()
defer mb.mutex.Unlock()

mb.handlers[messageType] = handler
utils.LogInfo("MessageBroadcaster: Handler registrado para tipo %s", messageType)
return nil
}

Cada tipo de mensaje puede tener su propio manejador especializado, permitiendo procesamiento personalizado.

Procesamiento de Mensajes

func (mb *MessageBroadcasterImpl) processMessages() {
for message := range mb.messageChan {
if message == nil {
continue
}

// Determinar el tipo de mensaje
messageType := mb.getMessageType(message)
if messageType == "" {
utils.LogWarning("MessageBroadcaster: Tipo de mensaje no reconocido")
continue
}

// Buscar handler para este tipo de mensaje
handler, exists := mb.handlers[messageType]
if !exists {
utils.LogWarning("MessageBroadcaster: No hay handler para tipo %s", messageType)
continue
}

// Procesar mensaje
if err := handler.HandleMessage(message); err != nil {
utils.LogError("MessageBroadcaster: Error procesando mensaje: %v", err)
}
}
}

El procesamiento es asíncrono y tolerante a fallos, con logging detallado para debugging.

Tipos de Mensajes Soportados

Mensajes PBFT

func (mb *MessageBroadcasterImpl) getMessageType(message interface{}) string {
switch message.(type) {
case *PBFTMessage:
if pbftMsg, ok := message.(*PBFTMessage); ok {
return pbftMsg.Type
}
case *PBFTPrePrepare:
return "pre-prepare"
case *PBFTPrepare:
return "prepare"
case *PBFTCommit:
return "commit"
case *PBFTViewChange:
return "view-change"
case *PBFTNewView:
return "new-view"
}

// Intentar extraer tipo de JSON
if jsonData, err := json.Marshal(message); err == nil {
var msgMap map[string]interface{}
if err := json.Unmarshal(jsonData, &msgMap); err == nil {
if msgType, exists := msgMap["type"]; exists {
if typeStr, ok := msgType.(string); ok {
return typeStr
}
}
}
}

return ""
}

El sistema reconoce automáticamente diferentes tipos de mensajes PBFT y puede extenderse para nuevos tipos.

Monitoreo y Métricas

Métricas del Sistema

func (mb *MessageBroadcasterImpl) GetMetrics() map[string]interface{} {
mb.mutex.RLock()
defer mb.mutex.RUnlock()

return map[string]interface{}{
"isRunning": mb.isRunning,
"totalValidators": len(mb.validators),
"channelSize": len(mb.messageChan),
"channelCap": cap(mb.messageChan),
"handlers": len(mb.handlers),
}
}

Simulación para Testing

func (mb *MessageBroadcasterImpl) SimulateNetworkLatency(delay time.Duration) {
time.Sleep(delay)
}

Esta función permite simular condiciones de red para testing y debugging.

Seguridad y Robustez

Validación de Mensajes

  • Autenticación: Solo validadores registrados pueden enviar/recibir mensajes
  • Integridad: Mensajes serializados a JSON con validación de estructura
  • Confidencialidad: Comunicación sobre HTTPS (recomendado en producción)
  • No repudio: Todos los mensajes se loggean con timestamps

Manejo de Errores

// Envío asíncrono con manejo de errores
go mb.sendHTTPMessage(nodeAddress, message)

// Logging detallado de errores
if err != nil {
utils.LogWarning("MessageBroadcaster: Error enviando mensaje a %s: %v", nodeAddress, err)
return
}

Los errores se manejan gracefully sin detener el sistema, con logging completo para análisis post-mortem.

Buffer de Mensajes

messageChan: make(chan any, 1000)

El buffer previene pérdida de mensajes durante picos de carga, manteniendo la estabilidad del sistema.

Integración con el Sistema

Conexión con PBFT

pbft := NewPBFT(broadcaster, logger)
pbft.Broadcaster = broadcaster

El broadcaster se integra directamente con el consenso PBFT, proporcionando la capa de comunicación necesaria.

Conexión con DPoS

dpos := NewDPoS(stakingContract)
// El DPoS utiliza el broadcaster a través del beacon manager

Aunque DPoS no utiliza broadcasting directo, se coordina a través del BeaconManager.

El Futuro de la Comunicación

El Message Broadcaster representa la evolución de la comunicación en sistemas distribuidos:

  • Escalabilidad: Manejo eficiente de miles de nodos
  • Flexibilidad: Soporte para nuevos tipos de mensajes
  • Seguridad: Comunicación autenticada y encriptada
  • Observabilidad: Métricas completas y logging detallado
  • Resiliencia: Manejo robusto de fallos de red y nodos

Cada línea de código en consensus/message_broadcaster.go está diseñada para crear no solo un sistema de mensajería, sino una red neuronal digital que mantiene la coherencia y coordinación de todo el ecosistema KodeChain.