|
|
import os |
|
|
import utils as U |
|
|
from javascript import require |
|
|
|
|
|
|
|
|
def _skill_loader(skill: str): |
|
|
file_path = os.path.abspath(os.path.dirname(__file__)) |
|
|
file_path = U.f_join(file_path, 'ActionLib', skill + '.js') |
|
|
with open(file_path, 'r', encoding='utf-8') as file: |
|
|
content = file.read() |
|
|
return content |
|
|
|
|
|
|
|
|
def process_message(message): |
|
|
retry = 3 |
|
|
error = None |
|
|
while retry > 0: |
|
|
try: |
|
|
babel = require("@babel/core") |
|
|
babel_generator = require("@babel/generator").default |
|
|
code = message |
|
|
parsed = babel.parse(code) |
|
|
functions = [] |
|
|
assert len(list(parsed.program.body)) > 0, "No functions found" |
|
|
for i, node in enumerate(parsed.program.body): |
|
|
if node.type != "FunctionDeclaration": |
|
|
continue |
|
|
node_type = ( |
|
|
"AsyncFunctionDeclaration" |
|
|
if node["async"] |
|
|
else "FunctionDeclaration" |
|
|
) |
|
|
functions.append( |
|
|
{ |
|
|
"name": node.id.name, |
|
|
"type": node_type, |
|
|
"body": babel_generator(node).code, |
|
|
"params": list(node["params"]), |
|
|
} |
|
|
) |
|
|
|
|
|
main_function = None |
|
|
for function in reversed(functions): |
|
|
if function["type"] == "AsyncFunctionDeclaration": |
|
|
main_function = function |
|
|
break |
|
|
assert ( |
|
|
main_function is not None |
|
|
), "No async function found. Your main function must be async." |
|
|
assert ( |
|
|
len(main_function["params"]) == 1 |
|
|
and main_function["params"][0].name == "bot" |
|
|
), f"Main function {main_function['name']} must take a single argument named 'bot'" |
|
|
program_code = "\n\n".join(function["body"] for function in functions) |
|
|
exec_code = f"await {main_function['name']}(bot);" |
|
|
return { |
|
|
"program_code": program_code, |
|
|
"program_name": main_function["name"], |
|
|
"exec_code": exec_code, |
|
|
} |
|
|
except Exception as e: |
|
|
retry -= 1 |
|
|
error = e |
|
|
return f"Error parsing action response (before program execution): {error}" |
|
|
|
|
|
|
|
|
def load_control_primitives(primitive_names=None): |
|
|
file_path = os.path.abspath(os.path.dirname(__file__)) |
|
|
if primitive_names is None: |
|
|
primitive_names = [ |
|
|
primitives[:-3] |
|
|
for primitives in os.listdir(f"{file_path}/control_primitives") |
|
|
if primitives.endswith(".js") |
|
|
] |
|
|
primitives = [ |
|
|
U.load_text(f"{file_path}/control_primitives/{primitive_name}.js") |
|
|
for primitive_name in primitive_names |
|
|
] |
|
|
return primitives |
|
|
|
|
|
|
|
|
def skill_loader(skill: str): |
|
|
parsed_result = process_message(_skill_loader(skill)) |
|
|
return "\n".join(load_control_primitives()) + "\n" + parsed_result["program_code"] + "\n" + \ |
|
|
parsed_result["exec_code"] |
|
|
|
|
|
|
|
|
|