From 91e0594d711618c373cee030fbb3646f84b6ff2d Mon Sep 17 00:00:00 2001 From: jared Date: Wed, 21 Jan 2026 01:04:50 -0500 Subject: [PATCH] switch to network.framework --- AtTable.xcodeproj/project.pbxproj | 4 +- AtTable/AtTableApp.swift | 4 +- AtTable/ChatView.swift | 63 +- AtTable/MeshNetworkManager.swift | 163 +++- AtTable/MultipeerSession.swift | 1191 --------------------------- AtTable/NetworkFraming.swift | 88 ++ AtTable/PeerUser.swift | 17 +- MPC_how_it_works.md | 255 ------ transition.md | 1257 +++++++++++++++++++++++++++++ 9 files changed, 1525 insertions(+), 1517 deletions(-) delete mode 100644 AtTable/MultipeerSession.swift create mode 100644 AtTable/NetworkFraming.swift delete mode 100644 MPC_how_it_works.md create mode 100644 transition.md diff --git a/AtTable.xcodeproj/project.pbxproj b/AtTable.xcodeproj/project.pbxproj index 9e43e3a..ef53bbf 100644 --- a/AtTable.xcodeproj/project.pbxproj +++ b/AtTable.xcodeproj/project.pbxproj @@ -272,7 +272,7 @@ "$(inherited)", "@executable_path/Frameworks", ); - MARKETING_VERSION = 1.2; + MARKETING_VERSION = 1.3; PRODUCT_BUNDLE_IDENTIFIER = com.jaredlog.AtTable; PRODUCT_NAME = "$(TARGET_NAME)"; STRING_CATALOG_GENERATE_SYMBOLS = YES; @@ -314,7 +314,7 @@ "$(inherited)", "@executable_path/Frameworks", ); - MARKETING_VERSION = 1.2; + MARKETING_VERSION = 1.3; PRODUCT_BUNDLE_IDENTIFIER = com.jaredlog.AtTable; PRODUCT_NAME = "$(TARGET_NAME)"; STRING_CATALOG_GENERATE_SYMBOLS = YES; diff --git a/AtTable/AtTableApp.swift b/AtTable/AtTableApp.swift index 562a918..c00c06f 100644 --- a/AtTable/AtTableApp.swift +++ b/AtTable/AtTableApp.swift @@ -15,7 +15,7 @@ struct AtTableApp: App { @AppStorage("userColorHex") var userColorHex: String = "#00008B" // APP-LEVEL SESSION: Ensures stable identity (Instance ID) across view reloads - @StateObject var multipeerSession = MultipeerSession() + @StateObject var meshManager = MeshNetworkManager() init() { // Reset navigation state on launch so users always see the "Login" screen first. @@ -41,7 +41,7 @@ struct AtTableApp: App { ) } } - .environmentObject(multipeerSession) + .environmentObject(meshManager) } } } diff --git a/AtTable/ChatView.swift b/AtTable/ChatView.swift index 7d84ec2..533ba36 100644 --- a/AtTable/ChatView.swift +++ b/AtTable/ChatView.swift @@ -1,9 +1,8 @@ import SwiftUI -import MultipeerConnectivity import Combine struct ChatView: View { - @EnvironmentObject var multipeerSession: MultipeerSession + @EnvironmentObject var meshManager: MeshNetworkManager @StateObject var speechRecognizer = SpeechRecognizer() @ObservedObject var networkMonitor = NetworkMonitor.shared @@ -30,7 +29,7 @@ struct ChatView: View { HStack { Button(action: { speechRecognizer.stopRecording() - multipeerSession.stop() + meshManager.stop() isOnboardingComplete = false }) { HStack(spacing: 6) { @@ -53,7 +52,7 @@ struct ChatView: View { .tracking(2) .foregroundColor(.white.opacity(0.6)) - Text("\(multipeerSession.connectedPeers.count) Active") + Text("\(meshManager.connectedPeerUsers.count) Active") .font(.caption2) .fontWeight(.bold) .foregroundColor(.green) @@ -68,7 +67,7 @@ struct ChatView: View { .padding(.bottom, 10) // 3. Peer Status / Live Transcriptions - if multipeerSession.connectedPeers.isEmpty { + if meshManager.connectedPeerUsers.isEmpty { VStack { Spacer() @@ -100,11 +99,11 @@ struct ChatView: View { } } else { // Connected Peers Row (Small Pills) - if !multipeerSession.connectedPeerUsers.isEmpty { + if !meshManager.connectedPeerUsers.isEmpty { HStack(spacing: 8) { ScrollView(.horizontal, showsIndicators: false) { HStack(spacing: 12) { - ForEach(multipeerSession.connectedPeerUsers, id: \.self) { peer in + ForEach(meshManager.connectedPeerUsers, id: \.self) { peer in HStack(spacing: 6) { Circle() .fill(Color(hex: peer.colorHex)) @@ -124,7 +123,7 @@ struct ChatView: View { } // Full Mesh Indicator - if multipeerSession.isAtCapacity { + if meshManager.isAtCapacity { Text("FULL") .font(.system(size: 10, weight: .black, design: .monospaced)) .foregroundColor(.orange) @@ -142,10 +141,10 @@ struct ChatView: View { ScrollViewReader { proxy in ScrollView { LazyVStack(spacing: 12) { - ForEach(multipeerSession.receivedMessages) { message in + ForEach(meshManager.receivedMessages) { message in MessageBubble( message: message, - isMyMessage: message.senderNodeID == multipeerSession.myNodeIDPublic + isMyMessage: message.senderNodeID == meshManager.myNodeIDPublic ) .id(message.id) .transition(.opacity.combined(with: .scale(scale: 0.95))) @@ -154,25 +153,25 @@ struct ChatView: View { .padding(.bottom, 20) } .scrollIndicators(.hidden) - .onChange(of: multipeerSession.receivedMessages.count) { - if let lastId = multipeerSession.receivedMessages.last?.id { + .onChange(of: meshManager.receivedMessages.count) { + if let lastId = meshManager.receivedMessages.last?.id { withAnimation { proxy.scrollTo(lastId, anchor: .bottom) } } if userRole == .deaf { - if let lastMsg = multipeerSession.receivedMessages.last, lastMsg.senderNodeID != multipeerSession.myNodeIDPublic { + if let lastMsg = meshManager.receivedMessages.last, lastMsg.senderNodeID != meshManager.myNodeIDPublic { let generator = UINotificationFeedbackGenerator() generator.notificationOccurred(.success) } } } // Auto-scroll when transcription card appears to prevent blocking - .onChange(of: multipeerSession.liveTranscripts.isEmpty) { - if !multipeerSession.liveTranscripts.isEmpty { + .onChange(of: meshManager.liveTranscripts.isEmpty) { + if !meshManager.liveTranscripts.isEmpty { DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) { - if let lastId = multipeerSession.receivedMessages.last?.id { + if let lastId = meshManager.receivedMessages.last?.id { withAnimation { proxy.scrollTo(lastId, anchor: .bottom) } @@ -185,11 +184,11 @@ struct ChatView: View { // Live Transcription Cards (Moved to Bottom) // Live Transcription Cards (Moved to Bottom, Stacked Vertically) - if userRole == .deaf && !multipeerSession.liveTranscripts.isEmpty { + if userRole == .deaf && !meshManager.liveTranscripts.isEmpty { VStack(spacing: 8) { - ForEach(multipeerSession.liveTranscripts.sorted(by: { $0.key < $1.key }), id: \.key) { nodeIDKey, text in + ForEach(meshManager.liveTranscripts.sorted(by: { $0.key < $1.key }), id: \.key) { nodeIDKey, text in // Look up friendly name from connectedPeerUsers by nodeID - let displayName = multipeerSession.connectedPeerUsers.first(where: { $0.nodeID == nodeIDKey })?.name ?? nodeIDKey + let displayName = meshManager.connectedPeerUsers.first(where: { $0.nodeID == nodeIDKey })?.name ?? nodeIDKey VStack(alignment: .leading, spacing: 6) { Text(displayName) .font(.caption2) @@ -242,11 +241,11 @@ struct ChatView: View { .ignoresSafeArea(.container, edges: .all) .onAppear { UIApplication.shared.isIdleTimerDisabled = true - multipeerSession.start() + meshManager.start() // Re-apply identity if needed (though App state should handle it) - if multipeerSession.userName != userName { - multipeerSession.setIdentity(name: userName, colorHex: userColorHex, role: userRole) + if meshManager.userName != userName { + meshManager.setIdentity(name: userName, colorHex: userColorHex, role: userRole) } // Configure Speech Recognizer callback @@ -254,8 +253,8 @@ struct ChatView: View { let message = MeshMessage( id: UUID(), senderID: userName, - senderNodeID: multipeerSession.myNodeIDPublic, - senderInstance: multipeerSession.myInstancePublic, + senderNodeID: meshManager.myNodeIDPublic, + senderInstance: meshManager.myInstancePublic, senderRole: userRole, senderColorHex: userColorHex, content: resultText, @@ -263,7 +262,7 @@ struct ChatView: View { isPartial: false, timestamp: Date() ) - multipeerSession.send(message: message) + meshManager.send(message: message) } // Configure Partial Result callback (No Throttling) @@ -271,8 +270,8 @@ struct ChatView: View { let message = MeshMessage( id: UUID(), senderID: userName, - senderNodeID: multipeerSession.myNodeIDPublic, // Added NodeID for consistent transcript keying - senderInstance: multipeerSession.myInstancePublic, + senderNodeID: meshManager.myNodeIDPublic, // Added NodeID for consistent transcript keying + senderInstance: meshManager.myInstancePublic, senderRole: userRole, senderColorHex: userColorHex, content: partialText, @@ -280,7 +279,7 @@ struct ChatView: View { isPartial: true, timestamp: Date() ) - multipeerSession.send(message: message) + meshManager.send(message: message) } // Auto-start recording for Hearing users @@ -299,8 +298,8 @@ struct ChatView: View { let message = MeshMessage( id: UUID(), senderID: userName, - senderNodeID: multipeerSession.myNodeIDPublic, // Added NodeID - senderInstance: multipeerSession.myInstancePublic, + senderNodeID: meshManager.myNodeIDPublic, // Added NodeID + senderInstance: meshManager.myInstancePublic, senderRole: userRole, senderColorHex: userColorHex, content: messageText, @@ -309,7 +308,7 @@ struct ChatView: View { timestamp: Date() ) - multipeerSession.send(message: message) + meshManager.send(message: message) messageText = "" } @@ -320,4 +319,4 @@ struct ChatView: View { speechRecognizer.startRecording() } } -} +} \ No newline at end of file diff --git a/AtTable/MeshNetworkManager.swift b/AtTable/MeshNetworkManager.swift index 99d5232..a3aae53 100644 --- a/AtTable/MeshNetworkManager.swift +++ b/AtTable/MeshNetworkManager.swift @@ -56,12 +56,17 @@ class MeshNetworkManager: NSObject, ObservableObject { // Pending Connections (to avoid duplicates) private var pendingConnections: Set = [] + // Store endpoints for retry + private var knownEndpoints: [String: NWEndpoint] = [:] + // Internal Peer Map (before they become "connectedPeerUsers" or while unstable) // nodeID -> PeerUser private var internalPeerMap: [String: PeerUser] = [:] private var isRunning = false - private let log = Logger() + private lazy var log = Logger(nameProvider: { [weak self] in + self?.myUserName ?? UIDevice.current.name + }) // MARK: - Initialization @@ -200,6 +205,23 @@ class MeshNetworkManager: NSObject, ObservableObject { } private func handleIncomingConnection(_ connection: NWConnection) { + // Add state handler to detect disconnections + // Note: We don't know the peerNodeID yet until we receive identification + // So we'll add a generic handler that looks up the connection + connection.stateUpdateHandler = { [weak self] state in + guard let self = self else { return } + switch state { + case .failed(let error): + self.log.error("Incoming connection failed: \(error)") + self.handleIncomingConnectionClosed(connection) + case .cancelled: + self.log.info("Incoming connection cancelled") + self.handleIncomingConnectionClosed(connection) + default: + break + } + } + connection.start(queue: .main) // Start receiving immediately to get the Identification message @@ -207,16 +229,38 @@ class MeshNetworkManager: NSObject, ObservableObject { self?.handleData(data, from: connection) }, onError: { [weak self] error in self?.log.error("Receive error on incoming connection: \(error)") - connection.cancel() + self?.handleIncomingConnectionClosed(connection) }) } + /// Clean up an incoming connection by finding which peer it belongs to + private func handleIncomingConnectionClosed(_ connection: NWConnection) { + connection.cancel() + + // Find which peer this connection belongs to + for (nodeID, conn) in connections { + if conn === connection { + log.info("Cleaning up connection for peer: \(nodeID)") + connections.removeValue(forKey: nodeID) + pendingConnections.remove(nodeID) + stableNodeIDs.remove(nodeID) + internalPeerMap.removeValue(forKey: nodeID) + updatePublicPeerList() + break + } + } + } + // MARK: - Browser (Client) private func startBrowser() { let parameters = createParameters() browser = NWBrowser(for: .bonjour(type: serviceType, domain: nil), using: parameters) + browser?.stateUpdateHandler = { [weak self] state in + self?.log.info("Browser state: \(state)") + } + browser?.browseResultsChangedHandler = { [weak self] results, changes in self?.handleBrowseResults(results, changes: changes) } @@ -231,7 +275,13 @@ class MeshNetworkManager: NSObject, ObservableObject { handleDiscovered(result) case .removed(let result): handleLost(result) - default: break + case .changed(old: _, new: let newResult, flags: _): + // When a peer rejoins, the browser may report this as a "changed" event + // (same service name but updated TXT metadata with new instance). + // Treat it as a new discovery to check instance and reconnect if needed. + handleDiscovered(newResult) + default: + break } } } @@ -249,37 +299,61 @@ class MeshNetworkManager: NSObject, ObservableObject { guard peerNodeID != myNodeID else { return } - // GHOST FILTER: Check if we know a newer instance + // Check if we need to handle reconnection + // NOTE: TXT record often isn't available in browse results (peerInstance=0) + // so we can't reliably ghost-filter here. Let handleIdentification do that. + if let existingInst = peerInstances[peerNodeID] { - if peerInstance < existingInst { - log.info("Ignoring older instance for \(peerNodeID): \(peerInstance) < \(existingInst)") + // If discovered instance is 0, TXT record wasn't available + // DON'T close existing connections - they might be valid! + if peerInstance == 0 { + // Keep existing connections, allow new connections only if none exist + } else if peerInstance > existingInst { + // We have a valid newer instance from TXT - this is reliable, reconnect + if let conn = connections[peerNodeID] { + conn.cancel() + connections.removeValue(forKey: peerNodeID) + pendingConnections.remove(peerNodeID) + stableNodeIDs.remove(peerNodeID) + internalPeerMap.removeValue(forKey: peerNodeID) + } + peerInstances[peerNodeID] = peerInstance + } else if peerInstance < existingInst && peerInstance != 0 { + // Valid older instance - reject (ghost) return - } else if peerInstance > existingInst { - log.info("Discovered NEWER instance \(peerInstance) > \(existingInst) for \(peerNodeID). Disconnecting stale connection.") - // Force disconnect old connection to allow re-initiation - if let conn = connections[peerNodeID] { - conn.cancel() - connections.removeValue(forKey: peerNodeID) - pendingConnections.remove(peerNodeID) - stableNodeIDs.remove(peerNodeID) - internalPeerMap.removeValue(forKey: peerNodeID) - } - } + } } - log.info("Discovered \(peerNodeID) (inst: \(peerInstance))") + // Store endpoint for potential retry + knownEndpoints[peerNodeID] = result.endpoint - // Tie-Breaker: Lower ID initiates connection - if myNodeID < peerNodeID { + // Check if we already have a connection or one is pending + if connections[peerNodeID] != nil || pendingConnections.contains(peerNodeID) { + return + } + + // Determine if we should initiate connection using tie-breaker + // Even when reconnecting, respect the tie-breaker to avoid both sides connecting + let shouldInitiate = myNodeID < peerNodeID + + if shouldInitiate { initiateConnection(to: result.endpoint, peerNodeID: peerNodeID) - } else { - log.info("Waiting for \(peerNodeID) to connect (I have higher ID)") } } private func handleLost(_ result: NWBrowser.Result) { - if case .service(let name, _, _, _) = result.endpoint { - log.info("Browser lost service: \(name)") + guard case .service(let name, _, _, _) = result.endpoint else { return } + + // The service name is the nodeID - clean up connection if exists + let peerNodeID = name + if let conn = connections[peerNodeID] { + conn.cancel() + connections.removeValue(forKey: peerNodeID) + pendingConnections.remove(peerNodeID) + stableNodeIDs.remove(peerNodeID) + internalPeerMap.removeValue(forKey: peerNodeID) + // Note: Don't remove from peerInstances - we need it for ghost filtering on reconnect + updatePublicPeerList() } } @@ -290,7 +364,6 @@ class MeshNetworkManager: NSObject, ObservableObject { guard !pendingConnections.contains(peerNodeID) else { return } pendingConnections.insert(peerNodeID) - log.info("Initiating connection to \(peerNodeID)") let parameters = createParameters() let connection = NWConnection(to: endpoint, using: parameters) @@ -302,6 +375,36 @@ class MeshNetworkManager: NSObject, ObservableObject { connection.start(queue: .main) connections[peerNodeID] = connection + // Add timeout for connection - if not ready within 5 seconds, cancel and retry + DispatchQueue.main.asyncAfter(deadline: .now() + 5) { [weak self] in + guard let self = self else { return } + // Check if connection is still pending (not yet stabilized) + if self.pendingConnections.contains(peerNodeID) && !self.stableNodeIDs.contains(peerNodeID) { + if let conn = self.connections[peerNodeID] { + conn.cancel() + } + self.connections.removeValue(forKey: peerNodeID) + self.pendingConnections.remove(peerNodeID) + // Don't remove from peerInstances - keep for ghost filtering + + // Schedule a retry after 1 second if we still have the endpoint + DispatchQueue.main.asyncAfter(deadline: .now() + 1) { [weak self] in + guard let self = self, self.isRunning else { return } + // Only retry if we still don't have a connection and we should initiate + if self.connections[peerNodeID] == nil && + !self.pendingConnections.contains(peerNodeID) && + !self.stableNodeIDs.contains(peerNodeID) { + if let endpoint = self.knownEndpoints[peerNodeID] { + // Check tie-breaker + if self.myNodeID < peerNodeID { + self.initiateConnection(to: endpoint, peerNodeID: peerNodeID) + } + } + } + } + } + } + NetworkFraming.startReceiving(from: connection, onData: { [weak self] data in self?.handleData(data, from: connection) }, onError: { [weak self] error in @@ -452,7 +555,13 @@ class MeshNetworkManager: NSObject, ObservableObject { // MARK: - Logger Helper struct Logger { - func info(_ msg: String) { print("πŸ•ΈοΈ [Mesh] \(msg)") } - func error(_ msg: String) { print("πŸ•ΈοΈ ❌ [Mesh] \(msg)") } + let nameProvider: () -> String + + init(nameProvider: @escaping () -> String = { UIDevice.current.name }) { + self.nameProvider = nameProvider + } + + func info(_ msg: String) { print("πŸ•ΈοΈ [\(nameProvider())] \(msg)") } + func error(_ msg: String) { print("πŸ•ΈοΈ ❌ [\(nameProvider())] \(msg)") } } } \ No newline at end of file diff --git a/AtTable/MultipeerSession.swift b/AtTable/MultipeerSession.swift deleted file mode 100644 index ea36bce..0000000 --- a/AtTable/MultipeerSession.swift +++ /dev/null @@ -1,1191 +0,0 @@ -import MultipeerConnectivity -import SwiftUI -import Combine -import os - -class MultipeerSession: NSObject, ObservableObject { - private let serviceType = Constants.serviceType - - // Services - private var myPeerId: MCPeerID? // Made optional to prevent cleanup/restart crashes - private var serviceAdvertiser: MCNearbyServiceAdvertiser! - private var serviceBrowser: MCNearbyServiceBrowser! - private var session: MCSession? // Already optional? No, it was ! before. Let's check. - private var myDiscoveryInfo: [String: String]? // Store my info for tie-breaking - private var isRestarting = false // Debounce flag - private var isStarted = false // Debounce flag to prevent duplicate start() calls - private var delayedRestartItem: DispatchWorkItem? // Item for delayed recovery logic - private var keepAliveTimer: Timer? // Heartbeat to maintain AWDL links - private let log = Logger() - - // Node Identity (stable per install + monotonic instance per session) - private var myNodeID: String = "" - private var myInstance: Int = 0 - - // Per-peer state for robust mesh handling (keyed by nodeID) - private var latestByNodeID: [String: (peerID: MCPeerID, instance: Int)] = [:] - private var pendingInvites: [String: MCPeerID] = [:] // nodeID -> peerID for active invites (enables cancellation) - private var cooldownUntil: [String: Date] = [:] // nodeIDs to temporarily ignore - private var connectingTimeoutTimers: [MCPeerID: DispatchWorkItem] = [:] // Per-peer watchdogs - private var stabilityTimers: [String: Timer] = [:] // nodeID -> Timer (15s reset delay) - private var peerNodeIDMap: [MCPeerID: String] = [:] // Map MCPeerID -> nodeID for lookups - - // Poisoned state detection - private var consecutiveFailures: [String: Int] = [:] // Track failures per nodeID - private static let poisonedThreshold = 5 // N consecutive failures = poisoned (higher for AWDL flakiness) - - // State - @Published var connectedPeers: [MCPeerID] = [] - @Published var connectedPeerUsers: [PeerUser] = [] - @Published var receivedMessages: [MeshMessage] = [] - @Published var liveTranscripts: [String: String] = [:] // Keyed by senderNodeID for reliable identity - - // Mesh Capacity Limit (MPC is unstable beyond 7-8 peers) - static let maxPeers = 7 - var isAtCapacity: Bool { connectedPeers.count >= Self.maxPeers } - - // Expose nodeID for UI to determine message ownership - var myNodeIDPublic: String { myNodeID } - var myInstancePublic: Int { myInstance } - - // User Details Wrappers - var userName: String { myUserName } - var userColor: String { myUserColor } - var userRole: UserRole { myUserRole } - - // User Details - private var myUserName: String = UIDevice.current.name - private var myUserColor: String = "#3357FF" - private var myUserRole: UserRole = .hearing - - // Dynamic Identity Update (since Session persists longer than Onboarding) - func setIdentity(name: String, colorHex: String, role: UserRole) { - self.myUserName = name - self.myUserColor = colorHex - self.myUserRole = role - // Note: Changing identity doesn't restart services automatically - // but new connections will see updated info. - // For deeper updates, we'd need to re-create peerID, but simpler is better here. - } - - override init() { - super.init() - ensureIdentity() - // Services now started manually via start() to support app-level lifecycle - } - - // MARK: - Lifecycle Control - - func start() { - // Ensure session is set up (fresh session on every start) - if session == nil { - setupSession() - } - startServices() - } - - func stop() { - // Reuse the robust disconnect logic to ensure all state/UI is cleared - disconnect() - } - - - - private func ensureIdentity() { - // Generate/retrieve stable identity ONCE per app launch - // OR recreate if myPeerId was wiped (e.g. after disconnect/leave) - if myNodeID.isEmpty { - myNodeID = NodeIdentity.nodeID - myInstance = NodeIdentity.nextInstance() - - // Create fresh ID (but keep displayName stable for this app run if possible) - let peerID = MCPeerID(displayName: myUserName) - self.myPeerId = peerID - } else if myPeerId == nil { - // RECOVERY: NodeID exists but PeerID is gone (disconnect called). Recreate it. - log.info("Identity Recovery: Recreating MCPeerID for existing NodeID \(self.myNodeID)") - let peerID = MCPeerID(displayName: myUserName) - self.myPeerId = peerID - } - } - - private func setupSession() { - guard let peerID = self.myPeerId else { - log.error("Attempted to setup session without identity!") - ensureIdentity() - // CRITICAL FIX: After identity recovery, we MUST complete the setup. - // Recursively call setupSession now that myPeerId exists. - setupSession() - return - } - - // Create fresh Session - // Encryption .none is recommended for reliable AWDL (mixed network) connections to avoid handshake timeouts - session = MCSession(peer: peerID, securityIdentity: nil, encryptionPreference: .none) - session?.delegate = self - - // Create fresh Advertiser & Browser with STABLE NODE IDENTITY - // nodeID is stable per install, instance increments per session start - // This allows other devices to reliably identify us and filter ghost peers - let discoveryInfo: [String: String] = [ - "nodeID": myNodeID, - "instance": String(myInstance), - "ts": String(Date().timeIntervalSince1970) - ] - self.myDiscoveryInfo = discoveryInfo - - serviceAdvertiser = MCNearbyServiceAdvertiser( - peer: peerID, - discoveryInfo: discoveryInfo, - serviceType: serviceType - ) - serviceAdvertiser.delegate = self - - serviceBrowser = MCNearbyServiceBrowser(peer: peerID, serviceType: serviceType) - serviceBrowser.delegate = self - - log.info("Session setup complete with nodeID: \(self.myNodeID) instance: \(self.myInstance) peerID: \(peerID.displayName)") - } - - private func startServices() { - guard serviceAdvertiser != nil, serviceBrowser != nil else { return } - serviceBrowser.startBrowsingForPeers() - - // Delay advertising to ensure MCSession listener is ready - // Reduced to 0.5s (was 2.0s) - 2.0s was causing "Connection Refused" because - // peers tried to connect before we were ready. 0.5s is sufficient for cleanup. - DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) { [weak self] in - guard let self = self else { return } - self.serviceAdvertiser?.startAdvertisingPeer() - self.log.info("Advertising started (delayed 0.5s for listener warmup)") - } - } - - private func stopServices() { - serviceAdvertiser?.stopAdvertisingPeer() - serviceBrowser?.stopBrowsingForPeers() - } - - // Helper to refresh service state without killing connections - // We restart BOTH Advertiser and Browser to flush stale routes/caches in the OS/AWDL stack - private func restartServices(forcePoisonedRecovery: Bool = false) { - DispatchQueue.main.async { [weak self] in - guard let self = self else { return } - - // Debounce: If already restarting, ignore this request - guard !self.isRestarting else { - self.log.info("Services already restarting, skipping request.") - return - } - - // Cancel any pending delayed restart since we are restarting now - self.delayedRestartItem?.cancel() - self.delayedRestartItem = nil - - // Cancel all per-peer timers - for (_, timer) in self.connectingTimeoutTimers { - timer.cancel() - } - self.connectingTimeoutTimers.removeAll() - - // Stop keep-alive to prevent multiple timers if restart happens while peers present - self.stopKeepAlive() - - self.isRestarting = true - - // CLEAR PER-PEER STATE: Remove stale peer info from previous sessions - self.latestByNodeID.removeAll() - self.pendingInvites.removeAll() - self.peerNodeIDMap.removeAll() - // Keep cooldownUntil - we want to remember temporarily-banned peers - // Keep consecutiveFailures - we need this history for poisoned state detection - - self.log.info("Restarting services to flush stale peers/routes...") - self.serviceBrowser?.stopBrowsingForPeers() - self.serviceAdvertiser?.stopAdvertisingPeer() - - // Add RANDOM JITTER to break synchronized restart loops - let jitter = Double.random(in: 0.0...1.0) - // OPTIMIZATION: Faster restart on Wi-Fi (1.0s base) vs AWDL (1.0s base) - // Increased to 1.0s to ensure socket cleanup and prevent "Connection refused" loops - let baseDelay = 1.0 - let totalDelay = baseDelay + jitter - self.log.info("Restart delay: \(String(format: "%.2f", totalDelay))s (with jitter)") - - DispatchQueue.main.asyncAfter(deadline: .now() + totalDelay) { [weak self] in - guard let self = self else { return } - - // Clear delegate BEFORE disconnecting to prevent zombie callbacks - self.session?.delegate = nil - self.session?.disconnect() - self.session = nil - - // POISONED STATE: Only rebuild MCPeerID if in poisoned state - // Otherwise keep same PeerID to reduce ghost artifacts on other devices - if forcePoisonedRecovery { - self.log.info("POISONED STATE detected. Full reset with new MCPeerID.") - self.consecutiveFailures.removeAll() // Reset failure tracking - self.cooldownUntil.removeAll() // Reset cooldowns - self.setupSession() // New MCPeerID + new instance - } else { - // Keep MCPeerID stable, just increment instance and rebuild session - self.myInstance = NodeIdentity.nextInstance() - - let discoveryInfo: [String: String] = [ - "nodeID": self.myNodeID, - "instance": String(self.myInstance), - "ts": String(Date().timeIntervalSince1970) - ] - self.myDiscoveryInfo = discoveryInfo - - // Recreate session with same PeerID - guard let peerID = self.myPeerId else { - self.log.error("Restart failed: myPeerId is nil. Re-running setup.") - self.setupSession() // Fallback to full setup - return - } - - self.session = MCSession(peer: peerID, securityIdentity: nil, encryptionPreference: .none) - self.session?.delegate = self - - // Recreate advertiser/browser with new discovery info - self.serviceAdvertiser = MCNearbyServiceAdvertiser( - peer: peerID, - discoveryInfo: discoveryInfo, - serviceType: self.serviceType - ) - self.serviceAdvertiser.delegate = self - - self.serviceBrowser = MCNearbyServiceBrowser(peer: peerID, serviceType: self.serviceType) - self.serviceBrowser.delegate = self - - self.log.info("Services rebuilt with same PeerID, new instance: \(self.myInstance)") - } - - self.serviceBrowser?.startBrowsingForPeers() - - // 0.5s delay to ensure old listener is fully unbound - DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) { [weak self] in - guard let self = self else { return } - guard self.isRestarting else { return } // Abort if cancelled - - self.serviceAdvertiser?.startAdvertisingPeer() - self.log.info("Advertising restarted (delayed 0.5s)") - - self.isRestarting = false - } - } - } - } - - func updateMyDetails(name: String, color: String, role: UserRole) { - self.myUserName = name - self.myUserColor = color - self.myUserRole = role - - // If connected, resend handshake with new details - if let session = session, !session.connectedPeers.isEmpty { - sendHandshake() - } - } - - func disconnect() { - // 1. Cancel any pending recovery task immediately - self.delayedRestartItem?.cancel() - self.delayedRestartItem = nil - - // 2. Cancel all per-peer timers to prevent zombie callbacks - for (_, timer) in connectingTimeoutTimers { - timer.cancel() - } - connectingTimeoutTimers.removeAll() - - // Stop Keep-Alive heartbeat - stopKeepAlive() - - stopServices() - serviceAdvertiser?.delegate = nil - serviceBrowser?.delegate = nil - session?.delegate = nil - session?.disconnect() - - // Clear references - serviceAdvertiser = nil - serviceBrowser = nil - session = nil - myPeerId = nil - - // 3. Clear per-peer state - pendingInvites.removeAll() - latestByNodeID.removeAll() - peerNodeIDMap.removeAll() - cooldownUntil.removeAll() - consecutiveFailures.removeAll() - - // 4. Reset flags - isStarted = false - isRestarting = false - log.info("Services stopped, all timers cancelled, per-peer state cleared.") - - DispatchQueue.main.async { - self.connectedPeers.removeAll() - self.connectedPeerUsers.removeAll() - self.receivedMessages.removeAll() - self.liveTranscripts.removeAll() - } - } - - func send(message: MeshMessage) { - guard let session = session, !session.connectedPeers.isEmpty else { return } - - // Enforce senderNodeID on all outgoing messages - var msg = message - if msg.senderNodeID.isEmpty { - msg.senderNodeID = myNodeID - } - - do { - let data = try JSONEncoder().encode(msg) - // Use unreliable delivery for partials to reduce overhead/latency - let mode: MCSessionSendDataMode = msg.isPartial ? .unreliable : .reliable - try session.send(data, toPeers: session.connectedPeers, with: mode) - - // Handle local state updates - DispatchQueue.main.async { - if msg.isHandshake { - // Don't show handshake in messages - } else if msg.isPartial { - // For partials, we don't usually show our own in the live row - } else if msg.isKeepAlive { - // Don't show keep-alive heartbeats in messages - } else { - self.receivedMessages.append(msg) - } - } - } catch { - log.error("Error sending message: \(error.localizedDescription)") - } - } - - private func sendHandshake() { - let message = MeshMessage( - id: UUID(), - senderID: myUserName, - senderNodeID: myNodeID, - senderInstance: myInstance, // CRITICAL: Authoritative instance for ghost filtering - senderRole: myUserRole, - senderColorHex: myUserColor, - content: "Handshake", - isTranscription: false, - isPartial: false, - isHandshake: true, - timestamp: Date() - ) - send(message: message) - } - - // MARK: - Keep-Alive (Maintain AWDL Links) - - private func startKeepAlive() { - stopKeepAlive() // Clear any existing timer - keepAliveTimer = Timer.scheduledTimer(withTimeInterval: 10.0, repeats: true) { [weak self] _ in - self?.sendKeepAlive() - } - log.info("Keep-Alive timer started (10s interval).") - } - - private func stopKeepAlive() { - keepAliveTimer?.invalidate() - keepAliveTimer = nil - } - - private func sendKeepAlive() { - guard let session = session, !session.connectedPeers.isEmpty else { return } - let message = MeshMessage( - id: UUID(), - senderID: myUserName, - senderNodeID: myNodeID, - senderRole: myUserRole, - senderColorHex: myUserColor, - content: "πŸ’“", - isTranscription: false, - isPartial: false, - isHandshake: false, - isKeepAlive: true, - connectedNodeIDs: self.connectedPeerUsers.map { $0.nodeID }.filter { !$0.isEmpty }, // Gossip: Share who we know - timestamp: Date() - ) - send(message: message) - } - - // MARK: - Helper Methods - - /// Calculate exponential backoff for a peer based on failure count - /// 3s β†’ 6s β†’ 12s β†’ max 30s - private func calculateBackoff(failures: Int) -> TimeInterval { - // Base = 0.5s (was 2.0s). Max = 30s. - // 1: 0.5s - // 2: 1.0s - // 3: 2.0s - // 4: 4.0s - // ... - let baseDelay: TimeInterval = 0.5 - let exponential = pow(2.0, Double(failures - 1)) - let delay = baseDelay * exponential - return min(delay, 30.0) - } - - private func createInviteContext() -> Data? { - return try? JSONEncoder().encode([ - "nodeID": self.myNodeID, - "instance": String(self.myInstance) - ]) - } - - /// Schedule a restart only if the mesh stays empty for 2 seconds - private func scheduleRestartIfStillEmpty() { - delayedRestartItem?.cancel() - let item = DispatchWorkItem { [weak self] in - guard let self = self else { return } - guard self.connectedPeers.isEmpty else { - self.log.info("Skipping restart - we now have connections.") - return - } - // NO-OP: We disabled the aggressive restart because it causes infinite loops ("Death Spiral") - // when one peer is stuck in a bad state. It causes Instance ID cycling which confuses other peers. - // We now rely purely on "Smart Retry" to reconnect us. - self.log.info("Mesh still empty. Skipping delayed restart to preserve stability (Smart Retry handles reconnection).") - // self.restartServices() // DISABLED - } - delayedRestartItem = item - DispatchQueue.main.asyncAfter(deadline: .now() + 2.0, execute: item) - } - - deinit { - disconnect() - } -} - -extension MultipeerSession: MCNearbyServiceAdvertiserDelegate { - func advertiser(_ advertiser: MCNearbyServiceAdvertiser, didNotStartAdvertisingPeer error: Error) { - log.error("ServiceAdvertiser didNotStartAdvertisingPeer: \(error.localizedDescription)") - } - - func advertiser(_ advertiser: MCNearbyServiceAdvertiser, didReceiveInvitationFromPeer peerID: MCPeerID, withContext context: Data?, invitationHandler: @escaping (Bool, MCSession?) -> Void) { - DispatchQueue.main.async { [weak self] in - guard let self = self else { return } - // IDENTITY CHECK: Ignore stale advertisers - guard advertiser == self.serviceAdvertiser else { return } - - self.log.info("didReceiveInvitationFromPeer \(peerID)") - - // Extract nodeID from invitation context (if provided by inviter) - if let context = context, - let contextDict = try? JSONDecoder().decode([String: String].self, from: context), - let inviterNodeID = contextDict["nodeID"], - let inviterInstanceStr = contextDict["instance"], - let inviterInstance = Int(inviterInstanceStr) { - // Map peerID -> nodeID immediately - self.peerNodeIDMap[peerID] = inviterNodeID - - // Ghost filtering: reject invitations from older instances - if let existing = self.latestByNodeID[inviterNodeID] { - if inviterInstance > existing.instance { - self.latestByNodeID[inviterNodeID] = (peerID, inviterInstance) - self.log.info("Invitation context: updated to newer instance \(inviterInstance) for \(inviterNodeID)") - } else if inviterInstance < existing.instance { - // REJECT GHOST INVITATION: We've seen a newer instance via discovery - self.log.info("Rejecting GHOST invitation from \(inviterNodeID) instance \(inviterInstance) (we know instance \(existing.instance))") - invitationHandler(false, nil) - return - } else { - // Same instance - could be same peer or MCPeerID mismatch, accept it - self.log.info("Invitation from same instance \(inviterInstance) for \(inviterNodeID)") - } - } else { - self.latestByNodeID[inviterNodeID] = (peerID, inviterInstance) - self.log.info("Extracted inviter nodeID: \(inviterNodeID) instance: \(inviterInstance)") - } - } - - // DEADLOCK FIX: If they are inviting us, they think we are NOT connected. - // If we think we ARE connected, we are wrong (stale/half-open). - // We must Disconnect our stale socket and Accept the new invite. - if self.session?.connectedPeers.contains(peerID) == true { - self.log.warning("Received invitation from 'connected' peer \(peerID). Assuming Half-Open Deadlock. Resetting connection...") - // Kill the stale connection - self.session?.cancelConnectPeer(peerID) - - // DELAY ACCEPTANCE to allow disconnect to propagate and clear state (0.5s) - // This prevents "Connection refused" or immediate closure of the new socket - DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) { [weak self] in - guard let self = self else { return } - guard self.session != nil else { return } - self.log.info("Accepting invitation after Half-Open reset delay.") - invitationHandler(true, self.session) - } - return - } - - // CAPACITY CHECK: Reject new peers if mesh is at capacity - guard !self.isAtCapacity else { - self.log.info("Mesh at capacity (\(Self.maxPeers) peers). Rejecting invitation from \(peerID).") - invitationHandler(false, nil) - return - } - - // IMMEDIATE ACCEPT: Per stability protocol, accept with 0.0s delay. - guard self.session != nil else { return } - invitationHandler(true, self.session) - } - } -} - -extension MultipeerSession: MCNearbyServiceBrowserDelegate { - func browser(_ browser: MCNearbyServiceBrowser, didNotStartBrowsingForPeers error: Error) { - log.error("ServiceBrowser didNotStartBrowsingForPeers: \(error.localizedDescription)") - } - - func browser(_ browser: MCNearbyServiceBrowser, foundPeer peerID: MCPeerID, withDiscoveryInfo info: [String : String]?) { - DispatchQueue.main.async { [weak self] in - guard let self = self else { return } - // IDENTITY CHECK: Ignore stale browsers - guard browser == self.serviceBrowser else { return } - - self.log.info("ServiceBrowser foundPeer: \(peerID)") - - guard let session = self.session else { return } - - // If already connected, ignore - if session.connectedPeers.contains(peerID) { - return - } - - // REQUIRE nodeID in discovery info (no backward compatibility needed) - guard let theirNodeID = info?["nodeID"], - let theirInstanceStr = info?["instance"], - let theirInstance = Int(theirInstanceStr) else { - self.log.info("Ignoring peer without nodeID/instance: \(peerID)") - return - } - - self.log.info("Found peer \(peerID) nodeID: \(theirNodeID) instance: \(theirInstance)") - - // Store nodeID -> peerID mapping for later lookups - self.peerNodeIDMap[peerID] = theirNodeID - - // 1. GHOST FILTERING: Keep only the highest instance for each nodeID - // Do this FIRST so we can clear cooldown on restart - if let existing = self.latestByNodeID[theirNodeID] { - if theirInstance > existing.instance { - // GHOST BUSTERS: Aggressively kill connection to the old stale peerID - self.session?.cancelConnectPeer(existing.peerID) - self.log.info("Found NEWER instance for nodeID \(theirNodeID): \(theirInstance) > \(existing.instance). Cancelled connection to ghost \(existing.peerID).") - - self.latestByNodeID[theirNodeID] = (peerID, theirInstance) - - // CLEAR COOLDOWN: Peer restarted, give them another chance - if self.cooldownUntil[theirNodeID] != nil { - self.cooldownUntil.removeValue(forKey: theirNodeID) - self.consecutiveFailures.removeValue(forKey: theirNodeID) - self.log.info("Cleared cooldown for \(theirNodeID) due to new instance") - } - - // CLEAR PENDING INVITE: If we were inviting the old instance, cancel it so we can invite the new one - if let pendingPeerID = self.pendingInvites[theirNodeID] { - self.log.info("Cancelling stale pending invite to \(pendingPeerID) for newer instance") - // Note: We can't explicitly 'cancel' an invite in MPC, but we remove the tracking so we can send a new one - self.pendingInvites.removeValue(forKey: theirNodeID) - - // If we had a watchdog timer for the old peer, cancel it - self.connectingTimeoutTimers[pendingPeerID]?.cancel() - self.connectingTimeoutTimers.removeValue(forKey: pendingPeerID) - } - } else if theirInstance < existing.instance { - self.log.info("Ignoring OLDER instance for nodeID \(theirNodeID): \(theirInstance) < \(existing.instance)") - return // Ignore ghost peer - } else if existing.peerID != peerID { - // Same instance but different MCPeerID - likely a race condition, keep existing - self.log.info("Ignoring duplicate instance for nodeID \(theirNodeID)") - return - } - } else { - self.latestByNodeID[theirNodeID] = (peerID, theirInstance) - } - - // 2. COOLDOWN CHECK: Ignore recently-failed peers (after ghost filtering) - if let cooldownEnd = self.cooldownUntil[theirNodeID], Date() < cooldownEnd { - self.log.info("Ignoring peer \(theirNodeID) - still in cooldown until \(cooldownEnd)") - return - } - - // 3. DETERMINISTIC LEADER/FOLLOWER: Use nodeID comparison (simple and reliable) - var shouldInvite = false - - if self.myNodeID > theirNodeID { - shouldInvite = true - self.log.info("Decision: I am LEADER (myNodeID > theirNodeID). I will invite.") - } else if self.myNodeID < theirNodeID { - shouldInvite = false - self.log.info("Decision: I am FOLLOWER (myNodeID < theirNodeID). I will wait.") - } else { - // Same nodeID (shouldn't happen - same device) - compare instance - shouldInvite = self.myInstance > theirInstance - self.log.info("Decision: Same nodeID, comparing instance (\(self.myInstance) vs \(theirInstance)).") - } - - if shouldInvite { - // CAPACITY CHECK: Skip invite if mesh is at capacity - guard !self.isAtCapacity else { - self.log.info("Mesh at capacity (\(Self.maxPeers) peers). Skipping invite to \(peerID).") - return - } - - guard !self.isRestarting else { return } - - // PENDING INVITE CHECK: Don't double-invite - guard self.pendingInvites[theirNodeID] == nil else { - self.log.info("Already have pending invite for nodeID \(theirNodeID). Skipping.") - return - } - - // GHOST CHECK: Ensure this is still the latest peer for this nodeID - if let latest = self.latestByNodeID[theirNodeID], - latest.peerID != peerID { - self.log.info("Aborting invite to \(peerID). Found newer MCPeerID for nodeID \(theirNodeID)!") - return - } - - // Track pending invite with peerID for cancellation - self.pendingInvites[theirNodeID] = peerID - - // Include nodeID/instance in invitation context so receiver can map immediately - let contextData = try? JSONEncoder().encode([ - "nodeID": self.myNodeID, - "instance": String(self.myInstance) - ]) - - // WARMUP DELAY: Wait 2.0s before inviting to allow listener to spin up - // This prevents "Connection refused" if we discover them immediately after they restart - self.log.info("Inviting peer \(peerID) (nodeID: \(theirNodeID)) in 1.0s (listener warmup).") - DispatchQueue.main.asyncAfter(deadline: .now() + 1.0) { [weak self] in - guard let self = self else { return } - - // VALIDATION CHECKS after delay - guard browser == self.serviceBrowser else { - self.log.info("Aborting delayed invite to \(peerID). Browser mismatch (restart occurred?).") - return - } - guard let session = self.session else { return } - - // 1. Is peer still pending? (Might have been cancelled by ghost check) - guard self.pendingInvites[theirNodeID] == peerID else { - self.log.info("Aborting delayed invite to \(peerID). No longer pending.") - return - } - - // 2. Are we already connected? - if session.connectedPeers.contains(peerID) { - return - } - - browser.invitePeer(peerID, to: session, withContext: contextData, timeout: 10) // Increased timeout slightly - } - } - } - } - - func browser(_ browser: MCNearbyServiceBrowser, lostPeer peerID: MCPeerID) { - DispatchQueue.main.async { [weak self] in - guard let self = self, browser == self.serviceBrowser else { return } - self.log.info("ServiceBrowser lostPeer: \(peerID)") - - // Clean up per-peer state for lost peer - if let nodeID = self.peerNodeIDMap[peerID] { - // Remove pending invite if it was for this peer - if self.pendingInvites[nodeID] == peerID { - self.pendingInvites.removeValue(forKey: nodeID) - self.log.info("Cleared pending invite for nodeID \(nodeID)") - } - - // Clear latestByNodeID if it points to this peerID - if let latest = self.latestByNodeID[nodeID], latest.peerID == peerID { - self.latestByNodeID.removeValue(forKey: nodeID) - self.log.info("Cleared latestByNodeID for nodeID \(nodeID)") - } - } - - // Remove peerNodeIDMap entry - self.peerNodeIDMap.removeValue(forKey: peerID) - - - } - } -} - -extension MultipeerSession: MCSessionDelegate { - func session(_ session: MCSession, peer peerID: MCPeerID, didChange state: MCSessionState) { - DispatchQueue.main.async { [weak self] in - guard let self = self else { return } - - // DEBUG: Raw session logging to catch Zombies - let sessionStatus = (session == self.session) ? "CURRENT" : "ZOMBIE" - self.log.debug("RAW didChange: \(peerID) -> \(state.rawValue) on \(sessionStatus) session") - - // IDENTITY CHECK: Ignore stale sessions - guard session == self.session else { return } - - // Look up nodeID for this peer - let nodeID = self.peerNodeIDMap[peerID] - - self.log.info("peer \(peerID) (nodeID: \(nodeID ?? "unknown")) didChange state: \(state.rawValue)") - - switch state { - case .connected: - // Cancel per-peer watchdog timer - // Cancel per-peer watchdog timer (if it exists for this specific socket) - // Note: We used to cancel on Handshake, but we can also cancel here effectively IF we trust the socket state. - // Actually, let's keep the logic consistent: Watchdog guards HANDSHAKE. - // But if we are connected, we *might* want to give it a grace period? - // No, sticking to "Cancel on Handshake" is safest. - // However, we MUST ensure old timers for *this* peerID are cleaned up if we reconnect? - - // Clean up previous timers for this socket if any (should be none as it's a new connection) - self.connectingTimeoutTimers[peerID]?.cancel() - self.connectingTimeoutTimers.removeValue(forKey: peerID) - - // NOTE: We do NOT reset consecutiveFailures here. - // We wait for Handshake + Stability Period. - - // Cancel any pending delayed restart (Recovery Succeeded!) - self.delayedRestartItem?.cancel() - self.delayedRestartItem = nil - - // Track previous peer count for KeepAlive lifecycle - let previousPeerCount = self.connectedPeers.count - - // Update connectedPeers IMMEDIATELY for responsive UI and correct mesh-empty checks - self.connectedPeers = session.connectedPeers - - // KeepAlive: Start only when transitioning from 0 β†’ 1 peers - if previousPeerCount == 0 && self.connectedPeers.count >= 1 { - self.startKeepAlive() - } - - // Traffic Gating - Task { - do { - // OPTIMIZATION: Check if we are on WiFi - // If on WiFi (stable), wait only 100ms. - // If on AWDL/Cellular (unstable), wait the full 1.0s to let the mesh settle. - let isWiFi = NetworkMonitor.shared.isWiFi - let delay: UInt64 = isWiFi ? 500_000_000 : 1_500_000_000 // 0.5s vs 1.5s - - if isWiFi { - self.log.info("On WiFi: Fast-tracking handshake (0.5s delay)") - } else { - self.log.info("On AWDL: Slow-tracking handshake (1.5s delay) for stability") - } - - try await Task.sleep(nanoseconds: delay) - - DispatchQueue.main.async { [weak self] in - guard let self = self else { return } - - // VALIDATION CHECKS - guard !self.isRestarting else { return } - guard session == self.session else { return } - guard session.connectedPeers.contains(peerID) else { return } - - // Send handshake when a peer connects (safely after delay) - self.sendHandshake() - } - } catch { - self.log.error("Traffic gating interrupted: \(error.localizedDescription)") - } - } - - case .notConnected: - // Cancel per-peer watchdog timer - self.connectingTimeoutTimers[peerID]?.cancel() - self.connectingTimeoutTimers.removeValue(forKey: peerID) - - // Clean up NodeID tracking - if let nodeID = nodeID { - // Clear pending invite status - self.pendingInvites.removeValue(forKey: nodeID) - } - - // STABILITY CHECK: If we lose connection before Stability Timer fires, we KEEP the failure count. - if let nodeID = nodeID, let timer = self.stabilityTimers[nodeID] { - self.log.warning("Connection to \(nodeID) dropped before stability period! Preserving failure count: \(self.consecutiveFailures[nodeID] ?? 0)") - timer.invalidate() - self.stabilityTimers.removeValue(forKey: nodeID) - } - - // Check if we were previously connected to this peer - let wasConnected = self.connectedPeers.contains(peerID) - - // PRESERVE PARTIAL TRANSCRIPT: If peer had a live transcript, post it as final message - if let peerUser = self.connectedPeerUsers.first(where: { $0.id == peerID }) { - // liveTranscripts is now keyed by nodeID - let transcriptKey = peerUser.nodeID.isEmpty ? peerUser.name : peerUser.nodeID - if let partialText = self.liveTranscripts[transcriptKey], !partialText.isEmpty { - let finalMessage = MeshMessage( - id: UUID(), - senderID: peerUser.name, - senderNodeID: peerUser.nodeID, - senderRole: peerUser.role, - senderColorHex: peerUser.colorHex, - content: partialText, - isTranscription: true, - timestamp: Date() - ) - self.receivedMessages.append(finalMessage) - self.log.info("Posted partial transcript from disconnected peer \(peerUser.name)") - } - self.liveTranscripts.removeValue(forKey: transcriptKey) - } - - // Update connected peers list - if let currentSession = self.session { - self.connectedPeers = currentSession.connectedPeers - } else { - self.connectedPeers.removeAll { $0 == peerID } - } - - // Remove from connectedPeerUsers - self.connectedPeerUsers.removeAll { $0.id == peerID } - - // KeepAlive: Stop when transitioning from 1 β†’ 0 peers - if self.connectedPeers.isEmpty { - self.stopKeepAlive() - } - - if wasConnected { - self.log.info("Connection dropped: \(peerID). Mesh size: \(self.connectedPeers.count)") - } - - // UNIFIED RECOVERY: Treats both handshake failures AND drops as candidates for Smart Retry. - // We NO LONGER restart services just because the mesh is empty, to prevent "Connection refused" errors for others. - - guard let nodeID = nodeID else { - self.log.info("Link lost for unknown nodeID peer. Ignoring.") - return - } - - // GHOST CHECK: If this is an older instance, ignore the failure - if let latest = self.latestByNodeID[nodeID], latest.peerID != peerID { - self.log.info("Ignoring failure of GHOST peer \(peerID). Current is \(latest.peerID).") - return - } - - // Track consecutive failures for poisoned state detection - let failures = (self.consecutiveFailures[nodeID] ?? 0) + 1 - self.consecutiveFailures[nodeID] = failures - self.log.info("Consecutive failures for nodeID \(nodeID): \(failures)") - - // Add exponential backoff cooldown - let backoff = self.calculateBackoff(failures: failures) - self.cooldownUntil[nodeID] = Date().addingTimeInterval(backoff) - self.log.info("Peer \(nodeID) in cooldown for \(backoff)s") - - // Don't restart if we have active connections - if !self.connectedPeers.isEmpty { - self.log.info("Handshake failed to \(nodeID), but we have active connections. Skipping restart.") - return - } - - // POISONED STATE: If too many consecutive failures, do full reset - if failures >= Self.poisonedThreshold { - self.log.error("POISONED STATE: \(failures) consecutive failures for \(nodeID). Full reset.") - self.restartServices(forcePoisonedRecovery: true) - return - } - - // SMART RETRY: Instead of restarting everything (Nuclear Option), just try to reconnect to this peer. - // Only LEADER initiates the retry to prevent race conditions. - if self.myNodeID > nodeID { - self.log.info("Handshake failed. I am LEADER. Scheduling Smart Retry in \(backoff)s.") - - DispatchQueue.main.asyncAfter(deadline: .now() + backoff) { [weak self] in - guard let self = self else { return } - - // Re-check conditions before firing - guard !self.connectedPeers.contains(peerID) else { return } - guard self.pendingInvites[nodeID] == nil else { return } - - // Ensure we still have a browser and session - guard let browser = self.serviceBrowser, let session = self.session else { return } - - self.log.info("Smart Retry: Re-inviting \(peerID) (nodeID: \(nodeID)).") - self.pendingInvites[nodeID] = peerID - - let contextData = try? JSONEncoder().encode([ - "nodeID": self.myNodeID, - "instance": String(self.myInstance) - ]) - - browser.invitePeer(peerID, to: session, withContext: contextData, timeout: 7) - } - } else { - self.log.info("Handshake failed. I am FOLLOWER. Waiting for Leader to retry.") - } - - - case .connecting: - // Per-peer watchdog: cancel only this peer on timeout, don't restart everything - // Try reverse lookup if nodeID not in peerNodeIDMap yet (e.g. from Pending Invites) - let resolvedNodeID = nodeID ?? self.pendingInvites.first(where: { $0.value == peerID })?.key - - guard let nodeID = resolvedNodeID else { - // This happens if THEY invited US and we haven't processed the context fully yet. - // Ideally, `advertiser` delegate should have populated peerNodeIDMap. - self.log.info("Connecting to peer with unknown nodeID: \(peerID) - Watchdog might be skipped if not resolved.") - return - } - - // Cancel any existing timer for this specific socket - self.connectingTimeoutTimers[peerID]?.cancel() - - // Capture expected instance to detect if peer updated while connecting - let expectedInstance = self.latestByNodeID[nodeID]?.instance ?? 0 - - // Longer timeout when not on WiFi (cellular AWDL is slower/unreliable) - // OPTIMIZATION: Relax WiFi timeout to 6.0s (was 3.5s) to avoid premature timeouts - let timeout: Double = NetworkMonitor.shared.isWiFi ? 6.0 : 15.0 - let item = DispatchWorkItem { [weak self] in - guard let self = self else { return } - - // ABORT GUARD: If we have since discovered a newer instance, don't punish this nodeID - if let current = self.latestByNodeID[nodeID], current.instance > expectedInstance { - self.log.info("Watchdog: Aborting timeout for peer \(peerID) (node \(nodeID)) - found newer instance \(current.instance) > \(expectedInstance)") - return - } - - self.log.error("Watchdog: Handshake TIMED OUT (\(timeout)s) for peer \(peerID).") - - // Kill THIS specific socket - self.session?.cancelConnectPeer(peerID) - - // Cancel pending invite logic ONLY if it matches this peer - if let peerToCancel = self.pendingInvites[nodeID], peerToCancel == peerID { - self.log.info("Watchdog: Clearing pending invite for \(nodeID)") - self.pendingInvites.removeValue(forKey: nodeID) - } - - // Track failure - let failures = (self.consecutiveFailures[nodeID] ?? 0) + 1 - self.consecutiveFailures[nodeID] = failures - - // Add cooldown with exponential backoff - let backoff = self.calculateBackoff(failures: failures) - self.cooldownUntil[nodeID] = Date().addingTimeInterval(backoff) - self.log.info("Peer \(nodeID) quarantined for \(backoff)s (failure #\(failures))") - - // POISONED STATE: Full reset only if mesh is empty - if failures >= Self.poisonedThreshold { - if self.connectedPeers.isEmpty { - self.log.error("POISONED STATE: \(failures) consecutive failures. Full reset.") - self.restartServices(forcePoisonedRecovery: true) - } else { - self.log.error("Poisoned threshold hit for \(nodeID) but mesh is active; quarantining only.") - } - return - } - - // Only restart if mesh is empty (Legacy logic kept for safety but largely disabled by checks) - if self.connectedPeers.isEmpty { - self.scheduleRestartIfStillEmpty() - } - } - self.connectingTimeoutTimers[peerID] = item - DispatchQueue.main.asyncAfter(deadline: .now() + timeout, execute: item) - - @unknown default: - break - } - } - } - - func session(_ session: MCSession, didReceive data: Data, fromPeer peerID: MCPeerID) { - // Safe check on main thread? No, didReceive is high volume. - // We can just capture self.session in the async block or check it against the passed session. - // But for data, it's less critical to check identity for *stablity* (only valid sessions receive data). - // However, consistency is good. - DispatchQueue.main.async { [weak self] in - guard let self = self else { return } - guard session == self.session else { return } - - do { - let message = try JSONDecoder().decode(MeshMessage.self, from: data) - - // GHOST FILTER: Ignore messages from older instances if we know a newer one - if !message.senderNodeID.isEmpty { - if let latest = self.latestByNodeID[message.senderNodeID] { - if message.senderInstance > latest.instance { - // SELF-HEALING: We found a newer instance via Handshake that Browser didn't report yet. - // TRUST IT immediately to fix race conditions. - self.log.info("Ghost Filter: Updated latest instance for \(message.senderNodeID) to \(message.senderInstance) based on Handshake.") - self.latestByNodeID[message.senderNodeID] = (peerID, message.senderInstance) - } else if latest.peerID != peerID { - // It's genuinely an older instance (Ghost) - self.log.info("Ignoring message from GHOST peer \(peerID) (NodeID: \(message.senderNodeID)). Current is \(latest.peerID)") - return - } - } - } - - // Keep-Alive / Gossip Handling - if message.isKeepAlive || message.content == "πŸ’“" { - // Gossip: Check for missing peers - if let theirConnectedNodes = message.connectedNodeIDs { - // 1. Identify nodes they have that WE don't (clique gaps) - // Filter out empty IDs and our own ID - let missingNodeIDs = theirConnectedNodes.filter { nid in - !nid.isEmpty && - nid != self.myNodeID && - !self.connectedPeerUsers.contains(where: { $0.nodeID == nid }) - } - - if !missingNodeIDs.isEmpty { - self.log.info("Gossip: Peer \(message.senderID) has connections I lack: \(missingNodeIDs)") - - for missingID in missingNodeIDs { - // 2. Check if we have discovered this missing node - if let discovery = self.latestByNodeID[missingID] { - - // 3. SAFETY CHECK: Are we already connected at the socket level? - // If yes, DO NOT Invite. Just wait for the handshake. - // (Gossip sees them as missing because they aren't in connectedPeerUSERS yet) - if self.session?.connectedPeers.contains(discovery.peerID) == true { - self.log.info("Gossip Repair: Ignoring missing \(missingID). Socket is connected to \(discovery.peerID), processing handshake...") - continue - } - - // We know them! REPAIR strategy: - // If we are "Leader" relative to them, active invite. - // If we are "Follower", we just log (or potentially bump state). - // Note: Using string comparison for nodeID is consistent with Smart Retry. - if self.myNodeID > missingID { - self.log.info("Gossip Repair: I am LEADER for missing \(missingID) (discovered as \(discovery.peerID)). Triggering Invite.") - // Re-use logic from foundPeer? Or just direct invite? - // Direct invite safety check: - if self.pendingInvites[missingID] == nil { - self.log.info("Gossip Repair: Sending Invite to \(discovery.peerID)") - // 2.0s delay to be safe, or immediate? - // Immediate is fine here as we've likely known them for a while if they are in 'latestByNodeID'. - let contextData = self.createInviteContext() - self.serviceBrowser?.invitePeer(discovery.peerID, to: self.session!, withContext: contextData, timeout: 10) - self.pendingInvites[missingID] = discovery.peerID - } - } else { - // REVERSE INVITE: Break the stalemate if Leader thinks we're connected but we aren't. - if self.pendingInvites[missingID] == nil { - self.log.info("Gossip Repair: I am FOLLOWER for missing \(missingID). Leader hasn't invited me (Half-Open?). TRIGGERING REVERSE INVITE.") - self.log.info("Gossip Repair: Sending Reverse Invite to \(discovery.peerID)") - let contextData = self.createInviteContext() - self.serviceBrowser?.invitePeer(discovery.peerID, to: self.session!, withContext: contextData, timeout: 10) - self.pendingInvites[missingID] = discovery.peerID - } - } - } else { - // We haven't even discovered them yet. Nothing we can do but wait for Bonjour. - // self.log.debug("Gossip: Missing \(missingID) is unknown to me (not in latestByNodeID).") - } - } - } - } - return - } - - if message.isHandshake { - // Use nodeID for reliable identity mapping (required, not optional) - let nodeID = message.senderNodeID - - // CRITICAL FIX: Cancel watchdog timer now that we have a handshake - // Cancel watchdog: Valid handshake received for THIS peer - self.connectingTimeoutTimers[peerID]?.cancel() - self.connectingTimeoutTimers.removeValue(forKey: peerID) - self.log.info("Watchdog: Cancelled timer for peer \(peerID) (Handshake received)") - - // Update peerNodeIDMap with handshake info - if !nodeID.isEmpty { - self.peerNodeIDMap[peerID] = nodeID - } - - let newUser = PeerUser( - peerID: peerID, - nodeID: nodeID, - name: message.senderID, - colorHex: message.senderColorHex, - role: message.senderRole - ) - - // DEATH SPIRAL FIX: Start Stability Timer (15s) - // We do NOT reset failures immediately. We wait to see if connection holds. - if !nodeID.isEmpty { - self.stabilityTimers[nodeID]?.invalidate() // Reset if exists - self.log.info("Starting 15s Stability Timer for \(nodeID)...") - - let timer = Timer.scheduledTimer(withTimeInterval: 15.0, repeats: false) { [weak self] _ in - guard let self = self else { return } - self.log.info("Stability Period Passed! Resetting failures for \(nodeID).") - self.consecutiveFailures[nodeID] = 0 - self.cooldownUntil.removeValue(forKey: nodeID) - self.stabilityTimers.removeValue(forKey: nodeID) - } - self.stabilityTimers[nodeID] = timer - } - - // Match by nodeID (reliable) instead of displayName - if !nodeID.isEmpty, let index = self.connectedPeerUsers.firstIndex(where: { $0.nodeID == nodeID }) { - self.log.info("UI Update: Updating existing user for nodeID \(nodeID) (peerID mismatch allowed)") - self.connectedPeerUsers[index] = newUser - } else if let index = self.connectedPeerUsers.firstIndex(where: { $0.id == peerID }) { - // Fallback: match by MCPeerID - self.log.info("UI Update: Updating existing user for peerID \(peerID)") - self.connectedPeerUsers[index] = newUser - } else { - self.log.info("UI Update: Appending new user \(peerID) (nodeID: \(nodeID))") - self.connectedPeerUsers.append(newUser) - } - - self.log.info("Handshake received from \(message.senderID) (nodeID: \(nodeID))") - } else if message.isPartial { - // Key by nodeID (we enforce senderNodeID on send, so this should always be set) - guard !message.senderNodeID.isEmpty else { - self.log.warning("Received partial with empty senderNodeID - ignoring") - return - } - if message.content.isEmpty { - self.liveTranscripts.removeValue(forKey: message.senderNodeID) - } else { - self.liveTranscripts[message.senderNodeID] = message.content - } - } else { - // Final message received - self.receivedMessages.append(message) - // Clear the live transcript for this sender - if !message.senderNodeID.isEmpty { - self.liveTranscripts.removeValue(forKey: message.senderNodeID) - } - } - } catch { - self.log.error("Error decoding message: \(error.localizedDescription)") - } - } - } - - func session(_ session: MCSession, didReceive stream: InputStream, withName streamName: String, fromPeer peerID: MCPeerID) { - log.error("Receiving streams is not supported") - } - - func session(_ session: MCSession, didStartReceivingResourceWithName resourceName: String, fromPeer peerID: MCPeerID, with progress: Progress) { - log.error("Receiving resources is not supported") - } - - func session(_ session: MCSession, didFinishReceivingResourceWithName resourceName: String, fromPeer peerID: MCPeerID, at localURL: URL?, withError error: Error?) { - log.error("Receiving resources is not supported") - } -} diff --git a/AtTable/NetworkFraming.swift b/AtTable/NetworkFraming.swift new file mode 100644 index 0000000..89501bd --- /dev/null +++ b/AtTable/NetworkFraming.swift @@ -0,0 +1,88 @@ +import Foundation +import Network + +enum NetworkFraming { + static let headerSize = 4 // UInt32 for length + static let maxMessageSize: UInt32 = 10_485_760 // 10 MB + + /// Frame data with 4-byte length prefix + static func frameData(_ data: Data) -> Data { + var length = UInt32(data.count).bigEndian + var framedData = Data(bytes: &length, count: headerSize) + framedData.append(data) + return framedData + } + + /// Send framed data over connection + static func send(_ data: Data, over connection: NWConnection, completion: @escaping (Error?) -> Void) { + let framedData = frameData(data) + connection.send(content: framedData, completion: .contentProcessed { error in + completion(error) + }) + } + + /// Receive framed data from connection + static func receive(from connection: NWConnection, completion: @escaping (Result) -> Void) { + // Read 4-byte header + connection.receive(minimumIncompleteLength: headerSize, maximumLength: headerSize) { headerData, _, isComplete, error in + if let error = error { + completion(.failure(error)) + return + } + + guard let headerData = headerData, headerData.count == headerSize else { + completion(.failure(FramingError.incompleteHeader)) + return + } + + let length = headerData.withUnsafeBytes { $0.load(as: UInt32.self).bigEndian } + + guard length > 0, length <= maxMessageSize else { + completion(.failure(FramingError.invalidMessageSize(length))) + return + } + + // Read payload + connection.receive(minimumIncompleteLength: Int(length), maximumLength: Int(length)) { payloadData, _, _, error in + if let error = error { + completion(.failure(error)) + return + } + + guard let payloadData = payloadData else { + completion(.failure(FramingError.incompletePayload)) + return + } + + completion(.success(payloadData)) + } + } + } + + /// Continuous receive loop + static func startReceiving(from connection: NWConnection, onData: @escaping (Data) -> Void, onError: @escaping (Error) -> Void) { + receive(from: connection) { result in + switch result { + case .success(let data): + onData(data) + startReceiving(from: connection, onData: onData, onError: onError) + case .failure(let error): + onError(error) + } + } + } +} + +enum FramingError: LocalizedError { + case incompleteHeader + case incompletePayload + case invalidMessageSize(UInt32) + + var errorDescription: String? { + switch self { + case .incompleteHeader: return "Incomplete header" + case .incompletePayload: return "Incomplete payload" + case .invalidMessageSize(let size): return "Invalid size: \(size)" + } + } +} diff --git a/AtTable/PeerUser.swift b/AtTable/PeerUser.swift index 71b9799..887b00f 100644 --- a/AtTable/PeerUser.swift +++ b/AtTable/PeerUser.swift @@ -1,18 +1,19 @@ import Foundation -import MultipeerConnectivity struct PeerUser: Identifiable, Hashable { - let id: MCPeerID - let nodeID: String // Stable identifier from handshake (for reliable identity) + let id: String // Use nodeID as the primary stable ID + let nodeID: String let name: String let colorHex: String - let role: UserRole // Role from handshake (for data integrity) + let role: UserRole + var isStable: Bool = false // New: visual indicator for traffic gate status - init(peerID: MCPeerID, nodeID: String = "", name: String? = nil, colorHex: String = "#808080", role: UserRole = .hearing) { - self.id = peerID + init(nodeID: String, name: String, colorHex: String = "#808080", role: UserRole = .hearing, isStable: Bool = false) { + self.id = nodeID self.nodeID = nodeID - self.name = name ?? peerID.displayName + self.name = name self.colorHex = colorHex self.role = role + self.isStable = isStable } -} +} \ No newline at end of file diff --git a/MPC_how_it_works.md b/MPC_how_it_works.md deleted file mode 100644 index a3c6cbc..0000000 --- a/MPC_how_it_works.md +++ /dev/null @@ -1,255 +0,0 @@ -# Multipeer Connectivity (MPC) Architecture - -This document explains how AtTable uses Apple's Multipeer Connectivity framework to create a peer-to-peer mesh network for real-time communication between deaf and hearing users. - ---- - -## Overview - -AtTable uses **Multipeer Connectivity (MPC)** to establish direct device-to-device connections without requiring a central server. The app supports connections over: - -- **Wi-Fi** (same network) -- **Peer-to-peer Wi-Fi** (AWDL - Apple Wireless Direct Link) -- **Bluetooth** - -When devices aren't on the same Wi-Fi network (e.g., on 5G/cellular), MPC automatically falls back to **AWDL** for peer-to-peer discovery and data transfer. - ---- - -## User Onboarding Flow - -### 1. Initial Setup (`OnboardingView.swift`) - -When a user launches the app: - -1. They enter their **name** -2. Select their **role** (Deaf or Hearing) -3. Choose an **aura color** (for visual identity in the mesh) -4. Tap **"Start Conversation"** to enter the mesh - -``` -User launches app β†’ OnboardingView β†’ Enter details β†’ ChatView (mesh starts) -``` - -### 2. Identity Generation (`NodeIdentity.swift`) - -Upon first launch, the app generates a **stable Node Identity**: - -- **`nodeID`**: A UUID persisted in UserDefaults (stable per app installation) -- **`instance`**: A monotonic counter that increments each time a session starts - -This identity system allows the mesh to: -- Reliably identify users across reconnections -- Detect and filter "ghost" peers (stale connections from previous sessions) -- Handle device reboots gracefully - ---- - -## Network Connection Process - -### Discovery & Connection (`MultipeerSession.swift`) - -When `ChatView` appears, it calls `multipeerSession.start()`, which: - -1. **Sets up the MCSession** with encryption disabled (for faster AWDL connections) -2. **Starts browsing** for nearby peers using `MCNearbyServiceBrowser` -3. **Starts advertising** (after 0.5s delay) using `MCNearbyServiceAdvertiser` - -### Wi-Fi vs Cellular/5G Connections - -| Network Type | Connection Method | Handshake Delay | Connection Time | -|--------------|-------------------|-----------------|-----------------| -| **Wi-Fi (same network)** | Infrastructure Wi-Fi | 0.5 seconds | Near-instant | -| **Cellular/5G** | AWDL (peer-to-peer Wi-Fi) | 1.5 seconds | Up to 60 seconds | - -The app uses `NetworkMonitor.swift` to detect the current network type and adjusts timing: - -```swift -let isWiFi = NetworkMonitor.shared.isWiFi -let delay = isWiFi ? 0.5s : 1.5s // Slower for AWDL stability -``` - -### Deterministic Leader/Follower Protocol - -To prevent connection races (both devices trying to invite each other), the app uses a **deterministic leader election**: - -```swift -if myNodeID > theirNodeID { - // I am LEADER - I will send the invite -} else { - // I am FOLLOWER - I wait for their invite -} -``` - -This ensures exactly one device initiates each connection. - ---- - -## Handshake Protocol - -Once connected at the socket level, devices exchange **handshake messages** containing: - -```swift -struct MeshMessage { - var senderNodeID: String // Stable identity - var senderInstance: Int // Session counter (for ghost detection) - var senderRole: UserRole // Deaf or Hearing - var senderColorHex: String // Aura color - var isHandshake: Bool // Identifies this as handshake -} -``` - -The handshake: - -1. Registers the peer in `connectedPeerUsers` for UI display -2. Starts a **15-second stability timer** before clearing failure counters -3. Maps the `MCPeerID` to the stable `nodeID` for reliable identification - ---- - -## User Leaving the Conversation - -### Explicit Leave (`ChatView.swift`) - -When a user taps **"Leave"**: - -```swift -Button(action: { - speechRecognizer.stopRecording() // Stop audio transcription - multipeerSession.stop() // Disconnect from mesh - isOnboardingComplete = false // Return to onboarding -}) -``` - -### Disconnect Cleanup (`MultipeerSession.disconnect()`) - -The `disconnect()` function performs complete cleanup: - -1. **Cancel pending work**: Recovery tasks, connection timers -2. **Stop services**: Advertising and browsing -3. **Clear delegates**: Prevent zombie callbacks -4. **Disconnect session**: `session?.disconnect()` -5. **Clear all state**: - - `connectedPeers` / `connectedPeerUsers` - - `pendingInvites` / `latestByNodeID` - - `cooldownUntil` / `consecutiveFailures` -6. **Stop keep-alive heartbeats** - -### Partial Transcript Preservation - -If a peer disconnects mid-speech, their **partial transcript is preserved** as a final message: - -```swift -if let partialText = liveTranscripts[peerKey], !partialText.isEmpty { - let finalMessage = MeshMessage(content: partialText, ...) - receivedMessages.append(finalMessage) -} -``` - ---- - -## Rejoining the Conversation - -### Identity Recovery - -When a user returns to the conversation: - -1. App resets `isOnboardingComplete = false` on every launch (intentional - forces Login screen) -2. User completes onboarding again (name/role/color preserved in `@AppStorage`) -3. `multipeerSession.start()` called again - -### Instance Increment - -The key to reliable rejoining is the **instance counter**: - -```swift -myInstance = NodeIdentity.nextInstance() // Monotonically increasing -``` - -When other devices see the new instance: - -1. **Ghost Detection**: Old connections with lower instances are rejected -2. **Cooldown Clear**: Any cooldowns from previous failures are removed -3. **Fresh Connect**: The leader initiates a new invitation - -### Handling Stale Peers - -The mesh uses multiple mechanisms to handle rejoins: - -| Mechanism | Purpose | -|-----------|---------| -| **Ghost Filtering** | Reject messages/invites from older instances | -| **Cooldown Clear** | Give returning peers a fresh chance | -| **Half-Open Deadlock Fix** | If we think we're connected but they invite us, accept the new invite | -| **Stability Timer** | Only reset failure counts after 15s of stable connection | - ---- - -## Keep-Alive & Mesh Health - -### Heartbeat System - -When connected, the mesh sends **heartbeats every 10 seconds**: - -```swift -let message = MeshMessage( - content: "πŸ’“", - isKeepAlive: true, - connectedNodeIDs: connectedPeerUsers.map { $0.nodeID } // Gossip -) -``` - -### Gossip Protocol - -Heartbeats include a list of connected peers, enabling **clique repair**: - -1. Device A receives heartbeat from Device B -2. If B knows Device C but A doesn't, A can proactively invite C -3. This heals mesh partitions without requiring everyone to be discoverable - ---- - -## Connection Recovery - -### Exponential Backoff - -Failed connections trigger increasing cooldown periods: - -```swift -// 0.5s β†’ 1.0s β†’ 2.0s β†’ 4.0s β†’ ... β†’ max 30s -let delay = min(0.5 * pow(2, failures - 1), 30.0) -``` - -### Smart Retry - -Instead of restarting everything, failed connections are retried individually: - -1. Only the **leader** initiates retries (prevents race conditions) -2. Retries respect cooldown periods -3. After 5 consecutive failures β†’ **"Poisoned State"** triggers full reset - -### Poisoned State Recovery - -If a peer has too many consecutive failures: - -```swift -if failures >= 5 { - restartServices(forcePoisonedRecovery: true) - // Creates new MCPeerID, clears all cooldowns -} -``` - ---- - -## Summary - -| Event | What Happens | -|-------|--------------| -| **User joins** | NodeID retrieved, instance incremented, advertise + browse started | -| **On Wi-Fi** | Fast handshake (0.5s), near-instant connections | -| **On 5G/Cellular** | AWDL used, slower handshake (1.5s), up to 60s to connect | -| **User leaves** | Full cleanup, partial transcripts preserved | -| **User rejoins** | New instance number, ghosts filtered, cooldowns cleared | -| **Connection fails** | Exponential backoff, smart retry by leader only | - -The architecture prioritizes **reliability over speed**, using defensive mechanisms like ghost filtering, stability timers, and gossip-based clique repair to maintain mesh health despite the inherent unreliability of peer-to-peer wireless connections. diff --git a/transition.md b/transition.md new file mode 100644 index 0000000..75db8d1 --- /dev/null +++ b/transition.md @@ -0,0 +1,1257 @@ +# MultipeerConnectivity to Network.framework Transition Guide + +This document explains how to transition an iOS app from MultipeerConnectivity (MPC) to Network.framework for peer-to-peer networking. + +--- + +## Why Transition? + +- **Network.framework** provides lower-level control over connections +- Better performance with direct TCP/UDP streams +- More flexible discovery options (can combine with BLE for faster discovery) +- AWDL (Apple Wireless Direct Link) support via `includePeerToPeer = true` +- Future-proof: Network.framework is Apple's modern networking API + +--- + +## Architecture Overview + +### MultipeerConnectivity Architecture +``` +MCNearbyServiceAdvertiser β†’ Advertises service +MCNearbyServiceBrowser β†’ Discovers services +MCSession β†’ Manages connections & data transfer +MCPeerID β†’ Identifies peers +``` + +### Network.framework Architecture +``` +NWListener β†’ Advertises service (replaces MCNearbyServiceAdvertiser) +NWBrowser β†’ Discovers services (replaces MCNearbyServiceBrowser) +NWConnection β†’ Individual connection per peer (replaces MCSession) +UUID / Custom struct β†’ Identifies peers (replaces MCPeerID) +``` + +--- + +## New Files to Create + +### 1. Models + +**`DiscoveredHost.swift`** - Represents a discovered host +```swift +import Foundation + +struct DiscoveredHost: Identifiable, Hashable { + let id: UUID + let deviceName: String + let eventName: String + let discoveredAt: Date + let endpointID: String // Unique identifier from NWBrowser + + init(deviceName: String, eventName: String, endpointID: String) { + self.id = UUID() + self.deviceName = deviceName + self.eventName = eventName + self.discoveredAt = Date() + self.endpointID = endpointID + } + + func hash(into hasher: inout Hasher) { + hasher.combine(endpointID) + } + + static func == (lhs: DiscoveredHost, rhs: DiscoveredHost) -> Bool { + lhs.endpointID == rhs.endpointID + } +} +``` + +**`ConnectedPeer.swift`** - Represents a connected peer +```swift +import Foundation + +struct ConnectedPeer: Identifiable, Hashable { + let id: UUID + let displayName: String + let connectedAt: Date + var isStable: Bool // For traffic gate pattern + + init(displayName: String, isStable: Bool = false) { + self.id = UUID() + self.displayName = displayName + self.connectedAt = Date() + self.isStable = isStable + } +} + +enum P2PConnectionState { + case idle + case hosting + case browsing + case connecting + case connected + case disconnected + case failed(Error) +} +``` + +### 2. NetworkFraming.swift - TCP Message Framing + +Network.framework uses raw TCP streams, so you need length-prefixed framing: + +```swift +import Foundation +import Network + +enum NetworkFraming { + static let headerSize = 4 // UInt32 for length + static let maxMessageSize: UInt32 = 10_485_760 // 10 MB + + /// Frame data with 4-byte length prefix + static func frameData(_ data: Data) -> Data { + var length = UInt32(data.count).bigEndian + var framedData = Data(bytes: &length, count: headerSize) + framedData.append(data) + return framedData + } + + /// Send framed data over connection + static func send(_ data: Data, over connection: NWConnection, completion: @escaping (Error?) -> Void) { + let framedData = frameData(data) + connection.send(content: framedData, completion: .contentProcessed { error in + completion(error) + }) + } + + /// Receive framed data from connection + static func receive(from connection: NWConnection, completion: @escaping (Result) -> Void) { + // Read 4-byte header + connection.receive(minimumIncompleteLength: headerSize, maximumLength: headerSize) { headerData, _, isComplete, error in + if let error = error { + completion(.failure(error)) + return + } + + guard let headerData = headerData, headerData.count == headerSize else { + completion(.failure(FramingError.incompleteHeader)) + return + } + + let length = headerData.withUnsafeBytes { $0.load(as: UInt32.self).bigEndian } + + guard length > 0, length <= maxMessageSize else { + completion(.failure(FramingError.invalidMessageSize(length))) + return + } + + // Read payload + connection.receive(minimumIncompleteLength: Int(length), maximumLength: Int(length)) { payloadData, _, _, error in + if let error = error { + completion(.failure(error)) + return + } + + guard let payloadData = payloadData else { + completion(.failure(FramingError.incompletePayload)) + return + } + + completion(.success(payloadData)) + } + } + } + + /// Continuous receive loop + static func startReceiving(from connection: NWConnection, onData: @escaping (Data) -> Void, onError: @escaping (Error) -> Void) { + receive(from: connection) { result in + switch result { + case .success(let data): + onData(data) + startReceiving(from: connection, onData: onData, onError: onError) + case .failure(let error): + onError(error) + } + } + } +} + +enum FramingError: LocalizedError { + case incompleteHeader + case incompletePayload + case invalidMessageSize(UInt32) + + var errorDescription: String? { + switch self { + case .incompleteHeader: return "Incomplete header" + case .incompletePayload: return "Incomplete payload" + case .invalidMessageSize(let size): return "Invalid size: \(size)" + } + } +} +``` + +### 3. BLEDiscoveryManager.swift (Optional - For Fast Discovery) + +BLE advertisement wakes AWDL faster than Bonjour alone: + +```swift +import Foundation +import CoreBluetooth +import Combine + +@MainActor +class BLEDiscoveryManager: NSObject, ObservableObject { + @Published var isAdvertising = false + @Published var isScanning = false + + var onHostDiscovered: ((String) -> Void)? + + // Use a valid UUID (hex characters only: 0-9, A-F) + private let serviceUUID = CBUUID(string: "YOUR-UUID-HERE") + private var peripheralManager: CBPeripheralManager? + private var centralManager: CBCentralManager? + private var eventName = "" + private var discoveredPeripherals: Set = [] + + // MARK: - Host Mode (Peripheral) + + func startAdvertising(eventName: String) { + self.eventName = eventName + peripheralManager = CBPeripheralManager(delegate: self, queue: nil) + } + + func stopAdvertising() { + peripheralManager?.stopAdvertising() + peripheralManager = nil + isAdvertising = false + } + + // MARK: - Guest Mode (Central) + + func startScanning() { + discoveredPeripherals.removeAll() + centralManager = CBCentralManager(delegate: self, queue: nil) + } + + func stopScanning() { + centralManager?.stopScan() + centralManager = nil + isScanning = false + } +} + +extension BLEDiscoveryManager: CBPeripheralManagerDelegate { + nonisolated func peripheralManagerDidUpdateState(_ peripheral: CBPeripheralManager) { + Task { @MainActor in + if peripheral.state == .poweredOn { + let truncatedName = String(eventName.prefix(8)) // BLE has 28-byte limit + peripheral.startAdvertising([ + CBAdvertisementDataServiceUUIDsKey: [serviceUUID], + CBAdvertisementDataLocalNameKey: truncatedName + ]) + isAdvertising = true + } + } + } +} + +extension BLEDiscoveryManager: CBCentralManagerDelegate { + nonisolated func centralManagerDidUpdateState(_ central: CBCentralManager) { + Task { @MainActor in + if central.state == .poweredOn { + central.scanForPeripherals(withServices: [serviceUUID]) + isScanning = true + } + } + } + + nonisolated func centralManager(_ central: CBCentralManager, didDiscover peripheral: CBPeripheral, advertisementData: [String: Any], rssi: NSNumber) { + Task { @MainActor in + guard !discoveredPeripherals.contains(peripheral.identifier) else { return } + discoveredPeripherals.insert(peripheral.identifier) + + let hostName = advertisementData[CBAdvertisementDataLocalNameKey] as? String ?? "Unknown" + onHostDiscovered?(hostName) + } + } +} +``` + +--- + +## Key Implementation Patterns + +### 1. Network Parameters Configuration + +```swift +private func createNetworkParameters() -> NWParameters { + let parameters = NWParameters.tcp + parameters.includePeerToPeer = true // Enables AWDL + parameters.serviceClass = .responsiveData // Low-latency + + let options = NWProtocolTCP.Options() + options.enableKeepalive = true + options.keepaliveInterval = 30 + parameters.defaultProtocolStack.transportProtocol = options + + return parameters +} +``` + +### 2. Host Mode - NWListener (replaces MCNearbyServiceAdvertiser) + +```swift +func startHosting(eventName: String) { + let parameters = createNetworkParameters() + listener = try NWListener(using: parameters) + + // Include device name in TXT record + let deviceName = UIDevice.current.name + let txtEntry = "deviceName=\(deviceName)" + var txtData = Data() + txtData.append(UInt8(txtEntry.utf8.count)) + txtData.append(contentsOf: txtEntry.utf8) + + listener?.service = NWListener.Service( + name: eventName, + type: "_yourservice._tcp", + txtRecord: txtData + ) + + listener?.stateUpdateHandler = { [weak self] state in + guard let self else { return } + Task { @MainActor in + self.handleListenerState(state) + } + } + + listener?.newConnectionHandler = { [weak self] connection in + guard let self else { return } + Task { @MainActor in + self.handleNewConnection(connection) + } + } + + listener?.start(queue: .main) +} +``` + +### 3. Guest Mode - NWBrowser (replaces MCNearbyServiceBrowser) + +```swift +func startBrowsing() { + let parameters = createNetworkParameters() + browser = NWBrowser(for: .bonjour(type: "_yourservice._tcp", domain: nil), using: parameters) + + browser?.browseResultsChangedHandler = { [weak self] results, changes in + guard let self else { return } + Task { @MainActor in + for change in changes { + switch change { + case .added(let result): + self.handleHostFound(result) + case .removed(let result): + self.handleHostLost(result) + default: + break + } + } + } + } + + browser?.start(queue: .main) +} + +private func handleHostFound(_ result: NWBrowser.Result) { + guard case .service(let name, let type, let domain, _) = result.endpoint else { return } + + let endpointID = "\(name).\(type).\(domain)" + + // Extract device name from TXT record + var deviceName = "Unknown Device" + if case .bonjour(let txtRecord) = result.metadata { + if let name = txtRecord["deviceName"] { + deviceName = name + } + } + + let host = DiscoveredHost(deviceName: deviceName, eventName: name, endpointID: endpointID) + availableHosts[endpointID] = host +} +``` + +### 4. Connecting to Host (replaces invitePeer) + +```swift +func joinHost(_ host: DiscoveredHost) { + let endpoint = NWEndpoint.service( + name: host.eventName, + type: "_yourservice._tcp", + domain: "local", + interface: nil + ) + + let parameters = createNetworkParameters() + hostConnection = NWConnection(to: endpoint, using: parameters) + + hostConnection?.stateUpdateHandler = { [weak self] state in + guard let self else { return } + Task { @MainActor in + self.handleConnectionState(state) + } + } + + hostConnection?.start(queue: .main) +} +``` + +### 5. Handling New Connections (Host Side) + +```swift +private func handleNewConnection(_ connection: NWConnection) { + let peerID = UUID() + + guestConnections[peerID] = connection + connectedPeers[peerID] = ConnectedPeer(displayName: "Guest", isStable: false) + + connection.stateUpdateHandler = { [weak self] state in + guard let self else { return } + Task { @MainActor in + // Handle state changes + } + } + + connection.start(queue: .main) + + // Start receiving data + NetworkFraming.startReceiving(from: connection) { [weak self] data in + // Handle received data + } onError: { [weak self] error in + // Handle error, remove connection + } + + // Traffic Gate: Wait before marking as stable + Task { @MainActor in + try? await Task.sleep(nanoseconds: 4_000_000_000) // 4 seconds + guard connectedPeers[peerID] != nil else { return } + stablePeers.insert(peerID) + connectedPeers[peerID]?.isStable = true + onPeerJoined?(peerID) + } +} +``` + +### 6. Broadcasting Data (replaces MCSession.send) + +```swift +func broadcastData(_ data: Data) { + // Traffic Gate: Only send to stable peers + let stableConnections = guestConnections.filter { stablePeers.contains($0.key) } + + for (_, connection) in stableConnections { + NetworkFraming.send(data, over: connection) { error in + if let error = error { + print("Send failed: \(error)") + } + } + } +} +``` + +--- + +## Callback Mapping + +| MultipeerConnectivity | Network.framework | +|----------------------|-------------------| +| `session(_:peer:didChange:)` | `connection.stateUpdateHandler` | +| `session(_:didReceive:fromPeer:)` | `NetworkFraming.startReceiving()` | +| `advertiser(_:didReceiveInvitationFromPeer:)` | `listener.newConnectionHandler` | +| `browser(_:foundPeer:withDiscoveryInfo:)` | `browser.browseResultsChangedHandler` (.added) | +| `browser(_:lostPeer:)` | `browser.browseResultsChangedHandler` (.removed) | + +--- + +## Info.plist Updates + +Add these keys for BLE discovery (if using): + +```xml +UIBackgroundModes + + bluetooth-central + bluetooth-peripheral + +NSBluetoothAlwaysUsageDescription +Uses Bluetooth to quickly discover nearby sessions. +NSBluetoothPeripheralUsageDescription +Uses Bluetooth to advertise sessions to nearby devices. +NSLocalNetworkUsageDescription +Uses local network for peer-to-peer connections. +``` + +Keep existing Bonjour services: +```xml +NSBonjourServices + + _yourservice._tcp + +``` + +--- + +## Important Considerations + +### 1. Traffic Gate Pattern +Keep the stabilization delay (4 seconds) before marking peers as stable. This prevents data loss on mixed 5G+WiFi networks. + +### 2. Connection Retry Logic +Implement exponential backoff (1s, 2s, 4s) for connection retries, just like with MPC. + +### 3. Swift 6 Concurrency +Use `guard let self else { return }` pattern before `Task` blocks to avoid Swift 6 warnings: + +```swift +// Good pattern +connection.stateUpdateHandler = { [weak self] state in + guard let self else { return } + Task { @MainActor in + self.handleState(state) + } +} +``` + +### 4. Reconnection Handling +Add an `onHostConnected` callback to clear "disconnected" UI states when reconnecting: + +```swift +var onHostConnected: (() -> Void)? + +// In connection success handler: +isConnectedToHost = true +onHostConnected?() // Clears "host lost" banners +``` + +### 5. Device Name in TXT Record +Network.framework doesn't expose device names like MPC's `MCPeerID.displayName`. Include it in the TXT record: + +```swift +// Host: Add to TXT record +let txtEntry = "deviceName=\(UIDevice.current.name)" + +// Guest: Extract from metadata +if case .bonjour(let txtRecord) = result.metadata { + if let deviceName = txtRecord["deviceName"] { + // Use deviceName + } +} +``` + +--- + +## Migration Checklist + +- [ ] Create `DiscoveredHost` and `ConnectedPeer` models +- [ ] Create `NetworkFraming` utility for TCP framing +- [ ] Create `P2PConnectionManager` with NWListener/NWBrowser +- [ ] (Optional) Create `BLEDiscoveryManager` for fast discovery +- [ ] Update Info.plist with BLE permissions (if using BLE) +- [ ] Update all views to use new manager +- [ ] Replace `MCPeerID` with `UUID` or custom struct +- [ ] Implement traffic gate pattern (4-second stabilization) +- [ ] Implement connection retry with exponential backoff +- [ ] Add `onHostConnected` callback for reconnection handling +- [ ] Test: Host starts, guest joins +- [ ] Test: Multiple guests join simultaneously +- [ ] Test: Guest leaves and rejoins +- [ ] Test: Host disconnects, guests receive notification +- [ ] Keep old `MultipeerManager` as fallback initially + +--- + +## File Structure + +``` +YourApp/ +β”œβ”€β”€ Managers/ +β”‚ β”œβ”€β”€ MultipeerManager.swift (keep as fallback) +β”‚ β”œβ”€β”€ P2PConnectionManager.swift (NEW - main networking) +β”‚ β”œβ”€β”€ BLEDiscoveryManager.swift (NEW - optional fast discovery) +β”‚ └── NetworkFraming.swift (NEW - TCP framing) +β”œβ”€β”€ Models/ +β”‚ β”œβ”€β”€ DiscoveredHost.swift (NEW) +β”‚ └── ConnectedPeer.swift (NEW) +``` + +--- + +## Full Mesh Network (Up to 8 Devices) + +For apps where all devices need to communicate with each other (like a group chat or multiplayer game), you need a **full mesh topology** where every device connects to every other device. + +### Mesh Network Architecture + +``` + Device A + / | \ + / | \ +Device B----Device C + \ | / + \ | / + Device D +``` + +In a full mesh with N devices, each device maintains N-1 connections. + +### Key Differences from Host/Guest Model + +| Host/Guest Model | Full Mesh Model | +|------------------|-----------------| +| One host, multiple guests | All peers are equal | +| Host runs NWListener only | Every device runs NWListener AND NWBrowser | +| Guests run NWBrowser only | Every device can initiate or accept connections | +| One-to-many communication | Many-to-many communication | + +### MeshPeer Model + +```swift +import Foundation + +struct MeshPeer: Identifiable, Hashable { + let id: String // Unique device identifier (persisted across sessions) + let displayName: String + let discoveredAt: Date + var isConnected: Bool + var isStable: Bool + + init(id: String, displayName: String) { + self.id = id + self.displayName = displayName + self.discoveredAt = Date() + self.isConnected = false + self.isStable = false + } + + func hash(into hasher: inout Hasher) { + hasher.combine(id) + } + + static func == (lhs: MeshPeer, rhs: MeshPeer) -> Bool { + lhs.id == rhs.id + } +} +``` + +### MeshNetworkManager + +```swift +import Foundation +import Network +import UIKit +import Combine + +@MainActor +class MeshNetworkManager: NSObject, ObservableObject { + + // MARK: - Published Properties + + @Published var discoveredPeers: [String: MeshPeer] = [:] + @Published var connectedPeers: [String: MeshPeer] = [:] + @Published var isRunning = false + + // MARK: - Callbacks + + var onMessageReceived: ((Data, MeshPeer) -> Void)? + var onPeerConnected: ((MeshPeer) -> Void)? + var onPeerDisconnected: ((MeshPeer) -> Void)? + + // MARK: - Private Properties + + private let serviceType = "_yourmesh._tcp" + private let myPeerID: String // Unique, persistent device ID + private let myDisplayName: String + + private var listener: NWListener? + private var browser: NWBrowser? + private var connections: [String: NWConnection] = [:] // peerID -> connection + private var stablePeers: Set = [] + private var pendingConnections: Set = [] // Peers we're connecting to + + // MARK: - Initialization + + override init() { + // Create or retrieve persistent device ID + if let savedID = UserDefaults.standard.string(forKey: "meshPeerID") { + self.myPeerID = savedID + } else { + let newID = UUID().uuidString + UserDefaults.standard.set(newID, forKey: "meshPeerID") + self.myPeerID = newID + } + self.myDisplayName = UIDevice.current.name + super.init() + } + + // MARK: - Network Parameters + + private func createParameters() -> NWParameters { + let parameters = NWParameters.tcp + parameters.includePeerToPeer = true + parameters.serviceClass = .responsiveData + return parameters + } + + // MARK: - Start/Stop Mesh + + func startMesh() { + guard !isRunning else { return } + + startListener() + startBrowser() + isRunning = true + + print("Mesh started with ID: \(myPeerID)") + } + + func stopMesh() { + listener?.cancel() + listener = nil + + browser?.cancel() + browser = nil + + for (_, connection) in connections { + connection.cancel() + } + connections.removeAll() + connectedPeers.removeAll() + discoveredPeers.removeAll() + stablePeers.removeAll() + pendingConnections.removeAll() + + isRunning = false + } + + // MARK: - Listener (Accept Incoming Connections) + + private func startListener() { + do { + let parameters = createParameters() + listener = try NWListener(using: parameters) + + // Advertise with our peer ID and display name in TXT record + let txtEntries = "peerID=\(myPeerID)\tdisplayName=\(myDisplayName)" + var txtData = Data() + for entry in txtEntries.split(separator: "\t") { + let entryString = String(entry) + txtData.append(UInt8(entryString.utf8.count)) + txtData.append(contentsOf: entryString.utf8) + } + + listener?.service = NWListener.Service( + name: myPeerID, // Use peer ID as service name for uniqueness + type: serviceType, + txtRecord: txtData + ) + + listener?.stateUpdateHandler = { [weak self] state in + guard let self else { return } + Task { @MainActor in + if case .failed(let error) = state { + print("Listener failed: \(error)") + } + } + } + + listener?.newConnectionHandler = { [weak self] connection in + guard let self else { return } + Task { @MainActor in + self.handleIncomingConnection(connection) + } + } + + listener?.start(queue: .main) + } catch { + print("Failed to start listener: \(error)") + } + } + + // MARK: - Browser (Discover Other Peers) + + private func startBrowser() { + let parameters = createParameters() + browser = NWBrowser(for: .bonjour(type: serviceType, domain: nil), using: parameters) + + browser?.browseResultsChangedHandler = { [weak self] results, changes in + guard let self else { return } + Task { @MainActor in + self.handleBrowseResults(changes: changes) + } + } + + browser?.start(queue: .main) + } + + private func handleBrowseResults(changes: Set) { + for change in changes { + switch change { + case .added(let result): + handlePeerDiscovered(result) + case .removed(let result): + handlePeerLost(result) + default: + break + } + } + } + + private func handlePeerDiscovered(_ result: NWBrowser.Result) { + // Extract peer info from TXT record + var peerID: String? + var displayName = "Unknown" + + if case .bonjour(let txtRecord) = result.metadata { + peerID = txtRecord["peerID"] + displayName = txtRecord["displayName"] ?? "Unknown" + } + + guard let peerID = peerID, peerID != myPeerID else { return } // Ignore self + + // Already connected? + guard connections[peerID] == nil else { return } + + let peer = MeshPeer(id: peerID, displayName: displayName) + discoveredPeers[peerID] = peer + + print("Discovered peer: \(displayName) (\(peerID))") + + // Decide who initiates connection using tie-breaker + // Lower peer ID initiates to avoid duplicate connections + if myPeerID < peerID && !pendingConnections.contains(peerID) { + initiateConnection(to: peer, result: result) + } + } + + private func handlePeerLost(_ result: NWBrowser.Result) { + if case .service(let name, _, _, _) = result.endpoint { + // Service name is the peer ID + discoveredPeers.removeValue(forKey: name) + } + } + + // MARK: - Connection Management + + /// Initiate outgoing connection (we have lower ID) + private func initiateConnection(to peer: MeshPeer, result: NWBrowser.Result) { + pendingConnections.insert(peer.id) + + let parameters = createParameters() + let connection = NWConnection(to: result.endpoint, using: parameters) + + connection.stateUpdateHandler = { [weak self] state in + guard let self else { return } + Task { @MainActor in + self.handleConnectionState(state, peerID: peer.id, isOutgoing: true) + } + } + + connection.start(queue: .main) + connections[peer.id] = connection + + print("Initiating connection to: \(peer.displayName)") + } + + /// Handle incoming connection (other peer had lower ID) + private func handleIncomingConnection(_ connection: NWConnection) { + // We need to identify who this connection is from + // Send our peer ID immediately after connection is ready + connection.stateUpdateHandler = { [weak self] state in + guard let self else { return } + Task { @MainActor in + self.handleIncomingConnectionState(state, connection: connection) + } + } + + connection.start(queue: .main) + } + + private func handleIncomingConnectionState(_ state: NWConnection.State, connection: NWConnection) { + switch state { + case .ready: + // Wait for the remote peer to identify themselves + receiveIdentification(from: connection) + case .failed, .cancelled: + connection.cancel() + default: + break + } + } + + private func receiveIdentification(from connection: NWConnection) { + NetworkFraming.receive(from: connection) { [weak self] result in + guard let self else { return } + Task { @MainActor in + switch result { + case .success(let data): + if let message = try? JSONDecoder().decode(MeshMessage.self, from: data), + case .identification(let peerID, let displayName) = message.type { + self.finalizeIncomingConnection(connection, peerID: peerID, displayName: displayName) + } + case .failure(let error): + print("Failed to receive identification: \(error)") + connection.cancel() + } + } + } + } + + private func finalizeIncomingConnection(_ connection: NWConnection, peerID: String, displayName: String) { + // Check if we already have a connection to this peer + if let existing = connections[peerID] { + // Keep the connection from the lower ID peer + if myPeerID < peerID { + // We should have initiated, reject this incoming + connection.cancel() + return + } else { + // They should have initiated, close our outgoing attempt + existing.cancel() + } + } + + connections[peerID] = connection + + var peer = MeshPeer(id: peerID, displayName: displayName) + peer.isConnected = true + connectedPeers[peerID] = peer + discoveredPeers.removeValue(forKey: peerID) + + // Start receiving messages + startReceiving(from: connection, peerID: peerID) + + // Start stabilization + startStabilization(for: peerID) + + print("Incoming connection from: \(displayName)") + } + + private func handleConnectionState(_ state: NWConnection.State, peerID: String, isOutgoing: Bool) { + switch state { + case .ready: + if isOutgoing { + // Send identification + sendIdentification(to: peerID) + + var peer = discoveredPeers[peerID] ?? MeshPeer(id: peerID, displayName: "Unknown") + peer.isConnected = true + connectedPeers[peerID] = peer + discoveredPeers.removeValue(forKey: peerID) + pendingConnections.remove(peerID) + + // Start receiving + if let connection = connections[peerID] { + startReceiving(from: connection, peerID: peerID) + } + + // Start stabilization + startStabilization(for: peerID) + + print("Connected to: \(peer.displayName)") + } + + case .failed(let error): + print("Connection failed to \(peerID): \(error)") + removeConnection(peerID: peerID) + + case .cancelled: + removeConnection(peerID: peerID) + + default: + break + } + } + + private func sendIdentification(to peerID: String) { + guard let connection = connections[peerID] else { return } + + let message = MeshMessage( + type: .identification(peerID: myPeerID, displayName: myDisplayName), + senderID: myPeerID + ) + + if let data = try? JSONEncoder().encode(message) { + NetworkFraming.send(data, over: connection) { error in + if let error = error { + print("Failed to send identification: \(error)") + } + } + } + } + + private func startReceiving(from connection: NWConnection, peerID: String) { + NetworkFraming.startReceiving(from: connection) { [weak self] data in + guard let self else { return } + Task { @MainActor in + self.handleReceivedData(data, from: peerID) + } + } onError: { [weak self] error in + guard let self else { return } + Task { @MainActor in + print("Receive error from \(peerID): \(error)") + self.removeConnection(peerID: peerID) + } + } + } + + private func startStabilization(for peerID: String) { + Task { @MainActor in + try? await Task.sleep(nanoseconds: 4_000_000_000) // 4 seconds + + guard connectedPeers[peerID] != nil else { return } + + stablePeers.insert(peerID) + connectedPeers[peerID]?.isStable = true + + if let peer = connectedPeers[peerID] { + onPeerConnected?(peer) + } + + print("Peer stabilized: \(peerID)") + } + } + + private func removeConnection(peerID: String) { + connections[peerID]?.cancel() + connections.removeValue(forKey: peerID) + pendingConnections.remove(peerID) + stablePeers.remove(peerID) + + if let peer = connectedPeers.removeValue(forKey: peerID) { + onPeerDisconnected?(peer) + } + } + + // MARK: - Message Handling + + private func handleReceivedData(_ data: Data, from peerID: String) { + guard let message = try? JSONDecoder().decode(MeshMessage.self, from: data) else { + return + } + + switch message.type { + case .identification: + // Already handled during connection setup + break + + case .broadcast(let payload): + if let peer = connectedPeers[peerID] { + onMessageReceived?(payload, peer) + } + + case .direct(let payload): + if let peer = connectedPeers[peerID] { + onMessageReceived?(payload, peer) + } + } + } + + // MARK: - Sending Messages + + /// Send to all connected peers + func broadcast(_ data: Data) { + let message = MeshMessage(type: .broadcast(payload: data), senderID: myPeerID) + guard let encoded = try? JSONEncoder().encode(message) else { return } + + for peerID in stablePeers { + if let connection = connections[peerID] { + NetworkFraming.send(encoded, over: connection) { error in + if let error = error { + print("Broadcast failed to \(peerID): \(error)") + } + } + } + } + } + + /// Send to specific peer + func send(_ data: Data, to peerID: String) { + guard stablePeers.contains(peerID), + let connection = connections[peerID] else { return } + + let message = MeshMessage(type: .direct(payload: data), senderID: myPeerID) + guard let encoded = try? JSONEncoder().encode(message) else { return } + + NetworkFraming.send(encoded, over: connection) { error in + if let error = error { + print("Send failed to \(peerID): \(error)") + } + } + } + + /// Send to all except specified peers + func broadcast(_ data: Data, excluding: Set) { + let message = MeshMessage(type: .broadcast(payload: data), senderID: myPeerID) + guard let encoded = try? JSONEncoder().encode(message) else { return } + + for peerID in stablePeers where !excluding.contains(peerID) { + if let connection = connections[peerID] { + NetworkFraming.send(encoded, over: connection) { error in + if let error = error { + print("Broadcast failed to \(peerID): \(error)") + } + } + } + } + } +} +``` + +### MeshMessage Protocol + +```swift +import Foundation + +struct MeshMessage: Codable { + enum MessageType: Codable { + case identification(peerID: String, displayName: String) + case broadcast(payload: Data) + case direct(payload: Data) + } + + let type: MessageType + let senderID: String + let timestamp: Date + + init(type: MessageType, senderID: String) { + self.type = type + self.senderID = senderID + self.timestamp = Date() + } +} +``` + +### Usage Example + +```swift +struct MeshChatView: View { + @StateObject private var mesh = MeshNetworkManager() + @State private var messages: [(String, MeshPeer)] = [] + @State private var inputText = "" + + var body: some View { + VStack { + // Connected peers + HStack { + Text("Connected: \(mesh.connectedPeers.count)") + ForEach(Array(mesh.connectedPeers.values)) { peer in + Text(peer.displayName) + .padding(4) + .background(peer.isStable ? Color.green : Color.orange) + .cornerRadius(4) + } + } + + // Messages + List(messages, id: \.0) { message, peer in + Text("\(peer.displayName): \(message)") + } + + // Input + HStack { + TextField("Message", text: $inputText) + Button("Send") { + if let data = inputText.data(using: .utf8) { + mesh.broadcast(data) + inputText = "" + } + } + } + } + .onAppear { + mesh.onMessageReceived = { data, peer in + if let text = String(data: data, encoding: .utf8) { + messages.append((text, peer)) + } + } + mesh.startMesh() + } + .onDisappear { + mesh.stopMesh() + } + } +} +``` + +### Connection Tie-Breaker Strategy + +To avoid duplicate connections (both peers trying to connect to each other), use a **deterministic tie-breaker**: + +```swift +// Lower peer ID initiates the connection +if myPeerID < discoveredPeerID { + initiateConnection(to: peer) +} else { + // Wait for them to connect to us +} +``` + +This ensures: +- Only ONE connection is created between any two peers +- No race conditions or duplicate connections +- Deterministic behavior across all devices + +### Mesh Network Limits + +| Devices | Connections per Device | Total Connections | +|---------|----------------------|-------------------| +| 2 | 1 | 1 | +| 3 | 2 | 3 | +| 4 | 3 | 6 | +| 5 | 4 | 10 | +| 6 | 5 | 15 | +| 7 | 6 | 21 | +| 8 | 7 | 28 | + +Formula: Total connections = N Γ— (N-1) / 2 + +### Mesh Network Considerations + +1. **Connection Overhead**: Each device maintains N-1 connections. With 8 devices, that's 7 connections per device and 28 total connections in the network. + +2. **Message Duplication**: When broadcasting, each peer sends to all others. Ensure your app handles potential duplicate messages (use message IDs). + +3. **Partial Mesh**: For larger groups, consider a **partial mesh** where not everyone connects to everyone. Use relay nodes instead. + +4. **Battery Impact**: More connections = more battery usage. Consider reducing broadcast frequency or implementing sleep modes. + +5. **Network Partition**: Handle cases where the mesh splits (some devices can't reach others). Implement reconnection logic. + +6. **Message Ordering**: Messages may arrive out of order. Use timestamps or sequence numbers if order matters. + +### Message Relay (Optional) + +For messages that need to reach peers you're not directly connected to: + +```swift +func relayBroadcast(_ data: Data, originalSenderID: String, seenBy: Set) { + var updatedSeenBy = seenBy + updatedSeenBy.insert(myPeerID) + + let message = MeshMessage( + type: .relayedBroadcast(payload: data, originalSender: originalSenderID, seenBy: updatedSeenBy), + senderID: myPeerID + ) + + guard let encoded = try? JSONEncoder().encode(message) else { return } + + // Forward to peers who haven't seen it + for peerID in stablePeers where !updatedSeenBy.contains(peerID) { + if let connection = connections[peerID] { + NetworkFraming.send(encoded, over: connection) { _ in } + } + } +} +``` + +This allows messages to propagate through the entire mesh even if not all devices are directly connected.