# frozen_string_literal: true

require 'cgi'
require 'json'

require 'cinch'
require 'http'

# Load KEY=value pairs from ./.env into the process environment. Only lines
# that start with an upper-case/digit/underscore key followed by `=` and a
# non-empty value are used; everything else is ignored. File.foreach closes
# the file when iteration finishes (the previous File.open(...).readlines
# left the handle open until garbage collection).
File.foreach('.env') do |line|
  match = line.match(/^([A-Z0-9_]+)=(.+)/)
  ENV[match[1]] = match[2] if match
end

# Any value for JUDYBOT_DEBUG switches the bot into debug mode.
DEBUG = ENV['JUDYBOT_DEBUG']
NAME = DEBUG ? 'gptbot' : 'judybot'

# Paths of the files holding secrets / word lists (may be nil when unset).
PASSWORD_FILE = ENV['JUDYBOT_PASS_FILE']
API_TOKEN_FILE = ENV['JUDYBOT_API_TOKEN_FILE']
DEEPL_TOKEN_FILE = ENV['JUDYBOT_DEEPL_TOKEN_FILE']
SPAM_WORDS_FILE = ENV['JUDYBOT_SPAM_WORDS_FILE']

# Matches a message addressed to the bot: its nick, optional trailing
# underscores, then at least one colon or space.
PREFIX_RX = Regexp.new("^#{NAME}_*[: ]+")
VERSION = '0.10'
USER_AGENT = "#{NAME}/#{VERSION}"
OWNER = 'wasamasa'
OWNER_MASK = '!~wasamasa@chicken/contributor/wasamasa'
YOUTUBE_HOSTS = ['youtu.be', 'www.youtube.com'].freeze
MAX_HTTP_BYTES = 1024**2
SOURCE = 'https://depp.brause.cc/sandbox/judybot.rb'
MODEL_LABEL = 'Luminous Base'
MODEL_URL = 'https://www.aleph-alpha.com/luminous'
MODEL_API_URL = 'https://api.aleph-alpha.com/complete'
ELISP_URL = 'https://elisp-playground.fly.dev/'
DEEPL_URL = 'https://api-free.deepl.com/v2/translate'

# Request parameters merged into every completion request.
MODEL_PARAMETERS = {
  model: 'luminous-base',
  maximum_tokens: 50,
  temperature: 0.5,
  frequency_penalty: 0.1,
  presence_penalty: 0.1,
  repetition_penalties_include_prompt: true,
  use_multiplicative_presence_penalty: true
}.freeze

# Text prepended to every user prompt. Built from two adjacent literals:
# a backslash continuation *inside* a single-quoted string is not a
# continuation and would leak a literal backslash into the prompt.
SYSTEM_PROMPT = 'Do not apologize. Avoid offensive language. ' \
                'Complete the following sentence: '
ABOUT = "I'm a #{MODEL_LABEL} bot powered by <#{MODEL_URL}>. " \
        "Send me some text and I'll try to complete it."
# Language codes accepted by the t8 command, mapped to display names.
T8LANGS = {
  auto: 'Automatic detection',
  bg: 'Bulgarian',
  cs: 'Czech',
  da: 'Danish',
  de: 'German',
  el: 'Greek',
  en: 'English',
  es: 'Spanish',
  et: 'Estonian',
  fi: 'Finnish',
  fr: 'French',
  hu: 'Hungarian',
  id: 'Indonesian',
  it: 'Italian',
  ja: 'Japanese',
  lt: 'Lithuanian',
  lv: 'Latvian',
  nl: 'Dutch',
  pl: 'Polish',
  pt: 'Portuguese',
  ro: 'Romanian',
  ru: 'Russian',
  sk: 'Slovak',
  sl: 'Slovenian',
  sv: 'Swedish',
  tr: 'Turkish',
  uk: 'Ukrainian',
  zh: 'Chinese'
}.freeze

# Usage line per command.
# NOTE(review): the eval/t8/prompt entries (and the puppet/join/part usage
# replies further down) look like they lost their argument placeholders;
# they are reproduced unchanged here - confirm the intended text.
HELP_USAGE = {
  version: 'version',
  owner: 'owner',
  source: 'source',
  about: 'about',
  eval: 'eval ',
  t8: 't8 ',
  t8langs: 't8langs',
  help: 'help [command]',
  prompt: ''
}.freeze
HELP_COMMANDS = HELP_USAGE.values.join(' | ')

