from flask import Flask, request, jsonify from flask_cors import CORS import secrets import threading from ai import AI from db import DB from OpenSSL import crypto class API: def __init__(self): self.crypt_size = 1 self.app = Flask(__name__) self.ai_response = {} self.ai = AI() self.db = DB() self.db.load_database() self.ai_response_lock = threading.Lock() CORS(self.app) def run(self): @self.app.route('/interstellar/api/ai_create', methods=['GET']) def create_ai(): access_token = secrets.token_urlsafe(self.crypt_size) self.ai_response[access_token] = "" return jsonify({'status': 200, 'access_token': access_token}) @self.app.route('/interstellar/api/ai_send', methods=['POST']) def send_ai(): data = request.get_json() messages = data.get('messages') model_type = data.get('model_type') ai_model = data.get('ai_model') access_token = data.get('access_token') if access_token not in self.ai_response: return jsonify({'status': 401, 'error': 'Invalid access token'}) if model_type == "local": thread = threading.Thread(target=self.ai.process_local, args=(ai_model, messages, self, access_token)) thread.start() thread.join() return jsonify({'status': 200}) elif model_type == "mistral": api_key = data.get('api_key') thread = threading.Thread(target=self.ai.process_mistralai, args=(ai_model, messages, self, access_token, api_key)) thread.start() thread.join() return jsonify({'status': 200}) elif model_type == "openai": api_key = data.get('api_key') thread = threading.Thread(target=self.ai.process_openai, args=(ai_model, messages, self, access_token, api_key)) thread.start() thread.join() return jsonify({'status': 200}) elif model_type == "anthropic": api_key = data.get('api_key') thread = threading.Thread(target=self.ai.process_anthropic, args=(ai_model, messages, self, access_token, api_key)) thread.start() thread.join() return jsonify({'status': 200}) return jsonify({'status': 401, 'error': 'Invalid AI model type'}) @self.app.route('/interstellar/api/ai_get', methods=['GET']) def get_ai(): data = request.args.get('access_token') if data not in self.ai_response: return jsonify({'status': 401, 'error': 'Invalid access token'}) return jsonify({'status': 200, 'response': self.ai_response[data]}) @self.app.route('/interstellar/api/db', methods=['POST']) def db_manipulate(): action = request.args.get('action') data = request.args.get('data') if action == "create_account": self.db.add_user(data) elif action == "change_password": self.db.update_password(data) elif action == "get_data": self.db.get_data(data) elif action == "change_data": self.db.change_data(data) elif action == "check_credentials": self.db.check_credentials(data) email_address = "emailAddress" common_name = "commonName" country_name = "NT" locality_name = "localityName" state_or_province_name = "stateOrProvinceName" organization_name = "organizationName" organization_unit_name = "organizationUnitName" serial_number = 0 validity_start_in_seconds = 0 validity_end_in_seconds = 10 * 365 * 24 * 60 * 60 k = crypto.PKey() k.generate_key(crypto.TYPE_RSA, 4096) cert = crypto.X509() cert.get_subject().C = country_name cert.get_subject().ST = state_or_province_name cert.get_subject().L = locality_name cert.get_subject().O = organization_name cert.get_subject().OU = organization_unit_name cert.get_subject().CN = common_name cert.get_subject().emailAddress = email_address cert.set_serial_number(serial_number) cert.gmtime_adj_notBefore(validity_start_in_seconds) cert.gmtime_adj_notAfter(validity_end_in_seconds) cert.set_issuer(cert.get_subject()) cert.set_pubkey(k) cert.sign(k, 'sha512') with open("cert.pem", "wt") as f: f.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode("utf-8")) with open("key.pem", "wt") as f: f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k).decode("utf-8")) ssl_context = ("cert.pem", "key.pem") self.app.run(debug=True, host='0.0.0.0', port=5000)