Skip to content

Instantly share code, notes, and snippets.

@nolanlum
Last active September 30, 2023 06:08
Show Gist options
  • Select an option

  • Save nolanlum/dd160e6ae752093aa5d98998bd0728a6 to your computer and use it in GitHub Desktop.

Select an option

Save nolanlum/dd160e6ae752093aa5d98998bd0728a6 to your computer and use it in GitHub Desktop.
草まみれ
from typing import Callable, Iterable, List, NamedTuple, Optional
import json
from bs4 import BeautifulSoup
import requests
# Module-wide session used by every request made through session_get().
session = requests.Session()


def session_get(url: str) -> requests.Response:
    """GET *url* through the shared session, presenting a desktop Firefox user agent."""
    browser_headers = {
        'user-agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:60.0) Gecko/20100101 Firefox/60.0',
    }
    return session.get(url, headers=browser_headers)
def fetch_ytInitialData(url: str) -> dict:
    """Download *url* and return the ``ytInitialData`` object embedded in the page.

    The page is expected to contain a ``<script>`` element with a line of the
    form ``window["ytInitialData"] = {...};``.

    Args:
        url: address of the YouTube page to fetch.

    Returns:
        The decoded JSON object assigned to ``ytInitialData``.

    Raises:
        ValueError: if no script on the page mentions ``ytInitialData``, or
            if the assignment does not hold valid JSON
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    assignment_prefix = 'window["ytInitialData"] = '

    html = session_get(url)
    soup = BeautifulSoup(html.text, "html.parser")

    # Use a default instead of a bare next(): a leaked StopIteration would be
    # converted into an opaque RuntimeError when this function is called from
    # inside a generator.
    script_text = next(
        (
            script.string
            for script in soup.find_all('script')
            if script.string and 'ytInitialData' in script.string
        ),
        None,
    )
    if script_text is None:
        raise ValueError(f"No ytInitialData script found at {url}")

    # This next() cannot be exhausted: script_text contains the marker and the
    # marker itself contains no line break, so some line must contain it.
    assignment_line = next(
        line.strip()
        for line in script_text.splitlines()
        if 'ytInitialData' in line
    )

    # Drop the assignment prefix and the final character (presumably the
    # statement's trailing ';'), leaving only the JSON text.
    return json.loads(assignment_line[len(assignment_prefix):-1])
def get_all_chat_continuation(ytInitialData: dict) -> str:
    """Return the continuation token of the 'Live chat replay' chat view.

    Args:
        ytInitialData: the ``ytInitialData`` object of a watch page, whose
            live chat header lists the selectable chat views.

    Returns:
        The ``continuation`` string of the submenu item titled
        'Live chat replay'.

    Raises:
        KeyError: if the page lacks the live chat header structure, or if no
            submenu item is titled 'Live chat replay'.
    """
    livechat_header = ytInitialData['contents']['twoColumnWatchNextResults']['conversationBar']['liveChatRenderer']['header']
    submenu_items = livechat_header['liveChatHeaderRenderer']['viewSelector']['sortFilterSubMenuRenderer']['subMenuItems']

    # Only the matching item is dereferenced, so an unrelated item that lacks
    # a continuation cannot break the lookup. Scanning from the end means
    # that, should titles ever repeat, the last entry wins.
    for item in reversed(submenu_items):
        if item.get('title') == 'Live chat replay':
            return item['continuation']['reloadContinuationData']['continuation']
    raise KeyError('Live chat replay')
class ChatItem(NamedTuple):
    """One chat message taken from a live chat replay."""

    # Replay timestamp text of the message, kept as a string.
    timestamp: str
    # Name of the message's author.
    author: str
    # Message body flattened to plain text.
    text: str
def get_live_chat_replay(continuation: Optional[str], progress_callback: Optional[Callable] = None) -> Iterable[ChatItem]:
    """Yield the messages of a chat replay, following continuations page by page.

    Args:
        continuation: continuation token of the first replay page. A falsy
            value yields nothing.
        progress_callback: if given, called once after each processed page
            with the timestamp text of the last message yielded from that
            page ('' when the page yielded no message).

    Yields:
        A ChatItem for every ``liveChatTextMessageRenderer`` or
        ``liveChatPaidMessageRenderer`` item that carries a ``message``.
    """
    while continuation:
        page = fetch_ytInitialData(f"https://www.youtube.com/live_chat_replay?continuation={continuation}")
        live_chat = page['continuationContents']['liveChatContinuation']
        if 'actions' not in live_chat:
            # A page without actions ends the iteration.
            return

        last_timestamp = ''
        for action in live_chat['actions']:
            # `or [{}]` covers both a missing key and an empty list, so the
            # [0] below can never raise IndexError.
            replay_actions = action.get('replayChatItemAction', {}).get('actions') or [{}]
            replay_action = replay_actions[0]
            if 'addChatItemAction' not in replay_action:
                continue

            item = replay_action['addChatItemAction']['item']
            renderer = item.get('liveChatTextMessageRenderer') or item.get('liveChatPaidMessageRenderer')
            if not renderer or 'message' not in renderer:
                continue

            last_timestamp = renderer['timestampText']['simpleText']
            yield ChatItem(
                timestamp=last_timestamp,
                author=renderer['authorName']['simpleText'],
                text=parse_message_runs(renderer['message']['runs']),
            )

        if progress_callback:
            progress_callback(last_timestamp)

        # A missing replay continuation leaves `continuation` as None, which
        # terminates the loop.
        continuation = (live_chat['continuations'][0]
                        .get('liveChatReplayContinuationData', {}).get('continuation'))
def parse_message_runs(runs: List[dict]) -> str:
    """Flatten the ``runs`` of a chat message into one plain-text string.

    Text runs contribute their ``text``. Emoji runs contribute their first
    shortcut, or their ``emojiId`` when no shortcut is available.

    Args:
        runs: the message's run dictionaries, in display order.

    Returns:
        The concatenated text; '' for an empty list.

    Raises:
        ValueError: if a run is neither a text run nor an emoji run.
    """
    parts = []
    for run in runs:
        if 'text' in run:
            parts.append(run['text'])
        elif 'emoji' in run:
            emoji = run['emoji']
            shortcuts = emoji.get('shortcuts')
            if shortcuts:
                parts.append(shortcuts[0])
            else:
                # NOTE(review): assumes `emojiId` is a printable identifier
                # for emoji without shortcuts; confirm against real payloads.
                parts.append(emoji.get('emojiId', ''))
        else:
            raise ValueError(f"Unknown run: {run}")
    return "".join(parts)
def fetch_all_chat_replay_for_video(video_id: str, progress_callback: Optional[Callable] = None) -> Iterable[ChatItem]:
    """Return the chat replay of the video identified by *video_id*.

    The watch page is fetched immediately to locate the replay's continuation
    token; the token and *progress_callback* are then handed to
    get_live_chat_replay(), whose result is returned unchanged.
    """
    watch_page = fetch_ytInitialData(f"https://www.youtube.com/watch?v={video_id}")
    return get_live_chat_replay(get_all_chat_continuation(watch_page), progress_callback)
if __name__ == "__main__":
    # Command-line entry point: print the chat replay of one video, either as
    # human-readable lines or as one JSON object per line.
    import argparse

    parser = argparse.ArgumentParser(description="Dump YouTube chat replays.")
    parser.add_argument('--json', dest='as_json', action='store_true', help='Dump as JSON (for kusadet) instead of human-readable')
    parser.add_argument('video_id', type=str, nargs='?', default='Bv8g4n40F5M', help='YouTube Video ID')
    args = parser.parse_args()

    for item in fetch_all_chat_replay_for_video(args.video_id):
        # Test the parsed flag: the `json` module object is always truthy, so
        # checking it would force JSON output regardless of --json.
        if args.as_json:
            print(json.dumps(item._asdict()))
        else:
            print(f"[{item.timestamp}] <{item.author}> {item.text}")
from collections import defaultdict
import argparse
import json
from chatdump import fetch_all_chat_replay_for_video, ChatItem
# Count chat messages that contain a "grass" (laughter) marker, bucketed by
# timestamp, and print one row of 'w' characters per bucket.
parser = argparse.ArgumentParser(description="detect grass")
parser.add_argument('--load', dest='load_filename', type=str, help='Load chatlog from JSON file (from chatdump) instead of from the YouTubes')
parser.add_argument('video_id', type=str, nargs='?', default='Bv8g4n40F5M', help='YouTube Video ID')
args = parser.parse_args()

if args.load_filename:
    def generate_chat_items():
        """Yield ChatItems from a file holding one JSON object per line."""
        with open(args.load_filename, mode='r') as f:
            for line in f:
                # Lines keep their trailing newline, so only a stripped test
                # actually skips blank lines.
                if line.strip():
                    yield ChatItem(**json.loads(line))
else:
    def make_progress():
        """Build a progress callback that prints a dot per call and the timestamp on every tenth call."""
        count = 0

        def progress(timestamp: str) -> None:
            nonlocal count
            if count % 10 == 0:
                print(timestamp, end='')
            print('.', end='', flush=True)
            count += 1

        return progress

    def generate_chat_items():
        """Fetch the chat replay of the requested video, reporting progress as it goes."""
        return fetch_all_chat_replay_for_video(args.video_id, make_progress())

# Bucket index -> number of matching messages.
kusa_buckets = defaultdict(int)
for item in generate_chat_items():
    if '草' in item.text or 'kusa' in item.text or 'grass' in item.text or 'wwww' in item.text:
        # Drop the last three characters (presumably the ":SS" seconds part)
        # and split what remains into its ':'-separated fields.
        timestamp_parts = item.timestamp[:-3].split(':')
        if len(timestamp_parts) == 1:
            minutes = int(timestamp_parts[0])
        elif len(timestamp_parts) == 2:
            minutes = 60 * int(timestamp_parts[0]) + int(timestamp_parts[1])
        else:
            raise ValueError(f"Unexpected timestamp: {item.timestamp!r}")
        kusa_buckets[minutes] += 1

print()
# max() of an empty mapping raises ValueError, so print no rows when nothing
# matched.
if kusa_buckets:
    for i in range(max(kusa_buckets) + 1):
        print(f"{i:3}: {'w' * kusa_buckets.get(i, 0)}")
@chayleaf
Copy link
Copy Markdown

chayleaf commented Aug 23, 2020

そんなブラウザ拡張機能したいなーと思ってこれが見つけた、同じ考えでw

edit: https://github.com/pavlukivan/utility/blob/master/kusa-counter.user.js

@nolanlum
Copy link
Copy Markdown
Author

nolanlum commented Aug 24, 2020

そんなブラウザ拡張機能したいなーと思ってこれが見つけた、同じ考えでw

大歓迎です!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment