
authlib.py 2.9KB

import hashlib
import hmac
import json
import time

import sqlalchemy as db
from cryptography.fernet import Fernet

from connect import connection, engine, metadata
def hash_pass(password):
    """Return the hex-encoded SHA-256 digest of *password*.

    NOTE(review): this is a single unsalted SHA-256 round, so equal
    passwords produce equal digests and offline guessing is cheap.
    The digest is compared with the stored ``password`` column, so moving
    to a salted, deliberately slow KDF (``hashlib.scrypt`` or
    ``hashlib.pbkdf2_hmac`` are in the standard library) needs a migration
    of the stored values and cannot be done inside this function alone.
    """
    raw = password.encode()
    digest = hashlib.sha256(raw)
    return digest.hexdigest()
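class Auth:
    """Session-backed login state for one request.

    The session mapping carries three keys: ``"gallitosccom"`` (logged-in
    flag), ``"id"`` (a Fernet token wrapping ``{"id", "email"}``) and
    ``"tiempo"`` (time of the last activity, used for idle expiry).
    """

    def __init__(self, session, expire=0):
        """Bind to *session* and restore the identity stored in it, if any.

        Args:
            session: Mutable mapping that persists between requests.
            expire: Idle timeout in seconds; ``0`` disables expiry.
        """
        self.session = session
        self.id = None
        self.email = None
        self.expire = expire
        # Remember to always change this key.
        # NOTE(review): the key is committed with the source, so anyone who
        # can read the file can decrypt or mint session["id"] tokens. Load it
        # from deployment secrets instead and rotate the value shown here.
        self.fernet = Fernet(b'3UH3USxvBcFITpnVa2gvTUIMO5jbc8jqU_Q1O6SNBLs=')
        if session.get("id"):
            # NOTE(review): decrypt() raises on a malformed or foreign token
            # and that exception propagates to whoever builds this object.
            value = json.loads(self.fernet.decrypt(session["id"]))
            self.id = value["id"]
            self.email = value["email"]

    def checkAuth(self):
        """Apply the idle timeout and return the logged-in flag.

        Returns:
            The value stored under ``"gallitosccom"`` (``True`` when logged
            in), or ``None`` when the session is anonymous or has expired.
        """
        last_seen = self.session.get("tiempo")
        if self.expire and (
            last_seen is None or time.time() - last_seen > self.expire
        ):
            # A missing timestamp is treated as expired (fail closed) rather
            # than raising KeyError on sessions that never logged in.
            self.delAuth()
        else:
            self.session["tiempo"] = time.time()
        return self.session.get("gallitosccom")

    def setAuth(self, id, email):
        """Mark the session as logged in for user *id* / *email*."""
        self.session["gallitosccom"] = True
        self.id = id
        self.email = email
        payload = json.dumps({"id": id, "email": email}).encode()
        self.session["id"] = self.fernet.encrypt(payload)
        if self.expire:
            self.session["tiempo"] = time.time()

    def delAuth(self):
        """Drop every trace of the login from the session and this object."""
        for key in ("gallitosccom", "id", "tiempo"):
            self.session.pop(key, None)
        # checkRole() reads self.id, so it must not survive a logout or an
        # expiry on an instance that is still in use.
        self.id = None
        self.email = None

    def do_login(self, username, password):
        """Check *username* / *password* against ``usuarios`` and log in.

        Args:
            username: Login name or e-mail address.
            password: Clear-text password as typed by the user.

        Returns:
            ``True`` when the credentials match and the session was marked
            as logged in, ``False`` otherwise.
        """
        supplied = hash_pass(password)
        usuarios = db.Table('usuarios', metadata, autoload=True, autoload_with=engine)
        query = db.select([usuarios]).select_from(usuarios)
        # NOTE(review): `> 1` sends an address with a one-character local
        # part (constructed example: "a@example.org") down the username
        # branch; confirm that is intended.
        if username.find("@") > 1:
            query = query.where(usuarios.columns.email == username)
        else:
            query = query.where(usuarios.columns.username == username)
        result = connection.execute(query).fetchone()
        if not result:
            return False
        stored = result["password"]
        if not isinstance(stored, str):
            return False
        # Constant-time comparison; the row is never printed because it
        # contains the password hash.
        if not hmac.compare_digest(supplied.encode(), stored.encode()):
            return False
        self.setAuth(result["id"], result["email"])
        # TODO: add a last-login feature.
        return True

    def checkRole(self, tipo):
        """Return whether the current user has a row in the table for *tipo*.

        Args:
            tipo: One of ``"admin"``, ``"madre"``, ``"enfermera"``,
                ``"facultad"`` or ``"estudiante"``.

        Returns:
            ``True`` when a matching row exists, ``False`` for an unknown
            role, for an anonymous object, or when no row matches.
        """
        tables = {
            "admin": "administracion",
            "madre": "madres",
            "enfermera": "enfermeras",
            "facultad": "facultad",
            "estudiante": "estudiantes",
        }
        # The table name only ever comes from this fixed mapping, never
        # from the caller's string.
        table = tables.get(tipo)
        if table is None or self.id is None:
            return False
        usuarios = db.Table('usuarios', metadata, autoload=True, autoload_with=engine)
        role = db.Table(table, metadata, autoload=True, autoload_with=engine)
        joined = usuarios.join(role, usuarios.columns.id == role.columns.user_id)
        query = (
            db.select([usuarios, role])
            .select_from(joined)
            .where(role.columns.user_id == self.id)
        )
        result = connection.execute(query).fetchone()
        return bool(result)

    def do_logout(self):
        """Log the current user out."""
        self.delAuth()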