telegram-music-bot/music.py

222 lines
9.2 KiB
Python
Executable File

import logging
from tbotconfig import *
# Telegram things:
#from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram import InlineKeyboardMarkup, InlineKeyboardButton, InlineQueryResultArticle, InputTextMessageContent, InlineQueryResult, InlineQueryResultAudio
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackQueryHandler, ConversationHandler, InlineQueryHandler, ChosenInlineResultHandler, CallbackContext
# Other needful stuff
from uuid import uuid4
import urllib.request
import urllib
import re
import spotipy
import spotipy.oauth2
import json
import dpath
import requests
import time
from gmusicapi import Mobileclient
import os
from pathlib import Path
#Configure how many results to fetch
num_results = 5
# Enable logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
logger = logging.getLogger(__name__)
# Spotify:
spotify_credentials = spotipy.oauth2.SpotifyClientCredentials(SPOTIPY_CLIENT_ID, SPOTIPY_CLIENT_SECRET)
sp = spotipy.Spotify(client_credentials_manager=spotify_credentials)
def ask_for_credentials():
# Make an instance of the api and attempts to login with it.
# Return the authenticated api.
# We're not going to upload anything, so the Mobileclient is what we want.
gpm = Mobileclient()
logged_in = False
attempts = 0
while not logged_in and attempts < 3:
logged_in = gpm.oauth_login(Mobileclient.FROM_MAC_ADDRESS, oauth_credentials=u'./gpmlogin.json')
attempts += 1
return gpm
#
# Actual bot stuffs
#
# Define a few command handlers. These usually take the two arguments bot and update. Error handlers also receive the raised TelegramError object in error.
def start(update, context: CallbackContext):
"""Send a message when the command /start is issued."""
update.message.reply_text('Hello there!\n/help will give a short introduction to this bot.\nIts an inline bot, so you shouldn\'nt really be using it here.\nThis is a bot made by @DailytheNoob, check out the code on daviddaily.dev/david/telegram-music-bot')
def help(update, context: CallbackContext):
"""Send a message when the command /help is issued."""
update.message.reply_markdown(f"Attention! I am an inline bot only!\n\nStart your message with @MusicServiceBot and then the name of the song you want me to search spotify for. Wait for a few seconds and you should get {num_results} results back.\nIf spotify provides a preview for the song, when you tap the correct result you'll get a 30 second preview for the song, otherwise its just the name of the song. Once you pick the right result you'll get a button that says `Show links`. When you click on this it'll search GPM and Youtube for the song and will show the links.\n*This takes a bit, please be patient. If you wait more than 3 days, the button probably won't work. To fix that please search again.*")
def inlinequery(update, context: CallbackContext):
query = update.inline_query.query
print(query)
query_id = uuid4()
results = []
spotify_credentials = spotipy.oauth2.SpotifyClientCredentials(SPOTIPY_CLIENT_ID, SPOTIPY_CLIENT_SECRET)
sp = spotipy.Spotify(client_credentials_manager=spotify_credentials)
if query:
sp_info = sp.search(q=query, limit=num_results)
open(f"./queries/{query_id}.json", 'w+').write(json.dumps(sp_info, indent = 4))
for i in range(num_results):
try:
dpath.util.get(sp_info, f"/tracks/items/[{i}]/name")
except:
continue
else:
sp_title = dpath.util.get(sp_info, f"/tracks/items/[{i}]/name")
sp_artist = dpath.util.get(sp_info, f"/tracks/items/[{i}]/artists/[0]/name")
sp_albname = dpath.util.get(sp_info, f"/tracks/items/[{i}]/album/name")
sp_albdate = dpath.util.get(sp_info, f"/tracks/items/[{i}]/album/release_date")
sp_art = dpath.util.get(sp_info, f"/tracks/items/[{i}]/album/images/[2]/url")
sp_audio = dpath.util.get(sp_info, f"/tracks/items/[{i}]/preview_url")
callback_data = f"query_id:<{query_id}>result:<{i}>"
reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Show links", callback_data=callback_data)]])
description = f"By {sp_artist} on the album {sp_albname}, released {sp_albdate}"
if "None" in str(sp_audio):
message_content = f"Check out \"{sp_title}\" by {sp_artist} on the album \"{sp_albname}\""
results.append(InlineQueryResultArticle(id = i, title = sp_title, description = description, input_message_content = InputTextMessageContent(message_content), thumb_url = sp_art, reply_markup = reply_markup),)
else:
message_content = f"Listen to \"{sp_title}\" by {sp_artist} on the album \"{sp_albname}\""
results.append(InlineQueryResultAudio(id = i, audio_url = sp_audio, title = sp_title, performer = sp_artist, audio_duration = 30, caption = message_content, reply_markup = reply_markup),)
open(f"./queries/{query_id}.json", 'w+').write(json.dumps(sp_info, indent = 4))
update.inline_query.answer(results)
def button(update, context: CallbackContext):
query = update.callback_query
reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Loading links, please wait", url='https://www.youtube.com/watch?v=rTga41r3a4s')]])
query.edit_message_reply_markup(reply_markup=reply_markup)
query_id = re.search('(?<=query_id:\<).*(?=\>result)', f"{query.data}") # matches everything between query_id:< and >result
query_id = str(query_id.group(0))
result_chosen = re.search('(?<=result:\<).', f"{query.data}") # matches one character after result:<
result_chosen = str(result_chosen.group(0))
data_file = Path(f"./queries/{query_id}.json")
if not data_file.is_file():
reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Request not found, probably over 3 days old. Please start over.", url='https://www.youtube.com/watch?v=dQw4w9WgXcQ')]])
query.edit_message_reply_markup(reply_markup=reply_markup)
return
with open(f"./queries/{query_id}.json", encoding='utf-8') as data_file:
sp_info = json.loads(data_file.read())
sp_link = dpath.util.get(sp_info, f"/tracks/items/[{result_chosen}]/external_urls/spotify")
sp_title = dpath.util.get(sp_info, f"/tracks/items/[{result_chosen}]/name")
sp_artist = dpath.util.get(sp_info, f"/tracks/items/[{result_chosen}]/artists/[0]/name")
sp_albname = dpath.util.get(sp_info, f"/tracks/items/[{result_chosen}]/album/name")
sp_art = dpath.util.get(sp_info, f"/tracks/items/[{result_chosen}]/album/images/[2]/url")
songlink = "https://song.link/{0}".format(sp_link)
gpm = ask_for_credentials()
if not gpm.is_authenticated():
print("\n\nGPM fucked up somewhere\n\n")
gpm = Mobileclient(debug_logging=False)
gpm.oauth_login(Mobileclient.FROM_MAC_ADDRESS, oauth_credentials=u'./gpmlogin.json')
gpm_results = gpm.search(f"{sp_title} by {sp_artist} on {sp_albname}", max_results=1)
gpm.logout()
try:
dpath.util.get(gpm_results, '/song_hits/[0]/track/nid')
except KeyError:
gpm_link = False
yt_link = False
else:
gpm_link = dpath.util.get(gpm_results, '/song_hits/[0]/track/nid')
gpm_link = f"https://play.google.com/music/m/T{gpm_link}"
try:
dpath.util.get(gpm_results, '/video_hits/[0]/youtube_video/id')
except KeyError:
yt_link = False
else:
yt_link = dpath.util.get(gpm_results, '/video_hits/[0]/youtube_video/id')
yt_link = f"https://www.youtube.com/watch?v={yt_link}"
if not yt_link:
query_string = urllib.parse.urlencode({"search_query" : f"{sp_title} {sp_artist}"})
html_content = urllib.request.urlopen(f"http://www.youtube.com/results?{query_string}")
search_results = re.findall(r'href=\"\/watch\?v=(.{11})', html_content.read().decode())
yt_link = f"http://www.youtube.com/watch?v={search_results[0]}"
if gpm_link:
reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Spotify", url = sp_link), InlineKeyboardButton("GPM", url = gpm_link), InlineKeyboardButton("YouTube", url = yt_link), InlineKeyboardButton("More", url = songlink)]])
if not gpm_link:
reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Spotify", url = sp_link), InlineKeyboardButton("YouTube", url = yt_link), InlineKeyboardButton("More", url = songlink)]])
query.edit_message_reply_markup(reply_markup=reply_markup)
os.system('find ./queries -mtime +3 -type f -delete')
def error(update, context: CallbackContext):
# Log Errors caused by Updates
logger.warning(f"\nUpdate {update} caused error {context.error}\n")
def main():
# Create the Updater and pass it your bot's token.
# Make sure to set use_context=True to use the new context based callbacks
# Post version 12 this will no longer be necessary
updater = Updater(TBOT_TOKEN, use_context=True)
# Get the dispatcher to register handlers
bot = updater.dispatcher
# on different commands - answer in Telegram
bot.add_handler(CommandHandler("start", start))
bot.add_handler(CommandHandler("help", help))
# on noncommand i.e message - echo the message on Telegram
bot.add_handler(InlineQueryHandler(inlinequery))
bot.add_handler(CallbackQueryHandler(button))
# log all errors
bot.add_error_handler(error)
# Start the Bot
updater.start_polling()
# Block until the user presses Ctrl-C or the process receives SIGINT,
# SIGTERM or SIGABRT. This should be used most of the time, since
# start_polling() is non-blocking and will stop the bot gracefully.
updater.idle()
if __name__ == '__main__':
main()