Skip to content

Main

genaipot

ensure_files_directory()

Ensure the existence of the 'files' directory.

Source code in bin/genaipot.py
56
57
58
59
def ensure_files_directory():
    """
    Ensure the existence of the 'files' directory.

    The directory is created relative to the current working directory.
    Creation is done in a single call with ``exist_ok=True`` so that a
    concurrent process creating the same directory between a check and the
    creation cannot make this function fail.

    Raises:
        FileExistsError: If 'files' already exists but is not a directory.
    """
    os.makedirs('files', exist_ok=True)

generate_responses(ai_service, prompt, response_type, description, debug_mode)

Helper function to query AI service and handle responses.

Parameters:

Name Type Description Default
ai_service object

The initialized AI service instance.

required
prompt str

The prompt to send to the AI service.

required
response_type str

The type of response (e.g., 'smtp', 'pop3').

required
description str

A description for logging and display purposes.

required
debug_mode bool

Whether to enable debug mode.

required
Source code in bin/genaipot.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def generate_responses(ai_service, prompt, response_type, description, debug_mode):
    """
    Helper function to query AI service and handle responses.

    The prompt is sent to the AI service, the raw answer is cleaned/parsed and
    the result is stored via ``ai_service.store_responses``. Progress is shown
    with a spinner. Failures are not propagated to the caller: the spinner is
    marked as failed and the cause is logged.

    Args:
        ai_service (object): The initialized AI service instance.
        prompt (str): The prompt to send to the AI service.
        response_type (str): The type of response (e.g., 'smtp', 'pop3').
        description (str): A description for logging and display purposes.
        debug_mode (bool): Whether to enable debug mode. When enabled, the
            full traceback is logged; otherwise only the error message is.
    """
    spinner = Halo(text=f'{description}: Generating responses...', spinner='dots')
    spinner.start()
    try:
        response_text = ai_service.query_responses(prompt, response_type)
        cleaned_response = ai_service.cleanup_and_parse_json(response_text)
        ai_service.store_responses(cleaned_response, response_type)
    except Exception as e:
        spinner.fail(f'Failed to generate {description} responses.')
        if debug_mode:
            logging.exception(f"Error generating {description} responses: {e}")
        else:
            # Always surface the cause; without this the user only sees
            # "Failed" and has no hint about what went wrong.
            logging.error(f"Error generating {description} responses: {e}")
    else:
        spinner.succeed(f'{description} responses generated successfully.')

initialize_ai_service(config, args)

Initialize the AI service based on the provider from the configuration.

Source code in bin/genaipot.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def initialize_ai_service(config, args):
    """
    Build the AI service selected by the ``[ai] provider`` config option.

    The provider defaults to 'offline' when the option is absent. API keys
    are taken from the environment first and from the configuration second.

    Args:
        config: Configuration object exposing ``get(section, option, fallback=...)``.
        args: Parsed command-line arguments; only ``args.debug`` is read.

    Returns:
        The provider-specific service instance, or None when running offline
        or when the selected provider's settings are incomplete.

    Raises:
        SystemExit: With exit code 1 when the provider name is not recognised.
    """
    provider = config.get('ai', 'provider', fallback='offline')

    if provider == 'offline':
        print("Using offline mode with pre-existing templates.")
        # Offline mode serves pre-existing templates, so there is no service.
        return None

    if provider == 'openai':
        openai_key = os.getenv('OPENAI_API_KEY') or config.get('openai', 'api_key', fallback=None)
        if openai_key:
            return OpenAIService(api_key=openai_key, debug_mode=args.debug)
        logging.error("No OpenAI API key found in environment variables or configuration.")
        return None

    if provider == 'azure':
        azure_key = os.getenv('AZURE_API_KEY') or config.get('azure', 'api_key', fallback=None)
        azure_endpoint = config.get('azure', 'endpoint', fallback=None)
        if azure_key and azure_endpoint:
            return AzureAIService(
                azure_openai_key=azure_key,
                azure_openai_endpoint=azure_endpoint,
                debug_mode=args.debug,
            )
        logging.error("No Azure API key or endpoint found in environment variables or configuration.")
        return None

    if provider == 'gcp':
        # NOTE(review): the API key is required here but is not passed on to
        # GCPService - confirm whether that is intentional.
        gcp_key = os.getenv('GCP_API_KEY') or config.get('gcp', 'api_key', fallback=None)
        gcp_settings = {
            'gcp_project': config.get('gcp', 'project', fallback=None),
            'gcp_location': config.get('gcp', 'location', fallback=None),
            'gcp_model_id': config.get('gcp', 'model_id', fallback=None),
        }
        if gcp_key and all(gcp_settings.values()):
            return GCPService(debug_mode=args.debug, **gcp_settings)
        logging.error("Incomplete GCP configuration.")
        return None

    print("Invalid AI provider specified in config. Exiting.")
    sys.exit(1)

