Saltar al contenido principal

Gestión de Conexiones P2P

En el vasto océano de la computación distribuida, donde los nodos flotan como islas en un mar digital, las conexiones P2P representan los puentes invisibles que unen estas islas en un continente coherente. Implementado en p2p/connection.go, el sistema de gestión de conexiones de KodeChain transforma el aislamiento individual en una red colaborativa poderosa.

La Arquitectura de Conexión

P2PConnection: El Coordinador Central

type P2PConnection struct {
mutex sync.RWMutex
activeConnections map[string]*Connection
discovery *DiscoveryProtocol
localPort int
maxPeers int
}

Esta estructura central mantiene el estado de todas las conexiones activas, coordinando entre el protocolo de descubrimiento y las conexiones TCP individuales. me sale "[WARNING] Markdown link with URL Distribution.md in source file "docs/KodeCoin/Tokenomics.md" (326:4) couldn't be resolved. Make sure it references a local Markdown file that exists within the current plugin. [WARNING] Markdown link with URL Burn-Mechanism.md in source file "docs/KodeCoin/Tokenomics.md" (327:4) couldn't be resolved. Make sure it references a local Markdown file that exists within the current plugin. [WARNING] Markdown link with URL Fees-Structure.md in source file "docs/KodeCoin/Tokenomics.md" (328:4) couldn't be resolved." y aparte no me sale "[WARNING] Markdown link with URL Distribution.md in source file "docs/KodeCoin/Tokenomics.md" (326:4) couldn't be resolved. Make sure it references a local Markdown file that exists within the current plugin. [WARNING] Markdown link with URL Burn-Mechanism.md in source file "docs/KodeCoin/Tokenomics.md" (327:4) couldn't be resolved. Make sure it references a local Markdown file that exists within the current plugin. [WARNING] Markdown link with URL Fees-Structure.md in source file "docs/KodeCoin/Tokenomics.md" (328:4) couldn't be resolved." y aparte no

Connection: La Unidad Básica

type Connection struct {
ID string
RemoteAddr string
RemotePort int
Conn net.Conn
IsConnected bool
LastActivity time.Time
Capabilities []string
TrustScore int
}

Cada conexión representa un vínculo vivo con otro nodo, llevando no solo datos sino también metadatos sobre confianza y capacidades.

El Ciclo de Vida de las Conexiones

Inicialización y Arranque

func (pc *P2PConnection) Start() error {
fmt.Printf("🔗 Iniciando servidor P2P en puerto %d\n", pc.localPort)

// Iniciar servidor TCP
go pc.startTCPServer()

// Conectar a nodos bootstrap
go pc.connectToBootstrapNodes()

return nil
}

El arranque inicia dos procesos paralelos: un servidor que acepta conexiones entrantes y un cliente que busca conexiones salientes a nodos conocidos.

Servidor TCP: Puerta de Entrada

func (pc *P2PConnection) startTCPServer() {
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", pc.localPort))
if err != nil {
fmt.Printf("❌ Error iniciando servidor TCP en puerto %d: %v\n", pc.localPort, err)
return
}
defer listener.Close()

fmt.Printf("👂 Servidor P2P escuchando en puerto %d\n", pc.localPort)

for {
conn, err := listener.Accept()
if err != nil {
continue
}

// Manejar nueva conexión en goroutine separada
go pc.handleIncomingConnection(conn)
}
}

El servidor TCP actúa como centinela, aceptando conexiones entrantes y delegando su manejo a goroutines dedicadas.

Manejo de Conexiones Entrantes

func (pc *P2PConnection) handleIncomingConnection(conn net.Conn) {
remoteAddr := conn.RemoteAddr().String()
fmt.Printf("🆕 Nueva conexión entrante desde: %s\n", remoteAddr)

// Crear objeto de conexión
connection := &Connection{
ID: fmt.Sprintf("conn-%d", time.Now().UnixNano()),
RemoteAddr: remoteAddr,
Conn: conn,
IsConnected: true,
LastActivity: time.Now(),
TrustScore: 50,
}

// Agregar a conexiones activas
pc.mutex.Lock()
pc.activeConnections[connection.ID] = connection
pc.mutex.Unlock()

// Manejar la conexión
pc.handleConnection(connection)
}

Cada conexión entrante se registra y se maneja en una goroutine dedicada, permitiendo procesamiento concurrente.

Manejo de Conexión Activa

func (pc *P2PConnection) handleConnection(conn *Connection) {
defer func() {
conn.Conn.Close()
conn.IsConnected = false

// Remover de conexiones activas
pc.mutex.Lock()
delete(pc.activeConnections, conn.ID)
pc.mutex.Unlock()

fmt.Printf("🔌 Conexión cerrada: %s\n", conn.RemoteAddr)
}()

// Buffer para mensajes
buffer := make([]byte, 4096)

for {
// Leer mensaje
n, err := conn.Conn.Read(buffer)
if err != nil {
break
}

// Actualizar última actividad
conn.LastActivity = time.Now()

// Procesar mensaje
pc.processMessage(conn, buffer[:n])
}
}

El manejo de conexiones incluye un bucle de lectura que procesa mensajes hasta que la conexión se cierra.

Protocolo de Mensajes

Procesamiento de Mensajes

func (pc *P2PConnection) processMessage(conn *Connection, data []byte) {
var message map[string]interface{}
if err := json.Unmarshal(data, &message); err != nil {
return
}

messageType, ok := message["type"].(string)
if !ok {
return
}

switch messageType {
case "announcement":
pc.handleAnnouncementMessage(conn, data)
case "peer_request":
pc.handlePeerRequestMessage(conn)
case "peer_list":
pc.handlePeerListMessage(conn, data)
case "ping":
pc.handlePingMessage(conn)
default:
fmt.Printf("❓ Mensaje desconocido: %s\n", messageType)
}
}

Los mensajes se clasifican por tipo y se delegan a manejadores específicos.

Intercambio de Anuncios

func (pc *P2PConnection) handleAnnouncementMessage(conn *Connection, data []byte) {
var announcement NodeAnnouncement
if err := json.Unmarshal(data, &announcement); err != nil {
fmt.Printf("❌ Error parseando anuncio: %v\n", err)
return
}

fmt.Printf("📢 Anuncio recibido de %s: %s en %s:%d\n",
conn.RemoteAddr, announcement.Enode.ID, announcement.Enode.IP, announcement.Enode.Ports["p2p"])

// Actualizar capacidades de la conexión
conn.Capabilities = announcement.Enode.Capabilities
}

Los anuncios permiten a los nodos compartir su información y capacidades.

Conexiones Salientes

Conexión a Bootstrap

func (pc *P2PConnection) connectToBootstrapNodes() {
time.Sleep(2 * time.Second) // Esperar a que el servidor esté listo

fmt.Println("🔗 Conectando a nodos bootstrap...")

// Obtener nodos bootstrap del protocolo de descubrimiento
bootstrapNodes := pc.discovery.GetKnownPeers()

for _, peer := range bootstrapNodes {
if peer.IsBootstrap {
fmt.Printf(" 📡 Conectando a bootstrap: %s en %s:%d\n",
peer.Enode.ID, peer.Enode.IP, peer.Enode.Ports["p2p"])

go pc.connectToPeer(peer.NodeAnnouncement)
}
}
}

La conexión a nodos bootstrap asegura que nuevos nodos puedan unirse a la red existente.

Conexión a Peer Específico

func (pc *P2PConnection) connectToPeer(announcement NodeAnnouncement) {
// Verificar que no estemos ya conectados
pc.mutex.RLock()
for _, conn := range pc.activeConnections {
if conn.RemoteAddr == fmt.Sprintf("%s:%d", announcement.Enode.IP, announcement.Enode.Ports["p2p"]) {
pc.mutex.RUnlock()
return // Ya conectado
}
}
pc.mutex.RUnlock()

// Verificar límite de peers
pc.mutex.RLock()
if len(pc.activeConnections) >= pc.maxPeers {
pc.mutex.RUnlock()
fmt.Printf("⚠️ Límite de peers alcanzado (%d), no conectando a %s\n",
pc.maxPeers, announcement.Enode.ID)
return
}
pc.mutex.RUnlock()

// Intentar conectar
addr := net.JoinHostPort(announcement.Enode.IP, fmt.Sprintf("%d", announcement.Enode.Ports["p2p"]))
conn, err := net.Dial("tcp", addr)
if err != nil {
fmt.Printf("❌ Error conectando a %s: %v\n", addr, err)
return
}

fmt.Printf("✅ Conectado exitosamente a: %s (%s)\n", announcement.Enode.ID, addr)

// Crear y registrar conexión
connection := &Connection{
ID: fmt.Sprintf("out-%d", time.Now().UnixNano()),
RemoteAddr: addr,
RemotePort: announcement.Enode.Ports["p2p"],
Conn: conn,
IsConnected: true,
LastActivity: time.Now(),
Capabilities: announcement.Enode.Capabilities,
TrustScore: 50,
}

pc.mutex.Lock()
pc.activeConnections[connection.ID] = connection
pc.mutex.Unlock()

// Enviar anuncio inicial
announcementMsg := map[string]interface{}{
"type": "announcement",
"data": pc.discovery.GetLocalAnnouncement(),
}
pc.sendMessage(connection, announcementMsg)

// Manejar la conexión
pc.handleConnection(connection)
}

Este proceso establece conexiones salientes, registra el peer, y comienza el intercambio de información.

Comunicación y Mensajería

Envío de Mensajes

func (pc *P2PConnection) sendMessage(conn *Connection, message map[string]interface{}) {
data, err := json.Marshal(message)
if err != nil {
return
}

conn.Conn.Write(data)
}

Los mensajes se serializan a JSON y se envían a través de la conexión TCP.

Broadcast a Todos los Peers

func (pc *P2PConnection) BroadcastMessage(message map[string]interface{}) {
pc.mutex.RLock()
defer pc.mutex.RUnlock()

for _, conn := range pc.activeConnections {
if conn.IsConnected {
go pc.sendMessage(conn, message)
}
}
}

El broadcast permite enviar mensajes a todos los peers conectados simultáneamente.

Gestión de Estado

Información de Conexiones

func (pc *P2PConnection) GetActiveConnections() map[string]*Connection {
pc.mutex.RLock()
defer pc.mutex.RUnlock()

result := make(map[string]*Connection)
for k, v := range pc.activeConnections {
result[k] = v
}
return result
}

func (pc *P2PConnection) GetConnectionCount() int {
pc.mutex.RLock()
defer pc.mutex.RUnlock()
return len(pc.activeConnections)
}

Estas funciones proporcionan acceso seguro a la información de conexiones.

Seguridad y Confianza

Sistema de Puntuación

Cada conexión mantiene un TrustScore que se puede usar para evaluar la confiabilidad de los peers:

connection := &Connection{
TrustScore: 50, // Score inicial
}

Validación de Conexiones

  • Verificación de límites de peers
  • Prevención de conexiones duplicadas
  • Manejo seguro de errores
  • Limpieza automática de conexiones cerradas

Arquitectura Concurrente

Thread Safety

Todas las operaciones en P2PConnection usan sync.RWMutex para garantizar thread safety:

pc.mutex.Lock()
// Operaciones críticas
pc.mutex.Unlock()

Goroutines Dedicadas

Cada conexión se maneja en su propia goroutine, permitiendo:

  • Procesamiento concurrente de múltiples conexiones
  • No bloqueo entre conexiones
  • Escalabilidad horizontal

El Rol en el Ecosistema

Integración con Descubrimiento

El sistema de conexiones se integra estrechamente con el protocolo de descubrimiento:

type P2PConnection struct {
discovery *DiscoveryProtocol
// ...
}

Esta integración permite que el descubrimiento de peers se traduzca automáticamente en conexiones activas.

Soporte para Hole Punching

Las conexiones P2P están diseñadas para trabajar con sistemas avanzados de NAT traversal, permitiendo conexiones directas incluso a través de firewalls restrictivos.

Métricas y Monitoreo

Información de Conexión

Cada conexión mantiene metadatos útiles para monitoreo:

  • LastActivity: Para detectar conexiones inactivas
  • Capabilities: Para entender qué servicios ofrece el peer
  • TrustScore: Para evaluación de calidad de conexión
  • IsConnected: Estado actual de la conexión

El Arte de la Conectividad

Cada conexión P2P en KodeChain representa un hilo en la tela de la red distribuida. Desde el primer handshake TCP hasta el intercambio continuo de mensajes, estas conexiones transforman nodos individuales en una red colaborativa.

El diseño de p2p/connection.go no es solo código; es la arquitectura que hace posible la promesa de la computación distribuida: nodos individuales uniéndose para crear algo mayor que la suma de sus partes.

En KodeChain, las conexiones P2P no son meros pipes de datos; son los vasos sanguíneos que mantienen viva la red, llevando información, confianza y colaboración a través del vasto organismo distribuido.