Skip to content

API Reference

Agent

Source code in agency_swarm/agents/agent.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
class Agent():
    _shared_state: SharedState = None

    @property
    def assistant(self):
        if not hasattr(self, '_assistant') or self._assistant is None:
            raise Exception("Assistant is not initialized. Please run init_oai() first.")
        return self._assistant

    @assistant.setter
    def assistant(self, value):
        self._assistant = value

    @property
    def functions(self):
        return [tool for tool in self.tools if issubclass(tool, BaseTool)]

    @property
    def shared_state(self):
        return self._shared_state

    @shared_state.setter
    def shared_state(self, value):
        self._shared_state = value
        for tool in self.tools:
            if issubclass(tool, BaseTool):
                tool._shared_state = value

    def response_validator(self, message: str | list) -> str:
        """
        Validates the response from the agent. If the response is invalid, it must raise an exception with instructions
        for the caller agent on how to proceed.

        Parameters:
            message (str): The response from the agent.

        Returns:
            str: The validated response.
        """
        return message

    def __init__(
            self,
            id: str = None,
            name: str = None,
            description: str = None,
            instructions: str = "",
            tools: List[Union[Type[BaseTool], Type[FileSearch], Type[CodeInterpreter], type[Retrieval]]] = None,
            tool_resources: ToolResources = None,
            temperature: float = None,
            top_p: float = None,
            response_format: Union[str, dict, type] = "auto",
            tools_folder: str = None,
            files_folder: Union[List[str], str] = None,
            schemas_folder: Union[List[str], str] = None,
            api_headers: Dict[str, Dict[str, str]] = None,
            api_params: Dict[str, Dict[str, str]] = None,
            file_ids: List[str] = None,
            metadata: Dict[str, str] = None,
            model: str = "gpt-4o-2024-08-06",
            validation_attempts: int = 1,
            max_prompt_tokens: int = None,
            max_completion_tokens: int = None,
            truncation_strategy: dict = None,
            examples: List[ExampleMessage] = None,
            file_search: FileSearchConfig = None,
            parallel_tool_calls: bool = True,
            refresh_from_id: bool = True,
    ):
        """
        Initializes an Agent with specified attributes, tools, and OpenAI client.

        Parameters:
            id (str, optional): Loads the assistant from OpenAI assistant ID. Assistant will be created or loaded from settings if ID is not provided. Defaults to None.
            name (str, optional): Name of the agent. Defaults to the class name if not provided.
            description (str, optional): A brief description of the agent's purpose. Defaults to None.
            instructions (str, optional): Instructions for the agent; processed by `_read_instructions()` during construction (presumably this resolves a path to an instructions file - confirm against that method). Defaults to an empty string.
            tools (List[Union[Type[BaseTool], Type[FileSearch], Type[CodeInterpreter], Type[Retrieval]]], optional): A list of tools (as classes) that the agent can use. The list is copied, and any class named "ExampleTool" is dropped. Defaults to an empty list.
            tool_resources (ToolResources, optional): A set of resources that are used by the assistant's tools. The resources are specific to the type of tool. For example, the code_interpreter tool requires a list of file IDs, while the file_search tool requires a list of vector store IDs. Defaults to None.
            temperature (float, optional): The temperature parameter for the OpenAI API. Defaults to None.
            top_p (float, optional): The top_p parameter for the OpenAI API. Defaults to None.
            response_format (Union[str, Dict, type], optional): The response format for the OpenAI API. If a type (e.g. a BaseModel class) is provided, it is converted with `type_to_response_format_param`. Defaults to "auto".
            tools_folder (str, optional): Path to a directory containing tools associated with the agent. Each tool must be defined in a separate file. File must be named as the class name of the tool. Defaults to None.
            files_folder (Union[List[str], str], optional): Path or list of paths to directories containing files associated with the agent. Defaults to an empty list.
            schemas_folder (Union[List[str], str], optional): Path or list of paths to directories containing OpenAPI schemas associated with the agent. Defaults to an empty list.
            api_headers (Dict[str,Dict[str, str]], optional): Headers to be used for the openapi requests. Each key must be a full filename from schemas_folder. Defaults to an empty dictionary.
            api_params (Dict[str, Dict[str, str]], optional): Extra params to be used for the openapi requests. Each key must be a full filename from schemas_folder. Defaults to an empty dictionary.
            file_ids (List[str], optional): Deprecated; use tool_resources instead. When given, a warning is printed and the IDs are added as "file_search" files. Defaults to None.
            metadata (Dict[str, str], optional): Metadata associated with the agent. Defaults to an empty dictionary.
            model (str, optional): The model identifier for the OpenAI API. Defaults to "gpt-4o-2024-08-06".
            validation_attempts (int, optional): Number of attempts to validate the response with response_validator function. Defaults to 1.
            max_prompt_tokens (int, optional): Maximum number of tokens allowed in the prompt. Defaults to None.
            max_completion_tokens (int, optional): Maximum number of tokens allowed in the completion. Defaults to None.
            truncation_strategy (TruncationStrategy, optional): Truncation strategy for the OpenAI API. Defaults to None.
            examples (List[Dict], optional): A list of example messages for the agent. Defaults to None.
            file_search (FileSearchConfig, optional): A dictionary containing the file search tool configuration. Defaults to None.
            parallel_tool_calls (bool, optional): Whether to enable parallel function calling during tool use. Defaults to True.
            refresh_from_id (bool, optional): Whether to load and update the agent from the OpenAI assistant ID when provided. Defaults to True.

        This constructor stores the configuration, obtains the OpenAI client, reads the instructions, uploads files found in files_folder, and then registers tools parsed from schemas_folder and tools_folder.
        """
        # public attributes
        self.id = id
        self.name = name if name else self.__class__.__name__
        self.description = description
        self.instructions = instructions
        # copy so that later add_tool() calls never mutate the caller's list
        self.tools = tools[:] if tools is not None else []
        self.tools = [tool for tool in self.tools if tool.__name__ != "ExampleTool"]
        self.tool_resources = tool_resources
        self.temperature = temperature
        self.top_p = top_p
        self.response_format = response_format
        # use structured outputs if response_format is a BaseModel
        if isinstance(self.response_format, type):
            self.response_format = type_to_response_format_param(self.response_format)
        self.tools_folder = tools_folder
        self.files_folder = files_folder if files_folder else []
        self.schemas_folder = schemas_folder if schemas_folder else []
        self.api_headers = api_headers if api_headers else {}
        self.api_params = api_params if api_params else {}
        self.metadata = metadata if metadata else {}
        self.model = model
        self.validation_attempts = validation_attempts
        self.max_prompt_tokens = max_prompt_tokens
        self.max_completion_tokens = max_completion_tokens
        self.truncation_strategy = truncation_strategy
        self.examples = examples
        self.file_search = file_search
        self.parallel_tool_calls = parallel_tool_calls
        self.refresh_from_id = refresh_from_id

        self.settings_path = './settings.json'

        # private attributes
        self._assistant: Any = None
        self._shared_instructions = None

        # init methods
        self.client = get_openai_client()
        self._read_instructions()

        # upload files (this may also add FileSearch / CodeInterpreter to self.tools)
        self._upload_files()
        if file_ids:
            print("Warning: 'file_ids' parameter is deprecated. Please use 'tool_resources' parameter instead.")
            self.add_file_ids(file_ids, "file_search")

        # register tools generated from OpenAPI schemas, then tools loaded from the tools folder
        self._parse_schemas()
        self._parse_tools_folder()

    # --- OpenAI Assistant Methods ---

    def init_oai(self):
        """
        Initializes the OpenAI assistant for the agent.

        This method handles the initialization and potential updates of the agent's OpenAI assistant. It loads the assistant based on a saved ID, updates the assistant if necessary, or creates a new assistant if it doesn't exist. After initialization or update, it saves the assistant's settings.

        Output:
            self: Returns the agent instance for chaining methods or further processing.
        """

        # check if settings.json exists
        path = self.get_settings_path()

        # load assistant from id
        if self.id:
            if not self.refresh_from_id:
                return self

            self.assistant = self.client.beta.assistants.retrieve(self.id)
            # Assign attributes to self if they are None
            self.instructions = self.instructions or self.assistant.instructions
            self.name = self.name if self.name != self.__class__.__name__ else self.assistant.name
            self.description = self.description or self.assistant.description
            self.temperature = self.assistant.temperature if self.temperature is None else self.temperature
            self.top_p = self.top_p or self.assistant.top_p
            self.response_format = self.response_format or self.assistant.response_format
            if not isinstance(self.response_format, str):
                self.response_format = self.response_format or self.response_format.model_dump()
            else:
                self.response_format = self.response_format or self.assistant.response_format
            self.tool_resources = self.tool_resources or self.assistant.tool_resources.model_dump()
            self.metadata = self.metadata or self.assistant.metadata
            self.model = self.model or self.assistant.model
            self.tool_resources = self.tool_resources or self.assistant.tool_resources.model_dump()

            for tool in self.assistant.tools:
                # update assistants created with v1
                if tool.type == "retrieval":
                    self.client.beta.assistants.update(self.id, tools=self.get_oai_tools())

            # update assistant if parameters are different
            if not self._check_parameters(self.assistant.model_dump()):
                self._update_assistant()

            return self

        # load assistant from settings
        if os.path.exists(path):
            with open(path, 'r') as f:
                settings = json.load(f)
                # iterate settings and find the assistant with the same name
                for assistant_settings in settings:
                    if assistant_settings['name'] == self.name:
                        try:
                            self.assistant = self.client.beta.assistants.retrieve(assistant_settings['id'])
                            self.id = assistant_settings['id']

                            # update assistant if parameters are different
                            if not self._check_parameters(self.assistant.model_dump()):
                                print("Updating agent... " + self.name)
                                self._update_assistant()

                            if self.assistant.tool_resources:
                                self.tool_resources = self.assistant.tool_resources.model_dump()

                            self._update_settings()
                            return self
                        except NotFoundError:
                            continue

        # create assistant if settings.json does not exist or assistant with the same name does not exist
        self.assistant = self.client.beta.assistants.create(
            model=self.model,
            name=self.name,
            description=self.description,
            instructions=self.instructions,
            tools=self.get_oai_tools(),
            tool_resources=self.tool_resources,
            metadata=self.metadata,
            temperature=self.temperature,
            top_p=self.top_p,
            response_format=self.response_format,
        )

        if self.assistant.tool_resources:
            self.tool_resources = self.assistant.tool_resources.model_dump()

        self.id = self.assistant.id

        self._save_settings()

        return self

    def _update_assistant(self):
        """
        Updates the existing assistant's parameters on the OpenAI server.

        This method updates the assistant's details such as name, description, instructions, tools, file IDs, metadata, and the model. It only updates parameters that have non-empty values. After updating the assistant, it also updates the local settings file to reflect these changes.

        No input parameters are directly passed to this method as it uses the agent's instance attributes.

        No output parameters are returned, but the method updates the assistant's details on the OpenAI server and locally updates the settings file.
        """
        tool_resources = copy.deepcopy(self.tool_resources)
        if tool_resources and tool_resources.get('file_search'):
            tool_resources['file_search'].pop('vector_stores', None)

        params = {
            "name": self.name,
            "description": self.description,
            "instructions": self.instructions,
            "tools": self.get_oai_tools(),
            "tool_resources": tool_resources,
            "temperature": self.temperature,
            "top_p": self.top_p,
            "response_format": self.response_format,
            "metadata": self.metadata,
            "model": self.model
        }
        params = {k: v for k, v in params.items() if v}
        self.assistant = self.client.beta.assistants.update(
            self.id,
            **params,
        )
        self._update_settings()

    def _upload_files(self):
        def add_id_to_file(f_path, id):
            """Add file id to file name"""
            if os.path.isfile(f_path):
                file_name, file_ext = os.path.splitext(f_path)
                f_path_new = file_name + "_" + id + file_ext
                os.rename(f_path, f_path_new)
                return f_path_new

        def get_id_from_file(f_path):
            """Get file id from file name"""
            if os.path.isfile(f_path):
                file_name, file_ext = os.path.splitext(f_path)
                file_name = os.path.basename(file_name)
                file_name = file_name.split("_")
                if len(file_name) > 1:
                    return file_name[-1] if "file-" in file_name[-1] else None
                else:
                    return None

        files_folders = self.files_folder if isinstance(self.files_folder, list) else [self.files_folder]

        file_search_ids = []
        code_interpreter_ids = []

        for files_folder in files_folders:
            if isinstance(files_folder, str):
                f_path = files_folder

                if not os.path.isdir(f_path):
                    f_path = os.path.join(self.get_class_folder_path(), files_folder)
                    f_path = os.path.normpath(f_path)

                if os.path.isdir(f_path):
                    f_paths = os.listdir(f_path)

                    f_paths = [f for f in f_paths if not f.startswith(".")]

                    f_paths = [os.path.join(f_path, f) for f in f_paths]

                    code_interpreter_file_extensions = [
                        ".json",  # JSON
                        ".csv",  # CSV
                        ".xml",  # XML
                        ".jpeg",  # JPEG
                        ".jpg",  # JPEG
                        ".gif",  # GIF
                        ".png",  # PNG
                        ".zip"  # ZIP
                    ]

                    for f_path in f_paths:
                        file_ext = os.path.splitext(f_path)[1]

                        f_path = f_path.strip()
                        file_id = get_id_from_file(f_path)
                        if file_id:
                            print("File already uploaded. Skipping... " + os.path.basename(f_path))
                        else:
                            print("Uploading new file... " + os.path.basename(f_path))
                            with open(f_path, 'rb') as f:
                                file_id = self.client.with_options(
                                    timeout=80 * 1000,
                                ).files.create(file=f, purpose="assistants").id
                            add_id_to_file(f_path, file_id)

                        if file_ext in code_interpreter_file_extensions:
                            code_interpreter_ids.append(file_id)
                        else:
                            file_search_ids.append(file_id)
                else:
                    print(f"Files folder '{f_path}' is not a directory. Skipping...", )
            else:
                print("Files folder path must be a string or list of strings. Skipping... ", files_folder)

        if FileSearch not in self.tools and file_search_ids:
            print("Detected files without FileSearch. Adding FileSearch tool...")
            self.add_tool(FileSearch)
        if CodeInterpreter not in self.tools and code_interpreter_ids:
            print("Detected files without CodeInterpreter. Adding CodeInterpreter tool...")
            self.add_tool(CodeInterpreter)

        self.add_file_ids(file_search_ids, "file_search")
        self.add_file_ids(code_interpreter_ids, "code_interpreter")

    # --- Tool Methods ---

    # TODO: fix 2 methods below
    def add_tool(self, tool):
        if not isinstance(tool, type):
            raise Exception("Tool must not be initialized.")

        subclasses = [FileSearch, CodeInterpreter, Retrieval]
        for subclass in subclasses:
            if issubclass(tool, subclass):
                if not any(issubclass(t, subclass) for t in self.tools):
                    self.tools.append(tool)
                return

        if issubclass(tool, BaseTool):
            if tool.__name__ == "ExampleTool":
                print("Skipping importing ExampleTool...")
                return
            self.tools = [t for t in self.tools if t.__name__ != tool.__name__]
            self.tools.append(tool)
        else:
            raise Exception("Invalid tool type.")

    def get_oai_tools(self):
        tools = []
        for tool in self.tools:
            if not isinstance(tool, type):
                print(tool)
                raise Exception("Tool must not be initialized.")

            if issubclass(tool, FileSearch):
                tools.append(tool(file_search=self.file_search).model_dump(exclude_none=True))
            elif issubclass(tool, CodeInterpreter):
                tools.append(tool().model_dump())
            elif issubclass(tool, Retrieval):
                tools.append(tool().model_dump())
            elif issubclass(tool, BaseTool):
                tools.append({
                    "type": "function",
                    "function": tool.openai_schema
                })
            else:
                raise Exception("Invalid tool type.")
        return tools

    def _parse_schemas(self):
        schemas_folders = self.schemas_folder if isinstance(self.schemas_folder, list) else [self.schemas_folder]

        for schemas_folder in schemas_folders:
            if isinstance(schemas_folder, str):
                f_path = schemas_folder

                if not os.path.isdir(f_path):
                    f_path = os.path.join(self.get_class_folder_path(), schemas_folder)
                    f_path = os.path.normpath(f_path)

                if os.path.isdir(f_path):
                    f_paths = os.listdir(f_path)

                    f_paths = [f for f in f_paths if not f.startswith(".")]

                    f_paths = [os.path.join(f_path, f) for f in f_paths]

                    for f_path in f_paths:
                        with open(f_path, 'r') as f:
                            openapi_spec = f.read()
                        try:
                            validate_openapi_spec(openapi_spec)
                        except Exception as e:
                            print("Invalid OpenAPI schema: " + os.path.basename(f_path))
                            raise e
                        try:
                            headers = None
                            params = None
                            if os.path.basename(f_path) in self.api_headers:
                                headers = self.api_headers[os.path.basename(f_path)]
                            if os.path.basename(f_path) in self.api_params:
                                params = self.api_params[os.path.basename(f_path)]
                            tools = ToolFactory.from_openapi_schema(openapi_spec, headers=headers, params=params)
                        except Exception as e:
                            print("Error parsing OpenAPI schema: " + os.path.basename(f_path))
                            raise e
                        for tool in tools:
                            self.add_tool(tool)
                else:
                    print("Schemas folder path is not a directory. Skipping... ", f_path)
            else:
                print("Schemas folder path must be a string or list of strings. Skipping... ", schemas_folder)

    def _parse_tools_folder(self):
        if not self.tools_folder:
            return

        if not os.path.isdir(self.tools_folder):
            self.tools_folder = os.path.join(self.get_class_folder_path(), self.tools_folder)
            self.tools_folder = os.path.normpath(self.tools_folder)

        if os.path.isdir(self.tools_folder):
            f_paths = os.listdir(self.tools_folder)
            f_paths = [f for f in f_paths if not f.startswith(".") and not f.startswith("__")]
            f_paths = [os.path.join(self.tools_folder, f) for f in f_paths]
            for f_path in f_paths:
                if not f_path.endswith(".py"):
                    continue
                if os.path.isfile(f_path):
                    try:
                        tool = ToolFactory.from_file(f_path)
                        self.add_tool(tool)
                    except Exception as e:
                        print(f"Error parsing tool file {os.path.basename(f_path)}: {e}. Skipping...")
                else:
                    print("Items in tools folder must be files. Skipping... ", f_path)
        else:
            print("Tools folder path is not a directory. Skipping... ", self.tools_folder)

    def get_openapi_schema(self, url):
        """Get openapi schema that contains all tools from the agent as different api paths. Make sure to call this after agency has been initialized."""
        if self.assistant is None:
            raise Exception(
                "Assistant is not initialized. Please initialize the agency first, before using this method")

        return ToolFactory.get_openapi_schema(self.tools, url)

    # --- Settings Methods ---

    def _check_parameters(self, assistant_settings, debug=False):
        """
        Checks if the agent's parameters match with the given assistant settings.

        Parameters:
            assistant_settings (dict): A dictionary containing the settings of an assistant.
            debug (bool): If True, prints debug statements. Default is False.

        Returns:
            bool: True if all the agent's parameters match the assistant settings, False otherwise.

        This method compares the current agent's parameters such as name, description, instructions, tools, file IDs, metadata, and model with the given assistant settings. It uses DeepDiff to compare complex structures like tools and metadata. If any parameter does not match, it returns False; otherwise, it returns True.
        """
        if self.name != assistant_settings['name']:
            if debug:
                print(f"Name mismatch: {self.name} != {assistant_settings['name']}")
            return False

        if self.description != assistant_settings['description']:
            if debug:
                print(f"Description mismatch: {self.description} != {assistant_settings['description']}")
            return False

        if self.instructions != assistant_settings['instructions']:
            if debug:
                print(f"Instructions mismatch: {self.instructions} != {assistant_settings['instructions']}")
            return False

        def clean_tool(tool):
            if isinstance(tool, dict):
                if 'function' in tool and 'strict' in tool['function'] and not tool['function']['strict']:
                    tool['function'].pop('strict', None)
            return tool

        local_tools = [clean_tool(tool) for tool in self.get_oai_tools()]
        assistant_tools = [clean_tool(tool) for tool in assistant_settings['tools']]

        # find file_search and code_interpreter tools in local_tools and assistant_tools
        # Find file_search tools in local and assistant tools
        local_file_search = next((tool for tool in local_tools if tool['type'] == 'file_search'), None)
        assistant_file_search = next((tool for tool in assistant_tools if tool['type'] == 'file_search'), None)

        if local_file_search:
            # If local file_search doesn't have a 'file_search' key, use assistant's if available
            if 'file_search' not in local_file_search and assistant_file_search and 'file_search' in assistant_file_search:
                local_file_search['file_search'] = assistant_file_search['file_search']
            elif 'file_search' in local_file_search:
                # Update max_num_results if not set locally but available in assistant
                if 'max_num_results' not in local_file_search['file_search'] and assistant_file_search and \
                   assistant_file_search['file_search'].get('max_num_results') is not None:
                    local_file_search['file_search']['max_num_results'] = assistant_file_search['file_search']['max_num_results']

                # Update ranking_options if not set locally but available in assistant
                if 'ranking_options' not in local_file_search['file_search'] and assistant_file_search and \
                   assistant_file_search['file_search'].get('ranking_options') is not None:
                    local_file_search['file_search']['ranking_options'] = assistant_file_search['file_search']['ranking_options']

        local_tools.sort(key=lambda x: json.dumps(x, sort_keys=True))
        assistant_tools.sort(key=lambda x: json.dumps(x, sort_keys=True))

        tools_diff = DeepDiff(local_tools, assistant_tools, ignore_order=True)
        if tools_diff:
            if debug:
                print(f"Tools mismatch: {tools_diff}")
                print("Local tools:", local_tools)
                print("Assistant tools:", assistant_tools)
            return False

        if self.temperature != assistant_settings['temperature']:
            if debug:
                print(f"Temperature mismatch: {self.temperature} != {assistant_settings['temperature']}")
            return False

        if self.top_p != assistant_settings['top_p']:
            if debug:
                print(f"Top_p mismatch: {self.top_p} != {assistant_settings['top_p']}")
            return False

        # adjust differences between local and assistant tool resources
        tool_resources_settings = copy.deepcopy(self.tool_resources)
        if tool_resources_settings is None:
            tool_resources_settings = {}
        if tool_resources_settings.get('file_search'):
            tool_resources_settings['file_search'].pop('vector_stores', None)
        if tool_resources_settings.get('file_search') is None:
            tool_resources_settings['file_search'] = {'vector_store_ids': []}
        if tool_resources_settings.get('code_interpreter') is None:
            tool_resources_settings['code_interpreter'] = {"file_ids": []}

        assistant_tool_resources = assistant_settings['tool_resources']
        if assistant_tool_resources is None:
            assistant_tool_resources = {}
        if assistant_tool_resources.get('code_interpreter') is None:
            assistant_tool_resources['code_interpreter'] = {"file_ids": []}
        if assistant_tool_resources.get('file_search') is None:
            assistant_tool_resources['file_search'] = {'vector_store_ids': []}

        tool_resources_diff = DeepDiff(tool_resources_settings, assistant_tool_resources, ignore_order=True)
        if tool_resources_diff != {}:
            if debug:
                print(f"Tool resources mismatch: {tool_resources_diff}")
                print("Local tool resources:", tool_resources_settings)
                print("Assistant tool resources:", assistant_settings['tool_resources'])
            return False

        metadata_diff = DeepDiff(self.metadata, assistant_settings['metadata'], ignore_order=True)
        if metadata_diff != {}:
            if debug:
                print(f"Metadata mismatch: {metadata_diff}")
            return False

        if self.model != assistant_settings['model']:
            if debug:
                print(f"Model mismatch: {self.model} != {assistant_settings['model']}")
            return False

        response_format_diff = DeepDiff(self.response_format, assistant_settings['response_format'], ignore_order=True)
        if response_format_diff != {}:
            if debug:
                print(f"Response format mismatch: {response_format_diff}")
            return False

        return True

    def _save_settings(self):
        path = self.get_settings_path()
        # check if settings.json exists
        if not os.path.isfile(path):
            with open(path, 'w') as f:
                json.dump([self.assistant.model_dump()], f, indent=4)
        else:
            settings = []
            with open(path, 'r') as f:
                settings = json.load(f)
                settings.append(self.assistant.model_dump())
            with open(path, 'w') as f:
                json.dump(settings, f, indent=4)

    def _update_settings(self):
        path = self.get_settings_path()
        # check if settings.json exists
        if os.path.isfile(path):
            settings = []
            with open(path, 'r') as f:
                settings = json.load(f)
                for i, assistant_settings in enumerate(settings):
                    if assistant_settings['id'] == self.id:
                        settings[i] = self.assistant.model_dump()
                        break
            with open(path, 'w') as f:
                json.dump(settings, f, indent=4)

    # --- Helper Methods ---

    def add_file_ids(self, file_ids: List[str], tool_resource: Literal["code_interpreter", "file_search"]):
        """Register file IDs with one of the agent's tool resources.

        Parameters:
            file_ids (List[str]): File IDs to attach. An empty value is a no-op.
            tool_resource (Literal["code_interpreter", "file_search"]): The tool resource that receives the files.

        For "code_interpreter" the resource's ``file_ids`` entry is set to ``file_ids``.
        For "file_search" the files are placed in a new ``vector_stores`` entry, unless the
        resource already lists ``vector_store_ids``; in that case they are sent as a file batch
        to the first listed vector store.

        Raises:
            Exception: If the matching tool class is not in ``self.tools`` or ``tool_resource`` is not recognized.
        """
        if not file_ids:
            return

        if self.tool_resources is None:
            self.tool_resources = {}

        if tool_resource == "code_interpreter":
            if CodeInterpreter not in self.tools:
                raise Exception("CodeInterpreter tool not found in tools.")

            # Make sure a container exists, then (re)assign its file list.
            if self.tool_resources.get(tool_resource) is None:
                self.tool_resources[tool_resource] = {}
            self.tool_resources[tool_resource]['file_ids'] = file_ids
            return

        if tool_resource != "file_search":
            raise Exception("Invalid tool resource.")

        if FileSearch not in self.tools:
            raise Exception("FileSearch tool not found in tools.")

        search_resources = self.tool_resources.get(tool_resource)
        pending_store = [{
            "file_ids": file_ids
        }]

        if search_resources is None:
            self.tool_resources[tool_resource] = {"vector_stores": pending_store}
        elif search_resources.get('vector_store_ids'):
            # A vector store is already referenced: add the files to the first one.
            self.client.beta.vector_stores.file_batches.create(
                vector_store_id=search_resources['vector_store_ids'][0],
                file_ids=file_ids
            )
        else:
            search_resources['vector_stores'] = pending_store

    def get_settings_path(self):
        """Return the path of the JSON settings file, as stored in ``self.settings_path``."""
        configured_path = self.settings_path
        return configured_path

    def _read_instructions(self):
        class_instructions_path = os.path.normpath(os.path.join(self.get_class_folder_path(), self.instructions))
        if os.path.isfile(class_instructions_path):
            with open(class_instructions_path, 'r') as f:
                self.instructions = f.read()
        elif os.path.isfile(self.instructions):
            with open(self.instructions, 'r') as f:
                self.instructions = f.read()
        elif "./instructions.md" in self.instructions or "./instructions.txt" in self.instructions:
            raise Exception("Instructions file not found.")

    def get_class_folder_path(self):
        try:
            # First, try to use the __file__ attribute of the module
            return os.path.abspath(os.path.dirname(self.__module__.__file__))
        except (TypeError, OSError, AttributeError) as e:
            # If that fails, fall back to inspect
            try:
                class_file = inspect.getfile(self.__class__)
            except (TypeError, OSError, AttributeError) as e:
                return "./"
            return os.path.abspath(os.path.realpath(os.path.dirname(class_file)))

    def add_shared_instructions(self, instructions: str):
        """Prefix the agent's instructions with shared instructions.

        Parameters:
            instructions (str): Shared text to place before the agent's own instructions. An empty value is a no-op.

        If shared instructions were added before, their text is removed from
        ``self.instructions`` first so the prefix is replaced, not stacked.
        """
        if not instructions:
            return

        previous_shared = self._shared_instructions
        if previous_shared is not None:
            without_previous = self.instructions.replace(previous_shared, "")
            self.instructions = without_previous.strip().strip("\n")

        self._shared_instructions = instructions
        self.instructions = "\n\n".join((self._shared_instructions, self.instructions))

    # --- Cleanup Methods ---
    def delete(self):
        """Remove the agent's assistant, its files and vector stores, and its settings entry."""
        # Order matters: assistant first, then files, then the settings record.
        cleanup_steps = (self._delete_assistant, self._delete_files, self._delete_settings)
        for cleanup_step in cleanup_steps:
            cleanup_step()

    def _delete_files(self):
        if not self.tool_resources:
            return

        file_ids = []
        if self.tool_resources.get('code_interpreter'):
            file_ids = self.tool_resources['code_interpreter'].get('file_ids', [])

        if self.tool_resources.get('file_search'):
            file_search_vector_store_ids = self.tool_resources['file_search'].get('vector_store_ids', [])
            for vector_store_id in file_search_vector_store_ids:
                files = self.client.beta.vector_stores.files.list(vector_store_id=vector_store_id, limit=100)
                for file in files:
                    file_ids.append(file.id)

                self.client.beta.vector_stores.delete(vector_store_id)

        for file_id in file_ids:
            self.client.files.delete(file_id)

    def _delete_assistant(self):
        self.client.beta.assistants.delete(self.id)
        self._delete_settings()

    def _delete_settings(self):
        path = self.get_settings_path()
        # check if settings.json exists
        if os.path.isfile(path):
            settings = []
            with open(path, 'r') as f:
                settings = json.load(f)
                for i, assistant_settings in enumerate(settings):
                    if assistant_settings['id'] == self.id:
                        settings.pop(i)
                        break
            with open(path, 'w') as f:
                json.dump(settings, f, indent=4)