initialize_logging(debug_mode)

Initialize logging configuration.

Parameters:

Name Type Description Default
debug_mode bool

Whether to enable debug mode.

required

Returns:

Name Type Description
logger

Configured logger instance.

Source code in bin/genaipot.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def initialize_logging(debug_mode):
    """
    Set up root logging and return this module's logger.

    Args:
        debug_mode (bool): When true the DEBUG level is used, otherwise INFO.

    Returns:
        logger: The module-level logger, set to the selected level.
    """
    level = logging.INFO
    if debug_mode:
        level = logging.DEBUG

    logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=level)

    module_logger = logging.getLogger(__name__)
    # Set the level explicitly as well, independent of the root configuration.
    module_logger.setLevel(level)
    return module_logger

main()

Main function to run the GenAIPot honeypot services.

Source code in bin/genaipot.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
def _start_listener(logger, service_name, factory, port, use_ipv6, ipv4_address, ipv6_address):
    """Bind one honeypot factory to its endpoint and report the actual bind outcome."""
    if use_ipv6:
        family, address, endpoint_cls = 'IPv6', ipv6_address, TCP6ServerEndpoint
    else:
        family, address, endpoint_cls = 'IPv4', ipv4_address, TCP4ServerEndpoint

    def report_failure(error):
        if use_ipv6:
            logger.warning(f"IPv6 binding for {service_name} failed: {error}")
            print(f"{service_name} IPv6 binding failed: {error}")
        else:
            logger.error(f"Failed to bind {service_name} to IPv4: {error}")
            print(f"Failed to start {service_name} on IPv4: {error}")

    def report_success(listening_port):
        logger.info(f"{service_name} honeypot started on {family}: {address}:{port}")
        print(f"Started {service_name} on {family}: {address}:{port}")
        return listening_port

    try:
        listening = endpoint_cls(reactor, port, interface=address).listen(factory)
    except Exception as e:
        # Errors raised while building the endpoint or calling listen().
        report_failure(e)
        return

    # listen() returns a Deferred: a bind failure (e.g. port already in use)
    # is delivered through the errback rather than raised, so success must
    # only be reported from the callback.
    listening.addCallbacks(report_success, lambda failure: report_failure(failure.value))


