A Prompt Execution Engine

When managing a large number of LLM tasks, it’s beneficial to execute them in parallel. Since this is a classic IO-bound process, I decided to use the asyncio library and leverage async/await.

Here’s the execution block from a version of The Big Lebowski Fact-Checker. In this example, the doc_chunks passed into the function are all strings with a certain number of tokens.

async def get_document_facts(system_message: dict[str, str], user_facts: list[str], doc_chunks: list[str]) -> list[list[str]]:
    answer_refs = dict()
    n_chunks = len(doc_chunks)
    for chunk_id, doc_chunk in enumerate(doc_chunks):
        # Block 1: kick off all the work
        messages = list()
        doc_prompt = GET_DOCUMENT_FACTS_PROMPT.format(
            USER_FACTS=nl.join(user_facts), DOCUMENT_TEXT=doc_chunk
        )
        user_message = {"role": "user", "content": doc_prompt}
        messages.append(system_message)
        messages.append(user_message)
        answer_refs[chunk_id] = asyncio.create_task(get_reply(messages))
        await asyncio.sleep(0)

    answers = dict()
    timeout = 60
    start_time = time.time()
    while len(answers.keys()) < n_chunks and (time.time() - start_time) <= timeout:
        # Block 2: wait for the work to finish
        for task_name, task in answer_refs.items():
            if task.done() and task_name not in answers.keys():
                answers[task_name] = task.result()
        await asyncio.sleep(1)

    if len(answers.keys()) < n_chunks:
        logging.warn("timeout occurred")

    return_values = []
    for reply_msg in answers.values():
        # Block 3: parse the results
        reply_string = reply_msg["content"].replace("\\n", "\n")
        if (
            reply_string.find("documentFacts") >= 0
            and reply_string.find("NONE") <= 0
        ):
            fact_json = get_json(reply_string)
            documentFacts = fact_json.get("documentFacts", list())
            for x in documentFacts:
                return_values.append(x)
    return return_values

In Block 1, I iterate over all of the chunks that need summarization. I use the asyncio.create_task method to define the work and then await asyncio.sleep(0) to allow the task to start.

In Block 2, I wait for all tasks to complete. I set an arbitrary timeout for the longest running task of 60 seconds. The await asyncio.sleep(1) statement is inserted to prevent the loop from consuming all CPU cycles.

Finally, in Block 3, I parse the results from each task. Some tasks might return NONE instead of JSON (due to how I wrote the prompt), so I ignore those.

For tasks that require retrying or implementing fallback logic, I can modify the logic in Block 2 to accommodate that.

Conclusion

Efficiently managing multiple LLM tasks is crucial for improving the performance of your application. By using the asyncio library and async/await, you can execute tasks in parallel and handle them more effectively