Revised all the system, optimized everything
(sort of v2)
This commit is contained in:
parent
c67edc9876
commit
c1a2cad45e
272
credsmanager.py
272
credsmanager.py
|
@ -1,102 +1,202 @@
|
|||
from os.path import exists
|
||||
from enum import Enum
|
||||
from subprocess import Popen, PIPE
|
||||
from json import dumps as js_d, loads as js_l
|
||||
from cryptography.fernet import Fernet
|
||||
import json
|
||||
from subprocess import Popen, PIPE
|
||||
from os.path import exists
|
||||
|
||||
|
||||
class ReturnCode(Enum):
|
||||
UNKNOW_ERROR = 0
|
||||
REG_KEY_ERROR = 1
|
||||
ADD_REG_KEY_ERROR = 2
|
||||
DEL_REG_KEY_ERROR = 3
|
||||
INVALID_PATH = 4
|
||||
REG_KEY_DELETED = 5
|
||||
class CredentialsManager():
|
||||
"""Used to acces crypted json credentials path using fernet"""
|
||||
|
||||
def __init__(self, credentials_file_path: str, reg_key_path: str = "CredentialsManager", key_name: str = "fernkey"):
|
||||
"""Create a CredentialsManager object used to acces crypted json credentials path using fernet\n
|
||||
Parameters
|
||||
----------
|
||||
* credentials_file_path : str\n
|
||||
The path of your credentials file, MUST be .json, if not or if file doesn't exist, create a new one
|
||||
* reg_key_path : str\n
|
||||
The path where the fernet key will be or is stored in the windows registery.\n
|
||||
>>> "IN THE CURRENT USER REG PATH (HKEY_CURRENT_USER\\) ONLY. ENTER NAME OF THE 'FOLDER' YOU WANT THE KEY TO BE IN"\n
|
||||
* key_name : str\n
|
||||
The name of the registery key in which the encryption key will be stored"""
|
||||
|
||||
|
||||
# Storing the reg key path in self to acces it later
|
||||
self.reg_key_path = f"HKEY_CURRENT_USER\\{reg_key_path}"
|
||||
|
||||
class Cryptography():
|
||||
def __init__(self, appname: str):
|
||||
self.appname = appname
|
||||
# Storing the reg key name in self to acces it later
|
||||
self.reg_key_name = key_name
|
||||
|
||||
key = self.__get_key()
|
||||
if type(key) is not bytes:
|
||||
key = self.__create_key()
|
||||
if type(key) is not bytes:
|
||||
raise ReturnCode.REG_KEY_ERROR
|
||||
# Get/Create a new key
|
||||
self.key = self.get_key()
|
||||
|
||||
self.fernet = Fernet(key)
|
||||
|
||||
def __get_key(self):
|
||||
try:
|
||||
proces = Popen(f"reg QUERY HKEY_CURRENT_USER\{self.appname}", stdout=PIPE, stderr=PIPE) #Get key
|
||||
if proces.stderr.read() !=b'':
|
||||
return self.__create_key()
|
||||
else:
|
||||
return proces.stdout.read().decode("CP850").splitlines()[2].rsplit()[2].encode() #Return key as binary
|
||||
|
||||
except:
|
||||
return ReturnCode.UNKNOW_ERROR
|
||||
# Initialise our Fernet object with the key, will raise a error if key wasn't create and returned None
|
||||
self.Fernet = Fernet(key=self.key)
|
||||
|
||||
|
||||
def __create_key(self):
|
||||
try:
|
||||
key = Fernet.generate_key()
|
||||
proces = Popen(f"reg ADD HKEY_CURRENT_USER\{self.appname} /v fernkey /d {key.decode()}", stdout=PIPE, stderr=PIPE) #Add key in reg str
|
||||
if proces.stderr.read() != b'':
|
||||
return ReturnCode.ADD_REG_KEY_ERROR
|
||||
# Replace credential file extansion by .json
|
||||
# Replace last element after a dot by json in the file path
|
||||
# Store it in self to acces it later
|
||||
self.creds_file_path = credentials_file_path.replace(credentials_file_path.split(".")[-1], "json")
|
||||
|
||||
# Check if credentials file exist, if not, try creating new one and write empty line
|
||||
if not exists(self.creds_file_path):
|
||||
with open(self.creds_file_path, "w") as f:
|
||||
f.write("")
|
||||
|
||||
|
||||
def get_key(self) -> bytes | None:
|
||||
"""Get the Fernet encryption key in the windows registery, if not found, create a new one"""
|
||||
# Get key using reg QUERY in subprocess
|
||||
proces = Popen(f"reg QUERY {self.reg_key_path} /v {self.reg_key_name}", stdout=PIPE, stderr=PIPE)
|
||||
|
||||
# Check if error output is not empty
|
||||
if proces.stderr.read() !=b'':
|
||||
# Create a new key and return it
|
||||
return self.create_key()
|
||||
|
||||
else:
|
||||
# Return key as binary
|
||||
return proces.stdout.read().decode("CP850").rsplit()[3].encode()
|
||||
|
||||
|
||||
def create_key(self):
|
||||
"""Create a new key, store it in the windows registery and return it"""
|
||||
# Generate a new key with Fernet
|
||||
key = Fernet.generate_key()
|
||||
|
||||
# Add it to registery using reg ADD in subprocess
|
||||
proces = Popen(f"reg ADD {self.reg_key_path} /v {self.reg_key_name} /d {key.decode()}", stdout=PIPE, stderr=PIPE)
|
||||
|
||||
# Check if process executed successfully, if yes return key, if not return None
|
||||
if proces.stderr.read() == b'':
|
||||
self.key = key
|
||||
return key
|
||||
|
||||
except:
|
||||
return ReturnCode.REG_KEY_ERROR
|
||||
|
||||
|
||||
def delete_key(self):
|
||||
"""Delete the key used for decryption, WARNING ONCE THE KEY IS DELETED ALL YOUR CRYPTED DATA ARE UNDECRYPTABLE AND LOST"""
|
||||
try:
|
||||
proces = Popen(f"reg DELETE HKEY_CURRENT_USER\{self.appname} /f", stdout=PIPE, stderr=PIPE)
|
||||
stderr = proces.stderr.read()
|
||||
if stderr != b'':
|
||||
return ReturnCode.DEL_REG_KEY_ERROR
|
||||
return ReturnCode.REG_KEY_DELETED
|
||||
|
||||
except:
|
||||
return ReturnCode.UNKNOW_ERROR
|
||||
|
||||
def decrypt(self, data: bytes):
|
||||
return self.fernet.decrypt(data)
|
||||
|
||||
def encrypt(self, data: bytes):
|
||||
return self.fernet.encrypt(data)
|
||||
|
||||
|
||||
class Credentials():
|
||||
def __init__(self, appname: str, credentials_path: str):
|
||||
"""Used to decode crypted credential csv files"""
|
||||
|
||||
if not exists(self.creds_path):
|
||||
return ReturnCode.INVALID_PATH
|
||||
|
||||
self.creds_path = credentials_path
|
||||
self.crypt = Cryptography(appname)
|
||||
|
||||
|
||||
def get_creds(self):
|
||||
"""Return the uncrypted version of the usernames and passwords as a list of tuple : [(username0, password0), (username1, password1), ...]"""
|
||||
with open(self.creds_path, "rb") as f:
|
||||
return json.loads(self.crypt.decrypt(f.read()).decode())
|
||||
|
||||
|
||||
def write_creds(self, data: list[tuple[str, str]]):
|
||||
with open(self.creds_path, "wb") as f:
|
||||
f.write(self.crypt.encrypt(json.dumps(data).encode()))
|
||||
return
|
||||
|
||||
def add_cred(self, username: str, password: str):
|
||||
"""Add username and password to the crypted file"""
|
||||
|
||||
data = self.get_creds()
|
||||
data.append((username, password))
|
||||
self.write_creds(data)
|
||||
def delete_key(self, delete_dir: bool = False):
|
||||
"""Delete the encryption key from the windows registery, return True if removed successfully, and False if not.\n
|
||||
>>> "WARNING YOU CAN'T RETRIEVE ENCRYPTED DATA AFTER REMOVAL"\n
|
||||
Parameters
|
||||
-----------
|
||||
* delete_dir bool (Defaukt = False)\n
|
||||
If set on True, also delete the directory of the stored key\n
|
||||
>>> "WARNING ALL THE KEYS CONTAINED IN THE DIRECTORY WILL BE DELETED" """
|
||||
|
||||
if delete_dir:
|
||||
# Delete directory and all the key from registery using reg DELETE in subprocess
|
||||
command = f"reg DELETE {self.reg_key_path} /f"
|
||||
else:
|
||||
# Delete key from registery using reg DELETE in subprocess
|
||||
command = f"reg DELETE {self.reg_key_path} /v {self.reg_key_name} /f"
|
||||
|
||||
# Execute command
|
||||
proces = Popen(command, stdout=PIPE, stderr=PIPE)
|
||||
|
||||
# Check if process executed successfully, if yes return True, if not return False
|
||||
if proces.stderr.read() == b'':
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def decrypt(self, data: bytes) -> dict:
|
||||
"""Return the decrypted version of the encrypted bytes as a json dict"""
|
||||
# Decrypt using Fernet, then decode into str, then load it with json
|
||||
return js_l(self.Fernet.decrypt(data).decode())
|
||||
|
||||
|
||||
def encrypt(self, data: str) -> bytes:
|
||||
"""Return the encrypted version of the Serialize (means it's a dict encoded as a str by json) json dict as encrypted bytes"""
|
||||
# Dump using json, then encode it into UTF-8, then encrypt it using Fernet
|
||||
return self.Fernet.encrypt(js_d(data).encode())
|
||||
|
||||
|
||||
def get_credentials(self):
|
||||
"""Return the decrypted dict of username and passwords (format : {username: password, ...})"""
|
||||
|
||||
# Open credentials file
|
||||
with open(self.creds_file_path, "rb") as f:
|
||||
# Read content of the file
|
||||
data = f.read()
|
||||
|
||||
# If file is empty, return an empty dict to avoid Fernet from crashing while trying to decrypt void
|
||||
if data == b"":
|
||||
return {}
|
||||
|
||||
# Decrypt and return the dict
|
||||
return self.decrypt(data)
|
||||
|
||||
|
||||
def write_credentials(self, new_credentials_dict: dict[str: str]):
|
||||
"""Write the dict data in the credentials file\n
|
||||
Parameters
|
||||
----------
|
||||
* new_credentials_dict : {str: str}\n
|
||||
The dict to encrypt and write in the credentials file\n
|
||||
>>> "WARNING. ERASE ALL EXISTING DATA IN THE FILE, IF YOU WANT TO APPEND DATA, SEE CredentialsManager.add_credentials()" """
|
||||
|
||||
# Open credentials file
|
||||
with open(self.creds_file_path, "wb") as f:
|
||||
# Encrypt, and write the encrypted data in the file
|
||||
f.write(self.encrypt(new_credentials_dict))
|
||||
return
|
||||
|
||||
|
||||
def add_credentials(self, new_credentials_data_dict: dict[str: str]):
|
||||
"""Append data to the encrypted json credentials file\n
|
||||
Parameters
|
||||
----------
|
||||
* new_credentials_data_dict: {str: str}\n
|
||||
The data to append to the encrypted credentials json dict"""
|
||||
|
||||
# Use get_credentials and write_credentials
|
||||
|
||||
# Get current data
|
||||
old_dict = self.get_credentials()
|
||||
|
||||
|
||||
# Set our new dict with the old value to keep
|
||||
new_dict = old_dict
|
||||
|
||||
|
||||
# Get all the keys of the new data
|
||||
# For each key add them to the new dict
|
||||
for key in new_credentials_data_dict.keys():
|
||||
new_dict[key] = new_credentials_data_dict[key]
|
||||
|
||||
# Write our new dict to the encrypted json credentials file
|
||||
self.write_credentials(new_dict)
|
||||
return
|
||||
|
||||
|
||||
def remove_credentials(self, credentials_to_remove: dict[str: str], match_password: bool = True):
|
||||
"""Remove the specified data from the encrypted json credentials file\n
|
||||
Parameters
|
||||
----------
|
||||
* credentials_to_remove : {str: str}\n
|
||||
The data to remove from the encrypted credentials json dict
|
||||
* match_password : bool (Default = True)\n
|
||||
If set on True, the username AND password must match to be removed, if set on False, ONLY username must match"""
|
||||
|
||||
# Use get_credentials and write_credentials
|
||||
|
||||
# Get current data
|
||||
dict = self.get_credentials()
|
||||
|
||||
# Get all the keys to remove
|
||||
# For each key, if the key is in the dict, remove it
|
||||
# Using a try except in case the key isn't in the dict
|
||||
for key in credentials_to_remove.keys():
|
||||
try:
|
||||
# If match_password is True, check if passwords are equals, if not, pass
|
||||
if match_password:
|
||||
if dict[key] == credentials_to_remove[key]:
|
||||
dict.pop(key)
|
||||
else:
|
||||
dict.pop(key)
|
||||
except:
|
||||
pass
|
||||
|
||||
# Write our dict to the encrypted json credentials file
|
||||
self.write_credentials(dict)
|
||||
return
|
Loading…
Reference in a new issue