from crewai import Agent, Crew, Process, Task from crewai.project import CrewBase, agent, crew, task from crewai import LLM from user_journey_service.core.config import settings from user_journey_service.processors.guardrail_fn import GuardRailFn from crewai import TaskOutput, Task from typing import Optional, Tuple, Union import re from functools import partial @CrewBase class UserJourney(): """User Journey Development crew""" def __init__(self): self.llm = LLM( model=f"{settings.PROVIDER}/{settings.LLM}", temperature=settings.TEMPERATURE, base_url=settings.OLLAMA_URL, seed=settings.SEED, # For reproducible results timeout=settings.TIMEOUT ) agents_config = 'config/agents.yaml' tasks_config = 'config/tasks.yaml' @agent def researcher(self) -> Agent: return Agent( config=self.agents_config['researcher'], llm=self.llm, verbose=True ) @agent def restructure(self) -> Agent: return Agent( config=self.agents_config['restructure'], llm=self.llm, verbose=True ) @agent def content_creature(self) -> Agent: return Agent( config=self.agents_config['content_creature'], llm=self.llm, verbose=True ) def research_task(self) -> Task: return Task( config=self.tasks_config['research_task'], output_file = "research_agent.md" ) def restructure_task(self,output_file: str,available_time: str) -> Task: return Task( # input={"available_time": available_time}, config=self.tasks_config['restructure_task'], context=[self.research_task()], output_file=output_file, guardrail=GuardRailFn.get_validate_content_guardrail(available_time) ##self.validate_content ) def content_creature_task(self,output_file) -> Task: return Task( config=self.tasks_config['content_creature_task'], # context=[self.restructure_task(output_file,available_time)], output_file=output_file, ) def first_stage_crew(self,output_file: str,available_time: str) -> Crew: """Creates the User Journey Development crew""" return Crew( agents=[ self.researcher(), self.restructure(), ], tasks=[ self.research_task(), self.restructure_task(output_file,available_time), ], process=Process.sequential, verbose=True, ) def second_stage_crew(self,output_file: str): """Creates the Content Development crew""" return Crew( agents=[ self.content_creature(), ], tasks=[ self.content_creature_task(output_file), ], process=Process.sequential, verbose=True, )