Skip to content

v1

V1 Endpoints

Overview

This API provides endpoints for securely storing, retrieving, and deleting Snyk credentials using OpenBao (Vault) and Redis as backends. All endpoints are versioned under /credentials and /cache.


Authentication

This API does not implement authentication by default. If you deploy in production, you should secure the API (e.g., with a reverse proxy, network policy, or FastAPI dependencies).


Error Handling

  • All endpoints return JSON responses.
  • On error, the response will include an error field and an appropriate HTTP status code (e.g., 400, 404, 500, 503).

Example error response:

{
  "error": "Vault is sealed, cannot store credentials."
}

Endpoint Overview

delete_cache_key async

Deletes the Snyk auth token for the specified org/client from Redis.

Parameters:

Name Type Description Default
org_id str

The organization ID.

required
client_id str

The client ID.

required

Returns:

Name Type Description
JSONResponse JSONResponse

A confirmation message indicating the auth token was deleted.

Source code in snykey/api/v1/endpoints.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
@router.delete("/cache")
async def delete_cache_key(org_id: str, client_id: str) -> JSONResponse:
    """
    Deletes the Snyk auth token for the specified org/client from Redis.

    Args:
        org_id (str): The organization ID.
        client_id (str): The client ID.

    Returns:
        JSONResponse: A confirmation message indicating the auth token was deleted.
    """

    org_id = org_id.strip()
    client_id = client_id.strip()

    response: dict = await redis.delete_auth_token(org_id, client_id)

    return JSONResponse(content=response)

delete_credentials async

Delete Snyk credentials for a given org_id and client_id.

Parameters:

Name Type Description Default
org_id str

The organization ID.

required
client_id str

The client ID.

required

Returns:

Name Type Description
JSONResponse JSONResponse

A response indicating success or failure of the deletion.

Source code in snykey/api/v1/endpoints.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
@router.delete("/credentials")
async def delete_credentials(org_id: str, client_id: str) -> JSONResponse:
    """
    Delete Snyk credentials for a given org_id and client_id.

    Args:
        org_id (str): The organization ID.
        client_id (str): The client ID.

    Returns:
        JSONResponse: A response indicating success or failure of the deletion.
    """

    org_id = org_id.strip()
    client_id = client_id.strip()

    if not org_id or not client_id:
        return JSONResponse(
            status_code=400, content={"error": "org_id and client_id are required"}
        )

    # Delete auth token from Redis
    logger.info(
        "Deleting auth token from Redis for org_id: %s, client_id: %s",
        org_id,
        client_id,
    )

    await redis.delete_auth_token(org_id, client_id)

    await openbao.delete_refresh_key(org_id, client_id)

    return JSONResponse(content={"message": "Credentials deleted."})

get_credentials async

Gather Snyk credentials using the provided org_id and client_id.

Parameters:

Name Type Description Default
org_id str

The organization ID.

required
client_id str

The client ID.

required

Returns:

Name Type Description
JSONResponse JSONResponse

A response containing the gathered credentials or an error message.

