Refonte total
This commit is contained in:
+131
@@ -0,0 +1,131 @@
|
||||
# main.py : Backend pour le POC Cyberusgate avec FastAPI
|
||||
import asyncio
|
||||
import datetime
|
||||
import uuid
|
||||
from typing import Dict, List
|
||||
|
||||
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
# --- Initialisation de l'application FastAPI ---
|
||||
app = FastAPI(
|
||||
title="Cyberusgate POC API",
|
||||
description="API pour la démonstration de la solution de badge digital."
|
||||
)
|
||||
|
||||
# --- Configuration CORS ---
|
||||
# Permet au frontend (ouvert depuis un fichier local) de communiquer avec le backend.
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"], # Attention : En production, restreindre à l'URL du frontend.
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
# --- "Base de données" en mémoire ---
|
||||
# Utilisation d'un dictionnaire pour simuler une base de données d'utilisateurs.
|
||||
# La clé est l'ID de l'utilisateur, la valeur contient ses informations.
|
||||
users_db: Dict[str, Dict] = {
|
||||
"user-001": {"id": "user-001", "name": "Alice Martin", "role": "Collaborateur", "has_access": True},
|
||||
"user-002": {"id": "user-002", "name": "Bob Dubois", "role": "Prestataire", "has_access": True},
|
||||
}
|
||||
|
||||
# --- Gestion des connexions WebSocket pour les logs en temps réel ---
|
||||
class ConnectionManager:
|
||||
def __init__(self):
|
||||
self.active_connections: List[WebSocket] = []
|
||||
|
||||
async def connect(self, websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
self.active_connections.append(websocket)
|
||||
|
||||
def disconnect(self, websocket: WebSocket):
|
||||
self.active_connections.remove(websocket)
|
||||
|
||||
async def broadcast(self, message: str):
|
||||
""" Diffuse un message à tous les clients connectés. """
|
||||
for connection in self.active_connections:
|
||||
await connection.send_text(message)
|
||||
|
||||
manager = ConnectionManager()
|
||||
|
||||
async def log_and_broadcast(message: str):
|
||||
""" Formate un message de log avec un timestamp et le diffuse. """
|
||||
timestamp = datetime.datetime.now().strftime("%H:%M:%S")
|
||||
log_message = f"[{timestamp}] {message}"
|
||||
print(log_message) # Affiche aussi dans la console du serveur
|
||||
await manager.broadcast(log_message)
|
||||
|
||||
# --- Endpoints de l'API ---
|
||||
|
||||
@app.get("/users", summary="Lister tous les utilisateurs")
|
||||
async def get_users():
|
||||
""" Récupère la liste de tous les utilisateurs enregistrés. """
|
||||
return list(users_db.values())
|
||||
|
||||
@app.post("/access/simulate", summary="Simuler un scan de badge")
|
||||
async def simulate_access(data: Dict):
|
||||
"""
|
||||
Vérifie si un utilisateur a l'accès autorisé et log le résultat.
|
||||
"""
|
||||
user_id = data.get("user_id")
|
||||
if not user_id or user_id not in users_db:
|
||||
await log_and_broadcast(f"ERREUR: Tentative d'accès avec un ID inconnu '{user_id}'.")
|
||||
return {"status": "refused", "reason": "Utilisateur inconnu"}
|
||||
|
||||
user = users_db[user_id]
|
||||
if user["has_access"]:
|
||||
await log_and_broadcast(f"ACCÈS AUTORISÉ: {user['name']} ({user['role']}) à la porte principale.")
|
||||
return {"status": "authorized", "user_name": user['name']}
|
||||
else:
|
||||
await log_and_broadcast(f"ACCÈS REFUSÉ: {user['name']} ({user['role']}) - Droits révoqués.")
|
||||
return {"status": "refused", "user_name": user['name'], "reason": "Accès révoqué"}
|
||||
|
||||
@app.post("/users/revoke/{user_id}", summary="Révoquer l'accès d'un utilisateur")
|
||||
async def revoke_access(user_id: str):
|
||||
""" Désactive les droits d'accès pour un utilisateur donné. """
|
||||
if user_id in users_db:
|
||||
users_db[user_id]["has_access"] = False
|
||||
user_name = users_db[user_id]['name']
|
||||
await log_and_broadcast(f"ACTION ADMIN: Accès révoqué pour {user_name}.")
|
||||
return {"status": "success", "user_id": user_id, "has_access": False}
|
||||
return {"status": "error", "message": "Utilisateur non trouvé"}
|
||||
|
||||
@app.post("/users/create_ephemeral", summary="Créer un pass éphémère")
|
||||
async def create_ephemeral_pass():
|
||||
""" Crée un nouvel utilisateur 'Prestataire' avec un accès temporaire. """
|
||||
new_id = f"guest-{uuid.uuid4().hex[:4]}"
|
||||
new_user = {
|
||||
"id": new_id,
|
||||
"name": f"Visiteur {new_id}",
|
||||
"role": "Pass Éphémère",
|
||||
"has_access": True,
|
||||
}
|
||||
users_db[new_id] = new_user
|
||||
await log_and_broadcast(f"ACTION ADMIN: Pass éphémère créé pour {new_user['name']}.")
|
||||
return new_user
|
||||
|
||||
# --- Endpoint WebSocket pour les logs ---
|
||||
|
||||
@app.websocket("/ws/logs")
|
||||
async def websocket_endpoint(websocket: WebSocket):
|
||||
"""
|
||||
Point de connexion pour les clients qui veulent recevoir les logs en temps réel.
|
||||
"""
|
||||
await manager.connect(websocket)
|
||||
await log_and_broadcast("INFO: Un client de la console d'admin s'est connecté.")
|
||||
try:
|
||||
while True:
|
||||
# On maintient la connexion ouverte pour envoyer des messages
|
||||
# Le serveur n'attend pas de message du client ici.
|
||||
await asyncio.sleep(1)
|
||||
except WebSocketDisconnect:
|
||||
manager.disconnect(websocket)
|
||||
await log_and_broadcast("INFO: Un client de la console d'admin s'est déconnecté.")
|
||||
|
||||
# --- Point d'entrée pour lancer le serveur (avec uvicorn) ---
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
print("Lancement du serveur de l'API Cyberusgate sur http://0.0.0.0:8000")
|
||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||
Reference in New Issue
Block a user