Adapters
chemistry
biology
finance
legal
art
climate
agent
Merge
Potex / integral_apollo_server.js
BACCHUS45's picture
Upload integral_apollo_server.js
aa67905 verified
// Integral API — Apollo Server implementation (MVP)
// This document contains multiple files you can copy into a project.
/*
File structure (paste into repo):
integral-apollo-server/
├─ package.json
├─ .env.example
├─ docker-compose.yml
├─ migrations/01_schema.sql
├─ src/
| ├─ index.js -> Apollo server entry
| ├─ db.js -> Postgres pool
| ├─ pubsub.js -> Redis-backed PubSub
| ├─ schema.js -> GraphQL typeDefs
| └─ resolvers.js -> resolvers wired to Postgres + PubSub
└─ README.md
Follow README to run.
*/
// -------------------------
// package.json
// -------------------------
/*
{
"name": "integral-apollo-server",
"version": "0.1.0",
"main": "src/index.js",
"scripts": {
"start": "node src/index.js",
"dev": "nodemon --watch src --exec node src/index.js"
},
"engines": { "node": ">=18" },
"dependencies": {
"apollo-server": "^3.11.1",
"graphql": "^16.6.0",
"pg": "^8.11.0",
"ioredis": "^5.3.2",
"graphql-redis-subscriptions": "^3.1.0",
"uuid": "^9.0.0",
"dotenv": "^16.0.3"
},
"devDependencies": {
"nodemon": "^2.0.22"
}
}
*/
// -------------------------
// .env.example
// -------------------------
/*
# Postgres
DATABASE_URL=postgresql://integral:integral@postgres:5432/integral
# Redis
REDIS_URL=redis://redis:6379
# Server
PORT=4000
# JWT (if you add auth later)
JWT_SECRET=changeme
*/
// -------------------------
// docker-compose.yml
// -------------------------
/*
version: '3.8'
services:
postgres:
image: postgres:15
environment:
POSTGRES_DB: integral
POSTGRES_USER: integral
POSTGRES_PASSWORD: integral
volumes:
- pgdata:/var/lib/postgresql/data
ports:
- '5432:5432'
redis:
image: redis:7
command: ["redis-server", "--save", "60", "1", "--appendonly", "yes"]
ports:
- '6379:6379'
app:
build: .
image: integral-apollo-server:dev
depends_on:
- postgres
- redis
environment:
- DATABASE_URL=${DATABASE_URL}
- REDIS_URL=${REDIS_URL}
- PORT=${PORT}
volumes:
- ./:/usr/src/app
command: ["npm", "run", "dev"]
ports:
- '4000:4000'
volumes:
pgdata:
*/
// -------------------------
// migrations/01_schema.sql
// -------------------------
/*
-- migrations/01_schema.sql
CREATE EXTENSION IF NOT EXISTS pgcrypto;
CREATE TABLE IF NOT EXISTS objects (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
namespace TEXT NOT NULL,
type TEXT NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
location JSONB,
severity INT,
confirmed BOOLEAN DEFAULT FALSE,
images JSONB,
provenance JSONB,
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE IF NOT EXISTS payouts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
fault_id UUID REFERENCES objects(id) ON DELETE CASCADE,
amount_minor_units BIGINT NOT NULL,
currency VARCHAR(8) NOT NULL DEFAULT 'ZAR',
payee_id TEXT NOT NULL,
status VARCHAR(32) NOT NULL DEFAULT 'created',
tx_ref TEXT,
created_at TIMESTAMPTZ DEFAULT now(),
settled_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_objects_namespace ON objects(namespace);
CREATE INDEX IF NOT EXISTS idx_objects_confirmed ON objects(confirmed);
*/
// -------------------------
// src/db.js
// -------------------------
/* src/db.js */
const { Pool } = require('pg');
require('dotenv').config();
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
module.exports = {
query: (text, params) => pool.query(text, params),
pool
};
// -------------------------
// src/pubsub.js
// -------------------------
/* src/pubsub.js */
const { RedisPubSub } = require('graphql-redis-subscriptions');
const Redis = require('ioredis');
require('dotenv').config();
// Connection settings shared by both Redis clients: the reconnect delay grows
// by 50 ms per attempt and is capped at 2000 ms.
const options = {
  retryStrategy(times) {
    return Math.min(times * 50, 2000);
  },
};