Source code in snykey/api/v1/endpoints.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
@router.get("/credentials")
async def get_credentials(
    org_id: str, client_id: str, client_secret: str
) -> JSONResponse:
    """
    Gather Snyk credentials using the provided org_id and client_id.

    Args:
        org_id (str): The organization ID.
        client_id (str): The client ID.

    Returns:
        JSONResponse: A response containing the gathered credentials or an error message.
    """

    org_id = org_id.strip()
    client_id = client_id.strip()
    client_secret = client_secret.strip()

    # Check if auth token exists in Redis
    logger.info(
        "Checking Redis for auth token for org_id: %s, client_id: %s",
        org_id,
        client_id,
    )

    auth_token: bytes | None = None

    try:
        auth_token = await redis.get_auth_token(org_id, client_id)
    except Exception as e:
        logger.error("Failed to retrieve auth token from Redis: %s", e)

    if auth_token:
        logger.info("Found auth token in Redis")
        return JSONResponse(content={"access_token": str(auth_token.decode())})

    # Get refresh key from OpenBao
    logger.info(
        "Gathering Snyk credentials for org_id: %s, client_id: %s",
        org_id,
        client_id,
    )

    refresh_key: str | None = None

    try:
        refresh_key = await openbao.get_refresh_key(org_id, client_id)
    except Exception as e:
        logger.error("Failed to retrieve refresh key from OpenBao: %s", e)

    if not refresh_key:
        return JSONResponse(
            status_code=404, content={"error": "No refresh key found for org/client"}
        )

    # Refresh Snyk token
    logger.info(
        "Refreshing Snyk token for org_id: %s, client_id: %s", org_id, client_id
    )

    expires_in: int = 3600

    try:
        result: dict = await snyk.refresh_snyk_token(
            client_id, client_secret, refresh_key
        )

        logger.info(
            "Successfully refreshed Snyk token for org_id: %s, client_id: %s",
            org_id,
            client_id,
        )

        expires_in = result.get("expires_in", 3600)

        await openbao.update_refresh_key(org_id, client_id, result["refresh_token"])
    except Exception as e:
        logger.error("Failed to refresh Snyk token: %s", e)
        return JSONResponse(status_code=500, content={"error": str(e)})

    try:
        await redis.store_auth_token(
            org_id,
            client_id,
            str(result["access_token"]),
            expiration=min(settings.REDIS_CACHE_TIME, expires_in),
        )
    except Exception as e:
        logger.error("Failed to store auth token in Redis: %s", e)
        return JSONResponse(status_code=500, content={"error": str(e)})

    return JSONResponse(content={"access_token": str(result["access_token"])})

oauth_callback async

OAuth callback endpoint to handle the authorization code flow.

This endpoint receives the authorization code from Snyk, retrieves the PKCE data (which includes org_id), exchanges the code for tokens, and stores the refresh token in OpenBao.

Parameters:

Name Type Description Default
code str

The authorization code from Snyk.

Query(..., description='Authorization code from Snyk')
state str

The state parameter for CSRF protection.

Query(..., description='State parameter for CSRF protection')
instance str

The Snyk instance (default: api.snyk.io).

Query(default='api.snyk.io', description='Snyk instance')

Returns:

Name Type Description
JSONResponse JSONResponse

A response indicating success or failure.

