// Integral API — Apollo Server implementation (MVP) // This document contains multiple files you can copy into a project. /* File structure (paste into repo): integral-apollo-server/ ├─ package.json ├─ .env.example ├─ docker-compose.yml ├─ migrations/01_schema.sql ├─ src/ | ├─ index.js -> Apollo server entry | ├─ db.js -> Postgres pool | ├─ pubsub.js -> Redis-backed PubSub | ├─ schema.js -> GraphQL typeDefs | └─ resolvers.js -> resolvers wired to Postgres + PubSub └─ README.md Follow README to run. */ // ------------------------- // package.json // ------------------------- /* { "name": "integral-apollo-server", "version": "0.1.0", "main": "src/index.js", "scripts": { "start": "node src/index.js", "dev": "nodemon --watch src --exec node src/index.js" }, "engines": { "node": ">=18" }, "dependencies": { "apollo-server": "^3.11.1", "graphql": "^16.6.0", "pg": "^8.11.0", "ioredis": "^5.3.2", "graphql-redis-subscriptions": "^3.1.0", "uuid": "^9.0.0", "dotenv": "^16.0.3" }, "devDependencies": { "nodemon": "^2.0.22" } } */ // ------------------------- // .env.example // ------------------------- /* # Postgres DATABASE_URL=postgresql://integral:integral@postgres:5432/integral # Redis REDIS_URL=redis://redis:6379 # Server PORT=4000 # JWT (if you add auth later) JWT_SECRET=changeme */ // ------------------------- // docker-compose.yml // ------------------------- /* version: '3.8' services: postgres: image: postgres:15 environment: POSTGRES_DB: integral POSTGRES_USER: integral POSTGRES_PASSWORD: integral volumes: - pgdata:/var/lib/postgresql/data ports: - '5432:5432' redis: image: redis:7 command: ["redis-server", "--save", "60", "1", "--appendonly", "yes"] ports: - '6379:6379' app: build: . 
    image: integral-apollo-server:dev
    depends_on:
      - postgres
      - redis
    environment:
      - DATABASE_URL=${DATABASE_URL}
      - REDIS_URL=${REDIS_URL}
      - PORT=${PORT}
    volumes:
      - ./:/usr/src/app
    command: ["npm", "run", "dev"]
    ports:
      - '4000:4000'

volumes:
  pgdata:
*/

// -------------------------
// migrations/01_schema.sql
// -------------------------
/*
-- migrations/01_schema.sql
-- pgcrypto provides gen_random_uuid() used as the PK default below.
CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- Generic "objects" store; infrastructure faults are rows whose
-- `type` discriminates the object kind (see resolvers.js).
CREATE TABLE IF NOT EXISTS objects (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  namespace TEXT NOT NULL,
  type TEXT NOT NULL,
  timestamp TIMESTAMPTZ NOT NULL,
  location JSONB,
  severity INT,
  confirmed BOOLEAN DEFAULT FALSE,
  images JSONB,
  provenance JSONB,
  created_at TIMESTAMPTZ DEFAULT now()
);

-- Payouts reference a fault; amounts are stored in minor units
-- (cents) as BIGINT to avoid floating-point money.
CREATE TABLE IF NOT EXISTS payouts (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  fault_id UUID REFERENCES objects(id) ON DELETE CASCADE,
  amount_minor_units BIGINT NOT NULL,
  currency VARCHAR(8) NOT NULL DEFAULT 'ZAR',
  payee_id TEXT NOT NULL,
  status VARCHAR(32) NOT NULL DEFAULT 'created',
  tx_ref TEXT,
  created_at TIMESTAMPTZ DEFAULT now(),
  settled_at TIMESTAMPTZ
);

CREATE INDEX IF NOT EXISTS idx_objects_namespace ON objects(namespace);
CREATE INDEX IF NOT EXISTS idx_objects_confirmed ON objects(confirmed);
*/

