Files
cyberusgate/app/main.py
T
Guillaume-Sanchez 0dfa6f5162 Modification droit
2026-06-29 21:59:49 +02:00

132 lines
5.2 KiB
Python
Executable File

# main.py : Backend pour le POC Cyberusgate avec FastAPI
import asyncio
import datetime
import uuid
from typing import Dict, List
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
# --- Initialisation de l'application FastAPI ---
app = FastAPI(
title="Cyberusgate POC API",
description="API pour la démonstration de la solution de badge digital."
)
# --- Configuration CORS ---
# Permet au frontend (ouvert depuis un fichier local) de communiquer avec le backend.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Attention : En production, restreindre à l'URL du frontend.
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# --- "Base de données" en mémoire ---
# Utilisation d'un dictionnaire pour simuler une base de données d'utilisateurs.
# La clé est l'ID de l'utilisateur, la valeur contient ses informations.
users_db: Dict[str, Dict] = {
"user-001": {"id": "user-001", "name": "Alice Martin", "role": "Collaborateur", "has_access": True},
"user-002": {"id": "user-002", "name": "Bob Dubois", "role": "Prestataire", "has_access": True},
}
# --- Gestion des connexions WebSocket pour les logs en temps réel ---
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast(self, message: str):
""" Diffuse un message à tous les clients connectés. """
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
async def log_and_broadcast(message: str):
""" Formate un message de log avec un timestamp et le diffuse. """
timestamp = datetime.datetime.now().strftime("%H:%M:%S")
log_message = f"[{timestamp}] {message}"
print(log_message) # Affiche aussi dans la console du serveur
await manager.broadcast(log_message)
# --- Endpoints de l'API ---
@app.get("/users", summary="Lister tous les utilisateurs")
async def get_users():
""" Récupère la liste de tous les utilisateurs enregistrés. """
return list(users_db.values())
@app.post("/access/simulate", summary="Simuler un scan de badge")
async def simulate_access(data: Dict):
"""
Vérifie si un utilisateur a l'accès autorisé et log le résultat.
"""
user_id = data.get("user_id")
if not user_id or user_id not in users_db:
await log_and_broadcast(f"ERREUR: Tentative d'accès avec un ID inconnu '{user_id}'.")
return {"status": "refused", "reason": "Utilisateur inconnu"}
user = users_db[user_id]
if user["has_access"]:
await log_and_broadcast(f"ACCÈS AUTORISÉ: {user['name']} ({user['role']}) à la porte principale.")
return {"status": "authorized", "user_name": user['name']}
else:
await log_and_broadcast(f"ACCÈS REFUSÉ: {user['name']} ({user['role']}) - Droits révoqués.")
return {"status": "refused", "user_name": user['name'], "reason": "Accès révoqué"}
@app.post("/users/revoke/{user_id}", summary="Révoquer l'accès d'un utilisateur")
async def revoke_access(user_id: str):
""" Désactive les droits d'accès pour un utilisateur donné. """
if user_id in users_db:
users_db[user_id]["has_access"] = False
user_name = users_db[user_id]['name']
await log_and_broadcast(f"ACTION ADMIN: Accès révoqué pour {user_name}.")
return {"status": "success", "user_id": user_id, "has_access": False}
return {"status": "error", "message": "Utilisateur non trouvé"}
@app.post("/users/create_ephemeral", summary="Créer un pass éphémère")
async def create_ephemeral_pass():
""" Crée un nouvel utilisateur 'Prestataire' avec un accès temporaire. """
new_id = f"guest-{uuid.uuid4().hex[:4]}"
new_user = {
"id": new_id,
"name": f"Visiteur {new_id}",
"role": "Pass Éphémère",
"has_access": True,
}
users_db[new_id] = new_user
await log_and_broadcast(f"ACTION ADMIN: Pass éphémère créé pour {new_user['name']}.")
return new_user
# --- Endpoint WebSocket pour les logs ---
@app.websocket("/ws/logs")
async def websocket_endpoint(websocket: WebSocket):
"""
Point de connexion pour les clients qui veulent recevoir les logs en temps réel.
"""
await manager.connect(websocket)
await log_and_broadcast("INFO: Un client de la console d'admin s'est connecté.")
try:
while True:
# On maintient la connexion ouverte pour envoyer des messages
# Le serveur n'attend pas de message du client ici.
await asyncio.sleep(1)
except WebSocketDisconnect:
manager.disconnect(websocket)
await log_and_broadcast("INFO: Un client de la console d'admin s'est déconnecté.")
# --- Point d'entrée pour lancer le serveur (avec uvicorn) ---
if __name__ == "__main__":
import uvicorn
print("Lancement du serveur de l'API Cyberusgate sur http://0.0.0.0:8000")
uvicorn.run(app, host="0.0.0.0", port=8000)