/**
 * Open a new ioredis connection to REDIS_URL using the shared options.
 * @returns {Redis} A fresh client instance.
 */
function createRedisConnection() {
  return new Redis(process.env.REDIS_URL, options);
}

// The publisher and the subscriber each get their own connection.
const pubsub = new RedisPubSub({
  publisher: createRedisConnection(),
  subscriber: createRedisConnection(),
});
module.exports = pubsub;
// -------------------------
// src/schema.js
// -------------------------
/* src/schema.js */
const { gql } = require('apollo-server');
/**
 * GraphQL schema (SDL) for the Integral API.
 *
 * Declares the custom scalars DateTime and JSON, the InfrastructureFault and
 * Payout object types with their input types, and the Query, Mutation and
 * Subscription root operations.
 *
 * NOTE(review): amountMinorUnits is declared as Int, which GraphQL defines as
 * a signed 32-bit integer, while payouts.amount_minor_units is BIGINT in the
 * migration — amounts above 2147483647 cannot be returned through this field.
 * NOTE(review): Provenance requires source and retrieved_at, but
 * IngestFaultInput accepts provenance as free-form JSON — confirm that stored
 * values always carry both keys before clients select those fields.
 * NOTE(review): type Location is declared but no field in this schema
 * references it (location fields use the JSON scalar).
 */
const typeDefs = gql`
scalar DateTime
scalar JSON
enum Namespace { infrastructure satellite }
type Location { lat: Float lon: Float }
type Provenance { source: String! license: String retrieved_at: DateTime! }
type InfrastructureFault {
id: ID!
namespace: Namespace!
type: String!
timestamp: DateTime!
location: JSON
severity: Int!
confirmed: Boolean!
images: [String]
provenance: Provenance
}
type Payout {
id: ID!
faultId: ID!
amountMinorUnits: Int!
currency: String!
payeeId: String!
status: String!
createdAt: DateTime!
settledAt: DateTime
txRef: String
}
input IngestFaultInput {
namespace: Namespace!
type: String!
timestamp: DateTime!
location: JSON
severity: Int!
images: [String]
provenance: JSON
}
input CreatePayoutInput {
faultId: ID!
amountMinorUnits: Int!
currency: String!
payeeId: String!
}
type Query {
listInfraFaults(limit: Int = 50, offset: Int = 0): [InfrastructureFault!]
payoutsForFault(faultId: ID!): [Payout!]
}
type Mutation {
ingestFault(input: IngestFaultInput!): InfrastructureFault!
confirmFault(id: ID!, confirmed: Boolean!): InfrastructureFault!
createPayout(input: CreatePayoutInput!): Payout!
settlePayout(payoutId: ID!): Payout!
}
type Subscription {
faultCreated: InfrastructureFault!
faultConfirmed: InfrastructureFault!
payoutUpdated: Payout!
}
`;
// Export the parsed schema document for the server entry point.
module.exports = typeDefs;
// -------------------------
// src/resolvers.js
// -------------------------
/* src/resolvers.js */
const { GraphQLScalarType, Kind } = require('graphql');
const db = require('./db');
const pubsub = require('./pubsub');
const { v4: uuidv4 } = require('uuid');
// Pub/Sub trigger names. Each mutation publishes its result under one of
// these, and the Subscription resolver of the matching field listens on the
// same name.
const FAULT_CREATED = 'FAULT_CREATED';
const FAULT_CONFIRMED = 'FAULT_CONFIRMED';
const PAYOUT_UPDATED = 'PAYOUT_UPDATED';
/**
 * Convert a raw value into a Date and reject anything that does not parse.
 *
 * `new Date('garbage')` does not throw; it yields an "Invalid Date" object
 * that would otherwise travel on to the database or fail later with an
 * unhelpful RangeError from toISOString().
 *
 * @param {*} value - Date instance, ISO string, or epoch milliseconds.
 * @returns {Date} A new, valid Date.
 * @throws {TypeError} When the value cannot be interpreted as a date-time.
 */
function toValidDate(value) {
  const date = new Date(value);
  if (Number.isNaN(date.getTime())) {
    throw new TypeError(`DateTime cannot represent an invalid date-time value: ${String(value)}`);
  }
  return date;
}

/**
 * DateTime scalar: accepts date-time input as variables or string literals
 * and serializes outgoing values as ISO-8601 strings.
 */