# One-line explanation per command.
HELP_TEXTS = {
  version: 'Show bot version',
  owner: 'Show bot owner',
  source: 'Show URL to bot source',
  about: 'Show a short bot description',
  eval: 'Evaluate Emacs Lisp code and print result',
  help: 'Display list of commands or help for command',
  prompt: 'Complete given prompt into text with AI (see about)',
  t8: 'Translate text using DeepL',
  t8langs: 'List supported translation language codes'
}.freeze

# Nicks treated as bots: anything ending in "bot" plus an explicit list.
KNOWN_BOT_RX = /bot$/
KNOWN_BOTS = ['RoboBob', 'Wydrek', 'phrik', 'Limnoria', 'vandusen'].freeze

# Not frozen: this array is handed to Cinch's configuration below.
CHANNELS = DEBUG ? ['##wasamasa'] : ['#emacs', '#emacs-beginners', '#chicken']
TITLE_CHANNELS = (DEBUG ? ['##wasamasa'] : ['#emacs', '#emacs-beginners']).freeze
ML_CHANNELS = (DEBUG ? ['##wasamasa'] : ['#emacs', '#emacs-beginners']).freeze
MAX_MSG_LEN = 400

# Commands whose reply is a fixed string.
SIMPLE_COMMAND_MAP = {
  'version' => VERSION,
  'owner' => OWNER,
  'source' => SOURCE,
  'about' => ABOUT,
  'help' => HELP_COMMANDS,
  't8langs' => T8LANGS.keys.join(', ')
}.freeze

# ruby 3.0 compatibility hack
#
# Accepts the conversion options as a trailing positional Hash and forwards
# them as keywords. Every other calling form (no arguments, two encodings,
# real keyword arguments) is passed through to the original method.
class String
  alias old_encode encode
  alias old_encode! encode!

  def encode(*args, **kwargs)
    opts = args.last.is_a?(Hash) ? args.pop : {}
    old_encode(*args, **opts, **kwargs)
  end

  def encode!(*args, **kwargs)
    opts = args.last.is_a?(Hash) ? args.pop : {}
    old_encode!(*args, **opts, **kwargs)
  end
end

# Raised for errors caused by how a command was used (message is shown to
# the user).
class UsageError < StandardError; end

# Raised when a remote HTTP API answers with an error status.
class APIError < StandardError; end

# API rate limiter
#
# Sliding-window limiter: remembers the items (timestamps in seconds) pushed
# during the last `window` seconds and refuses more than `size` of them.
class Limiter
  # @param size [Integer] maximum number of entries per window
  # @param window [Integer] window length, in the same unit as the items
  def initialize(size, window)
    @size = size
    @window = window
    @storage = []
  end

  # Record one use at time `item`.
  #
  # @param item [Integer] timestamp of the use
  # @raise [UsageError] when the window already holds `size` entries
  def <<(item)
    trim(item)
    raise UsageError, "Rate limit exceeded (#{@size}/#{@window}s)" if @storage.length >= @size

    @storage << item
  end

  private

  # Drop entries older than `item - window`.
  def trim(item)
    cutoff = item - @window
    @storage.shift while @storage.first && @storage.first < cutoff
  end
end

# Return the first line (without line terminator) of the file at `var`.
#
# @param name [String] environment variable name, used in the error message
# @param var [String, nil] path to read
# @raise [RuntimeError] when `var` is nil
def read_credential(name, var)
  raise "#{name} unset" unless var

  File.open(var, &:readline).chomp
end

# Return all lines (without line terminators) of the file at `var`.
#
# @param name [String] environment variable name, used in the error message
# @param var [String, nil] path to read
# @raise [RuntimeError] when `var` is nil
def read_lines(name, var)
  raise "#{name} unset" unless var

  File.open(var, &:readlines).map(&:chomp)
end

def read_password
  read_credential('JUDYBOT_PASS_FILE', PASSWORD_FILE)
end

def read_api_token
  read_credential('JUDYBOT_API_TOKEN_FILE', API_TOKEN_FILE)
end

def read_deepl_token
  read_credential('JUDYBOT_DEEPL_TOKEN_FILE', DEEPL_TOKEN_FILE)