__init__(id=None, name=None, description=None, instructions='', tools=None, tool_resources=None, temperature=None, top_p=None, response_format='auto', tools_folder=None, files_folder=None, schemas_folder=None, api_headers=None, api_params=None, file_ids=None, metadata=None, model='gpt-4o-2024-08-06', validation_attempts=1, max_prompt_tokens=None, max_completion_tokens=None, truncation_strategy=None, examples=None, file_search=None, parallel_tool_calls=True, refresh_from_id=True)

Initializes an Agent with specified attributes, tools, and OpenAI client.

Parameters:

Name Type Description Default
id str

Loads the assistant from OpenAI assistant ID. Assistant will be created or loaded from settings if ID is not provided. Defaults to None.

None
name str

Name of the agent. Defaults to the class name if not provided.

None
description str

A brief description of the agent's purpose. Defaults to None.

None
instructions str

Path to a file containing specific instructions for the agent. Defaults to an empty string.

''
tools List[Union[Type[BaseTool], Type[FileSearch], Type[CodeInterpreter], Type[Retrieval]]]

A list of tools (as classes) that the agent can use. Defaults to an empty list.

None
tool_resources ToolResources

A set of resources that are used by the assistant's tools. The resources are specific to the type of tool. For example, the code_interpreter tool requires a list of file IDs, while the file_search tool requires a list of vector store IDs. Defaults to None.

None
temperature float

The temperature parameter for the OpenAI API. Defaults to None.

None
top_p float

The top_p parameter for the OpenAI API. Defaults to None.

None
response_format Union[str, Dict, type]

The response format for the OpenAI API. If a BaseModel class is provided, it will be converted to a response format. Defaults to "auto".

'auto'
tools_folder str

Path to a directory containing tools associated with the agent. Each tool must be defined in a separate file. File must be named as the class name of the tool. Defaults to None.

None
files_folder Union[List[str], str]

Path or list of paths to directories containing files associated with the agent. Defaults to None.

None
schemas_folder Union[List[str], str]

Path or list of paths to directories containing OpenAPI schemas associated with the agent. Defaults to None.

None
api_headers Dict[str, Dict[str, str]]

Headers to be used for the openapi requests. Each key must be a full filename from schemas_folder. Defaults to an empty dictionary.

None
api_params Dict[str, Dict[str, str]]

Extra params to be used for the openapi requests. Each key must be a full filename from schemas_folder. Defaults to an empty dictionary.

None
metadata Dict[str, str]

Metadata associated with the agent. Defaults to an empty dictionary.

None
model str

The model identifier for the OpenAI API. Defaults to "gpt-4o-2024-08-06".

'gpt-4o-2024-08-06'
validation_attempts int

Number of attempts to validate the response with response_validator function. Defaults to 1.

1
max_prompt_tokens int

Maximum number of tokens allowed in the prompt. Defaults to None.

None
max_completion_tokens int

Maximum number of tokens allowed in the completion. Defaults to None.

None
truncation_strategy TruncationStrategy

Truncation strategy for the OpenAI API. Defaults to None.

None
examples List[Dict]

A list of example messages for the agent. Defaults to None.

None
file_search FileSearchConfig

A dictionary containing the file search tool configuration. Defaults to None.

None
parallel_tool_calls bool

Whether to enable parallel function calling during tool use. Defaults to True.

True
refresh_from_id bool

Whether to load and update the agent from the OpenAI assistant ID when provided. Defaults to True.

True

This constructor sets up the agent with its unique properties, initializes the OpenAI client, reads instructions if provided, and uploads any associated files.

Source code in agency_swarm/agents/agent.py
def __init__(
        self,
        id: str = None,
        name: str = None,
        description: str = None,
        instructions: str = "",
        tools: List[Union[Type[BaseTool], Type[FileSearch], Type[CodeInterpreter], type[Retrieval]]] = None,
        tool_resources: ToolResources = None,
        temperature: float = None,
        top_p: float = None,
        response_format: Union[str, dict, type] = "auto",
        tools_folder: str = None,
        files_folder: Union[List[str], str] = None,
        schemas_folder: Union[List[str], str] = None,
        api_headers: Dict[str, Dict[str, str]] = None,
        api_params: Dict[str, Dict[str, str]] = None,
        file_ids: List[str] = None,
        metadata: Dict[str, str] = None,
        model: str = "gpt-4o-2024-08-06",
        validation_attempts: int = 1,
        max_prompt_tokens: int = None,
        max_completion_tokens: int = None,
        truncation_strategy: dict = None,
        examples: List[ExampleMessage] = None,
        file_search: FileSearchConfig = None,
        parallel_tool_calls: bool = True,
        refresh_from_id: bool = True,
):
    """
    Initializes an Agent with specified attributes, tools, and OpenAI client.

    Parameters:
        id (str, optional): Loads the assistant from OpenAI assistant ID. Assistant will be created or loaded from settings if ID is not provided. Defaults to None.
        name (str, optional): Name of the agent. Defaults to the class name if not provided.
        description (str, optional): A brief description of the agent's purpose. Defaults to None.
        instructions (str, optional): Path to a file containing specific instructions for the agent. Defaults to an empty string.
        tools (List[Union[Type[BaseTool], Type[FileSearch], Type[CodeInterpreter], Type[Retrieval]]], optional): A list of tools (as classes) that the agent can use. The list is copied, and any class named "ExampleTool" is dropped. Defaults to an empty list.
        tool_resources (ToolResources, optional): A set of resources that are used by the assistant's tools. The resources are specific to the type of tool. For example, the code_interpreter tool requires a list of file IDs, while the file_search tool requires a list of vector store IDs. Defaults to None.
        temperature (float, optional): The temperature parameter for the OpenAI API. Defaults to None.
        top_p (float, optional): The top_p parameter for the OpenAI API. Defaults to None.
        response_format (Union[str, Dict, type], optional): The response format for the OpenAI API. If a class (e.g. a BaseModel) is provided, it will be converted to a response format. Defaults to "auto".
        tools_folder (str, optional): Path to a directory containing tools associated with the agent. Each tool must be defined in a separate file. File must be named as the class name of the tool. Defaults to None.
        files_folder (Union[List[str], str], optional): Path or list of paths to directories containing files associated with the agent. Defaults to None.
        schemas_folder (Union[List[str], str], optional): Path or list of paths to directories containing OpenAPI schemas associated with the agent. Defaults to None.
        api_headers (Dict[str,Dict[str, str]], optional): Headers to be used for the openapi requests. Each key must be a full filename from schemas_folder. Defaults to an empty dictionary.
        api_params (Dict[str, Dict[str, str]], optional): Extra params to be used for the openapi requests. Each key must be a full filename from schemas_folder. Defaults to an empty dictionary.
        file_ids (List[str], optional): Deprecated; use tool_resources instead. When provided, a warning is printed and the IDs are added to the "file_search" tool resource. Defaults to None.
        metadata (Dict[str, str], optional): Metadata associated with the agent. Defaults to an empty dictionary.
        model (str, optional): The model identifier for the OpenAI API. Defaults to "gpt-4o-2024-08-06".
        validation_attempts (int, optional): Number of attempts to validate the response with response_validator function. Defaults to 1.
        max_prompt_tokens (int, optional): Maximum number of tokens allowed in the prompt. Defaults to None.
        max_completion_tokens (int, optional): Maximum number of tokens allowed in the completion. Defaults to None.
        truncation_strategy (TruncationStrategy, optional): Truncation strategy for the OpenAI API. Defaults to None.
        examples (List[Dict], optional): A list of example messages for the agent. Defaults to None.
        file_search (FileSearchConfig, optional): A dictionary containing the file search tool configuration. Defaults to None.
        parallel_tool_calls (bool, optional): Whether to enable parallel function calling during tool use. Defaults to True.
        refresh_from_id (bool, optional): Whether to load and update the agent from the OpenAI assistant ID when provided. Defaults to True.

    This constructor sets up the agent with its unique properties, initializes the OpenAI client, reads instructions if provided, and uploads any associated files.
    """
    # public attributes
    self.id = id
    self.name = name if name else self.__class__.__name__
    self.description = description
    self.instructions = instructions
    # copy the caller's list so later changes to self.tools do not affect it
    self.tools = tools[:] if tools is not None else []
    self.tools = [tool for tool in self.tools if tool.__name__ != "ExampleTool"]
    self.tool_resources = tool_resources
    self.temperature = temperature
    self.top_p = top_p
    self.response_format = response_format
    # use structured outputs if response_format is a BaseModel
    if isinstance(self.response_format, type):
        self.response_format = type_to_response_format_param(self.response_format)
    self.tools_folder = tools_folder
    # falsy folder/dict arguments are normalized to empty containers
    self.files_folder = files_folder if files_folder else []
    self.schemas_folder = schemas_folder if schemas_folder else []
    self.api_headers = api_headers if api_headers else {}
    self.api_params = api_params if api_params else {}
    self.metadata = metadata if metadata else {}
    self.model = model
    self.validation_attempts = validation_attempts
    self.max_prompt_tokens = max_prompt_tokens
    self.max_completion_tokens = max_completion_tokens
    self.truncation_strategy = truncation_strategy
    self.examples = examples
    self.file_search = file_search
    self.parallel_tool_calls = parallel_tool_calls
    self.refresh_from_id = refresh_from_id

    self.settings_path = './settings.json'

    # private attributes
    self._assistant: Any = None
    self._shared_instructions = None

    # init methods
    self.client = get_openai_client()
    self._read_instructions()

    # upload files
    self._upload_files()
    if file_ids:
        print("Warning: 'file_ids' parameter is deprecated. Please use 'tool_resources' parameter instead.")
        self.add_file_ids(file_ids, "file_search")

    self._parse_schemas()
    self._parse_tools_folder()

delete()

Deletes assistant, all vector stores, and all files associated with the agent.

Source code in agency_swarm/agents/agent.py
def delete(self):
    """Remove the agent's assistant, its files and vector stores, and its settings entry."""
    # Order matters: assistant first, then files, then the settings record.
    cleanup_steps = (self._delete_assistant, self._delete_files, self._delete_settings)
    for cleanup_step in cleanup_steps:
        cleanup_step()

get_openapi_schema(url)

Get openapi schema that contains all tools from the agent as different api paths. Make sure to call this after agency has been initialized.

Source code in agency_swarm/agents/agent.py
def get_openapi_schema(self, url):
    """Build an OpenAPI schema that exposes each of the agent's tools as a separate API path.

    Must be called after the agency has been initialized; an Exception is raised
    while ``self.assistant`` is still None.
    """
    assistant_ready = self.assistant is not None
    if assistant_ready:
        return ToolFactory.get_openapi_schema(self.tools, url)

    raise Exception(
        "Assistant is not initialized. Please initialize the agency first, before using this method")

init_oai()

Initializes the OpenAI assistant for the agent.

This method handles the initialization and potential updates of the agent's OpenAI assistant. It loads the assistant based on a saved ID, updates the assistant if necessary, or creates a new assistant if it doesn't exist. After initialization or update, it saves the assistant's settings.

Output

self: Returns the agent instance for chaining methods or further processing.

Source code in agency_swarm/agents/agent.py
def init_oai(self):
    """
    Initializes the OpenAI assistant for the agent.

    The assistant is resolved in this order:
        1. If `self.id` is set, it is retrieved by ID, unset local attributes are filled in from it, and it is updated when local parameters differ. When `refresh_from_id` is False, nothing is retrieved and the agent is returned as is.
        2. Otherwise, the settings file is searched for an entry with the same name. The first entry that still exists on OpenAI is loaded, updated if needed, and written back to the settings file.
        3. Otherwise, a new assistant is created and appended to the settings file.

    Output:
        self: Returns the agent instance for chaining methods or further processing.
    """

    path = self.get_settings_path()

    # load assistant from id
    if self.id:
        if not self.refresh_from_id:
            return self

        self.assistant = self.client.beta.assistants.retrieve(self.id)
        # Assign attributes to self if they are None
        self.instructions = self.instructions or self.assistant.instructions
        self.name = self.name if self.name != self.__class__.__name__ else self.assistant.name
        self.description = self.description or self.assistant.description
        self.temperature = self.assistant.temperature if self.temperature is None else self.temperature
        # Same "is None" test as temperature, so an explicit 0 is not replaced.
        self.top_p = self.assistant.top_p if self.top_p is None else self.top_p

        # Fall back to the assistant's response format and normalize a model
        # object to a plain dict; strings, dicts and None pass through as is.
        response_format = self.response_format or self.assistant.response_format
        if not isinstance(response_format, (str, dict)) and hasattr(response_format, "model_dump"):
            response_format = response_format.model_dump()
        self.response_format = response_format

        # The assistant may have no tool resources at all; only dump when present.
        if not self.tool_resources and self.assistant.tool_resources:
            self.tool_resources = self.assistant.tool_resources.model_dump()
        self.metadata = self.metadata or self.assistant.metadata
        self.model = self.model or self.assistant.model

        # update assistants created with v1 (one update replaces the whole tool list)
        if any(tool.type == "retrieval" for tool in self.assistant.tools):
            self.client.beta.assistants.update(self.id, tools=self.get_oai_tools())

        # update assistant if parameters are different
        if not self._check_parameters(self.assistant.model_dump()):
            self._update_assistant()

        return self

    # load assistant from settings
    if os.path.exists(path):
        with open(path, 'r') as f:
            settings = json.load(f)

        # iterate settings and find the assistant with the same name
        for assistant_settings in settings:
            if assistant_settings['name'] != self.name:
                continue

            try:
                self.assistant = self.client.beta.assistants.retrieve(assistant_settings['id'])
                self.id = assistant_settings['id']

                # update assistant if parameters are different
                if not self._check_parameters(self.assistant.model_dump()):
                    print("Updating agent... " + self.name)
                    self._update_assistant()

                if self.assistant.tool_resources:
                    self.tool_resources = self.assistant.tool_resources.model_dump()

                self._update_settings()
                return self
            except NotFoundError:
                # stale settings entry: the assistant no longer exists remotely
                continue

    # create assistant if settings.json does not exist or assistant with the same name does not exist
    self.assistant = self.client.beta.assistants.create(
        model=self.model,
        name=self.name,
        description=self.description,
        instructions=self.instructions,
        tools=self.get_oai_tools(),
        tool_resources=self.tool_resources,
        metadata=self.metadata,
        temperature=self.temperature,
        top_p=self.top_p,
        response_format=self.response_format,
    )

    if self.assistant.tool_resources:
        self.tool_resources = self.assistant.tool_resources.model_dump()

    self.id = self.assistant.id

    self._save_settings()

    return self

response_validator(message)

Validates the response from the agent. If the response is invalid, it must raise an exception with instructions for the caller agent on how to proceed.

Parameters:

Name Type Description Default
message str

The response from the agent.

required

Returns:

Name Type Description
str str

The validated response.

Source code in agency_swarm/agents/agent.py
def response_validator(self, message: str | list) -> str:
    """
    Validates the response from the agent. If the response is invalid, it must raise an exception with instructions
    for the caller agent on how to proceed.

    Parameters:
        message (str): The response from the agent.

    Returns:
        str: The validated response.
    """
    return message

Agency