const DateTime = new GraphQLScalarType({
  name: 'DateTime',
  description: 'ISO date-time scalar',
  // Value supplied through query variables.
  parseValue: value => toValidDate(value),
  // Value sent to the client; accepts Date instances as well as strings
  // (for example payloads that were JSON-encoded on their way through Pub/Sub).
  serialize: value => toValidDate(value).toISOString(),
  // Value written inline in the query document. Non-string literals keep
  // yielding null, as before.
  parseLiteral(ast) {
    if (ast.kind === Kind.STRING) return toValidDate(ast.value);
    return null;
  }
});
/**
 * Recursively convert a GraphQL literal AST node into a plain JSON value.
 *
 * @param {object} ast - Value node from the parsed query document.
 * @param {object} [variables] - Operation variables, used to resolve variable
 *   references nested inside an inline object or list.
 * @returns {*} The equivalent JS value; null for unsupported node kinds.
 */
function parseJsonLiteral(ast, variables) {
  switch (ast.kind) {
    case Kind.STRING:
      return ast.value;
    case Kind.INT:
      return parseInt(ast.value, 10);
    case Kind.FLOAT:
      return parseFloat(ast.value);
    case Kind.BOOLEAN:
      // graphql-js stores boolean literals as real booleans; the string form
      // is still accepted so hand-built nodes keep working.
      return ast.value === true || ast.value === 'true';
    case Kind.NULL:
      return null;
    case Kind.LIST:
      return ast.values.map(item => parseJsonLiteral(item, variables));
    case Kind.OBJECT:
      // Object.fromEntries defines own data properties, so a key such as
      // "__proto__" cannot alter the prototype of the result.
      return Object.fromEntries(
        ast.fields.map(field => [field.name.value, parseJsonLiteral(field.value, variables)])
      );
    case Kind.VARIABLE: {
      const name = ast.name.value;
      const isProvided = variables != null && Object.prototype.hasOwnProperty.call(variables, name);
      return isProvided ? variables[name] : null;
    }
    default:
      return null;
  }
}

/**
 * JSON scalar: passes values through unchanged when they arrive as variables
 * or leave as results, and rebuilds nested objects/lists from inline literals.
 */
const JSONScalar = new GraphQLScalarType({
  name: 'JSON',
  description: 'Arbitrary JSON value',
  parseValue: value => value,
  serialize: value => value,
  parseLiteral(ast, variables) {
    return parseJsonLiteral(ast, variables);
  }
});
/**
 * Map a row of the `objects` table onto the InfrastructureFault shape.
 * @param {object} row - Row as returned by node-postgres.
 * @returns {object} Fault object used for responses and Pub/Sub events.
 */
function toFault(row) {
  return {
    id: row.id,
    namespace: row.namespace,
    type: row.type,
    timestamp: row.timestamp,
    location: row.location,
    severity: row.severity,
    confirmed: row.confirmed,
    images: row.images,
    provenance: row.provenance,
  };
}

/**
 * Map a row of the `payouts` table onto the Payout shape (snake_case columns
 * to camelCase fields; amount_minor_units is parsed into a JS number).
 * @param {object} row - Row as returned by node-postgres.
 * @returns {object} Payout object used for responses and Pub/Sub events.
 */
function toPayout(row) {
  return {
    id: row.id,
    faultId: row.fault_id,
    amountMinorUnits: parseInt(row.amount_minor_units, 10),
    currency: row.currency,
    payeeId: row.payee_id,
    status: row.status,
    createdAt: row.created_at,
    settledAt: row.settled_at,
    txRef: row.tx_ref,
  };
}

/**
 * Publish an event without letting a Pub/Sub failure fail the mutation.
 *
 * Every caller publishes only after its database write has been persisted.
 * Reporting an error at that point would invite the client to retry a write
 * that already happened (for instance a second payout), so the failure is
 * logged and the mutation still returns its result.
 *
 * @param {string} trigger - Trigger name to publish on.
 * @param {object} payload - Event payload.
 * @returns {Promise<void>}
 */
async function publishSafely(trigger, payload) {
  try {
    await pubsub.publish(trigger, payload);
  } catch (err) {
    console.error(`Failed to publish ${trigger} event`, err);
  }
}

