Modified locations to be relative, used pipenv properly this time

This commit is contained in:
David Daily 2019-12-13 07:57:48 -06:00
parent faf5213f12
commit 63da31c09f
2 changed files with 254 additions and 0 deletions

18
Pipfile Normal file
View File

@ -0,0 +1,18 @@
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
[packages]
python-telegram-bot = "*"
requests = "*"
spotipy = "*"
dpath = "*"
gmusicapi = "*"
pathlib = "*"
uuid = "*"
[requires]
python_version = "3.6"

236
music.py Executable file
View File

@ -0,0 +1,236 @@
import logging
from tbotconfig import *
# Telegram things:
#from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram import InlineKeyboardMarkup, InlineKeyboardButton, InlineQueryResultArticle, InputTextMessageContent, InlineQueryResult, InlineQueryResultAudio
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackQueryHandler, ConversationHandler, InlineQueryHandler, ChosenInlineResultHandler, CallbackContext
# Other needful stuff
from uuid import uuid4
import urllib.request
import urllib
import re
import spotipy
import spotipy.oauth2
import json
import dpath
import requests
import time
from gmusicapi import Mobileclient
import os
from pathlib import Path
#Configure how many results to fetch
num_results = 5
# Enable logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
logger = logging.getLogger(__name__)
# Spotify:
spotify_credentials = spotipy.oauth2.SpotifyClientCredentials(SPOTIPY_CLIENT_ID, SPOTIPY_CLIENT_SECRET)
sp = spotipy.Spotify(client_credentials_manager=spotify_credentials)
def ask_for_credentials():
# Make an instance of the api and attempts to login with it.
# Return the authenticated api.
# We're not going to upload anything, so the Mobileclient is what we want.
gpm = Mobileclient()
logged_in = False
attempts = 0
while not logged_in and attempts < 3:
logged_in = gpm.oauth_login(Mobileclient.FROM_MAC_ADDRESS, oauth_credentials=u'./gpmlogin.json')
attempts += 1
return gpm
#
# Actual bot stuffs
#
# Define a few command handlers. These usually take the two arguments bot and update. Error handlers also receive the raised TelegramError object in error.
def start(update, context: CallbackContext):
"""Send a message when the command /start is issued."""
update.message.reply_text('Hello there!\n/help will give a short introduction to this bot.\nIts an inline bot, so you shouldn\'nt really be using it here.\nThis is a bot made by @DailytheNoob, check out the code on daviddaily.dev/python')
def help(update, context: CallbackContext):
"""Send a message when the command /help is issued."""
update.message.reply_markdown(f"Attention! I am an inline bot only!\n\nStart your message with @MusicServiceBot and then the name of the song you want me to search spotify for. Wait for a few seconds and you should get {num_results} results back.\nIf spotify provides a preview for the song, when you tap the correct result you'll get a 30 second preview for the song, otherwise its just the name of the song. Once you pick the right result you'll get a button that says `Show links`. When you click on this it'll search GPM and Youtube for the song and will show the links.\n*This takes a bit, please be patient. If you wait more than 3 days, the button probably won't work. To fix that please search again.*")
def inlinequery(update, context: CallbackContext):
query = update.inline_query.query
print(query)
query_id = uuid4()
results = []
spotify_credentials = spotipy.oauth2.SpotifyClientCredentials(SPOTIPY_CLIENT_ID, SPOTIPY_CLIENT_SECRET)
sp = spotipy.Spotify(client_credentials_manager=spotify_credentials)
if query:
sp_info = sp.search(q=query, limit=num_results)
open(f"./queries/{query_id}.json", 'w+').write(json.dumps(sp_info, indent = 4))
for i in range(num_results):
try:
dpath.util.get(sp_info, f"/tracks/items/[{i}]/name")
except:
continue
else:
sp_title = dpath.util.get(sp_info, f"/tracks/items/[{i}]/name")
sp_artist = dpath.util.get(sp_info, f"/tracks/items/[{i}]/artists/[0]/name")
sp_albname = dpath.util.get(sp_info, f"/tracks/items/[{i}]/album/name")
sp_albdate = dpath.util.get(sp_info, f"/tracks/items/[{i}]/album/release_date")
sp_art = dpath.util.get(sp_info, f"/tracks/items/[{i}]/album/images/[2]/url")
sp_audio = dpath.util.get(sp_info, f"/tracks/items/[{i}]/preview_url")
callback_data = f"query_id:<{query_id}>result:<{i}>"
reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Show links", callback_data=callback_data)]])
description = f"By {sp_artist} on the album {sp_albname}, released {sp_albdate}"
if "None" in str(sp_audio):
message_content = f"Check out \"{sp_title}\" by {sp_artist} on the album \"{sp_albname}\""
results.append(InlineQueryResultArticle(id = i, title = sp_title, description = description, input_message_content = InputTextMessageContent(message_content), thumb_url = sp_art, reply_markup = reply_markup),)
else:
message_content = f"Listen to \"{sp_title}\" by {sp_artist} on the album \"{sp_albname}\""
results.append(InlineQueryResultAudio(id = i, audio_url = sp_audio, title = sp_title, performer = sp_artist, audio_duration = 30, caption = message_content, reply_markup = reply_markup),)
open(f"./queries/{query_id}.json", 'w+').write(json.dumps(sp_info, indent = 4))
update.inline_query.answer(results)
def button(update, context: CallbackContext):
query = update.callback_query
reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Loading links, please wait", url='https://www.youtube.com/watch?v=dQw4w9WgXcQ')]])
query.edit_message_reply_markup(reply_markup=reply_markup)
query_id = re.search('(?<=query_id:\<).*(?=\>result)', f"{query.data}") # matches everything between query_id:< and >result
query_id = str(query_id.group(0))
result_chosen = re.search('(?<=result:\<).', f"{query.data}") # matches one character after result:<
result_chosen = str(result_chosen.group(0))
data_file = Path(f"./queries/{query_id}.json")
if not data_file.is_file():
reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Request not found, probably over 3 days old. Please start over.", url='https://www.youtube.com/watch?v=dQw4w9WgXcQ')]])
query.edit_message_reply_markup(reply_markup=reply_markup)
return
with open(f"./queries/{query_id}.json", encoding='utf-8') as data_file:
sp_info = json.loads(data_file.read())
# reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Loaded search results", url='https://www.youtube.com/watch?v=dQw4w9WgXcQ')]])
# query.edit_message_reply_markup(reply_markup=reply_markup)
sp_link = dpath.util.get(sp_info, f"/tracks/items/[{result_chosen}]/external_urls/spotify")
sp_title = dpath.util.get(sp_info, f"/tracks/items/[{result_chosen}]/name")
sp_artist = dpath.util.get(sp_info, f"/tracks/items/[{result_chosen}]/artists/[0]/name")
sp_albname = dpath.util.get(sp_info, f"/tracks/items/[{result_chosen}]/album/name")
sp_art = dpath.util.get(sp_info, f"/tracks/items/[{result_chosen}]/album/images/[2]/url")
# reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Imported results to variables", url='https://www.youtube.com/watch?v=dQw4w9WgXcQ')]])
# query.edit_message_reply_markup(reply_markup=reply_markup)
gpm = ask_for_credentials()
if not gpm.is_authenticated():
print("\n\nGPM fucked up somewhere\n\n")
# reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Searching GPM", url='https://www.youtube.com/watch?v=dQw4w9WgXcQ')]])
# query.edit_message_reply_markup(reply_markup=reply_markup)
gpm = Mobileclient(debug_logging=False)
gpm.oauth_login(Mobileclient.FROM_MAC_ADDRESS, oauth_credentials=u'./gpmlogin.json')
gpm_results = gpm.search(f"{sp_title} by {sp_artist} on {sp_albname}", max_results=1)
gpm.logout()
try:
dpath.util.get(gpm_results, '/song_hits/[0]/track/nid')
except KeyError:
gpm_link = False
yt_link = False
else:
gpm_link = dpath.util.get(gpm_results, '/song_hits/[0]/track/nid')
gpm_link = f"https://play.google.com/music/m/T{gpm_link}"
# reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Finding GPM link", url='https://www.youtube.com/watch?v=dQw4w9WgXcQ')]])
# query.edit_message_reply_markup(reply_markup=reply_markup)
try:
dpath.util.get(gpm_results, '/video_hits/[0]/youtube_video/id')
except KeyError:
yt_link = False
else:
# reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Found YouTube link", url='https://www.youtube.com/watch?v=dQw4w9WgXcQ')]])
# query.edit_message_reply_markup(reply_markup=reply_markup)
yt_link = dpath.util.get(gpm_results, '/video_hits/[0]/youtube_video/id')
yt_link = f"https://www.youtube.com/watch?v={yt_link}"
if not yt_link:
# reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Searching for YouTube link", url='https://www.youtube.com/watch?v=dQw4w9WgXcQ')]])
# query.edit_message_reply_markup(reply_markup=reply_markup)
query_string = urllib.parse.urlencode({"search_query" : f"{sp_title} {sp_artist}"})
html_content = urllib.request.urlopen(f"http://www.youtube.com/results?{query_string}")
search_results = re.findall(r'href=\"\/watch\?v=(.{11})', html_content.read().decode())
yt_link = f"http://www.youtube.com/watch?v={search_results[0]}"
# reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Formatting links", url='https://www.youtube.com/watch?v=dQw4w9WgXcQ')]])
# query.edit_message_reply_markup(reply_markup=reply_markup)
if gpm_link:
reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Spotify", url = sp_link), InlineKeyboardButton("GPM", url = gpm_link), InlineKeyboardButton("YouTube", url = yt_link)]])
if not gpm_link:
reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Spotify", url = sp_link), InlineKeyboardButton("YouTube", url = yt_link)]])
query.edit_message_reply_markup(reply_markup=reply_markup)
os.system('find ./queries -mtime +3 -type f -delete')
def error(update, context: CallbackContext):
# Log Errors caused by Updates
logger.warning(f"\nUpdate {update} caused error {context.error}\n")
def main():
# Create the Updater and pass it your bot's token.
# Make sure to set use_context=True to use the new context based callbacks
# Post version 12 this will no longer be necessary
updater = Updater(TBOT_TOKEN, use_context=True)
# Get the dispatcher to register handlers
bot = updater.dispatcher
# on different commands - answer in Telegram
bot.add_handler(CommandHandler("start", start))
bot.add_handler(CommandHandler("help", help))
# on noncommand i.e message - echo the message on Telegram
bot.add_handler(InlineQueryHandler(inlinequery))
bot.add_handler(CallbackQueryHandler(button))
# log all errors
bot.add_error_handler(error)
# Start the Bot
updater.start_polling()
# Block until the user presses Ctrl-C or the process receives SIGINT,
# SIGTERM or SIGABRT. This should be used most of the time, since
# start_polling() is non-blocking and will stop the bot gracefully.
updater.idle()
if __name__ == '__main__':
main()