end

def read_spam_words
  read_lines('JUDYBOT_SPAM_WORDS_FILE', SPAM_WORDS_FILE)
end

# Ask the completion API to continue SYSTEM_PROMPT + prompt.
#
# @param prompt [String] user-supplied text
# @param api_token [String] bearer token
# @return [String] the first completion from the response
# @raise [APIError] when the API answers with status >= 400
def api_request(prompt, api_token)
  json = { prompt: SYSTEM_PROMPT + prompt }.merge(MODEL_PARAMETERS)
  res = HTTP.headers(authorization: "Bearer #{api_token}",
                     user_agent: USER_AGENT,
                     accept: 'application/json')
            .post(MODEL_API_URL, json: json)
  code = res.code
  body = res.body.to_s
  raise APIError, "#{code}: #{body}" if code >= 400

  JSON.parse(body)['completions'][0]['completion']
end

# Shorten msg to at most MAX_MSG_LEN characters, marking the cut with "...".
def truncate(msg)
  msg = "#{msg.slice(0, MAX_MSG_LEN - 3)}..." if msg.length > MAX_MSG_LEN
  msg
end

# Tidy a completion for IRC: collapse whitespace, then cut it back to the
# longest prefix that ends in sentence punctuation and fits MAX_MSG_LEN.
# When no such prefix exists the whitespace-collapsed text is truncated.
def massage(str)
  str = str.gsub(/\s+/, ' ').strip
  processed = str
  loop do
    processed = processed[/.*[.?!]/]
    return truncate(str) unless processed
    return processed if processed.length <= MAX_MSG_LEN

    # Still too long: drop the final punctuation mark so the next round
    # backs off to the previous sentence end.
    processed = processed.gsub(/[.?!]$/, '')
  end
end

# Send Emacs Lisp source to the playground service.
#
# @param code [String] Emacs Lisp source
# @return [String, nil] stdout of the evaluation, or a string starting with
#   "error" describing what went wrong
def eval_elisp(code)
  res = HTTP.post(ELISP_URL, body: code)
  status_code = res.code
  body = res.body.to_s
  return "error #{status_code}: #{body}" if status_code >= 400

  status = JSON.parse(body)
  return 'error: timeout' if status['status'] == 'ETIMEDOUT'
  return 'error: broken heroku setup' if status['status'] == 'ENOENT'
  return "error: #{status['stderr']}" if status['status'] != 0

  status['stdout']
end

# Translate text with DeepL.
#
# @param auth_key [String] DeepL API key
# @param from [Symbol] source language code from T8LANGS (may be :auto)
# @param to [Symbol] target language code from T8LANGS (not :auto)
# @param text [String] text to translate
# @return [String] text of the first translation in the response
# @raise [UsageError] for unknown or unsupported language codes
# @raise [APIError] when the API answers with status >= 400
def deepl_translate(auth_key, from, to, text)
  raise UsageError, 'Invalid source language' unless T8LANGS[from]
  raise UsageError, 'Invalid target language' unless T8LANGS[to]
  raise UsageError, 'Target language cannot be auto' if to == :auto

  params = { target_lang: to.upcase.to_s, text: text }
  # Leaving source_lang out is how automatic detection is requested.
  params[:source_lang] = from.upcase.to_s unless from == :auto
  res = HTTP.headers(authorization: "DeepL-Auth-Key #{auth_key}",
                     user_agent: USER_AGENT)
            .post(DEEPL_URL, form: params)
  code = res.code
  body = res.body.to_s
  raise APIError, "#{code}: #{body}" if code >= 400

  JSON.parse(body)['translations'][0]['text']
end

# Truthy when nick matches KNOWN_BOT_RX or is listed in KNOWN_BOTS.
def bot_nick?(nick)
  nick[KNOWN_BOT_RX] || KNOWN_BOTS.include?(nick)
end

# Fetch url and report whether its body matches any of the spam patterns.
#
# @param url [String] URL to fetch
# @param spam_words [Array<Regexp>] patterns to look for
def spam?(url, spam_words)
  res = HTTP.get(url)
  body = res.body.to_s
  spam_words.any? { |needle| body.match?(needle) }
end