Source code in agency_swarm/agency/agency.py
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
class Agency:
    def __init__(self,
                 agency_chart: List,
                 shared_instructions: str = "",
                 shared_files: Union[str, List[str]] = None,
                 async_mode: Literal['threading', "tools_threading"] = None,
                 send_message_tool_class: Type[SendMessageBase] = SendMessage,
                 settings_path: str = "./settings.json",
                 settings_callbacks: SettingsCallbacks = None,
                 threads_callbacks: ThreadsCallbacks = None,
                 temperature: float = 0.3,
                 top_p: float = 1.0,
                 max_prompt_tokens: int = None,
                 max_completion_tokens: int = None,
                 truncation_strategy: dict = None,
                 ):
        """
        Initializes the Agency object, setting up agents, threads, and core functionalities.

        Parameters:
            agency_chart: The structure defining the hierarchy and interaction of agents within the agency.
            shared_instructions (str, optional): Either a path to a file containing shared instructions for all agents (looked up relative to the class folder first, then as given), or the instructions text itself when no such file exists. Defaults to an empty string.
            shared_files (Union[str, List[str]], optional): A path to a folder or a list of folders containing shared files for all agents. Defaults to None, which is stored as an empty list.
            async_mode (str, optional): Deprecated. In "threading" mode the send message tool class is replaced with `SendMessageAsyncThreading`. In "tools_threading" mode `Thread.async_mode` is set to "tools_threading". Defaults to None.
            send_message_tool_class (Type[SendMessageBase], optional): The class to use for the send_message tool. For async communication, use `SendMessageAsyncThreading`. Defaults to SendMessage.
            settings_path (str, optional): The path to the settings file for the agency. Must be json. If file does not exist, it will be created. Defaults to "./settings.json".
            settings_callbacks (SettingsCallbacks, optional): A dictionary containing functions to load and save settings for the agency. The keys must be "load" and "save". Both values must be defined. Defaults to None.
            threads_callbacks (ThreadsCallbacks, optional): A dictionary containing functions to load and save threads for the agency. The keys must be "load" and "save". Both values must be defined. Defaults to None.
            temperature (float, optional): The temperature value to use for the agents. Agent-specific values will override this. Defaults to 0.3.
            top_p (float, optional): The top_p value to use for the agents. Agent-specific values will override this. Defaults to 1.0.
            max_prompt_tokens (int, optional): The maximum number of tokens allowed in the prompt for each agent. Agent-specific values will override this. Defaults to None.
            max_completion_tokens (int, optional): The maximum number of tokens allowed in the completion for each agent. Agent-specific values will override this. Defaults to None.
            truncation_strategy (dict, optional): The truncation strategy to use for the completion for each agent. Agent-specific values will override this. Defaults to None.

        Raises:
            Exception: If async_mode is neither None, "threading" nor "tools_threading".

        This constructor initializes various components of the Agency, including CEO, agents, threads, and user interactions. It parses the agency chart to set up the organizational structure and initializes the messaging tools, agents, and threads necessary for the operation of the agency. Additionally, it prepares a main thread for user interactions.
        """
        self.ceo = None
        self.user = User()
        self.agents = []
        self.agents_and_threads = {}
        self.main_recipients = []
        self.main_thread = None
        self.recipient_agents = None  # for autocomplete
        self.shared_files = shared_files if shared_files else []
        self.async_mode = async_mode
        self.send_message_tool_class = send_message_tool_class
        self.settings_path = settings_path
        self.settings_callbacks = settings_callbacks
        self.threads_callbacks = threads_callbacks
        self.temperature = temperature
        self.top_p = top_p
        self.max_prompt_tokens = max_prompt_tokens
        self.max_completion_tokens = max_completion_tokens
        self.truncation_strategy = truncation_strategy

        # Resolve the deprecated async_mode first: "threading" replaces the tool class,
        # and the thread type below has to follow the class that is actually used.
        if self.async_mode == "threading":
            from agency_swarm.tools.send_message import SendMessageAsyncThreading
            print("Warning: 'threading' mode is deprecated. Please use send_message_tool_class = SendMessageAsyncThreading to use async communication.")
            self.send_message_tool_class = SendMessageAsyncThreading
        elif self.async_mode == "tools_threading":
            # Class-level switch: this affects every Thread, not only this agency's.
            Thread.async_mode = "tools_threading"
            print("Warning: 'tools_threading' mode is deprecated. Use tool.ToolConfig.async_mode = 'threading' instead.")
        elif self.async_mode is not None:
            raise Exception("Please select async_mode = 'threading' or 'tools_threading'.")

        # Set thread type based on the effective send message tool's async mode.
        # getattr with defaults keeps tool classes without a ToolConfig on the sync path
        # instead of failing with AttributeError.
        # NOTE(review): assumes SendMessageAsyncThreading.ToolConfig.async_mode is truthy - confirm.
        tool_config = getattr(self.send_message_tool_class, "ToolConfig", None)
        if getattr(tool_config, "async_mode", None):
            self._thread_type = ThreadAsync
        else:
            self._thread_type = Thread

        # shared_instructions may be a file relative to the class folder, a file path
        # as given, or the literal instructions text.
        class_relative_path = os.path.join(self._get_class_folder_path(), shared_instructions)
        if os.path.isfile(class_relative_path):
            self._read_instructions(class_relative_path)
        elif os.path.isfile(shared_instructions):
            self._read_instructions(shared_instructions)
        else:
            self.shared_instructions = shared_instructions

        self.shared_state = SharedState()

        self._parse_agency_chart(agency_chart)
        self._init_threads()
        self._create_special_tools()
        self._init_agents()

    def get_completion(self, message: str,
                       message_files: List[str] = None,
                       yield_messages: bool = False,
                       recipient_agent: Agent = None,
                       additional_instructions: str = None,
                       attachments: List[dict] = None,
                       tool_choice: dict = None,
                       verbose: bool = False,
                       response_format: dict = None):
        """
        Retrieves the completion for a given message from the main thread.

        Parameters:
            message (str): The message for which completion is to be retrieved.
            message_files (list, optional): A list of file ids to be sent as attachments with the message. When using this parameter, files will be assigned both to file_search and code_interpreter tools if available. It is recommended to assign files to the most sutiable tool manually, using the attachments parameter.  Defaults to None.
            yield_messages (bool, optional): Flag to determine if intermediate messages should be yielded. Defaults to True.
            recipient_agent (Agent, optional): The agent to which the message should be sent. Defaults to the first agent in the agency chart.
            additional_instructions (str, optional): Additional instructions to be sent with the message. Defaults to None.
            attachments (List[dict], optional): A list of attachments to be sent with the message, following openai format. Defaults to None.
            tool_choice (dict, optional): The tool choice for the recipient agent to use. Defaults to None.
            parallel_tool_calls (bool, optional): Whether to enable parallel function calling during tool use. Defaults to True.
            verbose (bool, optional): Whether to print the intermediary messages in console. Defaults to False.
            response_format (dict, optional): The response format to use for the completion.

        Returns:
            Generator or final response: Depending on the 'yield_messages' flag, this method returns either a generator yielding intermediate messages or the final response from the main thread.
        """
        if verbose and yield_messages:
            raise Exception("Verbose mode is not compatible with yield_messages=True")

        res = self.main_thread.get_completion(message=message,
                                               message_files=message_files,
                                               attachments=attachments,
                                               recipient_agent=recipient_agent,
                                               additional_instructions=additional_instructions,
                                               tool_choice=tool_choice,
                                               yield_messages=yield_messages or verbose,
                                               response_format=response_format)

        if not yield_messages or verbose:
            while True:
                try:
                    message = next(res)
                    if verbose:
                        message.cprint()
                except StopIteration as e:
                    return e.value

        return res


    def get_completion_stream(self,
                              message: str,
                              event_handler: type(AgencyEventHandler),
                              message_files: List[str] = None,
                              recipient_agent: Agent = None,
                              additional_instructions: str = None,
                              attachments: List[dict] = None,
                              tool_choice: dict = None,
                              response_format: dict = None):
        """
        Generates a stream of completions for a given message from the main thread.

        Parameters:
            message (str): The message for which completion is to be retrieved.
            event_handler (type(AgencyEventHandler)): The event handler class to handle the completion stream. https://github.com/openai/openai-python/blob/main/helpers.md
            message_files (list, optional): A list of file ids to be sent as attachments with the message. When using this parameter, files will be assigned both to file_search and code_interpreter tools if available. It is recommended to assign files to the most sutiable tool manually, using the attachments parameter.  Defaults to None.
            recipient_agent (Agent, optional): The agent to which the message should be sent. Defaults to the first agent in the agency chart.
            additional_instructions (str, optional): Additional instructions to be sent with the message. Defaults to None.
            attachments (List[dict], optional): A list of attachments to be sent with the message, following openai format. Defaults to None.
            tool_choice (dict, optional): The tool choice for the recipient agent to use. Defaults to None.
            parallel_tool_calls (bool, optional): Whether to enable parallel function calling during tool use. Defaults to True.

        Returns:
            Final response: Final response from the main thread.
        """
        if not inspect.isclass(event_handler):
            raise Exception("Event handler must not be an instance.")

        res = self.main_thread.get_completion_stream(message=message,
                                                      message_files=message_files,
                                                      event_handler=event_handler,
                                                      attachments=attachments,
                                                      recipient_agent=recipient_agent,
                                                      additional_instructions=additional_instructions,
                                                      tool_choice=tool_choice,
                                                      response_format=response_format)

        while True:
            try:
                next(res)
            except StopIteration as e:
                event_handler.on_all_streams_end()

                return e.value

    def get_completion_parse(self, message: str,
                             response_format: Type[T],
                             message_files: List[str] = None,
                             recipient_agent: Agent = None,
                             additional_instructions: str = None,
                             attachments: List[dict] = None,
                             tool_choice: dict = None,
                             verbose: bool = False) -> T:
        """
        Retrieves the completion for a given message from the main thread and parses the response using the provided pydantic model.

        Parameters:
            message (str): The message for which completion is to be retrieved.
            response_format (type(BaseModel)): The response format to use for the completion. 
            message_files (list, optional): A list of file ids to be sent as attachments with the message. When using this parameter, files will be assigned both to file_search and code_interpreter tools if available. It is recommended to assign files to the most sutiable tool manually, using the attachments parameter.  Defaults to None.
            recipient_agent (Agent, optional): The agent to which the message should be sent. Defaults to the first agent in the agency chart.
            additional_instructions (str, optional): Additional instructions to be sent with the message. Defaults to None.
            attachments (List[dict], optional): A list of attachments to be sent with the message, following openai format. Defaults to None.
            tool_choice (dict, optional): The tool choice for the recipient agent to use. Defaults to None.
            verbose (bool, optional): Whether to print the intermediary messages in console. Defaults to False.

        Returns:
            Final response: The final response from the main thread, parsed using the provided pydantic model.
        """
        response_model = None
        if isinstance(response_format, type):
            response_model = response_format
            response_format = type_to_response_format_param(response_format)

        res = self.get_completion(message=message,
                            message_files=message_files,
                            recipient_agent=recipient_agent,
                            additional_instructions=additional_instructions,
                            attachments=attachments,
                            tool_choice=tool_choice,
                            response_format=response_format,
                            verbose=verbose)

        try:
            return response_model.model_validate_json(res)
        except:
            parsed_res = json.loads(res)
            if 'refusal' in parsed_res:
                raise RefusalError(parsed_res['refusal'])
            else:
                raise Exception("Failed to parse response: " + res)

    def demo_gradio(self, height=450, dark_mode=True, **kwargs):
        """
        Launches a Gradio-based demo interface for the agency chatbot.

        Parameters:
            height (int, optional): The height of the chatbot widget in the Gradio interface. Default is 450.
            dark_mode (bool, optional): Flag to determine if the interface should be displayed in dark mode. Default is True.
            **kwargs: Additional keyword arguments forwarded to `demo.launch()`.

        Returns:
            The `gr.Blocks` object, after `launch()` has been called on it.

        Raises:
            Exception: If gradio is not installed.

        This method sets up and runs a Gradio interface, allowing users to interact with the agency's chatbot. It includes a recipient dropdown, a text input for the user's messages, a file upload widget and a chatbot interface for displaying the conversation. The completion runs in a background thread; its event handler pushes text and control markers ("[new_message]", "[change_recipient_agent]", "[end]") onto a queue that the UI generator drains.
        """

        try:
            import gradio as gr
        except ImportError:
            raise Exception("Please install gradio: pip install gradio")

        js = """function () {
          gradioURL = window.location.href
          if (!gradioURL.endsWith('?__theme={theme}')) {
            window.location.replace(gradioURL + '?__theme={theme}');
          }
        }"""

        js = js.replace("{theme}", "dark" if dark_mode else "light")

        # State shared between the Gradio callbacks below.
        attachments = []
        images = []
        message_file_names = None
        uploading_files = False
        recipient_agent_names = [agent.name for agent in self.main_recipients]
        recipient_agent = self.main_recipients[0]

        def dropdown_choices(*extra_names):
            """Return the dropdown choices as a de-duplicated list in a stable order."""
            # A set would also de-duplicate but would not keep the order stable.
            return list(dict.fromkeys([*recipient_agent_names, *extra_names]))

        def has_tool(agent, tool_class):
            """Tell whether the agent already carries tool_class, as a class or as an instance."""
            # Tools are appended below as classes, so an isinstance() test alone never matches.
            return any(
                (isinstance(t, type) and issubclass(t, tool_class)) or isinstance(t, tool_class)
                for t in agent.tools
            )

        def to_tool_call(raw):
            """Convert a dict-shaped tool call into its typed model; other objects pass through."""
            if not isinstance(raw, dict):
                return raw

            if "type" not in raw:
                raw["type"] = "function"

            call_type = raw["type"]
            if call_type == "function":
                return FunctionToolCall(**raw)
            if call_type == "code_interpreter":
                return CodeInterpreterToolCall(**raw)
            if call_type == "file_search" or call_type == "retrieval":
                return FileSearchToolCall(**raw)
            raise ValueError("Invalid tool call type: " + call_type)

        with gr.Blocks(js=js) as demo:
            chatbot_queue = queue.Queue()
            chatbot = gr.Chatbot(height=height)
            with gr.Row():
                with gr.Column(scale=9):
                    dropdown = gr.Dropdown(label="Recipient Agent", choices=recipient_agent_names,
                                           value=recipient_agent.name)
                    msg = gr.Textbox(label="Your Message", lines=4)
                with gr.Column(scale=1):
                    file_upload = gr.Files(label="OpenAI Files", type="filepath")
            button = gr.Button(value="Send", variant="primary")

            def handle_dropdown_change(selected_option):
                nonlocal recipient_agent
                recipient_agent = self._get_agent_by_name(selected_option)

            def handle_file_upload(file_list):
                nonlocal attachments
                nonlocal message_file_names
                nonlocal uploading_files
                nonlocal images
                uploading_files = True
                attachments = []
                message_file_names = []
                # Reset images together with the other per-upload state; otherwise
                # images from an earlier selection accumulate and are sent again.
                images = []

                if not file_list:
                    uploading_files = False
                    return "No files uploaded"

                try:
                    for file_obj in file_list:
                        # NOTE(review): presumably Gradio passes a path-like value that may
                        # or may not expose .name - accept both; confirm against the
                        # installed Gradio version.
                        file_path = getattr(file_obj, "name", file_obj)
                        purpose = get_file_purpose(file_path)

                        with open(file_path, 'rb') as f:
                            # Upload the file to OpenAI
                            file = self.main_thread.client.files.create(
                                file=f,
                                purpose=purpose
                            )

                        if purpose == "vision":
                            images.append({
                                "type": "image_file",
                                "image_file": {"file_id": file.id}
                            })
                        else:
                            attachments.append({
                                "file_id": file.id,
                                "tools": get_tools(file.filename)
                            })

                        message_file_names.append(file.filename)
                        print(f"Uploaded file ID: {file.id}")
                    return attachments
                except Exception as e:
                    print(f"Error: {e}")
                    return str(e)
                finally:
                    uploading_files = False

            def user(user_message, history):
                if not user_message.strip():
                    return user_message, history

                # Check if attachments contain file search or code interpreter types
                def check_and_add_tools_in_attachments(attachments, recipient_agent):
                    for attachment in attachments:
                        for tool in attachment.get("tools", []):
                            if tool["type"] == "file_search":
                                if not has_tool(recipient_agent, FileSearch):
                                    # Add FileSearch tool if it does not exist
                                    recipient_agent.tools.append(FileSearch)
                                    recipient_agent.client.beta.assistants.update(recipient_agent.id, tools=recipient_agent.get_oai_tools())
                                    print("Added FileSearch tool to recipient agent to analyze the file.")
                            elif tool["type"] == "code_interpreter":
                                if not has_tool(recipient_agent, CodeInterpreter):
                                    # Add CodeInterpreter tool if it does not exist
                                    recipient_agent.tools.append(CodeInterpreter)
                                    recipient_agent.client.beta.assistants.update(recipient_agent.id, tools=recipient_agent.get_oai_tools())
                                    print("Added CodeInterpreter tool to recipient agent to analyze the file.")
                    return None

                check_and_add_tools_in_attachments(attachments, recipient_agent)

                if history is None:
                    history = []

                original_user_message = user_message

                # Append the user message with a placeholder for bot response
                if recipient_agent:
                    user_message = f"👤 User 🗣️ @{recipient_agent.name}:\n" + user_message.strip()
                else:
                    user_message = "👤 User:" + user_message.strip()

                if message_file_names:
                    user_message += "\n\n📎 Files:\n" + "\n".join(message_file_names)

                # The untouched message stays in the textbox so that bot() can read it next.
                return original_user_message, history + [[user_message, None]]

            class GradioEventHandler(AgencyEventHandler):
                message_output = None

                @classmethod
                def change_recipient_agent(cls, recipient_agent_name):
                    # Two-item protocol: the marker first, then the agent name.
                    chatbot_queue.put("[change_recipient_agent]")
                    chatbot_queue.put(recipient_agent_name)

                @override
                def on_message_created(self, message: Message) -> None:
                    if message.role == "user":
                        full_content = ""
                        for content in message.content:
                            if content.type == "image_file":
                                full_content += f"🖼️ Image File: {content.image_file.file_id}\n"
                                continue

                            if content.type == "image_url":
                                full_content += f"\n{content.image_url.url}\n"
                                continue

                            if content.type == "text":
                                full_content += content.text.value + "\n"

                        self.message_output = MessageOutput("text", self.agent_name, self.recipient_agent_name,
                                                            full_content)

                    else:
                        self.message_output = MessageOutput("text", self.recipient_agent_name, self.agent_name,
                                                            "")

                    chatbot_queue.put("[new_message]")
                    chatbot_queue.put(self.message_output.get_formatted_content())

                @override
                def on_text_delta(self, delta, snapshot):
                    chatbot_queue.put(delta.value)

                @override
                def on_tool_call_created(self, tool_call: ToolCall):
                    tool_call = to_tool_call(tool_call)

                    # TODO: add support for code interpreter and retrieval tools
                    if tool_call.type == "function":
                        chatbot_queue.put("[new_message]")
                        self.message_output = MessageOutput("function", self.recipient_agent_name, self.agent_name,
                                                            str(tool_call.function))
                        chatbot_queue.put(self.message_output.get_formatted_header() + "\n")

                @override
                def on_tool_call_done(self, snapshot: ToolCall):
                    snapshot = to_tool_call(snapshot)

                    self.message_output = None

                    # TODO: add support for code interpreter and retrieval tools
                    if snapshot.type != "function":
                        return

                    chatbot_queue.put(str(snapshot.function))

                    if snapshot.function.name == "SendMessage":
                        try:
                            # SECURITY: the arguments are model-generated text. They used to
                            # go through eval(), which executes arbitrary expressions and
                            # also fails on JSON literals such as true/false/null. Parse
                            # them as JSON instead.
                            args = json.loads(snapshot.function.arguments)
                            recipient = args["recipient"]
                            self.message_output = MessageOutput("text", self.recipient_agent_name, recipient,
                                                                args["message"])

                            chatbot_queue.put("[new_message]")
                            chatbot_queue.put(self.message_output.get_formatted_content())
                        except Exception as e:
                            # Best effort: echoing the forwarded message is cosmetic, so a
                            # failure here must not break the stream - but do not hide it.
                            print(f"Could not display SendMessage arguments: {e}")

                    self.message_output = None

                @override
                def on_run_step_done(self, run_step: RunStep) -> None:
                    if run_step.type == "tool_calls":
                        for tool_call in run_step.step_details.tool_calls:
                            if tool_call.type != "function":
                                continue

                            if tool_call.function.name == "SendMessage":
                                continue

                            self.message_output = None
                            chatbot_queue.put("[new_message]")

                            self.message_output = MessageOutput("function_output", tool_call.function.name,
                                                                self.recipient_agent_name,
                                                                tool_call.function.output)

                            chatbot_queue.put(self.message_output.get_formatted_header() + "\n")
                            # bot() concatenates queue items onto the chat text, so a
                            # missing output must not reach the queue as None.
                            output = tool_call.function.output
                            chatbot_queue.put(output if output is not None else "")

                @override
                @classmethod
                def on_all_streams_end(cls):
                    cls.message_output = None
                    chatbot_queue.put("[end]")

            def bot(original_message, history):
                nonlocal attachments
                nonlocal message_file_names
                nonlocal recipient_agent
                nonlocal images
                nonlocal uploading_files

                # bot() is a generator, so UI updates must be yielded: a value passed to
                # `return` is discarded. Blank input (user() leaves it untouched) is not sent.
                if not original_message or not original_message.strip():
                    yield "", history, gr.update(value=recipient_agent.name,
                                                 choices=dropdown_choices(recipient_agent.name))
                    return

                if uploading_files:
                    history.append([None, "Uploading files... Please wait."])
                    yield "", history, gr.update(value=recipient_agent.name,
                                                 choices=dropdown_choices(recipient_agent.name))
                    return

                print("Message files: ", attachments)
                print("Images: ", images)

                if images and len(images) > 0:
                    original_message = [
                        {
                            "type": "text",
                            "text": original_message,
                        },
                        *images
                    ]

                def run_completion(message, agent, files):
                    """Run the streamed completion; on failure still release the UI loop."""
                    try:
                        self.get_completion_stream(message, GradioEventHandler, [], agent, "", files, None)
                    except Exception as e:
                        # "[end]" is only queued on normal completion, so without this the
                        # loop below would block on the queue forever.
                        chatbot_queue.put("[new_message]")
                        chatbot_queue.put(f"Error: {e}")
                        chatbot_queue.put("[end]")
                        raise

                completion_thread = threading.Thread(
                    target=run_completion, args=(original_message, recipient_agent, attachments))
                completion_thread.start()

                # The worker keeps its own references; reset the shared state for the next message.
                attachments = []
                message_file_names = []
                images = []
                uploading_files = False

                new_message = True
                while True:
                    try:
                        bot_message = chatbot_queue.get(block=True)

                        if bot_message == "[end]":
                            completion_thread.join()
                            break

                        if bot_message == "[new_message]":
                            new_message = True
                            continue

                        if bot_message == "[change_recipient_agent]":
                            new_agent_name = chatbot_queue.get(block=True)
                            recipient_agent = self._get_agent_by_name(new_agent_name)
                            yield "", history, gr.update(value=new_agent_name,
                                                         choices=dropdown_choices(recipient_agent.name))
                            continue

                        if new_message:
                            history.append([None, bot_message])
                            new_message = False
                        else:
                            history[-1][1] += bot_message

                        yield "", history, gr.update(value=recipient_agent.name,
                                                     choices=dropdown_choices(recipient_agent.name))
                    except queue.Empty:
                        break

            # bot() takes exactly (message, history); both triggers must pass the same inputs.
            button.click(
                user,
                inputs=[msg, chatbot],
                outputs=[msg, chatbot]
            ).then(
                bot, [msg, chatbot], [msg, chatbot, dropdown]
            )
            dropdown.change(handle_dropdown_change, dropdown)
            file_upload.change(handle_file_upload, file_upload)
            msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
                bot, [msg, chatbot], [msg, chatbot, dropdown]
            )

            # Enable queuing for streaming intermediate outputs
            demo.queue(default_concurrency_limit=10)

        # Launch the demo
        demo.launch(**kwargs)
        return demo

    def _recipient_agent_completer(self, text, state):
        """
        Autocomplete completer for recipient agent names.
        """
        options = [agent for agent in self.recipient_agents if agent.lower().startswith(text.lower())]
        if state < len(options):
            return options[state]
        else:
            return None

    def _setup_autocomplete(self):
        """
        Sets up readline with the completer function.
        """
        try:
            import readline
        except ImportError:
            # Attempt to import pyreadline for Windows compatibility
            try:
                import pyreadline as readline
            except ImportError:
                print(
                    "Module 'readline' not found. Autocomplete will not work. If you are using Windows, try installing 'pyreadline3'.")
                return

        if not readline:
            return

        try:
            readline.set_completer(self._recipient_agent_completer)
            readline.parse_and_bind('tab: complete')
        except Exception as e:
            print(f"Error setting up autocomplete for agents in terminal: {e}. Autocomplete will not work.")

    def run_demo(self):
        """
        Executes agency in the terminal with autocomplete for recipient agent names.

        Reads user input in a loop until the user enters "exit" (case-insensitive); empty input
        is ignored. A message may address one of the main recipient agents with "@AgentName";
        the mention is stripped from the text and the matching agent is passed as the recipient.
        Without a mention, no recipient is passed to get_completion_stream. Streaming events are
        rendered through a locally defined event handler.
        """
        outer_self = self
        from agency_swarm import AgencyEventHandler

        def coerce_tool_call(payload):
            """Convert a raw dict tool call into its typed model; return anything else unchanged."""
            if not isinstance(payload, dict):
                return payload

            # Payloads without an explicit type are treated as function calls.
            kind = payload.setdefault("type", "function")
            if kind == "function":
                return FunctionToolCall(**payload)
            if kind == "code_interpreter":
                return CodeInterpreterToolCall(**payload)
            if kind in ("file_search", "retrieval"):
                return FileSearchToolCall(**payload)
            raise ValueError("Invalid tool call type: " + kind)

        class TermEventHandler(AgencyEventHandler):
            # Live console output currently being updated; None when nothing is in progress.
            message_output = None

            @override
            def on_message_created(self, message: Message) -> None:
                if message.role == "user":
                    self.message_output = MessageOutputLive("text", self.agent_name, self.recipient_agent_name,
                                                            "")
                    self.message_output.cprint_update(message.content[0].text.value)
                else:
                    self.message_output = MessageOutputLive("text", self.recipient_agent_name, self.agent_name, "")

            @override
            def on_message_done(self, message: Message) -> None:
                self.message_output = None

            @override
            def on_text_delta(self, delta, snapshot):
                self.message_output.cprint_update(snapshot.value)

            @override
            def on_tool_call_created(self, tool_call):
                tool_call = coerce_tool_call(tool_call)

                # TODO: add support for code interpreter and retrieval tools

                if tool_call.type == "function":
                    self.message_output = MessageOutputLive("function", self.recipient_agent_name, self.agent_name,
                                                            str(tool_call.function))

            @override
            def on_tool_call_delta(self, delta, snapshot):
                snapshot = coerce_tool_call(snapshot)

                # A live output is only created for function calls (see on_tool_call_created),
                # so there is nothing to update for other tool types.
                if snapshot.type != "function" or self.message_output is None:
                    return

                self.message_output.cprint_update(str(snapshot.function))

            @override
            def on_tool_call_done(self, snapshot):
                self.message_output = None

                # TODO: add support for code interpreter and retrieval tools
                if snapshot.type != "function":
                    return

                tool_config = outer_self.send_message_tool_class.ToolConfig
                output_as_result = getattr(tool_config, 'output_as_result', False)

                if snapshot.function.name == "SendMessage" and not output_as_result:
                    try:
                        # Arguments are produced by the model, so they are parsed as JSON
                        # instead of being evaluated as Python code.
                        args = json.loads(snapshot.function.arguments)
                        recipient = args["recipient"]
                        self.message_output = MessageOutputLive("text", self.recipient_agent_name, recipient,
                                                                "")

                        self.message_output.cprint_update(args["message"])
                    except Exception:
                        # Echoing the forwarded message is best-effort display only; malformed
                        # arguments must not interrupt the stream.
                        pass

                self.message_output = None

            @override
            def on_run_step_done(self, run_step: RunStep) -> None:
                if run_step.type == "tool_calls":
                    for tool_call in run_step.step_details.tool_calls:
                        if tool_call.type != "function":
                            continue

                        # SendMessage is skipped here; on_tool_call_done handles its display.
                        if tool_call.function.name == "SendMessage":
                            continue

                        self.message_output = MessageOutputLive("function_output", tool_call.function.name,
                                                                self.recipient_agent_name, tool_call.function.output)
                        self.message_output.cprint_update(tool_call.function.output)

                    self.message_output = None

            @override
            def on_end(self):
                self.message_output = None

        self.recipient_agents = [str(agent.name) for agent in self.main_recipients]

        self._setup_autocomplete()  # Prepare readline for autocomplete

        while True:
            console.rule()
            text = input("👤 USER: ")

            if not text:
                continue

            if text.lower() == "exit":
                break

            recipient_agent = None
            if "@" in text:
                # The mention is the token between the first "@" and the next space.
                recipient_agent = text.split("@")[1].split(" ")[0]
                text = text.replace(f"@{recipient_agent}", "").strip()
                try:
                    recipient_agent = \
                        [agent for agent in self.recipient_agents if agent.lower() == recipient_agent.lower()][0]
                    recipient_agent = self._get_agent_by_name(recipient_agent)
                except Exception:
                    print(f"Recipient agent {recipient_agent} not found.")
                    continue

            self.get_completion_stream(message=text, event_handler=TermEventHandler, recipient_agent=recipient_agent)

    def get_customgpt_schema(self, url: str):
        """Builds the OpenAPI schema of the agency by delegating to the CEO agent.

        Parameters:
            url (str): Your server url where the api will be hosted.

        Returns:
            Whatever the CEO agent's get_openapi_schema(url) returns.
        """
        ceo_agent = self.ceo
        schema = ceo_agent.get_openapi_schema(url)
        return schema

    def plot_agency_chart(self):
        """Placeholder with no implementation; does nothing and returns None."""
        return None

    def _init_agents(self):
        """
        Initializes all agents in the agency with unique IDs, shared instructions, and OpenAI models.

        This method iterates through each agent in the agency, assigns a unique ID, adds shared instructions, and initializes the OpenAI models for each agent.

        There are no input parameters.

        There are no output parameters as this method is used for internal initialization purposes within the Agency class.
        """
        if self.settings_callbacks:
            loaded_settings = self.settings_callbacks["load"]()
            with open(self.settings_path, 'w') as f:
                json.dump(loaded_settings, f, indent=4)

        for agent in self.agents:
            if "temp_id" in agent.id:
                agent.id = None

            agent.add_shared_instructions(self.shared_instructions)
            agent.settings_path = self.settings_path

            if self.shared_files:
                if isinstance(self.shared_files, str):
                    self.shared_files = [self.shared_files]

                if isinstance(agent.files_folder, str):
                    agent.files_folder = [agent.files_folder]
                    agent.files_folder += self.shared_files
                elif isinstance(agent.files_folder, list):
                    agent.files_folder += self.shared_files

            if self.temperature is not None and agent.temperature is None:
                agent.temperature = self.temperature
            if self.top_p and agent.top_p is None:
                agent.top_p = self.top_p
            if self.max_prompt_tokens is not None and agent.max_prompt_tokens is None:
                agent.max_prompt_tokens = self.max_prompt_tokens
            if self.max_completion_tokens is not None and agent.max_completion_tokens is None:
                agent.max_completion_tokens = self.max_completion_tokens
            if self.truncation_strategy is not None and agent.truncation_strategy is None:
                agent.truncation_strategy = self.truncation_strategy

            if not agent.shared_state:
                agent.shared_state = self.shared_state

            agent.init_oai()

        if self.settings_callbacks:
            with open(self.agents[0].get_settings_path(), 'r') as f:
                settings = f.read()
            settings = json.loads(settings)
            self.settings_callbacks["save"](settings)

    def _init_threads(self):
        """
        Builds the thread objects used for communication inside the agency.

        A main thread between the user and the CEO is created and stored under the
        "main_thread" key of agents_and_threads. Every agent/recipient entry recorded in
        agents_and_threads is then replaced by a thread object of the configured thread type.

        When thread callbacks are defined, previously saved thread ids are restored from the
        "load" callback, threads without a saved id are initialized right away, and the
        resulting ids are passed to the "save" callback.

        No input parameters and no return value.
        """
        callbacks = self.threads_callbacks
        self.main_thread = Thread(self.user, self.ceo)

        # Restore previously saved ids, if any
        saved_ids = {}
        if callbacks:
            saved_ids = callbacks["load"]()
            if "main_thread" in saved_ids and saved_ids["main_thread"]:
                self.main_thread.id = saved_ids["main_thread"]
            else:
                self.main_thread.init_thread()

        # Keep the main thread alongside the agent-to-agent threads
        self.agents_and_threads["main_thread"] = self.main_thread

        # Replace every recorded pairing with an actual thread object
        for sender_name, links in self.agents_and_threads.items():
            if sender_name == "main_thread":
                continue
            for recipient_name, spec in links.items():
                sender = self._get_agent_by_name(spec["agent"])
                recipient = self._get_agent_by_name(spec["recipient_agent"])
                thread = self._thread_type(sender, recipient)
                self.agents_and_threads[sender_name][recipient_name] = thread

                if sender_name in saved_ids and recipient_name in saved_ids[sender_name]:
                    # reuse the saved thread id
                    thread.id = saved_ids[sender_name][recipient_name]
                elif callbacks:
                    # init threads if there are threads callbacks so the ids are saved for later use
                    thread.init_thread()

        if not callbacks:
            return

        # Collect the current ids and hand them to the save callback
        ids_to_save = {
            sender_name: {recipient_name: thread.id for recipient_name, thread in links.items()}
            for sender_name, links in self.agents_and_threads.items()
            if sender_name != "main_thread"
        }
        ids_to_save["main_thread"] = self.main_thread.id

        callbacks["save"](ids_to_save)

    def _parse_agency_chart(self, agency_chart):
        """
        Parses the provided agency chart to initialize and organize agents within the agency.

        Parameters:
            agency_chart: A structure representing the hierarchical organization of agents within the agency.
                    It can contain Agent objects and lists of Agent objects.

        This method iterates through each node in the agency chart. If a node is an Agent, it is set as the CEO if not already assigned.
        If a node is a list, it iterates through the agents in the list, adding them to the agency and establishing communication
        threads between them. It raises an exception if the agency chart is invalid or if multiple CEOs are defined.
        """
        if not isinstance(agency_chart, list):
            raise Exception("Invalid agency chart.")

        if len(agency_chart) == 0:
            raise Exception("Agency chart cannot be empty.")

        for node in agency_chart:
            if isinstance(node, Agent):
                if not self.ceo:
                    self.ceo = node
                    self._add_agent(self.ceo)
                else:
                    self._add_agent(node)
                self._add_main_recipient(node)

            elif isinstance(node, list):
                for i, agent in enumerate(node):
                    if not isinstance(agent, Agent):
                        raise Exception("Invalid agency chart.")

                    index = self._add_agent(agent)

                    if i == len(node) - 1:
                        continue

                    if agent.name not in self.agents_and_threads.keys():
                        self.agents_and_threads[agent.name] = {}

                    if i < len(node) - 1:
                        other_agent = node[i + 1]
                        if other_agent.name == agent.name:
                            continue
                        if other_agent.name not in self.agents_and_threads[agent.name].keys():
                            self.agents_and_threads[agent.name][other_agent.name] = {
                                "agent": agent.name,
                                "recipient_agent": other_agent.name,
                            }
            else:
                raise Exception("Invalid agency chart.")

    def _add_agent(self, agent):
        """
        Adds an agent to the agency, assigning a temporary ID if necessary.

        Parameters:
            agent (Agent): The agent to be added to the agency.

        Returns:
            int: The index of the added agent within the agency's agents list.

        This method adds an agent to the agency's list of agents. If the agent does not have an ID, it assigns a temporary unique ID. It checks for uniqueness of the agent's name before addition. The method returns the index of the agent in the agency's agents list, which is used for referencing the agent within the agency.
        """
        if not agent.id:
            # assign temp id
            agent.id = "temp_id_" + str(uuid.uuid4())
        if agent.id not in self._get_agent_ids():
            if agent.name in self._get_agent_names():
                raise Exception("Agent names must be unique.")
            self.agents.append(agent)
            return len(self.agents) - 1
        else:
            return self._get_agent_ids().index(agent.id)

    def _add_main_recipient(self, agent):
        """
        Adds an agent to the agency's list of main recipients.

        Parameters:
            agent (Agent): The agent to be added to the agency's list of main recipients.

        This method adds an agent to the agency's list of main recipients. These are agents that can be directly contacted by the user.
        """
        main_recipient_ids = [agent.id for agent in self.main_recipients]

        if agent.id not in main_recipient_ids:
            self.main_recipients.append(agent)

    def _read_instructions(self, path):
        """
        Reads shared instructions from a specified file and stores them in the agency.

        Parameters:
            path (str): The file path from which to read the shared instructions.

        This method opens the file located at the given path, reads its contents, and stores these contents in the 'shared_instructions' attribute of the agency. This is used to provide common guidelines or instructions to all agents within the agency.
        """
        path = path
        with open(path, 'r') as f:
            self.shared_instructions = f.read()

    def _create_special_tools(self):
        """
        Creates and assigns 'SendMessage' tools to each agent based on the agency's structure.

        This method iterates through the agents and threads in the agency, creating SendMessage tools for each agent. These tools enable agents to send messages to other agents as defined in the agency's structure. The SendMessage tools are tailored to the specific recipient agents that each agent can communicate with.

        No input parameters.

        No output parameters; this method modifies the agents' toolset internally.
        """
        for agent_name, threads in self.agents_and_threads.items():
            if agent_name == "main_thread":
                continue
            recipient_names = list(threads.keys())
            recipient_agents = self._get_agents_by_names(recipient_names)
            if len(recipient_agents) == 0:
                continue
            agent = self._get_agent_by_name(agent_name)
            agent.add_tool(self._create_send_message_tool(agent, recipient_agents))
            if self._thread_type == ThreadAsync:
                agent.add_tool(self._create_get_response_tool(agent, recipient_agents))

    def _create_send_message_tool(self, agent: Agent, recipient_agents: List[Agent]):
        """
        Builds a SendMessage tool class bound to one sending agent and its allowed recipients.

        Parameters:
            agent (Agent): The agent who will be sending messages.
            recipient_agents (List[Agent]): A list of recipient agents who can receive messages.

        Returns:
            SendMessage: A subclass of the agency's send_message_tool_class whose 'recipient'
            field is an enum of the recipient names, described by the recipients' descriptions.
            The caller agent and the agency's agents_and_threads mapping are attached to the
            class as '_caller_agent' and '_agents_and_threads'.
        """
        valid_names = []
        description_lines = []
        for target in recipient_agents:
            valid_names.append(target.name)
            # Recipients without a description are left out of the field description.
            if target.description:
                description_lines.append(target.name + ": " + target.description + "\n")

        recipients = Enum("recipient", dict(zip(valid_names, valid_names)))
        agent_descriptions = "".join(description_lines)

        class SendMessage(self.send_message_tool_class):
            recipient: recipients = Field(..., description=agent_descriptions)

            @field_validator('recipient')
            @classmethod
            def check_recipient(cls, value):
                if value.value in valid_names:
                    return value
                raise ValueError(f"Recipient {value} is not valid. Valid recipients are: {valid_names}")

        SendMessage._caller_agent = agent
        SendMessage._agents_and_threads = self.agents_and_threads

        return SendMessage

    def _create_get_response_tool(self, agent: Agent, recipient_agents: List[Agent]):
        """
        Creates a GetResponse tool to enable an agent to check the status of a task with a specified recipient agent.

        Parameters:
            agent (Agent): The agent who will be using the tool.
            recipient_agents (List[Agent]): The recipient agents whose status can be checked.

        Returns:
            GetResponse: A tool class whose 'recipient' field is an enum of the recipient names.
            Running the tool looks up the thread between the caller agent and the chosen
            recipient in agents_and_threads and returns the result of its check_status().
        """
        recipient_names = [agent.name for agent in recipient_agents]
        recipients = Enum("recipient", {name: name for name in recipient_names})

        outer_self = self

        class GetResponse(BaseTool):
            """This tool allows you to check the status of a task or get a response from a specified recipient agent, if the task has been completed. You must always use 'SendMessage' tool with the designated agent first."""
            recipient: recipients = Field(...,
                                          description=f"Recipient agent that you want to check the status of. Valid recipients are: {recipient_names}")

            # Declared as an explicit classmethod, matching the SendMessage validator.
            @field_validator('recipient')
            @classmethod
            def check_recipient(cls, value):
                if value.value not in recipient_names:
                    raise ValueError(f"Recipient {value} is not valid. Valid recipients are: {recipient_names}")
                return value

            def run(self):
                thread = outer_self.agents_and_threads[self._caller_agent.name][self.recipient.value]

                return thread.check_status()

        GetResponse._caller_agent = agent

        return GetResponse

    def _get_agent_by_name(self, agent_name):
        """
        Retrieves an agent from the agency based on the agent's name.

        Parameters:
            agent_name (str): The name of the agent to be retrieved.

        Returns:
            Agent: The agent object with the specified name.

        Raises:
            Exception: If no agent with the given name is found in the agency.
        """
        for agent in self.agents:
            if agent.name == agent_name:
                return agent
        raise Exception(f"Agent {agent_name} not found.")

    def _get_agents_by_names(self, agent_names):
        """
        Retrieves a list of agent objects based on their names.

        Parameters:
            agent_names: A list of strings representing the names of the agents to be retrieved.

        Returns:
            A list of Agent objects corresponding to the given names.
        """
        return [self._get_agent_by_name(agent_name) for agent_name in agent_names]

    def _get_agent_ids(self):
        """
        Retrieves the IDs of all agents currently in the agency.

        Returns:
            List[str]: A list containing the unique IDs of all agents.
        """
        return [agent.id for agent in self.agents]

    def _get_agent_names(self):
        """
        Retrieves the names of all agents in the agency.

        Returns:
            List[str]: A list of names of all agents currently part of the agency.
        """
        return [agent.name for agent in self.agents]

    def _get_class_folder_path(self):
        """
        Retrieves the absolute path of the directory containing the class file.

        Returns:
            str: The absolute path of the directory where the class file is located.
        """
        return os.path.abspath(os.path.dirname(inspect.getfile(self.__class__)))

    def delete(self):
        """
        Deletes the agency by calling delete() on each of its agents in turn.
        """
        roster = self.agents
        for member in roster:
            member.delete()

