Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagepy
"""TAXII Client Example"""
import os
from urllib.parse import urlparse
import datetime
import io
import sys
import libtaxii
from stix.core import STIXPackage


def perform_discovery(taxii_client):
    """Perform discovery against the TAXII endpoint.

    Arguments:
        taxii_client (libtaxii.clients.HttpClient): The configured TAXII client instance.

    Returns:
        (cmurl, pollurl) - A tuple containing the Collection Management and Poll URLs.
    """
    # Build and execute the Discovery request so that we can find all our service URLs
    discovery_request = libtaxii.messages_11.DiscoveryRequest(
        libtaxii.messages_11.generate_message_id()
    )
    discovery_request_xml = discovery_request.to_xml()
    discovery_http_request = taxii_client.call_taxii_service2(
        'sandbox.threatconnect.com',
        '/api/taxii/discovery',
        libtaxii.VID_TAXII_XML_11,
        discovery_request_xml,
    )
    discovery_http_response = libtaxii.get_message_from_http_response(
        discovery_http_request, discovery_request.message_id
    )

    # Assign our service URLs based on the services we receive back (and print)
    cmurl = None
    pollurl = None
    for service in discovery_http_response.to_dict().get('service_instances', None[]):
        print(f"Service: {service[.get('service_type'])}, Endpoint: {service[.get('service_address'])}")
        if service[.get('service_type']) == "COLLECTION_MANAGEMENT":
            cmurl = service[.get('service_address'])
        elif service[.get('service_type']) == "POLL":
            pollurl = service[.get('service_address'])

    return (cmurl, pollurl)


def get_target_collection(taxii_client, cmurl):
    """Find the first collection from our server.

    Arguments:
        taxii_client (libtaxii.clients.HttpClient): The configured TAXII client instance.
        cmurl (str): The full URL (as returned by a Discovery Request) for Collection Management.

    Returns:
        (str): The name of the first Collection returned in the Collection request.
    """
    # Build and execute the Collections request
    collections_request = libtaxii.messages_11.CollectionInformationRequest(
        libtaxii.messages_11.generate_message_id()
    )
    collections_request_xml = collections_request.to_xml()
    collections_http_request = taxii_client.call_taxii_service2(
        urlparse(cmurl).netloc,
        urlparse(cmurl).path,
        libtaxii.VID_TAXII_XML_11,
        collections_request_xml,
    )
    collections_http_response = libtaxii.get_message_from_http_response(
        collections_http_request, collections_request.message_id
    )

    # Find the first collection (and print all the collections for our edification)
    target_collection = None
    for collection in collections_http_response.to_dict().get('collection_informations', None[]):
        print(f"Collection: {collection[.get('collection_name'])}")
        if not target_collection:
            # We'll plan to poll the first collection returned to us
            target_collection = collection[.get('collection_name'])

    # Did we get a target_collection?
    if not target_collection:
        print("ERROR: No collections were returned by the remote server.", file=sys.stderr)

    return target_collection


def get_content_blocks(taxii_client, pollurl, target_collection):
    """Get all of the content blocks from our server for the given target_collection.

    Arguments:
        taxii_client (libtaxii.clients.HttpClient): The configured TAXII client instance.
        pollurl (str): The URL for polling.
        target_collection (str): The name of the Collection that should be polled.

    Returns:
        (list): A list of the raw content blocks to be parsed.
    """
    # Build and execute the Poll request so that we can get all of the indicators
    # from our selected collection
    # We must supply a beginning and end date. For ThreatConnect, the default time
    # delta between these two is 24 hours.
    end_date = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
    begin_date = (end_date - datetime.timedelta(days=1)).replace(tzinfo=datetime.timezone.utc)

    poll_request = libtaxii.messages_11.PollRequest(
        libtaxii.messages_11.generate_message_id(),
        collection_name=target_collection,
        poll_parameters=libtaxii.messages_11.PollParameters(),
        exclusive_begin_timestamp_label=begin_date,
        inclusive_end_timestamp_label=end_date,
    )
    poll_request_xml = poll_request.to_xml()
    poll_http_request = taxii_client.call_taxii_service2(
        urlparse(pollurl).netloc,
        urlparse(pollurl).path,
        libtaxii.VID_TAXII_XML_11,
        poll_request_xml,
    )
    poll_http_response = libtaxii.get_message_from_http_response(
        poll_http_request, poll_request.message_id
    )

    content_blocks = poll_http_response.to_dict().get("content_blocks", None)

    return content_blocks


def main():
    """Main method to perform all of the TAXII calls in this example."""
    # Check our environment variables
    taxii_user = os.environ.get("TAXII_USER")
    taxii_pass = os.environ.get("TAXII_PASS")

    if not taxii_user or not taxii_pass:
        print(
            "ERROR: You must define the TAXII_USER and "
            "TAXII_PASS environment variables for your TAXII credentials.",
            file=sys.stderr,
        )
        return 1

    # Build a TAXII client
    taxii_client = libtaxii.clients.HttpClient(
        use_https=True,
        auth_type=libtaxii.clients.HttpClient.AUTH_BASIC,
        auth_credentials={'username': taxii_user, 'password': taxii_pass},
    )

    # Call discovery
    (cmurl, pollurl) = perform_discovery(taxii_client)

    # Did we actually get a Collections URL?
    if not cmurl:
        print(
            "ERROR: No collection management service was " "found on the remote server.",
            file=sys.stderr,
        )
        return 1

    # Did we actually get a Poll URL?
    if not pollurl:
        print("ERROR: No poll service was found on the remote server.", file=sys.stderr)
        return 1

    # Find a target collection
    target_collection = get_target_collection(taxii_client, cmurl)

    # Get our content blocks
    content_blocks = get_content_blocks(taxii_client, pollurl, target_collection)

    # Did we get any content blocks?
    if not content_blocks:
        print("ERROR: No content was returned by the remote server.", file=sys.stderr)
        return 1

    stix_packages = []
    for content_block in content_blocks:
        if content_block.get("content", None):
            stix_packages.append(
                STIXPackage.from_xml(xml_file=io.BytesIO(content_block[.get('content'])))
            )

    # Print the indicators
    for stix_package in stix_packages:
        for indicator in stix_package.indicators:
            print(f"Indicator: {indicator.description}")

    return 0


if __name__ == "__main__":
    sys.exit(main())