# Replace ASCII control characters with a visible \xNN escape.
def irc_encode(str)
  str.gsub(/[\x00-\x1f\x7f]/) { |m| "\\x#{m.ord.to_s(16).rjust(2, '0')}" }
end

# Read the response body chunk by chunk until it contains "</title>", more
# than MAX_HTTP_BYTES have been read, or the body is exhausted.
#
# @param body [#readpartial] streaming body; readpartial is expected to
#   return nil once exhausted
# @param loggers [#info] receives progress messages
# @return [String] the bytes read so far, as a binary string
def read_html_head(body, loggers)
  # HACK: the response is utf-8, but readpartial can split it in
  # the middle of a control sequence, so work on bytes
  content = String.new(encoding: Encoding::BINARY)
  # Test the chunk for nil *before* calling #b on it, otherwise reaching the
  # end of the body raises NoMethodError instead of ending the loop.
  while (chunk = body.readpartial)
    content << chunk.b
    if content.length > MAX_HTTP_BYTES
      loggers.info('max amount of HTTP bytes read, giving up')
      break
    end
    next unless content.include?('</title>')

    loggers.info("found needle after #{content.length} bytes")
    break
  end
  content
end

# Pull the text of the first <title> element out of (partial) HTML.
#
# @param content [String] binary HTML
# @return [String, nil] unescaped UTF-8 title, or nil when there is none
def extract_title(content)
  # Anchor on the opening tag so only the title text is captured rather
  # than everything that precedes </title> on the same line.
  title = content[%r{<title(?:\s[^>]*)?>(.*?)</title>}, 1]
  return unless title

  # HACK: always recode to utf-8 because YT only uses that
  title = CGI.unescapeHTML(title.force_encoding('utf-8'))
  title.gsub(/,{2,}/, '') # avoid triggering fsbot
end

api_token = read_api_token
deepl_token = read_deepl_token