__init__(agency_chart, shared_instructions='', shared_files=None, async_mode=None, send_message_tool_class=SendMessage, settings_path='./settings.json', settings_callbacks=None, threads_callbacks=None, temperature=0.3, top_p=1.0, max_prompt_tokens=None, max_completion_tokens=None, truncation_strategy=None)

Initializes the Agency object, setting up agents, threads, and core functionalities.

Parameters:

Name Type Description Default
agency_chart List

The structure defining the hierarchy and interaction of agents within the agency.

required
shared_instructions str

A path to a file containing shared instructions for all agents. Defaults to an empty string.

''
shared_files Union[str, List[str]]

A path to a folder or a list of folders containing shared files for all agents. Defaults to None.

None
async_mode str

Specifies the mode for asynchronous processing. In "threading" mode, all sub-agents run in separate threads. In "tools_threading" mode, all tools run in separate threads, but agents do not. Defaults to None.

None
send_message_tool_class Type[SendMessageBase]

The class to use for the send_message tool. For async communication, use SendMessageAsyncThreading. Defaults to SendMessage.

SendMessage
settings_path str

The path to the settings file for the agency. Must be json. If file does not exist, it will be created. Defaults to './settings.json'.

'./settings.json'
settings_callbacks SettingsCallbacks

A dictionary containing functions to load and save settings for the agency. The keys must be "load" and "save". Both values must be defined. Defaults to None.

None
threads_callbacks ThreadsCallbacks

A dictionary containing functions to load and save threads for the agency. The keys must be "load" and "save". Both values must be defined. Defaults to None.

None
temperature float

The temperature value to use for the agents. Agent-specific values will override this. Defaults to 0.3.

0.3
top_p float

The top_p value to use for the agents. Agent-specific values will override this. Defaults to 1.0.

1.0
max_prompt_tokens int

The maximum number of tokens allowed in the prompt for each agent. Agent-specific values will override this. Defaults to None.

None
max_completion_tokens int

The maximum number of tokens allowed in the completion for each agent. Agent-specific values will override this. Defaults to None.

None
truncation_strategy dict

The truncation strategy to use for the completion for each agent. Agent-specific values will override this. Defaults to None.

None

This constructor initializes various components of the Agency, including CEO, agents, threads, and user interactions. It parses the agency chart to set up the organizational structure and initializes the messaging tools, agents, and threads necessary for the operation of the agency. Additionally, it prepares a main thread for user interactions.

