Message Broadcaster: Comunicación Segura Entre Nodos
En el corazón palpitante de KodeChain, donde los nodos deben comunicarse con precisión quirúrgica, emerge el Message Broadcaster como el sistema nervioso central. Esta implementación en consensus/message_broadcaster.go transforma el caos potencial de la comunicación distribuida en un flujo ordenado y seguro de mensajes.
La Arquitectura de Comunicación
MessageBroadcasterImpl: El Núcleo de Comunicación
type MessageBroadcasterImpl struct {
nodeID string
validators map[string]bool
nodeAddresses map[string]string // Mapeo de nodeID a dirección HTTP
handlers map[string]MessageHandler
mutex sync.RWMutex
isRunning bool
messageChan chan interface{}
httpClient *http.Client
}
Esta estructura mantiene un registro completo de todos los validadores, sus direcciones de red y los manejadores de mensajes especializados.
Inicialización y Ciclo de Vida
func NewMessageBroadcaster(nodeID string) *MessageBroadcasterImpl {
return &MessageBroadcasterImpl{
nodeID: nodeID,
validators: make(map[string]bool),
nodeAddresses: make(map[string]string),
handlers: make(map[string]MessageHandler),
messageChan: make(chan any, 1000), // Buffer de 1000 mensajes
httpClient: &http.Client{
Timeout: 10 * time.Second,
},
}
}
El broadcaster se inicializa con un buffer generoso para manejar picos de mensajes sin perder información crítica.
Gestión de Validadores y Direcciones
Registro Dinámico de Nodos
func (mb *MessageBroadcasterImpl) RegisterNodeAddress(nodeID string, address string) error {
mb.mutex.Lock()
defer mb.mutex.Unlock()
mb.nodeAddresses[nodeID] = address
utils.LogInfo("MessageBroadcaster: Dirección registrada para nodo %s: %s", nodeID, address)
return nil
}
Este método permite que los nodos se registren dinámicamente en la red, facilitando la auto-descubrimiento y la escalabilidad.
Gestión de Validadores
func (mb *MessageBroadcasterImpl) AddValidator(validatorID string) error {
mb.mutex.Lock()
defer mb.mutex.Unlock()
mb.validators[validatorID] = true
utils.LogInfo("MessageBroadcaster: Validador %s agregado", validatorID)
return nil
}
Los validadores se agregan y remueven dinámicamente según el estado del staking contract, manteniendo la consistencia del conjunto de validadores.
Broadcasting Seguro
Envío Masivo de Mensajes
func (mb *MessageBroadcasterImpl) Broadcast(message interface{}) error {
mb.mutex.RLock()
defer mb.mutex.RUnlock()
if !mb.isRunning {
return fmt.Errorf("broadcaster no está ejecutándose")
}
// Enviar mensaje a todos los validadores
for validatorID := range mb.validators {
if validatorID != mb.nodeID { // No enviar a sí mismo
if err := mb.SendToNode(validatorID, message); err != nil {
utils.LogWarning("MessageBroadcaster: Error enviando a %s: %v", validatorID, err)
}
}
}
return nil
}
El broadcasting es inteligente: envía mensajes a todos los validadores excepto al propio nodo, evitando bucles y optimizando el ancho de banda.
Envío Punto a Punto
func (mb *MessageBroadcasterImpl) SendToNode(nodeID string, message interface{}) error {
// Verificar que el nodo sea un validador
if !mb.validators[nodeID] {
return fmt.Errorf("nodo %s no es un validador", nodeID)
}
// Obtener dirección del nodo
nodeAddress, hasAddress := mb.nodeAddresses[nodeID]
if !hasAddress {
return fmt.Errorf("no se tiene dirección para nodo %s", nodeID)
}
// Enviar mensaje por HTTP
go mb.sendHTTPMessage(nodeAddress, message)
return nil
}
Cada envío se valida contra la lista de validadores activos y utiliza direcciones registradas.
Comunicación HTTP Segura
Envío de Mensajes HTTP
func (mb *MessageBroadcasterImpl) sendHTTPMessage(nodeAddress string, message interface{}) {
// Serializar mensaje a JSON
messageData, err := json.Marshal(message)
if err != nil {
utils.LogError("MessageBroadcaster: Error serializando mensaje: %v", err)
return
}
// Crear request HTTP
url := fmt.Sprintf("%s/api/events/broadcast", nodeAddress)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(messageData))
if err != nil {
utils.LogError("MessageBroadcaster: Error creando request: %v", err)
return
}
req.Header.Set("Content-Type", "application/json")
// Enviar request
resp, err := mb.httpClient.Do(req)
if err != nil {
utils.LogWarning("MessageBroadcaster: Error enviando mensaje a %s: %v", nodeAddress, err)
return
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
utils.LogDebug("MessageBroadcaster: Mensaje enviado exitosamente a %s", nodeAddress)
} else {
utils.LogWarning("MessageBroadcaster: Respuesta no exitosa de %s: %d", nodeAddress, resp.StatusCode)
}
}
Los mensajes se envían por HTTP POST al endpoint /api/events/broadcast, con manejo robusto de errores y timeouts.
Sistema de Handlers
Registro de Manejadores
func (mb *MessageBroadcasterImpl) RegisterHandler(messageType string, handler MessageHandler) error {
mb.mutex.Lock()
defer mb.mutex.Unlock()
mb.handlers[messageType] = handler
utils.LogInfo("MessageBroadcaster: Handler registrado para tipo %s", messageType)
return nil
}
Cada tipo de mensaje puede tener su propio manejador especializado, permitiendo procesamiento personalizado.
Procesamiento de Mensajes
func (mb *MessageBroadcasterImpl) processMessages() {
for message := range mb.messageChan {
if message == nil {
continue
}
// Determinar el tipo de mensaje
messageType := mb.getMessageType(message)
if messageType == "" {
utils.LogWarning("MessageBroadcaster: Tipo de mensaje no reconocido")
continue
}
// Buscar handler para este tipo de mensaje
handler, exists := mb.handlers[messageType]
if !exists {
utils.LogWarning("MessageBroadcaster: No hay handler para tipo %s", messageType)
continue
}
// Procesar mensaje
if err := handler.HandleMessage(message); err != nil {
utils.LogError("MessageBroadcaster: Error procesando mensaje: %v", err)
}
}
}
El procesamiento es asíncrono y tolerante a fallos, con logging detallado para debugging.
Tipos de Mensajes Soportados
Mensajes PBFT
func (mb *MessageBroadcasterImpl) getMessageType(message interface{}) string {
switch message.(type) {
case *PBFTMessage:
if pbftMsg, ok := message.(*PBFTMessage); ok {
return pbftMsg.Type
}
case *PBFTPrePrepare:
return "pre-prepare"
case *PBFTPrepare:
return "prepare"
case *PBFTCommit:
return "commit"
case *PBFTViewChange:
return "view-change"
case *PBFTNewView:
return "new-view"
}
// Intentar extraer tipo de JSON
if jsonData, err := json.Marshal(message); err == nil {
var msgMap map[string]interface{}
if err := json.Unmarshal(jsonData, &msgMap); err == nil {
if msgType, exists := msgMap["type"]; exists {
if typeStr, ok := msgType.(string); ok {
return typeStr
}
}
}
}
return ""
}
El sistema reconoce automáticamente diferentes tipos de mensajes PBFT y puede extenderse para nuevos tipos.
Monitoreo y Métricas
Métricas del Sistema
func (mb *MessageBroadcasterImpl) GetMetrics() map[string]interface{} {
mb.mutex.RLock()
defer mb.mutex.RUnlock()
return map[string]interface{}{
"isRunning": mb.isRunning,
"totalValidators": len(mb.validators),
"channelSize": len(mb.messageChan),
"channelCap": cap(mb.messageChan),
"handlers": len(mb.handlers),
}
}
Simulación para Testing
func (mb *MessageBroadcasterImpl) SimulateNetworkLatency(delay time.Duration) {
time.Sleep(delay)
}
Esta función permite simular condiciones de red para testing y debugging.
Seguridad y Robustez
Validación de Mensajes
- Autenticación: Solo validadores registrados pueden enviar/recibir mensajes
- Integridad: Mensajes serializados a JSON con validación de estructura
- Confidencialidad: Comunicación sobre HTTPS (recomendado en producción)
- No repudio: Todos los mensajes se loggean con timestamps
Manejo de Errores
// Envío asíncrono con manejo de errores
go mb.sendHTTPMessage(nodeAddress, message)
// Logging detallado de errores
if err != nil {
utils.LogWarning("MessageBroadcaster: Error enviando mensaje a %s: %v", nodeAddress, err)
return
}
Los errores se manejan gracefully sin detener el sistema, con logging completo para análisis post-mortem.
Buffer de Mensajes
messageChan: make(chan any, 1000)
El buffer previene pérdida de mensajes durante picos de carga, manteniendo la estabilidad del sistema.
Integración con el Sistema
Conexión con PBFT
pbft := NewPBFT(broadcaster, logger)
pbft.Broadcaster = broadcaster
El broadcaster se integra directamente con el consenso PBFT, proporcionando la capa de comunicación necesaria.
Conexión con DPoS
dpos := NewDPoS(stakingContract)
// El DPoS utiliza el broadcaster a través del beacon manager
Aunque DPoS no utiliza broadcasting directo, se coordina a través del BeaconManager.
El Futuro de la Comunicación
El Message Broadcaster representa la evolución de la comunicación en sistemas distribuidos:
- Escalabilidad: Manejo eficiente de miles de nodos
- Flexibilidad: Soporte para nuevos tipos de mensajes
- Seguridad: Comunicación autenticada y encriptada
- Observabilidad: Métricas completas y logging detallado
- Resiliencia: Manejo robusto de fallos de red y nodos
Cada línea de código en consensus/message_broadcaster.go está diseñada para crear no solo un sistema de mensajería, sino una red neuronal digital que mantiene la coherencia y coordinación de todo el ecosistema KodeChain.