// -------------------------
// src/db.js
// -------------------------
/* src/db.js */
// Thin Postgres access layer: one shared connection pool plus a
// `query` convenience helper. The raw `pool` is exported as well so
// callers can check out a dedicated client for multi-statement
// transactions (see settlePayout in resolvers.js).
const { Pool } = require('pg');
require('dotenv').config();

// Connection string comes from DATABASE_URL (.env in development).
const pool = new Pool({ connectionString: process.env.DATABASE_URL });

module.exports = { query: (text, params) => pool.query(text, params), pool };

// -------------------------
// src/pubsub.js
// -------------------------
/* src/pubsub.js */
// Redis-backed PubSub engine so GraphQL subscriptions are delivered
// across multiple app instances (an in-memory PubSub would only reach
// clients connected to the same process).
const { RedisPubSub } = require('graphql-redis-subscriptions');
const Redis = require('ioredis');
require('dotenv').config();

const options = {
  // Linear reconnect backoff, capped at 2 seconds between attempts.
  retryStrategy: times => Math.min(times * 50, 2000),
};

// Two connections: the library needs a dedicated subscriber connection
// separate from the one used for publishing.
const pubsub = new RedisPubSub({
  publisher: new Redis(process.env.REDIS_URL, options),
  subscriber: new Redis(process.env.REDIS_URL, options),
});

module.exports = pubsub;

