]> git.cworth.org Git - turbot/blob - turbot_lambda/turbot_lambda.py
Implement url_verification for Slack event handling
[turbot] / turbot_lambda / turbot_lambda.py
1 from urllib.parse import parse_qs
2 from turbot.rot import rot
3 from slack import WebClient
4 import boto3
5 import requests
6 import hashlib
7 import hmac
8 import json
9
10 ssm = boto3.client('ssm')
11
12 response = ssm.get_parameter(Name='SLACK_SIGNING_SECRET', WithDecryption=True)
13 slack_signing_secret = bytes(response['Parameter']['Value'], 'utf-8')
14
15 response = ssm.get_parameter(Name='SLACK_BOT_TOKEN', WithDecryption=True)
16 slack_bot_token = response['Parameter']['Value']
17 slack_client = WebClient(slack_bot_token)
18
19 def error(message):
20     """Generate an error response for a Slack request
21
22     This will print the error message (so that it appears in CloudWatch
23     logs) and will then return a dictionary suitable for returning
24     as an error response."""
25
26     print("Error: {}.".format(message))
27
28     return {
29         'statusCode': 400,
30         'body': ''
31     }
32
33 def slack_is_valid_request(slack_signature, timestamp, body):
34     """Returns True if the timestamp and body correspond to signature.
35
36     This implements the Slack signature verification using the slack
37     signing secret (obtained via an SSM parameter in code above)."""
38
39     content = "v0:{}:{}".format(timestamp,body).encode('utf-8')
40
41     signature = 'v0=' + hmac.new(slack_signing_secret,
42                                  content,
43                                  hashlib.sha256).hexdigest()
44
45     if hmac.compare_digest(signature, slack_signature):
46         return True
47     else:
48         print("Bad signature: {} != {}".format(signature, slack_signature))
49         return False
50
51 def turbot_lambda(event, context):
52     """Top-level entry point for our lambda function.
53
54     This function first verifies that the request actually came from
55     Slack, (by means of the SLACK_SIGNING_SECRET SSM parameter), and
56     refuses to do anything if not.
57
58     Then this defers to either turbot_event_handler or
59     turbot_slash_command to do any real work.
60     """
61
62     headers = requests.structures.CaseInsensitiveDict(event['headers'])
63
64     signature = headers['X-Slack-Signature']
65     timestamp = headers['X-Slack-Request-Timestamp']
66
67     if not slack_is_valid_request(signature, timestamp, event['body']):
68         return error("Invalid Slack signature")
69
70     # It's a bit cheesy, but we'll just use the content-type header to
71     # determine if we're being called from a slash command or from a
72     # slack event. (The more typical way to do this would be to have
73     # different routes setup, but I want a single function, and with
74     # AWS Lambda I don't have the option to have multiple defined
75     # entry-point functions.
76     if (headers['Content-Type'] == "application/json"):
77         return turbot_event_handler(event, context)
78     else:
79         return turbot_slash_command(event, context)
80
81 def turbot_event_handler(event, context):
82     """Handler for all subscribed Slack events"""
83
84     body = json.loads(event['body'])
85
86     # First, we have to properly respond to url_verification
87     # challenges or else Slack won't let us configure our URL as an
88     # event handler.
89     if (body['type'] == 'url_verification'):
90         return {
91             'statusCode': 200,
92             'body': body['challenge']
93         }
94
95     return error("Event not yet implemented")
96
97 def turbot_slash_command(event, context):
98     """Implementation for Slack slash commands.
99
100     This parses the request and arguments and farms out to
101     supporting functions to implement all supported slash commands.
102     """
103
104     body = parse_qs(event['body'])
105     command = body['command'][0]
106     args = body['text'][0]
107
108     if (command == "/rotlambda" or command == "/rot"):
109         return rot_slash_command(body, args)
110
111     return error("Command {} not implemented".format(command))
112
113 def rot_slash_command(body, args):
114     """Implementation of the /rot command
115
116     The args string should be as follows:
117
118         [count|*] String to be rotated
119
120     That is, the first word of the string is an optional number (or
121     the character '*'). If this is a number it indicates an amount to
122     rotate each character in the string. If the count is '*' or is not
123     present, then the string will be rotated through all possible 25
124     values.
125
126     The result of the rotation is returned (with Slack formatting) in
127     the body of the response so that Slack will provide it as a reply
128     to the user who submitted the slash command."""
129
130     channel_name = body['channel_name'][0]
131     response_url = body['response_url'][0]
132     channel_id = body['channel_id'][0]
133
134     result = rot(args)
135
136     if (channel_name == "directmessage"):
137         requests.post(response_url,
138                       json = {"text": result},
139                       headers = {"Content-type": "application/json"})
140     else:
141         slack_client.chat_postMessage(channel=channel_id, text=result)
142
143     return {
144         'statusCode': 200,
145         'body': ""
146     }