Merge pull request #68 from Roslund/Collect-edges
Ny Funktionalitet för kopplingar och signalstyrka
This commit is contained in:
commit
f690bb65a7
6 changed files with 785 additions and 550 deletions
3
package-lock.json
generated
3
package-lock.json
generated
|
|
@ -16,7 +16,8 @@
|
|||
"cors": "^2.8.5",
|
||||
"express": "^5.2.1",
|
||||
"mqtt": "^5.14.1",
|
||||
"protobufjs": "^7.5.4"
|
||||
"protobufjs": "^7.5.4",
|
||||
"ws": "^8.18.3"
|
||||
},
|
||||
"devDependencies": {
|
||||
"jest": "^30.1.3",
|
||||
|
|
|
|||
23
prisma/migrations/20260106151912_add_edges/migration.sql
Normal file
23
prisma/migrations/20260106151912_add_edges/migration.sql
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
-- CreateTable
|
||||
CREATE TABLE `edges` (
|
||||
`id` BIGINT NOT NULL AUTO_INCREMENT,
|
||||
`from_node_id` BIGINT NOT NULL,
|
||||
`to_node_id` BIGINT NOT NULL,
|
||||
`snr` INTEGER NOT NULL,
|
||||
`from_latitude` INTEGER NULL,
|
||||
`from_longitude` INTEGER NULL,
|
||||
`to_latitude` INTEGER NULL,
|
||||
`to_longitude` INTEGER NULL,
|
||||
`packet_id` BIGINT NOT NULL,
|
||||
`channel_id` VARCHAR(191) NULL,
|
||||
`gateway_id` BIGINT NULL,
|
||||
`source` VARCHAR(191) NOT NULL,
|
||||
`created_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
|
||||
`updated_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
|
||||
|
||||
INDEX `edges_from_node_id_idx`(`from_node_id`),
|
||||
INDEX `edges_to_node_id_idx`(`to_node_id`),
|
||||
INDEX `edges_created_at_idx`(`created_at`),
|
||||
INDEX `edges_from_node_id_to_node_id_idx`(`from_node_id`, `to_node_id`),
|
||||
PRIMARY KEY (`id`)
|
||||
) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
|
||||
|
|
@ -348,3 +348,27 @@ model ChannelUtilizationStats {
|
|||
@@index([recorded_at])
|
||||
@@map("channel_utilization_stats")
|
||||
}
|
||||
|
||||
model Edge {
|
||||
id BigInt @id @default(autoincrement())
|
||||
from_node_id BigInt
|
||||
to_node_id BigInt
|
||||
snr Int
|
||||
from_latitude Int?
|
||||
from_longitude Int?
|
||||
to_latitude Int?
|
||||
to_longitude Int?
|
||||
packet_id BigInt
|
||||
channel_id String?
|
||||
gateway_id BigInt?
|
||||
source String
|
||||
|
||||
created_at DateTime @default(now())
|
||||
updated_at DateTime @default(now()) @updatedAt
|
||||
|
||||
@@index(from_node_id)
|
||||
@@index(to_node_id)
|
||||
@@index(created_at)
|
||||
@@index([from_node_id, to_node_id])
|
||||
@@map("edges")
|
||||
}
|
||||
197
src/index.js
197
src/index.js
|
|
@ -155,6 +155,15 @@ app.get('/api', async (req, res) => {
|
|||
"time_to": "Only include traceroutes updated before this unix timestamp (milliseconds)"
|
||||
}
|
||||
},
|
||||
{
|
||||
"path": "/api/v1/connections",
|
||||
"description": "Aggregated edges between nodes from traceroutes",
|
||||
"params": {
|
||||
"node_id": "Only include connections involving this node id",
|
||||
"time_from": "Only include edges created after this unix timestamp (milliseconds)",
|
||||
"time_to": "Only include edges created before this unix timestamp (milliseconds)"
|
||||
}
|
||||
},
|
||||
{
|
||||
"path": "/api/v1/nodes/:nodeId/position-history",
|
||||
"description": "Position history for a meshtastic node",
|
||||
|
|
@ -698,6 +707,194 @@ app.get('/api/v1/traceroutes', async (req, res) => {
|
|||
}
|
||||
});
|
||||
|
||||
// Aggregated edges endpoint
|
||||
// GET /api/v1/connections?node_id=...&time_from=...&time_to=...
|
||||
app.get('/api/v1/connections', async (req, res) => {
|
||||
try {
|
||||
const nodeId = req.query.node_id ? parseInt(req.query.node_id) : undefined;
|
||||
const timeFrom = req.query.time_from ? parseInt(req.query.time_from) : undefined;
|
||||
const timeTo = req.query.time_to ? parseInt(req.query.time_to) : undefined;
|
||||
|
||||
// Query edges from database
|
||||
const edges = await prisma.edge.findMany({
|
||||
where: {
|
||||
created_at: {
|
||||
...(timeFrom && { gte: new Date(timeFrom) }),
|
||||
...(timeTo && { lte: new Date(timeTo) }),
|
||||
},
|
||||
// Only include edges where both nodes have positions
|
||||
from_latitude: { not: null },
|
||||
from_longitude: { not: null },
|
||||
to_latitude: { not: null },
|
||||
to_longitude: { not: null },
|
||||
// If node_id is provided, filter edges where either from_node_id or to_node_id matches
|
||||
...(nodeId !== undefined && {
|
||||
OR: [
|
||||
{ from_node_id: nodeId },
|
||||
{ to_node_id: nodeId },
|
||||
],
|
||||
}),
|
||||
},
|
||||
orderBy: [
|
||||
{ created_at: 'desc' },
|
||||
{ packet_id: 'desc' },
|
||||
],
|
||||
});
|
||||
|
||||
// Collect all unique node IDs from edges
|
||||
const nodeIds = new Set();
|
||||
for (const edge of edges) {
|
||||
nodeIds.add(edge.from_node_id);
|
||||
nodeIds.add(edge.to_node_id);
|
||||
}
|
||||
|
||||
// Fetch current positions for all nodes
|
||||
const nodes = await prisma.node.findMany({
|
||||
where: {
|
||||
node_id: { in: Array.from(nodeIds) },
|
||||
},
|
||||
select: {
|
||||
node_id: true,
|
||||
latitude: true,
|
||||
longitude: true,
|
||||
},
|
||||
});
|
||||
|
||||
// Create a map of current node positions
|
||||
const nodePositions = new Map();
|
||||
for (const node of nodes) {
|
||||
nodePositions.set(node.node_id, {
|
||||
latitude: node.latitude,
|
||||
longitude: node.longitude,
|
||||
});
|
||||
}
|
||||
|
||||
// Filter edges: only include edges where both nodes are still at the same location
|
||||
const validEdges = edges.filter(edge => {
|
||||
const fromCurrentPos = nodePositions.get(edge.from_node_id);
|
||||
const toCurrentPos = nodePositions.get(edge.to_node_id);
|
||||
|
||||
// Skip if either node doesn't exist or doesn't have a current position
|
||||
if (!fromCurrentPos || !toCurrentPos ||
|
||||
fromCurrentPos.latitude === null || fromCurrentPos.longitude === null ||
|
||||
toCurrentPos.latitude === null || toCurrentPos.longitude === null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check if stored positions match current positions
|
||||
const fromMatches = fromCurrentPos.latitude === edge.from_latitude &&
|
||||
fromCurrentPos.longitude === edge.from_longitude;
|
||||
const toMatches = toCurrentPos.latitude === edge.to_latitude &&
|
||||
toCurrentPos.longitude === edge.to_longitude;
|
||||
|
||||
return fromMatches && toMatches;
|
||||
});
|
||||
|
||||
// Normalize node pairs: always use min/max to treat A->B and B->A as same connection
|
||||
const connectionsMap = new Map();
|
||||
|
||||
for (const edge of validEdges) {
|
||||
const nodeA = edge.from_node_id < edge.to_node_id ? edge.from_node_id : edge.to_node_id;
|
||||
const nodeB = edge.from_node_id < edge.to_node_id ? edge.to_node_id : edge.from_node_id;
|
||||
const key = `${nodeA}-${nodeB}`;
|
||||
|
||||
if (!connectionsMap.has(key)) {
|
||||
connectionsMap.set(key, {
|
||||
node_a: nodeA,
|
||||
node_b: nodeB,
|
||||
direction_ab: [], // A -> B edges
|
||||
direction_ba: [], // B -> A edges
|
||||
});
|
||||
}
|
||||
|
||||
const connection = connectionsMap.get(key);
|
||||
const isAB = edge.from_node_id === nodeA;
|
||||
|
||||
// Add edge to appropriate direction
|
||||
if (isAB) {
|
||||
connection.direction_ab.push({
|
||||
snr: edge.snr,
|
||||
snr_db: edge.snr / 4, // Convert to dB
|
||||
created_at: edge.created_at,
|
||||
packet_id: edge.packet_id,
|
||||
source: edge.source,
|
||||
});
|
||||
} else {
|
||||
connection.direction_ba.push({
|
||||
snr: edge.snr,
|
||||
snr_db: edge.snr / 4,
|
||||
created_at: edge.created_at,
|
||||
packet_id: edge.packet_id,
|
||||
source: edge.source,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Aggregate each connection
|
||||
const connections = Array.from(connectionsMap.values()).map(conn => {
|
||||
// Deduplicate edges by packet_id for each direction (keep first occurrence, which is most recent)
|
||||
const dedupeByPacketId = (edges) => {
|
||||
const seen = new Set();
|
||||
return edges.filter(edge => {
|
||||
if (seen.has(edge.packet_id)) {
|
||||
return false;
|
||||
}
|
||||
seen.add(edge.packet_id);
|
||||
return true;
|
||||
});
|
||||
};
|
||||
|
||||
const deduplicatedAB = dedupeByPacketId(conn.direction_ab);
|
||||
const deduplicatedBA = dedupeByPacketId(conn.direction_ba);
|
||||
|
||||
// Calculate average SNR for A->B (using deduplicated edges)
|
||||
const avgSnrAB = deduplicatedAB.length > 0
|
||||
? deduplicatedAB.reduce((sum, e) => sum + e.snr_db, 0) / deduplicatedAB.length
|
||||
: null;
|
||||
|
||||
// Calculate average SNR for B->A (using deduplicated edges)
|
||||
const avgSnrBA = deduplicatedBA.length > 0
|
||||
? deduplicatedBA.reduce((sum, e) => sum + e.snr_db, 0) / deduplicatedBA.length
|
||||
: null;
|
||||
|
||||
// Get last 5 edges for each direction (already sorted by created_at DESC, packet_id DESC, now deduplicated)
|
||||
const last5AB = deduplicatedAB.slice(0, 5);
|
||||
const last5BA = deduplicatedBA.slice(0, 5);
|
||||
|
||||
// Determine worst average SNR
|
||||
const worstAvgSnrDb = [avgSnrAB, avgSnrBA]
|
||||
.filter(v => v !== null)
|
||||
.reduce((min, val) => val < min ? val : min, Infinity);
|
||||
|
||||
return {
|
||||
node_a: conn.node_a,
|
||||
node_b: conn.node_b,
|
||||
direction_ab: {
|
||||
avg_snr_db: avgSnrAB,
|
||||
last_5_edges: last5AB,
|
||||
total_count: deduplicatedAB.length, // Use deduplicated count
|
||||
},
|
||||
direction_ba: {
|
||||
avg_snr_db: avgSnrBA,
|
||||
last_5_edges: last5BA,
|
||||
total_count: deduplicatedBA.length, // Use deduplicated count
|
||||
},
|
||||
worst_avg_snr_db: worstAvgSnrDb !== Infinity ? worstAvgSnrDb : null,
|
||||
};
|
||||
}).filter(conn => conn.worst_avg_snr_db !== null); // Only return connections with at least one direction
|
||||
|
||||
res.json({
|
||||
connections: connections,
|
||||
});
|
||||
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
res.status(500).json({
|
||||
message: "Something went wrong, try again later.",
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
app.get('/api/v1/nodes/:nodeId/position-history', async (req, res) => {
|
||||
try {
|
||||
|
||||
|
|
|
|||
218
src/mqtt.js
218
src/mqtt.js
|
|
@ -1094,6 +1094,70 @@ client.on("message", async (topic, message) => {
|
|||
console.error(e);
|
||||
}
|
||||
|
||||
// Extract edges from neighbour info
|
||||
try {
|
||||
const fromNodeId = envelope.packet.from;
|
||||
const neighbors = neighbourInfo.neighbors || [];
|
||||
const packetId = envelope.packet.id;
|
||||
const channelId = envelope.channelId;
|
||||
const gatewayId = envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null;
|
||||
const edgesToCreate = [];
|
||||
|
||||
for(const neighbour of neighbors) {
|
||||
// Skip if no node ID
|
||||
if(!neighbour.nodeId) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Skip if SNR is invalid (0 or null/undefined)
|
||||
// Note: SNR can be negative, so we check for 0 specifically
|
||||
if(neighbour.snr === 0 || neighbour.snr == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const toNodeId = neighbour.nodeId;
|
||||
const snr = neighbour.snr;
|
||||
|
||||
// Fetch node positions from Node table
|
||||
const [fromNode, toNode] = await Promise.all([
|
||||
prisma.node.findUnique({
|
||||
where: { node_id: fromNodeId },
|
||||
select: { latitude: true, longitude: true },
|
||||
}),
|
||||
prisma.node.findUnique({
|
||||
where: { node_id: toNodeId },
|
||||
select: { latitude: true, longitude: true },
|
||||
}),
|
||||
]);
|
||||
|
||||
// Create edge record
|
||||
edgesToCreate.push({
|
||||
from_node_id: fromNodeId,
|
||||
to_node_id: toNodeId,
|
||||
snr: snr,
|
||||
from_latitude: fromNode?.latitude ?? null,
|
||||
from_longitude: fromNode?.longitude ?? null,
|
||||
to_latitude: toNode?.latitude ?? null,
|
||||
to_longitude: toNode?.longitude ?? null,
|
||||
packet_id: packetId,
|
||||
channel_id: channelId,
|
||||
gateway_id: gatewayId,
|
||||
source: "NEIGHBORINFO_APP",
|
||||
});
|
||||
}
|
||||
|
||||
// Bulk insert edges
|
||||
if(edgesToCreate.length > 0) {
|
||||
await prisma.edge.createMany({
|
||||
data: edgesToCreate,
|
||||
skipDuplicates: true, // Skip if exact duplicate exists
|
||||
});
|
||||
}
|
||||
} catch (e) {
|
||||
// Log error but don't crash - edge extraction is non-critical
|
||||
console.error("Error extracting edges from neighbour info:", e);
|
||||
}
|
||||
|
||||
// don't store all neighbour infos, but we want to update the existing node above
|
||||
if(!collectNeighbourInfo){
|
||||
return;
|
||||
|
|
@ -1336,6 +1400,160 @@ client.on("message", async (topic, message) => {
|
|||
console.error(e);
|
||||
}
|
||||
|
||||
// Extract edges from traceroute (only for response packets)
|
||||
if(!envelope.packet.decoded.wantResponse) {
|
||||
try {
|
||||
const route = routeDiscovery.route || [];
|
||||
const snrTowards = routeDiscovery.snrTowards || [];
|
||||
const originNodeId = envelope.packet.to;
|
||||
const destinationNodeId = envelope.packet.from;
|
||||
const packetId = envelope.packet.id;
|
||||
const channelId = envelope.channelId;
|
||||
const gatewayId = envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null;
|
||||
|
||||
// Determine number of edges: route.length + 1
|
||||
const numEdges = route.length + 1;
|
||||
const edgesToCreate = [];
|
||||
|
||||
// Extract edges from the route path
|
||||
for(let i = 0; i < numEdges; i++) {
|
||||
// Get SNR for this edge
|
||||
if(i >= snrTowards.length) {
|
||||
// Array length mismatch - skip this edge
|
||||
continue;
|
||||
}
|
||||
|
||||
const snr = snrTowards[i];
|
||||
|
||||
// Skip if SNR is -128 (no SNR recorded)
|
||||
if(snr === -128) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Determine from_node and to_node
|
||||
let fromNodeId, toNodeId;
|
||||
|
||||
if(route.length === 0) {
|
||||
// Empty route: direct connection (to -> from)
|
||||
fromNodeId = originNodeId;
|
||||
toNodeId = destinationNodeId;
|
||||
} else if(i === 0) {
|
||||
// First edge: origin -> route[0]
|
||||
fromNodeId = originNodeId;
|
||||
toNodeId = route[0];
|
||||
} else if(i === route.length) {
|
||||
// Last edge: route[route.length-1] -> destination
|
||||
fromNodeId = route[route.length - 1];
|
||||
toNodeId = destinationNodeId;
|
||||
} else {
|
||||
// Middle edge: route[i-1] -> route[i]
|
||||
fromNodeId = route[i - 1];
|
||||
toNodeId = route[i];
|
||||
}
|
||||
|
||||
// Fetch node positions from Node table
|
||||
const [fromNode, toNode] = await Promise.all([
|
||||
prisma.node.findUnique({
|
||||
where: { node_id: fromNodeId },
|
||||
select: { latitude: true, longitude: true },
|
||||
}),
|
||||
prisma.node.findUnique({
|
||||
where: { node_id: toNodeId },
|
||||
select: { latitude: true, longitude: true },
|
||||
}),
|
||||
]);
|
||||
|
||||
// Create edge record (skip if nodes don't exist, but still create edge with null positions)
|
||||
edgesToCreate.push({
|
||||
from_node_id: fromNodeId,
|
||||
to_node_id: toNodeId,
|
||||
snr: snr,
|
||||
from_latitude: fromNode?.latitude ?? null,
|
||||
from_longitude: fromNode?.longitude ?? null,
|
||||
to_latitude: toNode?.latitude ?? null,
|
||||
to_longitude: toNode?.longitude ?? null,
|
||||
packet_id: packetId,
|
||||
channel_id: channelId,
|
||||
gateway_id: gatewayId,
|
||||
source: "TRACEROUTE_APP",
|
||||
});
|
||||
}
|
||||
|
||||
// Extract edges from route_back path
|
||||
const routeBack = routeDiscovery.routeBack || [];
|
||||
const snrBack = routeDiscovery.snrBack || [];
|
||||
|
||||
if(routeBack.length > 0) {
|
||||
// Number of edges in route_back equals route_back.length
|
||||
for(let i = 0; i < routeBack.length; i++) {
|
||||
// Get SNR for this edge
|
||||
if(i >= snrBack.length) {
|
||||
// Array length mismatch - skip this edge
|
||||
continue;
|
||||
}
|
||||
|
||||
const snr = snrBack[i];
|
||||
|
||||
// Skip if SNR is -128 (no SNR recorded)
|
||||
if(snr === -128) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Determine from_node and to_node
|
||||
let fromNodeId, toNodeId;
|
||||
|
||||
if(i === 0) {
|
||||
// First edge: from -> route_back[0]
|
||||
fromNodeId = destinationNodeId; // 'from' in the packet
|
||||
toNodeId = routeBack[0];
|
||||
} else {
|
||||
// Subsequent edges: route_back[i-1] -> route_back[i]
|
||||
fromNodeId = routeBack[i - 1];
|
||||
toNodeId = routeBack[i];
|
||||
}
|
||||
|
||||
// Fetch node positions from Node table
|
||||
const [fromNode, toNode] = await Promise.all([
|
||||
prisma.node.findUnique({
|
||||
where: { node_id: fromNodeId },
|
||||
select: { latitude: true, longitude: true },
|
||||
}),
|
||||
prisma.node.findUnique({
|
||||
where: { node_id: toNodeId },
|
||||
select: { latitude: true, longitude: true },
|
||||
}),
|
||||
]);
|
||||
|
||||
// Create edge record
|
||||
edgesToCreate.push({
|
||||
from_node_id: fromNodeId,
|
||||
to_node_id: toNodeId,
|
||||
snr: snr,
|
||||
from_latitude: fromNode?.latitude ?? null,
|
||||
from_longitude: fromNode?.longitude ?? null,
|
||||
to_latitude: toNode?.latitude ?? null,
|
||||
to_longitude: toNode?.longitude ?? null,
|
||||
packet_id: packetId,
|
||||
channel_id: channelId,
|
||||
gateway_id: gatewayId,
|
||||
source: "TRACEROUTE_APP",
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Bulk insert edges
|
||||
if(edgesToCreate.length > 0) {
|
||||
await prisma.edge.createMany({
|
||||
data: edgesToCreate,
|
||||
skipDuplicates: true, // Skip if exact duplicate exists
|
||||
});
|
||||
}
|
||||
} catch (e) {
|
||||
// Log error but don't crash - edge extraction is non-critical
|
||||
console.error("Error extracting edges from traceroute:", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
else if(portnum === 73) {
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
Loading…
Add table
Add a link
Reference in a new issue