def main():
    """
    Main function to run the GenAIPot honeypot services.

    Parses the command line, prints the banner, loads the configuration and
    sets up the database. With ``--config`` or ``--docker`` the configuration
    wizard is run and the function returns. Otherwise the AI service is
    initialized and, when ``--smtp``, ``--pop3`` or ``--all`` is given, the
    selected honeypots are bound and the reactor is started. If no honeypot is
    selected the help text is printed.
    """
    # Initialize argument parser
    parser = argparse.ArgumentParser(description="GenAIPot - A honeypot simulation tool")
    parser.add_argument('--config', action='store_true', help='Configure the honeypot with AI-generated responses')
    parser.add_argument('--docker', action='store_true', help='Use default config for Docker deployment')
    parser.add_argument('--smtp', action='store_true', help='Start SMTP honeypot')
    parser.add_argument('--pop3', action='store_true', help='Start POP3 honeypot')
    parser.add_argument('--all', action='store_true', help='Start all honeypots')
    parser.add_argument('--debug', action='store_true', help='Enable debug mode')
    args = parser.parse_args()

    # Initialize logging
    logger = initialize_logging(args.debug)

    # Always print logo and version information
    art_text = art.text2art("Gen.A.I.Pot")
    print(art_text)
    print(f"Version: {VERSION}")
    print("The first Generative A.I Honeypot")

    # If no arguments are provided, show the help menu
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(0)

    ensure_files_directory()

    # Read configuration
    config, prompts, config_file_path = read_configuration(args)

    # Set up the database
    setup_database()

    # If --config or --docker is specified, run the configuration wizard
    if args.config or args.docker:
        run_config_wizard(args, config, config_file_path)
        # Re-read the config after configuration
        config.read(config_file_path)
        # No need to proceed further after configuration
        return

    # Initialize the AI service
    ai_service = initialize_ai_service(config, args)

    # NOTE(review): this branch cannot run - the --config/--docker case has
    # already returned above. Kept as-is; confirm whether response generation
    # is meant to happen here or is handled by the configuration wizard.
    if ai_service and (args.config or args.docker):
        # Generate AI responses if necessary
        query_ai_service_for_responses(config, prompts, ai_service, args.debug)

    # If no specific honeypot service is selected, show the help menu
    if not (args.smtp or args.pop3 or args.all):
        parser.print_help()
        return

    try:
        logger.info(f"Starting GenAIPot Version {VERSION}")
        # Read server configuration
        server_config = config['server']
        ipv4_address = server_config.get('ipv4', '0.0.0.0')  # Default to all IPv4 interfaces
        ipv6_address = server_config.get('ipv6addr', fallback='::')
        use_ipv6 = server_config.getint('use_ipv6', fallback=0)

        smtp_port = server_config.getint('smtp_port', 25)
        pop3_port = server_config.getint('pop3_port', 110)

        if args.debug:
            start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            logger.debug(f"Start Time: {start_time}")
            logger.debug(f"Configured IPv4 Address: {ipv4_address}")
            logger.debug(f"Configured IPv6 Address: {ipv6_address}")
            logger.debug(f"Listening Ports: {', '.join([str(smtp_port), str(pop3_port)])}")
            logger.debug(f"SQLite Logging Enabled: {config.getboolean('logging', 'sqlite', fallback=True)}")
            logger.debug(f"Server Technology: {config.get('server', 'technology', fallback='generic')}")
            logger.debug(f"Domain Name: {config.get('server', 'domain', fallback='localhost')}")
            logging.getLogger('urllib3').setLevel(logging.DEBUG)

        # Start SMTP service
        if args.smtp or args.all:
            _start_listener(logger, 'SMTP', SMTPFactory(), smtp_port,
                            use_ipv6, ipv4_address, ipv6_address)

        # Start POP3 service
        if args.pop3 or args.all:
            _start_listener(logger, 'POP3', POP3Factory(debug=args.debug), pop3_port,
                            use_ipv6, ipv4_address, ipv6_address)

        logger.info("Reactor is running...")
        reactor.run()

    except Exception as e:
        logger.exception(f"Failed to start honeypot: {e}")

query_ai_service_for_responses(config, prompts, ai_service, debug_mode)

Query the AI service for SMTP and POP3 responses and sample emails.

Parameters:

Name Type Description Default
config ConfigParser

The configuration object.

required
prompts ConfigParser

The prompts configuration object.

required
ai_service object

The initialized AI service instance.

required
debug_mode bool

Whether to enable debug mode.

required
Source code in bin/genaipot.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def query_ai_service_for_responses(config, prompts, ai_service, debug_mode):
    """
    Query the AI service for SMTP and POP3 responses and sample emails.

    Prompt templates are read from the 'Prompts' section of ``prompts`` and
    filled in with the server's technology, segment and domain taken from the
    'server' section of ``config``. SMTP and POP3 responses are produced via
    ``generate_responses``; each sample email is queried, cleaned and saved
    via ``ai_service.save_email_responses``. A failure for one sample email is
    reported and logged, and the remaining emails are still attempted.

    Args:
        config (ConfigParser): The configuration object.
        prompts (ConfigParser): The prompts configuration object.
        ai_service (object): The initialized AI service instance.
        debug_mode (bool): Whether to enable debug mode. When enabled, the
            full traceback of a failure is logged; otherwise only its message.
    """
    technology = config.get('server', 'technology', fallback='generic')
    segment = config.get('server', 'segment', fallback='general')
    domain = config.get('server', 'domain', fallback='localhost')

    # Prepare prompts
    smtp_prompt = prompts.get('Prompts', 'smtp_prompt').format(technology=technology)
    pop3_prompt = prompts.get('Prompts', 'pop3_prompt').format(technology=technology)
    email_prompts = [
        prompts.get('Prompts', prompt_key).format(segment=segment, domain=domain)
        for prompt_key in ('client_email_prompt', 'supplier_email_prompt', 'internal_email_prompt')
    ]

    # Query SMTP responses
    generate_responses(ai_service, smtp_prompt, 'smtp', 'SMTP', debug_mode)

    # Query POP3 responses
    generate_responses(ai_service, pop3_prompt, 'pop3', 'POP3', debug_mode)

    # Query sample emails
    for idx, email_prompt in enumerate(email_prompts, start=1):
        response_type = f'email_{idx}'
        description = f'Sample Email #{idx}'
        spinner = Halo(text=f'{description}: Generating responses...', spinner='dots')
        spinner.start()
        try:
            response_text = ai_service.query_responses(email_prompt, response_type)
            cleaned_response = ai_service.cleanup_and_parse_json(response_text)
            ai_service.save_email_responses(cleaned_response, response_type)
        except Exception as e:
            spinner.fail(f'Failed to generate {description}.')
            if debug_mode:
                logging.exception(f"Error generating {description}: {e}")
            else:
                # Always surface the cause so a failure is diagnosable
                # without re-running in debug mode.
                logging.error(f"Error generating {description}: {e}")
        else:
            spinner.succeed(f'{description} generated successfully.')

