forked from david/telegram-music-bot
161 lines
7.0 KiB
Python
161 lines
7.0 KiB
Python
from tbotconfig import *
|
|
|
|
# Telegram things:
|
|
from telegram import Update, ParseMode, InlineKeyboardMarkup, InlineKeyboardButton, InlineQueryResultArticle, InputTextMessageContent, InlineQueryResult, InlineQueryResultAudio
|
|
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackQueryHandler, ConversationHandler, InlineQueryHandler, ChosenInlineResultHandler, CallbackContext
|
|
|
|
# Other needful stuff
|
|
import html
|
|
import json
|
|
import logging
|
|
import traceback
|
|
from uuid import uuid4
|
|
import urllib.request
|
|
import urllib
|
|
import re
|
|
import spotipy
|
|
import spotipy.oauth2
|
|
import json
|
|
from dpath import util
|
|
import requests
|
|
import time
|
|
import os
|
|
from pathlib import Path
|
|
from ytmusicapi import YTMusic
|
|
ytmusic = YTMusic()
|
|
|
|
#Configure how many results to fetch
|
|
num_results = 5
|
|
|
|
# Spotify:
|
|
spotify_credentials = spotipy.oauth2.SpotifyClientCredentials(SPOTIPY_CLIENT_ID, SPOTIPY_CLIENT_SECRET)
|
|
sp = spotipy.Spotify(client_credentials_manager=spotify_credentials)
|
|
|
|
|
|
# Actual bot stuffs
|
|
#
|
|
# Define a few command handlers. These usually take the two arguments bot and update. Error handlers also receive the raised TelegramError object in error.
|
|
def start(update, context: CallbackContext):
|
|
"""Send a message when the command /start is issued."""
|
|
update.message.reply_text('Hello there!\n/help will give a short introduction to this bot.\nIts an inline bot, so you shouldn\'nt really be using it here.\nThis is a bot made by @DailytheNoob, check out the code on daviddaily.dev/david/telegram-music-bot')
|
|
|
|
def help(update, context: CallbackContext):
|
|
"""Send a message when the command /help is issued."""
|
|
update.message.reply_markdown(f"Attention! I am an inline bot only!\n\nStart your message with @MusicServiceBot and then the name of the song you want me to search spotify for. Wait for a few seconds and you should get {num_results} results back.\nIf spotify provides a preview for the song, when you tap the desired result you'll get a 30 second preview for the song, otherwise its just the name of the song. Once you pick the right result you'll get a button for Spotify and more links that goes to a link aggregation site that will have links to most other services.")
|
|
|
|
|
|
def inlinequery(update, context: CallbackContext):
|
|
query = update.inline_query.query
|
|
print(query)
|
|
results = []
|
|
|
|
spotify_credentials = spotipy.oauth2.SpotifyClientCredentials(SPOTIPY_CLIENT_ID, SPOTIPY_CLIENT_SECRET)
|
|
sp = spotipy.Spotify(client_credentials_manager=spotify_credentials)
|
|
|
|
if query:
|
|
sp_info = sp.search(q=query, limit=num_results)
|
|
for i in range(len(sp_info)):
|
|
sp_title = util.get(sp_info, f"/tracks/items/[{i}]/name")
|
|
sp_artist = util.get(sp_info, f"/tracks/items/[{i}]/artists/[0]/name")
|
|
sp_albname = util.get(sp_info, f"/tracks/items/[{i}]/album/name")
|
|
sp_albdate = util.get(sp_info, f"/tracks/items/[{i}]/album/release_date")
|
|
sp_art = util.get(sp_info, f"/tracks/items/[{i}]/album/images/[2]/url")
|
|
sp_audio = util.get(sp_info, f"/tracks/items/[{i}]/preview_url")
|
|
sp_link = util.get(sp_info, f"/tracks/items/[{i}]/external_urls/spotify")
|
|
songlink = "https://song.link/{0}".format(sp_link)
|
|
yt_q = f"{sp_artist} - {sp_title}"
|
|
if yt_q == 0:
|
|
yt_res = ytmusic.search(yt_q, 'songs', limit=1)
|
|
yt_id = util.get(yt_res, f"[1]/videoId")
|
|
yt_link = f"https://music.youtube.com/watch?v={yt_id}"
|
|
reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("YouTube", url = yt_link), InlineKeyboardButton("Spotify", url = sp_link), InlineKeyboardButton("More", url = songlink)]])
|
|
else:
|
|
reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("Spotify", url = sp_link), InlineKeyboardButton("More", url = songlink)]])
|
|
|
|
description = f"By {sp_artist} on the album {sp_albname}, released {sp_albdate}"
|
|
|
|
if "None" in str(sp_audio):
|
|
message_content = f"Check out \"{sp_title}\" by {sp_artist} on the album \"{sp_albname}\""
|
|
results.append(InlineQueryResultArticle(id = i, title = sp_title, description = description, input_message_content = InputTextMessageContent(message_content), thumb_url = sp_art, reply_markup = reply_markup),)
|
|
else:
|
|
message_content = f"Listen to \"{sp_title}\" by {sp_artist} on the album \"{sp_albname}\""
|
|
results.append(InlineQueryResultAudio(id = i, audio_url = sp_audio, title = sp_title, performer = sp_artist, audio_duration = 30, caption = message_content, reply_markup = reply_markup),)
|
|
|
|
update.inline_query.answer(results)
|
|
|
|
|
|
|
|
|
|
|
|
logging.basicConfig(
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# This can be your own ID, or one for a developer group/channel.
|
|
# You can use the /start command of this bot to see your chat id.
|
|
DEVELOPER_CHAT_ID = 175042676
|
|
|
|
|
|
def error_handler(update: Update, context: CallbackContext) -> None:
|
|
"""Log the error and send a telegram message to notify the developer."""
|
|
# Log the error before we do anything else, so we can see it even if something breaks.
|
|
logger.error(msg="Exception while handling an update:", exc_info=context.error)
|
|
|
|
# traceback.format_exception returns the usual python message about an exception, but as a
|
|
# list of strings rather than a single string, so we have to join them together.
|
|
tb_list = traceback.format_exception(None, context.error, context.error.__traceback__)
|
|
tb_string = ''.join(tb_list)
|
|
|
|
# Build the message with some markup and additional information about what happened.
|
|
# You might need to add some logic to deal with messages longer than the 4096 character limit.
|
|
message = (
|
|
f'An exception was raised while handling an update\n'
|
|
f'<pre>update = {html.escape(json.dumps(update.to_dict(), indent=2, ensure_ascii=False))}'
|
|
'</pre>\n\n'
|
|
f'<pre>context.chat_data = {html.escape(str(context.chat_data))}</pre>\n\n'
|
|
f'<pre>context.user_data = {html.escape(str(context.user_data))}</pre>\n\n'
|
|
f'<pre>{html.escape(tb_string)}</pre>'
|
|
)
|
|
|
|
# Finally, send the message
|
|
context.bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=message, parse_mode=ParseMode.HTML)
|
|
|
|
|
|
def start(update: Update, context: CallbackContext) -> None:
|
|
update.effective_message.reply_html(
|
|
'Use /bad_command to cause an error.\n'
|
|
f'Your chat id is <code>{update.effective_chat.id}</code>.'
|
|
)
|
|
|
|
|
|
def main():
|
|
# Create the Updater and pass it your bot's token.
|
|
# Make sure to set use_context=True to use the new context based callbacks
|
|
# Post version 12 this will no longer be necessary
|
|
updater = Updater(BOT_TOKEN, use_context=True)
|
|
|
|
# Get the dispatcher to register handlers
|
|
dispatcher = updater.dispatcher
|
|
|
|
# Register the commands...
|
|
dispatcher.add_handler(CommandHandler('start', start))
|
|
dispatcher.add_handler(CommandHandler("help", help))
|
|
dispatcher.add_handler(InlineQueryHandler(inlinequery))
|
|
|
|
# ...and the error handler
|
|
dispatcher.add_error_handler(error_handler)
|
|
|
|
# Start the Bot
|
|
updater.start_polling()
|
|
|
|
# Run the bot until you press Ctrl-C or the process receives SIGINT,
|
|
# SIGTERM or SIGABRT. This should be used most of the time, since
|
|
# start_polling() is non-blocking and will stop the bot gracefully.
|
|
updater.idle()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|