Source code in snykey/api/v1/endpoints.py
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
@router.get("/callback")
async def oauth_callback(
    code: str = Query(..., description="Authorization code from Snyk"),
    state: str = Query(..., description="State parameter for CSRF protection"),
    instance: str = Query(default="api.snyk.io", description="Snyk instance"),
) -> JSONResponse:
    """
    OAuth callback endpoint to handle the authorization code flow.

    This endpoint receives the authorization code from Snyk, retrieves the PKCE data
    (which includes org_id), exchanges the code for tokens, and stores the refresh token in OpenBao.

    Args:
        code (str): The authorization code from Snyk.
        state (str): The state parameter for CSRF protection.
        instance (str): The Snyk instance (default: api.snyk.io).

    Returns:
        JSONResponse: A response indicating success or failure.
    """

    logger.info("Received OAuth callback with state: %s", state)

    # Retrieve PKCE data from Redis using state
    pkce_data: dict | None = await redis.get_pkce_data(state.strip())

    if not pkce_data:
        logger.error("PKCE data not found for state: %s", state)
        return JSONResponse(
            status_code=400, content={"error": "Invalid or expired state parameter"}
        )

    code_verifier: str = pkce_data.get("code_verifier")
    client_id: str = pkce_data.get("client_id").strip()
    client_secret: str = pkce_data.get("client_secret")
    redirect_uri: str = pkce_data.get("redirect_uri")
    org_id: str = pkce_data.get("org_id").strip()

    if not all([code_verifier, client_id, client_secret, redirect_uri, org_id]):
        logger.error("Incomplete PKCE data for state: %s", state)
        return JSONResponse(status_code=500, content={"error": "Incomplete PKCE data"})

    if await openbao.check_vault_sealed():
        logger.error("Vault is sealed, cannot proceed with OAuth callback")
        return JSONResponse(
            status_code=503,
            content={"error": "Vault is sealed, cannot store credentials."},
        )

    refresh_token: str | None = None
    access_token: str | None = None
    expires_in: int = 3600

    logger.info("Exchanging authorization code for tokens")
    try:
        token_response: dict = await snyk.exchange_code_for_token(
            code=code,
            client_id=client_id,
            client_secret=client_secret,
            redirect_uri=redirect_uri,
            code_verifier=code_verifier,
        )

        refresh_token = token_response.get("refresh_token")
        access_token = token_response.get("access_token")
        expires_in = token_response.get("expires_in", 3600)

        logger.info("Successfully exchanged code for tokens")

    except Exception as e:
        logger.error("Failed to exchange code for tokens: %s", e)
        # Clean up PKCE data on failure
        await redis.delete_pkce_data(state)
        return JSONResponse(
            status_code=500,
            content={"error": f"Failed to exchange code for tokens: {str(e)}"},
        )

    logger.info(
        "Storing refresh token in OpenBao for org_id: %s, client_id: %s",
        org_id,
        client_id,
    )
    try:
        await openbao.store_refresh_key(org_id, client_id, refresh_token)
        logger.info("Successfully stored refresh token in OpenBao")
    except Exception as e:
        logger.error("Failed to store refresh token in OpenBao: %s", e)

        await redis.delete_pkce_data(state)
        return JSONResponse(
            status_code=500,
            content={"error": f"Failed to store refresh token: {str(e)}"},
        )

    logger.info("Storing access token in Redis cache")
    try:
        await redis.store_auth_token(
            org_id=org_id,
            client_id=client_id,
            auth_token=access_token,
            expiration=min(expires_in, settings.REDIS_CACHE_TIME),
        )
        logger.info("Successfully stored access token in Redis")
    except Exception as e:
        logger.warning("Failed to store access token in Redis cache: %s", e)

    await redis.delete_pkce_data(state)
    logger.info("OAuth callback completed successfully")

    return JSONResponse(
        content={
            "message": "Successfully authenticated and stored credentials",
            "org_id": org_id,
            "client_id": client_id,
        }
    )

register_app async

Register a new Snyk app with the specified parameters.

Parameters:

Name Type Description Default
name str

Name of the Snyk app.

required
scopes str

Comma-separated list of scopes for the Snyk app.

required
redirect_uris str

Comma-separated list of redirect URIs for the Snyk app. The first one is assumed for storing PKCE data.

required
org_id str

Snyk organization ID (stored in PKCE data for callback).

required
auth_token str

Snyk authentication token.

required

Returns:

Name Type Description
JSONResponse JSONResponse

A response containing the registered app details or an error message.

