from __future__ import unicode_literals
import logging
import time
from datetime import datetime
from dateutil import parser as date_parser
import inspect
import six
import pymisp
import requests
from .otx import OTXv2
try:
basestring
except NameError:
basestring = str
__version__ = "1.4.2"
# Try to disable verify SSL warnings
try:
requests.packages.urllib3.disable_warnings()
except:
pass
# Get the log handler
log = logging.getLogger('otx_misp')
[docs]class ImportException(Exception):
pass
# MISP instance version
_misp_server_version = None
[docs]def misp_server_version(misp):
"""
Retrieve the MISP instance version
:param misp: MISP connection object
:type misp: :class:`pymisp.PyMISP`
:return: MISP instance version as string
"""
global _misp_server_version
if _misp_server_version is None:
version = misp.get_version()
_misp_server_version = version['version']
return _misp_server_version
[docs]def tag_event(misp, event, tag):
"""
Add a tag to a MISP event
:param misp: MISP connection object
:type misp: :class:`pymisp.PyMISP`
:param event: a MISP event
:param tag: tag to add
:return: None
"""
if not hasattr(misp, '_otx_tags_cache'):
misp._otx_tags_cache = misp.get_all_tags()['Tag']
for exist_tag in misp._otx_tags_cache:
if exist_tag['name'] == tag:
tag_id = exist_tag['id']
if 'EventTag' in event:
for evt_tag in event['EventTag']:
if tag_id == evt_tag['id']:
log.info("\t - Tag already exists. Skipping:".format(tag))
return
break
if hasattr(misp, 'tag'):
version = misp_server_version(misp).split('.')
tag_version = '2.4.69'.split('.')
for a, b in zip(version, tag_version):
if a == b:
continue
elif a > b:
continue
else: # a < b
misp.add_tag(event, tag)
return
misp.tag(event['Event']['uuid'], tag)
else:
misp.add_tag(event, tag)
[docs]def get_pulses(otx_api_key, from_timestamp=None):
"""
Get the Pulses from Alienvault OTX
:param otx_api_key: Alienvault OTX API key
:type otx_api_key: string
:param from_timestamp: only download Pulses after this date/time (None for all Pulses)
:type from_timestamp: :class:`datetime.datetime` or ISO string or Unix timestamp
:return: a list of Pulses (dict)
"""
otx = OTXv2(otx_api_key)
if from_timestamp is None:
log.debug("Retrieving all Pulses (no timestamp)")
pulses = otx.getall()
elif isinstance(from_timestamp, int):
dt = datetime.fromtimestamp(from_timestamp)
from_timestamp = dt.isoformat()
pulses = otx.getsince(from_timestamp)
elif isinstance(from_timestamp, datetime):
from_timestamp = from_timestamp.isoformat()
pulses = otx.getsince(from_timestamp)
elif isinstance(from_timestamp, basestring):
pulses = otx.getsince(from_timestamp)
else:
raise ValueError("'from_timestamp' must be 'None', a datetime object or an ISO date string")
return pulses
[docs]def get_pulses_iter(otx_api_key, from_timestamp=None):
"""
Get the Pulses from Alienvault OTX and returns a generator
:param otx_api_key: Alienvault OTX API key
:type otx_api_key: string
:param from_timestamp: only download Pulses after this date/time (None for all Pulses)
:type from_timestamp: :class:`datetime.datetime` or ISO string or Unix timestamp
:return: a generator of Pulses (dict)
"""
otx = OTXv2(otx_api_key)
if from_timestamp is None:
log.debug("Retrieving all Pulses (no timestamp)")
return otx.getall()
elif isinstance(from_timestamp, int):
dt = datetime.fromtimestamp(from_timestamp)
from_timestamp = dt.isoformat()
elif isinstance(from_timestamp, datetime):
from_timestamp = from_timestamp.isoformat()
elif not isinstance(from_timestamp, basestring):
raise ValueError("'from_timestamp' must be 'None', a datetime object or an ISO date string")
return otx.getsince_iter(from_timestamp)
[docs]def create_events(pulse_or_list, author=False, server=False, key=False, misp=False, distribution=0, threat_level=4,
analysis=2, publish=True, tlp=True, discover_tags=False, to_ids=False, author_tag=False,
bulk_tag=None, dedup_titles=False, stop_on_error=False):
"""
Parse a Pulse or a list of Pulses and add it/them to MISP if server and key are present
:param pulse_or_list: a Pulse or list of Pulses as returned by `get_pulses`
:param author: Prepend the author to the Pulse name
:type author: Boolean
:param server: MISP server URL
:param key: MISP API key
:param misp: MISP connection object
:type misp: :class:`pymisp.PyMISP`
:param distribution: distribution of the MISP event (0-4)
:param threat_level: threat level of the MISP object (1-4)
:param analysis: analysis stae of the MISP object (0-2)
:param publish: Is the MISP event should be published?
:type publish: Boolean
:param tlp: Add TLP level tag to event
:type tlp: Boolean
:param discover_tags: discover MISP tags from Pulse tags
:type discover_tags: Boolean
:param to_ids: Flag pulse attributes as being sent to an IDS
:type to_ids: Boolean
:param author_tag: Add the pulse author as an event tag
:type author_tag: Boolean
:param bulk_tag: A tag that will be added to all events for categorization (e.g. OTX)
:type bulk_tag: String
:param dedup_titles: Search MISP for an existing event title and update it, rather than create a new one
:type dedup_titles: Boolean
:return: a dict or a list of dict with the selected attributes
"""
if not misp and (server and key):
log.debug("Connection to MISP instance: {}".format(server))
try:
misp = pymisp.PyMISP(server, key, ssl=False, out_type='json')
except pymisp.PyMISPError as ex:
raise ImportException("Cannot connect to MISP instance: {}".format(ex.message))
except Exception as ex:
raise ImportException("Cannot connect to MISP instance, unknown exception: {}".format(ex.message))
if discover_tags:
def get_tag_name(complete):
parts = complete.split('=')
if not len(parts):
return complete
last = parts[-1]
if not len(last):
return complete
if last[0] == '"':
last = last[1:]
if last[-1] == '"':
last = last[:-1]
return last.lower()
raw_tags = misp.get_all_tags()
tags = dict()
for tag in raw_tags['Tag']:
tags[get_tag_name(tag['name'])] = tag['name']
misp.discovered_tags = tags
if isinstance(pulse_or_list, (list, tuple)) or inspect.isgenerator(pulse_or_list):
misp_events = []
for pulse in pulse_or_list:
try:
misp_event = create_events(pulse, author=author, server=server, key=key, misp=misp,
distribution=distribution, threat_level=threat_level, analysis=analysis,
publish=publish, tlp=tlp, to_ids=to_ids, author_tag=author_tag,
bulk_tag=bulk_tag, dedup_titles=dedup_titles, stop_on_error=stop_on_error)
misp_events.append(misp_event)
except Exception as ex:
if stop_on_error:
raise
name = ''
if pulse and 'name' in pulse:
name = pulse['name']
log.error("Cannot import pulse {}: {}".format(name, ex))
return misp_events
pulse = pulse_or_list
if author:
event_name = pulse['author_name'] + ' | ' + pulse['name']
else:
event_name = pulse['name']
try:
dt = date_parser.parse(pulse['created'])
except (ValueError, OverflowError):
log.error("Cannot parse Pulse 'created' date.")
dt = datetime.utcnow()
event_date = dt.strftime('%Y-%m-%d')
log.info("## {name} - {date}".format(name=event_name, date=event_date))
result_event = {
'name': event_name,
'date': event_date,
'tags': list(),
'attributes': {
'hashes': {
'md5': list(),
'sha1': list(),
'sha256': list(),
'imphash': list(),
'pehash': list()
},
'hostnames': list(),
'domains': list(),
'urls': list(),
'ips': list(),
'emails': list(),
'mutexes': list(),
'references': list(),
'cves': list(),
'filenames': list(),
'yara': list()
},
}
if misp:
if not dedup_titles:
event = misp.new_event(distribution, threat_level, analysis, event_name, date=event_date, published=publish)
else:
event = ''
# Check if username is added to title
# Build the title
if author:
event_name = pulse['author_name'] + ' | ' + pulse['name']
else:
event_name = pulse['name']
# Search MISP for the title
result = misp.search_index(eventinfo=event_name)
if 'message' in result:
if result['message'] == "No matches.":
event = misp.new_event(distribution, threat_level, analysis, event_name, date=event_date,
published=publish)
else:
for evt in result['response']:
# If it exists, set 'event' to the event
if evt['info'] == event_name:
if 'SharingGroup' in evt:
del evt['SharingGroup'] # This deletes the SharingGroup from the list, thx SparkyNZL
event = {'Event': evt}
break
if event == '':
# Event not found, even though search results were returned
# Build new event
event = misp.new_event(distribution, threat_level, analysis, event_name, date=event_date,
published=publish)
time.sleep(0.2)
if tlp:
tag = None
if 'TLP' in pulse:
tag = "tlp:{}".format(pulse['TLP'])
elif 'tlp' in pulse:
tag = "tlp:{}".format(pulse['tlp'])
if tag is not None:
log.info("\t - Adding tag: {}".format(tag))
tag_event(misp, event, tag)
result_event['tags'].append(tag)
if author_tag:
tag_event(misp, event, pulse['author_name'])
if bulk_tag is not None:
tag_event(misp, event, bulk_tag)
if misp and hasattr(misp, 'discovered_tags') and 'tags' in pulse:
for pulse_tag in pulse['tags']:
if pulse_tag.lower() in misp.discovered_tags:
tag = misp.discovered_tags[pulse_tag.lower()]
log.info("\t - Adding tag: {}".format(tag))
tag_event(misp, event, tag)
result_event['tags'].append(tag)
if 'references' in pulse:
for reference in pulse['references']:
if reference:
log.info("\t - Adding external analysis link: {}".format(reference))
if misp:
misp.add_named_attribute(event, 'link', reference, category='External analysis')
result_event['attributes']['references'].append(reference)
if misp and 'description' in pulse and isinstance(pulse['description'], six.text_type) and pulse['description']:
log.info("\t - Adding external analysis comment")
misp.add_named_attribute(event, 'comment', pulse['description'], category='External analysis')
for ind in pulse['indicators']:
ind_type = ind['type']
ind_val = ind['indicator']
ind_kwargs = {'to_ids': to_ids}
if 'description' in ind and isinstance(ind['description'], six.text_type) and ind['description']:
ind_kwargs['comment'] = ind['description']
if ind_type == 'FileHash-SHA256':
log.info("\t - Adding SHA256 hash: {}".format(ind_val))
if misp:
misp.add_hashes(event, sha256=ind_val, **ind_kwargs)
result_event['attributes']['hashes']['sha256'].append(ind_val)
elif ind_type == 'FileHash-SHA1':
log.info("\t - Adding SHA1 hash: {}".format(ind_val))
if misp:
misp.add_hashes(event, sha1=ind_val, **ind_kwargs)
result_event['attributes']['hashes']['sha1'].append(ind_val)
elif ind_type == 'FileHash-MD5':
log.info("\t - Adding MD5 hash: {}".format(ind_val))
if misp:
misp.add_hashes(event, md5=ind_val, **ind_kwargs)
result_event['attributes']['hashes']['md5'].append(ind_val)
elif ind_type == 'URI' or ind_type == 'URL':
log.info("\t - Adding URL: {}".format(ind_val))
if misp:
misp.add_url(event, ind_val, **ind_kwargs)
result_event['attributes']['urls'].append(ind_val)
elif ind_type == 'domain':
log.info("\t - Adding domain: {}".format(ind_val))
if misp:
misp.add_domain(event, ind_val, **ind_kwargs)
result_event['attributes']['domains'].append(ind_val)
elif ind_type == 'hostname':
log.info("\t - Adding hostname: {}".format(ind_val))
if misp:
misp.add_hostname(event, ind_val, **ind_kwargs)
result_event['attributes']['hostnames'].append(ind_val)
elif ind_type == 'IPv4' or ind_type == 'IPv6':
log.info("\t - Adding ip: {}".format(ind_val))
if misp:
misp.add_ipdst(event, ind_val, **ind_kwargs)
result_event['attributes']['ips'].append(ind_val)
elif ind_type == 'email':
log.info("\t - Adding email: {}".format(ind_val))
if misp:
misp.add_email_dst(event, ind_val, **ind_kwargs)
result_event['attributes']['emails'].append(ind_val)
elif ind_type == 'Mutex':
log.info("\t - Adding mutex: {}".format(ind_val))
if misp:
misp.add_mutex(event, ind_val, **ind_kwargs)
result_event['attributes']['mutexes'].append(ind_val)
elif ind_type == 'CVE':
log.info("\t - Adding CVE: {}".format(ind_val))
if misp:
misp.add_named_attribute(event, 'vulnerability', ind_val, category='External analysis', **ind_kwargs)
result_event['attributes']['cves'].append(ind_val)
elif ind_type == 'FileHash-IMPHASH':
log.info("\t - Adding IMPHASH hash: {}".format(ind_val))
if misp:
misp.add_named_attribute(event, 'imphash', ind_val, category='Artifacts dropped', **ind_kwargs)
result_event['attributes']['hashes']['imphash'].append(ind_val)
elif ind_type == 'FileHash-PEHASH':
log.info("\t - Adding PEHASH hash: {}".format(ind_val))
if misp:
misp.add_named_attribute(event, 'pehash', ind_val, category='Artifacts dropped', **ind_kwargs)
result_event['attributes']['hashes']['pehash'].append(ind_val)
elif ind_type == 'FilePath':
log.info("\t - Adding filename: {}".format(ind_val))
if misp:
misp.add_filename(event, ind_val, category='Artifacts dropped', **ind_kwargs)
result_event['attributes']['filenames'].append(ind_val)
elif ind_type == 'YARA':
ind_title = ind.get('title', ind_val)
ind_desc = ind.get('description', '')
if ind_title == '':
ind_title = ind_val
if not ind_desc == '':
ind_kwargs['comment'] = ind_desc
else:
ind_kwargs['comment'] = "{} {}".format(ind_title, ind_desc)
ind_val = ind.get('content', None)
if ind_val is None or ind_val == "":
log.warning("YARA indicator is empty: %s" % ind_title)
continue
log.info("\t - Adding YARA rule: {}".format(ind_title))
if misp:
misp.add_yara(event, ind_val, category='Artifacts dropped', **ind_kwargs)
result_event['attributes']['yara'].append({'title': ind_title, 'content': ind_val})
else:
log.warning("Unsupported indicator type: %s" % ind_type)
if misp and publish:
event['Event']['published'] = False
misp.publish(event)
return result_event