Also add itsdangerous to requirements.txt (missing implicit dep of starlette SessionMiddleware) and a create_admin.py script for bootstrapping a system-role user with all permissions. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
79 lines
2.7 KiB
Python
79 lines
2.7 KiB
Python
"""
|
|
Create (or promote) an admin user with all permissions.
|
|
|
|
Usage:
|
|
python -m scripts.create_admin --email admin@example.com --password secret
|
|
python -m scripts.create_admin --email admin@example.com --password secret --role system
|
|
"""
|
|
import argparse
|
|
import sys
|
|
|
|
from web.auth.password import hash_password
|
|
from web.database import SessionLocal
|
|
from web.models.rbac import Permission, Role, UserRole, role_permissions
|
|
from web.models.user import User, UserRoleEnum, UserStatusEnum
|
|
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--email", required=True)
|
|
parser.add_argument("--password", required=True)
|
|
parser.add_argument("--first-name", default="Admin")
|
|
parser.add_argument("--last-name", default="")
|
|
parser.add_argument("--phone", default="")
|
|
parser.add_argument("--role", default="system", choices=["admin", "system"])
|
|
args = parser.parse_args()
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
user = db.query(User).filter(User.email == args.email).first()
|
|
if user:
|
|
user.role = UserRoleEnum(args.role)
|
|
user.status = UserStatusEnum.active
|
|
user.is_email_confirmed = True
|
|
user.password_hash = hash_password(args.password)
|
|
print(f"Updated existing user: {args.email} → role={args.role}")
|
|
else:
|
|
user = User(
|
|
first_name=args.first_name,
|
|
last_name=args.last_name,
|
|
email=args.email,
|
|
phone=args.phone,
|
|
password_hash=hash_password(args.password),
|
|
role=UserRoleEnum(args.role),
|
|
status=UserStatusEnum.active,
|
|
is_email_confirmed=True,
|
|
)
|
|
db.add(user)
|
|
db.flush()
|
|
print(f"Created user: {args.email} role={args.role}")
|
|
|
|
# Assign all permissions via the system role
|
|
system_role = db.query(Role).filter(Role.name == args.role).first()
|
|
if system_role:
|
|
existing = db.query(UserRole).filter(
|
|
UserRole.user_id == user.id,
|
|
UserRole.role_id == system_role.id,
|
|
).first()
|
|
if not existing:
|
|
db.add(UserRole(user_id=user.id, role_id=system_role.id))
|
|
|
|
db.commit()
|
|
|
|
# Report permissions this user now has
|
|
perms = (
|
|
db.query(Permission.name)
|
|
.join(role_permissions, Permission.id == role_permissions.c.permission_id)
|
|
.join(UserRole, UserRole.role_id == role_permissions.c.role_id)
|
|
.filter(UserRole.user_id == user.id)
|
|
.all()
|
|
)
|
|
print("Permissions:", [p.name for p in perms] or "(inherited via role)")
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|