Source code in snykey/api/v1/endpoints.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
@router.post("/register-app")
async def register_app(
    name: str,
    scopes: str,
    redirect_uris: str,
    org_id: str,
    auth_token: str,
) -> JSONResponse:
    """
    Register a new Snyk app with the specified parameters.

    Args:
        name (str): Name of the Snyk app.
        scopes (str): Comma-separated list of scopes for the Snyk app.
        redirect_uris (str): Comma-separated list of redirect URIs for the Snyk app. The first one is assumed for storing PKCE data.
        org_id (str): Snyk organization ID (stored in PKCE data for callback).
        auth_token (str): Snyk authentication token.

    Returns:
        JSONResponse: A response containing the registered app details or an error message.
    """

    code_verifier: str | None = None
    code_challenge: str | None = None
    state: str | None = None

    used_states: list[str] = await redis.get_all_states()

    try:
        code_verifier = await oauth.generate_code_verifier()
        code_challenge = await oauth.generate_code_challenge(code_verifier)

        while not (state and state not in used_states):
            state = await oauth.generate_state()

    except Exception as e:
        logger.error("Failed to generate OAuth parameters: %s", e)
        return JSONResponse(status_code=500, content={"error": str(e)})

    org_id = org_id.strip()
    auth_token = auth_token.strip()
    name = name.strip()

    scopes_list: list[str] = [s.strip() for s in scopes.split(",")]
    redirect_uris_list: list[str] = [u.strip() for u in redirect_uris.split(",")]

    result: dict = {}

    try:
        result = await snyk.register_snyk_app(
            name, scopes_list, redirect_uris_list, org_id, auth_token
        )
    except Exception as e:
        logger.error("Failed to register Snyk app: %s", e)
        return JSONResponse(status_code=500, content={"error": str(e)})

    client_id: str | None = (
        result.get("data", {}).get("attributes", {}).get("client_id", None)
    )
    client_secret: str | None = (
        result.get("data", {}).get("attributes", {}).get("client_secret", None)
    )

    if client_id is None:
        logger.error("Client ID not found in Snyk app registration response.")
        return JSONResponse(
            status_code=500, content={"error": "Client ID not found in response."}
        )

    auth_urls: dict = {}

    for uri in redirect_uris_list:
        auth_url: str = snyk.generate_auth_url(
            client_id=client_id,
            redirect_uri=uri,
            scopes=scopes_list,
            state=state,
            code_challenge=code_challenge,
            code_challenge_method="S256",
        )

        auth_urls[uri] = auth_url

    result["auth_urls"] = auth_urls

    await redis.store_pkce_data(
        state=state,
        code_verifier=code_verifier,
        client_id=client_id,
        client_secret=client_secret,
        redirect_uri=redirect_uris_list[
            0
        ],  # Assuming the first redirect URI is used for storing
        org_id=org_id,
        expiration=settings.REDIS_PKCE_EXPIRATION,
    )

    return JSONResponse(content=result)

store_credentials async

Store Snyk credentials in OpenBao.

Parameters:

Name Type Description Default
org_id str

The organization ID.

required
client_id str

The client ID.

required
client_secret str

The client secret.

required
refresh_key str

The refresh key.

required

Returns:

Name Type Description
JSONResponse JSONResponse

A response indicating success or failure.

Source code in snykey/api/v1/endpoints.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@router.put("/credentials")
async def store_credentials(
    org_id: str, client_id: str, client_secret: str, refresh_key: str
) -> JSONResponse:
    """
    Store Snyk credentials in OpenBao.

    Args:
        org_id (str): The organization ID.
        client_id (str): The client ID.
        client_secret (str): The client secret.
        refresh_key (str): The refresh key.

    Returns:
        JSONResponse: A response indicating success or failure.
    """

    org_id = org_id.strip()
    client_id = client_id.strip()
    client_secret = client_secret.strip()
    refresh_key = refresh_key.strip()

    if await openbao.check_vault_sealed():
        return JSONResponse(
            status_code=503,
            content={"error": "Vault is sealed, cannot store credentials."},
        )

    logger.info("Refreshing key to ensure no other process can use it.")
    try:
        result: dict = await snyk.refresh_snyk_token(
            client_id, client_secret, refresh_key
        )

        logger.info(
            "Successfully refreshed Snyk token for org_id: %s, client_id: %s",
            org_id,
            client_id,
        )

        await openbao.store_refresh_key(org_id, client_id, result["refresh_token"])
    except Exception as e:
        logger.error("Failed to refresh Snyk token: %s", e)
        return JSONResponse(status_code=500, content={"error": str(e)})

    return JSONResponse(content={"message": "Credentials stored."})