Files
at-mintel/packages/gitea-mcp/src/index.ts
Marc Mintel 72556af24c
Some checks failed
Monorepo Pipeline / ⚡ Prioritize Release (push) Successful in 1s
Monorepo Pipeline / 🧪 Test (push) Failing after 54s
Monorepo Pipeline / 🧹 Lint (push) Failing after 2m36s
Monorepo Pipeline / 🏗️ Build (push) Successful in 3m27s
Monorepo Pipeline / 🚀 Release (push) Has been skipped
Monorepo Pipeline / 🐳 Build Gatekeeper (Product) (push) Has been skipped
Monorepo Pipeline / 🐳 Build Build-Base (push) Has been skipped
Monorepo Pipeline / 🐳 Build Production Runtime (push) Has been skipped
fix(mcp): handle gitea api envelope responses and add safety checks
2026-03-02 12:53:11 +01:00

267 lines
9.1 KiB
TypeScript

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
ListResourcesRequestSchema,
ReadResourceRequestSchema,
SubscribeRequestSchema,
UnsubscribeRequestSchema,
Tool,
Resource,
} from "@modelcontextprotocol/sdk/types.js";
import { z } from "zod";
import axios from "axios";
const GITEA_HOST = process.env.GITEA_HOST || "https://git.infra.mintel.me";
const GITEA_ACCESS_TOKEN = process.env.GITEA_ACCESS_TOKEN;
if (!GITEA_ACCESS_TOKEN) {
console.error("Error: GITEA_ACCESS_TOKEN environment variable is required");
process.exit(1);
}
const giteaClient = axios.create({
baseURL: `${GITEA_HOST.replace(/\/$/, '')}/api/v1`,
headers: {
Authorization: `token ${GITEA_ACCESS_TOKEN}`,
},
});
const LIST_PIPELINES_TOOL: Tool = {
name: "gitea_list_pipelines",
description: "List recent action runs (pipelines) for a specific repository",
inputSchema: {
type: "object",
properties: {
owner: { type: "string", description: "Repository owner (e.g., 'mmintel')" },
repo: { type: "string", description: "Repository name (e.g., 'at-mintel')" },
limit: { type: "number", description: "Number of runs to fetch (default: 5)" },
},
required: ["owner", "repo"],
},
};
const GET_PIPELINE_LOGS_TOOL: Tool = {
name: "gitea_get_pipeline_logs",
description: "Get detailed logs for a specific pipeline run or job",
inputSchema: {
type: "object",
properties: {
owner: { type: "string", description: "Repository owner" },
repo: { type: "string", description: "Repository name" },
run_id: { type: "number", description: "ID of the action run" },
},
required: ["owner", "repo", "run_id"],
},
};
// Subscription State
const subscriptions = new Set<string>();
const runStatusCache = new Map<string, string>(); // uri -> status
const server = new Server(
{
name: "gitea-mcp-native",
version: "1.0.0",
},
{
capabilities: {
tools: {},
resources: { subscribe: true }, // Enable subscriptions
},
}
);
// --- Tools ---
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [LIST_PIPELINES_TOOL, GET_PIPELINE_LOGS_TOOL],
};
});
server.setRequestHandler(CallToolRequestSchema, async (request) => {
if (request.params.name === "gitea_list_pipelines") {
// ... (Keeping exact same implementation as before for brevity)
const { owner, repo, limit = 5 } = request.params.arguments as any;
try {
const runsResponse = await giteaClient.get(`/repos/${owner}/${repo}/actions/runs`, {
params: { limit },
});
const runs = (runsResponse.data.workflow_runs || []) as any[];
const enhancedRuns = await Promise.all(
runs.map(async (run: any) => {
try {
const jobsResponse = await giteaClient.get(`/repos/${owner}/${repo}/actions/runs/${run.id}/jobs`);
const jobs = (jobsResponse.data.jobs || []) as any[];
return {
id: run.id,
name: run.name,
status: run.status,
created_at: run.created_at,
jobs: jobs.map((job: any) => ({
id: job.id,
name: job.name,
status: job.status,
conclusion: job.conclusion
}))
};
} catch (e) {
return { id: run.id, name: run.name, status: run.status, created_at: run.created_at, jobs: [] };
}
})
);
return {
content: [{ type: "text", text: JSON.stringify(enhancedRuns, null, 2) }],
};
} catch (error: any) {
return { isError: true, content: [{ type: "text", text: `Error fetching pipelines: ${error.message}` }] };
}
}
if (request.params.name === "gitea_get_pipeline_logs") {
const { owner, repo, run_id } = request.params.arguments as any;
try {
const jobsResponse = await giteaClient.get(`/repos/${owner}/${repo}/actions/runs/${run_id}/jobs`);
const jobs = (jobsResponse.data.jobs || []) as any[];
const logs = jobs.map((job: any) => ({
job_id: job.id,
job_name: job.name,
status: job.status,
conclusion: job.conclusion,
steps: (job.steps || []).map((step: any) => ({
name: step.name,
status: step.status,
conclusion: step.conclusion
}))
}));
return { content: [{ type: "text", text: JSON.stringify(logs, null, 2) }] };
} catch (error: any) {
return { isError: true, content: [{ type: "text", text: `Error: ${error.message}` }] };
}
}
throw new Error(`Unknown tool: ${request.params.name}`);
});
// --- Resources & Subscriptions ---
// We will expose a dynamic resource URI pattern: gitea://runs/{owner}/{repo}/{run_id}
server.setRequestHandler(ListResourcesRequestSchema, async () => {
return {
resources: [
{
uri: "gitea://runs",
name: "Gitea Pipeline Runs",
description: "Dynamic resource for subscribing to pipeline runs. Format: gitea://runs/{owner}/{repo}/{run_id}",
mimeType: "application/json",
}
],
};
});
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
const uri = request.params.uri;
const match = uri.match(/^gitea:\/\/runs\/([^\/]+)\/([^\/]+)\/(\d+)$/);
if (!match) {
throw new Error(`Invalid resource URI. Must be gitea://runs/{owner}/{repo}/{run_id}`);
}
const [, owner, repo, run_id] = match;
try {
const runResponse = await giteaClient.get(`/repos/${owner}/${repo}/actions/runs/${run_id}`);
const jobsResponse = await giteaClient.get(`/repos/${owner}/${repo}/actions/runs/${run_id}/jobs`);
const resourceContent = {
run: runResponse.data,
jobs: jobsResponse.data
};
// Update internal cache when read
runStatusCache.set(uri, runResponse.data.status);
return {
contents: [
{
uri,
mimeType: "application/json",
text: JSON.stringify(resourceContent, null, 2),
},
],
};
} catch (error: any) {
throw new Error(`Failed to read Gitea resource: ${error.message}`);
}
});
server.setRequestHandler(SubscribeRequestSchema, async (request) => {
const uri = request.params.uri;
if (!uri.startsWith("gitea://runs/")) {
throw new Error("Only gitea://runs resources can be subscribed to");
}
subscriptions.add(uri);
console.error(`Client subscribed to ${uri}`);
return {};
});
server.setRequestHandler(UnsubscribeRequestSchema, async (request) => {
const uri = request.params.uri;
subscriptions.delete(uri);
console.error(`Client unsubscribed from ${uri}`);
return {};
});
// The server polling mechanism that pushes updates to subscribed clients
async function pollSubscriptions() {
for (const uri of subscriptions) {
const match = uri.match(/^gitea:\/\/runs\/([^\/]+)\/([^\/]+)\/(\d+)$/);
if (!match) continue;
const [, owner, repo, run_id] = match;
try {
const runResponse = await giteaClient.get(`/repos/${owner}/${repo}/actions/runs/${run_id}`);
const currentStatus = runResponse.data.status;
const prevStatus = runStatusCache.get(uri);
// If status changed (e.g. running -> completed), notify client
if (prevStatus !== currentStatus) {
runStatusCache.set(uri, currentStatus);
server.notification({
method: "notifications/resources/updated",
params: { uri }
});
console.error(`Pushed update for ${uri}: ${prevStatus} -> ${currentStatus}`);
// Auto-unsubscribe if completed/failed so we don't poll forever?
// Let the client decide, or we can handle it here if requested.
}
} catch (e) {
console.error(`Error polling subscription ${uri}:`, e);
}
}
// Poll every 5 seconds
setTimeout(pollSubscriptions, 5000);
}
async function run() {
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("Gitea MCP Native Server running on stdio");
// Start the background poller
pollSubscriptions();
}
run().catch((error) => {
console.error("Fatal error:", error);
process.exit(1);
});