forked from React-Group/interstellar_ai
103 lines
3.9 KiB
Python
103 lines
3.9 KiB
Python
from flask import Flask, request, jsonify
|
|
from flask_cors import CORS
|
|
import secrets
|
|
from ai import AI
|
|
from db import DB
|
|
from OpenSSL import crypto
|
|
|
|
|
|
class API:
    """HTTPS Flask backend exposing AI-chat and database endpoints.

    Endpoints (all registered on ``self.app``):

    * ``GET  /interstellar/api/ai_create`` -- issue a new access token.
    * ``POST /interstellar/api/ai_send``   -- submit messages for a token.
    * ``GET  /interstellar/api/ai_get``    -- read the stored response for a token.
    * ``POST /interstellar/api/db``        -- run one of the actions in ``_DB_ACTIONS``.

    Errors are reported in the JSON body (``{'status': <code>, 'error': ...}``),
    matching the convention the endpoints already used for invalid tokens.
    """

    # Maps the ``action`` query value to the name of the ``DB`` method handling it.
    _DB_ACTIONS = {
        "create_account": "add_user",
        "change_password": "update_password",
        "get_data": "get_data",
        "change_data": "change_data",
        "check_credentials": "check_credentials",
    }

    # Size in bits of the RSA key generated for the self-signed certificate.
    _RSA_KEY_BITS = 4096

    # Class-level default so the guard in ``_register_routes`` works on any instance.
    _routes_registered = False

    def __init__(self):
        """Create the Flask app, the AI and DB helpers, and enable CORS."""
        # Number of random bytes passed to ``secrets.token_urlsafe`` per access token.
        self.crypt_size = 4096
        self.app = Flask(__name__)
        # access token -> response value returned by ``ai_get`` ("" until something
        # else stores a value under the token).
        self.ai_response = {}
        self.ai = AI()
        self.db = DB()
        self.db.load_database()
        CORS(self.app)

    def run(self, debug=False, host='0.0.0.0', port=5000):
        """Register the routes, write a fresh self-signed certificate and serve over TLS.

        Args:
            debug: Forwarded to ``Flask.run``. Defaults to ``False`` because the
                server binds to all interfaces by default, and Flask's debug mode
                exposes an interactive debugger that must not be reachable from
                the network. Pass ``debug=True`` explicitly for local development.
            host: Interface to bind to.
            port: TCP port to listen on.
        """
        self._register_routes()
        ssl_context = self._write_self_signed_cert()
        self.app.run(debug=debug, host=host, port=port, ssl_context=ssl_context)

    def _register_routes(self):
        """Attach the view functions to ``self.app`` (only once per instance)."""
        if self._routes_registered:
            # Flask rejects a second registration of the same endpoint, so a
            # repeated run() must not try to add the routes again.
            return
        self._routes_registered = True

        @self.app.route('/interstellar/api/ai_create', methods=['GET'])
        def create_ai():
            access_token = secrets.token_urlsafe(self.crypt_size)
            self.ai_response[access_token] = ""
            return jsonify({'status': 200, 'access_token': access_token})

        @self.app.route('/interstellar/api/ai_send', methods=['POST'])
        def send_ai():
            data = request.get_json()
            # A body such as ``null`` or ``[]`` is valid JSON but has no ``.get``;
            # answer with an error payload instead of an unhandled exception.
            if not isinstance(data, dict):
                return jsonify({'status': 400, 'error': 'Invalid JSON body'})

            messages = data.get('messages')
            model_type = data.get('model_type')
            ai_model = data.get('ai_model')
            access_token = data.get('access_token')

            # The isinstance check keeps unhashable JSON values (lists, objects)
            # from raising TypeError in the dict membership test.
            if not isinstance(access_token, str) or access_token not in self.ai_response:
                return jsonify({'status': 401, 'error': 'Invalid access token'})

            # The AI helper receives this API instance and the token -- presumably
            # so it can store its result in ``self.ai_response``; confirm in ``ai``.
            if model_type == "local":
                self.ai.process_local(ai_model, messages, self, access_token)
            elif model_type == "mistral":
                self.ai.process_mistralai(ai_model, messages, self, access_token)
            return jsonify({'status': 200})

        @self.app.route('/interstellar/api/ai_get', methods=['GET'])
        def get_ai():
            access_token = request.args.get('access_token')
            if access_token not in self.ai_response:
                return jsonify({'status': 401, 'error': 'Invalid access token'})
            return jsonify({'status': 200, 'response': self.ai_response[access_token]})

        @self.app.route('/interstellar/api/db', methods=['POST'])
        def db_manipulate():
            action = request.args.get('action')
            data = request.args.get('data')
            # A Flask view must return a response; previously this view returned
            # None, which made every request to this endpoint fail.
            return jsonify(self._dispatch_db_action(action, data))

    def _dispatch_db_action(self, action, data):
        """Run the DB method selected by ``action`` and build the response payload.

        Args:
            action: One of the keys of ``_DB_ACTIONS`` (anything else is rejected).
            data: Value passed unchanged as the single argument of the DB method.

        Returns:
            ``{'status': 200, 'response': <DB method result>}`` for a known action,
            otherwise ``{'status': 400, 'error': 'Invalid action'}``.
            NOTE(review): the DB results are assumed to be JSON-serialisable --
            confirm against the ``db`` module.
        """
        method_name = self._DB_ACTIONS.get(action)
        if method_name is None:
            return {'status': 400, 'error': 'Invalid action'}
        result = getattr(self.db, method_name)(data)
        return {'status': 200, 'response': result}

    def _write_self_signed_cert(self, cert_path="cert.pem", key_path="key.pem"):
        """Generate an RSA key and self-signed certificate and write both as PEM.

        The certificate is valid from now for ten years and is signed with
        SHA-512. Both files are overwritten on every call.

        Args:
            cert_path: Destination file for the certificate.
            key_path: Destination file for the private key.

        Returns:
            ``(cert_path, key_path)``, usable as Flask's ``ssl_context``.
        """
        key = crypto.PKey()
        key.generate_key(crypto.TYPE_RSA, self._RSA_KEY_BITS)

        cert = crypto.X509()
        subject = cert.get_subject()
        subject.C = "NT"
        subject.ST = "stateOrProvinceName"
        subject.L = "localityName"
        subject.O = "organizationName"
        subject.OU = "organizationUnitName"
        subject.CN = "commonName"
        subject.emailAddress = "emailAddress"

        # A new certificate is issued under the same issuer name on every start.
        # Serial numbers must be positive and unique per issuer; a constant 0
        # makes clients that remember an earlier certificate reject the new one.
        cert.set_serial_number(secrets.randbits(63) + 1)
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
        cert.set_issuer(subject)
        cert.set_pubkey(key)
        cert.sign(key, 'sha512')

        with open(cert_path, "wt") as f:
            f.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode("utf-8"))

        # NOTE(review): the private key is written with default file permissions.
        with open(key_path, "wt") as f:
            f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key).decode("utf-8"))

        return (cert_path, key_path)
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: construct the API server and start serving requests.
    API().run()
|