top of page

TwitchPlays_Connection.py
for Python 3.9.x
[ published on 2022-08-16 ]

  1. # DougDoug Note: 

  2. # This is the code that connects to Twitch / Youtube and checks for new messages.

  3. # You should not need to modify anything in this file, just use as is.

  4. # This code is based on Wituz's "Twitch Plays" tutorial, updated for Python 3.9.X

  5. # http://www.wituz.com/make-your-own-twitch-plays-stream.html

  6. # Updated for Youtube by DDarknut, with help by Ottomated

  7. import requests

  8. import sys

  9. import socket

  10. import re

  11. import random

  12. import time

  13. import os

  14. import json

  15. import concurrent.futures

  16. MAX_TIME_TO_WAIT_FOR_LOGIN = 3

  17. YOUTUBE_FETCH_INTERVAL = 1

  18. class Twitch:

  19.     re_prog = None

  20.     sock = None

  21.     partial = b''

  22.     login_ok = False

  23.     channel = ''

  24.     login_timestamp = 0

  25.     def twitch_connect(self, channel):

  26.         if self.sock: self.sock.close()

  27.         self.sock = None

  28.         self.partial = b''

  29.         self.login_ok = False

  30.         self.channel = channel

  31.         # Compile regular expression

  32.         self.re_prog = re.compile(b'^(?::(?:([^ !\r\n]+)![^ \r\n]*|[^ \r\n]*) )?([^ \r\n]+)(?: ([^:\r\n]*))?(?: :([^\r\n]*))?\r\n', re.MULTILINE)

  33.         # Create socket

  34.         print('Connecting to Twitch...')

  35.         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

  36.         # Attempt to connect socket

  37.         self.sock.connect(('irc.chat.twitch.tv', 6667))

  38.         # Log in anonymously

  39.         user = 'justinfan%i' % random.randint(10000, 99999)

  40.         print('Connected to Twitch. Logging in anonymously...')

  41.         self.sock.send(('PASS asdf\r\nNICK %s\r\n' % user).encode())

  42.         self.sock.settimeout(1.0/60.0)

  43.         self.login_timestamp = time.time()

  44.     # Attempt to reconnect after a delay

  45.     def reconnect(self, delay):

  46.         time.sleep(delay)

  47.         self.twitch_connect(self.channel)

  48.     # Returns a list of irc messages received

  49.     def receive_and_parse_data(self):

  50.         buffer = b''

  51.         while True:

  52.             received = b''

  53.             try:

  54.                 received = self.sock.recv(4096)

  55.             except socket.timeout:

  56.                 break

  57.             # except OSError as e:

  58.             #     if e.winerror == 10035:

  59.             #         # This "error" is expected -- we receive it if timeout is set to zero, and there is no data to read on the socket.

  60.             #         break

  61.             except Exception as e:

  62.                 print('Unexpected connection error. Reconnecting in a second...', e)

  63.                 self.reconnect(1)

  64.                 return []

  65.             if not received:

  66.                 print('Connection closed by Twitch. Reconnecting in 5 seconds...')

  67.                 self.reconnect(5)

  68.                 return []

  69.             buffer += received

  70.         if buffer:

  71.             # Prepend unparsed data from previous iterations

  72.             if self.partial:

  73.                 buffer = self.partial + buffer

  74.                 self.partial = []

  75.             # Parse irc messages

  76.             res = []

  77.             matches = list(self.re_prog.finditer(buffer))

  78.             for match in matches:

  79.                 res.append({

  80.                     'name':     (match.group(1) or b'').decode(errors='replace'),

  81.                     'command':  (match.group(2) or b'').decode(errors='replace'),

  82.                     'params':   list(map(lambda p: p.decode(errors='replace'), (match.group(3) or b'').split(b' '))),

  83.                     'trailing': (match.group(4) or b'').decode(errors='replace'),

  84.                 })

  85.  

  86.             # Save any data we couldn't parse for the next iteration

  87.             if not matches:

  88.                 self.partial += buffer

  89.             else:

  90.                 end = matches[-1].end()

  91.                 if end < len(buffer):

  92.                     self.partial = buffer[end:]

  93.                 if matches[0].start() != 0:

  94.                     # If we get here, we might have missed a message. pepeW

  95.                     print('either ddarknut fucked up or twitch is bonkers, or both I mean who really knows anything at this point')

  96.  

  97.             return res

  98.         return []

  99.     def twitch_receive_messages(self):

  100.         privmsgs = []

  101.         for irc_message in self.receive_and_parse_data():

  102.             cmd = irc_message['command']

  103.             if cmd == 'PRIVMSG':

  104.                 privmsgs.append({

  105.                     'username': irc_message['name'],

  106.                     'message': irc_message['trailing'],

  107.                 })

  108.             elif cmd == 'PING':

  109.                 self.sock.send(b'PONG :tmi.twitch.tv\r\n')

  110.             elif cmd == '001':

  111.                 print('Successfully logged in. Joining channel %s.' % self.channel)

  112.                 self.sock.send(('JOIN #%s\r\n' % self.channel).encode())

  113.                 self.login_ok = True

  114.             elif cmd == 'JOIN':

  115.                 print('Successfully joined channel %s' % irc_message['params'][0])

  116.             elif cmd == 'NOTICE':

  117.                 print('Server notice:', irc_message['params'], irc_message['trailing'])

  118.             elif cmd == '002': continue

  119.             elif cmd == '003': continue

  120.             elif cmd == '004': continue

  121.             elif cmd == '375': continue

  122.             elif cmd == '372': continue

  123.             elif cmd == '376': continue

  124.             elif cmd == '353': continue

  125.             elif cmd == '366': continue

  126.             else:

  127.                 print('Unhandled irc message:', irc_message)

  128.         if not self.login_ok:

  129.             # We are still waiting for the initial login message. If we've waited longer than we should, try to reconnect.

  130.             if time.time() - self.login_timestamp > MAX_TIME_TO_WAIT_FOR_LOGIN:

  131.                 print('No response from Twitch. Reconnecting...')

  132.                 self.reconnect(0)

  133.                 return []

  134.  

  135.         return privmsgs

  136.  

  137. # Thanks to Ottomated for helping with the yt side of things!

  138. class YouTube:

  139.     session = None

  140.     config = {}

  141.     payload = {}

  142.     thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)

  143.     fetch_job = None

  144.     next_fetch_time = 0

  145.     re_initial_data = re.compile('(?:window\\s*\\[\\s*[\\"\']ytInitialData[\\"\']\\s*\\]|ytInitialData)\\s*=\\s*({.+?})\\s*;')

  146.     re_config = re.compile('(?:ytcfg\\s*.set)\\(({.+?})\\)\\s*;')

  147.     def get_continuation_token(self, data):

  148.         cont = data['continuationContents']['liveChatContinuation']['continuations'][0]

  149.         if 'timedContinuationData' in cont:

  150.             return cont['timedContinuationData']['continuation']

  151.         else:

  152.             return cont['invalidationContinuationData']['continuation']

  153.     def reconnect(self, delay):

  154.        if self.fetch_job and self.fetch_job.running():

  155.             if not fetch_job.cancel():

  156.                 print("Waiting for fetch job to finish...")

  157.                 self.fetch_job.result()

  158.         print(f"Retrying in {delay}...")

  159.         if self.session: self.session.close()

  160.         self.session = None

  161.         self.config = {}

  162.         self.payload = {}

  163.         self.fetch_job = None

  164.         self.next_fetch_time = 0

  165.         time.sleep(delay)

  166.         self.youtube_connect(self.channel_id, self.stream_url)

  167.     def youtube_connect(self, channel_id, stream_url=None):

  168.         print("Connecting to YouTube...")

  169.         self.channel_id = channel_id

  170.         self.stream_url = stream_url

  171.         # Create http client session

  172.         self.session = requests.Session()

  173.         # Spoof user agent so yt thinks we're an upstanding browser

  174.         self.session.headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36'

  175.         # Add consent cookie to bypass google's consent page

  176.         requests.utils.add_dict_to_cookiejar(self.session.cookies, {'CONSENT': 'YES+'})

  177.         # Connect using stream_url if provided, otherwise use the channel_id

  178.         if stream_url is not None:

  179.             live_url = self.stream_url

  180.         else:

  181.             live_url = f"https://youtube.com/channel/{self.channel_id}/live"

  182.  

  183.         res = self.session.get(live_url)

  184.         if res.status_code == 404:

  185.             live_url = f"https://youtube.com/c/{self.channel_id}/live"

  186.             res = self.session.get(live_url)

  187.         if not res.ok:

  188.             if stream_url is not None:

  189.                 print(f"Couldn't load the stream URL ({res.status_code} {res.reason}). Is the stream URL correct? {self.stream_url}")

  190.             else:

  191.                 print(f"Couldn't load livestream page ({res.status_code} {res.reason}). Is the channel ID correct? {self.channel_id}")

  192.             time.sleep(5)

  193.             exit(1)

  194.         livestream_page = res.text

  195.         # Find initial data in livestream page

  196.         matches = list(self.re_initial_data.finditer(livestream_page))

  197.         if len(matches) == 0:

  198.             print("Couldn't find initial data in livestream page")

  199.             time.sleep(5)

  200.             exit(1)

  201.         initial_data = json.loads(matches[0].group(1))

  202.         # Get continuation token for live chat iframe

  203.         iframe_continuation = None

  204.         try:

  205.             iframe_continuation = initial_data['contents']['twoColumnWatchNextResults']['conversationBar']['liveChatRenderer']['header']['liveChatHeaderRenderer']['viewSelector']['sortFilterSubMenuRenderer']['subMenuItems'][1]['continuation']['reloadContinuationData']['continuation']

  206.         except Exception as e:

  207.             print(f"Couldn't find the livestream chat. Is the channel not live? url: {live_url}")

  208.             time.sleep(5)

  209.             exit(1)

  210.         # Fetch live chat page

  211.         res = self.session.get(f'https://youtube.com/live_chat?continuation={iframe_continuation}')

  212.         if not res.ok:

  213.             print(f"Couldn't load live chat page ({res.status_code} {res.reason})")

  214.             time.sleep(5)

  215.             exit(1)

  216.         live_chat_page = res.text

  217.         # Find initial data in live chat page

  218.         matches = list(self.re_initial_data.finditer(live_chat_page))

  219.         if len(matches) == 0:

  220.             print("Couldn't find initial data in live chat page")

  221.             time.sleep(5)

  222.             exit(1)

  223.         initial_data = json.loads(matches[0].group(1))

  224.         # Find config data

  225.         matches = list(self.re_config.finditer(live_chat_page))

  226.         if len(matches) == 0:

  227.             print("Couldn't find config data in live chat page")

  228.             time.sleep(5)

  229.             exit(1)

  230.         self.config = json.loads(matches[0].group(1))

  231.         # Create payload object for making live chat requests

  232.         token = self.get_continuation_token(initial_data)

  233.         self.payload = {

  234.             "context": self.config['INNERTUBE_CONTEXT'],

  235.             "continuation": token,

  236.             "webClientInfo": {

  237.                 "isDocumentHidden": False

  238.             },

  239.         }

  240.         print("Connected.")

  241.     def fetch_messages(self):

  242.         payload_bytes = bytes(json.dumps(self.payload), "utf8")

  243.         res = self.session.post(f"https://www.youtube.com/youtubei/v1/live_chat/get_live_chat?key={self.config['INNERTUBE_API_KEY']}&prettyPrint=false", payload_bytes)

  244.         if not res.ok:

  245.             print(f"Failed to fetch messages. {res.status_code} {res.reason}")

  246.             print("Body:", res.text)

  247.             print("Payload:", payload_bytes)

  248.             self.session.close()

  249.             self.session = None

  250.             return []

  251.         data = json.loads(res.text)

  252.         self.payload['continuation'] = self.get_continuation_token(data)

  253.         cont = data['continuationContents']['liveChatContinuation']

  254.         messages = []

  255.         if 'actions' in cont:

  256.             for action in cont['actions']:

  257.                 if 'addChatItemAction' in action:

  258.                     item = action['addChatItemAction']['item']['liveChatTextMessageRenderer']

  259.                     messages.append({'author': item['authorName']['simpleText'], 'content': item['message']['runs']})

  260.         return messages

  261.     def twitch_receive_messages(self):

  262.         if self.session == None:

  263.             self.reconnect(1)

  264.         messages = []

  265.         if not self.fetch_job:

  266.             time.sleep(1.0/60.0)

  267.             if time.time() > self.next_fetch_time:

  268.                 self.fetch_job = self.thread_pool.submit(self.fetch_messages)

  269.         else:

  270.             res = []

  271.             timed_out = False

  272.             try:

  273.                 res = self.fetch_job.result(1.0/60.0)

  274.             except concurrent.futures.TimeoutError:

  275.                 timed_out = True

  276.             except Exception as e:

  277.                 print(e)

  278.                 self.session.close()

  279.                 self.session = None

  280.                 return

  281.             if not timed_out:

  282.                 self.fetch_job = None

  283.                 self.next_fetch_time = time.time() + YOUTUBE_FETCH_INTERVAL

  284.             for item in res:

  285.                 msg = {

  286.                     'username': item['author'],

  287.                     'message': ''

  288.                 }

  289.                 for part in item['content']:

  290.                     if 'text' in part:

  291.                         msg['message'] += part['text']

  292.                     elif 'emoji' in part:

  293.                         msg['message'] += part['emoji']['emojiId']

  294.                 messages.append(msg)

  295.         return messages

End of Document

bottom of page