forked from React-Group/interstellar_ai
db.py comments
This commit is contained in:
parent
2eb4311f0a
commit
4fe4f90e1d
1 changed files with 48 additions and 34 deletions
82
py/db.py
82
py/db.py
|
@ -3,12 +3,13 @@ import json
|
||||||
import os
|
import os
|
||||||
import pycouchdb
|
import pycouchdb
|
||||||
|
|
||||||
|
|
||||||
class DB:
|
class DB:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
# Initialize the database dictionary to store user data
|
||||||
self.database = {}
|
self.database = {}
|
||||||
|
|
||||||
def ensure_username(self, data):
|
def ensure_username(self, data):
|
||||||
|
# Ensure a username can be retrieved either from username or email
|
||||||
if "username" in data:
|
if "username" in data:
|
||||||
return data.get("username")
|
return data.get("username")
|
||||||
elif "email" in data:
|
elif "email" in data:
|
||||||
|
@ -18,15 +19,17 @@ class DB:
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def hash_password(password):
|
def hash_password(password):
|
||||||
salt = "your_secret_salt"
|
# Hash the password with a salt for secure storage
|
||||||
|
salt = "your_secret_salt" # Consider using a secure random salt
|
||||||
hashed_password = hashlib.sha256((password + salt).encode()).hexdigest()
|
hashed_password = hashlib.sha256((password + salt).encode()).hexdigest()
|
||||||
return hashed_password
|
return hashed_password
|
||||||
|
|
||||||
def add_user(self, data):
|
def add_user(self, data):
|
||||||
|
# Add a new user to the database if username is unique
|
||||||
username = data.get("username")
|
username = data.get("username")
|
||||||
password = data.get("password")
|
password = data.get("password")
|
||||||
email = data.get("email")
|
email = data.get("email")
|
||||||
hashed_password = self.hash_password(password)
|
hashed_password = self.hash_password(password) # Hash the password
|
||||||
user_data = {
|
user_data = {
|
||||||
"hashed_password": hashed_password,
|
"hashed_password": hashed_password,
|
||||||
"email": email,
|
"email": email,
|
||||||
|
@ -35,112 +38,123 @@ class DB:
|
||||||
}
|
}
|
||||||
if username not in self.database:
|
if username not in self.database:
|
||||||
self.database[username] = user_data
|
self.database[username] = user_data
|
||||||
self.save_database()
|
self.save_database() # Save changes to the database
|
||||||
return True
|
return True
|
||||||
return False
|
return False # User already exists
|
||||||
|
|
||||||
def delete_user(self, data):
|
def delete_user(self, data):
|
||||||
|
# Delete a user from the database if credentials are valid
|
||||||
username = self.ensure_username(data)
|
username = self.ensure_username(data)
|
||||||
if not self.check_credentials(data):
|
if not self.check_credentials(data):
|
||||||
return False
|
return False # Invalid credentials
|
||||||
|
|
||||||
del self.database[username]
|
del self.database[username] # Remove user from database
|
||||||
self.save_database()
|
self.save_database() # Save changes
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def update_password(self, data):
|
def update_password(self, data):
|
||||||
|
# Update the user's password if credentials are valid
|
||||||
username = self.ensure_username(data)
|
username = self.ensure_username(data)
|
||||||
new_password = data.get("new_password")
|
new_password = data.get("new_password")
|
||||||
if not self.check_credentials(data):
|
if not self.check_credentials(data):
|
||||||
return False
|
return False # Invalid credentials
|
||||||
|
|
||||||
hashed_new_password = self.hash_password(new_password)
|
hashed_new_password = self.hash_password(new_password) # Hash the new password
|
||||||
self.database[username].update({"hashed_password": hashed_new_password})
|
self.database[username].update({"hashed_password": hashed_new_password})
|
||||||
self.save_database()
|
self.save_database() # Save changes
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def check_credentials(self, data):
|
def check_credentials(self, data):
|
||||||
|
# Verify if provided credentials match stored data
|
||||||
username = self.ensure_username(data)
|
username = self.ensure_username(data)
|
||||||
password = data.get("password")
|
password = data.get("password")
|
||||||
if username not in self.database:
|
if username not in self.database:
|
||||||
return False
|
return False # User not found
|
||||||
|
|
||||||
stored_hashed_password = self.database[username]["hashed_password"]
|
stored_hashed_password = self.database[username]["hashed_password"]
|
||||||
entered_hashed_password = self.hash_password(password)
|
entered_hashed_password = self.hash_password(password)
|
||||||
return stored_hashed_password == entered_hashed_password
|
return stored_hashed_password == entered_hashed_password # Check hashed password
|
||||||
|
|
||||||
def change_settings(self, data):
|
def change_settings(self, data):
|
||||||
|
# Change user settings if credentials are valid
|
||||||
username = self.ensure_username(data)
|
username = self.ensure_username(data)
|
||||||
if not self.check_credentials(data):
|
if not self.check_credentials(data):
|
||||||
return False
|
return False # Invalid credentials
|
||||||
|
|
||||||
self.database[username]["settings"] = data.get("data")
|
self.database[username]["settings"] = data.get("data") # Update settings
|
||||||
self.save_database()
|
self.save_database() # Save changes
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def get_settings(self, data):
|
def get_settings(self, data):
|
||||||
|
# Retrieve user settings if credentials are valid
|
||||||
username = self.ensure_username(data)
|
username = self.ensure_username(data)
|
||||||
if not self.check_credentials(data):
|
if not self.check_credentials(data):
|
||||||
return None
|
return None # Invalid credentials
|
||||||
|
|
||||||
send_back = self.database[username].get("settings")
|
send_back = self.database[username].get("settings") # Get settings
|
||||||
return send_back
|
return send_back
|
||||||
|
|
||||||
def change_history(self, data):
|
def change_history(self, data):
|
||||||
|
# Change user history if credentials are valid
|
||||||
username = self.ensure_username(data)
|
username = self.ensure_username(data)
|
||||||
if not self.check_credentials(data):
|
if not self.check_credentials(data):
|
||||||
return False
|
return False # Invalid credentials
|
||||||
|
|
||||||
self.database[username]["history"] = data.get("data")
|
self.database[username]["history"] = data.get("data") # Update history
|
||||||
self.save_database()
|
self.save_database() # Save changes
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def get_history(self, data):
|
def get_history(self, data):
|
||||||
|
# Retrieve user history if credentials are valid
|
||||||
username = self.ensure_username(data)
|
username = self.ensure_username(data)
|
||||||
if not self.check_credentials(data):
|
if not self.check_credentials(data):
|
||||||
return None
|
return None # Invalid credentials
|
||||||
|
|
||||||
send_back = self.database[username].get("history")
|
send_back = self.database[username].get("history") # Get history
|
||||||
return send_back
|
return send_back
|
||||||
|
|
||||||
def get_email(self, data):
|
def get_email(self, data):
|
||||||
|
# Retrieve user email if credentials are valid
|
||||||
username = self.ensure_username(data)
|
username = self.ensure_username(data)
|
||||||
if not self.check_credentials(data):
|
if not self.check_credentials(data):
|
||||||
return None
|
return None # Invalid credentials
|
||||||
|
|
||||||
send_back = self.database[username].get("email")
|
send_back = self.database[username].get("email") # Get email
|
||||||
return send_back
|
return send_back
|
||||||
|
|
||||||
def get_name(self, data):
|
def get_name(self, data):
|
||||||
|
# Retrieve username if credentials are valid
|
||||||
if not self.check_credentials(data):
|
if not self.check_credentials(data):
|
||||||
return None
|
return None # Invalid credentials
|
||||||
|
|
||||||
send_back = self.ensure_username(data)
|
send_back = self.ensure_username(data) # Get username
|
||||||
return send_back
|
return send_back
|
||||||
|
|
||||||
def save_database(self):
|
def save_database(self):
|
||||||
|
# Save the database to the specified storage (CouchDB or JSON file)
|
||||||
if os.environ.get("PRODUCTION") == "YES":
|
if os.environ.get("PRODUCTION") == "YES":
|
||||||
server = pycouchdb.Server("http://admin:admin@localhost:5984/")
|
server = pycouchdb.Server("http://admin:admin@localhost:5984/")
|
||||||
db = server.database("interstellar_ai")
|
db = server.database("interstellar_ai")
|
||||||
db.save(self.database)
|
db.save(self.database) # Save to CouchDB
|
||||||
|
|
||||||
else:
|
else:
|
||||||
with open("database.json", "w") as file:
|
with open("database.json", "w") as file:
|
||||||
json.dump(self.database, file)
|
json.dump(self.database, file) # Save to JSON file
|
||||||
|
|
||||||
def load_database(self):
|
def load_database(self):
|
||||||
|
# Load the database from the specified storage (CouchDB or JSON file)
|
||||||
if os.environ.get("PRODUCTION") == "YES":
|
if os.environ.get("PRODUCTION") == "YES":
|
||||||
server = pycouchdb.Server("http://admin:admin@localhost:5984/")
|
server = pycouchdb.Server("http://admin:admin@localhost:5984/")
|
||||||
db = server.database("interstellar_ai")
|
db = server.database("interstellar_ai")
|
||||||
if db:
|
if db:
|
||||||
self.database = db
|
self.database = db # Load from CouchDB
|
||||||
else:
|
else:
|
||||||
server.create("interstellar_ai")
|
server.create("interstellar_ai") # Create database if it doesn't exist
|
||||||
db = server.database("interstellar_ai")
|
db = server.database("interstellar_ai")
|
||||||
db.save(self.database)
|
db.save(self.database) # Save initial empty database
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
with open("database.json", "r") as file:
|
with open("database.json", "r") as file:
|
||||||
self.database = json.load(file)
|
self.database = json.load(file) # Load from JSON file
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
pass
|
pass # File not found, do nothing
|
||||||
|
|
Loading…
Reference in a new issue