diff --git a/docker-compose.yml b/docker-compose.yml index 85d17e6..026bb0c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -30,6 +30,18 @@ services: DATABASE_URL: "mysql://root:password@database:3306/meshtastic-map?connection_limit=100" MAP_OPTS: "" # add any custom index.js options here + # publishes mqtt packets via websocket + meshtastic-ws: + container_name: meshtastic-ws + build: + context: . + dockerfile: ./Dockerfile + command: /app/docker/ws.sh + ports: + - 8081:8081/tcp + environment: + WS_OPTS: "" + # runs the database to store everything from mqtt database: container_name: database diff --git a/docker/ws.sh b/docker/ws.sh new file mode 100755 index 0000000..71d4048 --- /dev/null +++ b/docker/ws.sh @@ -0,0 +1,5 @@ +#!/bin/sh + +echo "Starting websocket publisher" +exec node src/ws.js ${WS_OPTS} + diff --git a/package.json b/package.json index b0abbf1..a2408aa 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,8 @@ "cors": "^2.8.5", "express": "^5.0.0", "mqtt": "^5.14.1", - "protobufjs": "^7.5.4" + "protobufjs": "^7.5.4", + "ws": "^8.18.3" }, "devDependencies": { "jest": "^30.1.3", diff --git a/src/public/index.html b/src/public/index.html index 5b30a38..b88c2fd 100644 --- a/src/public/index.html +++ b/src/public/index.html @@ -90,6 +90,18 @@ border: 1px solid white; } + .icon-traceroute-start { + background-color: #16a34a; /* green */ + border-radius: 25px; + border: 1px solid white; + } + + .icon-traceroute-end { + background-color: #dc2626; /* red */ + border-radius: 25px; + border: 1px solid white; + } + .waypoint-label { font-size: 26px; background-color: transparent; @@ -1619,7 +1631,7 @@ } catch(e) {} // overlays enabled by default - return ["Legend", "Position History"]; + return ["Legend", "Position History", "Traceroutes"]; } @@ -2791,6 +2803,16 @@ iconSize: [16, 16], // increase from 12px to 16px to make hover easier }); + var iconTracerouteStart = L.divIcon({ + className: 'icon-traceroute-start', + iconSize: [16, 16], + }); + + var iconTracerouteEnd = L.divIcon({ + className: 'icon-traceroute-end', + iconSize: [16, 16], + }); + // create legend var legendLayerGroup = new L.LayerGroup(); var legend = L.control({position: 'bottomleft'}); @@ -2801,7 +2823,8 @@ div.innerHTML = `
Legend
` + `
MediumFast
` + `
LongFast
` - + `
Offline Too Long
`; + + `
Offline Too Long
` + + `
Traceroute
`; return div; }; @@ -3865,8 +3888,6 @@ function onTracerouteEdgesUpdated(edges) { - traceroutesLayerGroup.clearLayers(); - tracerouteEdgesCache = edges; for (const edge of edges) { @@ -3910,23 +3931,6 @@ + `