bot = Cinch::Bot.new do
  spam_words = nil
  # TODO: rate-limit per channel
  ml_rate_limiter = Limiter.new(50, 24 * 60 * 60) # 50 interactions a day
  t8_rate_limiter = Limiter.new(100, 24 * 60 * 60) # 100 translations a day

  configure do |c|
    c.server = 'irc.libera.chat'
    c.port = DEBUG ? 6667 : 6697
    unless DEBUG
      c.sasl.username = NAME
      c.sasl.password = read_password
      c.sasl.mechanisms = [Cinch::SASL::Plain]
      c.ssl.use = true
      c.ssl.verify = true
    end
    c.nick = NAME
    c.user = NAME
    c.channels = CHANNELS
    # Blank lines are skipped: an empty pattern matches every document.
    spam_words = read_spam_words.reject(&:empty?).map { |l| Regexp.new(l) }
  end

  # Try to get the configured nick back whenever a PONG arrives.
  on :pong do |_|
    self.bot.nick = NAME unless self.bot.nick == NAME
  end

  # Paste announcements: fetch the paste and report it when it looks like
  # spam.
  on :message, %r{"[^"]*" pasted "[^"]*" (https?://.*)} do |m, link|
    next unless DEBUG || m.user.nick == 'vandusen'

    self.bot.loggers.info("Possible spam pasted: #{link}")
    # Cheap check first: without an id there is nothing to report, so the
    # paste does not need to be downloaded at all.
    id = link[/id=([a-z0-9]{40})/, 1]
    if id && spam?(link, spam_words)
      self.bot.loggers.info("Detected spam at: #{link}")
      m.reply("vandusen: spam #{id}")
    end
  end

  # YouTube links in TITLE_CHANNELS: reply with the page title.
  on :message, %r{\b(https?://[^<>"\\\s]+)} do |m, link|
    next if bot_nick?(m.user.nick)
    next unless m.channel && TITLE_CHANNELS.include?(m.channel.name)
    next unless YOUTUBE_HOSTS.any? { |host| link.include?(host) }

    link.chomp!("\x01") # HACK: /me surrounds message with ^A
    url = URI(link)
    next unless YOUTUBE_HOSTS.include?(url.host) || DEBUG

    self.bot.loggers.info("Match: #{url}")
    res = HTTP.follow.get(url)
    code = res.code
    raise APIError, code.to_s if code >= 400

    title = extract_title(read_html_head(res.body, self.bot.loggers))
    m.reply("[title] #{title}") if title
  rescue APIError => e
    m.reply("error: #{e.message}")
  rescue StandardError => e
    m.reply('error: Exception while processing URL :(')
    self.bot.loggers.info("Exception: #{e.full_message.inspect}")
  end

  # Messages addressed to the bot: commands, or a prompt to complete.
  on :message, PREFIX_RX do |m|
    prompt = m.message.sub(PREFIX_RX, '').strip
    usermask = m.prefix
    next if prompt.empty? || bot_nick?(m.user.nick)

    if SIMPLE_COMMAND_MAP[prompt]
      m.reply(SIMPLE_COMMAND_MAP[prompt])
    elsif prompt.start_with?('help')
      if (match = prompt.match(/help (.*)/))
        # Look the name up among the known keys instead of interning
        # arbitrary user input; unknown names fall back to :prompt.
        command = HELP_USAGE.keys.find { |key| key.to_s == match[1] } || :prompt
        usage = HELP_USAGE[command]
        explanation = HELP_TEXTS[command] || HELP_TEXTS[:prompt]
        m.reply("usage: #{usage} - #{explanation}")
      else
        m.reply("usage: #{HELP_USAGE[:help]} - #{HELP_TEXTS[:help]}")
      end
    elsif prompt.start_with?('eval')
      if (match = prompt.match(/eval (.*)/))
        code = match[1]
        bot.loggers.info("evaluating: #{code}")
        # Empty (or missing) output has no first line; use '' rather than
        # passing nil on to truncate.
        first_line = eval_elisp(code).to_s.split("\n").first.to_s
        answer = irc_encode(truncate(first_line))
        answer = "#{m.user.nick}: #{answer}" if m.target.is_a?(Cinch::Channel)
        m.reply(answer)
      else
        m.reply("usage: #{HELP_USAGE[:eval]}")
      end
    elsif prompt.start_with?('t8')
      if (match = prompt.match(/t8 ([a-z]{2,}) ([a-z]{2,}) (.*)/))
        begin
          # Inside the begin so that a rate-limit UsageError is reported to
          # the user like any other error.
          t8_rate_limiter << Time.now.to_i
          from = match[1].to_sym
          to = match[2].to_sym
          text = match[3].strip
          bot.loggers.info("translating (#{from}->#{to}): #{text}")
          # Reply only on success; after an error there is no answer.
          m.reply(irc_encode(deepl_translate(deepl_token, from, to, text)))
        rescue APIError, UsageError => e
          m.reply("error: #{e.message}")
        end
      else
        m.reply("usage: #{HELP_USAGE[:t8]}")
      end
    elsif prompt.start_with?('puppet') && usermask.end_with?(OWNER_MASK)
      if (match = prompt.match(/puppet ([^ ]+) (.*)/))
        Cinch::Target.new(match[1], self.bot).send(match[2])
      else
        m.reply('usage: puppet ')
      end
    elsif prompt.start_with?('join') && usermask.end_with?(OWNER_MASK)
      if (match = prompt.match(/join ([^ ]+)/))
        self.bot.join(match[1])
      else
        m.reply('usage: join ')
      end
    elsif prompt.start_with?('part') && usermask.end_with?(OWNER_MASK)
      if (match = prompt.match(/part ([^ ]+)(.*)?/))
        reason = match[2].strip
        reason = nil if reason.empty?
        self.bot.part(match[1], reason)
      else
        m.reply('usage: part ')
      end
    else
      begin
        raise UsageError, 'ML text completion API is for public use only' unless m.channel
        raise UsageError, 'ML text completion API disabled in here' unless ML_CHANNELS.include?(m.channel.name)

        ml_rate_limiter << Time.now.to_i
        self.bot.loggers.info("ML completion prompt: #{prompt}")
        untruncated_completion = api_request(prompt, api_token)
        self.bot.loggers.info("ML completion: #{untruncated_completion}")
        completion = massage(untruncated_completion)
        m.reply("#{m.user.nick}: #{completion}")
      rescue APIError, UsageError => e
        m.reply("error: #{e.message}")
      rescue StandardError => e
        m.reply('error: Exception while processing prompt :(')
        self.bot.loggers.info("Exception: #{e.full_message.inspect}")
      end
    end
  end
end

bot.loggers.level = :info
bot.start