read_configuration(args)

Read the configuration files. If the main configuration file does not exist, the configuration wizard is run to create it before the file is read.

Returns:

Name Type Description
tuple

A tuple containing the main config, prompts config, and config file path.

Source code in bin/genaipot.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def read_configuration(args):
    """
    Read the configuration files.

    The main configuration is expected at ``../etc/config.ini`` relative to
    this file. If it does not exist, the configuration wizard is run to create
    it before it is read. The prompts configuration is read from
    ``../etc/prompts.ini``; a missing prompts file is not treated as an error
    here and yields an empty prompts configuration.

    Args:
        args: Parsed command-line arguments, passed through to the wizard.

    Returns:
        tuple: A tuple containing the main config, prompts config, and config file path.

    Raises:
        ValueError: If the configuration cannot be decoded or a port in the
            'server' section is not an integer.
        configparser.NoSectionError: If the 'server' section is missing.
    """
    config_file_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'etc', 'config.ini'))
    config = configparser.ConfigParser()

    if not os.path.exists(config_file_path):
        print("Configuration file not found. Starting configuration wizard...")
        from config_wizard import run_config_wizard  # Import here to avoid circular dependency
        run_config_wizard(args, config, config_file_path)

    try:
        config.read(config_file_path)
        # Validate numeric fields; the values themselves are not needed here.
        config.getint('server', 'smtp_port')
        config.getint('server', 'pop3_port')
    except ValueError as e:
        raise ValueError(f"Configuration error in 'server' section: {e}") from e
    # configparser.NoSectionError is deliberately left to propagate unchanged:
    # its constructor takes the section *name*, so wrapping a message in a new
    # NoSectionError would corrupt both the message and the .section attribute.

    prompts_config_file_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'etc', 'prompts.ini'))
    prompts = configparser.ConfigParser()
    prompts.read(prompts_config_file_path)

    return config, prompts, config_file_path

validate_bind_address(address, is_ipv6=False)

Validate the binding address for IPv4 or IPv6.

Parameters:

Name Type Description Default
address str

The IP address to validate.

required
is_ipv6 bool

Whether to validate for IPv6.

False

Returns:

Name Type Description
bool

True if the address can be bound, False otherwise.

Source code in bin/genaipot.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def validate_bind_address(address, is_ipv6=False):
    """
    Check whether a TCP socket can be bound to the given address.

    A throwaway stream socket of the requested family is created and bound to
    ``address`` on port 0, then closed again.

    Args:
        address (str): The IP address to validate.
        is_ipv6 (bool): Use an IPv6 socket when true, IPv4 otherwise.

    Returns:
        bool: True if the address can be bound, False if creating or binding
        the socket raised OSError.
    """
    if is_ipv6:
        family = socket.AF_INET6
    else:
        family = socket.AF_INET

    try:
        probe = socket.socket(family, socket.SOCK_STREAM)
    except OSError:
        return False

    try:
        # Port 0 asks the OS for any free port; only the address is under test.
        probe.bind((address, 0))
    except OSError:
        return False
    else:
        return True
    finally:
        probe.close()