Source code in agency_swarm/agency/agency.py
def __init__(self,
             agency_chart: List,
             shared_instructions: str = "",
             shared_files: Union[str, List[str]] = None,
             async_mode: Literal['threading', "tools_threading"] = None,
             send_message_tool_class: Type[SendMessageBase] = SendMessage,
             settings_path: str = "./settings.json",
             settings_callbacks: SettingsCallbacks = None,
             threads_callbacks: ThreadsCallbacks = None,
             temperature: float = 0.3,
             top_p: float = 1.0,
             max_prompt_tokens: int = None,
             max_completion_tokens: int = None,
             truncation_strategy: dict = None,
             ):
    """
    Initializes the Agency object, setting up agents, threads, and core functionalities.

    Parameters:
        agency_chart: The structure defining the hierarchy and interaction of agents within the agency.
        shared_instructions (str, optional): A path to a file containing shared instructions for all agents. The path is tried relative to the folder of the class file first, then as given; if neither is an existing file, the value itself is used as the instructions text. Defaults to an empty string.
        shared_files (Union[str, List[str]], optional): A path to a folder or a list of folders containing shared files for all agents. Defaults to None.
        async_mode (str, optional): Deprecated. Specifies the mode for asynchronous processing. In "threading" mode, the send message tool class is replaced with `SendMessageAsyncThreading`. In "tools_threading" mode, all tools run in separate threads, but agents do not. Defaults to None.
        send_message_tool_class (Type[SendMessageBase], optional): The class to use for the send_message tool. For async communication, use `SendMessageAsyncThreading`. Defaults to SendMessage.
        settings_path (str, optional): The path to the settings file for the agency. Must be json. If file does not exist, it will be created. Defaults to "./settings.json".
        settings_callbacks (SettingsCallbacks, optional): A dictionary containing functions to load and save settings for the agency. The keys must be "load" and "save". Both values must be defined. Defaults to None.
        threads_callbacks (ThreadsCallbacks, optional): A dictionary containing functions to load and save threads for the agency. The keys must be "load" and "save". Both values must be defined. Defaults to None.
        temperature (float, optional): The temperature value to use for the agents. Agent-specific values will override this. Defaults to 0.3.
        top_p (float, optional): The top_p value to use for the agents. Agent-specific values will override this. Defaults to 1.0.
        max_prompt_tokens (int, optional): The maximum number of tokens allowed in the prompt for each agent. Agent-specific values will override this. Defaults to None.
        max_completion_tokens (int, optional): The maximum number of tokens allowed in the completion for each agent. Agent-specific values will override this. Defaults to None.
        truncation_strategy (dict, optional): The truncation strategy to use for the completion for each agent. Agent-specific values will override this. Defaults to None.

    Raises:
        Exception: If async_mode is not None, "threading" or "tools_threading".

    This constructor initializes various components of the Agency, including CEO, agents, threads, and user interactions. It parses the agency chart to set up the organizational structure and initializes the messaging tools, agents, and threads necessary for the operation of the agency. Additionally, it prepares a main thread for user interactions.
    """
    self.ceo = None
    self.user = User()
    self.agents = []
    self.agents_and_threads = {}
    self.main_recipients = []
    self.main_thread = None
    self.recipient_agents = None  # for autocomplete
    self.shared_files = shared_files if shared_files else []
    self.async_mode = async_mode
    self.send_message_tool_class = send_message_tool_class
    self.settings_path = settings_path
    self.settings_callbacks = settings_callbacks
    self.threads_callbacks = threads_callbacks
    self.temperature = temperature
    self.top_p = top_p
    self.max_prompt_tokens = max_prompt_tokens
    self.max_completion_tokens = max_completion_tokens
    self.truncation_strategy = truncation_strategy

    if self.async_mode == "threading":
        from agency_swarm.tools.send_message import SendMessageAsyncThreading
        print("Warning: 'threading' mode is deprecated. Please use send_message_tool_class = SendMessageAsyncThreading to use async communication.")
        self.send_message_tool_class = SendMessageAsyncThreading
    elif self.async_mode == "tools_threading":
        Thread.async_mode = "tools_threading"
        print("Warning: 'tools_threading' mode is deprecated. Use tool.ToolConfig.async_mode = 'threading' instead.")
    elif self.async_mode is None:
        pass
    else:
        raise Exception("Please select async_mode = 'threading' or 'tools_threading'.")

    # Set the thread type from the async mode of the effective send message tool class. This
    # runs after the async_mode handling above so that a tool class swapped in by the
    # deprecated 'threading' mode is paired with the matching thread type.
    tool_config = self.send_message_tool_class.ToolConfig
    if getattr(tool_config, "async_mode", None):
        self._thread_type = ThreadAsync
    else:
        self._thread_type = Thread

    # Resolve shared instructions: class-folder-relative file, then plain path, then raw text.
    class_relative_path = os.path.join(self._get_class_folder_path(), shared_instructions)
    if os.path.isfile(class_relative_path):
        self._read_instructions(class_relative_path)
    elif os.path.isfile(shared_instructions):
        self._read_instructions(shared_instructions)
    else:
        self.shared_instructions = shared_instructions

    self.shared_state = SharedState()

    self._parse_agency_chart(agency_chart)
    self._init_threads()
    self._create_special_tools()
    self._init_agents()

delete()

This method deletes the agency and all its agents, cleaning up any files and vector stores associated with each agent.

Source code in agency_swarm/agency/agency.py
def delete(self):
    """
    Delete the agency by deleting every agent it contains.

    Walks `self.agents` in order and calls `delete()` on each one; the per-agent
    cleanup is whatever that agent's own `delete()` performs.
    """
    for member in self.agents:
        member.delete()

demo_gradio(height=450, dark_mode=True, **kwargs)

Launches a Gradio-based demo interface for the agency chatbot.

Parameters:

Name Type Description Default
height int

The height of the chatbot widget in the Gradio interface. Default is 450.

450
dark_mode bool

Flag to determine if the interface should be displayed in dark mode. Default is True.

True
**kwargs

Additional keyword arguments to be passed to the Gradio interface.

{}

This method sets up and runs a Gradio interface, allowing users to interact with the agency's chatbot. It includes a text input for the user's messages and a chatbot interface for displaying the conversation. The method handles user input and chatbot responses, updating the interface dynamically.

Source code in agency_swarm/agency/agency.py
def demo_gradio(self, height=450, dark_mode=True, **kwargs):
    """
    Launches a Gradio-based demo interface for the agency chatbot.

    Parameters:
        height (int, optional): The height of the chatbot widget in the Gradio interface. Default is 450.
        dark_mode (bool, optional): Flag to determine if the interface should be displayed in dark mode. Default is True.
        **kwargs: Additional keyword arguments forwarded to `demo.launch()`.

    Returns:
        The `gr.Blocks` object, after `launch()` has been called on it.

    Raises:
        Exception: If gradio is not installed.

    This method sets up and runs a Gradio interface, allowing users to interact with the agency's chatbot. It includes
    a recipient dropdown, a text input for the user's messages, a file upload widget and a chatbot panel. The
    completion runs in a background thread whose event handler pushes text chunks and control markers
    ("[new_message]", "[change_recipient_agent]", "[end]") onto a queue that the `bot` generator drains to update
    the interface incrementally.
    """

    try:
        import gradio as gr
    except ImportError as err:
        raise Exception("Please install gradio: pip install gradio") from err

    js = """function () {
      gradioURL = window.location.href
      if (!gradioURL.endsWith('?__theme={theme}')) {
        window.location.replace(gradioURL + '?__theme={theme}');
      }
    }"""
    js = js.replace("{theme}", "dark" if dark_mode else "light")

    # State shared between the Gradio callbacks below (mutated through `nonlocal`).
    attachments = []
    images = []
    message_file_names = None
    uploading_files = False
    recipient_agent_names = [agent.name for agent in self.main_recipients]
    recipient_agent = self.main_recipients[0]

    def dropdown_state(selected_name=None):
        """Build the dropdown update; choices are de-duplicated while keeping a stable order."""
        choices = list(dict.fromkeys([*recipient_agent_names, recipient_agent.name]))
        return gr.update(value=selected_name or recipient_agent.name, choices=choices)

    def agent_has_tool(agent, tool_cls):
        """Return True if agent.tools already holds tool_cls, either as a class or as an instance."""
        return any(
            isinstance(entry, tool_cls) or (inspect.isclass(entry) and issubclass(entry, tool_cls))
            for entry in agent.tools
        )

    def coerce_tool_call(raw_call):
        """Convert a dict-shaped tool call into its typed model; other objects pass through unchanged."""
        if not isinstance(raw_call, dict):
            return raw_call

        if "type" not in raw_call:
            raw_call["type"] = "function"

        if raw_call["type"] == "function":
            return FunctionToolCall(**raw_call)
        if raw_call["type"] == "code_interpreter":
            return CodeInterpreterToolCall(**raw_call)
        if raw_call["type"] == "file_search" or raw_call["type"] == "retrieval":
            return FileSearchToolCall(**raw_call)
        raise ValueError("Invalid tool call type: " + raw_call["type"])

    with gr.Blocks(js=js) as demo:
        chatbot_queue = queue.Queue()
        chatbot = gr.Chatbot(height=height)
        with gr.Row():
            with gr.Column(scale=9):
                dropdown = gr.Dropdown(label="Recipient Agent", choices=recipient_agent_names,
                                       value=recipient_agent.name)
                msg = gr.Textbox(label="Your Message", lines=4)
            with gr.Column(scale=1):
                file_upload = gr.Files(label="OpenAI Files", type="filepath")
        button = gr.Button(value="Send", variant="primary")

        def handle_dropdown_change(selected_option):
            nonlocal recipient_agent
            recipient_agent = self._get_agent_by_name(selected_option)

        def handle_file_upload(file_list):
            nonlocal attachments
            nonlocal message_file_names
            nonlocal uploading_files
            nonlocal images
            uploading_files = True
            # Every change event carries the full current selection, so start from a clean slate.
            # Images are reset too; otherwise they pile up across successive upload events.
            attachments = []
            message_file_names = []
            images = []

            if not file_list:
                uploading_files = False
                return "No files uploaded"

            try:
                for file_obj in file_list:
                    purpose = get_file_purpose(file_obj.name)

                    with open(file_obj.name, 'rb') as f:
                        # Upload the file to OpenAI
                        file = self.main_thread.client.files.create(
                            file=f,
                            purpose=purpose
                        )

                    if purpose == "vision":
                        images.append({
                            "type": "image_file",
                            "image_file": {"file_id": file.id}
                        })
                    else:
                        attachments.append({
                            "file_id": file.id,
                            "tools": get_tools(file.filename)
                        })

                    message_file_names.append(file.filename)
                    print(f"Uploaded file ID: {file.id}")
                return attachments
            except Exception as e:
                print(f"Error: {e}")
                return str(e)
            finally:
                uploading_files = False

        def add_missing_attachment_tools():
            """Make sure the recipient agent has the tools that the pending attachments require."""
            for attachment in attachments:
                for tool in attachment.get("tools", []):
                    if tool["type"] == "file_search":
                        if not agent_has_tool(recipient_agent, FileSearch):
                            # Add FileSearch tool if it does not exist
                            recipient_agent.tools.append(FileSearch)
                            recipient_agent.client.beta.assistants.update(recipient_agent.id, tools=recipient_agent.get_oai_tools())
                            print("Added FileSearch tool to recipient agent to analyze the file.")
                    elif tool["type"] == "code_interpreter":
                        if not agent_has_tool(recipient_agent, CodeInterpreter):
                            # Add CodeInterpreter tool if it does not exist
                            recipient_agent.tools.append(CodeInterpreter)
                            recipient_agent.client.beta.assistants.update(recipient_agent.id, tools=recipient_agent.get_oai_tools())
                            print("Added CodeInterpreter tool to recipient agent to analyze the file.")

        def user(user_message, history):
            """Echo the user's message into the chat history; blank messages leave everything unchanged."""
            if not user_message.strip():
                return user_message, history

            add_missing_attachment_tools()

            if history is None:
                history = []

            original_user_message = user_message

            # Append the user message with a placeholder for bot response
            if recipient_agent:
                user_message = f"👤 User 🗣️ @{recipient_agent.name}:\n" + user_message.strip()
            else:
                user_message = "👤 User:" + user_message.strip()

            if message_file_names:
                user_message += "\n\n📎 Files:\n" + "\n".join(message_file_names)

            return original_user_message, history + [[user_message, None]]

        class GradioEventHandler(AgencyEventHandler):
            message_output = None

            @classmethod
            def change_recipient_agent(cls, recipient_agent_name):
                # Two-part message: the marker, then the name that `bot` reads right after it.
                chatbot_queue.put("[change_recipient_agent]")
                chatbot_queue.put(recipient_agent_name)

            @override
            def on_message_created(self, message: Message) -> None:
                if message.role == "user":
                    full_content = ""
                    for content in message.content:
                        if content.type == "image_file":
                            full_content += f"🖼️ Image File: {content.image_file.file_id}\n"
                            continue

                        if content.type == "image_url":
                            full_content += f"\n{content.image_url.url}\n"
                            continue

                        if content.type == "text":
                            full_content += content.text.value + "\n"

                    self.message_output = MessageOutput("text", self.agent_name, self.recipient_agent_name,
                                                        full_content)

                else:
                    self.message_output = MessageOutput("text", self.recipient_agent_name, self.agent_name,
                                                        "")

                chatbot_queue.put("[new_message]")
                chatbot_queue.put(self.message_output.get_formatted_content())

            @override
            def on_text_delta(self, delta, snapshot):
                chatbot_queue.put(delta.value)

            @override
            def on_tool_call_created(self, tool_call: ToolCall):
                tool_call = coerce_tool_call(tool_call)

                # TODO: add support for code interpreter and retrieval tools
                if tool_call.type == "function":
                    chatbot_queue.put("[new_message]")
                    self.message_output = MessageOutput("function", self.recipient_agent_name, self.agent_name,
                                                        str(tool_call.function))
                    chatbot_queue.put(self.message_output.get_formatted_header() + "\n")

            @override
            def on_tool_call_done(self, snapshot: ToolCall):
                # Same coercion as on_tool_call_created, so "retrieval" is accepted here as well.
                snapshot = coerce_tool_call(snapshot)

                self.message_output = None

                # TODO: add support for code interpreter and retrieval tools
                if snapshot.type != "function":
                    return

                chatbot_queue.put(str(snapshot.function))

                if snapshot.function.name == "SendMessage":
                    try:
                        # The arguments are model-generated text, so they are parsed as JSON;
                        # eval() on them would execute arbitrary expressions.
                        args = json.loads(snapshot.function.arguments)
                        recipient = args["recipient"]
                        self.message_output = MessageOutput("text", self.recipient_agent_name, recipient,
                                                            args["message"])

                        chatbot_queue.put("[new_message]")
                        chatbot_queue.put(self.message_output.get_formatted_content())
                    except Exception:
                        # Best-effort preview of the forwarded message; malformed arguments just skip it.
                        pass

                self.message_output = None

            @override
            def on_run_step_done(self, run_step: RunStep) -> None:
                if run_step.type == "tool_calls":
                    for tool_call in run_step.step_details.tool_calls:
                        if tool_call.type != "function":
                            continue

                        if tool_call.function.name == "SendMessage":
                            continue

                        self.message_output = None
                        chatbot_queue.put("[new_message]")

                        self.message_output = MessageOutput("function_output", tool_call.function.name,
                                                            self.recipient_agent_name,
                                                            tool_call.function.output)

                        chatbot_queue.put(self.message_output.get_formatted_header() + "\n")
                        chatbot_queue.put(tool_call.function.output)

            @override
            @classmethod
            def on_all_streams_end(cls):
                cls.message_output = None
                chatbot_queue.put("[end]")

        def run_completion(*completion_args):
            """Thread target: stream the completion and always unblock `bot` if streaming fails."""
            try:
                self.get_completion_stream(*completion_args)
            except Exception as exc:
                # Without an "[end]" marker the consumer loop in `bot` would wait forever.
                chatbot_queue.put("[new_message]")
                chatbot_queue.put(f"Error: {exc}")
                chatbot_queue.put("[end]")
                raise

        def bot(original_message, history):
            """Generator callback: send the message and stream queued output into the chat history."""
            nonlocal attachments
            nonlocal message_file_names
            nonlocal recipient_agent
            nonlocal images
            nonlocal uploading_files

            # `bot` is a generator, so a value passed to `return` would be discarded: yield, then stop.
            # Whitespace-only input is treated as empty, mirroring the guard in `user`.
            if not original_message or not original_message.strip():
                yield "", history, dropdown_state()
                return

            if uploading_files:
                history.append([None, "Uploading files... Please wait."])
                yield "", history, dropdown_state()
                return

            print("Message files: ", attachments)
            print("Images: ", images)

            if images:
                original_message = [
                    {
                        "type": "text",
                        "text": original_message,
                    },
                    *images
                ]

            completion_thread = threading.Thread(target=run_completion, args=(
                original_message, GradioEventHandler, [], recipient_agent, "", attachments, None))
            completion_thread.start()

            # The thread holds references to the objects above; rebinding here only resets the UI state.
            attachments = []
            message_file_names = []
            images = []
            uploading_files = False

            new_message = True
            while True:
                # Blocking get without a timeout: the loop ends only on the "[end]" marker.
                bot_message = chatbot_queue.get(block=True)

                if bot_message == "[end]":
                    completion_thread.join()
                    break

                if bot_message == "[new_message]":
                    new_message = True
                    continue

                if bot_message == "[change_recipient_agent]":
                    new_agent_name = chatbot_queue.get(block=True)
                    recipient_agent = self._get_agent_by_name(new_agent_name)
                    yield "", history, dropdown_state(new_agent_name)
                    continue

                if bot_message is None:
                    # Empty deltas / missing tool outputs carry no text to display.
                    continue

                if new_message:
                    history.append([None, bot_message])
                    new_message = False
                else:
                    history[-1][1] += bot_message

                yield "", history, dropdown_state()

        # `bot` takes exactly (message, history), so both triggers pass the same two inputs.
        button.click(
            user,
            inputs=[msg, chatbot],
            outputs=[msg, chatbot]
        ).then(
            bot, [msg, chatbot], [msg, chatbot, dropdown]
        )
        dropdown.change(handle_dropdown_change, dropdown)
        file_upload.change(handle_file_upload, file_upload)
        msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
            bot, [msg, chatbot], [msg, chatbot, dropdown]
        )

        # Enable queuing for streaming intermediate outputs
        demo.queue(default_concurrency_limit=10)

    # Launch the demo
    demo.launch(**kwargs)
    return demo

get_completion(message, message_files=None, yield_messages=False, recipient_agent=None, additional_instructions=None, attachments=None, tool_choice=None, verbose=False, response_format=None)

Retrieves the completion for a given message from the main thread.

Parameters:

Name Type Description Default
message str

The message for which completion is to be retrieved.

required
message_files list

A list of file ids to be sent as attachments with the message. When using this parameter, files will be assigned both to file_search and code_interpreter tools if available. It is recommended to assign files to the most suitable tool manually, using the attachments parameter. Defaults to None.

None
yield_messages bool

Flag to determine if intermediate messages should be yielded. Defaults to False.

False
recipient_agent Agent

The agent to which the message should be sent. Defaults to the first agent in the agency chart.

None
additional_instructions str

Additional instructions to be sent with the message. Defaults to None.

None
attachments List[dict]

A list of attachments to be sent with the message, following openai format. Defaults to None.

None
tool_choice dict

The tool choice for the recipient agent to use. Defaults to None.

None
parallel_tool_calls bool

Whether to enable parallel function calling during tool use. Defaults to True.

required
verbose bool

Whether to print the intermediary messages in console. Defaults to False.

False
response_format dict

The response format to use for the completion.

None

Returns:

Type Description

Generator or final response: Depending on the 'yield_messages' flag, this method returns either a generator yielding intermediate messages or the final response from the main thread.

