diff --git a/package-lock.json b/package-lock.json index 5ff0661..1fb2318 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,7 +16,8 @@ "cors": "^2.8.5", "express": "^5.2.1", "mqtt": "^5.14.1", - "protobufjs": "^7.5.4" + "protobufjs": "^7.5.4", + "ws": "^8.18.3" }, "devDependencies": { "jest": "^30.1.3", diff --git a/prisma/migrations/20260106151912_add_edges/migration.sql b/prisma/migrations/20260106151912_add_edges/migration.sql new file mode 100644 index 0000000..113243f --- /dev/null +++ b/prisma/migrations/20260106151912_add_edges/migration.sql @@ -0,0 +1,23 @@ +-- CreateTable +CREATE TABLE `edges` ( + `id` BIGINT NOT NULL AUTO_INCREMENT, + `from_node_id` BIGINT NOT NULL, + `to_node_id` BIGINT NOT NULL, + `snr` INTEGER NOT NULL, + `from_latitude` INTEGER NULL, + `from_longitude` INTEGER NULL, + `to_latitude` INTEGER NULL, + `to_longitude` INTEGER NULL, + `packet_id` BIGINT NOT NULL, + `channel_id` VARCHAR(191) NULL, + `gateway_id` BIGINT NULL, + `source` VARCHAR(191) NOT NULL, + `created_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), + `updated_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), + + INDEX `edges_from_node_id_idx`(`from_node_id`), + INDEX `edges_to_node_id_idx`(`to_node_id`), + INDEX `edges_created_at_idx`(`created_at`), + INDEX `edges_from_node_id_to_node_id_idx`(`from_node_id`, `to_node_id`), + PRIMARY KEY (`id`) +) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index ef96a9c..2a908dc 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -347,4 +347,28 @@ model ChannelUtilizationStats { @@index([channel_id]) @@index([recorded_at]) @@map("channel_utilization_stats") +} + +model Edge { + id BigInt @id @default(autoincrement()) + from_node_id BigInt + to_node_id BigInt + snr Int + from_latitude Int? + from_longitude Int? + to_latitude Int? + to_longitude Int? + packet_id BigInt + channel_id String? + gateway_id BigInt? + source String + + created_at DateTime @default(now()) + updated_at DateTime @default(now()) @updatedAt + + @@index(from_node_id) + @@index(to_node_id) + @@index(created_at) + @@index([from_node_id, to_node_id]) + @@map("edges") } \ No newline at end of file diff --git a/src/index.js b/src/index.js index 8d1eabe..48dbcc2 100644 --- a/src/index.js +++ b/src/index.js @@ -155,6 +155,15 @@ app.get('/api', async (req, res) => { "time_to": "Only include traceroutes updated before this unix timestamp (milliseconds)" } }, + { + "path": "/api/v1/connections", + "description": "Aggregated edges between nodes from traceroutes", + "params": { + "node_id": "Only include connections involving this node id", + "time_from": "Only include edges created after this unix timestamp (milliseconds)", + "time_to": "Only include edges created before this unix timestamp (milliseconds)" + } + }, { "path": "/api/v1/nodes/:nodeId/position-history", "description": "Position history for a meshtastic node", @@ -698,6 +707,194 @@ app.get('/api/v1/traceroutes', async (req, res) => { } }); +// Aggregated edges endpoint +// GET /api/v1/connections?node_id=...&time_from=...&time_to=... +app.get('/api/v1/connections', async (req, res) => { + try { + const nodeId = req.query.node_id ? parseInt(req.query.node_id) : undefined; + const timeFrom = req.query.time_from ? parseInt(req.query.time_from) : undefined; + const timeTo = req.query.time_to ? parseInt(req.query.time_to) : undefined; + + // Query edges from database + const edges = await prisma.edge.findMany({ + where: { + created_at: { + ...(timeFrom && { gte: new Date(timeFrom) }), + ...(timeTo && { lte: new Date(timeTo) }), + }, + // Only include edges where both nodes have positions + from_latitude: { not: null }, + from_longitude: { not: null }, + to_latitude: { not: null }, + to_longitude: { not: null }, + // If node_id is provided, filter edges where either from_node_id or to_node_id matches + ...(nodeId !== undefined && { + OR: [ + { from_node_id: nodeId }, + { to_node_id: nodeId }, + ], + }), + }, + orderBy: [ + { created_at: 'desc' }, + { packet_id: 'desc' }, + ], + }); + + // Collect all unique node IDs from edges + const nodeIds = new Set(); + for (const edge of edges) { + nodeIds.add(edge.from_node_id); + nodeIds.add(edge.to_node_id); + } + + // Fetch current positions for all nodes + const nodes = await prisma.node.findMany({ + where: { + node_id: { in: Array.from(nodeIds) }, + }, + select: { + node_id: true, + latitude: true, + longitude: true, + }, + }); + + // Create a map of current node positions + const nodePositions = new Map(); + for (const node of nodes) { + nodePositions.set(node.node_id, { + latitude: node.latitude, + longitude: node.longitude, + }); + } + + // Filter edges: only include edges where both nodes are still at the same location + const validEdges = edges.filter(edge => { + const fromCurrentPos = nodePositions.get(edge.from_node_id); + const toCurrentPos = nodePositions.get(edge.to_node_id); + + // Skip if either node doesn't exist or doesn't have a current position + if (!fromCurrentPos || !toCurrentPos || + fromCurrentPos.latitude === null || fromCurrentPos.longitude === null || + toCurrentPos.latitude === null || toCurrentPos.longitude === null) { + return false; + } + + // Check if stored positions match current positions + const fromMatches = fromCurrentPos.latitude === edge.from_latitude && + fromCurrentPos.longitude === edge.from_longitude; + const toMatches = toCurrentPos.latitude === edge.to_latitude && + toCurrentPos.longitude === edge.to_longitude; + + return fromMatches && toMatches; + }); + + // Normalize node pairs: always use min/max to treat A->B and B->A as same connection + const connectionsMap = new Map(); + + for (const edge of validEdges) { + const nodeA = edge.from_node_id < edge.to_node_id ? edge.from_node_id : edge.to_node_id; + const nodeB = edge.from_node_id < edge.to_node_id ? edge.to_node_id : edge.from_node_id; + const key = `${nodeA}-${nodeB}`; + + if (!connectionsMap.has(key)) { + connectionsMap.set(key, { + node_a: nodeA, + node_b: nodeB, + direction_ab: [], // A -> B edges + direction_ba: [], // B -> A edges + }); + } + + const connection = connectionsMap.get(key); + const isAB = edge.from_node_id === nodeA; + + // Add edge to appropriate direction + if (isAB) { + connection.direction_ab.push({ + snr: edge.snr, + snr_db: edge.snr / 4, // Convert to dB + created_at: edge.created_at, + packet_id: edge.packet_id, + source: edge.source, + }); + } else { + connection.direction_ba.push({ + snr: edge.snr, + snr_db: edge.snr / 4, + created_at: edge.created_at, + packet_id: edge.packet_id, + source: edge.source, + }); + } + } + + // Aggregate each connection + const connections = Array.from(connectionsMap.values()).map(conn => { + // Deduplicate edges by packet_id for each direction (keep first occurrence, which is most recent) + const dedupeByPacketId = (edges) => { + const seen = new Set(); + return edges.filter(edge => { + if (seen.has(edge.packet_id)) { + return false; + } + seen.add(edge.packet_id); + return true; + }); + }; + + const deduplicatedAB = dedupeByPacketId(conn.direction_ab); + const deduplicatedBA = dedupeByPacketId(conn.direction_ba); + + // Calculate average SNR for A->B (using deduplicated edges) + const avgSnrAB = deduplicatedAB.length > 0 + ? deduplicatedAB.reduce((sum, e) => sum + e.snr_db, 0) / deduplicatedAB.length + : null; + + // Calculate average SNR for B->A (using deduplicated edges) + const avgSnrBA = deduplicatedBA.length > 0 + ? deduplicatedBA.reduce((sum, e) => sum + e.snr_db, 0) / deduplicatedBA.length + : null; + + // Get last 5 edges for each direction (already sorted by created_at DESC, packet_id DESC, now deduplicated) + const last5AB = deduplicatedAB.slice(0, 5); + const last5BA = deduplicatedBA.slice(0, 5); + + // Determine worst average SNR + const worstAvgSnrDb = [avgSnrAB, avgSnrBA] + .filter(v => v !== null) + .reduce((min, val) => val < min ? val : min, Infinity); + + return { + node_a: conn.node_a, + node_b: conn.node_b, + direction_ab: { + avg_snr_db: avgSnrAB, + last_5_edges: last5AB, + total_count: deduplicatedAB.length, // Use deduplicated count + }, + direction_ba: { + avg_snr_db: avgSnrBA, + last_5_edges: last5BA, + total_count: deduplicatedBA.length, // Use deduplicated count + }, + worst_avg_snr_db: worstAvgSnrDb !== Infinity ? worstAvgSnrDb : null, + }; + }).filter(conn => conn.worst_avg_snr_db !== null); // Only return connections with at least one direction + + res.json({ + connections: connections, + }); + + } catch (err) { + console.error(err); + res.status(500).json({ + message: "Something went wrong, try again later.", + }); + } +}); + app.get('/api/v1/nodes/:nodeId/position-history', async (req, res) => { try { diff --git a/src/mqtt.js b/src/mqtt.js index f0f65bd..f7bf9b5 100644 --- a/src/mqtt.js +++ b/src/mqtt.js @@ -1094,6 +1094,70 @@ client.on("message", async (topic, message) => { console.error(e); } + // Extract edges from neighbour info + try { + const fromNodeId = envelope.packet.from; + const neighbors = neighbourInfo.neighbors || []; + const packetId = envelope.packet.id; + const channelId = envelope.channelId; + const gatewayId = envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null; + const edgesToCreate = []; + + for(const neighbour of neighbors) { + // Skip if no node ID + if(!neighbour.nodeId) { + continue; + } + + // Skip if SNR is invalid (0 or null/undefined) + // Note: SNR can be negative, so we check for 0 specifically + if(neighbour.snr === 0 || neighbour.snr == null) { + continue; + } + + const toNodeId = neighbour.nodeId; + const snr = neighbour.snr; + + // Fetch node positions from Node table + const [fromNode, toNode] = await Promise.all([ + prisma.node.findUnique({ + where: { node_id: fromNodeId }, + select: { latitude: true, longitude: true }, + }), + prisma.node.findUnique({ + where: { node_id: toNodeId }, + select: { latitude: true, longitude: true }, + }), + ]); + + // Create edge record + edgesToCreate.push({ + from_node_id: fromNodeId, + to_node_id: toNodeId, + snr: snr, + from_latitude: fromNode?.latitude ?? null, + from_longitude: fromNode?.longitude ?? null, + to_latitude: toNode?.latitude ?? null, + to_longitude: toNode?.longitude ?? null, + packet_id: packetId, + channel_id: channelId, + gateway_id: gatewayId, + source: "NEIGHBORINFO_APP", + }); + } + + // Bulk insert edges + if(edgesToCreate.length > 0) { + await prisma.edge.createMany({ + data: edgesToCreate, + skipDuplicates: true, // Skip if exact duplicate exists + }); + } + } catch (e) { + // Log error but don't crash - edge extraction is non-critical + console.error("Error extracting edges from neighbour info:", e); + } + // don't store all neighbour infos, but we want to update the existing node above if(!collectNeighbourInfo){ return; @@ -1336,6 +1400,160 @@ client.on("message", async (topic, message) => { console.error(e); } + // Extract edges from traceroute (only for response packets) + if(!envelope.packet.decoded.wantResponse) { + try { + const route = routeDiscovery.route || []; + const snrTowards = routeDiscovery.snrTowards || []; + const originNodeId = envelope.packet.to; + const destinationNodeId = envelope.packet.from; + const packetId = envelope.packet.id; + const channelId = envelope.channelId; + const gatewayId = envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null; + + // Determine number of edges: route.length + 1 + const numEdges = route.length + 1; + const edgesToCreate = []; + + // Extract edges from the route path + for(let i = 0; i < numEdges; i++) { + // Get SNR for this edge + if(i >= snrTowards.length) { + // Array length mismatch - skip this edge + continue; + } + + const snr = snrTowards[i]; + + // Skip if SNR is -128 (no SNR recorded) + if(snr === -128) { + continue; + } + + // Determine from_node and to_node + let fromNodeId, toNodeId; + + if(route.length === 0) { + // Empty route: direct connection (to -> from) + fromNodeId = originNodeId; + toNodeId = destinationNodeId; + } else if(i === 0) { + // First edge: origin -> route[0] + fromNodeId = originNodeId; + toNodeId = route[0]; + } else if(i === route.length) { + // Last edge: route[route.length-1] -> destination + fromNodeId = route[route.length - 1]; + toNodeId = destinationNodeId; + } else { + // Middle edge: route[i-1] -> route[i] + fromNodeId = route[i - 1]; + toNodeId = route[i]; + } + + // Fetch node positions from Node table + const [fromNode, toNode] = await Promise.all([ + prisma.node.findUnique({ + where: { node_id: fromNodeId }, + select: { latitude: true, longitude: true }, + }), + prisma.node.findUnique({ + where: { node_id: toNodeId }, + select: { latitude: true, longitude: true }, + }), + ]); + + // Create edge record (skip if nodes don't exist, but still create edge with null positions) + edgesToCreate.push({ + from_node_id: fromNodeId, + to_node_id: toNodeId, + snr: snr, + from_latitude: fromNode?.latitude ?? null, + from_longitude: fromNode?.longitude ?? null, + to_latitude: toNode?.latitude ?? null, + to_longitude: toNode?.longitude ?? null, + packet_id: packetId, + channel_id: channelId, + gateway_id: gatewayId, + source: "TRACEROUTE_APP", + }); + } + + // Extract edges from route_back path + const routeBack = routeDiscovery.routeBack || []; + const snrBack = routeDiscovery.snrBack || []; + + if(routeBack.length > 0) { + // Number of edges in route_back equals route_back.length + for(let i = 0; i < routeBack.length; i++) { + // Get SNR for this edge + if(i >= snrBack.length) { + // Array length mismatch - skip this edge + continue; + } + + const snr = snrBack[i]; + + // Skip if SNR is -128 (no SNR recorded) + if(snr === -128) { + continue; + } + + // Determine from_node and to_node + let fromNodeId, toNodeId; + + if(i === 0) { + // First edge: from -> route_back[0] + fromNodeId = destinationNodeId; // 'from' in the packet + toNodeId = routeBack[0]; + } else { + // Subsequent edges: route_back[i-1] -> route_back[i] + fromNodeId = routeBack[i - 1]; + toNodeId = routeBack[i]; + } + + // Fetch node positions from Node table + const [fromNode, toNode] = await Promise.all([ + prisma.node.findUnique({ + where: { node_id: fromNodeId }, + select: { latitude: true, longitude: true }, + }), + prisma.node.findUnique({ + where: { node_id: toNodeId }, + select: { latitude: true, longitude: true }, + }), + ]); + + // Create edge record + edgesToCreate.push({ + from_node_id: fromNodeId, + to_node_id: toNodeId, + snr: snr, + from_latitude: fromNode?.latitude ?? null, + from_longitude: fromNode?.longitude ?? null, + to_latitude: toNode?.latitude ?? null, + to_longitude: toNode?.longitude ?? null, + packet_id: packetId, + channel_id: channelId, + gateway_id: gatewayId, + source: "TRACEROUTE_APP", + }); + } + } + + // Bulk insert edges + if(edgesToCreate.length > 0) { + await prisma.edge.createMany({ + data: edgesToCreate, + skipDuplicates: true, // Skip if exact duplicate exists + }); + } + } catch (e) { + // Log error but don't crash - edge extraction is non-critical + console.error("Error extracting edges from traceroute:", e); + } + } + } else if(portnum === 73) { diff --git a/src/public/index.html b/src/public/index.html index 6a71a50..e9443d4 100644 --- a/src/public/index.html +++ b/src/public/index.html @@ -1372,33 +1372,41 @@ - +
- -
Traceroute edges older than this time are hidden. Reload to update map.
- + - - - - - + +
- +
- -
Neighbours further than this are hidden. Reload to update map.
- +
+
+ +
+ +
+
Colors the connection lines by the average SNR in the worst direction. Reload to update map.
+
+ + +
+ +
Connections further than this are hidden. Reload to update map.
+
@@ -1439,7 +1447,6 @@
Map will animate flying to nodes.
- @@ -1449,7 +1456,7 @@ - +