Skip to content

Commit

Permalink
[Google Threat Intelligence] Private url scanning (#38457) (#38612)
Browse files Browse the repository at this point in the history
* feat(GTI): Private URL scanning

* Add tests

* Add tests

* Update yml

* Generate docs

* Update release note

* Fix

* Fix pre-commit

* Update docker image

* Fix

* Fix

Co-authored-by: Pablo Pérez <[email protected]>
  • Loading branch information
content-bot and pabloperezj authored Feb 12, 2025
1 parent a62c8ff commit 108c951
Show file tree
Hide file tree
Showing 8 changed files with 1,183 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,17 @@ def url(self, url: str, relationships: str = ''):
ok_codes=(404, 429, 200)
)

def private_url(self, url: str):
"""
See Also:
https://gtidocs.virustotal.com/reference/get-a-private-url-analysis-report
"""
return self._http_request(
'GET',
f'private/urls/{encode_url_to_base64(url)}',
ok_codes=(404, 429, 200)
)

def domain(self, domain: str, relationships: str = '') -> dict:
"""
See Also:
Expand Down Expand Up @@ -362,7 +373,7 @@ def private_file_scan(self, file_path: str) -> dict:
resp_type='response'
)
demisto.debug(
f'scan_file response:\n'
f'scan_private_file response:\n'
f'{str(response.status_code)=}, {str(response.headers)=}, {str(response.content)}'
)
return response.json()
Expand Down Expand Up @@ -398,6 +409,17 @@ def url_scan(self, url: str) -> dict:
data={'url': url}
)

def private_url_scan(self, url: str) -> dict:
"""
See Also:
https://gtidocs.virustotal.com/reference/private-scan-url
"""
return self._http_request(
'POST',
'/private/urls',
data={'url': url}
)

# endregion

def file_sandbox_report(self, file_hash: dict, limit: int) -> dict:
Expand Down Expand Up @@ -454,14 +476,14 @@ def get_private_analysis(self, analysis_id: str) -> dict:
f'private/analyses/{analysis_id}'
)

def get_private_file_from_analysis(self, analysis_id: str) -> dict:
def get_private_item_from_analysis(self, analysis_id: str) -> dict:
"""
See Also:
https://gtidocs.virustotal.com/reference/analysesidrelationship
"""
return self._http_request(
'GET',
f'private/analyses/{analysis_id}/item?attributes=threat_severity,threat_verdict'
f'private/analyses/{analysis_id}/item'
)

def get_file_sigma_analysis(self, file_hash: str) -> dict:
Expand Down Expand Up @@ -1410,6 +1432,37 @@ def build_url_output(
)


def build_private_url_output(url: str, raw_response: dict) -> CommandResults:
data = raw_response.get('data', {})
attributes = data.get('attributes', {})

last_analysis_stats = attributes.get('last_analysis_stats', {})
positive_detections = last_analysis_stats.get('malicious', 0)
detection_engines = sum(last_analysis_stats.values())

return CommandResults(
outputs_prefix=f'{INTEGRATION_ENTRY_CONTEXT}.URL',
outputs_key_field='id',
readable_output=tableToMarkdown(
f'URL data of "{url}"',
{
**attributes,
'positives': f'{positive_detections}/{detection_engines}',
},
headers=[
'url',
'title',
'last_http_response_content_sha256',
'positives',
],
removeNull=True,
headerTransform=string_to_table_header
),
outputs=data,
raw_response=raw_response,
)


def build_ip_output(
client: Client,
score_calculator: ScoreCalculator,
Expand Down Expand Up @@ -1784,7 +1837,7 @@ def private_file_command(client: Client, args: dict) -> List[CommandResults]:
execution_metrics.success += 1
except Exception as exc:
# If anything happens, just keep going
demisto.debug(f'Could not process file: "{file}"\n {str(exc)}')
demisto.debug(f'Could not process private file: "{file}"\n {str(exc)}')
execution_metrics.general_error += 1
results.append(build_error_file_output(client, file))
continue
Expand All @@ -1803,6 +1856,7 @@ def url_command(client: Client, score_calculator: ScoreCalculator, args: dict, r
extended_data = argToBoolean(args.get('extended_data', False))
results: List[CommandResults] = []
execution_metrics = ExecutionMetrics()

for url in urls:
try:
raw_response = client.url(url, relationships)
Expand All @@ -1828,6 +1882,39 @@ def url_command(client: Client, score_calculator: ScoreCalculator, args: dict, r
return results


def private_url_command(client: Client, args: dict) -> List[CommandResults]:
"""
1 API Call
"""
urls = argToList(args['url'])
results: List[CommandResults] = []
execution_metrics = ExecutionMetrics()

for url in urls:
try:
raw_response = client.private_url(url)
if raw_response.get('error', {}).get('code') == 'QuotaExceededError':
execution_metrics.quota_error += 1
results.append(build_quota_exceeded_url_output(client, url))
continue
if raw_response.get('error', {}).get('code') == 'NotFoundError':
results.append(build_unknown_url_output(client, url))
continue
except Exception as exc:
# If anything happens, just keep going
demisto.debug(f'Could not process private URL: "{url}".\n {str(exc)}')
execution_metrics.general_error += 1
results.append(build_error_url_output(client, url))
continue
execution_metrics.success += 1
results.append(build_private_url_output(url, raw_response))
if execution_metrics.is_supported():
_metric_results = execution_metrics.metrics
metric_results = cast(CommandResults, _metric_results)
results.append(metric_results)
return results


def domain_command(client: Client, score_calculator: ScoreCalculator, args: dict, relationships: str) -> List[CommandResults]:
"""
1 API Call for regular
Expand Down Expand Up @@ -2031,7 +2118,6 @@ def file_scan_and_get_analysis(
def private_file_scan_and_get_analysis(client: Client, args: dict):
"""Calls to gti-privatescanning-file-scan and gti-privatescanning-analysis-get."""
interval = int(args.get('interval_in_seconds', 60))
extended = argToBoolean(args.get('extended_data', False))

if not args.get('id'):
command_results = private_file_scan(client, args)
Expand All @@ -2046,7 +2132,6 @@ def private_file_scan_and_get_analysis(client: Client, args: dict):
'entryID': args.get('entryID'),
'id': outputs.get('vtScanID'),
'interval_in_seconds': interval,
'extended_data': extended,
},
timeout_in_seconds=6000,
)
Expand All @@ -2066,7 +2151,6 @@ def private_file_scan_and_get_analysis(client: Client, args: dict):
'entryID': args.get('entryID'),
'id': outputs.get('id'),
'interval_in_seconds': interval,
'extended_data': extended,
},
timeout_in_seconds=6000,
)
Expand Down Expand Up @@ -2122,6 +2206,47 @@ def url_scan_and_get_analysis(
return CommandResults(scheduled_command=scheduled_command)


def private_url_scan_and_get_analysis(client: Client, args: dict):
"""Calls to gti-privatescanning-url-scan and gti-privatescanning-analysis-get."""
interval = int(args.get('interval_in_seconds', 60))

if not args.get('id'):
command_result = private_scan_url_command(client, args)
outputs = command_result.outputs
if not isinstance(outputs, dict):
raise DemistoException('outputs is expected to be a dict')
scheduled_command = ScheduledCommand(
command=f'{COMMAND_PREFIX}-private-url-scan-and-analysis-get',
next_run_in_seconds=interval,
args={
'url': args.get('url'),
'id': outputs.get('vtScanID'),
'interval_in_seconds': interval,
},
timeout_in_seconds=6000,
)
command_result.scheduled_command = scheduled_command
return command_result

command_result = private_get_analysis_command(client, args)
outputs = command_result.outputs
if not isinstance(outputs, dict):
raise DemistoException('outputs is expected to be a dict')
if outputs.get('data', {}).get('attributes', {}).get('status') == 'completed':
return command_result
scheduled_command = ScheduledCommand(
command=f'{COMMAND_PREFIX}-private-url-scan-and-analysis-get',
next_run_in_seconds=interval,
args={
'url': args.get('url'),
'id': outputs.get('id'),
'interval_in_seconds': interval,
},
timeout_in_seconds=6000,
)
return CommandResults(scheduled_command=scheduled_command)


def get_upload_url(client: Client) -> CommandResults:
"""
1 API Call
Expand All @@ -2143,6 +2268,20 @@ def get_upload_url(client: Client) -> CommandResults:


def scan_url_command(client: Client, args: dict) -> CommandResults:
"""
1 API Call
"""
return scan_url(client, args)


def private_scan_url_command(client: Client, args: dict) -> CommandResults:
"""
1 API Call
"""
return scan_url(client, args, True)


def scan_url(client: Client, args: dict, private: bool = False) -> CommandResults:
"""
1 API Call
"""
Expand All @@ -2153,7 +2292,10 @@ def scan_url_command(client: Client, args: dict) -> CommandResults:
headers = ['id', 'url']

try:
raw_response = client.url_scan(url)
if private:
raw_response = client.private_url_scan(url)
else:
raw_response = client.url_scan(url)
data = raw_response['data']

data['url'] = url
Expand Down Expand Up @@ -2507,22 +2649,38 @@ def private_get_analysis_command(client: Client, args: dict) -> CommandResults:
raw_response = client.get_private_analysis(analysis_id)
data = raw_response.get('data', {})
attributes = data.get('attributes', {})
stats = {
'threat_severity_level': '',
'popular_threat_category': '',
'threat_verdict': '',
}

if sha256 := raw_response.get('meta', {}).get('file_info', {}).get('sha256'):
attributes['sha256'] = sha256

if url := raw_response.get('meta', {}).get('url_info', {}).get('url'):
attributes['url'] = url

if attributes.get('status', '') == 'completed':
file_response = client.get_private_file_from_analysis(analysis_id)
file_attributes = file_response.get('data', {}).get('attributes', {})
threat_severity = file_attributes.get('threat_severity', {})
severity_level = threat_severity.get('threat_severity_level', '')
stats['threat_severity_level'] = SEVERITY_LEVELS.get(severity_level, severity_level)
threat_severity_data = threat_severity.get('threat_severity_data', {})
stats['popular_threat_category'] = threat_severity_data.get('popular_threat_category', '')
verdict = file_attributes.get('threat_verdict', '')
stats['threat_verdict'] = VERDICTS.get(verdict, verdict)
attributes.update(stats)
stats = {}
item_response = client.get_private_item_from_analysis(analysis_id)
item_attributes = item_response.get('data', {}).get('attributes', {})

# File attributes
if threat_severity := item_attributes.get('threat_severity'):
if severity_level := threat_severity.get('threat_severity_level'):
stats['threat_severity_level'] = SEVERITY_LEVELS.get(severity_level, severity_level)
if popular_threat_category := threat_severity.get('threat_severity_data', {}).get('popular_threat_category'):
stats['popular_threat_category'] = popular_threat_category
if verdict := item_attributes.get('threat_verdict'):
stats['threat_verdict'] = VERDICTS.get(verdict, verdict)

# URL attributes
if last_analysis_stats := item_attributes.get('last_analysis_stats'):
if detection_engines := sum(last_analysis_stats.values()):
positive_detections = last_analysis_stats.get('malicious', 0)
stats['positives'] = f'{positive_detections}/{detection_engines}'

attributes.update(stats)
for field in ['title', 'last_http_response_content_sha256']:
if value := item_attributes.get(field):
attributes[field] = value

return CommandResults(
f'{INTEGRATION_ENTRY_CONTEXT}.Analysis',
'id',
Expand All @@ -2532,7 +2690,21 @@ def private_get_analysis_command(client: Client, args: dict) -> CommandResults:
**attributes,
'id': analysis_id
},
headers=['id', 'threat_severity_level', 'popular_threat_category', 'threat_verdict', 'status'],
headers=[
# Common headers
'id',
'status',
# File attributes
'sha256'
'threat_severity_level',
'popular_threat_category',
'threat_verdict',
# URL attributes
'url',
'title',
'last_http_response_content_sha256',
'positives',
],
removeNull=True,
headerTransform=string_to_table_header
),
Expand Down Expand Up @@ -2794,6 +2966,10 @@ def main(params: dict, args: dict, command: str):
results = private_file_command(client, args)
elif command == f'{COMMAND_PREFIX}-privatescanning-file-scan':
results = private_file_scan(client, args)
elif command == f'{COMMAND_PREFIX}-privatescanning-url':
results = private_url_command(client, args)
elif command == f'{COMMAND_PREFIX}-privatescanning-url-scan':
results = private_scan_url_command(client, args)
elif command == f'{COMMAND_PREFIX}-privatescanning-analysis-get':
results = private_get_analysis_command(client, args)
elif command == f'{COMMAND_PREFIX}-assessment-get':
Expand All @@ -2804,6 +2980,8 @@ def main(params: dict, args: dict, command: str):
results = private_file_scan_and_get_analysis(client, args)
elif command == f'{COMMAND_PREFIX}-url-scan-and-analysis-get':
results = url_scan_and_get_analysis(client, score_calculator, args, url_relationships)
elif command == f'{COMMAND_PREFIX}-private-url-scan-and-analysis-get':
results = private_url_scan_and_get_analysis(client, args)
elif command == f'{COMMAND_PREFIX}-curated-campaigns-get':
results = get_curated_campaigns_command(client, args)
elif command == f'{COMMAND_PREFIX}-curated-malware-families-get':
Expand Down
Loading

0 comments on commit 108c951

Please sign in to comment.