Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This page provides a Python 3 example of connecting to the ThreatConnect TAXII Server as a TAXII client to pull Indicators. This is primarily useful when creating a Integrations - External TAXII Download (Poll) integration.

Example

In order to use this script, you must define the following environment variables:

  • TAXII_USER - Your TAXII username.

  • TAXII_PASS - Your TAXII password.

This example will hit the ThreatConnect Sandbox. If you wish to use this against another instance, modify this script appropriately.

This script will pull the last 24 hours worth of supported Indicators from the first collection that is returned from the Collection Management call.

Code Block
languagepy
"""TAXII Client Example"""
import os
from urllib.parse import urlparse
import datetime
import io
import sys
import libtaxii
from stix.core import STIXPackage


def perform_discovery(taxii_client):
    """Perform discovery against the TAXII endpoint."""
    # Build and execute the Discovery request so that we can find all our service URLs
    discovery_request = libtaxii.messages_11.DiscoveryRequest(
        libtaxii.messages_11.generate_message_id()
    )
    discovery_request_xml = discovery_request.to_xml()
    discovery_http_request = taxii_client.call_taxii_service2(
        'sandbox.threatconnect.com',
        '/api/taxii/discovery',
        libtaxii.VID_TAXII_XML_11,
        discovery_request_xml,
    )
    discovery_http_response = libtaxii.get_message_from_http_response(
        discovery_http_request, discovery_request.message_id
    )

    # Assign our service URLs based on the services we receive back (and print)
    cmurl = None
    pollurl = None
    for service in discovery_http_response.to_dict().get('service_instances', None):
        print(f"Service: {service['service_type']}, Endpoint: {service['service_address']}")
        if service['service_type'] == "COLLECTION_MANAGEMENT":
            cmurl = service['service_address']
        elif service['service_type'] == "POLL":
            pollurl = service['service_address']

    return (cmurl, pollurl)


def get_target_collection(taxii_client, cmurl):
    """Find the first collection from our server."""
    # Build and execute the Collections request
    collections_request = libtaxii.messages_11.CollectionInformationRequest(
        libtaxii.messages_11.generate_message_id()
    )
    collections_request_xml = collections_request.to_xml()
    collections_http_request = taxii_client.call_taxii_service2(
        urlparse(cmurl).netloc,
        urlparse(cmurl).path,
        libtaxii.VID_TAXII_XML_11,
        collections_request_xml,
    )
    collections_http_response = libtaxii.get_message_from_http_response(
        collections_http_request, collections_request.message_id
    )

    # Find the first collection (and print all the collections for our edification)
    target_collection = None
    for collection in collections_http_response.to_dict().get('collection_informations', None):
        print(f"Collection: {collection['collection_name']}")
        if not target_collection:
            # We'll plan to poll the first collection returned to us
            target_collection = collection['collection_name']

    # Did we get a target_collection?
    if not target_collection:
        print("ERROR: No collections were returned by the remote server.", file=sys.stderr)

    return target_collection


def get_content_blocks(taxii_client, pollurl, target_collection):
    """Get all of the content blocks from our server for the given target_collection."""
    # Build and execute the Poll request so that we can get all of the indicators
    # from our selected collection
    # We must supply a beginning and end date. For ThreatConnect, the default time
    # delta between these two is 24 hours.
    end_date = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
    begin_date = (end_date - datetime.timedelta(days=1)).replace(tzinfo=datetime.timezone.utc)

    poll_request = libtaxii.messages_11.PollRequest(
        libtaxii.messages_11.generate_message_id(),
        collection_name=target_collection,
        poll_parameters=libtaxii.messages_11.PollParameters(),
        exclusive_begin_timestamp_label=begin_date,
        inclusive_end_timestamp_label=end_date,
    )
    poll_request_xml = poll_request.to_xml()
    poll_http_request = taxii_client.call_taxii_service2(
        urlparse(pollurl).netloc,
        urlparse(pollurl).path,
        libtaxii.VID_TAXII_XML_11,
        poll_request_xml,
    )
    poll_http_response = libtaxii.get_message_from_http_response(
        poll_http_request, poll_request.message_id
    )

    content_blocks = poll_http_response.to_dict().get("content_blocks", None)

    return content_blocks


def main():
    """Main method to perform all of the TAXII calls in this example."""
    # Check our environment variables
    taxii_user = os.environ.get("TAXII_USER")
    taxii_pass = os.environ.get("TAXII_PASS")

    if not taxii_user or not taxii_pass:
        print(
            "ERROR: You must define the TAXII_USER and "
            "TAXII_PASS environment variables for your TAXII credentials.",
            file=sys.stderr,
        )
        return 1

    # Build a TAXII client
    taxii_client = libtaxii.clients.HttpClient(
        use_https=True,
        auth_type=libtaxii.clients.HttpClient.AUTH_BASIC,
        auth_credentials={'username': taxii_user, 'password': taxii_pass},
    )

    # Call discovery
    (cmurl, pollurl) = perform_discovery(taxii_client)

    # Did we actually get a Collections URL?
    if not cmurl:
        print(
            "ERROR: No collection management service was " "found on the remote server.",
            file=sys.stderr,
        )
        return 1

    # Did we actually get a Poll URL?
    if not pollurl:
        print("ERROR: No poll service was found on the remote server.", file=sys.stderr)
        return 1

    # Find a target collection
    target_collection = get_target_collection(taxii_client, cmurl)

    # Get our content blocks
    content_blocks = get_content_blocks(taxii_client, pollurl, target_collection)

    # Did we get any content blocks?
    if not content_blocks:
        print("ERROR: No content was returned by the remote server.", file=sys.stderr)
        return 1

    stix_packages = []
    for content_block in content_blocks:
        if content_block.get("content", None):
            stix_packages.append(
                STIXPackage.from_xml(xml_file=io.BytesIO(content_block['content']))
            )

    # Print the indicators
    for stix_package in stix_packages:
        for indicator in stix_package.indicators:
            print(f"Indicator: {indicator.description}")

    return 0


if __name__ == "__main__":
    sys.exit(main())