From 63da31c09f4faa2da4bcff502968d7cd24480cf0 Mon Sep 17 00:00:00 2001 From: david Date: Fri, 13 Dec 2019 07:57:48 -0600 Subject: [PATCH] Modified locations to be relative, used pipenv properly this time --- Pipfile | 18 +++++ music.py | 236 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 254 insertions(+) create mode 100644 Pipfile create mode 100755 music.py diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..8a79d34 --- /dev/null +++ b/Pipfile @@ -0,0 +1,18 @@ +[[source]] +name = "pypi" +url = "https://pypi.org/simple" +verify_ssl = true + +[dev-packages] + +[packages] +python-telegram-bot = "*" +requests = "*" +spotipy = "*" +dpath = "*" +gmusicapi = "*" +pathlib = "*" +uuid = "*" + +[requires] +python_version = "3.6" diff --git a/music.py b/music.py new file mode 100755 index 0000000..7a312c6 --- /dev/null +++ b/music.py @@ -0,0 +1,236 @@ +import logging +from tbotconfig import * + +# Telegram things: +#from telegram import InlineKeyboardButton, InlineKeyboardMarkup +from telegram import InlineKeyboardMarkup, InlineKeyboardButton, InlineQueryResultArticle, InputTextMessageContent, InlineQueryResult, InlineQueryResultAudio +from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackQueryHandler, ConversationHandler, InlineQueryHandler, ChosenInlineResultHandler, CallbackContext + +# Other needful stuff +from uuid import uuid4 +import urllib.request +import urllib +import re +import spotipy +import spotipy.oauth2 +import json +import dpath +import requests +import time +from gmusicapi import Mobileclient +import os +from pathlib import Path + + +#Configure how many results to fetch +num_results = 5 + + +# Enable logging +logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + level=logging.INFO) + +logger = logging.getLogger(__name__) + +# Spotify: +spotify_credentials = spotipy.oauth2.SpotifyClientCredentials(SPOTIPY_CLIENT_ID, SPOTIPY_CLIENT_SECRET) +sp = spotipy.Spotify(client_credentials_manager=spotify_credentials) + + +def ask_for_credentials(): +# Make an instance of the api and attempts to login with it. +# Return the authenticated api. + + # We're not going to upload anything, so the Mobileclient is what we want. + gpm = Mobileclient() + + logged_in = False + attempts = 0 + + while not logged_in and attempts < 3: + logged_in = gpm.oauth_login(Mobileclient.FROM_MAC_ADDRESS, oauth_credentials=u'./gpmlogin.json') + attempts += 1 + + return gpm + +# +# Actual bot stuffs +# +# Define a few command handlers. These usually take the two arguments bot and update. Error handlers also receive the raised TelegramError object in error. +def start(update, context: CallbackContext): + """Send a message when the command /start is issued.""" + update.message.reply_text('Hello there!\n/help will give a short introduction to this bot.\nIts an inline bot, so you shouldn\'nt really be using it here.\nThis is a bot made by @DailytheNoob, check out the code on daviddaily.dev/python') + +def help(update, context: CallbackContext): + """Send a message when the command /help is issued.""" + update.message.reply_markdown(f"Attention! I am an inline bot only!\n\nStart your message with @MusicServiceBot and then the name of the song you want me to search spotify for. Wait for a few seconds and you should get {num_results} results back.\nIf spotify provides a preview for the song, when you tap the correct result you'll get a 30 second preview for the song, otherwise its just the name of the song. Once you pick the right result you'll get a button that says `Show links`. When you click on this it'll search GPM and Youtube for the song and will show the links.\n*This takes a bit, please be patient. If you wait more than 3 days, the button probably won't work. To fix that please search again.*") + + +def inlinequery(update, context: CallbackContext): + query = update.inline_query.query + print(query) + query_id = uuid4() + + results = [] + + spotify_credentials = spotipy.oauth2.SpotifyClientCredentials(SPOTIPY_CLIENT_ID, SPOTIPY_CLIENT_SECRET) + sp = spotipy.Spotify(client_credentials_manager=spotify_credentials) + if query: + sp_info = sp.search(q=query, limit=num_results) + open(f"./queries/{query_id}.json", 'w+').write(json.dumps(sp_info, indent = 4)) + + for i in range(num_results): + try: + dpath.util.get(sp_info, f"/tracks/items/[{i}]/name") + except: + continue + else: + sp_title = dpath.util.get(sp_info, f"/tracks/items/[{i}]/name") + + sp_artist = dpath.util.get(sp_info, f"/tracks/items/[{i}]/artists/[0]/name") + sp_albname = dpath.util.get(sp_info, f"/tracks/items/[{i}]/album/name") + sp_albdate = dpath.util.get(sp_info, f"/tracks/items/[{i}]/album/release_date") + sp_art = dpath.util.get(sp_info, f"/tracks/items/[{i}]/album/images/[2]/url") + sp_audio = dpath.util.get(sp_info, f"/tracks/items/[{i}]/preview_url") + + callback_data = f"query_id:<{query_id}>result:<{i}>" + reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Show links", callback_data=callback_data)]]) + + description = f"By {sp_artist} on the album {sp_albname}, released {sp_albdate}" + + if "None" in str(sp_audio): + message_content = f"Check out \"{sp_title}\" by {sp_artist} on the album \"{sp_albname}\"" + results.append(InlineQueryResultArticle(id = i, title = sp_title, description = description, input_message_content = InputTextMessageContent(message_content), thumb_url = sp_art, reply_markup = reply_markup),) + else: + message_content = f"Listen to \"{sp_title}\" by {sp_artist} on the album \"{sp_albname}\"" + results.append(InlineQueryResultAudio(id = i, audio_url = sp_audio, title = sp_title, performer = sp_artist, audio_duration = 30, caption = message_content, reply_markup = reply_markup),) + + open(f"./queries/{query_id}.json", 'w+').write(json.dumps(sp_info, indent = 4)) + update.inline_query.answer(results) + + +def button(update, context: CallbackContext): + query = update.callback_query + reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Loading links, please wait", url='https://www.youtube.com/watch?v=dQw4w9WgXcQ')]]) + query.edit_message_reply_markup(reply_markup=reply_markup) + + query_id = re.search('(?<=query_id:\<).*(?=\>result)', f"{query.data}") # matches everything between query_id:< and >result + query_id = str(query_id.group(0)) + + result_chosen = re.search('(?<=result:\<).', f"{query.data}") # matches one character after result:< + result_chosen = str(result_chosen.group(0)) + + data_file = Path(f"./queries/{query_id}.json") + + if not data_file.is_file(): + reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Request not found, probably over 3 days old. Please start over.", url='https://www.youtube.com/watch?v=dQw4w9WgXcQ')]]) + query.edit_message_reply_markup(reply_markup=reply_markup) + return + + with open(f"./queries/{query_id}.json", encoding='utf-8') as data_file: + sp_info = json.loads(data_file.read()) + +# reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Loaded search results", url='https://www.youtube.com/watch?v=dQw4w9WgXcQ')]]) +# query.edit_message_reply_markup(reply_markup=reply_markup) + + sp_link = dpath.util.get(sp_info, f"/tracks/items/[{result_chosen}]/external_urls/spotify") + sp_title = dpath.util.get(sp_info, f"/tracks/items/[{result_chosen}]/name") + sp_artist = dpath.util.get(sp_info, f"/tracks/items/[{result_chosen}]/artists/[0]/name") + sp_albname = dpath.util.get(sp_info, f"/tracks/items/[{result_chosen}]/album/name") + sp_art = dpath.util.get(sp_info, f"/tracks/items/[{result_chosen}]/album/images/[2]/url") + +# reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Imported results to variables", url='https://www.youtube.com/watch?v=dQw4w9WgXcQ')]]) +# query.edit_message_reply_markup(reply_markup=reply_markup) + + gpm = ask_for_credentials() + if not gpm.is_authenticated(): + print("\n\nGPM fucked up somewhere\n\n") + +# reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Searching GPM", url='https://www.youtube.com/watch?v=dQw4w9WgXcQ')]]) +# query.edit_message_reply_markup(reply_markup=reply_markup) + + gpm = Mobileclient(debug_logging=False) + gpm.oauth_login(Mobileclient.FROM_MAC_ADDRESS, oauth_credentials=u'./gpmlogin.json') + gpm_results = gpm.search(f"{sp_title} by {sp_artist} on {sp_albname}", max_results=1) + gpm.logout() + + try: + dpath.util.get(gpm_results, '/song_hits/[0]/track/nid') + except KeyError: + gpm_link = False + yt_link = False + else: + gpm_link = dpath.util.get(gpm_results, '/song_hits/[0]/track/nid') + gpm_link = f"https://play.google.com/music/m/T{gpm_link}" +# reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Finding GPM link", url='https://www.youtube.com/watch?v=dQw4w9WgXcQ')]]) +# query.edit_message_reply_markup(reply_markup=reply_markup) + try: + dpath.util.get(gpm_results, '/video_hits/[0]/youtube_video/id') + except KeyError: + yt_link = False + else: +# reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Found YouTube link", url='https://www.youtube.com/watch?v=dQw4w9WgXcQ')]]) +# query.edit_message_reply_markup(reply_markup=reply_markup) + yt_link = dpath.util.get(gpm_results, '/video_hits/[0]/youtube_video/id') + yt_link = f"https://www.youtube.com/watch?v={yt_link}" + + if not yt_link: +# reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Searching for YouTube link", url='https://www.youtube.com/watch?v=dQw4w9WgXcQ')]]) +# query.edit_message_reply_markup(reply_markup=reply_markup) + query_string = urllib.parse.urlencode({"search_query" : f"{sp_title} {sp_artist}"}) + html_content = urllib.request.urlopen(f"http://www.youtube.com/results?{query_string}") + search_results = re.findall(r'href=\"\/watch\?v=(.{11})', html_content.read().decode()) + yt_link = f"http://www.youtube.com/watch?v={search_results[0]}" + +# reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Formatting links", url='https://www.youtube.com/watch?v=dQw4w9WgXcQ')]]) +# query.edit_message_reply_markup(reply_markup=reply_markup) + + if gpm_link: + reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Spotify", url = sp_link), InlineKeyboardButton("GPM", url = gpm_link), InlineKeyboardButton("YouTube", url = yt_link)]]) + + if not gpm_link: + reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Spotify", url = sp_link), InlineKeyboardButton("YouTube", url = yt_link)]]) + + query.edit_message_reply_markup(reply_markup=reply_markup) + + os.system('find ./queries -mtime +3 -type f -delete') + + + +def error(update, context: CallbackContext): +# Log Errors caused by Updates + logger.warning(f"\nUpdate {update} caused error {context.error}\n") + + +def main(): + # Create the Updater and pass it your bot's token. + # Make sure to set use_context=True to use the new context based callbacks + # Post version 12 this will no longer be necessary + updater = Updater(TBOT_TOKEN, use_context=True) + + # Get the dispatcher to register handlers + bot = updater.dispatcher + + # on different commands - answer in Telegram + bot.add_handler(CommandHandler("start", start)) + bot.add_handler(CommandHandler("help", help)) + + # on noncommand i.e message - echo the message on Telegram + bot.add_handler(InlineQueryHandler(inlinequery)) + + bot.add_handler(CallbackQueryHandler(button)) + + # log all errors + bot.add_error_handler(error) + + # Start the Bot + updater.start_polling() + + # Block until the user presses Ctrl-C or the process receives SIGINT, + # SIGTERM or SIGABRT. This should be used most of the time, since + # start_polling() is non-blocking and will stop the bot gracefully. + updater.idle() + + +if __name__ == '__main__': + main()