Source code in agency_swarm/agency/agency.py
def get_completion(self, message: str,
                   message_files: List[str] = None,
                   yield_messages: bool = False,
                   recipient_agent: Agent = None,
                   additional_instructions: str = None,
                   attachments: List[dict] = None,
                   tool_choice: dict = None,
                   verbose: bool = False,
                   response_format: dict = None):
    """
    Run a completion for `message` on the agency's main thread.

    Parameters:
        message (str): The message for which completion is to be retrieved.
        message_files (list, optional): File ids to send with the message; forwarded to the main thread unchanged. Defaults to None.
        yield_messages (bool, optional): When True, the generator produced by the main thread is returned so the caller can iterate the intermediate messages. Defaults to False.
        recipient_agent (Agent, optional): The agent to which the message should be sent. Defaults to None.
        additional_instructions (str, optional): Additional instructions to be sent with the message. Defaults to None.
        attachments (List[dict], optional): Attachments to be sent with the message. Defaults to None.
        tool_choice (dict, optional): The tool choice for the recipient agent to use. Defaults to None.
        verbose (bool, optional): When True, each intermediate message is printed with `cprint()`. Cannot be combined with yield_messages=True. Defaults to False.
        response_format (dict, optional): The response format to use for the completion. Defaults to None.

    Returns:
        The generator from the main thread when `yield_messages` is set; otherwise the generator's
        final return value, obtained by running it to completion.

    Raises:
        Exception: If both `verbose` and `yield_messages` are set.
    """
    if yield_messages and verbose:
        raise Exception("Verbose mode is not compatible with yield_messages=True")

    completion_kwargs = dict(
        message=message,
        message_files=message_files,
        attachments=attachments,
        recipient_agent=recipient_agent,
        additional_instructions=additional_instructions,
        tool_choice=tool_choice,
        # Verbose printing needs the intermediate messages, so request them in that case too.
        yield_messages=yield_messages or verbose,
        response_format=response_format,
    )
    stream = self.main_thread.get_completion(**completion_kwargs)

    # Caller wants to drive the generator itself.
    if yield_messages and not verbose:
        return stream

    # Otherwise exhaust the generator here; its return value travels on StopIteration.
    while True:
        try:
            update = next(stream)
            if verbose:
                update.cprint()
        except StopIteration as finished:
            return finished.value

get_completion_parse(message, response_format, message_files=None, recipient_agent=None, additional_instructions=None, attachments=None, tool_choice=None, verbose=False)

Retrieves the completion for a given message from the main thread and parses the response using the provided pydantic model.

Parameters:

Name Type Description Default
message str

The message for which completion is to be retrieved.

required
response_format type(BaseModel)

The response format to use for the completion.

required
message_files list

A list of file ids to be sent as attachments with the message. When using this parameter, files will be assigned both to file_search and code_interpreter tools if available. It is recommended to assign files to the most suitable tool manually, using the attachments parameter. Defaults to None.

None
recipient_agent Agent

The agent to which the message should be sent. Defaults to the first agent in the agency chart.

None
additional_instructions str

Additional instructions to be sent with the message. Defaults to None.

None
attachments List[dict]

A list of attachments to be sent with the message, following openai format. Defaults to None.

None
tool_choice dict

The tool choice for the recipient agent to use. Defaults to None.

None
verbose bool

Whether to print the intermediary messages in console. Defaults to False.

False

Returns:

Type Description
T

Final response: The final response from the main thread, parsed using the provided pydantic model.

Source code in agency_swarm/agency/agency.py
def get_completion_parse(self, message: str,
                         response_format: Type[T],
                         message_files: List[str] = None,
                         recipient_agent: Agent = None,
                         additional_instructions: str = None,
                         attachments: List[dict] = None,
                         tool_choice: dict = None,
                         verbose: bool = False) -> T:
    """
    Retrieves the completion for a given message from the main thread and parses the response using the provided pydantic model.

    Parameters:
        message (str): The message for which completion is to be retrieved.
        response_format (type(BaseModel)): The model class used both to build the response format and to validate the response.
        message_files (list, optional): A list of file ids to be sent as attachments with the message. When using this parameter, files will be assigned both to file_search and code_interpreter tools if available. It is recommended to assign files to the most suitable tool manually, using the attachments parameter.  Defaults to None.
        recipient_agent (Agent, optional): The agent to which the message should be sent. Defaults to the first agent in the agency chart.
        additional_instructions (str, optional): Additional instructions to be sent with the message. Defaults to None.
        attachments (List[dict], optional): A list of attachments to be sent with the message, following openai format. Defaults to None.
        tool_choice (dict, optional): The tool choice for the recipient agent to use. Defaults to None.
        verbose (bool, optional): Whether to print the intermediary messages in console. Defaults to False.

    Returns:
        Final response: The final response from the main thread, parsed using the provided pydantic model.

    Raises:
        TypeError: If `response_format` is not a class (checked before any request is made).
        RefusalError: If the response is a JSON object containing a 'refusal' key.
        Exception: If the response cannot be validated against the model for any other reason.
    """
    # A non-class value could never be validated below; fail before spending a completion on it.
    if not isinstance(response_format, type):
        raise TypeError(
            "response_format must be a pydantic model class, got " + type(response_format).__name__
        )

    response_model = response_format

    res = self.get_completion(message=message,
                              message_files=message_files,
                              recipient_agent=recipient_agent,
                              additional_instructions=additional_instructions,
                              attachments=attachments,
                              tool_choice=tool_choice,
                              response_format=type_to_response_format_param(response_model),
                              verbose=verbose)

    try:
        return response_model.model_validate_json(res)
    except Exception as validation_error:
        # Validation failed: find out whether the model refused instead of answering.
        try:
            parsed_res = json.loads(res)
        except (TypeError, ValueError):
            # Not JSON at all; report the raw response below rather than a decode error.
            parsed_res = None

        if isinstance(parsed_res, dict) and 'refusal' in parsed_res:
            raise RefusalError(parsed_res['refusal']) from validation_error
        raise Exception(f"Failed to parse response: {res}") from validation_error

get_completion_stream(message, event_handler, message_files=None, recipient_agent=None, additional_instructions=None, attachments=None, tool_choice=None, response_format=None)

Generates a stream of completions for a given message from the main thread.

Parameters:

Name Type Description Default
message str

The message for which completion is to be retrieved.

required
event_handler type(AgencyEventHandler)

The event handler class to handle the completion stream. https://github.com/openai/openai-python/blob/main/helpers.md

required
message_files list

A list of file ids to be sent as attachments with the message. When using this parameter, files will be assigned both to file_search and code_interpreter tools if available. It is recommended to assign files to the most suitable tool manually, using the attachments parameter. Defaults to None.

None
recipient_agent Agent

The agent to which the message should be sent. Defaults to the first agent in the agency chart.

None
additional_instructions str

Additional instructions to be sent with the message. Defaults to None.

None
attachments List[dict]

A list of attachments to be sent with the message, following openai format. Defaults to None.

None
tool_choice dict

The tool choice for the recipient agent to use. Defaults to None.

None
parallel_tool_calls bool

Whether to enable parallel function calling during tool use. Defaults to True.

required

Returns:

Type Description

Final response: Final response from the main thread.

Source code in agency_swarm/agency/agency.py
def get_completion_stream(self,
                          message: str,
                          event_handler: type(AgencyEventHandler),
                          message_files: List[str] = None,
                          recipient_agent: Agent = None,
                          additional_instructions: str = None,
                          attachments: List[dict] = None,
                          tool_choice: dict = None,
                          response_format: dict = None):
    """
    Stream a completion for `message` on the main thread, reporting progress through an event handler class.

    Parameters:
        message (str): The message for which completion is to be retrieved.
        event_handler (type(AgencyEventHandler)): The event handler class (not an instance) that receives the stream events. https://github.com/openai/openai-python/blob/main/helpers.md
        message_files (list, optional): File ids to send with the message; forwarded to the main thread unchanged. Defaults to None.
        recipient_agent (Agent, optional): The agent to which the message should be sent. Defaults to None.
        additional_instructions (str, optional): Additional instructions to be sent with the message. Defaults to None.
        attachments (List[dict], optional): Attachments to be sent with the message. Defaults to None.
        tool_choice (dict, optional): The tool choice for the recipient agent to use. Defaults to None.
        response_format (dict, optional): The response format to use for the completion. Defaults to None.

    Returns:
        The return value of the main thread's streaming generator once it is exhausted.

    Raises:
        Exception: If `event_handler` is not a class.
    """
    if not inspect.isclass(event_handler):
        raise Exception("Event handler must not be an instance.")

    stream = self.main_thread.get_completion_stream(
        message=message,
        message_files=message_files,
        event_handler=event_handler,
        attachments=attachments,
        recipient_agent=recipient_agent,
        additional_instructions=additional_instructions,
        tool_choice=tool_choice,
        response_format=response_format,
    )

    # Drive the generator to completion; the handler class receives the events along the way.
    try:
        while True:
            next(stream)
    except StopIteration as finished:
        # Signal the handler class that no further stream events will arrive.
        event_handler.on_all_streams_end()

        return finished.value

get_customgpt_schema(url)

Returns the OpenAPI schema for the agency from the CEO agent, that you can use to integrate with custom gpts.

Parameters:

Name Type Description Default
url str

Your server url where the api will be hosted.

required
Source code in agency_swarm/agency/agency.py
def get_customgpt_schema(self, url: str):
    """Return the OpenAPI schema produced by the CEO agent for the given server url.

    Parameters:
        url (str): Your server url where the api will be hosted.

    Returns:
        Whatever `self.ceo.get_openapi_schema(url)` returns.
    """
    ceo_agent = self.ceo
    return ceo_agent.get_openapi_schema(url)

run_demo()

Executes agency in the terminal with autocomplete for recipient agent names.

Source code in agency_swarm/agency/agency.py
def run_demo(self):
    """
    Executes agency in the terminal with autocomplete for recipient agent names.

    Reads user input in a loop until the user types "exit". A message can be
    addressed to a specific agent by including "@AgentName" in the text; the name
    is matched case-insensitively against the names in ``self.main_recipients``.
    Responses are streamed through a local ``AgencyEventHandler`` subclass that
    renders messages and tool calls with ``MessageOutputLive``.
    """
    import json  # local import: decodes the JSON arguments of SendMessage calls

    outer_self = self
    from agency_swarm import AgencyEventHandler

    def to_tool_call(raw):
        """Convert a raw dict payload into the matching typed tool-call object."""
        if not isinstance(raw, dict):
            return raw

        # Payloads without an explicit type are treated as function calls.
        raw.setdefault("type", "function")
        call_type = raw["type"]

        if call_type == "function":
            return FunctionToolCall(**raw)
        if call_type == "code_interpreter":
            return CodeInterpreterToolCall(**raw)
        if call_type in ("file_search", "retrieval"):
            return FileSearchToolCall(**raw)
        raise ValueError("Invalid tool call type: " + call_type)

    class TermEventHandler(AgencyEventHandler):
        # Live output widget for the message / tool call currently being streamed.
        message_output = None

        @override
        def on_message_created(self, message: Message) -> None:
            if message.role == "user":
                self.message_output = MessageOutputLive("text", self.agent_name, self.recipient_agent_name,
                                                        "")
                self.message_output.cprint_update(message.content[0].text.value)
            else:
                self.message_output = MessageOutputLive("text", self.recipient_agent_name, self.agent_name, "")

        @override
        def on_message_done(self, message: Message) -> None:
            self.message_output = None

        @override
        def on_text_delta(self, delta, snapshot):
            self.message_output.cprint_update(snapshot.value)

        @override
        def on_tool_call_created(self, tool_call):
            tool_call = to_tool_call(tool_call)

            # TODO: add support for code interpreter and retrieval tools

            if tool_call.type == "function":
                self.message_output = MessageOutputLive("function", self.recipient_agent_name, self.agent_name,
                                                        str(tool_call.function))

        @override
        def on_tool_call_delta(self, delta, snapshot):
            snapshot = to_tool_call(snapshot)

            # Only function calls get a live output in on_tool_call_created, and
            # only they carry a `.function` attribute; ignore everything else.
            if snapshot.type != "function" or self.message_output is None:
                return

            self.message_output.cprint_update(str(snapshot.function))

        @override
        def on_tool_call_done(self, snapshot):
            self.message_output = None

            # TODO: add support for code interpreter and retrieval tools
            if snapshot.type != "function":
                return

            tool_config = outer_self.send_message_tool_class.ToolConfig
            output_as_result = getattr(tool_config, 'output_as_result', False)

            if snapshot.function.name == "SendMessage" and not output_as_result:
                try:
                    # Arguments arrive as a JSON string; json.loads handles
                    # true/false/null and never executes the payload.
                    args = json.loads(snapshot.function.arguments)
                    recipient = args["recipient"]
                    self.message_output = MessageOutputLive("text", self.recipient_agent_name, recipient,
                                                            "")

                    self.message_output.cprint_update(args["message"])
                except Exception:
                    # Best effort: failing to render a forwarded message must
                    # not interrupt the demo session.
                    pass

            self.message_output = None

        @override
        def on_run_step_done(self, run_step: RunStep) -> None:
            if run_step.type == "tool_calls":
                for tool_call in run_step.step_details.tool_calls:
                    if tool_call.type != "function":
                        continue

                    # SendMessage output is rendered in on_tool_call_done.
                    if tool_call.function.name == "SendMessage":
                        continue

                    self.message_output = MessageOutputLive("function_output", tool_call.function.name,
                                                            self.recipient_agent_name, tool_call.function.output)
                    self.message_output.cprint_update(tool_call.function.output)

                self.message_output = None

        @override
        def on_end(self):
            self.message_output = None

    self.recipient_agents = [str(agent.name) for agent in self.main_recipients]

    self._setup_autocomplete()  # Prepare readline for autocomplete

    while True:
        console.rule()
        text = input("👤 USER: ")

        if not text:
            continue

        if text.lower() == "exit":
            break

        recipient_agent = None
        if "@" in text:
            # The mention is the token that follows the first "@".
            recipient_agent = text.split("@")[1].split(" ")[0]
            text = text.replace(f"@{recipient_agent}", "").strip()
            try:
                recipient_agent = \
                    [agent for agent in self.recipient_agents if agent.lower() == recipient_agent.lower()][0]
                recipient_agent = self._get_agent_by_name(recipient_agent)
            except Exception:
                print(f"Recipient agent {recipient_agent} not found.")
                continue

        self.get_completion_stream(message=text, event_handler=TermEventHandler, recipient_agent=recipient_agent)

ToolFactory

Source code in agency_swarm/tools/ToolFactory.py
class ToolFactory:
    """Factory helpers that build BaseTool classes from other tool descriptions.

    Supported sources are langchain tools, OpenAI function schemas, OpenAPI
    schemas and Python files. ``get_openapi_schema`` performs the reverse
    conversion from BaseTools to an OpenAPI document.
    """

    @staticmethod
    def from_langchain_tools(tools: List) -> List[Type[BaseTool]]:
        """
        Converts a list of langchain tools into a list of BaseTools.

        Parameters:
            tools: The langchain tools to convert.

        Returns:
            A list of BaseTools.
        """
        return [ToolFactory.from_langchain_tool(tool) for tool in tools]

    @staticmethod
    def from_langchain_tool(tool) -> Type[BaseTool]:
        """
        Converts a langchain tool into a BaseTool.

        Parameters:
            tool: The langchain tool to convert. May be a tool class (it is then
                instantiated without arguments) or a tool instance.

        Returns:
            A BaseTool.

        Raises:
            ImportError: If langchain is not installed.
        """
        try:
            from langchain.tools import format_tool_to_openai_function
        except ImportError as err:
            raise ImportError("You must install langchain to use this method.") from err

        if inspect.isclass(tool):
            tool = tool()

        def callback(self):
            tool_input = self.model_dump()
            try:
                return tool.run(tool_input)
            except TypeError:
                # Retry with the bare value when the tool has exactly one field.
                if len(tool_input) == 1:
                    return tool.run(list(tool_input.values())[0])
                else:
                    raise TypeError(f"Error parsing input for tool '{tool.__class__.__name__}' Please open an issue "
                                    f"on github.")

        return ToolFactory.from_openai_schema(
            format_tool_to_openai_function(tool),
            callback
        )

    @staticmethod
    def from_openai_schema(schema: Dict[str, Any], callback: Any) -> Type[BaseTool]:
        """
        Converts an OpenAI schema into a BaseTool.

        Parameters:
            schema: The OpenAI schema to convert. The 'name' and 'parameters' keys
                are read unconditionally; 'description' and 'strict' are optional.
            callback: The function to run when the tool is called.

        Returns:
            A BaseTool.

        Raises:
            ValueError: If the generated code does not define a 'Model' class.
        """
        data_model_types = get_data_model_types(
            DataModelType.PydanticV2BaseModel,
            target_python_version=PythonVersion.PY_37
        )

        parser = JsonSchemaParser(
            json.dumps(schema['parameters']),
            data_model_type=data_model_types.data_model,
            data_model_root_type=data_model_types.root_model,
            data_model_field_type=data_model_types.field_model,
            data_type_manager_type=data_model_types.data_type_manager,
            dump_resolve_reference_action=data_model_types.dump_resolve_reference_action,
            use_schema_description=True,
            validation=False,
            class_name='Model',
        )

        result = parser.parse()

        # Execute the generated source in a throwaway namespace to extract the model
        exec_globals = {}
        exec(result, exec_globals)
        model = exec_globals.get('Model')

        if not model:
            raise ValueError(f"Could not extract model from schema {schema['name']}")

        class ToolConfig:
            strict: bool = schema.get("strict", False)

        tool = type(schema['name'], (BaseTool, model), {
            "__doc__": schema.get('description', ""),
            "run": callback,
        })

        tool.ToolConfig = ToolConfig

        return tool

    @staticmethod
    def from_openapi_schema(schema: Union[str, dict], headers: Dict[str, str] = None, params: Dict[str, Any] = None, strict: bool = False) \
            -> List[Type[BaseTool]]:
        """
        Converts an OpenAPI schema into a list of BaseTools.

        One tool is created per operation (path + HTTP method). Running a tool
        sends the request to the first server listed in the schema.

        Parameters:
            schema: The OpenAPI schema to convert.
            headers: The headers to use for requests. Entries whose value is None are dropped.
            params: The parameters to use for requests. They are merged over the
                query parameters supplied by the tool call.
            strict: Whether to use strict OpenAI mode.
        Returns:
            A list of BaseTools.
        """
        if isinstance(schema, dict):
            openapi_spec = jsonref.replace_refs(schema)
        else:
            openapi_spec = jsonref.loads(schema)
        tools = []
        headers = {k: v for k, v in (headers or {}).items() if v is not None}

        # Keys of a path item that describe an operation; other keys
        # ("parameters", "summary", ...) are not endpoints.
        http_methods = {"get", "post", "put", "delete", "patch", "head", "options", "trace"}

        def make_callback(path, verb):
            # Bind path/verb per operation: a closure over the loop variables
            # would make every tool call the last endpoint of the schema.
            async def callback(self):
                payload = self.model_dump()
                parameters = payload.get('parameters') or {}
                url = openapi_spec["servers"][0]["url"] + path
                query = {}
                # Path parameters are substituted into the url; the remaining
                # non-None parameters are sent as query parameters.
                for name, value in parameters.items():
                    placeholder = "{" + str(name) + "}"
                    if placeholder in url:
                        url = url.replace(placeholder, str(value))
                    elif value is not None:
                        query[name] = value
                url = url.rstrip("/")
                if params:
                    query.update(params)
                body = None if verb == "get" else payload.get('requestBody')
                async with httpx.AsyncClient(timeout=90) as client:  # 90 second timeout
                    # client.request is used because client.delete() does not
                    # accept a json body.
                    response = await client.request(verb.upper(), url,
                                                    params=query,
                                                    json=body,
                                                    headers=headers)
                    return response.json()

            return callback

        for path, methods in openapi_spec["paths"].items():
            for method, spec_with_ref in methods.items():
                verb = method.lower()
                if verb not in http_methods:
                    continue

                # 1. Resolve JSON references.
                spec = jsonref.replace_refs(spec_with_ref)

                # 2. Extract a name for the functions.
                function_name = spec.get("operationId")

                # 3. Extract a description and parameters.
                desc = spec.get("description") or spec.get("summary", "")

                parameters_schema = {"type": "object", "properties": {}}

                req_body = (
                    spec.get("requestBody", {})
                    .get("content", {})
                    .get("application/json", {})
                    .get("schema")
                )
                if req_body:
                    parameters_schema["properties"]["requestBody"] = req_body

                spec_params = spec.get("parameters", [])
                if spec_params:
                    param_properties = {}
                    required_params = []
                    for param in spec_params:
                        # Parameters that declare a bare "type" get it wrapped in a schema.
                        if "schema" not in param and "type" in param:
                            param["schema"] = {"type": param["type"]}
                        param_properties[param["name"]] = param["schema"]
                        if "description" in param:
                            param_properties[param["name"]]["description"] = param["description"]
                        if "required" in param and param["required"]:
                            required_params.append(param["name"])
                        if "example" in param:
                            param_properties[param["name"]]["example"] = param["example"]
                        if "examples" in param:
                            param_properties[param["name"]]["examples"] = param["examples"]

                    parameters_schema["properties"]["parameters"] = {
                        "type": "object",
                        "properties": param_properties,
                        "required": required_params
                    }

                function = {
                    "name": function_name,
                    "description": desc,
                    "parameters": parameters_schema,
                    "strict": strict
                }

                tools.append(ToolFactory.from_openai_schema(function, make_callback(path, verb)))

        return tools

    @staticmethod
    def from_file(file_path: str) -> Type[BaseTool]:
        """Dynamically imports a BaseTool class from a Python file within a package structure.

        The class must have the same name as the file (without extension).

        Parameters:
            file_path: The file path to the Python file containing the BaseTool class.

        Returns:
            The imported BaseTool class.

        Raises:
            ImportError: If the module or the class cannot be imported.
            TypeError: If the imported object is not a subclass of BaseTool.
        """
        import importlib  # local import: only needed by this method

        # Express the path relative to the current working directory so it can
        # be translated into a dotted module path.
        file_path = os.path.relpath(file_path)
        file_name = os.path.basename(file_path)
        import_path = os.path.splitext(file_path)[0].replace(os.sep, ".")
        class_name = os.path.splitext(file_name)[0]

        if "agency_swarm" in import_path:
            # importing from agency_swarm package
            import_path = import_path.lstrip(".")
        else:
            # importing from current working directory
            if import_path.startswith("."):
                raise ImportError(f"Could not import {class_name} from {import_path}")
            current_working_directory = os.getcwd()
            if current_working_directory not in sys.path:
                sys.path.append(current_working_directory)

        # import_module keeps the imported name out of this module's globals,
        # unlike exec("from ... import ...", globals()).
        module = importlib.import_module(import_path)
        imported_class = getattr(module, class_name, None)
        if imported_class is None:
            raise ImportError(f"Could not import {class_name} from {import_path}")

        # Check if the imported class is a subclass of BaseTool
        if not issubclass(imported_class, BaseTool):
            raise TypeError(f"Class {class_name} must be a subclass of BaseTool")

        return imported_class

    @staticmethod
    def get_openapi_schema(tools: List[Type[BaseTool]], url: str, title="Agent Tools",
                           description="A collection of tools.") -> str:
        """
        Generates an OpenAPI schema from a list of BaseTools.

        Parameters:
            tools: BaseTools to generate the schema from. Entries that are not
                BaseTool subclasses are skipped.
            url: The base URL for the schema.
            title: The title of the schema.
            description: The description of the schema.

        Returns:
            A JSON string representing the OpenAPI schema with all the tools combined as separate endpoints.
        """
        schema = {
            "openapi": "3.1.0",
            "info": {
                "title": title,
                "description": description,
                "version": "v1.0.0"
            },
            "servers": [
                {
                    "url": url,
                }
            ],
            "paths": {},
            "components": {
                "schemas": {},
                "securitySchemes": {
                    "apiKey": {
                        "type": "apiKey"
                    }
                }
            },
        }

        for tool in tools:
            if not issubclass(tool, BaseTool):
                continue

            openai_schema = tool.openai_schema
            # Nested definitions are moved to the shared components section.
            defs = openai_schema['parameters'].pop('$defs', {})

            schema['paths']["/" + openai_schema['name']] = {
                "post": {
                    "description": openai_schema['description'],
                    "operationId": openai_schema['name'],
                    "x-openai-isConsequential": False,
                    "parameters": [],
                    "requestBody": {
                        "content": {
                            "application/json": {
                                "schema": openai_schema['parameters']
                            }
                        }
                    },
                }
            }

            schema['components']['schemas'].update(defs)

        # Point references at the relocated definitions.
        schema = json.dumps(schema, indent=2).replace("#/$defs/", "#/components/schemas/")

        return schema

from_file(file_path) staticmethod

Dynamically imports a BaseTool class from a Python file within a package structure.

Parameters:

Name Type Description Default
file_path str

The file path to the Python file containing the BaseTool class.

required

Returns:

Type Description
Type[BaseTool]

The imported BaseTool class.

Source code in agency_swarm/tools/ToolFactory.py
@staticmethod
def from_file(file_path: str) -> Type[BaseTool]:
    """Dynamically imports a BaseTool class from a Python file within a package structure.

    The class must have the same name as the file (without extension).

    Parameters:
        file_path: The file path to the Python file containing the BaseTool class.

    Returns:
        The imported BaseTool class.

    Raises:
        ImportError: If the module or the class cannot be imported.
        TypeError: If the imported object is not a subclass of BaseTool.
    """
    import importlib  # local import: only needed by this method

    # Express the path relative to the current working directory so it can
    # be translated into a dotted module path.
    file_path = os.path.relpath(file_path)
    file_name = os.path.basename(file_path)
    import_path = os.path.splitext(file_path)[0].replace(os.sep, ".")
    class_name = os.path.splitext(file_name)[0]

    if "agency_swarm" in import_path:
        # importing from agency_swarm package
        import_path = import_path.lstrip(".")
    else:
        # importing from current working directory
        if import_path.startswith("."):
            raise ImportError(f"Could not import {class_name} from {import_path}")
        current_working_directory = os.getcwd()
        if current_working_directory not in sys.path:
            sys.path.append(current_working_directory)

    # import_module keeps the imported name out of this module's globals,
    # unlike exec("from ... import ...", globals()).
    module = importlib.import_module(import_path)
    imported_class = getattr(module, class_name, None)
    if imported_class is None:
        raise ImportError(f"Could not import {class_name} from {import_path}")

    # Check if the imported class is a subclass of BaseTool
    if not issubclass(imported_class, BaseTool):
        raise TypeError(f"Class {class_name} must be a subclass of BaseTool")

    return imported_class

from_langchain_tool(tool) staticmethod

Converts a langchain tool into a BaseTool.

Parameters:

Name Type Description Default
tool

The langchain tool to convert.

required

Returns:

Type Description
Type[BaseTool]

A BaseTool.

Source code in agency_swarm/tools/ToolFactory.py
@staticmethod
def from_langchain_tool(tool) -> Type[BaseTool]:
    """
    Wraps a single langchain tool as a BaseTool.

    Parameters:
        tool: The langchain tool to convert. A tool class is instantiated
            without arguments first; an instance is used as is.

    Returns:
        A BaseTool.

    Raises:
        ImportError: If langchain is not installed.
    """
    try:
        from langchain.tools import format_tool_to_openai_function
    except ImportError:
        raise ImportError("You must install langchain to use this method.")

    langchain_tool = tool() if inspect.isclass(tool) else tool

    def callback(self):
        payload = self.model_dump()
        try:
            return langchain_tool.run(payload)
        except TypeError:
            # Only a single-field payload can be retried as a bare value.
            if len(payload) != 1:
                raise TypeError(f"Error parsing input for tool '{langchain_tool.__class__.__name__}' Please open an issue "
                                f"on github.")
            only_value = next(iter(payload.values()))
            return langchain_tool.run(only_value)

    openai_schema = format_tool_to_openai_function(langchain_tool)
    return ToolFactory.from_openai_schema(openai_schema, callback)

from_langchain_tools(tools) staticmethod

Converts a list of langchain tools into a list of BaseTools.

Parameters:

Name Type Description Default
tools List

The langchain tools to convert.

required

Returns:

Type Description
List[Type[BaseTool]]

A list of BaseTools.

Source code in agency_swarm/tools/ToolFactory.py
@staticmethod
def from_langchain_tools(tools: List) -> List[Type[BaseTool]]:
    """
    Converts every langchain tool in a list into a BaseTool.

    Parameters:
        tools: The langchain tools to convert.

    Returns:
        A list of BaseTools, in the same order as the input.
    """
    return [ToolFactory.from_langchain_tool(langchain_tool) for langchain_tool in tools]

from_openai_schema(schema, callback) staticmethod

Converts an OpenAI schema into a BaseTool.

Parameters:

Name Type Description Default
schema Dict[str, Any]

The OpenAI schema to convert.

required
callback Any

The function to run when the tool is called.

required

Returns:

Type Description
Type[BaseTool]

A BaseTool.

Source code in agency_swarm/tools/ToolFactory.py
@staticmethod
def from_openai_schema(schema: Dict[str, Any], callback: Any) -> Type[BaseTool]:
    """
    Builds a BaseTool class from an OpenAI function schema.

    Parameters:
        schema: The OpenAI schema to convert. The 'name' and 'parameters' keys
            are read unconditionally; 'description' and 'strict' are optional.
        callback: The function to run when the tool is called.

    Returns:
        A BaseTool.

    Raises:
        ValueError: If the generated code does not define a 'Model' class.
    """
    model_types = get_data_model_types(
        DataModelType.PydanticV2BaseModel,
        target_python_version=PythonVersion.PY_37,
    )

    parameters_json = json.dumps(schema['parameters'])
    schema_parser = JsonSchemaParser(
        parameters_json,
        data_model_type=model_types.data_model,
        data_model_root_type=model_types.root_model,
        data_model_field_type=model_types.field_model,
        data_type_manager_type=model_types.data_type_manager,
        dump_resolve_reference_action=model_types.dump_resolve_reference_action,
        use_schema_description=True,
        validation=False,
        class_name='Model',
    )
    generated_source = schema_parser.parse()

    # Run the generated source in a throwaway namespace and pick up the model.
    namespace = {}
    exec(generated_source, namespace)
    model = namespace.get('Model')
    if not model:
        raise ValueError(f"Could not extract model from schema {schema['name']}")

    class ToolConfig:
        strict: bool = schema.get("strict", False)

    tool_attributes = {
        "__doc__": schema.get('description', ""),
        "run": callback,
    }
    tool = type(schema['name'], (BaseTool, model), tool_attributes)
    tool.ToolConfig = ToolConfig
    return tool

from_openapi_schema(schema, headers=None, params=None, strict=False) staticmethod

Converts an OpenAPI schema into a list of BaseTools.

Parameters:

Name Type Description Default
schema Union[str, dict]

The OpenAPI schema to convert.

required
headers Dict[str, str]

The headers to use for requests.

None
params Dict[str, Any]

The parameters to use for requests.

None
strict bool

Whether to use strict OpenAI mode.

False

Returns: A list of BaseTools.

Source code in agency_swarm/tools/ToolFactory.py
@staticmethod
def from_openapi_schema(schema: Union[str, dict], headers: Dict[str, str] = None, params: Dict[str, Any] = None, strict: bool = False) \
        -> List[Type[BaseTool]]:
    """
    Converts an OpenAPI schema into a list of BaseTools.

    One tool is created per operation (path + HTTP method). Running a tool
    sends the request to the first server listed in the schema.

    Parameters:
        schema: The OpenAPI schema to convert.
        headers: The headers to use for requests. Entries whose value is None are dropped.
        params: The parameters to use for requests. They are merged over the
            query parameters supplied by the tool call.
        strict: Whether to use strict OpenAI mode.
    Returns:
        A list of BaseTools.
    """
    if isinstance(schema, dict):
        openapi_spec = jsonref.replace_refs(schema)
    else:
        openapi_spec = jsonref.loads(schema)
    tools = []
    headers = {k: v for k, v in (headers or {}).items() if v is not None}

    # Keys of a path item that describe an operation; other keys
    # ("parameters", "summary", ...) are not endpoints.
    http_methods = {"get", "post", "put", "delete", "patch", "head", "options", "trace"}

    def make_callback(path, verb):
        # Bind path/verb per operation: a closure over the loop variables
        # would make every tool call the last endpoint of the schema.
        async def callback(self):
            payload = self.model_dump()
            parameters = payload.get('parameters') or {}
            url = openapi_spec["servers"][0]["url"] + path
            query = {}
            # Path parameters are substituted into the url; the remaining
            # non-None parameters are sent as query parameters.
            for name, value in parameters.items():
                placeholder = "{" + str(name) + "}"
                if placeholder in url:
                    url = url.replace(placeholder, str(value))
                elif value is not None:
                    query[name] = value
            url = url.rstrip("/")
            if params:
                query.update(params)
            body = None if verb == "get" else payload.get('requestBody')
            async with httpx.AsyncClient(timeout=90) as client:  # 90 second timeout
                # client.request is used because client.delete() does not
                # accept a json body.
                response = await client.request(verb.upper(), url,
                                                params=query,
                                                json=body,
                                                headers=headers)
                return response.json()

        return callback

    for path, methods in openapi_spec["paths"].items():
        for method, spec_with_ref in methods.items():
            verb = method.lower()
            if verb not in http_methods:
                continue

            # 1. Resolve JSON references.
            spec = jsonref.replace_refs(spec_with_ref)

            # 2. Extract a name for the functions.
            function_name = spec.get("operationId")

            # 3. Extract a description and parameters.
            desc = spec.get("description") or spec.get("summary", "")

            parameters_schema = {"type": "object", "properties": {}}

            req_body = (
                spec.get("requestBody", {})
                .get("content", {})
                .get("application/json", {})
                .get("schema")
            )
            if req_body:
                parameters_schema["properties"]["requestBody"] = req_body

            spec_params = spec.get("parameters", [])
            if spec_params:
                param_properties = {}
                required_params = []
                for param in spec_params:
                    # Parameters that declare a bare "type" get it wrapped in a schema.
                    if "schema" not in param and "type" in param:
                        param["schema"] = {"type": param["type"]}
                    param_properties[param["name"]] = param["schema"]
                    if "description" in param:
                        param_properties[param["name"]]["description"] = param["description"]
                    if "required" in param and param["required"]:
                        required_params.append(param["name"])
                    if "example" in param:
                        param_properties[param["name"]]["example"] = param["example"]
                    if "examples" in param:
                        param_properties[param["name"]]["examples"] = param["examples"]

                parameters_schema["properties"]["parameters"] = {
                    "type": "object",
                    "properties": param_properties,
                    "required": required_params
                }

            function = {
                "name": function_name,
                "description": desc,
                "parameters": parameters_schema,
                "strict": strict
            }

            tools.append(ToolFactory.from_openai_schema(function, make_callback(path, verb)))

    return tools

get_openapi_schema(tools, url, title='Agent Tools', description='A collection of tools.') staticmethod

Generates an OpenAPI schema from a list of BaseTools.

Parameters:

Name Type Description Default
tools List[Type[BaseTool]]

BaseTools to generate the schema from.

required
url str

The base URL for the schema.

required
title

The title of the schema.

'Agent Tools'
description

The description of the schema.

'A collection of tools.'

Returns:

Type Description
str

A JSON string representing the OpenAPI schema with all the tools combined as separate endpoints.

Source code in agency_swarm/tools/ToolFactory.py
@staticmethod
def get_openapi_schema(tools: List[Type[BaseTool]], url: str, title: str = "Agent Tools",
                       description: str = "A collection of tools.") -> str:
    """
    Generates an OpenAPI schema from a list of BaseTools.

    Every tool becomes a POST endpoint at "/<tool name>" whose JSON request body is the
    tool's parameter schema. Any "$defs" found in a tool's parameter schema are moved to
    "components/schemas", and "#/$defs/" references are rewritten to point there.

    Parameters:
        tools: BaseTools to generate the schema from. Entries that are not BaseTool
            subclasses (including non-class objects) are skipped.
        url: The base URL for the schema.
        title: The title of the schema.
        description: The description of the schema.

    Returns:
        A JSON string representing the OpenAPI schema with all the tools combined as separate endpoints.
    """
    schema = {
        "openapi": "3.1.0",
        "info": {
            "title": title,
            "description": description,
            "version": "v1.0.0"
        },
        "servers": [
            {
                "url": url,
            }
        ],
        "paths": {},
        "components": {
            "schemas": {},
            "securitySchemes": {
                "apiKey": {
                    "type": "apiKey"
                }
            }
        },
    }

    for tool in tools:
        # issubclass() raises TypeError for non-class arguments, so check for a class first;
        # anything that is not a BaseTool subclass is skipped rather than aborting the export.
        if not (isinstance(tool, type) and issubclass(tool, BaseTool)):
            continue

        openai_schema = tool.openai_schema
        endpoint_name = openai_schema['name']

        # Work on a shallow copy so that stripping "$defs" does not mutate the schema
        # object owned by the tool (it may be cached or reused by other callers).
        parameters = dict(openai_schema['parameters'])
        defs = parameters.pop('$defs', {})

        schema['paths']["/" + endpoint_name] = {
            "post": {
                "description": openai_schema['description'],
                "operationId": endpoint_name,
                "x-openai-isConsequential": False,
                "parameters": [],
                "requestBody": {
                    "content": {
                        "application/json": {
                            "schema": parameters
                        }
                    }
                },
            }
        }

        # Shared model definitions live under components/schemas in OpenAPI.
        schema['components']['schemas'].update(defs)

    # Definitions were relocated above, so retarget every "$ref" that pointed into "$defs".
    return json.dumps(schema, indent=2).replace("#/$defs/", "#/components/schemas/")