// Events are published as the bare fault/payout object rather than wrapped
// under the subscription field name, so every Subscription field needs an
// explicit resolver handing the payload back; the default resolver would look
// for payload[fieldName] and find nothing.
const passThroughPayload = payload => payload;

/**
 * Resolver map: custom scalars plus the Query, Mutation and Subscription
 * roots, backed by Postgres (`db`) and Redis Pub/Sub (`pubsub`).
 */
const resolvers = {
  DateTime,
  JSON: JSONScalar,
  Query: {
    /** Page through stored faults of type "pothole-detection", newest first. */
    listInfraFaults: async (_, { limit, offset }) => {
      const result = await db.query(
        'SELECT * FROM objects WHERE type=$1 ORDER BY created_at DESC LIMIT $2 OFFSET $3',
        ['pothole-detection', limit, offset]
      );
      return result.rows.map(toFault);
    },
    /** List every payout attached to one fault, newest first. */
    payoutsForFault: async (_, { faultId }) => {
      const result = await db.query(
        'SELECT * FROM payouts WHERE fault_id=$1 ORDER BY created_at DESC',
        [faultId]
      );
      return result.rows.map(toPayout);
    }
  },
  Mutation: {
    /** Store a new fault and announce it on FAULT_CREATED. */
    ingestFault: async (_, { input }) => {
      const id = uuidv4();
      const q = `INSERT INTO objects (id, namespace, type, timestamp, location, severity, images, provenance)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8) RETURNING *`;
      const vals = [
        id,
        input.namespace,
        input.type,
        input.timestamp || new Date().toISOString(),
        // JSONB parameters are serialized explicitly: node-postgres would
        // encode a JS array as a Postgres array literal, not as JSON.
        JSON.stringify(input.location || null),
        input.severity,
        JSON.stringify(input.images || []),
        input.provenance || {}
      ];
      const result = await db.query(q, vals);
      const fault = toFault(result.rows[0]);
      await publishSafely(FAULT_CREATED, fault);
      return fault;
    },
    /**
     * Set the confirmed flag on a fault and announce it on FAULT_CONFIRMED.
     * @throws {Error} 'fault not found' when no row has the given id.
     */
    confirmFault: async (_, { id, confirmed }) => {
      const result = await db.query(
        'UPDATE objects SET confirmed=$1 WHERE id=$2 RETURNING *',
        [confirmed, id]
      );
      if (result.rowCount === 0) throw new Error('fault not found');
      const fault = toFault(result.rows[0]);
      await publishSafely(FAULT_CONFIRMED, fault);
      return fault;
    },
    /** Insert a payout in status 'created' and announce it on PAYOUT_UPDATED. */
    createPayout: async (_, { input }) => {
      const id = uuidv4();
      const q = `INSERT INTO payouts (id, fault_id, amount_minor_units, currency, payee_id, status)
VALUES ($1,$2,$3,$4,$5,'created') RETURNING *`;
      const vals = [id, input.faultId, input.amountMinorUnits, input.currency, input.payeeId];
      const result = await db.query(q, vals);
      const payout = toPayout(result.rows[0]);
      await publishSafely(PAYOUT_UPDATED, payout);
      return payout;
    },
    /**
     * Naive settlement flow: lock the payout row, mark it processing and then
     * settled with a fake transaction reference, all in one transaction.
     * A payout that is neither 'created' nor 'processing' is returned as-is.
     * @throws {Error} 'payout not found' when no row has the given id.
     */
    settlePayout: async (_, { payoutId }) => {
      const client = await db.pool.connect();
      let settled;
      try {
        await client.query('BEGIN');
        // FOR UPDATE locks the row so concurrent settlements are serialized.
        const found = await client.query('SELECT * FROM payouts WHERE id=$1 FOR UPDATE', [payoutId]);
        if (found.rowCount === 0) throw new Error('payout not found');
        const current = found.rows[0];
        if (current.status !== 'created' && current.status !== 'processing') {
          await client.query('ROLLBACK');
          return toPayout(current);
        }
        await client.query('UPDATE payouts SET status=$1 WHERE id=$2', ['processing', payoutId]);
        // call external payment gateway / TMS here — replaced by fake
        const fakeTx = `TX-${Date.now()}-${Math.floor(Math.random() * 1000)}`;
        // RETURNING hands back the final row from inside the transaction, so
        // no separate read is needed after the commit.
        const updated = await client.query(
          'UPDATE payouts SET status=$1, tx_ref=$2, settled_at=now() WHERE id=$3 RETURNING *',
          ['settled', fakeTx, payoutId]
        );
        await client.query('COMMIT');
        settled = toPayout(updated.rows[0]);
      } catch (err) {
        try {
          await client.query('ROLLBACK');
        } catch (rollbackErr) {
          // Keep the original failure as the error reported to the caller.
          console.error('Failed to roll back payout settlement', rollbackErr);
        }
        throw err;
      } finally {
        client.release();
      }
      // Published only after the commit and outside the transaction block, so
      // a notification problem can no longer trigger a ROLLBACK or report a
      // failure for a payout that has in fact been settled.
      await publishSafely(PAYOUT_UPDATED, settled);
      return settled;
    }
  },
  Subscription: {
    faultCreated: {
      subscribe: () => pubsub.asyncIterator([FAULT_CREATED]),
      resolve: passThroughPayload
    },
    faultConfirmed: {
      subscribe: () => pubsub.asyncIterator([FAULT_CONFIRMED]),
      resolve: passThroughPayload
    },
    payoutUpdated: {
      subscribe: () => pubsub.asyncIterator([PAYOUT_UPDATED]),
      resolve: passThroughPayload
    }
  }
};
module.exports = resolvers;
// -------------------------
// src/index.js
// -------------------------
/* src/index.js */
const { ApolloServer } = require('apollo-server');
const typeDefs = require('./schema');
const resolvers = require('./resolvers');
require('dotenv').config();
/**
 * Build the Apollo server from the schema and resolver map and start
 * listening on process.env.PORT (4000 when unset), then log the URL.
 *
 * NOTE(review): package.json pins apollo-server ^3. The Apollo Server 3
 * documentation states that the standalone `apollo-server` package does not
 * serve subscriptions, so the Subscription type is presumably unreachable
 * over this server until a WebSocket transport is added — confirm.
 *
 * @returns {Promise<void>} Resolves once the server is listening.
 */