// -------------------------
------------------------- // src/schema.js // ------------------------- /* src/schema.js */ const { gql } = require('apollo-server'); const typeDefs = gql` scalar DateTime scalar JSON enum Namespace { infrastructure satellite } type Location { lat: Float lon: Float } type Provenance { source: String! license: String retrieved_at: DateTime! } type InfrastructureFault { id: ID! namespace: Namespace! type: String! timestamp: DateTime! location: JSON severity: Int! confirmed: Boolean! images: [String] provenance: Provenance } type Payout { id: ID! faultId: ID! amountMinorUnits: Int! currency: String! payeeId: String! status: String! createdAt: DateTime! settledAt: DateTime txRef: String } input IngestFaultInput { namespace: Namespace! type: String! timestamp: DateTime! location: JSON severity: Int! images: [String] provenance: JSON } input CreatePayoutInput { faultId: ID! amountMinorUnits: Int! currency: String! payeeId: String! } type Query { listInfraFaults(limit: Int = 50, offset: Int = 0): [InfrastructureFault!] payoutsForFault(faultId: ID!): [Payout!] } type Mutation { ingestFault(input: IngestFaultInput!): InfrastructureFault! confirmFault(id: ID!, confirmed: Boolean!): InfrastructureFault! createPayout(input: CreatePayoutInput!): Payout! settlePayout(payoutId: ID!): Payout! } type Subscription { faultCreated: InfrastructureFault! faultConfirmed: InfrastructureFault! payoutUpdated: Payout! 
} `; module.exports = typeDefs; // ------------------------- // src/resolvers.js // ------------------------- /* src/resolvers.js */ const { GraphQLScalarType, Kind } = require('graphql'); const db = require('./db'); const pubsub = require('./pubsub'); const { v4: uuidv4 } = require('uuid'); const FAULT_CREATED = 'FAULT_CREATED'; const FAULT_CONFIRMED = 'FAULT_CONFIRMED'; const PAYOUT_UPDATED = 'PAYOUT_UPDATED'; const DateTime = new GraphQLScalarType({ name: 'DateTime', description: 'ISO date-time scalar', parseValue: value => new Date(value), serialize: value => value instanceof Date ? value.toISOString() : new Date(value).toISOString(), parseLiteral(ast) { if (ast.kind === Kind.STRING) return new Date(ast.value); return null; } }); const JSONScalar = new GraphQLScalarType({ name: 'JSON', description: 'Arbitrary JSON value', parseValue: value => value, serialize: value => value, parseLiteral(ast) { switch (ast.kind) { case Kind.STRING: return ast.value; case Kind.INT: return parseInt(ast.value, 10); case Kind.FLOAT: return parseFloat(ast.value); case Kind.BOOLEAN: return ast.value === 'true'; default: return null; } } }); const resolvers = { DateTime, JSON: JSONScalar, Query: { listInfraFaults: async (_, { limit, offset }) => { const r = await db.query('SELECT * FROM objects WHERE type=$1 ORDER BY created_at DESC LIMIT $2 OFFSET $3', ['pothole-detection', limit, offset]); return r.rows.map(r => ({ id: r.id, namespace: r.namespace, type: r.type, timestamp: r.timestamp, location: r.location, severity: r.severity, confirmed: r.confirmed, images: r.images, provenance: r.provenance })); }, payoutsForFault: async (_, { faultId }) => { const r = await db.query('SELECT * FROM payouts WHERE fault_id=$1 ORDER BY created_at DESC', [faultId]); return r.rows.map(p => ({ id: p.id, faultId: p.fault_id, amountMinorUnits: parseInt(p.amount_minor_units, 10), currency: p.currency, payeeId: p.payee_id, status: p.status, createdAt: p.created_at, settledAt: p.settled_at, txRef: 
p.tx_ref })); } }, Mutation: { ingestFault: async (_, { input }) => { const id = uuidv4(); const q = `INSERT INTO objects (id, namespace, type, timestamp, location, severity, images, provenance) VALUES ($1,$2,$3,$4,$5,$6,$7,$8) RETURNING *`; const vals = [id, input.namespace, input.type, input.timestamp || new Date().toISOString(), JSON.stringify(input.location || null), input.severity, JSON.stringify(input.images || []), input.provenance || {}]; const r = await db.query(q, vals); const obj = r.rows[0]; const payload = { id: obj.id, namespace: obj.namespace, type: obj.type, timestamp: obj.timestamp, location: obj.location, severity: obj.severity, confirmed: obj.confirmed, images: obj.images, provenance: obj.provenance }; await pubsub.publish(FAULT_CREATED, payload); return payload; }, confirmFault: async (_, { id, confirmed }) => { const q = 'UPDATE objects SET confirmed=$1 WHERE id=$2 RETURNING *'; const r = await db.query(q, [confirmed, id]); if (r.rowCount === 0) throw new Error('fault not found'); const obj = r.rows[0]; const payload = { id: obj.id, namespace: obj.namespace, type: obj.type, timestamp: obj.timestamp, location: obj.location, severity: obj.severity, confirmed: obj.confirmed, images: obj.images, provenance: obj.provenance }; await pubsub.publish(FAULT_CONFIRMED, payload); return payload; }, createPayout: async (_, { input }) => { const id = uuidv4(); const q = `INSERT INTO payouts (id, fault_id, amount_minor_units, currency, payee_id, status) VALUES ($1,$2,$3,$4,$5,'created') RETURNING *`; const vals = [id, input.faultId, input.amountMinorUnits, input.currency, input.payeeId]; const r = await db.query(q, vals); const p = r.rows[0]; const payload = { id: p.id, faultId: p.fault_id, amountMinorUnits: parseInt(p.amount_minor_units, 10), currency: p.currency, payeeId: p.payee_id, status: p.status, createdAt: p.created_at, settledAt: p.settled_at, txRef: p.tx_ref }; await pubsub.publish(PAYOUT_UPDATED, payload); return payload; }, settlePayout: async (_, 
{ payoutId }) => { // naive settlement flow; mark processing -> settled, attach fake tx const client = await db.pool.connect(); try { await client.query('BEGIN'); const r = await client.query('SELECT * FROM payouts WHERE id=$1 FOR UPDATE', [payoutId]); if (r.rowCount === 0) throw new Error('payout not found'); const payout = r.rows[0]; if (payout.status !== 'created' && payout.status !== 'processing') { await client.query('ROLLBACK'); return { id: payout.id, faultId: payout.fault_id, amountMinorUnits: parseInt(payout.amount_minor_units, 10), currency: payout.currency, payeeId: payout.payee_id, status: payout.status, createdAt: payout.created_at, settledAt: payout.settled_at, txRef: payout.tx_ref }; } await client.query('UPDATE payouts SET status=$1 WHERE id=$2', ['processing', payoutId]); // call external payment gateway / TMS here — replaced by fake const fakeTx = `TX-${Date.now()}-${Math.floor(Math.random()*1000)}`; await client.query('UPDATE payouts SET status=$1, tx_ref=$2, settled_at=now() WHERE id=$3', ['settled', fakeTx, payoutId]); await client.query('COMMIT'); const updated = (await db.query('SELECT * FROM payouts WHERE id=$1', [payoutId])).rows[0]; const payload = { id: updated.id, faultId: updated.fault_id, amountMinorUnits: parseInt(updated.amount_minor_units, 10), currency: updated.currency, payeeId: updated.payee_id, status: updated.status, createdAt: updated.created_at, settledAt: updated.settled_at, txRef: updated.tx_ref }; await pubsub.publish(PAYOUT_UPDATED, payload); return payload; } catch (err) { await client.query('ROLLBACK'); throw err; } finally { client.release(); } } }, Subscription: { faultCreated: { subscribe: () => pubsub.asyncIterator([FAULT_CREATED]) }, faultConfirmed: { subscribe: () => pubsub.asyncIterator([FAULT_CONFIRMED]) }, payoutUpdated: { subscribe: () => pubsub.asyncIterator([PAYOUT_UPDATED]) } } }; module.exports = resolvers; // ------------------------- // src/index.js // ------------------------- /* src/index.js */ const { 
ApolloServer } = require('apollo-server'); const typeDefs = require('./schema'); const resolvers = require('./resolvers'); require('dotenv').config(); async function start() { const server = new ApolloServer({ typeDefs, resolvers }); const { url } = await server.listen({ port: process.env.PORT || 4000 }); console.log(`🚀 Server ready at ${url}`); } start(); // ------------------------- // README.md // ------------------------- /* README.md # Integral Apollo Server (MVP) This repo is a minimal Apollo GraphQL server wired to Postgres + Redis as Pub/Sub. It implements the "self-pay network" flows: ingest fault → confirm → create payout → settle payout. ## Quickstart (Docker) 1. Copy `.env.example` to `.env` and adjust if needed. 2. Start services: docker-compose up -d 3. Run migrations into Postgres (example): docker exec -i $(docker ps -q -f name=postgres) psql -U integral -d integral -f migrations/01_schema.sql 4. Install dependencies and start app locally (or use the provided app container): npm install npm run dev 5. 
Open GraphQL playground at http://localhost:4000/ ## Basic GraphQL examples - Ingest fault (mutation) — will publish `faultCreated` subscription mutation Ingest { ingestFault(input:{ namespace: infrastructure, type: "pothole-detection", timestamp: "2025-11-30T18:00:00Z", severity: 2, location: {"lat": -29.12, "lon": 26.22} }) { id severity confirmed } } - Confirm fault (mutation) — will publish `faultConfirmed` and auto-create payout (in your custom flow if you wire it) mutation Confirm { confirmFault(id: "", confirmed: true) { id confirmed } } - Create payout mutation CreatePayout { createPayout(input:{ faultId: "", amountMinorUnits: 5000, currency: "ZAR", payeeId: "local-contractor-123" }) { id status } } - Settle payout mutation Settle { settlePayout(payoutId: "") { id status txRef } } ## Next steps - Replace fake logic with ML verification service (image segmentation + sensor fusion) - Integrate real TMS / payment gateway in `settlePayout` - Add auth & RBAC for protected mutations - Add audit trail table and signed provenance verification */