Terrain images from HeyWhatsThat.com` + `
`; - const line = L.polyline([ - fromMarker.getLatLng(), - toMarker.getLatLng(), - ], { - color: colour, - opacity: 0.9, - }).addTo(traceroutesLayerGroup); - - line.bindTooltip(tooltip, { - sticky: true, - opacity: 1, - interactive: true, - }).bindPopup(tooltip) - .on('click', function(event) { - event.target.closeTooltip(); - }); - // additional line for backbone neighbours const backboneNeighbourLine = L.polyline([ fromMarker.getLatLng(), @@ -3954,7 +3958,6 @@ }); if(fromNode.is_backbone && toNode.is_backbone) { - console.log("Adding to backbone neighbours layer group"); backboneNeighbourLine.addTo(backboneNeighboursLayerGroup); } } @@ -4328,6 +4331,192 @@ // reload and go to provided node id reload(queryNodeId, queryZoom); + // WebSocket connection for real-time messages + var ws = null; + var tracerouteCooldown = {}; // Track last traceroute time per from node (for 20s cooldown) + var activeTracerouteKeys = new Set(); // Track active traceroute visualizations to prevent duplicates + var tracerouteLines = {}; // Track lines for each traceroute route key for cleanup + + function connectWebSocket() { + // Determine WebSocket URL - use same hostname as current page, port 8081 + const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + const wsHost = window.location.hostname; + const wsPort = '8081'; + const wsUrl = `${wsProtocol}//${wsHost}:${wsPort}`; + + console.log('Connecting to WebSocket:', wsUrl); + ws = new WebSocket(wsUrl); + + ws.onopen = function() { + console.log('WebSocket connected'); + }; + + ws.onmessage = function(event) { + try { + const message = JSON.parse(event.data); + handleWebSocketMessage(message); + } catch (e) { + console.error('Error parsing WebSocket message:', e); + } + }; + + ws.onerror = function(error) { + console.error('WebSocket error:', error); + }; + + ws.onclose = function() { + console.log('WebSocket disconnected, reconnecting in 5 seconds...'); + setTimeout(connectWebSocket, 5000); + }; + } + + function handleWebSocketMessage(message) { + if (message.type === 'traceroute') { + handleTraceroute(message.data); + } + } + + function handleTraceroute(data) { + // Only visualize traceroutes where want_response is false (the reply coming back) + if (data.want_response) { + return; + } + + // When want_response is false, from and to are swapped from the original request + // The path goes from 'to' (original sender) through route to 'from' (original destination) + const originalSender = data.to; // This was the original sender + const originalDestination = data.from; // This was the original destination + const route = data.route || []; + const snrTowards = data.snr_towards || []; + + // Deduplicate: ignore traceroutes from the same original sender for 20 seconds + const now = Date.now(); + if (tracerouteCooldown[originalSender] && (now - tracerouteCooldown[originalSender]) < 20000) { + return; // Still in cooldown period + } + + // Create unique key for this traceroute path to prevent duplicate visualizations + // Use original sender (to), original destination (from), and route to create unique key + // (ignoring gateway_id since multiple gateways can receive same route) + const routeKey = `${originalSender}-${originalDestination}-${route.join('-')}`; + if (activeTracerouteKeys.has(routeKey)) { + return; // Already visualizing this route + } + + // Mark as active and set cooldown + activeTracerouteKeys.add(routeKey); + tracerouteCooldown[originalSender] = now; + + // Build the complete path: to (original sender) -> route[0] -> route[1] -> ... -> from (original destination) + const path = [originalSender]; // Start from original sender + if (route.length > 0) { + path.push(...route); + } + path.push(originalDestination); // End at original destination + + // Visualize the traceroute with animated hops + visualizeTraceroute(path, snrTowards, routeKey); + } + + function visualizeTraceroute(path, snrTowards, routeKey) { + // Verify all nodes in path exist on map + const pathMarkers = []; + for (const nodeId of path) { + const marker = findNodeMarkerById(nodeId); + if (!marker) { + // Node not on map, skip this traceroute + activeTracerouteKeys.delete(routeKey); + return; + } + pathMarkers.push(marker); + } + + // Store lines and overlays for this route key for cleanup + const routeElements = { + lines: [], + startOverlay: null, + endOverlay: null, + }; + tracerouteLines[routeKey] = routeElements; + + // Color starting node (first in path) green and destination node (last in path) red + const startMarker = pathMarkers[0]; + const endMarker = pathMarkers[pathMarkers.length - 1]; + + const startOverlay = L.marker(startMarker.getLatLng(), { + icon: iconTracerouteStart, + zIndexOffset: 10000, // Ensure it's on top + }).addTo(traceroutesLayerGroup); + + const endOverlay = L.marker(endMarker.getLatLng(), { + icon: iconTracerouteEnd, + zIndexOffset: 10000, // Ensure it's on top + }).addTo(traceroutesLayerGroup); + + // Store overlays for cleanup + routeElements.startOverlay = startOverlay; + routeElements.endOverlay = endOverlay; + + // Animate each hop sequentially + let hopIndex = 0; + const animateNextHop = () => { + if (hopIndex >= pathMarkers.length - 1) { + // All hops animated, cleanup after delay + setTimeout(() => { + if (tracerouteLines[routeKey]) { + const routeElements = tracerouteLines[routeKey]; + // Remove all lines + if (routeElements.lines) { + routeElements.lines.forEach(line => { + line.remove(); + }); + } + // Remove node overlays + if (routeElements.startOverlay) { + routeElements.startOverlay.remove(); + } + if (routeElements.endOverlay) { + routeElements.endOverlay.remove(); + } + delete tracerouteLines[routeKey]; + } + activeTracerouteKeys.delete(routeKey); + }, 2000); + return; + } + + const fromMarker = pathMarkers[hopIndex]; + const toMarker = pathMarkers[hopIndex + 1]; + const snr = hopIndex < snrTowards.length ? snrTowards[hopIndex] : null; + + // Use orange color for all traceroute lines + const lineColor = '#f97316'; // orange + + // Create animated polyline for this hop with orange dotted style + const line = L.polyline([fromMarker.getLatLng(), toMarker.getLatLng()], { + color: lineColor, + weight: 4, + opacity: 0, // Start invisible + // dashArray: '10, 5', // Dotted line style + zIndexOffset: 10000, + }).addTo(traceroutesLayerGroup); + + // Fade in animation + line.setStyle({ opacity: 1.0 }); + tracerouteLines[routeKey].lines.push(line); + + // Animate next hop after 600ms delay + hopIndex++; + setTimeout(animateNextHop, 600); + }; + + // Start animation + animateNextHop(); + } + + // Connect WebSocket when page loads + connectWebSocket(); + diff --git a/src/ws.js b/src/ws.js new file mode 100644 index 0000000..9160627 --- /dev/null +++ b/src/ws.js @@ -0,0 +1,313 @@ +const crypto = require("crypto"); +const path = require("path"); +const http = require("http"); +const mqtt = require("mqtt"); +const protobufjs = require("protobufjs"); +const commandLineArgs = require("command-line-args"); +const commandLineUsage = require("command-line-usage"); +const { WebSocketServer } = require("ws"); + +const optionsList = [ + { + name: 'help', + alias: 'h', + type: Boolean, + description: 'Display this usage guide.' + }, + { + name: "mqtt-broker-url", + type: String, + description: "MQTT Broker URL (e.g: mqtt://mqtt.meshtastic.org)", + }, + { + name: "mqtt-username", + type: String, + description: "MQTT Username (e.g: meshdev)", + }, + { + name: "mqtt-password", + type: String, + description: "MQTT Password (e.g: large4cats)", + }, + { + name: "mqtt-client-id", + type: String, + description: "MQTT Client ID (e.g: map.example.com)", + }, + { + name: "mqtt-topic", + type: String, + multiple: true, + typeLabel: ' ...', + description: "MQTT Topic to subscribe to (e.g: msh/#)", + }, + { + name: "decryption-keys", + type: String, + multiple: true, + typeLabel: ' ...', + description: "Decryption keys encoded in base64 to use when decrypting service envelopes.", + }, + { + name: "ws-port", + type: Number, + description: "WebSocket server port (default: 8081)", + }, +]; + +// parse command line args +const options = commandLineArgs(optionsList); + +// show help +if(options.help){ + const usage = commandLineUsage([ + { + header: 'Meshtastic WebSocket Publisher', + content: 'Publishes real-time Meshtastic packets via WebSocket.', + }, + { + header: 'Options', + optionList: optionsList, + }, + ]); + console.log(usage); + process.exit(0); +} + +// get options and fallback to default values +const mqttBrokerUrl = options["mqtt-broker-url"] ?? "mqtt://mqtt.meshtastic.org"; +const mqttUsername = options["mqtt-username"] ?? "meshdev"; +const mqttPassword = options["mqtt-password"] ?? "large4cats"; +const mqttClientId = options["mqtt-client-id"] ?? null; +const mqttTopics = options["mqtt-topic"] ?? ["msh/#"]; +const decryptionKeys = options["decryption-keys"] ?? [ + "1PG7OiApB1nwvP+rz05pAQ==", // add default "AQ==" decryption key +]; +const wsPort = options["ws-port"] ?? 8081; + +// create mqtt client +const client = mqtt.connect(mqttBrokerUrl, { + username: mqttUsername, + password: mqttPassword, + clientId: mqttClientId, +}); + +// load protobufs +const root = new protobufjs.Root(); +root.resolvePath = (origin, target) => path.join(__dirname, "protobufs", target); +root.loadSync('meshtastic/mqtt.proto'); +const Data = root.lookupType("Data"); +const ServiceEnvelope = root.lookupType("ServiceEnvelope"); +const RouteDiscovery = root.lookupType("RouteDiscovery"); + +// create HTTP server for WebSocket +const server = http.createServer(); +const wss = new WebSocketServer({ server }); + +// track connected clients +const clients = new Set(); + +wss.on('connection', (ws) => { + clients.add(ws); + console.log(`WebSocket client connected. Total clients: ${clients.size}`); + + ws.on('close', () => { + clients.delete(ws); + console.log(`WebSocket client disconnected. Total clients: ${clients.size}`); + }); + + ws.on('error', (error) => { + console.error('WebSocket error:', error); + clients.delete(ws); + }); +}); + +// broadcast message to all connected clients +function broadcast(message) { + const messageStr = JSON.stringify(message); + clients.forEach((client) => { + if (client.readyState === 1) { // WebSocket.OPEN + try { + client.send(messageStr); + } catch (error) { + console.error('Error sending message to client:', error); + } + } + }); +} + +function createNonce(packetId, fromNode) { + // Expand packetId to 64 bits + const packetId64 = BigInt(packetId); + + // Initialize block counter (32-bit, starts at zero) + const blockCounter = 0; + + // Create a buffer for the nonce + const buf = Buffer.alloc(16); + + // Write packetId, fromNode, and block counter to the buffer + buf.writeBigUInt64LE(packetId64, 0); + buf.writeUInt32LE(fromNode, 8); + buf.writeUInt32LE(blockCounter, 12); + + return buf; +} + +/** + * References: + * https://github.com/crypto-smoke/meshtastic-go/blob/develop/radio/aes.go#L42 + * https://github.com/pdxlocations/Meshtastic-MQTT-Connect/blob/main/meshtastic-mqtt-connect.py#L381 + */ +function decrypt(packet) { + // attempt to decrypt with all available decryption keys + for(const decryptionKey of decryptionKeys){ + try { + // convert encryption key to buffer + const key = Buffer.from(decryptionKey, "base64"); + + // create decryption iv/nonce for this packet + const nonceBuffer = createNonce(packet.id, packet.from); + + // determine algorithm based on key length + var algorithm = null; + if(key.length === 16){ + algorithm = "aes-128-ctr"; + } else if(key.length === 32){ + algorithm = "aes-256-ctr"; + } else { + // skip this key, try the next one... + console.error(`Skipping decryption key with invalid length: ${key.length}`); + continue; + } + + // create decipher + const decipher = crypto.createDecipheriv(algorithm, key, nonceBuffer); + + // decrypt encrypted packet + const decryptedBuffer = Buffer.concat([decipher.update(packet.encrypted), decipher.final()]); + + // parse as data message + return Data.decode(decryptedBuffer); + + } catch(e){} + } + + // couldn't decrypt + return null; +} + +/** + * converts hex id to numeric id, for example: !FFFFFFFF to 4294967295 + * @param hexId a node id in hex format with a prepended "!" + * @returns {bigint} the node id in numeric form + */ +function convertHexIdToNumericId(hexId) { + return BigInt('0x' + hexId.replaceAll("!", "")); +} + +// subscribe to everything when connected +client.on("connect", () => { + console.log("Connected to MQTT broker"); + for(const mqttTopic of mqttTopics){ + client.subscribe(mqttTopic); + console.log(`Subscribed to MQTT topic: ${mqttTopic}`); + } +}); + +// handle message received +client.on("message", async (topic, message) => { + try { + // decode service envelope + const envelope = ServiceEnvelope.decode(message); + if(!envelope.packet){ + return; + } + + // attempt to decrypt encrypted packets + const isEncrypted = envelope.packet.encrypted?.length > 0; + if(isEncrypted){ + const decoded = decrypt(envelope.packet); + if(decoded){ + envelope.packet.decoded = decoded; + } + } + + // get portnum from decoded packet + const portnum = envelope.packet?.decoded?.portnum; + + // check if we can see the decrypted packet data + if(envelope.packet.decoded == null){ + return; + } + + // handle traceroutes (portnum 70) + if(portnum === 70) { + try { + const routeDiscovery = RouteDiscovery.decode(envelope.packet.decoded.payload); + + const traceroute = { + type: "traceroute", + data: { + to: envelope.packet.to, + from: envelope.packet.from, + want_response: envelope.packet.decoded.wantResponse, + route: routeDiscovery.route, + snr_towards: routeDiscovery.snrTowards, + route_back: routeDiscovery.routeBack, + snr_back: routeDiscovery.snrBack, + channel_id: envelope.channelId, + gateway_id: envelope.gatewayId ? Number(convertHexIdToNumericId(envelope.gatewayId)) : null, + packet_id: envelope.packet.id, + } + }; + broadcast(traceroute); + } catch (e) { + console.error("Error processing traceroute:", e); + } + } + + } catch(e) { + console.error("Error processing MQTT message:", e); + } +}); + +// start WebSocket server +server.listen(wsPort, () => { + console.log(`WebSocket server running on port ${wsPort}`); +}); + +// Graceful shutdown handlers +function gracefulShutdown(signal) { + console.log(`Received ${signal}. Starting graceful shutdown...`); + + // Close all WebSocket connections + clients.forEach((client) => { + client.close(); + }); + clients.clear(); + + // Close WebSocket server + wss.close(() => { + console.log('WebSocket server closed'); + }); + + // Close HTTP server + server.close(() => { + console.log('HTTP server closed'); + }); + + // Close MQTT client + client.end(false, () => { + console.log('MQTT client disconnected'); + console.log('Graceful shutdown completed'); + process.exit(0); + }); +} + +// Handle SIGTERM (Docker, systemd, etc.) +process.on('SIGTERM', () => gracefulShutdown('SIGTERM')); + +// Handle SIGINT (Ctrl+C) +process.on('SIGINT', () => gracefulShutdown('SIGINT')); +