async function start() {
  const server = new ApolloServer({ typeDefs, resolvers });
  const { url } = await server.listen({ port: process.env.PORT || 4000 });
  console.log(`🚀 Server ready at ${url}`);
}

// Handle startup failures explicitly (port in use, invalid schema, ...)
// instead of leaving the promise floating. Exit directly because open
// Postgres/Redis connections would otherwise keep the process alive.
start().catch(err => {
  console.error('Failed to start server', err);
  process.exit(1);
});
// -------------------------
// README.md
// -------------------------
/* README.md
# Integral Apollo Server (MVP)
This repo is a minimal Apollo GraphQL server wired to Postgres + Redis as Pub/Sub.
It implements the "self-pay network" flows: ingest fault → confirm → create payout → settle payout.
## Quickstart (Docker)
1. Copy `.env.example` to `.env` and adjust if needed.
2. Start services:
docker-compose up -d
3. Run migrations into Postgres (example):
docker exec -i $(docker ps -q -f name=postgres) psql -U integral -d integral < migrations/01_schema.sql
4. Install dependencies and start app locally (or use the provided app container):
npm install
npm run dev
5. Open GraphQL playground at http://localhost:4000/
## Basic GraphQL examples
- Ingest fault (mutation) — will publish `faultCreated` subscription
mutation Ingest {
ingestFault(input:{ namespace: infrastructure, type: "pothole-detection", timestamp: "2025-11-30T18:00:00Z", severity: 2, location: { lat: -29.12, lon: 26.22 } }) {
id
severity
confirmed
}
}
- Confirm fault (mutation) — publishes `faultConfirmed`. No payout is created automatically: call `createPayout` yourself, or wire payout creation into your own confirmation flow.
mutation Confirm { confirmFault(id: "<id>", confirmed: true) { id confirmed } }
- Create payout
mutation CreatePayout { createPayout(input:{ faultId: "<id>", amountMinorUnits: 5000, currency: "ZAR", payeeId: "local-contractor-123" }) { id status }
}
- Settle payout
mutation Settle { settlePayout(payoutId: "<payoutId>") { id status txRef }
}
## Next steps
- Replace fake logic with ML verification service (image segmentation + sensor fusion)
- Integrate real TMS / payment gateway in `settlePayout`
- Add auth & RBAC for protected mutations
- Add audit trail table and signed provenance verification
*/